script/dom/
writablestreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, UndefinedValue};
12use js::realm::CurrentRealm;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
14
15use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
16use super::types::TransformStream;
17use crate::dom::bindings::callback::ExceptionHandling;
18use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
19    UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
20    UnderlyingSinkWriteCallback,
21};
22use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
25use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
26use crate::dom::globalscope::GlobalScope;
27use crate::dom::messageport::MessagePort;
28use crate::dom::promise::Promise;
29use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
30use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
31use crate::dom::types::{AbortController, AbortSignal};
32use crate::dom::writablestream::WritableStream;
33use crate::realms::{InRealm, enter_realm};
34use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
35
36impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
37
38/// The fulfillment handler for
39/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
40#[derive(Clone, JSTraceable, MallocSizeOf)]
41#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
42struct CloseAlgorithmFulfillmentHandler {
43    stream: Dom<WritableStream>,
44}
45
46impl Callback for CloseAlgorithmFulfillmentHandler {
47    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
48        let can_gc = CanGc::from_cx(cx);
49        let stream = self.stream.as_rooted();
50
51        // Perform ! WritableStreamFinishInFlightClose(stream).
52        stream.finish_in_flight_close(cx.into(), can_gc);
53    }
54}
55
56impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
57
58/// The rejection handler for
59/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
60#[derive(Clone, JSTraceable, MallocSizeOf)]
61#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
62struct CloseAlgorithmRejectionHandler {
63    stream: Dom<WritableStream>,
64}
65
66impl Callback for CloseAlgorithmRejectionHandler {
67    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
68        let stream = self.stream.as_rooted();
69
70        let global = GlobalScope::from_current_realm(cx);
71
72        // Perform ! WritableStreamFinishInFlightCloseWithError(stream, reason).
73        stream.finish_in_flight_close_with_error(cx.into(), &global, v, CanGc::from_cx(cx));
74    }
75}
76
77impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
78
79/// The fulfillment handler for
80/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
81#[derive(Clone, JSTraceable, MallocSizeOf)]
82#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
83struct StartAlgorithmFulfillmentHandler {
84    controller: Dom<WritableStreamDefaultController>,
85}
86
87impl Callback for StartAlgorithmFulfillmentHandler {
88    /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
89    /// Upon fulfillment of startPromise,
90    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
91        let controller = self.controller.as_rooted();
92        let stream = controller
93            .stream
94            .get()
95            .expect("Controller should have a stream.");
96
97        // Assert: stream.[[state]] is "writable" or "erroring".
98        assert!(stream.is_erroring() || stream.is_writable());
99
100        // Set controller.[[started]] to true.
101        controller.started.set(true);
102
103        let global = GlobalScope::from_current_realm(cx);
104
105        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
106        controller.advance_queue_if_needed(cx.into(), &global, CanGc::from_cx(cx))
107    }
108}
109
110impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
111
112/// The rejection handler for
113/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
114#[derive(Clone, JSTraceable, MallocSizeOf)]
115#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
116struct StartAlgorithmRejectionHandler {
117    controller: Dom<WritableStreamDefaultController>,
118}
119
120impl Callback for StartAlgorithmRejectionHandler {
121    /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
122    /// Upon rejection of startPromise with reason r,
123    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
124        let controller = self.controller.as_rooted();
125        let stream = controller
126            .stream
127            .get()
128            .expect("Controller should have a stream.");
129
130        // Assert: stream.[[state]] is "writable" or "erroring".
131        assert!(stream.is_erroring() || stream.is_writable());
132
133        // Set controller.[[started]] to true.
134        controller.started.set(true);
135
136        let global = GlobalScope::from_current_realm(cx);
137
138        // Perform ! WritableStreamDealWithRejection(stream, r).
139        stream.deal_with_rejection(cx.into(), &global, v, CanGc::from_cx(cx));
140    }
141}
142
143impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
144
145/// Reacting to backpressurePromise as part of the `writeAlgorithm` of
146/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
147#[derive(JSTraceable, MallocSizeOf)]
148#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
149struct TransferBackPressurePromiseReaction {
150    /// The result of reacting to backpressurePromise.
151    #[conditional_malloc_size_of]
152    result_promise: Rc<Promise>,
153
154    /// The backpressurePromise.
155    #[ignore_malloc_size_of = "nested Rc"]
156    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
157
158    /// The chunk received by the `writeAlgorithm`.
159    #[ignore_malloc_size_of = "mozjs"]
160    chunk: Box<Heap<JSVal>>,
161
162    /// The port used in the algorithm.
163    port: Dom<MessagePort>,
164}
165
166impl Callback for TransferBackPressurePromiseReaction {
167    /// Reacting to backpressurePromise with the following fulfillment steps:
168    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
169        let can_gc = CanGc::from_cx(cx);
170        let global = self.result_promise.global();
171        // Set backpressurePromise to a new promise.
172        let promise = Promise::new2(cx, &global);
173        *self.backpressure_promise.borrow_mut() = Some(promise);
174
175        // Let result be PackAndPostMessageHandlingError(port, "chunk", chunk).
176        rooted!(&in(cx) let mut chunk = UndefinedValue());
177        chunk.set(self.chunk.get());
178        let result =
179            self.port
180                .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
181
182        // If result is an abrupt completion,
183        if let Err(error) = result {
184            // Disentangle port.
185            global.disentangle_port(&self.port, can_gc);
186
187            // Return a promise rejected with result.[[Value]].
188            self.result_promise.reject_error(error, can_gc);
189        } else {
190            // Otherwise, return a promise resolved with undefined.
191            self.result_promise.resolve_native(&(), can_gc);
192        }
193    }
194}
195
196impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
197
198/// The fulfillment handler for
199/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
200#[derive(Clone, JSTraceable, MallocSizeOf)]
201#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
202struct WriteAlgorithmFulfillmentHandler {
203    controller: Dom<WritableStreamDefaultController>,
204}
205
206impl Callback for WriteAlgorithmFulfillmentHandler {
207    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
208        let can_gc = CanGc::from_cx(cx);
209        let controller = self.controller.as_rooted();
210        let stream = controller
211            .stream
212            .get()
213            .expect("Controller should have a stream.");
214
215        // Perform ! WritableStreamFinishInFlightWrite(stream).
216        stream.finish_in_flight_write(can_gc);
217
218        // Let state be stream.[[state]].
219        // Assert: state is "writable" or "erroring".
220        assert!(stream.is_erroring() || stream.is_writable());
221
222        // Perform ! DequeueValue(controller).
223        rooted!(&in(cx) let mut rval = UndefinedValue());
224        controller
225            .queue
226            .dequeue_value(cx.into(), Some(rval.handle_mut()), can_gc);
227
228        let global = GlobalScope::from_current_realm(cx);
229
230        // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and state is "writable",
231        if !stream.close_queued_or_in_flight() && stream.is_writable() {
232            // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
233            let backpressure = controller.get_backpressure();
234
235            // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
236            stream.update_backpressure(backpressure, &global, can_gc);
237        }
238
239        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
240        controller.advance_queue_if_needed(cx.into(), &global, can_gc)
241    }
242}
243
244impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
245
246/// The rejection handler for
247/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
248#[derive(Clone, JSTraceable, MallocSizeOf)]
249#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
250struct WriteAlgorithmRejectionHandler {
251    controller: Dom<WritableStreamDefaultController>,
252}
253
254impl Callback for WriteAlgorithmRejectionHandler {
255    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
256        let controller = self.controller.as_rooted();
257        let stream = controller
258            .stream
259            .get()
260            .expect("Controller should have a stream.");
261
262        // If stream.[[state]] is "writable",
263        if stream.is_writable() {
264            // perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
265            controller.clear_algorithms();
266        }
267
268        let global = GlobalScope::from_current_realm(cx);
269
270        // Perform ! WritableStreamFinishInFlightWriteWithError(stream, reason).
271        stream.finish_in_flight_write_with_error(cx.into(), &global, v, CanGc::from_cx(cx));
272    }
273}
274
/// The type of sink algorithms we are using.
///
/// The controller matches on this to decide how to run the start, write,
/// close and abort algorithms.
#[derive(JSTraceable, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub enum UnderlyingSinkType {
    /// Algorithms are provided by Js callbacks.
    /// Each callback is optional; a missing callback makes the corresponding
    /// algorithm produce a promise resolved with undefined.
    Js {
        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
        abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,

        /// The sink's `start` callback, invoked by `start_algorithm`.
        /// Unlike the other callbacks it is left in place by `clear_algorithms`.
        start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,

        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
        close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,

        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
        write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
    },
    /// Algorithms supporting streams transfer are implemented in Rust.
    /// The promise and port used in those algorithms are stored here.
    Transfer {
        /// The backpressurePromise of the cross-realm write algorithm; the slot
        /// is shared with the promise reaction that replaces its content.
        backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
        /// The port that chunks, "close" and "error" messages are posted through.
        port: Dom<MessagePort>,
    },
    /// Algorithms supporting transform streams are implemented in Rust.
    /// Holds the transform stream the algorithms delegate to, and the promise
    /// that the start algorithm returns.
    Transform(Dom<TransformStream>, Rc<Promise>),
}
301
302impl UnderlyingSinkType {
303    pub(crate) fn new_js(
304        abort: Option<Rc<UnderlyingSinkAbortCallback>>,
305        start: Option<Rc<UnderlyingSinkStartCallback>>,
306        close: Option<Rc<UnderlyingSinkCloseCallback>>,
307        write: Option<Rc<UnderlyingSinkWriteCallback>>,
308    ) -> Self {
309        UnderlyingSinkType::Js {
310            abort: RefCell::new(abort),
311            start: RefCell::new(start),
312            close: RefCell::new(close),
313            write: RefCell::new(write),
314        }
315    }
316}
317
/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
///
/// Holds the queue, the queuing strategy and the sink algorithms of a
/// writable stream.
#[dom_struct]
pub struct WritableStreamDefaultController {
    reflector_: Reflector,

