script/dom/stream/
transformstream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use base::id::{MessagePortId, MessagePortIndex};
10use constellation_traits::TransformStreamData;
11use dom_struct::dom_struct;
12use js::jsapi::{Heap, IsPromiseObject, JSObject};
13use js::jsval::{JSVal, ObjectValue, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
16use rustc_hash::FxHashMap;
17use script_bindings::callback::ExceptionHandling;
18use script_bindings::realms::InRealm;
19
20use super::readablestream::CrossRealmTransformReadable;
21use super::writablestream::CrossRealmTransformWritable;
22use crate::dom::bindings::cell::DomRefCell;
23use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
24    QueuingStrategy, QueuingStrategySize,
25};
26use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
27use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
28use crate::dom::bindings::conversions::ConversionResult;
29use crate::dom::bindings::error::{Error, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::structuredclone::StructuredData;
33use crate::dom::bindings::transferable::Transferable;
34use crate::dom::globalscope::GlobalScope;
35use crate::dom::messageport::MessagePort;
36use crate::dom::promise::Promise;
37use crate::dom::promisenativehandler::Callback;
38use crate::dom::readablestream::{ReadableStream, create_readable_stream};
39use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
40use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
41use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
42use crate::dom::stream::writablestream::create_writable_stream;
43use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
44use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
45use crate::realms::enter_realm;
46use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
47
// Marker impl with no items.
// NOTE(review): presumably this is what allows the handler to sit in the `rooted!` slot
// used by `transform_stream_default_sink_write_algorithm` — confirm against mozjs.
impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}

/// Reacting to backpressureChangePromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
///
/// Holds everything the fulfillment steps need: the stream pieces to inspect, the chunk
/// to transform, and the promise that reports the outcome of the write.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransformBackPressureChangePromiseFulfillment {
    /// The result of reacting to backpressureChangePromise.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,

    /// The chunk that is handed to the controller's transform in the fulfillment steps.
    #[ignore_malloc_size_of = "mozjs"]
    chunk: Box<Heap<JSVal>>,

    /// The writable used in the fulfillment steps
    writable: Dom<WritableStream>,

    /// The controller on which the transform is performed in the fulfillment steps.
    controller: Dom<TransformStreamDefaultController>,
}
67
68impl Callback for TransformBackPressureChangePromiseFulfillment {
69    /// Reacting to backpressureChangePromise with the following fulfillment steps:
70    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
71        let can_gc = CanGc::from_cx(cx);
72        let cx: SafeJSContext = cx.into();
73        // Let writable be stream.[[writable]].
74        // Let state be writable.[[state]].
75        // If state is "erroring", throw writable.[[storedError]].
76        if self.writable.is_erroring() {
77            rooted!(in(*cx) let mut error = UndefinedValue());
78            self.writable.get_stored_error(error.handle_mut());
79            self.result_promise.reject(cx, error.handle(), can_gc);
80            return;
81        }
82
83        // Assert: state is "writable".
84        assert!(self.writable.is_writable());
85
86        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
87        rooted!(in(*cx) let mut chunk = UndefinedValue());
88        chunk.set(self.chunk.get());
89        let transform_result = self
90            .controller
91            .transform_stream_default_controller_perform_transform(
92                cx,
93                &self.writable.global(),
94                chunk.handle(),
95                can_gc,
96            )
97            .expect("perform transform failed");
98
99        // PerformTransformFulfillment and PerformTransformRejection do not need
100        // to be rooted because they only contain an Rc.
101        let handler = PromiseNativeHandler::new(
102            &self.writable.global(),
103            Some(Box::new(PerformTransformFulfillment {
104                result_promise: self.result_promise.clone(),
105            })),
106            Some(Box::new(PerformTransformRejection {
107                result_promise: self.result_promise.clone(),
108            })),
109            can_gc,
110        );
111
112        let realm = enter_realm(&*self.writable.global());
113        let comp = InRealm::Entered(&realm);
114        transform_result.append_native_handler(&handler, comp, can_gc);
115    }
116}
117
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
/// Reacting to fulfillment of performTransform as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
struct PerformTransformFulfillment {
    /// The promise handed back by the sink write algorithm; resolved by the callback.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
126
127impl Callback for PerformTransformFulfillment {
128    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
129        let can_gc = CanGc::from_cx(cx);
130        // Fulfilled: resolve the outer promise
131        self.result_promise.resolve_native(&(), can_gc);
132    }
133}
134
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
/// Reacting to rejection of performTransform as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
struct PerformTransformRejection {
    /// The promise handed back by the sink write algorithm; rejected by the callback.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
143
144impl Callback for PerformTransformRejection {
145    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
146        let can_gc = CanGc::from_cx(cx);
147        // Stream already errored in perform_transform, just reject result_promise
148        self.result_promise.reject(cx.into(), v, can_gc);
149    }
150}
151
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
/// Reacting to rejection of backpressureChangePromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
struct BackpressureChangeRejection {
    /// The promise handed back by the sink write algorithm; rejected with the same
    /// reason that backpressureChangePromise was rejected with.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
160
161impl Callback for BackpressureChangeRejection {
162    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
163        let can_gc = CanGc::from_cx(cx);
164        self.result_promise.reject(cx.into(), reason, can_gc);
165    }
166}
167
// Marker impl with no items.
// NOTE(review): presumably needed so the handler can be placed in a `rooted!` slot;
// no such use is visible in this part of the file — confirm.
impl js::gc::Rootable for CancelPromiseFulfillment {}

