Skip to main content

script/dom/stream/
readablestreambyobreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::gc::CustomAutoRooterGuard;
13use js::jsapi::Heap;
14use js::jsval::{JSVal, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
17use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
18use script_bindings::cell::DomRefCell;
19use script_bindings::reflector::{
20    Reflector, reflect_dom_object_with_cx, reflect_dom_object_with_proto_and_cx,
21};
22use script_bindings::root::Dom;
23
24use super::byteteereadintorequest::ByteTeeReadIntoRequest;
25use super::readablebytestreamcontroller::ReadableByteStreamController;
26use super::readablestreamgenericreader::ReadableStreamGenericReader;
27use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
28use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::{
29    ReadableStreamBYOBReaderMethods, ReadableStreamBYOBReaderReadOptions,
30};
31use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
32use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
33use crate::dom::bindings::reflector::DomGlobal;
34use crate::dom::bindings::root::{DomRoot, MutNullableDom};
35use crate::dom::bindings::trace::RootedTraceableBox;
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
39use crate::dom::stream::readablestream::ReadableStream;
40use crate::realms::enter_auto_realm;
41
42/// <https://streams.spec.whatwg.org/#read-into-request>
43#[derive(Clone, JSTraceable, MallocSizeOf)]
44pub enum ReadIntoRequest {
45    /// <https://streams.spec.whatwg.org/#byob-reader-read>
46    Read(#[conditional_malloc_size_of] Rc<Promise>),
47    ByteTee {
48        byte_tee_read_into_request: Dom<ByteTeeReadIntoRequest>,
49    },
50}
51
52impl ReadIntoRequest {
53    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
54    pub fn chunk_steps(&self, cx: &mut JSContext, chunk: RootedTraceableBox<Heap<JSVal>>) {
55        match self {
56            ReadIntoRequest::Read(promise) => {
57                // chunk steps, given chunk
58                // Resolve promise with «[ "value" → chunk, "done" → false ]».
59                promise.resolve_native(
60                    cx,
61                    &ReadableStreamReadResult {
62                        done: Some(false),
63                        value: chunk,
64                    },
65                );
66            },
67            ReadIntoRequest::ByteTee {
68                byte_tee_read_into_request,
69            } => byte_tee_read_into_request.enqueue_chunk_steps(
70                cx,
71                RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
72                    BufferSource::ArrayBufferView(Heap::boxed(chunk.get().to_object())),
73                )),
74            ),
75        }
76    }
77
78    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A0>
79    pub fn close_steps(&self, cx: &mut JSContext, chunk: Option<RootedTraceableBox<Heap<JSVal>>>) {
80        match self {
81            ReadIntoRequest::Read(promise) => match chunk {
82                // close steps, given chunk
83                // Resolve promise with «[ "value" → chunk, "done" → true ]».
84                Some(chunk) => promise.resolve_native(
85                    cx,
86                    &ReadableStreamReadResult {
87                        done: Some(true),
88                        value: chunk,
89                    },
90                ),
91                None => {
92                    let result = RootedTraceableBox::new(Heap::default());
93                    result.set(UndefinedValue());
94                    promise.resolve_native(
95                        cx,
96                        &ReadableStreamReadResult {
97                            done: Some(true),
98                            value: result,
99                        },
100                    );
101                },
102            },
103            ReadIntoRequest::ByteTee {
104                byte_tee_read_into_request,
105            } => match chunk {
106                Some(chunk) => byte_tee_read_into_request
107                    .close_steps(
108                        cx,
109                        Some(RootedTraceableBox::new(
110                            HeapBufferSource::<ArrayBufferViewU8>::new(
111                                BufferSource::ArrayBufferView(Heap::boxed(chunk.get().to_object())),
112                            ),
113                        )),
114                    )
115                    .expect("close steps should not fail"),
116                None => byte_tee_read_into_request
117                    .close_steps(cx, None)
118                    .expect("close steps should not fail"),
119            },
120        }
121    }
122
123    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
124    pub(crate) fn error_steps(&self, cx: &mut JSContext, e: SafeHandleValue) {
125        match self {
126            ReadIntoRequest::Read(promise) => {
127                // error steps, given e
128                // Reject promise with e.
129                promise.reject_native(cx, &e)
130            },
131            ReadIntoRequest::ByteTee {
132                byte_tee_read_into_request,
133            } => {
134                byte_tee_read_into_request.error_steps();
135            },
136        }
137    }
138}
139
140/// The rejection handler for
141/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
142#[derive(Clone, JSTraceable, MallocSizeOf)]
143#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
144struct ByteTeeClosedPromiseRejectionHandler {
145    branch_1_controller: Dom<ReadableByteStreamController>,
146    branch_2_controller: Dom<ReadableByteStreamController>,
147    #[conditional_malloc_size_of]
148    canceled_1: Rc<Cell<bool>>,
149    #[conditional_malloc_size_of]
150    canceled_2: Rc<Cell<bool>>,
151    #[conditional_malloc_size_of]
152    cancel_promise: Rc<Promise>,
153    #[conditional_malloc_size_of]
154    reader_version: Rc<Cell<u64>>,
155    expected_version: u64,
156}
157
158impl Callback for ByteTeeClosedPromiseRejectionHandler {
159    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
160    /// Upon rejection of `reader.closedPromise` with reason `r``,
161    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
162        // If thisReader is not reader, return.
163        if self.reader_version.get() != self.expected_version {
164            return;
165        }
166
167        // Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
168        self.branch_1_controller.error(cx, v);
169
170        // Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
171        self.branch_2_controller.error(cx, v);
172
173        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
174        if !self.canceled_1.get() || !self.canceled_2.get() {
175            self.cancel_promise.resolve_native(cx, &());
176        }
177    }
178}
179
180/// <https://streams.spec.whatwg.org/#readablestreambyobreader>
181#[dom_struct]
182pub(crate) struct ReadableStreamBYOBReader {
183    reflector_: Reflector,
184
185    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
186    stream: MutNullableDom<ReadableStream>,
187
188    read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,
189
190    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
191    #[conditional_malloc_size_of]
192    closed_promise: DomRefCell<Rc<Promise>>,
193}
194
195impl ReadableStreamBYOBReader {
196    fn new_with_proto(
197        cx: &mut JSContext,
198        global: &GlobalScope,
199        proto: Option<SafeHandleObject>,
200    ) -> DomRoot<ReadableStreamBYOBReader> {
201        reflect_dom_object_with_proto_and_cx(
202            Box::new(ReadableStreamBYOBReader::new_inherited(cx, global)),
203            global,
204            proto,
205            cx,
206        )
207    }
208
209    fn new_inherited(cx: &mut JSContext, global: &GlobalScope) -> ReadableStreamBYOBReader {
210        ReadableStreamBYOBReader {
211            reflector_: Reflector::new(),
212            stream: MutNullableDom::new(None),
213            read_into_requests: DomRefCell::new(Default::default()),
214            closed_promise: DomRefCell::new(Promise::new(cx, global)),
215        }
216    }
217
218    pub(crate) fn new(
219        cx: &mut JSContext,
220        global: &GlobalScope,
221    ) -> DomRoot<ReadableStreamBYOBReader> {
222        reflect_dom_object_with_cx(Box::new(Self::new_inherited(cx, global)), global, cx)
223    }
224
225    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-byob-reader>
226    pub(crate) fn set_up(
227        &self,
228        cx: &mut JSContext,
229        stream: &ReadableStream,
230        global: &GlobalScope,
231    ) -> Fallible<()> {
232        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
233        if stream.is_locked() {
234            return Err(Error::Type(c"stream is locked".to_owned()));
235        }
236
237        // If stream.[[controller]] does not implement ReadableByteStreamController, throw a TypeError exception.
238        if !stream.has_byte_controller() {
239            return Err(Error::Type(
240                c"stream controller is not a byte stream controller".to_owned(),
241            ));
242        }
243
244        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
245        self.generic_initialize(cx, global, stream);
246
247        // Set reader.[[readIntoRequests]] to a new empty list.
248        self.read_into_requests.borrow_mut().clear();
249
250        Ok(())
251    }
252
253    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreaderrelease>
254    pub(crate) fn release(&self, cx: &mut JSContext) -> Fallible<()> {
255        // Perform ! ReadableStreamReaderGenericRelease(reader).
256        self.generic_release(cx).expect("Generic release failed");
257        // Let e be a new TypeError exception.
258        rooted!(&in(cx) let mut error = UndefinedValue());
259        Error::Type(c"Reader is released".to_owned()).to_jsval(
260            cx,
261            &self.global(),
262            error.handle_mut(),
263        );
264
265        // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
266        self.error_read_into_requests(cx, error.handle());
267        Ok(())
268    }
269
270    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreadererrorreadintorequests>
271    pub(crate) fn error_read_into_requests(&self, cx: &mut JSContext, e: SafeHandleValue) {
272        // Reject reader.[[closedPromise]] with e.
273        self.closed_promise.borrow().reject_native(cx, &e);
274
275        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
276        self.closed_promise.borrow().set_promise_is_handled(cx);
277
278        // Let readRequests be reader.[[readRequests]].
279        let mut read_into_requests = self.take_read_into_requests();
280
281        // Set reader.[[readIntoRequests]] to a new empty list.
282        for request in read_into_requests.drain(0..) {
283            // Perform readIntoRequest’s error steps, given e.
284            request.error_steps(cx, e);
285        }
286    }
287
288    fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
289        mem::take(&mut *self.read_into_requests.borrow_mut())
290    }
291
292    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
293    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
294        self.read_into_requests
295            .borrow_mut()
296            .push_back(read_request.clone());
297    }
298
299    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
300    pub(crate) fn cancel(&self, cx: &mut JSContext) {
301        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
302        // Let readIntoRequests be reader.[[readIntoRequests]].
303        let mut read_into_requests = self.take_read_into_requests();
304        // Set reader.[[readIntoRequests]] to an empty list.
305        // Perform readIntoRequest’s close steps, given undefined.
306        for request in read_into_requests.drain(0..) {
307            // Perform readIntoRequest’s close steps, given undefined.
308            request.close_steps(cx, None);
309        }
310    }
311
312    pub(crate) fn close(&self, cx: &mut JSContext) {
313        // Resolve reader.[[closedPromise]] with undefined.
314        self.closed_promise.borrow().resolve_native(cx, &());
315    }
316
317    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
318    pub(crate) fn read(
319        &self,
320        cx: &mut JSContext,
321        view: &HeapBufferSource<ArrayBufferViewU8>,
322        min: u64,
323        read_into_request: &ReadIntoRequest,
324    ) {
325        // Let stream be reader.[[stream]].
326
327        // Assert: stream is not undefined.
328        assert!(self.stream.get().is_some());
329
330        let stream = self.stream.get().unwrap();
331
332        // Set stream.[[disturbed]] to true.
333        stream.set_is_disturbed(true);
334        // If stream.[[state]] is "errored", perform readIntoRequest’s error steps given stream.[[storedError]].
335        if stream.is_errored() {
336            rooted!(&in(cx) let mut error = UndefinedValue());
337            stream.get_stored_error(error.handle_mut());
338
339            read_into_request.error_steps(cx, error.handle());
340        } else {
341            // Otherwise,
342            // perform ! ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest).
343            stream.perform_pull_into(cx, read_into_request, view, min);
344        }
345    }
346
347    pub(crate) fn get_num_read_into_requests(&self) -> usize {
348        self.read_into_requests.borrow().len()
349    }
350
351    pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
352        self.read_into_requests
353            .borrow_mut()
354            .pop_front()
355            .expect("read into requests is empty")
356    }
357
358    #[allow(clippy::too_many_arguments)]
359    pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
360        &self,
361        cx: &mut JSContext,
362        branch_1: &ReadableStream,
363        branch_2: &ReadableStream,
364        canceled_1: Rc<Cell<bool>>,
365        canceled_2: Rc<Cell<bool>>,
366        cancel_promise: Rc<Promise>,
367        reader_version: Rc<Cell<u64>>,
368        expected_version: u64,
369    ) {
370        let branch_1_controller = branch_1.get_byte_controller();
371
372        let branch_2_controller = branch_2.get_byte_controller();
373
374        let global = self.global();
375        let handler = PromiseNativeHandler::new(
376            cx,
377            &global,
378            None,
379            Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
380                branch_1_controller: Dom::from_ref(&branch_1_controller),
381                branch_2_controller: Dom::from_ref(&branch_2_controller),
382                canceled_1,
383                canceled_2,
384                cancel_promise,
385                reader_version,
386                expected_version,
387            })),
388        );
389
390        let mut realm = enter_auto_realm(cx, &*global);
391        let cx = &mut realm.current_realm();
392
393        self.closed_promise
394            .borrow()
395            .append_native_handler(cx, &handler);
396    }
397}
398
399impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
400    /// <https://streams.spec.whatwg.org/#byob-reader-constructor>
401    fn Constructor(
402        cx: &mut JSContext,
403        global: &GlobalScope,
404        proto: Option<SafeHandleObject>,
405        stream: &ReadableStream,
406    ) -> Fallible<DomRoot<Self>> {
407        let reader = Self::new_with_proto(cx, global, proto);
408
409        // Perform ? SetUpReadableStreamBYOBReader(this, stream).
410        reader.set_up(cx, stream, global)?;
411
412        Ok(reader)
413    }
414
415    /// <https://streams.spec.whatwg.org/#byob-reader-read>
416    fn Read(
417        &self,
418        cx: &mut JSContext,
419        view: CustomAutoRooterGuard<ArrayBufferView>,
420        options: &ReadableStreamBYOBReaderReadOptions,
421    ) -> Rc<Promise> {
422        let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
423        let min = options.min;
424        // Let promise be a new promise.
425        let promise = Promise::new(cx, &self.global());
426
427        // If view.[[ByteLength]] is 0, return a promise rejected with a TypeError exception.
428        if view.byte_length() == 0 {
429            promise.reject_error(cx, Error::Type(c"view byte length is 0".to_owned()));
430            return promise;
431        }
432        // If view.[[ViewedArrayBuffer]].[[ArrayBufferByteLength]] is 0,
433        // return a promise rejected with a TypeError exception.
434        if view.viewed_buffer_array_byte_length(cx) == 0 {
435            promise.reject_error(
436                cx,
437                Error::Type(c"viewed buffer byte length is 0".to_owned()),
438            );
439            return promise;
440        }
441
442        // If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true,
443        // return a promise rejected with a TypeError exception.
444        if view.is_detached_buffer(cx) {
445            promise.reject_error(cx, Error::Type(c"view is detached".to_owned()));
446            return promise;
447        }
448
449        // If options["min"] is 0, return a promise rejected with a TypeError exception.
450        if min == 0 {
451            promise.reject_error(cx, Error::Type(c"min is 0".to_owned()));
452            return promise;
453        }
454
455        // If view has a [[TypedArrayName]] internal slot,
456        if view.has_typed_array_name() {
457            // If options["min"] > view.[[ArrayLength]], return a promise rejected with a RangeError exception.
458            if min > (view.get_typed_array_length() as u64) {
459                promise.reject_error(
460                    cx,
461                    Error::Range(c"min is greater than array length".to_owned()),
462                );
463                return promise;
464            }
465        } else {
466            // Otherwise (i.e., it is a DataView),
467            // If options["min"] > view.[[ByteLength]], return a promise rejected with a RangeError exception.
468            if min > (view.byte_length() as u64) {
469                promise.reject_error(
470                    cx,
471                    Error::Range(c"min is greater than byte length".to_owned()),
472                );
473                return promise;
474            }
475        }
476
477        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
478        if self.stream.get().is_none() {
479            promise.reject_error(
480                cx,
481                Error::Type(c"min is greater than byte length".to_owned()),
482            );
483            return promise;
484        }
485
486        // Let readIntoRequest be a new read-into request with the following items:
487        //
488        // chunk steps, given chunk
489        // Resolve promise with «[ "value" → chunk, "done" → false ]».
490        //
491        // close steps, given chunk
492        // Resolve promise with «[ "value" → chunk, "done" → true ]».
493        //
494        // error steps, given e
495        // Reject promise with e
496        let read_into_request = ReadIntoRequest::Read(promise.clone());
497
498        // Perform ! ReadableStreamBYOBReaderRead(this, view, options["min"], readIntoRequest).
499        self.read(cx, &view, min, &read_into_request);
500
501        // Return promise.
502        promise
503    }
504
505    /// <https://streams.spec.whatwg.org/#byob-reader-release-lock>
506    fn ReleaseLock(&self, cx: &mut JSContext) -> Fallible<()> {
507        if self.stream.get().is_none() {
508            // If this.[[stream]] is undefined, return.
509            return Ok(());
510        }
511
512        // Perform !ReadableStreamBYOBReaderRelease(this).
513        self.release(cx)
514    }
515
516    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
517    fn Closed(&self) -> Rc<Promise> {
518        self.closed()
519    }
520
521    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
522    fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
523        self.generic_cancel(cx, &self.global(), reason)
524    }
525}
526
527impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
528    fn get_closed_promise(&self) -> Rc<Promise> {
529        self.closed_promise.borrow().clone()
530    }
531
532    fn set_closed_promise(&self, promise: Rc<Promise>) {
533        *self.closed_promise.borrow_mut() = promise;
534    }
535
536    fn set_stream(&self, stream: Option<&ReadableStream>) {
537        self.stream.set(stream);
538    }
539
540    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
541        self.stream.get()
542    }
543
544    fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
545        Some(self)
546    }
547}