1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::conversions::{FromJSValConvertible, ToJSValConvertible};
13use js::jsapi::{Heap, JSObject};
14use js::jsval::{JSVal, ObjectValue, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{
17 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18 MutableHandleValue as SafeMutableHandleValue,
19};
20use js::typedarray::{ArrayBufferViewU8, Uint8};
21use rustc_hash::FxHashMap;
22use servo_base::generic_channel::GenericSharedMemory;
23use servo_base::id::{MessagePortId, MessagePortIndex};
24use servo_constellation_traits::MessagePortImpl;
25
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
27use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
28 ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
29 ReadableWritablePair, StreamPipeOptions,
30};
31use script_bindings::str::DOMString;
32
33use crate::dom::domexception::{DOMErrorName, DOMException};
34use crate::dom::encoding::textdecoderstream::TextDecoderStream;
35use script_bindings::codegen::GenericBindings::TextDecoderStreamBinding::TextDecoderStreamMethods;
36use script_bindings::conversions::{is_array_like, StringificationBehavior};
37use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
38use crate::dom::abortsignal::{AbortAlgorithm, AbortSignal};
39use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
40use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
41use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
42use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult, get_property, get_property_jsval};
43use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
44use crate::dom::bindings::codegen::GenericBindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriter_Binding::WritableStreamDefaultWriterMethods;
45use crate::dom::stream::writablestream::WritableStream;
46use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
47use crate::dom::bindings::reflector::DomGlobal;
48use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto_and_cx};
49use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
50use crate::dom::bindings::trace::RootedTraceableBox;
51use crate::dom::stream::byteteeunderlyingsource::{ByteTeeCancelAlgorithm, ByteTeePullAlgorithm, ByteTeeUnderlyingSource};
52use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
53use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
54use crate::dom::globalscope::GlobalScope;
55use crate::dom::promise::{wait_for_all_promise, Promise};
56use crate::dom::stream::readablebytestreamcontroller::ReadableByteStreamController;
57use crate::dom::stream::readablestreambyobreader::ReadableStreamBYOBReader;
58use crate::dom::stream::readablestreamdefaultcontroller::ReadableStreamDefaultController;
59use crate::dom::stream::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
60use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeCancelAlgorithm;
61use crate::dom::types::DefaultTeeUnderlyingSource;
62use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
63use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
64use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
65use crate::dom::messageport::MessagePort;
66use crate::realms::{enter_auto_realm};
67use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
68use crate::dom::bindings::transferable::Transferable;
69use crate::dom::bindings::structuredclone::StructuredData;
70
71use super::readablestreambyobreader::ReadIntoRequest;
72use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource, create_buffer_source};
73
74#[derive(Clone, Debug, Default, MallocSizeOf, PartialEq)]
76enum PipeToState {
77 #[default]
79 Starting,
80 PendingReady,
82 PendingRead,
84 ShuttingDownWithPendingWrites(Option<ShutdownAction>),
87 ShuttingDownPendingAction,
91 Finalized,
94}
95
96#[derive(Clone, Debug, MallocSizeOf, PartialEq)]
98enum ShutdownAction {
99 WritableStreamAbort,
101 ReadableStreamCancel,
103 WritableStreamDefaultWriterCloseWithErrorPropagation,
105 Abort,
107}
108
109impl js::gc::Rootable for PipeTo {}
110
111#[derive(Clone, JSTraceable, MallocSizeOf)]
120#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
121pub(crate) struct PipeTo {
122 reader: Dom<ReadableStreamDefaultReader>,
124
125 writer: Dom<WritableStreamDefaultWriter>,
127
128 #[ignore_malloc_size_of = "nested Rc"]
131 pending_writes: Rc<RefCell<VecDeque<Rc<Promise>>>>,
132
133 #[conditional_malloc_size_of]
135 #[no_trace]
136 state: Rc<RefCell<PipeToState>>,
137
138 prevent_abort: bool,
140
141 prevent_cancel: bool,
143
144 prevent_close: bool,
146
147 #[conditional_malloc_size_of]
150 shutting_down: Rc<Cell<bool>>,
151
152 #[ignore_malloc_size_of = "mozjs"]
155 abort_reason: Rc<Heap<JSVal>>,
156
157 #[ignore_malloc_size_of = "mozjs"]
160 shutdown_error: Rc<RefCell<Option<Heap<JSVal>>>>,
161
162 #[ignore_malloc_size_of = "nested Rc"]
165 shutdown_action_promise: Rc<RefCell<Option<Rc<Promise>>>>,
166
167 #[conditional_malloc_size_of]
170 result_promise: Rc<Promise>,
171}
172
173impl PipeTo {
174 pub(crate) fn abort_with_reason(
177 &self,
178 cx: &mut CurrentRealm,
179 global: &GlobalScope,
180 reason: SafeHandleValue,
181 ) {
182 if self.shutting_down.get() {
184 return;
185 }
186
187 self.abort_reason.set(reason.get());
191
192 self.set_shutdown_error(reason);
197
198 self.shutdown(cx, global, Some(ShutdownAction::Abort));
204 }
205}
206
207impl Callback for PipeTo {
208 fn callback(&self, cx: &mut CurrentRealm, result: SafeHandleValue) {
217 let global = self.reader.global();
218
219 self.pending_writes.borrow_mut().retain(|p| {
225 let pending = p.is_pending();
226 if !pending {
227 p.set_promise_is_handled(cx);
228 }
229 pending
230 });
231
232 let state_before_checks = self.state.borrow().clone();
234
235 if state_before_checks == PipeToState::PendingRead {
242 let source = self.reader.get_stream().expect("Source stream must be set");
243 if source.is_closed() {
244 let dest = self
245 .writer
246 .get_stream()
247 .expect("Destination stream must be set");
248
249 if dest.is_writable() && !dest.close_queued_or_in_flight() {
252 let Ok(done) = get_read_promise_done(cx, &result) else {
253 return;
260 };
261
262 if !done {
263 self.write_chunk(cx, &global, result);
265 }
266 }
267 }
268 }
269
270 self.check_and_propagate_errors_forward(cx, &global);
271 self.check_and_propagate_errors_backward(cx, &global);
272 self.check_and_propagate_closing_forward(cx, &global);
273 self.check_and_propagate_closing_backward(cx, &global);
274
275 let state = self.state.borrow().clone();
277
278 if state != state_before_checks {
282 return;
283 }
284
285 match state {
286 PipeToState::Starting => unreachable!("PipeTo should not be in the Starting state."),
287 PipeToState::PendingReady => {
288 self.read_chunk(cx, &global);
290 },
291 PipeToState::PendingRead => {
292 self.write_chunk(cx, &global, result);
294
295 if self.shutting_down.get() {
297 return;
298 }
299
300 self.wait_for_writer_ready(cx, &global);
302 },
303 PipeToState::ShuttingDownWithPendingWrites(action) => {
304 if let Some(write) = self.pending_writes.borrow_mut().front().cloned() {
307 self.wait_on_pending_write(cx, &global, write);
308 return;
309 }
310
311 if let Some(action) = action {
313 self.perform_action(cx, &global, action);
315 } else {
316 self.finalize(cx, &global);
318 }
319 },
320 PipeToState::ShuttingDownPendingAction => {
321 let Some(ref promise) = *self.shutdown_action_promise.borrow() else {
322 unreachable!();
323 };
324 if promise.is_pending() {
325 return;
329 }
330
331 let is_array_like = {
332 if !result.is_object() {
333 false
334 } else {
335 is_array_like::<crate::DomTypeHolder>(cx, result)
336 }
337 };
338
339 if !result.is_undefined() && !is_array_like {
341 self.set_shutdown_error(result);
351 }
352 self.finalize(cx, &global);
353 },
354 PipeToState::Finalized => {},
355 }
356 }
357}
358
359impl PipeTo {
360 fn set_shutdown_error(&self, error: SafeHandleValue) {
363 *self.shutdown_error.borrow_mut() = Some(Heap::default());
364 let Some(ref heap) = *self.shutdown_error.borrow() else {
365 unreachable!("Option set to Some(heap) above.");
366 };
367 heap.set(error.get())
368 }
369
370 fn wait_for_writer_ready(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
373 {
374 let mut state = self.state.borrow_mut();
375 *state = PipeToState::PendingReady;
376 }
377
378 let ready_promise = self.writer.Ready();
379 if ready_promise.is_fulfilled() {
380 self.read_chunk(cx, global);
381 } else {
382 let handler = PromiseNativeHandler::new(
383 cx,
384 global,
385 Some(Box::new(self.clone())),
386 Some(Box::new(self.clone())),
387 );
388 ready_promise.append_native_handler(cx, &handler);
389
390 let closed_promise = self.reader.Closed();
394 closed_promise.append_native_handler(cx, &handler);
395 }
396 }
397
398 fn read_chunk(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
400 *self.state.borrow_mut() = PipeToState::PendingRead;
401 let chunk_promise = self.reader.Read(cx);
402 let handler = PromiseNativeHandler::new(
403 cx,
404 global,
405 Some(Box::new(self.clone())),
406 Some(Box::new(self.clone())),
407 );
408 chunk_promise.append_native_handler(cx, &handler);
409
410 let ready_promise = self.writer.Closed();
413 ready_promise.append_native_handler(cx, &handler);
414 }
415
416 fn write_chunk(
419 &self,
420 cx: &mut JSContext,
421 global: &GlobalScope,
422 chunk: SafeHandleValue,
423 ) -> bool {
424 if chunk.is_object() {
425 rooted!(&in(cx) let object = chunk.to_object());
426 rooted!(&in(cx) let mut bytes = UndefinedValue());
427 get_property_jsval(cx, object.handle(), c"value", bytes.handle_mut())
428 .expect("Chunk should have a value.");
429
430 let write_promise = self.writer.write(cx, global, bytes.handle());
432 self.pending_writes.borrow_mut().push_back(write_promise);
433 return true;
434 }
435 false
436 }
437
438 fn wait_on_pending_write(
442 &self,
443 cx: &mut CurrentRealm,
444 global: &GlobalScope,
445 promise: Rc<Promise>,
446 ) {
447 let handler = PromiseNativeHandler::new(
448 cx,
449 global,
450 Some(Box::new(self.clone())),
451 Some(Box::new(self.clone())),
452 );
453 promise.append_native_handler(cx, &handler);
454 }
455
456 fn check_and_propagate_errors_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
459 if self.shutting_down.get() {
462 return;
463 }
464
465 let source = self
467 .reader
468 .get_stream()
469 .expect("Reader should still have a stream");
470 if source.is_errored() {
471 rooted!(&in(cx) let mut source_error = UndefinedValue());
472 source.get_stored_error(source_error.handle_mut());
473 self.set_shutdown_error(source_error.handle());
474
475 if !self.prevent_abort {
477 self.shutdown(cx, global, Some(ShutdownAction::WritableStreamAbort))
480 } else {
481 self.shutdown(cx, global, None);
483 }
484 }
485 }
486
487 fn check_and_propagate_errors_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
490 if self.shutting_down.get() {
493 return;
494 }
495
496 let dest = self
498 .writer
499 .get_stream()
500 .expect("Writer should still have a stream");
501 if dest.is_errored() {
502 rooted!(&in(cx) let mut dest_error = UndefinedValue());
503 dest.get_stored_error(dest_error.handle_mut());
504 self.set_shutdown_error(dest_error.handle());
505
506 if !self.prevent_cancel {
508 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
511 } else {
512 self.shutdown(cx, global, None);
514 }
515 }
516 }
517
518 fn check_and_propagate_closing_forward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
521 if self.shutting_down.get() {
524 return;
525 }
526
527 let source = self
529 .reader
530 .get_stream()
531 .expect("Reader should still have a stream");
532 if source.is_closed() {
533 if !self.prevent_close {
535 self.shutdown(
538 cx,
539 global,
540 Some(ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation),
541 )
542 } else {
543 self.shutdown(cx, global, None);
545 }
546 }
547 }
548
549 fn check_and_propagate_closing_backward(&self, cx: &mut CurrentRealm, global: &GlobalScope) {
552 if self.shutting_down.get() {
555 return;
556 }
557
558 let dest = self
561 .writer
562 .get_stream()
563 .expect("Writer should still have a stream");
564 if dest.close_queued_or_in_flight() || dest.is_closed() {
565 rooted!(&in(cx) let mut dest_closed = UndefinedValue());
570 let error =
571 Error::Type(c"Destination is closed or has closed queued or in flight".to_owned());
572 error.to_jsval(cx, global, dest_closed.handle_mut());
573 self.set_shutdown_error(dest_closed.handle());
574
575 if !self.prevent_cancel {
577 self.shutdown(cx, global, Some(ShutdownAction::ReadableStreamCancel))
580 } else {
581 self.shutdown(cx, global, None);
583 }
584 }
585 }
586
587 fn shutdown(
591 &self,
592 cx: &mut CurrentRealm,
593 global: &GlobalScope,
594 action: Option<ShutdownAction>,
595 ) {
596 if !self.shutting_down.replace(true) {
599 let dest = self.writer.get_stream().expect("Stream must be set");
600 if dest.is_writable() && !dest.close_queued_or_in_flight() {
603 if let Some(write) = self.pending_writes.borrow_mut().front() {
609 *self.state.borrow_mut() = PipeToState::ShuttingDownWithPendingWrites(action);
610 self.wait_on_pending_write(cx, global, write.clone());
611 return;
612 }
613 }
614
615 if let Some(action) = action {
617 self.perform_action(cx, global, action);
619 } else {
620 self.finalize(cx, global);
622 }
623 }
624 }
625
626 fn perform_action(&self, cx: &mut CurrentRealm, global: &GlobalScope, action: ShutdownAction) {
629 rooted!(&in(cx) let mut error = UndefinedValue());
630 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
631 error.set(shutdown_error.get());
632 }
633
634 *self.state.borrow_mut() = PipeToState::ShuttingDownPendingAction;
635
636 let promise = match action {
638 ShutdownAction::WritableStreamAbort => {
639 let dest = self.writer.get_stream().expect("Stream must be set");
640 dest.abort(cx, global, error.handle())
641 },
642 ShutdownAction::ReadableStreamCancel => {
643 let source = self
644 .reader
645 .get_stream()
646 .expect("Reader should have a stream.");
647 source.cancel(cx, global, error.handle())
648 },
649 ShutdownAction::WritableStreamDefaultWriterCloseWithErrorPropagation => {
650 self.writer.close_with_error_propagation(cx, global)
651 },
652 ShutdownAction::Abort => {
653 rooted!(&in(cx) let mut error = UndefinedValue());
658 error.set(self.abort_reason.get());
659
660 let mut actions = vec![];
662
663 if !self.prevent_abort {
665 let dest = self
666 .writer
667 .get_stream()
668 .expect("Destination stream must be set");
669
670 let promise = if dest.is_writable() {
672 dest.abort(cx, global, error.handle())
674 } else {
675 Promise::new_resolved(cx, global, ())
677 };
678 actions.push(promise);
679 }
680
681 if !self.prevent_cancel {
683 let source = self.reader.get_stream().expect("Source stream must be set");
684
685 let promise = if source.is_readable() {
687 source.cancel(cx, global, error.handle())
689 } else {
690 Promise::new_resolved(cx, global, ())
692 };
693 actions.push(promise);
694 }
695
696 wait_for_all_promise(cx, global, actions)
700 },
701 };
702
703 let handler = PromiseNativeHandler::new(
706 cx,
707 global,
708 Some(Box::new(self.clone())),
709 Some(Box::new(self.clone())),
710 );
711 promise.append_native_handler(cx, &handler);
712 *self.shutdown_action_promise.borrow_mut() = Some(promise);
713 }
714
715 fn finalize(&self, cx: &mut JSContext, global: &GlobalScope) {
717 *self.state.borrow_mut() = PipeToState::Finalized;
718
719 self.writer.release(cx, global);
721
722 self.reader
728 .release(cx)
729 .expect("Releasing the reader should not fail");
730
731 if let Some(shutdown_error) = self.shutdown_error.borrow().as_ref() {
737 rooted!(&in(cx) let mut error = UndefinedValue());
738 error.set(shutdown_error.get());
739 self.result_promise.reject_native(cx, &error.handle());
741 } else {
742 self.result_promise.resolve_native(cx, &());
744 }
745 }
746}
747
748#[derive(Clone, JSTraceable, MallocSizeOf)]
751struct SourceCancelPromiseFulfillmentHandler {
752 #[conditional_malloc_size_of]
753 result: Rc<Promise>,
754}
755
756impl Callback for SourceCancelPromiseFulfillmentHandler {
757 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
761 self.result.resolve_native(cx, &());
762 }
763}
764
765#[derive(Clone, JSTraceable, MallocSizeOf)]
768struct SourceCancelPromiseRejectionHandler {
769 #[conditional_malloc_size_of]
770 result: Rc<Promise>,
771}
772
773impl Callback for SourceCancelPromiseRejectionHandler {
774 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
778 self.result.reject_native(cx, &v);
779 }
780}
781
782#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
784pub(crate) enum ReadableStreamState {
785 #[default]
786 Readable,
787 Closed,
788 Errored,
789}
790
791#[derive(JSTraceable, MallocSizeOf)]
793#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
794pub(crate) enum ControllerType {
795 Byte(MutNullableDom<ReadableByteStreamController>),
797 Default(MutNullableDom<ReadableStreamDefaultController>),
799}
800
801#[derive(JSTraceable, MallocSizeOf)]
803#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
804pub(crate) enum ReaderType {
805 #[allow(clippy::upper_case_acronyms)]
807 BYOB(MutNullableDom<ReadableStreamBYOBReader>),
808 Default(MutNullableDom<ReadableStreamDefaultReader>),
810}
811
812impl Eq for ReaderType {}
813impl PartialEq for ReaderType {
814 fn eq(&self, other: &Self) -> bool {
815 matches!(
816 (self, other),
817 (ReaderType::BYOB(_), ReaderType::BYOB(_)) |
818 (ReaderType::Default(_), ReaderType::Default(_))
819 )
820 }
821}
822
823pub(crate) fn create_readable_stream(
825 cx: &mut JSContext,
826 global: &GlobalScope,
827 underlying_source_type: UnderlyingSourceType,
828 queuing_strategy: Option<Rc<QueuingStrategySize>>,
829 high_water_mark: Option<f64>,
830) -> DomRoot<ReadableStream> {
831 let high_water_mark = high_water_mark.unwrap_or(1.0);
833
834 let size_algorithm =
836 queuing_strategy.unwrap_or(extract_size_algorithm(cx, &QueuingStrategy::empty()));
837
838 assert!(high_water_mark >= 0.0);
840
841 let stream = ReadableStream::new_with_proto(cx, global, None);
844
845 let controller = ReadableStreamDefaultController::new(
847 cx,
848 global,
849 underlying_source_type,
850 high_water_mark,
851 size_algorithm,
852 );
853
854 controller
857 .setup(cx, stream.clone())
858 .expect("Setup of default controller cannot fail");
859
860 stream
862}
863
864fn readable_byte_stream_tee(
866 cx: &mut JSContext,
867 global: &GlobalScope,
868 underlying_source_type: UnderlyingSourceType,
869) -> DomRoot<ReadableStream> {
870 let tee_stream = ReadableStream::new_with_proto(cx, global, None);
873
874 let controller = ReadableByteStreamController::new(cx, underlying_source_type, 0.0, global);
876
877 controller
879 .setup(cx, global, tee_stream.clone())
880 .expect("Setup of byte stream controller cannot fail");
881
882 tee_stream
884}
885
886#[dom_struct]
888pub(crate) struct ReadableStream {
889 reflector_: Reflector,
890
891 controller: RefCell<Option<ControllerType>>,
895
896 #[ignore_malloc_size_of = "mozjs"]
898 stored_error: Heap<JSVal>,
899
900 disturbed: Cell<bool>,
902
903 reader: RefCell<Option<ReaderType>>,
905
906 state: Cell<ReadableStreamState>,
908}
909
910impl ReadableStream {
911 fn new_inherited() -> ReadableStream {
913 ReadableStream {
914 reflector_: Reflector::new(),
915 controller: RefCell::new(None),
916 stored_error: Heap::default(),
917 disturbed: Default::default(),
918 reader: RefCell::new(None),
919 state: Cell::new(Default::default()),
920 }
921 }
922
923 pub(crate) fn new_with_proto(
924 cx: &mut JSContext,
925 global: &GlobalScope,
926 proto: Option<SafeHandleObject>,
927 ) -> DomRoot<ReadableStream> {
928 reflect_dom_object_with_proto_and_cx(
929 Box::new(ReadableStream::new_inherited()),
930 global,
931 proto,
932 cx,
933 )
934 }
935
936 pub(crate) fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
939 *self.controller.borrow_mut() = Some(ControllerType::Default(MutNullableDom::new(Some(
940 controller,
941 ))));
942 }
943
944 pub(crate) fn set_byte_controller(&self, controller: &ReadableByteStreamController) {
947 *self.controller.borrow_mut() =
948 Some(ControllerType::Byte(MutNullableDom::new(Some(controller))));
949 }
950
951 pub(crate) fn assert_no_controller(&self) {
954 let has_no_controller = self.controller.borrow().is_none();
955 assert!(has_no_controller);
956 }
957
958 pub(crate) fn new_from_bytes(
960 cx: &mut JSContext,
961 global: &GlobalScope,
962 bytes: Vec<u8>,
963 ) -> Fallible<DomRoot<ReadableStream>> {
964 let stream = ReadableStream::new_with_external_underlying_source(
965 cx,
966 global,
967 UnderlyingSourceType::Memory(bytes.len()),
968 )?;
969 stream.enqueue_native(cx, bytes);
970 stream.controller_close_native(cx);
971 Ok(stream)
972 }
973
974 pub(crate) fn new_empty(
977 cx: &mut JSContext,
978 global: &GlobalScope,
979 ) -> Fallible<DomRoot<ReadableStream>> {
980 let empty_stream = ReadableStream::new_with_external_underlying_source(
983 cx,
984 global,
985 UnderlyingSourceType::Memory(0),
986 )?;
987 empty_stream.controller_close_native(cx);
989 Ok(empty_stream)
991 }
992
993 pub(crate) fn new_from_bytes_with_byte_reading_support(
995 cx: &mut JSContext,
996 global: &GlobalScope,
997 bytes: Vec<u8>,
998 ) -> Fallible<DomRoot<ReadableStream>> {
999 let stream = ReadableStream::new_with_external_underlying_byte_source(
1000 cx,
1001 global,
1002 UnderlyingSourceType::Memory(bytes.len()),
1003 )?;
1004 stream.enqueue_native(cx, bytes);
1005 stream.controller_close_native(cx);
1006 Ok(stream)
1007 }
1008
1009 pub(crate) fn new_with_external_underlying_source(
1012 cx: &mut JSContext,
1013 global: &GlobalScope,
1014 source: UnderlyingSourceType,
1015 ) -> Fallible<DomRoot<ReadableStream>> {
1016 assert!(source.is_native());
1017 let stream = ReadableStream::new_with_proto(cx, global, None);
1018 let strategy_size = extract_size_algorithm(cx, &QueuingStrategy::empty());
1019 let controller =
1020 ReadableStreamDefaultController::new(cx, global, source, 1.0, strategy_size);
1021 controller.setup(cx, stream.clone())?;
1022 Ok(stream)
1023 }
1024
1025 pub(crate) fn new_with_external_underlying_byte_source(
1027 cx: &mut JSContext,
1028 global: &GlobalScope,
1029 source: UnderlyingSourceType,
1030 ) -> Fallible<DomRoot<ReadableStream>> {
1031 assert!(source.is_native());
1032 let stream = ReadableStream::new_with_proto(cx, global, None);
1033 let controller = ReadableByteStreamController::new(cx, source, 0.0, global);
1034 controller.setup(cx, global, stream.clone())?;
1035 Ok(stream)
1036 }
1037
1038 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1040 match self.controller.borrow().as_ref() {
1041 Some(ControllerType::Default(controller)) => {
1042 let controller = controller
1043 .get()
1044 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1045 controller.perform_release_steps()
1046 },
1047 Some(ControllerType::Byte(controller)) => {
1048 let controller = controller
1049 .get()
1050 .ok_or_else(|| Error::Type(c"Stream should have controller.".to_owned()))?;
1051 controller.perform_release_steps()
1052 },
1053 None => Err(Error::Type(c"Stream should have controller.".to_owned())),
1054 }
1055 }
1056
1057 pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
1061 match self.controller.borrow().as_ref() {
1062 Some(ControllerType::Default(controller)) => controller
1063 .get()
1064 .expect("Stream should have controller.")
1065 .perform_pull_steps(cx, read_request),
1066 Some(ControllerType::Byte(controller)) => controller
1067 .get()
1068 .expect("Stream should have controller.")
1069 .perform_pull_steps(cx, read_request),
1070 None => {
1071 unreachable!("Stream does not have a controller.");
1072 },
1073 }
1074 }
1075
1076 pub(crate) fn perform_pull_into(
1080 &self,
1081 cx: &mut JSContext,
1082 read_into_request: &ReadIntoRequest,
1083 view: &HeapBufferSource<ArrayBufferViewU8>,
1084 min: u64,
1085 ) {
1086 match self.controller.borrow().as_ref() {
1087 Some(ControllerType::Byte(controller)) => controller
1088 .get()
1089 .expect("Stream should have controller.")
1090 .perform_pull_into(cx, read_into_request, view, min),
1091 _ => {
1092 unreachable!(
1093 "Pulling a chunk from a stream with a default controller using a BYOB reader"
1094 )
1095 },
1096 }
1097 }
1098
1099 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
1101 match self.reader.borrow().as_ref() {
1102 Some(ReaderType::Default(reader)) => {
1103 let Some(reader) = reader.get() else {
1104 panic!("Attempt to add a read request without having first acquired a reader.");
1105 };
1106
1107 assert!(self.is_readable());
1109
1110 reader.add_read_request(read_request);
1112 },
1113 _ => {
1114 unreachable!("Adding a read request can only be done on a default reader.")
1115 },
1116 }
1117 }
1118
1119 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
1121 match self.reader.borrow().as_ref() {
1122 Some(ReaderType::BYOB(reader)) => {
1124 let Some(reader) = reader.get() else {
1125 unreachable!(
1126 "Attempt to add a read into request without having first acquired a reader."
1127 );
1128 };
1129
1130 assert!(self.is_readable() || self.is_closed());
1132
1133 reader.add_read_into_request(read_request);
1135 },
1136 _ => {
1137 unreachable!("Adding a read into request can only be done on a BYOB reader.")
1138 },
1139 }
1140 }
1141
1142 pub(crate) fn enqueue_native(&self, cx: &mut JSContext, bytes: Vec<u8>) {
1144 match self.controller.borrow().as_ref() {
1145 Some(ControllerType::Default(controller)) => controller
1146 .get()
1147 .expect("Stream should have controller.")
1148 .enqueue_native(cx, bytes),
1149 Some(ControllerType::Byte(controller)) => {
1150 if bytes.is_empty() {
1151 return;
1152 }
1153
1154 let controller = controller.get().expect("Stream should have controller.");
1155 rooted!(&in(cx) let mut chunk_object = ptr::null_mut::<JSObject>());
1156 create_buffer_source::<Uint8>(cx, &bytes, chunk_object.handle_mut())
1157 .expect("failed to create buffer source for native byte chunk.");
1158
1159 let chunk = RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
1160 BufferSource::ArrayBufferView(Heap::boxed(*chunk_object.handle())),
1161 ));
1162 controller
1163 .enqueue(cx, chunk)
1164 .expect("Enqueuing a native byte chunk should not fail.");
1165 },
1166 _ => {
1167 unreachable!("Enqueueing chunk to a stream from Rust without a controller");
1168 },
1169 }
1170 }
1171
1172 pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
1174 assert!(self.is_readable());
1176
1177 self.state.set(ReadableStreamState::Errored);
1179
1180 self.stored_error.set(e.get());
1182
1183 let default_reader = {
1186 let reader_ref = self.reader.borrow();
1187 match reader_ref.as_ref() {
1188 Some(ReaderType::Default(reader)) => reader.get(),
1189 _ => None,
1190 }
1191 };
1192
1193 if let Some(reader) = default_reader {
1194 reader.error(cx, e);
1196 return;
1197 }
1198
1199 let byob_reader = {
1200 let reader_ref = self.reader.borrow();
1201 match reader_ref.as_ref() {
1202 Some(ReaderType::BYOB(reader)) => reader.get(),
1203 _ => None,
1204 }
1205 };
1206
1207 if let Some(reader) = byob_reader {
1208 reader.error_read_into_requests(cx, e);
1210 }
1211
1212 }
1214
1215 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
1217 handle_mut.set(self.stored_error.get());
1218 }
1219
1220 pub(crate) fn error_native(&self, cx: &mut JSContext, error: Error) {
1223 rooted!(&in(cx) let mut error_val = UndefinedValue());
1224 error.to_jsval(cx, &self.global(), error_val.handle_mut());
1225 self.error(cx, error_val.handle());
1226 }
1227
1228 pub(crate) fn controller_close_native(&self, cx: &mut JSContext) {
1231 match self.controller.borrow().as_ref() {
1232 Some(ControllerType::Default(controller)) => {
1233 let _ = controller
1234 .get()
1235 .expect("Stream should have controller.")
1236 .Close(cx);
1237 },
1238 Some(ControllerType::Byte(controller)) => {
1239 let _ = controller
1240 .get()
1241 .expect("Stream should have controller.")
1242 .close(cx);
1243 },
1244 _ => {
1245 unreachable!("Native closing requires a stream controller.")
1246 },
1247 }
1248 }
1249
1250 pub(crate) fn in_memory(&self) -> bool {
1253 match self.controller.borrow().as_ref() {
1254 Some(ControllerType::Default(controller)) => controller
1255 .get()
1256 .expect("Stream should have controller.")
1257 .in_memory(),
1258 Some(ControllerType::Byte(controller)) => controller
1259 .get()
1260 .expect("Stream should have controller.")
1261 .in_memory(),
1262 _ => unreachable!("Checking if source is in memory for a stream without a controller"),
1263 }
1264 }
1265
1266 pub(crate) fn get_in_memory_bytes(&self, cx: &mut JSContext) -> Option<GenericSharedMemory> {
1269 match self.controller.borrow().as_ref() {
1270 Some(ControllerType::Default(controller)) => controller
1271 .get()
1272 .expect("Stream should have controller.")
1273 .get_in_memory_bytes()
1274 .map(GenericSharedMemory::from_vec),
1275 Some(ControllerType::Byte(controller)) => controller
1276 .get()
1277 .expect("Stream should have controller.")
1278 .get_in_memory_bytes(cx)
1279 .map(GenericSharedMemory::from_vec),
1280 _ => unreachable!("Getting in-memory bytes for a stream without a controller"),
1281 }
1282 }
1283
1284 pub(crate) fn acquire_default_reader(
1289 &self,
1290 cx: &mut JSContext,
1291 ) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
1292 let reader = ReadableStreamDefaultReader::new(cx, &self.global());
1294
1295 reader.set_up(cx, self, &self.global())?;
1297
1298 Ok(reader)
1300 }
1301
1302 pub(crate) fn acquire_byob_reader(
1304 &self,
1305 cx: &mut JSContext,
1306 ) -> Fallible<DomRoot<ReadableStreamBYOBReader>> {
1307 let reader = ReadableStreamBYOBReader::new(cx, &self.global());
1309 reader.set_up(cx, self, &self.global())?;
1311
1312 Ok(reader)
1314 }
1315
1316 pub(crate) fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
1317 match self.controller.borrow().as_ref() {
1318 Some(ControllerType::Default(controller)) => {
1319 controller.get().expect("Stream should have controller.")
1320 },
1321 _ => {
1322 unreachable!(
1323 "Getting default controller for a stream with a non-default controller"
1324 )
1325 },
1326 }
1327 }
1328
1329 pub(crate) fn get_byte_controller(&self) -> DomRoot<ReadableByteStreamController> {
1330 match self.controller.borrow().as_ref() {
1331 Some(ControllerType::Byte(controller)) => {
1332 controller.get().expect("Stream should have controller.")
1333 },
1334 _ => {
1335 unreachable!("Getting byte controller for a stream with a non-byte controller")
1336 },
1337 }
1338 }
1339
1340 pub(crate) fn get_default_reader(&self) -> DomRoot<ReadableStreamDefaultReader> {
1341 match self.reader.borrow().as_ref() {
1342 Some(ReaderType::Default(reader)) => reader.get().expect("Stream should have reader."),
1343 _ => {
1344 unreachable!("Getting default reader for a stream with a non-default reader")
1345 },
1346 }
1347 }
1348
1349 pub(crate) fn read_a_chunk(&self, cx: &mut JSContext) -> Rc<Promise> {
1355 match self.reader.borrow().as_ref() {
1356 Some(ReaderType::Default(reader)) => {
1357 let Some(reader) = reader.get() else {
1358 unreachable!(
1359 "Attempt to read stream chunk without having first acquired a reader."
1360 );
1361 };
1362 reader.Read(cx)
1363 },
1364 _ => {
1365 unreachable!("Native reading of a chunk can only be done with a default reader.")
1366 },
1367 }
1368 }
1369
1370 pub(crate) fn stop_reading(&self, cx: &mut JSContext) {
1375 let reader_ref = self.reader.borrow();
1376
1377 match reader_ref.as_ref() {
1378 Some(ReaderType::Default(reader)) => {
1379 let Some(reader) = reader.get() else {
1380 unreachable!("Attempt to stop reading without having first acquired a reader.");
1381 };
1382
1383 drop(reader_ref);
1384 reader.release(cx).expect("Reader release cannot fail.");
1385 },
1386 _ => {
1387 unreachable!("Native stop reading can only be done with a default reader.")
1388 },
1389 }
1390 }
1391
1392 pub(crate) fn is_locked(&self) -> bool {
1394 match self.reader.borrow().as_ref() {
1395 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1396 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1397 None => false,
1398 }
1399 }
1400
1401 pub(crate) fn is_disturbed(&self) -> bool {
1402 self.disturbed.get()
1403 }
1404
1405 pub(crate) fn set_is_disturbed(&self, disturbed: bool) {
1406 self.disturbed.set(disturbed);
1407 }
1408
1409 pub(crate) fn is_closed(&self) -> bool {
1410 self.state.get() == ReadableStreamState::Closed
1411 }
1412
1413 pub(crate) fn is_errored(&self) -> bool {
1414 self.state.get() == ReadableStreamState::Errored
1415 }
1416
1417 pub(crate) fn is_readable(&self) -> bool {
1418 self.state.get() == ReadableStreamState::Readable
1419 }
1420
1421 pub(crate) fn has_default_reader(&self) -> bool {
1422 match self.reader.borrow().as_ref() {
1423 Some(ReaderType::Default(reader)) => reader.get().is_some(),
1424 _ => false,
1425 }
1426 }
1427
1428 pub(crate) fn has_byob_reader(&self) -> bool {
1429 match self.reader.borrow().as_ref() {
1430 Some(ReaderType::BYOB(reader)) => reader.get().is_some(),
1431 _ => false,
1432 }
1433 }
1434
1435 pub(crate) fn has_byte_controller(&self) -> bool {
1436 match self.controller.borrow().as_ref() {
1437 Some(ControllerType::Byte(controller)) => controller.get().is_some(),
1438 _ => false,
1439 }
1440 }
1441
1442 pub(crate) fn get_num_read_requests(&self) -> usize {
1444 match self.reader.borrow().as_ref() {
1445 Some(ReaderType::Default(reader)) => {
1446 let reader = reader
1447 .get()
1448 .expect("Stream must have a reader when getting the number of read requests.");
1449 reader.get_num_read_requests()
1450 },
1451 _ => unreachable!(
1452 "Stream must have a default reader when get num read requests is called into."
1453 ),
1454 }
1455 }
1456
1457 pub(crate) fn get_num_read_into_requests(&self) -> usize {
1459 assert!(self.has_byob_reader());
1460
1461 match self.reader.borrow().as_ref() {
1462 Some(ReaderType::BYOB(reader)) => {
1463 let Some(reader) = reader.get() else {
1464 unreachable!(
1465 "Stream must have a reader when get num read into requests is called into."
1466 );
1467 };
1468 reader.get_num_read_into_requests()
1469 },
1470 _ => {
1471 unreachable!(
1472 "Stream must have a BYOB reader when get num read into requests is called into."
1473 );
1474 },
1475 }
1476 }
1477
1478 pub(crate) fn fulfill_read_request(
1480 &self,
1481 cx: &mut JSContext,
1482 chunk: SafeHandleValue,
1483 done: bool,
1484 ) {
1485 assert!(self.has_default_reader());
1487
1488 match self.reader.borrow().as_ref() {
1489 Some(ReaderType::Default(reader)) => {
1490 let reader = reader
1492 .get()
1493 .expect("Stream must have a reader when a read request is fulfilled.");
1494 assert_ne!(reader.get_num_read_requests(), 0);
1496 let request = reader.remove_read_request();
1499
1500 if done {
1501 request.close_steps(cx);
1503 } else {
1504 let result = RootedTraceableBox::new(Heap::default());
1506 result.set(*chunk);
1507 request.chunk_steps(cx, result, &self.global());
1508 }
1509 },
1510 _ => {
1511 unreachable!(
1512 "Stream must have a default reader when fulfill read requests is called into."
1513 );
1514 },
1515 }
1516 }
1517
1518 pub(crate) fn fulfill_read_into_request(
1520 &self,
1521 cx: &mut JSContext,
1522 chunk: SafeHandleValue,
1523 done: bool,
1524 ) {
1525 assert!(self.has_byob_reader());
1527
1528 match self.reader.borrow().as_ref() {
1530 Some(ReaderType::BYOB(reader)) => {
1531 let Some(reader) = reader.get() else {
1532 unreachable!(
1533 "Stream must have a reader when a read into request is fulfilled."
1534 );
1535 };
1536
1537 assert!(reader.get_num_read_into_requests() > 0);
1539
1540 let read_into_request = reader.remove_read_into_request();
1543
1544 let result = RootedTraceableBox::new(Heap::default());
1546 if done {
1547 result.set(*chunk);
1548 read_into_request.close_steps(cx, Some(result));
1549 } else {
1550 result.set(*chunk);
1552 read_into_request.chunk_steps(cx, result);
1553 }
1554 },
1555 _ => {
1556 unreachable!(
1557 "Stream must have a BYOB reader when fulfill read into requests is called into."
1558 );
1559 },
1560 };
1561 }
1562
1563 pub(crate) fn close(&self, cx: &mut JSContext) {
1565 assert!(self.is_readable());
1567 self.state.set(ReadableStreamState::Closed);
1569 let default_reader = {
1575 let reader_ref = self.reader.borrow();
1576 match reader_ref.as_ref() {
1577 Some(ReaderType::Default(reader)) => reader.get(),
1578 _ => None,
1579 }
1580 };
1581
1582 if let Some(reader) = default_reader {
1583 reader.close(cx);
1585 return;
1586 }
1587
1588 let byob_reader = {
1590 let reader_ref = self.reader.borrow();
1591 match reader_ref.as_ref() {
1592 Some(ReaderType::BYOB(reader)) => reader.get(),
1593 _ => None,
1594 }
1595 };
1596
1597 if let Some(reader) = byob_reader {
1598 reader.close(cx);
1600 }
1601
1602 }
1604
1605 pub(crate) fn cancel(
1607 &self,
1608 cx: &mut JSContext,
1609 global: &GlobalScope,
1610 reason: SafeHandleValue,
1611 ) -> Rc<Promise> {
1612 self.disturbed.set(true);
1614
1615 if self.is_closed() {
1617 return Promise::new_resolved(cx, global, ());
1618 }
1619 if self.is_errored() {
1621 let promise = Promise::new(cx, global);
1622 rooted!(&in(cx) let mut rval = UndefinedValue());
1623 self.stored_error.safe_to_jsval(cx, rval.handle_mut());
1624 promise.reject_native(cx, &rval.handle());
1625 return promise;
1626 }
1627 self.close(cx);
1629
1630 let byob_reader = {
1632 let reader_ref = self.reader.borrow();
1633 match reader_ref.as_ref() {
1634 Some(ReaderType::BYOB(reader)) => reader.get(),
1635 _ => None,
1636 }
1637 };
1638
1639 if let Some(reader) = byob_reader {
1640 reader.cancel(cx);
1642 }
1643
1644 let source_cancel_promise = match self.controller.borrow().as_ref() {
1647 Some(ControllerType::Default(controller)) => controller
1648 .get()
1649 .expect("Stream should have controller.")
1650 .perform_cancel_steps(cx, global, reason),
1651 Some(ControllerType::Byte(controller)) => controller
1652 .get()
1653 .expect("Stream should have controller.")
1654 .perform_cancel_steps(cx, global, reason),
1655 None => {
1656 panic!("Stream does not have a controller.");
1657 },
1658 };
1659
1660 let global = self.global();
1663 let result_promise = Promise::new(cx, &global);
1664 let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
1665 result: result_promise.clone(),
1666 });
1667 let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
1668 result: result_promise.clone(),
1669 });
1670 let handler = PromiseNativeHandler::new(
1671 cx,
1672 &global,
1673 Some(fulfillment_handler),
1674 Some(rejection_handler),
1675 );
1676 let mut realm = enter_auto_realm(cx, &*global);
1677 let cx = &mut realm.current_realm();
1678 source_cancel_promise.append_native_handler(cx, &handler);
1679
1680 result_promise
1683 }
1684
1685 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1686 pub(crate) fn set_reader(&self, new_reader: Option<ReaderType>) {
1687 *self.reader.borrow_mut() = new_reader;
1688 }
1689
1690 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1691 fn byte_tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1693 let reader = self.acquire_default_reader(cx)?;
1698 let reader = Rc::new(RefCell::new(ReaderType::Default(MutNullableDom::new(
1699 Some(&reader),
1700 ))));
1701
1702 let reading = Rc::new(Cell::new(false));
1704
1705 let read_again_for_branch_1 = Rc::new(Cell::new(false));
1707
1708 let read_again_for_branch_2 = Rc::new(Cell::new(false));
1710
1711 let canceled_1 = Rc::new(Cell::new(false));
1713
1714 let canceled_2 = Rc::new(Cell::new(false));
1716
1717 let reason_1 = Rc::new(Heap::default());
1719
1720 let reason_2 = Rc::new(Heap::default());
1722
1723 let cancel_promise = Promise::new(cx, &self.global());
1725 let reader_version = Rc::new(Cell::new(0));
1726
1727 let byte_tee_source_1 = ByteTeeUnderlyingSource::new(
1728 cx,
1729 reader.clone(),
1730 self,
1731 reading.clone(),
1732 read_again_for_branch_1.clone(),
1733 read_again_for_branch_2.clone(),
1734 canceled_1.clone(),
1735 canceled_2.clone(),
1736 reason_1.clone(),
1737 reason_2.clone(),
1738 cancel_promise.clone(),
1739 reader_version.clone(),
1740 ByteTeeCancelAlgorithm::Cancel1Algorithm,
1741 ByteTeePullAlgorithm::Pull1Algorithm,
1742 );
1743
1744 let byte_tee_source_2 = ByteTeeUnderlyingSource::new(
1745 cx,
1746 reader.clone(),
1747 self,
1748 reading,
1749 read_again_for_branch_1,
1750 read_again_for_branch_2,
1751 canceled_1,
1752 canceled_2,
1753 reason_1,
1754 reason_2,
1755 cancel_promise,
1756 reader_version,
1757 ByteTeeCancelAlgorithm::Cancel2Algorithm,
1758 ByteTeePullAlgorithm::Pull2Algorithm,
1759 );
1760
1761 let branch_1 = readable_byte_stream_tee(
1763 cx,
1764 &self.global(),
1765 UnderlyingSourceType::TeeByte(&byte_tee_source_1),
1766 );
1767 byte_tee_source_1.set_branch_1(&branch_1);
1768 byte_tee_source_2.set_branch_1(&branch_1);
1769
1770 let branch_2 = readable_byte_stream_tee(
1772 cx,
1773 &self.global(),
1774 UnderlyingSourceType::TeeByte(&byte_tee_source_2),
1775 );
1776 byte_tee_source_1.set_branch_2(&branch_2);
1777 byte_tee_source_2.set_branch_2(&branch_2);
1778
1779 byte_tee_source_1.forward_reader_error(cx, reader.clone());
1781 byte_tee_source_2.forward_reader_error(cx, reader);
1782
1783 Ok(vec![branch_1, branch_2])
1785 }
1786
1787 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
1789 fn default_tee(
1790 &self,
1791 cx: &mut JSContext,
1792 clone_for_branch_2: bool,
1793 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1794 let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
1798
1799 let reader = self.acquire_default_reader(cx)?;
1801
1802 let reading = Rc::new(Cell::new(false));
1804 let read_again = Rc::new(Cell::new(false));
1806 let canceled_1 = Rc::new(Cell::new(false));
1808 let canceled_2 = Rc::new(Cell::new(false));
1810
1811 let reason_1 = Rc::new(Heap::default());
1813 let reason_2 = Rc::new(Heap::default());
1815 let cancel_promise = Promise::new(cx, &self.global());
1817
1818 let tee_source_1 = DefaultTeeUnderlyingSource::new(
1819 cx,
1820 &reader,
1821 self,
1822 reading.clone(),
1823 read_again.clone(),
1824 canceled_1.clone(),
1825 canceled_2.clone(),
1826 clone_for_branch_2.clone(),
1827 reason_1.clone(),
1828 reason_2.clone(),
1829 cancel_promise.clone(),
1830 DefaultTeeCancelAlgorithm::Cancel1Algorithm,
1831 );
1832
1833 let underlying_source_type_branch_1 = UnderlyingSourceType::Tee(&tee_source_1);
1834
1835 let tee_source_2 = DefaultTeeUnderlyingSource::new(
1836 cx,
1837 &reader,
1838 self,
1839 reading,
1840 read_again,
1841 canceled_1.clone(),
1842 canceled_2.clone(),
1843 clone_for_branch_2,
1844 reason_1,
1845 reason_2,
1846 cancel_promise.clone(),
1847 DefaultTeeCancelAlgorithm::Cancel2Algorithm,
1848 );
1849
1850 let underlying_source_type_branch_2 = UnderlyingSourceType::Tee(&tee_source_2);
1851
1852 let branch_1 = create_readable_stream(
1854 cx,
1855 &self.global(),
1856 underlying_source_type_branch_1,
1857 None,
1858 None,
1859 );
1860 tee_source_1.set_branch_1(&branch_1);
1861 tee_source_2.set_branch_1(&branch_1);
1862
1863 let branch_2 = create_readable_stream(
1865 cx,
1866 &self.global(),
1867 underlying_source_type_branch_2,
1868 None,
1869 None,
1870 );
1871 tee_source_1.set_branch_2(&branch_2);
1872 tee_source_2.set_branch_2(&branch_2);
1873
1874 reader.default_tee_append_native_handler_to_closed_promise(
1876 cx,
1877 &branch_1,
1878 &branch_2,
1879 canceled_1,
1880 canceled_2,
1881 cancel_promise,
1882 );
1883
1884 Ok(vec![branch_1, branch_2])
1886 }
1887
1888 #[allow(clippy::too_many_arguments)]
1890 pub(crate) fn pipe_to(
1891 &self,
1892 cx: &mut CurrentRealm,
1893 global: &GlobalScope,
1894 dest: &WritableStream,
1895 prevent_close: bool,
1896 prevent_abort: bool,
1897 prevent_cancel: bool,
1898 signal: Option<&AbortSignal>,
1899 ) -> Rc<Promise> {
1900 assert!(!self.is_locked());
1911
1912 assert!(!dest.is_locked());
1914
1915 let reader = self
1923 .acquire_default_reader(cx)
1924 .expect("Acquiring a default reader for pipe_to cannot fail");
1925
1926 let writer = dest
1928 .aquire_default_writer(cx, global)
1929 .expect("Acquiring a default writer for pipe_to cannot fail");
1930
1931 self.disturbed.set(true);
1933
1934 let promise = Promise::new(cx, global);
1939
1940 rooted!(&in(cx) let pipe_to = PipeTo {
1942 reader: Dom::from_ref(&reader),
1943 writer: Dom::from_ref(&writer),
1944 pending_writes: Default::default(),
1945 state: Default::default(),
1946 prevent_abort,
1947 prevent_cancel,
1948 prevent_close,
1949 shutting_down: Default::default(),
1950 abort_reason: Default::default(),
1951 shutdown_error: Default::default(),
1952 shutdown_action_promise: Default::default(),
1953 result_promise: promise.clone(),
1954 });
1955
1956 if let Some(signal) = signal {
1959 rooted!(&in(cx) let abort_algorithm = AbortAlgorithm::StreamPiping(pipe_to.clone()));
1962
1963 if signal.aborted() {
1965 signal.run_abort_algorithm(cx, global, &abort_algorithm);
1966 return promise;
1967 }
1968
1969 signal.add(&abort_algorithm);
1971 }
1972
1973 pipe_to.check_and_propagate_errors_forward(cx, global);
1975 pipe_to.check_and_propagate_errors_backward(cx, global);
1976 pipe_to.check_and_propagate_closing_forward(cx, global);
1977 pipe_to.check_and_propagate_closing_backward(cx, global);
1978
1979 if *pipe_to.state.borrow() == PipeToState::Starting {
1981 pipe_to.wait_for_writer_ready(cx, global);
1983 }
1984
1985 promise
1987 }
1988
1989 pub(crate) fn tee(
1991 &self,
1992 cx: &mut JSContext,
1993 clone_for_branch_2: bool,
1994 ) -> Fallible<Vec<DomRoot<ReadableStream>>> {
1995 match self.controller.borrow().as_ref() {
1999 Some(ControllerType::Default(_)) => {
2000 self.default_tee(cx, clone_for_branch_2)
2002 },
2003 Some(ControllerType::Byte(_)) => {
2004 self.byte_tee(cx)
2007 },
2008 None => {
2009 unreachable!("Stream should have a controller.");
2010 },
2011 }
2012 }
2013
2014 fn set_up_byte_controller(
2016 &self,
2017 cx: &mut JSContext,
2018 global: &GlobalScope,
2019 underlying_source_dict: JsUnderlyingSource,
2020 underlying_source_handle: SafeHandleObject,
2021 stream: DomRoot<ReadableStream>,
2022 strategy_hwm: f64,
2023 ) -> Fallible<()> {
2024 if let Some(0) = underlying_source_dict.autoAllocateChunkSize {
2040 return Err(Error::Type(c"autoAllocateChunkSize cannot be 0".to_owned()));
2041 }
2042
2043 let controller = ReadableByteStreamController::new(
2044 cx,
2045 UnderlyingSourceType::Js(underlying_source_dict),
2046 strategy_hwm,
2047 global,
2048 );
2049
2050 controller.set_underlying_source_this_object(underlying_source_handle);
2053
2054 controller.setup(cx, global, stream)
2057 }
2058
2059 pub(crate) fn setup_cross_realm_transform_readable(
2061 &self,
2062 cx: &mut JSContext,
2063 port: &MessagePort,
2064 ) {
2065 let port_id = port.message_port_id();
2066 let global = self.global();
2067
2068 let size_algorithm = extract_size_algorithm(cx, &QueuingStrategy::default());
2073
2074 let controller = ReadableStreamDefaultController::new(
2078 cx,
2079 &self.global(),
2080 UnderlyingSourceType::Transfer(port),
2081 0.,
2082 size_algorithm,
2083 );
2084
2085 rooted!(&in(cx) let cross_realm_transform_readable = CrossRealmTransformReadable {
2088 controller: Dom::from_ref(&controller),
2089 });
2090 global.note_cross_realm_transform_readable(&cross_realm_transform_readable, port_id);
2091
2092 port.Start(cx);
2094
2095 controller
2097 .setup(cx, DomRoot::from_ref(self))
2098 .expect("Setting up controller for transfer cannot fail.");
2099 }
2100}
2101
2102impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
2103 fn Constructor(
2105 cx: &mut JSContext,
2106 global: &GlobalScope,
2107 proto: Option<SafeHandleObject>,
2108 underlying_source: Option<*mut JSObject>,
2109 strategy: &QueuingStrategy,
2110 ) -> Fallible<DomRoot<Self>> {
2111 rooted!(&in(cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
2113 let underlying_source_dict = if !underlying_source_obj.is_null() {
2116 rooted!(&in(cx) let obj_val = ObjectValue(underlying_source_obj.get()));
2117 match JsUnderlyingSource::new(cx, obj_val.handle()) {
2118 Ok(ConversionResult::Success(val)) => val,
2119 Ok(ConversionResult::Failure(error)) => {
2120 return Err(Error::Type(error.into_owned()));
2121 },
2122 _ => {
2123 return Err(Error::JSFailed);
2124 },
2125 }
2126 } else {
2127 JsUnderlyingSource::empty()
2128 };
2129
2130 let stream = ReadableStream::new_with_proto(cx, global, proto);
2132
2133 if underlying_source_dict.type_.is_some() {
2134 if strategy.size.is_some() {
2136 return Err(Error::Range(
2137 c"size is not supported for byte streams".to_owned(),
2138 ));
2139 }
2140
2141 let strategy_hwm = extract_high_water_mark(strategy, 0.0)?;
2143
2144 stream.set_up_byte_controller(
2147 cx,
2148 global,
2149 underlying_source_dict,
2150 underlying_source_obj.handle(),
2151 stream.clone(),
2152 strategy_hwm,
2153 )?;
2154 } else {
2155 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
2157
2158 let size_algorithm = extract_size_algorithm(cx, strategy);
2160
2161 let controller = ReadableStreamDefaultController::new(
2162 cx,
2163 global,
2164 UnderlyingSourceType::Js(underlying_source_dict),
2165 high_water_mark,
2166 size_algorithm,
2167 );
2168
2169 controller.set_underlying_source_this_object(underlying_source_obj.handle());
2172
2173 controller.setup(cx, stream.clone())?;
2175 };
2176
2177 Ok(stream)
2178 }
2179
2180 fn Locked(&self) -> bool {
2182 self.is_locked()
2183 }
2184
2185 fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
2187 let global = self.global();
2188 if self.is_locked() {
2189 let promise = Promise::new(cx, &global);
2192 promise.reject_error(cx, Error::Type(c"stream is locked".to_owned()));
2193 promise
2194 } else {
2195 self.cancel(cx, &global, reason)
2197 }
2198 }
2199
2200 fn GetReader(
2202 &self,
2203 cx: &mut JSContext,
2204 options: &ReadableStreamGetReaderOptions,
2205 ) -> Fallible<ReadableStreamReader> {
2206 if options.mode.is_none() {
2208 return Ok(ReadableStreamReader::ReadableStreamDefaultReader(
2209 self.acquire_default_reader(cx)?,
2210 ));
2211 }
2212 assert!(options.mode.unwrap() == ReadableStreamReaderMode::Byob);
2214
2215 Ok(ReadableStreamReader::ReadableStreamBYOBReader(
2217 self.acquire_byob_reader(cx)?,
2218 ))
2219 }
2220
2221 fn Tee(&self, cx: &mut JSContext) -> Fallible<Vec<DomRoot<ReadableStream>>> {
2223 self.tee(cx, false)
2225 }
2226
2227 fn PipeTo(
2229 &self,
2230 cx: &mut CurrentRealm,
2231 destination: &WritableStream,
2232 options: &StreamPipeOptions,
2233 ) -> Rc<Promise> {
2234 let global = self.global();
2235
2236 if self.is_locked() {
2238 let promise = Promise::new(cx, &global);
2240 promise.reject_error(cx, Error::Type(c"Source stream is locked".to_owned()));
2241 return promise;
2242 }
2243
2244 if destination.is_locked() {
2246 let promise = Promise::new(cx, &global);
2248 promise.reject_error(cx, Error::Type(c"Destination stream is locked".to_owned()));
2249 return promise;
2250 }
2251
2252 let signal = options.signal.as_deref();
2254
2255 self.pipe_to(
2257 cx,
2258 &global,
2259 destination,
2260 options.preventClose,
2261 options.preventAbort,
2262 options.preventCancel,
2263 signal,
2264 )
2265 }
2266
2267 fn PipeThrough(
2269 &self,
2270 cx: &mut CurrentRealm,
2271 transform: &ReadableWritablePair,
2272 options: &StreamPipeOptions,
2273 ) -> Fallible<DomRoot<ReadableStream>> {
2274 let global = self.global();
2275
2276 if self.is_locked() {
2278 return Err(Error::Type(c"Source stream is locked".to_owned()));
2279 }
2280
2281 if transform.writable.is_locked() {
2283 return Err(Error::Type(c"Destination stream is locked".to_owned()));
2284 }
2285
2286 let signal = options.signal.as_deref();
2288
2289 let promise = self.pipe_to(
2292 cx,
2293 &global,
2294 &transform.writable,
2295 options.preventClose,
2296 options.preventAbort,
2297 options.preventCancel,
2298 signal,
2299 );
2300
2301 promise.set_promise_is_handled(cx);
2303
2304 Ok(transform.readable.clone())
2306 }
2307}
2308
2309pub(crate) fn get_type_and_value_from_message(
2313 cx: &mut JSContext,
2314 data: SafeHandleValue,
2315 value: SafeMutableHandleValue,
2316) -> DOMString {
2317 assert!(data.is_object());
2323 rooted!(&in(cx) let data_object = data.to_object());
2324
2325 let type_ = get_property::<DOMString>(
2327 cx,
2328 data_object.handle(),
2329 c"type",
2330 StringificationBehavior::Empty,
2331 );
2332
2333 get_property_jsval(cx, data_object.handle(), c"value", value)
2335 .expect("Getting the value should not fail.");
2336
2337 type_
2339 .expect("The type of the message should be a string")
2340 .expect("Property should be present")
2341}
2342
2343impl js::gc::Rootable for CrossRealmTransformReadable {}
2344
2345#[derive(Clone, JSTraceable, MallocSizeOf)]
2349#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
2350pub(crate) struct CrossRealmTransformReadable {
2351 controller: Dom<ReadableStreamDefaultController>,
2353}
2354
2355impl CrossRealmTransformReadable {
2356 pub(crate) fn handle_message(
2359 &self,
2360 cx: &mut CurrentRealm,
2361 global: &GlobalScope,
2362 port: &MessagePort,
2363 message: SafeHandleValue,
2364 ) {
2365 rooted!(&in(cx) let mut value = UndefinedValue());
2366 let type_string = get_type_and_value_from_message(cx, message, value.handle_mut());
2367
2368 if type_string == "chunk" {
2370 self.controller
2372 .enqueue(cx, value.handle())
2373 .expect("Enqueing a chunk should not fail.");
2374 }
2375
2376 if type_string == "close" {
2378 self.controller.close(cx);
2380
2381 global.disentangle_port(cx, port);
2383 }
2384
2385 if type_string == "error" {
2387 self.controller.error(cx, value.handle());
2389
2390 global.disentangle_port(cx, port);
2392 }
2393 }
2394
2395 pub(crate) fn handle_error(
2398 &self,
2399 cx: &mut CurrentRealm,
2400 global: &GlobalScope,
2401 port: &MessagePort,
2402 ) {
2403 let error = DOMException::new(cx, global, DOMErrorName::DataCloneError);
2405 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
2406 error.safe_to_jsval(cx, rooted_error.handle_mut());
2407
2408 port.cross_realm_transform_send_error(cx, rooted_error.handle());
2410
2411 self.controller.error(cx, rooted_error.handle());
2413
2414 global.disentangle_port(cx, port);
2416 }
2417}
2418
2419pub(crate) fn get_read_promise_done(
2421 cx: &mut JSContext,
2422 v: &SafeHandleValue,
2423) -> Result<bool, Error> {
2424 if !v.is_object() {
2425 return Err(Error::Type(c"Unknown format for done property.".to_owned()));
2426 }
2427
2428 rooted!(&in(cx) let object = v.to_object());
2429 get_property::<bool>(cx, object.handle(), c"done", ())?
2430 .ok_or(Error::Type(c"Promise has no done property.".to_owned()))
2431}
2432
2433pub(crate) fn get_read_promise_bytes(
2435 cx: &mut JSContext,
2436 v: &SafeHandleValue,
2437) -> Result<Vec<u8>, Error> {
2438 if !v.is_object() {
2439 return Err(Error::Type(
2440 c"Unknown format for for bytes read.".to_owned(),
2441 ));
2442 }
2443
2444 rooted!(&in(cx) let object = v.to_object());
2445 get_property::<Vec<u8>>(
2446 cx,
2447 object.handle(),
2448 c"value",
2449 ConversionBehavior::EnforceRange,
2450 )?
2451 .ok_or(Error::Type(c"Promise has no value property.".to_owned()))
2452}
2453
2454pub(crate) fn bytes_from_chunk_jsval(
2458 cx: &mut JSContext,
2459 chunk: &RootedTraceableBox<Heap<JSVal>>,
2460) -> Result<Vec<u8>, Error> {
2461 match Vec::<u8>::safe_from_jsval(cx, chunk.handle(), ConversionBehavior::EnforceRange) {
2462 Ok(ConversionResult::Success(vec)) => Ok(vec),
2463 Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.into_owned())),
2464 _ => Err(Error::Type(c"Unknown format for bytes read.".to_owned())),
2465 }
2466}
2467
2468impl Transferable for ReadableStream {
2470 type Index = MessagePortIndex;
2471 type Data = MessagePortImpl;
2472
2473 fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, MessagePortImpl)> {
2475 if self.is_locked() {
2478 return Err(Error::DataClone(None));
2479 }
2480
2481 let global = self.global();
2482 let mut realm = enter_auto_realm(cx, &*global);
2483 let mut realm = realm.current_realm();
2484 let cx = &mut realm;
2485
2486 let port_1 = MessagePort::new(cx, &global);
2488 global.track_message_port(&port_1, None);
2489
2490 let port_2 = MessagePort::new(cx, &global);
2492 global.track_message_port(&port_2, None);
2493
2494 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
2496
2497 let writable = WritableStream::new_with_proto(cx, &global, None);
2499
2500 writable.setup_cross_realm_transform_writable(cx, &port_1);
2502
2503 let promise = self.pipe_to(cx, &global, &writable, false, false, false, None);
2505
2506 promise.set_promise_is_handled(cx);
2508
2509 port_2.transfer(cx)
2511 }
2512
2513 fn transfer_receive(
2515 cx: &mut JSContext,
2516 owner: &GlobalScope,
2517 id: MessagePortId,
2518 port_impl: MessagePortImpl,
2519 ) -> Result<DomRoot<Self>, ()> {
2520 let value = ReadableStream::new_with_proto(cx, owner, None);
2523
2524 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
2531
2532 value.setup_cross_realm_transform_readable(cx, &transferred_port);
2534 Ok(value)
2535 }
2536
2537 fn serialized_storage<'a>(
2539 data: StructuredData<'a, '_>,
2540 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
2541 match data {
2542 StructuredData::Reader(r) => &mut r.port_impls,
2543 StructuredData::Writer(w) => &mut w.ports,
2544 }
2545 }
2546}
2547
2548pub(crate) fn pipe_through(
2555 source: &ReadableStream,
2556 cx: &mut JSContext,
2557 global: &GlobalScope,
2558 transform: &TextDecoderStream,
2559) -> DomRoot<ReadableStream> {
2560 let mut realm = CurrentRealm::assert(cx);
2566 let promise = source.pipe_to(
2569 &mut realm,
2570 global,
2571 &transform.Writable(),
2572 false, false, false, None, );
2577
2578 promise.set_promise_is_handled(cx);
2580 transform.Readable()
2582}