Skip to main content

script/dom/stream/
readablestreamdefaultreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::Heap;
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15use script_bindings::cell::DomRefCell;
16use script_bindings::reflector::{Reflector, reflect_dom_object, reflect_dom_object_with_proto};
17
18use super::byteteereadrequest::ByteTeeReadRequest;
19use super::readablebytestreamcontroller::ReadableByteStreamController;
20use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
21    ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
22};
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::DomGlobal;
25use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
26use crate::dom::bindings::trace::RootedTraceableBox;
27use crate::dom::globalscope::GlobalScope;
28use crate::dom::promise::Promise;
29use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
30use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
31use crate::dom::stream::defaultteereadrequest::DefaultTeeReadRequest;
32use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
33use crate::dom::types::ReadableStreamDefaultController;
34use crate::realms::enter_auto_realm;
35use crate::script_runtime::CanGc;
36
37type ReadAllBytesSuccessSteps = dyn Fn(&mut js::context::JSContext, &[u8]);
38type ReadAllBytesFailureSteps = dyn Fn(&mut js::context::JSContext, SafeHandleValue);
39
40impl js::gc::Rootable for ContinueReadMicrotask {}
41
42/// Microtask handler to continue the read loop without recursion.
43/// Spec note: "This recursion could potentially cause a stack overflow
44/// if implemented directly. Implementations will need to mitigate this,
45/// e.g. by using a non-recursive variant of this algorithm, or queuing
46/// a microtask…"
47#[derive(Clone, JSTraceable, MallocSizeOf)]
48#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
49struct ContinueReadMicrotask {
50    reader: Dom<ReadableStreamDefaultReader>,
51    request: ReadRequest,
52}
53
54impl Callback for ContinueReadMicrotask {
55    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
56        // https://streams.spec.whatwg.org/#ref-for-read-loop%E2%91%A0
57        // Note: continuing the read-loop from inside a micro-task to break recursion.
58        self.reader.read(cx, &self.request);
59    }
60}
61
62/// <https://streams.spec.whatwg.org/#read-loop>
63fn read_loop(
64    cx: &mut js::context::JSContext,
65    reader: &ReadableStreamDefaultReader,
66    success_steps: Rc<ReadAllBytesSuccessSteps>,
67    failure_steps: Rc<ReadAllBytesFailureSteps>,
68) {
69    // For the purposes of the above algorithm, to read-loop given reader,
70    // bytes, successSteps, and failureSteps:
71
72    // Step 1 .Let readRequest be a new read request with the following items:
73    let req = ReadRequest::ReadLoop {
74        success_steps,
75        failure_steps,
76        reader: Dom::from_ref(reader),
77        bytes: Rc::new(DomRefCell::new(Vec::new())),
78    };
79    // Step 2 .Perform ! ReadableStreamDefaultReaderRead(reader, readRequest).
80    reader.read(cx, &req);
81}
82
83/// <https://streams.spec.whatwg.org/#read-request>
84#[derive(Clone, JSTraceable, MallocSizeOf)]
85pub(crate) enum ReadRequest {
86    /// <https://streams.spec.whatwg.org/#default-reader-read>
87    Read(#[conditional_malloc_size_of] Rc<Promise>),
88    /// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
89    DefaultTee {
90        tee_read_request: Dom<DefaultTeeReadRequest>,
91    },
92    /// Spec read loop variant, driven by read-request steps (no Promise).
93    /// <https://streams.spec.whatwg.org/#read-loop>
94    ReadLoop {
95        #[ignore_malloc_size_of = "dyn Fn"]
96        #[no_trace]
97        success_steps: Rc<ReadAllBytesSuccessSteps>,
98        #[ignore_malloc_size_of = "dyn Fn"]
99        #[no_trace]
100        failure_steps: Rc<ReadAllBytesFailureSteps>,
101        reader: Dom<ReadableStreamDefaultReader>,
102        #[conditional_malloc_size_of]
103        bytes: Rc<DomRefCell<Vec<u8>>>,
104    },
105    ByteTee {
106        byte_tee_read_request: Dom<ByteTeeReadRequest>,
107    },
108}
109
110impl ReadRequest {
111    /// <https://streams.spec.whatwg.org/#read-request-chunk-steps>
112    pub(crate) fn chunk_steps(
113        &self,
114        cx: &mut js::context::JSContext,
115        chunk: RootedTraceableBox<Heap<JSVal>>,
116        global: &GlobalScope,
117    ) {
118        match self {
119            ReadRequest::Read(promise) => {
120                // chunk steps, given chunk
121                // Resolve promise with «[ "value" → chunk, "done" → false ]».
122                promise.resolve_native(
123                    &ReadableStreamReadResult {
124                        done: Some(false),
125                        value: chunk,
126                    },
127                    CanGc::from_cx(cx),
128                );
129            },
130            ReadRequest::DefaultTee { tee_read_request } => {
131                tee_read_request.enqueue_chunk_steps(chunk);
132            },
133            ReadRequest::ByteTee {
134                byte_tee_read_request,
135            } => {
136                byte_tee_read_request.enqueue_chunk_steps(global, chunk);
137            },
138            ReadRequest::ReadLoop {
139                success_steps: _,
140                failure_steps,
141                reader,
142                bytes,
143            } => {
144                // Spec: chunk steps, given chunk
145                let global = reader.global();
146
147                match bytes_from_chunk_jsval(cx, &chunk) {
148                    Ok(vec) => {
149                        // Step 2. Append the bytes represented by chunk to bytes.
150                        bytes.borrow_mut().extend_from_slice(&vec);
151
152                        // Step 3. Read-loop given reader, bytes, successSteps, and failureSteps.
153                        // Spec note: Avoid direct recursion; queue into a microtask.
154                        // Resolving the promise will queue a microtask to call into the native handler.
155                        let tick = Promise::new2(cx, &global);
156                        tick.resolve_native_with_cx(cx, &());
157
158                        let handler = PromiseNativeHandler::new(
159                            cx,
160                            &global,
161                            Some(Box::new(ContinueReadMicrotask {
162                                reader: Dom::from_ref(reader),
163                                request: self.clone(),
164                            })),
165                            None,
166                        );
167
168                        let mut realm = enter_auto_realm(cx, &*global);
169                        let cx = &mut realm.current_realm();
170                        tick.append_native_handler(cx, &handler);
171                    },
172                    Err(err) => {
173                        // Step 1. If chunk is not a Uint8Array object, call failureSteps with a TypeError and abort.
174                        rooted!(&in(cx) let mut v = UndefinedValue());
175                        err.to_jsval(cx.into(), &global, v.handle_mut(), CanGc::from_cx(cx));
176                        (failure_steps)(cx, v.handle());
177                    },
178                }
179            },
180        }
181    }
182
183    /// <https://streams.spec.whatwg.org/#read-request-close-steps>
184    pub(crate) fn close_steps(&self, cx: &mut js::context::JSContext) {
185        match self {
186            ReadRequest::Read(promise) => {
187                // close steps
188                // Resolve promise with «[ "value" → undefined, "done" → true ]».
189                let result = RootedTraceableBox::new(Heap::default());
190                result.set(UndefinedValue());
191                promise.resolve_native(
192                    &ReadableStreamReadResult {
193                        done: Some(true),
194                        value: result,
195                    },
196                    CanGc::from_cx(cx),
197                );
198            },
199            ReadRequest::DefaultTee { tee_read_request } => {
200                tee_read_request.close_steps(cx);
201            },
202            ReadRequest::ByteTee {
203                byte_tee_read_request,
204            } => {
205                byte_tee_read_request
206                    .close_steps(cx)
207                    .expect("ByteTeeReadRequest close steps should not fail");
208            },
209            ReadRequest::ReadLoop {
210                success_steps,
211                reader,
212                bytes,
213                ..
214            } => {
215                // Step 1. Call successSteps with bytes.
216                (success_steps)(cx, &bytes.borrow());
217
218                reader
219                    .release(cx)
220                    .expect("Releasing the read-all-bytes reader should succeed");
221            },
222        }
223    }
224
225    /// <https://streams.spec.whatwg.org/#read-request-error-steps>
226    pub(crate) fn error_steps(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
227        match self {
228            ReadRequest::Read(promise) => {
229                // error steps, given e
230                // Reject promise with e.
231                promise.reject_native(&e, CanGc::from_cx(cx))
232            },
233            ReadRequest::DefaultTee { tee_read_request } => {
234                tee_read_request.error_steps();
235            },
236            ReadRequest::ByteTee {
237                byte_tee_read_request,
238            } => {
239                byte_tee_read_request.error_steps();
240            },
241            ReadRequest::ReadLoop {
242                failure_steps,
243                reader,
244                ..
245            } => {
246                // Step 1. Call failureSteps with e.
247                (failure_steps)(cx, e);
248
249                reader
250                    .release(cx)
251                    .expect("Releasing the read-all-bytes reader should succeed");
252            },
253        }
254    }
255}
256
257/// The rejection handler for
258/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
259#[derive(Clone, JSTraceable, MallocSizeOf)]
260#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
261struct ByteTeeClosedPromiseRejectionHandler {
262    branch_1_controller: Dom<ReadableByteStreamController>,
263    branch_2_controller: Dom<ReadableByteStreamController>,
264    #[conditional_malloc_size_of]
265    canceled_1: Rc<Cell<bool>>,
266    #[conditional_malloc_size_of]
267    canceled_2: Rc<Cell<bool>>,
268    #[conditional_malloc_size_of]
269    cancel_promise: Rc<Promise>,
270    #[conditional_malloc_size_of]
271    reader_version: Rc<Cell<u64>>,
272    expected_version: u64,
273}
274
275impl Callback for ByteTeeClosedPromiseRejectionHandler {
276    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
277    /// Upon rejection of reader.[[closedPromise]] with reason r,
278    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
279        // If thisReader is not the current `reader`, return.
280        if self.reader_version.get() != self.expected_version {
281            return;
282        }
283
284        // Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
285        self.branch_1_controller.error(cx, v);
286
287        // Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
288        self.branch_2_controller.error(cx, v);
289
290        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
291        if !self.canceled_1.get() || !self.canceled_2.get() {
292            self.cancel_promise.resolve_native_with_cx(cx, &());
293        }
294    }
295}
296
297/// The rejection handler for
298/// <https://streams.spec.whatwg.org/#readable-stream-tee>
299#[derive(Clone, JSTraceable, MallocSizeOf)]
300#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
301struct DefaultTeeClosedPromiseRejectionHandler {
302    branch_1_controller: Dom<ReadableStreamDefaultController>,
303    branch_2_controller: Dom<ReadableStreamDefaultController>,
304    #[conditional_malloc_size_of]
305    canceled_1: Rc<Cell<bool>>,
306    #[conditional_malloc_size_of]
307    canceled_2: Rc<Cell<bool>>,
308    #[conditional_malloc_size_of]
309    cancel_promise: Rc<Promise>,
310}
311
312impl Callback for DefaultTeeClosedPromiseRejectionHandler {
313    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
314    /// Upon rejection of reader.[[closedPromise]] with reason r,
315    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
316        // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], r).
317        self.branch_1_controller.error(cx, v);
318        // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], r).
319        self.branch_2_controller.error(cx, v);
320
321        // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined.
322        if !self.canceled_1.get() || !self.canceled_2.get() {
323            self.cancel_promise.resolve_native_with_cx(cx, &());
324        }
325    }
326}
327
328/// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
329#[dom_struct]
330pub(crate) struct ReadableStreamDefaultReader {
331    reflector_: Reflector,
332
333    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
334    stream: MutNullableDom<ReadableStream>,
335
336    read_requests: DomRefCell<VecDeque<ReadRequest>>,
337
338    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
339    #[conditional_malloc_size_of]
340    closed_promise: DomRefCell<Rc<Promise>>,
341}
342
343impl ReadableStreamDefaultReader {
344    fn new_with_proto(
345        global: &GlobalScope,
346        proto: Option<SafeHandleObject>,
347        can_gc: CanGc,
348    ) -> DomRoot<ReadableStreamDefaultReader> {
349        reflect_dom_object_with_proto(
350            Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)),
351            global,
352            proto,
353            can_gc,
354        )
355    }
356
357    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamDefaultReader {
358        ReadableStreamDefaultReader {
359            reflector_: Reflector::new(),
360            stream: MutNullableDom::new(None),
361            read_requests: DomRefCell::new(Default::default()),
362            closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
363        }
364    }
365
366    pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamDefaultReader> {
367        reflect_dom_object(
368            Box::new(Self::new_inherited(global, can_gc)),
369            global,
370            can_gc,
371        )
372    }
373
374    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-reader>
375    pub(crate) fn set_up(
376        &self,
377        stream: &ReadableStream,
378        global: &GlobalScope,
379        can_gc: CanGc,
380    ) -> Fallible<()> {
381        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
382        if stream.is_locked() {
383            return Err(Error::Type(c"stream is locked".to_owned()));
384        }
385        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
386
387        self.generic_initialize(global, stream, can_gc);
388
389        // Set reader.[[readRequests]] to a new empty list.
390        self.read_requests.borrow_mut().clear();
391
392        Ok(())
393    }
394
395    /// <https://streams.spec.whatwg.org/#readable-stream-close>
396    pub(crate) fn close(&self, cx: &mut js::context::JSContext) {
397        // Resolve reader.[[closedPromise]] with undefined.
398        self.closed_promise.borrow().resolve_native_with_cx(cx, &());
399        // If reader implements ReadableStreamDefaultReader,
400        // Let readRequests be reader.[[readRequests]].
401        let mut read_requests = self.take_read_requests();
402        // Set reader.[[readRequests]] to an empty list.
403        // For each readRequest of readRequests,
404        for request in read_requests.drain(0..) {
405            // Perform readRequest’s close steps.
406            request.close_steps(cx);
407        }
408    }
409
410    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
411    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
412        self.read_requests
413            .borrow_mut()
414            .push_back(read_request.clone());
415    }
416
417    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
418    pub(crate) fn get_num_read_requests(&self) -> usize {
419        self.read_requests.borrow().len()
420    }
421
422    /// <https://streams.spec.whatwg.org/#readable-stream-error>
423    pub(crate) fn error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
424        // Reject reader.[[closedPromise]] with e.
425        self.closed_promise.borrow().reject_native_with_cx(cx, &e);
426
427        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
428        self.closed_promise.borrow().set_promise_is_handled();
429
430        // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
431        self.error_read_requests(cx, e);
432    }
433
434    /// The removal steps of <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
435    pub(crate) fn remove_read_request(&self) -> ReadRequest {
436        self.read_requests
437            .borrow_mut()
438            .pop_front()
439            .expect("Reader must have read request when remove is called into.")
440    }
441
442    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
443    pub(crate) fn release(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
444        // Perform ! ReadableStreamReaderGenericRelease(reader).
445        self.generic_release(CanGc::from_cx(cx))
446            .expect("Generic release failed");
447        // Let e be a new TypeError exception.
448        rooted!(&in(cx) let mut error = UndefinedValue());
449        Error::Type(c"Reader is released".to_owned()).to_jsval(
450            cx.into(),
451            &self.global(),
452            error.handle_mut(),
453            CanGc::from_cx(cx),
454        );
455
456        // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
457        self.error_read_requests(cx, error.handle());
458        Ok(())
459    }
460
461    fn take_read_requests(&self) -> VecDeque<ReadRequest> {
462        mem::take(&mut *self.read_requests.borrow_mut())
463    }
464
465    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreadererrorreadrequests>
466    fn error_read_requests(&self, cx: &mut js::context::JSContext, rval: SafeHandleValue) {
467        // step 1
468        let mut read_requests = self.take_read_requests();
469
470        // step 2 & 3
471        for request in read_requests.drain(0..) {
472            request.error_steps(cx, rval);
473        }
474    }
475
476    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
477    pub(crate) fn read(&self, cx: &mut js::context::JSContext, read_request: &ReadRequest) {
478        // Let stream be reader.[[stream]].
479
480        // Assert: stream is not undefined.
481        assert!(self.stream.get().is_some());
482
483        let stream = self.stream.get().unwrap();
484
485        // Set stream.[[disturbed]] to true.
486        stream.set_is_disturbed(true);
487        // If stream.[[state]] is "closed", perform readRequest’s close steps.
488        if stream.is_closed() {
489            read_request.close_steps(cx);
490        } else if stream.is_errored() {
491            // Otherwise, if stream.[[state]] is "errored",
492            // perform readRequest’s error steps given stream.[[storedError]].
493            rooted!(&in(cx) let mut error = UndefinedValue());
494            stream.get_stored_error(error.handle_mut());
495            read_request.error_steps(cx, error.handle());
496        } else {
497            // Otherwise
498            // Assert: stream.[[state]] is "readable".
499            assert!(stream.is_readable());
500            // Perform ! stream.[[controller]].[[PullSteps]](readRequest).
501            stream.perform_pull_steps(cx, read_request);
502        }
503    }
504
505    /// Attach the byte-tee error handler to this reader's closedPromise.
506    /// Used by ReadableByteStreamTee.
507    #[allow(clippy::too_many_arguments)]
508    pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
509        &self,
510        cx: &mut js::context::JSContext,
511        branch_1: &ReadableStream,
512        branch_2: &ReadableStream,
513        canceled_1: Rc<Cell<bool>>,
514        canceled_2: Rc<Cell<bool>>,
515        cancel_promise: Rc<Promise>,
516        reader_version: Rc<Cell<u64>>,
517        expected_version: u64,
518    ) {
519        // Note: for byte tee we always operate on *byte controllers*.
520        let branch_1_controller = branch_1.get_byte_controller();
521        let branch_2_controller = branch_2.get_byte_controller();
522
523        let global = self.global();
524        let handler = PromiseNativeHandler::new(
525            cx,
526            &global,
527            None,
528            Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
529                branch_1_controller: Dom::from_ref(&branch_1_controller),
530                branch_2_controller: Dom::from_ref(&branch_2_controller),
531                canceled_1,
532                canceled_2,
533                cancel_promise,
534                reader_version,
535                expected_version,
536            })),
537        );
538
539        let mut realm = enter_auto_realm(cx, &*global);
540        let cx = &mut realm.current_realm();
541
542        self.closed_promise
543            .borrow()
544            .append_native_handler(cx, &handler);
545    }
546
547    /// <https://streams.spec.whatwg.org/#ref-for-readablestreamgenericreader-closedpromise%E2%91%A1>
548    pub(crate) fn default_tee_append_native_handler_to_closed_promise(
549        &self,
550        cx: &mut js::context::JSContext,
551        branch_1: &ReadableStream,
552        branch_2: &ReadableStream,
553        canceled_1: Rc<Cell<bool>>,
554        canceled_2: Rc<Cell<bool>>,
555        cancel_promise: Rc<Promise>,
556    ) {
557        let branch_1_controller = branch_1.get_default_controller();
558
559        let branch_2_controller = branch_2.get_default_controller();
560
561        let global = self.global();
562        let handler = PromiseNativeHandler::new(
563            cx,
564            &global,
565            None,
566            Some(Box::new(DefaultTeeClosedPromiseRejectionHandler {
567                branch_1_controller: Dom::from_ref(&branch_1_controller),
568                branch_2_controller: Dom::from_ref(&branch_2_controller),
569                canceled_1,
570                canceled_2,
571                cancel_promise,
572            })),
573        );
574
575        let mut realm = enter_auto_realm(cx, &*global);
576        let cx = &mut realm.current_realm();
577
578        self.closed_promise
579            .borrow()
580            .append_native_handler(cx, &handler);
581    }
582
583    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes>
584    pub(crate) fn read_all_bytes(
585        &self,
586        cx: &mut js::context::JSContext,
587        success_steps: Rc<ReadAllBytesSuccessSteps>,
588        failure_steps: Rc<ReadAllBytesFailureSteps>,
589    ) {
590        // To read all bytes from a ReadableStreamDefaultReader reader,
591        // given successSteps, which is an algorithm accepting a byte sequence,
592        // and failureSteps, which is an algorithm accepting a JavaScript value:
593        // read-loop given reader, a new byte sequence, successSteps, and failureSteps.
594        read_loop(cx, self, success_steps, failure_steps);
595    }
596
597    /// step 3 of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
598    pub(crate) fn process_read_requests(
599        &self,
600        cx: &mut js::context::JSContext,
601        controller: DomRoot<ReadableByteStreamController>,
602    ) -> Fallible<()> {
603        // While reader.[[readRequests]] is not empty,
604        while !self.read_requests.borrow().is_empty() {
605            // If controller.[[queueTotalSize]] is 0, return.
606            if controller.get_queue_total_size() == 0.0 {
607                return Ok(());
608            }
609
610            // Let readRequest be reader.[[readRequests]][0].
611            // Remove entry from controller.[[queue]].
612            let read_request = self.remove_read_request();
613
614            // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest).
615            controller
616                .fill_read_request_from_queue(cx, &read_request)
617                .expect("Fill read request from queue failed");
618        }
619        Ok(())
620    }
621}
622
623impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
624    /// <https://streams.spec.whatwg.org/#default-reader-constructor>
625    fn Constructor(
626        global: &GlobalScope,
627        proto: Option<SafeHandleObject>,
628        can_gc: CanGc,
629        stream: &ReadableStream,
630    ) -> Fallible<DomRoot<Self>> {
631        let reader = Self::new_with_proto(global, proto, can_gc);
632
633        // Perform ? SetUpReadableStreamDefaultReader(this, stream).
634        Self::set_up(&reader, stream, global, can_gc)?;
635
636        Ok(reader)
637    }
638
639    /// <https://streams.spec.whatwg.org/#default-reader-read>
640    fn Read(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
641        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
642        if self.stream.get().is_none() {
643            rooted!(&in(cx) let mut error = UndefinedValue());
644            Error::Type(c"stream is undefined".to_owned()).to_jsval(
645                cx.into(),
646                &self.global(),
647                error.handle_mut(),
648                CanGc::from_cx(cx),
649            );
650            return Promise::new_rejected(
651                &self.global(),
652                cx.into(),
653                error.handle(),
654                CanGc::from_cx(cx),
655            );
656        }
657        // Let promise be a new promise.
658        let promise = Promise::new2(cx, &self.global());
659
660        // Let readRequest be a new read request with the following items:
661        // chunk steps, given chunk
662        // Resolve promise with «[ "value" → chunk, "done" → false ]».
663        //
664        // close steps
665        // Resolve promise with «[ "value" → undefined, "done" → true ]».
666        //
667        // error steps, given e
668        // Reject promise with e.
669
670        // Rooting(unrooted_must_root): the read request contains only a promise,
671        // which does not need to be rooted,
672        // as it is safely managed natively via an Rc.
673        let read_request = ReadRequest::Read(promise.clone());
674
675        // Perform ! ReadableStreamDefaultReaderRead(this, readRequest).
676        self.read(cx, &read_request);
677
678        // Return promise.
679        promise
680    }
681
682    /// <https://streams.spec.whatwg.org/#default-reader-release-lock>
683    fn ReleaseLock(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
684        if self.stream.get().is_none() {
685            // Step 1: If this.[[stream]] is undefined, return.
686            return Ok(());
687        }
688
689        // Step 2: Perform !ReadableStreamDefaultReaderRelease(this).
690        self.release(cx)
691    }
692
693    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
694    fn Closed(&self) -> Rc<Promise> {
695        self.closed()
696    }
697
698    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
699    fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
700        self.generic_cancel(cx, &self.global(), reason)
701    }
702}
703
704impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
705    fn get_closed_promise(&self) -> Rc<Promise> {
706        self.closed_promise.borrow().clone()
707    }
708
709    fn set_closed_promise(&self, promise: Rc<Promise>) {
710        *self.closed_promise.borrow_mut() = promise;
711    }
712
713    fn set_stream(&self, stream: Option<&ReadableStream>) {
714        self.stream.set(stream);
715    }
716
717    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
718        self.stream.get()
719    }
720
721    fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
722        Some(self)
723    }
724}