script/dom/stream/
readablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use servo_base::generic_channel::GenericSharedMemory;
11use servo_base::id::{MessagePortId, MessagePortIndex};
12use servo_constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19    MutableHandleValue as SafeMutableHandleValue,
20};
21use js::typedarray::ArrayBufferViewU8;
22use rustc_hash::FxHashMap;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
26use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
27    ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
28    ReadableWritablePair, StreamPipeOptions,
29};
30use script_bindings::str::DOMString;
31
32use crate::dom::domexception::{DOMErrorName, DOMException};
33use script_bindings::conversions::{is_array_like, StringificationBehavior};
34use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
35use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
38use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
39use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
40use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
41use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
42use crate::dom::stream::writablestream::WritableStream;
43use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
44use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
45use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
46use crate::dom::bindings::trace::RootedTraceableBox;
47use crate::dom::bindings::utils::get_dictionary_property;
48use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
49use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
50use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
51use crate::dom::globalscope::GlobalScope;
52use crate::dom::promise::{wait_for_all_promise, Promise};
53use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
54use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
55use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
56use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
57use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
58use crate::dom::types::DefaultTeeUnderlyingSource;
59use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
60use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
61use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
62use crate::dom::messageport::MessagePort;
63use crate::realms::{enter_realm, InRealm, enter_auto_realm};
64use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
65use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
66use crate::dom::bindings::transferable::Transferable;
67use crate::dom::bindings::structuredclone::StructuredData;
68
69use crate::dom::bindings::buffer_source::HeapBufferSource;
70use super::readablestreambyobreader::ReadIntoRequest;
71
/// State machine for `PipeTo`.
///
/// The current state is kept in `PipeTo::state`; the `Callback` implementation
/// reads it to decide how to interpret each promise reaction it receives.
#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
enum PipeToState {
    /// The starting state (also the `Default` value).
    /// The callback treats being invoked in this state as unreachable.
    #[default]
    Starting,
    /// Waiting for the writer to be ready.
    PendingReady,
    /// Waiting for a read to resolve.
    PendingRead,
    /// Waiting for all pending writes to finish,
    /// as part of shutting down with an optional action.
    /// The carried action, if any, is performed once no write is pending.
    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
    /// When shutting down with an action,
    /// waiting for the action to complete,
    /// at which point we can `finalize`.
    ShuttingDownPendingAction,
    /// The pipe has been finalized,
    /// no further actions should be performed.
    Finalized,
}
93
/// The action to perform as part of shutting down the pipe.
/// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
enum ShutdownAction {
    /// Abort the destination.
    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
    WritableStreamAbort,
    /// Cancel the source.
    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
    ReadableStreamCancel,
    /// Close the destination through its writer.
    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
    WritableStreamDefaultWriterCloseWithErrorPropagation,
    /// The combined actions of the abort algorithm,
    /// requested through `PipeTo::abort_with_reason`.
    /// <https://streams.spec.whatwg.org/#ref-for-rs-pipeTo-shutdown-with-action>
    Abort,
}
106
// Empty impl: no item of the trait is overridden for `PipeTo`.
// NOTE(review): presumably required so that a `PipeTo` can be placed in a rooted slot;
// confirm against the call sites that construct one.
impl js::gc::Rootable for PipeTo {}
108
/// The "in parallel, but not really" part of
/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
///
/// Note: the spec is flexible about how this is done, but requires the following constraints to apply:
/// - Public API must not be used: we'll only use Rust.
/// - Backpressure must be enforced: we'll only read from source when dest is ready.
/// - Shutdown must stop activity: we'll do this together with the below.
/// - Error and close states must be propagated: we'll do this by checking these states at every step.
///
/// The struct is `Clone` because a clone of it is boxed into every promise handler
/// (`Box::new(self.clone())`). All mutable state sits behind an `Rc`,
/// so every clone observes and updates the same state.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PipeTo {
    /// The reader through which chunks are read from the source stream.
    /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
    reader: Dom<ReadableStreamDefaultReader>,

    /// The writer through which chunks are written to the destination stream.
    /// <https://streams.spec.whatwg.org/#ref-for-acquire-writable-stream-default-writer>
    writer: Dom<WritableStreamDefaultWriter>,

    /// Promises of writes that have been started.
    /// Settled ones are pruned at the top of each callback.
    /// Pending writes are needed when shutting down (with an action),
    /// because we can only finalize when all writes are finished.
    #[ignore_malloc_size_of = "nested Rc"]
    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,

    /// The state machine.
    #[conditional_malloc_size_of]
    #[no_trace]
    state: Rc<RefCell<PipeToState>>,

    /// When true, an errored source does not abort the destination.
    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventabort>
    prevent_abort: bool,

    /// When true, an errored or closed destination does not cancel the source.
    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventcancel>
    prevent_cancel: bool,

    /// When true, a closed source does not close the destination.
    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventclose>
    prevent_close: bool,

    /// The `shuttingDown` variable of
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    /// Set once by `shutdown`; the propagation checks and `abort_with_reason`
    /// become no-ops afterwards.
    #[conditional_malloc_size_of]
    shutting_down: Rc<Cell<bool>>,

    /// The abort reason of the abort signal,
    /// stored here because we must keep it across a microtask.
    #[ignore_malloc_size_of = "mozjs"]
    abort_reason: Rc<Heap<JSVal>>,

    /// The error potentially passed to shutdown,
    /// stored here because we must keep it across a microtask.
    /// Written only through `set_shutdown_error`.
    #[ignore_malloc_size_of = "mozjs"]
    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,

    /// The promise returned by a shutdown action.
    /// We keep it to only continue when it is not pending anymore.
    #[ignore_malloc_size_of = "nested Rc"]
    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// The promise resolved or rejected at
    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
