Skip to main content

script/dom/stream/
readablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use servo_base::generic_channel::GenericSharedMemory;
11use servo_base::id::{MessagePortId, MessagePortIndex};
12use servo_constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::context::JSContext;
15use js::conversions::{FromJSValConvertible, ToJSValConvertible};
16use js::jsapi::{Heap, JSObject};
17use js::jsval::{JSVal, ObjectValue, UndefinedValue};
18use js::realm::CurrentRealm;
19use js::rust::{
20    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
21    MutableHandleValue as SafeMutableHandleValue,
22};
23use js::typedarray::{ArrayBufferViewU8, Uint8};
24use rustc_hash::FxHashMap;
25
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
27use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
28    ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
29    ReadableWritablePair, StreamPipeOptions,
30};
31use script_bindings::str::DOMString;
32
33use crate::dom::domexception::{DOMErrorName, DOMException};
34use script_bindings::conversions::{is_array_like, StringificationBehavior};
35use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
36use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
38use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
39use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
40use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, get_property, get_property_jsval};
41use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
42use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
43use crate::dom::stream::writablestream::WritableStream;
44use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
45use crate::dom::bindings::reflector::DomGlobal;
46use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
47use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
48use crate::dom::bindings::trace::RootedTraceableBox;
49use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
50use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
51use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
52use crate::dom::globalscope::GlobalScope;
53use crate::dom::promise::{wait_for_all_promise, Promise};
54use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
55use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
56use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
57use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
58use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
59use crate::dom::types::DefaultTeeUnderlyingSource;
60use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
61use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
62use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
63use crate::dom::messageport::MessagePort;
64use crate::realms::{enter_auto_realm};
65use crate::script_runtime::CanGc;
66use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
67use crate::dom::bindings::transferable::Transferable;
68use crate::dom::bindings::structuredclone::StructuredData;
69
70use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource, create_buffer_source};
71use super::readablestreambyobreader::ReadIntoRequest;
72
/// State machine for `PipeTo`.
///
/// The current state is stored in `PipeTo::state`,
/// and is inspected by `PipeTo::callback` each time
/// one of the promises the pipe is waiting on settles.
#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
enum PipeToState {
    /// The starting state.
    /// `PipeTo::callback` treats being called in this state as unreachable.
    #[default]
    Starting,
    /// Waiting for the writer to be ready
    /// (entered in `wait_for_writer_ready`).
    PendingReady,
    /// Waiting for a read to resolve
    /// (entered in `read_chunk`).
    PendingRead,
    /// Waiting for all pending writes to finish,
    /// as part of shutting down with an optional action.
    /// The action, if any, is performed once no pending write remains.
    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
    /// When shutting down with an action,
    /// waiting for the action to complete,
    /// at which point we can `finalize`
    /// (entered in `perform_action`).
    ShuttingDownPendingAction,
    /// The pipe has been finalized,
    /// no further actions should be performed
    /// (entered in `finalize`).
    Finalized,
}
94
/// The action to perform as part of
/// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
///
/// Each variant is turned into a promise by `PipeTo::perform_action`.
#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
enum ShutdownAction {
    /// Abort the destination with the stored shutdown error.
    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
    WritableStreamAbort,
    /// Cancel the source with the stored shutdown error.
    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
    ReadableStreamCancel,
    /// Close the writer, with error propagation.
    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
    WritableStreamDefaultWriterCloseWithErrorPropagation,
    /// The actions of the `abortAlgorithm` of the abort signal:
    /// depending on `prevent_abort` and `prevent_cancel`,
    /// abort the destination and/or cancel the source with the abort reason,
    /// and wait for all of the resulting promises.
    /// <https://streams.spec.whatwg.org/#ref-for-rs-pipeTo-shutdown-with-action>
    Abort,
}
107
// Empty marker impl of `js::gc::Rootable` for `PipeTo`.
// NOTE(review): presumably this is what allows a `PipeTo` to be rooted on the stack;
// confirm against the `js::gc` documentation.
impl js::gc::Rootable for PipeTo {}
109
/// The "in parallel, but not really" part of
/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
///
/// Note: the spec is flexible about how this is done, but requires the following constraints to apply:
/// - Public API must not be used: we'll only use Rust.
/// - Backpressure must be enforced: we'll only read from source when dest is ready.
/// - Shutdown must stop activity: we'll do this together with the below.
/// - Error and close states must be propagated: we'll do this by checking these states at every step.
///
/// Note: the struct is cloned each time it is boxed as the callback of a promise handler,
/// so every piece of mutable state is kept behind an `Rc`,
/// and is therefore shared between all of the clones.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PipeTo {
    /// The reader used to read chunks from the source.
    /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
    reader: Dom<ReadableStreamDefaultReader>,

    /// The writer used to write chunks to the destination.
    /// <https://streams.spec.whatwg.org/#ref-for-acquire-writable-stream-default-writer>
    writer: Dom<WritableStreamDefaultWriter>,

    /// Pending writes are needed when shutting down (with an action),
    /// because we can only finalize when all writes are finished.
    /// Settled promises are removed from the queue at the top of `callback`.
    #[ignore_malloc_size_of = "nested Rc"]
    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,

    /// The state machine.
    #[conditional_malloc_size_of]
    #[no_trace]
    state: Rc<RefCell<PipeToState>>,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventabort>
    prevent_abort: bool,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventcancel>
    prevent_cancel: bool,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventclose>
    prevent_close: bool,

    /// The `shuttingDown` variable of
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    /// Set to true, once, by `shutdown`.
    #[conditional_malloc_size_of]
    shutting_down: Rc<Cell<bool>>,

    /// The abort reason of the abort signal,
    /// stored here because we must keep it across a microtask.
    /// Set by `abort_with_reason`, and read back when performing the `Abort` action.
    #[ignore_malloc_size_of = "mozjs"]
    abort_reason: Rc<Heap<JSVal>>,

    /// The error potentially passed to shutdown,
    /// stored here because we must keep it across a microtask.
    /// `None` until `set_shutdown_error` is called.
    #[ignore_malloc_size_of = "mozjs"]
    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,

    /// The promise returned by a shutdown action.
    /// We keep it to only continue when it is not pending anymore.
    #[ignore_malloc_size_of = "nested Rc"]
    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// The promise resolved or rejected at
    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
