1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use base::generic_channel::GenericSharedMemory;
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19 MutableHandleValue as SafeMutableHandleValue,
20};
21use js::typedarray::ArrayBufferViewU8;
22use rustc_hash::FxHashMap;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
26use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
27 ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
28 ReadableWritablePair, StreamPipeOptions,
29};
30use script_bindings::str::DOMString;
31
32use crate::dom::domexception::{DOMErrorName, DOMException};
33use script_bindings::conversions::{is_array_like, StringificationBehavior};
34use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
35use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
38use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
39use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
40use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
41use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
42use crate::dom::stream::writablestream::WritableStream;
43use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
44use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
45use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
46use crate::dom::bindings::trace::RootedTraceableBox;
47use crate::dom::bindings::utils::get_dictionary_property;
48use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
49use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
50use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
51use crate::dom::globalscope::GlobalScope;
52use crate::dom::promise::{wait_for_all_promise, Promise};
53use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
54use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
55use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
56use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
57use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
58use crate::dom::types::DefaultTeeUnderlyingSource;
59use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
60use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
61use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
62use crate::dom::messageport::MessagePort;
63use crate::realms::{enter_realm, InRealm, enter_auto_realm};
64use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
65use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
66use crate::dom::bindings::transferable::Transferable;
67use crate::dom::bindings::structuredclone::StructuredData;
68
69use crate::dom::bindings::buffer_source::HeapBufferSource;
70use super::readablestreambyobreader::ReadIntoRequest;
71
72#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
74enum PipeToState {
75 #[default]
77 Starting,
78 PendingReady,
80 PendingRead,
82 ShuttingDownWithPendingWrites(Option<ShutdownAction>),
85 ShuttingDownPendingAction,
89 Finalized,
92}
93
94#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
96enum ShutdownAction {
97 WritableStreamAbort,
99 ReadableStreamCancel,
101 WritableStreamDefaultWriterCloseWithErrorPropagation,
103 Abort,
105}
106
107impl js::gc::Rootable for PipeTo {}
108
109#[derive(Clone, JSTraceable, MallocSizeOf)]
118#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
119pub(crate) struct PipeTo {
120 reader: Dom<ReadableStreamDefaultReader>,
122
123 writer: Dom<WritableStreamDefaultWriter>,
125
126 #[ignore_malloc_size_of = "nested Rc"]
129 pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,
130
131 #[conditional_malloc_size_of]
133 #[no_trace]
134 state: Rc<RefCell<PipeToState>>,
135
136 prevent_abort: bool,
138
139 prevent_cancel: bool,
141
142 prevent_close: bool,
144
145 #[conditional_malloc_size_of]
148 shutting_down: Rc<Cell<bool>>,
149
150 #[ignore_malloc_size_of = "mozjs"]
153 abort_reason: Rc<Heap<JSVal>>,
154
155 #[ignore_malloc_size_of = "mozjs"]
158 shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,
159
160 #[ignore_malloc_size_of = "nested Rc"]
163 shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,
164
165 #[conditional_malloc_size_of]
168 result_promise: Rc<Promise>,
169}
170
171impl PipeTo {
172 pub(crate) fn abort_with_reason(
175 &self,
176 cx: &mut CurrentRealm,
177 global: &GlobalScope,
178 reason: SafeHandleValue,
179 ) {
180 if self.shutting_down.get() {
182 return;
183 }
184
185 self.abort_reason.set(reason.get());
189
190 self.set_shutdown_error(reason);
195
196 self.shutdown(cx, global, Some(ShutdownAction::Abort));
202 }
203}
204
205impl Callback for PipeTo {
206 #[expect(unsafe_code)]
215 fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
216 let in_realm_proof = cx.into();
217 let realm = InRealm::Already(&in_realm_proof);
218 let global = self.reader.global();
219
220 self.pending_writes.borrow_mut().retain(|p| {
226 let pending = p.is_pending();
227 if !pending {
228 p.set_promise_is_handled();
229 }
230 pending
231 });
232
233 let state_before_checks = self.state.borrow().clone();
235
236 if state_before_checks == PipeToState::PendingRead {
243 let source = self.reader.get_stream().expect("Source stream must be set");
244 if source.is_closed() {
245 let dest = self
246 .writer
247 .get_stream()
248 .expect("Destination stream must be set");
249
250 if dest.is_writable() && !dest.close_queued_or_in_flight() {
253 let Ok(done) = get_read_promise_done(cx.into(), &result, CanGc::from_cx(cx))
254 else {
255 return;
262 };
263
264 if !done {
265 self.write_chunk(cx.into(), &global, result, CanGc::from_cx(cx));
267 }
268 }
269 }
270 }
271
272 self.check_and_propagate_errors_forward(cx, &global);
273 self.check_and_propagate_errors_backward(cx, &global);
274 self.check_and_propagate_closing_forward(cx, &global);
275 self.check_and_propagate_closing_backward(cx, &global);
276
277 let state = self.state.borrow().clone();
279
280 if state != state_before_checks {
284 return;
285 }
286
287 match state {
288 PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
289 PipeToState::PendingReady => {
290 self.read_chunk(&global, realm, CanGc::from_cx(cx));
292 },
293 PipeToState::PendingRead => {
294 self.write_chunk(cx.into(), &global, result, CanGc::from_cx(cx));
296
297 if self.shutting_down.get() {
299 return;
300 }
301
302 self.wait_for_writer_ready(&global, realm, CanGc::from_cx(cx));
304 },
305 PipeToState::ShuttingDownWithPendingWrites(action) => {
306 if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
309 self.wait_on_pending_write(&global, write, realm, CanGc::from_cx(cx));
310 return;
311 }
312
313 if let Some(action) = action {
315 self.perform_action(cx, &global, action);
317 } else {
318 self.finalize(cx.into(), &global, CanGc::from_cx(cx));
320 }
321 },
322 PipeToState::ShuttingDownPendingAction => {
323 let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
324 unreachable!();
325 };
326 if promise.is_pending() {
327 return;
331 }
332
333 let is_array_like = {
334 if !result.is_object() {
335 false
336 } else {
337 unsafe { is_array_like::<crate::DomTypeHolder>(cx.raw_cx(), result) }
338 }
339 };
340
341 if !result.is_undefined() && !is_array_like {
343 self.set_shutdown_error(result);
353 }
354 self.finalize(cx.into(), &global, CanGc::from_cx(cx));
355 },
356 PipeToState::Finalized => {},
357 }
358 }
359}
360
361impl PipeTo {
362 fn set_shutdown_error(&self, error: SafeHandleValue) {
365 *self.shutdown_error.borrow_mut() = Some(Heap::default());
366 let Some(ref heap) = *self.shutdown_error.borrow() else {
367 unreachable!("Option set to Some(heap) above.");
368 };
369 heap.set(error.get())
370 }
371
372 fn wait_for_writer_ready(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
375 {
376 let mut state = self.state.borrow_mut();
377 *state = PipeToState::PendingReady;
378 }
379
380 let ready_promise = self.writer.Ready();
381 if ready_promise.is_fulfilled() {
382 self.read_chunk(global, realm, can_gc);
383 } else {
384 let handler = PromiseNativeHandler::new(
385 global,
386 Some(Box::new(self.clone())),
387 Some(Box::new(self.clone())),
388 can_gc,
389 );
390 ready_promise.append_native_handler(&handler, realm, can_gc);
391
392 let closed_promise = self.reader.Closed();
396 closed_promise.append_native_handler(&handler, realm, can_gc);
397 }
398 }
399
400 fn read_chunk(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
402 *self.state.borrow_mut() = PipeToState::PendingRead;
403 let chunk_promise = self.reader.Read(can_gc);
404 let handler = PromiseNativeHandler::new(
405 global,
406 Some(Box::new(self.clone())),
407 Some(Box::new(self.clone())),
408 can_gc,
409 );
410 chunk_promise.append_native_handler(&handler, realm, can_gc);
411
412 let ready_promise = self.writer.Closed();
415 ready_promise.append_native_handler(&handler, realm, can_gc);
416 }
417
418 #[expect(unsafe_code)]
421 fn write_chunk(
422 &self,
423 cx: SafeJSContext,
424 global: &GlobalScope,
425 chunk: SafeHandleValue,
426 can_gc: CanGc,
427 ) -> bool {
428 if chunk.is_object() {
429 rooted!(in(*cx) let object = chunk.to_object());
430 rooted!(in(*cx) let mut bytes = UndefinedValue());
431 let has_value = unsafe {
432 get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc)
433 .expect("Chunk should have a value.")
434 };
435 if has_value {
436 let write_promise = self.writer.write(cx, global, bytes.handle(), can_gc);
438 self.pending_writes.borrow_mut().push_back(write_promise);
439 return true;
440 }
441 }
442 false
443 }
444
445 fn wait_on_pending_write(
449 &self,
450 global: &GlobalScope,
451 promise: Rc<Promise>,
452 realm: InRealm,
453 can_gc: CanGc,
454 ) {
455 let handler = PromiseNativeHandler::new(
456 global,
457 Some(Box::new(self.clone())),
458 Some(Box::new(self.clone())),
459 can_gc,
460 );
461 promise.append_native_handler(&handler, realm, can_gc);
462 }
463
464 fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
467 if self.shutting_down.get() {
470 return;
471 }
472
473 let source = self
475 .reader
476 .get_stream()
477 .expect("Reader should still have a stream");
478 if source.is_errored() {
479 rooted!(&in(cx) let mut source_error = UndefinedValue());
480 source.get_stored_error(source_error.handle_mut());
481 self.set_shutdown_error(source_error.handle());
482
483 if !self.prevent_abort {
485 self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
488 } else {
489 self.shutdown(cx, global, None);
491 }
492 }
493 }
494
495 fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
498 if self.shutting_down.get() {
501 return;
502 }
503
504 let dest = self
506 .writer
507 .get_stream()
508 .expect("Writer should still have a stream");
509 if dest.is_errored() {
510 rooted!(&in(cx) let mut dest_error = UndefinedValue());
511 dest.get_stored_error(dest_error.handle_mut());
512 self.set_shutdown_error(dest_error.handle());
513
514 if !self.prevent_cancel {
516 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
519 } else {
520 self.shutdown(cx, global, None);
522 }
523 }
524 }
525
526 fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
529 if self.shutting_down.get() {
532 return;
533 }
534
535 let source = self
537 .reader
538 .get_stream()
539 .expect("Reader should still have a stream");
540 if source.is_closed() {
541 if !self.prevent_close {
543 self.shutdown(
546 cx,
547 global,
548 Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
549 )
550 } else {
551 self.shutdown(cx, global, None);
553 }
554 }
555 }
556
557 fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
560 if self.shutting_down.get() {
563 return;
564 }
565
566 let dest = self
569 .writer
570 .get_stream()
571 .expect("Writer should still have a stream");
572 if dest.close_queued_or_in_flight() || dest.is_closed() {
573 rooted!(&in(cx) let mut dest_closed = UndefinedValue());
578 let error =
579 Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
580 error.to_jsval(
581 cx.into(),
582 global,
583 dest_closed.handle_mut(),
584 CanGc::from_cx(cx),
585 );
586 self.set_shutdown_error(dest_closed.handle());
587
588 if !self.prevent_cancel {
590 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
593 } else {
594 self.shutdown(cx, global, None);
596 }
597 }
598 }
599
600 fn shutdown(
604 &self,
605 cx: &mut CurrentRealm,
606 global: &GlobalScope,
607 action: Option<ShutdownAction>,
608 ) {
609 let realm = cx.into();
610 let realm = InRealm::Already(&realm);
611 if !self.shutting_down.replace(true) {
614 let dest = self.writer.get_stream().expect("Stream must be set");
615 if dest.is_writable() && !dest.close_queued_or_in_flight() {
618 if let Some(write) = self.pending_writes.borrow_mut().front() {
624 *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
625 self.wait_on_pending_write(global, write.clone(), realm, CanGc::from_cx(cx));
626 return;
627 }
628 }
629
630 if let Some(action) = action {
632 self.perform_action(cx, global, action);
634 } else {
635 self.finalize(cx.into(), global, CanGc::from_cx(cx));
637 }
638 }
639 }
640
641 fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
644 let realm = cx.into();
645 let realm = InRealm::Already(&realm);
646 rooted!(&in(cx) let mut error = UndefinedValue());
647 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
648 error.set(shutdown_error.get());
649 }
650
651 *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
652
653 let promise = match action {
655 ShutdownAction::WritableStreamAbort => {
656 let dest = self.writer.get_stream().expect("Stream must be set");
657 dest.abort(cx, global, error.handle())
658 },
659 ShutdownAction::ReadableStreamCancel => {
660 let source = self
661 .reader
662 .get_stream()
663 .expect("Reader should have a stream.");
664 source.cancel(cx.into(), global, error.handle(), CanGc::from_cx(cx))
665 },
666 ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => self
667 .writer
668 .close_with_error_propagation(cx.into(), global, CanGc::from_cx(cx)),
669 ShutdownAction::Abort => {
670 rooted!(&in(cx) let mut error = UndefinedValue());
675 error.set(self.abort_reason.get());
676
677 let mut actions = vec![];
679
680 if !self.prevent_abort {
682 let dest = self
683 .writer
684 .get_stream()
685 .expect("Destination stream must be set");
686
687 let promise = if dest.is_writable() {
689 dest.abort(cx, global, error.handle())
691 } else {
692 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
694 };
695 actions.push(promise);
696 }
697
698 if !self.prevent_cancel {
700 let source = self.reader.get_stream().expect("Source stream must be set");
701
702 let promise = if source.is_readable() {
704 source.cancel(cx.into(), global, error.handle(), CanGc::from_cx(cx))
706 } else {
707 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
709 };
710 actions.push(promise);
711 }
712
713 wait_for_all_promise(cx.into(), global, actions, realm, CanGc::from_cx(cx))
717 },
718 };
719
720 let handler = PromiseNativeHandler::new(
723 global,
724 Some(Box::new(self.clone())),
725 Some(Box::new(self.clone())),
726 CanGc::from_cx(cx),
727 );
728 promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
729 *self.shutdown_action_promise.borrow_mut() = Some(promise);
730 }
731
732 fn finalize(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
734 *self.state.borrow_mut() = PipeToState::Finalized;
735
736 self.writer.release(cx, global, can_gc);
738
739 self.reader
745 .release(can_gc)
746 .expect("Releasing the reader should not fail");
747
748 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
754 rooted!(in(*cx) let mut error = UndefinedValue());
755 error.set(shutdown_error.get());
756 self.result_promise.reject_native(&error.handle(), can_gc);
758 } else {
759 self.result_promise.resolve_native(&(), can_gc);
761 }
762 }
763}
764
765#[derive(Clone, JSTraceable, MallocSizeOf)]
768struct SourceCancelPromiseFulfillmentHandler {
769 #[conditional_malloc_size_of]
770 result: Rc<Promise>,
771}
772
773impl Callback for SourceCancelPromiseFulfillmentHandler {
774 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
778 self.result.resolve_native(&(), CanGc::from_cx(cx));
779 }
780}
781
782#[derive(Clone, JSTraceable, MallocSizeOf)]
785struct SourceCancelPromiseRejectionHandler {
786 #[conditional_malloc_size_of]
787 result: Rc<Promise>,
788}
789
790impl Callback for SourceCancelPromiseRejectionHandler {
791 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
795 self.result.reject_native(&v, CanGc::from_cx(cx));
796 }
797}
798
799#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
801pub(crate) enum ReadableStreamState {
802 #[default]
803 Readable,
804 Closed,
805 Errored,
806}
807
808#[derive(JSTraceable, MallocSizeOf)]
810#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
811pub(crate) enum ControllerType {
812 Byte(MutNullableDom<ReadableByteStreamController>),
814 Default(MutNullableDom<ReadableStreamDefaultController>),
816}
817
818#[derive(JSTraceable, MallocSizeOf)]
820#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
821pub(crate) enum ReaderType {
822 #[allow(clippy::upper_case_acronyms)]
824 BYOB(MutNullableDom<ReadableStreamBYOBReader>),
825 Default(MutNullableDom<ReadableStreamDefaultReader>),
827}
828
829impl Eq for ReaderType {}
830impl PartialEq for ReaderType {
831 fn eq(&self, other: &Self) -> bool {
832 matches!(
833 (self, other),
834 (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
835 (ReaderType::Default(_), ReaderType::Default(_))
836 )
837 }
838}
839
840#[cfg_attr(crown, expect(crown::unrooted_must_root))]
842pub(crate) fn create_readable_stream(
843 global: &GlobalScope,
844 underlying_source_type: UnderlyingSourceType,
845 queuing_strategy: Option<Rc<QueuingStrategySize>>,
846 high_water_mark: Option<f64>,
847 can_gc: CanGc,
848) -> DomRoot<ReadableStream> {
849 let high_water_mark = high_water_mark.unwrap_or(1.0);
851
852 let size_algorithm =
854 queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
855
856 assert!(high_water_mark >= 0.0);
858
859 let stream = ReadableStream::new_with_proto(global, None, can_gc);
862
863 let controller = ReadableStreamDefaultController::new(
865 global,
866 underlying_source_type,
867 high_water_mark,
868 size_algorithm,
869 can_gc,
870 );
871
872 controller
875 .setup(stream.clone(), can_gc)
876 .expect("Setup of default controller cannot fail");
877
878 stream
880}
881
882#[cfg_attr(crown, expect(crown::unrooted_must_root))]
884pub(crate) fn readable_byte_stream_tee(
885 global: &GlobalScope,
886 underlying_source_type: UnderlyingSourceType,
887 can_gc: CanGc,
888) -> DomRoot<ReadableStream> {
889 let tee_stream = ReadableStream::new_with_proto(global, None, can_gc);
892
893 let controller = ReadableByteStreamController::new(underlying_source_type, 0.0, global, can_gc);
895
896 controller
898 .setup(global, tee_stream.clone(), can_gc)
899 .expect("Setup of byte stream controller cannot fail");
900
901 tee_stream
903}
904
905#[dom_struct]
907pub(crate) struct ReadableStream {
908 reflector_: Reflector,
909
910 controller: RefCell<Option<ControllerType>>,
914
915 #[ignore_malloc_size_of = "mozjs"]
917 stored_error: Heap<JSVal>,
918
919 disturbed: Cell<bool>,
921
922 reader: RefCell<Option<ReaderType>>,
924
925 state: Cell<ReadableStreamState>,
927}
928
929impl ReadableStream {
930 fn new_inherited() -> ReadableStream {
932 ReadableStream {
933 reflector_: Reflector::new(),
934 controller: RefCell::new(None),
935 stored_error: Heap::default(),
936 disturbed: Default::default(),
937 reader: RefCell::new(None),
938 state: Cell::new(Default::default()),
939 }
940 }
941
942 pub(crate) fn new_with_proto(
943 global: &GlobalScope,
944 proto: Option<SafeHandleObject>,
945 can_gc: CanGc,
946 ) -> DomRoot<ReadableStream> {
947 reflect_dom_object_with_proto(
948 Box::new(ReadableStream::new_inherited()),
949 global,
950 proto,
951 can_gc,
952 )
953 }
954
955 pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
958 *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
959 controller,
960 ))));
961 }
962
963 pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
966 *self.controller.borrow_mut() =
967 Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
968 }
969
970 pub(crate) fn assert_no_controller(&self) {
973 let has_no_controller = self.controller.borrow().is_none();
974 assert!(has_no_controller);
975 }
976
977 pub(crate) fn new_from_bytes(
979 global: &GlobalScope,
980 bytes: Vec<u8>,
981 can_gc: CanGc,
982 ) -> Fallible<DomRoot<ReadableStream>> {
983 let stream = ReadableStream::new_with_external_underlying_source(
984 global,
985 UnderlyingSourceType::Memory(bytes.len()),
986 can_gc,
987 )?;
988 stream.enqueue_native(bytes, can_gc);
989 stream.controller_close_native(can_gc);
990 Ok(stream)
991 }
992
993 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
996 pub(crate) fn new_with_external_underlying_source(
997 global: &GlobalScope,
998 source: UnderlyingSourceType,
999 can_gc: CanGc,
1000 ) -> Fallible<DomRoot<ReadableStream>> {
1001 assert!(source.is_native());
1002 let stream = ReadableStream::new_with_proto(global, None, can_gc);
1003 let controller = ReadableStreamDefaultController::new(
1004 global,
1005 source,
1006 1.0,
1007 extract_size_algorithm(&QueuingStrategy::empty(), can_gc),
1008 can_gc,
1009 );
1010 controller.setup(stream.clone(), can_gc)?;
1011 Ok(stream)
1012 }
1013
1014 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1016 match self.controller.borrow().as_ref() {
1017 Some(ControllerType::Default(controller)) => {
1018 let controller = controller
1019 .get()
1020 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1021 controller.perform_release_steps()
1022 },
1023 Some(ControllerType::Byte(controller)) => {
1024 let controller = controller
1025 .get()
1026 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1027 controller.perform_release_steps()
1028 },
1029 None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1030 }
1031 }
1032
1033 pub(crate) fn perform_pull_steps(
1037 &self,
1038 cx: SafeJSContext,
1039 read_request: &ReadRequest,
1040 can_gc: CanGc,
1041 ) {
1042 match self.controller.borrow().as_ref() {
1043 Some(ControllerType::Default(controller)) => controller
1044 .get()
1045 .expect("Stream should have controller.")
1046 .perform_pull_steps(read_request, can_gc),
1047 Some(ControllerType::Byte(controller)) => controller
1048 .get()
1049 .expect("Stream should have controller.")
1050 .perform_pull_steps(cx, read_request, can_gc),
1051 None => {
1052 unreachable!("Stream does not have a controller.");
1053 },
1054 }
1055 }
1056
1057 pub(crate) fn perform_pull_into(
1061 &self,
1062 cx: SafeJSContext,
1063 read_into_request: &ReadIntoRequest,
1064 view: HeapBufferSource<ArrayBufferViewU8>,
1065 min: u64,
1066 can_gc: CanGc,
1067 ) {
1068 match self.controller.borrow().as_ref() {
1069 Some(ControllerType::Byte(controller)) => controller
1070 .get()
1071 .expect("Stream should have controller.")
1072 .perform_pull_into(cx, read_into_request, view, min, can_gc),
1073 _ => {
1074 unreachable!(
1075 "Pulling a chunk from a stream with a default controller using a BYOB reader"
1076 )
1077 },
1078 }
1079 }
1080
1081 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1083 match self.reader.borrow().as_ref() {
1084 Some(ReaderType::Default(reader)) => {
1085 let Some(reader) = reader.get() else {
1086 panic!("Attempt to add a read request without having first acquired a reader.");
1087 };
1088
1089 assert!(self.is_readable());
1091
1092 reader.add_read_request(read_request);
1094 },
1095 _ => {
1096 unreachable!("Adding a read request can only be done on a default reader.")
1097 },
1098 }
1099 }
1100
1101 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1103 match self.reader.borrow().as_ref() {
1104 Some(ReaderType::BYOB(reader)) => {
1106 let Some(reader) = reader.get() else {
1107 unreachable!(
1108 "Attempt to add a read into request without having first acquired a reader."
1109 );
1110 };
1111
1112 assert!(self.is_readable() || self.is_closed());
1114
1115 reader.add_read_into_request(read_request);
1117 },
1118 _ => {
1119 unreachable!("Adding a read into request can only be done on a BYOB reader.")
1120 },
1121 }
1122 }
1123
1124 pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1127 match self.controller.borrow().as_ref() {
1128 Some(ControllerType::Default(controller)) => controller
1129 .get()
1130 .expect("Stream should have controller.")
1131 .enqueue_native(bytes, can_gc),
1132 _ => {
1133 unreachable!(
1134 "Enqueueing chunk to a stream from Rust on other than default controller"
1135 );
1136 },
1137 }
1138 }
1139
1140 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1142 assert!(self.is_readable());
1144
1145 self.state.set(ReadableStreamState::Errored);
1147
1148 self.stored_error.set(e.get());
1150
1151 let default_reader = {
1154 let reader_ref = self.reader.borrow();
1155 match reader_ref.as_ref() {
1156 Some(ReaderType::Default(reader)) => reader.get(),
1157 _ => None,
1158 }
1159 };
1160
1161 if let Some(reader) = default_reader {
1162 reader.error(e, can_gc);
1164 return;
1165 }
1166
1167 let byob_reader = {
1168 let reader_ref = self.reader.borrow();
1169 match reader_ref.as_ref() {
1170 Some(ReaderType::BYOB(reader)) => reader.get(),
1171 _ => None,
1172 }
1173 };
1174
1175 if let Some(reader) = byob_reader {
1176 reader.error_read_into_requests(e, can_gc);
1178 }
1179
1180 }
1182
1183 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1185 handle_mut.set(self.stored_error.get());
1186 }
1187
1188 pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1191 let cx = GlobalScope::get_cx();
1192 rooted!(in(*cx) let mut error_val = UndefinedValue());
1193 error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1194 self.error(error_val.handle(), can_gc);
1195 }
1196
1197 pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1200 match self.controller.borrow().as_ref() {
1201 Some(ControllerType::Default(controller)) => {
1202 let _ = controller
1203 .get()
1204 .expect("Stream should have controller.")
1205 .Close(can_gc);
1206 },
1207 _ => {
1208 unreachable!("Native closing is only done on default controllers.")
1209 },
1210 }
1211 }
1212
1213 pub(crate) fn in_memory(&self) -> bool {
1216 match self.controller.borrow().as_ref() {
1217 Some(ControllerType::Default(controller)) => controller
1218 .get()
1219 .expect("Stream should have controller.")
1220 .in_memory(),
1221 _ => {
1222 unreachable!(
1223 "Checking if source is in memory for a stream with a non-default controller"
1224 )
1225 },
1226 }
1227 }
1228
1229 pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1232 match self.controller.borrow().as_ref() {
1233 Some(ControllerType::Default(controller)) => controller
1234 .get()
1235 .expect("Stream should have controller.")
1236 .get_in_memory_bytes()
1237 .as_deref()
1238 .map(GenericSharedMemory::from_bytes),
1239 _ => {
1240 unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1241 },
1242 }
1243 }
1244
1245 pub(crate) fn acquire_default_reader(
1250 &self,
1251 can_gc: CanGc,
1252 ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1253 let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1255
1256 reader.set_up(self, &self.global(), can_gc)?;
1258
1259 Ok(reader)
1261 }
1262
1263 pub(crate) fn acquire_byob_reader(
1265 &self,
1266 can_gc: CanGc,
1267 ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1268 let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1270 reader.set_up(self, &self.global(), can_gc)?;
1272
1273 Ok(reader)
1275 }
1276
1277 pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1278 match self.controller.borrow().as_ref() {
1279 Some(ControllerType::Default(controller)) => {
1280 controller.get().expect("Stream should have controller.")
1281 },
1282 _ => {
1283 unreachable!(
1284 "Getting default controller for a stream with a non-default controller"
1285 )
1286 },
1287 }
1288 }
1289
1290 pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1291 match self.controller.borrow().as_ref() {
1292 Some(ControllerType::Byte(controller)) => {
1293 controller.get().expect("Stream should have controller.")
1294 },
1295 _ => {
1296 unreachable!("Getting byte controller for a stream with a non-byte controller")
1297 },
1298 }
1299 }
1300
1301 pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1302 match self.reader.borrow().as_ref() {
1303 Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1304 _ => {
1305 unreachable!("Getting default reader for a stream with a non-default reader")
1306 },
1307 }
1308 }
1309
1310 pub(crate) fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
1316 match self.reader.borrow().as_ref() {
1317 Some(ReaderType::Default(reader)) => {
1318 let Some(reader) = reader.get() else {
1319 unreachable!(
1320 "Attempt to read stream chunk without having first acquired a reader."
1321 );
1322 };
1323 reader.Read(can_gc)
1324 },
1325 _ => {
1326 unreachable!("Native reading of a chunk can only be done with a default reader.")
1327 },
1328 }
1329 }
1330
1331 pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1336 let reader_ref = self.reader.borrow();
1337
1338 match reader_ref.as_ref() {
1339 Some(ReaderType::Default(reader)) => {
1340 let Some(reader) = reader.get() else {
1341 unreachable!("Attempt to stop reading without having first acquired a reader.");
1342 };
1343
1344 drop(reader_ref);
1345 reader.release(can_gc).expect("Reader release cannot fail.");
1346 },
1347 _ => {
1348 unreachable!("Native stop reading can only be done with a default reader.")
1349 },
1350 }
1351 }
1352
1353 pub(crate) fn is_locked(&self) -> bool {
1355 match self.reader.borrow().as_ref() {
1356 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1357 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1358 None => false,
1359 }
1360 }
1361
1362 pub(crate) fn is_disturbed(&self) -> bool {
1363 self.disturbed.get()
1364 }
1365
1366 pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1367 self.disturbed.set(disturbed);
1368 }
1369
1370 pub(crate) fn is_closed(&self) -> bool {
1371 self.state.get() == ReadableStreamState::Closed
1372 }
1373
1374 pub(crate) fn is_errored(&self) -> bool {
1375 self.state.get() == ReadableStreamState::Errored
1376 }
1377
1378 pub(crate) fn is_readable(&self) -> bool {
1379 self.state.get() == ReadableStreamState::Readable
1380 }
1381
1382 pub(crate) fn has_default_reader(&self) -> bool {
1383 match self.reader.borrow().as_ref() {
1384 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1385 _ => false,
1386 }
1387 }
1388
1389 pub(crate) fn has_byob_reader(&self) -> bool {
1390 match self.reader.borrow().as_ref() {
1391 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1392 _ => false,
1393 }
1394 }
1395
1396 pub(crate) fn has_byte_controller(&self) -> bool {
1397 match self.controller.borrow().as_ref() {
1398 Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1399 _ => false,
1400 }
1401 }
1402
1403 pub(crate) fn get_num_read_requests(&self) -> usize {
1405 match self.reader.borrow().as_ref() {
1406 Some(ReaderType::Default(reader)) => {
1407 let reader = reader
1408 .get()
1409 .expect("Stream must have a reader when getting the number of read requests.");
1410 reader.get_num_read_requests()
1411 },
1412 _ => unreachable!(
1413 "Stream must have a default reader when get num read requests is called into."
1414 ),
1415 }
1416 }
1417
1418 pub(crate) fn get_num_read_into_requests(&self) -> usize {
1420 assert!(self.has_byob_reader());
1421
1422 match self.reader.borrow().as_ref() {
1423 Some(ReaderType::BYOB(reader)) => {
1424 let Some(reader) = reader.get() else {
1425 unreachable!(
1426 "Stream must have a reader when get num read into requests is called into."
1427 );
1428 };
1429 reader.get_num_read_into_requests()
1430 },
1431 _ => {
1432 unreachable!(
1433 "Stream must have a BYOB reader when get num read into requests is called into."
1434 );
1435 },
1436 }
1437 }
1438
1439 pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1441 assert!(self.has_default_reader());
1443
1444 match self.reader.borrow().as_ref() {
1445 Some(ReaderType::Default(reader)) => {
1446 let reader = reader
1448 .get()
1449 .expect("Stream must have a reader when a read request is fulfilled.");
1450 assert_ne!(reader.get_num_read_requests(), 0);
1452 let request = reader.remove_read_request();
1455
1456 if done {
1457 request.close_steps(can_gc);
1459 } else {
1460 let result = RootedTraceableBox::new(Heap::default());
1462 result.set(*chunk);
1463 request.chunk_steps(result, &self.global(), can_gc);
1464 }
1465 },
1466 _ => {
1467 unreachable!(
1468 "Stream must have a default reader when fulfill read requests is called into."
1469 );
1470 },
1471 }
1472 }
1473
1474 pub(crate) fn fulfill_read_into_request(
1476 &self,
1477 chunk: SafeHandleValue,
1478 done: bool,
1479 can_gc: CanGc,
1480 ) {
1481 assert!(self.has_byob_reader());
1483
1484 match self.reader.borrow().as_ref() {
1486 Some(ReaderType::BYOB(reader)) => {
1487 let Some(reader) = reader.get() else {
1488 unreachable!(
1489 "Stream must have a reader when a read into request is fulfilled."
1490 );
1491 };
1492
1493 assert!(reader.get_num_read_into_requests() > 0);
1495
1496 let read_into_request = reader.remove_read_into_request();
1499
1500 let result = RootedTraceableBox::new(Heap::default());
1502 if done {
1503 result.set(*chunk);
1504 read_into_request.close_steps(Some(result), can_gc);
1505 } else {
1506 result.set(*chunk);
1508 read_into_request.chunk_steps(result, can_gc);
1509 }
1510 },
1511 _ => {
1512 unreachable!(
1513 "Stream must have a BYOB reader when fulfill read into requests is called into."
1514 );
1515 },
1516 };
1517 }
1518
1519 pub(crate) fn close(&self, can_gc: CanGc) {
1521 assert!(self.is_readable());
1523 self.state.set(ReadableStreamState::Closed);
1525 let default_reader = {
1531 let reader_ref = self.reader.borrow();
1532 match reader_ref.as_ref() {
1533 Some(ReaderType::Default(reader)) => reader.get(),
1534 _ => None,
1535 }
1536 };
1537
1538 if let Some(reader) = default_reader {
1539 reader.close(can_gc);
1541 return;
1542 }
1543
1544 let byob_reader = {
1546 let reader_ref = self.reader.borrow();
1547 match reader_ref.as_ref() {
1548 Some(ReaderType::BYOB(reader)) => reader.get(),
1549 _ => None,
1550 }
1551 };
1552
1553 if let Some(reader) = byob_reader {
1554 reader.close(can_gc);
1556 }
1557
1558 }
1560
1561 pub(crate) fn cancel(
1563 &self,
1564 cx: SafeJSContext,
1565 global: &GlobalScope,
1566 reason: SafeHandleValue,
1567 can_gc: CanGc,
1568 ) -> Rc<Promise> {
1569 self.disturbed.set(true);
1571
1572 if self.is_closed() {
1574 return Promise::new_resolved(global, cx, (), can_gc);
1575 }
1576 if self.is_errored() {
1578 let promise = Promise::new(global, can_gc);
1579 rooted!(in(*cx) let mut rval = UndefinedValue());
1580 self.stored_error
1581 .safe_to_jsval(cx, rval.handle_mut(), can_gc);
1582 promise.reject_native(&rval.handle(), can_gc);
1583 return promise;
1584 }
1585 self.close(can_gc);
1587
1588 let byob_reader = {
1590 let reader_ref = self.reader.borrow();
1591 match reader_ref.as_ref() {
1592 Some(ReaderType::BYOB(reader)) => reader.get(),
1593 _ => None,
1594 }
1595 };
1596
1597 if let Some(reader) = byob_reader {
1598 reader.cancel(can_gc);
1600 }
1601
1602 let source_cancel_promise = match self.controller.borrow().as_ref() {
1605 Some(ControllerType::Default(controller)) => controller
1606 .get()
1607 .expect("Stream should have controller.")
1608 .perform_cancel_steps(cx, global, reason, can_gc),
1609 Some(ControllerType::Byte(controller)) => controller
1610 .get()
1611 .expect("Stream should have controller.")
1612 .perform_cancel_steps(cx, global, reason, can_gc),
1613 None => {
1614 panic!("Stream does not have a controller.");
1615 },
1616 };
1617
1618 let global = self.global();
1621 let result_promise = Promise::new(&global, can_gc);
1622 let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1623 result: result_promise.clone(),
1624 });
1625 let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1626 result: result_promise.clone(),
1627 });
1628 let handler = PromiseNativeHandler::new(
1629 &global,
1630 Some(fulfillment_handler),
1631 Some(rejection_handler),
1632 can_gc,
1633 );
1634 let realm = enter_realm(&*global);
1635 let comp = InRealm::Entered(&realm);
1636 source_cancel_promise.append_native_handler(&handler, comp, can_gc);
1637
1638 result_promise
1641 }
1642
1643 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1644 pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1645 *self.reader.borrow_mut() = new_reader;
1646 }
1647
1648 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1649 fn byte_tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1651 let reader = self.acquire_default_reader(can_gc)?;
1656 let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1657 Some(&reader),
1658 ))));
1659
1660 let reading = Rc::new(Cell::new(false));
1662
1663 let read_again_for_branch_1 = Rc::new(Cell::new(false));
1665
1666 let read_again_for_branch_2 = Rc::new(Cell::new(false));
1668
1669 let canceled_1 = Rc::new(Cell::new(false));
1671
1672 let canceled_2 = Rc::new(Cell::new(false));
1674
1675 let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1677
1678 let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1680
1681 let cancel_promise = Promise::new(&self.global(), can_gc);
1683 let reader_version = Rc::new(Cell::new(0));
1684
1685 let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1686 reader.clone(),
1687 self,
1688 reading.clone(),
1689 read_again_for_branch_1.clone(),
1690 read_again_for_branch_2.clone(),
1691 canceled_1.clone(),
1692 canceled_2.clone(),
1693 reason_1.clone(),
1694 reason_2.clone(),
1695 cancel_promise.clone(),
1696 reader_version.clone(),
1697 ByteTeeCancelAlgorithm::Cancel1Algorithm,
1698 ByteTeePullAlgorithm::Pull1Algorithm,
1699 can_gc,
1700 );
1701
1702 let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1703 reader.clone(),
1704 self,
1705 reading,
1706 read_again_for_branch_1,
1707 read_again_for_branch_2,
1708 canceled_1,
1709 canceled_2,
1710 reason_1,
1711 reason_2,
1712 cancel_promise.clone(),
1713 reader_version,
1714 ByteTeeCancelAlgorithm::Cancel2Algorithm,
1715 ByteTeePullAlgorithm::Pull2Algorithm,
1716 can_gc,
1717 );
1718
1719 let branch_1 = readable_byte_stream_tee(
1721 &self.global(),
1722 UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_1)),
1723 can_gc,
1724 );
1725 byte_tee_source_1.set_branch_1(&branch_1);
1726 byte_tee_source_2.set_branch_1(&branch_1);
1727
1728 let branch_2 = readable_byte_stream_tee(
1730 &self.global(),
1731 UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_2)),
1732 can_gc,
1733 );
1734 byte_tee_source_1.set_branch_2(&branch_2);
1735 byte_tee_source_2.set_branch_2(&branch_2);
1736
1737 byte_tee_source_1.forward_reader_error(reader.clone(), can_gc);
1739 byte_tee_source_2.forward_reader_error(reader, can_gc);
1740
1741 Ok(vec![branch_1, branch_2])
1743 }
1744
1745 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1747 fn default_tee(
1748 &self,
1749 clone_for_branch_2: bool,
1750 can_gc: CanGc,
1751 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1752 let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1756
1757 let reader = self.acquire_default_reader(can_gc)?;
1759
1760 let reading = Rc::new(Cell::new(false));
1762 let read_again = Rc::new(Cell::new(false));
1764 let canceled_1 = Rc::new(Cell::new(false));
1766 let canceled_2 = Rc::new(Cell::new(false));
1768
1769 let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1771 let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1773 let cancel_promise = Promise::new(&self.global(), can_gc);
1775
1776 let tee_source_1 = DefaultTeeUnderlyingSource::new(
1777 &reader,
1778 self,
1779 reading.clone(),
1780 read_again.clone(),
1781 canceled_1.clone(),
1782 canceled_2.clone(),
1783 clone_for_branch_2.clone(),
1784 reason_1.clone(),
1785 reason_2.clone(),
1786 cancel_promise.clone(),
1787 DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1788 can_gc,
1789 );
1790
1791 let underlying_source_type_branch_1 =
1792 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
1793
1794 let tee_source_2 = DefaultTeeUnderlyingSource::new(
1795 &reader,
1796 self,
1797 reading,
1798 read_again,
1799 canceled_1.clone(),
1800 canceled_2.clone(),
1801 clone_for_branch_2,
1802 reason_1,
1803 reason_2,
1804 cancel_promise.clone(),
1805 DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1806 can_gc,
1807 );
1808
1809 let underlying_source_type_branch_2 =
1810 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
1811
1812 let branch_1 = create_readable_stream(
1814 &self.global(),
1815 underlying_source_type_branch_1,
1816 None,
1817 None,
1818 can_gc,
1819 );
1820 tee_source_1.set_branch_1(&branch_1);
1821 tee_source_2.set_branch_1(&branch_1);
1822
1823 let branch_2 = create_readable_stream(
1825 &self.global(),
1826 underlying_source_type_branch_2,
1827 None,
1828 None,
1829 can_gc,
1830 );
1831 tee_source_1.set_branch_2(&branch_2);
1832 tee_source_2.set_branch_2(&branch_2);
1833
1834 reader.default_tee_append_native_handler_to_closed_promise(
1836 &branch_1,
1837 &branch_2,
1838 canceled_1,
1839 canceled_2,
1840 cancel_promise,
1841 can_gc,
1842 );
1843
1844 Ok(vec![branch_1, branch_2])
1846 }
1847
1848 #[allow(clippy::too_many_arguments)]
1850 pub(crate) fn pipe_to(
1851 &self,
1852 cx: &mut CurrentRealm,
1853 global: &GlobalScope,
1854 dest: &WritableStream,
1855 prevent_close: bool,
1856 prevent_abort: bool,
1857 prevent_cancel: bool,
1858 signal: Option<&AbortSignal>,
1859 ) -> Rc<Promise> {
1860 let realm = cx.into();
1861 let realm = InRealm::Already(&realm);
1862 assert!(!self.is_locked());
1873
1874 assert!(!dest.is_locked());
1876
1877 let reader = self
1885 .acquire_default_reader(CanGc::from_cx(cx))
1886 .expect("Acquiring a default reader for pipe_to cannot fail");
1887
1888 let writer = dest
1890 .aquire_default_writer(cx.into(), global, CanGc::from_cx(cx))
1891 .expect("Acquiring a default writer for pipe_to cannot fail");
1892
1893 self.disturbed.set(true);
1895
1896 let promise = Promise::new2(cx, global);
1901
1902 rooted!(&in(cx) let pipe_to = PipeTo {
1904 reader: Dom::from_ref(&reader),
1905 writer: Dom::from_ref(&writer),
1906 pending_writes: Default::default(),
1907 state: Default::default(),
1908 prevent_abort,
1909 prevent_cancel,
1910 prevent_close,
1911 shutting_down: Default::default(),
1912 abort_reason: Default::default(),
1913 shutdown_error: Default::default(),
1914 shutdown_action_promise: Default::default(),
1915 result_promise: promise.clone(),
1916 });
1917
1918 if let Some(signal) = signal {
1921 rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1924
1925 if signal.aborted() {
1927 signal.run_abort_algorithm(cx, global, &abort_algorithm);
1928 return promise;
1929 }
1930
1931 signal.add(&abort_algorithm);
1933 }
1934
1935 pipe_to.check_and_propagate_errors_forward(cx, global);
1937 pipe_to.check_and_propagate_errors_backward(cx, global);
1938 pipe_to.check_and_propagate_closing_forward(cx, global);
1939 pipe_to.check_and_propagate_closing_backward(cx, global);
1940
1941 if *pipe_to.state.borrow() == PipeToState::Starting {
1943 pipe_to.wait_for_writer_ready(global, realm, CanGc::from_cx(cx));
1945 }
1946
1947 promise
1949 }
1950
1951 pub(crate) fn tee(
1953 &self,
1954 clone_for_branch_2: bool,
1955 can_gc: CanGc,
1956 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1957 match self.controller.borrow().as_ref() {
1961 Some(ControllerType::Default(_)) => {
1962 self.default_tee(clone_for_branch_2, can_gc)
1964 },
1965 Some(ControllerType::Byte(_)) => {
1966 self.byte_tee(can_gc)
1969 },
1970 None => {
1971 unreachable!("Stream should have a controller.");
1972 },
1973 }
1974 }
1975
1976 pub(crate) fn set_up_byte_controller(
1978 &self,
1979 global: &GlobalScope,
1980 underlying_source_dict: JsUnderlyingSource,
1981 underlying_source_handle: SafeHandleObject,
1982 stream: DomRoot<ReadableStream>,
1983 strategy_hwm: f64,
1984 can_gc: CanGc,
1985 ) -> Fallible<()> {
1986 if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2002 return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2003 }
2004
2005 let controller = ReadableByteStreamController::new(
2006 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2007 strategy_hwm,
2008 global,
2009 can_gc,
2010 );
2011
2012 controller.set_underlying_source_this_object(underlying_source_handle);
2015
2016 controller.setup(global, stream, can_gc)
2019 }
2020
2021 pub(crate) fn setup_cross_realm_transform_readable(
2023 &self,
2024 cx: SafeJSContext,
2025 port: &MessagePort,
2026 can_gc: CanGc,
2027 ) {
2028 let port_id = port.message_port_id();
2029 let global = self.global();
2030
2031 let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
2036
2037 let controller = ReadableStreamDefaultController::new(
2041 &self.global(),
2042 UnderlyingSourceType::Transfer(Dom::from_ref(port)),
2043 0.,
2044 size_algorithm,
2045 can_gc,
2046 );
2047
2048 rooted!(in(*cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2051 controller: Dom::from_ref(&controller),
2052 });
2053 global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2054
2055 port.Start(can_gc);
2057
2058 controller
2060 .setup(DomRoot::from_ref(self), can_gc)
2061 .expect("Setting up controller for transfer cannot fail.");
2062 }
2063}
2064
2065impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2066 fn Constructor(
2068 cx: SafeJSContext,
2069 global: &GlobalScope,
2070 proto: Option<SafeHandleObject>,
2071 can_gc: CanGc,
2072 underlying_source: Option<*mut JSObject>,
2073 strategy: &QueuingStrategy,
2074 ) -> Fallible<DomRoot<Self>> {
2075 rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2077 let underlying_source_dict = if !underlying_source_obj.is_null() {
2080 rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2081 match JsUnderlyingSource::new(cx, obj_val.handle(), can_gc) {
2082 Ok(ConversionResult::Success(val)) => val,
2083 Ok(ConversionResult::Failure(error)) => {
2084 return Err(Error::Type(error.into_owned()));
2085 },
2086 _ => {
2087 return Err(Error::JSFailed);
2088 },
2089 }
2090 } else {
2091 JsUnderlyingSource::empty()
2092 };
2093
2094 let stream = ReadableStream::new_with_proto(global, proto, can_gc);
2096
2097 if underlying_source_dict.type_.is_some() {
2098 if strategy.size.is_some() {
2100 return Err(Error::Range(
2101 c"size is not supported for byte streams".to_owned(),
2102 ));
2103 }
2104
2105 let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2107
2108 stream.set_up_byte_controller(
2111 global,
2112 underlying_source_dict,
2113 underlying_source_obj.handle(),
2114 stream.clone(),
2115 strategy_hwm,
2116 can_gc,
2117 )?;
2118 } else {
2119 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2121
2122 let size_algorithm = extract_size_algorithm(strategy, can_gc);
2124
2125 let controller = ReadableStreamDefaultController::new(
2126 global,
2127 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2128 high_water_mark,
2129 size_algorithm,
2130 can_gc,
2131 );
2132
2133 controller.set_underlying_source_this_object(underlying_source_obj.handle());
2136
2137 controller.setup(stream.clone(), can_gc)?;
2139 };
2140
2141 Ok(stream)
2142 }
2143
2144 fn Locked(&self) -> bool {
2146 self.is_locked()
2147 }
2148
2149 fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
2151 let global = self.global();
2152 if self.is_locked() {
2153 let promise = Promise::new(&global, can_gc);
2156 promise.reject_error(Error::Type(c"stream is locked".to_owned()), can_gc);
2157 promise
2158 } else {
2159 self.cancel(cx, &global, reason, can_gc)
2161 }
2162 }
2163
2164 fn GetReader(
2166 &self,
2167 options: &ReadableStreamGetReaderOptions,
2168 can_gc: CanGc,
2169 ) -> Fallible<ReadableStreamReader> {
2170 if options.mode.is_none() {
2172 return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2173 self.acquire_default_reader(can_gc)?,
2174 ));
2175 }
2176 assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2178
2179 Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2181 self.acquire_byob_reader(can_gc)?,
2182 ))
2183 }
2184
2185 fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2187 self.tee(false, can_gc)
2189 }
2190
2191 fn PipeTo(
2193 &self,
2194 cx: &mut CurrentRealm,
2195 destination: &WritableStream,
2196 options: &StreamPipeOptions,
2197 ) -> Rc<Promise> {
2198 let global = self.global();
2199
2200 if self.is_locked() {
2202 let promise = Promise::new2(cx, &global);
2204 promise.reject_error(
2205 Error::Type(c"Source stream is locked".to_owned()),
2206 CanGc::from_cx(cx),
2207 );
2208 return promise;
2209 }
2210
2211 if destination.is_locked() {
2213 let promise = Promise::new2(cx, &global);
2215 promise.reject_error(
2216 Error::Type(c"Destination stream is locked".to_owned()),
2217 CanGc::from_cx(cx),
2218 );
2219 return promise;
2220 }
2221
2222 let signal = options.signal.as_deref();
2224
2225 self.pipe_to(
2227 cx,
2228 &global,
2229 destination,
2230 options.preventClose,
2231 options.preventAbort,
2232 options.preventCancel,
2233 signal,
2234 )
2235 }
2236
2237 fn PipeThrough(
2239 &self,
2240 cx: &mut CurrentRealm,
2241 transform: &ReadableWritablePair,
2242 options: &StreamPipeOptions,
2243 ) -> Fallible<DomRoot<ReadableStream>> {
2244 let global = self.global();
2245
2246 if self.is_locked() {
2248 return Err(Error::Type(c"Source stream is locked".to_owned()));
2249 }
2250
2251 if transform.writable.is_locked() {
2253 return Err(Error::Type(c"Destination stream is locked".to_owned()));
2254 }
2255
2256 let signal = options.signal.as_deref();
2258
2259 let promise = self.pipe_to(
2262 cx,
2263 &global,
2264 &transform.writable,
2265 options.preventClose,
2266 options.preventAbort,
2267 options.preventCancel,
2268 signal,
2269 );
2270
2271 promise.set_promise_is_handled();
2273
2274 Ok(transform.readable.clone())
2276 }
2277}
2278
2279#[expect(unsafe_code)]
2280pub(crate) unsafe fn get_type_and_value_from_message(
2284 cx: SafeJSContext,
2285 data: SafeHandleValue,
2286 value: SafeMutableHandleValue,
2287 can_gc: CanGc,
2288) -> DOMString {
2289 assert!(data.is_object());
2295 rooted!(in(*cx) let data_object = data.to_object());
2296
2297 rooted!(in(*cx) let mut type_ = UndefinedValue());
2299 unsafe {
2300 get_dictionary_property(
2301 *cx,
2302 data_object.handle(),
2303 c"type",
2304 type_.handle_mut(),
2305 can_gc,
2306 )
2307 }
2308 .expect("Getting the type should not fail.");
2309
2310 unsafe { get_dictionary_property(*cx, data_object.handle(), c"value", value, can_gc) }
2312 .expect("Getting the value should not fail.");
2313
2314 let result =
2316 DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2317 .expect("The type of the message should be a string");
2318 let ConversionResult::Success(type_string) = result else {
2319 unreachable!("The type of the message should be a string");
2320 };
2321
2322 type_string
2323}
2324
2325impl js::gc::Rootable for CrossRealmTransformReadable {}
2326
2327#[derive(Clone, JSTraceable, MallocSizeOf)]
2331#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
2332pub(crate) struct CrossRealmTransformReadable {
2333 controller: Dom<ReadableStreamDefaultController>,
2335}
2336
2337impl CrossRealmTransformReadable {
2338 #[expect(unsafe_code)]
2341 pub(crate) fn handle_message(
2342 &self,
2343 cx: SafeJSContext,
2344 global: &GlobalScope,
2345 port: &MessagePort,
2346 message: SafeHandleValue,
2347 _realm: InRealm,
2348 can_gc: CanGc,
2349 ) {
2350 rooted!(in(*cx) let mut value = UndefinedValue());
2351 let type_string =
2352 unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
2353
2354 if type_string == "chunk" {
2356 self.controller
2358 .enqueue(cx, value.handle(), can_gc)
2359 .expect("Enqueing a chunk should not fail.");
2360 }
2361
2362 if type_string == "close" {
2364 self.controller.close(can_gc);
2366
2367 global.disentangle_port(port, can_gc);
2369 }
2370
2371 if type_string == "error" {
2373 self.controller.error(value.handle(), can_gc);
2375
2376 global.disentangle_port(port, can_gc);
2378 }
2379 }
2380
2381 pub(crate) fn handle_error(
2384 &self,
2385 cx: SafeJSContext,
2386 global: &GlobalScope,
2387 port: &MessagePort,
2388 _realm: InRealm,
2389 can_gc: CanGc,
2390 ) {
2391 let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
2393 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
2394 error.safe_to_jsval(cx, rooted_error.handle_mut(), can_gc);
2395
2396 port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
2398
2399 self.controller.error(rooted_error.handle(), can_gc);
2401
2402 global.disentangle_port(port, can_gc);
2404 }
2405}
2406
2407#[expect(unsafe_code)]
2408pub(crate) fn get_read_promise_done(
2410 cx: SafeJSContext,
2411 v: &SafeHandleValue,
2412 can_gc: CanGc,
2413) -> Result<bool, Error> {
2414 if !v.is_object() {
2415 return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2416 }
2417 unsafe {
2418 rooted!(in(*cx) let object = v.to_object());
2419 rooted!(in(*cx) let mut done = UndefinedValue());
2420 match get_dictionary_property(*cx, object.handle(), c"done", done.handle_mut(), can_gc) {
2421 Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2422 Ok(ConversionResult::Success(val)) => Ok(val),
2423 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2424 _ => Err(Error::Type(c"Unknown format for done property.".to_owned())),
2425 },
2426 Ok(false) => Err(Error::Type(c"Promise has no done property.".to_owned())),
2427 Err(()) => Err(Error::JSFailed),
2428 }
2429 }
2430}
2431
2432#[expect(unsafe_code)]
2433pub(crate) fn get_read_promise_bytes(
2435 cx: SafeJSContext,
2436 v: &SafeHandleValue,
2437 can_gc: CanGc,
2438) -> Result<Vec<u8>, Error> {
2439 if !v.is_object() {
2440 return Err(Error::Type(
2441 c"Unknown format for for bytes read.".to_owned(),
2442 ));
2443 }
2444 unsafe {
2445 rooted!(in(*cx) let object = v.to_object());
2446 rooted!(in(*cx) let mut bytes = UndefinedValue());
2447 match get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc) {
2448 Ok(true) => {
2449 match Vec::<u8>::safe_from_jsval(
2450 cx,
2451 bytes.handle(),
2452 ConversionBehavior::EnforceRange,
2453 can_gc,
2454 ) {
2455 Ok(ConversionResult::Success(val)) => Ok(val),
2456 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2457 _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2458 }
2459 },
2460 Ok(false) => Err(Error::Type(c"Promise has no value property.".to_owned())),
2461 Err(()) => Err(Error::JSFailed),
2462 }
2463 }
2464}
2465
2466pub(crate) fn bytes_from_chunk_jsval(
2470 cx: SafeJSContext,
2471 chunk: &RootedTraceableBox<Heap<JSVal>>,
2472 can_gc: CanGc,
2473) -> Result<Vec<u8>, Error> {
2474 match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2475 Ok(ConversionResult::Success(vec)) => Ok(vec),
2476 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2477 _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2478 }
2479}
2480
2481impl Transferable for ReadableStream {
2483 type Index = MessagePortIndex;
2484 type Data = MessagePortImpl;
2485
2486 fn transfer(
2488 &self,
2489 cx: &mut js::context::JSContext,
2490 ) -> Fallible<(MessagePortId, MessagePortImpl)> {
2491 if self.is_locked() {
2494 return Err(Error::DataClone(None));
2495 }
2496
2497 let global = self.global();
2498 let mut realm = enter_auto_realm(cx, &*global);
2499 let mut realm = realm.current_realm();
2500 let cx = &mut realm;
2501
2502 let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
2504 global.track_message_port(&port_1, None);
2505
2506 let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
2508 global.track_message_port(&port_2, None);
2509
2510 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2512
2513 let writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
2515
2516 writable.setup_cross_realm_transform_writable(cx.into(), &port_1, CanGc::from_cx(cx));
2518
2519 let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);
2521
2522 promise.set_promise_is_handled();
2524
2525 port_2.transfer(cx)
2527 }
2528
2529 fn transfer_receive(
2531 cx: &mut js::context::JSContext,
2532 owner: &GlobalScope,
2533 id: MessagePortId,
2534 port_impl: MessagePortImpl,
2535 ) -> Result<DomRoot<Self>, ()> {
2536 let value = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
2539
2540 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
2547
2548 value.setup_cross_realm_transform_readable(
2550 cx.into(),
2551 &transferred_port,
2552 CanGc::from_cx(cx),
2553 );
2554 Ok(value)
2555 }
2556
2557 fn serialized_storage<'a>(
2559 data: StructuredData<'a, '_>,
2560 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2561 match data {
2562 StructuredData::Reader(r) => &mut r.port_impls,
2563 StructuredData::Writer(w) => &mut w.ports,
2564 }
2565 }
2566}