Skip to main content

script/dom/stream/
readablestream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::conversions::{FromJSValConvertible, ToJSValConvertible};
13use js::jsapi::{Heap, JSObject};
14use js::jsval::{JSVal, ObjectValue, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{
17    HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18    MutableHandleValue as SafeMutableHandleValue,
19};
20use js::typedarray::{ArrayBufferViewU8, Uint8};
21use rustc_hash::FxHashMap;
22use servo_base::generic_channel::GenericSharedMemory;
23use servo_base::id::{MessagePortId, MessagePortIndex};
24use servo_constellation_traits::MessagePortImpl;
25
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
27use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
28    ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
29    ReadableWritablePair, StreamPipeOptions,
30};
31use script_bindings::str::DOMString;
32
33use crate::dom::domexception::{DOMErrorName, DOMException};
34use crate::dom::encoding::textdecoderstream::TextDecoderStream;
35use script_bindings::codegen::GenericBindings::TextDecoderStreamBinding::TextDecoderStreamMethods;
36use script_bindings::conversions::{is_array_like, StringificationBehavior};
37use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
38use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
39use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
40use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
41use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
42use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, get_property, get_property_jsval};
43use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
44use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
45use crate::dom::stream::writablestream::WritableStream;
46use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
47use crate::dom::bindings::reflector::DomGlobal;
48use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto_and_cx};
49use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
50use crate::dom::bindings::trace::RootedTraceableBox;
51use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
52use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
53use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
54use crate::dom::globalscope::GlobalScope;
55use crate::dom::promise::{wait_for_all_promise, Promise};
56use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
57use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
58use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
59use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
60use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
61use crate::dom::types::DefaultTeeUnderlyingSource;
62use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
63use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
64use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
65use crate::dom::messageport::MessagePort;
66use crate::realms::{enter_auto_realm};
67use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
68use crate::dom::bindings::transferable::Transferable;
69use crate::dom::bindings::structuredclone::StructuredData;
70
71use super::readablestreambyobreader::ReadIntoRequest;
72use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource, create_buffer_source};
73
/// State machine for `PipeTo`.
///
/// The current state is one of the inputs that `PipeTo`'s `Callback::callback`
/// uses to work out what a promise reaction means, and which step to run next.
#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
enum PipeToState {
    /// The starting state.
    /// `callback` treats being invoked in this state as a bug (`unreachable!`).
    #[default]
    Starting,
    /// Waiting for the writer to be ready
    /// (entered in `wait_for_writer_ready`).
    PendingReady,
    /// Waiting for a read to resolve
    /// (entered in `read_chunk`).
    PendingRead,
    /// Waiting for all pending writes to finish,
    /// as part of shutting down with an optional action.
    /// The action, if any, is performed once no pending write is left.
    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
    /// When shutting down with an action,
    /// waiting for the action to complete,
    /// at which point we can `finalize`
    /// (entered in `perform_action`).
    ShuttingDownPendingAction,
    /// The pipe has been finalized,
    /// no further actions should be performed
    /// (entered in `finalize`).
    Finalized,
}
95
/// The action to perform as part of
/// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
///
/// Each variant is turned into a promise by `PipeTo::perform_action`.
#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
enum ShutdownAction {
    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
    /// Aborts the destination with the stored shutdown error.
    WritableStreamAbort,
    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
    /// Cancels the source with the stored shutdown error.
    ReadableStreamCancel,
    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
    /// Closes the writer, propagating errors.
    WritableStreamDefaultWriterCloseWithErrorPropagation,
    /// <https://streams.spec.whatwg.org/#ref-for-rs-pipeTo-shutdown-with-action>
    /// The actions of the abort algorithm of the signal:
    /// aborting the destination (unless `prevent_abort`)
    /// and cancelling the source (unless `prevent_cancel`),
    /// both with the abort reason, waited on together.
    Abort,
}
108
// Marker impl with an empty body.
// NOTE(review): presumably this is what allows a `PipeTo` to be rooted
// through the `js::gc` rooting machinery — confirm against the call sites.
impl js::gc::Rootable for PipeTo {}
110
/// The "in parallel, but not really" part of
/// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
///
/// Note: the spec is flexible about how this is done, but requires the following constraints to apply:
/// - Public API must not be used: we'll only use Rust.
/// - Backpressure must be enforced: we'll only read from source when dest is ready.
/// - Shutdown must stop activity: we'll do this together with the below.
/// - Error and close states must be propagated: we'll do this by checking these states at every step.
///
/// Note: a clone of this struct is boxed for every promise reaction that is registered
/// (`Box::new(self.clone())`). All mutable fields sit behind an `Rc`,
/// so every clone observes and updates the same pipe state.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PipeTo {
    /// <https://streams.spec.whatwg.org/#ref-for-readablestream%E2%91%A7%E2%91%A0>
    /// The reader acquired on the source of the pipe.
    reader: Dom<ReadableStreamDefaultReader>,

    /// <https://streams.spec.whatwg.org/#ref-for-acquire-writable-stream-default-writer>
    /// The writer acquired on the destination of the pipe.
    writer: Dom<WritableStreamDefaultWriter>,

    /// Pending writes are needed when shutting down(with an action),
    /// because we can only finalize when all writes are finished.
    /// Settled promises are pruned at the top of each `callback`.
    #[ignore_malloc_size_of = "nested Rc"]
    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,

    /// The state machine.
    #[conditional_malloc_size_of]
    #[no_trace]
    state: Rc<RefCell<PipeToState>>,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventabort>
    /// When set, an errored source does not abort the destination.
    prevent_abort: bool,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventcancel>
    /// When set, an errored or closed destination does not cancel the source.
    prevent_cancel: bool,

    /// <https://streams.spec.whatwg.org/#readablestream-pipe-to-preventclose>
    /// When set, a closed source does not close the destination.
    prevent_close: bool,

    /// The `shuttingDown` variable of
    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
    /// Set exactly once, by `shutdown`.
    #[conditional_malloc_size_of]
    shutting_down: Rc<Cell<bool>>,

    /// The abort reason of the abort signal,
    /// stored here because we must keep it across a microtask.
    #[ignore_malloc_size_of = "mozjs"]
    abort_reason: Rc<Heap<JSVal>>,

    /// The error potentially passed to shutdown,
    /// stored here because we must keep it across a microtask.
    /// Only written through `set_shutdown_error`.
    #[ignore_malloc_size_of = "mozjs"]
    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,

    /// The promise returned by a shutdown action.
    /// We keep it to only continue when it is not pending anymore.
    #[ignore_malloc_size_of = "nested Rc"]
    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// The promise resolved or rejected at
    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
