Skip to main content

script/dom/stream/
readablestreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::{Heap, JSObject};
13use js::jsval::{JSVal, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::wrappers2::JS_GetPendingException;
16use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
17use js::typedarray::Uint8;
18use script_bindings::conversions::SafeToJSValConvertible;
19use script_bindings::reflector::{Reflector, reflect_dom_object};
20
21use crate::dom::bindings::buffer_source::create_buffer_source;
22use crate::dom::bindings::callback::ExceptionHandling;
23use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
24use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
25use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
26use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
27use crate::dom::bindings::reflector::DomGlobal;
28use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
29use crate::dom::bindings::trace::RootedTraceableBox;
30use crate::dom::globalscope::GlobalScope;
31use crate::dom::promise::Promise;
32use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
33use crate::dom::stream::readablestream::ReadableStream;
34use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
35use crate::dom::stream::underlyingsourcecontainer::{
36    UnderlyingSourceContainer, UnderlyingSourceType,
37};
38use crate::realms::enter_auto_realm;
39use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
40
41/// The fulfillment handler for
42/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
43#[derive(Clone, JSTraceable, MallocSizeOf)]
44#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
45struct PullAlgorithmFulfillmentHandler {
46    controller: Dom<ReadableStreamDefaultController>,
47}
48
49impl Callback for PullAlgorithmFulfillmentHandler {
50    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
51    /// Upon fulfillment of pullPromise
52    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
53        // Set controller.[[pulling]] to false.
54        self.controller.pulling.set(false);
55
56        // If controller.[[pullAgain]] is true,
57        if self.controller.pull_again.get() {
58            // Set controller.[[pullAgain]] to false.
59            self.controller.pull_again.set(false);
60
61            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
62            self.controller.call_pull_if_needed(cx);
63        }
64    }
65}
66
67/// The rejection handler for
68/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
69#[derive(Clone, JSTraceable, MallocSizeOf)]
70#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
71struct PullAlgorithmRejectionHandler {
72    controller: Dom<ReadableStreamDefaultController>,
73}
74
75impl Callback for PullAlgorithmRejectionHandler {
76    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
77    /// Upon rejection of pullPromise with reason e.
78    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
79        // Perform ! ReadableStreamDefaultControllerError(controller, e).
80        self.controller.error(cx, v);
81    }
82}
83
84/// The fulfillment handler for
85/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
86#[derive(Clone, JSTraceable, MallocSizeOf)]
87#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
88struct StartAlgorithmFulfillmentHandler {
89    controller: Dom<ReadableStreamDefaultController>,
90}
91
92impl Callback for StartAlgorithmFulfillmentHandler {
93    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
94    /// Upon fulfillment of startPromise,
95    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
96        // Set controller.[[started]] to true.
97        self.controller.started.set(true);
98
99        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
100        self.controller.call_pull_if_needed(cx);
101    }
102}
103
104/// The rejection handler for
105/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
106#[derive(Clone, JSTraceable, MallocSizeOf)]
107#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
108struct StartAlgorithmRejectionHandler {
109    controller: Dom<ReadableStreamDefaultController>,
110}
111
112impl Callback for StartAlgorithmRejectionHandler {
113    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
114    /// Upon rejection of startPromise with reason r,
115    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
116        // Perform ! ReadableStreamDefaultControllerError(controller, r).
117        self.controller.error(cx, v);
118    }
119}
120
/// A JS chunk stored in the queue together with the size recorded for it.
/// <https://streams.spec.whatwg.org/#value-with-size>
#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct ValueWithSize {
    /// The chunk itself, held in a boxed `Heap`.
    /// <https://streams.spec.whatwg.org/#value-with-size-value>
    #[ignore_malloc_size_of = "Heap is measured by mozjs"]
    pub(crate) value: Box<Heap<JSVal>>,
    /// The amount this chunk contributes to the queue's total size.
    /// <https://streams.spec.whatwg.org/#value-with-size-size>
    pub(crate) size: f64,
}
131
/// One entry of a [`QueueWithSizes`]: a chunk of bytes enqueued from Rust,
/// a JS chunk with its size, or the close sentinel.
///
/// The `Js` variant corresponds to <https://streams.spec.whatwg.org/#value-with-size>.
#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum EnqueuedValue {
    /// A value enqueued from Rust.
    Native(Box<[u8]>),
    /// A Js value.
    Js(ValueWithSize),
    /// <https://streams.spec.whatwg.org/#close-sentinel>
    CloseSentinel,
}
143
144impl EnqueuedValue {
145    fn size(&self) -> f64 {
146        match self {
147            EnqueuedValue::Native(v) => v.len() as f64,
148            EnqueuedValue::Js(v) => v.size,
149            // The size of the sentinel is zero,
150            // as per <https://streams.spec.whatwg.org/#ref-for-close-sentinel%E2%91%A0>
151            EnqueuedValue::CloseSentinel => 0.,
152        }
153    }
154
155    fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
156        match self {
157            EnqueuedValue::Native(chunk) => {
158                rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
159                create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
160                    .expect("failed to create buffer source for native chunk.");
161                array_buffer_ptr.safe_to_jsval(cx, rval, can_gc);
162            },
163            EnqueuedValue::Js(value_with_size) => {
164                value_with_size.value.safe_to_jsval(cx, rval, can_gc)
165            },
166            EnqueuedValue::CloseSentinel => {
167                unreachable!("The close sentinel is never made available as a js val.")
168            },
169        }
170    }
171}
172
173/// <https://streams.spec.whatwg.org/#is-non-negative-number>
174fn is_non_negative_number(value: &EnqueuedValue) -> bool {
175    let value_with_size = match value {
176        EnqueuedValue::Native(_) => return true,
177        EnqueuedValue::Js(value_with_size) => value_with_size,
178        EnqueuedValue::CloseSentinel => return true,
179    };
180
181    // If v is not a Number, return false.
182    // Checked as part of the WebIDL.
183
184    // If v is NaN, return false.
185    if value_with_size.size.is_nan() {
186        return false;
187    }
188
189    // If v < 0, return false.
190    if value_with_size.size.is_sign_negative() {
191        return false;
192    }
193
194    true
195}
196
/// A queue of [`EnqueuedValue`]s that also tracks the sum of their sizes.
/// <https://streams.spec.whatwg.org/#queue-with-sizes>
#[derive(Default, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct QueueWithSizes {
    /// The queued entries, oldest at the front.
    queue: RefCell<VecDeque<EnqueuedValue>>,
    /// Running total of the sizes of the entries in `queue`.
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize>
    pub(crate) total_size: Cell<f64>,
}
205
206impl QueueWithSizes {
207    /// <https://streams.spec.whatwg.org/#dequeue-value>
208    /// A none `rval` means we're dequeing the close sentinel,
209    /// which should never be made available to script.
210    pub(crate) fn dequeue_value(
211        &self,
212        cx: SafeJSContext,
213        rval: Option<MutableHandleValue>,
214        can_gc: CanGc,
215    ) {
216        {
217            let queue = self.queue.borrow();
218            let Some(value) = queue.front() else {
219                unreachable!("Buffer cannot be empty when dequeue value is called into.");
220            };
221            self.total_size.set(self.total_size.get() - value.size());
222            if let Some(rval) = rval {
223                value.to_jsval(cx, rval, can_gc);
224            } else {
225                assert_eq!(value, &EnqueuedValue::CloseSentinel);
226            }
227        }
228        self.queue.borrow_mut().pop_front();
229    }
230
231    /// <https://streams.spec.whatwg.org/#enqueue-value-with-size>
232    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
233    pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
234        // If ! IsNonNegativeNumber(size) is false, throw a RangeError exception.
235        if !is_non_negative_number(&value) {
236            return Err(Error::Range(
237                c"The size of the enqueued chunk is not a non-negative number.".to_owned(),
238            ));
239        }
240
241        // If size is +∞, throw a RangeError exception.
242        if value.size().is_infinite() {
243            return Err(Error::Range(
244                c"The size of the enqueued chunk is infinite.".to_owned(),
245            ));
246        }
247
248        self.total_size.set(self.total_size.get() + value.size());
249        self.queue.borrow_mut().push_back(value);
250
251        Ok(())
252    }
253
254    pub(crate) fn is_empty(&self) -> bool {
255        self.queue.borrow().is_empty()
256    }
257
258    /// <https://streams.spec.whatwg.org/#peek-queue-value>
259    /// Returns whether value is the close sentinel.
260    pub(crate) fn peek_queue_value(
261        &self,
262        cx: SafeJSContext,
263        rval: MutableHandleValue,
264        can_gc: CanGc,
265    ) -> bool {
266        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
267        // Done with the QueueWithSizes type.
268
269        // Assert: container.[[queue]] is not empty.
270        assert!(!self.is_empty());
271
272        // Let valueWithSize be container.[[queue]][0].
273        let queue = self.queue.borrow();
274        let value_with_size = queue.front().expect("Queue is not empty.");
275        if let EnqueuedValue::CloseSentinel = value_with_size {
276            return true;
277        }
278
279        // Return valueWithSize’s value.
280        value_with_size.to_jsval(cx, rval, can_gc);
281        false
282    }
283
284    /// Only used with native sources.
285    fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
286        self.queue
287            .borrow()
288            .iter()
289            .try_fold(Vec::new(), |mut acc, value| match value {
290                EnqueuedValue::Native(chunk) => {
291                    acc.extend(chunk.iter().copied());
292                    Some(acc)
293                },
294                _ => {
295                    warn!("get_in_memory_bytes called on a controller with non-native source.");
296                    None
297                },
298            })
299    }
300
301    /// <https://streams.spec.whatwg.org/#reset-queue>
302    pub(crate) fn reset(&self) {
303        self.queue.borrow_mut().clear();
304        self.total_size.set(Default::default());
305    }
306}
307
/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
///
/// Holds the chunk queue, the underlying source, the queuing strategy and the
/// state flags of a default readable stream controller.
#[dom_struct]
pub(crate) struct ReadableStreamDefaultController {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue>
    queue: QueueWithSizes,

