script/dom/
readablestreambyobreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::gc::CustomAutoRooterGuard;
12use js::jsapi::Heap;
13use js::jsval::{JSVal, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
16use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
17use script_bindings::root::Dom;
18
19use super::bindings::buffer_source::{BufferSource, HeapBufferSource};
20use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
21use super::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
22use super::bindings::reflector::reflect_dom_object;
23use super::byteteereadintorequest::ByteTeeReadIntoRequest;
24use super::promisenativehandler::{Callback, PromiseNativeHandler};
25use super::readablebytestreamcontroller::ReadableByteStreamController;
26use super::readablestreamgenericreader::ReadableStreamGenericReader;
27use crate::dom::bindings::cell::DomRefCell;
28use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderMethods;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::{DomRoot, MutNullableDom};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::globalscope::GlobalScope;
34use crate::dom::promise::Promise;
35use crate::dom::readablestream::ReadableStream;
36use crate::realms::{InRealm, enter_realm};
37use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
38
39/// <https://streams.spec.whatwg.org/#read-into-request>
40#[derive(Clone, JSTraceable, MallocSizeOf)]
41pub enum ReadIntoRequest {
42    /// <https://streams.spec.whatwg.org/#byob-reader-read>
43    Read(#[conditional_malloc_size_of] Rc<Promise>),
44    ByteTee {
45        byte_tee_read_into_request: Dom<ByteTeeReadIntoRequest>,
46    },
47}
48
49impl ReadIntoRequest {
50    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
51    pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
52        match self {
53            ReadIntoRequest::Read(promise) => {
54                // chunk steps, given chunk
55                // Resolve promise with «[ "value" → chunk, "done" → false ]».
56                promise.resolve_native(
57                    &ReadableStreamReadResult {
58                        done: Some(false),
59                        value: chunk,
60                    },
61                    can_gc,
62                );
63            },
64            ReadIntoRequest::ByteTee {
65                byte_tee_read_into_request,
66            } => {
67                byte_tee_read_into_request.enqueue_chunk_steps(
68                    HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
69                        RootedTraceableBox::from_box(Heap::boxed(chunk.get().to_object())),
70                    )),
71                )
72            },
73        }
74    }
75
76    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A0>
77    pub fn close_steps(&self, chunk: Option<RootedTraceableBox<Heap<JSVal>>>, can_gc: CanGc) {
78        match self {
79            ReadIntoRequest::Read(promise) => match chunk {
80                // close steps, given chunk
81                // Resolve promise with «[ "value" → chunk, "done" → true ]».
82                Some(chunk) => promise.resolve_native(
83                    &ReadableStreamReadResult {
84                        done: Some(true),
85                        value: chunk,
86                    },
87                    can_gc,
88                ),
89                None => {
90                    let result = RootedTraceableBox::new(Heap::default());
91                    result.set(UndefinedValue());
92                    promise.resolve_native(
93                        &ReadableStreamReadResult {
94                            done: Some(true),
95                            value: result,
96                        },
97                        can_gc,
98                    );
99                },
100            },
101            ReadIntoRequest::ByteTee {
102                byte_tee_read_into_request,
103            } => match chunk {
104                Some(chunk) => byte_tee_read_into_request
105                    .close_steps(
106                        Some(HeapBufferSource::<ArrayBufferViewU8>::new(
107                            BufferSource::ArrayBufferView(RootedTraceableBox::from_box(
108                                Heap::boxed(chunk.get().to_object()),
109                            )),
110                        )),
111                        can_gc,
112                    )
113                    .expect("close steps should not fail"),
114                None => byte_tee_read_into_request
115                    .close_steps(None, can_gc)
116                    .expect("close steps should not fail"),
117            },
118        }
119    }
120
121    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
122    pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
123        match self {
124            ReadIntoRequest::Read(promise) => {
125                // error steps, given e
126                // Reject promise with e.
127                promise.reject_native(&e, can_gc)
128            },
129            ReadIntoRequest::ByteTee {
130                byte_tee_read_into_request,
131            } => {
132                byte_tee_read_into_request.error_steps();
133            },
134        }
135    }
136}
137
138/// The rejection handler for
139/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
140#[derive(Clone, JSTraceable, MallocSizeOf)]
141#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
142struct ByteTeeClosedPromiseRejectionHandler {
143    branch_1_controller: Dom<ReadableByteStreamController>,
144    branch_2_controller: Dom<ReadableByteStreamController>,
145    #[ignore_malloc_size_of = "Rc"]
146    canceled_1: Rc<Cell<bool>>,
147    #[ignore_malloc_size_of = "Rc"]
148    canceled_2: Rc<Cell<bool>>,
149    #[ignore_malloc_size_of = "Rc"]
150    cancel_promise: Rc<Promise>,
151    #[ignore_malloc_size_of = "Rc"]
152    reader_version: Rc<Cell<u64>>,
153    expected_version: u64,
154}
155
156impl Callback for ByteTeeClosedPromiseRejectionHandler {
157    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
158    /// Upon rejection of `reader.closedPromise` with reason `r``,
159    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
160        let can_gc = CanGc::from_cx(cx);
161        // If thisReader is not reader, return.
162        if self.reader_version.get() != self.expected_version {
163            return;
164        }
165
166        // Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
167        self.branch_1_controller.error(v, can_gc);
168
169        // Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
170        self.branch_2_controller.error(v, can_gc);
171
172        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
173        if !self.canceled_1.get() || !self.canceled_2.get() {
174            self.cancel_promise.resolve_native(&(), can_gc);
175        }
176    }
177}
178
179/// <https://streams.spec.whatwg.org/#readablestreambyobreader>
180#[dom_struct]
181pub(crate) struct ReadableStreamBYOBReader {
182    reflector_: Reflector,
183
184    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
185    stream: MutNullableDom<ReadableStream>,
186
187    read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,
188
189    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
190    #[conditional_malloc_size_of]
191    closed_promise: DomRefCell<Rc<Promise>>,
192}
193
194impl ReadableStreamBYOBReader {
195    fn new_with_proto(
196        global: &GlobalScope,
197        proto: Option<SafeHandleObject>,
198        can_gc: CanGc,
199    ) -> DomRoot<ReadableStreamBYOBReader> {
200        reflect_dom_object_with_proto(
201            Box::new(ReadableStreamBYOBReader::new_inherited(global, can_gc)),
202            global,
203            proto,
204            can_gc,
205        )
206    }
207
208    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamBYOBReader {
209        ReadableStreamBYOBReader {
210            reflector_: Reflector::new(),
211            stream: MutNullableDom::new(None),
212            read_into_requests: DomRefCell::new(Default::default()),
213            closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
214        }
215    }
216
217    pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
218        reflect_dom_object(
219            Box::new(Self::new_inherited(global, can_gc)),
220            global,
221            can_gc,
222        )
223    }
224
225    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-byob-reader>
226    pub(crate) fn set_up(
227        &self,
228        stream: &ReadableStream,
229        global: &GlobalScope,
230        can_gc: CanGc,
231    ) -> Fallible<()> {
232        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
233        if stream.is_locked() {
234            return Err(Error::Type("stream is locked".to_owned()));
235        }
236
237        // If stream.[[controller]] does not implement ReadableByteStreamController, throw a TypeError exception.
238        if !stream.has_byte_controller() {
239            return Err(Error::Type(
240                "stream controller is not a byte stream controller".to_owned(),
241            ));
242        }
243
244        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
245        self.generic_initialize(global, stream, can_gc);
246
247        // Set reader.[[readIntoRequests]] to a new empty list.
248        self.read_into_requests.borrow_mut().clear();
249
250        Ok(())
251    }
252
253    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreaderrelease>
254    pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
255        // Perform ! ReadableStreamReaderGenericRelease(reader).
256        self.generic_release(can_gc)
257            .expect("Generic release failed");
258        // Let e be a new TypeError exception.
259        let cx = GlobalScope::get_cx();
260        rooted!(in(*cx) let mut error = UndefinedValue());
261        Error::Type("Reader is released".to_owned()).to_jsval(
262            cx,
263            &self.global(),
264            error.handle_mut(),
265            can_gc,
266        );
267
268        // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
269        self.error_read_into_requests(error.handle(), can_gc);
270        Ok(())
271    }
272
273    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreadererrorreadintorequests>
274    pub(crate) fn error_read_into_requests(&self, e: SafeHandleValue, can_gc: CanGc) {
275        // Reject reader.[[closedPromise]] with e.
276        self.closed_promise.borrow().reject_native(&e, can_gc);
277
278        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
279        self.closed_promise.borrow().set_promise_is_handled();
280
281        // Let readRequests be reader.[[readRequests]].
282        let mut read_into_requests = self.take_read_into_requests();
283
284        // Set reader.[[readIntoRequests]] to a new empty list.
285        for request in read_into_requests.drain(0..) {
286            // Perform readIntoRequest’s error steps, given e.
287            request.error_steps(e, can_gc);
288        }
289    }
290
291    fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
292        mem::take(&mut *self.read_into_requests.borrow_mut())
293    }
294
295    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
296    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
297        self.read_into_requests
298            .borrow_mut()
299            .push_back(read_request.clone());
300    }
301
302    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
303    pub(crate) fn cancel(&self, can_gc: CanGc) {
304        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
305        // Let readIntoRequests be reader.[[readIntoRequests]].
306        let mut read_into_requests = self.take_read_into_requests();
307        // Set reader.[[readIntoRequests]] to an empty list.
308        // Perform readIntoRequest’s close steps, given undefined.
309        for request in read_into_requests.drain(0..) {
310            // Perform readIntoRequest’s close steps, given undefined.
311            request.close_steps(None, can_gc);
312        }
313    }
314
315    pub(crate) fn close(&self, can_gc: CanGc) {
316        // Resolve reader.[[closedPromise]] with undefined.
317        self.closed_promise.borrow().resolve_native(&(), can_gc);
318    }
319
320    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
321    pub(crate) fn read(
322        &self,
323        cx: SafeJSContext,
324        view: HeapBufferSource<ArrayBufferViewU8>,
325        min: u64,
326        read_into_request: &ReadIntoRequest,
327        can_gc: CanGc,
328    ) {
329        // Let stream be reader.[[stream]].
330
331        // Assert: stream is not undefined.
332        assert!(self.stream.get().is_some());
333
334        let stream = self.stream.get().unwrap();
335
336        // Set stream.[[disturbed]] to true.
337        stream.set_is_disturbed(true);
338        // If stream.[[state]] is "errored", perform readIntoRequest’s error steps given stream.[[storedError]].
339        if stream.is_errored() {
340            let cx = GlobalScope::get_cx();
341            rooted!(in(*cx) let mut error = UndefinedValue());
342            stream.get_stored_error(error.handle_mut());
343
344            read_into_request.error_steps(error.handle(), can_gc);
345        } else {
346            // Otherwise,
347            // perform ! ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest).
348            stream.perform_pull_into(cx, read_into_request, view, min, can_gc);
349        }
350    }
351
352    pub(crate) fn get_num_read_into_requests(&self) -> usize {
353        self.read_into_requests.borrow().len()
354    }
355
356    pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
357        self.read_into_requests
358            .borrow_mut()
359            .pop_front()
360            .expect("read into requests is empty")
361    }
362
363    #[allow(clippy::too_many_arguments)]
364    pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
365        &self,
366        branch_1: &ReadableStream,
367        branch_2: &ReadableStream,
368        canceled_1: Rc<Cell<bool>>,
369        canceled_2: Rc<Cell<bool>>,
370        cancel_promise: Rc<Promise>,
371        reader_version: Rc<Cell<u64>>,
372        expected_version: u64,
373        can_gc: CanGc,
374    ) {
375        let branch_1_controller = branch_1.get_byte_controller();
376
377        let branch_2_controller = branch_2.get_byte_controller();
378
379        let global = self.global();
380        let handler = PromiseNativeHandler::new(
381            &global,
382            None,
383            Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
384                branch_1_controller: Dom::from_ref(&branch_1_controller),
385                branch_2_controller: Dom::from_ref(&branch_2_controller),
386                canceled_1,
387                canceled_2,
388                cancel_promise,
389                reader_version,
390                expected_version,
391            })),
392            can_gc,
393        );
394
395        let realm = enter_realm(&*global);
396        let comp = InRealm::Entered(&realm);
397
398        self.closed_promise
399            .borrow()
400            .append_native_handler(&handler, comp, can_gc);
401    }
402}
403
404impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
405    /// <https://streams.spec.whatwg.org/#byob-reader-constructor>
406    fn Constructor(
407        global: &GlobalScope,
408        proto: Option<SafeHandleObject>,
409        can_gc: CanGc,
410        stream: &ReadableStream,
411    ) -> Fallible<DomRoot<Self>> {
412        let reader = Self::new_with_proto(global, proto, can_gc);
413
414        // Perform ? SetUpReadableStreamBYOBReader(this, stream).
415        Self::set_up(&reader, stream, global, can_gc)?;
416
417        Ok(reader)
418    }
419
420    /// <https://streams.spec.whatwg.org/#byob-reader-read>
421    fn Read(
422        &self,
423        view: CustomAutoRooterGuard<ArrayBufferView>,
424        options: &ReadableStreamBYOBReaderReadOptions,
425        can_gc: CanGc,
426    ) -> Rc<Promise> {
427        let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
428        let min = options.min;
429        // Let promise be a new promise.
430        let promise = Promise::new(&self.global(), can_gc);
431
432        let cx = GlobalScope::get_cx();
433        // If view.[[ByteLength]] is 0, return a promise rejected with a TypeError exception.
434        if view.byte_length() == 0 {
435            promise.reject_error(Error::Type("view byte length is 0".to_owned()), can_gc);
436            return promise;
437        }
438        // If view.[[ViewedArrayBuffer]].[[ArrayBufferByteLength]] is 0,
439        // return a promise rejected with a TypeError exception.
440        if view.viewed_buffer_array_byte_length(cx) == 0 {
441            promise.reject_error(
442                Error::Type("viewed buffer byte length is 0".to_owned()),
443                can_gc,
444            );
445            return promise;
446        }
447
448        // If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true,
449        // return a promise rejected with a TypeError exception.
450        if view.is_detached_buffer(cx) {
451            promise.reject_error(Error::Type("view is detached".to_owned()), can_gc);
452            return promise;
453        }
454
455        // If options["min"] is 0, return a promise rejected with a TypeError exception.
456        if min == 0 {
457            promise.reject_error(Error::Type("min is 0".to_owned()), can_gc);
458            return promise;
459        }
460
461        // If view has a [[TypedArrayName]] internal slot,
462        if view.has_typed_array_name() {
463            // If options["min"] > view.[[ArrayLength]], return a promise rejected with a RangeError exception.
464            if min > (view.get_typed_array_length() as u64) {
465                promise.reject_error(
466                    Error::Range("min is greater than array length".to_owned()),
467                    can_gc,
468                );
469                return promise;
470            }
471        } else {
472            // Otherwise (i.e., it is a DataView),
473            // If options["min"] > view.[[ByteLength]], return a promise rejected with a RangeError exception.
474            if min > (view.byte_length() as u64) {
475                promise.reject_error(
476                    Error::Range("min is greater than byte length".to_owned()),
477                    can_gc,
478                );
479                return promise;
480            }
481        }
482
483        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
484        if self.stream.get().is_none() {
485            promise.reject_error(
486                Error::Type("min is greater than byte length".to_owned()),
487                can_gc,
488            );
489            return promise;
490        }
491
492        // Let readIntoRequest be a new read-into request with the following items:
493        //
494        // chunk steps, given chunk
495        // Resolve promise with «[ "value" → chunk, "done" → false ]».
496        //
497        // close steps, given chunk
498        // Resolve promise with «[ "value" → chunk, "done" → true ]».
499        //
500        // error steps, given e
501        // Reject promise with e
502        let read_into_request = ReadIntoRequest::Read(promise.clone());
503
504        // Perform ! ReadableStreamBYOBReaderRead(this, view, options["min"], readIntoRequest).
505        self.read(cx, view, min, &read_into_request, can_gc);
506
507        // Return promise.
508        promise
509    }
510
511    /// <https://streams.spec.whatwg.org/#byob-reader-release-lock>
512    fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
513        if self.stream.get().is_none() {
514            // If this.[[stream]] is undefined, return.
515            return Ok(());
516        }
517
518        // Perform !ReadableStreamBYOBReaderRelease(this).
519        self.release(can_gc)
520    }
521
522    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
523    fn Closed(&self) -> Rc<Promise> {
524        self.closed()
525    }
526
527    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
528    fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
529        self.generic_cancel(cx, &self.global(), reason, can_gc)
530    }
531}
532
533impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
534    fn get_closed_promise(&self) -> Rc<Promise> {
535        self.closed_promise.borrow().clone()
536    }
537
538    fn set_closed_promise(&self, promise: Rc<Promise>) {
539        *self.closed_promise.borrow_mut() = promise;
540    }
541
542    fn set_stream(&self, stream: Option<&ReadableStream>) {
543        self.stream.set(stream);
544    }
545
546    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
547        self.stream.get()
548    }
549
550    fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
551        Some(self)
552    }
553}