1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use servo_base::generic_channel::GenericSharedMemory;
11use servo_base::id::{MessagePortId, MessagePortIndex};
12use servo_constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19 MutableHandleValue as SafeMutableHandleValue,
20};
21use js::typedarray::ArrayBufferViewU8;
22use rustc_hash::FxHashMap;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
26use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
27 ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
28 ReadableWritablePair, StreamPipeOptions,
29};
30use script_bindings::str::DOMString;
31
32use crate::dom::domexception::{DOMErrorName, DOMException};
33use script_bindings::conversions::{is_array_like, StringificationBehavior};
34use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
35use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
38use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
39use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
40use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
41use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
42use crate::dom::stream::writablestream::WritableStream;
43use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
44use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
45use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
46use crate::dom::bindings::trace::RootedTraceableBox;
47use crate::dom::bindings::utils::get_dictionary_property;
48use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
49use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
50use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
51use crate::dom::globalscope::GlobalScope;
52use crate::dom::promise::{wait_for_all_promise, Promise};
53use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
54use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
55use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
56use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
57use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
58use crate::dom::types::DefaultTeeUnderlyingSource;
59use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
60use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
61use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
62use crate::dom::messageport::MessagePort;
63use crate::realms::{enter_realm, InRealm, enter_auto_realm};
64use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
65use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
66use crate::dom::bindings::transferable::Transferable;
67use crate::dom::bindings::structuredclone::StructuredData;
68
69use crate::dom::bindings::buffer_source::HeapBufferSource;
70use super::readablestreambyobreader::ReadIntoRequest;
71
/// The phase a [`PipeTo`] operation is currently in.
///
/// `PipeTo::callback` dispatches on this value every time one of the promises
/// the pipe is waiting on settles.
#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
enum PipeToState {
    /// Initial value. `PipeTo::callback` treats being invoked in this state as unreachable.
    #[default]
    Starting,
    /// Set by `wait_for_writer_ready`: waiting on the writer's ready promise.
    PendingReady,
    /// Set by `read_chunk`: waiting on a read from the source reader.
    PendingRead,
    /// Set by `shutdown` when writes are still pending; carries the action
    /// (if any) to perform once those writes have settled.
    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
    /// Set by `perform_action`: waiting on the promise of the shutdown action.
    ShuttingDownPendingAction,
    /// Set by `finalize`: the pipe is done and callbacks are no-ops.
    Finalized,
}
93
/// The action `PipeTo::perform_action` runs as part of shutting down a pipe.
#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
enum ShutdownAction {
    /// Abort the destination stream with the recorded shutdown error.
    WritableStreamAbort,
    /// Cancel the source stream with the recorded shutdown error.
    ReadableStreamCancel,
    /// Close the writer via `close_with_error_propagation`.
    WritableStreamDefaultWriterCloseWithErrorPropagation,
    /// Abort triggered through `PipeTo::abort_with_reason`: aborts the destination
    /// and/or cancels the source, depending on the `prevent_*` flags.
    Abort,
}
106
// Marker impl with no overridden items.
// NOTE(review): presumably needed so `PipeTo` values can be rooted through mozjs; confirm.
impl js::gc::Rootable for PipeTo {}
108
/// State of a single pipe operation from a readable stream to a writable stream.
///
/// The struct is cloned into every `PromiseNativeHandler` it registers, so all
/// mutable state lives behind `Rc` and is shared between the clones.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PipeTo {
    /// Reader on the source stream; chunks are obtained through `Read`.
    reader: Dom<ReadableStreamDefaultReader>,

    /// Writer on the destination stream; chunks are forwarded through `write`.
    writer: Dom<WritableStreamDefaultWriter>,

    /// Promises returned by writes that have not settled yet.
    /// Settled entries are pruned at the start of each callback.
    #[ignore_malloc_size_of = "nested Rc"]
    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,

    /// Current phase of the pipe, shared by all clones.
    #[conditional_malloc_size_of]
    #[no_trace]
    state: Rc<RefCell<PipeToState>>,

    /// When set, an errored source does not abort the destination.
    prevent_abort: bool,

    /// When set, an errored or closing destination does not cancel the source.
    prevent_cancel: bool,

    /// When set, a closed source does not close the destination.
    prevent_close: bool,

    /// Set once by `shutdown`; later propagation checks and shutdown requests become no-ops.
    #[conditional_malloc_size_of]
    shutting_down: Rc<Cell<bool>>,

    /// Reason stored by `abort_with_reason`, used by the `Abort` shutdown action.
    #[ignore_malloc_size_of = "mozjs"]
    abort_reason: Rc<Heap<JSVal>>,

    /// Error recorded for the shutdown, if any; `finalize` rejects the result promise with it.
    #[ignore_malloc_size_of = "mozjs"]
    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,

    /// Promise of the shutdown action started by `perform_action`.
    #[ignore_malloc_size_of = "nested Rc"]
    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// Promise settled by `finalize` with the outcome of the whole pipe.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
