script/dom/stream/
readablestreambyobreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::gc::CustomAutoRooterGuard;
13use js::jsapi::Heap;
14use js::jsval::{JSVal, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
17use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
18use script_bindings::root::Dom;
19
20use super::byteteereadintorequest::ByteTeeReadIntoRequest;
21use super::readablebytestreamcontroller::ReadableByteStreamController;
22use super::readablestreamgenericreader::ReadableStreamGenericReader;
23use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
24use crate::dom::bindings::cell::DomRefCell;
25use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::{
26    ReadableStreamBYOBReaderMethods, ReadableStreamBYOBReaderReadOptions,
27};
28use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::{
31    DomGlobal, Reflector, reflect_dom_object, reflect_dom_object_with_proto,
32};
33use crate::dom::bindings::root::{DomRoot, MutNullableDom};
34use crate::dom::bindings::trace::RootedTraceableBox;
35use crate::dom::globalscope::GlobalScope;
36use crate::dom::promise::Promise;
37use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
38use crate::dom::stream::readablestream::ReadableStream;
39use crate::realms::{InRealm, enter_realm};
40use crate::script_runtime::CanGc;
41
/// A pending BYOB read that is queued on a `ReadableStreamBYOBReader` until
/// its chunk, close or error steps are run.
///
/// <https://streams.spec.whatwg.org/#read-into-request>
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub enum ReadIntoRequest {
    /// Request created by the reader's `Read` method; the promise is the one
    /// returned to the caller and is settled by the request's steps.
    ///
    /// <https://streams.spec.whatwg.org/#byob-reader-read>
    Read(#[conditional_malloc_size_of] Rc<Promise>),
    /// Request whose chunk, close and error steps are forwarded to a
    /// `ByteTeeReadIntoRequest`.
    ByteTee {
        byte_tee_read_into_request: Dom<ByteTeeReadIntoRequest>,
    },
}
51
52impl ReadIntoRequest {
53    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
54    pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
55        match self {
56            ReadIntoRequest::Read(promise) => {
57                // chunk steps, given chunk
58                // Resolve promise with «[ "value" → chunk, "done" → false ]».
59                promise.resolve_native(
60                    &ReadableStreamReadResult {
61                        done: Some(false),
62                        value: chunk,
63                    },
64                    can_gc,
65                );
66            },
67            ReadIntoRequest::ByteTee {
68                byte_tee_read_into_request,
69            } => {
70                byte_tee_read_into_request.enqueue_chunk_steps(
71                    HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
72                        RootedTraceableBox::from_box(Heap::boxed(chunk.get().to_object())),
73                    )),
74                )
75            },
76        }
77    }
78
79    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A0>
80    pub fn close_steps(&self, cx: &mut JSContext, chunk: Option<RootedTraceableBox<Heap<JSVal>>>) {
81        match self {
82            ReadIntoRequest::Read(promise) => match chunk {
83                // close steps, given chunk
84                // Resolve promise with «[ "value" → chunk, "done" → true ]».
85                Some(chunk) => promise.resolve_native(
86                    &ReadableStreamReadResult {
87                        done: Some(true),
88                        value: chunk,
89                    },
90                    CanGc::from_cx(cx),
91                ),
92                None => {
93                    let result = RootedTraceableBox::new(Heap::default());
94                    result.set(UndefinedValue());
95                    promise.resolve_native(
96                        &ReadableStreamReadResult {
97                            done: Some(true),
98                            value: result,
99                        },
100                        CanGc::from_cx(cx),
101                    );
102                },
103            },
104            ReadIntoRequest::ByteTee {
105                byte_tee_read_into_request,
106            } => match chunk {
107                Some(chunk) => byte_tee_read_into_request
108                    .close_steps(
109                        cx,
110                        Some(HeapBufferSource::<ArrayBufferViewU8>::new(
111                            BufferSource::ArrayBufferView(RootedTraceableBox::from_box(
112                                Heap::boxed(chunk.get().to_object()),
113                            )),
114                        )),
115                    )
116                    .expect("close steps should not fail"),
117                None => byte_tee_read_into_request
118                    .close_steps(cx, None)
119                    .expect("close steps should not fail"),
120            },
121        }
122    }
123
124    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
125    pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
126        match self {
127            ReadIntoRequest::Read(promise) => {
128                // error steps, given e
129                // Reject promise with e.
130                promise.reject_native(&e, can_gc)
131            },
132            ReadIntoRequest::ByteTee {
133                byte_tee_read_into_request,
134            } => {
135                byte_tee_read_into_request.error_steps();
136            },
137        }
138    }
139}
140
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
///
/// When it runs for the expected reader version, it errors both branch
/// controllers and, unless both branches were canceled, resolves the cancel
/// promise.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ByteTeeClosedPromiseRejectionHandler {
    /// Byte controller of the first branch; errored with the rejection reason.
    branch_1_controller: Dom<ReadableByteStreamController>,
    /// Byte controller of the second branch; errored with the rejection reason.
    branch_2_controller: Dom<ReadableByteStreamController>,
    /// Shared flag read as the tee algorithm's `canceled1`.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// Shared flag read as the tee algorithm's `canceled2`.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// Promise resolved with undefined when at least one branch was not canceled.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
    /// Shared counter compared against `expected_version` when the handler runs.
    #[conditional_malloc_size_of]
    reader_version: Rc<Cell<u64>>,
    /// Value `reader_version` must still hold for the handler to act; on a
    /// mismatch the callback returns without doing anything.
    expected_version: u64,
}
158
159impl Callback for ByteTeeClosedPromiseRejectionHandler {
160    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
161    /// Upon rejection of `reader.closedPromise` with reason `r``,
162    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
163        // If thisReader is not reader, return.
164        if self.reader_version.get() != self.expected_version {
165            return;
166        }
167
168        // Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
169        self.branch_1_controller.error(cx, v);
170
171        // Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
172        self.branch_2_controller.error(cx, v);
173
174        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
175        if !self.canceled_1.get() || !self.canceled_2.get() {
176            self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
177        }
178    }
179}
180
/// <https://streams.spec.whatwg.org/#readablestreambyobreader>
#[dom_struct]
pub(crate) struct ReadableStreamBYOBReader {
    reflector_: Reflector,