170
171impl PipeTo {
172    /// Run the `abortAlgorithm` defined at
173    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
174    pub(crate) fn abort_with_reason(
175        &self,
176        cx: &mut CurrentRealm,
177        global: &GlobalScope,
178        reason: SafeHandleValue,
179    ) {
180        // Abort should do nothing if we are already shutting down.
181        if self.shutting_down.get() {
182            return;
183        }
184
185        // Let error be signal’s abort reason.
186        // Note: storing it because it may need to be kept across a microtask,
187        // and see the note below as to why it is kept separately from `shutdown_error`.
188        self.abort_reason.set(reason.get());
189
190        // Note: setting the error now,
191        // will result in a rejection of the pipe promise, with this error.
192        // Unless any shutdown action raise their own error,
193        // in which case this error will be overwritten by the shutdown action error.
194        self.set_shutdown_error(reason);
195
196        // Let actions be an empty ordered set.
197        // Note: the actions are defined, and performed, inside `shutdown_with_an_action`.
198
199        // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions,
200        // and with error.
201        self.shutdown(cx, global, Some(ShutdownAction::Abort));
202    }
203}
204
205impl Callback for PipeTo {
206    /// The pipe makes progress one microtask at a time.
207    /// Note: we use one struct as the callback for all promises,
208    /// and for both of their reactions.
209    ///
210    /// The context of the callback is determined from:
211    /// - the current state.
212    /// - the type of `result`.
213    /// - the state of a stored promise(in some cases).
214    #[expect(unsafe_code)]
215    fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
216        let in_realm_proof = cx.into();
217        let realm = InRealm::Already(&in_realm_proof);
218        let global = self.reader.global();
219
220        // Note: we only care about the result of writes when they are rejected,
221        // and the error is accessed not through handlers,
222        // but directly using `dest.get_stored_error`.
223        // So we must mark rejected promises as handled
224        // to prevent unhandled rejection errors.
225        self.pending_writes.borrow_mut().retain(|p| {
226            let pending = p.is_pending();
227            if !pending {
228                p.set_promise_is_handled();
229            }
230            pending
231        });
232
233        // Note: cloning to prevent re-borrow in methods called below.
234        let state_before_checks = self.state.borrow().clone();
235
236        // Note: if we are in a `PendingRead` state,
237        // and the source is closed,
238        // we try to write chunks before doing any shutdown,
239        // which is necessary to implement the
240        // "If any chunks have been read but not yet written, write them to dest."
241        // part of shutdown.
242        if state_before_checks == PipeToState::PendingRead {
243            let source = self.reader.get_stream().expect("Source stream must be set");
244            if source.is_closed() {
245                let dest = self
246                    .writer
247                    .get_stream()
248                    .expect("Destination stream must be set");
249
250                // If dest.[[state]] is "writable",
251                // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
252                if dest.is_writable() && !dest.close_queued_or_in_flight() {
253                    let Ok(done) = get_read_promise_done(cx.into(), &result, CanGc::from_cx(cx))
254                    else {
255                        // This is the case that the microtask ran in reaction
256                        // to the closed promise of the reader,
257                        // so we should wait for subsequent chunks,
258                        // and skip the shutdown below
259                        // (reader is closed, but there are still pending reads).
260                        // Shutdown will happen when the last chunk has been received.
261                        return;
262                    };
263
264                    if !done {
265                        // If any chunks have been read but not yet written, write them to dest.
266                        self.write_chunk(cx, &global, result);
267                    }
268                }
269            }
270        }
271
272        self.check_and_propagate_errors_forward(cx, &global);
273        self.check_and_propagate_errors_backward(cx, &global);
274        self.check_and_propagate_closing_forward(cx, &global);
275        self.check_and_propagate_closing_backward(cx, &global);
276
277        // Note: cloning to prevent re-borrow in methods called below.
278        let state = self.state.borrow().clone();
279
280        // If we switched to a shutdown state,
281        // return.
282        // Progress will be made at the next tick.
283        if state != state_before_checks {
284            return;
285        }
286
287        match state {
288            PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
289            PipeToState::PendingReady => {
290                // Read a chunk.
291                self.read_chunk(cx, &global);
292            },
293            PipeToState::PendingRead => {
294                // Write the chunk.
295                self.write_chunk(cx, &global, result);
296
297                // An early return is necessary if the write algorithm aborted the pipe.
298                if self.shutting_down.get() {
299                    return;
300                }
301
302                // Wait for the writer to be ready again.
303                self.wait_for_writer_ready(cx, &global);
304            },
305            PipeToState::ShuttingDownWithPendingWrites(action) => {
306                // Wait until every chunk that has been read has been written
307                // (i.e. the corresponding promises have settled).
308                if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
309                    self.wait_on_pending_write(&global, write, realm, CanGc::from_cx(cx));
310                    return;
311                }
312
313                // Note: error is stored in `self.shutdown_error`.
314                if let Some(action) = action {
315                    // Let p be the result of performing action.
316                    self.perform_action(cx, &global, action);
317                } else {
318                    // Finalize, passing along error if it was given.
319                    self.finalize(cx, &global);
320                }
321            },
322            PipeToState::ShuttingDownPendingAction => {
323                let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
324                    unreachable!();
325                };
326                if promise.is_pending() {
327                    // While waiting for the action to complete,
328                    // we may get callbacks for other promises(closed, ready),
329                    // and we should ignore those.
330                    return;
331                }
332
333                let is_array_like = {
334                    if !result.is_object() {
335                        false
336                    } else {
337                        unsafe { is_array_like::<crate::DomTypeHolder>(cx.raw_cx(), result) }
338                    }
339                };
340
341                // Finalize, passing along error if it was given.
342                if !result.is_undefined() && !is_array_like {
343                    // Most actions either resolve with undefined,
344                    // or reject with an error,
345                    // and the error should be used when finalizing.
346                    // One exception is the `Abort` action,
347                    // which resolves with a list of undefined values.
348
349                    // If `result` isn't undefined or array-like,
350                    // then it is an error
351                    // and should overwrite the current shutdown error.
352                    self.set_shutdown_error(result);
353                }
354                self.finalize(cx, &global);
355            },
356            PipeToState::Finalized => {},
357        }
358    }
359}
360
361impl PipeTo {
362    /// Setting shutdown error in a way that ensures it isn't
363    /// moved after it has been set.
364    fn set_shutdown_error(&self, error: SafeHandleValue) {
365        *self.shutdown_error.borrow_mut() = Some(Heap::default());
366        let Some(ref heap) = *self.shutdown_error.borrow() else {
367            unreachable!("Option set to Some(heap) above.");
368        };
369        heap.set(error.get())
370    }
371
372    /// Wait for the writer to be ready,
373    /// which implements the constraint that backpressure must be enforced.
374    fn wait_for_writer_ready(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
375        let realm = cx.into();
376        let realm = InRealm::Already(&realm);
377
378        {
379            let mut state = self.state.borrow_mut();
380            *state = PipeToState::PendingReady;
381        }
382
383        let ready_promise = self.writer.Ready();
384        if ready_promise.is_fulfilled() {
385            self.read_chunk(cx, global);
386        } else {
387            let handler = PromiseNativeHandler::new(
388                global,
389                Some(Box::new(self.clone())),
390                Some(Box::new(self.clone())),
391                CanGc::from_cx(cx),
392            );
393            ready_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
394
395            // Note: if the writer is not ready,
396            // in order to ensure progress we must
397            // also react to the closure of the source(because source may close empty).
398            let closed_promise = self.reader.Closed();
399            closed_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
400        }
401    }
402
403    /// Read a chunk
404    fn read_chunk(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
405        let realm = cx.into();
406        let realm = InRealm::Already(&realm);
407
408        *self.state.borrow_mut() = PipeToState::PendingRead;
409        let chunk_promise = self.reader.Read(cx);
410        let handler = PromiseNativeHandler::new(
411            global,
412            Some(Box::new(self.clone())),
413            Some(Box::new(self.clone())),
414            CanGc::from_cx(cx),
415        );
416        chunk_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
417
418        // Note: in order to ensure progress we must
419        // also react to the closure of the destination.
420        let ready_promise = self.writer.Closed();
421        ready_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
422    }
423
424    /// Try to write a chunk using the jsval, and returns wether it succeeded
425    // It will fail if it is the last `done` chunk, or if it is not a chunk at all.
426    #[expect(unsafe_code)]
427    fn write_chunk(
428        &self,
429        cx: &mut js::context::JSContext,
430        global: &GlobalScope,
431        chunk: SafeHandleValue,
432    ) -> bool {
433        if chunk.is_object() {
434            rooted!(&in(cx) let object = chunk.to_object());
435            rooted!(&in(cx) let mut bytes = UndefinedValue());
436            let has_value = unsafe {
437                get_dictionary_property(
438                    cx.raw_cx(),
439                    object.handle(),
440                    c"value",
441                    bytes.handle_mut(),
442                    CanGc::from_cx(cx),
443                )
444                .expect("Chunk should have a value.")
445            };
446            if has_value {
447                // Write the chunk.
448                let write_promise = self.writer.write(cx, global, bytes.handle());
449                self.pending_writes.borrow_mut().push_back(write_promise);
450                return true;
451            }
452        }
453        false
454    }
455
456    /// Only as part of shutting-down do we wait on pending writes
457    /// (backpressure is communicated not through pending writes
458    /// but through the readiness of the writer).
459    fn wait_on_pending_write(
460        &self,
461        global: &GlobalScope,
462        promise: Rc<Promise>,
463        realm: InRealm,
464        can_gc: CanGc,
465    ) {
466        let handler = PromiseNativeHandler::new(
467            global,
468            Some(Box::new(self.clone())),
469            Some(Box::new(self.clone())),
470            can_gc,
471        );
472        promise.append_native_handler(&handler, realm, can_gc);
473    }
474
475    /// Errors must be propagated forward part of
476    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
477    fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
478        // An early return is necessary if we are shutting down,
479        // because in that case the source can already have been set to none.
480        if self.shutting_down.get() {
481            return;
482        }
483
484        // if source.[[state]] is or becomes "errored", then
485        let source = self
486            .reader
487            .get_stream()
488            .expect("Reader should still have a stream");
489        if source.is_errored() {
490            rooted!(&in(cx) let mut source_error = UndefinedValue());
491            source.get_stored_error(source_error.handle_mut());
492            self.set_shutdown_error(source_error.handle());
493
494            // If preventAbort is false,
495            if !self.prevent_abort {
496                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
497                // and with source.[[storedError]].
498                self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
499            } else {
500                // Otherwise, shutdown with source.[[storedError]].
501                self.shutdown(cx, global, None);
502            }
503        }
504    }
505
506    /// Errors must be propagated backward part of
507    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
508    fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
509        // An early return is necessary if we are shutting down,
510        // because in that case the destination can already have been set to none.
511        if self.shutting_down.get() {
512            return;
513        }
514
515        // if dest.[[state]] is or becomes "errored", then
516        let dest = self
517            .writer
518            .get_stream()
519            .expect("Writer should still have a stream");
520        if dest.is_errored() {
521            rooted!(&in(cx) let mut dest_error = UndefinedValue());
522            dest.get_stored_error(dest_error.handle_mut());
523            self.set_shutdown_error(dest_error.handle());
524
525            // If preventCancel is false,
526            if !self.prevent_cancel {
527                // shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]])
528                // and with dest.[[storedError]].
529                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
530            } else {
531                // Otherwise, shutdown with dest.[[storedError]].
532                self.shutdown(cx, global, None);
533            }
534        }
535    }
536
537    /// Closing must be propagated forward part of
538    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
539    fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
540        // An early return is necessary if we are shutting down,
541        // because in that case the source can already have been set to none.
542        if self.shutting_down.get() {
543            return;
544        }
545
546        // if source.[[state]] is or becomes "closed", then
547        let source = self
548            .reader
549            .get_stream()
550            .expect("Reader should still have a stream");
551        if source.is_closed() {
552            // If preventClose is false,
553            if !self.prevent_close {
554                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
555                // and with source.[[storedError]].
556                self.shutdown(
557                    cx,
558                    global,
559                    Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
560                )
561            } else {
562                // Otherwise, shutdown.
563                self.shutdown(cx, global, None);
564            }
565        }
566    }
567
568    /// Closing must be propagated backward part of
569    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
570    fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
571        // An early return is necessary if we are shutting down,
572        // because in that case the destination can already have been set to none.
573        if self.shutting_down.get() {
574            return;
575        }
576
577        // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
578        // or dest.[[state]] is "closed"
579        let dest = self
580            .writer
581            .get_stream()
582            .expect("Writer should still have a stream");
583        if dest.close_queued_or_in_flight() || dest.is_closed() {
584            // Assert: no chunks have been read or written.
585            // Note: unclear how to perform this assertion.
586
587            // Let destClosed be a new TypeError.
588            rooted!(&in(cx) let mut dest_closed = UndefinedValue());
589            let error =
590                Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
591            error.to_jsval(
592                cx.into(),
593                global,
594                dest_closed.handle_mut(),
595                CanGc::from_cx(cx),
596            );
597            self.set_shutdown_error(dest_closed.handle());
598
599            // If preventCancel is false,
600            if !self.prevent_cancel {
601                // shutdown with an action of ! ReadableStreamCancel(source, destClosed)
602                // and with destClosed.
603                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
604            } else {
605                // Otherwise, shutdown with destClosed.
606                self.shutdown(cx, global, None);
607            }
608        }
609    }
610
611    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
612    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown>
613    /// Combined into one method with an optional action.
614    fn shutdown(
615        &self,
616        cx: &mut CurrentRealm,
617        global: &GlobalScope,
618        action: Option<ShutdownAction>,
619    ) {
620        let realm = cx.into();
621        let realm = InRealm::Already(&realm);
622        // If shuttingDown is true, abort these substeps.
623        // Set shuttingDown to true.
624        if !self.shutting_down.replace(true) {
625            let dest = self.writer.get_stream().expect("Stream must be set");
626            // If dest.[[state]] is "writable",
627            // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
628            if dest.is_writable() && !dest.close_queued_or_in_flight() {
629                // If any chunks have been read but not yet written, write them to dest.
630                // Done at the top of `Callback`.
631
632                // Wait until every chunk that has been read has been written
633                // (i.e. the corresponding promises have settled).
634                if let Some(write) = self.pending_writes.borrow_mut().front() {
635                    *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
636                    self.wait_on_pending_write(global, write.clone(), realm, CanGc::from_cx(cx));
637                    return;
638                }
639            }
640
641            // Note: error is stored in `self.shutdown_error`.
642            if let Some(action) = action {
643                // Let p be the result of performing action.
644                self.perform_action(cx, global, action);
645            } else {
646                // Finalize, passing along error if it was given.
647                self.finalize(cx, global);
648            }
649        }
650    }
651
652    /// The perform action part of
653    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
654    fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
655        let realm = cx.into();
656        let realm = InRealm::Already(&realm);
657        rooted!(&in(cx) let mut error = UndefinedValue());
658        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
659            error.set(shutdown_error.get());
660        }
661
662        *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
663
664        // Let p be the result of performing action.
665        let promise = match action {
666            ShutdownAction::WritableStreamAbort => {
667                let dest = self.writer.get_stream().expect("Stream must be set");
668                dest.abort(cx, global, error.handle())
669            },
670            ShutdownAction::ReadableStreamCancel => {
671                let source = self
672                    .reader
673                    .get_stream()
674                    .expect("Reader should have a stream.");
675                source.cancel(cx, global, error.handle())
676            },
677            ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
678                self.writer.close_with_error_propagation(cx, global)
679            },
680            ShutdownAction::Abort => {
681                // Note: implementation of the `abortAlgorithm`
682                // of the signal associated with this piping operation.
683
684                // Let error be signal’s abort reason.
685                rooted!(&in(cx) let mut error = UndefinedValue());
686                error.set(self.abort_reason.get());
687
688                // Let actions be an empty ordered set.
689                let mut actions = vec![];
690
691                // If preventAbort is false, append the following action to actions:
692                if !self.prevent_abort {
693                    let dest = self
694                        .writer
695                        .get_stream()
696                        .expect("Destination stream must be set");
697
698                    // If dest.[[state]] is "writable",
699                    let promise = if dest.is_writable() {
700                        // return ! WritableStreamAbort(dest, error)
701                        dest.abort(cx, global, error.handle())
702                    } else {
703                        // Otherwise, return a promise resolved with undefined.
704                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
705                    };
706                    actions.push(promise);
707                }
708
709                // If preventCancel is false, append the following action action to actions:
710                if !self.prevent_cancel {
711                    let source = self.reader.get_stream().expect("Source stream must be set");
712
713                    // If source.[[state]] is "readable",
714                    let promise = if source.is_readable() {
715                        // return ! ReadableStreamCancel(source, error).
716                        source.cancel(cx, global, error.handle())
717                    } else {
718                        // Otherwise, return a promise resolved with undefined.
719                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
720                    };
721                    actions.push(promise);
722                }
723
724                // Shutdown with an action consisting
725                // of getting a promise to wait for all of the actions in actions,
726                // and with error.
727                wait_for_all_promise(cx.into(), global, actions, realm, CanGc::from_cx(cx))
728            },
729        };
730
731        // Upon fulfillment of p, finalize, passing along originalError if it was given.
732        // Upon rejection of p with reason newError, finalize with newError.
733        let handler = PromiseNativeHandler::new(
734            global,
735            Some(Box::new(self.clone())),
736            Some(Box::new(self.clone())),
737            CanGc::from_cx(cx),
738        );
739        promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
740        *self.shutdown_action_promise.borrow_mut() = Some(promise);
741    }
742
743    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
744    fn finalize(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
745        *self.state.borrow_mut() = PipeToState::Finalized;
746
747        // Perform ! WritableStreamDefaultWriterRelease(writer).
748        self.writer.release(cx.into(), global, CanGc::from_cx(cx));
749
750        // If reader implements ReadableStreamBYOBReader,
751        // perform ! ReadableStreamBYOBReaderRelease(reader).
752        // TODO.
753
754        // Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
755        self.reader
756            .release(CanGc::from_cx(cx))
757            .expect("Releasing the reader should not fail");
758
759        // If signal is not undefined, remove abortAlgorithm from signal.
760        // Note: since `self.shutdown` is true at this point,
761        // the abort algorithm is a no-op,
762        // so for now not implementing this step.
763
764        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
765            rooted!(&in(cx) let mut error = UndefinedValue());
766            error.set(shutdown_error.get());
767            // If error was given, reject promise with error.
768            self.result_promise
769                .reject_native(&error.handle(), CanGc::from_cx(cx));
770        } else {
771            // Otherwise, resolve promise with undefined.
772            self.result_promise.resolve_native(&(), CanGc::from_cx(cx));
773        }
774    }
775}
776
777/// The fulfillment handler for the reacting to sourceCancelPromise part of
778/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
779#[derive(Clone, JSTraceable, MallocSizeOf)]
780struct SourceCancelPromiseFulfillmentHandler {
781    #[conditional_malloc_size_of]
782    result: Rc<Promise>,
783}
784
785impl Callback for SourceCancelPromiseFulfillmentHandler {
786    /// The fulfillment handler for the reacting to sourceCancelPromise part of
787    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
788    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
789    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
790        self.result.resolve_native(&(), CanGc::from_cx(cx));
791    }
792}
793
794/// The rejection handler for the reacting to sourceCancelPromise part of
795/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
796#[derive(Clone, JSTraceable, MallocSizeOf)]
797struct SourceCancelPromiseRejectionHandler {
798    #[conditional_malloc_size_of]
799    result: Rc<Promise>,
800}
801
802impl Callback for SourceCancelPromiseRejectionHandler {
803    /// The rejection handler for the reacting to sourceCancelPromise part of
804    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
805    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
806    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
807        self.result.reject_native(&v, CanGc::from_cx(cx));
808    }
809}
810
/// The `[[state]]` internal slot of a readable stream.
/// <https://streams.spec.whatwg.org/#readablestream-state>
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReadableStreamState {
    /// The default state, used for a freshly constructed stream.
    #[default]
    Readable,
    /// The stream has been closed.
    Closed,
    /// The stream has been errored.
    Errored,
}
819
/// The controller attached to a stream; one variant per controller kind.
/// <https://streams.spec.whatwg.org/#readablestream-controller>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ControllerType {
    /// Slot for a byte stream controller.
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
    Byte(MutNullableDom<ReadableByteStreamController>),
    /// Slot for a default controller.
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
    Default(MutNullableDom<ReadableStreamDefaultController>),
}
829
830/// <https://streams.spec.whatwg.org/#readablestream-readerr>
831#[derive(JSTraceable, MallocSizeOf)]
832#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
833pub(crate) enum ReaderType {
834    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
835    #[allow(clippy::upper_case_acronyms)]
836    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
837    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
838    Default(MutNullableDom<ReadableStreamDefaultReader>),
839}
840
841impl Eq for ReaderType {}
842impl PartialEq for ReaderType {
843    fn eq(&self, other: &Self) -> bool {
844        matches!(
845            (self, other),
846            (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
847                (ReaderType::Default(_), ReaderType::Default(_))
848        )
849    }
850}
851
852/// <https://streams.spec.whatwg.org/#create-readable-stream>
853#[cfg_attr(crown, expect(crown::unrooted_must_root))]
854pub(crate) fn create_readable_stream(
855    cx: &mut js::context::JSContext,
856    global: &GlobalScope,
857    underlying_source_type: UnderlyingSourceType,
858    queuing_strategy: Option<Rc<QueuingStrategySize>>,
859    high_water_mark: Option<f64>,
860) -> DomRoot<ReadableStream> {
861    // If highWaterMark was not passed, set it to 1.
862    let high_water_mark = high_water_mark.unwrap_or(1.0);
863
864    // If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
865    let size_algorithm = queuing_strategy.unwrap_or(extract_size_algorithm(
866        &QueuingStrategy::empty(),
867        CanGc::from_cx(cx),
868    ));
869
870    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
871    assert!(high_water_mark >= 0.0);
872
873    // Let stream be a new ReadableStream.
874    // Perform ! InitializeReadableStream(stream).
875    let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
876
877    // Let controller be a new ReadableStreamDefaultController.
878    let controller = ReadableStreamDefaultController::new(
879        global,
880        underlying_source_type,
881        high_water_mark,
882        size_algorithm,
883        CanGc::from_cx(cx),
884    );
885
886    // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm,
887    // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
888    controller
889        .setup(cx, stream.clone())
890        .expect("Setup of default controller cannot fail");
891
892    // Return stream.
893    stream
894}
895
896/// <https://streams.spec.whatwg.org/#abstract-opdef-createreadablebytestream>
897#[cfg_attr(crown, expect(crown::unrooted_must_root))]
898fn readable_byte_stream_tee(
899    cx: &mut js::context::JSContext,
900    global: &GlobalScope,
901    underlying_source_type: UnderlyingSourceType,
902) -> DomRoot<ReadableStream> {
903    // Let stream be a new ReadableStream.
904    // Perform ! InitializeReadableStream(stream).
905    let tee_stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
906
907    // Let controller be a new ReadableByteStreamController.
908    let controller =
909        ReadableByteStreamController::new(underlying_source_type, 0.0, global, CanGc::from_cx(cx));
910
911    // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
912    controller
913        .setup(cx, global, tee_stream.clone())
914        .expect("Setup of byte stream controller cannot fail");
915
916    // Return stream.
917    tee_stream
918}
919
/// The DOM object backing a `ReadableStream`.
/// <https://streams.spec.whatwg.org/#rs-class>
#[dom_struct]
pub(crate) struct ReadableStream {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#readablestream-controller>
    /// `None` until `set_default_controller` or `set_byte_controller` stores one.
    /// Note: the inner `MutNullableDom` should really be an `Option<Dom>`,
    /// because it is never unset once set.
    controller: RefCell<Option<ControllerType>>,

    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
    /// Written by `error`, read back through `get_stored_error`.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
    /// Accessed through `is_disturbed` and `set_is_disturbed`.
    disturbed: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#readablestream-reader>
    /// `None` when constructed; the variant records which kind of reader is attached.
    reader: RefCell<Option<ReaderType>>,

    /// <https://streams.spec.whatwg.org/#readablestream-state>
    /// Starts out as the default state, `Readable`.
    state: Cell<ReadableStreamState>,
}
943
944impl ReadableStream {
945    /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
946    fn new_inherited() -> ReadableStream {
947        ReadableStream {
948            reflector_: Reflector::new(),
949            controller: RefCell::new(None),
950            stored_error: Heap::default(),
951            disturbed: Default::default(),
952            reader: RefCell::new(None),
953            state: Cell::new(Default::default()),
954        }
955    }
956
957    pub(crate) fn new_with_proto(
958        global: &GlobalScope,
959        proto: Option<SafeHandleObject>,
960        can_gc: CanGc,
961    ) -> DomRoot<ReadableStream> {
962        reflect_dom_object_with_proto(
963            Box::new(ReadableStream::new_inherited()),
964            global,
965            proto,
966            can_gc,
967        )
968    }
969
970    /// Used as part of
971    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
972    pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
973        *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
974            controller,
975        ))));
976    }
977
978    /// Used as part of
979    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
980    pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
981        *self.controller.borrow_mut() =
982            Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
983    }
984
985    /// Used as part of
986    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
987    pub(crate) fn assert_no_controller(&self) {
988        let has_no_controller = self.controller.borrow().is_none();
989        assert!(has_no_controller);
990    }
991
992    /// Build a stream backed by a Rust source that has already been read into memory.
993    pub(crate) fn new_from_bytes(
994        cx: &mut js::context::JSContext,
995        global: &GlobalScope,
996        bytes: Vec<u8>,
997    ) -> Fallible<DomRoot<ReadableStream>> {
998        let stream = ReadableStream::new_with_external_underlying_source(
999            cx,
1000            global,
1001            UnderlyingSourceType::Memory(bytes.len()),
1002        )?;
1003        stream.enqueue_native(bytes, CanGc::from_cx(cx));
1004        stream.controller_close_native(CanGc::from_cx(cx));
1005        Ok(stream)
1006    }
1007
1008    /// Build a stream backed by a Rust underlying source.
1009    /// Note: external sources are always paired with a default controller.
1010    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1011    pub(crate) fn new_with_external_underlying_source(
1012        cx: &mut js::context::JSContext,
1013        global: &GlobalScope,
1014        source: UnderlyingSourceType,
1015    ) -> Fallible<DomRoot<ReadableStream>> {
1016        assert!(source.is_native());
1017        let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
1018        let controller = ReadableStreamDefaultController::new(
1019            global,
1020            source,
1021            1.0,
1022            extract_size_algorithm(&QueuingStrategy::empty(), CanGc::from_cx(cx)),
1023            CanGc::from_cx(cx),
1024        );
1025        controller.setup(cx, stream.clone())?;
1026        Ok(stream)
1027    }
1028
1029    /// Call into the release steps of the controller,
1030    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1031        match self.controller.borrow().as_ref() {
1032            Some(ControllerType::Default(controller)) => {
1033                let controller = controller
1034                    .get()
1035                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1036                controller.perform_release_steps()
1037            },
1038            Some(ControllerType::Byte(controller)) => {
1039                let controller = controller
1040                    .get()
1041                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1042                controller.perform_release_steps()
1043            },
1044            None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1045        }
1046    }
1047
1048    /// Call into the pull steps of the controller,
1049    /// as part of
1050    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1051    pub(crate) fn perform_pull_steps(
1052        &self,
1053        cx: SafeJSContext,
1054        read_request: &ReadRequest,
1055        can_gc: CanGc,
1056    ) {
1057        match self.controller.borrow().as_ref() {
1058            Some(ControllerType::Default(controller)) => controller
1059                .get()
1060                .expect("Stream should have controller.")
1061                .perform_pull_steps(read_request, can_gc),
1062            Some(ControllerType::Byte(controller)) => controller
1063                .get()
1064                .expect("Stream should have controller.")
1065                .perform_pull_steps(cx, read_request, can_gc),
1066            None => {
1067                unreachable!("Stream does not have a controller.");
1068            },
1069        }
1070    }
1071
1072    /// Call into the pull steps of the controller,
1073    /// as part of
1074    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
1075    pub(crate) fn perform_pull_into(
1076        &self,
1077        cx: SafeJSContext,
1078        read_into_request: &ReadIntoRequest,
1079        view: HeapBufferSource<ArrayBufferViewU8>,
1080        min: u64,
1081        can_gc: CanGc,
1082    ) {
1083        match self.controller.borrow().as_ref() {
1084            Some(ControllerType::Byte(controller)) => controller
1085                .get()
1086                .expect("Stream should have controller.")
1087                .perform_pull_into(cx, read_into_request, view, min, can_gc),
1088            _ => {
1089                unreachable!(
1090                    "Pulling a chunk from a stream with a default controller using a BYOB reader"
1091                )
1092            },
1093        }
1094    }
1095
1096    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
1097    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1098        match self.reader.borrow().as_ref() {
1099            Some(ReaderType::Default(reader)) => {
1100                let Some(reader) = reader.get() else {
1101                    panic!("Attempt to add a read request without having first acquired a reader.");
1102                };
1103
1104                // Assert: stream.[[state]] is "readable".
1105                assert!(self.is_readable());
1106
1107                // Append readRequest to stream.[[reader]].[[readRequests]].
1108                reader.add_read_request(read_request);
1109            },
1110            _ => {
1111                unreachable!("Adding a read request can only be done on a default reader.")
1112            },
1113        }
1114    }
1115
1116    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
1117    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1118        match self.reader.borrow().as_ref() {
1119            // Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
1120            Some(ReaderType::BYOB(reader)) => {
1121                let Some(reader) = reader.get() else {
1122                    unreachable!(
1123                        "Attempt to add a read into request without having first acquired a reader."
1124                    );
1125                };
1126
1127                // Assert: stream.[[state]] is "readable" or "closed".
1128                assert!(self.is_readable() || self.is_closed());
1129
1130                // Append readRequest to stream.[[reader]].[[readIntoRequests]].
1131                reader.add_read_into_request(read_request);
1132            },
1133            _ => {
1134                unreachable!("Adding a read into request can only be done on a BYOB reader.")
1135            },
1136        }
1137    }
1138
1139    /// Endpoint to enqueue chunks directly from Rust.
1140    /// Note: in other use cases this call happens via the controller.
1141    pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1142        match self.controller.borrow().as_ref() {
1143            Some(ControllerType::Default(controller)) => controller
1144                .get()
1145                .expect("Stream should have controller.")
1146                .enqueue_native(bytes, can_gc),
1147            _ => {
1148                unreachable!(
1149                    "Enqueueing chunk to a stream from Rust on other than default controller"
1150                );
1151            },
1152        }
1153    }
1154
1155    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1156    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1157        // Assert: stream.[[state]] is "readable".
1158        assert!(self.is_readable());
1159
1160        // Set stream.[[state]] to "errored".
1161        self.state.set(ReadableStreamState::Errored);
1162
1163        // Set stream.[[storedError]] to e.
1164        self.stored_error.set(e.get());
1165
1166        // Let reader be stream.[[reader]].
1167
1168        let default_reader = {
1169            let reader_ref = self.reader.borrow();
1170            match reader_ref.as_ref() {
1171                Some(ReaderType::Default(reader)) => reader.get(),
1172                _ => None,
1173            }
1174        };
1175
1176        if let Some(reader) = default_reader {
1177            // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
1178            reader.error(e, can_gc);
1179            return;
1180        }
1181
1182        let byob_reader = {
1183            let reader_ref = self.reader.borrow();
1184            match reader_ref.as_ref() {
1185                Some(ReaderType::BYOB(reader)) => reader.get(),
1186                _ => None,
1187            }
1188        };
1189
1190        if let Some(reader) = byob_reader {
1191            // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
1192            reader.error_read_into_requests(e, can_gc);
1193        }
1194
1195        // If reader is undefined, return.
1196    }
1197
1198    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
1199    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1200        handle_mut.set(self.stored_error.get());
1201    }
1202
1203    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1204    /// Note: in other use cases this call happens via the controller.
1205    pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1206        let cx = GlobalScope::get_cx();
1207        rooted!(in(*cx) let mut error_val = UndefinedValue());
1208        error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1209        self.error(error_val.handle(), can_gc);
1210    }
1211
1212    /// Call into the controller's `Close` method.
1213    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
1214    pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1215        match self.controller.borrow().as_ref() {
1216            Some(ControllerType::Default(controller)) => {
1217                let _ = controller
1218                    .get()
1219                    .expect("Stream should have controller.")
1220                    .Close(can_gc);
1221            },
1222            _ => {
1223                unreachable!("Native closing is only done on default controllers.")
1224            },
1225        }
1226    }
1227
1228    /// Returns a boolean reflecting whether the stream has all data in memory.
1229    /// Useful for native source integration only.
1230    pub(crate) fn in_memory(&self) -> bool {
1231        match self.controller.borrow().as_ref() {
1232            Some(ControllerType::Default(controller)) => controller
1233                .get()
1234                .expect("Stream should have controller.")
1235                .in_memory(),
1236            _ => {
1237                unreachable!(
1238                    "Checking if source is in memory for a stream with a non-default controller"
1239                )
1240            },
1241        }
1242    }
1243
1244    /// Return bytes for synchronous use, if the stream has all data in memory.
1245    /// Useful for native source integration only.
1246    pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1247        match self.controller.borrow().as_ref() {
1248            Some(ControllerType::Default(controller)) => controller
1249                .get()
1250                .expect("Stream should have controller.")
1251                .get_in_memory_bytes()
1252                .as_deref()
1253                .map(GenericSharedMemory::from_bytes),
1254            _ => {
1255                unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1256            },
1257        }
1258    }
1259
1260    /// Acquires a reader and locks the stream,
1261    /// must be done before `read_a_chunk`.
1262    /// Native call to
1263    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-reader>
1264    pub(crate) fn acquire_default_reader(
1265        &self,
1266        can_gc: CanGc,
1267    ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1268        // Let reader be a new ReadableStreamDefaultReader.
1269        let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1270
1271        // Perform ? SetUpReadableStreamDefaultReader(reader, stream).
1272        reader.set_up(self, &self.global(), can_gc)?;
1273
1274        // Return reader.
1275        Ok(reader)
1276    }
1277
1278    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-byob-reader>
1279    pub(crate) fn acquire_byob_reader(
1280        &self,
1281        can_gc: CanGc,
1282    ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1283        // Let reader be a new ReadableStreamBYOBReader.
1284        let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1285        // Perform ? SetUpReadableStreamBYOBReader(reader, stream).
1286        reader.set_up(self, &self.global(), can_gc)?;
1287
1288        // Return reader.
1289        Ok(reader)
1290    }
1291
1292    pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1293        match self.controller.borrow().as_ref() {
1294            Some(ControllerType::Default(controller)) => {
1295                controller.get().expect("Stream should have controller.")
1296            },
1297            _ => {
1298                unreachable!(
1299                    "Getting default controller for a stream with a non-default controller"
1300                )
1301            },
1302        }
1303    }
1304
1305    pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1306        match self.controller.borrow().as_ref() {
1307            Some(ControllerType::Byte(controller)) => {
1308                controller.get().expect("Stream should have controller.")
1309            },
1310            _ => {
1311                unreachable!("Getting byte controller for a stream with a non-byte controller")
1312            },
1313        }
1314    }
1315
1316    pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1317        match self.reader.borrow().as_ref() {
1318            Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1319            _ => {
1320                unreachable!("Getting default reader for a stream with a non-default reader")
1321            },
1322        }
1323    }
1324
1325    /// Read a chunk from the stream,
1326    /// must be called after `start_reading`,
1327    /// and before `stop_reading`.
1328    /// Native call to
1329    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1330    pub(crate) fn read_a_chunk(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
1331        match self.reader.borrow().as_ref() {
1332            Some(ReaderType::Default(reader)) => {
1333                let Some(reader) = reader.get() else {
1334                    unreachable!(
1335                        "Attempt to read stream chunk without having first acquired a reader."
1336                    );
1337                };
1338                reader.Read(cx)
1339            },
1340            _ => {
1341                unreachable!("Native reading of a chunk can only be done with a default reader.")
1342            },
1343        }
1344    }
1345
1346    /// Releases the lock on the reader,
1347    /// must be done after `start_reading`.
1348    /// Native call to
1349    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
1350    pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1351        let reader_ref = self.reader.borrow();
1352
1353        match reader_ref.as_ref() {
1354            Some(ReaderType::Default(reader)) => {
1355                let Some(reader) = reader.get() else {
1356                    unreachable!("Attempt to stop reading without having first acquired a reader.");
1357                };
1358
1359                drop(reader_ref);
1360                reader.release(can_gc).expect("Reader release cannot fail.");
1361            },
1362            _ => {
1363                unreachable!("Native stop reading can only be done with a default reader.")
1364            },
1365        }
1366    }
1367
1368    /// <https://streams.spec.whatwg.org/#is-readable-stream-locked>
1369    pub(crate) fn is_locked(&self) -> bool {
1370        match self.reader.borrow().as_ref() {
1371            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1372            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1373            None => false,
1374        }
1375    }
1376
    /// Returns the current value of the `disturbed` flag.
    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
    pub(crate) fn is_disturbed(&self) -> bool {
        self.disturbed.get()
    }
