1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use servo_base::generic_channel::GenericSharedMemory;
11use servo_base::id::{MessagePortId, MessagePortIndex};
12use servo_constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::context::JSContext;
15use js::conversions::{FromJSValConvertible, ToJSValConvertible};
16use js::jsapi::{Heap, JSObject};
17use js::jsval::{JSVal, ObjectValue, UndefinedValue};
18use js::realm::CurrentRealm;
19use js::rust::{
20 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
21 MutableHandleValue as SafeMutableHandleValue,
22};
23use js::typedarray::{ArrayBufferViewU8, Uint8};
24use rustc_hash::FxHashMap;
25
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
27use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
28 ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
29 ReadableWritablePair, StreamPipeOptions,
30};
31use script_bindings::str::DOMString;
32
33use crate::dom::domexception::{DOMErrorName, DOMException};
34use script_bindings::conversions::{is_array_like, StringificationBehavior};
35use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
36use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
37use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
38use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
39use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
40use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, get_property, get_property_jsval};
41use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
42use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
43use crate::dom::stream::writablestream::WritableStream;
44use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
45use crate::dom::bindings::reflector::DomGlobal;
46use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
47use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
48use crate::dom::bindings::trace::RootedTraceableBox;
49use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
50use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
51use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
52use crate::dom::globalscope::GlobalScope;
53use crate::dom::promise::{wait_for_all_promise, Promise};
54use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
55use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
56use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
57use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
58use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
59use crate::dom::types::DefaultTeeUnderlyingSource;
60use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
61use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
62use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
63use crate::dom::messageport::MessagePort;
64use crate::realms::{enter_auto_realm};
65use crate::script_runtime::CanGc;
66use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
67use crate::dom::bindings::transferable::Transferable;
68use crate::dom::bindings::structuredclone::StructuredData;
69
70use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource, create_buffer_source};
71use super::readablestreambyobreader::ReadIntoRequest;
72
73#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
75enum PipeToState {
76 #[default]
78 Starting,
79 PendingReady,
81 PendingRead,
83 ShuttingDownWithPendingWrites(Option<ShutdownAction>),
86 ShuttingDownPendingAction,
90 Finalized,
93}
94
95#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
97enum ShutdownAction {
98 WritableStreamAbort,
100 ReadableStreamCancel,
102 WritableStreamDefaultWriterCloseWithErrorPropagation,
104 Abort,
106}
107
108impl js::gc::Rootable for PipeTo {}
109
110#[derive(Clone, JSTraceable, MallocSizeOf)]
119#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
120pub(crate) struct PipeTo {
121 reader: Dom<ReadableStreamDefaultReader>,
123
124 writer: Dom<WritableStreamDefaultWriter>,
126
127 #[ignore_malloc_size_of = "nested Rc"]
130 pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,
131
132 #[conditional_malloc_size_of]
134 #[no_trace]
135 state: Rc<RefCell<PipeToState>>,
136
137 prevent_abort: bool,
139
140 prevent_cancel: bool,
142
143 prevent_close: bool,
145
146 #[conditional_malloc_size_of]
149 shutting_down: Rc<Cell<bool>>,
150
151 #[ignore_malloc_size_of = "mozjs"]
154 abort_reason: Rc<Heap<JSVal>>,
155
156 #[ignore_malloc_size_of = "mozjs"]
159 shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,
160
161 #[ignore_malloc_size_of = "nested Rc"]
164 shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,
165
166 #[conditional_malloc_size_of]
169 result_promise: Rc<Promise>,
170}
171
172impl PipeTo {
173 pub(crate) fn abort_with_reason(
176 &self,
177 cx: &mut CurrentRealm,
178 global: &GlobalScope,
179 reason: SafeHandleValue,
180 ) {
181 if self.shutting_down.get() {
183 return;
184 }
185
186 self.abort_reason.set(reason.get());
190
191 self.set_shutdown_error(reason);
196
197 self.shutdown(cx, global, Some(ShutdownAction::Abort));
203 }
204}
205
206impl Callback for PipeTo {
207 fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
216 let global = self.reader.global();
217
218 self.pending_writes.borrow_mut().retain(|p| {
224 let pending = p.is_pending();
225 if !pending {
226 p.set_promise_is_handled();
227 }
228 pending
229 });
230
231 let state_before_checks = self.state.borrow().clone();
233
234 if state_before_checks == PipeToState::PendingRead {
241 let source = self.reader.get_stream().expect("Source stream must be set");
242 if source.is_closed() {
243 let dest = self
244 .writer
245 .get_stream()
246 .expect("Destination stream must be set");
247
248 if dest.is_writable() && !dest.close_queued_or_in_flight() {
251 let Ok(done) = get_read_promise_done(cx, &result) else {
252 return;
259 };
260
261 if !done {
262 self.write_chunk(cx, &global, result);
264 }
265 }
266 }
267 }
268
269 self.check_and_propagate_errors_forward(cx, &global);
270 self.check_and_propagate_errors_backward(cx, &global);
271 self.check_and_propagate_closing_forward(cx, &global);
272 self.check_and_propagate_closing_backward(cx, &global);
273
274 let state = self.state.borrow().clone();
276
277 if state != state_before_checks {
281 return;
282 }
283
284 match state {
285 PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
286 PipeToState::PendingReady => {
287 self.read_chunk(cx, &global);
289 },
290 PipeToState::PendingRead => {
291 self.write_chunk(cx, &global, result);
293
294 if self.shutting_down.get() {
296 return;
297 }
298
299 self.wait_for_writer_ready(cx, &global);
301 },
302 PipeToState::ShuttingDownWithPendingWrites(action) => {
303 if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
306 self.wait_on_pending_write(cx, &global, write);
307 return;
308 }
309
310 if let Some(action) = action {
312 self.perform_action(cx, &global, action);
314 } else {
315 self.finalize(cx, &global);
317 }
318 },
319 PipeToState::ShuttingDownPendingAction => {
320 let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
321 unreachable!();
322 };
323 if promise.is_pending() {
324 return;
328 }
329
330 let is_array_like = {
331 if !result.is_object() {
332 false
333 } else {
334 is_array_like::<crate::DomTypeHolder>(cx, result)
335 }
336 };
337
338 if !result.is_undefined() && !is_array_like {
340 self.set_shutdown_error(result);
350 }
351 self.finalize(cx, &global);
352 },
353 PipeToState::Finalized => {},
354 }
355 }
356}
357
358impl PipeTo {
359 fn set_shutdown_error(&self, error: SafeHandleValue) {
362 *self.shutdown_error.borrow_mut() = Some(Heap::default());
363 let Some(ref heap) = *self.shutdown_error.borrow() else {
364 unreachable!("Option set to Some(heap) above.");
365 };
366 heap.set(error.get())
367 }
368
369 fn wait_for_writer_ready(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
372 {
373 let mut state = self.state.borrow_mut();
374 *state = PipeToState::PendingReady;
375 }
376
377 let ready_promise = self.writer.Ready();
378 if ready_promise.is_fulfilled() {
379 self.read_chunk(cx, global);
380 } else {
381 let handler = PromiseNativeHandler::new(
382 cx,
383 global,
384 Some(Box::new(self.clone())),
385 Some(Box::new(self.clone())),
386 );
387 ready_promise.append_native_handler(cx, &handler);
388
389 let closed_promise = self.reader.Closed();
393 closed_promise.append_native_handler(cx, &handler);
394 }
395 }
396
397 fn read_chunk(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
399 *self.state.borrow_mut() = PipeToState::PendingRead;
400 let chunk_promise = self.reader.Read(cx);
401 let handler = PromiseNativeHandler::new(
402 cx,
403 global,
404 Some(Box::new(self.clone())),
405 Some(Box::new(self.clone())),
406 );
407 chunk_promise.append_native_handler(cx, &handler);
408
409 let ready_promise = self.writer.Closed();
412 ready_promise.append_native_handler(cx, &handler);
413 }
414
415 fn write_chunk(
418 &self,
419 cx: &mut JSContext,
420 global: &GlobalScope,
421 chunk: SafeHandleValue,
422 ) -> bool {
423 if chunk.is_object() {
424 rooted!(&in(cx) let object = chunk.to_object());
425 rooted!(&in(cx) let mut bytes = UndefinedValue());
426 get_property_jsval(cx, object.handle(), c"value", bytes.handle_mut())
427 .expect("Chunk should have a value.");
428
429 let write_promise = self.writer.write(cx, global, bytes.handle());
431 self.pending_writes.borrow_mut().push_back(write_promise);
432 return true;
433 }
434 false
435 }
436
437 fn wait_on_pending_write(
441 &self,
442 cx: &mut CurrentRealm,
443 global: &GlobalScope,
444 promise: Rc<Promise>,
445 ) {
446 let handler = PromiseNativeHandler::new(
447 cx,
448 global,
449 Some(Box::new(self.clone())),
450 Some(Box::new(self.clone())),
451 );
452 promise.append_native_handler(cx, &handler);
453 }
454
455 fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
458 if self.shutting_down.get() {
461 return;
462 }
463
464 let source = self
466 .reader
467 .get_stream()
468 .expect("Reader should still have a stream");
469 if source.is_errored() {
470 rooted!(&in(cx) let mut source_error = UndefinedValue());
471 source.get_stored_error(source_error.handle_mut());
472 self.set_shutdown_error(source_error.handle());
473
474 if !self.prevent_abort {
476 self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
479 } else {
480 self.shutdown(cx, global, None);
482 }
483 }
484 }
485
486 fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
489 if self.shutting_down.get() {
492 return;
493 }
494
495 let dest = self
497 .writer
498 .get_stream()
499 .expect("Writer should still have a stream");
500 if dest.is_errored() {
501 rooted!(&in(cx) let mut dest_error = UndefinedValue());
502 dest.get_stored_error(dest_error.handle_mut());
503 self.set_shutdown_error(dest_error.handle());
504
505 if !self.prevent_cancel {
507 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
510 } else {
511 self.shutdown(cx, global, None);
513 }
514 }
515 }
516
517 fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
520 if self.shutting_down.get() {
523 return;
524 }
525
526 let source = self
528 .reader
529 .get_stream()
530 .expect("Reader should still have a stream");
531 if source.is_closed() {
532 if !self.prevent_close {
534 self.shutdown(
537 cx,
538 global,
539 Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
540 )
541 } else {
542 self.shutdown(cx, global, None);
544 }
545 }
546 }
547
548 fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
551 if self.shutting_down.get() {
554 return;
555 }
556
557 let dest = self
560 .writer
561 .get_stream()
562 .expect("Writer should still have a stream");
563 if dest.close_queued_or_in_flight() || dest.is_closed() {
564 rooted!(&in(cx) let mut dest_closed = UndefinedValue());
569 let error =
570 Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
571 error.to_jsval(
572 cx.into(),
573 global,
574 dest_closed.handle_mut(),
575 CanGc::from_cx(cx),
576 );
577 self.set_shutdown_error(dest_closed.handle());
578
579 if !self.prevent_cancel {
581 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
584 } else {
585 self.shutdown(cx, global, None);
587 }
588 }
589 }
590
591 fn shutdown(
595 &self,
596 cx: &mut CurrentRealm,
597 global: &GlobalScope,
598 action: Option<ShutdownAction>,
599 ) {
600 if !self.shutting_down.replace(true) {
603 let dest = self.writer.get_stream().expect("Stream must be set");
604 if dest.is_writable() && !dest.close_queued_or_in_flight() {
607 if let Some(write) = self.pending_writes.borrow_mut().front() {
613 *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
614 self.wait_on_pending_write(cx, global, write.clone());
615 return;
616 }
617 }
618
619 if let Some(action) = action {
621 self.perform_action(cx, global, action);
623 } else {
624 self.finalize(cx, global);
626 }
627 }
628 }
629
630 fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
633 rooted!(&in(cx) let mut error = UndefinedValue());
634 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
635 error.set(shutdown_error.get());
636 }
637
638 *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
639
640 let promise = match action {
642 ShutdownAction::WritableStreamAbort => {
643 let dest = self.writer.get_stream().expect("Stream must be set");
644 dest.abort(cx, global, error.handle())
645 },
646 ShutdownAction::ReadableStreamCancel => {
647 let source = self
648 .reader
649 .get_stream()
650 .expect("Reader should have a stream.");
651 source.cancel(cx, global, error.handle())
652 },
653 ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
654 self.writer.close_with_error_propagation(cx, global)
655 },
656 ShutdownAction::Abort => {
657 rooted!(&in(cx) let mut error = UndefinedValue());
662 error.set(self.abort_reason.get());
663
664 let mut actions = vec![];
666
667 if !self.prevent_abort {
669 let dest = self
670 .writer
671 .get_stream()
672 .expect("Destination stream must be set");
673
674 let promise = if dest.is_writable() {
676 dest.abort(cx, global, error.handle())
678 } else {
679 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
681 };
682 actions.push(promise);
683 }
684
685 if !self.prevent_cancel {
687 let source = self.reader.get_stream().expect("Source stream must be set");
688
689 let promise = if source.is_readable() {
691 source.cancel(cx, global, error.handle())
693 } else {
694 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
696 };
697 actions.push(promise);
698 }
699
700 wait_for_all_promise(cx, global, actions)
704 },
705 };
706
707 let handler = PromiseNativeHandler::new(
710 cx,
711 global,
712 Some(Box::new(self.clone())),
713 Some(Box::new(self.clone())),
714 );
715 promise.append_native_handler(cx, &handler);
716 *self.shutdown_action_promise.borrow_mut() = Some(promise);
717 }
718
719 fn finalize(&self, cx: &mut JSContext, global: &GlobalScope) {
721 *self.state.borrow_mut() = PipeToState::Finalized;
722
723 self.writer.release(cx.into(), global, CanGc::from_cx(cx));
725
726 self.reader
732 .release(cx)
733 .expect("Releasing the reader should not fail");
734
735 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
741 rooted!(&in(cx) let mut error = UndefinedValue());
742 error.set(shutdown_error.get());
743 self.result_promise
745 .reject_native_with_cx(cx, &error.handle());
746 } else {
747 self.result_promise.resolve_native_with_cx(cx, &());
749 }
750 }
751}
752
753#[derive(Clone, JSTraceable, MallocSizeOf)]
756struct SourceCancelPromiseFulfillmentHandler {
757 #[conditional_malloc_size_of]
758 result: Rc<Promise>,
759}
760
761impl Callback for SourceCancelPromiseFulfillmentHandler {
762 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
766 self.result.resolve_native_with_cx(cx, &());
767 }
768}
769
770#[derive(Clone, JSTraceable, MallocSizeOf)]
773struct SourceCancelPromiseRejectionHandler {
774 #[conditional_malloc_size_of]
775 result: Rc<Promise>,
776}
777
778impl Callback for SourceCancelPromiseRejectionHandler {
779 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
783 self.result.reject_native_with_cx(cx, &v);
784 }
785}
786
787#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
789pub(crate) enum ReadableStreamState {
790 #[default]
791 Readable,
792 Closed,
793 Errored,
794}
795
796#[derive(JSTraceable, MallocSizeOf)]
798#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
799pub(crate) enum ControllerType {
800 Byte(MutNullableDom<ReadableByteStreamController>),
802 Default(MutNullableDom<ReadableStreamDefaultController>),
804}
805
806#[derive(JSTraceable, MallocSizeOf)]
808#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
809pub(crate) enum ReaderType {
810 #[allow(clippy::upper_case_acronyms)]
812 BYOB(MutNullableDom<ReadableStreamBYOBReader>),
813 Default(MutNullableDom<ReadableStreamDefaultReader>),
815}
816
817impl Eq for ReaderType {}
818impl PartialEq for ReaderType {
819 fn eq(&self, other: &Self) -> bool {
820 matches!(
821 (self, other),
822 (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
823 (ReaderType::Default(_), ReaderType::Default(_))
824 )
825 }
826}
827
828pub(crate) fn create_readable_stream(
830 cx: &mut JSContext,
831 global: &GlobalScope,
832 underlying_source_type: UnderlyingSourceType,
833 queuing_strategy: Option<Rc<QueuingStrategySize>>,
834 high_water_mark: Option<f64>,
835) -> DomRoot<ReadableStream> {
836 let high_water_mark = high_water_mark.unwrap_or(1.0);
838
839 let size_algorithm = queuing_strategy.unwrap_or(extract_size_algorithm(
841 &QueuingStrategy::empty(),
842 CanGc::from_cx(cx),
843 ));
844
845 assert!(high_water_mark >= 0.0);
847
848 let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
851
852 let controller = ReadableStreamDefaultController::new(
854 global,
855 underlying_source_type,
856 high_water_mark,
857 size_algorithm,
858 CanGc::from_cx(cx),
859 );
860
861 controller
864 .setup(cx, stream.clone())
865 .expect("Setup of default controller cannot fail");
866
867 stream
869}
870
871fn readable_byte_stream_tee(
873 cx: &mut JSContext,
874 global: &GlobalScope,
875 underlying_source_type: UnderlyingSourceType,
876) -> DomRoot<ReadableStream> {
877 let tee_stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
880
881 let controller =
883 ReadableByteStreamController::new(underlying_source_type, 0.0, global, CanGc::from_cx(cx));
884
885 controller
887 .setup(cx, global, tee_stream.clone())
888 .expect("Setup of byte stream controller cannot fail");
889
890 tee_stream
892}
893
894#[dom_struct]
896pub(crate) struct ReadableStream {
897 reflector_: Reflector,
898
899 controller: RefCell<Option<ControllerType>>,
903
904 #[ignore_malloc_size_of = "mozjs"]
906 stored_error: Heap<JSVal>,
907
908 disturbed: Cell<bool>,
910
911 reader: RefCell<Option<ReaderType>>,
913
914 state: Cell<ReadableStreamState>,
916}
917
918impl ReadableStream {
919 fn new_inherited() -> ReadableStream {
921 ReadableStream {
922 reflector_: Reflector::new(),
923 controller: RefCell::new(None),
924 stored_error: Heap::default(),
925 disturbed: Default::default(),
926 reader: RefCell::new(None),
927 state: Cell::new(Default::default()),
928 }
929 }
930
931 pub(crate) fn new_with_proto(
932 global: &GlobalScope,
933 proto: Option<SafeHandleObject>,
934 can_gc: CanGc,
935 ) -> DomRoot<ReadableStream> {
936 reflect_dom_object_with_proto(
937 Box::new(ReadableStream::new_inherited()),
938 global,
939 proto,
940 can_gc,
941 )
942 }
943
944 pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
947 *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
948 controller,
949 ))));
950 }
951
952 pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
955 *self.controller.borrow_mut() =
956 Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
957 }
958
959 pub(crate) fn assert_no_controller(&self) {
962 let has_no_controller = self.controller.borrow().is_none();
963 assert!(has_no_controller);
964 }
965
966 pub(crate) fn new_from_bytes(
968 cx: &mut JSContext,
969 global: &GlobalScope,
970 bytes: Vec<u8>,
971 ) -> Fallible<DomRoot<ReadableStream>> {
972 let stream = ReadableStream::new_with_external_underlying_source(
973 cx,
974 global,
975 UnderlyingSourceType::Memory(bytes.len()),
976 )?;
977 stream.enqueue_native(cx, bytes);
978 stream.controller_close_native(cx);
979 Ok(stream)
980 }
981
982 pub(crate) fn new_from_bytes_with_byte_reading_support(
984 cx: &mut JSContext,
985 global: &GlobalScope,
986 bytes: Vec<u8>,
987 ) -> Fallible<DomRoot<ReadableStream>> {
988 let stream = ReadableStream::new_with_external_underlying_byte_source(
989 cx,
990 global,
991 UnderlyingSourceType::Memory(bytes.len()),
992 )?;
993 stream.enqueue_native(cx, bytes);
994 stream.controller_close_native(cx);
995 Ok(stream)
996 }
997
998 pub(crate) fn new_with_external_underlying_source(
1001 cx: &mut JSContext,
1002 global: &GlobalScope,
1003 source: UnderlyingSourceType,
1004 ) -> Fallible<DomRoot<ReadableStream>> {
1005 assert!(source.is_native());
1006 let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
1007 let controller = ReadableStreamDefaultController::new(
1008 global,
1009 source,
1010 1.0,
1011 extract_size_algorithm(&QueuingStrategy::empty(), CanGc::from_cx(cx)),
1012 CanGc::from_cx(cx),
1013 );
1014 controller.setup(cx, stream.clone())?;
1015 Ok(stream)
1016 }
1017
1018 pub(crate) fn new_with_external_underlying_byte_source(
1020 cx: &mut JSContext,
1021 global: &GlobalScope,
1022 source: UnderlyingSourceType,
1023 ) -> Fallible<DomRoot<ReadableStream>> {
1024 assert!(source.is_native());
1025 let stream = ReadableStream::new_with_proto(global, None, CanGc::from_cx(cx));
1026 let controller = ReadableByteStreamController::new(source, 0.0, global, CanGc::from_cx(cx));
1027 controller.setup(cx, global, stream.clone())?;
1028 Ok(stream)
1029 }
1030
1031 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1033 match self.controller.borrow().as_ref() {
1034 Some(ControllerType::Default(controller)) => {
1035 let controller = controller
1036 .get()
1037 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1038 controller.perform_release_steps()
1039 },
1040 Some(ControllerType::Byte(controller)) => {
1041 let controller = controller
1042 .get()
1043 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1044 controller.perform_release_steps()
1045 },
1046 None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1047 }
1048 }
1049
1050 pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
1054 match self.controller.borrow().as_ref() {
1055 Some(ControllerType::Default(controller)) => controller
1056 .get()
1057 .expect("Stream should have controller.")
1058 .perform_pull_steps(cx, read_request),
1059 Some(ControllerType::Byte(controller)) => controller
1060 .get()
1061 .expect("Stream should have controller.")
1062 .perform_pull_steps(cx, read_request),
1063 None => {
1064 unreachable!("Stream does not have a controller.");
1065 },
1066 }
1067 }
1068
1069 pub(crate) fn perform_pull_into(
1073 &self,
1074 cx: &mut JSContext,
1075 read_into_request: &ReadIntoRequest,
1076 view: &HeapBufferSource<ArrayBufferViewU8>,
1077 min: u64,
1078 ) {
1079 match self.controller.borrow().as_ref() {
1080 Some(ControllerType::Byte(controller)) => controller
1081 .get()
1082 .expect("Stream should have controller.")
1083 .perform_pull_into(cx, read_into_request, view, min),
1084 _ => {
1085 unreachable!(
1086 "Pulling a chunk from a stream with a default controller using a BYOB reader"
1087 )
1088 },
1089 }
1090 }
1091
1092 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1094 match self.reader.borrow().as_ref() {
1095 Some(ReaderType::Default(reader)) => {
1096 let Some(reader) = reader.get() else {
1097 panic!("Attempt to add a read request without having first acquired a reader.");
1098 };
1099
1100 assert!(self.is_readable());
1102
1103 reader.add_read_request(read_request);
1105 },
1106 _ => {
1107 unreachable!("Adding a read request can only be done on a default reader.")
1108 },
1109 }
1110 }
1111
1112 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1114 match self.reader.borrow().as_ref() {
1115 Some(ReaderType::BYOB(reader)) => {
1117 let Some(reader) = reader.get() else {
1118 unreachable!(
1119 "Attempt to add a read into request without having first acquired a reader."
1120 );
1121 };
1122
1123 assert!(self.is_readable() || self.is_closed());
1125
1126 reader.add_read_into_request(read_request);
1128 },
1129 _ => {
1130 unreachable!("Adding a read into request can only be done on a BYOB reader.")
1131 },
1132 }
1133 }
1134
1135 pub(crate) fn enqueue_native(&self, cx: &mut JSContext, bytes: Vec<u8>) {
1137 match self.controller.borrow().as_ref() {
1138 Some(ControllerType::Default(controller)) => controller
1139 .get()
1140 .expect("Stream should have controller.")
1141 .enqueue_native(cx, bytes),
1142 Some(ControllerType::Byte(controller)) => {
1143 if bytes.is_empty() {
1144 return;
1145 }
1146
1147 let controller = controller.get().expect("Stream should have controller.");
1148 rooted!(&in(cx) let mut chunk_object = ptr::null_mut::<JSObject>());
1149 create_buffer_source::<Uint8>(
1150 cx.into(),
1151 &bytes,
1152 chunk_object.handle_mut(),
1153 CanGc::from_cx(cx),
1154 )
1155 .expect("failed to create buffer source for native byte chunk.");
1156
1157 let chunk = RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
1158 BufferSource::ArrayBufferView(Heap::boxed(*chunk_object.handle())),
1159 ));
1160 controller
1161 .enqueue(cx, chunk)
1162 .expect("Enqueuing a native byte chunk should not fail.");
1163 },
1164 _ => {
1165 unreachable!("Enqueueing chunk to a stream from Rust without a controller");
1166 },
1167 }
1168 }
1169
1170 pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
1172 assert!(self.is_readable());
1174
1175 self.state.set(ReadableStreamState::Errored);
1177
1178 self.stored_error.set(e.get());
1180
1181 let default_reader = {
1184 let reader_ref = self.reader.borrow();
1185 match reader_ref.as_ref() {
1186 Some(ReaderType::Default(reader)) => reader.get(),
1187 _ => None,
1188 }
1189 };
1190
1191 if let Some(reader) = default_reader {
1192 reader.error(cx, e);
1194 return;
1195 }
1196
1197 let byob_reader = {
1198 let reader_ref = self.reader.borrow();
1199 match reader_ref.as_ref() {
1200 Some(ReaderType::BYOB(reader)) => reader.get(),
1201 _ => None,
1202 }
1203 };
1204
1205 if let Some(reader) = byob_reader {
1206 reader.error_read_into_requests(e, CanGc::from_cx(cx));
1208 }
1209
1210 }
1212
1213 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1215 handle_mut.set(self.stored_error.get());
1216 }
1217
1218 pub(crate) fn error_native(&self, cx: &mut JSContext, error: Error) {
1221 rooted!(&in(cx) let mut error_val = UndefinedValue());
1222 error.to_jsval(
1223 cx.into(),
1224 &self.global(),
1225 error_val.handle_mut(),
1226 CanGc::from_cx(cx),
1227 );
1228 self.error(cx, error_val.handle());
1229 }
1230
1231 pub(crate) fn controller_close_native(&self, cx: &mut JSContext) {
1234 match self.controller.borrow().as_ref() {
1235 Some(ControllerType::Default(controller)) => {
1236 let _ = controller
1237 .get()
1238 .expect("Stream should have controller.")
1239 .Close(cx);
1240 },
1241 Some(ControllerType::Byte(controller)) => {
1242 let _ = controller
1243 .get()
1244 .expect("Stream should have controller.")
1245 .close(cx);
1246 },
1247 _ => {
1248 unreachable!("Native closing requires a stream controller.")
1249 },
1250 }
1251 }
1252
1253 pub(crate) fn in_memory(&self) -> bool {
1256 match self.controller.borrow().as_ref() {
1257 Some(ControllerType::Default(controller)) => controller
1258 .get()
1259 .expect("Stream should have controller.")
1260 .in_memory(),
1261 Some(ControllerType::Byte(controller)) => controller
1262 .get()
1263 .expect("Stream should have controller.")
1264 .in_memory(),
1265 _ => unreachable!("Checking if source is in memory for a stream without a controller"),
1266 }
1267 }
1268
1269 pub(crate) fn get_in_memory_bytes(&self) -> Option<GenericSharedMemory> {
1272 match self.controller.borrow().as_ref() {
1273 Some(ControllerType::Default(controller)) => controller
1274 .get()
1275 .expect("Stream should have controller.")
1276 .get_in_memory_bytes()
1277 .map(GenericSharedMemory::from_vec),
1278 Some(ControllerType::Byte(controller)) => controller
1279 .get()
1280 .expect("Stream should have controller.")
1281 .get_in_memory_bytes()
1282 .map(GenericSharedMemory::from_vec),
1283 _ => unreachable!("Getting in-memory bytes for a stream without a controller"),
1284 }
1285 }
1286
1287 pub(crate) fn acquire_default_reader(
1292 &self,
1293 can_gc: CanGc,
1294 ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1295 let reader = ReadableStreamDefaultReader::new(&self.global(), can_gc);
1297
1298 reader.set_up(self, &self.global(), can_gc)?;
1300
1301 Ok(reader)
1303 }
1304
1305 pub(crate) fn acquire_byob_reader(
1307 &self,
1308 can_gc: CanGc,
1309 ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1310 let reader = ReadableStreamBYOBReader::new(&self.global(), can_gc);
1312 reader.set_up(self, &self.global(), can_gc)?;
1314
1315 Ok(reader)
1317 }
1318
1319 pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1320 match self.controller.borrow().as_ref() {
1321 Some(ControllerType::Default(controller)) => {
1322 controller.get().expect("Stream should have controller.")
1323 },
1324 _ => {
1325 unreachable!(
1326 "Getting default controller for a stream with a non-default controller"
1327 )
1328 },
1329 }
1330 }
1331
1332 pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1333 match self.controller.borrow().as_ref() {
1334 Some(ControllerType::Byte(controller)) => {
1335 controller.get().expect("Stream should have controller.")
1336 },
1337 _ => {
1338 unreachable!("Getting byte controller for a stream with a non-byte controller")
1339 },
1340 }
1341 }
1342
1343 pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1344 match self.reader.borrow().as_ref() {
1345 Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1346 _ => {
1347 unreachable!("Getting default reader for a stream with a non-default reader")
1348 },
1349 }
1350 }
1351
1352 pub(crate) fn read_a_chunk(&self, cx: &mut JSContext) -> Rc<Promise> {
1358 match self.reader.borrow().as_ref() {
1359 Some(ReaderType::Default(reader)) => {
1360 let Some(reader) = reader.get() else {
1361 unreachable!(
1362 "Attempt to read stream chunk without having first acquired a reader."
1363 );
1364 };
1365 reader.Read(cx)
1366 },
1367 _ => {
1368 unreachable!("Native reading of a chunk can only be done with a default reader.")
1369 },
1370 }
1371 }
1372
1373 pub(crate) fn stop_reading(&self, cx: &mut JSContext) {
1378 let reader_ref = self.reader.borrow();
1379
1380 match reader_ref.as_ref() {
1381 Some(ReaderType::Default(reader)) => {
1382 let Some(reader) = reader.get() else {
1383 unreachable!("Attempt to stop reading without having first acquired a reader.");
1384 };
1385
1386 drop(reader_ref);
1387 reader.release(cx).expect("Reader release cannot fail.");
1388 },
1389 _ => {
1390 unreachable!("Native stop reading can only be done with a default reader.")
1391 },
1392 }
1393 }
1394
1395 pub(crate) fn is_locked(&self) -> bool {
1397 match self.reader.borrow().as_ref() {
1398 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1399 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1400 None => false,
1401 }
1402 }
1403
1404 pub(crate) fn is_disturbed(&self) -> bool {
1405 self.disturbed.get()
1406 }
1407
1408 pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1409 self.disturbed.set(disturbed);
1410 }
1411
1412 pub(crate) fn is_closed(&self) -> bool {
1413 self.state.get() == ReadableStreamState::Closed
1414 }
1415
1416 pub(crate) fn is_errored(&self) -> bool {
1417 self.state.get() == ReadableStreamState::Errored
1418 }
1419
1420 pub(crate) fn is_readable(&self) -> bool {
1421 self.state.get() == ReadableStreamState::Readable
1422 }
1423
1424 pub(crate) fn has_default_reader(&self) -> bool {
1425 match self.reader.borrow().as_ref() {
1426 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1427 _ => false,
1428 }
1429 }
1430
1431 pub(crate) fn has_byob_reader(&self) -> bool {
1432 match self.reader.borrow().as_ref() {
1433 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1434 _ => false,
1435 }
1436 }
1437
1438 pub(crate) fn has_byte_controller(&self) -> bool {
1439 match self.controller.borrow().as_ref() {
1440 Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1441 _ => false,
1442 }
1443 }
1444
1445 pub(crate) fn get_num_read_requests(&self) -> usize {
1447 match self.reader.borrow().as_ref() {
1448 Some(ReaderType::Default(reader)) => {
1449 let reader = reader
1450 .get()
1451 .expect("Stream must have a reader when getting the number of read requests.");
1452 reader.get_num_read_requests()
1453 },
1454 _ => unreachable!(
1455 "Stream must have a default reader when get num read requests is called into."
1456 ),
1457 }
1458 }
1459
1460 pub(crate) fn get_num_read_into_requests(&self) -> usize {
1462 assert!(self.has_byob_reader());
1463
1464 match self.reader.borrow().as_ref() {
1465 Some(ReaderType::BYOB(reader)) => {
1466 let Some(reader) = reader.get() else {
1467 unreachable!(
1468 "Stream must have a reader when get num read into requests is called into."
1469 );
1470 };
1471 reader.get_num_read_into_requests()
1472 },
1473 _ => {
1474 unreachable!(
1475 "Stream must have a BYOB reader when get num read into requests is called into."
1476 );
1477 },
1478 }
1479 }
1480
1481 pub(crate) fn fulfill_read_request(
1483 &self,
1484 cx: &mut JSContext,
1485 chunk: SafeHandleValue,
1486 done: bool,
1487 ) {
1488 assert!(self.has_default_reader());
1490
1491 match self.reader.borrow().as_ref() {
1492 Some(ReaderType::Default(reader)) => {
1493 let reader = reader
1495 .get()
1496 .expect("Stream must have a reader when a read request is fulfilled.");
1497 assert_ne!(reader.get_num_read_requests(), 0);
1499 let request = reader.remove_read_request();
1502
1503 if done {
1504 request.close_steps(cx);
1506 } else {
1507 let result = RootedTraceableBox::new(Heap::default());
1509 result.set(*chunk);
1510 request.chunk_steps(cx, result, &self.global());
1511 }
1512 },
1513 _ => {
1514 unreachable!(
1515 "Stream must have a default reader when fulfill read requests is called into."
1516 );
1517 },
1518 }
1519 }
1520
1521 pub(crate) fn fulfill_read_into_request(
1523 &self,
1524 cx: &mut JSContext,
1525 chunk: SafeHandleValue,
1526 done: bool,
1527 ) {
1528 assert!(self.has_byob_reader());
1530
1531 match self.reader.borrow().as_ref() {
1533 Some(ReaderType::BYOB(reader)) => {
1534 let Some(reader) = reader.get() else {
1535 unreachable!(
1536 "Stream must have a reader when a read into request is fulfilled."
1537 );
1538 };
1539
1540 assert!(reader.get_num_read_into_requests() > 0);
1542
1543 let read_into_request = reader.remove_read_into_request();
1546
1547 let result = RootedTraceableBox::new(Heap::default());
1549 if done {
1550 result.set(*chunk);
1551 read_into_request.close_steps(cx, Some(result));
1552 } else {
1553 result.set(*chunk);
1555 read_into_request.chunk_steps(result, CanGc::from_cx(cx));
1556 }
1557 },
1558 _ => {
1559 unreachable!(
1560 "Stream must have a BYOB reader when fulfill read into requests is called into."
1561 );
1562 },
1563 };
1564 }
1565
1566 pub(crate) fn close(&self, cx: &mut JSContext) {
1568 assert!(self.is_readable());
1570 self.state.set(ReadableStreamState::Closed);
1572 let default_reader = {
1578 let reader_ref = self.reader.borrow();
1579 match reader_ref.as_ref() {
1580 Some(ReaderType::Default(reader)) => reader.get(),
1581 _ => None,
1582 }
1583 };
1584
1585 if let Some(reader) = default_reader {
1586 reader.close(cx);
1588 return;
1589 }
1590
1591 let byob_reader = {
1593 let reader_ref = self.reader.borrow();
1594 match reader_ref.as_ref() {
1595 Some(ReaderType::BYOB(reader)) => reader.get(),
1596 _ => None,
1597 }
1598 };
1599
1600 if let Some(reader) = byob_reader {
1601 reader.close(CanGc::from_cx(cx));
1603 }
1604
1605 }
1607
1608 pub(crate) fn cancel(
1610 &self,
1611 cx: &mut JSContext,
1612 global: &GlobalScope,
1613 reason: SafeHandleValue,
1614 ) -> Rc<Promise> {
1615 self.disturbed.set(true);
1617
1618 if self.is_closed() {
1620 return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
1621 }
1622 if self.is_errored() {
1624 let promise = Promise::new2(cx, global);
1625 rooted!(&in(cx) let mut rval = UndefinedValue());
1626 self.stored_error.safe_to_jsval(cx, rval.handle_mut());
1627 promise.reject_native_with_cx(cx, &rval.handle());
1628 return promise;
1629 }
1630 self.close(cx);
1632
1633 let byob_reader = {
1635 let reader_ref = self.reader.borrow();
1636 match reader_ref.as_ref() {
1637 Some(ReaderType::BYOB(reader)) => reader.get(),
1638 _ => None,
1639 }
1640 };
1641
1642 if let Some(reader) = byob_reader {
1643 reader.cancel(cx);
1645 }
1646
1647 let source_cancel_promise = match self.controller.borrow().as_ref() {
1650 Some(ControllerType::Default(controller)) => controller
1651 .get()
1652 .expect("Stream should have controller.")
1653 .perform_cancel_steps(cx, global, reason),
1654 Some(ControllerType::Byte(controller)) => controller
1655 .get()
1656 .expect("Stream should have controller.")
1657 .perform_cancel_steps(cx, global, reason),
1658 None => {
1659 panic!("Stream does not have a controller.");
1660 },
1661 };
1662
1663 let global = self.global();
1666 let result_promise = Promise::new2(cx, &global);
1667 let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1668 result: result_promise.clone(),
1669 });
1670 let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1671 result: result_promise.clone(),
1672 });
1673 let handler = PromiseNativeHandler::new(
1674 cx,
1675 &global,
1676 Some(fulfillment_handler),
1677 Some(rejection_handler),
1678 );
1679 let mut realm = enter_auto_realm(cx, &*global);
1680 let cx = &mut realm.current_realm();
1681 source_cancel_promise.append_native_handler(cx, &handler);
1682
1683 result_promise
1686 }
1687
1688 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1689 pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1690 *self.reader.borrow_mut() = new_reader;
1691 }
1692
1693 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1694 fn byte_tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1696 let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;
1701 let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1702 Some(&reader),
1703 ))));
1704
1705 let reading = Rc::new(Cell::new(false));
1707
1708 let read_again_for_branch_1 = Rc::new(Cell::new(false));
1710
1711 let read_again_for_branch_2 = Rc::new(Cell::new(false));
1713
1714 let canceled_1 = Rc::new(Cell::new(false));
1716
1717 let canceled_2 = Rc::new(Cell::new(false));
1719
1720 let reason_1 = Rc::new(Heap::default());
1722
1723 let reason_2 = Rc::new(Heap::default());
1725
1726 let cancel_promise = Promise::new2(cx, &self.global());
1728 let reader_version = Rc::new(Cell::new(0));
1729
1730 let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1731 reader.clone(),
1732 self,
1733 reading.clone(),
1734 read_again_for_branch_1.clone(),
1735 read_again_for_branch_2.clone(),
1736 canceled_1.clone(),
1737 canceled_2.clone(),
1738 reason_1.clone(),
1739 reason_2.clone(),
1740 cancel_promise.clone(),
1741 reader_version.clone(),
1742 ByteTeeCancelAlgorithm::Cancel1Algorithm,
1743 ByteTeePullAlgorithm::Pull1Algorithm,
1744 CanGc::from_cx(cx),
1745 );
1746
1747 let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1748 reader.clone(),
1749 self,
1750 reading,
1751 read_again_for_branch_1,
1752 read_again_for_branch_2,
1753 canceled_1,
1754 canceled_2,
1755 reason_1,
1756 reason_2,
1757 cancel_promise,
1758 reader_version,
1759 ByteTeeCancelAlgorithm::Cancel2Algorithm,
1760 ByteTeePullAlgorithm::Pull2Algorithm,
1761 CanGc::from_cx(cx),
1762 );
1763
1764 let branch_1 = readable_byte_stream_tee(
1766 cx,
1767 &self.global(),
1768 UnderlyingSourceType::TeeByte(&byte_tee_source_1),
1769 );
1770 byte_tee_source_1.set_branch_1(&branch_1);
1771 byte_tee_source_2.set_branch_1(&branch_1);
1772
1773 let branch_2 = readable_byte_stream_tee(
1775 cx,
1776 &self.global(),
1777 UnderlyingSourceType::TeeByte(&byte_tee_source_2),
1778 );
1779 byte_tee_source_1.set_branch_2(&branch_2);
1780 byte_tee_source_2.set_branch_2(&branch_2);
1781
1782 byte_tee_source_1.forward_reader_error(cx, reader.clone());
1784 byte_tee_source_2.forward_reader_error(cx, reader);
1785
1786 Ok(vec![branch_1, branch_2])
1788 }
1789
1790 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1792 fn default_tee(
1793 &self,
1794 cx: &mut JSContext,
1795 clone_for_branch_2: bool,
1796 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1797 let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1801
1802 let reader = self.acquire_default_reader(CanGc::from_cx(cx))?;
1804
1805 let reading = Rc::new(Cell::new(false));
1807 let read_again = Rc::new(Cell::new(false));
1809 let canceled_1 = Rc::new(Cell::new(false));
1811 let canceled_2 = Rc::new(Cell::new(false));
1813
1814 let reason_1 = Rc::new(Heap::default());
1816 let reason_2 = Rc::new(Heap::default());
1818 let cancel_promise = Promise::new2(cx, &self.global());
1820
1821 let tee_source_1 = DefaultTeeUnderlyingSource::new(
1822 &reader,
1823 self,
1824 reading.clone(),
1825 read_again.clone(),
1826 canceled_1.clone(),
1827 canceled_2.clone(),
1828 clone_for_branch_2.clone(),
1829 reason_1.clone(),
1830 reason_2.clone(),
1831 cancel_promise.clone(),
1832 DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1833 CanGc::from_cx(cx),
1834 );
1835
1836 let underlying_source_type_branch_1 = UnderlyingSourceType::Tee(&tee_source_1);
1837
1838 let tee_source_2 = DefaultTeeUnderlyingSource::new(
1839 &reader,
1840 self,
1841 reading,
1842 read_again,
1843 canceled_1.clone(),
1844 canceled_2.clone(),
1845 clone_for_branch_2,
1846 reason_1,
1847 reason_2,
1848 cancel_promise.clone(),
1849 DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1850 CanGc::from_cx(cx),
1851 );
1852
1853 let underlying_source_type_branch_2 = UnderlyingSourceType::Tee(&tee_source_2);
1854
1855 let branch_1 = create_readable_stream(
1857 cx,
1858 &self.global(),
1859 underlying_source_type_branch_1,
1860 None,
1861 None,
1862 );
1863 tee_source_1.set_branch_1(&branch_1);
1864 tee_source_2.set_branch_1(&branch_1);
1865
1866 let branch_2 = create_readable_stream(
1868 cx,
1869 &self.global(),
1870 underlying_source_type_branch_2,
1871 None,
1872 None,
1873 );
1874 tee_source_1.set_branch_2(&branch_2);
1875 tee_source_2.set_branch_2(&branch_2);
1876
1877 reader.default_tee_append_native_handler_to_closed_promise(
1879 cx,
1880 &branch_1,
1881 &branch_2,
1882 canceled_1,
1883 canceled_2,
1884 cancel_promise,
1885 );
1886
1887 Ok(vec![branch_1, branch_2])
1889 }
1890
1891 #[allow(clippy::too_many_arguments)]
1893 pub(crate) fn pipe_to(
1894 &self,
1895 cx: &mut CurrentRealm,
1896 global: &GlobalScope,
1897 dest: &WritableStream,
1898 prevent_close: bool,
1899 prevent_abort: bool,
1900 prevent_cancel: bool,
1901 signal: Option<&AbortSignal>,
1902 ) -> Rc<Promise> {
1903 assert!(!self.is_locked());
1914
1915 assert!(!dest.is_locked());
1917
1918 let reader = self
1926 .acquire_default_reader(CanGc::from_cx(cx))
1927 .expect("Acquiring a default reader for pipe_to cannot fail");
1928
1929 let writer = dest
1931 .aquire_default_writer(cx.into(), global, CanGc::from_cx(cx))
1932 .expect("Acquiring a default writer for pipe_to cannot fail");
1933
1934 self.disturbed.set(true);
1936
1937 let promise = Promise::new2(cx, global);
1942
1943 rooted!(&in(cx) let pipe_to = PipeTo {
1945 reader: Dom::from_ref(&reader),
1946 writer: Dom::from_ref(&writer),
1947 pending_writes: Default::default(),
1948 state: Default::default(),
1949 prevent_abort,
1950 prevent_cancel,
1951 prevent_close,
1952 shutting_down: Default::default(),
1953 abort_reason: Default::default(),
1954 shutdown_error: Default::default(),
1955 shutdown_action_promise: Default::default(),
1956 result_promise: promise.clone(),
1957 });
1958
1959 if let Some(signal) = signal {
1962 rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1965
1966 if signal.aborted() {
1968 signal.run_abort_algorithm(cx, global, &abort_algorithm);
1969 return promise;
1970 }
1971
1972 signal.add(&abort_algorithm);
1974 }
1975
1976 pipe_to.check_and_propagate_errors_forward(cx, global);
1978 pipe_to.check_and_propagate_errors_backward(cx, global);
1979 pipe_to.check_and_propagate_closing_forward(cx, global);
1980 pipe_to.check_and_propagate_closing_backward(cx, global);
1981
1982 if *pipe_to.state.borrow() == PipeToState::Starting {
1984 pipe_to.wait_for_writer_ready(cx, global);
1986 }
1987
1988 promise
1990 }
1991
1992 pub(crate) fn tee(
1994 &self,
1995 cx: &mut JSContext,
1996 clone_for_branch_2: bool,
1997 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1998 match self.controller.borrow().as_ref() {
2002 Some(ControllerType::Default(_)) => {
2003 self.default_tee(cx, clone_for_branch_2)
2005 },
2006 Some(ControllerType::Byte(_)) => {
2007 self.byte_tee(cx)
2010 },
2011 None => {
2012 unreachable!("Stream should have a controller.");
2013 },
2014 }
2015 }
2016
2017 fn set_up_byte_controller(
2019 &self,
2020 cx: &mut JSContext,
2021 global: &GlobalScope,
2022 underlying_source_dict: JsUnderlyingSource,
2023 underlying_source_handle: SafeHandleObject,
2024 stream: DomRoot<ReadableStream>,
2025 strategy_hwm: f64,
2026 ) -> Fallible<()> {
2027 if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2043 return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2044 }
2045
2046 let controller = ReadableByteStreamController::new(
2047 UnderlyingSourceType::Js(underlying_source_dict),
2048 strategy_hwm,
2049 global,
2050 CanGc::from_cx(cx),
2051 );
2052
2053 controller.set_underlying_source_this_object(underlying_source_handle);
2056
2057 controller.setup(cx, global, stream)
2060 }
2061
2062 pub(crate) fn setup_cross_realm_transform_readable(
2064 &self,
2065 cx: &mut JSContext,
2066 port: &MessagePort,
2067 ) {
2068 let port_id = port.message_port_id();
2069 let global = self.global();
2070
2071 let size_algorithm =
2076 extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
2077
2078 let controller = ReadableStreamDefaultController::new(
2082 &self.global(),
2083 UnderlyingSourceType::Transfer(port),
2084 0.,
2085 size_algorithm,
2086 CanGc::from_cx(cx),
2087 );
2088
2089 rooted!(&in(cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2092 controller: Dom::from_ref(&controller),
2093 });
2094 global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2095
2096 port.Start(cx);
2098
2099 controller
2101 .setup(cx, DomRoot::from_ref(self))
2102 .expect("Setting up controller for transfer cannot fail.");
2103 }
2104}
2105
2106impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2107 fn Constructor(
2109 cx: &mut JSContext,
2110 global: &GlobalScope,
2111 proto: Option<SafeHandleObject>,
2112 underlying_source: Option<*mut JSObject>,
2113 strategy: &QueuingStrategy,
2114 ) -> Fallible<DomRoot<Self>> {
2115 rooted!(&in(cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2117 let underlying_source_dict = if !underlying_source_obj.is_null() {
2120 rooted!(&in(cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2121 match JsUnderlyingSource::new(cx, obj_val.handle()) {
2122 Ok(ConversionResult::Success(val)) => val,
2123 Ok(ConversionResult::Failure(error)) => {
2124 return Err(Error::Type(error.into_owned()));
2125 },
2126 _ => {
2127 return Err(Error::JSFailed);
2128 },
2129 }
2130 } else {
2131 JsUnderlyingSource::empty()
2132 };
2133
2134 let stream = ReadableStream::new_with_proto(global, proto, CanGc::from_cx(cx));
2136
2137 if underlying_source_dict.type_.is_some() {
2138 if strategy.size.is_some() {
2140 return Err(Error::Range(
2141 c"size is not supported for byte streams".to_owned(),
2142 ));
2143 }
2144
2145 let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2147
2148 stream.set_up_byte_controller(
2151 cx,
2152 global,
2153 underlying_source_dict,
2154 underlying_source_obj.handle(),
2155 stream.clone(),
2156 strategy_hwm,
2157 )?;
2158 } else {
2159 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2161
2162 let size_algorithm = extract_size_algorithm(strategy, CanGc::from_cx(cx));
2164
2165 let controller = ReadableStreamDefaultController::new(
2166 global,
2167 UnderlyingSourceType::Js(underlying_source_dict),
2168 high_water_mark,
2169 size_algorithm,
2170 CanGc::from_cx(cx),
2171 );
2172
2173 controller.set_underlying_source_this_object(underlying_source_obj.handle());
2176
2177 controller.setup(cx, stream.clone())?;
2179 };
2180
2181 Ok(stream)
2182 }
2183
2184 fn Locked(&self) -> bool {
2186 self.is_locked()
2187 }
2188
2189 fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
2191 let global = self.global();
2192 if self.is_locked() {
2193 let promise = Promise::new2(cx, &global);
2196 promise.reject_error_with_cx(cx, Error::Type(c"stream is locked".to_owned()));
2197 promise
2198 } else {
2199 self.cancel(cx, &global, reason)
2201 }
2202 }
2203
2204 fn GetReader(
2206 &self,
2207 options: &ReadableStreamGetReaderOptions,
2208 can_gc: CanGc,
2209 ) -> Fallible<ReadableStreamReader> {
2210 if options.mode.is_none() {
2212 return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2213 self.acquire_default_reader(can_gc)?,
2214 ));
2215 }
2216 assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2218
2219 Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2221 self.acquire_byob_reader(can_gc)?,
2222 ))
2223 }
2224
2225 fn Tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2227 self.tee(cx, false)
2229 }
2230
2231 fn PipeTo(
2233 &self,
2234 cx: &mut CurrentRealm,
2235 destination: &WritableStream,
2236 options: &StreamPipeOptions,
2237 ) -> Rc<Promise> {
2238 let global = self.global();
2239
2240 if self.is_locked() {
2242 let promise = Promise::new2(cx, &global);
2244 promise.reject_error_with_cx(cx, Error::Type(c"Source stream is locked".to_owned()));
2245 return promise;
2246 }
2247
2248 if destination.is_locked() {
2250 let promise = Promise::new2(cx, &global);
2252 promise
2253 .reject_error_with_cx(cx, Error::Type(c"Destination stream is locked".to_owned()));
2254 return promise;
2255 }
2256
2257 let signal = options.signal.as_deref();
2259
2260 self.pipe_to(
2262 cx,
2263 &global,
2264 destination,
2265 options.preventClose,
2266 options.preventAbort,
2267 options.preventCancel,
2268 signal,
2269 )
2270 }
2271
2272 fn PipeThrough(
2274 &self,
2275 cx: &mut CurrentRealm,
2276 transform: &ReadableWritablePair,
2277 options: &StreamPipeOptions,
2278 ) -> Fallible<DomRoot<ReadableStream>> {
2279 let global = self.global();
2280
2281 if self.is_locked() {
2283 return Err(Error::Type(c"Source stream is locked".to_owned()));
2284 }
2285
2286 if transform.writable.is_locked() {
2288 return Err(Error::Type(c"Destination stream is locked".to_owned()));
2289 }
2290
2291 let signal = options.signal.as_deref();
2293
2294 let promise = self.pipe_to(
2297 cx,
2298 &global,
2299 &transform.writable,
2300 options.preventClose,
2301 options.preventAbort,
2302 options.preventCancel,
2303 signal,
2304 );
2305
2306 promise.set_promise_is_handled();
2308
2309 Ok(transform.readable.clone())
2311 }
2312}
2313
2314pub(crate) fn get_type_and_value_from_message(
2318 cx: &mut JSContext,
2319 data: SafeHandleValue,
2320 value: SafeMutableHandleValue,
2321) -> DOMString {
2322 assert!(data.is_object());
2328 rooted!(&in(cx) let data_object = data.to_object());
2329
2330 let type_ = get_property::<DOMString>(
2332 cx,
2333 data_object.handle(),
2334 c"type",
2335 StringificationBehavior::Empty,
2336 );
2337
2338 get_property_jsval(cx, data_object.handle(), c"value", value)
2340 .expect("Getting the value should not fail.");
2341
2342 type_
2344 .expect("The type of the message should be a string")
2345 .expect("Property should be present")
2346}
2347
2348impl js::gc::Rootable for CrossRealmTransformReadable {}
2349
2350#[derive(Clone, JSTraceable, MallocSizeOf)]
2354#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
2355pub(crate) struct CrossRealmTransformReadable {
2356 controller: Dom<ReadableStreamDefaultController>,
2358}
2359
2360impl CrossRealmTransformReadable {
2361 pub(crate) fn handle_message(
2364 &self,
2365 cx: &mut CurrentRealm,
2366 global: &GlobalScope,
2367 port: &MessagePort,
2368 message: SafeHandleValue,
2369 ) {
2370 rooted!(&in(cx) let mut value = UndefinedValue());
2371 let type_string = get_type_and_value_from_message(cx, message, value.handle_mut());
2372
2373 if type_string == "chunk" {
2375 self.controller
2377 .enqueue(cx, value.handle())
2378 .expect("Enqueing a chunk should not fail.");
2379 }
2380
2381 if type_string == "close" {
2383 self.controller.close(cx);
2385
2386 global.disentangle_port(cx, port);
2388 }
2389
2390 if type_string == "error" {
2392 self.controller.error(cx, value.handle());
2394
2395 global.disentangle_port(cx, port);
2397 }
2398 }
2399
2400 pub(crate) fn handle_error(
2403 &self,
2404 cx: &mut CurrentRealm,
2405 global: &GlobalScope,
2406 port: &MessagePort,
2407 ) {
2408 let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
2410 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
2411 error.safe_to_jsval(cx, rooted_error.handle_mut());
2412
2413 port.cross_realm_transform_send_error(cx, rooted_error.handle());
2415
2416 self.controller.error(cx, rooted_error.handle());
2418
2419 global.disentangle_port(cx, port);
2421 }
2422}
2423
2424pub(crate) fn get_read_promise_done(
2426 cx: &mut JSContext,
2427 v: &SafeHandleValue,
2428) -> Result<bool, Error> {
2429 if !v.is_object() {
2430 return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2431 }
2432
2433 rooted!(&in(cx) let object = v.to_object());
2434 get_property::<bool>(cx, object.handle(), c"done", ())?
2435 .ok_or(Error::Type(c"Promise has no done property.".to_owned()))
2436}
2437
2438pub(crate) fn get_read_promise_bytes(
2440 cx: &mut JSContext,
2441 v: &SafeHandleValue,
2442) -> Result<Vec<u8>, Error> {
2443 if !v.is_object() {
2444 return Err(Error::Type(
2445 c"Unknown format for for bytes read.".to_owned(),
2446 ));
2447 }
2448
2449 rooted!(&in(cx) let object = v.to_object());
2450 get_property::<Vec<u8>>(
2451 cx,
2452 object.handle(),
2453 c"value",
2454 ConversionBehavior::EnforceRange,
2455 )?
2456 .ok_or(Error::Type(c"Promise has no value property.".to_owned()))
2457}
2458
2459pub(crate) fn bytes_from_chunk_jsval(
2463 cx: &mut JSContext,
2464 chunk: &RootedTraceableBox<Heap<JSVal>>,
2465) -> Result<Vec<u8>, Error> {
2466 match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange) {
2467 Ok(ConversionResult::Success(vec)) => Ok(vec),
2468 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2469 _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2470 }
2471}
2472
2473impl Transferable for ReadableStream {
2475 type Index = MessagePortIndex;
2476 type Data = MessagePortImpl;
2477
2478 fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, MessagePortImpl)> {
2480 if self.is_locked() {
2483 return Err(Error::DataClone(None));
2484 }
2485
2486 let global = self.global();
2487 let mut realm = enter_auto_realm(cx, &*global);
2488 let mut realm = realm.current_realm();
2489 let cx = &mut realm;
2490
2491 let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
2493 global.track_message_port(&port_1, None);
2494
2495 let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
2497 global.track_message_port(&port_2, None);
2498
2499 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2501
2502 let writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
2504
2505 writable.setup_cross_realm_transform_writable(cx, &port_1);
2507
2508 let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);
2510
2511 promise.set_promise_is_handled();
2513
2514 port_2.transfer(cx)
2516 }
2517
2518 fn transfer_receive(
2520 cx: &mut JSContext,
2521 owner: &GlobalScope,
2522 id: MessagePortId,
2523 port_impl: MessagePortImpl,
2524 ) -> Result<DomRoot<Self>, ()> {
2525 let value = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
2528
2529 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
2536
2537 value.setup_cross_realm_transform_readable(cx, &transferred_port);
2539 Ok(value)
2540 }
2541
2542 fn serialized_storage<'a>(
2544 data: StructuredData<'a, '_>,
2545 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2546 match data {
2547 StructuredData::Reader(r) => &mut r.port_impls,
2548 StructuredData::Writer(w) => &mut w.ports,
2549 }
2550 }
2551}