Skip to main content

script/dom/stream/
readablestreambyobreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::gc::CustomAutoRooterGuard;
13use js::jsapi::Heap;
14use js::jsval::{JSVal, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
17use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
18use script_bindings::cell::DomRefCell;
19use script_bindings::reflector::{Reflector, reflect_dom_object, reflect_dom_object_with_proto};
20use script_bindings::root::Dom;
21
22use super::byteteereadintorequest::ByteTeeReadIntoRequest;
23use super::readablebytestreamcontroller::ReadableByteStreamController;
24use super::readablestreamgenericreader::ReadableStreamGenericReader;
25use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
26use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::{
27    ReadableStreamBYOBReaderMethods, ReadableStreamBYOBReaderReadOptions,
28};
29use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
30use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
31use crate::dom::bindings::reflector::DomGlobal;
32use crate::dom::bindings::root::{DomRoot, MutNullableDom};
33use crate::dom::bindings::trace::RootedTraceableBox;
34use crate::dom::globalscope::GlobalScope;
35use crate::dom::promise::Promise;
36use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
37use crate::dom::stream::readablestream::ReadableStream;
38use crate::realms::enter_auto_realm;
39use crate::script_runtime::CanGc;
40
/// <https://streams.spec.whatwg.org/#read-into-request>
///
/// A pending BYOB read. Each variant provides its own behaviour for the
/// chunk, close and error steps implemented on this type.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub enum ReadIntoRequest {
    /// <https://streams.spec.whatwg.org/#byob-reader-read>
    ///
    /// The steps settle the wrapped promise.
    Read(#[conditional_malloc_size_of] Rc<Promise>),
    /// The steps are forwarded to the wrapped `ByteTeeReadIntoRequest`.
    ByteTee {
        byte_tee_read_into_request: Dom<ByteTeeReadIntoRequest>,
    },
}
50
51impl ReadIntoRequest {
52    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
53    pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
54        match self {
55            ReadIntoRequest::Read(promise) => {
56                // chunk steps, given chunk
57                // Resolve promise with «[ "value" → chunk, "done" → false ]».
58                promise.resolve_native(
59                    &ReadableStreamReadResult {
60                        done: Some(false),
61                        value: chunk,
62                    },
63                    can_gc,
64                );
65            },
66            ReadIntoRequest::ByteTee {
67                byte_tee_read_into_request,
68            } => byte_tee_read_into_request.enqueue_chunk_steps(RootedTraceableBox::new(
69                HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
70                    Heap::boxed(chunk.get().to_object()),
71                )),
72            )),
73        }
74    }
75
76    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A0>
77    pub fn close_steps(&self, cx: &mut JSContext, chunk: Option<RootedTraceableBox<Heap<JSVal>>>) {
78        match self {
79            ReadIntoRequest::Read(promise) => match chunk {
80                // close steps, given chunk
81                // Resolve promise with «[ "value" → chunk, "done" → true ]».
82                Some(chunk) => promise.resolve_native(
83                    &ReadableStreamReadResult {
84                        done: Some(true),
85                        value: chunk,
86                    },
87                    CanGc::from_cx(cx),
88                ),
89                None => {
90                    let result = RootedTraceableBox::new(Heap::default());
91                    result.set(UndefinedValue());
92                    promise.resolve_native(
93                        &ReadableStreamReadResult {
94                            done: Some(true),
95                            value: result,
96                        },
97                        CanGc::from_cx(cx),
98                    );
99                },
100            },
101            ReadIntoRequest::ByteTee {
102                byte_tee_read_into_request,
103            } => match chunk {
104                Some(chunk) => byte_tee_read_into_request
105                    .close_steps(
106                        cx,
107                        Some(RootedTraceableBox::new(
108                            HeapBufferSource::<ArrayBufferViewU8>::new(
109                                BufferSource::ArrayBufferView(Heap::boxed(chunk.get().to_object())),
110                            ),
111                        )),
112                    )
113                    .expect("close steps should not fail"),
114                None => byte_tee_read_into_request
115                    .close_steps(cx, None)
116                    .expect("close steps should not fail"),
117            },
118        }
119    }
120
121    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
122    pub(crate) fn error_steps(&self, cx: &mut JSContext, e: SafeHandleValue) {
123        match self {
124            ReadIntoRequest::Read(promise) => {
125                // error steps, given e
126                // Reject promise with e.
127                promise.reject_native_with_cx(cx, &e)
128            },
129            ReadIntoRequest::ByteTee {
130                byte_tee_read_into_request,
131            } => {
132                byte_tee_read_into_request.error_steps();
133            },
134        }
135    }
136}
137
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
///
/// Holds the state its `Callback` implementation needs when the reader's
/// closed promise is rejected.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ByteTeeClosedPromiseRejectionHandler {
    /// Controller of the first tee branch; errored by the callback.
    branch_1_controller: Dom<ReadableByteStreamController>,
    /// Controller of the second tee branch; errored by the callback.
    branch_2_controller: Dom<ReadableByteStreamController>,
    /// The tee's `canceled1` flag.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// The tee's `canceled2` flag.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// The tee's `cancelPromise`; resolved by the callback unless both
    /// branches are already canceled.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
    /// Shared counter compared against `expected_version` to decide whether
    /// the callback still applies ("If thisReader is not reader, return").
    #[conditional_malloc_size_of]
    reader_version: Rc<Cell<u64>>,
    /// The `reader_version` value for which this handler was created.
    expected_version: u64,
}
155
156impl Callback for ByteTeeClosedPromiseRejectionHandler {
157    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
158    /// Upon rejection of `reader.closedPromise` with reason `r``,
159    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
160        // If thisReader is not reader, return.
161        if self.reader_version.get() != self.expected_version {
162            return;
163        }
164
165        // Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
166        self.branch_1_controller.error(cx, v);
167
168        // Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
169        self.branch_2_controller.error(cx, v);
170
171        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
172        if !self.canceled_1.get() || !self.canceled_2.get() {
173            self.cancel_promise.resolve_native_with_cx(cx, &());
174        }
175    }
176}
177
/// <https://streams.spec.whatwg.org/#readablestreambyobreader>
#[dom_struct]
pub(crate) struct ReadableStreamBYOBReader {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
    stream: MutNullableDom<ReadableStream>,