    /// The type of underlying sink used. Besides the default JS one,
    /// there will be others for stream transfer, and for transform stream.
    #[ignore_malloc_size_of = "underlying_sink_type"]
    underlying_sink_type: UnderlyingSinkType,

    /// The JS object used as `this` when invoking sink algorithms.
    /// Starts out as the default value and is filled in through
    /// `set_underlying_sink_this_object`.
    #[ignore_malloc_size_of = "mozjs"]
    underlying_sink_obj: Heap<*mut JSObject>,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-queue>
    queue: QueueWithSizes,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-started>
    /// Set to true once the start promise has settled, whether it fulfilled
    /// or rejected.
    started: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategyhwm>
    strategy_hwm: f64,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategysizealgorithm>
    /// Reset to `None` by `clear_algorithms`, except for transform sinks.
    #[ignore_malloc_size_of = "QueuingStrategySize"]
    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-stream>
    /// Unset until `setup` links the controller to its stream.
    stream: MutNullableDom<WritableStream>,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortcontroller>
    /// Created together with the controller; `signal_abort` forwards to it.
    abort_controller: Dom<AbortController>,
}
351
352impl WritableStreamDefaultController {
353    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
354    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
355    fn new_inherited(
356        global: &GlobalScope,
357        underlying_sink_type: UnderlyingSinkType,
358        strategy_hwm: f64,
359        strategy_size: Rc<QueuingStrategySize>,
360        can_gc: CanGc,
361    ) -> WritableStreamDefaultController {
362        WritableStreamDefaultController {
363            reflector_: Reflector::new(),
364            underlying_sink_type,
365            queue: Default::default(),
366            stream: Default::default(),
367            underlying_sink_obj: Default::default(),
368            strategy_hwm,
369            strategy_size: RefCell::new(Some(strategy_size)),
370            started: Default::default(),
371            abort_controller: Dom::from_ref(&AbortController::new_with_proto(global, None, can_gc)),
372        }
373    }
374
375    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
376    pub(crate) fn new(
377        global: &GlobalScope,
378        underlying_sink_type: UnderlyingSinkType,
379        strategy_hwm: f64,
380        strategy_size: Rc<QueuingStrategySize>,
381        can_gc: CanGc,
382    ) -> DomRoot<WritableStreamDefaultController> {
383        reflect_dom_object(
384            Box::new(WritableStreamDefaultController::new_inherited(
385                global,
386                underlying_sink_type,
387                strategy_hwm,
388                strategy_size,
389                can_gc,
390            )),
391            global,
392            can_gc,
393        )
394    }
395
    /// Returns the current value of
    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-started>,
    /// which becomes true once the start promise has settled.
    pub(crate) fn started(&self) -> bool {
        self.started.get()
    }
399
    /// Setting the JS object after the heap has settled down.
    ///
    /// The stored object is what the JS sink callbacks receive as `this`.
    pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
        self.underlying_sink_obj.set(*this_object);
    }
