script/dom/stream/
readablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use base::generic_channel::GenericSharedMemory;
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19    MutableHandleValue as SafeMutableHandleValue,
20};
21use js::typedarray::ArrayBufferViewU8;
22use rustc_hash::FxHashMap;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
26use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
27    ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
28    ReadableWritablePair, StreamPipeOptions,
29};
30use script_bindings::str::DOMString;
31
32use crate::dom::domexception::{DOMErrorName, DOMException};
33use script_bindings::conversions::{is_array_like, StringificationBehavior};
34use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
35use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
38use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
39use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
40use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
41use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
42use crate::dom::stream::writablestream::WritableStream;
43use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
44use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
45use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
46use crate::dom::bindings::trace::RootedTraceableBox;
47use crate::dom::bindings::utils::get_dictionary_property;
48use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
49use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
50use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
51use crate::dom::globalscope::GlobalScope;
52use crate::dom::promise::{wait_for_all_promise, Promise};
53use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
54use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
55use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
56use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
57use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
58use crate::dom::types::DefaultTeeUnderlyingSource;
59use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
60use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
61use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
62use crate::dom::messageport::MessagePort;
63use crate::realms::{enter_realm, InRealm};
64use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
65use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
66use crate::dom::bindings::transferable::Transferable;
67use crate::dom::bindings::structuredclone::StructuredData;
68
69use crate::dom::bindings::buffer_source::HeapBufferSource;
70use super::readablestreambyobreader::ReadIntoRequest;
71
/// State machine for `PipeTo`.
///
/// The `Callback` implementation of `PipeTo` reads the current state every time
/// one of the promises it reacts to settles, and uses it to decide the next step.
#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
enum PipeToState {
    /// The starting state.
    /// Note: the callback treats being invoked in this state as unreachable.
    #[default]
    Starting,
    /// Waiting for the writer to be ready;
    /// once it is, a chunk is read from the source.
    PendingReady,
    /// Waiting for a read to resolve;
    /// once it has, the chunk is written to the destination.
    PendingRead,
    /// Waiting for all pending writes to finish,
    /// as part of shutting down with an optional action.
    /// The action(if any) is performed once no pending write remains.
    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
    /// When shutting down with an action,
    /// waiting for the action to complete,
    /// at which point we can `finalize`.
    ShuttingDownPendingAction,
    /// The pipe has been finalized,
    /// no further actions should be performed.
    Finalized,
}
93
/// The action to perform as part of
/// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
enum ShutdownAction {
    /// Abort the destination with the stored shutdown error.
    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
    WritableStreamAbort,
    /// Cancel the source with the stored shutdown error.
    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
    ReadableStreamCancel,
    /// Close the writer, propagating any error of the destination.
    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
    WritableStreamDefaultWriterCloseWithErrorPropagation,
    /// The `abortAlgorithm` of the signal associated with the pipe:
    /// aborts the destination(unless `preventAbort`)
    /// and cancels the source(unless `preventCancel`),
    /// using the abort reason of the signal.
    /// <https://streams.spec.whatwg.org/#ref-for-rs-pipeTo-shutdown-with-action>
    Abort,
}
106
/// Empty `js::gc::Rootable` implementation for `PipeTo`; no item is overridden.
// NOTE(review): presumably required so that a `PipeTo` can be rooted on the stack
// (e.g. with `rooted!`) — confirm against the call sites.
impl js::gc::Rootable for PipeTo {}
108
/// The "in parallel, but not really" part of
/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
///
/// Note: the spec is flexible about how this is done, but requires the following constraints to apply:
/// - Public API must not be used: we'll only use Rust.
/// - Backpressure must be enforced: we'll only read from source when dest is ready.
/// - Shutdown must stop activity: we'll do this together with the below.
/// - Error and close states must be propagated: we'll do this by checking these states at every step.
///
/// The struct is `Clone`: a fresh clone is boxed as the callback of every promise reaction
/// registered by the pipe, and the `Rc`-wrapped fields are what those clones have in common.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PipeTo {
    /// The reader through which chunks are read from the source.
    /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
    reader: Dom<ReadableStreamDefaultReader>,

    /// The writer through which chunks are written to the destination.
    /// <https://streams.spec.whatwg.org/#ref-for-acquire-writable-stream-default-writer>
    writer: Dom<WritableStreamDefaultWriter>,

    /// Pending writes are needed when shutting down (with an action),
    /// because we can only finalize when all writes are finished.
    /// Settled promises are pruned at the start of every callback.
    #[ignore_malloc_size_of = "nested Rc"]
    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,

    /// The state machine.
    #[conditional_malloc_size_of]
    #[no_trace]
    state: Rc<RefCell<PipeToState>>,

    /// When true, an errored source does not abort the destination.
    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventabort>
    prevent_abort: bool,

    /// When true, an errored or closing destination does not cancel the source.
    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventcancel>
    prevent_cancel: bool,

    /// When true, a closed source does not close the destination.
    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventclose>
    prevent_close: bool,

    /// The `shuttingDown` variable of
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    #[conditional_malloc_size_of]
    shutting_down: Rc<Cell<bool>>,

    /// The abort reason of the abort signal,
    /// stored here because we must keep it across a microtask.
    /// Written when the abort algorithm runs, and read when the `Abort` action is performed.
    #[ignore_malloc_size_of = "mozjs"]
    abort_reason: Rc<Heap<JSVal>>,

    /// The error potentially passed to shutdown,
    /// stored here because we must keep it across a microtask.
    #[ignore_malloc_size_of = "mozjs"]
    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,

    /// The promise returned by a shutdown action.
    /// We keep it to only continue when it is not pending anymore.
    #[ignore_malloc_size_of = "nested Rc"]
    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// The promise resolved or rejected at
    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