    /// A mutable reference to the underlying source is used to implement these two
    /// internal slots:
    ///
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm>
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm>
    ///
    /// Set to `None` when the algorithms are cleared.
    underlying_source: MutNullableDom<UnderlyingSourceContainer>,

    /// The stream this controller belongs to; `None` until `setup` is called.
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-stream>
    stream: MutNullableDom<ReadableStream>,

    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm>
    strategy_hwm: f64,

    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm>
    ///
    /// Set to `None` when the algorithms are cleared.
    #[ignore_malloc_size_of = "mozjs"]
    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,

    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested>
    close_requested: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started>
    started: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling>
    pulling: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain>
    pull_again: Cell<bool>,
}
344
345impl ReadableStreamDefaultController {
346    fn new_inherited(
347        global: &GlobalScope,
348        underlying_source_type: UnderlyingSourceType,
349        strategy_hwm: f64,
350        strategy_size: Rc<QueuingStrategySize>,
351        can_gc: CanGc,
352    ) -> ReadableStreamDefaultController {
353        ReadableStreamDefaultController {
354            reflector_: Reflector::new(),
355            queue: Default::default(),
356            stream: MutNullableDom::new(None),
357            underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
358                global,
359                underlying_source_type,
360                can_gc,
361            ))),
362            strategy_hwm,
363            strategy_size: RefCell::new(Some(strategy_size)),
364            close_requested: Default::default(),
365            started: Default::default(),
366            pulling: Default::default(),
367            pull_again: Default::default(),
368        }
369    }
370
371    pub(crate) fn new(
372        global: &GlobalScope,
373        underlying_source: UnderlyingSourceType,
374        strategy_hwm: f64,
375        strategy_size: Rc<QueuingStrategySize>,
376        can_gc: CanGc,
377    ) -> DomRoot<ReadableStreamDefaultController> {
378        reflect_dom_object(
379            Box::new(ReadableStreamDefaultController::new_inherited(
380                global,
381                underlying_source,
382                strategy_hwm,
383                strategy_size,
384                can_gc,
385            )),
386            global,
387            can_gc,
388        )
389    }
390
    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
    ///
    /// Links this controller and `stream` to each other, runs the start algorithm of
    /// the underlying source (if the controller still has one), and attaches the start
    /// fulfillment and rejection handlers to the resulting promise.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the start algorithm, if any. At that point the
    /// controller and the stream have already been linked.
    pub(crate) fn setup(
        &self,
        cx: &mut JSContext,
        stream: DomRoot<ReadableStream>,
    ) -> Result<(), Error> {
        // Assert: stream.[[controller]] is undefined
        stream.assert_no_controller();

        // Set controller.[[stream]] to stream.
        self.stream.set(Some(&stream));

        let global = &*self.global();
        let rooted_default_controller = DomRoot::from_ref(self);

        // Perform ! ResetQueue(controller).
        // Set controller.[[started]], controller.[[closeRequested]],
        // controller.[[pullAgain]], and controller.[[pulling]] to false.
        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
        // and controller.[[strategyHWM]] to highWaterMark.
        // Set controller.[[pullAlgorithm]] to pullAlgorithm.
        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.

        // Note: the above steps are done in `new`.

        // Set stream.[[controller]] to controller.
        stream.set_default_controller(&rooted_default_controller);

        if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
            // When the source provides no start algorithm result (`None`),
            // an already-resolved promise is used instead.
            let start_result = underlying_source
                .call_start_algorithm(
                    cx,
                    Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
                )
                .unwrap_or_else(|| {
                    let promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
                    Ok(promise)
                });

            // Let startPromise be a promise resolved with startResult.
            let start_promise = start_result?;

            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
            let handler = PromiseNativeHandler::new(
                global,
                Some(Box::new(StartAlgorithmFulfillmentHandler {
                    controller: Dom::from_ref(&rooted_default_controller),
                })),
                Some(Box::new(StartAlgorithmRejectionHandler {
                    controller: Dom::from_ref(&rooted_default_controller),
                })),
                CanGc::from_cx(cx),
            );
            // The handler is appended from within the realm of `global`.
            let mut realm = enter_auto_realm(cx, global);
            let cx = &mut realm.current_realm();
            start_promise.append_native_handler(cx, &handler);
        };

        Ok(())
    }
