Skip to main content

script/dom/stream/
transformstream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::context::JSContext;
11use js::jsapi::{Heap, IsPromiseObject, JSObject};
12use js::jsval::{JSVal, ObjectValue, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
15use rustc_hash::FxHashMap;
16use script_bindings::callback::ExceptionHandling;
17use script_bindings::cell::DomRefCell;
18use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
19use servo_base::id::{MessagePortId, MessagePortIndex};
20use servo_constellation_traits::TransformStreamData;
21
22use super::readablestream::CrossRealmTransformReadable;
23use super::writablestream::CrossRealmTransformWritable;
24use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
25    QueuingStrategy, QueuingStrategySize,
26};
27use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
28use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
29use crate::dom::bindings::conversions::ConversionResult;
30use crate::dom::bindings::error::{Error, Fallible};
31use crate::dom::bindings::reflector::DomGlobal;
32use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
33use crate::dom::bindings::structuredclone::StructuredData;
34use crate::dom::bindings::transferable::Transferable;
35use crate::dom::globalscope::GlobalScope;
36use crate::dom::messageport::MessagePort;
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::Callback;
39use crate::dom::readablestream::{ReadableStream, create_readable_stream};
40use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
41use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
42use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
43use crate::dom::stream::writablestream::create_writable_stream;
44use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
45use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
46use crate::realms::enter_auto_realm;
47use crate::script_runtime::CanGc;
48
// Allows the handler to be placed in a `rooted!` slot while it is being set up
// (see the sink write algorithm further down in this file).
impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}

/// Reacting to backpressureChangePromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransformBackPressureChangePromiseFulfillment {
    /// The result of reacting to backpressureChangePromise.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,

    /// The chunk that is handed to the controller's perform-transform step
    /// once the fulfillment steps run.
    #[ignore_malloc_size_of = "mozjs"]
    chunk: Box<Heap<JSVal>>,

    /// The writable used in the fulfillment steps
    writable: Dom<WritableStream>,

    /// The controller on which the transform is performed in the fulfillment steps.
    controller: Dom<TransformStreamDefaultController>,
}
68
69impl Callback for TransformBackPressureChangePromiseFulfillment {
70    /// Reacting to backpressureChangePromise with the following fulfillment steps:
71    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
72        // Let writable be stream.[[writable]].
73        // Let state be writable.[[state]].
74        // If state is "erroring", throw writable.[[storedError]].
75        if self.writable.is_erroring() {
76            rooted!(&in(cx) let mut error = UndefinedValue());
77            self.writable.get_stored_error(error.handle_mut());
78            self.result_promise.reject_with_cx(cx, error.handle());
79            return;
80        }
81
82        // Assert: state is "writable".
83        assert!(self.writable.is_writable());
84
85        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
86        rooted!(&in(cx) let mut chunk = UndefinedValue());
87        chunk.set(self.chunk.get());
88        let transform_result = self
89            .controller
90            .transform_stream_default_controller_perform_transform(
91                cx,
92                &self.writable.global(),
93                chunk.handle(),
94            )
95            .expect("perform transform failed");
96
97        // PerformTransformFulfillment and PerformTransformRejection do not need
98        // to be rooted because they only contain an Rc.
99        let handler = PromiseNativeHandler::new(
100            cx,
101            &self.writable.global(),
102            Some(Box::new(PerformTransformFulfillment {
103                result_promise: self.result_promise.clone(),
104            })),
105            Some(Box::new(PerformTransformRejection {
106                result_promise: self.result_promise.clone(),
107            })),
108        );
109
110        let mut realm = enter_auto_realm(cx, &*self.writable.global());
111        let realm = &mut realm.current_realm();
112        transform_result.append_native_handler(realm, &handler);
113    }
114}
115
/// Reacting to fulfillment of performTransform as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PerformTransformFulfillment {
    /// The outer promise that is resolved when the transform promise fulfills.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
124
125impl Callback for PerformTransformFulfillment {
126    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
127        let can_gc = CanGc::from_cx(cx);
128        // Fulfilled: resolve the outer promise
129        self.result_promise.resolve_native(&(), can_gc);
130    }
131}
132
/// Reacting to rejection of performTransform as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PerformTransformRejection {
    /// The outer promise that is rejected when the transform promise rejects.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
141
142impl Callback for PerformTransformRejection {
143    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
144        let can_gc = CanGc::from_cx(cx);
145        // Stream already errored in perform_transform, just reject result_promise
146        self.result_promise.reject(cx.into(), v, can_gc);
147    }
148}
149
/// Reacting to rejection of backpressureChangePromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct BackpressureChangeRejection {
    /// The promise returned from the write algorithm; rejected with the same
    /// reason as backpressureChangePromise.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
158
159impl Callback for BackpressureChangeRejection {
160    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
161        let can_gc = CanGc::from_cx(cx);
162        self.result_promise.reject(cx.into(), reason, can_gc);
163    }
164}
165
impl js::gc::Rootable for CancelPromiseFulfillment {}