170
171impl PipeTo {
172    /// Run the `abortAlgorithm` defined at
173    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
174    pub(crate) fn abort_with_reason(
175        &self,
176        cx: SafeJSContext,
177        global: &GlobalScope,
178        reason: SafeHandleValue,
179        realm: InRealm,
180        can_gc: CanGc,
181    ) {
182        // Abort should do nothing if we are already shutting down.
183        if self.shutting_down.get() {
184            return;
185        }
186
187        // Let error be signal’s abort reason.
188        // Note: storing it because it may need to be kept across a microtask,
189        // and see the note below as to why it is kept separately from `shutdown_error`.
190        self.abort_reason.set(reason.get());
191
192        // Note: setting the error now,
193        // will result in a rejection of the pipe promise, with this error.
194        // Unless any shutdown action raise their own error,
195        // in which case this error will be overwritten by the shutdown action error.
196        self.set_shutdown_error(reason);
197
198        // Let actions be an empty ordered set.
199        // Note: the actions are defined, and performed, inside `shutdown_with_an_action`.
200
201        // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions,
202        // and with error.
203        self.shutdown(cx, global, Some(ShutdownAction::Abort), realm, can_gc);
204    }
205}
206
207impl Callback for PipeTo {
208    /// The pipe makes progress one microtask at a time.
209    /// Note: we use one struct as the callback for all promises,
210    /// and for both of their reactions.
211    ///
212    /// The context of the callback is determined from:
213    /// - the current state.
214    /// - the type of `result`.
215    /// - the state of a stored promise(in some cases).
216    #[expect(unsafe_code)]
217    fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
218        let can_gc = CanGc::from_cx(cx);
219        let in_realm_proof = cx.into();
220        let realm = InRealm::Already(&in_realm_proof);
221        let cx = cx.into();
222        let global = self.reader.global();
223
224        // Note: we only care about the result of writes when they are rejected,
225        // and the error is accessed not through handlers,
226        // but directly using `dest.get_stored_error`.
227        // So we must mark rejected promises as handled
228        // to prevent unhandled rejection errors.
229        self.pending_writes.borrow_mut().retain(|p| {
230            let pending = p.is_pending();
231            if !pending {
232                p.set_promise_is_handled();
233            }
234            pending
235        });
236
237        // Note: cloning to prevent re-borrow in methods called below.
238        let state_before_checks = self.state.borrow().clone();
239
240        // Note: if we are in a `PendingRead` state,
241        // and the source is closed,
242        // we try to write chunks before doing any shutdown,
243        // which is necessary to implement the
244        // "If any chunks have been read but not yet written, write them to dest."
245        // part of shutdown.
246        if state_before_checks == PipeToState::PendingRead {
247            let source = self.reader.get_stream().expect("Source stream must be set");
248            if source.is_closed() {
249                let dest = self
250                    .writer
251                    .get_stream()
252                    .expect("Destination stream must be set");
253
254                // If dest.[[state]] is "writable",
255                // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
256                if dest.is_writable() && !dest.close_queued_or_in_flight() {
257                    let Ok(done) = get_read_promise_done(cx, &result, can_gc) else {
258                        // This is the case that the microtask ran in reaction
259                        // to the closed promise of the reader,
260                        // so we should wait for subsequent chunks,
261                        // and skip the shutdown below
262                        // (reader is closed, but there are still pending reads).
263                        // Shutdown will happen when the last chunk has been received.
264                        return;
265                    };
266
267                    if !done {
268                        // If any chunks have been read but not yet written, write them to dest.
269                        self.write_chunk(cx, &global, result, can_gc);
270                    }
271                }
272            }
273        }
274
275        self.check_and_propagate_errors_forward(cx, &global, realm, can_gc);
276        self.check_and_propagate_errors_backward(cx, &global, realm, can_gc);
277        self.check_and_propagate_closing_forward(cx, &global, realm, can_gc);
278        self.check_and_propagate_closing_backward(cx, &global, realm, can_gc);
279
280        // Note: cloning to prevent re-borrow in methods called below.
281        let state = self.state.borrow().clone();
282
283        // If we switched to a shutdown state,
284        // return.
285        // Progress will be made at the next tick.
286        if state != state_before_checks {
287            return;
288        }
289
290        match state {
291            PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
292            PipeToState::PendingReady => {
293                // Read a chunk.
294                self.read_chunk(&global, realm, can_gc);
295            },
296            PipeToState::PendingRead => {
297                // Write the chunk.
298                self.write_chunk(cx, &global, result, can_gc);
299
300                // An early return is necessary if the write algorithm aborted the pipe.
301                if self.shutting_down.get() {
302                    return;
303                }
304
305                // Wait for the writer to be ready again.
306                self.wait_for_writer_ready(&global, realm, can_gc);
307            },
308            PipeToState::ShuttingDownWithPendingWrites(action) => {
309                // Wait until every chunk that has been read has been written
310                // (i.e. the corresponding promises have settled).
311                if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
312                    self.wait_on_pending_write(&global, write, realm, can_gc);
313                    return;
314                }
315
316                // Note: error is stored in `self.shutdown_error`.
317                if let Some(action) = action {
318                    // Let p be the result of performing action.
319                    self.perform_action(cx, &global, action, realm, can_gc);
320                } else {
321                    // Finalize, passing along error if it was given.
322                    self.finalize(cx, &global, can_gc);
323                }
324            },
325            PipeToState::ShuttingDownPendingAction => {
326                let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
327                    unreachable!();
328                };
329                if promise.is_pending() {
330                    // While waiting for the action to complete,
331                    // we may get callbacks for other promises(closed, ready),
332                    // and we should ignore those.
333                    return;
334                }
335
336                let is_array_like = {
337                    if !result.is_object() {
338                        false
339                    } else {
340                        unsafe { is_array_like::<crate::DomTypeHolder>(*cx, result) }
341                    }
342                };
343
344                // Finalize, passing along error if it was given.
345                if !result.is_undefined() && !is_array_like {
346                    // Most actions either resolve with undefined,
347                    // or reject with an error,
348                    // and the error should be used when finalizing.
349                    // One exception is the `Abort` action,
350                    // which resolves with a list of undefined values.
351
352                    // If `result` isn't undefined or array-like,
353                    // then it is an error
354                    // and should overwrite the current shutdown error.
355                    self.set_shutdown_error(result);
356                }
357                self.finalize(cx, &global, can_gc);
358            },
359            PipeToState::Finalized => {},
360        }
361    }
362}
363
364impl PipeTo {
365    /// Setting shutdown error in a way that ensures it isn't
366    /// moved after it has been set.
367    fn set_shutdown_error(&self, error: SafeHandleValue) {
368        *self.shutdown_error.borrow_mut() = Some(Heap::default());
369        let Some(ref heap) = *self.shutdown_error.borrow() else {
370            unreachable!("Option set to Some(heap) above.");
371        };
372        heap.set(error.get())
373    }
374
375    /// Wait for the writer to be ready,
376    /// which implements the constraint that backpressure must be enforced.
377    fn wait_for_writer_ready(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
378        {
379            let mut state = self.state.borrow_mut();
380            *state = PipeToState::PendingReady;
381        }
382
383        let ready_promise = self.writer.Ready();
384        if ready_promise.is_fulfilled() {
385            self.read_chunk(global, realm, can_gc);
386        } else {
387            let handler = PromiseNativeHandler::new(
388                global,
389                Some(Box::new(self.clone())),
390                Some(Box::new(self.clone())),
391                can_gc,
392            );
393            ready_promise.append_native_handler(&handler, realm, can_gc);
394
395            // Note: if the writer is not ready,
396            // in order to ensure progress we must
397            // also react to the closure of the source(because source may close empty).
398            let closed_promise = self.reader.Closed();
399            closed_promise.append_native_handler(&handler, realm, can_gc);
400        }
401    }
402
403    /// Read a chunk
404    fn read_chunk(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
405        *self.state.borrow_mut() = PipeToState::PendingRead;
406        let chunk_promise = self.reader.Read(can_gc);
407        let handler = PromiseNativeHandler::new(
408            global,
409            Some(Box::new(self.clone())),
410            Some(Box::new(self.clone())),
411            can_gc,
412        );
413        chunk_promise.append_native_handler(&handler, realm, can_gc);
414
415        // Note: in order to ensure progress we must
416        // also react to the closure of the destination.
417        let ready_promise = self.writer.Closed();
418        ready_promise.append_native_handler(&handler, realm, can_gc);
419    }
420
421    /// Try to write a chunk using the jsval, and returns wether it succeeded
422    // It will fail if it is the last `done` chunk, or if it is not a chunk at all.
423    #[expect(unsafe_code)]
424    fn write_chunk(
425        &self,
426        cx: SafeJSContext,
427        global: &GlobalScope,
428        chunk: SafeHandleValue,
429        can_gc: CanGc,
430    ) -> bool {
431        if chunk.is_object() {
432            rooted!(in(*cx) let object = chunk.to_object());
433            rooted!(in(*cx) let mut bytes = UndefinedValue());
434            let has_value = unsafe {
435                get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc)
436                    .expect("Chunk should have a value.")
437            };
438            if has_value {
439                // Write the chunk.
440                let write_promise = self.writer.write(cx, global, bytes.handle(), can_gc);
441                self.pending_writes.borrow_mut().push_back(write_promise);
442                return true;
443            }
444        }
445        false
446    }
447
448    /// Only as part of shutting-down do we wait on pending writes
449    /// (backpressure is communicated not through pending writes
450    /// but through the readiness of the writer).
451    fn wait_on_pending_write(
452        &self,
453        global: &GlobalScope,
454        promise: Rc<Promise>,
455        realm: InRealm,
456        can_gc: CanGc,
457    ) {
458        let handler = PromiseNativeHandler::new(
459            global,
460            Some(Box::new(self.clone())),
461            Some(Box::new(self.clone())),
462            can_gc,
463        );
464        promise.append_native_handler(&handler, realm, can_gc);
465    }
466
467    /// Errors must be propagated forward part of
468    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
469    fn check_and_propagate_errors_forward(
470        &self,
471        cx: SafeJSContext,
472        global: &GlobalScope,
473        realm: InRealm,
474        can_gc: CanGc,
475    ) {
476        // An early return is necessary if we are shutting down,
477        // because in that case the source can already have been set to none.
478        if self.shutting_down.get() {
479            return;
480        }
481
482        // if source.[[state]] is or becomes "errored", then
483        let source = self
484            .reader
485            .get_stream()
486            .expect("Reader should still have a stream");
487        if source.is_errored() {
488            rooted!(in(*cx) let mut source_error = UndefinedValue());
489            source.get_stored_error(source_error.handle_mut());
490            self.set_shutdown_error(source_error.handle());
491
492            // If preventAbort is false,
493            if !self.prevent_abort {
494                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
495                // and with source.[[storedError]].
496                self.shutdown(
497                    cx,
498                    global,
499                    Some(ShutdownAction::WritableStreamAbort),
500                    realm,
501                    can_gc,
502                )
503            } else {
504                // Otherwise, shutdown with source.[[storedError]].
505                self.shutdown(cx, global, None, realm, can_gc);
506            }
507        }
508    }
509
510    /// Errors must be propagated backward part of
511    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
512    fn check_and_propagate_errors_backward(
513        &self,
514        cx: SafeJSContext,
515        global: &GlobalScope,
516        realm: InRealm,
517        can_gc: CanGc,
518    ) {
519        // An early return is necessary if we are shutting down,
520        // because in that case the destination can already have been set to none.
521        if self.shutting_down.get() {
522            return;
523        }
524
525        // if dest.[[state]] is or becomes "errored", then
526        let dest = self
527            .writer
528            .get_stream()
529            .expect("Writer should still have a stream");
530        if dest.is_errored() {
531            rooted!(in(*cx) let mut dest_error = UndefinedValue());
532            dest.get_stored_error(dest_error.handle_mut());
533            self.set_shutdown_error(dest_error.handle());
534
535            // If preventCancel is false,
536            if !self.prevent_cancel {
537                // shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]])
538                // and with dest.[[storedError]].
539                self.shutdown(
540                    cx,
541                    global,
542                    Some(ShutdownAction::ReadableStreamCancel),
543                    realm,
544                    can_gc,
545                )
546            } else {
547                // Otherwise, shutdown with dest.[[storedError]].
548                self.shutdown(cx, global, None, realm, can_gc);
549            }
550        }
551    }
552
553    /// Closing must be propagated forward part of
554    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
555    fn check_and_propagate_closing_forward(
556        &self,
557        cx: SafeJSContext,
558        global: &GlobalScope,
559        realm: InRealm,
560        can_gc: CanGc,
561    ) {
562        // An early return is necessary if we are shutting down,
563        // because in that case the source can already have been set to none.
564        if self.shutting_down.get() {
565            return;
566        }
567
568        // if source.[[state]] is or becomes "closed", then
569        let source = self
570            .reader
571            .get_stream()
572            .expect("Reader should still have a stream");
573        if source.is_closed() {
574            // If preventClose is false,
575            if !self.prevent_close {
576                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
577                // and with source.[[storedError]].
578                self.shutdown(
579                    cx,
580                    global,
581                    Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
582                    realm,
583                    can_gc,
584                )
585            } else {
586                // Otherwise, shutdown.
587                self.shutdown(cx, global, None, realm, can_gc);
588            }
589        }
590    }
591
592    /// Closing must be propagated backward part of
593    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
594    fn check_and_propagate_closing_backward(
595        &self,
596        cx: SafeJSContext,
597        global: &GlobalScope,
598        realm: InRealm,
599        can_gc: CanGc,
600    ) {
601        // An early return is necessary if we are shutting down,
602        // because in that case the destination can already have been set to none.
603        if self.shutting_down.get() {
604            return;
605        }
606
607        // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
608        // or dest.[[state]] is "closed"
609        let dest = self
610            .writer
611            .get_stream()
612            .expect("Writer should still have a stream");
613        if dest.close_queued_or_in_flight() || dest.is_closed() {
614            // Assert: no chunks have been read or written.
615            // Note: unclear how to perform this assertion.
616
617            // Let destClosed be a new TypeError.
618            rooted!(in(*cx) let mut dest_closed = UndefinedValue());
619            let error =
620                Error::Type("Destination is closed or has closed queued or in flight".to_string());
621            error.to_jsval(cx, global, dest_closed.handle_mut(), can_gc);
622            self.set_shutdown_error(dest_closed.handle());
623
624            // If preventCancel is false,
625            if !self.prevent_cancel {
626                // shutdown with an action of ! ReadableStreamCancel(source, destClosed)
627                // and with destClosed.
628                self.shutdown(
629                    cx,
630                    global,
631                    Some(ShutdownAction::ReadableStreamCancel),
632                    realm,
633                    can_gc,
634                )
635            } else {
636                // Otherwise, shutdown with destClosed.
637                self.shutdown(cx, global, None, realm, can_gc);
638            }
639        }
640    }
641
642    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
643    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown>
644    /// Combined into one method with an optional action.
645    fn shutdown(
646        &self,
647        cx: SafeJSContext,
648        global: &GlobalScope,
649        action: Option<ShutdownAction>,
650        realm: InRealm,
651        can_gc: CanGc,
652    ) {
653        // If shuttingDown is true, abort these substeps.
654        // Set shuttingDown to true.
655        if !self.shutting_down.replace(true) {
656            let dest = self.writer.get_stream().expect("Stream must be set");
657            // If dest.[[state]] is "writable",
658            // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
659            if dest.is_writable() && !dest.close_queued_or_in_flight() {
660                // If any chunks have been read but not yet written, write them to dest.
661                // Done at the top of `Callback`.
662
663                // Wait until every chunk that has been read has been written
664                // (i.e. the corresponding promises have settled).
665                if let Some(write) = self.pending_writes.borrow_mut().front() {
666                    *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
667                    self.wait_on_pending_write(global, write.clone(), realm, can_gc);
668                    return;
669                }
670            }
671
672            // Note: error is stored in `self.shutdown_error`.
673            if let Some(action) = action {
674                // Let p be the result of performing action.
675                self.perform_action(cx, global, action, realm, can_gc);
676            } else {
677                // Finalize, passing along error if it was given.
678                self.finalize(cx, global, can_gc);
679            }
680        }
681    }
682
683    /// The perform action part of
684    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
685    fn perform_action(
686        &self,
687        cx: SafeJSContext,
688        global: &GlobalScope,
689        action: ShutdownAction,
690        realm: InRealm,
691        can_gc: CanGc,
692    ) {
693        rooted!(in(*cx) let mut error = UndefinedValue());
694        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
695            error.set(shutdown_error.get());
696        }
697
698        *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
699
700        // Let p be the result of performing action.
701        let promise = match action {
702            ShutdownAction::WritableStreamAbort => {
703                let dest = self.writer.get_stream().expect("Stream must be set");
704                dest.abort(cx, global, error.handle(), realm, can_gc)
705            },
706            ShutdownAction::ReadableStreamCancel => {
707                let source = self
708                    .reader
709                    .get_stream()
710                    .expect("Reader should have a stream.");
711                source.cancel(cx, global, error.handle(), can_gc)
712            },
713            ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
714                self.writer.close_with_error_propagation(cx, global, can_gc)
715            },
716            ShutdownAction::Abort => {
717                // Note: implementation of the `abortAlgorithm`
718                // of the signal associated with this piping operation.
719
720                // Let error be signal’s abort reason.
721                rooted!(in(*cx) let mut error = UndefinedValue());
722                error.set(self.abort_reason.get());
723
724                // Let actions be an empty ordered set.
725                let mut actions = vec![];
726
727                // If preventAbort is false, append the following action to actions:
728                if !self.prevent_abort {
729                    let dest = self
730                        .writer
731                        .get_stream()
732                        .expect("Destination stream must be set");
733
734                    // If dest.[[state]] is "writable",
735                    let promise = if dest.is_writable() {
736                        // return ! WritableStreamAbort(dest, error)
737                        dest.abort(cx, global, error.handle(), realm, can_gc)
738                    } else {
739                        // Otherwise, return a promise resolved with undefined.
740                        Promise::new_resolved(global, cx, (), can_gc)
741                    };
742                    actions.push(promise);
743                }
744
745                // If preventCancel is false, append the following action action to actions:
746                if !self.prevent_cancel {
747                    let source = self.reader.get_stream().expect("Source stream must be set");
748
749                    // If source.[[state]] is "readable",
750                    let promise = if source.is_readable() {
751                        // return ! ReadableStreamCancel(source, error).
752                        source.cancel(cx, global, error.handle(), can_gc)
753                    } else {
754                        // Otherwise, return a promise resolved with undefined.
755                        Promise::new_resolved(global, cx, (), can_gc)
756                    };
757                    actions.push(promise);
758                }
759
760                // Shutdown with an action consisting
761                // of getting a promise to wait for all of the actions in actions,
762                // and with error.
763                wait_for_all_promise(cx, global, actions, realm, can_gc)
764            },
765        };
766
767        // Upon fulfillment of p, finalize, passing along originalError if it was given.
768        // Upon rejection of p with reason newError, finalize with newError.
769        let handler = PromiseNativeHandler::new(
770            global,
771            Some(Box::new(self.clone())),
772            Some(Box::new(self.clone())),
773            can_gc,
774        );
775        promise.append_native_handler(&handler, realm, can_gc);
776        *self.shutdown_action_promise.borrow_mut() = Some(promise);
777    }
778
779    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
780    fn finalize(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
781        *self.state.borrow_mut() = PipeToState::Finalized;
782
783        // Perform ! WritableStreamDefaultWriterRelease(writer).
784        self.writer.release(cx, global, can_gc);
785
786        // If reader implements ReadableStreamBYOBReader,
787        // perform ! ReadableStreamBYOBReaderRelease(reader).
788        // TODO.
789
790        // Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
791        self.reader
792            .release(can_gc)
793            .expect("Releasing the reader should not fail");
794
795        // If signal is not undefined, remove abortAlgorithm from signal.
796        // Note: since `self.shutdown` is true at this point,
797        // the abort algorithm is a no-op,
798        // so for now not implementing this step.
799
800        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
801            rooted!(in(*cx) let mut error = UndefinedValue());
802            error.set(shutdown_error.get());
803            // If error was given, reject promise with error.
804            self.result_promise.reject_native(&error.handle(), can_gc);
805        } else {
806            // Otherwise, resolve promise with undefined.
807            self.result_promise.resolve_native(&(), can_gc);
808        }
809    }
810}
811
/// The fulfillment handler for the reacting to sourceCancelPromise part of
/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
///
/// Its `Callback` implementation ignores the fulfillment value and resolves
/// `result` with `undefined`.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseFulfillmentHandler {
    /// The promise resolved when the handler's callback runs.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}