/// Reacting to fulfillment of the cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct CancelPromiseFulfillment {
    /// The readable side; its state is inspected and, unless already errored, its
    /// controller is errored with `reason`.
    readable: Dom<ReadableStream>,
    /// The controller whose finish promise is settled by the callback.
    controller: Dom<TransformStreamDefaultController>,
    /// The abort reason used to error the readable side.
    #[ignore_malloc_size_of = "mozjs"]
    reason: Box<Heap<JSVal>>,
}
180
181impl Callback for CancelPromiseFulfillment {
182    /// Reacting to backpressureChangePromise with the following fulfillment steps:
183    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
184        let can_gc = CanGc::from_cx(cx);
185        let cx: SafeJSContext = cx.into();
186        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
187        if self.readable.is_errored() {
188            rooted!(in(*cx) let mut error = UndefinedValue());
189            self.readable.get_stored_error(error.handle_mut());
190            self.controller
191                .get_finish_promise()
192                .expect("finish promise is not set")
193                .reject_native(&error.handle(), can_gc);
194        } else {
195            // Otherwise:
196            // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], reason).
197            rooted!(in(*cx) let mut reason = UndefinedValue());
198            reason.set(self.reason.get());
199            self.readable
200                .get_default_controller()
201                .error(reason.handle(), can_gc);
202
203            // Resolve controller.[[finishPromise]] with undefined.
204            self.controller
205                .get_finish_promise()
206                .expect("finish promise is not set")
207                .resolve_native(&(), can_gc);
208        }
209    }
210}
211
// Marker impl with no items.
// NOTE(review): presumably needed so the handler can be placed in a `rooted!` slot;
// no such use is visible in this part of the file — confirm.
impl js::gc::Rootable for CancelPromiseRejection {}

/// Reacting to rejection of cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct CancelPromiseRejection {
    /// The readable side whose controller is errored with the rejection reason.
    readable: Dom<ReadableStream>,
    /// The controller whose finish promise is rejected with the rejection reason.
    controller: Dom<TransformStreamDefaultController>,
}
222
223impl Callback for CancelPromiseRejection {
224    /// Reacting to backpressureChangePromise with the following fulfillment steps:
225    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
226        let can_gc = CanGc::from_cx(cx);
227        let cx: SafeJSContext = cx.into();
228        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
229        self.readable.get_default_controller().error(v, can_gc);
230
231        // Reject controller.[[finishPromise]] with r.
232        self.controller
233            .get_finish_promise()
234            .expect("finish promise is not set")
235            .reject(cx, v, can_gc);
236    }
237}
238
// Marker impl with no items.
// NOTE(review): presumably needed so the handler can be placed in a `rooted!` slot;
// no such use is visible in this part of the file — confirm.
impl js::gc::Rootable for SourceCancelPromiseFulfillment {}

/// Reacting to fulfillment of the cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct SourceCancelPromiseFulfillment {
    /// The writable side; its state is inspected and, unless already errored, its
    /// controller is errored (if needed) with `reason`.
    writeable: Dom<WritableStream>,
    /// The controller whose finish promise is settled by the callback.
    controller: Dom<TransformStreamDefaultController>,
    /// The transform stream on which writes are unblocked.
    stream: Dom<TransformStream>,
    /// The cancel reason used to error the writable side.
    #[ignore_malloc_size_of = "mozjs"]
    reason: Box<Heap<JSVal>>,
}
252
253impl Callback for SourceCancelPromiseFulfillment {
254    /// Reacting to backpressureChangePromise with the following fulfillment steps:
255    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
256        let can_gc = CanGc::from_cx(cx);
257        let cx: SafeJSContext = cx.into();
258        // If cancelPromise was fulfilled, then:
259        let finish_promise = self
260            .controller
261            .get_finish_promise()
262            .expect("finish promise is not set");
263
264        let global = &self.writeable.global();
265        // If writable.[[state]] is "errored", reject controller.[[finishPromise]] with writable.[[storedError]].
266        if self.writeable.is_errored() {
267            rooted!(in(*cx) let mut error = UndefinedValue());
268            self.writeable.get_stored_error(error.handle_mut());
269            finish_promise.reject(cx, error.handle(), can_gc);
270        } else {
271            // Otherwise:
272            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], reason).
273            rooted!(in(*cx) let mut reason = UndefinedValue());
274            reason.set(self.reason.get());
275            self.writeable.get_default_controller().error_if_needed(
276                cx,
277                reason.handle(),
278                global,
279                can_gc,
280            );
281
282            // Perform ! TransformStreamUnblockWrite(stream).
283            self.stream.unblock_write(global, can_gc);
284
285            // Resolve controller.[[finishPromise]] with undefined.
286            finish_promise.resolve_native(&(), can_gc);
287        }
288    }
289}
290
// Marker impl with no items.
// NOTE(review): presumably needed so the handler can be placed in a `rooted!` slot;
// no such use is visible in this part of the file — confirm.
impl js::gc::Rootable for SourceCancelPromiseRejection {}

