Skip to main content

script/dom/stream/
writablestreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::context::JSContext;
11use js::jsapi::{Heap, IsPromiseObject, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
15use script_bindings::reflector::{Reflector, reflect_dom_object};
16
17use crate::dom::bindings::callback::ExceptionHandling;
18use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
19use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
20    UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
21    UnderlyingSinkWriteCallback,
22};
23use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
25use crate::dom::bindings::reflector::DomGlobal;
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::globalscope::GlobalScope;
28use crate::dom::messageport::MessagePort;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
32use crate::dom::stream::writablestream::WritableStream;
33use crate::dom::types::{AbortController, AbortSignal, TransformStream};
34use crate::realms::enter_auto_realm;
35use crate::script_runtime::CanGc;
36
37impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
38
39/// The fulfillment handler for
40/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
41#[derive(Clone, JSTraceable, MallocSizeOf)]
42#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
43struct CloseAlgorithmFulfillmentHandler {
44    stream: Dom<WritableStream>,
45}
46
47impl Callback for CloseAlgorithmFulfillmentHandler {
48    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
49        let can_gc = CanGc::from_cx(cx);
50        let stream = self.stream.as_rooted();
51
52        // Perform ! WritableStreamFinishInFlightClose(stream).
53        stream.finish_in_flight_close(cx.into(), can_gc);
54    }
55}
56
57impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
58
59/// The rejection handler for
60/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
61#[derive(Clone, JSTraceable, MallocSizeOf)]
62#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
63struct CloseAlgorithmRejectionHandler {
64    stream: Dom<WritableStream>,
65}
66
67impl Callback for CloseAlgorithmRejectionHandler {
68    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
69        let stream = self.stream.as_rooted();
70
71        let global = GlobalScope::from_current_realm(cx);
72
73        // Perform ! WritableStreamFinishInFlightCloseWithError(stream, reason).
74        stream.finish_in_flight_close_with_error(cx, &global, v);
75    }
76}
77
78impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
79
80/// The fulfillment handler for
81/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
82#[derive(Clone, JSTraceable, MallocSizeOf)]
83#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
84struct StartAlgorithmFulfillmentHandler {
85    controller: Dom<WritableStreamDefaultController>,
86}
87
88impl Callback for StartAlgorithmFulfillmentHandler {
89    /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
90    /// Upon fulfillment of startPromise,
91    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
92        let controller = self.controller.as_rooted();
93        let stream = controller
94            .stream
95            .get()
96            .expect("Controller should have a stream.");
97
98        // Assert: stream.[[state]] is "writable" or "erroring".
99        assert!(stream.is_erroring() || stream.is_writable());
100
101        // Set controller.[[started]] to true.
102        controller.started.set(true);
103
104        let global = GlobalScope::from_current_realm(cx);
105
106        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
107        controller.advance_queue_if_needed(cx, &global)
108    }
109}
110
111impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
112
113/// The rejection handler for
114/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
115#[derive(Clone, JSTraceable, MallocSizeOf)]
116#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
117struct StartAlgorithmRejectionHandler {
118    controller: Dom<WritableStreamDefaultController>,
119}
120
121impl Callback for StartAlgorithmRejectionHandler {
122    /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
123    /// Upon rejection of startPromise with reason r,
124    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
125        let controller = self.controller.as_rooted();
126        let stream = controller
127            .stream
128            .get()
129            .expect("Controller should have a stream.");
130
131        // Assert: stream.[[state]] is "writable" or "erroring".
132        assert!(stream.is_erroring() || stream.is_writable());
133
134        // Set controller.[[started]] to true.
135        controller.started.set(true);
136
137        let global = GlobalScope::from_current_realm(cx);
138
139        // Perform ! WritableStreamDealWithRejection(stream, r).
140        stream.deal_with_rejection(cx, &global, v);
141    }
142}
143
144impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
145
146/// Reacting to backpressurePromise as part of the `writeAlgorithm` of
147/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
148#[derive(JSTraceable, MallocSizeOf)]
149#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
150struct TransferBackPressurePromiseReaction {
151    /// The result of reacting to backpressurePromise.
152    #[conditional_malloc_size_of]
153    result_promise: Rc<Promise>,
154
155    /// The backpressurePromise.
156    #[ignore_malloc_size_of = "nested Rc"]
157    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
158
159    /// The chunk received by the `writeAlgorithm`.
160    #[ignore_malloc_size_of = "mozjs"]
161    chunk: Box<Heap<JSVal>>,
162
163    /// The port used in the algorithm.
164    port: Dom<MessagePort>,
165}
166
167impl Callback for TransferBackPressurePromiseReaction {
168    /// Reacting to backpressurePromise with the following fulfillment steps:
169    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
170        let can_gc = CanGc::from_cx(cx);
171        let global = self.result_promise.global();
172        // Set backpressurePromise to a new promise.
173        let promise = Promise::new2(cx, &global);
174        *self.backpressure_promise.borrow_mut() = Some(promise);
175
176        // Let result be PackAndPostMessageHandlingError(port, "chunk", chunk).
177        rooted!(&in(cx) let mut chunk = UndefinedValue());
178        chunk.set(self.chunk.get());
179        let result = self
180            .port
181            .pack_and_post_message_handling_error(cx, "chunk", chunk.handle());
182
183        // If result is an abrupt completion,
184        if let Err(error) = result {
185            // Disentangle port.
186            global.disentangle_port(cx, &self.port);
187
188            // Return a promise rejected with result.[[Value]].
189            self.result_promise.reject_error(error, can_gc);
190        } else {
191            // Otherwise, return a promise resolved with undefined.
192            self.result_promise.resolve_native(&(), can_gc);
193        }
194    }
195}
196
197impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
198
199/// The fulfillment handler for
200/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
201#[derive(Clone, JSTraceable, MallocSizeOf)]
202#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
203struct WriteAlgorithmFulfillmentHandler {
204    controller: Dom<WritableStreamDefaultController>,
205}
206
207impl Callback for WriteAlgorithmFulfillmentHandler {
208    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
209        let can_gc = CanGc::from_cx(cx);
210        let controller = self.controller.as_rooted();
211        let stream = controller
212            .stream
213            .get()
214            .expect("Controller should have a stream.");
215
216        // Perform ! WritableStreamFinishInFlightWrite(stream).
217        stream.finish_in_flight_write(can_gc);
218
219        // Let state be stream.[[state]].
220        // Assert: state is "writable" or "erroring".
221        assert!(stream.is_erroring() || stream.is_writable());
222
223        // Perform ! DequeueValue(controller).
224        rooted!(&in(cx) let mut rval = UndefinedValue());
225        controller
226            .queue
227            .dequeue_value(cx.into(), Some(rval.handle_mut()), can_gc);
228
229        let global = GlobalScope::from_current_realm(cx);
230
231        // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and state is "writable",
232        if !stream.close_queued_or_in_flight() && stream.is_writable() {
233            // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
234            let backpressure = controller.get_backpressure();
235
236            // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
237            stream.update_backpressure(backpressure, &global, can_gc);
238        }
239
240        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
241        controller.advance_queue_if_needed(cx, &global)
242    }
243}
244
245impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
246
247/// The rejection handler for
248/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
249#[derive(Clone, JSTraceable, MallocSizeOf)]
250#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
251struct WriteAlgorithmRejectionHandler {
252    controller: Dom<WritableStreamDefaultController>,
253}
254
255impl Callback for WriteAlgorithmRejectionHandler {
256    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
257        let controller = self.controller.as_rooted();
258        let stream = controller
259            .stream
260            .get()
261            .expect("Controller should have a stream.");
262
263        // If stream.[[state]] is "writable",
264        if stream.is_writable() {
265            // perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
266            controller.clear_algorithms();
267        }
268
269        let global = GlobalScope::from_current_realm(cx);
270
271        // Perform ! WritableStreamFinishInFlightWriteWithError(stream, reason).
272        stream.finish_in_flight_write_with_error(cx, &global, v);
273    }
274}
275
276/// The type of sink algorithms we are using.
277#[derive(JSTraceable, PartialEq)]
278#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
279pub enum UnderlyingSinkType {
280    /// Algorithms are provided by Js callbacks.
281    Js {
282        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
283        abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
284
285        start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,
286
287        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
288        close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,
289
290        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
291        write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
292    },
293    /// Algorithms supporting streams transfer are implemented in Rust.
294    /// The promise and port used in those algorithms are stored here.
295    Transfer {
296        backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
297        port: Dom<MessagePort>,
298    },
299    /// Algorithms supporting transform streams are implemented in Rust.
300    Transform(Dom<TransformStream>, Rc<Promise>),
301}
302
303impl UnderlyingSinkType {
304    pub(crate) fn new_js(
305        abort: Option<Rc<UnderlyingSinkAbortCallback>>,
306        start: Option<Rc<UnderlyingSinkStartCallback>>,
307        close: Option<Rc<UnderlyingSinkCloseCallback>>,
308        write: Option<Rc<UnderlyingSinkWriteCallback>>,
309    ) -> Self {
310        UnderlyingSinkType::Js {
311            abort: RefCell::new(abort),
312            start: RefCell::new(start),
313            close: RefCell::new(close),
314            write: RefCell::new(write),
315        }
316    }
317}
318
319/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
320#[dom_struct]
321pub struct WritableStreamDefaultController {
322    reflector_: Reflector,
323
324    /// The type of underlying sink used. Besides the default JS one,
325    /// there will be others for stream transfer, and for transform stream.
326    #[ignore_malloc_size_of = "underlying_sink_type"]
327    underlying_sink_type: UnderlyingSinkType,
328
329    /// The JS object used as `this` when invoking sink algorithms.
330    #[ignore_malloc_size_of = "mozjs"]
331    underlying_sink_obj: Heap<*mut JSObject>,
332
333    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-queue>
334    queue: QueueWithSizes,
335
336    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-started>
337    started: Cell<bool>,
338
339    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategyhwm>
340    strategy_hwm: f64,
341
342    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategysizealgorithm>
343    #[ignore_malloc_size_of = "QueuingStrategySize"]
344    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
345
346    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-stream>
347    stream: MutNullableDom<WritableStream>,
348
349    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortcontroller>
350    abort_controller: Dom<AbortController>,
351}
352
353impl WritableStreamDefaultController {
354    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
355    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
356    fn new_inherited(
357        global: &GlobalScope,
358        underlying_sink_type: UnderlyingSinkType,
359        strategy_hwm: f64,
360        strategy_size: Rc<QueuingStrategySize>,
361        can_gc: CanGc,
362    ) -> WritableStreamDefaultController {
363        WritableStreamDefaultController {
364            reflector_: Reflector::new(),
365            underlying_sink_type,
366            queue: Default::default(),
367            stream: Default::default(),
368            underlying_sink_obj: Default::default(),
369            strategy_hwm,
370            strategy_size: RefCell::new(Some(strategy_size)),
371            started: Default::default(),
372            abort_controller: Dom::from_ref(&AbortController::new_with_proto(global, None, can_gc)),
373        }
374    }
375
376    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
377    pub(crate) fn new(
378        global: &GlobalScope,
379        underlying_sink_type: UnderlyingSinkType,
380        strategy_hwm: f64,
381        strategy_size: Rc<QueuingStrategySize>,
382        can_gc: CanGc,
383    ) -> DomRoot<WritableStreamDefaultController> {
384        reflect_dom_object(
385            Box::new(WritableStreamDefaultController::new_inherited(
386                global,
387                underlying_sink_type,
388                strategy_hwm,
389                strategy_size,
390                can_gc,
391            )),
392            global,
393            can_gc,
394        )
395    }
396
397    pub(crate) fn started(&self) -> bool {
398        self.started.get()
399    }
400
401    /// Setting the JS object after the heap has settled down.
402    pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
403        self.underlying_sink_obj.set(*this_object);
404    }
405
406    /// "Signal abort" call from <https://streams.spec.whatwg.org/#writable-stream-abort>
407    pub(crate) fn signal_abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
408        self.abort_controller.signal_abort(cx, reason);
409    }
410
411    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-clear-algorithms>
412    fn clear_algorithms(&self) {
413        match &self.underlying_sink_type {
414            UnderlyingSinkType::Js {
415                abort,
416                start: _,
417                close,
418                write,
419            } => {
420                // Set controller.[[writeAlgorithm]] to undefined.
421                write.borrow_mut().take();
422
423                // Set controller.[[closeAlgorithm]] to undefined.
424                close.borrow_mut().take();
425
426                // Set controller.[[abortAlgorithm]] to undefined.
427                abort.borrow_mut().take();
428            },
429            UnderlyingSinkType::Transfer {
430                backpressure_promise,
431                ..
432            } => {
433                backpressure_promise.borrow_mut().take();
434            },
435            UnderlyingSinkType::Transform(_, _) => {
436                return;
437            },
438        }
439
440        // Set controller.[[strategySizeAlgorithm]] to undefined.
441        self.strategy_size.borrow_mut().take();
442    }
443
444    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
445    pub(crate) fn setup(
446        &self,
447        cx: &mut JSContext,
448        global: &GlobalScope,
449        stream: &WritableStream,
450    ) -> Result<(), Error> {
451        // Assert: stream implements WritableStream.
452        // Implied by stream type.
453
454        // Assert: stream.[[controller]] is undefined.
455        stream.assert_no_controller();
456
457        // Set controller.[[stream]] to stream.
458        self.stream.set(Some(stream));
459
460        // Set stream.[[controller]] to controller.
461        stream.set_default_controller(self);
462
463        // Perform ! ResetQueue(controller).
464
465        // Set controller.[[abortController]] to a new AbortController.
466
467        // Set controller.[[started]] to false.
468
469        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm.
470
471        // Set controller.[[strategyHWM]] to highWaterMark.
472
473        // Set controller.[[writeAlgorithm]] to writeAlgorithm.
474
475        // Set controller.[[closeAlgorithm]] to closeAlgorithm.
476
477        // Set controller.[[abortAlgorithm]] to abortAlgorithm.
478
479        // Note: above steps are done in `new_inherited`.
480
481        // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
482        let backpressure = self.get_backpressure();
483
484        // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
485        stream.update_backpressure(backpressure, global, CanGc::from_cx(cx));
486
487        // Let startResult be the result of performing startAlgorithm. (This may throw an exception.)
488        // Let startPromise be a promise resolved with startResult.
489        let start_promise = self.start_algorithm(cx, global)?;
490
491        let rooted_default_controller = DomRoot::from_ref(self);
492
493        // Upon fulfillment of startPromise,
494        rooted!(&in(cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
495            controller: Dom::from_ref(&rooted_default_controller),
496        }));
497
498        // Upon rejection of startPromise with reason r,
499        rooted!(&in(cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
500            controller: Dom::from_ref(&rooted_default_controller),
501        }));
502
503        let handler = PromiseNativeHandler::new(
504            global,
505            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
506            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
507            CanGc::from_cx(cx),
508        );
509        let mut realm = enter_auto_realm(cx, global);
510        let cx = &mut realm.current_realm();
511        start_promise.append_native_handler(cx, &handler);
512
513        Ok(())
514    }
515
516    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-close>
517    pub(crate) fn close(&self, cx: &mut JSContext, global: &GlobalScope) {
518        // Perform ! EnqueueValueWithSize(controller, close sentinel, 0).
519        self.queue
520            .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
521            .expect("Enqueuing the close sentinel should not fail.");
522        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
523        self.advance_queue_if_needed(cx, global);
524    }
525
526    #[expect(unsafe_code)]
527    fn start_algorithm(&self, cx: &mut JSContext, global: &GlobalScope) -> Fallible<Rc<Promise>> {
528        match &self.underlying_sink_type {
529            UnderlyingSinkType::Js {
530                start,
531                abort: _,
532                close: _,
533                write: _,
534            } => {
535                let algo = start.borrow().clone();
536                let start_promise = if let Some(start) = algo {
537                    rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
538                    rooted!(&in(cx) let mut result: JSVal);
539                    rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
540                    start.Call_(
541                        cx,
542                        &this_object.handle(),
543                        self,
544                        result.handle_mut(),
545                        ExceptionHandling::Rethrow,
546                    )?;
547                    let is_promise = unsafe {
548                        if result.is_object() {
549                            result_object.set(result.to_object());
550                            IsPromiseObject(result_object.handle().into_handle())
551                        } else {
552                            false
553                        }
554                    };
555                    if is_promise {
556                        Promise::new_with_js_promise(result_object.handle(), cx.into())
557                    } else {
558                        Promise::new_resolved(global, cx.into(), result.get(), CanGc::from_cx(cx))
559                    }
560                } else {
561                    // Let startAlgorithm be an algorithm that returns undefined.
562                    Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
563                };
564
565                Ok(start_promise)
566            },
567            UnderlyingSinkType::Transfer { .. } => {
568                // Let startAlgorithm be an algorithm that returns undefined.
569                Ok(Promise::new_resolved(
570                    global,
571                    cx.into(),
572                    (),
573                    CanGc::from_cx(cx),
574                ))
575            },
576            UnderlyingSinkType::Transform(_, start_promise) => {
577                // Let startAlgorithm be an algorithm that returns startPromise.
578                Ok(start_promise.clone())
579            },
580        }
581    }
582
583    /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-writablestreamcontroller-abortsteps>
584    pub(crate) fn abort_steps(
585        &self,
586        cx: &mut JSContext,
587        global: &GlobalScope,
588        reason: SafeHandleValue,
589    ) -> Rc<Promise> {
590        let result = match &self.underlying_sink_type {
591            UnderlyingSinkType::Js {
592                abort,
593                start: _,
594                close: _,
595                write: _,
596            } => {
597                rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
598                let algo = abort.borrow().clone();
599                // Let result be the result of performing this.[[abortAlgorithm]], passing reason.
600                let result = if let Some(algo) = algo {
601                    algo.Call_(
602                        cx,
603                        &this_object.handle(),
604                        Some(reason),
605                        ExceptionHandling::Rethrow,
606                    )
607                } else {
608                    Ok(Promise::new_resolved(
609                        global,
610                        cx.into(),
611                        (),
612                        CanGc::from_cx(cx),
613                    ))
614                };
615                result.unwrap_or_else(|e| {
616                    let promise = Promise::new(global, CanGc::from_cx(cx));
617                    promise.reject_error(e, CanGc::from_cx(cx));
618                    promise
619                })
620            },
621            UnderlyingSinkType::Transfer { port, .. } => {
622                // The steps from the `abortAlgorithm` at
623                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
624
625                // Let result be PackAndPostMessageHandlingError(port, "error", reason).
626                let result = port.pack_and_post_message_handling_error(cx, "error", reason);
627
628                // Disentangle port.
629                global.disentangle_port(cx, port);
630
631                let promise = Promise::new(global, CanGc::from_cx(cx));
632
633                // If result is an abrupt completion, return a promise rejected with result.[[Value]]
634                if let Err(error) = result {
635                    promise.reject_error(error, CanGc::from_cx(cx));
636                } else {
637                    // Otherwise, return a promise resolved with undefined.
638                    promise.resolve_native(&(), CanGc::from_cx(cx));
639                }
640                promise
641            },
642            UnderlyingSinkType::Transform(stream, _) => {
643                // Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
644                stream
645                    .transform_stream_default_sink_abort_algorithm(cx, global, reason)
646                    .expect("Transform stream default sink abort algorithm should not fail.")
647            },
648        };
649
650        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
651        self.clear_algorithms();
652
653        result
654    }
655
656    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
657    fn call_write_algorithm(
658        &self,
659        cx: &mut JSContext,
660        chunk: SafeHandleValue,
661        global: &GlobalScope,
662    ) -> Rc<Promise> {
663        match &self.underlying_sink_type {
664            UnderlyingSinkType::Js {
665                abort: _,
666                start: _,
667                close: _,
668                write,
669            } => {
670                rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
671                let algo = write.borrow().clone();
672                let result = if let Some(algo) = algo {
673                    algo.Call_(
674                        cx,
675                        &this_object.handle(),
676                        chunk,
677                        self,
678                        ExceptionHandling::Rethrow,
679                    )
680                } else {
681                    Ok(Promise::new_resolved(
682                        global,
683                        cx.into(),
684                        (),
685                        CanGc::from_cx(cx),
686                    ))
687                };
688                result.unwrap_or_else(|e| {
689                    let promise = Promise::new2(cx, global);
690                    promise.reject_error(e, CanGc::from_cx(cx));
691                    promise
692                })
693            },
694            UnderlyingSinkType::Transfer {
695                backpressure_promise,
696                port,
697            } => {
698                // The steps from the `writeAlgorithm` at
699                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
700
701                // If backpressurePromise is undefined,
702                // set backpressurePromise to a promise resolved with undefined.
703                if backpressure_promise.borrow().is_none() {
704                    let promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
705                    *backpressure_promise.borrow_mut() = Some(promise);
706                }
707
708                // Return the result of reacting to backpressurePromise with the following fulfillment steps:
709                let result_promise = Promise::new2(cx, global);
710                rooted!(&in(cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
711                    port: port.clone(),
712                    backpressure_promise: backpressure_promise.clone(),
713                    chunk: Heap::boxed(chunk.get()),
714                    result_promise: result_promise.clone(),
715                }));
716                let handler = PromiseNativeHandler::new(
717                    global,
718                    fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
719                    None,
720                    CanGc::from_cx(cx),
721                );
722                let mut realm = enter_auto_realm(cx, global);
723                let realm = &mut realm.current_realm();
724                backpressure_promise
725                    .borrow()
726                    .as_ref()
727                    .expect("Promise must be some by now.")
728                    .append_native_handler(realm, &handler);
729                result_promise
730            },
731            UnderlyingSinkType::Transform(stream, _) => {
732                // Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
733                stream
734                    .transform_stream_default_sink_write_algorithm(cx, global, chunk)
735                    .expect("Transform stream default sink write algorithm should not fail.")
736            },
737        }
738    }
739
740    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
741    fn call_close_algorithm(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
742        match &self.underlying_sink_type {
743            UnderlyingSinkType::Js {
744                abort: _,
745                start: _,
746                close,
747                write: _,
748            } => {
749                rooted!(&in(cx) let mut this_object = ptr::null_mut::<JSObject>());
750                this_object.set(self.underlying_sink_obj.get());
751                let algo = close.borrow().clone();
752                let result = if let Some(algo) = algo {
753                    algo.Call_(cx, &this_object.handle(), ExceptionHandling::Rethrow)
754                } else {
755                    Ok(Promise::new_resolved(
756                        global,
757                        cx.into(),
758                        (),
759                        CanGc::from_cx(cx),
760                    ))
761                };
762                result.unwrap_or_else(|e| {
763                    let promise = Promise::new2(cx, global);
764                    promise.reject_error(e, CanGc::from_cx(cx));
765                    promise
766                })
767            },
768            UnderlyingSinkType::Transfer { port, .. } => {
769                // The steps from the `closeAlgorithm` at
770                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
771
772                // Perform ! PackAndPostMessage(port, "close", undefined).
773                rooted!(&in(cx) let mut value = UndefinedValue());
774                port.pack_and_post_message(cx, "close", value.handle())
775                    .expect("Sending close should not fail.");
776
777                // Disentangle port.
778                global.disentangle_port(cx, port);
779
780                // Return a promise resolved with undefined.
781                Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
782            },
783            UnderlyingSinkType::Transform(stream, _) => {
784                // Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
785                stream
786                    .transform_stream_default_sink_close_algorithm(cx, global)
787                    .expect("Transform stream default sink close algorithm should not fail.")
788            },
789        }
790    }
791
792    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
793    pub(crate) fn process_close(&self, cx: &mut JSContext, global: &GlobalScope) {
794        // Let stream be controller.[[stream]].
795        let Some(stream) = self.stream.get() else {
796            unreachable!("Controller should have a stream");
797        };
798
799        // Perform ! WritableStreamMarkCloseRequestInFlight(stream).
800        stream.mark_close_request_in_flight();
801
802        // Perform ! DequeueValue(controller).
803        self.queue
804            .dequeue_value(cx.into(), None, CanGc::from_cx(cx));
805
806        // Assert: controller.[[queue]] is empty.
807        assert!(self.queue.is_empty());
808
809        // Let sinkClosePromise be the result of performing controller.[[closeAlgorithm]].
810        let sink_close_promise = self.call_close_algorithm(cx, global);
811
812        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
813        self.clear_algorithms();
814
815        // Upon fulfillment of sinkClosePromise,
816        rooted!(&in(cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
817            stream: Dom::from_ref(&stream),
818        }));
819
820        // Upon rejection of sinkClosePromise with reason reason,
821        rooted!(&in(cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
822            stream: Dom::from_ref(&stream),
823        }));
824
825        // Attach handlers to the promise.
826        let handler = PromiseNativeHandler::new(
827            global,
828            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
829            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
830            CanGc::from_cx(cx),
831        );
832        let mut realm = enter_auto_realm(cx, global);
833        let realm = &mut realm.current_realm();
834        sink_close_promise.append_native_handler(realm, &handler);
835    }
836
837    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-advance-queue-if-needed>
838    fn advance_queue_if_needed(&self, cx: &mut JSContext, global: &GlobalScope) {
839        // Let stream be controller.[[stream]].
840        let Some(stream) = self.stream.get() else {
841            unreachable!("Controller should have a stream");
842        };
843
844        // If controller.[[started]] is false, return.
845        if !self.started.get() {
846            return;
847        }
848
849        // If stream.[[inFlightWriteRequest]] is not undefined, return.
850        if stream.has_in_flight_write_request() {
851            return;
852        }
853
854        // Let state be stream.[[state]].
855
856        // Assert: state is not "closed" or "errored".
857        assert!(!(stream.is_errored() || stream.is_closed()));
858
859        // If state is "erroring",
860        if stream.is_erroring() {
861            // Perform ! WritableStreamFinishErroring(stream).
862            stream.finish_erroring(cx, global);
863
864            // Return.
865            return;
866        }
867
868        // Let value be ! PeekQueueValue(controller).
869        rooted!(&in(cx) let mut value = UndefinedValue());
870        let is_closed = {
871            // If controller.[[queue]] is empty, return.
872            if self.queue.is_empty() {
873                return;
874            }
875            self.queue
876                .peek_queue_value(cx.into(), value.handle_mut(), CanGc::from_cx(cx))
877        };
878
879        if is_closed {
880            // If value is the close sentinel, perform ! WritableStreamDefaultControllerProcessClose(controller).
881            self.process_close(cx, global);
882        } else {
883            // Otherwise, perform ! WritableStreamDefaultControllerProcessWrite(controller, value).
884            self.process_write(cx, value.handle(), global);
885        };
886    }
887
888    /// <https://streams.spec.whatwg.org/#ws-default-controller-private-error>
889    pub(crate) fn perform_error_steps(&self) {
890        // Perform ! ResetQueue(this).
891        self.queue.reset();
892    }
893
894    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
895    fn process_write(&self, cx: &mut JSContext, chunk: SafeHandleValue, global: &GlobalScope) {
896        // Let stream be controller.[[stream]].
897        let Some(stream) = self.stream.get() else {
898            unreachable!("Controller should have a stream");
899        };
900
901        // Perform ! WritableStreamMarkFirstWriteRequestInFlight(stream).
902        stream.mark_first_write_request_in_flight();
903
904        // Let sinkWritePromise be the result of performing controller.[[writeAlgorithm]], passing in chunk.
905        let sink_write_promise = self.call_write_algorithm(cx, chunk, global);
906
907        // Upon fulfillment of sinkWritePromise,
908        rooted!(&in(cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
909            controller: Dom::from_ref(self),
910        }));
911
912        // Upon rejection of sinkWritePromise with reason,
913        rooted!(&in(cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
914            controller: Dom::from_ref(self),
915        }));
916
917        // Attach handlers to the promise.
918        let handler = PromiseNativeHandler::new(
919            global,
920            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
921            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
922            CanGc::from_cx(cx),
923        );
924        let mut realm = enter_auto_realm(cx, global);
925        let realm = &mut realm.current_realm();
926        sink_write_promise.append_native_handler(realm, &handler);
927    }
928
929    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-desired-size>
930    pub(crate) fn get_desired_size(&self) -> f64 {
931        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
932        let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
933        desired_size.clamp(desired_size, self.strategy_hwm)
934    }
935
936    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-backpressure>
937    fn get_backpressure(&self) -> bool {
938        // Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
939        let desired_size = self.get_desired_size();
940
941        // Return true if desiredSize ≤ 0, or false otherwise.
942        desired_size == 0.0 || desired_size.is_sign_negative()
943    }
944
945    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-chunk-size>
946    pub(crate) fn get_chunk_size(
947        &self,
948        cx: &mut JSContext,
949        global: &GlobalScope,
950        chunk: SafeHandleValue,
951    ) -> f64 {
952        // If controller.[[strategySizeAlgorithm]] is undefined, then:
953        let Some(strategy_size) = self.strategy_size.borrow().clone() else {
954            // Assert: controller.[[stream]].[[state]] is not "writable".
955            let Some(stream) = self.stream.get() else {
956                unreachable!("Controller should have a stream");
957            };
958            assert!(!stream.is_writable());
959
960            // Return 1.
961            return 1.0;
962        };
963
964        // Let returnValue be the result of performing controller.[[strategySizeAlgorithm]],
965        // passing in chunk, and interpreting the result as a completion record.
966        let result = strategy_size.Call__(cx, chunk, ExceptionHandling::Rethrow);
967
968        match result {
969            // Let chunkSize be result.[[Value]].
970            Ok(size) => size,
971            Err(error) => {
972                // If result is an abrupt completion,
973
974                // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]]).
975                // Create a rooted value for the error.
976                rooted!(&in(cx) let mut rooted_error = UndefinedValue());
977                error.to_jsval(
978                    cx.into(),
979                    global,
980                    rooted_error.handle_mut(),
981                    CanGc::from_cx(cx),
982                );
983                self.error_if_needed(cx, rooted_error.handle(), global);
984
985                // Return 1.
986                1.0
987            },
988        }
989    }
990
991    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-write>
992    pub(crate) fn write(
993        &self,
994        cx: &mut JSContext,
995        global: &GlobalScope,
996        chunk: SafeHandleValue,
997        chunk_size: f64,
998    ) {
999        // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
1000        let enqueue_result = self
1001            .queue
1002            .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
1003                value: Heap::boxed(chunk.get()),
1004                size: chunk_size,
1005            }));
1006
1007        // If enqueueResult is an abrupt completion,
1008        if let Err(error) = enqueue_result {
1009            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]).
1010            // Create a rooted value for the error.
1011            rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1012            error.to_jsval(
1013                cx.into(),
1014                global,
1015                rooted_error.handle_mut(),
1016                CanGc::from_cx(cx),
1017            );
1018            self.error_if_needed(cx, rooted_error.handle(), global);
1019
1020            // Return.
1021            return;
1022        }
1023
1024        // Let stream be controller.[[stream]].
1025        let Some(stream) = self.stream.get() else {
1026            unreachable!("Controller should have a stream");
1027        };
1028
1029        // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[state]] is "writable",
1030        if !stream.close_queued_or_in_flight() && stream.is_writable() {
1031            // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
1032            let backpressure = self.get_backpressure();
1033
1034            // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
1035            stream.update_backpressure(backpressure, global, CanGc::from_cx(cx));
1036        }
1037
1038        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
1039        self.advance_queue_if_needed(cx, global);
1040    }
1041
1042    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error-if-needed>
1043    pub(crate) fn error_if_needed(
1044        &self,
1045        cx: &mut JSContext,
1046        error: SafeHandleValue,
1047        global: &GlobalScope,
1048    ) {
1049        // Let stream be controller.[[stream]].
1050        let Some(stream) = self.stream.get() else {
1051            unreachable!("Controller should have a stream");
1052        };
1053
1054        // If stream.[[state]] is "writable",
1055        if stream.is_writable() {
1056            // Perform ! WritableStreamDefaultControllerError(controller, e).
1057            self.error(cx, &stream, error, global);
1058        }
1059    }
1060
1061    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error>
1062    fn error(
1063        &self,
1064        cx: &mut JSContext,
1065        stream: &WritableStream,
1066        e: SafeHandleValue,
1067        global: &GlobalScope,
1068    ) {
1069        // Let stream be controller.[[stream]].
1070        // Done above with the argument.
1071
1072        // Assert: stream.[[state]] is "writable".
1073        assert!(stream.is_writable());
1074
1075        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
1076        self.clear_algorithms();
1077
1078        // Perform ! WritableStreamStartErroring(stream, error).
1079        stream.start_erroring(cx, global, e);
1080    }
1081}
1082
1083impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1084    for WritableStreamDefaultController
1085{
1086    /// <https://streams.spec.whatwg.org/#ws-default-controller-error>
1087    fn Error(&self, cx: &mut CurrentRealm, e: SafeHandleValue) {
1088        // Let state be this.[[stream]].[[state]].
1089        let Some(stream) = self.stream.get() else {
1090            unreachable!("Controller should have a stream");
1091        };
1092
1093        // If state is not "writable", return.
1094        if !stream.is_writable() {
1095            return;
1096        }
1097
1098        let global = GlobalScope::from_current_realm(cx);
1099
1100        // Perform ! WritableStreamDefaultControllerError(this, e).
1101        self.error(cx, &stream, e, &global);
1102    }
1103
1104    /// <https://streams.spec.whatwg.org/#ws-default-controller-signal>
1105    fn Signal(&self) -> DomRoot<AbortSignal> {
1106        // Return this.[[abortController]]’s signal.
1107        self.abort_controller.signal()
1108    }
1109}