script/dom/stream/
readablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use servo_base::generic_channel::GenericSharedMemory;
11use servo_base::id::{MessagePortId, MessagePortIndex};
12use servo_constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19    MutableHandleValue as SafeMutableHandleValue,
20};
21use js::typedarray::ArrayBufferViewU8;
22use rustc_hash::FxHashMap;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
26use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
27    ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
28    ReadableWritablePair, StreamPipeOptions,
29};
30use script_bindings::str::DOMString;
31
32use crate::dom::domexception::{DOMErrorName, DOMException};
33use script_bindings::conversions::{is_array_like, StringificationBehavior};
34use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
35use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
38use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
39use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
40use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
41use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
42use crate::dom::stream::writablestream::WritableStream;
43use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
44use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
45use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
46use crate::dom::bindings::trace::RootedTraceableBox;
47use crate::dom::bindings::utils::get_dictionary_property;
48use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
49use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
50use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
51use crate::dom::globalscope::GlobalScope;
52use crate::dom::promise::{wait_for_all_promise, Promise};
53use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
54use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
55use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
56use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
57use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
58use crate::dom::types::DefaultTeeUnderlyingSource;
59use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
60use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
61use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
62use crate::dom::messageport::MessagePort;
63use crate::realms::{enter_realm, InRealm, enter_auto_realm};
64use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
65use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
66use crate::dom::bindings::transferable::Transferable;
67use crate::dom::bindings::structuredclone::StructuredData;
68
69use crate::dom::bindings::buffer_source::HeapBufferSource;
70use super::readablestreambyobreader::ReadIntoRequest;
71
72/// State Machine for `PipeTo`.
73#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
74enum PipeToState {
75    /// The starting state
76    #[default]
77    Starting,
78    /// Waiting for the writer to be ready
79    PendingReady,
80    /// Waiting for a read to resolve.
81    PendingRead,
82    /// Waiting for all pending writes to finish,
83    /// as part of shutting down with an optional action.
84    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
85    /// When shutting down with an action,
86    /// waiting for the action to complete,
87    /// at which point we can `finalize`.
88    ShuttingDownPendingAction,
89    /// The pipe has been finalized,
90    /// no further actions should be performed.
91    Finalized,
92}
93
94/// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
95#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
96enum ShutdownAction {
97    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
98    WritableStreamAbort,
99    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
100    ReadableStreamCancel,
101    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
102    WritableStreamDefaultWriterCloseWithErrorPropagation,
103    /// <https://streams.spec.whatwg.org/#ref-for-rs-pipeTo-shutdown-with-action>
104    Abort,
105}
106
107impl js::gc::Rootable for PipeTo {}
108
109/// The "in parallel, but not really" part of
110/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
111///
112/// Note: the spec is flexible about how this is done, but requires the following constraints to apply:
113/// - Public API must not be used: we'll only use Rust.
114/// - Backpressure must be enforced: we'll only read from source when dest is ready.
115/// - Shutdown must stop activity: we'll do this together with the below.
116/// - Error and close states must be propagated: we'll do this by checking these states at every step.
117#[derive(Clone, JSTraceable, MallocSizeOf)]
118#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
119pub(crate) struct PipeTo {
120    /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
121    reader: Dom<ReadableStreamDefaultReader>,
122
123    /// <https://streams.spec.whatwg.org/#ref-for-acquire-writable-stream-default-writer>
124    writer: Dom<WritableStreamDefaultWriter>,
125
126    /// Pending writes are needed when shutting down(with an action),
127    /// because we can only finalize when all writes are finished.
128    #[ignore_malloc_size_of = "nested Rc"]
129    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,
130
131    /// The state machine.
132    #[conditional_malloc_size_of]
133    #[no_trace]
134    state: Rc<RefCell<PipeToState>>,
135
136    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventabort>
137    prevent_abort: bool,
138
139    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventcancel>
140    prevent_cancel: bool,
141
142    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventclose>
143    prevent_close: bool,
144
145    /// The `shuttingDown` variable of
146    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
147    #[conditional_malloc_size_of]
148    shutting_down: Rc<Cell<bool>>,
149
150    /// The abort reason of the abort signal,
151    /// stored here because we must keep it across a microtask.
152    #[ignore_malloc_size_of = "mozjs"]
153    abort_reason: Rc<Heap<JSVal>>,
154
155    /// The error potentially passed to shutdown,
156    /// stored here because we must keep it across a microtask.
157    #[ignore_malloc_size_of = "mozjs"]
158    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,
159
160    /// The promise returned by a shutdown action.
161    /// We keep it to only continue when it is not pending anymore.
162    #[ignore_malloc_size_of = "nested Rc"]
163    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,
164
165    /// The promise resolved or rejected at
166    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
167    #[conditional_malloc_size_of]
168    result_promise: Rc<Promise>,
169}
170
171impl PipeTo {
172    /// Run the `abortAlgorithm` defined at
173    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
174    pub(crate) fn abort_with_reason(
175        &self,
176        cx: &mut CurrentRealm,
177        global: &GlobalScope,
178        reason: SafeHandleValue,
179    ) {
180        // Abort should do nothing if we are already shutting down.
181        if self.shutting_down.get() {
182            return;
183        }
184
185        // Let error be signal’s abort reason.
186        // Note: storing it because it may need to be kept across a microtask,
187        // and see the note below as to why it is kept separately from `shutdown_error`.
188        self.abort_reason.set(reason.get());
189
190        // Note: setting the error now,
191        // will result in a rejection of the pipe promise, with this error.
192        // Unless any shutdown action raise their own error,
193        // in which case this error will be overwritten by the shutdown action error.
194        self.set_shutdown_error(reason);
195
196        // Let actions be an empty ordered set.
197        // Note: the actions are defined, and performed, inside `shutdown_with_an_action`.
198
199        // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions,
200        // and with error.
201        self.shutdown(cx, global, Some(ShutdownAction::Abort));
202    }
203}
204
205impl Callback for PipeTo {
206    /// The pipe makes progress one microtask at a time.
207    /// Note: we use one struct as the callback for all promises,
208    /// and for both of their reactions.
209    ///
210    /// The context of the callback is determined from:
211    /// - the current state.
212    /// - the type of `result`.
213    /// - the state of a stored promise(in some cases).
214    #[expect(unsafe_code)]
215    fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
216        let in_realm_proof = cx.into();
217        let realm = InRealm::Already(&in_realm_proof);
218        let global = self.reader.global();
219
220        // Note: we only care about the result of writes when they are rejected,
221        // and the error is accessed not through handlers,
222        // but directly using `dest.get_stored_error`.
223        // So we must mark rejected promises as handled
224        // to prevent unhandled rejection errors.
225        self.pending_writes.borrow_mut().retain(|p| {
226            let pending = p.is_pending();
227            if !pending {
228                p.set_promise_is_handled();
229            }
230            pending
231        });
232
233        // Note: cloning to prevent re-borrow in methods called below.
234        let state_before_checks = self.state.borrow().clone();
235
236        // Note: if we are in a `PendingRead` state,
237        // and the source is closed,
238        // we try to write chunks before doing any shutdown,
239        // which is necessary to implement the
240        // "If any chunks have been read but not yet written, write them to dest."
241        // part of shutdown.
242        if state_before_checks == PipeToState::PendingRead {
243            let source = self.reader.get_stream().expect("Source stream must be set");
244            if source.is_closed() {
245                let dest = self
246                    .writer
247                    .get_stream()
248                    .expect("Destination stream must be set");
249
250                // If dest.[[state]] is "writable",
251                // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
252                if dest.is_writable() && !dest.close_queued_or_in_flight() {
253                    let Ok(done) = get_read_promise_done(cx.into(), &result, CanGc::from_cx(cx))
254                    else {
255                        // This is the case that the microtask ran in reaction
256                        // to the closed promise of the reader,
257                        // so we should wait for subsequent chunks,
258                        // and skip the shutdown below
259                        // (reader is closed, but there are still pending reads).
260                        // Shutdown will happen when the last chunk has been received.
261                        return;
262                    };
263
264                    if !done {
265                        // If any chunks have been read but not yet written, write them to dest.
266                        self.write_chunk(cx, &global, result);
267                    }
268                }
269            }
270        }
271
272        self.check_and_propagate_errors_forward(cx, &global);
273        self.check_and_propagate_errors_backward(cx, &global);
274        self.check_and_propagate_closing_forward(cx, &global);
275        self.check_and_propagate_closing_backward(cx, &global);
276
277        // Note: cloning to prevent re-borrow in methods called below.
278        let state = self.state.borrow().clone();
279
280        // If we switched to a shutdown state,
281        // return.
282        // Progress will be made at the next tick.
283        if state != state_before_checks {
284            return;
285        }
286
287        match state {
288            PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
289            PipeToState::PendingReady => {
290                // Read a chunk.
291                self.read_chunk(cx, &global);
292            },
293            PipeToState::PendingRead => {
294                // Write the chunk.
295                self.write_chunk(cx, &global, result);
296
297                // An early return is necessary if the write algorithm aborted the pipe.
298                if self.shutting_down.get() {
299                    return;
300                }
301
302                // Wait for the writer to be ready again.
303                self.wait_for_writer_ready(cx, &global);
304            },
305            PipeToState::ShuttingDownWithPendingWrites(action) => {
306                // Wait until every chunk that has been read has been written
307                // (i.e. the corresponding promises have settled).
308                if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
309                    self.wait_on_pending_write(&global, write, realm, CanGc::from_cx(cx));
310                    return;
311                }
312
313                // Note: error is stored in `self.shutdown_error`.
314                if let Some(action) = action {
315                    // Let p be the result of performing action.
316                    self.perform_action(cx, &global, action);
317                } else {
318                    // Finalize, passing along error if it was given.
319                    self.finalize(cx, &global);
320                }
321            },
322            PipeToState::ShuttingDownPendingAction => {
323                let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
324                    unreachable!();
325                };
326                if promise.is_pending() {
327                    // While waiting for the action to complete,
328                    // we may get callbacks for other promises(closed, ready),
329                    // and we should ignore those.
330                    return;
331                }
332
333                let is_array_like = {
334                    if !result.is_object() {
335                        false
336                    } else {
337                        unsafe { is_array_like::<crate::DomTypeHolder>(cx.raw_cx(), result) }
338                    }
339                };
340
341                // Finalize, passing along error if it was given.
342                if !result.is_undefined() && !is_array_like {
343                    // Most actions either resolve with undefined,
344                    // or reject with an error,
345                    // and the error should be used when finalizing.
346                    // One exception is the `Abort` action,
347                    // which resolves with a list of undefined values.
348
349                    // If `result` isn't undefined or array-like,
350                    // then it is an error
351                    // and should overwrite the current shutdown error.
352                    self.set_shutdown_error(result);
353                }
354                self.finalize(cx, &global);
355            },
356            PipeToState::Finalized => {},
357        }
358    }
359}
360
361impl PipeTo {
362    /// Setting shutdown error in a way that ensures it isn't
363    /// moved after it has been set.
364    fn set_shutdown_error(&self, error: SafeHandleValue) {
365        *self.shutdown_error.borrow_mut() = Some(Heap::default());
366        let Some(ref heap) = *self.shutdown_error.borrow() else {
367            unreachable!("Option set to Some(heap) above.");
368        };
369        heap.set(error.get())
370    }
371
372    /// Wait for the writer to be ready,
373    /// which implements the constraint that backpressure must be enforced.
374    fn wait_for_writer_ready(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
375        let realm = cx.into();
376        let realm = InRealm::Already(&realm);
377
378        {
379            let mut state = self.state.borrow_mut();
380            *state = PipeToState::PendingReady;
381        }
382
383        let ready_promise = self.writer.Ready();
384        if ready_promise.is_fulfilled() {
385            self.read_chunk(cx, global);
386        } else {
387            let handler = PromiseNativeHandler::new(
388                global,
389                Some(Box::new(self.clone())),
390                Some(Box::new(self.clone())),
391                CanGc::from_cx(cx),
392            );
393            ready_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
394
395            // Note: if the writer is not ready,
396            // in order to ensure progress we must
397            // also react to the closure of the source(because source may close empty).
398            let closed_promise = self.reader.Closed();
399            closed_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
400        }
401    }
402
403    /// Read a chunk
404    fn read_chunk(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
405        let realm = cx.into();
406        let realm = InRealm::Already(&realm);
407
408        *self.state.borrow_mut() = PipeToState::PendingRead;
409        let chunk_promise = self.reader.Read(cx);
410        let handler = PromiseNativeHandler::new(
411            global,
412            Some(Box::new(self.clone())),
413            Some(Box::new(self.clone())),
414            CanGc::from_cx(cx),
415        );
416        chunk_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
417
418        // Note: in order to ensure progress we must
419        // also react to the closure of the destination.
420        let ready_promise = self.writer.Closed();
421        ready_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
422    }
423
424    /// Try to write a chunk using the jsval, and returns wether it succeeded
425    // It will fail if it is the last `done` chunk, or if it is not a chunk at all.
426    #[expect(unsafe_code)]
427    fn write_chunk(
428        &self,
429        cx: &mut js::context::JSContext,
430        global: &GlobalScope,
431        chunk: SafeHandleValue,
432    ) -> bool {
433        if chunk.is_object() {
434            rooted!(&in(cx) let object = chunk.to_object());
435            rooted!(&in(cx) let mut bytes = UndefinedValue());
436            let has_value = unsafe {
437                get_dictionary_property(
438                    cx.raw_cx(),
439                    object.handle(),
440                    c"value",
441                    bytes.handle_mut(),
442                    CanGc::from_cx(cx),
443                )
444                .expect("Chunk should have a value.")
445            };
446            if has_value {
447                // Write the chunk.
448                let write_promise = self.writer.write(cx, global, bytes.handle());
449                self.pending_writes.borrow_mut().push_back(write_promise);
450                return true;
451            }
452        }
453        false
454    }
455
456    /// Only as part of shutting-down do we wait on pending writes
457    /// (backpressure is communicated not through pending writes
458    /// but through the readiness of the writer).
459    fn wait_on_pending_write(
460        &self,
461        global: &GlobalScope,
462        promise: Rc<Promise>,
463        realm: InRealm,
464        can_gc: CanGc,
465    ) {
466        let handler = PromiseNativeHandler::new(
467            global,
468            Some(Box::new(self.clone())),
469            Some(Box::new(self.clone())),
470            can_gc,
471        );
472        promise.append_native_handler(&handler, realm, can_gc);
473    }
474
475    /// Errors must be propagated forward part of
476    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
477    fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
478        // An early return is necessary if we are shutting down,
479        // because in that case the source can already have been set to none.
480        if self.shutting_down.get() {
481            return;
482        }
483
484        // if source.[[state]] is or becomes "errored", then
485        let source = self
486            .reader
487            .get_stream()
488            .expect("Reader should still have a stream");
489        if source.is_errored() {
490            rooted!(&in(cx) let mut source_error = UndefinedValue());
491            source.get_stored_error(source_error.handle_mut());
492            self.set_shutdown_error(source_error.handle());
493
494            // If preventAbort is false,
495            if !self.prevent_abort {
496                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
497                // and with source.[[storedError]].
498                self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
499            } else {
500                // Otherwise, shutdown with source.[[storedError]].
501                self.shutdown(cx, global, None);
502            }
503        }
504    }
505
506    /// Errors must be propagated backward part of
507    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
508    fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
509        // An early return is necessary if we are shutting down,
510        // because in that case the destination can already have been set to none.
511        if self.shutting_down.get() {
512            return;
513        }
514
515        // if dest.[[state]] is or becomes "errored", then
516        let dest = self
517            .writer
518            .get_stream()
519            .expect("Writer should still have a stream");
520        if dest.is_errored() {
521            rooted!(&in(cx) let mut dest_error = UndefinedValue());
522            dest.get_stored_error(dest_error.handle_mut());
523            self.set_shutdown_error(dest_error.handle());
524
525            // If preventCancel is false,
526            if !self.prevent_cancel {
527                // shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]])
528                // and with dest.[[storedError]].
529                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
530            } else {
531                // Otherwise, shutdown with dest.[[storedError]].
532                self.shutdown(cx, global, None);
533            }
534        }
535    }
536
537    /// Closing must be propagated forward part of
538    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
539    fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
540        // An early return is necessary if we are shutting down,
541        // because in that case the source can already have been set to none.
542        if self.shutting_down.get() {
543            return;
544        }
545
546        // if source.[[state]] is or becomes "closed", then
547        let source = self
548            .reader
549            .get_stream()
550            .expect("Reader should still have a stream");
551        if source.is_closed() {
552            // If preventClose is false,
553            if !self.prevent_close {
554                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
555                // and with source.[[storedError]].
556                self.shutdown(
557                    cx,
558                    global,
559                    Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
560                )
561            } else {
562                // Otherwise, shutdown.
563                self.shutdown(cx, global, None);
564            }
565        }
566    }
567
568    /// Closing must be propagated backward part of
569    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
570    fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
571        // An early return is necessary if we are shutting down,
572        // because in that case the destination can already have been set to none.
573        if self.shutting_down.get() {
574            return;
575        }
576
577        // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
578        // or dest.[[state]] is "closed"
579        let dest = self
580            .writer
581            .get_stream()
582            .expect("Writer should still have a stream");
583        if dest.close_queued_or_in_flight() || dest.is_closed() {
584            // Assert: no chunks have been read or written.
585            // Note: unclear how to perform this assertion.
586
587            // Let destClosed be a new TypeError.
588            rooted!(&in(cx) let mut dest_closed = UndefinedValue());
589            let error =
590                Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
591            error.to_jsval(
592                cx.into(),
593                global,
594                dest_closed.handle_mut(),
595                CanGc::from_cx(cx),
596            );
597            self.set_shutdown_error(dest_closed.handle());
598
599            // If preventCancel is false,
600            if !self.prevent_cancel {
601                // shutdown with an action of ! ReadableStreamCancel(source, destClosed)
602                // and with destClosed.
603                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
604            } else {
605                // Otherwise, shutdown with destClosed.
606                self.shutdown(cx, global, None);
607            }
608        }
609    }
610
611    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
612    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown>
613    /// Combined into one method with an optional action.
614    fn shutdown(
615        &self,
616        cx: &mut CurrentRealm,
617        global: &GlobalScope,
618        action: Option<ShutdownAction>,
619    ) {
620        let realm = cx.into();
621        let realm = InRealm::Already(&realm);
622        // If shuttingDown is true, abort these substeps.
623        // Set shuttingDown to true.
624        if !self.shutting_down.replace(true) {
625            let dest = self.writer.get_stream().expect("Stream must be set");
626            // If dest.[[state]] is "writable",
627            // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
628            if dest.is_writable() && !dest.close_queued_or_in_flight() {
629                // If any chunks have been read but not yet written, write them to dest.
630                // Done at the top of `Callback`.
631
632                // Wait until every chunk that has been read has been written
633                // (i.e. the corresponding promises have settled).
634                if let Some(write) = self.pending_writes.borrow_mut().front() {
635                    *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
636                    self.wait_on_pending_write(global, write.clone(), realm, CanGc::from_cx(cx));
637                    return;
638                }
639            }
640
641            // Note: error is stored in `self.shutdown_error`.
642            if let Some(action) = action {
643                // Let p be the result of performing action.
644                self.perform_action(cx, global, action);
645            } else {
646                // Finalize, passing along error if it was given.
647                self.finalize(cx, global);
648            }
649        }
650    }
651
652    /// The perform action part of
653    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
654    fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
655        let realm = cx.into();
656        let realm = InRealm::Already(&realm);
657        rooted!(&in(cx) let mut error = UndefinedValue());
658        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
659            error.set(shutdown_error.get());
660        }
661
662        *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
663
664        // Let p be the result of performing action.
665        let promise = match action {
666            ShutdownAction::WritableStreamAbort => {
667                let dest = self.writer.get_stream().expect("Stream must be set");
668                dest.abort(cx, global, error.handle())
669            },
670            ShutdownAction::ReadableStreamCancel => {
671                let source = self
672                    .reader
673                    .get_stream()
674                    .expect("Reader should have a stream.");
675                source.cancel(cx, global, error.handle())
676            },
677            ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
678                self.writer.close_with_error_propagation(cx, global)
679            },
680            ShutdownAction::Abort => {
681                // Note: implementation of the `abortAlgorithm`
682                // of the signal associated with this piping operation.
683
684                // Let error be signal’s abort reason.
685                rooted!(&in(cx) let mut error = UndefinedValue());
686                error.set(self.abort_reason.get());
687
688                // Let actions be an empty ordered set.
689                let mut actions = vec![];
690
691                // If preventAbort is false, append the following action to actions:
692                if !self.prevent_abort {
693                    let dest = self
694                        .writer
695                        .get_stream()
696                        .expect("Destination stream must be set");
697
698                    // If dest.[[state]] is "writable",
699                    let promise = if dest.is_writable() {
700                        // return ! WritableStreamAbort(dest, error)
701                        dest.abort(cx, global, error.handle())
702                    } else {
703                        // Otherwise, return a promise resolved with undefined.
704                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
705                    };
706                    actions.push(promise);
707                }
708
709                // If preventCancel is false, append the following action action to actions:
710                if !self.prevent_cancel {
711                    let source = self.reader.get_stream().expect("Source stream must be set");
712
713                    // If source.[[state]] is "readable",
714                    let promise = if source.is_readable() {
715                        // return ! ReadableStreamCancel(source, error).
716                        source.cancel(cx, global, error.handle())
717                    } else {
718                        // Otherwise, return a promise resolved with undefined.
719                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
720                    };
721                    actions.push(promise);
722                }
723
724                // Shutdown with an action consisting
725                // of getting a promise to wait for all of the actions in actions,
726                // and with error.
727                wait_for_all_promise(cx.into(), global, actions, realm, CanGc::from_cx(cx))
728            },
729        };
730
731        // Upon fulfillment of p, finalize, passing along originalError if it was given.
732        // Upon rejection of p with reason newError, finalize with newError.
733        let handler = PromiseNativeHandler::new(
734            global,
735            Some(Box::new(self.clone())),
736            Some(Box::new(self.clone())),
737            CanGc::from_cx(cx),
738        );
739        promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
740        *self.shutdown_action_promise.borrow_mut() = Some(promise);
741    }
742
743    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
744    fn finalize(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
745        *self.state.borrow_mut() = PipeToState::Finalized;
746
747        // Perform ! WritableStreamDefaultWriterRelease(writer).
748        self.writer.release(cx.into(), global, CanGc::from_cx(cx));
749
750        // If reader implements ReadableStreamBYOBReader,
751        // perform ! ReadableStreamBYOBReaderRelease(reader).
752        // TODO.
753
754        // Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
755        self.reader
756            .release(CanGc::from_cx(cx))
757            .expect("Releasing the reader should not fail");
758
759        // If signal is not undefined, remove abortAlgorithm from signal.
760        // Note: since `self.shutdown` is true at this point,
761        // the abort algorithm is a no-op,
762        // so for now not implementing this step.
763
764        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
765            rooted!(&in(cx) let mut error = UndefinedValue());
766            error.set(shutdown_error.get());
767            // If error was given, reject promise with error.
768            self.result_promise
769                .reject_native(&error.handle(), CanGc::from_cx(cx));
770        } else {
771            // Otherwise, resolve promise with undefined.
772            self.result_promise.resolve_native(&(), CanGc::from_cx(cx));
773        }
774    }
775}
776
777/// The fulfillment handler for the reacting to sourceCancelPromise part of
778/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
779#[derive(Clone, JSTraceable, MallocSizeOf)]
780struct SourceCancelPromiseFulfillmentHandler {
781    #[conditional_malloc_size_of]
782    result: Rc<Promise>,
783}
784
785impl Callback for SourceCancelPromiseFulfillmentHandler {
786    /// The fulfillment handler for the reacting to sourceCancelPromise part of
787    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
788    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
789    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
790        self.result.resolve_native(&(), CanGc::from_cx(cx));
791    }
792}
793
794/// The rejection handler for the reacting to sourceCancelPromise part of
795/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
796#[derive(Clone, JSTraceable, MallocSizeOf)]
797struct SourceCancelPromiseRejectionHandler {
798    #[conditional_malloc_size_of]
799    result: Rc<Promise>,
800}
801
802impl Callback for SourceCancelPromiseRejectionHandler {
803    /// The rejection handler for the reacting to sourceCancelPromise part of
804    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
805    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
806    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
807        self.result.reject_native(&v, CanGc::from_cx(cx));
808    }
809}
810
811/// <https://streams.spec.whatwg.org/#readablestream-state>
812#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
813pub(crate) enum ReadableStreamState {
814    #[default]
815    Readable,
816    Closed,
817    Errored,
818}
819
820/// <https://streams.spec.whatwg.org/#readablestream-controller>
821#[derive(JSTraceable, MallocSizeOf)]
822#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
823pub(crate) enum ControllerType {
824    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
825    Byte(MutNullableDom<ReadableByteStreamController>),
826    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
827    Default(MutNullableDom<ReadableStreamDefaultController>),
828}
829
830/// <https://streams.spec.whatwg.org/#readablestream-readerr>
831#[derive(JSTraceable, MallocSizeOf)]
832#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
833pub(crate) enum ReaderType {
834    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
835    #[allow(clippy::upper_case_acronyms)]
836    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
837    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
838    Default(MutNullableDom<ReadableStreamDefaultReader>),
839}
840
841impl Eq for ReaderType {}
842impl PartialEq for ReaderType {
843    fn eq(&self, other: &Self) -> bool {
844        matches!(
845            (self, other),
846            (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
847                (ReaderType::Default(_), ReaderType::Default(_))
848        )
849    }
850}
851
852/// <https://streams.spec.whatwg.org/#create-readable-stream>
853#[cfg_attr(crown, expect(crown::unrooted_must_root))]
854pub(crate) fn create_readable_stream(
855    global: &GlobalScope,
856    underlying_source_type: UnderlyingSourceType,
857    queuing_strategy: Option<Rc<QueuingStrategySize>>,
858    high_water_mark: Option<f64>,
859    can_gc: CanGc,
860) -> DomRoot<ReadableStream> {
861    // If highWaterMark was not passed, set it to 1.
862    let high_water_mark = high_water_mark.unwrap_or(1.0);
863
864    // If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
865    let size_algorithm =
866        queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
867
868    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
869    assert!(high_water_mark >= 0.0);
870
871    // Let stream be a new ReadableStream.
872    // Perform ! InitializeReadableStream(stream).
873    let stream = ReadableStream::new_with_proto(global, None, can_gc);
874
875    // Let controller be a new ReadableStreamDefaultController.
876    let controller = ReadableStreamDefaultController::new(
877        global,
878        underlying_source_type,
879        high_water_mark,
880        size_algorithm,
881        can_gc,
882    );
883
884    // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm,
885    // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
886    controller
887        .setup(stream.clone(), can_gc)
888        .expect("Setup of default controller cannot fail");
889
890    // Return stream.
891    stream
892}
893
894/// <https://streams.spec.whatwg.org/#abstract-opdef-createreadablebytestream>
895#[cfg_attr(crown, expect(crown::unrooted_must_root))]
896pub(crate) fn readable_byte_stream_tee(
897    global: &GlobalScope,
898    underlying_source_type: UnderlyingSourceType,
899    can_gc: CanGc,
900) -> DomRoot<ReadableStream> {
901    // Let stream be a new ReadableStream.
902    // Perform ! InitializeReadableStream(stream).
903    let tee_stream = ReadableStream::new_with_proto(global, None, can_gc);
904
905    // Let controller be a new ReadableByteStreamController.
906    let controller = ReadableByteStreamController::new(underlying_source_type, 0.0, global, can_gc);
907
908    // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
909    controller
910        .setup(global, tee_stream.clone(), can_gc)
911        .expect("Setup of byte stream controller cannot fail");
912
913    // Return stream.
914    tee_stream
915}
916
917/// <https://streams.spec.whatwg.org/#rs-class>
918#[dom_struct]
919pub(crate) struct ReadableStream {
920    reflector_: Reflector,
921
922    /// <https://streams.spec.whatwg.org/#readablestream-controller>
923    /// Note: the inner `MutNullableDom` should really be an `Option<Dom>`,
924    /// because it is never unset once set.
925    controller: RefCell<Option<ControllerType>>,
926
927    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
928    #[ignore_malloc_size_of = "mozjs"]
929    stored_error: Heap<JSVal>,
930
931    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
932    disturbed: Cell<bool>,
933
934    /// <https://streams.spec.whatwg.org/#readablestream-reader>
935    reader: RefCell<Option<ReaderType>>,
936
937    /// <https://streams.spec.whatwg.org/#readablestream-state>
938    state: Cell<ReadableStreamState>,
939}
940
941impl ReadableStream {
942    /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
943    fn new_inherited() -> ReadableStream {
944        ReadableStream {
945            reflector_: Reflector::new(),
946            controller: RefCell::new(None),
947            stored_error: Heap::default(),
948            disturbed: Default::default(),
949            reader: RefCell::new(None),
950            state: Cell::new(Default::default()),
951        }
952    }
953
954    pub(crate) fn new_with_proto(
955        global: &GlobalScope,
956        proto: Option<SafeHandleObject>,
957        can_gc: CanGc,
958    ) -> DomRoot<ReadableStream> {
959        reflect_dom_object_with_proto(
960            Box::new(ReadableStream::new_inherited()),
961            global,
962            proto,
963            can_gc,
964        )
965    }
966
967    /// Used as part of
968    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
969    pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
970        *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
971            controller,
972        ))));
973    }
974
975    /// Used as part of
976    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
977    pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
978        *self.controller.borrow_mut() =
979            Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
980    }
981
982    /// Used as part of
983    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
984    pub(crate) fn assert_no_controller(&self) {
985        let has_no_controller = self.controller.borrow().is_none();
986        assert!(has_no_controller);
987    }
988
989    /// Build a stream backed by a Rust source that has already been read into memory.
990    pub(crate) fn new_from_bytes(
991        global: &GlobalScope,
992        bytes: Vec<u8>,
993        can_gc: CanGc,
994    ) -> Fallible<DomRoot<ReadableStream>> {
995        let stream = ReadableStream::new_with_external_underlying_source(
996            global,
997            UnderlyingSourceType::Memory(bytes.len()),
998            can_gc,
999        )?;
1000        stream.enqueue_native(bytes, can_gc);
1001        stream.controller_close_native(can_gc);
1002        Ok(stream)
1003    }
1004
1005    /// Build a stream backed by a Rust underlying source.
1006    /// Note: external sources are always paired with a default controller.
1007    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1008    pub(crate) fn new_with_external_underlying_source(
1009        global: &GlobalScope,
1010        source: UnderlyingSourceType,
1011        can_gc: CanGc,
1012    ) -> Fallible<DomRoot<ReadableStream>> {
1013        assert!(source.is_native());
1014        let stream = ReadableStream::new_with_proto(global, None, can_gc);
1015        let controller = ReadableStreamDefaultController::new(
1016            global,
1017            source,
1018            1.0,
1019            extract_size_algorithm(&QueuingStrategy::empty(), can_gc),
1020            can_gc,
1021        );
1022        controller.setup(stream.clone(), can_gc)?;
1023        Ok(stream)
1024    }
1025
1026    /// Call into the release steps of the controller,
1027    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1028        match self.controller.borrow().as_ref() {
1029            Some(ControllerType::Default(controller)) => {
1030                let controller = controller
1031                    .get()
1032                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1033                controller.perform_release_steps()
1034            },
1035            Some(ControllerType::Byte(controller)) => {
1036                let controller = controller
1037                    .get()
1038                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1039                controller.perform_release_steps()
1040            },
1041            None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1042        }
1043    }
1044
1045    /// Call into the pull steps of the controller,
1046    /// as part of
1047    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1048    pub(crate) fn perform_pull_steps(
1049        &self,
1050        cx: SafeJSContext,
1051        read_request: &ReadRequest,
1052        can_gc: CanGc,
1053    ) {
1054        match self.controller.borrow().as_ref() {
1055            Some(ControllerType::Default(controller)) => controller
1056                .get()
1057                .expect("Stream should have controller.")
1058                .perform_pull_steps(read_request, can_gc),
1059            Some(ControllerType::Byte(controller)) => controller
1060                .get()
1061                .expect("Stream should have controller.")
1062                .perform_pull_steps(cx, read_request, can_gc),
1063            None => {
1064                unreachable!("Stream does not have a controller.");
1065            },
1066        }
1067    }
1068
1069    /// Call into the pull steps of the controller,
1070    /// as part of
1071    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
1072    pub(crate) fn perform_pull_into(
1073        &self,
1074        cx: SafeJSContext,
1075        read_into_request: &ReadIntoRequest,
1076        view: HeapBufferSource<ArrayBufferViewU8>,
1077        min: u64,
1078        can_gc: CanGc,
1079    ) {
1080        match self.controller.borrow().as_ref() {
1081            Some(ControllerType::Byte(controller)) => controller
1082                .get()
1083                .expect("Stream should have controller.")
1084                .perform_pull_into(cx, read_into_request, view, min, can_gc),
1085            _ => {
1086                unreachable!(
1087                    "Pulling a chunk from a stream with a default controller using a BYOB reader"
1088                )
1089            },
1090        }
1091    }
1092
1093    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
1094    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1095        match self.reader.borrow().as_ref() {
1096            Some(ReaderType::Default(reader)) => {
1097                let Some(reader) = reader.get() else {
1098                    panic!("Attempt to add a read request without having first acquired a reader.");
1099                };
1100
1101                // Assert: stream.[[state]] is "readable".
1102                assert!(self.is_readable());
1103
1104                // Append readRequest to stream.[[reader]].[[readRequests]].
1105                reader.add_read_request(read_request);
1106            },
1107            _ => {
1108                unreachable!("Adding a read request can only be done on a default reader.")
1109            },
1110        }
1111    }
1112
1113    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
1114    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1115        match self.reader.borrow().as_ref() {
1116            // Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
1117            Some(ReaderType::BYOB(reader)) => {
1118                let Some(reader) = reader.get() else {
1119                    unreachable!(
1120                        "Attempt to add a read into request without having first acquired a reader."
1121                    );
1122                };
1123
1124                // Assert: stream.[[state]] is "readable" or "closed".
1125                assert!(self.is_readable() || self.is_closed());
1126
1127                // Append readRequest to stream.[[reader]].[[readIntoRequests]].
1128                reader.add_read_into_request(read_request);
1129            },
1130            _ => {
1131                unreachable!("Adding a read into request can only be done on a BYOB reader.")
1132            },
1133        }
1134    }
1135
1136    /// Endpoint to enqueue chunks directly from Rust.
1137    /// Note: in other use cases this call happens via the controller.
1138    pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1139        match self.controller.borrow().as_ref() {
1140            Some(ControllerType::Default(controller)) => controller
1141                .get()
1142                .expect("Stream should have controller.")
1143                .enqueue_native(bytes, can_gc),
1144            _ => {
1145                unreachable!(
1146                    "Enqueueing chunk to a stream from Rust on other than default controller"
1147                );
1148            },
1149        }
1150    }
1151
1152    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1153    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1154        // Assert: stream.[[state]] is "readable".
1155        assert!(self.is_readable());
1156
1157        // Set stream.[[state]] to "errored".
1158        self.state.set(ReadableStreamState::Errored);
1159
1160        // Set stream.[[storedError]] to e.
1161        self.stored_error.set(e.get());
1162
1163        // Let reader be stream.[[reader]].
1164
1165        let default_reader = {
1166            let reader_ref = self.reader.borrow();
1167            match reader_ref.as_ref() {
1168                Some(ReaderType::Default(reader)) => reader.get(),
1169                _ => None,
1170            }
1171        };
1172
1173        if let Some(reader) = default_reader {
1174            // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
1175            reader.error(e, can_gc);
1176            return;
1177        }
1178
1179        let byob_reader = {
1180            let reader_ref = self.reader.borrow();
1181            match reader_ref.as_ref() {
1182                Some(ReaderType::BYOB(reader)) => reader.get(),
1183                _ => None,
1184            }
1185        };
1186
1187        if let Some(reader) = byob_reader {
1188            // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
1189            reader.error_read_into_requests(e, can_gc);
1190        }
1191
1192        // If reader is undefined, return.
1193    }
1194
1195    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
1196    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1197        handle_mut.set(self.stored_error.get());
1198    }
1199
1200    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1201    /// Note: in other use cases this call happens via the controller.
1202    pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1203        let cx = GlobalScope::get_cx();
1204        rooted!(in(*cx) let mut error_val = UndefinedValue());
1205        error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1206        self.error(error_val.handle(), can_gc);
1207    }
1208
1209    /// Call into the controller's `Close` method.
1210    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
1211    pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1212        match self.controller.borrow().as_ref() {
1213            Some(ControllerType::Default(controller)) => {
1214                let _ = controller
1215                    .get()
1216                    .expect("Stream should have controller.")
1217                    .Close(can_gc);
1218            },
1219            _ => {
1220                unreachable!("Native closing is only done on default controllers.")
1221            },
1222        }
1223    }
1224
1225    /// Returns a boolean reflecting whether the stream has all data in memory.
1226    /// Useful for native source integration only.
1227    pub(crate) fn in_memory(&self) -> bool {
1228        match self.controller.borrow().as_ref() {
1229            Some(ControllerType::Default(controller)) => controller
1230                .get()
1231                .expect("Stream should have controller.")
1232                .in_memory(),
1233            _ => {
1234                unreachable!(
1235                    "Checking if source is in memory for a stream with a non-default controller"
1236                )
1237            },
1238        }
1239    }
1240
1241    /// Return bytes for synchronous use, if the stream has all data in memory.
1242    /// Useful for native source integration only.
1243    pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1244        match self.controller.borrow().as_ref() {
1245            Some(ControllerType::Default(controller)) => controller
1246                .get()
1247                .expect("Stream should have controller.")
1248                .get_in_memory_bytes()
1249                .as_deref()
1250                .map(GenericSharedMemory::from_bytes),
1251            _ => {
1252                unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1253            },
1254        }
1255    }
1256
1257    /// Acquires a reader and locks the stream,
1258    /// must be done before `read_a_chunk`.
1259    /// Native call to
1260    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-reader>
1261    pub(crate) fn acquire_default_reader(
1262        &self,
1263        can_gc: CanGc,
1264    ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1265        // Let reader be a new ReadableStreamDefaultReader.
1266        let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1267
1268        // Perform ? SetUpReadableStreamDefaultReader(reader, stream).
1269        reader.set_up(self, &self.global(), can_gc)?;
1270
1271        // Return reader.
1272        Ok(reader)
1273    }
1274
1275    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-byob-reader>
1276    pub(crate) fn acquire_byob_reader(
1277        &self,
1278        can_gc: CanGc,
1279    ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1280        // Let reader be a new ReadableStreamBYOBReader.
1281        let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1282        // Perform ? SetUpReadableStreamBYOBReader(reader, stream).
1283        reader.set_up(self, &self.global(), can_gc)?;
1284
1285        // Return reader.
1286        Ok(reader)
1287    }
1288
1289    pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1290        match self.controller.borrow().as_ref() {
1291            Some(ControllerType::Default(controller)) => {
1292                controller.get().expect("Stream should have controller.")
1293            },
1294            _ => {
1295                unreachable!(
1296                    "Getting default controller for a stream with a non-default controller"
1297                )
1298            },
1299        }
1300    }
1301
1302    pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1303        match self.controller.borrow().as_ref() {
1304            Some(ControllerType::Byte(controller)) => {
1305                controller.get().expect("Stream should have controller.")
1306            },
1307            _ => {
1308                unreachable!("Getting byte controller for a stream with a non-byte controller")
1309            },
1310        }
1311    }
1312
1313    pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1314        match self.reader.borrow().as_ref() {
1315            Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1316            _ => {
1317                unreachable!("Getting default reader for a stream with a non-default reader")
1318            },
1319        }
1320    }
1321
1322    /// Read a chunk from the stream,
1323    /// must be called after `start_reading`,
1324    /// and before `stop_reading`.
1325    /// Native call to
1326    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1327    pub(crate) fn read_a_chunk(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
1328        match self.reader.borrow().as_ref() {
1329            Some(ReaderType::Default(reader)) => {
1330                let Some(reader) = reader.get() else {
1331                    unreachable!(
1332                        "Attempt to read stream chunk without having first acquired a reader."
1333                    );
1334                };
1335                reader.Read(cx)
1336            },
1337            _ => {
1338                unreachable!("Native reading of a chunk can only be done with a default reader.")
1339            },
1340        }
1341    }
1342
1343    /// Releases the lock on the reader,
1344    /// must be done after `start_reading`.
1345    /// Native call to
1346    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
1347    pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1348        let reader_ref = self.reader.borrow();
1349
1350        match reader_ref.as_ref() {
1351            Some(ReaderType::Default(reader)) => {
1352                let Some(reader) = reader.get() else {
1353                    unreachable!("Attempt to stop reading without having first acquired a reader.");
1354                };
1355
1356                drop(reader_ref);
1357                reader.release(can_gc).expect("Reader release cannot fail.");
1358            },
1359            _ => {
1360                unreachable!("Native stop reading can only be done with a default reader.")
1361            },
1362        }
1363    }
1364
1365    /// <https://streams.spec.whatwg.org/#is-readable-stream-locked>
1366    pub(crate) fn is_locked(&self) -> bool {
1367        match self.reader.borrow().as_ref() {
1368            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1369            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1370            None => false,
1371        }
1372    }
1373
1374    pub(crate) fn is_disturbed(&self) -> bool {
1375        self.disturbed.get()
1376    }
1377
1378    pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1379        self.disturbed.set(disturbed);
1380    }
1381
1382    pub(crate) fn is_closed(&self) -> bool {
1383        self.state.get() == ReadableStreamState::Closed
1384    }
1385
1386    pub(crate) fn is_errored(&self) -> bool {
1387        self.state.get() == ReadableStreamState::Errored
1388    }
1389
1390    pub(crate) fn is_readable(&self) -> bool {
1391        self.state.get() == ReadableStreamState::Readable
1392    }
1393
1394    pub(crate) fn has_default_reader(&self) -> bool {
1395        match self.reader.borrow().as_ref() {
1396            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1397            _ => false,
1398        }
1399    }
1400
1401    pub(crate) fn has_byob_reader(&self) -> bool {
1402        match self.reader.borrow().as_ref() {
1403            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1404            _ => false,
1405        }
1406    }
1407
1408    pub(crate) fn has_byte_controller(&self) -> bool {
1409        match self.controller.borrow().as_ref() {
1410            Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1411            _ => false,
1412        }
1413    }
1414
1415    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
1416    pub(crate) fn get_num_read_requests(&self) -> usize {
1417        match self.reader.borrow().as_ref() {
1418            Some(ReaderType::Default(reader)) => {
1419                let reader = reader
1420                    .get()
1421                    .expect("Stream must have a reader when getting the number of read requests.");
1422                reader.get_num_read_requests()
1423            },
1424            _ => unreachable!(
1425                "Stream must have a default reader when get num read requests is called into."
1426            ),
1427        }
1428    }
1429
1430    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-into-requests>
1431    pub(crate) fn get_num_read_into_requests(&self) -> usize {
1432        assert!(self.has_byob_reader());
1433
1434        match self.reader.borrow().as_ref() {
1435            Some(ReaderType::BYOB(reader)) => {
1436                let Some(reader) = reader.get() else {
1437                    unreachable!(
1438                        "Stream must have a reader when get num read into requests is called into."
1439                    );
1440                };
1441                reader.get_num_read_into_requests()
1442            },
1443            _ => {
1444                unreachable!(
1445                    "Stream must have a BYOB reader when get num read into requests is called into."
1446                );
1447            },
1448        }
1449    }
1450
1451    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
1452    pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1453        // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1454        assert!(self.has_default_reader());
1455
1456        match self.reader.borrow().as_ref() {
1457            Some(ReaderType::Default(reader)) => {
1458                // step 2 - Let reader be stream.[[reader]].
1459                let reader = reader
1460                    .get()
1461                    .expect("Stream must have a reader when a read request is fulfilled.");
1462                // step 3 - Assert: reader.[[readRequests]] is not empty.
1463                assert_ne!(reader.get_num_read_requests(), 0);
1464                // step 4 & 5
1465                // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]].
1466                let request = reader.remove_read_request();
1467
1468                if done {
1469                    // step 6 - If done is true, perform readRequest’s close steps.
1470                    request.close_steps(can_gc);
1471                } else {
1472                    // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk.
1473                    let result = RootedTraceableBox::new(Heap::default());
1474                    result.set(*chunk);
1475                    request.chunk_steps(result, &self.global(), can_gc);
1476                }
1477            },
1478            _ => {
1479                unreachable!(
1480                    "Stream must have a default reader when fulfill read requests is called into."
1481                );
1482            },
1483        }
1484    }
1485
1486    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request>
1487    pub(crate) fn fulfill_read_into_request(
1488        &self,
1489        chunk: SafeHandleValue,
1490        done: bool,
1491        can_gc: CanGc,
1492    ) {
1493        // Assert: ! ReadableStreamHasBYOBReader(stream) is true.
1494        assert!(self.has_byob_reader());
1495
1496        // Let reader be stream.[[reader]].
1497        match self.reader.borrow().as_ref() {
1498            Some(ReaderType::BYOB(reader)) => {
1499                let Some(reader) = reader.get() else {
1500                    unreachable!(
1501                        "Stream must have a reader when a read into request is fulfilled."
1502                    );
1503                };
1504
1505                // Assert: reader.[[readIntoRequests]] is not empty.
1506                assert!(reader.get_num_read_into_requests() > 0);
1507
1508                // Let readIntoRequest be reader.[[readIntoRequests]][0].
1509                // Remove readIntoRequest from reader.[[readIntoRequests]].
1510                let read_into_request = reader.remove_read_into_request();
1511
1512                // If done is true, perform readIntoRequest’s close steps, given chunk.
1513                let result = RootedTraceableBox::new(Heap::default());
1514                if done {
1515                    result.set(*chunk);
1516                    read_into_request.close_steps(Some(result), can_gc);
1517                } else {
1518                    // Otherwise, perform readIntoRequest’s chunk steps, given chunk.
1519                    result.set(*chunk);
1520                    read_into_request.chunk_steps(result, can_gc);
1521                }
1522            },
1523            _ => {
1524                unreachable!(
1525                    "Stream must have a BYOB reader when fulfill read into requests is called into."
1526                );
1527            },
1528        };
1529    }
1530
1531    /// <https://streams.spec.whatwg.org/#readable-stream-close>
1532    pub(crate) fn close(&self, can_gc: CanGc) {
1533        // Assert: stream.[[state]] is "readable".
1534        assert!(self.is_readable());
1535        // Set stream.[[state]] to "closed".
1536        self.state.set(ReadableStreamState::Closed);
1537        // Let reader be stream.[[reader]].
1538
1539        // NOTE: do not hold the RefCell borrow across reader.close(),
1540        // or release() will panic when it tries to mut-borrow stream.reader.
1541        // So we pull out the underlying DOM reader in a local, then drop the borrow.
1542        let default_reader = {
1543            let reader_ref = self.reader.borrow();
1544            match reader_ref.as_ref() {
1545                Some(ReaderType::Default(reader)) => reader.get(),
1546                _ => None,
1547            }
1548        };
1549
1550        if let Some(reader) = default_reader {
1551            // steps 5 & 6 for a default reader
1552            reader.close(can_gc);
1553            return;
1554        }
1555
1556        // Same for BYOB reader.
1557        let byob_reader = {
1558            let reader_ref = self.reader.borrow();
1559            match reader_ref.as_ref() {
1560                Some(ReaderType::BYOB(reader)) => reader.get(),
1561                _ => None,
1562            }
1563        };
1564
1565        if let Some(reader) = byob_reader {
1566            // steps 5 & 6 for a BYOB reader
1567            reader.close(can_gc);
1568        }
1569
1570        // If reader is undefined, return.
1571    }
1572
1573    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
1574    pub(crate) fn cancel(
1575        &self,
1576        cx: &mut js::context::JSContext,
1577        global: &GlobalScope,
1578        reason: SafeHandleValue,
1579    ) -> Rc<Promise> {
1580        // Set stream.[[disturbed]] to true.
1581        self.disturbed.set(true);
1582
1583        // If stream.[[state]] is "closed", return a promise resolved with undefined.
1584        if self.is_closed() {
1585            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
1586        }
1587        // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
1588        if self.is_errored() {
1589            let promise = Promise::new2(cx, global);
1590            rooted!(&in(cx) let mut rval = UndefinedValue());
1591            self.stored_error
1592                .safe_to_jsval(cx.into(), rval.handle_mut(), CanGc::from_cx(cx));
1593            promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
1594            return promise;
1595        }
1596        // Perform ! ReadableStreamClose(stream).
1597        self.close(CanGc::from_cx(cx));
1598
1599        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
1600        let byob_reader = {
1601            let reader_ref = self.reader.borrow();
1602            match reader_ref.as_ref() {
1603                Some(ReaderType::BYOB(reader)) => reader.get(),
1604                _ => None,
1605            }
1606        };
1607
1608        if let Some(reader) = byob_reader {
1609            // step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel
1610            reader.cancel(CanGc::from_cx(cx));
1611        }
1612
1613        // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).
1614
1615        let source_cancel_promise = match self.controller.borrow().as_ref() {
1616            Some(ControllerType::Default(controller)) => controller
1617                .get()
1618                .expect("Stream should have controller.")
1619                .perform_cancel_steps(cx, global, reason),
1620            Some(ControllerType::Byte(controller)) => controller
1621                .get()
1622                .expect("Stream should have controller.")
1623                .perform_cancel_steps(cx, global, reason),
1624            None => {
1625                panic!("Stream does not have a controller.");
1626            },
1627        };
1628
1629        // Create a new promise,
1630        // and setup a handler in order to react to the fulfillment of sourceCancelPromise.
1631        let global = self.global();
1632        let result_promise = Promise::new2(cx, &global);
1633        let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1634            result: result_promise.clone(),
1635        });
1636        let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1637            result: result_promise.clone(),
1638        });
1639        let handler = PromiseNativeHandler::new(
1640            &global,
1641            Some(fulfillment_handler),
1642            Some(rejection_handler),
1643            CanGc::from_cx(cx),
1644        );
1645        let realm = enter_realm(&*global);
1646        let comp = InRealm::Entered(&realm);
1647        source_cancel_promise.append_native_handler(&handler, comp, CanGc::from_cx(cx));
1648
1649        // Return the result of reacting to sourceCancelPromise
1650        // with a fulfillment step that returns undefined.
1651        result_promise
1652    }
1653
1654    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1655    pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1656        *self.reader.borrow_mut() = new_reader;
1657    }
1658
1659    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1660    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
1661    fn byte_tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1662        // Assert: stream implements ReadableStream.
1663        // Assert: stream.[[controller]] implements ReadableByteStreamController.
1664
1665        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1666        let reader = self.acquire_default_reader(can_gc)?;
1667        let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1668            Some(&reader),
1669        ))));
1670
1671        // Let reading be false.
1672        let reading = Rc::new(Cell::new(false));
1673
1674        // Let readAgainForBranch1 be false.
1675        let read_again_for_branch_1 = Rc::new(Cell::new(false));
1676
1677        // Let readAgainForBranch2 be false.
1678        let read_again_for_branch_2 = Rc::new(Cell::new(false));
1679
1680        // Let canceled1 be false.
1681        let canceled_1 = Rc::new(Cell::new(false));
1682
1683        // Let canceled2 be false.
1684        let canceled_2 = Rc::new(Cell::new(false));
1685
1686        // Let reason1 be undefined.
1687        let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1688
1689        // Let reason2 be undefined.
1690        let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1691
1692        // Let cancelPromise be a new promise.
1693        let cancel_promise = Promise::new(&self.global(), can_gc);
1694        let reader_version = Rc::new(Cell::new(0));
1695
1696        let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1697            reader.clone(),
1698            self,
1699            reading.clone(),
1700            read_again_for_branch_1.clone(),
1701            read_again_for_branch_2.clone(),
1702            canceled_1.clone(),
1703            canceled_2.clone(),
1704            reason_1.clone(),
1705            reason_2.clone(),
1706            cancel_promise.clone(),
1707            reader_version.clone(),
1708            ByteTeeCancelAlgorithm::Cancel1Algorithm,
1709            ByteTeePullAlgorithm::Pull1Algorithm,
1710            can_gc,
1711        );
1712
1713        let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1714            reader.clone(),
1715            self,
1716            reading,
1717            read_again_for_branch_1,
1718            read_again_for_branch_2,
1719            canceled_1,
1720            canceled_2,
1721            reason_1,
1722            reason_2,
1723            cancel_promise,
1724            reader_version,
1725            ByteTeeCancelAlgorithm::Cancel2Algorithm,
1726            ByteTeePullAlgorithm::Pull2Algorithm,
1727            can_gc,
1728        );
1729
1730        // Set branch1 to ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm).
1731        let branch_1 = readable_byte_stream_tee(
1732            &self.global(),
1733            UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_1)),
1734            can_gc,
1735        );
1736        byte_tee_source_1.set_branch_1(&branch_1);
1737        byte_tee_source_2.set_branch_1(&branch_1);
1738
1739        // Set branch2 to ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm).
1740        let branch_2 = readable_byte_stream_tee(
1741            &self.global(),
1742            UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_2)),
1743            can_gc,
1744        );
1745        byte_tee_source_1.set_branch_2(&branch_2);
1746        byte_tee_source_2.set_branch_2(&branch_2);
1747
1748        // Perform forwardReaderError, given reader.
1749        byte_tee_source_1.forward_reader_error(reader.clone(), can_gc);
1750        byte_tee_source_2.forward_reader_error(reader, can_gc);
1751
1752        // Return « branch1, branch2 ».
1753        Ok(vec![branch_1, branch_2])
1754    }
1755
1756    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
1757    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1758    fn default_tee(
1759        &self,
1760        clone_for_branch_2: bool,
1761        can_gc: CanGc,
1762    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1763        // Assert: stream implements ReadableStream.
1764
1765        // Assert: cloneForBranch2 is a boolean.
1766        let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1767
1768        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1769        let reader = self.acquire_default_reader(can_gc)?;
1770
1771        // Let reading be false.
1772        let reading = Rc::new(Cell::new(false));
1773        // Let readAgain be false.
1774        let read_again = Rc::new(Cell::new(false));
1775        // Let canceled1 be false.
1776        let canceled_1 = Rc::new(Cell::new(false));
1777        // Let canceled2 be false.
1778        let canceled_2 = Rc::new(Cell::new(false));
1779
1780        // Let reason1 be undefined.
1781        let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1782        // Let reason2 be undefined.
1783        let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1784        // Let cancelPromise be a new promise.
1785        let cancel_promise = Promise::new(&self.global(), can_gc);
1786
1787        let tee_source_1 = DefaultTeeUnderlyingSource::new(
1788            &reader,
1789            self,
1790            reading.clone(),
1791            read_again.clone(),
1792            canceled_1.clone(),
1793            canceled_2.clone(),
1794            clone_for_branch_2.clone(),
1795            reason_1.clone(),
1796            reason_2.clone(),
1797            cancel_promise.clone(),
1798            DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1799            can_gc,
1800        );
1801
1802        let underlying_source_type_branch_1 =
1803            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
1804
1805        let tee_source_2 = DefaultTeeUnderlyingSource::new(
1806            &reader,
1807            self,
1808            reading,
1809            read_again,
1810            canceled_1.clone(),
1811            canceled_2.clone(),
1812            clone_for_branch_2,
1813            reason_1,
1814            reason_2,
1815            cancel_promise.clone(),
1816            DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1817            can_gc,
1818        );
1819
1820        let underlying_source_type_branch_2 =
1821            UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
1822
1823        // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
1824        let branch_1 = create_readable_stream(
1825            &self.global(),
1826            underlying_source_type_branch_1,
1827            None,
1828            None,
1829            can_gc,
1830        );
1831        tee_source_1.set_branch_1(&branch_1);
1832        tee_source_2.set_branch_1(&branch_1);
1833
1834        // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
1835        let branch_2 = create_readable_stream(
1836            &self.global(),
1837            underlying_source_type_branch_2,
1838            None,
1839            None,
1840            can_gc,
1841        );
1842        tee_source_1.set_branch_2(&branch_2);
1843        tee_source_2.set_branch_2(&branch_2);
1844
1845        // Upon rejection of reader.[[closedPromise]] with reason r,
1846        reader.default_tee_append_native_handler_to_closed_promise(
1847            &branch_1,
1848            &branch_2,
1849            canceled_1,
1850            canceled_2,
1851            cancel_promise,
1852            can_gc,
1853        );
1854
1855        // Return « branch_1, branch_2 ».
1856        Ok(vec![branch_1, branch_2])
1857    }
1858
1859    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
1860    #[allow(clippy::too_many_arguments)]
1861    pub(crate) fn pipe_to(
1862        &self,
1863        cx: &mut CurrentRealm,
1864        global: &GlobalScope,
1865        dest: &WritableStream,
1866        prevent_close: bool,
1867        prevent_abort: bool,
1868        prevent_cancel: bool,
1869        signal: Option<&AbortSignal>,
1870    ) -> Rc<Promise> {
1871        // Assert: source implements ReadableStream.
1872        // Assert: dest implements WritableStream.
1873        // Assert: prevent_close, prevent_abort, and prevent_cancel are all booleans.
1874        // Done with method signature types.
1875
1876        // If signal was not given, let signal be undefined.
1877        // Assert: either signal is undefined, or signal implements AbortSignal.
1878        // Note: done with the `signal` argument.
1879
1880        // Assert: ! IsReadableStreamLocked(source) is false.
1881        assert!(!self.is_locked());
1882
1883        // Assert: ! IsWritableStreamLocked(dest) is false.
1884        assert!(!dest.is_locked());
1885
1886        // If source.[[controller]] implements ReadableByteStreamController,
1887        // let reader be either ! AcquireReadableStreamBYOBReader(source)
1888        // or ! AcquireReadableStreamDefaultReader(source),
1889        // at the user agent’s discretion.
1890        // Note: for now only using default readers.
1891
1892        // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
1893        let reader = self
1894            .acquire_default_reader(CanGc::from_cx(cx))
1895            .expect("Acquiring a default reader for pipe_to cannot fail");
1896
1897        // Let writer be ! AcquireWritableStreamDefaultWriter(dest).
1898        let writer = dest
1899            .aquire_default_writer(cx.into(), global, CanGc::from_cx(cx))
1900            .expect("Acquiring a default writer for pipe_to cannot fail");
1901
1902        // Set source.[[disturbed]] to true.
1903        self.disturbed.set(true);
1904
1905        // Let shuttingDown be false.
1906        // Done below with default.
1907
1908        // Let promise be a new promise.
1909        let promise = Promise::new2(cx, global);
1910
1911        // In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
1912        rooted!(&in(cx) let pipe_to = PipeTo {
1913            reader: Dom::from_ref(&reader),
1914            writer: Dom::from_ref(&writer),
1915            pending_writes: Default::default(),
1916            state: Default::default(),
1917            prevent_abort,
1918            prevent_cancel,
1919            prevent_close,
1920            shutting_down: Default::default(),
1921            abort_reason: Default::default(),
1922            shutdown_error: Default::default(),
1923            shutdown_action_promise:  Default::default(),
1924            result_promise: promise.clone(),
1925        });
1926
1927        // If signal is not undefined,
1928        // Note: moving the steps to here, so that the `PipeTo` is available.
1929        if let Some(signal) = signal {
1930            // Let abortAlgorithm be the following steps:
1931            // Note: steps are implemented at call site.
1932            rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1933
1934            // If signal is aborted, perform abortAlgorithm and return promise.
1935            if signal.aborted() {
1936                signal.run_abort_algorithm(cx, global, &abort_algorithm);
1937                return promise;
1938            }
1939
1940            // Add abortAlgorithm to signal.
1941            signal.add(&abort_algorithm);
1942        }
1943
1944        // Note: perfom checks now, since streams can start as closed or errored.
1945        pipe_to.check_and_propagate_errors_forward(cx, global);
1946        pipe_to.check_and_propagate_errors_backward(cx, global);
1947        pipe_to.check_and_propagate_closing_forward(cx, global);
1948        pipe_to.check_and_propagate_closing_backward(cx, global);
1949
1950        // If we are not closed or errored,
1951        if *pipe_to.state.borrow() == PipeToState::Starting {
1952            // Start the pipe, by waiting on the writer being ready for a chunk.
1953            pipe_to.wait_for_writer_ready(cx, global);
1954        }
1955
1956        // Return promise.
1957        promise
1958    }
1959
1960    /// <https://streams.spec.whatwg.org/#readable-stream-tee>
1961    pub(crate) fn tee(
1962        &self,
1963        clone_for_branch_2: bool,
1964        can_gc: CanGc,
1965    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1966        // Assert: stream implements ReadableStream.
1967        // Assert: cloneForBranch2 is a boolean.
1968
1969        match self.controller.borrow().as_ref() {
1970            Some(ControllerType::Default(_)) => {
1971                // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
1972                self.default_tee(clone_for_branch_2, can_gc)
1973            },
1974            Some(ControllerType::Byte(_)) => {
1975                // If stream.[[controller]] implements ReadableByteStreamController,
1976                // return ? ReadableByteStreamTee(stream).
1977                self.byte_tee(can_gc)
1978            },
1979            None => {
1980                unreachable!("Stream should have a controller.");
1981            },
1982        }
1983    }
1984
1985    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller-from-underlying-source>
1986    pub(crate) fn set_up_byte_controller(
1987        &self,
1988        global: &GlobalScope,
1989        underlying_source_dict: JsUnderlyingSource,
1990        underlying_source_handle: SafeHandleObject,
1991        stream: DomRoot<ReadableStream>,
1992        strategy_hwm: f64,
1993        can_gc: CanGc,
1994    ) -> Fallible<()> {
1995        // Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
1996        // Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
1997        // If underlyingSourceDict["start"] exists, then set startAlgorithm to an algorithm which returns the result
1998        // of invoking underlyingSourceDict["start"] with argument list « controller »
1999        // and callback this value underlyingSource.
2000        // If underlyingSourceDict["pull"] exists, then set pullAlgorithm to an algorithm which returns the result
2001        // of invoking underlyingSourceDict["pull"] with argument list « controller »
2002        // and callback this value underlyingSource.
2003        // If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm to an algorithm which takes an
2004        // argument reason and returns the result of invoking underlyingSourceDict["cancel"] with argument list
2005        // « reason » and callback this value underlyingSource.
2006
2007        // Let autoAllocateChunkSize be underlyingSourceDict["autoAllocateChunkSize"],
2008        // if it exists, or undefined otherwise.
2009        // If autoAllocateChunkSize is 0, then throw a TypeError exception.
2010        if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2011            return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2012        }
2013
2014        let controller = ReadableByteStreamController::new(
2015            UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2016            strategy_hwm,
2017            global,
2018            can_gc,
2019        );
2020
2021        // Note: this must be done before `setup`,
2022        // otherwise `thisOb` is null in the start callback.
2023        controller.set_underlying_source_this_object(underlying_source_handle);
2024
2025        // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm,
2026        // pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
2027        controller.setup(global, stream, can_gc)
2028    }
2029
2030    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2031    pub(crate) fn setup_cross_realm_transform_readable(
2032        &self,
2033        cx: &mut js::context::JSContext,
2034        port: &MessagePort,
2035    ) {
2036        let port_id = port.message_port_id();
2037        let global = self.global();
2038
2039        // Perform ! InitializeReadableStream(stream).
2040        // Done in `new_inherited`.
2041
2042        // Let sizeAlgorithm be an algorithm that returns 1.
2043        let size_algorithm =
2044            extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
2045
2046        // Note: other algorithms defined in the underlying source container.
2047
2048        // Let controller be a new ReadableStreamDefaultController.
2049        let controller = ReadableStreamDefaultController::new(
2050            &self.global(),
2051            UnderlyingSourceType::Transfer(Dom::from_ref(port)),
2052            0.,
2053            size_algorithm,
2054            CanGc::from_cx(cx),
2055        );
2056
2057        // Add a handler for port’s message event with the following steps:
2058        // Add a handler for port’s messageerror event with the following steps:
2059        rooted!(&in(cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2060            controller: Dom::from_ref(&controller),
2061        });
2062        global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2063
2064        // Enable port’s port message queue.
2065        port.Start(cx);
2066
2067        // Perform ! SetUpReadableStreamDefaultController
2068        controller
2069            .setup(DomRoot::from_ref(self), CanGc::from_cx(cx))
2070            .expect("Setting up controller for transfer cannot fail.");
2071    }
2072}
2073
2074impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2075    /// <https://streams.spec.whatwg.org/#rs-constructor>
2076    fn Constructor(
2077        cx: SafeJSContext,
2078        global: &GlobalScope,
2079        proto: Option<SafeHandleObject>,
2080        can_gc: CanGc,
2081        underlying_source: Option<*mut JSObject>,
2082        strategy: &QueuingStrategy,
2083    ) -> Fallible<DomRoot<Self>> {
2084        // If underlyingSource is missing, set it to null.
2085        rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2086        // Let underlyingSourceDict be underlyingSource,
2087        // converted to an IDL value of type UnderlyingSource.
2088        let underlying_source_dict = if !underlying_source_obj.is_null() {
2089            rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2090            match JsUnderlyingSource::new(cx, obj_val.handle(), can_gc) {
2091                Ok(ConversionResult::Success(val)) => val,
2092                Ok(ConversionResult::Failure(error)) => {
2093                    return Err(Error::Type(error.into_owned()));
2094                },
2095                _ => {
2096                    return Err(Error::JSFailed);
2097                },
2098            }
2099        } else {
2100            JsUnderlyingSource::empty()
2101        };
2102
2103        // Perform ! InitializeReadableStream(this).
2104        let stream = ReadableStream::new_with_proto(global, proto, can_gc);
2105
2106        if underlying_source_dict.type_.is_some() {
2107            // If strategy["size"] exists, throw a RangeError exception.
2108            if strategy.size.is_some() {
2109                return Err(Error::Range(
2110                    c"size is not supported for byte streams".to_owned(),
2111                ));
2112            }
2113
2114            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
2115            let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2116
2117            // Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this,
2118            // underlyingSource, underlyingSourceDict, highWaterMark).
2119            stream.set_up_byte_controller(
2120                global,
2121                underlying_source_dict,
2122                underlying_source_obj.handle(),
2123                stream.clone(),
2124                strategy_hwm,
2125                can_gc,
2126            )?;
2127        } else {
2128            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
2129            let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2130
2131            // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
2132            let size_algorithm = extract_size_algorithm(strategy, can_gc);
2133
2134            let controller = ReadableStreamDefaultController::new(
2135                global,
2136                UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2137                high_water_mark,
2138                size_algorithm,
2139                can_gc,
2140            );
2141
2142            // Note: this must be done before `setup`,
2143            // otherwise `thisOb` is null in the start callback.
2144            controller.set_underlying_source_this_object(underlying_source_obj.handle());
2145
2146            // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
2147            controller.setup(stream.clone(), can_gc)?;
2148        };
2149
2150        Ok(stream)
2151    }
2152
2153    /// <https://streams.spec.whatwg.org/#rs-locked>
2154    fn Locked(&self) -> bool {
2155        self.is_locked()
2156    }
2157
2158    /// <https://streams.spec.whatwg.org/#rs-cancel>
2159    fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
2160        let global = self.global();
2161        if self.is_locked() {
2162            // If ! IsReadableStreamLocked(this) is true,
2163            // return a promise rejected with a TypeError exception.
2164            let promise = Promise::new2(cx, &global);
2165            promise.reject_error(
2166                Error::Type(c"stream is locked".to_owned()),
2167                CanGc::from_cx(cx),
2168            );
2169            promise
2170        } else {
2171            // Return ! ReadableStreamCancel(this, reason).
2172            self.cancel(cx, &global, reason)
2173        }
2174    }
2175
2176    /// <https://streams.spec.whatwg.org/#rs-get-reader>
2177    fn GetReader(
2178        &self,
2179        options: &ReadableStreamGetReaderOptions,
2180        can_gc: CanGc,
2181    ) -> Fallible<ReadableStreamReader> {
2182        // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
2183        if options.mode.is_none() {
2184            return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2185                self.acquire_default_reader(can_gc)?,
2186            ));
2187        }
2188        // 2. Assert: options["mode"] is "byob".
2189        assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2190
2191        // 3. Return ? AcquireReadableStreamBYOBReader(this).
2192        Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2193            self.acquire_byob_reader(can_gc)?,
2194        ))
2195    }
2196
2197    /// <https://streams.spec.whatwg.org/#rs-tee>
2198    fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2199        // Return ? ReadableStreamTee(this, false).
2200        self.tee(false, can_gc)
2201    }
2202
2203    /// <https://streams.spec.whatwg.org/#rs-pipe-to>
2204    fn PipeTo(
2205        &self,
2206        cx: &mut CurrentRealm,
2207        destination: &WritableStream,
2208        options: &StreamPipeOptions,
2209    ) -> Rc<Promise> {
2210        let global = self.global();
2211
2212        // If ! IsReadableStreamLocked(this) is true,
2213        if self.is_locked() {
2214            // return a promise rejected with a TypeError exception.
2215            let promise = Promise::new2(cx, &global);
2216            promise.reject_error(
2217                Error::Type(c"Source stream is locked".to_owned()),
2218                CanGc::from_cx(cx),
2219            );
2220            return promise;
2221        }
2222
2223        // If ! IsWritableStreamLocked(destination) is true,
2224        if destination.is_locked() {
2225            // return a promise rejected with a TypeError exception.
2226            let promise = Promise::new2(cx, &global);
2227            promise.reject_error(
2228                Error::Type(c"Destination stream is locked".to_owned()),
2229                CanGc::from_cx(cx),
2230            );
2231            return promise;
2232        }
2233
2234        // Let signal be options["signal"] if it exists, or undefined otherwise.
2235        let signal = options.signal.as_deref();
2236
2237        // Return ! ReadableStreamPipeTo.
2238        self.pipe_to(
2239            cx,
2240            &global,
2241            destination,
2242            options.preventClose,
2243            options.preventAbort,
2244            options.preventCancel,
2245            signal,
2246        )
2247    }
2248
2249    /// <https://streams.spec.whatwg.org/#rs-pipe-through>
2250    fn PipeThrough(
2251        &self,
2252        cx: &mut CurrentRealm,
2253        transform: &ReadableWritablePair,
2254        options: &StreamPipeOptions,
2255    ) -> Fallible<DomRoot<ReadableStream>> {
2256        let global = self.global();
2257
2258        // If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
2259        if self.is_locked() {
2260            return Err(Error::Type(c"Source stream is locked".to_owned()));
2261        }
2262
2263        // If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
2264        if transform.writable.is_locked() {
2265            return Err(Error::Type(c"Destination stream is locked".to_owned()));
2266        }
2267
2268        // Let signal be options["signal"] if it exists, or undefined otherwise.
2269        let signal = options.signal.as_deref();
2270
2271        // Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
2272        // options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
2273        let promise = self.pipe_to(
2274            cx,
2275            &global,
2276            &transform.writable,
2277            options.preventClose,
2278            options.preventAbort,
2279            options.preventCancel,
2280            signal,
2281        );
2282
2283        // Set promise.[[PromiseIsHandled]] to true.
2284        promise.set_promise_is_handled();
2285
2286        // Return transform["readable"].
2287        Ok(transform.readable.clone())
2288    }
2289}
2290
2291#[expect(unsafe_code)]
2292/// The initial steps for the message handler for both readable and writable cross realm transforms.
2293/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2294/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2295pub(crate) unsafe fn get_type_and_value_from_message(
2296    cx: SafeJSContext,
2297    data: SafeHandleValue,
2298    value: SafeMutableHandleValue,
2299    can_gc: CanGc,
2300) -> DOMString {
2301    // Let data be the data of the message.
2302    // Note: we are passed the data as argument,
2303    // which originates in the return value of `structuredclone::read`.
2304
2305    // Assert: data is an Object.
2306    assert!(data.is_object());
2307    rooted!(in(*cx) let data_object = data.to_object());
2308
2309    // Let type be ! Get(data, "type").
2310    rooted!(in(*cx) let mut type_ = UndefinedValue());
2311    unsafe {
2312        get_dictionary_property(
2313            *cx,
2314            data_object.handle(),
2315            c"type",
2316            type_.handle_mut(),
2317            can_gc,
2318        )
2319    }
2320    .expect("Getting the type should not fail.");
2321
2322    // Let value be ! Get(data, "value").
2323    unsafe { get_dictionary_property(*cx, data_object.handle(), c"value", value, can_gc) }
2324        .expect("Getting the value should not fail.");
2325
2326    // Assert: type is a String.
2327    let result =
2328        DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2329            .expect("The type of the message should be a string");
2330    let ConversionResult::Success(type_string) = result else {
2331        unreachable!("The type of the message should be a string");
2332    };
2333
2334    type_string
2335}
2336
2337impl js::gc::Rootable for CrossRealmTransformReadable {}
2338
2339/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2340/// A wrapper to handle `message` and `messageerror` events
2341/// for the port used by the transfered stream.
2342#[derive(Clone, JSTraceable, MallocSizeOf)]
2343#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
2344pub(crate) struct CrossRealmTransformReadable {
2345    /// The controller used in the algorithm.
2346    controller: Dom<ReadableStreamDefaultController>,
2347}
2348
2349impl CrossRealmTransformReadable {
2350    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2351    /// Add a handler for port’s message event with the following steps:
2352    #[expect(unsafe_code)]
2353    pub(crate) fn handle_message(
2354        &self,
2355        cx: &mut CurrentRealm,
2356        global: &GlobalScope,
2357        port: &MessagePort,
2358        message: SafeHandleValue,
2359    ) {
2360        rooted!(&in(cx) let mut value = UndefinedValue());
2361        let type_string = unsafe {
2362            get_type_and_value_from_message(
2363                cx.into(),
2364                message,
2365                value.handle_mut(),
2366                CanGc::from_cx(cx),
2367            )
2368        };
2369
2370        // If type is "chunk",
2371        if type_string == "chunk" {
2372            // Perform ! ReadableStreamDefaultControllerEnqueue(controller, value).
2373            self.controller
2374                .enqueue(cx, value.handle())
2375                .expect("Enqueing a chunk should not fail.");
2376        }
2377
2378        // Otherwise, if type is "close",
2379        if type_string == "close" {
2380            // Perform ! ReadableStreamDefaultControllerClose(controller).
2381            self.controller.close(CanGc::from_cx(cx));
2382
2383            // Disentangle port.
2384            global.disentangle_port(port, CanGc::from_cx(cx));
2385        }
2386
2387        // Otherwise, if type is "error",
2388        if type_string == "error" {
2389            // Perform ! ReadableStreamDefaultControllerError(controller, value).
2390            self.controller.error(value.handle(), CanGc::from_cx(cx));
2391
2392            // Disentangle port.
2393            global.disentangle_port(port, CanGc::from_cx(cx));
2394        }
2395    }
2396
2397    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2398    /// Add a handler for port’s messageerror event with the following steps:
2399    pub(crate) fn handle_error(
2400        &self,
2401        cx: &mut CurrentRealm,
2402        global: &GlobalScope,
2403        port: &MessagePort,
2404    ) {
2405        // Let error be a new "DataCloneError" DOMException.
2406        let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
2407        rooted!(&in(cx) let mut rooted_error = UndefinedValue());
2408        error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
2409
2410        // Perform ! CrossRealmTransformSendError(port, error).
2411        port.cross_realm_transform_send_error(rooted_error.handle(), CanGc::from_cx(cx));
2412
2413        // Perform ! ReadableStreamDefaultControllerError(controller, error).
2414        self.controller
2415            .error(rooted_error.handle(), CanGc::from_cx(cx));
2416
2417        // Disentangle port.
2418        global.disentangle_port(port, CanGc::from_cx(cx));
2419    }
2420}
2421
2422#[expect(unsafe_code)]
2423/// Get the `done` property of an object that a read promise resolved to.
2424pub(crate) fn get_read_promise_done(
2425    cx: SafeJSContext,
2426    v: &SafeHandleValue,
2427    can_gc: CanGc,
2428) -> Result<bool, Error> {
2429    if !v.is_object() {
2430        return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2431    }
2432    unsafe {
2433        rooted!(in(*cx) let object = v.to_object());
2434        rooted!(in(*cx) let mut done = UndefinedValue());
2435        match get_dictionary_property(*cx, object.handle(), c"done", done.handle_mut(), can_gc) {
2436            Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2437                Ok(ConversionResult::Success(val)) => Ok(val),
2438                Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2439                _ => Err(Error::Type(c"Unknown format for done property.".to_owned())),
2440            },
2441            Ok(false) => Err(Error::Type(c"Promise has no done property.".to_owned())),
2442            Err(()) => Err(Error::JSFailed),
2443        }
2444    }
2445}
2446
2447#[expect(unsafe_code)]
2448/// Get the `value` property of an object that a read promise resolved to.
2449pub(crate) fn get_read_promise_bytes(
2450    cx: SafeJSContext,
2451    v: &SafeHandleValue,
2452    can_gc: CanGc,
2453) -> Result<Vec<u8>, Error> {
2454    if !v.is_object() {
2455        return Err(Error::Type(
2456            c"Unknown format for for bytes read.".to_owned(),
2457        ));
2458    }
2459    unsafe {
2460        rooted!(in(*cx) let object = v.to_object());
2461        rooted!(in(*cx) let mut bytes = UndefinedValue());
2462        match get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc) {
2463            Ok(true) => {
2464                match Vec::<u8>::safe_from_jsval(
2465                    cx,
2466                    bytes.handle(),
2467                    ConversionBehavior::EnforceRange,
2468                    can_gc,
2469                ) {
2470                    Ok(ConversionResult::Success(val)) => Ok(val),
2471                    Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2472                    _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2473                }
2474            },
2475            Ok(false) => Err(Error::Type(c"Promise has no value property.".to_owned())),
2476            Err(()) => Err(Error::JSFailed),
2477        }
2478    }
2479}
2480
2481/// Convert a raw stream `chunk` JS value to `Vec<u8>`.
2482/// This mirrors the conversion used inside `get_read_promise_bytes`,
2483/// but operates on the raw chunk (no `{ value, done }` wrapper).
2484pub(crate) fn bytes_from_chunk_jsval(
2485    cx: SafeJSContext,
2486    chunk: &RootedTraceableBox<Heap<JSVal>>,
2487    can_gc: CanGc,
2488) -> Result<Vec<u8>, Error> {
2489    match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2490        Ok(ConversionResult::Success(vec)) => Ok(vec),
2491        Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2492        _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2493    }
2494}
2495
2496/// <https://streams.spec.whatwg.org/#rs-transfer>
2497impl Transferable for ReadableStream {
2498    type Index = MessagePortIndex;
2499    type Data = MessagePortImpl;
2500
2501    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps>
2502    fn transfer(
2503        &self,
2504        cx: &mut js::context::JSContext,
2505    ) -> Fallible<(MessagePortId, MessagePortImpl)> {
2506        // Step 1. If ! IsReadableStreamLocked(value) is true, throw a
2507        // "DataCloneError" DOMException.
2508        if self.is_locked() {
2509            return Err(Error::DataClone(None));
2510        }
2511
2512        let global = self.global();
2513        let mut realm = enter_auto_realm(cx, &*global);
2514        let mut realm = realm.current_realm();
2515        let cx = &mut realm;
2516
2517        // Step 2. Let port1 be a new MessagePort in the current Realm.
2518        let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
2519        global.track_message_port(&port_1, None);
2520
2521        // Step 3. Let port2 be a new MessagePort in the current Realm.
2522        let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
2523        global.track_message_port(&port_2, None);
2524
2525        // Step 4. Entangle port1 and port2.
2526        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2527
2528        // Step 5. Let writable be a new WritableStream in the current Realm.
2529        let writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
2530
2531        // Step 6. Perform ! SetUpCrossRealmTransformWritable(writable, port1).
2532        writable.setup_cross_realm_transform_writable(cx, &port_1);
2533
2534        // Step 7. Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
2535        let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);
2536
2537        // Step 8. Set promise.[[PromiseIsHandled]] to true.
2538        promise.set_promise_is_handled();
2539
2540        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
2541        port_2.transfer(cx)
2542    }
2543
2544    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps>
2545    fn transfer_receive(
2546        cx: &mut js::context::JSContext,
2547        owner: &GlobalScope,
2548        id: MessagePortId,
2549        port_impl: MessagePortImpl,
2550    ) -> Result<DomRoot<Self>, ()> {
2551        // Their transfer-receiving steps, given dataHolder and value, are:
2552        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
2553        let value = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
2554
2555        // Step 1. Let deserializedRecord be !
2556        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
2557        // Realm).
2558        // Done with the `Deserialize` derive of `MessagePortImpl`.
2559
2560        // Step 2. Let port be deserializedRecord.[[Deserialized]].
2561        let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
2562
2563        // Step 3. Perform ! SetUpCrossRealmTransformReadable(value, port).
2564        value.setup_cross_realm_transform_readable(cx, &transferred_port);
2565        Ok(value)
2566    }
2567
2568    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
2569    fn serialized_storage<'a>(
2570        data: StructuredData<'a, '_>,
2571    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2572        match data {
2573            StructuredData::Reader(r) => &mut r.port_impls,
2574            StructuredData::Writer(w) => &mut w.ports,
2575        }
2576    }
2577}