Skip to main content

script/dom/stream/
writablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use dom_struct::dom_struct;
12use js::context::JSContext;
13use js::jsapi::{Heap, JSObject};
14use js::jsval::{JSVal, ObjectValue, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{
17    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18    MutableHandleValue as SafeMutableHandleValue,
19};
20use rustc_hash::FxHashMap;
21use script_bindings::cell::DomRefCell;
22use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
23use script_bindings::conversions::SafeToJSValConvertible;
24use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
25use servo_base::id::{MessagePortId, MessagePortIndex};
26use servo_constellation_traits::MessagePortImpl;
27
28use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
29    QueuingStrategy, QueuingStrategySize,
30};
31use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
32use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
33use crate::dom::bindings::conversions::ConversionResult;
34use crate::dom::bindings::error::{Error, Fallible};
35use crate::dom::bindings::reflector::DomGlobal;
36use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
37use crate::dom::bindings::structuredclone::StructuredData;
38use crate::dom::bindings::transferable::Transferable;
39use crate::dom::domexception::{DOMErrorName, DOMException};
40use crate::dom::globalscope::GlobalScope;
41use crate::dom::messageport::MessagePort;
42use crate::dom::promise::Promise;
43use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
44use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
45use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
46use crate::dom::stream::writablestreamdefaultcontroller::{
47    UnderlyingSinkType, WritableStreamDefaultController,
48};
49use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
50use crate::realms::{InRealm, enter_auto_realm};
51use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
52
// NOTE(review): presumably this marker impl is what lets `finish_erroring`
// hold the handler in a `rooted!` slot before boxing it — confirm.
impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}

/// The fulfillment handler for the abort steps of
/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct AbortAlgorithmFulfillmentHandler {
    /// The stream whose close request and writer closed promise are rejected
    /// once the callback has run.
    stream: Dom<WritableStream>,
    /// The pending abort request's promise; the callback resolves it with `undefined`.
    #[conditional_malloc_size_of]
    abort_request_promise: Rc<Promise>,
}
64
65impl Callback for AbortAlgorithmFulfillmentHandler {
66    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
67        // Resolve abortRequest’s promise with undefined.
68        self.abort_request_promise.resolve_native_with_cx(cx, &());
69
70        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
71        self.stream
72            .as_rooted()
73            .reject_close_and_closed_promise_if_needed(cx);
74    }
75}
76
// NOTE(review): presumably this marker impl is what lets `finish_erroring`
// hold the handler in a `rooted!` slot before boxing it — confirm.
impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}

/// The rejection handler for the abort steps of
/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct AbortAlgorithmRejectionHandler {
    /// The stream whose close request and writer closed promise are rejected
    /// once the callback has run.
    stream: Dom<WritableStream>,
    /// The pending abort request's promise; the callback rejects it with the
    /// rejection reason it receives.
    #[conditional_malloc_size_of]
    abort_request_promise: Rc<Promise>,
}
88
89impl Callback for AbortAlgorithmRejectionHandler {
90    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
91        // Reject abortRequest’s promise with reason.
92        self.abort_request_promise
93            .reject_native_with_cx(cx, &reason);
94
95        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
96        self.stream
97            .as_rooted()
98            .reject_close_and_closed_promise_if_needed(cx);
99    }
100}
101
// NOTE(review): presumably this marker impl is what allows the
// `rooted!(... let pending_abort_request = ...)` slots used by the stream — confirm.
impl js::gc::Rootable for PendingAbortRequest {}

/// <https://streams.spec.whatwg.org/#pending-abort-request>
///
/// Created by `WritableStream::abort` and consumed when the stream finishes
/// erroring or finishes an in-flight close.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PendingAbortRequest {
    /// <https://streams.spec.whatwg.org/#pending-abort-request-promise>
    ///
    /// The promise handed back to the caller of `abort`.
    #[conditional_malloc_size_of]
    promise: Rc<Promise>,

    /// <https://streams.spec.whatwg.org/#pending-abort-request-reason>
    ///
    /// Passed to the controller's abort steps by `finish_erroring`.
    #[ignore_malloc_size_of = "mozjs"]
    reason: Box<Heap<JSVal>>,

    /// <https://streams.spec.whatwg.org/#pending-abort-request-was-already-erroring>
    ///
    /// `true` when `abort` was called while the stream was already "erroring";
    /// in that case `finish_erroring` rejects `promise` with the stored error
    /// instead of running the abort steps.
    was_already_erroring: bool,
}
119
/// <https://streams.spec.whatwg.org/#writablestream-state>
///
/// The lifecycle state stored in `WritableStream::state`.
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
pub(crate) enum WritableStreamState {
    /// Initial state of a newly created stream.
    #[default]
    Writable,
    /// Set by `finish_in_flight_close` once the in-flight close request resolved.
    Closed,
    /// Set by `start_erroring`; the stream has a stored error but has not
    /// finished erroring yet.
    Erroring,
    /// Set by `finish_erroring`.
    Errored,
}
129
/// <https://streams.spec.whatwg.org/#ws-class>
///
/// Each field below mirrors the internal slot of the same name in the spec.
#[dom_struct]
pub struct WritableStream {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#writablestream-backpressure>
    ///
    /// Consulted by `close` to decide whether the writer's ready promise
    /// must be resolved.
    backpressure: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#writablestream-closerequest>
    ///
    /// Set by `close`; moved into `in_flight_close_request` by
    /// `mark_close_request_in_flight`.
    #[conditional_malloc_size_of]
    close_request: DomRefCell<Option<Rc<Promise>>>,

