script/dom/stream/
writablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19    MutableHandleValue as SafeMutableHandleValue,
20};
21use rustc_hash::FxHashMap;
22use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::cell::DomRefCell;
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
27    QueuingStrategy, QueuingStrategySize,
28};
29use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
30use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
31use crate::dom::bindings::conversions::ConversionResult;
32use crate::dom::bindings::error::{Error, Fallible};
33use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
34use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
35use crate::dom::bindings::structuredclone::StructuredData;
36use crate::dom::bindings::transferable::Transferable;
37use crate::dom::domexception::{DOMErrorName, DOMException};
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::messageport::MessagePort;
40use crate::dom::promise::Promise;
41use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
42use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
43use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
44use crate::dom::stream::writablestreamdefaultcontroller::{
45    UnderlyingSinkType, WritableStreamDefaultController,
46};
47use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
48use crate::realms::{InRealm, enter_auto_realm, enter_realm};
49use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
50
// Marker impl: `WritableStream::finish_erroring` places this handler in a
// `rooted!` slot before boxing it for the promise native handler.
impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}

/// The fulfillment handler for the abort steps of
/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
///
/// Its callback resolves `abort_request_promise` with undefined and then runs
/// `reject_close_and_closed_promise_if_needed` on `stream`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct AbortAlgorithmFulfillmentHandler {
    /// The stream whose close/closed promises are rejected by the callback.
    stream: Dom<WritableStream>,
    /// The promise of the abort request that the callback resolves.
    #[conditional_malloc_size_of]
    abort_request_promise: Rc<Promise>,
}
62
63impl Callback for AbortAlgorithmFulfillmentHandler {
64    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
65        let can_gc = CanGc::from_cx(cx);
66        let cx: SafeJSContext = cx.into();
67        // Resolve abortRequest’s promise with undefined.
68        self.abort_request_promise.resolve_native(&(), can_gc);
69
70        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
71        self.stream
72            .as_rooted()
73            .reject_close_and_closed_promise_if_needed(cx, can_gc);
74    }
75}
76
// Marker impl: `WritableStream::finish_erroring` places this handler in a
// `rooted!` slot before boxing it for the promise native handler.
impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}

/// The rejection handler for the abort steps of
/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
///
/// Its callback rejects `abort_request_promise` with the rejection reason and
/// then runs `reject_close_and_closed_promise_if_needed` on `stream`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct AbortAlgorithmRejectionHandler {
    /// The stream whose close/closed promises are rejected by the callback.
    stream: Dom<WritableStream>,
    /// The promise of the abort request that the callback rejects.
    #[conditional_malloc_size_of]
    abort_request_promise: Rc<Promise>,
}
88
89impl Callback for AbortAlgorithmRejectionHandler {
90    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
91        let can_gc = CanGc::from_cx(cx);
92        let cx: SafeJSContext = cx.into();
93        // Reject abortRequest’s promise with reason.
94        self.abort_request_promise.reject_native(&reason, can_gc);
95
96        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
97        self.stream
98            .as_rooted()
99            .reject_close_and_closed_promise_if_needed(cx, can_gc);
100    }
101}
102
// Marker impl: the stream methods hold a taken `Option<PendingAbortRequest>`
// in a `rooted!` slot while settling it.
impl js::gc::Rootable for PendingAbortRequest {}

/// <https://streams.spec.whatwg.org/#pending-abort-request>
///
/// Created by `WritableStream::abort` and stored in
/// `WritableStream::pending_abort_request` until it is settled.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PendingAbortRequest {
    /// <https://streams.spec.whatwg.org/#pending-abort-request-promise>
    ///
    /// The promise handed back to the caller of `abort`.
    #[conditional_malloc_size_of]
    promise: Rc<Promise>,

    /// <https://streams.spec.whatwg.org/#pending-abort-request-reason>
    ///
    /// Passed to the controller's abort steps in `finish_erroring`.
    #[ignore_malloc_size_of = "mozjs"]
    reason: Box<Heap<JSVal>>,

    /// <https://streams.spec.whatwg.org/#pending-abort-request-was-already-erroring>
    ///
    /// When true, `finish_erroring` rejects `promise` with the stored error
    /// instead of running the abort steps.
    was_already_erroring: bool,
}
120
/// <https://streams.spec.whatwg.org/#writablestream-state>
///
/// The lifecycle state kept in `WritableStream::state`.
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
pub(crate) enum WritableStreamState {
    /// The default state of a newly constructed stream.
    #[default]
    Writable,
    /// Set by `finish_in_flight_close`.
    Closed,
    /// Set by `start_erroring`.
    Erroring,
    /// Set by `finish_erroring`.
    Errored,
}
130
/// <https://streams.spec.whatwg.org/#ws-class>
///
/// The DOM object backing a `WritableStream`. Each field mirrors the internal
/// slot of the same name in the Streams specification.
#[dom_struct]
pub struct WritableStream {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#writablestream-backpressure>
    ///
    /// Accessed through `get_backpressure` / `set_backpressure`.
    backpressure: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#writablestream-closerequest>
    ///
    /// Set by `close`, moved to `in_flight_close_request` by
    /// `mark_close_request_in_flight`.
    #[conditional_malloc_size_of]
    close_request: DomRefCell<Option<Rc<Promise>>>,

    /// <https://streams.spec.whatwg.org/#writablestream-controller>
    ///
    /// Installed through `set_default_controller`.
    controller: MutNullableDom<WritableStreamDefaultController>,

    /// <https://streams.spec.whatwg.org/#writablestream-detached>
    detached: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#writablestream-inflightwriterequest>
    ///
    /// Filled by `mark_first_write_request_in_flight`, settled by
    /// `finish_in_flight_write` / `finish_in_flight_write_with_error`.
    #[conditional_malloc_size_of]
    in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,