172
173impl PipeTo {
174    /// Run the `abortAlgorithm` defined at
175    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
176    pub(crate) fn abort_with_reason(
177        &self,
178        cx: &mut CurrentRealm,
179        global: &GlobalScope,
180        reason: SafeHandleValue,
181    ) {
182        // Abort should do nothing if we are already shutting down.
183        if self.shutting_down.get() {
184            return;
185        }
186
187        // Let error be signal’s abort reason.
188        // Note: storing it because it may need to be kept across a microtask,
189        // and see the note below as to why it is kept separately from `shutdown_error`.
190        self.abort_reason.set(reason.get());
191
192        // Note: setting the error now,
193        // will result in a rejection of the pipe promise, with this error.
194        // Unless any shutdown action raise their own error,
195        // in which case this error will be overwritten by the shutdown action error.
196        self.set_shutdown_error(reason);
197
198        // Let actions be an empty ordered set.
199        // Note: the actions are defined, and performed, inside `shutdown_with_an_action`.
200
201        // Shutdown with an action consisting of getting a promise to wait for all of the actions in actions,
202        // and with error.
203        self.shutdown(cx, global, Some(ShutdownAction::Abort));
204    }
205}
206
207impl Callback for PipeTo {
208    /// The pipe makes progress one microtask at a time.
209    /// Note: we use one struct as the callback for all promises,
210    /// and for both of their reactions.
211    ///
212    /// The context of the callback is determined from:
213    /// - the current state.
214    /// - the type of `result`.
215    /// - the state of a stored promise(in some cases).
216    fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
217        let global = self.reader.global();
218
219        // Note: we only care about the result of writes when they are rejected,
220        // and the error is accessed not through handlers,
221        // but directly using `dest.get_stored_error`.
222        // So we must mark rejected promises as handled
223        // to prevent unhandled rejection errors.
224        self.pending_writes.borrow_mut().retain(|p| {
225            let pending = p.is_pending();
226            if !pending {
227                p.set_promise_is_handled(cx);
228            }
229            pending
230        });
231
232        // Note: cloning to prevent re-borrow in methods called below.
233        let state_before_checks = self.state.borrow().clone();
234
235        // Note: if we are in a `PendingRead` state,
236        // and the source is closed,
237        // we try to write chunks before doing any shutdown,
238        // which is necessary to implement the
239        // "If any chunks have been read but not yet written, write them to dest."
240        // part of shutdown.
241        if state_before_checks == PipeToState::PendingRead {
242            let source = self.reader.get_stream().expect("Source stream must be set");
243            if source.is_closed() {
244                let dest = self
245                    .writer
246                    .get_stream()
247                    .expect("Destination stream must be set");
248
249                // If dest.[[state]] is "writable",
250                // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
251                if dest.is_writable() && !dest.close_queued_or_in_flight() {
252                    let Ok(done) = get_read_promise_done(cx, &result) else {
253                        // This is the case that the microtask ran in reaction
254                        // to the closed promise of the reader,
255                        // so we should wait for subsequent chunks,
256                        // and skip the shutdown below
257                        // (reader is closed, but there are still pending reads).
258                        // Shutdown will happen when the last chunk has been received.
259                        return;
260                    };
261
262                    if !done {
263                        // If any chunks have been read but not yet written, write them to dest.
264                        self.write_chunk(cx, &global, result);
265                    }
266                }
267            }
268        }
269
270        self.check_and_propagate_errors_forward(cx, &global);
271        self.check_and_propagate_errors_backward(cx, &global);
272        self.check_and_propagate_closing_forward(cx, &global);
273        self.check_and_propagate_closing_backward(cx, &global);
274
275        // Note: cloning to prevent re-borrow in methods called below.
276        let state = self.state.borrow().clone();
277
278        // If we switched to a shutdown state,
279        // return.
280        // Progress will be made at the next tick.
281        if state != state_before_checks {
282            return;
283        }
284
285        match state {
286            PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
287            PipeToState::PendingReady => {
288                // Read a chunk.
289                self.read_chunk(cx, &global);
290            },
291            PipeToState::PendingRead => {
292                // Write the chunk.
293                self.write_chunk(cx, &global, result);
294
295                // An early return is necessary if the write algorithm aborted the pipe.
296                if self.shutting_down.get() {
297                    return;
298                }
299
300                // Wait for the writer to be ready again.
301                self.wait_for_writer_ready(cx, &global);
302            },
303            PipeToState::ShuttingDownWithPendingWrites(action) => {
304                // Wait until every chunk that has been read has been written
305                // (i.e. the corresponding promises have settled).
306                if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
307                    self.wait_on_pending_write(cx, &global, write);
308                    return;
309                }
310
311                // Note: error is stored in `self.shutdown_error`.
312                if let Some(action) = action {
313                    // Let p be the result of performing action.
314                    self.perform_action(cx, &global, action);
315                } else {
316                    // Finalize, passing along error if it was given.
317                    self.finalize(cx, &global);
318                }
319            },
320            PipeToState::ShuttingDownPendingAction => {
321                let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
322                    unreachable!();
323                };
324                if promise.is_pending() {
325                    // While waiting for the action to complete,
326                    // we may get callbacks for other promises(closed, ready),
327                    // and we should ignore those.
328                    return;
329                }
330
331                let is_array_like = {
332                    if !result.is_object() {
333                        false
334                    } else {
335                        is_array_like::<crate::DomTypeHolder>(cx, result)
336                    }
337                };
338
339                // Finalize, passing along error if it was given.
340                if !result.is_undefined() && !is_array_like {
341                    // Most actions either resolve with undefined,
342                    // or reject with an error,
343                    // and the error should be used when finalizing.
344                    // One exception is the `Abort` action,
345                    // which resolves with a list of undefined values.
346
347                    // If `result` isn't undefined or array-like,
348                    // then it is an error
349                    // and should overwrite the current shutdown error.
350                    self.set_shutdown_error(result);
351                }
352                self.finalize(cx, &global);
353            },
354            PipeToState::Finalized => {},
355        }
356    }
357}
358
359impl PipeTo {
360    /// Setting shutdown error in a way that ensures it isn't
361    /// moved after it has been set.
362    fn set_shutdown_error(&self, error: SafeHandleValue) {
363        *self.shutdown_error.borrow_mut() = Some(Heap::default());
364        let Some(ref heap) = *self.shutdown_error.borrow() else {
365            unreachable!("Option set to Some(heap) above.");
366        };
367        heap.set(error.get())
368    }
369
370    /// Wait for the writer to be ready,
371    /// which implements the constraint that backpressure must be enforced.
372    fn wait_for_writer_ready(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
373        {
374            let mut state = self.state.borrow_mut();
375            *state = PipeToState::PendingReady;
376        }
377
378        let ready_promise = self.writer.Ready();
379        if ready_promise.is_fulfilled() {
380            self.read_chunk(cx, global);
381        } else {
382            let handler = PromiseNativeHandler::new(
383                cx,
384                global,
385                Some(Box::new(self.clone())),
386                Some(Box::new(self.clone())),
387            );
388            ready_promise.append_native_handler(cx, &handler);
389
390            // Note: if the writer is not ready,
391            // in order to ensure progress we must
392            // also react to the closure of the source(because source may close empty).
393            let closed_promise = self.reader.Closed();
394            closed_promise.append_native_handler(cx, &handler);
395        }
396    }
397
398    /// Read a chunk
399    fn read_chunk(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
400        *self.state.borrow_mut() = PipeToState::PendingRead;
401        let chunk_promise = self.reader.Read(cx);
402        let handler = PromiseNativeHandler::new(
403            cx,
404            global,
405            Some(Box::new(self.clone())),
406            Some(Box::new(self.clone())),
407        );
408        chunk_promise.append_native_handler(cx, &handler);
409
410        // Note: in order to ensure progress we must
411        // also react to the closure of the destination.
412        let ready_promise = self.writer.Closed();
413        ready_promise.append_native_handler(cx, &handler);
414    }
415
416    /// Try to write a chunk using the jsval, and returns wether it succeeded
417    // It will fail if it is the last `done` chunk, or if it is not a chunk at all.
418    fn write_chunk(
419        &self,
420        cx: &mut JSContext,
421        global: &GlobalScope,
422        chunk: SafeHandleValue,
423    ) -> bool {
424        if chunk.is_object() {
425            rooted!(&in(cx) let object = chunk.to_object());
426            rooted!(&in(cx) let mut bytes = UndefinedValue());
427            get_property_jsval(cx, object.handle(), c"value", bytes.handle_mut())
428                .expect("Chunk should have a value.");
429
430            // Write the chunk.
431            let write_promise = self.writer.write(cx, global, bytes.handle());
432            self.pending_writes.borrow_mut().push_back(write_promise);
433            return true;
434        }
435        false
436    }
437
438    /// Only as part of shutting-down do we wait on pending writes
439    /// (backpressure is communicated not through pending writes
440    /// but through the readiness of the writer).
441    fn wait_on_pending_write(
442        &self,
443        cx: &mut CurrentRealm,
444        global: &GlobalScope,
445        promise: Rc<Promise>,
446    ) {
447        let handler = PromiseNativeHandler::new(
448            cx,
449            global,
450            Some(Box::new(self.clone())),
451            Some(Box::new(self.clone())),
452        );
453        promise.append_native_handler(cx, &handler);
454    }
455
456    /// Errors must be propagated forward part of
457    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
458    fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
459        // An early return is necessary if we are shutting down,
460        // because in that case the source can already have been set to none.
461        if self.shutting_down.get() {
462            return;
463        }
464
465        // if source.[[state]] is or becomes "errored", then
466        let source = self
467            .reader
468            .get_stream()
469            .expect("Reader should still have a stream");
470        if source.is_errored() {
471            rooted!(&in(cx) let mut source_error = UndefinedValue());
472            source.get_stored_error(source_error.handle_mut());
473            self.set_shutdown_error(source_error.handle());
474
475            // If preventAbort is false,
476            if !self.prevent_abort {
477                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
478                // and with source.[[storedError]].
479                self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
480            } else {
481                // Otherwise, shutdown with source.[[storedError]].
482                self.shutdown(cx, global, None);
483            }
484        }
485    }
486
487    /// Errors must be propagated backward part of
488    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
489    fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
490        // An early return is necessary if we are shutting down,
491        // because in that case the destination can already have been set to none.
492        if self.shutting_down.get() {
493            return;
494        }
495
496        // if dest.[[state]] is or becomes "errored", then
497        let dest = self
498            .writer
499            .get_stream()
500            .expect("Writer should still have a stream");
501        if dest.is_errored() {
502            rooted!(&in(cx) let mut dest_error = UndefinedValue());
503            dest.get_stored_error(dest_error.handle_mut());
504            self.set_shutdown_error(dest_error.handle());
505
506            // If preventCancel is false,
507            if !self.prevent_cancel {
508                // shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]])
509                // and with dest.[[storedError]].
510                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
511            } else {
512                // Otherwise, shutdown with dest.[[storedError]].
513                self.shutdown(cx, global, None);
514            }
515        }
516    }
517
518    /// Closing must be propagated forward part of
519    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
520    fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
521        // An early return is necessary if we are shutting down,
522        // because in that case the source can already have been set to none.
523        if self.shutting_down.get() {
524            return;
525        }
526
527        // if source.[[state]] is or becomes "closed", then
528        let source = self
529            .reader
530            .get_stream()
531            .expect("Reader should still have a stream");
532        if source.is_closed() {
533            // If preventClose is false,
534            if !self.prevent_close {
535                // shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]])
536                // and with source.[[storedError]].
537                self.shutdown(
538                    cx,
539                    global,
540                    Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
541                )
542            } else {
543                // Otherwise, shutdown.
544                self.shutdown(cx, global, None);
545            }
546        }
547    }
548
549    /// Closing must be propagated backward part of
550    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
551    fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
552        // An early return is necessary if we are shutting down,
553        // because in that case the destination can already have been set to none.
554        if self.shutting_down.get() {
555            return;
556        }
557
558        // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
559        // or dest.[[state]] is "closed"
560        let dest = self
561            .writer
562            .get_stream()
563            .expect("Writer should still have a stream");
564        if dest.close_queued_or_in_flight() || dest.is_closed() {
565            // Assert: no chunks have been read or written.
566            // Note: unclear how to perform this assertion.
567
568            // Let destClosed be a new TypeError.
569            rooted!(&in(cx) let mut dest_closed = UndefinedValue());
570            let error =
571                Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
572            error.to_jsval(cx, global, dest_closed.handle_mut());
573            self.set_shutdown_error(dest_closed.handle());
574
575            // If preventCancel is false,
576            if !self.prevent_cancel {
577                // shutdown with an action of ! ReadableStreamCancel(source, destClosed)
578                // and with destClosed.
579                self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
580            } else {
581                // Otherwise, shutdown with destClosed.
582                self.shutdown(cx, global, None);
583            }
584        }
585    }
586
587    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
588    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown>
589    /// Combined into one method with an optional action.
590    fn shutdown(
591        &self,
592        cx: &mut CurrentRealm,
593        global: &GlobalScope,
594        action: Option<ShutdownAction>,
595    ) {
596        // If shuttingDown is true, abort these substeps.
597        // Set shuttingDown to true.
598        if !self.shutting_down.replace(true) {
599            let dest = self.writer.get_stream().expect("Stream must be set");
600            // If dest.[[state]] is "writable",
601            // and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
602            if dest.is_writable() && !dest.close_queued_or_in_flight() {
603                // If any chunks have been read but not yet written, write them to dest.
604                // Done at the top of `Callback`.
605
606                // Wait until every chunk that has been read has been written
607                // (i.e. the corresponding promises have settled).
608                if let Some(write) = self.pending_writes.borrow_mut().front() {
609                    *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
610                    self.wait_on_pending_write(cx, global, write.clone());
611                    return;
612                }
613            }
614
615            // Note: error is stored in `self.shutdown_error`.
616            if let Some(action) = action {
617                // Let p be the result of performing action.
618                self.perform_action(cx, global, action);
619            } else {
620                // Finalize, passing along error if it was given.
621                self.finalize(cx, global);
622            }
623        }
624    }
625
626    /// The perform action part of
627    /// <https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action>
628    fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
629        rooted!(&in(cx) let mut error = UndefinedValue());
630        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
631            error.set(shutdown_error.get());
632        }
633
634        *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
635
636        // Let p be the result of performing action.
637        let promise = match action {
638            ShutdownAction::WritableStreamAbort => {
639                let dest = self.writer.get_stream().expect("Stream must be set");
640                dest.abort(cx, global, error.handle())
641            },
642            ShutdownAction::ReadableStreamCancel => {
643                let source = self
644                    .reader
645                    .get_stream()
646                    .expect("Reader should have a stream.");
647                source.cancel(cx, global, error.handle())
648            },
649            ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
650                self.writer.close_with_error_propagation(cx, global)
651            },
652            ShutdownAction::Abort => {
653                // Note: implementation of the `abortAlgorithm`
654                // of the signal associated with this piping operation.
655
656                // Let error be signal’s abort reason.
657                rooted!(&in(cx) let mut error = UndefinedValue());
658                error.set(self.abort_reason.get());
659
660                // Let actions be an empty ordered set.
661                let mut actions = vec![];
662
663                // If preventAbort is false, append the following action to actions:
664                if !self.prevent_abort {
665                    let dest = self
666                        .writer
667                        .get_stream()
668                        .expect("Destination stream must be set");
669
670                    // If dest.[[state]] is "writable",
671                    let promise = if dest.is_writable() {
672                        // return ! WritableStreamAbort(dest, error)
673                        dest.abort(cx, global, error.handle())
674                    } else {
675                        // Otherwise, return a promise resolved with undefined.
676                        Promise::new_resolved(cx, global, ())
677                    };
678                    actions.push(promise);
679                }
680
681                // If preventCancel is false, append the following action action to actions:
682                if !self.prevent_cancel {
683                    let source = self.reader.get_stream().expect("Source stream must be set");
684
685                    // If source.[[state]] is "readable",
686                    let promise = if source.is_readable() {
687                        // return ! ReadableStreamCancel(source, error).
688                        source.cancel(cx, global, error.handle())
689                    } else {
690                        // Otherwise, return a promise resolved with undefined.
691                        Promise::new_resolved(cx, global, ())
692                    };
693                    actions.push(promise);
694                }
695
696                // Shutdown with an action consisting
697                // of getting a promise to wait for all of the actions in actions,
698                // and with error.
699                wait_for_all_promise(cx, global, actions)
700            },
701        };
702
703        // Upon fulfillment of p, finalize, passing along originalError if it was given.
704        // Upon rejection of p with reason newError, finalize with newError.
705        let handler = PromiseNativeHandler::new(
706            cx,
707            global,
708            Some(Box::new(self.clone())),
709            Some(Box::new(self.clone())),
710        );
711        promise.append_native_handler(cx, &handler);
712        *self.shutdown_action_promise.borrow_mut() = Some(promise);
713    }
714
715    /// <https://streams.spec.whatwg.org/#rs-pipeTo-finalize>
716    fn finalize(&self, cx: &mut JSContext, global: &GlobalScope) {
717        *self.state.borrow_mut() = PipeToState::Finalized;
718
719        // Perform ! WritableStreamDefaultWriterRelease(writer).
720        self.writer.release(cx, global);
721
722        // If reader implements ReadableStreamBYOBReader,
723        // perform ! ReadableStreamBYOBReaderRelease(reader).
724        // TODO.
725
726        // Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
727        self.reader
728            .release(cx)
729            .expect("Releasing the reader should not fail");
730
731        // If signal is not undefined, remove abortAlgorithm from signal.
732        // Note: since `self.shutdown` is true at this point,
733        // the abort algorithm is a no-op,
734        // so for now not implementing this step.
735
736        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
737            rooted!(&in(cx) let mut error = UndefinedValue());
738            error.set(shutdown_error.get());
739            // If error was given, reject promise with error.
740            self.result_promise.reject_native(cx, &error.handle());
741        } else {
742            // Otherwise, resolve promise with undefined.
743            self.result_promise.resolve_native(cx, &());
744        }
745    }
746}
747
748/// The fulfillment handler for the reacting to sourceCancelPromise part of
749/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
750#[derive(Clone, JSTraceable, MallocSizeOf)]
751struct SourceCancelPromiseFulfillmentHandler {
752    #[conditional_malloc_size_of]
753    result: Rc<Promise>,
754}
755
756impl Callback for SourceCancelPromiseFulfillmentHandler {
757    /// The fulfillment handler for the reacting to sourceCancelPromise part of
758    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
759    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
760    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
761        self.result.resolve_native(cx, &());
762    }
763}
764
765/// The rejection handler for the reacting to sourceCancelPromise part of
766/// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
767#[derive(Clone, JSTraceable, MallocSizeOf)]
768struct SourceCancelPromiseRejectionHandler {
769    #[conditional_malloc_size_of]
770    result: Rc<Promise>,
771}
772
773impl Callback for SourceCancelPromiseRejectionHandler {
774    /// The rejection handler for the reacting to sourceCancelPromise part of
775    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>.
776    /// An implementation of <https://webidl.spec.whatwg.org/#dfn-perform-steps-once-promise-is-settled>
777    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
778        self.result.reject_native(cx, &v);
779    }
780}
781
782/// <https://streams.spec.whatwg.org/#readablestream-state>
783#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
784pub(crate) enum ReadableStreamState {
785    #[default]
786    Readable,
787    Closed,
788    Errored,
789}
790
791/// <https://streams.spec.whatwg.org/#readablestream-controller>
792#[derive(JSTraceable, MallocSizeOf)]
793#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
794pub(crate) enum ControllerType {
795    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
796    Byte(MutNullableDom<ReadableByteStreamController>),
797    /// <https://streams.spec.whatwg.org/#readablestreamdefaultcontroller>
798    Default(MutNullableDom<ReadableStreamDefaultController>),
799}
800
801/// <https://streams.spec.whatwg.org/#readablestream-readerr>
802#[derive(JSTraceable, MallocSizeOf)]
803#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
804pub(crate) enum ReaderType {
805    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
806    #[allow(clippy::upper_case_acronyms)]
807    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
808    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
809    Default(MutNullableDom<ReadableStreamDefaultReader>),
810}
811
812impl Eq for ReaderType {}
813impl PartialEq for ReaderType {
814    fn eq(&self, other: &Self) -> bool {
815        matches!(
816            (self, other),
817            (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
818                (ReaderType::Default(_), ReaderType::Default(_))
819        )
820    }
821}
822
823/// <https://streams.spec.whatwg.org/#create-readable-stream>
824pub(crate) fn create_readable_stream(
825    cx: &mut JSContext,
826    global: &GlobalScope,
827    underlying_source_type: UnderlyingSourceType,
828    queuing_strategy: Option<Rc<QueuingStrategySize>>,
829    high_water_mark: Option<f64>,
830) -> DomRoot<ReadableStream> {
831    // If highWaterMark was not passed, set it to 1.
832    let high_water_mark = high_water_mark.unwrap_or(1.0);
833
834    // If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
835    let size_algorithm =
836        queuing_strategy.unwrap_or(extract_size_algorithm(cx, &QueuingStrategy::empty()));
837
838    // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
839    assert!(high_water_mark >= 0.0);
840
841    // Let stream be a new ReadableStream.
842    // Perform ! InitializeReadableStream(stream).
843    let stream = ReadableStream::new_with_proto(cx, global, None);
844
845    // Let controller be a new ReadableStreamDefaultController.
846    let controller = ReadableStreamDefaultController::new(
847        cx,
848        global,
849        underlying_source_type,
850        high_water_mark,
851        size_algorithm,
852    );
853
854    // Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm,
855    // pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
856    controller
857        .setup(cx, stream.clone())
858        .expect("Setup of default controller cannot fail");
859
860    // Return stream.
861    stream
862}
863
864/// <https://streams.spec.whatwg.org/#abstract-opdef-createreadablebytestream>
865fn readable_byte_stream_tee(
866    cx: &mut JSContext,
867    global: &GlobalScope,
868    underlying_source_type: UnderlyingSourceType,
869) -> DomRoot<ReadableStream> {
870    // Let stream be a new ReadableStream.
871    // Perform ! InitializeReadableStream(stream).
872    let tee_stream = ReadableStream::new_with_proto(cx, global, None);
873
874    // Let controller be a new ReadableByteStreamController.
875    let controller = ReadableByteStreamController::new(cx, underlying_source_type, 0.0, global);
876
877    // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
878    controller
879        .setup(cx, global, tee_stream.clone())
880        .expect("Setup of byte stream controller cannot fail");
881
882    // Return stream.
883    tee_stream
884}
885
886/// <https://streams.spec.whatwg.org/#rs-class>
887#[dom_struct]
888pub(crate) struct ReadableStream {
889    reflector_: Reflector,
890
891    /// <https://streams.spec.whatwg.org/#readablestream-controller>
892    /// Note: the inner `MutNullableDom` should really be an `Option<Dom>`,
893    /// because it is never unset once set.
894    controller: RefCell<Option<ControllerType>>,
895
896    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
897    #[ignore_malloc_size_of = "mozjs"]
898    stored_error: Heap<JSVal>,
899
900    /// <https://streams.spec.whatwg.org/#readablestream-disturbed>
901    disturbed: Cell<bool>,
902
903    /// <https://streams.spec.whatwg.org/#readablestream-reader>
904    reader: RefCell<Option<ReaderType>>,
905
906    /// <https://streams.spec.whatwg.org/#readablestream-state>
907    state: Cell<ReadableStreamState>,
908}
909
910impl ReadableStream {
911    /// <https://streams.spec.whatwg.org/#initialize-readable-stream>
912    fn new_inherited() -> ReadableStream {
913        ReadableStream {
914            reflector_: Reflector::new(),
915            controller: RefCell::new(None),
916            stored_error: Heap::default(),
917            disturbed: Default::default(),
918            reader: RefCell::new(None),
919            state: Cell::new(Default::default()),
920        }
921    }
922
923    pub(crate) fn new_with_proto(
924        cx: &mut JSContext,
925        global: &GlobalScope,
926        proto: Option<SafeHandleObject>,
927    ) -> DomRoot<ReadableStream> {
928        reflect_dom_object_with_proto_and_cx(
929            Box::new(ReadableStream::new_inherited()),
930            global,
931            proto,
932            cx,
933        )
934    }
935
936    /// Used as part of
937    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
938    pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
939        *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
940            controller,
941        ))));
942    }
943
944    /// Used as part of
945    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
946    pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
947        *self.controller.borrow_mut() =
948            Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
949    }
950
951    /// Used as part of
952    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller>
953    pub(crate) fn assert_no_controller(&self) {
954        let has_no_controller = self.controller.borrow().is_none();
955        assert!(has_no_controller);
956    }
957
958    /// Build a stream backed by a Rust source that has already been read into memory.
959    pub(crate) fn new_from_bytes(
960        cx: &mut JSContext,
961        global: &GlobalScope,
962        bytes: Vec<u8>,
963    ) -> Fallible<DomRoot<ReadableStream>> {
964        let stream = ReadableStream::new_with_external_underlying_source(
965            cx,
966            global,
967            UnderlyingSourceType::Memory(bytes.len()),
968        )?;
969        stream.enqueue_native(cx, bytes);
970        stream.controller_close_native(cx);
971        Ok(stream)
972    }
973
974    /// Build an empty stream.
975    /// Used as step 2 of <https://fetch.spec.whatwg.org/#dom-body-textstream>
976    pub(crate) fn new_empty(
977        cx: &mut JSContext,
978        global: &GlobalScope,
979    ) -> Fallible<DomRoot<ReadableStream>> {
980        // Step 1. Let emptyStream be a new ReadableStream in this’s relevant realm.
981        // Step 2. Set up emptyStream.
982        let empty_stream = ReadableStream::new_with_external_underlying_source(
983            cx,
984            global,
985            UnderlyingSourceType::Memory(0),
986        )?;
987        // Step 3. Close emptyStream.
988        empty_stream.controller_close_native(cx);
989        // Step 4. Return emptyStream.
990        Ok(empty_stream)
991    }
992
993    /// <https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support>
994    pub(crate) fn new_from_bytes_with_byte_reading_support(
995        cx: &mut JSContext,
996        global: &GlobalScope,
997        bytes: Vec<u8>,
998    ) -> Fallible<DomRoot<ReadableStream>> {
999        let stream = ReadableStream::new_with_external_underlying_byte_source(
1000            cx,
1001            global,
1002            UnderlyingSourceType::Memory(bytes.len()),
1003        )?;
1004        stream.enqueue_native(cx, bytes);
1005        stream.controller_close_native(cx);
1006        Ok(stream)
1007    }
1008
1009    /// Build a stream backed by a Rust underlying source.
1010    /// Note: external sources are always paired with a default controller.
1011    pub(crate) fn new_with_external_underlying_source(
1012        cx: &mut JSContext,
1013        global: &GlobalScope,
1014        source: UnderlyingSourceType,
1015    ) -> Fallible<DomRoot<ReadableStream>> {
1016        assert!(source.is_native());
1017        let stream = ReadableStream::new_with_proto(cx, global, None);
1018        let strategy_size = extract_size_algorithm(cx, &QueuingStrategy::empty());
1019        let controller =
1020            ReadableStreamDefaultController::new(cx, global, source, 1.0, strategy_size);
1021        controller.setup(cx, stream.clone())?;
1022        Ok(stream)
1023    }
1024
1025    /// <https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support>
1026    pub(crate) fn new_with_external_underlying_byte_source(
1027        cx: &mut JSContext,
1028        global: &GlobalScope,
1029        source: UnderlyingSourceType,
1030    ) -> Fallible<DomRoot<ReadableStream>> {
1031        assert!(source.is_native());
1032        let stream = ReadableStream::new_with_proto(cx, global, None);
1033        let controller = ReadableByteStreamController::new(cx, source, 0.0, global);
1034        controller.setup(cx, global, stream.clone())?;
1035        Ok(stream)
1036    }
1037
1038    /// Call into the release steps of the controller,
1039    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1040        match self.controller.borrow().as_ref() {
1041            Some(ControllerType::Default(controller)) => {
1042                let controller = controller
1043                    .get()
1044                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1045                controller.perform_release_steps()
1046            },
1047            Some(ControllerType::Byte(controller)) => {
1048                let controller = controller
1049                    .get()
1050                    .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1051                controller.perform_release_steps()
1052            },
1053            None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1054        }
1055    }
1056
1057    /// Call into the pull steps of the controller,
1058    /// as part of
1059    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1060    pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
1061        match self.controller.borrow().as_ref() {
1062            Some(ControllerType::Default(controller)) => controller
1063                .get()
1064                .expect("Stream should have controller.")
1065                .perform_pull_steps(cx, read_request),
1066            Some(ControllerType::Byte(controller)) => controller
1067                .get()
1068                .expect("Stream should have controller.")
1069                .perform_pull_steps(cx, read_request),
1070            None => {
1071                unreachable!("Stream does not have a controller.");
1072            },
1073        }
1074    }
1075
1076    /// Call into the pull steps of the controller,
1077    /// as part of
1078    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
1079    pub(crate) fn perform_pull_into(
1080        &self,
1081        cx: &mut JSContext,
1082        read_into_request: &ReadIntoRequest,
1083        view: &HeapBufferSource<ArrayBufferViewU8>,
1084        min: u64,
1085    ) {
1086        match self.controller.borrow().as_ref() {
1087            Some(ControllerType::Byte(controller)) => controller
1088                .get()
1089                .expect("Stream should have controller.")
1090                .perform_pull_into(cx, read_into_request, view, min),
1091            _ => {
1092                unreachable!(
1093                    "Pulling a chunk from a stream with a default controller using a BYOB reader"
1094                )
1095            },
1096        }
1097    }
1098
1099    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-request>
1100    pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1101        match self.reader.borrow().as_ref() {
1102            Some(ReaderType::Default(reader)) => {
1103                let Some(reader) = reader.get() else {
1104                    panic!("Attempt to add a read request without having first acquired a reader.");
1105                };
1106
1107                // Assert: stream.[[state]] is "readable".
1108                assert!(self.is_readable());
1109
1110                // Append readRequest to stream.[[reader]].[[readRequests]].
1111                reader.add_read_request(read_request);
1112            },
1113            _ => {
1114                unreachable!("Adding a read request can only be done on a default reader.")
1115            },
1116        }
1117    }
1118
1119    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
1120    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1121        match self.reader.borrow().as_ref() {
1122            // Assert: stream.[[reader]] implements ReadableStreamBYOBReader.
1123            Some(ReaderType::BYOB(reader)) => {
1124                let Some(reader) = reader.get() else {
1125                    unreachable!(
1126                        "Attempt to add a read into request without having first acquired a reader."
1127                    );
1128                };
1129
1130                // Assert: stream.[[state]] is "readable" or "closed".
1131                assert!(self.is_readable() || self.is_closed());
1132
1133                // Append readRequest to stream.[[reader]].[[readIntoRequests]].
1134                reader.add_read_into_request(read_request);
1135            },
1136            _ => {
1137                unreachable!("Adding a read into request can only be done on a BYOB reader.")
1138            },
1139        }
1140    }
1141
1142    /// <https://streams.spec.whatwg.org/#readablestream-enqueue>
1143    pub(crate) fn enqueue_native(&self, cx: &mut JSContext, bytes: Vec<u8>) {
1144        match self.controller.borrow().as_ref() {
1145            Some(ControllerType::Default(controller)) => controller
1146                .get()
1147                .expect("Stream should have controller.")
1148                .enqueue_native(cx, bytes),
1149            Some(ControllerType::Byte(controller)) => {
1150                if bytes.is_empty() {
1151                    return;
1152                }
1153
1154                let controller = controller.get().expect("Stream should have controller.");
1155                rooted!(&in(cx) let mut chunk_object = ptr::null_mut::<JSObject>());
1156                create_buffer_source::<Uint8>(cx, &bytes, chunk_object.handle_mut())
1157                    .expect("failed to create buffer source for native byte chunk.");
1158
1159                let chunk = RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
1160                    BufferSource::ArrayBufferView(Heap::boxed(*chunk_object.handle())),
1161                ));
1162                controller
1163                    .enqueue(cx, chunk)
1164                    .expect("Enqueuing a native byte chunk should not fail.");
1165            },
1166            _ => {
1167                unreachable!("Enqueueing chunk to a stream from Rust without a controller");
1168            },
1169        }
1170    }
1171
1172    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1173    pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
1174        // Assert: stream.[[state]] is "readable".
1175        assert!(self.is_readable());
1176
1177        // Set stream.[[state]] to "errored".
1178        self.state.set(ReadableStreamState::Errored);
1179
1180        // Set stream.[[storedError]] to e.
1181        self.stored_error.set(e.get());
1182
1183        // Let reader be stream.[[reader]].
1184
1185        let default_reader = {
1186            let reader_ref = self.reader.borrow();
1187            match reader_ref.as_ref() {
1188                Some(ReaderType::Default(reader)) => reader.get(),
1189                _ => None,
1190            }
1191        };
1192
1193        if let Some(reader) = default_reader {
1194            // Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
1195            reader.error(cx, e);
1196            return;
1197        }
1198
1199        let byob_reader = {
1200            let reader_ref = self.reader.borrow();
1201            match reader_ref.as_ref() {
1202                Some(ReaderType::BYOB(reader)) => reader.get(),
1203                _ => None,
1204            }
1205        };
1206
1207        if let Some(reader) = byob_reader {
1208            // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
1209            reader.error_read_into_requests(cx, e);
1210        }
1211
1212        // If reader is undefined, return.
1213    }
1214
1215    /// <https://streams.spec.whatwg.org/#readablestream-storederror>
1216    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1217        handle_mut.set(self.stored_error.get());
1218    }
1219
1220    /// <https://streams.spec.whatwg.org/#readable-stream-error>
1221    /// Note: in other use cases this call happens via the controller.
1222    pub(crate) fn error_native(&self, cx: &mut JSContext, error: Error) {
1223        rooted!(&in(cx) let mut error_val = UndefinedValue());
1224        error.to_jsval(cx, &self.global(), error_val.handle_mut());
1225        self.error(cx, error_val.handle());
1226    }
1227
1228    /// Call into the controller's `Close` method.
1229    /// <https://streams.spec.whatwg.org/#readablestream-close>
1230    pub(crate) fn controller_close_native(&self, cx: &mut JSContext) {
1231        match self.controller.borrow().as_ref() {
1232            Some(ControllerType::Default(controller)) => {
1233                let _ = controller
1234                    .get()
1235                    .expect("Stream should have controller.")
1236                    .Close(cx);
1237            },
1238            Some(ControllerType::Byte(controller)) => {
1239                let _ = controller
1240                    .get()
1241                    .expect("Stream should have controller.")
1242                    .close(cx);
1243            },
1244            _ => {
1245                unreachable!("Native closing requires a stream controller.")
1246            },
1247        }
1248    }
1249
1250    /// Returns a boolean reflecting whether the stream has all data in memory.
1251    /// Useful for native source integration only.
1252    pub(crate) fn in_memory(&self) -> bool {
1253        match self.controller.borrow().as_ref() {
1254            Some(ControllerType::Default(controller)) => controller
1255                .get()
1256                .expect("Stream should have controller.")
1257                .in_memory(),
1258            Some(ControllerType::Byte(controller)) => controller
1259                .get()
1260                .expect("Stream should have controller.")
1261                .in_memory(),
1262            _ => unreachable!("Checking if source is in memory for a stream without a controller"),
1263        }
1264    }
1265
1266    /// Return bytes for synchronous use, if the stream has all data in memory.
1267    /// Useful for native source integration only.
1268    pub(crate) fn get_in_memory_bytes(&self, cx: &mut JSContext) -> Option<GenericSharedMemory> {
1269        match self.controller.borrow().as_ref() {
1270            Some(ControllerType::Default(controller)) => controller
1271                .get()
1272                .expect("Stream should have controller.")
1273                .get_in_memory_bytes()
1274                .map(GenericSharedMemory::from_vec),
1275            Some(ControllerType::Byte(controller)) => controller
1276                .get()
1277                .expect("Stream should have controller.")
1278                .get_in_memory_bytes(cx)
1279                .map(GenericSharedMemory::from_vec),
1280            _ => unreachable!("Getting in-memory bytes for a stream without a controller"),
1281        }
1282    }
1283
1284    /// Acquires a reader and locks the stream,
1285    /// must be done before `read_a_chunk`.
1286    /// Native call to
1287    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-reader>
1288    pub(crate) fn acquire_default_reader(
1289        &self,
1290        cx: &mut JSContext,
1291    ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1292        // Let reader be a new ReadableStreamDefaultReader.
1293        let reader = ReadableStreamDefaultReader::new(cx, &self.global());
1294
1295        // Perform ? SetUpReadableStreamDefaultReader(reader, stream).
1296        reader.set_up(cx, self, &self.global())?;
1297
1298        // Return reader.
1299        Ok(reader)
1300    }
1301
1302    /// <https://streams.spec.whatwg.org/#acquire-readable-stream-byob-reader>
1303    pub(crate) fn acquire_byob_reader(
1304        &self,
1305        cx: &mut JSContext,
1306    ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1307        // Let reader be a new ReadableStreamBYOBReader.
1308        let reader = ReadableStreamBYOBReader::new(cx, &self.global());
1309        // Perform ? SetUpReadableStreamBYOBReader(reader, stream).
1310        reader.set_up(cx, self, &self.global())?;
1311
1312        // Return reader.
1313        Ok(reader)
1314    }
1315
1316    pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1317        match self.controller.borrow().as_ref() {
1318            Some(ControllerType::Default(controller)) => {
1319                controller.get().expect("Stream should have controller.")
1320            },
1321            _ => {
1322                unreachable!(
1323                    "Getting default controller for a stream with a non-default controller"
1324                )
1325            },
1326        }
1327    }
1328
1329    pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1330        match self.controller.borrow().as_ref() {
1331            Some(ControllerType::Byte(controller)) => {
1332                controller.get().expect("Stream should have controller.")
1333            },
1334            _ => {
1335                unreachable!("Getting byte controller for a stream with a non-byte controller")
1336            },
1337        }
1338    }
1339
1340    pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1341        match self.reader.borrow().as_ref() {
1342            Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1343            _ => {
1344                unreachable!("Getting default reader for a stream with a non-default reader")
1345            },
1346        }
1347    }
1348
1349    /// Read a chunk from the stream,
1350    /// must be called after `start_reading`,
1351    /// and before `stop_reading`.
1352    /// Native call to
1353    /// <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
1354    pub(crate) fn read_a_chunk(&self, cx: &mut JSContext) -> Rc<Promise> {
1355        match self.reader.borrow().as_ref() {
1356            Some(ReaderType::Default(reader)) => {
1357                let Some(reader) = reader.get() else {
1358                    unreachable!(
1359                        "Attempt to read stream chunk without having first acquired a reader."
1360                    );
1361                };
1362                reader.Read(cx)
1363            },
1364            _ => {
1365                unreachable!("Native reading of a chunk can only be done with a default reader.")
1366            },
1367        }
1368    }
1369
1370    /// Releases the lock on the reader,
1371    /// must be done after `start_reading`.
1372    /// Native call to
1373    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultreaderrelease>
1374    pub(crate) fn stop_reading(&self, cx: &mut JSContext) {
1375        let reader_ref = self.reader.borrow();
1376
1377        match reader_ref.as_ref() {
1378            Some(ReaderType::Default(reader)) => {
1379                let Some(reader) = reader.get() else {
1380                    unreachable!("Attempt to stop reading without having first acquired a reader.");
1381                };
1382
1383                drop(reader_ref);
1384                reader.release(cx).expect("Reader release cannot fail.");
1385            },
1386            _ => {
1387                unreachable!("Native stop reading can only be done with a default reader.")
1388            },
1389        }
1390    }
1391
1392    /// <https://streams.spec.whatwg.org/#is-readable-stream-locked>
1393    pub(crate) fn is_locked(&self) -> bool {
1394        match self.reader.borrow().as_ref() {
1395            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1396            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1397            None => false,
1398        }
1399    }
1400
1401    pub(crate) fn is_disturbed(&self) -> bool {
1402        self.disturbed.get()
1403    }
1404
1405    pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1406        self.disturbed.set(disturbed);
1407    }
1408
1409    pub(crate) fn is_closed(&self) -> bool {
1410        self.state.get() == ReadableStreamState::Closed
1411    }
1412
1413    pub(crate) fn is_errored(&self) -> bool {
1414        self.state.get() == ReadableStreamState::Errored
1415    }
1416
1417    pub(crate) fn is_readable(&self) -> bool {
1418        self.state.get() == ReadableStreamState::Readable
1419    }
1420
1421    pub(crate) fn has_default_reader(&self) -> bool {
1422        match self.reader.borrow().as_ref() {
1423            Some(ReaderType::Default(reader)) => reader.get().is_some(),
1424            _ => false,
1425        }
1426    }
1427
1428    pub(crate) fn has_byob_reader(&self) -> bool {
1429        match self.reader.borrow().as_ref() {
1430            Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1431            _ => false,
1432        }
1433    }
1434
1435    pub(crate) fn has_byte_controller(&self) -> bool {
1436        match self.controller.borrow().as_ref() {
1437            Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1438            _ => false,
1439        }
1440    }
1441
1442    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests>
1443    pub(crate) fn get_num_read_requests(&self) -> usize {
1444        match self.reader.borrow().as_ref() {
1445            Some(ReaderType::Default(reader)) => {
1446                let reader = reader
1447                    .get()
1448                    .expect("Stream must have a reader when getting the number of read requests.");
1449                reader.get_num_read_requests()
1450            },
1451            _ => unreachable!(
1452                "Stream must have a default reader when get num read requests is called into."
1453            ),
1454        }
1455    }
1456
1457    /// <https://streams.spec.whatwg.org/#readable-stream-get-num-read-into-requests>
1458    pub(crate) fn get_num_read_into_requests(&self) -> usize {
1459        assert!(self.has_byob_reader());
1460
1461        match self.reader.borrow().as_ref() {
1462            Some(ReaderType::BYOB(reader)) => {
1463                let Some(reader) = reader.get() else {
1464                    unreachable!(
1465                        "Stream must have a reader when get num read into requests is called into."
1466                    );
1467                };
1468                reader.get_num_read_into_requests()
1469            },
1470            _ => {
1471                unreachable!(
1472                    "Stream must have a BYOB reader when get num read into requests is called into."
1473                );
1474            },
1475        }
1476    }
1477
1478    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request>
1479    pub(crate) fn fulfill_read_request(
1480        &self,
1481        cx: &mut JSContext,
1482        chunk: SafeHandleValue,
1483        done: bool,
1484    ) {
1485        // step 1 - Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1486        assert!(self.has_default_reader());
1487
1488        match self.reader.borrow().as_ref() {
1489            Some(ReaderType::Default(reader)) => {
1490                // step 2 - Let reader be stream.[[reader]].
1491                let reader = reader
1492                    .get()
1493                    .expect("Stream must have a reader when a read request is fulfilled.");
1494                // step 3 - Assert: reader.[[readRequests]] is not empty.
1495                assert_ne!(reader.get_num_read_requests(), 0);
1496                // step 4 & 5
1497                // Let readRequest be reader.[[readRequests]][0]. & Remove readRequest from reader.[[readRequests]].
1498                let request = reader.remove_read_request();
1499
1500                if done {
1501                    // step 6 - If done is true, perform readRequest’s close steps.
1502                    request.close_steps(cx);
1503                } else {
1504                    // step 7 - Otherwise, perform readRequest’s chunk steps, given chunk.
1505                    let result = RootedTraceableBox::new(Heap::default());
1506                    result.set(*chunk);
1507                    request.chunk_steps(cx, result, &self.global());
1508                }
1509            },
1510            _ => {
1511                unreachable!(
1512                    "Stream must have a default reader when fulfill read requests is called into."
1513                );
1514            },
1515        }
1516    }
1517
1518    /// <https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request>
1519    pub(crate) fn fulfill_read_into_request(
1520        &self,
1521        cx: &mut JSContext,
1522        chunk: SafeHandleValue,
1523        done: bool,
1524    ) {
1525        // Assert: ! ReadableStreamHasBYOBReader(stream) is true.
1526        assert!(self.has_byob_reader());
1527
1528        // Let reader be stream.[[reader]].
1529        match self.reader.borrow().as_ref() {
1530            Some(ReaderType::BYOB(reader)) => {
1531                let Some(reader) = reader.get() else {
1532                    unreachable!(
1533                        "Stream must have a reader when a read into request is fulfilled."
1534                    );
1535                };
1536
1537                // Assert: reader.[[readIntoRequests]] is not empty.
1538                assert!(reader.get_num_read_into_requests() > 0);
1539
1540                // Let readIntoRequest be reader.[[readIntoRequests]][0].
1541                // Remove readIntoRequest from reader.[[readIntoRequests]].
1542                let read_into_request = reader.remove_read_into_request();
1543
1544                // If done is true, perform readIntoRequest’s close steps, given chunk.
1545                let result = RootedTraceableBox::new(Heap::default());
1546                if done {
1547                    result.set(*chunk);
1548                    read_into_request.close_steps(cx, Some(result));
1549                } else {
1550                    // Otherwise, perform readIntoRequest’s chunk steps, given chunk.
1551                    result.set(*chunk);
1552                    read_into_request.chunk_steps(cx, result);
1553                }
1554            },
1555            _ => {
1556                unreachable!(
1557                    "Stream must have a BYOB reader when fulfill read into requests is called into."
1558                );
1559            },
1560        };
1561    }
1562
1563    /// <https://streams.spec.whatwg.org/#readable-stream-close>
1564    pub(crate) fn close(&self, cx: &mut JSContext) {
1565        // Assert: stream.[[state]] is "readable".
1566        assert!(self.is_readable());
1567        // Set stream.[[state]] to "closed".
1568        self.state.set(ReadableStreamState::Closed);
1569        // Let reader be stream.[[reader]].
1570
1571        // NOTE: do not hold the RefCell borrow across reader.close(),
1572        // or release() will panic when it tries to mut-borrow stream.reader.
1573        // So we pull out the underlying DOM reader in a local, then drop the borrow.
1574        let default_reader = {
1575            let reader_ref = self.reader.borrow();
1576            match reader_ref.as_ref() {
1577                Some(ReaderType::Default(reader)) => reader.get(),
1578                _ => None,
1579            }
1580        };
1581
1582        if let Some(reader) = default_reader {
1583            // steps 5 & 6 for a default reader
1584            reader.close(cx);
1585            return;
1586        }
1587
1588        // Same for BYOB reader.
1589        let byob_reader = {
1590            let reader_ref = self.reader.borrow();
1591            match reader_ref.as_ref() {
1592                Some(ReaderType::BYOB(reader)) => reader.get(),
1593                _ => None,
1594            }
1595        };
1596
1597        if let Some(reader) = byob_reader {
1598            // steps 5 & 6 for a BYOB reader
1599            reader.close(cx);
1600        }
1601
1602        // If reader is undefined, return.
1603    }
1604
1605    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
1606    pub(crate) fn cancel(
1607        &self,
1608        cx: &mut JSContext,
1609        global: &GlobalScope,
1610        reason: SafeHandleValue,
1611    ) -> Rc<Promise> {
1612        // Set stream.[[disturbed]] to true.
1613        self.disturbed.set(true);
1614
1615        // If stream.[[state]] is "closed", return a promise resolved with undefined.
1616        if self.is_closed() {
1617            return Promise::new_resolved(cx, global, ());
1618        }
1619        // If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
1620        if self.is_errored() {
1621            let promise = Promise::new(cx, global);
1622            rooted!(&in(cx) let mut rval = UndefinedValue());
1623            self.stored_error.safe_to_jsval(cx, rval.handle_mut());
1624            promise.reject_native(cx, &rval.handle());
1625            return promise;
1626        }
1627        // Perform ! ReadableStreamClose(stream).
1628        self.close(cx);
1629
1630        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
1631        let byob_reader = {
1632            let reader_ref = self.reader.borrow();
1633            match reader_ref.as_ref() {
1634                Some(ReaderType::BYOB(reader)) => reader.get(),
1635                _ => None,
1636            }
1637        };
1638
1639        if let Some(reader) = byob_reader {
1640            // step 6.1, 6.2 & 6.3 of https://streams.spec.whatwg.org/#readable-stream-cancel
1641            reader.cancel(cx);
1642        }
1643
1644        // Let sourceCancelPromise be ! stream.[[controller]].[[CancelSteps]](reason).
1645
1646        let source_cancel_promise = match self.controller.borrow().as_ref() {
1647            Some(ControllerType::Default(controller)) => controller
1648                .get()
1649                .expect("Stream should have controller.")
1650                .perform_cancel_steps(cx, global, reason),
1651            Some(ControllerType::Byte(controller)) => controller
1652                .get()
1653                .expect("Stream should have controller.")
1654                .perform_cancel_steps(cx, global, reason),
1655            None => {
1656                panic!("Stream does not have a controller.");
1657            },
1658        };
1659
1660        // Create a new promise,
1661        // and setup a handler in order to react to the fulfillment of sourceCancelPromise.
1662        let global = self.global();
1663        let result_promise = Promise::new(cx, &global);
1664        let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1665            result: result_promise.clone(),
1666        });
1667        let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1668            result: result_promise.clone(),
1669        });
1670        let handler = PromiseNativeHandler::new(
1671            cx,
1672            &global,
1673            Some(fulfillment_handler),
1674            Some(rejection_handler),
1675        );
1676        let mut realm = enter_auto_realm(cx, &*global);
1677        let cx = &mut realm.current_realm();
1678        source_cancel_promise.append_native_handler(cx, &handler);
1679
1680        // Return the result of reacting to sourceCancelPromise
1681        // with a fulfillment step that returns undefined.
1682        result_promise
1683    }
1684
1685    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1686    pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1687        *self.reader.borrow_mut() = new_reader;
1688    }
1689
1690    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1691    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
1692    fn byte_tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1693        // Assert: stream implements ReadableStream.
1694        // Assert: stream.[[controller]] implements ReadableByteStreamController.
1695
1696        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1697        let reader = self.acquire_default_reader(cx)?;
1698        let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1699            Some(&reader),
1700        ))));
1701
1702        // Let reading be false.
1703        let reading = Rc::new(Cell::new(false));
1704
1705        // Let readAgainForBranch1 be false.
1706        let read_again_for_branch_1 = Rc::new(Cell::new(false));
1707
1708        // Let readAgainForBranch2 be false.
1709        let read_again_for_branch_2 = Rc::new(Cell::new(false));
1710
1711        // Let canceled1 be false.
1712        let canceled_1 = Rc::new(Cell::new(false));
1713
1714        // Let canceled2 be false.
1715        let canceled_2 = Rc::new(Cell::new(false));
1716
1717        // Let reason1 be undefined.
1718        let reason_1 = Rc::new(Heap::default());
1719
1720        // Let reason2 be undefined.
1721        let reason_2 = Rc::new(Heap::default());
1722
1723        // Let cancelPromise be a new promise.
1724        let cancel_promise = Promise::new(cx, &self.global());
1725        let reader_version = Rc::new(Cell::new(0));
1726
1727        let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1728            cx,
1729            reader.clone(),
1730            self,
1731            reading.clone(),
1732            read_again_for_branch_1.clone(),
1733            read_again_for_branch_2.clone(),
1734            canceled_1.clone(),
1735            canceled_2.clone(),
1736            reason_1.clone(),
1737            reason_2.clone(),
1738            cancel_promise.clone(),
1739            reader_version.clone(),
1740            ByteTeeCancelAlgorithm::Cancel1Algorithm,
1741            ByteTeePullAlgorithm::Pull1Algorithm,
1742        );
1743
1744        let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1745            cx,
1746            reader.clone(),
1747            self,
1748            reading,
1749            read_again_for_branch_1,
1750            read_again_for_branch_2,
1751            canceled_1,
1752            canceled_2,
1753            reason_1,
1754            reason_2,
1755            cancel_promise,
1756            reader_version,
1757            ByteTeeCancelAlgorithm::Cancel2Algorithm,
1758            ByteTeePullAlgorithm::Pull2Algorithm,
1759        );
1760
1761        // Set branch1 to ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm).
1762        let branch_1 = readable_byte_stream_tee(
1763            cx,
1764            &self.global(),
1765            UnderlyingSourceType::TeeByte(&byte_tee_source_1),
1766        );
1767        byte_tee_source_1.set_branch_1(&branch_1);
1768        byte_tee_source_2.set_branch_1(&branch_1);
1769
1770        // Set branch2 to ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm).
1771        let branch_2 = readable_byte_stream_tee(
1772            cx,
1773            &self.global(),
1774            UnderlyingSourceType::TeeByte(&byte_tee_source_2),
1775        );
1776        byte_tee_source_1.set_branch_2(&branch_2);
1777        byte_tee_source_2.set_branch_2(&branch_2);
1778
1779        // Perform forwardReaderError, given reader.
1780        byte_tee_source_1.forward_reader_error(cx, reader.clone());
1781        byte_tee_source_2.forward_reader_error(cx, reader);
1782
1783        // Return « branch1, branch2 ».
1784        Ok(vec![branch_1, branch_2])
1785    }
1786
1787    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
1788    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1789    fn default_tee(
1790        &self,
1791        cx: &mut JSContext,
1792        clone_for_branch_2: bool,
1793    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1794        // Assert: stream implements ReadableStream.
1795
1796        // Assert: cloneForBranch2 is a boolean.
1797        let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1798
1799        // Let reader be ? AcquireReadableStreamDefaultReader(stream).
1800        let reader = self.acquire_default_reader(cx)?;
1801
1802        // Let reading be false.
1803        let reading = Rc::new(Cell::new(false));
1804        // Let readAgain be false.
1805        let read_again = Rc::new(Cell::new(false));
1806        // Let canceled1 be false.
1807        let canceled_1 = Rc::new(Cell::new(false));
1808        // Let canceled2 be false.
1809        let canceled_2 = Rc::new(Cell::new(false));
1810
1811        // Let reason1 be undefined.
1812        let reason_1 = Rc::new(Heap::default());
1813        // Let reason2 be undefined.
1814        let reason_2 = Rc::new(Heap::default());
1815        // Let cancelPromise be a new promise.
1816        let cancel_promise = Promise::new(cx, &self.global());
1817
1818        let tee_source_1 = DefaultTeeUnderlyingSource::new(
1819            cx,
1820            &reader,
1821            self,
1822            reading.clone(),
1823            read_again.clone(),
1824            canceled_1.clone(),
1825            canceled_2.clone(),
1826            clone_for_branch_2.clone(),
1827            reason_1.clone(),
1828            reason_2.clone(),
1829            cancel_promise.clone(),
1830            DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1831        );
1832
1833        let underlying_source_type_branch_1 = UnderlyingSourceType::Tee(&tee_source_1);
1834
1835        let tee_source_2 = DefaultTeeUnderlyingSource::new(
1836            cx,
1837            &reader,
1838            self,
1839            reading,
1840            read_again,
1841            canceled_1.clone(),
1842            canceled_2.clone(),
1843            clone_for_branch_2,
1844            reason_1,
1845            reason_2,
1846            cancel_promise.clone(),
1847            DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1848        );
1849
1850        let underlying_source_type_branch_2 = UnderlyingSourceType::Tee(&tee_source_2);
1851
1852        // Set branch_1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
1853        let branch_1 = create_readable_stream(
1854            cx,
1855            &self.global(),
1856            underlying_source_type_branch_1,
1857            None,
1858            None,
1859        );
1860        tee_source_1.set_branch_1(&branch_1);
1861        tee_source_2.set_branch_1(&branch_1);
1862
1863        // Set branch_2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
1864        let branch_2 = create_readable_stream(
1865            cx,
1866            &self.global(),
1867            underlying_source_type_branch_2,
1868            None,
1869            None,
1870        );
1871        tee_source_1.set_branch_2(&branch_2);
1872        tee_source_2.set_branch_2(&branch_2);
1873
1874        // Upon rejection of reader.[[closedPromise]] with reason r,
1875        reader.default_tee_append_native_handler_to_closed_promise(
1876            cx,
1877            &branch_1,
1878            &branch_2,
1879            canceled_1,
1880            canceled_2,
1881            cancel_promise,
1882        );
1883
1884        // Return « branch_1, branch_2 ».
1885        Ok(vec![branch_1, branch_2])
1886    }
1887
1888    /// <https://streams.spec.whatwg.org/#readable-stream-pipe-to>
1889    #[allow(clippy::too_many_arguments)]
1890    pub(crate) fn pipe_to(
1891        &self,
1892        cx: &mut CurrentRealm,
1893        global: &GlobalScope,
1894        dest: &WritableStream,
1895        prevent_close: bool,
1896        prevent_abort: bool,
1897        prevent_cancel: bool,
1898        signal: Option<&AbortSignal>,
1899    ) -> Rc<Promise> {
1900        // Assert: source implements ReadableStream.
1901        // Assert: dest implements WritableStream.
1902        // Assert: prevent_close, prevent_abort, and prevent_cancel are all booleans.
1903        // Done with method signature types.
1904
1905        // If signal was not given, let signal be undefined.
1906        // Assert: either signal is undefined, or signal implements AbortSignal.
1907        // Note: done with the `signal` argument.
1908
1909        // Assert: ! IsReadableStreamLocked(source) is false.
1910        assert!(!self.is_locked());
1911
1912        // Assert: ! IsWritableStreamLocked(dest) is false.
1913        assert!(!dest.is_locked());
1914
1915        // If source.[[controller]] implements ReadableByteStreamController,
1916        // let reader be either ! AcquireReadableStreamBYOBReader(source)
1917        // or ! AcquireReadableStreamDefaultReader(source),
1918        // at the user agent’s discretion.
1919        // Note: for now only using default readers.
1920
1921        // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
1922        let reader = self
1923            .acquire_default_reader(cx)
1924            .expect("Acquiring a default reader for pipe_to cannot fail");
1925
1926        // Let writer be ! AcquireWritableStreamDefaultWriter(dest).
1927        let writer = dest
1928            .aquire_default_writer(cx, global)
1929            .expect("Acquiring a default writer for pipe_to cannot fail");
1930
1931        // Set source.[[disturbed]] to true.
1932        self.disturbed.set(true);
1933
1934        // Let shuttingDown be false.
1935        // Done below with default.
1936
1937        // Let promise be a new promise.
1938        let promise = Promise::new(cx, global);
1939
1940        // In parallel, but not really, using reader and writer, read all chunks from source and write them to dest.
1941        rooted!(&in(cx) let pipe_to = PipeTo {
1942            reader: Dom::from_ref(&reader),
1943            writer: Dom::from_ref(&writer),
1944            pending_writes: Default::default(),
1945            state: Default::default(),
1946            prevent_abort,
1947            prevent_cancel,
1948            prevent_close,
1949            shutting_down: Default::default(),
1950            abort_reason: Default::default(),
1951            shutdown_error: Default::default(),
1952            shutdown_action_promise:  Default::default(),
1953            result_promise: promise.clone(),
1954        });
1955
1956        // If signal is not undefined,
1957        // Note: moving the steps to here, so that the `PipeTo` is available.
1958        if let Some(signal) = signal {
1959            // Let abortAlgorithm be the following steps:
1960            // Note: steps are implemented at call site.
1961            rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1962
1963            // If signal is aborted, perform abortAlgorithm and return promise.
1964            if signal.aborted() {
1965                signal.run_abort_algorithm(cx, global, &abort_algorithm);
1966                return promise;
1967            }
1968
1969            // Add abortAlgorithm to signal.
1970            signal.add(&abort_algorithm);
1971        }
1972
1973        // Note: perfom checks now, since streams can start as closed or errored.
1974        pipe_to.check_and_propagate_errors_forward(cx, global);
1975        pipe_to.check_and_propagate_errors_backward(cx, global);
1976        pipe_to.check_and_propagate_closing_forward(cx, global);
1977        pipe_to.check_and_propagate_closing_backward(cx, global);
1978
1979        // If we are not closed or errored,
1980        if *pipe_to.state.borrow() == PipeToState::Starting {
1981            // Start the pipe, by waiting on the writer being ready for a chunk.
1982            pipe_to.wait_for_writer_ready(cx, global);
1983        }
1984
1985        // Return promise.
1986        promise
1987    }
1988
1989    /// <https://streams.spec.whatwg.org/#readable-stream-tee>
1990    pub(crate) fn tee(
1991        &self,
1992        cx: &mut JSContext,
1993        clone_for_branch_2: bool,
1994    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1995        // Assert: stream implements ReadableStream.
1996        // Assert: cloneForBranch2 is a boolean.
1997
1998        match self.controller.borrow().as_ref() {
1999            Some(ControllerType::Default(_)) => {
2000                // Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
2001                self.default_tee(cx, clone_for_branch_2)
2002            },
2003            Some(ControllerType::Byte(_)) => {
2004                // If stream.[[controller]] implements ReadableByteStreamController,
2005                // return ? ReadableByteStreamTee(stream).
2006                self.byte_tee(cx)
2007            },
2008            None => {
2009                unreachable!("Stream should have a controller.");
2010            },
2011        }
2012    }
2013
2014    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller-from-underlying-source>
2015    fn set_up_byte_controller(
2016        &self,
2017        cx: &mut JSContext,
2018        global: &GlobalScope,
2019        underlying_source_dict: JsUnderlyingSource,
2020        underlying_source_handle: SafeHandleObject,
2021        stream: DomRoot<ReadableStream>,
2022        strategy_hwm: f64,
2023    ) -> Fallible<()> {
2024        // Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
2025        // Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
2026        // If underlyingSourceDict["start"] exists, then set startAlgorithm to an algorithm which returns the result
2027        // of invoking underlyingSourceDict["start"] with argument list « controller »
2028        // and callback this value underlyingSource.
2029        // If underlyingSourceDict["pull"] exists, then set pullAlgorithm to an algorithm which returns the result
2030        // of invoking underlyingSourceDict["pull"] with argument list « controller »
2031        // and callback this value underlyingSource.
2032        // If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm to an algorithm which takes an
2033        // argument reason and returns the result of invoking underlyingSourceDict["cancel"] with argument list
2034        // « reason » and callback this value underlyingSource.
2035
2036        // Let autoAllocateChunkSize be underlyingSourceDict["autoAllocateChunkSize"],
2037        // if it exists, or undefined otherwise.
2038        // If autoAllocateChunkSize is 0, then throw a TypeError exception.
2039        if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2040            return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2041        }
2042
2043        let controller = ReadableByteStreamController::new(
2044            cx,
2045            UnderlyingSourceType::Js(underlying_source_dict),
2046            strategy_hwm,
2047            global,
2048        );
2049
2050        // Note: this must be done before `setup`,
2051        // otherwise `thisOb` is null in the start callback.
2052        controller.set_underlying_source_this_object(underlying_source_handle);
2053
2054        // Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm,
2055        // pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
2056        controller.setup(cx, global, stream)
2057    }
2058
2059    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2060    pub(crate) fn setup_cross_realm_transform_readable(
2061        &self,
2062        cx: &mut JSContext,
2063        port: &MessagePort,
2064    ) {
2065        let port_id = port.message_port_id();
2066        let global = self.global();
2067
2068        // Perform ! InitializeReadableStream(stream).
2069        // Done in `new_inherited`.
2070
2071        // Let sizeAlgorithm be an algorithm that returns 1.
2072        let size_algorithm = extract_size_algorithm(cx, &QueuingStrategy::default());
2073
2074        // Note: other algorithms defined in the underlying source container.
2075
2076        // Let controller be a new ReadableStreamDefaultController.
2077        let controller = ReadableStreamDefaultController::new(
2078            cx,
2079            &self.global(),
2080            UnderlyingSourceType::Transfer(port),
2081            0.,
2082            size_algorithm,
2083        );
2084
2085        // Add a handler for port’s message event with the following steps:
2086        // Add a handler for port’s messageerror event with the following steps:
2087        rooted!(&in(cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2088            controller: Dom::from_ref(&controller),
2089        });
2090        global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2091
2092        // Enable port’s port message queue.
2093        port.Start(cx);
2094
2095        // Perform ! SetUpReadableStreamDefaultController
2096        controller
2097            .setup(cx, DomRoot::from_ref(self))
2098            .expect("Setting up controller for transfer cannot fail.");
2099    }
2100}
2101
2102impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2103    /// <https://streams.spec.whatwg.org/#rs-constructor>
2104    fn Constructor(
2105        cx: &mut JSContext,
2106        global: &GlobalScope,
2107        proto: Option<SafeHandleObject>,
2108        underlying_source: Option<*mut JSObject>,
2109        strategy: &QueuingStrategy,
2110    ) -> Fallible<DomRoot<Self>> {
2111        // If underlyingSource is missing, set it to null.
2112        rooted!(&in(cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2113        // Let underlyingSourceDict be underlyingSource,
2114        // converted to an IDL value of type UnderlyingSource.
2115        let underlying_source_dict = if !underlying_source_obj.is_null() {
2116            rooted!(&in(cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2117            match JsUnderlyingSource::new(cx, obj_val.handle()) {
2118                Ok(ConversionResult::Success(val)) => val,
2119                Ok(ConversionResult::Failure(error)) => {
2120                    return Err(Error::Type(error.into_owned()));
2121                },
2122                _ => {
2123                    return Err(Error::JSFailed);
2124                },
2125            }
2126        } else {
2127            JsUnderlyingSource::empty()
2128        };
2129
2130        // Perform ! InitializeReadableStream(this).
2131        let stream = ReadableStream::new_with_proto(cx, global, proto);
2132
2133        if underlying_source_dict.type_.is_some() {
2134            // If strategy["size"] exists, throw a RangeError exception.
2135            if strategy.size.is_some() {
2136                return Err(Error::Range(
2137                    c"size is not supported for byte streams".to_owned(),
2138                ));
2139            }
2140
2141            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
2142            let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2143
2144            // Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this,
2145            // underlyingSource, underlyingSourceDict, highWaterMark).
2146            stream.set_up_byte_controller(
2147                cx,
2148                global,
2149                underlying_source_dict,
2150                underlying_source_obj.handle(),
2151                stream.clone(),
2152                strategy_hwm,
2153            )?;
2154        } else {
2155            // Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
2156            let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2157
2158            // Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
2159            let size_algorithm = extract_size_algorithm(cx, strategy);
2160
2161            let controller = ReadableStreamDefaultController::new(
2162                cx,
2163                global,
2164                UnderlyingSourceType::Js(underlying_source_dict),
2165                high_water_mark,
2166                size_algorithm,
2167            );
2168
2169            // Note: this must be done before `setup`,
2170            // otherwise `thisOb` is null in the start callback.
2171            controller.set_underlying_source_this_object(underlying_source_obj.handle());
2172
2173            // Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
2174            controller.setup(cx, stream.clone())?;
2175        };
2176
2177        Ok(stream)
2178    }
2179
2180    /// <https://streams.spec.whatwg.org/#rs-locked>
2181    fn Locked(&self) -> bool {
2182        self.is_locked()
2183    }
2184
2185    /// <https://streams.spec.whatwg.org/#rs-cancel>
2186    fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
2187        let global = self.global();
2188        if self.is_locked() {
2189            // If ! IsReadableStreamLocked(this) is true,
2190            // return a promise rejected with a TypeError exception.
2191            let promise = Promise::new(cx, &global);
2192            promise.reject_error(cx, Error::Type(c"stream is locked".to_owned()));
2193            promise
2194        } else {
2195            // Return ! ReadableStreamCancel(this, reason).
2196            self.cancel(cx, &global, reason)
2197        }
2198    }
2199
2200    /// <https://streams.spec.whatwg.org/#rs-get-reader>
2201    fn GetReader(
2202        &self,
2203        cx: &mut JSContext,
2204        options: &ReadableStreamGetReaderOptions,
2205    ) -> Fallible<ReadableStreamReader> {
2206        // 1, If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
2207        if options.mode.is_none() {
2208            return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2209                self.acquire_default_reader(cx)?,
2210            ));
2211        }
2212        // 2. Assert: options["mode"] is "byob".
2213        assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2214
2215        // 3. Return ? AcquireReadableStreamBYOBReader(this).
2216        Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2217            self.acquire_byob_reader(cx)?,
2218        ))
2219    }
2220
2221    /// <https://streams.spec.whatwg.org/#rs-tee>
2222    fn Tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2223        // Return ? ReadableStreamTee(this, false).
2224        self.tee(cx, false)
2225    }
2226
2227    /// <https://streams.spec.whatwg.org/#rs-pipe-to>
2228    fn PipeTo(
2229        &self,
2230        cx: &mut CurrentRealm,
2231        destination: &WritableStream,
2232        options: &StreamPipeOptions,
2233    ) -> Rc<Promise> {
2234        let global = self.global();
2235
2236        // If ! IsReadableStreamLocked(this) is true,
2237        if self.is_locked() {
2238            // return a promise rejected with a TypeError exception.
2239            let promise = Promise::new(cx, &global);
2240            promise.reject_error(cx, Error::Type(c"Source stream is locked".to_owned()));
2241            return promise;
2242        }
2243
2244        // If ! IsWritableStreamLocked(destination) is true,
2245        if destination.is_locked() {
2246            // return a promise rejected with a TypeError exception.
2247            let promise = Promise::new(cx, &global);
2248            promise.reject_error(cx, Error::Type(c"Destination stream is locked".to_owned()));
2249            return promise;
2250        }
2251
2252        // Let signal be options["signal"] if it exists, or undefined otherwise.
2253        let signal = options.signal.as_deref();
2254
2255        // Return ! ReadableStreamPipeTo.
2256        self.pipe_to(
2257            cx,
2258            &global,
2259            destination,
2260            options.preventClose,
2261            options.preventAbort,
2262            options.preventCancel,
2263            signal,
2264        )
2265    }
2266
2267    /// <https://streams.spec.whatwg.org/#rs-pipe-through>
2268    fn PipeThrough(
2269        &self,
2270        cx: &mut CurrentRealm,
2271        transform: &ReadableWritablePair,
2272        options: &StreamPipeOptions,
2273    ) -> Fallible<DomRoot<ReadableStream>> {
2274        let global = self.global();
2275
2276        // If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
2277        if self.is_locked() {
2278            return Err(Error::Type(c"Source stream is locked".to_owned()));
2279        }
2280
2281        // If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
2282        if transform.writable.is_locked() {
2283            return Err(Error::Type(c"Destination stream is locked".to_owned()));
2284        }
2285
2286        // Let signal be options["signal"] if it exists, or undefined otherwise.
2287        let signal = options.signal.as_deref();
2288
2289        // Let promise be ! ReadableStreamPipeTo(this, transform["writable"],
2290        // options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
2291        let promise = self.pipe_to(
2292            cx,
2293            &global,
2294            &transform.writable,
2295            options.preventClose,
2296            options.preventAbort,
2297            options.preventCancel,
2298            signal,
2299        );
2300
2301        // Set promise.[[PromiseIsHandled]] to true.
2302        promise.set_promise_is_handled(cx);
2303
2304        // Return transform["readable"].
2305        Ok(transform.readable.clone())
2306    }
2307}
2308
2309/// The initial steps for the message handler for both readable and writable cross realm transforms.
2310/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2311/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2312pub(crate) fn get_type_and_value_from_message(
2313    cx: &mut JSContext,
2314    data: SafeHandleValue,
2315    value: SafeMutableHandleValue,
2316) -> DOMString {
2317    // Let data be the data of the message.
2318    // Note: we are passed the data as argument,
2319    // which originates in the return value of `structuredclone::read`.
2320
2321    // Assert: data is an Object.
2322    assert!(data.is_object());
2323    rooted!(&in(cx) let data_object = data.to_object());
2324
2325    // Let type be ! Get(data, "type").
2326    let type_ = get_property::<DOMString>(
2327        cx,
2328        data_object.handle(),
2329        c"type",
2330        StringificationBehavior::Empty,
2331    );
2332
2333    // Let value be ! Get(data, "value").
2334    get_property_jsval(cx, data_object.handle(), c"value", value)
2335        .expect("Getting the value should not fail.");
2336
2337    // Assert: type is a String.
2338    type_
2339        .expect("The type of the message should be a string")
2340        .expect("Property should be present")
2341}
2342
// Marker implementation of `js::gc::Rootable`; the empty body provides no overrides.
impl js::gc::Rootable for CrossRealmTransformReadable {}

/// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
/// A wrapper to handle `message` and `messageerror` events
/// for the port used by the transferred stream.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformReadable {
    /// The controller used in the algorithm;
    /// the event handlers enqueue into, close, or error it.
    controller: Dom<ReadableStreamDefaultController>,
}
2354
2355impl CrossRealmTransformReadable {
2356    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformreadable>
2357    /// Add a handler for port’s message event with the following steps:
2358    pub(crate) fn handle_message(
2359        &self,
2360        cx: &mut CurrentRealm,
2361        global: &GlobalScope,
2362        port: &MessagePort,
2363        message: SafeHandleValue,
2364    ) {
2365        rooted!(&in(cx) let mut value = UndefinedValue());
2366        let type_string = get_type_and_value_from_message(cx, message, value.handle_mut());
2367
2368        // If type is "chunk",
2369        if type_string == "chunk" {
2370            // Perform ! ReadableStreamDefaultControllerEnqueue(controller, value).
2371            self.controller
2372                .enqueue(cx, value.handle())
2373                .expect("Enqueing a chunk should not fail.");
2374        }
2375
2376        // Otherwise, if type is "close",
2377        if type_string == "close" {
2378            // Perform ! ReadableStreamDefaultControllerClose(controller).
2379            self.controller.close(cx);
2380
2381            // Disentangle port.
2382            global.disentangle_port(cx, port);
2383        }
2384
2385        // Otherwise, if type is "error",
2386        if type_string == "error" {
2387            // Perform ! ReadableStreamDefaultControllerError(controller, value).
2388            self.controller.error(cx, value.handle());
2389
2390            // Disentangle port.
2391            global.disentangle_port(cx, port);
2392        }
2393    }
2394
2395    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
2396    /// Add a handler for port’s messageerror event with the following steps:
2397    pub(crate) fn handle_error(
2398        &self,
2399        cx: &mut CurrentRealm,
2400        global: &GlobalScope,
2401        port: &MessagePort,
2402    ) {
2403        // Let error be a new "DataCloneError" DOMException.
2404        let error = DOMException::new(cx, global, DOMErrorName::DataCloneError);
2405        rooted!(&in(cx) let mut rooted_error = UndefinedValue());
2406        error.safe_to_jsval(cx, rooted_error.handle_mut());
2407
2408        // Perform ! CrossRealmTransformSendError(port, error).
2409        port.cross_realm_transform_send_error(cx, rooted_error.handle());
2410
2411        // Perform ! ReadableStreamDefaultControllerError(controller, error).
2412        self.controller.error(cx, rooted_error.handle());
2413
2414        // Disentangle port.
2415        global.disentangle_port(cx, port);
2416    }
2417}
2418
2419/// Get the `done` property of an object that a read promise resolved to.
2420pub(crate) fn get_read_promise_done(
2421    cx: &mut JSContext,
2422    v: &SafeHandleValue,
2423) -> Result<bool, Error> {
2424    if !v.is_object() {
2425        return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2426    }
2427
2428    rooted!(&in(cx) let object = v.to_object());
2429    get_property::<bool>(cx, object.handle(), c"done", ())?
2430        .ok_or(Error::Type(c"Promise has no done property.".to_owned()))
2431}
2432
2433/// Get the `value` property of an object that a read promise resolved to.
2434pub(crate) fn get_read_promise_bytes(
2435    cx: &mut JSContext,
2436    v: &SafeHandleValue,
2437) -> Result<Vec<u8>, Error> {
2438    if !v.is_object() {
2439        return Err(Error::Type(
2440            c"Unknown format for for bytes read.".to_owned(),
2441        ));
2442    }
2443
2444    rooted!(&in(cx) let object = v.to_object());
2445    get_property::<Vec<u8>>(
2446        cx,
2447        object.handle(),
2448        c"value",
2449        ConversionBehavior::EnforceRange,
2450    )?
2451    .ok_or(Error::Type(c"Promise has no value property.".to_owned()))
2452}
2453
2454/// Convert a raw stream `chunk` JS value to `Vec<u8>`.
2455/// This mirrors the conversion used inside `get_read_promise_bytes`,
2456/// but operates on the raw chunk (no `{ value, done }` wrapper).
2457pub(crate) fn bytes_from_chunk_jsval(
2458    cx: &mut JSContext,
2459    chunk: &RootedTraceableBox<Heap<JSVal>>,
2460) -> Result<Vec<u8>, Error> {
2461    match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange) {
2462        Ok(ConversionResult::Success(vec)) => Ok(vec),
2463        Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2464        _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2465    }
2466}
2467
2468/// <https://streams.spec.whatwg.org/#rs-transfer>
2469impl Transferable for ReadableStream {
2470    type Index = MessagePortIndex;
2471    type Data = MessagePortImpl;
2472
2473    /// <https://streams.spec.whatwg.org/#ref-for-transfer-steps>
2474    fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, MessagePortImpl)> {
2475        // Step 1. If ! IsReadableStreamLocked(value) is true, throw a
2476        // "DataCloneError" DOMException.
2477        if self.is_locked() {
2478            return Err(Error::DataClone(None));
2479        }
2480
2481        let global = self.global();
2482        let mut realm = enter_auto_realm(cx, &*global);
2483        let mut realm = realm.current_realm();
2484        let cx = &mut realm;
2485
2486        // Step 2. Let port1 be a new MessagePort in the current Realm.
2487        let port_1 = MessagePort::new(cx, &global);
2488        global.track_message_port(&port_1, None);
2489
2490        // Step 3. Let port2 be a new MessagePort in the current Realm.
2491        let port_2 = MessagePort::new(cx, &global);
2492        global.track_message_port(&port_2, None);
2493
2494        // Step 4. Entangle port1 and port2.
2495        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2496
2497        // Step 5. Let writable be a new WritableStream in the current Realm.
2498        let writable = WritableStream::new_with_proto(cx, &global, None);
2499
2500        // Step 6. Perform ! SetUpCrossRealmTransformWritable(writable, port1).
2501        writable.setup_cross_realm_transform_writable(cx, &port_1);
2502
2503        // Step 7. Let promise be ! ReadableStreamPipeTo(value, writable, false, false, false).
2504        let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);
2505
2506        // Step 8. Set promise.[[PromiseIsHandled]] to true.
2507        promise.set_promise_is_handled(cx);
2508
2509        // Step 9. Set dataHolder.[[port]] to ! StructuredSerializeWithTransfer(port2, « port2 »).
2510        port_2.transfer(cx)
2511    }
2512
2513    /// <https://streams.spec.whatwg.org/#ref-for-transfer-receiving-steps>
2514    fn transfer_receive(
2515        cx: &mut JSContext,
2516        owner: &GlobalScope,
2517        id: MessagePortId,
2518        port_impl: MessagePortImpl,
2519    ) -> Result<DomRoot<Self>, ()> {
2520        // Their transfer-receiving steps, given dataHolder and value, are:
2521        // Note: dataHolder is used in `structuredclone.rs`, and value is created here.
2522        let value = ReadableStream::new_with_proto(cx, owner, None);
2523
2524        // Step 1. Let deserializedRecord be !
2525        // StructuredDeserializeWithTransfer(dataHolder.[[port]], the current
2526        // Realm).
2527        // Done with the `Deserialize` derive of `MessagePortImpl`.
2528
2529        // Step 2. Let port be deserializedRecord.[[Deserialized]].
2530        let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
2531
2532        // Step 3. Perform ! SetUpCrossRealmTransformReadable(value, port).
2533        value.setup_cross_realm_transform_readable(cx, &transferred_port);
2534        Ok(value)
2535    }
2536
2537    /// Note: we are relying on the port transfer, so the data returned here are related to the port.
2538    fn serialized_storage<'a>(
2539        data: StructuredData<'a, '_>,
2540    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2541        match data {
2542            StructuredData::Reader(r) => &mut r.port_impls,
2543            StructuredData::Writer(w) => &mut w.ports,
2544        }
2545    }
2546}
2547
2548/// <https://streams.spec.whatwg.org/#readablestream-pipe-through>
2549/// Pipe a ReadableStream through a transform and return the readable side.
2550/// Note: Unlike [`ReadableStream::PipeThrough`], this is not failliable.
2551///
2552/// Note: Spec says it takes same options as [`ReadableStream::PipeThrough`],
2553/// however all usages use default `false`.
2554pub(crate) fn pipe_through(
2555    source: &ReadableStream,
2556    cx: &mut JSContext,
2557    global: &GlobalScope,
2558    transform: &TextDecoderStream,
2559) -> DomRoot<ReadableStream> {
2560    // Step 1. Assert: `! IsReadableStreamLocked(readable)` is false.
2561
2562    // Step 2. Assert: `! IsWritableStreamLocked(transform.[[writable]])` is false.
2563
2564    // Above is done in `pipe_to` below.
2565    let mut realm = CurrentRealm::assert(cx);
2566    // Step 4. Let promise be ! ReadableStreamPipeTo(readable,
2567    // transform.[[writable]], preventClose, preventAbort, preventCancel, signalArg).
2568    let promise = source.pipe_to(
2569        &mut realm,
2570        global,
2571        &transform.Writable(),
2572        false, // preventClose
2573        false, // preventAbort
2574        false, // preventCancel
2575        None,  // signal
2576    );
2577
2578    // Step 5. Set promise.[[PromiseIsHandled]] to true.
2579    promise.set_promise_is_handled(cx);
2580    // Step 6. Return transform.[[readable]].
2581    transform.Readable()
2582}