Skip to main content

script/dom/stream/
readablestreamdefaultreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::Heap;
13use js::jsval::{JSVal, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
16use script_bindings::cell::DomRefCell;
17use script_bindings::reflector::{
18    Reflector, reflect_dom_object_with_cx, reflect_dom_object_with_proto_and_cx,
19};
20
21use super::byteteereadrequest::ByteTeeReadRequest;
22use super::readablebytestreamcontroller::ReadableByteStreamController;
23use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
24    ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
25};
26use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
27use crate::dom::bindings::reflector::DomGlobal;
28use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
29use crate::dom::bindings::trace::RootedTraceableBox;
30use crate::dom::globalscope::GlobalScope;
31use crate::dom::promise::Promise;
32use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
33use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
34use crate::dom::stream::defaultteereadrequest::DefaultTeeReadRequest;
35use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
36use crate::dom::types::ReadableStreamDefaultController;
37use crate::realms::enter_auto_realm;
38
39type ReadAllBytesSuccessSteps = dyn Fn(&mut js::context::JSContext, &[u8]);
40type ReadAllBytesFailureSteps = dyn Fn(&mut js::context::JSContext, SafeHandleValue);
41
42impl js::gc::Rootable for ContinueReadMicrotask {}
43
44/// Microtask handler to continue the read loop without recursion.
45/// Spec note: "This recursion could potentially cause a stack overflow
46/// if implemented directly. Implementations will need to mitigate this,
47/// e.g. by using a non-recursive variant of this algorithm, or queuing
48/// a microtask…"
49#[derive(Clone, JSTraceable, MallocSizeOf)]
50#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
51struct ContinueReadMicrotask {
52    reader: Dom<ReadableStreamDefaultReader>,
53    request: ReadRequest,
54}
55
56impl Callback for ContinueReadMicrotask {
57    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
58        // https://streams.spec.whatwg.org/#ref-for-read-loop%E2%91%A0
59        // Note: continuing the read-loop from inside a micro-task to break recursion.
60        self.reader.read(cx, &self.request);
61    }
62}
63
64/// <https://streams.spec.whatwg.org/#read-loop>
65fn read_loop(
66    cx: &mut js::context::JSContext,
67    reader: &ReadableStreamDefaultReader,
68    success_steps: Rc<ReadAllBytesSuccessSteps>,
69    failure_steps: Rc<ReadAllBytesFailureSteps>,
70) {
71    // For the purposes of the above algorithm, to read-loop given reader,
72    // bytes, successSteps, and failureSteps:
73
74    // Step 1 .Let readRequest be a new read request with the following items:
75    let req = ReadRequest::ReadLoop {
76        success_steps,
77        failure_steps,
78        reader: Dom::from_ref(reader),
79        bytes: Rc::new(DomRefCell::new(Vec::new())),
80    };
81    // Step 2 .Perform ! ReadableStreamDefaultReaderRead(reader, readRequest).
82    reader.read(cx, &req);
83}
84
/// <https://streams.spec.whatwg.org/#read-request>
///
/// The kinds of read request a default reader can hold. The chunk, close and
/// error steps of each kind are implemented by the methods of `ReadRequest`.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub(crate) enum ReadRequest {
    /// <https://streams.spec.whatwg.org/#default-reader-read>
    ///
    /// Settles the wrapped promise with the read result, or rejects it on error.
    Read(#[conditional_malloc_size_of] Rc<Promise>),
    /// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
    ///
    /// Delegates every step to the wrapped `DefaultTeeReadRequest`.
    DefaultTee {
        tee_read_request: Dom<DefaultTeeReadRequest>,
    },
    /// Spec read loop variant, driven by read-request steps (no Promise).
    /// <https://streams.spec.whatwg.org/#read-loop>
    ReadLoop {
        /// Called with the accumulated bytes from the close steps.
        #[ignore_malloc_size_of = "dyn Fn"]
        #[no_trace]
        success_steps: Rc<ReadAllBytesSuccessSteps>,
        /// Called with the failure value from the error steps, or from the
        /// chunk steps when a chunk cannot be converted to bytes.
        #[ignore_malloc_size_of = "dyn Fn"]
        #[no_trace]
        failure_steps: Rc<ReadAllBytesFailureSteps>,
        /// Reader the loop keeps reading from, released by the close and error steps.
        reader: Dom<ReadableStreamDefaultReader>,
        /// Bytes collected so far; shared between clones of this request.
        #[conditional_malloc_size_of]
        bytes: Rc<DomRefCell<Vec<u8>>>,
    },
    /// Delegates every step to the wrapped `ByteTeeReadRequest`.
    ByteTee {
        byte_tee_read_request: Dom<ByteTeeReadRequest>,
    },
}
111
112impl ReadRequest {
113    /// <https://streams.spec.whatwg.org/#read-request-chunk-steps>
114    pub(crate) fn chunk_steps(
115        &self,
116        cx: &mut js::context::JSContext,
117        chunk: RootedTraceableBox<Heap<JSVal>>,
118        global: &GlobalScope,
119    ) {
120        match self {
121            ReadRequest::Read(promise) => {
122                // chunk steps, given chunk
123                // Resolve promise with «[ "value" → chunk, "done" → false ]».
124                promise.resolve_native(
125                    cx,
126                    &ReadableStreamReadResult {
127                        done: Some(false),
128                        value: chunk,
129                    },
130                );
131            },
132            ReadRequest::DefaultTee { tee_read_request } => {
133                tee_read_request.enqueue_chunk_steps(cx, chunk);
134            },
135            ReadRequest::ByteTee {
136                byte_tee_read_request,
137            } => {
138                byte_tee_read_request.enqueue_chunk_steps(cx, global, chunk);
139            },
140            ReadRequest::ReadLoop {
141                success_steps: _,
142                failure_steps,
143                reader,
144                bytes,
145            } => {
146                // Spec: chunk steps, given chunk
147                let global = reader.global();
148
149                match bytes_from_chunk_jsval(cx, &chunk) {
150                    Ok(vec) => {
151                        // Step 2. Append the bytes represented by chunk to bytes.
152                        bytes.borrow_mut().extend_from_slice(&vec);
153
154                        // Step 3. Read-loop given reader, bytes, successSteps, and failureSteps.
155                        // Spec note: Avoid direct recursion; queue into a microtask.
156                        // Resolving the promise will queue a microtask to call into the native handler.
157                        let tick = Promise::new(cx, &global);
158                        tick.resolve_native(cx, &());
159
160                        let handler = PromiseNativeHandler::new(
161                            cx,
162                            &global,
163                            Some(Box::new(ContinueReadMicrotask {
164                                reader: Dom::from_ref(reader),
165                                request: self.clone(),
166                            })),
167                            None,
168                        );
169
170                        let mut realm = enter_auto_realm(cx, &*global);
171                        let cx = &mut realm.current_realm();
172                        tick.append_native_handler(cx, &handler);
173                    },
174                    Err(err) => {
175                        // Step 1. If chunk is not a Uint8Array object, call failureSteps with a TypeError and abort.
176                        rooted!(&in(cx) let mut v = UndefinedValue());
177                        err.to_jsval(cx, &global, v.handle_mut());
178                        (failure_steps)(cx, v.handle());
179                    },
180                }
181            },
182        }
183    }
184
185    /// <https://streams.spec.whatwg.org/#read-request-close-steps>
186    pub(crate) fn close_steps(&self, cx: &mut js::context::JSContext) {
187        match self {
188            ReadRequest::Read(promise) => {
189                // close steps
190                // Resolve promise with «[ "value" → undefined, "done" → true ]».
191                let result = RootedTraceableBox::new(Heap::default());
192                result.set(UndefinedValue());
193                promise.resolve_native(
194                    cx,
195                    &ReadableStreamReadResult {
196                        done: Some(true),
197                        value: result,
198                    },
199                );
200            },
201            ReadRequest::DefaultTee { tee_read_request } => {
202                tee_read_request.close_steps(cx);
203            },
204            ReadRequest::ByteTee {
205                byte_tee_read_request,
206            } => {
207                byte_tee_read_request
208                    .close_steps(cx)
209                    .expect("ByteTeeReadRequest close steps should not fail");
210            },
211            ReadRequest::ReadLoop {
212                success_steps,
213                reader,
214                bytes,
215                ..
216            } => {
217                // Step 1. Call successSteps with bytes.
218                (success_steps)(cx, &bytes.borrow());
219
220                reader
221                    .release(cx)
222                    .expect("Releasing the read-all-bytes reader should succeed");
223            },
224        }
225    }
226
227    /// <https://streams.spec.whatwg.org/#read-request-error-steps>
228    pub(crate) fn error_steps(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
229        match self {
230            ReadRequest::Read(promise) => {
231                // error steps, given e
232                // Reject promise with e.
233                promise.reject_native(cx, &e)
234            },
235            ReadRequest::DefaultTee { tee_read_request } => {
236                tee_read_request.error_steps();
237            },
238            ReadRequest::ByteTee {
239                byte_tee_read_request,
240            } => {
241                byte_tee_read_request.error_steps();
242            },
243            ReadRequest::ReadLoop {
244                failure_steps,
245                reader,
246                ..
247            } => {
248                // Step 1. Call failureSteps with e.
249                (failure_steps)(cx, e);
250
251                reader
252                    .release(cx)
253                    .expect("Releasing the read-all-bytes reader should succeed");
254            },
255        }
256    }
257}
258
259/// The rejection handler for
260/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
261#[derive(Clone, JSTraceable, MallocSizeOf)]
262#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
263struct ByteTeeClosedPromiseRejectionHandler {
264    branch_1_controller: Dom<ReadableByteStreamController>,
265    branch_2_controller: Dom<ReadableByteStreamController>,
266    #[conditional_malloc_size_of]
267    canceled_1: Rc<Cell<bool>>,
268    #[conditional_malloc_size_of]
269    canceled_2: Rc<Cell<bool>>,
270    #[conditional_malloc_size_of]
271    cancel_promise: Rc<Promise>,
272    #[conditional_malloc_size_of]
273    reader_version: Rc<Cell<u64>>,
274    expected_version: u64,
275}
276
277impl Callback for ByteTeeClosedPromiseRejectionHandler {
278    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
279    /// Upon rejection of reader.[[closedPromise]] with reason r,
280    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
281        // If thisReader is not the current `reader`, return.
282        if self.reader_version.get() != self.expected_version {
283            return;
284        }
285
286        // Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
287        self.branch_1_controller.error(cx, v);
288
289        // Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
290        self.branch_2_controller.error(cx, v);
291
292        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
293        if !self.canceled_1.get() || !self.canceled_2.get() {
294            self.cancel_promise.resolve_native(cx, &());
295        }
296    }
297}
298
299/// The rejection handler for
300/// <https://streams.spec.whatwg.org/#readable-stream-tee>
301#[derive(Clone, JSTraceable, MallocSizeOf)]
302#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
303struct DefaultTeeClosedPromiseRejectionHandler {
304    branch_1_controller: Dom<ReadableStreamDefaultController>,
305    branch_2_controller: Dom<ReadableStreamDefaultController>,
306    #[conditional_malloc_size_of]
307    canceled_1: Rc<Cell<bool>>,
308    #[conditional_malloc_size_of]
309    canceled_2: Rc<Cell<bool>>,
310    #[conditional_malloc_size_of]
311    cancel_promise: Rc<Promise>,
312}
313
314impl Callback for DefaultTeeClosedPromiseRejectionHandler {
315    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
316    /// Upon rejection of reader.[[closedPromise]] with reason r,
317    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
318        // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], r).
319        self.branch_1_controller.error(cx, v);
320        // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], r).
321        self.branch_2_controller.error(cx, v);
322
323        // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined.
324        if !self.canceled_1.get() || !self.canceled_2.get() {
325            self.cancel_promise.resolve_native(cx, &());
326        }
327    }
328}
329
/// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
#[dom_struct]
pub(crate) struct ReadableStreamDefaultReader {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
    ///
    /// Starts out as `None`; `Read` treats `None` as "stream is undefined".
    stream: MutNullableDom<ReadableStream>,

