script/dom/
readablestreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::rust::wrappers::JS_GetPendingException;
14use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
15use js::typedarray::Uint8;
16use script_bindings::conversions::SafeToJSValConvertible;
17
18use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
19use super::bindings::root::Dom;
20use crate::dom::bindings::buffer_source::create_buffer_source;
21use crate::dom::bindings::callback::ExceptionHandling;
22use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
23use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
25use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
26use crate::dom::bindings::root::{DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestream::ReadableStream;
32use crate::dom::readablestreamdefaultreader::ReadRequest;
33use crate::dom::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
34use crate::realms::{InRealm, enter_realm};
35use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
36
37/// The fulfillment handler for
38/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
39#[derive(Clone, JSTraceable, MallocSizeOf)]
40#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
41struct PullAlgorithmFulfillmentHandler {
42    controller: Dom<ReadableStreamDefaultController>,
43}
44
45impl Callback for PullAlgorithmFulfillmentHandler {
46    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
47    /// Upon fulfillment of pullPromise
48    fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
49        // Set controller.[[pulling]] to false.
50        self.controller.pulling.set(false);
51
52        // If controller.[[pullAgain]] is true,
53        if self.controller.pull_again.get() {
54            // Set controller.[[pullAgain]] to false.
55            self.controller.pull_again.set(false);
56
57            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
58            self.controller.call_pull_if_needed(can_gc);
59        }
60    }
61}
62
63/// The rejection handler for
64/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
65#[derive(Clone, JSTraceable, MallocSizeOf)]
66#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
67struct PullAlgorithmRejectionHandler {
68    controller: Dom<ReadableStreamDefaultController>,
69}
70
71impl Callback for PullAlgorithmRejectionHandler {
72    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
73    /// Upon rejection of pullPromise with reason e.
74    fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
75        // Perform ! ReadableStreamDefaultControllerError(controller, e).
76        self.controller.error(v, can_gc);
77    }
78}
79
80/// The fulfillment handler for
81/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
82#[derive(Clone, JSTraceable, MallocSizeOf)]
83#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
84struct StartAlgorithmFulfillmentHandler {
85    controller: Dom<ReadableStreamDefaultController>,
86}
87
88impl Callback for StartAlgorithmFulfillmentHandler {
89    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
90    /// Upon fulfillment of startPromise,
91    fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
92        // Set controller.[[started]] to true.
93        self.controller.started.set(true);
94
95        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
96        self.controller.call_pull_if_needed(can_gc);
97    }
98}
99
100/// The rejection handler for
101/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
102#[derive(Clone, JSTraceable, MallocSizeOf)]
103#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
104struct StartAlgorithmRejectionHandler {
105    controller: Dom<ReadableStreamDefaultController>,
106}
107
108impl Callback for StartAlgorithmRejectionHandler {
109    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
110    /// Upon rejection of startPromise with reason r,
111    fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
112        // Perform ! ReadableStreamDefaultControllerError(controller, r).
113        self.controller.error(v, can_gc);
114    }
115}
116
117/// <https://streams.spec.whatwg.org/#value-with-size>
118#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
119#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
120pub(crate) struct ValueWithSize {
121    /// <https://streams.spec.whatwg.org/#value-with-size-value>
122    #[ignore_malloc_size_of = "Heap is measured by mozjs"]
123    pub(crate) value: Box<Heap<JSVal>>,
124    /// <https://streams.spec.whatwg.org/#value-with-size-size>
125    pub(crate) size: f64,
126}
127
128/// <https://streams.spec.whatwg.org/#value-with-size>
129#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
130#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
131pub(crate) enum EnqueuedValue {
132    /// A value enqueued from Rust.
133    Native(Box<[u8]>),
134    /// A Js value.
135    Js(ValueWithSize),
136    /// <https://streams.spec.whatwg.org/#close-sentinel>
137    CloseSentinel,
138}
139
140impl EnqueuedValue {
141    fn size(&self) -> f64 {
142        match self {
143            EnqueuedValue::Native(v) => v.len() as f64,
144            EnqueuedValue::Js(v) => v.size,
145            // The size of the sentinel is zero,
146            // as per <https://streams.spec.whatwg.org/#ref-for-close-sentinel%E2%91%A0>
147            EnqueuedValue::CloseSentinel => 0.,
148        }
149    }
150
151    fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
152        match self {
153            EnqueuedValue::Native(chunk) => {
154                rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
155                create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
156                    .expect("failed to create buffer source for native chunk.");
157                array_buffer_ptr.safe_to_jsval(cx, rval, can_gc);
158            },
159            EnqueuedValue::Js(value_with_size) => {
160                value_with_size.value.safe_to_jsval(cx, rval, can_gc)
161            },
162            EnqueuedValue::CloseSentinel => {
163                unreachable!("The close sentinel is never made available as a js val.")
164            },
165        }
166    }
167}
168
169/// <https://streams.spec.whatwg.org/#is-non-negative-number>
170fn is_non_negative_number(value: &EnqueuedValue) -> bool {
171    let value_with_size = match value {
172        EnqueuedValue::Native(_) => return true,
173        EnqueuedValue::Js(value_with_size) => value_with_size,
174        EnqueuedValue::CloseSentinel => return true,
175    };
176
177    // If v is not a Number, return false.
178    // Checked as part of the WebIDL.
179
180    // If v is NaN, return false.
181    if value_with_size.size.is_nan() {
182        return false;
183    }
184
185    // If v < 0, return false.
186    if value_with_size.size.is_sign_negative() {
187        return false;
188    }
189
190    true
191}
192
193/// <https://streams.spec.whatwg.org/#queue-with-sizes>
194#[derive(Default, JSTraceable, MallocSizeOf)]
195#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
196pub(crate) struct QueueWithSizes {
197    queue: RefCell<VecDeque<EnqueuedValue>>,
198    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize>
199    pub(crate) total_size: Cell<f64>,
200}
201
202impl QueueWithSizes {
203    /// <https://streams.spec.whatwg.org/#dequeue-value>
204    /// A none `rval` means we're dequeing the close sentinel,
205    /// which should never be made available to script.
206    pub(crate) fn dequeue_value(
207        &self,
208        cx: SafeJSContext,
209        rval: Option<MutableHandleValue>,
210        can_gc: CanGc,
211    ) {
212        {
213            let queue = self.queue.borrow();
214            let Some(value) = queue.front() else {
215                unreachable!("Buffer cannot be empty when dequeue value is called into.");
216            };
217            self.total_size.set(self.total_size.get() - value.size());
218            if let Some(rval) = rval {
219                value.to_jsval(cx, rval, can_gc);
220            } else {
221                assert_eq!(value, &EnqueuedValue::CloseSentinel);
222            }
223        }
224        self.queue.borrow_mut().pop_front();
225    }
226
227    /// <https://streams.spec.whatwg.org/#enqueue-value-with-size>
228    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
229    pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
230        // If ! IsNonNegativeNumber(size) is false, throw a RangeError exception.
231        if !is_non_negative_number(&value) {
232            return Err(Error::Range(
233                "The size of the enqueued chunk is not a non-negative number.".to_string(),
234            ));
235        }
236
237        // If size is +∞, throw a RangeError exception.
238        if value.size().is_infinite() {
239            return Err(Error::Range(
240                "The size of the enqueued chunk is infinite.".to_string(),
241            ));
242        }
243
244        self.total_size.set(self.total_size.get() + value.size());
245        self.queue.borrow_mut().push_back(value);
246
247        Ok(())
248    }
249
250    pub(crate) fn is_empty(&self) -> bool {
251        self.queue.borrow().is_empty()
252    }
253
254    /// <https://streams.spec.whatwg.org/#peek-queue-value>
255    /// Returns whether value is the close sentinel.
256    pub(crate) fn peek_queue_value(
257        &self,
258        cx: SafeJSContext,
259        rval: MutableHandleValue,
260        can_gc: CanGc,
261    ) -> bool {
262        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
263        // Done with the QueueWithSizes type.
264
265        // Assert: container.[[queue]] is not empty.
266        assert!(!self.is_empty());
267
268        // Let valueWithSize be container.[[queue]][0].
269        let queue = self.queue.borrow();
270        let value_with_size = queue.front().expect("Queue is not empty.");
271        if let EnqueuedValue::CloseSentinel = value_with_size {
272            return true;
273        }
274
275        // Return valueWithSize’s value.
276        value_with_size.to_jsval(cx, rval, can_gc);
277        false
278    }
279
280    /// Only used with native sources.
281    fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
282        self.queue
283            .borrow()
284            .iter()
285            .try_fold(Vec::new(), |mut acc, value| match value {
286                EnqueuedValue::Native(chunk) => {
287                    acc.extend(chunk.iter().copied());
288                    Some(acc)
289                },
290                _ => {
291                    warn!("get_in_memory_bytes called on a controller with non-native source.");
292                    None
293                },
294            })
295    }
296
297    /// <https://streams.spec.whatwg.org/#reset-queue>
298    pub(crate) fn reset(&self) {
299        self.queue.borrow_mut().clear();
300        self.total_size.set(Default::default());
301    }
302}
303
304/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
305#[dom_struct]
306pub(crate) struct ReadableStreamDefaultController {
307    reflector_: Reflector,
308
309    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue>
310    queue: QueueWithSizes,
311
312    /// A mutable reference to the underlying source is used to implement these two
313    /// internal slots:
314    ///
315    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm>
316    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm>
317    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
318
319    stream: MutNullableDom<ReadableStream>,
320
321    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm>
322    strategy_hwm: f64,
323
324    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm>
325    #[ignore_malloc_size_of = "mozjs"]
326    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
327
328    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested>
329    close_requested: Cell<bool>,
330
331    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started>
332    started: Cell<bool>,
333
334    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling>
335    pulling: Cell<bool>,
336
337    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain>
338    pull_again: Cell<bool>,
339}
340
341impl ReadableStreamDefaultController {
342    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
343    fn new_inherited(
344        global: &GlobalScope,
345        underlying_source_type: UnderlyingSourceType,
346        strategy_hwm: f64,
347        strategy_size: Rc<QueuingStrategySize>,
348        can_gc: CanGc,
349    ) -> ReadableStreamDefaultController {
350        ReadableStreamDefaultController {
351            reflector_: Reflector::new(),
352            queue: Default::default(),
353            stream: MutNullableDom::new(None),
354            underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
355                global,
356                underlying_source_type,
357                can_gc,
358            ))),
359            strategy_hwm,
360            strategy_size: RefCell::new(Some(strategy_size)),
361            close_requested: Default::default(),
362            started: Default::default(),
363            pulling: Default::default(),
364            pull_again: Default::default(),
365        }
366    }
367
368    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
369    pub(crate) fn new(
370        global: &GlobalScope,
371        underlying_source: UnderlyingSourceType,
372        strategy_hwm: f64,
373        strategy_size: Rc<QueuingStrategySize>,
374        can_gc: CanGc,
375    ) -> DomRoot<ReadableStreamDefaultController> {
376        reflect_dom_object(
377            Box::new(ReadableStreamDefaultController::new_inherited(
378                global,
379                underlying_source,
380                strategy_hwm,
381                strategy_size,
382                can_gc,
383            )),
384            global,
385            can_gc,
386        )
387    }
388
389    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
390    pub(crate) fn setup(
391        &self,
392        stream: DomRoot<ReadableStream>,
393        can_gc: CanGc,
394    ) -> Result<(), Error> {
395        // Assert: stream.[[controller]] is undefined
396        stream.assert_no_controller();
397
398        // Set controller.[[stream]] to stream.
399        self.stream.set(Some(&stream));
400
401        let global = &*self.global();
402        let rooted_default_controller = DomRoot::from_ref(self);
403
404        // Perform ! ResetQueue(controller).
405        // Set controller.[[started]], controller.[[closeRequested]],
406        // controller.[[pullAgain]], and controller.[[pulling]] to false.
407        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
408        // and controller.[[strategyHWM]] to highWaterMark.
409        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
410        // and controller.[[strategyHWM]] to highWaterMark.
411        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
412
413        // Note: the above steps are done in `new`.
414
415        // Set stream.[[controller]] to controller.
416        stream.set_default_controller(&rooted_default_controller);
417
418        if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
419            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
420            let start_result = underlying_source
421                .call_start_algorithm(
422                    Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
423                    can_gc,
424                )
425                .unwrap_or_else(|| {
426                    let promise = Promise::new(global, can_gc);
427                    promise.resolve_native(&(), can_gc);
428                    Ok(promise)
429                });
430
431            // Let startPromise be a promise resolved with startResult.
432            let start_promise = start_result?;
433
434            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
435            let handler = PromiseNativeHandler::new(
436                global,
437                Some(Box::new(StartAlgorithmFulfillmentHandler {
438                    controller: Dom::from_ref(&rooted_default_controller),
439                })),
440                Some(Box::new(StartAlgorithmRejectionHandler {
441                    controller: Dom::from_ref(&rooted_default_controller),
442                })),
443                can_gc,
444            );
445            let realm = enter_realm(global);
446            let comp = InRealm::Entered(&realm);
447            start_promise.append_native_handler(&handler, comp, can_gc);
448        };
449
450        Ok(())
451    }
452
453    /// Setting the JS object after the heap has settled down.
454    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
455        if let Some(underlying_source) = self.underlying_source.get() {
456            underlying_source.set_underlying_source_this_object(this_object);
457        }
458    }
459
460    /// <https://streams.spec.whatwg.org/#dequeue-value>
461    fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
462        self.queue.dequeue_value(cx, Some(rval), can_gc);
463    }
464
465    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull>
466    fn should_call_pull(&self) -> bool {
467        // Let stream be controller.[[stream]].
468        // Note: the spec does not assert that stream is not undefined here,
469        // so we return false if it is.
470        let Some(stream) = self.stream.get() else {
471            debug!("`should_call_pull` called on a controller without a stream.");
472            return false;
473        };
474
475        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
476        if !self.can_close_or_enqueue() {
477            return false;
478        }
479
480        // If controller.[[started]] is false, return false.
481        if !self.started.get() {
482            return false;
483        }
484
485        // If ! IsReadableStreamLocked(stream) is true
486        // and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
487        if stream.is_locked() && stream.get_num_read_requests() > 0 {
488            return true;
489        }
490
491        // Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller).
492        // Assert: desiredSize is not null.
493        let desired_size = self.get_desired_size().expect("desiredSize is not null.");
494
495        if desired_size > 0. {
496            return true;
497        }
498
499        false
500    }
501
502    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
503    fn call_pull_if_needed(&self, can_gc: CanGc) {
504        // Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull(controller).
505        // If shouldPull is false, return.
506        if !self.should_call_pull() {
507            return;
508        }
509
510        // If controller.[[pulling]] is true,
511        if self.pulling.get() {
512            // Set controller.[[pullAgain]] to true.
513            self.pull_again.set(true);
514
515            return;
516        }
517
518        // Set controller.[[pulling]] to true.
519        self.pulling.set(true);
520
521        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
522        // Continues into the resolve and reject handling of the native handler.
523        let global = self.global();
524        let rooted_default_controller = DomRoot::from_ref(self);
525        let controller =
526            Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
527
528        let Some(underlying_source) = self.underlying_source.get() else {
529            return;
530        };
531        let handler = PromiseNativeHandler::new(
532            &global,
533            Some(Box::new(PullAlgorithmFulfillmentHandler {
534                controller: Dom::from_ref(&rooted_default_controller),
535            })),
536            Some(Box::new(PullAlgorithmRejectionHandler {
537                controller: Dom::from_ref(&rooted_default_controller),
538            })),
539            can_gc,
540        );
541
542        let realm = enter_realm(&*global);
543        let comp = InRealm::Entered(&realm);
544        let result = underlying_source
545            .call_pull_algorithm(controller, &global, can_gc)
546            .unwrap_or_else(|| {
547                let promise = Promise::new(&global, can_gc);
548                promise.resolve_native(&(), can_gc);
549                Ok(promise)
550            });
551        let promise = result.unwrap_or_else(|error| {
552            let cx = GlobalScope::get_cx();
553            rooted!(in(*cx) let mut rval = UndefinedValue());
554            // TODO: check if `self.global()` is the right globalscope.
555            error
556                .clone()
557                .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
558            let promise = Promise::new(&global, can_gc);
559            promise.reject_native(&rval.handle(), can_gc);
560            promise
561        });
562        promise.append_native_handler(&handler, comp, can_gc);
563    }
564
565    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-cancel>
566    pub(crate) fn perform_cancel_steps(
567        &self,
568        cx: SafeJSContext,
569        global: &GlobalScope,
570        reason: SafeHandleValue,
571        can_gc: CanGc,
572    ) -> Rc<Promise> {
573        // Perform ! ResetQueue(this).
574        self.queue.reset();
575
576        let underlying_source = self
577            .underlying_source
578            .get()
579            .expect("Controller should have a source when the cancel steps are called into.");
580        // Let result be the result of performing this.[[cancelAlgorithm]], passing reason.
581        let result = underlying_source
582            .call_cancel_algorithm(cx, global, reason, can_gc)
583            .unwrap_or_else(|| {
584                let promise = Promise::new(global, can_gc);
585                promise.resolve_native(&(), can_gc);
586                Ok(promise)
587            });
588        let promise = result.unwrap_or_else(|error| {
589            rooted!(in(*cx) let mut rval = UndefinedValue());
590
591            error
592                .clone()
593                .to_jsval(cx, global, rval.handle_mut(), can_gc);
594            let promise = Promise::new(global, can_gc);
595            promise.reject_native(&rval.handle(), can_gc);
596            promise
597        });
598
599        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
600        self.clear_algorithms();
601
602        // Return result(the promise).
603        promise
604    }
605
606    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-pull>
607    pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
608        // Let stream be this.[[stream]].
609        // Note: the spec does not assert that there is a stream.
610        let Some(stream) = self.stream.get() else {
611            return;
612        };
613
614        // if queue contains bytes, perform chunk steps.
615        if !self.queue.is_empty() {
616            let cx = GlobalScope::get_cx();
617            rooted!(in(*cx) let mut rval = UndefinedValue());
618            let result = RootedTraceableBox::new(Heap::default());
619            self.dequeue_value(cx, rval.handle_mut(), can_gc);
620            result.set(*rval);
621
622            // If this.[[closeRequested]] is true and this.[[queue]] is empty
623            if self.close_requested.get() && self.queue.is_empty() {
624                // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
625                self.clear_algorithms();
626
627                // Perform ! ReadableStreamClose(stream).
628                stream.close(can_gc);
629            } else {
630                // Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
631                self.call_pull_if_needed(can_gc);
632            }
633            // Perform readRequest’s chunk steps, given chunk.
634            read_request.chunk_steps(result, can_gc);
635        } else {
636            // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
637            stream.add_read_request(read_request);
638
639            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
640            self.call_pull_if_needed(can_gc);
641        }
642    }
643
644    /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-releasesteps>
645    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
646        // step 1 - Return.
647        Ok(())
648    }
649
650    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
651    #[allow(unsafe_code)]
652    pub(crate) fn enqueue(
653        &self,
654        cx: SafeJSContext,
655        chunk: SafeHandleValue,
656        can_gc: CanGc,
657    ) -> Result<(), Error> {
658        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
659        if !self.can_close_or_enqueue() {
660            return Ok(());
661        }
662
663        let stream = self
664            .stream
665            .get()
666            .expect("Controller must have a stream when a chunk is enqueued.");
667
668        // If ! IsReadableStreamLocked(stream) is true
669        // and ! ReadableStreamGetNumReadRequests(stream) > 0,
670        // perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
671        if stream.is_locked() && stream.get_num_read_requests() > 0 {
672            stream.fulfill_read_request(chunk, false, can_gc);
673        } else {
674            // Otherwise,
675            // Let result be the result of performing controller.[[strategySizeAlgorithm]],
676            // passing in chunk, and interpreting the result as a completion record.
677            // Note: the clone is necessary to prevent potential re-borrow panics.
678            let strategy_size = {
679                let reference = self.strategy_size.borrow();
680                reference.clone()
681            };
682            let size = if let Some(strategy_size) = strategy_size {
683                // Note: the Rethrow exception handling is necessary,
684                // otherwise returning JSFailed will panic because no exception is pending.
685                let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
686                match result {
687                    // Let chunkSize be result.[[Value]].
688                    Ok(size) => size,
689                    Err(error) => {
690                        // If result is an abrupt completion,
691                        rooted!(in(*cx) let mut rval = UndefinedValue());
692                        unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
693
694                        // Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]).
695                        self.error(rval.handle(), can_gc);
696
697                        // Return result.
698                        // Note: we need to return a type error, because no exception is pending.
699                        return Err(error);
700                    },
701                }
702            } else {
703                0.
704            };
705
706            {
707                // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
708                let res = self
709                    .queue
710                    .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
711                        value: Heap::boxed(chunk.get()),
712                        size,
713                    }));
714                if let Err(error) = res {
715                    // If enqueueResult is an abrupt completion,
716
717                    // First, throw the exception.
718                    // Note: this must be done manually here,
719                    // because `enqueue_value_with_size` does not call into JS.
720                    throw_dom_exception(cx, &self.global(), error, can_gc);
721
722                    // Then, get a handle to the JS val for the exception,
723                    // and use that to error the stream.
724                    rooted!(in(*cx) let mut rval = UndefinedValue());
725                    unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
726
727                    // Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]).
728                    self.error(rval.handle(), can_gc);
729
730                    // Return enqueueResult.
731                    // Note: because we threw the exception above,
732                    // there is a pending exception and we can return JSFailed.
733                    return Err(Error::JSFailed);
734                }
735            }
736        }
737
738        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
739        self.call_pull_if_needed(can_gc);
740
741        Ok(())
742    }
743
744    /// Native call to
745    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
746    pub(crate) fn enqueue_native(&self, chunk: Vec<u8>, can_gc: CanGc) {
747        let stream = self
748            .stream
749            .get()
750            .expect("Controller must have a stream when a chunk is enqueued.");
751        if stream.is_locked() && stream.get_num_read_requests() > 0 {
752            let cx = GlobalScope::get_cx();
753            rooted!(in(*cx) let mut rval = UndefinedValue());
754            EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut(), can_gc);
755            stream.fulfill_read_request(rval.handle(), false, can_gc);
756        } else {
757            self.queue
758                .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
759                .expect("Enqueuing a chunk from Rust should not fail.");
760        }
761    }
762
763    /// Does the stream have all data in memory?
764    pub(crate) fn in_memory(&self) -> bool {
765        let Some(underlying_source) = self.underlying_source.get() else {
766            return false;
767        };
768        underlying_source.in_memory()
769    }
770
771    /// Return bytes synchronously if the stream has all data in memory.
772    pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
773        let underlying_source = self.underlying_source.get()?;
774        if underlying_source.in_memory() {
775            return self.queue.get_in_memory_bytes();
776        }
777        None
778    }
779
780    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms>
781    fn clear_algorithms(&self) {
782        // Set controller.[[pullAlgorithm]] to undefined.
783        // Set controller.[[cancelAlgorithm]] to undefined.
784        self.underlying_source.set(None);
785
786        // Set controller.[[strategySizeAlgorithm]] to undefined.
787        *self.strategy_size.borrow_mut() = None;
788    }
789
790    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
791    pub(crate) fn close(&self, can_gc: CanGc) {
792        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
793        if !self.can_close_or_enqueue() {
794            return;
795        }
796
797        let Some(stream) = self.stream.get() else {
798            return;
799        };
800
801        // Set controller.[[closeRequested]] to true.
802        self.close_requested.set(true);
803
804        if self.queue.is_empty() {
805            // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
806            self.clear_algorithms();
807
808            // Perform ! ReadableStreamClose(stream).
809            stream.close(can_gc);
810        }
811    }
812
813    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size>
814    pub(crate) fn get_desired_size(&self) -> Option<f64> {
815        let stream = self.stream.get()?;
816
817        // If state is "errored", return null.
818        if stream.is_errored() {
819            return None;
820        }
821
822        // If state is "closed", return 0.
823        if stream.is_closed() {
824            return Some(0.0);
825        }
826
827        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
828        let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
829        Some(desired_size.clamp(desired_size, self.strategy_hwm))
830    }
831
832    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue>
833    pub(crate) fn can_close_or_enqueue(&self) -> bool {
834        let Some(stream) = self.stream.get() else {
835            return false;
836        };
837
838        // If controller.[[closeRequested]] is false and state is "readable", return true.
839        if !self.close_requested.get() && stream.is_readable() {
840            return true;
841        }
842
843        // Otherwise, return false.
844        false
845    }
846
847    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
848    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
849        let Some(stream) = self.stream.get() else {
850            return;
851        };
852
853        // If stream.[[state]] is not "readable", return.
854        if !stream.is_readable() {
855            return;
856        }
857
858        // Perform ! ResetQueue(controller).
859        self.queue.reset();
860
861        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
862        self.clear_algorithms();
863
864        stream.error(e, can_gc);
865    }
866
867    /// <https://streams.spec.whatwg.org/#rs-default-controller-has-backpressure>
868    pub(crate) fn has_backpressure(&self) -> bool {
869        // If ! ReadableStreamDefaultControllerShouldCallPull(controller) is true, return false.
870        // Otherwise, return true.
871        !self.should_call_pull()
872    }
873}
874
875impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
876    for ReadableStreamDefaultController
877{
878    /// <https://streams.spec.whatwg.org/#rs-default-controller-desired-size>
879    fn GetDesiredSize(&self) -> Option<f64> {
880        self.get_desired_size()
881    }
882
883    /// <https://streams.spec.whatwg.org/#rs-default-controller-close>
884    fn Close(&self, can_gc: CanGc) -> Fallible<()> {
885        if !self.can_close_or_enqueue() {
886            // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false,
887            // throw a TypeError exception.
888            return Err(Error::Type("Stream cannot be closed.".to_string()));
889        }
890
891        // Perform ! ReadableStreamDefaultControllerClose(this).
892        self.close(can_gc);
893
894        Ok(())
895    }
896
897    /// <https://streams.spec.whatwg.org/#rs-default-controller-enqueue>
898    fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
899        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception.
900        if !self.can_close_or_enqueue() {
901            return Err(Error::Type("Stream cannot be enqueued to.".to_string()));
902        }
903
904        // Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk).
905        self.enqueue(cx, chunk, can_gc)
906    }
907
908    /// <https://streams.spec.whatwg.org/#rs-default-controller-error>
909    fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
910        self.error(e, can_gc);
911        Ok(())
912    }
913}