Skip to main content

script/dom/stream/
readablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use servo_base::generic_channel::GenericSharedMemory;
11use servo_base::id::{MessagePortId, MessagePortIndex};
12use servo_constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::context::JSContext;
15use js::jsapi::{Heap, JSObject};
16use js::jsval::{JSVal, ObjectValue, UndefinedValue};
17use js::realm::CurrentRealm;
18use js::rust::{
19    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
20    MutableHandleValue as SafeMutableHandleValue,
21};
22use js::typedarray::ArrayBufferViewU8;
23use rustc_hash::FxHashMap;
24use script_bindings::conversions::SafeToJSValConvertible;
25
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
27use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
28    ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
29    ReadableWritablePair, StreamPipeOptions,
30};
31use script_bindings::str::DOMString;
32
33use crate::dom::domexception::{DOMErrorName, DOMException};
34use script_bindings::conversions::{is_array_like, StringificationBehavior};
35use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
36use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
38use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
39use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
40use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
41use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
42use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
43use crate::dom::stream::writablestream::WritableStream;
44use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
45use crate::dom::bindings::reflector::DomGlobal;
46use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
47use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
48use crate::dom::bindings::trace::RootedTraceableBox;
49use crate::dom::bindings::utils::get_dictionary_property;
50use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
51use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
52use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
53use crate::dom::globalscope::GlobalScope;
54use crate::dom::promise::{wait_for_all_promise, Promise};
55use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
56use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
57use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
58use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
59use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
60use crate::dom::types::DefaultTeeUnderlyingSource;
61use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
62use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
63use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
64use crate::dom::messageport::MessagePort;
65use crate::realms::{enter_auto_realm};
66use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
67use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
68use crate::dom::bindings::transferable::Transferable;
69use crate::dom::bindings::structuredclone::StructuredData;
70
71use crate::dom::bindings::buffer_source::HeapBufferSource;
72use super::readablestreambyobreader::ReadIntoRequest;
73
/// State machine for `PipeTo`.
///
/// The current state is one of the inputs the `Callback` implementation of `PipeTo`
/// uses to work out what a given promise reaction means.
#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
enum PipeToState {
    /// The starting state.
    /// The promise callback treats being invoked in this state as unreachable.
    #[default]
    Starting,
    /// Waiting for the writer to be ready.
    PendingReady,
    /// Waiting for a read to resolve.
    PendingRead,
    /// Waiting for all pending writes to finish,
    /// as part of shutting down with an optional action
    /// (`None` means shutting down without an action).
    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
    /// When shutting down with an action,
    /// waiting for the action to complete,
    /// at which point we can `finalize`.
    ShuttingDownPendingAction,
    /// The pipe has been finalized,
    /// no further actions should be performed.
    Finalized,
}
95
/// The action to perform as part of
/// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
///
/// Each variant is turned into a promise by `PipeTo::perform_action`.
#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
enum ShutdownAction {
    /// Abort the destination:
    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
    WritableStreamAbort,
    /// Cancel the source:
    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
    ReadableStreamCancel,
    /// Close the writer, propagating errors:
    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
    WritableStreamDefaultWriterCloseWithErrorPropagation,
    /// The `abortAlgorithm` of the signal associated with the piping operation:
    /// <https://streams.spec.whatwg.org/#ref-for-rs-pipeTo-shutdown-with-action>
    Abort,
}
108
// Opt `PipeTo` into `js::gc::Rootable`; the impl is empty, so no trait item is overridden here.
impl js::gc::Rootable for PipeTo {}
110
/// The "in parallel, but not really" part of
/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
///
/// Note: the spec is flexible about how this is done, but requires the following constraints to apply:
/// - Public API must not be used: we'll only use Rust.
/// - Backpressure must be enforced: we'll only read from source when dest is ready.
/// - Shutdown must stop activity: we'll do this together with the below.
/// - Error and close states must be propagated: we'll do this by checking these states at every step.
///
/// The struct is `Clone` because a clone of it is boxed as the callback
/// of every promise reaction the pipe registers;
/// the `Rc` fields are the state those clones share.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PipeTo {
    /// The reader the pipe reads chunks from.
    /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
    reader: Dom<ReadableStreamDefaultReader>,

    /// The writer the pipe writes chunks to.
    /// <https://streams.spec.whatwg.org/#ref-for-acquire-writable-stream-default-writer>
    writer: Dom<WritableStreamDefaultWriter>,

    /// Promises of writes that have been started.
    /// Pending writes are needed when shutting down (with or without an action),
    /// because we can only finalize when all writes are finished.
    #[ignore_malloc_size_of = "nested Rc"]
    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,

    /// The state machine.
    #[conditional_malloc_size_of]
    #[no_trace]
    state: Rc<RefCell<PipeToState>>,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventabort>
    prevent_abort: bool,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventcancel>
    prevent_cancel: bool,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventclose>
    prevent_close: bool,

    /// The `shuttingDown` variable of
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    #[conditional_malloc_size_of]
    shutting_down: Rc<Cell<bool>>,

    /// The abort reason of the abort signal,
    /// stored here because we must keep it across a microtask.
    #[ignore_malloc_size_of = "mozjs"]
    abort_reason: Rc<Heap<JSVal>>,

    /// The error potentially passed to shutdown,
    /// stored here because we must keep it across a microtask.
    /// `None` until an error has been set.
    #[ignore_malloc_size_of = "mozjs"]
    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,

    /// The promise returned by a shutdown action.
    /// We keep it so that we only continue once it is no longer pending.
    #[ignore_malloc_size_of = "nested Rc"]
    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// The promise resolved or rejected at
    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
