script/dom/
readablestreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::wrappers::JS_GetPendingException;
15use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
16use js::typedarray::Uint8;
17use script_bindings::conversions::SafeToJSValConvertible;
18
19use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
20use super::bindings::root::Dom;
21use crate::dom::bindings::buffer_source::create_buffer_source;
22use crate::dom::bindings::callback::ExceptionHandling;
23use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
24use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
25use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
26use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
27use crate::dom::bindings::root::{DomRoot, MutNullableDom};
28use crate::dom::bindings::trace::RootedTraceableBox;
29use crate::dom::globalscope::GlobalScope;
30use crate::dom::promise::Promise;
31use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
32use crate::dom::readablestream::ReadableStream;
33use crate::dom::readablestreamdefaultreader::ReadRequest;
34use crate::dom::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
35use crate::realms::{InRealm, enter_realm};
36use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
37
38/// The fulfillment handler for
39/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
40#[derive(Clone, JSTraceable, MallocSizeOf)]
41#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
42struct PullAlgorithmFulfillmentHandler {
43    controller: Dom<ReadableStreamDefaultController>,
44}
45
46impl Callback for PullAlgorithmFulfillmentHandler {
47    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
48    /// Upon fulfillment of pullPromise
49    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
50        let can_gc = CanGc::from_cx(cx);
51        // Set controller.[[pulling]] to false.
52        self.controller.pulling.set(false);
53
54        // If controller.[[pullAgain]] is true,
55        if self.controller.pull_again.get() {
56            // Set controller.[[pullAgain]] to false.
57            self.controller.pull_again.set(false);
58
59            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
60            self.controller.call_pull_if_needed(can_gc);
61        }
62    }
63}
64
65/// The rejection handler for
66/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
67#[derive(Clone, JSTraceable, MallocSizeOf)]
68#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
69struct PullAlgorithmRejectionHandler {
70    controller: Dom<ReadableStreamDefaultController>,
71}
72
73impl Callback for PullAlgorithmRejectionHandler {
74    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
75    /// Upon rejection of pullPromise with reason e.
76    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
77        let can_gc = CanGc::from_cx(cx);
78        // Perform ! ReadableStreamDefaultControllerError(controller, e).
79        self.controller.error(v, can_gc);
80    }
81}
82
83/// The fulfillment handler for
84/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
85#[derive(Clone, JSTraceable, MallocSizeOf)]
86#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
87struct StartAlgorithmFulfillmentHandler {
88    controller: Dom<ReadableStreamDefaultController>,
89}
90
91impl Callback for StartAlgorithmFulfillmentHandler {
92    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
93    /// Upon fulfillment of startPromise,
94    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
95        let can_gc = CanGc::from_cx(cx);
96        // Set controller.[[started]] to true.
97        self.controller.started.set(true);
98
99        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
100        self.controller.call_pull_if_needed(can_gc);
101    }
102}
103
104/// The rejection handler for
105/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
106#[derive(Clone, JSTraceable, MallocSizeOf)]
107#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
108struct StartAlgorithmRejectionHandler {
109    controller: Dom<ReadableStreamDefaultController>,
110}
111
112impl Callback for StartAlgorithmRejectionHandler {
113    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
114    /// Upon rejection of startPromise with reason r,
115    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
116        let can_gc = CanGc::from_cx(cx);
117        // Perform ! ReadableStreamDefaultControllerError(controller, r).
118        self.controller.error(v, can_gc);
119    }
120}
121
122/// <https://streams.spec.whatwg.org/#value-with-size>
123#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
124#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
125pub(crate) struct ValueWithSize {
126    /// <https://streams.spec.whatwg.org/#value-with-size-value>
127    #[ignore_malloc_size_of = "Heap is measured by mozjs"]
128    pub(crate) value: Box<Heap<JSVal>>,
129    /// <https://streams.spec.whatwg.org/#value-with-size-size>
130    pub(crate) size: f64,
131}
132
133/// <https://streams.spec.whatwg.org/#value-with-size>
134#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
135#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
136pub(crate) enum EnqueuedValue {
137    /// A value enqueued from Rust.
138    Native(Box<[u8]>),
139    /// A Js value.
140    Js(ValueWithSize),
141    /// <https://streams.spec.whatwg.org/#close-sentinel>
142    CloseSentinel,
143}
144
145impl EnqueuedValue {
146    fn size(&self) -> f64 {
147        match self {
148            EnqueuedValue::Native(v) => v.len() as f64,
149            EnqueuedValue::Js(v) => v.size,
150            // The size of the sentinel is zero,
151            // as per <https://streams.spec.whatwg.org/#ref-for-close-sentinel%E2%91%A0>
152            EnqueuedValue::CloseSentinel => 0.,
153        }
154    }
155
156    fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
157        match self {
158            EnqueuedValue::Native(chunk) => {
159                rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
160                create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
161                    .expect("failed to create buffer source for native chunk.");
162                array_buffer_ptr.safe_to_jsval(cx, rval, can_gc);
163            },
164            EnqueuedValue::Js(value_with_size) => {
165                value_with_size.value.safe_to_jsval(cx, rval, can_gc)
166            },
167            EnqueuedValue::CloseSentinel => {
168                unreachable!("The close sentinel is never made available as a js val.")
169            },
170        }
171    }
172}
173
174/// <https://streams.spec.whatwg.org/#is-non-negative-number>
175fn is_non_negative_number(value: &EnqueuedValue) -> bool {
176    let value_with_size = match value {
177        EnqueuedValue::Native(_) => return true,
178        EnqueuedValue::Js(value_with_size) => value_with_size,
179        EnqueuedValue::CloseSentinel => return true,
180    };
181
182    // If v is not a Number, return false.
183    // Checked as part of the WebIDL.
184
185    // If v is NaN, return false.
186    if value_with_size.size.is_nan() {
187        return false;
188    }
189
190    // If v < 0, return false.
191    if value_with_size.size.is_sign_negative() {
192        return false;
193    }
194
195    true
196}
197
198/// <https://streams.spec.whatwg.org/#queue-with-sizes>
199#[derive(Default, JSTraceable, MallocSizeOf)]
200#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
201pub(crate) struct QueueWithSizes {
202    queue: RefCell<VecDeque<EnqueuedValue>>,
203    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize>
204    pub(crate) total_size: Cell<f64>,
205}
206
207impl QueueWithSizes {
208    /// <https://streams.spec.whatwg.org/#dequeue-value>
209    /// A none `rval` means we're dequeing the close sentinel,
210    /// which should never be made available to script.
211    pub(crate) fn dequeue_value(
212        &self,
213        cx: SafeJSContext,
214        rval: Option<MutableHandleValue>,
215        can_gc: CanGc,
216    ) {
217        {
218            let queue = self.queue.borrow();
219            let Some(value) = queue.front() else {
220                unreachable!("Buffer cannot be empty when dequeue value is called into.");
221            };
222            self.total_size.set(self.total_size.get() - value.size());
223            if let Some(rval) = rval {
224                value.to_jsval(cx, rval, can_gc);
225            } else {
226                assert_eq!(value, &EnqueuedValue::CloseSentinel);
227            }
228        }
229        self.queue.borrow_mut().pop_front();
230    }
231
232    /// <https://streams.spec.whatwg.org/#enqueue-value-with-size>
233    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
234    pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
235        // If ! IsNonNegativeNumber(size) is false, throw a RangeError exception.
236        if !is_non_negative_number(&value) {
237            return Err(Error::Range(
238                "The size of the enqueued chunk is not a non-negative number.".to_string(),
239            ));
240        }
241
242        // If size is +∞, throw a RangeError exception.
243        if value.size().is_infinite() {
244            return Err(Error::Range(
245                "The size of the enqueued chunk is infinite.".to_string(),
246            ));
247        }
248
249        self.total_size.set(self.total_size.get() + value.size());
250        self.queue.borrow_mut().push_back(value);
251
252        Ok(())
253    }
254
255    pub(crate) fn is_empty(&self) -> bool {
256        self.queue.borrow().is_empty()
257    }
258
259    /// <https://streams.spec.whatwg.org/#peek-queue-value>
260    /// Returns whether value is the close sentinel.
261    pub(crate) fn peek_queue_value(
262        &self,
263        cx: SafeJSContext,
264        rval: MutableHandleValue,
265        can_gc: CanGc,
266    ) -> bool {
267        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
268        // Done with the QueueWithSizes type.
269
270        // Assert: container.[[queue]] is not empty.
271        assert!(!self.is_empty());
272
273        // Let valueWithSize be container.[[queue]][0].
274        let queue = self.queue.borrow();
275        let value_with_size = queue.front().expect("Queue is not empty.");
276        if let EnqueuedValue::CloseSentinel = value_with_size {
277            return true;
278        }
279
280        // Return valueWithSize’s value.
281        value_with_size.to_jsval(cx, rval, can_gc);
282        false
283    }
284
285    /// Only used with native sources.
286    fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
287        self.queue
288            .borrow()
289            .iter()
290            .try_fold(Vec::new(), |mut acc, value| match value {
291                EnqueuedValue::Native(chunk) => {
292                    acc.extend(chunk.iter().copied());
293                    Some(acc)
294                },
295                _ => {
296                    warn!("get_in_memory_bytes called on a controller with non-native source.");
297                    None
298                },
299            })
300    }
301
302    /// <https://streams.spec.whatwg.org/#reset-queue>
303    pub(crate) fn reset(&self) {
304        self.queue.borrow_mut().clear();
305        self.total_size.set(Default::default());
306    }
307}
308
309/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
310#[dom_struct]
311pub(crate) struct ReadableStreamDefaultController {
312    reflector_: Reflector,
313
314    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue>
315    queue: QueueWithSizes,
316
317    /// A mutable reference to the underlying source is used to implement these two
318    /// internal slots:
319    ///
320    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm>
321    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm>
322    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
323
324    stream: MutNullableDom<ReadableStream>,
325
326    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm>
327    strategy_hwm: f64,
328
329    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm>
330    #[ignore_malloc_size_of = "mozjs"]
331    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
332
333    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested>
334    close_requested: Cell<bool>,
335
336    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started>
337    started: Cell<bool>,
338
339    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling>
340    pulling: Cell<bool>,
341
342    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain>
343    pull_again: Cell<bool>,
344}
345
346impl ReadableStreamDefaultController {
347    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
348    fn new_inherited(
349        global: &GlobalScope,
350        underlying_source_type: UnderlyingSourceType,
351        strategy_hwm: f64,
352        strategy_size: Rc<QueuingStrategySize>,
353        can_gc: CanGc,
354    ) -> ReadableStreamDefaultController {
355        ReadableStreamDefaultController {
356            reflector_: Reflector::new(),
357            queue: Default::default(),
358            stream: MutNullableDom::new(None),
359            underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
360                global,
361                underlying_source_type,
362                can_gc,
363            ))),
364            strategy_hwm,
365            strategy_size: RefCell::new(Some(strategy_size)),
366            close_requested: Default::default(),
367            started: Default::default(),
368            pulling: Default::default(),
369            pull_again: Default::default(),
370        }
371    }
372
373    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
374    pub(crate) fn new(
375        global: &GlobalScope,
376        underlying_source: UnderlyingSourceType,
377        strategy_hwm: f64,
378        strategy_size: Rc<QueuingStrategySize>,
379        can_gc: CanGc,
380    ) -> DomRoot<ReadableStreamDefaultController> {
381        reflect_dom_object(
382            Box::new(ReadableStreamDefaultController::new_inherited(
383                global,
384                underlying_source,
385                strategy_hwm,
386                strategy_size,
387                can_gc,
388            )),
389            global,
390            can_gc,
391        )
392    }
393
394    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
395    pub(crate) fn setup(
396        &self,
397        stream: DomRoot<ReadableStream>,
398        can_gc: CanGc,
399    ) -> Result<(), Error> {
400        // Assert: stream.[[controller]] is undefined
401        stream.assert_no_controller();
402
403        // Set controller.[[stream]] to stream.
404        self.stream.set(Some(&stream));
405
406        let global = &*self.global();
407        let rooted_default_controller = DomRoot::from_ref(self);
408
409        // Perform ! ResetQueue(controller).
410        // Set controller.[[started]], controller.[[closeRequested]],
411        // controller.[[pullAgain]], and controller.[[pulling]] to false.
412        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
413        // and controller.[[strategyHWM]] to highWaterMark.
414        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
415        // and controller.[[strategyHWM]] to highWaterMark.
416        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
417
418        // Note: the above steps are done in `new`.
419
420        // Set stream.[[controller]] to controller.
421        stream.set_default_controller(&rooted_default_controller);
422
423        if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
424            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
425            let start_result = underlying_source
426                .call_start_algorithm(
427                    Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
428                    can_gc,
429                )
430                .unwrap_or_else(|| {
431                    let promise = Promise::new(global, can_gc);
432                    promise.resolve_native(&(), can_gc);
433                    Ok(promise)
434                });
435
436            // Let startPromise be a promise resolved with startResult.
437            let start_promise = start_result?;
438
439            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
440            let handler = PromiseNativeHandler::new(
441                global,
442                Some(Box::new(StartAlgorithmFulfillmentHandler {
443                    controller: Dom::from_ref(&rooted_default_controller),
444                })),
445                Some(Box::new(StartAlgorithmRejectionHandler {
446                    controller: Dom::from_ref(&rooted_default_controller),
447                })),
448                can_gc,
449            );
450            let realm = enter_realm(global);
451            let comp = InRealm::Entered(&realm);
452            start_promise.append_native_handler(&handler, comp, can_gc);
453        };
454
455        Ok(())
456    }
457
458    /// Setting the JS object after the heap has settled down.
459    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
460        if let Some(underlying_source) = self.underlying_source.get() {
461            underlying_source.set_underlying_source_this_object(this_object);
462        }
463    }
464
465    /// <https://streams.spec.whatwg.org/#dequeue-value>
466    fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
467        self.queue.dequeue_value(cx, Some(rval), can_gc);
468    }
469
470    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull>
471    fn should_call_pull(&self) -> bool {
472        // Let stream be controller.[[stream]].
473        // Note: the spec does not assert that stream is not undefined here,
474        // so we return false if it is.
475        let Some(stream) = self.stream.get() else {
476            debug!("`should_call_pull` called on a controller without a stream.");
477            return false;
478        };
479
480        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
481        if !self.can_close_or_enqueue() {
482            return false;
483        }
484
485        // If controller.[[started]] is false, return false.
486        if !self.started.get() {
487            return false;
488        }
489
490        // If ! IsReadableStreamLocked(stream) is true
491        // and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
492        if stream.is_locked() && stream.get_num_read_requests() > 0 {
493            return true;
494        }
495
496        // Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller).
497        // Assert: desiredSize is not null.
498        let desired_size = self.get_desired_size().expect("desiredSize is not null.");
499
500        if desired_size > 0. {
501            return true;
502        }
503
504        false
505    }
506
507    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
508    fn call_pull_if_needed(&self, can_gc: CanGc) {
509        // Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull(controller).
510        // If shouldPull is false, return.
511        if !self.should_call_pull() {
512            return;
513        }
514
515        // If controller.[[pulling]] is true,
516        if self.pulling.get() {
517            // Set controller.[[pullAgain]] to true.
518            self.pull_again.set(true);
519
520            return;
521        }
522
523        // Set controller.[[pulling]] to true.
524        self.pulling.set(true);
525
526        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
527        // Continues into the resolve and reject handling of the native handler.
528        let global = self.global();
529        let rooted_default_controller = DomRoot::from_ref(self);
530        let controller =
531            Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
532
533        let Some(underlying_source) = self.underlying_source.get() else {
534            return;
535        };
536        let handler = PromiseNativeHandler::new(
537            &global,
538            Some(Box::new(PullAlgorithmFulfillmentHandler {
539                controller: Dom::from_ref(&rooted_default_controller),
540            })),
541            Some(Box::new(PullAlgorithmRejectionHandler {
542                controller: Dom::from_ref(&rooted_default_controller),
543            })),
544            can_gc,
545        );
546
547        let realm = enter_realm(&*global);
548        let comp = InRealm::Entered(&realm);
549        let result = underlying_source
550            .call_pull_algorithm(controller, &global, can_gc)
551            .unwrap_or_else(|| {
552                let promise = Promise::new(&global, can_gc);
553                promise.resolve_native(&(), can_gc);
554                Ok(promise)
555            });
556        let promise = result.unwrap_or_else(|error| {
557            let cx = GlobalScope::get_cx();
558            rooted!(in(*cx) let mut rval = UndefinedValue());
559            // TODO: check if `self.global()` is the right globalscope.
560            error
561                .clone()
562                .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
563            let promise = Promise::new(&global, can_gc);
564            promise.reject_native(&rval.handle(), can_gc);
565            promise
566        });
567        promise.append_native_handler(&handler, comp, can_gc);
568    }
569
570    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-cancel>
571    pub(crate) fn perform_cancel_steps(
572        &self,
573        cx: SafeJSContext,
574        global: &GlobalScope,
575        reason: SafeHandleValue,
576        can_gc: CanGc,
577    ) -> Rc<Promise> {
578        // Perform ! ResetQueue(this).
579        self.queue.reset();
580
581        let underlying_source = self
582            .underlying_source
583            .get()
584            .expect("Controller should have a source when the cancel steps are called into.");
585        // Let result be the result of performing this.[[cancelAlgorithm]], passing reason.
586        let result = underlying_source
587            .call_cancel_algorithm(cx, global, reason, can_gc)
588            .unwrap_or_else(|| {
589                let promise = Promise::new(global, can_gc);
590                promise.resolve_native(&(), can_gc);
591                Ok(promise)
592            });
593        let promise = result.unwrap_or_else(|error| {
594            rooted!(in(*cx) let mut rval = UndefinedValue());
595
596            error
597                .clone()
598                .to_jsval(cx, global, rval.handle_mut(), can_gc);
599            let promise = Promise::new(global, can_gc);
600            promise.reject_native(&rval.handle(), can_gc);
601            promise
602        });
603
604        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
605        self.clear_algorithms();
606
607        // Return result(the promise).
608        promise
609    }
610
611    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-pull>
612    pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
613        // Let stream be this.[[stream]].
614        // Note: the spec does not assert that there is a stream.
615        let Some(stream) = self.stream.get() else {
616            return;
617        };
618
619        // if queue contains bytes, perform chunk steps.
620        if !self.queue.is_empty() {
621            let cx = GlobalScope::get_cx();
622            rooted!(in(*cx) let mut rval = UndefinedValue());
623            let result = RootedTraceableBox::new(Heap::default());
624            self.dequeue_value(cx, rval.handle_mut(), can_gc);
625            result.set(*rval);
626
627            // If this.[[closeRequested]] is true and this.[[queue]] is empty
628            if self.close_requested.get() && self.queue.is_empty() {
629                // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
630                self.clear_algorithms();
631
632                // Perform ! ReadableStreamClose(stream).
633                stream.close(can_gc);
634            } else {
635                // Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
636                self.call_pull_if_needed(can_gc);
637            }
638            // Perform readRequest’s chunk steps, given chunk.
639            read_request.chunk_steps(result, &self.global(), can_gc);
640        } else {
641            // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
642            stream.add_read_request(read_request);
643
644            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
645            self.call_pull_if_needed(can_gc);
646        }
647    }
648
649    /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-releasesteps>
650    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
651        // step 1 - Return.
652        Ok(())
653    }
654
655    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
656    #[expect(unsafe_code)]
657    pub(crate) fn enqueue(
658        &self,
659        cx: SafeJSContext,
660        chunk: SafeHandleValue,
661        can_gc: CanGc,
662    ) -> Result<(), Error> {
663        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
664        if !self.can_close_or_enqueue() {
665            return Ok(());
666        }
667
668        let stream = self
669            .stream
670            .get()
671            .expect("Controller must have a stream when a chunk is enqueued.");
672
673        // If ! IsReadableStreamLocked(stream) is true
674        // and ! ReadableStreamGetNumReadRequests(stream) > 0,
675        // perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
676        if stream.is_locked() && stream.get_num_read_requests() > 0 {
677            stream.fulfill_read_request(chunk, false, can_gc);
678        } else {
679            // Otherwise,
680            // Let result be the result of performing controller.[[strategySizeAlgorithm]],
681            // passing in chunk, and interpreting the result as a completion record.
682            // Note: the clone is necessary to prevent potential re-borrow panics.
683            let strategy_size = {
684                let reference = self.strategy_size.borrow();
685                reference.clone()
686            };
687            let size = if let Some(strategy_size) = strategy_size {
688                // Note: the Rethrow exception handling is necessary,
689                // otherwise returning JSFailed will panic because no exception is pending.
690                let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
691                match result {
692                    // Let chunkSize be result.[[Value]].
693                    Ok(size) => size,
694                    Err(error) => {
695                        // If result is an abrupt completion,
696                        rooted!(in(*cx) let mut rval = UndefinedValue());
697                        unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
698
699                        // Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]).
700                        self.error(rval.handle(), can_gc);
701
702                        // Return result.
703                        // Note: we need to return a type error, because no exception is pending.
704                        return Err(error);
705                    },
706                }
707            } else {
708                0.
709            };
710
711            {
712                // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
713                let res = self
714                    .queue
715                    .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
716                        value: Heap::boxed(chunk.get()),
717                        size,
718                    }));
719                if let Err(error) = res {
720                    // If enqueueResult is an abrupt completion,
721
722                    // First, throw the exception.
723                    // Note: this must be done manually here,
724                    // because `enqueue_value_with_size` does not call into JS.
725                    throw_dom_exception(cx, &self.global(), error, can_gc);
726
727                    // Then, get a handle to the JS val for the exception,
728                    // and use that to error the stream.
729                    rooted!(in(*cx) let mut rval = UndefinedValue());
730                    unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
731
732                    // Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]).
733                    self.error(rval.handle(), can_gc);
734
735                    // Return enqueueResult.
736                    // Note: because we threw the exception above,
737                    // there is a pending exception and we can return JSFailed.
738                    return Err(Error::JSFailed);
739                }
740            }
741        }
742
743        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
744        self.call_pull_if_needed(can_gc);
745
746        Ok(())
747    }
748
749    /// Native call to
750    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
751    pub(crate) fn enqueue_native(&self, chunk: Vec<u8>, can_gc: CanGc) {
752        let stream = self
753            .stream
754            .get()
755            .expect("Controller must have a stream when a chunk is enqueued.");
756        if stream.is_locked() && stream.get_num_read_requests() > 0 {
757            let cx = GlobalScope::get_cx();
758            rooted!(in(*cx) let mut rval = UndefinedValue());
759            EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut(), can_gc);
760            stream.fulfill_read_request(rval.handle(), false, can_gc);
761        } else {
762            self.queue
763                .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
764                .expect("Enqueuing a chunk from Rust should not fail.");
765        }
766    }
767
768    /// Does the stream have all data in memory?
769    pub(crate) fn in_memory(&self) -> bool {
770        let Some(underlying_source) = self.underlying_source.get() else {
771            return false;
772        };
773        underlying_source.in_memory()
774    }
775
776    /// Return bytes synchronously if the stream has all data in memory.
777    pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
778        let underlying_source = self.underlying_source.get()?;
779        if underlying_source.in_memory() {
780            return self.queue.get_in_memory_bytes();
781        }
782        None
783    }
784
785    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms>
786    fn clear_algorithms(&self) {
787        // Set controller.[[pullAlgorithm]] to undefined.
788        // Set controller.[[cancelAlgorithm]] to undefined.
789        self.underlying_source.set(None);
790
791        // Set controller.[[strategySizeAlgorithm]] to undefined.
792        *self.strategy_size.borrow_mut() = None;
793    }
794
795    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
796    pub(crate) fn close(&self, can_gc: CanGc) {
797        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
798        if !self.can_close_or_enqueue() {
799            return;
800        }
801
802        let Some(stream) = self.stream.get() else {
803            return;
804        };
805
806        // Set controller.[[closeRequested]] to true.
807        self.close_requested.set(true);
808
809        if self.queue.is_empty() {
810            // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
811            self.clear_algorithms();
812
813            // Perform ! ReadableStreamClose(stream).
814            stream.close(can_gc);
815        }
816    }
817
818    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size>
819    pub(crate) fn get_desired_size(&self) -> Option<f64> {
820        let stream = self.stream.get()?;
821
822        // If state is "errored", return null.
823        if stream.is_errored() {
824            return None;
825        }
826
827        // If state is "closed", return 0.
828        if stream.is_closed() {
829            return Some(0.0);
830        }
831
832        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
833        let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
834        Some(desired_size.clamp(desired_size, self.strategy_hwm))
835    }
836
837    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue>
838    pub(crate) fn can_close_or_enqueue(&self) -> bool {
839        let Some(stream) = self.stream.get() else {
840            return false;
841        };
842
843        // If controller.[[closeRequested]] is false and state is "readable", return true.
844        if !self.close_requested.get() && stream.is_readable() {
845            return true;
846        }
847
848        // Otherwise, return false.
849        false
850    }
851
852    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
853    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
854        let Some(stream) = self.stream.get() else {
855            return;
856        };
857
858        // If stream.[[state]] is not "readable", return.
859        if !stream.is_readable() {
860            return;
861        }
862
863        // Perform ! ResetQueue(controller).
864        self.queue.reset();
865
866        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
867        self.clear_algorithms();
868
869        stream.error(e, can_gc);
870    }
871
872    /// <https://streams.spec.whatwg.org/#rs-default-controller-has-backpressure>
873    pub(crate) fn has_backpressure(&self) -> bool {
874        // If ! ReadableStreamDefaultControllerShouldCallPull(controller) is true, return false.
875        // Otherwise, return true.
876        !self.should_call_pull()
877    }
878}
879
880impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
881    for ReadableStreamDefaultController
882{
883    /// <https://streams.spec.whatwg.org/#rs-default-controller-desired-size>
884    fn GetDesiredSize(&self) -> Option<f64> {
885        self.get_desired_size()
886    }
887
888    /// <https://streams.spec.whatwg.org/#rs-default-controller-close>
889    fn Close(&self, can_gc: CanGc) -> Fallible<()> {
890        if !self.can_close_or_enqueue() {
891            // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false,
892            // throw a TypeError exception.
893            return Err(Error::Type("Stream cannot be closed.".to_string()));
894        }
895
896        // Perform ! ReadableStreamDefaultControllerClose(this).
897        self.close(can_gc);
898
899        Ok(())
900    }
901
902    /// <https://streams.spec.whatwg.org/#rs-default-controller-enqueue>
903    fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
904        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception.
905        if !self.can_close_or_enqueue() {
906            return Err(Error::Type("Stream cannot be enqueued to.".to_string()));
907        }
908
909        // Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk).
910        self.enqueue(cx, chunk, can_gc)
911    }
912
913    /// <https://streams.spec.whatwg.org/#rs-default-controller-error>
914    fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
915        self.error(e, can_gc);
916        Ok(())
917    }
918}