    /// The reader's [[readRequests]]: pending read requests, appended at the
    /// back and removed from the front.
    read_requests: DomRefCell<VecDeque<ReadRequest>>,

    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
    #[conditional_malloc_size_of]
    closed_promise: DomRefCell<Rc<Promise>>,
}
344
345impl ReadableStreamDefaultReader {
346    fn new_with_proto(
347        cx: &mut JSContext,
348        global: &GlobalScope,
349        proto: Option<SafeHandleObject>,
350    ) -> DomRoot<ReadableStreamDefaultReader> {
351        reflect_dom_object_with_proto_and_cx(
352            Box::new(ReadableStreamDefaultReader::new_inherited(cx, global)),
353            global,
354            proto,
355            cx,
356        )
357    }
358
359    fn new_inherited(cx: &mut JSContext, global: &GlobalScope) -> ReadableStreamDefaultReader {
360        ReadableStreamDefaultReader {
361            reflector_: Reflector::new(),
362            stream: MutNullableDom::new(None),
363            read_requests: DomRefCell::new(Default::default()),
364            closed_promise: DomRefCell::new(Promise::new(cx, global)),
365        }
366    }
367
368    pub(crate) fn new(
369        cx: &mut JSContext,
370        global: &GlobalScope,
371    ) -> DomRoot<ReadableStreamDefaultReader> {
372        reflect_dom_object_with_cx(Box::new(Self::new_inherited(cx, global)), global, cx)
373    }
374
375    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-reader>
376    pub(crate) fn set_up(
377        &self,
378        cx: &mut JSContext,
379        stream: &ReadableStream,
380        global: &GlobalScope,
381    ) -> Fallible<()> {
382        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
383        if stream.is_locked() {
384            return Err(Error::Type(c"stream is locked".to_owned()));
385        }
386        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
387
388        self.generic_initialize(cx, global, stream);
389
390        // Set reader.[[readRequests]] to a new empty list.
391        self.read_requests.borrow_mut().clear();
392
393        Ok(())
394    }
395
396    /// <https://streams.spec.whatwg.org/#readable-stream-close>
397    pub(crate) fn close(&self, cx: &mut js::context::JSContext) {
398        // Resolve reader.[[closedPromise]] with undefined.
399        self.closed_promise.borrow().resolve_native(cx, &());
400        // If reader implements ReadableStreamDefaultReader,
401        // Let readRequests be reader.[[readRequests]].
402        let mut read_requests = self.take_read_requests();
403        // Set reader.[[readRequests]] to an empty list.
404        // For each readRequest of readRequests,
405        for request in read_requests.drain(0..) {
406            // Perform readRequest’s close steps.
407            request.close_steps(cx);
408        }
409    }
410
411    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
412    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
413        self.read_requests
414            .borrow_mut()
415            .push_back(read_request.clone());
416    }
417
418    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
419    pub(crate) fn get_num_read_requests(&self) -> usize {
420        self.read_requests.borrow().len()
421    }
422
423    /// <https://streams.spec.whatwg.org/#readable-stream-error>
424    pub(crate) fn error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
425        // Reject reader.[[closedPromise]] with e.
426        self.closed_promise.borrow().reject_native(cx, &e);
427
428        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
429        self.closed_promise.borrow().set_promise_is_handled(cx);
430
431        // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
432        self.error_read_requests(cx, e);
433    }
434
435    /// The removal steps of <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
436    pub(crate) fn remove_read_request(&self) -> ReadRequest {
437        self.read_requests
438            .borrow_mut()
439            .pop_front()
440            .expect("Reader must have read request when remove is called into.")
441    }
442
443    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
444    pub(crate) fn release(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
445        // Perform ! ReadableStreamReaderGenericRelease(reader).
446        self.generic_release(cx).expect("Generic release failed");
447        // Let e be a new TypeError exception.
448        rooted!(&in(cx) let mut error = UndefinedValue());
449        Error::Type(c"Reader is released".to_owned()).to_jsval(
450            cx,
451            &self.global(),
452            error.handle_mut(),
453        );
454
455        // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
456        self.error_read_requests(cx, error.handle());
457        Ok(())
458    }
459
460    fn take_read_requests(&self) -> VecDeque<ReadRequest> {
461        mem::take(&mut *self.read_requests.borrow_mut())
462    }
463
464    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreadererrorreadrequests>
465    fn error_read_requests(&self, cx: &mut js::context::JSContext, rval: SafeHandleValue) {
466        // step 1
467        let mut read_requests = self.take_read_requests();
468
469        // step 2 & 3
470        for request in read_requests.drain(0..) {
471            request.error_steps(cx, rval);
472        }
473    }
474
475    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
476    pub(crate) fn read(&self, cx: &mut js::context::JSContext, read_request: &ReadRequest) {
477        // Let stream be reader.[[stream]].
478
479        // Assert: stream is not undefined.
480        assert!(self.stream.get().is_some());
481
482        let stream = self.stream.get().unwrap();
483
484        // Set stream.[[disturbed]] to true.
485        stream.set_is_disturbed(true);
486        // If stream.[[state]] is "closed", perform readRequest’s close steps.
487        if stream.is_closed() {
488            read_request.close_steps(cx);
489        } else if stream.is_errored() {
490            // Otherwise, if stream.[[state]] is "errored",
491            // perform readRequest’s error steps given stream.[[storedError]].
492            rooted!(&in(cx) let mut error = UndefinedValue());
493            stream.get_stored_error(error.handle_mut());
494            read_request.error_steps(cx, error.handle());
495        } else {
496            // Otherwise
497            // Assert: stream.[[state]] is "readable".
498            assert!(stream.is_readable());
499            // Perform ! stream.[[controller]].[[PullSteps]](readRequest).
500            stream.perform_pull_steps(cx, read_request);
501        }
502    }
503
504    /// Attach the byte-tee error handler to this reader's closedPromise.
505    /// Used by ReadableByteStreamTee.
506    #[allow(clippy::too_many_arguments)]
507    pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
508        &self,
509        cx: &mut js::context::JSContext,
510        branch_1: &ReadableStream,
511        branch_2: &ReadableStream,
512        canceled_1: Rc<Cell<bool>>,
513        canceled_2: Rc<Cell<bool>>,
514        cancel_promise: Rc<Promise>,
515        reader_version: Rc<Cell<u64>>,
516        expected_version: u64,
517    ) {
518        // Note: for byte tee we always operate on *byte controllers*.
519        let branch_1_controller = branch_1.get_byte_controller();
520        let branch_2_controller = branch_2.get_byte_controller();
521
522        let global = self.global();
523        let handler = PromiseNativeHandler::new(
524            cx,
525            &global,
526            None,
527            Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
528                branch_1_controller: Dom::from_ref(&branch_1_controller),
529                branch_2_controller: Dom::from_ref(&branch_2_controller),
530                canceled_1,
531                canceled_2,
532                cancel_promise,
533                reader_version,
534                expected_version,
535            })),
536        );
537
538        let mut realm = enter_auto_realm(cx, &*global);
539        let cx = &mut realm.current_realm();
540
541        self.closed_promise
542            .borrow()
543            .append_native_handler(cx, &handler);
544    }
545
546    /// <https://streams.spec.whatwg.org/#ref-for-readablestreamgenericreader-closedpromise%E2%91%A1>
547    pub(crate) fn default_tee_append_native_handler_to_closed_promise(
548        &self,
549        cx: &mut js::context::JSContext,
550        branch_1: &ReadableStream,
551        branch_2: &ReadableStream,
552        canceled_1: Rc<Cell<bool>>,
553        canceled_2: Rc<Cell<bool>>,
554        cancel_promise: Rc<Promise>,
555    ) {
556        let branch_1_controller = branch_1.get_default_controller();
557
558        let branch_2_controller = branch_2.get_default_controller();
559
560        let global = self.global();
561        let handler = PromiseNativeHandler::new(
562            cx,
563            &global,
564            None,
565            Some(Box::new(DefaultTeeClosedPromiseRejectionHandler {
566                branch_1_controller: Dom::from_ref(&branch_1_controller),
567                branch_2_controller: Dom::from_ref(&branch_2_controller),
568                canceled_1,
569                canceled_2,
570                cancel_promise,
571            })),
572        );
573
574        let mut realm = enter_auto_realm(cx, &*global);
575        let cx = &mut realm.current_realm();
576
577        self.closed_promise
578            .borrow()
579            .append_native_handler(cx, &handler);
580    }
581
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes>
    ///
    /// Reads from this reader via `read_loop`. `success_steps` receives the
    /// accumulated bytes and `failure_steps` receives a JavaScript value
    /// describing the failure.
    pub(crate) fn read_all_bytes(
        &self,
        cx: &mut js::context::JSContext,
        success_steps: Rc<ReadAllBytesSuccessSteps>,
        failure_steps: Rc<ReadAllBytesFailureSteps>,
    ) {
        // To read all bytes from a ReadableStreamDefaultReader reader,
        // given successSteps, which is an algorithm accepting a byte sequence,
        // and failureSteps, which is an algorithm accepting a JavaScript value:
        // read-loop given reader, a new byte sequence, successSteps, and failureSteps.
        // The "new byte sequence" is created inside `read_loop`.
        read_loop(cx, self, success_steps, failure_steps);
    }
