script/dom/stream/
readablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use base::generic_channel::GenericSharedMemory;
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19    MutableHandleValue as SafeMutableHandleValue,
20};
21use js::typedarray::ArrayBufferViewU8;
22use rustc_hash::FxHashMap;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
26use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
27    ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
28    ReadableWritablePair, StreamPipeOptions,
29};
30use script_bindings::str::DOMString;
31
32use crate::dom::domexception::{DOMErrorName, DOMException};
33use script_bindings::conversions::{is_array_like, StringificationBehavior};
34use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
35use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
38use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
39use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
40use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
41use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
42use crate::dom::stream::writablestream::WritableStream;
43use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
44use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
45use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
46use crate::dom::bindings::trace::RootedTraceableBox;
47use crate::dom::bindings::utils::get_dictionary_property;
48use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
49use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
50use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
51use crate::dom::globalscope::GlobalScope;
52use crate::dom::promise::{wait_for_all_promise, Promise};
53use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
54use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
55use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
56use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
57use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
58use crate::dom::types::DefaultTeeUnderlyingSource;
59use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
60use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
61use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
62use crate::dom::messageport::MessagePort;
63use crate::realms::{enter_realm, InRealm, enter_auto_realm};
64use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
65use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
66use crate::dom::bindings::transferable::Transferable;
67use crate::dom::bindings::structuredclone::StructuredData;
68
69use crate::dom::bindings::buffer_source::HeapBufferSource;
70use super::readablestreambyobreader::ReadIntoRequest;
71
/// State machine for `PipeTo`.
///
/// The current state, together with the value passed to the promise reaction,
/// is what `PipeTo`'s `Callback::callback` uses to decide what to do next.
#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
enum PipeToState {
    /// The starting state; this is the `Default` value.
    #[default]
    Starting,
    /// Waiting for the writer to be ready.
    PendingReady,
    /// Waiting for a read to resolve.
    PendingRead,
    /// Waiting for all pending writes to finish,
    /// as part of shutting down with an optional action.
    /// The payload is the action to perform once the writes have settled,
    /// or `None` when shutting down without an action.
    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
    /// When shutting down with an action,
    /// waiting for the action to complete,
    /// at which point we can `finalize`.
    ShuttingDownPendingAction,
    /// The pipe has been finalized,
    /// no further actions should be performed.
    Finalized,
}
93
/// The action performed as part of "shutdown with an action":
/// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
///
/// Each variant names the operation whose resulting promise
/// the pipe waits on before finalizing.
#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
enum ShutdownAction {
    /// Abort the destination:
    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
    WritableStreamAbort,
    /// Cancel the source:
    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
    ReadableStreamCancel,
    /// Close the destination through the writer:
    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
    WritableStreamDefaultWriterCloseWithErrorPropagation,
    /// The action used by the abort algorithm of the pipe's abort signal:
    /// <https://streams.spec.whatwg.org/#ref-for-rs-pipeTo-shutdown-with-action>
    Abort,
}
106
// Marker implementation of `js::gc::Rootable` with no overridden items.
// NOTE(review): presumably needed so that a `PipeTo` can be rooted on the stack — confirm.
impl js::gc::Rootable for PipeTo {}
108
/// The "in parallel, but not really" part of
/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
///
/// Note: the spec is flexible about how this is done, but requires the following constraints to apply:
/// - Public API must not be used: we'll only use Rust.
/// - Backpressure must be enforced: we'll only read from source when dest is ready.
/// - Shutdown must stop activity: we'll do this together with the below.
/// - Error and close states must be propagated: we'll do this by checking these states at every step.
///
/// The struct is `Clone`, and a clone is boxed as the `Callback` of each promise
/// reaction the pipe registers; the `Rc` fields are therefore shared by all clones.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PipeTo {
    /// The reader acquired on the source stream.
    /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
    reader: Dom<ReadableStreamDefaultReader>,

    /// The writer acquired on the destination stream.
    /// <https://streams.spec.whatwg.org/#ref-for-acquire-writable-stream-default-writer>
    writer: Dom<WritableStreamDefaultWriter>,

    /// Promises of writes that have been issued to the writer.
    /// Pending writes are needed when shutting down (with an action),
    /// because we can only finalize when all writes are finished.
    #[ignore_malloc_size_of = "nested Rc"]
    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,

    /// The state machine.
    #[conditional_malloc_size_of]
    #[no_trace]
    state: Rc<RefCell<PipeToState>>,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventabort>
    prevent_abort: bool,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventcancel>
    prevent_cancel: bool,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventclose>
    prevent_close: bool,

    /// The `shuttingDown` variable of
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    #[conditional_malloc_size_of]
    shutting_down: Rc<Cell<bool>>,

    /// The abort reason of the abort signal,
    /// stored here because we must keep it across a microtask.
    /// It is kept separately from `shutdown_error`,
    /// which a shutdown action may overwrite with its own error.
    #[ignore_malloc_size_of = "mozjs"]
    abort_reason: Rc<Heap<JSVal>>,

    /// The error potentially passed to shutdown,
    /// stored here because we must keep it across a microtask.
    /// `None` until an error has been set.
    #[ignore_malloc_size_of = "mozjs"]
    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,

    /// The promise returned by a shutdown action.
    /// We keep it to only continue when it is not pending anymore.
    #[ignore_malloc_size_of = "nested Rc"]
    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// The promise resolved or rejected at
    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
