Skip to main content

script/dom/stream/
readablestreambyobreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::gc::CustomAutoRooterGuard;
13use js::jsapi::Heap;
14use js::jsval::{JSVal, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
17use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
18use script_bindings::cell::DomRefCell;
19use script_bindings::reflector::{Reflector, reflect_dom_object, reflect_dom_object_with_proto};
20use script_bindings::root::Dom;
21
22use super::byteteereadintorequest::ByteTeeReadIntoRequest;
23use super::readablebytestreamcontroller::ReadableByteStreamController;
24use super::readablestreamgenericreader::ReadableStreamGenericReader;
25use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
26use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::{
27    ReadableStreamBYOBReaderMethods, ReadableStreamBYOBReaderReadOptions,
28};
29use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
30use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
31use crate::dom::bindings::reflector::DomGlobal;
32use crate::dom::bindings::root::{DomRoot, MutNullableDom};
33use crate::dom::bindings::trace::RootedTraceableBox;
34use crate::dom::globalscope::GlobalScope;
35use crate::dom::promise::Promise;
36use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
37use crate::dom::stream::readablestream::ReadableStream;
38use crate::realms::enter_auto_realm;
39use crate::script_runtime::CanGc;
40
41/// <https://streams.spec.whatwg.org/#read-into-request>
42#[derive(Clone, JSTraceable, MallocSizeOf)]
43pub enum ReadIntoRequest {
44    /// <https://streams.spec.whatwg.org/#byob-reader-read>
45    Read(#[conditional_malloc_size_of] Rc<Promise>),
46    ByteTee {
47        byte_tee_read_into_request: Dom<ByteTeeReadIntoRequest>,
48    },
49}
50
51impl ReadIntoRequest {
52    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
53    pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
54        match self {
55            ReadIntoRequest::Read(promise) => {
56                // chunk steps, given chunk
57                // Resolve promise with «[ "value" → chunk, "done" → false ]».
58                promise.resolve_native(
59                    &ReadableStreamReadResult {
60                        done: Some(false),
61                        value: chunk,
62                    },
63                    can_gc,
64                );
65            },
66            ReadIntoRequest::ByteTee {
67                byte_tee_read_into_request,
68            } => byte_tee_read_into_request.enqueue_chunk_steps(RootedTraceableBox::new(
69                HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
70                    Heap::boxed(chunk.get().to_object()),
71                )),
72            )),
73        }
74    }
75
76    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A0>
77    pub fn close_steps(&self, cx: &mut JSContext, chunk: Option<RootedTraceableBox<Heap<JSVal>>>) {
78        match self {
79            ReadIntoRequest::Read(promise) => match chunk {
80                // close steps, given chunk
81                // Resolve promise with «[ "value" → chunk, "done" → true ]».
82                Some(chunk) => promise.resolve_native(
83                    &ReadableStreamReadResult {
84                        done: Some(true),
85                        value: chunk,
86                    },
87                    CanGc::from_cx(cx),
88                ),
89                None => {
90                    let result = RootedTraceableBox::new(Heap::default());
91                    result.set(UndefinedValue());
92                    promise.resolve_native(
93                        &ReadableStreamReadResult {
94                            done: Some(true),
95                            value: result,
96                        },
97                        CanGc::from_cx(cx),
98                    );
99                },
100            },
101            ReadIntoRequest::ByteTee {
102                byte_tee_read_into_request,
103            } => match chunk {
104                Some(chunk) => byte_tee_read_into_request
105                    .close_steps(
106                        cx,
107                        Some(RootedTraceableBox::new(
108                            HeapBufferSource::<ArrayBufferViewU8>::new(
109                                BufferSource::ArrayBufferView(Heap::boxed(chunk.get().to_object())),
110                            ),
111                        )),
112                    )
113                    .expect("close steps should not fail"),
114                None => byte_tee_read_into_request
115                    .close_steps(cx, None)
116                    .expect("close steps should not fail"),
117            },
118        }
119    }
120
121    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
122    pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
123        match self {
124            ReadIntoRequest::Read(promise) => {
125                // error steps, given e
126                // Reject promise with e.
127                promise.reject_native(&e, can_gc)
128            },
129            ReadIntoRequest::ByteTee {
130                byte_tee_read_into_request,
131            } => {
132                byte_tee_read_into_request.error_steps();
133            },
134        }
135    }
136}
137
138/// The rejection handler for
139/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
140#[derive(Clone, JSTraceable, MallocSizeOf)]
141#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
142struct ByteTeeClosedPromiseRejectionHandler {
143    branch_1_controller: Dom<ReadableByteStreamController>,
144    branch_2_controller: Dom<ReadableByteStreamController>,
145    #[conditional_malloc_size_of]
146    canceled_1: Rc<Cell<bool>>,
147    #[conditional_malloc_size_of]
148    canceled_2: Rc<Cell<bool>>,
149    #[conditional_malloc_size_of]
150    cancel_promise: Rc<Promise>,
151    #[conditional_malloc_size_of]
152    reader_version: Rc<Cell<u64>>,
153    expected_version: u64,
154}
155
156impl Callback for ByteTeeClosedPromiseRejectionHandler {
157    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
158    /// Upon rejection of `reader.closedPromise` with reason `r``,
159    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
160        // If thisReader is not reader, return.
161        if self.reader_version.get() != self.expected_version {
162            return;
163        }
164
165        // Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
166        self.branch_1_controller.error(cx, v);
167
168        // Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
169        self.branch_2_controller.error(cx, v);
170
171        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
172        if !self.canceled_1.get() || !self.canceled_2.get() {
173            self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
174        }
175    }
176}
177
178/// <https://streams.spec.whatwg.org/#readablestreambyobreader>
179#[dom_struct]
180pub(crate) struct ReadableStreamBYOBReader {
181    reflector_: Reflector,
182
183    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
184    stream: MutNullableDom<ReadableStream>,
185
186    read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,
187
188    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
189    #[conditional_malloc_size_of]
190    closed_promise: DomRefCell<Rc<Promise>>,
191}
192
193impl ReadableStreamBYOBReader {
194    fn new_with_proto(
195        global: &GlobalScope,
196        proto: Option<SafeHandleObject>,
197        can_gc: CanGc,
198    ) -> DomRoot<ReadableStreamBYOBReader> {
199        reflect_dom_object_with_proto(
200            Box::new(ReadableStreamBYOBReader::new_inherited(global, can_gc)),
201            global,
202            proto,
203            can_gc,
204        )
205    }
206
207    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamBYOBReader {
208        ReadableStreamBYOBReader {
209            reflector_: Reflector::new(),
210            stream: MutNullableDom::new(None),
211            read_into_requests: DomRefCell::new(Default::default()),
212            closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
213        }
214    }
215
216    pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
217        reflect_dom_object(
218            Box::new(Self::new_inherited(global, can_gc)),
219            global,
220            can_gc,
221        )
222    }
223
224    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-byob-reader>
225    pub(crate) fn set_up(
226        &self,
227        stream: &ReadableStream,
228        global: &GlobalScope,
229        can_gc: CanGc,
230    ) -> Fallible<()> {
231        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
232        if stream.is_locked() {
233            return Err(Error::Type(c"stream is locked".to_owned()));
234        }
235
236        // If stream.[[controller]] does not implement ReadableByteStreamController, throw a TypeError exception.
237        if !stream.has_byte_controller() {
238            return Err(Error::Type(
239                c"stream controller is not a byte stream controller".to_owned(),
240            ));
241        }
242
243        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
244        self.generic_initialize(global, stream, can_gc);
245
246        // Set reader.[[readIntoRequests]] to a new empty list.
247        self.read_into_requests.borrow_mut().clear();
248
249        Ok(())
250    }
251
252    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreaderrelease>
253    pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
254        // Perform ! ReadableStreamReaderGenericRelease(reader).
255        self.generic_release(can_gc)
256            .expect("Generic release failed");
257        // Let e be a new TypeError exception.
258        let cx = GlobalScope::get_cx();
259        rooted!(in(*cx) let mut error = UndefinedValue());
260        Error::Type(c"Reader is released".to_owned()).to_jsval(
261            cx,
262            &self.global(),
263            error.handle_mut(),
264            can_gc,
265        );
266
267        // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
268        self.error_read_into_requests(error.handle(), can_gc);
269        Ok(())
270    }
271
272    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreadererrorreadintorequests>
273    pub(crate) fn error_read_into_requests(&self, e: SafeHandleValue, can_gc: CanGc) {
274        // Reject reader.[[closedPromise]] with e.
275        self.closed_promise.borrow().reject_native(&e, can_gc);
276
277        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
278        self.closed_promise.borrow().set_promise_is_handled();
279
280        // Let readRequests be reader.[[readRequests]].
281        let mut read_into_requests = self.take_read_into_requests();
282
283        // Set reader.[[readIntoRequests]] to a new empty list.
284        for request in read_into_requests.drain(0..) {
285            // Perform readIntoRequest’s error steps, given e.
286            request.error_steps(e, can_gc);
287        }
288    }
289
290    fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
291        mem::take(&mut *self.read_into_requests.borrow_mut())
292    }
293
294    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
295    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
296        self.read_into_requests
297            .borrow_mut()
298            .push_back(read_request.clone());
299    }
300
301    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
302    pub(crate) fn cancel(&self, cx: &mut JSContext) {
303        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
304        // Let readIntoRequests be reader.[[readIntoRequests]].
305        let mut read_into_requests = self.take_read_into_requests();
306        // Set reader.[[readIntoRequests]] to an empty list.
307        // Perform readIntoRequest’s close steps, given undefined.
308        for request in read_into_requests.drain(0..) {
309            // Perform readIntoRequest’s close steps, given undefined.
310            request.close_steps(cx, None);
311        }
312    }
313
314    pub(crate) fn close(&self, can_gc: CanGc) {
315        // Resolve reader.[[closedPromise]] with undefined.
316        self.closed_promise.borrow().resolve_native(&(), can_gc);
317    }
318
319    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
320    pub(crate) fn read(
321        &self,
322        cx: &mut JSContext,
323        view: &HeapBufferSource<ArrayBufferViewU8>,
324        min: u64,
325        read_into_request: &ReadIntoRequest,
326    ) {
327        // Let stream be reader.[[stream]].
328
329        // Assert: stream is not undefined.
330        assert!(self.stream.get().is_some());
331
332        let stream = self.stream.get().unwrap();
333
334        // Set stream.[[disturbed]] to true.
335        stream.set_is_disturbed(true);
336        // If stream.[[state]] is "errored", perform readIntoRequest’s error steps given stream.[[storedError]].
337        if stream.is_errored() {
338            rooted!(&in(cx) let mut error = UndefinedValue());
339            stream.get_stored_error(error.handle_mut());
340
341            read_into_request.error_steps(error.handle(), CanGc::from_cx(cx));
342        } else {
343            // Otherwise,
344            // perform ! ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest).
345            stream.perform_pull_into(cx, read_into_request, view, min);
346        }
347    }
348
349    pub(crate) fn get_num_read_into_requests(&self) -> usize {
350        self.read_into_requests.borrow().len()
351    }
352
353    pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
354        self.read_into_requests
355            .borrow_mut()
356            .pop_front()
357            .expect("read into requests is empty")
358    }
359
360    #[allow(clippy::too_many_arguments)]
361    pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
362        &self,
363        cx: &mut JSContext,
364        branch_1: &ReadableStream,
365        branch_2: &ReadableStream,
366        canceled_1: Rc<Cell<bool>>,
367        canceled_2: Rc<Cell<bool>>,
368        cancel_promise: Rc<Promise>,
369        reader_version: Rc<Cell<u64>>,
370        expected_version: u64,
371    ) {
372        let branch_1_controller = branch_1.get_byte_controller();
373
374        let branch_2_controller = branch_2.get_byte_controller();
375
376        let global = self.global();
377        let handler = PromiseNativeHandler::new(
378            &global,
379            None,
380            Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
381                branch_1_controller: Dom::from_ref(&branch_1_controller),
382                branch_2_controller: Dom::from_ref(&branch_2_controller),
383                canceled_1,
384                canceled_2,
385                cancel_promise,
386                reader_version,
387                expected_version,
388            })),
389            CanGc::from_cx(cx),
390        );
391
392        let mut realm = enter_auto_realm(cx, &*global);
393        let cx = &mut realm.current_realm();
394
395        self.closed_promise
396            .borrow()
397            .append_native_handler(cx, &handler);
398    }
399}
400
401impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
402    /// <https://streams.spec.whatwg.org/#byob-reader-constructor>
403    fn Constructor(
404        global: &GlobalScope,
405        proto: Option<SafeHandleObject>,
406        can_gc: CanGc,
407        stream: &ReadableStream,
408    ) -> Fallible<DomRoot<Self>> {
409        let reader = Self::new_with_proto(global, proto, can_gc);
410
411        // Perform ? SetUpReadableStreamBYOBReader(this, stream).
412        Self::set_up(&reader, stream, global, can_gc)?;
413
414        Ok(reader)
415    }
416
417    /// <https://streams.spec.whatwg.org/#byob-reader-read>
418    fn Read(
419        &self,
420        cx: &mut JSContext,
421        view: CustomAutoRooterGuard<ArrayBufferView>,
422        options: &ReadableStreamBYOBReaderReadOptions,
423    ) -> Rc<Promise> {
424        let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
425        let min = options.min;
426        // Let promise be a new promise.
427        let promise = Promise::new2(cx, &self.global());
428
429        // If view.[[ByteLength]] is 0, return a promise rejected with a TypeError exception.
430        if view.byte_length() == 0 {
431            promise.reject_error(
432                Error::Type(c"view byte length is 0".to_owned()),
433                CanGc::from_cx(cx),
434            );
435            return promise;
436        }
437        // If view.[[ViewedArrayBuffer]].[[ArrayBufferByteLength]] is 0,
438        // return a promise rejected with a TypeError exception.
439        if view.viewed_buffer_array_byte_length(cx.into()) == 0 {
440            promise.reject_error(
441                Error::Type(c"viewed buffer byte length is 0".to_owned()),
442                CanGc::from_cx(cx),
443            );
444            return promise;
445        }
446
447        // If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true,
448        // return a promise rejected with a TypeError exception.
449        if view.is_detached_buffer(cx.into()) {
450            promise.reject_error(
451                Error::Type(c"view is detached".to_owned()),
452                CanGc::from_cx(cx),
453            );
454            return promise;
455        }
456
457        // If options["min"] is 0, return a promise rejected with a TypeError exception.
458        if min == 0 {
459            promise.reject_error(Error::Type(c"min is 0".to_owned()), CanGc::from_cx(cx));
460            return promise;
461        }
462
463        // If view has a [[TypedArrayName]] internal slot,
464        if view.has_typed_array_name() {
465            // If options["min"] > view.[[ArrayLength]], return a promise rejected with a RangeError exception.
466            if min > (view.get_typed_array_length() as u64) {
467                promise.reject_error(
468                    Error::Range(c"min is greater than array length".to_owned()),
469                    CanGc::from_cx(cx),
470                );
471                return promise;
472            }
473        } else {
474            // Otherwise (i.e., it is a DataView),
475            // If options["min"] > view.[[ByteLength]], return a promise rejected with a RangeError exception.
476            if min > (view.byte_length() as u64) {
477                promise.reject_error(
478                    Error::Range(c"min is greater than byte length".to_owned()),
479                    CanGc::from_cx(cx),
480                );
481                return promise;
482            }
483        }
484
485        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
486        if self.stream.get().is_none() {
487            promise.reject_error(
488                Error::Type(c"min is greater than byte length".to_owned()),
489                CanGc::from_cx(cx),
490            );
491            return promise;
492        }
493
494        // Let readIntoRequest be a new read-into request with the following items:
495        //
496        // chunk steps, given chunk
497        // Resolve promise with «[ "value" → chunk, "done" → false ]».
498        //
499        // close steps, given chunk
500        // Resolve promise with «[ "value" → chunk, "done" → true ]».
501        //
502        // error steps, given e
503        // Reject promise with e
504        let read_into_request = ReadIntoRequest::Read(promise.clone());
505
506        // Perform ! ReadableStreamBYOBReaderRead(this, view, options["min"], readIntoRequest).
507        self.read(cx, &view, min, &read_into_request);
508
509        // Return promise.
510        promise
511    }
512
513    /// <https://streams.spec.whatwg.org/#byob-reader-release-lock>
514    fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
515        if self.stream.get().is_none() {
516            // If this.[[stream]] is undefined, return.
517            return Ok(());
518        }
519
520        // Perform !ReadableStreamBYOBReaderRelease(this).
521        self.release(can_gc)
522    }
523
524    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
525    fn Closed(&self) -> Rc<Promise> {
526        self.closed()
527    }
528
529    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
530    fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
531        self.generic_cancel(cx, &self.global(), reason)
532    }
533}
534
535impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
536    fn get_closed_promise(&self) -> Rc<Promise> {
537        self.closed_promise.borrow().clone()
538    }
539
540    fn set_closed_promise(&self, promise: Rc<Promise>) {
541        *self.closed_promise.borrow_mut() = promise;
542    }
543
544    fn set_stream(&self, stream: Option<&ReadableStream>) {
545        self.stream.set(stream);
546    }
547
548    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
549        self.stream.get()
550    }
551
552    fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
553        Some(self)
554    }
555}