Skip to main content

script/dom/stream/
readablestreamdefaultreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::Heap;
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15use script_bindings::cell::DomRefCell;
16use script_bindings::reflector::{Reflector, reflect_dom_object, reflect_dom_object_with_proto};
17
18use super::byteteereadrequest::ByteTeeReadRequest;
19use super::readablebytestreamcontroller::ReadableByteStreamController;
20use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
21    ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
22};
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::DomGlobal;
25use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
26use crate::dom::bindings::trace::RootedTraceableBox;
27use crate::dom::globalscope::GlobalScope;
28use crate::dom::promise::Promise;
29use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
30use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
31use crate::dom::stream::defaultteereadrequest::DefaultTeeReadRequest;
32use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
33use crate::dom::types::ReadableStreamDefaultController;
34use crate::realms::enter_auto_realm;
35use crate::script_runtime::CanGc;
36
37type ReadAllBytesSuccessSteps = dyn Fn(&mut js::context::JSContext, &[u8]);
38type ReadAllBytesFailureSteps = dyn Fn(&mut js::context::JSContext, SafeHandleValue);
39
40impl js::gc::Rootable for ContinueReadMicrotask {}
41
42/// Microtask handler to continue the read loop without recursion.
43/// Spec note: "This recursion could potentially cause a stack overflow
44/// if implemented directly. Implementations will need to mitigate this,
45/// e.g. by using a non-recursive variant of this algorithm, or queuing
46/// a microtask…"
47#[derive(Clone, JSTraceable, MallocSizeOf)]
48#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
49struct ContinueReadMicrotask {
50    reader: Dom<ReadableStreamDefaultReader>,
51    request: ReadRequest,
52}
53
54impl Callback for ContinueReadMicrotask {
55    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
56        // https://streams.spec.whatwg.org/#ref-for-read-loop%E2%91%A0
57        // Note: continuing the read-loop from inside a micro-task to break recursion.
58        self.reader.read(cx, &self.request);
59    }
60}
61
62/// <https://streams.spec.whatwg.org/#read-loop>
63fn read_loop(
64    cx: &mut js::context::JSContext,
65    reader: &ReadableStreamDefaultReader,
66    success_steps: Rc<ReadAllBytesSuccessSteps>,
67    failure_steps: Rc<ReadAllBytesFailureSteps>,
68) {
69    // For the purposes of the above algorithm, to read-loop given reader,
70    // bytes, successSteps, and failureSteps:
71
72    // Step 1 .Let readRequest be a new read request with the following items:
73    let req = ReadRequest::ReadLoop {
74        success_steps,
75        failure_steps,
76        reader: Dom::from_ref(reader),
77        bytes: Rc::new(DomRefCell::new(Vec::new())),
78    };
79    // Step 2 .Perform ! ReadableStreamDefaultReaderRead(reader, readRequest).
80    reader.read(cx, &req);
81}
82
83/// <https://streams.spec.whatwg.org/#read-request>
84#[derive(Clone, JSTraceable, MallocSizeOf)]
85pub(crate) enum ReadRequest {
86    /// <https://streams.spec.whatwg.org/#default-reader-read>
87    Read(#[conditional_malloc_size_of] Rc<Promise>),
88    /// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
89    DefaultTee {
90        tee_read_request: Dom<DefaultTeeReadRequest>,
91    },
92    /// Spec read loop variant, driven by read-request steps (no Promise).
93    /// <https://streams.spec.whatwg.org/#read-loop>
94    ReadLoop {
95        #[ignore_malloc_size_of = "dyn Fn"]
96        #[no_trace]
97        success_steps: Rc<ReadAllBytesSuccessSteps>,
98        #[ignore_malloc_size_of = "dyn Fn"]
99        #[no_trace]
100        failure_steps: Rc<ReadAllBytesFailureSteps>,
101        reader: Dom<ReadableStreamDefaultReader>,
102        #[conditional_malloc_size_of]
103        bytes: Rc<DomRefCell<Vec<u8>>>,
104    },
105    ByteTee {
106        byte_tee_read_request: Dom<ByteTeeReadRequest>,
107    },
108}
109
110impl ReadRequest {
111    /// <https://streams.spec.whatwg.org/#read-request-chunk-steps>
112    pub(crate) fn chunk_steps(
113        &self,
114        cx: &mut js::context::JSContext,
115        chunk: RootedTraceableBox<Heap<JSVal>>,
116        global: &GlobalScope,
117    ) {
118        match self {
119            ReadRequest::Read(promise) => {
120                // chunk steps, given chunk
121                // Resolve promise with «[ "value" → chunk, "done" → false ]».
122                promise.resolve_native(
123                    &ReadableStreamReadResult {
124                        done: Some(false),
125                        value: chunk,
126                    },
127                    CanGc::from_cx(cx),
128                );
129            },
130            ReadRequest::DefaultTee { tee_read_request } => {
131                tee_read_request.enqueue_chunk_steps(chunk);
132            },
133            ReadRequest::ByteTee {
134                byte_tee_read_request,
135            } => {
136                byte_tee_read_request.enqueue_chunk_steps(global, chunk);
137            },
138            ReadRequest::ReadLoop {
139                success_steps: _,
140                failure_steps,
141                reader,
142                bytes,
143            } => {
144                // Spec: chunk steps, given chunk
145                let global = reader.global();
146
147                match bytes_from_chunk_jsval(cx.into(), &chunk, CanGc::from_cx(cx)) {
148                    Ok(vec) => {
149                        // Step 2. Append the bytes represented by chunk to bytes.
150                        bytes.borrow_mut().extend_from_slice(&vec);
151
152                        // Step 3. Read-loop given reader, bytes, successSteps, and failureSteps.
153                        // Spec note: Avoid direct recursion; queue into a microtask.
154                        // Resolving the promise will queue a microtask to call into the native handler.
155                        let tick = Promise::new(&global, CanGc::from_cx(cx));
156                        tick.resolve_native(&(), CanGc::from_cx(cx));
157
158                        let handler = PromiseNativeHandler::new(
159                            &global,
160                            Some(Box::new(ContinueReadMicrotask {
161                                reader: Dom::from_ref(reader),
162                                request: self.clone(),
163                            })),
164                            None,
165                            CanGc::from_cx(cx),
166                        );
167
168                        let mut realm = enter_auto_realm(cx, &*global);
169                        let cx = &mut realm.current_realm();
170                        tick.append_native_handler(cx, &handler);
171                    },
172                    Err(err) => {
173                        // Step 1. If chunk is not a Uint8Array object, call failureSteps with a TypeError and abort.
174                        rooted!(&in(cx) let mut v = UndefinedValue());
175                        err.to_jsval(cx.into(), &global, v.handle_mut(), CanGc::from_cx(cx));
176                        (failure_steps)(cx, v.handle());
177                    },
178                }
179            },
180        }
181    }
182
183    /// <https://streams.spec.whatwg.org/#read-request-close-steps>
184    pub(crate) fn close_steps(&self, cx: &mut js::context::JSContext) {
185        match self {
186            ReadRequest::Read(promise) => {
187                // close steps
188                // Resolve promise with «[ "value" → undefined, "done" → true ]».
189                let result = RootedTraceableBox::new(Heap::default());
190                result.set(UndefinedValue());
191                promise.resolve_native(
192                    &ReadableStreamReadResult {
193                        done: Some(true),
194                        value: result,
195                    },
196                    CanGc::from_cx(cx),
197                );
198            },
199            ReadRequest::DefaultTee { tee_read_request } => {
200                tee_read_request.close_steps(cx);
201            },
202            ReadRequest::ByteTee {
203                byte_tee_read_request,
204            } => {
205                byte_tee_read_request
206                    .close_steps(cx)
207                    .expect("ByteTeeReadRequest close steps should not fail");
208            },
209            ReadRequest::ReadLoop {
210                success_steps,
211                reader,
212                bytes,
213                ..
214            } => {
215                // Step 1. Call successSteps with bytes.
216                (success_steps)(cx, &bytes.borrow());
217
218                reader
219                    .release(cx)
220                    .expect("Releasing the read-all-bytes reader should succeed");
221            },
222        }
223    }
224
225    /// <https://streams.spec.whatwg.org/#read-request-error-steps>
226    pub(crate) fn error_steps(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
227        match self {
228            ReadRequest::Read(promise) => {
229                // error steps, given e
230                // Reject promise with e.
231                promise.reject_native(&e, CanGc::from_cx(cx))
232            },
233            ReadRequest::DefaultTee { tee_read_request } => {
234                tee_read_request.error_steps();
235            },
236            ReadRequest::ByteTee {
237                byte_tee_read_request,
238            } => {
239                byte_tee_read_request.error_steps();
240            },
241            ReadRequest::ReadLoop {
242                failure_steps,
243                reader,
244                ..
245            } => {
246                // Step 1. Call failureSteps with e.
247                (failure_steps)(cx, e);
248
249                reader
250                    .release(cx)
251                    .expect("Releasing the read-all-bytes reader should succeed");
252            },
253        }
254    }
255}
256
257/// The rejection handler for
258/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
259#[derive(Clone, JSTraceable, MallocSizeOf)]
260#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
261struct ByteTeeClosedPromiseRejectionHandler {
262    branch_1_controller: Dom<ReadableByteStreamController>,
263    branch_2_controller: Dom<ReadableByteStreamController>,
264    #[conditional_malloc_size_of]
265    canceled_1: Rc<Cell<bool>>,
266    #[conditional_malloc_size_of]
267    canceled_2: Rc<Cell<bool>>,
268    #[conditional_malloc_size_of]
269    cancel_promise: Rc<Promise>,
270    #[conditional_malloc_size_of]
271    reader_version: Rc<Cell<u64>>,
272    expected_version: u64,
273}
274
275impl Callback for ByteTeeClosedPromiseRejectionHandler {
276    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
277    /// Upon rejection of reader.[[closedPromise]] with reason r,
278    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
279        // If thisReader is not the current `reader`, return.
280        if self.reader_version.get() != self.expected_version {
281            return;
282        }
283
284        // Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
285        self.branch_1_controller.error(cx, v);
286
287        // Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
288        self.branch_2_controller.error(cx, v);
289
290        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
291        if !self.canceled_1.get() || !self.canceled_2.get() {
292            self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
293        }
294    }
295}
296
297/// The rejection handler for
298/// <https://streams.spec.whatwg.org/#readable-stream-tee>
299#[derive(Clone, JSTraceable, MallocSizeOf)]
300#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
301struct DefaultTeeClosedPromiseRejectionHandler {
302    branch_1_controller: Dom<ReadableStreamDefaultController>,
303    branch_2_controller: Dom<ReadableStreamDefaultController>,
304    #[conditional_malloc_size_of]
305    canceled_1: Rc<Cell<bool>>,
306    #[conditional_malloc_size_of]
307    canceled_2: Rc<Cell<bool>>,
308    #[conditional_malloc_size_of]
309    cancel_promise: Rc<Promise>,
310}
311
312impl Callback for DefaultTeeClosedPromiseRejectionHandler {
313    /// Continuation of <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
314    /// Upon rejection of reader.[[closedPromise]] with reason r,
315    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
316        // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], r).
317        self.branch_1_controller.error(cx, v);
318        // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], r).
319        self.branch_2_controller.error(cx, v);
320
321        // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined.
322        if !self.canceled_1.get() || !self.canceled_2.get() {
323            self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
324        }
325    }
326}
327
328/// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
329#[dom_struct]
330pub(crate) struct ReadableStreamDefaultReader {
331    reflector_: Reflector,
332
333    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
334    stream: MutNullableDom<ReadableStream>,
335
336    read_requests: DomRefCell<VecDeque<ReadRequest>>,
337
338    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
339    #[conditional_malloc_size_of]
340    closed_promise: DomRefCell<Rc<Promise>>,
341}
342
343impl ReadableStreamDefaultReader {
344    fn new_with_proto(
345        global: &GlobalScope,
346        proto: Option<SafeHandleObject>,
347        can_gc: CanGc,
348    ) -> DomRoot<ReadableStreamDefaultReader> {
349        reflect_dom_object_with_proto(
350            Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)),
351            global,
352            proto,
353            can_gc,
354        )
355    }
356
357    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamDefaultReader {
358        ReadableStreamDefaultReader {
359            reflector_: Reflector::new(),
360            stream: MutNullableDom::new(None),
361            read_requests: DomRefCell::new(Default::default()),
362            closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
363        }
364    }
365
366    pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamDefaultReader> {
367        reflect_dom_object(
368            Box::new(Self::new_inherited(global, can_gc)),
369            global,
370            can_gc,
371        )
372    }
373
374    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-reader>
375    pub(crate) fn set_up(
376        &self,
377        stream: &ReadableStream,
378        global: &GlobalScope,
379        can_gc: CanGc,
380    ) -> Fallible<()> {
381        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
382        if stream.is_locked() {
383            return Err(Error::Type(c"stream is locked".to_owned()));
384        }
385        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
386
387        self.generic_initialize(global, stream, can_gc);
388
389        // Set reader.[[readRequests]] to a new empty list.
390        self.read_requests.borrow_mut().clear();
391
392        Ok(())
393    }
394
395    /// <https://streams.spec.whatwg.org/#readable-stream-close>
396    pub(crate) fn close(&self, cx: &mut js::context::JSContext) {
397        // Resolve reader.[[closedPromise]] with undefined.
398        self.closed_promise
399            .borrow()
400            .resolve_native(&(), CanGc::from_cx(cx));
401        // If reader implements ReadableStreamDefaultReader,
402        // Let readRequests be reader.[[readRequests]].
403        let mut read_requests = self.take_read_requests();
404        // Set reader.[[readRequests]] to an empty list.
405        // For each readRequest of readRequests,
406        for request in read_requests.drain(0..) {
407            // Perform readRequest’s close steps.
408            request.close_steps(cx);
409        }
410    }
411
412    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
413    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
414        self.read_requests
415            .borrow_mut()
416            .push_back(read_request.clone());
417    }
418
419    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
420    pub(crate) fn get_num_read_requests(&self) -> usize {
421        self.read_requests.borrow().len()
422    }
423
424    /// <https://streams.spec.whatwg.org/#readable-stream-error>
425    pub(crate) fn error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
426        // Reject reader.[[closedPromise]] with e.
427        self.closed_promise
428            .borrow()
429            .reject_native(&e, CanGc::from_cx(cx));
430
431        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
432        self.closed_promise.borrow().set_promise_is_handled();
433
434        // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
435        self.error_read_requests(cx, e);
436    }
437
438    /// The removal steps of <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
439    pub(crate) fn remove_read_request(&self) -> ReadRequest {
440        self.read_requests
441            .borrow_mut()
442            .pop_front()
443            .expect("Reader must have read request when remove is called into.")
444    }
445
446    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
447    pub(crate) fn release(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
448        // Perform ! ReadableStreamReaderGenericRelease(reader).
449        self.generic_release(CanGc::from_cx(cx))
450            .expect("Generic release failed");
451        // Let e be a new TypeError exception.
452        rooted!(&in(cx) let mut error = UndefinedValue());
453        Error::Type(c"Reader is released".to_owned()).to_jsval(
454            cx.into(),
455            &self.global(),
456            error.handle_mut(),
457            CanGc::from_cx(cx),
458        );
459
460        // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
461        self.error_read_requests(cx, error.handle());
462        Ok(())
463    }
464
465    fn take_read_requests(&self) -> VecDeque<ReadRequest> {
466        mem::take(&mut *self.read_requests.borrow_mut())
467    }
468
469    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreadererrorreadrequests>
470    fn error_read_requests(&self, cx: &mut js::context::JSContext, rval: SafeHandleValue) {
471        // step 1
472        let mut read_requests = self.take_read_requests();
473
474        // step 2 & 3
475        for request in read_requests.drain(0..) {
476            request.error_steps(cx, rval);
477        }
478    }
479
480    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
481    pub(crate) fn read(&self, cx: &mut js::context::JSContext, read_request: &ReadRequest) {
482        // Let stream be reader.[[stream]].
483
484        // Assert: stream is not undefined.
485        assert!(self.stream.get().is_some());
486
487        let stream = self.stream.get().unwrap();
488
489        // Set stream.[[disturbed]] to true.
490        stream.set_is_disturbed(true);
491        // If stream.[[state]] is "closed", perform readRequest’s close steps.
492        if stream.is_closed() {
493            read_request.close_steps(cx);
494        } else if stream.is_errored() {
495            // Otherwise, if stream.[[state]] is "errored",
496            // perform readRequest’s error steps given stream.[[storedError]].
497            rooted!(&in(cx) let mut error = UndefinedValue());
498            stream.get_stored_error(error.handle_mut());
499            read_request.error_steps(cx, error.handle());
500        } else {
501            // Otherwise
502            // Assert: stream.[[state]] is "readable".
503            assert!(stream.is_readable());
504            // Perform ! stream.[[controller]].[[PullSteps]](readRequest).
505            stream.perform_pull_steps(cx, read_request);
506        }
507    }
508
509    /// Attach the byte-tee error handler to this reader's closedPromise.
510    /// Used by ReadableByteStreamTee.
511    #[allow(clippy::too_many_arguments)]
512    pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
513        &self,
514        cx: &mut js::context::JSContext,
515        branch_1: &ReadableStream,
516        branch_2: &ReadableStream,
517        canceled_1: Rc<Cell<bool>>,
518        canceled_2: Rc<Cell<bool>>,
519        cancel_promise: Rc<Promise>,
520        reader_version: Rc<Cell<u64>>,
521        expected_version: u64,
522    ) {
523        // Note: for byte tee we always operate on *byte controllers*.
524        let branch_1_controller = branch_1.get_byte_controller();
525        let branch_2_controller = branch_2.get_byte_controller();
526
527        let global = self.global();
528        let handler = PromiseNativeHandler::new(
529            &global,
530            None,
531            Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
532                branch_1_controller: Dom::from_ref(&branch_1_controller),
533                branch_2_controller: Dom::from_ref(&branch_2_controller),
534                canceled_1,
535                canceled_2,
536                cancel_promise,
537                reader_version,
538                expected_version,
539            })),
540            CanGc::from_cx(cx),
541        );
542
543        let mut realm = enter_auto_realm(cx, &*global);
544        let cx = &mut realm.current_realm();
545
546        self.closed_promise
547            .borrow()
548            .append_native_handler(cx, &handler);
549    }
550
551    /// <https://streams.spec.whatwg.org/#ref-for-readablestreamgenericreader-closedpromise%E2%91%A1>
552    pub(crate) fn default_tee_append_native_handler_to_closed_promise(
553        &self,
554        cx: &mut js::context::JSContext,
555        branch_1: &ReadableStream,
556        branch_2: &ReadableStream,
557        canceled_1: Rc<Cell<bool>>,
558        canceled_2: Rc<Cell<bool>>,
559        cancel_promise: Rc<Promise>,
560    ) {
561        let branch_1_controller = branch_1.get_default_controller();
562
563        let branch_2_controller = branch_2.get_default_controller();
564
565        let global = self.global();
566        let handler = PromiseNativeHandler::new(
567            &global,
568            None,
569            Some(Box::new(DefaultTeeClosedPromiseRejectionHandler {
570                branch_1_controller: Dom::from_ref(&branch_1_controller),
571                branch_2_controller: Dom::from_ref(&branch_2_controller),
572                canceled_1,
573                canceled_2,
574                cancel_promise,
575            })),
576            CanGc::from_cx(cx),
577        );
578
579        let mut realm = enter_auto_realm(cx, &*global);
580        let cx = &mut realm.current_realm();
581
582        self.closed_promise
583            .borrow()
584            .append_native_handler(cx, &handler);
585    }
586
587    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes>
588    pub(crate) fn read_all_bytes(
589        &self,
590        cx: &mut js::context::JSContext,
591        success_steps: Rc<ReadAllBytesSuccessSteps>,
592        failure_steps: Rc<ReadAllBytesFailureSteps>,
593    ) {
594        // To read all bytes from a ReadableStreamDefaultReader reader,
595        // given successSteps, which is an algorithm accepting a byte sequence,
596        // and failureSteps, which is an algorithm accepting a JavaScript value:
597        // read-loop given reader, a new byte sequence, successSteps, and failureSteps.
598        read_loop(cx, self, success_steps, failure_steps);
599    }
600
601    /// step 3 of <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
602    pub(crate) fn process_read_requests(
603        &self,
604        cx: &mut js::context::JSContext,
605        controller: DomRoot<ReadableByteStreamController>,
606    ) -> Fallible<()> {
607        // While reader.[[readRequests]] is not empty,
608        while !self.read_requests.borrow().is_empty() {
609            // If controller.[[queueTotalSize]] is 0, return.
610            if controller.get_queue_total_size() == 0.0 {
611                return Ok(());
612            }
613
614            // Let readRequest be reader.[[readRequests]][0].
615            // Remove entry from controller.[[queue]].
616            let read_request = self.remove_read_request();
617
618            // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest).
619            controller
620                .fill_read_request_from_queue(cx, &read_request)
621                .expect("Fill read request from queue failed");
622        }
623        Ok(())
624    }
625}
626
627impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
628    /// <https://streams.spec.whatwg.org/#default-reader-constructor>
629    fn Constructor(
630        global: &GlobalScope,
631        proto: Option<SafeHandleObject>,
632        can_gc: CanGc,
633        stream: &ReadableStream,
634    ) -> Fallible<DomRoot<Self>> {
635        let reader = Self::new_with_proto(global, proto, can_gc);
636
637        // Perform ? SetUpReadableStreamDefaultReader(this, stream).
638        Self::set_up(&reader, stream, global, can_gc)?;
639
640        Ok(reader)
641    }
642
643    /// <https://streams.spec.whatwg.org/#default-reader-read>
644    fn Read(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
645        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
646        if self.stream.get().is_none() {
647            rooted!(&in(cx) let mut error = UndefinedValue());
648            Error::Type(c"stream is undefined".to_owned()).to_jsval(
649                cx.into(),
650                &self.global(),
651                error.handle_mut(),
652                CanGc::from_cx(cx),
653            );
654            return Promise::new_rejected(
655                &self.global(),
656                cx.into(),
657                error.handle(),
658                CanGc::from_cx(cx),
659            );
660        }
661        // Let promise be a new promise.
662        let promise = Promise::new2(cx, &self.global());
663
664        // Let readRequest be a new read request with the following items:
665        // chunk steps, given chunk
666        // Resolve promise with «[ "value" → chunk, "done" → false ]».
667        //
668        // close steps
669        // Resolve promise with «[ "value" → undefined, "done" → true ]».
670        //
671        // error steps, given e
672        // Reject promise with e.
673
674        // Rooting(unrooted_must_root): the read request contains only a promise,
675        // which does not need to be rooted,
676        // as it is safely managed natively via an Rc.
677        let read_request = ReadRequest::Read(promise.clone());
678
679        // Perform ! ReadableStreamDefaultReaderRead(this, readRequest).
680        self.read(cx, &read_request);
681
682        // Return promise.
683        promise
684    }
685
686    /// <https://streams.spec.whatwg.org/#default-reader-release-lock>
687    fn ReleaseLock(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
688        if self.stream.get().is_none() {
689            // Step 1: If this.[[stream]] is undefined, return.
690            return Ok(());
691        }
692
693        // Step 2: Perform !ReadableStreamDefaultReaderRelease(this).
694        self.release(cx)
695    }
696
697    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
698    fn Closed(&self) -> Rc<Promise> {
699        self.closed()
700    }
701
702    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
703    fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
704        self.generic_cancel(cx, &self.global(), reason)
705    }
706}
707
708impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
709    fn get_closed_promise(&self) -> Rc<Promise> {
710        self.closed_promise.borrow().clone()
711    }
712
713    fn set_closed_promise(&self, promise: Rc<Promise>) {
714        *self.closed_promise.borrow_mut() = promise;
715    }
716
717    fn set_stream(&self, stream: Option<&ReadableStream>) {
718        self.stream.set(stream);
719    }
720
721    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
722        self.stream.get()
723    }
724
725    fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
726        Some(self)
727    }
728}