/// Reacting to rejection of cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct SourceCancelPromiseRejection {
    /// The writable side whose controller is errored (if needed) with the rejection reason.
    writeable: Dom<WritableStream>,
    /// The controller whose finish promise is rejected with the rejection reason.
    controller: Dom<TransformStreamDefaultController>,
    /// The transform stream on which writes are unblocked.
    stream: Dom<TransformStream>,
}
302
303impl Callback for SourceCancelPromiseRejection {
304    /// Reacting to backpressureChangePromise with the following fulfillment steps:
305    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
306        let can_gc = CanGc::from_cx(cx);
307        let cx: SafeJSContext = cx.into();
308        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], r).
309        let global = &self.writeable.global();
310
311        self.writeable
312            .get_default_controller()
313            .error_if_needed(cx, v, global, can_gc);
314
315        // Perform ! TransformStreamUnblockWrite(stream).
316        self.stream.unblock_write(global, can_gc);
317
318        // Reject controller.[[finishPromise]] with r.
319        self.controller
320            .get_finish_promise()
321            .expect("finish promise is not set")
322            .reject(cx, v, can_gc);
323    }
324}
325
// Marker impl with no items.
// NOTE(review): presumably needed so the handler can be placed in a `rooted!` slot;
// no such use is visible in this part of the file — confirm.
impl js::gc::Rootable for FlushPromiseFulfillment {}

/// Reacting to fulfillment of the flushpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct FlushPromiseFulfillment {
    /// The readable side; its state is inspected and, unless errored, its controller is closed.
    readable: Dom<ReadableStream>,
    /// The controller whose finish promise is settled by the callback.
    controller: Dom<TransformStreamDefaultController>,
}
336
337impl Callback for FlushPromiseFulfillment {
338    /// Reacting to flushpromise with the following fulfillment steps:
339    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
340        let can_gc = CanGc::from_cx(cx);
341        let cx: SafeJSContext = cx.into();
342        // If flushPromise was fulfilled, then:
343        let finish_promise = self
344            .controller
345            .get_finish_promise()
346            .expect("finish promise is not set");
347
348        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
349        if self.readable.is_errored() {
350            rooted!(in(*cx) let mut error = UndefinedValue());
351            self.readable.get_stored_error(error.handle_mut());
352            finish_promise.reject(cx, error.handle(), can_gc);
353        } else {
354            // Otherwise:
355            // Perform ! ReadableStreamDefaultControllerClose(readable.[[controller]]).
356            self.readable.get_default_controller().close(can_gc);
357
358            // Resolve controller.[[finishPromise]] with undefined.
359            finish_promise.resolve_native(&(), can_gc);
360        }
361    }
362}
363
// Marker impl with no items.
// NOTE(review): presumably needed so the handler can be placed in a `rooted!` slot;
// no such use is visible in this part of the file — confirm.
impl js::gc::Rootable for FlushPromiseRejection {}

/// Reacting to rejection of flushpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct FlushPromiseRejection {
    /// The readable side whose controller is errored with the rejection reason.
    readable: Dom<ReadableStream>,
    /// The controller whose finish promise is rejected with the rejection reason.
    controller: Dom<TransformStreamDefaultController>,
}
374
375impl Callback for FlushPromiseRejection {
376    /// Reacting to flushpromise with the following fulfillment steps:
377    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
378        let can_gc = CanGc::from_cx(cx);
379        let cx: SafeJSContext = cx.into();
380        // If flushPromise was rejected with reason r, then:
381        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
382        self.readable.get_default_controller().error(v, can_gc);
383
384        // Reject controller.[[finishPromise]] with r.
385        self.controller
386            .get_finish_promise()
387            .expect("finish promise is not set")
388            .reject(cx, v, can_gc);
389    }
390}
391
// Marker impl with no items.
// NOTE(review): presumably needed so the wrapper can be placed in a `rooted!` slot;
// no such use is visible in this part of the file — confirm.
impl js::gc::Rootable for CrossRealmTransform {}

/// A wrapper to handle `message` and `messageerror` events
/// for the message port used by the transferred stream.
///
/// Each variant wraps the handler for one side of the cross-realm transform.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum CrossRealmTransform {
    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
    Readable(CrossRealmTransformReadable),
    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
    Writable(CrossRealmTransformWritable),
}
404
/// <https://streams.spec.whatwg.org/#ts-class>
///
/// The DOM object backing a transform stream. Each field mirrors one of the internal
/// slots named in the linked spec sections.
#[dom_struct]
pub struct TransformStream {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#transformstream-backpressure>
    ///
    /// Updated only through `set_backpressure`.
    backpressure: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#transformstream-backpressurechangepromise>
    ///
    /// `None` until the first `set_backpressure` call; replaced by a fresh promise on
    /// every call after the previous one has been resolved.
    #[conditional_malloc_size_of]
    backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,

    /// <https://streams.spec.whatwg.org/#transformstream-controller>
    ///
    /// Unset until `set_up_transform_stream_default_controller` runs.
    controller: MutNullableDom<TransformStreamDefaultController>,

    /// <https://streams.spec.whatwg.org/#transformstream-detached>
    ///
    /// Starts out `false`.
    detached: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#transformstream-readable>
    ///
    /// Created in `initialize`.
    readable: MutNullableDom<ReadableStream>,

