Skip to main content

script/dom/stream/
transformstream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::context::JSContext;
11use js::jsapi::{Heap, IsPromiseObject, JSObject};
12use js::jsval::{JSVal, ObjectValue, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
15use rustc_hash::FxHashMap;
16use script_bindings::callback::ExceptionHandling;
17use script_bindings::cell::DomRefCell;
18use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto_and_cx};
19use servo_base::id::{MessagePortId, MessagePortIndex};
20use servo_constellation_traits::TransformStreamData;
21
22use super::readablestream::CrossRealmTransformReadable;
23use super::writablestream::CrossRealmTransformWritable;
24use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
25    QueuingStrategy, QueuingStrategySize,
26};
27use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
28use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
29use crate::dom::bindings::conversions::ConversionResult;
30use crate::dom::bindings::error::{Error, Fallible};
31use crate::dom::bindings::reflector::DomGlobal;
32use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
33use crate::dom::bindings::structuredclone::StructuredData;
34use crate::dom::bindings::transferable::Transferable;
35use crate::dom::globalscope::GlobalScope;
36use crate::dom::messageport::MessagePort;
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::Callback;
39use crate::dom::readablestream::{ReadableStream, create_readable_stream};
40use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
41use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
42use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
43use crate::dom::stream::writablestream::create_writable_stream;
44use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
45use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
46use crate::realms::enter_auto_realm;
47
impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}

