1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use base::id::{MessagePortId, MessagePortIndex};
11use constellation_traits::MessagePortImpl;
12use dom_struct::dom_struct;
13use ipc_channel::ipc::IpcSharedMemory;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::rust::{
17 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18 MutableHandleValue as SafeMutableHandleValue,
19};
20use js::typedarray::ArrayBufferViewU8;
21use rustc_hash::FxHashMap;
22use script_bindings::conversions::SafeToJSValConvertible;
23
24use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
25use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
26 ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
27 ReadableWritablePair, StreamPipeOptions,
28};
29use script_bindings::str::DOMString;
30
31use crate::dom::domexception::{DOMErrorName, DOMException};
32use script_bindings::conversions::{is_array_like, StringificationBehavior};
33use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
34use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
35use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
37use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
38use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
39use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
40use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
41use crate::dom::writablestream::WritableStream;
42use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
43use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
44use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
45use crate::dom::bindings::trace::RootedTraceableBox;
46use crate::dom::bindings::utils::get_dictionary_property;
47use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
48use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader;
49use crate::dom::globalscope::GlobalScope;
50use crate::dom::promise::{wait_for_all_promise, Promise};
51use crate::dom::readablebytestreamcontroller::ReadableByteStreamController;
52use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader;
53use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController;
54use crate::dom::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
55use crate::dom::defaultteeunderlyingsource::TeeCancelAlgorithm;
56use crate::dom::types::DefaultTeeUnderlyingSource;
57use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
58use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
59use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
60use crate::dom::messageport::MessagePort;
61use crate::realms::{enter_realm, InRealm};
62use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
63use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
64use crate::dom::bindings::transferable::Transferable;
65use crate::dom::bindings::structuredclone::StructuredData;
66
67use super::bindings::buffer_source::HeapBufferSource;
68use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
69use super::readablestreambyobreader::ReadIntoRequest;
70
71#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
73enum PipeToState {
74 #[default]
76 Starting,
77 PendingReady,
79 PendingRead,
81 ShuttingDownWithPendingWrites(Option<ShutdownAction>),
84 ShuttingDownPendingAction,
88 Finalized,
91}
92
93#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
95enum ShutdownAction {
96 WritableStreamAbort,
98 ReadableStreamCancel,
100 WritableStreamDefaultWriterCloseWithErrorPropagation,
102 Abort,
104}
105
106impl js::gc::Rootable for PipeTo {}
107
108#[derive(Clone, JSTraceable, MallocSizeOf)]
117#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
118pub(crate) struct PipeTo {
119 reader: Dom<ReadableStreamDefaultReader>,
121
122 writer: Dom<WritableStreamDefaultWriter>,
124
125 #[ignore_malloc_size_of = "nested Rc"]
128 pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,
129
130 #[conditional_malloc_size_of]
132 #[no_trace]
133 state: Rc<RefCell<PipeToState>>,
134
135 prevent_abort: bool,
137
138 prevent_cancel: bool,
140
141 prevent_close: bool,
143
144 #[conditional_malloc_size_of]
147 shutting_down: Rc<Cell<bool>>,
148
149 #[ignore_malloc_size_of = "mozjs"]
152 abort_reason: Rc<Heap<JSVal>>,
153
154 #[ignore_malloc_size_of = "mozjs"]
157 shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,
158
159 #[ignore_malloc_size_of = "nested Rc"]
162 shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,
163
164 #[conditional_malloc_size_of]
167 result_promise: Rc<Promise>,
168}
169
170impl PipeTo {
171 pub(crate) fn abort_with_reason(
174 &self,
175 cx: SafeJSContext,
176 global: &GlobalScope,
177 reason: SafeHandleValue,
178 realm: InRealm,
179 can_gc: CanGc,
180 ) {
181 if self.shutting_down.get() {
183 return;
184 }
185
186 self.abort_reason.set(reason.get());
190
191 self.set_shutdown_error(reason);
196
197 self.shutdown(cx, global, Some(ShutdownAction::Abort), realm, can_gc);
203 }
204}
205
206impl Callback for PipeTo {
207 #[expect(unsafe_code)]
216 fn callback(&self, cx: SafeJSContext, result: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
217 let global = self.reader.global();
218
219 self.pending_writes.borrow_mut().retain(|p| {
225 let pending = p.is_pending();
226 if !pending {
227 p.set_promise_is_handled();
228 }
229 pending
230 });
231
232 let state_before_checks = self.state.borrow().clone();
234
235 if state_before_checks == PipeToState::PendingRead {
242 let source = self.reader.get_stream().expect("Source stream must be set");
243 if source.is_closed() {
244 let dest = self
245 .writer
246 .get_stream()
247 .expect("Destination stream must be set");
248
249 if dest.is_writable() && !dest.close_queued_or_in_flight() {
252 let Ok(done) = get_read_promise_done(cx, &result, can_gc) else {
253 return;
260 };
261
262 if !done {
263 self.write_chunk(cx, &global, result, can_gc);
265 }
266 }
267 }
268 }
269
270 self.check_and_propagate_errors_forward(cx, &global, realm, can_gc);
271 self.check_and_propagate_errors_backward(cx, &global, realm, can_gc);
272 self.check_and_propagate_closing_forward(cx, &global, realm, can_gc);
273 self.check_and_propagate_closing_backward(cx, &global, realm, can_gc);
274
275 let state = self.state.borrow().clone();
277
278 if state != state_before_checks {
282 return;
283 }
284
285 match state {
286 PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
287 PipeToState::PendingReady => {
288 self.read_chunk(&global, realm, can_gc);
290 },
291 PipeToState::PendingRead => {
292 self.write_chunk(cx, &global, result, can_gc);
294
295 if self.shutting_down.get() {
297 return;
298 }
299
300 self.wait_for_writer_ready(&global, realm, can_gc);
302 },
303 PipeToState::ShuttingDownWithPendingWrites(action) => {
304 if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
307 self.wait_on_pending_write(&global, write, realm, can_gc);
308 return;
309 }
310
311 if let Some(action) = action {
313 self.perform_action(cx, &global, action, realm, can_gc);
315 } else {
316 self.finalize(cx, &global, can_gc);
318 }
319 },
320 PipeToState::ShuttingDownPendingAction => {
321 let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
322 unreachable!();
323 };
324 if promise.is_pending() {
325 return;
329 }
330
331 let is_array_like = {
332 if !result.is_object() {
333 false
334 } else {
335 unsafe { is_array_like::<crate::DomTypeHolder>(*cx, result) }
336 }
337 };
338
339 if !result.is_undefined() && !is_array_like {
341 self.set_shutdown_error(result);
351 }
352 self.finalize(cx, &global, can_gc);
353 },
354 PipeToState::Finalized => {},
355 }
356 }
357}
358
359impl PipeTo {
360 fn set_shutdown_error(&self, error: SafeHandleValue) {
363 *self.shutdown_error.borrow_mut() = Some(Heap::default());
364 let Some(ref heap) = *self.shutdown_error.borrow() else {
365 unreachable!("Option set to Some(heap) above.");
366 };
367 heap.set(error.get())
368 }
369
370 fn wait_for_writer_ready(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
373 {
374 let mut state = self.state.borrow_mut();
375 *state = PipeToState::PendingReady;
376 }
377
378 let ready_promise = self.writer.Ready();
379 if ready_promise.is_fulfilled() {
380 self.read_chunk(global, realm, can_gc);
381 } else {
382 let handler = PromiseNativeHandler::new(
383 global,
384 Some(Box::new(self.clone())),
385 Some(Box::new(self.clone())),
386 can_gc,
387 );
388 ready_promise.append_native_handler(&handler, realm, can_gc);
389
390 let closed_promise = self.reader.Closed();
394 closed_promise.append_native_handler(&handler, realm, can_gc);
395 }
396 }
397
398 fn read_chunk(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
400 *self.state.borrow_mut() = PipeToState::PendingRead;
401 let chunk_promise = self.reader.Read(can_gc);
402 let handler = PromiseNativeHandler::new(
403 global,
404 Some(Box::new(self.clone())),
405 Some(Box::new(self.clone())),
406 can_gc,
407 );
408 chunk_promise.append_native_handler(&handler, realm, can_gc);
409
410 let ready_promise = self.writer.Closed();
413 ready_promise.append_native_handler(&handler, realm, can_gc);
414 }
415
416 #[expect(unsafe_code)]
419 fn write_chunk(
420 &self,
421 cx: SafeJSContext,
422 global: &GlobalScope,
423 chunk: SafeHandleValue,
424 can_gc: CanGc,
425 ) -> bool {
426 if chunk.is_object() {
427 rooted!(in(*cx) let object = chunk.to_object());
428 rooted!(in(*cx) let mut bytes = UndefinedValue());
429 let has_value = unsafe {
430 get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut(), can_gc)
431 .expect("Chunk should have a value.")
432 };
433 if has_value {
434 let write_promise = self.writer.write(cx, global, bytes.handle(), can_gc);
436 self.pending_writes.borrow_mut().push_back(write_promise);
437 return true;
438 }
439 }
440 false
441 }
442
443 fn wait_on_pending_write(
447 &self,
448 global: &GlobalScope,
449 promise: Rc<Promise>,
450 realm: InRealm,
451 can_gc: CanGc,
452 ) {
453 let handler = PromiseNativeHandler::new(
454 global,
455 Some(Box::new(self.clone())),
456 Some(Box::new(self.clone())),
457 can_gc,
458 );
459 promise.append_native_handler(&handler, realm, can_gc);
460 }
461
462 fn check_and_propagate_errors_forward(
465 &self,
466 cx: SafeJSContext,
467 global: &GlobalScope,
468 realm: InRealm,
469 can_gc: CanGc,
470 ) {
471 if self.shutting_down.get() {
474 return;
475 }
476
477 let source = self
479 .reader
480 .get_stream()
481 .expect("Reader should still have a stream");
482 if source.is_errored() {
483 rooted!(in(*cx) let mut source_error = UndefinedValue());
484 source.get_stored_error(source_error.handle_mut());
485 self.set_shutdown_error(source_error.handle());
486
487 if !self.prevent_abort {
489 self.shutdown(
492 cx,
493 global,
494 Some(ShutdownAction::WritableStreamAbort),
495 realm,
496 can_gc,
497 )
498 } else {
499 self.shutdown(cx, global, None, realm, can_gc);
501 }
502 }
503 }
504
505 fn check_and_propagate_errors_backward(
508 &self,
509 cx: SafeJSContext,
510 global: &GlobalScope,
511 realm: InRealm,
512 can_gc: CanGc,
513 ) {
514 if self.shutting_down.get() {
517 return;
518 }
519
520 let dest = self
522 .writer
523 .get_stream()
524 .expect("Writer should still have a stream");
525 if dest.is_errored() {
526 rooted!(in(*cx) let mut dest_error = UndefinedValue());
527 dest.get_stored_error(dest_error.handle_mut());
528 self.set_shutdown_error(dest_error.handle());
529
530 if !self.prevent_cancel {
532 self.shutdown(
535 cx,
536 global,
537 Some(ShutdownAction::ReadableStreamCancel),
538 realm,
539 can_gc,
540 )
541 } else {
542 self.shutdown(cx, global, None, realm, can_gc);
544 }
545 }
546 }
547
548 fn check_and_propagate_closing_forward(
551 &self,
552 cx: SafeJSContext,
553 global: &GlobalScope,
554 realm: InRealm,
555 can_gc: CanGc,
556 ) {
557 if self.shutting_down.get() {
560 return;
561 }
562
563 let source = self
565 .reader
566 .get_stream()
567 .expect("Reader should still have a stream");
568 if source.is_closed() {
569 if !self.prevent_close {
571 self.shutdown(
574 cx,
575 global,
576 Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
577 realm,
578 can_gc,
579 )
580 } else {
581 self.shutdown(cx, global, None, realm, can_gc);
583 }
584 }
585 }
586
587 fn check_and_propagate_closing_backward(
590 &self,
591 cx: SafeJSContext,
592 global: &GlobalScope,
593 realm: InRealm,
594 can_gc: CanGc,
595 ) {
596 if self.shutting_down.get() {
599 return;
600 }
601
602 let dest = self
605 .writer
606 .get_stream()
607 .expect("Writer should still have a stream");
608 if dest.close_queued_or_in_flight() || dest.is_closed() {
609 rooted!(in(*cx) let mut dest_closed = UndefinedValue());
614 let error =
615 Error::Type("Destination is closed or has closed queued or in flight".to_string());
616 error.to_jsval(cx, global, dest_closed.handle_mut(), can_gc);
617 self.set_shutdown_error(dest_closed.handle());
618
619 if !self.prevent_cancel {
621 self.shutdown(
624 cx,
625 global,
626 Some(ShutdownAction::ReadableStreamCancel),
627 realm,
628 can_gc,
629 )
630 } else {
631 self.shutdown(cx, global, None, realm, can_gc);
633 }
634 }
635 }
636
637 fn shutdown(
641 &self,
642 cx: SafeJSContext,
643 global: &GlobalScope,
644 action: Option<ShutdownAction>,
645 realm: InRealm,
646 can_gc: CanGc,
647 ) {
648 if !self.shutting_down.replace(true) {
651 let dest = self.writer.get_stream().expect("Stream must be set");
652 if dest.is_writable() && !dest.close_queued_or_in_flight() {
655 if let Some(write) = self.pending_writes.borrow_mut().front() {
661 *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
662 self.wait_on_pending_write(global, write.clone(), realm, can_gc);
663 return;
664 }
665 }
666
667 if let Some(action) = action {
669 self.perform_action(cx, global, action, realm, can_gc);
671 } else {
672 self.finalize(cx, global, can_gc);
674 }
675 }
676 }
677
678 fn perform_action(
681 &self,
682 cx: SafeJSContext,
683 global: &GlobalScope,
684 action: ShutdownAction,
685 realm: InRealm,
686 can_gc: CanGc,
687 ) {
688 rooted!(in(*cx) let mut error = UndefinedValue());
689 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
690 error.set(shutdown_error.get());
691 }
692
693 *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
694
695 let promise = match action {
697 ShutdownAction::WritableStreamAbort => {
698 let dest = self.writer.get_stream().expect("Stream must be set");
699 dest.abort(cx, global, error.handle(), realm, can_gc)
700 },
701 ShutdownAction::ReadableStreamCancel => {
702 let source = self
703 .reader
704 .get_stream()
705 .expect("Reader should have a stream.");
706 source.cancel(cx, global, error.handle(), can_gc)
707 },
708 ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
709 self.writer.close_with_error_propagation(cx, global, can_gc)
710 },
711 ShutdownAction::Abort => {
712 rooted!(in(*cx) let mut error = UndefinedValue());
717 error.set(self.abort_reason.get());
718
719 let mut actions = vec![];
721
722 if !self.prevent_abort {
724 let dest = self
725 .writer
726 .get_stream()
727 .expect("Destination stream must be set");
728
729 let promise = if dest.is_writable() {
731 dest.abort(cx, global, error.handle(), realm, can_gc)
733 } else {
734 Promise::new_resolved(global, cx, (), can_gc)
736 };
737 actions.push(promise);
738 }
739
740 if !self.prevent_cancel {
742 let source = self.reader.get_stream().expect("Source stream must be set");
743
744 let promise = if source.is_readable() {
746 source.cancel(cx, global, error.handle(), can_gc)
748 } else {
749 Promise::new_resolved(global, cx, (), can_gc)
751 };
752 actions.push(promise);
753 }
754
755 wait_for_all_promise(cx, global, actions, realm, can_gc)
759 },
760 };
761
762 let handler = PromiseNativeHandler::new(
765 global,
766 Some(Box::new(self.clone())),
767 Some(Box::new(self.clone())),
768 can_gc,
769 );
770 promise.append_native_handler(&handler, realm, can_gc);
771 *self.shutdown_action_promise.borrow_mut() = Some(promise);
772 }
773
774 fn finalize(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
776 *self.state.borrow_mut() = PipeToState::Finalized;
777
778 self.writer.release(cx, global, can_gc);
780
781 self.reader
787 .release(can_gc)
788 .expect("Releasing the reader should not fail");
789
790 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
796 rooted!(in(*cx) let mut error = UndefinedValue());
797 error.set(shutdown_error.get());
798 self.result_promise.reject_native(&error.handle(), can_gc);
800 } else {
801 self.result_promise.resolve_native(&(), can_gc);
803 }
804 }
805}
806
807#[derive(Clone, JSTraceable, MallocSizeOf)]
810struct SourceCancelPromiseFulfillmentHandler {
811 #[conditional_malloc_size_of]
812 result: Rc<Promise>,
813}
814
815impl Callback for SourceCancelPromiseFulfillmentHandler {
816 fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
820 self.result.resolve_native(&(), can_gc);
821 }
822}
823
824#[derive(Clone, JSTraceable, MallocSizeOf)]
827struct SourceCancelPromiseRejectionHandler {
828 #[conditional_malloc_size_of]
829 result: Rc<Promise>,
830}
831
832impl Callback for SourceCancelPromiseRejectionHandler {
833 fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
837 self.result.reject_native(&v, can_gc);
838 }
839}
840
841#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
843pub(crate) enum ReadableStreamState {
844 #[default]
845 Readable,
846 Closed,
847 Errored,
848}
849
850#[derive(JSTraceable, MallocSizeOf)]
852#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
853pub(crate) enum ControllerType {
854 Byte(MutNullableDom<ReadableByteStreamController>),
856 Default(MutNullableDom<ReadableStreamDefaultController>),
858}
859
860#[derive(JSTraceable, MallocSizeOf)]
862#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
863pub(crate) enum ReaderType {
864 #[allow(clippy::upper_case_acronyms)]
866 BYOB(MutNullableDom<ReadableStreamBYOBReader>),
867 Default(MutNullableDom<ReadableStreamDefaultReader>),
869}
870
871impl Eq for ReaderType {}
872impl PartialEq for ReaderType {
873 fn eq(&self, other: &Self) -> bool {
874 matches!(
875 (self, other),
876 (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
877 (ReaderType::Default(_), ReaderType::Default(_))
878 )
879 }
880}
881
882#[cfg_attr(crown, allow(crown::unrooted_must_root))]
884pub(crate) fn create_readable_stream(
885 global: &GlobalScope,
886 underlying_source_type: UnderlyingSourceType,
887 queuing_strategy: Option<Rc<QueuingStrategySize>>,
888 high_water_mark: Option<f64>,
889 can_gc: CanGc,
890) -> DomRoot<ReadableStream> {
891 let high_water_mark = high_water_mark.unwrap_or(1.0);
893
894 let size_algorithm =
896 queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
897
898 assert!(high_water_mark >= 0.0);
900
901 let stream = ReadableStream::new_with_proto(global, None, can_gc);
904
905 let controller = ReadableStreamDefaultController::new(
907 global,
908 underlying_source_type,
909 high_water_mark,
910 size_algorithm,
911 can_gc,
912 );
913
914 controller
917 .setup(stream.clone(), can_gc)
918 .expect("Setup of default controller cannot fail");
919
920 stream
922}
923
924#[dom_struct]
926pub(crate) struct ReadableStream {
927 reflector_: Reflector,
928
929 controller: RefCell<Option<ControllerType>>,
933
934 #[ignore_malloc_size_of = "mozjs"]
936 stored_error: Heap<JSVal>,
937
938 disturbed: Cell<bool>,
940
941 reader: RefCell<Option<ReaderType>>,
943
944 state: Cell<ReadableStreamState>,
946}
947
948impl ReadableStream {
949 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
950 fn new_inherited() -> ReadableStream {
952 ReadableStream {
953 reflector_: Reflector::new(),
954 controller: RefCell::new(None),
955 stored_error: Heap::default(),
956 disturbed: Default::default(),
957 reader: RefCell::new(None),
958 state: Cell::new(Default::default()),
959 }
960 }
961
962 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
963 pub(crate) fn new_with_proto(
964 global: &GlobalScope,
965 proto: Option<SafeHandleObject>,
966 can_gc: CanGc,
967 ) -> DomRoot<ReadableStream> {
968 reflect_dom_object_with_proto(
969 Box::new(ReadableStream::new_inherited()),
970 global,
971 proto,
972 can_gc,
973 )
974 }
975
976 pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
979 *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
980 controller,
981 ))));
982 }
983
984 pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
987 *self.controller.borrow_mut() =
988 Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
989 }
990
991 pub(crate) fn assert_no_controller(&self) {
994 let has_no_controller = self.controller.borrow().is_none();
995 assert!(has_no_controller);
996 }
997
998 pub(crate) fn new_from_bytes(
1000 global: &GlobalScope,
1001 bytes: Vec<u8>,
1002 can_gc: CanGc,
1003 ) -> Fallible<DomRoot<ReadableStream>> {
1004 let stream = ReadableStream::new_with_external_underlying_source(
1005 global,
1006 UnderlyingSourceType::Memory(bytes.len()),
1007 can_gc,
1008 )?;
1009 stream.enqueue_native(bytes, can_gc);
1010 stream.controller_close_native(can_gc);
1011 Ok(stream)
1012 }
1013
1014 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1017 pub(crate) fn new_with_external_underlying_source(
1018 global: &GlobalScope,
1019 source: UnderlyingSourceType,
1020 can_gc: CanGc,
1021 ) -> Fallible<DomRoot<ReadableStream>> {
1022 assert!(source.is_native());
1023 let stream = ReadableStream::new_with_proto(global, None, can_gc);
1024 let controller = ReadableStreamDefaultController::new(
1025 global,
1026 source,
1027 1.0,
1028 extract_size_algorithm(&QueuingStrategy::empty(), can_gc),
1029 can_gc,
1030 );
1031 controller.setup(stream.clone(), can_gc)?;
1032 Ok(stream)
1033 }
1034
1035 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1037 match self.controller.borrow().as_ref() {
1038 Some(ControllerType::Default(controller)) => {
1039 let controller = controller
1040 .get()
1041 .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1042 controller.perform_release_steps()
1043 },
1044 Some(ControllerType::Byte(controller)) => {
1045 let controller = controller
1046 .get()
1047 .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1048 controller.perform_release_steps()
1049 },
1050 None => Err(Error::Type("Stream should have controller.".to_string())),
1051 }
1052 }
1053
1054 pub(crate) fn perform_pull_steps(
1058 &self,
1059 cx: SafeJSContext,
1060 read_request: &ReadRequest,
1061 can_gc: CanGc,
1062 ) {
1063 match self.controller.borrow().as_ref() {
1064 Some(ControllerType::Default(controller)) => controller
1065 .get()
1066 .expect("Stream should have controller.")
1067 .perform_pull_steps(read_request, can_gc),
1068 Some(ControllerType::Byte(controller)) => controller
1069 .get()
1070 .expect("Stream should have controller.")
1071 .perform_pull_steps(cx, read_request, can_gc),
1072 None => {
1073 unreachable!("Stream does not have a controller.");
1074 },
1075 }
1076 }
1077
1078 pub(crate) fn perform_pull_into(
1082 &self,
1083 cx: SafeJSContext,
1084 read_into_request: &ReadIntoRequest,
1085 view: HeapBufferSource<ArrayBufferViewU8>,
1086 options: &ReadableStreamBYOBReaderReadOptions,
1087 can_gc: CanGc,
1088 ) {
1089 match self.controller.borrow().as_ref() {
1090 Some(ControllerType::Byte(controller)) => controller
1091 .get()
1092 .expect("Stream should have controller.")
1093 .perform_pull_into(cx, read_into_request, view, options, can_gc),
1094 _ => {
1095 unreachable!(
1096 "Pulling a chunk from a stream with a default controller using a BYOB reader"
1097 )
1098 },
1099 }
1100 }
1101
1102 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1104 match self.reader.borrow().as_ref() {
1105 Some(ReaderType::Default(reader)) => {
1106 let Some(reader) = reader.get() else {
1107 panic!("Attempt to add a read request without having first acquired a reader.");
1108 };
1109
1110 assert!(self.is_readable());
1112
1113 reader.add_read_request(read_request);
1115 },
1116 _ => {
1117 unreachable!("Adding a read request can only be done on a default reader.")
1118 },
1119 }
1120 }
1121
1122 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1124 match self.reader.borrow().as_ref() {
1125 Some(ReaderType::BYOB(reader)) => {
1127 let Some(reader) = reader.get() else {
1128 unreachable!(
1129 "Attempt to add a read into request without having first acquired a reader."
1130 );
1131 };
1132
1133 assert!(self.is_readable() || self.is_closed());
1135
1136 reader.add_read_into_request(read_request);
1138 },
1139 _ => {
1140 unreachable!("Adding a read into request can only be done on a BYOB reader.")
1141 },
1142 }
1143 }
1144
1145 pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1148 match self.controller.borrow().as_ref() {
1149 Some(ControllerType::Default(controller)) => controller
1150 .get()
1151 .expect("Stream should have controller.")
1152 .enqueue_native(bytes, can_gc),
1153 _ => {
1154 unreachable!(
1155 "Enqueueing chunk to a stream from Rust on other than default controller"
1156 );
1157 },
1158 }
1159 }
1160
1161 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1163 assert!(self.is_readable());
1165
1166 self.state.set(ReadableStreamState::Errored);
1168
1169 self.stored_error.set(e.get());
1171
1172 match self.reader.borrow().as_ref() {
1175 Some(ReaderType::Default(reader)) => {
1176 let Some(reader) = reader.get() else {
1177 return;
1179 };
1180
1181 reader.error(e, can_gc);
1183 },
1184 Some(ReaderType::BYOB(reader)) => {
1185 let Some(reader) = reader.get() else {
1186 return;
1188 };
1189
1190 reader.error_read_into_requests(e, can_gc);
1192 },
1193 None => {
1194 },
1196 }
1197 }
1198
1199 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1201 handle_mut.set(self.stored_error.get());
1202 }
1203
1204 pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1207 let cx = GlobalScope::get_cx();
1208 rooted!(in(*cx) let mut error_val = UndefinedValue());
1209 error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1210 self.error(error_val.handle(), can_gc);
1211 }
1212
1213 pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1216 match self.controller.borrow().as_ref() {
1217 Some(ControllerType::Default(controller)) => {
1218 let _ = controller
1219 .get()
1220 .expect("Stream should have controller.")
1221 .Close(can_gc);
1222 },
1223 _ => {
1224 unreachable!("Native closing is only done on default controllers.")
1225 },
1226 }
1227 }
1228
1229 pub(crate) fn in_memory(&self) -> bool {
1232 match self.controller.borrow().as_ref() {
1233 Some(ControllerType::Default(controller)) => controller
1234 .get()
1235 .expect("Stream should have controller.")
1236 .in_memory(),
1237 _ => {
1238 unreachable!(
1239 "Checking if source is in memory for a stream with a non-default controller"
1240 )
1241 },
1242 }
1243 }
1244
1245 pub(crate) fn get_in_memory_bytes(&self) -> Option<IpcSharedMemory> {
1248 match self.controller.borrow().as_ref() {
1249 Some(ControllerType::Default(controller)) => controller
1250 .get()
1251 .expect("Stream should have controller.")
1252 .get_in_memory_bytes()
1253 .as_deref()
1254 .map(IpcSharedMemory::from_bytes),
1255 _ => {
1256 unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1257 },
1258 }
1259 }
1260
1261 pub(crate) fn acquire_default_reader(
1266 &self,
1267 can_gc: CanGc,
1268 ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1269 let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1271
1272 reader.set_up(self, &self.global(), can_gc)?;
1274
1275 Ok(reader)
1277 }
1278
1279 pub(crate) fn acquire_byob_reader(
1281 &self,
1282 can_gc: CanGc,
1283 ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1284 let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1286 reader.set_up(self, &self.global(), can_gc)?;
1288
1289 Ok(reader)
1291 }
1292
1293 pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1294 match self.controller.borrow().as_ref() {
1295 Some(ControllerType::Default(controller)) => {
1296 controller.get().expect("Stream should have controller.")
1297 },
1298 _ => {
1299 unreachable!(
1300 "Getting default controller for a stream with a non-default controller"
1301 )
1302 },
1303 }
1304 }
1305
1306 pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1307 match self.reader.borrow().as_ref() {
1308 Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1309 _ => {
1310 unreachable!("Getting default reader for a stream with a non-default reader")
1311 },
1312 }
1313 }
1314
1315 pub(crate) fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
1321 match self.reader.borrow().as_ref() {
1322 Some(ReaderType::Default(reader)) => {
1323 let Some(reader) = reader.get() else {
1324 unreachable!(
1325 "Attempt to read stream chunk without having first acquired a reader."
1326 );
1327 };
1328 reader.Read(can_gc)
1329 },
1330 _ => {
1331 unreachable!("Native reading of a chunk can only be done with a default reader.")
1332 },
1333 }
1334 }
1335
1336 pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1341 let reader_ref = self.reader.borrow();
1342
1343 match reader_ref.as_ref() {
1344 Some(ReaderType::Default(reader)) => {
1345 let Some(reader) = reader.get() else {
1346 unreachable!("Attempt to stop reading without having first acquired a reader.");
1347 };
1348
1349 drop(reader_ref);
1350 reader.release(can_gc).expect("Reader release cannot fail.");
1351 },
1352 _ => {
1353 unreachable!("Native stop reading can only be done with a default reader.")
1354 },
1355 }
1356 }
1357
1358 pub(crate) fn is_locked(&self) -> bool {
1360 match self.reader.borrow().as_ref() {
1361 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1362 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1363 None => false,
1364 }
1365 }
1366
1367 pub(crate) fn is_disturbed(&self) -> bool {
1368 self.disturbed.get()
1369 }
1370
1371 pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1372 self.disturbed.set(disturbed);
1373 }
1374
1375 pub(crate) fn is_closed(&self) -> bool {
1376 self.state.get() == ReadableStreamState::Closed
1377 }
1378
1379 pub(crate) fn is_errored(&self) -> bool {
1380 self.state.get() == ReadableStreamState::Errored
1381 }
1382
1383 pub(crate) fn is_readable(&self) -> bool {
1384 self.state.get() == ReadableStreamState::Readable
1385 }
1386
1387 pub(crate) fn has_default_reader(&self) -> bool {
1388 match self.reader.borrow().as_ref() {
1389 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1390 _ => false,
1391 }
1392 }
1393
1394 pub(crate) fn has_byob_reader(&self) -> bool {
1395 match self.reader.borrow().as_ref() {
1396 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1397 _ => false,
1398 }
1399 }
1400
1401 pub(crate) fn has_byte_controller(&self) -> bool {
1402 match self.controller.borrow().as_ref() {
1403 Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1404 _ => false,
1405 }
1406 }
1407
1408 pub(crate) fn get_num_read_requests(&self) -> usize {
1410 match self.reader.borrow().as_ref() {
1411 Some(ReaderType::Default(reader)) => {
1412 let reader = reader
1413 .get()
1414 .expect("Stream must have a reader when getting the number of read requests.");
1415 reader.get_num_read_requests()
1416 },
1417 _ => unreachable!(
1418 "Stream must have a default reader when get num read requests is called into."
1419 ),
1420 }
1421 }
1422
1423 pub(crate) fn get_num_read_into_requests(&self) -> usize {
1425 assert!(self.has_byob_reader());
1426
1427 match self.reader.borrow().as_ref() {
1428 Some(ReaderType::BYOB(reader)) => {
1429 let Some(reader) = reader.get() else {
1430 unreachable!(
1431 "Stream must have a reader when get num read into requests is called into."
1432 );
1433 };
1434 reader.get_num_read_into_requests()
1435 },
1436 _ => {
1437 unreachable!(
1438 "Stream must have a BYOB reader when get num read into requests is called into."
1439 );
1440 },
1441 }
1442 }
1443
1444 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1446 pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1447 assert!(self.has_default_reader());
1449
1450 match self.reader.borrow().as_ref() {
1451 Some(ReaderType::Default(reader)) => {
1452 let reader = reader
1454 .get()
1455 .expect("Stream must have a reader when a read request is fulfilled.");
1456 assert_ne!(reader.get_num_read_requests(), 0);
1458 let request = reader.remove_read_request();
1461
1462 if done {
1463 request.close_steps(can_gc);
1465 } else {
1466 let result = RootedTraceableBox::new(Heap::default());
1468 result.set(*chunk);
1469 request.chunk_steps(result, can_gc);
1470 }
1471 },
1472 _ => {
1473 unreachable!(
1474 "Stream must have a default reader when fulfill read requests is called into."
1475 );
1476 },
1477 }
1478 }
1479
1480 pub(crate) fn fulfill_read_into_request(
1482 &self,
1483 chunk: SafeHandleValue,
1484 done: bool,
1485 can_gc: CanGc,
1486 ) {
1487 assert!(self.has_byob_reader());
1489
1490 match self.reader.borrow().as_ref() {
1492 Some(ReaderType::BYOB(reader)) => {
1493 let Some(reader) = reader.get() else {
1494 unreachable!(
1495 "Stream must have a reader when a read into request is fulfilled."
1496 );
1497 };
1498
1499 assert!(reader.get_num_read_into_requests() > 0);
1501
1502 let read_into_request = reader.remove_read_into_request();
1505
1506 let result = RootedTraceableBox::new(Heap::default());
1508 if done {
1509 result.set(*chunk);
1510 read_into_request.close_steps(Some(result), can_gc);
1511 } else {
1512 result.set(*chunk);
1514 read_into_request.chunk_steps(result, can_gc);
1515 }
1516 },
1517 _ => {
1518 unreachable!(
1519 "Stream must have a BYOB reader when fulfill read into requests is called into."
1520 );
1521 },
1522 };
1523 }
1524
1525 pub(crate) fn close(&self, can_gc: CanGc) {
1527 assert!(self.is_readable());
1529 self.state.set(ReadableStreamState::Closed);
1531 let default_reader = {
1537 let reader_ref = self.reader.borrow();
1538 match reader_ref.as_ref() {
1539 Some(ReaderType::Default(reader)) => reader.get(),
1540 _ => None,
1541 }
1542 };
1543
1544 if let Some(reader) = default_reader {
1545 reader.close(can_gc);
1547 return;
1548 }
1549
1550 let byob_reader = {
1552 let reader_ref = self.reader.borrow();
1553 match reader_ref.as_ref() {
1554 Some(ReaderType::BYOB(reader)) => reader.get(),
1555 _ => None,
1556 }
1557 };
1558
1559 if let Some(reader) = byob_reader {
1560 reader.close(can_gc);
1562 }
1563
1564 }
1566
1567 pub(crate) fn cancel(
1569 &self,
1570 cx: SafeJSContext,
1571 global: &GlobalScope,
1572 reason: SafeHandleValue,
1573 can_gc: CanGc,
1574 ) -> Rc<Promise> {
1575 self.disturbed.set(true);
1577
1578 if self.is_closed() {
1580 return Promise::new_resolved(global, cx, (), can_gc);
1581 }
1582 if self.is_errored() {
1584 let promise = Promise::new(global, can_gc);
1585 rooted!(in(*cx) let mut rval = UndefinedValue());
1586 self.stored_error
1587 .safe_to_jsval(cx, rval.handle_mut(), can_gc);
1588 promise.reject_native(&rval.handle(), can_gc);
1589 return promise;
1590 }
1591 self.close(can_gc);
1593
1594 if let Some(ReaderType::BYOB(reader)) = self.reader.borrow().as_ref() {
1596 if let Some(reader) = reader.get() {
1597 reader.cancel(can_gc);
1599 }
1600 }
1601
1602 let source_cancel_promise = match self.controller.borrow().as_ref() {
1605 Some(ControllerType::Default(controller)) => controller
1606 .get()
1607 .expect("Stream should have controller.")
1608 .perform_cancel_steps(cx, global, reason, can_gc),
1609 Some(ControllerType::Byte(controller)) => controller
1610 .get()
1611 .expect("Stream should have controller.")
1612 .perform_cancel_steps(cx, global, reason, can_gc),
1613 None => {
1614 panic!("Stream does not have a controller.");
1615 },
1616 };
1617
1618 let global = self.global();
1621 let result_promise = Promise::new(&global, can_gc);
1622 let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1623 result: result_promise.clone(),
1624 });
1625 let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1626 result: result_promise.clone(),
1627 });
1628 let handler = PromiseNativeHandler::new(
1629 &global,
1630 Some(fulfillment_handler),
1631 Some(rejection_handler),
1632 can_gc,
1633 );
1634 let realm = enter_realm(&*global);
1635 let comp = InRealm::Entered(&realm);
1636 source_cancel_promise.append_native_handler(&handler, comp, can_gc);
1637
1638 result_promise
1641 }
1642
1643 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1644 pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1645 *self.reader.borrow_mut() = new_reader;
1646 }
1647
/// Tees this stream into two new branch streams that are fed from a single
/// default reader acquired on `self`.
///
/// Returns the two branches as `[branch_1, branch_2]`, or the error from
/// acquiring the default reader.
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
fn default_tee(
    &self,
    clone_for_branch_2: bool,
    can_gc: CanGc,
) -> Fallible<Vec<DomRoot<ReadableStream>>> {
    // Shared with both tee sources below.
    let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));

    // Acquire a default reader and record it as this stream's reader.
    let reader = self.acquire_default_reader(can_gc)?;
    self.set_reader(Some(ReaderType::Default(MutNullableDom::new(Some(
        &reader,
    )))));

    // Flags shared between the two tee sources; all start out `false`.
    let reading = Rc::new(Cell::new(false));
    let read_again = Rc::new(Cell::new(false));
    let canceled_1 = Rc::new(Cell::new(false));
    let canceled_2 = Rc::new(Cell::new(false));

    // Per-branch reason values, initially `undefined`, plus the promise
    // shared by both branches' tee sources.
    let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
    let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
    let cancel_promise = Promise::new(&self.global(), can_gc);

    // Underlying source for branch 1: same shared state, tagged with the
    // branch-1 cancel algorithm.
    let tee_source_1 = DefaultTeeUnderlyingSource::new(
        &reader,
        self,
        reading.clone(),
        read_again.clone(),
        canceled_1.clone(),
        canceled_2.clone(),
        clone_for_branch_2.clone(),
        reason_1.clone(),
        reason_2.clone(),
        cancel_promise.clone(),
        TeeCancelAlgorithm::Cancel1Algorithm,
        can_gc,
    );

    let underlying_source_type_branch_1 =
        UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));

    // Underlying source for branch 2: takes ownership of the remaining
    // handles to the shared state, tagged with the branch-2 cancel algorithm.
    let tee_source_2 = DefaultTeeUnderlyingSource::new(
        &reader,
        self,
        reading,
        read_again,
        canceled_1.clone(),
        canceled_2.clone(),
        clone_for_branch_2,
        reason_1,
        reason_2,
        cancel_promise.clone(),
        TeeCancelAlgorithm::Cancel2Algorithm,
        can_gc,
    );

    let underlying_source_type_branch_2 =
        UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));

    // Create branch 1 and make it known to both tee sources.
    let branch_1 = create_readable_stream(
        &self.global(),
        underlying_source_type_branch_1,
        None,
        None,
        can_gc,
    );
    tee_source_1.set_branch_1(&branch_1);
    tee_source_2.set_branch_1(&branch_1);

    // Create branch 2 and make it known to both tee sources.
    let branch_2 = create_readable_stream(
        &self.global(),
        underlying_source_type_branch_2,
        None,
        None,
        can_gc,
    );
    tee_source_1.set_branch_2(&branch_2);
    tee_source_2.set_branch_2(&branch_2);

    // Attach a native handler to the reader's closed promise, handing it
    // both branches, the cancel flags and the shared cancel promise.
    reader.append_native_handler_to_closed_promise(
        &branch_1,
        &branch_2,
        canceled_1,
        canceled_2,
        cancel_promise,
        can_gc,
    );

    Ok(vec![branch_1, branch_2])
}
1753
/// Starts piping this stream into `dest` and returns the promise stored as
/// the pipe's `result_promise`.
///
/// A default reader is acquired on `self` and a default writer on `dest`,
/// the stream is marked as disturbed, and a rooted `PipeTo` is built from
/// the reader, the writer and the three `prevent_*` flags. If `signal` is
/// provided, an abort algorithm wrapping the pipe is either run right away
/// (signal already aborted) or added to the signal.
///
/// # Panics
///
/// Panics if either stream is locked, or if acquiring the reader or the
/// writer fails.
// Needed because the signature carries every pipe option individually.
#[allow(clippy::too_many_arguments)]
pub(crate) fn pipe_to(
    &self,
    cx: SafeJSContext,
    global: &GlobalScope,
    dest: &WritableStream,
    prevent_close: bool,
    prevent_abort: bool,
    prevent_cancel: bool,
    signal: Option<&AbortSignal>,
    realm: InRealm,
    can_gc: CanGc,
) -> Rc<Promise> {
    // Callers must have checked that neither end is locked.
    assert!(!self.is_locked());

    assert!(!dest.is_locked());

    let reader = self
        .acquire_default_reader(can_gc)
        .expect("Acquiring a default reader for pipe_to cannot fail");

    let writer = dest
        .aquire_default_writer(cx, global, can_gc)
        .expect("Acquiring a default writer for pipe_to cannot fail");

    self.disturbed.set(true);

    let promise = Promise::new(global, can_gc);

    // Every field not listed explicitly starts from its `Default` value.
    rooted!(in(*cx) let pipe_to = PipeTo {
        reader: Dom::from_ref(&reader),
        writer: Dom::from_ref(&writer),
        pending_writes: Default::default(),
        state: Default::default(),
        prevent_abort,
        prevent_cancel,
        prevent_close,
        shutting_down: Default::default(),
        abort_reason: Default::default(),
        shutdown_error: Default::default(),
        shutdown_action_promise: Default::default(),
        result_promise: promise.clone(),
    });

    if let Some(signal) = signal {
        // The abort algorithm carries a clone of the pipe.
        rooted!(in(*cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));

        // Already aborted: run the algorithm now and skip the checks below.
        if signal.aborted() {
            signal.run_abort_algorithm(cx, global, &abort_algorithm, realm, can_gc);
            return promise;
        }

        signal.add(&abort_algorithm);
    }

    // Run the error and closing propagation checks once up front.
    pipe_to.check_and_propagate_errors_forward(cx, global, realm, can_gc);
    pipe_to.check_and_propagate_errors_backward(cx, global, realm, can_gc);
    pipe_to.check_and_propagate_closing_forward(cx, global, realm, can_gc);
    pipe_to.check_and_propagate_closing_backward(cx, global, realm, can_gc);

    // Only wait on the writer if the checks left the pipe in `Starting`.
    if *pipe_to.state.borrow() == PipeToState::Starting {
        pipe_to.wait_for_writer_ready(global, realm, can_gc);
    }

    promise
}
1856
1857 fn tee(
1859 &self,
1860 clone_for_branch_2: bool,
1861 can_gc: CanGc,
1862 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1863 match self.controller.borrow().as_ref() {
1867 Some(ControllerType::Default(_)) => {
1868 self.default_tee(clone_for_branch_2, can_gc)
1870 },
1871 Some(ControllerType::Byte(_)) => {
1872 Err(Error::Type(
1875 "Teeing is not yet supported for byte streams".to_owned(),
1876 ))
1877 },
1878 None => {
1879 unreachable!("Stream should have a controller.");
1880 },
1881 }
1882 }
1883
1884 pub(crate) fn set_up_byte_controller(
1886 &self,
1887 global: &GlobalScope,
1888 underlying_source_dict: JsUnderlyingSource,
1889 underlying_source_handle: SafeHandleObject,
1890 stream: DomRoot<ReadableStream>,
1891 strategy_hwm: f64,
1892 can_gc: CanGc,
1893 ) -> Fallible<()> {
1894 if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
1910 return Err(Error::Type("autoAllocateChunkSize cannot be 0".to_owned()));
1911 }
1912
1913 let controller = ReadableByteStreamController::new(
1914 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
1915 strategy_hwm,
1916 global,
1917 can_gc,
1918 );
1919
1920 controller.set_underlying_source_this_object(underlying_source_handle);
1923
1924 controller.setup(global, stream, can_gc)
1927 }
1928
1929 pub(crate) fn setup_cross_realm_transform_readable(
1931 &self,
1932 cx: SafeJSContext,
1933 port: &MessagePort,
1934 can_gc: CanGc,
1935 ) {
1936 let port_id = port.message_port_id();
1937 let global = self.global();
1938
1939 let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
1944
1945 let controller = ReadableStreamDefaultController::new(
1949 &self.global(),
1950 UnderlyingSourceType::Transfer(Dom::from_ref(port)),
1951 0.,
1952 size_algorithm,
1953 can_gc,
1954 );
1955
1956 rooted!(in(*cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
1959 controller: Dom::from_ref(&controller),
1960 });
1961 global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
1962
1963 port.Start(can_gc);
1965
1966 controller
1968 .setup(DomRoot::from_ref(self), can_gc)
1969 .expect("Setting up controller for transfer cannot fail.");
1970 }
1971}
1972
1973impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
1974 fn Constructor(
1976 cx: SafeJSContext,
1977 global: &GlobalScope,
1978 proto: Option<SafeHandleObject>,
1979 can_gc: CanGc,
1980 underlying_source: Option<*mut JSObject>,
1981 strategy: &QueuingStrategy,
1982 ) -> Fallible<DomRoot<Self>> {
1983 rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
1985 let underlying_source_dict = if !underlying_source_obj.is_null() {
1988 rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
1989 match JsUnderlyingSource::new(cx, obj_val.handle(), can_gc) {
1990 Ok(ConversionResult::Success(val)) => val,
1991 Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
1992 _ => {
1993 return Err(Error::JSFailed);
1994 },
1995 }
1996 } else {
1997 JsUnderlyingSource::empty()
1998 };
1999
2000 let stream = ReadableStream::new_with_proto(global, proto, can_gc);
2002
2003 if underlying_source_dict.type_.is_some() {
2004 if strategy.size.is_some() {
2006 return Err(Error::Range(
2007 "size is not supported for byte streams".to_owned(),
2008 ));
2009 }
2010
2011 let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2013
2014 stream.set_up_byte_controller(
2017 global,
2018 underlying_source_dict,
2019 underlying_source_obj.handle(),
2020 stream.clone(),
2021 strategy_hwm,
2022 can_gc,
2023 )?;
2024 } else {
2025 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2027
2028 let size_algorithm = extract_size_algorithm(strategy, can_gc);
2030
2031 let controller = ReadableStreamDefaultController::new(
2032 global,
2033 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2034 high_water_mark,
2035 size_algorithm,
2036 can_gc,
2037 );
2038
2039 controller.set_underlying_source_this_object(underlying_source_obj.handle());
2042
2043 controller.setup(stream.clone(), can_gc)?;
2045 };
2046
2047 Ok(stream)
2048 }
2049
2050 fn Locked(&self) -> bool {
2052 self.is_locked()
2053 }
2054
2055 fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
2057 let global = self.global();
2058 if self.is_locked() {
2059 let promise = Promise::new(&global, can_gc);
2062 promise.reject_error(Error::Type("stream is locked".to_owned()), can_gc);
2063 promise
2064 } else {
2065 self.cancel(cx, &global, reason, can_gc)
2067 }
2068 }
2069
2070 fn GetReader(
2072 &self,
2073 options: &ReadableStreamGetReaderOptions,
2074 can_gc: CanGc,
2075 ) -> Fallible<ReadableStreamReader> {
2076 if options.mode.is_none() {
2078 return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2079 self.acquire_default_reader(can_gc)?,
2080 ));
2081 }
2082 assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2084
2085 Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2087 self.acquire_byob_reader(can_gc)?,
2088 ))
2089 }
2090
2091 fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2093 self.tee(false, can_gc)
2095 }
2096
2097 fn PipeTo(
2099 &self,
2100 destination: &WritableStream,
2101 options: &StreamPipeOptions,
2102 realm: InRealm,
2103 can_gc: CanGc,
2104 ) -> Rc<Promise> {
2105 let cx = GlobalScope::get_cx();
2106 let global = self.global();
2107
2108 if self.is_locked() {
2110 let promise = Promise::new(&global, can_gc);
2112 promise.reject_error(Error::Type("Source stream is locked".to_owned()), can_gc);
2113 return promise;
2114 }
2115
2116 if destination.is_locked() {
2118 let promise = Promise::new(&global, can_gc);
2120 promise.reject_error(
2121 Error::Type("Destination stream is locked".to_owned()),
2122 can_gc,
2123 );
2124 return promise;
2125 }
2126
2127 let signal = options.signal.as_deref();
2129
2130 self.pipe_to(
2132 cx,
2133 &global,
2134 destination,
2135 options.preventClose,
2136 options.preventAbort,
2137 options.preventCancel,
2138 signal,
2139 realm,
2140 can_gc,
2141 )
2142 }
2143
2144 fn PipeThrough(
2146 &self,
2147 transform: &ReadableWritablePair,
2148 options: &StreamPipeOptions,
2149 realm: InRealm,
2150 can_gc: CanGc,
2151 ) -> Fallible<DomRoot<ReadableStream>> {
2152 let global = self.global();
2153 let cx = GlobalScope::get_cx();
2154
2155 if self.is_locked() {
2157 return Err(Error::Type("Source stream is locked".to_owned()));
2158 }
2159
2160 if transform.writable.is_locked() {
2162 return Err(Error::Type("Destination stream is locked".to_owned()));
2163 }
2164
2165 let signal = options.signal.as_deref();
2167
2168 let promise = self.pipe_to(
2171 cx,
2172 &global,
2173 &transform.writable,
2174 options.preventClose,
2175 options.preventAbort,
2176 options.preventCancel,
2177 signal,
2178 realm,
2179 can_gc,
2180 );
2181
2182 promise.set_promise_is_handled();
2184
2185 Ok(transform.readable.clone())
2187 }
2188}
2189
2190#[expect(unsafe_code)]
2191pub(crate) unsafe fn get_type_and_value_from_message(
2195 cx: SafeJSContext,
2196 data: SafeHandleValue,
2197 value: SafeMutableHandleValue,
2198 can_gc: CanGc,
2199) -> DOMString {
2200 assert!(data.is_object());
2206 rooted!(in(*cx) let data_object = data.to_object());
2207
2208 rooted!(in(*cx) let mut type_ = UndefinedValue());
2210 unsafe {
2211 get_dictionary_property(
2212 *cx,
2213 data_object.handle(),
2214 "type",
2215 type_.handle_mut(),
2216 can_gc,
2217 )
2218 }
2219 .expect("Getting the type should not fail.");
2220
2221 unsafe { get_dictionary_property(*cx, data_object.handle(), "value", value, can_gc) }
2223 .expect("Getting the value should not fail.");
2224
2225 let result =
2227 DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2228 .expect("The type of the message should be a string");
2229 let ConversionResult::Success(type_string) = result else {
2230 unreachable!("The type of the message should be a string");
2231 };
2232
2233 type_string
2234}
2235
// Empty impl of `js::gc::Rootable`: no items are overridden here.
// NOTE(review): presumably this is what allows
// `setup_cross_realm_transform_readable` to root a value of this type with
// `rooted!` — confirm.
impl js::gc::Rootable for CrossRealmTransformReadable {}
2237
/// State for the readable end of a cross-realm transform: wraps the default
/// controller that `handle_message` and `handle_error` operate on.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformReadable {
    /// Controller that receives enqueued chunks, close and error signals.
    controller: Dom<ReadableStreamDefaultController>,
}
2247
2248impl CrossRealmTransformReadable {
2249 #[expect(unsafe_code)]
2252 pub(crate) fn handle_message(
2253 &self,
2254 cx: SafeJSContext,
2255 global: &GlobalScope,
2256 port: &MessagePort,
2257 message: SafeHandleValue,
2258 _realm: InRealm,
2259 can_gc: CanGc,
2260 ) {
2261 rooted!(in(*cx) let mut value = UndefinedValue());
2262 let type_string =
2263 unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
2264
2265 if type_string == "chunk" {
2267 self.controller
2269 .enqueue(cx, value.handle(), can_gc)
2270 .expect("Enqueing a chunk should not fail.");
2271 }
2272
2273 if type_string == "close" {
2275 self.controller.close(can_gc);
2277
2278 global.disentangle_port(port, can_gc);
2280 }
2281
2282 if type_string == "error" {
2284 self.controller.error(value.handle(), can_gc);
2286
2287 global.disentangle_port(port, can_gc);
2289 }
2290 }
2291
2292 pub(crate) fn handle_error(
2295 &self,
2296 cx: SafeJSContext,
2297 global: &GlobalScope,
2298 port: &MessagePort,
2299 _realm: InRealm,
2300 can_gc: CanGc,
2301 ) {
2302 let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
2304 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
2305 error.safe_to_jsval(cx, rooted_error.handle_mut(), can_gc);
2306
2307 port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
2309
2310 self.controller.error(rooted_error.handle(), can_gc);
2312
2313 global.disentangle_port(port, can_gc);
2315 }
2316}
2317
2318#[expect(unsafe_code)]
2319pub(crate) fn get_read_promise_done(
2321 cx: SafeJSContext,
2322 v: &SafeHandleValue,
2323 can_gc: CanGc,
2324) -> Result<bool, Error> {
2325 if !v.is_object() {
2326 return Err(Error::Type("Unknown format for done property.".to_string()));
2327 }
2328 unsafe {
2329 rooted!(in(*cx) let object = v.to_object());
2330 rooted!(in(*cx) let mut done = UndefinedValue());
2331 match get_dictionary_property(*cx, object.handle(), "done", done.handle_mut(), can_gc) {
2332 Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2333 Ok(ConversionResult::Success(val)) => Ok(val),
2334 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2335 _ => Err(Error::Type("Unknown format for done property.".to_string())),
2336 },
2337 Ok(false) => Err(Error::Type("Promise has no done property.".to_string())),
2338 Err(()) => Err(Error::JSFailed),
2339 }
2340 }
2341}
2342
2343#[expect(unsafe_code)]
2344pub(crate) fn get_read_promise_bytes(
2346 cx: SafeJSContext,
2347 v: &SafeHandleValue,
2348 can_gc: CanGc,
2349) -> Result<Vec<u8>, Error> {
2350 if !v.is_object() {
2351 return Err(Error::Type(
2352 "Unknown format for for bytes read.".to_string(),
2353 ));
2354 }
2355 unsafe {
2356 rooted!(in(*cx) let object = v.to_object());
2357 rooted!(in(*cx) let mut bytes = UndefinedValue());
2358 match get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut(), can_gc) {
2359 Ok(true) => {
2360 match Vec::<u8>::safe_from_jsval(
2361 cx,
2362 bytes.handle(),
2363 ConversionBehavior::EnforceRange,
2364 can_gc,
2365 ) {
2366 Ok(ConversionResult::Success(val)) => Ok(val),
2367 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2368 _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
2369 }
2370 },
2371 Ok(false) => Err(Error::Type("Promise has no value property.".to_string())),
2372 Err(()) => Err(Error::JSFailed),
2373 }
2374 }
2375}
2376
2377pub(crate) fn bytes_from_chunk_jsval(
2381 cx: SafeJSContext,
2382 chunk: &RootedTraceableBox<Heap<JSVal>>,
2383 can_gc: CanGc,
2384) -> Result<Vec<u8>, Error> {
2385 match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2386 Ok(ConversionResult::Success(vec)) => Ok(vec),
2387 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2388 _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
2389 }
2390}
2391
2392impl Transferable for ReadableStream {
2394 type Index = MessagePortIndex;
2395 type Data = MessagePortImpl;
2396
2397 fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
2399 if self.is_locked() {
2402 return Err(Error::DataClone(None));
2403 }
2404
2405 let global = self.global();
2406 let realm = enter_realm(&*global);
2407 let comp = InRealm::Entered(&realm);
2408 let cx = GlobalScope::get_cx();
2409 let can_gc = CanGc::note();
2410
2411 let port_1 = MessagePort::new(&global, can_gc);
2413 global.track_message_port(&port_1, None);
2414
2415 let port_2 = MessagePort::new(&global, can_gc);
2417 global.track_message_port(&port_2, None);
2418
2419 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2421
2422 let writable = WritableStream::new_with_proto(&global, None, can_gc);
2424
2425 writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
2427
2428 let promise = self.pipe_to(
2430 cx, &global, &writable, false, false, false, None, comp, can_gc,
2431 );
2432
2433 promise.set_promise_is_handled();
2435
2436 port_2.transfer()
2438 }
2439
2440 fn transfer_receive(
2442 owner: &GlobalScope,
2443 id: MessagePortId,
2444 port_impl: MessagePortImpl,
2445 ) -> Result<DomRoot<Self>, ()> {
2446 let cx = GlobalScope::get_cx();
2447 let can_gc = CanGc::note();
2448
2449 let value = ReadableStream::new_with_proto(owner, None, can_gc);
2452
2453 let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
2460
2461 value.setup_cross_realm_transform_readable(cx, &transferred_port, can_gc);
2463 Ok(value)
2464 }
2465
2466 fn serialized_storage<'a>(
2468 data: StructuredData<'a, '_>,
2469 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2470 match data {
2471 StructuredData::Reader(r) => &mut r.port_impls,
2472 StructuredData::Writer(w) => &mut w.ports,
2473 }
2474 }
2475}