script/dom/stream/
readablestreamdefaultreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::Heap;
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15
16use super::byteteereadrequest::ByteTeeReadRequest;
17use super::readablebytestreamcontroller::ReadableByteStreamController;
18use crate::dom::bindings::cell::DomRefCell;
19use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
20    ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
21};
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{
24    DomGlobal, Reflector, reflect_dom_object, reflect_dom_object_with_proto,
25};
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
32use crate::dom::stream::defaultteereadrequest::DefaultTeeReadRequest;
33use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
34use crate::dom::types::ReadableStreamDefaultController;
35use crate::realms::{InRealm, enter_realm};
36use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
37
/// Success callback of the "read all bytes" algorithm; it is handed the
/// accumulated bytes as a slice.
type ReadAllBytesSuccessSteps = dyn Fn(&[u8]);
/// Failure callback of the "read all bytes" algorithm; it is handed the JS
/// context and the JS value describing the failure.
type ReadAllBytesFailureSteps = dyn Fn(SafeJSContext, SafeHandleValue);
40
// Empty impl: `ContinueReadMicrotask` implements `js::gc::Rootable` without
// overriding anything.
impl js::gc::Rootable for ContinueReadMicrotask {}

/// Microtask handler to continue the read loop without recursion.
/// Spec note: "This recursion could potentially cause a stack overflow
/// if implemented directly. Implementations will need to mitigate this,
/// e.g. by using a non-recursive variant of this algorithm, or queuing
/// a microtask…"
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ContinueReadMicrotask {
    /// The reader on which the next read is issued when the handler runs.
    reader: Dom<ReadableStreamDefaultReader>,
    /// The read request passed to that read.
    request: ReadRequest,
}
54
55impl Callback for ContinueReadMicrotask {
56    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
57        let can_gc = CanGc::from_cx(cx);
58        // https://streams.spec.whatwg.org/#ref-for-read-loop%E2%91%A0
59        // Note: continuing the read-loop from inside a micro-task to break recursion.
60        self.reader.read(cx.into(), &self.request, can_gc);
61    }
62}
63
64/// <https://streams.spec.whatwg.org/#read-loop>
65fn read_loop(
66    reader: &ReadableStreamDefaultReader,
67    cx: SafeJSContext,
68    success_steps: Rc<ReadAllBytesSuccessSteps>,
69    failure_steps: Rc<ReadAllBytesFailureSteps>,
70    can_gc: CanGc,
71) {
72    // For the purposes of the above algorithm, to read-loop given reader,
73    // bytes, successSteps, and failureSteps:
74
75    // Step 1 .Let readRequest be a new read request with the following items:
76    let req = ReadRequest::ReadLoop {
77        success_steps,
78        failure_steps,
79        reader: Dom::from_ref(reader),
80        bytes: Rc::new(DomRefCell::new(Vec::new())),
81    };
82    // Step 2 .Perform ! ReadableStreamDefaultReaderRead(reader, readRequest).
83    reader.read(cx, &req, can_gc);
84}
85
/// <https://streams.spec.whatwg.org/#read-request>
///
/// Each variant supplies its own chunk, close and error steps; see
/// `chunk_steps`, `close_steps` and `error_steps`.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub(crate) enum ReadRequest {
    /// <https://streams.spec.whatwg.org/#default-reader-read>
    /// Settles the wrapped promise with the read result.
    Read(#[conditional_malloc_size_of] Rc<Promise>),
    /// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
    /// All steps are forwarded to the wrapped `DefaultTeeReadRequest`.
    DefaultTee {
        tee_read_request: Dom<DefaultTeeReadRequest>,
    },
    /// Spec read loop variant, driven by read-request steps (no Promise).
    /// <https://streams.spec.whatwg.org/#read-loop>
    ReadLoop {
        /// Called with the accumulated bytes by the close steps.
        #[ignore_malloc_size_of = "Rc is hard"]
        #[no_trace]
        success_steps: Rc<ReadAllBytesSuccessSteps>,
        /// Called with a JS value when a chunk cannot be converted to bytes,
        /// or by the error steps.
        #[ignore_malloc_size_of = "Rc is hard"]
        #[no_trace]
        failure_steps: Rc<ReadAllBytesFailureSteps>,
        /// The reader being drained; released by the close and error steps.
        reader: Dom<ReadableStreamDefaultReader>,
        /// Bytes accumulated so far; shared by every clone of this request.
        #[conditional_malloc_size_of]
        bytes: Rc<DomRefCell<Vec<u8>>>,
    },
    /// All steps are forwarded to the wrapped `ByteTeeReadRequest`.
    ByteTee {
        byte_tee_read_request: Dom<ByteTeeReadRequest>,
    },
}
112
113impl ReadRequest {
114    /// <https://streams.spec.whatwg.org/#read-request-chunk-steps>
115    pub(crate) fn chunk_steps(
116        &self,
117        chunk: RootedTraceableBox<Heap<JSVal>>,
118        global: &GlobalScope,
119        can_gc: CanGc,
120    ) {
121        match self {
122            ReadRequest::Read(promise) => {
123                // chunk steps, given chunk
124                // Resolve promise with «[ "value" → chunk, "done" → false ]».
125                promise.resolve_native(
126                    &ReadableStreamReadResult {
127                        done: Some(false),
128                        value: chunk,
129                    },
130                    can_gc,
131                );
132            },
133            ReadRequest::DefaultTee { tee_read_request } => {
134                tee_read_request.enqueue_chunk_steps(chunk);
135            },
136            ReadRequest::ByteTee {
137                byte_tee_read_request,
138            } => {
139                byte_tee_read_request.enqueue_chunk_steps(global, chunk);
140            },
141            ReadRequest::ReadLoop {
142                success_steps: _,
143                failure_steps,
144                reader,
145                bytes,
146            } => {
147                // Spec: chunk steps, given chunk
148                let cx = GlobalScope::get_cx();
149                let global = reader.global();
150
151                match bytes_from_chunk_jsval(cx, &chunk, can_gc) {
152                    Ok(vec) => {
153                        // Step 2. Append the bytes represented by chunk to bytes.
154                        bytes.borrow_mut().extend_from_slice(&vec);
155
156                        // Step 3. Read-loop given reader, bytes, successSteps, and failureSteps.
157                        // Spec note: Avoid direct recursion; queue into a microtask.
158                        // Resolving the promise will queue a microtask to call into the native handler.
159                        let tick = Promise::new(&global, can_gc);
160                        tick.resolve_native(&(), can_gc);
161
162                        let handler = PromiseNativeHandler::new(
163                            &global,
164                            Some(Box::new(ContinueReadMicrotask {
165                                reader: Dom::from_ref(reader),
166                                request: self.clone(),
167                            })),
168                            None,
169                            can_gc,
170                        );
171
172                        let realm = enter_realm(&*global);
173                        let comp = InRealm::Entered(&realm);
174                        tick.append_native_handler(&handler, comp, can_gc);
175                    },
176                    Err(err) => {
177                        // Step 1. If chunk is not a Uint8Array object, call failureSteps with a TypeError and abort.
178                        rooted!(in(*cx) let mut v = UndefinedValue());
179                        err.to_jsval(cx, &global, v.handle_mut(), can_gc);
180                        (failure_steps)(cx, v.handle());
181                    },
182                }
183            },
184        }
185    }
186
187    /// <https://streams.spec.whatwg.org/#read-request-close-steps>
188    pub(crate) fn close_steps(&self, can_gc: CanGc) {
189        match self {
190            ReadRequest::Read(promise) => {
191                // close steps
192                // Resolve promise with «[ "value" → undefined, "done" → true ]».
193                let result = RootedTraceableBox::new(Heap::default());
194                result.set(UndefinedValue());
195                promise.resolve_native(
196                    &ReadableStreamReadResult {
197                        done: Some(true),
198                        value: result,
199                    },
200                    can_gc,
201                );
202            },
203            ReadRequest::DefaultTee { tee_read_request } => {
204                tee_read_request.close_steps(can_gc);
205            },
206            ReadRequest::ByteTee {
207                byte_tee_read_request,
208            } => {
209                byte_tee_read_request
210                    .close_steps(can_gc)
211                    .expect("ByteTeeReadRequest close steps should not fail");
212            },
213            ReadRequest::ReadLoop {
214                success_steps,
215                reader,
216                bytes,
217                ..
218            } => {
219                // Step 1. Call successSteps with bytes.
220                (success_steps)(&bytes.borrow());
221
222                reader
223                    .release(can_gc)
224                    .expect("Releasing the read-all-bytes reader should succeed");
225            },
226        }
227    }
228
229    /// <https://streams.spec.whatwg.org/#read-request-error-steps>
230    pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
231        match self {
232            ReadRequest::Read(promise) => {
233                // error steps, given e
234                // Reject promise with e.
235                promise.reject_native(&e, can_gc)
236            },
237            ReadRequest::DefaultTee { tee_read_request } => {
238                tee_read_request.error_steps();
239            },
240            ReadRequest::ByteTee {
241                byte_tee_read_request,
242            } => {
243                byte_tee_read_request.error_steps();
244            },
245            ReadRequest::ReadLoop {
246                failure_steps,
247                reader,
248                ..
249            } => {
250                // Step 1. Call failureSteps with e.
251                let cx = GlobalScope::get_cx();
252                (failure_steps)(cx, e);
253
254                reader
255                    .release(can_gc)
256                    .expect("Releasing the read-all-bytes reader should succeed");
257            },
258        }
259    }
260}
261
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ByteTeeClosedPromiseRejectionHandler {
    /// Controller of the first branch; errored with the rejection reason.
    branch_1_controller: Dom<ReadableByteStreamController>,
    /// Controller of the second branch; errored with the rejection reason.
    branch_2_controller: Dom<ReadableByteStreamController>,
    /// Shared `canceled1` flag of the tee operation.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// Shared `canceled2` flag of the tee operation.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// Resolved with undefined unless both branches are already canceled.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
    /// Shared version counter, compared against `expected_version` when the
    /// handler runs.
    #[conditional_malloc_size_of]
    reader_version: Rc<Cell<u64>>,
    /// Version captured when the handler was created; the handler does
    /// nothing if `reader_version` no longer equals it.
    expected_version: u64,
}
279
280impl Callback for ByteTeeClosedPromiseRejectionHandler {
281    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
282    /// Upon rejection of reader.[[closedPromise]] with reason r,
283    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
284        let can_gc = CanGc::from_cx(cx);
285        // If thisReader is not the current `reader`, return.
286        if self.reader_version.get() != self.expected_version {
287            return;
288        }
289
290        // Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
291        self.branch_1_controller.error(v, can_gc);
292
293        // Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
294        self.branch_2_controller.error(v, can_gc);
295
296        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
297        if !self.canceled_1.get() || !self.canceled_2.get() {
298            self.cancel_promise.resolve_native(&(), can_gc);
299        }
300    }
301}
302
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#readable-stream-tee>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct DefaultTeeClosedPromiseRejectionHandler {
    /// Controller of the first branch; errored with the rejection reason.
    branch_1_controller: Dom<ReadableStreamDefaultController>,
    /// Controller of the second branch; errored with the rejection reason.
    branch_2_controller: Dom<ReadableStreamDefaultController>,
    /// Shared `canceled1` flag of the tee operation.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// Shared `canceled2` flag of the tee operation.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// Resolved with undefined unless both branches are already canceled.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
}
317
318impl Callback for DefaultTeeClosedPromiseRejectionHandler {
319    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
320    /// Upon rejection of reader.[[closedPromise]] with reason r,
321    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
322        let can_gc = CanGc::from_cx(cx);
323        // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], r).
324        self.branch_1_controller.error(v, can_gc);
325        // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], r).
326        self.branch_2_controller.error(v, can_gc);
327
328        // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined.
329        if !self.canceled_1.get() || !self.canceled_2.get() {
330            self.cancel_promise.resolve_native(&(), can_gc);
331        }
332    }
333}
334
/// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
#[dom_struct]
pub(crate) struct ReadableStreamDefaultReader {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
    stream: MutNullableDom<ReadableStream>,

