1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use servo_base::generic_channel::GenericSharedMemory;
11use servo_base::id::{MessagePortId, MessagePortIndex};
12use servo_constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::context::JSContext;
15use js::jsapi::{Heap, JSObject};
16use js::jsval::{JSVal, ObjectValue, UndefinedValue};
17use js::realm::CurrentRealm;
18use js::rust::{
19 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
20 MutableHandleValue as SafeMutableHandleValue,
21};
22use js::typedarray::ArrayBufferViewU8;
23use rustc_hash::FxHashMap;
24use script_bindings::conversions::SafeToJSValConvertible;
25
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
27use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
28 ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
29 ReadableWritablePair, StreamPipeOptions,
30};
31use script_bindings::str::DOMString;
32
33use crate::dom::domexception::{DOMErrorName, DOMException};
34use script_bindings::conversions::{is_array_like, StringificationBehavior};
35use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
36use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
38use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
39use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
40use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
41use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
42use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
43use crate::dom::stream::writablestream::WritableStream;
44use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
45use crate::dom::bindings::reflector::DomGlobal;
46use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
47use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
48use crate::dom::bindings::trace::RootedTraceableBox;
49use crate::dom::bindings::utils::get_dictionary_property;
50use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
51use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
52use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
53use crate::dom::globalscope::GlobalScope;
54use crate::dom::promise::{wait_for_all_promise, Promise};
55use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
56use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
57use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
58use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
59use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
60use crate::dom::types::DefaultTeeUnderlyingSource;
61use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
62use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
63use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
64use crate::dom::messageport::MessagePort;
65use crate::realms::{enter_auto_realm};
66use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
67use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
68use crate::dom::bindings::transferable::Transferable;
69use crate::dom::bindings::structuredclone::StructuredData;
70
71use crate::dom::bindings::buffer_source::HeapBufferSource;
72use super::readablestreambyobreader::ReadIntoRequest;
73
/// The phase a [`PipeTo`] operation is currently in.
///
/// `PipeTo::callback` dispatches on this value every time one of the promises
/// the pipe is waiting on settles.
#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
enum PipeToState {
    /// Initial state. The callback treats being invoked in this state as a bug.
    #[default]
    Starting,
    /// Waiting on the writer's `Ready` promise; the callback reads a chunk next.
    PendingReady,
    /// Waiting on a read; the callback writes the chunk next.
    PendingRead,
    /// Shutdown was requested while writes were still pending. Carries the
    /// action (if any) to perform once no pending write is left.
    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
    /// A shutdown action was started; waiting for its promise to settle.
    ShuttingDownPendingAction,
    /// The pipe was finalized; the callback does nothing in this state.
    Finalized,
}
95
/// The operation `PipeTo::perform_action` runs as part of shutting a pipe down.
#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
enum ShutdownAction {
    /// Abort the destination stream with the shutdown error.
    WritableStreamAbort,
    /// Cancel the source stream with the shutdown error.
    ReadableStreamCancel,
    /// Call `close_with_error_propagation` on the writer.
    WritableStreamDefaultWriterCloseWithErrorPropagation,
    /// Abort requested through `PipeTo::abort_with_reason`: abort the destination
    /// and/or cancel the source, depending on the `prevent_*` flags.
    Abort,
}
108
// Marker impl with no items; presumably what allows `PipeTo` values to be rooted
// through the `js::gc` machinery (confirm against the trait's documentation).
impl js::gc::Rootable for PipeTo {}
110
/// State of one pipe operation from a readable stream into a writable stream.
///
/// The struct is cloned into every promise handler it registers
/// (`Box::new(self.clone())`); all mutable state lives behind `Rc`, so every
/// clone observes and updates the same state.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PipeTo {
    /// Reader used to pull chunks; its stream is the source of the pipe.
    reader: Dom<ReadableStreamDefaultReader>,

    /// Writer used to push chunks; its stream is the destination of the pipe.
    writer: Dom<WritableStreamDefaultWriter>,

    /// Promises returned by `writer.write` that were still pending the last time
    /// the callback ran.
    #[ignore_malloc_size_of = "nested Rc"]
    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,

    /// Current phase of the pipe, shared between clones.
    #[conditional_malloc_size_of]
    #[no_trace]
    state: Rc<RefCell<PipeToState>>,

    /// When set, an errored source (or an abort request) does not abort the destination.
    prevent_abort: bool,

    /// When set, an errored/closed destination (or an abort request) does not cancel the source.
    prevent_cancel: bool,

    /// When set, a closed source does not close the destination.
    prevent_close: bool,

    /// Set to `true` the first time `shutdown` runs; later shutdown requests are ignored.
    #[conditional_malloc_size_of]
    shutting_down: Rc<Cell<bool>>,

    /// Reason recorded by `abort_with_reason`, used by the `Abort` shutdown action.
    #[ignore_malloc_size_of = "mozjs"]
    abort_reason: Rc<Heap<JSVal>>,

    /// Error the result promise is rejected with on finalization, if any.
    #[ignore_malloc_size_of = "mozjs"]
    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,

    /// Promise of the shutdown action in progress, set by `perform_action`.
    #[ignore_malloc_size_of = "nested Rc"]
    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// Promise resolved or rejected by `finalize`.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
