Skip to main content

script/dom/stream/
transformstream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::context::JSContext;
11use js::jsapi::{Heap, IsPromiseObject, JSObject};
12use js::jsval::{JSVal, ObjectValue, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
15use rustc_hash::FxHashMap;
16use script_bindings::callback::ExceptionHandling;
17use script_bindings::cell::DomRefCell;
18use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
19use servo_base::id::{MessagePortId, MessagePortIndex};
20use servo_constellation_traits::TransformStreamData;
21
22use super::readablestream::CrossRealmTransformReadable;
23use super::writablestream::CrossRealmTransformWritable;
24use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
25    QueuingStrategy, QueuingStrategySize,
26};
27use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
28use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
29use crate::dom::bindings::conversions::ConversionResult;
30use crate::dom::bindings::error::{Error, Fallible};
31use crate::dom::bindings::reflector::DomGlobal;
32use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
33use crate::dom::bindings::structuredclone::StructuredData;
34use crate::dom::bindings::transferable::Transferable;
35use crate::dom::globalscope::GlobalScope;
36use crate::dom::messageport::MessagePort;
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::Callback;
39use crate::dom::readablestream::{ReadableStream, create_readable_stream};
40use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
41use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
42use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
43use crate::dom::stream::writablestream::create_writable_stream;
44use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
45use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
46use crate::realms::enter_auto_realm;
47use crate::script_runtime::CanGc;
48
49impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}
50
51/// Reacting to backpressureChangePromise as part of
52/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
53#[derive(JSTraceable, MallocSizeOf)]
54#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
55struct TransformBackPressureChangePromiseFulfillment {
56    /// The result of reacting to backpressureChangePromise.
57    #[conditional_malloc_size_of]
58    result_promise: Rc<Promise>,
59
60    #[ignore_malloc_size_of = "mozjs"]
61    chunk: Box<Heap<JSVal>>,
62
63    /// The writable used in the fulfillment steps
64    writable: Dom<WritableStream>,
65
66    controller: Dom<TransformStreamDefaultController>,
67}
68
69impl Callback for TransformBackPressureChangePromiseFulfillment {
70    /// Reacting to backpressureChangePromise with the following fulfillment steps:
71    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
72        // Let writable be stream.[[writable]].
73        // Let state be writable.[[state]].
74        // If state is "erroring", throw writable.[[storedError]].
75        if self.writable.is_erroring() {
76            rooted!(&in(cx) let mut error = UndefinedValue());
77            self.writable.get_stored_error(error.handle_mut());
78            self.result_promise
79                .reject(cx.into(), error.handle(), CanGc::from_cx(cx));
80            return;
81        }
82
83        // Assert: state is "writable".
84        assert!(self.writable.is_writable());
85
86        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
87        rooted!(&in(cx) let mut chunk = UndefinedValue());
88        chunk.set(self.chunk.get());
89        let transform_result = self
90            .controller
91            .transform_stream_default_controller_perform_transform(
92                cx,
93                &self.writable.global(),
94                chunk.handle(),
95            )
96            .expect("perform transform failed");
97
98        // PerformTransformFulfillment and PerformTransformRejection do not need
99        // to be rooted because they only contain an Rc.
100        let handler = PromiseNativeHandler::new(
101            &self.writable.global(),
102            Some(Box::new(PerformTransformFulfillment {
103                result_promise: self.result_promise.clone(),
104            })),
105            Some(Box::new(PerformTransformRejection {
106                result_promise: self.result_promise.clone(),
107            })),
108            CanGc::from_cx(cx),
109        );
110
111        let mut realm = enter_auto_realm(cx, &*self.writable.global());
112        let realm = &mut realm.current_realm();
113        transform_result.append_native_handler(realm, &handler);
114    }
115}
116
117#[derive(JSTraceable, MallocSizeOf)]
118#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
119/// Reacting to fulfillment of performTransform as part of
120/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
121struct PerformTransformFulfillment {
122    #[conditional_malloc_size_of]
123    result_promise: Rc<Promise>,
124}
125
126impl Callback for PerformTransformFulfillment {
127    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
128        let can_gc = CanGc::from_cx(cx);
129        // Fulfilled: resolve the outer promise
130        self.result_promise.resolve_native(&(), can_gc);
131    }
132}
133
134#[derive(JSTraceable, MallocSizeOf)]
135#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
136/// Reacting to rejection of performTransform as part of
137/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
138struct PerformTransformRejection {
139    #[conditional_malloc_size_of]
140    result_promise: Rc<Promise>,
141}
142
143impl Callback for PerformTransformRejection {
144    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
145        let can_gc = CanGc::from_cx(cx);
146        // Stream already errored in perform_transform, just reject result_promise
147        self.result_promise.reject(cx.into(), v, can_gc);
148    }
149}
150
151#[derive(JSTraceable, MallocSizeOf)]
152#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
153/// Reacting to rejection of backpressureChangePromise as part of
154/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
155struct BackpressureChangeRejection {
156    #[conditional_malloc_size_of]
157    result_promise: Rc<Promise>,
158}
159
160impl Callback for BackpressureChangeRejection {
161    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
162        let can_gc = CanGc::from_cx(cx);
163        self.result_promise.reject(cx.into(), reason, can_gc);
164    }
165}
166
167impl js::gc::Rootable for CancelPromiseFulfillment {}
168
169/// Reacting to fulfillment of the cancelpromise as part of
170/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
171#[derive(JSTraceable, MallocSizeOf)]
172#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
173struct CancelPromiseFulfillment {
174    readable: Dom<ReadableStream>,
175    controller: Dom<TransformStreamDefaultController>,
176    #[ignore_malloc_size_of = "mozjs"]
177    reason: Box<Heap<JSVal>>,
178}
179
180impl Callback for CancelPromiseFulfillment {
181    /// Reacting to backpressureChangePromise with the following fulfillment steps:
182    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
183        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
184        if self.readable.is_errored() {
185            rooted!(&in(cx) let mut error = UndefinedValue());
186            self.readable.get_stored_error(error.handle_mut());
187            self.controller
188                .get_finish_promise()
189                .expect("finish promise is not set")
190                .reject_native(&error.handle(), CanGc::from_cx(cx));
191        } else {
192            // Otherwise:
193            // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], reason).
194            rooted!(&in(cx) let mut reason = UndefinedValue());
195            reason.set(self.reason.get());
196            self.readable
197                .get_default_controller()
198                .error(cx, reason.handle());
199
200            // Resolve controller.[[finishPromise]] with undefined.
201            self.controller
202                .get_finish_promise()
203                .expect("finish promise is not set")
204                .resolve_native(&(), CanGc::from_cx(cx));
205        }
206    }
207}
208
209impl js::gc::Rootable for CancelPromiseRejection {}
210
211/// Reacting to rejection of cancelpromise as part of
212/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
213#[derive(JSTraceable, MallocSizeOf)]
214#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
215struct CancelPromiseRejection {
216    readable: Dom<ReadableStream>,
217    controller: Dom<TransformStreamDefaultController>,
218}
219
220impl Callback for CancelPromiseRejection {
221    /// Reacting to backpressureChangePromise with the following fulfillment steps:
222    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
223        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
224        self.readable.get_default_controller().error(cx, v);
225
226        // Reject controller.[[finishPromise]] with r.
227        self.controller
228            .get_finish_promise()
229            .expect("finish promise is not set")
230            .reject(cx.into(), v, CanGc::from_cx(cx));
231    }
232}
233
234impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
235
236/// Reacting to fulfillment of the cancelpromise as part of
237/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
238#[derive(JSTraceable, MallocSizeOf)]
239#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
240struct SourceCancelPromiseFulfillment {
241    writeable: Dom<WritableStream>,
242    controller: Dom<TransformStreamDefaultController>,
243    stream: Dom<TransformStream>,
244    #[ignore_malloc_size_of = "mozjs"]
245    reason: Box<Heap<JSVal>>,
246}
247
248impl Callback for SourceCancelPromiseFulfillment {
249    /// Reacting to backpressureChangePromise with the following fulfillment steps:
250    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
251        // If cancelPromise was fulfilled, then:
252        let finish_promise = self
253            .controller
254            .get_finish_promise()
255            .expect("finish promise is not set");
256
257        let global = &self.writeable.global();
258        // If writable.[[state]] is "errored", reject controller.[[finishPromise]] with writable.[[storedError]].
259        if self.writeable.is_errored() {
260            rooted!(&in(cx) let mut error = UndefinedValue());
261            self.writeable.get_stored_error(error.handle_mut());
262            finish_promise.reject(cx.into(), error.handle(), CanGc::from_cx(cx));
263        } else {
264            // Otherwise:
265            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], reason).
266            rooted!(&in(cx) let mut reason = UndefinedValue());
267            reason.set(self.reason.get());
268            self.writeable
269                .get_default_controller()
270                .error_if_needed(cx, reason.handle(), global);
271
272            // Perform ! TransformStreamUnblockWrite(stream).
273            self.stream.unblock_write(global, CanGc::from_cx(cx));
274
275            // Resolve controller.[[finishPromise]] with undefined.
276            finish_promise.resolve_native(&(), CanGc::from_cx(cx));
277        }
278    }
279}
280
281impl js::gc::Rootable for SourceCancelPromiseRejection {}
282
283/// Reacting to rejection of cancelpromise as part of
284/// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
285#[derive(JSTraceable, MallocSizeOf)]
286#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
287struct SourceCancelPromiseRejection {
288    writeable: Dom<WritableStream>,
289    controller: Dom<TransformStreamDefaultController>,
290    stream: Dom<TransformStream>,
291}
292
293impl Callback for SourceCancelPromiseRejection {
294    /// Reacting to backpressureChangePromise with the following fulfillment steps:
295    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
296        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], r).
297        let global = &self.writeable.global();
298
299        self.writeable
300            .get_default_controller()
301            .error_if_needed(cx, v, global);
302
303        // Perform ! TransformStreamUnblockWrite(stream).
304        self.stream.unblock_write(global, CanGc::from_cx(cx));
305
306        // Reject controller.[[finishPromise]] with r.
307        self.controller
308            .get_finish_promise()
309            .expect("finish promise is not set")
310            .reject(cx.into(), v, CanGc::from_cx(cx));
311    }
312}
313
314impl js::gc::Rootable for FlushPromiseFulfillment {}
315
316/// Reacting to fulfillment of the flushpromise as part of
317/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
318#[derive(JSTraceable, MallocSizeOf)]
319#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
320struct FlushPromiseFulfillment {
321    readable: Dom<ReadableStream>,
322    controller: Dom<TransformStreamDefaultController>,
323}
324
325impl Callback for FlushPromiseFulfillment {
326    /// Reacting to flushpromise with the following fulfillment steps:
327    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
328        // If flushPromise was fulfilled, then:
329        let finish_promise = self
330            .controller
331            .get_finish_promise()
332            .expect("finish promise is not set");
333
334        // If readable.[[state]] is "errored", reject controller.[[finishPromise]] with readable.[[storedError]].
335        if self.readable.is_errored() {
336            rooted!(&in(cx) let mut error = UndefinedValue());
337            self.readable.get_stored_error(error.handle_mut());
338            finish_promise.reject(cx.into(), error.handle(), CanGc::from_cx(cx));
339        } else {
340            // Otherwise:
341            // Perform ! ReadableStreamDefaultControllerClose(readable.[[controller]]).
342            self.readable.get_default_controller().close(cx);
343
344            // Resolve controller.[[finishPromise]] with undefined.
345            finish_promise.resolve_native(&(), CanGc::from_cx(cx));
346        }
347    }
348}
349
350impl js::gc::Rootable for FlushPromiseRejection {}
351/// Reacting to rejection of flushpromise as part of
352/// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
353
354#[derive(JSTraceable, MallocSizeOf)]
355#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
356struct FlushPromiseRejection {
357    readable: Dom<ReadableStream>,
358    controller: Dom<TransformStreamDefaultController>,
359}
360
361impl Callback for FlushPromiseRejection {
362    /// Reacting to flushpromise with the following fulfillment steps:
363    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
364        // If flushPromise was rejected with reason r, then:
365        // Perform ! ReadableStreamDefaultControllerError(readable.[[controller]], r).
366        self.readable.get_default_controller().error(cx, v);
367
368        // Reject controller.[[finishPromise]] with r.
369        self.controller
370            .get_finish_promise()
371            .expect("finish promise is not set")
372            .reject(cx.into(), v, CanGc::from_cx(cx));
373    }
374}
375
376impl js::gc::Rootable for CrossRealmTransform {}
377
378/// A wrapper to handle `message` and `messageerror` events
379/// for the message port used by the transfered stream.
380#[derive(Clone, JSTraceable, MallocSizeOf)]
381#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
382pub(crate) enum CrossRealmTransform {
383    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
384    Readable(CrossRealmTransformReadable),
385    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
386    Writable(CrossRealmTransformWritable),
387}
388
389/// <https://streams.spec.whatwg.org/#ts-class>
390#[dom_struct]
391pub struct TransformStream {
392    reflector_: Reflector,
393
394    /// <https://streams.spec.whatwg.org/#transformstream-backpressure>
395    backpressure: Cell<bool>,
396
397    /// <https://streams.spec.whatwg.org/#transformstream-backpressurechangepromise>
398    #[conditional_malloc_size_of]
399    backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,
400
401    /// <https://streams.spec.whatwg.org/#transformstream-controller>
402    controller: MutNullableDom<TransformStreamDefaultController>,
403
404    /// <https://streams.spec.whatwg.org/#transformstream-detached>
405    detached: Cell<bool>,
406
407    /// <https://streams.spec.whatwg.org/#transformstream-readable>
408    readable: MutNullableDom<ReadableStream>,
409
410    /// <https://streams.spec.whatwg.org/#transformstream-writable>
411    writable: MutNullableDom<WritableStream>,
412}
413
414impl TransformStream {
415    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
416    fn new_inherited() -> TransformStream {
417        TransformStream {
418            reflector_: Reflector::new(),
419            backpressure: Default::default(),
420            backpressure_change_promise: DomRefCell::new(None),
421            controller: MutNullableDom::new(None),
422            detached: Cell::new(false),
423            readable: MutNullableDom::new(None),
424            writable: MutNullableDom::new(None),
425        }
426    }
427
428    pub(crate) fn new_with_proto(
429        global: &GlobalScope,
430        proto: Option<SafeHandleObject>,
431        can_gc: CanGc,
432    ) -> DomRoot<TransformStream> {
433        reflect_dom_object_with_proto(
434            Box::new(TransformStream::new_inherited()),
435            global,
436            proto,
437            can_gc,
438        )
439    }
440
441    /// Creates and set up the newly created transform stream following
442    /// <https://streams.spec.whatwg.org/#transformstream-set-up>
443    pub(crate) fn set_up(
444        &self,
445        cx: &mut JSContext,
446        global: &GlobalScope,
447        transformer_type: TransformerType,
448    ) -> Fallible<()> {
449        // Step1. Let writableHighWaterMark be 1.
450        let writable_high_water_mark = 1.0;
451
452        // Step 2. Let writableSizeAlgorithm be an algorithm that returns 1.
453        let writable_size_algorithm =
454            extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
455
456        // Step 3. Let readableHighWaterMark be 0.
457        let readable_high_water_mark = 0.0;
458
459        // Step 4. Let readableSizeAlgorithm be an algorithm that returns 1.
460        let readable_size_algorithm =
461            extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
462
463        // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
464        // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
465        // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
466        // NOTE: These steps are implemented in `TransformStreamDefaultController::new`
467
468        // Step 8. Let startPromise be a promise resolved with undefined.
469        let start_promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
470
471        // Step 9. Perform ! InitializeTransformStream(stream, startPromise,
472        // writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark,
473        // readableSizeAlgorithm).
474        self.initialize(
475            cx,
476            global,
477            start_promise,
478            writable_high_water_mark,
479            writable_size_algorithm,
480            readable_high_water_mark,
481            readable_size_algorithm,
482        )?;
483
484        // Step 10. Let controller be a new TransformStreamDefaultController.
485        let controller =
486            TransformStreamDefaultController::new(global, transformer_type, CanGc::from_cx(cx));
487
488        // Step 11. Perform ! SetUpTransformStreamDefaultController(stream,
489        // controller, transformAlgorithmWrapper, flushAlgorithmWrapper,
490        // cancelAlgorithmWrapper).
491        self.set_up_transform_stream_default_controller(&controller);
492
493        Ok(())
494    }
495
496    pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
497        self.controller.get().expect("controller is not set")
498    }
499
500    pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
501        self.writable.get().expect("writable stream is not set")
502    }
503
504    pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
505        self.readable.get().expect("readable stream is not set")
506    }
507
508    pub(crate) fn get_backpressure(&self) -> bool {
509        self.backpressure.get()
510    }
511
512    /// <https://streams.spec.whatwg.org/#initialize-transform-stream>
513    #[expect(clippy::too_many_arguments)]
514    fn initialize(
515        &self,
516        cx: &mut JSContext,
517        global: &GlobalScope,
518        start_promise: Rc<Promise>,
519        writable_high_water_mark: f64,
520        writable_size_algorithm: Rc<QueuingStrategySize>,
521        readable_high_water_mark: f64,
522        readable_size_algorithm: Rc<QueuingStrategySize>,
523    ) -> Fallible<()> {
524        // Let startAlgorithm be an algorithm that returns startPromise.
525        // Let writeAlgorithm be the following steps, taking a chunk argument:
526        //  Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk).
527        // Let abortAlgorithm be the following steps, taking a reason argument:
528        //  Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason).
529        // Let closeAlgorithm be the following steps:
530        //  Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
531        // Set stream.[[writable]] to ! CreateWritableStream(startAlgorithm, writeAlgorithm,
532        // closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm).
533        // Note: Those steps are implemented using UnderlyingSinkType::Transform.
534
535        let writable = create_writable_stream(
536            cx,
537            global,
538            writable_high_water_mark,
539            writable_size_algorithm,
540            UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
541        )?;
542        self.writable.set(Some(&writable));
543
544        // Let pullAlgorithm be the following steps:
545
546        // Return ! TransformStreamDefaultSourcePullAlgorithm(stream).
547
548        // Let cancelAlgorithm be the following steps, taking a reason argument:
549
550        // Return ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason).
551
552        // Set stream.[[readable]] to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
553        // cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
554
555        let readable = create_readable_stream(
556            cx,
557            global,
558            UnderlyingSourceType::Transform(self, start_promise),
559            Some(readable_size_algorithm),
560            Some(readable_high_water_mark),
561        );
562        self.readable.set(Some(&readable));
563
564        // Set stream.[[backpressure]] and stream.[[backpressureChangePromise]] to undefined.
565        // Note: This is done in the constructor.
566
567        // Perform ! TransformStreamSetBackpressure(stream, true).
568        self.set_backpressure(global, true, CanGc::from_cx(cx));
569
570        // Set stream.[[controller]] to undefined.
571        self.controller.set(None);
572
573        Ok(())
574    }
575
576    /// <https://streams.spec.whatwg.org/#transform-stream-set-backpressure>
577    pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
578        // Assert: stream.[[backpressure]] is not backpressure.
579        assert!(self.backpressure.get() != backpressure);
580
581        // If stream.[[backpressureChangePromise]] is not undefined, resolve
582        // stream.[[backpressureChangePromise]] with undefined.
583        if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
584            promise.resolve_native(&(), can_gc);
585        }
586
587        // Set stream.[[backpressureChangePromise]] to a new promise.;
588        *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
589
590        // Set stream.[[backpressure]] to backpressure.
591        self.backpressure.set(backpressure);
592    }
593
594    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller>
595    fn set_up_transform_stream_default_controller(
596        &self,
597        controller: &TransformStreamDefaultController,
598    ) {
599        // Assert: stream implements TransformStream.
600        // Note: this is checked with type.
601
602        // Assert: stream.[[controller]] is undefined.
603        assert!(self.controller.get().is_none());
604
605        // Set controller.[[stream]] to stream.
606        controller.set_stream(self);
607
608        // Set stream.[[controller]] to controller.
609        self.controller.set(Some(controller));
610
611        // Set controller.[[transformAlgorithm]] to transformAlgorithm.
612        // Set controller.[[flushAlgorithm]] to flushAlgorithm.
613        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
614        // Note: These are set in the constructor.
615    }
616
617    /// <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
618    fn set_up_transform_stream_default_controller_from_transformer(
619        &self,
620        global: &GlobalScope,
621        transformer_obj: SafeHandleObject,
622        transformer: &Transformer,
623        can_gc: CanGc,
624    ) {
625        // Let controller be a new TransformStreamDefaultController.
626        let transformer_type = TransformerType::new_from_js_transformer(transformer);
627        let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
628
629        // Let transformAlgorithm be the following steps, taking a chunk argument:
630        // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
631        // If result is an abrupt completion, return a promise rejected with result.[[Value]].
632        // Otherwise, return a promise resolved with undefined.
633
634        // Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
635        // Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
636
637        // If transformerDict["transform"] exists, set transformAlgorithm to an algorithm which
638        // takes an argument
639        // chunk and returns the result of invoking transformerDict["transform"] with argument
640        // list « chunk, controller »
641        // and callback this value transformer.
642
643        // If transformerDict["flush"] exists, set flushAlgorithm to an algorithm which returns
644        // the result
645        // of invoking transformerDict["flush"] with argument list « controller » and callback
646        // this value transformer.
647
648        // If transformerDict["cancel"] exists, set cancelAlgorithm to an algorithm which takes an argument
649        // reason and returns the result of invoking transformerDict["cancel"] with argument list « reason »
650        // and callback this value transformer.
651        controller.set_transform_obj(transformer_obj);
652
653        // Perform ! SetUpTransformStreamDefaultController(stream, controller,
654        // transformAlgorithm, flushAlgorithm, cancelAlgorithm).
655        self.set_up_transform_stream_default_controller(&controller);
656    }
657
658    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm>
659    pub(crate) fn transform_stream_default_sink_write_algorithm(
660        &self,
661        cx: &mut JSContext,
662        global: &GlobalScope,
663        chunk: SafeHandleValue,
664    ) -> Fallible<Rc<Promise>> {
665        // Assert: stream.[[writable]].[[state]] is "writable".
666        assert!(self.writable.get().is_some());
667
668        // Let controller be stream.[[controller]].
669        let controller = self.controller.get().expect("controller is not set");
670
671        // If stream.[[backpressure]] is true,
672        if self.backpressure.get() {
673            // Let backpressureChangePromise be stream.[[backpressureChangePromise]].
674            let backpressure_change_promise = self.backpressure_change_promise.borrow();
675
676            // Assert: backpressureChangePromise is not undefined.
677            assert!(backpressure_change_promise.is_some());
678
679            // Return the result of reacting to backpressureChangePromise with the following fulfillment steps:
680            let result_promise = Promise::new2(cx, global);
681            rooted!(&in(cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
682                controller: Dom::from_ref(&controller),
683                writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
684                chunk: Heap::boxed(chunk.get()),
685                result_promise: result_promise.clone(),
686            }));
687
688            let handler = PromiseNativeHandler::new(
689                global,
690                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
691                Some(Box::new(BackpressureChangeRejection {
692                    result_promise: result_promise.clone(),
693                })),
694                CanGc::from_cx(cx),
695            );
696            let mut realm = enter_auto_realm(cx, global);
697            let realm = &mut realm.current_realm();
698            backpressure_change_promise
699                .as_ref()
700                .expect("Promise must be some by now.")
701                .append_native_handler(realm, &handler);
702
703            return Ok(result_promise);
704        }
705
706        // Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
707        controller.transform_stream_default_controller_perform_transform(cx, global, chunk)
708    }
709
710    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm>
711    pub(crate) fn transform_stream_default_sink_abort_algorithm(
712        &self,
713        cx: &mut JSContext,
714        global: &GlobalScope,
715        reason: SafeHandleValue,
716    ) -> Fallible<Rc<Promise>> {
717        // Let controller be stream.[[controller]].
718        let controller = self.controller.get().expect("controller is not set");
719
720        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
721        if let Some(finish_promise) = controller.get_finish_promise() {
722            return Ok(finish_promise);
723        }
724
725        // Let readable be stream.[[readable]].
726        let readable = self.readable.get().expect("readable stream is not set");
727
728        // Let controller.[[finishPromise]] be a new promise.
729        controller.set_finish_promise(Promise::new2(cx, global));
730
731        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
732        let cancel_promise = controller.perform_cancel(cx, global, reason)?;
733
734        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
735        controller.clear_algorithms();
736
737        // React to cancelPromise:
738        let handler = PromiseNativeHandler::new(
739            global,
740            Some(Box::new(CancelPromiseFulfillment {
741                readable: Dom::from_ref(&readable),
742                controller: Dom::from_ref(&controller),
743                reason: Heap::boxed(reason.get()),
744            })),
745            Some(Box::new(CancelPromiseRejection {
746                readable: Dom::from_ref(&readable),
747                controller: Dom::from_ref(&controller),
748            })),
749            CanGc::from_cx(cx),
750        );
751        let mut realm = enter_auto_realm(cx, global);
752        let cx = &mut realm.current_realm();
753        cancel_promise.append_native_handler(cx, &handler);
754
755        // Return controller.[[finishPromise]].
756        let finish_promise = controller
757            .get_finish_promise()
758            .expect("finish promise is not set");
759        Ok(finish_promise)
760    }
761
762    /// <https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm>
763    pub(crate) fn transform_stream_default_sink_close_algorithm(
764        &self,
765        cx: &mut JSContext,
766        global: &GlobalScope,
767    ) -> Fallible<Rc<Promise>> {
768        // Let controller be stream.[[controller]].
769        let controller = self
770            .controller
771            .get()
772            .ok_or(Error::Type(c"controller is not set".to_owned()))?;
773
774        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
775        if let Some(finish_promise) = controller.get_finish_promise() {
776            return Ok(finish_promise);
777        }
778
779        // Let readable be stream.[[readable]].
780        let readable = self
781            .readable
782            .get()
783            .ok_or(Error::Type(c"readable stream is not set".to_owned()))?;
784
785        // Let controller.[[finishPromise]] be a new promise.
786        controller.set_finish_promise(Promise::new2(cx, global));
787
788        // Let flushPromise be the result of performing controller.[[flushAlgorithm]].
789        let flush_promise = controller.perform_flush(cx, global)?;
790
791        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
792        controller.clear_algorithms();
793
794        // React to flushPromise:
795        let handler = PromiseNativeHandler::new(
796            global,
797            Some(Box::new(FlushPromiseFulfillment {
798                readable: Dom::from_ref(&readable),
799                controller: Dom::from_ref(&controller),
800            })),
801            Some(Box::new(FlushPromiseRejection {
802                readable: Dom::from_ref(&readable),
803                controller: Dom::from_ref(&controller),
804            })),
805            CanGc::from_cx(cx),
806        );
807
808        let mut realm = enter_auto_realm(cx, global);
809        let realm = &mut realm.current_realm();
810        flush_promise.append_native_handler(realm, &handler);
811        // Return controller.[[finishPromise]].
812        let finish_promise = controller
813            .get_finish_promise()
814            .expect("finish promise is not set");
815        Ok(finish_promise)
816    }
817
818    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-cancel>
819    pub(crate) fn transform_stream_default_source_cancel(
820        &self,
821        cx: &mut JSContext,
822        global: &GlobalScope,
823        reason: SafeHandleValue,
824    ) -> Fallible<Rc<Promise>> {
825        // Let controller be stream.[[controller]].
826        let controller = self
827            .controller
828            .get()
829            .ok_or(Error::Type(c"controller is not set".to_owned()))?;
830
831        // If controller.[[finishPromise]] is not undefined, return controller.[[finishPromise]].
832        if let Some(finish_promise) = controller.get_finish_promise() {
833            return Ok(finish_promise);
834        }
835
836        // Let writable be stream.[[writable]].
837        let writable = self
838            .writable
839            .get()
840            .ok_or(Error::Type(c"writable stream is not set".to_owned()))?;
841
842        // Let controller.[[finishPromise]] be a new promise.
843        controller.set_finish_promise(Promise::new2(cx, global));
844
845        // Let cancelPromise be the result of performing controller.[[cancelAlgorithm]], passing reason.
846        let cancel_promise = controller.perform_cancel(cx, global, reason)?;
847
848        // Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
849        controller.clear_algorithms();
850
851        // React to cancelPromise:
852        let handler = PromiseNativeHandler::new(
853            global,
854            Some(Box::new(SourceCancelPromiseFulfillment {
855                writeable: Dom::from_ref(&writable),
856                controller: Dom::from_ref(&controller),
857                stream: Dom::from_ref(self),
858                reason: Heap::boxed(reason.get()),
859            })),
860            Some(Box::new(SourceCancelPromiseRejection {
861                writeable: Dom::from_ref(&writable),
862                controller: Dom::from_ref(&controller),
863                stream: Dom::from_ref(self),
864            })),
865            CanGc::from_cx(cx),
866        );
867
868        // Return controller.[[finishPromise]].
869        let finish_promise = controller
870            .get_finish_promise()
871            .expect("finish promise is not set");
872        let mut realm = enter_auto_realm(cx, global);
873        let cx = &mut realm.current_realm();
874        cancel_promise.append_native_handler(cx, &handler);
875        Ok(finish_promise)
876    }
877
878    /// <https://streams.spec.whatwg.org/#transform-stream-default-source-pull>
879    pub(crate) fn transform_stream_default_source_pull(
880        &self,
881        global: &GlobalScope,
882        can_gc: CanGc,
883    ) -> Fallible<Rc<Promise>> {
884        // Assert: stream.[[backpressure]] is true.
885        assert!(self.backpressure.get());
886
887        // Assert: stream.[[backpressureChangePromise]] is not undefined.
888        assert!(self.backpressure_change_promise.borrow().is_some());
889
890        // Perform ! TransformStreamSetBackpressure(stream, false).
891        self.set_backpressure(global, false, can_gc);
892
893        // Return stream.[[backpressureChangePromise]].
894        Ok(self
895            .backpressure_change_promise
896            .borrow()
897            .clone()
898            .expect("Promise must be some by now."))
899    }
900
901    /// <https://streams.spec.whatwg.org/#transform-stream-error-writable-and-unblock-write>
902    pub(crate) fn error_writable_and_unblock_write(
903        &self,
904        cx: &mut JSContext,
905        global: &GlobalScope,
906        error: SafeHandleValue,
907    ) {
908        // Perform ! TransformStreamDefaultControllerClearAlgorithms(stream.[[controller]]).
909        self.get_controller().clear_algorithms();
910
911        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[controller]], e).
912        self.get_writable()
913            .get_default_controller()
914            .error_if_needed(cx, error, global);
915
916        // Perform ! TransformStreamUnblockWrite(stream).
917        self.unblock_write(global, CanGc::from_cx(cx))
918    }
919
920    /// <https://streams.spec.whatwg.org/#transform-stream-unblock-write>
921    pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
922        // If stream.[[backpressure]] is true, perform ! TransformStreamSetBackpressure(stream, false).
923        if self.backpressure.get() {
924            self.set_backpressure(global, false, can_gc);
925        }
926    }
927
928    /// <https://streams.spec.whatwg.org/#transform-stream-error>
929    pub(crate) fn error(&self, cx: &mut JSContext, global: &GlobalScope, error: SafeHandleValue) {
930        // Perform ! ReadableStreamDefaultControllerError(stream.[[readable]].[[controller]], e).
931        self.get_readable()
932            .get_default_controller()
933            .error(cx, error);
934
935        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, e).
936        self.error_writable_and_unblock_write(cx, global, error);
937    }
938}
939
940impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
941    /// <https://streams.spec.whatwg.org/#ts-constructor>
942    #[expect(unsafe_code)]
943    fn Constructor(
944        cx: &mut JSContext,
945        global: &GlobalScope,
946        proto: Option<SafeHandleObject>,
947        transformer: Option<*mut JSObject>,
948        writable_strategy: &QueuingStrategy,
949        readable_strategy: &QueuingStrategy,
950    ) -> Fallible<DomRoot<TransformStream>> {
951        // If transformer is missing, set it to null.
952        rooted!(&in(cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
953
954        // Let underlyingSinkDict be underlyingSink,
955        // converted to an IDL value of type UnderlyingSink.
956        let transformer_dict = if !transformer_obj.is_null() {
957            rooted!(&in(cx) let obj_val = ObjectValue(transformer_obj.get()));
958            match Transformer::new(cx.into(), obj_val.handle(), CanGc::from_cx(cx)) {
959                Ok(ConversionResult::Success(val)) => val,
960                Ok(ConversionResult::Failure(error)) => {
961                    return Err(Error::Type(error.into_owned()));
962                },
963                _ => {
964                    return Err(Error::JSFailed);
965                },
966            }
967        } else {
968            Transformer::empty()
969        };
970
971        // If transformerDict["readableType"] exists, throw a RangeError exception.
972        if !transformer_dict.readableType.handle().is_undefined() {
973            return Err(Error::Range(c"readableType is set".to_owned()));
974        }
975
976        // If transformerDict["writableType"] exists, throw a RangeError exception.
977        if !transformer_dict.writableType.handle().is_undefined() {
978            return Err(Error::Range(c"writableType is set".to_owned()));
979        }
980
981        // Let readableHighWaterMark be ? ExtractHighWaterMark(readableStrategy, 0).
982        let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
983
984        // Let readableSizeAlgorithm be ! ExtractSizeAlgorithm(readableStrategy).
985        let readable_size_algorithm = extract_size_algorithm(readable_strategy, CanGc::from_cx(cx));
986
987        // Let writableHighWaterMark be ? ExtractHighWaterMark(writableStrategy, 1).
988        let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
989
990        // Let writableSizeAlgorithm be ! ExtractSizeAlgorithm(writableStrategy).
991        let writable_size_algorithm = extract_size_algorithm(writable_strategy, CanGc::from_cx(cx));
992
993        // Let startPromise be a new promise.
994        let start_promise = Promise::new2(cx, global);
995
996        // Perform ! InitializeTransformStream(this, startPromise, writableHighWaterMark,
997        // writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
998        let stream = TransformStream::new_with_proto(global, proto, CanGc::from_cx(cx));
999        stream.initialize(
1000            cx,
1001            global,
1002            start_promise.clone(),
1003            writable_high_water_mark,
1004            writable_size_algorithm,
1005            readable_high_water_mark,
1006            readable_size_algorithm,
1007        )?;
1008
1009        // Perform ? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict).
1010        stream.set_up_transform_stream_default_controller_from_transformer(
1011            global,
1012            transformer_obj.handle(),
1013            &transformer_dict,
1014            CanGc::from_cx(cx),
1015        );
1016
1017        // If transformerDict["start"] exists, then resolve startPromise with the
1018        // result of invoking transformerDict["start"]
1019        // with argument list « this.[[controller]] » and callback this value transformer.
1020        if let Some(start) = &transformer_dict.start {
1021            rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
1022            rooted!(&in(cx) let mut result: JSVal);
1023            rooted!(&in(cx) let this_object = transformer_obj.get());
1024            start.Call_(
1025                cx,
1026                &this_object.handle(),
1027                &stream.get_controller(),
1028                result.handle_mut(),
1029                ExceptionHandling::Rethrow,
1030            )?;
1031            let is_promise = unsafe {
1032                if result.is_object() {
1033                    result_object.set(result.to_object());
1034                    IsPromiseObject(result_object.handle().into_handle())
1035                } else {
1036                    false
1037                }
1038            };
1039            let promise = if is_promise {
1040                Promise::new_with_js_promise(result_object.handle(), cx.into())
1041            } else {
1042                Promise::new_resolved(global, cx.into(), result.get(), CanGc::from_cx(cx))
1043            };
1044            start_promise.resolve_native(&promise, CanGc::from_cx(cx));
1045        } else {
1046            // Otherwise, resolve startPromise with undefined.
1047            start_promise.resolve_native(&(), CanGc::from_cx(cx));
1048        };
1049
1050        Ok(stream)
1051    }
1052
1053    /// <https://streams.spec.whatwg.org/#ts-readable>
1054    fn Readable(&self) -> DomRoot<ReadableStream> {
1055        // Return this.[[readable]].
1056        self.readable.get().expect("readable stream is not set")
1057    }
1058
1059    /// <https://streams.spec.whatwg.org/#ts-writable>
1060    fn Writable(&self) -> DomRoot<WritableStream> {
1061        // Return this.[[writable]].
1062        self.writable.get().expect("writable stream is not set")
1063    }
1064}
1065
1066/// <https://streams.spec.whatwg.org/#ts-transfer>
1067impl Transferable for TransformStream {
1068    type Index = MessagePortIndex;
1069    type Data = TransformStreamData;
1070
1071    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps②>
1072    fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, TransformStreamData)> {
1073        let global = self.global();
1074        let mut realm = enter_auto_realm(cx, &*global);
1075        let mut realm = realm.current_realm();
1076        let cx = &mut realm;
1077
1078        // Step 1. Let readable be value.[[readable]].
1079        let readable = self.get_readable();
1080
1081        // Step 2. Let writable be value.[[writable]].
1082        let writable = self.get_writable();
1083
1084        // Step 3. If ! IsReadableStreamLocked(readable) is true, throw a
1085        // "DataCloneError" DOMException.
1086        // Step 4. If ! IsWritableStreamLocked(writable) is true, throw a
1087        // "DataCloneError" DOMException.
1088        if readable.is_locked() || writable.is_locked() {
1089            return Err(Error::DataClone(None));
1090        }
1091
1092        // First port pair (readable → proxy writable)
1093        let port1 = MessagePort::new(&global, CanGc::from_cx(cx));
1094        global.track_message_port(&port1, None);
1095        let port1_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1096        global.track_message_port(&port1_peer, None);
1097        global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1098
1099        let proxy_readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1100        proxy_readable.setup_cross_realm_transform_readable(cx, &port1);
1101        proxy_readable
1102            .pipe_to(cx, &global, &writable, false, false, false, None)
1103            .set_promise_is_handled();
1104
1105        // Second port pair (proxy readable → writable)
1106        let port2 = MessagePort::new(&global, CanGc::from_cx(cx));
1107        global.track_message_port(&port2, None);
1108        let port2_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1109        global.track_message_port(&port2_peer, None);
1110        global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1111
1112        let proxy_writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1113        proxy_writable.setup_cross_realm_transform_writable(cx, &port2);
1114
1115        // Pipe readable into the proxy writable (→ port_1)
1116        readable
1117            .pipe_to(cx, &global, &proxy_writable, false, false, false, None)
1118            .set_promise_is_handled();
1119
1120        // Step 5. Set dataHolder.[[readable]] to !
1121        // StructuredSerializeWithTransfer(readable, « readable »).
1122        // Step 6. Set dataHolder.[[writable]] to !
1123        // StructuredSerializeWithTransfer(writable, « writable »).
1124        Ok((
1125            *port1_peer.message_port_id(),
1126            TransformStreamData {
1127                readable: port1_peer.transfer(cx)?,
1128                writable: port2_peer.transfer(cx)?,
1129            },
1130        ))
1131    }
1132
1133    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps②>
1134    fn transfer_receive(
1135        cx: &mut JSContext,
1136        owner: &GlobalScope,
1137        _id: MessagePortId,
1138        data: TransformStreamData,
1139    ) -> Result<DomRoot<Self>, ()> {
1140        let port1 = MessagePort::transfer_receive(cx, owner, data.readable.0, data.readable.1)?;
1141        let port2 = MessagePort::transfer_receive(cx, owner, data.writable.0, data.writable.1)?;
1142
1143        // Step 1. Let readableRecord be !
1144        // StructuredDeserializeWithTransfer(dataHolder.[[readable]], the
1145        // current Realm).
1146        let proxy_readable = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1147        proxy_readable.setup_cross_realm_transform_readable(cx, &port2);
1148
1149        // Step 2. Let writableRecord be !
1150        // StructuredDeserializeWithTransfer(dataHolder.[[writable]], the
1151        // current Realm).
1152        let proxy_writable = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1153        proxy_writable.setup_cross_realm_transform_writable(cx, &port1);
1154
1155        // Step 3. Set value.[[readable]] to readableRecord.[[Deserialized]].
1156        // Step 4. Set value.[[writable]] to writableRecord.[[Deserialized]].
1157        // Step 5. Set value.[[backpressure]],
1158        // value.[[backpressureChangePromise]], and value.[[controller]] to
1159        // undefined.
1160        let stream = TransformStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1161        stream.readable.set(Some(&proxy_readable));
1162        stream.writable.set(Some(&proxy_writable));
1163
1164        Ok(stream)
1165    }
1166
1167    fn serialized_storage<'a>(
1168        data: StructuredData<'a, '_>,
1169    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1170        match data {
1171            StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1172            StructuredData::Writer(w) => &mut w.transform_streams_port,
1173        }
1174    }
1175}