171
172impl PipeTo {
173    /// Run the `abortAlgorithm` defined at
174    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
175    pub(crate) fn abort_with_reason(
176        &self,
177        cx: &mut CurrentRealm,
178        global: &GlobalScope,
179        reason: SafeHandleValue,
180    ) {
181        // Abort should do nothing if we are already shutting down.
182        if self.shutting_down.get() {
183            return;
184        }
185
186        // Let error be signal’s abort reason.
187        // Note: storing it because it may need to be kept across a microtask,
188        // and see the note below as to why it is kept separately from `shutdown_error`.
189        self.abort_reason.set(reason.get());
190
191        // Note: setting the error now,
192        // will result in a rejection of the pipe promise, with this error.
193        // Unless any shutdown action raise their own error,
194        // in which case this error will be overwritten by the shutdown action error.
195        self.set_shutdown_error(reason);
196
197        // Let actions be an empty ordered set.
198        // Note: the actions are defined, and performed, inside `shutdown_with_an_action`.
199
200        // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions,
201        // and with error.
202        self.shutdown(cx, global, Some(ShutdownAction::Abort));
203    }
204}
205
206impl Callback for PipeTo {
207    /// The pipe makes progress one microtask at a time.
208    /// Note: we use one struct as the callback for all promises,
209    /// and for both of their reactions.
210    ///
211    /// The context of the callback is determined from:
212    /// - the current state.
213    /// - the type of `result`.
214    /// - the state of a stored promise(in some cases).
215    fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
216        let global = self.reader.global();
217
218        // Note: we only care about the result of writes when they are rejected,
219        // and the error is accessed not through handlers,
220        // but directly using `dest.get_stored_error`.
221        // So we must mark rejected promises as handled
222        // to prevent unhandled rejection errors.
223        self.pending_writes.borrow_mut().retain(|p| {
224            let pending = p.is_pending();
225            if !pending {
226                p.set_promise_is_handled();
227            }
228            pending
229        });
230
231        // Note: cloning to prevent re-borrow in methods called below.
232        let state_before_checks = self.state.borrow().clone();
233
234        // Note: if we are in a `PendingRead` state,
235        // and the source is closed,
236        // we try to write chunks before doing any shutdown,
237        // which is necessary to implement the
238        // "If any chunks have been read but not yet written, write them to dest."
239        // part of shutdown.
240        if state_before_checks == PipeToState::PendingRead {
241            let source = self.reader.get_stream().expect("Source stream must be set");
242            if source.is_closed() {
243                let dest = self
244                    .writer
245                    .get_stream()
246                    .expect("Destination stream must be set");
247
248                // If dest.[[state]] is "writable",
249                // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
250                if dest.is_writable() && !dest.close_queued_or_in_flight() {
251                    let Ok(done) = get_read_promise_done(cx, &result) else {
252                        // This is the case that the microtask ran in reaction
253                        // to the closed promise of the reader,
254                        // so we should wait for subsequent chunks,
255                        // and skip the shutdown below
256                        // (reader is closed, but there are still pending reads).
257                        // Shutdown will happen when the last chunk has been received.
258                        return;
259                    };
260
261                    if !done {
262                        // If any chunks have been read but not yet written, write them to dest.
263                        self.write_chunk(cx, &global, result);
264                    }
265                }
266            }
267        }
268
269        self.check_and_propagate_errors_forward(cx, &global);
270        self.check_and_propagate_errors_backward(cx, &global);
271        self.check_and_propagate_closing_forward(cx, &global);
272        self.check_and_propagate_closing_backward(cx, &global);
273
274        // Note: cloning to prevent re-borrow in methods called below.
275        let state = self.state.borrow().clone();
276
277        // If we switched to a shutdown state,
278        // return.
279        // Progress will be made at the next tick.
280        if state != state_before_checks {
281            return;
282        }
283
284        match state {
285            PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
286            PipeToState::PendingReady => {
287                // Read a chunk.
288                self.read_chunk(cx, &global);
289            },
290            PipeToState::PendingRead => {
291                // Write the chunk.
292                self.write_chunk(cx, &global, result);
293
294                // An early return is necessary if the write algorithm aborted the pipe.
295                if self.shutting_down.get() {
296                    return;
297                }
298
299                // Wait for the writer to be ready again.
300                self.wait_for_writer_ready(cx, &global);
301            },
302            PipeToState::ShuttingDownWithPendingWrites(action) => {
303                // Wait until every chunk that has been read has been written
304                // (i.e. the corresponding promises have settled).
305                if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
306                    self.wait_on_pending_write(cx, &global, write);
307                    return;
308                }
309
310                // Note: error is stored in `self.shutdown_error`.
311                if let Some(action) = action {
312                    // Let p be the result of performing action.
313                    self.perform_action(cx, &global, action);
314                } else {
315                    // Finalize, passing along error if it was given.
316                    self.finalize(cx, &global);
317                }
318            },
319            PipeToState::ShuttingDownPendingAction => {
320                let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
321                    unreachable!();
322                };
323                if promise.is_pending() {
324                    // While waiting for the action to complete,
325                    // we may get callbacks for other promises(closed, ready),
326                    // and we should ignore those.
327                    return;
328                }
329
330                let is_array_like = {
331                    if !result.is_object() {
332                        false
333                    } else {
334                        is_array_like::<crate::DomTypeHolder>(cx, result)
335                    }
336                };
337
338                // Finalize, passing along error if it was given.
339                if !result.is_undefined() && !is_array_like {
340                    // Most actions either resolve with undefined,
341                    // or reject with an error,
342                    // and the error should be used when finalizing.
343                    // One exception is the `Abort` action,
344                    // which resolves with a list of undefined values.
345
346                    // If `result` isn't undefined or array-like,
347                    // then it is an error
348                    // and should overwrite the current shutdown error.
349                    self.set_shutdown_error(result);
350                }
351                self.finalize(cx, &global);
352            },
353            PipeToState::Finalized => {},
354        }
355    }
356}
357
358impl PipeTo {
359    /// Setting shutdown error in a way that ensures it isn't
360    /// moved after it has been set.
361    fn set_shutdown_error(&self, error: SafeHandleValue) {
362        *self.shutdown_error.borrow_mut() = Some(Heap::default());
363        let Some(ref heap) = *self.shutdown_error.borrow() else {
364            unreachable!("Option set to Some(heap) above.");
365        };
366        heap.set(error.get())
367    }
368
369    /// Wait for the writer to be ready,
370    /// which implements the constraint that backpressure must be enforced.
371    fn wait_for_writer_ready(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
372        {
373            let mut state = self.state.borrow_mut();
374            *state = PipeToState::PendingReady;
375        }
376
377        let ready_promise = self.writer.Ready();
378        if ready_promise.is_fulfilled() {
379            self.read_chunk(cx, global);
380        } else {
381            let handler = PromiseNativeHandler::new(
382                cx,
383                global,
384                Some(Box::new(self.clone())),
385                Some(Box::new(self.clone())),
386            );
387            ready_promise.append_native_handler(cx, &handler);
388
389            // Note: if the writer is not ready,
390            // in order to ensure progress we must
391            // also react to the closure of the source(because source may close empty).
392            let closed_promise = self.reader.Closed();
393            closed_promise.append_native_handler(cx, &handler);
394        }
395    }
396
397    /// Read a chunk
398    fn read_chunk(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
399        *self.state.borrow_mut() = PipeToState::PendingRead;
400        let chunk_promise = self.reader.Read(cx);
401        let handler = PromiseNativeHandler::new(
402            cx,
403            global,
404            Some(Box::new(self.clone())),
405            Some(Box::new(self.clone())),
406        );
407        chunk_promise.append_native_handler(cx, &handler);
408
409        // Note: in order to ensure progress we must
410        // also react to the closure of the destination.
411        let ready_promise = self.writer.Closed();
412        ready_promise.append_native_handler(cx, &handler);
413    }
414
415    /// Try to write a chunk using the jsval, and returns wether it succeeded
416    // It will fail if it is the last `done` chunk, or if it is not a chunk at all.
417    fn write_chunk(
418        &self,
419        cx: &mut JSContext,
420        global: &GlobalScope,
421        chunk: SafeHandleValue,
422    ) -> bool {
423        if chunk.is_object() {
424            rooted!(&in(cx) let object = chunk.to_object());
425            rooted!(&in(cx) let mut bytes = UndefinedValue());
426            get_property_jsval(cx, object.handle(), c"value", bytes.handle_mut())
427                .expect("Chunk should have a value.");
428
429            // Write the chunk.
430            let write_promise = self.writer.write(cx, global, bytes.handle());
431            self.pending_writes.borrow_mut().push_back(write_promise);
432            return true;
433        }
434        false
435    }
436
437    /// Only as part of shutting-down do we wait on pending writes
438    /// (backpressure is communicated not through pending writes
439    /// but through the readiness of the writer).
440    fn wait_on_pending_write(
441        &self,
442        cx: &mut CurrentRealm,
443        global: &GlobalScope,
444        promise: Rc<Promise>,
445    ) {
446        let handler = PromiseNativeHandler::new(
447            cx,
448            global,
449            Some(Box::new(self.clone())),
450            Some(Box::new(self.clone())),
451        );
452        promise.append_native_handler(cx, &handler);
453    }
454
455    /// Errors must be propagated forward part of
456    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
457    fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
458        // An early return is necessary if we are shutting down,
459        // because in that case the source can already have been set to none.
460        if self.shutting_down.get() {
461            return;
462        }
463
464        // if source.[[state]] is or becomes "errored", then
465        let source = self
466            .reader
467            .get_stream()
468            .expect("Reader should still have a stream");
469        if source.is_errored() {
470            rooted!(&in(cx) let mut source_error = UndefinedValue());
471            source.get_stored_error(source_error.handle_mut());
472            self.set_shutdown_error(source_error.handle());
473
474            // If preventAbort is false,
475            if !self.prevent_abort {
476                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
477                // and with source.[[storedError]].
478                self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
479            } else {
480                // Otherwise, shutdown with source.[[storedError]].
481                self.shutdown(cx, global, None);
482            }
483        }
484    }
485
486    /// Errors must be propagated backward part of
487    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
488    fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
489        // An early return is necessary if we are shutting down,
490        // because in that case the destination can already have been set to none.
491        if self.shutting_down.get() {
492            return;
493        }
494
495        // if dest.[[state]] is or becomes "errored", then
496        let dest = self
497            .writer
498            .get_stream()
499            .expect("Writer should still have a stream");
500        if dest.is_errored() {
501            rooted!(&in(cx) let mut dest_error = UndefinedValue());
502            dest.get_stored_error(dest_error.handle_mut());
503            self.set_shutdown_error(dest_error.handle());
504
505            // If preventCancel is false,
506            if !self.prevent_cancel {
507                // shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]])
508                // and with dest.[[storedError]].
509                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
510            } else {
511                // Otherwise, shutdown with dest.[[storedError]].
512                self.shutdown(cx, global, None);
513            }
514        }
515    }
516
517    /// Closing must be propagated forward part of
518    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
519    fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
520        // An early return is necessary if we are shutting down,
521        // because in that case the source can already have been set to none.
522        if self.shutting_down.get() {
523            return;
524        }
525
526        // if source.[[state]] is or becomes "closed", then
527        let source = self
528            .reader
529            .get_stream()
530            .expect("Reader should still have a stream");
531        if source.is_closed() {
532            // If preventClose is false,
533            if !self.prevent_close {
534                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
535                // and with source.[[storedError]].
536                self.shutdown(
537                    cx,
538                    global,
539                    Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
540                )
541            } else {
542                // Otherwise, shutdown.
543                self.shutdown(cx, global, None);
544            }
545        }
546    }
547
548    /// Closing must be propagated backward part of
549    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
550    fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
551        // An early return is necessary if we are shutting down,
552        // because in that case the destination can already have been set to none.
553        if self.shutting_down.get() {
554            return;
555        }
556
557        // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
558        // or dest.[[state]] is "closed"
559        let dest = self
560            .writer
561            .get_stream()
562            .expect("Writer should still have a stream");
563        if dest.close_queued_or_in_flight() || dest.is_closed() {
564            // Assert: no chunks have been read or written.
565            // Note: unclear how to perform this assertion.
566
567            // Let destClosed be a new TypeError.
568            rooted!(&in(cx) let mut dest_closed = UndefinedValue());
569            let error =
570                Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
571            error.to_jsval(
572                cx.into(),
573                global,
574                dest_closed.handle_mut(),
575                CanGc::from_cx(cx),
576            );
577            self.set_shutdown_error(dest_closed.handle());
578
579            // If preventCancel is false,
580            if !self.prevent_cancel {
581                // shutdown with an action of ! ReadableStreamCancel(source, destClosed)
582                // and with destClosed.
583                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
584            } else {
585                // Otherwise, shutdown with destClosed.
586                self.shutdown(cx, global, None);
587            }
588        }
589    }
590
591    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
592    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown>
593    /// Combined into one method with an optional action.
594    fn shutdown(
595        &self,
596        cx: &mut CurrentRealm,
597        global: &GlobalScope,
598        action: Option<ShutdownAction>,
599    ) {
600        // If shuttingDown is true, abort these substeps.
601        // Set shuttingDown to true.
602        if !self.shutting_down.replace(true) {
603            let dest = self.writer.get_stream().expect("Stream must be set");
604            // If dest.[[state]] is "writable",
605            // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
606            if dest.is_writable() && !dest.close_queued_or_in_flight() {
607                // If any chunks have been read but not yet written, write them to dest.
608                // Done at the top of `Callback`.
609
610                // Wait until every chunk that has been read has been written
611                // (i.e. the corresponding promises have settled).
612                if let Some(write) = self.pending_writes.borrow_mut().front() {
613                    *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
614                    self.wait_on_pending_write(cx, global, write.clone());
615                    return;
616                }
617            }
618
619            // Note: error is stored in `self.shutdown_error`.
620            if let Some(action) = action {
621                // Let p be the result of performing action.
622                self.perform_action(cx, global, action);
623            } else {
624                // Finalize, passing along error if it was given.
625                self.finalize(cx, global);
626            }
627        }
628    }
629
    /// The perform action part of
    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
    ///
    /// Switches to the `ShuttingDownPendingAction` state,
    /// performs `action` to obtain a promise,
    /// registers this pipe as the handler for both reactions of that promise,
    /// and stores the promise in `self.shutdown_action_promise`.
    fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
        // The error passed to the action is the stored shutdown error,
        // or undefined if none was set.
        rooted!(&in(cx) let mut error = UndefinedValue());
        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
            error.set(shutdown_error.get());
        }

        *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;

        // Let p be the result of performing action.
        let promise = match action {
            ShutdownAction::WritableStreamAbort => {
                let dest = self.writer.get_stream().expect("Stream must be set");
                dest.abort(cx, global, error.handle())
            },
            ShutdownAction::ReadableStreamCancel => {
                let source = self
                    .reader
                    .get_stream()
                    .expect("Reader should have a stream.");
                source.cancel(cx, global, error.handle())
            },
            ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
                self.writer.close_with_error_propagation(cx, global)
            },
            ShutdownAction::Abort => {
                // Note: implementation of the `abortAlgorithm`
                // of the signal associated with this piping operation.

                // Let error be signal’s abort reason.
                // Note: this shadows the shutdown error rooted above;
                // the abort reason is read from its own field.
                rooted!(&in(cx) let mut error = UndefinedValue());
                error.set(self.abort_reason.get());

                // Let actions be an empty ordered set.
                let mut actions = vec![];

                // If preventAbort is false, append the following action to actions:
                if !self.prevent_abort {
                    let dest = self
                        .writer
                        .get_stream()
                        .expect("Destination stream must be set");

                    // If dest.[[state]] is "writable",
                    let promise = if dest.is_writable() {
                        // return ! WritableStreamAbort(dest, error)
                        dest.abort(cx, global, error.handle())
                    } else {
                        // Otherwise, return a promise resolved with undefined.
                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
                    };
                    actions.push(promise);
                }

                // If preventCancel is false, append the following action to actions:
                if !self.prevent_cancel {
                    let source = self.reader.get_stream().expect("Source stream must be set");

                    // If source.[[state]] is "readable",
                    let promise = if source.is_readable() {
                        // return ! ReadableStreamCancel(source, error).
                        source.cancel(cx, global, error.handle())
                    } else {
                        // Otherwise, return a promise resolved with undefined.
                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
                    };
                    actions.push(promise);
                }

                // Shutdown with an action consisting
                // of getting a promise to wait for all of the actions in actions,
                // and with error.
                wait_for_all_promise(cx, global, actions)
            },
        };

        // Upon fulfillment of p, finalize, passing along originalError if it was given.
        // Upon rejection of p with reason newError, finalize with newError.
        // Note: both reactions run `callback`,
        // which tells them apart by looking at the value it is called with.
        let handler = PromiseNativeHandler::new(
            cx,
            global,
            Some(Box::new(self.clone())),
            Some(Box::new(self.clone())),
        );
        promise.append_native_handler(cx, &handler);
        // Kept so that `callback` can check whether the action is still pending.
        *self.shutdown_action_promise.borrow_mut() = Some(promise);
    }