    /// <https://streams.spec.whatwg.org/#transformstream-writable>
    ///
    /// Created in `initialize`.
    writable: MutNullableDom<WritableStream>,
}
429
430impl TransformStream {
431    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
432    fn new_inherited() -> TransformStream {
433        TransformStream {
434            reflector_: Reflector::new(),
435            backpressure: Default::default(),
436            backpressure_change_promise: DomRefCell::new(None),
437            controller: MutNullableDom::new(None),
438            detached: Cell::new(false),
439            readable: MutNullableDom::new(None),
440            writable: MutNullableDom::new(None),
441        }
442    }
443
444    pub(crate) fn new_with_proto(
445        global: &GlobalScope,
446        proto: Option<SafeHandleObject>,
447        can_gc: CanGc,
448    ) -> DomRoot<TransformStream> {
449        reflect_dom_object_with_proto(
450            Box::new(TransformStream::new_inherited()),
451            global,
452            proto,
453            can_gc,
454        )
455    }
456
457    /// Creates and set up the newly created transform stream following
458    /// <https://streams.spec.whatwg.org/#transformstream-set-up>
459    pub(crate) fn set_up(
460        &self,
461        cx: SafeJSContext,
462        global: &GlobalScope,
463        transformer_type: TransformerType,
464        can_gc: CanGc,
465    ) -> Fallible<()> {
466        // Step1. Let writableHighWaterMark be 1.
467        let writable_high_water_mark = 1.0;
468
469        // Step 2. Let writableSizeAlgorithm be an algorithm that returns 1.
470        let writable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
471
472        // Step 3. Let readableHighWaterMark be 0.
473        let readable_high_water_mark = 0.0;
474
475        // Step 4. Let readableSizeAlgorithm be an algorithm that returns 1.
476        let readable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
477
478        // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
479        // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
480        // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
481        // NOTE: These steps are implemented in `TransformStreamDefaultController::new`
482
483        // Step 8. Let startPromise be a promise resolved with undefined.
484        let start_promise = Promise::new_resolved(global, cx, (), can_gc);
485
486        // Step 9. Perform ! InitializeTransformStream(stream, startPromise,
487        // writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark,
488        // readableSizeAlgorithm).
489        self.initialize(
490            cx,
491            global,
492            start_promise.clone(),
493            writable_high_water_mark,
494            writable_size_algorithm,
495            readable_high_water_mark,
496            readable_size_algorithm,
497            can_gc,
498        )?;
499
500        // Step 10. Let controller be a new TransformStreamDefaultController.
501        let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
502
503        // Step 11. Perform ! SetUpTransformStreamDefaultController(stream,
504        // controller, transformAlgorithmWrapper, flushAlgorithmWrapper,
505        // cancelAlgorithmWrapper).
506        self.set_up_transform_stream_default_controller(&controller);
507
508        Ok(())
509    }
510
511    pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
512        self.controller.get().expect("controller is not set")
513    }
514
515    pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
516        self.writable.get().expect("writable stream is not set")
517    }
518
519    pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
520        self.readable.get().expect("readable stream is not set")
521    }
522
    /// Returns the current value of the stream's backpressure flag.
    pub(crate) fn get_backpressure(&self) -> bool {
        self.backpressure.get()
    }
526
527    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
528    #[allow(clippy::too_many_arguments)]
529    fn initialize(
530        &self,
531        cx: SafeJSContext,
532        global: &GlobalScope,
533        start_promise: Rc<Promise>,
534        writable_high_water_mark: f64,
535        writable_size_algorithm: Rc<QueuingStrategySize>,
536        readable_high_water_mark: f64,
537        readable_size_algorithm: Rc<QueuingStrategySize>,
538        can_gc: CanGc,
539    ) -> Fallible<()> {
540        // Let startAlgorithm be an algorithm that returns startPromise.
541        // Let writeAlgorithm be the following steps, taking a chunk argument:
542        //  Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
543        // Let abortAlgorithm be the following steps, taking a reason argument:
544        //  Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
545        // Let closeAlgorithm be the following steps:
546        //  Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
547        // Set stream.[[writable]] to ! CreateWritableStream(startAlgorithm, writeAlgorithm,
548        // closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm).
549        // Note: Those steps are implemented using UnderlyingSinkType::Transform.
550
551        let writable = create_writable_stream(
552            cx,
553            global,
554            writable_high_water_mark,
555            writable_size_algorithm,
556            UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
557            can_gc,
558        )?;
559        self.writable.set(Some(&writable));
560
561        // Let pullAlgorithm be the following steps:
562
563        // Return ! TransformStreamDefaultSourcePullAlgorithm(stream).
564
565        // Let cancelAlgorithm be the following steps, taking a reason argument:
566
567        // Return ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason).
568
569        // Set stream.[[readable]] to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
570        // cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
571
572        let readable = create_readable_stream(
573            global,
574            UnderlyingSourceType::Transform(Dom::from_ref(self), start_promise.clone()),
575            Some(readable_size_algorithm),
576            Some(readable_high_water_mark),
577            can_gc,
578        );
579        self.readable.set(Some(&readable));
580
581        // Set stream.[[backpressure]] and stream.[[backpressureChangePromise]] to undefined.
582        // Note: This is done in the constructor.
583
584        // Perform ! TransformStreamSetBackpressure(stream, true).
585        self.set_backpressure(global, true, can_gc);
586
587        // Set stream.[[controller]] to undefined.
588        self.controller.set(None);
589
590        Ok(())
591    }
592
593    /// <https://streams.spec.whatwg.org/#transform-stream-set-backpressure>
594    pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
595        // Assert: stream.[[backpressure]] is not backpressure.
596        assert!(self.backpressure.get() != backpressure);
597
598        // If stream.[[backpressureChangePromise]] is not undefined, resolve
599        // stream.[[backpressureChangePromise]] with undefined.
600        if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
601            promise.resolve_native(&(), can_gc);
602        }
603
604        // Set stream.[[backpressureChangePromise]] to a new promise.;
605        *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
606
607        // Set stream.[[backpressure]] to backpressure.
608        self.backpressure.set(backpressure);
609    }
610
    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller>
    ///
    /// Links `controller` and this stream to each other.
    ///
    /// # Panics
    ///
    /// Panics if the stream already has a controller.
    fn set_up_transform_stream_default_controller(
        &self,
        controller: &TransformStreamDefaultController,
    ) {
        // Assert: stream implements TransformStream.
        // Note: this is checked with type.

        // Assert: stream.[[controller]] is undefined.
        assert!(self.controller.get().is_none());

        // Set controller.[[stream]] to stream.
        controller.set_stream(self);

        // Set stream.[[controller]] to controller.
        self.controller.set(Some(controller));

        // Set controller.[[transformAlgorithm]] to transformAlgorithm.
        // Set controller.[[flushAlgorithm]] to flushAlgorithm.
        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
        // Note: These are set in the constructor.
    }