404
    /// "Signal abort" call from <https://streams.spec.whatwg.org/#writable-stream-abort>
    ///
    /// Forwards `reason` to the controller's `AbortController`; every
    /// argument is passed through unchanged.
    pub(crate) fn signal_abort(
        &self,
        cx: SafeJSContext,
        reason: SafeHandleValue,
        realm: InRealm,
        can_gc: CanGc,
    ) {
        self.abort_controller
            .signal_abort(cx, reason, realm, can_gc);
    }
416
417    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-clear-algorithms>
418    fn clear_algorithms(&self) {
419        match &self.underlying_sink_type {
420            UnderlyingSinkType::Js {
421                abort,
422                start: _,
423                close,
424                write,
425            } => {
426                // Set controller.[[writeAlgorithm]] to undefined.
427                write.borrow_mut().take();
428
429                // Set controller.[[closeAlgorithm]] to undefined.
430                close.borrow_mut().take();
431
432                // Set controller.[[abortAlgorithm]] to undefined.
433                abort.borrow_mut().take();
434            },
435            UnderlyingSinkType::Transfer {
436                backpressure_promise,
437                ..
438            } => {
439                backpressure_promise.borrow_mut().take();
440            },
441            UnderlyingSinkType::Transform(_, _) => {
442                return;
443            },
444        }
445
446        // Set controller.[[strategySizeAlgorithm]] to undefined.
447        self.strategy_size.borrow_mut().take();
448    }
449
450    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
451    pub(crate) fn setup(
452        &self,
453        cx: SafeJSContext,
454        global: &GlobalScope,
455        stream: &WritableStream,
456        can_gc: CanGc,
457    ) -> Result<(), Error> {
458        // Assert: stream implements WritableStream.
459        // Implied by stream type.
460
461        // Assert: stream.[[controller]] is undefined.
462        stream.assert_no_controller();
463
464        // Set controller.[[stream]] to stream.
465        self.stream.set(Some(stream));
466
467        // Set stream.[[controller]] to controller.
468        stream.set_default_controller(self);
469
470        // Perform ! ResetQueue(controller).
471
472        // Set controller.[[abortController]] to a new AbortController.
473
474        // Set controller.[[started]] to false.
475
476        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm.
477
478        // Set controller.[[strategyHWM]] to highWaterMark.
479
480        // Set controller.[[writeAlgorithm]] to writeAlgorithm.
481
482        // Set controller.[[closeAlgorithm]] to closeAlgorithm.
483
484        // Set controller.[[abortAlgorithm]] to abortAlgorithm.
485
486        // Note: above steps are done in `new_inherited`.
487
488        // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
489        let backpressure = self.get_backpressure();
490
491        // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
492        stream.update_backpressure(backpressure, global, can_gc);
493
494        // Let startResult be the result of performing startAlgorithm. (This may throw an exception.)
495        // Let startPromise be a promise resolved with startResult.
496        let start_promise = self.start_algorithm(cx, global, can_gc)?;
497
498        let rooted_default_controller = DomRoot::from_ref(self);
499
500        // Upon fulfillment of startPromise,
501        rooted!(in(*cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
502            controller: Dom::from_ref(&rooted_default_controller),
503        }));
504
505        // Upon rejection of startPromise with reason r,
506        rooted!(in(*cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
507            controller: Dom::from_ref(&rooted_default_controller),
508        }));
509
510        let handler = PromiseNativeHandler::new(
511            global,
512            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
513            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
514            can_gc,
515        );
516        let realm = enter_realm(global);
517        let comp = InRealm::Entered(&realm);
518        start_promise.append_native_handler(&handler, comp, can_gc);
519
520        Ok(())
521    }
522
523    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-close>
524    pub(crate) fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
525        // Perform ! EnqueueValueWithSize(controller, close sentinel, 0).
526        self.queue
527            .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
528            .expect("Enqueuing the close sentinel should not fail.");
529        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
530        self.advance_queue_if_needed(cx, global, can_gc);
531    }
532
533    #[expect(unsafe_code)]
534    fn start_algorithm(
535        &self,
536        cx: SafeJSContext,
537        global: &GlobalScope,
538        can_gc: CanGc,
539    ) -> Fallible<Rc<Promise>> {
540        match &self.underlying_sink_type {
541            UnderlyingSinkType::Js {
542                start,
543                abort: _,
544                close: _,
545                write: _,
546            } => {
547                let algo = start.borrow().clone();
548                let start_promise = if let Some(start) = algo {
549                    rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
550                    rooted!(in(*cx) let mut result: JSVal);
551                    rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
552                    start.Call_(
553                        &this_object.handle(),
554                        self,
555                        result.handle_mut(),
556                        ExceptionHandling::Rethrow,
557                        can_gc,
558                    )?;
559                    let is_promise = unsafe {
560                        if result.is_object() {
561                            result_object.set(result.to_object());
562                            IsPromiseObject(result_object.handle().into_handle())
563                        } else {
564                            false
565                        }
566                    };
567                    if is_promise {
568                        Promise::new_with_js_promise(result_object.handle(), cx)
569                    } else {
570                        Promise::new_resolved(global, cx, result.get(), can_gc)
571                    }
572                } else {
573                    // Let startAlgorithm be an algorithm that returns undefined.
574                    Promise::new_resolved(global, cx, (), can_gc)
575                };
576
577                Ok(start_promise)
578            },
579            UnderlyingSinkType::Transfer { .. } => {
580                // Let startAlgorithm be an algorithm that returns undefined.
581                Ok(Promise::new_resolved(global, cx, (), can_gc))
582            },
583            UnderlyingSinkType::Transform(_, start_promise) => {
584                // Let startAlgorithm be an algorithm that returns startPromise.
585                Ok(start_promise.clone())
586            },
587        }
588    }
589
590    /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-writablestreamcontroller-abortsteps>
591    pub(crate) fn abort_steps(
592        &self,
593        cx: SafeJSContext,
594        global: &GlobalScope,
595        reason: SafeHandleValue,
596        can_gc: CanGc,
597    ) -> Rc<Promise> {
598        let result = match &self.underlying_sink_type {
599            UnderlyingSinkType::Js {
600                abort,
601                start: _,
602                close: _,
603                write: _,
604            } => {
605                rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
606                let algo = abort.borrow().clone();
607                // Let result be the result of performing this.[[abortAlgorithm]], passing reason.
608                let result = if let Some(algo) = algo {
609                    algo.Call_(
610                        &this_object.handle(),
611                        Some(reason),
612                        ExceptionHandling::Rethrow,
613                        can_gc,
614                    )
615                } else {
616                    Ok(Promise::new_resolved(global, cx, (), can_gc))
617                };
618                result.unwrap_or_else(|e| {
619                    let promise = Promise::new(global, can_gc);
620                    promise.reject_error(e, can_gc);
621                    promise
622                })
623            },
624            UnderlyingSinkType::Transfer { port, .. } => {
625                // The steps from the `abortAlgorithm` at
626                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
627
628                // Let result be PackAndPostMessageHandlingError(port, "error", reason).
629                let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
630
631                // Disentangle port.
632                global.disentangle_port(port, can_gc);
633
634                let promise = Promise::new(global, can_gc);
635
636                // If result is an abrupt completion, return a promise rejected with result.[[Value]]
637                if let Err(error) = result {
638                    promise.reject_error(error, can_gc);
639                } else {
640                    // Otherwise, return a promise resolved with undefined.
641                    promise.resolve_native(&(), can_gc);
642                }
643                promise
644            },
645            UnderlyingSinkType::Transform(stream, _) => {
646                // Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
647                stream
648                    .transform_stream_default_sink_abort_algorithm(cx, global, reason, can_gc)
649                    .expect("Transform stream default sink abort algorithm should not fail.")
650            },
651        };
652
653        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
654        self.clear_algorithms();
655
656        result
657    }
658
659    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
660    fn call_write_algorithm(
661        &self,
662        cx: SafeJSContext,
663        chunk: SafeHandleValue,
664        global: &GlobalScope,
665        can_gc: CanGc,
666    ) -> Rc<Promise> {
667        match &self.underlying_sink_type {
668            UnderlyingSinkType::Js {
669                abort: _,
670                start: _,
671                close: _,
672                write,
673            } => {
674                rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
675                let algo = write.borrow().clone();
676                let result = if let Some(algo) = algo {
677                    algo.Call_(
678                        &this_object.handle(),
679                        chunk,
680                        self,
681                        ExceptionHandling::Rethrow,
682                        can_gc,
683                    )
684                } else {
685                    Ok(Promise::new_resolved(global, cx, (), can_gc))
686                };
687                result.unwrap_or_else(|e| {
688                    let promise = Promise::new(global, can_gc);
689                    promise.reject_error(e, can_gc);
690                    promise
691                })
692            },
693            UnderlyingSinkType::Transfer {
694                backpressure_promise,
695                port,
696            } => {
697                // The steps from the `writeAlgorithm` at
698                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
699
700                // If backpressurePromise is undefined,
701                // set backpressurePromise to a promise resolved with undefined.
702                if backpressure_promise.borrow().is_none() {
703                    let promise = Promise::new_resolved(global, cx, (), can_gc);
704                    *backpressure_promise.borrow_mut() = Some(promise);
705                }
706
707                // Return the result of reacting to backpressurePromise with the following fulfillment steps:
708                let result_promise = Promise::new(global, can_gc);
709                rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
710                    port: port.clone(),
711                    backpressure_promise: backpressure_promise.clone(),
712                    chunk: Heap::boxed(chunk.get()),
713                    result_promise: result_promise.clone(),
714                }));
715                let handler = PromiseNativeHandler::new(
716                    global,
717                    fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
718                    None,
719                    can_gc,
720                );
721                let realm = enter_realm(global);
722                let comp = InRealm::Entered(&realm);
723                backpressure_promise
724                    .borrow()
725                    .as_ref()
726                    .expect("Promise must be some by now.")
727                    .append_native_handler(&handler, comp, can_gc);
728                result_promise
729            },
730            UnderlyingSinkType::Transform(stream, _) => {
731                // Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
732                stream
733                    .transform_stream_default_sink_write_algorithm(cx, global, chunk, can_gc)
734                    .expect("Transform stream default sink write algorithm should not fail.")
735            },
736        }
737    }
738
739    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
740    fn call_close_algorithm(
741        &self,
742        cx: SafeJSContext,
743        global: &GlobalScope,
744        can_gc: CanGc,
745    ) -> Rc<Promise> {
746        match &self.underlying_sink_type {
747            UnderlyingSinkType::Js {
748                abort: _,
749                start: _,
750                close,
751                write: _,
752            } => {
753                rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
754                this_object.set(self.underlying_sink_obj.get());
755                let algo = close.borrow().clone();
756                let result = if let Some(algo) = algo {
757                    algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
758                } else {
759                    Ok(Promise::new_resolved(global, cx, (), can_gc))
760                };
761                result.unwrap_or_else(|e| {
762                    let promise = Promise::new(global, can_gc);
763                    promise.reject_error(e, can_gc);
764                    promise
765                })
766            },
767            UnderlyingSinkType::Transfer { port, .. } => {
768                // The steps from the `closeAlgorithm` at
769                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
770
771                // Perform ! PackAndPostMessage(port, "close", undefined).
772                rooted!(in(*cx) let mut value = UndefinedValue());
773                port.pack_and_post_message("close", value.handle(), can_gc)
774                    .expect("Sending close should not fail.");
775
776                // Disentangle port.
777                global.disentangle_port(port, can_gc);
778
779                // Return a promise resolved with undefined.
780                Promise::new_resolved(global, cx, (), can_gc)
781            },
782            UnderlyingSinkType::Transform(stream, _) => {
783                // Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
784                stream
785                    .transform_stream_default_sink_close_algorithm(cx, global, can_gc)
786                    .expect("Transform stream default sink close algorithm should not fail.")
787            },
788        }
789    }
790
791    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
792    pub(crate) fn process_close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
793        // Let stream be controller.[[stream]].
794        let Some(stream) = self.stream.get() else {
795            unreachable!("Controller should have a stream");
796        };
797
798        // Perform ! WritableStreamMarkCloseRequestInFlight(stream).
799        stream.mark_close_request_in_flight();
800
801        // Perform ! DequeueValue(controller).
802        self.queue.dequeue_value(cx, None, can_gc);
803
804        // Assert: controller.[[queue]] is empty.
805        assert!(self.queue.is_empty());
806
807        // Let sinkClosePromise be the result of performing controller.[[closeAlgorithm]].
808        let sink_close_promise = self.call_close_algorithm(cx, global, can_gc);
809
810        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
811        self.clear_algorithms();
812
813        // Upon fulfillment of sinkClosePromise,
814        rooted!(in(*cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
815            stream: Dom::from_ref(&stream),
816        }));
817
818        // Upon rejection of sinkClosePromise with reason reason,
819        rooted!(in(*cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
820            stream: Dom::from_ref(&stream),
821        }));
822
823        // Attach handlers to the promise.
824        let handler = PromiseNativeHandler::new(
825            global,
826            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
827            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
828            can_gc,
829        );
830        let realm = enter_realm(global);
831        let comp = InRealm::Entered(&realm);
832        sink_close_promise.append_native_handler(&handler, comp, can_gc);
833    }
834
835    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-advance-queue-if-needed>
836    fn advance_queue_if_needed(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
837        // Let stream be controller.[[stream]].
838        let Some(stream) = self.stream.get() else {
839            unreachable!("Controller should have a stream");
840        };
841
842        // If controller.[[started]] is false, return.
843        if !self.started.get() {
844            return;
845        }
846
847        // If stream.[[inFlightWriteRequest]] is not undefined, return.
848        if stream.has_in_flight_write_request() {
849            return;
850        }
851
852        // Let state be stream.[[state]].
853
854        // Assert: state is not "closed" or "errored".
855        assert!(!(stream.is_errored() || stream.is_closed()));
856
857        // If state is "erroring",
858        if stream.is_erroring() {
859            // Perform ! WritableStreamFinishErroring(stream).
860            stream.finish_erroring(cx, global, can_gc);
861
862            // Return.
863            return;
864        }
865
866        // Let value be ! PeekQueueValue(controller).
867        rooted!(in(*cx) let mut value = UndefinedValue());
868        let is_closed = {
869            // If controller.[[queue]] is empty, return.
870            if self.queue.is_empty() {
871                return;
872            }
873            self.queue.peek_queue_value(cx, value.handle_mut(), can_gc)
874        };
875
876        if is_closed {
877            // If value is the close sentinel, perform ! WritableStreamDefaultControllerProcessClose(controller).
878            self.process_close(cx, global, can_gc);
879        } else {
880            // Otherwise, perform ! WritableStreamDefaultControllerProcessWrite(controller, value).
881            self.process_write(cx, value.handle(), global, can_gc);
882        };
883    }
884
    /// <https://streams.spec.whatwg.org/#ws-default-controller-private-error>
    ///
    /// The controller's error steps: resets the controller's queue.
    pub(crate) fn perform_error_steps(&self) {
        // Perform ! ResetQueue(this).
        self.queue.reset();
    }