718
719    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
720    fn finalize(&self, cx: &mut JSContext, global: &GlobalScope) {
721        *self.state.borrow_mut() = PipeToState::Finalized;
722
723        // Perform ! WritableStreamDefaultWriterRelease(writer).
724        self.writer.release(cx.into(), global, CanGc::from_cx(cx));
725
726        // If reader implements ReadableStreamBYOBReader,
727        // perform ! ReadableStreamBYOBReaderRelease(reader).
728        // TODO.
729
730        // Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
731        self.reader
732            .release(cx)
733            .expect("Releasing the reader should not fail");
734
735        // If signal is not undefined, remove abortAlgorithm from signal.
736        // Note: since `self.shutdown` is true at this point,
737        // the abort algorithm is a no-op,
738        // so for now not implementing this step.
739
740        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
741            rooted!(&in(cx) let mut error = UndefinedValue());
742            error.set(shutdown_error.get());
743            // If error was given, reject promise with error.
744            self.result_promise
745                .reject_native_with_cx(cx, &error.handle());
746        } else {
747            // Otherwise, resolve promise with undefined.
748            self.result_promise.resolve_native_with_cx(cx, &());
749        }
750    }
751}
752
/// The fulfillment handler for the reacting to sourceCancelPromise part of
/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseFulfillmentHandler {
    /// The promise that this handler resolves with `undefined`
    /// once its callback runs.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}
760
761impl Callback for SourceCancelPromiseFulfillmentHandler {
762    /// The fulfillment handler for the reacting to sourceCancelPromise part of
763    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
764    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
765    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
766        self.result.resolve_native_with_cx(cx, &());
767    }
768}
769
/// The rejection handler for the reacting to sourceCancelPromise part of
/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseRejectionHandler {
    /// The promise that this handler rejects with the reason
    /// passed to its callback.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}
777
778impl Callback for SourceCancelPromiseRejectionHandler {
779    /// The rejection handler for the reacting to sourceCancelPromise part of
780    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
781    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
782    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
783        self.result.reject_native_with_cx(cx, &v);
784    }
785}
786
/// The state of a readable stream.
/// <https://streams.spec.whatwg.org/#readablestream-state>
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReadableStreamState {
    /// The "readable" state; this is the default.
    #[default]
    Readable,
    /// The "closed" state.
    Closed,
    /// The "errored" state.
    Errored,
}
795
/// The controller attached to a stream, which is either a byte stream
/// controller or a default controller.
/// <https://streams.spec.whatwg.org/#readablestream-controller>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ControllerType {
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
    Byte(MutNullableDom<ReadableByteStreamController>),
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
    Default(MutNullableDom<ReadableStreamDefaultController>),
}
805
/// The reader attached to a stream, which is either a BYOB reader
/// or a default reader.
/// <https://streams.spec.whatwg.org/#readablestream-reader>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ReaderType {
    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
    #[allow(clippy::upper_case_acronyms)]
    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
    Default(MutNullableDom<ReadableStreamDefaultReader>),
}
816
817impl Eq for ReaderType {}
818impl PartialEq for ReaderType {
819    fn eq(&self, other: &Self) -> bool {
820        matches!(
821            (self, other),
822            (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
823                (ReaderType::Default(_), ReaderType::Default(_))
824        )
825    }
826}
827
828/// <https://streams.spec.whatwg.org/#create-readable-stream>
829pub(crate) fn create_readable_stream(
830    cx: &mut JSContext,
831    global: &GlobalScope,
832    underlying_source_type: UnderlyingSourceType,
833    queuing_strategy: Option<Rc<QueuingStrategySize>>,
834    high_water_mark: Option<f64>,
835) -> DomRoot<ReadableStream> {
836    // If highWaterMark was not passed, set it to 1.
837    let high_water_mark = high_water_mark.unwrap_or(1.0);
838
839    // If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
840    let size_algorithm = queuing_strategy.unwrap_or(extract_size_algorithm(
841        &QueuingStrategy::empty(),
842        CanGc::from_cx(cx),
843    ));
844
845    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
846    assert!(high_water_mark >= 0.0);
847
848    // Let stream be a new ReadableStream.
849    // Perform ! InitializeReadableStream(stream).
850    let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
851
852    // Let controller be a new ReadableStreamDefaultController.
853    let controller = ReadableStreamDefaultController::new(
854        global,
855        underlying_source_type,
856        high_water_mark,
857        size_algorithm,
858        CanGc::from_cx(cx),
859    );
860
861    // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm,
862    // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
863    controller
864        .setup(cx, stream.clone())
865        .expect("Setup of default controller cannot fail");
866
867    // Return stream.
868    stream
869}
870
871/// <https://streams.spec.whatwg.org/#abstract-opdef-createreadablebytestream>
872fn readable_byte_stream_tee(
873    cx: &mut JSContext,
874    global: &GlobalScope,
875    underlying_source_type: UnderlyingSourceType,
876) -> DomRoot<ReadableStream> {
877    // Let stream be a new ReadableStream.
878    // Perform ! InitializeReadableStream(stream).
879    let tee_stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
880
881    // Let controller be a new ReadableByteStreamController.
882    let controller =
883        ReadableByteStreamController::new(underlying_source_type, 0.0, global, CanGc::from_cx(cx));
884
885    // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
886    controller
887        .setup(cx, global, tee_stream.clone())
888        .expect("Setup of byte stream controller cannot fail");
889
890    // Return stream.
891    tee_stream
892}
893
/// The DOM object for a readable stream, holding its controller, reader,
/// state, stored error, and disturbed flag.
/// <https://streams.spec.whatwg.org/#rs-class>
#[dom_struct]
pub(crate) struct ReadableStream {
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#readablestream-controller>
    /// `None` until a controller has been set on the stream.
    /// Note: the inner `MutNullableDom` should really be an `Option<Dom>`,
    /// because it is never unset once set.
    controller: RefCell<Option<ControllerType>>,

    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
    disturbed: Cell<bool>,

    /// <https://streams.spec.whatwg.org/#readablestream-reader>
    reader: RefCell<Option<ReaderType>>,

    /// <https://streams.spec.whatwg.org/#readablestream-state>
    state: Cell<ReadableStreamState>,
}
917
918impl ReadableStream {
919    /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
920    fn new_inherited() -> ReadableStream {
921        ReadableStream {
922            reflector_: Reflector::new(),
923            controller: RefCell::new(None),
924            stored_error: Heap::default(),
925            disturbed: Default::default(),
926            reader: RefCell::new(None),
927            state: Cell::new(Default::default()),
928        }
929    }
930
931    pub(crate) fn new_with_proto(
932        global: &GlobalScope,
933        proto: Option<SafeHandleObject>,
934        can_gc: CanGc,
935    ) -> DomRoot<ReadableStream> {
936        reflect_dom_object_with_proto(
937            Box::new(ReadableStream::new_inherited()),
938            global,
939            proto,
940            can_gc,
941        )
942    }
943
944    /// Used as part of
945    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
946    pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
947        *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
948            controller,
949        ))));
950    }
951
952    /// Used as part of
953    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
954    pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
955        *self.controller.borrow_mut() =
956            Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
957    }
958
959    /// Used as part of
960    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
961    pub(crate) fn assert_no_controller(&self) {
962        let has_no_controller = self.controller.borrow().is_none();
963        assert!(has_no_controller);
964    }
965
966    /// Build a stream backed by a Rust source that has already been read into memory.
967    pub(crate) fn new_from_bytes(
968        cx: &mut JSContext,
969        global: &GlobalScope,
970        bytes: Vec<u8>,
971    ) -> Fallible<DomRoot<ReadableStream>> {
972        let stream = ReadableStream::new_with_external_underlying_source(
973            cx,
974            global,
975            UnderlyingSourceType::Memory(bytes.len()),
976        )?;
977        stream.enqueue_native(cx, bytes);
978        stream.controller_close_native(cx);
979        Ok(stream)
980    }
981
982    /// <https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support>
983    pub(crate) fn new_from_bytes_with_byte_reading_support(
984        cx: &mut JSContext,
985        global: &GlobalScope,
986        bytes: Vec<u8>,
987    ) -> Fallible<DomRoot<ReadableStream>> {
988        let stream = ReadableStream::new_with_external_underlying_byte_source(
989            cx,
990            global,
991            UnderlyingSourceType::Memory(bytes.len()),
992        )?;
993        stream.enqueue_native(cx, bytes);
994        stream.controller_close_native(cx);
995        Ok(stream)
996    }
997
998    /// Build a stream backed by a Rust underlying source.
999    /// Note: external sources are always paired with a default controller.
1000    pub(crate) fn new_with_external_underlying_source(
1001        cx: &mut JSContext,
1002        global: &GlobalScope,
1003        source: UnderlyingSourceType,
1004    ) -> Fallible<DomRoot<ReadableStream>> {
1005        assert!(source.is_native());
1006        let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
1007        let controller = ReadableStreamDefaultController::new(
1008            global,
1009            source,
1010            1.0,
1011            extract_size_algorithm(&QueuingStrategy::empty(), CanGc::from_cx(cx)),
1012            CanGc::from_cx(cx),
1013        );
1014        controller.setup(cx, stream.clone())?;
1015        Ok(stream)
1016    }
1017
1018    /// <https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support>
1019    pub(crate) fn new_with_external_underlying_byte_source(
1020        cx: &mut JSContext,
1021        global: &GlobalScope,
1022        source: UnderlyingSourceType,
1023    ) -> Fallible<DomRoot<ReadableStream>> {
1024        assert!(source.is_native());
1025        let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
1026        let controller = ReadableByteStreamController::new(source, 0.0, global, CanGc::from_cx(cx));
1027        controller.setup(cx, global, stream.clone())?;
1028        Ok(stream)
1029    }
1030
1031    /// Call into the release steps of the controller,
1032    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1033        match self.controller.borrow().as_ref() {
1034            Some(ControllerType::Default(controller)) => {
1035                let controller = controller
1036                    .get()
1037                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1038                controller.perform_release_steps()
1039            },
1040            Some(ControllerType::Byte(controller)) => {
1041                let controller = controller
1042                    .get()
1043                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1044                controller.perform_release_steps()
1045            },
1046            None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1047        }
1048    }
1049
1050    /// Call into the pull steps of the controller,
1051    /// as part of
1052    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1053    pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
1054        match self.controller.borrow().as_ref() {
1055            Some(ControllerType::Default(controller)) => controller
1056                .get()
1057                .expect("Stream should have controller.")
1058                .perform_pull_steps(cx, read_request),
1059            Some(ControllerType::Byte(controller)) => controller
1060                .get()
1061                .expect("Stream should have controller.")
1062                .perform_pull_steps(cx, read_request),
1063            None => {
1064                unreachable!("Stream does not have a controller.");
1065            },
1066        }
1067    }
1068
1069    /// Call into the pull steps of the controller,
1070    /// as part of
1071    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
1072    pub(crate) fn perform_pull_into(
1073        &self,
1074        cx: &mut JSContext,
1075        read_into_request: &ReadIntoRequest,
1076        view: &HeapBufferSource<ArrayBufferViewU8>,
1077        min: u64,
1078    ) {
1079        match self.controller.borrow().as_ref() {
1080            Some(ControllerType::Byte(controller)) => controller
1081                .get()
1082                .expect("Stream should have controller.")
1083                .perform_pull_into(cx, read_into_request, view, min),
1084            _ => {
1085                unreachable!(
1086                    "Pulling a chunk from a stream with a default controller using a BYOB reader"
1087                )
1088            },
1089        }
1090    }
1091
1092    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
1093    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1094        match self.reader.borrow().as_ref() {
1095            Some(ReaderType::Default(reader)) => {
1096                let Some(reader) = reader.get() else {
1097                    panic!("Attempt to add a read request without having first acquired a reader.");
1098                };
1099
1100                // Assert: stream.[[state]] is "readable".
1101                assert!(self.is_readable());
1102
1103                // Append readRequest to stream.[[reader]].[[readRequests]].
1104                reader.add_read_request(read_request);
1105            },
1106            _ => {
1107                unreachable!("Adding a read request can only be done on a default reader.")
1108            },
1109        }
1110    }
1111
1112    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
1113    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1114        match self.reader.borrow().as_ref() {
1115            // Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
1116            Some(ReaderType::BYOB(reader)) => {
1117                let Some(reader) = reader.get() else {
1118                    unreachable!(
1119                        "Attempt to add a read into request without having first acquired a reader."
1120                    );
1121                };
1122
1123                // Assert: stream.[[state]] is "readable" or "closed".
1124                assert!(self.is_readable() || self.is_closed());
1125
1126                // Append readRequest to stream.[[reader]].[[readIntoRequests]].
1127                reader.add_read_into_request(read_request);
1128            },
1129            _ => {
1130                unreachable!("Adding a read into request can only be done on a BYOB reader.")
1131            },
1132        }
1133    }
1134
1135    /// <https://streams.spec.whatwg.org/#readablestream-enqueue>
1136    pub(crate) fn enqueue_native(&self, cx: &mut JSContext, bytes: Vec<u8>) {
1137        match self.controller.borrow().as_ref() {
1138            Some(ControllerType::Default(controller)) => controller
1139                .get()
1140                .expect("Stream should have controller.")
1141                .enqueue_native(cx, bytes),
1142            Some(ControllerType::Byte(controller)) => {
1143                if bytes.is_empty() {
1144                    return;
1145                }
1146
1147                let controller = controller.get().expect("Stream should have controller.");
1148                rooted!(&in(cx) let mut chunk_object = ptr::null_mut::<JSObject>());
1149                create_buffer_source::<Uint8>(
1150                    cx.into(),
1151                    &bytes,
1152                    chunk_object.handle_mut(),
1153                    CanGc::from_cx(cx),
1154                )
1155                .expect("failed to create buffer source for native byte chunk.");
1156
1157                let chunk = RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
1158                    BufferSource::ArrayBufferView(Heap::boxed(*chunk_object.handle())),
1159                ));
1160                controller
1161                    .enqueue(cx, chunk)
1162                    .expect("Enqueuing a native byte chunk should not fail.");
1163            },
1164            _ => {
1165                unreachable!("Enqueueing chunk to a stream from Rust without a controller");
1166            },
1167        }
1168    }
1169
1170    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1171    pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
1172        // Assert: stream.[[state]] is "readable".
1173        assert!(self.is_readable());
1174
1175        // Set stream.[[state]] to "errored".
1176        self.state.set(ReadableStreamState::Errored);
1177
1178        // Set stream.[[storedError]] to e.
1179        self.stored_error.set(e.get());
1180
1181        // Let reader be stream.[[reader]].
1182
1183        let default_reader = {
1184            let reader_ref = self.reader.borrow();
1185            match reader_ref.as_ref() {
1186                Some(ReaderType::Default(reader)) => reader.get(),
1187                _ => None,
1188            }
1189        };
1190
1191        if let Some(reader) = default_reader {
1192            // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
1193            reader.error(cx, e);
1194            return;
1195        }
1196
1197        let byob_reader = {
1198            let reader_ref = self.reader.borrow();
1199            match reader_ref.as_ref() {
1200                Some(ReaderType::BYOB(reader)) => reader.get(),
1201                _ => None,
1202            }
1203        };
1204
1205        if let Some(reader) = byob_reader {
1206            // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
1207            reader.error_read_into_requests(e, CanGc::from_cx(cx));
1208        }
1209
1210        // If reader is undefined, return.
1211    }
1212
    /// Write the stored error of this stream into `handle_mut`.
    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
        handle_mut.set(self.stored_error.get());
    }