172
173impl PipeTo {
174 pub(crate) fn abort_with_reason(
177 &self,
178 cx: &mut CurrentRealm,
179 global: &GlobalScope,
180 reason: SafeHandleValue,
181 ) {
182 if self.shutting_down.get() {
184 return;
185 }
186
187 self.abort_reason.set(reason.get());
191
192 self.set_shutdown_error(reason);
197
198 self.shutdown(cx, global, Some(ShutdownAction::Abort));
204 }
205}
206
207impl Callback for PipeTo {
208 #[expect(unsafe_code)]
217 fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
218 let global = self.reader.global();
219
220 self.pending_writes.borrow_mut().retain(|p| {
226 let pending = p.is_pending();
227 if !pending {
228 p.set_promise_is_handled();
229 }
230 pending
231 });
232
233 let state_before_checks = self.state.borrow().clone();
235
236 if state_before_checks == PipeToState::PendingRead {
243 let source = self.reader.get_stream().expect("Source stream must be set");
244 if source.is_closed() {
245 let dest = self
246 .writer
247 .get_stream()
248 .expect("Destination stream must be set");
249
250 if dest.is_writable() && !dest.close_queued_or_in_flight() {
253 let Ok(done) = get_read_promise_done(cx.into(), &result, CanGc::from_cx(cx))
254 else {
255 return;
262 };
263
264 if !done {
265 self.write_chunk(cx, &global, result);
267 }
268 }
269 }
270 }
271
272 self.check_and_propagate_errors_forward(cx, &global);
273 self.check_and_propagate_errors_backward(cx, &global);
274 self.check_and_propagate_closing_forward(cx, &global);
275 self.check_and_propagate_closing_backward(cx, &global);
276
277 let state = self.state.borrow().clone();
279
280 if state != state_before_checks {
284 return;
285 }
286
287 match state {
288 PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
289 PipeToState::PendingReady => {
290 self.read_chunk(cx, &global);
292 },
293 PipeToState::PendingRead => {
294 self.write_chunk(cx, &global, result);
296
297 if self.shutting_down.get() {
299 return;
300 }
301
302 self.wait_for_writer_ready(cx, &global);
304 },
305 PipeToState::ShuttingDownWithPendingWrites(action) => {
306 if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
309 self.wait_on_pending_write(cx, &global, write);
310 return;
311 }
312
313 if let Some(action) = action {
315 self.perform_action(cx, &global, action);
317 } else {
318 self.finalize(cx, &global);
320 }
321 },
322 PipeToState::ShuttingDownPendingAction => {
323 let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
324 unreachable!();
325 };
326 if promise.is_pending() {
327 return;
331 }
332
333 let is_array_like = {
334 if !result.is_object() {
335 false
336 } else {
337 unsafe { is_array_like::<crate::DomTypeHolder>(cx.raw_cx(), result) }
338 }
339 };
340
341 if !result.is_undefined() && !is_array_like {
343 self.set_shutdown_error(result);
353 }
354 self.finalize(cx, &global);
355 },
356 PipeToState::Finalized => {},
357 }
358 }
359}
360
361impl PipeTo {
362 fn set_shutdown_error(&self, error: SafeHandleValue) {
365 *self.shutdown_error.borrow_mut() = Some(Heap::default());
366 let Some(ref heap) = *self.shutdown_error.borrow() else {
367 unreachable!("Option set to Some(heap) above.");
368 };
369 heap.set(error.get())
370 }
371
372 fn wait_for_writer_ready(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
375 {
376 let mut state = self.state.borrow_mut();
377 *state = PipeToState::PendingReady;
378 }
379
380 let ready_promise = self.writer.Ready();
381 if ready_promise.is_fulfilled() {
382 self.read_chunk(cx, global);
383 } else {
384 let handler = PromiseNativeHandler::new(
385 global,
386 Some(Box::new(self.clone())),
387 Some(Box::new(self.clone())),
388 CanGc::from_cx(cx),
389 );
390 ready_promise.append_native_handler(cx, &handler);
391
392 let closed_promise = self.reader.Closed();
396 closed_promise.append_native_handler(cx, &handler);
397 }
398 }
399
400 fn read_chunk(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
402 *self.state.borrow_mut() = PipeToState::PendingRead;
403 let chunk_promise = self.reader.Read(cx);
404 let handler = PromiseNativeHandler::new(
405 global,
406 Some(Box::new(self.clone())),
407 Some(Box::new(self.clone())),
408 CanGc::from_cx(cx),
409 );
410 chunk_promise.append_native_handler(cx, &handler);
411
412 let ready_promise = self.writer.Closed();
415 ready_promise.append_native_handler(cx, &handler);
416 }
417
418 #[expect(unsafe_code)]
421 fn write_chunk(
422 &self,
423 cx: &mut JSContext,
424 global: &GlobalScope,
425 chunk: SafeHandleValue,
426 ) -> bool {
427 if chunk.is_object() {
428 rooted!(&in(cx) let object = chunk.to_object());
429 rooted!(&in(cx) let mut bytes = UndefinedValue());
430 let has_value = unsafe {
431 get_dictionary_property(
432 cx.raw_cx(),
433 object.handle(),
434 c"value",
435 bytes.handle_mut(),
436 CanGc::from_cx(cx),
437 )
438 .expect("Chunk should have a value.")
439 };
440 if has_value {
441 let write_promise = self.writer.write(cx, global, bytes.handle());
443 self.pending_writes.borrow_mut().push_back(write_promise);
444 return true;
445 }
446 }
447 false
448 }
449
450 fn wait_on_pending_write(
454 &self,
455 cx: &mut CurrentRealm,
456 global: &GlobalScope,
457 promise: Rc<Promise>,
458 ) {
459 let handler = PromiseNativeHandler::new(
460 global,
461 Some(Box::new(self.clone())),
462 Some(Box::new(self.clone())),
463 CanGc::from_cx(cx),
464 );
465 promise.append_native_handler(cx, &handler);
466 }
467
468 fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
471 if self.shutting_down.get() {
474 return;
475 }
476
477 let source = self
479 .reader
480 .get_stream()
481 .expect("Reader should still have a stream");
482 if source.is_errored() {
483 rooted!(&in(cx) let mut source_error = UndefinedValue());
484 source.get_stored_error(source_error.handle_mut());
485 self.set_shutdown_error(source_error.handle());
486
487 if !self.prevent_abort {
489 self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
492 } else {
493 self.shutdown(cx, global, None);
495 }
496 }
497 }
498
499 fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
502 if self.shutting_down.get() {
505 return;
506 }
507
508 let dest = self
510 .writer
511 .get_stream()
512 .expect("Writer should still have a stream");
513 if dest.is_errored() {
514 rooted!(&in(cx) let mut dest_error = UndefinedValue());
515 dest.get_stored_error(dest_error.handle_mut());
516 self.set_shutdown_error(dest_error.handle());
517
518 if !self.prevent_cancel {
520 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
523 } else {
524 self.shutdown(cx, global, None);
526 }
527 }
528 }
529
530 fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
533 if self.shutting_down.get() {
536 return;
537 }
538
539 let source = self
541 .reader
542 .get_stream()
543 .expect("Reader should still have a stream");
544 if source.is_closed() {
545 if !self.prevent_close {
547 self.shutdown(
550 cx,
551 global,
552 Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
553 )
554 } else {
555 self.shutdown(cx, global, None);
557 }
558 }
559 }
560
561 fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
564 if self.shutting_down.get() {
567 return;
568 }
569
570 let dest = self
573 .writer
574 .get_stream()
575 .expect("Writer should still have a stream");
576 if dest.close_queued_or_in_flight() || dest.is_closed() {
577 rooted!(&in(cx) let mut dest_closed = UndefinedValue());
582 let error =
583 Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
584 error.to_jsval(
585 cx.into(),
586 global,
587 dest_closed.handle_mut(),
588 CanGc::from_cx(cx),
589 );
590 self.set_shutdown_error(dest_closed.handle());
591
592 if !self.prevent_cancel {
594 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
597 } else {
598 self.shutdown(cx, global, None);
600 }
601 }
602 }
603
604 fn shutdown(
608 &self,
609 cx: &mut CurrentRealm,
610 global: &GlobalScope,
611 action: Option<ShutdownAction>,
612 ) {
613 if !self.shutting_down.replace(true) {
616 let dest = self.writer.get_stream().expect("Stream must be set");
617 if dest.is_writable() && !dest.close_queued_or_in_flight() {
620 if let Some(write) = self.pending_writes.borrow_mut().front() {
626 *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
627 self.wait_on_pending_write(cx, global, write.clone());
628 return;
629 }
630 }
631
632 if let Some(action) = action {
634 self.perform_action(cx, global, action);
636 } else {
637 self.finalize(cx, global);
639 }
640 }
641 }
642
    /// Runs the shutdown `action`, moves the pipe to
    /// [`PipeToState::ShuttingDownPendingAction`] and registers this pipe as the
    /// handler of the action's promise, which is also stored in
    /// `shutdown_action_promise`.
    fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
        // Copy the stored shutdown error (or `undefined`) into a rooted value.
        rooted!(&in(cx) let mut error = UndefinedValue());
        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
            error.set(shutdown_error.get());
        }

        *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;

        let promise = match action {
            ShutdownAction::WritableStreamAbort => {
                let dest = self.writer.get_stream().expect("Stream must be set");
                dest.abort(cx, global, error.handle())
            },
            ShutdownAction::ReadableStreamCancel => {
                let source = self
                    .reader
                    .get_stream()
                    .expect("Reader should have a stream.");
                source.cancel(cx, global, error.handle())
            },
            ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
                self.writer.close_with_error_propagation(cx, global)
            },
            ShutdownAction::Abort => {
                // For an abort the recorded abort reason is used; this `error`
                // shadows the outer one for the rest of the arm.
                rooted!(&in(cx) let mut error = UndefinedValue());
                error.set(self.abort_reason.get());

                // Promises of the individual steps, waited on together below.
                let mut actions = vec![];

                if !self.prevent_abort {
                    let dest = self
                        .writer
                        .get_stream()
                        .expect("Destination stream must be set");

                    // Abort only a writable destination; otherwise contribute
                    // an already-resolved promise.
                    let promise = if dest.is_writable() {
                        dest.abort(cx, global, error.handle())
                    } else {
                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
                    };
                    actions.push(promise);
                }

                if !self.prevent_cancel {
                    let source = self.reader.get_stream().expect("Source stream must be set");

                    // Cancel only a readable source; otherwise contribute an
                    // already-resolved promise.
                    let promise = if source.is_readable() {
                        source.cancel(cx, global, error.handle())
                    } else {
                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
                    };
                    actions.push(promise);
                }

                wait_for_all_promise(cx, global, actions)
            },
        };

        // The callback runs again, in the `ShuttingDownPendingAction` state,
        // once the action's promise settles.
        let handler = PromiseNativeHandler::new(
            global,
            Some(Box::new(self.clone())),
            Some(Box::new(self.clone())),
            CanGc::from_cx(cx),
        );
        promise.append_native_handler(cx, &handler);
        *self.shutdown_action_promise.borrow_mut() = Some(promise);
    }