    /// The stream this reader is attached to, or `None` when it has none.
    ///
    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
    stream: MutNullableDom<ReadableStream>,

    /// The reader's `[[readIntoRequests]]` list. Requests are appended at the
    /// back and removed from the front.
    read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,

    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
    #[conditional_malloc_size_of]
    closed_promise: DomRefCell<Rc<Promise>>,
}
195
196impl ReadableStreamBYOBReader {
197    fn new_with_proto(
198        global: &GlobalScope,
199        proto: Option<SafeHandleObject>,
200        can_gc: CanGc,
201    ) -> DomRoot<ReadableStreamBYOBReader> {
202        reflect_dom_object_with_proto(
203            Box::new(ReadableStreamBYOBReader::new_inherited(global, can_gc)),
204            global,
205            proto,
206            can_gc,
207        )
208    }
209
210    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamBYOBReader {
211        ReadableStreamBYOBReader {
212            reflector_: Reflector::new(),
213            stream: MutNullableDom::new(None),
214            read_into_requests: DomRefCell::new(Default::default()),
215            closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
216        }
217    }
218
219    pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
220        reflect_dom_object(
221            Box::new(Self::new_inherited(global, can_gc)),
222            global,
223            can_gc,
224        )
225    }
226
227    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-byob-reader>
228    pub(crate) fn set_up(
229        &self,
230        stream: &ReadableStream,
231        global: &GlobalScope,
232        can_gc: CanGc,
233    ) -> Fallible<()> {
234        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
235        if stream.is_locked() {
236            return Err(Error::Type(c"stream is locked".to_owned()));
237        }
238
239        // If stream.[[controller]] does not implement ReadableByteStreamController, throw a TypeError exception.
240        if !stream.has_byte_controller() {
241            return Err(Error::Type(
242                c"stream controller is not a byte stream controller".to_owned(),
243            ));
244        }
245
246        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
247        self.generic_initialize(global, stream, can_gc);
248
249        // Set reader.[[readIntoRequests]] to a new empty list.
250        self.read_into_requests.borrow_mut().clear();
251
252        Ok(())
253    }
254
255    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreaderrelease>
256    pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
257        // Perform ! ReadableStreamReaderGenericRelease(reader).
258        self.generic_release(can_gc)
259            .expect("Generic release failed");
260        // Let e be a new TypeError exception.
261        let cx = GlobalScope::get_cx();
262        rooted!(in(*cx) let mut error = UndefinedValue());
263        Error::Type(c"Reader is released".to_owned()).to_jsval(
264            cx,
265            &self.global(),
266            error.handle_mut(),
267            can_gc,
268        );
269
270        // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
271        self.error_read_into_requests(error.handle(), can_gc);
272        Ok(())
273    }
274
275    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreadererrorreadintorequests>
276    pub(crate) fn error_read_into_requests(&self, e: SafeHandleValue, can_gc: CanGc) {
277        // Reject reader.[[closedPromise]] with e.
278        self.closed_promise.borrow().reject_native(&e, can_gc);
279
280        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
281        self.closed_promise.borrow().set_promise_is_handled();
282
283        // Let readRequests be reader.[[readRequests]].
284        let mut read_into_requests = self.take_read_into_requests();
285
286        // Set reader.[[readIntoRequests]] to a new empty list.
287        for request in read_into_requests.drain(0..) {
288            // Perform readIntoRequest’s error steps, given e.
289            request.error_steps(e, can_gc);
290        }
291    }
292
293    fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
294        mem::take(&mut *self.read_into_requests.borrow_mut())
295    }
296
297    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
298    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
299        self.read_into_requests
300            .borrow_mut()
301            .push_back(read_request.clone());
302    }
303
304    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
305    pub(crate) fn cancel(&self, cx: &mut JSContext) {
306        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
307        // Let readIntoRequests be reader.[[readIntoRequests]].
308        let mut read_into_requests = self.take_read_into_requests();
309        // Set reader.[[readIntoRequests]] to an empty list.
310        // Perform readIntoRequest’s close steps, given undefined.
311        for request in read_into_requests.drain(0..) {
312            // Perform readIntoRequest’s close steps, given undefined.
313            request.close_steps(cx, None);
314        }
315    }
316
317    pub(crate) fn close(&self, can_gc: CanGc) {
318        // Resolve reader.[[closedPromise]] with undefined.
319        self.closed_promise.borrow().resolve_native(&(), can_gc);
320    }
321
322    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
323    pub(crate) fn read(
324        &self,
325        cx: &mut JSContext,
326        view: HeapBufferSource<ArrayBufferViewU8>,
327        min: u64,
328        read_into_request: &ReadIntoRequest,
329    ) {
330        // Let stream be reader.[[stream]].
331
332        // Assert: stream is not undefined.
333        assert!(self.stream.get().is_some());
334
335        let stream = self.stream.get().unwrap();
336
337        // Set stream.[[disturbed]] to true.
338        stream.set_is_disturbed(true);
339        // If stream.[[state]] is "errored", perform readIntoRequest’s error steps given stream.[[storedError]].
340        if stream.is_errored() {
341            rooted!(&in(cx) let mut error = UndefinedValue());
342            stream.get_stored_error(error.handle_mut());
343
344            read_into_request.error_steps(error.handle(), CanGc::from_cx(cx));
345        } else {
346            // Otherwise,
347            // perform ! ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest).
348            stream.perform_pull_into(cx, read_into_request, view, min);
349        }
350    }
351
352    pub(crate) fn get_num_read_into_requests(&self) -> usize {
353        self.read_into_requests.borrow().len()
354    }
355
356    pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
357        self.read_into_requests
358            .borrow_mut()
359            .pop_front()
360            .expect("read into requests is empty")
361    }
362
363    #[allow(clippy::too_many_arguments)]
364    pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
365        &self,
366        branch_1: &ReadableStream,
367        branch_2: &ReadableStream,
368        canceled_1: Rc<Cell<bool>>,
369        canceled_2: Rc<Cell<bool>>,
370        cancel_promise: Rc<Promise>,
371        reader_version: Rc<Cell<u64>>,
372        expected_version: u64,
373        can_gc: CanGc,
374    ) {
375        let branch_1_controller = branch_1.get_byte_controller();
376
377        let branch_2_controller = branch_2.get_byte_controller();
378
379        let global = self.global();
380        let handler = PromiseNativeHandler::new(
381            &global,
382            None,
383            Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
384                branch_1_controller: Dom::from_ref(&branch_1_controller),
385                branch_2_controller: Dom::from_ref(&branch_2_controller),
386                canceled_1,
387                canceled_2,
388                cancel_promise,
389                reader_version,
390                expected_version,
391            })),
392            can_gc,
393        );
394
395        let realm = enter_realm(&*global);
396        let comp = InRealm::Entered(&realm);
397
398        self.closed_promise
399            .borrow()
400            .append_native_handler(&handler, comp, can_gc);
401    }
402}
403
404impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
405    /// <https://streams.spec.whatwg.org/#byob-reader-constructor>
406    fn Constructor(
407        global: &GlobalScope,
408        proto: Option<SafeHandleObject>,
409        can_gc: CanGc,
410        stream: &ReadableStream,
411    ) -> Fallible<DomRoot<Self>> {
412        let reader = Self::new_with_proto(global, proto, can_gc);
413
414        // Perform ? SetUpReadableStreamBYOBReader(this, stream).
415        Self::set_up(&reader, stream, global, can_gc)?;
416
417        Ok(reader)
418    }
419
420    /// <https://streams.spec.whatwg.org/#byob-reader-read>
421    fn Read(
422        &self,
423        cx: &mut JSContext,
424        view: CustomAutoRooterGuard<ArrayBufferView>,
425        options: &ReadableStreamBYOBReaderReadOptions,
426    ) -> Rc<Promise> {
427        let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
428        let min = options.min;
429        // Let promise be a new promise.
430        let promise = Promise::new2(cx, &self.global());
431
432        // If view.[[ByteLength]] is 0, return a promise rejected with a TypeError exception.
433        if view.byte_length() == 0 {
434            promise.reject_error(
435                Error::Type(c"view byte length is 0".to_owned()),
436                CanGc::from_cx(cx),
437            );
438            return promise;
439        }
440        // If view.[[ViewedArrayBuffer]].[[ArrayBufferByteLength]] is 0,
441        // return a promise rejected with a TypeError exception.
442        if view.viewed_buffer_array_byte_length(cx.into()) == 0 {
443            promise.reject_error(
444                Error::Type(c"viewed buffer byte length is 0".to_owned()),
445                CanGc::from_cx(cx),
446            );
447            return promise;
448        }
449
450        // If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true,
451        // return a promise rejected with a TypeError exception.
452        if view.is_detached_buffer(cx.into()) {
453            promise.reject_error(
454                Error::Type(c"view is detached".to_owned()),
455                CanGc::from_cx(cx),
456            );
457            return promise;
458        }
459
460        // If options["min"] is 0, return a promise rejected with a TypeError exception.
461        if min == 0 {
462            promise.reject_error(Error::Type(c"min is 0".to_owned()), CanGc::from_cx(cx));
463            return promise;
464        }
465
466        // If view has a [[TypedArrayName]] internal slot,
467        if view.has_typed_array_name() {
468            // If options["min"] > view.[[ArrayLength]], return a promise rejected with a RangeError exception.
469            if min > (view.get_typed_array_length() as u64) {
470                promise.reject_error(
471                    Error::Range(c"min is greater than array length".to_owned()),
472                    CanGc::from_cx(cx),
473                );
474                return promise;
475            }
476        } else {
477            // Otherwise (i.e., it is a DataView),
478            // If options["min"] > view.[[ByteLength]], return a promise rejected with a RangeError exception.
479            if min > (view.byte_length() as u64) {
480                promise.reject_error(
481                    Error::Range(c"min is greater than byte length".to_owned()),
482                    CanGc::from_cx(cx),
483                );
484                return promise;
485            }
486        }
487
488        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
489        if self.stream.get().is_none() {
490            promise.reject_error(
491                Error::Type(c"min is greater than byte length".to_owned()),
492                CanGc::from_cx(cx),
493            );
494            return promise;
495        }
496
497        // Let readIntoRequest be a new read-into request with the following items:
498        //
499        // chunk steps, given chunk
500        // Resolve promise with «[ "value" → chunk, "done" → false ]».
501        //
502        // close steps, given chunk
503        // Resolve promise with «[ "value" → chunk, "done" → true ]».
504        //
505        // error steps, given e
506        // Reject promise with e
507        let read_into_request = ReadIntoRequest::Read(promise.clone());
508
509        // Perform ! ReadableStreamBYOBReaderRead(this, view, options["min"], readIntoRequest).
510        self.read(cx, view, min, &read_into_request);
511
512        // Return promise.
513        promise
514    }
515
516    /// <https://streams.spec.whatwg.org/#byob-reader-release-lock>
517    fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
518        if self.stream.get().is_none() {
519            // If this.[[stream]] is undefined, return.
520            return Ok(());
521        }
522
523        // Perform !ReadableStreamBYOBReaderRelease(this).
524        self.release(can_gc)
525    }
526
527    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
528    fn Closed(&self) -> Rc<Promise> {
529        self.closed()
530    }
531
532    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
533    fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
534        self.generic_cancel(cx, &self.global(), reason)
535    }
536}
537
538impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
539    fn get_closed_promise(&self) -> Rc<Promise> {
540        self.closed_promise.borrow().clone()
541    }
542
543    fn set_closed_promise(&self, promise: Rc<Promise>) {
544        *self.closed_promise.borrow_mut() = promise;
545    }
546
547    fn set_stream(&self, stream: Option<&ReadableStream>) {
548        self.stream.set(stream);
549    }
550
551    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
552        self.stream.get()
553    }
554
555    fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
556        Some(self)
557    }
558}