170
171impl PipeTo {
172    /// Run the `abortAlgorithm` defined at
173    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
174    pub(crate) fn abort_with_reason(
175        &self,
176        cx: &mut CurrentRealm,
177        global: &GlobalScope,
178        reason: SafeHandleValue,
179    ) {
180        // Abort should do nothing if we are already shutting down.
181        if self.shutting_down.get() {
182            return;
183        }
184
185        // Let error be signal’s abort reason.
186        // Note: storing it because it may need to be kept across a microtask,
187        // and see the note below as to why it is kept separately from `shutdown_error`.
188        self.abort_reason.set(reason.get());
189
190        // Note: setting the error now,
191        // will result in a rejection of the pipe promise, with this error.
192        // Unless any shutdown action raise their own error,
193        // in which case this error will be overwritten by the shutdown action error.
194        self.set_shutdown_error(reason);
195
196        // Let actions be an empty ordered set.
197        // Note: the actions are defined, and performed, inside `shutdown_with_an_action`.
198
199        // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions,
200        // and with error.
201        self.shutdown(cx, global, Some(ShutdownAction::Abort));
202    }
203}
204
205impl Callback for PipeTo {
206    /// The pipe makes progress one microtask at a time.
207    /// Note: we use one struct as the callback for all promises,
208    /// and for both of their reactions.
209    ///
210    /// The context of the callback is determined from:
211    /// - the current state.
212    /// - the type of `result`.
213    /// - the state of a stored promise(in some cases).
214    #[expect(unsafe_code)]
215    fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
216        let in_realm_proof = cx.into();
217        let realm = InRealm::Already(&in_realm_proof);
218        let global = self.reader.global();
219
220        // Note: we only care about the result of writes when they are rejected,
221        // and the error is accessed not through handlers,
222        // but directly using `dest.get_stored_error`.
223        // So we must mark rejected promises as handled
224        // to prevent unhandled rejection errors.
225        self.pending_writes.borrow_mut().retain(|p| {
226            let pending = p.is_pending();
227            if !pending {
228                p.set_promise_is_handled();
229            }
230            pending
231        });
232
233        // Note: cloning to prevent re-borrow in methods called below.
234        let state_before_checks = self.state.borrow().clone();
235
236        // Note: if we are in a `PendingRead` state,
237        // and the source is closed,
238        // we try to write chunks before doing any shutdown,
239        // which is necessary to implement the
240        // "If any chunks have been read but not yet written, write them to dest."
241        // part of shutdown.
242        if state_before_checks == PipeToState::PendingRead {
243            let source = self.reader.get_stream().expect("Source stream must be set");
244            if source.is_closed() {
245                let dest = self
246                    .writer
247                    .get_stream()
248                    .expect("Destination stream must be set");
249
250                // If dest.[[state]] is "writable",
251                // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
252                if dest.is_writable() && !dest.close_queued_or_in_flight() {
253                    let Ok(done) = get_read_promise_done(cx.into(), &result, CanGc::from_cx(cx))
254                    else {
255                        // This is the case that the microtask ran in reaction
256                        // to the closed promise of the reader,
257                        // so we should wait for subsequent chunks,
258                        // and skip the shutdown below
259                        // (reader is closed, but there are still pending reads).
260                        // Shutdown will happen when the last chunk has been received.
261                        return;
262                    };
263
264                    if !done {
265                        // If any chunks have been read but not yet written, write them to dest.
266                        self.write_chunk(cx.into(), &global, result, CanGc::from_cx(cx));
267                    }
268                }
269            }
270        }
271
272        self.check_and_propagate_errors_forward(cx, &global);
273        self.check_and_propagate_errors_backward(cx, &global);
274        self.check_and_propagate_closing_forward(cx, &global);
275        self.check_and_propagate_closing_backward(cx, &global);
276
277        // Note: cloning to prevent re-borrow in methods called below.
278        let state = self.state.borrow().clone();
279
280        // If we switched to a shutdown state,
281        // return.
282        // Progress will be made at the next tick.
283        if state != state_before_checks {
284            return;
285        }
286
287        match state {
288            PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
289            PipeToState::PendingReady => {
290                // Read a chunk.
291                self.read_chunk(&global, realm, CanGc::from_cx(cx));
292            },
293            PipeToState::PendingRead => {
294                // Write the chunk.
295                self.write_chunk(cx.into(), &global, result, CanGc::from_cx(cx));
296
297                // An early return is necessary if the write algorithm aborted the pipe.
298                if self.shutting_down.get() {
299                    return;
300                }
301
302                // Wait for the writer to be ready again.
303                self.wait_for_writer_ready(&global, realm, CanGc::from_cx(cx));
304            },
305            PipeToState::ShuttingDownWithPendingWrites(action) => {
306                // Wait until every chunk that has been read has been written
307                // (i.e. the corresponding promises have settled).
308                if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
309                    self.wait_on_pending_write(&global, write, realm, CanGc::from_cx(cx));
310                    return;
311                }
312
313                // Note: error is stored in `self.shutdown_error`.
314                if let Some(action) = action {
315                    // Let p be the result of performing action.
316                    self.perform_action(cx, &global, action);
317                } else {
318                    // Finalize, passing along error if it was given.
319                    self.finalize(cx.into(), &global, CanGc::from_cx(cx));
320                }
321            },
322            PipeToState::ShuttingDownPendingAction => {
323                let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
324                    unreachable!();
325                };
326                if promise.is_pending() {
327                    // While waiting for the action to complete,
328                    // we may get callbacks for other promises(closed, ready),
329                    // and we should ignore those.
330                    return;
331                }
332
333                let is_array_like = {
334                    if !result.is_object() {
335                        false
336                    } else {
337                        unsafe { is_array_like::<crate::DomTypeHolder>(cx.raw_cx(), result) }
338                    }
339                };
340
341                // Finalize, passing along error if it was given.
342                if !result.is_undefined() && !is_array_like {
343                    // Most actions either resolve with undefined,
344                    // or reject with an error,
345                    // and the error should be used when finalizing.
346                    // One exception is the `Abort` action,
347                    // which resolves with a list of undefined values.
348
349                    // If `result` isn't undefined or array-like,
350                    // then it is an error
351                    // and should overwrite the current shutdown error.
352                    self.set_shutdown_error(result);
353                }
354                self.finalize(cx.into(), &global, CanGc::from_cx(cx));
355            },
356            PipeToState::Finalized => {},
357        }
358    }
359}
360
361impl PipeTo {
362    /// Setting shutdown error in a way that ensures it isn't
363    /// moved after it has been set.
364    fn set_shutdown_error(&self, error: SafeHandleValue) {
365        *self.shutdown_error.borrow_mut() = Some(Heap::default());
366        let Some(ref heap) = *self.shutdown_error.borrow() else {
367            unreachable!("Option set to Some(heap) above.");
368        };
369        heap.set(error.get())
370    }
371
372    /// Wait for the writer to be ready,
373    /// which implements the constraint that backpressure must be enforced.
374    fn wait_for_writer_ready(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
375        {
376            let mut state = self.state.borrow_mut();
377            *state = PipeToState::PendingReady;
378        }
379
380        let ready_promise = self.writer.Ready();
381        if ready_promise.is_fulfilled() {
382            self.read_chunk(global, realm, can_gc);
383        } else {
384            let handler = PromiseNativeHandler::new(
385                global,
386                Some(Box::new(self.clone())),
387                Some(Box::new(self.clone())),
388                can_gc,
389            );
390            ready_promise.append_native_handler(&handler, realm, can_gc);
391
392            // Note: if the writer is not ready,
393            // in order to ensure progress we must
394            // also react to the closure of the source(because source may close empty).
395            let closed_promise = self.reader.Closed();
396            closed_promise.append_native_handler(&handler, realm, can_gc);
397        }
398    }
399
400    /// Read a chunk
401    fn read_chunk(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
402        *self.state.borrow_mut() = PipeToState::PendingRead;
403        let chunk_promise = self.reader.Read(can_gc);
404        let handler = PromiseNativeHandler::new(
405            global,
406            Some(Box::new(self.clone())),
407            Some(Box::new(self.clone())),
408            can_gc,
409        );
410        chunk_promise.append_native_handler(&handler, realm, can_gc);
411
412        // Note: in order to ensure progress we must
413        // also react to the closure of the destination.
414        let ready_promise = self.writer.Closed();
415        ready_promise.append_native_handler(&handler, realm, can_gc);
416    }
417
418    /// Try to write a chunk using the jsval, and returns wether it succeeded
419    // It will fail if it is the last `done` chunk, or if it is not a chunk at all.
420    #[expect(unsafe_code)]
421    fn write_chunk(
422        &self,
423        cx: SafeJSContext,
424        global: &GlobalScope,
425        chunk: SafeHandleValue,
426        can_gc: CanGc,
427    ) -> bool {
428        if chunk.is_object() {
429            rooted!(in(*cx) let object = chunk.to_object());
430            rooted!(in(*cx) let mut bytes = UndefinedValue());
431            let has_value = unsafe {
432                get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc)
433                    .expect("Chunk should have a value.")
434            };
435            if has_value {
436                // Write the chunk.
437                let write_promise = self.writer.write(cx, global, bytes.handle(), can_gc);
438                self.pending_writes.borrow_mut().push_back(write_promise);
439                return true;
440            }
441        }
442        false
443    }
444
445    /// Only as part of shutting-down do we wait on pending writes
446    /// (backpressure is communicated not through pending writes
447    /// but through the readiness of the writer).
448    fn wait_on_pending_write(
449        &self,
450        global: &GlobalScope,
451        promise: Rc<Promise>,
452        realm: InRealm,
453        can_gc: CanGc,
454    ) {
455        let handler = PromiseNativeHandler::new(
456            global,
457            Some(Box::new(self.clone())),
458            Some(Box::new(self.clone())),
459            can_gc,
460        );
461        promise.append_native_handler(&handler, realm, can_gc);
462    }
463
464    /// Errors must be propagated forward part of
465    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
466    fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
467        // An early return is necessary if we are shutting down,
468        // because in that case the source can already have been set to none.
469        if self.shutting_down.get() {
470            return;
471        }
472
473        // if source.[[state]] is or becomes "errored", then
474        let source = self
475            .reader
476            .get_stream()
477            .expect("Reader should still have a stream");
478        if source.is_errored() {
479            rooted!(&in(cx) let mut source_error = UndefinedValue());
480            source.get_stored_error(source_error.handle_mut());
481            self.set_shutdown_error(source_error.handle());
482
483            // If preventAbort is false,
484            if !self.prevent_abort {
485                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
486                // and with source.[[storedError]].
487                self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
488            } else {
489                // Otherwise, shutdown with source.[[storedError]].
490                self.shutdown(cx, global, None);
491            }
492        }
493    }
494
495    /// Errors must be propagated backward part of
496    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
497    fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
498        // An early return is necessary if we are shutting down,
499        // because in that case the destination can already have been set to none.
500        if self.shutting_down.get() {
501            return;
502        }
503
504        // if dest.[[state]] is or becomes "errored", then
505        let dest = self
506            .writer
507            .get_stream()
508            .expect("Writer should still have a stream");
509        if dest.is_errored() {
510            rooted!(&in(cx) let mut dest_error = UndefinedValue());
511            dest.get_stored_error(dest_error.handle_mut());
512            self.set_shutdown_error(dest_error.handle());
513
514            // If preventCancel is false,
515            if !self.prevent_cancel {
516                // shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]])
517                // and with dest.[[storedError]].
518                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
519            } else {
520                // Otherwise, shutdown with dest.[[storedError]].
521                self.shutdown(cx, global, None);
522            }
523        }
524    }
525
526    /// Closing must be propagated forward part of
527    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
528    fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
529        // An early return is necessary if we are shutting down,
530        // because in that case the source can already have been set to none.
531        if self.shutting_down.get() {
532            return;
533        }
534
535        // if source.[[state]] is or becomes "closed", then
536        let source = self
537            .reader
538            .get_stream()
539            .expect("Reader should still have a stream");
540        if source.is_closed() {
541            // If preventClose is false,
542            if !self.prevent_close {
543                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
544                // and with source.[[storedError]].
545                self.shutdown(
546                    cx,
547                    global,
548                    Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
549                )
550            } else {
551                // Otherwise, shutdown.
552                self.shutdown(cx, global, None);
553            }
554        }
555    }
556
557    /// Closing must be propagated backward part of
558    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
559    fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
560        // An early return is necessary if we are shutting down,
561        // because in that case the destination can already have been set to none.
562        if self.shutting_down.get() {
563            return;
564        }
565
566        // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
567        // or dest.[[state]] is "closed"
568        let dest = self
569            .writer
570            .get_stream()
571            .expect("Writer should still have a stream");
572        if dest.close_queued_or_in_flight() || dest.is_closed() {
573            // Assert: no chunks have been read or written.
574            // Note: unclear how to perform this assertion.
575
576            // Let destClosed be a new TypeError.
577            rooted!(&in(cx) let mut dest_closed = UndefinedValue());
578            let error =
579                Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
580            error.to_jsval(
581                cx.into(),
582                global,
583                dest_closed.handle_mut(),
584                CanGc::from_cx(cx),
585            );
586            self.set_shutdown_error(dest_closed.handle());
587
588            // If preventCancel is false,
589            if !self.prevent_cancel {
590                // shutdown with an action of ! ReadableStreamCancel(source, destClosed)
591                // and with destClosed.
592                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
593            } else {
594                // Otherwise, shutdown with destClosed.
595                self.shutdown(cx, global, None);
596            }
597        }
598    }
599
600    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
601    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown>
602    /// Combined into one method with an optional action.
603    fn shutdown(
604        &self,
605        cx: &mut CurrentRealm,
606        global: &GlobalScope,
607        action: Option<ShutdownAction>,
608    ) {
609        let realm = cx.into();
610        let realm = InRealm::Already(&realm);
611        // If shuttingDown is true, abort these substeps.
612        // Set shuttingDown to true.
613        if !self.shutting_down.replace(true) {
614            let dest = self.writer.get_stream().expect("Stream must be set");
615            // If dest.[[state]] is "writable",
616            // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
617            if dest.is_writable() && !dest.close_queued_or_in_flight() {
618                // If any chunks have been read but not yet written, write them to dest.
619                // Done at the top of `Callback`.
620
621                // Wait until every chunk that has been read has been written
622                // (i.e. the corresponding promises have settled).
623                if let Some(write) = self.pending_writes.borrow_mut().front() {
624                    *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
625                    self.wait_on_pending_write(global, write.clone(), realm, CanGc::from_cx(cx));
626                    return;
627                }
628            }
629
630            // Note: error is stored in `self.shutdown_error`.
631            if let Some(action) = action {
632                // Let p be the result of performing action.
633                self.perform_action(cx, global, action);
634            } else {
635                // Finalize, passing along error if it was given.
636                self.finalize(cx.into(), global, CanGc::from_cx(cx));
637            }
638        }
639    }
640
641    /// The perform action part of
642    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
643    fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
644        let realm = cx.into();
645        let realm = InRealm::Already(&realm);
646        rooted!(&in(cx) let mut error = UndefinedValue());
647        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
648            error.set(shutdown_error.get());
649        }
650
651        *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
652
653        // Let p be the result of performing action.
654        let promise = match action {
655            ShutdownAction::WritableStreamAbort => {
656                let dest = self.writer.get_stream().expect("Stream must be set");
657                dest.abort(cx, global, error.handle())
658            },
659            ShutdownAction::ReadableStreamCancel => {
660                let source = self
661                    .reader
662                    .get_stream()
663                    .expect("Reader should have a stream.");
664                source.cancel(cx.into(), global, error.handle(), CanGc::from_cx(cx))
665            },
666            ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => self
667                .writer
668                .close_with_error_propagation(cx.into(), global, CanGc::from_cx(cx)),
669            ShutdownAction::Abort => {
670                // Note: implementation of the `abortAlgorithm`
671                // of the signal associated with this piping operation.
672
673                // Let error be signal’s abort reason.
674                rooted!(&in(cx) let mut error = UndefinedValue());
675                error.set(self.abort_reason.get());
676
677                // Let actions be an empty ordered set.
678                let mut actions = vec![];
679
680                // If preventAbort is false, append the following action to actions:
681                if !self.prevent_abort {
682                    let dest = self
683                        .writer
684                        .get_stream()
685                        .expect("Destination stream must be set");
686
687                    // If dest.[[state]] is "writable",
688                    let promise = if dest.is_writable() {
689                        // return ! WritableStreamAbort(dest, error)
690                        dest.abort(cx, global, error.handle())
691                    } else {
692                        // Otherwise, return a promise resolved with undefined.
693                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
694                    };
695                    actions.push(promise);
696                }
697
698                // If preventCancel is false, append the following action action to actions:
699                if !self.prevent_cancel {
700                    let source = self.reader.get_stream().expect("Source stream must be set");
701
702                    // If source.[[state]] is "readable",
703                    let promise = if source.is_readable() {
704                        // return ! ReadableStreamCancel(source, error).
705                        source.cancel(cx.into(), global, error.handle(), CanGc::from_cx(cx))
706                    } else {
707                        // Otherwise, return a promise resolved with undefined.
708                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
709                    };
710                    actions.push(promise);
711                }
712
713                // Shutdown with an action consisting
714                // of getting a promise to wait for all of the actions in actions,
715                // and with error.
716                wait_for_all_promise(cx.into(), global, actions, realm, CanGc::from_cx(cx))
717            },
718        };
719
720        // Upon fulfillment of p, finalize, passing along originalError if it was given.
721        // Upon rejection of p with reason newError, finalize with newError.
722        let handler = PromiseNativeHandler::new(
723            global,
724            Some(Box::new(self.clone())),
725            Some(Box::new(self.clone())),
726            CanGc::from_cx(cx),
727        );
728        promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
729        *self.shutdown_action_promise.borrow_mut() = Some(promise);
730    }
731
732    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
733    fn finalize(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
734        *self.state.borrow_mut() = PipeToState::Finalized;
735
736        // Perform ! WritableStreamDefaultWriterRelease(writer).
737        self.writer.release(cx, global, can_gc);
738
739        // If reader implements ReadableStreamBYOBReader,
740        // perform ! ReadableStreamBYOBReaderRelease(reader).
741        // TODO.
742
743        // Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
744        self.reader
745            .release(can_gc)
746            .expect("Releasing the reader should not fail");
747
748        // If signal is not undefined, remove abortAlgorithm from signal.
749        // Note: since `self.shutdown` is true at this point,
750        // the abort algorithm is a no-op,
751        // so for now not implementing this step.
752
753        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
754            rooted!(in(*cx) let mut error = UndefinedValue());
755            error.set(shutdown_error.get());
756            // If error was given, reject promise with error.
757            self.result_promise.reject_native(&error.handle(), can_gc);
758        } else {
759            // Otherwise, resolve promise with undefined.
760            self.result_promise.resolve_native(&(), can_gc);
761        }
762    }
763}
764
/// The fulfillment handler used when reacting to sourceCancelPromise, as part of
/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseFulfillmentHandler {
    /// The promise that the handler's callback resolves with undefined.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}
772
773impl Callback for SourceCancelPromiseFulfillmentHandler {
774    /// The fulfillment handler for the reacting to sourceCancelPromise part of
775    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
776    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
777    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
778        self.result.resolve_native(&(), CanGc::from_cx(cx));
779    }
780}
781
/// The rejection handler used when reacting to sourceCancelPromise, as part of
/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseRejectionHandler {
    /// The promise that the handler's callback rejects with the received reason.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}
789
790impl Callback for SourceCancelPromiseRejectionHandler {
791    /// The rejection handler for the reacting to sourceCancelPromise part of
792    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
793    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
794    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
795        self.result.reject_native(&v, CanGc::from_cx(cx));
796    }
797}
798
/// <https://streams.spec.whatwg.org/#readablestream-state>
///
/// The value of a stream's `[[state]]` slot.
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReadableStreamState {
    /// The default state; this is what a newly created stream starts in.
    #[default]
    Readable,
    /// The "closed" state, reported by `ReadableStream::is_closed`.
    Closed,
    /// The "errored" state, entered through `ReadableStream::error`.
    Errored,
}
807
/// <https://streams.spec.whatwg.org/#readablestream-controller>
///
/// The controller attached to a stream: either a byte stream controller
/// or a default controller.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ControllerType {
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
    Byte(MutNullableDom<ReadableByteStreamController>),
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
    Default(MutNullableDom<ReadableStreamDefaultController>),
}
817
/// <https://streams.spec.whatwg.org/#readablestream-reader>
///
/// The reader slot of a stream: either a BYOB reader or a default reader.
/// The inner `MutNullableDom` may be empty, in which case the stream
/// is not considered locked (see `ReadableStream::is_locked`).
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ReaderType {
    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
    #[allow(clippy::upper_case_acronyms)]
    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
    Default(MutNullableDom<ReadableStreamDefaultReader>),
}
828
829impl Eq for ReaderType {}
830impl PartialEq for ReaderType {
831    fn eq(&self, other: &Self) -> bool {
832        matches!(
833            (self, other),
834            (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
835                (ReaderType::Default(_), ReaderType::Default(_))
836        )
837    }
838}
839
840/// <https://streams.spec.whatwg.org/#create-readable-stream>
841#[cfg_attr(crown, expect(crown::unrooted_must_root))]
842pub(crate) fn create_readable_stream(
843    global: &GlobalScope,
844    underlying_source_type: UnderlyingSourceType,
845    queuing_strategy: Option<Rc<QueuingStrategySize>>,
846    high_water_mark: Option<f64>,
847    can_gc: CanGc,
848) -> DomRoot<ReadableStream> {
849    // If highWaterMark was not passed, set it to 1.
850    let high_water_mark = high_water_mark.unwrap_or(1.0);
851
852    // If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
853    let size_algorithm =
854        queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
855
856    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
857    assert!(high_water_mark >= 0.0);
858
859    // Let stream be a new ReadableStream.
860    // Perform ! InitializeReadableStream(stream).
861    let stream = ReadableStream::new_with_proto(global, None, can_gc);
862
863    // Let controller be a new ReadableStreamDefaultController.
864    let controller = ReadableStreamDefaultController::new(
865        global,
866        underlying_source_type,
867        high_water_mark,
868        size_algorithm,
869        can_gc,
870    );
871
872    // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm,
873    // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
874    controller
875        .setup(stream.clone(), can_gc)
876        .expect("Setup of default controller cannot fail");
877
878    // Return stream.
879    stream
880}
881
882/// <https://streams.spec.whatwg.org/#abstract-opdef-createreadablebytestream>
883#[cfg_attr(crown, expect(crown::unrooted_must_root))]
884pub(crate) fn readable_byte_stream_tee(
885    global: &GlobalScope,
886    underlying_source_type: UnderlyingSourceType,
887    can_gc: CanGc,
888) -> DomRoot<ReadableStream> {
889    // Let stream be a new ReadableStream.
890    // Perform ! InitializeReadableStream(stream).
891    let tee_stream = ReadableStream::new_with_proto(global, None, can_gc);
892
893    // Let controller be a new ReadableByteStreamController.
894    let controller = ReadableByteStreamController::new(underlying_source_type, 0.0, global, can_gc);
895
896    // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
897    controller
898        .setup(global, tee_stream.clone(), can_gc)
899        .expect("Setup of byte stream controller cannot fail");
900
901    // Return stream.
902    tee_stream
903}
904
/// <https://streams.spec.whatwg.org/#rs-class>
///
/// The DOM object backing a readable stream, holding the stream's
/// controller, reader, state, stored error and disturbed flag.
#[dom_struct]
pub(crate) struct ReadableStream {
    /// The reflector of this DOM object.
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#readablestream-controller>
    /// Note: the inner `MutNullableDom` should really be an `Option<Dom>`,
    /// because it is never unset once set.
    /// `None` until one of the `set_*_controller` methods is called.
    controller: RefCell<Option<ControllerType>>,

    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
    /// Written by `error`, read back through `get_stored_error`.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
    /// Exposed through `is_disturbed` and `set_is_disturbed`.
    disturbed: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#readablestream-reader>
    /// The stream counts as locked only when the inner slot holds a reader.
    reader: RefCell<Option<ReaderType>>,

    /// <https://streams.spec.whatwg.org/#readablestream-state>
    /// Starts out as the default state, `Readable`.
    state: Cell<ReadableStreamState>,
}
928
929impl ReadableStream {
930    /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
931    fn new_inherited() -> ReadableStream {
932        ReadableStream {
933            reflector_: Reflector::new(),
934            controller: RefCell::new(None),
935            stored_error: Heap::default(),
936            disturbed: Default::default(),
937            reader: RefCell::new(None),
938            state: Cell::new(Default::default()),
939        }
940    }
941
942    pub(crate) fn new_with_proto(
943        global: &GlobalScope,
944        proto: Option<SafeHandleObject>,
945        can_gc: CanGc,
946    ) -> DomRoot<ReadableStream> {
947        reflect_dom_object_with_proto(
948            Box::new(ReadableStream::new_inherited()),
949            global,
950            proto,
951            can_gc,
952        )
953    }
954
955    /// Used as part of
956    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
957    pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
958        *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
959            controller,
960        ))));
961    }
962
963    /// Used as part of
964    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
965    pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
966        *self.controller.borrow_mut() =
967            Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
968    }
969
970    /// Used as part of
971    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
972    pub(crate) fn assert_no_controller(&self) {
973        let has_no_controller = self.controller.borrow().is_none();
974        assert!(has_no_controller);
975    }
976
977    /// Build a stream backed by a Rust source that has already been read into memory.
978    pub(crate) fn new_from_bytes(
979        global: &GlobalScope,
980        bytes: Vec<u8>,
981        can_gc: CanGc,
982    ) -> Fallible<DomRoot<ReadableStream>> {
983        let stream = ReadableStream::new_with_external_underlying_source(
984            global,
985            UnderlyingSourceType::Memory(bytes.len()),
986            can_gc,
987        )?;
988        stream.enqueue_native(bytes, can_gc);
989        stream.controller_close_native(can_gc);
990        Ok(stream)
991    }
992
993    /// Build a stream backed by a Rust underlying source.
994    /// Note: external sources are always paired with a default controller.
995    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
996    pub(crate) fn new_with_external_underlying_source(
997        global: &GlobalScope,
998        source: UnderlyingSourceType,
999        can_gc: CanGc,
1000    ) -> Fallible<DomRoot<ReadableStream>> {
1001        assert!(source.is_native());
1002        let stream = ReadableStream::new_with_proto(global, None, can_gc);
1003        let controller = ReadableStreamDefaultController::new(
1004            global,
1005            source,
1006            1.0,
1007            extract_size_algorithm(&QueuingStrategy::empty(), can_gc),
1008            can_gc,
1009        );
1010        controller.setup(stream.clone(), can_gc)?;
1011        Ok(stream)
1012    }
1013
1014    /// Call into the release steps of the controller,
1015    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1016        match self.controller.borrow().as_ref() {
1017            Some(ControllerType::Default(controller)) => {
1018                let controller = controller
1019                    .get()
1020                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1021                controller.perform_release_steps()
1022            },
1023            Some(ControllerType::Byte(controller)) => {
1024                let controller = controller
1025                    .get()
1026                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1027                controller.perform_release_steps()
1028            },
1029            None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1030        }
1031    }
1032
1033    /// Call into the pull steps of the controller,
1034    /// as part of
1035    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1036    pub(crate) fn perform_pull_steps(
1037        &self,
1038        cx: SafeJSContext,
1039        read_request: &ReadRequest,
1040        can_gc: CanGc,
1041    ) {
1042        match self.controller.borrow().as_ref() {
1043            Some(ControllerType::Default(controller)) => controller
1044                .get()
1045                .expect("Stream should have controller.")
1046                .perform_pull_steps(read_request, can_gc),
1047            Some(ControllerType::Byte(controller)) => controller
1048                .get()
1049                .expect("Stream should have controller.")
1050                .perform_pull_steps(cx, read_request, can_gc),
1051            None => {
1052                unreachable!("Stream does not have a controller.");
1053            },
1054        }
1055    }
1056
1057    /// Call into the pull steps of the controller,
1058    /// as part of
1059    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
1060    pub(crate) fn perform_pull_into(
1061        &self,
1062        cx: SafeJSContext,
1063        read_into_request: &ReadIntoRequest,
1064        view: HeapBufferSource<ArrayBufferViewU8>,
1065        min: u64,
1066        can_gc: CanGc,
1067    ) {
1068        match self.controller.borrow().as_ref() {
1069            Some(ControllerType::Byte(controller)) => controller
1070                .get()
1071                .expect("Stream should have controller.")
1072                .perform_pull_into(cx, read_into_request, view, min, can_gc),
1073            _ => {
1074                unreachable!(
1075                    "Pulling a chunk from a stream with a default controller using a BYOB reader"
1076                )
1077            },
1078        }
1079    }
1080
1081    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
1082    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1083        match self.reader.borrow().as_ref() {
1084            Some(ReaderType::Default(reader)) => {
1085                let Some(reader) = reader.get() else {
1086                    panic!("Attempt to add a read request without having first acquired a reader.");
1087                };
1088
1089                // Assert: stream.[[state]] is "readable".
1090                assert!(self.is_readable());
1091
1092                // Append readRequest to stream.[[reader]].[[readRequests]].
1093                reader.add_read_request(read_request);
1094            },
1095            _ => {
1096                unreachable!("Adding a read request can only be done on a default reader.")
1097            },
1098        }
1099    }
1100
1101    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
1102    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1103        match self.reader.borrow().as_ref() {
1104            // Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
1105            Some(ReaderType::BYOB(reader)) => {
1106                let Some(reader) = reader.get() else {
1107                    unreachable!(
1108                        "Attempt to add a read into request without having first acquired a reader."
1109                    );
1110                };
1111
1112                // Assert: stream.[[state]] is "readable" or "closed".
1113                assert!(self.is_readable() || self.is_closed());
1114
1115                // Append readRequest to stream.[[reader]].[[readIntoRequests]].
1116                reader.add_read_into_request(read_request);
1117            },
1118            _ => {
1119                unreachable!("Adding a read into request can only be done on a BYOB reader.")
1120            },
1121        }
1122    }
1123
1124    /// Endpoint to enqueue chunks directly from Rust.
1125    /// Note: in other use cases this call happens via the controller.
1126    pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1127        match self.controller.borrow().as_ref() {
1128            Some(ControllerType::Default(controller)) => controller
1129                .get()
1130                .expect("Stream should have controller.")
1131                .enqueue_native(bytes, can_gc),
1132            _ => {
1133                unreachable!(
1134                    "Enqueueing chunk to a stream from Rust on other than default controller"
1135                );
1136            },
1137        }
1138    }
1139
1140    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1141    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1142        // Assert: stream.[[state]] is "readable".
1143        assert!(self.is_readable());
1144
1145        // Set stream.[[state]] to "errored".
1146        self.state.set(ReadableStreamState::Errored);
1147
1148        // Set stream.[[storedError]] to e.
1149        self.stored_error.set(e.get());
1150
1151        // Let reader be stream.[[reader]].
1152
1153        let default_reader = {
1154            let reader_ref = self.reader.borrow();
1155            match reader_ref.as_ref() {
1156                Some(ReaderType::Default(reader)) => reader.get(),
1157                _ => None,
1158            }
1159        };
1160
1161        if let Some(reader) = default_reader {
1162            // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
1163            reader.error(e, can_gc);
1164            return;
1165        }
1166
1167        let byob_reader = {
1168            let reader_ref = self.reader.borrow();
1169            match reader_ref.as_ref() {
1170                Some(ReaderType::BYOB(reader)) => reader.get(),
1171                _ => None,
1172            }
1173        };
1174
1175        if let Some(reader) = byob_reader {
1176            // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
1177            reader.error_read_into_requests(e, can_gc);
1178        }
1179
1180        // If reader is undefined, return.
1181    }
1182
1183    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
1184    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1185        handle_mut.set(self.stored_error.get());
1186    }
1187
1188    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1189    /// Note: in other use cases this call happens via the controller.
1190    pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1191        let cx = GlobalScope::get_cx();
1192        rooted!(in(*cx) let mut error_val = UndefinedValue());
1193        error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1194        self.error(error_val.handle(), can_gc);
1195    }
1196
1197    /// Call into the controller's `Close` method.
1198    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
1199    pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1200        match self.controller.borrow().as_ref() {
1201            Some(ControllerType::Default(controller)) => {
1202                let _ = controller
1203                    .get()
1204                    .expect("Stream should have controller.")
1205                    .Close(can_gc);
1206            },
1207            _ => {
1208                unreachable!("Native closing is only done on default controllers.")
1209            },
1210        }
1211    }
1212
1213    /// Returns a boolean reflecting whether the stream has all data in memory.
1214    /// Useful for native source integration only.
1215    pub(crate) fn in_memory(&self) -> bool {
1216        match self.controller.borrow().as_ref() {
1217            Some(ControllerType::Default(controller)) => controller
1218                .get()
1219                .expect("Stream should have controller.")
1220                .in_memory(),
1221            _ => {
1222                unreachable!(
1223                    "Checking if source is in memory for a stream with a non-default controller"
1224                )
1225            },
1226        }
1227    }
1228
1229    /// Return bytes for synchronous use, if the stream has all data in memory.
1230    /// Useful for native source integration only.
1231    pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1232        match self.controller.borrow().as_ref() {
1233            Some(ControllerType::Default(controller)) => controller
1234                .get()
1235                .expect("Stream should have controller.")
1236                .get_in_memory_bytes()
1237                .as_deref()
1238                .map(GenericSharedMemory::from_bytes),
1239            _ => {
1240                unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1241            },
1242        }
1243    }
1244
1245    /// Acquires a reader and locks the stream,
1246    /// must be done before `read_a_chunk`.
1247    /// Native call to
1248    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-reader>
1249    pub(crate) fn acquire_default_reader(
1250        &self,
1251        can_gc: CanGc,
1252    ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1253        // Let reader be a new ReadableStreamDefaultReader.
1254        let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1255
1256        // Perform ? SetUpReadableStreamDefaultReader(reader, stream).
1257        reader.set_up(self, &self.global(), can_gc)?;
1258
1259        // Return reader.
1260        Ok(reader)
1261    }
1262
1263    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-byob-reader>
1264    pub(crate) fn acquire_byob_reader(
1265        &self,
1266        can_gc: CanGc,
1267    ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1268        // Let reader be a new ReadableStreamBYOBReader.
1269        let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1270        // Perform ? SetUpReadableStreamBYOBReader(reader, stream).
1271        reader.set_up(self, &self.global(), can_gc)?;
1272
1273        // Return reader.
1274        Ok(reader)
1275    }
1276
1277    pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1278        match self.controller.borrow().as_ref() {
1279            Some(ControllerType::Default(controller)) => {
1280                controller.get().expect("Stream should have controller.")
1281            },
1282            _ => {
1283                unreachable!(
1284                    "Getting default controller for a stream with a non-default controller"
1285                )
1286            },
1287        }
1288    }
1289
1290    pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1291        match self.controller.borrow().as_ref() {
1292            Some(ControllerType::Byte(controller)) => {
1293                controller.get().expect("Stream should have controller.")
1294            },
1295            _ => {
1296                unreachable!("Getting byte controller for a stream with a non-byte controller")
1297            },
1298        }
1299    }
1300
1301    pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1302        match self.reader.borrow().as_ref() {
1303            Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1304            _ => {
1305                unreachable!("Getting default reader for a stream with a non-default reader")
1306            },
1307        }
1308    }
1309
1310    /// Read a chunk from the stream,
1311    /// must be called after `start_reading`,
1312    /// and before `stop_reading`.
1313    /// Native call to
1314    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1315    pub(crate) fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
1316        match self.reader.borrow().as_ref() {
1317            Some(ReaderType::Default(reader)) => {
1318                let Some(reader) = reader.get() else {
1319                    unreachable!(
1320                        "Attempt to read stream chunk without having first acquired a reader."
1321                    );
1322                };
1323                reader.Read(can_gc)
1324            },
1325            _ => {
1326                unreachable!("Native reading of a chunk can only be done with a default reader.")
1327            },
1328        }
1329    }
1330
1331    /// Releases the lock on the reader,
1332    /// must be done after `start_reading`.
1333    /// Native call to
1334    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
1335    pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1336        let reader_ref = self.reader.borrow();
1337
1338        match reader_ref.as_ref() {
1339            Some(ReaderType::Default(reader)) => {
1340                let Some(reader) = reader.get() else {
1341                    unreachable!("Attempt to stop reading without having first acquired a reader.");
1342                };
1343
1344                drop(reader_ref);
1345                reader.release(can_gc).expect("Reader release cannot fail.");
1346            },
1347            _ => {
1348                unreachable!("Native stop reading can only be done with a default reader.")
1349            },
1350        }
1351    }
1352
1353    /// <https://streams.spec.whatwg.org/#is-readable-stream-locked>
1354    pub(crate) fn is_locked(&self) -> bool {
1355        match self.reader.borrow().as_ref() {
1356            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1357            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1358            None => false,
1359        }
1360    }
1361
1362    pub(crate) fn is_disturbed(&self) -> bool {
1363        self.disturbed.get()
1364    }
1365
1366    pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1367        self.disturbed.set(disturbed);
1368    }
1369
1370    pub(crate) fn is_closed(&self) -> bool {
1371        self.state.get() == ReadableStreamState::Closed
1372    }
1373
1374    pub(crate) fn is_errored(&self) -> bool {
1375        self.state.get() == ReadableStreamState::Errored
1376    }
1377
1378    pub(crate) fn is_readable(&self) -> bool {
1379        self.state.get() == ReadableStreamState::Readable
1380    }
1381
1382    pub(crate) fn has_default_reader(&self) -> bool {
1383        match self.reader.borrow().as_ref() {
1384            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1385            _ => false,
1386        }
1387    }
1388
1389    pub(crate) fn has_byob_reader(&self) -> bool {
1390        match self.reader.borrow().as_ref() {
1391            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1392            _ => false,
1393        }
1394    }
1395
1396    pub(crate) fn has_byte_controller(&self) -> bool {
1397        match self.controller.borrow().as_ref() {
1398            Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1399            _ => false,
1400        }
1401    }
1402
1403    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
1404    pub(crate) fn get_num_read_requests(&self) -> usize {
1405        match self.reader.borrow().as_ref() {
1406            Some(ReaderType::Default(reader)) => {
1407                let reader = reader
1408                    .get()
1409                    .expect("Stream must have a reader when getting the number of read requests.");
1410                reader.get_num_read_requests()
1411            },
1412            _ => unreachable!(
1413                "Stream must have a default reader when get num read requests is called into."
1414            ),
1415        }
1416    }
1417
1418    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-into-requests>
1419    pub(crate) fn get_num_read_into_requests(&self) -> usize {
1420        assert!(self.has_byob_reader());
1421
1422        match self.reader.borrow().as_ref() {
1423            Some(ReaderType::BYOB(reader)) => {
1424                let Some(reader) = reader.get() else {
1425                    unreachable!(
1426                        "Stream must have a reader when get num read into requests is called into."
1427                    );
1428                };
1429                reader.get_num_read_into_requests()
1430            },
1431            _ => {
1432                unreachable!(
1433                    "Stream must have a BYOB reader when get num read into requests is called into."
1434                );
1435            },
1436        }
1437    }
1438
1439    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
1440    pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1441        // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1442        assert!(self.has_default_reader());
1443
1444        match self.reader.borrow().as_ref() {
1445            Some(ReaderType::Default(reader)) => {
1446                // step 2 - Let reader be stream.[[reader]].
1447                let reader = reader
1448                    .get()
1449                    .expect("Stream must have a reader when a read request is fulfilled.");
1450                // step 3 - Assert: reader.[[readRequests]] is not empty.
1451                assert_ne!(reader.get_num_read_requests(), 0);
1452                // step 4 & 5
1453                // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]].
1454                let request = reader.remove_read_request();
1455
1456                if done {
1457                    // step 6 - If done is true, perform readRequest’s close steps.
1458                    request.close_steps(can_gc);
1459                } else {
1460                    // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk.
1461                    let result = RootedTraceableBox::new(Heap::default());
1462                    result.set(*chunk);
1463                    request.chunk_steps(result, &self.global(), can_gc);
1464                }
1465            },
1466            _ => {
1467                unreachable!(
1468                    "Stream must have a default reader when fulfill read requests is called into."
1469                );
1470            },
1471        }
1472    }
1473
1474    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request>
1475    pub(crate) fn fulfill_read_into_request(
1476        &self,
1477        chunk: SafeHandleValue,
1478        done: bool,
1479        can_gc: CanGc,
1480    ) {
1481        // Assert: ! ReadableStreamHasBYOBReader(stream) is true.
1482        assert!(self.has_byob_reader());
1483
1484        // Let reader be stream.[[reader]].
1485        match self.reader.borrow().as_ref() {
1486            Some(ReaderType::BYOB(reader)) => {
1487                let Some(reader) = reader.get() else {
1488                    unreachable!(
1489                        "Stream must have a reader when a read into request is fulfilled."
1490                    );
1491                };
1492
1493                // Assert: reader.[[readIntoRequests]] is not empty.
1494                assert!(reader.get_num_read_into_requests() > 0);
1495
1496                // Let readIntoRequest be reader.[[readIntoRequests]][0].
1497                // Remove readIntoRequest from reader.[[readIntoRequests]].
1498                let read_into_request = reader.remove_read_into_request();
1499
1500                // If done is true, perform readIntoRequest’s close steps, given chunk.
1501                let result = RootedTraceableBox::new(Heap::default());
1502                if done {
1503                    result.set(*chunk);
1504                    read_into_request.close_steps(Some(result), can_gc);
1505                } else {
1506                    // Otherwise, perform readIntoRequest’s chunk steps, given chunk.
1507                    result.set(*chunk);
1508                    read_into_request.chunk_steps(result, can_gc);
1509                }
1510            },
1511            _ => {
1512                unreachable!(
1513                    "Stream must have a BYOB reader when fulfill read into requests is called into."
1514                );
1515            },
1516        };
1517    }
1518
1519    /// <https://streams.spec.whatwg.org/#readable-stream-close>
1520    pub(crate) fn close(&self, can_gc: CanGc) {
1521        // Assert: stream.[[state]] is "readable".
1522        assert!(self.is_readable());
1523        // Set stream.[[state]] to "closed".
1524        self.state.set(ReadableStreamState::Closed);
1525        // Let reader be stream.[[reader]].
1526
1527        // NOTE: do not hold the RefCell borrow across reader.close(),
1528        // or release() will panic when it tries to mut-borrow stream.reader.
1529        // So we pull out the underlying DOM reader in a local, then drop the borrow.
1530        let default_reader = {
1531            let reader_ref = self.reader.borrow();
1532            match reader_ref.as_ref() {
1533                Some(ReaderType::Default(reader)) => reader.get(),
1534                _ => None,
1535            }
1536        };
1537
1538        if let Some(reader) = default_reader {
1539            // steps 5 & 6 for a default reader
1540            reader.close(can_gc);
1541            return;
1542        }
1543
1544        // Same for BYOB reader.
1545        let byob_reader = {
1546            let reader_ref = self.reader.borrow();
1547            match reader_ref.as_ref() {
1548                Some(ReaderType::BYOB(reader)) => reader.get(),
1549                _ => None,
1550            }
1551        };
1552
1553        if let Some(reader) = byob_reader {
1554            // steps 5 & 6 for a BYOB reader
1555            reader.close(can_gc);
1556        }
1557
1558        // If reader is undefined, return.
1559    }
1560
1561    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
1562    pub(crate) fn cancel(
1563        &self,
1564        cx: SafeJSContext,
1565        global: &GlobalScope,
1566        reason: SafeHandleValue,
1567        can_gc: CanGc,
1568    ) -> Rc<Promise> {
1569        // Set stream.[[disturbed]] to true.
1570        self.disturbed.set(true);
1571
1572        // If stream.[[state]] is "closed", return a promise resolved with undefined.
1573        if self.is_closed() {
1574            return Promise::new_resolved(global, cx, (), can_gc);
1575        }
1576        // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
1577        if self.is_errored() {
1578            let promise = Promise::new(global, can_gc);
1579            rooted!(in(*cx) let mut rval = UndefinedValue());
1580            self.stored_error
1581                .safe_to_jsval(cx, rval.handle_mut(), can_gc);
1582            promise.reject_native(&rval.handle(), can_gc);
1583            return promise;
1584        }
1585        // Perform ! ReadableStreamClose(stream).
1586        self.close(can_gc);
1587
1588        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
1589        let byob_reader = {
1590            let reader_ref = self.reader.borrow();
1591            match reader_ref.as_ref() {
1592                Some(ReaderType::BYOB(reader)) => reader.get(),
1593                _ => None,
1594            }
1595        };
1596
1597        if let Some(reader) = byob_reader {
1598            // step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel
1599            reader.cancel(can_gc);
1600        }
1601
1602        // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).
1603
1604        let source_cancel_promise = match self.controller.borrow().as_ref() {
1605            Some(ControllerType::Default(controller)) => controller
1606                .get()
1607                .expect("Stream should have controller.")
1608                .perform_cancel_steps(cx, global, reason, can_gc),
1609            Some(ControllerType::Byte(controller)) => controller
1610                .get()
1611                .expect("Stream should have controller.")
1612                .perform_cancel_steps(cx, global, reason, can_gc),
1613            None => {
1614                panic!("Stream does not have a controller.");
1615            },
1616        };
1617
1618        // Create a new promise,
1619        // and setup a handler in order to react to the fulfillment of sourceCancelPromise.
1620        let global = self.global();
1621        let result_promise = Promise::new(&global, can_gc);
1622        let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1623            result: result_promise.clone(),
1624        });
1625        let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1626            result: result_promise.clone(),
1627        });
1628        let handler = PromiseNativeHandler::new(
1629            &global,
1630            Some(fulfillment_handler),
1631            Some(rejection_handler),
1632            can_gc,
1633        );
1634        let realm = enter_realm(&*global);
1635        let comp = InRealm::Entered(&realm);
1636        source_cancel_promise.append_native_handler(&handler, comp, can_gc);
1637
1638        // Return the result of reacting to sourceCancelPromise
1639        // with a fulfillment step that returns undefined.
1640        result_promise
1641    }
1642
1643    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1644    pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1645        *self.reader.borrow_mut() = new_reader;
1646    }
1647
1648    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1649    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
1650    fn byte_tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1651        // Assert: stream implements ReadableStream.
1652        // Assert: stream.[[controller]] implements ReadableByteStreamController.
1653
1654        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1655        let reader = self.acquire_default_reader(can_gc)?;
1656        let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1657            Some(&reader),
1658        ))));
1659
1660        // Let reading be false.
1661        let reading = Rc::new(Cell::new(false));
1662
1663        // Let readAgainForBranch1 be false.
1664        let read_again_for_branch_1 = Rc::new(Cell::new(false));
1665
1666        // Let readAgainForBranch2 be false.
1667        let read_again_for_branch_2 = Rc::new(Cell::new(false));
1668
1669        // Let canceled1 be false.
1670        let canceled_1 = Rc::new(Cell::new(false));
1671
1672        // Let canceled2 be false.
1673        let canceled_2 = Rc::new(Cell::new(false));
1674
1675        // Let reason1 be undefined.
1676        let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1677
1678        // Let reason2 be undefined.
1679        let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1680
1681        // Let cancelPromise be a new promise.
1682        let cancel_promise = Promise::new(&self.global(), can_gc);
1683        let reader_version = Rc::new(Cell::new(0));
1684
1685        let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1686            reader.clone(),
1687            self,
1688            reading.clone(),
1689            read_again_for_branch_1.clone(),
1690            read_again_for_branch_2.clone(),
1691            canceled_1.clone(),
1692            canceled_2.clone(),
1693            reason_1.clone(),
1694            reason_2.clone(),
1695            cancel_promise.clone(),
1696            reader_version.clone(),
1697            ByteTeeCancelAlgorithm::Cancel1Algorithm,
1698            ByteTeePullAlgorithm::Pull1Algorithm,
1699            can_gc,
1700        );
1701
1702        let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1703            reader.clone(),
1704            self,
1705            reading,
1706            read_again_for_branch_1,
1707            read_again_for_branch_2,
1708            canceled_1,
1709            canceled_2,
1710            reason_1,
1711            reason_2,
1712            cancel_promise.clone(),
1713            reader_version,
1714            ByteTeeCancelAlgorithm::Cancel2Algorithm,
1715            ByteTeePullAlgorithm::Pull2Algorithm,
1716            can_gc,
1717        );
1718
1719        // Set branch1 to ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm).
1720        let branch_1 = readable_byte_stream_tee(
1721            &self.global(),
1722            UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_1)),
1723            can_gc,
1724        );
1725        byte_tee_source_1.set_branch_1(&branch_1);
1726        byte_tee_source_2.set_branch_1(&branch_1);
1727
1728        // Set branch2 to ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm).
1729        let branch_2 = readable_byte_stream_tee(
1730            &self.global(),
1731            UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_2)),
1732            can_gc,
1733        );
1734        byte_tee_source_1.set_branch_2(&branch_2);
1735        byte_tee_source_2.set_branch_2(&branch_2);
1736
1737        // Perform forwardReaderError, given reader.
1738        byte_tee_source_1.forward_reader_error(reader.clone(), can_gc);
1739        byte_tee_source_2.forward_reader_error(reader, can_gc);
1740
1741        // Return « branch1, branch2 ».
1742        Ok(vec![branch_1, branch_2])
1743    }
1744
1745    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
1746    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1747    fn default_tee(
1748        &self,
1749        clone_for_branch_2: bool,
1750        can_gc: CanGc,
1751    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1752        // Assert: stream implements ReadableStream.
1753
1754        // Assert: cloneForBranch2 is a boolean.
1755        let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1756
1757        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1758        let reader = self.acquire_default_reader(can_gc)?;
1759
1760        // Let reading be false.
1761        let reading = Rc::new(Cell::new(false));
1762        // Let readAgain be false.
1763        let read_again = Rc::new(Cell::new(false));
1764        // Let canceled1 be false.
1765        let canceled_1 = Rc::new(Cell::new(false));
1766        // Let canceled2 be false.
1767        let canceled_2 = Rc::new(Cell::new(false));
1768
1769        // Let reason1 be undefined.
1770        let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1771        // Let reason2 be undefined.
1772        let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1773        // Let cancelPromise be a new promise.
1774        let cancel_promise = Promise::new(&self.global(), can_gc);
1775
1776        let tee_source_1 = DefaultTeeUnderlyingSource::new(
1777            &reader,
1778            self,
1779            reading.clone(),
1780            read_again.clone(),
1781            canceled_1.clone(),
1782            canceled_2.clone(),
1783            clone_for_branch_2.clone(),
1784            reason_1.clone(),
1785            reason_2.clone(),
1786            cancel_promise.clone(),
1787            DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1788            can_gc,
1789        );
1790
1791        let underlying_source_type_branch_1 =
1792            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
1793
1794        let tee_source_2 = DefaultTeeUnderlyingSource::new(
1795            &reader,
1796            self,
1797            reading,
1798            read_again,
1799            canceled_1.clone(),
1800            canceled_2.clone(),
1801            clone_for_branch_2,
1802            reason_1,
1803            reason_2,
1804            cancel_promise.clone(),
1805            DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1806            can_gc,
1807        );
1808
1809        let underlying_source_type_branch_2 =
1810            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
1811
1812        // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
1813        let branch_1 = create_readable_stream(
1814            &self.global(),
1815            underlying_source_type_branch_1,
1816            None,
1817            None,
1818            can_gc,
1819        );
1820        tee_source_1.set_branch_1(&branch_1);
1821        tee_source_2.set_branch_1(&branch_1);
1822
1823        // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
1824        let branch_2 = create_readable_stream(
1825            &self.global(),
1826            underlying_source_type_branch_2,
1827            None,
1828            None,
1829            can_gc,
1830        );
1831        tee_source_1.set_branch_2(&branch_2);
1832        tee_source_2.set_branch_2(&branch_2);
1833
1834        // Upon rejection of reader.[[closedPromise]] with reason r,
1835        reader.default_tee_append_native_handler_to_closed_promise(
1836            &branch_1,
1837            &branch_2,
1838            canceled_1,
1839            canceled_2,
1840            cancel_promise,
1841            can_gc,
1842        );
1843
1844        // Return « branch_1, branch_2 ».
1845        Ok(vec![branch_1, branch_2])
1846    }
1847
1848    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
1849    #[allow(clippy::too_many_arguments)]
1850    pub(crate) fn pipe_to(
1851        &self,
1852        cx: &mut CurrentRealm,
1853        global: &GlobalScope,
1854        dest: &WritableStream,
1855        prevent_close: bool,
1856        prevent_abort: bool,
1857        prevent_cancel: bool,
1858        signal: Option<&AbortSignal>,
1859    ) -> Rc<Promise> {
1860        let realm = cx.into();
1861        let realm = InRealm::Already(&realm);
1862        // Assert: source implements ReadableStream.
1863        // Assert: dest implements WritableStream.
1864        // Assert: prevent_close, prevent_abort, and prevent_cancel are all booleans.
1865        // Done with method signature types.
1866
1867        // If signal was not given, let signal be undefined.
1868        // Assert: either signal is undefined, or signal implements AbortSignal.
1869        // Note: done with the `signal` argument.
1870
1871        // Assert: ! IsReadableStreamLocked(source) is false.
1872        assert!(!self.is_locked());
1873
1874        // Assert: ! IsWritableStreamLocked(dest) is false.
1875        assert!(!dest.is_locked());
1876
1877        // If source.[[controller]] implements ReadableByteStreamController,
1878        // let reader be either ! AcquireReadableStreamBYOBReader(source)
1879        // or ! AcquireReadableStreamDefaultReader(source),
1880        // at the user agent’s discretion.
1881        // Note: for now only using default readers.
1882
1883        // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
1884        let reader = self
1885            .acquire_default_reader(CanGc::from_cx(cx))
1886            .expect("Acquiring a default reader for pipe_to cannot fail");
1887
1888        // Let writer be ! AcquireWritableStreamDefaultWriter(dest).
1889        let writer = dest
1890            .aquire_default_writer(cx.into(), global, CanGc::from_cx(cx))
1891            .expect("Acquiring a default writer for pipe_to cannot fail");
1892
1893        // Set source.[[disturbed]] to true.
1894        self.disturbed.set(true);
1895
1896        // Let shuttingDown be false.
1897        // Done below with default.
1898
1899        // Let promise be a new promise.
1900        let promise = Promise::new2(cx, global);
1901
1902        // In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
1903        rooted!(&in(cx) let pipe_to = PipeTo {
1904            reader: Dom::from_ref(&reader),
1905            writer: Dom::from_ref(&writer),
1906            pending_writes: Default::default(),
1907            state: Default::default(),
1908            prevent_abort,
1909            prevent_cancel,
1910            prevent_close,
1911            shutting_down: Default::default(),
1912            abort_reason: Default::default(),
1913            shutdown_error: Default::default(),
1914            shutdown_action_promise:  Default::default(),
1915            result_promise: promise.clone(),
1916        });
1917
1918        // If signal is not undefined,
1919        // Note: moving the steps to here, so that the `PipeTo` is available.
1920        if let Some(signal) = signal {
1921            // Let abortAlgorithm be the following steps:
1922            // Note: steps are implemented at call site.
1923            rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1924
1925            // If signal is aborted, perform abortAlgorithm and return promise.
1926            if signal.aborted() {
1927                signal.run_abort_algorithm(cx, global, &abort_algorithm);
1928                return promise;
1929            }
1930
1931            // Add abortAlgorithm to signal.
1932            signal.add(&abort_algorithm);
1933        }
1934
1935        // Note: perfom checks now, since streams can start as closed or errored.
1936        pipe_to.check_and_propagate_errors_forward(cx, global);
1937        pipe_to.check_and_propagate_errors_backward(cx, global);
1938        pipe_to.check_and_propagate_closing_forward(cx, global);
1939        pipe_to.check_and_propagate_closing_backward(cx, global);
1940
1941        // If we are not closed or errored,
1942        if *pipe_to.state.borrow() == PipeToState::Starting {
1943            // Start the pipe, by waiting on the writer being ready for a chunk.
1944            pipe_to.wait_for_writer_ready(global, realm, CanGc::from_cx(cx));
1945        }
1946
1947        // Return promise.
1948        promise
1949    }
1950
1951    /// <https://streams.spec.whatwg.org/#readable-stream-tee>
1952    pub(crate) fn tee(
1953        &self,
1954        clone_for_branch_2: bool,
1955        can_gc: CanGc,
1956    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1957        // Assert: stream implements ReadableStream.
1958        // Assert: cloneForBranch2 is a boolean.
1959
1960        match self.controller.borrow().as_ref() {
1961            Some(ControllerType::Default(_)) => {
1962                // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
1963                self.default_tee(clone_for_branch_2, can_gc)
1964            },
1965            Some(ControllerType::Byte(_)) => {
1966                // If stream.[[controller]] implements ReadableByteStreamController,
1967                // return ? ReadableByteStreamTee(stream).
1968                self.byte_tee(can_gc)
1969            },
1970            None => {
1971                unreachable!("Stream should have a controller.");
1972            },
1973        }
1974    }
1975
1976    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller-from-underlying-source>
1977    pub(crate) fn set_up_byte_controller(
1978        &self,
1979        global: &GlobalScope,
1980        underlying_source_dict: JsUnderlyingSource,
1981        underlying_source_handle: SafeHandleObject,
1982        stream: DomRoot<ReadableStream>,
1983        strategy_hwm: f64,
1984        can_gc: CanGc,
1985    ) -> Fallible<()> {
1986        // Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
1987        // Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
1988        // If underlyingSourceDict["start"] exists, then set startAlgorithm to an algorithm which returns the result
1989        // of invoking underlyingSourceDict["start"] with argument list « controller »
1990        // and callback this value underlyingSource.
1991        // If underlyingSourceDict["pull"] exists, then set pullAlgorithm to an algorithm which returns the result
1992        // of invoking underlyingSourceDict["pull"] with argument list « controller »
1993        // and callback this value underlyingSource.
1994        // If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm to an algorithm which takes an
1995        // argument reason and returns the result of invoking underlyingSourceDict["cancel"] with argument list
1996        // « reason » and callback this value underlyingSource.
1997
1998        // Let autoAllocateChunkSize be underlyingSourceDict["autoAllocateChunkSize"],
1999        // if it exists, or undefined otherwise.
2000        // If autoAllocateChunkSize is 0, then throw a TypeError exception.
2001        if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2002            return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2003        }
2004
2005        let controller = ReadableByteStreamController::new(
2006            UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2007            strategy_hwm,
2008            global,
2009            can_gc,
2010        );
2011
2012        // Note: this must be done before `setup`,
2013        // otherwise `thisOb` is null in the start callback.
2014        controller.set_underlying_source_this_object(underlying_source_handle);
2015
2016        // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm,
2017        // pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
2018        controller.setup(global, stream, can_gc)
2019    }
2020
2021    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2022    pub(crate) fn setup_cross_realm_transform_readable(
2023        &self,
2024        cx: SafeJSContext,
2025        port: &MessagePort,
2026        can_gc: CanGc,
2027    ) {
2028        let port_id = port.message_port_id();
2029        let global = self.global();
2030
2031        // Perform ! InitializeReadableStream(stream).
2032        // Done in `new_inherited`.
2033
2034        // Let sizeAlgorithm be an algorithm that returns 1.
2035        let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
2036
2037        // Note: other algorithms defined in the underlying source container.
2038
2039        // Let controller be a new ReadableStreamDefaultController.
2040        let controller = ReadableStreamDefaultController::new(
2041            &self.global(),
2042            UnderlyingSourceType::Transfer(Dom::from_ref(port)),
2043            0.,
2044            size_algorithm,
2045            can_gc,
2046        );
2047
2048        // Add a handler for port’s message event with the following steps:
2049        // Add a handler for port’s messageerror event with the following steps:
2050        rooted!(in(*cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2051            controller: Dom::from_ref(&controller),
2052        });
2053        global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2054
2055        // Enable port’s port message queue.
2056        port.Start(can_gc);
2057
2058        // Perform ! SetUpReadableStreamDefaultController
2059        controller
2060            .setup(DomRoot::from_ref(self), can_gc)
2061            .expect("Setting up controller for transfer cannot fail.");
2062    }
2063}
2064
2065impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2066    /// <https://streams.spec.whatwg.org/#rs-constructor>
2067    fn Constructor(
2068        cx: SafeJSContext,
2069        global: &GlobalScope,
2070        proto: Option<SafeHandleObject>,
2071        can_gc: CanGc,
2072        underlying_source: Option<*mut JSObject>,
2073        strategy: &QueuingStrategy,
2074    ) -> Fallible<DomRoot<Self>> {
2075        // If underlyingSource is missing, set it to null.
2076        rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2077        // Let underlyingSourceDict be underlyingSource,
2078        // converted to an IDL value of type UnderlyingSource.
2079        let underlying_source_dict = if !underlying_source_obj.is_null() {
2080            rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2081            match JsUnderlyingSource::new(cx, obj_val.handle(), can_gc) {
2082                Ok(ConversionResult::Success(val)) => val,
2083                Ok(ConversionResult::Failure(error)) => {
2084                    return Err(Error::Type(error.into_owned()));
2085                },
2086                _ => {
2087                    return Err(Error::JSFailed);
2088                },
2089            }
2090        } else {
2091            JsUnderlyingSource::empty()
2092        };
2093
2094        // Perform ! InitializeReadableStream(this).
2095        let stream = ReadableStream::new_with_proto(global, proto, can_gc);
2096
2097        if underlying_source_dict.type_.is_some() {
2098            // If strategy["size"] exists, throw a RangeError exception.
2099            if strategy.size.is_some() {
2100                return Err(Error::Range(
2101                    c"size is not supported for byte streams".to_owned(),
2102                ));
2103            }
2104
2105            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
2106            let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2107
2108            // Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this,
2109            // underlyingSource, underlyingSourceDict, highWaterMark).
2110            stream.set_up_byte_controller(
2111                global,
2112                underlying_source_dict,
2113                underlying_source_obj.handle(),
2114                stream.clone(),
2115                strategy_hwm,
2116                can_gc,
2117            )?;
2118        } else {
2119            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
2120            let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2121
2122            // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
2123            let size_algorithm = extract_size_algorithm(strategy, can_gc);
2124
2125            let controller = ReadableStreamDefaultController::new(
2126                global,
2127                UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2128                high_water_mark,
2129                size_algorithm,
2130                can_gc,
2131            );
2132
2133            // Note: this must be done before `setup`,
2134            // otherwise `thisOb` is null in the start callback.
2135            controller.set_underlying_source_this_object(underlying_source_obj.handle());
2136
2137            // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
2138            controller.setup(stream.clone(), can_gc)?;
2139        };
2140
2141        Ok(stream)
2142    }
2143
2144    /// <https://streams.spec.whatwg.org/#rs-locked>
2145    fn Locked(&self) -> bool {
2146        self.is_locked()
2147    }
2148
2149    /// <https://streams.spec.whatwg.org/#rs-cancel>
2150    fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
2151        let global = self.global();
2152        if self.is_locked() {
2153            // If ! IsReadableStreamLocked(this) is true,
2154            // return a promise rejected with a TypeError exception.
2155            let promise = Promise::new(&global, can_gc);
2156            promise.reject_error(Error::Type(c"stream is locked".to_owned()), can_gc);
2157            promise
2158        } else {
2159            // Return ! ReadableStreamCancel(this, reason).
2160            self.cancel(cx, &global, reason, can_gc)
2161        }
2162    }
2163
2164    /// <https://streams.spec.whatwg.org/#rs-get-reader>
2165    fn GetReader(
2166        &self,
2167        options: &ReadableStreamGetReaderOptions,
2168        can_gc: CanGc,
2169    ) -> Fallible<ReadableStreamReader> {
2170        // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
2171        if options.mode.is_none() {
2172            return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2173                self.acquire_default_reader(can_gc)?,
2174            ));
2175        }
2176        // 2. Assert: options["mode"] is "byob".
2177        assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2178
2179        // 3. Return ? AcquireReadableStreamBYOBReader(this).
2180        Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2181            self.acquire_byob_reader(can_gc)?,
2182        ))
2183    }
2184
2185    /// <https://streams.spec.whatwg.org/#rs-tee>
2186    fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2187        // Return ? ReadableStreamTee(this, false).
2188        self.tee(false, can_gc)
2189    }
2190
2191    /// <https://streams.spec.whatwg.org/#rs-pipe-to>
2192    fn PipeTo(
2193        &self,
2194        cx: &mut CurrentRealm,
2195        destination: &WritableStream,
2196        options: &StreamPipeOptions,
2197    ) -> Rc<Promise> {
2198        let global = self.global();
2199
2200        // If ! IsReadableStreamLocked(this) is true,
2201        if self.is_locked() {
2202            // return a promise rejected with a TypeError exception.
2203            let promise = Promise::new2(cx, &global);
2204            promise.reject_error(
2205                Error::Type(c"Source stream is locked".to_owned()),
2206                CanGc::from_cx(cx),
2207            );
2208            return promise;
2209        }
2210
2211        // If ! IsWritableStreamLocked(destination) is true,
2212        if destination.is_locked() {
2213            // return a promise rejected with a TypeError exception.
2214            let promise = Promise::new2(cx, &global);
2215            promise.reject_error(
2216                Error::Type(c"Destination stream is locked".to_owned()),
2217                CanGc::from_cx(cx),
2218            );
2219            return promise;
2220        }
2221
2222        // Let signal be options["signal"] if it exists, or undefined otherwise.
2223        let signal = options.signal.as_deref();
2224
2225        // Return ! ReadableStreamPipeTo.
2226        self.pipe_to(
2227            cx,
2228            &global,
2229            destination,
2230            options.preventClose,
2231            options.preventAbort,
2232            options.preventCancel,
2233            signal,
2234        )
2235    }
2236
2237    /// <https://streams.spec.whatwg.org/#rs-pipe-through>
2238    fn PipeThrough(
2239        &self,
2240        cx: &mut CurrentRealm,
2241        transform: &ReadableWritablePair,
2242        options: &StreamPipeOptions,
2243    ) -> Fallible<DomRoot<ReadableStream>> {
2244        let global = self.global();
2245
2246        // If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
2247        if self.is_locked() {
2248            return Err(Error::Type(c"Source stream is locked".to_owned()));
2249        }
2250
2251        // If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
2252        if transform.writable.is_locked() {
2253            return Err(Error::Type(c"Destination stream is locked".to_owned()));
2254        }
2255
2256        // Let signal be options["signal"] if it exists, or undefined otherwise.
2257        let signal = options.signal.as_deref();
2258
2259        // Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
2260        // options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
2261        let promise = self.pipe_to(
2262            cx,
2263            &global,
2264            &transform.writable,
2265            options.preventClose,
2266            options.preventAbort,
2267            options.preventCancel,
2268            signal,
2269        );
2270
2271        // Set promise.[[PromiseIsHandled]] to true.
2272        promise.set_promise_is_handled();
2273
2274        // Return transform["readable"].
2275        Ok(transform.readable.clone())
2276    }
2277}
2278
2279#[expect(unsafe_code)]
2280/// The initial steps for the message handler for both readable and writable cross realm transforms.
2281/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2282/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2283pub(crate) unsafe fn get_type_and_value_from_message(
2284    cx: SafeJSContext,
2285    data: SafeHandleValue,
2286    value: SafeMutableHandleValue,
2287    can_gc: CanGc,
2288) -> DOMString {
2289    // Let data be the data of the message.
2290    // Note: we are passed the data as argument,
2291    // which originates in the return value of `structuredclone::read`.
2292
2293    // Assert: data is an Object.
2294    assert!(data.is_object());
2295    rooted!(in(*cx) let data_object = data.to_object());
2296
2297    // Let type be ! Get(data, "type").
2298    rooted!(in(*cx) let mut type_ = UndefinedValue());
2299    unsafe {
2300        get_dictionary_property(
2301            *cx,
2302            data_object.handle(),
2303            c"type",
2304            type_.handle_mut(),
2305            can_gc,
2306        )
2307    }
2308    .expect("Getting the type should not fail.");
2309
2310    // Let value be ! Get(data, "value").
2311    unsafe { get_dictionary_property(*cx, data_object.handle(), c"value", value, can_gc) }
2312        .expect("Getting the value should not fail.");
2313
2314    // Assert: type is a String.
2315    let result =
2316        DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2317            .expect("The type of the message should be a string");
2318    let ConversionResult::Success(type_string) = result else {
2319        unreachable!("The type of the message should be a string");
2320    };
2321
2322    type_string
2323}
2324
// Marker implementation: no items of `js::gc::Rootable` are overridden here.
// NOTE(review): presumably this is what allows `CrossRealmTransformReadable`
// to be kept in rooted storage — confirm against `js::gc`.
impl js::gc::Rootable for CrossRealmTransformReadable {}
2326
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
/// A wrapper to handle `message` and `messageerror` events
/// for the port used by the transferred stream.
///
/// Its `handle_message` and `handle_error` methods drive the wrapped
/// controller: they enqueue chunks into it, close it, or error it.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformReadable {
    /// The controller used in the algorithm.
    controller: Dom<ReadableStreamDefaultController>,
}
2336
2337impl CrossRealmTransformReadable {
2338    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2339    /// Add a handler for port’s message event with the following steps:
2340    #[expect(unsafe_code)]
2341    pub(crate) fn handle_message(
2342        &self,
2343        cx: SafeJSContext,
2344        global: &GlobalScope,
2345        port: &MessagePort,
2346        message: SafeHandleValue,
2347        _realm: InRealm,
2348        can_gc: CanGc,
2349    ) {
2350        rooted!(in(*cx) let mut value = UndefinedValue());
2351        let type_string =
2352            unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
2353
2354        // If type is "chunk",
2355        if type_string == "chunk" {
2356            // Perform ! ReadableStreamDefaultControllerEnqueue(controller, value).
2357            self.controller
2358                .enqueue(cx, value.handle(), can_gc)
2359                .expect("Enqueing a chunk should not fail.");
2360        }
2361
2362        // Otherwise, if type is "close",
2363        if type_string == "close" {
2364            // Perform ! ReadableStreamDefaultControllerClose(controller).
2365            self.controller.close(can_gc);
2366
2367            // Disentangle port.
2368            global.disentangle_port(port, can_gc);
2369        }
2370
2371        // Otherwise, if type is "error",
2372        if type_string == "error" {
2373            // Perform ! ReadableStreamDefaultControllerError(controller, value).
2374            self.controller.error(value.handle(), can_gc);
2375
2376            // Disentangle port.
2377            global.disentangle_port(port, can_gc);
2378        }
2379    }
2380
2381    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2382    /// Add a handler for port’s messageerror event with the following steps:
2383    pub(crate) fn handle_error(
2384        &self,
2385        cx: SafeJSContext,
2386        global: &GlobalScope,
2387        port: &MessagePort,
2388        _realm: InRealm,
2389        can_gc: CanGc,
2390    ) {
2391        // Let error be a new "DataCloneError" DOMException.
2392        let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
2393        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
2394        error.safe_to_jsval(cx, rooted_error.handle_mut(), can_gc);
2395
2396        // Perform ! CrossRealmTransformSendError(port, error).
2397        port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
2398
2399        // Perform ! ReadableStreamDefaultControllerError(controller, error).
2400        self.controller.error(rooted_error.handle(), can_gc);
2401
2402        // Disentangle port.
2403        global.disentangle_port(port, can_gc);
2404    }
2405}
2406
2407#[expect(unsafe_code)]
2408/// Get the `done` property of an object that a read promise resolved to.
2409pub(crate) fn get_read_promise_done(
2410    cx: SafeJSContext,
2411    v: &SafeHandleValue,
2412    can_gc: CanGc,
2413) -> Result<bool, Error> {
2414    if !v.is_object() {
2415        return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2416    }
2417    unsafe {
2418        rooted!(in(*cx) let object = v.to_object());
2419        rooted!(in(*cx) let mut done = UndefinedValue());
2420        match get_dictionary_property(*cx, object.handle(), c"done", done.handle_mut(), can_gc) {
2421            Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2422                Ok(ConversionResult::Success(val)) => Ok(val),
2423                Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2424                _ => Err(Error::Type(c"Unknown format for done property.".to_owned())),
2425            },
2426            Ok(false) => Err(Error::Type(c"Promise has no done property.".to_owned())),
2427            Err(()) => Err(Error::JSFailed),
2428        }
2429    }
2430}
2431
2432#[expect(unsafe_code)]
2433/// Get the `value` property of an object that a read promise resolved to.
2434pub(crate) fn get_read_promise_bytes(
2435    cx: SafeJSContext,
2436    v: &SafeHandleValue,
2437    can_gc: CanGc,
2438) -> Result<Vec<u8>, Error> {
2439    if !v.is_object() {
2440        return Err(Error::Type(
2441            c"Unknown format for for bytes read.".to_owned(),
2442        ));
2443    }
2444    unsafe {
2445        rooted!(in(*cx) let object = v.to_object());
2446        rooted!(in(*cx) let mut bytes = UndefinedValue());
2447        match get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc) {
2448            Ok(true) => {
2449                match Vec::<u8>::safe_from_jsval(
2450                    cx,
2451                    bytes.handle(),
2452                    ConversionBehavior::EnforceRange,
2453                    can_gc,
2454                ) {
2455                    Ok(ConversionResult::Success(val)) => Ok(val),
2456                    Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2457                    _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2458                }
2459            },
2460            Ok(false) => Err(Error::Type(c"Promise has no value property.".to_owned())),
2461            Err(()) => Err(Error::JSFailed),
2462        }
2463    }
2464}
2465
2466/// Convert a raw stream `chunk` JS value to `Vec<u8>`.
2467/// This mirrors the conversion used inside `get_read_promise_bytes`,
2468/// but operates on the raw chunk (no `{ value, done }` wrapper).
2469pub(crate) fn bytes_from_chunk_jsval(
2470    cx: SafeJSContext,
2471    chunk: &RootedTraceableBox<Heap<JSVal>>,
2472    can_gc: CanGc,
2473) -> Result<Vec<u8>, Error> {
2474    match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2475        Ok(ConversionResult::Success(vec)) => Ok(vec),
2476        Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2477        _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2478    }
2479}
2480
2481/// <https://streams.spec.whatwg.org/#rs-transfer>
2482impl Transferable for ReadableStream {
2483    type Index = MessagePortIndex;
2484    type Data = MessagePortImpl;
2485
2486    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps>
2487    fn transfer(
2488        &self,
2489        cx: &mut js::context::JSContext,
2490    ) -> Fallible<(MessagePortId, MessagePortImpl)> {
2491        // Step 1. If ! IsReadableStreamLocked(value) is true, throw a
2492        // "DataCloneError" DOMException.
2493        if self.is_locked() {
2494            return Err(Error::DataClone(None));
2495        }
2496
2497        let global = self.global();
2498        let mut realm = enter_auto_realm(cx, &*global);
2499        let mut realm = realm.current_realm();
2500        let cx = &mut realm;
2501
2502        // Step 2. Let port1 be a new MessagePort in the current Realm.
2503        let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
2504        global.track_message_port(&port_1, None);
2505
2506        // Step 3. Let port2 be a new MessagePort in the current Realm.
2507        let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
2508        global.track_message_port(&port_2, None);
2509
2510        // Step 4. Entangle port1 and port2.
2511        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2512
2513        // Step 5. Let writable be a new WritableStream in the current Realm.
2514        let writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
2515
2516        // Step 6. Perform ! SetUpCrossRealmTransformWritable(writable, port1).
2517        writable.setup_cross_realm_transform_writable(cx.into(), &port_1, CanGc::from_cx(cx));
2518
2519        // Step 7. Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
2520        let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);
2521
2522        // Step 8. Set promise.[[PromiseIsHandled]] to true.
2523        promise.set_promise_is_handled();
2524
2525        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
2526        port_2.transfer(cx)
2527    }
2528
2529    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps>
2530    fn transfer_receive(
2531        cx: &mut js::context::JSContext,
2532        owner: &GlobalScope,
2533        id: MessagePortId,
2534        port_impl: MessagePortImpl,
2535    ) -> Result<DomRoot<Self>, ()> {
2536        // Their transfer-receiving steps, given dataHolder and value, are:
2537        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
2538        let value = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
2539
2540        // Step 1. Let deserializedRecord be !
2541        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
2542        // Realm).
2543        // Done with the `Deserialize` derive of `MessagePortImpl`.
2544
2545        // Step 2. Let port be deserializedRecord.[[Deserialized]].
2546        let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
2547
2548        // Step 3. Perform ! SetUpCrossRealmTransformReadable(value, port).
2549        value.setup_cross_realm_transform_readable(
2550            cx.into(),
2551            &transferred_port,
2552            CanGc::from_cx(cx),
2553        );
2554        Ok(value)
2555    }
2556
2557    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
2558    fn serialized_storage<'a>(
2559        data: StructuredData<'a, '_>,
2560    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2561        match data {
2562            StructuredData::Reader(r) => &mut r.port_impls,
2563            StructuredData::Writer(w) => &mut w.ports,
2564        }
2565    }
2566}