731
    /// Finishes the pipe: marks it [`PipeToState::Finalized`], releases the
    /// writer and the reader, then settles the result promise.
    ///
    /// The result promise is rejected with the stored shutdown error when
    /// there is one and resolved otherwise.
    ///
    /// # Panics
    ///
    /// Panics if releasing the reader fails.
    fn finalize(&self, cx: &mut JSContext, global: &GlobalScope) {
        *self.state.borrow_mut() = PipeToState::Finalized;

        self.writer.release(cx.into(), global, CanGc::from_cx(cx));

        self.reader
            .release(cx)
            .expect("Releasing the reader should not fail");

        if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
            // Copy the stored error into a rooted value before handing it over.
            rooted!(&in(cx) let mut error = UndefinedValue());
            error.set(shutdown_error.get());
            self.result_promise
                .reject_native(&error.handle(), CanGc::from_cx(cx));
        } else {
            self.result_promise.resolve_native(&(), CanGc::from_cx(cx));
        }
    }
764}
765
/// Promise handler that resolves `result` with `undefined`, ignoring the value
/// it is invoked with.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseFulfillmentHandler {
    /// Promise to resolve when the handler runs.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}

impl Callback for SourceCancelPromiseFulfillmentHandler {
    /// Resolves `result`; the incoming value `_v` is not used.
    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
        self.result.resolve_native(&(), CanGc::from_cx(cx));
    }
}
782
/// Promise handler that rejects `result` with the value it is invoked with.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseRejectionHandler {
    /// Promise to reject when the handler runs.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}