170
171impl PipeTo {
172 pub(crate) fn abort_with_reason(
175 &self,
176 cx: &mut CurrentRealm,
177 global: &GlobalScope,
178 reason: SafeHandleValue,
179 ) {
180 if self.shutting_down.get() {
182 return;
183 }
184
185 self.abort_reason.set(reason.get());
189
190 self.set_shutdown_error(reason);
195
196 self.shutdown(cx, global, Some(ShutdownAction::Abort));
202 }
203}
204
205impl Callback for PipeTo {
206 #[expect(unsafe_code)]
215 fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
216 let in_realm_proof = cx.into();
217 let realm = InRealm::Already(&in_realm_proof);
218 let global = self.reader.global();
219
220 self.pending_writes.borrow_mut().retain(|p| {
226 let pending = p.is_pending();
227 if !pending {
228 p.set_promise_is_handled();
229 }
230 pending
231 });
232
233 let state_before_checks = self.state.borrow().clone();
235
236 if state_before_checks == PipeToState::PendingRead {
243 let source = self.reader.get_stream().expect("Source stream must be set");
244 if source.is_closed() {
245 let dest = self
246 .writer
247 .get_stream()
248 .expect("Destination stream must be set");
249
250 if dest.is_writable() && !dest.close_queued_or_in_flight() {
253 let Ok(done) = get_read_promise_done(cx.into(), &result, CanGc::from_cx(cx))
254 else {
255 return;
262 };
263
264 if !done {
265 self.write_chunk(cx, &global, result);
267 }
268 }
269 }
270 }
271
272 self.check_and_propagate_errors_forward(cx, &global);
273 self.check_and_propagate_errors_backward(cx, &global);
274 self.check_and_propagate_closing_forward(cx, &global);
275 self.check_and_propagate_closing_backward(cx, &global);
276
277 let state = self.state.borrow().clone();
279
280 if state != state_before_checks {
284 return;
285 }
286
287 match state {
288 PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
289 PipeToState::PendingReady => {
290 self.read_chunk(cx, &global);
292 },
293 PipeToState::PendingRead => {
294 self.write_chunk(cx, &global, result);
296
297 if self.shutting_down.get() {
299 return;
300 }
301
302 self.wait_for_writer_ready(cx, &global);
304 },
305 PipeToState::ShuttingDownWithPendingWrites(action) => {
306 if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
309 self.wait_on_pending_write(&global, write, realm, CanGc::from_cx(cx));
310 return;
311 }
312
313 if let Some(action) = action {
315 self.perform_action(cx, &global, action);
317 } else {
318 self.finalize(cx, &global);
320 }
321 },
322 PipeToState::ShuttingDownPendingAction => {
323 let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
324 unreachable!();
325 };
326 if promise.is_pending() {
327 return;
331 }
332
333 let is_array_like = {
334 if !result.is_object() {
335 false
336 } else {
337 unsafe { is_array_like::<crate::DomTypeHolder>(cx.raw_cx(), result) }
338 }
339 };
340
341 if !result.is_undefined() && !is_array_like {
343 self.set_shutdown_error(result);
353 }
354 self.finalize(cx, &global);
355 },
356 PipeToState::Finalized => {},
357 }
358 }
359}
360
361impl PipeTo {
362 fn set_shutdown_error(&self, error: SafeHandleValue) {
365 *self.shutdown_error.borrow_mut() = Some(Heap::default());
366 let Some(ref heap) = *self.shutdown_error.borrow() else {
367 unreachable!("Option set to Some(heap) above.");
368 };
369 heap.set(error.get())
370 }
371
372 fn wait_for_writer_ready(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
375 let realm = cx.into();
376 let realm = InRealm::Already(&realm);
377
378 {
379 let mut state = self.state.borrow_mut();
380 *state = PipeToState::PendingReady;
381 }
382
383 let ready_promise = self.writer.Ready();
384 if ready_promise.is_fulfilled() {
385 self.read_chunk(cx, global);
386 } else {
387 let handler = PromiseNativeHandler::new(
388 global,
389 Some(Box::new(self.clone())),
390 Some(Box::new(self.clone())),
391 CanGc::from_cx(cx),
392 );
393 ready_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
394
395 let closed_promise = self.reader.Closed();
399 closed_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
400 }
401 }
402
403 fn read_chunk(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
405 let realm = cx.into();
406 let realm = InRealm::Already(&realm);
407
408 *self.state.borrow_mut() = PipeToState::PendingRead;
409 let chunk_promise = self.reader.Read(cx);
410 let handler = PromiseNativeHandler::new(
411 global,
412 Some(Box::new(self.clone())),
413 Some(Box::new(self.clone())),
414 CanGc::from_cx(cx),
415 );
416 chunk_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
417
418 let ready_promise = self.writer.Closed();
421 ready_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
422 }
423
424 #[expect(unsafe_code)]
427 fn write_chunk(
428 &self,
429 cx: &mut js::context::JSContext,
430 global: &GlobalScope,
431 chunk: SafeHandleValue,
432 ) -> bool {
433 if chunk.is_object() {
434 rooted!(&in(cx) let object = chunk.to_object());
435 rooted!(&in(cx) let mut bytes = UndefinedValue());
436 let has_value = unsafe {
437 get_dictionary_property(
438 cx.raw_cx(),
439 object.handle(),
440 c"value",
441 bytes.handle_mut(),
442 CanGc::from_cx(cx),
443 )
444 .expect("Chunk should have a value.")
445 };
446 if has_value {
447 let write_promise = self.writer.write(cx, global, bytes.handle());
449 self.pending_writes.borrow_mut().push_back(write_promise);
450 return true;
451 }
452 }
453 false
454 }
455
456 fn wait_on_pending_write(
460 &self,
461 global: &GlobalScope,
462 promise: Rc<Promise>,
463 realm: InRealm,
464 can_gc: CanGc,
465 ) {
466 let handler = PromiseNativeHandler::new(
467 global,
468 Some(Box::new(self.clone())),
469 Some(Box::new(self.clone())),
470 can_gc,
471 );
472 promise.append_native_handler(&handler, realm, can_gc);
473 }
474
475 fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
478 if self.shutting_down.get() {
481 return;
482 }
483
484 let source = self
486 .reader
487 .get_stream()
488 .expect("Reader should still have a stream");
489 if source.is_errored() {
490 rooted!(&in(cx) let mut source_error = UndefinedValue());
491 source.get_stored_error(source_error.handle_mut());
492 self.set_shutdown_error(source_error.handle());
493
494 if !self.prevent_abort {
496 self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
499 } else {
500 self.shutdown(cx, global, None);
502 }
503 }
504 }
505
506 fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
509 if self.shutting_down.get() {
512 return;
513 }
514
515 let dest = self
517 .writer
518 .get_stream()
519 .expect("Writer should still have a stream");
520 if dest.is_errored() {
521 rooted!(&in(cx) let mut dest_error = UndefinedValue());
522 dest.get_stored_error(dest_error.handle_mut());
523 self.set_shutdown_error(dest_error.handle());
524
525 if !self.prevent_cancel {
527 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
530 } else {
531 self.shutdown(cx, global, None);
533 }
534 }
535 }
536
537 fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
540 if self.shutting_down.get() {
543 return;
544 }
545
546 let source = self
548 .reader
549 .get_stream()
550 .expect("Reader should still have a stream");
551 if source.is_closed() {
552 if !self.prevent_close {
554 self.shutdown(
557 cx,
558 global,
559 Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
560 )
561 } else {
562 self.shutdown(cx, global, None);
564 }
565 }
566 }
567
568 fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
571 if self.shutting_down.get() {
574 return;
575 }
576
577 let dest = self
580 .writer
581 .get_stream()
582 .expect("Writer should still have a stream");
583 if dest.close_queued_or_in_flight() || dest.is_closed() {
584 rooted!(&in(cx) let mut dest_closed = UndefinedValue());
589 let error =
590 Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
591 error.to_jsval(
592 cx.into(),
593 global,
594 dest_closed.handle_mut(),
595 CanGc::from_cx(cx),
596 );
597 self.set_shutdown_error(dest_closed.handle());
598
599 if !self.prevent_cancel {
601 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
604 } else {
605 self.shutdown(cx, global, None);
607 }
608 }
609 }
610
611 fn shutdown(
615 &self,
616 cx: &mut CurrentRealm,
617 global: &GlobalScope,
618 action: Option<ShutdownAction>,
619 ) {
620 let realm = cx.into();
621 let realm = InRealm::Already(&realm);
622 if !self.shutting_down.replace(true) {
625 let dest = self.writer.get_stream().expect("Stream must be set");
626 if dest.is_writable() && !dest.close_queued_or_in_flight() {
629 if let Some(write) = self.pending_writes.borrow_mut().front() {
635 *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
636 self.wait_on_pending_write(global, write.clone(), realm, CanGc::from_cx(cx));
637 return;
638 }
639 }
640
641 if let Some(action) = action {
643 self.perform_action(cx, global, action);
645 } else {
646 self.finalize(cx, global);
648 }
649 }
650 }
651
    /// Runs the given shutdown `action` and waits for its promise.
    ///
    /// The pipe moves to `ShuttingDownPendingAction`, the action's promise is
    /// stored in `shutdown_action_promise`, and this pipe is registered as its
    /// handler so that `callback` finalizes the pipe once it settles.
    fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
        let realm = cx.into();
        let realm = InRealm::Already(&realm);
        // The recorded shutdown error (or undefined) is the reason passed to abort/cancel.
        rooted!(&in(cx) let mut error = UndefinedValue());
        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
            error.set(shutdown_error.get());
        }

        *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;

        let promise = match action {
            ShutdownAction::WritableStreamAbort => {
                let dest = self.writer.get_stream().expect("Stream must be set");
                dest.abort(cx, global, error.handle())
            },
            ShutdownAction::ReadableStreamCancel => {
                let source = self
                    .reader
                    .get_stream()
                    .expect("Reader should have a stream.");
                source.cancel(cx, global, error.handle())
            },
            ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
                self.writer.close_with_error_propagation(cx, global)
            },
            ShutdownAction::Abort => {
                // This arm uses the abort reason rather than the shutdown error;
                // the binding shadows the outer `error`.
                rooted!(&in(cx) let mut error = UndefinedValue());
                error.set(self.abort_reason.get());

                // Collect one promise per side that has to be shut down.
                let mut actions = vec![];

                if !self.prevent_abort {
                    let dest = self
                        .writer
                        .get_stream()
                        .expect("Destination stream must be set");

                    // Only a writable destination is aborted; otherwise use a resolved promise.
                    let promise = if dest.is_writable() {
                        dest.abort(cx, global, error.handle())
                    } else {
                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
                    };
                    actions.push(promise);
                }

                if !self.prevent_cancel {
                    let source = self.reader.get_stream().expect("Source stream must be set");

                    // Only a readable source is cancelled; otherwise use a resolved promise.
                    let promise = if source.is_readable() {
                        source.cancel(cx, global, error.handle())
                    } else {
                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
                    };
                    actions.push(promise);
                }

                // Combine the collected promises into the single promise to wait on.
                wait_for_all_promise(cx.into(), global, actions, realm, CanGc::from_cx(cx))
            },
        };

        // Re-enter `callback` when the action settles, whatever the outcome.
        let handler = PromiseNativeHandler::new(
            global,
            Some(Box::new(self.clone())),
            Some(Box::new(self.clone())),
            CanGc::from_cx(cx),
        );
        promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
        *self.shutdown_action_promise.borrow_mut() = Some(promise);
    }