1380
    /// Overwrites the `disturbed` flag with the given value.
    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
    pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
        self.disturbed.set(disturbed);
    }
1384
1385    pub(crate) fn is_closed(&self) -> bool {
1386        self.state.get() == ReadableStreamState::Closed
1387    }
1388
1389    pub(crate) fn is_errored(&self) -> bool {
1390        self.state.get() == ReadableStreamState::Errored
1391    }
1392
1393    pub(crate) fn is_readable(&self) -> bool {
1394        self.state.get() == ReadableStreamState::Readable
1395    }
1396
1397    pub(crate) fn has_default_reader(&self) -> bool {
1398        match self.reader.borrow().as_ref() {
1399            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1400            _ => false,
1401        }
1402    }
1403
1404    pub(crate) fn has_byob_reader(&self) -> bool {
1405        match self.reader.borrow().as_ref() {
1406            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1407            _ => false,
1408        }
1409    }
1410
1411    pub(crate) fn has_byte_controller(&self) -> bool {
1412        match self.controller.borrow().as_ref() {
1413            Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1414            _ => false,
1415        }
1416    }
1417
1418    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
1419    pub(crate) fn get_num_read_requests(&self) -> usize {
1420        match self.reader.borrow().as_ref() {
1421            Some(ReaderType::Default(reader)) => {
1422                let reader = reader
1423                    .get()
1424                    .expect("Stream must have a reader when getting the number of read requests.");
1425                reader.get_num_read_requests()
1426            },
1427            _ => unreachable!(
1428                "Stream must have a default reader when get num read requests is called into."
1429            ),
1430        }
1431    }
1432
1433    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-into-requests>
1434    pub(crate) fn get_num_read_into_requests(&self) -> usize {
1435        assert!(self.has_byob_reader());
1436
1437        match self.reader.borrow().as_ref() {
1438            Some(ReaderType::BYOB(reader)) => {
1439                let Some(reader) = reader.get() else {
1440                    unreachable!(
1441                        "Stream must have a reader when get num read into requests is called into."
1442                    );
1443                };
1444                reader.get_num_read_into_requests()
1445            },
1446            _ => {
1447                unreachable!(
1448                    "Stream must have a BYOB reader when get num read into requests is called into."
1449                );
1450            },
1451        }
1452    }
1453
1454    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
1455    pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1456        // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1457        assert!(self.has_default_reader());
1458
1459        match self.reader.borrow().as_ref() {
1460            Some(ReaderType::Default(reader)) => {
1461                // step 2 - Let reader be stream.[[reader]].
1462                let reader = reader
1463                    .get()
1464                    .expect("Stream must have a reader when a read request is fulfilled.");
1465                // step 3 - Assert: reader.[[readRequests]] is not empty.
1466                assert_ne!(reader.get_num_read_requests(), 0);
1467                // step 4 & 5
1468                // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]].
1469                let request = reader.remove_read_request();
1470
1471                if done {
1472                    // step 6 - If done is true, perform readRequest’s close steps.
1473                    request.close_steps(can_gc);
1474                } else {
1475                    // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk.
1476                    let result = RootedTraceableBox::new(Heap::default());
1477                    result.set(*chunk);
1478                    request.chunk_steps(result, &self.global(), can_gc);
1479                }
1480            },
1481            _ => {
1482                unreachable!(
1483                    "Stream must have a default reader when fulfill read requests is called into."
1484                );
1485            },
1486        }
1487    }
1488
1489    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request>
1490    pub(crate) fn fulfill_read_into_request(
1491        &self,
1492        chunk: SafeHandleValue,
1493        done: bool,
1494        can_gc: CanGc,
1495    ) {
1496        // Assert: ! ReadableStreamHasBYOBReader(stream) is true.
1497        assert!(self.has_byob_reader());
1498
1499        // Let reader be stream.[[reader]].
1500        match self.reader.borrow().as_ref() {
1501            Some(ReaderType::BYOB(reader)) => {
1502                let Some(reader) = reader.get() else {
1503                    unreachable!(
1504                        "Stream must have a reader when a read into request is fulfilled."
1505                    );
1506                };
1507
1508                // Assert: reader.[[readIntoRequests]] is not empty.
1509                assert!(reader.get_num_read_into_requests() > 0);
1510
1511                // Let readIntoRequest be reader.[[readIntoRequests]][0].
1512                // Remove readIntoRequest from reader.[[readIntoRequests]].
1513                let read_into_request = reader.remove_read_into_request();
1514
1515                // If done is true, perform readIntoRequest’s close steps, given chunk.
1516                let result = RootedTraceableBox::new(Heap::default());
1517                if done {
1518                    result.set(*chunk);
1519                    read_into_request.close_steps(Some(result), can_gc);
1520                } else {
1521                    // Otherwise, perform readIntoRequest’s chunk steps, given chunk.
1522                    result.set(*chunk);
1523                    read_into_request.chunk_steps(result, can_gc);
1524                }
1525            },
1526            _ => {
1527                unreachable!(
1528                    "Stream must have a BYOB reader when fulfill read into requests is called into."
1529                );
1530            },
1531        };
1532    }
1533
1534    /// <https://streams.spec.whatwg.org/#readable-stream-close>
1535    pub(crate) fn close(&self, can_gc: CanGc) {
1536        // Assert: stream.[[state]] is "readable".
1537        assert!(self.is_readable());
1538        // Set stream.[[state]] to "closed".
1539        self.state.set(ReadableStreamState::Closed);
1540        // Let reader be stream.[[reader]].
1541
1542        // NOTE: do not hold the RefCell borrow across reader.close(),
1543        // or release() will panic when it tries to mut-borrow stream.reader.
1544        // So we pull out the underlying DOM reader in a local, then drop the borrow.
1545        let default_reader = {
1546            let reader_ref = self.reader.borrow();
1547            match reader_ref.as_ref() {
1548                Some(ReaderType::Default(reader)) => reader.get(),
1549                _ => None,
1550            }
1551        };
1552
1553        if let Some(reader) = default_reader {
1554            // steps 5 & 6 for a default reader
1555            reader.close(can_gc);
1556            return;
1557        }
1558
1559        // Same for BYOB reader.
1560        let byob_reader = {
1561            let reader_ref = self.reader.borrow();
1562            match reader_ref.as_ref() {
1563                Some(ReaderType::BYOB(reader)) => reader.get(),
1564                _ => None,
1565            }
1566        };
1567
1568        if let Some(reader) = byob_reader {
1569            // steps 5 & 6 for a BYOB reader
1570            reader.close(can_gc);
1571        }
1572
1573        // If reader is undefined, return.
1574    }
1575
1576    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
1577    pub(crate) fn cancel(
1578        &self,
1579        cx: &mut js::context::JSContext,
1580        global: &GlobalScope,
1581        reason: SafeHandleValue,
1582    ) -> Rc<Promise> {
1583        // Set stream.[[disturbed]] to true.
1584        self.disturbed.set(true);
1585
1586        // If stream.[[state]] is "closed", return a promise resolved with undefined.
1587        if self.is_closed() {
1588            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
1589        }
1590        // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
1591        if self.is_errored() {
1592            let promise = Promise::new2(cx, global);
1593            rooted!(&in(cx) let mut rval = UndefinedValue());
1594            self.stored_error
1595                .safe_to_jsval(cx.into(), rval.handle_mut(), CanGc::from_cx(cx));
1596            promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
1597            return promise;
1598        }
1599        // Perform ! ReadableStreamClose(stream).
1600        self.close(CanGc::from_cx(cx));
1601
1602        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
1603        let byob_reader = {
1604            let reader_ref = self.reader.borrow();
1605            match reader_ref.as_ref() {
1606                Some(ReaderType::BYOB(reader)) => reader.get(),
1607                _ => None,
1608            }
1609        };
1610
1611        if let Some(reader) = byob_reader {
1612            // step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel
1613            reader.cancel(CanGc::from_cx(cx));
1614        }
1615
1616        // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).
1617
1618        let source_cancel_promise = match self.controller.borrow().as_ref() {
1619            Some(ControllerType::Default(controller)) => controller
1620                .get()
1621                .expect("Stream should have controller.")
1622                .perform_cancel_steps(cx, global, reason),
1623            Some(ControllerType::Byte(controller)) => controller
1624                .get()
1625                .expect("Stream should have controller.")
1626                .perform_cancel_steps(cx, global, reason),
1627            None => {
1628                panic!("Stream does not have a controller.");
1629            },
1630        };
1631
1632        // Create a new promise,
1633        // and setup a handler in order to react to the fulfillment of sourceCancelPromise.
1634        let global = self.global();
1635        let result_promise = Promise::new2(cx, &global);
1636        let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1637            result: result_promise.clone(),
1638        });
1639        let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1640            result: result_promise.clone(),
1641        });
1642        let handler = PromiseNativeHandler::new(
1643            &global,
1644            Some(fulfillment_handler),
1645            Some(rejection_handler),
1646            CanGc::from_cx(cx),
1647        );
1648        let realm = enter_realm(&*global);
1649        let comp = InRealm::Entered(&realm);
1650        source_cancel_promise.append_native_handler(&handler, comp, CanGc::from_cx(cx));
1651
1652        // Return the result of reacting to sourceCancelPromise
1653        // with a fulfillment step that returns undefined.
1654        result_promise
1655    }
1656
1657    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1658    pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1659        *self.reader.borrow_mut() = new_reader;
1660    }
1661
1662    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1663    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
1664    fn byte_tee(&self, cx: &mut js::context::JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1665        // Assert: stream implements ReadableStream.
1666        // Assert: stream.[[controller]] implements ReadableByteStreamController.
1667
1668        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1669        let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;
1670        let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1671            Some(&reader),
1672        ))));
1673
1674        // Let reading be false.
1675        let reading = Rc::new(Cell::new(false));
1676
1677        // Let readAgainForBranch1 be false.
1678        let read_again_for_branch_1 = Rc::new(Cell::new(false));
1679
1680        // Let readAgainForBranch2 be false.
1681        let read_again_for_branch_2 = Rc::new(Cell::new(false));
1682
1683        // Let canceled1 be false.
1684        let canceled_1 = Rc::new(Cell::new(false));
1685
1686        // Let canceled2 be false.
1687        let canceled_2 = Rc::new(Cell::new(false));
1688
1689        // Let reason1 be undefined.
1690        let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1691
1692        // Let reason2 be undefined.
1693        let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1694
1695        // Let cancelPromise be a new promise.
1696        let cancel_promise = Promise::new2(cx, &self.global());
1697        let reader_version = Rc::new(Cell::new(0));
1698
1699        let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1700            reader.clone(),
1701            self,
1702            reading.clone(),
1703            read_again_for_branch_1.clone(),
1704            read_again_for_branch_2.clone(),
1705            canceled_1.clone(),
1706            canceled_2.clone(),
1707            reason_1.clone(),
1708            reason_2.clone(),
1709            cancel_promise.clone(),
1710            reader_version.clone(),
1711            ByteTeeCancelAlgorithm::Cancel1Algorithm,
1712            ByteTeePullAlgorithm::Pull1Algorithm,
1713            CanGc::from_cx(cx),
1714        );
1715
1716        let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1717            reader.clone(),
1718            self,
1719            reading,
1720            read_again_for_branch_1,
1721            read_again_for_branch_2,
1722            canceled_1,
1723            canceled_2,
1724            reason_1,
1725            reason_2,
1726            cancel_promise,
1727            reader_version,
1728            ByteTeeCancelAlgorithm::Cancel2Algorithm,
1729            ByteTeePullAlgorithm::Pull2Algorithm,
1730            CanGc::from_cx(cx),
1731        );
1732
1733        // Set branch1 to ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm).
1734        let branch_1 = readable_byte_stream_tee(
1735            cx,
1736            &self.global(),
1737            UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_1)),
1738        );
1739        byte_tee_source_1.set_branch_1(&branch_1);
1740        byte_tee_source_2.set_branch_1(&branch_1);
1741
1742        // Set branch2 to ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm).
1743        let branch_2 = readable_byte_stream_tee(
1744            cx,
1745            &self.global(),
1746            UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_2)),
1747        );
1748        byte_tee_source_1.set_branch_2(&branch_2);
1749        byte_tee_source_2.set_branch_2(&branch_2);
1750
1751        // Perform forwardReaderError, given reader.
1752        byte_tee_source_1.forward_reader_error(reader.clone(), CanGc::from_cx(cx));
1753        byte_tee_source_2.forward_reader_error(reader, CanGc::from_cx(cx));
1754
1755        // Return « branch1, branch2 ».
1756        Ok(vec![branch_1, branch_2])
1757    }
1758
1759    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
1760    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1761    fn default_tee(
1762        &self,
1763        cx: &mut js::context::JSContext,
1764        clone_for_branch_2: bool,
1765    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1766        // Assert: stream implements ReadableStream.
1767
1768        // Assert: cloneForBranch2 is a boolean.
1769        let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1770
1771        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1772        let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;
1773
1774        // Let reading be false.
1775        let reading = Rc::new(Cell::new(false));
1776        // Let readAgain be false.
1777        let read_again = Rc::new(Cell::new(false));
1778        // Let canceled1 be false.
1779        let canceled_1 = Rc::new(Cell::new(false));
1780        // Let canceled2 be false.
1781        let canceled_2 = Rc::new(Cell::new(false));
1782
1783        // Let reason1 be undefined.
1784        let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1785        // Let reason2 be undefined.
1786        let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1787        // Let cancelPromise be a new promise.
1788        let cancel_promise = Promise::new2(cx, &self.global());
1789
1790        let tee_source_1 = DefaultTeeUnderlyingSource::new(
1791            &reader,
1792            self,
1793            reading.clone(),
1794            read_again.clone(),
1795            canceled_1.clone(),
1796            canceled_2.clone(),
1797            clone_for_branch_2.clone(),
1798            reason_1.clone(),
1799            reason_2.clone(),
1800            cancel_promise.clone(),
1801            DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1802            CanGc::from_cx(cx),
1803        );
1804
1805        let underlying_source_type_branch_1 =
1806            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
1807
1808        let tee_source_2 = DefaultTeeUnderlyingSource::new(
1809            &reader,
1810            self,
1811            reading,
1812            read_again,
1813            canceled_1.clone(),
1814            canceled_2.clone(),
1815            clone_for_branch_2,
1816            reason_1,
1817            reason_2,
1818            cancel_promise.clone(),
1819            DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1820            CanGc::from_cx(cx),
1821        );
1822
1823        let underlying_source_type_branch_2 =
1824            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
1825
1826        // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
1827        let branch_1 = create_readable_stream(
1828            cx,
1829            &self.global(),
1830            underlying_source_type_branch_1,
1831            None,
1832            None,
1833        );
1834        tee_source_1.set_branch_1(&branch_1);
1835        tee_source_2.set_branch_1(&branch_1);
1836
1837        // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
1838        let branch_2 = create_readable_stream(
1839            cx,
1840            &self.global(),
1841            underlying_source_type_branch_2,
1842            None,
1843            None,
1844        );
1845        tee_source_1.set_branch_2(&branch_2);
1846        tee_source_2.set_branch_2(&branch_2);
1847
1848        // Upon rejection of reader.[[closedPromise]] with reason r,
1849        reader.default_tee_append_native_handler_to_closed_promise(
1850            &branch_1,
1851            &branch_2,
1852            canceled_1,
1853            canceled_2,
1854            cancel_promise,
1855            CanGc::from_cx(cx),
1856        );
1857
1858        // Return « branch_1, branch_2 ».
1859        Ok(vec![branch_1, branch_2])
1860    }
1861
1862    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
1863    #[allow(clippy::too_many_arguments)]
1864    pub(crate) fn pipe_to(
1865        &self,
1866        cx: &mut CurrentRealm,
1867        global: &GlobalScope,
1868        dest: &WritableStream,
1869        prevent_close: bool,
1870        prevent_abort: bool,
1871        prevent_cancel: bool,
1872        signal: Option<&AbortSignal>,
1873    ) -> Rc<Promise> {
1874        // Assert: source implements ReadableStream.
1875        // Assert: dest implements WritableStream.
1876        // Assert: prevent_close, prevent_abort, and prevent_cancel are all booleans.
1877        // Done with method signature types.
1878
1879        // If signal was not given, let signal be undefined.
1880        // Assert: either signal is undefined, or signal implements AbortSignal.
1881        // Note: done with the `signal` argument.
1882
1883        // Assert: ! IsReadableStreamLocked(source) is false.
1884        assert!(!self.is_locked());
1885
1886        // Assert: ! IsWritableStreamLocked(dest) is false.
1887        assert!(!dest.is_locked());
1888
1889        // If source.[[controller]] implements ReadableByteStreamController,
1890        // let reader be either ! AcquireReadableStreamBYOBReader(source)
1891        // or ! AcquireReadableStreamDefaultReader(source),
1892        // at the user agent’s discretion.
1893        // Note: for now only using default readers.
1894
1895        // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
1896        let reader = self
1897            .acquire_default_reader(CanGc::from_cx(cx))
1898            .expect("Acquiring a default reader for pipe_to cannot fail");
1899
1900        // Let writer be ! AcquireWritableStreamDefaultWriter(dest).
1901        let writer = dest
1902            .aquire_default_writer(cx.into(), global, CanGc::from_cx(cx))
1903            .expect("Acquiring a default writer for pipe_to cannot fail");
1904
1905        // Set source.[[disturbed]] to true.
1906        self.disturbed.set(true);
1907
1908        // Let shuttingDown be false.
1909        // Done below with default.
1910
1911        // Let promise be a new promise.
1912        let promise = Promise::new2(cx, global);
1913
1914        // In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
1915        rooted!(&in(cx) let pipe_to = PipeTo {
1916            reader: Dom::from_ref(&reader),
1917            writer: Dom::from_ref(&writer),
1918            pending_writes: Default::default(),
1919            state: Default::default(),
1920            prevent_abort,
1921            prevent_cancel,
1922            prevent_close,
1923            shutting_down: Default::default(),
1924            abort_reason: Default::default(),
1925            shutdown_error: Default::default(),
1926            shutdown_action_promise:  Default::default(),
1927            result_promise: promise.clone(),
1928        });
1929
1930        // If signal is not undefined,
1931        // Note: moving the steps to here, so that the `PipeTo` is available.
1932        if let Some(signal) = signal {
1933            // Let abortAlgorithm be the following steps:
1934            // Note: steps are implemented at call site.
1935            rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1936
1937            // If signal is aborted, perform abortAlgorithm and return promise.
1938            if signal.aborted() {
1939                signal.run_abort_algorithm(cx, global, &abort_algorithm);
1940                return promise;
1941            }
1942
1943            // Add abortAlgorithm to signal.
1944            signal.add(&abort_algorithm);
1945        }
1946
1947        // Note: perfom checks now, since streams can start as closed or errored.
1948        pipe_to.check_and_propagate_errors_forward(cx, global);
1949        pipe_to.check_and_propagate_errors_backward(cx, global);
1950        pipe_to.check_and_propagate_closing_forward(cx, global);
1951        pipe_to.check_and_propagate_closing_backward(cx, global);
1952
1953        // If we are not closed or errored,
1954        if *pipe_to.state.borrow() == PipeToState::Starting {
1955            // Start the pipe, by waiting on the writer being ready for a chunk.
1956            pipe_to.wait_for_writer_ready(cx, global);
1957        }
1958
1959        // Return promise.
1960        promise
1961    }
1962
1963    /// <https://streams.spec.whatwg.org/#readable-stream-tee>
1964    pub(crate) fn tee(
1965        &self,
1966        cx: &mut js::context::JSContext,
1967        clone_for_branch_2: bool,
1968    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1969        // Assert: stream implements ReadableStream.
1970        // Assert: cloneForBranch2 is a boolean.
1971
1972        match self.controller.borrow().as_ref() {
1973            Some(ControllerType::Default(_)) => {
1974                // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
1975                self.default_tee(cx, clone_for_branch_2)
1976            },
1977            Some(ControllerType::Byte(_)) => {
1978                // If stream.[[controller]] implements ReadableByteStreamController,
1979                // return ? ReadableByteStreamTee(stream).
1980                self.byte_tee(cx)
1981            },
1982            None => {
1983                unreachable!("Stream should have a controller.");
1984            },
1985        }
1986    }
1987
1988    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller-from-underlying-source>
1989    fn set_up_byte_controller(
1990        &self,
1991        cx: &mut js::context::JSContext,
1992        global: &GlobalScope,
1993        underlying_source_dict: JsUnderlyingSource,
1994        underlying_source_handle: SafeHandleObject,
1995        stream: DomRoot<ReadableStream>,
1996        strategy_hwm: f64,
1997    ) -> Fallible<()> {
1998        // Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
1999        // Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
2000        // If underlyingSourceDict["start"] exists, then set startAlgorithm to an algorithm which returns the result
2001        // of invoking underlyingSourceDict["start"] with argument list « controller »
2002        // and callback this value underlyingSource.
2003        // If underlyingSourceDict["pull"] exists, then set pullAlgorithm to an algorithm which returns the result
2004        // of invoking underlyingSourceDict["pull"] with argument list « controller »
2005        // and callback this value underlyingSource.
2006        // If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm to an algorithm which takes an
2007        // argument reason and returns the result of invoking underlyingSourceDict["cancel"] with argument list
2008        // « reason » and callback this value underlyingSource.
2009
2010        // Let autoAllocateChunkSize be underlyingSourceDict["autoAllocateChunkSize"],
2011        // if it exists, or undefined otherwise.
2012        // If autoAllocateChunkSize is 0, then throw a TypeError exception.
2013        if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2014            return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2015        }
2016
2017        let controller = ReadableByteStreamController::new(
2018            UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2019            strategy_hwm,
2020            global,
2021            CanGc::from_cx(cx),
2022        );
2023
2024        // Note: this must be done before `setup`,
2025        // otherwise `thisOb` is null in the start callback.
2026        controller.set_underlying_source_this_object(underlying_source_handle);
2027
2028        // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm,
2029        // pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
2030        controller.setup(cx, global, stream)
2031    }
2032
2033    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2034    pub(crate) fn setup_cross_realm_transform_readable(
2035        &self,
2036        cx: &mut js::context::JSContext,
2037        port: &MessagePort,
2038    ) {
2039        let port_id = port.message_port_id();
2040        let global = self.global();
2041
2042        // Perform ! InitializeReadableStream(stream).
2043        // Done in `new_inherited`.
2044
2045        // Let sizeAlgorithm be an algorithm that returns 1.
2046        let size_algorithm =
2047            extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
2048
2049        // Note: other algorithms defined in the underlying source container.
2050
2051        // Let controller be a new ReadableStreamDefaultController.
2052        let controller = ReadableStreamDefaultController::new(
2053            &self.global(),
2054            UnderlyingSourceType::Transfer(Dom::from_ref(port)),
2055            0.,
2056            size_algorithm,
2057            CanGc::from_cx(cx),
2058        );
2059
2060        // Add a handler for port’s message event with the following steps:
2061        // Add a handler for port’s messageerror event with the following steps:
2062        rooted!(&in(cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2063            controller: Dom::from_ref(&controller),
2064        });
2065        global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2066
2067        // Enable port’s port message queue.
2068        port.Start(cx);
2069
2070        // Perform ! SetUpReadableStreamDefaultController
2071        controller
2072            .setup(cx, DomRoot::from_ref(self))
2073            .expect("Setting up controller for transfer cannot fail.");
2074    }
2075}
2076
2077impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2078    /// <https://streams.spec.whatwg.org/#rs-constructor>
2079    fn Constructor(
2080        cx: &mut js::context::JSContext,
2081        global: &GlobalScope,
2082        proto: Option<SafeHandleObject>,
2083        underlying_source: Option<*mut JSObject>,
2084        strategy: &QueuingStrategy,
2085    ) -> Fallible<DomRoot<Self>> {
2086        // If underlyingSource is missing, set it to null.
2087        rooted!(&in(cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2088        // Let underlyingSourceDict be underlyingSource,
2089        // converted to an IDL value of type UnderlyingSource.
2090        let underlying_source_dict = if !underlying_source_obj.is_null() {
2091            rooted!(&in(cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2092            match JsUnderlyingSource::new(cx.into(), obj_val.handle(), CanGc::from_cx(cx)) {
2093                Ok(ConversionResult::Success(val)) => val,
2094                Ok(ConversionResult::Failure(error)) => {
2095                    return Err(Error::Type(error.into_owned()));
2096                },
2097                _ => {
2098                    return Err(Error::JSFailed);
2099                },
2100            }
2101        } else {
2102            JsUnderlyingSource::empty()
2103        };
2104
2105        // Perform ! InitializeReadableStream(this).
2106        let stream = ReadableStream::new_with_proto(global, proto, CanGc::from_cx(cx));
2107
2108        if underlying_source_dict.type_.is_some() {
2109            // If strategy["size"] exists, throw a RangeError exception.
2110            if strategy.size.is_some() {
2111                return Err(Error::Range(
2112                    c"size is not supported for byte streams".to_owned(),
2113                ));
2114            }
2115
2116            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
2117            let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2118
2119            // Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this,
2120            // underlyingSource, underlyingSourceDict, highWaterMark).
2121            stream.set_up_byte_controller(
2122                cx,
2123                global,
2124                underlying_source_dict,
2125                underlying_source_obj.handle(),
2126                stream.clone(),
2127                strategy_hwm,
2128            )?;
2129        } else {
2130            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
2131            let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2132
2133            // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
2134            let size_algorithm = extract_size_algorithm(strategy, CanGc::from_cx(cx));
2135
2136            let controller = ReadableStreamDefaultController::new(
2137                global,
2138                UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2139                high_water_mark,
2140                size_algorithm,
2141                CanGc::from_cx(cx),
2142            );
2143
2144            // Note: this must be done before `setup`,
2145            // otherwise `thisOb` is null in the start callback.
2146            controller.set_underlying_source_this_object(underlying_source_obj.handle());
2147
2148            // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
2149            controller.setup(cx, stream.clone())?;
2150        };
2151
2152        Ok(stream)
2153    }
2154
2155    /// <https://streams.spec.whatwg.org/#rs-locked>
2156    fn Locked(&self) -> bool {
2157        self.is_locked()
2158    }
2159
2160    /// <https://streams.spec.whatwg.org/#rs-cancel>
2161    fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
2162        let global = self.global();
2163        if self.is_locked() {
2164            // If ! IsReadableStreamLocked(this) is true,
2165            // return a promise rejected with a TypeError exception.
2166            let promise = Promise::new2(cx, &global);
2167            promise.reject_error(
2168                Error::Type(c"stream is locked".to_owned()),
2169                CanGc::from_cx(cx),
2170            );
2171            promise
2172        } else {
2173            // Return ! ReadableStreamCancel(this, reason).
2174            self.cancel(cx, &global, reason)
2175        }
2176    }
2177
2178    /// <https://streams.spec.whatwg.org/#rs-get-reader>
2179    fn GetReader(
2180        &self,
2181        options: &ReadableStreamGetReaderOptions,
2182        can_gc: CanGc,
2183    ) -> Fallible<ReadableStreamReader> {
2184        // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
2185        if options.mode.is_none() {
2186            return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2187                self.acquire_default_reader(can_gc)?,
2188            ));
2189        }
2190        // 2. Assert: options["mode"] is "byob".
2191        assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2192
2193        // 3. Return ? AcquireReadableStreamBYOBReader(this).
2194        Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2195            self.acquire_byob_reader(can_gc)?,
2196        ))
2197    }
2198
    /// <https://streams.spec.whatwg.org/#rs-tee>
    ///
    /// Delegates to `self.tee`, passing `false` as its flag argument, and returns
    /// that call's result (including any error) unchanged.
    fn Tee(&self, cx: &mut js::context::JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
        // Return ? ReadableStreamTee(this, false).
        self.tee(cx, false)
    }