453
454    /// Setting the JS object after the heap has settled down.
455    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
456        if let Some(underlying_source) = self.underlying_source.get() {
457            underlying_source.set_underlying_source_this_object(this_object);
458        }
459    }
460
    /// <https://streams.spec.whatwg.org/#dequeue-value>
    ///
    /// Dequeues the front entry of the controller's queue and writes its JS value
    /// into `rval`. Always passes `Some(rval)`, so the queue asserts nothing about
    /// the close sentinel here.
    fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
        self.queue.dequeue_value(cx, Some(rval), can_gc);
    }
465
466    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull>
467    fn should_call_pull(&self) -> bool {
468        // Let stream be controller.[[stream]].
469        // Note: the spec does not assert that stream is not undefined here,
470        // so we return false if it is.
471        let Some(stream) = self.stream.get() else {
472            debug!("`should_call_pull` called on a controller without a stream.");
473            return false;
474        };
475
476        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
477        if !self.can_close_or_enqueue() {
478            return false;
479        }
480
481        // If controller.[[started]] is false, return false.
482        if !self.started.get() {
483            return false;
484        }
485
486        // If ! IsReadableStreamLocked(stream) is true
487        // and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
488        if stream.is_locked() && stream.get_num_read_requests() > 0 {
489            return true;
490        }
491
492        // Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller).
493        // Assert: desiredSize is not null.
494        let desired_size = self.get_desired_size().expect("desiredSize is not null.");
495
496        if desired_size > 0. {
497            return true;
498        }
499
500        false
501    }
502
    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
    ///
    /// Invokes the pull algorithm of the underlying source when `should_call_pull`
    /// allows it and no pull is in flight; if one is in flight, records that another
    /// pull is wanted instead. The fulfillment and rejection steps run later, in
    /// `PullAlgorithmFulfillmentHandler` and `PullAlgorithmRejectionHandler`.
    fn call_pull_if_needed(&self, cx: &mut JSContext) {
        // Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull(controller).
        // If shouldPull is false, return.
        if !self.should_call_pull() {
            return;
        }

        // If controller.[[pulling]] is true,
        if self.pulling.get() {
            // Set controller.[[pullAgain]] to true.
            self.pull_again.set(true);

            return;
        }

        // Set controller.[[pulling]] to true.
        self.pulling.set(true);

        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
        // Continues into the resolve and reject handling of the native handler.
        let global = self.global();
        let rooted_default_controller = DomRoot::from_ref(self);
        let controller =
            Controller::ReadableStreamDefaultController(rooted_default_controller.clone());

        // NOTE(review): [[pulling]] was already set to true above and is not reset
        // on this early return — confirm this path cannot be reached in practice.
        let Some(underlying_source) = self.underlying_source.get() else {
            return;
        };
        let handler = PromiseNativeHandler::new(
            &global,
            Some(Box::new(PullAlgorithmFulfillmentHandler {
                controller: Dom::from_ref(&rooted_default_controller),
            })),
            Some(Box::new(PullAlgorithmRejectionHandler {
                controller: Dom::from_ref(&rooted_default_controller),
            })),
            CanGc::from_cx(cx),
        );

        // The pull algorithm is called from within the realm of `global`.
        let mut realm = enter_auto_realm(cx, &*global);
        let cx = &mut realm.current_realm();

        // When the source provides no pull result (`None`),
        // an already-resolved promise is used instead.
        let result = underlying_source
            .call_pull_algorithm(cx, controller)
            .unwrap_or_else(|| {
                let promise = Promise::new_resolved(&global, cx.into(), (), CanGc::from_cx(cx));
                Ok(promise)
            });
        // An error from the pull algorithm is converted to a JS value
        // and turned into a rejected promise, so the rejection handler runs.
        let promise = result.unwrap_or_else(|error| {
            rooted!(&in(cx) let mut rval = UndefinedValue());
            // TODO: check if `self.global()` is the right globalscope.
            error.to_jsval(cx.into(), &global, rval.handle_mut(), CanGc::from_cx(cx));
            Promise::new_rejected(&global, cx.into(), rval.handle(), CanGc::from_cx(cx))
        });
        promise.append_native_handler(cx, &handler);
    }