1217
1218    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1219    /// Note: in other use cases this call happens via the controller.
1220    pub(crate) fn error_native(&self, cx: &mut JSContext, error: Error) {
1221        rooted!(&in(cx) let mut error_val = UndefinedValue());
1222        error.to_jsval(
1223            cx.into(),
1224            &self.global(),
1225            error_val.handle_mut(),
1226            CanGc::from_cx(cx),
1227        );
1228        self.error(cx, error_val.handle());
1229    }
1230
1231    /// Call into the controller's `Close` method.
1232    /// <https://streams.spec.whatwg.org/#readablestream-close>
1233    pub(crate) fn controller_close_native(&self, cx: &mut JSContext) {
1234        match self.controller.borrow().as_ref() {
1235            Some(ControllerType::Default(controller)) => {
1236                let _ = controller
1237                    .get()
1238                    .expect("Stream should have controller.")
1239                    .Close(cx);
1240            },
1241            Some(ControllerType::Byte(controller)) => {
1242                let _ = controller
1243                    .get()
1244                    .expect("Stream should have controller.")
1245                    .close(cx);
1246            },
1247            _ => {
1248                unreachable!("Native closing requires a stream controller.")
1249            },
1250        }
1251    }
1252
1253    /// Returns a boolean reflecting whether the stream has all data in memory.
1254    /// Useful for native source integration only.
1255    pub(crate) fn in_memory(&self) -> bool {
1256        match self.controller.borrow().as_ref() {
1257            Some(ControllerType::Default(controller)) => controller
1258                .get()
1259                .expect("Stream should have controller.")
1260                .in_memory(),
1261            Some(ControllerType::Byte(controller)) => controller
1262                .get()
1263                .expect("Stream should have controller.")
1264                .in_memory(),
1265            _ => unreachable!("Checking if source is in memory for a stream without a controller"),
1266        }
1267    }
1268
1269    /// Return bytes for synchronous use, if the stream has all data in memory.
1270    /// Useful for native source integration only.
1271    pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1272        match self.controller.borrow().as_ref() {
1273            Some(ControllerType::Default(controller)) => controller
1274                .get()
1275                .expect("Stream should have controller.")
1276                .get_in_memory_bytes()
1277                .map(GenericSharedMemory::from_vec),
1278            Some(ControllerType::Byte(controller)) => controller
1279                .get()
1280                .expect("Stream should have controller.")
1281                .get_in_memory_bytes()
1282                .map(GenericSharedMemory::from_vec),
1283            _ => unreachable!("Getting in-memory bytes for a stream without a controller"),
1284        }
1285    }
1286
1287    /// Acquires a reader and locks the stream,
1288    /// must be done before `read_a_chunk`.
1289    /// Native call to
1290    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-reader>
1291    pub(crate) fn acquire_default_reader(
1292        &self,
1293        can_gc: CanGc,
1294    ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1295        // Let reader be a new ReadableStreamDefaultReader.
1296        let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1297
1298        // Perform ? SetUpReadableStreamDefaultReader(reader, stream).
1299        reader.set_up(self, &self.global(), can_gc)?;
1300
1301        // Return reader.
1302        Ok(reader)
1303    }
1304
1305    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-byob-reader>
1306    pub(crate) fn acquire_byob_reader(
1307        &self,
1308        can_gc: CanGc,
1309    ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1310        // Let reader be a new ReadableStreamBYOBReader.
1311        let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1312        // Perform ? SetUpReadableStreamBYOBReader(reader, stream).
1313        reader.set_up(self, &self.global(), can_gc)?;
1314
1315        // Return reader.
1316        Ok(reader)
1317    }
1318
1319    pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1320        match self.controller.borrow().as_ref() {
1321            Some(ControllerType::Default(controller)) => {
1322                controller.get().expect("Stream should have controller.")
1323            },
1324            _ => {
1325                unreachable!(
1326                    "Getting default controller for a stream with a non-default controller"
1327                )
1328            },
1329        }
1330    }
1331
1332    pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1333        match self.controller.borrow().as_ref() {
1334            Some(ControllerType::Byte(controller)) => {
1335                controller.get().expect("Stream should have controller.")
1336            },
1337            _ => {
1338                unreachable!("Getting byte controller for a stream with a non-byte controller")
1339            },
1340        }
1341    }
1342
1343    pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1344        match self.reader.borrow().as_ref() {
1345            Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1346            _ => {
1347                unreachable!("Getting default reader for a stream with a non-default reader")
1348            },
1349        }
1350    }
1351
1352    /// Read a chunk from the stream,
1353    /// must be called after `start_reading`,
1354    /// and before `stop_reading`.
1355    /// Native call to
1356    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1357    pub(crate) fn read_a_chunk(&self, cx: &mut JSContext) -> Rc<Promise> {
1358        match self.reader.borrow().as_ref() {
1359            Some(ReaderType::Default(reader)) => {
1360                let Some(reader) = reader.get() else {
1361                    unreachable!(
1362                        "Attempt to read stream chunk without having first acquired a reader."
1363                    );
1364                };
1365                reader.Read(cx)
1366            },
1367            _ => {
1368                unreachable!("Native reading of a chunk can only be done with a default reader.")
1369            },
1370        }
1371    }
1372
1373    /// Releases the lock on the reader,
1374    /// must be done after `start_reading`.
1375    /// Native call to
1376    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
1377    pub(crate) fn stop_reading(&self, cx: &mut JSContext) {
1378        let reader_ref = self.reader.borrow();
1379
1380        match reader_ref.as_ref() {
1381            Some(ReaderType::Default(reader)) => {
1382                let Some(reader) = reader.get() else {
1383                    unreachable!("Attempt to stop reading without having first acquired a reader.");
1384                };
1385
1386                drop(reader_ref);
1387                reader.release(cx).expect("Reader release cannot fail.");
1388            },
1389            _ => {
1390                unreachable!("Native stop reading can only be done with a default reader.")
1391            },
1392        }
1393    }
1394
1395    /// <https://streams.spec.whatwg.org/#is-readable-stream-locked>
1396    pub(crate) fn is_locked(&self) -> bool {
1397        match self.reader.borrow().as_ref() {
1398            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1399            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1400            None => false,
1401        }
1402    }
1403
    /// Returns the current value of the disturbed flag of this stream.
    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
    pub(crate) fn is_disturbed(&self) -> bool {
        self.disturbed.get()
    }