819
820impl Callback for SourceCancelPromiseFulfillmentHandler {
821    /// The fulfillment handler for the reacting to sourceCancelPromise part of
822    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
823    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
824    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
825        let can_gc = CanGc::from_cx(cx);
826        self.result.resolve_native(&(), can_gc);
827    }
828}
829
/// The rejection handler for the reacting to sourceCancelPromise part of
/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
///
/// Its `Callback` implementation rejects `result` with the value it receives.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseRejectionHandler {
    /// The promise rejected when the handler's callback runs.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}
837
838impl Callback for SourceCancelPromiseRejectionHandler {
839    /// The rejection handler for the reacting to sourceCancelPromise part of
840    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
841    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
842    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
843        let can_gc = CanGc::from_cx(cx);
844        self.result.reject_native(&v, can_gc);
845    }
846}
847
/// <https://streams.spec.whatwg.org/#readablestream-state>
///
/// The value held in the `state` slot of a `ReadableStream`.
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReadableStreamState {
    /// The "readable" state; this is the default value.
    #[default]
    Readable,
    /// The "closed" state.
    Closed,
    /// The "errored" state, set by `ReadableStream::error`.
    Errored,
}
856
/// <https://streams.spec.whatwg.org/#readablestream-controller>
///
/// The controller stored on a `ReadableStream`: either a byte stream
/// controller or a default controller, each held in a nullable DOM slot.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ControllerType {
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
    Byte(MutNullableDom<ReadableByteStreamController>),
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
    Default(MutNullableDom<ReadableStreamDefaultController>),
}
866
/// <https://streams.spec.whatwg.org/#readablestream-reader>
///
/// The reader stored on a `ReadableStream`: either a BYOB reader or a
/// default reader, each held in a nullable DOM slot.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ReaderType {
    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
    #[allow(clippy::upper_case_acronyms)]
    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
    Default(MutNullableDom<ReadableStreamDefaultReader>),
}
877
878impl Eq for ReaderType {}
879impl PartialEq for ReaderType {
880    fn eq(&self, other: &Self) -> bool {
881        matches!(
882            (self, other),
883            (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
884                (ReaderType::Default(_), ReaderType::Default(_))
885        )
886    }
887}
888
889/// <https://streams.spec.whatwg.org/#create-readable-stream>
890#[cfg_attr(crown, expect(crown::unrooted_must_root))]
891pub(crate) fn create_readable_stream(
892    global: &GlobalScope,
893    underlying_source_type: UnderlyingSourceType,
894    queuing_strategy: Option<Rc<QueuingStrategySize>>,
895    high_water_mark: Option<f64>,
896    can_gc: CanGc,
897) -> DomRoot<ReadableStream> {
898    // If highWaterMark was not passed, set it to 1.
899    let high_water_mark = high_water_mark.unwrap_or(1.0);
900
901    // If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
902    let size_algorithm =
903        queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
904
905    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
906    assert!(high_water_mark >= 0.0);
907
908    // Let stream be a new ReadableStream.
909    // Perform ! InitializeReadableStream(stream).
910    let stream = ReadableStream::new_with_proto(global, None, can_gc);
911
912    // Let controller be a new ReadableStreamDefaultController.
913    let controller = ReadableStreamDefaultController::new(
914        global,
915        underlying_source_type,
916        high_water_mark,
917        size_algorithm,
918        can_gc,
919    );
920
921    // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm,
922    // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
923    controller
924        .setup(stream.clone(), can_gc)
925        .expect("Setup of default controller cannot fail");
926
927    // Return stream.
928    stream
929}
930
931/// <https://streams.spec.whatwg.org/#abstract-opdef-createreadablebytestream>
932#[cfg_attr(crown, expect(crown::unrooted_must_root))]
933pub(crate) fn readable_byte_stream_tee(
934    global: &GlobalScope,
935    underlying_source_type: UnderlyingSourceType,
936    can_gc: CanGc,
937) -> DomRoot<ReadableStream> {
938    // Let stream be a new ReadableStream.
939    // Perform ! InitializeReadableStream(stream).
940    let tee_stream = ReadableStream::new_with_proto(global, None, can_gc);
941
942    // Let controller be a new ReadableByteStreamController.
943    let controller = ReadableByteStreamController::new(underlying_source_type, 0.0, global, can_gc);
944
945    // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
946    controller
947        .setup(global, tee_stream.clone(), can_gc)
948        .expect("Setup of byte stream controller cannot fail");
949
950    // Return stream.
951    tee_stream
952}
953
/// <https://streams.spec.whatwg.org/#rs-class>
///
/// DOM object for a readable stream. It holds the stream's controller,
/// reader, state, stored error and "disturbed" flag.
#[dom_struct]
pub(crate) struct ReadableStream {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#readablestream-controller>
    /// `None` until one of the `set_*_controller` methods is called.
    /// Note: the inner `MutNullableDom` should really be an `Option<Dom>`,
    /// because it is never unset once set.
    controller: RefCell<Option<ControllerType>>,

    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
    /// Written by `error`, read back through `get_stored_error`.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
    disturbed: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#readablestream-reader>
    /// `None` means the stream has no reader.
    reader: RefCell<Option<ReaderType>>,

    /// <https://streams.spec.whatwg.org/#readablestream-state>
    state: Cell<ReadableStreamState>,
}
977
978impl ReadableStream {
979    /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
980    fn new_inherited() -> ReadableStream {
981        ReadableStream {
982            reflector_: Reflector::new(),
983            controller: RefCell::new(None),
984            stored_error: Heap::default(),
985            disturbed: Default::default(),
986            reader: RefCell::new(None),
987            state: Cell::new(Default::default()),
988        }
989    }
990
991    pub(crate) fn new_with_proto(
992        global: &GlobalScope,
993        proto: Option<SafeHandleObject>,
994        can_gc: CanGc,
995    ) -> DomRoot<ReadableStream> {
996        reflect_dom_object_with_proto(
997            Box::new(ReadableStream::new_inherited()),
998            global,
999            proto,
1000            can_gc,
1001        )
1002    }
1003
1004    /// Used as part of
1005    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
1006    pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
1007        *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
1008            controller,
1009        ))));
1010    }
1011
1012    /// Used as part of
1013    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1014    pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
1015        *self.controller.borrow_mut() =
1016            Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
1017    }
1018
1019    /// Used as part of
1020    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
1021    pub(crate) fn assert_no_controller(&self) {
1022        let has_no_controller = self.controller.borrow().is_none();
1023        assert!(has_no_controller);
1024    }
1025
1026    /// Build a stream backed by a Rust source that has already been read into memory.
1027    pub(crate) fn new_from_bytes(
1028        global: &GlobalScope,
1029        bytes: Vec<u8>,
1030        can_gc: CanGc,
1031    ) -> Fallible<DomRoot<ReadableStream>> {
1032        let stream = ReadableStream::new_with_external_underlying_source(
1033            global,
1034            UnderlyingSourceType::Memory(bytes.len()),
1035            can_gc,
1036        )?;
1037        stream.enqueue_native(bytes, can_gc);
1038        stream.controller_close_native(can_gc);
1039        Ok(stream)
1040    }
1041
1042    /// Build a stream backed by a Rust underlying source.
1043    /// Note: external sources are always paired with a default controller.
1044    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1045    pub(crate) fn new_with_external_underlying_source(
1046        global: &GlobalScope,
1047        source: UnderlyingSourceType,
1048        can_gc: CanGc,
1049    ) -> Fallible<DomRoot<ReadableStream>> {
1050        assert!(source.is_native());
1051        let stream = ReadableStream::new_with_proto(global, None, can_gc);
1052        let controller = ReadableStreamDefaultController::new(
1053            global,
1054            source,
1055            1.0,
1056            extract_size_algorithm(&QueuingStrategy::empty(), can_gc),
1057            can_gc,
1058        );
1059        controller.setup(stream.clone(), can_gc)?;
1060        Ok(stream)
1061    }
1062
1063    /// Call into the release steps of the controller,
1064    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1065        match self.controller.borrow().as_ref() {
1066            Some(ControllerType::Default(controller)) => {
1067                let controller = controller
1068                    .get()
1069                    .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1070                controller.perform_release_steps()
1071            },
1072            Some(ControllerType::Byte(controller)) => {
1073                let controller = controller
1074                    .get()
1075                    .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1076                controller.perform_release_steps()
1077            },
1078            None => Err(Error::Type("Stream should have controller.".to_string())),
1079        }
1080    }
1081
1082    /// Call into the pull steps of the controller,
1083    /// as part of
1084    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1085    pub(crate) fn perform_pull_steps(
1086        &self,
1087        cx: SafeJSContext,
1088        read_request: &ReadRequest,
1089        can_gc: CanGc,
1090    ) {
1091        match self.controller.borrow().as_ref() {
1092            Some(ControllerType::Default(controller)) => controller
1093                .get()
1094                .expect("Stream should have controller.")
1095                .perform_pull_steps(read_request, can_gc),
1096            Some(ControllerType::Byte(controller)) => controller
1097                .get()
1098                .expect("Stream should have controller.")
1099                .perform_pull_steps(cx, read_request, can_gc),
1100            None => {
1101                unreachable!("Stream does not have a controller.");
1102            },
1103        }
1104    }
1105
1106    /// Call into the pull steps of the controller,
1107    /// as part of
1108    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
1109    pub(crate) fn perform_pull_into(
1110        &self,
1111        cx: SafeJSContext,
1112        read_into_request: &ReadIntoRequest,
1113        view: HeapBufferSource<ArrayBufferViewU8>,
1114        min: u64,
1115        can_gc: CanGc,
1116    ) {
1117        match self.controller.borrow().as_ref() {
1118            Some(ControllerType::Byte(controller)) => controller
1119                .get()
1120                .expect("Stream should have controller.")
1121                .perform_pull_into(cx, read_into_request, view, min, can_gc),
1122            _ => {
1123                unreachable!(
1124                    "Pulling a chunk from a stream with a default controller using a BYOB reader"
1125                )
1126            },
1127        }
1128    }
1129
1130    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
1131    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1132        match self.reader.borrow().as_ref() {
1133            Some(ReaderType::Default(reader)) => {
1134                let Some(reader) = reader.get() else {
1135                    panic!("Attempt to add a read request without having first acquired a reader.");
1136                };
1137
1138                // Assert: stream.[[state]] is "readable".
1139                assert!(self.is_readable());
1140
1141                // Append readRequest to stream.[[reader]].[[readRequests]].
1142                reader.add_read_request(read_request);
1143            },
1144            _ => {
1145                unreachable!("Adding a read request can only be done on a default reader.")
1146            },
1147        }
1148    }
1149
1150    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
1151    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1152        match self.reader.borrow().as_ref() {
1153            // Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
1154            Some(ReaderType::BYOB(reader)) => {
1155                let Some(reader) = reader.get() else {
1156                    unreachable!(
1157                        "Attempt to add a read into request without having first acquired a reader."
1158                    );
1159                };
1160
1161                // Assert: stream.[[state]] is "readable" or "closed".
1162                assert!(self.is_readable() || self.is_closed());
1163
1164                // Append readRequest to stream.[[reader]].[[readIntoRequests]].
1165                reader.add_read_into_request(read_request);
1166            },
1167            _ => {
1168                unreachable!("Adding a read into request can only be done on a BYOB reader.")
1169            },
1170        }
1171    }
1172
1173    /// Endpoint to enqueue chunks directly from Rust.
1174    /// Note: in other use cases this call happens via the controller.
1175    pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1176        match self.controller.borrow().as_ref() {
1177            Some(ControllerType::Default(controller)) => controller
1178                .get()
1179                .expect("Stream should have controller.")
1180                .enqueue_native(bytes, can_gc),
1181            _ => {
1182                unreachable!(
1183                    "Enqueueing chunk to a stream from Rust on other than default controller"
1184                );
1185            },
1186        }
1187    }
1188
1189    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1190    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1191        // Assert: stream.[[state]] is "readable".
1192        assert!(self.is_readable());
1193
1194        // Set stream.[[state]] to "errored".
1195        self.state.set(ReadableStreamState::Errored);
1196
1197        // Set stream.[[storedError]] to e.
1198        self.stored_error.set(e.get());
1199
1200        // Let reader be stream.[[reader]].
1201
1202        let default_reader = {
1203            let reader_ref = self.reader.borrow();
1204            match reader_ref.as_ref() {
1205                Some(ReaderType::Default(reader)) => reader.get(),
1206                _ => None,
1207            }
1208        };
1209
1210        if let Some(reader) = default_reader {
1211            // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
1212            reader.error(e, can_gc);
1213            return;
1214        }
1215
1216        let byob_reader = {
1217            let reader_ref = self.reader.borrow();
1218            match reader_ref.as_ref() {
1219                Some(ReaderType::BYOB(reader)) => reader.get(),
1220                _ => None,
1221            }
1222        };
1223
1224        if let Some(reader) = byob_reader {
1225            // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
1226            reader.error_read_into_requests(e, can_gc);
1227        }
1228
1229        // If reader is undefined, return.
1230    }
1231
1232    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
1233    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1234        handle_mut.set(self.stored_error.get());
1235    }
1236
1237    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1238    /// Note: in other use cases this call happens via the controller.
1239    pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1240        let cx = GlobalScope::get_cx();
1241        rooted!(in(*cx) let mut error_val = UndefinedValue());
1242        error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1243        self.error(error_val.handle(), can_gc);
1244    }
1245
1246    /// Call into the controller's `Close` method.
1247    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
1248    pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1249        match self.controller.borrow().as_ref() {
1250            Some(ControllerType::Default(controller)) => {
1251                let _ = controller
1252                    .get()
1253                    .expect("Stream should have controller.")
1254                    .Close(can_gc);
1255            },
1256            _ => {
1257                unreachable!("Native closing is only done on default controllers.")
1258            },
1259        }
1260    }
1261
1262    /// Returns a boolean reflecting whether the stream has all data in memory.
1263    /// Useful for native source integration only.
1264    pub(crate) fn in_memory(&self) -> bool {
1265        match self.controller.borrow().as_ref() {
1266            Some(ControllerType::Default(controller)) => controller
1267                .get()
1268                .expect("Stream should have controller.")
1269                .in_memory(),
1270            _ => {
1271                unreachable!(
1272                    "Checking if source is in memory for a stream with a non-default controller"
1273                )
1274            },
1275        }
1276    }
1277
1278    /// Return bytes for synchronous use, if the stream has all data in memory.
1279    /// Useful for native source integration only.
1280    pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1281        match self.controller.borrow().as_ref() {
1282            Some(ControllerType::Default(controller)) => controller
1283                .get()
1284                .expect("Stream should have controller.")
1285                .get_in_memory_bytes()
1286                .as_deref()
1287                .map(GenericSharedMemory::from_bytes),
1288            _ => {
1289                unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1290            },
1291        }
1292    }
1293
1294    /// Acquires a reader and locks the stream,
1295    /// must be done before `read_a_chunk`.
1296    /// Native call to
1297    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-reader>
1298    pub(crate) fn acquire_default_reader(
1299        &self,
1300        can_gc: CanGc,
1301    ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1302        // Let reader be a new ReadableStreamDefaultReader.
1303        let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1304
1305        // Perform ? SetUpReadableStreamDefaultReader(reader, stream).
1306        reader.set_up(self, &self.global(), can_gc)?;
1307
1308        // Return reader.
1309        Ok(reader)
1310    }
1311
1312    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-byob-reader>
1313    pub(crate) fn acquire_byob_reader(
1314        &self,
1315        can_gc: CanGc,
1316    ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1317        // Let reader be a new ReadableStreamBYOBReader.
1318        let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1319        // Perform ? SetUpReadableStreamBYOBReader(reader, stream).
1320        reader.set_up(self, &self.global(), can_gc)?;
1321
1322        // Return reader.
1323        Ok(reader)
1324    }
1325
1326    pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1327        match self.controller.borrow().as_ref() {
1328            Some(ControllerType::Default(controller)) => {
1329                controller.get().expect("Stream should have controller.")
1330            },
1331            _ => {
1332                unreachable!(
1333                    "Getting default controller for a stream with a non-default controller"
1334                )
1335            },
1336        }
1337    }
1338
1339    pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1340        match self.controller.borrow().as_ref() {
1341            Some(ControllerType::Byte(controller)) => {
1342                controller.get().expect("Stream should have controller.")
1343            },
1344            _ => {
1345                unreachable!("Getting byte controller for a stream with a non-byte controller")
1346            },
1347        }
1348    }
1349
1350    pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1351        match self.reader.borrow().as_ref() {
1352            Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1353            _ => {
1354                unreachable!("Getting default reader for a stream with a non-default reader")
1355            },
1356        }
1357    }
1358
1359    /// Read a chunk from the stream,
1360    /// must be called after `start_reading`,
1361    /// and before `stop_reading`.
1362    /// Native call to
1363    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1364    pub(crate) fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
1365        match self.reader.borrow().as_ref() {
1366            Some(ReaderType::Default(reader)) => {
1367                let Some(reader) = reader.get() else {
1368                    unreachable!(
1369                        "Attempt to read stream chunk without having first acquired a reader."
1370                    );
1371                };
1372                reader.Read(can_gc)
1373            },
1374            _ => {
1375                unreachable!("Native reading of a chunk can only be done with a default reader.")
1376            },
1377        }
1378    }
1379
1380    /// Releases the lock on the reader,
1381    /// must be done after `start_reading`.
1382    /// Native call to
1383    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
1384    pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1385        let reader_ref = self.reader.borrow();
1386
1387        match reader_ref.as_ref() {
1388            Some(ReaderType::Default(reader)) => {
1389                let Some(reader) = reader.get() else {
1390                    unreachable!("Attempt to stop reading without having first acquired a reader.");
1391                };
1392
1393                drop(reader_ref);
1394                reader.release(can_gc).expect("Reader release cannot fail.");
1395            },
1396            _ => {
1397                unreachable!("Native stop reading can only be done with a default reader.")
1398            },
1399        }
1400    }
1401
1402    /// <https://streams.spec.whatwg.org/#is-readable-stream-locked>
1403    pub(crate) fn is_locked(&self) -> bool {
1404        match self.reader.borrow().as_ref() {
1405            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1406            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1407            None => false,
1408        }
1409    }
1410
    /// Returns the current value of the stream's `disturbed` flag.
    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
    pub(crate) fn is_disturbed(&self) -> bool {
        self.disturbed.get()
    }