/// Reacting to fulfillment of the cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct CancelPromiseFulfillment {
    /// The readable side whose state is inspected and whose controller is errored.
    readable: Dom<ReadableStream>,
    /// The controller whose finish promise is settled by the fulfillment steps.
    controller: Dom<TransformStreamDefaultController>,
    /// The abort reason used to error the readable side.
    #[ignore_malloc_size_of = "mozjs"]
    reason: Box<Heap<JSVal>>,
}
178
179impl Callback for CancelPromiseFulfillment {
180    /// Reacting to backpressureChangePromise with the following fulfillment steps:
181    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
182        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
183        if self.readable.is_errored() {
184            rooted!(&in(cx) let mut error = UndefinedValue());
185            self.readable.get_stored_error(error.handle_mut());
186            self.controller
187                .get_finish_promise()
188                .expect("finish promise is not set")
189                .reject_native_with_cx(cx, &error.handle());
190        } else {
191            // Otherwise:
192            // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], reason).
193            rooted!(&in(cx) let mut reason = UndefinedValue());
194            reason.set(self.reason.get());
195            self.readable
196                .get_default_controller()
197                .error(cx, reason.handle());
198
199            // Resolve controller.[[finishPromise]] with undefined.
200            self.controller
201                .get_finish_promise()
202                .expect("finish promise is not set")
203                .resolve_native_with_cx(cx, &());
204        }
205    }
206}
207
impl js::gc::Rootable for CancelPromiseRejection {}

/// Reacting to rejection of cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct CancelPromiseRejection {
    /// The readable side whose controller is errored with the rejection reason.
    readable: Dom<ReadableStream>,
    /// The controller whose finish promise is rejected.
    controller: Dom<TransformStreamDefaultController>,
}
218
219impl Callback for CancelPromiseRejection {
220    /// Reacting to backpressureChangePromise with the following fulfillment steps:
221    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
222        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
223        self.readable.get_default_controller().error(cx, v);
224
225        // Reject controller.[[finishPromise]] with r.
226        self.controller
227            .get_finish_promise()
228            .expect("finish promise is not set")
229            .reject_with_cx(cx, v);
230    }
231}
232
impl js::gc::Rootable for SourceCancelPromiseFulfillment {}

/// Reacting to fulfillment of the cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct SourceCancelPromiseFulfillment {
    /// The writable side whose state is inspected and whose controller is errored if needed.
    writeable: Dom<WritableStream>,
    /// The controller whose finish promise is settled by the fulfillment steps.
    controller: Dom<TransformStreamDefaultController>,
    /// The transform stream whose pending write is unblocked.
    stream: Dom<TransformStream>,
    /// The cancel reason used to error the writable side.
    #[ignore_malloc_size_of = "mozjs"]
    reason: Box<Heap<JSVal>>,
}
246
247impl Callback for SourceCancelPromiseFulfillment {
248    /// Reacting to backpressureChangePromise with the following fulfillment steps:
249    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
250        // If cancelPromise was fulfilled, then:
251        let finish_promise = self
252            .controller
253            .get_finish_promise()
254            .expect("finish promise is not set");
255
256        let global = &self.writeable.global();
257        // If writable.[[state]] is "errored", reject controller.[[finishPromise]] with writable.[[storedError]].
258        if self.writeable.is_errored() {
259            rooted!(&in(cx) let mut error = UndefinedValue());
260            self.writeable.get_stored_error(error.handle_mut());
261            finish_promise.reject_with_cx(cx, error.handle());
262        } else {
263            // Otherwise:
264            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], reason).
265            rooted!(&in(cx) let mut reason = UndefinedValue());
266            reason.set(self.reason.get());
267            self.writeable
268                .get_default_controller()
269                .error_if_needed(cx, reason.handle(), global);
270
271            // Perform ! TransformStreamUnblockWrite(stream).
272            self.stream.unblock_write(global, CanGc::from_cx(cx));
273
274            // Resolve controller.[[finishPromise]] with undefined.
275            finish_promise.resolve_native_with_cx(cx, &());
276        }
277    }
278}
279
impl js::gc::Rootable for SourceCancelPromiseRejection {}

/// Reacting to rejection of cancelpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct SourceCancelPromiseRejection {
    /// The writable side whose controller is errored (if needed) with the rejection reason.
    writeable: Dom<WritableStream>,
    /// The controller whose finish promise is rejected.
    controller: Dom<TransformStreamDefaultController>,
    /// The transform stream whose pending write is unblocked.
    stream: Dom<TransformStream>,
}
291
292impl Callback for SourceCancelPromiseRejection {
293    /// Reacting to backpressureChangePromise with the following fulfillment steps:
294    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
295        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], r).
296        let global = &self.writeable.global();
297
298        self.writeable
299            .get_default_controller()
300            .error_if_needed(cx, v, global);
301
302        // Perform ! TransformStreamUnblockWrite(stream).
303        self.stream.unblock_write(global, CanGc::from_cx(cx));
304
305        // Reject controller.[[finishPromise]] with r.
306        self.controller
307            .get_finish_promise()
308            .expect("finish promise is not set")
309            .reject_with_cx(cx, v);
310    }
311}
312
impl js::gc::Rootable for FlushPromiseFulfillment {}