742
    /// Finishes the pipe: releases writer and reader and settles the result promise.
    ///
    /// The result promise is rejected with the recorded shutdown error when
    /// there is one, and resolved with undefined otherwise.
    ///
    /// # Panics
    ///
    /// Panics if releasing the reader fails.
    fn finalize(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
        *self.state.borrow_mut() = PipeToState::Finalized;

        // Release the writer first, then the reader.
        self.writer.release(cx.into(), global, CanGc::from_cx(cx));

        self.reader
            .release(CanGc::from_cx(cx))
            .expect("Releasing the reader should not fail");

        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
            // Copy the stored error into a rooted value before handing it to the promise.
            rooted!(&in(cx) let mut error = UndefinedValue());
            error.set(shutdown_error.get());
            self.result_promise
                .reject_native(&error.handle(), CanGc::from_cx(cx));
        } else {
            self.result_promise.resolve_native(&(), CanGc::from_cx(cx));
        }
    }
775}
776
/// Promise callback that resolves `result` when the promise it is attached to is fulfilled.
// NOTE(review): the name suggests it is attached to a source-cancel promise; the
// registration site is not in this part of the file.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseFulfillmentHandler {
    /// Promise resolved (with undefined) by the callback.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}
784
impl Callback for SourceCancelPromiseFulfillmentHandler {
    /// Resolves `result` with undefined, ignoring the fulfillment value.
    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
        self.result.resolve_native(&(), CanGc::from_cx(cx));
    }
}
793
/// Promise callback that rejects `result` when the promise it is attached to is rejected.
// NOTE(review): the name suggests it is attached to a source-cancel promise; the
// registration site is not in this part of the file.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseRejectionHandler {
    /// Promise rejected by the callback.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}