172
173impl PipeTo {
174    /// Run the `abortAlgorithm` defined at
175    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
176    pub(crate) fn abort_with_reason(
177        &self,
178        cx: &mut CurrentRealm,
179        global: &GlobalScope,
180        reason: SafeHandleValue,
181    ) {
182        // Abort should do nothing if we are already shutting down.
183        if self.shutting_down.get() {
184            return;
185        }
186
187        // Let error be signal’s abort reason.
188        // Note: storing it because it may need to be kept across a microtask,
189        // and see the note below as to why it is kept separately from `shutdown_error`.
190        self.abort_reason.set(reason.get());
191
192        // Note: setting the error now,
193        // will result in a rejection of the pipe promise, with this error.
194        // Unless any shutdown action raise their own error,
195        // in which case this error will be overwritten by the shutdown action error.
196        self.set_shutdown_error(reason);
197
198        // Let actions be an empty ordered set.
199        // Note: the actions are defined, and performed, inside `shutdown_with_an_action`.
200
201        // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions,
202        // and with error.
203        self.shutdown(cx, global, Some(ShutdownAction::Abort));
204    }
205}
206
207impl Callback for PipeTo {
208    /// The pipe makes progress one microtask at a time.
209    /// Note: we use one struct as the callback for all promises,
210    /// and for both of their reactions.
211    ///
212    /// The context of the callback is determined from:
213    /// - the current state.
214    /// - the type of `result`.
215    /// - the state of a stored promise(in some cases).
216    #[expect(unsafe_code)]
217    fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
218        let global = self.reader.global();
219
220        // Note: we only care about the result of writes when they are rejected,
221        // and the error is accessed not through handlers,
222        // but directly using `dest.get_stored_error`.
223        // So we must mark rejected promises as handled
224        // to prevent unhandled rejection errors.
225        self.pending_writes.borrow_mut().retain(|p| {
226            let pending = p.is_pending();
227            if !pending {
228                p.set_promise_is_handled();
229            }
230            pending
231        });
232
233        // Note: cloning to prevent re-borrow in methods called below.
234        let state_before_checks = self.state.borrow().clone();
235
236        // Note: if we are in a `PendingRead` state,
237        // and the source is closed,
238        // we try to write chunks before doing any shutdown,
239        // which is necessary to implement the
240        // "If any chunks have been read but not yet written, write them to dest."
241        // part of shutdown.
242        if state_before_checks == PipeToState::PendingRead {
243            let source = self.reader.get_stream().expect("Source stream must be set");
244            if source.is_closed() {
245                let dest = self
246                    .writer
247                    .get_stream()
248                    .expect("Destination stream must be set");
249
250                // If dest.[[state]] is "writable",
251                // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
252                if dest.is_writable() && !dest.close_queued_or_in_flight() {
253                    let Ok(done) = get_read_promise_done(cx.into(), &result, CanGc::from_cx(cx))
254                    else {
255                        // This is the case that the microtask ran in reaction
256                        // to the closed promise of the reader,
257                        // so we should wait for subsequent chunks,
258                        // and skip the shutdown below
259                        // (reader is closed, but there are still pending reads).
260                        // Shutdown will happen when the last chunk has been received.
261                        return;
262                    };
263
264                    if !done {
265                        // If any chunks have been read but not yet written, write them to dest.
266                        self.write_chunk(cx, &global, result);
267                    }
268                }
269            }
270        }
271
272        self.check_and_propagate_errors_forward(cx, &global);
273        self.check_and_propagate_errors_backward(cx, &global);
274        self.check_and_propagate_closing_forward(cx, &global);
275        self.check_and_propagate_closing_backward(cx, &global);
276
277        // Note: cloning to prevent re-borrow in methods called below.
278        let state = self.state.borrow().clone();
279
280        // If we switched to a shutdown state,
281        // return.
282        // Progress will be made at the next tick.
283        if state != state_before_checks {
284            return;
285        }
286
287        match state {
288            PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
289            PipeToState::PendingReady => {
290                // Read a chunk.
291                self.read_chunk(cx, &global);
292            },
293            PipeToState::PendingRead => {
294                // Write the chunk.
295                self.write_chunk(cx, &global, result);
296
297                // An early return is necessary if the write algorithm aborted the pipe.
298                if self.shutting_down.get() {
299                    return;
300                }
301
302                // Wait for the writer to be ready again.
303                self.wait_for_writer_ready(cx, &global);
304            },
305            PipeToState::ShuttingDownWithPendingWrites(action) => {
306                // Wait until every chunk that has been read has been written
307                // (i.e. the corresponding promises have settled).
308                if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
309                    self.wait_on_pending_write(cx, &global, write);
310                    return;
311                }
312
313                // Note: error is stored in `self.shutdown_error`.
314                if let Some(action) = action {
315                    // Let p be the result of performing action.
316                    self.perform_action(cx, &global, action);
317                } else {
318                    // Finalize, passing along error if it was given.
319                    self.finalize(cx, &global);
320                }
321            },
322            PipeToState::ShuttingDownPendingAction => {
323                let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
324                    unreachable!();
325                };
326                if promise.is_pending() {
327                    // While waiting for the action to complete,
328                    // we may get callbacks for other promises(closed, ready),
329                    // and we should ignore those.
330                    return;
331                }
332
333                let is_array_like = {
334                    if !result.is_object() {
335                        false
336                    } else {
337                        unsafe { is_array_like::<crate::DomTypeHolder>(cx.raw_cx(), result) }
338                    }
339                };
340
341                // Finalize, passing along error if it was given.
342                if !result.is_undefined() && !is_array_like {
343                    // Most actions either resolve with undefined,
344                    // or reject with an error,
345                    // and the error should be used when finalizing.
346                    // One exception is the `Abort` action,
347                    // which resolves with a list of undefined values.
348
349                    // If `result` isn't undefined or array-like,
350                    // then it is an error
351                    // and should overwrite the current shutdown error.
352                    self.set_shutdown_error(result);
353                }
354                self.finalize(cx, &global);
355            },
356            PipeToState::Finalized => {},
357        }
358    }
359}
360
361impl PipeTo {
362    /// Setting shutdown error in a way that ensures it isn't
363    /// moved after it has been set.
364    fn set_shutdown_error(&self, error: SafeHandleValue) {
365        *self.shutdown_error.borrow_mut() = Some(Heap::default());
366        let Some(ref heap) = *self.shutdown_error.borrow() else {
367            unreachable!("Option set to Some(heap) above.");
368        };
369        heap.set(error.get())
370    }
371
372    /// Wait for the writer to be ready,
373    /// which implements the constraint that backpressure must be enforced.
374    fn wait_for_writer_ready(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
375        {
376            let mut state = self.state.borrow_mut();
377            *state = PipeToState::PendingReady;
378        }
379
380        let ready_promise = self.writer.Ready();
381        if ready_promise.is_fulfilled() {
382            self.read_chunk(cx, global);
383        } else {
384            let handler = PromiseNativeHandler::new(
385                global,
386                Some(Box::new(self.clone())),
387                Some(Box::new(self.clone())),
388                CanGc::from_cx(cx),
389            );
390            ready_promise.append_native_handler(cx, &handler);
391
392            // Note: if the writer is not ready,
393            // in order to ensure progress we must
394            // also react to the closure of the source(because source may close empty).
395            let closed_promise = self.reader.Closed();
396            closed_promise.append_native_handler(cx, &handler);
397        }
398    }
399
400    /// Read a chunk
401    fn read_chunk(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
402        *self.state.borrow_mut() = PipeToState::PendingRead;
403        let chunk_promise = self.reader.Read(cx);
404        let handler = PromiseNativeHandler::new(
405            global,
406            Some(Box::new(self.clone())),
407            Some(Box::new(self.clone())),
408            CanGc::from_cx(cx),
409        );
410        chunk_promise.append_native_handler(cx, &handler);
411
412        // Note: in order to ensure progress we must
413        // also react to the closure of the destination.
414        let ready_promise = self.writer.Closed();
415        ready_promise.append_native_handler(cx, &handler);
416    }
417
418    /// Try to write a chunk using the jsval, and returns wether it succeeded
419    // It will fail if it is the last `done` chunk, or if it is not a chunk at all.
420    #[expect(unsafe_code)]
421    fn write_chunk(
422        &self,
423        cx: &mut JSContext,
424        global: &GlobalScope,
425        chunk: SafeHandleValue,
426    ) -> bool {
427        if chunk.is_object() {
428            rooted!(&in(cx) let object = chunk.to_object());
429            rooted!(&in(cx) let mut bytes = UndefinedValue());
430            let has_value = unsafe {
431                get_dictionary_property(
432                    cx.raw_cx(),
433                    object.handle(),
434                    c"value",
435                    bytes.handle_mut(),
436                    CanGc::from_cx(cx),
437                )
438                .expect("Chunk should have a value.")
439            };
440            if has_value {
441                // Write the chunk.
442                let write_promise = self.writer.write(cx, global, bytes.handle());
443                self.pending_writes.borrow_mut().push_back(write_promise);
444                return true;
445            }
446        }
447        false
448    }
449
450    /// Only as part of shutting-down do we wait on pending writes
451    /// (backpressure is communicated not through pending writes
452    /// but through the readiness of the writer).
453    fn wait_on_pending_write(
454        &self,
455        cx: &mut CurrentRealm,
456        global: &GlobalScope,
457        promise: Rc<Promise>,
458    ) {
459        let handler = PromiseNativeHandler::new(
460            global,
461            Some(Box::new(self.clone())),
462            Some(Box::new(self.clone())),
463            CanGc::from_cx(cx),
464        );
465        promise.append_native_handler(cx, &handler);
466    }
467
468    /// Errors must be propagated forward part of
469    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
470    fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
471        // An early return is necessary if we are shutting down,
472        // because in that case the source can already have been set to none.
473        if self.shutting_down.get() {
474            return;
475        }
476
477        // if source.[[state]] is or becomes "errored", then
478        let source = self
479            .reader
480            .get_stream()
481            .expect("Reader should still have a stream");
482        if source.is_errored() {
483            rooted!(&in(cx) let mut source_error = UndefinedValue());
484            source.get_stored_error(source_error.handle_mut());
485            self.set_shutdown_error(source_error.handle());
486
487            // If preventAbort is false,
488            if !self.prevent_abort {
489                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
490                // and with source.[[storedError]].
491                self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
492            } else {
493                // Otherwise, shutdown with source.[[storedError]].
494                self.shutdown(cx, global, None);
495            }
496        }
497    }
498
499    /// Errors must be propagated backward part of
500    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
501    fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
502        // An early return is necessary if we are shutting down,
503        // because in that case the destination can already have been set to none.
504        if self.shutting_down.get() {
505            return;
506        }
507
508        // if dest.[[state]] is or becomes "errored", then
509        let dest = self
510            .writer
511            .get_stream()
512            .expect("Writer should still have a stream");
513        if dest.is_errored() {
514            rooted!(&in(cx) let mut dest_error = UndefinedValue());
515            dest.get_stored_error(dest_error.handle_mut());
516            self.set_shutdown_error(dest_error.handle());
517
518            // If preventCancel is false,
519            if !self.prevent_cancel {
520                // shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]])
521                // and with dest.[[storedError]].
522                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
523            } else {
524                // Otherwise, shutdown with dest.[[storedError]].
525                self.shutdown(cx, global, None);
526            }
527        }
528    }
529
530    /// Closing must be propagated forward part of
531    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
532    fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
533        // An early return is necessary if we are shutting down,
534        // because in that case the source can already have been set to none.
535        if self.shutting_down.get() {
536            return;
537        }
538
539        // if source.[[state]] is or becomes "closed", then
540        let source = self
541            .reader
542            .get_stream()
543            .expect("Reader should still have a stream");
544        if source.is_closed() {
545            // If preventClose is false,
546            if !self.prevent_close {
547                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
548                // and with source.[[storedError]].
549                self.shutdown(
550                    cx,
551                    global,
552                    Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
553                )
554            } else {
555                // Otherwise, shutdown.
556                self.shutdown(cx, global, None);
557            }
558        }
559    }
560
561    /// Closing must be propagated backward part of
562    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
563    fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
564        // An early return is necessary if we are shutting down,
565        // because in that case the destination can already have been set to none.
566        if self.shutting_down.get() {
567            return;
568        }
569
570        // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
571        // or dest.[[state]] is "closed"
572        let dest = self
573            .writer
574            .get_stream()
575            .expect("Writer should still have a stream");
576        if dest.close_queued_or_in_flight() || dest.is_closed() {
577            // Assert: no chunks have been read or written.
578            // Note: unclear how to perform this assertion.
579
580            // Let destClosed be a new TypeError.
581            rooted!(&in(cx) let mut dest_closed = UndefinedValue());
582            let error =
583                Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
584            error.to_jsval(
585                cx.into(),
586                global,
587                dest_closed.handle_mut(),
588                CanGc::from_cx(cx),
589            );
590            self.set_shutdown_error(dest_closed.handle());
591
592            // If preventCancel is false,
593            if !self.prevent_cancel {
594                // shutdown with an action of ! ReadableStreamCancel(source, destClosed)
595                // and with destClosed.
596                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
597            } else {
598                // Otherwise, shutdown with destClosed.
599                self.shutdown(cx, global, None);
600            }
601        }
602    }
603
604    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
605    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown>
606    /// Combined into one method with an optional action.
607    fn shutdown(
608        &self,
609        cx: &mut CurrentRealm,
610        global: &GlobalScope,
611        action: Option<ShutdownAction>,
612    ) {
613        // If shuttingDown is true, abort these substeps.
614        // Set shuttingDown to true.
615        if !self.shutting_down.replace(true) {
616            let dest = self.writer.get_stream().expect("Stream must be set");
617            // If dest.[[state]] is "writable",
618            // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
619            if dest.is_writable() && !dest.close_queued_or_in_flight() {
620                // If any chunks have been read but not yet written, write them to dest.
621                // Done at the top of `Callback`.
622
623                // Wait until every chunk that has been read has been written
624                // (i.e. the corresponding promises have settled).
625                if let Some(write) = self.pending_writes.borrow_mut().front() {
626                    *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
627                    self.wait_on_pending_write(cx, global, write.clone());
628                    return;
629                }
630            }
631
632            // Note: error is stored in `self.shutdown_error`.
633            if let Some(action) = action {
634                // Let p be the result of performing action.
635                self.perform_action(cx, global, action);
636            } else {
637                // Finalize, passing along error if it was given.
638                self.finalize(cx, global);
639            }
640        }
641    }
642
    /// The perform action part of
    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
    ///
    /// Moves the state to `ShuttingDownPendingAction`, performs `action`,
    /// registers this pipe as the reaction to the resulting promise,
    /// and stores that promise in `shutdown_action_promise`.
    fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
        // The error passed to the action is the stored shutdown error,
        // or undefined when none has been set.
        rooted!(&in(cx) let mut error = UndefinedValue());
        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
            error.set(shutdown_error.get());
        }

        // Note: the state is updated before the action runs.
        *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;

        // Let p be the result of performing action.
        let promise = match action {
            ShutdownAction::WritableStreamAbort => {
                let dest = self.writer.get_stream().expect("Stream must be set");
                dest.abort(cx, global, error.handle())
            },
            ShutdownAction::ReadableStreamCancel => {
                let source = self
                    .reader
                    .get_stream()
                    .expect("Reader should have a stream.");
                source.cancel(cx, global, error.handle())
            },
            ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
                self.writer.close_with_error_propagation(cx, global)
            },
            ShutdownAction::Abort => {
                // Note: implementation of the `abortAlgorithm`
                // of the signal associated with this piping operation.

                // Let error be signal’s abort reason.
                // Note: this shadows the outer `error`;
                // this arm uses the stored abort reason, not the shutdown error.
                rooted!(&in(cx) let mut error = UndefinedValue());
                error.set(self.abort_reason.get());

                // Let actions be an empty ordered set.
                let mut actions = vec![];

                // If preventAbort is false, append the following action to actions:
                if !self.prevent_abort {
                    let dest = self
                        .writer
                        .get_stream()
                        .expect("Destination stream must be set");

                    // If dest.[[state]] is "writable",
                    let promise = if dest.is_writable() {
                        // return ! WritableStreamAbort(dest, error)
                        dest.abort(cx, global, error.handle())
                    } else {
                        // Otherwise, return a promise resolved with undefined.
                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
                    };
                    actions.push(promise);
                }

                // If preventCancel is false, append the following action to actions:
                if !self.prevent_cancel {
                    let source = self.reader.get_stream().expect("Source stream must be set");

                    // If source.[[state]] is "readable",
                    let promise = if source.is_readable() {
                        // return ! ReadableStreamCancel(source, error).
                        source.cancel(cx, global, error.handle())
                    } else {
                        // Otherwise, return a promise resolved with undefined.
                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
                    };
                    actions.push(promise);
                }

                // Shutdown with an action consisting
                // of getting a promise to wait for all of the actions in actions,
                // and with error.
                wait_for_all_promise(cx, global, actions)
            },
        };

        // Upon fulfillment of p, finalize, passing along originalError if it was given.
        // Upon rejection of p with reason newError, finalize with newError.
        // Note: both reactions are this pipe's `Callback`,
        // which tells the two cases apart by inspecting the value it is called with.
        let handler = PromiseNativeHandler::new(
            global,
            Some(Box::new(self.clone())),
            Some(Box::new(self.clone())),
            CanGc::from_cx(cx),
        );
        promise.append_native_handler(cx, &handler);
        // Keep the promise so that the callback can check whether it is still pending.
        *self.shutdown_action_promise.borrow_mut() = Some(promise);
    }
