script/dom/stream/
writablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use dom_struct::dom_struct;
12use js::jsapi::{Heap, JSObject};
13use js::jsval::{JSVal, ObjectValue, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{
16    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
17    MutableHandleValue as SafeMutableHandleValue,
18};
19use rustc_hash::FxHashMap;
20use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
21use script_bindings::conversions::SafeToJSValConvertible;
22use servo_base::id::{MessagePortId, MessagePortIndex};
23use servo_constellation_traits::MessagePortImpl;
24
25use crate::dom::bindings::cell::DomRefCell;
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
27    QueuingStrategy, QueuingStrategySize,
28};
29use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
30use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
31use crate::dom::bindings::conversions::ConversionResult;
32use crate::dom::bindings::error::{Error, Fallible};
33use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
34use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
35use crate::dom::bindings::structuredclone::StructuredData;
36use crate::dom::bindings::transferable::Transferable;
37use crate::dom::domexception::{DOMErrorName, DOMException};
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::messageport::MessagePort;
40use crate::dom::promise::Promise;
41use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
42use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
43use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
44use crate::dom::stream::writablestreamdefaultcontroller::{
45    UnderlyingSinkType, WritableStreamDefaultController,
46};
47use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
48use crate::realms::{InRealm, enter_auto_realm, enter_realm};
49use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
50
51impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
52
53/// The fulfillment handler for the abort steps of
54/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
55#[derive(JSTraceable, MallocSizeOf)]
56#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
57struct AbortAlgorithmFulfillmentHandler {
58    stream: Dom<WritableStream>,
59    #[conditional_malloc_size_of]
60    abort_request_promise: Rc<Promise>,
61}
62
63impl Callback for AbortAlgorithmFulfillmentHandler {
64    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
65        // Resolve abortRequest’s promise with undefined.
66        self.abort_request_promise
67            .resolve_native(&(), CanGc::from_cx(cx));
68
69        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
70        self.stream
71            .as_rooted()
72            .reject_close_and_closed_promise_if_needed(cx);
73    }
74}
75
76impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
77
78/// The rejection handler for the abort steps of
79/// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
80#[derive(JSTraceable, MallocSizeOf)]
81#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
82struct AbortAlgorithmRejectionHandler {
83    stream: Dom<WritableStream>,
84    #[conditional_malloc_size_of]
85    abort_request_promise: Rc<Promise>,
86}
87
88impl Callback for AbortAlgorithmRejectionHandler {
89    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
90        // Reject abortRequest’s promise with reason.
91        self.abort_request_promise
92            .reject_native(&reason, CanGc::from_cx(cx));
93
94        // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
95        self.stream
96            .as_rooted()
97            .reject_close_and_closed_promise_if_needed(cx);
98    }
99}
100
101impl js::gc::Rootable for PendingAbortRequest {}
102
103/// <https://streams.spec.whatwg.org/#pending-abort-request>
104#[derive(JSTraceable, MallocSizeOf)]
105#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
106struct PendingAbortRequest {
107    /// <https://streams.spec.whatwg.org/#pending-abort-request-promise>
108    #[conditional_malloc_size_of]
109    promise: Rc<Promise>,
110
111    /// <https://streams.spec.whatwg.org/#pending-abort-request-reason>
112    #[ignore_malloc_size_of = "mozjs"]
113    reason: Box<Heap<JSVal>>,
114
115    /// <https://streams.spec.whatwg.org/#pending-abort-request-was-already-erroring>
116    was_already_erroring: bool,
117}
118
119/// <https://streams.spec.whatwg.org/#writablestream-state>
120#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
121pub(crate) enum WritableStreamState {
122    #[default]
123    Writable,
124    Closed,
125    Erroring,
126    Errored,
127}
128
129/// <https://streams.spec.whatwg.org/#ws-class>
130#[dom_struct]
131pub struct WritableStream {
132    reflector_: Reflector,
133
134    /// <https://streams.spec.whatwg.org/#writablestream-backpressure>
135    backpressure: Cell<bool>,
136
137    /// <https://streams.spec.whatwg.org/#writablestream-closerequest>
138    #[conditional_malloc_size_of]
139    close_request: DomRefCell<Option<Rc<Promise>>>,
140
141    /// <https://streams.spec.whatwg.org/#writablestream-controller>
142    controller: MutNullableDom<WritableStreamDefaultController>,
143
144    /// <https://streams.spec.whatwg.org/#writablestream-detached>
145    detached: Cell<bool>,
146
147    /// <https://streams.spec.whatwg.org/#writablestream-inflightwriterequest>
148    #[conditional_malloc_size_of]
149    in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
150
151    /// <https://streams.spec.whatwg.org/#writablestream-inflightcloserequest>
152    #[conditional_malloc_size_of]
153    in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
154
155    /// <https://streams.spec.whatwg.org/#writablestream-pendingabortrequest>
156    pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
157
158    /// <https://streams.spec.whatwg.org/#writablestream-state>
159    state: Cell<WritableStreamState>,
160
161    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
162    #[ignore_malloc_size_of = "mozjs"]
163    stored_error: Heap<JSVal>,
164
165    /// <https://streams.spec.whatwg.org/#writablestream-writer>
166    writer: MutNullableDom<WritableStreamDefaultWriter>,
167
168    /// <https://streams.spec.whatwg.org/#writablestream-writerequests>
169    #[conditional_malloc_size_of]
170    write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
171}
172
173impl WritableStream {
174    /// <https://streams.spec.whatwg.org/#initialize-writable-stream>
175    fn new_inherited() -> WritableStream {
176        WritableStream {
177            reflector_: Reflector::new(),
178            backpressure: Default::default(),
179            close_request: Default::default(),
180            controller: Default::default(),
181            detached: Default::default(),
182            in_flight_write_request: Default::default(),
183            in_flight_close_request: Default::default(),
184            pending_abort_request: Default::default(),
185            state: Default::default(),
186            stored_error: Default::default(),
187            writer: Default::default(),
188            write_requests: Default::default(),
189        }
190    }
191
192    pub(crate) fn new_with_proto(
193        global: &GlobalScope,
194        proto: Option<SafeHandleObject>,
195        can_gc: CanGc,
196    ) -> DomRoot<WritableStream> {
197        reflect_dom_object_with_proto(
198            Box::new(WritableStream::new_inherited()),
199            global,
200            proto,
201            can_gc,
202        )
203    }
204
205    /// Used as part of
206    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
207    pub(crate) fn assert_no_controller(&self) {
208        assert!(self.controller.get().is_none());
209    }
210
211    /// Used as part of
212    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller>
213    pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
214        self.controller.set(Some(controller));
215    }
216
217    pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
218        self.controller.get().expect("Controller should be set.")
219    }
220
221    pub(crate) fn is_writable(&self) -> bool {
222        matches!(self.state.get(), WritableStreamState::Writable)
223    }
224
225    pub(crate) fn is_erroring(&self) -> bool {
226        matches!(self.state.get(), WritableStreamState::Erroring)
227    }
228
229    pub(crate) fn is_errored(&self) -> bool {
230        matches!(self.state.get(), WritableStreamState::Errored)
231    }
232
233    pub(crate) fn is_closed(&self) -> bool {
234        matches!(self.state.get(), WritableStreamState::Closed)
235    }
236
237    pub(crate) fn has_in_flight_write_request(&self) -> bool {
238        self.in_flight_write_request.borrow().is_some()
239    }
240
241    /// <https://streams.spec.whatwg.org/#writable-stream-has-operation-marked-in-flight>
242    pub(crate) fn has_operations_marked_inflight(&self) -> bool {
243        let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
244        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
245
246        in_flight_write_requested || in_flight_close_requested
247    }
248
249    /// <https://streams.spec.whatwg.org/#writablestream-storederror>
250    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
251        handle_mut.set(self.stored_error.get());
252    }
253
254    /// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
255    pub(crate) fn finish_erroring(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
256        // Assert: stream.[[state]] is "erroring".
257        assert!(self.is_erroring());
258
259        // Assert: ! WritableStreamHasOperationMarkedInFlight(stream) is false.
260        assert!(!self.has_operations_marked_inflight());
261
262        // Set stream.[[state]] to "errored".
263        self.state.set(WritableStreamState::Errored);
264
265        // Perform ! stream.[[controller]].[[ErrorSteps]]().
266        let Some(controller) = self.controller.get() else {
267            unreachable!("Stream should have a controller.");
268        };
269        controller.perform_error_steps();
270
271        // Let storedError be stream.[[storedError]].
272        rooted!(&in(cx) let mut stored_error = UndefinedValue());
273        self.get_stored_error(stored_error.handle_mut());
274
275        // For each writeRequest of stream.[[writeRequests]]:
276        let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
277        for request in write_requests {
278            // Reject writeRequest with storedError.
279            request.reject(cx.into(), stored_error.handle(), CanGc::from_cx(cx));
280        }
281
282        // Set stream.[[writeRequests]] to an empty list.
283        // Done above with `drain`.
284
285        // If stream.[[pendingAbortRequest]] is undefined,
286        if self.pending_abort_request.borrow().is_none() {
287            // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
288            self.reject_close_and_closed_promise_if_needed(cx);
289
290            // Return.
291            return;
292        }
293
294        // Let abortRequest be stream.[[pendingAbortRequest]].
295        // Set stream.[[pendingAbortRequest]] to undefined.
296        rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
297        if let Some(pending_abort_request) = &*pending_abort_request {
298            // If abortRequest’s was already erroring is true,
299            if pending_abort_request.was_already_erroring {
300                // Reject abortRequest’s promise with storedError.
301                pending_abort_request.promise.reject(
302                    cx.into(),
303                    stored_error.handle(),
304                    CanGc::from_cx(cx),
305                );
306
307                // Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream).
308                self.reject_close_and_closed_promise_if_needed(cx);
309
310                // Return.
311                return;
312            }
313
314            // Let promise be ! stream.[[controller]].[[AbortSteps]](abortRequest’s reason).
315            rooted!(&in(cx) let mut reason = UndefinedValue());
316            reason.set(pending_abort_request.reason.get());
317            let promise = controller.abort_steps(cx, global, reason.handle());
318
319            // Upon fulfillment of promise,
320            rooted!(&in(cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
321                stream: Dom::from_ref(self),
322                abort_request_promise: pending_abort_request.promise.clone(),
323            }));
324
325            // Upon rejection of promise with reason r,
326            rooted!(&in(cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
327                stream: Dom::from_ref(self),
328                abort_request_promise: pending_abort_request.promise.clone(),
329            }));
330
331            let handler = PromiseNativeHandler::new(
332                global,
333                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
334                rejection_handler.take().map(|h| Box::new(h) as Box<_>),
335                CanGc::from_cx(cx),
336            );
337            let realm = enter_realm(global);
338            let comp = InRealm::Entered(&realm);
339            promise.append_native_handler(&handler, comp, CanGc::from_cx(cx));
340        }
341    }
342
343    /// <https://streams.spec.whatwg.org/#writable-stream-reject-close-and-closed-promise-if-needed>
344    fn reject_close_and_closed_promise_if_needed(&self, cx: &mut js::context::JSContext) {
345        // Assert: stream.[[state]] is "errored".
346        assert!(self.is_errored());
347
348        rooted!(&in(cx) let mut stored_error = UndefinedValue());
349        self.get_stored_error(stored_error.handle_mut());
350
351        // If stream.[[closeRequest]] is not undefined
352        let close_request = self.close_request.borrow_mut().take();
353        if let Some(close_request) = close_request {
354            // Assert: stream.[[inFlightCloseRequest]] is undefined.
355            assert!(self.in_flight_close_request.borrow().is_none());
356
357            // Reject stream.[[closeRequest]] with stream.[[storedError]].
358            close_request.reject_native(&stored_error.handle(), CanGc::from_cx(cx))
359
360            // Set stream.[[closeRequest]] to undefined.
361            // Done with `take` above.
362        }
363
364        // Let writer be stream.[[writer]].
365        // If writer is not undefined,
366        if let Some(writer) = self.writer.get() {
367            // Reject writer.[[closedPromise]] with stream.[[storedError]].
368            writer.reject_closed_promise_with_stored_error(cx, &stored_error.handle());
369
370            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
371            writer.set_close_promise_is_handled();
372        }
373    }
374
375    /// <https://streams.spec.whatwg.org/#writable-stream-close-queued-or-in-flight>
376    pub(crate) fn close_queued_or_in_flight(&self) -> bool {
377        let close_requested = self.close_request.borrow().is_some();
378        let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
379
380        close_requested || in_flight_close_requested
381    }
382
383    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write>
384    pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
385        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
386            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
387            unreachable!("Stream should have a write request");
388        };
389
390        // Resolve stream.[[inFlightWriteRequest]] with undefined.
391        in_flight_write_request.resolve_native(&(), can_gc);
392
393        // Set stream.[[inFlightWriteRequest]] to undefined.
394        // Done above with `take`.
395    }
396
397    /// <https://streams.spec.whatwg.org/#writable-stream-start-erroring>
398    pub(crate) fn start_erroring(
399        &self,
400        cx: &mut js::context::JSContext,
401        global: &GlobalScope,
402        error: SafeHandleValue,
403    ) {
404        // Assert: stream.[[storedError]] is undefined.
405        assert!(self.stored_error.get().is_undefined());
406
407        // Assert: stream.[[state]] is "writable".
408        assert!(self.is_writable());
409
410        // Let controller be stream.[[controller]].
411        let Some(controller) = self.controller.get() else {
412            // Assert: controller is not undefined.
413            unreachable!("Stream should have a controller.");
414        };
415
416        // Set stream.[[state]] to "erroring".
417        self.state.set(WritableStreamState::Erroring);
418
419        // Set stream.[[storedError]] to reason.
420        self.stored_error.set(*error);
421
422        // Let writer be stream.[[writer]].
423        if let Some(writer) = self.writer.get() {
424            // If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected
425            writer.ensure_ready_promise_rejected(global, error, CanGc::from_cx(cx));
426        }
427
428        // If ! WritableStreamHasOperationMarkedInFlight(stream) is false and controller.[[started]] is true
429        if !self.has_operations_marked_inflight() && controller.started() {
430            // perform ! WritableStreamFinishErroring
431            self.finish_erroring(cx, global);
432        }
433    }
434
435    /// <https://streams.spec.whatwg.org/#writable-stream-deal-with-rejection>
436    pub(crate) fn deal_with_rejection(
437        &self,
438        cx: &mut js::context::JSContext,
439        global: &GlobalScope,
440        error: SafeHandleValue,
441    ) {
442        // Let state be stream.[[state]].
443
444        // If state is "writable",
445        if self.is_writable() {
446            // Perform ! WritableStreamStartErroring(stream, error).
447            self.start_erroring(cx, global, error);
448
449            // Return.
450            return;
451        }
452
453        // Assert: state is "erroring".
454        assert!(self.is_erroring());
455
456        // Perform ! WritableStreamFinishErroring(stream).
457        self.finish_erroring(cx, global);
458    }
459
460    /// <https://streams.spec.whatwg.org/#writable-stream-mark-first-write-request-in-flight>
461    pub(crate) fn mark_first_write_request_in_flight(&self) {
462        let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
463        let mut write_requests = self.write_requests.borrow_mut();
464
465        // Assert: stream.[[inFlightWriteRequest]] is undefined.
466        assert!(in_flight_write_request.is_none());
467
468        // Assert: stream.[[writeRequests]] is not empty.
469        assert!(!write_requests.is_empty());
470
471        // Let writeRequest be stream.[[writeRequests]][0].
472        // Remove writeRequest from stream.[[writeRequests]].
473        let write_request = write_requests.pop_front().unwrap();
474
475        // Set stream.[[inFlightWriteRequest]] to writeRequest.
476        *in_flight_write_request = Some(write_request);
477    }
478
479    /// <https://streams.spec.whatwg.org/#writable-stream-mark-close-request-in-flight>
480    pub(crate) fn mark_close_request_in_flight(&self) {
481        let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
482        let mut close_request = self.close_request.borrow_mut();
483
484        // Assert: stream.[[inFlightCloseRequest]] is undefined.
485        assert!(in_flight_close_request.is_none());
486
487        // Assert: stream.[[closeRequest]] is not undefined.
488        assert!(close_request.is_some());
489
490        // Let closeRequest be stream.[[closeRequest]].
491        // Set stream.[[closeRequest]] to undefined.
492        let close_request = close_request.take().unwrap();
493
494        // Set stream.[[inFlightCloseRequest]] to closeRequest.
495        *in_flight_close_request = Some(close_request);
496    }
497
498    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close>
499    pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
500        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
501            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
502            unreachable!("in_flight_close_request must be Some");
503        };
504
505        // Resolve stream.[[inFlightCloseRequest]] with undefined.
506        in_flight_close_request.resolve_native(&(), can_gc);
507
508        // Set stream.[[inFlightCloseRequest]] to undefined.
509        // Done with take above.
510
511        // Assert: stream.[[state]] is "writable" or "erroring".
512        assert!(self.is_writable() || self.is_erroring());
513
514        // If state is "erroring",
515        if self.is_erroring() {
516            // Set stream.[[storedError]] to undefined.
517            self.stored_error.set(UndefinedValue());
518
519            // If stream.[[pendingAbortRequest]] is not undefined,
520            rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
521            if let Some(pending_abort_request) = &*pending_abort_request {
522                // Resolve stream.[[pendingAbortRequest]]'s promise with undefined.
523                pending_abort_request.promise.resolve_native(&(), can_gc);
524
525                // Set stream.[[pendingAbortRequest]] to undefined.
526                // Done above with `take`.
527            }
528        }
529
530        // Set stream.[[state]] to "closed".
531        self.state.set(WritableStreamState::Closed);
532
533        // Let writer be stream.[[writer]].
534        if let Some(writer) = self.writer.get() {
535            // If writer is not undefined,
536            // resolve writer.[[closedPromise]] with undefined.
537            writer.resolve_closed_promise_with_undefined(can_gc);
538        }
539
540        // Assert: stream.[[pendingAbortRequest]] is undefined.
541        assert!(self.pending_abort_request.borrow().is_none());
542
543        // Assert: stream.[[storedError]] is undefined.
544        assert!(self.stored_error.get().is_undefined());
545    }
546
547    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-close-with-error>
548    pub(crate) fn finish_in_flight_close_with_error(
549        &self,
550        cx: &mut js::context::JSContext,
551        global: &GlobalScope,
552        error: SafeHandleValue,
553    ) {
554        let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
555            // Assert: stream.[[inFlightCloseRequest]] is not undefined.
556            unreachable!("Inflight close request must be defined.");
557        };
558
559        // Reject stream.[[inFlightCloseRequest]] with error.
560        in_flight_close_request.reject_native(&error, CanGc::from_cx(cx));
561
562        // Set stream.[[inFlightCloseRequest]] to undefined.
563        // Done above with `take`.
564
565        // Assert: stream.[[state]] is "writable" or "erroring".
566        assert!(self.is_erroring() || self.is_writable());
567
568        // If stream.[[pendingAbortRequest]] is not undefined,
569        rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
570        if let Some(pending_abort_request) = &*pending_abort_request {
571            // Reject stream.[[pendingAbortRequest]]'s promise with error.
572            pending_abort_request
573                .promise
574                .reject_native(&error, CanGc::from_cx(cx));
575
576            // Set stream.[[pendingAbortRequest]] to undefined.
577            // Done above with `take`.
578        }
579
580        // Perform ! WritableStreamDealWithRejection(stream, error).
581        self.deal_with_rejection(cx, global, error);
582    }
583
584    /// <https://streams.spec.whatwg.org/#writable-stream-finish-in-flight-write-with-error>
585    pub(crate) fn finish_in_flight_write_with_error(
586        &self,
587        cx: &mut js::context::JSContext,
588        global: &GlobalScope,
589        error: SafeHandleValue,
590    ) {
591        let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
592            // Assert: stream.[[inFlightWriteRequest]] is not undefined.
593            unreachable!("Inflight write request must be defined.");
594        };
595
596        // Reject stream.[[inFlightWriteRequest]] with error.
597        in_flight_write_request.reject_native(&error, CanGc::from_cx(cx));
598
599        // Set stream.[[inFlightWriteRequest]] to undefined.
600        // Done above with `take`.
601
602        // Assert: stream.[[state]] is "writable" or "erroring".
603        assert!(self.is_erroring() || self.is_writable());
604
605        // Perform ! WritableStreamDealWithRejection(stream, error).
606        self.deal_with_rejection(cx, global, error);
607    }
608
609    pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
610        self.writer.get()
611    }
612
613    pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
614        self.writer.set(writer);
615    }
616
617    pub(crate) fn set_backpressure(&self, backpressure: bool) {
618        self.backpressure.set(backpressure);
619    }
620
621    pub(crate) fn get_backpressure(&self) -> bool {
622        self.backpressure.get()
623    }
624
625    /// <https://streams.spec.whatwg.org/#is-writable-stream-locked>
626    pub(crate) fn is_locked(&self) -> bool {
627        // If stream.[[writer]] is undefined, return false.
628        // Return true.
629        self.get_writer().is_some()
630    }
631
632    /// <https://streams.spec.whatwg.org/#writable-stream-add-write-request>
633    pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
634        // Assert: ! IsWritableStreamLocked(stream) is true.
635        assert!(self.is_locked());
636
637        // Assert: stream.[[state]] is "writable".
638        assert!(self.is_writable());
639
640        // Let promise be a new promise.
641        let promise = Promise::new(global, can_gc);
642
643        // Append promise to stream.[[writeRequests]].
644        self.write_requests.borrow_mut().push_back(promise.clone());
645
646        // Return promise.
647        promise
648    }
649
650    // Returns the rooted controller of the stream, if any.
651    pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
652        self.controller.get()
653    }
654
655    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
656    pub(crate) fn abort(
657        &self,
658        cx: &mut CurrentRealm,
659        global: &GlobalScope,
660        provided_reason: SafeHandleValue,
661    ) -> Rc<Promise> {
662        // If stream.[[state]] is "closed" or "errored",
663        if self.is_closed() || self.is_errored() {
664            // return a promise resolved with undefined.
665            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
666        }
667
668        // Signal abort on stream.[[controller]].[[abortController]] with reason.
669        self.get_controller()
670            .expect("Stream must have a controller.")
671            .signal_abort(cx, provided_reason);
672
673        // Let state be stream.[[state]].
674        let state = self.state.get();
675
676        // If state is "closed" or "errored", return a promise resolved with undefined.
677        if matches!(
678            state,
679            WritableStreamState::Closed | WritableStreamState::Errored
680        ) {
681            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
682        }
683
684        // If stream.[[pendingAbortRequest]] is not undefined,
685        if self.pending_abort_request.borrow().is_some() {
686            // return stream.[[pendingAbortRequest]]'s promise.
687            return self
688                .pending_abort_request
689                .borrow()
690                .as_ref()
691                .expect("Pending abort request must be Some.")
692                .promise
693                .clone();
694        }
695
696        // Assert: state is "writable" or "erroring".
697        assert!(self.is_writable() || self.is_erroring());
698
699        // Let wasAlreadyErroring be false.
700        let mut was_already_erroring = false;
701        rooted!(&in(cx) let undefined_reason = UndefinedValue());
702
703        // If state is "erroring",
704        let reason = if self.is_erroring() {
705            // Set wasAlreadyErroring to true.
706            was_already_erroring = true;
707
708            // Set reason to undefined.
709            undefined_reason.handle()
710        } else {
711            // Use the provided reason.
712            provided_reason
713        };
714
715        // Let promise be a new promise.
716        let promise = Promise::new2(cx, global);
717
718        // Set stream.[[pendingAbortRequest]] to a new pending abort request
719        // whose promise is promise,
720        // reason is reason,
721        // and was already erroring is wasAlreadyErroring.
722        *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
723            promise: promise.clone(),
724            reason: Heap::boxed(reason.get()),
725            was_already_erroring,
726        });
727
728        // If wasAlreadyErroring is false,
729        if !was_already_erroring {
730            // perform ! WritableStreamStartErroring(stream, reason)
731            self.start_erroring(cx, global, reason);
732        }
733
734        // Return promise.
735        promise
736    }
737
738    /// <https://streams.spec.whatwg.org/#writable-stream-close>
739    pub(crate) fn close(
740        &self,
741        cx: &mut js::context::JSContext,
742        global: &GlobalScope,
743    ) -> Rc<Promise> {
744        // Let state be stream.[[state]].
745        // If state is "closed" or "errored",
746        if self.is_closed() || self.is_errored() {
747            // return a promise rejected with a TypeError exception.
748            let promise = Promise::new2(cx, global);
749            promise.reject_error(
750                Error::Type(c"Stream is closed or errored.".to_owned()),
751                CanGc::from_cx(cx),
752            );
753            return promise;
754        }
755
756        // Assert: state is "writable" or "erroring".
757        assert!(self.is_writable() || self.is_erroring());
758
759        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
760        assert!(!self.close_queued_or_in_flight());
761
762        // Let promise be a new promise.
763        let promise = Promise::new2(cx, global);
764
765        // Set stream.[[closeRequest]] to promise.
766        *self.close_request.borrow_mut() = Some(promise.clone());
767
768        // Let writer be stream.[[writer]].
769        // If writer is not undefined,
770        if let Some(writer) = self.writer.get() {
771            // and stream.[[backpressure]] is true,
772            // and state is "writable",
773            if self.get_backpressure() && self.is_writable() {
774                // resolve writer.[[readyPromise]] with undefined.
775                writer.resolve_ready_promise_with_undefined(CanGc::from_cx(cx));
776            }
777        }
778
779        // Perform ! WritableStreamDefaultControllerClose(stream.[[controller]]).
780        let Some(controller) = self.controller.get() else {
781            unreachable!("Stream must have a controller.");
782        };
783        controller.close(cx, global);
784
785        // Return promise.
786        promise
787    }
788
789    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-get-desired-size>
790    /// Note: implement as a stream method, as opposed to a writer one, for convenience.
791    pub(crate) fn get_desired_size(&self) -> Option<f64> {
792        // Let stream be writer.[[stream]].
793        // Stream is `self`.
794
795        // Let state be stream.[[state]].
796        // If state is "errored" or "erroring", return null.
797        if self.is_errored() || self.is_erroring() {
798            return None;
799        }
800
801        // If state is "closed", return 0.
802        if self.is_closed() {
803            return Some(0.);
804        }
805
806        let Some(controller) = self.controller.get() else {
807            unreachable!("Stream must have a controller.");
808        };
809        Some(controller.get_desired_size())
810    }
811
812    /// <https://streams.spec.whatwg.org/#acquire-writable-stream-default-writer>
813    pub(crate) fn aquire_default_writer(
814        &self,
815        cx: SafeJSContext,
816        global: &GlobalScope,
817        can_gc: CanGc,
818    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
819        // Let writer be a new WritableStreamDefaultWriter object.
820        let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
821
822        // Perform ? SetUpWritableStreamDefaultWriter(writer, stream).
823        writer.setup(cx, self, can_gc)?;
824
825        // Return writer.
826        Ok(writer)
827    }
828
829    /// <https://streams.spec.whatwg.org/#writable-stream-update-backpressure>
830    pub(crate) fn update_backpressure(
831        &self,
832        backpressure: bool,
833        global: &GlobalScope,
834        can_gc: CanGc,
835    ) {
836        // Assert: stream.[[state]] is "writable".
837        self.is_writable();
838
839        // Assert: ! WritableStreamCloseQueuedOrInFlight(stream) is false.
840        assert!(!self.close_queued_or_in_flight());
841
842        // Let writer be stream.[[writer]].
843        let writer = self.get_writer();
844
845        if let Some(writer) = writer {
846            // If writer is not undefined
847            if backpressure != self.get_backpressure() {
848                // and backpressure is not stream.[[backpressure]],
849                if backpressure {
850                    // If backpressure is true, set writer.[[readyPromise]] to a new promise.
851                    let promise = Promise::new(global, can_gc);
852                    writer.set_ready_promise(promise);
853                } else {
854                    // Otherwise,
855                    // Assert: backpressure is false.
856                    assert!(!backpressure);
857                    // Resolve writer.[[readyPromise]] with undefined.
858                    writer.resolve_ready_promise_with_undefined(can_gc);
859                }
860            }
861        }
862
863        // Set stream.[[backpressure]] to backpressure.
864        self.set_backpressure(backpressure);
865    }
866
867    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
868    pub(crate) fn setup_cross_realm_transform_writable(
869        &self,
870        cx: &mut js::context::JSContext,
871        port: &MessagePort,
872    ) {
873        let port_id = port.message_port_id();
874        let global = self.global();
875
876        // Perform ! InitializeWritableStream(stream).
877        // Done in `new_inherited`.
878
879        // Let sizeAlgorithm be an algorithm that returns 1.
880        // Re-ordered because of the need to pass it to `new`.
881        let size_algorithm =
882            extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
883
884        // Note: other algorithms defined in the controller at call site.
885
886        // Let backpressurePromise be a new promise.
887        let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(
888            &global,
889            CanGc::from_cx(cx),
890        ))));
891
892        // Let controller be a new WritableStreamDefaultController.
893        let controller = WritableStreamDefaultController::new(
894            &global,
895            UnderlyingSinkType::Transfer {
896                backpressure_promise: backpressure_promise.clone(),
897                port: Dom::from_ref(port),
898            },
899            1.0,
900            size_algorithm,
901            CanGc::from_cx(cx),
902        );
903
904        // Add a handler for port’s message event with the following steps:
905        // Add a handler for port’s messageerror event with the following steps:
906        rooted!(&in(cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
907            controller: Dom::from_ref(&controller),
908            backpressure_promise,
909        });
910        global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
911
912        // Enable port’s port message queue.
913        port.Start(cx);
914
915        // Perform ! SetUpWritableStreamDefaultController
916        controller
917            .setup(cx.into(), &global, self, CanGc::from_cx(cx))
918            .expect("Setup for transfer cannot fail");
919    }
920    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink>
921    #[allow(clippy::too_many_arguments)]
922    pub(crate) fn setup_from_underlying_sink(
923        &self,
924        cx: SafeJSContext,
925        global: &GlobalScope,
926        stream: &WritableStream,
927        underlying_sink_obj: SafeHandleObject,
928        underlying_sink: &UnderlyingSink,
929        strategy_hwm: f64,
930        strategy_size: Rc<QueuingStrategySize>,
931        can_gc: CanGc,
932    ) -> Result<(), Error> {
933        // Let controller be a new WritableStreamDefaultController.
934
935        // Let startAlgorithm be an algorithm that returns undefined.
936
937        // Let writeAlgorithm be an algorithm that returns a promise resolved with undefined.
938
939        // Let closeAlgorithm be an algorithm that returns a promise resolved with undefined.
940
941        // Let abortAlgorithm be an algorithm that returns a promise resolved with undefined.
942
943        // If underlyingSinkDict["start"] exists, then set startAlgorithm to an algorithm which
944        // returns the result of invoking underlyingSinkDict["start"] with argument
945        // list « controller », exception behavior "rethrow", and callback this value underlyingSink.
946
947        // If underlyingSinkDict["write"] exists, then set writeAlgorithm to an algorithm which
948        // takes an argument chunk and returns the result of invoking underlyingSinkDict["write"]
949        // with argument list « chunk, controller » and callback this value underlyingSink.
950
951        // If underlyingSinkDict["close"] exists, then set closeAlgorithm to an algorithm which
952        // returns the result of invoking underlyingSinkDict["close"] with argument
953        // list «» and callback this value underlyingSink.
954
955        // If underlyingSinkDict["abort"] exists, then set abortAlgorithm to an algorithm which
956        // takes an argument reason and returns the result of invoking underlyingSinkDict["abort"]
957        // with argument list « reason » and callback this value underlyingSink.
958        let controller = WritableStreamDefaultController::new(
959            global,
960            UnderlyingSinkType::new_js(
961                underlying_sink.abort.clone(),
962                underlying_sink.start.clone(),
963                underlying_sink.close.clone(),
964                underlying_sink.write.clone(),
965            ),
966            strategy_hwm,
967            strategy_size,
968            can_gc,
969        );
970
971        // Note: this must be done before `setup`,
972        // otherwise `thisOb` is null in the start callback.
973        controller.set_underlying_sink_this_object(underlying_sink_obj);
974
975        // Perform ? SetUpWritableStreamDefaultController
976        controller.setup(cx, global, stream, can_gc)
977    }
978}
979
980/// <https://streams.spec.whatwg.org/#create-writable-stream>
981#[cfg_attr(crown, expect(crown::unrooted_must_root))]
982pub(crate) fn create_writable_stream(
983    cx: SafeJSContext,
984    global: &GlobalScope,
985    writable_high_water_mark: f64,
986    writable_size_algorithm: Rc<QueuingStrategySize>,
987    underlying_sink_type: UnderlyingSinkType,
988    can_gc: CanGc,
989) -> Fallible<DomRoot<WritableStream>> {
990    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
991    assert!(writable_high_water_mark >= 0.0);
992
993    // Let stream be a new WritableStream.
994    // Perform ! InitializeWritableStream(stream).
995    let stream = WritableStream::new_with_proto(global, None, can_gc);
996
997    // Let controller be a new WritableStreamDefaultController.
998    let controller = WritableStreamDefaultController::new(
999        global,
1000        underlying_sink_type,
1001        writable_high_water_mark,
1002        writable_size_algorithm,
1003        can_gc,
1004    );
1005
1006    // Perform ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm,
1007    // closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm).
1008    controller.setup(cx, global, &stream, can_gc)?;
1009
1010    // Return stream.
1011    Ok(stream)
1012}
1013
1014impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1015    /// <https://streams.spec.whatwg.org/#ws-constructor>
1016    fn Constructor(
1017        cx: SafeJSContext,
1018        global: &GlobalScope,
1019        proto: Option<SafeHandleObject>,
1020        can_gc: CanGc,
1021        underlying_sink: Option<*mut JSObject>,
1022        strategy: &QueuingStrategy,
1023    ) -> Fallible<DomRoot<WritableStream>> {
1024        // If underlyingSink is missing, set it to null.
1025        rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1026
1027        // Let underlyingSinkDict be underlyingSink,
1028        // converted to an IDL value of type UnderlyingSink.
1029        let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1030            rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1031            match UnderlyingSink::new(cx, obj_val.handle(), can_gc) {
1032                Ok(ConversionResult::Success(val)) => val,
1033                Ok(ConversionResult::Failure(error)) => {
1034                    return Err(Error::Type(error.into_owned()));
1035                },
1036                _ => {
1037                    return Err(Error::JSFailed);
1038                },
1039            }
1040        } else {
1041            UnderlyingSink::empty()
1042        };
1043
1044        if !underlying_sink_dict.type_.handle().is_undefined() {
1045            // If underlyingSinkDict["type"] exists, throw a RangeError exception.
1046            return Err(Error::Range(c"type is set".to_owned()));
1047        }
1048
1049        // Perform ! InitializeWritableStream(this).
1050        let stream = WritableStream::new_with_proto(global, proto, can_gc);
1051
1052        // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
1053        let size_algorithm = extract_size_algorithm(strategy, can_gc);
1054
1055        // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
1056        let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1057
1058        // Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink,
1059        // underlyingSinkDict, highWaterMark, sizeAlgorithm).
1060        stream.setup_from_underlying_sink(
1061            cx,
1062            global,
1063            &stream,
1064            underlying_sink_obj.handle(),
1065            &underlying_sink_dict,
1066            high_water_mark,
1067            size_algorithm,
1068            can_gc,
1069        )?;
1070
1071        Ok(stream)
1072    }
1073
1074    /// <https://streams.spec.whatwg.org/#ws-locked>
1075    fn Locked(&self) -> bool {
1076        // Return ! IsWritableStreamLocked(this).
1077        self.is_locked()
1078    }
1079
1080    /// <https://streams.spec.whatwg.org/#ws-abort>
1081    fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
1082        let global = GlobalScope::from_current_realm(cx);
1083
1084        // If ! IsWritableStreamLocked(this) is true,
1085        if self.is_locked() {
1086            // return a promise rejected with a TypeError exception.
1087            let promise = Promise::new2(cx, &global);
1088            promise.reject_error(
1089                Error::Type(c"Stream is locked.".to_owned()),
1090                CanGc::from_cx(cx),
1091            );
1092            return promise;
1093        }
1094
1095        // Return ! WritableStreamAbort(this, reason).
1096        self.abort(cx, &global, reason)
1097    }
1098
1099    /// <https://streams.spec.whatwg.org/#ws-close>
1100    fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
1101        let global = GlobalScope::from_current_realm(cx);
1102
1103        // If ! IsWritableStreamLocked(this) is true,
1104        if self.is_locked() {
1105            // return a promise rejected with a TypeError exception.
1106            let promise = Promise::new2(cx, &global);
1107            promise.reject_error(
1108                Error::Type(c"Stream is locked.".to_owned()),
1109                CanGc::from_cx(cx),
1110            );
1111            return promise;
1112        }
1113
1114        // If ! WritableStreamCloseQueuedOrInFlight(this) is true
1115        if self.close_queued_or_in_flight() {
1116            // return a promise rejected with a TypeError exception.
1117            let promise = Promise::new2(cx, &global);
1118            promise.reject_error(
1119                Error::Type(c"Stream has closed queued or in-flight".to_owned()),
1120                CanGc::from_cx(cx),
1121            );
1122            return promise;
1123        }
1124
1125        // Return ! WritableStreamClose(this).
1126        self.close(cx, &global)
1127    }
1128
1129    /// <https://streams.spec.whatwg.org/#ws-get-writer>
1130    fn GetWriter(
1131        &self,
1132        realm: InRealm,
1133        can_gc: CanGc,
1134    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1135        let cx = GlobalScope::get_cx();
1136        let global = GlobalScope::from_safe_context(cx, realm);
1137
1138        // Return ? AcquireWritableStreamDefaultWriter(this).
1139        self.aquire_default_writer(cx, &global, can_gc)
1140    }
1141}
1142
1143impl js::gc::Rootable for CrossRealmTransformWritable {}
1144
1145/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1146/// A wrapper to handle `message` and `messageerror` events
1147/// for the port used by the transfered stream.
1148#[derive(Clone, JSTraceable, MallocSizeOf)]
1149#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1150pub(crate) struct CrossRealmTransformWritable {
1151    /// The controller used in the algorithm.
1152    controller: Dom<WritableStreamDefaultController>,
1153
1154    /// The `backpressurePromise` used in the algorithm.
1155    #[ignore_malloc_size_of = "nested Rc"]
1156    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1157}
1158
1159impl CrossRealmTransformWritable {
1160    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1161    /// Add a handler for port’s message event with the following steps:
1162    #[expect(unsafe_code)]
1163    pub(crate) fn handle_message(
1164        &self,
1165        cx: &mut CurrentRealm,
1166        global: &GlobalScope,
1167        message: SafeHandleValue,
1168    ) {
1169        rooted!(&in(cx) let mut value = UndefinedValue());
1170        let type_string = unsafe {
1171            get_type_and_value_from_message(
1172                cx.into(),
1173                message,
1174                value.handle_mut(),
1175                CanGc::from_cx(cx),
1176            )
1177        };
1178
1179        // If type is "pull",
1180        // Done below as the steps are the same for both types.
1181
1182        // Otherwise, if type is "error",
1183        if type_string == "error" {
1184            // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, value).
1185            self.controller.error_if_needed(cx, value.handle(), global);
1186        }
1187
1188        let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1189
1190        // Note: the below steps are for both "pull" and "error" types.
1191        // If backpressurePromise is not undefined,
1192        if let Some(promise) = backpressure_promise {
1193            // Resolve backpressurePromise with undefined.
1194            promise.resolve_native(&(), CanGc::from_cx(cx));
1195
1196            // Set backpressurePromise to undefined.
1197            // Done above with `take`.
1198        }
1199    }
1200
1201    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
1202    /// Add a handler for port’s messageerror event with the following steps:
1203    pub(crate) fn handle_error(
1204        &self,
1205        cx: &mut CurrentRealm,
1206        global: &GlobalScope,
1207        port: &MessagePort,
1208    ) {
1209        // Let error be a new "DataCloneError" DOMException.
1210        let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
1211        rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1212        error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
1213
1214        // Perform ! CrossRealmTransformSendError(port, error).
1215        port.cross_realm_transform_send_error(rooted_error.handle(), CanGc::from_cx(cx));
1216
1217        // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, error).
1218        self.controller
1219            .error_if_needed(cx, rooted_error.handle(), global);
1220
1221        // Disentangle port.
1222        global.disentangle_port(port, CanGc::from_cx(cx));
1223    }
1224}
1225
1226/// <https://streams.spec.whatwg.org/#ws-transfer>
1227impl Transferable for WritableStream {
1228    type Index = MessagePortIndex;
1229    type Data = MessagePortImpl;
1230
1231    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps①>
1232    fn transfer(
1233        &self,
1234        cx: &mut js::context::JSContext,
1235    ) -> Fallible<(MessagePortId, MessagePortImpl)> {
1236        // Step 1. If ! IsWritableStreamLocked(value) is true, throw a
1237        // "DataCloneError" DOMException.
1238        if self.is_locked() {
1239            return Err(Error::DataClone(None));
1240        }
1241
1242        let global = self.global();
1243        let mut realm = enter_auto_realm(cx, &*global);
1244        let mut realm = realm.current_realm();
1245        let cx = &mut realm;
1246
1247        // Step 2. Let port1 be a new MessagePort in the current Realm.
1248        let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
1249        global.track_message_port(&port_1, None);
1250
1251        // Step 3. Let port2 be a new MessagePort in the current Realm.
1252        let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
1253        global.track_message_port(&port_2, None);
1254
1255        // Step 4. Entangle port1 and port2.
1256        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1257
1258        // Step 5. Let readable be a new ReadableStream in the current Realm.
1259        let readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1260
1261        // Step 6. Perform ! SetUpCrossRealmTransformReadable(readable, port1).
1262        readable.setup_cross_realm_transform_readable(cx, &port_1);
1263
1264        // Step 7. Let promise be ! ReadableStreamPipeTo(readable, value, false, false, false).
1265        let promise = readable.pipe_to(cx, &global, self, false, false, false, None);
1266
1267        // Step 8. Set promise.[[PromiseIsHandled]] to true.
1268        promise.set_promise_is_handled();
1269
1270        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
1271        port_2.transfer(cx)
1272    }
1273
1274    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps①>
1275    fn transfer_receive(
1276        cx: &mut js::context::JSContext,
1277        owner: &GlobalScope,
1278        id: MessagePortId,
1279        port_impl: MessagePortImpl,
1280    ) -> Result<DomRoot<Self>, ()> {
1281        // Their transfer-receiving steps, given dataHolder and value, are:
1282        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
1283        let value = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1284
1285        // Step 1. Let deserializedRecord be !
1286        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
1287        // Realm).
1288        // Done with the `Deserialize` derive of `MessagePortImpl`.
1289
1290        // Step 2. Let port be deserializedRecord.[[Deserialized]].
1291        let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
1292
1293        // Step 3. Perform ! SetUpCrossRealmTransformWritable(value, port).
1294        value.setup_cross_realm_transform_writable(cx, &transferred_port);
1295        Ok(value)
1296    }
1297
1298    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
1299    fn serialized_storage<'a>(
1300        data: StructuredData<'a, '_>,
1301    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1302        match data {
1303            StructuredData::Reader(r) => &mut r.port_impls,
1304            StructuredData::Writer(w) => &mut w.ports,
1305        }
1306    }
1307}