595
596    /// step 3 of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
597    pub(crate) fn process_read_requests(
598        &self,
599        cx: &mut js::context::JSContext,
600        controller: DomRoot<ReadableByteStreamController>,
601    ) -> Fallible<()> {
602        // While reader.[[readRequests]] is not empty,
603        while !self.read_requests.borrow().is_empty() {
604            // If controller.[[queueTotalSize]] is 0, return.
605            if controller.get_queue_total_size() == 0.0 {
606                return Ok(());
607            }
608
609            // Let readRequest be reader.[[readRequests]][0].
610            // Remove entry from controller.[[queue]].
611            let read_request = self.remove_read_request();
612
613            // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest).
614            controller
615                .fill_read_request_from_queue(cx, &read_request)
616                .expect("Fill read request from queue failed");
617        }
618        Ok(())
619    }
620}
621
622impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
623    /// <https://streams.spec.whatwg.org/#default-reader-constructor>
624    fn Constructor(
625        cx: &mut JSContext,
626        global: &GlobalScope,
627        proto: Option<SafeHandleObject>,
628        stream: &ReadableStream,
629    ) -> Fallible<DomRoot<Self>> {
630        let reader = Self::new_with_proto(cx, global, proto);
631
632        // Perform ? SetUpReadableStreamDefaultReader(this, stream).
633        reader.set_up(cx, stream, global)?;
634
635        Ok(reader)
636    }
637
638    /// <https://streams.spec.whatwg.org/#default-reader-read>
639    fn Read(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
640        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
641        if self.stream.get().is_none() {
642            rooted!(&in(cx) let mut error = UndefinedValue());
643            Error::Type(c"stream is undefined".to_owned()).to_jsval(
644                cx,
645                &self.global(),
646                error.handle_mut(),
647            );
648            return Promise::new_rejected(cx, &self.global(), error.handle());
649        }
650        // Let promise be a new promise.
651        let promise = Promise::new(cx, &self.global());
652
653        // Let readRequest be a new read request with the following items:
654        // chunk steps, given chunk
655        // Resolve promise with «[ "value" → chunk, "done" → false ]».
656        //
657        // close steps
658        // Resolve promise with «[ "value" → undefined, "done" → true ]».
659        //
660        // error steps, given e
661        // Reject promise with e.
662
663        // Rooting(unrooted_must_root): the read request contains only a promise,
664        // which does not need to be rooted,
665        // as it is safely managed natively via an Rc.
666        let read_request = ReadRequest::Read(promise.clone());
667
668        // Perform ! ReadableStreamDefaultReaderRead(this, readRequest).
669        self.read(cx, &read_request);
670
671        // Return promise.
672        promise
673    }
674
675    /// <https://streams.spec.whatwg.org/#default-reader-release-lock>
676    fn ReleaseLock(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
677        if self.stream.get().is_none() {
678            // Step 1: If this.[[stream]] is undefined, return.
679            return Ok(());
680        }
681
682        // Step 2: Perform !ReadableStreamDefaultReaderRelease(this).
683        self.release(cx)
684    }
685
686    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
687    fn Closed(&self) -> Rc<Promise> {
688        self.closed()
689    }
690
691    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
692    fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
693        self.generic_cancel(cx, &self.global(), reason)
694    }
695}
696
697impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
698    fn get_closed_promise(&self) -> Rc<Promise> {
699        self.closed_promise.borrow().clone()
700    }
701
702    fn set_closed_promise(&self, promise: Rc<Promise>) {
703        *self.closed_promise.borrow_mut() = promise;
704    }
705
706    fn set_stream(&self, stream: Option<&ReadableStream>) {
707        self.stream.set(stream);
708    }
709
710    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
711        self.stream.get()
712    }
713
714    fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
715        Some(self)
716    }
717}