impl Callback for SourceCancelPromiseRejectionHandler {
    /// Rejects `result` with `v`.
    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
        self.result.reject_native(&v, CanGc::from_cx(cx));
    }
}
799
/// Lifecycle state of a [`ReadableStream`].
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReadableStreamState {
    /// Initial state of a newly created stream.
    #[default]
    Readable,
    /// The stream has been closed.
    Closed,
    /// The stream has been errored; see `ReadableStream::error`.
    Errored,
}
808
/// The controller attached to a [`ReadableStream`], by kind.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ControllerType {
    /// A byte stream controller.
    Byte(MutNullableDom<ReadableByteStreamController>),
    /// A default controller.
    Default(MutNullableDom<ReadableStreamDefaultController>),
}
818
/// The reader slot of a [`ReadableStream`], by kind.
///
/// The inner `MutNullableDom` may be empty; the stream counts as locked only
/// while it holds a reader (see `ReadableStream::is_locked`).
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ReaderType {
    /// Slot for a BYOB reader.
    #[allow(clippy::upper_case_acronyms)]
    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
    /// Slot for a default reader.
    Default(MutNullableDom<ReadableStreamDefaultReader>),
}
829
830impl Eq for ReaderType {}
831impl PartialEq for ReaderType {
832 fn eq(&self, other: &Self) -> bool {
833 matches!(
834 (self, other),
835 (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
836 (ReaderType::Default(_), ReaderType::Default(_))
837 )
838 }
839}
840
841pub(crate) fn create_readable_stream(
843 cx: &mut JSContext,
844 global: &GlobalScope,
845 underlying_source_type: UnderlyingSourceType,
846 queuing_strategy: Option<Rc<QueuingStrategySize>>,
847 high_water_mark: Option<f64>,
848) -> DomRoot<ReadableStream> {
849 let high_water_mark = high_water_mark.unwrap_or(1.0);
851
852 let size_algorithm = queuing_strategy.unwrap_or(extract_size_algorithm(
854 &QueuingStrategy::empty(),
855 CanGc::from_cx(cx),
856 ));
857
858 assert!(high_water_mark >= 0.0);
860
861 let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
864
865 let controller = ReadableStreamDefaultController::new(
867 global,
868 underlying_source_type,
869 high_water_mark,
870 size_algorithm,
871 CanGc::from_cx(cx),
872 );
873
874 controller
877 .setup(cx, stream.clone())
878 .expect("Setup of default controller cannot fail");
879
880 stream
882}
883
884fn readable_byte_stream_tee(
886 cx: &mut JSContext,
887 global: &GlobalScope,
888 underlying_source_type: UnderlyingSourceType,
889) -> DomRoot<ReadableStream> {
890 let tee_stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
893
894 let controller =
896 ReadableByteStreamController::new(underlying_source_type, 0.0, global, CanGc::from_cx(cx));
897
898 controller
900 .setup(cx, global, tee_stream.clone())
901 .expect("Setup of byte stream controller cannot fail");
902
903 tee_stream
905}
906
/// DOM object backing a readable stream.
#[dom_struct]
pub(crate) struct ReadableStream {
    reflector_: Reflector,

    /// The stream's controller; `None` until `set_default_controller` or
    /// `set_byte_controller` is called.
    controller: RefCell<Option<ControllerType>>,

    /// Error recorded by `error`, readable through `get_stored_error`.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// Flag read and written through `is_disturbed` / `set_is_disturbed`.
    disturbed: Cell<bool>,

    /// The stream's reader slot, if any.
    reader: RefCell<Option<ReaderType>>,

    /// Current lifecycle state; starts as `Readable`.
    state: Cell<ReadableStreamState>,
}
930
931impl ReadableStream {
932 fn new_inherited() -> ReadableStream {
934 ReadableStream {
935 reflector_: Reflector::new(),
936 controller: RefCell::new(None),
937 stored_error: Heap::default(),
938 disturbed: Default::default(),
939 reader: RefCell::new(None),
940 state: Cell::new(Default::default()),
941 }
942 }
943
    /// Creates a new stream reflected into `global`, passing `proto` through
    /// to `reflect_dom_object_with_proto`.
    pub(crate) fn new_with_proto(
        global: &GlobalScope,
        proto: Option<SafeHandleObject>,
        can_gc: CanGc,
    ) -> DomRoot<ReadableStream> {
        reflect_dom_object_with_proto(
            Box::new(ReadableStream::new_inherited()),
            global,
            proto,
            can_gc,
        )
    }
956
957 pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
960 *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
961 controller,
962 ))));
963 }
964
965 pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
968 *self.controller.borrow_mut() =
969 Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
970 }
971
972 pub(crate) fn assert_no_controller(&self) {
975 let has_no_controller = self.controller.borrow().is_none();
976 assert!(has_no_controller);
977 }
978
979 pub(crate) fn new_from_bytes(
981 cx: &mut JSContext,
982 global: &GlobalScope,
983 bytes: Vec<u8>,
984 ) -> Fallible<DomRoot<ReadableStream>> {
985 let stream = ReadableStream::new_with_external_underlying_source(
986 cx,
987 global,
988 UnderlyingSourceType::Memory(bytes.len()),
989 )?;
990 stream.enqueue_native(cx, bytes);
991 stream.controller_close_native(cx);
992 Ok(stream)
993 }
994
995 pub(crate) fn new_with_external_underlying_source(
998 cx: &mut JSContext,
999 global: &GlobalScope,
1000 source: UnderlyingSourceType,
1001 ) -> Fallible<DomRoot<ReadableStream>> {
1002 assert!(source.is_native());
1003 let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
1004 let controller = ReadableStreamDefaultController::new(
1005 global,
1006 source,
1007 1.0,
1008 extract_size_algorithm(&QueuingStrategy::empty(), CanGc::from_cx(cx)),
1009 CanGc::from_cx(cx),
1010 );
1011 controller.setup(cx, stream.clone())?;
1012 Ok(stream)
1013 }
1014
1015 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1017 match self.controller.borrow().as_ref() {
1018 Some(ControllerType::Default(controller)) => {
1019 let controller = controller
1020 .get()
1021 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1022 controller.perform_release_steps()
1023 },
1024 Some(ControllerType::Byte(controller)) => {
1025 let controller = controller
1026 .get()
1027 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1028 controller.perform_release_steps()
1029 },
1030 None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1031 }
1032 }
1033
1034 pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
1038 match self.controller.borrow().as_ref() {
1039 Some(ControllerType::Default(controller)) => controller
1040 .get()
1041 .expect("Stream should have controller.")
1042 .perform_pull_steps(cx, read_request),
1043 Some(ControllerType::Byte(controller)) => controller
1044 .get()
1045 .expect("Stream should have controller.")
1046 .perform_pull_steps(cx, read_request),
1047 None => {
1048 unreachable!("Stream does not have a controller.");
1049 },
1050 }
1051 }
1052
1053 pub(crate) fn perform_pull_into(
1057 &self,
1058 cx: &mut JSContext,
1059 read_into_request: &ReadIntoRequest,
1060 view: &HeapBufferSource<ArrayBufferViewU8>,
1061 min: u64,
1062 ) {
1063 match self.controller.borrow().as_ref() {
1064 Some(ControllerType::Byte(controller)) => controller
1065 .get()
1066 .expect("Stream should have controller.")
1067 .perform_pull_into(cx, read_into_request, view, min),
1068 _ => {
1069 unreachable!(
1070 "Pulling a chunk from a stream with a default controller using a BYOB reader"
1071 )
1072 },
1073 }
1074 }
1075
1076 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1078 match self.reader.borrow().as_ref() {
1079 Some(ReaderType::Default(reader)) => {
1080 let Some(reader) = reader.get() else {
1081 panic!("Attempt to add a read request without having first acquired a reader.");
1082 };
1083
1084 assert!(self.is_readable());
1086
1087 reader.add_read_request(read_request);
1089 },
1090 _ => {
1091 unreachable!("Adding a read request can only be done on a default reader.")
1092 },
1093 }
1094 }
1095
1096 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1098 match self.reader.borrow().as_ref() {
1099 Some(ReaderType::BYOB(reader)) => {
1101 let Some(reader) = reader.get() else {
1102 unreachable!(
1103 "Attempt to add a read into request without having first acquired a reader."
1104 );
1105 };
1106
1107 assert!(self.is_readable() || self.is_closed());
1109
1110 reader.add_read_into_request(read_request);
1112 },
1113 _ => {
1114 unreachable!("Adding a read into request can only be done on a BYOB reader.")
1115 },
1116 }
1117 }
1118
1119 pub(crate) fn enqueue_native(&self, cx: &mut JSContext, bytes: Vec<u8>) {
1122 match self.controller.borrow().as_ref() {
1123 Some(ControllerType::Default(controller)) => controller
1124 .get()
1125 .expect("Stream should have controller.")
1126 .enqueue_native(cx, bytes),
1127 _ => {
1128 unreachable!(
1129 "Enqueueing chunk to a stream from Rust on other than default controller"
1130 );
1131 },
1132 }
1133 }
1134
1135 pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
1137 assert!(self.is_readable());
1139
1140 self.state.set(ReadableStreamState::Errored);
1142
1143 self.stored_error.set(e.get());
1145
1146 let default_reader = {
1149 let reader_ref = self.reader.borrow();
1150 match reader_ref.as_ref() {
1151 Some(ReaderType::Default(reader)) => reader.get(),
1152 _ => None,
1153 }
1154 };
1155
1156 if let Some(reader) = default_reader {
1157 reader.error(cx, e);
1159 return;
1160 }
1161
1162 let byob_reader = {
1163 let reader_ref = self.reader.borrow();
1164 match reader_ref.as_ref() {
1165 Some(ReaderType::BYOB(reader)) => reader.get(),
1166 _ => None,
1167 }
1168 };
1169
1170 if let Some(reader) = byob_reader {
1171 reader.error_read_into_requests(e, CanGc::from_cx(cx));
1173 }
1174
1175 }
1177
1178 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1180 handle_mut.set(self.stored_error.get());
1181 }
1182
1183 pub(crate) fn error_native(&self, cx: &mut JSContext, error: Error) {
1186 rooted!(&in(cx) let mut error_val = UndefinedValue());
1187 error.to_jsval(
1188 cx.into(),
1189 &self.global(),
1190 error_val.handle_mut(),
1191 CanGc::from_cx(cx),
1192 );
1193 self.error(cx, error_val.handle());
1194 }
1195
1196 pub(crate) fn controller_close_native(&self, cx: &mut JSContext) {
1199 match self.controller.borrow().as_ref() {
1200 Some(ControllerType::Default(controller)) => {
1201 let _ = controller
1202 .get()
1203 .expect("Stream should have controller.")
1204 .Close(cx);
1205 },
1206 _ => {
1207 unreachable!("Native closing is only done on default controllers.")
1208 },
1209 }
1210 }
1211
1212 pub(crate) fn in_memory(&self) -> bool {
1215 match self.controller.borrow().as_ref() {
1216 Some(ControllerType::Default(controller)) => controller
1217 .get()
1218 .expect("Stream should have controller.")
1219 .in_memory(),
1220 _ => {
1221 unreachable!(
1222 "Checking if source is in memory for a stream with a non-default controller"
1223 )
1224 },
1225 }
1226 }
1227
1228 pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1231 match self.controller.borrow().as_ref() {
1232 Some(ControllerType::Default(controller)) => controller
1233 .get()
1234 .expect("Stream should have controller.")
1235 .get_in_memory_bytes()
1236 .as_deref()
1237 .map(GenericSharedMemory::from_bytes),
1238 _ => {
1239 unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1240 },
1241 }
1242 }
1243
1244 pub(crate) fn acquire_default_reader(
1249 &self,
1250 can_gc: CanGc,
1251 ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1252 let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1254
1255 reader.set_up(self, &self.global(), can_gc)?;
1257
1258 Ok(reader)
1260 }
1261
1262 pub(crate) fn acquire_byob_reader(
1264 &self,
1265 can_gc: CanGc,
1266 ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1267 let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1269 reader.set_up(self, &self.global(), can_gc)?;
1271
1272 Ok(reader)
1274 }
1275
1276 pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1277 match self.controller.borrow().as_ref() {
1278 Some(ControllerType::Default(controller)) => {
1279 controller.get().expect("Stream should have controller.")
1280 },
1281 _ => {
1282 unreachable!(
1283 "Getting default controller for a stream with a non-default controller"
1284 )
1285 },
1286 }
1287 }
1288
1289 pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1290 match self.controller.borrow().as_ref() {
1291 Some(ControllerType::Byte(controller)) => {
1292 controller.get().expect("Stream should have controller.")
1293 },
1294 _ => {
1295 unreachable!("Getting byte controller for a stream with a non-byte controller")
1296 },
1297 }
1298 }
1299
1300 pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1301 match self.reader.borrow().as_ref() {
1302 Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1303 _ => {
1304 unreachable!("Getting default reader for a stream with a non-default reader")
1305 },
1306 }
1307 }
1308
1309 pub(crate) fn read_a_chunk(&self, cx: &mut JSContext) -> Rc<Promise> {
1315 match self.reader.borrow().as_ref() {
1316 Some(ReaderType::Default(reader)) => {
1317 let Some(reader) = reader.get() else {
1318 unreachable!(
1319 "Attempt to read stream chunk without having first acquired a reader."
1320 );
1321 };
1322 reader.Read(cx)
1323 },
1324 _ => {
1325 unreachable!("Native reading of a chunk can only be done with a default reader.")
1326 },
1327 }
1328 }
1329
    /// Releases the stream's default reader.
    ///
    /// # Panics
    ///
    /// Panics if the stream has no default reader, or if releasing it fails.
    pub(crate) fn stop_reading(&self, cx: &mut JSContext) {
        let reader_ref = self.reader.borrow();

        match reader_ref.as_ref() {
            Some(ReaderType::Default(reader)) => {
                let Some(reader) = reader.get() else {
                    unreachable!("Attempt to stop reading without having first acquired a reader.");
                };

                // Give up the borrow on `self.reader` before releasing, so the
                // release is free to access the reader slot again.
                drop(reader_ref);
                reader.release(cx).expect("Reader release cannot fail.");
            },
            _ => {
                unreachable!("Native stop reading can only be done with a default reader.")
            },
        }
    }
