script/dom/stream/
readablestreambyobreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::gc::CustomAutoRooterGuard;
12use js::jsapi::Heap;
13use js::jsval::{JSVal, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
16use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
17use script_bindings::root::Dom;
18
19use super::byteteereadintorequest::ByteTeeReadIntoRequest;
20use super::readablebytestreamcontroller::ReadableByteStreamController;
21use super::readablestreamgenericreader::ReadableStreamGenericReader;
22use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
23use crate::dom::bindings::cell::DomRefCell;
24use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::{
25    ReadableStreamBYOBReaderMethods, ReadableStreamBYOBReaderReadOptions,
26};
27use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
28use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
29use crate::dom::bindings::reflector::{
30    DomGlobal, Reflector, reflect_dom_object, reflect_dom_object_with_proto,
31};
32use crate::dom::bindings::root::{DomRoot, MutNullableDom};
33use crate::dom::bindings::trace::RootedTraceableBox;
34use crate::dom::globalscope::GlobalScope;
35use crate::dom::promise::Promise;
36use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
37use crate::dom::stream::readablestream::ReadableStream;
38use crate::realms::{InRealm, enter_realm};
39use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
40
41/// <https://streams.spec.whatwg.org/#read-into-request>
42#[derive(Clone, JSTraceable, MallocSizeOf)]
43pub enum ReadIntoRequest {
44    /// <https://streams.spec.whatwg.org/#byob-reader-read>
45    Read(#[conditional_malloc_size_of] Rc<Promise>),
46    ByteTee {
47        byte_tee_read_into_request: Dom<ByteTeeReadIntoRequest>,
48    },
49}
50
51impl ReadIntoRequest {
52    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
53    pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
54        match self {
55            ReadIntoRequest::Read(promise) => {
56                // chunk steps, given chunk
57                // Resolve promise with «[ "value" → chunk, "done" → false ]».
58                promise.resolve_native(
59                    &ReadableStreamReadResult {
60                        done: Some(false),
61                        value: chunk,
62                    },
63                    can_gc,
64                );
65            },
66            ReadIntoRequest::ByteTee {
67                byte_tee_read_into_request,
68            } => {
69                byte_tee_read_into_request.enqueue_chunk_steps(
70                    HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
71                        RootedTraceableBox::from_box(Heap::boxed(chunk.get().to_object())),
72                    )),
73                )
74            },
75        }
76    }
77
78    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A0>
79    pub fn close_steps(&self, chunk: Option<RootedTraceableBox<Heap<JSVal>>>, can_gc: CanGc) {
80        match self {
81            ReadIntoRequest::Read(promise) => match chunk {
82                // close steps, given chunk
83                // Resolve promise with «[ "value" → chunk, "done" → true ]».
84                Some(chunk) => promise.resolve_native(
85                    &ReadableStreamReadResult {
86                        done: Some(true),
87                        value: chunk,
88                    },
89                    can_gc,
90                ),
91                None => {
92                    let result = RootedTraceableBox::new(Heap::default());
93                    result.set(UndefinedValue());
94                    promise.resolve_native(
95                        &ReadableStreamReadResult {
96                            done: Some(true),
97                            value: result,
98                        },
99                        can_gc,
100                    );
101                },
102            },
103            ReadIntoRequest::ByteTee {
104                byte_tee_read_into_request,
105            } => match chunk {
106                Some(chunk) => byte_tee_read_into_request
107                    .close_steps(
108                        Some(HeapBufferSource::<ArrayBufferViewU8>::new(
109                            BufferSource::ArrayBufferView(RootedTraceableBox::from_box(
110                                Heap::boxed(chunk.get().to_object()),
111                            )),
112                        )),
113                        can_gc,
114                    )
115                    .expect("close steps should not fail"),
116                None => byte_tee_read_into_request
117                    .close_steps(None, can_gc)
118                    .expect("close steps should not fail"),
119            },
120        }
121    }
122
123    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
124    pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
125        match self {
126            ReadIntoRequest::Read(promise) => {
127                // error steps, given e
128                // Reject promise with e.
129                promise.reject_native(&e, can_gc)
130            },
131            ReadIntoRequest::ByteTee {
132                byte_tee_read_into_request,
133            } => {
134                byte_tee_read_into_request.error_steps();
135            },
136        }
137    }
138}
139
140/// The rejection handler for
141/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
142#[derive(Clone, JSTraceable, MallocSizeOf)]
143#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
144struct ByteTeeClosedPromiseRejectionHandler {
145    branch_1_controller: Dom<ReadableByteStreamController>,
146    branch_2_controller: Dom<ReadableByteStreamController>,
147    #[ignore_malloc_size_of = "Rc"]
148    canceled_1: Rc<Cell<bool>>,
149    #[ignore_malloc_size_of = "Rc"]
150    canceled_2: Rc<Cell<bool>>,
151    #[ignore_malloc_size_of = "Rc"]
152    cancel_promise: Rc<Promise>,
153    #[ignore_malloc_size_of = "Rc"]
154    reader_version: Rc<Cell<u64>>,
155    expected_version: u64,
156}
157
158impl Callback for ByteTeeClosedPromiseRejectionHandler {
159    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
160    /// Upon rejection of `reader.closedPromise` with reason `r``,
161    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
162        let can_gc = CanGc::from_cx(cx);
163        // If thisReader is not reader, return.
164        if self.reader_version.get() != self.expected_version {
165            return;
166        }
167
168        // Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
169        self.branch_1_controller.error(v, can_gc);
170
171        // Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
172        self.branch_2_controller.error(v, can_gc);
173
174        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
175        if !self.canceled_1.get() || !self.canceled_2.get() {
176            self.cancel_promise.resolve_native(&(), can_gc);
177        }
178    }
179}
180
181/// <https://streams.spec.whatwg.org/#readablestreambyobreader>
182#[dom_struct]
183pub(crate) struct ReadableStreamBYOBReader {
184    reflector_: Reflector,
185
186    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
187    stream: MutNullableDom<ReadableStream>,
188
189    read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,
190
191    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
192    #[conditional_malloc_size_of]
193    closed_promise: DomRefCell<Rc<Promise>>,
194}
195
196impl ReadableStreamBYOBReader {
197    fn new_with_proto(
198        global: &GlobalScope,
199        proto: Option<SafeHandleObject>,
200        can_gc: CanGc,
201    ) -> DomRoot<ReadableStreamBYOBReader> {
202        reflect_dom_object_with_proto(
203            Box::new(ReadableStreamBYOBReader::new_inherited(global, can_gc)),
204            global,
205            proto,
206            can_gc,
207        )
208    }
209
210    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamBYOBReader {
211        ReadableStreamBYOBReader {
212            reflector_: Reflector::new(),
213            stream: MutNullableDom::new(None),
214            read_into_requests: DomRefCell::new(Default::default()),
215            closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
216        }
217    }
218
219    pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
220        reflect_dom_object(
221            Box::new(Self::new_inherited(global, can_gc)),
222            global,
223            can_gc,
224        )
225    }
226
227    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-byob-reader>
228    pub(crate) fn set_up(
229        &self,
230        stream: &ReadableStream,
231        global: &GlobalScope,
232        can_gc: CanGc,
233    ) -> Fallible<()> {
234        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
235        if stream.is_locked() {
236            return Err(Error::Type("stream is locked".to_owned()));
237        }
238
239        // If stream.[[controller]] does not implement ReadableByteStreamController, throw a TypeError exception.
240        if !stream.has_byte_controller() {
241            return Err(Error::Type(
242                "stream controller is not a byte stream controller".to_owned(),
243            ));
244        }
245
246        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
247        self.generic_initialize(global, stream, can_gc);
248
249        // Set reader.[[readIntoRequests]] to a new empty list.
250        self.read_into_requests.borrow_mut().clear();
251
252        Ok(())
253    }
254
255    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreaderrelease>
256    pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
257        // Perform ! ReadableStreamReaderGenericRelease(reader).
258        self.generic_release(can_gc)
259            .expect("Generic release failed");
260        // Let e be a new TypeError exception.
261        let cx = GlobalScope::get_cx();
262        rooted!(in(*cx) let mut error = UndefinedValue());
263        Error::Type("Reader is released".to_owned()).to_jsval(
264            cx,
265            &self.global(),
266            error.handle_mut(),
267            can_gc,
268        );
269
270        // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
271        self.error_read_into_requests(error.handle(), can_gc);
272        Ok(())
273    }
274
275    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreadererrorreadintorequests>
276    pub(crate) fn error_read_into_requests(&self, e: SafeHandleValue, can_gc: CanGc) {
277        // Reject reader.[[closedPromise]] with e.
278        self.closed_promise.borrow().reject_native(&e, can_gc);
279
280        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
281        self.closed_promise.borrow().set_promise_is_handled();
282
283        // Let readRequests be reader.[[readRequests]].
284        let mut read_into_requests = self.take_read_into_requests();
285
286        // Set reader.[[readIntoRequests]] to a new empty list.
287        for request in read_into_requests.drain(0..) {
288            // Perform readIntoRequest’s error steps, given e.
289            request.error_steps(e, can_gc);
290        }
291    }
292
293    fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
294        mem::take(&mut *self.read_into_requests.borrow_mut())
295    }
296
297    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
298    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
299        self.read_into_requests
300            .borrow_mut()
301            .push_back(read_request.clone());
302    }
303
304    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
305    pub(crate) fn cancel(&self, can_gc: CanGc) {
306        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
307        // Let readIntoRequests be reader.[[readIntoRequests]].
308        let mut read_into_requests = self.take_read_into_requests();
309        // Set reader.[[readIntoRequests]] to an empty list.
310        // Perform readIntoRequest’s close steps, given undefined.
311        for request in read_into_requests.drain(0..) {
312            // Perform readIntoRequest’s close steps, given undefined.
313            request.close_steps(None, can_gc);
314        }
315    }
316
317    pub(crate) fn close(&self, can_gc: CanGc) {
318        // Resolve reader.[[closedPromise]] with undefined.
319        self.closed_promise.borrow().resolve_native(&(), can_gc);
320    }
321
322    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
323    pub(crate) fn read(
324        &self,
325        cx: SafeJSContext,
326        view: HeapBufferSource<ArrayBufferViewU8>,
327        min: u64,
328        read_into_request: &ReadIntoRequest,
329        can_gc: CanGc,
330    ) {
331        // Let stream be reader.[[stream]].
332
333        // Assert: stream is not undefined.
334        assert!(self.stream.get().is_some());
335
336        let stream = self.stream.get().unwrap();
337
338        // Set stream.[[disturbed]] to true.
339        stream.set_is_disturbed(true);
340        // If stream.[[state]] is "errored", perform readIntoRequest’s error steps given stream.[[storedError]].
341        if stream.is_errored() {
342            let cx = GlobalScope::get_cx();
343            rooted!(in(*cx) let mut error = UndefinedValue());
344            stream.get_stored_error(error.handle_mut());
345
346            read_into_request.error_steps(error.handle(), can_gc);
347        } else {
348            // Otherwise,
349            // perform ! ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest).
350            stream.perform_pull_into(cx, read_into_request, view, min, can_gc);
351        }
352    }
353
354    pub(crate) fn get_num_read_into_requests(&self) -> usize {
355        self.read_into_requests.borrow().len()
356    }
357
358    pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
359        self.read_into_requests
360            .borrow_mut()
361            .pop_front()
362            .expect("read into requests is empty")
363    }
364
365    #[allow(clippy::too_many_arguments)]
366    pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
367        &self,
368        branch_1: &ReadableStream,
369        branch_2: &ReadableStream,
370        canceled_1: Rc<Cell<bool>>,
371        canceled_2: Rc<Cell<bool>>,
372        cancel_promise: Rc<Promise>,
373        reader_version: Rc<Cell<u64>>,
374        expected_version: u64,
375        can_gc: CanGc,
376    ) {
377        let branch_1_controller = branch_1.get_byte_controller();
378
379        let branch_2_controller = branch_2.get_byte_controller();
380
381        let global = self.global();
382        let handler = PromiseNativeHandler::new(
383            &global,
384            None,
385            Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
386                branch_1_controller: Dom::from_ref(&branch_1_controller),
387                branch_2_controller: Dom::from_ref(&branch_2_controller),
388                canceled_1,
389                canceled_2,
390                cancel_promise,
391                reader_version,
392                expected_version,
393            })),
394            can_gc,
395        );
396
397        let realm = enter_realm(&*global);
398        let comp = InRealm::Entered(&realm);
399
400        self.closed_promise
401            .borrow()
402            .append_native_handler(&handler, comp, can_gc);
403    }
404}
405
406impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
407    /// <https://streams.spec.whatwg.org/#byob-reader-constructor>
408    fn Constructor(
409        global: &GlobalScope,
410        proto: Option<SafeHandleObject>,
411        can_gc: CanGc,
412        stream: &ReadableStream,
413    ) -> Fallible<DomRoot<Self>> {
414        let reader = Self::new_with_proto(global, proto, can_gc);
415
416        // Perform ? SetUpReadableStreamBYOBReader(this, stream).
417        Self::set_up(&reader, stream, global, can_gc)?;
418
419        Ok(reader)
420    }
421
422    /// <https://streams.spec.whatwg.org/#byob-reader-read>
423    fn Read(
424        &self,
425        view: CustomAutoRooterGuard<ArrayBufferView>,
426        options: &ReadableStreamBYOBReaderReadOptions,
427        can_gc: CanGc,
428    ) -> Rc<Promise> {
429        let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
430        let min = options.min;
431        // Let promise be a new promise.
432        let promise = Promise::new(&self.global(), can_gc);
433
434        let cx = GlobalScope::get_cx();
435        // If view.[[ByteLength]] is 0, return a promise rejected with a TypeError exception.
436        if view.byte_length() == 0 {
437            promise.reject_error(Error::Type("view byte length is 0".to_owned()), can_gc);
438            return promise;
439        }
440        // If view.[[ViewedArrayBuffer]].[[ArrayBufferByteLength]] is 0,
441        // return a promise rejected with a TypeError exception.
442        if view.viewed_buffer_array_byte_length(cx) == 0 {
443            promise.reject_error(
444                Error::Type("viewed buffer byte length is 0".to_owned()),
445                can_gc,
446            );
447            return promise;
448        }
449
450        // If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true,
451        // return a promise rejected with a TypeError exception.
452        if view.is_detached_buffer(cx) {
453            promise.reject_error(Error::Type("view is detached".to_owned()), can_gc);
454            return promise;
455        }
456
457        // If options["min"] is 0, return a promise rejected with a TypeError exception.
458        if min == 0 {
459            promise.reject_error(Error::Type("min is 0".to_owned()), can_gc);
460            return promise;
461        }
462
463        // If view has a [[TypedArrayName]] internal slot,
464        if view.has_typed_array_name() {
465            // If options["min"] > view.[[ArrayLength]], return a promise rejected with a RangeError exception.
466            if min > (view.get_typed_array_length() as u64) {
467                promise.reject_error(
468                    Error::Range("min is greater than array length".to_owned()),
469                    can_gc,
470                );
471                return promise;
472            }
473        } else {
474            // Otherwise (i.e., it is a DataView),
475            // If options["min"] > view.[[ByteLength]], return a promise rejected with a RangeError exception.
476            if min > (view.byte_length() as u64) {
477                promise.reject_error(
478                    Error::Range("min is greater than byte length".to_owned()),
479                    can_gc,
480                );
481                return promise;
482            }
483        }
484
485        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
486        if self.stream.get().is_none() {
487            promise.reject_error(
488                Error::Type("min is greater than byte length".to_owned()),
489                can_gc,
490            );
491            return promise;
492        }
493
494        // Let readIntoRequest be a new read-into request with the following items:
495        //
496        // chunk steps, given chunk
497        // Resolve promise with «[ "value" → chunk, "done" → false ]».
498        //
499        // close steps, given chunk
500        // Resolve promise with «[ "value" → chunk, "done" → true ]».
501        //
502        // error steps, given e
503        // Reject promise with e
504        let read_into_request = ReadIntoRequest::Read(promise.clone());
505
506        // Perform ! ReadableStreamBYOBReaderRead(this, view, options["min"], readIntoRequest).
507        self.read(cx, view, min, &read_into_request, can_gc);
508
509        // Return promise.
510        promise
511    }
512
513    /// <https://streams.spec.whatwg.org/#byob-reader-release-lock>
514    fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
515        if self.stream.get().is_none() {
516            // If this.[[stream]] is undefined, return.
517            return Ok(());
518        }
519
520        // Perform !ReadableStreamBYOBReaderRelease(this).
521        self.release(can_gc)
522    }
523
524    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
525    fn Closed(&self) -> Rc<Promise> {
526        self.closed()
527    }
528
529    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
530    fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
531        self.generic_cancel(cx, &self.global(), reason, can_gc)
532    }
533}
534
535impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
536    fn get_closed_promise(&self) -> Rc<Promise> {
537        self.closed_promise.borrow().clone()
538    }
539
540    fn set_closed_promise(&self, promise: Rc<Promise>) {
541        *self.closed_promise.borrow_mut() = promise;
542    }
543
544    fn set_stream(&self, stream: Option<&ReadableStream>) {
545        self.stream.set(stream);
546    }
547
548    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
549        self.stream.get()
550    }
551
552    fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
553        Some(self)
554    }
555}