Skip to main content

script/dom/stream/
writablestreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::context::JSContext;
11use js::jsapi::{Heap, IsPromiseObject, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
15use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
16
17use crate::dom::bindings::callback::ExceptionHandling;
18use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
19use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
20    UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
21    UnderlyingSinkWriteCallback,
22};
23use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
25use crate::dom::bindings::reflector::DomGlobal;
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::globalscope::GlobalScope;
28use crate::dom::messageport::MessagePort;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
32use crate::dom::stream::writablestream::WritableStream;
33use crate::dom::types::{AbortController, AbortSignal, TransformStream};
34use crate::realms::enter_auto_realm;
35
36impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
37
38/// The fulfillment handler for
39/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
40#[derive(Clone, JSTraceable, MallocSizeOf)]
41#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
42struct CloseAlgorithmFulfillmentHandler {
43    stream: Dom<WritableStream>,
44}
45
46impl Callback for CloseAlgorithmFulfillmentHandler {
47    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
48        let stream = self.stream.as_rooted();
49
50        // Perform ! WritableStreamFinishInFlightClose(stream).
51        stream.finish_in_flight_close(cx);
52    }
53}
54
55impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
56
57/// The rejection handler for
58/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
59#[derive(Clone, JSTraceable, MallocSizeOf)]
60#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
61struct CloseAlgorithmRejectionHandler {
62    stream: Dom<WritableStream>,
63}
64
65impl Callback for CloseAlgorithmRejectionHandler {
66    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
67        let stream = self.stream.as_rooted();
68
69        let global = GlobalScope::from_current_realm(cx);
70
71        // Perform ! WritableStreamFinishInFlightCloseWithError(stream, reason).
72        stream.finish_in_flight_close_with_error(cx, &global, v);
73    }
74}
75
76impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
77
78/// The fulfillment handler for
79/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
80#[derive(Clone, JSTraceable, MallocSizeOf)]
81#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
82struct StartAlgorithmFulfillmentHandler {
83    controller: Dom<WritableStreamDefaultController>,
84}
85
86impl Callback for StartAlgorithmFulfillmentHandler {
87    /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
88    /// Upon fulfillment of startPromise,
89    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
90        let controller = self.controller.as_rooted();
91        let stream = controller
92            .stream
93            .get()
94            .expect("Controller should have a stream.");
95
96        // Assert: stream.[[state]] is "writable" or "erroring".
97        assert!(stream.is_erroring() || stream.is_writable());
98
99        // Set controller.[[started]] to true.
100        controller.started.set(true);
101
102        let global = GlobalScope::from_current_realm(cx);
103
104        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
105        controller.advance_queue_if_needed(cx, &global)
106    }
107}
108
109impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
110
111/// The rejection handler for
112/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
113#[derive(Clone, JSTraceable, MallocSizeOf)]
114#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
115struct StartAlgorithmRejectionHandler {
116    controller: Dom<WritableStreamDefaultController>,
117}
118
119impl Callback for StartAlgorithmRejectionHandler {
120    /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
121    /// Upon rejection of startPromise with reason r,
122    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
123        let controller = self.controller.as_rooted();
124        let stream = controller
125            .stream
126            .get()
127            .expect("Controller should have a stream.");
128
129        // Assert: stream.[[state]] is "writable" or "erroring".
130        assert!(stream.is_erroring() || stream.is_writable());
131
132        // Set controller.[[started]] to true.
133        controller.started.set(true);
134
135        let global = GlobalScope::from_current_realm(cx);
136
137        // Perform ! WritableStreamDealWithRejection(stream, r).
138        stream.deal_with_rejection(cx, &global, v);
139    }
140}
141
142impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
143
144/// Reacting to backpressurePromise as part of the `writeAlgorithm` of
145/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
146#[derive(JSTraceable, MallocSizeOf)]
147#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
148struct TransferBackPressurePromiseReaction {
149    /// The result of reacting to backpressurePromise.
150    #[conditional_malloc_size_of]
151    result_promise: Rc<Promise>,
152
153    /// The backpressurePromise.
154    #[ignore_malloc_size_of = "nested Rc"]
155    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
156
157    /// The chunk received by the `writeAlgorithm`.
158    #[ignore_malloc_size_of = "mozjs"]
159    chunk: Box<Heap<JSVal>>,
160
161    /// The port used in the algorithm.
162    port: Dom<MessagePort>,
163}
164
165impl Callback for TransferBackPressurePromiseReaction {
166    /// Reacting to backpressurePromise with the following fulfillment steps:
167    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
168        let global = self.result_promise.global();
169        // Set backpressurePromise to a new promise.
170        let promise = Promise::new(cx, &global);
171        *self.backpressure_promise.borrow_mut() = Some(promise);
172
173        // Let result be PackAndPostMessageHandlingError(port, "chunk", chunk).
174        rooted!(&in(cx) let mut chunk = UndefinedValue());
175        chunk.set(self.chunk.get());
176        let result = self
177            .port
178            .pack_and_post_message_handling_error(cx, "chunk", chunk.handle());
179
180        // If result is an abrupt completion,
181        if let Err(error) = result {
182            // Disentangle port.
183            global.disentangle_port(cx, &self.port);
184
185            // Return a promise rejected with result.[[Value]].
186            self.result_promise.reject_error(cx, error);
187        } else {
188            // Otherwise, return a promise resolved with undefined.
189            self.result_promise.resolve_native(cx, &());
190        }
191    }
192}
193
194impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
195
196/// The fulfillment handler for
197/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
198#[derive(Clone, JSTraceable, MallocSizeOf)]
199#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
200struct WriteAlgorithmFulfillmentHandler {
201    controller: Dom<WritableStreamDefaultController>,
202}
203
204impl Callback for WriteAlgorithmFulfillmentHandler {
205    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
206        let controller = self.controller.as_rooted();
207        let stream = controller
208            .stream
209            .get()
210            .expect("Controller should have a stream.");
211
212        // Perform ! WritableStreamFinishInFlightWrite(stream).
213        stream.finish_in_flight_write(cx);
214
215        // Let state be stream.[[state]].
216        // Assert: state is "writable" or "erroring".
217        assert!(stream.is_erroring() || stream.is_writable());
218
219        // Perform ! DequeueValue(controller).
220        rooted!(&in(cx) let mut rval = UndefinedValue());
221        controller.queue.dequeue_value(cx, Some(rval.handle_mut()));
222
223        let global = GlobalScope::from_current_realm(cx);
224
225        // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and state is "writable",
226        if !stream.close_queued_or_in_flight() && stream.is_writable() {
227            // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
228            let backpressure = controller.get_backpressure();
229
230            // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
231            stream.update_backpressure(cx, backpressure, &global);
232        }
233
234        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
235        controller.advance_queue_if_needed(cx, &global)
236    }
237}
238
239impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
240
241/// The rejection handler for
242/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
243#[derive(Clone, JSTraceable, MallocSizeOf)]
244#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
245struct WriteAlgorithmRejectionHandler {
246    controller: Dom<WritableStreamDefaultController>,
247}
248
249impl Callback for WriteAlgorithmRejectionHandler {
250    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
251        let controller = self.controller.as_rooted();
252        let stream = controller
253            .stream
254            .get()
255            .expect("Controller should have a stream.");
256
257        // If stream.[[state]] is "writable",
258        if stream.is_writable() {
259            // perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
260            controller.clear_algorithms();
261        }
262
263        let global = GlobalScope::from_current_realm(cx);
264
265        // Perform ! WritableStreamFinishInFlightWriteWithError(stream, reason).
266        stream.finish_in_flight_write_with_error(cx, &global, v);
267    }
268}
269
270/// The type of sink algorithms we are using.
271#[derive(JSTraceable, PartialEq)]
272#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
273pub enum UnderlyingSinkType {
274    /// Algorithms are provided by Js callbacks.
275    Js {
276        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
277        abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
278
279        start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,
280
281        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
282        close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,
283
284        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
285        write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
286    },
287    /// Algorithms supporting streams transfer are implemented in Rust.
288    /// The promise and port used in those algorithms are stored here.
289    Transfer {
290        backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
291        port: Dom<MessagePort>,
292    },
293    /// Algorithms supporting transform streams are implemented in Rust.
294    Transform(Dom<TransformStream>, Rc<Promise>),
295}
296
297impl UnderlyingSinkType {
298    pub(crate) fn new_js(
299        abort: Option<Rc<UnderlyingSinkAbortCallback>>,
300        start: Option<Rc<UnderlyingSinkStartCallback>>,
301        close: Option<Rc<UnderlyingSinkCloseCallback>>,
302        write: Option<Rc<UnderlyingSinkWriteCallback>>,
303    ) -> Self {
304        UnderlyingSinkType::Js {
305            abort: RefCell::new(abort),
306            start: RefCell::new(start),
307            close: RefCell::new(close),
308            write: RefCell::new(write),
309        }
310    }
311}
312
313/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
314#[dom_struct]
315pub struct WritableStreamDefaultController {
316    reflector_: Reflector,
317
318    /// The type of underlying sink used. Besides the default JS one,
319    /// there will be others for stream transfer, and for transform stream.
320    #[ignore_malloc_size_of = "underlying_sink_type"]
321    underlying_sink_type: UnderlyingSinkType,
322
323    /// The JS object used as `this` when invoking sink algorithms.
324    #[ignore_malloc_size_of = "mozjs"]
325    underlying_sink_obj: Heap<*mut JSObject>,
326
327    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-queue>
328    queue: QueueWithSizes,
329
330    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-started>
331    started: Cell<bool>,
332
333    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategyhwm>
334    strategy_hwm: f64,
335
336    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategysizealgorithm>
337    #[ignore_malloc_size_of = "QueuingStrategySize"]
338    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
339
340    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-stream>
341    stream: MutNullableDom<WritableStream>,
342
343    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortcontroller>
344    abort_controller: Dom<AbortController>,
345}
346
347impl WritableStreamDefaultController {
348    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
349    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
350    fn new_inherited(
351        cx: &mut JSContext,
352        global: &GlobalScope,
353        underlying_sink_type: UnderlyingSinkType,
354        strategy_hwm: f64,
355        strategy_size: Rc<QueuingStrategySize>,
356    ) -> WritableStreamDefaultController {
357        WritableStreamDefaultController {
358            reflector_: Reflector::new(),
359            underlying_sink_type,
360            queue: Default::default(),
361            stream: Default::default(),
362            underlying_sink_obj: Default::default(),
363            strategy_hwm,
364            strategy_size: RefCell::new(Some(strategy_size)),
365            started: Default::default(),
366            abort_controller: Dom::from_ref(&AbortController::new_with_proto(cx, global, None)),
367        }
368    }
369
370    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
371    pub(crate) fn new(
372        cx: &mut JSContext,
373        global: &GlobalScope,
374        underlying_sink_type: UnderlyingSinkType,
375        strategy_hwm: f64,
376        strategy_size: Rc<QueuingStrategySize>,
377    ) -> DomRoot<WritableStreamDefaultController> {
378        reflect_dom_object_with_cx(
379            Box::new(WritableStreamDefaultController::new_inherited(
380                cx,
381                global,
382                underlying_sink_type,
383                strategy_hwm,
384                strategy_size,
385            )),
386            global,
387            cx,
388        )
389    }
390
391    pub(crate) fn started(&self) -> bool {
392        self.started.get()
393    }
394
395    /// Setting the JS object after the heap has settled down.
396    pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
397        self.underlying_sink_obj.set(*this_object);
398    }
399
400    /// "Signal abort" call from <https://streams.spec.whatwg.org/#writable-stream-abort>
401    pub(crate) fn signal_abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
402        self.abort_controller.signal_abort(cx, reason);
403    }
404
405    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-clear-algorithms>
406    fn clear_algorithms(&self) {
407        match &self.underlying_sink_type {
408            UnderlyingSinkType::Js {
409                abort,
410                start: _,
411                close,
412                write,
413            } => {
414                // Set controller.[[writeAlgorithm]] to undefined.
415                write.borrow_mut().take();
416
417                // Set controller.[[closeAlgorithm]] to undefined.
418                close.borrow_mut().take();
419
420                // Set controller.[[abortAlgorithm]] to undefined.
421                abort.borrow_mut().take();
422            },
423            UnderlyingSinkType::Transfer {
424                backpressure_promise,
425                ..
426            } => {
427                backpressure_promise.borrow_mut().take();
428            },
429            UnderlyingSinkType::Transform(_, _) => {
430                return;
431            },
432        }
433
434        // Set controller.[[strategySizeAlgorithm]] to undefined.
435        self.strategy_size.borrow_mut().take();
436    }
437
438    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
439    pub(crate) fn setup(
440        &self,
441        cx: &mut JSContext,
442        global: &GlobalScope,
443        stream: &WritableStream,
444    ) -> Result<(), Error> {
445        // Assert: stream implements WritableStream.
446        // Implied by stream type.
447
448        // Assert: stream.[[controller]] is undefined.
449        stream.assert_no_controller();
450
451        // Set controller.[[stream]] to stream.
452        self.stream.set(Some(stream));
453
454        // Set stream.[[controller]] to controller.
455        stream.set_default_controller(self);
456
457        // Perform ! ResetQueue(controller).
458
459        // Set controller.[[abortController]] to a new AbortController.
460
461        // Set controller.[[started]] to false.
462
463        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm.
464
465        // Set controller.[[strategyHWM]] to highWaterMark.
466
467        // Set controller.[[writeAlgorithm]] to writeAlgorithm.
468
469        // Set controller.[[closeAlgorithm]] to closeAlgorithm.
470
471        // Set controller.[[abortAlgorithm]] to abortAlgorithm.
472
473        // Note: above steps are done in `new_inherited`.
474
475        // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
476        let backpressure = self.get_backpressure();
477
478        // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
479        stream.update_backpressure(cx, backpressure, global);
480
481        // Let startResult be the result of performing startAlgorithm. (This may throw an exception.)
482        // Let startPromise be a promise resolved with startResult.
483        let start_promise = self.start_algorithm(cx, global)?;
484
485        let rooted_default_controller = DomRoot::from_ref(self);
486
487        // Upon fulfillment of startPromise,
488        rooted!(&in(cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
489            controller: Dom::from_ref(&rooted_default_controller),
490        }));
491
492        // Upon rejection of startPromise with reason r,
493        rooted!(&in(cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
494            controller: Dom::from_ref(&rooted_default_controller),
495        }));
496
497        let handler = PromiseNativeHandler::new(
498            cx,
499            global,
500            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
501            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
502        );
503        let mut realm = enter_auto_realm(cx, global);
504        let cx = &mut realm.current_realm();
505        start_promise.append_native_handler(cx, &handler);
506
507        Ok(())
508    }
509
510    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-close>
511    pub(crate) fn close(&self, cx: &mut JSContext, global: &GlobalScope) {
512        // Perform ! EnqueueValueWithSize(controller, close sentinel, 0).
513        self.queue
514            .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
515            .expect("Enqueuing the close sentinel should not fail.");
516        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
517        self.advance_queue_if_needed(cx, global);
518    }
519
520    #[expect(unsafe_code)]
521    fn start_algorithm(&self, cx: &mut JSContext, global: &GlobalScope) -> Fallible<Rc<Promise>> {
522        match &self.underlying_sink_type {
523            UnderlyingSinkType::Js {
524                start,
525                abort: _,
526                close: _,
527                write: _,
528            } => {
529                let algo = start.borrow().clone();
530                let start_promise = if let Some(start) = algo {
531                    rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
532                    rooted!(&in(cx) let mut result: JSVal);
533                    rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
534                    start.Call_(
535                        cx,
536                        &this_object.handle(),
537                        self,
538                        result.handle_mut(),
539                        ExceptionHandling::Rethrow,
540                    )?;
541                    let is_promise = unsafe {
542                        if result.is_object() {
543                            result_object.set(result.to_object());
544                            IsPromiseObject(result_object.handle().into_handle())
545                        } else {
546                            false
547                        }
548                    };
549                    if is_promise {
550                        Promise::new_with_js_promise(cx, result_object.handle())
551                    } else {
552                        Promise::new_resolved(cx, global, result.get())
553                    }
554                } else {
555                    // Let startAlgorithm be an algorithm that returns undefined.
556                    Promise::new_resolved(cx, global, ())
557                };
558
559                Ok(start_promise)
560            },
561            UnderlyingSinkType::Transfer { .. } => {
562                // Let startAlgorithm be an algorithm that returns undefined.
563                Ok(Promise::new_resolved(cx, global, ()))
564            },
565            UnderlyingSinkType::Transform(_, start_promise) => {
566                // Let startAlgorithm be an algorithm that returns startPromise.
567                Ok(start_promise.clone())
568            },
569        }
570    }
571
572    /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-writablestreamcontroller-abortsteps>
573    pub(crate) fn abort_steps(
574        &self,
575        cx: &mut JSContext,
576        global: &GlobalScope,
577        reason: SafeHandleValue,
578    ) -> Rc<Promise> {
579        let result = match &self.underlying_sink_type {
580            UnderlyingSinkType::Js {
581                abort,
582                start: _,
583                close: _,
584                write: _,
585            } => {
586                rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
587                let algo = abort.borrow().clone();
588                // Let result be the result of performing this.[[abortAlgorithm]], passing reason.
589                let result = if let Some(algo) = algo {
590                    algo.Call_(
591                        cx,
592                        &this_object.handle(),
593                        Some(reason),
594                        ExceptionHandling::Rethrow,
595                    )
596                } else {
597                    Ok(Promise::new_resolved(cx, global, ()))
598                };
599                result.unwrap_or_else(|e| {
600                    let promise = Promise::new(cx, global);
601                    promise.reject_error(cx, e);
602                    promise
603                })
604            },
605            UnderlyingSinkType::Transfer { port, .. } => {
606                // The steps from the `abortAlgorithm` at
607                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
608
609                // Let result be PackAndPostMessageHandlingError(port, "error", reason).
610                let result = port.pack_and_post_message_handling_error(cx, "error", reason);
611
612                // Disentangle port.
613                global.disentangle_port(cx, port);
614
615                let promise = Promise::new(cx, global);
616
617                // If result is an abrupt completion, return a promise rejected with result.[[Value]]
618                if let Err(error) = result {
619                    promise.reject_error(cx, error);
620                } else {
621                    // Otherwise, return a promise resolved with undefined.
622                    promise.resolve_native(cx, &());
623                }
624                promise
625            },
626            UnderlyingSinkType::Transform(stream, _) => {
627                // Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
628                stream
629                    .transform_stream_default_sink_abort_algorithm(cx, global, reason)
630                    .expect("Transform stream default sink abort algorithm should not fail.")
631            },
632        };
633
634        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
635        self.clear_algorithms();
636
637        result
638    }
639
640    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
641    fn call_write_algorithm(
642        &self,
643        cx: &mut JSContext,
644        chunk: SafeHandleValue,
645        global: &GlobalScope,
646    ) -> Rc<Promise> {
647        match &self.underlying_sink_type {
648            UnderlyingSinkType::Js {
649                abort: _,
650                start: _,
651                close: _,
652                write,
653            } => {
654                rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
655                let algo = write.borrow().clone();
656                let result = if let Some(algo) = algo {
657                    algo.Call_(
658                        cx,
659                        &this_object.handle(),
660                        chunk,
661                        self,
662                        ExceptionHandling::Rethrow,
663                    )
664                } else {
665                    Ok(Promise::new_resolved(cx, global, ()))
666                };
667                result.unwrap_or_else(|e| {
668                    let promise = Promise::new(cx, global);
669                    promise.reject_error(cx, e);
670                    promise
671                })
672            },
673            UnderlyingSinkType::Transfer {
674                backpressure_promise,
675                port,
676            } => {
677                // The steps from the `writeAlgorithm` at
678                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
679
680                // If backpressurePromise is undefined,
681                // set backpressurePromise to a promise resolved with undefined.
682                if backpressure_promise.borrow().is_none() {
683                    let promise = Promise::new_resolved(cx, global, ());
684                    *backpressure_promise.borrow_mut() = Some(promise);
685                }
686
687                // Return the result of reacting to backpressurePromise with the following fulfillment steps:
688                let result_promise = Promise::new(cx, global);
689                rooted!(&in(cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
690                    port: port.clone(),
691                    backpressure_promise: backpressure_promise.clone(),
692                    chunk: Heap::boxed(chunk.get()),
693                    result_promise: result_promise.clone(),
694                }));
695                let handler = PromiseNativeHandler::new(
696                    cx,
697                    global,
698                    fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
699                    None,
700                );
701                let mut realm = enter_auto_realm(cx, global);
702                let realm = &mut realm.current_realm();
703                backpressure_promise
704                    .borrow()
705                    .as_ref()
706                    .expect("Promise must be some by now.")
707                    .append_native_handler(realm, &handler);
708                result_promise
709            },
710            UnderlyingSinkType::Transform(stream, _) => {
711                // Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
712                stream
713                    .transform_stream_default_sink_write_algorithm(cx, global, chunk)
714                    .expect("Transform stream default sink write algorithm should not fail.")
715            },
716        }
717    }
718
719    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
720    fn call_close_algorithm(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
721        match &self.underlying_sink_type {
722            UnderlyingSinkType::Js {
723                abort: _,
724                start: _,
725                close,
726                write: _,
727            } => {
728                rooted!(&in(cx) let mut this_object = ptr::null_mut::<JSObject>());
729                this_object.set(self.underlying_sink_obj.get());
730                let algo = close.borrow().clone();
731                let result = if let Some(algo) = algo {
732                    algo.Call_(cx, &this_object.handle(), ExceptionHandling::Rethrow)
733                } else {
734                    Ok(Promise::new_resolved(cx, global, ()))
735                };
736                result.unwrap_or_else(|e| {
737                    let promise = Promise::new(cx, global);
738                    promise.reject_error(cx, e);
739                    promise
740                })
741            },
742            UnderlyingSinkType::Transfer { port, .. } => {
743                // The steps from the `closeAlgorithm` at
744                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
745
746                // Perform ! PackAndPostMessage(port, "close", undefined).
747                rooted!(&in(cx) let mut value = UndefinedValue());
748                port.pack_and_post_message(cx, "close", value.handle())
749                    .expect("Sending close should not fail.");
750
751                // Disentangle port.
752                global.disentangle_port(cx, port);
753
754                // Return a promise resolved with undefined.
755                Promise::new_resolved(cx, global, ())
756            },
757            UnderlyingSinkType::Transform(stream, _) => {
758                // Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
759                stream
760                    .transform_stream_default_sink_close_algorithm(cx, global)
761                    .expect("Transform stream default sink close algorithm should not fail.")
762            },
763        }
764    }
765
766    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
767    pub(crate) fn process_close(&self, cx: &mut JSContext, global: &GlobalScope) {
768        // Let stream be controller.[[stream]].
769        let Some(stream) = self.stream.get() else {
770            unreachable!("Controller should have a stream");
771        };
772
773        // Perform ! WritableStreamMarkCloseRequestInFlight(stream).
774        stream.mark_close_request_in_flight();
775
776        // Perform ! DequeueValue(controller).
777        self.queue.dequeue_value(cx, None);
778
779        // Assert: controller.[[queue]] is empty.
780        assert!(self.queue.is_empty());
781
782        // Let sinkClosePromise be the result of performing controller.[[closeAlgorithm]].
783        let sink_close_promise = self.call_close_algorithm(cx, global);
784
785        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
786        self.clear_algorithms();
787
788        // Upon fulfillment of sinkClosePromise,
789        rooted!(&in(cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
790            stream: Dom::from_ref(&stream),
791        }));
792
793        // Upon rejection of sinkClosePromise with reason reason,
794        rooted!(&in(cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
795            stream: Dom::from_ref(&stream),
796        }));
797
798        // Attach handlers to the promise.
799        let handler = PromiseNativeHandler::new(
800            cx,
801            global,
802            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
803            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
804        );
805        let mut realm = enter_auto_realm(cx, global);
806        let realm = &mut realm.current_realm();
807        sink_close_promise.append_native_handler(realm, &handler);
808    }
809
810    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-advance-queue-if-needed>
811    fn advance_queue_if_needed(&self, cx: &mut JSContext, global: &GlobalScope) {
812        // Let stream be controller.[[stream]].
813        let Some(stream) = self.stream.get() else {
814            unreachable!("Controller should have a stream");
815        };
816
817        // If controller.[[started]] is false, return.
818        if !self.started.get() {
819            return;
820        }
821
822        // If stream.[[inFlightWriteRequest]] is not undefined, return.
823        if stream.has_in_flight_write_request() {
824            return;
825        }
826
827        // Let state be stream.[[state]].
828
829        // Assert: state is not "closed" or "errored".
830        assert!(!(stream.is_errored() || stream.is_closed()));
831
832        // If state is "erroring",
833        if stream.is_erroring() {
834            // Perform ! WritableStreamFinishErroring(stream).
835            stream.finish_erroring(cx, global);
836
837            // Return.
838            return;
839        }
840
841        // Let value be ! PeekQueueValue(controller).
842        rooted!(&in(cx) let mut value = UndefinedValue());
843        let is_closed = {
844            // If controller.[[queue]] is empty, return.
845            if self.queue.is_empty() {
846                return;
847            }
848            self.queue.peek_queue_value(cx, value.handle_mut())
849        };
850
851        if is_closed {
852            // If value is the close sentinel, perform ! WritableStreamDefaultControllerProcessClose(controller).
853            self.process_close(cx, global);
854        } else {
855            // Otherwise, perform ! WritableStreamDefaultControllerProcessWrite(controller, value).
856            self.process_write(cx, value.handle(), global);
857        };
858    }
859
860    /// <https://streams.spec.whatwg.org/#ws-default-controller-private-error>
861    pub(crate) fn perform_error_steps(&self) {
862        // Perform ! ResetQueue(this).
863        self.queue.reset();
864    }
865
866    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
867    fn process_write(&self, cx: &mut JSContext, chunk: SafeHandleValue, global: &GlobalScope) {
868        // Let stream be controller.[[stream]].
869        let Some(stream) = self.stream.get() else {
870            unreachable!("Controller should have a stream");
871        };
872
873        // Perform ! WritableStreamMarkFirstWriteRequestInFlight(stream).
874        stream.mark_first_write_request_in_flight();
875
876        // Let sinkWritePromise be the result of performing controller.[[writeAlgorithm]], passing in chunk.
877        let sink_write_promise = self.call_write_algorithm(cx, chunk, global);
878
879        // Upon fulfillment of sinkWritePromise,
880        rooted!(&in(cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
881            controller: Dom::from_ref(self),
882        }));
883
884        // Upon rejection of sinkWritePromise with reason,
885        rooted!(&in(cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
886            controller: Dom::from_ref(self),
887        }));
888
889        // Attach handlers to the promise.
890        let handler = PromiseNativeHandler::new(
891            cx,
892            global,
893            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
894            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
895        );
896        let mut realm = enter_auto_realm(cx, global);
897        let realm = &mut realm.current_realm();
898        sink_write_promise.append_native_handler(realm, &handler);
899    }
900
901    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-desired-size>
902    pub(crate) fn get_desired_size(&self) -> f64 {
903        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
904        let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
905        desired_size.clamp(desired_size, self.strategy_hwm)
906    }
907
908    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-backpressure>
909    fn get_backpressure(&self) -> bool {
910        // Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
911        let desired_size = self.get_desired_size();
912
913        // Return true if desiredSize ≤ 0, or false otherwise.
914        desired_size == 0.0 || desired_size.is_sign_negative()
915    }
916
917    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-chunk-size>
918    pub(crate) fn get_chunk_size(
919        &self,
920        cx: &mut JSContext,
921        global: &GlobalScope,
922        chunk: SafeHandleValue,
923    ) -> f64 {
924        // If controller.[[strategySizeAlgorithm]] is undefined, then:
925        let Some(strategy_size) = self.strategy_size.borrow().clone() else {
926            // Assert: controller.[[stream]].[[state]] is not "writable".
927            let Some(stream) = self.stream.get() else {
928                unreachable!("Controller should have a stream");
929            };
930            assert!(!stream.is_writable());
931
932            // Return 1.
933            return 1.0;
934        };
935
936        // Let returnValue be the result of performing controller.[[strategySizeAlgorithm]],
937        // passing in chunk, and interpreting the result as a completion record.
938        let result = strategy_size.Call__(cx, chunk, ExceptionHandling::Rethrow);
939
940        match result {
941            // Let chunkSize be result.[[Value]].
942            Ok(size) => size,
943            Err(error) => {
944                // If result is an abrupt completion,
945
946                // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]]).
947                // Create a rooted value for the error.
948                rooted!(&in(cx) let mut rooted_error = UndefinedValue());
949                error.to_jsval(cx, global, rooted_error.handle_mut());
950                self.error_if_needed(cx, rooted_error.handle(), global);
951
952                // Return 1.
953                1.0
954            },
955        }
956    }
957
958    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-write>
959    pub(crate) fn write(
960        &self,
961        cx: &mut JSContext,
962        global: &GlobalScope,
963        chunk: SafeHandleValue,
964        chunk_size: f64,
965    ) {
966        // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
967        let enqueue_result = self
968            .queue
969            .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
970                value: Heap::boxed(chunk.get()),
971                size: chunk_size,
972            }));
973
974        // If enqueueResult is an abrupt completion,
975        if let Err(error) = enqueue_result {
976            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]).
977            // Create a rooted value for the error.
978            rooted!(&in(cx) let mut rooted_error = UndefinedValue());
979            error.to_jsval(cx, global, rooted_error.handle_mut());
980            self.error_if_needed(cx, rooted_error.handle(), global);
981
982            // Return.
983            return;
984        }
985
986        // Let stream be controller.[[stream]].
987        let Some(stream) = self.stream.get() else {
988            unreachable!("Controller should have a stream");
989        };
990
991        // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[state]] is "writable",
992        if !stream.close_queued_or_in_flight() && stream.is_writable() {
993            // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
994            let backpressure = self.get_backpressure();
995
996            // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
997            stream.update_backpressure(cx, backpressure, global);
998        }
999
1000        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
1001        self.advance_queue_if_needed(cx, global);
1002    }
1003
1004    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error-if-needed>
1005    pub(crate) fn error_if_needed(
1006        &self,
1007        cx: &mut JSContext,
1008        error: SafeHandleValue,
1009        global: &GlobalScope,
1010    ) {
1011        // Let stream be controller.[[stream]].
1012        let Some(stream) = self.stream.get() else {
1013            unreachable!("Controller should have a stream");
1014        };
1015
1016        // If stream.[[state]] is "writable",
1017        if stream.is_writable() {
1018            // Perform ! WritableStreamDefaultControllerError(controller, e).
1019            self.error(cx, &stream, error, global);
1020        }
1021    }
1022
1023    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error>
1024    fn error(
1025        &self,
1026        cx: &mut JSContext,
1027        stream: &WritableStream,
1028        e: SafeHandleValue,
1029        global: &GlobalScope,
1030    ) {
1031        // Let stream be controller.[[stream]].
1032        // Done above with the argument.
1033
1034        // Assert: stream.[[state]] is "writable".
1035        assert!(stream.is_writable());
1036
1037        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
1038        self.clear_algorithms();
1039
1040        // Perform ! WritableStreamStartErroring(stream, error).
1041        stream.start_erroring(cx, global, e);
1042    }
1043}
1044
1045impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1046    for WritableStreamDefaultController
1047{
1048    /// <https://streams.spec.whatwg.org/#ws-default-controller-error>
1049    fn Error(&self, cx: &mut CurrentRealm, e: SafeHandleValue) {
1050        // Let state be this.[[stream]].[[state]].
1051        let Some(stream) = self.stream.get() else {
1052            unreachable!("Controller should have a stream");
1053        };
1054
1055        // If state is not "writable", return.
1056        if !stream.is_writable() {
1057            return;
1058        }
1059
1060        let global = GlobalScope::from_current_realm(cx);
1061
1062        // Perform ! WritableStreamDefaultControllerError(this, e).
1063        self.error(cx, &stream, e, &global);
1064    }
1065
1066    /// <https://streams.spec.whatwg.org/#ws-default-controller-signal>
1067    fn Signal(&self) -> DomRoot<AbortSignal> {
1068        // Return this.[[abortController]]’s signal.
1069        self.abort_controller.signal()
1070    }
1071}