script/dom/stream/
transformstream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, ObjectValue, UndefinedValue};
12use js::realm::CurrentRealm;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
14use rustc_hash::FxHashMap;
15use script_bindings::callback::ExceptionHandling;
16use script_bindings::realms::InRealm;
17use servo_base::id::{MessagePortId, MessagePortIndex};
18use servo_constellation_traits::TransformStreamData;
19
20use super::readablestream::CrossRealmTransformReadable;
21use super::writablestream::CrossRealmTransformWritable;
22use crate::dom::bindings::cell::DomRefCell;
23use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
24    QueuingStrategy, QueuingStrategySize,
25};
26use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
27use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
28use crate::dom::bindings::conversions::ConversionResult;
29use crate::dom::bindings::error::{Error, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::structuredclone::StructuredData;
33use crate::dom::bindings::transferable::Transferable;
34use crate::dom::globalscope::GlobalScope;
35use crate::dom::messageport::MessagePort;
36use crate::dom::promise::Promise;
37use crate::dom::promisenativehandler::Callback;
38use crate::dom::readablestream::{ReadableStream, create_readable_stream};
39use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
40use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
41use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
42use crate::dom::stream::writablestream::create_writable_stream;
43use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
44use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
45use crate::realms::{enter_auto_realm, enter_realm};
46use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
47
48impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}
49
50/// Reacting to backpressureChangePromise as part of
51/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
52#[derive(JSTraceable, MallocSizeOf)]
53#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
54struct TransformBackPressureChangePromiseFulfillment {
55    /// The result of reacting to backpressureChangePromise.
56    #[conditional_malloc_size_of]
57    result_promise: Rc<Promise>,
58
59    #[ignore_malloc_size_of = "mozjs"]
60    chunk: Box<Heap<JSVal>>,
61
62    /// The writable used in the fulfillment steps
63    writable: Dom<WritableStream>,
64
65    controller: Dom<TransformStreamDefaultController>,
66}
67
68impl Callback for TransformBackPressureChangePromiseFulfillment {
69    /// Reacting to backpressureChangePromise with the following fulfillment steps:
70    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
71        // Let writable be stream.[[writable]].
72        // Let state be writable.[[state]].
73        // If state is "erroring", throw writable.[[storedError]].
74        if self.writable.is_erroring() {
75            rooted!(&in(cx) let mut error = UndefinedValue());
76            self.writable.get_stored_error(error.handle_mut());
77            self.result_promise
78                .reject(cx.into(), error.handle(), CanGc::from_cx(cx));
79            return;
80        }
81
82        // Assert: state is "writable".
83        assert!(self.writable.is_writable());
84
85        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
86        rooted!(&in(cx) let mut chunk = UndefinedValue());
87        chunk.set(self.chunk.get());
88        let transform_result = self
89            .controller
90            .transform_stream_default_controller_perform_transform(
91                cx,
92                &self.writable.global(),
93                chunk.handle(),
94            )
95            .expect("perform transform failed");
96
97        // PerformTransformFulfillment and PerformTransformRejection do not need
98        // to be rooted because they only contain an Rc.
99        let handler = PromiseNativeHandler::new(
100            &self.writable.global(),
101            Some(Box::new(PerformTransformFulfillment {
102                result_promise: self.result_promise.clone(),
103            })),
104            Some(Box::new(PerformTransformRejection {
105                result_promise: self.result_promise.clone(),
106            })),
107            CanGc::from_cx(cx),
108        );
109
110        let mut realm = enter_auto_realm(cx, &*self.writable.global());
111        let realm = &mut realm.current_realm();
112        let in_realm_proof = realm.into();
113        let comp = InRealm::Already(&in_realm_proof);
114        transform_result.append_native_handler(&handler, comp, CanGc::from_cx(realm));
115    }
116}
117
118#[derive(JSTraceable, MallocSizeOf)]
119#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
120/// Reacting to fulfillment of performTransform as part of
121/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
122struct PerformTransformFulfillment {
123    #[conditional_malloc_size_of]
124    result_promise: Rc<Promise>,
125}
126
127impl Callback for PerformTransformFulfillment {
128    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
129        let can_gc = CanGc::from_cx(cx);
130        // Fulfilled: resolve the outer promise
131        self.result_promise.resolve_native(&(), can_gc);
132    }
133}
134
135#[derive(JSTraceable, MallocSizeOf)]
136#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
137/// Reacting to rejection of performTransform as part of
138/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
139struct PerformTransformRejection {
140    #[conditional_malloc_size_of]
141    result_promise: Rc<Promise>,
142}
143
144impl Callback for PerformTransformRejection {
145    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
146        let can_gc = CanGc::from_cx(cx);
147        // Stream already errored in perform_transform, just reject result_promise
148        self.result_promise.reject(cx.into(), v, can_gc);
149    }
150}
151
152#[derive(JSTraceable, MallocSizeOf)]
153#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
154/// Reacting to rejection of backpressureChangePromise as part of
155/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
156struct BackpressureChangeRejection {
157    #[conditional_malloc_size_of]
158    result_promise: Rc<Promise>,
159}
160
161impl Callback for BackpressureChangeRejection {
162    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
163        let can_gc = CanGc::from_cx(cx);
164        self.result_promise.reject(cx.into(), reason, can_gc);
165    }
166}
167
168impl js::gc::Rootable for CancelPromiseFulfillment {}
169
170/// Reacting to fulfillment of the cancelpromise as part of
171/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
172#[derive(JSTraceable, MallocSizeOf)]
173#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
174struct CancelPromiseFulfillment {
175    readable: Dom<ReadableStream>,
176    controller: Dom<TransformStreamDefaultController>,
177    #[ignore_malloc_size_of = "mozjs"]
178    reason: Box<Heap<JSVal>>,
179}
180
181impl Callback for CancelPromiseFulfillment {
182    /// Reacting to backpressureChangePromise with the following fulfillment steps:
183    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
184        let can_gc = CanGc::from_cx(cx);
185        let cx: SafeJSContext = cx.into();
186        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
187        if self.readable.is_errored() {
188            rooted!(in(*cx) let mut error = UndefinedValue());
189            self.readable.get_stored_error(error.handle_mut());
190            self.controller
191                .get_finish_promise()
192                .expect("finish promise is not set")
193                .reject_native(&error.handle(), can_gc);
194        } else {
195            // Otherwise:
196            // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], reason).
197            rooted!(in(*cx) let mut reason = UndefinedValue());
198            reason.set(self.reason.get());
199            self.readable
200                .get_default_controller()
201                .error(reason.handle(), can_gc);
202
203            // Resolve controller.[[finishPromise]] with undefined.
204            self.controller
205                .get_finish_promise()
206                .expect("finish promise is not set")
207                .resolve_native(&(), can_gc);
208        }
209    }
210}
211
212impl js::gc::Rootable for CancelPromiseRejection {}
213
214/// Reacting to rejection of cancelpromise as part of
215/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
216#[derive(JSTraceable, MallocSizeOf)]
217#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
218struct CancelPromiseRejection {
219    readable: Dom<ReadableStream>,
220    controller: Dom<TransformStreamDefaultController>,
221}
222
223impl Callback for CancelPromiseRejection {
224    /// Reacting to backpressureChangePromise with the following fulfillment steps:
225    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
226        let can_gc = CanGc::from_cx(cx);
227        let cx: SafeJSContext = cx.into();
228        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
229        self.readable.get_default_controller().error(v, can_gc);
230
231        // Reject controller.[[finishPromise]] with r.
232        self.controller
233            .get_finish_promise()
234            .expect("finish promise is not set")
235            .reject(cx, v, can_gc);
236    }
237}
238
239impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
240
241/// Reacting to fulfillment of the cancelpromise as part of
242/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
243#[derive(JSTraceable, MallocSizeOf)]
244#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
245struct SourceCancelPromiseFulfillment {
246    writeable: Dom<WritableStream>,
247    controller: Dom<TransformStreamDefaultController>,
248    stream: Dom<TransformStream>,
249    #[ignore_malloc_size_of = "mozjs"]
250    reason: Box<Heap<JSVal>>,
251}
252
253impl Callback for SourceCancelPromiseFulfillment {
254    /// Reacting to backpressureChangePromise with the following fulfillment steps:
255    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
256        // If cancelPromise was fulfilled, then:
257        let finish_promise = self
258            .controller
259            .get_finish_promise()
260            .expect("finish promise is not set");
261
262        let global = &self.writeable.global();
263        // If writable.[[state]] is "errored", reject controller.[[finishPromise]] with writable.[[storedError]].
264        if self.writeable.is_errored() {
265            rooted!(&in(cx) let mut error = UndefinedValue());
266            self.writeable.get_stored_error(error.handle_mut());
267            finish_promise.reject(cx.into(), error.handle(), CanGc::from_cx(cx));
268        } else {
269            // Otherwise:
270            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], reason).
271            rooted!(&in(cx) let mut reason = UndefinedValue());
272            reason.set(self.reason.get());
273            self.writeable
274                .get_default_controller()
275                .error_if_needed(cx, reason.handle(), global);
276
277            // Perform ! TransformStreamUnblockWrite(stream).
278            self.stream.unblock_write(global, CanGc::from_cx(cx));
279
280            // Resolve controller.[[finishPromise]] with undefined.
281            finish_promise.resolve_native(&(), CanGc::from_cx(cx));
282        }
283    }
284}
285
286impl js::gc::Rootable for SourceCancelPromiseRejection {}
287
288/// Reacting to rejection of cancelpromise as part of
289/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
290#[derive(JSTraceable, MallocSizeOf)]
291#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
292struct SourceCancelPromiseRejection {
293    writeable: Dom<WritableStream>,
294    controller: Dom<TransformStreamDefaultController>,
295    stream: Dom<TransformStream>,
296}
297
298impl Callback for SourceCancelPromiseRejection {
299    /// Reacting to backpressureChangePromise with the following fulfillment steps:
300    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
301        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], r).
302        let global = &self.writeable.global();
303
304        self.writeable
305            .get_default_controller()
306            .error_if_needed(cx, v, global);
307
308        // Perform ! TransformStreamUnblockWrite(stream).
309        self.stream.unblock_write(global, CanGc::from_cx(cx));
310
311        // Reject controller.[[finishPromise]] with r.
312        self.controller
313            .get_finish_promise()
314            .expect("finish promise is not set")
315            .reject(cx.into(), v, CanGc::from_cx(cx));
316    }
317}
318
319impl js::gc::Rootable for FlushPromiseFulfillment {}
320
321/// Reacting to fulfillment of the flushpromise as part of
322/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
323#[derive(JSTraceable, MallocSizeOf)]
324#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
325struct FlushPromiseFulfillment {
326    readable: Dom<ReadableStream>,
327    controller: Dom<TransformStreamDefaultController>,
328}
329
330impl Callback for FlushPromiseFulfillment {
331    /// Reacting to flushpromise with the following fulfillment steps:
332    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
333        let can_gc = CanGc::from_cx(cx);
334        let cx: SafeJSContext = cx.into();
335        // If flushPromise was fulfilled, then:
336        let finish_promise = self
337            .controller
338            .get_finish_promise()
339            .expect("finish promise is not set");
340
341        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
342        if self.readable.is_errored() {
343            rooted!(in(*cx) let mut error = UndefinedValue());
344            self.readable.get_stored_error(error.handle_mut());
345            finish_promise.reject(cx, error.handle(), can_gc);
346        } else {
347            // Otherwise:
348            // Perform ! ReadableStreamDefaultControllerClose(readable.[[controller]]).
349            self.readable.get_default_controller().close(can_gc);
350
351            // Resolve controller.[[finishPromise]] with undefined.
352            finish_promise.resolve_native(&(), can_gc);
353        }
354    }
355}
356
357impl js::gc::Rootable for FlushPromiseRejection {}
358/// Reacting to rejection of flushpromise as part of
359/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
360
361#[derive(JSTraceable, MallocSizeOf)]
362#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
363struct FlushPromiseRejection {
364    readable: Dom<ReadableStream>,
365    controller: Dom<TransformStreamDefaultController>,
366}
367
368impl Callback for FlushPromiseRejection {
369    /// Reacting to flushpromise with the following fulfillment steps:
370    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
371        let can_gc = CanGc::from_cx(cx);
372        let cx: SafeJSContext = cx.into();
373        // If flushPromise was rejected with reason r, then:
374        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
375        self.readable.get_default_controller().error(v, can_gc);
376
377        // Reject controller.[[finishPromise]] with r.
378        self.controller
379            .get_finish_promise()
380            .expect("finish promise is not set")
381            .reject(cx, v, can_gc);
382    }
383}
384
385impl js::gc::Rootable for CrossRealmTransform {}
386
387/// A wrapper to handle `message` and `messageerror` events
388/// for the message port used by the transfered stream.
389#[derive(Clone, JSTraceable, MallocSizeOf)]
390#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
391pub(crate) enum CrossRealmTransform {
392    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
393    Readable(CrossRealmTransformReadable),
394    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
395    Writable(CrossRealmTransformWritable),
396}
397
398/// <https://streams.spec.whatwg.org/#ts-class>
399#[dom_struct]
400pub struct TransformStream {
401    reflector_: Reflector,
402
403    /// <https://streams.spec.whatwg.org/#transformstream-backpressure>
404    backpressure: Cell<bool>,
405
406    /// <https://streams.spec.whatwg.org/#transformstream-backpressurechangepromise>
407    #[conditional_malloc_size_of]
408    backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,
409
410    /// <https://streams.spec.whatwg.org/#transformstream-controller>
411    controller: MutNullableDom<TransformStreamDefaultController>,
412
413    /// <https://streams.spec.whatwg.org/#transformstream-detached>
414    detached: Cell<bool>,
415
416    /// <https://streams.spec.whatwg.org/#transformstream-readable>
417    readable: MutNullableDom<ReadableStream>,
418
419    /// <https://streams.spec.whatwg.org/#transformstream-writable>
420    writable: MutNullableDom<WritableStream>,
421}
422
423impl TransformStream {
424    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
425    fn new_inherited() -> TransformStream {
426        TransformStream {
427            reflector_: Reflector::new(),
428            backpressure: Default::default(),
429            backpressure_change_promise: DomRefCell::new(None),
430            controller: MutNullableDom::new(None),
431            detached: Cell::new(false),
432            readable: MutNullableDom::new(None),
433            writable: MutNullableDom::new(None),
434        }
435    }
436
437    pub(crate) fn new_with_proto(
438        global: &GlobalScope,
439        proto: Option<SafeHandleObject>,
440        can_gc: CanGc,
441    ) -> DomRoot<TransformStream> {
442        reflect_dom_object_with_proto(
443            Box::new(TransformStream::new_inherited()),
444            global,
445            proto,
446            can_gc,
447        )
448    }
449
450    /// Creates and set up the newly created transform stream following
451    /// <https://streams.spec.whatwg.org/#transformstream-set-up>
452    pub(crate) fn set_up(
453        &self,
454        cx: &mut js::context::JSContext,
455        global: &GlobalScope,
456        transformer_type: TransformerType,
457    ) -> Fallible<()> {
458        // Step1. Let writableHighWaterMark be 1.
459        let writable_high_water_mark = 1.0;
460
461        // Step 2. Let writableSizeAlgorithm be an algorithm that returns 1.
462        let writable_size_algorithm =
463            extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
464
465        // Step 3. Let readableHighWaterMark be 0.
466        let readable_high_water_mark = 0.0;
467
468        // Step 4. Let readableSizeAlgorithm be an algorithm that returns 1.
469        let readable_size_algorithm =
470            extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
471
472        // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
473        // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
474        // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
475        // NOTE: These steps are implemented in `TransformStreamDefaultController::new`
476
477        // Step 8. Let startPromise be a promise resolved with undefined.
478        let start_promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
479
480        // Step 9. Perform ! InitializeTransformStream(stream, startPromise,
481        // writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark,
482        // readableSizeAlgorithm).
483        self.initialize(
484            cx,
485            global,
486            start_promise,
487            writable_high_water_mark,
488            writable_size_algorithm,
489            readable_high_water_mark,
490            readable_size_algorithm,
491        )?;
492
493        // Step 10. Let controller be a new TransformStreamDefaultController.
494        let controller =
495            TransformStreamDefaultController::new(global, transformer_type, CanGc::from_cx(cx));
496
497        // Step 11. Perform ! SetUpTransformStreamDefaultController(stream,
498        // controller, transformAlgorithmWrapper, flushAlgorithmWrapper,
499        // cancelAlgorithmWrapper).
500        self.set_up_transform_stream_default_controller(&controller);
501
502        Ok(())
503    }
504
505    pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
506        self.controller.get().expect("controller is not set")
507    }
508
509    pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
510        self.writable.get().expect("writable stream is not set")
511    }
512
513    pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
514        self.readable.get().expect("readable stream is not set")
515    }
516
517    pub(crate) fn get_backpressure(&self) -> bool {
518        self.backpressure.get()
519    }
520
521    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
522    #[expect(clippy::too_many_arguments)]
523    fn initialize(
524        &self,
525        cx: &mut js::context::JSContext,
526        global: &GlobalScope,
527        start_promise: Rc<Promise>,
528        writable_high_water_mark: f64,
529        writable_size_algorithm: Rc<QueuingStrategySize>,
530        readable_high_water_mark: f64,
531        readable_size_algorithm: Rc<QueuingStrategySize>,
532    ) -> Fallible<()> {
533        // Let startAlgorithm be an algorithm that returns startPromise.
534        // Let writeAlgorithm be the following steps, taking a chunk argument:
535        //  Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
536        // Let abortAlgorithm be the following steps, taking a reason argument:
537        //  Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
538        // Let closeAlgorithm be the following steps:
539        //  Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
540        // Set stream.[[writable]] to ! CreateWritableStream(startAlgorithm, writeAlgorithm,
541        // closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm).
542        // Note: Those steps are implemented using UnderlyingSinkType::Transform.
543
544        let writable = create_writable_stream(
545            cx,
546            global,
547            writable_high_water_mark,
548            writable_size_algorithm,
549            UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
550        )?;
551        self.writable.set(Some(&writable));
552
553        // Let pullAlgorithm be the following steps:
554
555        // Return ! TransformStreamDefaultSourcePullAlgorithm(stream).
556
557        // Let cancelAlgorithm be the following steps, taking a reason argument:
558
559        // Return ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason).
560
561        // Set stream.[[readable]] to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
562        // cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
563
564        let readable = create_readable_stream(
565            cx,
566            global,
567            UnderlyingSourceType::Transform(Dom::from_ref(self), start_promise),
568            Some(readable_size_algorithm),
569            Some(readable_high_water_mark),
570        );
571        self.readable.set(Some(&readable));
572
573        // Set stream.[[backpressure]] and stream.[[backpressureChangePromise]] to undefined.
574        // Note: This is done in the constructor.
575
576        // Perform ! TransformStreamSetBackpressure(stream, true).
577        self.set_backpressure(global, true, CanGc::from_cx(cx));
578
579        // Set stream.[[controller]] to undefined.
580        self.controller.set(None);
581
582        Ok(())
583    }
584
585    /// <https://streams.spec.whatwg.org/#transform-stream-set-backpressure>
586    pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
587        // Assert: stream.[[backpressure]] is not backpressure.
588        assert!(self.backpressure.get() != backpressure);
589
590        // If stream.[[backpressureChangePromise]] is not undefined, resolve
591        // stream.[[backpressureChangePromise]] with undefined.
592        if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
593            promise.resolve_native(&(), can_gc);
594        }
595
596        // Set stream.[[backpressureChangePromise]] to a new promise.;
597        *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
598
599        // Set stream.[[backpressure]] to backpressure.
600        self.backpressure.set(backpressure);
601    }
602
603    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller>
604    fn set_up_transform_stream_default_controller(
605        &self,
606        controller: &TransformStreamDefaultController,
607    ) {
608        // Assert: stream implements TransformStream.
609        // Note: this is checked with type.
610
611        // Assert: stream.[[controller]] is undefined.
612        assert!(self.controller.get().is_none());
613
614        // Set controller.[[stream]] to stream.
615        controller.set_stream(self);
616
617        // Set stream.[[controller]] to controller.
618        self.controller.set(Some(controller));
619
620        // Set controller.[[transformAlgorithm]] to transformAlgorithm.
621        // Set controller.[[flushAlgorithm]] to flushAlgorithm.
622        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
623        // Note: These are set in the constructor.
624    }
625
626    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
627    fn set_up_transform_stream_default_controller_from_transformer(
628        &self,
629        global: &GlobalScope,
630        transformer_obj: SafeHandleObject,
631        transformer: &Transformer,
632        can_gc: CanGc,
633    ) {
634        // Let controller be a new TransformStreamDefaultController.
635        let transformer_type = TransformerType::new_from_js_transformer(transformer);
636        let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
637
638        // Let transformAlgorithm be the following steps, taking a chunk argument:
639        // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
640        // If result is an abrupt completion, return a promise rejected with result.[[Value]].
641        // Otherwise, return a promise resolved with undefined.
642
643        // Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
644        // Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
645
646        // If transformerDict["transform"] exists, set transformAlgorithm to an algorithm which
647        // takes an argument
648        // chunk and returns the result of invoking transformerDict["transform"] with argument
649        // list « chunk, controller »
650        // and callback this value transformer.
651
652        // If transformerDict["flush"] exists, set flushAlgorithm to an algorithm which returns
653        // the result
654        // of invoking transformerDict["flush"] with argument list « controller » and callback
655        // this value transformer.
656
657        // If transformerDict["cancel"] exists, set cancelAlgorithm to an algorithm which takes an argument
658        // reason and returns the result of invoking transformerDict["cancel"] with argument list « reason »
659        // and callback this value transformer.
660        controller.set_transform_obj(transformer_obj);
661
662        // Perform ! SetUpTransformStreamDefaultController(stream, controller,
663        // transformAlgorithm, flushAlgorithm, cancelAlgorithm).
664        self.set_up_transform_stream_default_controller(&controller);
665    }
666
667    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
668    pub(crate) fn transform_stream_default_sink_write_algorithm(
669        &self,
670        cx: &mut js::context::JSContext,
671        global: &GlobalScope,
672        chunk: SafeHandleValue,
673    ) -> Fallible<Rc<Promise>> {
674        // Assert: stream.[[writable]].[[state]] is "writable".
675        assert!(self.writable.get().is_some());
676
677        // Let controller be stream.[[controller]].
678        let controller = self.controller.get().expect("controller is not set");
679
680        // If stream.[[backpressure]] is true,
681        if self.backpressure.get() {
682            // Let backpressureChangePromise be stream.[[backpressureChangePromise]].
683            let backpressure_change_promise = self.backpressure_change_promise.borrow();
684
685            // Assert: backpressureChangePromise is not undefined.
686            assert!(backpressure_change_promise.is_some());
687
688            // Return the result of reacting to backpressureChangePromise with the following fulfillment steps:
689            let result_promise = Promise::new2(cx, global);
690            rooted!(&in(cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
691                controller: Dom::from_ref(&controller),
692                writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
693                chunk: Heap::boxed(chunk.get()),
694                result_promise: result_promise.clone(),
695            }));
696
697            let handler = PromiseNativeHandler::new(
698                global,
699                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
700                Some(Box::new(BackpressureChangeRejection {
701                    result_promise: result_promise.clone(),
702                })),
703                CanGc::from_cx(cx),
704            );
705            let mut realm = enter_auto_realm(cx, global);
706            let realm = &mut realm.current_realm();
707            let in_realm_proof = realm.into();
708            let comp = InRealm::Already(&in_realm_proof);
709            backpressure_change_promise
710                .as_ref()
711                .expect("Promise must be some by now.")
712                .append_native_handler(&handler, comp, CanGc::from_cx(realm));
713
714            return Ok(result_promise);
715        }
716
717        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
718        controller.transform_stream_default_controller_perform_transform(cx, global, chunk)
719    }
720
721    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
722    pub(crate) fn transform_stream_default_sink_abort_algorithm(
723        &self,
724        cx: SafeJSContext,
725        global: &GlobalScope,
726        reason: SafeHandleValue,
727        can_gc: CanGc,
728    ) -> Fallible<Rc<Promise>> {
729        // Let controller be stream.[[controller]].
730        let controller = self.controller.get().expect("controller is not set");
731
732        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
733        if let Some(finish_promise) = controller.get_finish_promise() {
734            return Ok(finish_promise);
735        }
736
737        // Let readable be stream.[[readable]].
738        let readable = self.readable.get().expect("readable stream is not set");
739
740        // Let controller.[[finishPromise]] be a new promise.
741        controller.set_finish_promise(Promise::new(global, can_gc));
742
743        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
744        let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
745
746        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
747        controller.clear_algorithms();
748
749        // React to cancelPromise:
750        let handler = PromiseNativeHandler::new(
751            global,
752            Some(Box::new(CancelPromiseFulfillment {
753                readable: Dom::from_ref(&readable),
754                controller: Dom::from_ref(&controller),
755                reason: Heap::boxed(reason.get()),
756            })),
757            Some(Box::new(CancelPromiseRejection {
758                readable: Dom::from_ref(&readable),
759                controller: Dom::from_ref(&controller),
760            })),
761            can_gc,
762        );
763        let realm = enter_realm(global);
764        let comp = InRealm::Entered(&realm);
765        cancel_promise.append_native_handler(&handler, comp, can_gc);
766
767        // Return controller.[[finishPromise]].
768        let finish_promise = controller
769            .get_finish_promise()
770            .expect("finish promise is not set");
771        Ok(finish_promise)
772    }
773
774    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
775    pub(crate) fn transform_stream_default_sink_close_algorithm(
776        &self,
777        cx: &mut js::context::JSContext,
778        global: &GlobalScope,
779    ) -> Fallible<Rc<Promise>> {
780        // Let controller be stream.[[controller]].
781        let controller = self
782            .controller
783            .get()
784            .ok_or(Error::Type(c"controller is not set".to_owned()))?;
785
786        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
787        if let Some(finish_promise) = controller.get_finish_promise() {
788            return Ok(finish_promise);
789        }
790
791        // Let readable be stream.[[readable]].
792        let readable = self
793            .readable
794            .get()
795            .ok_or(Error::Type(c"readable stream is not set".to_owned()))?;
796
797        // Let controller.[[finishPromise]] be a new promise.
798        controller.set_finish_promise(Promise::new2(cx, global));
799
800        // Let flushPromise be the result of performing controller.[[flushAlgorithm]].
801        let flush_promise = controller.perform_flush(cx, global)?;
802
803        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
804        controller.clear_algorithms();
805
806        // React to flushPromise:
807        let handler = PromiseNativeHandler::new(
808            global,
809            Some(Box::new(FlushPromiseFulfillment {
810                readable: Dom::from_ref(&readable),
811                controller: Dom::from_ref(&controller),
812            })),
813            Some(Box::new(FlushPromiseRejection {
814                readable: Dom::from_ref(&readable),
815                controller: Dom::from_ref(&controller),
816            })),
817            CanGc::from_cx(cx),
818        );
819
820        let mut realm = enter_auto_realm(cx, global);
821        let realm = &mut realm.current_realm();
822        let in_realm_proof = realm.into();
823        let comp = InRealm::Already(&in_realm_proof);
824        flush_promise.append_native_handler(&handler, comp, CanGc::from_cx(realm));
825        // Return controller.[[finishPromise]].
826        let finish_promise = controller
827            .get_finish_promise()
828            .expect("finish promise is not set");
829        Ok(finish_promise)
830    }
831
832    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
833    pub(crate) fn transform_stream_default_source_cancel(
834        &self,
835        cx: SafeJSContext,
836        global: &GlobalScope,
837        reason: SafeHandleValue,
838        can_gc: CanGc,
839    ) -> Fallible<Rc<Promise>> {
840        // Let controller be stream.[[controller]].
841        let controller = self
842            .controller
843            .get()
844            .ok_or(Error::Type(c"controller is not set".to_owned()))?;
845
846        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
847        if let Some(finish_promise) = controller.get_finish_promise() {
848            return Ok(finish_promise);
849        }
850
851        // Let writable be stream.[[writable]].
852        let writable = self
853            .writable
854            .get()
855            .ok_or(Error::Type(c"writable stream is not set".to_owned()))?;
856
857        // Let controller.[[finishPromise]] be a new promise.
858        controller.set_finish_promise(Promise::new(global, can_gc));
859
860        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
861        let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
862
863        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
864        controller.clear_algorithms();
865
866        // React to cancelPromise:
867        let handler = PromiseNativeHandler::new(
868            global,
869            Some(Box::new(SourceCancelPromiseFulfillment {
870                writeable: Dom::from_ref(&writable),
871                controller: Dom::from_ref(&controller),
872                stream: Dom::from_ref(self),
873                reason: Heap::boxed(reason.get()),
874            })),
875            Some(Box::new(SourceCancelPromiseRejection {
876                writeable: Dom::from_ref(&writable),
877                controller: Dom::from_ref(&controller),
878                stream: Dom::from_ref(self),
879            })),
880            can_gc,
881        );
882
883        // Return controller.[[finishPromise]].
884        let finish_promise = controller
885            .get_finish_promise()
886            .expect("finish promise is not set");
887        let realm = enter_realm(global);
888        let comp = InRealm::Entered(&realm);
889        cancel_promise.append_native_handler(&handler, comp, can_gc);
890        Ok(finish_promise)
891    }
892
893    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-pull>
894    pub(crate) fn transform_stream_default_source_pull(
895        &self,
896        global: &GlobalScope,
897        can_gc: CanGc,
898    ) -> Fallible<Rc<Promise>> {
899        // Assert: stream.[[backpressure]] is true.
900        assert!(self.backpressure.get());
901
902        // Assert: stream.[[backpressureChangePromise]] is not undefined.
903        assert!(self.backpressure_change_promise.borrow().is_some());
904
905        // Perform ! TransformStreamSetBackpressure(stream, false).
906        self.set_backpressure(global, false, can_gc);
907
908        // Return stream.[[backpressureChangePromise]].
909        Ok(self
910            .backpressure_change_promise
911            .borrow()
912            .clone()
913            .expect("Promise must be some by now."))
914    }
915
916    /// <https://streams.spec.whatwg.org/#transform-stream-error-writable-and-unblock-write>
917    pub(crate) fn error_writable_and_unblock_write(
918        &self,
919        cx: &mut js::context::JSContext,
920        global: &GlobalScope,
921        error: SafeHandleValue,
922    ) {
923        // Perform ! TransformStreamDefaultControllerClearAlgorithms(stream.[[controller]]).
924        self.get_controller().clear_algorithms();
925
926        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[controller]], e).
927        self.get_writable()
928            .get_default_controller()
929            .error_if_needed(cx, error, global);
930
931        // Perform ! TransformStreamUnblockWrite(stream).
932        self.unblock_write(global, CanGc::from_cx(cx))
933    }
934
935    /// <https://streams.spec.whatwg.org/#transform-stream-unblock-write>
936    pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
937        // If stream.[[backpressure]] is true, perform ! TransformStreamSetBackpressure(stream, false).
938        if self.backpressure.get() {
939            self.set_backpressure(global, false, can_gc);
940        }
941    }
942
943    /// <https://streams.spec.whatwg.org/#transform-stream-error>
944    pub(crate) fn error(
945        &self,
946        cx: &mut js::context::JSContext,
947        global: &GlobalScope,
948        error: SafeHandleValue,
949    ) {
950        // Perform ! ReadableStreamDefaultControllerError(stream.[[readable]].[[controller]], e).
951        self.get_readable()
952            .get_default_controller()
953            .error(error, CanGc::from_cx(cx));
954
955        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, e).
956        self.error_writable_and_unblock_write(cx, global, error);
957    }
958}
959
960impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
961    /// <https://streams.spec.whatwg.org/#ts-constructor>
962    #[expect(unsafe_code)]
963    fn Constructor(
964        cx: &mut js::context::JSContext,
965        global: &GlobalScope,
966        proto: Option<SafeHandleObject>,
967        transformer: Option<*mut JSObject>,
968        writable_strategy: &QueuingStrategy,
969        readable_strategy: &QueuingStrategy,
970    ) -> Fallible<DomRoot<TransformStream>> {
971        // If transformer is missing, set it to null.
972        rooted!(&in(cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
973
974        // Let underlyingSinkDict be underlyingSink,
975        // converted to an IDL value of type UnderlyingSink.
976        let transformer_dict = if !transformer_obj.is_null() {
977            rooted!(&in(cx) let obj_val = ObjectValue(transformer_obj.get()));
978            match Transformer::new(cx.into(), obj_val.handle(), CanGc::from_cx(cx)) {
979                Ok(ConversionResult::Success(val)) => val,
980                Ok(ConversionResult::Failure(error)) => {
981                    return Err(Error::Type(error.into_owned()));
982                },
983                _ => {
984                    return Err(Error::JSFailed);
985                },
986            }
987        } else {
988            Transformer::empty()
989        };
990
991        // If transformerDict["readableType"] exists, throw a RangeError exception.
992        if !transformer_dict.readableType.handle().is_undefined() {
993            return Err(Error::Range(c"readableType is set".to_owned()));
994        }
995
996        // If transformerDict["writableType"] exists, throw a RangeError exception.
997        if !transformer_dict.writableType.handle().is_undefined() {
998            return Err(Error::Range(c"writableType is set".to_owned()));
999        }
1000
1001        // Let readableHighWaterMark be ? ExtractHighWaterMark(readableStrategy, 0).
1002        let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
1003
1004        // Let readableSizeAlgorithm be ! ExtractSizeAlgorithm(readableStrategy).
1005        let readable_size_algorithm = extract_size_algorithm(readable_strategy, CanGc::from_cx(cx));
1006
1007        // Let writableHighWaterMark be ? ExtractHighWaterMark(writableStrategy, 1).
1008        let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
1009
1010        // Let writableSizeAlgorithm be ! ExtractSizeAlgorithm(writableStrategy).
1011        let writable_size_algorithm = extract_size_algorithm(writable_strategy, CanGc::from_cx(cx));
1012
1013        // Let startPromise be a new promise.
1014        let start_promise = Promise::new2(cx, global);
1015
1016        // Perform ! InitializeTransformStream(this, startPromise, writableHighWaterMark,
1017        // writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
1018        let stream = TransformStream::new_with_proto(global, proto, CanGc::from_cx(cx));
1019        stream.initialize(
1020            cx,
1021            global,
1022            start_promise.clone(),
1023            writable_high_water_mark,
1024            writable_size_algorithm,
1025            readable_high_water_mark,
1026            readable_size_algorithm,
1027        )?;
1028
1029        // Perform ? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict).
1030        stream.set_up_transform_stream_default_controller_from_transformer(
1031            global,
1032            transformer_obj.handle(),
1033            &transformer_dict,
1034            CanGc::from_cx(cx),
1035        );
1036
1037        // If transformerDict["start"] exists, then resolve startPromise with the
1038        // result of invoking transformerDict["start"]
1039        // with argument list « this.[[controller]] » and callback this value transformer.
1040        if let Some(start) = &transformer_dict.start {
1041            rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
1042            rooted!(&in(cx) let mut result: JSVal);
1043            rooted!(&in(cx) let this_object = transformer_obj.get());
1044            start.Call_(
1045                &this_object.handle(),
1046                &stream.get_controller(),
1047                result.handle_mut(),
1048                ExceptionHandling::Rethrow,
1049                CanGc::from_cx(cx),
1050            )?;
1051            let is_promise = unsafe {
1052                if result.is_object() {
1053                    result_object.set(result.to_object());
1054                    IsPromiseObject(result_object.handle().into_handle())
1055                } else {
1056                    false
1057                }
1058            };
1059            let promise = if is_promise {
1060                Promise::new_with_js_promise(result_object.handle(), cx.into())
1061            } else {
1062                Promise::new_resolved(global, cx.into(), result.get(), CanGc::from_cx(cx))
1063            };
1064            start_promise.resolve_native(&promise, CanGc::from_cx(cx));
1065        } else {
1066            // Otherwise, resolve startPromise with undefined.
1067            start_promise.resolve_native(&(), CanGc::from_cx(cx));
1068        };
1069
1070        Ok(stream)
1071    }
1072
1073    /// <https://streams.spec.whatwg.org/#ts-readable>
1074    fn Readable(&self) -> DomRoot<ReadableStream> {
1075        // Return this.[[readable]].
1076        self.readable.get().expect("readable stream is not set")
1077    }
1078
1079    /// <https://streams.spec.whatwg.org/#ts-writable>
1080    fn Writable(&self) -> DomRoot<WritableStream> {
1081        // Return this.[[writable]].
1082        self.writable.get().expect("writable stream is not set")
1083    }
1084}
1085
1086/// <https://streams.spec.whatwg.org/#ts-transfer>
1087impl Transferable for TransformStream {
1088    type Index = MessagePortIndex;
1089    type Data = TransformStreamData;
1090
1091    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps②>
1092    fn transfer(
1093        &self,
1094        cx: &mut js::context::JSContext,
1095    ) -> Fallible<(MessagePortId, TransformStreamData)> {
1096        let global = self.global();
1097        let mut realm = enter_auto_realm(cx, &*global);
1098        let mut realm = realm.current_realm();
1099        let cx = &mut realm;
1100
1101        // Step 1. Let readable be value.[[readable]].
1102        let readable = self.get_readable();
1103
1104        // Step 2. Let writable be value.[[writable]].
1105        let writable = self.get_writable();
1106
1107        // Step 3. If ! IsReadableStreamLocked(readable) is true, throw a
1108        // "DataCloneError" DOMException.
1109        // Step 4. If ! IsWritableStreamLocked(writable) is true, throw a
1110        // "DataCloneError" DOMException.
1111        if readable.is_locked() || writable.is_locked() {
1112            return Err(Error::DataClone(None));
1113        }
1114
1115        // First port pair (readable → proxy writable)
1116        let port1 = MessagePort::new(&global, CanGc::from_cx(cx));
1117        global.track_message_port(&port1, None);
1118        let port1_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1119        global.track_message_port(&port1_peer, None);
1120        global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1121
1122        let proxy_readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1123        proxy_readable.setup_cross_realm_transform_readable(cx, &port1);
1124        proxy_readable
1125            .pipe_to(cx, &global, &writable, false, false, false, None)
1126            .set_promise_is_handled();
1127
1128        // Second port pair (proxy readable → writable)
1129        let port2 = MessagePort::new(&global, CanGc::from_cx(cx));
1130        global.track_message_port(&port2, None);
1131        let port2_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1132        global.track_message_port(&port2_peer, None);
1133        global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1134
1135        let proxy_writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1136        proxy_writable.setup_cross_realm_transform_writable(cx, &port2);
1137
1138        // Pipe readable into the proxy writable (→ port_1)
1139        readable
1140            .pipe_to(cx, &global, &proxy_writable, false, false, false, None)
1141            .set_promise_is_handled();
1142
1143        // Step 5. Set dataHolder.[[readable]] to !
1144        // StructuredSerializeWithTransfer(readable, « readable »).
1145        // Step 6. Set dataHolder.[[writable]] to !
1146        // StructuredSerializeWithTransfer(writable, « writable »).
1147        Ok((
1148            *port1_peer.message_port_id(),
1149            TransformStreamData {
1150                readable: port1_peer.transfer(cx)?,
1151                writable: port2_peer.transfer(cx)?,
1152            },
1153        ))
1154    }
1155
1156    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps②>
1157    fn transfer_receive(
1158        cx: &mut js::context::JSContext,
1159        owner: &GlobalScope,
1160        _id: MessagePortId,
1161        data: TransformStreamData,
1162    ) -> Result<DomRoot<Self>, ()> {
1163        let port1 = MessagePort::transfer_receive(cx, owner, data.readable.0, data.readable.1)?;
1164        let port2 = MessagePort::transfer_receive(cx, owner, data.writable.0, data.writable.1)?;
1165
1166        // Step 1. Let readableRecord be !
1167        // StructuredDeserializeWithTransfer(dataHolder.[[readable]], the
1168        // current Realm).
1169        let proxy_readable = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1170        proxy_readable.setup_cross_realm_transform_readable(cx, &port2);
1171
1172        // Step 2. Let writableRecord be !
1173        // StructuredDeserializeWithTransfer(dataHolder.[[writable]], the
1174        // current Realm).
1175        let proxy_writable = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1176        proxy_writable.setup_cross_realm_transform_writable(cx, &port1);
1177
1178        // Step 3. Set value.[[readable]] to readableRecord.[[Deserialized]].
1179        // Step 4. Set value.[[writable]] to writableRecord.[[Deserialized]].
1180        // Step 5. Set value.[[backpressure]],
1181        // value.[[backpressureChangePromise]], and value.[[controller]] to
1182        // undefined.
1183        let stream = TransformStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1184        stream.readable.set(Some(&proxy_readable));
1185        stream.writable.set(Some(&proxy_writable));
1186
1187        Ok(stream)
1188    }
1189
1190    fn serialized_storage<'a>(
1191        data: StructuredData<'a, '_>,
1192    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1193        match data {
1194            StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1195            StructuredData::Writer(w) => &mut w.transform_streams_port,
1196        }
1197    }
1198}