731
732    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
733    fn finalize(&self, cx: &mut JSContext, global: &GlobalScope) {
734        *self.state.borrow_mut() = PipeToState::Finalized;
735
736        // Perform ! WritableStreamDefaultWriterRelease(writer).
737        self.writer.release(cx.into(), global, CanGc::from_cx(cx));
738
739        // If reader implements ReadableStreamBYOBReader,
740        // perform ! ReadableStreamBYOBReaderRelease(reader).
741        // TODO.
742
743        // Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
744        self.reader
745            .release(cx)
746            .expect("Releasing the reader should not fail");
747
748        // If signal is not undefined, remove abortAlgorithm from signal.
749        // Note: since `self.shutdown` is true at this point,
750        // the abort algorithm is a no-op,
751        // so for now not implementing this step.
752
753        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
754            rooted!(&in(cx) let mut error = UndefinedValue());
755            error.set(shutdown_error.get());
756            // If error was given, reject promise with error.
757            self.result_promise
758                .reject_native(&error.handle(), CanGc::from_cx(cx));
759        } else {
760            // Otherwise, resolve promise with undefined.
761            self.result_promise.resolve_native(&(), CanGc::from_cx(cx));
762        }
763    }
764}
765
766/// The fulfillment handler for the reacting to sourceCancelPromise part of
767/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
768#[derive(Clone, JSTraceable, MallocSizeOf)]
769struct SourceCancelPromiseFulfillmentHandler {
770    #[conditional_malloc_size_of]
771    result: Rc<Promise>,
772}
773
774impl Callback for SourceCancelPromiseFulfillmentHandler {
775    /// The fulfillment handler for the reacting to sourceCancelPromise part of
776    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
777    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
778    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
779        self.result.resolve_native(&(), CanGc::from_cx(cx));
780    }
781}
782
783/// The rejection handler for the reacting to sourceCancelPromise part of
784/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
785#[derive(Clone, JSTraceable, MallocSizeOf)]
786struct SourceCancelPromiseRejectionHandler {
787    #[conditional_malloc_size_of]
788    result: Rc<Promise>,
789}
790
791impl Callback for SourceCancelPromiseRejectionHandler {
792    /// The rejection handler for the reacting to sourceCancelPromise part of
793    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
794    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
795    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
796        self.result.reject_native(&v, CanGc::from_cx(cx));
797    }
798}
799
800/// <https://streams.spec.whatwg.org/#readablestream-state>
801#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
802pub(crate) enum ReadableStreamState {
803    #[default]
804    Readable,
805    Closed,
806    Errored,
807}
808
809/// <https://streams.spec.whatwg.org/#readablestream-controller>
810#[derive(JSTraceable, MallocSizeOf)]
811#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
812pub(crate) enum ControllerType {
813    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
814    Byte(MutNullableDom<ReadableByteStreamController>),
815    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
816    Default(MutNullableDom<ReadableStreamDefaultController>),
817}
818
819/// <https://streams.spec.whatwg.org/#readablestream-readerr>
820#[derive(JSTraceable, MallocSizeOf)]
821#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
822pub(crate) enum ReaderType {
823    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
824    #[allow(clippy::upper_case_acronyms)]
825    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
826    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
827    Default(MutNullableDom<ReadableStreamDefaultReader>),
828}
829
830impl Eq for ReaderType {}
831impl PartialEq for ReaderType {
832    fn eq(&self, other: &Self) -> bool {
833        matches!(
834            (self, other),
835            (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
836                (ReaderType::Default(_), ReaderType::Default(_))
837        )
838    }
839}
840
841/// <https://streams.spec.whatwg.org/#create-readable-stream>
842pub(crate) fn create_readable_stream(
843    cx: &mut JSContext,
844    global: &GlobalScope,
845    underlying_source_type: UnderlyingSourceType,
846    queuing_strategy: Option<Rc<QueuingStrategySize>>,
847    high_water_mark: Option<f64>,
848) -> DomRoot<ReadableStream> {
849    // If highWaterMark was not passed, set it to 1.
850    let high_water_mark = high_water_mark.unwrap_or(1.0);
851
852    // If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
853    let size_algorithm = queuing_strategy.unwrap_or(extract_size_algorithm(
854        &QueuingStrategy::empty(),
855        CanGc::from_cx(cx),
856    ));
857
858    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
859    assert!(high_water_mark >= 0.0);
860
861    // Let stream be a new ReadableStream.
862    // Perform ! InitializeReadableStream(stream).
863    let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
864
865    // Let controller be a new ReadableStreamDefaultController.
866    let controller = ReadableStreamDefaultController::new(
867        global,
868        underlying_source_type,
869        high_water_mark,
870        size_algorithm,
871        CanGc::from_cx(cx),
872    );
873
874    // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm,
875    // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
876    controller
877        .setup(cx, stream.clone())
878        .expect("Setup of default controller cannot fail");
879
880    // Return stream.
881    stream
882}
883
884/// <https://streams.spec.whatwg.org/#abstract-opdef-createreadablebytestream>
885fn readable_byte_stream_tee(
886    cx: &mut JSContext,
887    global: &GlobalScope,
888    underlying_source_type: UnderlyingSourceType,
889) -> DomRoot<ReadableStream> {
890    // Let stream be a new ReadableStream.
891    // Perform ! InitializeReadableStream(stream).
892    let tee_stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
893
894    // Let controller be a new ReadableByteStreamController.
895    let controller =
896        ReadableByteStreamController::new(underlying_source_type, 0.0, global, CanGc::from_cx(cx));
897
898    // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
899    controller
900        .setup(cx, global, tee_stream.clone())
901        .expect("Setup of byte stream controller cannot fail");
902
903    // Return stream.
904    tee_stream
905}
906
907/// <https://streams.spec.whatwg.org/#rs-class>
908#[dom_struct]
909pub(crate) struct ReadableStream {
910    reflector_: Reflector,
911
912    /// <https://streams.spec.whatwg.org/#readablestream-controller>
913    /// Note: the inner `MutNullableDom` should really be an `Option<Dom>`,
914    /// because it is never unset once set.
915    controller: RefCell<Option<ControllerType>>,
916
917    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
918    #[ignore_malloc_size_of = "mozjs"]
919    stored_error: Heap<JSVal>,
920
921    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
922    disturbed: Cell<bool>,
923
924    /// <https://streams.spec.whatwg.org/#readablestream-reader>
925    reader: RefCell<Option<ReaderType>>,
926
927    /// <https://streams.spec.whatwg.org/#readablestream-state>
928    state: Cell<ReadableStreamState>,
929}
930
931impl ReadableStream {
932    /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
933    fn new_inherited() -> ReadableStream {
934        ReadableStream {
935            reflector_: Reflector::new(),
936            controller: RefCell::new(None),
937            stored_error: Heap::default(),
938            disturbed: Default::default(),
939            reader: RefCell::new(None),
940            state: Cell::new(Default::default()),
941        }
942    }
943
944    pub(crate) fn new_with_proto(
945        global: &GlobalScope,
946        proto: Option<SafeHandleObject>,
947        can_gc: CanGc,
948    ) -> DomRoot<ReadableStream> {
949        reflect_dom_object_with_proto(
950            Box::new(ReadableStream::new_inherited()),
951            global,
952            proto,
953            can_gc,
954        )
955    }
956
957    /// Used as part of
958    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
959    pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
960        *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
961            controller,
962        ))));
963    }
964
965    /// Used as part of
966    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
967    pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
968        *self.controller.borrow_mut() =
969            Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
970    }
971
972    /// Used as part of
973    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
974    pub(crate) fn assert_no_controller(&self) {
975        let has_no_controller = self.controller.borrow().is_none();
976        assert!(has_no_controller);
977    }
978
979    /// Build a stream backed by a Rust source that has already been read into memory.
980    pub(crate) fn new_from_bytes(
981        cx: &mut JSContext,
982        global: &GlobalScope,
983        bytes: Vec<u8>,
984    ) -> Fallible<DomRoot<ReadableStream>> {
985        let stream = ReadableStream::new_with_external_underlying_source(
986            cx,
987            global,
988            UnderlyingSourceType::Memory(bytes.len()),
989        )?;
990        stream.enqueue_native(cx, bytes);
991        stream.controller_close_native(cx);
992        Ok(stream)
993    }
994
995    /// Build a stream backed by a Rust underlying source.
996    /// Note: external sources are always paired with a default controller.
997    pub(crate) fn new_with_external_underlying_source(
998        cx: &mut JSContext,
999        global: &GlobalScope,
1000        source: UnderlyingSourceType,
1001    ) -> Fallible<DomRoot<ReadableStream>> {
1002        assert!(source.is_native());
1003        let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
1004        let controller = ReadableStreamDefaultController::new(
1005            global,
1006            source,
1007            1.0,
1008            extract_size_algorithm(&QueuingStrategy::empty(), CanGc::from_cx(cx)),
1009            CanGc::from_cx(cx),
1010        );
1011        controller.setup(cx, stream.clone())?;
1012        Ok(stream)
1013    }
1014
1015    /// Call into the release steps of the controller,
1016    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1017        match self.controller.borrow().as_ref() {
1018            Some(ControllerType::Default(controller)) => {
1019                let controller = controller
1020                    .get()
1021                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1022                controller.perform_release_steps()
1023            },
1024            Some(ControllerType::Byte(controller)) => {
1025                let controller = controller
1026                    .get()
1027                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1028                controller.perform_release_steps()
1029            },
1030            None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1031        }
1032    }
1033
1034    /// Call into the pull steps of the controller,
1035    /// as part of
1036    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1037    pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
1038        match self.controller.borrow().as_ref() {
1039            Some(ControllerType::Default(controller)) => controller
1040                .get()
1041                .expect("Stream should have controller.")
1042                .perform_pull_steps(cx, read_request),
1043            Some(ControllerType::Byte(controller)) => controller
1044                .get()
1045                .expect("Stream should have controller.")
1046                .perform_pull_steps(cx, read_request),
1047            None => {
1048                unreachable!("Stream does not have a controller.");
1049            },
1050        }
1051    }
1052
1053    /// Call into the pull steps of the controller,
1054    /// as part of
1055    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
1056    pub(crate) fn perform_pull_into(
1057        &self,
1058        cx: &mut JSContext,
1059        read_into_request: &ReadIntoRequest,
1060        view: &HeapBufferSource<ArrayBufferViewU8>,
1061        min: u64,
1062    ) {
1063        match self.controller.borrow().as_ref() {
1064            Some(ControllerType::Byte(controller)) => controller
1065                .get()
1066                .expect("Stream should have controller.")
1067                .perform_pull_into(cx, read_into_request, view, min),
1068            _ => {
1069                unreachable!(
1070                    "Pulling a chunk from a stream with a default controller using a BYOB reader"
1071                )
1072            },
1073        }
1074    }
1075
1076    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
1077    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1078        match self.reader.borrow().as_ref() {
1079            Some(ReaderType::Default(reader)) => {
1080                let Some(reader) = reader.get() else {
1081                    panic!("Attempt to add a read request without having first acquired a reader.");
1082                };
1083
1084                // Assert: stream.[[state]] is "readable".
1085                assert!(self.is_readable());
1086
1087                // Append readRequest to stream.[[reader]].[[readRequests]].
1088                reader.add_read_request(read_request);
1089            },
1090            _ => {
1091                unreachable!("Adding a read request can only be done on a default reader.")
1092            },
1093        }
1094    }
1095
1096    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
1097    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1098        match self.reader.borrow().as_ref() {
1099            // Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
1100            Some(ReaderType::BYOB(reader)) => {
1101                let Some(reader) = reader.get() else {
1102                    unreachable!(
1103                        "Attempt to add a read into request without having first acquired a reader."
1104                    );
1105                };
1106
1107                // Assert: stream.[[state]] is "readable" or "closed".
1108                assert!(self.is_readable() || self.is_closed());
1109
1110                // Append readRequest to stream.[[reader]].[[readIntoRequests]].
1111                reader.add_read_into_request(read_request);
1112            },
1113            _ => {
1114                unreachable!("Adding a read into request can only be done on a BYOB reader.")
1115            },
1116        }
1117    }
1118
1119    /// Endpoint to enqueue chunks directly from Rust.
1120    /// Note: in other use cases this call happens via the controller.
1121    pub(crate) fn enqueue_native(&self, cx: &mut JSContext, bytes: Vec<u8>) {
1122        match self.controller.borrow().as_ref() {
1123            Some(ControllerType::Default(controller)) => controller
1124                .get()
1125                .expect("Stream should have controller.")
1126                .enqueue_native(cx, bytes),
1127            _ => {
1128                unreachable!(
1129                    "Enqueueing chunk to a stream from Rust on other than default controller"
1130                );
1131            },
1132        }
1133    }
1134
1135    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1136    pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
1137        // Assert: stream.[[state]] is "readable".
1138        assert!(self.is_readable());
1139
1140        // Set stream.[[state]] to "errored".
1141        self.state.set(ReadableStreamState::Errored);
1142
1143        // Set stream.[[storedError]] to e.
1144        self.stored_error.set(e.get());
1145
1146        // Let reader be stream.[[reader]].
1147
1148        let default_reader = {
1149            let reader_ref = self.reader.borrow();
1150            match reader_ref.as_ref() {
1151                Some(ReaderType::Default(reader)) => reader.get(),
1152                _ => None,
1153            }
1154        };
1155
1156        if let Some(reader) = default_reader {
1157            // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
1158            reader.error(cx, e);
1159            return;
1160        }
1161
1162        let byob_reader = {
1163            let reader_ref = self.reader.borrow();
1164            match reader_ref.as_ref() {
1165                Some(ReaderType::BYOB(reader)) => reader.get(),
1166                _ => None,
1167            }
1168        };
1169
1170        if let Some(reader) = byob_reader {
1171            // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
1172            reader.error_read_into_requests(e, CanGc::from_cx(cx));
1173        }
1174
1175        // If reader is undefined, return.
1176    }
1177
1178    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
1179    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1180        handle_mut.set(self.stored_error.get());
1181    }
1182
1183    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1184    /// Note: in other use cases this call happens via the controller.
1185    pub(crate) fn error_native(&self, cx: &mut JSContext, error: Error) {
1186        rooted!(&in(cx) let mut error_val = UndefinedValue());
1187        error.to_jsval(
1188            cx.into(),
1189            &self.global(),
1190            error_val.handle_mut(),
1191            CanGc::from_cx(cx),
1192        );
1193        self.error(cx, error_val.handle());
1194    }
1195
1196    /// Call into the controller's `Close` method.
1197    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
1198    pub(crate) fn controller_close_native(&self, cx: &mut JSContext) {
1199        match self.controller.borrow().as_ref() {
1200            Some(ControllerType::Default(controller)) => {
1201                let _ = controller
1202                    .get()
1203                    .expect("Stream should have controller.")
1204                    .Close(cx);
1205            },
1206            _ => {
1207                unreachable!("Native closing is only done on default controllers.")
1208            },
1209        }
1210    }
1211
1212    /// Returns a boolean reflecting whether the stream has all data in memory.
1213    /// Useful for native source integration only.
1214    pub(crate) fn in_memory(&self) -> bool {
1215        match self.controller.borrow().as_ref() {
1216            Some(ControllerType::Default(controller)) => controller
1217                .get()
1218                .expect("Stream should have controller.")
1219                .in_memory(),
1220            _ => {
1221                unreachable!(
1222                    "Checking if source is in memory for a stream with a non-default controller"
1223                )
1224            },
1225        }
1226    }
1227
1228    /// Return bytes for synchronous use, if the stream has all data in memory.
1229    /// Useful for native source integration only.
1230    pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1231        match self.controller.borrow().as_ref() {
1232            Some(ControllerType::Default(controller)) => controller
1233                .get()
1234                .expect("Stream should have controller.")
1235                .get_in_memory_bytes()
1236                .as_deref()
1237                .map(GenericSharedMemory::from_bytes),
1238            _ => {
1239                unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1240            },
1241        }
1242    }
1243
1244    /// Acquires a reader and locks the stream,
1245    /// must be done before `read_a_chunk`.
1246    /// Native call to
1247    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-reader>
1248    pub(crate) fn acquire_default_reader(
1249        &self,
1250        can_gc: CanGc,
1251    ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1252        // Let reader be a new ReadableStreamDefaultReader.
1253        let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1254
1255        // Perform ? SetUpReadableStreamDefaultReader(reader, stream).
1256        reader.set_up(self, &self.global(), can_gc)?;
1257
1258        // Return reader.
1259        Ok(reader)
1260    }
1261
1262    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-byob-reader>
1263    pub(crate) fn acquire_byob_reader(
1264        &self,
1265        can_gc: CanGc,
1266    ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1267        // Let reader be a new ReadableStreamBYOBReader.
1268        let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1269        // Perform ? SetUpReadableStreamBYOBReader(reader, stream).
1270        reader.set_up(self, &self.global(), can_gc)?;
1271
1272        // Return reader.
1273        Ok(reader)
1274    }
1275
1276    pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1277        match self.controller.borrow().as_ref() {
1278            Some(ControllerType::Default(controller)) => {
1279                controller.get().expect("Stream should have controller.")
1280            },
1281            _ => {
1282                unreachable!(
1283                    "Getting default controller for a stream with a non-default controller"
1284                )
1285            },
1286        }
1287    }
1288
1289    pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1290        match self.controller.borrow().as_ref() {
1291            Some(ControllerType::Byte(controller)) => {
1292                controller.get().expect("Stream should have controller.")
1293            },
1294            _ => {
1295                unreachable!("Getting byte controller for a stream with a non-byte controller")
1296            },
1297        }
1298    }
1299
1300    pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1301        match self.reader.borrow().as_ref() {
1302            Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1303            _ => {
1304                unreachable!("Getting default reader for a stream with a non-default reader")
1305            },
1306        }
1307    }
1308
1309    /// Read a chunk from the stream,
1310    /// must be called after `start_reading`,
1311    /// and before `stop_reading`.
1312    /// Native call to
1313    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1314    pub(crate) fn read_a_chunk(&self, cx: &mut JSContext) -> Rc<Promise> {
1315        match self.reader.borrow().as_ref() {
1316            Some(ReaderType::Default(reader)) => {
1317                let Some(reader) = reader.get() else {
1318                    unreachable!(
1319                        "Attempt to read stream chunk without having first acquired a reader."
1320                    );
1321                };
1322                reader.Read(cx)
1323            },
1324            _ => {
1325                unreachable!("Native reading of a chunk can only be done with a default reader.")
1326            },
1327        }
1328    }
1329
1330    /// Releases the lock on the reader,
1331    /// must be done after `start_reading`.
1332    /// Native call to
1333    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
1334    pub(crate) fn stop_reading(&self, cx: &mut JSContext) {
1335        let reader_ref = self.reader.borrow();
1336
1337        match reader_ref.as_ref() {
1338            Some(ReaderType::Default(reader)) => {
1339                let Some(reader) = reader.get() else {
1340                    unreachable!("Attempt to stop reading without having first acquired a reader.");
1341                };
1342
1343                drop(reader_ref);
1344                reader.release(cx).expect("Reader release cannot fail.");
1345            },
1346            _ => {
1347                unreachable!("Native stop reading can only be done with a default reader.")
1348            },
1349        }
1350    }
1351
1352    /// <https://streams.spec.whatwg.org/#is-readable-stream-locked>
1353    pub(crate) fn is_locked(&self) -> bool {
1354        match self.reader.borrow().as_ref() {
1355            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1356            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1357            None => false,
1358        }
1359    }
1360
1361    pub(crate) fn is_disturbed(&self) -> bool {
1362        self.disturbed.get()
1363    }
1364
1365    pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1366        self.disturbed.set(disturbed);
1367    }
1368
1369    pub(crate) fn is_closed(&self) -> bool {
1370        self.state.get() == ReadableStreamState::Closed
1371    }
1372
1373    pub(crate) fn is_errored(&self) -> bool {
1374        self.state.get() == ReadableStreamState::Errored
1375    }
1376
1377    pub(crate) fn is_readable(&self) -> bool {
1378        self.state.get() == ReadableStreamState::Readable
1379    }
1380
1381    pub(crate) fn has_default_reader(&self) -> bool {
1382        match self.reader.borrow().as_ref() {
1383            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1384            _ => false,
1385        }
1386    }
1387
1388    pub(crate) fn has_byob_reader(&self) -> bool {
1389        match self.reader.borrow().as_ref() {
1390            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1391            _ => false,
1392        }
1393    }
1394
1395    pub(crate) fn has_byte_controller(&self) -> bool {
1396        match self.controller.borrow().as_ref() {
1397            Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1398            _ => false,
1399        }
1400    }
1401
1402    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
1403    pub(crate) fn get_num_read_requests(&self) -> usize {
1404        match self.reader.borrow().as_ref() {
1405            Some(ReaderType::Default(reader)) => {
1406                let reader = reader
1407                    .get()
1408                    .expect("Stream must have a reader when getting the number of read requests.");
1409                reader.get_num_read_requests()
1410            },
1411            _ => unreachable!(
1412                "Stream must have a default reader when get num read requests is called into."
1413            ),
1414        }
1415    }
1416
1417    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-into-requests>
1418    pub(crate) fn get_num_read_into_requests(&self) -> usize {
1419        assert!(self.has_byob_reader());
1420
1421        match self.reader.borrow().as_ref() {
1422            Some(ReaderType::BYOB(reader)) => {
1423                let Some(reader) = reader.get() else {
1424                    unreachable!(
1425                        "Stream must have a reader when get num read into requests is called into."
1426                    );
1427                };
1428                reader.get_num_read_into_requests()
1429            },
1430            _ => {
1431                unreachable!(
1432                    "Stream must have a BYOB reader when get num read into requests is called into."
1433                );
1434            },
1435        }
1436    }
1437
1438    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
1439    pub(crate) fn fulfill_read_request(
1440        &self,
1441        cx: &mut JSContext,
1442        chunk: SafeHandleValue,
1443        done: bool,
1444    ) {
1445        // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1446        assert!(self.has_default_reader());
1447
1448        match self.reader.borrow().as_ref() {
1449            Some(ReaderType::Default(reader)) => {
1450                // step 2 - Let reader be stream.[[reader]].
1451                let reader = reader
1452                    .get()
1453                    .expect("Stream must have a reader when a read request is fulfilled.");
1454                // step 3 - Assert: reader.[[readRequests]] is not empty.
1455                assert_ne!(reader.get_num_read_requests(), 0);
1456                // step 4 & 5
1457                // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]].
1458                let request = reader.remove_read_request();
1459
1460                if done {
1461                    // step 6 - If done is true, perform readRequest’s close steps.
1462                    request.close_steps(cx);
1463                } else {
1464                    // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk.
1465                    let result = RootedTraceableBox::new(Heap::default());
1466                    result.set(*chunk);
1467                    request.chunk_steps(cx, result, &self.global());
1468                }
1469            },
1470            _ => {
1471                unreachable!(
1472                    "Stream must have a default reader when fulfill read requests is called into."
1473                );
1474            },
1475        }
1476    }
1477
1478    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request>
1479    pub(crate) fn fulfill_read_into_request(
1480        &self,
1481        cx: &mut JSContext,
1482        chunk: SafeHandleValue,
1483        done: bool,
1484    ) {
1485        // Assert: ! ReadableStreamHasBYOBReader(stream) is true.
1486        assert!(self.has_byob_reader());
1487
1488        // Let reader be stream.[[reader]].
1489        match self.reader.borrow().as_ref() {
1490            Some(ReaderType::BYOB(reader)) => {
1491                let Some(reader) = reader.get() else {
1492                    unreachable!(
1493                        "Stream must have a reader when a read into request is fulfilled."
1494                    );
1495                };
1496
1497                // Assert: reader.[[readIntoRequests]] is not empty.
1498                assert!(reader.get_num_read_into_requests() > 0);
1499
1500                // Let readIntoRequest be reader.[[readIntoRequests]][0].
1501                // Remove readIntoRequest from reader.[[readIntoRequests]].
1502                let read_into_request = reader.remove_read_into_request();
1503
1504                // If done is true, perform readIntoRequest’s close steps, given chunk.
1505                let result = RootedTraceableBox::new(Heap::default());
1506                if done {
1507                    result.set(*chunk);
1508                    read_into_request.close_steps(cx, Some(result));
1509                } else {
1510                    // Otherwise, perform readIntoRequest’s chunk steps, given chunk.
1511                    result.set(*chunk);
1512                    read_into_request.chunk_steps(result, CanGc::from_cx(cx));
1513                }
1514            },
1515            _ => {
1516                unreachable!(
1517                    "Stream must have a BYOB reader when fulfill read into requests is called into."
1518                );
1519            },
1520        };
1521    }
1522
1523    /// <https://streams.spec.whatwg.org/#readable-stream-close>
1524    pub(crate) fn close(&self, cx: &mut JSContext) {
1525        // Assert: stream.[[state]] is "readable".
1526        assert!(self.is_readable());
1527        // Set stream.[[state]] to "closed".
1528        self.state.set(ReadableStreamState::Closed);
1529        // Let reader be stream.[[reader]].
1530
1531        // NOTE: do not hold the RefCell borrow across reader.close(),
1532        // or release() will panic when it tries to mut-borrow stream.reader.
1533        // So we pull out the underlying DOM reader in a local, then drop the borrow.
1534        let default_reader = {
1535            let reader_ref = self.reader.borrow();
1536            match reader_ref.as_ref() {
1537                Some(ReaderType::Default(reader)) => reader.get(),
1538                _ => None,
1539            }
1540        };
1541
1542        if let Some(reader) = default_reader {
1543            // steps 5 & 6 for a default reader
1544            reader.close(cx);
1545            return;
1546        }
1547
1548        // Same for BYOB reader.
1549        let byob_reader = {
1550            let reader_ref = self.reader.borrow();
1551            match reader_ref.as_ref() {
1552                Some(ReaderType::BYOB(reader)) => reader.get(),
1553                _ => None,
1554            }
1555        };
1556
1557        if let Some(reader) = byob_reader {
1558            // steps 5 & 6 for a BYOB reader
1559            reader.close(CanGc::from_cx(cx));
1560        }
1561
1562        // If reader is undefined, return.
1563    }
1564
1565    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
1566    pub(crate) fn cancel(
1567        &self,
1568        cx: &mut JSContext,
1569        global: &GlobalScope,
1570        reason: SafeHandleValue,
1571    ) -> Rc<Promise> {
1572        // Set stream.[[disturbed]] to true.
1573        self.disturbed.set(true);
1574
1575        // If stream.[[state]] is "closed", return a promise resolved with undefined.
1576        if self.is_closed() {
1577            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
1578        }
1579        // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
1580        if self.is_errored() {
1581            let promise = Promise::new2(cx, global);
1582            rooted!(&in(cx) let mut rval = UndefinedValue());
1583            self.stored_error
1584                .safe_to_jsval(cx.into(), rval.handle_mut(), CanGc::from_cx(cx));
1585            promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
1586            return promise;
1587        }
1588        // Perform ! ReadableStreamClose(stream).
1589        self.close(cx);
1590
1591        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
1592        let byob_reader = {
1593            let reader_ref = self.reader.borrow();
1594            match reader_ref.as_ref() {
1595                Some(ReaderType::BYOB(reader)) => reader.get(),
1596                _ => None,
1597            }
1598        };
1599
1600        if let Some(reader) = byob_reader {
1601            // step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel
1602            reader.cancel(cx);
1603        }
1604
1605        // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).
1606
1607        let source_cancel_promise = match self.controller.borrow().as_ref() {
1608            Some(ControllerType::Default(controller)) => controller
1609                .get()
1610                .expect("Stream should have controller.")
1611                .perform_cancel_steps(cx, global, reason),
1612            Some(ControllerType::Byte(controller)) => controller
1613                .get()
1614                .expect("Stream should have controller.")
1615                .perform_cancel_steps(cx, global, reason),
1616            None => {
1617                panic!("Stream does not have a controller.");
1618            },
1619        };
1620
1621        // Create a new promise,
1622        // and setup a handler in order to react to the fulfillment of sourceCancelPromise.
1623        let global = self.global();
1624        let result_promise = Promise::new2(cx, &global);
1625        let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1626            result: result_promise.clone(),
1627        });
1628        let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1629            result: result_promise.clone(),
1630        });
1631        let handler = PromiseNativeHandler::new(
1632            &global,
1633            Some(fulfillment_handler),
1634            Some(rejection_handler),
1635            CanGc::from_cx(cx),
1636        );
1637        let mut realm = enter_auto_realm(cx, &*global);
1638        let cx = &mut realm.current_realm();
1639        source_cancel_promise.append_native_handler(cx, &handler);
1640
1641        // Return the result of reacting to sourceCancelPromise
1642        // with a fulfillment step that returns undefined.
1643        result_promise
1644    }
1645
1646    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1647    pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1648        *self.reader.borrow_mut() = new_reader;
1649    }
1650
    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
    ///
    /// Acquires a default reader on this stream, builds two `ByteTeeUnderlyingSource`s
    /// that share the same reader and state cells, creates one branch stream from each
    /// source, and returns the two branches as `[branch_1, branch_2]`.
    ///
    /// Returns an error if acquiring the default reader fails.
    fn byte_tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
        // Assert: stream implements ReadableStream.
        // Assert: stream.[[controller]] implements ReadableByteStreamController.

        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
        let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;
        // Wrap the reader in shared, mutable storage so both sources see the same slot.
        let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
            Some(&reader),
        ))));

        // Let reading be false.
        let reading = Rc::new(Cell::new(false));

        // Let readAgainForBranch1 be false.
        let read_again_for_branch_1 = Rc::new(Cell::new(false));

        // Let readAgainForBranch2 be false.
        let read_again_for_branch_2 = Rc::new(Cell::new(false));

        // Let canceled1 be false.
        let canceled_1 = Rc::new(Cell::new(false));

        // Let canceled2 be false.
        let canceled_2 = Rc::new(Cell::new(false));

        // Let reason1 be undefined.
        let reason_1 = Rc::new(Heap::default());

        // Let reason2 be undefined.
        let reason_2 = Rc::new(Heap::default());

        // Let cancelPromise be a new promise.
        let cancel_promise = Promise::new2(cx, &self.global());
        // Shared counter starting at 0, handed to both sources.
        // NOTE(review): presumably bumped whenever the shared reader is swapped — confirm
        // in `ByteTeeUnderlyingSource`.
        let reader_version = Rc::new(Cell::new(0));

        // Source for branch 1: gets clones of all shared state.
        let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
            reader.clone(),
            self,
            reading.clone(),
            read_again_for_branch_1.clone(),
            read_again_for_branch_2.clone(),
            canceled_1.clone(),
            canceled_2.clone(),
            reason_1.clone(),
            reason_2.clone(),
            cancel_promise.clone(),
            reader_version.clone(),
            ByteTeeCancelAlgorithm::Cancel1Algorithm,
            ByteTeePullAlgorithm::Pull1Algorithm,
            CanGc::from_cx(cx),
        );

        // Source for branch 2: takes ownership of the remaining handles to the shared state.
        let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
            reader.clone(),
            self,
            reading,
            read_again_for_branch_1,
            read_again_for_branch_2,
            canceled_1,
            canceled_2,
            reason_1,
            reason_2,
            cancel_promise,
            reader_version,
            ByteTeeCancelAlgorithm::Cancel2Algorithm,
            ByteTeePullAlgorithm::Pull2Algorithm,
            CanGc::from_cx(cx),
        );

        // Set branch1 to ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm).
        let branch_1 = readable_byte_stream_tee(
            cx,
            &self.global(),
            UnderlyingSourceType::TeeByte(&byte_tee_source_1),
        );
        // Both sources are told about branch 1.
        byte_tee_source_1.set_branch_1(&branch_1);
        byte_tee_source_2.set_branch_1(&branch_1);

        // Set branch2 to ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm).
        let branch_2 = readable_byte_stream_tee(
            cx,
            &self.global(),
            UnderlyingSourceType::TeeByte(&byte_tee_source_2),
        );
        // Both sources are told about branch 2.
        byte_tee_source_1.set_branch_2(&branch_2);
        byte_tee_source_2.set_branch_2(&branch_2);

        // Perform forwardReaderError, given reader.
        byte_tee_source_1.forward_reader_error(cx, reader.clone());
        byte_tee_source_2.forward_reader_error(cx, reader);

        // Return « branch1, branch2 ».
        Ok(vec![branch_1, branch_2])
    }