1414
    /// Overwrites the stream's `disturbed` flag with `disturbed`.
    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
    pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
        self.disturbed.set(disturbed);
    }
1418
1419    pub(crate) fn is_closed(&self) -> bool {
1420        self.state.get() == ReadableStreamState::Closed
1421    }
1422
1423    pub(crate) fn is_errored(&self) -> bool {
1424        self.state.get() == ReadableStreamState::Errored
1425    }
1426
1427    pub(crate) fn is_readable(&self) -> bool {
1428        self.state.get() == ReadableStreamState::Readable
1429    }
1430
1431    pub(crate) fn has_default_reader(&self) -> bool {
1432        match self.reader.borrow().as_ref() {
1433            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1434            _ => false,
1435        }
1436    }
1437
1438    pub(crate) fn has_byob_reader(&self) -> bool {
1439        match self.reader.borrow().as_ref() {
1440            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1441            _ => false,
1442        }
1443    }
1444
1445    pub(crate) fn has_byte_controller(&self) -> bool {
1446        match self.controller.borrow().as_ref() {
1447            Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1448            _ => false,
1449        }
1450    }
1451
1452    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
1453    pub(crate) fn get_num_read_requests(&self) -> usize {
1454        match self.reader.borrow().as_ref() {
1455            Some(ReaderType::Default(reader)) => {
1456                let reader = reader
1457                    .get()
1458                    .expect("Stream must have a reader when getting the number of read requests.");
1459                reader.get_num_read_requests()
1460            },
1461            _ => unreachable!(
1462                "Stream must have a default reader when get num read requests is called into."
1463            ),
1464        }
1465    }
1466
1467    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-into-requests>
1468    pub(crate) fn get_num_read_into_requests(&self) -> usize {
1469        assert!(self.has_byob_reader());
1470
1471        match self.reader.borrow().as_ref() {
1472            Some(ReaderType::BYOB(reader)) => {
1473                let Some(reader) = reader.get() else {
1474                    unreachable!(
1475                        "Stream must have a reader when get num read into requests is called into."
1476                    );
1477                };
1478                reader.get_num_read_into_requests()
1479            },
1480            _ => {
1481                unreachable!(
1482                    "Stream must have a BYOB reader when get num read into requests is called into."
1483                );
1484            },
1485        }
1486    }
1487
1488    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
1489    pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1490        // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1491        assert!(self.has_default_reader());
1492
1493        match self.reader.borrow().as_ref() {
1494            Some(ReaderType::Default(reader)) => {
1495                // step 2 - Let reader be stream.[[reader]].
1496                let reader = reader
1497                    .get()
1498                    .expect("Stream must have a reader when a read request is fulfilled.");
1499                // step 3 - Assert: reader.[[readRequests]] is not empty.
1500                assert_ne!(reader.get_num_read_requests(), 0);
1501                // step 4 & 5
1502                // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]].
1503                let request = reader.remove_read_request();
1504
1505                if done {
1506                    // step 6 - If done is true, perform readRequest’s close steps.
1507                    request.close_steps(can_gc);
1508                } else {
1509                    // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk.
1510                    let result = RootedTraceableBox::new(Heap::default());
1511                    result.set(*chunk);
1512                    request.chunk_steps(result, &self.global(), can_gc);
1513                }
1514            },
1515            _ => {
1516                unreachable!(
1517                    "Stream must have a default reader when fulfill read requests is called into."
1518                );
1519            },
1520        }
1521    }
1522
1523    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request>
1524    pub(crate) fn fulfill_read_into_request(
1525        &self,
1526        chunk: SafeHandleValue,
1527        done: bool,
1528        can_gc: CanGc,
1529    ) {
1530        // Assert: ! ReadableStreamHasBYOBReader(stream) is true.
1531        assert!(self.has_byob_reader());
1532
1533        // Let reader be stream.[[reader]].
1534        match self.reader.borrow().as_ref() {
1535            Some(ReaderType::BYOB(reader)) => {
1536                let Some(reader) = reader.get() else {
1537                    unreachable!(
1538                        "Stream must have a reader when a read into request is fulfilled."
1539                    );
1540                };
1541
1542                // Assert: reader.[[readIntoRequests]] is not empty.
1543                assert!(reader.get_num_read_into_requests() > 0);
1544
1545                // Let readIntoRequest be reader.[[readIntoRequests]][0].
1546                // Remove readIntoRequest from reader.[[readIntoRequests]].
1547                let read_into_request = reader.remove_read_into_request();
1548
1549                // If done is true, perform readIntoRequest’s close steps, given chunk.
1550                let result = RootedTraceableBox::new(Heap::default());
1551                if done {
1552                    result.set(*chunk);
1553                    read_into_request.close_steps(Some(result), can_gc);
1554                } else {
1555                    // Otherwise, perform readIntoRequest’s chunk steps, given chunk.
1556                    result.set(*chunk);
1557                    read_into_request.chunk_steps(result, can_gc);
1558                }
1559            },
1560            _ => {
1561                unreachable!(
1562                    "Stream must have a BYOB reader when fulfill read into requests is called into."
1563                );
1564            },
1565        };
1566    }
1567
1568    /// <https://streams.spec.whatwg.org/#readable-stream-close>
1569    pub(crate) fn close(&self, can_gc: CanGc) {
1570        // Assert: stream.[[state]] is "readable".
1571        assert!(self.is_readable());
1572        // Set stream.[[state]] to "closed".
1573        self.state.set(ReadableStreamState::Closed);
1574        // Let reader be stream.[[reader]].
1575
1576        // NOTE: do not hold the RefCell borrow across reader.close(),
1577        // or release() will panic when it tries to mut-borrow stream.reader.
1578        // So we pull out the underlying DOM reader in a local, then drop the borrow.
1579        let default_reader = {
1580            let reader_ref = self.reader.borrow();
1581            match reader_ref.as_ref() {
1582                Some(ReaderType::Default(reader)) => reader.get(),
1583                _ => None,
1584            }
1585        };
1586
1587        if let Some(reader) = default_reader {
1588            // steps 5 & 6 for a default reader
1589            reader.close(can_gc);
1590            return;
1591        }
1592
1593        // Same for BYOB reader.
1594        let byob_reader = {
1595            let reader_ref = self.reader.borrow();
1596            match reader_ref.as_ref() {
1597                Some(ReaderType::BYOB(reader)) => reader.get(),
1598                _ => None,
1599            }
1600        };
1601
1602        if let Some(reader) = byob_reader {
1603            // steps 5 & 6 for a BYOB reader
1604            reader.close(can_gc);
1605        }
1606
1607        // If reader is undefined, return.
1608    }
1609
1610    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
1611    pub(crate) fn cancel(
1612        &self,
1613        cx: SafeJSContext,
1614        global: &GlobalScope,
1615        reason: SafeHandleValue,
1616        can_gc: CanGc,
1617    ) -> Rc<Promise> {
1618        // Set stream.[[disturbed]] to true.
1619        self.disturbed.set(true);
1620
1621        // If stream.[[state]] is "closed", return a promise resolved with undefined.
1622        if self.is_closed() {
1623            return Promise::new_resolved(global, cx, (), can_gc);
1624        }
1625        // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
1626        if self.is_errored() {
1627            let promise = Promise::new(global, can_gc);
1628            rooted!(in(*cx) let mut rval = UndefinedValue());
1629            self.stored_error
1630                .safe_to_jsval(cx, rval.handle_mut(), can_gc);
1631            promise.reject_native(&rval.handle(), can_gc);
1632            return promise;
1633        }
1634        // Perform ! ReadableStreamClose(stream).
1635        self.close(can_gc);
1636
1637        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
1638        let byob_reader = {
1639            let reader_ref = self.reader.borrow();
1640            match reader_ref.as_ref() {
1641                Some(ReaderType::BYOB(reader)) => reader.get(),
1642                _ => None,
1643            }
1644        };
1645
1646        if let Some(reader) = byob_reader {
1647            // step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel
1648            reader.cancel(can_gc);
1649        }
1650
1651        // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).
1652
1653        let source_cancel_promise = match self.controller.borrow().as_ref() {
1654            Some(ControllerType::Default(controller)) => controller
1655                .get()
1656                .expect("Stream should have controller.")
1657                .perform_cancel_steps(cx, global, reason, can_gc),
1658            Some(ControllerType::Byte(controller)) => controller
1659                .get()
1660                .expect("Stream should have controller.")
1661                .perform_cancel_steps(cx, global, reason, can_gc),
1662            None => {
1663                panic!("Stream does not have a controller.");
1664            },
1665        };
1666
1667        // Create a new promise,
1668        // and setup a handler in order to react to the fulfillment of sourceCancelPromise.
1669        let global = self.global();
1670        let result_promise = Promise::new(&global, can_gc);
1671        let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1672            result: result_promise.clone(),
1673        });
1674        let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1675            result: result_promise.clone(),
1676        });
1677        let handler = PromiseNativeHandler::new(
1678            &global,
1679            Some(fulfillment_handler),
1680            Some(rejection_handler),
1681            can_gc,
1682        );
1683        let realm = enter_realm(&*global);
1684        let comp = InRealm::Entered(&realm);
1685        source_cancel_promise.append_native_handler(&handler, comp, can_gc);
1686
1687        // Return the result of reacting to sourceCancelPromise
1688        // with a fulfillment step that returns undefined.
1689        result_promise
1690    }
1691
1692    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1693    pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1694        *self.reader.borrow_mut() = new_reader;
1695    }
1696
1697    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1698    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
1699    fn byte_tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1700        // Assert: stream implements ReadableStream.
1701        // Assert: stream.[[controller]] implements ReadableByteStreamController.
1702
1703        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1704        let reader = self.acquire_default_reader(can_gc)?;
1705        let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1706            Some(&reader),
1707        ))));
1708
1709        // Let reading be false.
1710        let reading = Rc::new(Cell::new(false));
1711
1712        // Let readAgainForBranch1 be false.
1713        let read_again_for_branch_1 = Rc::new(Cell::new(false));
1714
1715        // Let readAgainForBranch2 be false.
1716        let read_again_for_branch_2 = Rc::new(Cell::new(false));
1717
1718        // Let canceled1 be false.
1719        let canceled_1 = Rc::new(Cell::new(false));
1720
1721        // Let canceled2 be false.
1722        let canceled_2 = Rc::new(Cell::new(false));
1723
1724        // Let reason1 be undefined.
1725        let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1726
1727        // Let reason2 be undefined.
1728        let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1729
1730        // Let cancelPromise be a new promise.
1731        let cancel_promise = Promise::new(&self.global(), can_gc);
1732        let reader_version = Rc::new(Cell::new(0));
1733
1734        let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1735            reader.clone(),
1736            self,
1737            reading.clone(),
1738            read_again_for_branch_1.clone(),
1739            read_again_for_branch_2.clone(),
1740            canceled_1.clone(),
1741            canceled_2.clone(),
1742            reason_1.clone(),
1743            reason_2.clone(),
1744            cancel_promise.clone(),
1745            reader_version.clone(),
1746            ByteTeeCancelAlgorithm::Cancel1Algorithm,
1747            ByteTeePullAlgorithm::Pull1Algorithm,
1748            can_gc,
1749        );
1750
1751        let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1752            reader.clone(),
1753            self,
1754            reading,
1755            read_again_for_branch_1,
1756            read_again_for_branch_2,
1757            canceled_1,
1758            canceled_2,
1759            reason_1,
1760            reason_2,
1761            cancel_promise.clone(),
1762            reader_version,
1763            ByteTeeCancelAlgorithm::Cancel2Algorithm,
1764            ByteTeePullAlgorithm::Pull2Algorithm,
1765            can_gc,
1766        );
1767
1768        // Set branch1 to ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm).
1769        let branch_1 = readable_byte_stream_tee(
1770            &self.global(),
1771            UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_1)),
1772            can_gc,
1773        );
1774        byte_tee_source_1.set_branch_1(&branch_1);
1775        byte_tee_source_2.set_branch_1(&branch_1);
1776
1777        // Set branch2 to ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm).
1778        let branch_2 = readable_byte_stream_tee(
1779            &self.global(),
1780            UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_2)),
1781            can_gc,
1782        );
1783        byte_tee_source_1.set_branch_2(&branch_2);
1784        byte_tee_source_2.set_branch_2(&branch_2);
1785
1786        // Perform forwardReaderError, given reader.
1787        byte_tee_source_1.forward_reader_error(reader.clone(), can_gc);
1788        byte_tee_source_2.forward_reader_error(reader, can_gc);
1789
1790        // Return « branch1, branch2 ».
1791        Ok(vec![branch_1, branch_2])
1792    }
1793
1794    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
1795    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1796    fn default_tee(
1797        &self,
1798        clone_for_branch_2: bool,
1799        can_gc: CanGc,
1800    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1801        // Assert: stream implements ReadableStream.
1802
1803        // Assert: cloneForBranch2 is a boolean.
1804        let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1805
1806        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1807        let reader = self.acquire_default_reader(can_gc)?;
1808
1809        // Let reading be false.
1810        let reading = Rc::new(Cell::new(false));
1811        // Let readAgain be false.
1812        let read_again = Rc::new(Cell::new(false));
1813        // Let canceled1 be false.
1814        let canceled_1 = Rc::new(Cell::new(false));
1815        // Let canceled2 be false.
1816        let canceled_2 = Rc::new(Cell::new(false));
1817
1818        // Let reason1 be undefined.
1819        let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1820        // Let reason2 be undefined.
1821        let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1822        // Let cancelPromise be a new promise.
1823        let cancel_promise = Promise::new(&self.global(), can_gc);
1824
1825        let tee_source_1 = DefaultTeeUnderlyingSource::new(
1826            &reader,
1827            self,
1828            reading.clone(),
1829            read_again.clone(),
1830            canceled_1.clone(),
1831            canceled_2.clone(),
1832            clone_for_branch_2.clone(),
1833            reason_1.clone(),
1834            reason_2.clone(),
1835            cancel_promise.clone(),
1836            DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1837            can_gc,
1838        );
1839
1840        let underlying_source_type_branch_1 =
1841            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
1842
1843        let tee_source_2 = DefaultTeeUnderlyingSource::new(
1844            &reader,
1845            self,
1846            reading,
1847            read_again,
1848            canceled_1.clone(),
1849            canceled_2.clone(),
1850            clone_for_branch_2,
1851            reason_1,
1852            reason_2,
1853            cancel_promise.clone(),
1854            DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1855            can_gc,
1856        );
1857
1858        let underlying_source_type_branch_2 =
1859            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
1860
1861        // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
1862        let branch_1 = create_readable_stream(
1863            &self.global(),
1864            underlying_source_type_branch_1,
1865            None,
1866            None,
1867            can_gc,
1868        );
1869        tee_source_1.set_branch_1(&branch_1);
1870        tee_source_2.set_branch_1(&branch_1);
1871
1872        // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
1873        let branch_2 = create_readable_stream(
1874            &self.global(),
1875            underlying_source_type_branch_2,
1876            None,
1877            None,
1878            can_gc,
1879        );
1880        tee_source_1.set_branch_2(&branch_2);
1881        tee_source_2.set_branch_2(&branch_2);
1882
1883        // Upon rejection of reader.[[closedPromise]] with reason r,
1884        reader.default_tee_append_native_handler_to_closed_promise(
1885            &branch_1,
1886            &branch_2,
1887            canceled_1,
1888            canceled_2,
1889            cancel_promise,
1890            can_gc,
1891        );
1892
1893        // Return « branch_1, branch_2 ».
1894        Ok(vec![branch_1, branch_2])
1895    }
1896
1897    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
1898    #[allow(clippy::too_many_arguments)]
1899    pub(crate) fn pipe_to(
1900        &self,
1901        cx: SafeJSContext,
1902        global: &GlobalScope,
1903        dest: &WritableStream,
1904        prevent_close: bool,
1905        prevent_abort: bool,
1906        prevent_cancel: bool,
1907        signal: Option<&AbortSignal>,
1908        realm: InRealm,
1909        can_gc: CanGc,
1910    ) -> Rc<Promise> {
1911        // Assert: source implements ReadableStream.
1912        // Assert: dest implements WritableStream.
1913        // Assert: prevent_close, prevent_abort, and prevent_cancel are all booleans.
1914        // Done with method signature types.
1915
1916        // If signal was not given, let signal be undefined.
1917        // Assert: either signal is undefined, or signal implements AbortSignal.
1918        // Note: done with the `signal` argument.
1919
1920        // Assert: ! IsReadableStreamLocked(source) is false.
1921        assert!(!self.is_locked());
1922
1923        // Assert: ! IsWritableStreamLocked(dest) is false.
1924        assert!(!dest.is_locked());
1925
1926        // If source.[[controller]] implements ReadableByteStreamController,
1927        // let reader be either ! AcquireReadableStreamBYOBReader(source)
1928        // or ! AcquireReadableStreamDefaultReader(source),
1929        // at the user agent’s discretion.
1930        // Note: for now only using default readers.
1931
1932        // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
1933        let reader = self
1934            .acquire_default_reader(can_gc)
1935            .expect("Acquiring a default reader for pipe_to cannot fail");
1936
1937        // Let writer be ! AcquireWritableStreamDefaultWriter(dest).
1938        let writer = dest
1939            .aquire_default_writer(cx, global, can_gc)
1940            .expect("Acquiring a default writer for pipe_to cannot fail");
1941
1942        // Set source.[[disturbed]] to true.
1943        self.disturbed.set(true);
1944
1945        // Let shuttingDown be false.
1946        // Done below with default.
1947
1948        // Let promise be a new promise.
1949        let promise = Promise::new(global, can_gc);
1950
1951        // In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
1952        rooted!(in(*cx) let pipe_to = PipeTo {
1953            reader: Dom::from_ref(&reader),
1954            writer: Dom::from_ref(&writer),
1955            pending_writes: Default::default(),
1956            state: Default::default(),
1957            prevent_abort,
1958            prevent_cancel,
1959            prevent_close,
1960            shutting_down: Default::default(),
1961            abort_reason: Default::default(),
1962            shutdown_error: Default::default(),
1963            shutdown_action_promise:  Default::default(),
1964            result_promise: promise.clone(),
1965        });
1966
1967        // If signal is not undefined,
1968        // Note: moving the steps to here, so that the `PipeTo` is available.
1969        if let Some(signal) = signal {
1970            // Let abortAlgorithm be the following steps:
1971            // Note: steps are implemented at call site.
1972            rooted!(in(*cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1973
1974            // If signal is aborted, perform abortAlgorithm and return promise.
1975            if signal.aborted() {
1976                signal.run_abort_algorithm(cx, global, &abort_algorithm, realm, can_gc);
1977                return promise;
1978            }
1979
1980            // Add abortAlgorithm to signal.
1981            signal.add(&abort_algorithm);
1982        }
1983
1984        // Note: perfom checks now, since streams can start as closed or errored.
1985        pipe_to.check_and_propagate_errors_forward(cx, global, realm, can_gc);
1986        pipe_to.check_and_propagate_errors_backward(cx, global, realm, can_gc);
1987        pipe_to.check_and_propagate_closing_forward(cx, global, realm, can_gc);
1988        pipe_to.check_and_propagate_closing_backward(cx, global, realm, can_gc);
1989
1990        // If we are not closed or errored,
1991        if *pipe_to.state.borrow() == PipeToState::Starting {
1992            // Start the pipe, by waiting on the writer being ready for a chunk.
1993            pipe_to.wait_for_writer_ready(global, realm, can_gc);
1994        }
1995
1996        // Return promise.
1997        promise
1998    }
1999
2000    /// <https://streams.spec.whatwg.org/#readable-stream-tee>
2001    pub(crate) fn tee(
2002        &self,
2003        clone_for_branch_2: bool,
2004        can_gc: CanGc,
2005    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2006        // Assert: stream implements ReadableStream.
2007        // Assert: cloneForBranch2 is a boolean.
2008
2009        match self.controller.borrow().as_ref() {
2010            Some(ControllerType::Default(_)) => {
2011                // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
2012                self.default_tee(clone_for_branch_2, can_gc)
2013            },
2014            Some(ControllerType::Byte(_)) => {
2015                // If stream.[[controller]] implements ReadableByteStreamController,
2016                // return ? ReadableByteStreamTee(stream).
2017                self.byte_tee(can_gc)
2018            },
2019            None => {
2020                unreachable!("Stream should have a controller.");
2021            },
2022        }
2023    }
2024
2025    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller-from-underlying-source>
2026    pub(crate) fn set_up_byte_controller(
2027        &self,
2028        global: &GlobalScope,
2029        underlying_source_dict: JsUnderlyingSource,
2030        underlying_source_handle: SafeHandleObject,
2031        stream: DomRoot<ReadableStream>,
2032        strategy_hwm: f64,
2033        can_gc: CanGc,
2034    ) -> Fallible<()> {
2035        // Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
2036        // Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
2037        // If underlyingSourceDict["start"] exists, then set startAlgorithm to an algorithm which returns the result
2038        // of invoking underlyingSourceDict["start"] with argument list « controller »
2039        // and callback this value underlyingSource.
2040        // If underlyingSourceDict["pull"] exists, then set pullAlgorithm to an algorithm which returns the result
2041        // of invoking underlyingSourceDict["pull"] with argument list « controller »
2042        // and callback this value underlyingSource.
2043        // If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm to an algorithm which takes an
2044        // argument reason and returns the result of invoking underlyingSourceDict["cancel"] with argument list
2045        // « reason » and callback this value underlyingSource.
2046
2047        // Let autoAllocateChunkSize be underlyingSourceDict["autoAllocateChunkSize"],
2048        // if it exists, or undefined otherwise.
2049        // If autoAllocateChunkSize is 0, then throw a TypeError exception.
2050        if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2051            return Err(Error::Type("autoAllocateChunkSize cannot be 0".to_owned()));
2052        }
2053
2054        let controller = ReadableByteStreamController::new(
2055            UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2056            strategy_hwm,
2057            global,
2058            can_gc,
2059        );
2060
2061        // Note: this must be done before `setup`,
2062        // otherwise `thisOb` is null in the start callback.
2063        controller.set_underlying_source_this_object(underlying_source_handle);
2064
2065        // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm,
2066        // pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
2067        controller.setup(global, stream, can_gc)
2068    }
2069
2070    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2071    pub(crate) fn setup_cross_realm_transform_readable(
2072        &self,
2073        cx: SafeJSContext,
2074        port: &MessagePort,
2075        can_gc: CanGc,
2076    ) {
2077        let port_id = port.message_port_id();
2078        let global = self.global();
2079
2080        // Perform ! InitializeReadableStream(stream).
2081        // Done in `new_inherited`.
2082
2083        // Let sizeAlgorithm be an algorithm that returns 1.
2084        let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
2085
2086        // Note: other algorithms defined in the underlying source container.
2087
2088        // Let controller be a new ReadableStreamDefaultController.
2089        let controller = ReadableStreamDefaultController::new(
2090            &self.global(),
2091            UnderlyingSourceType::Transfer(Dom::from_ref(port)),
2092            0.,
2093            size_algorithm,
2094            can_gc,
2095        );
2096
2097        // Add a handler for port’s message event with the following steps:
2098        // Add a handler for port’s messageerror event with the following steps:
2099        rooted!(in(*cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2100            controller: Dom::from_ref(&controller),
2101        });
2102        global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2103
2104        // Enable port’s port message queue.
2105        port.Start(can_gc);
2106
2107        // Perform ! SetUpReadableStreamDefaultController
2108        controller
2109            .setup(DomRoot::from_ref(self), can_gc)
2110            .expect("Setting up controller for transfer cannot fail.");
2111    }
2112}
2113
2114impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2115    /// <https://streams.spec.whatwg.org/#rs-constructor>
2116    fn Constructor(
2117        cx: SafeJSContext,
2118        global: &GlobalScope,
2119        proto: Option<SafeHandleObject>,
2120        can_gc: CanGc,
2121        underlying_source: Option<*mut JSObject>,
2122        strategy: &QueuingStrategy,
2123    ) -> Fallible<DomRoot<Self>> {
2124        // If underlyingSource is missing, set it to null.
2125        rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2126        // Let underlyingSourceDict be underlyingSource,
2127        // converted to an IDL value of type UnderlyingSource.
2128        let underlying_source_dict = if !underlying_source_obj.is_null() {
2129            rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2130            match JsUnderlyingSource::new(cx, obj_val.handle(), can_gc) {
2131                Ok(ConversionResult::Success(val)) => val,
2132                Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
2133                _ => {
2134                    return Err(Error::JSFailed);
2135                },
2136            }
2137        } else {
2138            JsUnderlyingSource::empty()
2139        };
2140
2141        // Perform ! InitializeReadableStream(this).
2142        let stream = ReadableStream::new_with_proto(global, proto, can_gc);
2143
2144        if underlying_source_dict.type_.is_some() {
2145            // If strategy["size"] exists, throw a RangeError exception.
2146            if strategy.size.is_some() {
2147                return Err(Error::Range(
2148                    "size is not supported for byte streams".to_owned(),
2149                ));
2150            }
2151
2152            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
2153            let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2154
2155            // Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this,
2156            // underlyingSource, underlyingSourceDict, highWaterMark).
2157            stream.set_up_byte_controller(
2158                global,
2159                underlying_source_dict,
2160                underlying_source_obj.handle(),
2161                stream.clone(),
2162                strategy_hwm,
2163                can_gc,
2164            )?;
2165        } else {
2166            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
2167            let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2168
2169            // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
2170            let size_algorithm = extract_size_algorithm(strategy, can_gc);
2171
2172            let controller = ReadableStreamDefaultController::new(
2173                global,
2174                UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2175                high_water_mark,
2176                size_algorithm,
2177                can_gc,
2178            );
2179
2180            // Note: this must be done before `setup`,
2181            // otherwise `thisOb` is null in the start callback.
2182            controller.set_underlying_source_this_object(underlying_source_obj.handle());
2183
2184            // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
2185            controller.setup(stream.clone(), can_gc)?;
2186        };
2187
2188        Ok(stream)
2189    }
2190
2191    /// <https://streams.spec.whatwg.org/#rs-locked>
2192    fn Locked(&self) -> bool {
2193        self.is_locked()
2194    }
2195
2196    /// <https://streams.spec.whatwg.org/#rs-cancel>
2197    fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
2198        let global = self.global();
2199        if self.is_locked() {
2200            // If ! IsReadableStreamLocked(this) is true,
2201            // return a promise rejected with a TypeError exception.
2202            let promise = Promise::new(&global, can_gc);
2203            promise.reject_error(Error::Type("stream is locked".to_owned()), can_gc);
2204            promise
2205        } else {
2206            // Return ! ReadableStreamCancel(this, reason).
2207            self.cancel(cx, &global, reason, can_gc)
2208        }
2209    }
2210
2211    /// <https://streams.spec.whatwg.org/#rs-get-reader>
2212    fn GetReader(
2213        &self,
2214        options: &ReadableStreamGetReaderOptions,
2215        can_gc: CanGc,
2216    ) -> Fallible<ReadableStreamReader> {
2217        // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
2218        if options.mode.is_none() {
2219            return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2220                self.acquire_default_reader(can_gc)?,
2221            ));
2222        }
2223        // 2. Assert: options["mode"] is "byob".
2224        assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2225
2226        // 3. Return ? AcquireReadableStreamBYOBReader(this).
2227        Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2228            self.acquire_byob_reader(can_gc)?,
2229        ))
2230    }
2231
    /// <https://streams.spec.whatwg.org/#rs-tee>
    ///
    /// Delegates to the internal `tee` helper with its flag set to `false`
    /// and hands back whatever streams (or error) that helper reports.
    fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
        // Return ? ReadableStreamTee(this, false).
        self.tee(false, can_gc)
    }