1407
    /// Sets the disturbed flag of this stream to `disturbed`.
    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
    pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
        self.disturbed.set(disturbed);
    }
1411
1412    pub(crate) fn is_closed(&self) -> bool {
1413        self.state.get() == ReadableStreamState::Closed
1414    }
1415
1416    pub(crate) fn is_errored(&self) -> bool {
1417        self.state.get() == ReadableStreamState::Errored
1418    }
1419
1420    pub(crate) fn is_readable(&self) -> bool {
1421        self.state.get() == ReadableStreamState::Readable
1422    }
1423
1424    pub(crate) fn has_default_reader(&self) -> bool {
1425        match self.reader.borrow().as_ref() {
1426            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1427            _ => false,
1428        }
1429    }
1430
1431    pub(crate) fn has_byob_reader(&self) -> bool {
1432        match self.reader.borrow().as_ref() {
1433            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1434            _ => false,
1435        }
1436    }
1437
1438    pub(crate) fn has_byte_controller(&self) -> bool {
1439        match self.controller.borrow().as_ref() {
1440            Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1441            _ => false,
1442        }
1443    }
1444
1445    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
1446    pub(crate) fn get_num_read_requests(&self) -> usize {
1447        match self.reader.borrow().as_ref() {
1448            Some(ReaderType::Default(reader)) => {
1449                let reader = reader
1450                    .get()
1451                    .expect("Stream must have a reader when getting the number of read requests.");
1452                reader.get_num_read_requests()
1453            },
1454            _ => unreachable!(
1455                "Stream must have a default reader when get num read requests is called into."
1456            ),
1457        }
1458    }
1459
1460    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-into-requests>
1461    pub(crate) fn get_num_read_into_requests(&self) -> usize {
1462        assert!(self.has_byob_reader());
1463
1464        match self.reader.borrow().as_ref() {
1465            Some(ReaderType::BYOB(reader)) => {
1466                let Some(reader) = reader.get() else {
1467                    unreachable!(
1468                        "Stream must have a reader when get num read into requests is called into."
1469                    );
1470                };
1471                reader.get_num_read_into_requests()
1472            },
1473            _ => {
1474                unreachable!(
1475                    "Stream must have a BYOB reader when get num read into requests is called into."
1476                );
1477            },
1478        }
1479    }
1480
1481    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
1482    pub(crate) fn fulfill_read_request(
1483        &self,
1484        cx: &mut JSContext,
1485        chunk: SafeHandleValue,
1486        done: bool,
1487    ) {
1488        // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1489        assert!(self.has_default_reader());
1490
1491        match self.reader.borrow().as_ref() {
1492            Some(ReaderType::Default(reader)) => {
1493                // step 2 - Let reader be stream.[[reader]].
1494                let reader = reader
1495                    .get()
1496                    .expect("Stream must have a reader when a read request is fulfilled.");
1497                // step 3 - Assert: reader.[[readRequests]] is not empty.
1498                assert_ne!(reader.get_num_read_requests(), 0);
1499                // step 4 & 5
1500                // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]].
1501                let request = reader.remove_read_request();
1502
1503                if done {
1504                    // step 6 - If done is true, perform readRequest’s close steps.
1505                    request.close_steps(cx);
1506                } else {
1507                    // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk.
1508                    let result = RootedTraceableBox::new(Heap::default());
1509                    result.set(*chunk);
1510                    request.chunk_steps(cx, result, &self.global());
1511                }
1512            },
1513            _ => {
1514                unreachable!(
1515                    "Stream must have a default reader when fulfill read requests is called into."
1516                );
1517            },
1518        }
1519    }
1520
1521    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request>
1522    pub(crate) fn fulfill_read_into_request(
1523        &self,
1524        cx: &mut JSContext,
1525        chunk: SafeHandleValue,
1526        done: bool,
1527    ) {
1528        // Assert: ! ReadableStreamHasBYOBReader(stream) is true.
1529        assert!(self.has_byob_reader());
1530
1531        // Let reader be stream.[[reader]].
1532        match self.reader.borrow().as_ref() {
1533            Some(ReaderType::BYOB(reader)) => {
1534                let Some(reader) = reader.get() else {
1535                    unreachable!(
1536                        "Stream must have a reader when a read into request is fulfilled."
1537                    );
1538                };
1539
1540                // Assert: reader.[[readIntoRequests]] is not empty.
1541                assert!(reader.get_num_read_into_requests() > 0);
1542
1543                // Let readIntoRequest be reader.[[readIntoRequests]][0].
1544                // Remove readIntoRequest from reader.[[readIntoRequests]].
1545                let read_into_request = reader.remove_read_into_request();
1546
1547                // If done is true, perform readIntoRequest’s close steps, given chunk.
1548                let result = RootedTraceableBox::new(Heap::default());
1549                if done {
1550                    result.set(*chunk);
1551                    read_into_request.close_steps(cx, Some(result));
1552                } else {
1553                    // Otherwise, perform readIntoRequest’s chunk steps, given chunk.
1554                    result.set(*chunk);
1555                    read_into_request.chunk_steps(result, CanGc::from_cx(cx));
1556                }
1557            },
1558            _ => {
1559                unreachable!(
1560                    "Stream must have a BYOB reader when fulfill read into requests is called into."
1561                );
1562            },
1563        };
1564    }
1565
1566    /// <https://streams.spec.whatwg.org/#readable-stream-close>
1567    pub(crate) fn close(&self, cx: &mut JSContext) {
1568        // Assert: stream.[[state]] is "readable".
1569        assert!(self.is_readable());
1570        // Set stream.[[state]] to "closed".
1571        self.state.set(ReadableStreamState::Closed);
1572        // Let reader be stream.[[reader]].
1573
1574        // NOTE: do not hold the RefCell borrow across reader.close(),
1575        // or release() will panic when it tries to mut-borrow stream.reader.
1576        // So we pull out the underlying DOM reader in a local, then drop the borrow.
1577        let default_reader = {
1578            let reader_ref = self.reader.borrow();
1579            match reader_ref.as_ref() {
1580                Some(ReaderType::Default(reader)) => reader.get(),
1581                _ => None,
1582            }
1583        };
1584
1585        if let Some(reader) = default_reader {
1586            // steps 5 & 6 for a default reader
1587            reader.close(cx);
1588            return;
1589        }
1590
1591        // Same for BYOB reader.
1592        let byob_reader = {
1593            let reader_ref = self.reader.borrow();
1594            match reader_ref.as_ref() {
1595                Some(ReaderType::BYOB(reader)) => reader.get(),
1596                _ => None,
1597            }
1598        };
1599
1600        if let Some(reader) = byob_reader {
1601            // steps 5 & 6 for a BYOB reader
1602            reader.close(CanGc::from_cx(cx));
1603        }
1604
1605        // If reader is undefined, return.
1606    }
1607
1608    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
1609    pub(crate) fn cancel(
1610        &self,
1611        cx: &mut JSContext,
1612        global: &GlobalScope,
1613        reason: SafeHandleValue,
1614    ) -> Rc<Promise> {
1615        // Set stream.[[disturbed]] to true.
1616        self.disturbed.set(true);
1617
1618        // If stream.[[state]] is "closed", return a promise resolved with undefined.
1619        if self.is_closed() {
1620            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
1621        }
1622        // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
1623        if self.is_errored() {
1624            let promise = Promise::new2(cx, global);
1625            rooted!(&in(cx) let mut rval = UndefinedValue());
1626            self.stored_error.safe_to_jsval(cx, rval.handle_mut());
1627            promise.reject_native_with_cx(cx, &rval.handle());
1628            return promise;
1629        }
1630        // Perform ! ReadableStreamClose(stream).
1631        self.close(cx);
1632
1633        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
1634        let byob_reader = {
1635            let reader_ref = self.reader.borrow();
1636            match reader_ref.as_ref() {
1637                Some(ReaderType::BYOB(reader)) => reader.get(),
1638                _ => None,
1639            }
1640        };
1641
1642        if let Some(reader) = byob_reader {
1643            // step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel
1644            reader.cancel(cx);
1645        }
1646
1647        // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).
1648
1649        let source_cancel_promise = match self.controller.borrow().as_ref() {
1650            Some(ControllerType::Default(controller)) => controller
1651                .get()
1652                .expect("Stream should have controller.")
1653                .perform_cancel_steps(cx, global, reason),
1654            Some(ControllerType::Byte(controller)) => controller
1655                .get()
1656                .expect("Stream should have controller.")
1657                .perform_cancel_steps(cx, global, reason),
1658            None => {
1659                panic!("Stream does not have a controller.");
1660            },
1661        };
1662
1663        // Create a new promise,
1664        // and setup a handler in order to react to the fulfillment of sourceCancelPromise.
1665        let global = self.global();
1666        let result_promise = Promise::new2(cx, &global);
1667        let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1668            result: result_promise.clone(),
1669        });
1670        let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1671            result: result_promise.clone(),
1672        });
1673        let handler = PromiseNativeHandler::new(
1674            cx,
1675            &global,
1676            Some(fulfillment_handler),
1677            Some(rejection_handler),
1678        );
1679        let mut realm = enter_auto_realm(cx, &*global);
1680        let cx = &mut realm.current_realm();
1681        source_cancel_promise.append_native_handler(cx, &handler);
1682
1683        // Return the result of reacting to sourceCancelPromise
1684        // with a fulfillment step that returns undefined.
1685        result_promise
1686    }
1687
1688    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1689    pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1690        *self.reader.borrow_mut() = new_reader;
1691    }
1692
1693    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1694    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
1695    fn byte_tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1696        // Assert: stream implements ReadableStream.
1697        // Assert: stream.[[controller]] implements ReadableByteStreamController.
1698
1699        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1700        let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;
1701        let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1702            Some(&reader),
1703        ))));
1704
1705        // Let reading be false.
1706        let reading = Rc::new(Cell::new(false));
1707
1708        // Let readAgainForBranch1 be false.
1709        let read_again_for_branch_1 = Rc::new(Cell::new(false));
1710
1711        // Let readAgainForBranch2 be false.
1712        let read_again_for_branch_2 = Rc::new(Cell::new(false));
1713
1714        // Let canceled1 be false.
1715        let canceled_1 = Rc::new(Cell::new(false));
1716
1717        // Let canceled2 be false.
1718        let canceled_2 = Rc::new(Cell::new(false));
1719
1720        // Let reason1 be undefined.
1721        let reason_1 = Rc::new(Heap::default());
1722
1723        // Let reason2 be undefined.
1724        let reason_2 = Rc::new(Heap::default());
1725
1726        // Let cancelPromise be a new promise.
1727        let cancel_promise = Promise::new2(cx, &self.global());
1728        let reader_version = Rc::new(Cell::new(0));
1729
1730        let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1731            reader.clone(),
1732            self,
1733            reading.clone(),
1734            read_again_for_branch_1.clone(),
1735            read_again_for_branch_2.clone(),
1736            canceled_1.clone(),
1737            canceled_2.clone(),
1738            reason_1.clone(),
1739            reason_2.clone(),
1740            cancel_promise.clone(),
1741            reader_version.clone(),
1742            ByteTeeCancelAlgorithm::Cancel1Algorithm,
1743            ByteTeePullAlgorithm::Pull1Algorithm,
1744            CanGc::from_cx(cx),
1745        );
1746
1747        let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1748            reader.clone(),
1749            self,
1750            reading,
1751            read_again_for_branch_1,
1752            read_again_for_branch_2,
1753            canceled_1,
1754            canceled_2,
1755            reason_1,
1756            reason_2,
1757            cancel_promise,
1758            reader_version,
1759            ByteTeeCancelAlgorithm::Cancel2Algorithm,
1760            ByteTeePullAlgorithm::Pull2Algorithm,
1761            CanGc::from_cx(cx),
1762        );
1763
1764        // Set branch1 to ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm).
1765        let branch_1 = readable_byte_stream_tee(
1766            cx,
1767            &self.global(),
1768            UnderlyingSourceType::TeeByte(&byte_tee_source_1),
1769        );
1770        byte_tee_source_1.set_branch_1(&branch_1);
1771        byte_tee_source_2.set_branch_1(&branch_1);
1772
1773        // Set branch2 to ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm).
1774        let branch_2 = readable_byte_stream_tee(
1775            cx,
1776            &self.global(),
1777            UnderlyingSourceType::TeeByte(&byte_tee_source_2),
1778        );
1779        byte_tee_source_1.set_branch_2(&branch_2);
1780        byte_tee_source_2.set_branch_2(&branch_2);
1781
1782        // Perform forwardReaderError, given reader.
1783        byte_tee_source_1.forward_reader_error(cx, reader.clone());
1784        byte_tee_source_2.forward_reader_error(cx, reader);
1785
1786        // Return « branch1, branch2 ».
1787        Ok(vec![branch_1, branch_2])
1788    }
1789
1790    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
1791    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1792    fn default_tee(
1793        &self,
1794        cx: &mut JSContext,
1795        clone_for_branch_2: bool,
1796    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1797        // Assert: stream implements ReadableStream.
1798
1799        // Assert: cloneForBranch2 is a boolean.
1800        let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1801
1802        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1803        let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;
1804
1805        // Let reading be false.
1806        let reading = Rc::new(Cell::new(false));
1807        // Let readAgain be false.
1808        let read_again = Rc::new(Cell::new(false));
1809        // Let canceled1 be false.
1810        let canceled_1 = Rc::new(Cell::new(false));
1811        // Let canceled2 be false.
1812        let canceled_2 = Rc::new(Cell::new(false));
1813
1814        // Let reason1 be undefined.
1815        let reason_1 = Rc::new(Heap::default());
1816        // Let reason2 be undefined.
1817        let reason_2 = Rc::new(Heap::default());
1818        // Let cancelPromise be a new promise.
1819        let cancel_promise = Promise::new2(cx, &self.global());
1820
1821        let tee_source_1 = DefaultTeeUnderlyingSource::new(
1822            &reader,
1823            self,
1824            reading.clone(),
1825            read_again.clone(),
1826            canceled_1.clone(),
1827            canceled_2.clone(),
1828            clone_for_branch_2.clone(),
1829            reason_1.clone(),
1830            reason_2.clone(),
1831            cancel_promise.clone(),
1832            DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1833            CanGc::from_cx(cx),
1834        );
1835
1836        let underlying_source_type_branch_1 = UnderlyingSourceType::Tee(&tee_source_1);
1837
1838        let tee_source_2 = DefaultTeeUnderlyingSource::new(
1839            &reader,
1840            self,
1841            reading,
1842            read_again,
1843            canceled_1.clone(),
1844            canceled_2.clone(),
1845            clone_for_branch_2,
1846            reason_1,
1847            reason_2,
1848            cancel_promise.clone(),
1849            DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1850            CanGc::from_cx(cx),
1851        );
1852
1853        let underlying_source_type_branch_2 = UnderlyingSourceType::Tee(&tee_source_2);
1854
1855        // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
1856        let branch_1 = create_readable_stream(
1857            cx,
1858            &self.global(),
1859            underlying_source_type_branch_1,
1860            None,
1861            None,
1862        );
1863        tee_source_1.set_branch_1(&branch_1);
1864        tee_source_2.set_branch_1(&branch_1);
1865
1866        // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
1867        let branch_2 = create_readable_stream(
1868            cx,
1869            &self.global(),
1870            underlying_source_type_branch_2,
1871            None,
1872            None,
1873        );
1874        tee_source_1.set_branch_2(&branch_2);
1875        tee_source_2.set_branch_2(&branch_2);
1876
1877        // Upon rejection of reader.[[closedPromise]] with reason r,
1878        reader.default_tee_append_native_handler_to_closed_promise(
1879            cx,
1880            &branch_1,
1881            &branch_2,
1882            canceled_1,
1883            canceled_2,
1884            cancel_promise,
1885        );
1886
1887        // Return « branch_1, branch_2 ».
1888        Ok(vec![branch_1, branch_2])
1889    }
1890
1891    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
1892    #[allow(clippy::too_many_arguments)]
1893    pub(crate) fn pipe_to(
1894        &self,
1895        cx: &mut CurrentRealm,
1896        global: &GlobalScope,
1897        dest: &WritableStream,
1898        prevent_close: bool,
1899        prevent_abort: bool,
1900        prevent_cancel: bool,
1901        signal: Option<&AbortSignal>,
1902    ) -> Rc<Promise> {
1903        // Assert: source implements ReadableStream.
1904        // Assert: dest implements WritableStream.
1905        // Assert: prevent_close, prevent_abort, and prevent_cancel are all booleans.
1906        // Done with method signature types.
1907
1908        // If signal was not given, let signal be undefined.
1909        // Assert: either signal is undefined, or signal implements AbortSignal.
1910        // Note: done with the `signal` argument.
1911
1912        // Assert: ! IsReadableStreamLocked(source) is false.
1913        assert!(!self.is_locked());
1914
1915        // Assert: ! IsWritableStreamLocked(dest) is false.
1916        assert!(!dest.is_locked());
1917
1918        // If source.[[controller]] implements ReadableByteStreamController,
1919        // let reader be either ! AcquireReadableStreamBYOBReader(source)
1920        // or ! AcquireReadableStreamDefaultReader(source),
1921        // at the user agent’s discretion.
1922        // Note: for now only using default readers.
1923
1924        // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
1925        let reader = self
1926            .acquire_default_reader(CanGc::from_cx(cx))
1927            .expect("Acquiring a default reader for pipe_to cannot fail");
1928
1929        // Let writer be ! AcquireWritableStreamDefaultWriter(dest).
1930        let writer = dest
1931            .aquire_default_writer(cx.into(), global, CanGc::from_cx(cx))
1932            .expect("Acquiring a default writer for pipe_to cannot fail");
1933
1934        // Set source.[[disturbed]] to true.
1935        self.disturbed.set(true);
1936
1937        // Let shuttingDown be false.
1938        // Done below with default.
1939
1940        // Let promise be a new promise.
1941        let promise = Promise::new2(cx, global);
1942
1943        // In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
1944        rooted!(&in(cx) let pipe_to = PipeTo {
1945            reader: Dom::from_ref(&reader),
1946            writer: Dom::from_ref(&writer),
1947            pending_writes: Default::default(),
1948            state: Default::default(),
1949            prevent_abort,
1950            prevent_cancel,
1951            prevent_close,
1952            shutting_down: Default::default(),
1953            abort_reason: Default::default(),
1954            shutdown_error: Default::default(),
1955            shutdown_action_promise:  Default::default(),
1956            result_promise: promise.clone(),
1957        });
1958
1959        // If signal is not undefined,
1960        // Note: moving the steps to here, so that the `PipeTo` is available.
1961        if let Some(signal) = signal {
1962            // Let abortAlgorithm be the following steps:
1963            // Note: steps are implemented at call site.
1964            rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1965
1966            // If signal is aborted, perform abortAlgorithm and return promise.
1967            if signal.aborted() {
1968                signal.run_abort_algorithm(cx, global, &abort_algorithm);
1969                return promise;
1970            }
1971
1972            // Add abortAlgorithm to signal.
1973            signal.add(&abort_algorithm);
1974        }
1975
1976        // Note: perfom checks now, since streams can start as closed or errored.
1977        pipe_to.check_and_propagate_errors_forward(cx, global);
1978        pipe_to.check_and_propagate_errors_backward(cx, global);
1979        pipe_to.check_and_propagate_closing_forward(cx, global);
1980        pipe_to.check_and_propagate_closing_backward(cx, global);
1981
1982        // If we are not closed or errored,
1983        if *pipe_to.state.borrow() == PipeToState::Starting {
1984            // Start the pipe, by waiting on the writer being ready for a chunk.
1985            pipe_to.wait_for_writer_ready(cx, global);
1986        }
1987
1988        // Return promise.
1989        promise
1990    }
1991
1992    /// <https://streams.spec.whatwg.org/#readable-stream-tee>
1993    pub(crate) fn tee(
1994        &self,
1995        cx: &mut JSContext,
1996        clone_for_branch_2: bool,
1997    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1998        // Assert: stream implements ReadableStream.
1999        // Assert: cloneForBranch2 is a boolean.
2000
2001        match self.controller.borrow().as_ref() {
2002            Some(ControllerType::Default(_)) => {
2003                // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
2004                self.default_tee(cx, clone_for_branch_2)
2005            },
2006            Some(ControllerType::Byte(_)) => {
2007                // If stream.[[controller]] implements ReadableByteStreamController,
2008                // return ? ReadableByteStreamTee(stream).
2009                self.byte_tee(cx)
2010            },
2011            None => {
2012                unreachable!("Stream should have a controller.");
2013            },
2014        }
2015    }
2016
2017    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller-from-underlying-source>
2018    fn set_up_byte_controller(
2019        &self,
2020        cx: &mut JSContext,
2021        global: &GlobalScope,
2022        underlying_source_dict: JsUnderlyingSource,
2023        underlying_source_handle: SafeHandleObject,
2024        stream: DomRoot<ReadableStream>,
2025        strategy_hwm: f64,
2026    ) -> Fallible<()> {
2027        // Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
2028        // Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
2029        // If underlyingSourceDict["start"] exists, then set startAlgorithm to an algorithm which returns the result
2030        // of invoking underlyingSourceDict["start"] with argument list « controller »
2031        // and callback this value underlyingSource.
2032        // If underlyingSourceDict["pull"] exists, then set pullAlgorithm to an algorithm which returns the result
2033        // of invoking underlyingSourceDict["pull"] with argument list « controller »
2034        // and callback this value underlyingSource.
2035        // If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm to an algorithm which takes an
2036        // argument reason and returns the result of invoking underlyingSourceDict["cancel"] with argument list
2037        // « reason » and callback this value underlyingSource.
2038
2039        // Let autoAllocateChunkSize be underlyingSourceDict["autoAllocateChunkSize"],
2040        // if it exists, or undefined otherwise.
2041        // If autoAllocateChunkSize is 0, then throw a TypeError exception.
2042        if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2043            return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2044        }
2045
2046        let controller = ReadableByteStreamController::new(
2047            UnderlyingSourceType::Js(underlying_source_dict),
2048            strategy_hwm,
2049            global,
2050            CanGc::from_cx(cx),
2051        );
2052
2053        // Note: this must be done before `setup`,
2054        // otherwise `thisOb` is null in the start callback.
2055        controller.set_underlying_source_this_object(underlying_source_handle);
2056
2057        // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm,
2058        // pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
2059        controller.setup(cx, global, stream)
2060    }
2061
2062    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2063    pub(crate) fn setup_cross_realm_transform_readable(
2064        &self,
2065        cx: &mut JSContext,
2066        port: &MessagePort,
2067    ) {
2068        let port_id = port.message_port_id();
2069        let global = self.global();
2070
2071        // Perform ! InitializeReadableStream(stream).
2072        // Done in `new_inherited`.
2073
2074        // Let sizeAlgorithm be an algorithm that returns 1.
2075        let size_algorithm =
2076            extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
2077
2078        // Note: other algorithms defined in the underlying source container.
2079
2080        // Let controller be a new ReadableStreamDefaultController.
2081        let controller = ReadableStreamDefaultController::new(
2082            &self.global(),
2083            UnderlyingSourceType::Transfer(port),
2084            0.,
2085            size_algorithm,
2086            CanGc::from_cx(cx),
2087        );
2088
2089        // Add a handler for port’s message event with the following steps:
2090        // Add a handler for port’s messageerror event with the following steps:
2091        rooted!(&in(cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2092            controller: Dom::from_ref(&controller),
2093        });
2094        global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2095
2096        // Enable port’s port message queue.
2097        port.Start(cx);
2098
2099        // Perform ! SetUpReadableStreamDefaultController
2100        controller
2101            .setup(cx, DomRoot::from_ref(self))
2102            .expect("Setting up controller for transfer cannot fail.");
2103    }
2104}
2105
2106impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2107    /// <https://streams.spec.whatwg.org/#rs-constructor>
2108    fn Constructor(
2109        cx: &mut JSContext,
2110        global: &GlobalScope,
2111        proto: Option<SafeHandleObject>,
2112        underlying_source: Option<*mut JSObject>,
2113        strategy: &QueuingStrategy,
2114    ) -> Fallible<DomRoot<Self>> {
2115        // If underlyingSource is missing, set it to null.
2116        rooted!(&in(cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2117        // Let underlyingSourceDict be underlyingSource,
2118        // converted to an IDL value of type UnderlyingSource.
2119        let underlying_source_dict = if !underlying_source_obj.is_null() {
2120            rooted!(&in(cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2121            match JsUnderlyingSource::new(cx, obj_val.handle()) {
2122                Ok(ConversionResult::Success(val)) => val,
2123                Ok(ConversionResult::Failure(error)) => {
2124                    return Err(Error::Type(error.into_owned()));
2125                },
2126                _ => {
2127                    return Err(Error::JSFailed);
2128                },
2129            }
2130        } else {
2131            JsUnderlyingSource::empty()
2132        };
2133
2134        // Perform ! InitializeReadableStream(this).
2135        let stream = ReadableStream::new_with_proto(global, proto, CanGc::from_cx(cx));
2136
2137        if underlying_source_dict.type_.is_some() {
2138            // If strategy["size"] exists, throw a RangeError exception.
2139            if strategy.size.is_some() {
2140                return Err(Error::Range(
2141                    c"size is not supported for byte streams".to_owned(),
2142                ));
2143            }
2144
2145            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
2146            let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2147
2148            // Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this,
2149            // underlyingSource, underlyingSourceDict, highWaterMark).
2150            stream.set_up_byte_controller(
2151                cx,
2152                global,
2153                underlying_source_dict,
2154                underlying_source_obj.handle(),
2155                stream.clone(),
2156                strategy_hwm,
2157            )?;
2158        } else {
2159            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
2160            let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2161
2162            // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
2163            let size_algorithm = extract_size_algorithm(strategy, CanGc::from_cx(cx));
2164
2165            let controller = ReadableStreamDefaultController::new(
2166                global,
2167                UnderlyingSourceType::Js(underlying_source_dict),
2168                high_water_mark,
2169                size_algorithm,
2170                CanGc::from_cx(cx),
2171            );
2172
2173            // Note: this must be done before `setup`,
2174            // otherwise `thisOb` is null in the start callback.
2175            controller.set_underlying_source_this_object(underlying_source_obj.handle());
2176
2177            // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
2178            controller.setup(cx, stream.clone())?;
2179        };
2180
2181        Ok(stream)
2182    }
2183
2184    /// <https://streams.spec.whatwg.org/#rs-locked>
2185    fn Locked(&self) -> bool {
2186        self.is_locked()
2187    }
2188
2189    /// <https://streams.spec.whatwg.org/#rs-cancel>
2190    fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
2191        let global = self.global();
2192        if self.is_locked() {
2193            // If ! IsReadableStreamLocked(this) is true,
2194            // return a promise rejected with a TypeError exception.
2195            let promise = Promise::new2(cx, &global);
2196            promise.reject_error_with_cx(cx, Error::Type(c"stream is locked".to_owned()));
2197            promise
2198        } else {
2199            // Return ! ReadableStreamCancel(this, reason).
2200            self.cancel(cx, &global, reason)
2201        }
2202    }
2203
2204    /// <https://streams.spec.whatwg.org/#rs-get-reader>
2205    fn GetReader(
2206        &self,
2207        options: &ReadableStreamGetReaderOptions,
2208        can_gc: CanGc,
2209    ) -> Fallible<ReadableStreamReader> {
2210        // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
2211        if options.mode.is_none() {
2212            return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2213                self.acquire_default_reader(can_gc)?,
2214            ));
2215        }
2216        // 2. Assert: options["mode"] is "byob".
2217        assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2218
2219        // 3. Return ? AcquireReadableStreamBYOBReader(this).
2220        Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2221            self.acquire_byob_reader(can_gc)?,
2222        ))
2223    }
2224
    /// <https://streams.spec.whatwg.org/#rs-tee>
    ///
    /// Delegates to `self.tee` with its second argument fixed to `false`,
    /// returning whatever streams or error that call produces.
    fn Tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
        // Return ? ReadableStreamTee(this, false).
        self.tee(cx, false)
    }