1747
    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
    ///
    /// Acquires a default reader on this stream, builds two `DefaultTeeUnderlyingSource`s
    /// that share the same reader and state cells, creates one branch stream from each
    /// source, and returns the two branches as `[branch_1, branch_2]`.
    ///
    /// `clone_for_branch_2` is stored in a shared cell and handed to both sources.
    ///
    /// Returns an error if acquiring the default reader fails.
    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
    fn default_tee(
        &self,
        cx: &mut JSContext,
        clone_for_branch_2: bool,
    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
        // Assert: stream implements ReadableStream.

        // Assert: cloneForBranch2 is a boolean.
        let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));

        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
        let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;

        // Let reading be false.
        let reading = Rc::new(Cell::new(false));
        // Let readAgain be false.
        let read_again = Rc::new(Cell::new(false));
        // Let canceled1 be false.
        let canceled_1 = Rc::new(Cell::new(false));
        // Let canceled2 be false.
        let canceled_2 = Rc::new(Cell::new(false));

        // Let reason1 be undefined.
        let reason_1 = Rc::new(Heap::default());
        // Let reason2 be undefined.
        let reason_2 = Rc::new(Heap::default());
        // Let cancelPromise be a new promise.
        let cancel_promise = Promise::new2(cx, &self.global());

        // Source for branch 1: gets clones of all shared state.
        let tee_source_1 = DefaultTeeUnderlyingSource::new(
            &reader,
            self,
            reading.clone(),
            read_again.clone(),
            canceled_1.clone(),
            canceled_2.clone(),
            clone_for_branch_2.clone(),
            reason_1.clone(),
            reason_2.clone(),
            cancel_promise.clone(),
            DefaultTeeCancelAlgorithm::Cancel1Algorithm,
            CanGc::from_cx(cx),
        );

        let underlying_source_type_branch_1 = UnderlyingSourceType::Tee(&tee_source_1);

        // Source for branch 2: `canceled_1`, `canceled_2` and `cancel_promise` are cloned
        // again because they are still needed for the closed-promise handler below.
        let tee_source_2 = DefaultTeeUnderlyingSource::new(
            &reader,
            self,
            reading,
            read_again,
            canceled_1.clone(),
            canceled_2.clone(),
            clone_for_branch_2,
            reason_1,
            reason_2,
            cancel_promise.clone(),
            DefaultTeeCancelAlgorithm::Cancel2Algorithm,
            CanGc::from_cx(cx),
        );

        let underlying_source_type_branch_2 = UnderlyingSourceType::Tee(&tee_source_2);

        // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
        let branch_1 = create_readable_stream(
            cx,
            &self.global(),
            underlying_source_type_branch_1,
            None,
            None,
        );
        // Both sources are told about branch 1.
        tee_source_1.set_branch_1(&branch_1);
        tee_source_2.set_branch_1(&branch_1);

        // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
        let branch_2 = create_readable_stream(
            cx,
            &self.global(),
            underlying_source_type_branch_2,
            None,
            None,
        );
        // Both sources are told about branch 2.
        tee_source_1.set_branch_2(&branch_2);
        tee_source_2.set_branch_2(&branch_2);

        // Upon rejection of reader.[[closedPromise]] with reason r,
        // the handler registered here receives both branches and the shared cancel state.
        reader.default_tee_append_native_handler_to_closed_promise(
            cx,
            &branch_1,
            &branch_2,
            canceled_1,
            canceled_2,
            cancel_promise,
        );

        // Return « branch_1, branch_2 ».
        Ok(vec![branch_1, branch_2])
    }