560
    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-cancel>
    ///
    /// Empties the queue, runs the cancel algorithm of the underlying source with
    /// `reason`, clears the controller's algorithms, and returns the promise for
    /// the cancel result.
    ///
    /// # Panics
    ///
    /// Panics if the controller no longer has an underlying source.
    pub(crate) fn perform_cancel_steps(
        &self,
        cx: &mut JSContext,
        global: &GlobalScope,
        reason: SafeHandleValue,
    ) -> Rc<Promise> {
        // Perform ! ResetQueue(this).
        self.queue.reset();

        let underlying_source = self
            .underlying_source
            .get()
            .expect("Controller should have a source when the cancel steps are called into.");
        // Let result be the result of performing this.[[cancelAlgorithm]], passing reason.
        // When the source provides no cancel result (`None`),
        // a promise resolved with `()` is used instead.
        let result = underlying_source
            .call_cancel_algorithm(cx, global, reason)
            .unwrap_or_else(|| {
                let promise = Promise::new2(cx, global);
                promise.resolve_native(&(), CanGc::from_cx(cx));
                Ok(promise)
            });
        // An error from the cancel algorithm is converted to a JS value
        // and returned as a rejected promise.
        let promise = result.unwrap_or_else(|error| {
            rooted!(&in(cx) let mut rval = UndefinedValue());

            error.to_jsval(cx.into(), global, rval.handle_mut(), CanGc::from_cx(cx));
            let promise = Promise::new2(cx, global);
            promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
            promise
        });

        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
        self.clear_algorithms();

        // Return result(the promise).
        promise
    }
