1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use servo_base::generic_channel::GenericSharedMemory;
11use servo_base::id::{MessagePortId, MessagePortIndex};
12use servo_constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19 MutableHandleValue as SafeMutableHandleValue,
20};
21use js::typedarray::ArrayBufferViewU8;
22use rustc_hash::FxHashMap;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
26use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
27 ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
28 ReadableWritablePair, StreamPipeOptions,
29};
30use script_bindings::str::DOMString;
31
32use crate::dom::domexception::{DOMErrorName, DOMException};
33use script_bindings::conversions::{is_array_like, StringificationBehavior};
34use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
35use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
38use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
39use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
40use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
41use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
42use crate::dom::stream::writablestream::WritableStream;
43use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
44use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
45use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
46use crate::dom::bindings::trace::RootedTraceableBox;
47use crate::dom::bindings::utils::get_dictionary_property;
48use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
49use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
50use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
51use crate::dom::globalscope::GlobalScope;
52use crate::dom::promise::{wait_for_all_promise, Promise};
53use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
54use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
55use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
56use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
57use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
58use crate::dom::types::DefaultTeeUnderlyingSource;
59use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
60use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
61use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
62use crate::dom::messageport::MessagePort;
63use crate::realms::{enter_realm, InRealm, enter_auto_realm};
64use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
65use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
66use crate::dom::bindings::transferable::Transferable;
67use crate::dom::bindings::structuredclone::StructuredData;
68
69use crate::dom::bindings::buffer_source::HeapBufferSource;
70use super::readablestreambyobreader::ReadIntoRequest;
71
/// The phases a [`PipeTo`] operation moves through.
///
/// The current phase is stored on `PipeTo` and consulted by its promise
/// callback to decide what to do next.
#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
enum PipeToState {
    /// Initial phase; the promise callback treats being invoked in this phase
    /// as unreachable.
    #[default]
    Starting,
    /// Waiting on the writer's `Ready` promise (entered in
    /// `wait_for_writer_ready`).
    PendingReady,
    /// Waiting on a read from the source (entered in `read_chunk`).
    PendingRead,
    /// Shutdown was requested while writes were still pending; the optional
    /// action is performed once no pending write is left.
    ShuttingDownWithPendingWrites(Option<ShutdownAction>),
    /// A shutdown action has been started (entered in `perform_action`) and
    /// the pipe is waiting on its promise.
    ShuttingDownPendingAction,
    /// `finalize` has run; the callback does nothing in this phase.
    Finalized,
}
93
/// The action `PipeTo::perform_action` runs as part of shutting down a pipe.
#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
enum ShutdownAction {
    /// Abort the destination stream with the shutdown error.
    WritableStreamAbort,
    /// Cancel the source stream with the shutdown error.
    ReadableStreamCancel,
    /// Call `close_with_error_propagation` on the writer.
    WritableStreamDefaultWriterCloseWithErrorPropagation,
    /// Abort the destination and/or cancel the source with the abort reason,
    /// subject to the `prevent_abort` / `prevent_cancel` flags.
    Abort,
}
106
// Marker impl: `PipeTo` opts into `js::gc::Rootable` with no overridden items.
// NOTE(review): presumably needed so a `PipeTo` can be placed in a rooted slot — confirm.
impl js::gc::Rootable for PipeTo {}
108
/// State of one pipe operation from a readable stream's reader to a writable
/// stream's writer.
///
/// The struct is `Clone`: every time the pipe waits on a promise, clones of it
/// are boxed into a `PromiseNativeHandler`. The mutable parts live behind `Rc`
/// so all clones observe the same state.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PipeTo {
    /// Reader on the source stream.
    reader: Dom<ReadableStreamDefaultReader>,

    /// Writer on the destination stream.
    writer: Dom<WritableStreamDefaultWriter>,

    /// Promises returned by writes issued in `write_chunk`. Entries that are
    /// no longer pending are pruned at the start of each callback.
    #[ignore_malloc_size_of = "nested Rc"]
    pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,

    /// Current phase of the pipe.
    #[conditional_malloc_size_of]
    #[no_trace]
    state: Rc<RefCell<PipeToState>>,

    /// When set, an errored source or an abort does not abort the destination.
    prevent_abort: bool,

    /// When set, an errored/closed destination or an abort does not cancel the
    /// source.
    prevent_cancel: bool,

    /// When set, a closed source does not close the destination.
    prevent_close: bool,

    /// Set once `shutdown` has been entered; later shutdown requests and
    /// propagation checks become no-ops.
    #[conditional_malloc_size_of]
    shutting_down: Rc<Cell<bool>>,

    /// Reason recorded by `abort_with_reason`, used by the `Abort` action.
    #[ignore_malloc_size_of = "mozjs"]
    abort_reason: Rc<Heap<JSVal>>,

    /// Error the result promise is rejected with in `finalize`; `None` means
    /// the result promise is resolved instead.
    #[ignore_malloc_size_of = "mozjs"]
    shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,

    /// Promise of the shutdown action started by `perform_action`, if any.
    #[ignore_malloc_size_of = "nested Rc"]
    shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// Promise settled by `finalize` to report the outcome of the pipe.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