1848
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    ///
    /// Acquires a default reader on this stream and a default writer on `dest`, marks
    /// this stream as disturbed, and drives the pipe through a rooted `PipeTo` state
    /// object. The returned promise is the one stored as that object's `result_promise`.
    ///
    /// When `signal` is given, an abort algorithm wrapping the `PipeTo` is run
    /// immediately if the signal is already aborted, and added to the signal otherwise.
    ///
    /// # Panics
    ///
    /// Panics if this stream or `dest` is locked, or if acquiring the reader or the
    /// writer fails.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn pipe_to(
        &self,
        cx: &mut CurrentRealm,
        global: &GlobalScope,
        dest: &WritableStream,
        prevent_close: bool,
        prevent_abort: bool,
        prevent_cancel: bool,
        signal: Option<&AbortSignal>,
    ) -> Rc<Promise> {
        // Assert: source implements ReadableStream.
        // Assert: dest implements WritableStream.
        // Assert: prevent_close, prevent_abort, and prevent_cancel are all booleans.
        // Done with method signature types.

        // If signal was not given, let signal be undefined.
        // Assert: either signal is undefined, or signal implements AbortSignal.
        // Note: done with the `signal` argument.

        // Assert: ! IsReadableStreamLocked(source) is false.
        assert!(!self.is_locked());

        // Assert: ! IsWritableStreamLocked(dest) is false.
        assert!(!dest.is_locked());

        // If source.[[controller]] implements ReadableByteStreamController,
        // let reader be either ! AcquireReadableStreamBYOBReader(source)
        // or ! AcquireReadableStreamDefaultReader(source),
        // at the user agent’s discretion.
        // Note: for now only using default readers.

        // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
        let reader = self
            .acquire_default_reader(CanGc::from_cx(cx))
            .expect("Acquiring a default reader for pipe_to cannot fail");

        // Let writer be ! AcquireWritableStreamDefaultWriter(dest).
        let writer = dest
            .aquire_default_writer(cx.into(), global, CanGc::from_cx(cx))
            .expect("Acquiring a default writer for pipe_to cannot fail");

        // Set source.[[disturbed]] to true.
        self.disturbed.set(true);

        // Let shuttingDown be false.
        // Done below with default.

        // Let promise be a new promise.
        let promise = Promise::new2(cx, global);

        // In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
        // All remaining pipe state starts from its `Default` value.
        rooted!(&in(cx) let pipe_to = PipeTo {
            reader: Dom::from_ref(&reader),
            writer: Dom::from_ref(&writer),
            pending_writes: Default::default(),
            state: Default::default(),
            prevent_abort,
            prevent_cancel,
            prevent_close,
            shutting_down: Default::default(),
            abort_reason: Default::default(),
            shutdown_error: Default::default(),
            shutdown_action_promise:  Default::default(),
            result_promise: promise.clone(),
        });

        // If signal is not undefined,
        // Note: moving the steps to here, so that the `PipeTo` is available.
        if let Some(signal) = signal {
            // Let abortAlgorithm be the following steps:
            // Note: steps are implemented at call site.
            rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));

            // If signal is aborted, perform abortAlgorithm and return promise.
            if signal.aborted() {
                signal.run_abort_algorithm(cx, global, &abort_algorithm);
                return promise;
            }

            // Add abortAlgorithm to signal.
            signal.add(&abort_algorithm);
        }

        // Note: perform checks now, since streams can start as closed or errored.
        pipe_to.check_and_propagate_errors_forward(cx, global);
        pipe_to.check_and_propagate_errors_backward(cx, global);
        pipe_to.check_and_propagate_closing_forward(cx, global);
        pipe_to.check_and_propagate_closing_backward(cx, global);

        // If we are not closed or errored,
        // i.e. none of the checks above moved the pipe out of its starting state,
        if *pipe_to.state.borrow() == PipeToState::Starting {
            // Start the pipe, by waiting on the writer being ready for a chunk.
            pipe_to.wait_for_writer_ready(cx, global);
        }

        // Return promise.
        promise
    }