801
impl Callback for SourceCancelPromiseRejectionHandler {
    /// Rejects `result` with the rejection value `v`.
    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
        self.result.reject_native(&v, CanGc::from_cx(cx));
    }
}
810
/// The state of a [`ReadableStream`].
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReadableStreamState {
    /// Initial state of a newly created stream.
    #[default]
    Readable,
    /// The stream has been closed.
    Closed,
    /// The stream has been errored; `ReadableStream::error` stores the error value.
    Errored,
}
819
/// The controller attached to a [`ReadableStream`], by kind.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ControllerType {
    /// A byte stream controller.
    Byte(MutNullableDom<ReadableByteStreamController>),
    /// A default controller.
    Default(MutNullableDom<ReadableStreamDefaultController>),
}
829
/// The reader slot of a [`ReadableStream`], by kind.
///
/// The inner `MutNullableDom` may be empty; the stream counts as locked only
/// when it holds a reader (see `ReadableStream::is_locked`).
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ReaderType {
    /// A BYOB reader.
    #[allow(clippy::upper_case_acronyms)]
    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
    /// A default reader.
    Default(MutNullableDom<ReadableStreamDefaultReader>),
}
840
841impl Eq for ReaderType {}
842impl PartialEq for ReaderType {
843 fn eq(&self, other: &Self) -> bool {
844 matches!(
845 (self, other),
846 (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
847 (ReaderType::Default(_), ReaderType::Default(_))
848 )
849 }
850}
851
852#[cfg_attr(crown, expect(crown::unrooted_must_root))]
854pub(crate) fn create_readable_stream(
855 global: &GlobalScope,
856 underlying_source_type: UnderlyingSourceType,
857 queuing_strategy: Option<Rc<QueuingStrategySize>>,
858 high_water_mark: Option<f64>,
859 can_gc: CanGc,
860) -> DomRoot<ReadableStream> {
861 let high_water_mark = high_water_mark.unwrap_or(1.0);
863
864 let size_algorithm =
866 queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
867
868 assert!(high_water_mark >= 0.0);
870
871 let stream = ReadableStream::new_with_proto(global, None, can_gc);
874
875 let controller = ReadableStreamDefaultController::new(
877 global,
878 underlying_source_type,
879 high_water_mark,
880 size_algorithm,
881 can_gc,
882 );
883
884 controller
887 .setup(stream.clone(), can_gc)
888 .expect("Setup of default controller cannot fail");
889
890 stream
892}
893
894#[cfg_attr(crown, expect(crown::unrooted_must_root))]
896pub(crate) fn readable_byte_stream_tee(
897 global: &GlobalScope,
898 underlying_source_type: UnderlyingSourceType,
899 can_gc: CanGc,
900) -> DomRoot<ReadableStream> {
901 let tee_stream = ReadableStream::new_with_proto(global, None, can_gc);
904
905 let controller = ReadableByteStreamController::new(underlying_source_type, 0.0, global, can_gc);
907
908 controller
910 .setup(global, tee_stream.clone(), can_gc)
911 .expect("Setup of byte stream controller cannot fail");
912
913 tee_stream
915}
916
/// DOM object backing a readable stream.
#[dom_struct]
pub(crate) struct ReadableStream {
    reflector_: Reflector,

    /// The controller attached to the stream; `None` until one of the
    /// `set_*_controller` methods is called.
    controller: RefCell<Option<ControllerType>>,

    /// The error the stream was errored with; set by `error`.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// Whether the stream has been disturbed; see `set_is_disturbed`.
    disturbed: Cell<bool>,

    /// The reader slot of the stream, if any.
    reader: RefCell<Option<ReaderType>>,

    /// Current state of the stream.
    state: Cell<ReadableStreamState>,
}
940
941impl ReadableStream {
942 fn new_inherited() -> ReadableStream {
944 ReadableStream {
945 reflector_: Reflector::new(),
946 controller: RefCell::new(None),
947 stored_error: Heap::default(),
948 disturbed: Default::default(),
949 reader: RefCell::new(None),
950 state: Cell::new(Default::default()),
951 }
952 }
953
954 pub(crate) fn new_with_proto(
955 global: &GlobalScope,
956 proto: Option<SafeHandleObject>,
957 can_gc: CanGc,
958 ) -> DomRoot<ReadableStream> {
959 reflect_dom_object_with_proto(
960 Box::new(ReadableStream::new_inherited()),
961 global,
962 proto,
963 can_gc,
964 )
965 }
966
967 pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
970 *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
971 controller,
972 ))));
973 }
974
975 pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
978 *self.controller.borrow_mut() =
979 Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
980 }
981
982 pub(crate) fn assert_no_controller(&self) {
985 let has_no_controller = self.controller.borrow().is_none();
986 assert!(has_no_controller);
987 }
988
989 pub(crate) fn new_from_bytes(
991 global: &GlobalScope,
992 bytes: Vec<u8>,
993 can_gc: CanGc,
994 ) -> Fallible<DomRoot<ReadableStream>> {
995 let stream = ReadableStream::new_with_external_underlying_source(
996 global,
997 UnderlyingSourceType::Memory(bytes.len()),
998 can_gc,
999 )?;
1000 stream.enqueue_native(bytes, can_gc);
1001 stream.controller_close_native(can_gc);
1002 Ok(stream)
1003 }
1004
1005 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1008 pub(crate) fn new_with_external_underlying_source(
1009 global: &GlobalScope,
1010 source: UnderlyingSourceType,
1011 can_gc: CanGc,
1012 ) -> Fallible<DomRoot<ReadableStream>> {
1013 assert!(source.is_native());
1014 let stream = ReadableStream::new_with_proto(global, None, can_gc);
1015 let controller = ReadableStreamDefaultController::new(
1016 global,
1017 source,
1018 1.0,
1019 extract_size_algorithm(&QueuingStrategy::empty(), can_gc),
1020 can_gc,
1021 );
1022 controller.setup(stream.clone(), can_gc)?;
1023 Ok(stream)
1024 }
1025
1026 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1028 match self.controller.borrow().as_ref() {
1029 Some(ControllerType::Default(controller)) => {
1030 let controller = controller
1031 .get()
1032 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1033 controller.perform_release_steps()
1034 },
1035 Some(ControllerType::Byte(controller)) => {
1036 let controller = controller
1037 .get()
1038 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1039 controller.perform_release_steps()
1040 },
1041 None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1042 }
1043 }
1044
1045 pub(crate) fn perform_pull_steps(
1049 &self,
1050 cx: SafeJSContext,
1051 read_request: &ReadRequest,
1052 can_gc: CanGc,
1053 ) {
1054 match self.controller.borrow().as_ref() {
1055 Some(ControllerType::Default(controller)) => controller
1056 .get()
1057 .expect("Stream should have controller.")
1058 .perform_pull_steps(read_request, can_gc),
1059 Some(ControllerType::Byte(controller)) => controller
1060 .get()
1061 .expect("Stream should have controller.")
1062 .perform_pull_steps(cx, read_request, can_gc),
1063 None => {
1064 unreachable!("Stream does not have a controller.");
1065 },
1066 }
1067 }
1068
1069 pub(crate) fn perform_pull_into(
1073 &self,
1074 cx: SafeJSContext,
1075 read_into_request: &ReadIntoRequest,
1076 view: HeapBufferSource<ArrayBufferViewU8>,
1077 min: u64,
1078 can_gc: CanGc,
1079 ) {
1080 match self.controller.borrow().as_ref() {
1081 Some(ControllerType::Byte(controller)) => controller
1082 .get()
1083 .expect("Stream should have controller.")
1084 .perform_pull_into(cx, read_into_request, view, min, can_gc),
1085 _ => {
1086 unreachable!(
1087 "Pulling a chunk from a stream with a default controller using a BYOB reader"
1088 )
1089 },
1090 }
1091 }
1092
1093 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1095 match self.reader.borrow().as_ref() {
1096 Some(ReaderType::Default(reader)) => {
1097 let Some(reader) = reader.get() else {
1098 panic!("Attempt to add a read request without having first acquired a reader.");
1099 };
1100
1101 assert!(self.is_readable());
1103
1104 reader.add_read_request(read_request);
1106 },
1107 _ => {
1108 unreachable!("Adding a read request can only be done on a default reader.")
1109 },
1110 }
1111 }
1112
1113 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1115 match self.reader.borrow().as_ref() {
1116 Some(ReaderType::BYOB(reader)) => {
1118 let Some(reader) = reader.get() else {
1119 unreachable!(
1120 "Attempt to add a read into request without having first acquired a reader."
1121 );
1122 };
1123
1124 assert!(self.is_readable() || self.is_closed());
1126
1127 reader.add_read_into_request(read_request);
1129 },
1130 _ => {
1131 unreachable!("Adding a read into request can only be done on a BYOB reader.")
1132 },
1133 }
1134 }
1135
1136 pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1139 match self.controller.borrow().as_ref() {
1140 Some(ControllerType::Default(controller)) => controller
1141 .get()
1142 .expect("Stream should have controller.")
1143 .enqueue_native(bytes, can_gc),
1144 _ => {
1145 unreachable!(
1146 "Enqueueing chunk to a stream from Rust on other than default controller"
1147 );
1148 },
1149 }
1150 }
1151
1152 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1154 assert!(self.is_readable());
1156
1157 self.state.set(ReadableStreamState::Errored);
1159
1160 self.stored_error.set(e.get());
1162
1163 let default_reader = {
1166 let reader_ref = self.reader.borrow();
1167 match reader_ref.as_ref() {
1168 Some(ReaderType::Default(reader)) => reader.get(),
1169 _ => None,
1170 }
1171 };
1172
1173 if let Some(reader) = default_reader {
1174 reader.error(e, can_gc);
1176 return;
1177 }
1178
1179 let byob_reader = {
1180 let reader_ref = self.reader.borrow();
1181 match reader_ref.as_ref() {
1182 Some(ReaderType::BYOB(reader)) => reader.get(),
1183 _ => None,
1184 }
1185 };
1186
1187 if let Some(reader) = byob_reader {
1188 reader.error_read_into_requests(e, can_gc);
1190 }
1191
1192 }
1194
    /// Copies the stream's stored error into `handle_mut`.
    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
        handle_mut.set(self.stored_error.get());
    }
1199
1200 pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1203 let cx = GlobalScope::get_cx();
1204 rooted!(in(*cx) let mut error_val = UndefinedValue());
1205 error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1206 self.error(error_val.handle(), can_gc);
1207 }
1208
1209 pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1212 match self.controller.borrow().as_ref() {
1213 Some(ControllerType::Default(controller)) => {
1214 let _ = controller
1215 .get()
1216 .expect("Stream should have controller.")
1217 .Close(can_gc);
1218 },
1219 _ => {
1220 unreachable!("Native closing is only done on default controllers.")
1221 },
1222 }
1223 }
1224
1225 pub(crate) fn in_memory(&self) -> bool {
1228 match self.controller.borrow().as_ref() {
1229 Some(ControllerType::Default(controller)) => controller
1230 .get()
1231 .expect("Stream should have controller.")
1232 .in_memory(),
1233 _ => {
1234 unreachable!(
1235 "Checking if source is in memory for a stream with a non-default controller"
1236 )
1237 },
1238 }
1239 }
1240
1241 pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1244 match self.controller.borrow().as_ref() {
1245 Some(ControllerType::Default(controller)) => controller
1246 .get()
1247 .expect("Stream should have controller.")
1248 .get_in_memory_bytes()
1249 .as_deref()
1250 .map(GenericSharedMemory::from_bytes),
1251 _ => {
1252 unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1253 },
1254 }
1255 }
1256
1257 pub(crate) fn acquire_default_reader(
1262 &self,
1263 can_gc: CanGc,
1264 ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1265 let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1267
1268 reader.set_up(self, &self.global(), can_gc)?;
1270
1271 Ok(reader)
1273 }
1274
1275 pub(crate) fn acquire_byob_reader(
1277 &self,
1278 can_gc: CanGc,
1279 ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1280 let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1282 reader.set_up(self, &self.global(), can_gc)?;
1284
1285 Ok(reader)
1287 }
1288
1289 pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1290 match self.controller.borrow().as_ref() {
1291 Some(ControllerType::Default(controller)) => {
1292 controller.get().expect("Stream should have controller.")
1293 },
1294 _ => {
1295 unreachable!(
1296 "Getting default controller for a stream with a non-default controller"
1297 )
1298 },
1299 }
1300 }
1301
1302 pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1303 match self.controller.borrow().as_ref() {
1304 Some(ControllerType::Byte(controller)) => {
1305 controller.get().expect("Stream should have controller.")
1306 },
1307 _ => {
1308 unreachable!("Getting byte controller for a stream with a non-byte controller")
1309 },
1310 }
1311 }
1312
1313 pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1314 match self.reader.borrow().as_ref() {
1315 Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1316 _ => {
1317 unreachable!("Getting default reader for a stream with a non-default reader")
1318 },
1319 }
1320 }
1321
1322 pub(crate) fn read_a_chunk(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
1328 match self.reader.borrow().as_ref() {
1329 Some(ReaderType::Default(reader)) => {
1330 let Some(reader) = reader.get() else {
1331 unreachable!(
1332 "Attempt to read stream chunk without having first acquired a reader."
1333 );
1334 };
1335 reader.Read(cx)
1336 },
1337 _ => {
1338 unreachable!("Native reading of a chunk can only be done with a default reader.")
1339 },
1340 }
1341 }
1342
1343 pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1348 let reader_ref = self.reader.borrow();
1349
1350 match reader_ref.as_ref() {
1351 Some(ReaderType::Default(reader)) => {
1352 let Some(reader) = reader.get() else {
1353 unreachable!("Attempt to stop reading without having first acquired a reader.");
1354 };
1355
1356 drop(reader_ref);
1357 reader.release(can_gc).expect("Reader release cannot fail.");
1358 },
1359 _ => {
1360 unreachable!("Native stop reading can only be done with a default reader.")
1361 },
1362 }
1363 }
1364
1365 pub(crate) fn is_locked(&self) -> bool {
1367 match self.reader.borrow().as_ref() {
1368 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1369 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1370 None => false,
1371 }
1372 }
1373
    /// Returns the current value of the stream's `disturbed` flag.
    pub(crate) fn is_disturbed(&self) -> bool {
        self.disturbed.get()
    }
