script/dom/
writablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::{HashMap, VecDeque};
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::rust::{
17    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18    MutableHandleValue as SafeMutableHandleValue,
19};
20use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
21use script_bindings::conversions::SafeToJSValConvertible;
22
23use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
24use crate::dom::bindings::cell::DomRefCell;
25use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
26use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
27use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
28use crate::dom::bindings::conversions::ConversionResult;
29use crate::dom::bindings::error::{Error, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::structuredclone::StructuredData;
33use crate::dom::bindings::transferable::Transferable;
34use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
35use crate::dom::domexception::{DOMErrorName, DOMException};
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::messageport::MessagePort;
38use crate::dom::promise::Promise;
39use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
40use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
41use crate::dom::writablestreamdefaultcontroller::{
42    UnderlyingSinkType, WritableStreamDefaultController,
43};
44use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
45use crate::realms::{InRealm, enter_realm};
46use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
47
48impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
49
50/// The fulfillment handler for the abort steps of
51/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
52#[derive(JSTraceable, MallocSizeOf)]
53#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
54struct AbortAlgorithmFulfillmentHandler {
55    stream: Dom<WritableStream>,
56    #[ignore_malloc_size_of = "Rc is hard"]
57    abort_request_promise: Rc<Promise>,
58}
59
60impl Callback for AbortAlgorithmFulfillmentHandler {
61    fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
62        // Resolve abortRequest’s promise with undefined.
63        self.abort_request_promise.resolve_native(&(), can_gc);
64
65        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
66        self.stream
67            .as_rooted()
68            .reject_close_and_closed_promise_if_needed(cx, can_gc);
69    }
70}
71
72impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
73
74/// The rejection handler for the abort steps of
75/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
76#[derive(JSTraceable, MallocSizeOf)]
77#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
78struct AbortAlgorithmRejectionHandler {
79    stream: Dom<WritableStream>,
80    #[ignore_malloc_size_of = "Rc is hard"]
81    abort_request_promise: Rc<Promise>,
82}
83
84impl Callback for AbortAlgorithmRejectionHandler {
85    fn callback(&self, cx: SafeJSContext, reason: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
86        // Reject abortRequest’s promise with reason.
87        self.abort_request_promise.reject_native(&reason, can_gc);
88
89        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
90        self.stream
91            .as_rooted()
92            .reject_close_and_closed_promise_if_needed(cx, can_gc);
93    }
94}
95
96impl js::gc::Rootable for PendingAbortRequest {}
97
98/// <https://streams.spec.whatwg.org/#pending-abort-request>
99#[derive(JSTraceable, MallocSizeOf)]
100#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
101struct PendingAbortRequest {
102    /// <https://streams.spec.whatwg.org/#pending-abort-request-promise>
103    #[ignore_malloc_size_of = "Rc is hard"]
104    promise: Rc<Promise>,
105
106    /// <https://streams.spec.whatwg.org/#pending-abort-request-reason>
107    #[ignore_malloc_size_of = "mozjs"]
108    reason: Box<Heap<JSVal>>,
109
110    /// <https://streams.spec.whatwg.org/#pending-abort-request-was-already-erroring>
111    was_already_erroring: bool,
112}
113
114/// <https://streams.spec.whatwg.org/#writablestream-state>
115#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
116pub(crate) enum WritableStreamState {
117    #[default]
118    Writable,
119    Closed,
120    Erroring,
121    Errored,
122}
123
124/// <https://streams.spec.whatwg.org/#ws-class>
125#[dom_struct]
126pub struct WritableStream {
127    reflector_: Reflector,
128
129    /// <https://streams.spec.whatwg.org/#writablestream-backpressure>
130    backpressure: Cell<bool>,
131
132    /// <https://streams.spec.whatwg.org/#writablestream-closerequest>
133    #[ignore_malloc_size_of = "Rc is hard"]
134    close_request: DomRefCell<Option<Rc<Promise>>>,
135
136    /// <https://streams.spec.whatwg.org/#writablestream-controller>
137    controller: MutNullableDom<WritableStreamDefaultController>,
138
139    /// <https://streams.spec.whatwg.org/#writablestream-detached>
140    detached: Cell<bool>,
141
142    /// <https://streams.spec.whatwg.org/#writablestream-inflightwriterequest>
143    #[ignore_malloc_size_of = "Rc is hard"]
144    in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
145
146    /// <https://streams.spec.whatwg.org/#writablestream-inflightcloserequest>
147    #[ignore_malloc_size_of = "Rc is hard"]
148    in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
149
150    /// <https://streams.spec.whatwg.org/#writablestream-pendingabortrequest>
151    pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
152
153    /// <https://streams.spec.whatwg.org/#writablestream-state>
154    state: Cell<WritableStreamState>,
155
156    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
157    #[ignore_malloc_size_of = "mozjs"]
158    stored_error: Heap<JSVal>,
159
160    /// <https://streams.spec.whatwg.org/#writablestream-writer>
161    writer: MutNullableDom<WritableStreamDefaultWriter>,
162
163    /// <https://streams.spec.whatwg.org/#writablestream-writerequests>
164    #[ignore_malloc_size_of = "Rc is hard"]
165    write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
166}
167
168impl WritableStream {
169    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
170    /// <https://streams.spec.whatwg.org/#initialize-writable-stream>
171    fn new_inherited() -> WritableStream {
172        WritableStream {
173            reflector_: Reflector::new(),
174            backpressure: Default::default(),
175            close_request: Default::default(),
176            controller: Default::default(),
177            detached: Default::default(),
178            in_flight_write_request: Default::default(),
179            in_flight_close_request: Default::default(),
180            pending_abort_request: Default::default(),
181            state: Default::default(),
182            stored_error: Default::default(),
183            writer: Default::default(),
184            write_requests: Default::default(),
185        }
186    }
187
188    pub(crate) fn new_with_proto(
189        global: &GlobalScope,
190        proto: Option<SafeHandleObject>,
191        can_gc: CanGc,
192    ) -> DomRoot<WritableStream> {
193        reflect_dom_object_with_proto(
194            Box::new(WritableStream::new_inherited()),
195            global,
196            proto,
197            can_gc,
198        )
199    }
200
201    /// Used as part of
202    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
203    pub(crate) fn assert_no_controller(&self) {
204        assert!(self.controller.get().is_none());
205    }
206
207    /// Used as part of
208    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
209    pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
210        self.controller.set(Some(controller));
211    }
212
213    #[allow(unused)]
214    pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
215        self.controller.get().expect("Controller should be set.")
216    }
217
218    pub(crate) fn is_writable(&self) -> bool {
219        matches!(self.state.get(), WritableStreamState::Writable)
220    }
221
222    pub(crate) fn is_erroring(&self) -> bool {
223        matches!(self.state.get(), WritableStreamState::Erroring)
224    }
225
226    pub(crate) fn is_errored(&self) -> bool {
227        matches!(self.state.get(), WritableStreamState::Errored)
228    }
229
230    pub(crate) fn is_closed(&self) -> bool {
231        matches!(self.state.get(), WritableStreamState::Closed)
232    }
233
234    pub(crate) fn has_in_flight_write_request(&self) -> bool {
235        self.in_flight_write_request.borrow().is_some()
236    }
237
238    /// <https://streams.spec.whatwg.org/#writable-stream-has-operation-marked-in-flight>
239    pub(crate) fn has_operations_marked_inflight(&self) -> bool {
240        let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
241        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
242
243        in_flight_write_requested || in_flight_close_requested
244    }
245
246    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
247    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
248        handle_mut.set(self.stored_error.get());
249    }
250
251    /// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
252    pub(crate) fn finish_erroring(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
253        // Assert: stream.[[state]] is "erroring".
254        assert!(self.is_erroring());
255
256        // Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false.
257        assert!(!self.has_operations_marked_inflight());
258
259        // Set stream.[[state]] to "errored".
260        self.state.set(WritableStreamState::Errored);
261
262        // Perform ! stream.[[controller]].[[ErrorSteps]]().
263        let Some(controller) = self.controller.get() else {
264            unreachable!("Stream should have a controller.");
265        };
266        controller.perform_error_steps();
267
268        // Let storedError be stream.[[storedError]].
269        rooted!(in(*cx) let mut stored_error = UndefinedValue());
270        self.get_stored_error(stored_error.handle_mut());
271
272        // For each writeRequest of stream.[[writeRequests]]:
273        let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
274        for request in write_requests {
275            // Reject writeRequest with storedError.
276            request.reject(cx, stored_error.handle(), can_gc);
277        }
278
279        // Set stream.[[writeRequests]] to an empty list.
280        // Done above with `drain`.
281
282        // If stream.[[pendingAbortRequest]] is undefined,
283        if self.pending_abort_request.borrow().is_none() {
284            // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
285            self.reject_close_and_closed_promise_if_needed(cx, can_gc);
286
287            // Return.
288            return;
289        }
290
291        // Let abortRequest be stream.[[pendingAbortRequest]].
292        // Set stream.[[pendingAbortRequest]] to undefined.
293        rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
294        if let Some(pending_abort_request) = &*pending_abort_request {
295            // If abortRequest’s was already erroring is true,
296            if pending_abort_request.was_already_erroring {
297                // Reject abortRequest’s promise with storedError.
298                pending_abort_request
299                    .promise
300                    .reject(cx, stored_error.handle(), can_gc);
301
302                // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
303                self.reject_close_and_closed_promise_if_needed(cx, can_gc);
304
305                // Return.
306                return;
307            }
308
309            // Let promise be ! stream.[[controller]].[[AbortSteps]](abortRequest’s reason).
310            rooted!(in(*cx) let mut reason = UndefinedValue());
311            reason.set(pending_abort_request.reason.get());
312            let promise = controller.abort_steps(cx, global, reason.handle(), can_gc);
313
314            // Upon fulfillment of promise,
315            rooted!(in(*cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
316                stream: Dom::from_ref(self),
317                abort_request_promise: pending_abort_request.promise.clone(),
318            }));
319
320            // Upon rejection of promise with reason r,
321            rooted!(in(*cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
322                stream: Dom::from_ref(self),
323                abort_request_promise: pending_abort_request.promise.clone(),
324            }));
325
326            let handler = PromiseNativeHandler::new(
327                global,
328                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
329                rejection_handler.take().map(|h| Box::new(h) as Box<_>),
330                can_gc,
331            );
332            let realm = enter_realm(global);
333            let comp = InRealm::Entered(&realm);
334            promise.append_native_handler(&handler, comp, can_gc);
335        }
336    }
337
338    /// <https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed>
339    fn reject_close_and_closed_promise_if_needed(&self, cx: SafeJSContext, can_gc: CanGc) {
340        // Assert: stream.[[state]] is "errored".
341        assert!(self.is_errored());
342
343        rooted!(in(*cx) let mut stored_error = UndefinedValue());
344        self.get_stored_error(stored_error.handle_mut());
345
346        // If stream.[[closeRequest]] is not undefined
347        let close_request = self.close_request.borrow_mut().take();
348        if let Some(close_request) = close_request {
349            // Assert: stream.[[inFlightCloseRequest]] is undefined.
350            assert!(self.in_flight_close_request.borrow().is_none());
351
352            // Reject stream.[[closeRequest]] with stream.[[storedError]].
353            close_request.reject_native(&stored_error.handle(), can_gc)
354
355            // Set stream.[[closeRequest]] to undefined.
356            // Done with `take` above.
357        }
358
359        // Let writer be stream.[[writer]].
360        // If writer is not undefined,
361        if let Some(writer) = self.writer.get() {
362            // Reject writer.[[closedPromise]] with stream.[[storedError]].
363            writer.reject_closed_promise_with_stored_error(&stored_error.handle(), can_gc);
364
365            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
366            writer.set_close_promise_is_handled();
367        }
368    }
369
370    /// <https://streams.spec.whatwg.org/#writable-stream-close-queued-or-in-flight>
371    pub(crate) fn close_queued_or_in_flight(&self) -> bool {
372        let close_requested = self.close_request.borrow().is_some();
373        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
374
375        close_requested || in_flight_close_requested
376    }
377
378    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write>
379    pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
380        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
381            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
382            unreachable!("Stream should have a write request");
383        };
384
385        // Resolve stream.[[inFlightWriteRequest]] with undefined.
386        in_flight_write_request.resolve_native(&(), can_gc);
387
388        // Set stream.[[inFlightWriteRequest]] to undefined.
389        // Done above with `take`.
390    }
391
392    /// <https://streams.spec.whatwg.org/#writable-stream-start-erroring>
393    pub(crate) fn start_erroring(
394        &self,
395        cx: SafeJSContext,
396        global: &GlobalScope,
397        error: SafeHandleValue,
398        can_gc: CanGc,
399    ) {
400        // Assert: stream.[[storedError]] is undefined.
401        assert!(self.stored_error.get().is_undefined());
402
403        // Assert: stream.[[state]] is "writable".
404        assert!(self.is_writable());
405
406        // Let controller be stream.[[controller]].
407        let Some(controller) = self.controller.get() else {
408            // Assert: controller is not undefined.
409            unreachable!("Stream should have a controller.");
410        };
411
412        // Set stream.[[state]] to "erroring".
413        self.state.set(WritableStreamState::Erroring);
414
415        // Set stream.[[storedError]] to reason.
416        self.stored_error.set(*error);
417
418        // Let writer be stream.[[writer]].
419        if let Some(writer) = self.writer.get() {
420            // If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected
421            writer.ensure_ready_promise_rejected(global, error, can_gc);
422        }
423
424        // If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true
425        if !self.has_operations_marked_inflight() && controller.started() {
426            // perform ! WritableStreamFinishErroring
427            self.finish_erroring(cx, global, can_gc);
428        }
429    }
430
431    /// <https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection>
432    pub(crate) fn deal_with_rejection(
433        &self,
434        cx: SafeJSContext,
435        global: &GlobalScope,
436        error: SafeHandleValue,
437        can_gc: CanGc,
438    ) {
439        // Let state be stream.[[state]].
440
441        // If state is "writable",
442        if self.is_writable() {
443            // Perform ! WritableStreamStartErroring(stream, error).
444            self.start_erroring(cx, global, error, can_gc);
445
446            // Return.
447            return;
448        }
449
450        // Assert: state is "erroring".
451        assert!(self.is_erroring());
452
453        // Perform ! WritableStreamFinishErroring(stream).
454        self.finish_erroring(cx, global, can_gc);
455    }
456
457    /// <https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight>
458    pub(crate) fn mark_first_write_request_in_flight(&self) {
459        let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
460        let mut write_requests = self.write_requests.borrow_mut();
461
462        // Assert: stream.[[inFlightWriteRequest]] is undefined.
463        assert!(in_flight_write_request.is_none());
464
465        // Assert: stream.[[writeRequests]] is not empty.
466        assert!(!write_requests.is_empty());
467
468        // Let writeRequest be stream.[[writeRequests]][0].
469        // Remove writeRequest from stream.[[writeRequests]].
470        let write_request = write_requests.pop_front().unwrap();
471
472        // Set stream.[[inFlightWriteRequest]] to writeRequest.
473        *in_flight_write_request = Some(write_request);
474    }
475
476    /// <https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight>
477    pub(crate) fn mark_close_request_in_flight(&self) {
478        let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
479        let mut close_request = self.close_request.borrow_mut();
480
481        // Assert: stream.[[inFlightCloseRequest]] is undefined.
482        assert!(in_flight_close_request.is_none());
483
484        // Assert: stream.[[closeRequest]] is not undefined.
485        assert!(close_request.is_some());
486
487        // Let closeRequest be stream.[[closeRequest]].
488        // Set stream.[[closeRequest]] to undefined.
489        let close_request = close_request.take().unwrap();
490
491        // Set stream.[[inFlightCloseRequest]] to closeRequest.
492        *in_flight_close_request = Some(close_request);
493    }
494
495    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close>
496    pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
497        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
498            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
499            unreachable!("in_flight_close_request must be Some");
500        };
501
502        // Resolve stream.[[inFlightCloseRequest]] with undefined.
503        in_flight_close_request.resolve_native(&(), can_gc);
504
505        // Set stream.[[inFlightCloseRequest]] to undefined.
506        // Done with take above.
507
508        // Assert: stream.[[state]] is "writable" or "erroring".
509        assert!(self.is_writable() || self.is_erroring());
510
511        // If state is "erroring",
512        if self.is_erroring() {
513            // Set stream.[[storedError]] to undefined.
514            self.stored_error.set(UndefinedValue());
515
516            // If stream.[[pendingAbortRequest]] is not undefined,
517            rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
518            if let Some(pending_abort_request) = &*pending_abort_request {
519                // Resolve stream.[[pendingAbortRequest]]'s promise with undefined.
520                pending_abort_request.promise.resolve_native(&(), can_gc);
521
522                // Set stream.[[pendingAbortRequest]] to undefined.
523                // Done above with `take`.
524            }
525        }
526
527        // Set stream.[[state]] to "closed".
528        self.state.set(WritableStreamState::Closed);
529
530        // Let writer be stream.[[writer]].
531        if let Some(writer) = self.writer.get() {
532            // If writer is not undefined,
533            // resolve writer.[[closedPromise]] with undefined.
534            writer.resolve_closed_promise_with_undefined(can_gc);
535        }
536
537        // Assert: stream.[[pendingAbortRequest]] is undefined.
538        assert!(self.pending_abort_request.borrow().is_none());
539
540        // Assert: stream.[[storedError]] is undefined.
541        assert!(self.stored_error.get().is_undefined());
542    }
543
544    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error>
545    pub(crate) fn finish_in_flight_close_with_error(
546        &self,
547        cx: SafeJSContext,
548        global: &GlobalScope,
549        error: SafeHandleValue,
550        can_gc: CanGc,
551    ) {
552        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
553            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
554            unreachable!("Inflight close request must be defined.");
555        };
556
557        // Reject stream.[[inFlightCloseRequest]] with error.
558        in_flight_close_request.reject_native(&error, can_gc);
559
560        // Set stream.[[inFlightCloseRequest]] to undefined.
561        // Done above with `take`.
562
563        // Assert: stream.[[state]] is "writable" or "erroring".
564        assert!(self.is_erroring() || self.is_writable());
565
566        // If stream.[[pendingAbortRequest]] is not undefined,
567        rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
568        if let Some(pending_abort_request) = &*pending_abort_request {
569            // Reject stream.[[pendingAbortRequest]]'s promise with error.
570            pending_abort_request.promise.reject_native(&error, can_gc);
571
572            // Set stream.[[pendingAbortRequest]] to undefined.
573            // Done above with `take`.
574        }
575
576        // Perform ! WritableStreamDealWithRejection(stream, error).
577        self.deal_with_rejection(cx, global, error, can_gc);
578    }
579
580    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error>
581    pub(crate) fn finish_in_flight_write_with_error(
582        &self,
583        cx: SafeJSContext,
584        global: &GlobalScope,
585        error: SafeHandleValue,
586        can_gc: CanGc,
587    ) {
588        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
589            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
590            unreachable!("Inflight write request must be defined.");
591        };
592
593        // Reject stream.[[inFlightWriteRequest]] with error.
594        in_flight_write_request.reject_native(&error, can_gc);
595
596        // Set stream.[[inFlightWriteRequest]] to undefined.
597        // Done above with `take`.
598
599        // Assert: stream.[[state]] is "writable" or "erroring".
600        assert!(self.is_erroring() || self.is_writable());
601
602        // Perform ! WritableStreamDealWithRejection(stream, error).
603        self.deal_with_rejection(cx, global, error, can_gc);
604    }
605
606    pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
607        self.writer.get()
608    }
609
610    pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
611        self.writer.set(writer);
612    }
613
614    pub(crate) fn set_backpressure(&self, backpressure: bool) {
615        self.backpressure.set(backpressure);
616    }
617
618    pub(crate) fn get_backpressure(&self) -> bool {
619        self.backpressure.get()
620    }
621
622    /// <https://streams.spec.whatwg.org/#is-writable-stream-locked>
623    pub(crate) fn is_locked(&self) -> bool {
624        // If stream.[[writer]] is undefined, return false.
625        // Return true.
626        self.get_writer().is_some()
627    }
628
629    /// <https://streams.spec.whatwg.org/#writable-stream-add-write-request>
630    pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
631        // Assert: ! IsWritableStreamLocked(stream) is true.
632        assert!(self.is_locked());
633
634        // Assert: stream.[[state]] is "writable".
635        assert!(self.is_writable());
636
637        // Let promise be a new promise.
638        let promise = Promise::new(global, can_gc);
639
640        // Append promise to stream.[[writeRequests]].
641        self.write_requests.borrow_mut().push_back(promise.clone());
642
643        // Return promise.
644        promise
645    }
646
647    // Returns the rooted controller of the stream, if any.
648    pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
649        self.controller.get()
650    }
651
652    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
653    pub(crate) fn abort(
654        &self,
655        cx: SafeJSContext,
656        global: &GlobalScope,
657        provided_reason: SafeHandleValue,
658        realm: InRealm,
659        can_gc: CanGc,
660    ) -> Rc<Promise> {
661        // If stream.[[state]] is "closed" or "errored",
662        if self.is_closed() || self.is_errored() {
663            // return a promise resolved with undefined.
664            return Promise::new_resolved(global, cx, (), can_gc);
665        }
666
667        // Signal abort on stream.[[controller]].[[abortController]] with reason.
668        self.get_controller()
669            .expect("Stream must have a controller.")
670            .signal_abort(cx, provided_reason, realm, can_gc);
671
672        // Let state be stream.[[state]].
673        let state = self.state.get();
674
675        // If state is "closed" or "errored", return a promise resolved with undefined.
676        if matches!(
677            state,
678            WritableStreamState::Closed | WritableStreamState::Errored
679        ) {
680            return Promise::new_resolved(global, cx, (), can_gc);
681        }
682
683        // If stream.[[pendingAbortRequest]] is not undefined,
684        if self.pending_abort_request.borrow().is_some() {
685            // return stream.[[pendingAbortRequest]]'s promise.
686            return self
687                .pending_abort_request
688                .borrow()
689                .as_ref()
690                .expect("Pending abort request must be Some.")
691                .promise
692                .clone();
693        }
694
695        // Assert: state is "writable" or "erroring".
696        assert!(self.is_writable() || self.is_erroring());
697
698        // Let wasAlreadyErroring be false.
699        let mut was_already_erroring = false;
700        rooted!(in(*cx) let undefined_reason = UndefinedValue());
701
702        // If state is "erroring",
703        let reason = if self.is_erroring() {
704            // Set wasAlreadyErroring to true.
705            was_already_erroring = true;
706
707            // Set reason to undefined.
708            undefined_reason.handle()
709        } else {
710            // Use the provided reason.
711            provided_reason
712        };
713
714        // Let promise be a new promise.
715        let promise = Promise::new(global, can_gc);
716
717        // Set stream.[[pendingAbortRequest]] to a new pending abort request
718        // whose promise is promise,
719        // reason is reason,
720        // and was already erroring is wasAlreadyErroring.
721        *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
722            promise: promise.clone(),
723            reason: Heap::boxed(reason.get()),
724            was_already_erroring,
725        });
726
727        // If wasAlreadyErroring is false,
728        if !was_already_erroring {
729            // perform ! WritableStreamStartErroring(stream, reason)
730            self.start_erroring(cx, global, reason, can_gc);
731        }
732
733        // Return promise.
734        promise
735    }
736
737    /// <https://streams.spec.whatwg.org/#writable-stream-close>
738    pub(crate) fn close(
739        &self,
740        cx: SafeJSContext,
741        global: &GlobalScope,
742        can_gc: CanGc,
743    ) -> Rc<Promise> {
744        // Let state be stream.[[state]].
745        // If state is "closed" or "errored",
746        if self.is_closed() || self.is_errored() {
747            // return a promise rejected with a TypeError exception.
748            let promise = Promise::new(global, can_gc);
749            promise.reject_error(
750                Error::Type("Stream is closed or errored.".to_string()),
751                can_gc,
752            );
753            return promise;
754        }
755
756        // Assert: state is "writable" or "erroring".
757        assert!(self.is_writable() || self.is_erroring());
758
759        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
760        assert!(!self.close_queued_or_in_flight());
761
762        // Let promise be a new promise.
763        let promise = Promise::new(global, can_gc);
764
765        // Set stream.[[closeRequest]] to promise.
766        *self.close_request.borrow_mut() = Some(promise.clone());
767
768        // Let writer be stream.[[writer]].
769        // If writer is not undefined,
770        if let Some(writer) = self.writer.get() {
771            // and stream.[[backpressure]] is true,
772            // and state is "writable",
773            if self.get_backpressure() && self.is_writable() {
774                // resolve writer.[[readyPromise]] with undefined.
775                writer.resolve_ready_promise_with_undefined(can_gc);
776            }
777        }
778
779        // Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]).
780        let Some(controller) = self.controller.get() else {
781            unreachable!("Stream must have a controller.");
782        };
783        controller.close(cx, global, can_gc);
784
785        // Return promise.
786        promise
787    }
788
789    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size>
790    /// Note: implement as a stream method, as opposed to a writer one, for convenience.
791    pub(crate) fn get_desired_size(&self) -> Option<f64> {
792        // Let stream be writer.[[stream]].
793        // Stream is `self`.
794
795        // Let state be stream.[[state]].
796        // If state is "errored" or "erroring", return null.
797        if self.is_errored() || self.is_erroring() {
798            return None;
799        }
800
801        // If state is "closed", return 0.
802        if self.is_closed() {
803            return Some(0.);
804        }
805
806        let Some(controller) = self.controller.get() else {
807            unreachable!("Stream must have a controller.");
808        };
809        Some(controller.get_desired_size())
810    }
811
812    /// <https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer>
813    pub(crate) fn aquire_default_writer(
814        &self,
815        cx: SafeJSContext,
816        global: &GlobalScope,
817        can_gc: CanGc,
818    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
819        // Let writer be a new WritableStreamDefaultWriter object.
820        let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
821
822        // Perform ? SetUpWritableStreamDefaultWriter(writer, stream).
823        writer.setup(cx, self, can_gc)?;
824
825        // Return writer.
826        Ok(writer)
827    }
828
829    /// <https://streams.spec.whatwg.org/#writable-stream-update-backpressure>
830    pub(crate) fn update_backpressure(
831        &self,
832        backpressure: bool,
833        global: &GlobalScope,
834        can_gc: CanGc,
835    ) {
836        // Assert: stream.[[state]] is "writable".
837        self.is_writable();
838
839        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
840        assert!(!self.close_queued_or_in_flight());
841
842        // Let writer be stream.[[writer]].
843        let writer = self.get_writer();
844
845        if let Some(writer) = writer {
846            // If writer is not undefined
847            if backpressure != self.get_backpressure() {
848                // and backpressure is not stream.[[backpressure]],
849                if backpressure {
850                    // If backpressure is true, set writer.[[readyPromise]] to a new promise.
851                    let promise = Promise::new(global, can_gc);
852                    writer.set_ready_promise(promise);
853                } else {
854                    // Otherwise,
855                    // Assert: backpressure is false.
856                    assert!(!backpressure);
857                    // Resolve writer.[[readyPromise]] with undefined.
858                    writer.resolve_ready_promise_with_undefined(can_gc);
859                }
860            }
861        }
862
863        // Set stream.[[backpressure]] to backpressure.
864        self.set_backpressure(backpressure);
865    }
866
867    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
868    pub(crate) fn setup_cross_realm_transform_writable(
869        &self,
870        cx: SafeJSContext,
871        port: &MessagePort,
872        can_gc: CanGc,
873    ) {
874        let port_id = port.message_port_id();
875        let global = self.global();
876
877        // Perform ! InitializeWritableStream(stream).
878        // Done in `new_inherited`.
879
880        // Let sizeAlgorithm be an algorithm that returns 1.
881        // Re-ordered because of the need to pass it to `new`.
882        let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
883
884        // Note: other algorithms defined in the controller at call site.
885
886        // Let backpressurePromise be a new promise.
887        let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(&global, can_gc))));
888
889        // Let controller be a new WritableStreamDefaultController.
890        let controller = WritableStreamDefaultController::new(
891            &global,
892            UnderlyingSinkType::Transfer {
893                backpressure_promise: backpressure_promise.clone(),
894                port: Dom::from_ref(port),
895            },
896            1.0,
897            size_algorithm,
898            can_gc,
899        );
900
901        // Add a handler for port’s message event with the following steps:
902        // Add a handler for port’s messageerror event with the following steps:
903        rooted!(in(*cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
904            controller: Dom::from_ref(&controller),
905            backpressure_promise: backpressure_promise.clone(),
906        });
907        global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
908
909        // Enable port’s port message queue.
910        port.Start(can_gc);
911
912        // Perform ! SetUpWritableStreamDefaultController
913        controller
914            .setup(cx, &global, self, can_gc)
915            .expect("Setup for transfer cannot fail");
916    }
917    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
918    #[allow(clippy::too_many_arguments)]
919    pub(crate) fn setup_from_underlying_sink(
920        &self,
921        cx: SafeJSContext,
922        global: &GlobalScope,
923        stream: &WritableStream,
924        underlying_sink_obj: SafeHandleObject,
925        underlying_sink: &UnderlyingSink,
926        strategy_hwm: f64,
927        strategy_size: Rc<QueuingStrategySize>,
928        can_gc: CanGc,
929    ) -> Result<(), Error> {
930        // Let controller be a new WritableStreamDefaultController.
931
932        // Let startAlgorithm be an algorithm that returns undefined.
933
934        // Let writeAlgorithm be an algorithm that returns a promise resolved with undefined.
935
936        // Let closeAlgorithm be an algorithm that returns a promise resolved with undefined.
937
938        // Let abortAlgorithm be an algorithm that returns a promise resolved with undefined.
939
940        // If underlyingSinkDict["start"] exists, then set startAlgorithm to an algorithm which
941        // returns the result of invoking underlyingSinkDict["start"] with argument
942        // list « controller », exception behavior "rethrow", and callback this value underlyingSink.
943
944        // If underlyingSinkDict["write"] exists, then set writeAlgorithm to an algorithm which
945        // takes an argument chunk and returns the result of invoking underlyingSinkDict["write"]
946        // with argument list « chunk, controller » and callback this value underlyingSink.
947
948        // If underlyingSinkDict["close"] exists, then set closeAlgorithm to an algorithm which
949        // returns the result of invoking underlyingSinkDict["close"] with argument
950        // list «» and callback this value underlyingSink.
951
952        // If underlyingSinkDict["abort"] exists, then set abortAlgorithm to an algorithm which
953        // takes an argument reason and returns the result of invoking underlyingSinkDict["abort"]
954        // with argument list « reason » and callback this value underlyingSink.
955        let controller = WritableStreamDefaultController::new(
956            global,
957            UnderlyingSinkType::new_js(
958                underlying_sink.abort.clone(),
959                underlying_sink.start.clone(),
960                underlying_sink.close.clone(),
961                underlying_sink.write.clone(),
962            ),
963            strategy_hwm,
964            strategy_size,
965            can_gc,
966        );
967
968        // Note: this must be done before `setup`,
969        // otherwise `thisOb` is null in the start callback.
970        controller.set_underlying_sink_this_object(underlying_sink_obj);
971
972        // Perform ? SetUpWritableStreamDefaultController
973        controller.setup(cx, global, stream, can_gc)
974    }
975}
976
977/// <https://streams.spec.whatwg.org/#create-writable-stream>
978#[cfg_attr(crown, allow(crown::unrooted_must_root))]
979pub(crate) fn create_writable_stream(
980    cx: SafeJSContext,
981    global: &GlobalScope,
982    writable_high_water_mark: f64,
983    writable_size_algorithm: Rc<QueuingStrategySize>,
984    underlying_sink_type: UnderlyingSinkType,
985    can_gc: CanGc,
986) -> Fallible<DomRoot<WritableStream>> {
987    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
988    assert!(writable_high_water_mark >= 0.0);
989
990    // Let stream be a new WritableStream.
991    // Perform ! InitializeWritableStream(stream).
992    let stream = WritableStream::new_with_proto(global, None, can_gc);
993
994    // Let controller be a new WritableStreamDefaultController.
995    let controller = WritableStreamDefaultController::new(
996        global,
997        underlying_sink_type,
998        writable_high_water_mark,
999        writable_size_algorithm,
1000        can_gc,
1001    );
1002
1003    // Perform ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm,
1004    // closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm).
1005    controller.setup(cx, global, &stream, can_gc)?;
1006
1007    // Return stream.
1008    Ok(stream)
1009}
1010
1011impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1012    /// <https://streams.spec.whatwg.org/#ws-constructor>
1013    fn Constructor(
1014        cx: SafeJSContext,
1015        global: &GlobalScope,
1016        proto: Option<SafeHandleObject>,
1017        can_gc: CanGc,
1018        underlying_sink: Option<*mut JSObject>,
1019        strategy: &QueuingStrategy,
1020    ) -> Fallible<DomRoot<WritableStream>> {
1021        // If underlyingSink is missing, set it to null.
1022        rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1023
1024        // Let underlyingSinkDict be underlyingSink,
1025        // converted to an IDL value of type UnderlyingSink.
1026        let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1027            rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1028            match UnderlyingSink::new(cx, obj_val.handle()) {
1029                Ok(ConversionResult::Success(val)) => val,
1030                Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
1031                _ => {
1032                    return Err(Error::JSFailed);
1033                },
1034            }
1035        } else {
1036            UnderlyingSink::empty()
1037        };
1038
1039        if !underlying_sink_dict.type_.handle().is_undefined() {
1040            // If underlyingSinkDict["type"] exists, throw a RangeError exception.
1041            return Err(Error::Range("type is set".to_string()));
1042        }
1043
1044        // Perform ! InitializeWritableStream(this).
1045        let stream = WritableStream::new_with_proto(global, proto, can_gc);
1046
1047        // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
1048        let size_algorithm = extract_size_algorithm(strategy, can_gc);
1049
1050        // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
1051        let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1052
1053        // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink,
1054        // underlyingSinkDict, highWaterMark, sizeAlgorithm).
1055        stream.setup_from_underlying_sink(
1056            cx,
1057            global,
1058            &stream,
1059            underlying_sink_obj.handle(),
1060            &underlying_sink_dict,
1061            high_water_mark,
1062            size_algorithm,
1063            can_gc,
1064        )?;
1065
1066        Ok(stream)
1067    }
1068
1069    /// <https://streams.spec.whatwg.org/#ws-locked>
1070    fn Locked(&self) -> bool {
1071        // Return ! IsWritableStreamLocked(this).
1072        self.is_locked()
1073    }
1074
1075    /// <https://streams.spec.whatwg.org/#ws-abort>
1076    fn Abort(
1077        &self,
1078        cx: SafeJSContext,
1079        reason: SafeHandleValue,
1080        realm: InRealm,
1081        can_gc: CanGc,
1082    ) -> Rc<Promise> {
1083        let global = GlobalScope::from_safe_context(cx, realm);
1084
1085        // If ! IsWritableStreamLocked(this) is true,
1086        if self.is_locked() {
1087            // return a promise rejected with a TypeError exception.
1088            let promise = Promise::new(&global, can_gc);
1089            promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1090            return promise;
1091        }
1092
1093        // Return ! WritableStreamAbort(this, reason).
1094        self.abort(cx, &global, reason, realm, can_gc)
1095    }
1096
1097    /// <https://streams.spec.whatwg.org/#ws-close>
1098    fn Close(&self, realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
1099        let cx = GlobalScope::get_cx();
1100        let global = GlobalScope::from_safe_context(cx, realm);
1101
1102        // If ! IsWritableStreamLocked(this) is true,
1103        if self.is_locked() {
1104            // return a promise rejected with a TypeError exception.
1105            let promise = Promise::new(&global, can_gc);
1106            promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1107            return promise;
1108        }
1109
1110        // If ! WritableStreamCloseQueuedOrInFlight(this) is true
1111        if self.close_queued_or_in_flight() {
1112            // return a promise rejected with a TypeError exception.
1113            let promise = Promise::new(&global, can_gc);
1114            promise.reject_error(
1115                Error::Type("Stream has closed queued or in-flight".to_string()),
1116                can_gc,
1117            );
1118            return promise;
1119        }
1120
1121        // Return ! WritableStreamClose(this).
1122        self.close(cx, &global, can_gc)
1123    }
1124
1125    /// <https://streams.spec.whatwg.org/#ws-get-writer>
1126    fn GetWriter(
1127        &self,
1128        realm: InRealm,
1129        can_gc: CanGc,
1130    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1131        let cx = GlobalScope::get_cx();
1132        let global = GlobalScope::from_safe_context(cx, realm);
1133
1134        // Return ? AcquireWritableStreamDefaultWriter(this).
1135        self.aquire_default_writer(cx, &global, can_gc)
1136    }
1137}
1138
1139impl js::gc::Rootable for CrossRealmTransformWritable {}
1140
1141/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1142/// A wrapper to handle `message` and `messageerror` events
1143/// for the port used by the transfered stream.
1144#[derive(Clone, JSTraceable, MallocSizeOf)]
1145#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1146pub(crate) struct CrossRealmTransformWritable {
1147    /// The controller used in the algorithm.
1148    controller: Dom<WritableStreamDefaultController>,
1149
1150    /// The `backpressurePromise` used in the algorithm.
1151    #[ignore_malloc_size_of = "Rc is hard"]
1152    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1153}
1154
1155impl CrossRealmTransformWritable {
1156    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1157    /// Add a handler for port’s message event with the following steps:
1158    #[allow(unsafe_code)]
1159    pub(crate) fn handle_message(
1160        &self,
1161        cx: SafeJSContext,
1162        global: &GlobalScope,
1163        message: SafeHandleValue,
1164        _realm: InRealm,
1165        can_gc: CanGc,
1166    ) {
1167        rooted!(in(*cx) let mut value = UndefinedValue());
1168        let type_string =
1169            unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
1170
1171        // If type is "pull",
1172        // Done below as the steps are the same for both types.
1173
1174        // Otherwise, if type is "error",
1175        if type_string == "error" {
1176            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, value).
1177            self.controller
1178                .error_if_needed(cx, value.handle(), global, can_gc);
1179        }
1180
1181        let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1182
1183        // Note: the below steps are for both "pull" and "error" types.
1184        // If backpressurePromise is not undefined,
1185        if let Some(promise) = backpressure_promise {
1186            // Resolve backpressurePromise with undefined.
1187            promise.resolve_native(&(), can_gc);
1188
1189            // Set backpressurePromise to undefined.
1190            // Done above with `take`.
1191        }
1192    }
1193
1194    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1195    /// Add a handler for port’s messageerror event with the following steps:
1196    pub(crate) fn handle_error(
1197        &self,
1198        cx: SafeJSContext,
1199        global: &GlobalScope,
1200        port: &MessagePort,
1201        _realm: InRealm,
1202        can_gc: CanGc,
1203    ) {
1204        // Let error be a new "DataCloneError" DOMException.
1205        let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
1206        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1207        error.safe_to_jsval(cx, rooted_error.handle_mut());
1208
1209        // Perform ! CrossRealmTransformSendError(port, error).
1210        port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
1211
1212        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, error).
1213        self.controller
1214            .error_if_needed(cx, rooted_error.handle(), global, can_gc);
1215
1216        // Disentangle port.
1217        global.disentangle_port(port, can_gc);
1218    }
1219}
1220
1221/// <https://streams.spec.whatwg.org/#ws-transfer>
1222impl Transferable for WritableStream {
1223    type Index = MessagePortIndex;
1224    type Data = MessagePortImpl;
1225
1226    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps①>
1227    fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
1228        // Step 1. If ! IsWritableStreamLocked(value) is true, throw a
1229        // "DataCloneError" DOMException.
1230        if self.is_locked() {
1231            return Err(Error::DataClone(None));
1232        }
1233
1234        let global = self.global();
1235        let realm = enter_realm(&*global);
1236        let comp = InRealm::Entered(&realm);
1237        let cx = GlobalScope::get_cx();
1238        let can_gc = CanGc::note();
1239
1240        // Step 2. Let port1 be a new MessagePort in the current Realm.
1241        let port_1 = MessagePort::new(&global, can_gc);
1242        global.track_message_port(&port_1, None);
1243
1244        // Step 3. Let port2 be a new MessagePort in the current Realm.
1245        let port_2 = MessagePort::new(&global, can_gc);
1246        global.track_message_port(&port_2, None);
1247
1248        // Step 4. Entangle port1 and port2.
1249        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1250
1251        // Step 5. Let readable be a new ReadableStream in the current Realm.
1252        let readable = ReadableStream::new_with_proto(&global, None, can_gc);
1253
1254        // Step 6. Perform ! SetUpCrossRealmTransformReadable(readable, port1).
1255        readable.setup_cross_realm_transform_readable(cx, &port_1, can_gc);
1256
1257        // Step 7. Let promise be ! ReadableStreamPipeTo(readable, value, false, false, false).
1258        let promise = readable.pipe_to(cx, &global, self, false, false, false, None, comp, can_gc);
1259
1260        // Step 8. Set promise.[[PromiseIsHandled]] to true.
1261        promise.set_promise_is_handled();
1262
1263        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
1264        port_2.transfer()
1265    }
1266
1267    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps①>
1268    fn transfer_receive(
1269        owner: &GlobalScope,
1270        id: MessagePortId,
1271        port_impl: MessagePortImpl,
1272    ) -> Result<DomRoot<Self>, ()> {
1273        let cx = GlobalScope::get_cx();
1274        let can_gc = CanGc::note();
1275
1276        // Their transfer-receiving steps, given dataHolder and value, are:
1277        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
1278        let value = WritableStream::new_with_proto(owner, None, can_gc);
1279
1280        // Step 1. Let deserializedRecord be !
1281        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
1282        // Realm).
1283        // Done with the `Deserialize` derive of `MessagePortImpl`.
1284
1285        // Step 2. Let port be deserializedRecord.[[Deserialized]].
1286        let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
1287
1288        // Step 3. Perform ! SetUpCrossRealmTransformWritable(value, port).
1289        value.setup_cross_realm_transform_writable(cx, &transferred_port, can_gc);
1290        Ok(value)
1291    }
1292
1293    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
1294    fn serialized_storage<'a>(
1295        data: StructuredData<'a, '_>,
1296    ) -> &'a mut Option<HashMap<MessagePortId, Self::Data>> {
1297        match data {
1298            StructuredData::Reader(r) => &mut r.port_impls,
1299            StructuredData::Writer(w) => &mut w.ports,
1300        }
1301    }
1302}