script/dom/
transformstream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::collections::HashMap;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use base::id::{MessagePortId, MessagePortIndex};
11use constellation_traits::TransformStreamData;
12use dom_struct::dom_struct;
13use js::jsapi::{Heap, IsPromiseObject, JSObject};
14use js::jsval::{JSVal, ObjectValue, UndefinedValue};
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
16use script_bindings::callback::ExceptionHandling;
17use script_bindings::realms::InRealm;
18
19use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
20use super::bindings::structuredclone::StructuredData;
21use super::bindings::transferable::Transferable;
22use super::messageport::MessagePort;
23use super::promisenativehandler::Callback;
24use super::readablestream::CrossRealmTransformReadable;
25use super::types::{TransformStreamDefaultController, WritableStream};
26use super::writablestream::CrossRealmTransformWritable;
27use crate::dom::bindings::cell::DomRefCell;
28use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
29use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
30use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
31use crate::dom::bindings::conversions::ConversionResult;
32use crate::dom::bindings::error::{Error, Fallible};
33use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
34use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
35use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::promise::Promise;
38use crate::dom::readablestream::{ReadableStream, create_readable_stream};
39use crate::dom::transformstreamdefaultcontroller::TransformerType;
40use crate::dom::types::PromiseNativeHandler;
41use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
42use crate::dom::writablestream::create_writable_stream;
43use crate::dom::writablestreamdefaultcontroller::UnderlyingSinkType;
44use crate::realms::enter_realm;
45use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
46
47impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}
48
49/// Reacting to backpressureChangePromise as part of
50/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
51#[derive(JSTraceable, MallocSizeOf)]
52#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
53struct TransformBackPressureChangePromiseFulfillment {
54    /// The result of reacting to backpressureChangePromise.
55    #[ignore_malloc_size_of = "Rc is hard"]
56    result_promise: Rc<Promise>,
57
58    #[ignore_malloc_size_of = "mozjs"]
59    chunk: Box<Heap<JSVal>>,
60
61    /// The writable used in the fulfillment steps
62    writable: Dom<WritableStream>,
63
64    controller: Dom<TransformStreamDefaultController>,
65}
66
67impl Callback for TransformBackPressureChangePromiseFulfillment {
68    /// Reacting to backpressureChangePromise with the following fulfillment steps:
69    fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
70        // Let writable be stream.[[writable]].
71        // Let state be writable.[[state]].
72        // If state is "erroring", throw writable.[[storedError]].
73        if self.writable.is_erroring() {
74            rooted!(in(*cx) let mut error = UndefinedValue());
75            self.writable.get_stored_error(error.handle_mut());
76            self.result_promise.reject(cx, error.handle(), can_gc);
77            return;
78        }
79
80        // Assert: state is "writable".
81        assert!(self.writable.is_writable());
82
83        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
84        rooted!(in(*cx) let mut chunk = UndefinedValue());
85        chunk.set(self.chunk.get());
86        let transform_result = self
87            .controller
88            .transform_stream_default_controller_perform_transform(
89                cx,
90                &self.writable.global(),
91                chunk.handle(),
92                can_gc,
93            )
94            .expect("perform transform failed");
95
96        // PerformTransformFulfillment and PerformTransformRejection do not need
97        // to be rooted because they only contain an Rc.
98        let handler = PromiseNativeHandler::new(
99            &self.writable.global(),
100            Some(Box::new(PerformTransformFulfillment {
101                result_promise: self.result_promise.clone(),
102            })),
103            Some(Box::new(PerformTransformRejection {
104                result_promise: self.result_promise.clone(),
105            })),
106            can_gc,
107        );
108
109        let realm = enter_realm(&*self.writable.global());
110        let comp = InRealm::Entered(&realm);
111        transform_result.append_native_handler(&handler, comp, can_gc);
112    }
113}
114
115#[derive(JSTraceable, MallocSizeOf)]
116#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
117/// Reacting to fulfillment of performTransform as part of
118/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
119struct PerformTransformFulfillment {
120    #[ignore_malloc_size_of = "Rc is hard"]
121    result_promise: Rc<Promise>,
122}
123
124impl Callback for PerformTransformFulfillment {
125    fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
126        // Fulfilled: resolve the outer promise
127        self.result_promise.resolve_native(&(), can_gc);
128    }
129}
130
131#[derive(JSTraceable, MallocSizeOf)]
132#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
133/// Reacting to rejection of performTransform as part of
134/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
135struct PerformTransformRejection {
136    #[ignore_malloc_size_of = "Rc is hard"]
137    result_promise: Rc<Promise>,
138}
139
140impl Callback for PerformTransformRejection {
141    fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
142        // Stream already errored in perform_transform, just reject result_promise
143        self.result_promise.reject(cx, v, can_gc);
144    }
145}
146
147#[derive(JSTraceable, MallocSizeOf)]
148#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
149/// Reacting to rejection of backpressureChangePromise as part of
150/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
151struct BackpressureChangeRejection {
152    #[ignore_malloc_size_of = "Rc is hard"]
153    result_promise: Rc<Promise>,
154}
155
156impl Callback for BackpressureChangeRejection {
157    fn callback(&self, cx: SafeJSContext, reason: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
158        self.result_promise.reject(cx, reason, can_gc);
159    }
160}
161
162impl js::gc::Rootable for CancelPromiseFulfillment {}
163
164/// Reacting to fulfillment of the cancelpromise as part of
165/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
166#[derive(JSTraceable, MallocSizeOf)]
167#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
168struct CancelPromiseFulfillment {
169    readable: Dom<ReadableStream>,
170    controller: Dom<TransformStreamDefaultController>,
171    #[ignore_malloc_size_of = "mozjs"]
172    reason: Box<Heap<JSVal>>,
173}
174
175impl Callback for CancelPromiseFulfillment {
176    /// Reacting to backpressureChangePromise with the following fulfillment steps:
177    fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
178        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
179        if self.readable.is_errored() {
180            rooted!(in(*cx) let mut error = UndefinedValue());
181            self.readable.get_stored_error(error.handle_mut());
182            self.controller
183                .get_finish_promise()
184                .expect("finish promise is not set")
185                .reject_native(&error.handle(), can_gc);
186        } else {
187            // Otherwise:
188            // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], reason).
189            rooted!(in(*cx) let mut reason = UndefinedValue());
190            reason.set(self.reason.get());
191            self.readable
192                .get_default_controller()
193                .error(reason.handle(), can_gc);
194
195            // Resolve controller.[[finishPromise]] with undefined.
196            self.controller
197                .get_finish_promise()
198                .expect("finish promise is not set")
199                .resolve_native(&(), can_gc);
200        }
201    }
202}
203
204impl js::gc::Rootable for CancelPromiseRejection {}
205
206/// Reacting to rejection of cancelpromise as part of
207/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
208#[derive(JSTraceable, MallocSizeOf)]
209#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
210struct CancelPromiseRejection {
211    readable: Dom<ReadableStream>,
212    controller: Dom<TransformStreamDefaultController>,
213}
214
215impl Callback for CancelPromiseRejection {
216    /// Reacting to backpressureChangePromise with the following fulfillment steps:
217    fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
218        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
219        self.readable.get_default_controller().error(v, can_gc);
220
221        // Reject controller.[[finishPromise]] with r.
222        self.controller
223            .get_finish_promise()
224            .expect("finish promise is not set")
225            .reject(cx, v, can_gc);
226    }
227}
228
229impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
230
231/// Reacting to fulfillment of the cancelpromise as part of
232/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
233#[derive(JSTraceable, MallocSizeOf)]
234#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
235struct SourceCancelPromiseFulfillment {
236    writeable: Dom<WritableStream>,
237    controller: Dom<TransformStreamDefaultController>,
238    stream: Dom<TransformStream>,
239    #[ignore_malloc_size_of = "mozjs"]
240    reason: Box<Heap<JSVal>>,
241}
242
243impl Callback for SourceCancelPromiseFulfillment {
244    /// Reacting to backpressureChangePromise with the following fulfillment steps:
245    fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
246        // If cancelPromise was fulfilled, then:
247        let finish_promise = self
248            .controller
249            .get_finish_promise()
250            .expect("finish promise is not set");
251
252        let global = &self.writeable.global();
253        // If writable.[[state]] is "errored", reject controller.[[finishPromise]] with writable.[[storedError]].
254        if self.writeable.is_errored() {
255            rooted!(in(*cx) let mut error = UndefinedValue());
256            self.writeable.get_stored_error(error.handle_mut());
257            finish_promise.reject(cx, error.handle(), can_gc);
258        } else {
259            // Otherwise:
260            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], reason).
261            rooted!(in(*cx) let mut reason = UndefinedValue());
262            reason.set(self.reason.get());
263            self.writeable.get_default_controller().error_if_needed(
264                cx,
265                reason.handle(),
266                global,
267                can_gc,
268            );
269
270            // Perform ! TransformStreamUnblockWrite(stream).
271            self.stream.unblock_write(global, can_gc);
272
273            // Resolve controller.[[finishPromise]] with undefined.
274            finish_promise.resolve_native(&(), can_gc);
275        }
276    }
277}
278
279impl js::gc::Rootable for SourceCancelPromiseRejection {}
280
281/// Reacting to rejection of cancelpromise as part of
282/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
283#[derive(JSTraceable, MallocSizeOf)]
284#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
285struct SourceCancelPromiseRejection {
286    writeable: Dom<WritableStream>,
287    controller: Dom<TransformStreamDefaultController>,
288    stream: Dom<TransformStream>,
289}
290
291impl Callback for SourceCancelPromiseRejection {
292    /// Reacting to backpressureChangePromise with the following fulfillment steps:
293    fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
294        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], r).
295        let global = &self.writeable.global();
296
297        self.writeable
298            .get_default_controller()
299            .error_if_needed(cx, v, global, can_gc);
300
301        // Perform ! TransformStreamUnblockWrite(stream).
302        self.stream.unblock_write(global, can_gc);
303
304        // Reject controller.[[finishPromise]] with r.
305        self.controller
306            .get_finish_promise()
307            .expect("finish promise is not set")
308            .reject(cx, v, can_gc);
309    }
310}
311
312impl js::gc::Rootable for FlushPromiseFulfillment {}
313
314/// Reacting to fulfillment of the flushpromise as part of
315/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
316#[derive(JSTraceable, MallocSizeOf)]
317#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
318struct FlushPromiseFulfillment {
319    readable: Dom<ReadableStream>,
320    controller: Dom<TransformStreamDefaultController>,
321}
322
323impl Callback for FlushPromiseFulfillment {
324    /// Reacting to flushpromise with the following fulfillment steps:
325    fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
326        // If flushPromise was fulfilled, then:
327        let finish_promise = self
328            .controller
329            .get_finish_promise()
330            .expect("finish promise is not set");
331
332        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
333        if self.readable.is_errored() {
334            rooted!(in(*cx) let mut error = UndefinedValue());
335            self.readable.get_stored_error(error.handle_mut());
336            finish_promise.reject(cx, error.handle(), can_gc);
337        } else {
338            // Otherwise:
339            // Perform ! ReadableStreamDefaultControllerClose(readable.[[controller]]).
340            self.readable.get_default_controller().close(can_gc);
341
342            // Resolve controller.[[finishPromise]] with undefined.
343            finish_promise.resolve_native(&(), can_gc);
344        }
345    }
346}
347
348impl js::gc::Rootable for FlushPromiseRejection {}
349/// Reacting to rejection of flushpromise as part of
350/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
351
352#[derive(JSTraceable, MallocSizeOf)]
353#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
354struct FlushPromiseRejection {
355    readable: Dom<ReadableStream>,
356    controller: Dom<TransformStreamDefaultController>,
357}
358
359impl Callback for FlushPromiseRejection {
360    /// Reacting to flushpromise with the following fulfillment steps:
361    fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
362        // If flushPromise was rejected with reason r, then:
363        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
364        self.readable.get_default_controller().error(v, can_gc);
365
366        // Reject controller.[[finishPromise]] with r.
367        self.controller
368            .get_finish_promise()
369            .expect("finish promise is not set")
370            .reject(cx, v, can_gc);
371    }
372}
373
374impl js::gc::Rootable for CrossRealmTransform {}
375
376/// A wrapper to handle `message` and `messageerror` events
377/// for the message port used by the transfered stream.
378#[derive(Clone, JSTraceable, MallocSizeOf)]
379#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
380pub(crate) enum CrossRealmTransform {
381    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
382    Readable(CrossRealmTransformReadable),
383    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
384    Writable(CrossRealmTransformWritable),
385}
386
387/// <https://streams.spec.whatwg.org/#ts-class>
388#[dom_struct]
389pub struct TransformStream {
390    reflector_: Reflector,
391
392    /// <https://streams.spec.whatwg.org/#transformstream-backpressure>
393    backpressure: Cell<bool>,
394
395    /// <https://streams.spec.whatwg.org/#transformstream-backpressurechangepromise>
396    #[ignore_malloc_size_of = "Rc is hard"]
397    backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,
398
399    /// <https://streams.spec.whatwg.org/#transformstream-controller>
400    controller: MutNullableDom<TransformStreamDefaultController>,
401
402    /// <https://streams.spec.whatwg.org/#transformstream-detached>
403    detached: Cell<bool>,
404
405    /// <https://streams.spec.whatwg.org/#transformstream-readable>
406    readable: MutNullableDom<ReadableStream>,
407
408    /// <https://streams.spec.whatwg.org/#transformstream-writable>
409    writable: MutNullableDom<WritableStream>,
410}
411
412impl TransformStream {
413    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
414    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
415    fn new_inherited() -> TransformStream {
416        TransformStream {
417            reflector_: Reflector::new(),
418            backpressure: Default::default(),
419            backpressure_change_promise: DomRefCell::new(None),
420            controller: MutNullableDom::new(None),
421            detached: Cell::new(false),
422            readable: MutNullableDom::new(None),
423            writable: MutNullableDom::new(None),
424        }
425    }
426
427    pub(crate) fn new_with_proto(
428        global: &GlobalScope,
429        proto: Option<SafeHandleObject>,
430        can_gc: CanGc,
431    ) -> DomRoot<TransformStream> {
432        reflect_dom_object_with_proto(
433            Box::new(TransformStream::new_inherited()),
434            global,
435            proto,
436            can_gc,
437        )
438    }
439
440    /// Creates and set up the newly created transform stream following
441    /// <https://streams.spec.whatwg.org/#transformstream-set-up>
442    pub(crate) fn set_up(
443        &self,
444        cx: SafeJSContext,
445        global: &GlobalScope,
446        transformer_type: TransformerType,
447        can_gc: CanGc,
448    ) -> Fallible<()> {
449        // Step1. Let writableHighWaterMark be 1.
450        let writable_high_water_mark = 1.0;
451
452        // Step 2. Let writableSizeAlgorithm be an algorithm that returns 1.
453        let writable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
454
455        // Step 3. Let readableHighWaterMark be 0.
456        let readable_high_water_mark = 0.0;
457
458        // Step 4. Let readableSizeAlgorithm be an algorithm that returns 1.
459        let readable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
460
461        // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
462        // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
463        // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
464        // NOTE: These steps are implemented in `TransformStreamDefaultController::new`
465
466        // Step 8. Let startPromise be a promise resolved with undefined.
467        let start_promise = Promise::new_resolved(global, cx, (), can_gc);
468
469        // Step 9. Perform ! InitializeTransformStream(stream, startPromise,
470        // writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark,
471        // readableSizeAlgorithm).
472        self.initialize(
473            cx,
474            global,
475            start_promise.clone(),
476            writable_high_water_mark,
477            writable_size_algorithm,
478            readable_high_water_mark,
479            readable_size_algorithm,
480            can_gc,
481        )?;
482
483        // Step 10. Let controller be a new TransformStreamDefaultController.
484        let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
485
486        // Step 11. Perform ! SetUpTransformStreamDefaultController(stream,
487        // controller, transformAlgorithmWrapper, flushAlgorithmWrapper,
488        // cancelAlgorithmWrapper).
489        self.set_up_transform_stream_default_controller(&controller);
490
491        Ok(())
492    }
493
494    pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
495        self.controller.get().expect("controller is not set")
496    }
497
498    pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
499        self.writable.get().expect("writable stream is not set")
500    }
501
502    pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
503        self.readable.get().expect("readable stream is not set")
504    }
505
506    pub(crate) fn get_backpressure(&self) -> bool {
507        self.backpressure.get()
508    }
509
510    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
511    #[allow(clippy::too_many_arguments)]
512    fn initialize(
513        &self,
514        cx: SafeJSContext,
515        global: &GlobalScope,
516        start_promise: Rc<Promise>,
517        writable_high_water_mark: f64,
518        writable_size_algorithm: Rc<QueuingStrategySize>,
519        readable_high_water_mark: f64,
520        readable_size_algorithm: Rc<QueuingStrategySize>,
521        can_gc: CanGc,
522    ) -> Fallible<()> {
523        // Let startAlgorithm be an algorithm that returns startPromise.
524        // Let writeAlgorithm be the following steps, taking a chunk argument:
525        //  Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
526        // Let abortAlgorithm be the following steps, taking a reason argument:
527        //  Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
528        // Let closeAlgorithm be the following steps:
529        //  Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
530        // Set stream.[[writable]] to ! CreateWritableStream(startAlgorithm, writeAlgorithm,
531        // closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm).
532        // Note: Those steps are implemented using UnderlyingSinkType::Transform.
533
534        let writable = create_writable_stream(
535            cx,
536            global,
537            writable_high_water_mark,
538            writable_size_algorithm,
539            UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
540            can_gc,
541        )?;
542        self.writable.set(Some(&writable));
543
544        // Let pullAlgorithm be the following steps:
545
546        // Return ! TransformStreamDefaultSourcePullAlgorithm(stream).
547
548        // Let cancelAlgorithm be the following steps, taking a reason argument:
549
550        // Return ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason).
551
552        // Set stream.[[readable]] to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
553        // cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
554
555        let readable = create_readable_stream(
556            global,
557            UnderlyingSourceType::Transform(Dom::from_ref(self), start_promise.clone()),
558            Some(readable_size_algorithm),
559            Some(readable_high_water_mark),
560            can_gc,
561        );
562        self.readable.set(Some(&readable));
563
564        // Set stream.[[backpressure]] and stream.[[backpressureChangePromise]] to undefined.
565        // Note: This is done in the constructor.
566
567        // Perform ! TransformStreamSetBackpressure(stream, true).
568        self.set_backpressure(global, true, can_gc);
569
570        // Set stream.[[controller]] to undefined.
571        self.controller.set(None);
572
573        Ok(())
574    }
575
576    /// <https://streams.spec.whatwg.org/#transform-stream-set-backpressure>
577    pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
578        // Assert: stream.[[backpressure]] is not backpressure.
579        assert!(self.backpressure.get() != backpressure);
580
581        // If stream.[[backpressureChangePromise]] is not undefined, resolve
582        // stream.[[backpressureChangePromise]] with undefined.
583        if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
584            promise.resolve_native(&(), can_gc);
585        }
586
587        // Set stream.[[backpressureChangePromise]] to a new promise.;
588        *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
589
590        // Set stream.[[backpressure]] to backpressure.
591        self.backpressure.set(backpressure);
592    }
593
594    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller>
595    fn set_up_transform_stream_default_controller(
596        &self,
597        controller: &TransformStreamDefaultController,
598    ) {
599        // Assert: stream implements TransformStream.
600        // Note: this is checked with type.
601
602        // Assert: stream.[[controller]] is undefined.
603        assert!(self.controller.get().is_none());
604
605        // Set controller.[[stream]] to stream.
606        controller.set_stream(self);
607
608        // Set stream.[[controller]] to controller.
609        self.controller.set(Some(controller));
610
611        // Set controller.[[transformAlgorithm]] to transformAlgorithm.
612        // Set controller.[[flushAlgorithm]] to flushAlgorithm.
613        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
614        // Note: These are set in the constructor.
615    }
616
617    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
618    fn set_up_transform_stream_default_controller_from_transformer(
619        &self,
620        global: &GlobalScope,
621        transformer_obj: SafeHandleObject,
622        transformer: &Transformer,
623        can_gc: CanGc,
624    ) {
625        // Let controller be a new TransformStreamDefaultController.
626        let transformer_type = TransformerType::new_from_js_transformer(transformer);
627        let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
628
629        // Let transformAlgorithm be the following steps, taking a chunk argument:
630        // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
631        // If result is an abrupt completion, return a promise rejected with result.[[Value]].
632        // Otherwise, return a promise resolved with undefined.
633
634        // Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
635        // Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
636
637        // If transformerDict["transform"] exists, set transformAlgorithm to an algorithm which
638        // takes an argument
639        // chunk and returns the result of invoking transformerDict["transform"] with argument
640        // list « chunk, controller »
641        // and callback this value transformer.
642
643        // If transformerDict["flush"] exists, set flushAlgorithm to an algorithm which returns
644        // the result
645        // of invoking transformerDict["flush"] with argument list « controller » and callback
646        // this value transformer.
647
648        // If transformerDict["cancel"] exists, set cancelAlgorithm to an algorithm which takes an argument
649        // reason and returns the result of invoking transformerDict["cancel"] with argument list « reason »
650        // and callback this value transformer.
651        controller.set_transform_obj(transformer_obj);
652
653        // Perform ! SetUpTransformStreamDefaultController(stream, controller,
654        // transformAlgorithm, flushAlgorithm, cancelAlgorithm).
655        self.set_up_transform_stream_default_controller(&controller);
656    }
657
658    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
659    pub(crate) fn transform_stream_default_sink_write_algorithm(
660        &self,
661        cx: SafeJSContext,
662        global: &GlobalScope,
663        chunk: SafeHandleValue,
664        can_gc: CanGc,
665    ) -> Fallible<Rc<Promise>> {
666        // Assert: stream.[[writable]].[[state]] is "writable".
667        assert!(self.writable.get().is_some());
668
669        // Let controller be stream.[[controller]].
670        let controller = self.controller.get().expect("controller is not set");
671
672        // If stream.[[backpressure]] is true,
673        if self.backpressure.get() {
674            // Let backpressureChangePromise be stream.[[backpressureChangePromise]].
675            let backpressure_change_promise = self.backpressure_change_promise.borrow();
676
677            // Assert: backpressureChangePromise is not undefined.
678            assert!(backpressure_change_promise.is_some());
679
680            // Return the result of reacting to backpressureChangePromise with the following fulfillment steps:
681            let result_promise = Promise::new(global, can_gc);
682            rooted!(in(*cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
683                controller: Dom::from_ref(&controller),
684                writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
685                chunk: Heap::boxed(chunk.get()),
686                result_promise: result_promise.clone(),
687            }));
688
689            let handler = PromiseNativeHandler::new(
690                global,
691                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
692                Some(Box::new(BackpressureChangeRejection {
693                    result_promise: result_promise.clone(),
694                })),
695                can_gc,
696            );
697            let realm = enter_realm(global);
698            let comp = InRealm::Entered(&realm);
699            backpressure_change_promise
700                .as_ref()
701                .expect("Promise must be some by now.")
702                .append_native_handler(&handler, comp, can_gc);
703
704            return Ok(result_promise);
705        }
706
707        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
708        controller.transform_stream_default_controller_perform_transform(cx, global, chunk, can_gc)
709    }
710
711    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
712    pub(crate) fn transform_stream_default_sink_abort_algorithm(
713        &self,
714        cx: SafeJSContext,
715        global: &GlobalScope,
716        reason: SafeHandleValue,
717        can_gc: CanGc,
718    ) -> Fallible<Rc<Promise>> {
719        // Let controller be stream.[[controller]].
720        let controller = self.controller.get().expect("controller is not set");
721
722        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
723        if let Some(finish_promise) = controller.get_finish_promise() {
724            return Ok(finish_promise);
725        }
726
727        // Let readable be stream.[[readable]].
728        let readable = self.readable.get().expect("readable stream is not set");
729
730        // Let controller.[[finishPromise]] be a new promise.
731        controller.set_finish_promise(Promise::new(global, can_gc));
732
733        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
734        let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
735
736        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
737        controller.clear_algorithms();
738
739        // React to cancelPromise:
740        let handler = PromiseNativeHandler::new(
741            global,
742            Some(Box::new(CancelPromiseFulfillment {
743                readable: Dom::from_ref(&readable),
744                controller: Dom::from_ref(&controller),
745                reason: Heap::boxed(reason.get()),
746            })),
747            Some(Box::new(CancelPromiseRejection {
748                readable: Dom::from_ref(&readable),
749                controller: Dom::from_ref(&controller),
750            })),
751            can_gc,
752        );
753        let realm = enter_realm(global);
754        let comp = InRealm::Entered(&realm);
755        cancel_promise.append_native_handler(&handler, comp, can_gc);
756
757        // Return controller.[[finishPromise]].
758        let finish_promise = controller
759            .get_finish_promise()
760            .expect("finish promise is not set");
761        Ok(finish_promise)
762    }
763
764    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
765    pub(crate) fn transform_stream_default_sink_close_algorithm(
766        &self,
767        cx: SafeJSContext,
768        global: &GlobalScope,
769        can_gc: CanGc,
770    ) -> Fallible<Rc<Promise>> {
771        // Let controller be stream.[[controller]].
772        let controller = self
773            .controller
774            .get()
775            .ok_or(Error::Type("controller is not set".to_string()))?;
776
777        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
778        if let Some(finish_promise) = controller.get_finish_promise() {
779            return Ok(finish_promise);
780        }
781
782        // Let readable be stream.[[readable]].
783        let readable = self
784            .readable
785            .get()
786            .ok_or(Error::Type("readable stream is not set".to_string()))?;
787
788        // Let controller.[[finishPromise]] be a new promise.
789        controller.set_finish_promise(Promise::new(global, can_gc));
790
791        // Let flushPromise be the result of performing controller.[[flushAlgorithm]].
792        let flush_promise = controller.perform_flush(cx, global, can_gc)?;
793
794        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
795        controller.clear_algorithms();
796
797        // React to flushPromise:
798        let handler = PromiseNativeHandler::new(
799            global,
800            Some(Box::new(FlushPromiseFulfillment {
801                readable: Dom::from_ref(&readable),
802                controller: Dom::from_ref(&controller),
803            })),
804            Some(Box::new(FlushPromiseRejection {
805                readable: Dom::from_ref(&readable),
806                controller: Dom::from_ref(&controller),
807            })),
808            can_gc,
809        );
810
811        let realm = enter_realm(global);
812        let comp = InRealm::Entered(&realm);
813        flush_promise.append_native_handler(&handler, comp, can_gc);
814        // Return controller.[[finishPromise]].
815        let finish_promise = controller
816            .get_finish_promise()
817            .expect("finish promise is not set");
818        Ok(finish_promise)
819    }
820
821    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
822    pub(crate) fn transform_stream_default_source_cancel(
823        &self,
824        cx: SafeJSContext,
825        global: &GlobalScope,
826        reason: SafeHandleValue,
827        can_gc: CanGc,
828    ) -> Fallible<Rc<Promise>> {
829        // Let controller be stream.[[controller]].
830        let controller = self
831            .controller
832            .get()
833            .ok_or(Error::Type("controller is not set".to_string()))?;
834
835        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
836        if let Some(finish_promise) = controller.get_finish_promise() {
837            return Ok(finish_promise);
838        }
839
840        // Let writable be stream.[[writable]].
841        let writable = self
842            .writable
843            .get()
844            .ok_or(Error::Type("writable stream is not set".to_string()))?;
845
846        // Let controller.[[finishPromise]] be a new promise.
847        controller.set_finish_promise(Promise::new(global, can_gc));
848
849        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
850        let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
851
852        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
853        controller.clear_algorithms();
854
855        // React to cancelPromise:
856        let handler = PromiseNativeHandler::new(
857            global,
858            Some(Box::new(SourceCancelPromiseFulfillment {
859                writeable: Dom::from_ref(&writable),
860                controller: Dom::from_ref(&controller),
861                stream: Dom::from_ref(self),
862                reason: Heap::boxed(reason.get()),
863            })),
864            Some(Box::new(SourceCancelPromiseRejection {
865                writeable: Dom::from_ref(&writable),
866                controller: Dom::from_ref(&controller),
867                stream: Dom::from_ref(self),
868            })),
869            can_gc,
870        );
871
872        // Return controller.[[finishPromise]].
873        let finish_promise = controller
874            .get_finish_promise()
875            .expect("finish promise is not set");
876        let realm = enter_realm(global);
877        let comp = InRealm::Entered(&realm);
878        cancel_promise.append_native_handler(&handler, comp, can_gc);
879        Ok(finish_promise)
880    }
881
882    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-pull>
883    pub(crate) fn transform_stream_default_source_pull(
884        &self,
885        global: &GlobalScope,
886        can_gc: CanGc,
887    ) -> Fallible<Rc<Promise>> {
888        // Assert: stream.[[backpressure]] is true.
889        assert!(self.backpressure.get());
890
891        // Assert: stream.[[backpressureChangePromise]] is not undefined.
892        assert!(self.backpressure_change_promise.borrow().is_some());
893
894        // Perform ! TransformStreamSetBackpressure(stream, false).
895        self.set_backpressure(global, false, can_gc);
896
897        // Return stream.[[backpressureChangePromise]].
898        Ok(self
899            .backpressure_change_promise
900            .borrow()
901            .clone()
902            .expect("Promise must be some by now."))
903    }
904
905    /// <https://streams.spec.whatwg.org/#transform-stream-error-writable-and-unblock-write>
906    pub(crate) fn error_writable_and_unblock_write(
907        &self,
908        cx: SafeJSContext,
909        global: &GlobalScope,
910        error: SafeHandleValue,
911        can_gc: CanGc,
912    ) {
913        // Perform ! TransformStreamDefaultControllerClearAlgorithms(stream.[[controller]]).
914        self.get_controller().clear_algorithms();
915
916        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[controller]], e).
917        self.get_writable()
918            .get_default_controller()
919            .error_if_needed(cx, error, global, can_gc);
920
921        // Perform ! TransformStreamUnblockWrite(stream).
922        self.unblock_write(global, can_gc)
923    }
924
925    /// <https://streams.spec.whatwg.org/#transform-stream-unblock-write>
926    pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
927        // If stream.[[backpressure]] is true, perform ! TransformStreamSetBackpressure(stream, false).
928        if self.backpressure.get() {
929            self.set_backpressure(global, false, can_gc);
930        }
931    }
932
933    /// <https://streams.spec.whatwg.org/#transform-stream-error>
934    pub(crate) fn error(
935        &self,
936        cx: SafeJSContext,
937        global: &GlobalScope,
938        error: SafeHandleValue,
939        can_gc: CanGc,
940    ) {
941        // Perform ! ReadableStreamDefaultControllerError(stream.[[readable]].[[controller]], e).
942        self.get_readable()
943            .get_default_controller()
944            .error(error, can_gc);
945
946        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, e).
947        self.error_writable_and_unblock_write(cx, global, error, can_gc);
948    }
949}
950
951#[allow(non_snake_case)]
952impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
953    /// <https://streams.spec.whatwg.org/#ts-constructor>
954    #[allow(unsafe_code)]
955    fn Constructor(
956        cx: SafeJSContext,
957        global: &GlobalScope,
958        proto: Option<SafeHandleObject>,
959        can_gc: CanGc,
960        transformer: Option<*mut JSObject>,
961        writable_strategy: &QueuingStrategy,
962        readable_strategy: &QueuingStrategy,
963    ) -> Fallible<DomRoot<TransformStream>> {
964        // If transformer is missing, set it to null.
965        rooted!(in(*cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
966
967        // Let underlyingSinkDict be underlyingSink,
968        // converted to an IDL value of type UnderlyingSink.
969        let transformer_dict = if !transformer_obj.is_null() {
970            rooted!(in(*cx) let obj_val = ObjectValue(transformer_obj.get()));
971            match Transformer::new(cx, obj_val.handle()) {
972                Ok(ConversionResult::Success(val)) => val,
973                Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
974                _ => {
975                    return Err(Error::JSFailed);
976                },
977            }
978        } else {
979            Transformer::empty()
980        };
981
982        // If transformerDict["readableType"] exists, throw a RangeError exception.
983        if !transformer_dict.readableType.handle().is_undefined() {
984            return Err(Error::Range("readableType is set".to_string()));
985        }
986
987        // If transformerDict["writableType"] exists, throw a RangeError exception.
988        if !transformer_dict.writableType.handle().is_undefined() {
989            return Err(Error::Range("writableType is set".to_string()));
990        }
991
992        // Let readableHighWaterMark be ? ExtractHighWaterMark(readableStrategy, 0).
993        let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
994
995        // Let readableSizeAlgorithm be ! ExtractSizeAlgorithm(readableStrategy).
996        let readable_size_algorithm = extract_size_algorithm(readable_strategy, can_gc);
997
998        // Let writableHighWaterMark be ? ExtractHighWaterMark(writableStrategy, 1).
999        let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
1000
1001        // Let writableSizeAlgorithm be ! ExtractSizeAlgorithm(writableStrategy).
1002        let writable_size_algorithm = extract_size_algorithm(writable_strategy, can_gc);
1003
1004        // Let startPromise be a new promise.
1005        let start_promise = Promise::new(global, can_gc);
1006
1007        // Perform ! InitializeTransformStream(this, startPromise, writableHighWaterMark,
1008        // writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
1009        let stream = TransformStream::new_with_proto(global, proto, can_gc);
1010        stream.initialize(
1011            cx,
1012            global,
1013            start_promise.clone(),
1014            writable_high_water_mark,
1015            writable_size_algorithm,
1016            readable_high_water_mark,
1017            readable_size_algorithm,
1018            can_gc,
1019        )?;
1020
1021        // Perform ? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict).
1022        stream.set_up_transform_stream_default_controller_from_transformer(
1023            global,
1024            transformer_obj.handle(),
1025            &transformer_dict,
1026            can_gc,
1027        );
1028
1029        // If transformerDict["start"] exists, then resolve startPromise with the
1030        // result of invoking transformerDict["start"]
1031        // with argument list « this.[[controller]] » and callback this value transformer.
1032        if let Some(start) = &transformer_dict.start {
1033            rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
1034            rooted!(in(*cx) let mut result: JSVal);
1035            rooted!(in(*cx) let this_object = transformer_obj.get());
1036            start.Call_(
1037                &this_object.handle(),
1038                &stream.get_controller(),
1039                result.handle_mut(),
1040                ExceptionHandling::Rethrow,
1041                can_gc,
1042            )?;
1043            let is_promise = unsafe {
1044                if result.is_object() {
1045                    result_object.set(result.to_object());
1046                    IsPromiseObject(result_object.handle().into_handle())
1047                } else {
1048                    false
1049                }
1050            };
1051            let promise = if is_promise {
1052                Promise::new_with_js_promise(result_object.handle(), cx)
1053            } else {
1054                Promise::new_resolved(global, cx, result.get(), can_gc)
1055            };
1056            start_promise.resolve_native(&promise, can_gc);
1057        } else {
1058            // Otherwise, resolve startPromise with undefined.
1059            start_promise.resolve_native(&(), can_gc);
1060        };
1061
1062        Ok(stream)
1063    }
1064
1065    /// <https://streams.spec.whatwg.org/#ts-readable>
1066    fn Readable(&self) -> DomRoot<ReadableStream> {
1067        // Return this.[[readable]].
1068        self.readable.get().expect("readable stream is not set")
1069    }
1070
1071    /// <https://streams.spec.whatwg.org/#ts-writable>
1072    fn Writable(&self) -> DomRoot<WritableStream> {
1073        // Return this.[[writable]].
1074        self.writable.get().expect("writable stream is not set")
1075    }
1076}
1077
1078/// <https://streams.spec.whatwg.org/#ts-transfer>
1079impl Transferable for TransformStream {
1080    type Index = MessagePortIndex;
1081    type Data = TransformStreamData;
1082
1083    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps②>
1084    fn transfer(&self) -> Fallible<(MessagePortId, TransformStreamData)> {
1085        let global = self.global();
1086        let realm = enter_realm(&*global);
1087        let comp = InRealm::Entered(&realm);
1088        let cx = GlobalScope::get_cx();
1089        let can_gc = CanGc::note();
1090
1091        // Step 1. Let readable be value.[[readable]].
1092        let readable = self.get_readable();
1093
1094        // Step 2. Let writable be value.[[writable]].
1095        let writable = self.get_writable();
1096
1097        // Step 3. If ! IsReadableStreamLocked(readable) is true, throw a
1098        // "DataCloneError" DOMException.
1099        // Step 4. If ! IsWritableStreamLocked(writable) is true, throw a
1100        // "DataCloneError" DOMException.
1101        if readable.is_locked() || writable.is_locked() {
1102            return Err(Error::DataClone(None));
1103        }
1104
1105        // First port pair (readable → proxy writable)
1106        let port1 = MessagePort::new(&global, can_gc);
1107        global.track_message_port(&port1, None);
1108        let port1_peer = MessagePort::new(&global, can_gc);
1109        global.track_message_port(&port1_peer, None);
1110        global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1111
1112        let proxy_readable = ReadableStream::new_with_proto(&global, None, can_gc);
1113        proxy_readable.setup_cross_realm_transform_readable(cx, &port1, can_gc);
1114        proxy_readable
1115            .pipe_to(
1116                cx, &global, &writable, false, false, false, None, comp, can_gc,
1117            )
1118            .set_promise_is_handled();
1119
1120        // Second port pair (proxy readable → writable)
1121        let port2 = MessagePort::new(&global, can_gc);
1122        global.track_message_port(&port2, None);
1123        let port2_peer = MessagePort::new(&global, can_gc);
1124        global.track_message_port(&port2_peer, None);
1125        global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1126
1127        let proxy_writable = WritableStream::new_with_proto(&global, None, can_gc);
1128        proxy_writable.setup_cross_realm_transform_writable(cx, &port2, can_gc);
1129
1130        // Pipe readable into the proxy writable (→ port_1)
1131        readable
1132            .pipe_to(
1133                cx,
1134                &global,
1135                &proxy_writable,
1136                false,
1137                false,
1138                false,
1139                None,
1140                comp,
1141                can_gc,
1142            )
1143            .set_promise_is_handled();
1144
1145        // Step 5. Set dataHolder.[[readable]] to !
1146        // StructuredSerializeWithTransfer(readable, « readable »).
1147        // Step 6. Set dataHolder.[[writable]] to !
1148        // StructuredSerializeWithTransfer(writable, « writable »).
1149        Ok((
1150            *port1_peer.message_port_id(),
1151            TransformStreamData {
1152                readable: port1_peer.transfer()?,
1153                writable: port2_peer.transfer()?,
1154            },
1155        ))
1156    }
1157
1158    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps②>
1159    fn transfer_receive(
1160        owner: &GlobalScope,
1161        _id: MessagePortId,
1162        data: TransformStreamData,
1163    ) -> Result<DomRoot<Self>, ()> {
1164        let can_gc = CanGc::note();
1165        let cx = GlobalScope::get_cx();
1166
1167        let port1 = MessagePort::transfer_receive(owner, data.readable.0, data.readable.1)?;
1168        let port2 = MessagePort::transfer_receive(owner, data.writable.0, data.writable.1)?;
1169
1170        // Step 1. Let readableRecord be !
1171        // StructuredDeserializeWithTransfer(dataHolder.[[readable]], the
1172        // current Realm).
1173        let proxy_readable = ReadableStream::new_with_proto(owner, None, can_gc);
1174        proxy_readable.setup_cross_realm_transform_readable(cx, &port2, can_gc);
1175
1176        // Step 2. Let writableRecord be !
1177        // StructuredDeserializeWithTransfer(dataHolder.[[writable]], the
1178        // current Realm).
1179        let proxy_writable = WritableStream::new_with_proto(owner, None, can_gc);
1180        proxy_writable.setup_cross_realm_transform_writable(cx, &port1, can_gc);
1181
1182        // Step 3. Set value.[[readable]] to readableRecord.[[Deserialized]].
1183        // Step 4. Set value.[[writable]] to writableRecord.[[Deserialized]].
1184        // Step 5. Set value.[[backpressure]],
1185        // value.[[backpressureChangePromise]], and value.[[controller]] to
1186        // undefined.
1187        let stream = TransformStream::new_with_proto(owner, None, can_gc);
1188        stream.readable.set(Some(&proxy_readable));
1189        stream.writable.set(Some(&proxy_writable));
1190
1191        Ok(stream)
1192    }
1193
1194    fn serialized_storage<'a>(
1195        data: StructuredData<'a, '_>,
1196    ) -> &'a mut Option<HashMap<MessagePortId, Self::Data>> {
1197        match data {
1198            StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1199            StructuredData::Writer(w) => &mut w.transform_streams_port,
1200        }
1201    }
1202}