Skip to main content

script/dom/stream/
readablestreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::{Heap, JSObject};
13use js::jsval::{JSVal, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::wrappers2::JS_GetPendingException;
16use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
17use js::typedarray::Uint8;
18use script_bindings::conversions::SafeToJSValConvertible;
19use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
20
21use crate::dom::bindings::buffer_source::create_buffer_source;
22use crate::dom::bindings::callback::ExceptionHandling;
23use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
24use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
25use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
26use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
27use crate::dom::bindings::reflector::DomGlobal;
28use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
29use crate::dom::bindings::trace::RootedTraceableBox;
30use crate::dom::globalscope::GlobalScope;
31use crate::dom::promise::Promise;
32use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
33use crate::dom::stream::readablestream::ReadableStream;
34use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
35use crate::dom::stream::underlyingsourcecontainer::{
36    UnderlyingSourceContainer, UnderlyingSourceType,
37};
38use crate::realms::enter_auto_realm;
39
40/// The fulfillment handler for
41/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
42#[derive(Clone, JSTraceable, MallocSizeOf)]
43#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
44struct PullAlgorithmFulfillmentHandler {
45    controller: Dom<ReadableStreamDefaultController>,
46}
47
48impl Callback for PullAlgorithmFulfillmentHandler {
49    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
50    /// Upon fulfillment of pullPromise
51    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
52        // Set controller.[[pulling]] to false.
53        self.controller.pulling.set(false);
54
55        // If controller.[[pullAgain]] is true,
56        if self.controller.pull_again.get() {
57            // Set controller.[[pullAgain]] to false.
58            self.controller.pull_again.set(false);
59
60            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
61            self.controller.call_pull_if_needed(cx);
62        }
63    }
64}
65
66/// The rejection handler for
67/// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
68#[derive(Clone, JSTraceable, MallocSizeOf)]
69#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
70struct PullAlgorithmRejectionHandler {
71    controller: Dom<ReadableStreamDefaultController>,
72}
73
74impl Callback for PullAlgorithmRejectionHandler {
75    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
76    /// Upon rejection of pullPromise with reason e.
77    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
78        // Perform ! ReadableStreamDefaultControllerError(controller, e).
79        self.controller.error(cx, v);
80    }
81}
82
83/// The fulfillment handler for
84/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
85#[derive(Clone, JSTraceable, MallocSizeOf)]
86#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
87struct StartAlgorithmFulfillmentHandler {
88    controller: Dom<ReadableStreamDefaultController>,
89}
90
91impl Callback for StartAlgorithmFulfillmentHandler {
92    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
93    /// Upon fulfillment of startPromise,
94    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
95        // Set controller.[[started]] to true.
96        self.controller.started.set(true);
97
98        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
99        self.controller.call_pull_if_needed(cx);
100    }
101}
102
103/// The rejection handler for
104/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
105#[derive(Clone, JSTraceable, MallocSizeOf)]
106#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
107struct StartAlgorithmRejectionHandler {
108    controller: Dom<ReadableStreamDefaultController>,
109}
110
111impl Callback for StartAlgorithmRejectionHandler {
112    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
113    /// Upon rejection of startPromise with reason r,
114    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
115        // Perform ! ReadableStreamDefaultControllerError(controller, r).
116        self.controller.error(cx, v);
117    }
118}
119
120/// <https://streams.spec.whatwg.org/#value-with-size>
121#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
122#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
123pub(crate) struct ValueWithSize {
124    /// <https://streams.spec.whatwg.org/#value-with-size-value>
125    #[ignore_malloc_size_of = "Heap is measured by mozjs"]
126    pub(crate) value: Box<Heap<JSVal>>,
127    /// <https://streams.spec.whatwg.org/#value-with-size-size>
128    pub(crate) size: f64,
129}
130
131/// <https://streams.spec.whatwg.org/#value-with-size>
132#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
133#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
134pub(crate) enum EnqueuedValue {
135    /// A value enqueued from Rust.
136    Native(Box<[u8]>),
137    /// A Js value.
138    Js(ValueWithSize),
139    /// <https://streams.spec.whatwg.org/#close-sentinel>
140    CloseSentinel,
141}
142
143impl EnqueuedValue {
144    fn size(&self) -> f64 {
145        match self {
146            EnqueuedValue::Native(v) => v.len() as f64,
147            EnqueuedValue::Js(v) => v.size,
148            // The size of the sentinel is zero,
149            // as per <https://streams.spec.whatwg.org/#ref-for-close-sentinel%E2%91%A0>
150            EnqueuedValue::CloseSentinel => 0.,
151        }
152    }
153
154    fn to_jsval(&self, cx: &mut JSContext, rval: MutableHandleValue) {
155        match self {
156            EnqueuedValue::Native(chunk) => {
157                rooted!(&in(cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
158                create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut())
159                    .expect("failed to create buffer source for native chunk.");
160                array_buffer_ptr.safe_to_jsval(cx, rval);
161            },
162            EnqueuedValue::Js(value_with_size) => value_with_size.value.safe_to_jsval(cx, rval),
163            EnqueuedValue::CloseSentinel => {
164                unreachable!("The close sentinel is never made available as a js val.")
165            },
166        }
167    }
168}
169
170/// <https://streams.spec.whatwg.org/#is-non-negative-number>
171fn is_non_negative_number(value: &EnqueuedValue) -> bool {
172    let value_with_size = match value {
173        EnqueuedValue::Native(_) => return true,
174        EnqueuedValue::Js(value_with_size) => value_with_size,
175        EnqueuedValue::CloseSentinel => return true,
176    };
177
178    // If v is not a Number, return false.
179    // Checked as part of the WebIDL.
180
181    // If v is NaN, return false.
182    if value_with_size.size.is_nan() {
183        return false;
184    }
185
186    // If v < 0, return false.
187    if value_with_size.size.is_sign_negative() {
188        return false;
189    }
190
191    true
192}
193
194/// <https://streams.spec.whatwg.org/#queue-with-sizes>
195#[derive(Default, JSTraceable, MallocSizeOf)]
196#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
197pub(crate) struct QueueWithSizes {
198    queue: RefCell<VecDeque<EnqueuedValue>>,
199    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queuetotalsize>
200    pub(crate) total_size: Cell<f64>,
201}
202
203impl QueueWithSizes {
204    /// <https://streams.spec.whatwg.org/#dequeue-value>
205    /// A none `rval` means we're dequeing the close sentinel,
206    /// which should never be made available to script.
207    pub(crate) fn dequeue_value(&self, cx: &mut JSContext, rval: Option<MutableHandleValue>) {
208        {
209            let queue = self.queue.borrow();
210            let Some(value) = queue.front() else {
211                unreachable!("Buffer cannot be empty when dequeue value is called into.");
212            };
213            self.total_size.set(self.total_size.get() - value.size());
214            if let Some(rval) = rval {
215                value.to_jsval(cx, rval);
216            } else {
217                assert_eq!(value, &EnqueuedValue::CloseSentinel);
218            }
219        }
220        self.queue.borrow_mut().pop_front();
221    }
222
223    /// <https://streams.spec.whatwg.org/#enqueue-value-with-size>
224    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
225    pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
226        // If ! IsNonNegativeNumber(size) is false, throw a RangeError exception.
227        if !is_non_negative_number(&value) {
228            return Err(Error::Range(
229                c"The size of the enqueued chunk is not a non-negative number.".to_owned(),
230            ));
231        }
232
233        // If size is +∞, throw a RangeError exception.
234        if value.size().is_infinite() {
235            return Err(Error::Range(
236                c"The size of the enqueued chunk is infinite.".to_owned(),
237            ));
238        }
239
240        self.total_size.set(self.total_size.get() + value.size());
241        self.queue.borrow_mut().push_back(value);
242
243        Ok(())
244    }
245
246    pub(crate) fn is_empty(&self) -> bool {
247        self.queue.borrow().is_empty()
248    }
249
250    /// <https://streams.spec.whatwg.org/#peek-queue-value>
251    /// Returns whether value is the close sentinel.
252    pub(crate) fn peek_queue_value(&self, cx: &mut JSContext, rval: MutableHandleValue) -> bool {
253        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
254        // Done with the QueueWithSizes type.
255
256        // Assert: container.[[queue]] is not empty.
257        assert!(!self.is_empty());
258
259        // Let valueWithSize be container.[[queue]][0].
260        let queue = self.queue.borrow();
261        let value_with_size = queue.front().expect("Queue is not empty.");
262        if let EnqueuedValue::CloseSentinel = value_with_size {
263            return true;
264        }
265
266        // Return valueWithSize’s value.
267        value_with_size.to_jsval(cx, rval);
268        false
269    }
270
271    /// Only used with native sources.
272    fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
273        self.queue
274            .borrow()
275            .iter()
276            .try_fold(Vec::new(), |mut acc, value| match value {
277                EnqueuedValue::Native(chunk) => {
278                    acc.extend(chunk.iter().copied());
279                    Some(acc)
280                },
281                _ => {
282                    warn!("get_in_memory_bytes called on a controller with non-native source.");
283                    None
284                },
285            })
286    }
287
288    /// <https://streams.spec.whatwg.org/#reset-queue>
289    pub(crate) fn reset(&self) {
290        self.queue.borrow_mut().clear();
291        self.total_size.set(Default::default());
292    }
293}
294
295/// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
296#[dom_struct]
297pub(crate) struct ReadableStreamDefaultController {
298    reflector_: Reflector,
299
300    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-queue>
301    queue: QueueWithSizes,
302
303    /// A mutable reference to the underlying source is used to implement these two
304    /// internal slots:
305    ///
306    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullalgorithm>
307    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-cancelalgorithm>
308    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
309
310    stream: MutNullableDom<ReadableStream>,
311
312    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategyhwm>
313    strategy_hwm: f64,
314
315    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-strategysizealgorithm>
316    #[ignore_malloc_size_of = "mozjs"]
317    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
318
319    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-closerequested>
320    close_requested: Cell<bool>,
321
322    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-started>
323    started: Cell<bool>,
324
325    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling>
326    pulling: Cell<bool>,
327
328    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pullagain>
329    pull_again: Cell<bool>,
330}
331
332impl ReadableStreamDefaultController {
333    fn new_inherited(
334        strategy_hwm: f64,
335        strategy_size: Rc<QueuingStrategySize>,
336        underlying_source: &UnderlyingSourceContainer,
337    ) -> ReadableStreamDefaultController {
338        ReadableStreamDefaultController {
339            reflector_: Reflector::new(),
340            queue: Default::default(),
341            stream: MutNullableDom::new(None),
342            underlying_source: MutNullableDom::new(Some(underlying_source)),
343            strategy_hwm,
344            strategy_size: RefCell::new(Some(strategy_size)),
345            close_requested: Default::default(),
346            started: Default::default(),
347            pulling: Default::default(),
348            pull_again: Default::default(),
349        }
350    }
351
352    pub(crate) fn new(
353        cx: &mut JSContext,
354        global: &GlobalScope,
355        underlying_source: UnderlyingSourceType,
356        strategy_hwm: f64,
357        strategy_size: Rc<QueuingStrategySize>,
358    ) -> DomRoot<ReadableStreamDefaultController> {
359        let underlying_source = UnderlyingSourceContainer::new(cx, global, underlying_source);
360        reflect_dom_object_with_cx(
361            Box::new(ReadableStreamDefaultController::new_inherited(
362                strategy_hwm,
363                strategy_size,
364                &underlying_source,
365            )),
366            global,
367            cx,
368        )
369    }
370
371    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
372    pub(crate) fn setup(
373        &self,
374        cx: &mut JSContext,
375        stream: DomRoot<ReadableStream>,
376    ) -> Result<(), Error> {
377        // Assert: stream.[[controller]] is undefined
378        stream.assert_no_controller();
379
380        // Set controller.[[stream]] to stream.
381        self.stream.set(Some(&stream));
382
383        let global = &*self.global();
384        let rooted_default_controller = DomRoot::from_ref(self);
385
386        // Perform ! ResetQueue(controller).
387        // Set controller.[[started]], controller.[[closeRequested]],
388        // controller.[[pullAgain]], and controller.[[pulling]] to false.
389        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
390        // and controller.[[strategyHWM]] to highWaterMark.
391        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm
392        // and controller.[[strategyHWM]] to highWaterMark.
393        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
394
395        // Note: the above steps are done in `new`.
396
397        // Set stream.[[controller]] to controller.
398        stream.set_default_controller(&rooted_default_controller);
399
400        if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
401            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
402            let start_result = underlying_source
403                .call_start_algorithm(
404                    cx,
405                    Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
406                )
407                .unwrap_or_else(|| {
408                    let promise = Promise::new_resolved(cx, global, ());
409                    Ok(promise)
410                });
411
412            // Let startPromise be a promise resolved with startResult.
413            let start_promise = start_result?;
414
415            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
416            let handler = PromiseNativeHandler::new(
417                cx,
418                global,
419                Some(Box::new(StartAlgorithmFulfillmentHandler {
420                    controller: Dom::from_ref(&rooted_default_controller),
421                })),
422                Some(Box::new(StartAlgorithmRejectionHandler {
423                    controller: Dom::from_ref(&rooted_default_controller),
424                })),
425            );
426            let mut realm = enter_auto_realm(cx, global);
427            let cx = &mut realm.current_realm();
428            start_promise.append_native_handler(cx, &handler);
429        };
430
431        Ok(())
432    }
433
434    /// Setting the JS object after the heap has settled down.
435    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
436        if let Some(underlying_source) = self.underlying_source.get() {
437            underlying_source.set_underlying_source_this_object(this_object);
438        }
439    }
440
441    /// <https://streams.spec.whatwg.org/#dequeue-value>
442    fn dequeue_value(&self, cx: &mut JSContext, rval: MutableHandleValue) {
443        self.queue.dequeue_value(cx, Some(rval));
444    }
445
446    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-should-call-pull>
447    fn should_call_pull(&self) -> bool {
448        // Let stream be controller.[[stream]].
449        // Note: the spec does not assert that stream is not undefined here,
450        // so we return false if it is.
451        let Some(stream) = self.stream.get() else {
452            debug!("`should_call_pull` called on a controller without a stream.");
453            return false;
454        };
455
456        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
457        if !self.can_close_or_enqueue() {
458            return false;
459        }
460
461        // If controller.[[started]] is false, return false.
462        if !self.started.get() {
463            return false;
464        }
465
466        // If ! IsReadableStreamLocked(stream) is true
467        // and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
468        if stream.is_locked() && stream.get_num_read_requests() > 0 {
469            return true;
470        }
471
472        // Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize(controller).
473        // Assert: desiredSize is not null.
474        let desired_size = self.get_desired_size().expect("desiredSize is not null.");
475
476        if desired_size > 0. {
477            return true;
478        }
479
480        false
481    }
482
483    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-call-pull-if-needed>
484    fn call_pull_if_needed(&self, cx: &mut JSContext) {
485        // Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull(controller).
486        // If shouldPull is false, return.
487        if !self.should_call_pull() {
488            return;
489        }
490
491        // If controller.[[pulling]] is true,
492        if self.pulling.get() {
493            // Set controller.[[pullAgain]] to true.
494            self.pull_again.set(true);
495
496            return;
497        }
498
499        // Set controller.[[pulling]] to true.
500        self.pulling.set(true);
501
502        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
503        // Continues into the resolve and reject handling of the native handler.
504        let global = self.global();
505        let rooted_default_controller = DomRoot::from_ref(self);
506        let controller =
507            Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
508
509        let Some(underlying_source) = self.underlying_source.get() else {
510            return;
511        };
512        let handler = PromiseNativeHandler::new(
513            cx,
514            &global,
515            Some(Box::new(PullAlgorithmFulfillmentHandler {
516                controller: Dom::from_ref(&rooted_default_controller),
517            })),
518            Some(Box::new(PullAlgorithmRejectionHandler {
519                controller: Dom::from_ref(&rooted_default_controller),
520            })),
521        );
522
523        let mut realm = enter_auto_realm(cx, &*global);
524        let cx = &mut realm.current_realm();
525
526        let result = underlying_source
527            .call_pull_algorithm(cx, controller)
528            .unwrap_or_else(|| {
529                let promise = Promise::new_resolved(cx, &global, ());
530                Ok(promise)
531            });
532        let promise = result.unwrap_or_else(|error| {
533            rooted!(&in(cx) let mut rval = UndefinedValue());
534            // TODO: check if `self.global()` is the right globalscope.
535            error.to_jsval(cx, &global, rval.handle_mut());
536            Promise::new_rejected(cx, &global, rval.handle())
537        });
538        promise.append_native_handler(cx, &handler);
539    }
540
541    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-cancel>
542    pub(crate) fn perform_cancel_steps(
543        &self,
544        cx: &mut JSContext,
545        global: &GlobalScope,
546        reason: SafeHandleValue,
547    ) -> Rc<Promise> {
548        // Perform ! ResetQueue(this).
549        self.queue.reset();
550
551        let underlying_source = self
552            .underlying_source
553            .get()
554            .expect("Controller should have a source when the cancel steps are called into.");
555        // Let result be the result of performing this.[[cancelAlgorithm]], passing reason.
556        let result = underlying_source
557            .call_cancel_algorithm(cx, global, reason)
558            .unwrap_or_else(|| {
559                let promise = Promise::new(cx, global);
560                promise.resolve_native(cx, &());
561                Ok(promise)
562            });
563        let promise = result.unwrap_or_else(|error| {
564            rooted!(&in(cx) let mut rval = UndefinedValue());
565
566            error.to_jsval(cx, global, rval.handle_mut());
567            let promise = Promise::new(cx, global);
568            promise.reject_native(cx, &rval.handle());
569            promise
570        });
571
572        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
573        self.clear_algorithms();
574
575        // Return result(the promise).
576        promise
577    }
578
579    /// <https://streams.spec.whatwg.org/#rs-default-controller-private-pull>
580    pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
581        // Let stream be this.[[stream]].
582        // Note: the spec does not assert that there is a stream.
583        let Some(stream) = self.stream.get() else {
584            return;
585        };
586
587        // if queue contains bytes, perform chunk steps.
588        if !self.queue.is_empty() {
589            rooted!(&in(cx) let mut rval = UndefinedValue());
590            let result = RootedTraceableBox::new(Heap::default());
591            self.dequeue_value(cx, rval.handle_mut());
592            result.set(*rval);
593
594            // If this.[[closeRequested]] is true and this.[[queue]] is empty
595            if self.close_requested.get() && self.queue.is_empty() {
596                // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
597                self.clear_algorithms();
598
599                // Perform ! ReadableStreamClose(stream).
600                stream.close(cx);
601            } else {
602                // Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
603                self.call_pull_if_needed(cx);
604            }
605            // Perform readRequest’s chunk steps, given chunk.
606            read_request.chunk_steps(cx, result, &self.global());
607        } else {
608            // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
609            stream.add_read_request(read_request);
610
611            // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
612            self.call_pull_if_needed(cx);
613        }
614    }
615
616    /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-readablestreamcontroller-releasesteps>
617    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
618        // step 1 - Return.
619        Ok(())
620    }
621
622    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
623    #[expect(unsafe_code)]
624    pub(crate) fn enqueue(&self, cx: &mut JSContext, chunk: SafeHandleValue) -> Result<(), Error> {
625        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
626        if !self.can_close_or_enqueue() {
627            return Ok(());
628        }
629
630        let stream = self
631            .stream
632            .get()
633            .expect("Controller must have a stream when a chunk is enqueued.");
634
635        // If ! IsReadableStreamLocked(stream) is true
636        // and ! ReadableStreamGetNumReadRequests(stream) > 0,
637        // perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
638        if stream.is_locked() && stream.get_num_read_requests() > 0 {
639            stream.fulfill_read_request(cx, chunk, false);
640        } else {
641            // Otherwise,
642            // Let result be the result of performing controller.[[strategySizeAlgorithm]],
643            // passing in chunk, and interpreting the result as a completion record.
644            // Note: the clone is necessary to prevent potential re-borrow panics.
645            let strategy_size = {
646                let reference = self.strategy_size.borrow();
647                reference.clone()
648            };
649            let size = if let Some(strategy_size) = strategy_size {
650                // Note: the Rethrow exception handling is necessary,
651                // otherwise returning JSFailed will panic because no exception is pending.
652                let result = strategy_size.Call__(cx, chunk, ExceptionHandling::Rethrow);
653                match result {
654                    // Let chunkSize be result.[[Value]].
655                    Ok(size) => size,
656                    Err(error) => {
657                        // If result is an abrupt completion,
658                        rooted!(&in(cx) let mut rval = UndefinedValue());
659                        unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
660
661                        // Perform ! ReadableStreamDefaultControllerError(controller, result.[[Value]]).
662                        self.error(cx, rval.handle());
663
664                        // Return result.
665                        // Note: we need to return a type error, because no exception is pending.
666                        return Err(error);
667                    },
668                }
669            } else {
670                0.
671            };
672
673            {
674                // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
675                let res = self
676                    .queue
677                    .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
678                        value: Heap::boxed(chunk.get()),
679                        size,
680                    }));
681                if let Err(error) = res {
682                    // If enqueueResult is an abrupt completion,
683
684                    // First, throw the exception.
685                    // Note: this must be done manually here,
686                    // because `enqueue_value_with_size` does not call into JS.
687                    throw_dom_exception(cx, &self.global(), error);
688
689                    // Then, get a handle to the JS val for the exception,
690                    // and use that to error the stream.
691                    rooted!(&in(cx) let mut rval = UndefinedValue());
692                    unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
693
694                    // Perform ! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]]).
695                    self.error(cx, rval.handle());
696
697                    // Return enqueueResult.
698                    // Note: because we threw the exception above,
699                    // there is a pending exception and we can return JSFailed.
700                    return Err(Error::JSFailed);
701                }
702            }
703        }
704
705        // Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
706        self.call_pull_if_needed(cx);
707
708        Ok(())
709    }
710
711    /// Native call to
712    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
713    pub(crate) fn enqueue_native(&self, cx: &mut JSContext, chunk: Vec<u8>) {
714        let stream = self
715            .stream
716            .get()
717            .expect("Controller must have a stream when a chunk is enqueued.");
718        if stream.is_locked() && stream.get_num_read_requests() > 0 {
719            rooted!(&in(cx) let mut rval = UndefinedValue());
720            EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut());
721            stream.fulfill_read_request(cx, rval.handle(), false);
722        } else {
723            self.queue
724                .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
725                .expect("Enqueuing a chunk from Rust should not fail.");
726        }
727    }
728
729    /// Does the stream have all data in memory?
730    pub(crate) fn in_memory(&self) -> bool {
731        let Some(underlying_source) = self.underlying_source.get() else {
732            return false;
733        };
734        underlying_source.in_memory()
735    }
736
737    /// Return bytes synchronously if the stream has all data in memory.
738    pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
739        let underlying_source = self.underlying_source.get()?;
740        if underlying_source.in_memory() {
741            return self.queue.get_in_memory_bytes();
742        }
743        None
744    }
745
746    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-clear-algorithms>
747    fn clear_algorithms(&self) {
748        // Set controller.[[pullAlgorithm]] to undefined.
749        // Set controller.[[cancelAlgorithm]] to undefined.
750        self.underlying_source.set(None);
751
752        // Set controller.[[strategySizeAlgorithm]] to undefined.
753        *self.strategy_size.borrow_mut() = None;
754    }
755
756    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
757    pub(crate) fn close(&self, cx: &mut JSContext) {
758        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is false, return.
759        if !self.can_close_or_enqueue() {
760            return;
761        }
762
763        let Some(stream) = self.stream.get() else {
764            return;
765        };
766
767        // Set controller.[[closeRequested]] to true.
768        self.close_requested.set(true);
769
770        if self.queue.is_empty() {
771            // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
772            self.clear_algorithms();
773
774            // Perform ! ReadableStreamClose(stream).
775            stream.close(cx);
776        }
777    }
778
779    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-get-desired-size>
780    pub(crate) fn get_desired_size(&self) -> Option<f64> {
781        let stream = self.stream.get()?;
782
783        // If state is "errored", return null.
784        if stream.is_errored() {
785            return None;
786        }
787
788        // If state is "closed", return 0.
789        if stream.is_closed() {
790            return Some(0.0);
791        }
792
793        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
794        let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
795        Some(desired_size.clamp(desired_size, self.strategy_hwm))
796    }
797
798    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-can-close-or-enqueue>
799    pub(crate) fn can_close_or_enqueue(&self) -> bool {
800        let Some(stream) = self.stream.get() else {
801            return false;
802        };
803
804        // If controller.[[closeRequested]] is false and state is "readable", return true.
805        if !self.close_requested.get() && stream.is_readable() {
806            return true;
807        }
808
809        // Otherwise, return false.
810        false
811    }
812
813    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
814    pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
815        let Some(stream) = self.stream.get() else {
816            return;
817        };
818
819        // If stream.[[state]] is not "readable", return.
820        if !stream.is_readable() {
821            return;
822        }
823
824        // Perform ! ResetQueue(controller).
825        self.queue.reset();
826
827        // Perform ! ReadableStreamDefaultControllerClearAlgorithms(controller).
828        self.clear_algorithms();
829
830        stream.error(cx, e);
831    }
832
833    /// <https://streams.spec.whatwg.org/#rs-default-controller-has-backpressure>
834    pub(crate) fn has_backpressure(&self) -> bool {
835        // If ! ReadableStreamDefaultControllerShouldCallPull(controller) is true, return false.
836        // Otherwise, return true.
837        !self.should_call_pull()
838    }
839}
840
841impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
842    for ReadableStreamDefaultController
843{
844    /// <https://streams.spec.whatwg.org/#rs-default-controller-desired-size>
845    fn GetDesiredSize(&self) -> Option<f64> {
846        self.get_desired_size()
847    }
848
849    /// <https://streams.spec.whatwg.org/#rs-default-controller-close>
850    fn Close(&self, cx: &mut JSContext) -> Fallible<()> {
851        if !self.can_close_or_enqueue() {
852            // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false,
853            // throw a TypeError exception.
854            return Err(Error::Type(c"Stream cannot be closed.".to_owned()));
855        }
856
857        // Perform ! ReadableStreamDefaultControllerClose(this).
858        self.close(cx);
859
860        Ok(())
861    }
862
863    /// <https://streams.spec.whatwg.org/#rs-default-controller-enqueue>
864    fn Enqueue(&self, cx: &mut JSContext, chunk: SafeHandleValue) -> Fallible<()> {
865        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) is false, throw a TypeError exception.
866        if !self.can_close_or_enqueue() {
867            return Err(Error::Type(c"Stream cannot be enqueued to.".to_owned()));
868        }
869
870        // Perform ? ReadableStreamDefaultControllerEnqueue(this, chunk).
871        self.enqueue(cx, chunk)
872    }
873
874    /// <https://streams.spec.whatwg.org/#rs-default-controller-error>
875    fn Error(&self, cx: &mut JSContext, e: SafeHandleValue) -> Fallible<()> {
876        self.error(cx, e);
877        Ok(())
878    }
879}