1377
    /// Overwrites the stream's `disturbed` flag with `disturbed`.
    pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
        self.disturbed.set(disturbed);
    }
1381
1382 pub(crate) fn is_closed(&self) -> bool {
1383 self.state.get() == ReadableStreamState::Closed
1384 }
1385
1386 pub(crate) fn is_errored(&self) -> bool {
1387 self.state.get() == ReadableStreamState::Errored
1388 }
1389
1390 pub(crate) fn is_readable(&self) -> bool {
1391 self.state.get() == ReadableStreamState::Readable
1392 }
1393
1394 pub(crate) fn has_default_reader(&self) -> bool {
1395 match self.reader.borrow().as_ref() {
1396 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1397 _ => false,
1398 }
1399 }
1400
1401 pub(crate) fn has_byob_reader(&self) -> bool {
1402 match self.reader.borrow().as_ref() {
1403 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1404 _ => false,
1405 }
1406 }
1407
1408 pub(crate) fn has_byte_controller(&self) -> bool {
1409 match self.controller.borrow().as_ref() {
1410 Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1411 _ => false,
1412 }
1413 }
1414
1415 pub(crate) fn get_num_read_requests(&self) -> usize {
1417 match self.reader.borrow().as_ref() {
1418 Some(ReaderType::Default(reader)) => {
1419 let reader = reader
1420 .get()
1421 .expect("Stream must have a reader when getting the number of read requests.");
1422 reader.get_num_read_requests()
1423 },
1424 _ => unreachable!(
1425 "Stream must have a default reader when get num read requests is called into."
1426 ),
1427 }
1428 }
1429
1430 pub(crate) fn get_num_read_into_requests(&self) -> usize {
1432 assert!(self.has_byob_reader());
1433
1434 match self.reader.borrow().as_ref() {
1435 Some(ReaderType::BYOB(reader)) => {
1436 let Some(reader) = reader.get() else {
1437 unreachable!(
1438 "Stream must have a reader when get num read into requests is called into."
1439 );
1440 };
1441 reader.get_num_read_into_requests()
1442 },
1443 _ => {
1444 unreachable!(
1445 "Stream must have a BYOB reader when get num read into requests is called into."
1446 );
1447 },
1448 }
1449 }
1450
1451 pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1453 assert!(self.has_default_reader());
1455
1456 match self.reader.borrow().as_ref() {
1457 Some(ReaderType::Default(reader)) => {
1458 let reader = reader
1460 .get()
1461 .expect("Stream must have a reader when a read request is fulfilled.");
1462 assert_ne!(reader.get_num_read_requests(), 0);
1464 let request = reader.remove_read_request();
1467
1468 if done {
1469 request.close_steps(can_gc);
1471 } else {
1472 let result = RootedTraceableBox::new(Heap::default());
1474 result.set(*chunk);
1475 request.chunk_steps(result, &self.global(), can_gc);
1476 }
1477 },
1478 _ => {
1479 unreachable!(
1480 "Stream must have a default reader when fulfill read requests is called into."
1481 );
1482 },
1483 }
1484 }
1485
1486 pub(crate) fn fulfill_read_into_request(
1488 &self,
1489 chunk: SafeHandleValue,
1490 done: bool,
1491 can_gc: CanGc,
1492 ) {
1493 assert!(self.has_byob_reader());
1495
1496 match self.reader.borrow().as_ref() {
1498 Some(ReaderType::BYOB(reader)) => {
1499 let Some(reader) = reader.get() else {
1500 unreachable!(
1501 "Stream must have a reader when a read into request is fulfilled."
1502 );
1503 };
1504
1505 assert!(reader.get_num_read_into_requests() > 0);
1507
1508 let read_into_request = reader.remove_read_into_request();
1511
1512 let result = RootedTraceableBox::new(Heap::default());
1514 if done {
1515 result.set(*chunk);
1516 read_into_request.close_steps(Some(result), can_gc);
1517 } else {
1518 result.set(*chunk);
1520 read_into_request.chunk_steps(result, can_gc);
1521 }
1522 },
1523 _ => {
1524 unreachable!(
1525 "Stream must have a BYOB reader when fulfill read into requests is called into."
1526 );
1527 },
1528 };
1529 }
1530
1531 pub(crate) fn close(&self, can_gc: CanGc) {
1533 assert!(self.is_readable());
1535 self.state.set(ReadableStreamState::Closed);
1537 let default_reader = {
1543 let reader_ref = self.reader.borrow();
1544 match reader_ref.as_ref() {
1545 Some(ReaderType::Default(reader)) => reader.get(),
1546 _ => None,
1547 }
1548 };
1549
1550 if let Some(reader) = default_reader {
1551 reader.close(can_gc);
1553 return;
1554 }
1555
1556 let byob_reader = {
1558 let reader_ref = self.reader.borrow();
1559 match reader_ref.as_ref() {
1560 Some(ReaderType::BYOB(reader)) => reader.get(),
1561 _ => None,
1562 }
1563 };
1564
1565 if let Some(reader) = byob_reader {
1566 reader.close(can_gc);
1568 }
1569
1570 }
1572
1573 pub(crate) fn cancel(
1575 &self,
1576 cx: &mut js::context::JSContext,
1577 global: &GlobalScope,
1578 reason: SafeHandleValue,
1579 ) -> Rc<Promise> {
1580 self.disturbed.set(true);
1582
1583 if self.is_closed() {
1585 return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
1586 }
1587 if self.is_errored() {
1589 let promise = Promise::new2(cx, global);
1590 rooted!(&in(cx) let mut rval = UndefinedValue());
1591 self.stored_error
1592 .safe_to_jsval(cx.into(), rval.handle_mut(), CanGc::from_cx(cx));
1593 promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
1594 return promise;
1595 }
1596 self.close(CanGc::from_cx(cx));
1598
1599 let byob_reader = {
1601 let reader_ref = self.reader.borrow();
1602 match reader_ref.as_ref() {
1603 Some(ReaderType::BYOB(reader)) => reader.get(),
1604 _ => None,
1605 }
1606 };
1607
1608 if let Some(reader) = byob_reader {
1609 reader.cancel(CanGc::from_cx(cx));
1611 }
1612
1613 let source_cancel_promise = match self.controller.borrow().as_ref() {
1616 Some(ControllerType::Default(controller)) => controller
1617 .get()
1618 .expect("Stream should have controller.")
1619 .perform_cancel_steps(cx, global, reason),
1620 Some(ControllerType::Byte(controller)) => controller
1621 .get()
1622 .expect("Stream should have controller.")
1623 .perform_cancel_steps(cx, global, reason),
1624 None => {
1625 panic!("Stream does not have a controller.");
1626 },
1627 };
1628
1629 let global = self.global();
1632 let result_promise = Promise::new2(cx, &global);
1633 let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1634 result: result_promise.clone(),
1635 });
1636 let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1637 result: result_promise.clone(),
1638 });
1639 let handler = PromiseNativeHandler::new(
1640 &global,
1641 Some(fulfillment_handler),
1642 Some(rejection_handler),
1643 CanGc::from_cx(cx),
1644 );
1645 let realm = enter_realm(&*global);
1646 let comp = InRealm::Entered(&realm);
1647 source_cancel_promise.append_native_handler(&handler, comp, CanGc::from_cx(cx));
1648
1649 result_promise
1652 }
1653
1654 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1655 pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1656 *self.reader.borrow_mut() = new_reader;
1657 }
1658
1659 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1660 fn byte_tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1662 let reader = self.acquire_default_reader(can_gc)?;
1667 let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1668 Some(&reader),
1669 ))));
1670
1671 let reading = Rc::new(Cell::new(false));
1673
1674 let read_again_for_branch_1 = Rc::new(Cell::new(false));
1676
1677 let read_again_for_branch_2 = Rc::new(Cell::new(false));
1679
1680 let canceled_1 = Rc::new(Cell::new(false));
1682
1683 let canceled_2 = Rc::new(Cell::new(false));
1685
1686 let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1688
1689 let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1691
1692 let cancel_promise = Promise::new(&self.global(), can_gc);
1694 let reader_version = Rc::new(Cell::new(0));
1695
1696 let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1697 reader.clone(),
1698 self,
1699 reading.clone(),
1700 read_again_for_branch_1.clone(),
1701 read_again_for_branch_2.clone(),
1702 canceled_1.clone(),
1703 canceled_2.clone(),
1704 reason_1.clone(),
1705 reason_2.clone(),
1706 cancel_promise.clone(),
1707 reader_version.clone(),
1708 ByteTeeCancelAlgorithm::Cancel1Algorithm,
1709 ByteTeePullAlgorithm::Pull1Algorithm,
1710 can_gc,
1711 );
1712
1713 let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1714 reader.clone(),
1715 self,
1716 reading,
1717 read_again_for_branch_1,
1718 read_again_for_branch_2,
1719 canceled_1,
1720 canceled_2,
1721 reason_1,
1722 reason_2,
1723 cancel_promise,
1724 reader_version,
1725 ByteTeeCancelAlgorithm::Cancel2Algorithm,
1726 ByteTeePullAlgorithm::Pull2Algorithm,
1727 can_gc,
1728 );
1729
1730 let branch_1 = readable_byte_stream_tee(
1732 &self.global(),
1733 UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_1)),
1734 can_gc,
1735 );
1736 byte_tee_source_1.set_branch_1(&branch_1);
1737 byte_tee_source_2.set_branch_1(&branch_1);
1738
1739 let branch_2 = readable_byte_stream_tee(
1741 &self.global(),
1742 UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_2)),
1743 can_gc,
1744 );
1745 byte_tee_source_1.set_branch_2(&branch_2);
1746 byte_tee_source_2.set_branch_2(&branch_2);
1747
1748 byte_tee_source_1.forward_reader_error(reader.clone(), can_gc);
1750 byte_tee_source_2.forward_reader_error(reader, can_gc);
1751
1752 Ok(vec![branch_1, branch_2])
1754 }
1755
1756 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1758 fn default_tee(
1759 &self,
1760 clone_for_branch_2: bool,
1761 can_gc: CanGc,
1762 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1763 let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1767
1768 let reader = self.acquire_default_reader(can_gc)?;
1770
1771 let reading = Rc::new(Cell::new(false));
1773 let read_again = Rc::new(Cell::new(false));
1775 let canceled_1 = Rc::new(Cell::new(false));
1777 let canceled_2 = Rc::new(Cell::new(false));
1779
1780 let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1782 let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1784 let cancel_promise = Promise::new(&self.global(), can_gc);
1786
1787 let tee_source_1 = DefaultTeeUnderlyingSource::new(
1788 &reader,
1789 self,
1790 reading.clone(),
1791 read_again.clone(),
1792 canceled_1.clone(),
1793 canceled_2.clone(),
1794 clone_for_branch_2.clone(),
1795 reason_1.clone(),
1796 reason_2.clone(),
1797 cancel_promise.clone(),
1798 DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1799 can_gc,
1800 );
1801
1802 let underlying_source_type_branch_1 =
1803 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
1804
1805 let tee_source_2 = DefaultTeeUnderlyingSource::new(
1806 &reader,
1807 self,
1808 reading,
1809 read_again,
1810 canceled_1.clone(),
1811 canceled_2.clone(),
1812 clone_for_branch_2,
1813 reason_1,
1814 reason_2,
1815 cancel_promise.clone(),
1816 DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1817 can_gc,
1818 );
1819
1820 let underlying_source_type_branch_2 =
1821 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
1822
1823 let branch_1 = create_readable_stream(
1825 &self.global(),
1826 underlying_source_type_branch_1,
1827 None,
1828 None,
1829 can_gc,
1830 );
1831 tee_source_1.set_branch_1(&branch_1);
1832 tee_source_2.set_branch_1(&branch_1);
1833
1834 let branch_2 = create_readable_stream(
1836 &self.global(),
1837 underlying_source_type_branch_2,
1838 None,
1839 None,
1840 can_gc,
1841 );
1842 tee_source_1.set_branch_2(&branch_2);
1843 tee_source_2.set_branch_2(&branch_2);
1844
1845 reader.default_tee_append_native_handler_to_closed_promise(
1847 &branch_1,
1848 &branch_2,
1849 canceled_1,
1850 canceled_2,
1851 cancel_promise,
1852 can_gc,
1853 );
1854
1855 Ok(vec![branch_1, branch_2])
1857 }
1858
1859 #[allow(clippy::too_many_arguments)]
1861 pub(crate) fn pipe_to(
1862 &self,
1863 cx: &mut CurrentRealm,
1864 global: &GlobalScope,
1865 dest: &WritableStream,
1866 prevent_close: bool,
1867 prevent_abort: bool,
1868 prevent_cancel: bool,
1869 signal: Option<&AbortSignal>,
1870 ) -> Rc<Promise> {
1871 assert!(!self.is_locked());
1882
1883 assert!(!dest.is_locked());
1885
1886 let reader = self
1894 .acquire_default_reader(CanGc::from_cx(cx))
1895 .expect("Acquiring a default reader for pipe_to cannot fail");
1896
1897 let writer = dest
1899 .aquire_default_writer(cx.into(), global, CanGc::from_cx(cx))
1900 .expect("Acquiring a default writer for pipe_to cannot fail");
1901
1902 self.disturbed.set(true);
1904
1905 let promise = Promise::new2(cx, global);
1910
1911 rooted!(&in(cx) let pipe_to = PipeTo {
1913 reader: Dom::from_ref(&reader),
1914 writer: Dom::from_ref(&writer),
1915 pending_writes: Default::default(),
1916 state: Default::default(),
1917 prevent_abort,
1918 prevent_cancel,
1919 prevent_close,
1920 shutting_down: Default::default(),
1921 abort_reason: Default::default(),
1922 shutdown_error: Default::default(),
1923 shutdown_action_promise: Default::default(),
1924 result_promise: promise.clone(),
1925 });
1926
1927 if let Some(signal) = signal {
1930 rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1933
1934 if signal.aborted() {
1936 signal.run_abort_algorithm(cx, global, &abort_algorithm);
1937 return promise;
1938 }
1939
1940 signal.add(&abort_algorithm);
1942 }
1943
1944 pipe_to.check_and_propagate_errors_forward(cx, global);
1946 pipe_to.check_and_propagate_errors_backward(cx, global);
1947 pipe_to.check_and_propagate_closing_forward(cx, global);
1948 pipe_to.check_and_propagate_closing_backward(cx, global);
1949
1950 if *pipe_to.state.borrow() == PipeToState::Starting {
1952 pipe_to.wait_for_writer_ready(cx, global);
1954 }
1955
1956 promise
1958 }
1959
1960 pub(crate) fn tee(
1962 &self,
1963 clone_for_branch_2: bool,
1964 can_gc: CanGc,
1965 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1966 match self.controller.borrow().as_ref() {
1970 Some(ControllerType::Default(_)) => {
1971 self.default_tee(clone_for_branch_2, can_gc)
1973 },
1974 Some(ControllerType::Byte(_)) => {
1975 self.byte_tee(can_gc)
1978 },
1979 None => {
1980 unreachable!("Stream should have a controller.");
1981 },
1982 }
1983 }
1984
1985 pub(crate) fn set_up_byte_controller(
1987 &self,
1988 global: &GlobalScope,
1989 underlying_source_dict: JsUnderlyingSource,
1990 underlying_source_handle: SafeHandleObject,
1991 stream: DomRoot<ReadableStream>,
1992 strategy_hwm: f64,
1993 can_gc: CanGc,
1994 ) -> Fallible<()> {
1995 if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2011 return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2012 }
2013
2014 let controller = ReadableByteStreamController::new(
2015 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2016 strategy_hwm,
2017 global,
2018 can_gc,
2019 );
2020
2021 controller.set_underlying_source_this_object(underlying_source_handle);
2024
2025 controller.setup(global, stream, can_gc)
2028 }
2029
2030 pub(crate) fn setup_cross_realm_transform_readable(
2032 &self,
2033 cx: &mut js::context::JSContext,
2034 port: &MessagePort,
2035 ) {
2036 let port_id = port.message_port_id();
2037 let global = self.global();
2038
2039 let size_algorithm =
2044 extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
2045
2046 let controller = ReadableStreamDefaultController::new(
2050 &self.global(),
2051 UnderlyingSourceType::Transfer(Dom::from_ref(port)),
2052 0.,
2053 size_algorithm,
2054 CanGc::from_cx(cx),
2055 );
2056
2057 rooted!(&in(cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2060 controller: Dom::from_ref(&controller),
2061 });
2062 global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2063
2064 port.Start(cx);
2066
2067 controller
2069 .setup(DomRoot::from_ref(self), CanGc::from_cx(cx))
2070 .expect("Setting up controller for transfer cannot fail.");
2071 }
2072}
2073
2074impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2075 fn Constructor(
2077 cx: SafeJSContext,
2078 global: &GlobalScope,
2079 proto: Option<SafeHandleObject>,
2080 can_gc: CanGc,
2081 underlying_source: Option<*mut JSObject>,
2082 strategy: &QueuingStrategy,
2083 ) -> Fallible<DomRoot<Self>> {
2084 rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2086 let underlying_source_dict = if !underlying_source_obj.is_null() {
2089 rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2090 match JsUnderlyingSource::new(cx, obj_val.handle(), can_gc) {
2091 Ok(ConversionResult::Success(val)) => val,
2092 Ok(ConversionResult::Failure(error)) => {
2093 return Err(Error::Type(error.into_owned()));
2094 },
2095 _ => {
2096 return Err(Error::JSFailed);
2097 },
2098 }
2099 } else {
2100 JsUnderlyingSource::empty()
2101 };
2102
2103 let stream = ReadableStream::new_with_proto(global, proto, can_gc);
2105
2106 if underlying_source_dict.type_.is_some() {
2107 if strategy.size.is_some() {
2109 return Err(Error::Range(
2110 c"size is not supported for byte streams".to_owned(),
2111 ));
2112 }
2113
2114 let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2116
2117 stream.set_up_byte_controller(
2120 global,
2121 underlying_source_dict,
2122 underlying_source_obj.handle(),
2123 stream.clone(),
2124 strategy_hwm,
2125 can_gc,
2126 )?;
2127 } else {
2128 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2130
2131 let size_algorithm = extract_size_algorithm(strategy, can_gc);
2133
2134 let controller = ReadableStreamDefaultController::new(
2135 global,
2136 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2137 high_water_mark,
2138 size_algorithm,
2139 can_gc,
2140 );
2141
2142 controller.set_underlying_source_this_object(underlying_source_obj.handle());
2145
2146 controller.setup(stream.clone(), can_gc)?;
2148 };
2149
2150 Ok(stream)
2151 }
2152
2153 fn Locked(&self) -> bool {
2155 self.is_locked()
2156 }
2157
2158 fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
2160 let global = self.global();
2161 if self.is_locked() {
2162 let promise = Promise::new2(cx, &global);
2165 promise.reject_error(
2166 Error::Type(c"stream is locked".to_owned()),
2167 CanGc::from_cx(cx),
2168 );
2169 promise
2170 } else {
2171 self.cancel(cx, &global, reason)
2173 }
2174 }
2175
2176 fn GetReader(
2178 &self,
2179 options: &ReadableStreamGetReaderOptions,
2180 can_gc: CanGc,
2181 ) -> Fallible<ReadableStreamReader> {
2182 if options.mode.is_none() {
2184 return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2185 self.acquire_default_reader(can_gc)?,
2186 ));
2187 }
2188 assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2190
2191 Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2193 self.acquire_byob_reader(can_gc)?,
2194 ))
2195 }
2196
2197 fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2199 self.tee(false, can_gc)
2201 }
2202
2203 fn PipeTo(
2205 &self,
2206 cx: &mut CurrentRealm,
2207 destination: &WritableStream,
2208 options: &StreamPipeOptions,
2209 ) -> Rc<Promise> {
2210 let global = self.global();
2211
2212 if self.is_locked() {
2214 let promise = Promise::new2(cx, &global);
2216 promise.reject_error(
2217 Error::Type(c"Source stream is locked".to_owned()),
2218 CanGc::from_cx(cx),
2219 );
2220 return promise;
2221 }
2222
2223 if destination.is_locked() {
2225 let promise = Promise::new2(cx, &global);
2227 promise.reject_error(
2228 Error::Type(c"Destination stream is locked".to_owned()),
2229 CanGc::from_cx(cx),
2230 );
2231 return promise;
2232 }
2233
2234 let signal = options.signal.as_deref();
2236
2237 self.pipe_to(
2239 cx,
2240 &global,
2241 destination,
2242 options.preventClose,
2243 options.preventAbort,
2244 options.preventCancel,
2245 signal,
2246 )
2247 }
2248
2249 fn PipeThrough(
2251 &self,
2252 cx: &mut CurrentRealm,
2253 transform: &ReadableWritablePair,
2254 options: &StreamPipeOptions,
2255 ) -> Fallible<DomRoot<ReadableStream>> {
2256 let global = self.global();
2257
2258 if self.is_locked() {
2260 return Err(Error::Type(c"Source stream is locked".to_owned()));
2261 }
2262
2263 if transform.writable.is_locked() {
2265 return Err(Error::Type(c"Destination stream is locked".to_owned()));
2266 }
2267
2268 let signal = options.signal.as_deref();
2270
2271 let promise = self.pipe_to(
2274 cx,
2275 &global,
2276 &transform.writable,
2277 options.preventClose,
2278 options.preventAbort,
2279 options.preventCancel,
2280 signal,
2281 );
2282
2283 promise.set_promise_is_handled();
2285
2286 Ok(transform.readable.clone())
2288 }
2289}
2290
2291#[expect(unsafe_code)]
2292pub(crate) unsafe fn get_type_and_value_from_message(
2296 cx: SafeJSContext,
2297 data: SafeHandleValue,
2298 value: SafeMutableHandleValue,
2299 can_gc: CanGc,
2300) -> DOMString {
2301 assert!(data.is_object());
2307 rooted!(in(*cx) let data_object = data.to_object());
2308
2309 rooted!(in(*cx) let mut type_ = UndefinedValue());
2311 unsafe {
2312 get_dictionary_property(
2313 *cx,
2314 data_object.handle(),
2315 c"type",
2316 type_.handle_mut(),
2317 can_gc,
2318 )
2319 }
2320 .expect("Getting the type should not fail.");
2321
2322 unsafe { get_dictionary_property(*cx, data_object.handle(), c"value", value, can_gc) }
2324 .expect("Getting the value should not fail.");
2325
2326 let result =
2328 DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2329 .expect("The type of the message should be a string");
2330 let ConversionResult::Success(type_string) = result else {
2331 unreachable!("The type of the message should be a string");
2332 };
2333
2334 type_string
2335}
2336
2337impl js::gc::Rootable for CrossRealmTransformReadable {}
2338
2339#[derive(Clone, JSTraceable, MallocSizeOf)]
2343#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
2344pub(crate) struct CrossRealmTransformReadable {
2345 controller: Dom<ReadableStreamDefaultController>,
2347}
2348
2349impl CrossRealmTransformReadable {
2350 #[expect(unsafe_code)]
2353 pub(crate) fn handle_message(
2354 &self,
2355 cx: &mut CurrentRealm,
2356 global: &GlobalScope,
2357 port: &MessagePort,
2358 message: SafeHandleValue,
2359 ) {
2360 rooted!(&in(cx) let mut value = UndefinedValue());
2361 let type_string = unsafe {
2362 get_type_and_value_from_message(
2363 cx.into(),
2364 message,
2365 value.handle_mut(),
2366 CanGc::from_cx(cx),
2367 )
2368 };
2369
2370 if type_string == "chunk" {
2372 self.controller
2374 .enqueue(cx, value.handle())
2375 .expect("Enqueing a chunk should not fail.");
2376 }
2377
2378 if type_string == "close" {
2380 self.controller.close(CanGc::from_cx(cx));
2382
2383 global.disentangle_port(port, CanGc::from_cx(cx));
2385 }
2386
2387 if type_string == "error" {
2389 self.controller.error(value.handle(), CanGc::from_cx(cx));
2391
2392 global.disentangle_port(port, CanGc::from_cx(cx));
2394 }
2395 }
2396
2397 pub(crate) fn handle_error(
2400 &self,
2401 cx: &mut CurrentRealm,
2402 global: &GlobalScope,
2403 port: &MessagePort,
2404 ) {
2405 let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
2407 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
2408 error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
2409
2410 port.cross_realm_transform_send_error(rooted_error.handle(), CanGc::from_cx(cx));
2412
2413 self.controller
2415 .error(rooted_error.handle(), CanGc::from_cx(cx));
2416
2417 global.disentangle_port(port, CanGc::from_cx(cx));
2419 }
2420}
2421
2422#[expect(unsafe_code)]
2423pub(crate) fn get_read_promise_done(
2425 cx: SafeJSContext,
2426 v: &SafeHandleValue,
2427 can_gc: CanGc,
2428) -> Result<bool, Error> {
2429 if !v.is_object() {
2430 return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2431 }
2432 unsafe {
2433 rooted!(in(*cx) let object = v.to_object());
2434 rooted!(in(*cx) let mut done = UndefinedValue());
2435 match get_dictionary_property(*cx, object.handle(), c"done", done.handle_mut(), can_gc) {
2436 Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2437 Ok(ConversionResult::Success(val)) => Ok(val),
2438 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2439 _ => Err(Error::Type(c"Unknown format for done property.".to_owned())),
2440 },
2441 Ok(false) => Err(Error::Type(c"Promise has no done property.".to_owned())),
2442 Err(()) => Err(Error::JSFailed),
2443 }
2444 }
2445}
2446
2447#[expect(unsafe_code)]
2448pub(crate) fn get_read_promise_bytes(
2450 cx: SafeJSContext,
2451 v: &SafeHandleValue,
2452 can_gc: CanGc,
2453) -> Result<Vec<u8>, Error> {
2454 if !v.is_object() {
2455 return Err(Error::Type(
2456 c"Unknown format for for bytes read.".to_owned(),
2457 ));
2458 }
2459 unsafe {
2460 rooted!(in(*cx) let object = v.to_object());
2461 rooted!(in(*cx) let mut bytes = UndefinedValue());
2462 match get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc) {
2463 Ok(true) => {
2464 match Vec::<u8>::safe_from_jsval(
2465 cx,
2466 bytes.handle(),
2467 ConversionBehavior::EnforceRange,
2468 can_gc,
2469 ) {
2470 Ok(ConversionResult::Success(val)) => Ok(val),
2471 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2472 _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2473 }
2474 },
2475 Ok(false) => Err(Error::Type(c"Promise has no value property.".to_owned())),
2476 Err(()) => Err(Error::JSFailed),
2477 }
2478 }
2479}
2480
2481pub(crate) fn bytes_from_chunk_jsval(
2485 cx: SafeJSContext,
2486 chunk: &RootedTraceableBox<Heap<JSVal>>,
2487 can_gc: CanGc,
2488) -> Result<Vec<u8>, Error> {
2489 match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2490 Ok(ConversionResult::Success(vec)) => Ok(vec),
2491 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2492 _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2493 }
2494}
2495
2496impl Transferable for ReadableStream {
2498 type Index = MessagePortIndex;
2499 type Data = MessagePortImpl;
2500
2501 fn transfer(
2503 &self,
2504 cx: &mut js::context::JSContext,
2505 ) -> Fallible<(MessagePortId, MessagePortImpl)> {
2506 if self.is_locked() {
2509 return Err(Error::DataClone(None));
2510 }
2511
2512 let global = self.global();
2513 let mut realm = enter_auto_realm(cx, &*global);
2514 let mut realm = realm.current_realm();
2515 let cx = &mut realm;
2516
2517 let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
2519 global.track_message_port(&port_1, None);
2520
2521 let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
2523 global.track_message_port(&port_2, None);
2524
2525 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2527
2528 let writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
2530
2531 writable.setup_cross_realm_transform_writable(cx, &port_1);
2533
2534 let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);
2536
2537 promise.set_promise_is_handled();
2539
2540 port_2.transfer(cx)
2542 }
2543
2544 fn transfer_receive(
2546 cx: &mut js::context::JSContext,
2547 owner: &GlobalScope,
2548 id: MessagePortId,
2549 port_impl: MessagePortImpl,
2550 ) -> Result<DomRoot<Self>, ()> {
2551 let value = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
2554
2555 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
2562
2563 value.setup_cross_realm_transform_readable(cx, &transferred_port);
2565 Ok(value)
2566 }
2567
2568 fn serialized_storage<'a>(
2570 data: StructuredData<'a, '_>,
2571 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2572 match data {
2573 StructuredData::Reader(r) => &mut r.port_impls,
2574 StructuredData::Writer(w) => &mut w.ports,
2575 }
2576 }
2577}