Skip to main content

script/dom/stream/
readablestreamdefaultreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::Heap;
13use js::jsval::{JSVal, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
16use script_bindings::cell::DomRefCell;
17use script_bindings::reflector::{
18    Reflector, reflect_dom_object_with_cx, reflect_dom_object_with_proto_and_cx,
19};
20
21use super::byteteereadrequest::ByteTeeReadRequest;
22use super::readablebytestreamcontroller::ReadableByteStreamController;
23use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
24    ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
25};
26use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
27use crate::dom::bindings::reflector::DomGlobal;
28use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
29use crate::dom::bindings::trace::RootedTraceableBox;
30use crate::dom::globalscope::GlobalScope;
31use crate::dom::promise::Promise;
32use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
33use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
34use crate::dom::stream::defaultteereadrequest::DefaultTeeReadRequest;
35use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
36use crate::dom::types::ReadableStreamDefaultController;
37use crate::realms::enter_auto_realm;
38use crate::script_runtime::CanGc;
39
40type ReadAllBytesSuccessSteps = dyn Fn(&mut js::context::JSContext, &[u8]);
41type ReadAllBytesFailureSteps = dyn Fn(&mut js::context::JSContext, SafeHandleValue);
42
43impl js::gc::Rootable for ContinueReadMicrotask {}
44
45/// Microtask handler to continue the read loop without recursion.
46/// Spec note: "This recursion could potentially cause a stack overflow
47/// if implemented directly. Implementations will need to mitigate this,
48/// e.g. by using a non-recursive variant of this algorithm, or queuing
49/// a microtask…"
50#[derive(Clone, JSTraceable, MallocSizeOf)]
51#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
52struct ContinueReadMicrotask {
53    reader: Dom<ReadableStreamDefaultReader>,
54    request: ReadRequest,
55}
56
57impl Callback for ContinueReadMicrotask {
58    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
59        // https://streams.spec.whatwg.org/#ref-for-read-loop%E2%91%A0
60        // Note: continuing the read-loop from inside a micro-task to break recursion.
61        self.reader.read(cx, &self.request);
62    }
63}
64
65/// <https://streams.spec.whatwg.org/#read-loop>
66fn read_loop(
67    cx: &mut js::context::JSContext,
68    reader: &ReadableStreamDefaultReader,
69    success_steps: Rc<ReadAllBytesSuccessSteps>,
70    failure_steps: Rc<ReadAllBytesFailureSteps>,
71) {
72    // For the purposes of the above algorithm, to read-loop given reader,
73    // bytes, successSteps, and failureSteps:
74
75    // Step 1 .Let readRequest be a new read request with the following items:
76    let req = ReadRequest::ReadLoop {
77        success_steps,
78        failure_steps,
79        reader: Dom::from_ref(reader),
80        bytes: Rc::new(DomRefCell::new(Vec::new())),
81    };
82    // Step 2 .Perform ! ReadableStreamDefaultReaderRead(reader, readRequest).
83    reader.read(cx, &req);
84}
85
86/// <https://streams.spec.whatwg.org/#read-request>
87#[derive(Clone, JSTraceable, MallocSizeOf)]
88pub(crate) enum ReadRequest {
89    /// <https://streams.spec.whatwg.org/#default-reader-read>
90    Read(#[conditional_malloc_size_of] Rc<Promise>),
91    /// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
92    DefaultTee {
93        tee_read_request: Dom<DefaultTeeReadRequest>,
94    },
95    /// Spec read loop variant, driven by read-request steps (no Promise).
96    /// <https://streams.spec.whatwg.org/#read-loop>
97    ReadLoop {
98        #[ignore_malloc_size_of = "dyn Fn"]
99        #[no_trace]
100        success_steps: Rc<ReadAllBytesSuccessSteps>,
101        #[ignore_malloc_size_of = "dyn Fn"]
102        #[no_trace]
103        failure_steps: Rc<ReadAllBytesFailureSteps>,
104        reader: Dom<ReadableStreamDefaultReader>,
105        #[conditional_malloc_size_of]
106        bytes: Rc<DomRefCell<Vec<u8>>>,
107    },
108    ByteTee {
109        byte_tee_read_request: Dom<ByteTeeReadRequest>,
110    },
111}
112
113impl ReadRequest {
114    /// <https://streams.spec.whatwg.org/#read-request-chunk-steps>
115    pub(crate) fn chunk_steps(
116        &self,
117        cx: &mut js::context::JSContext,
118        chunk: RootedTraceableBox<Heap<JSVal>>,
119        global: &GlobalScope,
120    ) {
121        match self {
122            ReadRequest::Read(promise) => {
123                // chunk steps, given chunk
124                // Resolve promise with «[ "value" → chunk, "done" → false ]».
125                promise.resolve_native(
126                    &ReadableStreamReadResult {
127                        done: Some(false),
128                        value: chunk,
129                    },
130                    CanGc::from_cx(cx),
131                );
132            },
133            ReadRequest::DefaultTee { tee_read_request } => {
134                tee_read_request.enqueue_chunk_steps(chunk);
135            },
136            ReadRequest::ByteTee {
137                byte_tee_read_request,
138            } => {
139                byte_tee_read_request.enqueue_chunk_steps(global, chunk);
140            },
141            ReadRequest::ReadLoop {
142                success_steps: _,
143                failure_steps,
144                reader,
145                bytes,
146            } => {
147                // Spec: chunk steps, given chunk
148                let global = reader.global();
149
150                match bytes_from_chunk_jsval(cx, &chunk) {
151                    Ok(vec) => {
152                        // Step 2. Append the bytes represented by chunk to bytes.
153                        bytes.borrow_mut().extend_from_slice(&vec);
154
155                        // Step 3. Read-loop given reader, bytes, successSteps, and failureSteps.
156                        // Spec note: Avoid direct recursion; queue into a microtask.
157                        // Resolving the promise will queue a microtask to call into the native handler.
158                        let tick = Promise::new2(cx, &global);
159                        tick.resolve_native_with_cx(cx, &());
160
161                        let handler = PromiseNativeHandler::new(
162                            cx,
163                            &global,
164                            Some(Box::new(ContinueReadMicrotask {
165                                reader: Dom::from_ref(reader),
166                                request: self.clone(),
167                            })),
168                            None,
169                        );
170
171                        let mut realm = enter_auto_realm(cx, &*global);
172                        let cx = &mut realm.current_realm();
173                        tick.append_native_handler(cx, &handler);
174                    },
175                    Err(err) => {
176                        // Step 1. If chunk is not a Uint8Array object, call failureSteps with a TypeError and abort.
177                        rooted!(&in(cx) let mut v = UndefinedValue());
178                        err.to_jsval(cx.into(), &global, v.handle_mut(), CanGc::from_cx(cx));
179                        (failure_steps)(cx, v.handle());
180                    },
181                }
182            },
183        }
184    }
185
186    /// <https://streams.spec.whatwg.org/#read-request-close-steps>
187    pub(crate) fn close_steps(&self, cx: &mut js::context::JSContext) {
188        match self {
189            ReadRequest::Read(promise) => {
190                // close steps
191                // Resolve promise with «[ "value" → undefined, "done" → true ]».
192                let result = RootedTraceableBox::new(Heap::default());
193                result.set(UndefinedValue());
194                promise.resolve_native(
195                    &ReadableStreamReadResult {
196                        done: Some(true),
197                        value: result,
198                    },
199                    CanGc::from_cx(cx),
200                );
201            },
202            ReadRequest::DefaultTee { tee_read_request } => {
203                tee_read_request.close_steps(cx);
204            },
205            ReadRequest::ByteTee {
206                byte_tee_read_request,
207            } => {
208                byte_tee_read_request
209                    .close_steps(cx)
210                    .expect("ByteTeeReadRequest close steps should not fail");
211            },
212            ReadRequest::ReadLoop {
213                success_steps,
214                reader,
215                bytes,
216                ..
217            } => {
218                // Step 1. Call successSteps with bytes.
219                (success_steps)(cx, &bytes.borrow());
220
221                reader
222                    .release(cx)
223                    .expect("Releasing the read-all-bytes reader should succeed");
224            },
225        }
226    }
227
228    /// <https://streams.spec.whatwg.org/#read-request-error-steps>
229    pub(crate) fn error_steps(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
230        match self {
231            ReadRequest::Read(promise) => {
232                // error steps, given e
233                // Reject promise with e.
234                promise.reject_native(&e, CanGc::from_cx(cx))
235            },
236            ReadRequest::DefaultTee { tee_read_request } => {
237                tee_read_request.error_steps();
238            },
239            ReadRequest::ByteTee {
240                byte_tee_read_request,
241            } => {
242                byte_tee_read_request.error_steps();
243            },
244            ReadRequest::ReadLoop {
245                failure_steps,
246                reader,
247                ..
248            } => {
249                // Step 1. Call failureSteps with e.
250                (failure_steps)(cx, e);
251
252                reader
253                    .release(cx)
254                    .expect("Releasing the read-all-bytes reader should succeed");
255            },
256        }
257    }
258}
259
260/// The rejection handler for
261/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
262#[derive(Clone, JSTraceable, MallocSizeOf)]
263#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
264struct ByteTeeClosedPromiseRejectionHandler {
265    branch_1_controller: Dom<ReadableByteStreamController>,
266    branch_2_controller: Dom<ReadableByteStreamController>,
267    #[conditional_malloc_size_of]
268    canceled_1: Rc<Cell<bool>>,
269    #[conditional_malloc_size_of]
270    canceled_2: Rc<Cell<bool>>,
271    #[conditional_malloc_size_of]
272    cancel_promise: Rc<Promise>,
273    #[conditional_malloc_size_of]
274    reader_version: Rc<Cell<u64>>,
275    expected_version: u64,
276}
277
278impl Callback for ByteTeeClosedPromiseRejectionHandler {
279    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
280    /// Upon rejection of reader.[[closedPromise]] with reason r,
281    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
282        // If thisReader is not the current `reader`, return.
283        if self.reader_version.get() != self.expected_version {
284            return;
285        }
286
287        // Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
288        self.branch_1_controller.error(cx, v);
289
290        // Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
291        self.branch_2_controller.error(cx, v);
292
293        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
294        if !self.canceled_1.get() || !self.canceled_2.get() {
295            self.cancel_promise.resolve_native_with_cx(cx, &());
296        }
297    }
298}
299
300/// The rejection handler for
301/// <https://streams.spec.whatwg.org/#readable-stream-tee>
302#[derive(Clone, JSTraceable, MallocSizeOf)]
303#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
304struct DefaultTeeClosedPromiseRejectionHandler {
305    branch_1_controller: Dom<ReadableStreamDefaultController>,
306    branch_2_controller: Dom<ReadableStreamDefaultController>,
307    #[conditional_malloc_size_of]
308    canceled_1: Rc<Cell<bool>>,
309    #[conditional_malloc_size_of]
310    canceled_2: Rc<Cell<bool>>,
311    #[conditional_malloc_size_of]
312    cancel_promise: Rc<Promise>,
313}
314
315impl Callback for DefaultTeeClosedPromiseRejectionHandler {
316    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
317    /// Upon rejection of reader.[[closedPromise]] with reason r,
318    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
319        // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], r).
320        self.branch_1_controller.error(cx, v);
321        // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], r).
322        self.branch_2_controller.error(cx, v);
323
324        // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined.
325        if !self.canceled_1.get() || !self.canceled_2.get() {
326            self.cancel_promise.resolve_native_with_cx(cx, &());
327        }
328    }
329}
330
331/// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
332#[dom_struct]
333pub(crate) struct ReadableStreamDefaultReader {
334    reflector_: Reflector,
335
336    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
337    stream: MutNullableDom<ReadableStream>,
338
339    read_requests: DomRefCell<VecDeque<ReadRequest>>,
340
341    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
342    #[conditional_malloc_size_of]
343    closed_promise: DomRefCell<Rc<Promise>>,
344}
345
346impl ReadableStreamDefaultReader {
347    fn new_with_proto(
348        cx: &mut JSContext,
349        global: &GlobalScope,
350        proto: Option<SafeHandleObject>,
351    ) -> DomRoot<ReadableStreamDefaultReader> {
352        reflect_dom_object_with_proto_and_cx(
353            Box::new(ReadableStreamDefaultReader::new_inherited(cx, global)),
354            global,
355            proto,
356            cx,
357        )
358    }
359
360    fn new_inherited(cx: &mut JSContext, global: &GlobalScope) -> ReadableStreamDefaultReader {
361        ReadableStreamDefaultReader {
362            reflector_: Reflector::new(),
363            stream: MutNullableDom::new(None),
364            read_requests: DomRefCell::new(Default::default()),
365            closed_promise: DomRefCell::new(Promise::new2(cx, global)),
366        }
367    }
368
369    pub(crate) fn new(
370        cx: &mut JSContext,
371        global: &GlobalScope,
372    ) -> DomRoot<ReadableStreamDefaultReader> {
373        reflect_dom_object_with_cx(Box::new(Self::new_inherited(cx, global)), global, cx)
374    }
375
376    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-reader>
377    pub(crate) fn set_up(
378        &self,
379        cx: &mut JSContext,
380        stream: &ReadableStream,
381        global: &GlobalScope,
382    ) -> Fallible<()> {
383        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
384        if stream.is_locked() {
385            return Err(Error::Type(c"stream is locked".to_owned()));
386        }
387        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
388
389        self.generic_initialize(cx, global, stream);
390
391        // Set reader.[[readRequests]] to a new empty list.
392        self.read_requests.borrow_mut().clear();
393
394        Ok(())
395    }
396
397    /// <https://streams.spec.whatwg.org/#readable-stream-close>
398    pub(crate) fn close(&self, cx: &mut js::context::JSContext) {
399        // Resolve reader.[[closedPromise]] with undefined.
400        self.closed_promise.borrow().resolve_native_with_cx(cx, &());
401        // If reader implements ReadableStreamDefaultReader,
402        // Let readRequests be reader.[[readRequests]].
403        let mut read_requests = self.take_read_requests();
404        // Set reader.[[readRequests]] to an empty list.
405        // For each readRequest of readRequests,
406        for request in read_requests.drain(0..) {
407            // Perform readRequest’s close steps.
408            request.close_steps(cx);
409        }
410    }
411
412    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
413    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
414        self.read_requests
415            .borrow_mut()
416            .push_back(read_request.clone());
417    }
418
419    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
420    pub(crate) fn get_num_read_requests(&self) -> usize {
421        self.read_requests.borrow().len()
422    }
423
424    /// <https://streams.spec.whatwg.org/#readable-stream-error>
425    pub(crate) fn error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
426        // Reject reader.[[closedPromise]] with e.
427        self.closed_promise.borrow().reject_native_with_cx(cx, &e);
428
429        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
430        self.closed_promise.borrow().set_promise_is_handled();
431
432        // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
433        self.error_read_requests(cx, e);
434    }
435
436    /// The removal steps of <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
437    pub(crate) fn remove_read_request(&self) -> ReadRequest {
438        self.read_requests
439            .borrow_mut()
440            .pop_front()
441            .expect("Reader must have read request when remove is called into.")
442    }
443
444    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
445    pub(crate) fn release(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
446        // Perform ! ReadableStreamReaderGenericRelease(reader).
447        self.generic_release(cx).expect("Generic release failed");
448        // Let e be a new TypeError exception.
449        rooted!(&in(cx) let mut error = UndefinedValue());
450        Error::Type(c"Reader is released".to_owned()).to_jsval(
451            cx.into(),
452            &self.global(),
453            error.handle_mut(),
454            CanGc::from_cx(cx),
455        );
456
457        // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
458        self.error_read_requests(cx, error.handle());
459        Ok(())
460    }
461
462    fn take_read_requests(&self) -> VecDeque<ReadRequest> {
463        mem::take(&mut *self.read_requests.borrow_mut())
464    }
465
466    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreadererrorreadrequests>
467    fn error_read_requests(&self, cx: &mut js::context::JSContext, rval: SafeHandleValue) {
468        // step 1
469        let mut read_requests = self.take_read_requests();
470
471        // step 2 & 3
472        for request in read_requests.drain(0..) {
473            request.error_steps(cx, rval);
474        }
475    }
476
477    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
478    pub(crate) fn read(&self, cx: &mut js::context::JSContext, read_request: &ReadRequest) {
479        // Let stream be reader.[[stream]].
480
481        // Assert: stream is not undefined.
482        assert!(self.stream.get().is_some());
483
484        let stream = self.stream.get().unwrap();
485
486        // Set stream.[[disturbed]] to true.
487        stream.set_is_disturbed(true);
488        // If stream.[[state]] is "closed", perform readRequest’s close steps.
489        if stream.is_closed() {
490            read_request.close_steps(cx);
491        } else if stream.is_errored() {
492            // Otherwise, if stream.[[state]] is "errored",
493            // perform readRequest’s error steps given stream.[[storedError]].
494            rooted!(&in(cx) let mut error = UndefinedValue());
495            stream.get_stored_error(error.handle_mut());
496            read_request.error_steps(cx, error.handle());
497        } else {
498            // Otherwise
499            // Assert: stream.[[state]] is "readable".
500            assert!(stream.is_readable());
501            // Perform ! stream.[[controller]].[[PullSteps]](readRequest).
502            stream.perform_pull_steps(cx, read_request);
503        }
504    }
505
506    /// Attach the byte-tee error handler to this reader's closedPromise.
507    /// Used by ReadableByteStreamTee.
508    #[allow(clippy::too_many_arguments)]
509    pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
510        &self,
511        cx: &mut js::context::JSContext,
512        branch_1: &ReadableStream,
513        branch_2: &ReadableStream,
514        canceled_1: Rc<Cell<bool>>,
515        canceled_2: Rc<Cell<bool>>,
516        cancel_promise: Rc<Promise>,
517        reader_version: Rc<Cell<u64>>,
518        expected_version: u64,
519    ) {
520        // Note: for byte tee we always operate on *byte controllers*.
521        let branch_1_controller = branch_1.get_byte_controller();
522        let branch_2_controller = branch_2.get_byte_controller();
523
524        let global = self.global();
525        let handler = PromiseNativeHandler::new(
526            cx,
527            &global,
528            None,
529            Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
530                branch_1_controller: Dom::from_ref(&branch_1_controller),
531                branch_2_controller: Dom::from_ref(&branch_2_controller),
532                canceled_1,
533                canceled_2,
534                cancel_promise,
535                reader_version,
536                expected_version,
537            })),
538        );
539
540        let mut realm = enter_auto_realm(cx, &*global);
541        let cx = &mut realm.current_realm();
542
543        self.closed_promise
544            .borrow()
545            .append_native_handler(cx, &handler);
546    }
547
548    /// <https://streams.spec.whatwg.org/#ref-for-readablestreamgenericreader-closedpromise%E2%91%A1>
549    pub(crate) fn default_tee_append_native_handler_to_closed_promise(
550        &self,
551        cx: &mut js::context::JSContext,
552        branch_1: &ReadableStream,
553        branch_2: &ReadableStream,
554        canceled_1: Rc<Cell<bool>>,
555        canceled_2: Rc<Cell<bool>>,
556        cancel_promise: Rc<Promise>,
557    ) {
558        let branch_1_controller = branch_1.get_default_controller();
559
560        let branch_2_controller = branch_2.get_default_controller();
561
562        let global = self.global();
563        let handler = PromiseNativeHandler::new(
564            cx,
565            &global,
566            None,
567            Some(Box::new(DefaultTeeClosedPromiseRejectionHandler {
568                branch_1_controller: Dom::from_ref(&branch_1_controller),
569                branch_2_controller: Dom::from_ref(&branch_2_controller),
570                canceled_1,
571                canceled_2,
572                cancel_promise,
573            })),
574        );
575
576        let mut realm = enter_auto_realm(cx, &*global);
577        let cx = &mut realm.current_realm();
578
579        self.closed_promise
580            .borrow()
581            .append_native_handler(cx, &handler);
582    }
583
584    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes>
585    pub(crate) fn read_all_bytes(
586        &self,
587        cx: &mut js::context::JSContext,
588        success_steps: Rc<ReadAllBytesSuccessSteps>,
589        failure_steps: Rc<ReadAllBytesFailureSteps>,
590    ) {
591        // To read all bytes from a ReadableStreamDefaultReader reader,
592        // given successSteps, which is an algorithm accepting a byte sequence,
593        // and failureSteps, which is an algorithm accepting a JavaScript value:
594        // read-loop given reader, a new byte sequence, successSteps, and failureSteps.
595        read_loop(cx, self, success_steps, failure_steps);
596    }
597
598    /// step 3 of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
599    pub(crate) fn process_read_requests(
600        &self,
601        cx: &mut js::context::JSContext,
602        controller: DomRoot<ReadableByteStreamController>,
603    ) -> Fallible<()> {
604        // While reader.[[readRequests]] is not empty,
605        while !self.read_requests.borrow().is_empty() {
606            // If controller.[[queueTotalSize]] is 0, return.
607            if controller.get_queue_total_size() == 0.0 {
608                return Ok(());
609            }
610
611            // Let readRequest be reader.[[readRequests]][0].
612            // Remove entry from controller.[[queue]].
613            let read_request = self.remove_read_request();
614
615            // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest).
616            controller
617                .fill_read_request_from_queue(cx, &read_request)
618                .expect("Fill read request from queue failed");
619        }
620        Ok(())
621    }
622}
623
624impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
625    /// <https://streams.spec.whatwg.org/#default-reader-constructor>
626    fn Constructor(
627        cx: &mut JSContext,
628        global: &GlobalScope,
629        proto: Option<SafeHandleObject>,
630        stream: &ReadableStream,
631    ) -> Fallible<DomRoot<Self>> {
632        let reader = Self::new_with_proto(cx, global, proto);
633
634        // Perform ? SetUpReadableStreamDefaultReader(this, stream).
635        reader.set_up(cx, stream, global)?;
636
637        Ok(reader)
638    }
639
640    /// <https://streams.spec.whatwg.org/#default-reader-read>
641    fn Read(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
642        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
643        if self.stream.get().is_none() {
644            rooted!(&in(cx) let mut error = UndefinedValue());
645            Error::Type(c"stream is undefined".to_owned()).to_jsval(
646                cx.into(),
647                &self.global(),
648                error.handle_mut(),
649                CanGc::from_cx(cx),
650            );
651            return Promise::new_rejected(cx, &self.global(), error.handle());
652        }
653        // Let promise be a new promise.
654        let promise = Promise::new2(cx, &self.global());
655
656        // Let readRequest be a new read request with the following items:
657        // chunk steps, given chunk
658        // Resolve promise with «[ "value" → chunk, "done" → false ]».
659        //
660        // close steps
661        // Resolve promise with «[ "value" → undefined, "done" → true ]».
662        //
663        // error steps, given e
664        // Reject promise with e.
665
666        // Rooting(unrooted_must_root): the read request contains only a promise,
667        // which does not need to be rooted,
668        // as it is safely managed natively via an Rc.
669        let read_request = ReadRequest::Read(promise.clone());
670
671        // Perform ! ReadableStreamDefaultReaderRead(this, readRequest).
672        self.read(cx, &read_request);
673
674        // Return promise.
675        promise
676    }
677
678    /// <https://streams.spec.whatwg.org/#default-reader-release-lock>
679    fn ReleaseLock(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
680        if self.stream.get().is_none() {
681            // Step 1: If this.[[stream]] is undefined, return.
682            return Ok(());
683        }
684
685        // Step 2: Perform !ReadableStreamDefaultReaderRelease(this).
686        self.release(cx)
687    }
688
689    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
690    fn Closed(&self) -> Rc<Promise> {
691        self.closed()
692    }
693
694    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
695    fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
696        self.generic_cancel(cx, &self.global(), reason)
697    }
698}
699
700impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
701    fn get_closed_promise(&self) -> Rc<Promise> {
702        self.closed_promise.borrow().clone()
703    }
704
705    fn set_closed_promise(&self, promise: Rc<Promise>) {
706        *self.closed_promise.borrow_mut() = promise;
707    }
708
709    fn set_stream(&self, stream: Option<&ReadableStream>) {
710        self.stream.set(stream);
711    }
712
713    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
714        self.stream.get()
715    }
716
717    fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
718        Some(self)
719    }
720}