script/dom/stream/
writablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19    MutableHandleValue as SafeMutableHandleValue,
20};
21use rustc_hash::FxHashMap;
22use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::cell::DomRefCell;
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
27    QueuingStrategy, QueuingStrategySize,
28};
29use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
30use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
31use crate::dom::bindings::conversions::ConversionResult;
32use crate::dom::bindings::error::{Error, Fallible};
33use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
34use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
35use crate::dom::bindings::structuredclone::StructuredData;
36use crate::dom::bindings::transferable::Transferable;
37use crate::dom::domexception::{DOMErrorName, DOMException};
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::messageport::MessagePort;
40use crate::dom::promise::Promise;
41use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
42use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
43use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
44use crate::dom::stream::writablestreamdefaultcontroller::{
45    UnderlyingSinkType, WritableStreamDefaultController,
46};
47use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
48use crate::realms::{InRealm, enter_realm};
49use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
50
51impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
52
53/// The fulfillment handler for the abort steps of
54/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
55#[derive(JSTraceable, MallocSizeOf)]
56#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
57struct AbortAlgorithmFulfillmentHandler {
58    stream: Dom<WritableStream>,
59    #[conditional_malloc_size_of]
60    abort_request_promise: Rc<Promise>,
61}
62
63impl Callback for AbortAlgorithmFulfillmentHandler {
64    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
65        let can_gc = CanGc::from_cx(cx);
66        let cx: SafeJSContext = cx.into();
67        // Resolve abortRequest’s promise with undefined.
68        self.abort_request_promise.resolve_native(&(), can_gc);
69
70        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
71        self.stream
72            .as_rooted()
73            .reject_close_and_closed_promise_if_needed(cx, can_gc);
74    }
75}
76
77impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
78
79/// The rejection handler for the abort steps of
80/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
81#[derive(JSTraceable, MallocSizeOf)]
82#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
83struct AbortAlgorithmRejectionHandler {
84    stream: Dom<WritableStream>,
85    #[conditional_malloc_size_of]
86    abort_request_promise: Rc<Promise>,
87}
88
89impl Callback for AbortAlgorithmRejectionHandler {
90    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
91        let can_gc = CanGc::from_cx(cx);
92        let cx: SafeJSContext = cx.into();
93        // Reject abortRequest’s promise with reason.
94        self.abort_request_promise.reject_native(&reason, can_gc);
95
96        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
97        self.stream
98            .as_rooted()
99            .reject_close_and_closed_promise_if_needed(cx, can_gc);
100    }
101}
102
103impl js::gc::Rootable for PendingAbortRequest {}
104
105/// <https://streams.spec.whatwg.org/#pending-abort-request>
106#[derive(JSTraceable, MallocSizeOf)]
107#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
108struct PendingAbortRequest {
109    /// <https://streams.spec.whatwg.org/#pending-abort-request-promise>
110    #[conditional_malloc_size_of]
111    promise: Rc<Promise>,
112
113    /// <https://streams.spec.whatwg.org/#pending-abort-request-reason>
114    #[ignore_malloc_size_of = "mozjs"]
115    reason: Box<Heap<JSVal>>,
116
117    /// <https://streams.spec.whatwg.org/#pending-abort-request-was-already-erroring>
118    was_already_erroring: bool,
119}
120
121/// <https://streams.spec.whatwg.org/#writablestream-state>
122#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
123pub(crate) enum WritableStreamState {
124    #[default]
125    Writable,
126    Closed,
127    Erroring,
128    Errored,
129}
130
131/// <https://streams.spec.whatwg.org/#ws-class>
132#[dom_struct]
133pub struct WritableStream {
134    reflector_: Reflector,
135
136    /// <https://streams.spec.whatwg.org/#writablestream-backpressure>
137    backpressure: Cell<bool>,
138
139    /// <https://streams.spec.whatwg.org/#writablestream-closerequest>
140    #[conditional_malloc_size_of]
141    close_request: DomRefCell<Option<Rc<Promise>>>,
142
143    /// <https://streams.spec.whatwg.org/#writablestream-controller>
144    controller: MutNullableDom<WritableStreamDefaultController>,
145
146    /// <https://streams.spec.whatwg.org/#writablestream-detached>
147    detached: Cell<bool>,
148
149    /// <https://streams.spec.whatwg.org/#writablestream-inflightwriterequest>
150    #[conditional_malloc_size_of]
151    in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
152
153    /// <https://streams.spec.whatwg.org/#writablestream-inflightcloserequest>
154    #[conditional_malloc_size_of]
155    in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
156
157    /// <https://streams.spec.whatwg.org/#writablestream-pendingabortrequest>
158    pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
159
160    /// <https://streams.spec.whatwg.org/#writablestream-state>
161    state: Cell<WritableStreamState>,
162
163    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
164    #[ignore_malloc_size_of = "mozjs"]
165    stored_error: Heap<JSVal>,
166
167    /// <https://streams.spec.whatwg.org/#writablestream-writer>
168    writer: MutNullableDom<WritableStreamDefaultWriter>,
169
170    /// <https://streams.spec.whatwg.org/#writablestream-writerequests>
171    #[conditional_malloc_size_of]
172    write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
173}
174
175impl WritableStream {
176    /// <https://streams.spec.whatwg.org/#initialize-writable-stream>
177    fn new_inherited() -> WritableStream {
178        WritableStream {
179            reflector_: Reflector::new(),
180            backpressure: Default::default(),
181            close_request: Default::default(),
182            controller: Default::default(),
183            detached: Default::default(),
184            in_flight_write_request: Default::default(),
185            in_flight_close_request: Default::default(),
186            pending_abort_request: Default::default(),
187            state: Default::default(),
188            stored_error: Default::default(),
189            writer: Default::default(),
190            write_requests: Default::default(),
191        }
192    }
193
194    pub(crate) fn new_with_proto(
195        global: &GlobalScope,
196        proto: Option<SafeHandleObject>,
197        can_gc: CanGc,
198    ) -> DomRoot<WritableStream> {
199        reflect_dom_object_with_proto(
200            Box::new(WritableStream::new_inherited()),
201            global,
202            proto,
203            can_gc,
204        )
205    }
206
207    /// Used as part of
208    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
209    pub(crate) fn assert_no_controller(&self) {
210        assert!(self.controller.get().is_none());
211    }
212
213    /// Used as part of
214    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
215    pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
216        self.controller.set(Some(controller));
217    }
218
219    pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
220        self.controller.get().expect("Controller should be set.")
221    }
222
223    pub(crate) fn is_writable(&self) -> bool {
224        matches!(self.state.get(), WritableStreamState::Writable)
225    }
226
227    pub(crate) fn is_erroring(&self) -> bool {
228        matches!(self.state.get(), WritableStreamState::Erroring)
229    }
230
231    pub(crate) fn is_errored(&self) -> bool {
232        matches!(self.state.get(), WritableStreamState::Errored)
233    }
234
235    pub(crate) fn is_closed(&self) -> bool {
236        matches!(self.state.get(), WritableStreamState::Closed)
237    }
238
239    pub(crate) fn has_in_flight_write_request(&self) -> bool {
240        self.in_flight_write_request.borrow().is_some()
241    }
242
243    /// <https://streams.spec.whatwg.org/#writable-stream-has-operation-marked-in-flight>
244    pub(crate) fn has_operations_marked_inflight(&self) -> bool {
245        let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
246        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
247
248        in_flight_write_requested || in_flight_close_requested
249    }
250
251    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
252    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
253        handle_mut.set(self.stored_error.get());
254    }
255
256    /// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
257    pub(crate) fn finish_erroring(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
258        // Assert: stream.[[state]] is "erroring".
259        assert!(self.is_erroring());
260
261        // Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false.
262        assert!(!self.has_operations_marked_inflight());
263
264        // Set stream.[[state]] to "errored".
265        self.state.set(WritableStreamState::Errored);
266
267        // Perform ! stream.[[controller]].[[ErrorSteps]]().
268        let Some(controller) = self.controller.get() else {
269            unreachable!("Stream should have a controller.");
270        };
271        controller.perform_error_steps();
272
273        // Let storedError be stream.[[storedError]].
274        rooted!(in(*cx) let mut stored_error = UndefinedValue());
275        self.get_stored_error(stored_error.handle_mut());
276
277        // For each writeRequest of stream.[[writeRequests]]:
278        let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
279        for request in write_requests {
280            // Reject writeRequest with storedError.
281            request.reject(cx, stored_error.handle(), can_gc);
282        }
283
284        // Set stream.[[writeRequests]] to an empty list.
285        // Done above with `drain`.
286
287        // If stream.[[pendingAbortRequest]] is undefined,
288        if self.pending_abort_request.borrow().is_none() {
289            // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
290            self.reject_close_and_closed_promise_if_needed(cx, can_gc);
291
292            // Return.
293            return;
294        }
295
296        // Let abortRequest be stream.[[pendingAbortRequest]].
297        // Set stream.[[pendingAbortRequest]] to undefined.
298        rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
299        if let Some(pending_abort_request) = &*pending_abort_request {
300            // If abortRequest’s was already erroring is true,
301            if pending_abort_request.was_already_erroring {
302                // Reject abortRequest’s promise with storedError.
303                pending_abort_request
304                    .promise
305                    .reject(cx, stored_error.handle(), can_gc);
306
307                // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
308                self.reject_close_and_closed_promise_if_needed(cx, can_gc);
309
310                // Return.
311                return;
312            }
313
314            // Let promise be ! stream.[[controller]].[[AbortSteps]](abortRequest’s reason).
315            rooted!(in(*cx) let mut reason = UndefinedValue());
316            reason.set(pending_abort_request.reason.get());
317            let promise = controller.abort_steps(cx, global, reason.handle(), can_gc);
318
319            // Upon fulfillment of promise,
320            rooted!(in(*cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
321                stream: Dom::from_ref(self),
322                abort_request_promise: pending_abort_request.promise.clone(),
323            }));
324
325            // Upon rejection of promise with reason r,
326            rooted!(in(*cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
327                stream: Dom::from_ref(self),
328                abort_request_promise: pending_abort_request.promise.clone(),
329            }));
330
331            let handler = PromiseNativeHandler::new(
332                global,
333                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
334                rejection_handler.take().map(|h| Box::new(h) as Box<_>),
335                can_gc,
336            );
337            let realm = enter_realm(global);
338            let comp = InRealm::Entered(&realm);
339            promise.append_native_handler(&handler, comp, can_gc);
340        }
341    }
342
343    /// <https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed>
344    fn reject_close_and_closed_promise_if_needed(&self, cx: SafeJSContext, can_gc: CanGc) {
345        // Assert: stream.[[state]] is "errored".
346        assert!(self.is_errored());
347
348        rooted!(in(*cx) let mut stored_error = UndefinedValue());
349        self.get_stored_error(stored_error.handle_mut());
350
351        // If stream.[[closeRequest]] is not undefined
352        let close_request = self.close_request.borrow_mut().take();
353        if let Some(close_request) = close_request {
354            // Assert: stream.[[inFlightCloseRequest]] is undefined.
355            assert!(self.in_flight_close_request.borrow().is_none());
356
357            // Reject stream.[[closeRequest]] with stream.[[storedError]].
358            close_request.reject_native(&stored_error.handle(), can_gc)
359
360            // Set stream.[[closeRequest]] to undefined.
361            // Done with `take` above.
362        }
363
364        // Let writer be stream.[[writer]].
365        // If writer is not undefined,
366        if let Some(writer) = self.writer.get() {
367            // Reject writer.[[closedPromise]] with stream.[[storedError]].
368            writer.reject_closed_promise_with_stored_error(&stored_error.handle(), can_gc);
369
370            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
371            writer.set_close_promise_is_handled();
372        }
373    }
374
375    /// <https://streams.spec.whatwg.org/#writable-stream-close-queued-or-in-flight>
376    pub(crate) fn close_queued_or_in_flight(&self) -> bool {
377        let close_requested = self.close_request.borrow().is_some();
378        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
379
380        close_requested || in_flight_close_requested
381    }
382
383    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write>
384    pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
385        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
386            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
387            unreachable!("Stream should have a write request");
388        };
389
390        // Resolve stream.[[inFlightWriteRequest]] with undefined.
391        in_flight_write_request.resolve_native(&(), can_gc);
392
393        // Set stream.[[inFlightWriteRequest]] to undefined.
394        // Done above with `take`.
395    }
396
397    /// <https://streams.spec.whatwg.org/#writable-stream-start-erroring>
398    pub(crate) fn start_erroring(
399        &self,
400        cx: SafeJSContext,
401        global: &GlobalScope,
402        error: SafeHandleValue,
403        can_gc: CanGc,
404    ) {
405        // Assert: stream.[[storedError]] is undefined.
406        assert!(self.stored_error.get().is_undefined());
407
408        // Assert: stream.[[state]] is "writable".
409        assert!(self.is_writable());
410
411        // Let controller be stream.[[controller]].
412        let Some(controller) = self.controller.get() else {
413            // Assert: controller is not undefined.
414            unreachable!("Stream should have a controller.");
415        };
416
417        // Set stream.[[state]] to "erroring".
418        self.state.set(WritableStreamState::Erroring);
419
420        // Set stream.[[storedError]] to reason.
421        self.stored_error.set(*error);
422
423        // Let writer be stream.[[writer]].
424        if let Some(writer) = self.writer.get() {
425            // If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected
426            writer.ensure_ready_promise_rejected(global, error, can_gc);
427        }
428
429        // If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true
430        if !self.has_operations_marked_inflight() && controller.started() {
431            // perform ! WritableStreamFinishErroring
432            self.finish_erroring(cx, global, can_gc);
433        }
434    }
435
436    /// <https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection>
437    pub(crate) fn deal_with_rejection(
438        &self,
439        cx: SafeJSContext,
440        global: &GlobalScope,
441        error: SafeHandleValue,
442        can_gc: CanGc,
443    ) {
444        // Let state be stream.[[state]].
445
446        // If state is "writable",
447        if self.is_writable() {
448            // Perform ! WritableStreamStartErroring(stream, error).
449            self.start_erroring(cx, global, error, can_gc);
450
451            // Return.
452            return;
453        }
454
455        // Assert: state is "erroring".
456        assert!(self.is_erroring());
457
458        // Perform ! WritableStreamFinishErroring(stream).
459        self.finish_erroring(cx, global, can_gc);
460    }
461
462    /// <https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight>
463    pub(crate) fn mark_first_write_request_in_flight(&self) {
464        let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
465        let mut write_requests = self.write_requests.borrow_mut();
466
467        // Assert: stream.[[inFlightWriteRequest]] is undefined.
468        assert!(in_flight_write_request.is_none());
469
470        // Assert: stream.[[writeRequests]] is not empty.
471        assert!(!write_requests.is_empty());
472
473        // Let writeRequest be stream.[[writeRequests]][0].
474        // Remove writeRequest from stream.[[writeRequests]].
475        let write_request = write_requests.pop_front().unwrap();
476
477        // Set stream.[[inFlightWriteRequest]] to writeRequest.
478        *in_flight_write_request = Some(write_request);
479    }
480
481    /// <https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight>
482    pub(crate) fn mark_close_request_in_flight(&self) {
483        let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
484        let mut close_request = self.close_request.borrow_mut();
485
486        // Assert: stream.[[inFlightCloseRequest]] is undefined.
487        assert!(in_flight_close_request.is_none());
488
489        // Assert: stream.[[closeRequest]] is not undefined.
490        assert!(close_request.is_some());
491
492        // Let closeRequest be stream.[[closeRequest]].
493        // Set stream.[[closeRequest]] to undefined.
494        let close_request = close_request.take().unwrap();
495
496        // Set stream.[[inFlightCloseRequest]] to closeRequest.
497        *in_flight_close_request = Some(close_request);
498    }
499
500    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close>
501    pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
502        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
503            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
504            unreachable!("in_flight_close_request must be Some");
505        };
506
507        // Resolve stream.[[inFlightCloseRequest]] with undefined.
508        in_flight_close_request.resolve_native(&(), can_gc);
509
510        // Set stream.[[inFlightCloseRequest]] to undefined.
511        // Done with take above.
512
513        // Assert: stream.[[state]] is "writable" or "erroring".
514        assert!(self.is_writable() || self.is_erroring());
515
516        // If state is "erroring",
517        if self.is_erroring() {
518            // Set stream.[[storedError]] to undefined.
519            self.stored_error.set(UndefinedValue());
520
521            // If stream.[[pendingAbortRequest]] is not undefined,
522            rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
523            if let Some(pending_abort_request) = &*pending_abort_request {
524                // Resolve stream.[[pendingAbortRequest]]'s promise with undefined.
525                pending_abort_request.promise.resolve_native(&(), can_gc);
526
527                // Set stream.[[pendingAbortRequest]] to undefined.
528                // Done above with `take`.
529            }
530        }
531
532        // Set stream.[[state]] to "closed".
533        self.state.set(WritableStreamState::Closed);
534
535        // Let writer be stream.[[writer]].
536        if let Some(writer) = self.writer.get() {
537            // If writer is not undefined,
538            // resolve writer.[[closedPromise]] with undefined.
539            writer.resolve_closed_promise_with_undefined(can_gc);
540        }
541
542        // Assert: stream.[[pendingAbortRequest]] is undefined.
543        assert!(self.pending_abort_request.borrow().is_none());
544
545        // Assert: stream.[[storedError]] is undefined.
546        assert!(self.stored_error.get().is_undefined());
547    }
548
549    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error>
550    pub(crate) fn finish_in_flight_close_with_error(
551        &self,
552        cx: SafeJSContext,
553        global: &GlobalScope,
554        error: SafeHandleValue,
555        can_gc: CanGc,
556    ) {
557        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
558            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
559            unreachable!("Inflight close request must be defined.");
560        };
561
562        // Reject stream.[[inFlightCloseRequest]] with error.
563        in_flight_close_request.reject_native(&error, can_gc);
564
565        // Set stream.[[inFlightCloseRequest]] to undefined.
566        // Done above with `take`.
567
568        // Assert: stream.[[state]] is "writable" or "erroring".
569        assert!(self.is_erroring() || self.is_writable());
570
571        // If stream.[[pendingAbortRequest]] is not undefined,
572        rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
573        if let Some(pending_abort_request) = &*pending_abort_request {
574            // Reject stream.[[pendingAbortRequest]]'s promise with error.
575            pending_abort_request.promise.reject_native(&error, can_gc);
576
577            // Set stream.[[pendingAbortRequest]] to undefined.
578            // Done above with `take`.
579        }
580
581        // Perform ! WritableStreamDealWithRejection(stream, error).
582        self.deal_with_rejection(cx, global, error, can_gc);
583    }
584
585    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error>
586    pub(crate) fn finish_in_flight_write_with_error(
587        &self,
588        cx: SafeJSContext,
589        global: &GlobalScope,
590        error: SafeHandleValue,
591        can_gc: CanGc,
592    ) {
593        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
594            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
595            unreachable!("Inflight write request must be defined.");
596        };
597
598        // Reject stream.[[inFlightWriteRequest]] with error.
599        in_flight_write_request.reject_native(&error, can_gc);
600
601        // Set stream.[[inFlightWriteRequest]] to undefined.
602        // Done above with `take`.
603
604        // Assert: stream.[[state]] is "writable" or "erroring".
605        assert!(self.is_erroring() || self.is_writable());
606
607        // Perform ! WritableStreamDealWithRejection(stream, error).
608        self.deal_with_rejection(cx, global, error, can_gc);
609    }
610
611    pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
612        self.writer.get()
613    }
614
615    pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
616        self.writer.set(writer);
617    }
618
619    pub(crate) fn set_backpressure(&self, backpressure: bool) {
620        self.backpressure.set(backpressure);
621    }
622
623    pub(crate) fn get_backpressure(&self) -> bool {
624        self.backpressure.get()
625    }
626
627    /// <https://streams.spec.whatwg.org/#is-writable-stream-locked>
628    pub(crate) fn is_locked(&self) -> bool {
629        // If stream.[[writer]] is undefined, return false.
630        // Return true.
631        self.get_writer().is_some()
632    }
633
634    /// <https://streams.spec.whatwg.org/#writable-stream-add-write-request>
635    pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
636        // Assert: ! IsWritableStreamLocked(stream) is true.
637        assert!(self.is_locked());
638
639        // Assert: stream.[[state]] is "writable".
640        assert!(self.is_writable());
641
642        // Let promise be a new promise.
643        let promise = Promise::new(global, can_gc);
644
645        // Append promise to stream.[[writeRequests]].
646        self.write_requests.borrow_mut().push_back(promise.clone());
647
648        // Return promise.
649        promise
650    }
651
652    // Returns the rooted controller of the stream, if any.
653    pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
654        self.controller.get()
655    }
656
657    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
658    pub(crate) fn abort(
659        &self,
660        cx: SafeJSContext,
661        global: &GlobalScope,
662        provided_reason: SafeHandleValue,
663        realm: InRealm,
664        can_gc: CanGc,
665    ) -> Rc<Promise> {
666        // If stream.[[state]] is "closed" or "errored",
667        if self.is_closed() || self.is_errored() {
668            // return a promise resolved with undefined.
669            return Promise::new_resolved(global, cx, (), can_gc);
670        }
671
672        // Signal abort on stream.[[controller]].[[abortController]] with reason.
673        self.get_controller()
674            .expect("Stream must have a controller.")
675            .signal_abort(cx, provided_reason, realm, can_gc);
676
677        // Let state be stream.[[state]].
678        let state = self.state.get();
679
680        // If state is "closed" or "errored", return a promise resolved with undefined.
681        if matches!(
682            state,
683            WritableStreamState::Closed | WritableStreamState::Errored
684        ) {
685            return Promise::new_resolved(global, cx, (), can_gc);
686        }
687
688        // If stream.[[pendingAbortRequest]] is not undefined,
689        if self.pending_abort_request.borrow().is_some() {
690            // return stream.[[pendingAbortRequest]]'s promise.
691            return self
692                .pending_abort_request
693                .borrow()
694                .as_ref()
695                .expect("Pending abort request must be Some.")
696                .promise
697                .clone();
698        }
699
700        // Assert: state is "writable" or "erroring".
701        assert!(self.is_writable() || self.is_erroring());
702
703        // Let wasAlreadyErroring be false.
704        let mut was_already_erroring = false;
705        rooted!(in(*cx) let undefined_reason = UndefinedValue());
706
707        // If state is "erroring",
708        let reason = if self.is_erroring() {
709            // Set wasAlreadyErroring to true.
710            was_already_erroring = true;
711
712            // Set reason to undefined.
713            undefined_reason.handle()
714        } else {
715            // Use the provided reason.
716            provided_reason
717        };
718
719        // Let promise be a new promise.
720        let promise = Promise::new(global, can_gc);
721
722        // Set stream.[[pendingAbortRequest]] to a new pending abort request
723        // whose promise is promise,
724        // reason is reason,
725        // and was already erroring is wasAlreadyErroring.
726        *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
727            promise: promise.clone(),
728            reason: Heap::boxed(reason.get()),
729            was_already_erroring,
730        });
731
732        // If wasAlreadyErroring is false,
733        if !was_already_erroring {
734            // perform ! WritableStreamStartErroring(stream, reason)
735            self.start_erroring(cx, global, reason, can_gc);
736        }
737
738        // Return promise.
739        promise
740    }
741
742    /// <https://streams.spec.whatwg.org/#writable-stream-close>
743    pub(crate) fn close(
744        &self,
745        cx: SafeJSContext,
746        global: &GlobalScope,
747        can_gc: CanGc,
748    ) -> Rc<Promise> {
749        // Let state be stream.[[state]].
750        // If state is "closed" or "errored",
751        if self.is_closed() || self.is_errored() {
752            // return a promise rejected with a TypeError exception.
753            let promise = Promise::new(global, can_gc);
754            promise.reject_error(
755                Error::Type("Stream is closed or errored.".to_string()),
756                can_gc,
757            );
758            return promise;
759        }
760
761        // Assert: state is "writable" or "erroring".
762        assert!(self.is_writable() || self.is_erroring());
763
764        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
765        assert!(!self.close_queued_or_in_flight());
766
767        // Let promise be a new promise.
768        let promise = Promise::new(global, can_gc);
769
770        // Set stream.[[closeRequest]] to promise.
771        *self.close_request.borrow_mut() = Some(promise.clone());
772
773        // Let writer be stream.[[writer]].
774        // If writer is not undefined,
775        if let Some(writer) = self.writer.get() {
776            // and stream.[[backpressure]] is true,
777            // and state is "writable",
778            if self.get_backpressure() && self.is_writable() {
779                // resolve writer.[[readyPromise]] with undefined.
780                writer.resolve_ready_promise_with_undefined(can_gc);
781            }
782        }
783
784        // Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]).
785        let Some(controller) = self.controller.get() else {
786            unreachable!("Stream must have a controller.");
787        };
788        controller.close(cx, global, can_gc);
789
790        // Return promise.
791        promise
792    }
793
794    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size>
795    /// Note: implement as a stream method, as opposed to a writer one, for convenience.
796    pub(crate) fn get_desired_size(&self) -> Option<f64> {
797        // Let stream be writer.[[stream]].
798        // Stream is `self`.
799
800        // Let state be stream.[[state]].
801        // If state is "errored" or "erroring", return null.
802        if self.is_errored() || self.is_erroring() {
803            return None;
804        }
805
806        // If state is "closed", return 0.
807        if self.is_closed() {
808            return Some(0.);
809        }
810
811        let Some(controller) = self.controller.get() else {
812            unreachable!("Stream must have a controller.");
813        };
814        Some(controller.get_desired_size())
815    }
816
817    /// <https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer>
818    pub(crate) fn aquire_default_writer(
819        &self,
820        cx: SafeJSContext,
821        global: &GlobalScope,
822        can_gc: CanGc,
823    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
824        // Let writer be a new WritableStreamDefaultWriter object.
825        let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
826
827        // Perform ? SetUpWritableStreamDefaultWriter(writer, stream).
828        writer.setup(cx, self, can_gc)?;
829
830        // Return writer.
831        Ok(writer)
832    }
833
834    /// <https://streams.spec.whatwg.org/#writable-stream-update-backpressure>
835    pub(crate) fn update_backpressure(
836        &self,
837        backpressure: bool,
838        global: &GlobalScope,
839        can_gc: CanGc,
840    ) {
841        // Assert: stream.[[state]] is "writable".
842        self.is_writable();
843
844        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
845        assert!(!self.close_queued_or_in_flight());
846
847        // Let writer be stream.[[writer]].
848        let writer = self.get_writer();
849
850        if let Some(writer) = writer {
851            // If writer is not undefined
852            if backpressure != self.get_backpressure() {
853                // and backpressure is not stream.[[backpressure]],
854                if backpressure {
855                    // If backpressure is true, set writer.[[readyPromise]] to a new promise.
856                    let promise = Promise::new(global, can_gc);
857                    writer.set_ready_promise(promise);
858                } else {
859                    // Otherwise,
860                    // Assert: backpressure is false.
861                    assert!(!backpressure);
862                    // Resolve writer.[[readyPromise]] with undefined.
863                    writer.resolve_ready_promise_with_undefined(can_gc);
864                }
865            }
866        }
867
868        // Set stream.[[backpressure]] to backpressure.
869        self.set_backpressure(backpressure);
870    }
871
872    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
873    pub(crate) fn setup_cross_realm_transform_writable(
874        &self,
875        cx: SafeJSContext,
876        port: &MessagePort,
877        can_gc: CanGc,
878    ) {
879        let port_id = port.message_port_id();
880        let global = self.global();
881
882        // Perform ! InitializeWritableStream(stream).
883        // Done in `new_inherited`.
884
885        // Let sizeAlgorithm be an algorithm that returns 1.
886        // Re-ordered because of the need to pass it to `new`.
887        let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
888
889        // Note: other algorithms defined in the controller at call site.
890
891        // Let backpressurePromise be a new promise.
892        let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(&global, can_gc))));
893
894        // Let controller be a new WritableStreamDefaultController.
895        let controller = WritableStreamDefaultController::new(
896            &global,
897            UnderlyingSinkType::Transfer {
898                backpressure_promise: backpressure_promise.clone(),
899                port: Dom::from_ref(port),
900            },
901            1.0,
902            size_algorithm,
903            can_gc,
904        );
905
906        // Add a handler for port’s message event with the following steps:
907        // Add a handler for port’s messageerror event with the following steps:
908        rooted!(in(*cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
909            controller: Dom::from_ref(&controller),
910            backpressure_promise: backpressure_promise.clone(),
911        });
912        global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
913
914        // Enable port’s port message queue.
915        port.Start(can_gc);
916
917        // Perform ! SetUpWritableStreamDefaultController
918        controller
919            .setup(cx, &global, self, can_gc)
920            .expect("Setup for transfer cannot fail");
921    }
922    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
923    #[allow(clippy::too_many_arguments)]
924    pub(crate) fn setup_from_underlying_sink(
925        &self,
926        cx: SafeJSContext,
927        global: &GlobalScope,
928        stream: &WritableStream,
929        underlying_sink_obj: SafeHandleObject,
930        underlying_sink: &UnderlyingSink,
931        strategy_hwm: f64,
932        strategy_size: Rc<QueuingStrategySize>,
933        can_gc: CanGc,
934    ) -> Result<(), Error> {
935        // Let controller be a new WritableStreamDefaultController.
936
937        // Let startAlgorithm be an algorithm that returns undefined.
938
939        // Let writeAlgorithm be an algorithm that returns a promise resolved with undefined.
940
941        // Let closeAlgorithm be an algorithm that returns a promise resolved with undefined.
942
943        // Let abortAlgorithm be an algorithm that returns a promise resolved with undefined.
944
945        // If underlyingSinkDict["start"] exists, then set startAlgorithm to an algorithm which
946        // returns the result of invoking underlyingSinkDict["start"] with argument
947        // list « controller », exception behavior "rethrow", and callback this value underlyingSink.
948
949        // If underlyingSinkDict["write"] exists, then set writeAlgorithm to an algorithm which
950        // takes an argument chunk and returns the result of invoking underlyingSinkDict["write"]
951        // with argument list « chunk, controller » and callback this value underlyingSink.
952
953        // If underlyingSinkDict["close"] exists, then set closeAlgorithm to an algorithm which
954        // returns the result of invoking underlyingSinkDict["close"] with argument
955        // list «» and callback this value underlyingSink.
956
957        // If underlyingSinkDict["abort"] exists, then set abortAlgorithm to an algorithm which
958        // takes an argument reason and returns the result of invoking underlyingSinkDict["abort"]
959        // with argument list « reason » and callback this value underlyingSink.
960        let controller = WritableStreamDefaultController::new(
961            global,
962            UnderlyingSinkType::new_js(
963                underlying_sink.abort.clone(),
964                underlying_sink.start.clone(),
965                underlying_sink.close.clone(),
966                underlying_sink.write.clone(),
967            ),
968            strategy_hwm,
969            strategy_size,
970            can_gc,
971        );
972
973        // Note: this must be done before `setup`,
974        // otherwise `thisOb` is null in the start callback.
975        controller.set_underlying_sink_this_object(underlying_sink_obj);
976
977        // Perform ? SetUpWritableStreamDefaultController
978        controller.setup(cx, global, stream, can_gc)
979    }
980}
981
982/// <https://streams.spec.whatwg.org/#create-writable-stream>
983#[cfg_attr(crown, expect(crown::unrooted_must_root))]
984pub(crate) fn create_writable_stream(
985    cx: SafeJSContext,
986    global: &GlobalScope,
987    writable_high_water_mark: f64,
988    writable_size_algorithm: Rc<QueuingStrategySize>,
989    underlying_sink_type: UnderlyingSinkType,
990    can_gc: CanGc,
991) -> Fallible<DomRoot<WritableStream>> {
992    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
993    assert!(writable_high_water_mark >= 0.0);
994
995    // Let stream be a new WritableStream.
996    // Perform ! InitializeWritableStream(stream).
997    let stream = WritableStream::new_with_proto(global, None, can_gc);
998
999    // Let controller be a new WritableStreamDefaultController.
1000    let controller = WritableStreamDefaultController::new(
1001        global,
1002        underlying_sink_type,
1003        writable_high_water_mark,
1004        writable_size_algorithm,
1005        can_gc,
1006    );
1007
1008    // Perform ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm,
1009    // closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm).
1010    controller.setup(cx, global, &stream, can_gc)?;
1011
1012    // Return stream.
1013    Ok(stream)
1014}
1015
1016impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1017    /// <https://streams.spec.whatwg.org/#ws-constructor>
1018    fn Constructor(
1019        cx: SafeJSContext,
1020        global: &GlobalScope,
1021        proto: Option<SafeHandleObject>,
1022        can_gc: CanGc,
1023        underlying_sink: Option<*mut JSObject>,
1024        strategy: &QueuingStrategy,
1025    ) -> Fallible<DomRoot<WritableStream>> {
1026        // If underlyingSink is missing, set it to null.
1027        rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1028
1029        // Let underlyingSinkDict be underlyingSink,
1030        // converted to an IDL value of type UnderlyingSink.
1031        let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1032            rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1033            match UnderlyingSink::new(cx, obj_val.handle(), can_gc) {
1034                Ok(ConversionResult::Success(val)) => val,
1035                Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
1036                _ => {
1037                    return Err(Error::JSFailed);
1038                },
1039            }
1040        } else {
1041            UnderlyingSink::empty()
1042        };
1043
1044        if !underlying_sink_dict.type_.handle().is_undefined() {
1045            // If underlyingSinkDict["type"] exists, throw a RangeError exception.
1046            return Err(Error::Range("type is set".to_string()));
1047        }
1048
1049        // Perform ! InitializeWritableStream(this).
1050        let stream = WritableStream::new_with_proto(global, proto, can_gc);
1051
1052        // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
1053        let size_algorithm = extract_size_algorithm(strategy, can_gc);
1054
1055        // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
1056        let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1057
1058        // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink,
1059        // underlyingSinkDict, highWaterMark, sizeAlgorithm).
1060        stream.setup_from_underlying_sink(
1061            cx,
1062            global,
1063            &stream,
1064            underlying_sink_obj.handle(),
1065            &underlying_sink_dict,
1066            high_water_mark,
1067            size_algorithm,
1068            can_gc,
1069        )?;
1070
1071        Ok(stream)
1072    }
1073
1074    /// <https://streams.spec.whatwg.org/#ws-locked>
1075    fn Locked(&self) -> bool {
1076        // Return ! IsWritableStreamLocked(this).
1077        self.is_locked()
1078    }
1079
1080    /// <https://streams.spec.whatwg.org/#ws-abort>
1081    fn Abort(
1082        &self,
1083        cx: SafeJSContext,
1084        reason: SafeHandleValue,
1085        realm: InRealm,
1086        can_gc: CanGc,
1087    ) -> Rc<Promise> {
1088        let global = GlobalScope::from_safe_context(cx, realm);
1089
1090        // If ! IsWritableStreamLocked(this) is true,
1091        if self.is_locked() {
1092            // return a promise rejected with a TypeError exception.
1093            let promise = Promise::new(&global, can_gc);
1094            promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1095            return promise;
1096        }
1097
1098        // Return ! WritableStreamAbort(this, reason).
1099        self.abort(cx, &global, reason, realm, can_gc)
1100    }
1101
1102    /// <https://streams.spec.whatwg.org/#ws-close>
1103    fn Close(&self, realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
1104        let cx = GlobalScope::get_cx();
1105        let global = GlobalScope::from_safe_context(cx, realm);
1106
1107        // If ! IsWritableStreamLocked(this) is true,
1108        if self.is_locked() {
1109            // return a promise rejected with a TypeError exception.
1110            let promise = Promise::new(&global, can_gc);
1111            promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1112            return promise;
1113        }
1114
1115        // If ! WritableStreamCloseQueuedOrInFlight(this) is true
1116        if self.close_queued_or_in_flight() {
1117            // return a promise rejected with a TypeError exception.
1118            let promise = Promise::new(&global, can_gc);
1119            promise.reject_error(
1120                Error::Type("Stream has closed queued or in-flight".to_string()),
1121                can_gc,
1122            );
1123            return promise;
1124        }
1125
1126        // Return ! WritableStreamClose(this).
1127        self.close(cx, &global, can_gc)
1128    }
1129
1130    /// <https://streams.spec.whatwg.org/#ws-get-writer>
1131    fn GetWriter(
1132        &self,
1133        realm: InRealm,
1134        can_gc: CanGc,
1135    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1136        let cx = GlobalScope::get_cx();
1137        let global = GlobalScope::from_safe_context(cx, realm);
1138
1139        // Return ? AcquireWritableStreamDefaultWriter(this).
1140        self.aquire_default_writer(cx, &global, can_gc)
1141    }
1142}
1143
1144impl js::gc::Rootable for CrossRealmTransformWritable {}
1145
1146/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1147/// A wrapper to handle `message` and `messageerror` events
1148/// for the port used by the transfered stream.
1149#[derive(Clone, JSTraceable, MallocSizeOf)]
1150#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1151pub(crate) struct CrossRealmTransformWritable {
1152    /// The controller used in the algorithm.
1153    controller: Dom<WritableStreamDefaultController>,
1154
1155    /// The `backpressurePromise` used in the algorithm.
1156    #[ignore_malloc_size_of = "nested Rc"]
1157    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1158}
1159
1160impl CrossRealmTransformWritable {
1161    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1162    /// Add a handler for port’s message event with the following steps:
1163    #[expect(unsafe_code)]
1164    pub(crate) fn handle_message(
1165        &self,
1166        cx: SafeJSContext,
1167        global: &GlobalScope,
1168        message: SafeHandleValue,
1169        _realm: InRealm,
1170        can_gc: CanGc,
1171    ) {
1172        rooted!(in(*cx) let mut value = UndefinedValue());
1173        let type_string =
1174            unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
1175
1176        // If type is "pull",
1177        // Done below as the steps are the same for both types.
1178
1179        // Otherwise, if type is "error",
1180        if type_string == "error" {
1181            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, value).
1182            self.controller
1183                .error_if_needed(cx, value.handle(), global, can_gc);
1184        }
1185
1186        let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1187
1188        // Note: the below steps are for both "pull" and "error" types.
1189        // If backpressurePromise is not undefined,
1190        if let Some(promise) = backpressure_promise {
1191            // Resolve backpressurePromise with undefined.
1192            promise.resolve_native(&(), can_gc);
1193
1194            // Set backpressurePromise to undefined.
1195            // Done above with `take`.
1196        }
1197    }
1198
1199    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1200    /// Add a handler for port’s messageerror event with the following steps:
1201    pub(crate) fn handle_error(
1202        &self,
1203        cx: SafeJSContext,
1204        global: &GlobalScope,
1205        port: &MessagePort,
1206        _realm: InRealm,
1207        can_gc: CanGc,
1208    ) {
1209        // Let error be a new "DataCloneError" DOMException.
1210        let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
1211        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1212        error.safe_to_jsval(cx, rooted_error.handle_mut(), can_gc);
1213
1214        // Perform ! CrossRealmTransformSendError(port, error).
1215        port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
1216
1217        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, error).
1218        self.controller
1219            .error_if_needed(cx, rooted_error.handle(), global, can_gc);
1220
1221        // Disentangle port.
1222        global.disentangle_port(port, can_gc);
1223    }
1224}
1225
1226/// <https://streams.spec.whatwg.org/#ws-transfer>
1227impl Transferable for WritableStream {
1228    type Index = MessagePortIndex;
1229    type Data = MessagePortImpl;
1230
1231    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps①>
1232    fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
1233        // Step 1. If ! IsWritableStreamLocked(value) is true, throw a
1234        // "DataCloneError" DOMException.
1235        if self.is_locked() {
1236            return Err(Error::DataClone(None));
1237        }
1238
1239        let global = self.global();
1240        let realm = enter_realm(&*global);
1241        let comp = InRealm::Entered(&realm);
1242        let cx = GlobalScope::get_cx();
1243        let can_gc = CanGc::note();
1244
1245        // Step 2. Let port1 be a new MessagePort in the current Realm.
1246        let port_1 = MessagePort::new(&global, can_gc);
1247        global.track_message_port(&port_1, None);
1248
1249        // Step 3. Let port2 be a new MessagePort in the current Realm.
1250        let port_2 = MessagePort::new(&global, can_gc);
1251        global.track_message_port(&port_2, None);
1252
1253        // Step 4. Entangle port1 and port2.
1254        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1255
1256        // Step 5. Let readable be a new ReadableStream in the current Realm.
1257        let readable = ReadableStream::new_with_proto(&global, None, can_gc);
1258
1259        // Step 6. Perform ! SetUpCrossRealmTransformReadable(readable, port1).
1260        readable.setup_cross_realm_transform_readable(cx, &port_1, can_gc);
1261
1262        // Step 7. Let promise be ! ReadableStreamPipeTo(readable, value, false, false, false).
1263        let promise = readable.pipe_to(cx, &global, self, false, false, false, None, comp, can_gc);
1264
1265        // Step 8. Set promise.[[PromiseIsHandled]] to true.
1266        promise.set_promise_is_handled();
1267
1268        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
1269        port_2.transfer()
1270    }
1271
1272    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps①>
1273    fn transfer_receive(
1274        owner: &GlobalScope,
1275        id: MessagePortId,
1276        port_impl: MessagePortImpl,
1277    ) -> Result<DomRoot<Self>, ()> {
1278        let cx = GlobalScope::get_cx();
1279        let can_gc = CanGc::note();
1280
1281        // Their transfer-receiving steps, given dataHolder and value, are:
1282        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
1283        let value = WritableStream::new_with_proto(owner, None, can_gc);
1284
1285        // Step 1. Let deserializedRecord be !
1286        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
1287        // Realm).
1288        // Done with the `Deserialize` derive of `MessagePortImpl`.
1289
1290        // Step 2. Let port be deserializedRecord.[[Deserialized]].
1291        let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
1292
1293        // Step 3. Perform ! SetUpCrossRealmTransformWritable(value, port).
1294        value.setup_cross_realm_transform_writable(cx, &transferred_port, can_gc);
1295        Ok(value)
1296    }
1297
1298    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
1299    fn serialized_storage<'a>(
1300        data: StructuredData<'a, '_>,
1301    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1302        match data {
1303            StructuredData::Reader(r) => &mut r.port_impls,
1304            StructuredData::Writer(w) => &mut w.ports,
1305        }
1306    }
1307}