/// Reacting to fulfillment of the flushpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct FlushPromiseFulfillment {
    /// The readable side whose state is inspected and whose controller is closed.
    readable: Dom<ReadableStream>,
    /// The controller whose finish promise is settled by the fulfillment steps.
    controller: Dom<TransformStreamDefaultController>,
}
323
324impl Callback for FlushPromiseFulfillment {
325    /// Reacting to flushpromise with the following fulfillment steps:
326    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
327        // If flushPromise was fulfilled, then:
328        let finish_promise = self
329            .controller
330            .get_finish_promise()
331            .expect("finish promise is not set");
332
333        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
334        if self.readable.is_errored() {
335            rooted!(&in(cx) let mut error = UndefinedValue());
336            self.readable.get_stored_error(error.handle_mut());
337            finish_promise.reject_with_cx(cx, error.handle());
338        } else {
339            // Otherwise:
340            // Perform ! ReadableStreamDefaultControllerClose(readable.[[controller]]).
341            self.readable.get_default_controller().close(cx);
342
343            // Resolve controller.[[finishPromise]] with undefined.
344            finish_promise.resolve_native_with_cx(cx, &());
345        }
346    }
347}
348
impl js::gc::Rootable for FlushPromiseRejection {}

/// Reacting to rejection of flushpromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct FlushPromiseRejection {
    /// The readable side whose controller is errored with the rejection reason.
    readable: Dom<ReadableStream>,
    /// The controller whose finish promise is rejected.
    controller: Dom<TransformStreamDefaultController>,
}
359
360impl Callback for FlushPromiseRejection {
361    /// Reacting to flushpromise with the following fulfillment steps:
362    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
363        // If flushPromise was rejected with reason r, then:
364        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
365        self.readable.get_default_controller().error(cx, v);
366
367        // Reject controller.[[finishPromise]] with r.
368        self.controller
369            .get_finish_promise()
370            .expect("finish promise is not set")
371            .reject_with_cx(cx, v);
372    }
373}
374
impl js::gc::Rootable for CrossRealmTransform {}

/// A wrapper to handle `message` and `messageerror` events
/// for the message port used by the transferred stream.
///
/// Each variant wraps the handler for one direction of the cross-realm transform.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum CrossRealmTransform {
    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
    Readable(CrossRealmTransformReadable),
    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
    Writable(CrossRealmTransformWritable),
}
387
/// <https://streams.spec.whatwg.org/#ts-class>
///
/// Each field below (other than the reflector) mirrors the internal slot of the
/// specification that its link points to.
#[dom_struct]
pub struct TransformStream {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#transformstream-backpressure>
    backpressure: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#transformstream-backpressurechangepromise>
    ///
    /// `None` stands for the specification's "undefined".
    #[conditional_malloc_size_of]
    backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,

    /// <https://streams.spec.whatwg.org/#transformstream-controller>
    controller: MutNullableDom<TransformStreamDefaultController>,

    /// <https://streams.spec.whatwg.org/#transformstream-detached>
    detached: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#transformstream-readable>
    readable: MutNullableDom<ReadableStream>,

    /// <https://streams.spec.whatwg.org/#transformstream-writable>
    writable: MutNullableDom<WritableStream>,
}
412
413impl TransformStream {
414    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
415    fn new_inherited() -> TransformStream {
416        TransformStream {
417            reflector_: Reflector::new(),
418            backpressure: Default::default(),
419            backpressure_change_promise: DomRefCell::new(None),
420            controller: MutNullableDom::new(None),
421            detached: Cell::new(false),
422            readable: MutNullableDom::new(None),
423            writable: MutNullableDom::new(None),
424        }
425    }
426
427    pub(crate) fn new_with_proto(
428        global: &GlobalScope,
429        proto: Option<SafeHandleObject>,
430        can_gc: CanGc,
431    ) -> DomRoot<TransformStream> {
432        reflect_dom_object_with_proto(
433            Box::new(TransformStream::new_inherited()),
434            global,
435            proto,
436            can_gc,
437        )
438    }
439
440    /// Creates and set up the newly created transform stream following
441    /// <https://streams.spec.whatwg.org/#transformstream-set-up>
442    pub(crate) fn set_up(
443        &self,
444        cx: &mut JSContext,
445        global: &GlobalScope,
446        transformer_type: TransformerType,
447    ) -> Fallible<()> {
448        // Step1. Let writableHighWaterMark be 1.
449        let writable_high_water_mark = 1.0;
450
451        // Step 2. Let writableSizeAlgorithm be an algorithm that returns 1.
452        let writable_size_algorithm =
453            extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
454
455        // Step 3. Let readableHighWaterMark be 0.
456        let readable_high_water_mark = 0.0;
457
458        // Step 4. Let readableSizeAlgorithm be an algorithm that returns 1.
459        let readable_size_algorithm =
460            extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
461
462        // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
463        // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
464        // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
465        // NOTE: These steps are implemented in `TransformStreamDefaultController::new`
466
467        // Step 8. Let startPromise be a promise resolved with undefined.
468        let start_promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
469
470        // Step 9. Perform ! InitializeTransformStream(stream, startPromise,
471        // writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark,
472        // readableSizeAlgorithm).
473        self.initialize(
474            cx,
475            global,
476            start_promise,
477            writable_high_water_mark,
478            writable_size_algorithm,
479            readable_high_water_mark,
480            readable_size_algorithm,
481        )?;
482
483        // Step 10. Let controller be a new TransformStreamDefaultController.
484        let controller =
485            TransformStreamDefaultController::new(global, transformer_type, CanGc::from_cx(cx));
486
487        // Step 11. Perform ! SetUpTransformStreamDefaultController(stream,
488        // controller, transformAlgorithmWrapper, flushAlgorithmWrapper,
489        // cancelAlgorithmWrapper).
490        self.set_up_transform_stream_default_controller(&controller);
491
492        Ok(())
493    }
494
    /// Returns the stream's controller.
    ///
    /// # Panics
    ///
    /// Panics with "controller is not set" if no controller has been stored yet.
    pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
        self.controller.get().expect("controller is not set")
    }