2204
2205    /// <https://streams.spec.whatwg.org/#rs-pipe-to>
2206    fn PipeTo(
2207        &self,
2208        cx: &mut CurrentRealm,
2209        destination: &WritableStream,
2210        options: &StreamPipeOptions,
2211    ) -> Rc<Promise> {
2212        let global = self.global();
2213
2214        // If ! IsReadableStreamLocked(this) is true,
2215        if self.is_locked() {
2216            // return a promise rejected with a TypeError exception.
2217            let promise = Promise::new2(cx, &global);
2218            promise.reject_error(
2219                Error::Type(c"Source stream is locked".to_owned()),
2220                CanGc::from_cx(cx),
2221            );
2222            return promise;
2223        }
2224
2225        // If ! IsWritableStreamLocked(destination) is true,
2226        if destination.is_locked() {
2227            // return a promise rejected with a TypeError exception.
2228            let promise = Promise::new2(cx, &global);
2229            promise.reject_error(
2230                Error::Type(c"Destination stream is locked".to_owned()),
2231                CanGc::from_cx(cx),
2232            );
2233            return promise;
2234        }
2235
2236        // Let signal be options["signal"] if it exists, or undefined otherwise.
2237        let signal = options.signal.as_deref();
2238
2239        // Return ! ReadableStreamPipeTo.
2240        self.pipe_to(
2241            cx,
2242            &global,
2243            destination,
2244            options.preventClose,
2245            options.preventAbort,
2246            options.preventCancel,
2247            signal,
2248        )
2249    }
2250
2251    /// <https://streams.spec.whatwg.org/#rs-pipe-through>
2252    fn PipeThrough(
2253        &self,
2254        cx: &mut CurrentRealm,
2255        transform: &ReadableWritablePair,
2256        options: &StreamPipeOptions,
2257    ) -> Fallible<DomRoot<ReadableStream>> {
2258        let global = self.global();
2259
2260        // If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
2261        if self.is_locked() {
2262            return Err(Error::Type(c"Source stream is locked".to_owned()));
2263        }
2264
2265        // If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
2266        if transform.writable.is_locked() {
2267            return Err(Error::Type(c"Destination stream is locked".to_owned()));
2268        }
2269
2270        // Let signal be options["signal"] if it exists, or undefined otherwise.
2271        let signal = options.signal.as_deref();
2272
2273        // Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
2274        // options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
2275        let promise = self.pipe_to(
2276            cx,
2277            &global,
2278            &transform.writable,
2279            options.preventClose,
2280            options.preventAbort,
2281            options.preventCancel,
2282            signal,
2283        );
2284
2285        // Set promise.[[PromiseIsHandled]] to true.
2286        promise.set_promise_is_handled();
2287
2288        // Return transform["readable"].
2289        Ok(transform.readable.clone())
2290    }
2291}
2292
#[expect(unsafe_code)]
/// The initial steps for the message handler for both readable and writable cross realm transforms.
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
///
/// Reads the `type` and `value` properties of `data`. The `value` property is written
/// into the `value` out-handle, and `type` is returned converted to a `DOMString`.
///
/// # Safety
///
/// NOTE(review): the precise contract is not visible in this part of the file. The body
/// makes unsafe calls to `get_dictionary_property` using `cx`, so callers presumably need
/// to pass a valid context and live handles; confirm against that helper.
///
/// # Panics
///
/// Panics if `data` is not an object, if reading either property fails, or if the
/// `type` property cannot be converted to a `DOMString`.
pub(crate) unsafe fn get_type_and_value_from_message(
    cx: SafeJSContext,
    data: SafeHandleValue,
    value: SafeMutableHandleValue,
    can_gc: CanGc,
) -> DOMString {
    // Let data be the data of the message.
    // Note: we are passed the data as argument,
    // which originates in the return value of `structuredclone::read`.

    // Assert: data is an Object.
    assert!(data.is_object());
    rooted!(in(*cx) let data_object = data.to_object());

    // Let type be ! Get(data, "type").
    // Only a failed lookup panics; the `bool` inside the `Ok` is discarded.
    rooted!(in(*cx) let mut type_ = UndefinedValue());
    unsafe {
        get_dictionary_property(
            *cx,
            data_object.handle(),
            c"type",
            type_.handle_mut(),
            can_gc,
        )
    }
    .expect("Getting the type should not fail.");

    // Let value be ! Get(data, "value").
    // The property is stored directly into the caller-provided `value` handle.
    unsafe { get_dictionary_property(*cx, data_object.handle(), c"value", value, can_gc) }
        .expect("Getting the value should not fail.");

    // Assert: type is a String.
    // Both a failed conversion call and a non-`Success` conversion result panic.
    let result =
        DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
            .expect("The type of the message should be a string");
    let ConversionResult::Success(type_string) = result else {
        unreachable!("The type of the message should be a string");
    };

    type_string
}
2338
// Empty impl: `CrossRealmTransformReadable` implements `js::gc::Rootable`
// without overriding any of the trait's items.
impl js::gc::Rootable for CrossRealmTransformReadable {}
2340
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
/// A wrapper to handle `message` and `messageerror` events
/// for the port used by the transferred stream.
///
/// Its only state is the controller that the handlers enqueue into, close, or error.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformReadable {
    /// The controller used in the algorithm.
    controller: Dom<ReadableStreamDefaultController>,
}
2350
2351impl CrossRealmTransformReadable {
2352    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2353    /// Add a handler for port’s message event with the following steps:
2354    #[expect(unsafe_code)]
2355    pub(crate) fn handle_message(
2356        &self,
2357        cx: &mut CurrentRealm,
2358        global: &GlobalScope,
2359        port: &MessagePort,
2360        message: SafeHandleValue,
2361    ) {
2362        rooted!(&in(cx) let mut value = UndefinedValue());
2363        let type_string = unsafe {
2364            get_type_and_value_from_message(
2365                cx.into(),
2366                message,
2367                value.handle_mut(),
2368                CanGc::from_cx(cx),
2369            )
2370        };
2371
2372        // If type is "chunk",
2373        if type_string == "chunk" {
2374            // Perform ! ReadableStreamDefaultControllerEnqueue(controller, value).
2375            self.controller
2376                .enqueue(cx, value.handle())
2377                .expect("Enqueing a chunk should not fail.");
2378        }
2379
2380        // Otherwise, if type is "close",
2381        if type_string == "close" {
2382            // Perform ! ReadableStreamDefaultControllerClose(controller).
2383            self.controller.close(CanGc::from_cx(cx));
2384
2385            // Disentangle port.
2386            global.disentangle_port(cx, port);
2387        }
2388
2389        // Otherwise, if type is "error",
2390        if type_string == "error" {
2391            // Perform ! ReadableStreamDefaultControllerError(controller, value).
2392            self.controller.error(value.handle(), CanGc::from_cx(cx));
2393
2394            // Disentangle port.
2395            global.disentangle_port(cx, port);
2396        }
2397    }
2398
2399    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2400    /// Add a handler for port’s messageerror event with the following steps:
2401    pub(crate) fn handle_error(
2402        &self,
2403        cx: &mut CurrentRealm,
2404        global: &GlobalScope,
2405        port: &MessagePort,
2406    ) {
2407        // Let error be a new "DataCloneError" DOMException.
2408        let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
2409        rooted!(&in(cx) let mut rooted_error = UndefinedValue());
2410        error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
2411
2412        // Perform ! CrossRealmTransformSendError(port, error).
2413        port.cross_realm_transform_send_error(rooted_error.handle(), CanGc::from_cx(cx));
2414
2415        // Perform ! ReadableStreamDefaultControllerError(controller, error).
2416        self.controller
2417            .error(rooted_error.handle(), CanGc::from_cx(cx));
2418
2419        // Disentangle port.
2420        global.disentangle_port(cx, port);
2421    }
2422}
2423
2424#[expect(unsafe_code)]
2425/// Get the `done` property of an object that a read promise resolved to.
2426pub(crate) fn get_read_promise_done(
2427    cx: SafeJSContext,
2428    v: &SafeHandleValue,
2429    can_gc: CanGc,
2430) -> Result<bool, Error> {
2431    if !v.is_object() {
2432        return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2433    }
2434    unsafe {
2435        rooted!(in(*cx) let object = v.to_object());
2436        rooted!(in(*cx) let mut done = UndefinedValue());
2437        match get_dictionary_property(*cx, object.handle(), c"done", done.handle_mut(), can_gc) {
2438            Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2439                Ok(ConversionResult::Success(val)) => Ok(val),
2440                Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2441                _ => Err(Error::Type(c"Unknown format for done property.".to_owned())),
2442            },
2443            Ok(false) => Err(Error::Type(c"Promise has no done property.".to_owned())),
2444            Err(()) => Err(Error::JSFailed),
2445        }
2446    }
2447}
2448
2449#[expect(unsafe_code)]
2450/// Get the `value` property of an object that a read promise resolved to.
2451pub(crate) fn get_read_promise_bytes(
2452    cx: SafeJSContext,
2453    v: &SafeHandleValue,
2454    can_gc: CanGc,
2455) -> Result<Vec<u8>, Error> {
2456    if !v.is_object() {
2457        return Err(Error::Type(
2458            c"Unknown format for for bytes read.".to_owned(),
2459        ));
2460    }
2461    unsafe {
2462        rooted!(in(*cx) let object = v.to_object());
2463        rooted!(in(*cx) let mut bytes = UndefinedValue());
2464        match get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc) {
2465            Ok(true) => {
2466                match Vec::<u8>::safe_from_jsval(
2467                    cx,
2468                    bytes.handle(),
2469                    ConversionBehavior::EnforceRange,
2470                    can_gc,
2471                ) {
2472                    Ok(ConversionResult::Success(val)) => Ok(val),
2473                    Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2474                    _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2475                }
2476            },
2477            Ok(false) => Err(Error::Type(c"Promise has no value property.".to_owned())),
2478            Err(()) => Err(Error::JSFailed),
2479        }
2480    }
2481}
2482
2483/// Convert a raw stream `chunk` JS value to `Vec<u8>`.
2484/// This mirrors the conversion used inside `get_read_promise_bytes`,
2485/// but operates on the raw chunk (no `{ value, done }` wrapper).
2486pub(crate) fn bytes_from_chunk_jsval(
2487    cx: SafeJSContext,
2488    chunk: &RootedTraceableBox<Heap<JSVal>>,
2489    can_gc: CanGc,
2490) -> Result<Vec<u8>, Error> {
2491    match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2492        Ok(ConversionResult::Success(vec)) => Ok(vec),
2493        Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2494        _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2495    }
2496}
2497
/// <https://streams.spec.whatwg.org/#rs-transfer>
///
/// Transfers a `ReadableStream` by way of a `MessagePort`: both the transfer index and
/// the transferred data are those of a message port.
impl Transferable for ReadableStream {
    type Index = MessagePortIndex;
    type Data = MessagePortImpl;

    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps>
    ///
    /// Creates an entangled pair of ports, pipes this stream into a new writable stream
    /// set up on the first port, and returns the result of transferring the second port.
    ///
    /// # Errors
    ///
    /// Returns `Error::DataClone(None)` when this stream is locked, and propagates any
    /// error from transferring the second port.
    fn transfer(
        &self,
        cx: &mut js::context::JSContext,
    ) -> Fallible<(MessagePortId, MessagePortImpl)> {
        // Step 1. If ! IsReadableStreamLocked(value) is true, throw a
        // "DataCloneError" DOMException.
        if self.is_locked() {
            return Err(Error::DataClone(None));
        }

        // Enter the realm of this stream's global, then shadow `cx` with the resulting
        // current-realm context so that every step below uses it.
        let global = self.global();
        let mut realm = enter_auto_realm(cx, &*global);
        let mut realm = realm.current_realm();
        let cx = &mut realm;

        // Step 2. Let port1 be a new MessagePort in the current Realm.
        let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
        global.track_message_port(&port_1, None);

        // Step 3. Let port2 be a new MessagePort in the current Realm.
        let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
        global.track_message_port(&port_2, None);

        // Step 4. Entangle port1 and port2.
        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());