    /// <https://streams.spec.whatwg.org/#writablestream-inflightcloserequest>
    ///
    /// Filled by `mark_close_request_in_flight`, settled by
    /// `finish_in_flight_close` / `finish_in_flight_close_with_error`.
    #[conditional_malloc_size_of]
    in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,

    /// <https://streams.spec.whatwg.org/#writablestream-pendingabortrequest>
    ///
    /// Set by `abort`; taken again when the abort request is settled.
    pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,

    /// <https://streams.spec.whatwg.org/#writablestream-state>
    state: Cell<WritableStreamState>,

    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
    ///
    /// Written by `start_erroring`, reset to undefined by
    /// `finish_in_flight_close`.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// <https://streams.spec.whatwg.org/#writablestream-writer>
    ///
    /// Accessed through `get_writer` / `set_writer`; the stream counts as
    /// locked while this is set.
    writer: MutNullableDom<WritableStreamDefaultWriter>,

    /// <https://streams.spec.whatwg.org/#writablestream-writerequests>
    ///
    /// Promises appended by `add_write_request`, in FIFO order.
    #[conditional_malloc_size_of]
    write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
}
174
175impl WritableStream {
176    /// <https://streams.spec.whatwg.org/#initialize-writable-stream>
177    fn new_inherited() -> WritableStream {
178        WritableStream {
179            reflector_: Reflector::new(),
180            backpressure: Default::default(),
181            close_request: Default::default(),
182            controller: Default::default(),
183            detached: Default::default(),
184            in_flight_write_request: Default::default(),
185            in_flight_close_request: Default::default(),
186            pending_abort_request: Default::default(),
187            state: Default::default(),
188            stored_error: Default::default(),
189            writer: Default::default(),
190            write_requests: Default::default(),
191        }
192    }
193
194    pub(crate) fn new_with_proto(
195        global: &GlobalScope,
196        proto: Option<SafeHandleObject>,
197        can_gc: CanGc,
198    ) -> DomRoot<WritableStream> {
199        reflect_dom_object_with_proto(
200            Box::new(WritableStream::new_inherited()),
201            global,
202            proto,
203            can_gc,
204        )
205    }
206
    /// Used as part of
    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
    ///
    /// Panics if a controller has already been set on this stream.
    pub(crate) fn assert_no_controller(&self) {
        assert!(self.controller.get().is_none());
    }
212
    /// Used as part of
    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
    ///
    /// Stores `controller` as stream.[[controller]].
    pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
        self.controller.set(Some(controller));
    }
218
    /// Returns the rooted controller of the stream.
    ///
    /// Panics with "Controller should be set." when no controller is set; use
    /// `get_controller` for the non-panicking variant.
    pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
        self.controller.get().expect("Controller should be set.")
    }
222
223    pub(crate) fn is_writable(&self) -> bool {
224        matches!(self.state.get(), WritableStreamState::Writable)
225    }
226
227    pub(crate) fn is_erroring(&self) -> bool {
228        matches!(self.state.get(), WritableStreamState::Erroring)
229    }
230
231    pub(crate) fn is_errored(&self) -> bool {
232        matches!(self.state.get(), WritableStreamState::Errored)
233    }
234
235    pub(crate) fn is_closed(&self) -> bool {
236        matches!(self.state.get(), WritableStreamState::Closed)
237    }
238
    /// Whether stream.[[inFlightWriteRequest]] is currently set.
    pub(crate) fn has_in_flight_write_request(&self) -> bool {
        self.in_flight_write_request.borrow().is_some()
    }
242
243    /// <https://streams.spec.whatwg.org/#writable-stream-has-operation-marked-in-flight>
244    pub(crate) fn has_operations_marked_inflight(&self) -> bool {
245        let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
246        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
247
248        in_flight_write_requested || in_flight_close_requested
249    }
250
    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
    ///
    /// Copies the current value of stream.[[storedError]] into `handle_mut`.
    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
        handle_mut.set(self.stored_error.get());
    }