498
    /// Returns the writable side of the stream.
    ///
    /// # Panics
    ///
    /// Panics with "writable stream is not set" if the writable side has not been stored yet.
    pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
        self.writable.get().expect("writable stream is not set")
    }
502
    /// Returns the readable side of the stream.
    ///
    /// # Panics
    ///
    /// Panics with "readable stream is not set" if the readable side has not been stored yet.
    pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
        self.readable.get().expect("readable stream is not set")
    }
506
    /// Returns the current value of the stream's backpressure flag.
    pub(crate) fn get_backpressure(&self) -> bool {
        self.backpressure.get()
    }
510
    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
    ///
    /// Creates the writable and readable sides of the stream (both sharing
    /// `start_promise`), stores them on `self`, turns backpressure on and leaves
    /// the controller unset.
    ///
    /// Returns an error if creating the writable stream fails.
    #[expect(clippy::too_many_arguments)]
    fn initialize(
        &self,
        cx: &mut JSContext,
        global: &GlobalScope,
        start_promise: Rc<Promise>,
        writable_high_water_mark: f64,
        writable_size_algorithm: Rc<QueuingStrategySize>,
        readable_high_water_mark: f64,
        readable_size_algorithm: Rc<QueuingStrategySize>,
    ) -> Fallible<()> {
        // Let startAlgorithm be an algorithm that returns startPromise.
        // Let writeAlgorithm be the following steps, taking a chunk argument:
        //  Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
        // Let abortAlgorithm be the following steps, taking a reason argument:
        //  Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
        // Let closeAlgorithm be the following steps:
        //  Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
        // Set stream.[[writable]] to ! CreateWritableStream(startAlgorithm, writeAlgorithm,
        // closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm).
        // Note: Those steps are implemented using UnderlyingSinkType::Transform.

        let writable = create_writable_stream(
            cx,
            global,
            writable_high_water_mark,
            writable_size_algorithm,
            UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
        )?;
        self.writable.set(Some(&writable));

        // Let pullAlgorithm be the following steps:
        //  Return ! TransformStreamDefaultSourcePullAlgorithm(stream).
        // Let cancelAlgorithm be the following steps, taking a reason argument:
        //  Return ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason).
        // Set stream.[[readable]] to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
        // cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
        // Note: Those steps are implemented using UnderlyingSourceType::Transform.

        let readable = create_readable_stream(
            cx,
            global,
            UnderlyingSourceType::Transform(self, start_promise),
            Some(readable_size_algorithm),
            Some(readable_high_water_mark),
        );
        self.readable.set(Some(&readable));

        // Set stream.[[backpressure]] and stream.[[backpressureChangePromise]] to undefined.
        // Note: This is done in the constructor.

        // Perform ! TransformStreamSetBackpressure(stream, true).
        self.set_backpressure(global, true, CanGc::from_cx(cx));

        // Set stream.[[controller]] to undefined.
        self.controller.set(None);

        Ok(())
    }
574
575    /// <https://streams.spec.whatwg.org/#transform-stream-set-backpressure>
576    pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
577        // Assert: stream.[[backpressure]] is not backpressure.
578        assert!(self.backpressure.get() != backpressure);
579
580        // If stream.[[backpressureChangePromise]] is not undefined, resolve
581        // stream.[[backpressureChangePromise]] with undefined.
582        if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
583            promise.resolve_native(&(), can_gc);
584        }
585
586        // Set stream.[[backpressureChangePromise]] to a new promise.;
587        *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
588
589        // Set stream.[[backpressure]] to backpressure.
590        self.backpressure.set(backpressure);
591    }
592
    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller>
    ///
    /// Links `controller` and this stream to each other.
    ///
    /// # Panics
    ///
    /// Panics if the stream already has a controller.
    fn set_up_transform_stream_default_controller(
        &self,
        controller: &TransformStreamDefaultController,
    ) {
        // Assert: stream implements TransformStream.
        // Note: this is checked with type.

        // Assert: stream.[[controller]] is undefined.
        assert!(self.controller.get().is_none());

        // Set controller.[[stream]] to stream.
        controller.set_stream(self);

        // Set stream.[[controller]] to controller.
        self.controller.set(Some(controller));

        // Set controller.[[transformAlgorithm]] to transformAlgorithm.
        // Set controller.[[flushAlgorithm]] to flushAlgorithm.
        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
        // Note: These are set in the constructor.
    }
