script/dom/
writablestreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, UndefinedValue};
12use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
13
14use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
15use super::types::TransformStream;
16use crate::dom::bindings::callback::ExceptionHandling;
17use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
18    UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
19    UnderlyingSinkWriteCallback,
20};
21use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
24use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
25use crate::dom::globalscope::GlobalScope;
26use crate::dom::messageport::MessagePort;
27use crate::dom::promise::Promise;
28use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
29use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
30use crate::dom::types::{AbortController, AbortSignal};
31use crate::dom::writablestream::WritableStream;
32use crate::realms::{InRealm, enter_realm};
33use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
34
35impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
36
37/// The fulfillment handler for
38/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
39#[derive(Clone, JSTraceable, MallocSizeOf)]
40#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
41struct CloseAlgorithmFulfillmentHandler {
42    stream: Dom<WritableStream>,
43}
44
45impl Callback for CloseAlgorithmFulfillmentHandler {
46    fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
47        let stream = self.stream.as_rooted();
48
49        // Perform ! WritableStreamFinishInFlightClose(stream).
50        stream.finish_in_flight_close(cx, can_gc);
51    }
52}
53
54impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
55
56/// The rejection handler for
57/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
58#[derive(Clone, JSTraceable, MallocSizeOf)]
59#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
60struct CloseAlgorithmRejectionHandler {
61    stream: Dom<WritableStream>,
62}
63
64impl Callback for CloseAlgorithmRejectionHandler {
65    fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
66        let stream = self.stream.as_rooted();
67
68        let global = GlobalScope::from_safe_context(cx, realm);
69
70        // Perform ! WritableStreamFinishInFlightCloseWithError(stream, reason).
71        stream.finish_in_flight_close_with_error(cx, &global, v, can_gc);
72    }
73}
74
75impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
76
77/// The fulfillment handler for
78/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
79#[derive(Clone, JSTraceable, MallocSizeOf)]
80#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
81struct StartAlgorithmFulfillmentHandler {
82    controller: Dom<WritableStreamDefaultController>,
83}
84
85impl Callback for StartAlgorithmFulfillmentHandler {
86    /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
87    /// Upon fulfillment of startPromise,
88    fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
89        let controller = self.controller.as_rooted();
90        let stream = controller
91            .stream
92            .get()
93            .expect("Controller should have a stream.");
94
95        // Assert: stream.[[state]] is "writable" or "erroring".
96        assert!(stream.is_erroring() || stream.is_writable());
97
98        // Set controller.[[started]] to true.
99        controller.started.set(true);
100
101        let global = GlobalScope::from_safe_context(cx, realm);
102
103        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
104        controller.advance_queue_if_needed(cx, &global, can_gc)
105    }
106}
107
108impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
109
110/// The rejection handler for
111/// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
112#[derive(Clone, JSTraceable, MallocSizeOf)]
113#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
114struct StartAlgorithmRejectionHandler {
115    controller: Dom<WritableStreamDefaultController>,
116}
117
118impl Callback for StartAlgorithmRejectionHandler {
119    /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
120    /// Upon rejection of startPromise with reason r,
121    fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
122        let controller = self.controller.as_rooted();
123        let stream = controller
124            .stream
125            .get()
126            .expect("Controller should have a stream.");
127
128        // Assert: stream.[[state]] is "writable" or "erroring".
129        assert!(stream.is_erroring() || stream.is_writable());
130
131        // Set controller.[[started]] to true.
132        controller.started.set(true);
133
134        let global = GlobalScope::from_safe_context(cx, realm);
135
136        // Perform ! WritableStreamDealWithRejection(stream, r).
137        stream.deal_with_rejection(cx, &global, v, can_gc);
138    }
139}
140
141impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
142
143/// Reacting to backpressurePromise as part of the `writeAlgorithm` of
144/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
145#[derive(JSTraceable, MallocSizeOf)]
146#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
147struct TransferBackPressurePromiseReaction {
148    /// The result of reacting to backpressurePromise.
149    #[ignore_malloc_size_of = "Rc is hard"]
150    result_promise: Rc<Promise>,
151
152    /// The backpressurePromise.
153    #[ignore_malloc_size_of = "Rc is hard"]
154    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
155
156    /// The chunk received by the `writeAlgorithm`.
157    #[ignore_malloc_size_of = "mozjs"]
158    chunk: Box<Heap<JSVal>>,
159
160    /// The port used in the algorithm.
161    port: Dom<MessagePort>,
162}
163
164impl Callback for TransferBackPressurePromiseReaction {
165    /// Reacting to backpressurePromise with the following fulfillment steps:
166    fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
167        let global = self.result_promise.global();
168        // Set backpressurePromise to a new promise.
169        *self.backpressure_promise.borrow_mut() = Some(Promise::new(&global, can_gc));
170
171        // Let result be PackAndPostMessageHandlingError(port, "chunk", chunk).
172        rooted!(in(*cx) let mut chunk = UndefinedValue());
173        chunk.set(self.chunk.get());
174        let result =
175            self.port
176                .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
177
178        // If result is an abrupt completion,
179        if let Err(error) = result {
180            // Disentangle port.
181            global.disentangle_port(&self.port, can_gc);
182
183            // Return a promise rejected with result.[[Value]].
184            self.result_promise.reject_error(error, can_gc);
185        } else {
186            // Otherwise, return a promise resolved with undefined.
187            self.result_promise.resolve_native(&(), can_gc);
188        }
189    }
190}
191
192impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
193
194/// The fulfillment handler for
195/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
196#[derive(Clone, JSTraceable, MallocSizeOf)]
197#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
198struct WriteAlgorithmFulfillmentHandler {
199    controller: Dom<WritableStreamDefaultController>,
200}
201
202impl Callback for WriteAlgorithmFulfillmentHandler {
203    fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
204        let controller = self.controller.as_rooted();
205        let stream = controller
206            .stream
207            .get()
208            .expect("Controller should have a stream.");
209
210        // Perform ! WritableStreamFinishInFlightWrite(stream).
211        stream.finish_in_flight_write(can_gc);
212
213        // Let state be stream.[[state]].
214        // Assert: state is "writable" or "erroring".
215        assert!(stream.is_erroring() || stream.is_writable());
216
217        // Perform ! DequeueValue(controller).
218        {
219            rooted!(in(*cx) let mut rval = UndefinedValue());
220            let mut queue = controller.queue.borrow_mut();
221            queue.dequeue_value(cx, Some(rval.handle_mut()), can_gc);
222        }
223
224        let global = GlobalScope::from_safe_context(cx, realm);
225
226        // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and state is "writable",
227        if !stream.close_queued_or_in_flight() && stream.is_writable() {
228            // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
229            let backpressure = controller.get_backpressure();
230
231            // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
232            stream.update_backpressure(backpressure, &global, can_gc);
233        }
234
235        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
236        controller.advance_queue_if_needed(cx, &global, can_gc)
237    }
238}
239
240impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
241
242/// The rejection handler for
243/// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
244#[derive(Clone, JSTraceable, MallocSizeOf)]
245#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
246struct WriteAlgorithmRejectionHandler {
247    controller: Dom<WritableStreamDefaultController>,
248}
249
250impl Callback for WriteAlgorithmRejectionHandler {
251    fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
252        let controller = self.controller.as_rooted();
253        let stream = controller
254            .stream
255            .get()
256            .expect("Controller should have a stream.");
257
258        // If stream.[[state]] is "writable",
259        if stream.is_writable() {
260            // perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
261            controller.clear_algorithms();
262        }
263
264        let global = GlobalScope::from_safe_context(cx, realm);
265
266        // Perform ! WritableStreamFinishInFlightWriteWithError(stream, reason).
267        stream.finish_in_flight_write_with_error(cx, &global, v, can_gc);
268    }
269}
270
271/// The type of sink algorithms we are using.
272#[derive(JSTraceable, PartialEq)]
273#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
274pub enum UnderlyingSinkType {
275    /// Algorithms are provided by Js callbacks.
276    Js {
277        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm>
278        abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
279
280        start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,
281
282        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
283        close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,
284
285        /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
286        write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
287    },
288    /// Algorithms supporting streams transfer are implemented in Rust.
289    /// The promise and port used in those algorithms are stored here.
290    Transfer {
291        backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
292        port: Dom<MessagePort>,
293    },
294    /// Algorithms supporting transform streams are implemented in Rust.
295    Transform(Dom<TransformStream>, Rc<Promise>),
296}
297
298impl UnderlyingSinkType {
299    pub(crate) fn new_js(
300        abort: Option<Rc<UnderlyingSinkAbortCallback>>,
301        start: Option<Rc<UnderlyingSinkStartCallback>>,
302        close: Option<Rc<UnderlyingSinkCloseCallback>>,
303        write: Option<Rc<UnderlyingSinkWriteCallback>>,
304    ) -> Self {
305        UnderlyingSinkType::Js {
306            abort: RefCell::new(abort),
307            start: RefCell::new(start),
308            close: RefCell::new(close),
309            write: RefCell::new(write),
310        }
311    }
312}
313
314/// <https://streams.spec.whatwg.org/#ws-default-controller-class>
315#[dom_struct]
316pub struct WritableStreamDefaultController {
317    reflector_: Reflector,
318
319    /// The type of underlying sink used. Besides the default JS one,
320    /// there will be others for stream transfer, and for transform stream.
321    #[ignore_malloc_size_of = "underlying_sink_type"]
322    underlying_sink_type: UnderlyingSinkType,
323
324    /// The JS object used as `this` when invoking sink algorithms.
325    #[ignore_malloc_size_of = "mozjs"]
326    underlying_sink_obj: Heap<*mut JSObject>,
327
328    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-queue>
329    queue: RefCell<QueueWithSizes>,
330
331    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-started>
332    started: Cell<bool>,
333
334    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategyhwm>
335    strategy_hwm: f64,
336
337    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategysizealgorithm>
338    #[ignore_malloc_size_of = "Rc is hard"]
339    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
340
341    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-stream>
342    stream: MutNullableDom<WritableStream>,
343
344    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortcontroller>
345    abort_controller: Dom<AbortController>,
346}
347
348impl WritableStreamDefaultController {
349    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
350    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
351    fn new_inherited(
352        global: &GlobalScope,
353        underlying_sink_type: UnderlyingSinkType,
354        strategy_hwm: f64,
355        strategy_size: Rc<QueuingStrategySize>,
356        can_gc: CanGc,
357    ) -> WritableStreamDefaultController {
358        WritableStreamDefaultController {
359            reflector_: Reflector::new(),
360            underlying_sink_type,
361            queue: Default::default(),
362            stream: Default::default(),
363            underlying_sink_obj: Default::default(),
364            strategy_hwm,
365            strategy_size: RefCell::new(Some(strategy_size)),
366            started: Default::default(),
367            abort_controller: Dom::from_ref(&AbortController::new_with_proto(global, None, can_gc)),
368        }
369    }
370
371    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
372    pub(crate) fn new(
373        global: &GlobalScope,
374        underlying_sink_type: UnderlyingSinkType,
375        strategy_hwm: f64,
376        strategy_size: Rc<QueuingStrategySize>,
377        can_gc: CanGc,
378    ) -> DomRoot<WritableStreamDefaultController> {
379        reflect_dom_object(
380            Box::new(WritableStreamDefaultController::new_inherited(
381                global,
382                underlying_sink_type,
383                strategy_hwm,
384                strategy_size,
385                can_gc,
386            )),
387            global,
388            can_gc,
389        )
390    }
391
392    pub(crate) fn started(&self) -> bool {
393        self.started.get()
394    }
395
396    /// Setting the JS object after the heap has settled down.
397    pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
398        self.underlying_sink_obj.set(*this_object);
399    }
400
401    /// "Signal abort" call from <https://streams.spec.whatwg.org/#writable-stream-abort>
402    pub(crate) fn signal_abort(
403        &self,
404        cx: SafeJSContext,
405        reason: SafeHandleValue,
406        realm: InRealm,
407        can_gc: CanGc,
408    ) {
409        self.abort_controller
410            .signal_abort(cx, reason, realm, can_gc);
411    }
412
413    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-clear-algorithms>
414    fn clear_algorithms(&self) {
415        match &self.underlying_sink_type {
416            UnderlyingSinkType::Js {
417                abort,
418                start: _,
419                close,
420                write,
421            } => {
422                // Set controller.[[writeAlgorithm]] to undefined.
423                write.borrow_mut().take();
424
425                // Set controller.[[closeAlgorithm]] to undefined.
426                close.borrow_mut().take();
427
428                // Set controller.[[abortAlgorithm]] to undefined.
429                abort.borrow_mut().take();
430            },
431            UnderlyingSinkType::Transfer {
432                backpressure_promise,
433                ..
434            } => {
435                backpressure_promise.borrow_mut().take();
436            },
437            UnderlyingSinkType::Transform(_, _) => {
438                return;
439            },
440        }
441
442        // Set controller.[[strategySizeAlgorithm]] to undefined.
443        self.strategy_size.borrow_mut().take();
444    }
445
446    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
447    pub(crate) fn setup(
448        &self,
449        cx: SafeJSContext,
450        global: &GlobalScope,
451        stream: &WritableStream,
452        can_gc: CanGc,
453    ) -> Result<(), Error> {
454        // Assert: stream implements WritableStream.
455        // Implied by stream type.
456
457        // Assert: stream.[[controller]] is undefined.
458        stream.assert_no_controller();
459
460        // Set controller.[[stream]] to stream.
461        self.stream.set(Some(stream));
462
463        // Set stream.[[controller]] to controller.
464        stream.set_default_controller(self);
465
466        // Perform ! ResetQueue(controller).
467
468        // Set controller.[[abortController]] to a new AbortController.
469
470        // Set controller.[[started]] to false.
471
472        // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm.
473
474        // Set controller.[[strategyHWM]] to highWaterMark.
475
476        // Set controller.[[writeAlgorithm]] to writeAlgorithm.
477
478        // Set controller.[[closeAlgorithm]] to closeAlgorithm.
479
480        // Set controller.[[abortAlgorithm]] to abortAlgorithm.
481
482        // Note: above steps are done in `new_inherited`.
483
484        // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
485        let backpressure = self.get_backpressure();
486
487        // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
488        stream.update_backpressure(backpressure, global, can_gc);
489
490        // Let startResult be the result of performing startAlgorithm. (This may throw an exception.)
491        // Let startPromise be a promise resolved with startResult.
492        let start_promise = self.start_algorithm(cx, global, can_gc)?;
493
494        let rooted_default_controller = DomRoot::from_ref(self);
495
496        // Upon fulfillment of startPromise,
497        rooted!(in(*cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
498            controller: Dom::from_ref(&rooted_default_controller),
499        }));
500
501        // Upon rejection of startPromise with reason r,
502        rooted!(in(*cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
503            controller: Dom::from_ref(&rooted_default_controller),
504        }));
505
506        let handler = PromiseNativeHandler::new(
507            global,
508            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
509            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
510            can_gc,
511        );
512        let realm = enter_realm(global);
513        let comp = InRealm::Entered(&realm);
514        start_promise.append_native_handler(&handler, comp, can_gc);
515
516        Ok(())
517    }
518
519    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-close>
520    pub(crate) fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
521        // Perform ! EnqueueValueWithSize(controller, close sentinel, 0).
522        {
523            let mut queue = self.queue.borrow_mut();
524            queue
525                .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
526                .expect("Enqueuing the close sentinel should not fail.");
527        }
528        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
529        self.advance_queue_if_needed(cx, global, can_gc);
530    }
531
532    #[allow(unsafe_code)]
533    fn start_algorithm(
534        &self,
535        cx: SafeJSContext,
536        global: &GlobalScope,
537        can_gc: CanGc,
538    ) -> Fallible<Rc<Promise>> {
539        match &self.underlying_sink_type {
540            UnderlyingSinkType::Js {
541                start,
542                abort: _,
543                close: _,
544                write: _,
545            } => {
546                let algo = start.borrow().clone();
547                let start_promise = if let Some(start) = algo {
548                    rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
549                    rooted!(in(*cx) let mut result: JSVal);
550                    rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
551                    start.Call_(
552                        &this_object.handle(),
553                        self,
554                        result.handle_mut(),
555                        ExceptionHandling::Rethrow,
556                        can_gc,
557                    )?;
558                    let is_promise = unsafe {
559                        if result.is_object() {
560                            result_object.set(result.to_object());
561                            IsPromiseObject(result_object.handle().into_handle())
562                        } else {
563                            false
564                        }
565                    };
566                    if is_promise {
567                        Promise::new_with_js_promise(result_object.handle(), cx)
568                    } else {
569                        Promise::new_resolved(global, cx, result.get(), can_gc)
570                    }
571                } else {
572                    // Let startAlgorithm be an algorithm that returns undefined.
573                    Promise::new_resolved(global, cx, (), can_gc)
574                };
575
576                Ok(start_promise)
577            },
578            UnderlyingSinkType::Transfer { .. } => {
579                // Let startAlgorithm be an algorithm that returns undefined.
580                Ok(Promise::new_resolved(global, cx, (), can_gc))
581            },
582            UnderlyingSinkType::Transform(_, start_promise) => {
583                // Let startAlgorithm be an algorithm that returns startPromise.
584                Ok(start_promise.clone())
585            },
586        }
587    }
588
589    /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-writablestreamcontroller-abortsteps>
590    pub(crate) fn abort_steps(
591        &self,
592        cx: SafeJSContext,
593        global: &GlobalScope,
594        reason: SafeHandleValue,
595        can_gc: CanGc,
596    ) -> Rc<Promise> {
597        let result = match &self.underlying_sink_type {
598            UnderlyingSinkType::Js {
599                abort,
600                start: _,
601                close: _,
602                write: _,
603            } => {
604                rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
605                let algo = abort.borrow().clone();
606                // Let result be the result of performing this.[[abortAlgorithm]], passing reason.
607                let result = if let Some(algo) = algo {
608                    algo.Call_(
609                        &this_object.handle(),
610                        Some(reason),
611                        ExceptionHandling::Rethrow,
612                        can_gc,
613                    )
614                } else {
615                    Ok(Promise::new_resolved(global, cx, (), can_gc))
616                };
617                result.unwrap_or_else(|e| {
618                    let promise = Promise::new(global, can_gc);
619                    promise.reject_error(e, can_gc);
620                    promise
621                })
622            },
623            UnderlyingSinkType::Transfer { port, .. } => {
624                // The steps from the `abortAlgorithm` at
625                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
626
627                // Let result be PackAndPostMessageHandlingError(port, "error", reason).
628                let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
629
630                // Disentangle port.
631                global.disentangle_port(port, can_gc);
632
633                let promise = Promise::new(global, can_gc);
634
635                // If result is an abrupt completion, return a promise rejected with result.[[Value]]
636                if let Err(error) = result {
637                    promise.reject_error(error, can_gc);
638                } else {
639                    // Otherwise, return a promise resolved with undefined.
640                    promise.resolve_native(&(), can_gc);
641                }
642                promise
643            },
644            UnderlyingSinkType::Transform(stream, _) => {
645                // Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
646                stream
647                    .transform_stream_default_sink_abort_algorithm(cx, global, reason, can_gc)
648                    .expect("Transform stream default sink abort algorithm should not fail.")
649            },
650        };
651
652        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
653        self.clear_algorithms();
654
655        result
656    }
657
658    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm>
659    fn call_write_algorithm(
660        &self,
661        cx: SafeJSContext,
662        chunk: SafeHandleValue,
663        global: &GlobalScope,
664        can_gc: CanGc,
665    ) -> Rc<Promise> {
666        match &self.underlying_sink_type {
667            UnderlyingSinkType::Js {
668                abort: _,
669                start: _,
670                close: _,
671                write,
672            } => {
673                rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
674                let algo = write.borrow().clone();
675                let result = if let Some(algo) = algo {
676                    algo.Call_(
677                        &this_object.handle(),
678                        chunk,
679                        self,
680                        ExceptionHandling::Rethrow,
681                        can_gc,
682                    )
683                } else {
684                    Ok(Promise::new_resolved(global, cx, (), can_gc))
685                };
686                result.unwrap_or_else(|e| {
687                    let promise = Promise::new(global, can_gc);
688                    promise.reject_error(e, can_gc);
689                    promise
690                })
691            },
692            UnderlyingSinkType::Transfer {
693                backpressure_promise,
694                port,
695            } => {
696                // The steps from the `writeAlgorithm` at
697                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
698
699                {
700                    // If backpressurePromise is undefined,
701                    // set backpressurePromise to a promise resolved with undefined.
702                    let mut backpressure_promise = backpressure_promise.borrow_mut();
703                    if backpressure_promise.is_none() {
704                        *backpressure_promise = Some(Promise::new_resolved(global, cx, (), can_gc));
705                    }
706                }
707
708                // Return the result of reacting to backpressurePromise with the following fulfillment steps:
709                let result_promise = Promise::new(global, can_gc);
710                rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
711                    port: port.clone(),
712                    backpressure_promise: backpressure_promise.clone(),
713                    chunk: Heap::boxed(chunk.get()),
714                    result_promise: result_promise.clone(),
715                }));
716                let handler = PromiseNativeHandler::new(
717                    global,
718                    fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
719                    None,
720                    can_gc,
721                );
722                let realm = enter_realm(global);
723                let comp = InRealm::Entered(&realm);
724                backpressure_promise
725                    .borrow()
726                    .as_ref()
727                    .expect("Promise must be some by now.")
728                    .append_native_handler(&handler, comp, can_gc);
729                result_promise
730            },
731            UnderlyingSinkType::Transform(stream, _) => {
732                // Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
733                stream
734                    .transform_stream_default_sink_write_algorithm(cx, global, chunk, can_gc)
735                    .expect("Transform stream default sink write algorithm should not fail.")
736            },
737        }
738    }
739
740    /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm>
741    fn call_close_algorithm(
742        &self,
743        cx: SafeJSContext,
744        global: &GlobalScope,
745        can_gc: CanGc,
746    ) -> Rc<Promise> {
747        match &self.underlying_sink_type {
748            UnderlyingSinkType::Js {
749                abort: _,
750                start: _,
751                close,
752                write: _,
753            } => {
754                rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
755                this_object.set(self.underlying_sink_obj.get());
756                let algo = close.borrow().clone();
757                let result = if let Some(algo) = algo {
758                    algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
759                } else {
760                    Ok(Promise::new_resolved(global, cx, (), can_gc))
761                };
762                result.unwrap_or_else(|e| {
763                    let promise = Promise::new(global, can_gc);
764                    promise.reject_error(e, can_gc);
765                    promise
766                })
767            },
768            UnderlyingSinkType::Transfer { port, .. } => {
769                // The steps from the `closeAlgorithm` at
770                // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
771
772                // Perform ! PackAndPostMessage(port, "close", undefined).
773                rooted!(in(*cx) let mut value = UndefinedValue());
774                port.pack_and_post_message("close", value.handle(), can_gc)
775                    .expect("Sending close should not fail.");
776
777                // Disentangle port.
778                global.disentangle_port(port, can_gc);
779
780                // Return a promise resolved with undefined.
781                Promise::new_resolved(global, cx, (), can_gc)
782            },
783            UnderlyingSinkType::Transform(stream, _) => {
784                // Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
785                stream
786                    .transform_stream_default_sink_close_algorithm(cx, global, can_gc)
787                    .expect("Transform stream default sink close algorithm should not fail.")
788            },
789        }
790    }
791
792    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close>
793    pub(crate) fn process_close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
794        // Let stream be controller.[[stream]].
795        let Some(stream) = self.stream.get() else {
796            unreachable!("Controller should have a stream");
797        };
798
799        // Perform ! WritableStreamMarkCloseRequestInFlight(stream).
800        stream.mark_close_request_in_flight();
801
802        // Perform ! DequeueValue(controller).
803        {
804            let mut queue = self.queue.borrow_mut();
805            queue.dequeue_value(cx, None, can_gc);
806        }
807
808        // Assert: controller.[[queue]] is empty.
809        assert!(self.queue.borrow().is_empty());
810
811        // Let sinkClosePromise be the result of performing controller.[[closeAlgorithm]].
812        let sink_close_promise = self.call_close_algorithm(cx, global, can_gc);
813
814        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
815        self.clear_algorithms();
816
817        // Upon fulfillment of sinkClosePromise,
818        rooted!(in(*cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
819            stream: Dom::from_ref(&stream),
820        }));
821
822        // Upon rejection of sinkClosePromise with reason reason,
823        rooted!(in(*cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
824            stream: Dom::from_ref(&stream),
825        }));
826
827        // Attach handlers to the promise.
828        let handler = PromiseNativeHandler::new(
829            global,
830            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
831            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
832            can_gc,
833        );
834        let realm = enter_realm(global);
835        let comp = InRealm::Entered(&realm);
836        sink_close_promise.append_native_handler(&handler, comp, can_gc);
837    }
838
839    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-advance-queue-if-needed>
840    fn advance_queue_if_needed(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
841        // Let stream be controller.[[stream]].
842        let Some(stream) = self.stream.get() else {
843            unreachable!("Controller should have a stream");
844        };
845
846        // If controller.[[started]] is false, return.
847        if !self.started.get() {
848            return;
849        }
850
851        // If stream.[[inFlightWriteRequest]] is not undefined, return.
852        if stream.has_in_flight_write_request() {
853            return;
854        }
855
856        // Let state be stream.[[state]].
857
858        // Assert: state is not "closed" or "errored".
859        assert!(!(stream.is_errored() || stream.is_closed()));
860
861        // If state is "erroring",
862        if stream.is_erroring() {
863            // Perform ! WritableStreamFinishErroring(stream).
864            stream.finish_erroring(cx, global, can_gc);
865
866            // Return.
867            return;
868        }
869
870        // Let value be ! PeekQueueValue(controller).
871        rooted!(in(*cx) let mut value = UndefinedValue());
872        let is_closed = {
873            let queue = self.queue.borrow_mut();
874
875            // If controller.[[queue]] is empty, return.
876            if queue.is_empty() {
877                return;
878            }
879            queue.peek_queue_value(cx, value.handle_mut(), can_gc)
880        };
881
882        if is_closed {
883            // If value is the close sentinel, perform ! WritableStreamDefaultControllerProcessClose(controller).
884            self.process_close(cx, global, can_gc);
885        } else {
886            // Otherwise, perform ! WritableStreamDefaultControllerProcessWrite(controller, value).
887            self.process_write(cx, value.handle(), global, can_gc);
888        };
889    }
890
891    /// <https://streams.spec.whatwg.org/#ws-default-controller-private-error>
892    pub(crate) fn perform_error_steps(&self) {
893        // Perform ! ResetQueue(this).
894        self.queue.borrow_mut().reset();
895    }
896
897    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write>
898    fn process_write(
899        &self,
900        cx: SafeJSContext,
901        chunk: SafeHandleValue,
902        global: &GlobalScope,
903        can_gc: CanGc,
904    ) {
905        // Let stream be controller.[[stream]].
906        let Some(stream) = self.stream.get() else {
907            unreachable!("Controller should have a stream");
908        };
909
910        // Perform ! WritableStreamMarkFirstWriteRequestInFlight(stream).
911        stream.mark_first_write_request_in_flight();
912
913        // Let sinkWritePromise be the result of performing controller.[[writeAlgorithm]], passing in chunk.
914        let sink_write_promise = self.call_write_algorithm(cx, chunk, global, can_gc);
915
916        // Upon fulfillment of sinkWritePromise,
917        rooted!(in(*cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
918            controller: Dom::from_ref(self),
919        }));
920
921        // Upon rejection of sinkWritePromise with reason,
922        rooted!(in(*cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
923            controller: Dom::from_ref(self),
924        }));
925
926        // Attach handlers to the promise.
927        let handler = PromiseNativeHandler::new(
928            global,
929            fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
930            rejection_handler.take().map(|h| Box::new(h) as Box<_>),
931            can_gc,
932        );
933        let realm = enter_realm(global);
934        let comp = InRealm::Entered(&realm);
935        sink_write_promise.append_native_handler(&handler, comp, can_gc);
936    }
937
938    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-desired-size>
939    pub(crate) fn get_desired_size(&self) -> f64 {
940        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
941        let queue = self.queue.borrow();
942        let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX);
943        desired_size.clamp(desired_size, self.strategy_hwm)
944    }
945
946    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-backpressure>
947    fn get_backpressure(&self) -> bool {
948        // Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
949        let desired_size = self.get_desired_size();
950
951        // Return true if desiredSize ≤ 0, or false otherwise.
952        desired_size == 0.0 || desired_size.is_sign_negative()
953    }
954
955    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-chunk-size>
956    pub(crate) fn get_chunk_size(
957        &self,
958        cx: SafeJSContext,
959        global: &GlobalScope,
960        chunk: SafeHandleValue,
961        can_gc: CanGc,
962    ) -> f64 {
963        // If controller.[[strategySizeAlgorithm]] is undefined, then:
964        let Some(strategy_size) = self.strategy_size.borrow().clone() else {
965            // Assert: controller.[[stream]].[[state]] is not "writable".
966            let Some(stream) = self.stream.get() else {
967                unreachable!("Controller should have a stream");
968            };
969            assert!(!stream.is_writable());
970
971            // Return 1.
972            return 1.0;
973        };
974
975        // Let returnValue be the result of performing controller.[[strategySizeAlgorithm]],
976        // passing in chunk, and interpreting the result as a completion record.
977        let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
978
979        match result {
980            // Let chunkSize be result.[[Value]].
981            Ok(size) => size,
982            Err(error) => {
983                // If result is an abrupt completion,
984
985                // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]]).
986                // Create a rooted value for the error.
987                rooted!(in(*cx) let mut rooted_error = UndefinedValue());
988                error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
989                self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
990
991                // Return 1.
992                1.0
993            },
994        }
995    }
996
997    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-write>
998    pub(crate) fn write(
999        &self,
1000        cx: SafeJSContext,
1001        global: &GlobalScope,
1002        chunk: SafeHandleValue,
1003        chunk_size: f64,
1004        can_gc: CanGc,
1005    ) {
1006        // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize).
1007        let enqueue_result = {
1008            let mut queue = self.queue.borrow_mut();
1009            queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
1010                value: Heap::boxed(chunk.get()),
1011                size: chunk_size,
1012            }))
1013        };
1014
1015        // If enqueueResult is an abrupt completion,
1016        if let Err(error) = enqueue_result {
1017            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]).
1018            // Create a rooted value for the error.
1019            rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1020            error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
1021            self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
1022
1023            // Return.
1024            return;
1025        }
1026
1027        // Let stream be controller.[[stream]].
1028        let Some(stream) = self.stream.get() else {
1029            unreachable!("Controller should have a stream");
1030        };
1031
1032        // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[state]] is "writable",
1033        if !stream.close_queued_or_in_flight() && stream.is_writable() {
1034            // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
1035            let backpressure = self.get_backpressure();
1036
1037            // Perform ! WritableStreamUpdateBackpressure(stream, backpressure).
1038            stream.update_backpressure(backpressure, global, can_gc);
1039        }
1040
1041        // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
1042        self.advance_queue_if_needed(cx, global, can_gc);
1043    }
1044
1045    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error-if-needed>
1046    pub(crate) fn error_if_needed(
1047        &self,
1048        cx: SafeJSContext,
1049        error: SafeHandleValue,
1050        global: &GlobalScope,
1051        can_gc: CanGc,
1052    ) {
1053        // Let stream be controller.[[stream]].
1054        let Some(stream) = self.stream.get() else {
1055            unreachable!("Controller should have a stream");
1056        };
1057
1058        // If stream.[[state]] is "writable",
1059        if stream.is_writable() {
1060            // Perform ! WritableStreamDefaultControllerError(controller, e).
1061            self.error(&stream, cx, error, global, can_gc);
1062        }
1063    }
1064
1065    /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error>
1066    pub(crate) fn error(
1067        &self,
1068        stream: &WritableStream,
1069        cx: SafeJSContext,
1070        e: SafeHandleValue,
1071        global: &GlobalScope,
1072        can_gc: CanGc,
1073    ) {
1074        // Let stream be controller.[[stream]].
1075        // Done above with the argument.
1076
1077        // Assert: stream.[[state]] is "writable".
1078        assert!(stream.is_writable());
1079
1080        // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller).
1081        self.clear_algorithms();
1082
1083        // Perform ! WritableStreamStartErroring(stream, error).
1084        stream.start_erroring(cx, global, e, can_gc);
1085    }
1086}
1087
1088impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1089    for WritableStreamDefaultController
1090{
1091    /// <https://streams.spec.whatwg.org/#ws-default-controller-error>
1092    fn Error(&self, cx: SafeJSContext, e: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
1093        // Let state be this.[[stream]].[[state]].
1094        let Some(stream) = self.stream.get() else {
1095            unreachable!("Controller should have a stream");
1096        };
1097
1098        // If state is not "writable", return.
1099        if !stream.is_writable() {
1100            return;
1101        }
1102
1103        let global = GlobalScope::from_safe_context(cx, realm);
1104
1105        // Perform ! WritableStreamDefaultControllerError(this, e).
1106        self.error(&stream, cx, e, &global, can_gc);
1107    }
1108
1109    /// <https://streams.spec.whatwg.org/#ws-default-controller-signal>
1110    fn Signal(&self) -> DomRoot<AbortSignal> {
1111        // Return this.[[abortController]]’s signal.
1112        self.abort_controller.signal()
1113    }
1114}