Skip to main content

script/dom/stream/
writablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use dom_struct::dom_struct;
12use js::context::JSContext;
13use js::jsapi::{Heap, JSObject};
14use js::jsval::{JSVal, ObjectValue, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{
17    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18    MutableHandleValue as SafeMutableHandleValue,
19};
20use rustc_hash::FxHashMap;
21use script_bindings::cell::DomRefCell;
22use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
23use script_bindings::conversions::SafeToJSValConvertible;
24use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
25use servo_base::id::{MessagePortId, MessagePortIndex};
26use servo_constellation_traits::MessagePortImpl;
27
28use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
29    QueuingStrategy, QueuingStrategySize,
30};
31use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
32use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
33use crate::dom::bindings::conversions::ConversionResult;
34use crate::dom::bindings::error::{Error, Fallible};
35use crate::dom::bindings::reflector::DomGlobal;
36use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
37use crate::dom::bindings::structuredclone::StructuredData;
38use crate::dom::bindings::transferable::Transferable;
39use crate::dom::domexception::{DOMErrorName, DOMException};
40use crate::dom::globalscope::GlobalScope;
41use crate::dom::messageport::MessagePort;
42use crate::dom::promise::Promise;
43use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
44use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
45use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
46use crate::dom::stream::writablestreamdefaultcontroller::{
47    UnderlyingSinkType, WritableStreamDefaultController,
48};
49use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
50use crate::realms::{InRealm, enter_auto_realm};
51use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
52
53impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
54
55/// The fulfillment handler for the abort steps of
56/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
57#[derive(JSTraceable, MallocSizeOf)]
58#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
59struct AbortAlgorithmFulfillmentHandler {
60    stream: Dom<WritableStream>,
61    #[conditional_malloc_size_of]
62    abort_request_promise: Rc<Promise>,
63}
64
65impl Callback for AbortAlgorithmFulfillmentHandler {
66    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
67        // Resolve abortRequest’s promise with undefined.
68        self.abort_request_promise
69            .resolve_native(&(), CanGc::from_cx(cx));
70
71        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
72        self.stream
73            .as_rooted()
74            .reject_close_and_closed_promise_if_needed(cx);
75    }
76}
77
78impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
79
80/// The rejection handler for the abort steps of
81/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
82#[derive(JSTraceable, MallocSizeOf)]
83#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
84struct AbortAlgorithmRejectionHandler {
85    stream: Dom<WritableStream>,
86    #[conditional_malloc_size_of]
87    abort_request_promise: Rc<Promise>,
88}
89
90impl Callback for AbortAlgorithmRejectionHandler {
91    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
92        // Reject abortRequest’s promise with reason.
93        self.abort_request_promise
94            .reject_native(&reason, CanGc::from_cx(cx));
95
96        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
97        self.stream
98            .as_rooted()
99            .reject_close_and_closed_promise_if_needed(cx);
100    }
101}
102
103impl js::gc::Rootable for PendingAbortRequest {}
104
105/// <https://streams.spec.whatwg.org/#pending-abort-request>
106#[derive(JSTraceable, MallocSizeOf)]
107#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
108struct PendingAbortRequest {
109    /// <https://streams.spec.whatwg.org/#pending-abort-request-promise>
110    #[conditional_malloc_size_of]
111    promise: Rc<Promise>,
112
113    /// <https://streams.spec.whatwg.org/#pending-abort-request-reason>
114    #[ignore_malloc_size_of = "mozjs"]
115    reason: Box<Heap<JSVal>>,
116
117    /// <https://streams.spec.whatwg.org/#pending-abort-request-was-already-erroring>
118    was_already_erroring: bool,
119}
120
121/// <https://streams.spec.whatwg.org/#writablestream-state>
122#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
123pub(crate) enum WritableStreamState {
124    #[default]
125    Writable,
126    Closed,
127    Erroring,
128    Errored,
129}
130
131/// <https://streams.spec.whatwg.org/#ws-class>
132#[dom_struct]
133pub struct WritableStream {
134    reflector_: Reflector,
135
136    /// <https://streams.spec.whatwg.org/#writablestream-backpressure>
137    backpressure: Cell<bool>,
138
139    /// <https://streams.spec.whatwg.org/#writablestream-closerequest>
140    #[conditional_malloc_size_of]
141    close_request: DomRefCell<Option<Rc<Promise>>>,
142
143    /// <https://streams.spec.whatwg.org/#writablestream-controller>
144    controller: MutNullableDom<WritableStreamDefaultController>,
145
146    /// <https://streams.spec.whatwg.org/#writablestream-detached>
147    detached: Cell<bool>,
148
149    /// <https://streams.spec.whatwg.org/#writablestream-inflightwriterequest>
150    #[conditional_malloc_size_of]
151    in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
152
153    /// <https://streams.spec.whatwg.org/#writablestream-inflightcloserequest>
154    #[conditional_malloc_size_of]
155    in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
156
157    /// <https://streams.spec.whatwg.org/#writablestream-pendingabortrequest>
158    pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
159
160    /// <https://streams.spec.whatwg.org/#writablestream-state>
161    state: Cell<WritableStreamState>,
162
163    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
164    #[ignore_malloc_size_of = "mozjs"]
165    stored_error: Heap<JSVal>,
166
167    /// <https://streams.spec.whatwg.org/#writablestream-writer>
168    writer: MutNullableDom<WritableStreamDefaultWriter>,
169
170    /// <https://streams.spec.whatwg.org/#writablestream-writerequests>
171    #[conditional_malloc_size_of]
172    write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
173}
174
175impl WritableStream {
176    /// <https://streams.spec.whatwg.org/#initialize-writable-stream>
177    fn new_inherited() -> WritableStream {
178        WritableStream {
179            reflector_: Reflector::new(),
180            backpressure: Default::default(),
181            close_request: Default::default(),
182            controller: Default::default(),
183            detached: Default::default(),
184            in_flight_write_request: Default::default(),
185            in_flight_close_request: Default::default(),
186            pending_abort_request: Default::default(),
187            state: Default::default(),
188            stored_error: Default::default(),
189            writer: Default::default(),
190            write_requests: Default::default(),
191        }
192    }
193
194    pub(crate) fn new_with_proto(
195        global: &GlobalScope,
196        proto: Option<SafeHandleObject>,
197        can_gc: CanGc,
198    ) -> DomRoot<WritableStream> {
199        reflect_dom_object_with_proto(
200            Box::new(WritableStream::new_inherited()),
201            global,
202            proto,
203            can_gc,
204        )
205    }
206
207    /// Used as part of
208    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
209    pub(crate) fn assert_no_controller(&self) {
210        assert!(self.controller.get().is_none());
211    }
212
213    /// Used as part of
214    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
215    pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
216        self.controller.set(Some(controller));
217    }
218
219    pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
220        self.controller.get().expect("Controller should be set.")
221    }
222
223    pub(crate) fn is_writable(&self) -> bool {
224        matches!(self.state.get(), WritableStreamState::Writable)
225    }
226
227    pub(crate) fn is_erroring(&self) -> bool {
228        matches!(self.state.get(), WritableStreamState::Erroring)
229    }
230
231    pub(crate) fn is_errored(&self) -> bool {
232        matches!(self.state.get(), WritableStreamState::Errored)
233    }
234
235    pub(crate) fn is_closed(&self) -> bool {
236        matches!(self.state.get(), WritableStreamState::Closed)
237    }
238
239    pub(crate) fn has_in_flight_write_request(&self) -> bool {
240        self.in_flight_write_request.borrow().is_some()
241    }
242
243    /// <https://streams.spec.whatwg.org/#writable-stream-has-operation-marked-in-flight>
244    pub(crate) fn has_operations_marked_inflight(&self) -> bool {
245        let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
246        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
247
248        in_flight_write_requested || in_flight_close_requested
249    }
250
251    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
252    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
253        handle_mut.set(self.stored_error.get());
254    }
255
256    /// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
257    pub(crate) fn finish_erroring(&self, cx: &mut JSContext, global: &GlobalScope) {
258        // Assert: stream.[[state]] is "erroring".
259        assert!(self.is_erroring());
260
261        // Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false.
262        assert!(!self.has_operations_marked_inflight());
263
264        // Set stream.[[state]] to "errored".
265        self.state.set(WritableStreamState::Errored);
266
267        // Perform ! stream.[[controller]].[[ErrorSteps]]().
268        let Some(controller) = self.controller.get() else {
269            unreachable!("Stream should have a controller.");
270        };
271        controller.perform_error_steps();
272
273        // Let storedError be stream.[[storedError]].
274        rooted!(&in(cx) let mut stored_error = UndefinedValue());
275        self.get_stored_error(stored_error.handle_mut());
276
277        // For each writeRequest of stream.[[writeRequests]]:
278        let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
279        for request in write_requests {
280            // Reject writeRequest with storedError.
281            request.reject(cx.into(), stored_error.handle(), CanGc::from_cx(cx));
282        }
283
284        // Set stream.[[writeRequests]] to an empty list.
285        // Done above with `drain`.
286
287        // If stream.[[pendingAbortRequest]] is undefined,
288        if self.pending_abort_request.borrow().is_none() {
289            // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
290            self.reject_close_and_closed_promise_if_needed(cx);
291
292            // Return.
293            return;
294        }
295
296        // Let abortRequest be stream.[[pendingAbortRequest]].
297        // Set stream.[[pendingAbortRequest]] to undefined.
298        rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
299        if let Some(pending_abort_request) = &*pending_abort_request {
300            // If abortRequest’s was already erroring is true,
301            if pending_abort_request.was_already_erroring {
302                // Reject abortRequest’s promise with storedError.
303                pending_abort_request.promise.reject(
304                    cx.into(),
305                    stored_error.handle(),
306                    CanGc::from_cx(cx),
307                );
308
309                // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
310                self.reject_close_and_closed_promise_if_needed(cx);
311
312                // Return.
313                return;
314            }
315
316            // Let promise be ! stream.[[controller]].[[AbortSteps]](abortRequest’s reason).
317            rooted!(&in(cx) let mut reason = UndefinedValue());
318            reason.set(pending_abort_request.reason.get());
319            let promise = controller.abort_steps(cx, global, reason.handle());
320
321            // Upon fulfillment of promise,
322            rooted!(&in(cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
323                stream: Dom::from_ref(self),
324                abort_request_promise: pending_abort_request.promise.clone(),
325            }));
326
327            // Upon rejection of promise with reason r,
328            rooted!(&in(cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
329                stream: Dom::from_ref(self),
330                abort_request_promise: pending_abort_request.promise.clone(),
331            }));
332
333            let handler = PromiseNativeHandler::new(
334                global,
335                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
336                rejection_handler.take().map(|h| Box::new(h) as Box<_>),
337                CanGc::from_cx(cx),
338            );
339
340            let mut realm = enter_auto_realm(cx, global);
341            let cx = &mut realm.current_realm();
342            promise.append_native_handler(cx, &handler);
343        }
344    }
345
346    /// <https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed>
347    fn reject_close_and_closed_promise_if_needed(&self, cx: &mut JSContext) {
348        // Assert: stream.[[state]] is "errored".
349        assert!(self.is_errored());
350
351        rooted!(&in(cx) let mut stored_error = UndefinedValue());
352        self.get_stored_error(stored_error.handle_mut());
353
354        // If stream.[[closeRequest]] is not undefined
355        let close_request = self.close_request.borrow_mut().take();
356        if let Some(close_request) = close_request {
357            // Assert: stream.[[inFlightCloseRequest]] is undefined.
358            assert!(self.in_flight_close_request.borrow().is_none());
359
360            // Reject stream.[[closeRequest]] with stream.[[storedError]].
361            close_request.reject_native(&stored_error.handle(), CanGc::from_cx(cx))
362
363            // Set stream.[[closeRequest]] to undefined.
364            // Done with `take` above.
365        }
366
367        // Let writer be stream.[[writer]].
368        // If writer is not undefined,
369        if let Some(writer) = self.writer.get() {
370            // Reject writer.[[closedPromise]] with stream.[[storedError]].
371            writer.reject_closed_promise_with_stored_error(cx, &stored_error.handle());
372
373            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
374            writer.set_close_promise_is_handled();
375        }
376    }
377
378    /// <https://streams.spec.whatwg.org/#writable-stream-close-queued-or-in-flight>
379    pub(crate) fn close_queued_or_in_flight(&self) -> bool {
380        let close_requested = self.close_request.borrow().is_some();
381        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
382
383        close_requested || in_flight_close_requested
384    }
385
386    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write>
387    pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
388        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
389            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
390            unreachable!("Stream should have a write request");
391        };
392
393        // Resolve stream.[[inFlightWriteRequest]] with undefined.
394        in_flight_write_request.resolve_native(&(), can_gc);
395
396        // Set stream.[[inFlightWriteRequest]] to undefined.
397        // Done above with `take`.
398    }
399
400    /// <https://streams.spec.whatwg.org/#writable-stream-start-erroring>
401    pub(crate) fn start_erroring(
402        &self,
403        cx: &mut JSContext,
404        global: &GlobalScope,
405        error: SafeHandleValue,
406    ) {
407        // Assert: stream.[[storedError]] is undefined.
408        assert!(self.stored_error.get().is_undefined());
409
410        // Assert: stream.[[state]] is "writable".
411        assert!(self.is_writable());
412
413        // Let controller be stream.[[controller]].
414        let Some(controller) = self.controller.get() else {
415            // Assert: controller is not undefined.
416            unreachable!("Stream should have a controller.");
417        };
418
419        // Set stream.[[state]] to "erroring".
420        self.state.set(WritableStreamState::Erroring);
421
422        // Set stream.[[storedError]] to reason.
423        self.stored_error.set(*error);
424
425        // Let writer be stream.[[writer]].
426        if let Some(writer) = self.writer.get() {
427            // If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected
428            writer.ensure_ready_promise_rejected(global, error, CanGc::from_cx(cx));
429        }
430
431        // If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true
432        if !self.has_operations_marked_inflight() && controller.started() {
433            // perform ! WritableStreamFinishErroring
434            self.finish_erroring(cx, global);
435        }
436    }
437
438    /// <https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection>
439    pub(crate) fn deal_with_rejection(
440        &self,
441        cx: &mut JSContext,
442        global: &GlobalScope,
443        error: SafeHandleValue,
444    ) {
445        // Let state be stream.[[state]].
446
447        // If state is "writable",
448        if self.is_writable() {
449            // Perform ! WritableStreamStartErroring(stream, error).
450            self.start_erroring(cx, global, error);
451
452            // Return.
453            return;
454        }
455
456        // Assert: state is "erroring".
457        assert!(self.is_erroring());
458
459        // Perform ! WritableStreamFinishErroring(stream).
460        self.finish_erroring(cx, global);
461    }
462
463    /// <https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight>
464    pub(crate) fn mark_first_write_request_in_flight(&self) {
465        let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
466        let mut write_requests = self.write_requests.borrow_mut();
467
468        // Assert: stream.[[inFlightWriteRequest]] is undefined.
469        assert!(in_flight_write_request.is_none());
470
471        // Assert: stream.[[writeRequests]] is not empty.
472        assert!(!write_requests.is_empty());
473
474        // Let writeRequest be stream.[[writeRequests]][0].
475        // Remove writeRequest from stream.[[writeRequests]].
476        let write_request = write_requests.pop_front().unwrap();
477
478        // Set stream.[[inFlightWriteRequest]] to writeRequest.
479        *in_flight_write_request = Some(write_request);
480    }
481
482    /// <https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight>
483    pub(crate) fn mark_close_request_in_flight(&self) {
484        let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
485        let mut close_request = self.close_request.borrow_mut();
486
487        // Assert: stream.[[inFlightCloseRequest]] is undefined.
488        assert!(in_flight_close_request.is_none());
489
490        // Assert: stream.[[closeRequest]] is not undefined.
491        assert!(close_request.is_some());
492
493        // Let closeRequest be stream.[[closeRequest]].
494        // Set stream.[[closeRequest]] to undefined.
495        let close_request = close_request.take().unwrap();
496
497        // Set stream.[[inFlightCloseRequest]] to closeRequest.
498        *in_flight_close_request = Some(close_request);
499    }
500
501    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close>
502    pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
503        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
504            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
505            unreachable!("in_flight_close_request must be Some");
506        };
507
508        // Resolve stream.[[inFlightCloseRequest]] with undefined.
509        in_flight_close_request.resolve_native(&(), can_gc);
510
511        // Set stream.[[inFlightCloseRequest]] to undefined.
512        // Done with take above.
513
514        // Assert: stream.[[state]] is "writable" or "erroring".
515        assert!(self.is_writable() || self.is_erroring());
516
517        // If state is "erroring",
518        if self.is_erroring() {
519            // Set stream.[[storedError]] to undefined.
520            self.stored_error.set(UndefinedValue());
521
522            // If stream.[[pendingAbortRequest]] is not undefined,
523            rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
524            if let Some(pending_abort_request) = &*pending_abort_request {
525                // Resolve stream.[[pendingAbortRequest]]'s promise with undefined.
526                pending_abort_request.promise.resolve_native(&(), can_gc);
527
528                // Set stream.[[pendingAbortRequest]] to undefined.
529                // Done above with `take`.
530            }
531        }
532
533        // Set stream.[[state]] to "closed".
534        self.state.set(WritableStreamState::Closed);
535
536        // Let writer be stream.[[writer]].
537        if let Some(writer) = self.writer.get() {
538            // If writer is not undefined,
539            // resolve writer.[[closedPromise]] with undefined.
540            writer.resolve_closed_promise_with_undefined(can_gc);
541        }
542
543        // Assert: stream.[[pendingAbortRequest]] is undefined.
544        assert!(self.pending_abort_request.borrow().is_none());
545
546        // Assert: stream.[[storedError]] is undefined.
547        assert!(self.stored_error.get().is_undefined());
548    }
549
550    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error>
551    pub(crate) fn finish_in_flight_close_with_error(
552        &self,
553        cx: &mut JSContext,
554        global: &GlobalScope,
555        error: SafeHandleValue,
556    ) {
557        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
558            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
559            unreachable!("Inflight close request must be defined.");
560        };
561
562        // Reject stream.[[inFlightCloseRequest]] with error.
563        in_flight_close_request.reject_native(&error, CanGc::from_cx(cx));
564
565        // Set stream.[[inFlightCloseRequest]] to undefined.
566        // Done above with `take`.
567
568        // Assert: stream.[[state]] is "writable" or "erroring".
569        assert!(self.is_erroring() || self.is_writable());
570
571        // If stream.[[pendingAbortRequest]] is not undefined,
572        rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
573        if let Some(pending_abort_request) = &*pending_abort_request {
574            // Reject stream.[[pendingAbortRequest]]'s promise with error.
575            pending_abort_request
576                .promise
577                .reject_native(&error, CanGc::from_cx(cx));
578
579            // Set stream.[[pendingAbortRequest]] to undefined.
580            // Done above with `take`.
581        }
582
583        // Perform ! WritableStreamDealWithRejection(stream, error).
584        self.deal_with_rejection(cx, global, error);
585    }
586
587    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error>
588    pub(crate) fn finish_in_flight_write_with_error(
589        &self,
590        cx: &mut JSContext,
591        global: &GlobalScope,
592        error: SafeHandleValue,
593    ) {
594        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
595            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
596            unreachable!("Inflight write request must be defined.");
597        };
598
599        // Reject stream.[[inFlightWriteRequest]] with error.
600        in_flight_write_request.reject_native(&error, CanGc::from_cx(cx));
601
602        // Set stream.[[inFlightWriteRequest]] to undefined.
603        // Done above with `take`.
604
605        // Assert: stream.[[state]] is "writable" or "erroring".
606        assert!(self.is_erroring() || self.is_writable());
607
608        // Perform ! WritableStreamDealWithRejection(stream, error).
609        self.deal_with_rejection(cx, global, error);
610    }
611
612    pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
613        self.writer.get()
614    }
615
616    pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
617        self.writer.set(writer);
618    }
619
620    pub(crate) fn set_backpressure(&self, backpressure: bool) {
621        self.backpressure.set(backpressure);
622    }
623
624    pub(crate) fn get_backpressure(&self) -> bool {
625        self.backpressure.get()
626    }
627
628    /// <https://streams.spec.whatwg.org/#is-writable-stream-locked>
629    pub(crate) fn is_locked(&self) -> bool {
630        // If stream.[[writer]] is undefined, return false.
631        // Return true.
632        self.get_writer().is_some()
633    }
634
635    /// <https://streams.spec.whatwg.org/#writable-stream-add-write-request>
636    pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
637        // Assert: ! IsWritableStreamLocked(stream) is true.
638        assert!(self.is_locked());
639
640        // Assert: stream.[[state]] is "writable".
641        assert!(self.is_writable());
642
643        // Let promise be a new promise.
644        let promise = Promise::new(global, can_gc);
645
646        // Append promise to stream.[[writeRequests]].
647        self.write_requests.borrow_mut().push_back(promise.clone());
648
649        // Return promise.
650        promise
651    }
652
653    // Returns the rooted controller of the stream, if any.
654    pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
655        self.controller.get()
656    }
657
658    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
659    pub(crate) fn abort(
660        &self,
661        cx: &mut CurrentRealm,
662        global: &GlobalScope,
663        provided_reason: SafeHandleValue,
664    ) -> Rc<Promise> {
665        // If stream.[[state]] is "closed" or "errored",
666        if self.is_closed() || self.is_errored() {
667            // return a promise resolved with undefined.
668            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
669        }
670
671        // Signal abort on stream.[[controller]].[[abortController]] with reason.
672        self.get_controller()
673            .expect("Stream must have a controller.")
674            .signal_abort(cx, provided_reason);
675
676        // Let state be stream.[[state]].
677        let state = self.state.get();
678
679        // If state is "closed" or "errored", return a promise resolved with undefined.
680        if matches!(
681            state,
682            WritableStreamState::Closed | WritableStreamState::Errored
683        ) {
684            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
685        }
686
687        // If stream.[[pendingAbortRequest]] is not undefined,
688        if self.pending_abort_request.borrow().is_some() {
689            // return stream.[[pendingAbortRequest]]'s promise.
690            return self
691                .pending_abort_request
692                .borrow()
693                .as_ref()
694                .expect("Pending abort request must be Some.")
695                .promise
696                .clone();
697        }
698
699        // Assert: state is "writable" or "erroring".
700        assert!(self.is_writable() || self.is_erroring());
701
702        // Let wasAlreadyErroring be false.
703        let mut was_already_erroring = false;
704        rooted!(&in(cx) let undefined_reason = UndefinedValue());
705
706        // If state is "erroring",
707        let reason = if self.is_erroring() {
708            // Set wasAlreadyErroring to true.
709            was_already_erroring = true;
710
711            // Set reason to undefined.
712            undefined_reason.handle()
713        } else {
714            // Use the provided reason.
715            provided_reason
716        };
717
718        // Let promise be a new promise.
719        let promise = Promise::new2(cx, global);
720
721        // Set stream.[[pendingAbortRequest]] to a new pending abort request
722        // whose promise is promise,
723        // reason is reason,
724        // and was already erroring is wasAlreadyErroring.
725        *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
726            promise: promise.clone(),
727            reason: Heap::boxed(reason.get()),
728            was_already_erroring,
729        });
730
731        // If wasAlreadyErroring is false,
732        if !was_already_erroring {
733            // perform ! WritableStreamStartErroring(stream, reason)
734            self.start_erroring(cx, global, reason);
735        }
736
737        // Return promise.
738        promise
739    }
740
741    /// <https://streams.spec.whatwg.org/#writable-stream-close>
742    pub(crate) fn close(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
743        // Let state be stream.[[state]].
744        // If state is "closed" or "errored",
745        if self.is_closed() || self.is_errored() {
746            // return a promise rejected with a TypeError exception.
747            let promise = Promise::new2(cx, global);
748            promise.reject_error(
749                Error::Type(c"Stream is closed or errored.".to_owned()),
750                CanGc::from_cx(cx),
751            );
752            return promise;
753        }
754
755        // Assert: state is "writable" or "erroring".
756        assert!(self.is_writable() || self.is_erroring());
757
758        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
759        assert!(!self.close_queued_or_in_flight());
760
761        // Let promise be a new promise.
762        let promise = Promise::new2(cx, global);
763
764        // Set stream.[[closeRequest]] to promise.
765        *self.close_request.borrow_mut() = Some(promise.clone());
766
767        // Let writer be stream.[[writer]].
768        // If writer is not undefined,
769        if let Some(writer) = self.writer.get() {
770            // and stream.[[backpressure]] is true,
771            // and state is "writable",
772            if self.get_backpressure() && self.is_writable() {
773                // resolve writer.[[readyPromise]] with undefined.
774                writer.resolve_ready_promise_with_undefined(CanGc::from_cx(cx));
775            }
776        }
777
778        // Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]).
779        let Some(controller) = self.controller.get() else {
780            unreachable!("Stream must have a controller.");
781        };
782        controller.close(cx, global);
783
784        // Return promise.
785        promise
786    }
787
788    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size>
789    /// Note: implement as a stream method, as opposed to a writer one, for convenience.
790    pub(crate) fn get_desired_size(&self) -> Option<f64> {
791        // Let stream be writer.[[stream]].
792        // Stream is `self`.
793
794        // Let state be stream.[[state]].
795        // If state is "errored" or "erroring", return null.
796        if self.is_errored() || self.is_erroring() {
797            return None;
798        }
799
800        // If state is "closed", return 0.
801        if self.is_closed() {
802            return Some(0.);
803        }
804
805        let Some(controller) = self.controller.get() else {
806            unreachable!("Stream must have a controller.");
807        };
808        Some(controller.get_desired_size())
809    }
810
811    /// <https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer>
812    pub(crate) fn aquire_default_writer(
813        &self,
814        cx: SafeJSContext,
815        global: &GlobalScope,
816        can_gc: CanGc,
817    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
818        // Let writer be a new WritableStreamDefaultWriter object.
819        let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
820
821        // Perform ? SetUpWritableStreamDefaultWriter(writer, stream).
822        writer.setup(cx, self, can_gc)?;
823
824        // Return writer.
825        Ok(writer)
826    }
827
828    /// <https://streams.spec.whatwg.org/#writable-stream-update-backpressure>
829    pub(crate) fn update_backpressure(
830        &self,
831        backpressure: bool,
832        global: &GlobalScope,
833        can_gc: CanGc,
834    ) {
835        // Assert: stream.[[state]] is "writable".
836        self.is_writable();
837
838        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
839        assert!(!self.close_queued_or_in_flight());
840
841        // Let writer be stream.[[writer]].
842        let writer = self.get_writer();
843
844        if let Some(writer) = writer {
845            // If writer is not undefined
846            if backpressure != self.get_backpressure() {
847                // and backpressure is not stream.[[backpressure]],
848                if backpressure {
849                    // If backpressure is true, set writer.[[readyPromise]] to a new promise.
850                    let promise = Promise::new(global, can_gc);
851                    writer.set_ready_promise(promise);
852                } else {
853                    // Otherwise,
854                    // Assert: backpressure is false.
855                    assert!(!backpressure);
856                    // Resolve writer.[[readyPromise]] with undefined.
857                    writer.resolve_ready_promise_with_undefined(can_gc);
858                }
859            }
860        }
861
862        // Set stream.[[backpressure]] to backpressure.
863        self.set_backpressure(backpressure);
864    }
865
866    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
867    pub(crate) fn setup_cross_realm_transform_writable(
868        &self,
869        cx: &mut JSContext,
870        port: &MessagePort,
871    ) {
872        let port_id = port.message_port_id();
873        let global = self.global();
874
875        // Perform ! InitializeWritableStream(stream).
876        // Done in `new_inherited`.
877
878        // Let sizeAlgorithm be an algorithm that returns 1.
879        // Re-ordered because of the need to pass it to `new`.
880        let size_algorithm =
881            extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
882
883        // Note: other algorithms defined in the controller at call site.
884
885        // Let backpressurePromise be a new promise.
886        let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(
887            &global,
888            CanGc::from_cx(cx),
889        ))));
890
891        // Let controller be a new WritableStreamDefaultController.
892        let controller = WritableStreamDefaultController::new(
893            &global,
894            UnderlyingSinkType::Transfer {
895                backpressure_promise: backpressure_promise.clone(),
896                port: Dom::from_ref(port),
897            },
898            1.0,
899            size_algorithm,
900            CanGc::from_cx(cx),
901        );
902
903        // Add a handler for port’s message event with the following steps:
904        // Add a handler for port’s messageerror event with the following steps:
905        rooted!(&in(cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
906            controller: Dom::from_ref(&controller),
907            backpressure_promise,
908        });
909        global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
910
911        // Enable port’s port message queue.
912        port.Start(cx);
913
914        // Perform ! SetUpWritableStreamDefaultController
915        controller
916            .setup(cx, &global, self)
917            .expect("Setup for transfer cannot fail");
918    }
919    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
920    #[allow(clippy::too_many_arguments)]
921    fn setup_from_underlying_sink(
922        &self,
923        cx: &mut JSContext,
924        global: &GlobalScope,
925        stream: &WritableStream,
926        underlying_sink_obj: SafeHandleObject,
927        underlying_sink: &UnderlyingSink,
928        strategy_hwm: f64,
929        strategy_size: Rc<QueuingStrategySize>,
930    ) -> Result<(), Error> {
931        // Let controller be a new WritableStreamDefaultController.
932
933        // Let startAlgorithm be an algorithm that returns undefined.
934
935        // Let writeAlgorithm be an algorithm that returns a promise resolved with undefined.
936
937        // Let closeAlgorithm be an algorithm that returns a promise resolved with undefined.
938
939        // Let abortAlgorithm be an algorithm that returns a promise resolved with undefined.
940
941        // If underlyingSinkDict["start"] exists, then set startAlgorithm to an algorithm which
942        // returns the result of invoking underlyingSinkDict["start"] with argument
943        // list « controller », exception behavior "rethrow", and callback this value underlyingSink.
944
945        // If underlyingSinkDict["write"] exists, then set writeAlgorithm to an algorithm which
946        // takes an argument chunk and returns the result of invoking underlyingSinkDict["write"]
947        // with argument list « chunk, controller » and callback this value underlyingSink.
948
949        // If underlyingSinkDict["close"] exists, then set closeAlgorithm to an algorithm which
950        // returns the result of invoking underlyingSinkDict["close"] with argument
951        // list «» and callback this value underlyingSink.
952
953        // If underlyingSinkDict["abort"] exists, then set abortAlgorithm to an algorithm which
954        // takes an argument reason and returns the result of invoking underlyingSinkDict["abort"]
955        // with argument list « reason » and callback this value underlyingSink.
956        let controller = WritableStreamDefaultController::new(
957            global,
958            UnderlyingSinkType::new_js(
959                underlying_sink.abort.clone(),
960                underlying_sink.start.clone(),
961                underlying_sink.close.clone(),
962                underlying_sink.write.clone(),
963            ),
964            strategy_hwm,
965            strategy_size,
966            CanGc::from_cx(cx),
967        );
968
969        // Note: this must be done before `setup`,
970        // otherwise `thisOb` is null in the start callback.
971        controller.set_underlying_sink_this_object(underlying_sink_obj);
972
973        // Perform ? SetUpWritableStreamDefaultController
974        controller.setup(cx, global, stream)
975    }
976}
977
978/// <https://streams.spec.whatwg.org/#create-writable-stream>
979#[cfg_attr(crown, expect(crown::unrooted_must_root))]
980pub(crate) fn create_writable_stream(
981    cx: &mut JSContext,
982    global: &GlobalScope,
983    writable_high_water_mark: f64,
984    writable_size_algorithm: Rc<QueuingStrategySize>,
985    underlying_sink_type: UnderlyingSinkType,
986) -> Fallible<DomRoot<WritableStream>> {
987    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
988    assert!(writable_high_water_mark >= 0.0);
989
990    // Let stream be a new WritableStream.
991    // Perform ! InitializeWritableStream(stream).
992    let stream = WritableStream::new_with_proto(global, None, CanGc::from_cx(cx));
993
994    // Let controller be a new WritableStreamDefaultController.
995    let controller = WritableStreamDefaultController::new(
996        global,
997        underlying_sink_type,
998        writable_high_water_mark,
999        writable_size_algorithm,
1000        CanGc::from_cx(cx),
1001    );
1002
1003    // Perform ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm,
1004    // closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm).
1005    controller.setup(cx, global, &stream)?;
1006
1007    // Return stream.
1008    Ok(stream)
1009}
1010
1011impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1012    /// <https://streams.spec.whatwg.org/#ws-constructor>
1013    fn Constructor(
1014        cx: &mut JSContext,
1015        global: &GlobalScope,
1016        proto: Option<SafeHandleObject>,
1017        underlying_sink: Option<*mut JSObject>,
1018        strategy: &QueuingStrategy,
1019    ) -> Fallible<DomRoot<WritableStream>> {
1020        // If underlyingSink is missing, set it to null.
1021        rooted!(&in(cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1022
1023        // Let underlyingSinkDict be underlyingSink,
1024        // converted to an IDL value of type UnderlyingSink.
1025        let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1026            rooted!(&in(cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1027            match UnderlyingSink::new(cx.into(), obj_val.handle(), CanGc::from_cx(cx)) {
1028                Ok(ConversionResult::Success(val)) => val,
1029                Ok(ConversionResult::Failure(error)) => {
1030                    return Err(Error::Type(error.into_owned()));
1031                },
1032                _ => {
1033                    return Err(Error::JSFailed);
1034                },
1035            }
1036        } else {
1037            UnderlyingSink::empty()
1038        };
1039
1040        if !underlying_sink_dict.type_.handle().is_undefined() {
1041            // If underlyingSinkDict["type"] exists, throw a RangeError exception.
1042            return Err(Error::Range(c"type is set".to_owned()));
1043        }
1044
1045        // Perform ! InitializeWritableStream(this).
1046        let stream = WritableStream::new_with_proto(global, proto, CanGc::from_cx(cx));
1047
1048        // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
1049        let size_algorithm = extract_size_algorithm(strategy, CanGc::from_cx(cx));
1050
1051        // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
1052        let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1053
1054        // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink,
1055        // underlyingSinkDict, highWaterMark, sizeAlgorithm).
1056        stream.setup_from_underlying_sink(
1057            cx,
1058            global,
1059            &stream,
1060            underlying_sink_obj.handle(),
1061            &underlying_sink_dict,
1062            high_water_mark,
1063            size_algorithm,
1064        )?;
1065
1066        Ok(stream)
1067    }
1068
1069    /// <https://streams.spec.whatwg.org/#ws-locked>
1070    fn Locked(&self) -> bool {
1071        // Return ! IsWritableStreamLocked(this).
1072        self.is_locked()
1073    }
1074
1075    /// <https://streams.spec.whatwg.org/#ws-abort>
1076    fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
1077        let global = GlobalScope::from_current_realm(cx);
1078
1079        // If ! IsWritableStreamLocked(this) is true,
1080        if self.is_locked() {
1081            // return a promise rejected with a TypeError exception.
1082            let promise = Promise::new2(cx, &global);
1083            promise.reject_error(
1084                Error::Type(c"Stream is locked.".to_owned()),
1085                CanGc::from_cx(cx),
1086            );
1087            return promise;
1088        }
1089
1090        // Return ! WritableStreamAbort(this, reason).
1091        self.abort(cx, &global, reason)
1092    }
1093
1094    /// <https://streams.spec.whatwg.org/#ws-close>
1095    fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
1096        let global = GlobalScope::from_current_realm(cx);
1097
1098        // If ! IsWritableStreamLocked(this) is true,
1099        if self.is_locked() {
1100            // return a promise rejected with a TypeError exception.
1101            let promise = Promise::new2(cx, &global);
1102            promise.reject_error(
1103                Error::Type(c"Stream is locked.".to_owned()),
1104                CanGc::from_cx(cx),
1105            );
1106            return promise;
1107        }
1108
1109        // If ! WritableStreamCloseQueuedOrInFlight(this) is true
1110        if self.close_queued_or_in_flight() {
1111            // return a promise rejected with a TypeError exception.
1112            let promise = Promise::new2(cx, &global);
1113            promise.reject_error(
1114                Error::Type(c"Stream has closed queued or in-flight".to_owned()),
1115                CanGc::from_cx(cx),
1116            );
1117            return promise;
1118        }
1119
1120        // Return ! WritableStreamClose(this).
1121        self.close(cx, &global)
1122    }
1123
1124    /// <https://streams.spec.whatwg.org/#ws-get-writer>
1125    fn GetWriter(
1126        &self,
1127        realm: InRealm,
1128        can_gc: CanGc,
1129    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1130        let cx = GlobalScope::get_cx();
1131        let global = GlobalScope::from_safe_context(cx, realm);
1132
1133        // Return ? AcquireWritableStreamDefaultWriter(this).
1134        self.aquire_default_writer(cx, &global, can_gc)
1135    }
1136}
1137
// Marker impl with an empty body. Presumably this is what allows a
// `CrossRealmTransformWritable` to be held by `rooted!`, as done in
// `setup_cross_realm_transform_writable` — confirm against `js::gc`.
impl js::gc::Rootable for CrossRealmTransformWritable {}
1139
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
/// A wrapper to handle `message` and `messageerror` events
/// for the port used by the transferred stream.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformWritable {
    /// The controller used in the algorithm.
    /// Both event handlers call `error_if_needed` on it.
    controller: Dom<WritableStreamDefaultController>,

    /// The `backpressurePromise` used in the algorithm.
    /// The same `Rc` is also given to the controller's `Transfer` sink in
    /// `setup_cross_realm_transform_writable`. `handle_message` takes the
    /// promise out (leaving `None`) and resolves it.
    #[ignore_malloc_size_of = "nested Rc"]
    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
}
1153
1154impl CrossRealmTransformWritable {
1155    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1156    /// Add a handler for port’s message event with the following steps:
1157    #[expect(unsafe_code)]
1158    pub(crate) fn handle_message(
1159        &self,
1160        cx: &mut CurrentRealm,
1161        global: &GlobalScope,
1162        message: SafeHandleValue,
1163    ) {
1164        rooted!(&in(cx) let mut value = UndefinedValue());
1165        let type_string = unsafe {
1166            get_type_and_value_from_message(
1167                cx.into(),
1168                message,
1169                value.handle_mut(),
1170                CanGc::from_cx(cx),
1171            )
1172        };
1173
1174        // If type is "pull",
1175        // Done below as the steps are the same for both types.
1176
1177        // Otherwise, if type is "error",
1178        if type_string == "error" {
1179            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, value).
1180            self.controller.error_if_needed(cx, value.handle(), global);
1181        }
1182
1183        let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1184
1185        // Note: the below steps are for both "pull" and "error" types.
1186        // If backpressurePromise is not undefined,
1187        if let Some(promise) = backpressure_promise {
1188            // Resolve backpressurePromise with undefined.
1189            promise.resolve_native(&(), CanGc::from_cx(cx));
1190
1191            // Set backpressurePromise to undefined.
1192            // Done above with `take`.
1193        }
1194    }
1195
1196    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1197    /// Add a handler for port’s messageerror event with the following steps:
1198    pub(crate) fn handle_error(
1199        &self,
1200        cx: &mut CurrentRealm,
1201        global: &GlobalScope,
1202        port: &MessagePort,
1203    ) {
1204        // Let error be a new "DataCloneError" DOMException.
1205        let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
1206        rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1207        error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
1208
1209        // Perform ! CrossRealmTransformSendError(port, error).
1210        port.cross_realm_transform_send_error(cx, rooted_error.handle());
1211
1212        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, error).
1213        self.controller
1214            .error_if_needed(cx, rooted_error.handle(), global);
1215
1216        // Disentangle port.
1217        global.disentangle_port(cx, port);
1218    }
1219}
1220
1221/// <https://streams.spec.whatwg.org/#ws-transfer>
1222impl Transferable for WritableStream {
1223    type Index = MessagePortIndex;
1224    type Data = MessagePortImpl;
1225
1226    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps①>
1227    fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, MessagePortImpl)> {
1228        // Step 1. If ! IsWritableStreamLocked(value) is true, throw a
1229        // "DataCloneError" DOMException.
1230        if self.is_locked() {
1231            return Err(Error::DataClone(None));
1232        }
1233
1234        let global = self.global();
1235        let mut realm = enter_auto_realm(cx, &*global);
1236        let mut realm = realm.current_realm();
1237        let cx = &mut realm;
1238
1239        // Step 2. Let port1 be a new MessagePort in the current Realm.
1240        let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
1241        global.track_message_port(&port_1, None);
1242
1243        // Step 3. Let port2 be a new MessagePort in the current Realm.
1244        let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
1245        global.track_message_port(&port_2, None);
1246
1247        // Step 4. Entangle port1 and port2.
1248        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1249
1250        // Step 5. Let readable be a new ReadableStream in the current Realm.
1251        let readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1252
1253        // Step 6. Perform ! SetUpCrossRealmTransformReadable(readable, port1).
1254        readable.setup_cross_realm_transform_readable(cx, &port_1);
1255
1256        // Step 7. Let promise be ! ReadableStreamPipeTo(readable, value, false, false, false).
1257        let promise = readable.pipe_to(cx, &global, self, false, false, false, None);
1258
1259        // Step 8. Set promise.[[PromiseIsHandled]] to true.
1260        promise.set_promise_is_handled();
1261
1262        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
1263        port_2.transfer(cx)
1264    }
1265
1266    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps①>
1267    fn transfer_receive(
1268        cx: &mut JSContext,
1269        owner: &GlobalScope,
1270        id: MessagePortId,
1271        port_impl: MessagePortImpl,
1272    ) -> Result<DomRoot<Self>, ()> {
1273        // Their transfer-receiving steps, given dataHolder and value, are:
1274        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
1275        let value = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1276
1277        // Step 1. Let deserializedRecord be !
1278        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
1279        // Realm).
1280        // Done with the `Deserialize` derive of `MessagePortImpl`.
1281
1282        // Step 2. Let port be deserializedRecord.[[Deserialized]].
1283        let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
1284
1285        // Step 3. Perform ! SetUpCrossRealmTransformWritable(value, port).
1286        value.setup_cross_realm_transform_writable(cx, &transferred_port);
1287        Ok(value)
1288    }
1289
1290    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
1291    fn serialized_storage<'a>(
1292        data: StructuredData<'a, '_>,
1293    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1294        match data {
1295            StructuredData::Reader(r) => &mut r.port_impls,
1296            StructuredData::Writer(w) => &mut w.ports,
1297        }
1298    }
1299}