1351
1352 pub(crate) fn is_locked(&self) -> bool {
1354 match self.reader.borrow().as_ref() {
1355 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1356 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1357 None => false,
1358 }
1359 }
1360
    /// Returns the current value of the stream's `disturbed` flag.
    pub(crate) fn is_disturbed(&self) -> bool {
        self.disturbed.get()
    }
1364
    /// Sets the stream's `disturbed` flag to `disturbed`.
    pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
        self.disturbed.set(disturbed);
    }
1368
1369 pub(crate) fn is_closed(&self) -> bool {
1370 self.state.get() == ReadableStreamState::Closed
1371 }
1372
1373 pub(crate) fn is_errored(&self) -> bool {
1374 self.state.get() == ReadableStreamState::Errored
1375 }
1376
1377 pub(crate) fn is_readable(&self) -> bool {
1378 self.state.get() == ReadableStreamState::Readable
1379 }
1380
1381 pub(crate) fn has_default_reader(&self) -> bool {
1382 match self.reader.borrow().as_ref() {
1383 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1384 _ => false,
1385 }
1386 }
1387
1388 pub(crate) fn has_byob_reader(&self) -> bool {
1389 match self.reader.borrow().as_ref() {
1390 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1391 _ => false,
1392 }
1393 }
1394
1395 pub(crate) fn has_byte_controller(&self) -> bool {
1396 match self.controller.borrow().as_ref() {
1397 Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1398 _ => false,
1399 }
1400 }
1401
1402 pub(crate) fn get_num_read_requests(&self) -> usize {
1404 match self.reader.borrow().as_ref() {
1405 Some(ReaderType::Default(reader)) => {
1406 let reader = reader
1407 .get()
1408 .expect("Stream must have a reader when getting the number of read requests.");
1409 reader.get_num_read_requests()
1410 },
1411 _ => unreachable!(
1412 "Stream must have a default reader when get num read requests is called into."
1413 ),
1414 }
1415 }
1416
1417 pub(crate) fn get_num_read_into_requests(&self) -> usize {
1419 assert!(self.has_byob_reader());
1420
1421 match self.reader.borrow().as_ref() {
1422 Some(ReaderType::BYOB(reader)) => {
1423 let Some(reader) = reader.get() else {
1424 unreachable!(
1425 "Stream must have a reader when get num read into requests is called into."
1426 );
1427 };
1428 reader.get_num_read_into_requests()
1429 },
1430 _ => {
1431 unreachable!(
1432 "Stream must have a BYOB reader when get num read into requests is called into."
1433 );
1434 },
1435 }
1436 }
1437
1438 pub(crate) fn fulfill_read_request(
1440 &self,
1441 cx: &mut JSContext,
1442 chunk: SafeHandleValue,
1443 done: bool,
1444 ) {
1445 assert!(self.has_default_reader());
1447
1448 match self.reader.borrow().as_ref() {
1449 Some(ReaderType::Default(reader)) => {
1450 let reader = reader
1452 .get()
1453 .expect("Stream must have a reader when a read request is fulfilled.");
1454 assert_ne!(reader.get_num_read_requests(), 0);
1456 let request = reader.remove_read_request();
1459
1460 if done {
1461 request.close_steps(cx);
1463 } else {
1464 let result = RootedTraceableBox::new(Heap::default());
1466 result.set(*chunk);
1467 request.chunk_steps(cx, result, &self.global());
1468 }
1469 },
1470 _ => {
1471 unreachable!(
1472 "Stream must have a default reader when fulfill read requests is called into."
1473 );
1474 },
1475 }
1476 }
1477
1478 pub(crate) fn fulfill_read_into_request(
1480 &self,
1481 cx: &mut JSContext,
1482 chunk: SafeHandleValue,
1483 done: bool,
1484 ) {
1485 assert!(self.has_byob_reader());
1487
1488 match self.reader.borrow().as_ref() {
1490 Some(ReaderType::BYOB(reader)) => {
1491 let Some(reader) = reader.get() else {
1492 unreachable!(
1493 "Stream must have a reader when a read into request is fulfilled."
1494 );
1495 };
1496
1497 assert!(reader.get_num_read_into_requests() > 0);
1499
1500 let read_into_request = reader.remove_read_into_request();
1503
1504 let result = RootedTraceableBox::new(Heap::default());
1506 if done {
1507 result.set(*chunk);
1508 read_into_request.close_steps(cx, Some(result));
1509 } else {
1510 result.set(*chunk);
1512 read_into_request.chunk_steps(result, CanGc::from_cx(cx));
1513 }
1514 },
1515 _ => {
1516 unreachable!(
1517 "Stream must have a BYOB reader when fulfill read into requests is called into."
1518 );
1519 },
1520 };
1521 }
1522
1523 pub(crate) fn close(&self, cx: &mut JSContext) {
1525 assert!(self.is_readable());
1527 self.state.set(ReadableStreamState::Closed);
1529 let default_reader = {
1535 let reader_ref = self.reader.borrow();
1536 match reader_ref.as_ref() {
1537 Some(ReaderType::Default(reader)) => reader.get(),
1538 _ => None,
1539 }
1540 };
1541
1542 if let Some(reader) = default_reader {
1543 reader.close(cx);
1545 return;
1546 }
1547
1548 let byob_reader = {
1550 let reader_ref = self.reader.borrow();
1551 match reader_ref.as_ref() {
1552 Some(ReaderType::BYOB(reader)) => reader.get(),
1553 _ => None,
1554 }
1555 };
1556
1557 if let Some(reader) = byob_reader {
1558 reader.close(CanGc::from_cx(cx));
1560 }
1561
1562 }
1564
    /// Cancels the stream with `reason` and returns a promise for the
    /// outcome.
    ///
    /// The stream is always marked as disturbed. An already-closed stream
    /// yields a resolved promise and an errored stream yields a promise
    /// rejected with the stored error. Otherwise the stream is closed, an
    /// attached BYOB reader (if any) is cancelled, and the controller's
    /// cancel steps are run; the returned promise is wired to the promise
    /// those steps produce.
    ///
    /// # Panics
    ///
    /// Panics if the stream has no controller.
    pub(crate) fn cancel(
        &self,
        cx: &mut JSContext,
        global: &GlobalScope,
        reason: SafeHandleValue,
    ) -> Rc<Promise> {
        self.disturbed.set(true);

        // Nothing left to cancel on a closed stream.
        if self.is_closed() {
            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
        }
        // An errored stream reports its stored error through a rejected promise.
        if self.is_errored() {
            let promise = Promise::new2(cx, global);
            rooted!(&in(cx) let mut rval = UndefinedValue());
            self.stored_error
                .safe_to_jsval(cx.into(), rval.handle_mut(), CanGc::from_cx(cx));
            promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
            return promise;
        }
        self.close(cx);

        // Root the BYOB reader inside a nested scope so the `RefCell` borrow
        // on `self.reader` has ended before `reader.cancel` is called.
        let byob_reader = {
            let reader_ref = self.reader.borrow();
            match reader_ref.as_ref() {
                Some(ReaderType::BYOB(reader)) => reader.get(),
                _ => None,
            }
        };

        if let Some(reader) = byob_reader {
            reader.cancel(cx);
        }

        // Delegate to whichever controller kind the stream has.
        let source_cancel_promise = match self.controller.borrow().as_ref() {
            Some(ControllerType::Default(controller)) => controller
                .get()
                .expect("Stream should have controller.")
                .perform_cancel_steps(cx, global, reason),
            Some(ControllerType::Byte(controller)) => controller
                .get()
                .expect("Stream should have controller.")
                .perform_cancel_steps(cx, global, reason),
            None => {
                panic!("Stream does not have a controller.");
            },
        };

        // From here on `global` is the stream's own global; it shadows the
        // `global` parameter.
        let global = self.global();
        let result_promise = Promise::new2(cx, &global);
        // Both handlers are given a clone of `result_promise`.
        // NOTE(review): they presumably settle it once the source's promise
        // settles — confirm in the handler implementations.
        let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
            result: result_promise.clone(),
        });
        let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
            result: result_promise.clone(),
        });
        let handler = PromiseNativeHandler::new(
            &global,
            Some(fulfillment_handler),
            Some(rejection_handler),
            CanGc::from_cx(cx),
        );
        // The handler is appended from within the realm of the stream's global.
        let mut realm = enter_auto_realm(cx, &*global);
        let cx = &mut realm.current_realm();
        source_cancel_promise.append_native_handler(cx, &handler);

        result_promise
    }