    /// <https://streams.spec.whatwg.org/#writablestream-controller>
    controller: MutNullableDom<WritableStreamDefaultController>,

    /// <https://streams.spec.whatwg.org/#writablestream-detached>
    detached: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#writablestream-inflightwriterequest>
    ///
    /// The write request taken from the front of `write_requests` by
    /// `mark_first_write_request_in_flight`.
    #[conditional_malloc_size_of]
    in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,

    /// <https://streams.spec.whatwg.org/#writablestream-inflightcloserequest>
    #[conditional_malloc_size_of]
    in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,

    /// <https://streams.spec.whatwg.org/#writablestream-pendingabortrequest>
    ///
    /// Set by `abort`.
    pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,

    /// <https://streams.spec.whatwg.org/#writablestream-state>
    state: Cell<WritableStreamState>,

    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
    ///
    /// Set by `start_erroring`; reset to undefined by `finish_in_flight_close`
    /// when the stream was erroring.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// <https://streams.spec.whatwg.org/#writablestream-writer>
    ///
    /// The stream counts as locked while this is set (see `is_locked`).
    writer: MutNullableDom<WritableStreamDefaultWriter>,

    /// <https://streams.spec.whatwg.org/#writablestream-writerequests>
    ///
    /// Promises appended by `add_write_request`, in order.
    #[conditional_malloc_size_of]
    write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
}
173
174impl WritableStream {
175    /// <https://streams.spec.whatwg.org/#initialize-writable-stream>
176    fn new_inherited() -> WritableStream {
177        WritableStream {
178            reflector_: Reflector::new(),
179            backpressure: Default::default(),
180            close_request: Default::default(),
181            controller: Default::default(),
182            detached: Default::default(),
183            in_flight_write_request: Default::default(),
184            in_flight_close_request: Default::default(),
185            pending_abort_request: Default::default(),
186            state: Default::default(),
187            stored_error: Default::default(),
188            writer: Default::default(),
189            write_requests: Default::default(),
190        }
191    }
192
193    pub(crate) fn new_with_proto(
194        global: &GlobalScope,
195        proto: Option<SafeHandleObject>,
196        can_gc: CanGc,
197    ) -> DomRoot<WritableStream> {
198        reflect_dom_object_with_proto(
199            Box::new(WritableStream::new_inherited()),
200            global,
201            proto,
202            can_gc,
203        )
204    }
205
206    /// Used as part of
207    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
208    pub(crate) fn assert_no_controller(&self) {
209        assert!(self.controller.get().is_none());
210    }
211
212    /// Used as part of
213    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
214    pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
215        self.controller.set(Some(controller));
216    }
217
218    pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
219        self.controller.get().expect("Controller should be set.")
220    }
221
222    pub(crate) fn is_writable(&self) -> bool {
223        matches!(self.state.get(), WritableStreamState::Writable)
224    }
225
226    pub(crate) fn is_erroring(&self) -> bool {
227        matches!(self.state.get(), WritableStreamState::Erroring)
228    }
229
230    pub(crate) fn is_errored(&self) -> bool {
231        matches!(self.state.get(), WritableStreamState::Errored)
232    }
233
234    pub(crate) fn is_closed(&self) -> bool {
235        matches!(self.state.get(), WritableStreamState::Closed)
236    }
237
238    pub(crate) fn has_in_flight_write_request(&self) -> bool {
239        self.in_flight_write_request.borrow().is_some()
240    }
241
242    /// <https://streams.spec.whatwg.org/#writable-stream-has-operation-marked-in-flight>
243    pub(crate) fn has_operations_marked_inflight(&self) -> bool {
244        let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
245        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
246
247        in_flight_write_requested || in_flight_close_requested
248    }
249
250    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
251    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
252        handle_mut.set(self.stored_error.get());
253    }
254
255    /// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
256    pub(crate) fn finish_erroring(&self, cx: &mut JSContext, global: &GlobalScope) {
257        // Assert: stream.[[state]] is "erroring".
258        assert!(self.is_erroring());
259
260        // Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false.
261        assert!(!self.has_operations_marked_inflight());
262
263        // Set stream.[[state]] to "errored".
264        self.state.set(WritableStreamState::Errored);
265
266        // Perform ! stream.[[controller]].[[ErrorSteps]]().
267        let Some(controller) = self.controller.get() else {
268            unreachable!("Stream should have a controller.");
269        };
270        controller.perform_error_steps();
271
272        // Let storedError be stream.[[storedError]].
273        rooted!(&in(cx) let mut stored_error = UndefinedValue());
274        self.get_stored_error(stored_error.handle_mut());
275
276        // For each writeRequest of stream.[[writeRequests]]:
277        let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
278        for request in write_requests {
279            // Reject writeRequest with storedError.
280            request.reject_with_cx(cx, stored_error.handle());
281        }
282
283        // Set stream.[[writeRequests]] to an empty list.
284        // Done above with `drain`.
285
286        // If stream.[[pendingAbortRequest]] is undefined,
287        if self.pending_abort_request.borrow().is_none() {
288            // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
289            self.reject_close_and_closed_promise_if_needed(cx);
290
291            // Return.
292            return;
293        }
294
295        // Let abortRequest be stream.[[pendingAbortRequest]].
296        // Set stream.[[pendingAbortRequest]] to undefined.
297        rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
298        if let Some(pending_abort_request) = &*pending_abort_request {
299            // If abortRequest’s was already erroring is true,
300            if pending_abort_request.was_already_erroring {
301                // Reject abortRequest’s promise with storedError.
302                pending_abort_request
303                    .promise
304                    .reject_with_cx(cx, stored_error.handle());
305
306                // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
307                self.reject_close_and_closed_promise_if_needed(cx);
308
309                // Return.
310                return;
311            }
312
313            // Let promise be ! stream.[[controller]].[[AbortSteps]](abortRequest’s reason).
314            rooted!(&in(cx) let mut reason = UndefinedValue());
315            reason.set(pending_abort_request.reason.get());
316            let promise = controller.abort_steps(cx, global, reason.handle());
317
318            // Upon fulfillment of promise,
319            rooted!(&in(cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
320                stream: Dom::from_ref(self),
321                abort_request_promise: pending_abort_request.promise.clone(),
322            }));
323
324            // Upon rejection of promise with reason r,
325            rooted!(&in(cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
326                stream: Dom::from_ref(self),
327                abort_request_promise: pending_abort_request.promise.clone(),
328            }));
329
330            let handler = PromiseNativeHandler::new(
331                cx,
332                global,
333                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
334                rejection_handler.take().map(|h| Box::new(h) as Box<_>),
335            );
336
337            let mut realm = enter_auto_realm(cx, global);
338            let cx = &mut realm.current_realm();
339            promise.append_native_handler(cx, &handler);
340        }
341    }
342
343    /// <https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed>
344    fn reject_close_and_closed_promise_if_needed(&self, cx: &mut JSContext) {
345        // Assert: stream.[[state]] is "errored".
346        assert!(self.is_errored());
347
348        rooted!(&in(cx) let mut stored_error = UndefinedValue());
349        self.get_stored_error(stored_error.handle_mut());
350
351        // If stream.[[closeRequest]] is not undefined
352        let close_request = self.close_request.borrow_mut().take();
353        if let Some(close_request) = close_request {
354            // Assert: stream.[[inFlightCloseRequest]] is undefined.
355            assert!(self.in_flight_close_request.borrow().is_none());
356
357            // Reject stream.[[closeRequest]] with stream.[[storedError]].
358            close_request.reject_native(&stored_error.handle(), CanGc::from_cx(cx))
359
360            // Set stream.[[closeRequest]] to undefined.
361            // Done with `take` above.
362        }
363
364        // Let writer be stream.[[writer]].
365        // If writer is not undefined,
366        if let Some(writer) = self.writer.get() {
367            // Reject writer.[[closedPromise]] with stream.[[storedError]].
368            writer.reject_closed_promise_with_stored_error(cx, &stored_error.handle());
369
370            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
371            writer.set_close_promise_is_handled();
372        }
373    }
374
375    /// <https://streams.spec.whatwg.org/#writable-stream-close-queued-or-in-flight>
376    pub(crate) fn close_queued_or_in_flight(&self) -> bool {
377        let close_requested = self.close_request.borrow().is_some();
378        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
379
380        close_requested || in_flight_close_requested
381    }
382
383    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write>
384    pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
385        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
386            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
387            unreachable!("Stream should have a write request");
388        };
389
390        // Resolve stream.[[inFlightWriteRequest]] with undefined.
391        in_flight_write_request.resolve_native(&(), can_gc);
392
393        // Set stream.[[inFlightWriteRequest]] to undefined.
394        // Done above with `take`.
395    }
396
397    /// <https://streams.spec.whatwg.org/#writable-stream-start-erroring>
398    pub(crate) fn start_erroring(
399        &self,
400        cx: &mut JSContext,
401        global: &GlobalScope,
402        error: SafeHandleValue,
403    ) {
404        // Assert: stream.[[storedError]] is undefined.
405        assert!(self.stored_error.get().is_undefined());
406
407        // Assert: stream.[[state]] is "writable".
408        assert!(self.is_writable());
409
410        // Let controller be stream.[[controller]].
411        let Some(controller) = self.controller.get() else {
412            // Assert: controller is not undefined.
413            unreachable!("Stream should have a controller.");
414        };
415
416        // Set stream.[[state]] to "erroring".
417        self.state.set(WritableStreamState::Erroring);
418
419        // Set stream.[[storedError]] to reason.
420        self.stored_error.set(*error);
421
422        // Let writer be stream.[[writer]].
423        if let Some(writer) = self.writer.get() {
424            // If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected
425            writer.ensure_ready_promise_rejected(global, error, CanGc::from_cx(cx));
426        }
427
428        // If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true
429        if !self.has_operations_marked_inflight() && controller.started() {
430            // perform ! WritableStreamFinishErroring
431            self.finish_erroring(cx, global);
432        }
433    }
434
435    /// <https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection>
436    pub(crate) fn deal_with_rejection(
437        &self,
438        cx: &mut JSContext,
439        global: &GlobalScope,
440        error: SafeHandleValue,
441    ) {
442        // Let state be stream.[[state]].
443
444        // If state is "writable",
445        if self.is_writable() {
446            // Perform ! WritableStreamStartErroring(stream, error).
447            self.start_erroring(cx, global, error);
448
449            // Return.
450            return;
451        }
452
453        // Assert: state is "erroring".
454        assert!(self.is_erroring());
455
456        // Perform ! WritableStreamFinishErroring(stream).
457        self.finish_erroring(cx, global);
458    }
459
460    /// <https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight>
461    pub(crate) fn mark_first_write_request_in_flight(&self) {
462        let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
463        let mut write_requests = self.write_requests.borrow_mut();
464
465        // Assert: stream.[[inFlightWriteRequest]] is undefined.
466        assert!(in_flight_write_request.is_none());
467
468        // Assert: stream.[[writeRequests]] is not empty.
469        assert!(!write_requests.is_empty());
470
471        // Let writeRequest be stream.[[writeRequests]][0].
472        // Remove writeRequest from stream.[[writeRequests]].
473        let write_request = write_requests.pop_front().unwrap();
474
475        // Set stream.[[inFlightWriteRequest]] to writeRequest.
476        *in_flight_write_request = Some(write_request);
477    }
478
479    /// <https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight>
480    pub(crate) fn mark_close_request_in_flight(&self) {
481        let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
482        let mut close_request = self.close_request.borrow_mut();
483
484        // Assert: stream.[[inFlightCloseRequest]] is undefined.
485        assert!(in_flight_close_request.is_none());
486
487        // Assert: stream.[[closeRequest]] is not undefined.
488        assert!(close_request.is_some());
489
490        // Let closeRequest be stream.[[closeRequest]].
491        // Set stream.[[closeRequest]] to undefined.
492        let close_request = close_request.take().unwrap();
493
494        // Set stream.[[inFlightCloseRequest]] to closeRequest.
495        *in_flight_close_request = Some(close_request);
496    }
497
498    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close>
499    pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
500        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
501            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
502            unreachable!("in_flight_close_request must be Some");
503        };
504
505        // Resolve stream.[[inFlightCloseRequest]] with undefined.
506        in_flight_close_request.resolve_native(&(), can_gc);
507
508        // Set stream.[[inFlightCloseRequest]] to undefined.
509        // Done with take above.
510
511        // Assert: stream.[[state]] is "writable" or "erroring".
512        assert!(self.is_writable() || self.is_erroring());
513
514        // If state is "erroring",
515        if self.is_erroring() {
516            // Set stream.[[storedError]] to undefined.
517            self.stored_error.set(UndefinedValue());
518
519            // If stream.[[pendingAbortRequest]] is not undefined,
520            rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
521            if let Some(pending_abort_request) = &*pending_abort_request {
522                // Resolve stream.[[pendingAbortRequest]]'s promise with undefined.
523                pending_abort_request.promise.resolve_native(&(), can_gc);
524
525                // Set stream.[[pendingAbortRequest]] to undefined.
526                // Done above with `take`.
527            }
528        }
529
530        // Set stream.[[state]] to "closed".
531        self.state.set(WritableStreamState::Closed);
532
533        // Let writer be stream.[[writer]].
534        if let Some(writer) = self.writer.get() {
535            // If writer is not undefined,
536            // resolve writer.[[closedPromise]] with undefined.
537            writer.resolve_closed_promise_with_undefined(can_gc);
538        }
539
540        // Assert: stream.[[pendingAbortRequest]] is undefined.
541        assert!(self.pending_abort_request.borrow().is_none());
542
543        // Assert: stream.[[storedError]] is undefined.
544        assert!(self.stored_error.get().is_undefined());
545    }
546
547    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error>
548    pub(crate) fn finish_in_flight_close_with_error(
549        &self,
550        cx: &mut JSContext,
551        global: &GlobalScope,
552        error: SafeHandleValue,
553    ) {
554        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
555            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
556            unreachable!("Inflight close request must be defined.");
557        };
558
559        // Reject stream.[[inFlightCloseRequest]] with error.
560        in_flight_close_request.reject_native_with_cx(cx, &error);
561
562        // Set stream.[[inFlightCloseRequest]] to undefined.
563        // Done above with `take`.
564
565        // Assert: stream.[[state]] is "writable" or "erroring".
566        assert!(self.is_erroring() || self.is_writable());
567
568        // If stream.[[pendingAbortRequest]] is not undefined,
569        rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
570        if let Some(pending_abort_request) = &*pending_abort_request {
571            // Reject stream.[[pendingAbortRequest]]'s promise with error.
572            pending_abort_request
573                .promise
574                .reject_native_with_cx(cx, &error);
575
576            // Set stream.[[pendingAbortRequest]] to undefined.
577            // Done above with `take`.
578        }
579
580        // Perform ! WritableStreamDealWithRejection(stream, error).
581        self.deal_with_rejection(cx, global, error);
582    }
583
584    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error>
585    pub(crate) fn finish_in_flight_write_with_error(
586        &self,
587        cx: &mut JSContext,
588        global: &GlobalScope,
589        error: SafeHandleValue,
590    ) {
591        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
592            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
593            unreachable!("Inflight write request must be defined.");
594        };
595
596        // Reject stream.[[inFlightWriteRequest]] with error.
597        in_flight_write_request.reject_native_with_cx(cx, &error);
598
599        // Set stream.[[inFlightWriteRequest]] to undefined.
600        // Done above with `take`.
601
602        // Assert: stream.[[state]] is "writable" or "erroring".
603        assert!(self.is_erroring() || self.is_writable());
604
605        // Perform ! WritableStreamDealWithRejection(stream, error).
606        self.deal_with_rejection(cx, global, error);
607    }
608
609    pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
610        self.writer.get()
611    }
612
613    pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
614        self.writer.set(writer);
615    }
616
617    pub(crate) fn set_backpressure(&self, backpressure: bool) {
618        self.backpressure.set(backpressure);
619    }
620
621    pub(crate) fn get_backpressure(&self) -> bool {
622        self.backpressure.get()
623    }
624
625    /// <https://streams.spec.whatwg.org/#is-writable-stream-locked>
626    pub(crate) fn is_locked(&self) -> bool {
627        // If stream.[[writer]] is undefined, return false.
628        // Return true.
629        self.get_writer().is_some()
630    }
631
632    /// <https://streams.spec.whatwg.org/#writable-stream-add-write-request>
633    pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
634        // Assert: ! IsWritableStreamLocked(stream) is true.
635        assert!(self.is_locked());
636
637        // Assert: stream.[[state]] is "writable".
638        assert!(self.is_writable());
639
640        // Let promise be a new promise.
641        let promise = Promise::new(global, can_gc);
642
643        // Append promise to stream.[[writeRequests]].
644        self.write_requests.borrow_mut().push_back(promise.clone());
645
646        // Return promise.
647        promise
648    }
649
650    // Returns the rooted controller of the stream, if any.
651    pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
652        self.controller.get()
653    }
654
655    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
656    pub(crate) fn abort(
657        &self,
658        cx: &mut CurrentRealm,
659        global: &GlobalScope,
660        provided_reason: SafeHandleValue,
661    ) -> Rc<Promise> {
662        // If stream.[[state]] is "closed" or "errored",
663        if self.is_closed() || self.is_errored() {
664            // return a promise resolved with undefined.
665            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
666        }
667
668        // Signal abort on stream.[[controller]].[[abortController]] with reason.
669        self.get_controller()
670            .expect("Stream must have a controller.")
671            .signal_abort(cx, provided_reason);
672
673        // Let state be stream.[[state]].
674        let state = self.state.get();
675
676        // If state is "closed" or "errored", return a promise resolved with undefined.
677        if matches!(
678            state,
679            WritableStreamState::Closed | WritableStreamState::Errored
680        ) {
681            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
682        }
683
684        // If stream.[[pendingAbortRequest]] is not undefined,
685        if self.pending_abort_request.borrow().is_some() {
686            // return stream.[[pendingAbortRequest]]'s promise.
687            return self
688                .pending_abort_request
689                .borrow()
690                .as_ref()
691                .expect("Pending abort request must be Some.")
692                .promise
693                .clone();
694        }
695
696        // Assert: state is "writable" or "erroring".
697        assert!(self.is_writable() || self.is_erroring());
698
699        // Let wasAlreadyErroring be false.
700        let mut was_already_erroring = false;
701        rooted!(&in(cx) let undefined_reason = UndefinedValue());
702
703        // If state is "erroring",
704        let reason = if self.is_erroring() {
705            // Set wasAlreadyErroring to true.
706            was_already_erroring = true;
707
708            // Set reason to undefined.
709            undefined_reason.handle()
710        } else {
711            // Use the provided reason.
712            provided_reason
713        };
714
715        // Let promise be a new promise.
716        let promise = Promise::new2(cx, global);
717
718        // Set stream.[[pendingAbortRequest]] to a new pending abort request
719        // whose promise is promise,
720        // reason is reason,
721        // and was already erroring is wasAlreadyErroring.
722        *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
723            promise: promise.clone(),
724            reason: Heap::boxed(reason.get()),
725            was_already_erroring,
726        });
727
728        // If wasAlreadyErroring is false,
729        if !was_already_erroring {
730            // perform ! WritableStreamStartErroring(stream, reason)
731            self.start_erroring(cx, global, reason);
732        }
733
734        // Return promise.
735        promise
736    }
737
738    /// <https://streams.spec.whatwg.org/#writable-stream-close>
739    pub(crate) fn close(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
740        // Let state be stream.[[state]].
741        // If state is "closed" or "errored",
742        if self.is_closed() || self.is_errored() {
743            // return a promise rejected with a TypeError exception.
744            let promise = Promise::new2(cx, global);
745            promise
746                .reject_error_with_cx(cx, Error::Type(c"Stream is closed or errored.".to_owned()));
747            return promise;
748        }
749
750        // Assert: state is "writable" or "erroring".
751        assert!(self.is_writable() || self.is_erroring());
752
753        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
754        assert!(!self.close_queued_or_in_flight());
755
756        // Let promise be a new promise.
757        let promise = Promise::new2(cx, global);
758
759        // Set stream.[[closeRequest]] to promise.
760        *self.close_request.borrow_mut() = Some(promise.clone());
761
762        // Let writer be stream.[[writer]].
763        // If writer is not undefined,
764        if let Some(writer) = self.writer.get() {
765            // and stream.[[backpressure]] is true,
766            // and state is "writable",
767            if self.get_backpressure() && self.is_writable() {
768                // resolve writer.[[readyPromise]] with undefined.
769                writer.resolve_ready_promise_with_undefined(CanGc::from_cx(cx));
770            }
771        }
772
773        // Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]).
774        let Some(controller) = self.controller.get() else {
775            unreachable!("Stream must have a controller.");
776        };
777        controller.close(cx, global);
778
779        // Return promise.
780        promise
781    }
782
783    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size>
784    /// Note: implement as a stream method, as opposed to a writer one, for convenience.
785    pub(crate) fn get_desired_size(&self) -> Option<f64> {
786        // Let stream be writer.[[stream]].
787        // Stream is `self`.
788
789        // Let state be stream.[[state]].
790        // If state is "errored" or "erroring", return null.
791        if self.is_errored() || self.is_erroring() {
792            return None;
793        }
794
795        // If state is "closed", return 0.
796        if self.is_closed() {
797            return Some(0.);
798        }
799
800        let Some(controller) = self.controller.get() else {
801            unreachable!("Stream must have a controller.");
802        };
803        Some(controller.get_desired_size())
804    }
805
806    /// <https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer>
807    pub(crate) fn aquire_default_writer(
808        &self,
809        cx: SafeJSContext,
810        global: &GlobalScope,
811        can_gc: CanGc,
812    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
813        // Let writer be a new WritableStreamDefaultWriter object.
814        let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
815
816        // Perform ? SetUpWritableStreamDefaultWriter(writer, stream).
817        writer.setup(cx, self, can_gc)?;
818
819        // Return writer.
820        Ok(writer)
821    }
822
823    /// <https://streams.spec.whatwg.org/#writable-stream-update-backpressure>
824    pub(crate) fn update_backpressure(
825        &self,
826        backpressure: bool,
827        global: &GlobalScope,
828        can_gc: CanGc,
829    ) {
830        // Assert: stream.[[state]] is "writable".
831        self.is_writable();
832
833        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
834        assert!(!self.close_queued_or_in_flight());
835
836        // Let writer be stream.[[writer]].
837        let writer = self.get_writer();
838
839        if let Some(writer) = writer {
840            // If writer is not undefined
841            if backpressure != self.get_backpressure() {
842                // and backpressure is not stream.[[backpressure]],
843                if backpressure {
844                    // If backpressure is true, set writer.[[readyPromise]] to a new promise.
845                    let promise = Promise::new(global, can_gc);
846                    writer.set_ready_promise(promise);
847                } else {
848                    // Otherwise,
849                    // Assert: backpressure is false.
850                    assert!(!backpressure);
851                    // Resolve writer.[[readyPromise]] with undefined.
852                    writer.resolve_ready_promise_with_undefined(can_gc);
853                }
854            }
855        }
856
857        // Set stream.[[backpressure]] to backpressure.
858        self.set_backpressure(backpressure);
859    }
860
861    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
862    pub(crate) fn setup_cross_realm_transform_writable(
863        &self,
864        cx: &mut JSContext,
865        port: &MessagePort,
866    ) {
867        let port_id = port.message_port_id();
868        let global = self.global();
869
870        // Perform ! InitializeWritableStream(stream).
871        // Done in `new_inherited`.
872
873        // Let sizeAlgorithm be an algorithm that returns 1.
874        // Re-ordered because of the need to pass it to `new`.
875        let size_algorithm =
876            extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
877
878        // Note: other algorithms defined in the controller at call site.
879
880        // Let backpressurePromise be a new promise.
881        let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new2(cx, &global))));
882
883        // Let controller be a new WritableStreamDefaultController.
884        let controller = WritableStreamDefaultController::new(
885            &global,
886            UnderlyingSinkType::Transfer {
887                backpressure_promise: backpressure_promise.clone(),
888                port: Dom::from_ref(port),
889            },
890            1.0,
891            size_algorithm,
892            CanGc::from_cx(cx),
893        );
894
895        // Add a handler for port’s message event with the following steps:
896        // Add a handler for port’s messageerror event with the following steps:
897        rooted!(&in(cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
898            controller: Dom::from_ref(&controller),
899            backpressure_promise,
900        });
901        global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
902
903        // Enable port’s port message queue.
904        port.Start(cx);
905
906        // Perform ! SetUpWritableStreamDefaultController
907        controller
908            .setup(cx, &global, self)
909            .expect("Setup for transfer cannot fail");
910    }
911    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
912    #[allow(clippy::too_many_arguments)]
913    fn setup_from_underlying_sink(
914        &self,
915        cx: &mut JSContext,
916        global: &GlobalScope,
917        stream: &WritableStream,
918        underlying_sink_obj: SafeHandleObject,
919        underlying_sink: &UnderlyingSink,
920        strategy_hwm: f64,
921        strategy_size: Rc<QueuingStrategySize>,
922    ) -> Result<(), Error> {
923        // Let controller be a new WritableStreamDefaultController.
924
925        // Let startAlgorithm be an algorithm that returns undefined.
926
927        // Let writeAlgorithm be an algorithm that returns a promise resolved with undefined.
928
929        // Let closeAlgorithm be an algorithm that returns a promise resolved with undefined.
930
931        // Let abortAlgorithm be an algorithm that returns a promise resolved with undefined.
932
933        // If underlyingSinkDict["start"] exists, then set startAlgorithm to an algorithm which
934        // returns the result of invoking underlyingSinkDict["start"] with argument
935        // list « controller », exception behavior "rethrow", and callback this value underlyingSink.
936
937        // If underlyingSinkDict["write"] exists, then set writeAlgorithm to an algorithm which
938        // takes an argument chunk and returns the result of invoking underlyingSinkDict["write"]
939        // with argument list « chunk, controller » and callback this value underlyingSink.
940
941        // If underlyingSinkDict["close"] exists, then set closeAlgorithm to an algorithm which
942        // returns the result of invoking underlyingSinkDict["close"] with argument
943        // list «» and callback this value underlyingSink.
944
945        // If underlyingSinkDict["abort"] exists, then set abortAlgorithm to an algorithm which
946        // takes an argument reason and returns the result of invoking underlyingSinkDict["abort"]
947        // with argument list « reason » and callback this value underlyingSink.
948        let controller = WritableStreamDefaultController::new(
949            global,
950            UnderlyingSinkType::new_js(
951                underlying_sink.abort.clone(),
952                underlying_sink.start.clone(),
953                underlying_sink.close.clone(),
954                underlying_sink.write.clone(),
955            ),
956            strategy_hwm,
957            strategy_size,
958            CanGc::from_cx(cx),
959        );
960
961        // Note: this must be done before `setup`,
962        // otherwise `thisOb` is null in the start callback.
963        controller.set_underlying_sink_this_object(underlying_sink_obj);
964
965        // Perform ? SetUpWritableStreamDefaultController
966        controller.setup(cx, global, stream)
967    }
968}
969
970/// <https://streams.spec.whatwg.org/#create-writable-stream>
971#[cfg_attr(crown, expect(crown::unrooted_must_root))]
972pub(crate) fn create_writable_stream(
973    cx: &mut JSContext,
974    global: &GlobalScope,
975    writable_high_water_mark: f64,
976    writable_size_algorithm: Rc<QueuingStrategySize>,
977    underlying_sink_type: UnderlyingSinkType,
978) -> Fallible<DomRoot<WritableStream>> {
979    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
980    assert!(writable_high_water_mark >= 0.0);
981
982    // Let stream be a new WritableStream.
983    // Perform ! InitializeWritableStream(stream).
984    let stream = WritableStream::new_with_proto(global, None, CanGc::from_cx(cx));
985
986    // Let controller be a new WritableStreamDefaultController.
987    let controller = WritableStreamDefaultController::new(
988        global,
989        underlying_sink_type,
990        writable_high_water_mark,
991        writable_size_algorithm,
992        CanGc::from_cx(cx),
993    );
994
995    // Perform ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm,
996    // closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm).
997    controller.setup(cx, global, &stream)?;
998
999    // Return stream.
1000    Ok(stream)
1001}
1002
1003impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1004    /// <https://streams.spec.whatwg.org/#ws-constructor>
1005    fn Constructor(
1006        cx: &mut JSContext,
1007        global: &GlobalScope,
1008        proto: Option<SafeHandleObject>,
1009        underlying_sink: Option<*mut JSObject>,
1010        strategy: &QueuingStrategy,
1011    ) -> Fallible<DomRoot<WritableStream>> {
1012        // If underlyingSink is missing, set it to null.
1013        rooted!(&in(cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1014
1015        // Let underlyingSinkDict be underlyingSink,
1016        // converted to an IDL value of type UnderlyingSink.
1017        let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1018            rooted!(&in(cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1019            match UnderlyingSink::new(cx, obj_val.handle()) {
1020                Ok(ConversionResult::Success(val)) => val,
1021                Ok(ConversionResult::Failure(error)) => {
1022                    return Err(Error::Type(error.into_owned()));
1023                },
1024                _ => {
1025                    return Err(Error::JSFailed);
1026                },
1027            }
1028        } else {
1029            UnderlyingSink::empty()
1030        };
1031
1032        if !underlying_sink_dict.type_.handle().is_undefined() {
1033            // If underlyingSinkDict["type"] exists, throw a RangeError exception.
1034            return Err(Error::Range(c"type is set".to_owned()));
1035        }
1036
1037        // Perform ! InitializeWritableStream(this).
1038        let stream = WritableStream::new_with_proto(global, proto, CanGc::from_cx(cx));
1039
1040        // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
1041        let size_algorithm = extract_size_algorithm(strategy, CanGc::from_cx(cx));
1042
1043        // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
1044        let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1045
1046        // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink,
1047        // underlyingSinkDict, highWaterMark, sizeAlgorithm).
1048        stream.setup_from_underlying_sink(
1049            cx,
1050            global,
1051            &stream,
1052            underlying_sink_obj.handle(),
1053            &underlying_sink_dict,
1054            high_water_mark,
1055            size_algorithm,
1056        )?;
1057
1058        Ok(stream)
1059    }
1060
1061    /// <https://streams.spec.whatwg.org/#ws-locked>
1062    fn Locked(&self) -> bool {
1063        // Return ! IsWritableStreamLocked(this).
1064        self.is_locked()
1065    }
1066
1067    /// <https://streams.spec.whatwg.org/#ws-abort>
1068    fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
1069        let global = GlobalScope::from_current_realm(cx);
1070
1071        // If ! IsWritableStreamLocked(this) is true,
1072        if self.is_locked() {
1073            // return a promise rejected with a TypeError exception.
1074            let promise = Promise::new2(cx, &global);
1075            promise.reject_error_with_cx(cx, Error::Type(c"Stream is locked.".to_owned()));
1076            return promise;
1077        }
1078
1079        // Return ! WritableStreamAbort(this, reason).
1080        self.abort(cx, &global, reason)
1081    }
1082
1083    /// <https://streams.spec.whatwg.org/#ws-close>
1084    fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
1085        let global = GlobalScope::from_current_realm(cx);
1086
1087        // If ! IsWritableStreamLocked(this) is true,
1088        if self.is_locked() {
1089            // return a promise rejected with a TypeError exception.
1090            let promise = Promise::new2(cx, &global);
1091            promise.reject_error_with_cx(cx, Error::Type(c"Stream is locked.".to_owned()));
1092            return promise;
1093        }
1094
1095        // If ! WritableStreamCloseQueuedOrInFlight(this) is true
1096        if self.close_queued_or_in_flight() {
1097            // return a promise rejected with a TypeError exception.
1098            let promise = Promise::new2(cx, &global);
1099            promise.reject_error_with_cx(
1100                cx,
1101                Error::Type(c"Stream has closed queued or in-flight".to_owned()),
1102            );
1103            return promise;
1104        }
1105
1106        // Return ! WritableStreamClose(this).
1107        self.close(cx, &global)
1108    }
1109
1110    /// <https://streams.spec.whatwg.org/#ws-get-writer>
1111    fn GetWriter(
1112        &self,
1113        realm: InRealm,
1114        can_gc: CanGc,
1115    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1116        let cx = GlobalScope::get_cx();
1117        let global = GlobalScope::from_safe_context(cx, realm);
1118
1119        // Return ? AcquireWritableStreamDefaultWriter(this).
1120        self.aquire_default_writer(cx, &global, can_gc)
1121    }
1122}
1123
1124impl js::gc::Rootable for CrossRealmTransformWritable {}
1125
1126/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1127/// A wrapper to handle `message` and `messageerror` events
1128/// for the port used by the transfered stream.
1129#[derive(Clone, JSTraceable, MallocSizeOf)]
1130#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1131pub(crate) struct CrossRealmTransformWritable {
1132    /// The controller used in the algorithm.
1133    controller: Dom<WritableStreamDefaultController>,
1134
1135    /// The `backpressurePromise` used in the algorithm.
1136    #[ignore_malloc_size_of = "nested Rc"]
1137    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1138}
1139
1140impl CrossRealmTransformWritable {
1141    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1142    /// Add a handler for port’s message event with the following steps:
1143    pub(crate) fn handle_message(
1144        &self,
1145        cx: &mut CurrentRealm,
1146        global: &GlobalScope,
1147        message: SafeHandleValue,
1148    ) {
1149        rooted!(&in(cx) let mut value = UndefinedValue());
1150        let type_string = get_type_and_value_from_message(cx, message, value.handle_mut());
1151
1152        // If type is "pull",
1153        // Done below as the steps are the same for both types.
1154
1155        // Otherwise, if type is "error",
1156        if type_string == "error" {
1157            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, value).
1158            self.controller.error_if_needed(cx, value.handle(), global);
1159        }
1160
1161        let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1162
1163        // Note: the below steps are for both "pull" and "error" types.
1164        // If backpressurePromise is not undefined,
1165        if let Some(promise) = backpressure_promise {
1166            // Resolve backpressurePromise with undefined.
1167            promise.resolve_native_with_cx(cx, &());
1168
1169            // Set backpressurePromise to undefined.
1170            // Done above with `take`.
1171        }
1172    }
1173
1174    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1175    /// Add a handler for port’s messageerror event with the following steps:
1176    pub(crate) fn handle_error(
1177        &self,
1178        cx: &mut CurrentRealm,
1179        global: &GlobalScope,
1180        port: &MessagePort,
1181    ) {
1182        // Let error be a new "DataCloneError" DOMException.
1183        let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
1184        rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1185        error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
1186
1187        // Perform ! CrossRealmTransformSendError(port, error).
1188        port.cross_realm_transform_send_error(cx, rooted_error.handle());
1189
1190        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, error).
1191        self.controller
1192            .error_if_needed(cx, rooted_error.handle(), global);
1193
1194        // Disentangle port.
1195        global.disentangle_port(cx, port);
1196    }
1197}
1198
1199/// <https://streams.spec.whatwg.org/#ws-transfer>
1200impl Transferable for WritableStream {
1201    type Index = MessagePortIndex;
1202    type Data = MessagePortImpl;
1203
1204    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps①>
1205    fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, MessagePortImpl)> {
1206        // Step 1. If ! IsWritableStreamLocked(value) is true, throw a
1207        // "DataCloneError" DOMException.
1208        if self.is_locked() {
1209            return Err(Error::DataClone(None));
1210        }
1211
1212        let global = self.global();
1213        let mut realm = enter_auto_realm(cx, &*global);
1214        let mut realm = realm.current_realm();
1215        let cx = &mut realm;
1216
1217        // Step 2. Let port1 be a new MessagePort in the current Realm.
1218        let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
1219        global.track_message_port(&port_1, None);
1220
1221        // Step 3. Let port2 be a new MessagePort in the current Realm.
1222        let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
1223        global.track_message_port(&port_2, None);
1224
1225        // Step 4. Entangle port1 and port2.
1226        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1227
1228        // Step 5. Let readable be a new ReadableStream in the current Realm.
1229        let readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1230
1231        // Step 6. Perform ! SetUpCrossRealmTransformReadable(readable, port1).
1232        readable.setup_cross_realm_transform_readable(cx, &port_1);
1233
1234        // Step 7. Let promise be ! ReadableStreamPipeTo(readable, value, false, false, false).
1235        let promise = readable.pipe_to(cx, &global, self, false, false, false, None);
1236
1237        // Step 8. Set promise.[[PromiseIsHandled]] to true.
1238        promise.set_promise_is_handled();
1239
1240        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
1241        port_2.transfer(cx)
1242    }
1243
1244    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps①>
1245    fn transfer_receive(
1246        cx: &mut JSContext,
1247        owner: &GlobalScope,
1248        id: MessagePortId,
1249        port_impl: MessagePortImpl,
1250    ) -> Result<DomRoot<Self>, ()> {
1251        // Their transfer-receiving steps, given dataHolder and value, are:
1252        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
1253        let value = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1254
1255        // Step 1. Let deserializedRecord be !
1256        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
1257        // Realm).
1258        // Done with the `Deserialize` derive of `MessagePortImpl`.
1259
1260        // Step 2. Let port be deserializedRecord.[[Deserialized]].
1261        let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
1262
1263        // Step 3. Perform ! SetUpCrossRealmTransformWritable(value, port).
1264        value.setup_cross_realm_transform_writable(cx, &transferred_port);
1265        Ok(value)
1266    }
1267
1268    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
1269    fn serialized_storage<'a>(
1270        data: StructuredData<'a, '_>,
1271    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1272        match data {
1273            StructuredData::Reader(r) => &mut r.port_impls,
1274            StructuredData::Writer(w) => &mut w.ports,
1275        }
1276    }
1277}