1949
1950    /// <https://streams.spec.whatwg.org/#readable-stream-tee>
1951    pub(crate) fn tee(
1952        &self,
1953        cx: &mut JSContext,
1954        clone_for_branch_2: bool,
1955    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1956        // Assert: stream implements ReadableStream.
1957        // Assert: cloneForBranch2 is a boolean.
1958
1959        match self.controller.borrow().as_ref() {
1960            Some(ControllerType::Default(_)) => {
1961                // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
1962                self.default_tee(cx, clone_for_branch_2)
1963            },
1964            Some(ControllerType::Byte(_)) => {
1965                // If stream.[[controller]] implements ReadableByteStreamController,
1966                // return ? ReadableByteStreamTee(stream).
1967                self.byte_tee(cx)
1968            },
1969            None => {
1970                unreachable!("Stream should have a controller.");
1971            },
1972        }
1973    }
1974
1975    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller-from-underlying-source>
1976    fn set_up_byte_controller(
1977        &self,
1978        cx: &mut JSContext,
1979        global: &GlobalScope,
1980        underlying_source_dict: JsUnderlyingSource,
1981        underlying_source_handle: SafeHandleObject,
1982        stream: DomRoot<ReadableStream>,
1983        strategy_hwm: f64,
1984    ) -> Fallible<()> {
1985        // Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
1986        // Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
1987        // If underlyingSourceDict["start"] exists, then set startAlgorithm to an algorithm which returns the result
1988        // of invoking underlyingSourceDict["start"] with argument list « controller »
1989        // and callback this value underlyingSource.
1990        // If underlyingSourceDict["pull"] exists, then set pullAlgorithm to an algorithm which returns the result
1991        // of invoking underlyingSourceDict["pull"] with argument list « controller »
1992        // and callback this value underlyingSource.
1993        // If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm to an algorithm which takes an
1994        // argument reason and returns the result of invoking underlyingSourceDict["cancel"] with argument list
1995        // « reason » and callback this value underlyingSource.
1996
1997        // Let autoAllocateChunkSize be underlyingSourceDict["autoAllocateChunkSize"],
1998        // if it exists, or undefined otherwise.
1999        // If autoAllocateChunkSize is 0, then throw a TypeError exception.
2000        if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2001            return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2002        }
2003
2004        let controller = ReadableByteStreamController::new(
2005            UnderlyingSourceType::Js(underlying_source_dict),
2006            strategy_hwm,
2007            global,
2008            CanGc::from_cx(cx),
2009        );
2010
2011        // Note: this must be done before `setup`,
2012        // otherwise `thisOb` is null in the start callback.
2013        controller.set_underlying_source_this_object(underlying_source_handle);
2014
2015        // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm,
2016        // pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
2017        controller.setup(cx, global, stream)
2018    }
2019
2020    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2021    pub(crate) fn setup_cross_realm_transform_readable(
2022        &self,
2023        cx: &mut JSContext,
2024        port: &MessagePort,
2025    ) {
2026        let port_id = port.message_port_id();
2027        let global = self.global();
2028
2029        // Perform ! InitializeReadableStream(stream).
2030        // Done in `new_inherited`.
2031
2032        // Let sizeAlgorithm be an algorithm that returns 1.
2033        let size_algorithm =
2034            extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
2035
2036        // Note: other algorithms defined in the underlying source container.
2037
2038        // Let controller be a new ReadableStreamDefaultController.
2039        let controller = ReadableStreamDefaultController::new(
2040            &self.global(),
2041            UnderlyingSourceType::Transfer(port),
2042            0.,
2043            size_algorithm,
2044            CanGc::from_cx(cx),
2045        );
2046
2047        // Add a handler for port’s message event with the following steps:
2048        // Add a handler for port’s messageerror event with the following steps:
2049        rooted!(&in(cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2050            controller: Dom::from_ref(&controller),
2051        });
2052        global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2053
2054        // Enable port’s port message queue.
2055        port.Start(cx);
2056
2057        // Perform ! SetUpReadableStreamDefaultController
2058        controller
2059            .setup(cx, DomRoot::from_ref(self))
2060            .expect("Setting up controller for transfer cannot fail.");
2061    }
2062}
2063
2064impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2065    /// <https://streams.spec.whatwg.org/#rs-constructor>
2066    fn Constructor(
2067        cx: &mut JSContext,
2068        global: &GlobalScope,
2069        proto: Option<SafeHandleObject>,
2070        underlying_source: Option<*mut JSObject>,
2071        strategy: &QueuingStrategy,
2072    ) -> Fallible<DomRoot<Self>> {
2073        // If underlyingSource is missing, set it to null.
2074        rooted!(&in(cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2075        // Let underlyingSourceDict be underlyingSource,
2076        // converted to an IDL value of type UnderlyingSource.
2077        let underlying_source_dict = if !underlying_source_obj.is_null() {
2078            rooted!(&in(cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2079            match JsUnderlyingSource::new(cx.into(), obj_val.handle(), CanGc::from_cx(cx)) {
2080                Ok(ConversionResult::Success(val)) => val,
2081                Ok(ConversionResult::Failure(error)) => {
2082                    return Err(Error::Type(error.into_owned()));
2083                },
2084                _ => {
2085                    return Err(Error::JSFailed);
2086                },
2087            }
2088        } else {
2089            JsUnderlyingSource::empty()
2090        };
2091
2092        // Perform ! InitializeReadableStream(this).
2093        let stream = ReadableStream::new_with_proto(global, proto, CanGc::from_cx(cx));
2094
2095        if underlying_source_dict.type_.is_some() {
2096            // If strategy["size"] exists, throw a RangeError exception.
2097            if strategy.size.is_some() {
2098                return Err(Error::Range(
2099                    c"size is not supported for byte streams".to_owned(),
2100                ));
2101            }
2102
2103            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
2104            let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2105
2106            // Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this,
2107            // underlyingSource, underlyingSourceDict, highWaterMark).
2108            stream.set_up_byte_controller(
2109                cx,
2110                global,
2111                underlying_source_dict,
2112                underlying_source_obj.handle(),
2113                stream.clone(),
2114                strategy_hwm,
2115            )?;
2116        } else {
2117            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
2118            let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2119
2120            // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
2121            let size_algorithm = extract_size_algorithm(strategy, CanGc::from_cx(cx));
2122
2123            let controller = ReadableStreamDefaultController::new(
2124                global,
2125                UnderlyingSourceType::Js(underlying_source_dict),
2126                high_water_mark,
2127                size_algorithm,
2128                CanGc::from_cx(cx),
2129            );
2130
2131            // Note: this must be done before `setup`,
2132            // otherwise `thisOb` is null in the start callback.
2133            controller.set_underlying_source_this_object(underlying_source_obj.handle());
2134
2135            // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
2136            controller.setup(cx, stream.clone())?;
2137        };
2138
2139        Ok(stream)
2140    }
2141
    /// <https://streams.spec.whatwg.org/#rs-locked>
    ///
    /// Returns the result of `is_locked()` for this stream.
    fn Locked(&self) -> bool {
        self.is_locked()
    }
2146
2147    /// <https://streams.spec.whatwg.org/#rs-cancel>
2148    fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
2149        let global = self.global();
2150        if self.is_locked() {
2151            // If ! IsReadableStreamLocked(this) is true,
2152            // return a promise rejected with a TypeError exception.
2153            let promise = Promise::new2(cx, &global);
2154            promise.reject_error(
2155                Error::Type(c"stream is locked".to_owned()),
2156                CanGc::from_cx(cx),
2157            );
2158            promise
2159        } else {
2160            // Return ! ReadableStreamCancel(this, reason).
2161            self.cancel(cx, &global, reason)
2162        }
2163    }
2164
2165    /// <https://streams.spec.whatwg.org/#rs-get-reader>
2166    fn GetReader(
2167        &self,
2168        options: &ReadableStreamGetReaderOptions,
2169        can_gc: CanGc,
2170    ) -> Fallible<ReadableStreamReader> {
2171        // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
2172        if options.mode.is_none() {
2173            return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2174                self.acquire_default_reader(can_gc)?,
2175            ));
2176        }
2177        // 2. Assert: options["mode"] is "byob".
2178        assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2179
2180        // 3. Return ? AcquireReadableStreamBYOBReader(this).
2181        Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2182            self.acquire_byob_reader(can_gc)?,
2183        ))
2184    }
2185
    /// <https://streams.spec.whatwg.org/#rs-tee>
    ///
    /// Delegates to `tee` with `clone_for_branch_2` set to `false` and returns its
    /// result unchanged.
    fn Tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
        // Return ? ReadableStreamTee(this, false).
        self.tee(cx, false)
    }