    /// The reader's pending read requests, oldest first: requests are pushed
    /// at the back and removed from the front.
    read_requests: DomRefCell<VecDeque<ReadRequest>>,

    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
    #[conditional_malloc_size_of]
    closed_promise: DomRefCell<Rc<Promise>>,
}
349
350impl ReadableStreamDefaultReader {
351    fn new_with_proto(
352        global: &GlobalScope,
353        proto: Option<SafeHandleObject>,
354        can_gc: CanGc,
355    ) -> DomRoot<ReadableStreamDefaultReader> {
356        reflect_dom_object_with_proto(
357            Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)),
358            global,
359            proto,
360            can_gc,
361        )
362    }
363
364    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamDefaultReader {
365        ReadableStreamDefaultReader {
366            reflector_: Reflector::new(),
367            stream: MutNullableDom::new(None),
368            read_requests: DomRefCell::new(Default::default()),
369            closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
370        }
371    }
372
373    pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamDefaultReader> {
374        reflect_dom_object(
375            Box::new(Self::new_inherited(global, can_gc)),
376            global,
377            can_gc,
378        )
379    }
380
381    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-reader>
382    pub(crate) fn set_up(
383        &self,
384        stream: &ReadableStream,
385        global: &GlobalScope,
386        can_gc: CanGc,
387    ) -> Fallible<()> {
388        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
389        if stream.is_locked() {
390            return Err(Error::Type("stream is locked".to_owned()));
391        }
392        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
393
394        self.generic_initialize(global, stream, can_gc);
395
396        // Set reader.[[readRequests]] to a new empty list.
397        self.read_requests.borrow_mut().clear();
398
399        Ok(())
400    }
401
402    /// <https://streams.spec.whatwg.org/#readable-stream-close>
403    pub(crate) fn close(&self, can_gc: CanGc) {
404        // Resolve reader.[[closedPromise]] with undefined.
405        self.closed_promise.borrow().resolve_native(&(), can_gc);
406        // If reader implements ReadableStreamDefaultReader,
407        // Let readRequests be reader.[[readRequests]].
408        let mut read_requests = self.take_read_requests();
409        // Set reader.[[readRequests]] to an empty list.
410        // For each readRequest of readRequests,
411        for request in read_requests.drain(0..) {
412            // Perform readRequest’s close steps.
413            request.close_steps(can_gc);
414        }
415    }
416
417    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
418    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
419        self.read_requests
420            .borrow_mut()
421            .push_back(read_request.clone());
422    }
423
424    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
425    pub(crate) fn get_num_read_requests(&self) -> usize {
426        self.read_requests.borrow().len()
427    }
428
429    /// <https://streams.spec.whatwg.org/#readable-stream-error>
430    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
431        // Reject reader.[[closedPromise]] with e.
432        self.closed_promise.borrow().reject_native(&e, can_gc);
433
434        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
435        self.closed_promise.borrow().set_promise_is_handled();
436
437        // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
438        self.error_read_requests(e, can_gc);
439    }
440
441    /// The removal steps of <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
442    pub(crate) fn remove_read_request(&self) -> ReadRequest {
443        self.read_requests
444            .borrow_mut()
445            .pop_front()
446            .expect("Reader must have read request when remove is called into.")
447    }
448
449    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
450    pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
451        // Perform ! ReadableStreamReaderGenericRelease(reader).
452        self.generic_release(can_gc)
453            .expect("Generic release failed");
454        // Let e be a new TypeError exception.
455        let cx = GlobalScope::get_cx();
456        rooted!(in(*cx) let mut error = UndefinedValue());
457        Error::Type("Reader is released".to_owned()).to_jsval(
458            cx,
459            &self.global(),
460            error.handle_mut(),
461            can_gc,
462        );
463
464        // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
465        self.error_read_requests(error.handle(), can_gc);
466        Ok(())
467    }
468
469    fn take_read_requests(&self) -> VecDeque<ReadRequest> {
470        mem::take(&mut *self.read_requests.borrow_mut())
471    }
472
473    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreadererrorreadrequests>
474    fn error_read_requests(&self, rval: SafeHandleValue, can_gc: CanGc) {
475        // step 1
476        let mut read_requests = self.take_read_requests();
477
478        // step 2 & 3
479        for request in read_requests.drain(0..) {
480            request.error_steps(rval, can_gc);
481        }
482    }
483
484    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
485    pub(crate) fn read(&self, cx: SafeJSContext, read_request: &ReadRequest, can_gc: CanGc) {
486        // Let stream be reader.[[stream]].
487
488        // Assert: stream is not undefined.
489        assert!(self.stream.get().is_some());
490
491        let stream = self.stream.get().unwrap();
492
493        // Set stream.[[disturbed]] to true.
494        stream.set_is_disturbed(true);
495        // If stream.[[state]] is "closed", perform readRequest’s close steps.
496        if stream.is_closed() {
497            read_request.close_steps(can_gc);
498        } else if stream.is_errored() {
499            // Otherwise, if stream.[[state]] is "errored",
500            // perform readRequest’s error steps given stream.[[storedError]].
501            let cx = GlobalScope::get_cx();
502            rooted!(in(*cx) let mut error = UndefinedValue());
503            stream.get_stored_error(error.handle_mut());
504            read_request.error_steps(error.handle(), can_gc);
505        } else {
506            // Otherwise
507            // Assert: stream.[[state]] is "readable".
508            assert!(stream.is_readable());
509            // Perform ! stream.[[controller]].[[PullSteps]](readRequest).
510            stream.perform_pull_steps(cx, read_request, can_gc);
511        }
512    }
513
514    /// Attach the byte-tee error handler to this reader's closedPromise.
515    /// Used by ReadableByteStreamTee.
516    #[allow(clippy::too_many_arguments)]
517    pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
518        &self,
519        branch_1: &ReadableStream,
520        branch_2: &ReadableStream,
521        canceled_1: Rc<Cell<bool>>,
522        canceled_2: Rc<Cell<bool>>,
523        cancel_promise: Rc<Promise>,
524        reader_version: Rc<Cell<u64>>,
525        expected_version: u64,
526        can_gc: CanGc,
527    ) {
528        // Note: for byte tee we always operate on *byte controllers*.
529        let branch_1_controller = branch_1.get_byte_controller();
530        let branch_2_controller = branch_2.get_byte_controller();
531
532        let global = self.global();
533        let handler = PromiseNativeHandler::new(
534            &global,
535            None,
536            Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
537                branch_1_controller: Dom::from_ref(&branch_1_controller),
538                branch_2_controller: Dom::from_ref(&branch_2_controller),
539                canceled_1,
540                canceled_2,
541                cancel_promise,
542                reader_version,
543                expected_version,
544            })),
545            can_gc,
546        );
547
548        let realm = enter_realm(&*global);
549        let comp = InRealm::Entered(&realm);
550
551        self.closed_promise
552            .borrow()
553            .append_native_handler(&handler, comp, can_gc);
554    }
555
556    /// <https://streams.spec.whatwg.org/#ref-for-readablestreamgenericreader-closedpromise%E2%91%A1>
557    pub(crate) fn default_tee_append_native_handler_to_closed_promise(
558        &self,
559        branch_1: &ReadableStream,
560        branch_2: &ReadableStream,
561        canceled_1: Rc<Cell<bool>>,
562        canceled_2: Rc<Cell<bool>>,
563        cancel_promise: Rc<Promise>,
564        can_gc: CanGc,
565    ) {
566        let branch_1_controller = branch_1.get_default_controller();
567
568        let branch_2_controller = branch_2.get_default_controller();
569
570        let global = self.global();
571        let handler = PromiseNativeHandler::new(
572            &global,
573            None,
574            Some(Box::new(DefaultTeeClosedPromiseRejectionHandler {
575                branch_1_controller: Dom::from_ref(&branch_1_controller),
576                branch_2_controller: Dom::from_ref(&branch_2_controller),
577                canceled_1,
578                canceled_2,
579                cancel_promise,
580            })),
581            can_gc,
582        );
583
584        let realm = enter_realm(&*global);
585        let comp = InRealm::Entered(&realm);
586
587        self.closed_promise
588            .borrow()
589            .append_native_handler(&handler, comp, can_gc);
590    }
591
592    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes>
593    pub(crate) fn read_all_bytes(
594        &self,
595        cx: SafeJSContext,
596        success_steps: Rc<ReadAllBytesSuccessSteps>,
597        failure_steps: Rc<ReadAllBytesFailureSteps>,
598        can_gc: CanGc,
599    ) {
600        // To read all bytes from a ReadableStreamDefaultReader reader,
601        // given successSteps, which is an algorithm accepting a byte sequence,
602        // and failureSteps, which is an algorithm accepting a JavaScript value:
603        // read-loop given reader, a new byte sequence, successSteps, and failureSteps.
604        read_loop(self, cx, success_steps, failure_steps, can_gc);
605    }
606
607    /// step 3 of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
608    pub(crate) fn process_read_requests(
609        &self,
610        cx: SafeJSContext,
611        controller: DomRoot<ReadableByteStreamController>,
612        can_gc: CanGc,
613    ) -> Fallible<()> {
614        // While reader.[[readRequests]] is not empty,
615        while !self.read_requests.borrow().is_empty() {
616            // If controller.[[queueTotalSize]] is 0, return.
617            if controller.get_queue_total_size() == 0.0 {
618                return Ok(());
619            }
620
621            // Let readRequest be reader.[[readRequests]][0].
622            // Remove entry from controller.[[queue]].
623            let read_request = self.remove_read_request();
624
625            // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest).
626            controller
627                .fill_read_request_from_queue(cx, &read_request, can_gc)
628                .expect("Fill read request from queue failed");
629        }
630        Ok(())
631    }
632}
633
634impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
635    /// <https://streams.spec.whatwg.org/#default-reader-constructor>
636    fn Constructor(
637        global: &GlobalScope,
638        proto: Option<SafeHandleObject>,
639        can_gc: CanGc,
640        stream: &ReadableStream,
641    ) -> Fallible<DomRoot<Self>> {
642        let reader = Self::new_with_proto(global, proto, can_gc);
643
644        // Perform ? SetUpReadableStreamDefaultReader(this, stream).
645        Self::set_up(&reader, stream, global, can_gc)?;
646
647        Ok(reader)
648    }
649
650    /// <https://streams.spec.whatwg.org/#default-reader-read>
651    fn Read(&self, can_gc: CanGc) -> Rc<Promise> {
652        let cx = GlobalScope::get_cx();
653        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
654        if self.stream.get().is_none() {
655            rooted!(in(*cx) let mut error = UndefinedValue());
656            Error::Type("stream is undefined".to_owned()).to_jsval(
657                cx,
658                &self.global(),
659                error.handle_mut(),
660                can_gc,
661            );
662            return Promise::new_rejected(&self.global(), cx, error.handle(), can_gc);
663        }
664        // Let promise be a new promise.
665        let promise = Promise::new(&self.global(), can_gc);
666
667        // Let readRequest be a new read request with the following items:
668        // chunk steps, given chunk
669        // Resolve promise with «[ "value" → chunk, "done" → false ]».
670        //
671        // close steps
672        // Resolve promise with «[ "value" → undefined, "done" → true ]».
673        //
674        // error steps, given e
675        // Reject promise with e.
676
677        // Rooting(unrooted_must_root): the read request contains only a promise,
678        // which does not need to be rooted,
679        // as it is safely managed natively via an Rc.
680        let read_request = ReadRequest::Read(promise.clone());
681
682        // Perform ! ReadableStreamDefaultReaderRead(this, readRequest).
683        self.read(cx, &read_request, can_gc);
684
685        // Return promise.
686        promise
687    }
688
689    /// <https://streams.spec.whatwg.org/#default-reader-release-lock>
690    fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
691        if self.stream.get().is_none() {
692            // Step 1: If this.[[stream]] is undefined, return.
693            return Ok(());
694        }
695
696        // Step 2: Perform !ReadableStreamDefaultReaderRelease(this).
697        self.release(can_gc)
698    }
699
700    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
701    fn Closed(&self) -> Rc<Promise> {
702        self.closed()
703    }
704
705    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
706    fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
707        self.generic_cancel(cx, &self.global(), reason, can_gc)
708    }
709}
710
711impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
712    fn get_closed_promise(&self) -> Rc<Promise> {
713        self.closed_promise.borrow().clone()
714    }
715
716    fn set_closed_promise(&self, promise: Rc<Promise>) {
717        *self.closed_promise.borrow_mut() = promise;
718    }
719
720    fn set_stream(&self, stream: Option<&ReadableStream>) {
721        self.stream.set(stream);
722    }
723
724    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
725        self.stream.get()
726    }
727
728    fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
729        Some(self)
730    }
731}