    /// <https://streams.spec.whatwg.org/#readablestreambyobreader-readintorequests>
    ///
    /// Pending read-into requests, oldest at the front.
    read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,

    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
    #[conditional_malloc_size_of]
    closed_promise: DomRefCell<Rc<Promise>>,
}
192
193impl ReadableStreamBYOBReader {
194    fn new_with_proto(
195        global: &GlobalScope,
196        proto: Option<SafeHandleObject>,
197        can_gc: CanGc,
198    ) -> DomRoot<ReadableStreamBYOBReader> {
199        reflect_dom_object_with_proto(
200            Box::new(ReadableStreamBYOBReader::new_inherited(global, can_gc)),
201            global,
202            proto,
203            can_gc,
204        )
205    }
206
207    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamBYOBReader {
208        ReadableStreamBYOBReader {
209            reflector_: Reflector::new(),
210            stream: MutNullableDom::new(None),
211            read_into_requests: DomRefCell::new(Default::default()),
212            closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
213        }
214    }
215
216    pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
217        reflect_dom_object(
218            Box::new(Self::new_inherited(global, can_gc)),
219            global,
220            can_gc,
221        )
222    }
223
224    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-byob-reader>
225    pub(crate) fn set_up(
226        &self,
227        stream: &ReadableStream,
228        global: &GlobalScope,
229        can_gc: CanGc,
230    ) -> Fallible<()> {
231        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
232        if stream.is_locked() {
233            return Err(Error::Type(c"stream is locked".to_owned()));
234        }
235
236        // If stream.[[controller]] does not implement ReadableByteStreamController, throw a TypeError exception.
237        if !stream.has_byte_controller() {
238            return Err(Error::Type(
239                c"stream controller is not a byte stream controller".to_owned(),
240            ));
241        }
242
243        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
244        self.generic_initialize(global, stream, can_gc);
245
246        // Set reader.[[readIntoRequests]] to a new empty list.
247        self.read_into_requests.borrow_mut().clear();
248
249        Ok(())
250    }
251
252    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreaderrelease>
253    pub(crate) fn release(&self, cx: &mut JSContext) -> Fallible<()> {
254        // Perform ! ReadableStreamReaderGenericRelease(reader).
255        self.generic_release(cx).expect("Generic release failed");
256        // Let e be a new TypeError exception.
257        rooted!(&in(cx) let mut error = UndefinedValue());
258        Error::Type(c"Reader is released".to_owned()).to_jsval(
259            cx.into(),
260            &self.global(),
261            error.handle_mut(),
262            CanGc::from_cx(cx),
263        );
264
265        // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
266        self.error_read_into_requests(cx, error.handle());
267        Ok(())
268    }
269
270    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreadererrorreadintorequests>
271    pub(crate) fn error_read_into_requests(&self, cx: &mut JSContext, e: SafeHandleValue) {
272        // Reject reader.[[closedPromise]] with e.
273        self.closed_promise.borrow().reject_native_with_cx(cx, &e);
274
275        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
276        self.closed_promise.borrow().set_promise_is_handled();
277
278        // Let readRequests be reader.[[readRequests]].
279        let mut read_into_requests = self.take_read_into_requests();
280
281        // Set reader.[[readIntoRequests]] to a new empty list.
282        for request in read_into_requests.drain(0..) {
283            // Perform readIntoRequest’s error steps, given e.
284            request.error_steps(cx, e);
285        }
286    }
287
288    fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
289        mem::take(&mut *self.read_into_requests.borrow_mut())
290    }
291
292    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
293    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
294        self.read_into_requests
295            .borrow_mut()
296            .push_back(read_request.clone());
297    }
298
299    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
300    pub(crate) fn cancel(&self, cx: &mut JSContext) {
301        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
302        // Let readIntoRequests be reader.[[readIntoRequests]].
303        let mut read_into_requests = self.take_read_into_requests();
304        // Set reader.[[readIntoRequests]] to an empty list.
305        // Perform readIntoRequest’s close steps, given undefined.
306        for request in read_into_requests.drain(0..) {
307            // Perform readIntoRequest’s close steps, given undefined.
308            request.close_steps(cx, None);
309        }
310    }
311
312    pub(crate) fn close(&self, can_gc: CanGc) {
313        // Resolve reader.[[closedPromise]] with undefined.
314        self.closed_promise.borrow().resolve_native(&(), can_gc);
315    }
316
317    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
318    pub(crate) fn read(
319        &self,
320        cx: &mut JSContext,
321        view: &HeapBufferSource<ArrayBufferViewU8>,
322        min: u64,
323        read_into_request: &ReadIntoRequest,
324    ) {
325        // Let stream be reader.[[stream]].
326
327        // Assert: stream is not undefined.
328        assert!(self.stream.get().is_some());
329
330        let stream = self.stream.get().unwrap();
331
332        // Set stream.[[disturbed]] to true.
333        stream.set_is_disturbed(true);
334        // If stream.[[state]] is "errored", perform readIntoRequest’s error steps given stream.[[storedError]].
335        if stream.is_errored() {
336            rooted!(&in(cx) let mut error = UndefinedValue());
337            stream.get_stored_error(error.handle_mut());
338
339            read_into_request.error_steps(cx, error.handle());
340        } else {
341            // Otherwise,
342            // perform ! ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest).
343            stream.perform_pull_into(cx, read_into_request, view, min);
344        }
345    }
346
347    pub(crate) fn get_num_read_into_requests(&self) -> usize {
348        self.read_into_requests.borrow().len()
349    }
350
351    pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
352        self.read_into_requests
353            .borrow_mut()
354            .pop_front()
355            .expect("read into requests is empty")
356    }
357
358    #[allow(clippy::too_many_arguments)]
359    pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
360        &self,
361        cx: &mut JSContext,
362        branch_1: &ReadableStream,
363        branch_2: &ReadableStream,
364        canceled_1: Rc<Cell<bool>>,
365        canceled_2: Rc<Cell<bool>>,
366        cancel_promise: Rc<Promise>,
367        reader_version: Rc<Cell<u64>>,
368        expected_version: u64,
369    ) {
370        let branch_1_controller = branch_1.get_byte_controller();
371
372        let branch_2_controller = branch_2.get_byte_controller();
373
374        let global = self.global();
375        let handler = PromiseNativeHandler::new(
376            cx,
377            &global,
378            None,
379            Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
380                branch_1_controller: Dom::from_ref(&branch_1_controller),
381                branch_2_controller: Dom::from_ref(&branch_2_controller),
382                canceled_1,
383                canceled_2,
384                cancel_promise,
385                reader_version,
386                expected_version,
387            })),
388        );
389
390        let mut realm = enter_auto_realm(cx, &*global);
391        let cx = &mut realm.current_realm();
392
393        self.closed_promise
394            .borrow()
395            .append_native_handler(cx, &handler);
396    }
397}
398
399impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
400    /// <https://streams.spec.whatwg.org/#byob-reader-constructor>
401    fn Constructor(
402        global: &GlobalScope,
403        proto: Option<SafeHandleObject>,
404        can_gc: CanGc,
405        stream: &ReadableStream,
406    ) -> Fallible<DomRoot<Self>> {
407        let reader = Self::new_with_proto(global, proto, can_gc);
408
409        // Perform ? SetUpReadableStreamBYOBReader(this, stream).
410        Self::set_up(&reader, stream, global, can_gc)?;
411
412        Ok(reader)
413    }
414
415    /// <https://streams.spec.whatwg.org/#byob-reader-read>
416    fn Read(
417        &self,
418        cx: &mut JSContext,
419        view: CustomAutoRooterGuard<ArrayBufferView>,
420        options: &ReadableStreamBYOBReaderReadOptions,
421    ) -> Rc<Promise> {
422        let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
423        let min = options.min;
424        // Let promise be a new promise.
425        let promise = Promise::new2(cx, &self.global());
426
427        // If view.[[ByteLength]] is 0, return a promise rejected with a TypeError exception.
428        if view.byte_length() == 0 {
429            promise.reject_error_with_cx(cx, Error::Type(c"view byte length is 0".to_owned()));
430            return promise;
431        }
432        // If view.[[ViewedArrayBuffer]].[[ArrayBufferByteLength]] is 0,
433        // return a promise rejected with a TypeError exception.
434        if view.viewed_buffer_array_byte_length(cx.into()) == 0 {
435            promise.reject_error_with_cx(
436                cx,
437                Error::Type(c"viewed buffer byte length is 0".to_owned()),
438            );
439            return promise;
440        }
441
442        // If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true,
443        // return a promise rejected with a TypeError exception.
444        if view.is_detached_buffer(cx.into()) {
445            promise.reject_error_with_cx(cx, Error::Type(c"view is detached".to_owned()));
446            return promise;
447        }
448
449        // If options["min"] is 0, return a promise rejected with a TypeError exception.
450        if min == 0 {
451            promise.reject_error_with_cx(cx, Error::Type(c"min is 0".to_owned()));
452            return promise;
453        }
454
455        // If view has a [[TypedArrayName]] internal slot,
456        if view.has_typed_array_name() {
457            // If options["min"] > view.[[ArrayLength]], return a promise rejected with a RangeError exception.
458            if min > (view.get_typed_array_length() as u64) {
459                promise.reject_error_with_cx(
460                    cx,
461                    Error::Range(c"min is greater than array length".to_owned()),
462                );
463                return promise;
464            }
465        } else {
466            // Otherwise (i.e., it is a DataView),
467            // If options["min"] > view.[[ByteLength]], return a promise rejected with a RangeError exception.
468            if min > (view.byte_length() as u64) {
469                promise.reject_error_with_cx(
470                    cx,
471                    Error::Range(c"min is greater than byte length".to_owned()),
472                );
473                return promise;
474            }
475        }
476
477        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
478        if self.stream.get().is_none() {
479            promise.reject_error_with_cx(
480                cx,
481                Error::Type(c"min is greater than byte length".to_owned()),
482            );
483            return promise;
484        }
485
486        // Let readIntoRequest be a new read-into request with the following items:
487        //
488        // chunk steps, given chunk
489        // Resolve promise with «[ "value" → chunk, "done" → false ]».
490        //
491        // close steps, given chunk
492        // Resolve promise with «[ "value" → chunk, "done" → true ]».
493        //
494        // error steps, given e
495        // Reject promise with e
496        let read_into_request = ReadIntoRequest::Read(promise.clone());
497
498        // Perform ! ReadableStreamBYOBReaderRead(this, view, options["min"], readIntoRequest).
499        self.read(cx, &view, min, &read_into_request);
500
501        // Return promise.
502        promise
503    }
504
505    /// <https://streams.spec.whatwg.org/#byob-reader-release-lock>
506    fn ReleaseLock(&self, cx: &mut JSContext) -> Fallible<()> {
507        if self.stream.get().is_none() {
508            // If this.[[stream]] is undefined, return.
509            return Ok(());
510        }
511
512        // Perform !ReadableStreamBYOBReaderRelease(this).
513        self.release(cx)
514    }
515
516    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
517    fn Closed(&self) -> Rc<Promise> {
518        self.closed()
519    }
520
521    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
522    fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
523        self.generic_cancel(cx, &self.global(), reason)
524    }
525}
526
527impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
528    fn get_closed_promise(&self) -> Rc<Promise> {
529        self.closed_promise.borrow().clone()
530    }
531
532    fn set_closed_promise(&self, promise: Rc<Promise>) {
533        *self.closed_promise.borrow_mut() = promise;
534    }
535
536    fn set_stream(&self, stream: Option<&ReadableStream>) {
537        self.stream.set(stream);
538    }
539
540    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
541        self.stream.get()
542    }
543
544    fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
545        Some(self)
546    }
547}