1645
1646 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1647 pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1648 *self.reader.borrow_mut() = new_reader;
1649 }
1650
    /// Tees this stream into two branch streams built with
    /// `readable_byte_stream_tee`.
    ///
    /// A default reader is acquired on this stream (an acquisition error is
    /// propagated to the caller). Two `ByteTeeUnderlyingSource`s are then
    /// created over the same shared state; they differ only in the
    /// cancel/pull algorithm selectors they are given. Each source backs one
    /// branch, and both sources are told about both branches.
    ///
    /// Returns `[branch_1, branch_2]`.
    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
    fn byte_tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
        let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;
        // The reader lives in a shared, mutable cell handed to both sources.
        let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
            Some(&reader),
        ))));

        // Flags and cancel reasons shared by the two tee sources; all start
        // out as `false` / default values.
        let reading = Rc::new(Cell::new(false));

        let read_again_for_branch_1 = Rc::new(Cell::new(false));

        let read_again_for_branch_2 = Rc::new(Cell::new(false));

        let canceled_1 = Rc::new(Cell::new(false));

        let canceled_2 = Rc::new(Cell::new(false));

        let reason_1 = Rc::new(Heap::default());

        let reason_2 = Rc::new(Heap::default());

        let cancel_promise = Promise::new2(cx, &self.global());
        let reader_version = Rc::new(Cell::new(0));

        // Source for branch 1: receives clones of every piece of shared state.
        let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
            reader.clone(),
            self,
            reading.clone(),
            read_again_for_branch_1.clone(),
            read_again_for_branch_2.clone(),
            canceled_1.clone(),
            canceled_2.clone(),
            reason_1.clone(),
            reason_2.clone(),
            cancel_promise.clone(),
            reader_version.clone(),
            ByteTeeCancelAlgorithm::Cancel1Algorithm,
            ByteTeePullAlgorithm::Pull1Algorithm,
            CanGc::from_cx(cx),
        );

        // Source for branch 2: takes ownership of the remaining handles
        // (except `reader`, which is still needed below).
        let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
            reader.clone(),
            self,
            reading,
            read_again_for_branch_1,
            read_again_for_branch_2,
            canceled_1,
            canceled_2,
            reason_1,
            reason_2,
            cancel_promise,
            reader_version,
            ByteTeeCancelAlgorithm::Cancel2Algorithm,
            ByteTeePullAlgorithm::Pull2Algorithm,
            CanGc::from_cx(cx),
        );

        let branch_1 = readable_byte_stream_tee(
            cx,
            &self.global(),
            UnderlyingSourceType::TeeByte(&byte_tee_source_1),
        );
        // Both sources are told about branch 1.
        byte_tee_source_1.set_branch_1(&branch_1);
        byte_tee_source_2.set_branch_1(&branch_1);

        let branch_2 = readable_byte_stream_tee(
            cx,
            &self.global(),
            UnderlyingSourceType::TeeByte(&byte_tee_source_2),
        );
        // Both sources are told about branch 2.
        byte_tee_source_1.set_branch_2(&branch_2);
        byte_tee_source_2.set_branch_2(&branch_2);

        // NOTE(review): presumably hooks reader errors up to the branches —
        // confirm in `forward_reader_error`.
        byte_tee_source_1.forward_reader_error(cx, reader.clone());
        byte_tee_source_2.forward_reader_error(cx, reader);

        Ok(vec![branch_1, branch_2])
    }
1747
    /// Tees this stream into two branch streams built with
    /// `create_readable_stream`.
    ///
    /// A default reader is acquired on this stream (an acquisition error is
    /// propagated to the caller). Two `DefaultTeeUnderlyingSource`s are then
    /// created over the same shared state; they differ only in the cancel
    /// algorithm selector they are given. Each source backs one branch, and
    /// both sources are told about both branches.
    ///
    /// `clone_for_branch_2` is stored in a shared cell and passed to both
    /// sources.
    ///
    /// Returns `[branch_1, branch_2]`.
    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
    fn default_tee(
        &self,
        cx: &mut JSContext,
        clone_for_branch_2: bool,
    ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
        let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));

        let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;

        // Flags shared by the two tee sources; all start out `false`.
        let reading = Rc::new(Cell::new(false));
        let read_again = Rc::new(Cell::new(false));
        let canceled_1 = Rc::new(Cell::new(false));
        let canceled_2 = Rc::new(Cell::new(false));

        // Shared cancel reasons and the shared cancel promise.
        let reason_1 = Rc::new(Heap::default());
        let reason_2 = Rc::new(Heap::default());
        let cancel_promise = Promise::new2(cx, &self.global());

        // Source for branch 1: receives clones of every piece of shared state.
        let tee_source_1 = DefaultTeeUnderlyingSource::new(
            &reader,
            self,
            reading.clone(),
            read_again.clone(),
            canceled_1.clone(),
            canceled_2.clone(),
            clone_for_branch_2.clone(),
            reason_1.clone(),
            reason_2.clone(),
            cancel_promise.clone(),
            DefaultTeeCancelAlgorithm::Cancel1Algorithm,
            CanGc::from_cx(cx),
        );

        let underlying_source_type_branch_1 = UnderlyingSourceType::Tee(&tee_source_1);

        // Source for branch 2: `canceled_1`, `canceled_2` and
        // `cancel_promise` are cloned because they are used again below.
        let tee_source_2 = DefaultTeeUnderlyingSource::new(
            &reader,
            self,
            reading,
            read_again,
            canceled_1.clone(),
            canceled_2.clone(),
            clone_for_branch_2,
            reason_1,
            reason_2,
            cancel_promise.clone(),
            DefaultTeeCancelAlgorithm::Cancel2Algorithm,
            CanGc::from_cx(cx),
        );

        let underlying_source_type_branch_2 = UnderlyingSourceType::Tee(&tee_source_2);

        let branch_1 = create_readable_stream(
            cx,
            &self.global(),
            underlying_source_type_branch_1,
            None,
            None,
        );
        // Both sources are told about branch 1.
        tee_source_1.set_branch_1(&branch_1);
        tee_source_2.set_branch_1(&branch_1);

        let branch_2 = create_readable_stream(
            cx,
            &self.global(),
            underlying_source_type_branch_2,
            None,
            None,
        );
        // Both sources are told about branch 2.
        tee_source_1.set_branch_2(&branch_2);
        tee_source_2.set_branch_2(&branch_2);

        // NOTE(review): presumably reacts to the reader's closed promise on
        // behalf of both branches — confirm in the reader implementation.
        reader.default_tee_append_native_handler_to_closed_promise(
            cx,
            &branch_1,
            &branch_2,
            canceled_1,
            canceled_2,
            cancel_promise,
        );

        Ok(vec![branch_1, branch_2])
    }