890
891    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
892    fn process_write(
893        &self,
894        cx: SafeJSContext,
895        chunk: SafeHandleValue,
896        global: &GlobalScope,
897        can_gc: CanGc,
898    ) {
899        // Let stream be controller.[[stream]].
900        let Some(stream) = self.stream.get() else {
901            unreachable!("Controller should have a stream");
902        };
903
904        // Perform ! WritableStreamMarkFirstWriteRequestInFlight(stream).
905        stream.mark_first_write_request_in_flight();
906
907        // Let sinkWritePromise be the result of performing controller.[[writeAlgorithm]], passing in chunk.
908        let sink_write_promise = self.call_write_algorithm(cx, chunk, global, can_gc);
909
910        // Upon fulfillment of sinkWritePromise,
911        rooted!(in(*cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
912            controller: Dom::from_ref(self),
913        }));
914
915        // Upon rejection of sinkWritePromise with reason,
916        rooted!(in(*cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
917            controller: Dom::from_ref(self),
918        }));
919
920        // Attach handlers to the promise.
921        let handler = PromiseNativeHandler::new(
922            global,
923            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
924            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
925            can_gc,
926        );
927        let realm = enter_realm(global);
928        let comp = InRealm::Entered(&realm);
929        sink_write_promise.append_native_handler(&handler, comp, can_gc);
930    }
931
932    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-desired-size>
933    pub(crate) fn get_desired_size(&self) -> f64 {
934        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
935        let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
936        desired_size.clamp(desired_size, self.strategy_hwm)
937    }
938
939    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-backpressure>
940    fn get_backpressure(&self) -> bool {
941        // Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
942        let desired_size = self.get_desired_size();
943
944        // Return true if desiredSize ≤ 0, or false otherwise.
945        desired_size == 0.0 || desired_size.is_sign_negative()
946    }
947
948    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-chunk-size>
949    pub(crate) fn get_chunk_size(
950        &self,
951        cx: SafeJSContext,
952        global: &GlobalScope,
953        chunk: SafeHandleValue,
954        can_gc: CanGc,
955    ) -> f64 {
956        // If controller.[[strategySizeAlgorithm]] is undefined, then:
957        let Some(strategy_size) = self.strategy_size.borrow().clone() else {
958            // Assert: controller.[[stream]].[[state]] is not "writable".
959            let Some(stream) = self.stream.get() else {
960                unreachable!("Controller should have a stream");
961            };
962            assert!(!stream.is_writable());
963
964            // Return 1.
965            return 1.0;
966        };
967
968        // Let returnValue be the result of performing controller.[[strategySizeAlgorithm]],
969        // passing in chunk, and interpreting the result as a completion record.
970        let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
971
972        match result {
973            // Let chunkSize be result.[[Value]].
974            Ok(size) => size,
975            Err(error) => {
976                // If result is an abrupt completion,
977
978                // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]]).
979                // Create a rooted value for the error.
980                rooted!(in(*cx) let mut rooted_error = UndefinedValue());
981                error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
982                self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
983
984                // Return 1.
985                1.0
986            },
987        }
988    }
989
990    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-write>
991    pub(crate) fn write(
992        &self,
993        cx: SafeJSContext,
994        global: &GlobalScope,
995        chunk: SafeHandleValue,
996        chunk_size: f64,
997        can_gc: CanGc,
998    ) {
999        // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
1000        let enqueue_result = self
1001            .queue
1002            .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
1003                value: Heap::boxed(chunk.get()),
1004                size: chunk_size,
1005            }));
1006
1007        // If enqueueResult is an abrupt completion,
1008        if let Err(error) = enqueue_result {
1009            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]).
1010            // Create a rooted value for the error.
1011            rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1012            error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
1013            self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
1014
1015            // Return.
1016            return;
1017        }
1018
1019        // Let stream be controller.[[stream]].
1020        let Some(stream) = self.stream.get() else {
1021            unreachable!("Controller should have a stream");
1022        };
1023
1024        // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[state]] is "writable",
1025        if !stream.close_queued_or_in_flight() && stream.is_writable() {
1026            // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
1027            let backpressure = self.get_backpressure();
1028
1029            // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
1030            stream.update_backpressure(backpressure, global, can_gc);
1031        }
1032
1033        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
1034        self.advance_queue_if_needed(cx, global, can_gc);
1035    }
1036
1037    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error-if-needed>
1038    pub(crate) fn error_if_needed(
1039        &self,
1040        cx: SafeJSContext,
1041        error: SafeHandleValue,
1042        global: &GlobalScope,
1043        can_gc: CanGc,
1044    ) {
1045        // Let stream be controller.[[stream]].
1046        let Some(stream) = self.stream.get() else {
1047            unreachable!("Controller should have a stream");
1048        };
1049
1050        // If stream.[[state]] is "writable",
1051        if stream.is_writable() {
1052            // Perform ! WritableStreamDefaultControllerError(controller, e).
1053            self.error(&stream, cx, error, global, can_gc);
1054        }
1055    }
1056
    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error>
    ///
    /// Clears the controller's algorithms and starts erroring `stream` with `e`.
    ///
    /// The stream is supplied by the caller rather than read from the
    /// controller.
    ///
    /// # Panics
    ///
    /// Panics if `stream` is not writable.
    pub(crate) fn error(
        &self,
        stream: &WritableStream,
        cx: SafeJSContext,
        e: SafeHandleValue,
        global: &GlobalScope,
        can_gc: CanGc,
    ) {
        // Let stream be controller.[[stream]].
        // Done above with the argument.

        // Assert: stream.[[state]] is "writable".
        assert!(stream.is_writable());

        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
        self.clear_algorithms();

        // Perform ! WritableStreamStartErroring(stream, error).
        stream.start_erroring(cx, global, e, can_gc);
    }
1078}
1079
1080impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1081    for WritableStreamDefaultController
1082{
1083    /// <https://streams.spec.whatwg.org/#ws-default-controller-error>
1084    fn Error(&self, cx: SafeJSContext, e: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
1085        // Let state be this.[[stream]].[[state]].
1086        let Some(stream) = self.stream.get() else {
1087            unreachable!("Controller should have a stream");
1088        };
1089
1090        // If state is not "writable", return.
1091        if !stream.is_writable() {
1092            return;
1093        }
1094
1095        let global = GlobalScope::from_safe_context(cx, realm);
1096
1097        // Perform ! WritableStreamDefaultControllerError(this, e).
1098        self.error(&stream, cx, e, &global, can_gc);
1099    }
1100
1101    /// <https://streams.spec.whatwg.org/#ws-default-controller-signal>
1102    fn Signal(&self) -> DomRoot<AbortSignal> {
1103        // Return this.[[abortController]]’s signal.
1104        self.abort_controller.signal()
1105    }
1106}