2230
2231    /// <https://streams.spec.whatwg.org/#rs-pipe-to>
2232    fn PipeTo(
2233        &self,
2234        cx: &mut CurrentRealm,
2235        destination: &WritableStream,
2236        options: &StreamPipeOptions,
2237    ) -> Rc<Promise> {
2238        let global = self.global();
2239
2240        // If ! IsReadableStreamLocked(this) is true,
2241        if self.is_locked() {
2242            // return a promise rejected with a TypeError exception.
2243            let promise = Promise::new2(cx, &global);
2244            promise.reject_error_with_cx(cx, Error::Type(c"Source stream is locked".to_owned()));
2245            return promise;
2246        }
2247
2248        // If ! IsWritableStreamLocked(destination) is true,
2249        if destination.is_locked() {
2250            // return a promise rejected with a TypeError exception.
2251            let promise = Promise::new2(cx, &global);
2252            promise
2253                .reject_error_with_cx(cx, Error::Type(c"Destination stream is locked".to_owned()));
2254            return promise;
2255        }
2256
2257        // Let signal be options["signal"] if it exists, or undefined otherwise.
2258        let signal = options.signal.as_deref();
2259
2260        // Return ! ReadableStreamPipeTo.
2261        self.pipe_to(
2262            cx,
2263            &global,
2264            destination,
2265            options.preventClose,
2266            options.preventAbort,
2267            options.preventCancel,
2268            signal,
2269        )
2270    }
2271
2272    /// <https://streams.spec.whatwg.org/#rs-pipe-through>
2273    fn PipeThrough(
2274        &self,
2275        cx: &mut CurrentRealm,
2276        transform: &ReadableWritablePair,
2277        options: &StreamPipeOptions,
2278    ) -> Fallible<DomRoot<ReadableStream>> {
2279        let global = self.global();
2280
2281        // If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
2282        if self.is_locked() {
2283            return Err(Error::Type(c"Source stream is locked".to_owned()));
2284        }
2285
2286        // If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
2287        if transform.writable.is_locked() {
2288            return Err(Error::Type(c"Destination stream is locked".to_owned()));
2289        }
2290
2291        // Let signal be options["signal"] if it exists, or undefined otherwise.
2292        let signal = options.signal.as_deref();
2293
2294        // Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
2295        // options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
2296        let promise = self.pipe_to(
2297            cx,
2298            &global,
2299            &transform.writable,
2300            options.preventClose,
2301            options.preventAbort,
2302            options.preventCancel,
2303            signal,
2304        );
2305
2306        // Set promise.[[PromiseIsHandled]] to true.
2307        promise.set_promise_is_handled();
2308
2309        // Return transform["readable"].
2310        Ok(transform.readable.clone())
2311    }
2312}
2313
2314/// The initial steps for the message handler for both readable and writable cross realm transforms.
2315/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2316/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2317pub(crate) fn get_type_and_value_from_message(
2318    cx: &mut JSContext,
2319    data: SafeHandleValue,
2320    value: SafeMutableHandleValue,
2321) -> DOMString {
2322    // Let data be the data of the message.
2323    // Note: we are passed the data as argument,
2324    // which originates in the return value of `structuredclone::read`.
2325
2326    // Assert: data is an Object.
2327    assert!(data.is_object());
2328    rooted!(&in(cx) let data_object = data.to_object());
2329
2330    // Let type be ! Get(data, "type").
2331    let type_ = get_property::<DOMString>(
2332        cx,
2333        data_object.handle(),
2334        c"type",
2335        StringificationBehavior::Empty,
2336    );
2337
2338    // Let value be ! Get(data, "value").
2339    get_property_jsval(cx, data_object.handle(), c"value", value)
2340        .expect("Getting the value should not fail.");
2341
2342    // Assert: type is a String.
2343    type_
2344        .expect("The type of the message should be a string")
2345        .expect("Property should be present")
2346}
2347
// Empty marker impl; presumably what allows this type to be stored in the
// `js::gc` rooting wrappers. TODO confirm against the `Rootable` docs.
impl js::gc::Rootable for CrossRealmTransformReadable {}

