1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use base::generic_channel::GenericSharedMemory;
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19 MutableHandleValue as SafeMutableHandleValue,
20};
21use js::typedarray::ArrayBufferViewU8;
22use rustc_hash::FxHashMap;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
26use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
27 ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
28 ReadableWritablePair, StreamPipeOptions,
29};
30use script_bindings::str::DOMString;
31
32use crate::dom::domexception::{DOMErrorName, DOMException};
33use script_bindings::conversions::{is_array_like, StringificationBehavior};
34use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
35use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
36use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
38use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
39use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, SafeFromJSValConvertible};
40use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
41use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
42use crate::dom::stream::writablestream::WritableStream;
43use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
44use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
45use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
46use crate::dom::bindings::trace::RootedTraceableBox;
47use crate::dom::bindings::utils::get_dictionary_property;
48use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
49use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
50use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
51use crate::dom::globalscope::GlobalScope;
52use crate::dom::promise::{wait_for_all_promise, Promise};
53use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
54use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
55use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
56use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
57use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
58use crate::dom::types::DefaultTeeUnderlyingSource;
59use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
60use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
61use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
62use crate::dom::messageport::MessagePort;
63use crate::realms::{enter_realm, InRealm};
64use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
65use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
66use crate::dom::bindings::transferable::Transferable;
67use crate::dom::bindings::structuredclone::StructuredData;
68
69use crate::dom::bindings::buffer_source::HeapBufferSource;
70use super::readablestreambyobreader::ReadIntoRequest;
71
72#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
74enum PipeToState {
75 #[default]
77 Starting,
78 PendingReady,
80 PendingRead,
82 ShuttingDownWithPendingWrites(Option<ShutdownAction>),
85 ShuttingDownPendingAction,
89 Finalized,
92}
93
94#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
96enum ShutdownAction {
97 WritableStreamAbort,
99 ReadableStreamCancel,
101 WritableStreamDefaultWriterCloseWithErrorPropagation,
103 Abort,
105}
106
107impl js::gc::Rootable for PipeTo {}
108
109#[derive(Clone, JSTraceable, MallocSizeOf)]
118#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
119pub(crate) struct PipeTo {
120 reader: Dom<ReadableStreamDefaultReader>,
122
123 writer: Dom<WritableStreamDefaultWriter>,
125
126 #[ignore_malloc_size_of = "nested Rc"]
129 pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,
130
131 #[conditional_malloc_size_of]
133 #[no_trace]
134 state: Rc<RefCell<PipeToState>>,
135
136 prevent_abort: bool,
138
139 prevent_cancel: bool,
141
142 prevent_close: bool,
144
145 #[conditional_malloc_size_of]
148 shutting_down: Rc<Cell<bool>>,
149
150 #[ignore_malloc_size_of = "mozjs"]
153 abort_reason: Rc<Heap<JSVal>>,
154
155 #[ignore_malloc_size_of = "mozjs"]
158 shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,
159
160 #[ignore_malloc_size_of = "nested Rc"]
163 shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,
164
165 #[conditional_malloc_size_of]
168 result_promise: Rc<Promise>,
169}
170
171impl PipeTo {
172 pub(crate) fn abort_with_reason(
175 &self,
176 cx: SafeJSContext,
177 global: &GlobalScope,
178 reason: SafeHandleValue,
179 realm: InRealm,
180 can_gc: CanGc,
181 ) {
182 if self.shutting_down.get() {
184 return;
185 }
186
187 self.abort_reason.set(reason.get());
191
192 self.set_shutdown_error(reason);
197
198 self.shutdown(cx, global, Some(ShutdownAction::Abort), realm, can_gc);
204 }
205}
206
207impl Callback for PipeTo {
208 #[expect(unsafe_code)]
217 fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
218 let can_gc = CanGc::from_cx(cx);
219 let in_realm_proof = cx.into();
220 let realm = InRealm::Already(&in_realm_proof);
221 let cx = cx.into();
222 let global = self.reader.global();
223
224 self.pending_writes.borrow_mut().retain(|p| {
230 let pending = p.is_pending();
231 if !pending {
232 p.set_promise_is_handled();
233 }
234 pending
235 });
236
237 let state_before_checks = self.state.borrow().clone();
239
240 if state_before_checks == PipeToState::PendingRead {
247 let source = self.reader.get_stream().expect("Source stream must be set");
248 if source.is_closed() {
249 let dest = self
250 .writer
251 .get_stream()
252 .expect("Destination stream must be set");
253
254 if dest.is_writable() && !dest.close_queued_or_in_flight() {
257 let Ok(done) = get_read_promise_done(cx, &result, can_gc) else {
258 return;
265 };
266
267 if !done {
268 self.write_chunk(cx, &global, result, can_gc);
270 }
271 }
272 }
273 }
274
275 self.check_and_propagate_errors_forward(cx, &global, realm, can_gc);
276 self.check_and_propagate_errors_backward(cx, &global, realm, can_gc);
277 self.check_and_propagate_closing_forward(cx, &global, realm, can_gc);
278 self.check_and_propagate_closing_backward(cx, &global, realm, can_gc);
279
280 let state = self.state.borrow().clone();
282
283 if state != state_before_checks {
287 return;
288 }
289
290 match state {
291 PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
292 PipeToState::PendingReady => {
293 self.read_chunk(&global, realm, can_gc);
295 },
296 PipeToState::PendingRead => {
297 self.write_chunk(cx, &global, result, can_gc);
299
300 if self.shutting_down.get() {
302 return;
303 }
304
305 self.wait_for_writer_ready(&global, realm, can_gc);
307 },
308 PipeToState::ShuttingDownWithPendingWrites(action) => {
309 if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
312 self.wait_on_pending_write(&global, write, realm, can_gc);
313 return;
314 }
315
316 if let Some(action) = action {
318 self.perform_action(cx, &global, action, realm, can_gc);
320 } else {
321 self.finalize(cx, &global, can_gc);
323 }
324 },
325 PipeToState::ShuttingDownPendingAction => {
326 let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
327 unreachable!();
328 };
329 if promise.is_pending() {
330 return;
334 }
335
336 let is_array_like = {
337 if !result.is_object() {
338 false
339 } else {
340 unsafe { is_array_like::<crate::DomTypeHolder>(*cx, result) }
341 }
342 };
343
344 if !result.is_undefined() && !is_array_like {
346 self.set_shutdown_error(result);
356 }
357 self.finalize(cx, &global, can_gc);
358 },
359 PipeToState::Finalized => {},
360 }
361 }
362}
363
364impl PipeTo {
365 fn set_shutdown_error(&self, error: SafeHandleValue) {
368 *self.shutdown_error.borrow_mut() = Some(Heap::default());
369 let Some(ref heap) = *self.shutdown_error.borrow() else {
370 unreachable!("Option set to Some(heap) above.");
371 };
372 heap.set(error.get())
373 }
374
375 fn wait_for_writer_ready(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
378 {
379 let mut state = self.state.borrow_mut();
380 *state = PipeToState::PendingReady;
381 }
382
383 let ready_promise = self.writer.Ready();
384 if ready_promise.is_fulfilled() {
385 self.read_chunk(global, realm, can_gc);
386 } else {
387 let handler = PromiseNativeHandler::new(
388 global,
389 Some(Box::new(self.clone())),
390 Some(Box::new(self.clone())),
391 can_gc,
392 );
393 ready_promise.append_native_handler(&handler, realm, can_gc);
394
395 let closed_promise = self.reader.Closed();
399 closed_promise.append_native_handler(&handler, realm, can_gc);
400 }
401 }
402
403 fn read_chunk(&self, global: &GlobalScope, realm: InRealm, can_gc: CanGc) {
405 *self.state.borrow_mut() = PipeToState::PendingRead;
406 let chunk_promise = self.reader.Read(can_gc);
407 let handler = PromiseNativeHandler::new(
408 global,
409 Some(Box::new(self.clone())),
410 Some(Box::new(self.clone())),
411 can_gc,
412 );
413 chunk_promise.append_native_handler(&handler, realm, can_gc);
414
415 let ready_promise = self.writer.Closed();
418 ready_promise.append_native_handler(&handler, realm, can_gc);
419 }
420
421 #[expect(unsafe_code)]
424 fn write_chunk(
425 &self,
426 cx: SafeJSContext,
427 global: &GlobalScope,
428 chunk: SafeHandleValue,
429 can_gc: CanGc,
430 ) -> bool {
431 if chunk.is_object() {
432 rooted!(in(*cx) let object = chunk.to_object());
433 rooted!(in(*cx) let mut bytes = UndefinedValue());
434 let has_value = unsafe {
435 get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc)
436 .expect("Chunk should have a value.")
437 };
438 if has_value {
439 let write_promise = self.writer.write(cx, global, bytes.handle(), can_gc);
441 self.pending_writes.borrow_mut().push_back(write_promise);
442 return true;
443 }
444 }
445 false
446 }
447
448 fn wait_on_pending_write(
452 &self,
453 global: &GlobalScope,
454 promise: Rc<Promise>,
455 realm: InRealm,
456 can_gc: CanGc,
457 ) {
458 let handler = PromiseNativeHandler::new(
459 global,
460 Some(Box::new(self.clone())),
461 Some(Box::new(self.clone())),
462 can_gc,
463 );
464 promise.append_native_handler(&handler, realm, can_gc);
465 }
466
467 fn check_and_propagate_errors_forward(
470 &self,
471 cx: SafeJSContext,
472 global: &GlobalScope,
473 realm: InRealm,
474 can_gc: CanGc,
475 ) {
476 if self.shutting_down.get() {
479 return;
480 }
481
482 let source = self
484 .reader
485 .get_stream()
486 .expect("Reader should still have a stream");
487 if source.is_errored() {
488 rooted!(in(*cx) let mut source_error = UndefinedValue());
489 source.get_stored_error(source_error.handle_mut());
490 self.set_shutdown_error(source_error.handle());
491
492 if !self.prevent_abort {
494 self.shutdown(
497 cx,
498 global,
499 Some(ShutdownAction::WritableStreamAbort),
500 realm,
501 can_gc,
502 )
503 } else {
504 self.shutdown(cx, global, None, realm, can_gc);
506 }
507 }
508 }
509
510 fn check_and_propagate_errors_backward(
513 &self,
514 cx: SafeJSContext,
515 global: &GlobalScope,
516 realm: InRealm,
517 can_gc: CanGc,
518 ) {
519 if self.shutting_down.get() {
522 return;
523 }
524
525 let dest = self
527 .writer
528 .get_stream()
529 .expect("Writer should still have a stream");
530 if dest.is_errored() {
531 rooted!(in(*cx) let mut dest_error = UndefinedValue());
532 dest.get_stored_error(dest_error.handle_mut());
533 self.set_shutdown_error(dest_error.handle());
534
535 if !self.prevent_cancel {
537 self.shutdown(
540 cx,
541 global,
542 Some(ShutdownAction::ReadableStreamCancel),
543 realm,
544 can_gc,
545 )
546 } else {
547 self.shutdown(cx, global, None, realm, can_gc);
549 }
550 }
551 }
552
553 fn check_and_propagate_closing_forward(
556 &self,
557 cx: SafeJSContext,
558 global: &GlobalScope,
559 realm: InRealm,
560 can_gc: CanGc,
561 ) {
562 if self.shutting_down.get() {
565 return;
566 }
567
568 let source = self
570 .reader
571 .get_stream()
572 .expect("Reader should still have a stream");
573 if source.is_closed() {
574 if !self.prevent_close {
576 self.shutdown(
579 cx,
580 global,
581 Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
582 realm,
583 can_gc,
584 )
585 } else {
586 self.shutdown(cx, global, None, realm, can_gc);
588 }
589 }
590 }
591
592 fn check_and_propagate_closing_backward(
595 &self,
596 cx: SafeJSContext,
597 global: &GlobalScope,
598 realm: InRealm,
599 can_gc: CanGc,
600 ) {
601 if self.shutting_down.get() {
604 return;
605 }
606
607 let dest = self
610 .writer
611 .get_stream()
612 .expect("Writer should still have a stream");
613 if dest.close_queued_or_in_flight() || dest.is_closed() {
614 rooted!(in(*cx) let mut dest_closed = UndefinedValue());
619 let error =
620 Error::Type("Destination is closed or has closed queued or in flight".to_string());
621 error.to_jsval(cx, global, dest_closed.handle_mut(), can_gc);
622 self.set_shutdown_error(dest_closed.handle());
623
624 if !self.prevent_cancel {
626 self.shutdown(
629 cx,
630 global,
631 Some(ShutdownAction::ReadableStreamCancel),
632 realm,
633 can_gc,
634 )
635 } else {
636 self.shutdown(cx, global, None, realm, can_gc);
638 }
639 }
640 }
641
642 fn shutdown(
646 &self,
647 cx: SafeJSContext,
648 global: &GlobalScope,
649 action: Option<ShutdownAction>,
650 realm: InRealm,
651 can_gc: CanGc,
652 ) {
653 if !self.shutting_down.replace(true) {
656 let dest = self.writer.get_stream().expect("Stream must be set");
657 if dest.is_writable() && !dest.close_queued_or_in_flight() {
660 if let Some(write) = self.pending_writes.borrow_mut().front() {
666 *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
667 self.wait_on_pending_write(global, write.clone(), realm, can_gc);
668 return;
669 }
670 }
671
672 if let Some(action) = action {
674 self.perform_action(cx, global, action, realm, can_gc);
676 } else {
677 self.finalize(cx, global, can_gc);
679 }
680 }
681 }
682
683 fn perform_action(
686 &self,
687 cx: SafeJSContext,
688 global: &GlobalScope,
689 action: ShutdownAction,
690 realm: InRealm,
691 can_gc: CanGc,
692 ) {
693 rooted!(in(*cx) let mut error = UndefinedValue());
694 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
695 error.set(shutdown_error.get());
696 }
697
698 *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
699
700 let promise = match action {
702 ShutdownAction::WritableStreamAbort => {
703 let dest = self.writer.get_stream().expect("Stream must be set");
704 dest.abort(cx, global, error.handle(), realm, can_gc)
705 },
706 ShutdownAction::ReadableStreamCancel => {
707 let source = self
708 .reader
709 .get_stream()
710 .expect("Reader should have a stream.");
711 source.cancel(cx, global, error.handle(), can_gc)
712 },
713 ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
714 self.writer.close_with_error_propagation(cx, global, can_gc)
715 },
716 ShutdownAction::Abort => {
717 rooted!(in(*cx) let mut error = UndefinedValue());
722 error.set(self.abort_reason.get());
723
724 let mut actions = vec![];
726
727 if !self.prevent_abort {
729 let dest = self
730 .writer
731 .get_stream()
732 .expect("Destination stream must be set");
733
734 let promise = if dest.is_writable() {
736 dest.abort(cx, global, error.handle(), realm, can_gc)
738 } else {
739 Promise::new_resolved(global, cx, (), can_gc)
741 };
742 actions.push(promise);
743 }
744
745 if !self.prevent_cancel {
747 let source = self.reader.get_stream().expect("Source stream must be set");
748
749 let promise = if source.is_readable() {
751 source.cancel(cx, global, error.handle(), can_gc)
753 } else {
754 Promise::new_resolved(global, cx, (), can_gc)
756 };
757 actions.push(promise);
758 }
759
760 wait_for_all_promise(cx, global, actions, realm, can_gc)
764 },
765 };
766
767 let handler = PromiseNativeHandler::new(
770 global,
771 Some(Box::new(self.clone())),
772 Some(Box::new(self.clone())),
773 can_gc,
774 );
775 promise.append_native_handler(&handler, realm, can_gc);
776 *self.shutdown_action_promise.borrow_mut() = Some(promise);
777 }
778
779 fn finalize(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
781 *self.state.borrow_mut() = PipeToState::Finalized;
782
783 self.writer.release(cx, global, can_gc);
785
786 self.reader
792 .release(can_gc)
793 .expect("Releasing the reader should not fail");
794
795 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
801 rooted!(in(*cx) let mut error = UndefinedValue());
802 error.set(shutdown_error.get());
803 self.result_promise.reject_native(&error.handle(), can_gc);
805 } else {
806 self.result_promise.resolve_native(&(), can_gc);
808 }
809 }
810}
811
812#[derive(Clone, JSTraceable, MallocSizeOf)]
815struct SourceCancelPromiseFulfillmentHandler {
816 #[conditional_malloc_size_of]
817 result: Rc<Promise>,
818}
819
820impl Callback for SourceCancelPromiseFulfillmentHandler {
821 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
825 let can_gc = CanGc::from_cx(cx);
826 self.result.resolve_native(&(), can_gc);
827 }
828}
829
830#[derive(Clone, JSTraceable, MallocSizeOf)]
833struct SourceCancelPromiseRejectionHandler {
834 #[conditional_malloc_size_of]
835 result: Rc<Promise>,
836}
837
838impl Callback for SourceCancelPromiseRejectionHandler {
839 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
843 let can_gc = CanGc::from_cx(cx);
844 self.result.reject_native(&v, can_gc);
845 }
846}
847
848#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
850pub(crate) enum ReadableStreamState {
851 #[default]
852 Readable,
853 Closed,
854 Errored,
855}
856
857#[derive(JSTraceable, MallocSizeOf)]
859#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
860pub(crate) enum ControllerType {
861 Byte(MutNullableDom<ReadableByteStreamController>),
863 Default(MutNullableDom<ReadableStreamDefaultController>),
865}
866
867#[derive(JSTraceable, MallocSizeOf)]
869#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
870pub(crate) enum ReaderType {
871 #[allow(clippy::upper_case_acronyms)]
873 BYOB(MutNullableDom<ReadableStreamBYOBReader>),
874 Default(MutNullableDom<ReadableStreamDefaultReader>),
876}
877
878impl Eq for ReaderType {}
879impl PartialEq for ReaderType {
880 fn eq(&self, other: &Self) -> bool {
881 matches!(
882 (self, other),
883 (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
884 (ReaderType::Default(_), ReaderType::Default(_))
885 )
886 }
887}
888
889#[cfg_attr(crown, expect(crown::unrooted_must_root))]
891pub(crate) fn create_readable_stream(
892 global: &GlobalScope,
893 underlying_source_type: UnderlyingSourceType,
894 queuing_strategy: Option<Rc<QueuingStrategySize>>,
895 high_water_mark: Option<f64>,
896 can_gc: CanGc,
897) -> DomRoot<ReadableStream> {
898 let high_water_mark = high_water_mark.unwrap_or(1.0);
900
901 let size_algorithm =
903 queuing_strategy.unwrap_or(extract_size_algorithm(&QueuingStrategy::empty(), can_gc));
904
905 assert!(high_water_mark >= 0.0);
907
908 let stream = ReadableStream::new_with_proto(global, None, can_gc);
911
912 let controller = ReadableStreamDefaultController::new(
914 global,
915 underlying_source_type,
916 high_water_mark,
917 size_algorithm,
918 can_gc,
919 );
920
921 controller
924 .setup(stream.clone(), can_gc)
925 .expect("Setup of default controller cannot fail");
926
927 stream
929}
930
931#[cfg_attr(crown, expect(crown::unrooted_must_root))]
933pub(crate) fn readable_byte_stream_tee(
934 global: &GlobalScope,
935 underlying_source_type: UnderlyingSourceType,
936 can_gc: CanGc,
937) -> DomRoot<ReadableStream> {
938 let tee_stream = ReadableStream::new_with_proto(global, None, can_gc);
941
942 let controller = ReadableByteStreamController::new(underlying_source_type, 0.0, global, can_gc);
944
945 controller
947 .setup(global, tee_stream.clone(), can_gc)
948 .expect("Setup of byte stream controller cannot fail");
949
950 tee_stream
952}
953
954#[dom_struct]
956pub(crate) struct ReadableStream {
957 reflector_: Reflector,
958
959 controller: RefCell<Option<ControllerType>>,
963
964 #[ignore_malloc_size_of = "mozjs"]
966 stored_error: Heap<JSVal>,
967
968 disturbed: Cell<bool>,
970
971 reader: RefCell<Option<ReaderType>>,
973
974 state: Cell<ReadableStreamState>,
976}
977
978impl ReadableStream {
979 fn new_inherited() -> ReadableStream {
981 ReadableStream {
982 reflector_: Reflector::new(),
983 controller: RefCell::new(None),
984 stored_error: Heap::default(),
985 disturbed: Default::default(),
986 reader: RefCell::new(None),
987 state: Cell::new(Default::default()),
988 }
989 }
990
991 pub(crate) fn new_with_proto(
992 global: &GlobalScope,
993 proto: Option<SafeHandleObject>,
994 can_gc: CanGc,
995 ) -> DomRoot<ReadableStream> {
996 reflect_dom_object_with_proto(
997 Box::new(ReadableStream::new_inherited()),
998 global,
999 proto,
1000 can_gc,
1001 )
1002 }
1003
1004 pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
1007 *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
1008 controller,
1009 ))));
1010 }
1011
1012 pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
1015 *self.controller.borrow_mut() =
1016 Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
1017 }
1018
1019 pub(crate) fn assert_no_controller(&self) {
1022 let has_no_controller = self.controller.borrow().is_none();
1023 assert!(has_no_controller);
1024 }
1025
1026 pub(crate) fn new_from_bytes(
1028 global: &GlobalScope,
1029 bytes: Vec<u8>,
1030 can_gc: CanGc,
1031 ) -> Fallible<DomRoot<ReadableStream>> {
1032 let stream = ReadableStream::new_with_external_underlying_source(
1033 global,
1034 UnderlyingSourceType::Memory(bytes.len()),
1035 can_gc,
1036 )?;
1037 stream.enqueue_native(bytes, can_gc);
1038 stream.controller_close_native(can_gc);
1039 Ok(stream)
1040 }
1041
1042 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1045 pub(crate) fn new_with_external_underlying_source(
1046 global: &GlobalScope,
1047 source: UnderlyingSourceType,
1048 can_gc: CanGc,
1049 ) -> Fallible<DomRoot<ReadableStream>> {
1050 assert!(source.is_native());
1051 let stream = ReadableStream::new_with_proto(global, None, can_gc);
1052 let controller = ReadableStreamDefaultController::new(
1053 global,
1054 source,
1055 1.0,
1056 extract_size_algorithm(&QueuingStrategy::empty(), can_gc),
1057 can_gc,
1058 );
1059 controller.setup(stream.clone(), can_gc)?;
1060 Ok(stream)
1061 }
1062
1063 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1065 match self.controller.borrow().as_ref() {
1066 Some(ControllerType::Default(controller)) => {
1067 let controller = controller
1068 .get()
1069 .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1070 controller.perform_release_steps()
1071 },
1072 Some(ControllerType::Byte(controller)) => {
1073 let controller = controller
1074 .get()
1075 .ok_or_else(|| Error::Type("Stream should have controller.".to_string()))?;
1076 controller.perform_release_steps()
1077 },
1078 None => Err(Error::Type("Stream should have controller.".to_string())),
1079 }
1080 }
1081
1082 pub(crate) fn perform_pull_steps(
1086 &self,
1087 cx: SafeJSContext,
1088 read_request: &ReadRequest,
1089 can_gc: CanGc,
1090 ) {
1091 match self.controller.borrow().as_ref() {
1092 Some(ControllerType::Default(controller)) => controller
1093 .get()
1094 .expect("Stream should have controller.")
1095 .perform_pull_steps(read_request, can_gc),
1096 Some(ControllerType::Byte(controller)) => controller
1097 .get()
1098 .expect("Stream should have controller.")
1099 .perform_pull_steps(cx, read_request, can_gc),
1100 None => {
1101 unreachable!("Stream does not have a controller.");
1102 },
1103 }
1104 }
1105
1106 pub(crate) fn perform_pull_into(
1110 &self,
1111 cx: SafeJSContext,
1112 read_into_request: &ReadIntoRequest,
1113 view: HeapBufferSource<ArrayBufferViewU8>,
1114 min: u64,
1115 can_gc: CanGc,
1116 ) {
1117 match self.controller.borrow().as_ref() {
1118 Some(ControllerType::Byte(controller)) => controller
1119 .get()
1120 .expect("Stream should have controller.")
1121 .perform_pull_into(cx, read_into_request, view, min, can_gc),
1122 _ => {
1123 unreachable!(
1124 "Pulling a chunk from a stream with a default controller using a BYOB reader"
1125 )
1126 },
1127 }
1128 }
1129
1130 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1132 match self.reader.borrow().as_ref() {
1133 Some(ReaderType::Default(reader)) => {
1134 let Some(reader) = reader.get() else {
1135 panic!("Attempt to add a read request without having first acquired a reader.");
1136 };
1137
1138 assert!(self.is_readable());
1140
1141 reader.add_read_request(read_request);
1143 },
1144 _ => {
1145 unreachable!("Adding a read request can only be done on a default reader.")
1146 },
1147 }
1148 }
1149
1150 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1152 match self.reader.borrow().as_ref() {
1153 Some(ReaderType::BYOB(reader)) => {
1155 let Some(reader) = reader.get() else {
1156 unreachable!(
1157 "Attempt to add a read into request without having first acquired a reader."
1158 );
1159 };
1160
1161 assert!(self.is_readable() || self.is_closed());
1163
1164 reader.add_read_into_request(read_request);
1166 },
1167 _ => {
1168 unreachable!("Adding a read into request can only be done on a BYOB reader.")
1169 },
1170 }
1171 }
1172
1173 pub(crate) fn enqueue_native(&self, bytes: Vec<u8>, can_gc: CanGc) {
1176 match self.controller.borrow().as_ref() {
1177 Some(ControllerType::Default(controller)) => controller
1178 .get()
1179 .expect("Stream should have controller.")
1180 .enqueue_native(bytes, can_gc),
1181 _ => {
1182 unreachable!(
1183 "Enqueueing chunk to a stream from Rust on other than default controller"
1184 );
1185 },
1186 }
1187 }
1188
1189 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
1191 assert!(self.is_readable());
1193
1194 self.state.set(ReadableStreamState::Errored);
1196
1197 self.stored_error.set(e.get());
1199
1200 let default_reader = {
1203 let reader_ref = self.reader.borrow();
1204 match reader_ref.as_ref() {
1205 Some(ReaderType::Default(reader)) => reader.get(),
1206 _ => None,
1207 }
1208 };
1209
1210 if let Some(reader) = default_reader {
1211 reader.error(e, can_gc);
1213 return;
1214 }
1215
1216 let byob_reader = {
1217 let reader_ref = self.reader.borrow();
1218 match reader_ref.as_ref() {
1219 Some(ReaderType::BYOB(reader)) => reader.get(),
1220 _ => None,
1221 }
1222 };
1223
1224 if let Some(reader) = byob_reader {
1225 reader.error_read_into_requests(e, can_gc);
1227 }
1228
1229 }
1231
1232 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1234 handle_mut.set(self.stored_error.get());
1235 }
1236
1237 pub(crate) fn error_native(&self, error: Error, can_gc: CanGc) {
1240 let cx = GlobalScope::get_cx();
1241 rooted!(in(*cx) let mut error_val = UndefinedValue());
1242 error.to_jsval(cx, &self.global(), error_val.handle_mut(), can_gc);
1243 self.error(error_val.handle(), can_gc);
1244 }
1245
1246 pub(crate) fn controller_close_native(&self, can_gc: CanGc) {
1249 match self.controller.borrow().as_ref() {
1250 Some(ControllerType::Default(controller)) => {
1251 let _ = controller
1252 .get()
1253 .expect("Stream should have controller.")
1254 .Close(can_gc);
1255 },
1256 _ => {
1257 unreachable!("Native closing is only done on default controllers.")
1258 },
1259 }
1260 }
1261
1262 pub(crate) fn in_memory(&self) -> bool {
1265 match self.controller.borrow().as_ref() {
1266 Some(ControllerType::Default(controller)) => controller
1267 .get()
1268 .expect("Stream should have controller.")
1269 .in_memory(),
1270 _ => {
1271 unreachable!(
1272 "Checking if source is in memory for a stream with a non-default controller"
1273 )
1274 },
1275 }
1276 }
1277
1278 pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1281 match self.controller.borrow().as_ref() {
1282 Some(ControllerType::Default(controller)) => controller
1283 .get()
1284 .expect("Stream should have controller.")
1285 .get_in_memory_bytes()
1286 .as_deref()
1287 .map(GenericSharedMemory::from_bytes),
1288 _ => {
1289 unreachable!("Getting in-memory bytes for a stream with a non-default controller")
1290 },
1291 }
1292 }
1293
1294 pub(crate) fn acquire_default_reader(
1299 &self,
1300 can_gc: CanGc,
1301 ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1302 let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1304
1305 reader.set_up(self, &self.global(), can_gc)?;
1307
1308 Ok(reader)
1310 }
1311
1312 pub(crate) fn acquire_byob_reader(
1314 &self,
1315 can_gc: CanGc,
1316 ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1317 let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1319 reader.set_up(self, &self.global(), can_gc)?;
1321
1322 Ok(reader)
1324 }
1325
1326 pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1327 match self.controller.borrow().as_ref() {
1328 Some(ControllerType::Default(controller)) => {
1329 controller.get().expect("Stream should have controller.")
1330 },
1331 _ => {
1332 unreachable!(
1333 "Getting default controller for a stream with a non-default controller"
1334 )
1335 },
1336 }
1337 }
1338
1339 pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1340 match self.controller.borrow().as_ref() {
1341 Some(ControllerType::Byte(controller)) => {
1342 controller.get().expect("Stream should have controller.")
1343 },
1344 _ => {
1345 unreachable!("Getting byte controller for a stream with a non-byte controller")
1346 },
1347 }
1348 }
1349
1350 pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1351 match self.reader.borrow().as_ref() {
1352 Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1353 _ => {
1354 unreachable!("Getting default reader for a stream with a non-default reader")
1355 },
1356 }
1357 }
1358
1359 pub(crate) fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
1365 match self.reader.borrow().as_ref() {
1366 Some(ReaderType::Default(reader)) => {
1367 let Some(reader) = reader.get() else {
1368 unreachable!(
1369 "Attempt to read stream chunk without having first acquired a reader."
1370 );
1371 };
1372 reader.Read(can_gc)
1373 },
1374 _ => {
1375 unreachable!("Native reading of a chunk can only be done with a default reader.")
1376 },
1377 }
1378 }
1379
1380 pub(crate) fn stop_reading(&self, can_gc: CanGc) {
1385 let reader_ref = self.reader.borrow();
1386
1387 match reader_ref.as_ref() {
1388 Some(ReaderType::Default(reader)) => {
1389 let Some(reader) = reader.get() else {
1390 unreachable!("Attempt to stop reading without having first acquired a reader.");
1391 };
1392
1393 drop(reader_ref);
1394 reader.release(can_gc).expect("Reader release cannot fail.");
1395 },
1396 _ => {
1397 unreachable!("Native stop reading can only be done with a default reader.")
1398 },
1399 }
1400 }
1401
1402 pub(crate) fn is_locked(&self) -> bool {
1404 match self.reader.borrow().as_ref() {
1405 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1406 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1407 None => false,
1408 }
1409 }
1410
1411 pub(crate) fn is_disturbed(&self) -> bool {
1412 self.disturbed.get()
1413 }
1414
1415 pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1416 self.disturbed.set(disturbed);
1417 }
1418
1419 pub(crate) fn is_closed(&self) -> bool {
1420 self.state.get() == ReadableStreamState::Closed
1421 }
1422
1423 pub(crate) fn is_errored(&self) -> bool {
1424 self.state.get() == ReadableStreamState::Errored
1425 }
1426
1427 pub(crate) fn is_readable(&self) -> bool {
1428 self.state.get() == ReadableStreamState::Readable
1429 }
1430
1431 pub(crate) fn has_default_reader(&self) -> bool {
1432 match self.reader.borrow().as_ref() {
1433 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1434 _ => false,
1435 }
1436 }
1437
1438 pub(crate) fn has_byob_reader(&self) -> bool {
1439 match self.reader.borrow().as_ref() {
1440 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1441 _ => false,
1442 }
1443 }
1444
1445 pub(crate) fn has_byte_controller(&self) -> bool {
1446 match self.controller.borrow().as_ref() {
1447 Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1448 _ => false,
1449 }
1450 }
1451
1452 pub(crate) fn get_num_read_requests(&self) -> usize {
1454 match self.reader.borrow().as_ref() {
1455 Some(ReaderType::Default(reader)) => {
1456 let reader = reader
1457 .get()
1458 .expect("Stream must have a reader when getting the number of read requests.");
1459 reader.get_num_read_requests()
1460 },
1461 _ => unreachable!(
1462 "Stream must have a default reader when get num read requests is called into."
1463 ),
1464 }
1465 }
1466
1467 pub(crate) fn get_num_read_into_requests(&self) -> usize {
1469 assert!(self.has_byob_reader());
1470
1471 match self.reader.borrow().as_ref() {
1472 Some(ReaderType::BYOB(reader)) => {
1473 let Some(reader) = reader.get() else {
1474 unreachable!(
1475 "Stream must have a reader when get num read into requests is called into."
1476 );
1477 };
1478 reader.get_num_read_into_requests()
1479 },
1480 _ => {
1481 unreachable!(
1482 "Stream must have a BYOB reader when get num read into requests is called into."
1483 );
1484 },
1485 }
1486 }
1487
1488 pub(crate) fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool, can_gc: CanGc) {
1490 assert!(self.has_default_reader());
1492
1493 match self.reader.borrow().as_ref() {
1494 Some(ReaderType::Default(reader)) => {
1495 let reader = reader
1497 .get()
1498 .expect("Stream must have a reader when a read request is fulfilled.");
1499 assert_ne!(reader.get_num_read_requests(), 0);
1501 let request = reader.remove_read_request();
1504
1505 if done {
1506 request.close_steps(can_gc);
1508 } else {
1509 let result = RootedTraceableBox::new(Heap::default());
1511 result.set(*chunk);
1512 request.chunk_steps(result, &self.global(), can_gc);
1513 }
1514 },
1515 _ => {
1516 unreachable!(
1517 "Stream must have a default reader when fulfill read requests is called into."
1518 );
1519 },
1520 }
1521 }
1522
1523 pub(crate) fn fulfill_read_into_request(
1525 &self,
1526 chunk: SafeHandleValue,
1527 done: bool,
1528 can_gc: CanGc,
1529 ) {
1530 assert!(self.has_byob_reader());
1532
1533 match self.reader.borrow().as_ref() {
1535 Some(ReaderType::BYOB(reader)) => {
1536 let Some(reader) = reader.get() else {
1537 unreachable!(
1538 "Stream must have a reader when a read into request is fulfilled."
1539 );
1540 };
1541
1542 assert!(reader.get_num_read_into_requests() > 0);
1544
1545 let read_into_request = reader.remove_read_into_request();
1548
1549 let result = RootedTraceableBox::new(Heap::default());
1551 if done {
1552 result.set(*chunk);
1553 read_into_request.close_steps(Some(result), can_gc);
1554 } else {
1555 result.set(*chunk);
1557 read_into_request.chunk_steps(result, can_gc);
1558 }
1559 },
1560 _ => {
1561 unreachable!(
1562 "Stream must have a BYOB reader when fulfill read into requests is called into."
1563 );
1564 },
1565 };
1566 }
1567
1568 pub(crate) fn close(&self, can_gc: CanGc) {
1570 assert!(self.is_readable());
1572 self.state.set(ReadableStreamState::Closed);
1574 let default_reader = {
1580 let reader_ref = self.reader.borrow();
1581 match reader_ref.as_ref() {
1582 Some(ReaderType::Default(reader)) => reader.get(),
1583 _ => None,
1584 }
1585 };
1586
1587 if let Some(reader) = default_reader {
1588 reader.close(can_gc);
1590 return;
1591 }
1592
1593 let byob_reader = {
1595 let reader_ref = self.reader.borrow();
1596 match reader_ref.as_ref() {
1597 Some(ReaderType::BYOB(reader)) => reader.get(),
1598 _ => None,
1599 }
1600 };
1601
1602 if let Some(reader) = byob_reader {
1603 reader.close(can_gc);
1605 }
1606
1607 }
1609
1610 pub(crate) fn cancel(
1612 &self,
1613 cx: SafeJSContext,
1614 global: &GlobalScope,
1615 reason: SafeHandleValue,
1616 can_gc: CanGc,
1617 ) -> Rc<Promise> {
1618 self.disturbed.set(true);
1620
1621 if self.is_closed() {
1623 return Promise::new_resolved(global, cx, (), can_gc);
1624 }
1625 if self.is_errored() {
1627 let promise = Promise::new(global, can_gc);
1628 rooted!(in(*cx) let mut rval = UndefinedValue());
1629 self.stored_error
1630 .safe_to_jsval(cx, rval.handle_mut(), can_gc);
1631 promise.reject_native(&rval.handle(), can_gc);
1632 return promise;
1633 }
1634 self.close(can_gc);
1636
1637 let byob_reader = {
1639 let reader_ref = self.reader.borrow();
1640 match reader_ref.as_ref() {
1641 Some(ReaderType::BYOB(reader)) => reader.get(),
1642 _ => None,
1643 }
1644 };
1645
1646 if let Some(reader) = byob_reader {
1647 reader.cancel(can_gc);
1649 }
1650
1651 let source_cancel_promise = match self.controller.borrow().as_ref() {
1654 Some(ControllerType::Default(controller)) => controller
1655 .get()
1656 .expect("Stream should have controller.")
1657 .perform_cancel_steps(cx, global, reason, can_gc),
1658 Some(ControllerType::Byte(controller)) => controller
1659 .get()
1660 .expect("Stream should have controller.")
1661 .perform_cancel_steps(cx, global, reason, can_gc),
1662 None => {
1663 panic!("Stream does not have a controller.");
1664 },
1665 };
1666
1667 let global = self.global();
1670 let result_promise = Promise::new(&global, can_gc);
1671 let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1672 result: result_promise.clone(),
1673 });
1674 let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1675 result: result_promise.clone(),
1676 });
1677 let handler = PromiseNativeHandler::new(
1678 &global,
1679 Some(fulfillment_handler),
1680 Some(rejection_handler),
1681 can_gc,
1682 );
1683 let realm = enter_realm(&*global);
1684 let comp = InRealm::Entered(&realm);
1685 source_cancel_promise.append_native_handler(&handler, comp, can_gc);
1686
1687 result_promise
1690 }
1691
1692 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1693 pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1694 *self.reader.borrow_mut() = new_reader;
1695 }
1696
1697 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1698 fn byte_tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1700 let reader = self.acquire_default_reader(can_gc)?;
1705 let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1706 Some(&reader),
1707 ))));
1708
1709 let reading = Rc::new(Cell::new(false));
1711
1712 let read_again_for_branch_1 = Rc::new(Cell::new(false));
1714
1715 let read_again_for_branch_2 = Rc::new(Cell::new(false));
1717
1718 let canceled_1 = Rc::new(Cell::new(false));
1720
1721 let canceled_2 = Rc::new(Cell::new(false));
1723
1724 let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1726
1727 let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1729
1730 let cancel_promise = Promise::new(&self.global(), can_gc);
1732 let reader_version = Rc::new(Cell::new(0));
1733
1734 let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1735 reader.clone(),
1736 self,
1737 reading.clone(),
1738 read_again_for_branch_1.clone(),
1739 read_again_for_branch_2.clone(),
1740 canceled_1.clone(),
1741 canceled_2.clone(),
1742 reason_1.clone(),
1743 reason_2.clone(),
1744 cancel_promise.clone(),
1745 reader_version.clone(),
1746 ByteTeeCancelAlgorithm::Cancel1Algorithm,
1747 ByteTeePullAlgorithm::Pull1Algorithm,
1748 can_gc,
1749 );
1750
1751 let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1752 reader.clone(),
1753 self,
1754 reading,
1755 read_again_for_branch_1,
1756 read_again_for_branch_2,
1757 canceled_1,
1758 canceled_2,
1759 reason_1,
1760 reason_2,
1761 cancel_promise.clone(),
1762 reader_version,
1763 ByteTeeCancelAlgorithm::Cancel2Algorithm,
1764 ByteTeePullAlgorithm::Pull2Algorithm,
1765 can_gc,
1766 );
1767
1768 let branch_1 = readable_byte_stream_tee(
1770 &self.global(),
1771 UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_1)),
1772 can_gc,
1773 );
1774 byte_tee_source_1.set_branch_1(&branch_1);
1775 byte_tee_source_2.set_branch_1(&branch_1);
1776
1777 let branch_2 = readable_byte_stream_tee(
1779 &self.global(),
1780 UnderlyingSourceType::TeeByte(Dom::from_ref(&byte_tee_source_2)),
1781 can_gc,
1782 );
1783 byte_tee_source_1.set_branch_2(&branch_2);
1784 byte_tee_source_2.set_branch_2(&branch_2);
1785
1786 byte_tee_source_1.forward_reader_error(reader.clone(), can_gc);
1788 byte_tee_source_2.forward_reader_error(reader, can_gc);
1789
1790 Ok(vec![branch_1, branch_2])
1792 }
1793
1794 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1796 fn default_tee(
1797 &self,
1798 clone_for_branch_2: bool,
1799 can_gc: CanGc,
1800 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1801 let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1805
1806 let reader = self.acquire_default_reader(can_gc)?;
1808
1809 let reading = Rc::new(Cell::new(false));
1811 let read_again = Rc::new(Cell::new(false));
1813 let canceled_1 = Rc::new(Cell::new(false));
1815 let canceled_2 = Rc::new(Cell::new(false));
1817
1818 let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
1820 let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
1822 let cancel_promise = Promise::new(&self.global(), can_gc);
1824
1825 let tee_source_1 = DefaultTeeUnderlyingSource::new(
1826 &reader,
1827 self,
1828 reading.clone(),
1829 read_again.clone(),
1830 canceled_1.clone(),
1831 canceled_2.clone(),
1832 clone_for_branch_2.clone(),
1833 reason_1.clone(),
1834 reason_2.clone(),
1835 cancel_promise.clone(),
1836 DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1837 can_gc,
1838 );
1839
1840 let underlying_source_type_branch_1 =
1841 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
1842
1843 let tee_source_2 = DefaultTeeUnderlyingSource::new(
1844 &reader,
1845 self,
1846 reading,
1847 read_again,
1848 canceled_1.clone(),
1849 canceled_2.clone(),
1850 clone_for_branch_2,
1851 reason_1,
1852 reason_2,
1853 cancel_promise.clone(),
1854 DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1855 can_gc,
1856 );
1857
1858 let underlying_source_type_branch_2 =
1859 UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
1860
1861 let branch_1 = create_readable_stream(
1863 &self.global(),
1864 underlying_source_type_branch_1,
1865 None,
1866 None,
1867 can_gc,
1868 );
1869 tee_source_1.set_branch_1(&branch_1);
1870 tee_source_2.set_branch_1(&branch_1);
1871
1872 let branch_2 = create_readable_stream(
1874 &self.global(),
1875 underlying_source_type_branch_2,
1876 None,
1877 None,
1878 can_gc,
1879 );
1880 tee_source_1.set_branch_2(&branch_2);
1881 tee_source_2.set_branch_2(&branch_2);
1882
1883 reader.default_tee_append_native_handler_to_closed_promise(
1885 &branch_1,
1886 &branch_2,
1887 canceled_1,
1888 canceled_2,
1889 cancel_promise,
1890 can_gc,
1891 );
1892
1893 Ok(vec![branch_1, branch_2])
1895 }
1896
1897 #[allow(clippy::too_many_arguments)]
1899 pub(crate) fn pipe_to(
1900 &self,
1901 cx: SafeJSContext,
1902 global: &GlobalScope,
1903 dest: &WritableStream,
1904 prevent_close: bool,
1905 prevent_abort: bool,
1906 prevent_cancel: bool,
1907 signal: Option<&AbortSignal>,
1908 realm: InRealm,
1909 can_gc: CanGc,
1910 ) -> Rc<Promise> {
1911 assert!(!self.is_locked());
1922
1923 assert!(!dest.is_locked());
1925
1926 let reader = self
1934 .acquire_default_reader(can_gc)
1935 .expect("Acquiring a default reader for pipe_to cannot fail");
1936
1937 let writer = dest
1939 .aquire_default_writer(cx, global, can_gc)
1940 .expect("Acquiring a default writer for pipe_to cannot fail");
1941
1942 self.disturbed.set(true);
1944
1945 let promise = Promise::new(global, can_gc);
1950
1951 rooted!(in(*cx) let pipe_to = PipeTo {
1953 reader: Dom::from_ref(&reader),
1954 writer: Dom::from_ref(&writer),
1955 pending_writes: Default::default(),
1956 state: Default::default(),
1957 prevent_abort,
1958 prevent_cancel,
1959 prevent_close,
1960 shutting_down: Default::default(),
1961 abort_reason: Default::default(),
1962 shutdown_error: Default::default(),
1963 shutdown_action_promise: Default::default(),
1964 result_promise: promise.clone(),
1965 });
1966
1967 if let Some(signal) = signal {
1970 rooted!(in(*cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1973
1974 if signal.aborted() {
1976 signal.run_abort_algorithm(cx, global, &abort_algorithm, realm, can_gc);
1977 return promise;
1978 }
1979
1980 signal.add(&abort_algorithm);
1982 }
1983
1984 pipe_to.check_and_propagate_errors_forward(cx, global, realm, can_gc);
1986 pipe_to.check_and_propagate_errors_backward(cx, global, realm, can_gc);
1987 pipe_to.check_and_propagate_closing_forward(cx, global, realm, can_gc);
1988 pipe_to.check_and_propagate_closing_backward(cx, global, realm, can_gc);
1989
1990 if *pipe_to.state.borrow() == PipeToState::Starting {
1992 pipe_to.wait_for_writer_ready(global, realm, can_gc);
1994 }
1995
1996 promise
1998 }
1999
2000 pub(crate) fn tee(
2002 &self,
2003 clone_for_branch_2: bool,
2004 can_gc: CanGc,
2005 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2006 match self.controller.borrow().as_ref() {
2010 Some(ControllerType::Default(_)) => {
2011 self.default_tee(clone_for_branch_2, can_gc)
2013 },
2014 Some(ControllerType::Byte(_)) => {
2015 self.byte_tee(can_gc)
2018 },
2019 None => {
2020 unreachable!("Stream should have a controller.");
2021 },
2022 }
2023 }
2024
2025 pub(crate) fn set_up_byte_controller(
2027 &self,
2028 global: &GlobalScope,
2029 underlying_source_dict: JsUnderlyingSource,
2030 underlying_source_handle: SafeHandleObject,
2031 stream: DomRoot<ReadableStream>,
2032 strategy_hwm: f64,
2033 can_gc: CanGc,
2034 ) -> Fallible<()> {
2035 if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2051 return Err(Error::Type("autoAllocateChunkSize cannot be 0".to_owned()));
2052 }
2053
2054 let controller = ReadableByteStreamController::new(
2055 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2056 strategy_hwm,
2057 global,
2058 can_gc,
2059 );
2060
2061 controller.set_underlying_source_this_object(underlying_source_handle);
2064
2065 controller.setup(global, stream, can_gc)
2068 }
2069
2070 pub(crate) fn setup_cross_realm_transform_readable(
2072 &self,
2073 cx: SafeJSContext,
2074 port: &MessagePort,
2075 can_gc: CanGc,
2076 ) {
2077 let port_id = port.message_port_id();
2078 let global = self.global();
2079
2080 let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
2085
2086 let controller = ReadableStreamDefaultController::new(
2090 &self.global(),
2091 UnderlyingSourceType::Transfer(Dom::from_ref(port)),
2092 0.,
2093 size_algorithm,
2094 can_gc,
2095 );
2096
2097 rooted!(in(*cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2100 controller: Dom::from_ref(&controller),
2101 });
2102 global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2103
2104 port.Start(can_gc);
2106
2107 controller
2109 .setup(DomRoot::from_ref(self), can_gc)
2110 .expect("Setting up controller for transfer cannot fail.");
2111 }
2112}
2113
2114impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2115 fn Constructor(
2117 cx: SafeJSContext,
2118 global: &GlobalScope,
2119 proto: Option<SafeHandleObject>,
2120 can_gc: CanGc,
2121 underlying_source: Option<*mut JSObject>,
2122 strategy: &QueuingStrategy,
2123 ) -> Fallible<DomRoot<Self>> {
2124 rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2126 let underlying_source_dict = if !underlying_source_obj.is_null() {
2129 rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2130 match JsUnderlyingSource::new(cx, obj_val.handle(), can_gc) {
2131 Ok(ConversionResult::Success(val)) => val,
2132 Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
2133 _ => {
2134 return Err(Error::JSFailed);
2135 },
2136 }
2137 } else {
2138 JsUnderlyingSource::empty()
2139 };
2140
2141 let stream = ReadableStream::new_with_proto(global, proto, can_gc);
2143
2144 if underlying_source_dict.type_.is_some() {
2145 if strategy.size.is_some() {
2147 return Err(Error::Range(
2148 "size is not supported for byte streams".to_owned(),
2149 ));
2150 }
2151
2152 let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2154
2155 stream.set_up_byte_controller(
2158 global,
2159 underlying_source_dict,
2160 underlying_source_obj.handle(),
2161 stream.clone(),
2162 strategy_hwm,
2163 can_gc,
2164 )?;
2165 } else {
2166 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2168
2169 let size_algorithm = extract_size_algorithm(strategy, can_gc);
2171
2172 let controller = ReadableStreamDefaultController::new(
2173 global,
2174 UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
2175 high_water_mark,
2176 size_algorithm,
2177 can_gc,
2178 );
2179
2180 controller.set_underlying_source_this_object(underlying_source_obj.handle());
2183
2184 controller.setup(stream.clone(), can_gc)?;
2186 };
2187
2188 Ok(stream)
2189 }
2190
2191 fn Locked(&self) -> bool {
2193 self.is_locked()
2194 }
2195
2196 fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
2198 let global = self.global();
2199 if self.is_locked() {
2200 let promise = Promise::new(&global, can_gc);
2203 promise.reject_error(Error::Type("stream is locked".to_owned()), can_gc);
2204 promise
2205 } else {
2206 self.cancel(cx, &global, reason, can_gc)
2208 }
2209 }
2210
2211 fn GetReader(
2213 &self,
2214 options: &ReadableStreamGetReaderOptions,
2215 can_gc: CanGc,
2216 ) -> Fallible<ReadableStreamReader> {
2217 if options.mode.is_none() {
2219 return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2220 self.acquire_default_reader(can_gc)?,
2221 ));
2222 }
2223 assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2225
2226 Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2228 self.acquire_byob_reader(can_gc)?,
2229 ))
2230 }
2231
2232 fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2234 self.tee(false, can_gc)
2236 }
2237
2238 fn PipeTo(
2240 &self,
2241 destination: &WritableStream,
2242 options: &StreamPipeOptions,
2243 realm: InRealm,
2244 can_gc: CanGc,
2245 ) -> Rc<Promise> {
2246 let cx = GlobalScope::get_cx();
2247 let global = self.global();
2248
2249 if self.is_locked() {
2251 let promise = Promise::new(&global, can_gc);
2253 promise.reject_error(Error::Type("Source stream is locked".to_owned()), can_gc);
2254 return promise;
2255 }
2256
2257 if destination.is_locked() {
2259 let promise = Promise::new(&global, can_gc);
2261 promise.reject_error(
2262 Error::Type("Destination stream is locked".to_owned()),
2263 can_gc,
2264 );
2265 return promise;
2266 }
2267
2268 let signal = options.signal.as_deref();
2270
2271 self.pipe_to(
2273 cx,
2274 &global,
2275 destination,
2276 options.preventClose,
2277 options.preventAbort,
2278 options.preventCancel,
2279 signal,
2280 realm,
2281 can_gc,
2282 )
2283 }
2284
2285 fn PipeThrough(
2287 &self,
2288 transform: &ReadableWritablePair,
2289 options: &StreamPipeOptions,
2290 realm: InRealm,
2291 can_gc: CanGc,
2292 ) -> Fallible<DomRoot<ReadableStream>> {
2293 let global = self.global();
2294 let cx = GlobalScope::get_cx();
2295
2296 if self.is_locked() {
2298 return Err(Error::Type("Source stream is locked".to_owned()));
2299 }
2300
2301 if transform.writable.is_locked() {
2303 return Err(Error::Type("Destination stream is locked".to_owned()));
2304 }
2305
2306 let signal = options.signal.as_deref();
2308
2309 let promise = self.pipe_to(
2312 cx,
2313 &global,
2314 &transform.writable,
2315 options.preventClose,
2316 options.preventAbort,
2317 options.preventCancel,
2318 signal,
2319 realm,
2320 can_gc,
2321 );
2322
2323 promise.set_promise_is_handled();
2325
2326 Ok(transform.readable.clone())
2328 }
2329}
2330
2331#[expect(unsafe_code)]
2332pub(crate) unsafe fn get_type_and_value_from_message(
2336 cx: SafeJSContext,
2337 data: SafeHandleValue,
2338 value: SafeMutableHandleValue,
2339 can_gc: CanGc,
2340) -> DOMString {
2341 assert!(data.is_object());
2347 rooted!(in(*cx) let data_object = data.to_object());
2348
2349 rooted!(in(*cx) let mut type_ = UndefinedValue());
2351 unsafe {
2352 get_dictionary_property(
2353 *cx,
2354 data_object.handle(),
2355 c"type",
2356 type_.handle_mut(),
2357 can_gc,
2358 )
2359 }
2360 .expect("Getting the type should not fail.");
2361
2362 unsafe { get_dictionary_property(*cx, data_object.handle(), c"value", value, can_gc) }
2364 .expect("Getting the value should not fail.");
2365
2366 let result =
2368 DOMString::safe_from_jsval(cx, type_.handle(), StringificationBehavior::Empty, can_gc)
2369 .expect("The type of the message should be a string");
2370 let ConversionResult::Success(type_string) = result else {
2371 unreachable!("The type of the message should be a string");
2372 };
2373
2374 type_string
2375}
2376
2377impl js::gc::Rootable for CrossRealmTransformReadable {}
2378
2379#[derive(Clone, JSTraceable, MallocSizeOf)]
2383#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
2384pub(crate) struct CrossRealmTransformReadable {
2385 controller: Dom<ReadableStreamDefaultController>,
2387}
2388
2389impl CrossRealmTransformReadable {
2390 #[expect(unsafe_code)]
2393 pub(crate) fn handle_message(
2394 &self,
2395 cx: SafeJSContext,
2396 global: &GlobalScope,
2397 port: &MessagePort,
2398 message: SafeHandleValue,
2399 _realm: InRealm,
2400 can_gc: CanGc,
2401 ) {
2402 rooted!(in(*cx) let mut value = UndefinedValue());
2403 let type_string =
2404 unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
2405
2406 if type_string == "chunk" {
2408 self.controller
2410 .enqueue(cx, value.handle(), can_gc)
2411 .expect("Enqueing a chunk should not fail.");
2412 }
2413
2414 if type_string == "close" {
2416 self.controller.close(can_gc);
2418
2419 global.disentangle_port(port, can_gc);
2421 }
2422
2423 if type_string == "error" {
2425 self.controller.error(value.handle(), can_gc);
2427
2428 global.disentangle_port(port, can_gc);
2430 }
2431 }
2432
2433 pub(crate) fn handle_error(
2436 &self,
2437 cx: SafeJSContext,
2438 global: &GlobalScope,
2439 port: &MessagePort,
2440 _realm: InRealm,
2441 can_gc: CanGc,
2442 ) {
2443 let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
2445 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
2446 error.safe_to_jsval(cx, rooted_error.handle_mut(), can_gc);
2447
2448 port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
2450
2451 self.controller.error(rooted_error.handle(), can_gc);
2453
2454 global.disentangle_port(port, can_gc);
2456 }
2457}
2458
2459#[expect(unsafe_code)]
2460pub(crate) fn get_read_promise_done(
2462 cx: SafeJSContext,
2463 v: &SafeHandleValue,
2464 can_gc: CanGc,
2465) -> Result<bool, Error> {
2466 if !v.is_object() {
2467 return Err(Error::Type("Unknown format for done property.".to_string()));
2468 }
2469 unsafe {
2470 rooted!(in(*cx) let object = v.to_object());
2471 rooted!(in(*cx) let mut done = UndefinedValue());
2472 match get_dictionary_property(*cx, object.handle(), c"done", done.handle_mut(), can_gc) {
2473 Ok(true) => match bool::safe_from_jsval(cx, done.handle(), (), can_gc) {
2474 Ok(ConversionResult::Success(val)) => Ok(val),
2475 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2476 _ => Err(Error::Type("Unknown format for done property.".to_string())),
2477 },
2478 Ok(false) => Err(Error::Type("Promise has no done property.".to_string())),
2479 Err(()) => Err(Error::JSFailed),
2480 }
2481 }
2482}
2483
2484#[expect(unsafe_code)]
2485pub(crate) fn get_read_promise_bytes(
2487 cx: SafeJSContext,
2488 v: &SafeHandleValue,
2489 can_gc: CanGc,
2490) -> Result<Vec<u8>, Error> {
2491 if !v.is_object() {
2492 return Err(Error::Type(
2493 "Unknown format for for bytes read.".to_string(),
2494 ));
2495 }
2496 unsafe {
2497 rooted!(in(*cx) let object = v.to_object());
2498 rooted!(in(*cx) let mut bytes = UndefinedValue());
2499 match get_dictionary_property(*cx, object.handle(), c"value", bytes.handle_mut(), can_gc) {
2500 Ok(true) => {
2501 match Vec::<u8>::safe_from_jsval(
2502 cx,
2503 bytes.handle(),
2504 ConversionBehavior::EnforceRange,
2505 can_gc,
2506 ) {
2507 Ok(ConversionResult::Success(val)) => Ok(val),
2508 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2509 _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
2510 }
2511 },
2512 Ok(false) => Err(Error::Type("Promise has no value property.".to_string())),
2513 Err(()) => Err(Error::JSFailed),
2514 }
2515 }
2516}
2517
2518pub(crate) fn bytes_from_chunk_jsval(
2522 cx: SafeJSContext,
2523 chunk: &RootedTraceableBox<Heap<JSVal>>,
2524 can_gc: CanGc,
2525) -> Result<Vec<u8>, Error> {
2526 match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange, can_gc) {
2527 Ok(ConversionResult::Success(vec)) => Ok(vec),
2528 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
2529 _ => Err(Error::Type("Unknown format for bytes read.".to_string())),
2530 }
2531}
2532
2533impl Transferable for ReadableStream {
2535 type Index = MessagePortIndex;
2536 type Data = MessagePortImpl;
2537
2538 fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
2540 if self.is_locked() {
2543 return Err(Error::DataClone(None));
2544 }
2545
2546 let global = self.global();
2547 let realm = enter_realm(&*global);
2548 let comp = InRealm::Entered(&realm);
2549 let cx = GlobalScope::get_cx();
2550 let can_gc = CanGc::note();
2551
2552 let port_1 = MessagePort::new(&global, can_gc);
2554 global.track_message_port(&port_1, None);
2555
2556 let port_2 = MessagePort::new(&global, can_gc);
2558 global.track_message_port(&port_2, None);
2559
2560 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2562
2563 let writable = WritableStream::new_with_proto(&global, None, can_gc);
2565
2566 writable.setup_cross_realm_transform_writable(cx, &port_1, can_gc);
2568
2569 let promise = self.pipe_to(
2571 cx, &global, &writable, false, false, false, None, comp, can_gc,
2572 );
2573
2574 promise.set_promise_is_handled();
2576
2577 port_2.transfer()
2579 }
2580
2581 fn transfer_receive(
2583 owner: &GlobalScope,
2584 id: MessagePortId,
2585 port_impl: MessagePortImpl,
2586 ) -> Result<DomRoot<Self>, ()> {
2587 let cx = GlobalScope::get_cx();
2588 let can_gc = CanGc::note();
2589
2590 let value = ReadableStream::new_with_proto(owner, None, can_gc);
2593
2594 let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
2601
2602 value.setup_cross_realm_transform_readable(cx, &transferred_port, can_gc);
2604 Ok(value)
2605 }
2606
2607 fn serialized_storage<'a>(
2609 data: StructuredData<'a, '_>,
2610 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2611 match data {
2612 StructuredData::Reader(r) => &mut r.port_impls,
2613 StructuredData::Writer(w) => &mut w.ports,
2614 }
2615 }
2616}