        // Step 5. Let writable be a new WritableStream in the current Realm.
        let writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));

        // Step 6. Perform ! SetUpCrossRealmTransformWritable(writable, port1).
        writable.setup_cross_realm_transform_writable(cx, &port_1);

        // Step 7. Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
        // No abort signal is supplied (`None`).
        let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);

        // Step 8. Set promise.[[PromiseIsHandled]] to true.
        promise.set_promise_is_handled();

        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
        port_2.transfer(cx)
    }

    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps>
    ///
    /// Creates a new `ReadableStream` in `owner`, receives the transferred port, and sets
    /// the stream up as the readable side of a cross-realm transform on that port.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` when `MessagePort::transfer_receive` fails.
    fn transfer_receive(
        cx: &mut js::context::JSContext,
        owner: &GlobalScope,
        id: MessagePortId,
        port_impl: MessagePortImpl,
    ) -> Result<DomRoot<Self>, ()> {
        // Their transfer-receiving steps, given dataHolder and value, are:
        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
        let value = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));

        // Step 1. Let deserializedRecord be !
        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
        // Realm).
        // Done with the `Deserialize` derive of `MessagePortImpl`.

        // Step 2. Let port be deserializedRecord.[[Deserialized]].
        let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;

        // Step 3. Perform ! SetUpCrossRealmTransformReadable(value, port).
        value.setup_cross_realm_transform_readable(cx, &transferred_port);
        Ok(value)
    }

    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
    ///
    /// Returns the reader's `port_impls` map or the writer's `ports` map, depending on
    /// which variant of `StructuredData` is passed in.
    fn serialized_storage<'a>(
        data: StructuredData<'a, '_>,
    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
        match data {
            StructuredData::Reader(r) => &mut r.port_impls,
            StructuredData::Writer(w) => &mut w.ports,
        }
    }
}