170
171impl PipeTo {
172 pub(crate) fn abort_with_reason(
175 &self,
176 cx: &mut CurrentRealm,
177 global: &GlobalScope,
178 reason: SafeHandleValue,
179 ) {
180 if self.shutting_down.get() {
182 return;
183 }
184
185 self.abort_reason.set(reason.get());
189
190 self.set_shutdown_error(reason);
195
196 self.shutdown(cx, global, Some(ShutdownAction::Abort));
202 }
203}
204
205impl Callback for PipeTo {
206 #[expect(unsafe_code)]
215 fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
216 let in_realm_proof = cx.into();
217 let realm = InRealm::Already(&in_realm_proof);
218 let global = self.reader.global();
219
220 self.pending_writes.borrow_mut().retain(|p| {
226 let pending = p.is_pending();
227 if !pending {
228 p.set_promise_is_handled();
229 }
230 pending
231 });
232
233 let state_before_checks = self.state.borrow().clone();
235
236 if state_before_checks == PipeToState::PendingRead {
243 let source = self.reader.get_stream().expect("Source stream must be set");
244 if source.is_closed() {
245 let dest = self
246 .writer
247 .get_stream()
248 .expect("Destination stream must be set");
249
250 if dest.is_writable() && !dest.close_queued_or_in_flight() {
253 let Ok(done) = get_read_promise_done(cx.into(), &result, CanGc::from_cx(cx))
254 else {
255 return;
262 };
263
264 if !done {
265 self.write_chunk(cx, &global, result);
267 }
268 }
269 }
270 }
271
272 self.check_and_propagate_errors_forward(cx, &global);
273 self.check_and_propagate_errors_backward(cx, &global);
274 self.check_and_propagate_closing_forward(cx, &global);
275 self.check_and_propagate_closing_backward(cx, &global);
276
277 let state = self.state.borrow().clone();
279
280 if state != state_before_checks {
284 return;
285 }
286
287 match state {
288 PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
289 PipeToState::PendingReady => {
290 self.read_chunk(cx, &global);
292 },
293 PipeToState::PendingRead => {
294 self.write_chunk(cx, &global, result);
296
297 if self.shutting_down.get() {
299 return;
300 }
301
302 self.wait_for_writer_ready(cx, &global);
304 },
305 PipeToState::ShuttingDownWithPendingWrites(action) => {
306 if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
309 self.wait_on_pending_write(&global, write, realm, CanGc::from_cx(cx));
310 return;
311 }
312
313 if let Some(action) = action {
315 self.perform_action(cx, &global, action);
317 } else {
318 self.finalize(cx, &global);
320 }
321 },
322 PipeToState::ShuttingDownPendingAction => {
323 let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
324 unreachable!();
325 };
326 if promise.is_pending() {
327 return;
331 }
332
333 let is_array_like = {
334 if !result.is_object() {
335 false
336 } else {
337 unsafe { is_array_like::<crate::DomTypeHolder>(cx.raw_cx(), result) }
338 }
339 };
340
341 if !result.is_undefined() && !is_array_like {
343 self.set_shutdown_error(result);
353 }
354 self.finalize(cx, &global);
355 },
356 PipeToState::Finalized => {},
357 }
358 }
359}
360
361impl PipeTo {
362 fn set_shutdown_error(&self, error: SafeHandleValue) {
365 *self.shutdown_error.borrow_mut() = Some(Heap::default());
366 let Some(ref heap) = *self.shutdown_error.borrow() else {
367 unreachable!("Option set to Some(heap) above.");
368 };
369 heap.set(error.get())
370 }
371
372 fn wait_for_writer_ready(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
375 let realm = cx.into();
376 let realm = InRealm::Already(&realm);
377
378 {
379 let mut state = self.state.borrow_mut();
380 *state = PipeToState::PendingReady;
381 }
382
383 let ready_promise = self.writer.Ready();
384 if ready_promise.is_fulfilled() {
385 self.read_chunk(cx, global);
386 } else {
387 let handler = PromiseNativeHandler::new(
388 global,
389 Some(Box::new(self.clone())),
390 Some(Box::new(self.clone())),
391 CanGc::from_cx(cx),
392 );
393 ready_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
394
395 let closed_promise = self.reader.Closed();
399 closed_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
400 }
401 }
402
403 fn read_chunk(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
405 let realm = cx.into();
406 let realm = InRealm::Already(&realm);
407
408 *self.state.borrow_mut() = PipeToState::PendingRead;
409 let chunk_promise = self.reader.Read(cx);
410 let handler = PromiseNativeHandler::new(
411 global,
412 Some(Box::new(self.clone())),
413 Some(Box::new(self.clone())),
414 CanGc::from_cx(cx),
415 );
416 chunk_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
417
418 let ready_promise = self.writer.Closed();
421 ready_promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
422 }
423
424 #[expect(unsafe_code)]
427 fn write_chunk(
428 &self,
429 cx: &mut js::context::JSContext,
430 global: &GlobalScope,
431 chunk: SafeHandleValue,
432 ) -> bool {
433 if chunk.is_object() {
434 rooted!(&in(cx) let object = chunk.to_object());
435 rooted!(&in(cx) let mut bytes = UndefinedValue());
436 let has_value = unsafe {
437 get_dictionary_property(
438 cx.raw_cx(),
439 object.handle(),
440 c"value",
441 bytes.handle_mut(),
442 CanGc::from_cx(cx),
443 )
444 .expect("Chunk should have a value.")
445 };
446 if has_value {
447 let write_promise = self.writer.write(cx, global, bytes.handle());
449 self.pending_writes.borrow_mut().push_back(write_promise);
450 return true;
451 }
452 }
453 false
454 }
455
456 fn wait_on_pending_write(
460 &self,
461 global: &GlobalScope,
462 promise: Rc<Promise>,
463 realm: InRealm,
464 can_gc: CanGc,
465 ) {
466 let handler = PromiseNativeHandler::new(
467 global,
468 Some(Box::new(self.clone())),
469 Some(Box::new(self.clone())),
470 can_gc,
471 );
472 promise.append_native_handler(&handler, realm, can_gc);
473 }
474
475 fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
478 if self.shutting_down.get() {
481 return;
482 }
483
484 let source = self
486 .reader
487 .get_stream()
488 .expect("Reader should still have a stream");
489 if source.is_errored() {
490 rooted!(&in(cx) let mut source_error = UndefinedValue());
491 source.get_stored_error(source_error.handle_mut());
492 self.set_shutdown_error(source_error.handle());
493
494 if !self.prevent_abort {
496 self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
499 } else {
500 self.shutdown(cx, global, None);
502 }
503 }
504 }
505
506 fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
509 if self.shutting_down.get() {
512 return;
513 }
514
515 let dest = self
517 .writer
518 .get_stream()
519 .expect("Writer should still have a stream");
520 if dest.is_errored() {
521 rooted!(&in(cx) let mut dest_error = UndefinedValue());
522 dest.get_stored_error(dest_error.handle_mut());
523 self.set_shutdown_error(dest_error.handle());
524
525 if !self.prevent_cancel {
527 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
530 } else {
531 self.shutdown(cx, global, None);
533 }
534 }
535 }
536
537 fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
540 if self.shutting_down.get() {
543 return;
544 }
545
546 let source = self
548 .reader
549 .get_stream()
550 .expect("Reader should still have a stream");
551 if source.is_closed() {
552 if !self.prevent_close {
554 self.shutdown(
557 cx,
558 global,
559 Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
560 )
561 } else {
562 self.shutdown(cx, global, None);
564 }
565 }
566 }
567
568 fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
571 if self.shutting_down.get() {
574 return;
575 }
576
577 let dest = self
580 .writer
581 .get_stream()
582 .expect("Writer should still have a stream");
583 if dest.close_queued_or_in_flight() || dest.is_closed() {
584 rooted!(&in(cx) let mut dest_closed = UndefinedValue());
589 let error =
590 Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
591 error.to_jsval(
592 cx.into(),
593 global,
594 dest_closed.handle_mut(),
595 CanGc::from_cx(cx),
596 );
597 self.set_shutdown_error(dest_closed.handle());
598
599 if !self.prevent_cancel {
601 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
604 } else {
605 self.shutdown(cx, global, None);
607 }
608 }
609 }
610
611 fn shutdown(
615 &self,
616 cx: &mut CurrentRealm,
617 global: &GlobalScope,
618 action: Option<ShutdownAction>,
619 ) {
620 let realm = cx.into();
621 let realm = InRealm::Already(&realm);
622 if !self.shutting_down.replace(true) {
625 let dest = self.writer.get_stream().expect("Stream must be set");
626 if dest.is_writable() && !dest.close_queued_or_in_flight() {
629 if let Some(write) = self.pending_writes.borrow_mut().front() {
635 *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
636 self.wait_on_pending_write(global, write.clone(), realm, CanGc::from_cx(cx));
637 return;
638 }
639 }
640
641 if let Some(action) = action {
643 self.perform_action(cx, global, action);
645 } else {
646 self.finalize(cx, global);
648 }
649 }
650 }
651
652 fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
655 let realm = cx.into();
656 let realm = InRealm::Already(&realm);
657 rooted!(&in(cx) let mut error = UndefinedValue());
658 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
659 error.set(shutdown_error.get());
660 }
661
662 *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
663
664 let promise = match action {
666 ShutdownAction::WritableStreamAbort => {
667 let dest = self.writer.get_stream().expect("Stream must be set");
668 dest.abort(cx, global, error.handle())
669 },
670 ShutdownAction::ReadableStreamCancel => {
671 let source = self
672 .reader
673 .get_stream()
674 .expect("Reader should have a stream.");
675 source.cancel(cx, global, error.handle())
676 },
677 ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
678 self.writer.close_with_error_propagation(cx, global)
679 },
680 ShutdownAction::Abort => {
681 rooted!(&in(cx) let mut error = UndefinedValue());
686 error.set(self.abort_reason.get());
687
688 let mut actions = vec![];
690
691 if !self.prevent_abort {
693 let dest = self
694 .writer
695 .get_stream()
696 .expect("Destination stream must be set");
697
698 let promise = if dest.is_writable() {
700 dest.abort(cx, global, error.handle())
702 } else {
703 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
705 };
706 actions.push(promise);
707 }
708
709 if !self.prevent_cancel {
711 let source = self.reader.get_stream().expect("Source stream must be set");
712
713 let promise = if source.is_readable() {
715 source.cancel(cx, global, error.handle())
717 } else {
718 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
720 };
721 actions.push(promise);
722 }
723
724 wait_for_all_promise(cx.into(), global, actions, realm, CanGc::from_cx(cx))
728 },
729 };
730
731 let handler = PromiseNativeHandler::new(
734 global,
735 Some(Box::new(self.clone())),
736 Some(Box::new(self.clone())),
737 CanGc::from_cx(cx),
738 );
739 promise.append_native_handler(&handler, realm, CanGc::from_cx(cx));
740 *self.shutdown_action_promise.borrow_mut() = Some(promise);
741 }
742
743 fn finalize(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
745 *self.state.borrow_mut() = PipeToState::Finalized;
746
747 self.writer.release(cx.into(), global, CanGc::from_cx(cx));
749
750 self.reader
756 .release(CanGc::from_cx(cx))
757 .expect("Releasing the reader should not fail");
758
759 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
765 rooted!(&in(cx) let mut error = UndefinedValue());
766 error.set(shutdown_error.get());
767 self.result_promise
769 .reject_native(&error.handle(), CanGc::from_cx(cx));
770 } else {
771 self.result_promise.resolve_native(&(), CanGc::from_cx(cx));
773 }
774 }
775}
776
/// Promise callback that resolves `result` when invoked, ignoring the value
/// it is called with.
// NOTE(review): going by the name, this is attached to the fulfilment of a
// source-cancel promise — the call site is not visible here; confirm.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseFulfillmentHandler {
    /// Promise to resolve.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}

