1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use base::id::{MessagePortId, MessagePortIndex};
11use constellation_traits::MessagePortImpl;
12use dom_struct::dom_struct;
13use rustc_hash::FxHashMap;
14use ipc_channel::ipc::IpcSharedMemory;
15use js::jsapi::{Heap, JSObject};
16use js::jsval::{JSVal, ObjectValue, UndefinedValue};
17use js::rust::{
18 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19 MutableHandleValue as SafeMutableHandleValue,
20};
21use js::typedarray::ArrayBufferViewU8;
22use script_bindings::conversions::SafeToJSValConvertible;
23
24use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
25use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
26 ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
27 ReadableWritablePair, StreamPipeOptions,
28};
29use script_bindings::str::DOMString;
30
31use crate::dom::domexception::{DOMErrorName, DOMException};
32use script_bindings::conversions::{is_array_like, StringificationBehavior};
33use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
34use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
35use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
37use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
38use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
39use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
40use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
41use crate::dom::writablestream::WritableStream;
42use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
43use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
44use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
45use crate::dom::bindings::trace::RootedTraceableBox;
46use crate::dom::bindings::utils::get_dictionary_property;
47use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
48use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader;
49use crate::dom::globalscope::GlobalScope;
50use crate::dom::promise::{wait_for_all_promise, Promise};
51use crate::dom::readablebytestreamcontroller::ReadableByteStreamController;
52use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader;
53use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController;
54use crate::dom::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
55use crate::dom::defaultteeunderlyingsource::TeeCancelAlgorithm;
56use crate::dom::types::DefaultTeeUnderlyingSource;
57use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
58use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
59use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
60use crate::dom::messageport::MessagePort;
61use crate::realms::{enter_realm, InRealm};
62use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
63use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
64use crate::dom::bindings::transferable::Transferable;
65use crate::dom::bindings::structuredclone::StructuredData;
66
67use super::bindings::buffer_source::HeapBufferSource;
68use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
69use super::readablestreambyobreader::ReadIntoRequest;
70
/// The stage a pipe operation is in. The pipe's promise reaction reads this
/// to decide which step to take next.
#[derive(Clone, Debug, Default, PartialEq)]
enum PipeToState {
    /// Default value of a freshly created state; the reaction treats
    /// observing it as unreachable.
    #[default]
    Starting,
    /// Waiting for the writer's ready promise before reading the next chunk.
    PendingReady,
    /// A read was requested from the reader and its result is awaited.
    PendingRead,
    /// Shutdown was requested while writes were still pending. Holds the
    /// action, if any, to perform once those writes have settled.
    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
    /// A shutdown action has been started and its promise is awaited.
    ShuttingDownPendingAction,
    /// The pipe has been finalized; further reactions do nothing.
    Finalized,
}
92
/// The operation performed as part of shutting the pipe down, before it is
/// finalized.
#[derive(Clone, Debug, PartialEq)]
enum ShutdownAction {
    /// Abort the destination stream with the shutdown error.
    WritableStreamAbort,
    /// Cancel the source stream with the shutdown error.
    ReadableStreamCancel,
    /// Close the writer via its `close_with_error_propagation` method.
    WritableStreamDefaultWriterCloseWithErrorPropagation,
    /// Abort requested through `abort_with_reason`: aborts the destination
    /// and/or cancels the source with the stored abort reason, subject to
    /// the `prevent_abort` / `prevent_cancel` flags.
    Abort,
}
105
// Marker implementation: no items of `js::gc::Rootable` are overridden for `PipeTo`.
impl js::gc::Rootable for PipeTo {}
107
/// State of one pipe operation from a readable stream to a writable stream.
///
/// The struct is cloned into every promise handler it registers; all the
/// mutable pieces live behind `Rc`, so the clones observe the same state.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PipeTo {
    /// Reader the pipe reads chunks from.
    reader: Dom<ReadableStreamDefaultReader>,

    /// Writer the pipe writes chunks to.
    writer: Dom<WritableStreamDefaultWriter>,

    /// Promises returned by writes that were started; settled ones are
    /// dropped from the queue each time the reaction runs.
    #[ignore_malloc_size_of = "Rc are hard"]
    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,

    /// Current stage of the pipe.
    #[ignore_malloc_size_of = "Rc are hard"]
    #[no_trace]
    state: Rc<RefCell<PipeToState>>,

    /// When true, an errored source does not abort the destination.
    prevent_abort: bool,

    /// When true, a failing or closing destination does not cancel the source.
    prevent_cancel: bool,

    /// When true, a closed source does not close the destination.
    prevent_close: bool,

    /// Set once shutdown has begun; the propagation checks and
    /// `abort_with_reason` become no-ops afterwards.
    #[ignore_malloc_size_of = "Rc are hard"]
    shutting_down: Rc<Cell<bool>>,

    /// Reason stored by `abort_with_reason`, read back by the `Abort` action.
    #[ignore_malloc_size_of = "mozjs"]
    abort_reason: Rc<Heap<JSVal>>,

    /// Error the result promise is rejected with on finalization, if any.
    #[ignore_malloc_size_of = "mozjs"]
    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,

    /// Promise of the shutdown action currently in progress, if any.
    #[ignore_malloc_size_of = "Rc are hard"]
    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// Promise settled when the pipe is finalized.
    #[ignore_malloc_size_of = "Rc are hard"]
    result_promise: Rc<Promise>,
}
169
170impl PipeTo {
171 pub(crate) fn abort_with_reason(
174 &self,
175 cx: SafeJSContext,
176 global: &GlobalScope,
177 reason: SafeHandleValue,
178 realm: InRealm,
179 can_gc: CanGc,
180 ) {
181 if self.shutting_down.get() {
183 return;
184 }
185
186 self.abort_reason.set(reason.get());
190
191 self.set_shutdown_error(reason);
196
197 self.shutdown(cx, global, Some(ShutdownAction::Abort), realm, can_gc);
203 }
204}
205
206impl Callback for PipeTo {
207 #[allow(unsafe_code)]
216 fn callback(&self, cx: SafeJSContext, result: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
217 let global = self.reader.global();
218
219 self.pending_writes.borrow_mut().retain(|p| {
225 let pending = p.is_pending();
226 if !pending {
227 p.set_promise_is_handled();
228 }
229 pending
230 });
231
232 let state_before_checks = self.state.borrow().clone();
234
235 if state_before_checks == PipeToState::PendingRead {
242 let source = self.reader.get_stream().expect("Source stream must be set");
243 if source.is_closed() {
244 let dest = self
245 .writer
246 .get_stream()
247 .expect("Destination stream must be set");
248
249 if dest.is_writable() && !dest.close_queued_or_in_flight() {
252 let Ok(done) = get_read_promise_done(cx, &result, can_gc) else {
253 return;
260 };
261
262 if !done {
263 self.write_chunk(cx, &global, result, can_gc);
265 }
266 }
267 }
268 }
269
270 self.check_and_propagate_errors_forward(cx, &global, realm, can_gc);
271 self.check_and_propagate_errors_backward(cx, &global, realm, can_gc);
272 self.check_and_propagate_closing_forward(cx, &global, realm, can_gc);
273 self.check_and_propagate_closing_backward(cx, &global, realm, can_gc);
274
275 let state = self.state.borrow().clone();
277
278 if state != state_before_checks {
282 return;
283 }
284
285 match state {
286 PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
287 PipeToState::PendingReady => {
288 self.read_chunk(&global, realm, can_gc);
290 },
291 PipeToState::PendingRead => {
292 self.write_chunk(cx, &global, result, can_gc);
294
295 if self.shutting_down.get() {
297 return;
298 }
299
300 self.wait_for_writer_ready(&global, realm, can_gc);
302 },
303 PipeToState::ShuttingDownWithPendingWrites(action) => {
304 if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
307 self.wait_on_pending_write(&global, write, realm, can_gc);
308 return;
309 }
310
311 if let Some(action) = action {
313 self.perform_action(cx, &global, action, realm, can_gc);
315 } else {
316 self.finalize(cx, &global, can_gc);
318 }
319 },
320 PipeToState::ShuttingDownPendingAction => {
321 let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
322 unreachable!();
323 };
324 if promise.is_pending() {
325 return;
329 }
330
331 let is_array_like = {
332 if !result.is_object() {
333 false
334 } else {
335 unsafe { is_array_like::<crate::DomTypeHolder>(*cx, result) }
336 }
337 };
338
339 if !result.is_undefined() && !is_array_like {
341 self.set_shutdown_error(result);
351 }
352 self.finalize(cx, &global, can_gc);
353 },
354 PipeToState::Finalized => {},
355 }
356 }
357}
358
359impl PipeTo {
360 fn set_shutdown_error(&self, error: SafeHandleValue) {
363 *self.shutdown_error.borrow_mut() = Some(Heap::default());
364 let Some(ref heap) = *self.shutdown_error.borrow() else {
365 unreachable!("Option set to Some(heap) above.");
366 };
367 heap.set(error.get())
368 }
369
370 fn wait_for_writer_ready(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
373 {
374 let mut state = self.state.borrow_mut();
375 *state = PipeToState::PendingReady;
376 }
377
378 let ready_promise = self.writer.Ready();
379 if ready_promise.is_fulfilled() {
380 self.read_chunk(global, realm, can_gc);
381 } else {
382 let handler = PromiseNativeHandler::new(
383 global,
384 Some(Box::new(self.clone())),
385 Some(Box::new(self.clone())),
386 can_gc,
387 );
388 ready_promise.append_native_handler(&handler, realm, can_gc);
389
390 let closed_promise = self.reader.Closed();
394 closed_promise.append_native_handler(&handler, realm, can_gc);
395 }
396 }
397
398 fn read_chunk(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
400 *self.state.borrow_mut() = PipeToState::PendingRead;
401 let chunk_promise = self.reader.Read(can_gc);
402 let handler = PromiseNativeHandler::new(
403 global,
404 Some(Box::new(self.clone())),
405 Some(Box::new(self.clone())),
406 can_gc,
407 );
408 chunk_promise.append_native_handler(&handler, realm, can_gc);
409
410 let ready_promise = self.writer.Closed();
413 ready_promise.append_native_handler(&handler, realm, can_gc);
414 }
415
416 #[allow(unsafe_code)]
419 fn write_chunk(
420 &self,
421 cx: SafeJSContext,
422 global: &GlobalScope,
423 chunk: SafeHandleValue,
424 can_gc: CanGc,
425 ) -> bool {
426 if chunk.is_object() {
427 rooted!(in(*cx) let object = chunk.to_object());
428 rooted!(in(*cx) let mut bytes = UndefinedValue());
429 let has_value = unsafe {
430 get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut(), can_gc)
431 .expect("Chunk should have a value.")
432 };
433 if has_value {
434 let write_promise = self.writer.write(cx, global, bytes.handle(), can_gc);
436 self.pending_writes.borrow_mut().push_back(write_promise);
437 return true;
438 }
439 }
440 false
441 }
442
443 fn wait_on_pending_write(
447 &self,
448 global: &GlobalScope,
449 promise: Rc<Promise>,
450 realm: InRealm,
451 can_gc: CanGc,
452 ) {
453 let handler = PromiseNativeHandler::new(
454 global,
455 Some(Box::new(self.clone())),
456 Some(Box::new(self.clone())),
457 can_gc,
458 );
459 promise.append_native_handler(&handler, realm, can_gc);
460 }
461
462 fn check_and_propagate_errors_forward(
465 &self,
466 cx: SafeJSContext,
467 global: &GlobalScope,
468 realm: InRealm,
469 can_gc: CanGc,
470 ) {
471 if self.shutting_down.get() {
474 return;
475 }
476
477 let source = self
479 .reader
480 .get_stream()
481 .expect("Reader should still have a stream");
482 if source.is_errored() {
483 rooted!(in(*cx) let mut source_error = UndefinedValue());
484 source.get_stored_error(source_error.handle_mut());
485 self.set_shutdown_error(source_error.handle());
486
487 if !self.prevent_abort {
489 self.shutdown(
492 cx,
493 global,
494 Some(ShutdownAction::WritableStreamAbort),
495 realm,
496 can_gc,
497 )
498 } else {
499 self.shutdown(cx, global, None, realm, can_gc);
501 }
502 }
503 }
504
505 fn check_and_propagate_errors_backward(
508 &self,
509 cx: SafeJSContext,
510 global: &GlobalScope,
511 realm: InRealm,
512 can_gc: CanGc,
513 ) {
514 if self.shutting_down.get() {
517 return;
518 }
519
520 let dest = self
522 .writer
523 .get_stream()
524 .expect("Writer should still have a stream");
525 if dest.is_errored() {
526 rooted!(in(*cx) let mut dest_error = UndefinedValue());
527 dest.get_stored_error(dest_error.handle_mut());
528 self.set_shutdown_error(dest_error.handle());
529
530 if !self.prevent_cancel {
532 self.shutdown(
535 cx,
536 global,
537 Some(ShutdownAction::ReadableStreamCancel),
538 realm,
539 can_gc,
540 )
541 } else {
542 self.shutdown(cx, global, None, realm, can_gc);
544 }
545 }
546 }
547
548 fn check_and_propagate_closing_forward(
551 &self,
552 cx: SafeJSContext,
553 global: &GlobalScope,
554 realm: InRealm,
555 can_gc: CanGc,
556 ) {
557 if self.shutting_down.get() {
560 return;
561 }
562
563 let source = self
565 .reader
566 .get_stream()
567 .expect("Reader should still have a stream");
568 if source.is_closed() {
569 if !self.prevent_close {
571 self.shutdown(
574 cx,
575 global,
576 Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
577 realm,
578 can_gc,
579 )
580 } else {
581 self.shutdown(cx, global, None, realm, can_gc);
583 }
584 }
585 }
586
587 fn check_and_propagate_closing_backward(
590 &self,
591 cx: SafeJSContext,
592 global: &GlobalScope,
593 realm: InRealm,
594 can_gc: CanGc,
595 ) {
596 if self.shutting_down.get() {
599 return;
600 }
601
602 let dest = self
605 .writer
606 .get_stream()
607 .expect("Writer should still have a stream");
608 if dest.close_queued_or_in_flight() || dest.is_closed() {
609 rooted!(in(*cx) let mut dest_closed = UndefinedValue());
614 let error =
615 Error::Type("Destination is closed or has closed queued or in flight".to_string());
616 error.to_jsval(cx, global, dest_closed.handle_mut(), can_gc);
617 self.set_shutdown_error(dest_closed.handle());
618
619 if !self.prevent_cancel {
621 self.shutdown(
624 cx,
625 global,
626 Some(ShutdownAction::ReadableStreamCancel),
627 realm,
628 can_gc,
629 )
630 } else {
631 self.shutdown(cx, global, None, realm, can_gc);
633 }
634 }
635 }
636
637 fn shutdown(
641 &self,
642 cx: SafeJSContext,
643 global: &GlobalScope,
644 action: Option<ShutdownAction>,
645 realm: InRealm,
646 can_gc: CanGc,
647 ) {
648 if !self.shutting_down.replace(true) {
651 let dest = self.writer.get_stream().expect("Stream must be set");
652 if dest.is_writable() && !dest.close_queued_or_in_flight() {
655 if let Some(write) = self.pending_writes.borrow_mut().front() {
661 *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
662 self.wait_on_pending_write(global, write.clone(), realm, can_gc);
663 return;
664 }
665 }
666
667 if let Some(action) = action {
669 self.perform_action(cx, global, action, realm, can_gc);
671 } else {
672 self.finalize(cx, global, can_gc);
674 }
675 }
676 }
677
678 fn perform_action(
681 &self,
682 cx: SafeJSContext,
683 global: &GlobalScope,
684 action: ShutdownAction,
685 realm: InRealm,
686 can_gc: CanGc,
687 ) {
688 rooted!(in(*cx) let mut error = UndefinedValue());
689 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
690 error.set(shutdown_error.get());
691 }
692
693 *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
694
695 let promise = match action {
697 ShutdownAction::WritableStreamAbort => {
698 let dest = self.writer.get_stream().expect("Stream must be set");
699 dest.abort(cx, global, error.handle(), realm, can_gc)
700 },
701 ShutdownAction::ReadableStreamCancel => {
702 let source = self
703 .reader
704 .get_stream()
705 .expect("Reader should have a stream.");
706 source.cancel(cx, global, error.handle(), can_gc)
707 },
708 ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
709 self.writer.close_with_error_propagation(cx, global, can_gc)
710 },
711 ShutdownAction::Abort => {
712 rooted!(in(*cx) let mut error = UndefinedValue());
717 error.set(self.abort_reason.get());
718
719 let mut actions = vec![];
721
722 if !self.prevent_abort {
724 let dest = self
725 .writer
726 .get_stream()
727 .expect("Destination stream must be set");
728
729 let promise = if dest.is_writable() {
731 dest.abort(cx, global, error.handle(), realm, can_gc)
733 } else {
734 Promise::new_resolved(global, cx, (), can_gc)
736 };
737 actions.push(promise);
738 }
739
740 if !self.prevent_cancel {
742 let source = self.reader.get_stream().expect("Source stream must be set");
743
744 let promise = if source.is_readable() {
746 source.cancel(cx, global, error.handle(), can_gc)
748 } else {
749 Promise::new_resolved(global, cx, (), can_gc)
751 };
752 actions.push(promise);
753 }
754
755 wait_for_all_promise(cx, global, actions, realm, can_gc)
759 },
760 };
761
762 let handler = PromiseNativeHandler::new(
765 global,
766 Some(Box::new(self.clone())),
767 Some(Box::new(self.clone())),
768 can_gc,
769 );
770 promise.append_native_handler(&handler, realm, can_gc);
771 *self.shutdown_action_promise.borrow_mut() = Some(promise);
772 }
773
774 fn finalize(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
776 *self.state.borrow_mut() = PipeToState::Finalized;
777
778 self.writer.release(cx, global, can_gc);
780
781 self.reader
787 .release(can_gc)
788 .expect("Releasing the reader should not fail");
789
790 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
796 rooted!(in(*cx) let mut error = UndefinedValue());
797 error.set(shutdown_error.get());
798 self.result_promise.reject_native(&error.handle(), can_gc);
800 } else {
801 self.result_promise.resolve_native(&(), can_gc);
803 }
804 }
805}
806
/// Promise reaction that resolves `result` when it is invoked.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseFulfillmentHandler {
    /// Promise resolved by the callback.
    #[ignore_malloc_size_of = "Rc are hard"]
    result: Rc<Promise>,
}
814
815impl Callback for SourceCancelPromiseFulfillmentHandler {
816 fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
820 self.result.resolve_native(&(), can_gc);
821 }
822}
823
/// Promise reaction that rejects `result` with the value it is invoked with.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseRejectionHandler {
    /// Promise rejected by the callback.
    #[ignore_malloc_size_of = "Rc are hard"]
    result: Rc<Promise>,
}
831
832impl Callback for SourceCancelPromiseRejectionHandler {
833 fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
837 self.result.reject_native(&v, can_gc);
838 }
839}
840
/// State of a readable stream.
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReadableStreamState {
    /// Initial state of a stream (the `Default`).
    #[default]
    Readable,
    /// The stream has been closed.
    Closed,
    /// The stream has been errored.
    Errored,
}
849
/// The controller attached to a stream, tagged by its kind.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ControllerType {
    /// A byte stream controller.
    Byte(MutNullableDom<ReadableByteStreamController>),
    /// A default controller.
    Default(MutNullableDom<ReadableStreamDefaultController>),
}
859
/// The reader attached to a stream, tagged by its kind. The inner slot is
/// nullable, so a variant may be present while holding no reader.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ReaderType {
    /// A BYOB reader.
    #[allow(clippy::upper_case_acronyms)]
    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
    /// A default reader.
    Default(MutNullableDom<ReadableStreamDefaultReader>),
}
870
871impl Eq for ReaderType {}
872impl PartialEq for ReaderType {
873 fn eq(&self, other: &Self) -> bool {
874 matches!(
875 (self, other),
876 (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
877 (ReaderType::Default(_), ReaderType::Default(_))
878 )
879 }
880}
881
882#[cfg_attr(crown, allow(crown::unrooted_must_root))]
884pub(crate) fn create_readable_stream(
885 global: &GlobalScope,
886 underlying_source_type: UnderlyingSourceType,
887 queuing_strategy: Option<Rc<QueuingStrategySize>>,
888 high_water_mark: Option<f64>,
889 can_gc: CanGc,
890) -> DomRoot<ReadableStream> {
891 let high_water_mark = high_water_mark.unwrap_or(1.0);
893
894 let size_algorithm =
896 queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
897
898 assert!(high_water_mark >= 0.0);
900
901 let stream = ReadableStream::new_with_proto(global, None, can_gc);
904
905 let controller = ReadableStreamDefaultController::new(
907 global,
908 underlying_source_type,
909 high_water_mark,
910 size_algorithm,
911 can_gc,
912 );
913
914 controller
917 .setup(stream.clone(), can_gc)
918 .expect("Setup of default controller cannot fail");
919
920 stream
922}
923
/// A readable stream DOM object.
#[dom_struct]
pub(crate) struct ReadableStream {
    reflector_: Reflector,

    /// Controller of the stream; `None` until `set_default_controller` or
    /// `set_byte_controller` has been called.
    controller: RefCell<Option<ControllerType>>,

    /// Error recorded when the stream is errored; read back through
    /// `get_stored_error`.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// Flag exposed through `is_disturbed` / `set_is_disturbed`.
    disturbed: Cell<bool>,

    /// Reader of the stream, if any; the stream reports itself as locked
    /// while the inner slot holds a reader.
    reader: RefCell<Option<ReaderType>>,

    /// Current state of the stream; starts out as `Readable`.
    state: Cell<ReadableStreamState>,
}
947
948impl ReadableStream {
949 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
950 fn new_inherited() -> ReadableStream {
952 ReadableStream {
953 reflector_: Reflector::new(),
954 controller: RefCell::new(None),
955 stored_error: Heap::default(),
956 disturbed: Default::default(),
957 reader: RefCell::new(None),
958 state: Cell::new(Default::default()),
959 }
960 }
961
962 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
963 pub(crate) fn new_with_proto(
964 global: &GlobalScope,
965 proto: Option<SafeHandleObject>,
966 can_gc: CanGc,
967 ) -> DomRoot<ReadableStream> {
968 reflect_dom_object_with_proto(
969 Box::new(ReadableStream::new_inherited()),
970 global,
971 proto,
972 can_gc,
973 )
974 }
975
976 pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
979 *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
980 controller,
981 ))));
982 }
983
984 pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
987 *self.controller.borrow_mut() =
988 Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
989 }
990
991 pub(crate) fn assert_no_controller(&self) {
994 let has_no_controller = self.controller.borrow().is_none();
995 assert!(has_no_controller);
996 }
997
998 pub(crate) fn new_from_bytes(
1000 global: &GlobalScope,
1001 bytes: Vec<u8>,
1002 can_gc: CanGc,
1003 ) -> Fallible<DomRoot<ReadableStream>> {
1004 let stream = ReadableStream::new_with_external_underlying_source(
1005 global,
1006 UnderlyingSourceType::Memory(bytes.len()),
1007 can_gc,
1008 )?;
1009 stream.enqueue_native(bytes, can_gc);
1010 stream.controller_close_native(can_gc);
1011 Ok(stream)
1012 }
1013
1014 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1017 pub(crate) fn new_with_external_underlying_source(
1018 global: &GlobalScope,
1019 source: UnderlyingSourceType,
1020 can_gc: CanGc,
1021 ) -> Fallible<DomRoot<ReadableStream>> {
1022 assert!(source.is_native());
1023 let stream = ReadableStream::new_with_proto(global, None, can_gc);
1024 let controller = ReadableStreamDefaultController::new(
1025 global,
1026 source,
1027 1.0,
1028 extract_size_algorithm(&QueuingStrategy::empty(), can_gc),
1029 can_gc,
1030 );
1031 controller.setup(stream.clone(), can_gc)?;
1032 Ok(stream)
1033 }
1034
1035 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1037 match self.controller.borrow().as_ref() {
1038 Some(ControllerType::Default(controller)) => {
1039 let controller = controller
1040 .get()
1041 .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1042 controller.perform_release_steps()
1043 },
1044 Some(ControllerType::Byte(controller)) => {
1045 let controller = controller
1046 .get()
1047 .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1048 controller.perform_release_steps()
1049 },
1050 None => Err(Error::Type("Stream should have controller.".to_string())),
1051 }
1052 }
1053
1054 pub(crate) fn perform_pull_steps(
1058 &self,
1059 cx: SafeJSContext,
1060 read_request: &ReadRequest,
1061 can_gc: CanGc,
1062 ) {
1063 match self.controller.borrow().as_ref() {
1064 Some(ControllerType::Default(controller)) => controller
1065 .get()
1066 .expect("Stream should have controller.")
1067 .perform_pull_steps(read_request, can_gc),
1068 Some(ControllerType::Byte(controller)) => controller
1069 .get()
1070 .expect("Stream should have controller.")
1071 .perform_pull_steps(cx, read_request, can_gc),
1072 None => {
1073 unreachable!("Stream does not have a controller.");
1074 },
1075 }
1076 }
1077
1078 pub(crate) fn perform_pull_into(
1082 &self,
1083 cx: SafeJSContext,
1084 read_into_request: &ReadIntoRequest,
1085 view: HeapBufferSource<ArrayBufferViewU8>,
1086 options: &ReadableStreamBYOBReaderReadOptions,
1087 can_gc: CanGc,
1088 ) {
1089 match self.controller.borrow().as_ref() {
1090 Some(ControllerType::Byte(controller)) => controller
1091 .get()
1092 .expect("Stream should have controller.")
1093 .perform_pull_into(cx, read_into_request, view, options, can_gc),
1094 _ => {
1095 unreachable!(
1096 "Pulling a chunk from a stream with a default controller using a BYOB reader"
1097 )
1098 },
1099 }
1100 }
1101
1102 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1104 match self.reader.borrow().as_ref() {
1105 Some(ReaderType::Default(reader)) => {
1106 let Some(reader) = reader.get() else {
1107 panic!("Attempt to add a read request without having first acquired a reader.");
1108 };
1109
1110 assert!(self.is_readable());
1112
1113 reader.add_read_request(read_request);
1115 },
1116 _ => {
1117 unreachable!("Adding a read request can only be done on a default reader.")
1118 },
1119 }
1120 }
1121
1122 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1124 match self.reader.borrow().as_ref() {
1125 Some(ReaderType::BYOB(reader)) => {
1127 let Some(reader) = reader.get() else {
1128 unreachable!(
1129 "Attempt to add a read into request without having first acquired a reader."
1130 );
1131 };
1132
1133 assert!(self.is_readable() || self.is_closed());
1135
1136 reader.add_read_into_request(read_request);
1138 },
1139 _ => {
1140 unreachable!("Adding a read into request can only be done on a BYOB reader.")
1141 },
1142 }
1143 }
1144
1145 pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1148 match self.controller.borrow().as_ref() {
1149 Some(ControllerType::Default(controller)) => controller
1150 .get()
1151 .expect("Stream should have controller.")
1152 .enqueue_native(bytes, can_gc),
1153 _ => {
1154 unreachable!(
1155 "Enqueueing chunk to a stream from Rust on other than default controller"
1156 );
1157 },
1158 }
1159 }
1160
1161 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1163 assert!(self.is_readable());
1165
1166 self.state.set(ReadableStreamState::Errored);
1168
1169 self.stored_error.set(e.get());
1171
1172 match self.reader.borrow().as_ref() {
1175 Some(ReaderType::Default(reader)) => {
1176 let Some(reader) = reader.get() else {
1177 return;
1179 };
1180
1181 reader.error(e, can_gc);
1183 },
1184 Some(ReaderType::BYOB(reader)) => {
1185 let Some(reader) = reader.get() else {
1186 return;
1188 };
1189
1190 reader.error_read_into_requests(e, can_gc);
1192 },
1193 None => {
1194 },
1196 }
1197 }
1198
1199 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1201 handle_mut.set(self.stored_error.get());
1202 }
1203
1204 pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1207 let cx = GlobalScope::get_cx();
1208 rooted!(in(*cx) let mut error_val = UndefinedValue());
1209 error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1210 self.error(error_val.handle(), can_gc);
1211 }
1212
1213 pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1216 match self.controller.borrow().as_ref() {
1217 Some(ControllerType::Default(controller)) => {
1218 let _ = controller
1219 .get()
1220 .expect("Stream should have controller.")
1221 .Close(can_gc);
1222 },
1223 _ => {
1224 unreachable!("Native closing is only done on default controllers.")
1225 },
1226 }
1227 }
1228
1229 pub(crate) fn in_memory(&self) -> bool {
1232 match self.controller.borrow().as_ref() {
1233 Some(ControllerType::Default(controller)) => controller
1234 .get()
1235 .expect("Stream should have controller.")
1236 .in_memory(),
1237 _ => {
1238 unreachable!(
1239 "Checking if source is in memory for a stream with a non-default controller"
1240 )
1241 },
1242 }
1243 }
1244
1245 pub(crate) fn get_in_memory_bytes(&self) -> Option<IpcSharedMemory> {
1248 match self.controller.borrow().as_ref() {
1249 Some(ControllerType::Default(controller)) => controller
1250 .get()
1251 .expect("Stream should have controller.")
1252 .get_in_memory_bytes()
1253 .as_deref()
1254 .map(IpcSharedMemory::from_bytes),
1255 _ => {
1256 unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1257 },
1258 }
1259 }
1260
1261 pub(crate) fn acquire_default_reader(
1266 &self,
1267 can_gc: CanGc,
1268 ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1269 let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1271
1272 reader.set_up(self, &self.global(), can_gc)?;
1274
1275 Ok(reader)
1277 }
1278
1279 pub(crate) fn acquire_byob_reader(
1281 &self,
1282 can_gc: CanGc,
1283 ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1284 let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1286 reader.set_up(self, &self.global(), can_gc)?;
1288
1289 Ok(reader)
1291 }
1292
1293 pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1294 match self.controller.borrow().as_ref() {
1295 Some(ControllerType::Default(controller)) => {
1296 controller.get().expect("Stream should have controller.")
1297 },
1298 _ => {
1299 unreachable!(
1300 "Getting default controller for a stream with a non-default controller"
1301 )
1302 },
1303 }
1304 }
1305
1306 pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1307 match self.reader.borrow().as_ref() {
1308 Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1309 _ => {
1310 unreachable!("Getting default reader for a stream with a non-default reader")
1311 },
1312 }
1313 }
1314
1315 pub(crate) fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
1321 match self.reader.borrow().as_ref() {
1322 Some(ReaderType::Default(reader)) => {
1323 let Some(reader) = reader.get() else {
1324 unreachable!(
1325 "Attempt to read stream chunk without having first acquired a reader."
1326 );
1327 };
1328 reader.Read(can_gc)
1329 },
1330 _ => {
1331 unreachable!("Native reading of a chunk can only be done with a default reader.")
1332 },
1333 }
1334 }
1335
1336 pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1341 let reader_ref = self.reader.borrow();
1342
1343 match reader_ref.as_ref() {
1344 Some(ReaderType::Default(reader)) => {
1345 let Some(reader) = reader.get() else {
1346 unreachable!("Attempt to stop reading without having first acquired a reader.");
1347 };
1348
1349 drop(reader_ref);
1350 reader.release(can_gc).expect("Reader release cannot fail.");
1351 },
1352 _ => {
1353 unreachable!("Native stop reading can only be done with a default reader.")
1354 },
1355 }
1356 }
1357
1358 pub(crate) fn is_locked(&self) -> bool {
1360 match self.reader.borrow().as_ref() {
1361 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1362 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1363 None => false,
1364 }
1365 }
1366
1367 pub(crate) fn is_disturbed(&self) -> bool {
1368 self.disturbed.get()
1369 }
1370
1371 pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1372 self.disturbed.set(disturbed);
1373 }
1374
1375 pub(crate) fn is_closed(&self) -> bool {
1376 self.state.get() == ReadableStreamState::Closed
1377 }
1378
1379 pub(crate) fn is_errored(&self) -> bool {
1380 self.state.get() == ReadableStreamState::Errored
1381 }
1382
1383 pub(crate) fn is_readable(&self) -> bool {
1384 self.state.get() == ReadableStreamState::Readable
1385 }
1386
1387 pub(crate) fn has_default_reader(&self) -> bool {
1388 match self.reader.borrow().as_ref() {
1389 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1390 _ => false,
1391 }
1392 }
1393
1394 pub(crate) fn has_byob_reader(&self) -> bool {
1395 match self.reader.borrow().as_ref() {
1396 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1397 _ => false,
1398 }
1399 }
1400
1401 pub(crate) fn has_byte_controller(&self) -> bool {
1402 match self.controller.borrow().as_ref() {
1403 Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1404 _ => false,
1405 }
1406 }
1407
1408 pub(crate) fn get_num_read_requests(&self) -> usize {
1410 match self.reader.borrow().as_ref() {
1411 Some(ReaderType::Default(reader)) => {
1412 let reader = reader
1413 .get()
1414 .expect("Stream must have a reader when getting the number of read requests.");
1415 reader.get_num_read_requests()
1416 },
1417 _ => unreachable!(
1418 "Stream must have a default reader when get num read requests is called into."
1419 ),
1420 }
1421 }
1422
1423 pub(crate) fn get_num_read_into_requests(&self) -> usize {
1425 assert!(self.has_byob_reader());
1426
1427 match self.reader.borrow().as_ref() {
1428 Some(ReaderType::BYOB(reader)) => {
1429 let Some(reader) = reader.get() else {
1430 unreachable!(
1431 "Stream must have a reader when get num read into requests is called into."
1432 );
1433 };
1434 reader.get_num_read_into_requests()
1435 },
1436 _ => {
1437 unreachable!(
1438 "Stream must have a BYOB reader when get num read into requests is called into."
1439 );
1440 },
1441 }
1442 }
1443
1444 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1446 pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1447 assert!(self.has_default_reader());
1449
1450 match self.reader.borrow().as_ref() {
1451 Some(ReaderType::Default(reader)) => {
1452 let reader = reader
1454 .get()
1455 .expect("Stream must have a reader when a read request is fulfilled.");
1456 assert_ne!(reader.get_num_read_requests(), 0);
1458 let request = reader.remove_read_request();
1461
1462 if done {
1463 request.close_steps(can_gc);
1465 } else {
1466 let result = RootedTraceableBox::new(Heap::default());
1468 result.set(*chunk);
1469 request.chunk_steps(result, can_gc);
1470 }
1471 },
1472 _ => {
1473 unreachable!(
1474 "Stream must have a default reader when fulfill read requests is called into."
1475 );
1476 },
1477 }
1478 }
1479
1480 pub(crate) fn fulfill_read_into_request(
1482 &self,
1483 chunk: SafeHandleValue,
1484 done: bool,
1485 can_gc: CanGc,
1486 ) {
1487 assert!(self.has_byob_reader());
1489
1490 match self.reader.borrow().as_ref() {
1492 Some(ReaderType::BYOB(reader)) => {
1493 let Some(reader) = reader.get() else {
1494 unreachable!(
1495 "Stream must have a reader when a read into request is fulfilled."
1496 );
1497 };
1498
1499 assert!(reader.get_num_read_into_requests() > 0);
1501
1502 let read_into_request = reader.remove_read_into_request();
1505
1506 let result = RootedTraceableBox::new(Heap::default());
1508 if done {
1509 result.set(*chunk);
1510 read_into_request.close_steps(Some(result), can_gc);
1511 } else {
1512 result.set(*chunk);
1514 read_into_request.chunk_steps(result, can_gc);
1515 }
1516 },
1517 _ => {
1518 unreachable!(
1519 "Stream must have a BYOB reader when fulfill read into requests is called into."
1520 );
1521 },
1522 };
1523 }
1524
1525 pub(crate) fn close(&self, can_gc: CanGc) {
1527 assert!(self.is_readable());
1529 self.state.set(ReadableStreamState::Closed);
1531 match self.reader.borrow().as_ref() {
1533 Some(ReaderType::Default(reader)) => {
1534 let Some(reader) = reader.get() else {
1535 return;
1537 };
1538 reader.close(can_gc);
1540 },
1541 Some(ReaderType::BYOB(reader)) => {
1542 let Some(reader) = reader.get() else {
1543 return;
1545 };
1546
1547 reader.close(can_gc)
1548 },
1549 None => {
1550 },
1552 }
1553 }
1554
1555 pub(crate) fn cancel(
1557 &self,
1558 cx: SafeJSContext,
1559 global: &GlobalScope,
1560 reason: SafeHandleValue,
1561 can_gc: CanGc,
1562 ) -> Rc<Promise> {
1563 self.disturbed.set(true);
1565
1566 if self.is_closed() {
1568 return Promise::new_resolved(global, cx, (), can_gc);
1569 }
1570 if self.is_errored() {
1572 let promise = Promise::new(global, can_gc);
1573 rooted!(in(*cx) let mut rval = UndefinedValue());
1574 self.stored_error.safe_to_jsval(cx, rval.handle_mut());
1575 promise.reject_native(&rval.handle(), can_gc);
1576 return promise;
1577 }
1578 self.close(can_gc);
1580
1581 if let Some(ReaderType::BYOB(reader)) = self.reader.borrow().as_ref() {
1583 if let Some(reader) = reader.get() {
1584 reader.cancel(can_gc);
1586 }
1587 }
1588
1589 let source_cancel_promise = match self.controller.borrow().as_ref() {
1592 Some(ControllerType::Default(controller)) => controller
1593 .get()
1594 .expect("Stream should have controller.")
1595 .perform_cancel_steps(cx, global, reason, can_gc),
1596 Some(ControllerType::Byte(controller)) => controller
1597 .get()
1598 .expect("Stream should have controller.")
1599 .perform_cancel_steps(cx, global, reason, can_gc),
1600 None => {
1601 panic!("Stream does not have a controller.");
1602 },
1603 };
1604
1605 let global = self.global();
1608 let result_promise = Promise::new(&global, can_gc);
1609 let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1610 result: result_promise.clone(),
1611 });
1612 let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1613 result: result_promise.clone(),
1614 });
1615 let handler = PromiseNativeHandler::new(
1616 &global,
1617 Some(fulfillment_handler),
1618 Some(rejection_handler),
1619 can_gc,
1620 );
1621 let realm = enter_realm(&*global);
1622 let comp = InRealm::Entered(&realm);
1623 source_cancel_promise.append_native_handler(&handler, comp, can_gc);
1624
1625 result_promise
1628 }
1629
1630 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1631 pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1632 *self.reader.borrow_mut() = new_reader;
1633 }
1634
1635 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
1637 fn default_tee(
1638 &self,
1639 clone_for_branch_2: bool,
1640 can_gc: CanGc,
1641 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1642 let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1646
1647 let reader = self.acquire_default_reader(can_gc)?;
1649 self.set_reader(Some(ReaderType::Default(MutNullableDom::new(Some(
1650 &reader,
1651 )))));
1652
1653 let reading = Rc::new(Cell::new(false));
1655 let read_again = Rc::new(Cell::new(false));
1657 let canceled_1 = Rc::new(Cell::new(false));
1659 let canceled_2 = Rc::new(Cell::new(false));
1661
1662 let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1664 let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1666 let cancel_promise = Promise::new(&self.global(), can_gc);
1668
1669 let tee_source_1 = DefaultTeeUnderlyingSource::new(
1670 &reader,
1671 self,
1672 reading.clone(),
1673 read_again.clone(),
1674 canceled_1.clone(),
1675 canceled_2.clone(),
1676 clone_for_branch_2.clone(),
1677 reason_1.clone(),
1678 reason_2.clone(),
1679 cancel_promise.clone(),
1680 TeeCancelAlgorithm::Cancel1Algorithm,
1681 can_gc,
1682 );
1683
1684 let underlying_source_type_branch_1 =
1685 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
1686
1687 let tee_source_2 = DefaultTeeUnderlyingSource::new(
1688 &reader,
1689 self,
1690 reading,
1691 read_again,
1692 canceled_1.clone(),
1693 canceled_2.clone(),
1694 clone_for_branch_2,
1695 reason_1,
1696 reason_2,
1697 cancel_promise.clone(),
1698 TeeCancelAlgorithm::Cancel2Algorithm,
1699 can_gc,
1700 );
1701
1702 let underlying_source_type_branch_2 =
1703 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
1704
1705 let branch_1 = create_readable_stream(
1707 &self.global(),
1708 underlying_source_type_branch_1,
1709 None,
1710 None,
1711 can_gc,
1712 );
1713 tee_source_1.set_branch_1(&branch_1);
1714 tee_source_2.set_branch_1(&branch_1);
1715
1716 let branch_2 = create_readable_stream(
1718 &self.global(),
1719 underlying_source_type_branch_2,
1720 None,
1721 None,
1722 can_gc,
1723 );
1724 tee_source_1.set_branch_2(&branch_2);
1725 tee_source_2.set_branch_2(&branch_2);
1726
1727 reader.append_native_handler_to_closed_promise(
1729 &branch_1,
1730 &branch_2,
1731 canceled_1,
1732 canceled_2,
1733 cancel_promise,
1734 can_gc,
1735 );
1736
1737 Ok(vec![branch_1, branch_2])
1739 }
1740
1741 #[allow(clippy::too_many_arguments)]
1743 pub(crate) fn pipe_to(
1744 &self,
1745 cx: SafeJSContext,
1746 global: &GlobalScope,
1747 dest: &WritableStream,
1748 prevent_close: bool,
1749 prevent_abort: bool,
1750 prevent_cancel: bool,
1751 signal: Option<&AbortSignal>,
1752 realm: InRealm,
1753 can_gc: CanGc,
1754 ) -> Rc<Promise> {
1755 assert!(!self.is_locked());
1766
1767 assert!(!dest.is_locked());
1769
1770 let reader = self
1778 .acquire_default_reader(can_gc)
1779 .expect("Acquiring a default reader for pipe_to cannot fail");
1780
1781 let writer = dest
1783 .aquire_default_writer(cx, global, can_gc)
1784 .expect("Acquiring a default writer for pipe_to cannot fail");
1785
1786 self.disturbed.set(true);
1788
1789 let promise = Promise::new(global, can_gc);
1794
1795 rooted!(in(*cx) let pipe_to = PipeTo {
1797 reader: Dom::from_ref(&reader),
1798 writer: Dom::from_ref(&writer),
1799 pending_writes: Default::default(),
1800 state: Default::default(),
1801 prevent_abort,
1802 prevent_cancel,
1803 prevent_close,
1804 shutting_down: Default::default(),
1805 abort_reason: Default::default(),
1806 shutdown_error: Default::default(),
1807 shutdown_action_promise: Default::default(),
1808 result_promise: promise.clone(),
1809 });
1810
1811 if let Some(signal) = signal {
1814 rooted!(in(*cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1817
1818 if signal.aborted() {
1820 signal.run_abort_algorithm(cx, global, &abort_algorithm, realm, can_gc);
1821 return promise;
1822 }
1823
1824 signal.add(&abort_algorithm);
1826 }
1827
1828 pipe_to.check_and_propagate_errors_forward(cx, global, realm, can_gc);
1830 pipe_to.check_and_propagate_errors_backward(cx, global, realm, can_gc);
1831 pipe_to.check_and_propagate_closing_forward(cx, global, realm, can_gc);
1832 pipe_to.check_and_propagate_closing_backward(cx, global, realm, can_gc);
1833
1834 if *pipe_to.state.borrow() == PipeToState::Starting {
1836 pipe_to.wait_for_writer_ready(global, realm, can_gc);
1838 }
1839
1840 promise
1842 }
1843
1844 fn tee(
1846 &self,
1847 clone_for_branch_2: bool,
1848 can_gc: CanGc,
1849 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1850 match self.controller.borrow().as_ref() {
1854 Some(ControllerType::Default(_)) => {
1855 self.default_tee(clone_for_branch_2, can_gc)
1857 },
1858 Some(ControllerType::Byte(_)) => {
1859 Err(Error::Type(
1862 "Teeing is not yet supported for byte streams".to_owned(),
1863 ))
1864 },
1865 None => {
1866 unreachable!("Stream should have a controller.");
1867 },
1868 }
1869 }
1870
1871 pub(crate) fn set_up_byte_controller(
1873 &self,
1874 global: &GlobalScope,
1875 underlying_source_dict: JsUnderlyingSource,
1876 underlying_source_handle: SafeHandleObject,
1877 stream: DomRoot<ReadableStream>,
1878 strategy_hwm: f64,
1879 can_gc: CanGc,
1880 ) -> Fallible<()> {
1881 if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
1897 return Err(Error::Type("autoAllocateChunkSize cannot be 0".to_owned()));
1898 }
1899
1900 let controller = ReadableByteStreamController::new(
1901 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
1902 strategy_hwm,
1903 global,
1904 can_gc,
1905 );
1906
1907 controller.set_underlying_source_this_object(underlying_source_handle);
1910
1911 controller.setup(global, stream, can_gc)
1914 }
1915
1916 pub(crate) fn setup_cross_realm_transform_readable(
1918 &self,
1919 cx: SafeJSContext,
1920 port: &MessagePort,
1921 can_gc: CanGc,
1922 ) {
1923 let port_id = port.message_port_id();
1924 let global = self.global();
1925
1926 let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
1931
1932 let controller = ReadableStreamDefaultController::new(
1936 &self.global(),
1937 UnderlyingSourceType::Transfer(Dom::from_ref(port)),
1938 0.,
1939 size_algorithm,
1940 can_gc,
1941 );
1942
1943 rooted!(in(*cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
1946 controller: Dom::from_ref(&controller),
1947 });
1948 global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
1949
1950 port.Start(can_gc);
1952
1953 controller
1955 .setup(DomRoot::from_ref(self), can_gc)
1956 .expect("Setting up controller for transfer cannot fail.");
1957 }
1958}
1959
1960impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
1961 fn Constructor(
1963 cx: SafeJSContext,
1964 global: &GlobalScope,
1965 proto: Option<SafeHandleObject>,
1966 can_gc: CanGc,
1967 underlying_source: Option<*mut JSObject>,
1968 strategy: &QueuingStrategy,
1969 ) -> Fallible<DomRoot<Self>> {
1970 rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
1972 let underlying_source_dict = if !underlying_source_obj.is_null() {
1975 rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
1976 match JsUnderlyingSource::new(cx, obj_val.handle(), can_gc) {
1977 Ok(ConversionResult::Success(val)) => val,
1978 Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
1979 _ => {
1980 return Err(Error::JSFailed);
1981 },
1982 }
1983 } else {
1984 JsUnderlyingSource::empty()
1985 };
1986
1987 let stream = ReadableStream::new_with_proto(global, proto, can_gc);
1989
1990 if underlying_source_dict.type_.is_some() {
1991 if strategy.size.is_some() {
1993 return Err(Error::Range(
1994 "size is not supported for byte streams".to_owned(),
1995 ));
1996 }
1997
1998 let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2000
2001 stream.set_up_byte_controller(
2004 global,
2005 underlying_source_dict,
2006 underlying_source_obj.handle(),
2007 stream.clone(),
2008 strategy_hwm,
2009 can_gc,
2010 )?;
2011 } else {
2012 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2014
2015 let size_algorithm = extract_size_algorithm(strategy, can_gc);
2017
2018 let controller = ReadableStreamDefaultController::new(
2019 global,
2020 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2021 high_water_mark,
2022 size_algorithm,
2023 can_gc,
2024 );
2025
2026 controller.set_underlying_source_this_object(underlying_source_obj.handle());
2029
2030 controller.setup(stream.clone(), can_gc)?;
2032 };
2033
2034 Ok(stream)
2035 }
2036
2037 fn Locked(&self) -> bool {
2039 self.is_locked()
2040 }
2041
2042 fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
2044 let global = self.global();
2045 if self.is_locked() {
2046 let promise = Promise::new(&global, can_gc);
2049 promise.reject_error(Error::Type("stream is locked".to_owned()), can_gc);
2050 promise
2051 } else {
2052 self.cancel(cx, &global, reason, can_gc)
2054 }
2055 }
2056
2057 fn GetReader(
2059 &self,
2060 options: &ReadableStreamGetReaderOptions,
2061 can_gc: CanGc,
2062 ) -> Fallible<ReadableStreamReader> {
2063 if options.mode.is_none() {
2065 return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2066 self.acquire_default_reader(can_gc)?,
2067 ));
2068 }
2069 assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2071
2072 Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2074 self.acquire_byob_reader(can_gc)?,
2075 ))
2076 }
2077
2078 fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2080 self.tee(false, can_gc)
2082 }
2083
2084 fn PipeTo(
2086 &self,
2087 destination: &WritableStream,
2088 options: &StreamPipeOptions,
2089 realm: InRealm,
2090 can_gc: CanGc,
2091 ) -> Rc<Promise> {
2092 let cx = GlobalScope::get_cx();
2093 let global = self.global();
2094
2095 if self.is_locked() {
2097 let promise = Promise::new(&global, can_gc);
2099 promise.reject_error(Error::Type("Source stream is locked".to_owned()), can_gc);
2100 return promise;
2101 }
2102
2103 if destination.is_locked() {
2105 let promise = Promise::new(&global, can_gc);
2107 promise.reject_error(
2108 Error::Type("Destination stream is locked".to_owned()),
2109 can_gc,
2110 );
2111 return promise;
2112 }
2113
2114 let signal = options.signal.as_deref();
2116
2117 self.pipe_to(
2119 cx,
2120 &global,
2121 destination,
2122 options.preventClose,
2123 options.preventAbort,
2124 options.preventCancel,
2125 signal,
2126 realm,
2127 can_gc,
2128 )
2129 }
2130
2131 fn PipeThrough(
2133 &self,
2134 transform: &ReadableWritablePair,
2135 options: &StreamPipeOptions,
2136 realm: InRealm,
2137 can_gc: CanGc,
2138 ) -> Fallible<DomRoot<ReadableStream>> {
2139 let global = self.global();
2140 let cx = GlobalScope::get_cx();
2141
2142 if self.is_locked() {
2144 return Err(Error::Type("Source stream is locked".to_owned()));
2145 }
2146
2147 if transform.writable.is_locked() {
2149 return Err(Error::Type("Destination stream is locked".to_owned()));
2150 }
2151
2152 let signal = options.signal.as_deref();
2154
2155 let promise = self.pipe_to(
2158 cx,
2159 &global,
2160 &transform.writable,
2161 options.preventClose,
2162 options.preventAbort,
2163 options.preventCancel,
2164 signal,
2165 realm,
2166 can_gc,
2167 );
2168
2169 promise.set_promise_is_handled();
2171
2172 Ok(transform.readable.clone())
2174 }
2175}
2176
2177#[allow(unsafe_code)]
2178pub(crate) unsafe fn get_type_and_value_from_message(
2182 cx: SafeJSContext,
2183 data: SafeHandleValue,
2184 value: SafeMutableHandleValue,
2185 can_gc: CanGc,
2186) -> DOMString {
2187 assert!(data.is_object());
2193 rooted!(in(*cx) let data_object = data.to_object());
2194
2195 rooted!(in(*cx) let mut type_ = UndefinedValue());
2197 get_dictionary_property(
2198 *cx,
2199 data_object.handle(),
2200 "type",
2201 type_.handle_mut(),
2202 can_gc,
2203 )
2204 .expect("Getting the type should not fail.");
2205
2206 get_dictionary_property(*cx, data_object.handle(), "value", value, can_gc)
2208 .expect("Getting the value should not fail.");
2209
2210 let result = DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty)
2212 .expect("The type of the message should be a string");
2213 let ConversionResult::Success(type_string) = result else {
2214 unreachable!("The type of the message should be a string");
2215 };
2216
2217 type_string
2218}
2219
// Marker impl of `js::gc::Rootable`; values of this type are placed in
// `rooted!` slots elsewhere in this file.
impl js::gc::Rootable for CrossRealmTransformReadable {}
2221
/// Readable side of a cross-realm transform: wraps the default controller
/// that incoming port messages are applied to.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformReadable {
    /// Controller that receives enqueue, close and error calls.
    controller: Dom<ReadableStreamDefaultController>,
}
2231
2232impl CrossRealmTransformReadable {
2233 #[allow(unsafe_code)]
2236 pub(crate) fn handle_message(
2237 &self,
2238 cx: SafeJSContext,
2239 global: &GlobalScope,
2240 port: &MessagePort,
2241 message: SafeHandleValue,
2242 _realm: InRealm,
2243 can_gc: CanGc,
2244 ) {
2245 rooted!(in(*cx) let mut value = UndefinedValue());
2246 let type_string =
2247 unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
2248
2249 if type_string == "chunk" {
2251 self.controller
2253 .enqueue(cx, value.handle(), can_gc)
2254 .expect("Enqueing a chunk should not fail.");
2255 }
2256
2257 if type_string == "close" {
2259 self.controller.close(can_gc);
2261
2262 global.disentangle_port(port, can_gc);
2264 }
2265
2266 if type_string == "error" {
2268 self.controller.error(value.handle(), can_gc);
2270
2271 global.disentangle_port(port, can_gc);
2273 }
2274 }
2275
2276 pub(crate) fn handle_error(
2279 &self,
2280 cx: SafeJSContext,
2281 global: &GlobalScope,
2282 port: &MessagePort,
2283 _realm: InRealm,
2284 can_gc: CanGc,
2285 ) {
2286 let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
2288 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
2289 error.safe_to_jsval(cx, rooted_error.handle_mut());
2290
2291 port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
2293
2294 self.controller.error(rooted_error.handle(), can_gc);
2296
2297 global.disentangle_port(port, can_gc);
2299 }
2300}
2301
2302#[allow(unsafe_code)]
2303pub(crate) fn get_read_promise_done(
2305 cx: SafeJSContext,
2306 v: &SafeHandleValue,
2307 can_gc: CanGc,
2308) -> Result<bool, Error> {
2309 if !v.is_object() {
2310 return Err(Error::Type("Unknown format for done property.".to_string()));
2311 }
2312 unsafe {
2313 rooted!(in(*cx) let object = v.to_object());
2314 rooted!(in(*cx) let mut done = UndefinedValue());
2315 match get_dictionary_property(*cx, object.handle(), "done", done.handle_mut(), can_gc) {
2316 Ok(true) => match bool::safe_from_jsval(cx, done.handle(), ()) {
2317 Ok(ConversionResult::Success(val)) => Ok(val),
2318 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2319 _ => Err(Error::Type("Unknown format for done property.".to_string())),
2320 },
2321 Ok(false) => Err(Error::Type("Promise has no done property.".to_string())),
2322 Err(()) => Err(Error::JSFailed),
2323 }
2324 }
2325}
2326
2327#[allow(unsafe_code)]
2328pub(crate) fn get_read_promise_bytes(
2330 cx: SafeJSContext,
2331 v: &SafeHandleValue,
2332 can_gc: CanGc,
2333) -> Result<Vec<u8>, Error> {
2334 if !v.is_object() {
2335 return Err(Error::Type(
2336 "Unknown format for for bytes read.".to_string(),
2337 ));
2338 }
2339 unsafe {
2340 rooted!(in(*cx) let object = v.to_object());
2341 rooted!(in(*cx) let mut bytes = UndefinedValue());
2342 match get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut(), can_gc) {
2343 Ok(true) => {
2344 match Vec::<u8>::safe_from_jsval(
2345 cx,
2346 bytes.handle(),
2347 ConversionBehavior::EnforceRange,
2348 ) {
2349 Ok(ConversionResult::Success(val)) => Ok(val),
2350 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2351 _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
2352 }
2353 },
2354 Ok(false) => Err(Error::Type("Promise has no value property.".to_string())),
2355 Err(()) => Err(Error::JSFailed),
2356 }
2357 }
2358}
2359
2360impl Transferable for ReadableStream {
2362 type Index = MessagePortIndex;
2363 type Data = MessagePortImpl;
2364
2365 fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
2367 if self.is_locked() {
2370 return Err(Error::DataClone(None));
2371 }
2372
2373 let global = self.global();
2374 let realm = enter_realm(&*global);
2375 let comp = InRealm::Entered(&realm);
2376 let cx = GlobalScope::get_cx();
2377 let can_gc = CanGc::note();
2378
2379 let port_1 = MessagePort::new(&global, can_gc);
2381 global.track_message_port(&port_1, None);
2382
2383 let port_2 = MessagePort::new(&global, can_gc);
2385 global.track_message_port(&port_2, None);
2386
2387 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2389
2390 let writable = WritableStream::new_with_proto(&global, None, can_gc);
2392
2393 writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
2395
2396 let promise = self.pipe_to(
2398 cx, &global, &writable, false, false, false, None, comp, can_gc,
2399 );
2400
2401 promise.set_promise_is_handled();
2403
2404 port_2.transfer()
2406 }
2407
2408 fn transfer_receive(
2410 owner: &GlobalScope,
2411 id: MessagePortId,
2412 port_impl: MessagePortImpl,
2413 ) -> Result<DomRoot<Self>, ()> {
2414 let cx = GlobalScope::get_cx();
2415 let can_gc = CanGc::note();
2416
2417 let value = ReadableStream::new_with_proto(owner, None, can_gc);
2420
2421 let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
2428
2429 value.setup_cross_realm_transform_readable(cx, &transferred_port, can_gc);
2431 Ok(value)
2432 }
2433
2434 fn serialized_storage<'a>(
2436 data: StructuredData<'a, '_>,
2437 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2438 match data {
2439 StructuredData::Reader(r) => &mut r.port_impls,
2440 StructuredData::Writer(w) => &mut w.ports,
2441 }
2442 }
2443}