598
599    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-pull>
600    pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
601        // Let stream be this.[[stream]].
602        // Note: the spec does not assert that there is a stream.
603        let Some(stream) = self.stream.get() else {
604            return;
605        };
606
607        // if queue contains bytes, perform chunk steps.
608        if !self.queue.is_empty() {
609            rooted!(&in(cx) let mut rval = UndefinedValue());
610            let result = RootedTraceableBox::new(Heap::default());
611            self.dequeue_value(cx.into(), rval.handle_mut(), CanGc::from_cx(cx));
612            result.set(*rval);
613
614            // If this.[[closeRequested]] is true and this.[[queue]] is empty
615            if self.close_requested.get() && self.queue.is_empty() {
616                // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
617                self.clear_algorithms();
618
619                // Perform ! ReadableStreamClose(stream).
620                stream.close(cx);
621            } else {
622                // Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
623                self.call_pull_if_needed(cx);
624            }
625            // Perform readRequest’s chunk steps, given chunk.
626            read_request.chunk_steps(cx, result, &self.global());
627        } else {
628            // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
629            stream.add_read_request(read_request);
630
631            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
632            self.call_pull_if_needed(cx);
633        }
634    }
635
    /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-releasesteps>
    ///
    /// The release steps of the default controller do nothing; this always returns `Ok(())`.
    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
        // step 1 - Return.
        Ok(())
    }
