1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use base::id::{MessagePortId, MessagePortIndex};
11use constellation_traits::MessagePortImpl;
12use dom_struct::dom_struct;
13use ipc_channel::ipc::IpcSharedMemory;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::rust::{
17 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18 MutableHandleValue as SafeMutableHandleValue,
19};
20use js::typedarray::ArrayBufferViewU8;
21use rustc_hash::FxHashMap;
22use script_bindings::conversions::SafeToJSValConvertible;
23
24use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
25use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
26 ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
27 ReadableWritablePair, StreamPipeOptions,
28};
29use script_bindings::str::DOMString;
30
31use crate::dom::domexception::{DOMErrorName, DOMException};
32use script_bindings::conversions::{is_array_like, StringificationBehavior};
33use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
34use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
35use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
37use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
38use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
39use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
40use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
41use crate::dom::writablestream::WritableStream;
42use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
43use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
44use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
45use crate::dom::bindings::trace::RootedTraceableBox;
46use crate::dom::bindings::utils::get_dictionary_property;
47use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
48use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader;
49use crate::dom::globalscope::GlobalScope;
50use crate::dom::promise::{wait_for_all_promise, Promise};
51use crate::dom::readablebytestreamcontroller::ReadableByteStreamController;
52use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader;
53use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController;
54use crate::dom::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
55use crate::dom::defaultteeunderlyingsource::TeeCancelAlgorithm;
56use crate::dom::types::DefaultTeeUnderlyingSource;
57use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
58use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
59use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
60use crate::dom::messageport::MessagePort;
61use crate::realms::{enter_realm, InRealm};
62use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
63use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
64use crate::dom::bindings::transferable::Transferable;
65use crate::dom::bindings::structuredclone::StructuredData;
66
67use super::bindings::buffer_source::HeapBufferSource;
68use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
69use super::readablestreambyobreader::ReadIntoRequest;
70
71#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
73enum PipeToState {
74 #[default]
76 Starting,
77 PendingReady,
79 PendingRead,
81 ShuttingDownWithPendingWrites(Option<ShutdownAction>),
84 ShuttingDownPendingAction,
88 Finalized,
91}
92
93#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
95enum ShutdownAction {
96 WritableStreamAbort,
98 ReadableStreamCancel,
100 WritableStreamDefaultWriterCloseWithErrorPropagation,
102 Abort,
104}
105
106impl js::gc::Rootable for PipeTo {}
107
108#[derive(Clone, JSTraceable, MallocSizeOf)]
117#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
118pub(crate) struct PipeTo {
119 reader: Dom<ReadableStreamDefaultReader>,
121
122 writer: Dom<WritableStreamDefaultWriter>,
124
125 #[ignore_malloc_size_of = "nested Rc"]
128 pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,
129
130 #[conditional_malloc_size_of]
132 #[no_trace]
133 state: Rc<RefCell<PipeToState>>,
134
135 prevent_abort: bool,
137
138 prevent_cancel: bool,
140
141 prevent_close: bool,
143
144 #[conditional_malloc_size_of]
147 shutting_down: Rc<Cell<bool>>,
148
149 #[ignore_malloc_size_of = "mozjs"]
152 abort_reason: Rc<Heap<JSVal>>,
153
154 #[ignore_malloc_size_of = "mozjs"]
157 shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,
158
159 #[ignore_malloc_size_of = "nested Rc"]
162 shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,
163
164 #[conditional_malloc_size_of]
167 result_promise: Rc<Promise>,
168}
169
170impl PipeTo {
171 pub(crate) fn abort_with_reason(
174 &self,
175 cx: SafeJSContext,
176 global: &GlobalScope,
177 reason: SafeHandleValue,
178 realm: InRealm,
179 can_gc: CanGc,
180 ) {
181 if self.shutting_down.get() {
183 return;
184 }
185
186 self.abort_reason.set(reason.get());
190
191 self.set_shutdown_error(reason);
196
197 self.shutdown(cx, global, Some(ShutdownAction::Abort), realm, can_gc);
203 }
204}
205
206impl Callback for PipeTo {
207 #[expect(unsafe_code)]
216 fn callback(&self, cx: SafeJSContext, result: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
217 let global = self.reader.global();
218
219 self.pending_writes.borrow_mut().retain(|p| {
225 let pending = p.is_pending();
226 if !pending {
227 p.set_promise_is_handled();
228 }
229 pending
230 });
231
232 let state_before_checks = self.state.borrow().clone();
234
235 if state_before_checks == PipeToState::PendingRead {
242 let source = self.reader.get_stream().expect("Source stream must be set");
243 if source.is_closed() {
244 let dest = self
245 .writer
246 .get_stream()
247 .expect("Destination stream must be set");
248
249 if dest.is_writable() && !dest.close_queued_or_in_flight() {
252 let Ok(done) = get_read_promise_done(cx, &result, can_gc) else {
253 return;
260 };
261
262 if !done {
263 self.write_chunk(cx, &global, result, can_gc);
265 }
266 }
267 }
268 }
269
270 self.check_and_propagate_errors_forward(cx, &global, realm, can_gc);
271 self.check_and_propagate_errors_backward(cx, &global, realm, can_gc);
272 self.check_and_propagate_closing_forward(cx, &global, realm, can_gc);
273 self.check_and_propagate_closing_backward(cx, &global, realm, can_gc);
274
275 let state = self.state.borrow().clone();
277
278 if state != state_before_checks {
282 return;
283 }
284
285 match state {
286 PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
287 PipeToState::PendingReady => {
288 self.read_chunk(&global, realm, can_gc);
290 },
291 PipeToState::PendingRead => {
292 self.write_chunk(cx, &global, result, can_gc);
294
295 if self.shutting_down.get() {
297 return;
298 }
299
300 self.wait_for_writer_ready(&global, realm, can_gc);
302 },
303 PipeToState::ShuttingDownWithPendingWrites(action) => {
304 if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
307 self.wait_on_pending_write(&global, write, realm, can_gc);
308 return;
309 }
310
311 if let Some(action) = action {
313 self.perform_action(cx, &global, action, realm, can_gc);
315 } else {
316 self.finalize(cx, &global, can_gc);
318 }
319 },
320 PipeToState::ShuttingDownPendingAction => {
321 let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
322 unreachable!();
323 };
324 if promise.is_pending() {
325 return;
329 }
330
331 let is_array_like = {
332 if !result.is_object() {
333 false
334 } else {
335 unsafe { is_array_like::<crate::DomTypeHolder>(*cx, result) }
336 }
337 };
338
339 if !result.is_undefined() && !is_array_like {
341 self.set_shutdown_error(result);
351 }
352 self.finalize(cx, &global, can_gc);
353 },
354 PipeToState::Finalized => {},
355 }
356 }
357}
358
359impl PipeTo {
360 fn set_shutdown_error(&self, error: SafeHandleValue) {
363 *self.shutdown_error.borrow_mut() = Some(Heap::default());
364 let Some(ref heap) = *self.shutdown_error.borrow() else {
365 unreachable!("Option set to Some(heap) above.");
366 };
367 heap.set(error.get())
368 }
369
370 fn wait_for_writer_ready(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
373 {
374 let mut state = self.state.borrow_mut();
375 *state = PipeToState::PendingReady;
376 }
377
378 let ready_promise = self.writer.Ready();
379 if ready_promise.is_fulfilled() {
380 self.read_chunk(global, realm, can_gc);
381 } else {
382 let handler = PromiseNativeHandler::new(
383 global,
384 Some(Box::new(self.clone())),
385 Some(Box::new(self.clone())),
386 can_gc,
387 );
388 ready_promise.append_native_handler(&handler, realm, can_gc);
389
390 let closed_promise = self.reader.Closed();
394 closed_promise.append_native_handler(&handler, realm, can_gc);
395 }
396 }
397
398 fn read_chunk(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
400 *self.state.borrow_mut() = PipeToState::PendingRead;
401 let chunk_promise = self.reader.Read(can_gc);
402 let handler = PromiseNativeHandler::new(
403 global,
404 Some(Box::new(self.clone())),
405 Some(Box::new(self.clone())),
406 can_gc,
407 );
408 chunk_promise.append_native_handler(&handler, realm, can_gc);
409
410 let ready_promise = self.writer.Closed();
413 ready_promise.append_native_handler(&handler, realm, can_gc);
414 }
415
416 #[expect(unsafe_code)]
419 fn write_chunk(
420 &self,
421 cx: SafeJSContext,
422 global: &GlobalScope,
423 chunk: SafeHandleValue,
424 can_gc: CanGc,
425 ) -> bool {
426 if chunk.is_object() {
427 rooted!(in(*cx) let object = chunk.to_object());
428 rooted!(in(*cx) let mut bytes = UndefinedValue());
429 let has_value = unsafe {
430 get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut(), can_gc)
431 .expect("Chunk should have a value.")
432 };
433 if has_value {
434 let write_promise = self.writer.write(cx, global, bytes.handle(), can_gc);
436 self.pending_writes.borrow_mut().push_back(write_promise);
437 return true;
438 }
439 }
440 false
441 }
442
443 fn wait_on_pending_write(
447 &self,
448 global: &GlobalScope,
449 promise: Rc<Promise>,
450 realm: InRealm,
451 can_gc: CanGc,
452 ) {
453 let handler = PromiseNativeHandler::new(
454 global,
455 Some(Box::new(self.clone())),
456 Some(Box::new(self.clone())),
457 can_gc,
458 );
459 promise.append_native_handler(&handler, realm, can_gc);
460 }
461
462 fn check_and_propagate_errors_forward(
465 &self,
466 cx: SafeJSContext,
467 global: &GlobalScope,
468 realm: InRealm,
469 can_gc: CanGc,
470 ) {
471 if self.shutting_down.get() {
474 return;
475 }
476
477 let source = self
479 .reader
480 .get_stream()
481 .expect("Reader should still have a stream");
482 if source.is_errored() {
483 rooted!(in(*cx) let mut source_error = UndefinedValue());
484 source.get_stored_error(source_error.handle_mut());
485 self.set_shutdown_error(source_error.handle());
486
487 if !self.prevent_abort {
489 self.shutdown(
492 cx,
493 global,
494 Some(ShutdownAction::WritableStreamAbort),
495 realm,
496 can_gc,
497 )
498 } else {
499 self.shutdown(cx, global, None, realm, can_gc);
501 }
502 }
503 }
504
505 fn check_and_propagate_errors_backward(
508 &self,
509 cx: SafeJSContext,
510 global: &GlobalScope,
511 realm: InRealm,
512 can_gc: CanGc,
513 ) {
514 if self.shutting_down.get() {
517 return;
518 }
519
520 let dest = self
522 .writer
523 .get_stream()
524 .expect("Writer should still have a stream");
525 if dest.is_errored() {
526 rooted!(in(*cx) let mut dest_error = UndefinedValue());
527 dest.get_stored_error(dest_error.handle_mut());
528 self.set_shutdown_error(dest_error.handle());
529
530 if !self.prevent_cancel {
532 self.shutdown(
535 cx,
536 global,
537 Some(ShutdownAction::ReadableStreamCancel),
538 realm,
539 can_gc,
540 )
541 } else {
542 self.shutdown(cx, global, None, realm, can_gc);
544 }
545 }
546 }
547
548 fn check_and_propagate_closing_forward(
551 &self,
552 cx: SafeJSContext,
553 global: &GlobalScope,
554 realm: InRealm,
555 can_gc: CanGc,
556 ) {
557 if self.shutting_down.get() {
560 return;
561 }
562
563 let source = self
565 .reader
566 .get_stream()
567 .expect("Reader should still have a stream");
568 if source.is_closed() {
569 if !self.prevent_close {
571 self.shutdown(
574 cx,
575 global,
576 Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
577 realm,
578 can_gc,
579 )
580 } else {
581 self.shutdown(cx, global, None, realm, can_gc);
583 }
584 }
585 }
586
587 fn check_and_propagate_closing_backward(
590 &self,
591 cx: SafeJSContext,
592 global: &GlobalScope,
593 realm: InRealm,
594 can_gc: CanGc,
595 ) {
596 if self.shutting_down.get() {
599 return;
600 }
601
602 let dest = self
605 .writer
606 .get_stream()
607 .expect("Writer should still have a stream");
608 if dest.close_queued_or_in_flight() || dest.is_closed() {
609 rooted!(in(*cx) let mut dest_closed = UndefinedValue());
614 let error =
615 Error::Type("Destination is closed or has closed queued or in flight".to_string());
616 error.to_jsval(cx, global, dest_closed.handle_mut(), can_gc);
617 self.set_shutdown_error(dest_closed.handle());
618
619 if !self.prevent_cancel {
621 self.shutdown(
624 cx,
625 global,
626 Some(ShutdownAction::ReadableStreamCancel),
627 realm,
628 can_gc,
629 )
630 } else {
631 self.shutdown(cx, global, None, realm, can_gc);
633 }
634 }
635 }
636
637 fn shutdown(
641 &self,
642 cx: SafeJSContext,
643 global: &GlobalScope,
644 action: Option<ShutdownAction>,
645 realm: InRealm,
646 can_gc: CanGc,
647 ) {
648 if !self.shutting_down.replace(true) {
651 let dest = self.writer.get_stream().expect("Stream must be set");
652 if dest.is_writable() && !dest.close_queued_or_in_flight() {
655 if let Some(write) = self.pending_writes.borrow_mut().front() {
661 *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
662 self.wait_on_pending_write(global, write.clone(), realm, can_gc);
663 return;
664 }
665 }
666
667 if let Some(action) = action {
669 self.perform_action(cx, global, action, realm, can_gc);
671 } else {
672 self.finalize(cx, global, can_gc);
674 }
675 }
676 }
677
678 fn perform_action(
681 &self,
682 cx: SafeJSContext,
683 global: &GlobalScope,
684 action: ShutdownAction,
685 realm: InRealm,
686 can_gc: CanGc,
687 ) {
688 rooted!(in(*cx) let mut error = UndefinedValue());
689 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
690 error.set(shutdown_error.get());
691 }
692
693 *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
694
695 let promise = match action {
697 ShutdownAction::WritableStreamAbort => {
698 let dest = self.writer.get_stream().expect("Stream must be set");
699 dest.abort(cx, global, error.handle(), realm, can_gc)
700 },
701 ShutdownAction::ReadableStreamCancel => {
702 let source = self
703 .reader
704 .get_stream()
705 .expect("Reader should have a stream.");
706 source.cancel(cx, global, error.handle(), can_gc)
707 },
708 ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
709 self.writer.close_with_error_propagation(cx, global, can_gc)
710 },
711 ShutdownAction::Abort => {
712 rooted!(in(*cx) let mut error = UndefinedValue());
717 error.set(self.abort_reason.get());
718
719 let mut actions = vec![];
721
722 if !self.prevent_abort {
724 let dest = self
725 .writer
726 .get_stream()
727 .expect("Destination stream must be set");
728
729 let promise = if dest.is_writable() {
731 dest.abort(cx, global, error.handle(), realm, can_gc)
733 } else {
734 Promise::new_resolved(global, cx, (), can_gc)
736 };
737 actions.push(promise);
738 }
739
740 if !self.prevent_cancel {
742 let source = self.reader.get_stream().expect("Source stream must be set");
743
744 let promise = if source.is_readable() {
746 source.cancel(cx, global, error.handle(), can_gc)
748 } else {
749 Promise::new_resolved(global, cx, (), can_gc)
751 };
752 actions.push(promise);
753 }
754
755 wait_for_all_promise(cx, global, actions, realm, can_gc)
759 },
760 };
761
762 let handler = PromiseNativeHandler::new(
765 global,
766 Some(Box::new(self.clone())),
767 Some(Box::new(self.clone())),
768 can_gc,
769 );
770 promise.append_native_handler(&handler, realm, can_gc);
771 *self.shutdown_action_promise.borrow_mut() = Some(promise);
772 }
773
774 fn finalize(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
776 *self.state.borrow_mut() = PipeToState::Finalized;
777
778 self.writer.release(cx, global, can_gc);
780
781 self.reader
787 .release(can_gc)
788 .expect("Releasing the reader should not fail");
789
790 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
796 rooted!(in(*cx) let mut error = UndefinedValue());
797 error.set(shutdown_error.get());
798 self.result_promise.reject_native(&error.handle(), can_gc);
800 } else {
801 self.result_promise.resolve_native(&(), can_gc);
803 }
804 }
805}
806
807#[derive(Clone, JSTraceable, MallocSizeOf)]
810struct SourceCancelPromiseFulfillmentHandler {
811 #[conditional_malloc_size_of]
812 result: Rc<Promise>,
813}
814
815impl Callback for SourceCancelPromiseFulfillmentHandler {
816 fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
820 self.result.resolve_native(&(), can_gc);
821 }
822}
823
824#[derive(Clone, JSTraceable, MallocSizeOf)]
827struct SourceCancelPromiseRejectionHandler {
828 #[conditional_malloc_size_of]
829 result: Rc<Promise>,
830}
831
832impl Callback for SourceCancelPromiseRejectionHandler {
833 fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
837 self.result.reject_native(&v, can_gc);
838 }
839}
840
841#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
843pub(crate) enum ReadableStreamState {
844 #[default]
845 Readable,
846 Closed,
847 Errored,
848}
849
850#[derive(JSTraceable, MallocSizeOf)]
852#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
853pub(crate) enum ControllerType {
854 Byte(MutNullableDom<ReadableByteStreamController>),
856 Default(MutNullableDom<ReadableStreamDefaultController>),
858}
859
860#[derive(JSTraceable, MallocSizeOf)]
862#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
863pub(crate) enum ReaderType {
864 #[allow(clippy::upper_case_acronyms)]
866 BYOB(MutNullableDom<ReadableStreamBYOBReader>),
867 Default(MutNullableDom<ReadableStreamDefaultReader>),
869}
870
871impl Eq for ReaderType {}
872impl PartialEq for ReaderType {
873 fn eq(&self, other: &Self) -> bool {
874 matches!(
875 (self, other),
876 (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
877 (ReaderType::Default(_), ReaderType::Default(_))
878 )
879 }
880}
881
882#[cfg_attr(crown, allow(crown::unrooted_must_root))]
884pub(crate) fn create_readable_stream(
885 global: &GlobalScope,
886 underlying_source_type: UnderlyingSourceType,
887 queuing_strategy: Option<Rc<QueuingStrategySize>>,
888 high_water_mark: Option<f64>,
889 can_gc: CanGc,
890) -> DomRoot<ReadableStream> {
891 let high_water_mark = high_water_mark.unwrap_or(1.0);
893
894 let size_algorithm =
896 queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
897
898 assert!(high_water_mark >= 0.0);
900
901 let stream = ReadableStream::new_with_proto(global, None, can_gc);
904
905 let controller = ReadableStreamDefaultController::new(
907 global,
908 underlying_source_type,
909 high_water_mark,
910 size_algorithm,
911 can_gc,
912 );
913
914 controller
917 .setup(stream.clone(), can_gc)
918 .expect("Setup of default controller cannot fail");
919
920 stream
922}
923
924#[dom_struct]
926pub(crate) struct ReadableStream {
927 reflector_: Reflector,
928
929 controller: RefCell<Option<ControllerType>>,
933
934 #[ignore_malloc_size_of = "mozjs"]
936 stored_error: Heap<JSVal>,
937
938 disturbed: Cell<bool>,
940
941 reader: RefCell<Option<ReaderType>>,
943
944 state: Cell<ReadableStreamState>,
946}
947
948impl ReadableStream {
949 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
950 fn new_inherited() -> ReadableStream {
952 ReadableStream {
953 reflector_: Reflector::new(),
954 controller: RefCell::new(None),
955 stored_error: Heap::default(),
956 disturbed: Default::default(),
957 reader: RefCell::new(None),
958 state: Cell::new(Default::default()),
959 }
960 }
961
962 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
963 pub(crate) fn new_with_proto(
964 global: &GlobalScope,
965 proto: Option<SafeHandleObject>,
966 can_gc: CanGc,
967 ) -> DomRoot<ReadableStream> {
968 reflect_dom_object_with_proto(
969 Box::new(ReadableStream::new_inherited()),
970 global,
971 proto,
972 can_gc,
973 )
974 }
975
976 pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
979 *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
980 controller,
981 ))));
982 }
983
984 pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
987 *self.controller.borrow_mut() =
988 Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
989 }
990
991 pub(crate) fn assert_no_controller(&self) {
994 let has_no_controller = self.controller.borrow().is_none();
995 assert!(has_no_controller);
996 }
997
998 pub(crate) fn new_from_bytes(
1000 global: &GlobalScope,
1001 bytes: Vec<u8>,
1002 can_gc: CanGc,
1003 ) -> Fallible<DomRoot<ReadableStream>> {
1004 let stream = ReadableStream::new_with_external_underlying_source(
1005 global,
1006 UnderlyingSourceType::Memory(bytes.len()),
1007 can_gc,
1008 )?;
1009 stream.enqueue_native(bytes, can_gc);
1010 stream.controller_close_native(can_gc);
1011 Ok(stream)
1012 }
1013
1014 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1017 pub(crate) fn new_with_external_underlying_source(
1018 global: &GlobalScope,
1019 source: UnderlyingSourceType,
1020 can_gc: CanGc,
1021 ) -> Fallible<DomRoot<ReadableStream>> {
1022 assert!(source.is_native());
1023 let stream = ReadableStream::new_with_proto(global, None, can_gc);
1024 let controller = ReadableStreamDefaultController::new(
1025 global,
1026 source,
1027 1.0,
1028 extract_size_algorithm(&QueuingStrategy::empty(), can_gc),
1029 can_gc,
1030 );
1031 controller.setup(stream.clone(), can_gc)?;
1032 Ok(stream)
1033 }
1034
1035 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1037 match self.controller.borrow().as_ref() {
1038 Some(ControllerType::Default(controller)) => {
1039 let controller = controller
1040 .get()
1041 .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1042 controller.perform_release_steps()
1043 },
1044 Some(ControllerType::Byte(controller)) => {
1045 let controller = controller
1046 .get()
1047 .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1048 controller.perform_release_steps()
1049 },
1050 None => Err(Error::Type("Stream should have controller.".to_string())),
1051 }
1052 }
1053
1054 pub(crate) fn perform_pull_steps(
1058 &self,
1059 cx: SafeJSContext,
1060 read_request: &ReadRequest,
1061 can_gc: CanGc,
1062 ) {
1063 match self.controller.borrow().as_ref() {
1064 Some(ControllerType::Default(controller)) => controller
1065 .get()
1066 .expect("Stream should have controller.")
1067 .perform_pull_steps(read_request, can_gc),
1068 Some(ControllerType::Byte(controller)) => controller
1069 .get()
1070 .expect("Stream should have controller.")
1071 .perform_pull_steps(cx, read_request, can_gc),
1072 None => {
1073 unreachable!("Stream does not have a controller.");
1074 },
1075 }
1076 }
1077
1078 pub(crate) fn perform_pull_into(
1082 &self,
1083 cx: SafeJSContext,
1084 read_into_request: &ReadIntoRequest,
1085 view: HeapBufferSource<ArrayBufferViewU8>,
1086 options: &ReadableStreamBYOBReaderReadOptions,
1087 can_gc: CanGc,
1088 ) {
1089 match self.controller.borrow().as_ref() {
1090 Some(ControllerType::Byte(controller)) => controller
1091 .get()
1092 .expect("Stream should have controller.")
1093 .perform_pull_into(cx, read_into_request, view, options, can_gc),
1094 _ => {
1095 unreachable!(
1096 "Pulling a chunk from a stream with a default controller using a BYOB reader"
1097 )
1098 },
1099 }
1100 }
1101
1102 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1104 match self.reader.borrow().as_ref() {
1105 Some(ReaderType::Default(reader)) => {
1106 let Some(reader) = reader.get() else {
1107 panic!("Attempt to add a read request without having first acquired a reader.");
1108 };
1109
1110 assert!(self.is_readable());
1112
1113 reader.add_read_request(read_request);
1115 },
1116 _ => {
1117 unreachable!("Adding a read request can only be done on a default reader.")
1118 },
1119 }
1120 }
1121
1122 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1124 match self.reader.borrow().as_ref() {
1125 Some(ReaderType::BYOB(reader)) => {
1127 let Some(reader) = reader.get() else {
1128 unreachable!(
1129 "Attempt to add a read into request without having first acquired a reader."
1130 );
1131 };
1132
1133 assert!(self.is_readable() || self.is_closed());
1135
1136 reader.add_read_into_request(read_request);
1138 },
1139 _ => {
1140 unreachable!("Adding a read into request can only be done on a BYOB reader.")
1141 },
1142 }
1143 }
1144
1145 pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1148 match self.controller.borrow().as_ref() {
1149 Some(ControllerType::Default(controller)) => controller
1150 .get()
1151 .expect("Stream should have controller.")
1152 .enqueue_native(bytes, can_gc),
1153 _ => {
1154 unreachable!(
1155 "Enqueueing chunk to a stream from Rust on other than default controller"
1156 );
1157 },
1158 }
1159 }
1160
1161 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1163 assert!(self.is_readable());
1165
1166 self.state.set(ReadableStreamState::Errored);
1168
1169 self.stored_error.set(e.get());
1171
1172 let default_reader = {
1175 let reader_ref = self.reader.borrow();
1176 match reader_ref.as_ref() {
1177 Some(ReaderType::Default(reader)) => reader.get(),
1178 _ => None,
1179 }
1180 };
1181
1182 if let Some(reader) = default_reader {
1183 reader.error(e, can_gc);
1185 return;
1186 }
1187
1188 let byob_reader = {
1189 let reader_ref = self.reader.borrow();
1190 match reader_ref.as_ref() {
1191 Some(ReaderType::BYOB(reader)) => reader.get(),
1192 _ => None,
1193 }
1194 };
1195
1196 if let Some(reader) = byob_reader {
1197 reader.error_read_into_requests(e, can_gc);
1199 }
1200
1201 }
1203
1204 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1206 handle_mut.set(self.stored_error.get());
1207 }
1208
1209 pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1212 let cx = GlobalScope::get_cx();
1213 rooted!(in(*cx) let mut error_val = UndefinedValue());
1214 error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1215 self.error(error_val.handle(), can_gc);
1216 }
1217
1218 pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1221 match self.controller.borrow().as_ref() {
1222 Some(ControllerType::Default(controller)) => {
1223 let _ = controller
1224 .get()
1225 .expect("Stream should have controller.")
1226 .Close(can_gc);
1227 },
1228 _ => {
1229 unreachable!("Native closing is only done on default controllers.")
1230 },
1231 }
1232 }
1233
1234 pub(crate) fn in_memory(&self) -> bool {
1237 match self.controller.borrow().as_ref() {
1238 Some(ControllerType::Default(controller)) => controller
1239 .get()
1240 .expect("Stream should have controller.")
1241 .in_memory(),
1242 _ => {
1243 unreachable!(
1244 "Checking if source is in memory for a stream with a non-default controller"
1245 )
1246 },
1247 }
1248 }
1249
1250 pub(crate) fn get_in_memory_bytes(&self) -> Option<IpcSharedMemory> {
1253 match self.controller.borrow().as_ref() {
1254 Some(ControllerType::Default(controller)) => controller
1255 .get()
1256 .expect("Stream should have controller.")
1257 .get_in_memory_bytes()
1258 .as_deref()
1259 .map(IpcSharedMemory::from_bytes),
1260 _ => {
1261 unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1262 },
1263 }
1264 }
1265
1266 pub(crate) fn acquire_default_reader(
1271 &self,
1272 can_gc: CanGc,
1273 ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1274 let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1276
1277 reader.set_up(self, &self.global(), can_gc)?;
1279
1280 Ok(reader)
1282 }
1283
1284 pub(crate) fn acquire_byob_reader(
1286 &self,
1287 can_gc: CanGc,
1288 ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1289 let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1291 reader.set_up(self, &self.global(), can_gc)?;
1293
1294 Ok(reader)
1296 }
1297
1298 pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1299 match self.controller.borrow().as_ref() {
1300 Some(ControllerType::Default(controller)) => {
1301 controller.get().expect("Stream should have controller.")
1302 },
1303 _ => {
1304 unreachable!(
1305 "Getting default controller for a stream with a non-default controller"
1306 )
1307 },
1308 }
1309 }
1310
1311 pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1312 match self.reader.borrow().as_ref() {
1313 Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1314 _ => {
1315 unreachable!("Getting default reader for a stream with a non-default reader")
1316 },
1317 }
1318 }
1319
1320 pub(crate) fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
1326 match self.reader.borrow().as_ref() {
1327 Some(ReaderType::Default(reader)) => {
1328 let Some(reader) = reader.get() else {
1329 unreachable!(
1330 "Attempt to read stream chunk without having first acquired a reader."
1331 );
1332 };
1333 reader.Read(can_gc)
1334 },
1335 _ => {
1336 unreachable!("Native reading of a chunk can only be done with a default reader.")
1337 },
1338 }
1339 }
1340
1341 pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1346 let reader_ref = self.reader.borrow();
1347
1348 match reader_ref.as_ref() {
1349 Some(ReaderType::Default(reader)) => {
1350 let Some(reader) = reader.get() else {
1351 unreachable!("Attempt to stop reading without having first acquired a reader.");
1352 };
1353
1354 drop(reader_ref);
1355 reader.release(can_gc).expect("Reader release cannot fail.");
1356 },
1357 _ => {
1358 unreachable!("Native stop reading can only be done with a default reader.")
1359 },
1360 }
1361 }
1362
1363 pub(crate) fn is_locked(&self) -> bool {
1365 match self.reader.borrow().as_ref() {
1366 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1367 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1368 None => false,
1369 }
1370 }
1371
1372 pub(crate) fn is_disturbed(&self) -> bool {
1373 self.disturbed.get()
1374 }
1375
1376 pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1377 self.disturbed.set(disturbed);
1378 }
1379
1380 pub(crate) fn is_closed(&self) -> bool {
1381 self.state.get() == ReadableStreamState::Closed
1382 }
1383
1384 pub(crate) fn is_errored(&self) -> bool {
1385 self.state.get() == ReadableStreamState::Errored
1386 }
1387
1388 pub(crate) fn is_readable(&self) -> bool {
1389 self.state.get() == ReadableStreamState::Readable
1390 }
1391
1392 pub(crate) fn has_default_reader(&self) -> bool {
1393 match self.reader.borrow().as_ref() {
1394 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1395 _ => false,
1396 }
1397 }
1398
1399 pub(crate) fn has_byob_reader(&self) -> bool {
1400 match self.reader.borrow().as_ref() {
1401 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1402 _ => false,
1403 }
1404 }
1405
1406 pub(crate) fn has_byte_controller(&self) -> bool {
1407 match self.controller.borrow().as_ref() {
1408 Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1409 _ => false,
1410 }
1411 }
1412
1413 pub(crate) fn get_num_read_requests(&self) -> usize {
1415 match self.reader.borrow().as_ref() {
1416 Some(ReaderType::Default(reader)) => {
1417 let reader = reader
1418 .get()
1419 .expect("Stream must have a reader when getting the number of read requests.");
1420 reader.get_num_read_requests()
1421 },
1422 _ => unreachable!(
1423 "Stream must have a default reader when get num read requests is called into."
1424 ),
1425 }
1426 }
1427
1428 pub(crate) fn get_num_read_into_requests(&self) -> usize {
1430 assert!(self.has_byob_reader());
1431
1432 match self.reader.borrow().as_ref() {
1433 Some(ReaderType::BYOB(reader)) => {
1434 let Some(reader) = reader.get() else {
1435 unreachable!(
1436 "Stream must have a reader when get num read into requests is called into."
1437 );
1438 };
1439 reader.get_num_read_into_requests()
1440 },
1441 _ => {
1442 unreachable!(
1443 "Stream must have a BYOB reader when get num read into requests is called into."
1444 );
1445 },
1446 }
1447 }
1448
1449 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1451 pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1452 assert!(self.has_default_reader());
1454
1455 match self.reader.borrow().as_ref() {
1456 Some(ReaderType::Default(reader)) => {
1457 let reader = reader
1459 .get()
1460 .expect("Stream must have a reader when a read request is fulfilled.");
1461 assert_ne!(reader.get_num_read_requests(), 0);
1463 let request = reader.remove_read_request();
1466
1467 if done {
1468 request.close_steps(can_gc);
1470 } else {
1471 let result = RootedTraceableBox::new(Heap::default());
1473 result.set(*chunk);
1474 request.chunk_steps(result, can_gc);
1475 }
1476 },
1477 _ => {
1478 unreachable!(
1479 "Stream must have a default reader when fulfill read requests is called into."
1480 );
1481 },
1482 }
1483 }
1484
1485 pub(crate) fn fulfill_read_into_request(
1487 &self,
1488 chunk: SafeHandleValue,
1489 done: bool,
1490 can_gc: CanGc,
1491 ) {
1492 assert!(self.has_byob_reader());
1494
1495 match self.reader.borrow().as_ref() {
1497 Some(ReaderType::BYOB(reader)) => {
1498 let Some(reader) = reader.get() else {
1499 unreachable!(
1500 "Stream must have a reader when a read into request is fulfilled."
1501 );
1502 };
1503
1504 assert!(reader.get_num_read_into_requests() > 0);
1506
1507 let read_into_request = reader.remove_read_into_request();
1510
1511 let result = RootedTraceableBox::new(Heap::default());
1513 if done {
1514 result.set(*chunk);
1515 read_into_request.close_steps(Some(result), can_gc);
1516 } else {
1517 result.set(*chunk);
1519 read_into_request.chunk_steps(result, can_gc);
1520 }
1521 },
1522 _ => {
1523 unreachable!(
1524 "Stream must have a BYOB reader when fulfill read into requests is called into."
1525 );
1526 },
1527 };
1528 }
1529
1530 pub(crate) fn close(&self, can_gc: CanGc) {
1532 assert!(self.is_readable());
1534 self.state.set(ReadableStreamState::Closed);
1536 let default_reader = {
1542 let reader_ref = self.reader.borrow();
1543 match reader_ref.as_ref() {
1544 Some(ReaderType::Default(reader)) => reader.get(),
1545 _ => None,
1546 }
1547 };
1548
1549 if let Some(reader) = default_reader {
1550 reader.close(can_gc);
1552 return;
1553 }
1554
1555 let byob_reader = {
1557 let reader_ref = self.reader.borrow();
1558 match reader_ref.as_ref() {
1559 Some(ReaderType::BYOB(reader)) => reader.get(),
1560 _ => None,
1561 }
1562 };
1563
1564 if let Some(reader) = byob_reader {
1565 reader.close(can_gc);
1567 }
1568
1569 }
1571
1572 pub(crate) fn cancel(
1574 &self,
1575 cx: SafeJSContext,
1576 global: &GlobalScope,
1577 reason: SafeHandleValue,
1578 can_gc: CanGc,
1579 ) -> Rc<Promise> {
1580 self.disturbed.set(true);
1582
1583 if self.is_closed() {
1585 return Promise::new_resolved(global, cx, (), can_gc);
1586 }
1587 if self.is_errored() {
1589 let promise = Promise::new(global, can_gc);
1590 rooted!(in(*cx) let mut rval = UndefinedValue());
1591 self.stored_error
1592 .safe_to_jsval(cx, rval.handle_mut(), can_gc);
1593 promise.reject_native(&rval.handle(), can_gc);
1594 return promise;
1595 }
1596 self.close(can_gc);
1598
1599 let byob_reader = {
1601 let reader_ref = self.reader.borrow();
1602 match reader_ref.as_ref() {
1603 Some(ReaderType::BYOB(reader)) => reader.get(),
1604 _ => None,
1605 }
1606 };
1607
1608 if let Some(reader) = byob_reader {
1609 reader.cancel(can_gc);
1611 }
1612
1613 let source_cancel_promise = match self.controller.borrow().as_ref() {
1616 Some(ControllerType::Default(controller)) => controller
1617 .get()
1618 .expect("Stream should have controller.")
1619 .perform_cancel_steps(cx, global, reason, can_gc),
1620 Some(ControllerType::Byte(controller)) => controller
1621 .get()
1622 .expect("Stream should have controller.")
1623 .perform_cancel_steps(cx, global, reason, can_gc),
1624 None => {
1625 panic!("Stream does not have a controller.");
1626 },
1627 };
1628
1629 let global = self.global();
1632 let result_promise = Promise::new(&global, can_gc);
1633 let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1634 result: result_promise.clone(),
1635 });
1636 let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1637 result: result_promise.clone(),
1638 });
1639 let handler = PromiseNativeHandler::new(
1640 &global,
1641 Some(fulfillment_handler),
1642 Some(rejection_handler),
1643 can_gc,
1644 );
1645 let realm = enter_realm(&*global);
1646 let comp = InRealm::Entered(&realm);
1647 source_cancel_promise.append_native_handler(&handler, comp, can_gc);
1648
1649 result_promise
1652 }
1653
1654 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1655 pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1656 *self.reader.borrow_mut() = new_reader;
1657 }
1658
1659 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1661 fn default_tee(
1662 &self,
1663 clone_for_branch_2: bool,
1664 can_gc: CanGc,
1665 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1666 let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1670
1671 let reader = self.acquire_default_reader(can_gc)?;
1673 self.set_reader(Some(ReaderType::Default(MutNullableDom::new(Some(
1674 &reader,
1675 )))));
1676
1677 let reading = Rc::new(Cell::new(false));
1679 let read_again = Rc::new(Cell::new(false));
1681 let canceled_1 = Rc::new(Cell::new(false));
1683 let canceled_2 = Rc::new(Cell::new(false));
1685
1686 let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1688 let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1690 let cancel_promise = Promise::new(&self.global(), can_gc);
1692
1693 let tee_source_1 = DefaultTeeUnderlyingSource::new(
1694 &reader,
1695 self,
1696 reading.clone(),
1697 read_again.clone(),
1698 canceled_1.clone(),
1699 canceled_2.clone(),
1700 clone_for_branch_2.clone(),
1701 reason_1.clone(),
1702 reason_2.clone(),
1703 cancel_promise.clone(),
1704 TeeCancelAlgorithm::Cancel1Algorithm,
1705 can_gc,
1706 );
1707
1708 let underlying_source_type_branch_1 =
1709 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
1710
1711 let tee_source_2 = DefaultTeeUnderlyingSource::new(
1712 &reader,
1713 self,
1714 reading,
1715 read_again,
1716 canceled_1.clone(),
1717 canceled_2.clone(),
1718 clone_for_branch_2,
1719 reason_1,
1720 reason_2,
1721 cancel_promise.clone(),
1722 TeeCancelAlgorithm::Cancel2Algorithm,
1723 can_gc,
1724 );
1725
1726 let underlying_source_type_branch_2 =
1727 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
1728
1729 let branch_1 = create_readable_stream(
1731 &self.global(),
1732 underlying_source_type_branch_1,
1733 None,
1734 None,
1735 can_gc,
1736 );
1737 tee_source_1.set_branch_1(&branch_1);
1738 tee_source_2.set_branch_1(&branch_1);
1739
1740 let branch_2 = create_readable_stream(
1742 &self.global(),
1743 underlying_source_type_branch_2,
1744 None,
1745 None,
1746 can_gc,
1747 );
1748 tee_source_1.set_branch_2(&branch_2);
1749 tee_source_2.set_branch_2(&branch_2);
1750
1751 reader.append_native_handler_to_closed_promise(
1753 &branch_1,
1754 &branch_2,
1755 canceled_1,
1756 canceled_2,
1757 cancel_promise,
1758 can_gc,
1759 );
1760
1761 Ok(vec![branch_1, branch_2])
1763 }
1764
1765 #[allow(clippy::too_many_arguments)]
1767 pub(crate) fn pipe_to(
1768 &self,
1769 cx: SafeJSContext,
1770 global: &GlobalScope,
1771 dest: &WritableStream,
1772 prevent_close: bool,
1773 prevent_abort: bool,
1774 prevent_cancel: bool,
1775 signal: Option<&AbortSignal>,
1776 realm: InRealm,
1777 can_gc: CanGc,
1778 ) -> Rc<Promise> {
1779 assert!(!self.is_locked());
1790
1791 assert!(!dest.is_locked());
1793
1794 let reader = self
1802 .acquire_default_reader(can_gc)
1803 .expect("Acquiring a default reader for pipe_to cannot fail");
1804
1805 let writer = dest
1807 .aquire_default_writer(cx, global, can_gc)
1808 .expect("Acquiring a default writer for pipe_to cannot fail");
1809
1810 self.disturbed.set(true);
1812
1813 let promise = Promise::new(global, can_gc);
1818
1819 rooted!(in(*cx) let pipe_to = PipeTo {
1821 reader: Dom::from_ref(&reader),
1822 writer: Dom::from_ref(&writer),
1823 pending_writes: Default::default(),
1824 state: Default::default(),
1825 prevent_abort,
1826 prevent_cancel,
1827 prevent_close,
1828 shutting_down: Default::default(),
1829 abort_reason: Default::default(),
1830 shutdown_error: Default::default(),
1831 shutdown_action_promise: Default::default(),
1832 result_promise: promise.clone(),
1833 });
1834
1835 if let Some(signal) = signal {
1838 rooted!(in(*cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1841
1842 if signal.aborted() {
1844 signal.run_abort_algorithm(cx, global, &abort_algorithm, realm, can_gc);
1845 return promise;
1846 }
1847
1848 signal.add(&abort_algorithm);
1850 }
1851
1852 pipe_to.check_and_propagate_errors_forward(cx, global, realm, can_gc);
1854 pipe_to.check_and_propagate_errors_backward(cx, global, realm, can_gc);
1855 pipe_to.check_and_propagate_closing_forward(cx, global, realm, can_gc);
1856 pipe_to.check_and_propagate_closing_backward(cx, global, realm, can_gc);
1857
1858 if *pipe_to.state.borrow() == PipeToState::Starting {
1860 pipe_to.wait_for_writer_ready(global, realm, can_gc);
1862 }
1863
1864 promise
1866 }
1867
1868 fn tee(
1870 &self,
1871 clone_for_branch_2: bool,
1872 can_gc: CanGc,
1873 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1874 match self.controller.borrow().as_ref() {
1878 Some(ControllerType::Default(_)) => {
1879 self.default_tee(clone_for_branch_2, can_gc)
1881 },
1882 Some(ControllerType::Byte(_)) => {
1883 Err(Error::Type(
1886 "Teeing is not yet supported for byte streams".to_owned(),
1887 ))
1888 },
1889 None => {
1890 unreachable!("Stream should have a controller.");
1891 },
1892 }
1893 }
1894
1895 pub(crate) fn set_up_byte_controller(
1897 &self,
1898 global: &GlobalScope,
1899 underlying_source_dict: JsUnderlyingSource,
1900 underlying_source_handle: SafeHandleObject,
1901 stream: DomRoot<ReadableStream>,
1902 strategy_hwm: f64,
1903 can_gc: CanGc,
1904 ) -> Fallible<()> {
1905 if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
1921 return Err(Error::Type("autoAllocateChunkSize cannot be 0".to_owned()));
1922 }
1923
1924 let controller = ReadableByteStreamController::new(
1925 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
1926 strategy_hwm,
1927 global,
1928 can_gc,
1929 );
1930
1931 controller.set_underlying_source_this_object(underlying_source_handle);
1934
1935 controller.setup(global, stream, can_gc)
1938 }
1939
1940 pub(crate) fn setup_cross_realm_transform_readable(
1942 &self,
1943 cx: SafeJSContext,
1944 port: &MessagePort,
1945 can_gc: CanGc,
1946 ) {
1947 let port_id = port.message_port_id();
1948 let global = self.global();
1949
1950 let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
1955
1956 let controller = ReadableStreamDefaultController::new(
1960 &self.global(),
1961 UnderlyingSourceType::Transfer(Dom::from_ref(port)),
1962 0.,
1963 size_algorithm,
1964 can_gc,
1965 );
1966
1967 rooted!(in(*cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
1970 controller: Dom::from_ref(&controller),
1971 });
1972 global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
1973
1974 port.Start(can_gc);
1976
1977 controller
1979 .setup(DomRoot::from_ref(self), can_gc)
1980 .expect("Setting up controller for transfer cannot fail.");
1981 }
1982}
1983
1984impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
1985 fn Constructor(
1987 cx: SafeJSContext,
1988 global: &GlobalScope,
1989 proto: Option<SafeHandleObject>,
1990 can_gc: CanGc,
1991 underlying_source: Option<*mut JSObject>,
1992 strategy: &QueuingStrategy,
1993 ) -> Fallible<DomRoot<Self>> {
1994 rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
1996 let underlying_source_dict = if !underlying_source_obj.is_null() {
1999 rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2000 match JsUnderlyingSource::new(cx, obj_val.handle(), can_gc) {
2001 Ok(ConversionResult::Success(val)) => val,
2002 Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
2003 _ => {
2004 return Err(Error::JSFailed);
2005 },
2006 }
2007 } else {
2008 JsUnderlyingSource::empty()
2009 };
2010
2011 let stream = ReadableStream::new_with_proto(global, proto, can_gc);
2013
2014 if underlying_source_dict.type_.is_some() {
2015 if strategy.size.is_some() {
2017 return Err(Error::Range(
2018 "size is not supported for byte streams".to_owned(),
2019 ));
2020 }
2021
2022 let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2024
2025 stream.set_up_byte_controller(
2028 global,
2029 underlying_source_dict,
2030 underlying_source_obj.handle(),
2031 stream.clone(),
2032 strategy_hwm,
2033 can_gc,
2034 )?;
2035 } else {
2036 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2038
2039 let size_algorithm = extract_size_algorithm(strategy, can_gc);
2041
2042 let controller = ReadableStreamDefaultController::new(
2043 global,
2044 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2045 high_water_mark,
2046 size_algorithm,
2047 can_gc,
2048 );
2049
2050 controller.set_underlying_source_this_object(underlying_source_obj.handle());
2053
2054 controller.setup(stream.clone(), can_gc)?;
2056 };
2057
2058 Ok(stream)
2059 }
2060
2061 fn Locked(&self) -> bool {
2063 self.is_locked()
2064 }
2065
2066 fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
2068 let global = self.global();
2069 if self.is_locked() {
2070 let promise = Promise::new(&global, can_gc);
2073 promise.reject_error(Error::Type("stream is locked".to_owned()), can_gc);
2074 promise
2075 } else {
2076 self.cancel(cx, &global, reason, can_gc)
2078 }
2079 }
2080
2081 fn GetReader(
2083 &self,
2084 options: &ReadableStreamGetReaderOptions,
2085 can_gc: CanGc,
2086 ) -> Fallible<ReadableStreamReader> {
2087 if options.mode.is_none() {
2089 return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2090 self.acquire_default_reader(can_gc)?,
2091 ));
2092 }
2093 assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2095
2096 Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2098 self.acquire_byob_reader(can_gc)?,
2099 ))
2100 }
2101
2102 fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2104 self.tee(false, can_gc)
2106 }
2107
2108 fn PipeTo(
2110 &self,
2111 destination: &WritableStream,
2112 options: &StreamPipeOptions,
2113 realm: InRealm,
2114 can_gc: CanGc,
2115 ) -> Rc<Promise> {
2116 let cx = GlobalScope::get_cx();
2117 let global = self.global();
2118
2119 if self.is_locked() {
2121 let promise = Promise::new(&global, can_gc);
2123 promise.reject_error(Error::Type("Source stream is locked".to_owned()), can_gc);
2124 return promise;
2125 }
2126
2127 if destination.is_locked() {
2129 let promise = Promise::new(&global, can_gc);
2131 promise.reject_error(
2132 Error::Type("Destination stream is locked".to_owned()),
2133 can_gc,
2134 );
2135 return promise;
2136 }
2137
2138 let signal = options.signal.as_deref();
2140
2141 self.pipe_to(
2143 cx,
2144 &global,
2145 destination,
2146 options.preventClose,
2147 options.preventAbort,
2148 options.preventCancel,
2149 signal,
2150 realm,
2151 can_gc,
2152 )
2153 }
2154
2155 fn PipeThrough(
2157 &self,
2158 transform: &ReadableWritablePair,
2159 options: &StreamPipeOptions,
2160 realm: InRealm,
2161 can_gc: CanGc,
2162 ) -> Fallible<DomRoot<ReadableStream>> {
2163 let global = self.global();
2164 let cx = GlobalScope::get_cx();
2165
2166 if self.is_locked() {
2168 return Err(Error::Type("Source stream is locked".to_owned()));
2169 }
2170
2171 if transform.writable.is_locked() {
2173 return Err(Error::Type("Destination stream is locked".to_owned()));
2174 }
2175
2176 let signal = options.signal.as_deref();
2178
2179 let promise = self.pipe_to(
2182 cx,
2183 &global,
2184 &transform.writable,
2185 options.preventClose,
2186 options.preventAbort,
2187 options.preventCancel,
2188 signal,
2189 realm,
2190 can_gc,
2191 );
2192
2193 promise.set_promise_is_handled();
2195
2196 Ok(transform.readable.clone())
2198 }
2199}
2200
2201#[expect(unsafe_code)]
2202pub(crate) unsafe fn get_type_and_value_from_message(
2206 cx: SafeJSContext,
2207 data: SafeHandleValue,
2208 value: SafeMutableHandleValue,
2209 can_gc: CanGc,
2210) -> DOMString {
2211 assert!(data.is_object());
2217 rooted!(in(*cx) let data_object = data.to_object());
2218
2219 rooted!(in(*cx) let mut type_ = UndefinedValue());
2221 unsafe {
2222 get_dictionary_property(
2223 *cx,
2224 data_object.handle(),
2225 "type",
2226 type_.handle_mut(),
2227 can_gc,
2228 )
2229 }
2230 .expect("Getting the type should not fail.");
2231
2232 unsafe { get_dictionary_property(*cx, data_object.handle(), "value", value, can_gc) }
2234 .expect("Getting the value should not fail.");
2235
2236 let result =
2238 DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2239 .expect("The type of the message should be a string");
2240 let ConversionResult::Success(type_string) = result else {
2241 unreachable!("The type of the message should be a string");
2242 };
2243
2244 type_string
2245}
2246
2247impl js::gc::Rootable for CrossRealmTransformReadable {}
2248
2249#[derive(Clone, JSTraceable, MallocSizeOf)]
2253#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
2254pub(crate) struct CrossRealmTransformReadable {
2255 controller: Dom<ReadableStreamDefaultController>,
2257}
2258
2259impl CrossRealmTransformReadable {
2260 #[expect(unsafe_code)]
2263 pub(crate) fn handle_message(
2264 &self,
2265 cx: SafeJSContext,
2266 global: &GlobalScope,
2267 port: &MessagePort,
2268 message: SafeHandleValue,
2269 _realm: InRealm,
2270 can_gc: CanGc,
2271 ) {
2272 rooted!(in(*cx) let mut value = UndefinedValue());
2273 let type_string =
2274 unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
2275
2276 if type_string == "chunk" {
2278 self.controller
2280 .enqueue(cx, value.handle(), can_gc)
2281 .expect("Enqueing a chunk should not fail.");
2282 }
2283
2284 if type_string == "close" {
2286 self.controller.close(can_gc);
2288
2289 global.disentangle_port(port, can_gc);
2291 }
2292
2293 if type_string == "error" {
2295 self.controller.error(value.handle(), can_gc);
2297
2298 global.disentangle_port(port, can_gc);
2300 }
2301 }
2302
2303 pub(crate) fn handle_error(
2306 &self,
2307 cx: SafeJSContext,
2308 global: &GlobalScope,
2309 port: &MessagePort,
2310 _realm: InRealm,
2311 can_gc: CanGc,
2312 ) {
2313 let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
2315 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
2316 error.safe_to_jsval(cx, rooted_error.handle_mut(), can_gc);
2317
2318 port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
2320
2321 self.controller.error(rooted_error.handle(), can_gc);
2323
2324 global.disentangle_port(port, can_gc);
2326 }
2327}
2328
2329#[expect(unsafe_code)]
2330pub(crate) fn get_read_promise_done(
2332 cx: SafeJSContext,
2333 v: &SafeHandleValue,
2334 can_gc: CanGc,
2335) -> Result<bool, Error> {
2336 if !v.is_object() {
2337 return Err(Error::Type("Unknown format for done property.".to_string()));
2338 }
2339 unsafe {
2340 rooted!(in(*cx) let object = v.to_object());
2341 rooted!(in(*cx) let mut done = UndefinedValue());
2342 match get_dictionary_property(*cx, object.handle(), "done", done.handle_mut(), can_gc) {
2343 Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2344 Ok(ConversionResult::Success(val)) => Ok(val),
2345 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2346 _ => Err(Error::Type("Unknown format for done property.".to_string())),
2347 },
2348 Ok(false) => Err(Error::Type("Promise has no done property.".to_string())),
2349 Err(()) => Err(Error::JSFailed),
2350 }
2351 }
2352}
2353
2354#[expect(unsafe_code)]
2355pub(crate) fn get_read_promise_bytes(
2357 cx: SafeJSContext,
2358 v: &SafeHandleValue,
2359 can_gc: CanGc,
2360) -> Result<Vec<u8>, Error> {
2361 if !v.is_object() {
2362 return Err(Error::Type(
2363 "Unknown format for for bytes read.".to_string(),
2364 ));
2365 }
2366 unsafe {
2367 rooted!(in(*cx) let object = v.to_object());
2368 rooted!(in(*cx) let mut bytes = UndefinedValue());
2369 match get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut(), can_gc) {
2370 Ok(true) => {
2371 match Vec::<u8>::safe_from_jsval(
2372 cx,
2373 bytes.handle(),
2374 ConversionBehavior::EnforceRange,
2375 can_gc,
2376 ) {
2377 Ok(ConversionResult::Success(val)) => Ok(val),
2378 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2379 _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
2380 }
2381 },
2382 Ok(false) => Err(Error::Type("Promise has no value property.".to_string())),
2383 Err(()) => Err(Error::JSFailed),
2384 }
2385 }
2386}
2387
2388pub(crate) fn bytes_from_chunk_jsval(
2392 cx: SafeJSContext,
2393 chunk: &RootedTraceableBox<Heap<JSVal>>,
2394 can_gc: CanGc,
2395) -> Result<Vec<u8>, Error> {
2396 match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2397 Ok(ConversionResult::Success(vec)) => Ok(vec),
2398 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2399 _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
2400 }
2401}
2402
2403impl Transferable for ReadableStream {
2405 type Index = MessagePortIndex;
2406 type Data = MessagePortImpl;
2407
2408 fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
2410 if self.is_locked() {
2413 return Err(Error::DataClone(None));
2414 }
2415
2416 let global = self.global();
2417 let realm = enter_realm(&*global);
2418 let comp = InRealm::Entered(&realm);
2419 let cx = GlobalScope::get_cx();
2420 let can_gc = CanGc::note();
2421
2422 let port_1 = MessagePort::new(&global, can_gc);
2424 global.track_message_port(&port_1, None);
2425
2426 let port_2 = MessagePort::new(&global, can_gc);
2428 global.track_message_port(&port_2, None);
2429
2430 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2432
2433 let writable = WritableStream::new_with_proto(&global, None, can_gc);
2435
2436 writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
2438
2439 let promise = self.pipe_to(
2441 cx, &global, &writable, false, false, false, None, comp, can_gc,
2442 );
2443
2444 promise.set_promise_is_handled();
2446
2447 port_2.transfer()
2449 }
2450
2451 fn transfer_receive(
2453 owner: &GlobalScope,
2454 id: MessagePortId,
2455 port_impl: MessagePortImpl,
2456 ) -> Result<DomRoot<Self>, ()> {
2457 let cx = GlobalScope::get_cx();
2458 let can_gc = CanGc::note();
2459
2460 let value = ReadableStream::new_with_proto(owner, None, can_gc);
2463
2464 let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
2471
2472 value.setup_cross_realm_transform_readable(cx, &transferred_port, can_gc);
2474 Ok(value)
2475 }
2476
2477 fn serialized_storage<'a>(
2479 data: StructuredData<'a, '_>,
2480 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2481 match data {
2482 StructuredData::Reader(r) => &mut r.port_impls,
2483 StructuredData::Writer(w) => &mut w.ports,
2484 }
2485 }
2486}