255
256    /// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
257    pub(crate) fn finish_erroring(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
258        // Assert: stream.[[state]] is "erroring".
259        assert!(self.is_erroring());
260
261        // Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false.
262        assert!(!self.has_operations_marked_inflight());
263
264        // Set stream.[[state]] to "errored".
265        self.state.set(WritableStreamState::Errored);
266
267        // Perform ! stream.[[controller]].[[ErrorSteps]]().
268        let Some(controller) = self.controller.get() else {
269            unreachable!("Stream should have a controller.");
270        };
271        controller.perform_error_steps();
272
273        // Let storedError be stream.[[storedError]].
274        rooted!(in(*cx) let mut stored_error = UndefinedValue());
275        self.get_stored_error(stored_error.handle_mut());
276
277        // For each writeRequest of stream.[[writeRequests]]:
278        let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
279        for request in write_requests {
280            // Reject writeRequest with storedError.
281            request.reject(cx, stored_error.handle(), can_gc);
282        }
283
284        // Set stream.[[writeRequests]] to an empty list.
285        // Done above with `drain`.
286
287        // If stream.[[pendingAbortRequest]] is undefined,
288        if self.pending_abort_request.borrow().is_none() {
289            // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
290            self.reject_close_and_closed_promise_if_needed(cx, can_gc);
291
292            // Return.
293            return;
294        }
295
296        // Let abortRequest be stream.[[pendingAbortRequest]].
297        // Set stream.[[pendingAbortRequest]] to undefined.
298        rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
299        if let Some(pending_abort_request) = &*pending_abort_request {
300            // If abortRequest’s was already erroring is true,
301            if pending_abort_request.was_already_erroring {
302                // Reject abortRequest’s promise with storedError.
303                pending_abort_request
304                    .promise
305                    .reject(cx, stored_error.handle(), can_gc);
306
307                // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
308                self.reject_close_and_closed_promise_if_needed(cx, can_gc);
309
310                // Return.
311                return;
312            }
313
314            // Let promise be ! stream.[[controller]].[[AbortSteps]](abortRequest’s reason).
315            rooted!(in(*cx) let mut reason = UndefinedValue());
316            reason.set(pending_abort_request.reason.get());
317            let promise = controller.abort_steps(cx, global, reason.handle(), can_gc);
318
319            // Upon fulfillment of promise,
320            rooted!(in(*cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
321                stream: Dom::from_ref(self),
322                abort_request_promise: pending_abort_request.promise.clone(),
323            }));
324
325            // Upon rejection of promise with reason r,
326            rooted!(in(*cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
327                stream: Dom::from_ref(self),
328                abort_request_promise: pending_abort_request.promise.clone(),
329            }));
330
331            let handler = PromiseNativeHandler::new(
332                global,
333                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
334                rejection_handler.take().map(|h| Box::new(h) as Box<_>),
335                can_gc,
336            );
337            let realm = enter_realm(global);
338            let comp = InRealm::Entered(&realm);
339            promise.append_native_handler(&handler, comp, can_gc);
340        }
341    }
342
343    /// <https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed>
344    fn reject_close_and_closed_promise_if_needed(&self, cx: SafeJSContext, can_gc: CanGc) {
345        // Assert: stream.[[state]] is "errored".
346        assert!(self.is_errored());
347
348        rooted!(in(*cx) let mut stored_error = UndefinedValue());
349        self.get_stored_error(stored_error.handle_mut());
350
351        // If stream.[[closeRequest]] is not undefined
352        let close_request = self.close_request.borrow_mut().take();
353        if let Some(close_request) = close_request {
354            // Assert: stream.[[inFlightCloseRequest]] is undefined.
355            assert!(self.in_flight_close_request.borrow().is_none());
356
357            // Reject stream.[[closeRequest]] with stream.[[storedError]].
358            close_request.reject_native(&stored_error.handle(), can_gc)
359
360            // Set stream.[[closeRequest]] to undefined.
361            // Done with `take` above.
362        }
363
364        // Let writer be stream.[[writer]].
365        // If writer is not undefined,
366        if let Some(writer) = self.writer.get() {
367            // Reject writer.[[closedPromise]] with stream.[[storedError]].
368            writer.reject_closed_promise_with_stored_error(&stored_error.handle(), can_gc);
369
370            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
371            writer.set_close_promise_is_handled();
372        }
373    }
374
375    /// <https://streams.spec.whatwg.org/#writable-stream-close-queued-or-in-flight>
376    pub(crate) fn close_queued_or_in_flight(&self) -> bool {
377        let close_requested = self.close_request.borrow().is_some();
378        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
379
380        close_requested || in_flight_close_requested
381    }
382
383    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write>
384    pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
385        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
386            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
387            unreachable!("Stream should have a write request");
388        };
389
390        // Resolve stream.[[inFlightWriteRequest]] with undefined.
391        in_flight_write_request.resolve_native(&(), can_gc);
392
393        // Set stream.[[inFlightWriteRequest]] to undefined.
394        // Done above with `take`.
395    }
396
397    /// <https://streams.spec.whatwg.org/#writable-stream-start-erroring>
398    pub(crate) fn start_erroring(
399        &self,
400        cx: SafeJSContext,
401        global: &GlobalScope,
402        error: SafeHandleValue,
403        can_gc: CanGc,
404    ) {
405        // Assert: stream.[[storedError]] is undefined.
406        assert!(self.stored_error.get().is_undefined());
407
408        // Assert: stream.[[state]] is "writable".
409        assert!(self.is_writable());
410
411        // Let controller be stream.[[controller]].
412        let Some(controller) = self.controller.get() else {
413            // Assert: controller is not undefined.
414            unreachable!("Stream should have a controller.");
415        };
416
417        // Set stream.[[state]] to "erroring".
418        self.state.set(WritableStreamState::Erroring);
419
420        // Set stream.[[storedError]] to reason.
421        self.stored_error.set(*error);
422
423        // Let writer be stream.[[writer]].
424        if let Some(writer) = self.writer.get() {
425            // If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected
426            writer.ensure_ready_promise_rejected(global, error, can_gc);
427        }
428
429        // If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true
430        if !self.has_operations_marked_inflight() && controller.started() {
431            // perform ! WritableStreamFinishErroring
432            self.finish_erroring(cx, global, can_gc);
433        }
434    }
435
436    /// <https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection>
437    pub(crate) fn deal_with_rejection(
438        &self,
439        cx: SafeJSContext,
440        global: &GlobalScope,
441        error: SafeHandleValue,
442        can_gc: CanGc,
443    ) {
444        // Let state be stream.[[state]].
445
446        // If state is "writable",
447        if self.is_writable() {
448            // Perform ! WritableStreamStartErroring(stream, error).
449            self.start_erroring(cx, global, error, can_gc);
450
451            // Return.
452            return;
453        }
454
455        // Assert: state is "erroring".
456        assert!(self.is_erroring());
457
458        // Perform ! WritableStreamFinishErroring(stream).
459        self.finish_erroring(cx, global, can_gc);
460    }
461
462    /// <https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight>
463    pub(crate) fn mark_first_write_request_in_flight(&self) {
464        let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
465        let mut write_requests = self.write_requests.borrow_mut();
466
467        // Assert: stream.[[inFlightWriteRequest]] is undefined.
468        assert!(in_flight_write_request.is_none());
469
470        // Assert: stream.[[writeRequests]] is not empty.
471        assert!(!write_requests.is_empty());
472
473        // Let writeRequest be stream.[[writeRequests]][0].
474        // Remove writeRequest from stream.[[writeRequests]].
475        let write_request = write_requests.pop_front().unwrap();
476
477        // Set stream.[[inFlightWriteRequest]] to writeRequest.
478        *in_flight_write_request = Some(write_request);
479    }
480
481    /// <https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight>
482    pub(crate) fn mark_close_request_in_flight(&self) {
483        let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
484        let mut close_request = self.close_request.borrow_mut();
485
486        // Assert: stream.[[inFlightCloseRequest]] is undefined.
487        assert!(in_flight_close_request.is_none());
488
489        // Assert: stream.[[closeRequest]] is not undefined.
490        assert!(close_request.is_some());
491
492        // Let closeRequest be stream.[[closeRequest]].
493        // Set stream.[[closeRequest]] to undefined.
494        let close_request = close_request.take().unwrap();
495
496        // Set stream.[[inFlightCloseRequest]] to closeRequest.
497        *in_flight_close_request = Some(close_request);
498    }
499
500    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close>
501    pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
502        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
503            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
504            unreachable!("in_flight_close_request must be Some");
505        };
506
507        // Resolve stream.[[inFlightCloseRequest]] with undefined.
508        in_flight_close_request.resolve_native(&(), can_gc);
509
510        // Set stream.[[inFlightCloseRequest]] to undefined.
511        // Done with take above.
512
513        // Assert: stream.[[state]] is "writable" or "erroring".
514        assert!(self.is_writable() || self.is_erroring());
515
516        // If state is "erroring",
517        if self.is_erroring() {
518            // Set stream.[[storedError]] to undefined.
519            self.stored_error.set(UndefinedValue());
520
521            // If stream.[[pendingAbortRequest]] is not undefined,
522            rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
523            if let Some(pending_abort_request) = &*pending_abort_request {
524                // Resolve stream.[[pendingAbortRequest]]'s promise with undefined.
525                pending_abort_request.promise.resolve_native(&(), can_gc);
526
527                // Set stream.[[pendingAbortRequest]] to undefined.
528                // Done above with `take`.
529            }
530        }
531
532        // Set stream.[[state]] to "closed".
533        self.state.set(WritableStreamState::Closed);
534
535        // Let writer be stream.[[writer]].
536        if let Some(writer) = self.writer.get() {
537            // If writer is not undefined,
538            // resolve writer.[[closedPromise]] with undefined.
539            writer.resolve_closed_promise_with_undefined(can_gc);
540        }
541
542        // Assert: stream.[[pendingAbortRequest]] is undefined.
543        assert!(self.pending_abort_request.borrow().is_none());
544
545        // Assert: stream.[[storedError]] is undefined.
546        assert!(self.stored_error.get().is_undefined());
547    }
548
549    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error>
550    pub(crate) fn finish_in_flight_close_with_error(
551        &self,
552        cx: SafeJSContext,
553        global: &GlobalScope,
554        error: SafeHandleValue,
555        can_gc: CanGc,
556    ) {
557        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
558            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
559            unreachable!("Inflight close request must be defined.");
560        };
561
562        // Reject stream.[[inFlightCloseRequest]] with error.
563        in_flight_close_request.reject_native(&error, can_gc);
564
565        // Set stream.[[inFlightCloseRequest]] to undefined.
566        // Done above with `take`.
567
568        // Assert: stream.[[state]] is "writable" or "erroring".
569        assert!(self.is_erroring() || self.is_writable());
570
571        // If stream.[[pendingAbortRequest]] is not undefined,
572        rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
573        if let Some(pending_abort_request) = &*pending_abort_request {
574            // Reject stream.[[pendingAbortRequest]]'s promise with error.
575            pending_abort_request.promise.reject_native(&error, can_gc);
576
577            // Set stream.[[pendingAbortRequest]] to undefined.
578            // Done above with `take`.
579        }
580
581        // Perform ! WritableStreamDealWithRejection(stream, error).
582        self.deal_with_rejection(cx, global, error, can_gc);
583    }
584
585    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error>
586    pub(crate) fn finish_in_flight_write_with_error(
587        &self,
588        cx: SafeJSContext,
589        global: &GlobalScope,
590        error: SafeHandleValue,
591        can_gc: CanGc,
592    ) {
593        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
594            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
595            unreachable!("Inflight write request must be defined.");
596        };
597
598        // Reject stream.[[inFlightWriteRequest]] with error.
599        in_flight_write_request.reject_native(&error, can_gc);
600
601        // Set stream.[[inFlightWriteRequest]] to undefined.
602        // Done above with `take`.
603
604        // Assert: stream.[[state]] is "writable" or "erroring".
605        assert!(self.is_erroring() || self.is_writable());
606
607        // Perform ! WritableStreamDealWithRejection(stream, error).
608        self.deal_with_rejection(cx, global, error, can_gc);
609    }
610
    /// Returns the rooted stream.[[writer]], or `None` when no writer is set.
    pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
        self.writer.get()
    }