633
634    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
635    fn set_up_transform_stream_default_controller_from_transformer(
636        &self,
637        global: &GlobalScope,
638        transformer_obj: SafeHandleObject,
639        transformer: &Transformer,
640        can_gc: CanGc,
641    ) {
642        // Let controller be a new TransformStreamDefaultController.
643        let transformer_type = TransformerType::new_from_js_transformer(transformer);
644        let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
645
646        // Let transformAlgorithm be the following steps, taking a chunk argument:
647        // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
648        // If result is an abrupt completion, return a promise rejected with result.[[Value]].
649        // Otherwise, return a promise resolved with undefined.
650
651        // Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
652        // Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
653
654        // If transformerDict["transform"] exists, set transformAlgorithm to an algorithm which
655        // takes an argument
656        // chunk and returns the result of invoking transformerDict["transform"] with argument
657        // list « chunk, controller »
658        // and callback this value transformer.
659
660        // If transformerDict["flush"] exists, set flushAlgorithm to an algorithm which returns
661        // the result
662        // of invoking transformerDict["flush"] with argument list « controller » and callback
663        // this value transformer.
664
665        // If transformerDict["cancel"] exists, set cancelAlgorithm to an algorithm which takes an argument
666        // reason and returns the result of invoking transformerDict["cancel"] with argument list « reason »
667        // and callback this value transformer.
668        controller.set_transform_obj(transformer_obj);
669
670        // Perform ! SetUpTransformStreamDefaultController(stream, controller,
671        // transformAlgorithm, flushAlgorithm, cancelAlgorithm).
672        self.set_up_transform_stream_default_controller(&controller);
673    }
674
675    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
676    pub(crate) fn transform_stream_default_sink_write_algorithm(
677        &self,
678        cx: SafeJSContext,
679        global: &GlobalScope,
680        chunk: SafeHandleValue,
681        can_gc: CanGc,
682    ) -> Fallible<Rc<Promise>> {
683        // Assert: stream.[[writable]].[[state]] is "writable".
684        assert!(self.writable.get().is_some());
685
686        // Let controller be stream.[[controller]].
687        let controller = self.controller.get().expect("controller is not set");
688
689        // If stream.[[backpressure]] is true,
690        if self.backpressure.get() {
691            // Let backpressureChangePromise be stream.[[backpressureChangePromise]].
692            let backpressure_change_promise = self.backpressure_change_promise.borrow();
693
694            // Assert: backpressureChangePromise is not undefined.
695            assert!(backpressure_change_promise.is_some());
696
697            // Return the result of reacting to backpressureChangePromise with the following fulfillment steps:
698            let result_promise = Promise::new(global, can_gc);
699            rooted!(in(*cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
700                controller: Dom::from_ref(&controller),
701                writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
702                chunk: Heap::boxed(chunk.get()),
703                result_promise: result_promise.clone(),
704            }));
705
706            let handler = PromiseNativeHandler::new(
707                global,
708                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
709                Some(Box::new(BackpressureChangeRejection {
710                    result_promise: result_promise.clone(),
711                })),
712                can_gc,
713            );
714            let realm = enter_realm(global);
715            let comp = InRealm::Entered(&realm);
716            backpressure_change_promise
717                .as_ref()
718                .expect("Promise must be some by now.")
719                .append_native_handler(&handler, comp, can_gc);
720
721            return Ok(result_promise);
722        }
723
724        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
725        controller.transform_stream_default_controller_perform_transform(cx, global, chunk, can_gc)
726    }
727
728    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
729    pub(crate) fn transform_stream_default_sink_abort_algorithm(
730        &self,
731        cx: SafeJSContext,
732        global: &GlobalScope,
733        reason: SafeHandleValue,
734        can_gc: CanGc,
735    ) -> Fallible<Rc<Promise>> {
736        // Let controller be stream.[[controller]].
737        let controller = self.controller.get().expect("controller is not set");
738
739        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
740        if let Some(finish_promise) = controller.get_finish_promise() {
741            return Ok(finish_promise);
742        }
743
744        // Let readable be stream.[[readable]].
745        let readable = self.readable.get().expect("readable stream is not set");
746
747        // Let controller.[[finishPromise]] be a new promise.
748        controller.set_finish_promise(Promise::new(global, can_gc));
749
750        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
751        let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
752
753        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
754        controller.clear_algorithms();
755
756        // React to cancelPromise:
757        let handler = PromiseNativeHandler::new(
758            global,
759            Some(Box::new(CancelPromiseFulfillment {
760                readable: Dom::from_ref(&readable),
761                controller: Dom::from_ref(&controller),
762                reason: Heap::boxed(reason.get()),
763            })),
764            Some(Box::new(CancelPromiseRejection {
765                readable: Dom::from_ref(&readable),
766                controller: Dom::from_ref(&controller),
767            })),
768            can_gc,
769        );
770        let realm = enter_realm(global);
771        let comp = InRealm::Entered(&realm);
772        cancel_promise.append_native_handler(&handler, comp, can_gc);
773
774        // Return controller.[[finishPromise]].
775        let finish_promise = controller
776            .get_finish_promise()
777            .expect("finish promise is not set");
778        Ok(finish_promise)
779    }
780
781    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
782    pub(crate) fn transform_stream_default_sink_close_algorithm(
783        &self,
784        cx: SafeJSContext,
785        global: &GlobalScope,
786        can_gc: CanGc,
787    ) -> Fallible<Rc<Promise>> {
788        // Let controller be stream.[[controller]].
789        let controller = self
790            .controller
791            .get()
792            .ok_or(Error::Type("controller is not set".to_string()))?;
793
794        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
795        if let Some(finish_promise) = controller.get_finish_promise() {
796            return Ok(finish_promise);
797        }
798
799        // Let readable be stream.[[readable]].
800        let readable = self
801            .readable
802            .get()
803            .ok_or(Error::Type("readable stream is not set".to_string()))?;
804
805        // Let controller.[[finishPromise]] be a new promise.
806        controller.set_finish_promise(Promise::new(global, can_gc));
807
808        // Let flushPromise be the result of performing controller.[[flushAlgorithm]].
809        let flush_promise = controller.perform_flush(cx, global, can_gc)?;
810
811        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
812        controller.clear_algorithms();
813
814        // React to flushPromise:
815        let handler = PromiseNativeHandler::new(
816            global,
817            Some(Box::new(FlushPromiseFulfillment {
818                readable: Dom::from_ref(&readable),
819                controller: Dom::from_ref(&controller),
820            })),
821            Some(Box::new(FlushPromiseRejection {
822                readable: Dom::from_ref(&readable),
823                controller: Dom::from_ref(&controller),
824            })),
825            can_gc,
826        );
827
828        let realm = enter_realm(global);
829        let comp = InRealm::Entered(&realm);
830        flush_promise.append_native_handler(&handler, comp, can_gc);
831        // Return controller.[[finishPromise]].
832        let finish_promise = controller
833            .get_finish_promise()
834            .expect("finish promise is not set");
835        Ok(finish_promise)
836    }
837
838    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
839    pub(crate) fn transform_stream_default_source_cancel(
840        &self,
841        cx: SafeJSContext,
842        global: &GlobalScope,
843        reason: SafeHandleValue,
844        can_gc: CanGc,
845    ) -> Fallible<Rc<Promise>> {
846        // Let controller be stream.[[controller]].
847        let controller = self
848            .controller
849            .get()
850            .ok_or(Error::Type("controller is not set".to_string()))?;
851
852        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
853        if let Some(finish_promise) = controller.get_finish_promise() {
854            return Ok(finish_promise);
855        }
856
857        // Let writable be stream.[[writable]].
858        let writable = self
859            .writable
860            .get()
861            .ok_or(Error::Type("writable stream is not set".to_string()))?;
862
863        // Let controller.[[finishPromise]] be a new promise.
864        controller.set_finish_promise(Promise::new(global, can_gc));
865
866        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
867        let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
868
869        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
870        controller.clear_algorithms();
871
872        // React to cancelPromise:
873        let handler = PromiseNativeHandler::new(
874            global,
875            Some(Box::new(SourceCancelPromiseFulfillment {
876                writeable: Dom::from_ref(&writable),
877                controller: Dom::from_ref(&controller),
878                stream: Dom::from_ref(self),
879                reason: Heap::boxed(reason.get()),
880            })),
881            Some(Box::new(SourceCancelPromiseRejection {
882                writeable: Dom::from_ref(&writable),
883                controller: Dom::from_ref(&controller),
884                stream: Dom::from_ref(self),
885            })),
886            can_gc,
887        );
888
889        // Return controller.[[finishPromise]].
890        let finish_promise = controller
891            .get_finish_promise()
892            .expect("finish promise is not set");
893        let realm = enter_realm(global);
894        let comp = InRealm::Entered(&realm);
895        cancel_promise.append_native_handler(&handler, comp, can_gc);
896        Ok(finish_promise)
897    }
898
899    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-pull>
900    pub(crate) fn transform_stream_default_source_pull(
901        &self,
902        global: &GlobalScope,
903        can_gc: CanGc,
904    ) -> Fallible<Rc<Promise>> {
905        // Assert: stream.[[backpressure]] is true.
906        assert!(self.backpressure.get());
907
908        // Assert: stream.[[backpressureChangePromise]] is not undefined.
909        assert!(self.backpressure_change_promise.borrow().is_some());
910
911        // Perform ! TransformStreamSetBackpressure(stream, false).
912        self.set_backpressure(global, false, can_gc);
913
914        // Return stream.[[backpressureChangePromise]].
915        Ok(self
916            .backpressure_change_promise
917            .borrow()
918            .clone()
919            .expect("Promise must be some by now."))
920    }
921
922    /// <https://streams.spec.whatwg.org/#transform-stream-error-writable-and-unblock-write>
923    pub(crate) fn error_writable_and_unblock_write(
924        &self,
925        cx: SafeJSContext,
926        global: &GlobalScope,
927        error: SafeHandleValue,
928        can_gc: CanGc,
929    ) {
930        // Perform ! TransformStreamDefaultControllerClearAlgorithms(stream.[[controller]]).
931        self.get_controller().clear_algorithms();
932
933        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[controller]], e).
934        self.get_writable()
935            .get_default_controller()
936            .error_if_needed(cx, error, global, can_gc);
937
938        // Perform ! TransformStreamUnblockWrite(stream).
939        self.unblock_write(global, can_gc)
940    }
941
942    /// <https://streams.spec.whatwg.org/#transform-stream-unblock-write>
943    pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
944        // If stream.[[backpressure]] is true, perform ! TransformStreamSetBackpressure(stream, false).
945        if self.backpressure.get() {
946            self.set_backpressure(global, false, can_gc);
947        }
948    }
949
950    /// <https://streams.spec.whatwg.org/#transform-stream-error>
951    pub(crate) fn error(
952        &self,
953        cx: SafeJSContext,
954        global: &GlobalScope,
955        error: SafeHandleValue,
956        can_gc: CanGc,
957    ) {
958        // Perform ! ReadableStreamDefaultControllerError(stream.[[readable]].[[controller]], e).
959        self.get_readable()
960            .get_default_controller()
961            .error(error, can_gc);
962
963        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, e).
964        self.error_writable_and_unblock_write(cx, global, error, can_gc);
965    }
966}
967
968impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
969    /// <https://streams.spec.whatwg.org/#ts-constructor>
970    #[expect(unsafe_code)]
971    fn Constructor(
972        cx: SafeJSContext,
973        global: &GlobalScope,
974        proto: Option<SafeHandleObject>,
975        can_gc: CanGc,
976        transformer: Option<*mut JSObject>,
977        writable_strategy: &QueuingStrategy,
978        readable_strategy: &QueuingStrategy,
979    ) -> Fallible<DomRoot<TransformStream>> {
980        // If transformer is missing, set it to null.
981        rooted!(in(*cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
982
983        // Let underlyingSinkDict be underlyingSink,
984        // converted to an IDL value of type UnderlyingSink.
985        let transformer_dict = if !transformer_obj.is_null() {
986            rooted!(in(*cx) let obj_val = ObjectValue(transformer_obj.get()));
987            match Transformer::new(cx, obj_val.handle(), can_gc) {
988                Ok(ConversionResult::Success(val)) => val,
989                Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
990                _ => {
991                    return Err(Error::JSFailed);
992                },
993            }
994        } else {
995            Transformer::empty()
996        };
997
998        // If transformerDict["readableType"] exists, throw a RangeError exception.
999        if !transformer_dict.readableType.handle().is_undefined() {
1000            return Err(Error::Range("readableType is set".to_string()));
1001        }
1002
1003        // If transformerDict["writableType"] exists, throw a RangeError exception.
1004        if !transformer_dict.writableType.handle().is_undefined() {
1005            return Err(Error::Range("writableType is set".to_string()));
1006        }
1007
1008        // Let readableHighWaterMark be ? ExtractHighWaterMark(readableStrategy, 0).
1009        let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
1010
1011        // Let readableSizeAlgorithm be ! ExtractSizeAlgorithm(readableStrategy).
1012        let readable_size_algorithm = extract_size_algorithm(readable_strategy, can_gc);
1013
1014        // Let writableHighWaterMark be ? ExtractHighWaterMark(writableStrategy, 1).
1015        let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
1016
1017        // Let writableSizeAlgorithm be ! ExtractSizeAlgorithm(writableStrategy).
1018        let writable_size_algorithm = extract_size_algorithm(writable_strategy, can_gc);
1019
1020        // Let startPromise be a new promise.
1021        let start_promise = Promise::new(global, can_gc);
1022
1023        // Perform ! InitializeTransformStream(this, startPromise, writableHighWaterMark,
1024        // writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
1025        let stream = TransformStream::new_with_proto(global, proto, can_gc);
1026        stream.initialize(
1027            cx,
1028            global,
1029            start_promise.clone(),
1030            writable_high_water_mark,
1031            writable_size_algorithm,
1032            readable_high_water_mark,
1033            readable_size_algorithm,
1034            can_gc,
1035        )?;
1036
1037        // Perform ? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict).
1038        stream.set_up_transform_stream_default_controller_from_transformer(
1039            global,
1040            transformer_obj.handle(),
1041            &transformer_dict,
1042            can_gc,
1043        );
1044
1045        // If transformerDict["start"] exists, then resolve startPromise with the
1046        // result of invoking transformerDict["start"]
1047        // with argument list « this.[[controller]] » and callback this value transformer.
1048        if let Some(start) = &transformer_dict.start {
1049            rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
1050            rooted!(in(*cx) let mut result: JSVal);
1051            rooted!(in(*cx) let this_object = transformer_obj.get());
1052            start.Call_(
1053                &this_object.handle(),
1054                &stream.get_controller(),
1055                result.handle_mut(),
1056                ExceptionHandling::Rethrow,
1057                can_gc,
1058            )?;
1059            let is_promise = unsafe {
1060                if result.is_object() {
1061                    result_object.set(result.to_object());
1062                    IsPromiseObject(result_object.handle().into_handle())
1063                } else {
1064                    false
1065                }
1066            };
1067            let promise = if is_promise {
1068                Promise::new_with_js_promise(result_object.handle(), cx)
1069            } else {
1070                Promise::new_resolved(global, cx, result.get(), can_gc)
1071            };
1072            start_promise.resolve_native(&promise, can_gc);
1073        } else {
1074            // Otherwise, resolve startPromise with undefined.
1075            start_promise.resolve_native(&(), can_gc);
1076        };
1077
1078        Ok(stream)
1079    }
1080
1081    /// <https://streams.spec.whatwg.org/#ts-readable>
1082    fn Readable(&self) -> DomRoot<ReadableStream> {
1083        // Return this.[[readable]].
1084        self.readable.get().expect("readable stream is not set")
1085    }
1086
1087    /// <https://streams.spec.whatwg.org/#ts-writable>
1088    fn Writable(&self) -> DomRoot<WritableStream> {
1089        // Return this.[[writable]].
1090        self.writable.get().expect("writable stream is not set")
1091    }
1092}
1093
1094/// <https://streams.spec.whatwg.org/#ts-transfer>
1095impl Transferable for TransformStream {
1096    type Index = MessagePortIndex;
1097    type Data = TransformStreamData;
1098
1099    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps②>
1100    fn transfer(&self) -> Fallible<(MessagePortId, TransformStreamData)> {
1101        let global = self.global();
1102        let realm = enter_realm(&*global);
1103        let comp = InRealm::Entered(&realm);
1104        let cx = GlobalScope::get_cx();
1105        let can_gc = CanGc::note();
1106
1107        // Step 1. Let readable be value.[[readable]].
1108        let readable = self.get_readable();
1109
1110        // Step 2. Let writable be value.[[writable]].
1111        let writable = self.get_writable();
1112
1113        // Step 3. If ! IsReadableStreamLocked(readable) is true, throw a
1114        // "DataCloneError" DOMException.
1115        // Step 4. If ! IsWritableStreamLocked(writable) is true, throw a
1116        // "DataCloneError" DOMException.
1117        if readable.is_locked() || writable.is_locked() {
1118            return Err(Error::DataClone(None));
1119        }
1120
1121        // First port pair (readable → proxy writable)
1122        let port1 = MessagePort::new(&global, can_gc);
1123        global.track_message_port(&port1, None);
1124        let port1_peer = MessagePort::new(&global, can_gc);
1125        global.track_message_port(&port1_peer, None);
1126        global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1127
1128        let proxy_readable = ReadableStream::new_with_proto(&global, None, can_gc);
1129        proxy_readable.setup_cross_realm_transform_readable(cx, &port1, can_gc);
1130        proxy_readable
1131            .pipe_to(
1132                cx, &global, &writable, false, false, false, None, comp, can_gc,
1133            )
1134            .set_promise_is_handled();
1135
1136        // Second port pair (proxy readable → writable)
1137        let port2 = MessagePort::new(&global, can_gc);
1138        global.track_message_port(&port2, None);
1139        let port2_peer = MessagePort::new(&global, can_gc);
1140        global.track_message_port(&port2_peer, None);
1141        global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1142
1143        let proxy_writable = WritableStream::new_with_proto(&global, None, can_gc);
1144        proxy_writable.setup_cross_realm_transform_writable(cx, &port2, can_gc);
1145
1146        // Pipe readable into the proxy writable (→ port_1)
1147        readable
1148            .pipe_to(
1149                cx,
1150                &global,
1151                &proxy_writable,
1152                false,
1153                false,
1154                false,
1155                None,
1156                comp,
1157                can_gc,
1158            )
1159            .set_promise_is_handled();
1160
1161        // Step 5. Set dataHolder.[[readable]] to !
1162        // StructuredSerializeWithTransfer(readable, « readable »).
1163        // Step 6. Set dataHolder.[[writable]] to !
1164        // StructuredSerializeWithTransfer(writable, « writable »).
1165        Ok((
1166            *port1_peer.message_port_id(),
1167            TransformStreamData {
1168                readable: port1_peer.transfer()?,
1169                writable: port2_peer.transfer()?,
1170            },
1171        ))
1172    }
1173
1174    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps②>
1175    fn transfer_receive(
1176        owner: &GlobalScope,
1177        _id: MessagePortId,
1178        data: TransformStreamData,
1179    ) -> Result<DomRoot<Self>, ()> {
1180        let can_gc = CanGc::note();
1181        let cx = GlobalScope::get_cx();
1182
1183        let port1 = MessagePort::transfer_receive(owner, data.readable.0, data.readable.1)?;
1184        let port2 = MessagePort::transfer_receive(owner, data.writable.0, data.writable.1)?;
1185
1186        // Step 1. Let readableRecord be !
1187        // StructuredDeserializeWithTransfer(dataHolder.[[readable]], the
1188        // current Realm).
1189        let proxy_readable = ReadableStream::new_with_proto(owner, None, can_gc);
1190        proxy_readable.setup_cross_realm_transform_readable(cx, &port2, can_gc);
1191
1192        // Step 2. Let writableRecord be !
1193        // StructuredDeserializeWithTransfer(dataHolder.[[writable]], the
1194        // current Realm).
1195        let proxy_writable = WritableStream::new_with_proto(owner, None, can_gc);
1196        proxy_writable.setup_cross_realm_transform_writable(cx, &port1, can_gc);
1197
1198        // Step 3. Set value.[[readable]] to readableRecord.[[Deserialized]].
1199        // Step 4. Set value.[[writable]] to writableRecord.[[Deserialized]].
1200        // Step 5. Set value.[[backpressure]],
1201        // value.[[backpressureChangePromise]], and value.[[controller]] to
1202        // undefined.
1203        let stream = TransformStream::new_with_proto(owner, None, can_gc);
1204        stream.readable.set(Some(&proxy_readable));
1205        stream.writable.set(Some(&proxy_writable));
1206
1207        Ok(stream)
1208    }
1209
1210    fn serialized_storage<'a>(
1211        data: StructuredData<'a, '_>,
1212    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1213        match data {
1214            StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1215            StructuredData::Writer(w) => &mut w.transform_streams_port,
1216        }
1217    }
1218}