script/dom/stream/
transformstream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, ObjectValue, UndefinedValue};
12use js::realm::CurrentRealm;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
14use rustc_hash::FxHashMap;
15use script_bindings::callback::ExceptionHandling;
16use script_bindings::realms::InRealm;
17use servo_base::id::{MessagePortId, MessagePortIndex};
18use servo_constellation_traits::TransformStreamData;
19
20use super::readablestream::CrossRealmTransformReadable;
21use super::writablestream::CrossRealmTransformWritable;
22use crate::dom::bindings::cell::DomRefCell;
23use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
24    QueuingStrategy, QueuingStrategySize,
25};
26use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
27use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
28use crate::dom::bindings::conversions::ConversionResult;
29use crate::dom::bindings::error::{Error, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::structuredclone::StructuredData;
33use crate::dom::bindings::transferable::Transferable;
34use crate::dom::globalscope::GlobalScope;
35use crate::dom::messageport::MessagePort;
36use crate::dom::promise::Promise;
37use crate::dom::promisenativehandler::Callback;
38use crate::dom::readablestream::{ReadableStream, create_readable_stream};
39use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
40use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
41use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
42use crate::dom::stream::writablestream::create_writable_stream;
43use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
44use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
45use crate::realms::{enter_auto_realm, enter_realm};
46use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
47
48impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}
49
50/// Reacting to backpressureChangePromise as part of
51/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
52#[derive(JSTraceable, MallocSizeOf)]
53#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
54struct TransformBackPressureChangePromiseFulfillment {
55    /// The result of reacting to backpressureChangePromise.
56    #[conditional_malloc_size_of]
57    result_promise: Rc<Promise>,
58
59    #[ignore_malloc_size_of = "mozjs"]
60    chunk: Box<Heap<JSVal>>,
61
62    /// The writable used in the fulfillment steps
63    writable: Dom<WritableStream>,
64
65    controller: Dom<TransformStreamDefaultController>,
66}
67
68impl Callback for TransformBackPressureChangePromiseFulfillment {
69    /// Reacting to backpressureChangePromise with the following fulfillment steps:
70    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
71        // Let writable be stream.[[writable]].
72        // Let state be writable.[[state]].
73        // If state is "erroring", throw writable.[[storedError]].
74        if self.writable.is_erroring() {
75            rooted!(&in(cx) let mut error = UndefinedValue());
76            self.writable.get_stored_error(error.handle_mut());
77            self.result_promise
78                .reject(cx.into(), error.handle(), CanGc::from_cx(cx));
79            return;
80        }
81
82        // Assert: state is "writable".
83        assert!(self.writable.is_writable());
84
85        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
86        rooted!(&in(cx) let mut chunk = UndefinedValue());
87        chunk.set(self.chunk.get());
88        let transform_result = self
89            .controller
90            .transform_stream_default_controller_perform_transform(
91                cx,
92                &self.writable.global(),
93                chunk.handle(),
94            )
95            .expect("perform transform failed");
96
97        // PerformTransformFulfillment and PerformTransformRejection do not need
98        // to be rooted because they only contain an Rc.
99        let handler = PromiseNativeHandler::new(
100            &self.writable.global(),
101            Some(Box::new(PerformTransformFulfillment {
102                result_promise: self.result_promise.clone(),
103            })),
104            Some(Box::new(PerformTransformRejection {
105                result_promise: self.result_promise.clone(),
106            })),
107            CanGc::from_cx(cx),
108        );
109
110        let mut realm = enter_auto_realm(cx, &*self.writable.global());
111        let realm = &mut realm.current_realm();
112        let in_realm_proof = realm.into();
113        let comp = InRealm::Already(&in_realm_proof);
114        transform_result.append_native_handler(&handler, comp, CanGc::from_cx(realm));
115    }
116}
117
118#[derive(JSTraceable, MallocSizeOf)]
119#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
120/// Reacting to fulfillment of performTransform as part of
121/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
122struct PerformTransformFulfillment {
123    #[conditional_malloc_size_of]
124    result_promise: Rc<Promise>,
125}
126
127impl Callback for PerformTransformFulfillment {
128    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
129        let can_gc = CanGc::from_cx(cx);
130        // Fulfilled: resolve the outer promise
131        self.result_promise.resolve_native(&(), can_gc);
132    }
133}
134
135#[derive(JSTraceable, MallocSizeOf)]
136#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
137/// Reacting to rejection of performTransform as part of
138/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
139struct PerformTransformRejection {
140    #[conditional_malloc_size_of]
141    result_promise: Rc<Promise>,
142}
143
144impl Callback for PerformTransformRejection {
145    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
146        let can_gc = CanGc::from_cx(cx);
147        // Stream already errored in perform_transform, just reject result_promise
148        self.result_promise.reject(cx.into(), v, can_gc);
149    }
150}
151
152#[derive(JSTraceable, MallocSizeOf)]
153#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
154/// Reacting to rejection of backpressureChangePromise as part of
155/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
156struct BackpressureChangeRejection {
157    #[conditional_malloc_size_of]
158    result_promise: Rc<Promise>,
159}
160
161impl Callback for BackpressureChangeRejection {
162    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
163        let can_gc = CanGc::from_cx(cx);
164        self.result_promise.reject(cx.into(), reason, can_gc);
165    }
166}
167
168impl js::gc::Rootable for CancelPromiseFulfillment {}
169
170/// Reacting to fulfillment of the cancelpromise as part of
171/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
172#[derive(JSTraceable, MallocSizeOf)]
173#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
174struct CancelPromiseFulfillment {
175    readable: Dom<ReadableStream>,
176    controller: Dom<TransformStreamDefaultController>,
177    #[ignore_malloc_size_of = "mozjs"]
178    reason: Box<Heap<JSVal>>,
179}
180
181impl Callback for CancelPromiseFulfillment {
182    /// Reacting to backpressureChangePromise with the following fulfillment steps:
183    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
184        let can_gc = CanGc::from_cx(cx);
185        let cx: SafeJSContext = cx.into();
186        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
187        if self.readable.is_errored() {
188            rooted!(in(*cx) let mut error = UndefinedValue());
189            self.readable.get_stored_error(error.handle_mut());
190            self.controller
191                .get_finish_promise()
192                .expect("finish promise is not set")
193                .reject_native(&error.handle(), can_gc);
194        } else {
195            // Otherwise:
196            // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], reason).
197            rooted!(in(*cx) let mut reason = UndefinedValue());
198            reason.set(self.reason.get());
199            self.readable
200                .get_default_controller()
201                .error(reason.handle(), can_gc);
202
203            // Resolve controller.[[finishPromise]] with undefined.
204            self.controller
205                .get_finish_promise()
206                .expect("finish promise is not set")
207                .resolve_native(&(), can_gc);
208        }
209    }
210}
211
212impl js::gc::Rootable for CancelPromiseRejection {}
213
214/// Reacting to rejection of cancelpromise as part of
215/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
216#[derive(JSTraceable, MallocSizeOf)]
217#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
218struct CancelPromiseRejection {
219    readable: Dom<ReadableStream>,
220    controller: Dom<TransformStreamDefaultController>,
221}
222
223impl Callback for CancelPromiseRejection {
224    /// Reacting to backpressureChangePromise with the following fulfillment steps:
225    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
226        let can_gc = CanGc::from_cx(cx);
227        let cx: SafeJSContext = cx.into();
228        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
229        self.readable.get_default_controller().error(v, can_gc);
230
231        // Reject controller.[[finishPromise]] with r.
232        self.controller
233            .get_finish_promise()
234            .expect("finish promise is not set")
235            .reject(cx, v, can_gc);
236    }
237}
238
239impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
240
241/// Reacting to fulfillment of the cancelpromise as part of
242/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
243#[derive(JSTraceable, MallocSizeOf)]
244#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
245struct SourceCancelPromiseFulfillment {
246    writeable: Dom<WritableStream>,
247    controller: Dom<TransformStreamDefaultController>,
248    stream: Dom<TransformStream>,
249    #[ignore_malloc_size_of = "mozjs"]
250    reason: Box<Heap<JSVal>>,
251}
252
253impl Callback for SourceCancelPromiseFulfillment {
254    /// Reacting to backpressureChangePromise with the following fulfillment steps:
255    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
256        // If cancelPromise was fulfilled, then:
257        let finish_promise = self
258            .controller
259            .get_finish_promise()
260            .expect("finish promise is not set");
261
262        let global = &self.writeable.global();
263        // If writable.[[state]] is "errored", reject controller.[[finishPromise]] with writable.[[storedError]].
264        if self.writeable.is_errored() {
265            rooted!(&in(cx) let mut error = UndefinedValue());
266            self.writeable.get_stored_error(error.handle_mut());
267            finish_promise.reject(cx.into(), error.handle(), CanGc::from_cx(cx));
268        } else {
269            // Otherwise:
270            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], reason).
271            rooted!(&in(cx) let mut reason = UndefinedValue());
272            reason.set(self.reason.get());
273            self.writeable
274                .get_default_controller()
275                .error_if_needed(cx, reason.handle(), global);
276
277            // Perform ! TransformStreamUnblockWrite(stream).
278            self.stream.unblock_write(global, CanGc::from_cx(cx));
279
280            // Resolve controller.[[finishPromise]] with undefined.
281            finish_promise.resolve_native(&(), CanGc::from_cx(cx));
282        }
283    }
284}
285
286impl js::gc::Rootable for SourceCancelPromiseRejection {}
287
288/// Reacting to rejection of cancelpromise as part of
289/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
290#[derive(JSTraceable, MallocSizeOf)]
291#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
292struct SourceCancelPromiseRejection {
293    writeable: Dom<WritableStream>,
294    controller: Dom<TransformStreamDefaultController>,
295    stream: Dom<TransformStream>,
296}
297
298impl Callback for SourceCancelPromiseRejection {
299    /// Reacting to backpressureChangePromise with the following fulfillment steps:
300    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
301        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], r).
302        let global = &self.writeable.global();
303
304        self.writeable
305            .get_default_controller()
306            .error_if_needed(cx, v, global);
307
308        // Perform ! TransformStreamUnblockWrite(stream).
309        self.stream.unblock_write(global, CanGc::from_cx(cx));
310
311        // Reject controller.[[finishPromise]] with r.
312        self.controller
313            .get_finish_promise()
314            .expect("finish promise is not set")
315            .reject(cx.into(), v, CanGc::from_cx(cx));
316    }
317}
318
319impl js::gc::Rootable for FlushPromiseFulfillment {}
320
321/// Reacting to fulfillment of the flushpromise as part of
322/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
323#[derive(JSTraceable, MallocSizeOf)]
324#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
325struct FlushPromiseFulfillment {
326    readable: Dom<ReadableStream>,
327    controller: Dom<TransformStreamDefaultController>,
328}
329
330impl Callback for FlushPromiseFulfillment {
331    /// Reacting to flushpromise with the following fulfillment steps:
332    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
333        let can_gc = CanGc::from_cx(cx);
334        let cx: SafeJSContext = cx.into();
335        // If flushPromise was fulfilled, then:
336        let finish_promise = self
337            .controller
338            .get_finish_promise()
339            .expect("finish promise is not set");
340
341        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
342        if self.readable.is_errored() {
343            rooted!(in(*cx) let mut error = UndefinedValue());
344            self.readable.get_stored_error(error.handle_mut());
345            finish_promise.reject(cx, error.handle(), can_gc);
346        } else {
347            // Otherwise:
348            // Perform ! ReadableStreamDefaultControllerClose(readable.[[controller]]).
349            self.readable.get_default_controller().close(can_gc);
350
351            // Resolve controller.[[finishPromise]] with undefined.
352            finish_promise.resolve_native(&(), can_gc);
353        }
354    }
355}
356
357impl js::gc::Rootable for FlushPromiseRejection {}
358/// Reacting to rejection of flushpromise as part of
359/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
360
361#[derive(JSTraceable, MallocSizeOf)]
362#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
363struct FlushPromiseRejection {
364    readable: Dom<ReadableStream>,
365    controller: Dom<TransformStreamDefaultController>,
366}
367
368impl Callback for FlushPromiseRejection {
369    /// Reacting to flushpromise with the following fulfillment steps:
370    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
371        let can_gc = CanGc::from_cx(cx);
372        let cx: SafeJSContext = cx.into();
373        // If flushPromise was rejected with reason r, then:
374        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
375        self.readable.get_default_controller().error(v, can_gc);
376
377        // Reject controller.[[finishPromise]] with r.
378        self.controller
379            .get_finish_promise()
380            .expect("finish promise is not set")
381            .reject(cx, v, can_gc);
382    }
383}
384
385impl js::gc::Rootable for CrossRealmTransform {}
386
387/// A wrapper to handle `message` and `messageerror` events
388/// for the message port used by the transfered stream.
389#[derive(Clone, JSTraceable, MallocSizeOf)]
390#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
391pub(crate) enum CrossRealmTransform {
392    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
393    Readable(CrossRealmTransformReadable),
394    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
395    Writable(CrossRealmTransformWritable),
396}
397
398/// <https://streams.spec.whatwg.org/#ts-class>
399#[dom_struct]
400pub struct TransformStream {
401    reflector_: Reflector,
402
403    /// <https://streams.spec.whatwg.org/#transformstream-backpressure>
404    backpressure: Cell<bool>,
405
406    /// <https://streams.spec.whatwg.org/#transformstream-backpressurechangepromise>
407    #[conditional_malloc_size_of]
408    backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,
409
410    /// <https://streams.spec.whatwg.org/#transformstream-controller>
411    controller: MutNullableDom<TransformStreamDefaultController>,
412
413    /// <https://streams.spec.whatwg.org/#transformstream-detached>
414    detached: Cell<bool>,
415
416    /// <https://streams.spec.whatwg.org/#transformstream-readable>
417    readable: MutNullableDom<ReadableStream>,
418
419    /// <https://streams.spec.whatwg.org/#transformstream-writable>
420    writable: MutNullableDom<WritableStream>,
421}
422
423impl TransformStream {
424    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
425    fn new_inherited() -> TransformStream {
426        TransformStream {
427            reflector_: Reflector::new(),
428            backpressure: Default::default(),
429            backpressure_change_promise: DomRefCell::new(None),
430            controller: MutNullableDom::new(None),
431            detached: Cell::new(false),
432            readable: MutNullableDom::new(None),
433            writable: MutNullableDom::new(None),
434        }
435    }
436
437    pub(crate) fn new_with_proto(
438        global: &GlobalScope,
439        proto: Option<SafeHandleObject>,
440        can_gc: CanGc,
441    ) -> DomRoot<TransformStream> {
442        reflect_dom_object_with_proto(
443            Box::new(TransformStream::new_inherited()),
444            global,
445            proto,
446            can_gc,
447        )
448    }
449
450    /// Creates and set up the newly created transform stream following
451    /// <https://streams.spec.whatwg.org/#transformstream-set-up>
452    pub(crate) fn set_up(
453        &self,
454        cx: SafeJSContext,
455        global: &GlobalScope,
456        transformer_type: TransformerType,
457        can_gc: CanGc,
458    ) -> Fallible<()> {
459        // Step1. Let writableHighWaterMark be 1.
460        let writable_high_water_mark = 1.0;
461
462        // Step 2. Let writableSizeAlgorithm be an algorithm that returns 1.
463        let writable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
464
465        // Step 3. Let readableHighWaterMark be 0.
466        let readable_high_water_mark = 0.0;
467
468        // Step 4. Let readableSizeAlgorithm be an algorithm that returns 1.
469        let readable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
470
471        // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
472        // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
473        // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
474        // NOTE: These steps are implemented in `TransformStreamDefaultController::new`
475
476        // Step 8. Let startPromise be a promise resolved with undefined.
477        let start_promise = Promise::new_resolved(global, cx, (), can_gc);
478
479        // Step 9. Perform ! InitializeTransformStream(stream, startPromise,
480        // writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark,
481        // readableSizeAlgorithm).
482        self.initialize(
483            cx,
484            global,
485            start_promise,
486            writable_high_water_mark,
487            writable_size_algorithm,
488            readable_high_water_mark,
489            readable_size_algorithm,
490            can_gc,
491        )?;
492
493        // Step 10. Let controller be a new TransformStreamDefaultController.
494        let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
495
496        // Step 11. Perform ! SetUpTransformStreamDefaultController(stream,
497        // controller, transformAlgorithmWrapper, flushAlgorithmWrapper,
498        // cancelAlgorithmWrapper).
499        self.set_up_transform_stream_default_controller(&controller);
500
501        Ok(())
502    }
503
504    pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
505        self.controller.get().expect("controller is not set")
506    }
507
508    pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
509        self.writable.get().expect("writable stream is not set")
510    }
511
512    pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
513        self.readable.get().expect("readable stream is not set")
514    }
515
516    pub(crate) fn get_backpressure(&self) -> bool {
517        self.backpressure.get()
518    }
519
520    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
521    #[allow(clippy::too_many_arguments)]
522    fn initialize(
523        &self,
524        cx: SafeJSContext,
525        global: &GlobalScope,
526        start_promise: Rc<Promise>,
527        writable_high_water_mark: f64,
528        writable_size_algorithm: Rc<QueuingStrategySize>,
529        readable_high_water_mark: f64,
530        readable_size_algorithm: Rc<QueuingStrategySize>,
531        can_gc: CanGc,
532    ) -> Fallible<()> {
533        // Let startAlgorithm be an algorithm that returns startPromise.
534        // Let writeAlgorithm be the following steps, taking a chunk argument:
535        //  Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
536        // Let abortAlgorithm be the following steps, taking a reason argument:
537        //  Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
538        // Let closeAlgorithm be the following steps:
539        //  Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
540        // Set stream.[[writable]] to ! CreateWritableStream(startAlgorithm, writeAlgorithm,
541        // closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm).
542        // Note: Those steps are implemented using UnderlyingSinkType::Transform.
543
544        let writable = create_writable_stream(
545            cx,
546            global,
547            writable_high_water_mark,
548            writable_size_algorithm,
549            UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
550            can_gc,
551        )?;
552        self.writable.set(Some(&writable));
553
554        // Let pullAlgorithm be the following steps:
555
556        // Return ! TransformStreamDefaultSourcePullAlgorithm(stream).
557
558        // Let cancelAlgorithm be the following steps, taking a reason argument:
559
560        // Return ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason).
561
562        // Set stream.[[readable]] to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
563        // cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
564
565        let readable = create_readable_stream(
566            global,
567            UnderlyingSourceType::Transform(Dom::from_ref(self), start_promise),
568            Some(readable_size_algorithm),
569            Some(readable_high_water_mark),
570            can_gc,
571        );
572        self.readable.set(Some(&readable));
573
574        // Set stream.[[backpressure]] and stream.[[backpressureChangePromise]] to undefined.
575        // Note: This is done in the constructor.
576
577        // Perform ! TransformStreamSetBackpressure(stream, true).
578        self.set_backpressure(global, true, can_gc);
579
580        // Set stream.[[controller]] to undefined.
581        self.controller.set(None);
582
583        Ok(())
584    }
585
586    /// <https://streams.spec.whatwg.org/#transform-stream-set-backpressure>
587    pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
588        // Assert: stream.[[backpressure]] is not backpressure.
589        assert!(self.backpressure.get() != backpressure);
590
591        // If stream.[[backpressureChangePromise]] is not undefined, resolve
592        // stream.[[backpressureChangePromise]] with undefined.
593        if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
594            promise.resolve_native(&(), can_gc);
595        }
596
597        // Set stream.[[backpressureChangePromise]] to a new promise.;
598        *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
599
600        // Set stream.[[backpressure]] to backpressure.
601        self.backpressure.set(backpressure);
602    }
603
604    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller>
605    fn set_up_transform_stream_default_controller(
606        &self,
607        controller: &TransformStreamDefaultController,
608    ) {
609        // Assert: stream implements TransformStream.
610        // Note: this is checked with type.
611
612        // Assert: stream.[[controller]] is undefined.
613        assert!(self.controller.get().is_none());
614
615        // Set controller.[[stream]] to stream.
616        controller.set_stream(self);
617
618        // Set stream.[[controller]] to controller.
619        self.controller.set(Some(controller));
620
621        // Set controller.[[transformAlgorithm]] to transformAlgorithm.
622        // Set controller.[[flushAlgorithm]] to flushAlgorithm.
623        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
624        // Note: These are set in the constructor.
625    }
626
627    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
628    fn set_up_transform_stream_default_controller_from_transformer(
629        &self,
630        global: &GlobalScope,
631        transformer_obj: SafeHandleObject,
632        transformer: &Transformer,
633        can_gc: CanGc,
634    ) {
635        // Let controller be a new TransformStreamDefaultController.
636        let transformer_type = TransformerType::new_from_js_transformer(transformer);
637        let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
638
639        // Let transformAlgorithm be the following steps, taking a chunk argument:
640        // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
641        // If result is an abrupt completion, return a promise rejected with result.[[Value]].
642        // Otherwise, return a promise resolved with undefined.
643
644        // Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
645        // Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
646
647        // If transformerDict["transform"] exists, set transformAlgorithm to an algorithm which
648        // takes an argument
649        // chunk and returns the result of invoking transformerDict["transform"] with argument
650        // list « chunk, controller »
651        // and callback this value transformer.
652
653        // If transformerDict["flush"] exists, set flushAlgorithm to an algorithm which returns
654        // the result
655        // of invoking transformerDict["flush"] with argument list « controller » and callback
656        // this value transformer.
657
658        // If transformerDict["cancel"] exists, set cancelAlgorithm to an algorithm which takes an argument
659        // reason and returns the result of invoking transformerDict["cancel"] with argument list « reason »
660        // and callback this value transformer.
661        controller.set_transform_obj(transformer_obj);
662
663        // Perform ! SetUpTransformStreamDefaultController(stream, controller,
664        // transformAlgorithm, flushAlgorithm, cancelAlgorithm).
665        self.set_up_transform_stream_default_controller(&controller);
666    }
667
668    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
669    pub(crate) fn transform_stream_default_sink_write_algorithm(
670        &self,
671        cx: &mut js::context::JSContext,
672        global: &GlobalScope,
673        chunk: SafeHandleValue,
674    ) -> Fallible<Rc<Promise>> {
675        // Assert: stream.[[writable]].[[state]] is "writable".
676        assert!(self.writable.get().is_some());
677
678        // Let controller be stream.[[controller]].
679        let controller = self.controller.get().expect("controller is not set");
680
681        // If stream.[[backpressure]] is true,
682        if self.backpressure.get() {
683            // Let backpressureChangePromise be stream.[[backpressureChangePromise]].
684            let backpressure_change_promise = self.backpressure_change_promise.borrow();
685
686            // Assert: backpressureChangePromise is not undefined.
687            assert!(backpressure_change_promise.is_some());
688
689            // Return the result of reacting to backpressureChangePromise with the following fulfillment steps:
690            let result_promise = Promise::new2(cx, global);
691            rooted!(&in(cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
692                controller: Dom::from_ref(&controller),
693                writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
694                chunk: Heap::boxed(chunk.get()),
695                result_promise: result_promise.clone(),
696            }));
697
698            let handler = PromiseNativeHandler::new(
699                global,
700                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
701                Some(Box::new(BackpressureChangeRejection {
702                    result_promise: result_promise.clone(),
703                })),
704                CanGc::from_cx(cx),
705            );
706            let mut realm = enter_auto_realm(cx, global);
707            let realm = &mut realm.current_realm();
708            let in_realm_proof = realm.into();
709            let comp = InRealm::Already(&in_realm_proof);
710            backpressure_change_promise
711                .as_ref()
712                .expect("Promise must be some by now.")
713                .append_native_handler(&handler, comp, CanGc::from_cx(realm));
714
715            return Ok(result_promise);
716        }
717
718        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
719        controller.transform_stream_default_controller_perform_transform(cx, global, chunk)
720    }
721
722    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
723    pub(crate) fn transform_stream_default_sink_abort_algorithm(
724        &self,
725        cx: SafeJSContext,
726        global: &GlobalScope,
727        reason: SafeHandleValue,
728        can_gc: CanGc,
729    ) -> Fallible<Rc<Promise>> {
730        // Let controller be stream.[[controller]].
731        let controller = self.controller.get().expect("controller is not set");
732
733        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
734        if let Some(finish_promise) = controller.get_finish_promise() {
735            return Ok(finish_promise);
736        }
737
738        // Let readable be stream.[[readable]].
739        let readable = self.readable.get().expect("readable stream is not set");
740
741        // Let controller.[[finishPromise]] be a new promise.
742        controller.set_finish_promise(Promise::new(global, can_gc));
743
744        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
745        let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
746
747        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
748        controller.clear_algorithms();
749
750        // React to cancelPromise:
751        let handler = PromiseNativeHandler::new(
752            global,
753            Some(Box::new(CancelPromiseFulfillment {
754                readable: Dom::from_ref(&readable),
755                controller: Dom::from_ref(&controller),
756                reason: Heap::boxed(reason.get()),
757            })),
758            Some(Box::new(CancelPromiseRejection {
759                readable: Dom::from_ref(&readable),
760                controller: Dom::from_ref(&controller),
761            })),
762            can_gc,
763        );
764        let realm = enter_realm(global);
765        let comp = InRealm::Entered(&realm);
766        cancel_promise.append_native_handler(&handler, comp, can_gc);
767
768        // Return controller.[[finishPromise]].
769        let finish_promise = controller
770            .get_finish_promise()
771            .expect("finish promise is not set");
772        Ok(finish_promise)
773    }
774
775    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
776    pub(crate) fn transform_stream_default_sink_close_algorithm(
777        &self,
778        cx: &mut js::context::JSContext,
779        global: &GlobalScope,
780    ) -> Fallible<Rc<Promise>> {
781        // Let controller be stream.[[controller]].
782        let controller = self
783            .controller
784            .get()
785            .ok_or(Error::Type(c"controller is not set".to_owned()))?;
786
787        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
788        if let Some(finish_promise) = controller.get_finish_promise() {
789            return Ok(finish_promise);
790        }
791
792        // Let readable be stream.[[readable]].
793        let readable = self
794            .readable
795            .get()
796            .ok_or(Error::Type(c"readable stream is not set".to_owned()))?;
797
798        // Let controller.[[finishPromise]] be a new promise.
799        controller.set_finish_promise(Promise::new2(cx, global));
800
801        // Let flushPromise be the result of performing controller.[[flushAlgorithm]].
802        let flush_promise = controller.perform_flush(cx, global)?;
803
804        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
805        controller.clear_algorithms();
806
807        // React to flushPromise:
808        let handler = PromiseNativeHandler::new(
809            global,
810            Some(Box::new(FlushPromiseFulfillment {
811                readable: Dom::from_ref(&readable),
812                controller: Dom::from_ref(&controller),
813            })),
814            Some(Box::new(FlushPromiseRejection {
815                readable: Dom::from_ref(&readable),
816                controller: Dom::from_ref(&controller),
817            })),
818            CanGc::from_cx(cx),
819        );
820
821        let mut realm = enter_auto_realm(cx, global);
822        let realm = &mut realm.current_realm();
823        let in_realm_proof = realm.into();
824        let comp = InRealm::Already(&in_realm_proof);
825        flush_promise.append_native_handler(&handler, comp, CanGc::from_cx(realm));
826        // Return controller.[[finishPromise]].
827        let finish_promise = controller
828            .get_finish_promise()
829            .expect("finish promise is not set");
830        Ok(finish_promise)
831    }
832
833    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
834    pub(crate) fn transform_stream_default_source_cancel(
835        &self,
836        cx: SafeJSContext,
837        global: &GlobalScope,
838        reason: SafeHandleValue,
839        can_gc: CanGc,
840    ) -> Fallible<Rc<Promise>> {
841        // Let controller be stream.[[controller]].
842        let controller = self
843            .controller
844            .get()
845            .ok_or(Error::Type(c"controller is not set".to_owned()))?;
846
847        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
848        if let Some(finish_promise) = controller.get_finish_promise() {
849            return Ok(finish_promise);
850        }
851
852        // Let writable be stream.[[writable]].
853        let writable = self
854            .writable
855            .get()
856            .ok_or(Error::Type(c"writable stream is not set".to_owned()))?;
857
858        // Let controller.[[finishPromise]] be a new promise.
859        controller.set_finish_promise(Promise::new(global, can_gc));
860
861        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
862        let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
863
864        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
865        controller.clear_algorithms();
866
867        // React to cancelPromise:
868        let handler = PromiseNativeHandler::new(
869            global,
870            Some(Box::new(SourceCancelPromiseFulfillment {
871                writeable: Dom::from_ref(&writable),
872                controller: Dom::from_ref(&controller),
873                stream: Dom::from_ref(self),
874                reason: Heap::boxed(reason.get()),
875            })),
876            Some(Box::new(SourceCancelPromiseRejection {
877                writeable: Dom::from_ref(&writable),
878                controller: Dom::from_ref(&controller),
879                stream: Dom::from_ref(self),
880            })),
881            can_gc,
882        );
883
884        // Return controller.[[finishPromise]].
885        let finish_promise = controller
886            .get_finish_promise()
887            .expect("finish promise is not set");
888        let realm = enter_realm(global);
889        let comp = InRealm::Entered(&realm);
890        cancel_promise.append_native_handler(&handler, comp, can_gc);
891        Ok(finish_promise)
892    }
893
894    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-pull>
895    pub(crate) fn transform_stream_default_source_pull(
896        &self,
897        global: &GlobalScope,
898        can_gc: CanGc,
899    ) -> Fallible<Rc<Promise>> {
900        // Assert: stream.[[backpressure]] is true.
901        assert!(self.backpressure.get());
902
903        // Assert: stream.[[backpressureChangePromise]] is not undefined.
904        assert!(self.backpressure_change_promise.borrow().is_some());
905
906        // Perform ! TransformStreamSetBackpressure(stream, false).
907        self.set_backpressure(global, false, can_gc);
908
909        // Return stream.[[backpressureChangePromise]].
910        Ok(self
911            .backpressure_change_promise
912            .borrow()
913            .clone()
914            .expect("Promise must be some by now."))
915    }
916
917    /// <https://streams.spec.whatwg.org/#transform-stream-error-writable-and-unblock-write>
918    pub(crate) fn error_writable_and_unblock_write(
919        &self,
920        cx: &mut js::context::JSContext,
921        global: &GlobalScope,
922        error: SafeHandleValue,
923    ) {
924        // Perform ! TransformStreamDefaultControllerClearAlgorithms(stream.[[controller]]).
925        self.get_controller().clear_algorithms();
926
927        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[controller]], e).
928        self.get_writable()
929            .get_default_controller()
930            .error_if_needed(cx, error, global);
931
932        // Perform ! TransformStreamUnblockWrite(stream).
933        self.unblock_write(global, CanGc::from_cx(cx))
934    }
935
936    /// <https://streams.spec.whatwg.org/#transform-stream-unblock-write>
937    pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
938        // If stream.[[backpressure]] is true, perform ! TransformStreamSetBackpressure(stream, false).
939        if self.backpressure.get() {
940            self.set_backpressure(global, false, can_gc);
941        }
942    }
943
944    /// <https://streams.spec.whatwg.org/#transform-stream-error>
945    pub(crate) fn error(
946        &self,
947        cx: &mut js::context::JSContext,
948        global: &GlobalScope,
949        error: SafeHandleValue,
950    ) {
951        // Perform ! ReadableStreamDefaultControllerError(stream.[[readable]].[[controller]], e).
952        self.get_readable()
953            .get_default_controller()
954            .error(error, CanGc::from_cx(cx));
955
956        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, e).
957        self.error_writable_and_unblock_write(cx, global, error);
958    }
959}
960
961impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
962    /// <https://streams.spec.whatwg.org/#ts-constructor>
963    #[expect(unsafe_code)]
964    fn Constructor(
965        cx: SafeJSContext,
966        global: &GlobalScope,
967        proto: Option<SafeHandleObject>,
968        can_gc: CanGc,
969        transformer: Option<*mut JSObject>,
970        writable_strategy: &QueuingStrategy,
971        readable_strategy: &QueuingStrategy,
972    ) -> Fallible<DomRoot<TransformStream>> {
973        // If transformer is missing, set it to null.
974        rooted!(in(*cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
975
976        // Let underlyingSinkDict be underlyingSink,
977        // converted to an IDL value of type UnderlyingSink.
978        let transformer_dict = if !transformer_obj.is_null() {
979            rooted!(in(*cx) let obj_val = ObjectValue(transformer_obj.get()));
980            match Transformer::new(cx, obj_val.handle(), can_gc) {
981                Ok(ConversionResult::Success(val)) => val,
982                Ok(ConversionResult::Failure(error)) => {
983                    return Err(Error::Type(error.into_owned()));
984                },
985                _ => {
986                    return Err(Error::JSFailed);
987                },
988            }
989        } else {
990            Transformer::empty()
991        };
992
993        // If transformerDict["readableType"] exists, throw a RangeError exception.
994        if !transformer_dict.readableType.handle().is_undefined() {
995            return Err(Error::Range(c"readableType is set".to_owned()));
996        }
997
998        // If transformerDict["writableType"] exists, throw a RangeError exception.
999        if !transformer_dict.writableType.handle().is_undefined() {
1000            return Err(Error::Range(c"writableType is set".to_owned()));
1001        }
1002
1003        // Let readableHighWaterMark be ? ExtractHighWaterMark(readableStrategy, 0).
1004        let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
1005
1006        // Let readableSizeAlgorithm be ! ExtractSizeAlgorithm(readableStrategy).
1007        let readable_size_algorithm = extract_size_algorithm(readable_strategy, can_gc);
1008
1009        // Let writableHighWaterMark be ? ExtractHighWaterMark(writableStrategy, 1).
1010        let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
1011
1012        // Let writableSizeAlgorithm be ! ExtractSizeAlgorithm(writableStrategy).
1013        let writable_size_algorithm = extract_size_algorithm(writable_strategy, can_gc);
1014
1015        // Let startPromise be a new promise.
1016        let start_promise = Promise::new(global, can_gc);
1017
1018        // Perform ! InitializeTransformStream(this, startPromise, writableHighWaterMark,
1019        // writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
1020        let stream = TransformStream::new_with_proto(global, proto, can_gc);
1021        stream.initialize(
1022            cx,
1023            global,
1024            start_promise.clone(),
1025            writable_high_water_mark,
1026            writable_size_algorithm,
1027            readable_high_water_mark,
1028            readable_size_algorithm,
1029            can_gc,
1030        )?;
1031
1032        // Perform ? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict).
1033        stream.set_up_transform_stream_default_controller_from_transformer(
1034            global,
1035            transformer_obj.handle(),
1036            &transformer_dict,
1037            can_gc,
1038        );
1039
1040        // If transformerDict["start"] exists, then resolve startPromise with the
1041        // result of invoking transformerDict["start"]
1042        // with argument list « this.[[controller]] » and callback this value transformer.
1043        if let Some(start) = &transformer_dict.start {
1044            rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
1045            rooted!(in(*cx) let mut result: JSVal);
1046            rooted!(in(*cx) let this_object = transformer_obj.get());
1047            start.Call_(
1048                &this_object.handle(),
1049                &stream.get_controller(),
1050                result.handle_mut(),
1051                ExceptionHandling::Rethrow,
1052                can_gc,
1053            )?;
1054            let is_promise = unsafe {
1055                if result.is_object() {
1056                    result_object.set(result.to_object());
1057                    IsPromiseObject(result_object.handle().into_handle())
1058                } else {
1059                    false
1060                }
1061            };
1062            let promise = if is_promise {
1063                Promise::new_with_js_promise(result_object.handle(), cx)
1064            } else {
1065                Promise::new_resolved(global, cx, result.get(), can_gc)
1066            };
1067            start_promise.resolve_native(&promise, can_gc);
1068        } else {
1069            // Otherwise, resolve startPromise with undefined.
1070            start_promise.resolve_native(&(), can_gc);
1071        };
1072
1073        Ok(stream)
1074    }
1075
1076    /// <https://streams.spec.whatwg.org/#ts-readable>
1077    fn Readable(&self) -> DomRoot<ReadableStream> {
1078        // Return this.[[readable]].
1079        self.readable.get().expect("readable stream is not set")
1080    }
1081
1082    /// <https://streams.spec.whatwg.org/#ts-writable>
1083    fn Writable(&self) -> DomRoot<WritableStream> {
1084        // Return this.[[writable]].
1085        self.writable.get().expect("writable stream is not set")
1086    }
1087}
1088
1089/// <https://streams.spec.whatwg.org/#ts-transfer>
1090impl Transferable for TransformStream {
1091    type Index = MessagePortIndex;
1092    type Data = TransformStreamData;
1093
1094    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps②>
1095    fn transfer(
1096        &self,
1097        cx: &mut js::context::JSContext,
1098    ) -> Fallible<(MessagePortId, TransformStreamData)> {
1099        let global = self.global();
1100        let mut realm = enter_auto_realm(cx, &*global);
1101        let mut realm = realm.current_realm();
1102        let cx = &mut realm;
1103
1104        // Step 1. Let readable be value.[[readable]].
1105        let readable = self.get_readable();
1106
1107        // Step 2. Let writable be value.[[writable]].
1108        let writable = self.get_writable();
1109
1110        // Step 3. If ! IsReadableStreamLocked(readable) is true, throw a
1111        // "DataCloneError" DOMException.
1112        // Step 4. If ! IsWritableStreamLocked(writable) is true, throw a
1113        // "DataCloneError" DOMException.
1114        if readable.is_locked() || writable.is_locked() {
1115            return Err(Error::DataClone(None));
1116        }
1117
1118        // First port pair (readable → proxy writable)
1119        let port1 = MessagePort::new(&global, CanGc::from_cx(cx));
1120        global.track_message_port(&port1, None);
1121        let port1_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1122        global.track_message_port(&port1_peer, None);
1123        global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1124
1125        let proxy_readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1126        proxy_readable.setup_cross_realm_transform_readable(cx, &port1);
1127        proxy_readable
1128            .pipe_to(cx, &global, &writable, false, false, false, None)
1129            .set_promise_is_handled();
1130
1131        // Second port pair (proxy readable → writable)
1132        let port2 = MessagePort::new(&global, CanGc::from_cx(cx));
1133        global.track_message_port(&port2, None);
1134        let port2_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1135        global.track_message_port(&port2_peer, None);
1136        global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1137
1138        let proxy_writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1139        proxy_writable.setup_cross_realm_transform_writable(cx, &port2);
1140
1141        // Pipe readable into the proxy writable (→ port_1)
1142        readable
1143            .pipe_to(cx, &global, &proxy_writable, false, false, false, None)
1144            .set_promise_is_handled();
1145
1146        // Step 5. Set dataHolder.[[readable]] to !
1147        // StructuredSerializeWithTransfer(readable, « readable »).
1148        // Step 6. Set dataHolder.[[writable]] to !
1149        // StructuredSerializeWithTransfer(writable, « writable »).
1150        Ok((
1151            *port1_peer.message_port_id(),
1152            TransformStreamData {
1153                readable: port1_peer.transfer(cx)?,
1154                writable: port2_peer.transfer(cx)?,
1155            },
1156        ))
1157    }
1158
1159    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps②>
1160    fn transfer_receive(
1161        cx: &mut js::context::JSContext,
1162        owner: &GlobalScope,
1163        _id: MessagePortId,
1164        data: TransformStreamData,
1165    ) -> Result<DomRoot<Self>, ()> {
1166        let port1 = MessagePort::transfer_receive(cx, owner, data.readable.0, data.readable.1)?;
1167        let port2 = MessagePort::transfer_receive(cx, owner, data.writable.0, data.writable.1)?;
1168
1169        // Step 1. Let readableRecord be !
1170        // StructuredDeserializeWithTransfer(dataHolder.[[readable]], the
1171        // current Realm).
1172        let proxy_readable = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1173        proxy_readable.setup_cross_realm_transform_readable(cx, &port2);
1174
1175        // Step 2. Let writableRecord be !
1176        // StructuredDeserializeWithTransfer(dataHolder.[[writable]], the
1177        // current Realm).
1178        let proxy_writable = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1179        proxy_writable.setup_cross_realm_transform_writable(cx, &port1);
1180
1181        // Step 3. Set value.[[readable]] to readableRecord.[[Deserialized]].
1182        // Step 4. Set value.[[writable]] to writableRecord.[[Deserialized]].
1183        // Step 5. Set value.[[backpressure]],
1184        // value.[[backpressureChangePromise]], and value.[[controller]] to
1185        // undefined.
1186        let stream = TransformStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1187        stream.readable.set(Some(&proxy_readable));
1188        stream.writable.set(Some(&proxy_writable));
1189
1190        Ok(stream)
1191    }
1192
1193    fn serialized_storage<'a>(
1194        data: StructuredData<'a, '_>,
1195    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1196        match data {
1197            StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1198            StructuredData::Writer(w) => &mut w.transform_streams_port,
1199        }
1200    }
1201}