script/dom/stream/
readablestreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::wrappers2::JS_GetPendingException;
15use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
16use js::typedarray::Uint8;
17use script_bindings::conversions::SafeToJSValConvertible;
18
19use crate::dom::bindings::buffer_source::create_buffer_source;
20use crate::dom::bindings::callback::ExceptionHandling;
21use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
22use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
23use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
25use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::stream::readablestream::ReadableStream;
32use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
33use crate::dom::stream::underlyingsourcecontainer::{
34    UnderlyingSourceContainer, UnderlyingSourceType,
35};
36use crate::realms::{InRealm, enter_auto_realm, enter_realm};
37use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
38
39/// The fulfillment handler for
40/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
41#[derive(Clone, JSTraceable, MallocSizeOf)]
42#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
43struct PullAlgorithmFulfillmentHandler {
44    controller: Dom<ReadableStreamDefaultController>,
45}
46
47impl Callback for PullAlgorithmFulfillmentHandler {
48    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
49    /// Upon fulfillment of pullPromise
50    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
51        let can_gc = CanGc::from_cx(cx);
52        // Set controller.[[pulling]] to false.
53        self.controller.pulling.set(false);
54
55        // If controller.[[pullAgain]] is true,
56        if self.controller.pull_again.get() {
57            // Set controller.[[pullAgain]] to false.
58            self.controller.pull_again.set(false);
59
60            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
61            self.controller.call_pull_if_needed(can_gc);
62        }
63    }
64}
65
66/// The rejection handler for
67/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
68#[derive(Clone, JSTraceable, MallocSizeOf)]
69#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
70struct PullAlgorithmRejectionHandler {
71    controller: Dom<ReadableStreamDefaultController>,
72}
73
74impl Callback for PullAlgorithmRejectionHandler {
75    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
76    /// Upon rejection of pullPromise with reason e.
77    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
78        let can_gc = CanGc::from_cx(cx);
79        // Perform ! ReadableStreamDefaultControllerError(controller, e).
80        self.controller.error(v, can_gc);
81    }
82}
83
84/// The fulfillment handler for
85/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
86#[derive(Clone, JSTraceable, MallocSizeOf)]
87#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
88struct StartAlgorithmFulfillmentHandler {
89    controller: Dom<ReadableStreamDefaultController>,
90}
91
92impl Callback for StartAlgorithmFulfillmentHandler {
93    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
94    /// Upon fulfillment of startPromise,
95    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
96        let can_gc = CanGc::from_cx(cx);
97        // Set controller.[[started]] to true.
98        self.controller.started.set(true);
99
100        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
101        self.controller.call_pull_if_needed(can_gc);
102    }
103}
104
105/// The rejection handler for
106/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
107#[derive(Clone, JSTraceable, MallocSizeOf)]
108#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
109struct StartAlgorithmRejectionHandler {
110    controller: Dom<ReadableStreamDefaultController>,
111}
112
113impl Callback for StartAlgorithmRejectionHandler {
114    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
115    /// Upon rejection of startPromise with reason r,
116    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
117        let can_gc = CanGc::from_cx(cx);
118        // Perform ! ReadableStreamDefaultControllerError(controller, r).
119        self.controller.error(v, can_gc);
120    }
121}
122
123/// <https://streams.spec.whatwg.org/#value-with-size>
124#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
125#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
126pub(crate) struct ValueWithSize {
127    /// <https://streams.spec.whatwg.org/#value-with-size-value>
128    #[ignore_malloc_size_of = "Heap is measured by mozjs"]
129    pub(crate) value: Box<Heap<JSVal>>,
130    /// <https://streams.spec.whatwg.org/#value-with-size-size>
131    pub(crate) size: f64,
132}
133
134/// <https://streams.spec.whatwg.org/#value-with-size>
135#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
136#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
137pub(crate) enum EnqueuedValue {
138    /// A value enqueued from Rust.
139    Native(Box<[u8]>),
140    /// A Js value.
141    Js(ValueWithSize),
142    /// <https://streams.spec.whatwg.org/#close-sentinel>
143    CloseSentinel,
144}
145
146impl EnqueuedValue {
147    fn size(&self) -> f64 {
148        match self {
149            EnqueuedValue::Native(v) => v.len() as f64,
150            EnqueuedValue::Js(v) => v.size,
151            // The size of the sentinel is zero,
152            // as per <https://streams.spec.whatwg.org/#ref-for-close-sentinel%E2%91%A0>
153            EnqueuedValue::CloseSentinel => 0.,
154        }
155    }
156
157    fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
158        match self {
159            EnqueuedValue::Native(chunk) => {
160                rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
161                create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
162                    .expect("failed to create buffer source for native chunk.");
163                array_buffer_ptr.safe_to_jsval(cx, rval, can_gc);
164            },
165            EnqueuedValue::Js(value_with_size) => {
166                value_with_size.value.safe_to_jsval(cx, rval, can_gc)
167            },
168            EnqueuedValue::CloseSentinel => {
169                unreachable!("The close sentinel is never made available as a js val.")
170            },
171        }
172    }
173}
174
175/// <https://streams.spec.whatwg.org/#is-non-negative-number>
176fn is_non_negative_number(value: &EnqueuedValue) -> bool {
177    let value_with_size = match value {
178        EnqueuedValue::Native(_) => return true,
179        EnqueuedValue::Js(value_with_size) => value_with_size,
180        EnqueuedValue::CloseSentinel => return true,
181    };
182
183    // If v is not a Number, return false.
184    // Checked as part of the WebIDL.
185
186    // If v is NaN, return false.
187    if value_with_size.size.is_nan() {
188        return false;
189    }
190
191    // If v < 0, return false.
192    if value_with_size.size.is_sign_negative() {
193        return false;
194    }
195
196    true
197}
198
199/// <https://streams.spec.whatwg.org/#queue-with-sizes>
200#[derive(Default, JSTraceable, MallocSizeOf)]
201#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
202pub(crate) struct QueueWithSizes {
203    queue: RefCell<VecDeque<EnqueuedValue>>,
204    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize>
205    pub(crate) total_size: Cell<f64>,
206}
207
208impl QueueWithSizes {
209    /// <https://streams.spec.whatwg.org/#dequeue-value>
210    /// A none `rval` means we're dequeing the close sentinel,
211    /// which should never be made available to script.
212    pub(crate) fn dequeue_value(
213        &self,
214        cx: SafeJSContext,
215        rval: Option<MutableHandleValue>,
216        can_gc: CanGc,
217    ) {
218        {
219            let queue = self.queue.borrow();
220            let Some(value) = queue.front() else {
221                unreachable!("Buffer cannot be empty when dequeue value is called into.");
222            };
223            self.total_size.set(self.total_size.get() - value.size());
224            if let Some(rval) = rval {
225                value.to_jsval(cx, rval, can_gc);
226            } else {
227                assert_eq!(value, &EnqueuedValue::CloseSentinel);
228            }
229        }
230        self.queue.borrow_mut().pop_front();
231    }
232
233    /// <https://streams.spec.whatwg.org/#enqueue-value-with-size>
234    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
235    pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
236        // If ! IsNonNegativeNumber(size) is false, throw a RangeError exception.
237        if !is_non_negative_number(&value) {
238            return Err(Error::Range(
239                c"The size of the enqueued chunk is not a non-negative number.".to_owned(),
240            ));
241        }
242
243        // If size is +∞, throw a RangeError exception.
244        if value.size().is_infinite() {
245            return Err(Error::Range(
246                c"The size of the enqueued chunk is infinite.".to_owned(),
247            ));
248        }
249
250        self.total_size.set(self.total_size.get() + value.size());
251        self.queue.borrow_mut().push_back(value);
252
253        Ok(())
254    }
255
256    pub(crate) fn is_empty(&self) -> bool {
257        self.queue.borrow().is_empty()
258    }
259
260    /// <https://streams.spec.whatwg.org/#peek-queue-value>
261    /// Returns whether value is the close sentinel.
262    pub(crate) fn peek_queue_value(
263        &self,
264        cx: SafeJSContext,
265        rval: MutableHandleValue,
266        can_gc: CanGc,
267    ) -> bool {
268        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
269        // Done with the QueueWithSizes type.
270
271        // Assert: container.[[queue]] is not empty.
272        assert!(!self.is_empty());
273
274        // Let valueWithSize be container.[[queue]][0].
275        let queue = self.queue.borrow();
276        let value_with_size = queue.front().expect("Queue is not empty.");
277        if let EnqueuedValue::CloseSentinel = value_with_size {
278            return true;
279        }
280
281        // Return valueWithSize’s value.
282        value_with_size.to_jsval(cx, rval, can_gc);
283        false
284    }
285
286    /// Only used with native sources.
287    fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
288        self.queue
289            .borrow()
290            .iter()
291            .try_fold(Vec::new(), |mut acc, value| match value {
292                EnqueuedValue::Native(chunk) => {
293                    acc.extend(chunk.iter().copied());
294                    Some(acc)
295                },
296                _ => {
297                    warn!("get_in_memory_bytes called on a controller with non-native source.");
298                    None
299                },
300            })
301    }
302
303    /// <https://streams.spec.whatwg.org/#reset-queue>
304    pub(crate) fn reset(&self) {
305        self.queue.borrow_mut().clear();
306        self.total_size.set(Default::default());
307    }
308}
309
310/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
311#[dom_struct]
312pub(crate) struct ReadableStreamDefaultController {
313    reflector_: Reflector,
314
315    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue>
316    queue: QueueWithSizes,
317
318    /// A mutable reference to the underlying source is used to implement these two
319    /// internal slots:
320    ///
321    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm>
322    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm>
323    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
324
325    stream: MutNullableDom<ReadableStream>,
326
327    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm>
328    strategy_hwm: f64,
329
330    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm>
331    #[ignore_malloc_size_of = "mozjs"]
332    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
333
334    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested>
335    close_requested: Cell<bool>,
336
337    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started>
338    started: Cell<bool>,
339
340    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling>
341    pulling: Cell<bool>,
342
343    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain>
344    pull_again: Cell<bool>,
345}
346
347impl ReadableStreamDefaultController {
348    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
349    fn new_inherited(
350        global: &GlobalScope,
351        underlying_source_type: UnderlyingSourceType,
352        strategy_hwm: f64,
353        strategy_size: Rc<QueuingStrategySize>,
354        can_gc: CanGc,
355    ) -> ReadableStreamDefaultController {
356        ReadableStreamDefaultController {
357            reflector_: Reflector::new(),
358            queue: Default::default(),
359            stream: MutNullableDom::new(None),
360            underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
361                global,
362                underlying_source_type,
363                can_gc,
364            ))),
365            strategy_hwm,
366            strategy_size: RefCell::new(Some(strategy_size)),
367            close_requested: Default::default(),
368            started: Default::default(),
369            pulling: Default::default(),
370            pull_again: Default::default(),
371        }
372    }
373
374    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
375    pub(crate) fn new(
376        global: &GlobalScope,
377        underlying_source: UnderlyingSourceType,
378        strategy_hwm: f64,
379        strategy_size: Rc<QueuingStrategySize>,
380        can_gc: CanGc,
381    ) -> DomRoot<ReadableStreamDefaultController> {
382        reflect_dom_object(
383            Box::new(ReadableStreamDefaultController::new_inherited(
384                global,
385                underlying_source,
386                strategy_hwm,
387                strategy_size,
388                can_gc,
389            )),
390            global,
391            can_gc,
392        )
393    }
394
395    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
396    pub(crate) fn setup(
397        &self,
398        cx: &mut js::context::JSContext,
399        stream: DomRoot<ReadableStream>,
400    ) -> Result<(), Error> {
401        // Assert: stream.[[controller]] is undefined
402        stream.assert_no_controller();
403
404        // Set controller.[[stream]] to stream.
405        self.stream.set(Some(&stream));
406
407        let global = &*self.global();
408        let rooted_default_controller = DomRoot::from_ref(self);
409
410        // Perform ! ResetQueue(controller).
411        // Set controller.[[started]], controller.[[closeRequested]],
412        // controller.[[pullAgain]], and controller.[[pulling]] to false.
413        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
414        // and controller.[[strategyHWM]] to highWaterMark.
415        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
416        // and controller.[[strategyHWM]] to highWaterMark.
417        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
418
419        // Note: the above steps are done in `new`.
420
421        // Set stream.[[controller]] to controller.
422        stream.set_default_controller(&rooted_default_controller);
423
424        if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
425            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
426            let start_result = underlying_source
427                .call_start_algorithm(
428                    cx,
429                    Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
430                )
431                .unwrap_or_else(|| {
432                    let promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
433                    Ok(promise)
434                });
435
436            // Let startPromise be a promise resolved with startResult.
437            let start_promise = start_result?;
438
439            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
440            let handler = PromiseNativeHandler::new(
441                global,
442                Some(Box::new(StartAlgorithmFulfillmentHandler {
443                    controller: Dom::from_ref(&rooted_default_controller),
444                })),
445                Some(Box::new(StartAlgorithmRejectionHandler {
446                    controller: Dom::from_ref(&rooted_default_controller),
447                })),
448                CanGc::from_cx(cx),
449            );
450            let mut realm = enter_auto_realm(cx, global);
451            let cx = &mut realm.current_realm();
452            let in_realm_proof = cx.into();
453            let comp = InRealm::Already(&in_realm_proof);
454            start_promise.append_native_handler(&handler, comp, CanGc::from_cx(cx));
455        };
456
457        Ok(())
458    }
459
460    /// Setting the JS object after the heap has settled down.
461    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
462        if let Some(underlying_source) = self.underlying_source.get() {
463            underlying_source.set_underlying_source_this_object(this_object);
464        }
465    }
466
467    /// <https://streams.spec.whatwg.org/#dequeue-value>
468    fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
469        self.queue.dequeue_value(cx, Some(rval), can_gc);
470    }
471
472    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull>
473    fn should_call_pull(&self) -> bool {
474        // Let stream be controller.[[stream]].
475        // Note: the spec does not assert that stream is not undefined here,
476        // so we return false if it is.
477        let Some(stream) = self.stream.get() else {
478            debug!("`should_call_pull` called on a controller without a stream.");
479            return false;
480        };
481
482        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
483        if !self.can_close_or_enqueue() {
484            return false;
485        }
486
487        // If controller.[[started]] is false, return false.
488        if !self.started.get() {
489            return false;
490        }
491
492        // If ! IsReadableStreamLocked(stream) is true
493        // and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
494        if stream.is_locked() && stream.get_num_read_requests() > 0 {
495            return true;
496        }
497
498        // Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller).
499        // Assert: desiredSize is not null.
500        let desired_size = self.get_desired_size().expect("desiredSize is not null.");
501
502        if desired_size > 0. {
503            return true;
504        }
505
506        false
507    }
508
509    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
510    fn call_pull_if_needed(&self, can_gc: CanGc) {
511        // Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull(controller).
512        // If shouldPull is false, return.
513        if !self.should_call_pull() {
514            return;
515        }
516
517        // If controller.[[pulling]] is true,
518        if self.pulling.get() {
519            // Set controller.[[pullAgain]] to true.
520            self.pull_again.set(true);
521
522            return;
523        }
524
525        // Set controller.[[pulling]] to true.
526        self.pulling.set(true);
527
528        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
529        // Continues into the resolve and reject handling of the native handler.
530        let global = self.global();
531        let rooted_default_controller = DomRoot::from_ref(self);
532        let controller =
533            Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
534
535        let Some(underlying_source) = self.underlying_source.get() else {
536            return;
537        };
538        let handler = PromiseNativeHandler::new(
539            &global,
540            Some(Box::new(PullAlgorithmFulfillmentHandler {
541                controller: Dom::from_ref(&rooted_default_controller),
542            })),
543            Some(Box::new(PullAlgorithmRejectionHandler {
544                controller: Dom::from_ref(&rooted_default_controller),
545            })),
546            can_gc,
547        );
548
549        let realm = enter_realm(&*global);
550        let comp = InRealm::Entered(&realm);
551        let result = underlying_source
552            .call_pull_algorithm(controller, can_gc)
553            .unwrap_or_else(|| {
554                let promise = Promise::new(&global, can_gc);
555                promise.resolve_native(&(), can_gc);
556                Ok(promise)
557            });
558        let promise = result.unwrap_or_else(|error| {
559            let cx = GlobalScope::get_cx();
560            rooted!(in(*cx) let mut rval = UndefinedValue());
561            // TODO: check if `self.global()` is the right globalscope.
562            error.to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
563            let promise = Promise::new(&global, can_gc);
564            promise.reject_native(&rval.handle(), can_gc);
565            promise
566        });
567        promise.append_native_handler(&handler, comp, can_gc);
568    }
569
570    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-cancel>
571    pub(crate) fn perform_cancel_steps(
572        &self,
573        cx: &mut js::context::JSContext,
574        global: &GlobalScope,
575        reason: SafeHandleValue,
576    ) -> Rc<Promise> {
577        // Perform ! ResetQueue(this).
578        self.queue.reset();
579
580        let underlying_source = self
581            .underlying_source
582            .get()
583            .expect("Controller should have a source when the cancel steps are called into.");
584        // Let result be the result of performing this.[[cancelAlgorithm]], passing reason.
585        let result = underlying_source
586            .call_cancel_algorithm(cx, global, reason)
587            .unwrap_or_else(|| {
588                let promise = Promise::new2(cx, global);
589                promise.resolve_native(&(), CanGc::from_cx(cx));
590                Ok(promise)
591            });
592        let promise = result.unwrap_or_else(|error| {
593            rooted!(&in(cx) let mut rval = UndefinedValue());
594
595            error.to_jsval(cx.into(), global, rval.handle_mut(), CanGc::from_cx(cx));
596            let promise = Promise::new2(cx, global);
597            promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
598            promise
599        });
600
601        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
602        self.clear_algorithms();
603
604        // Return result(the promise).
605        promise
606    }
607
608    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-pull>
609    pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
610        // Let stream be this.[[stream]].
611        // Note: the spec does not assert that there is a stream.
612        let Some(stream) = self.stream.get() else {
613            return;
614        };
615
616        // if queue contains bytes, perform chunk steps.
617        if !self.queue.is_empty() {
618            let cx = GlobalScope::get_cx();
619            rooted!(in(*cx) let mut rval = UndefinedValue());
620            let result = RootedTraceableBox::new(Heap::default());
621            self.dequeue_value(cx, rval.handle_mut(), can_gc);
622            result.set(*rval);
623
624            // If this.[[closeRequested]] is true and this.[[queue]] is empty
625            if self.close_requested.get() && self.queue.is_empty() {
626                // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
627                self.clear_algorithms();
628
629                // Perform ! ReadableStreamClose(stream).
630                stream.close(can_gc);
631            } else {
632                // Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
633                self.call_pull_if_needed(can_gc);
634            }
635            // Perform readRequest’s chunk steps, given chunk.
636            read_request.chunk_steps(result, &self.global(), can_gc);
637        } else {
638            // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
639            stream.add_read_request(read_request);
640
641            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
642            self.call_pull_if_needed(can_gc);
643        }
644    }
645
646    /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-releasesteps>
647    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
648        // step 1 - Return.
649        Ok(())
650    }
651
652    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
653    #[expect(unsafe_code)]
654    pub(crate) fn enqueue(
655        &self,
656        cx: &mut js::context::JSContext,
657        chunk: SafeHandleValue,
658    ) -> Result<(), Error> {
659        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
660        if !self.can_close_or_enqueue() {
661            return Ok(());
662        }
663
664        let stream = self
665            .stream
666            .get()
667            .expect("Controller must have a stream when a chunk is enqueued.");
668
669        // If ! IsReadableStreamLocked(stream) is true
670        // and ! ReadableStreamGetNumReadRequests(stream) > 0,
671        // perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
672        if stream.is_locked() && stream.get_num_read_requests() > 0 {
673            stream.fulfill_read_request(chunk, false, CanGc::from_cx(cx));
674        } else {
675            // Otherwise,
676            // Let result be the result of performing controller.[[strategySizeAlgorithm]],
677            // passing in chunk, and interpreting the result as a completion record.
678            // Note: the clone is necessary to prevent potential re-borrow panics.
679            let strategy_size = {
680                let reference = self.strategy_size.borrow();
681                reference.clone()
682            };
683            let size = if let Some(strategy_size) = strategy_size {
684                // Note: the Rethrow exception handling is necessary,
685                // otherwise returning JSFailed will panic because no exception is pending.
686                let result =
687                    strategy_size.Call__(chunk, ExceptionHandling::Rethrow, CanGc::from_cx(cx));
688                match result {
689                    // Let chunkSize be result.[[Value]].
690                    Ok(size) => size,
691                    Err(error) => {
692                        // If result is an abrupt completion,
693                        rooted!(&in(cx) let mut rval = UndefinedValue());
694                        unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
695
696                        // Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]).
697                        self.error(rval.handle(), CanGc::from_cx(cx));
698
699                        // Return result.
700                        // Note: we need to return a type error, because no exception is pending.
701                        return Err(error);
702                    },
703                }
704            } else {
705                0.
706            };
707
708            {
709                // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
710                let res = self
711                    .queue
712                    .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
713                        value: Heap::boxed(chunk.get()),
714                        size,
715                    }));
716                if let Err(error) = res {
717                    // If enqueueResult is an abrupt completion,
718
719                    // First, throw the exception.
720                    // Note: this must be done manually here,
721                    // because `enqueue_value_with_size` does not call into JS.
722                    throw_dom_exception(cx.into(), &self.global(), error, CanGc::from_cx(cx));
723
724                    // Then, get a handle to the JS val for the exception,
725                    // and use that to error the stream.
726                    rooted!(&in(cx) let mut rval = UndefinedValue());
727                    unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
728
729                    // Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]).
730                    self.error(rval.handle(), CanGc::from_cx(cx));
731
732                    // Return enqueueResult.
733                    // Note: because we threw the exception above,
734                    // there is a pending exception and we can return JSFailed.
735                    return Err(Error::JSFailed);
736                }
737            }
738        }
739
740        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
741        self.call_pull_if_needed(CanGc::from_cx(cx));
742
743        Ok(())
744    }
745
746    /// Native call to
747    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
748    pub(crate) fn enqueue_native(&self, chunk: Vec<u8>, can_gc: CanGc) {
749        let stream = self
750            .stream
751            .get()
752            .expect("Controller must have a stream when a chunk is enqueued.");
753        if stream.is_locked() && stream.get_num_read_requests() > 0 {
754            let cx = GlobalScope::get_cx();
755            rooted!(in(*cx) let mut rval = UndefinedValue());
756            EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut(), can_gc);
757            stream.fulfill_read_request(rval.handle(), false, can_gc);
758        } else {
759            self.queue
760                .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
761                .expect("Enqueuing a chunk from Rust should not fail.");
762        }
763    }
764
765    /// Does the stream have all data in memory?
766    pub(crate) fn in_memory(&self) -> bool {
767        let Some(underlying_source) = self.underlying_source.get() else {
768            return false;
769        };
770        underlying_source.in_memory()
771    }
772
773    /// Return bytes synchronously if the stream has all data in memory.
774    pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
775        let underlying_source = self.underlying_source.get()?;
776        if underlying_source.in_memory() {
777            return self.queue.get_in_memory_bytes();
778        }
779        None
780    }
781
782    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms>
783    fn clear_algorithms(&self) {
784        // Set controller.[[pullAlgorithm]] to undefined.
785        // Set controller.[[cancelAlgorithm]] to undefined.
786        self.underlying_source.set(None);
787
788        // Set controller.[[strategySizeAlgorithm]] to undefined.
789        *self.strategy_size.borrow_mut() = None;
790    }
791
792    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
793    pub(crate) fn close(&self, can_gc: CanGc) {
794        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
795        if !self.can_close_or_enqueue() {
796            return;
797        }
798
799        let Some(stream) = self.stream.get() else {
800            return;
801        };
802
803        // Set controller.[[closeRequested]] to true.
804        self.close_requested.set(true);
805
806        if self.queue.is_empty() {
807            // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
808            self.clear_algorithms();
809
810            // Perform ! ReadableStreamClose(stream).
811            stream.close(can_gc);
812        }
813    }
814
815    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size>
816    pub(crate) fn get_desired_size(&self) -> Option<f64> {
817        let stream = self.stream.get()?;
818
819        // If state is "errored", return null.
820        if stream.is_errored() {
821            return None;
822        }
823
824        // If state is "closed", return 0.
825        if stream.is_closed() {
826            return Some(0.0);
827        }
828
829        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
830        let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
831        Some(desired_size.clamp(desired_size, self.strategy_hwm))
832    }
833
834    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue>
835    pub(crate) fn can_close_or_enqueue(&self) -> bool {
836        let Some(stream) = self.stream.get() else {
837            return false;
838        };
839
840        // If controller.[[closeRequested]] is false and state is "readable", return true.
841        if !self.close_requested.get() && stream.is_readable() {
842            return true;
843        }
844
845        // Otherwise, return false.
846        false
847    }
848
849    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
850    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
851        let Some(stream) = self.stream.get() else {
852            return;
853        };
854
855        // If stream.[[state]] is not "readable", return.
856        if !stream.is_readable() {
857            return;
858        }
859
860        // Perform ! ResetQueue(controller).
861        self.queue.reset();
862
863        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
864        self.clear_algorithms();
865
866        stream.error(e, can_gc);
867    }
868
869    /// <https://streams.spec.whatwg.org/#rs-default-controller-has-backpressure>
870    pub(crate) fn has_backpressure(&self) -> bool {
871        // If ! ReadableStreamDefaultControllerShouldCallPull(controller) is true, return false.
872        // Otherwise, return true.
873        !self.should_call_pull()
874    }
875}
876
877impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
878    for ReadableStreamDefaultController
879{
880    /// <https://streams.spec.whatwg.org/#rs-default-controller-desired-size>
881    fn GetDesiredSize(&self) -> Option<f64> {
882        self.get_desired_size()
883    }
884
885    /// <https://streams.spec.whatwg.org/#rs-default-controller-close>
886    fn Close(&self, can_gc: CanGc) -> Fallible<()> {
887        if !self.can_close_or_enqueue() {
888            // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false,
889            // throw a TypeError exception.
890            return Err(Error::Type(c"Stream cannot be closed.".to_owned()));
891        }
892
893        // Perform ! ReadableStreamDefaultControllerClose(this).
894        self.close(can_gc);
895
896        Ok(())
897    }
898
899    /// <https://streams.spec.whatwg.org/#rs-default-controller-enqueue>
900    fn Enqueue(&self, cx: &mut js::context::JSContext, chunk: SafeHandleValue) -> Fallible<()> {
901        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception.
902        if !self.can_close_or_enqueue() {
903            return Err(Error::Type(c"Stream cannot be enqueued to.".to_owned()));
904        }
905
906        // Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk).
907        self.enqueue(cx, chunk)
908    }
909
910    /// <https://streams.spec.whatwg.org/#rs-default-controller-error>
911    fn Error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) -> Fallible<()> {
912        self.error(e, CanGc::from_cx(cx));
913        Ok(())
914    }
915}