2237
2238    /// <https://streams.spec.whatwg.org/#rs-pipe-to>
2239    fn PipeTo(
2240        &self,
2241        destination: &WritableStream,
2242        options: &StreamPipeOptions,
2243        realm: InRealm,
2244        can_gc: CanGc,
2245    ) -> Rc<Promise> {
2246        let cx = GlobalScope::get_cx();
2247        let global = self.global();
2248
2249        // If ! IsReadableStreamLocked(this) is true,
2250        if self.is_locked() {
2251            // return a promise rejected with a TypeError exception.
2252            let promise = Promise::new(&global, can_gc);
2253            promise.reject_error(Error::Type("Source stream is locked".to_owned()), can_gc);
2254            return promise;
2255        }
2256
2257        // If ! IsWritableStreamLocked(destination) is true,
2258        if destination.is_locked() {
2259            // return a promise rejected with a TypeError exception.
2260            let promise = Promise::new(&global, can_gc);
2261            promise.reject_error(
2262                Error::Type("Destination stream is locked".to_owned()),
2263                can_gc,
2264            );
2265            return promise;
2266        }
2267
2268        // Let signal be options["signal"] if it exists, or undefined otherwise.
2269        let signal = options.signal.as_deref();
2270
2271        // Return ! ReadableStreamPipeTo.
2272        self.pipe_to(
2273            cx,
2274            &global,
2275            destination,
2276            options.preventClose,
2277            options.preventAbort,
2278            options.preventCancel,
2279            signal,
2280            realm,
2281            can_gc,
2282        )
2283    }
2284
2285    /// <https://streams.spec.whatwg.org/#rs-pipe-through>
2286    fn PipeThrough(
2287        &self,
2288        transform: &ReadableWritablePair,
2289        options: &StreamPipeOptions,
2290        realm: InRealm,
2291        can_gc: CanGc,
2292    ) -> Fallible<DomRoot<ReadableStream>> {
2293        let global = self.global();
2294        let cx = GlobalScope::get_cx();
2295
2296        // If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
2297        if self.is_locked() {
2298            return Err(Error::Type("Source stream is locked".to_owned()));
2299        }
2300
2301        // If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
2302        if transform.writable.is_locked() {
2303            return Err(Error::Type("Destination stream is locked".to_owned()));
2304        }
2305
2306        // Let signal be options["signal"] if it exists, or undefined otherwise.
2307        let signal = options.signal.as_deref();
2308
2309        // Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
2310        // options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
2311        let promise = self.pipe_to(
2312            cx,
2313            &global,
2314            &transform.writable,
2315            options.preventClose,
2316            options.preventAbort,
2317            options.preventCancel,
2318            signal,
2319            realm,
2320            can_gc,
2321        );
2322
2323        // Set promise.[[PromiseIsHandled]] to true.
2324        promise.set_promise_is_handled();
2325
2326        // Return transform["readable"].
2327        Ok(transform.readable.clone())
2328    }
2329}
2330
2331#[expect(unsafe_code)]
2332/// The initial steps for the message handler for both readable and writable cross realm transforms.
2333/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2334/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2335pub(crate) unsafe fn get_type_and_value_from_message(
2336    cx: SafeJSContext,
2337    data: SafeHandleValue,
2338    value: SafeMutableHandleValue,
2339    can_gc: CanGc,
2340) -> DOMString {
2341    // Let data be the data of the message.
2342    // Note: we are passed the data as argument,
2343    // which originates in the return value of `structuredclone::read`.
2344
2345    // Assert: data is an Object.
2346    assert!(data.is_object());
2347    rooted!(in(*cx) let data_object = data.to_object());
2348
2349    // Let type be ! Get(data, "type").
2350    rooted!(in(*cx) let mut type_ = UndefinedValue());
2351    unsafe {
2352        get_dictionary_property(
2353            *cx,
2354            data_object.handle(),
2355            c"type",
2356            type_.handle_mut(),
2357            can_gc,
2358        )
2359    }
2360    .expect("Getting the type should not fail.");
2361
2362    // Let value be ! Get(data, "value").
2363    unsafe { get_dictionary_property(*cx, data_object.handle(), c"value", value, can_gc) }
2364        .expect("Getting the value should not fail.");
2365
2366    // Assert: type is a String.
2367    let result =
2368        DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2369            .expect("The type of the message should be a string");
2370    let ConversionResult::Success(type_string) = result else {
2371        unreachable!("The type of the message should be a string");
2372    };
2373
2374    type_string
2375}
2376
// Marker impl with no members.
// NOTE(review): presumably needed so that `CrossRealmTransformReadable`
// values can be rooted — confirm against the users of this type.
impl js::gc::Rootable for CrossRealmTransformReadable {}
2378
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
/// A wrapper to handle `message` and `messageerror` events
/// for the port used by the transferred stream.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformReadable {
    /// The controller used in the algorithm:
    /// received chunks are enqueued on it, and close/error signals are applied to it.
    controller: Dom<ReadableStreamDefaultController>,
}
2388
2389impl CrossRealmTransformReadable {
2390    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2391    /// Add a handler for port’s message event with the following steps:
2392    #[expect(unsafe_code)]
2393    pub(crate) fn handle_message(
2394        &self,
2395        cx: SafeJSContext,
2396        global: &GlobalScope,
2397        port: &MessagePort,
2398        message: SafeHandleValue,
2399        _realm: InRealm,
2400        can_gc: CanGc,
2401    ) {
2402        rooted!(in(*cx) let mut value = UndefinedValue());
2403        let type_string =
2404            unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
2405
2406        // If type is "chunk",
2407        if type_string == "chunk" {
2408            // Perform ! ReadableStreamDefaultControllerEnqueue(controller, value).
2409            self.controller
2410                .enqueue(cx, value.handle(), can_gc)
2411                .expect("Enqueing a chunk should not fail.");
2412        }
2413
2414        // Otherwise, if type is "close",
2415        if type_string == "close" {
2416            // Perform ! ReadableStreamDefaultControllerClose(controller).
2417            self.controller.close(can_gc);
2418
2419            // Disentangle port.
2420            global.disentangle_port(port, can_gc);
2421        }
2422
2423        // Otherwise, if type is "error",
2424        if type_string == "error" {
2425            // Perform ! ReadableStreamDefaultControllerError(controller, value).
2426            self.controller.error(value.handle(), can_gc);
2427
2428            // Disentangle port.
2429            global.disentangle_port(port, can_gc);
2430        }
2431    }
2432
2433    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2434    /// Add a handler for port’s messageerror event with the following steps:
2435    pub(crate) fn handle_error(
2436        &self,
2437        cx: SafeJSContext,
2438        global: &GlobalScope,
2439        port: &MessagePort,
2440        _realm: InRealm,
2441        can_gc: CanGc,
2442    ) {
2443        // Let error be a new "DataCloneError" DOMException.
2444        let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
2445        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
2446        error.safe_to_jsval(cx, rooted_error.handle_mut(), can_gc);
2447
2448        // Perform ! CrossRealmTransformSendError(port, error).
2449        port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
2450
2451        // Perform ! ReadableStreamDefaultControllerError(controller, error).
2452        self.controller.error(rooted_error.handle(), can_gc);
2453
2454        // Disentangle port.
2455        global.disentangle_port(port, can_gc);
2456    }
2457}
2458
2459#[expect(unsafe_code)]
2460/// Get the `done` property of an object that a read promise resolved to.
2461pub(crate) fn get_read_promise_done(
2462    cx: SafeJSContext,
2463    v: &SafeHandleValue,
2464    can_gc: CanGc,
2465) -> Result<bool, Error> {
2466    if !v.is_object() {
2467        return Err(Error::Type("Unknown format for done property.".to_string()));
2468    }
2469    unsafe {
2470        rooted!(in(*cx) let object = v.to_object());
2471        rooted!(in(*cx) let mut done = UndefinedValue());
2472        match get_dictionary_property(*cx, object.handle(), c"done", done.handle_mut(), can_gc) {
2473            Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2474                Ok(ConversionResult::Success(val)) => Ok(val),
2475                Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2476                _ => Err(Error::Type("Unknown format for done property.".to_string())),
2477            },
2478            Ok(false) => Err(Error::Type("Promise has no done property.".to_string())),
2479            Err(()) => Err(Error::JSFailed),
2480        }
2481    }
2482}
2483
2484#[expect(unsafe_code)]
2485/// Get the `value` property of an object that a read promise resolved to.
2486pub(crate) fn get_read_promise_bytes(
2487    cx: SafeJSContext,
2488    v: &SafeHandleValue,
2489    can_gc: CanGc,
2490) -> Result<Vec<u8>, Error> {
2491    if !v.is_object() {
2492        return Err(Error::Type(
2493            "Unknown format for for bytes read.".to_string(),
2494        ));
2495    }
2496    unsafe {
2497        rooted!(in(*cx) let object = v.to_object());
2498        rooted!(in(*cx) let mut bytes = UndefinedValue());
2499        match get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc) {
2500            Ok(true) => {
2501                match Vec::<u8>::safe_from_jsval(
2502                    cx,
2503                    bytes.handle(),
2504                    ConversionBehavior::EnforceRange,
2505                    can_gc,
2506                ) {
2507                    Ok(ConversionResult::Success(val)) => Ok(val),
2508                    Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2509                    _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
2510                }
2511            },
2512            Ok(false) => Err(Error::Type("Promise has no value property.".to_string())),
2513            Err(()) => Err(Error::JSFailed),
2514        }
2515    }
2516}
2517
2518/// Convert a raw stream `chunk` JS value to `Vec<u8>`.
2519/// This mirrors the conversion used inside `get_read_promise_bytes`,
2520/// but operates on the raw chunk (no `{ value, done }` wrapper).
2521pub(crate) fn bytes_from_chunk_jsval(
2522    cx: SafeJSContext,
2523    chunk: &RootedTraceableBox<Heap<JSVal>>,
2524    can_gc: CanGc,
2525) -> Result<Vec<u8>, Error> {
2526    match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2527        Ok(ConversionResult::Success(vec)) => Ok(vec),
2528        Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2529        _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
2530    }
2531}
2532
2533/// <https://streams.spec.whatwg.org/#rs-transfer>
2534impl Transferable for ReadableStream {
2535    type Index = MessagePortIndex;
2536    type Data = MessagePortImpl;
2537
2538    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps>
2539    fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
2540        // Step 1. If ! IsReadableStreamLocked(value) is true, throw a
2541        // "DataCloneError" DOMException.
2542        if self.is_locked() {
2543            return Err(Error::DataClone(None));
2544        }
2545
2546        let global = self.global();
2547        let realm = enter_realm(&*global);
2548        let comp = InRealm::Entered(&realm);
2549        let cx = GlobalScope::get_cx();
2550        let can_gc = CanGc::note();
2551
2552        // Step 2. Let port1 be a new MessagePort in the current Realm.
2553        let port_1 = MessagePort::new(&global, can_gc);
2554        global.track_message_port(&port_1, None);
2555
2556        // Step 3. Let port2 be a new MessagePort in the current Realm.
2557        let port_2 = MessagePort::new(&global, can_gc);
2558        global.track_message_port(&port_2, None);
2559
2560        // Step 4. Entangle port1 and port2.
2561        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2562
2563        // Step 5. Let writable be a new WritableStream in the current Realm.
2564        let writable = WritableStream::new_with_proto(&global, None, can_gc);
2565
2566        // Step 6. Perform ! SetUpCrossRealmTransformWritable(writable, port1).
2567        writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
2568
2569        // Step 7. Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
2570        let promise = self.pipe_to(
2571            cx, &global, &writable, false, false, false, None, comp, can_gc,
2572        );
2573
2574        // Step 8. Set promise.[[PromiseIsHandled]] to true.
2575        promise.set_promise_is_handled();
2576
2577        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
2578        port_2.transfer()
2579    }
2580
2581    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps>
2582    fn transfer_receive(
2583        owner: &GlobalScope,
2584        id: MessagePortId,
2585        port_impl: MessagePortImpl,
2586    ) -> Result<DomRoot<Self>, ()> {
2587        let cx = GlobalScope::get_cx();
2588        let can_gc = CanGc::note();
2589
2590        // Their transfer-receiving steps, given dataHolder and value, are:
2591        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
2592        let value = ReadableStream::new_with_proto(owner, None, can_gc);
2593
2594        // Step 1. Let deserializedRecord be !
2595        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
2596        // Realm).
2597        // Done with the `Deserialize` derive of `MessagePortImpl`.
2598
2599        // Step 2. Let port be deserializedRecord.[[Deserialized]].
2600        let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
2601
2602        // Step 3. Perform ! SetUpCrossRealmTransformReadable(value, port).
2603        value.setup_cross_realm_transform_readable(cx, &transferred_port, can_gc);
2604        Ok(value)
2605    }
2606
2607    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
2608    fn serialized_storage<'a>(
2609        data: StructuredData<'a, '_>,
2610    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2611        match data {
2612            StructuredData::Reader(r) => &mut r.port_impls,
2613            StructuredData::Writer(w) => &mut w.ports,
2614        }
2615    }
2616}