script/dom/
writablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::rust::{
17    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18    MutableHandleValue as SafeMutableHandleValue,
19};
20use rustc_hash::FxHashMap;
21use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
22use script_bindings::conversions::SafeToJSValConvertible;
23
24use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
25use crate::dom::bindings::cell::DomRefCell;
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
27use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
28use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
29use crate::dom::bindings::conversions::ConversionResult;
30use crate::dom::bindings::error::{Error, Fallible};
31use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
32use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
33use crate::dom::bindings::structuredclone::StructuredData;
34use crate::dom::bindings::transferable::Transferable;
35use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
36use crate::dom::domexception::{DOMErrorName, DOMException};
37use crate::dom::globalscope::GlobalScope;
38use crate::dom::messageport::MessagePort;
39use crate::dom::promise::Promise;
40use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
41use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
42use crate::dom::writablestreamdefaultcontroller::{
43    UnderlyingSinkType, WritableStreamDefaultController,
44};
45use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
46use crate::realms::{InRealm, enter_realm};
47use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
48
49impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
50
51/// The fulfillment handler for the abort steps of
52/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
53#[derive(JSTraceable, MallocSizeOf)]
54#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
55struct AbortAlgorithmFulfillmentHandler {
56    stream: Dom<WritableStream>,
57    #[ignore_malloc_size_of = "Rc is hard"]
58    abort_request_promise: Rc<Promise>,
59}
60
61impl Callback for AbortAlgorithmFulfillmentHandler {
62    fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
63        // Resolve abortRequest’s promise with undefined.
64        self.abort_request_promise.resolve_native(&(), can_gc);
65
66        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
67        self.stream
68            .as_rooted()
69            .reject_close_and_closed_promise_if_needed(cx, can_gc);
70    }
71}
72
73impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
74
75/// The rejection handler for the abort steps of
76/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
77#[derive(JSTraceable, MallocSizeOf)]
78#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
79struct AbortAlgorithmRejectionHandler {
80    stream: Dom<WritableStream>,
81    #[ignore_malloc_size_of = "Rc is hard"]
82    abort_request_promise: Rc<Promise>,
83}
84
85impl Callback for AbortAlgorithmRejectionHandler {
86    fn callback(&self, cx: SafeJSContext, reason: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
87        // Reject abortRequest’s promise with reason.
88        self.abort_request_promise.reject_native(&reason, can_gc);
89
90        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
91        self.stream
92            .as_rooted()
93            .reject_close_and_closed_promise_if_needed(cx, can_gc);
94    }
95}
96
97impl js::gc::Rootable for PendingAbortRequest {}
98
99/// <https://streams.spec.whatwg.org/#pending-abort-request>
100#[derive(JSTraceable, MallocSizeOf)]
101#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
102struct PendingAbortRequest {
103    /// <https://streams.spec.whatwg.org/#pending-abort-request-promise>
104    #[ignore_malloc_size_of = "Rc is hard"]
105    promise: Rc<Promise>,
106
107    /// <https://streams.spec.whatwg.org/#pending-abort-request-reason>
108    #[ignore_malloc_size_of = "mozjs"]
109    reason: Box<Heap<JSVal>>,
110
111    /// <https://streams.spec.whatwg.org/#pending-abort-request-was-already-erroring>
112    was_already_erroring: bool,
113}
114
115/// <https://streams.spec.whatwg.org/#writablestream-state>
116#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
117pub(crate) enum WritableStreamState {
118    #[default]
119    Writable,
120    Closed,
121    Erroring,
122    Errored,
123}
124
125/// <https://streams.spec.whatwg.org/#ws-class>
126#[dom_struct]
127pub struct WritableStream {
128    reflector_: Reflector,
129
130    /// <https://streams.spec.whatwg.org/#writablestream-backpressure>
131    backpressure: Cell<bool>,
132
133    /// <https://streams.spec.whatwg.org/#writablestream-closerequest>
134    #[ignore_malloc_size_of = "Rc is hard"]
135    close_request: DomRefCell<Option<Rc<Promise>>>,
136
137    /// <https://streams.spec.whatwg.org/#writablestream-controller>
138    controller: MutNullableDom<WritableStreamDefaultController>,
139
140    /// <https://streams.spec.whatwg.org/#writablestream-detached>
141    detached: Cell<bool>,
142
143    /// <https://streams.spec.whatwg.org/#writablestream-inflightwriterequest>
144    #[ignore_malloc_size_of = "Rc is hard"]
145    in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
146
147    /// <https://streams.spec.whatwg.org/#writablestream-inflightcloserequest>
148    #[ignore_malloc_size_of = "Rc is hard"]
149    in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
150
151    /// <https://streams.spec.whatwg.org/#writablestream-pendingabortrequest>
152    pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
153
154    /// <https://streams.spec.whatwg.org/#writablestream-state>
155    state: Cell<WritableStreamState>,
156
157    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
158    #[ignore_malloc_size_of = "mozjs"]
159    stored_error: Heap<JSVal>,
160
161    /// <https://streams.spec.whatwg.org/#writablestream-writer>
162    writer: MutNullableDom<WritableStreamDefaultWriter>,
163
164    /// <https://streams.spec.whatwg.org/#writablestream-writerequests>
165    #[ignore_malloc_size_of = "Rc is hard"]
166    write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
167}
168
169impl WritableStream {
170    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
171    /// <https://streams.spec.whatwg.org/#initialize-writable-stream>
172    fn new_inherited() -> WritableStream {
173        WritableStream {
174            reflector_: Reflector::new(),
175            backpressure: Default::default(),
176            close_request: Default::default(),
177            controller: Default::default(),
178            detached: Default::default(),
179            in_flight_write_request: Default::default(),
180            in_flight_close_request: Default::default(),
181            pending_abort_request: Default::default(),
182            state: Default::default(),
183            stored_error: Default::default(),
184            writer: Default::default(),
185            write_requests: Default::default(),
186        }
187    }
188
189    pub(crate) fn new_with_proto(
190        global: &GlobalScope,
191        proto: Option<SafeHandleObject>,
192        can_gc: CanGc,
193    ) -> DomRoot<WritableStream> {
194        reflect_dom_object_with_proto(
195            Box::new(WritableStream::new_inherited()),
196            global,
197            proto,
198            can_gc,
199        )
200    }
201
202    /// Used as part of
203    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
204    pub(crate) fn assert_no_controller(&self) {
205        assert!(self.controller.get().is_none());
206    }
207
208    /// Used as part of
209    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
210    pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
211        self.controller.set(Some(controller));
212    }
213
214    #[allow(unused)]
215    pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
216        self.controller.get().expect("Controller should be set.")
217    }
218
219    pub(crate) fn is_writable(&self) -> bool {
220        matches!(self.state.get(), WritableStreamState::Writable)
221    }
222
223    pub(crate) fn is_erroring(&self) -> bool {
224        matches!(self.state.get(), WritableStreamState::Erroring)
225    }
226
227    pub(crate) fn is_errored(&self) -> bool {
228        matches!(self.state.get(), WritableStreamState::Errored)
229    }
230
231    pub(crate) fn is_closed(&self) -> bool {
232        matches!(self.state.get(), WritableStreamState::Closed)
233    }
234
235    pub(crate) fn has_in_flight_write_request(&self) -> bool {
236        self.in_flight_write_request.borrow().is_some()
237    }
238
239    /// <https://streams.spec.whatwg.org/#writable-stream-has-operation-marked-in-flight>
240    pub(crate) fn has_operations_marked_inflight(&self) -> bool {
241        let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
242        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
243
244        in_flight_write_requested || in_flight_close_requested
245    }
246
247    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
248    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
249        handle_mut.set(self.stored_error.get());
250    }
251
252    /// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
253    pub(crate) fn finish_erroring(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
254        // Assert: stream.[[state]] is "erroring".
255        assert!(self.is_erroring());
256
257        // Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false.
258        assert!(!self.has_operations_marked_inflight());
259
260        // Set stream.[[state]] to "errored".
261        self.state.set(WritableStreamState::Errored);
262
263        // Perform ! stream.[[controller]].[[ErrorSteps]]().
264        let Some(controller) = self.controller.get() else {
265            unreachable!("Stream should have a controller.");
266        };
267        controller.perform_error_steps();
268
269        // Let storedError be stream.[[storedError]].
270        rooted!(in(*cx) let mut stored_error = UndefinedValue());
271        self.get_stored_error(stored_error.handle_mut());
272
273        // For each writeRequest of stream.[[writeRequests]]:
274        let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
275        for request in write_requests {
276            // Reject writeRequest with storedError.
277            request.reject(cx, stored_error.handle(), can_gc);
278        }
279
280        // Set stream.[[writeRequests]] to an empty list.
281        // Done above with `drain`.
282
283        // If stream.[[pendingAbortRequest]] is undefined,
284        if self.pending_abort_request.borrow().is_none() {
285            // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
286            self.reject_close_and_closed_promise_if_needed(cx, can_gc);
287
288            // Return.
289            return;
290        }
291
292        // Let abortRequest be stream.[[pendingAbortRequest]].
293        // Set stream.[[pendingAbortRequest]] to undefined.
294        rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
295        if let Some(pending_abort_request) = &*pending_abort_request {
296            // If abortRequest’s was already erroring is true,
297            if pending_abort_request.was_already_erroring {
298                // Reject abortRequest’s promise with storedError.
299                pending_abort_request
300                    .promise
301                    .reject(cx, stored_error.handle(), can_gc);
302
303                // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
304                self.reject_close_and_closed_promise_if_needed(cx, can_gc);
305
306                // Return.
307                return;
308            }
309
310            // Let promise be ! stream.[[controller]].[[AbortSteps]](abortRequest’s reason).
311            rooted!(in(*cx) let mut reason = UndefinedValue());
312            reason.set(pending_abort_request.reason.get());
313            let promise = controller.abort_steps(cx, global, reason.handle(), can_gc);
314
315            // Upon fulfillment of promise,
316            rooted!(in(*cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
317                stream: Dom::from_ref(self),
318                abort_request_promise: pending_abort_request.promise.clone(),
319            }));
320
321            // Upon rejection of promise with reason r,
322            rooted!(in(*cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
323                stream: Dom::from_ref(self),
324                abort_request_promise: pending_abort_request.promise.clone(),
325            }));
326
327            let handler = PromiseNativeHandler::new(
328                global,
329                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
330                rejection_handler.take().map(|h| Box::new(h) as Box<_>),
331                can_gc,
332            );
333            let realm = enter_realm(global);
334            let comp = InRealm::Entered(&realm);
335            promise.append_native_handler(&handler, comp, can_gc);
336        }
337    }
338
339    /// <https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed>
340    fn reject_close_and_closed_promise_if_needed(&self, cx: SafeJSContext, can_gc: CanGc) {
341        // Assert: stream.[[state]] is "errored".
342        assert!(self.is_errored());
343
344        rooted!(in(*cx) let mut stored_error = UndefinedValue());
345        self.get_stored_error(stored_error.handle_mut());
346
347        // If stream.[[closeRequest]] is not undefined
348        let close_request = self.close_request.borrow_mut().take();
349        if let Some(close_request) = close_request {
350            // Assert: stream.[[inFlightCloseRequest]] is undefined.
351            assert!(self.in_flight_close_request.borrow().is_none());
352
353            // Reject stream.[[closeRequest]] with stream.[[storedError]].
354            close_request.reject_native(&stored_error.handle(), can_gc)
355
356            // Set stream.[[closeRequest]] to undefined.
357            // Done with `take` above.
358        }
359
360        // Let writer be stream.[[writer]].
361        // If writer is not undefined,
362        if let Some(writer) = self.writer.get() {
363            // Reject writer.[[closedPromise]] with stream.[[storedError]].
364            writer.reject_closed_promise_with_stored_error(&stored_error.handle(), can_gc);
365
366            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
367            writer.set_close_promise_is_handled();
368        }
369    }
370
371    /// <https://streams.spec.whatwg.org/#writable-stream-close-queued-or-in-flight>
372    pub(crate) fn close_queued_or_in_flight(&self) -> bool {
373        let close_requested = self.close_request.borrow().is_some();
374        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
375
376        close_requested || in_flight_close_requested
377    }
378
379    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write>
380    pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
381        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
382            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
383            unreachable!("Stream should have a write request");
384        };
385
386        // Resolve stream.[[inFlightWriteRequest]] with undefined.
387        in_flight_write_request.resolve_native(&(), can_gc);
388
389        // Set stream.[[inFlightWriteRequest]] to undefined.
390        // Done above with `take`.
391    }
392
393    /// <https://streams.spec.whatwg.org/#writable-stream-start-erroring>
394    pub(crate) fn start_erroring(
395        &self,
396        cx: SafeJSContext,
397        global: &GlobalScope,
398        error: SafeHandleValue,
399        can_gc: CanGc,
400    ) {
401        // Assert: stream.[[storedError]] is undefined.
402        assert!(self.stored_error.get().is_undefined());
403
404        // Assert: stream.[[state]] is "writable".
405        assert!(self.is_writable());
406
407        // Let controller be stream.[[controller]].
408        let Some(controller) = self.controller.get() else {
409            // Assert: controller is not undefined.
410            unreachable!("Stream should have a controller.");
411        };
412
413        // Set stream.[[state]] to "erroring".
414        self.state.set(WritableStreamState::Erroring);
415
416        // Set stream.[[storedError]] to reason.
417        self.stored_error.set(*error);
418
419        // Let writer be stream.[[writer]].
420        if let Some(writer) = self.writer.get() {
421            // If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected
422            writer.ensure_ready_promise_rejected(global, error, can_gc);
423        }
424
425        // If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true
426        if !self.has_operations_marked_inflight() && controller.started() {
427            // perform ! WritableStreamFinishErroring
428            self.finish_erroring(cx, global, can_gc);
429        }
430    }
431
432    /// <https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection>
433    pub(crate) fn deal_with_rejection(
434        &self,
435        cx: SafeJSContext,
436        global: &GlobalScope,
437        error: SafeHandleValue,
438        can_gc: CanGc,
439    ) {
440        // Let state be stream.[[state]].
441
442        // If state is "writable",
443        if self.is_writable() {
444            // Perform ! WritableStreamStartErroring(stream, error).
445            self.start_erroring(cx, global, error, can_gc);
446
447            // Return.
448            return;
449        }
450
451        // Assert: state is "erroring".
452        assert!(self.is_erroring());
453
454        // Perform ! WritableStreamFinishErroring(stream).
455        self.finish_erroring(cx, global, can_gc);
456    }
457
458    /// <https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight>
459    pub(crate) fn mark_first_write_request_in_flight(&self) {
460        let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
461        let mut write_requests = self.write_requests.borrow_mut();
462
463        // Assert: stream.[[inFlightWriteRequest]] is undefined.
464        assert!(in_flight_write_request.is_none());
465
466        // Assert: stream.[[writeRequests]] is not empty.
467        assert!(!write_requests.is_empty());
468
469        // Let writeRequest be stream.[[writeRequests]][0].
470        // Remove writeRequest from stream.[[writeRequests]].
471        let write_request = write_requests.pop_front().unwrap();
472
473        // Set stream.[[inFlightWriteRequest]] to writeRequest.
474        *in_flight_write_request = Some(write_request);
475    }
476
477    /// <https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight>
478    pub(crate) fn mark_close_request_in_flight(&self) {
479        let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
480        let mut close_request = self.close_request.borrow_mut();
481
482        // Assert: stream.[[inFlightCloseRequest]] is undefined.
483        assert!(in_flight_close_request.is_none());
484
485        // Assert: stream.[[closeRequest]] is not undefined.
486        assert!(close_request.is_some());
487
488        // Let closeRequest be stream.[[closeRequest]].
489        // Set stream.[[closeRequest]] to undefined.
490        let close_request = close_request.take().unwrap();
491
492        // Set stream.[[inFlightCloseRequest]] to closeRequest.
493        *in_flight_close_request = Some(close_request);
494    }
495
496    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close>
497    pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
498        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
499            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
500            unreachable!("in_flight_close_request must be Some");
501        };
502
503        // Resolve stream.[[inFlightCloseRequest]] with undefined.
504        in_flight_close_request.resolve_native(&(), can_gc);
505
506        // Set stream.[[inFlightCloseRequest]] to undefined.
507        // Done with take above.
508
509        // Assert: stream.[[state]] is "writable" or "erroring".
510        assert!(self.is_writable() || self.is_erroring());
511
512        // If state is "erroring",
513        if self.is_erroring() {
514            // Set stream.[[storedError]] to undefined.
515            self.stored_error.set(UndefinedValue());
516
517            // If stream.[[pendingAbortRequest]] is not undefined,
518            rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
519            if let Some(pending_abort_request) = &*pending_abort_request {
520                // Resolve stream.[[pendingAbortRequest]]'s promise with undefined.
521                pending_abort_request.promise.resolve_native(&(), can_gc);
522
523                // Set stream.[[pendingAbortRequest]] to undefined.
524                // Done above with `take`.
525            }
526        }
527
528        // Set stream.[[state]] to "closed".
529        self.state.set(WritableStreamState::Closed);
530
531        // Let writer be stream.[[writer]].
532        if let Some(writer) = self.writer.get() {
533            // If writer is not undefined,
534            // resolve writer.[[closedPromise]] with undefined.
535            writer.resolve_closed_promise_with_undefined(can_gc);
536        }
537
538        // Assert: stream.[[pendingAbortRequest]] is undefined.
539        assert!(self.pending_abort_request.borrow().is_none());
540
541        // Assert: stream.[[storedError]] is undefined.
542        assert!(self.stored_error.get().is_undefined());
543    }
544
545    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error>
546    pub(crate) fn finish_in_flight_close_with_error(
547        &self,
548        cx: SafeJSContext,
549        global: &GlobalScope,
550        error: SafeHandleValue,
551        can_gc: CanGc,
552    ) {
553        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
554            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
555            unreachable!("Inflight close request must be defined.");
556        };
557
558        // Reject stream.[[inFlightCloseRequest]] with error.
559        in_flight_close_request.reject_native(&error, can_gc);
560
561        // Set stream.[[inFlightCloseRequest]] to undefined.
562        // Done above with `take`.
563
564        // Assert: stream.[[state]] is "writable" or "erroring".
565        assert!(self.is_erroring() || self.is_writable());
566
567        // If stream.[[pendingAbortRequest]] is not undefined,
568        rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
569        if let Some(pending_abort_request) = &*pending_abort_request {
570            // Reject stream.[[pendingAbortRequest]]'s promise with error.
571            pending_abort_request.promise.reject_native(&error, can_gc);
572
573            // Set stream.[[pendingAbortRequest]] to undefined.
574            // Done above with `take`.
575        }
576
577        // Perform ! WritableStreamDealWithRejection(stream, error).
578        self.deal_with_rejection(cx, global, error, can_gc);
579    }
580
581    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error>
582    pub(crate) fn finish_in_flight_write_with_error(
583        &self,
584        cx: SafeJSContext,
585        global: &GlobalScope,
586        error: SafeHandleValue,
587        can_gc: CanGc,
588    ) {
589        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
590            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
591            unreachable!("Inflight write request must be defined.");
592        };
593
594        // Reject stream.[[inFlightWriteRequest]] with error.
595        in_flight_write_request.reject_native(&error, can_gc);
596
597        // Set stream.[[inFlightWriteRequest]] to undefined.
598        // Done above with `take`.
599
600        // Assert: stream.[[state]] is "writable" or "erroring".
601        assert!(self.is_erroring() || self.is_writable());
602
603        // Perform ! WritableStreamDealWithRejection(stream, error).
604        self.deal_with_rejection(cx, global, error, can_gc);
605    }
606
607    pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
608        self.writer.get()
609    }
610
611    pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
612        self.writer.set(writer);
613    }
614
615    pub(crate) fn set_backpressure(&self, backpressure: bool) {
616        self.backpressure.set(backpressure);
617    }
618
619    pub(crate) fn get_backpressure(&self) -> bool {
620        self.backpressure.get()
621    }
622
623    /// <https://streams.spec.whatwg.org/#is-writable-stream-locked>
624    pub(crate) fn is_locked(&self) -> bool {
625        // If stream.[[writer]] is undefined, return false.
626        // Return true.
627        self.get_writer().is_some()
628    }
629
630    /// <https://streams.spec.whatwg.org/#writable-stream-add-write-request>
631    pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
632        // Assert: ! IsWritableStreamLocked(stream) is true.
633        assert!(self.is_locked());
634
635        // Assert: stream.[[state]] is "writable".
636        assert!(self.is_writable());
637
638        // Let promise be a new promise.
639        let promise = Promise::new(global, can_gc);
640
641        // Append promise to stream.[[writeRequests]].
642        self.write_requests.borrow_mut().push_back(promise.clone());
643
644        // Return promise.
645        promise
646    }
647
648    // Returns the rooted controller of the stream, if any.
649    pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
650        self.controller.get()
651    }
652
653    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
654    pub(crate) fn abort(
655        &self,
656        cx: SafeJSContext,
657        global: &GlobalScope,
658        provided_reason: SafeHandleValue,
659        realm: InRealm,
660        can_gc: CanGc,
661    ) -> Rc<Promise> {
662        // If stream.[[state]] is "closed" or "errored",
663        if self.is_closed() || self.is_errored() {
664            // return a promise resolved with undefined.
665            return Promise::new_resolved(global, cx, (), can_gc);
666        }
667
668        // Signal abort on stream.[[controller]].[[abortController]] with reason.
669        self.get_controller()
670            .expect("Stream must have a controller.")
671            .signal_abort(cx, provided_reason, realm, can_gc);
672
673        // Let state be stream.[[state]].
674        let state = self.state.get();
675
676        // If state is "closed" or "errored", return a promise resolved with undefined.
677        if matches!(
678            state,
679            WritableStreamState::Closed | WritableStreamState::Errored
680        ) {
681            return Promise::new_resolved(global, cx, (), can_gc);
682        }
683
684        // If stream.[[pendingAbortRequest]] is not undefined,
685        if self.pending_abort_request.borrow().is_some() {
686            // return stream.[[pendingAbortRequest]]'s promise.
687            return self
688                .pending_abort_request
689                .borrow()
690                .as_ref()
691                .expect("Pending abort request must be Some.")
692                .promise
693                .clone();
694        }
695
696        // Assert: state is "writable" or "erroring".
697        assert!(self.is_writable() || self.is_erroring());
698
699        // Let wasAlreadyErroring be false.
700        let mut was_already_erroring = false;
701        rooted!(in(*cx) let undefined_reason = UndefinedValue());
702
703        // If state is "erroring",
704        let reason = if self.is_erroring() {
705            // Set wasAlreadyErroring to true.
706            was_already_erroring = true;
707
708            // Set reason to undefined.
709            undefined_reason.handle()
710        } else {
711            // Use the provided reason.
712            provided_reason
713        };
714
715        // Let promise be a new promise.
716        let promise = Promise::new(global, can_gc);
717
718        // Set stream.[[pendingAbortRequest]] to a new pending abort request
719        // whose promise is promise,
720        // reason is reason,
721        // and was already erroring is wasAlreadyErroring.
722        *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
723            promise: promise.clone(),
724            reason: Heap::boxed(reason.get()),
725            was_already_erroring,
726        });
727
728        // If wasAlreadyErroring is false,
729        if !was_already_erroring {
730            // perform ! WritableStreamStartErroring(stream, reason)
731            self.start_erroring(cx, global, reason, can_gc);
732        }
733
734        // Return promise.
735        promise
736    }
737
738    /// <https://streams.spec.whatwg.org/#writable-stream-close>
739    pub(crate) fn close(
740        &self,
741        cx: SafeJSContext,
742        global: &GlobalScope,
743        can_gc: CanGc,
744    ) -> Rc<Promise> {
745        // Let state be stream.[[state]].
746        // If state is "closed" or "errored",
747        if self.is_closed() || self.is_errored() {
748            // return a promise rejected with a TypeError exception.
749            let promise = Promise::new(global, can_gc);
750            promise.reject_error(
751                Error::Type("Stream is closed or errored.".to_string()),
752                can_gc,
753            );
754            return promise;
755        }
756
757        // Assert: state is "writable" or "erroring".
758        assert!(self.is_writable() || self.is_erroring());
759
760        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
761        assert!(!self.close_queued_or_in_flight());
762
763        // Let promise be a new promise.
764        let promise = Promise::new(global, can_gc);
765
766        // Set stream.[[closeRequest]] to promise.
767        *self.close_request.borrow_mut() = Some(promise.clone());
768
769        // Let writer be stream.[[writer]].
770        // If writer is not undefined,
771        if let Some(writer) = self.writer.get() {
772            // and stream.[[backpressure]] is true,
773            // and state is "writable",
774            if self.get_backpressure() && self.is_writable() {
775                // resolve writer.[[readyPromise]] with undefined.
776                writer.resolve_ready_promise_with_undefined(can_gc);
777            }
778        }
779
780        // Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]).
781        let Some(controller) = self.controller.get() else {
782            unreachable!("Stream must have a controller.");
783        };
784        controller.close(cx, global, can_gc);
785
786        // Return promise.
787        promise
788    }
789
790    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size>
791    /// Note: implement as a stream method, as opposed to a writer one, for convenience.
792    pub(crate) fn get_desired_size(&self) -> Option<f64> {
793        // Let stream be writer.[[stream]].
794        // Stream is `self`.
795
796        // Let state be stream.[[state]].
797        // If state is "errored" or "erroring", return null.
798        if self.is_errored() || self.is_erroring() {
799            return None;
800        }
801
802        // If state is "closed", return 0.
803        if self.is_closed() {
804            return Some(0.);
805        }
806
807        let Some(controller) = self.controller.get() else {
808            unreachable!("Stream must have a controller.");
809        };
810        Some(controller.get_desired_size())
811    }
812
813    /// <https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer>
814    pub(crate) fn aquire_default_writer(
815        &self,
816        cx: SafeJSContext,
817        global: &GlobalScope,
818        can_gc: CanGc,
819    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
820        // Let writer be a new WritableStreamDefaultWriter object.
821        let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
822
823        // Perform ? SetUpWritableStreamDefaultWriter(writer, stream).
824        writer.setup(cx, self, can_gc)?;
825
826        // Return writer.
827        Ok(writer)
828    }
829
830    /// <https://streams.spec.whatwg.org/#writable-stream-update-backpressure>
831    pub(crate) fn update_backpressure(
832        &self,
833        backpressure: bool,
834        global: &GlobalScope,
835        can_gc: CanGc,
836    ) {
837        // Assert: stream.[[state]] is "writable".
838        self.is_writable();
839
840        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
841        assert!(!self.close_queued_or_in_flight());
842
843        // Let writer be stream.[[writer]].
844        let writer = self.get_writer();
845
846        if let Some(writer) = writer {
847            // If writer is not undefined
848            if backpressure != self.get_backpressure() {
849                // and backpressure is not stream.[[backpressure]],
850                if backpressure {
851                    // If backpressure is true, set writer.[[readyPromise]] to a new promise.
852                    let promise = Promise::new(global, can_gc);
853                    writer.set_ready_promise(promise);
854                } else {
855                    // Otherwise,
856                    // Assert: backpressure is false.
857                    assert!(!backpressure);
858                    // Resolve writer.[[readyPromise]] with undefined.
859                    writer.resolve_ready_promise_with_undefined(can_gc);
860                }
861            }
862        }
863
864        // Set stream.[[backpressure]] to backpressure.
865        self.set_backpressure(backpressure);
866    }
867
868    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
869    pub(crate) fn setup_cross_realm_transform_writable(
870        &self,
871        cx: SafeJSContext,
872        port: &MessagePort,
873        can_gc: CanGc,
874    ) {
875        let port_id = port.message_port_id();
876        let global = self.global();
877
878        // Perform ! InitializeWritableStream(stream).
879        // Done in `new_inherited`.
880
881        // Let sizeAlgorithm be an algorithm that returns 1.
882        // Re-ordered because of the need to pass it to `new`.
883        let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
884
885        // Note: other algorithms defined in the controller at call site.
886
887        // Let backpressurePromise be a new promise.
888        let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(&global, can_gc))));
889
890        // Let controller be a new WritableStreamDefaultController.
891        let controller = WritableStreamDefaultController::new(
892            &global,
893            UnderlyingSinkType::Transfer {
894                backpressure_promise: backpressure_promise.clone(),
895                port: Dom::from_ref(port),
896            },
897            1.0,
898            size_algorithm,
899            can_gc,
900        );
901
902        // Add a handler for port’s message event with the following steps:
903        // Add a handler for port’s messageerror event with the following steps:
904        rooted!(in(*cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
905            controller: Dom::from_ref(&controller),
906            backpressure_promise: backpressure_promise.clone(),
907        });
908        global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
909
910        // Enable port’s port message queue.
911        port.Start(can_gc);
912
913        // Perform ! SetUpWritableStreamDefaultController
914        controller
915            .setup(cx, &global, self, can_gc)
916            .expect("Setup for transfer cannot fail");
917    }
918    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
919    #[allow(clippy::too_many_arguments)]
920    pub(crate) fn setup_from_underlying_sink(
921        &self,
922        cx: SafeJSContext,
923        global: &GlobalScope,
924        stream: &WritableStream,
925        underlying_sink_obj: SafeHandleObject,
926        underlying_sink: &UnderlyingSink,
927        strategy_hwm: f64,
928        strategy_size: Rc<QueuingStrategySize>,
929        can_gc: CanGc,
930    ) -> Result<(), Error> {
931        // Let controller be a new WritableStreamDefaultController.
932
933        // Let startAlgorithm be an algorithm that returns undefined.
934
935        // Let writeAlgorithm be an algorithm that returns a promise resolved with undefined.
936
937        // Let closeAlgorithm be an algorithm that returns a promise resolved with undefined.
938
939        // Let abortAlgorithm be an algorithm that returns a promise resolved with undefined.
940
941        // If underlyingSinkDict["start"] exists, then set startAlgorithm to an algorithm which
942        // returns the result of invoking underlyingSinkDict["start"] with argument
943        // list « controller », exception behavior "rethrow", and callback this value underlyingSink.
944
945        // If underlyingSinkDict["write"] exists, then set writeAlgorithm to an algorithm which
946        // takes an argument chunk and returns the result of invoking underlyingSinkDict["write"]
947        // with argument list « chunk, controller » and callback this value underlyingSink.
948
949        // If underlyingSinkDict["close"] exists, then set closeAlgorithm to an algorithm which
950        // returns the result of invoking underlyingSinkDict["close"] with argument
951        // list «» and callback this value underlyingSink.
952
953        // If underlyingSinkDict["abort"] exists, then set abortAlgorithm to an algorithm which
954        // takes an argument reason and returns the result of invoking underlyingSinkDict["abort"]
955        // with argument list « reason » and callback this value underlyingSink.
956        let controller = WritableStreamDefaultController::new(
957            global,
958            UnderlyingSinkType::new_js(
959                underlying_sink.abort.clone(),
960                underlying_sink.start.clone(),
961                underlying_sink.close.clone(),
962                underlying_sink.write.clone(),
963            ),
964            strategy_hwm,
965            strategy_size,
966            can_gc,
967        );
968
969        // Note: this must be done before `setup`,
970        // otherwise `thisOb` is null in the start callback.
971        controller.set_underlying_sink_this_object(underlying_sink_obj);
972
973        // Perform ? SetUpWritableStreamDefaultController
974        controller.setup(cx, global, stream, can_gc)
975    }
976}
977
978/// <https://streams.spec.whatwg.org/#create-writable-stream>
979#[cfg_attr(crown, allow(crown::unrooted_must_root))]
980pub(crate) fn create_writable_stream(
981    cx: SafeJSContext,
982    global: &GlobalScope,
983    writable_high_water_mark: f64,
984    writable_size_algorithm: Rc<QueuingStrategySize>,
985    underlying_sink_type: UnderlyingSinkType,
986    can_gc: CanGc,
987) -> Fallible<DomRoot<WritableStream>> {
988    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
989    assert!(writable_high_water_mark >= 0.0);
990
991    // Let stream be a new WritableStream.
992    // Perform ! InitializeWritableStream(stream).
993    let stream = WritableStream::new_with_proto(global, None, can_gc);
994
995    // Let controller be a new WritableStreamDefaultController.
996    let controller = WritableStreamDefaultController::new(
997        global,
998        underlying_sink_type,
999        writable_high_water_mark,
1000        writable_size_algorithm,
1001        can_gc,
1002    );
1003
1004    // Perform ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm,
1005    // closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm).
1006    controller.setup(cx, global, &stream, can_gc)?;
1007
1008    // Return stream.
1009    Ok(stream)
1010}
1011
1012impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1013    /// <https://streams.spec.whatwg.org/#ws-constructor>
1014    fn Constructor(
1015        cx: SafeJSContext,
1016        global: &GlobalScope,
1017        proto: Option<SafeHandleObject>,
1018        can_gc: CanGc,
1019        underlying_sink: Option<*mut JSObject>,
1020        strategy: &QueuingStrategy,
1021    ) -> Fallible<DomRoot<WritableStream>> {
1022        // If underlyingSink is missing, set it to null.
1023        rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1024
1025        // Let underlyingSinkDict be underlyingSink,
1026        // converted to an IDL value of type UnderlyingSink.
1027        let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1028            rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1029            match UnderlyingSink::new(cx, obj_val.handle(), can_gc) {
1030                Ok(ConversionResult::Success(val)) => val,
1031                Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
1032                _ => {
1033                    return Err(Error::JSFailed);
1034                },
1035            }
1036        } else {
1037            UnderlyingSink::empty()
1038        };
1039
1040        if !underlying_sink_dict.type_.handle().is_undefined() {
1041            // If underlyingSinkDict["type"] exists, throw a RangeError exception.
1042            return Err(Error::Range("type is set".to_string()));
1043        }
1044
1045        // Perform ! InitializeWritableStream(this).
1046        let stream = WritableStream::new_with_proto(global, proto, can_gc);
1047
1048        // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
1049        let size_algorithm = extract_size_algorithm(strategy, can_gc);
1050
1051        // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
1052        let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1053
1054        // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink,
1055        // underlyingSinkDict, highWaterMark, sizeAlgorithm).
1056        stream.setup_from_underlying_sink(
1057            cx,
1058            global,
1059            &stream,
1060            underlying_sink_obj.handle(),
1061            &underlying_sink_dict,
1062            high_water_mark,
1063            size_algorithm,
1064            can_gc,
1065        )?;
1066
1067        Ok(stream)
1068    }
1069
1070    /// <https://streams.spec.whatwg.org/#ws-locked>
1071    fn Locked(&self) -> bool {
1072        // Return ! IsWritableStreamLocked(this).
1073        self.is_locked()
1074    }
1075
1076    /// <https://streams.spec.whatwg.org/#ws-abort>
1077    fn Abort(
1078        &self,
1079        cx: SafeJSContext,
1080        reason: SafeHandleValue,
1081        realm: InRealm,
1082        can_gc: CanGc,
1083    ) -> Rc<Promise> {
1084        let global = GlobalScope::from_safe_context(cx, realm);
1085
1086        // If ! IsWritableStreamLocked(this) is true,
1087        if self.is_locked() {
1088            // return a promise rejected with a TypeError exception.
1089            let promise = Promise::new(&global, can_gc);
1090            promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1091            return promise;
1092        }
1093
1094        // Return ! WritableStreamAbort(this, reason).
1095        self.abort(cx, &global, reason, realm, can_gc)
1096    }
1097
1098    /// <https://streams.spec.whatwg.org/#ws-close>
1099    fn Close(&self, realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
1100        let cx = GlobalScope::get_cx();
1101        let global = GlobalScope::from_safe_context(cx, realm);
1102
1103        // If ! IsWritableStreamLocked(this) is true,
1104        if self.is_locked() {
1105            // return a promise rejected with a TypeError exception.
1106            let promise = Promise::new(&global, can_gc);
1107            promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1108            return promise;
1109        }
1110
1111        // If ! WritableStreamCloseQueuedOrInFlight(this) is true
1112        if self.close_queued_or_in_flight() {
1113            // return a promise rejected with a TypeError exception.
1114            let promise = Promise::new(&global, can_gc);
1115            promise.reject_error(
1116                Error::Type("Stream has closed queued or in-flight".to_string()),
1117                can_gc,
1118            );
1119            return promise;
1120        }
1121
1122        // Return ! WritableStreamClose(this).
1123        self.close(cx, &global, can_gc)
1124    }
1125
1126    /// <https://streams.spec.whatwg.org/#ws-get-writer>
1127    fn GetWriter(
1128        &self,
1129        realm: InRealm,
1130        can_gc: CanGc,
1131    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1132        let cx = GlobalScope::get_cx();
1133        let global = GlobalScope::from_safe_context(cx, realm);
1134
1135        // Return ? AcquireWritableStreamDefaultWriter(this).
1136        self.aquire_default_writer(cx, &global, can_gc)
1137    }
1138}
1139
1140impl js::gc::Rootable for CrossRealmTransformWritable {}
1141
1142/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1143/// A wrapper to handle `message` and `messageerror` events
1144/// for the port used by the transfered stream.
1145#[derive(Clone, JSTraceable, MallocSizeOf)]
1146#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1147pub(crate) struct CrossRealmTransformWritable {
1148    /// The controller used in the algorithm.
1149    controller: Dom<WritableStreamDefaultController>,
1150
1151    /// The `backpressurePromise` used in the algorithm.
1152    #[ignore_malloc_size_of = "Rc is hard"]
1153    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1154}
1155
1156impl CrossRealmTransformWritable {
1157    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1158    /// Add a handler for port’s message event with the following steps:
1159    #[allow(unsafe_code)]
1160    pub(crate) fn handle_message(
1161        &self,
1162        cx: SafeJSContext,
1163        global: &GlobalScope,
1164        message: SafeHandleValue,
1165        _realm: InRealm,
1166        can_gc: CanGc,
1167    ) {
1168        rooted!(in(*cx) let mut value = UndefinedValue());
1169        let type_string =
1170            unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
1171
1172        // If type is "pull",
1173        // Done below as the steps are the same for both types.
1174
1175        // Otherwise, if type is "error",
1176        if type_string == "error" {
1177            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, value).
1178            self.controller
1179                .error_if_needed(cx, value.handle(), global, can_gc);
1180        }
1181
1182        let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1183
1184        // Note: the below steps are for both "pull" and "error" types.
1185        // If backpressurePromise is not undefined,
1186        if let Some(promise) = backpressure_promise {
1187            // Resolve backpressurePromise with undefined.
1188            promise.resolve_native(&(), can_gc);
1189
1190            // Set backpressurePromise to undefined.
1191            // Done above with `take`.
1192        }
1193    }
1194
1195    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1196    /// Add a handler for port’s messageerror event with the following steps:
1197    pub(crate) fn handle_error(
1198        &self,
1199        cx: SafeJSContext,
1200        global: &GlobalScope,
1201        port: &MessagePort,
1202        _realm: InRealm,
1203        can_gc: CanGc,
1204    ) {
1205        // Let error be a new "DataCloneError" DOMException.
1206        let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
1207        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1208        error.safe_to_jsval(cx, rooted_error.handle_mut());
1209
1210        // Perform ! CrossRealmTransformSendError(port, error).
1211        port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
1212
1213        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, error).
1214        self.controller
1215            .error_if_needed(cx, rooted_error.handle(), global, can_gc);
1216
1217        // Disentangle port.
1218        global.disentangle_port(port, can_gc);
1219    }
1220}
1221
1222/// <https://streams.spec.whatwg.org/#ws-transfer>
1223impl Transferable for WritableStream {
1224    type Index = MessagePortIndex;
1225    type Data = MessagePortImpl;
1226
1227    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps①>
1228    fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
1229        // Step 1. If ! IsWritableStreamLocked(value) is true, throw a
1230        // "DataCloneError" DOMException.
1231        if self.is_locked() {
1232            return Err(Error::DataClone(None));
1233        }
1234
1235        let global = self.global();
1236        let realm = enter_realm(&*global);
1237        let comp = InRealm::Entered(&realm);
1238        let cx = GlobalScope::get_cx();
1239        let can_gc = CanGc::note();
1240
1241        // Step 2. Let port1 be a new MessagePort in the current Realm.
1242        let port_1 = MessagePort::new(&global, can_gc);
1243        global.track_message_port(&port_1, None);
1244
1245        // Step 3. Let port2 be a new MessagePort in the current Realm.
1246        let port_2 = MessagePort::new(&global, can_gc);
1247        global.track_message_port(&port_2, None);
1248
1249        // Step 4. Entangle port1 and port2.
1250        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1251
1252        // Step 5. Let readable be a new ReadableStream in the current Realm.
1253        let readable = ReadableStream::new_with_proto(&global, None, can_gc);
1254
1255        // Step 6. Perform ! SetUpCrossRealmTransformReadable(readable, port1).
1256        readable.setup_cross_realm_transform_readable(cx, &port_1, can_gc);
1257
1258        // Step 7. Let promise be ! ReadableStreamPipeTo(readable, value, false, false, false).
1259        let promise = readable.pipe_to(cx, &global, self, false, false, false, None, comp, can_gc);
1260
1261        // Step 8. Set promise.[[PromiseIsHandled]] to true.
1262        promise.set_promise_is_handled();
1263
1264        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
1265        port_2.transfer()
1266    }
1267
1268    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps①>
1269    fn transfer_receive(
1270        owner: &GlobalScope,
1271        id: MessagePortId,
1272        port_impl: MessagePortImpl,
1273    ) -> Result<DomRoot<Self>, ()> {
1274        let cx = GlobalScope::get_cx();
1275        let can_gc = CanGc::note();
1276
1277        // Their transfer-receiving steps, given dataHolder and value, are:
1278        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
1279        let value = WritableStream::new_with_proto(owner, None, can_gc);
1280
1281        // Step 1. Let deserializedRecord be !
1282        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
1283        // Realm).
1284        // Done with the `Deserialize` derive of `MessagePortImpl`.
1285
1286        // Step 2. Let port be deserializedRecord.[[Deserialized]].
1287        let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
1288
1289        // Step 3. Perform ! SetUpCrossRealmTransformWritable(value, port).
1290        value.setup_cross_realm_transform_writable(cx, &transferred_port, can_gc);
1291        Ok(value)
1292    }
1293
1294    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
1295    fn serialized_storage<'a>(
1296        data: StructuredData<'a, '_>,
1297    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1298        match data {
1299            StructuredData::Reader(r) => &mut r.port_impls,
1300            StructuredData::Writer(w) => &mut w.ports,
1301        }
1302    }
1303}