impl Callback for SourceCancelPromiseFulfillmentHandler {
    /// Resolves `result` with `()`; the incoming value is not used.
    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
        self.result.resolve_native(&(), CanGc::from_cx(cx));
    }
}
793
/// Promise callback that rejects `result` with the value it is invoked with.
// NOTE(review): going by the name, this is attached to the rejection of a
// source-cancel promise — the call site is not visible here; confirm.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseRejectionHandler {
    /// Promise to reject.
    #[conditional_malloc_size_of]
    result: Rc<Promise>,
}

impl Callback for SourceCancelPromiseRejectionHandler {
    /// Rejects `result` with `v`.
    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
        self.result.reject_native(&v, CanGc::from_cx(cx));
    }
}
810
/// State of a [`ReadableStream`]; a new stream starts out `Readable`.
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReadableStreamState {
    /// Default state.
    #[default]
    Readable,
    /// The stream has been closed.
    Closed,
    /// The stream has been errored (see `ReadableStream::error`).
    Errored,
}
819
/// The kind of controller attached to a [`ReadableStream`], together with the
/// nullable slot holding it.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ControllerType {
    /// The stream is driven by a `ReadableByteStreamController`.
    Byte(MutNullableDom<ReadableByteStreamController>),
    /// The stream is driven by a `ReadableStreamDefaultController`.
    Default(MutNullableDom<ReadableStreamDefaultController>),
}
829
/// The kind of reader associated with a [`ReadableStream`], together with the
/// nullable slot holding it. The stream counts as locked while the slot is
/// non-empty (see `ReadableStream::is_locked`).
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum ReaderType {
    /// Slot for a `ReadableStreamBYOBReader`.
    #[allow(clippy::upper_case_acronyms)]
    BYOB(MutNullableDom<ReadableStreamBYOBReader>),
    /// Slot for a `ReadableStreamDefaultReader`.
    Default(MutNullableDom<ReadableStreamDefaultReader>),
}
840
841impl Eq for ReaderType {}
842impl PartialEq for ReaderType {
843 fn eq(&self, other: &Self) -> bool {
844 matches!(
845 (self, other),
846 (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
847 (ReaderType::Default(_), ReaderType::Default(_))
848 )
849 }
850}
851
852#[cfg_attr(crown, expect(crown::unrooted_must_root))]
854pub(crate) fn create_readable_stream(
855 cx: &mut js::context::JSContext,
856 global: &GlobalScope,
857 underlying_source_type: UnderlyingSourceType,
858 queuing_strategy: Option<Rc<QueuingStrategySize>>,
859 high_water_mark: Option<f64>,
860) -> DomRoot<ReadableStream> {
861 let high_water_mark = high_water_mark.unwrap_or(1.0);
863
864 let size_algorithm = queuing_strategy.unwrap_or(extract_size_algorithm(
866 &QueuingStrategy::empty(),
867 CanGc::from_cx(cx),
868 ));
869
870 assert!(high_water_mark >= 0.0);
872
873 let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
876
877 let controller = ReadableStreamDefaultController::new(
879 global,
880 underlying_source_type,
881 high_water_mark,
882 size_algorithm,
883 CanGc::from_cx(cx),
884 );
885
886 controller
889 .setup(cx, stream.clone())
890 .expect("Setup of default controller cannot fail");
891
892 stream
894}
895
896#[cfg_attr(crown, expect(crown::unrooted_must_root))]
898fn readable_byte_stream_tee(
899 cx: &mut js::context::JSContext,
900 global: &GlobalScope,
901 underlying_source_type: UnderlyingSourceType,
902) -> DomRoot<ReadableStream> {
903 let tee_stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
906
907 let controller =
909 ReadableByteStreamController::new(underlying_source_type, 0.0, global, CanGc::from_cx(cx));
910
911 controller
913 .setup(cx, global, tee_stream.clone())
914 .expect("Setup of byte stream controller cannot fail");
915
916 tee_stream
918}
919
/// DOM object for a readable stream.
#[dom_struct]
pub(crate) struct ReadableStream {
    reflector_: Reflector,

    /// Controller driving the stream; `None` until `set_default_controller`
    /// or `set_byte_controller` is called.
    controller: RefCell<Option<ControllerType>>,

    /// Error stored by `error`; readable through `get_stored_error`.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// Flag exposed through `is_disturbed` / `set_is_disturbed`.
    disturbed: Cell<bool>,

    /// Reader slot of the stream, if one has been installed.
    reader: RefCell<Option<ReaderType>>,

    /// Current state; starts as `Readable`.
    state: Cell<ReadableStreamState>,
}
943
944impl ReadableStream {
945 fn new_inherited() -> ReadableStream {
947 ReadableStream {
948 reflector_: Reflector::new(),
949 controller: RefCell::new(None),
950 stored_error: Heap::default(),
951 disturbed: Default::default(),
952 reader: RefCell::new(None),
953 state: Cell::new(Default::default()),
954 }
955 }
956
957 pub(crate) fn new_with_proto(
958 global: &GlobalScope,
959 proto: Option<SafeHandleObject>,
960 can_gc: CanGc,
961 ) -> DomRoot<ReadableStream> {
962 reflect_dom_object_with_proto(
963 Box::new(ReadableStream::new_inherited()),
964 global,
965 proto,
966 can_gc,
967 )
968 }
969
970 pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
973 *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
974 controller,
975 ))));
976 }
977
978 pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
981 *self.controller.borrow_mut() =
982 Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
983 }
984
985 pub(crate) fn assert_no_controller(&self) {
988 let has_no_controller = self.controller.borrow().is_none();
989 assert!(has_no_controller);
990 }
991
992 pub(crate) fn new_from_bytes(
994 cx: &mut js::context::JSContext,
995 global: &GlobalScope,
996 bytes: Vec<u8>,
997 ) -> Fallible<DomRoot<ReadableStream>> {
998 let stream = ReadableStream::new_with_external_underlying_source(
999 cx,
1000 global,
1001 UnderlyingSourceType::Memory(bytes.len()),
1002 )?;
1003 stream.enqueue_native(bytes, CanGc::from_cx(cx));
1004 stream.controller_close_native(CanGc::from_cx(cx));
1005 Ok(stream)
1006 }
1007
1008 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1011 pub(crate) fn new_with_external_underlying_source(
1012 cx: &mut js::context::JSContext,
1013 global: &GlobalScope,
1014 source: UnderlyingSourceType,
1015 ) -> Fallible<DomRoot<ReadableStream>> {
1016 assert!(source.is_native());
1017 let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
1018 let controller = ReadableStreamDefaultController::new(
1019 global,
1020 source,
1021 1.0,
1022 extract_size_algorithm(&QueuingStrategy::empty(), CanGc::from_cx(cx)),
1023 CanGc::from_cx(cx),
1024 );
1025 controller.setup(cx, stream.clone())?;
1026 Ok(stream)
1027 }
1028
1029 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1031 match self.controller.borrow().as_ref() {
1032 Some(ControllerType::Default(controller)) => {
1033 let controller = controller
1034 .get()
1035 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1036 controller.perform_release_steps()
1037 },
1038 Some(ControllerType::Byte(controller)) => {
1039 let controller = controller
1040 .get()
1041 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1042 controller.perform_release_steps()
1043 },
1044 None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1045 }
1046 }
1047
1048 pub(crate) fn perform_pull_steps(
1052 &self,
1053 cx: SafeJSContext,
1054 read_request: &ReadRequest,
1055 can_gc: CanGc,
1056 ) {
1057 match self.controller.borrow().as_ref() {
1058 Some(ControllerType::Default(controller)) => controller
1059 .get()
1060 .expect("Stream should have controller.")
1061 .perform_pull_steps(read_request, can_gc),
1062 Some(ControllerType::Byte(controller)) => controller
1063 .get()
1064 .expect("Stream should have controller.")
1065 .perform_pull_steps(cx, read_request, can_gc),
1066 None => {
1067 unreachable!("Stream does not have a controller.");
1068 },
1069 }
1070 }
1071
1072 pub(crate) fn perform_pull_into(
1076 &self,
1077 cx: SafeJSContext,
1078 read_into_request: &ReadIntoRequest,
1079 view: HeapBufferSource<ArrayBufferViewU8>,
1080 min: u64,
1081 can_gc: CanGc,
1082 ) {
1083 match self.controller.borrow().as_ref() {
1084 Some(ControllerType::Byte(controller)) => controller
1085 .get()
1086 .expect("Stream should have controller.")
1087 .perform_pull_into(cx, read_into_request, view, min, can_gc),
1088 _ => {
1089 unreachable!(
1090 "Pulling a chunk from a stream with a default controller using a BYOB reader"
1091 )
1092 },
1093 }
1094 }
1095
1096 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1098 match self.reader.borrow().as_ref() {
1099 Some(ReaderType::Default(reader)) => {
1100 let Some(reader) = reader.get() else {
1101 panic!("Attempt to add a read request without having first acquired a reader.");
1102 };
1103
1104 assert!(self.is_readable());
1106
1107 reader.add_read_request(read_request);
1109 },
1110 _ => {
1111 unreachable!("Adding a read request can only be done on a default reader.")
1112 },
1113 }
1114 }
1115
1116 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1118 match self.reader.borrow().as_ref() {
1119 Some(ReaderType::BYOB(reader)) => {
1121 let Some(reader) = reader.get() else {
1122 unreachable!(
1123 "Attempt to add a read into request without having first acquired a reader."
1124 );
1125 };
1126
1127 assert!(self.is_readable() || self.is_closed());
1129
1130 reader.add_read_into_request(read_request);
1132 },
1133 _ => {
1134 unreachable!("Adding a read into request can only be done on a BYOB reader.")
1135 },
1136 }
1137 }
1138
1139 pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1142 match self.controller.borrow().as_ref() {
1143 Some(ControllerType::Default(controller)) => controller
1144 .get()
1145 .expect("Stream should have controller.")
1146 .enqueue_native(bytes, can_gc),
1147 _ => {
1148 unreachable!(
1149 "Enqueueing chunk to a stream from Rust on other than default controller"
1150 );
1151 },
1152 }
1153 }
1154
1155 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1157 assert!(self.is_readable());
1159
1160 self.state.set(ReadableStreamState::Errored);
1162
1163 self.stored_error.set(e.get());
1165
1166 let default_reader = {
1169 let reader_ref = self.reader.borrow();
1170 match reader_ref.as_ref() {
1171 Some(ReaderType::Default(reader)) => reader.get(),
1172 _ => None,
1173 }
1174 };
1175
1176 if let Some(reader) = default_reader {
1177 reader.error(e, can_gc);
1179 return;
1180 }
1181
1182 let byob_reader = {
1183 let reader_ref = self.reader.borrow();
1184 match reader_ref.as_ref() {
1185 Some(ReaderType::BYOB(reader)) => reader.get(),
1186 _ => None,
1187 }
1188 };
1189
1190 if let Some(reader) = byob_reader {
1191 reader.error_read_into_requests(e, can_gc);
1193 }
1194
1195 }
1197
    /// Writes the stream's stored error into `handle_mut`.
    pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
        handle_mut.set(self.stored_error.get());
    }