/// Reacting to backpressureChangePromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
///
/// Carries the state that the fulfillment steps need once the promise settles.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransformBackPressureChangePromiseFulfillment {
    /// The result of reacting to backpressureChangePromise.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,

    /// The chunk given to the write algorithm; the fulfillment steps hand it to
    /// the controller's perform-transform operation.
    #[ignore_malloc_size_of = "mozjs"]
    chunk: Box<Heap<JSVal>>,

    /// The writable used in the fulfillment steps
    writable: Dom<WritableStream>,

    /// The controller on which the transform of `chunk` is performed.
    controller: Dom<TransformStreamDefaultController>,
}
67
68impl Callback for TransformBackPressureChangePromiseFulfillment {
69    /// Reacting to backpressureChangePromise with the following fulfillment steps:
70    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
71        // Let writable be stream.[[writable]].
72        // Let state be writable.[[state]].
73        // If state is "erroring", throw writable.[[storedError]].
74        if self.writable.is_erroring() {
75            rooted!(&in(cx) let mut error = UndefinedValue());
76            self.writable.get_stored_error(error.handle_mut());
77            self.result_promise.reject(cx, error.handle());
78            return;
79        }
80
81        // Assert: state is "writable".
82        assert!(self.writable.is_writable());
83
84        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
85        rooted!(&in(cx) let mut chunk = UndefinedValue());
86        chunk.set(self.chunk.get());
87        let transform_result = self
88            .controller
89            .transform_stream_default_controller_perform_transform(
90                cx,
91                &self.writable.global(),
92                chunk.handle(),
93            )
94            .expect("perform transform failed");
95
96        // PerformTransformFulfillment and PerformTransformRejection do not need
97        // to be rooted because they only contain an Rc.
98        let handler = PromiseNativeHandler::new(
99            cx,
100            &self.writable.global(),
101            Some(Box::new(PerformTransformFulfillment {
102                result_promise: self.result_promise.clone(),
103            })),
104            Some(Box::new(PerformTransformRejection {
105                result_promise: self.result_promise.clone(),
106            })),
107        );
108
109        let mut realm = enter_auto_realm(cx, &*self.writable.global());
110        let realm = &mut realm.current_realm();
111        transform_result.append_native_handler(realm, &handler);
112    }
113}
114
115#[derive(JSTraceable, MallocSizeOf)]
116#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
117/// Reacting to fulfillment of performTransform as part of
118/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
119struct PerformTransformFulfillment {
120    #[conditional_malloc_size_of]
121    result_promise: Rc<Promise>,
122}
123
124impl Callback for PerformTransformFulfillment {
125    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
126        // Fulfilled: resolve the outer promise
127        self.result_promise.resolve_native(cx, &());
128    }
129}
130
131#[derive(JSTraceable, MallocSizeOf)]
132#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
133/// Reacting to rejection of performTransform as part of
134/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
135struct PerformTransformRejection {
136    #[conditional_malloc_size_of]
137    result_promise: Rc<Promise>,
138}
139
140impl Callback for PerformTransformRejection {
141    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
142        // Stream already errored in perform_transform, just reject result_promise
143        self.result_promise.reject(cx, v);
144    }
145}
146
147#[derive(JSTraceable, MallocSizeOf)]
148#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
149/// Reacting to rejection of backpressureChangePromise as part of
150/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
151struct BackpressureChangeRejection {
152    #[conditional_malloc_size_of]
153    result_promise: Rc<Promise>,
154}
155
156impl Callback for BackpressureChangeRejection {
157    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
158        self.result_promise.reject(cx, reason);
159    }
160}
161
162impl js::gc::Rootable for CancelPromiseFulfillment {}
163
164/// Reacting to fulfillment of the cancelpromise as part of
165/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
166#[derive(JSTraceable, MallocSizeOf)]
167#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
168struct CancelPromiseFulfillment {
169    readable: Dom<ReadableStream>,
170    controller: Dom<TransformStreamDefaultController>,
171    #[ignore_malloc_size_of = "mozjs"]
172    reason: Box<Heap<JSVal>>,
173}
174
175impl Callback for CancelPromiseFulfillment {
176    /// Reacting to backpressureChangePromise with the following fulfillment steps:
177    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
178        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
179        if self.readable.is_errored() {
180            rooted!(&in(cx) let mut error = UndefinedValue());
181            self.readable.get_stored_error(error.handle_mut());
182            self.controller
183                .get_finish_promise()
184                .expect("finish promise is not set")
185                .reject_native(cx, &error.handle());
186        } else {
187            // Otherwise:
188            // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], reason).
189            rooted!(&in(cx) let mut reason = UndefinedValue());
190            reason.set(self.reason.get());
191            self.readable
192                .get_default_controller()
193                .error(cx, reason.handle());
194
195            // Resolve controller.[[finishPromise]] with undefined.
196            self.controller
197                .get_finish_promise()
198                .expect("finish promise is not set")
199                .resolve_native(cx, &());
200        }
201    }
202}
203
204impl js::gc::Rootable for CancelPromiseRejection {}
205
206/// Reacting to rejection of cancelpromise as part of
207/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
208#[derive(JSTraceable, MallocSizeOf)]
209#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
210struct CancelPromiseRejection {
211    readable: Dom<ReadableStream>,
212    controller: Dom<TransformStreamDefaultController>,
213}
214
215impl Callback for CancelPromiseRejection {
216    /// Reacting to backpressureChangePromise with the following fulfillment steps:
217    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
218        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
219        self.readable.get_default_controller().error(cx, v);
220
221        // Reject controller.[[finishPromise]] with r.
222        self.controller
223            .get_finish_promise()
224            .expect("finish promise is not set")
225            .reject(cx, v);
226    }
227}
228
229impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
230
231/// Reacting to fulfillment of the cancelpromise as part of
232/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
233#[derive(JSTraceable, MallocSizeOf)]
234#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
235struct SourceCancelPromiseFulfillment {
236    writeable: Dom<WritableStream>,
237    controller: Dom<TransformStreamDefaultController>,
238    stream: Dom<TransformStream>,
239    #[ignore_malloc_size_of = "mozjs"]
240    reason: Box<Heap<JSVal>>,
241}
242
243impl Callback for SourceCancelPromiseFulfillment {
244    /// Reacting to backpressureChangePromise with the following fulfillment steps:
245    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
246        // If cancelPromise was fulfilled, then:
247        let finish_promise = self
248            .controller
249            .get_finish_promise()
250            .expect("finish promise is not set");
251
252        let global = &self.writeable.global();
253        // If writable.[[state]] is "errored", reject controller.[[finishPromise]] with writable.[[storedError]].
254        if self.writeable.is_errored() {
255            rooted!(&in(cx) let mut error = UndefinedValue());
256            self.writeable.get_stored_error(error.handle_mut());
257            finish_promise.reject(cx, error.handle());
258        } else {
259            // Otherwise:
260            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], reason).
261            rooted!(&in(cx) let mut reason = UndefinedValue());
262            reason.set(self.reason.get());
263            self.writeable
264                .get_default_controller()
265                .error_if_needed(cx, reason.handle(), global);
266
267            // Perform ! TransformStreamUnblockWrite(stream).
268            self.stream.unblock_write(cx, global);
269
270            // Resolve controller.[[finishPromise]] with undefined.
271            finish_promise.resolve_native(cx, &());
272        }
273    }
274}
275
276impl js::gc::Rootable for SourceCancelPromiseRejection {}
277
278/// Reacting to rejection of cancelpromise as part of
279/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
280#[derive(JSTraceable, MallocSizeOf)]
281#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
282struct SourceCancelPromiseRejection {
283    writeable: Dom<WritableStream>,
284    controller: Dom<TransformStreamDefaultController>,
285    stream: Dom<TransformStream>,
286}
287
288impl Callback for SourceCancelPromiseRejection {
289    /// Reacting to backpressureChangePromise with the following fulfillment steps:
290    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
291        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], r).
292        let global = &self.writeable.global();
293
294        self.writeable
295            .get_default_controller()
296            .error_if_needed(cx, v, global);
297
298        // Perform ! TransformStreamUnblockWrite(stream).
299        self.stream.unblock_write(cx, global);
300
301        // Reject controller.[[finishPromise]] with r.
302        self.controller
303            .get_finish_promise()
304            .expect("finish promise is not set")
305            .reject(cx, v);
306    }
307}
308
309impl js::gc::Rootable for FlushPromiseFulfillment {}
310
311/// Reacting to fulfillment of the flushpromise as part of
312/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
313#[derive(JSTraceable, MallocSizeOf)]
314#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
315struct FlushPromiseFulfillment {
316    readable: Dom<ReadableStream>,
317    controller: Dom<TransformStreamDefaultController>,
318}
319
320impl Callback for FlushPromiseFulfillment {
321    /// Reacting to flushpromise with the following fulfillment steps:
322    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
323        // If flushPromise was fulfilled, then:
324        let finish_promise = self
325            .controller
326            .get_finish_promise()
327            .expect("finish promise is not set");
328
329        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
330        if self.readable.is_errored() {
331            rooted!(&in(cx) let mut error = UndefinedValue());
332            self.readable.get_stored_error(error.handle_mut());
333            finish_promise.reject(cx, error.handle());
334        } else {
335            // Otherwise:
336            // Perform ! ReadableStreamDefaultControllerClose(readable.[[controller]]).
337            self.readable.get_default_controller().close(cx);
338
339            // Resolve controller.[[finishPromise]] with undefined.
340            finish_promise.resolve_native(cx, &());
341        }
342    }
343}
344
345impl js::gc::Rootable for FlushPromiseRejection {}
346/// Reacting to rejection of flushpromise as part of
347/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
348
349#[derive(JSTraceable, MallocSizeOf)]
350#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
351struct FlushPromiseRejection {
352    readable: Dom<ReadableStream>,
353    controller: Dom<TransformStreamDefaultController>,
354}
355
356impl Callback for FlushPromiseRejection {
357    /// Reacting to flushpromise with the following fulfillment steps:
358    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
359        // If flushPromise was rejected with reason r, then:
360        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
361        self.readable.get_default_controller().error(cx, v);
362
363        // Reject controller.[[finishPromise]] with r.
364        self.controller
365            .get_finish_promise()
366            .expect("finish promise is not set")
367            .reject(cx, v);
368    }
369}
370
impl js::gc::Rootable for CrossRealmTransform {}

