script/dom/stream/
writablestreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, UndefinedValue};
12use js::realm::CurrentRealm;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
14
15use crate::dom::bindings::callback::ExceptionHandling;
16use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
17use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
18    UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
19    UnderlyingSinkWriteCallback,
20};
21use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
24use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
25use crate::dom::globalscope::GlobalScope;
26use crate::dom::messageport::MessagePort;
27use crate::dom::promise::Promise;
28use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
29use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
30use crate::dom::stream::writablestream::WritableStream;
31use crate::dom::types::{AbortController, AbortSignal, TransformStream};
32use crate::realms::{InRealm, enter_auto_realm};
33use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
34
35impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
36
37/// The fulfillment handler for
38/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
39#[derive(Clone, JSTraceable, MallocSizeOf)]
40#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
41struct CloseAlgorithmFulfillmentHandler {
42    stream: Dom<WritableStream>,
43}
44
45impl Callback for CloseAlgorithmFulfillmentHandler {
46    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
47        let can_gc = CanGc::from_cx(cx);
48        let stream = self.stream.as_rooted();
49
50        // Perform ! WritableStreamFinishInFlightClose(stream).
51        stream.finish_in_flight_close(cx.into(), can_gc);
52    }
53}
54
55impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
56
57/// The rejection handler for
58/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
59#[derive(Clone, JSTraceable, MallocSizeOf)]
60#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
61struct CloseAlgorithmRejectionHandler {
62    stream: Dom<WritableStream>,
63}
64
65impl Callback for CloseAlgorithmRejectionHandler {
66    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
67        let stream = self.stream.as_rooted();
68
69        let global = GlobalScope::from_current_realm(cx);
70
71        // Perform ! WritableStreamFinishInFlightCloseWithError(stream, reason).
72        stream.finish_in_flight_close_with_error(cx, &global, v);
73    }
74}
75
76impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
77
78/// The fulfillment handler for
79/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
80#[derive(Clone, JSTraceable, MallocSizeOf)]
81#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
82struct StartAlgorithmFulfillmentHandler {
83    controller: Dom<WritableStreamDefaultController>,
84}
85
86impl Callback for StartAlgorithmFulfillmentHandler {
87    /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
88    /// Upon fulfillment of startPromise,
89    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
90        let controller = self.controller.as_rooted();
91        let stream = controller
92            .stream
93            .get()
94            .expect("Controller should have a stream.");
95
96        // Assert: stream.[[state]] is "writable" or "erroring".
97        assert!(stream.is_erroring() || stream.is_writable());
98
99        // Set controller.[[started]] to true.
100        controller.started.set(true);
101
102        let global = GlobalScope::from_current_realm(cx);
103
104        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
105        controller.advance_queue_if_needed(cx, &global)
106    }
107}
108
109impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
110
111/// The rejection handler for
112/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
113#[derive(Clone, JSTraceable, MallocSizeOf)]
114#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
115struct StartAlgorithmRejectionHandler {
116    controller: Dom<WritableStreamDefaultController>,
117}
118
119impl Callback for StartAlgorithmRejectionHandler {
120    /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
121    /// Upon rejection of startPromise with reason r,
122    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
123        let controller = self.controller.as_rooted();
124        let stream = controller
125            .stream
126            .get()
127            .expect("Controller should have a stream.");
128
129        // Assert: stream.[[state]] is "writable" or "erroring".
130        assert!(stream.is_erroring() || stream.is_writable());
131
132        // Set controller.[[started]] to true.
133        controller.started.set(true);
134
135        let global = GlobalScope::from_current_realm(cx);
136
137        // Perform ! WritableStreamDealWithRejection(stream, r).
138        stream.deal_with_rejection(cx, &global, v);
139    }
140}
141
142impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
143
144/// Reacting to backpressurePromise as part of the `writeAlgorithm` of
145/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
146#[derive(JSTraceable, MallocSizeOf)]
147#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
148struct TransferBackPressurePromiseReaction {
149    /// The result of reacting to backpressurePromise.
150    #[conditional_malloc_size_of]
151    result_promise: Rc<Promise>,
152
153    /// The backpressurePromise.
154    #[ignore_malloc_size_of = "nested Rc"]
155    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
156
157    /// The chunk received by the `writeAlgorithm`.
158    #[ignore_malloc_size_of = "mozjs"]
159    chunk: Box<Heap<JSVal>>,
160
161    /// The port used in the algorithm.
162    port: Dom<MessagePort>,
163}
164
165impl Callback for TransferBackPressurePromiseReaction {
166    /// Reacting to backpressurePromise with the following fulfillment steps:
167    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
168        let can_gc = CanGc::from_cx(cx);
169        let global = self.result_promise.global();
170        // Set backpressurePromise to a new promise.
171        let promise = Promise::new2(cx, &global);
172        *self.backpressure_promise.borrow_mut() = Some(promise);
173
174        // Let result be PackAndPostMessageHandlingError(port, "chunk", chunk).
175        rooted!(&in(cx) let mut chunk = UndefinedValue());
176        chunk.set(self.chunk.get());
177        let result =
178            self.port
179                .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
180
181        // If result is an abrupt completion,
182        if let Err(error) = result {
183            // Disentangle port.
184            global.disentangle_port(cx, &self.port);
185
186            // Return a promise rejected with result.[[Value]].
187            self.result_promise.reject_error(error, can_gc);
188        } else {
189            // Otherwise, return a promise resolved with undefined.
190            self.result_promise.resolve_native(&(), can_gc);
191        }
192    }
193}
194
195impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
196
197/// The fulfillment handler for
198/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
199#[derive(Clone, JSTraceable, MallocSizeOf)]
200#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
201struct WriteAlgorithmFulfillmentHandler {
202    controller: Dom<WritableStreamDefaultController>,
203}
204
205impl Callback for WriteAlgorithmFulfillmentHandler {
206    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
207        let can_gc = CanGc::from_cx(cx);
208        let controller = self.controller.as_rooted();
209        let stream = controller
210            .stream
211            .get()
212            .expect("Controller should have a stream.");
213
214        // Perform ! WritableStreamFinishInFlightWrite(stream).
215        stream.finish_in_flight_write(can_gc);
216
217        // Let state be stream.[[state]].
218        // Assert: state is "writable" or "erroring".
219        assert!(stream.is_erroring() || stream.is_writable());
220
221        // Perform ! DequeueValue(controller).
222        rooted!(&in(cx) let mut rval = UndefinedValue());
223        controller
224            .queue
225            .dequeue_value(cx.into(), Some(rval.handle_mut()), can_gc);
226
227        let global = GlobalScope::from_current_realm(cx);
228
229        // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and state is "writable",
230        if !stream.close_queued_or_in_flight() && stream.is_writable() {
231            // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
232            let backpressure = controller.get_backpressure();
233
234            // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
235            stream.update_backpressure(backpressure, &global, can_gc);
236        }
237
238        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
239        controller.advance_queue_if_needed(cx, &global)
240    }
241}
242
243impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
244
245/// The rejection handler for
246/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
247#[derive(Clone, JSTraceable, MallocSizeOf)]
248#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
249struct WriteAlgorithmRejectionHandler {
250    controller: Dom<WritableStreamDefaultController>,
251}
252
253impl Callback for WriteAlgorithmRejectionHandler {
254    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
255        let controller = self.controller.as_rooted();
256        let stream = controller
257            .stream
258            .get()
259            .expect("Controller should have a stream.");
260
261        // If stream.[[state]] is "writable",
262        if stream.is_writable() {
263            // perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
264            controller.clear_algorithms();
265        }
266
267        let global = GlobalScope::from_current_realm(cx);
268
269        // Perform ! WritableStreamFinishInFlightWriteWithError(stream, reason).
270        stream.finish_in_flight_write_with_error(cx, &global, v);
271    }
272}
273
274/// The type of sink algorithms we are using.
275#[derive(JSTraceable, PartialEq)]
276#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
277pub enum UnderlyingSinkType {
278    /// Algorithms are provided by Js callbacks.
279    Js {
280        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
281        abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
282
283        start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,
284
285        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
286        close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,
287
288        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
289        write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
290    },
291    /// Algorithms supporting streams transfer are implemented in Rust.
292    /// The promise and port used in those algorithms are stored here.
293    Transfer {
294        backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
295        port: Dom<MessagePort>,
296    },
297    /// Algorithms supporting transform streams are implemented in Rust.
298    Transform(Dom<TransformStream>, Rc<Promise>),
299}
300
301impl UnderlyingSinkType {
302    pub(crate) fn new_js(
303        abort: Option<Rc<UnderlyingSinkAbortCallback>>,
304        start: Option<Rc<UnderlyingSinkStartCallback>>,
305        close: Option<Rc<UnderlyingSinkCloseCallback>>,
306        write: Option<Rc<UnderlyingSinkWriteCallback>>,
307    ) -> Self {
308        UnderlyingSinkType::Js {
309            abort: RefCell::new(abort),
310            start: RefCell::new(start),
311            close: RefCell::new(close),
312            write: RefCell::new(write),
313        }
314    }
315}
316
317/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
318#[dom_struct]
319pub struct WritableStreamDefaultController {
320    reflector_: Reflector,
321
322    /// The type of underlying sink used. Besides the default JS one,
323    /// there will be others for stream transfer, and for transform stream.
324    #[ignore_malloc_size_of = "underlying_sink_type"]
325    underlying_sink_type: UnderlyingSinkType,
326
327    /// The JS object used as `this` when invoking sink algorithms.
328    #[ignore_malloc_size_of = "mozjs"]
329    underlying_sink_obj: Heap<*mut JSObject>,
330
331    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-queue>
332    queue: QueueWithSizes,
333
334    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-started>
335    started: Cell<bool>,
336
337    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategyhwm>
338    strategy_hwm: f64,
339
340    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategysizealgorithm>
341    #[ignore_malloc_size_of = "QueuingStrategySize"]
342    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
343
344    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-stream>
345    stream: MutNullableDom<WritableStream>,
346
347    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortcontroller>
348    abort_controller: Dom<AbortController>,
349}
350
351impl WritableStreamDefaultController {
352    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
353    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
354    fn new_inherited(
355        global: &GlobalScope,
356        underlying_sink_type: UnderlyingSinkType,
357        strategy_hwm: f64,
358        strategy_size: Rc<QueuingStrategySize>,
359        can_gc: CanGc,
360    ) -> WritableStreamDefaultController {
361        WritableStreamDefaultController {
362            reflector_: Reflector::new(),
363            underlying_sink_type,
364            queue: Default::default(),
365            stream: Default::default(),
366            underlying_sink_obj: Default::default(),
367            strategy_hwm,
368            strategy_size: RefCell::new(Some(strategy_size)),
369            started: Default::default(),
370            abort_controller: Dom::from_ref(&AbortController::new_with_proto(global, None, can_gc)),
371        }
372    }
373
374    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
375    pub(crate) fn new(
376        global: &GlobalScope,
377        underlying_sink_type: UnderlyingSinkType,
378        strategy_hwm: f64,
379        strategy_size: Rc<QueuingStrategySize>,
380        can_gc: CanGc,
381    ) -> DomRoot<WritableStreamDefaultController> {
382        reflect_dom_object(
383            Box::new(WritableStreamDefaultController::new_inherited(
384                global,
385                underlying_sink_type,
386                strategy_hwm,
387                strategy_size,
388                can_gc,
389            )),
390            global,
391            can_gc,
392        )
393    }
394
395    pub(crate) fn started(&self) -> bool {
396        self.started.get()
397    }
398
399    /// Setting the JS object after the heap has settled down.
400    pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
401        self.underlying_sink_obj.set(*this_object);
402    }
403
404    /// "Signal abort" call from <https://streams.spec.whatwg.org/#writable-stream-abort>
405    pub(crate) fn signal_abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
406        self.abort_controller.signal_abort(cx, reason);
407    }
408
409    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-clear-algorithms>
410    fn clear_algorithms(&self) {
411        match &self.underlying_sink_type {
412            UnderlyingSinkType::Js {
413                abort,
414                start: _,
415                close,
416                write,
417            } => {
418                // Set controller.[[writeAlgorithm]] to undefined.
419                write.borrow_mut().take();
420
421                // Set controller.[[closeAlgorithm]] to undefined.
422                close.borrow_mut().take();
423
424                // Set controller.[[abortAlgorithm]] to undefined.
425                abort.borrow_mut().take();
426            },
427            UnderlyingSinkType::Transfer {
428                backpressure_promise,
429                ..
430            } => {
431                backpressure_promise.borrow_mut().take();
432            },
433            UnderlyingSinkType::Transform(_, _) => {
434                return;
435            },
436        }
437
438        // Set controller.[[strategySizeAlgorithm]] to undefined.
439        self.strategy_size.borrow_mut().take();
440    }
441
442    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
443    pub(crate) fn setup(
444        &self,
445        cx: &mut js::context::JSContext,
446        global: &GlobalScope,
447        stream: &WritableStream,
448    ) -> Result<(), Error> {
449        // Assert: stream implements WritableStream.
450        // Implied by stream type.
451
452        // Assert: stream.[[controller]] is undefined.
453        stream.assert_no_controller();
454
455        // Set controller.[[stream]] to stream.
456        self.stream.set(Some(stream));
457
458        // Set stream.[[controller]] to controller.
459        stream.set_default_controller(self);
460
461        // Perform ! ResetQueue(controller).
462
463        // Set controller.[[abortController]] to a new AbortController.
464
465        // Set controller.[[started]] to false.
466
467        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm.
468
469        // Set controller.[[strategyHWM]] to highWaterMark.
470
471        // Set controller.[[writeAlgorithm]] to writeAlgorithm.
472
473        // Set controller.[[closeAlgorithm]] to closeAlgorithm.
474
475        // Set controller.[[abortAlgorithm]] to abortAlgorithm.
476
477        // Note: above steps are done in `new_inherited`.
478
479        // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
480        let backpressure = self.get_backpressure();
481
482        // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
483        stream.update_backpressure(backpressure, global, CanGc::from_cx(cx));
484
485        // Let startResult be the result of performing startAlgorithm. (This may throw an exception.)
486        // Let startPromise be a promise resolved with startResult.
487        let start_promise = self.start_algorithm(cx.into(), global, CanGc::from_cx(cx))?;
488
489        let rooted_default_controller = DomRoot::from_ref(self);
490
491        // Upon fulfillment of startPromise,
492        rooted!(&in(cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
493            controller: Dom::from_ref(&rooted_default_controller),
494        }));
495
496        // Upon rejection of startPromise with reason r,
497        rooted!(&in(cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
498            controller: Dom::from_ref(&rooted_default_controller),
499        }));
500
501        let handler = PromiseNativeHandler::new(
502            global,
503            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
504            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
505            CanGc::from_cx(cx),
506        );
507        let mut realm = enter_auto_realm(cx, global);
508        let cx = &mut realm.current_realm();
509
510        let in_realm_proof = cx.into();
511        let comp = InRealm::Already(&in_realm_proof);
512        start_promise.append_native_handler(&handler, comp, CanGc::from_cx(cx));
513
514        Ok(())
515    }
516
517    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-close>
518    pub(crate) fn close(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
519        // Perform ! EnqueueValueWithSize(controller, close sentinel, 0).
520        self.queue
521            .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
522            .expect("Enqueuing the close sentinel should not fail.");
523        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
524        self.advance_queue_if_needed(cx, global);
525    }
526
527    #[expect(unsafe_code)]
528    fn start_algorithm(
529        &self,
530        cx: SafeJSContext,
531        global: &GlobalScope,
532        can_gc: CanGc,
533    ) -> Fallible<Rc<Promise>> {
534        match &self.underlying_sink_type {
535            UnderlyingSinkType::Js {
536                start,
537                abort: _,
538                close: _,
539                write: _,
540            } => {
541                let algo = start.borrow().clone();
542                let start_promise = if let Some(start) = algo {
543                    rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
544                    rooted!(in(*cx) let mut result: JSVal);
545                    rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
546                    start.Call_(
547                        &this_object.handle(),
548                        self,
549                        result.handle_mut(),
550                        ExceptionHandling::Rethrow,
551                        can_gc,
552                    )?;
553                    let is_promise = unsafe {
554                        if result.is_object() {
555                            result_object.set(result.to_object());
556                            IsPromiseObject(result_object.handle().into_handle())
557                        } else {
558                            false
559                        }
560                    };
561                    if is_promise {
562                        Promise::new_with_js_promise(result_object.handle(), cx)
563                    } else {
564                        Promise::new_resolved(global, cx, result.get(), can_gc)
565                    }
566                } else {
567                    // Let startAlgorithm be an algorithm that returns undefined.
568                    Promise::new_resolved(global, cx, (), can_gc)
569                };
570
571                Ok(start_promise)
572            },
573            UnderlyingSinkType::Transfer { .. } => {
574                // Let startAlgorithm be an algorithm that returns undefined.
575                Ok(Promise::new_resolved(global, cx, (), can_gc))
576            },
577            UnderlyingSinkType::Transform(_, start_promise) => {
578                // Let startAlgorithm be an algorithm that returns startPromise.
579                Ok(start_promise.clone())
580            },
581        }
582    }
583
584    /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-writablestreamcontroller-abortsteps>
585    pub(crate) fn abort_steps(
586        &self,
587        cx: &mut js::context::JSContext,
588        global: &GlobalScope,
589        reason: SafeHandleValue,
590    ) -> Rc<Promise> {
591        let result = match &self.underlying_sink_type {
592            UnderlyingSinkType::Js {
593                abort,
594                start: _,
595                close: _,
596                write: _,
597            } => {
598                rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
599                let algo = abort.borrow().clone();
600                // Let result be the result of performing this.[[abortAlgorithm]], passing reason.
601                let result = if let Some(algo) = algo {
602                    algo.Call_(
603                        &this_object.handle(),
604                        Some(reason),
605                        ExceptionHandling::Rethrow,
606                        CanGc::from_cx(cx),
607                    )
608                } else {
609                    Ok(Promise::new_resolved(
610                        global,
611                        cx.into(),
612                        (),
613                        CanGc::from_cx(cx),
614                    ))
615                };
616                result.unwrap_or_else(|e| {
617                    let promise = Promise::new(global, CanGc::from_cx(cx));
618                    promise.reject_error(e, CanGc::from_cx(cx));
619                    promise
620                })
621            },
622            UnderlyingSinkType::Transfer { port, .. } => {
623                // The steps from the `abortAlgorithm` at
624                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
625
626                // Let result be PackAndPostMessageHandlingError(port, "error", reason).
627                let result =
628                    port.pack_and_post_message_handling_error("error", reason, CanGc::from_cx(cx));
629
630                // Disentangle port.
631                global.disentangle_port(cx, port);
632
633                let promise = Promise::new(global, CanGc::from_cx(cx));
634
635                // If result is an abrupt completion, return a promise rejected with result.[[Value]]
636                if let Err(error) = result {
637                    promise.reject_error(error, CanGc::from_cx(cx));
638                } else {
639                    // Otherwise, return a promise resolved with undefined.
640                    promise.resolve_native(&(), CanGc::from_cx(cx));
641                }
642                promise
643            },
644            UnderlyingSinkType::Transform(stream, _) => {
645                // Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
646                stream
647                    .transform_stream_default_sink_abort_algorithm(
648                        cx.into(),
649                        global,
650                        reason,
651                        CanGc::from_cx(cx),
652                    )
653                    .expect("Transform stream default sink abort algorithm should not fail.")
654            },
655        };
656
657        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
658        self.clear_algorithms();
659
660        result
661    }
662
663    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
664    fn call_write_algorithm(
665        &self,
666        cx: &mut js::context::JSContext,
667        chunk: SafeHandleValue,
668        global: &GlobalScope,
669    ) -> Rc<Promise> {
670        match &self.underlying_sink_type {
671            UnderlyingSinkType::Js {
672                abort: _,
673                start: _,
674                close: _,
675                write,
676            } => {
677                rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
678                let algo = write.borrow().clone();
679                let result = if let Some(algo) = algo {
680                    algo.Call_(
681                        &this_object.handle(),
682                        chunk,
683                        self,
684                        ExceptionHandling::Rethrow,
685                        CanGc::from_cx(cx),
686                    )
687                } else {
688                    Ok(Promise::new_resolved(
689                        global,
690                        cx.into(),
691                        (),
692                        CanGc::from_cx(cx),
693                    ))
694                };
695                result.unwrap_or_else(|e| {
696                    let promise = Promise::new2(cx, global);
697                    promise.reject_error(e, CanGc::from_cx(cx));
698                    promise
699                })
700            },
701            UnderlyingSinkType::Transfer {
702                backpressure_promise,
703                port,
704            } => {
705                // The steps from the `writeAlgorithm` at
706                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
707
708                // If backpressurePromise is undefined,
709                // set backpressurePromise to a promise resolved with undefined.
710                if backpressure_promise.borrow().is_none() {
711                    let promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
712                    *backpressure_promise.borrow_mut() = Some(promise);
713                }
714
715                // Return the result of reacting to backpressurePromise with the following fulfillment steps:
716                let result_promise = Promise::new2(cx, global);
717                rooted!(&in(cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
718                    port: port.clone(),
719                    backpressure_promise: backpressure_promise.clone(),
720                    chunk: Heap::boxed(chunk.get()),
721                    result_promise: result_promise.clone(),
722                }));
723                let handler = PromiseNativeHandler::new(
724                    global,
725                    fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
726                    None,
727                    CanGc::from_cx(cx),
728                );
729                let mut realm = enter_auto_realm(cx, global);
730                let realm = &mut realm.current_realm();
731                let in_realm_proof = realm.into();
732                let comp = InRealm::Already(&in_realm_proof);
733                backpressure_promise
734                    .borrow()
735                    .as_ref()
736                    .expect("Promise must be some by now.")
737                    .append_native_handler(&handler, comp, CanGc::from_cx(realm));
738                result_promise
739            },
740            UnderlyingSinkType::Transform(stream, _) => {
741                // Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
742                stream
743                    .transform_stream_default_sink_write_algorithm(cx, global, chunk)
744                    .expect("Transform stream default sink write algorithm should not fail.")
745            },
746        }
747    }
748
749    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
750    fn call_close_algorithm(
751        &self,
752        cx: &mut js::context::JSContext,
753        global: &GlobalScope,
754    ) -> Rc<Promise> {
755        match &self.underlying_sink_type {
756            UnderlyingSinkType::Js {
757                abort: _,
758                start: _,
759                close,
760                write: _,
761            } => {
762                rooted!(&in(cx) let mut this_object = ptr::null_mut::<JSObject>());
763                this_object.set(self.underlying_sink_obj.get());
764                let algo = close.borrow().clone();
765                let result = if let Some(algo) = algo {
766                    algo.Call_(
767                        &this_object.handle(),
768                        ExceptionHandling::Rethrow,
769                        CanGc::from_cx(cx),
770                    )
771                } else {
772                    Ok(Promise::new_resolved(
773                        global,
774                        cx.into(),
775                        (),
776                        CanGc::from_cx(cx),
777                    ))
778                };
779                result.unwrap_or_else(|e| {
780                    let promise = Promise::new2(cx, global);
781                    promise.reject_error(e, CanGc::from_cx(cx));
782                    promise
783                })
784            },
785            UnderlyingSinkType::Transfer { port, .. } => {
786                // The steps from the `closeAlgorithm` at
787                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
788
789                // Perform ! PackAndPostMessage(port, "close", undefined).
790                rooted!(&in(cx) let mut value = UndefinedValue());
791                port.pack_and_post_message("close", value.handle(), CanGc::from_cx(cx))
792                    .expect("Sending close should not fail.");
793
794                // Disentangle port.
795                global.disentangle_port(cx, port);
796
797                // Return a promise resolved with undefined.
798                Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
799            },
800            UnderlyingSinkType::Transform(stream, _) => {
801                // Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
802                stream
803                    .transform_stream_default_sink_close_algorithm(cx, global)
804                    .expect("Transform stream default sink close algorithm should not fail.")
805            },
806        }
807    }
808
809    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
810    pub(crate) fn process_close(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
811        // Let stream be controller.[[stream]].
812        let Some(stream) = self.stream.get() else {
813            unreachable!("Controller should have a stream");
814        };
815
816        // Perform ! WritableStreamMarkCloseRequestInFlight(stream).
817        stream.mark_close_request_in_flight();
818
819        // Perform ! DequeueValue(controller).
820        self.queue
821            .dequeue_value(cx.into(), None, CanGc::from_cx(cx));
822
823        // Assert: controller.[[queue]] is empty.
824        assert!(self.queue.is_empty());
825
826        // Let sinkClosePromise be the result of performing controller.[[closeAlgorithm]].
827        let sink_close_promise = self.call_close_algorithm(cx, global);
828
829        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
830        self.clear_algorithms();
831
832        // Upon fulfillment of sinkClosePromise,
833        rooted!(&in(cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
834            stream: Dom::from_ref(&stream),
835        }));
836
837        // Upon rejection of sinkClosePromise with reason reason,
838        rooted!(&in(cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
839            stream: Dom::from_ref(&stream),
840        }));
841
842        // Attach handlers to the promise.
843        let handler = PromiseNativeHandler::new(
844            global,
845            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
846            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
847            CanGc::from_cx(cx),
848        );
849        let mut realm = enter_auto_realm(cx, global);
850        let realm = &mut realm.current_realm();
851        let in_realm_proof = realm.into();
852        let comp = InRealm::Already(&in_realm_proof);
853        sink_close_promise.append_native_handler(&handler, comp, CanGc::from_cx(realm));
854    }
855
856    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-advance-queue-if-needed>
857    fn advance_queue_if_needed(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
858        // Let stream be controller.[[stream]].
859        let Some(stream) = self.stream.get() else {
860            unreachable!("Controller should have a stream");
861        };
862
863        // If controller.[[started]] is false, return.
864        if !self.started.get() {
865            return;
866        }
867
868        // If stream.[[inFlightWriteRequest]] is not undefined, return.
869        if stream.has_in_flight_write_request() {
870            return;
871        }
872
873        // Let state be stream.[[state]].
874
875        // Assert: state is not "closed" or "errored".
876        assert!(!(stream.is_errored() || stream.is_closed()));
877
878        // If state is "erroring",
879        if stream.is_erroring() {
880            // Perform ! WritableStreamFinishErroring(stream).
881            stream.finish_erroring(cx, global);
882
883            // Return.
884            return;
885        }
886
887        // Let value be ! PeekQueueValue(controller).
888        rooted!(&in(cx) let mut value = UndefinedValue());
889        let is_closed = {
890            // If controller.[[queue]] is empty, return.
891            if self.queue.is_empty() {
892                return;
893            }
894            self.queue
895                .peek_queue_value(cx.into(), value.handle_mut(), CanGc::from_cx(cx))
896        };
897
898        if is_closed {
899            // If value is the close sentinel, perform ! WritableStreamDefaultControllerProcessClose(controller).
900            self.process_close(cx, global);
901        } else {
902            // Otherwise, perform ! WritableStreamDefaultControllerProcessWrite(controller, value).
903            self.process_write(cx, value.handle(), global);
904        };
905    }
906
907    /// <https://streams.spec.whatwg.org/#ws-default-controller-private-error>
908    pub(crate) fn perform_error_steps(&self) {
909        // Perform ! ResetQueue(this).
910        self.queue.reset();
911    }
912
913    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
914    fn process_write(
915        &self,
916        cx: &mut js::context::JSContext,
917        chunk: SafeHandleValue,
918        global: &GlobalScope,
919    ) {
920        // Let stream be controller.[[stream]].
921        let Some(stream) = self.stream.get() else {
922            unreachable!("Controller should have a stream");
923        };
924
925        // Perform ! WritableStreamMarkFirstWriteRequestInFlight(stream).
926        stream.mark_first_write_request_in_flight();
927
928        // Let sinkWritePromise be the result of performing controller.[[writeAlgorithm]], passing in chunk.
929        let sink_write_promise = self.call_write_algorithm(cx, chunk, global);
930
931        // Upon fulfillment of sinkWritePromise,
932        rooted!(&in(cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
933            controller: Dom::from_ref(self),
934        }));
935
936        // Upon rejection of sinkWritePromise with reason,
937        rooted!(&in(cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
938            controller: Dom::from_ref(self),
939        }));
940
941        // Attach handlers to the promise.
942        let handler = PromiseNativeHandler::new(
943            global,
944            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
945            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
946            CanGc::from_cx(cx),
947        );
948        let mut realm = enter_auto_realm(cx, global);
949        let realm = &mut realm.current_realm();
950        let in_realm_proof = realm.into();
951        let comp = InRealm::Already(&in_realm_proof);
952        sink_write_promise.append_native_handler(&handler, comp, CanGc::from_cx(realm));
953    }
954
955    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-desired-size>
956    pub(crate) fn get_desired_size(&self) -> f64 {
957        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
958        let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
959        desired_size.clamp(desired_size, self.strategy_hwm)
960    }
961
962    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-backpressure>
963    fn get_backpressure(&self) -> bool {
964        // Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
965        let desired_size = self.get_desired_size();
966
967        // Return true if desiredSize ≤ 0, or false otherwise.
968        desired_size == 0.0 || desired_size.is_sign_negative()
969    }
970
971    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-chunk-size>
972    pub(crate) fn get_chunk_size(
973        &self,
974        cx: &mut js::context::JSContext,
975        global: &GlobalScope,
976        chunk: SafeHandleValue,
977    ) -> f64 {
978        // If controller.[[strategySizeAlgorithm]] is undefined, then:
979        let Some(strategy_size) = self.strategy_size.borrow().clone() else {
980            // Assert: controller.[[stream]].[[state]] is not "writable".
981            let Some(stream) = self.stream.get() else {
982                unreachable!("Controller should have a stream");
983            };
984            assert!(!stream.is_writable());
985
986            // Return 1.
987            return 1.0;
988        };
989
990        // Let returnValue be the result of performing controller.[[strategySizeAlgorithm]],
991        // passing in chunk, and interpreting the result as a completion record.
992        let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, CanGc::from_cx(cx));
993
994        match result {
995            // Let chunkSize be result.[[Value]].
996            Ok(size) => size,
997            Err(error) => {
998                // If result is an abrupt completion,
999
1000                // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]]).
1001                // Create a rooted value for the error.
1002                rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1003                error.to_jsval(
1004                    cx.into(),
1005                    global,
1006                    rooted_error.handle_mut(),
1007                    CanGc::from_cx(cx),
1008                );
1009                self.error_if_needed(cx, rooted_error.handle(), global);
1010
1011                // Return 1.
1012                1.0
1013            },
1014        }
1015    }
1016
1017    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-write>
1018    pub(crate) fn write(
1019        &self,
1020        cx: &mut js::context::JSContext,
1021        global: &GlobalScope,
1022        chunk: SafeHandleValue,
1023        chunk_size: f64,
1024    ) {
1025        // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
1026        let enqueue_result = self
1027            .queue
1028            .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
1029                value: Heap::boxed(chunk.get()),
1030                size: chunk_size,
1031            }));
1032
1033        // If enqueueResult is an abrupt completion,
1034        if let Err(error) = enqueue_result {
1035            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]).
1036            // Create a rooted value for the error.
1037            rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1038            error.to_jsval(
1039                cx.into(),
1040                global,
1041                rooted_error.handle_mut(),
1042                CanGc::from_cx(cx),
1043            );
1044            self.error_if_needed(cx, rooted_error.handle(), global);
1045
1046            // Return.
1047            return;
1048        }
1049
1050        // Let stream be controller.[[stream]].
1051        let Some(stream) = self.stream.get() else {
1052            unreachable!("Controller should have a stream");
1053        };
1054
1055        // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[state]] is "writable",
1056        if !stream.close_queued_or_in_flight() && stream.is_writable() {
1057            // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
1058            let backpressure = self.get_backpressure();
1059
1060            // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
1061            stream.update_backpressure(backpressure, global, CanGc::from_cx(cx));
1062        }
1063
1064        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
1065        self.advance_queue_if_needed(cx, global);
1066    }
1067
1068    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error-if-needed>
1069    pub(crate) fn error_if_needed(
1070        &self,
1071        cx: &mut js::context::JSContext,
1072        error: SafeHandleValue,
1073        global: &GlobalScope,
1074    ) {
1075        // Let stream be controller.[[stream]].
1076        let Some(stream) = self.stream.get() else {
1077            unreachable!("Controller should have a stream");
1078        };
1079
1080        // If stream.[[state]] is "writable",
1081        if stream.is_writable() {
1082            // Perform ! WritableStreamDefaultControllerError(controller, e).
1083            self.error(cx, &stream, error, global);
1084        }
1085    }
1086
1087    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error>
1088    fn error(
1089        &self,
1090        cx: &mut js::context::JSContext,
1091        stream: &WritableStream,
1092        e: SafeHandleValue,
1093        global: &GlobalScope,
1094    ) {
1095        // Let stream be controller.[[stream]].
1096        // Done above with the argument.
1097
1098        // Assert: stream.[[state]] is "writable".
1099        assert!(stream.is_writable());
1100
1101        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
1102        self.clear_algorithms();
1103
1104        // Perform ! WritableStreamStartErroring(stream, error).
1105        stream.start_erroring(cx, global, e);
1106    }
1107}
1108
1109impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1110    for WritableStreamDefaultController
1111{
1112    /// <https://streams.spec.whatwg.org/#ws-default-controller-error>
1113    fn Error(&self, cx: &mut CurrentRealm, e: SafeHandleValue) {
1114        // Let state be this.[[stream]].[[state]].
1115        let Some(stream) = self.stream.get() else {
1116            unreachable!("Controller should have a stream");
1117        };
1118
1119        // If state is not "writable", return.
1120        if !stream.is_writable() {
1121            return;
1122        }
1123
1124        let global = GlobalScope::from_current_realm(cx);
1125
1126        // Perform ! WritableStreamDefaultControllerError(this, e).
1127        self.error(cx, &stream, e, &global);
1128    }
1129
1130    /// <https://streams.spec.whatwg.org/#ws-default-controller-signal>
1131    fn Signal(&self) -> DomRoot<AbortSignal> {
1132        // Return this.[[abortController]]’s signal.
1133        self.abort_controller.signal()
1134    }
1135}