1848
    /// Starts piping this stream into `dest` and returns the promise stored
    /// as the pipe's `result_promise`.
    ///
    /// A default reader is acquired on this stream and a default writer on
    /// `dest`, and this stream is marked as disturbed. The three `prevent_*`
    /// flags are stored on the `PipeTo` state unchanged. If `signal` is
    /// given and already aborted, the abort algorithm runs immediately and
    /// the promise is returned without starting the pipe; otherwise the
    /// algorithm is added to the signal.
    ///
    /// # Panics
    ///
    /// Panics if either stream is locked, or if acquiring the reader or the
    /// writer fails.
    // The argument list follows the operation's inputs, hence the lint allowance.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn pipe_to(
        &self,
        cx: &mut CurrentRealm,
        global: &GlobalScope,
        dest: &WritableStream,
        prevent_close: bool,
        prevent_abort: bool,
        prevent_cancel: bool,
        signal: Option<&AbortSignal>,
    ) -> Rc<Promise> {
        // Callers are expected to have checked both locks already.
        assert!(!self.is_locked());

        assert!(!dest.is_locked());

        let reader = self
            .acquire_default_reader(CanGc::from_cx(cx))
            .expect("Acquiring a default reader for pipe_to cannot fail");

        let writer = dest
            .aquire_default_writer(cx.into(), global, CanGc::from_cx(cx))
            .expect("Acquiring a default writer for pipe_to cannot fail");

        self.disturbed.set(true);

        let promise = Promise::new2(cx, global);

        // All bookkeeping for the pipe lives in this rooted state object;
        // fields not listed explicitly start at their defaults.
        rooted!(&in(cx) let pipe_to = PipeTo {
            reader: Dom::from_ref(&reader),
            writer: Dom::from_ref(&writer),
            pending_writes: Default::default(),
            state: Default::default(),
            prevent_abort,
            prevent_cancel,
            prevent_close,
            shutting_down: Default::default(),
            abort_reason: Default::default(),
            shutdown_error: Default::default(),
            shutdown_action_promise: Default::default(),
            result_promise: promise.clone(),
        });

        if let Some(signal) = signal {
            // The abort algorithm carries its own clone of the pipe state.
            rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));

            // Already aborted: run the algorithm now and skip the pipe setup.
            if signal.aborted() {
                signal.run_abort_algorithm(cx, global, &abort_algorithm);
                return promise;
            }

            signal.add(&abort_algorithm);
        }

        // Run the error/closing propagation checks once, in this order,
        // before any waiting starts.
        pipe_to.check_and_propagate_errors_forward(cx, global);
        pipe_to.check_and_propagate_errors_backward(cx, global);
        pipe_to.check_and_propagate_closing_forward(cx, global);
        pipe_to.check_and_propagate_closing_backward(cx, global);

        // Only wait for the writer if none of the checks above moved the
        // pipe out of its starting state.
        if *pipe_to.state.borrow() == PipeToState::Starting {
            pipe_to.wait_for_writer_ready(cx, global);
        }

        promise
    }
1949
1950 pub(crate) fn tee(
1952 &self,
1953 cx: &mut JSContext,
1954 clone_for_branch_2: bool,
1955 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1956 match self.controller.borrow().as_ref() {
1960 Some(ControllerType::Default(_)) => {
1961 self.default_tee(cx, clone_for_branch_2)
1963 },
1964 Some(ControllerType::Byte(_)) => {
1965 self.byte_tee(cx)
1968 },
1969 None => {
1970 unreachable!("Stream should have a controller.");
1971 },
1972 }
1973 }
1974
1975 fn set_up_byte_controller(
1977 &self,
1978 cx: &mut JSContext,
1979 global: &GlobalScope,
1980 underlying_source_dict: JsUnderlyingSource,
1981 underlying_source_handle: SafeHandleObject,
1982 stream: DomRoot<ReadableStream>,
1983 strategy_hwm: f64,
1984 ) -> Fallible<()> {
1985 if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2001 return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2002 }
2003
2004 let controller = ReadableByteStreamController::new(
2005 UnderlyingSourceType::Js(underlying_source_dict),
2006 strategy_hwm,
2007 global,
2008 CanGc::from_cx(cx),
2009 );
2010
2011 controller.set_underlying_source_this_object(underlying_source_handle);
2014
2015 controller.setup(cx, global, stream)
2018 }
2019
2020 pub(crate) fn setup_cross_realm_transform_readable(
2022 &self,
2023 cx: &mut JSContext,
2024 port: &MessagePort,
2025 ) {
2026 let port_id = port.message_port_id();
2027 let global = self.global();
2028
2029 let size_algorithm =
2034 extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
2035
2036 let controller = ReadableStreamDefaultController::new(
2040 &self.global(),
2041 UnderlyingSourceType::Transfer(port),
2042 0.,
2043 size_algorithm,
2044 CanGc::from_cx(cx),
2045 );
2046
2047 rooted!(&in(cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2050 controller: Dom::from_ref(&controller),
2051 });
2052 global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2053
2054 port.Start(cx);
2056
2057 controller
2059 .setup(cx, DomRoot::from_ref(self))
2060 .expect("Setting up controller for transfer cannot fail.");
2061 }
2062}
2063
2064impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2065 fn Constructor(
2067 cx: &mut JSContext,
2068 global: &GlobalScope,
2069 proto: Option<SafeHandleObject>,
2070 underlying_source: Option<*mut JSObject>,
2071 strategy: &QueuingStrategy,
2072 ) -> Fallible<DomRoot<Self>> {
2073 rooted!(&in(cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2075 let underlying_source_dict = if !underlying_source_obj.is_null() {
2078 rooted!(&in(cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2079 match JsUnderlyingSource::new(cx.into(), obj_val.handle(), CanGc::from_cx(cx)) {
2080 Ok(ConversionResult::Success(val)) => val,
2081 Ok(ConversionResult::Failure(error)) => {
2082 return Err(Error::Type(error.into_owned()));
2083 },
2084 _ => {
2085 return Err(Error::JSFailed);
2086 },
2087 }
2088 } else {
2089 JsUnderlyingSource::empty()
2090 };
2091
2092 let stream = ReadableStream::new_with_proto(global, proto, CanGc::from_cx(cx));
2094
2095 if underlying_source_dict.type_.is_some() {
2096 if strategy.size.is_some() {
2098 return Err(Error::Range(
2099 c"size is not supported for byte streams".to_owned(),
2100 ));
2101 }
2102
2103 let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2105
2106 stream.set_up_byte_controller(
2109 cx,
2110 global,
2111 underlying_source_dict,
2112 underlying_source_obj.handle(),
2113 stream.clone(),
2114 strategy_hwm,
2115 )?;
2116 } else {
2117 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2119
2120 let size_algorithm = extract_size_algorithm(strategy, CanGc::from_cx(cx));
2122
2123 let controller = ReadableStreamDefaultController::new(
2124 global,
2125 UnderlyingSourceType::Js(underlying_source_dict),
2126 high_water_mark,
2127 size_algorithm,
2128 CanGc::from_cx(cx),
2129 );
2130
2131 controller.set_underlying_source_this_object(underlying_source_obj.handle());
2134
2135 controller.setup(cx, stream.clone())?;
2137 };
2138
2139 Ok(stream)
2140 }
2141
2142 fn Locked(&self) -> bool {
2144 self.is_locked()
2145 }
2146
2147 fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
2149 let global = self.global();
2150 if self.is_locked() {
2151 let promise = Promise::new2(cx, &global);
2154 promise.reject_error(
2155 Error::Type(c"stream is locked".to_owned()),
2156 CanGc::from_cx(cx),
2157 );
2158 promise
2159 } else {
2160 self.cancel(cx, &global, reason)
2162 }
2163 }
2164
2165 fn GetReader(
2167 &self,
2168 options: &ReadableStreamGetReaderOptions,
2169 can_gc: CanGc,
2170 ) -> Fallible<ReadableStreamReader> {
2171 if options.mode.is_none() {
2173 return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2174 self.acquire_default_reader(can_gc)?,
2175 ));
2176 }
2177 assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2179
2180 Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2182 self.acquire_byob_reader(can_gc)?,
2183 ))
2184 }
2185
2186 fn Tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2188 self.tee(cx, false)
2190 }
2191
2192 fn PipeTo(
2194 &self,
2195 cx: &mut CurrentRealm,
2196 destination: &WritableStream,
2197 options: &StreamPipeOptions,
2198 ) -> Rc<Promise> {
2199 let global = self.global();
2200
2201 if self.is_locked() {
2203 let promise = Promise::new2(cx, &global);
2205 promise.reject_error(
2206 Error::Type(c"Source stream is locked".to_owned()),
2207 CanGc::from_cx(cx),
2208 );
2209 return promise;
2210 }
2211
2212 if destination.is_locked() {
2214 let promise = Promise::new2(cx, &global);
2216 promise.reject_error(
2217 Error::Type(c"Destination stream is locked".to_owned()),
2218 CanGc::from_cx(cx),
2219 );
2220 return promise;
2221 }
2222
2223 let signal = options.signal.as_deref();
2225
2226 self.pipe_to(
2228 cx,
2229 &global,
2230 destination,
2231 options.preventClose,
2232 options.preventAbort,
2233 options.preventCancel,
2234 signal,
2235 )
2236 }
2237
2238 fn PipeThrough(
2240 &self,
2241 cx: &mut CurrentRealm,
2242 transform: &ReadableWritablePair,
2243 options: &StreamPipeOptions,
2244 ) -> Fallible<DomRoot<ReadableStream>> {
2245 let global = self.global();
2246
2247 if self.is_locked() {
2249 return Err(Error::Type(c"Source stream is locked".to_owned()));
2250 }
2251
2252 if transform.writable.is_locked() {
2254 return Err(Error::Type(c"Destination stream is locked".to_owned()));
2255 }
2256
2257 let signal = options.signal.as_deref();
2259
2260 let promise = self.pipe_to(
2263 cx,
2264 &global,
2265 &transform.writable,
2266 options.preventClose,
2267 options.preventAbort,
2268 options.preventCancel,
2269 signal,
2270 );
2271
2272 promise.set_promise_is_handled();
2274
2275 Ok(transform.readable.clone())
2277 }
2278}
2279
2280#[expect(unsafe_code)]
2281pub(crate) unsafe fn get_type_and_value_from_message(
2285 cx: SafeJSContext,
2286 data: SafeHandleValue,
2287 value: SafeMutableHandleValue,
2288 can_gc: CanGc,
2289) -> DOMString {
2290 assert!(data.is_object());
2296 rooted!(in(*cx) let data_object = data.to_object());
2297
2298 rooted!(in(*cx) let mut type_ = UndefinedValue());
2300 unsafe {
2301 get_dictionary_property(
2302 *cx,
2303 data_object.handle(),
2304 c"type",
2305 type_.handle_mut(),
2306 can_gc,
2307 )
2308 }
2309 .expect("Getting the type should not fail.");
2310
2311 unsafe { get_dictionary_property(*cx, data_object.handle(), c"value", value, can_gc) }
2313 .expect("Getting the value should not fail.");
2314
2315 let result =
2317 DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2318 .expect("The type of the message should be a string");
2319 let ConversionResult::Success(type_string) = result else {
2320 unreachable!("The type of the message should be a string");
2321 };
2322
2323 type_string
2324}
2325
// Marker impl so the type can be used with the `rooted!` macro.
impl js::gc::Rootable for CrossRealmTransformReadable {}