641
    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
    ///
    /// Hands `chunk` directly to a pending read request when the stream is locked and
    /// has one; otherwise computes its size with the strategy size algorithm (0 when
    /// none is set) and appends it to the queue. Finishes by calling pull if needed.
    /// Returns `Ok(())` without doing anything when the controller cannot enqueue.
    ///
    /// # Errors
    ///
    /// Returns the error of the size algorithm, or `Error::JSFailed` when the chunk
    /// size is rejected by the queue. In both cases the controller is errored first.
    ///
    /// # Panics
    ///
    /// Panics if the controller can enqueue but has no stream, or if no exception
    /// is pending after one of the failures above.
    #[expect(unsafe_code)]
    pub(crate) fn enqueue(&self, cx: &mut JSContext, chunk: SafeHandleValue) -> Result<(), Error> {
        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
        if !self.can_close_or_enqueue() {
            return Ok(());
        }

        let stream = self
            .stream
            .get()
            .expect("Controller must have a stream when a chunk is enqueued.");

        // If ! IsReadableStreamLocked(stream) is true
        // and ! ReadableStreamGetNumReadRequests(stream) > 0,
        // perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
        if stream.is_locked() && stream.get_num_read_requests() > 0 {
            stream.fulfill_read_request(cx, chunk, false);
        } else {
            // Otherwise,
            // Let result be the result of performing controller.[[strategySizeAlgorithm]],
            // passing in chunk, and interpreting the result as a completion record.
            // Note: the clone is necessary to prevent potential re-borrow panics.
            let strategy_size = {
                let reference = self.strategy_size.borrow();
                reference.clone()
            };
            let size = if let Some(strategy_size) = strategy_size {
                // Note: the Rethrow exception handling is necessary,
                // otherwise returning JSFailed will panic because no exception is pending.
                let result = strategy_size.Call__(cx, chunk, ExceptionHandling::Rethrow);
                match result {
                    // Let chunkSize be result.[[Value]].
                    Ok(size) => size,
                    Err(error) => {
                        // If result is an abrupt completion,
                        rooted!(&in(cx) let mut rval = UndefinedValue());
                        unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };

                        // Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]).
                        self.error(cx, rval.handle());

                        // Return result.
                        // NOTE(review): the error of the size algorithm is returned as-is.
                        // An earlier note here said a type error had to be returned because
                        // no exception is pending, which does not match the code — confirm.
                        return Err(error);
                    },
                }
            } else {
                // No size algorithm is set: the chunk counts as size 0.
                0.
            };

            {
                // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
                let res = self
                    .queue
                    .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
                        value: Heap::boxed(chunk.get()),
                        size,
                    }));
                if let Err(error) = res {
                    // If enqueueResult is an abrupt completion,

                    // First, throw the exception.
                    // Note: this must be done manually here,
                    // because `enqueue_value_with_size` does not call into JS.
                    throw_dom_exception(cx.into(), &self.global(), error, CanGc::from_cx(cx));

                    // Then, get a handle to the JS val for the exception,
                    // and use that to error the stream.
                    rooted!(&in(cx) let mut rval = UndefinedValue());
                    unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };

                    // Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]).
                    self.error(cx, rval.handle());

                    // Return enqueueResult.
                    // Note: because we threw the exception above,
                    // there is a pending exception and we can return JSFailed.
                    return Err(Error::JSFailed);
                }
            }
        }

        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
        self.call_pull_if_needed(cx);

        Ok(())
    }
730
    /// Native call to
    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
    ///
    /// Enqueues a chunk of bytes coming from Rust. When the stream is locked and has
    /// a pending read request, the bytes are converted to a JS value and used to
    /// fulfill that request; otherwise they are appended to the queue as a native chunk.
    ///
    /// Unlike `enqueue`, this neither checks `can_close_or_enqueue` first nor calls
    /// `call_pull_if_needed` afterwards.
    ///
    /// # Panics
    ///
    /// Panics if the controller has no stream, or if the queue rejects the chunk.
    pub(crate) fn enqueue_native(&self, cx: &mut JSContext, chunk: Vec<u8>) {
        let stream = self
            .stream
            .get()
            .expect("Controller must have a stream when a chunk is enqueued.");
        if stream.is_locked() && stream.get_num_read_requests() > 0 {
            rooted!(&in(cx) let mut rval = UndefinedValue());
            // The native chunk only lives as a temporary here, long enough to be converted.
            EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(
                cx.into(),
                rval.handle_mut(),
                CanGc::from_cx(cx),
            );
            stream.fulfill_read_request(cx, rval.handle(), false);
        } else {
            self.queue
                .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
                .expect("Enqueuing a chunk from Rust should not fail.");
        }
    }
