tendril/
stream.rs

1// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
4// option. This file may not be copied, modified, or distributed
5// except according to those terms.
6
7//! Streams of tendrils.
8
9use crate::fmt;
10use crate::{Atomicity, NonAtomic, Tendril};
11
12use std::borrow::Cow;
13use std::fs::File;
14use std::io;
15use std::marker::PhantomData;
16use std::path::Path;
17
18#[cfg(feature = "encoding_rs")]
19use encoding_rs::{self, DecoderResult};
20use utf8;
21
22/// Trait for types that can process a tendril.
23///
24/// This is a "push" interface, unlike the "pull" interface of
25/// `Iterator<Item=Tendril<F>>`. The push interface matches
26/// [html5ever][] and other incremental parsers with a similar
27/// architecture.
28///
29/// [html5ever]: https://github.com/servo/html5ever
30pub trait TendrilSink<F, A = NonAtomic>
31where
32    F: fmt::Format,
33    A: Atomicity,
34{
35    /// Process this tendril.
36    fn process(&mut self, t: Tendril<F, A>);
37
38    /// Indicates that an error has occurred.
39    fn error(&mut self, desc: Cow<'static, str>);
40
41    /// What the overall result of processing is.
42    type Output;
43
44    /// Indicates the end of the stream.
45    fn finish(self) -> Self::Output;
46
47    /// Process one tendril and finish.
48    fn one<T>(mut self, t: T) -> Self::Output
49    where
50        Self: Sized,
51        T: Into<Tendril<F, A>>,
52    {
53        self.process(t.into());
54        self.finish()
55    }
56
57    /// Consume an iterator of tendrils, processing each item, then finish.
58    fn from_iter<I>(mut self, i: I) -> Self::Output
59    where
60        Self: Sized,
61        I: IntoIterator,
62        I::Item: Into<Tendril<F, A>>,
63    {
64        for t in i {
65            self.process(t.into())
66        }
67        self.finish()
68    }
69
70    /// Read from the given stream of bytes until exhaustion and process incrementally,
71    /// then finish. Return `Err` at the first I/O error.
72    fn read_from<R>(mut self, r: &mut R) -> io::Result<Self::Output>
73    where
74        Self: Sized,
75        R: io::Read,
76        F: fmt::SliceFormat<Slice = [u8]>,
77    {
78        const BUFFER_SIZE: u32 = 4 * 1024;
79        loop {
80            let mut tendril = Tendril::<F, A>::new();
81            // FIXME: this exposes uninitialized bytes to a generic R type
82            // this is fine for R=File which never reads these bytes,
83            // but user-defined types might.
84            // The standard library pushes zeros to `Vec<u8>` for that reason.
85            unsafe {
86                tendril.push_uninitialized(BUFFER_SIZE);
87            }
88            loop {
89                match r.read(&mut tendril) {
90                    Ok(0) => return Ok(self.finish()),
91                    Ok(n) => {
92                        tendril.pop_back(BUFFER_SIZE - n as u32);
93                        self.process(tendril);
94                        break;
95                    },
96                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {},
97                    Err(e) => return Err(e),
98                }
99            }
100        }
101    }
102
103    /// Read from the file at the given path and process incrementally,
104    /// then finish. Return `Err` at the first I/O error.
105    fn from_file<P>(self, path: P) -> io::Result<Self::Output>
106    where
107        Self: Sized,
108        P: AsRef<Path>,
109        F: fmt::SliceFormat<Slice = [u8]>,
110    {
111        self.read_from(&mut File::open(path)?)
112    }
113}
114
115/// A `TendrilSink` adaptor that takes bytes, decodes them as UTF-8,
116/// lossily replace ill-formed byte sequences with U+FFFD replacement characters,
117/// and emits Unicode (`StrTendril`).
118///
119/// This does not allocate memory: the output is either subtendrils on the input,
120/// on inline tendrils for a single code point.
121pub struct Utf8LossyDecoder<Sink, A = NonAtomic>
122where
123    Sink: TendrilSink<fmt::UTF8, A>,
124    A: Atomicity,
125{
126    pub inner_sink: Sink,
127    incomplete: Option<utf8::Incomplete>,
128    marker: PhantomData<A>,
129}
130
131impl<Sink, A> Utf8LossyDecoder<Sink, A>
132where
133    Sink: TendrilSink<fmt::UTF8, A>,
134    A: Atomicity,
135{
136    /// Create a new incremental UTF-8 decoder.
137    #[inline]
138    pub fn new(inner_sink: Sink) -> Self {
139        Utf8LossyDecoder {
140            inner_sink,
141            incomplete: None,
142            marker: PhantomData,
143        }
144    }
145}
146
// Decodes a stream of byte tendrils as UTF-8 and forwards the result to `inner_sink`.
//
// Valid runs are forwarded as subtendrils of the input. Each ill-formed sequence is
// reported with `error("invalid byte sequence")` followed by a U+FFFD tendril. A
// multi-byte sequence cut off at the end of one input tendril is parked in
// `self.incomplete` and completed from the start of the next one.
impl<Sink, A> TendrilSink<fmt::Bytes, A> for Utf8LossyDecoder<Sink, A>
where
    Sink: TendrilSink<fmt::UTF8, A>,
    A: Atomicity,
{
    /// Decodes one tendril of bytes, forwarding everything that can be decoded so far.
    #[inline]
    fn process(&mut self, mut t: Tendril<fmt::Bytes, A>) {
        // Step 1: finish a sequence left over from the previous tendril, if any.
        // FIXME: remove take() and map() when non-lexical borrows are stable.
        if let Some(mut incomplete) = self.incomplete.take() {
            // `try_complete` yields `None` when `t` still does not finish the sequence;
            // otherwise the completed (or invalid) bytes and the unused rest of `t`.
            let resume_at = incomplete.try_complete(&t).map(|(result, rest)| {
                match result {
                    Ok(s) => self.inner_sink.process(Tendril::from_slice(s)),
                    Err(_) => {
                        self.inner_sink.error("invalid byte sequence".into());
                        self.inner_sink
                            .process(Tendril::from_slice(utf8::REPLACEMENT_CHARACTER));
                    },
                }
                // Number of leading bytes of `t` consumed by the completion.
                t.len() - rest.len()
            });
            match resume_at {
                // Still incomplete: keep it for the next call and wait for more input.
                None => {
                    self.incomplete = Some(incomplete);
                    return;
                },
                Some(resume_at) => t.pop_front(resume_at as u32),
            }
        }
        // Step 2: decode the remainder of `t`.
        while !t.is_empty() {
            // `utf8::decode` borrows `t`, so reduce its outcome to plain lengths (plus
            // the owned `Incomplete`) before `t` is sliced or popped below.
            let unborrowed_result = match utf8::decode(&t) {
                Ok(s) => {
                    debug_assert!(s.as_ptr() == t.as_ptr());
                    debug_assert!(s.len() == t.len());
                    Ok(())
                },
                Err(utf8::DecodeError::Invalid {
                    valid_prefix,
                    invalid_sequence,
                    ..
                }) => {
                    debug_assert!(valid_prefix.as_ptr() == t.as_ptr());
                    debug_assert!(valid_prefix.len() <= t.len());
                    // Inner `Err` carries the offset just past the invalid sequence.
                    Err((
                        valid_prefix.len(),
                        Err(valid_prefix.len() + invalid_sequence.len()),
                    ))
                },
                Err(utf8::DecodeError::Incomplete {
                    valid_prefix,
                    incomplete_suffix,
                }) => {
                    debug_assert!(valid_prefix.as_ptr() == t.as_ptr());
                    debug_assert!(valid_prefix.len() <= t.len());
                    // Inner `Ok` carries the truncated trailing sequence.
                    Err((valid_prefix.len(), Ok(incomplete_suffix)))
                },
            };
            match unborrowed_result {
                Ok(()) => {
                    // SAFETY: `utf8::decode` just validated all of `t` as UTF-8.
                    unsafe { self.inner_sink.process(t.reinterpret_without_validating()) }
                    return;
                },
                Err((valid_len, and_then)) => {
                    if valid_len > 0 {
                        // Forward the validated prefix without copying.
                        let subtendril = t.subtendril(0, valid_len as u32);
                        // SAFETY: these `valid_len` bytes are `valid_prefix`, which
                        // `utf8::decode` validated as UTF-8.
                        unsafe {
                            self.inner_sink
                                .process(subtendril.reinterpret_without_validating())
                        }
                    }
                    match and_then {
                        // Truncated sequence at the end of `t`: park it for the next call.
                        Ok(incomplete) => {
                            self.incomplete = Some(incomplete);
                            return;
                        },
                        // Invalid sequence: report it, emit U+FFFD, and continue after it.
                        Err(offset) => {
                            self.inner_sink.error("invalid byte sequence".into());
                            self.inner_sink
                                .process(Tendril::from_slice(utf8::REPLACEMENT_CHARACTER));
                            t.pop_front(offset as u32);
                        },
                    }
                },
            }
        }
    }

    /// Forwards an upstream error to the inner sink unchanged.
    #[inline]
    fn error(&mut self, desc: Cow<'static, str>) {
        self.inner_sink.error(desc);
    }

    type Output = Sink::Output;

    /// Ends the stream. A sequence still pending at this point is reported as an error
    /// and replaced by a single U+FFFD before the inner sink is finished.
    #[inline]
    fn finish(mut self) -> Sink::Output {
        if self.incomplete.is_some() {
            self.inner_sink
                .error("incomplete byte sequence at end of stream".into());
            self.inner_sink
                .process(Tendril::from_slice(utf8::REPLACEMENT_CHARACTER));
        }
        self.inner_sink.finish()
    }
}
251
/// A `TendrilSink` adaptor that takes bytes, decodes them as the given character encoding,
/// lossily replaces ill-formed byte sequences with U+FFFD replacement characters,
/// and emits Unicode (`StrTendril`).
///
/// This allocates new tendrils for encodings other than UTF-8.
#[cfg(feature = "encoding_rs")]
pub struct LossyDecoder<Sink, A = NonAtomic>
where
    Sink: TendrilSink<fmt::UTF8, A>,
    A: Atomicity,
{
    /// The active decoding strategy, chosen at construction time.
    inner: LossyDecoderInner<Sink, A>,
}
265
/// The decoding strategy behind a `LossyDecoder`.
#[cfg(feature = "encoding_rs")]
enum LossyDecoderInner<Sink, A>
where
    Sink: TendrilSink<fmt::UTF8, A>,
    A: Atomicity,
{
    /// UTF-8 input, handled by the allocation-free `Utf8LossyDecoder`.
    Utf8(Utf8LossyDecoder<Sink, A>),
    /// Any other encoding: an `encoding_rs` decoder plus the sink that receives
    /// its freshly allocated UTF-8 output.
    EncodingRs(encoding_rs::Decoder, Sink),
}
276
#[cfg(feature = "encoding_rs")]
impl<Sink, A> LossyDecoder<Sink, A>
where
    Sink: TendrilSink<fmt::UTF8, A>,
    A: Atomicity,
{
    /// Create a new incremental decoder using the encoding_rs crate.
    ///
    /// When `encoding` is UTF-8 this is equivalent to [Self::utf8]: decoding is done
    /// by the allocation-free `Utf8LossyDecoder` rather than by `encoding_rs`.
    /// Otherwise the decoder comes from `encoding.new_decoder()`.
    ///
    /// NOTE(review): `Encoding::new_decoder` does BOM sniffing, while the UTF-8 path
    /// does not special-case a BOM — confirm that difference is intended.
    #[inline]
    pub fn new_encoding_rs(encoding: &'static encoding_rs::Encoding, sink: Sink) -> Self {
        if encoding == encoding_rs::UTF_8 {
            return Self::utf8(sink);
        }
        Self {
            inner: LossyDecoderInner::EncodingRs(encoding.new_decoder(), sink),
        }
    }

    /// Create a new incremental decoder using the encoding_rs crate.
    ///
    /// This is a more flexible version of [Self::new_encoding_rs], allowing the caller
    /// to configure the decoder themselves. The given decoder is always used as-is,
    /// even when it decodes UTF-8.
    #[inline]
    pub fn new_from_encoding_rs_decoder(decoder: encoding_rs::Decoder, sink: Sink) -> Self {
        Self {
            inner: LossyDecoderInner::EncodingRs(decoder, sink),
        }
    }

    /// Create a new incremental decoder for the UTF-8 encoding.
    ///
    /// This is useful for content that is known at run-time to be UTF-8
    /// (whereas `Utf8LossyDecoder` requires knowing at compile-time.)
    #[inline]
    pub fn utf8(sink: Sink) -> LossyDecoder<Sink, A> {
        LossyDecoder {
            inner: LossyDecoderInner::Utf8(Utf8LossyDecoder::new(sink)),
        }
    }

    /// Give a reference to the inner sink.
    pub fn inner_sink(&self) -> &Sink {
        match self.inner {
            LossyDecoderInner::Utf8(ref utf8) => &utf8.inner_sink,
            LossyDecoderInner::EncodingRs(_, ref inner_sink) => inner_sink,
        }
    }

    /// Give a mutable reference to the inner sink.
    pub fn inner_sink_mut(&mut self) -> &mut Sink {
        match self.inner {
            LossyDecoderInner::Utf8(ref mut utf8) => &mut utf8.inner_sink,
            LossyDecoderInner::EncodingRs(_, ref mut inner_sink) => inner_sink,
        }
    }
}
336
#[cfg(feature = "encoding_rs")]
impl<Sink, A> TendrilSink<fmt::Bytes, A> for LossyDecoder<Sink, A>
where
    Sink: TendrilSink<fmt::UTF8, A>,
    A: Atomicity,
{
    /// Feeds one chunk of encoded bytes to whichever decoder is active.
    #[inline]
    fn process(&mut self, t: Tendril<fmt::Bytes, A>) {
        match &mut self.inner {
            LossyDecoderInner::Utf8(utf8) => utf8.process(t),
            LossyDecoderInner::EncodingRs(decoder, sink) => {
                // An empty chunk carries nothing to decode; skip encoding_rs entirely.
                if !t.is_empty() {
                    decode_to_sink(t, decoder, sink, false);
                }
            },
        }
    }

    /// Forwards an upstream error straight to the inner sink.
    #[inline]
    fn error(&mut self, desc: Cow<'static, str>) {
        match &mut self.inner {
            LossyDecoderInner::Utf8(utf8) => utf8.error(desc),
            LossyDecoderInner::EncodingRs(_, sink) => sink.error(desc),
        }
    }

    type Output = Sink::Output;

    /// Flushes whatever the active decoder still holds, then finishes the inner sink.
    #[inline]
    fn finish(self) -> Sink::Output {
        let LossyDecoder { inner } = self;
        match inner {
            LossyDecoderInner::Utf8(utf8) => utf8.finish(),
            LossyDecoderInner::EncodingRs(mut decoder, mut sink) => {
                // A final, empty chunk with `last = true` drains the decoder.
                decode_to_sink(Tendril::new(), &mut decoder, &mut sink, true);
                sink.finish()
            },
        }
    }
}
380
/// Decodes `t` with `decoder` and pushes the UTF-8 output into `sink`.
///
/// Output is produced in chunks of at most 8192 bytes. Each malformed sequence
/// reported by the decoder becomes an `error("invalid sequence")` on the sink followed
/// by a U+FFFD tendril. Set `last` to `true` only for the final call of a stream.
///
/// The decoder is driven until it reports `InputEmpty`, even after all of `t` has been
/// consumed. With `last == true` this is required to flush state the decoder is still
/// holding (pending output or further errors) when the input runs out.
#[cfg(feature = "encoding_rs")]
fn decode_to_sink<Sink, A>(
    mut t: Tendril<fmt::Bytes, A>,
    decoder: &mut encoding_rs::Decoder,
    sink: &mut Sink,
    last: bool,
) where
    Sink: TendrilSink<fmt::UTF8, A>,
    A: Atomicity,
{
    const MAX_CHUNK: usize = 8192;
    loop {
        let mut out = <Tendril<fmt::Bytes, A>>::new();
        let max_len = decoder
            .max_utf8_buffer_length_without_replacement(t.len())
            .unwrap_or(MAX_CHUNK);
        // Clamp in `usize` *before* narrowing: casting first could truncate a huge
        // `max_len` (e.g. 2^32) to 0 and loop forever on `OutputFull`.
        let capacity = std::cmp::min(max_len, MAX_CHUNK) as u32;
        // NOTE(review): the decoder receives uninitialized bytes here. It is expected
        // to only write into `dst`, but zero-filling would be the conservative choice.
        unsafe {
            out.push_uninitialized(capacity);
        }
        let (result, bytes_read, bytes_written) =
            decoder.decode_to_utf8_without_replacement(&t, &mut out, last);
        if bytes_written > 0 {
            // SAFETY: the first `bytes_written` bytes of `out` were written by the
            // decoder's UTF-8 output routine.
            sink.process(unsafe {
                out.subtendril(0, bytes_written as u32)
                    .reinterpret_without_validating()
            });
        }
        match result {
            DecoderResult::InputEmpty => return,
            DecoderResult::OutputFull => {},
            DecoderResult::Malformed(_, _) => {
                sink.error(Cow::Borrowed("invalid sequence"));
                sink.process("\u{FFFD}".into());
            },
        }
        // Keep going even if `t` is now empty: the decoder may still hold state to
        // emit, and it is only done once it returns `InputEmpty`.
        t.pop_front(bytes_read as u32);
    }
}
421
#[cfg(test)]
mod test {
    use super::{TendrilSink, Utf8LossyDecoder};
    use crate::fmt;
    use crate::{Atomicity, NonAtomic, Tendril};
    use std::borrow::Cow;
    use std::io;

    #[cfg(feature = "encoding_rs")]
    use super::LossyDecoder;
    #[cfg(feature = "encoding_rs")]
    use crate::SliceExt;

    #[cfg(feature = "encoding_rs")]
    use encoding_rs as enc_rs;

    /// Test sink that records every tendril and error it receives.
    struct Accumulate<A>
    where
        A: Atomicity,
    {
        tendrils: Vec<Tendril<fmt::UTF8, A>>,
        errors: Vec<String>,
    }

    impl<A> Accumulate<A>
    where
        A: Atomicity,
    {
        fn new() -> Accumulate<A> {
            Accumulate {
                tendrils: vec![],
                errors: vec![],
            }
        }
    }

    impl<A> TendrilSink<fmt::UTF8, A> for Accumulate<A>
    where
        A: Atomicity,
    {
        fn process(&mut self, t: Tendril<fmt::UTF8, A>) {
            self.tendrils.push(t);
        }

        fn error(&mut self, desc: Cow<'static, str>) {
            self.errors.push(desc.into_owned());
        }

        type Output = (Vec<Tendril<fmt::UTF8, A>>, Vec<String>);

        fn finish(self) -> Self::Output {
            (self.tendrils, self.errors)
        }
    }

    fn check_utf8(input: &[&[u8]], expected: &[&str], errs: usize) {
        let decoder = Utf8LossyDecoder::new(Accumulate::<NonAtomic>::new());
        let (tendrils, errors) = decoder.from_iter(input.iter().cloned());
        assert_eq!(
            expected,
            &*tendrils.iter().map(|t| &**t).collect::<Vec<_>>()
        );
        assert_eq!(errs, errors.len());
    }

    #[test]
    fn utf8() {
        check_utf8(&[], &[], 0);
        check_utf8(&[b""], &[], 0);
        check_utf8(&[b"xyz"], &["xyz"], 0);
        check_utf8(&[b"x", b"y", b"z"], &["x", "y", "z"], 0);

        check_utf8(&[b"xy\xEA\x99\xAEzw"], &["xy\u{a66e}zw"], 0);
        check_utf8(&[b"xy\xEA", b"\x99\xAEzw"], &["xy", "\u{a66e}z", "w"], 0);
        check_utf8(&[b"xy\xEA\x99", b"\xAEzw"], &["xy", "\u{a66e}z", "w"], 0);
        check_utf8(
            &[b"xy\xEA", b"\x99", b"\xAEzw"],
            &["xy", "\u{a66e}z", "w"],
            0,
        );
        check_utf8(&[b"\xEA", b"", b"\x99", b"", b"\xAE"], &["\u{a66e}"], 0);
        check_utf8(
            &[b"", b"\xEA", b"", b"\x99", b"", b"\xAE", b""],
            &["\u{a66e}"],
            0,
        );

        check_utf8(
            &[b"xy\xEA", b"\xFF", b"\x99\xAEz"],
            &["xy", "\u{fffd}", "\u{fffd}", "\u{fffd}", "\u{fffd}", "z"],
            4,
        );
        check_utf8(
            &[b"xy\xEA\x99", b"\xFFz"],
            &["xy", "\u{fffd}", "\u{fffd}", "z"],
            2,
        );

        check_utf8(&[b"\xC5\x91\xC5\x91\xC5\x91"], &["őőő"], 0);
        check_utf8(
            &[b"\xC5\x91", b"\xC5\x91", b"\xC5\x91"],
            &["ő", "ő", "ő"],
            0,
        );
        check_utf8(
            &[b"\xC5", b"\x91\xC5", b"\x91\xC5", b"\x91"],
            &["ő", "ő", "ő"],
            0,
        );
        check_utf8(
            &[b"\xC5", b"\x91\xff", b"\x91\xC5", b"\x91"],
            &["ő", "\u{fffd}", "\u{fffd}", "ő"],
            2,
        );

        // incomplete char at end of input
        check_utf8(&[b"\xC0"], &["\u{fffd}"], 1);
        check_utf8(&[b"\xEA\x99"], &["\u{fffd}"], 1);
    }

    #[cfg(feature = "encoding_rs")]
    fn check_decode(
        mut decoder: LossyDecoder<Accumulate<NonAtomic>>,
        input: &[&[u8]],
        expected: &str,
        errs: usize,
    ) {
        for x in input {
            decoder.process(x.to_tendril());
        }
        let (tendrils, errors) = decoder.finish();
        let mut tendril: Tendril<fmt::UTF8> = Tendril::new();
        for t in tendrils {
            tendril.push_tendril(&t);
        }
        assert_eq!(expected, &*tendril);
        assert_eq!(errs, errors.len());
    }

    #[cfg(feature = "encoding_rs")]
    pub type Tests = &'static [(&'static [&'static [u8]], &'static str, usize)];

    #[cfg(feature = "encoding_rs")]
    const UTF_8: Tests = &[
        (&[], "", 0),
        (&[b""], "", 0),
        (&[b"xyz"], "xyz", 0),
        (&[b"x", b"y", b"z"], "xyz", 0),
        (&[b"\xEA\x99\xAE"], "\u{a66e}", 0),
        (&[b"\xEA", b"\x99\xAE"], "\u{a66e}", 0),
        (&[b"\xEA\x99", b"\xAE"], "\u{a66e}", 0),
        (&[b"\xEA", b"\x99", b"\xAE"], "\u{a66e}", 0),
        (&[b"\xEA", b"", b"\x99", b"", b"\xAE"], "\u{a66e}", 0),
        (
            &[b"", b"\xEA", b"", b"\x99", b"", b"\xAE", b""],
            "\u{a66e}",
            0,
        ),
        (&[b"xy\xEA", b"\x99\xAEz"], "xy\u{a66e}z", 0),
        (
            &[b"xy\xEA", b"\xFF", b"\x99\xAEz"],
            "xy\u{fffd}\u{fffd}\u{fffd}\u{fffd}z",
            4,
        ),
        (&[b"xy\xEA\x99", b"\xFFz"], "xy\u{fffd}\u{fffd}z", 2),
        // incomplete char at end of input
        (&[b"\xC0"], "\u{fffd}", 1),
        (&[b"\xEA\x99"], "\u{fffd}", 1),
    ];

    #[cfg(feature = "encoding_rs")]
    #[test]
    fn decode_utf8_encoding_rs() {
        for &(input, expected, errs) in UTF_8 {
            let decoder = LossyDecoder::new_encoding_rs(enc_rs::UTF_8, Accumulate::new());
            check_decode(decoder, input, expected, errs);
        }
    }

    #[cfg(feature = "encoding_rs")]
    const KOI8_U: Tests = &[
        (&[b"\xfc\xce\xc5\xd2\xc7\xc9\xd1"], "Энергия", 0),
        (&[b"\xfc\xce", b"\xc5\xd2\xc7\xc9\xd1"], "Энергия", 0),
        (&[b"\xfc\xce", b"\xc5\xd2\xc7", b"\xc9\xd1"], "Энергия", 0),
        (
            &[b"\xfc\xce", b"", b"\xc5\xd2\xc7", b"\xc9\xd1", b""],
            "Энергия",
            0,
        ),
    ];

    #[cfg(feature = "encoding_rs")]
    #[test]
    fn decode_koi8_u_encoding_rs() {
        for &(input, expected, errs) in KOI8_U {
            let decoder = LossyDecoder::new_encoding_rs(enc_rs::KOI8_U, Accumulate::new());
            check_decode(decoder, input, expected, errs);
        }
    }

    #[cfg(feature = "encoding_rs")]
    const WINDOWS_949: Tests = &[
        (&[], "", 0),
        (&[b""], "", 0),
        (&[b"\xbe\xc8\xb3\xe7"], "안녕", 0),
        (&[b"\xbe", b"\xc8\xb3\xe7"], "안녕", 0),
        (&[b"\xbe", b"", b"\xc8\xb3\xe7"], "안녕", 0),
        (
            &[b"\xbe\xc8\xb3\xe7\xc7\xcf\xbc\xbc\xbf\xe4"],
            "안녕하세요",
            0,
        ),
        (&[b"\xbe\xc8\xb3\xe7\xc7"], "안녕\u{fffd}", 1),
        (&[b"\xbe", b"", b"\xc8\xb3"], "안\u{fffd}", 1),
        (&[b"\xbe\x28\xb3\xe7"], "\u{fffd}(녕", 1),
    ];

    #[cfg(feature = "encoding_rs")]
    #[test]
    fn decode_windows_949_encoding_rs() {
        for &(input, expected, errs) in WINDOWS_949 {
            let decoder = LossyDecoder::new_encoding_rs(enc_rs::EUC_KR, Accumulate::new());
            check_decode(decoder, input, expected, errs);
        }
    }

    #[test]
    fn read_from() {
        let decoder = Utf8LossyDecoder::new(Accumulate::<NonAtomic>::new());
        let mut bytes: &[u8] = b"foo\xffbar";
        let (tendrils, errors) = decoder.read_from(&mut bytes).unwrap();
        assert_eq!(
            &*tendrils.iter().map(|t| &**t).collect::<Vec<_>>(),
            &["foo", "\u{FFFD}", "bar"]
        );
        assert_eq!(errors, &["invalid byte sequence"]);
    }

    /// A reader that yields one pre-split chunk per successful `read` call and fails
    /// with `ErrorKind::Interrupted` before every chunk (and before EOF), so that the
    /// retry path of `read_from` is exercised.
    struct ChunkedReader {
        chunks: Vec<&'static [u8]>,
        interrupt_next: bool,
    }

    impl io::Read for ChunkedReader {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            if self.interrupt_next {
                self.interrupt_next = false;
                return Err(io::ErrorKind::Interrupted.into());
            }
            self.interrupt_next = true;
            if self.chunks.is_empty() {
                return Ok(0);
            }
            let chunk = self.chunks.remove(0);
            let n = chunk.len().min(buf.len());
            buf[..n].copy_from_slice(&chunk[..n]);
            Ok(n)
        }
    }

    #[test]
    fn read_from_chunked_and_interrupted() {
        // The three-byte U+A66E is split across the two reads.
        let mut reader = ChunkedReader {
            chunks: vec![&b"foo\xEA"[..], &b"\x99\xAEbar"[..]],
            interrupt_next: true,
        };
        let decoder = Utf8LossyDecoder::new(Accumulate::<NonAtomic>::new());
        let (tendrils, errors) = decoder.read_from(&mut reader).unwrap();
        assert_eq!(
            &*tendrils.iter().map(|t| &**t).collect::<Vec<_>>(),
            &["foo", "\u{a66e}b", "ar"]
        );
        assert!(errors.is_empty());
    }
}