614
    /// Sets stream.[[writer]] to `writer`; passing `None` clears it.
    pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
        self.writer.set(writer);
    }
618
    /// Sets stream.[[backpressure]] to `backpressure`.
    pub(crate) fn set_backpressure(&self, backpressure: bool) {
        self.backpressure.set(backpressure);
    }
622
    /// Returns the current value of stream.[[backpressure]].
    pub(crate) fn get_backpressure(&self) -> bool {
        self.backpressure.get()
    }
626
627    /// <https://streams.spec.whatwg.org/#is-writable-stream-locked>
628    pub(crate) fn is_locked(&self) -> bool {
629        // If stream.[[writer]] is undefined, return false.
630        // Return true.
631        self.get_writer().is_some()
632    }
633
634    /// <https://streams.spec.whatwg.org/#writable-stream-add-write-request>
635    pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
636        // Assert: ! IsWritableStreamLocked(stream) is true.
637        assert!(self.is_locked());
638
639        // Assert: stream.[[state]] is "writable".
640        assert!(self.is_writable());
641
642        // Let promise be a new promise.
643        let promise = Promise::new(global, can_gc);
644
645        // Append promise to stream.[[writeRequests]].
646        self.write_requests.borrow_mut().push_back(promise.clone());
647
648        // Return promise.
649        promise
650    }
651
    /// Returns the rooted controller of the stream, if any.
    ///
    /// Unlike `get_default_controller`, this does not panic when no
    /// controller has been set.
    pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
        self.controller.get()
    }