/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
/// A wrapper to handle `message` and `messageerror` events
/// for the port used by the transferred stream.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformReadable {
    /// The controller used in the algorithm: the event handlers enqueue
    /// chunks on it, close it, or error it.
    controller: Dom<ReadableStreamDefaultController>,
}
2359
2360impl CrossRealmTransformReadable {
2361    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2362    /// Add a handler for port’s message event with the following steps:
2363    pub(crate) fn handle_message(
2364        &self,
2365        cx: &mut CurrentRealm,
2366        global: &GlobalScope,
2367        port: &MessagePort,
2368        message: SafeHandleValue,
2369    ) {
2370        rooted!(&in(cx) let mut value = UndefinedValue());
2371        let type_string = get_type_and_value_from_message(cx, message, value.handle_mut());
2372
2373        // If type is "chunk",
2374        if type_string == "chunk" {
2375            // Perform ! ReadableStreamDefaultControllerEnqueue(controller, value).
2376            self.controller
2377                .enqueue(cx, value.handle())
2378                .expect("Enqueing a chunk should not fail.");
2379        }
2380
2381        // Otherwise, if type is "close",
2382        if type_string == "close" {
2383            // Perform ! ReadableStreamDefaultControllerClose(controller).
2384            self.controller.close(cx);
2385
2386            // Disentangle port.
2387            global.disentangle_port(cx, port);
2388        }
2389
2390        // Otherwise, if type is "error",
2391        if type_string == "error" {
2392            // Perform ! ReadableStreamDefaultControllerError(controller, value).
2393            self.controller.error(cx, value.handle());
2394
2395            // Disentangle port.
2396            global.disentangle_port(cx, port);
2397        }
2398    }
2399
2400    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2401    /// Add a handler for port’s messageerror event with the following steps:
2402    pub(crate) fn handle_error(
2403        &self,
2404        cx: &mut CurrentRealm,
2405        global: &GlobalScope,
2406        port: &MessagePort,
2407    ) {
2408        // Let error be a new "DataCloneError" DOMException.
2409        let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
2410        rooted!(&in(cx) let mut rooted_error = UndefinedValue());
2411        error.safe_to_jsval(cx, rooted_error.handle_mut());
2412
2413        // Perform ! CrossRealmTransformSendError(port, error).
2414        port.cross_realm_transform_send_error(cx, rooted_error.handle());
2415
2416        // Perform ! ReadableStreamDefaultControllerError(controller, error).
2417        self.controller.error(cx, rooted_error.handle());
2418
2419        // Disentangle port.
2420        global.disentangle_port(cx, port);
2421    }
2422}
2423
2424/// Get the `done` property of an object that a read promise resolved to.
2425pub(crate) fn get_read_promise_done(
2426    cx: &mut JSContext,
2427    v: &SafeHandleValue,
2428) -> Result<bool, Error> {
2429    if !v.is_object() {
2430        return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2431    }
2432
2433    rooted!(&in(cx) let object = v.to_object());
2434    get_property::<bool>(cx, object.handle(), c"done", ())?
2435        .ok_or(Error::Type(c"Promise has no done property.".to_owned()))
2436}
2437
2438/// Get the `value` property of an object that a read promise resolved to.
2439pub(crate) fn get_read_promise_bytes(
2440    cx: &mut JSContext,
2441    v: &SafeHandleValue,
2442) -> Result<Vec<u8>, Error> {
2443    if !v.is_object() {
2444        return Err(Error::Type(
2445            c"Unknown format for for bytes read.".to_owned(),
2446        ));
2447    }
2448
2449    rooted!(&in(cx) let object = v.to_object());
2450    get_property::<Vec<u8>>(
2451        cx,
2452        object.handle(),
2453        c"value",
2454        ConversionBehavior::EnforceRange,
2455    )?
2456    .ok_or(Error::Type(c"Promise has no value property.".to_owned()))
2457}
2458
2459/// Convert a raw stream `chunk` JS value to `Vec<u8>`.
2460/// This mirrors the conversion used inside `get_read_promise_bytes`,
2461/// but operates on the raw chunk (no `{ value, done }` wrapper).
2462pub(crate) fn bytes_from_chunk_jsval(
2463    cx: &mut JSContext,
2464    chunk: &RootedTraceableBox<Heap<JSVal>>,
2465) -> Result<Vec<u8>, Error> {
2466    match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange) {
2467        Ok(ConversionResult::Success(vec)) => Ok(vec),
2468        Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2469        _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2470    }
2471}
2472
2473/// <https://streams.spec.whatwg.org/#rs-transfer>
2474impl Transferable for ReadableStream {
2475    type Index = MessagePortIndex;
2476    type Data = MessagePortImpl;
2477
2478    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps>
2479    fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, MessagePortImpl)> {
2480        // Step 1. If ! IsReadableStreamLocked(value) is true, throw a
2481        // "DataCloneError" DOMException.
2482        if self.is_locked() {
2483            return Err(Error::DataClone(None));
2484        }
2485
2486        let global = self.global();
2487        let mut realm = enter_auto_realm(cx, &*global);
2488        let mut realm = realm.current_realm();
2489        let cx = &mut realm;
2490
2491        // Step 2. Let port1 be a new MessagePort in the current Realm.
2492        let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
2493        global.track_message_port(&port_1, None);
2494
2495        // Step 3. Let port2 be a new MessagePort in the current Realm.
2496        let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
2497        global.track_message_port(&port_2, None);
2498
2499        // Step 4. Entangle port1 and port2.
2500        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2501
2502        // Step 5. Let writable be a new WritableStream in the current Realm.
2503        let writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
2504
2505        // Step 6. Perform ! SetUpCrossRealmTransformWritable(writable, port1).
2506        writable.setup_cross_realm_transform_writable(cx, &port_1);
2507
2508        // Step 7. Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
2509        let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);
2510
2511        // Step 8. Set promise.[[PromiseIsHandled]] to true.
2512        promise.set_promise_is_handled();
2513
2514        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
2515        port_2.transfer(cx)
2516    }
2517
2518    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps>
2519    fn transfer_receive(
2520        cx: &mut JSContext,
2521        owner: &GlobalScope,
2522        id: MessagePortId,
2523        port_impl: MessagePortImpl,
2524    ) -> Result<DomRoot<Self>, ()> {
2525        // Their transfer-receiving steps, given dataHolder and value, are:
2526        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
2527        let value = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
2528
2529        // Step 1. Let deserializedRecord be !
2530        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
2531        // Realm).
2532        // Done with the `Deserialize` derive of `MessagePortImpl`.
2533
2534        // Step 2. Let port be deserializedRecord.[[Deserialized]].
2535        let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
2536
2537        // Step 3. Perform ! SetUpCrossRealmTransformReadable(value, port).
2538        value.setup_cross_realm_transform_readable(cx, &transferred_port);
2539        Ok(value)
2540    }
2541
2542    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
2543    fn serialized_storage<'a>(
2544        data: StructuredData<'a, '_>,
2545    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2546        match data {
2547            StructuredData::Reader(r) => &mut r.port_impls,
2548            StructuredData::Writer(w) => &mut w.ports,
2549        }
2550    }
2551}