752
753    /// Does the stream have all data in memory?
754    pub(crate) fn in_memory(&self) -> bool {
755        let Some(underlying_source) = self.underlying_source.get() else {
756            return false;
757        };
758        underlying_source.in_memory()
759    }
760
761    /// Return bytes synchronously if the stream has all data in memory.
762    pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
763        let underlying_source = self.underlying_source.get()?;
764        if underlying_source.in_memory() {
765            return self.queue.get_in_memory_bytes();
766        }
767        None
768    }
769
770    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms>
771    fn clear_algorithms(&self) {
772        // Set controller.[[pullAlgorithm]] to undefined.
773        // Set controller.[[cancelAlgorithm]] to undefined.
774        self.underlying_source.set(None);
775
776        // Set controller.[[strategySizeAlgorithm]] to undefined.
777        *self.strategy_size.borrow_mut() = None;
778    }
779
780    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
781    pub(crate) fn close(&self, cx: &mut JSContext) {
782        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
783        if !self.can_close_or_enqueue() {
784            return;
785        }
786
787        let Some(stream) = self.stream.get() else {
788            return;
789        };
790
791        // Set controller.[[closeRequested]] to true.
792        self.close_requested.set(true);
793
794        if self.queue.is_empty() {
795            // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
796            self.clear_algorithms();
797
798            // Perform ! ReadableStreamClose(stream).
799            stream.close(cx);
800        }
801    }
802
803    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size>
804    pub(crate) fn get_desired_size(&self) -> Option<f64> {
805        let stream = self.stream.get()?;
806
807        // If state is "errored", return null.
808        if stream.is_errored() {
809            return None;
810        }
811
812        // If state is "closed", return 0.
813        if stream.is_closed() {
814            return Some(0.0);
815        }
816
817        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
818        let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
819        Some(desired_size.clamp(desired_size, self.strategy_hwm))
820    }
821
822    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue>
823    pub(crate) fn can_close_or_enqueue(&self) -> bool {
824        let Some(stream) = self.stream.get() else {
825            return false;
826        };
827
828        // If controller.[[closeRequested]] is false and state is "readable", return true.
829        if !self.close_requested.get() && stream.is_readable() {
830            return true;
831        }
832
833        // Otherwise, return false.
834        false
835    }
836
837    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
838    pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
839        let Some(stream) = self.stream.get() else {
840            return;
841        };
842
843        // If stream.[[state]] is not "readable", return.
844        if !stream.is_readable() {
845            return;
846        }
847
848        // Perform ! ResetQueue(controller).
849        self.queue.reset();
850
851        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
852        self.clear_algorithms();
853
854        stream.error(cx, e);
855    }
856
857    /// <https://streams.spec.whatwg.org/#rs-default-controller-has-backpressure>
858    pub(crate) fn has_backpressure(&self) -> bool {
859        // If ! ReadableStreamDefaultControllerShouldCallPull(controller) is true, return false.
860        // Otherwise, return true.
861        !self.should_call_pull()
862    }
863}
864
865impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
866    for ReadableStreamDefaultController
867{
868    /// <https://streams.spec.whatwg.org/#rs-default-controller-desired-size>
869    fn GetDesiredSize(&self) -> Option<f64> {
870        self.get_desired_size()
871    }
872
873    /// <https://streams.spec.whatwg.org/#rs-default-controller-close>
874    fn Close(&self, cx: &mut JSContext) -> Fallible<()> {
875        if !self.can_close_or_enqueue() {
876            // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false,
877            // throw a TypeError exception.
878            return Err(Error::Type(c"Stream cannot be closed.".to_owned()));
879        }
880
881        // Perform ! ReadableStreamDefaultControllerClose(this).
882        self.close(cx);
883
884        Ok(())
885    }
886
887    /// <https://streams.spec.whatwg.org/#rs-default-controller-enqueue>
888    fn Enqueue(&self, cx: &mut JSContext, chunk: SafeHandleValue) -> Fallible<()> {
889        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception.
890        if !self.can_close_or_enqueue() {
891            return Err(Error::Type(c"Stream cannot be enqueued to.".to_owned()));
892        }
893
894        // Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk).
895        self.enqueue(cx, chunk)
896    }
897
898    /// <https://streams.spec.whatwg.org/#rs-default-controller-error>
899    fn Error(&self, cx: &mut JSContext, e: SafeHandleValue) -> Fallible<()> {
900        self.error(cx, e);
901        Ok(())
902    }
903}