656
657    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
658    pub(crate) fn abort(
659        &self,
660        cx: &mut CurrentRealm,
661        global: &GlobalScope,
662        provided_reason: SafeHandleValue,
663    ) -> Rc<Promise> {
664        // If stream.[[state]] is "closed" or "errored",
665        if self.is_closed() || self.is_errored() {
666            // return a promise resolved with undefined.
667            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
668        }
669
670        // Signal abort on stream.[[controller]].[[abortController]] with reason.
671        self.get_controller()
672            .expect("Stream must have a controller.")
673            .signal_abort(cx, provided_reason);
674
675        // Let state be stream.[[state]].
676        let state = self.state.get();
677
678        // If state is "closed" or "errored", return a promise resolved with undefined.
679        if matches!(
680            state,
681            WritableStreamState::Closed | WritableStreamState::Errored
682        ) {
683            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
684        }
685
686        // If stream.[[pendingAbortRequest]] is not undefined,
687        if self.pending_abort_request.borrow().is_some() {
688            // return stream.[[pendingAbortRequest]]'s promise.
689            return self
690                .pending_abort_request
691                .borrow()
692                .as_ref()
693                .expect("Pending abort request must be Some.")
694                .promise
695                .clone();
696        }
697
698        // Assert: state is "writable" or "erroring".
699        assert!(self.is_writable() || self.is_erroring());
700
701        // Let wasAlreadyErroring be false.
702        let mut was_already_erroring = false;
703        rooted!(&in(cx) let undefined_reason = UndefinedValue());
704
705        // If state is "erroring",
706        let reason = if self.is_erroring() {
707            // Set wasAlreadyErroring to true.
708            was_already_erroring = true;
709
710            // Set reason to undefined.
711            undefined_reason.handle()
712        } else {
713            // Use the provided reason.
714            provided_reason
715        };
716
717        // Let promise be a new promise.
718        let promise = Promise::new2(cx, global);
719
720        // Set stream.[[pendingAbortRequest]] to a new pending abort request
721        // whose promise is promise,
722        // reason is reason,
723        // and was already erroring is wasAlreadyErroring.
724        *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
725            promise: promise.clone(),
726            reason: Heap::boxed(reason.get()),
727            was_already_erroring,
728        });
729
730        // If wasAlreadyErroring is false,
731        if !was_already_erroring {
732            // perform ! WritableStreamStartErroring(stream, reason)
733            self.start_erroring(cx.into(), global, reason, CanGc::from_cx(cx));
734        }
735
736        // Return promise.
737        promise
738    }
739
740    /// <https://streams.spec.whatwg.org/#writable-stream-close>
741    pub(crate) fn close(
742        &self,
743        cx: SafeJSContext,
744        global: &GlobalScope,
745        can_gc: CanGc,
746    ) -> Rc<Promise> {
747        // Let state be stream.[[state]].
748        // If state is "closed" or "errored",
749        if self.is_closed() || self.is_errored() {
750            // return a promise rejected with a TypeError exception.
751            let promise = Promise::new(global, can_gc);
752            promise.reject_error(
753                Error::Type(c"Stream is closed or errored.".to_owned()),
754                can_gc,
755            );
756            return promise;
757        }
758
759        // Assert: state is "writable" or "erroring".
760        assert!(self.is_writable() || self.is_erroring());
761
762        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
763        assert!(!self.close_queued_or_in_flight());
764
765        // Let promise be a new promise.
766        let promise = Promise::new(global, can_gc);
767
768        // Set stream.[[closeRequest]] to promise.
769        *self.close_request.borrow_mut() = Some(promise.clone());
770
771        // Let writer be stream.[[writer]].
772        // If writer is not undefined,
773        if let Some(writer) = self.writer.get() {
774            // and stream.[[backpressure]] is true,
775            // and state is "writable",
776            if self.get_backpressure() && self.is_writable() {
777                // resolve writer.[[readyPromise]] with undefined.
778                writer.resolve_ready_promise_with_undefined(can_gc);
779            }
780        }
781
782        // Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]).
783        let Some(controller) = self.controller.get() else {
784            unreachable!("Stream must have a controller.");
785        };
786        controller.close(cx, global, can_gc);
787
788        // Return promise.
789        promise
790    }
791
792    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size>
793    /// Note: implement as a stream method, as opposed to a writer one, for convenience.
794    pub(crate) fn get_desired_size(&self) -> Option<f64> {
795        // Let stream be writer.[[stream]].
796        // Stream is `self`.
797
798        // Let state be stream.[[state]].
799        // If state is "errored" or "erroring", return null.
800        if self.is_errored() || self.is_erroring() {
801            return None;
802        }
803
804        // If state is "closed", return 0.
805        if self.is_closed() {
806            return Some(0.);
807        }
808
809        let Some(controller) = self.controller.get() else {
810            unreachable!("Stream must have a controller.");
811        };
812        Some(controller.get_desired_size())
813    }
814
815    /// <https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer>
816    pub(crate) fn aquire_default_writer(
817        &self,
818        cx: SafeJSContext,
819        global: &GlobalScope,
820        can_gc: CanGc,
821    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
822        // Let writer be a new WritableStreamDefaultWriter object.
823        let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
824
825        // Perform ? SetUpWritableStreamDefaultWriter(writer, stream).
826        writer.setup(cx, self, can_gc)?;
827
828        // Return writer.
829        Ok(writer)
830    }
831
832    /// <https://streams.spec.whatwg.org/#writable-stream-update-backpressure>
833    pub(crate) fn update_backpressure(
834        &self,
835        backpressure: bool,
836        global: &GlobalScope,
837        can_gc: CanGc,
838    ) {
839        // Assert: stream.[[state]] is "writable".
840        self.is_writable();
841
842        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
843        assert!(!self.close_queued_or_in_flight());
844
845        // Let writer be stream.[[writer]].
846        let writer = self.get_writer();
847
848        if let Some(writer) = writer {
849            // If writer is not undefined
850            if backpressure != self.get_backpressure() {
851                // and backpressure is not stream.[[backpressure]],
852                if backpressure {
853                    // If backpressure is true, set writer.[[readyPromise]] to a new promise.
854                    let promise = Promise::new(global, can_gc);
855                    writer.set_ready_promise(promise);
856                } else {
857                    // Otherwise,
858                    // Assert: backpressure is false.
859                    assert!(!backpressure);
860                    // Resolve writer.[[readyPromise]] with undefined.
861                    writer.resolve_ready_promise_with_undefined(can_gc);
862                }
863            }
864        }
865
866        // Set stream.[[backpressure]] to backpressure.
867        self.set_backpressure(backpressure);
868    }
869
870    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
871    pub(crate) fn setup_cross_realm_transform_writable(
872        &self,
873        cx: SafeJSContext,
874        port: &MessagePort,
875        can_gc: CanGc,
876    ) {
877        let port_id = port.message_port_id();
878        let global = self.global();
879
880        // Perform ! InitializeWritableStream(stream).
881        // Done in `new_inherited`.
882
883        // Let sizeAlgorithm be an algorithm that returns 1.
884        // Re-ordered because of the need to pass it to `new`.
885        let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
886
887        // Note: other algorithms defined in the controller at call site.
888
889        // Let backpressurePromise be a new promise.
890        let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(&global, can_gc))));
891
892        // Let controller be a new WritableStreamDefaultController.
893        let controller = WritableStreamDefaultController::new(
894            &global,
895            UnderlyingSinkType::Transfer {
896                backpressure_promise: backpressure_promise.clone(),
897                port: Dom::from_ref(port),
898            },
899            1.0,
900            size_algorithm,
901            can_gc,
902        );
903
904        // Add a handler for port’s message event with the following steps:
905        // Add a handler for port’s messageerror event with the following steps:
906        rooted!(in(*cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
907            controller: Dom::from_ref(&controller),
908            backpressure_promise: backpressure_promise.clone(),
909        });
910        global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
911
912        // Enable port’s port message queue.
913        port.Start(can_gc);
914
915        // Perform ! SetUpWritableStreamDefaultController
916        controller
917            .setup(cx, &global, self, can_gc)
918            .expect("Setup for transfer cannot fail");
919    }
920    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
921    #[allow(clippy::too_many_arguments)]
922    pub(crate) fn setup_from_underlying_sink(
923        &self,
924        cx: SafeJSContext,
925        global: &GlobalScope,
926        stream: &WritableStream,
927        underlying_sink_obj: SafeHandleObject,
928        underlying_sink: &UnderlyingSink,
929        strategy_hwm: f64,
930        strategy_size: Rc<QueuingStrategySize>,
931        can_gc: CanGc,
932    ) -> Result<(), Error> {
933        // Let controller be a new WritableStreamDefaultController.
934
935        // Let startAlgorithm be an algorithm that returns undefined.
936
937        // Let writeAlgorithm be an algorithm that returns a promise resolved with undefined.
938
939        // Let closeAlgorithm be an algorithm that returns a promise resolved with undefined.
940
941        // Let abortAlgorithm be an algorithm that returns a promise resolved with undefined.
942
943        // If underlyingSinkDict["start"] exists, then set startAlgorithm to an algorithm which
944        // returns the result of invoking underlyingSinkDict["start"] with argument
945        // list « controller », exception behavior "rethrow", and callback this value underlyingSink.
946
947        // If underlyingSinkDict["write"] exists, then set writeAlgorithm to an algorithm which
948        // takes an argument chunk and returns the result of invoking underlyingSinkDict["write"]
949        // with argument list « chunk, controller » and callback this value underlyingSink.
950
951        // If underlyingSinkDict["close"] exists, then set closeAlgorithm to an algorithm which
952        // returns the result of invoking underlyingSinkDict["close"] with argument
953        // list «» and callback this value underlyingSink.
954
955        // If underlyingSinkDict["abort"] exists, then set abortAlgorithm to an algorithm which
956        // takes an argument reason and returns the result of invoking underlyingSinkDict["abort"]
957        // with argument list « reason » and callback this value underlyingSink.
958        let controller = WritableStreamDefaultController::new(
959            global,
960            UnderlyingSinkType::new_js(
961                underlying_sink.abort.clone(),
962                underlying_sink.start.clone(),
963                underlying_sink.close.clone(),
964                underlying_sink.write.clone(),
965            ),
966            strategy_hwm,
967            strategy_size,
968            can_gc,
969        );
970
971        // Note: this must be done before `setup`,
972        // otherwise `thisOb` is null in the start callback.
973        controller.set_underlying_sink_this_object(underlying_sink_obj);
974
975        // Perform ? SetUpWritableStreamDefaultController
976        controller.setup(cx, global, stream, can_gc)
977    }
978}
979
980/// <https://streams.spec.whatwg.org/#create-writable-stream>
981#[cfg_attr(crown, expect(crown::unrooted_must_root))]
982pub(crate) fn create_writable_stream(
983    cx: SafeJSContext,
984    global: &GlobalScope,
985    writable_high_water_mark: f64,
986    writable_size_algorithm: Rc<QueuingStrategySize>,
987    underlying_sink_type: UnderlyingSinkType,
988    can_gc: CanGc,
989) -> Fallible<DomRoot<WritableStream>> {
990    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
991    assert!(writable_high_water_mark >= 0.0);
992
993    // Let stream be a new WritableStream.
994    // Perform ! InitializeWritableStream(stream).
995    let stream = WritableStream::new_with_proto(global, None, can_gc);
996
997    // Let controller be a new WritableStreamDefaultController.
998    let controller = WritableStreamDefaultController::new(
999        global,
1000        underlying_sink_type,
1001        writable_high_water_mark,
1002        writable_size_algorithm,
1003        can_gc,
1004    );
1005
1006    // Perform ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm,
1007    // closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm).
1008    controller.setup(cx, global, &stream, can_gc)?;
1009
1010    // Return stream.
1011    Ok(stream)
1012}
1013
1014impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1015    /// <https://streams.spec.whatwg.org/#ws-constructor>
1016    fn Constructor(
1017        cx: SafeJSContext,
1018        global: &GlobalScope,
1019        proto: Option<SafeHandleObject>,
1020        can_gc: CanGc,
1021        underlying_sink: Option<*mut JSObject>,
1022        strategy: &QueuingStrategy,
1023    ) -> Fallible<DomRoot<WritableStream>> {
1024        // If underlyingSink is missing, set it to null.
1025        rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1026
1027        // Let underlyingSinkDict be underlyingSink,
1028        // converted to an IDL value of type UnderlyingSink.
1029        let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1030            rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1031            match UnderlyingSink::new(cx, obj_val.handle(), can_gc) {
1032                Ok(ConversionResult::Success(val)) => val,
1033                Ok(ConversionResult::Failure(error)) => {
1034                    return Err(Error::Type(error.into_owned()));
1035                },
1036                _ => {
1037                    return Err(Error::JSFailed);
1038                },
1039            }
1040        } else {
1041            UnderlyingSink::empty()
1042        };
1043
1044        if !underlying_sink_dict.type_.handle().is_undefined() {
1045            // If underlyingSinkDict["type"] exists, throw a RangeError exception.
1046            return Err(Error::Range(c"type is set".to_owned()));
1047        }
1048
1049        // Perform ! InitializeWritableStream(this).
1050        let stream = WritableStream::new_with_proto(global, proto, can_gc);
1051
1052        // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
1053        let size_algorithm = extract_size_algorithm(strategy, can_gc);
1054
1055        // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
1056        let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1057
1058        // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink,
1059        // underlyingSinkDict, highWaterMark, sizeAlgorithm).
1060        stream.setup_from_underlying_sink(
1061            cx,
1062            global,
1063            &stream,
1064            underlying_sink_obj.handle(),
1065            &underlying_sink_dict,
1066            high_water_mark,
1067            size_algorithm,
1068            can_gc,
1069        )?;
1070
1071        Ok(stream)
1072    }
1073
1074    /// <https://streams.spec.whatwg.org/#ws-locked>
1075    fn Locked(&self) -> bool {
1076        // Return ! IsWritableStreamLocked(this).
1077        self.is_locked()
1078    }
1079
1080    /// <https://streams.spec.whatwg.org/#ws-abort>
1081    fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
1082        let global = GlobalScope::from_current_realm(cx);
1083
1084        // If ! IsWritableStreamLocked(this) is true,
1085        if self.is_locked() {
1086            // return a promise rejected with a TypeError exception.
1087            let promise = Promise::new2(cx, &global);
1088            promise.reject_error(
1089                Error::Type(c"Stream is locked.".to_owned()),
1090                CanGc::from_cx(cx),
1091            );
1092            return promise;
1093        }
1094
1095        // Return ! WritableStreamAbort(this, reason).
1096        self.abort(cx, &global, reason)
1097    }
1098
1099    /// <https://streams.spec.whatwg.org/#ws-close>
1100    fn Close(&self, realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
1101        let cx = GlobalScope::get_cx();
1102        let global = GlobalScope::from_safe_context(cx, realm);
1103
1104        // If ! IsWritableStreamLocked(this) is true,
1105        if self.is_locked() {
1106            // return a promise rejected with a TypeError exception.
1107            let promise = Promise::new(&global, can_gc);
1108            promise.reject_error(Error::Type(c"Stream is locked.".to_owned()), can_gc);
1109            return promise;
1110        }
1111
1112        // If ! WritableStreamCloseQueuedOrInFlight(this) is true
1113        if self.close_queued_or_in_flight() {
1114            // return a promise rejected with a TypeError exception.
1115            let promise = Promise::new(&global, can_gc);
1116            promise.reject_error(
1117                Error::Type(c"Stream has closed queued or in-flight".to_owned()),
1118                can_gc,
1119            );
1120            return promise;
1121        }
1122
1123        // Return ! WritableStreamClose(this).
1124        self.close(cx, &global, can_gc)
1125    }
1126
1127    /// <https://streams.spec.whatwg.org/#ws-get-writer>
1128    fn GetWriter(
1129        &self,
1130        realm: InRealm,
1131        can_gc: CanGc,
1132    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1133        let cx = GlobalScope::get_cx();
1134        let global = GlobalScope::from_safe_context(cx, realm);
1135
1136        // Return ? AcquireWritableStreamDefaultWriter(this).
1137        self.aquire_default_writer(cx, &global, can_gc)
1138    }
1139}
1140
1141impl js::gc::Rootable for CrossRealmTransformWritable {}
1142
1143/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1144/// A wrapper to handle `message` and `messageerror` events
1145/// for the port used by the transfered stream.
1146#[derive(Clone, JSTraceable, MallocSizeOf)]
1147#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1148pub(crate) struct CrossRealmTransformWritable {
1149    /// The controller used in the algorithm.
1150    controller: Dom<WritableStreamDefaultController>,
1151
1152    /// The `backpressurePromise` used in the algorithm.
1153    #[ignore_malloc_size_of = "nested Rc"]
1154    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1155}
1156
1157impl CrossRealmTransformWritable {
1158    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1159    /// Add a handler for port’s message event with the following steps:
1160    #[expect(unsafe_code)]
1161    pub(crate) fn handle_message(
1162        &self,
1163        cx: SafeJSContext,
1164        global: &GlobalScope,
1165        message: SafeHandleValue,
1166        _realm: InRealm,
1167        can_gc: CanGc,
1168    ) {
1169        rooted!(in(*cx) let mut value = UndefinedValue());
1170        let type_string =
1171            unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
1172
1173        // If type is "pull",
1174        // Done below as the steps are the same for both types.
1175
1176        // Otherwise, if type is "error",
1177        if type_string == "error" {
1178            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, value).
1179            self.controller
1180                .error_if_needed(cx, value.handle(), global, can_gc);
1181        }
1182
1183        let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1184
1185        // Note: the below steps are for both "pull" and "error" types.
1186        // If backpressurePromise is not undefined,
1187        if let Some(promise) = backpressure_promise {
1188            // Resolve backpressurePromise with undefined.
1189            promise.resolve_native(&(), can_gc);
1190
1191            // Set backpressurePromise to undefined.
1192            // Done above with `take`.
1193        }
1194    }
1195
1196    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1197    /// Add a handler for port’s messageerror event with the following steps:
1198    pub(crate) fn handle_error(
1199        &self,
1200        cx: SafeJSContext,
1201        global: &GlobalScope,
1202        port: &MessagePort,
1203        _realm: InRealm,
1204        can_gc: CanGc,
1205    ) {
1206        // Let error be a new "DataCloneError" DOMException.
1207        let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
1208        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1209        error.safe_to_jsval(cx, rooted_error.handle_mut(), can_gc);
1210
1211        // Perform ! CrossRealmTransformSendError(port, error).
1212        port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
1213
1214        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, error).
1215        self.controller
1216            .error_if_needed(cx, rooted_error.handle(), global, can_gc);
1217
1218        // Disentangle port.
1219        global.disentangle_port(port, can_gc);
1220    }
1221}
1222
1223/// <https://streams.spec.whatwg.org/#ws-transfer>
1224impl Transferable for WritableStream {
1225    type Index = MessagePortIndex;
1226    type Data = MessagePortImpl;
1227
1228    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps①>
1229    fn transfer(
1230        &self,
1231        cx: &mut js::context::JSContext,
1232    ) -> Fallible<(MessagePortId, MessagePortImpl)> {
1233        // Step 1. If ! IsWritableStreamLocked(value) is true, throw a
1234        // "DataCloneError" DOMException.
1235        if self.is_locked() {
1236            return Err(Error::DataClone(None));
1237        }
1238
1239        let global = self.global();
1240        let mut realm = enter_auto_realm(cx, &*global);
1241        let mut realm = realm.current_realm();
1242        let cx = &mut realm;
1243
1244        // Step 2. Let port1 be a new MessagePort in the current Realm.
1245        let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
1246        global.track_message_port(&port_1, None);
1247
1248        // Step 3. Let port2 be a new MessagePort in the current Realm.
1249        let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
1250        global.track_message_port(&port_2, None);
1251
1252        // Step 4. Entangle port1 and port2.
1253        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1254
1255        // Step 5. Let readable be a new ReadableStream in the current Realm.
1256        let readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1257
1258        // Step 6. Perform ! SetUpCrossRealmTransformReadable(readable, port1).
1259        readable.setup_cross_realm_transform_readable(cx.into(), &port_1, CanGc::from_cx(cx));
1260
1261        // Step 7. Let promise be ! ReadableStreamPipeTo(readable, value, false, false, false).
1262        let promise = readable.pipe_to(cx, &global, self, false, false, false, None);
1263
1264        // Step 8. Set promise.[[PromiseIsHandled]] to true.
1265        promise.set_promise_is_handled();
1266
1267        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
1268        port_2.transfer(cx)
1269    }
1270
1271    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps①>
1272    fn transfer_receive(
1273        cx: &mut js::context::JSContext,
1274        owner: &GlobalScope,
1275        id: MessagePortId,
1276        port_impl: MessagePortImpl,
1277    ) -> Result<DomRoot<Self>, ()> {
1278        // Their transfer-receiving steps, given dataHolder and value, are:
1279        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
1280        let value = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1281
1282        // Step 1. Let deserializedRecord be !
1283        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
1284        // Realm).
1285        // Done with the `Deserialize` derive of `MessagePortImpl`.
1286
1287        // Step 2. Let port be deserializedRecord.[[Deserialized]].
1288        let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
1289
1290        // Step 3. Perform ! SetUpCrossRealmTransformWritable(value, port).
1291        value.setup_cross_realm_transform_writable(
1292            cx.into(),
1293            &transferred_port,
1294            CanGc::from_cx(cx),
1295        );
1296        Ok(value)
1297    }
1298
1299    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
1300    fn serialized_storage<'a>(
1301        data: StructuredData<'a, '_>,
1302    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1303        match data {
1304            StructuredData::Reader(r) => &mut r.port_impls,
1305            StructuredData::Writer(w) => &mut w.ports,
1306        }
1307    }
1308}