script/dom/
readablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use base::id::{MessagePortId, MessagePortIndex};
11use constellation_traits::MessagePortImpl;
12use dom_struct::dom_struct;
13use ipc_channel::ipc::IpcSharedMemory;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::rust::{
17    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18    MutableHandleValue as SafeMutableHandleValue,
19};
20use js::typedarray::ArrayBufferViewU8;
21use rustc_hash::FxHashMap;
22use script_bindings::conversions::SafeToJSValConvertible;
23
24use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
25use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
26    ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
27    ReadableWritablePair, StreamPipeOptions,
28};
29use script_bindings::str::DOMString;
30
31use crate::dom::domexception::{DOMErrorName, DOMException};
32use script_bindings::conversions::{is_array_like, StringificationBehavior};
33use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
34use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
35use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
37use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
38use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
39use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
40use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
41use crate::dom::writablestream::WritableStream;
42use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
43use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
44use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
45use crate::dom::bindings::trace::RootedTraceableBox;
46use crate::dom::bindings::utils::get_dictionary_property;
47use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
48use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader;
49use crate::dom::globalscope::GlobalScope;
50use crate::dom::promise::{wait_for_all_promise, Promise};
51use crate::dom::readablebytestreamcontroller::ReadableByteStreamController;
52use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader;
53use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController;
54use crate::dom::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
55use crate::dom::defaultteeunderlyingsource::TeeCancelAlgorithm;
56use crate::dom::types::DefaultTeeUnderlyingSource;
57use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
58use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
59use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
60use crate::dom::messageport::MessagePort;
61use crate::realms::{enter_realm, InRealm};
62use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
63use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
64use crate::dom::bindings::transferable::Transferable;
65use crate::dom::bindings::structuredclone::StructuredData;
66
67use super::bindings::buffer_source::HeapBufferSource;
68use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
69use super::readablestreambyobreader::ReadIntoRequest;
70
71/// State Machine for `PipeTo`.
72#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
73enum PipeToState {
74    /// The starting state
75    #[default]
76    Starting,
77    /// Waiting for the writer to be ready
78    PendingReady,
79    /// Waiting for a read to resolve.
80    PendingRead,
81    /// Waiting for all pending writes to finish,
82    /// as part of shutting down with an optional action.
83    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
84    /// When shutting down with an action,
85    /// waiting for the action to complete,
86    /// at which point we can `finalize`.
87    ShuttingDownPendingAction,
88    /// The pipe has been finalized,
89    /// no further actions should be performed.
90    Finalized,
91}
92
93/// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
94#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
95enum ShutdownAction {
96    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
97    WritableStreamAbort,
98    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
99    ReadableStreamCancel,
100    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
101    WritableStreamDefaultWriterCloseWithErrorPropagation,
102    /// <https://streams.spec.whatwg.org/#ref-for-rs-pipeTo-shutdown-with-action>
103    Abort,
104}
105
106impl js::gc::Rootable for PipeTo {}
107
108/// The "in parallel, but not really" part of
109/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
110///
111/// Note: the spec is flexible about how this is done, but requires the following constraints to apply:
112/// - Public API must not be used: we'll only use Rust.
113/// - Backpressure must be enforced: we'll only read from source when dest is ready.
114/// - Shutdown must stop activity: we'll do this together with the below.
115/// - Error and close states must be propagated: we'll do this by checking these states at every step.
116#[derive(Clone, JSTraceable, MallocSizeOf)]
117#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
118pub(crate) struct PipeTo {
119    /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
120    reader: Dom<ReadableStreamDefaultReader>,
121
122    /// <https://streams.spec.whatwg.org/#ref-for-acquire-writable-stream-default-writer>
123    writer: Dom<WritableStreamDefaultWriter>,
124
125    /// Pending writes are needed when shutting down(with an action),
126    /// because we can only finalize when all writes are finished.
127    #[ignore_malloc_size_of = "nested Rc"]
128    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,
129
130    /// The state machine.
131    #[conditional_malloc_size_of]
132    #[no_trace]
133    state: Rc<RefCell<PipeToState>>,
134
135    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventabort>
136    prevent_abort: bool,
137
138    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventcancel>
139    prevent_cancel: bool,
140
141    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventclose>
142    prevent_close: bool,
143
144    /// The `shuttingDown` variable of
145    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
146    #[conditional_malloc_size_of]
147    shutting_down: Rc<Cell<bool>>,
148
149    /// The abort reason of the abort signal,
150    /// stored here because we must keep it across a microtask.
151    #[ignore_malloc_size_of = "mozjs"]
152    abort_reason: Rc<Heap<JSVal>>,
153
154    /// The error potentially passed to shutdown,
155    /// stored here because we must keep it across a microtask.
156    #[ignore_malloc_size_of = "mozjs"]
157    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,
158
159    /// The promise returned by a shutdown action.
160    /// We keep it to only continue when it is not pending anymore.
161    #[ignore_malloc_size_of = "nested Rc"]
162    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,
163
164    /// The promise resolved or rejected at
165    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
166    #[conditional_malloc_size_of]
167    result_promise: Rc<Promise>,
168}
169
170impl PipeTo {
171    /// Run the `abortAlgorithm` defined at
172    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
173    pub(crate) fn abort_with_reason(
174        &self,
175        cx: SafeJSContext,
176        global: &GlobalScope,
177        reason: SafeHandleValue,
178        realm: InRealm,
179        can_gc: CanGc,
180    ) {
181        // Abort should do nothing if we are already shutting down.
182        if self.shutting_down.get() {
183            return;
184        }
185
186        // Let error be signal’s abort reason.
187        // Note: storing it because it may need to be kept across a microtask,
188        // and see the note below as to why it is kept separately from `shutdown_error`.
189        self.abort_reason.set(reason.get());
190
191        // Note: setting the error now,
192        // will result in a rejection of the pipe promise, with this error.
193        // Unless any shutdown action raise their own error,
194        // in which case this error will be overwritten by the shutdown action error.
195        self.set_shutdown_error(reason);
196
197        // Let actions be an empty ordered set.
198        // Note: the actions are defined, and performed, inside `shutdown_with_an_action`.
199
200        // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions,
201        // and with error.
202        self.shutdown(cx, global, Some(ShutdownAction::Abort), realm, can_gc);
203    }
204}
205
206impl Callback for PipeTo {
207    /// The pipe makes progress one microtask at a time.
208    /// Note: we use one struct as the callback for all promises,
209    /// and for both of their reactions.
210    ///
211    /// The context of the callback is determined from:
212    /// - the current state.
213    /// - the type of `result`.
214    /// - the state of a stored promise(in some cases).
215    #[expect(unsafe_code)]
216    fn callback(&self, cx: SafeJSContext, result: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
217        let global = self.reader.global();
218
219        // Note: we only care about the result of writes when they are rejected,
220        // and the error is accessed not through handlers,
221        // but directly using `dest.get_stored_error`.
222        // So we must mark rejected promises as handled
223        // to prevent unhandled rejection errors.
224        self.pending_writes.borrow_mut().retain(|p| {
225            let pending = p.is_pending();
226            if !pending {
227                p.set_promise_is_handled();
228            }
229            pending
230        });
231
232        // Note: cloning to prevent re-borrow in methods called below.
233        let state_before_checks = self.state.borrow().clone();
234
235        // Note: if we are in a `PendingRead` state,
236        // and the source is closed,
237        // we try to write chunks before doing any shutdown,
238        // which is necessary to implement the
239        // "If any chunks have been read but not yet written, write them to dest."
240        // part of shutdown.
241        if state_before_checks == PipeToState::PendingRead {
242            let source = self.reader.get_stream().expect("Source stream must be set");
243            if source.is_closed() {
244                let dest = self
245                    .writer
246                    .get_stream()
247                    .expect("Destination stream must be set");
248
249                // If dest.[[state]] is "writable",
250                // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
251                if dest.is_writable() && !dest.close_queued_or_in_flight() {
252                    let Ok(done) = get_read_promise_done(cx, &result, can_gc) else {
253                        // This is the case that the microtask ran in reaction
254                        // to the closed promise of the reader,
255                        // so we should wait for subsequent chunks,
256                        // and skip the shutdown below
257                        // (reader is closed, but there are still pending reads).
258                        // Shutdown will happen when the last chunk has been received.
259                        return;
260                    };
261
262                    if !done {
263                        // If any chunks have been read but not yet written, write them to dest.
264                        self.write_chunk(cx, &global, result, can_gc);
265                    }
266                }
267            }
268        }
269
270        self.check_and_propagate_errors_forward(cx, &global, realm, can_gc);
271        self.check_and_propagate_errors_backward(cx, &global, realm, can_gc);
272        self.check_and_propagate_closing_forward(cx, &global, realm, can_gc);
273        self.check_and_propagate_closing_backward(cx, &global, realm, can_gc);
274
275        // Note: cloning to prevent re-borrow in methods called below.
276        let state = self.state.borrow().clone();
277
278        // If we switched to a shutdown state,
279        // return.
280        // Progress will be made at the next tick.
281        if state != state_before_checks {
282            return;
283        }
284
285        match state {
286            PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
287            PipeToState::PendingReady => {
288                // Read a chunk.
289                self.read_chunk(&global, realm, can_gc);
290            },
291            PipeToState::PendingRead => {
292                // Write the chunk.
293                self.write_chunk(cx, &global, result, can_gc);
294
295                // An early return is necessary if the write algorithm aborted the pipe.
296                if self.shutting_down.get() {
297                    return;
298                }
299
300                // Wait for the writer to be ready again.
301                self.wait_for_writer_ready(&global, realm, can_gc);
302            },
303            PipeToState::ShuttingDownWithPendingWrites(action) => {
304                // Wait until every chunk that has been read has been written
305                // (i.e. the corresponding promises have settled).
306                if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
307                    self.wait_on_pending_write(&global, write, realm, can_gc);
308                    return;
309                }
310
311                // Note: error is stored in `self.shutdown_error`.
312                if let Some(action) = action {
313                    // Let p be the result of performing action.
314                    self.perform_action(cx, &global, action, realm, can_gc);
315                } else {
316                    // Finalize, passing along error if it was given.
317                    self.finalize(cx, &global, can_gc);
318                }
319            },
320            PipeToState::ShuttingDownPendingAction => {
321                let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
322                    unreachable!();
323                };
324                if promise.is_pending() {
325                    // While waiting for the action to complete,
326                    // we may get callbacks for other promises(closed, ready),
327                    // and we should ignore those.
328                    return;
329                }
330
331                let is_array_like = {
332                    if !result.is_object() {
333                        false
334                    } else {
335                        unsafe { is_array_like::<crate::DomTypeHolder>(*cx, result) }
336                    }
337                };
338
339                // Finalize, passing along error if it was given.
340                if !result.is_undefined() && !is_array_like {
341                    // Most actions either resolve with undefined,
342                    // or reject with an error,
343                    // and the error should be used when finalizing.
344                    // One exception is the `Abort` action,
345                    // which resolves with a list of undefined values.
346
347                    // If `result` isn't undefined or array-like,
348                    // then it is an error
349                    // and should overwrite the current shutdown error.
350                    self.set_shutdown_error(result);
351                }
352                self.finalize(cx, &global, can_gc);
353            },
354            PipeToState::Finalized => {},
355        }
356    }
357}
358
359impl PipeTo {
360    /// Setting shutdown error in a way that ensures it isn't
361    /// moved after it has been set.
362    fn set_shutdown_error(&self, error: SafeHandleValue) {
363        *self.shutdown_error.borrow_mut() = Some(Heap::default());
364        let Some(ref heap) = *self.shutdown_error.borrow() else {
365            unreachable!("Option set to Some(heap) above.");
366        };
367        heap.set(error.get())
368    }
369
370    /// Wait for the writer to be ready,
371    /// which implements the constraint that backpressure must be enforced.
372    fn wait_for_writer_ready(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
373        {
374            let mut state = self.state.borrow_mut();
375            *state = PipeToState::PendingReady;
376        }
377
378        let ready_promise = self.writer.Ready();
379        if ready_promise.is_fulfilled() {
380            self.read_chunk(global, realm, can_gc);
381        } else {
382            let handler = PromiseNativeHandler::new(
383                global,
384                Some(Box::new(self.clone())),
385                Some(Box::new(self.clone())),
386                can_gc,
387            );
388            ready_promise.append_native_handler(&handler, realm, can_gc);
389
390            // Note: if the writer is not ready,
391            // in order to ensure progress we must
392            // also react to the closure of the source(because source may close empty).
393            let closed_promise = self.reader.Closed();
394            closed_promise.append_native_handler(&handler, realm, can_gc);
395        }
396    }
397
398    /// Read a chunk
399    fn read_chunk(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
400        *self.state.borrow_mut() = PipeToState::PendingRead;
401        let chunk_promise = self.reader.Read(can_gc);
402        let handler = PromiseNativeHandler::new(
403            global,
404            Some(Box::new(self.clone())),
405            Some(Box::new(self.clone())),
406            can_gc,
407        );
408        chunk_promise.append_native_handler(&handler, realm, can_gc);
409
410        // Note: in order to ensure progress we must
411        // also react to the closure of the destination.
412        let ready_promise = self.writer.Closed();
413        ready_promise.append_native_handler(&handler, realm, can_gc);
414    }
415
416    /// Try to write a chunk using the jsval, and returns wether it succeeded
417    // It will fail if it is the last `done` chunk, or if it is not a chunk at all.
418    #[expect(unsafe_code)]
419    fn write_chunk(
420        &self,
421        cx: SafeJSContext,
422        global: &GlobalScope,
423        chunk: SafeHandleValue,
424        can_gc: CanGc,
425    ) -> bool {
426        if chunk.is_object() {
427            rooted!(in(*cx) let object = chunk.to_object());
428            rooted!(in(*cx) let mut bytes = UndefinedValue());
429            let has_value = unsafe {
430                get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut(), can_gc)
431                    .expect("Chunk should have a value.")
432            };
433            if has_value {
434                // Write the chunk.
435                let write_promise = self.writer.write(cx, global, bytes.handle(), can_gc);
436                self.pending_writes.borrow_mut().push_back(write_promise);
437                return true;
438            }
439        }
440        false
441    }
442
443    /// Only as part of shutting-down do we wait on pending writes
444    /// (backpressure is communicated not through pending writes
445    /// but through the readiness of the writer).
446    fn wait_on_pending_write(
447        &self,
448        global: &GlobalScope,
449        promise: Rc<Promise>,
450        realm: InRealm,
451        can_gc: CanGc,
452    ) {
453        let handler = PromiseNativeHandler::new(
454            global,
455            Some(Box::new(self.clone())),
456            Some(Box::new(self.clone())),
457            can_gc,
458        );
459        promise.append_native_handler(&handler, realm, can_gc);
460    }
461
462    /// Errors must be propagated forward part of
463    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
464    fn check_and_propagate_errors_forward(
465        &self,
466        cx: SafeJSContext,
467        global: &GlobalScope,
468        realm: InRealm,
469        can_gc: CanGc,
470    ) {
471        // An early return is necessary if we are shutting down,
472        // because in that case the source can already have been set to none.
473        if self.shutting_down.get() {
474            return;
475        }
476
477        // if source.[[state]] is or becomes "errored", then
478        let source = self
479            .reader
480            .get_stream()
481            .expect("Reader should still have a stream");
482        if source.is_errored() {
483            rooted!(in(*cx) let mut source_error = UndefinedValue());
484            source.get_stored_error(source_error.handle_mut());
485            self.set_shutdown_error(source_error.handle());
486
487            // If preventAbort is false,
488            if !self.prevent_abort {
489                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
490                // and with source.[[storedError]].
491                self.shutdown(
492                    cx,
493                    global,
494                    Some(ShutdownAction::WritableStreamAbort),
495                    realm,
496                    can_gc,
497                )
498            } else {
499                // Otherwise, shutdown with source.[[storedError]].
500                self.shutdown(cx, global, None, realm, can_gc);
501            }
502        }
503    }
504
505    /// Errors must be propagated backward part of
506    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
507    fn check_and_propagate_errors_backward(
508        &self,
509        cx: SafeJSContext,
510        global: &GlobalScope,
511        realm: InRealm,
512        can_gc: CanGc,
513    ) {
514        // An early return is necessary if we are shutting down,
515        // because in that case the destination can already have been set to none.
516        if self.shutting_down.get() {
517            return;
518        }
519
520        // if dest.[[state]] is or becomes "errored", then
521        let dest = self
522            .writer
523            .get_stream()
524            .expect("Writer should still have a stream");
525        if dest.is_errored() {
526            rooted!(in(*cx) let mut dest_error = UndefinedValue());
527            dest.get_stored_error(dest_error.handle_mut());
528            self.set_shutdown_error(dest_error.handle());
529
530            // If preventCancel is false,
531            if !self.prevent_cancel {
532                // shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]])
533                // and with dest.[[storedError]].
534                self.shutdown(
535                    cx,
536                    global,
537                    Some(ShutdownAction::ReadableStreamCancel),
538                    realm,
539                    can_gc,
540                )
541            } else {
542                // Otherwise, shutdown with dest.[[storedError]].
543                self.shutdown(cx, global, None, realm, can_gc);
544            }
545        }
546    }
547
548    /// Closing must be propagated forward part of
549    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
550    fn check_and_propagate_closing_forward(
551        &self,
552        cx: SafeJSContext,
553        global: &GlobalScope,
554        realm: InRealm,
555        can_gc: CanGc,
556    ) {
557        // An early return is necessary if we are shutting down,
558        // because in that case the source can already have been set to none.
559        if self.shutting_down.get() {
560            return;
561        }
562
563        // if source.[[state]] is or becomes "closed", then
564        let source = self
565            .reader
566            .get_stream()
567            .expect("Reader should still have a stream");
568        if source.is_closed() {
569            // If preventClose is false,
570            if !self.prevent_close {
571                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
572                // and with source.[[storedError]].
573                self.shutdown(
574                    cx,
575                    global,
576                    Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
577                    realm,
578                    can_gc,
579                )
580            } else {
581                // Otherwise, shutdown.
582                self.shutdown(cx, global, None, realm, can_gc);
583            }
584        }
585    }
586
587    /// Closing must be propagated backward part of
588    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
589    fn check_and_propagate_closing_backward(
590        &self,
591        cx: SafeJSContext,
592        global: &GlobalScope,
593        realm: InRealm,
594        can_gc: CanGc,
595    ) {
596        // An early return is necessary if we are shutting down,
597        // because in that case the destination can already have been set to none.
598        if self.shutting_down.get() {
599            return;
600        }
601
602        // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
603        // or dest.[[state]] is "closed"
604        let dest = self
605            .writer
606            .get_stream()
607            .expect("Writer should still have a stream");
608        if dest.close_queued_or_in_flight() || dest.is_closed() {
609            // Assert: no chunks have been read or written.
610            // Note: unclear how to perform this assertion.
611
612            // Let destClosed be a new TypeError.
613            rooted!(in(*cx) let mut dest_closed = UndefinedValue());
614            let error =
615                Error::Type("Destination is closed or has closed queued or in flight".to_string());
616            error.to_jsval(cx, global, dest_closed.handle_mut(), can_gc);
617            self.set_shutdown_error(dest_closed.handle());
618
619            // If preventCancel is false,
620            if !self.prevent_cancel {
621                // shutdown with an action of ! ReadableStreamCancel(source, destClosed)
622                // and with destClosed.
623                self.shutdown(
624                    cx,
625                    global,
626                    Some(ShutdownAction::ReadableStreamCancel),
627                    realm,
628                    can_gc,
629                )
630            } else {
631                // Otherwise, shutdown with destClosed.
632                self.shutdown(cx, global, None, realm, can_gc);
633            }
634        }
635    }
636
637    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
638    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown>
639    /// Combined into one method with an optional action.
640    fn shutdown(
641        &self,
642        cx: SafeJSContext,
643        global: &GlobalScope,
644        action: Option<ShutdownAction>,
645        realm: InRealm,
646        can_gc: CanGc,
647    ) {
648        // If shuttingDown is true, abort these substeps.
649        // Set shuttingDown to true.
650        if !self.shutting_down.replace(true) {
651            let dest = self.writer.get_stream().expect("Stream must be set");
652            // If dest.[[state]] is "writable",
653            // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
654            if dest.is_writable() && !dest.close_queued_or_in_flight() {
655                // If any chunks have been read but not yet written, write them to dest.
656                // Done at the top of `Callback`.
657
658                // Wait until every chunk that has been read has been written
659                // (i.e. the corresponding promises have settled).
660                if let Some(write) = self.pending_writes.borrow_mut().front() {
661                    *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
662                    self.wait_on_pending_write(global, write.clone(), realm, can_gc);
663                    return;
664                }
665            }
666
667            // Note: error is stored in `self.shutdown_error`.
668            if let Some(action) = action {
669                // Let p be the result of performing action.
670                self.perform_action(cx, global, action, realm, can_gc);
671            } else {
672                // Finalize, passing along error if it was given.
673                self.finalize(cx, global, can_gc);
674            }
675        }
676    }
677
678    /// The perform action part of
679    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
680    fn perform_action(
681        &self,
682        cx: SafeJSContext,
683        global: &GlobalScope,
684        action: ShutdownAction,
685        realm: InRealm,
686        can_gc: CanGc,
687    ) {
688        rooted!(in(*cx) let mut error = UndefinedValue());
689        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
690            error.set(shutdown_error.get());
691        }
692
693        *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
694
695        // Let p be the result of performing action.
696        let promise = match action {
697            ShutdownAction::WritableStreamAbort => {
698                let dest = self.writer.get_stream().expect("Stream must be set");
699                dest.abort(cx, global, error.handle(), realm, can_gc)
700            },
701            ShutdownAction::ReadableStreamCancel => {
702                let source = self
703                    .reader
704                    .get_stream()
705                    .expect("Reader should have a stream.");
706                source.cancel(cx, global, error.handle(), can_gc)
707            },
708            ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
709                self.writer.close_with_error_propagation(cx, global, can_gc)
710            },
711            ShutdownAction::Abort => {
712                // Note: implementation of the `abortAlgorithm`
713                // of the signal associated with this piping operation.
714
715                // Let error be signal’s abort reason.
716                rooted!(in(*cx) let mut error = UndefinedValue());
717                error.set(self.abort_reason.get());
718
719                // Let actions be an empty ordered set.
720                let mut actions = vec![];
721
722                // If preventAbort is false, append the following action to actions:
723                if !self.prevent_abort {
724                    let dest = self
725                        .writer
726                        .get_stream()
727                        .expect("Destination stream must be set");
728
729                    // If dest.[[state]] is "writable",
730                    let promise = if dest.is_writable() {
731                        // return ! WritableStreamAbort(dest, error)
732                        dest.abort(cx, global, error.handle(), realm, can_gc)
733                    } else {
734                        // Otherwise, return a promise resolved with undefined.
735                        Promise::new_resolved(global, cx, (), can_gc)
736                    };
737                    actions.push(promise);
738                }
739
740                // If preventCancel is false, append the following action action to actions:
741                if !self.prevent_cancel {
742                    let source = self.reader.get_stream().expect("Source stream must be set");
743
744                    // If source.[[state]] is "readable",
745                    let promise = if source.is_readable() {
746                        // return ! ReadableStreamCancel(source, error).
747                        source.cancel(cx, global, error.handle(), can_gc)
748                    } else {
749                        // Otherwise, return a promise resolved with undefined.
750                        Promise::new_resolved(global, cx, (), can_gc)
751                    };
752                    actions.push(promise);
753                }
754
755                // Shutdown with an action consisting
756                // of getting a promise to wait for all of the actions in actions,
757                // and with error.
758                wait_for_all_promise(cx, global, actions, realm, can_gc)
759            },
760        };
761
762        // Upon fulfillment of p, finalize, passing along originalError if it was given.
763        // Upon rejection of p with reason newError, finalize with newError.
764        let handler = PromiseNativeHandler::new(
765            global,
766            Some(Box::new(self.clone())),
767            Some(Box::new(self.clone())),
768            can_gc,
769        );
770        promise.append_native_handler(&handler, realm, can_gc);
771        *self.shutdown_action_promise.borrow_mut() = Some(promise);
772    }
773
774    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
775    fn finalize(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
776        *self.state.borrow_mut() = PipeToState::Finalized;
777
778        // Perform ! WritableStreamDefaultWriterRelease(writer).
779        self.writer.release(cx, global, can_gc);
780
781        // If reader implements ReadableStreamBYOBReader,
782        // perform ! ReadableStreamBYOBReaderRelease(reader).
783        // TODO.
784
785        // Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
786        self.reader
787            .release(can_gc)
788            .expect("Releasing the reader should not fail");
789
790        // If signal is not undefined, remove abortAlgorithm from signal.
791        // Note: since `self.shutdown` is true at this point,
792        // the abort algorithm is a no-op,
793        // so for now not implementing this step.
794
795        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
796            rooted!(in(*cx) let mut error = UndefinedValue());
797            error.set(shutdown_error.get());
798            // If error was given, reject promise with error.
799            self.result_promise.reject_native(&error.handle(), can_gc);
800        } else {
801            // Otherwise, resolve promise with undefined.
802            self.result_promise.resolve_native(&(), can_gc);
803        }
804    }
805}
806
807/// The fulfillment handler for the reacting to sourceCancelPromise part of
808/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
809#[derive(Clone, JSTraceable, MallocSizeOf)]
810struct SourceCancelPromiseFulfillmentHandler {
811    #[conditional_malloc_size_of]
812    result: Rc<Promise>,
813}
814
815impl Callback for SourceCancelPromiseFulfillmentHandler {
816    /// The fulfillment handler for the reacting to sourceCancelPromise part of
817    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
818    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
819    fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
820        self.result.resolve_native(&(), can_gc);
821    }
822}
823
824/// The rejection handler for the reacting to sourceCancelPromise part of
825/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
826#[derive(Clone, JSTraceable, MallocSizeOf)]
827struct SourceCancelPromiseRejectionHandler {
828    #[conditional_malloc_size_of]
829    result: Rc<Promise>,
830}
831
832impl Callback for SourceCancelPromiseRejectionHandler {
833    /// The rejection handler for the reacting to sourceCancelPromise part of
834    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
835    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
836    fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
837        self.result.reject_native(&v, can_gc);
838    }
839}
840
841/// <https://streams.spec.whatwg.org/#readablestream-state>
842#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
843pub(crate) enum ReadableStreamState {
844    #[default]
845    Readable,
846    Closed,
847    Errored,
848}
849
850/// <https://streams.spec.whatwg.org/#readablestream-controller>
851#[derive(JSTraceable, MallocSizeOf)]
852#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
853pub(crate) enum ControllerType {
854    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
855    Byte(MutNullableDom<ReadableByteStreamController>),
856    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
857    Default(MutNullableDom<ReadableStreamDefaultController>),
858}
859
860/// <https://streams.spec.whatwg.org/#readablestream-readerr>
861#[derive(JSTraceable, MallocSizeOf)]
862#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
863pub(crate) enum ReaderType {
864    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
865    #[allow(clippy::upper_case_acronyms)]
866    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
867    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
868    Default(MutNullableDom<ReadableStreamDefaultReader>),
869}
870
871impl Eq for ReaderType {}
872impl PartialEq for ReaderType {
873    fn eq(&self, other: &Self) -> bool {
874        matches!(
875            (self, other),
876            (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
877                (ReaderType::Default(_), ReaderType::Default(_))
878        )
879    }
880}
881
882/// <https://streams.spec.whatwg.org/#create-readable-stream>
883#[cfg_attr(crown, allow(crown::unrooted_must_root))]
884pub(crate) fn create_readable_stream(
885    global: &GlobalScope,
886    underlying_source_type: UnderlyingSourceType,
887    queuing_strategy: Option<Rc<QueuingStrategySize>>,
888    high_water_mark: Option<f64>,
889    can_gc: CanGc,
890) -> DomRoot<ReadableStream> {
891    // If highWaterMark was not passed, set it to 1.
892    let high_water_mark = high_water_mark.unwrap_or(1.0);
893
894    // If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
895    let size_algorithm =
896        queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
897
898    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
899    assert!(high_water_mark >= 0.0);
900
901    // Let stream be a new ReadableStream.
902    // Perform ! InitializeReadableStream(stream).
903    let stream = ReadableStream::new_with_proto(global, None, can_gc);
904
905    // Let controller be a new ReadableStreamDefaultController.
906    let controller = ReadableStreamDefaultController::new(
907        global,
908        underlying_source_type,
909        high_water_mark,
910        size_algorithm,
911        can_gc,
912    );
913
914    // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm,
915    // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
916    controller
917        .setup(stream.clone(), can_gc)
918        .expect("Setup of default controller cannot fail");
919
920    // Return stream.
921    stream
922}
923
924/// <https://streams.spec.whatwg.org/#rs-class>
925#[dom_struct]
926pub(crate) struct ReadableStream {
927    reflector_: Reflector,
928
929    /// <https://streams.spec.whatwg.org/#readablestream-controller>
930    /// Note: the inner `MutNullableDom` should really be an `Option<Dom>`,
931    /// because it is never unset once set.
932    controller: RefCell<Option<ControllerType>>,
933
934    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
935    #[ignore_malloc_size_of = "mozjs"]
936    stored_error: Heap<JSVal>,
937
938    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
939    disturbed: Cell<bool>,
940
941    /// <https://streams.spec.whatwg.org/#readablestream-reader>
942    reader: RefCell<Option<ReaderType>>,
943
944    /// <https://streams.spec.whatwg.org/#readablestream-state>
945    state: Cell<ReadableStreamState>,
946}
947
948impl ReadableStream {
949    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
950    /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
951    fn new_inherited() -> ReadableStream {
952        ReadableStream {
953            reflector_: Reflector::new(),
954            controller: RefCell::new(None),
955            stored_error: Heap::default(),
956            disturbed: Default::default(),
957            reader: RefCell::new(None),
958            state: Cell::new(Default::default()),
959        }
960    }
961
962    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
963    pub(crate) fn new_with_proto(
964        global: &GlobalScope,
965        proto: Option<SafeHandleObject>,
966        can_gc: CanGc,
967    ) -> DomRoot<ReadableStream> {
968        reflect_dom_object_with_proto(
969            Box::new(ReadableStream::new_inherited()),
970            global,
971            proto,
972            can_gc,
973        )
974    }
975
976    /// Used as part of
977    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
978    pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
979        *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
980            controller,
981        ))));
982    }
983
984    /// Used as part of
985    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
986    pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
987        *self.controller.borrow_mut() =
988            Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
989    }
990
991    /// Used as part of
992    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
993    pub(crate) fn assert_no_controller(&self) {
994        let has_no_controller = self.controller.borrow().is_none();
995        assert!(has_no_controller);
996    }
997
998    /// Build a stream backed by a Rust source that has already been read into memory.
999    pub(crate) fn new_from_bytes(
1000        global: &GlobalScope,
1001        bytes: Vec<u8>,
1002        can_gc: CanGc,
1003    ) -> Fallible<DomRoot<ReadableStream>> {
1004        let stream = ReadableStream::new_with_external_underlying_source(
1005            global,
1006            UnderlyingSourceType::Memory(bytes.len()),
1007            can_gc,
1008        )?;
1009        stream.enqueue_native(bytes, can_gc);
1010        stream.controller_close_native(can_gc);
1011        Ok(stream)
1012    }
1013
1014    /// Build a stream backed by a Rust underlying source.
1015    /// Note: external sources are always paired with a default controller.
1016    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1017    pub(crate) fn new_with_external_underlying_source(
1018        global: &GlobalScope,
1019        source: UnderlyingSourceType,
1020        can_gc: CanGc,
1021    ) -> Fallible<DomRoot<ReadableStream>> {
1022        assert!(source.is_native());
1023        let stream = ReadableStream::new_with_proto(global, None, can_gc);
1024        let controller = ReadableStreamDefaultController::new(
1025            global,
1026            source,
1027            1.0,
1028            extract_size_algorithm(&QueuingStrategy::empty(), can_gc),
1029            can_gc,
1030        );
1031        controller.setup(stream.clone(), can_gc)?;
1032        Ok(stream)
1033    }
1034
1035    /// Call into the release steps of the controller,
1036    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1037        match self.controller.borrow().as_ref() {
1038            Some(ControllerType::Default(controller)) => {
1039                let controller = controller
1040                    .get()
1041                    .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1042                controller.perform_release_steps()
1043            },
1044            Some(ControllerType::Byte(controller)) => {
1045                let controller = controller
1046                    .get()
1047                    .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1048                controller.perform_release_steps()
1049            },
1050            None => Err(Error::Type("Stream should have controller.".to_string())),
1051        }
1052    }
1053
1054    /// Call into the pull steps of the controller,
1055    /// as part of
1056    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1057    pub(crate) fn perform_pull_steps(
1058        &self,
1059        cx: SafeJSContext,
1060        read_request: &ReadRequest,
1061        can_gc: CanGc,
1062    ) {
1063        match self.controller.borrow().as_ref() {
1064            Some(ControllerType::Default(controller)) => controller
1065                .get()
1066                .expect("Stream should have controller.")
1067                .perform_pull_steps(read_request, can_gc),
1068            Some(ControllerType::Byte(controller)) => controller
1069                .get()
1070                .expect("Stream should have controller.")
1071                .perform_pull_steps(cx, read_request, can_gc),
1072            None => {
1073                unreachable!("Stream does not have a controller.");
1074            },
1075        }
1076    }
1077
1078    /// Call into the pull steps of the controller,
1079    /// as part of
1080    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
1081    pub(crate) fn perform_pull_into(
1082        &self,
1083        cx: SafeJSContext,
1084        read_into_request: &ReadIntoRequest,
1085        view: HeapBufferSource<ArrayBufferViewU8>,
1086        options: &ReadableStreamBYOBReaderReadOptions,
1087        can_gc: CanGc,
1088    ) {
1089        match self.controller.borrow().as_ref() {
1090            Some(ControllerType::Byte(controller)) => controller
1091                .get()
1092                .expect("Stream should have controller.")
1093                .perform_pull_into(cx, read_into_request, view, options, can_gc),
1094            _ => {
1095                unreachable!(
1096                    "Pulling a chunk from a stream with a default controller using a BYOB reader"
1097                )
1098            },
1099        }
1100    }
1101
1102    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
1103    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1104        match self.reader.borrow().as_ref() {
1105            Some(ReaderType::Default(reader)) => {
1106                let Some(reader) = reader.get() else {
1107                    panic!("Attempt to add a read request without having first acquired a reader.");
1108                };
1109
1110                // Assert: stream.[[state]] is "readable".
1111                assert!(self.is_readable());
1112
1113                // Append readRequest to stream.[[reader]].[[readRequests]].
1114                reader.add_read_request(read_request);
1115            },
1116            _ => {
1117                unreachable!("Adding a read request can only be done on a default reader.")
1118            },
1119        }
1120    }
1121
1122    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
1123    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1124        match self.reader.borrow().as_ref() {
1125            // Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
1126            Some(ReaderType::BYOB(reader)) => {
1127                let Some(reader) = reader.get() else {
1128                    unreachable!(
1129                        "Attempt to add a read into request without having first acquired a reader."
1130                    );
1131                };
1132
1133                // Assert: stream.[[state]] is "readable" or "closed".
1134                assert!(self.is_readable() || self.is_closed());
1135
1136                // Append readRequest to stream.[[reader]].[[readIntoRequests]].
1137                reader.add_read_into_request(read_request);
1138            },
1139            _ => {
1140                unreachable!("Adding a read into request can only be done on a BYOB reader.")
1141            },
1142        }
1143    }
1144
1145    /// Endpoint to enqueue chunks directly from Rust.
1146    /// Note: in other use cases this call happens via the controller.
1147    pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1148        match self.controller.borrow().as_ref() {
1149            Some(ControllerType::Default(controller)) => controller
1150                .get()
1151                .expect("Stream should have controller.")
1152                .enqueue_native(bytes, can_gc),
1153            _ => {
1154                unreachable!(
1155                    "Enqueueing chunk to a stream from Rust on other than default controller"
1156                );
1157            },
1158        }
1159    }
1160
1161    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1162    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1163        // Assert: stream.[[state]] is "readable".
1164        assert!(self.is_readable());
1165
1166        // Set stream.[[state]] to "errored".
1167        self.state.set(ReadableStreamState::Errored);
1168
1169        // Set stream.[[storedError]] to e.
1170        self.stored_error.set(e.get());
1171
1172        // Let reader be stream.[[reader]].
1173
1174        let default_reader = {
1175            let reader_ref = self.reader.borrow();
1176            match reader_ref.as_ref() {
1177                Some(ReaderType::Default(reader)) => reader.get(),
1178                _ => None,
1179            }
1180        };
1181
1182        if let Some(reader) = default_reader {
1183            // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
1184            reader.error(e, can_gc);
1185            return;
1186        }
1187
1188        let byob_reader = {
1189            let reader_ref = self.reader.borrow();
1190            match reader_ref.as_ref() {
1191                Some(ReaderType::BYOB(reader)) => reader.get(),
1192                _ => None,
1193            }
1194        };
1195
1196        if let Some(reader) = byob_reader {
1197            // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
1198            reader.error_read_into_requests(e, can_gc);
1199        }
1200
1201        // If reader is undefined, return.
1202    }
1203
1204    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
1205    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1206        handle_mut.set(self.stored_error.get());
1207    }
1208
1209    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1210    /// Note: in other use cases this call happens via the controller.
1211    pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1212        let cx = GlobalScope::get_cx();
1213        rooted!(in(*cx) let mut error_val = UndefinedValue());
1214        error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1215        self.error(error_val.handle(), can_gc);
1216    }
1217
1218    /// Call into the controller's `Close` method.
1219    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
1220    pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1221        match self.controller.borrow().as_ref() {
1222            Some(ControllerType::Default(controller)) => {
1223                let _ = controller
1224                    .get()
1225                    .expect("Stream should have controller.")
1226                    .Close(can_gc);
1227            },
1228            _ => {
1229                unreachable!("Native closing is only done on default controllers.")
1230            },
1231        }
1232    }
1233
1234    /// Returns a boolean reflecting whether the stream has all data in memory.
1235    /// Useful for native source integration only.
1236    pub(crate) fn in_memory(&self) -> bool {
1237        match self.controller.borrow().as_ref() {
1238            Some(ControllerType::Default(controller)) => controller
1239                .get()
1240                .expect("Stream should have controller.")
1241                .in_memory(),
1242            _ => {
1243                unreachable!(
1244                    "Checking if source is in memory for a stream with a non-default controller"
1245                )
1246            },
1247        }
1248    }
1249
1250    /// Return bytes for synchronous use, if the stream has all data in memory.
1251    /// Useful for native source integration only.
1252    pub(crate) fn get_in_memory_bytes(&self) -> Option<IpcSharedMemory> {
1253        match self.controller.borrow().as_ref() {
1254            Some(ControllerType::Default(controller)) => controller
1255                .get()
1256                .expect("Stream should have controller.")
1257                .get_in_memory_bytes()
1258                .as_deref()
1259                .map(IpcSharedMemory::from_bytes),
1260            _ => {
1261                unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1262            },
1263        }
1264    }
1265
1266    /// Acquires a reader and locks the stream,
1267    /// must be done before `read_a_chunk`.
1268    /// Native call to
1269    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-reader>
1270    pub(crate) fn acquire_default_reader(
1271        &self,
1272        can_gc: CanGc,
1273    ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1274        // Let reader be a new ReadableStreamDefaultReader.
1275        let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1276
1277        // Perform ? SetUpReadableStreamDefaultReader(reader, stream).
1278        reader.set_up(self, &self.global(), can_gc)?;
1279
1280        // Return reader.
1281        Ok(reader)
1282    }
1283
1284    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-byob-reader>
1285    pub(crate) fn acquire_byob_reader(
1286        &self,
1287        can_gc: CanGc,
1288    ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1289        // Let reader be a new ReadableStreamBYOBReader.
1290        let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1291        // Perform ? SetUpReadableStreamBYOBReader(reader, stream).
1292        reader.set_up(self, &self.global(), can_gc)?;
1293
1294        // Return reader.
1295        Ok(reader)
1296    }
1297
1298    pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1299        match self.controller.borrow().as_ref() {
1300            Some(ControllerType::Default(controller)) => {
1301                controller.get().expect("Stream should have controller.")
1302            },
1303            _ => {
1304                unreachable!(
1305                    "Getting default controller for a stream with a non-default controller"
1306                )
1307            },
1308        }
1309    }
1310
1311    pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1312        match self.reader.borrow().as_ref() {
1313            Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1314            _ => {
1315                unreachable!("Getting default reader for a stream with a non-default reader")
1316            },
1317        }
1318    }
1319
1320    /// Read a chunk from the stream,
1321    /// must be called after `start_reading`,
1322    /// and before `stop_reading`.
1323    /// Native call to
1324    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1325    pub(crate) fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
1326        match self.reader.borrow().as_ref() {
1327            Some(ReaderType::Default(reader)) => {
1328                let Some(reader) = reader.get() else {
1329                    unreachable!(
1330                        "Attempt to read stream chunk without having first acquired a reader."
1331                    );
1332                };
1333                reader.Read(can_gc)
1334            },
1335            _ => {
1336                unreachable!("Native reading of a chunk can only be done with a default reader.")
1337            },
1338        }
1339    }
1340
1341    /// Releases the lock on the reader,
1342    /// must be done after `start_reading`.
1343    /// Native call to
1344    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
1345    pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1346        let reader_ref = self.reader.borrow();
1347
1348        match reader_ref.as_ref() {
1349            Some(ReaderType::Default(reader)) => {
1350                let Some(reader) = reader.get() else {
1351                    unreachable!("Attempt to stop reading without having first acquired a reader.");
1352                };
1353
1354                drop(reader_ref);
1355                reader.release(can_gc).expect("Reader release cannot fail.");
1356            },
1357            _ => {
1358                unreachable!("Native stop reading can only be done with a default reader.")
1359            },
1360        }
1361    }
1362
1363    /// <https://streams.spec.whatwg.org/#is-readable-stream-locked>
1364    pub(crate) fn is_locked(&self) -> bool {
1365        match self.reader.borrow().as_ref() {
1366            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1367            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1368            None => false,
1369        }
1370    }
1371
1372    pub(crate) fn is_disturbed(&self) -> bool {
1373        self.disturbed.get()
1374    }
1375
1376    pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1377        self.disturbed.set(disturbed);
1378    }
1379
1380    pub(crate) fn is_closed(&self) -> bool {
1381        self.state.get() == ReadableStreamState::Closed
1382    }
1383
1384    pub(crate) fn is_errored(&self) -> bool {
1385        self.state.get() == ReadableStreamState::Errored
1386    }
1387
1388    pub(crate) fn is_readable(&self) -> bool {
1389        self.state.get() == ReadableStreamState::Readable
1390    }
1391
1392    pub(crate) fn has_default_reader(&self) -> bool {
1393        match self.reader.borrow().as_ref() {
1394            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1395            _ => false,
1396        }
1397    }
1398
1399    pub(crate) fn has_byob_reader(&self) -> bool {
1400        match self.reader.borrow().as_ref() {
1401            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1402            _ => false,
1403        }
1404    }
1405
1406    pub(crate) fn has_byte_controller(&self) -> bool {
1407        match self.controller.borrow().as_ref() {
1408            Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1409            _ => false,
1410        }
1411    }
1412
1413    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
1414    pub(crate) fn get_num_read_requests(&self) -> usize {
1415        match self.reader.borrow().as_ref() {
1416            Some(ReaderType::Default(reader)) => {
1417                let reader = reader
1418                    .get()
1419                    .expect("Stream must have a reader when getting the number of read requests.");
1420                reader.get_num_read_requests()
1421            },
1422            _ => unreachable!(
1423                "Stream must have a default reader when get num read requests is called into."
1424            ),
1425        }
1426    }
1427
1428    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-into-requests>
1429    pub(crate) fn get_num_read_into_requests(&self) -> usize {
1430        assert!(self.has_byob_reader());
1431
1432        match self.reader.borrow().as_ref() {
1433            Some(ReaderType::BYOB(reader)) => {
1434                let Some(reader) = reader.get() else {
1435                    unreachable!(
1436                        "Stream must have a reader when get num read into requests is called into."
1437                    );
1438                };
1439                reader.get_num_read_into_requests()
1440            },
1441            _ => {
1442                unreachable!(
1443                    "Stream must have a BYOB reader when get num read into requests is called into."
1444                );
1445            },
1446        }
1447    }
1448
1449    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
1450    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1451    pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1452        // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1453        assert!(self.has_default_reader());
1454
1455        match self.reader.borrow().as_ref() {
1456            Some(ReaderType::Default(reader)) => {
1457                // step 2 - Let reader be stream.[[reader]].
1458                let reader = reader
1459                    .get()
1460                    .expect("Stream must have a reader when a read request is fulfilled.");
1461                // step 3 - Assert: reader.[[readRequests]] is not empty.
1462                assert_ne!(reader.get_num_read_requests(), 0);
1463                // step 4 & 5
1464                // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]].
1465                let request = reader.remove_read_request();
1466
1467                if done {
1468                    // step 6 - If done is true, perform readRequest’s close steps.
1469                    request.close_steps(can_gc);
1470                } else {
1471                    // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk.
1472                    let result = RootedTraceableBox::new(Heap::default());
1473                    result.set(*chunk);
1474                    request.chunk_steps(result, can_gc);
1475                }
1476            },
1477            _ => {
1478                unreachable!(
1479                    "Stream must have a default reader when fulfill read requests is called into."
1480                );
1481            },
1482        }
1483    }
1484
1485    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request>
1486    pub(crate) fn fulfill_read_into_request(
1487        &self,
1488        chunk: SafeHandleValue,
1489        done: bool,
1490        can_gc: CanGc,
1491    ) {
1492        // Assert: ! ReadableStreamHasBYOBReader(stream) is true.
1493        assert!(self.has_byob_reader());
1494
1495        // Let reader be stream.[[reader]].
1496        match self.reader.borrow().as_ref() {
1497            Some(ReaderType::BYOB(reader)) => {
1498                let Some(reader) = reader.get() else {
1499                    unreachable!(
1500                        "Stream must have a reader when a read into request is fulfilled."
1501                    );
1502                };
1503
1504                // Assert: reader.[[readIntoRequests]] is not empty.
1505                assert!(reader.get_num_read_into_requests() > 0);
1506
1507                // Let readIntoRequest be reader.[[readIntoRequests]][0].
1508                // Remove readIntoRequest from reader.[[readIntoRequests]].
1509                let read_into_request = reader.remove_read_into_request();
1510
1511                // If done is true, perform readIntoRequest’s close steps, given chunk.
1512                let result = RootedTraceableBox::new(Heap::default());
1513                if done {
1514                    result.set(*chunk);
1515                    read_into_request.close_steps(Some(result), can_gc);
1516                } else {
1517                    // Otherwise, perform readIntoRequest’s chunk steps, given chunk.
1518                    result.set(*chunk);
1519                    read_into_request.chunk_steps(result, can_gc);
1520                }
1521            },
1522            _ => {
1523                unreachable!(
1524                    "Stream must have a BYOB reader when fulfill read into requests is called into."
1525                );
1526            },
1527        };
1528    }
1529
1530    /// <https://streams.spec.whatwg.org/#readable-stream-close>
1531    pub(crate) fn close(&self, can_gc: CanGc) {
1532        // Assert: stream.[[state]] is "readable".
1533        assert!(self.is_readable());
1534        // Set stream.[[state]] to "closed".
1535        self.state.set(ReadableStreamState::Closed);
1536        // Let reader be stream.[[reader]].
1537
1538        // NOTE: do not hold the RefCell borrow across reader.close(),
1539        // or release() will panic when it tries to mut-borrow stream.reader.
1540        // So we pull out the underlying DOM reader in a local, then drop the borrow.
1541        let default_reader = {
1542            let reader_ref = self.reader.borrow();
1543            match reader_ref.as_ref() {
1544                Some(ReaderType::Default(reader)) => reader.get(),
1545                _ => None,
1546            }
1547        };
1548
1549        if let Some(reader) = default_reader {
1550            // steps 5 & 6 for a default reader
1551            reader.close(can_gc);
1552            return;
1553        }
1554
1555        // Same for BYOB reader.
1556        let byob_reader = {
1557            let reader_ref = self.reader.borrow();
1558            match reader_ref.as_ref() {
1559                Some(ReaderType::BYOB(reader)) => reader.get(),
1560                _ => None,
1561            }
1562        };
1563
1564        if let Some(reader) = byob_reader {
1565            // steps 5 & 6 for a BYOB reader
1566            reader.close(can_gc);
1567        }
1568
1569        // If reader is undefined, return.
1570    }
1571
1572    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
1573    pub(crate) fn cancel(
1574        &self,
1575        cx: SafeJSContext,
1576        global: &GlobalScope,
1577        reason: SafeHandleValue,
1578        can_gc: CanGc,
1579    ) -> Rc<Promise> {
1580        // Set stream.[[disturbed]] to true.
1581        self.disturbed.set(true);
1582
1583        // If stream.[[state]] is "closed", return a promise resolved with undefined.
1584        if self.is_closed() {
1585            return Promise::new_resolved(global, cx, (), can_gc);
1586        }
1587        // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
1588        if self.is_errored() {
1589            let promise = Promise::new(global, can_gc);
1590            rooted!(in(*cx) let mut rval = UndefinedValue());
1591            self.stored_error
1592                .safe_to_jsval(cx, rval.handle_mut(), can_gc);
1593            promise.reject_native(&rval.handle(), can_gc);
1594            return promise;
1595        }
1596        // Perform ! ReadableStreamClose(stream).
1597        self.close(can_gc);
1598
1599        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
1600        let byob_reader = {
1601            let reader_ref = self.reader.borrow();
1602            match reader_ref.as_ref() {
1603                Some(ReaderType::BYOB(reader)) => reader.get(),
1604                _ => None,
1605            }
1606        };
1607
1608        if let Some(reader) = byob_reader {
1609            // step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel
1610            reader.cancel(can_gc);
1611        }
1612
1613        // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).
1614
1615        let source_cancel_promise = match self.controller.borrow().as_ref() {
1616            Some(ControllerType::Default(controller)) => controller
1617                .get()
1618                .expect("Stream should have controller.")
1619                .perform_cancel_steps(cx, global, reason, can_gc),
1620            Some(ControllerType::Byte(controller)) => controller
1621                .get()
1622                .expect("Stream should have controller.")
1623                .perform_cancel_steps(cx, global, reason, can_gc),
1624            None => {
1625                panic!("Stream does not have a controller.");
1626            },
1627        };
1628
1629        // Create a new promise,
1630        // and setup a handler in order to react to the fulfillment of sourceCancelPromise.
1631        let global = self.global();
1632        let result_promise = Promise::new(&global, can_gc);
1633        let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1634            result: result_promise.clone(),
1635        });
1636        let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1637            result: result_promise.clone(),
1638        });
1639        let handler = PromiseNativeHandler::new(
1640            &global,
1641            Some(fulfillment_handler),
1642            Some(rejection_handler),
1643            can_gc,
1644        );
1645        let realm = enter_realm(&*global);
1646        let comp = InRealm::Entered(&realm);
1647        source_cancel_promise.append_native_handler(&handler, comp, can_gc);
1648
1649        // Return the result of reacting to sourceCancelPromise
1650        // with a fulfillment step that returns undefined.
1651        result_promise
1652    }
1653
1654    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1655    pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1656        *self.reader.borrow_mut() = new_reader;
1657    }
1658
1659    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
1660    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1661    fn default_tee(
1662        &self,
1663        clone_for_branch_2: bool,
1664        can_gc: CanGc,
1665    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1666        // Assert: stream implements ReadableStream.
1667
1668        // Assert: cloneForBranch2 is a boolean.
1669        let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1670
1671        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1672        let reader = self.acquire_default_reader(can_gc)?;
1673        self.set_reader(Some(ReaderType::Default(MutNullableDom::new(Some(
1674            &reader,
1675        )))));
1676
1677        // Let reading be false.
1678        let reading = Rc::new(Cell::new(false));
1679        // Let readAgain be false.
1680        let read_again = Rc::new(Cell::new(false));
1681        // Let canceled1 be false.
1682        let canceled_1 = Rc::new(Cell::new(false));
1683        // Let canceled2 be false.
1684        let canceled_2 = Rc::new(Cell::new(false));
1685
1686        // Let reason1 be undefined.
1687        let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1688        // Let reason2 be undefined.
1689        let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1690        // Let cancelPromise be a new promise.
1691        let cancel_promise = Promise::new(&self.global(), can_gc);
1692
1693        let tee_source_1 = DefaultTeeUnderlyingSource::new(
1694            &reader,
1695            self,
1696            reading.clone(),
1697            read_again.clone(),
1698            canceled_1.clone(),
1699            canceled_2.clone(),
1700            clone_for_branch_2.clone(),
1701            reason_1.clone(),
1702            reason_2.clone(),
1703            cancel_promise.clone(),
1704            TeeCancelAlgorithm::Cancel1Algorithm,
1705            can_gc,
1706        );
1707
1708        let underlying_source_type_branch_1 =
1709            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
1710
1711        let tee_source_2 = DefaultTeeUnderlyingSource::new(
1712            &reader,
1713            self,
1714            reading,
1715            read_again,
1716            canceled_1.clone(),
1717            canceled_2.clone(),
1718            clone_for_branch_2,
1719            reason_1,
1720            reason_2,
1721            cancel_promise.clone(),
1722            TeeCancelAlgorithm::Cancel2Algorithm,
1723            can_gc,
1724        );
1725
1726        let underlying_source_type_branch_2 =
1727            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
1728
1729        // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
1730        let branch_1 = create_readable_stream(
1731            &self.global(),
1732            underlying_source_type_branch_1,
1733            None,
1734            None,
1735            can_gc,
1736        );
1737        tee_source_1.set_branch_1(&branch_1);
1738        tee_source_2.set_branch_1(&branch_1);
1739
1740        // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
1741        let branch_2 = create_readable_stream(
1742            &self.global(),
1743            underlying_source_type_branch_2,
1744            None,
1745            None,
1746            can_gc,
1747        );
1748        tee_source_1.set_branch_2(&branch_2);
1749        tee_source_2.set_branch_2(&branch_2);
1750
1751        // Upon rejection of reader.[[closedPromise]] with reason r,
1752        reader.append_native_handler_to_closed_promise(
1753            &branch_1,
1754            &branch_2,
1755            canceled_1,
1756            canceled_2,
1757            cancel_promise,
1758            can_gc,
1759        );
1760
1761        // Return « branch_1, branch_2 ».
1762        Ok(vec![branch_1, branch_2])
1763    }
1764
1765    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
1766    #[allow(clippy::too_many_arguments)]
1767    pub(crate) fn pipe_to(
1768        &self,
1769        cx: SafeJSContext,
1770        global: &GlobalScope,
1771        dest: &WritableStream,
1772        prevent_close: bool,
1773        prevent_abort: bool,
1774        prevent_cancel: bool,
1775        signal: Option<&AbortSignal>,
1776        realm: InRealm,
1777        can_gc: CanGc,
1778    ) -> Rc<Promise> {
1779        // Assert: source implements ReadableStream.
1780        // Assert: dest implements WritableStream.
1781        // Assert: prevent_close, prevent_abort, and prevent_cancel are all booleans.
1782        // Done with method signature types.
1783
1784        // If signal was not given, let signal be undefined.
1785        // Assert: either signal is undefined, or signal implements AbortSignal.
1786        // Note: done with the `signal` argument.
1787
1788        // Assert: ! IsReadableStreamLocked(source) is false.
1789        assert!(!self.is_locked());
1790
1791        // Assert: ! IsWritableStreamLocked(dest) is false.
1792        assert!(!dest.is_locked());
1793
1794        // If source.[[controller]] implements ReadableByteStreamController,
1795        // let reader be either ! AcquireReadableStreamBYOBReader(source)
1796        // or ! AcquireReadableStreamDefaultReader(source),
1797        // at the user agent’s discretion.
1798        // Note: for now only using default readers.
1799
1800        // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
1801        let reader = self
1802            .acquire_default_reader(can_gc)
1803            .expect("Acquiring a default reader for pipe_to cannot fail");
1804
1805        // Let writer be ! AcquireWritableStreamDefaultWriter(dest).
1806        let writer = dest
1807            .aquire_default_writer(cx, global, can_gc)
1808            .expect("Acquiring a default writer for pipe_to cannot fail");
1809
1810        // Set source.[[disturbed]] to true.
1811        self.disturbed.set(true);
1812
1813        // Let shuttingDown be false.
1814        // Done below with default.
1815
1816        // Let promise be a new promise.
1817        let promise = Promise::new(global, can_gc);
1818
1819        // In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
1820        rooted!(in(*cx) let pipe_to = PipeTo {
1821            reader: Dom::from_ref(&reader),
1822            writer: Dom::from_ref(&writer),
1823            pending_writes: Default::default(),
1824            state: Default::default(),
1825            prevent_abort,
1826            prevent_cancel,
1827            prevent_close,
1828            shutting_down: Default::default(),
1829            abort_reason: Default::default(),
1830            shutdown_error: Default::default(),
1831            shutdown_action_promise:  Default::default(),
1832            result_promise: promise.clone(),
1833        });
1834
1835        // If signal is not undefined,
1836        // Note: moving the steps to here, so that the `PipeTo` is available.
1837        if let Some(signal) = signal {
1838            // Let abortAlgorithm be the following steps:
1839            // Note: steps are implemented at call site.
1840            rooted!(in(*cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1841
1842            // If signal is aborted, perform abortAlgorithm and return promise.
1843            if signal.aborted() {
1844                signal.run_abort_algorithm(cx, global, &abort_algorithm, realm, can_gc);
1845                return promise;
1846            }
1847
1848            // Add abortAlgorithm to signal.
1849            signal.add(&abort_algorithm);
1850        }
1851
1852        // Note: perfom checks now, since streams can start as closed or errored.
1853        pipe_to.check_and_propagate_errors_forward(cx, global, realm, can_gc);
1854        pipe_to.check_and_propagate_errors_backward(cx, global, realm, can_gc);
1855        pipe_to.check_and_propagate_closing_forward(cx, global, realm, can_gc);
1856        pipe_to.check_and_propagate_closing_backward(cx, global, realm, can_gc);
1857
1858        // If we are not closed or errored,
1859        if *pipe_to.state.borrow() == PipeToState::Starting {
1860            // Start the pipe, by waiting on the writer being ready for a chunk.
1861            pipe_to.wait_for_writer_ready(global, realm, can_gc);
1862        }
1863
1864        // Return promise.
1865        promise
1866    }
1867
1868    /// <https://streams.spec.whatwg.org/#readable-stream-tee>
1869    fn tee(
1870        &self,
1871        clone_for_branch_2: bool,
1872        can_gc: CanGc,
1873    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1874        // Assert: stream implements ReadableStream.
1875        // Assert: cloneForBranch2 is a boolean.
1876
1877        match self.controller.borrow().as_ref() {
1878            Some(ControllerType::Default(_)) => {
1879                // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
1880                self.default_tee(clone_for_branch_2, can_gc)
1881            },
1882            Some(ControllerType::Byte(_)) => {
1883                // If stream.[[controller]] implements ReadableByteStreamController,
1884                // return ? ReadableByteStreamTee(stream).
1885                Err(Error::Type(
1886                    "Teeing is not yet supported for byte streams".to_owned(),
1887                ))
1888            },
1889            None => {
1890                unreachable!("Stream should have a controller.");
1891            },
1892        }
1893    }
1894
1895    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller-from-underlying-source>
1896    pub(crate) fn set_up_byte_controller(
1897        &self,
1898        global: &GlobalScope,
1899        underlying_source_dict: JsUnderlyingSource,
1900        underlying_source_handle: SafeHandleObject,
1901        stream: DomRoot<ReadableStream>,
1902        strategy_hwm: f64,
1903        can_gc: CanGc,
1904    ) -> Fallible<()> {
1905        // Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
1906        // Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
1907        // If underlyingSourceDict["start"] exists, then set startAlgorithm to an algorithm which returns the result
1908        // of invoking underlyingSourceDict["start"] with argument list « controller »
1909        // and callback this value underlyingSource.
1910        // If underlyingSourceDict["pull"] exists, then set pullAlgorithm to an algorithm which returns the result
1911        // of invoking underlyingSourceDict["pull"] with argument list « controller »
1912        // and callback this value underlyingSource.
1913        // If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm to an algorithm which takes an
1914        // argument reason and returns the result of invoking underlyingSourceDict["cancel"] with argument list
1915        // « reason » and callback this value underlyingSource.
1916
1917        // Let autoAllocateChunkSize be underlyingSourceDict["autoAllocateChunkSize"],
1918        // if it exists, or undefined otherwise.
1919        // If autoAllocateChunkSize is 0, then throw a TypeError exception.
1920        if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
1921            return Err(Error::Type("autoAllocateChunkSize cannot be 0".to_owned()));
1922        }
1923
1924        let controller = ReadableByteStreamController::new(
1925            UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
1926            strategy_hwm,
1927            global,
1928            can_gc,
1929        );
1930
1931        // Note: this must be done before `setup`,
1932        // otherwise `thisOb` is null in the start callback.
1933        controller.set_underlying_source_this_object(underlying_source_handle);
1934
1935        // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm,
1936        // pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
1937        controller.setup(global, stream, can_gc)
1938    }
1939
1940    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
1941    pub(crate) fn setup_cross_realm_transform_readable(
1942        &self,
1943        cx: SafeJSContext,
1944        port: &MessagePort,
1945        can_gc: CanGc,
1946    ) {
1947        let port_id = port.message_port_id();
1948        let global = self.global();
1949
1950        // Perform ! InitializeReadableStream(stream).
1951        // Done in `new_inherited`.
1952
1953        // Let sizeAlgorithm be an algorithm that returns 1.
1954        let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
1955
1956        // Note: other algorithms defined in the underlying source container.
1957
1958        // Let controller be a new ReadableStreamDefaultController.
1959        let controller = ReadableStreamDefaultController::new(
1960            &self.global(),
1961            UnderlyingSourceType::Transfer(Dom::from_ref(port)),
1962            0.,
1963            size_algorithm,
1964            can_gc,
1965        );
1966
1967        // Add a handler for port’s message event with the following steps:
1968        // Add a handler for port’s messageerror event with the following steps:
1969        rooted!(in(*cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
1970            controller: Dom::from_ref(&controller),
1971        });
1972        global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
1973
1974        // Enable port’s port message queue.
1975        port.Start(can_gc);
1976
1977        // Perform ! SetUpReadableStreamDefaultController
1978        controller
1979            .setup(DomRoot::from_ref(self), can_gc)
1980            .expect("Setting up controller for transfer cannot fail.");
1981    }
1982}
1983
1984impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
1985    /// <https://streams.spec.whatwg.org/#rs-constructor>
1986    fn Constructor(
1987        cx: SafeJSContext,
1988        global: &GlobalScope,
1989        proto: Option<SafeHandleObject>,
1990        can_gc: CanGc,
1991        underlying_source: Option<*mut JSObject>,
1992        strategy: &QueuingStrategy,
1993    ) -> Fallible<DomRoot<Self>> {
1994        // If underlyingSource is missing, set it to null.
1995        rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
1996        // Let underlyingSourceDict be underlyingSource,
1997        // converted to an IDL value of type UnderlyingSource.
1998        let underlying_source_dict = if !underlying_source_obj.is_null() {
1999            rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2000            match JsUnderlyingSource::new(cx, obj_val.handle(), can_gc) {
2001                Ok(ConversionResult::Success(val)) => val,
2002                Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
2003                _ => {
2004                    return Err(Error::JSFailed);
2005                },
2006            }
2007        } else {
2008            JsUnderlyingSource::empty()
2009        };
2010
2011        // Perform ! InitializeReadableStream(this).
2012        let stream = ReadableStream::new_with_proto(global, proto, can_gc);
2013
2014        if underlying_source_dict.type_.is_some() {
2015            // If strategy["size"] exists, throw a RangeError exception.
2016            if strategy.size.is_some() {
2017                return Err(Error::Range(
2018                    "size is not supported for byte streams".to_owned(),
2019                ));
2020            }
2021
2022            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
2023            let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2024
2025            // Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this,
2026            // underlyingSource, underlyingSourceDict, highWaterMark).
2027            stream.set_up_byte_controller(
2028                global,
2029                underlying_source_dict,
2030                underlying_source_obj.handle(),
2031                stream.clone(),
2032                strategy_hwm,
2033                can_gc,
2034            )?;
2035        } else {
2036            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
2037            let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2038
2039            // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
2040            let size_algorithm = extract_size_algorithm(strategy, can_gc);
2041
2042            let controller = ReadableStreamDefaultController::new(
2043                global,
2044                UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2045                high_water_mark,
2046                size_algorithm,
2047                can_gc,
2048            );
2049
2050            // Note: this must be done before `setup`,
2051            // otherwise `thisOb` is null in the start callback.
2052            controller.set_underlying_source_this_object(underlying_source_obj.handle());
2053
2054            // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
2055            controller.setup(stream.clone(), can_gc)?;
2056        };
2057
2058        Ok(stream)
2059    }
2060
2061    /// <https://streams.spec.whatwg.org/#rs-locked>
2062    fn Locked(&self) -> bool {
2063        self.is_locked()
2064    }
2065
2066    /// <https://streams.spec.whatwg.org/#rs-cancel>
2067    fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
2068        let global = self.global();
2069        if self.is_locked() {
2070            // If ! IsReadableStreamLocked(this) is true,
2071            // return a promise rejected with a TypeError exception.
2072            let promise = Promise::new(&global, can_gc);
2073            promise.reject_error(Error::Type("stream is locked".to_owned()), can_gc);
2074            promise
2075        } else {
2076            // Return ! ReadableStreamCancel(this, reason).
2077            self.cancel(cx, &global, reason, can_gc)
2078        }
2079    }
2080
2081    /// <https://streams.spec.whatwg.org/#rs-get-reader>
2082    fn GetReader(
2083        &self,
2084        options: &ReadableStreamGetReaderOptions,
2085        can_gc: CanGc,
2086    ) -> Fallible<ReadableStreamReader> {
2087        // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
2088        if options.mode.is_none() {
2089            return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2090                self.acquire_default_reader(can_gc)?,
2091            ));
2092        }
2093        // 2. Assert: options["mode"] is "byob".
2094        assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2095
2096        // 3. Return ? AcquireReadableStreamBYOBReader(this).
2097        Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2098            self.acquire_byob_reader(can_gc)?,
2099        ))
2100    }
2101
2102    /// <https://streams.spec.whatwg.org/#rs-tee>
2103    fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2104        // Return ? ReadableStreamTee(this, false).
2105        self.tee(false, can_gc)
2106    }
2107
2108    /// <https://streams.spec.whatwg.org/#rs-pipe-to>
2109    fn PipeTo(
2110        &self,
2111        destination: &WritableStream,
2112        options: &StreamPipeOptions,
2113        realm: InRealm,
2114        can_gc: CanGc,
2115    ) -> Rc<Promise> {
2116        let cx = GlobalScope::get_cx();
2117        let global = self.global();
2118
2119        // If ! IsReadableStreamLocked(this) is true,
2120        if self.is_locked() {
2121            // return a promise rejected with a TypeError exception.
2122            let promise = Promise::new(&global, can_gc);
2123            promise.reject_error(Error::Type("Source stream is locked".to_owned()), can_gc);
2124            return promise;
2125        }
2126
2127        // If ! IsWritableStreamLocked(destination) is true,
2128        if destination.is_locked() {
2129            // return a promise rejected with a TypeError exception.
2130            let promise = Promise::new(&global, can_gc);
2131            promise.reject_error(
2132                Error::Type("Destination stream is locked".to_owned()),
2133                can_gc,
2134            );
2135            return promise;
2136        }
2137
2138        // Let signal be options["signal"] if it exists, or undefined otherwise.
2139        let signal = options.signal.as_deref();
2140
2141        // Return ! ReadableStreamPipeTo.
2142        self.pipe_to(
2143            cx,
2144            &global,
2145            destination,
2146            options.preventClose,
2147            options.preventAbort,
2148            options.preventCancel,
2149            signal,
2150            realm,
2151            can_gc,
2152        )
2153    }
2154
2155    /// <https://streams.spec.whatwg.org/#rs-pipe-through>
2156    fn PipeThrough(
2157        &self,
2158        transform: &ReadableWritablePair,
2159        options: &StreamPipeOptions,
2160        realm: InRealm,
2161        can_gc: CanGc,
2162    ) -> Fallible<DomRoot<ReadableStream>> {
2163        let global = self.global();
2164        let cx = GlobalScope::get_cx();
2165
2166        // If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
2167        if self.is_locked() {
2168            return Err(Error::Type("Source stream is locked".to_owned()));
2169        }
2170
2171        // If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
2172        if transform.writable.is_locked() {
2173            return Err(Error::Type("Destination stream is locked".to_owned()));
2174        }
2175
2176        // Let signal be options["signal"] if it exists, or undefined otherwise.
2177        let signal = options.signal.as_deref();
2178
2179        // Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
2180        // options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
2181        let promise = self.pipe_to(
2182            cx,
2183            &global,
2184            &transform.writable,
2185            options.preventClose,
2186            options.preventAbort,
2187            options.preventCancel,
2188            signal,
2189            realm,
2190            can_gc,
2191        );
2192
2193        // Set promise.[[PromiseIsHandled]] to true.
2194        promise.set_promise_is_handled();
2195
2196        // Return transform["readable"].
2197        Ok(transform.readable.clone())
2198    }
2199}
2200
2201#[expect(unsafe_code)]
2202/// The initial steps for the message handler for both readable and writable cross realm transforms.
2203/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2204/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2205pub(crate) unsafe fn get_type_and_value_from_message(
2206    cx: SafeJSContext,
2207    data: SafeHandleValue,
2208    value: SafeMutableHandleValue,
2209    can_gc: CanGc,
2210) -> DOMString {
2211    // Let data be the data of the message.
2212    // Note: we are passed the data as argument,
2213    // which originates in the return value of `structuredclone::read`.
2214
2215    // Assert: data is an Object.
2216    assert!(data.is_object());
2217    rooted!(in(*cx) let data_object = data.to_object());
2218
2219    // Let type be ! Get(data, "type").
2220    rooted!(in(*cx) let mut type_ = UndefinedValue());
2221    unsafe {
2222        get_dictionary_property(
2223            *cx,
2224            data_object.handle(),
2225            "type",
2226            type_.handle_mut(),
2227            can_gc,
2228        )
2229    }
2230    .expect("Getting the type should not fail.");
2231
2232    // Let value be ! Get(data, "value").
2233    unsafe { get_dictionary_property(*cx, data_object.handle(), "value", value, can_gc) }
2234        .expect("Getting the value should not fail.");
2235
2236    // Assert: type is a String.
2237    let result =
2238        DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2239            .expect("The type of the message should be a string");
2240    let ConversionResult::Success(type_string) = result else {
2241        unreachable!("The type of the message should be a string");
2242    };
2243
2244    type_string
2245}
2246
2247impl js::gc::Rootable for CrossRealmTransformReadable {}
2248
2249/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2250/// A wrapper to handle `message` and `messageerror` events
2251/// for the port used by the transfered stream.
2252#[derive(Clone, JSTraceable, MallocSizeOf)]
2253#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
2254pub(crate) struct CrossRealmTransformReadable {
2255    /// The controller used in the algorithm.
2256    controller: Dom<ReadableStreamDefaultController>,
2257}
2258
2259impl CrossRealmTransformReadable {
2260    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2261    /// Add a handler for port’s message event with the following steps:
2262    #[expect(unsafe_code)]
2263    pub(crate) fn handle_message(
2264        &self,
2265        cx: SafeJSContext,
2266        global: &GlobalScope,
2267        port: &MessagePort,
2268        message: SafeHandleValue,
2269        _realm: InRealm,
2270        can_gc: CanGc,
2271    ) {
2272        rooted!(in(*cx) let mut value = UndefinedValue());
2273        let type_string =
2274            unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
2275
2276        // If type is "chunk",
2277        if type_string == "chunk" {
2278            // Perform ! ReadableStreamDefaultControllerEnqueue(controller, value).
2279            self.controller
2280                .enqueue(cx, value.handle(), can_gc)
2281                .expect("Enqueing a chunk should not fail.");
2282        }
2283
2284        // Otherwise, if type is "close",
2285        if type_string == "close" {
2286            // Perform ! ReadableStreamDefaultControllerClose(controller).
2287            self.controller.close(can_gc);
2288
2289            // Disentangle port.
2290            global.disentangle_port(port, can_gc);
2291        }
2292
2293        // Otherwise, if type is "error",
2294        if type_string == "error" {
2295            // Perform ! ReadableStreamDefaultControllerError(controller, value).
2296            self.controller.error(value.handle(), can_gc);
2297
2298            // Disentangle port.
2299            global.disentangle_port(port, can_gc);
2300        }
2301    }
2302
2303    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2304    /// Add a handler for port’s messageerror event with the following steps:
2305    pub(crate) fn handle_error(
2306        &self,
2307        cx: SafeJSContext,
2308        global: &GlobalScope,
2309        port: &MessagePort,
2310        _realm: InRealm,
2311        can_gc: CanGc,
2312    ) {
2313        // Let error be a new "DataCloneError" DOMException.
2314        let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
2315        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
2316        error.safe_to_jsval(cx, rooted_error.handle_mut(), can_gc);
2317
2318        // Perform ! CrossRealmTransformSendError(port, error).
2319        port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
2320
2321        // Perform ! ReadableStreamDefaultControllerError(controller, error).
2322        self.controller.error(rooted_error.handle(), can_gc);
2323
2324        // Disentangle port.
2325        global.disentangle_port(port, can_gc);
2326    }
2327}
2328
2329#[expect(unsafe_code)]
2330/// Get the `done` property of an object that a read promise resolved to.
2331pub(crate) fn get_read_promise_done(
2332    cx: SafeJSContext,
2333    v: &SafeHandleValue,
2334    can_gc: CanGc,
2335) -> Result<bool, Error> {
2336    if !v.is_object() {
2337        return Err(Error::Type("Unknown format for done property.".to_string()));
2338    }
2339    unsafe {
2340        rooted!(in(*cx) let object = v.to_object());
2341        rooted!(in(*cx) let mut done = UndefinedValue());
2342        match get_dictionary_property(*cx, object.handle(), "done", done.handle_mut(), can_gc) {
2343            Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2344                Ok(ConversionResult::Success(val)) => Ok(val),
2345                Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2346                _ => Err(Error::Type("Unknown format for done property.".to_string())),
2347            },
2348            Ok(false) => Err(Error::Type("Promise has no done property.".to_string())),
2349            Err(()) => Err(Error::JSFailed),
2350        }
2351    }
2352}
2353
2354#[expect(unsafe_code)]
2355/// Get the `value` property of an object that a read promise resolved to.
2356pub(crate) fn get_read_promise_bytes(
2357    cx: SafeJSContext,
2358    v: &SafeHandleValue,
2359    can_gc: CanGc,
2360) -> Result<Vec<u8>, Error> {
2361    if !v.is_object() {
2362        return Err(Error::Type(
2363            "Unknown format for for bytes read.".to_string(),
2364        ));
2365    }
2366    unsafe {
2367        rooted!(in(*cx) let object = v.to_object());
2368        rooted!(in(*cx) let mut bytes = UndefinedValue());
2369        match get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut(), can_gc) {
2370            Ok(true) => {
2371                match Vec::<u8>::safe_from_jsval(
2372                    cx,
2373                    bytes.handle(),
2374                    ConversionBehavior::EnforceRange,
2375                    can_gc,
2376                ) {
2377                    Ok(ConversionResult::Success(val)) => Ok(val),
2378                    Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2379                    _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
2380                }
2381            },
2382            Ok(false) => Err(Error::Type("Promise has no value property.".to_string())),
2383            Err(()) => Err(Error::JSFailed),
2384        }
2385    }
2386}
2387
2388/// Convert a raw stream `chunk` JS value to `Vec<u8>`.
2389/// This mirrors the conversion used inside `get_read_promise_bytes`,
2390/// but operates on the raw chunk (no `{ value, done }` wrapper).
2391pub(crate) fn bytes_from_chunk_jsval(
2392    cx: SafeJSContext,
2393    chunk: &RootedTraceableBox<Heap<JSVal>>,
2394    can_gc: CanGc,
2395) -> Result<Vec<u8>, Error> {
2396    match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2397        Ok(ConversionResult::Success(vec)) => Ok(vec),
2398        Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2399        _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
2400    }
2401}
2402
2403/// <https://streams.spec.whatwg.org/#rs-transfer>
2404impl Transferable for ReadableStream {
2405    type Index = MessagePortIndex;
2406    type Data = MessagePortImpl;
2407
2408    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps>
2409    fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
2410        // Step 1. If ! IsReadableStreamLocked(value) is true, throw a
2411        // "DataCloneError" DOMException.
2412        if self.is_locked() {
2413            return Err(Error::DataClone(None));
2414        }
2415
2416        let global = self.global();
2417        let realm = enter_realm(&*global);
2418        let comp = InRealm::Entered(&realm);
2419        let cx = GlobalScope::get_cx();
2420        let can_gc = CanGc::note();
2421
2422        // Step 2. Let port1 be a new MessagePort in the current Realm.
2423        let port_1 = MessagePort::new(&global, can_gc);
2424        global.track_message_port(&port_1, None);
2425
2426        // Step 3. Let port2 be a new MessagePort in the current Realm.
2427        let port_2 = MessagePort::new(&global, can_gc);
2428        global.track_message_port(&port_2, None);
2429
2430        // Step 4. Entangle port1 and port2.
2431        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2432
2433        // Step 5. Let writable be a new WritableStream in the current Realm.
2434        let writable = WritableStream::new_with_proto(&global, None, can_gc);
2435
2436        // Step 6. Perform ! SetUpCrossRealmTransformWritable(writable, port1).
2437        writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
2438
2439        // Step 7. Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
2440        let promise = self.pipe_to(
2441            cx, &global, &writable, false, false, false, None, comp, can_gc,
2442        );
2443
2444        // Step 8. Set promise.[[PromiseIsHandled]] to true.
2445        promise.set_promise_is_handled();
2446
2447        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
2448        port_2.transfer()
2449    }
2450
2451    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps>
2452    fn transfer_receive(
2453        owner: &GlobalScope,
2454        id: MessagePortId,
2455        port_impl: MessagePortImpl,
2456    ) -> Result<DomRoot<Self>, ()> {
2457        let cx = GlobalScope::get_cx();
2458        let can_gc = CanGc::note();
2459
2460        // Their transfer-receiving steps, given dataHolder and value, are:
2461        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
2462        let value = ReadableStream::new_with_proto(owner, None, can_gc);
2463
2464        // Step 1. Let deserializedRecord be !
2465        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
2466        // Realm).
2467        // Done with the `Deserialize` derive of `MessagePortImpl`.
2468
2469        // Step 2. Let port be deserializedRecord.[[Deserialized]].
2470        let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
2471
2472        // Step 3. Perform ! SetUpCrossRealmTransformReadable(value, port).
2473        value.setup_cross_realm_transform_readable(cx, &transferred_port, can_gc);
2474        Ok(value)
2475    }
2476
2477    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
2478    fn serialized_storage<'a>(
2479        data: StructuredData<'a, '_>,
2480    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2481        match data {
2482            StructuredData::Reader(r) => &mut r.port_impls,
2483            StructuredData::Writer(w) => &mut w.ports,
2484        }
2485    }
2486}