615
616    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
617    fn set_up_transform_stream_default_controller_from_transformer(
618        &self,
619        global: &GlobalScope,
620        transformer_obj: SafeHandleObject,
621        transformer: &Transformer,
622        can_gc: CanGc,
623    ) {
624        // Let controller be a new TransformStreamDefaultController.
625        let transformer_type = TransformerType::new_from_js_transformer(transformer);
626        let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
627
628        // Let transformAlgorithm be the following steps, taking a chunk argument:
629        // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
630        // If result is an abrupt completion, return a promise rejected with result.[[Value]].
631        // Otherwise, return a promise resolved with undefined.
632
633        // Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
634        // Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
635
636        // If transformerDict["transform"] exists, set transformAlgorithm to an algorithm which
637        // takes an argument
638        // chunk and returns the result of invoking transformerDict["transform"] with argument
639        // list « chunk, controller »
640        // and callback this value transformer.
641
642        // If transformerDict["flush"] exists, set flushAlgorithm to an algorithm which returns
643        // the result
644        // of invoking transformerDict["flush"] with argument list « controller » and callback
645        // this value transformer.
646
647        // If transformerDict["cancel"] exists, set cancelAlgorithm to an algorithm which takes an argument
648        // reason and returns the result of invoking transformerDict["cancel"] with argument list « reason »
649        // and callback this value transformer.
650        controller.set_transform_obj(transformer_obj);
651
652        // Perform ! SetUpTransformStreamDefaultController(stream, controller,
653        // transformAlgorithm, flushAlgorithm, cancelAlgorithm).
654        self.set_up_transform_stream_default_controller(&controller);
655    }
656
657    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
658    pub(crate) fn transform_stream_default_sink_write_algorithm(
659        &self,
660        cx: &mut JSContext,
661        global: &GlobalScope,
662        chunk: SafeHandleValue,
663    ) -> Fallible<Rc<Promise>> {
664        // Assert: stream.[[writable]].[[state]] is "writable".
665        assert!(self.writable.get().is_some());
666
667        // Let controller be stream.[[controller]].
668        let controller = self.controller.get().expect("controller is not set");
669
670        // If stream.[[backpressure]] is true,
671        if self.backpressure.get() {
672            // Let backpressureChangePromise be stream.[[backpressureChangePromise]].
673            let backpressure_change_promise = self.backpressure_change_promise.borrow();
674
675            // Assert: backpressureChangePromise is not undefined.
676            assert!(backpressure_change_promise.is_some());
677
678            // Return the result of reacting to backpressureChangePromise with the following fulfillment steps:
679            let result_promise = Promise::new2(cx, global);
680            rooted!(&in(cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
681                controller: Dom::from_ref(&controller),
682                writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
683                chunk: Heap::boxed(chunk.get()),
684                result_promise: result_promise.clone(),
685            }));
686
687            let handler = PromiseNativeHandler::new(
688                cx,
689                global,
690                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
691                Some(Box::new(BackpressureChangeRejection {
692                    result_promise: result_promise.clone(),
693                })),
694            );
695            let mut realm = enter_auto_realm(cx, global);
696            let realm = &mut realm.current_realm();
697            backpressure_change_promise
698                .as_ref()
699                .expect("Promise must be some by now.")
700                .append_native_handler(realm, &handler);
701
702            return Ok(result_promise);
703        }
704
705        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
706        controller.transform_stream_default_controller_perform_transform(cx, global, chunk)
707    }
708
709    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
710    pub(crate) fn transform_stream_default_sink_abort_algorithm(
711        &self,
712        cx: &mut JSContext,
713        global: &GlobalScope,
714        reason: SafeHandleValue,
715    ) -> Fallible<Rc<Promise>> {
716        // Let controller be stream.[[controller]].
717        let controller = self.controller.get().expect("controller is not set");
718
719        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
720        if let Some(finish_promise) = controller.get_finish_promise() {
721            return Ok(finish_promise);
722        }
723
724        // Let readable be stream.[[readable]].
725        let readable = self.readable.get().expect("readable stream is not set");
726
727        // Let controller.[[finishPromise]] be a new promise.
728        controller.set_finish_promise(Promise::new2(cx, global));
729
730        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
731        let cancel_promise = controller.perform_cancel(cx, global, reason)?;
732
733        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
734        controller.clear_algorithms();
735
736        // React to cancelPromise:
737        let handler = PromiseNativeHandler::new(
738            cx,
739            global,
740            Some(Box::new(CancelPromiseFulfillment {
741                readable: Dom::from_ref(&readable),
742                controller: Dom::from_ref(&controller),
743                reason: Heap::boxed(reason.get()),
744            })),
745            Some(Box::new(CancelPromiseRejection {
746                readable: Dom::from_ref(&readable),
747                controller: Dom::from_ref(&controller),
748            })),
749        );
750        let mut realm = enter_auto_realm(cx, global);
751        let cx = &mut realm.current_realm();
752        cancel_promise.append_native_handler(cx, &handler);
753
754        // Return controller.[[finishPromise]].
755        let finish_promise = controller
756            .get_finish_promise()
757            .expect("finish promise is not set");
758        Ok(finish_promise)
759    }
760
761    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
762    pub(crate) fn transform_stream_default_sink_close_algorithm(
763        &self,
764        cx: &mut JSContext,
765        global: &GlobalScope,
766    ) -> Fallible<Rc<Promise>> {
767        // Let controller be stream.[[controller]].
768        let controller = self
769            .controller
770            .get()
771            .ok_or(Error::Type(c"controller is not set".to_owned()))?;
772
773        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
774        if let Some(finish_promise) = controller.get_finish_promise() {
775            return Ok(finish_promise);
776        }
777
778        // Let readable be stream.[[readable]].
779        let readable = self
780            .readable
781            .get()
782            .ok_or(Error::Type(c"readable stream is not set".to_owned()))?;
783
784        // Let controller.[[finishPromise]] be a new promise.
785        controller.set_finish_promise(Promise::new2(cx, global));
786
787        // Let flushPromise be the result of performing controller.[[flushAlgorithm]].
788        let flush_promise = controller.perform_flush(cx, global)?;
789
790        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
791        controller.clear_algorithms();
792
793        // React to flushPromise:
794        let handler = PromiseNativeHandler::new(
795            cx,
796            global,
797            Some(Box::new(FlushPromiseFulfillment {
798                readable: Dom::from_ref(&readable),
799                controller: Dom::from_ref(&controller),
800            })),
801            Some(Box::new(FlushPromiseRejection {
802                readable: Dom::from_ref(&readable),
803                controller: Dom::from_ref(&controller),
804            })),
805        );
806
807        let mut realm = enter_auto_realm(cx, global);
808        let realm = &mut realm.current_realm();
809        flush_promise.append_native_handler(realm, &handler);
810        // Return controller.[[finishPromise]].
811        let finish_promise = controller
812            .get_finish_promise()
813            .expect("finish promise is not set");
814        Ok(finish_promise)
815    }
816
817    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
818    pub(crate) fn transform_stream_default_source_cancel(
819        &self,
820        cx: &mut JSContext,
821        global: &GlobalScope,
822        reason: SafeHandleValue,
823    ) -> Fallible<Rc<Promise>> {
824        // Let controller be stream.[[controller]].
825        let controller = self
826            .controller
827            .get()
828            .ok_or(Error::Type(c"controller is not set".to_owned()))?;
829
830        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
831        if let Some(finish_promise) = controller.get_finish_promise() {
832            return Ok(finish_promise);
833        }
834
835        // Let writable be stream.[[writable]].
836        let writable = self
837            .writable
838            .get()
839            .ok_or(Error::Type(c"writable stream is not set".to_owned()))?;
840
841        // Let controller.[[finishPromise]] be a new promise.
842        controller.set_finish_promise(Promise::new2(cx, global));
843
844        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
845        let cancel_promise = controller.perform_cancel(cx, global, reason)?;
846
847        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
848        controller.clear_algorithms();
849
850        // React to cancelPromise:
851        let handler = PromiseNativeHandler::new(
852            cx,
853            global,
854            Some(Box::new(SourceCancelPromiseFulfillment {
855                writeable: Dom::from_ref(&writable),
856                controller: Dom::from_ref(&controller),
857                stream: Dom::from_ref(self),
858                reason: Heap::boxed(reason.get()),
859            })),
860            Some(Box::new(SourceCancelPromiseRejection {
861                writeable: Dom::from_ref(&writable),
862                controller: Dom::from_ref(&controller),
863                stream: Dom::from_ref(self),
864            })),
865        );
866
867        // Return controller.[[finishPromise]].
868        let finish_promise = controller
869            .get_finish_promise()
870            .expect("finish promise is not set");
871        let mut realm = enter_auto_realm(cx, global);
872        let cx = &mut realm.current_realm();
873        cancel_promise.append_native_handler(cx, &handler);
874        Ok(finish_promise)
875    }
876
877    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-pull>
878    pub(crate) fn transform_stream_default_source_pull(
879        &self,
880        global: &GlobalScope,
881        can_gc: CanGc,
882    ) -> Fallible<Rc<Promise>> {
883        // Assert: stream.[[backpressure]] is true.
884        assert!(self.backpressure.get());
885
886        // Assert: stream.[[backpressureChangePromise]] is not undefined.
887        assert!(self.backpressure_change_promise.borrow().is_some());
888
889        // Perform ! TransformStreamSetBackpressure(stream, false).
890        self.set_backpressure(global, false, can_gc);
891
892        // Return stream.[[backpressureChangePromise]].
893        Ok(self
894            .backpressure_change_promise
895            .borrow()
896            .clone()
897            .expect("Promise must be some by now."))
898    }
899
900    /// <https://streams.spec.whatwg.org/#transform-stream-error-writable-and-unblock-write>
901    pub(crate) fn error_writable_and_unblock_write(
902        &self,
903        cx: &mut JSContext,
904        global: &GlobalScope,
905        error: SafeHandleValue,
906    ) {
907        // Perform ! TransformStreamDefaultControllerClearAlgorithms(stream.[[controller]]).
908        self.get_controller().clear_algorithms();
909
910        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[controller]], e).
911        self.get_writable()
912            .get_default_controller()
913            .error_if_needed(cx, error, global);
914
915        // Perform ! TransformStreamUnblockWrite(stream).
916        self.unblock_write(global, CanGc::from_cx(cx))
917    }
918
919    /// <https://streams.spec.whatwg.org/#transform-stream-unblock-write>
920    pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
921        // If stream.[[backpressure]] is true, perform ! TransformStreamSetBackpressure(stream, false).
922        if self.backpressure.get() {
923            self.set_backpressure(global, false, can_gc);
924        }
925    }
926
927    /// <https://streams.spec.whatwg.org/#transform-stream-error>
928    pub(crate) fn error(&self, cx: &mut JSContext, global: &GlobalScope, error: SafeHandleValue) {
929        // Perform ! ReadableStreamDefaultControllerError(stream.[[readable]].[[controller]], e).
930        self.get_readable()
931            .get_default_controller()
932            .error(cx, error);
933
934        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, e).
935        self.error_writable_and_unblock_write(cx, global, error);
936    }
937}
938
/// WebIDL-facing methods of `TransformStream`: the constructor and the
/// `readable` / `writable` attribute getters.
impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
    /// <https://streams.spec.whatwg.org/#ts-constructor>
    ///
    /// Builds a new `TransformStream` from an optional `transformer` object and the
    /// two queuing strategies.
    ///
    /// # Errors
    ///
    /// - `TypeError` if `transformer` cannot be converted to a `Transformer` dictionary.
    /// - `Error::JSFailed` if that conversion fails with a pending JS exception.
    /// - `RangeError` if the dictionary's `readableType` or `writableType` is not undefined.
    /// - Any error from extracting the high water marks, initializing the stream,
    ///   or calling the transformer's `start` callback.
    #[expect(unsafe_code)]
    fn Constructor(
        cx: &mut JSContext,
        global: &GlobalScope,
        proto: Option<SafeHandleObject>,
        transformer: Option<*mut JSObject>,
        writable_strategy: &QueuingStrategy,
        readable_strategy: &QueuingStrategy,
    ) -> Fallible<DomRoot<TransformStream>> {
        // If transformer is missing, set it to null.
        rooted!(&in(cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));

        // Let transformerDict be transformer,
        // converted to an IDL value of type Transformer.
        // A null transformer yields the empty dictionary instead of going through conversion.
        let transformer_dict = if !transformer_obj.is_null() {
            rooted!(&in(cx) let obj_val = ObjectValue(transformer_obj.get()));
            match Transformer::new(cx, obj_val.handle()) {
                Ok(ConversionResult::Success(val)) => val,
                Ok(ConversionResult::Failure(error)) => {
                    return Err(Error::Type(error.into_owned()));
                },
                _ => {
                    return Err(Error::JSFailed);
                },
            }
        } else {
            Transformer::empty()
        };

        // If transformerDict["readableType"] exists, throw a RangeError exception.
        if !transformer_dict.readableType.handle().is_undefined() {
            return Err(Error::Range(c"readableType is set".to_owned()));
        }

        // If transformerDict["writableType"] exists, throw a RangeError exception.
        if !transformer_dict.writableType.handle().is_undefined() {
            return Err(Error::Range(c"writableType is set".to_owned()));
        }

        // Let readableHighWaterMark be ? ExtractHighWaterMark(readableStrategy, 0).
        let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;

        // Let readableSizeAlgorithm be ! ExtractSizeAlgorithm(readableStrategy).
        let readable_size_algorithm = extract_size_algorithm(readable_strategy, CanGc::from_cx(cx));

        // Let writableHighWaterMark be ? ExtractHighWaterMark(writableStrategy, 1).
        let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;

        // Let writableSizeAlgorithm be ! ExtractSizeAlgorithm(writableStrategy).
        let writable_size_algorithm = extract_size_algorithm(writable_strategy, CanGc::from_cx(cx));

        // Let startPromise be a new promise.
        let start_promise = Promise::new2(cx, global);

        // Perform ! InitializeTransformStream(this, startPromise, writableHighWaterMark,
        // writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
        let stream = TransformStream::new_with_proto(global, proto, CanGc::from_cx(cx));
        stream.initialize(
            cx,
            global,
            start_promise.clone(),
            writable_high_water_mark,
            writable_size_algorithm,
            readable_high_water_mark,
            readable_size_algorithm,
        )?;

        // Perform ? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict).
        // NOTE(review): the spec marks this step as fallible (`?`), but the call's
        // result is not propagated here. Confirm the helper returns `()` and
        // cannot fail.
        stream.set_up_transform_stream_default_controller_from_transformer(
            global,
            transformer_obj.handle(),
            &transformer_dict,
            CanGc::from_cx(cx),
        );

        // If transformerDict["start"] exists, then resolve startPromise with the
        // result of invoking transformerDict["start"]
        // with argument list « this.[[controller]] » and callback this value transformer.
        if let Some(start) = &transformer_dict.start {
            rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
            rooted!(&in(cx) let mut result: JSVal);
            rooted!(&in(cx) let this_object = transformer_obj.get());
            // Exceptions thrown by `start` are rethrown and surface as this
            // constructor's error via `?`.
            start.Call_(
                cx,
                &this_object.handle(),
                &stream.get_controller(),
                result.handle_mut(),
                ExceptionHandling::Rethrow,
            )?;
            // SAFETY: `to_object` is only reached after `is_object()` returned true,
            // and the object is stored in the rooted `result_object` before its
            // handle is passed to `IsPromiseObject`.
            let is_promise = unsafe {
                if result.is_object() {
                    result_object.set(result.to_object());
                    IsPromiseObject(result_object.handle().into_handle())
                } else {
                    false
                }
            };
            // Wrap an existing JS promise as-is; any other return value is wrapped
            // in an already-resolved promise.
            let promise = if is_promise {
                Promise::new_with_js_promise(result_object.handle(), cx.into())
            } else {
                Promise::new_resolved(global, cx.into(), result.get(), CanGc::from_cx(cx))
            };
            start_promise.resolve_native_with_cx(cx, &promise);
        } else {
            // Otherwise, resolve startPromise with undefined.
            start_promise.resolve_native_with_cx(cx, &());
        };

        Ok(stream)
    }

    /// <https://streams.spec.whatwg.org/#ts-readable>
    ///
    /// # Panics
    ///
    /// Panics if the readable stream is not set.
    fn Readable(&self) -> DomRoot<ReadableStream> {
        // Return this.[[readable]].
        self.readable.get().expect("readable stream is not set")
    }

    /// <https://streams.spec.whatwg.org/#ts-writable>
    ///
    /// # Panics
    ///
    /// Panics if the writable stream is not set.
    fn Writable(&self) -> DomRoot<WritableStream> {
        // Return this.[[writable]].
        self.writable.get().expect("writable stream is not set")
    }
}
1064
1065/// <https://streams.spec.whatwg.org/#ts-transfer>
1066impl Transferable for TransformStream {
1067    type Index = MessagePortIndex;
1068    type Data = TransformStreamData;
1069
1070    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps②>
1071    fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, TransformStreamData)> {
1072        let global = self.global();
1073        let mut realm = enter_auto_realm(cx, &*global);
1074        let mut realm = realm.current_realm();
1075        let cx = &mut realm;
1076
1077        // Step 1. Let readable be value.[[readable]].
1078        let readable = self.get_readable();
1079
1080        // Step 2. Let writable be value.[[writable]].
1081        let writable = self.get_writable();
1082
1083        // Step 3. If ! IsReadableStreamLocked(readable) is true, throw a
1084        // "DataCloneError" DOMException.
1085        // Step 4. If ! IsWritableStreamLocked(writable) is true, throw a
1086        // "DataCloneError" DOMException.
1087        if readable.is_locked() || writable.is_locked() {
1088            return Err(Error::DataClone(None));
1089        }
1090
1091        // First port pair (readable → proxy writable)
1092        let port1 = MessagePort::new(&global, CanGc::from_cx(cx));
1093        global.track_message_port(&port1, None);
1094        let port1_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1095        global.track_message_port(&port1_peer, None);
1096        global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1097
1098        let proxy_readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1099        proxy_readable.setup_cross_realm_transform_readable(cx, &port1);
1100        proxy_readable
1101            .pipe_to(cx, &global, &writable, false, false, false, None)
1102            .set_promise_is_handled();
1103
1104        // Second port pair (proxy readable → writable)
1105        let port2 = MessagePort::new(&global, CanGc::from_cx(cx));
1106        global.track_message_port(&port2, None);
1107        let port2_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1108        global.track_message_port(&port2_peer, None);
1109        global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1110
1111        let proxy_writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1112        proxy_writable.setup_cross_realm_transform_writable(cx, &port2);
1113
1114        // Pipe readable into the proxy writable (→ port_1)
1115        readable
1116            .pipe_to(cx, &global, &proxy_writable, false, false, false, None)
1117            .set_promise_is_handled();
1118
1119        // Step 5. Set dataHolder.[[readable]] to !
1120        // StructuredSerializeWithTransfer(readable, « readable »).
1121        // Step 6. Set dataHolder.[[writable]] to !
1122        // StructuredSerializeWithTransfer(writable, « writable »).
1123        Ok((
1124            *port1_peer.message_port_id(),
1125            TransformStreamData {
1126                readable: port1_peer.transfer(cx)?,
1127                writable: port2_peer.transfer(cx)?,
1128            },
1129        ))
1130    }
1131
1132    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps②>
1133    fn transfer_receive(
1134        cx: &mut JSContext,
1135        owner: &GlobalScope,
1136        _id: MessagePortId,
1137        data: TransformStreamData,
1138    ) -> Result<DomRoot<Self>, ()> {
1139        let port1 = MessagePort::transfer_receive(cx, owner, data.readable.0, data.readable.1)?;
1140        let port2 = MessagePort::transfer_receive(cx, owner, data.writable.0, data.writable.1)?;
1141
1142        // Step 1. Let readableRecord be !
1143        // StructuredDeserializeWithTransfer(dataHolder.[[readable]], the
1144        // current Realm).
1145        let proxy_readable = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1146        proxy_readable.setup_cross_realm_transform_readable(cx, &port2);
1147
1148        // Step 2. Let writableRecord be !
1149        // StructuredDeserializeWithTransfer(dataHolder.[[writable]], the
1150        // current Realm).
1151        let proxy_writable = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1152        proxy_writable.setup_cross_realm_transform_writable(cx, &port1);
1153
1154        // Step 3. Set value.[[readable]] to readableRecord.[[Deserialized]].
1155        // Step 4. Set value.[[writable]] to writableRecord.[[Deserialized]].
1156        // Step 5. Set value.[[backpressure]],
1157        // value.[[backpressureChangePromise]], and value.[[controller]] to
1158        // undefined.
1159        let stream = TransformStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1160        stream.readable.set(Some(&proxy_readable));
1161        stream.writable.set(Some(&proxy_writable));
1162
1163        Ok(stream)
1164    }
1165
1166    fn serialized_storage<'a>(
1167        data: StructuredData<'a, '_>,
1168    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1169        match data {
1170            StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1171            StructuredData::Writer(w) => &mut w.transform_streams_port,
1172        }
1173    }
1174}