/// A wrapper to handle `message` and `messageerror` events
/// for the message port used by the transferred stream.
///
/// Each variant wraps the handler for one side of the stream.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum CrossRealmTransform {
    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
    Readable(CrossRealmTransformReadable),
    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
    Writable(CrossRealmTransformWritable),
}
383
/// <https://streams.spec.whatwg.org/#ts-class>
///
/// Every field below mirrors the internal slot of the same name in the spec;
/// the optional slots start out unset (see `new_inherited`).
#[dom_struct]
pub struct TransformStream {
    /// The DOM reflector for this object.
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#transformstream-backpressure>
    backpressure: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#transformstream-backpressurechangepromise>
    #[conditional_malloc_size_of]
    backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,

    /// <https://streams.spec.whatwg.org/#transformstream-controller>
    controller: MutNullableDom<TransformStreamDefaultController>,

    /// <https://streams.spec.whatwg.org/#transformstream-detached>
    detached: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#transformstream-readable>
    readable: MutNullableDom<ReadableStream>,

    /// <https://streams.spec.whatwg.org/#transformstream-writable>
    writable: MutNullableDom<WritableStream>,
}
408
409impl TransformStream {
410    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
411    fn new_inherited() -> TransformStream {
412        TransformStream {
413            reflector_: Reflector::new(),
414            backpressure: Default::default(),
415            backpressure_change_promise: DomRefCell::new(None),
416            controller: MutNullableDom::new(None),
417            detached: Cell::new(false),
418            readable: MutNullableDom::new(None),
419            writable: MutNullableDom::new(None),
420        }
421    }
422
423    pub(crate) fn new_with_proto(
424        cx: &mut JSContext,
425        global: &GlobalScope,
426        proto: Option<SafeHandleObject>,
427    ) -> DomRoot<TransformStream> {
428        reflect_dom_object_with_proto_and_cx(
429            Box::new(TransformStream::new_inherited()),
430            global,
431            proto,
432            cx,
433        )
434    }
435
436    /// Creates and set up the newly created transform stream following
437    /// <https://streams.spec.whatwg.org/#transformstream-set-up>
438    pub(crate) fn set_up(
439        &self,
440        cx: &mut JSContext,
441        global: &GlobalScope,
442        transformer_type: TransformerType,
443    ) -> Fallible<()> {
444        // Step1. Let writableHighWaterMark be 1.
445        let writable_high_water_mark = 1.0;
446
447        // Step 2. Let writableSizeAlgorithm be an algorithm that returns 1.
448        let writable_size_algorithm = extract_size_algorithm(cx, &Default::default());
449
450        // Step 3. Let readableHighWaterMark be 0.
451        let readable_high_water_mark = 0.0;
452
453        // Step 4. Let readableSizeAlgorithm be an algorithm that returns 1.
454        let readable_size_algorithm = extract_size_algorithm(cx, &Default::default());
455
456        // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
457        // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
458        // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
459        // NOTE: These steps are implemented in `TransformStreamDefaultController::new`
460
461        // Step 8. Let startPromise be a promise resolved with undefined.
462        let start_promise = Promise::new_resolved(cx, global, ());
463
464        // Step 9. Perform ! InitializeTransformStream(stream, startPromise,
465        // writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark,
466        // readableSizeAlgorithm).
467        self.initialize(
468            cx,
469            global,
470            start_promise,
471            writable_high_water_mark,
472            writable_size_algorithm,
473            readable_high_water_mark,
474            readable_size_algorithm,
475        )?;
476
477        // Step 10. Let controller be a new TransformStreamDefaultController.
478        let controller = TransformStreamDefaultController::new(cx, global, transformer_type);
479
480        // Step 11. Perform ! SetUpTransformStreamDefaultController(stream,
481        // controller, transformAlgorithmWrapper, flushAlgorithmWrapper,
482        // cancelAlgorithmWrapper).
483        self.set_up_transform_stream_default_controller(&controller);
484
485        Ok(())
486    }
487
488    pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
489        self.controller.get().expect("controller is not set")
490    }
491
492    pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
493        self.writable.get().expect("writable stream is not set")
494    }
495
496    pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
497        self.readable.get().expect("readable stream is not set")
498    }
499
500    pub(crate) fn get_backpressure(&self) -> bool {
501        self.backpressure.get()
502    }
503
504    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
505    #[expect(clippy::too_many_arguments)]
506    fn initialize(
507        &self,
508        cx: &mut JSContext,
509        global: &GlobalScope,
510        start_promise: Rc<Promise>,
511        writable_high_water_mark: f64,
512        writable_size_algorithm: Rc<QueuingStrategySize>,
513        readable_high_water_mark: f64,
514        readable_size_algorithm: Rc<QueuingStrategySize>,
515    ) -> Fallible<()> {
516        // Let startAlgorithm be an algorithm that returns startPromise.
517        // Let writeAlgorithm be the following steps, taking a chunk argument:
518        //  Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
519        // Let abortAlgorithm be the following steps, taking a reason argument:
520        //  Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
521        // Let closeAlgorithm be the following steps:
522        //  Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
523        // Set stream.[[writable]] to ! CreateWritableStream(startAlgorithm, writeAlgorithm,
524        // closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm).
525        // Note: Those steps are implemented using UnderlyingSinkType::Transform.
526
527        let writable = create_writable_stream(
528            cx,
529            global,
530            writable_high_water_mark,
531            writable_size_algorithm,
532            UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
533        )?;
534        self.writable.set(Some(&writable));
535
536        // Let pullAlgorithm be the following steps:
537
538        // Return ! TransformStreamDefaultSourcePullAlgorithm(stream).
539
540        // Let cancelAlgorithm be the following steps, taking a reason argument:
541
542        // Return ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason).
543
544        // Set stream.[[readable]] to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
545        // cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
546
547        let readable = create_readable_stream(
548            cx,
549            global,
550            UnderlyingSourceType::Transform(self, start_promise),
551            Some(readable_size_algorithm),
552            Some(readable_high_water_mark),
553        );
554        self.readable.set(Some(&readable));
555
556        // Set stream.[[backpressure]] and stream.[[backpressureChangePromise]] to undefined.
557        // Note: This is done in the constructor.
558
559        // Perform ! TransformStreamSetBackpressure(stream, true).
560        self.set_backpressure(cx, global, true);
561
562        // Set stream.[[controller]] to undefined.
563        self.controller.set(None);
564
565        Ok(())
566    }
567
568    /// <https://streams.spec.whatwg.org/#transform-stream-set-backpressure>
569    pub(crate) fn set_backpressure(
570        &self,
571        cx: &mut JSContext,
572        global: &GlobalScope,
573        backpressure: bool,
574    ) {
575        // Assert: stream.[[backpressure]] is not backpressure.
576        assert!(self.backpressure.get() != backpressure);
577
578        // If stream.[[backpressureChangePromise]] is not undefined, resolve
579        // stream.[[backpressureChangePromise]] with undefined.
580        if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
581            promise.resolve_native(cx, &());
582        }
583
584        // Set stream.[[backpressureChangePromise]] to a new promise.;
585        *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(cx, global));
586
587        // Set stream.[[backpressure]] to backpressure.
588        self.backpressure.set(backpressure);
589    }
590
591    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller>
592    fn set_up_transform_stream_default_controller(
593        &self,
594        controller: &TransformStreamDefaultController,
595    ) {
596        // Assert: stream implements TransformStream.
597        // Note: this is checked with type.
598
599        // Assert: stream.[[controller]] is undefined.
600        assert!(self.controller.get().is_none());
601
602        // Set controller.[[stream]] to stream.
603        controller.set_stream(self);
604
605        // Set stream.[[controller]] to controller.
606        self.controller.set(Some(controller));
607
608        // Set controller.[[transformAlgorithm]] to transformAlgorithm.
609        // Set controller.[[flushAlgorithm]] to flushAlgorithm.
610        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
611        // Note: These are set in the constructor.
612    }
613
614    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
615    fn set_up_transform_stream_default_controller_from_transformer(
616        &self,
617        cx: &mut JSContext,
618        global: &GlobalScope,
619        transformer_obj: SafeHandleObject,
620        transformer: &Transformer,
621    ) {
622        // Let controller be a new TransformStreamDefaultController.
623        let transformer_type = TransformerType::new_from_js_transformer(transformer);
624        let controller = TransformStreamDefaultController::new(cx, global, transformer_type);
625
626        // Let transformAlgorithm be the following steps, taking a chunk argument:
627        // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
628        // If result is an abrupt completion, return a promise rejected with result.[[Value]].
629        // Otherwise, return a promise resolved with undefined.
630
631        // Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
632        // Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
633
634        // If transformerDict["transform"] exists, set transformAlgorithm to an algorithm which
635        // takes an argument
636        // chunk and returns the result of invoking transformerDict["transform"] with argument
637        // list « chunk, controller »
638        // and callback this value transformer.
639
640        // If transformerDict["flush"] exists, set flushAlgorithm to an algorithm which returns
641        // the result
642        // of invoking transformerDict["flush"] with argument list « controller » and callback
643        // this value transformer.
644
645        // If transformerDict["cancel"] exists, set cancelAlgorithm to an algorithm which takes an argument
646        // reason and returns the result of invoking transformerDict["cancel"] with argument list « reason »
647        // and callback this value transformer.
648        controller.set_transform_obj(transformer_obj);
649
650        // Perform ! SetUpTransformStreamDefaultController(stream, controller,
651        // transformAlgorithm, flushAlgorithm, cancelAlgorithm).
652        self.set_up_transform_stream_default_controller(&controller);
653    }
654
655    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
656    pub(crate) fn transform_stream_default_sink_write_algorithm(
657        &self,
658        cx: &mut JSContext,
659        global: &GlobalScope,
660        chunk: SafeHandleValue,
661    ) -> Fallible<Rc<Promise>> {
662        // Assert: stream.[[writable]].[[state]] is "writable".
663        assert!(self.writable.get().is_some());
664
665        // Let controller be stream.[[controller]].
666        let controller = self.controller.get().expect("controller is not set");
667
668        // If stream.[[backpressure]] is true,
669        if self.backpressure.get() {
670            // Let backpressureChangePromise be stream.[[backpressureChangePromise]].
671            let backpressure_change_promise = self.backpressure_change_promise.borrow();
672
673            // Assert: backpressureChangePromise is not undefined.
674            assert!(backpressure_change_promise.is_some());
675
676            // Return the result of reacting to backpressureChangePromise with the following fulfillment steps:
677            let result_promise = Promise::new(cx, global);
678            rooted!(&in(cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
679                controller: Dom::from_ref(&controller),
680                writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
681                chunk: Heap::boxed(chunk.get()),
682                result_promise: result_promise.clone(),
683            }));
684
685            let handler = PromiseNativeHandler::new(
686                cx,
687                global,
688                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
689                Some(Box::new(BackpressureChangeRejection {
690                    result_promise: result_promise.clone(),
691                })),
692            );
693            let mut realm = enter_auto_realm(cx, global);
694            let realm = &mut realm.current_realm();
695            backpressure_change_promise
696                .as_ref()
697                .expect("Promise must be some by now.")
698                .append_native_handler(realm, &handler);
699
700            return Ok(result_promise);
701        }
702
703        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
704        controller.transform_stream_default_controller_perform_transform(cx, global, chunk)
705    }
706
707    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
708    pub(crate) fn transform_stream_default_sink_abort_algorithm(
709        &self,
710        cx: &mut JSContext,
711        global: &GlobalScope,
712        reason: SafeHandleValue,
713    ) -> Fallible<Rc<Promise>> {
714        // Let controller be stream.[[controller]].
715        let controller = self.controller.get().expect("controller is not set");
716
717        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
718        if let Some(finish_promise) = controller.get_finish_promise() {
719            return Ok(finish_promise);
720        }
721
722        // Let readable be stream.[[readable]].
723        let readable = self.readable.get().expect("readable stream is not set");
724
725        // Let controller.[[finishPromise]] be a new promise.
726        controller.set_finish_promise(Promise::new(cx, global));
727
728        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
729        let cancel_promise = controller.perform_cancel(cx, global, reason)?;
730
731        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
732        controller.clear_algorithms();
733
734        // React to cancelPromise:
735        let handler = PromiseNativeHandler::new(
736            cx,
737            global,
738            Some(Box::new(CancelPromiseFulfillment {
739                readable: Dom::from_ref(&readable),
740                controller: Dom::from_ref(&controller),
741                reason: Heap::boxed(reason.get()),
742            })),
743            Some(Box::new(CancelPromiseRejection {
744                readable: Dom::from_ref(&readable),
745                controller: Dom::from_ref(&controller),
746            })),
747        );
748        let mut realm = enter_auto_realm(cx, global);
749        let cx = &mut realm.current_realm();
750        cancel_promise.append_native_handler(cx, &handler);
751
752        // Return controller.[[finishPromise]].
753        let finish_promise = controller
754            .get_finish_promise()
755            .expect("finish promise is not set");
756        Ok(finish_promise)
757    }
758
759    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
760    pub(crate) fn transform_stream_default_sink_close_algorithm(
761        &self,
762        cx: &mut JSContext,
763        global: &GlobalScope,
764    ) -> Fallible<Rc<Promise>> {
765        // Let controller be stream.[[controller]].
766        let controller = self
767            .controller
768            .get()
769            .ok_or(Error::Type(c"controller is not set".to_owned()))?;
770
771        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
772        if let Some(finish_promise) = controller.get_finish_promise() {
773            return Ok(finish_promise);
774        }
775
776        // Let readable be stream.[[readable]].
777        let readable = self
778            .readable
779            .get()
780            .ok_or(Error::Type(c"readable stream is not set".to_owned()))?;
781
782        // Let controller.[[finishPromise]] be a new promise.
783        controller.set_finish_promise(Promise::new(cx, global));
784
785        // Let flushPromise be the result of performing controller.[[flushAlgorithm]].
786        let flush_promise = controller.perform_flush(cx, global)?;
787
788        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
789        controller.clear_algorithms();
790
791        // React to flushPromise:
792        let handler = PromiseNativeHandler::new(
793            cx,
794            global,
795            Some(Box::new(FlushPromiseFulfillment {
796                readable: Dom::from_ref(&readable),
797                controller: Dom::from_ref(&controller),
798            })),
799            Some(Box::new(FlushPromiseRejection {
800                readable: Dom::from_ref(&readable),
801                controller: Dom::from_ref(&controller),
802            })),
803        );
804
805        let mut realm = enter_auto_realm(cx, global);
806        let realm = &mut realm.current_realm();
807        flush_promise.append_native_handler(realm, &handler);
808        // Return controller.[[finishPromise]].
809        let finish_promise = controller
810            .get_finish_promise()
811            .expect("finish promise is not set");
812        Ok(finish_promise)
813    }
814
815    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
816    pub(crate) fn transform_stream_default_source_cancel(
817        &self,
818        cx: &mut JSContext,
819        global: &GlobalScope,
820        reason: SafeHandleValue,
821    ) -> Fallible<Rc<Promise>> {
822        // Let controller be stream.[[controller]].
823        let controller = self
824            .controller
825            .get()
826            .ok_or(Error::Type(c"controller is not set".to_owned()))?;
827
828        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
829        if let Some(finish_promise) = controller.get_finish_promise() {
830            return Ok(finish_promise);
831        }
832
833        // Let writable be stream.[[writable]].
834        let writable = self
835            .writable
836            .get()
837            .ok_or(Error::Type(c"writable stream is not set".to_owned()))?;
838
839        // Let controller.[[finishPromise]] be a new promise.
840        controller.set_finish_promise(Promise::new(cx, global));
841
842        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
843        let cancel_promise = controller.perform_cancel(cx, global, reason)?;
844
845        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
846        controller.clear_algorithms();
847
848        // React to cancelPromise:
849        let handler = PromiseNativeHandler::new(
850            cx,
851            global,
852            Some(Box::new(SourceCancelPromiseFulfillment {
853                writeable: Dom::from_ref(&writable),
854                controller: Dom::from_ref(&controller),
855                stream: Dom::from_ref(self),
856                reason: Heap::boxed(reason.get()),
857            })),
858            Some(Box::new(SourceCancelPromiseRejection {
859                writeable: Dom::from_ref(&writable),
860                controller: Dom::from_ref(&controller),
861                stream: Dom::from_ref(self),
862            })),
863        );
864
865        // Return controller.[[finishPromise]].
866        let finish_promise = controller
867            .get_finish_promise()
868            .expect("finish promise is not set");
869        let mut realm = enter_auto_realm(cx, global);
870        let cx = &mut realm.current_realm();
871        cancel_promise.append_native_handler(cx, &handler);
872        Ok(finish_promise)
873    }
874
875    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-pull>
876    pub(crate) fn transform_stream_default_source_pull(
877        &self,
878        cx: &mut JSContext,
879        global: &GlobalScope,
880    ) -> Fallible<Rc<Promise>> {
881        // Assert: stream.[[backpressure]] is true.
882        assert!(self.backpressure.get());
883
884        // Assert: stream.[[backpressureChangePromise]] is not undefined.
885        assert!(self.backpressure_change_promise.borrow().is_some());
886
887        // Perform ! TransformStreamSetBackpressure(stream, false).
888        self.set_backpressure(cx, global, false);
889
890        // Return stream.[[backpressureChangePromise]].
891        Ok(self
892            .backpressure_change_promise
893            .borrow()
894            .clone()
895            .expect("Promise must be some by now."))
896    }
897
898    /// <https://streams.spec.whatwg.org/#transform-stream-error-writable-and-unblock-write>
899    pub(crate) fn error_writable_and_unblock_write(
900        &self,
901        cx: &mut JSContext,
902        global: &GlobalScope,
903        error: SafeHandleValue,
904    ) {
905        // Perform ! TransformStreamDefaultControllerClearAlgorithms(stream.[[controller]]).
906        self.get_controller().clear_algorithms();
907
908        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[controller]], e).
909        self.get_writable()
910            .get_default_controller()
911            .error_if_needed(cx, error, global);
912
913        // Perform ! TransformStreamUnblockWrite(stream).
914        self.unblock_write(cx, global)
915    }
916
917    /// <https://streams.spec.whatwg.org/#transform-stream-unblock-write>
918    pub(crate) fn unblock_write(&self, cx: &mut JSContext, global: &GlobalScope) {
919        // If stream.[[backpressure]] is true, perform ! TransformStreamSetBackpressure(stream, false).
920        if self.backpressure.get() {
921            self.set_backpressure(cx, global, false);
922        }
923    }
924
925    /// <https://streams.spec.whatwg.org/#transform-stream-error>
926    pub(crate) fn error(&self, cx: &mut JSContext, global: &GlobalScope, error: SafeHandleValue) {
927        // Perform ! ReadableStreamDefaultControllerError(stream.[[readable]].[[controller]], e).
928        self.get_readable()
929            .get_default_controller()
930            .error(cx, error);
931
932        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, e).
933        self.error_writable_and_unblock_write(cx, global, error);
934    }
935}
936
937impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
938    /// <https://streams.spec.whatwg.org/#ts-constructor>
939    #[expect(unsafe_code)]
940    fn Constructor(
941        cx: &mut JSContext,
942        global: &GlobalScope,
943        proto: Option<SafeHandleObject>,
944        transformer: Option<*mut JSObject>,
945        writable_strategy: &QueuingStrategy,
946        readable_strategy: &QueuingStrategy,
947    ) -> Fallible<DomRoot<TransformStream>> {
948        // If transformer is missing, set it to null.
949        rooted!(&in(cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
950
951        // Let underlyingSinkDict be underlyingSink,
952        // converted to an IDL value of type UnderlyingSink.
953        let transformer_dict = if !transformer_obj.is_null() {
954            rooted!(&in(cx) let obj_val = ObjectValue(transformer_obj.get()));
955            match Transformer::new(cx, obj_val.handle()) {
956                Ok(ConversionResult::Success(val)) => val,
957                Ok(ConversionResult::Failure(error)) => {
958                    return Err(Error::Type(error.into_owned()));
959                },
960                _ => {
961                    return Err(Error::JSFailed);
962                },
963            }
964        } else {
965            Transformer::empty()
966        };
967
968        // If transformerDict["readableType"] exists, throw a RangeError exception.
969        if !transformer_dict.readableType.handle().is_undefined() {
970            return Err(Error::Range(c"readableType is set".to_owned()));
971        }
972
973        // If transformerDict["writableType"] exists, throw a RangeError exception.
974        if !transformer_dict.writableType.handle().is_undefined() {
975            return Err(Error::Range(c"writableType is set".to_owned()));
976        }
977
978        // Let readableHighWaterMark be ? ExtractHighWaterMark(readableStrategy, 0).
979        let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
980
981        // Let readableSizeAlgorithm be ! ExtractSizeAlgorithm(readableStrategy).
982        let readable_size_algorithm = extract_size_algorithm(cx, readable_strategy);
983
984        // Let writableHighWaterMark be ? ExtractHighWaterMark(writableStrategy, 1).
985        let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
986
987        // Let writableSizeAlgorithm be ! ExtractSizeAlgorithm(writableStrategy).
988        let writable_size_algorithm = extract_size_algorithm(cx, writable_strategy);
989
990        // Let startPromise be a new promise.
991        let start_promise = Promise::new(cx, global);
992
993        // Perform ! InitializeTransformStream(this, startPromise, writableHighWaterMark,
994        // writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
995        let stream = TransformStream::new_with_proto(cx, global, proto);
996        stream.initialize(
997            cx,
998            global,
999            start_promise.clone(),
1000            writable_high_water_mark,
1001            writable_size_algorithm,
1002            readable_high_water_mark,
1003            readable_size_algorithm,
1004        )?;
1005
1006        // Perform ? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict).
1007        stream.set_up_transform_stream_default_controller_from_transformer(
1008            cx,
1009            global,
1010            transformer_obj.handle(),
1011            &transformer_dict,
1012        );
1013
1014        // If transformerDict["start"] exists, then resolve startPromise with the
1015        // result of invoking transformerDict["start"]
1016        // with argument list « this.[[controller]] » and callback this value transformer.
1017        if let Some(start) = &transformer_dict.start {
1018            rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
1019            rooted!(&in(cx) let mut result: JSVal);
1020            rooted!(&in(cx) let this_object = transformer_obj.get());
1021            start.Call_(
1022                cx,
1023                &this_object.handle(),
1024                &stream.get_controller(),
1025                result.handle_mut(),
1026                ExceptionHandling::Rethrow,
1027            )?;
1028            let is_promise = unsafe {
1029                if result.is_object() {
1030                    result_object.set(result.to_object());
1031                    IsPromiseObject(result_object.handle().into_handle())
1032                } else {
1033                    false
1034                }
1035            };
1036            let promise = if is_promise {
1037                Promise::new_with_js_promise(cx, result_object.handle())
1038            } else {
1039                Promise::new_resolved(cx, global, result.get())
1040            };
1041            start_promise.resolve_native(cx, &promise);
1042        } else {
1043            // Otherwise, resolve startPromise with undefined.
1044            start_promise.resolve_native(cx, &());
1045        };
1046
1047        Ok(stream)
1048    }
1049
1050    /// <https://streams.spec.whatwg.org/#ts-readable>
1051    fn Readable(&self) -> DomRoot<ReadableStream> {
1052        // Return this.[[readable]].
1053        self.readable.get().expect("readable stream is not set")
1054    }
1055
1056    /// <https://streams.spec.whatwg.org/#ts-writable>
1057    fn Writable(&self) -> DomRoot<WritableStream> {
1058        // Return this.[[writable]].
1059        self.writable.get().expect("writable stream is not set")
1060    }
1061}
1062
1063/// <https://streams.spec.whatwg.org/#ts-transfer>
1064impl Transferable for TransformStream {
1065    type Index = MessagePortIndex;
1066    type Data = TransformStreamData;
1067
1068    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps②>
1069    fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, TransformStreamData)> {
1070        let global = self.global();
1071        let mut realm = enter_auto_realm(cx, &*global);
1072        let mut realm = realm.current_realm();
1073        let cx = &mut realm;
1074
1075        // Step 1. Let readable be value.[[readable]].
1076        let readable = self.get_readable();
1077
1078        // Step 2. Let writable be value.[[writable]].
1079        let writable = self.get_writable();
1080
1081        // Step 3. If ! IsReadableStreamLocked(readable) is true, throw a
1082        // "DataCloneError" DOMException.
1083        // Step 4. If ! IsWritableStreamLocked(writable) is true, throw a
1084        // "DataCloneError" DOMException.
1085        if readable.is_locked() || writable.is_locked() {
1086            return Err(Error::DataClone(None));
1087        }
1088
1089        // First port pair (readable → proxy writable)
1090        let port1 = MessagePort::new(cx, &global);
1091        global.track_message_port(&port1, None);
1092        let port1_peer = MessagePort::new(cx, &global);
1093        global.track_message_port(&port1_peer, None);
1094        global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1095
1096        let proxy_readable = ReadableStream::new_with_proto(cx, &global, None);
1097        proxy_readable.setup_cross_realm_transform_readable(cx, &port1);
1098        proxy_readable
1099            .pipe_to(cx, &global, &writable, false, false, false, None)
1100            .set_promise_is_handled(cx);
1101
1102        // Second port pair (proxy readable → writable)
1103        let port2 = MessagePort::new(cx, &global);
1104        global.track_message_port(&port2, None);
1105        let port2_peer = MessagePort::new(cx, &global);
1106        global.track_message_port(&port2_peer, None);
1107        global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1108
1109        let proxy_writable = WritableStream::new_with_proto(cx, &global, None);
1110        proxy_writable.setup_cross_realm_transform_writable(cx, &port2);
1111
1112        // Pipe readable into the proxy writable (→ port_1)
1113        readable
1114            .pipe_to(cx, &global, &proxy_writable, false, false, false, None)
1115            .set_promise_is_handled(cx);
1116
1117        // Step 5. Set dataHolder.[[readable]] to !
1118        // StructuredSerializeWithTransfer(readable, « readable »).
1119        // Step 6. Set dataHolder.[[writable]] to !
1120        // StructuredSerializeWithTransfer(writable, « writable »).
1121        Ok((
1122            *port1_peer.message_port_id(),
1123            TransformStreamData {
1124                readable: port1_peer.transfer(cx)?,
1125                writable: port2_peer.transfer(cx)?,
1126            },
1127        ))
1128    }
1129
1130    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps②>
1131    fn transfer_receive(
1132        cx: &mut JSContext,
1133        owner: &GlobalScope,
1134        _id: MessagePortId,
1135        data: TransformStreamData,
1136    ) -> Result<DomRoot<Self>, ()> {
1137        let port1 = MessagePort::transfer_receive(cx, owner, data.readable.0, data.readable.1)?;
1138        let port2 = MessagePort::transfer_receive(cx, owner, data.writable.0, data.writable.1)?;
1139
1140        // Step 1. Let readableRecord be !
1141        // StructuredDeserializeWithTransfer(dataHolder.[[readable]], the
1142        // current Realm).
1143        let proxy_readable = ReadableStream::new_with_proto(cx, owner, None);
1144        proxy_readable.setup_cross_realm_transform_readable(cx, &port2);
1145
1146        // Step 2. Let writableRecord be !
1147        // StructuredDeserializeWithTransfer(dataHolder.[[writable]], the
1148        // current Realm).
1149        let proxy_writable = WritableStream::new_with_proto(cx, owner, None);
1150        proxy_writable.setup_cross_realm_transform_writable(cx, &port1);
1151
1152        // Step 3. Set value.[[readable]] to readableRecord.[[Deserialized]].
1153        // Step 4. Set value.[[writable]] to writableRecord.[[Deserialized]].
1154        // Step 5. Set value.[[backpressure]],
1155        // value.[[backpressureChangePromise]], and value.[[controller]] to
1156        // undefined.
1157        let stream = TransformStream::new_with_proto(cx, owner, None);
1158        stream.readable.set(Some(&proxy_readable));
1159        stream.writable.set(Some(&proxy_writable));
1160
1161        Ok(stream)
1162    }
1163
1164    fn serialized_storage<'a>(
1165        data: StructuredData<'a, '_>,
1166    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1167        match data {
1168            StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1169            StructuredData::Writer(w) => &mut w.transform_streams_port,
1170        }
1171    }
1172}