2191
2192    /// <https://streams.spec.whatwg.org/#rs-pipe-to>
2193    fn PipeTo(
2194        &self,
2195        cx: &mut CurrentRealm,
2196        destination: &WritableStream,
2197        options: &StreamPipeOptions,
2198    ) -> Rc<Promise> {
2199        let global = self.global();
2200
2201        // If ! IsReadableStreamLocked(this) is true,
2202        if self.is_locked() {
2203            // return a promise rejected with a TypeError exception.
2204            let promise = Promise::new2(cx, &global);
2205            promise.reject_error(
2206                Error::Type(c"Source stream is locked".to_owned()),
2207                CanGc::from_cx(cx),
2208            );
2209            return promise;
2210        }
2211
2212        // If ! IsWritableStreamLocked(destination) is true,
2213        if destination.is_locked() {
2214            // return a promise rejected with a TypeError exception.
2215            let promise = Promise::new2(cx, &global);
2216            promise.reject_error(
2217                Error::Type(c"Destination stream is locked".to_owned()),
2218                CanGc::from_cx(cx),
2219            );
2220            return promise;
2221        }
2222
2223        // Let signal be options["signal"] if it exists, or undefined otherwise.
2224        let signal = options.signal.as_deref();
2225
2226        // Return ! ReadableStreamPipeTo.
2227        self.pipe_to(
2228            cx,
2229            &global,
2230            destination,
2231            options.preventClose,
2232            options.preventAbort,
2233            options.preventCancel,
2234            signal,
2235        )
2236    }
2237
2238    /// <https://streams.spec.whatwg.org/#rs-pipe-through>
2239    fn PipeThrough(
2240        &self,
2241        cx: &mut CurrentRealm,
2242        transform: &ReadableWritablePair,
2243        options: &StreamPipeOptions,
2244    ) -> Fallible<DomRoot<ReadableStream>> {
2245        let global = self.global();
2246
2247        // If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
2248        if self.is_locked() {
2249            return Err(Error::Type(c"Source stream is locked".to_owned()));
2250        }
2251
2252        // If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
2253        if transform.writable.is_locked() {
2254            return Err(Error::Type(c"Destination stream is locked".to_owned()));
2255        }
2256
2257        // Let signal be options["signal"] if it exists, or undefined otherwise.
2258        let signal = options.signal.as_deref();
2259
2260        // Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
2261        // options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
2262        let promise = self.pipe_to(
2263            cx,
2264            &global,
2265            &transform.writable,
2266            options.preventClose,
2267            options.preventAbort,
2268            options.preventCancel,
2269            signal,
2270        );
2271
2272        // Set promise.[[PromiseIsHandled]] to true.
2273        promise.set_promise_is_handled();
2274
2275        // Return transform["readable"].
2276        Ok(transform.readable.clone())
2277    }
2278}
2279
2280#[expect(unsafe_code)]
2281/// The initial steps for the message handler for both readable and writable cross realm transforms.
2282/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2283/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2284pub(crate) unsafe fn get_type_and_value_from_message(
2285    cx: SafeJSContext,
2286    data: SafeHandleValue,
2287    value: SafeMutableHandleValue,
2288    can_gc: CanGc,
2289) -> DOMString {
2290    // Let data be the data of the message.
2291    // Note: we are passed the data as argument,
2292    // which originates in the return value of `structuredclone::read`.
2293
2294    // Assert: data is an Object.
2295    assert!(data.is_object());
2296    rooted!(in(*cx) let data_object = data.to_object());
2297
2298    // Let type be ! Get(data, "type").
2299    rooted!(in(*cx) let mut type_ = UndefinedValue());
2300    unsafe {
2301        get_dictionary_property(
2302            *cx,
2303            data_object.handle(),
2304            c"type",
2305            type_.handle_mut(),
2306            can_gc,
2307        )
2308    }
2309    .expect("Getting the type should not fail.");
2310
2311    // Let value be ! Get(data, "value").
2312    unsafe { get_dictionary_property(*cx, data_object.handle(), c"value", value, can_gc) }
2313        .expect("Getting the value should not fail.");
2314
2315    // Assert: type is a String.
2316    let result =
2317        DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2318            .expect("The type of the message should be a string");
2319    let ConversionResult::Success(type_string) = result else {
2320        unreachable!("The type of the message should be a string");
2321    };
2322
2323    type_string
2324}
2325
// Empty impl: no items of `js::gc::Rootable` are overridden for this type.
// NOTE(review): presumably this is what lets the struct be held in a root — confirm.
impl js::gc::Rootable for CrossRealmTransformReadable {}
2327
/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
/// A wrapper to handle `message` and `messageerror` events
/// for the port used by the transferred stream.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformReadable {
    /// The controller used in the algorithm; the event handlers on this type
    /// enqueue chunks into it, close it, or error it.
    controller: Dom<ReadableStreamDefaultController>,
}
2337
2338impl CrossRealmTransformReadable {
2339    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2340    /// Add a handler for port’s message event with the following steps:
2341    #[expect(unsafe_code)]
2342    pub(crate) fn handle_message(
2343        &self,
2344        cx: &mut CurrentRealm,
2345        global: &GlobalScope,
2346        port: &MessagePort,
2347        message: SafeHandleValue,
2348    ) {
2349        rooted!(&in(cx) let mut value = UndefinedValue());
2350        let type_string = unsafe {
2351            get_type_and_value_from_message(
2352                cx.into(),
2353                message,
2354                value.handle_mut(),
2355                CanGc::from_cx(cx),
2356            )
2357        };
2358
2359        // If type is "chunk",
2360        if type_string == "chunk" {
2361            // Perform ! ReadableStreamDefaultControllerEnqueue(controller, value).
2362            self.controller
2363                .enqueue(cx, value.handle())
2364                .expect("Enqueing a chunk should not fail.");
2365        }
2366
2367        // Otherwise, if type is "close",
2368        if type_string == "close" {
2369            // Perform ! ReadableStreamDefaultControllerClose(controller).
2370            self.controller.close(cx);
2371
2372            // Disentangle port.
2373            global.disentangle_port(cx, port);
2374        }
2375
2376        // Otherwise, if type is "error",
2377        if type_string == "error" {
2378            // Perform ! ReadableStreamDefaultControllerError(controller, value).
2379            self.controller.error(cx, value.handle());
2380
2381            // Disentangle port.
2382            global.disentangle_port(cx, port);
2383        }
2384    }
2385
2386    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2387    /// Add a handler for port’s messageerror event with the following steps:
2388    pub(crate) fn handle_error(
2389        &self,
2390        cx: &mut CurrentRealm,
2391        global: &GlobalScope,
2392        port: &MessagePort,
2393    ) {
2394        // Let error be a new "DataCloneError" DOMException.
2395        let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
2396        rooted!(&in(cx) let mut rooted_error = UndefinedValue());
2397        error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
2398
2399        // Perform ! CrossRealmTransformSendError(port, error).
2400        port.cross_realm_transform_send_error(cx, rooted_error.handle());
2401
2402        // Perform ! ReadableStreamDefaultControllerError(controller, error).
2403        self.controller.error(cx, rooted_error.handle());
2404
2405        // Disentangle port.
2406        global.disentangle_port(cx, port);
2407    }
2408}
2409
2410#[expect(unsafe_code)]
2411/// Get the `done` property of an object that a read promise resolved to.
2412pub(crate) fn get_read_promise_done(
2413    cx: SafeJSContext,
2414    v: &SafeHandleValue,
2415    can_gc: CanGc,
2416) -> Result<bool, Error> {
2417    if !v.is_object() {
2418        return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2419    }
2420    unsafe {
2421        rooted!(in(*cx) let object = v.to_object());
2422        rooted!(in(*cx) let mut done = UndefinedValue());
2423        match get_dictionary_property(*cx, object.handle(), c"done", done.handle_mut(), can_gc) {
2424            Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2425                Ok(ConversionResult::Success(val)) => Ok(val),
2426                Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2427                _ => Err(Error::Type(c"Unknown format for done property.".to_owned())),
2428            },
2429            Ok(false) => Err(Error::Type(c"Promise has no done property.".to_owned())),
2430            Err(()) => Err(Error::JSFailed),
2431        }
2432    }
2433}
2434
2435#[expect(unsafe_code)]
2436/// Get the `value` property of an object that a read promise resolved to.
2437pub(crate) fn get_read_promise_bytes(
2438    cx: SafeJSContext,
2439    v: &SafeHandleValue,
2440    can_gc: CanGc,
2441) -> Result<Vec<u8>, Error> {
2442    if !v.is_object() {
2443        return Err(Error::Type(
2444            c"Unknown format for for bytes read.".to_owned(),
2445        ));
2446    }
2447    unsafe {
2448        rooted!(in(*cx) let object = v.to_object());
2449        rooted!(in(*cx) let mut bytes = UndefinedValue());
2450        match get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc) {
2451            Ok(true) => {
2452                match Vec::<u8>::safe_from_jsval(
2453                    cx,
2454                    bytes.handle(),
2455                    ConversionBehavior::EnforceRange,
2456                    can_gc,
2457                ) {
2458                    Ok(ConversionResult::Success(val)) => Ok(val),
2459                    Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2460                    _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2461                }
2462            },
2463            Ok(false) => Err(Error::Type(c"Promise has no value property.".to_owned())),
2464            Err(()) => Err(Error::JSFailed),
2465        }
2466    }
2467}
2468
2469/// Convert a raw stream `chunk` JS value to `Vec<u8>`.
2470/// This mirrors the conversion used inside `get_read_promise_bytes`,
2471/// but operates on the raw chunk (no `{ value, done }` wrapper).
2472pub(crate) fn bytes_from_chunk_jsval(
2473    cx: SafeJSContext,
2474    chunk: &RootedTraceableBox<Heap<JSVal>>,
2475    can_gc: CanGc,
2476) -> Result<Vec<u8>, Error> {
2477    match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2478        Ok(ConversionResult::Success(vec)) => Ok(vec),
2479        Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2480        _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2481    }
2482}
2483
2484/// <https://streams.spec.whatwg.org/#rs-transfer>
2485impl Transferable for ReadableStream {
2486    type Index = MessagePortIndex;
2487    type Data = MessagePortImpl;
2488
2489    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps>
2490    fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, MessagePortImpl)> {
2491        // Step 1. If ! IsReadableStreamLocked(value) is true, throw a
2492        // "DataCloneError" DOMException.
2493        if self.is_locked() {
2494            return Err(Error::DataClone(None));
2495        }
2496
2497        let global = self.global();
2498        let mut realm = enter_auto_realm(cx, &*global);
2499        let mut realm = realm.current_realm();
2500        let cx = &mut realm;
2501
2502        // Step 2. Let port1 be a new MessagePort in the current Realm.
2503        let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
2504        global.track_message_port(&port_1, None);
2505
2506        // Step 3. Let port2 be a new MessagePort in the current Realm.
2507        let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
2508        global.track_message_port(&port_2, None);
2509
2510        // Step 4. Entangle port1 and port2.
2511        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2512
2513        // Step 5. Let writable be a new WritableStream in the current Realm.
2514        let writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
2515
2516        // Step 6. Perform ! SetUpCrossRealmTransformWritable(writable, port1).
2517        writable.setup_cross_realm_transform_writable(cx, &port_1);
2518
2519        // Step 7. Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
2520        let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);
2521
2522        // Step 8. Set promise.[[PromiseIsHandled]] to true.
2523        promise.set_promise_is_handled();
2524
2525        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
2526        port_2.transfer(cx)
2527    }
2528
2529    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps>
2530    fn transfer_receive(
2531        cx: &mut JSContext,
2532        owner: &GlobalScope,
2533        id: MessagePortId,
2534        port_impl: MessagePortImpl,
2535    ) -> Result<DomRoot<Self>, ()> {
2536        // Their transfer-receiving steps, given dataHolder and value, are:
2537        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
2538        let value = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
2539
2540        // Step 1. Let deserializedRecord be !
2541        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
2542        // Realm).
2543        // Done with the `Deserialize` derive of `MessagePortImpl`.
2544
2545        // Step 2. Let port be deserializedRecord.[[Deserialized]].
2546        let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
2547
2548        // Step 3. Perform ! SetUpCrossRealmTransformReadable(value, port).
2549        value.setup_cross_realm_transform_readable(cx, &transferred_port);
2550        Ok(value)
2551    }
2552
2553    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
2554    fn serialized_storage<'a>(
2555        data: StructuredData<'a, '_>,
2556    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2557        match data {
2558            StructuredData::Reader(r) => &mut r.port_impls,
2559            StructuredData::Writer(w) => &mut w.ports,
2560        }
2561    }
2562}