1202
1203 pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1206 let cx = GlobalScope::get_cx();
1207 rooted!(in(*cx) let mut error_val = UndefinedValue());
1208 error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1209 self.error(error_val.handle(), can_gc);
1210 }
1211
1212 pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1215 match self.controller.borrow().as_ref() {
1216 Some(ControllerType::Default(controller)) => {
1217 let _ = controller
1218 .get()
1219 .expect("Stream should have controller.")
1220 .Close(can_gc);
1221 },
1222 _ => {
1223 unreachable!("Native closing is only done on default controllers.")
1224 },
1225 }
1226 }
1227
1228 pub(crate) fn in_memory(&self) -> bool {
1231 match self.controller.borrow().as_ref() {
1232 Some(ControllerType::Default(controller)) => controller
1233 .get()
1234 .expect("Stream should have controller.")
1235 .in_memory(),
1236 _ => {
1237 unreachable!(
1238 "Checking if source is in memory for a stream with a non-default controller"
1239 )
1240 },
1241 }
1242 }
1243
1244 pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1247 match self.controller.borrow().as_ref() {
1248 Some(ControllerType::Default(controller)) => controller
1249 .get()
1250 .expect("Stream should have controller.")
1251 .get_in_memory_bytes()
1252 .as_deref()
1253 .map(GenericSharedMemory::from_bytes),
1254 _ => {
1255 unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1256 },
1257 }
1258 }
1259
1260 pub(crate) fn acquire_default_reader(
1265 &self,
1266 can_gc: CanGc,
1267 ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1268 let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1270
1271 reader.set_up(self, &self.global(), can_gc)?;
1273
1274 Ok(reader)
1276 }
1277
1278 pub(crate) fn acquire_byob_reader(
1280 &self,
1281 can_gc: CanGc,
1282 ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1283 let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1285 reader.set_up(self, &self.global(), can_gc)?;
1287
1288 Ok(reader)
1290 }
1291
1292 pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1293 match self.controller.borrow().as_ref() {
1294 Some(ControllerType::Default(controller)) => {
1295 controller.get().expect("Stream should have controller.")
1296 },
1297 _ => {
1298 unreachable!(
1299 "Getting default controller for a stream with a non-default controller"
1300 )
1301 },
1302 }
1303 }
1304
1305 pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1306 match self.controller.borrow().as_ref() {
1307 Some(ControllerType::Byte(controller)) => {
1308 controller.get().expect("Stream should have controller.")
1309 },
1310 _ => {
1311 unreachable!("Getting byte controller for a stream with a non-byte controller")
1312 },
1313 }
1314 }
1315
1316 pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1317 match self.reader.borrow().as_ref() {
1318 Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1319 _ => {
1320 unreachable!("Getting default reader for a stream with a non-default reader")
1321 },
1322 }
1323 }
1324
1325 pub(crate) fn read_a_chunk(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
1331 match self.reader.borrow().as_ref() {
1332 Some(ReaderType::Default(reader)) => {
1333 let Some(reader) = reader.get() else {
1334 unreachable!(
1335 "Attempt to read stream chunk without having first acquired a reader."
1336 );
1337 };
1338 reader.Read(cx)
1339 },
1340 _ => {
1341 unreachable!("Native reading of a chunk can only be done with a default reader.")
1342 },
1343 }
1344 }
1345
1346 pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1351 let reader_ref = self.reader.borrow();
1352
1353 match reader_ref.as_ref() {
1354 Some(ReaderType::Default(reader)) => {
1355 let Some(reader) = reader.get() else {
1356 unreachable!("Attempt to stop reading without having first acquired a reader.");
1357 };
1358
1359 drop(reader_ref);
1360 reader.release(can_gc).expect("Reader release cannot fail.");
1361 },
1362 _ => {
1363 unreachable!("Native stop reading can only be done with a default reader.")
1364 },
1365 }
1366 }
1367
1368 pub(crate) fn is_locked(&self) -> bool {
1370 match self.reader.borrow().as_ref() {
1371 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1372 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1373 None => false,
1374 }
1375 }
1376
1377 pub(crate) fn is_disturbed(&self) -> bool {
1378 self.disturbed.get()
1379 }
1380
1381 pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1382 self.disturbed.set(disturbed);
1383 }
1384
1385 pub(crate) fn is_closed(&self) -> bool {
1386 self.state.get() == ReadableStreamState::Closed
1387 }
1388
1389 pub(crate) fn is_errored(&self) -> bool {
1390 self.state.get() == ReadableStreamState::Errored
1391 }
1392
1393 pub(crate) fn is_readable(&self) -> bool {
1394 self.state.get() == ReadableStreamState::Readable
1395 }
1396
1397 pub(crate) fn has_default_reader(&self) -> bool {
1398 match self.reader.borrow().as_ref() {
1399 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1400 _ => false,
1401 }
1402 }
1403
1404 pub(crate) fn has_byob_reader(&self) -> bool {
1405 match self.reader.borrow().as_ref() {
1406 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1407 _ => false,
1408 }
1409 }
1410
1411 pub(crate) fn has_byte_controller(&self) -> bool {
1412 match self.controller.borrow().as_ref() {
1413 Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1414 _ => false,
1415 }
1416 }
1417
1418 pub(crate) fn get_num_read_requests(&self) -> usize {
1420 match self.reader.borrow().as_ref() {
1421 Some(ReaderType::Default(reader)) => {
1422 let reader = reader
1423 .get()
1424 .expect("Stream must have a reader when getting the number of read requests.");
1425 reader.get_num_read_requests()
1426 },
1427 _ => unreachable!(
1428 "Stream must have a default reader when get num read requests is called into."
1429 ),
1430 }
1431 }
1432
1433 pub(crate) fn get_num_read_into_requests(&self) -> usize {
1435 assert!(self.has_byob_reader());
1436
1437 match self.reader.borrow().as_ref() {
1438 Some(ReaderType::BYOB(reader)) => {
1439 let Some(reader) = reader.get() else {
1440 unreachable!(
1441 "Stream must have a reader when get num read into requests is called into."
1442 );
1443 };
1444 reader.get_num_read_into_requests()
1445 },
1446 _ => {
1447 unreachable!(
1448 "Stream must have a BYOB reader when get num read into requests is called into."
1449 );
1450 },
1451 }
1452 }
1453
1454 pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1456 assert!(self.has_default_reader());
1458
1459 match self.reader.borrow().as_ref() {
1460 Some(ReaderType::Default(reader)) => {
1461 let reader = reader
1463 .get()
1464 .expect("Stream must have a reader when a read request is fulfilled.");
1465 assert_ne!(reader.get_num_read_requests(), 0);
1467 let request = reader.remove_read_request();
1470
1471 if done {
1472 request.close_steps(can_gc);
1474 } else {
1475 let result = RootedTraceableBox::new(Heap::default());
1477 result.set(*chunk);
1478 request.chunk_steps(result, &self.global(), can_gc);
1479 }
1480 },
1481 _ => {
1482 unreachable!(
1483 "Stream must have a default reader when fulfill read requests is called into."
1484 );
1485 },
1486 }
1487 }
1488
1489 pub(crate) fn fulfill_read_into_request(
1491 &self,
1492 chunk: SafeHandleValue,
1493 done: bool,
1494 can_gc: CanGc,
1495 ) {
1496 assert!(self.has_byob_reader());
1498
1499 match self.reader.borrow().as_ref() {
1501 Some(ReaderType::BYOB(reader)) => {
1502 let Some(reader) = reader.get() else {
1503 unreachable!(
1504 "Stream must have a reader when a read into request is fulfilled."
1505 );
1506 };
1507
1508 assert!(reader.get_num_read_into_requests() > 0);
1510
1511 let read_into_request = reader.remove_read_into_request();
1514
1515 let result = RootedTraceableBox::new(Heap::default());
1517 if done {
1518 result.set(*chunk);
1519 read_into_request.close_steps(Some(result), can_gc);
1520 } else {
1521 result.set(*chunk);
1523 read_into_request.chunk_steps(result, can_gc);
1524 }
1525 },
1526 _ => {
1527 unreachable!(
1528 "Stream must have a BYOB reader when fulfill read into requests is called into."
1529 );
1530 },
1531 };
1532 }
1533
1534 pub(crate) fn close(&self, can_gc: CanGc) {
1536 assert!(self.is_readable());
1538 self.state.set(ReadableStreamState::Closed);
1540 let default_reader = {
1546 let reader_ref = self.reader.borrow();
1547 match reader_ref.as_ref() {
1548 Some(ReaderType::Default(reader)) => reader.get(),
1549 _ => None,
1550 }
1551 };
1552
1553 if let Some(reader) = default_reader {
1554 reader.close(can_gc);
1556 return;
1557 }
1558
1559 let byob_reader = {
1561 let reader_ref = self.reader.borrow();
1562 match reader_ref.as_ref() {
1563 Some(ReaderType::BYOB(reader)) => reader.get(),
1564 _ => None,
1565 }
1566 };
1567
1568 if let Some(reader) = byob_reader {
1569 reader.close(can_gc);
1571 }
1572
1573 }
1575
1576 pub(crate) fn cancel(
1578 &self,
1579 cx: &mut js::context::JSContext,
1580 global: &GlobalScope,
1581 reason: SafeHandleValue,
1582 ) -> Rc<Promise> {
1583 self.disturbed.set(true);
1585
1586 if self.is_closed() {
1588 return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
1589 }
1590 if self.is_errored() {
1592 let promise = Promise::new2(cx, global);
1593 rooted!(&in(cx) let mut rval = UndefinedValue());
1594 self.stored_error
1595 .safe_to_jsval(cx.into(), rval.handle_mut(), CanGc::from_cx(cx));
1596 promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
1597 return promise;
1598 }
1599 self.close(CanGc::from_cx(cx));
1601
1602 let byob_reader = {
1604 let reader_ref = self.reader.borrow();
1605 match reader_ref.as_ref() {
1606 Some(ReaderType::BYOB(reader)) => reader.get(),
1607 _ => None,
1608 }
1609 };
1610
1611 if let Some(reader) = byob_reader {
1612 reader.cancel(CanGc::from_cx(cx));
1614 }
1615
1616 let source_cancel_promise = match self.controller.borrow().as_ref() {
1619 Some(ControllerType::Default(controller)) => controller
1620 .get()
1621 .expect("Stream should have controller.")
1622 .perform_cancel_steps(cx, global, reason),
1623 Some(ControllerType::Byte(controller)) => controller
1624 .get()
1625 .expect("Stream should have controller.")
1626 .perform_cancel_steps(cx, global, reason),
1627 None => {
1628 panic!("Stream does not have a controller.");
1629 },
1630 };
1631
1632 let global = self.global();
1635 let result_promise = Promise::new2(cx, &global);
1636 let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1637 result: result_promise.clone(),
1638 });
1639 let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1640 result: result_promise.clone(),
1641 });
1642 let handler = PromiseNativeHandler::new(
1643 &global,
1644 Some(fulfillment_handler),
1645 Some(rejection_handler),
1646 CanGc::from_cx(cx),
1647 );
1648 let realm = enter_realm(&*global);
1649 let comp = InRealm::Entered(&realm);
1650 source_cancel_promise.append_native_handler(&handler, comp, CanGc::from_cx(cx));
1651
1652 result_promise
1655 }
1656
1657 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1658 pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1659 *self.reader.borrow_mut() = new_reader;
1660 }
1661
1662 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1663 fn byte_tee(&self, cx: &mut js::context::JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1665 let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;
1670 let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1671 Some(&reader),
1672 ))));
1673
1674 let reading = Rc::new(Cell::new(false));
1676
1677 let read_again_for_branch_1 = Rc::new(Cell::new(false));
1679
1680 let read_again_for_branch_2 = Rc::new(Cell::new(false));
1682
1683 let canceled_1 = Rc::new(Cell::new(false));
1685
1686 let canceled_2 = Rc::new(Cell::new(false));
1688
1689 let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1691
1692 let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1694
1695 let cancel_promise = Promise::new2(cx, &self.global());
1697 let reader_version = Rc::new(Cell::new(0));
1698
1699 let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1700 reader.clone(),
1701 self,
1702 reading.clone(),
1703 read_again_for_branch_1.clone(),
1704 read_again_for_branch_2.clone(),
1705 canceled_1.clone(),
1706 canceled_2.clone(),
1707 reason_1.clone(),
1708 reason_2.clone(),
1709 cancel_promise.clone(),
1710 reader_version.clone(),
1711 ByteTeeCancelAlgorithm::Cancel1Algorithm,
1712 ByteTeePullAlgorithm::Pull1Algorithm,
1713 CanGc::from_cx(cx),
1714 );
1715
1716 let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1717 reader.clone(),
1718 self,
1719 reading,
1720 read_again_for_branch_1,
1721 read_again_for_branch_2,
1722 canceled_1,
1723 canceled_2,
1724 reason_1,
1725 reason_2,
1726 cancel_promise,
1727 reader_version,
1728 ByteTeeCancelAlgorithm::Cancel2Algorithm,
1729 ByteTeePullAlgorithm::Pull2Algorithm,
1730 CanGc::from_cx(cx),
1731 );
1732
1733 let branch_1 = readable_byte_stream_tee(
1735 cx,
1736 &self.global(),
1737 UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_1)),
1738 );
1739 byte_tee_source_1.set_branch_1(&branch_1);
1740 byte_tee_source_2.set_branch_1(&branch_1);
1741
1742 let branch_2 = readable_byte_stream_tee(
1744 cx,
1745 &self.global(),
1746 UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_2)),
1747 );
1748 byte_tee_source_1.set_branch_2(&branch_2);
1749 byte_tee_source_2.set_branch_2(&branch_2);
1750
1751 byte_tee_source_1.forward_reader_error(reader.clone(), CanGc::from_cx(cx));
1753 byte_tee_source_2.forward_reader_error(reader, CanGc::from_cx(cx));
1754
1755 Ok(vec![branch_1, branch_2])
1757 }
1758
1759 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1761 fn default_tee(
1762 &self,
1763 cx: &mut js::context::JSContext,
1764 clone_for_branch_2: bool,
1765 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1766 let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1770
1771 let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;
1773
1774 let reading = Rc::new(Cell::new(false));
1776 let read_again = Rc::new(Cell::new(false));
1778 let canceled_1 = Rc::new(Cell::new(false));
1780 let canceled_2 = Rc::new(Cell::new(false));
1782
1783 let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1785 let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1787 let cancel_promise = Promise::new2(cx, &self.global());
1789
1790 let tee_source_1 = DefaultTeeUnderlyingSource::new(
1791 &reader,
1792 self,
1793 reading.clone(),
1794 read_again.clone(),
1795 canceled_1.clone(),
1796 canceled_2.clone(),
1797 clone_for_branch_2.clone(),
1798 reason_1.clone(),
1799 reason_2.clone(),
1800 cancel_promise.clone(),
1801 DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1802 CanGc::from_cx(cx),
1803 );
1804
1805 let underlying_source_type_branch_1 =
1806 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
1807
1808 let tee_source_2 = DefaultTeeUnderlyingSource::new(
1809 &reader,
1810 self,
1811 reading,
1812 read_again,
1813 canceled_1.clone(),
1814 canceled_2.clone(),
1815 clone_for_branch_2,
1816 reason_1,
1817 reason_2,
1818 cancel_promise.clone(),
1819 DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1820 CanGc::from_cx(cx),
1821 );
1822
1823 let underlying_source_type_branch_2 =
1824 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
1825
1826 let branch_1 = create_readable_stream(
1828 cx,
1829 &self.global(),
1830 underlying_source_type_branch_1,
1831 None,
1832 None,
1833 );
1834 tee_source_1.set_branch_1(&branch_1);
1835 tee_source_2.set_branch_1(&branch_1);
1836
1837 let branch_2 = create_readable_stream(
1839 cx,
1840 &self.global(),
1841 underlying_source_type_branch_2,
1842 None,
1843 None,
1844 );
1845 tee_source_1.set_branch_2(&branch_2);
1846 tee_source_2.set_branch_2(&branch_2);
1847
1848 reader.default_tee_append_native_handler_to_closed_promise(
1850 &branch_1,
1851 &branch_2,
1852 canceled_1,
1853 canceled_2,
1854 cancel_promise,
1855 CanGc::from_cx(cx),
1856 );
1857
1858 Ok(vec![branch_1, branch_2])
1860 }
1861
1862 #[allow(clippy::too_many_arguments)]
1864 pub(crate) fn pipe_to(
1865 &self,
1866 cx: &mut CurrentRealm,
1867 global: &GlobalScope,
1868 dest: &WritableStream,
1869 prevent_close: bool,
1870 prevent_abort: bool,
1871 prevent_cancel: bool,
1872 signal: Option<&AbortSignal>,
1873 ) -> Rc<Promise> {
1874 assert!(!self.is_locked());
1885
1886 assert!(!dest.is_locked());
1888
1889 let reader = self
1897 .acquire_default_reader(CanGc::from_cx(cx))
1898 .expect("Acquiring a default reader for pipe_to cannot fail");
1899
1900 let writer = dest
1902 .aquire_default_writer(cx.into(), global, CanGc::from_cx(cx))
1903 .expect("Acquiring a default writer for pipe_to cannot fail");
1904
1905 self.disturbed.set(true);
1907
1908 let promise = Promise::new2(cx, global);
1913
1914 rooted!(&in(cx) let pipe_to = PipeTo {
1916 reader: Dom::from_ref(&reader),
1917 writer: Dom::from_ref(&writer),
1918 pending_writes: Default::default(),
1919 state: Default::default(),
1920 prevent_abort,
1921 prevent_cancel,
1922 prevent_close,
1923 shutting_down: Default::default(),
1924 abort_reason: Default::default(),
1925 shutdown_error: Default::default(),
1926 shutdown_action_promise: Default::default(),
1927 result_promise: promise.clone(),
1928 });
1929
1930 if let Some(signal) = signal {
1933 rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1936
1937 if signal.aborted() {
1939 signal.run_abort_algorithm(cx, global, &abort_algorithm);
1940 return promise;
1941 }
1942
1943 signal.add(&abort_algorithm);
1945 }
1946
1947 pipe_to.check_and_propagate_errors_forward(cx, global);
1949 pipe_to.check_and_propagate_errors_backward(cx, global);
1950 pipe_to.check_and_propagate_closing_forward(cx, global);
1951 pipe_to.check_and_propagate_closing_backward(cx, global);
1952
1953 if *pipe_to.state.borrow() == PipeToState::Starting {
1955 pipe_to.wait_for_writer_ready(cx, global);
1957 }
1958
1959 promise
1961 }
1962
1963 pub(crate) fn tee(
1965 &self,
1966 cx: &mut js::context::JSContext,
1967 clone_for_branch_2: bool,
1968 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1969 match self.controller.borrow().as_ref() {
1973 Some(ControllerType::Default(_)) => {
1974 self.default_tee(cx, clone_for_branch_2)
1976 },
1977 Some(ControllerType::Byte(_)) => {
1978 self.byte_tee(cx)
1981 },
1982 None => {
1983 unreachable!("Stream should have a controller.");
1984 },
1985 }
1986 }
1987
1988 fn set_up_byte_controller(
1990 &self,
1991 cx: &mut js::context::JSContext,
1992 global: &GlobalScope,
1993 underlying_source_dict: JsUnderlyingSource,
1994 underlying_source_handle: SafeHandleObject,
1995 stream: DomRoot<ReadableStream>,
1996 strategy_hwm: f64,
1997 ) -> Fallible<()> {
1998 if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2014 return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2015 }
2016
2017 let controller = ReadableByteStreamController::new(
2018 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2019 strategy_hwm,
2020 global,
2021 CanGc::from_cx(cx),
2022 );
2023
2024 controller.set_underlying_source_this_object(underlying_source_handle);
2027
2028 controller.setup(cx, global, stream)
2031 }
2032
2033 pub(crate) fn setup_cross_realm_transform_readable(
2035 &self,
2036 cx: &mut js::context::JSContext,
2037 port: &MessagePort,
2038 ) {
2039 let port_id = port.message_port_id();
2040 let global = self.global();
2041
2042 let size_algorithm =
2047 extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
2048
2049 let controller = ReadableStreamDefaultController::new(
2053 &self.global(),
2054 UnderlyingSourceType::Transfer(Dom::from_ref(port)),
2055 0.,
2056 size_algorithm,
2057 CanGc::from_cx(cx),
2058 );
2059
2060 rooted!(&in(cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2063 controller: Dom::from_ref(&controller),
2064 });
2065 global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2066
2067 port.Start(cx);
2069
2070 controller
2072 .setup(cx, DomRoot::from_ref(self))
2073 .expect("Setting up controller for transfer cannot fail.");
2074 }
2075}
2076
2077impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2078 fn Constructor(
2080 cx: &mut js::context::JSContext,
2081 global: &GlobalScope,
2082 proto: Option<SafeHandleObject>,
2083 underlying_source: Option<*mut JSObject>,
2084 strategy: &QueuingStrategy,
2085 ) -> Fallible<DomRoot<Self>> {
2086 rooted!(&in(cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2088 let underlying_source_dict = if !underlying_source_obj.is_null() {
2091 rooted!(&in(cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2092 match JsUnderlyingSource::new(cx.into(), obj_val.handle(), CanGc::from_cx(cx)) {
2093 Ok(ConversionResult::Success(val)) => val,
2094 Ok(ConversionResult::Failure(error)) => {
2095 return Err(Error::Type(error.into_owned()));
2096 },
2097 _ => {
2098 return Err(Error::JSFailed);
2099 },
2100 }
2101 } else {
2102 JsUnderlyingSource::empty()
2103 };
2104
2105 let stream = ReadableStream::new_with_proto(global, proto, CanGc::from_cx(cx));
2107
2108 if underlying_source_dict.type_.is_some() {
2109 if strategy.size.is_some() {
2111 return Err(Error::Range(
2112 c"size is not supported for byte streams".to_owned(),
2113 ));
2114 }
2115
2116 let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2118
2119 stream.set_up_byte_controller(
2122 cx,
2123 global,
2124 underlying_source_dict,
2125 underlying_source_obj.handle(),
2126 stream.clone(),
2127 strategy_hwm,
2128 )?;
2129 } else {
2130 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2132
2133 let size_algorithm = extract_size_algorithm(strategy, CanGc::from_cx(cx));
2135
2136 let controller = ReadableStreamDefaultController::new(
2137 global,
2138 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2139 high_water_mark,
2140 size_algorithm,
2141 CanGc::from_cx(cx),
2142 );
2143
2144 controller.set_underlying_source_this_object(underlying_source_obj.handle());
2147
2148 controller.setup(cx, stream.clone())?;
2150 };
2151
2152 Ok(stream)
2153 }
2154
2155 fn Locked(&self) -> bool {
2157 self.is_locked()
2158 }
2159
2160 fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
2162 let global = self.global();
2163 if self.is_locked() {
2164 let promise = Promise::new2(cx, &global);
2167 promise.reject_error(
2168 Error::Type(c"stream is locked".to_owned()),
2169 CanGc::from_cx(cx),
2170 );
2171 promise
2172 } else {
2173 self.cancel(cx, &global, reason)
2175 }
2176 }
2177
2178 fn GetReader(
2180 &self,
2181 options: &ReadableStreamGetReaderOptions,
2182 can_gc: CanGc,
2183 ) -> Fallible<ReadableStreamReader> {
2184 if options.mode.is_none() {
2186 return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2187 self.acquire_default_reader(can_gc)?,
2188 ));
2189 }
2190 assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2192
2193 Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2195 self.acquire_byob_reader(can_gc)?,
2196 ))
2197 }
2198
2199 fn Tee(&self, cx: &mut js::context::JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2201 self.tee(cx, false)
2203 }
2204
2205 fn PipeTo(
2207 &self,
2208 cx: &mut CurrentRealm,
2209 destination: &WritableStream,
2210 options: &StreamPipeOptions,
2211 ) -> Rc<Promise> {
2212 let global = self.global();
2213
2214 if self.is_locked() {
2216 let promise = Promise::new2(cx, &global);
2218 promise.reject_error(
2219 Error::Type(c"Source stream is locked".to_owned()),
2220 CanGc::from_cx(cx),
2221 );
2222 return promise;
2223 }
2224
2225 if destination.is_locked() {
2227 let promise = Promise::new2(cx, &global);
2229 promise.reject_error(
2230 Error::Type(c"Destination stream is locked".to_owned()),
2231 CanGc::from_cx(cx),
2232 );
2233 return promise;
2234 }
2235
2236 let signal = options.signal.as_deref();
2238
2239 self.pipe_to(
2241 cx,
2242 &global,
2243 destination,
2244 options.preventClose,
2245 options.preventAbort,
2246 options.preventCancel,
2247 signal,
2248 )
2249 }
2250
2251 fn PipeThrough(
2253 &self,
2254 cx: &mut CurrentRealm,
2255 transform: &ReadableWritablePair,
2256 options: &StreamPipeOptions,
2257 ) -> Fallible<DomRoot<ReadableStream>> {
2258 let global = self.global();
2259
2260 if self.is_locked() {
2262 return Err(Error::Type(c"Source stream is locked".to_owned()));
2263 }
2264
2265 if transform.writable.is_locked() {
2267 return Err(Error::Type(c"Destination stream is locked".to_owned()));
2268 }
2269
2270 let signal = options.signal.as_deref();
2272
2273 let promise = self.pipe_to(
2276 cx,
2277 &global,
2278 &transform.writable,
2279 options.preventClose,
2280 options.preventAbort,
2281 options.preventCancel,
2282 signal,
2283 );
2284
2285 promise.set_promise_is_handled();
2287
2288 Ok(transform.readable.clone())
2290 }
2291}
2292
2293#[expect(unsafe_code)]
2294pub(crate) unsafe fn get_type_and_value_from_message(
2298 cx: SafeJSContext,
2299 data: SafeHandleValue,
2300 value: SafeMutableHandleValue,
2301 can_gc: CanGc,
2302) -> DOMString {
2303 assert!(data.is_object());
2309 rooted!(in(*cx) let data_object = data.to_object());
2310
2311 rooted!(in(*cx) let mut type_ = UndefinedValue());
2313 unsafe {
2314 get_dictionary_property(
2315 *cx,
2316 data_object.handle(),
2317 c"type",
2318 type_.handle_mut(),
2319 can_gc,
2320 )
2321 }
2322 .expect("Getting the type should not fail.");
2323
2324 unsafe { get_dictionary_property(*cx, data_object.handle(), c"value", value, can_gc) }
2326 .expect("Getting the value should not fail.");
2327
2328 let result =
2330 DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2331 .expect("The type of the message should be a string");
2332 let ConversionResult::Success(type_string) = result else {
2333 unreachable!("The type of the message should be a string");
2334 };
2335
2336 type_string
2337}
2338
// Marker impl with no items; the `rooted!` call in
// `setup_cross_realm_transform_readable` roots a value of this type.
impl js::gc::Rootable for CrossRealmTransformReadable {}
2340
/// State kept for a stream set up as the readable end of a cross-realm transform.
///
/// Its handlers forward what arrives on the associated message port to the
/// stream's default controller.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformReadable {
    /// The default controller that receives enqueued chunks, close and error signals.
    controller: Dom<ReadableStreamDefaultController>,
}
2350
2351impl CrossRealmTransformReadable {
2352 #[expect(unsafe_code)]
2355 pub(crate) fn handle_message(
2356 &self,
2357 cx: &mut CurrentRealm,
2358 global: &GlobalScope,
2359 port: &MessagePort,
2360 message: SafeHandleValue,
2361 ) {
2362 rooted!(&in(cx) let mut value = UndefinedValue());
2363 let type_string = unsafe {
2364 get_type_and_value_from_message(
2365 cx.into(),
2366 message,
2367 value.handle_mut(),
2368 CanGc::from_cx(cx),
2369 )
2370 };
2371
2372 if type_string == "chunk" {
2374 self.controller
2376 .enqueue(cx, value.handle())
2377 .expect("Enqueing a chunk should not fail.");
2378 }
2379
2380 if type_string == "close" {
2382 self.controller.close(CanGc::from_cx(cx));
2384
2385 global.disentangle_port(cx, port);
2387 }
2388
2389 if type_string == "error" {
2391 self.controller.error(value.handle(), CanGc::from_cx(cx));
2393
2394 global.disentangle_port(cx, port);
2396 }
2397 }
2398
2399 pub(crate) fn handle_error(
2402 &self,
2403 cx: &mut CurrentRealm,
2404 global: &GlobalScope,
2405 port: &MessagePort,
2406 ) {
2407 let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
2409 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
2410 error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
2411
2412 port.cross_realm_transform_send_error(rooted_error.handle(), CanGc::from_cx(cx));
2414
2415 self.controller
2417 .error(rooted_error.handle(), CanGc::from_cx(cx));
2418
2419 global.disentangle_port(cx, port);
2421 }
2422}
2423
2424#[expect(unsafe_code)]
2425pub(crate) fn get_read_promise_done(
2427 cx: SafeJSContext,
2428 v: &SafeHandleValue,
2429 can_gc: CanGc,
2430) -> Result<bool, Error> {
2431 if !v.is_object() {
2432 return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2433 }
2434 unsafe {
2435 rooted!(in(*cx) let object = v.to_object());
2436 rooted!(in(*cx) let mut done = UndefinedValue());
2437 match get_dictionary_property(*cx, object.handle(), c"done", done.handle_mut(), can_gc) {
2438 Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2439 Ok(ConversionResult::Success(val)) => Ok(val),
2440 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2441 _ => Err(Error::Type(c"Unknown format for done property.".to_owned())),
2442 },
2443 Ok(false) => Err(Error::Type(c"Promise has no done property.".to_owned())),
2444 Err(()) => Err(Error::JSFailed),
2445 }
2446 }
2447}
2448
2449#[expect(unsafe_code)]
2450pub(crate) fn get_read_promise_bytes(
2452 cx: SafeJSContext,
2453 v: &SafeHandleValue,
2454 can_gc: CanGc,
2455) -> Result<Vec<u8>, Error> {
2456 if !v.is_object() {
2457 return Err(Error::Type(
2458 c"Unknown format for for bytes read.".to_owned(),
2459 ));
2460 }
2461 unsafe {
2462 rooted!(in(*cx) let object = v.to_object());
2463 rooted!(in(*cx) let mut bytes = UndefinedValue());
2464 match get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc) {
2465 Ok(true) => {
2466 match Vec::<u8>::safe_from_jsval(
2467 cx,
2468 bytes.handle(),
2469 ConversionBehavior::EnforceRange,
2470 can_gc,
2471 ) {
2472 Ok(ConversionResult::Success(val)) => Ok(val),
2473 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2474 _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2475 }
2476 },
2477 Ok(false) => Err(Error::Type(c"Promise has no value property.".to_owned())),
2478 Err(()) => Err(Error::JSFailed),
2479 }
2480 }
2481}
2482
2483pub(crate) fn bytes_from_chunk_jsval(
2487 cx: SafeJSContext,
2488 chunk: &RootedTraceableBox<Heap<JSVal>>,
2489 can_gc: CanGc,
2490) -> Result<Vec<u8>, Error> {
2491 match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2492 Ok(ConversionResult::Success(vec)) => Ok(vec),
2493 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2494 _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2495 }
2496}
2497
2498impl Transferable for ReadableStream {
2500 type Index = MessagePortIndex;
2501 type Data = MessagePortImpl;
2502
2503 fn transfer(
2505 &self,
2506 cx: &mut js::context::JSContext,
2507 ) -> Fallible<(MessagePortId, MessagePortImpl)> {
2508 if self.is_locked() {
2511 return Err(Error::DataClone(None));
2512 }
2513
2514 let global = self.global();
2515 let mut realm = enter_auto_realm(cx, &*global);
2516 let mut realm = realm.current_realm();
2517 let cx = &mut realm;
2518
2519 let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
2521 global.track_message_port(&port_1, None);
2522
2523 let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
2525 global.track_message_port(&port_2, None);
2526
2527 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2529
2530 let writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
2532
2533 writable.setup_cross_realm_transform_writable(cx, &port_1);
2535
2536 let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);
2538
2539 promise.set_promise_is_handled();
2541
2542 port_2.transfer(cx)
2544 }
2545
2546 fn transfer_receive(
2548 cx: &mut js::context::JSContext,
2549 owner: &GlobalScope,
2550 id: MessagePortId,
2551 port_impl: MessagePortImpl,
2552 ) -> Result<DomRoot<Self>, ()> {
2553 let value = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
2556
2557 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
2564
2565 value.setup_cross_realm_transform_readable(cx, &transferred_port);
2567 Ok(value)
2568 }
2569
2570 fn serialized_storage<'a>(
2572 data: StructuredData<'a, '_>,
2573 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2574 match data {
2575 StructuredData::Reader(r) => &mut r.port_impls,
2576 StructuredData::Writer(w) => &mut w.ports,
2577 }
2578 }
2579}