script/dom/
readablestreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::rust::wrappers::JS_GetPendingException;
14use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
15use js::typedarray::Uint8;
16use script_bindings::conversions::SafeToJSValConvertible;
17
18use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
19use super::bindings::root::Dom;
20use crate::dom::bindings::buffer_source::create_buffer_source;
21use crate::dom::bindings::callback::ExceptionHandling;
22use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
23use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
25use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
26use crate::dom::bindings::root::{DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestream::ReadableStream;
32use crate::dom::readablestreamdefaultreader::ReadRequest;
33use crate::dom::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
34use crate::realms::{InRealm, enter_realm};
35use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
36
37/// The fulfillment handler for
38/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
39#[derive(Clone, JSTraceable, MallocSizeOf)]
40#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
41struct PullAlgorithmFulfillmentHandler {
42    controller: Dom<ReadableStreamDefaultController>,
43}
44
45impl Callback for PullAlgorithmFulfillmentHandler {
46    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
47    /// Upon fulfillment of pullPromise
48    fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
49        // Set controller.[[pulling]] to false.
50        self.controller.pulling.set(false);
51
52        // If controller.[[pullAgain]] is true,
53        if self.controller.pull_again.get() {
54            // Set controller.[[pullAgain]] to false.
55            self.controller.pull_again.set(false);
56
57            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
58            self.controller.call_pull_if_needed(can_gc);
59        }
60    }
61}
62
63/// The rejection handler for
64/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
65#[derive(Clone, JSTraceable, MallocSizeOf)]
66#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
67struct PullAlgorithmRejectionHandler {
68    controller: Dom<ReadableStreamDefaultController>,
69}
70
71impl Callback for PullAlgorithmRejectionHandler {
72    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
73    /// Upon rejection of pullPromise with reason e.
74    fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
75        // Perform ! ReadableStreamDefaultControllerError(controller, e).
76        self.controller.error(v, can_gc);
77    }
78}
79
80/// The fulfillment handler for
81/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
82#[derive(Clone, JSTraceable, MallocSizeOf)]
83#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
84struct StartAlgorithmFulfillmentHandler {
85    controller: Dom<ReadableStreamDefaultController>,
86}
87
88impl Callback for StartAlgorithmFulfillmentHandler {
89    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
90    /// Upon fulfillment of startPromise,
91    fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
92        // Set controller.[[started]] to true.
93        self.controller.started.set(true);
94
95        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
96        self.controller.call_pull_if_needed(can_gc);
97    }
98}
99
100/// The rejection handler for
101/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
102#[derive(Clone, JSTraceable, MallocSizeOf)]
103#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
104struct StartAlgorithmRejectionHandler {
105    controller: Dom<ReadableStreamDefaultController>,
106}
107
108impl Callback for StartAlgorithmRejectionHandler {
109    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
110    /// Upon rejection of startPromise with reason r,
111    fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
112        // Perform ! ReadableStreamDefaultControllerError(controller, r).
113        self.controller.error(v, can_gc);
114    }
115}
116
117/// <https://streams.spec.whatwg.org/#value-with-size>
118#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
119#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
120pub(crate) struct ValueWithSize {
121    /// <https://streams.spec.whatwg.org/#value-with-size-value>
122    #[ignore_malloc_size_of = "Heap is measured by mozjs"]
123    pub(crate) value: Box<Heap<JSVal>>,
124    /// <https://streams.spec.whatwg.org/#value-with-size-size>
125    pub(crate) size: f64,
126}
127
128/// <https://streams.spec.whatwg.org/#value-with-size>
129#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
130#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
131pub(crate) enum EnqueuedValue {
132    /// A value enqueued from Rust.
133    Native(Box<[u8]>),
134    /// A Js value.
135    Js(ValueWithSize),
136    /// <https://streams.spec.whatwg.org/#close-sentinel>
137    CloseSentinel,
138}
139
140impl EnqueuedValue {
141    fn size(&self) -> f64 {
142        match self {
143            EnqueuedValue::Native(v) => v.len() as f64,
144            EnqueuedValue::Js(v) => v.size,
145            // The size of the sentinel is zero,
146            // as per <https://streams.spec.whatwg.org/#ref-for-close-sentinel%E2%91%A0>
147            EnqueuedValue::CloseSentinel => 0.,
148        }
149    }
150
151    fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
152        match self {
153            EnqueuedValue::Native(chunk) => {
154                rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
155                create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
156                    .expect("failed to create buffer source for native chunk.");
157                array_buffer_ptr.safe_to_jsval(cx, rval);
158            },
159            EnqueuedValue::Js(value_with_size) => value_with_size.value.safe_to_jsval(cx, rval),
160            EnqueuedValue::CloseSentinel => {
161                unreachable!("The close sentinel is never made available as a js val.")
162            },
163        }
164    }
165}
166
167/// <https://streams.spec.whatwg.org/#is-non-negative-number>
168fn is_non_negative_number(value: &EnqueuedValue) -> bool {
169    let value_with_size = match value {
170        EnqueuedValue::Native(_) => return true,
171        EnqueuedValue::Js(value_with_size) => value_with_size,
172        EnqueuedValue::CloseSentinel => return true,
173    };
174
175    // If v is not a Number, return false.
176    // Checked as part of the WebIDL.
177
178    // If v is NaN, return false.
179    if value_with_size.size.is_nan() {
180        return false;
181    }
182
183    // If v < 0, return false.
184    if value_with_size.size.is_sign_negative() {
185        return false;
186    }
187
188    true
189}
190
191/// <https://streams.spec.whatwg.org/#queue-with-sizes>
192#[derive(Default, JSTraceable, MallocSizeOf)]
193#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
194pub(crate) struct QueueWithSizes {
195    queue: VecDeque<EnqueuedValue>,
196    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize>
197    pub(crate) total_size: f64,
198}
199
200impl QueueWithSizes {
201    /// <https://streams.spec.whatwg.org/#dequeue-value>
202    /// A none `rval` means we're dequeing the close sentinel,
203    /// which should never be made available to script.
204    pub(crate) fn dequeue_value(
205        &mut self,
206        cx: SafeJSContext,
207        rval: Option<MutableHandleValue>,
208        can_gc: CanGc,
209    ) {
210        let Some(value) = self.queue.front() else {
211            unreachable!("Buffer cannot be empty when dequeue value is called into.");
212        };
213        self.total_size -= value.size();
214        if let Some(rval) = rval {
215            value.to_jsval(cx, rval, can_gc);
216        } else {
217            assert_eq!(value, &EnqueuedValue::CloseSentinel);
218        }
219        self.queue.pop_front();
220    }
221
222    /// <https://streams.spec.whatwg.org/#enqueue-value-with-size>
223    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
224    pub(crate) fn enqueue_value_with_size(&mut self, value: EnqueuedValue) -> Result<(), Error> {
225        // If ! IsNonNegativeNumber(size) is false, throw a RangeError exception.
226        if !is_non_negative_number(&value) {
227            return Err(Error::Range(
228                "The size of the enqueued chunk is not a non-negative number.".to_string(),
229            ));
230        }
231
232        // If size is +∞, throw a RangeError exception.
233        if value.size().is_infinite() {
234            return Err(Error::Range(
235                "The size of the enqueued chunk is infinite.".to_string(),
236            ));
237        }
238
239        self.total_size += value.size();
240        self.queue.push_back(value);
241
242        Ok(())
243    }
244
245    pub(crate) fn is_empty(&self) -> bool {
246        self.queue.is_empty()
247    }
248
249    /// <https://streams.spec.whatwg.org/#peek-queue-value>
250    /// Returns whether value is the close sentinel.
251    pub(crate) fn peek_queue_value(
252        &self,
253        cx: SafeJSContext,
254        rval: MutableHandleValue,
255        can_gc: CanGc,
256    ) -> bool {
257        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
258        // Done with the QueueWithSizes type.
259
260        // Assert: container.[[queue]] is not empty.
261        assert!(!self.is_empty());
262
263        // Let valueWithSize be container.[[queue]][0].
264        let value_with_size = self.queue.front().expect("Queue is not empty.");
265        if let EnqueuedValue::CloseSentinel = value_with_size {
266            return true;
267        }
268
269        // Return valueWithSize’s value.
270        value_with_size.to_jsval(cx, rval, can_gc);
271        false
272    }
273
274    /// Only used with native sources.
275    fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
276        self.queue
277            .iter()
278            .try_fold(Vec::new(), |mut acc, value| match value {
279                EnqueuedValue::Native(chunk) => {
280                    acc.extend(chunk.iter().copied());
281                    Some(acc)
282                },
283                _ => {
284                    warn!("get_in_memory_bytes called on a controller with non-native source.");
285                    None
286                },
287            })
288    }
289
290    /// <https://streams.spec.whatwg.org/#reset-queue>
291    pub(crate) fn reset(&mut self) {
292        self.queue.clear();
293        self.total_size = Default::default();
294    }
295}
296
297/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
298#[dom_struct]
299pub(crate) struct ReadableStreamDefaultController {
300    reflector_: Reflector,
301
302    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue>
303    queue: RefCell<QueueWithSizes>,
304
305    /// A mutable reference to the underlying source is used to implement these two
306    /// internal slots:
307    ///
308    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm>
309    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm>
310    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
311
312    stream: MutNullableDom<ReadableStream>,
313
314    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm>
315    strategy_hwm: f64,
316
317    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm>
318    #[ignore_malloc_size_of = "mozjs"]
319    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
320
321    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested>
322    close_requested: Cell<bool>,
323
324    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started>
325    started: Cell<bool>,
326
327    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling>
328    pulling: Cell<bool>,
329
330    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain>
331    pull_again: Cell<bool>,
332}
333
334impl ReadableStreamDefaultController {
335    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
336    fn new_inherited(
337        global: &GlobalScope,
338        underlying_source_type: UnderlyingSourceType,
339        strategy_hwm: f64,
340        strategy_size: Rc<QueuingStrategySize>,
341        can_gc: CanGc,
342    ) -> ReadableStreamDefaultController {
343        ReadableStreamDefaultController {
344            reflector_: Reflector::new(),
345            queue: RefCell::new(Default::default()),
346            stream: MutNullableDom::new(None),
347            underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
348                global,
349                underlying_source_type,
350                can_gc,
351            ))),
352            strategy_hwm,
353            strategy_size: RefCell::new(Some(strategy_size)),
354            close_requested: Default::default(),
355            started: Default::default(),
356            pulling: Default::default(),
357            pull_again: Default::default(),
358        }
359    }
360
361    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
362    pub(crate) fn new(
363        global: &GlobalScope,
364        underlying_source: UnderlyingSourceType,
365        strategy_hwm: f64,
366        strategy_size: Rc<QueuingStrategySize>,
367        can_gc: CanGc,
368    ) -> DomRoot<ReadableStreamDefaultController> {
369        reflect_dom_object(
370            Box::new(ReadableStreamDefaultController::new_inherited(
371                global,
372                underlying_source,
373                strategy_hwm,
374                strategy_size,
375                can_gc,
376            )),
377            global,
378            can_gc,
379        )
380    }
381
382    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
383    pub(crate) fn setup(
384        &self,
385        stream: DomRoot<ReadableStream>,
386        can_gc: CanGc,
387    ) -> Result<(), Error> {
388        // Assert: stream.[[controller]] is undefined
389        stream.assert_no_controller();
390
391        // Set controller.[[stream]] to stream.
392        self.stream.set(Some(&stream));
393
394        let global = &*self.global();
395        let rooted_default_controller = DomRoot::from_ref(self);
396
397        // Perform ! ResetQueue(controller).
398        // Set controller.[[started]], controller.[[closeRequested]],
399        // controller.[[pullAgain]], and controller.[[pulling]] to false.
400        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
401        // and controller.[[strategyHWM]] to highWaterMark.
402        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
403        // and controller.[[strategyHWM]] to highWaterMark.
404        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
405
406        // Note: the above steps are done in `new`.
407
408        // Set stream.[[controller]] to controller.
409        stream.set_default_controller(&rooted_default_controller);
410
411        if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
412            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
413            let start_result = underlying_source
414                .call_start_algorithm(
415                    Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
416                    can_gc,
417                )
418                .unwrap_or_else(|| {
419                    let promise = Promise::new(global, can_gc);
420                    promise.resolve_native(&(), can_gc);
421                    Ok(promise)
422                });
423
424            // Let startPromise be a promise resolved with startResult.
425            let start_promise = start_result?;
426
427            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
428            let handler = PromiseNativeHandler::new(
429                global,
430                Some(Box::new(StartAlgorithmFulfillmentHandler {
431                    controller: Dom::from_ref(&rooted_default_controller),
432                })),
433                Some(Box::new(StartAlgorithmRejectionHandler {
434                    controller: Dom::from_ref(&rooted_default_controller),
435                })),
436                can_gc,
437            );
438            let realm = enter_realm(global);
439            let comp = InRealm::Entered(&realm);
440            start_promise.append_native_handler(&handler, comp, can_gc);
441        };
442
443        Ok(())
444    }
445
446    /// Setting the JS object after the heap has settled down.
447    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
448        if let Some(underlying_source) = self.underlying_source.get() {
449            underlying_source.set_underlying_source_this_object(this_object);
450        }
451    }
452
453    /// <https://streams.spec.whatwg.org/#dequeue-value>
454    fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
455        let mut queue = self.queue.borrow_mut();
456        queue.dequeue_value(cx, Some(rval), can_gc);
457    }
458
459    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull>
460    fn should_call_pull(&self) -> bool {
461        // Let stream be controller.[[stream]].
462        // Note: the spec does not assert that stream is not undefined here,
463        // so we return false if it is.
464        let Some(stream) = self.stream.get() else {
465            debug!("`should_call_pull` called on a controller without a stream.");
466            return false;
467        };
468
469        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
470        if !self.can_close_or_enqueue() {
471            return false;
472        }
473
474        // If controller.[[started]] is false, return false.
475        if !self.started.get() {
476            return false;
477        }
478
479        // If ! IsReadableStreamLocked(stream) is true
480        // and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
481        if stream.is_locked() && stream.get_num_read_requests() > 0 {
482            return true;
483        }
484
485        // Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller).
486        // Assert: desiredSize is not null.
487        let desired_size = self.get_desired_size().expect("desiredSize is not null.");
488
489        if desired_size > 0. {
490            return true;
491        }
492
493        false
494    }
495
496    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
497    fn call_pull_if_needed(&self, can_gc: CanGc) {
498        // Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull(controller).
499        // If shouldPull is false, return.
500        if !self.should_call_pull() {
501            return;
502        }
503
504        // If controller.[[pulling]] is true,
505        if self.pulling.get() {
506            // Set controller.[[pullAgain]] to true.
507            self.pull_again.set(true);
508
509            return;
510        }
511
512        // Set controller.[[pulling]] to true.
513        self.pulling.set(true);
514
515        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
516        // Continues into the resolve and reject handling of the native handler.
517        let global = self.global();
518        let rooted_default_controller = DomRoot::from_ref(self);
519        let controller =
520            Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
521
522        let Some(underlying_source) = self.underlying_source.get() else {
523            return;
524        };
525        let handler = PromiseNativeHandler::new(
526            &global,
527            Some(Box::new(PullAlgorithmFulfillmentHandler {
528                controller: Dom::from_ref(&rooted_default_controller),
529            })),
530            Some(Box::new(PullAlgorithmRejectionHandler {
531                controller: Dom::from_ref(&rooted_default_controller),
532            })),
533            can_gc,
534        );
535
536        let realm = enter_realm(&*global);
537        let comp = InRealm::Entered(&realm);
538        let result = underlying_source
539            .call_pull_algorithm(controller, &global, can_gc)
540            .unwrap_or_else(|| {
541                let promise = Promise::new(&global, can_gc);
542                promise.resolve_native(&(), can_gc);
543                Ok(promise)
544            });
545        let promise = result.unwrap_or_else(|error| {
546            let cx = GlobalScope::get_cx();
547            rooted!(in(*cx) let mut rval = UndefinedValue());
548            // TODO: check if `self.global()` is the right globalscope.
549            error
550                .clone()
551                .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
552            let promise = Promise::new(&global, can_gc);
553            promise.reject_native(&rval.handle(), can_gc);
554            promise
555        });
556        promise.append_native_handler(&handler, comp, can_gc);
557    }
558
559    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-cancel>
560    pub(crate) fn perform_cancel_steps(
561        &self,
562        cx: SafeJSContext,
563        global: &GlobalScope,
564        reason: SafeHandleValue,
565        can_gc: CanGc,
566    ) -> Rc<Promise> {
567        // Perform ! ResetQueue(this).
568        self.queue.borrow_mut().reset();
569
570        let underlying_source = self
571            .underlying_source
572            .get()
573            .expect("Controller should have a source when the cancel steps are called into.");
574        // Let result be the result of performing this.[[cancelAlgorithm]], passing reason.
575        let result = underlying_source
576            .call_cancel_algorithm(cx, global, reason, can_gc)
577            .unwrap_or_else(|| {
578                let promise = Promise::new(global, can_gc);
579                promise.resolve_native(&(), can_gc);
580                Ok(promise)
581            });
582        let promise = result.unwrap_or_else(|error| {
583            rooted!(in(*cx) let mut rval = UndefinedValue());
584
585            error
586                .clone()
587                .to_jsval(cx, global, rval.handle_mut(), can_gc);
588            let promise = Promise::new(global, can_gc);
589            promise.reject_native(&rval.handle(), can_gc);
590            promise
591        });
592
593        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
594        self.clear_algorithms();
595
596        // Return result(the promise).
597        promise
598    }
599
600    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-pull>
601    pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
602        // Let stream be this.[[stream]].
603        // Note: the spec does not assert that there is a stream.
604        let Some(stream) = self.stream.get() else {
605            return;
606        };
607
608        // if queue contains bytes, perform chunk steps.
609        if !self.queue.borrow().is_empty() {
610            let cx = GlobalScope::get_cx();
611            rooted!(in(*cx) let mut rval = UndefinedValue());
612            let result = RootedTraceableBox::new(Heap::default());
613            self.dequeue_value(cx, rval.handle_mut(), can_gc);
614            result.set(*rval);
615
616            // If this.[[closeRequested]] is true and this.[[queue]] is empty
617            if self.close_requested.get() && self.queue.borrow().is_empty() {
618                // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
619                self.clear_algorithms();
620
621                // Perform ! ReadableStreamClose(stream).
622                stream.close(can_gc);
623            } else {
624                // Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
625                self.call_pull_if_needed(can_gc);
626            }
627            // Perform readRequest’s chunk steps, given chunk.
628            read_request.chunk_steps(result, can_gc);
629        } else {
630            // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
631            stream.add_read_request(read_request);
632
633            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
634            self.call_pull_if_needed(can_gc);
635        }
636    }
637
638    /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-releasesteps>
639    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
640        // step 1 - Return.
641        Ok(())
642    }
643
644    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
645    #[allow(unsafe_code)]
646    pub(crate) fn enqueue(
647        &self,
648        cx: SafeJSContext,
649        chunk: SafeHandleValue,
650        can_gc: CanGc,
651    ) -> Result<(), Error> {
652        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
653        if !self.can_close_or_enqueue() {
654            return Ok(());
655        }
656
657        let stream = self
658            .stream
659            .get()
660            .expect("Controller must have a stream when a chunk is enqueued.");
661
662        // If ! IsReadableStreamLocked(stream) is true
663        // and ! ReadableStreamGetNumReadRequests(stream) > 0,
664        // perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
665        if stream.is_locked() && stream.get_num_read_requests() > 0 {
666            stream.fulfill_read_request(chunk, false, can_gc);
667        } else {
668            // Otherwise,
669            // Let result be the result of performing controller.[[strategySizeAlgorithm]],
670            // passing in chunk, and interpreting the result as a completion record.
671            // Note: the clone is necessary to prevent potential re-borrow panics.
672            let strategy_size = {
673                let reference = self.strategy_size.borrow();
674                reference.clone()
675            };
676            let size = if let Some(strategy_size) = strategy_size {
677                // Note: the Rethrow exception handling is necessary,
678                // otherwise returning JSFailed will panic because no exception is pending.
679                let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
680                match result {
681                    // Let chunkSize be result.[[Value]].
682                    Ok(size) => size,
683                    Err(error) => {
684                        // If result is an abrupt completion,
685                        rooted!(in(*cx) let mut rval = UndefinedValue());
686                        unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
687
688                        // Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]).
689                        self.error(rval.handle(), can_gc);
690
691                        // Return result.
692                        // Note: we need to return a type error, because no exception is pending.
693                        return Err(error);
694                    },
695                }
696            } else {
697                0.
698            };
699
700            {
701                // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
702                let res = {
703                    let mut queue = self.queue.borrow_mut();
704                    queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
705                        value: Heap::boxed(chunk.get()),
706                        size,
707                    }))
708                };
709                if let Err(error) = res {
710                    // If enqueueResult is an abrupt completion,
711
712                    // First, throw the exception.
713                    // Note: this must be done manually here,
714                    // because `enqueue_value_with_size` does not call into JS.
715                    throw_dom_exception(cx, &self.global(), error, can_gc);
716
717                    // Then, get a handle to the JS val for the exception,
718                    // and use that to error the stream.
719                    rooted!(in(*cx) let mut rval = UndefinedValue());
720                    unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
721
722                    // Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]).
723                    self.error(rval.handle(), can_gc);
724
725                    // Return enqueueResult.
726                    // Note: because we threw the exception above,
727                    // there is a pending exception and we can return JSFailed.
728                    return Err(Error::JSFailed);
729                }
730            }
731        }
732
733        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
734        self.call_pull_if_needed(can_gc);
735
736        Ok(())
737    }
738
739    /// Native call to
740    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
741    pub(crate) fn enqueue_native(&self, chunk: Vec<u8>, can_gc: CanGc) {
742        let stream = self
743            .stream
744            .get()
745            .expect("Controller must have a stream when a chunk is enqueued.");
746        if stream.is_locked() && stream.get_num_read_requests() > 0 {
747            let cx = GlobalScope::get_cx();
748            rooted!(in(*cx) let mut rval = UndefinedValue());
749            EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut(), can_gc);
750            stream.fulfill_read_request(rval.handle(), false, can_gc);
751        } else {
752            let mut queue = self.queue.borrow_mut();
753            queue
754                .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
755                .expect("Enqueuing a chunk from Rust should not fail.");
756        }
757    }
758
759    /// Does the stream have all data in memory?
760    pub(crate) fn in_memory(&self) -> bool {
761        let Some(underlying_source) = self.underlying_source.get() else {
762            return false;
763        };
764        underlying_source.in_memory()
765    }
766
767    /// Return bytes synchronously if the stream has all data in memory.
768    pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
769        let underlying_source = self.underlying_source.get()?;
770        if underlying_source.in_memory() {
771            return self.queue.borrow().get_in_memory_bytes();
772        }
773        None
774    }
775
776    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms>
777    fn clear_algorithms(&self) {
778        // Set controller.[[pullAlgorithm]] to undefined.
779        // Set controller.[[cancelAlgorithm]] to undefined.
780        self.underlying_source.set(None);
781
782        // Set controller.[[strategySizeAlgorithm]] to undefined.
783        *self.strategy_size.borrow_mut() = None;
784    }
785
786    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
787    pub(crate) fn close(&self, can_gc: CanGc) {
788        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
789        if !self.can_close_or_enqueue() {
790            return;
791        }
792
793        let Some(stream) = self.stream.get() else {
794            return;
795        };
796
797        // Set controller.[[closeRequested]] to true.
798        self.close_requested.set(true);
799
800        if self.queue.borrow().is_empty() {
801            // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
802            self.clear_algorithms();
803
804            // Perform ! ReadableStreamClose(stream).
805            stream.close(can_gc);
806        }
807    }
808
809    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size>
810    pub(crate) fn get_desired_size(&self) -> Option<f64> {
811        let stream = self.stream.get()?;
812
813        // If state is "errored", return null.
814        if stream.is_errored() {
815            return None;
816        }
817
818        // If state is "closed", return 0.
819        if stream.is_closed() {
820            return Some(0.0);
821        }
822
823        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
824        let queue = self.queue.borrow();
825        let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX);
826        Some(desired_size.clamp(desired_size, self.strategy_hwm))
827    }
828
829    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue>
830    pub(crate) fn can_close_or_enqueue(&self) -> bool {
831        let Some(stream) = self.stream.get() else {
832            return false;
833        };
834
835        // If controller.[[closeRequested]] is false and state is "readable", return true.
836        if !self.close_requested.get() && stream.is_readable() {
837            return true;
838        }
839
840        // Otherwise, return false.
841        false
842    }
843
844    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
845    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
846        let Some(stream) = self.stream.get() else {
847            return;
848        };
849
850        // If stream.[[state]] is not "readable", return.
851        if !stream.is_readable() {
852            return;
853        }
854
855        // Perform ! ResetQueue(controller).
856        self.queue.borrow_mut().reset();
857
858        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
859        self.clear_algorithms();
860
861        stream.error(e, can_gc);
862    }
863
864    /// <https://streams.spec.whatwg.org/#rs-default-controller-has-backpressure>
865    pub(crate) fn has_backpressure(&self) -> bool {
866        // If ! ReadableStreamDefaultControllerShouldCallPull(controller) is true, return false.
867        // Otherwise, return true.
868        !self.should_call_pull()
869    }
870}
871
872impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
873    for ReadableStreamDefaultController
874{
875    /// <https://streams.spec.whatwg.org/#rs-default-controller-desired-size>
876    fn GetDesiredSize(&self) -> Option<f64> {
877        self.get_desired_size()
878    }
879
880    /// <https://streams.spec.whatwg.org/#rs-default-controller-close>
881    fn Close(&self, can_gc: CanGc) -> Fallible<()> {
882        if !self.can_close_or_enqueue() {
883            // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false,
884            // throw a TypeError exception.
885            return Err(Error::Type("Stream cannot be closed.".to_string()));
886        }
887
888        // Perform ! ReadableStreamDefaultControllerClose(this).
889        self.close(can_gc);
890
891        Ok(())
892    }
893
894    /// <https://streams.spec.whatwg.org/#rs-default-controller-enqueue>
895    fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
896        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception.
897        if !self.can_close_or_enqueue() {
898            return Err(Error::Type("Stream cannot be enqueued to.".to_string()));
899        }
900
901        // Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk).
902        self.enqueue(cx, chunk, can_gc)
903    }
904
905    /// <https://streams.spec.whatwg.org/#rs-default-controller-error>
906    fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
907        self.error(e, can_gc);
908        Ok(())
909    }
910}