/// Receiving side of a cross-realm transform: wraps the default controller
/// that incoming port messages are applied to.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformReadable {
    /// Controller of the readable stream driven by the port's messages.
    controller: Dom<ReadableStreamDefaultController>,
}
2337
2338impl CrossRealmTransformReadable {
2339 #[expect(unsafe_code)]
2342 pub(crate) fn handle_message(
2343 &self,
2344 cx: &mut CurrentRealm,
2345 global: &GlobalScope,
2346 port: &MessagePort,
2347 message: SafeHandleValue,
2348 ) {
2349 rooted!(&in(cx) let mut value = UndefinedValue());
2350 let type_string = unsafe {
2351 get_type_and_value_from_message(
2352 cx.into(),
2353 message,
2354 value.handle_mut(),
2355 CanGc::from_cx(cx),
2356 )
2357 };
2358
2359 if type_string == "chunk" {
2361 self.controller
2363 .enqueue(cx, value.handle())
2364 .expect("Enqueing a chunk should not fail.");
2365 }
2366
2367 if type_string == "close" {
2369 self.controller.close(cx);
2371
2372 global.disentangle_port(cx, port);
2374 }
2375
2376 if type_string == "error" {
2378 self.controller.error(cx, value.handle());
2380
2381 global.disentangle_port(cx, port);
2383 }
2384 }
2385
2386 pub(crate) fn handle_error(
2389 &self,
2390 cx: &mut CurrentRealm,
2391 global: &GlobalScope,
2392 port: &MessagePort,
2393 ) {
2394 let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
2396 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
2397 error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
2398
2399 port.cross_realm_transform_send_error(cx, rooted_error.handle());
2401
2402 self.controller.error(cx, rooted_error.handle());
2404
2405 global.disentangle_port(cx, port);
2407 }
2408}
2409
2410#[expect(unsafe_code)]
2411pub(crate) fn get_read_promise_done(
2413 cx: SafeJSContext,
2414 v: &SafeHandleValue,
2415 can_gc: CanGc,
2416) -> Result<bool, Error> {
2417 if !v.is_object() {
2418 return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2419 }
2420 unsafe {
2421 rooted!(in(*cx) let object = v.to_object());
2422 rooted!(in(*cx) let mut done = UndefinedValue());
2423 match get_dictionary_property(*cx, object.handle(), c"done", done.handle_mut(), can_gc) {
2424 Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2425 Ok(ConversionResult::Success(val)) => Ok(val),
2426 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2427 _ => Err(Error::Type(c"Unknown format for done property.".to_owned())),
2428 },
2429 Ok(false) => Err(Error::Type(c"Promise has no done property.".to_owned())),
2430 Err(()) => Err(Error::JSFailed),
2431 }
2432 }
2433}
2434
2435#[expect(unsafe_code)]
2436pub(crate) fn get_read_promise_bytes(
2438 cx: SafeJSContext,
2439 v: &SafeHandleValue,
2440 can_gc: CanGc,
2441) -> Result<Vec<u8>, Error> {
2442 if !v.is_object() {
2443 return Err(Error::Type(
2444 c"Unknown format for for bytes read.".to_owned(),
2445 ));
2446 }
2447 unsafe {
2448 rooted!(in(*cx) let object = v.to_object());
2449 rooted!(in(*cx) let mut bytes = UndefinedValue());
2450 match get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc) {
2451 Ok(true) => {
2452 match Vec::<u8>::safe_from_jsval(
2453 cx,
2454 bytes.handle(),
2455 ConversionBehavior::EnforceRange,
2456 can_gc,
2457 ) {
2458 Ok(ConversionResult::Success(val)) => Ok(val),
2459 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2460 _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2461 }
2462 },
2463 Ok(false) => Err(Error::Type(c"Promise has no value property.".to_owned())),
2464 Err(()) => Err(Error::JSFailed),
2465 }
2466 }
2467}
2468
2469pub(crate) fn bytes_from_chunk_jsval(
2473 cx: SafeJSContext,
2474 chunk: &RootedTraceableBox<Heap<JSVal>>,
2475 can_gc: CanGc,
2476) -> Result<Vec<u8>, Error> {
2477 match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2478 Ok(ConversionResult::Success(vec)) => Ok(vec),
2479 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2480 _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2481 }
2482}
2483
2484impl Transferable for ReadableStream {
2486 type Index = MessagePortIndex;
2487 type Data = MessagePortImpl;
2488
2489 fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, MessagePortImpl)> {
2491 if self.is_locked() {
2494 return Err(Error::DataClone(None));
2495 }
2496
2497 let global = self.global();
2498 let mut realm = enter_auto_realm(cx, &*global);
2499 let mut realm = realm.current_realm();
2500 let cx = &mut realm;
2501
2502 let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
2504 global.track_message_port(&port_1, None);
2505
2506 let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
2508 global.track_message_port(&port_2, None);
2509
2510 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2512
2513 let writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
2515
2516 writable.setup_cross_realm_transform_writable(cx, &port_1);
2518
2519 let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);
2521
2522 promise.set_promise_is_handled();
2524
2525 port_2.transfer(cx)
2527 }
2528
2529 fn transfer_receive(
2531 cx: &mut JSContext,
2532 owner: &GlobalScope,
2533 id: MessagePortId,
2534 port_impl: MessagePortImpl,
2535 ) -> Result<DomRoot<Self>, ()> {
2536 let value = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
2539
2540 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
2547
2548 value.setup_cross_realm_transform_readable(cx, &transferred_port);
2550 Ok(value)
2551 }
2552
2553 fn serialized_storage<'a>(
2555 data: StructuredData<'a, '_>,
2556 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2557 match data {
2558 StructuredData::Reader(r) => &mut r.port_impls,
2559 StructuredData::Writer(w) => &mut w.ports,
2560 }
2561 }
2562}