1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::{Heap, JSObject};
13use js::jsval::{JSVal, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::wrappers2::JS_GetPendingException;
16use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
17use js::typedarray::Uint8;
18use script_bindings::conversions::SafeToJSValConvertible;
19use script_bindings::reflector::{Reflector, reflect_dom_object};
20
21use crate::dom::bindings::buffer_source::create_buffer_source;
22use crate::dom::bindings::callback::ExceptionHandling;
23use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
24use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
25use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
26use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
27use crate::dom::bindings::reflector::DomGlobal;
28use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
29use crate::dom::bindings::trace::RootedTraceableBox;
30use crate::dom::globalscope::GlobalScope;
31use crate::dom::promise::Promise;
32use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
33use crate::dom::stream::readablestream::ReadableStream;
34use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
35use crate::dom::stream::underlyingsourcecontainer::{
36 UnderlyingSourceContainer, UnderlyingSourceType,
37};
38use crate::realms::enter_auto_realm;
39use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
40
41#[derive(Clone, JSTraceable, MallocSizeOf)]
44#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
45struct PullAlgorithmFulfillmentHandler {
46 controller: Dom<ReadableStreamDefaultController>,
47}
48
49impl Callback for PullAlgorithmFulfillmentHandler {
50 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
53 self.controller.pulling.set(false);
55
56 if self.controller.pull_again.get() {
58 self.controller.pull_again.set(false);
60
61 self.controller.call_pull_if_needed(cx);
63 }
64 }
65}
66
67#[derive(Clone, JSTraceable, MallocSizeOf)]
70#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
71struct PullAlgorithmRejectionHandler {
72 controller: Dom<ReadableStreamDefaultController>,
73}
74
75impl Callback for PullAlgorithmRejectionHandler {
76 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
79 self.controller.error(cx, v);
81 }
82}
83
84#[derive(Clone, JSTraceable, MallocSizeOf)]
87#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
88struct StartAlgorithmFulfillmentHandler {
89 controller: Dom<ReadableStreamDefaultController>,
90}
91
92impl Callback for StartAlgorithmFulfillmentHandler {
93 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
96 self.controller.started.set(true);
98
99 self.controller.call_pull_if_needed(cx);
101 }
102}
103
104#[derive(Clone, JSTraceable, MallocSizeOf)]
107#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
108struct StartAlgorithmRejectionHandler {
109 controller: Dom<ReadableStreamDefaultController>,
110}
111
112impl Callback for StartAlgorithmRejectionHandler {
113 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
116 self.controller.error(cx, v);
118 }
119}
120
121#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
123#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
124pub(crate) struct ValueWithSize {
125 #[ignore_malloc_size_of = "Heap is measured by mozjs"]
127 pub(crate) value: Box<Heap<JSVal>>,
128 pub(crate) size: f64,
130}
131
132#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
134#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
135pub(crate) enum EnqueuedValue {
136 Native(Box<[u8]>),
138 Js(ValueWithSize),
140 CloseSentinel,
142}
143
144impl EnqueuedValue {
145 fn size(&self) -> f64 {
146 match self {
147 EnqueuedValue::Native(v) => v.len() as f64,
148 EnqueuedValue::Js(v) => v.size,
149 EnqueuedValue::CloseSentinel => 0.,
152 }
153 }
154
155 fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
156 match self {
157 EnqueuedValue::Native(chunk) => {
158 rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
159 create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
160 .expect("failed to create buffer source for native chunk.");
161 array_buffer_ptr.safe_to_jsval(cx, rval, can_gc);
162 },
163 EnqueuedValue::Js(value_with_size) => {
164 value_with_size.value.safe_to_jsval(cx, rval, can_gc)
165 },
166 EnqueuedValue::CloseSentinel => {
167 unreachable!("The close sentinel is never made available as a js val.")
168 },
169 }
170 }
171}
172
173fn is_non_negative_number(value: &EnqueuedValue) -> bool {
175 let value_with_size = match value {
176 EnqueuedValue::Native(_) => return true,
177 EnqueuedValue::Js(value_with_size) => value_with_size,
178 EnqueuedValue::CloseSentinel => return true,
179 };
180
181 if value_with_size.size.is_nan() {
186 return false;
187 }
188
189 if value_with_size.size.is_sign_negative() {
191 return false;
192 }
193
194 true
195}
196
197#[derive(Default, JSTraceable, MallocSizeOf)]
199#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
200pub(crate) struct QueueWithSizes {
201 queue: RefCell<VecDeque<EnqueuedValue>>,
202 pub(crate) total_size: Cell<f64>,
204}
205
206impl QueueWithSizes {
207 pub(crate) fn dequeue_value(
211 &self,
212 cx: SafeJSContext,
213 rval: Option<MutableHandleValue>,
214 can_gc: CanGc,
215 ) {
216 {
217 let queue = self.queue.borrow();
218 let Some(value) = queue.front() else {
219 unreachable!("Buffer cannot be empty when dequeue value is called into.");
220 };
221 self.total_size.set(self.total_size.get() - value.size());
222 if let Some(rval) = rval {
223 value.to_jsval(cx, rval, can_gc);
224 } else {
225 assert_eq!(value, &EnqueuedValue::CloseSentinel);
226 }
227 }
228 self.queue.borrow_mut().pop_front();
229 }
230
231 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
233 pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
234 if !is_non_negative_number(&value) {
236 return Err(Error::Range(
237 c"The size of the enqueued chunk is not a non-negative number.".to_owned(),
238 ));
239 }
240
241 if value.size().is_infinite() {
243 return Err(Error::Range(
244 c"The size of the enqueued chunk is infinite.".to_owned(),
245 ));
246 }
247
248 self.total_size.set(self.total_size.get() + value.size());
249 self.queue.borrow_mut().push_back(value);
250
251 Ok(())
252 }
253
254 pub(crate) fn is_empty(&self) -> bool {
255 self.queue.borrow().is_empty()
256 }
257
258 pub(crate) fn peek_queue_value(
261 &self,
262 cx: SafeJSContext,
263 rval: MutableHandleValue,
264 can_gc: CanGc,
265 ) -> bool {
266 assert!(!self.is_empty());
271
272 let queue = self.queue.borrow();
274 let value_with_size = queue.front().expect("Queue is not empty.");
275 if let EnqueuedValue::CloseSentinel = value_with_size {
276 return true;
277 }
278
279 value_with_size.to_jsval(cx, rval, can_gc);
281 false
282 }
283
284 fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
286 self.queue
287 .borrow()
288 .iter()
289 .try_fold(Vec::new(), |mut acc, value| match value {
290 EnqueuedValue::Native(chunk) => {
291 acc.extend(chunk.iter().copied());
292 Some(acc)
293 },
294 _ => {
295 warn!("get_in_memory_bytes called on a controller with non-native source.");
296 None
297 },
298 })
299 }
300
301 pub(crate) fn reset(&self) {
303 self.queue.borrow_mut().clear();
304 self.total_size.set(Default::default());
305 }
306}
307
308#[dom_struct]
310pub(crate) struct ReadableStreamDefaultController {
311 reflector_: Reflector,
312
313 queue: QueueWithSizes,
315
316 underlying_source: MutNullableDom<UnderlyingSourceContainer>,
322
323 stream: MutNullableDom<ReadableStream>,
324
325 strategy_hwm: f64,
327
328 #[ignore_malloc_size_of = "mozjs"]
330 strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
331
332 close_requested: Cell<bool>,
334
335 started: Cell<bool>,
337
338 pulling: Cell<bool>,
340
341 pull_again: Cell<bool>,
343}
344
345impl ReadableStreamDefaultController {
346 fn new_inherited(
347 global: &GlobalScope,
348 underlying_source_type: UnderlyingSourceType,
349 strategy_hwm: f64,
350 strategy_size: Rc<QueuingStrategySize>,
351 can_gc: CanGc,
352 ) -> ReadableStreamDefaultController {
353 ReadableStreamDefaultController {
354 reflector_: Reflector::new(),
355 queue: Default::default(),
356 stream: MutNullableDom::new(None),
357 underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
358 global,
359 underlying_source_type,
360 can_gc,
361 ))),
362 strategy_hwm,
363 strategy_size: RefCell::new(Some(strategy_size)),
364 close_requested: Default::default(),
365 started: Default::default(),
366 pulling: Default::default(),
367 pull_again: Default::default(),
368 }
369 }
370
371 pub(crate) fn new(
372 global: &GlobalScope,
373 underlying_source: UnderlyingSourceType,
374 strategy_hwm: f64,
375 strategy_size: Rc<QueuingStrategySize>,
376 can_gc: CanGc,
377 ) -> DomRoot<ReadableStreamDefaultController> {
378 reflect_dom_object(
379 Box::new(ReadableStreamDefaultController::new_inherited(
380 global,
381 underlying_source,
382 strategy_hwm,
383 strategy_size,
384 can_gc,
385 )),
386 global,
387 can_gc,
388 )
389 }
390
391 pub(crate) fn setup(
393 &self,
394 cx: &mut JSContext,
395 stream: DomRoot<ReadableStream>,
396 ) -> Result<(), Error> {
397 stream.assert_no_controller();
399
400 self.stream.set(Some(&stream));
402
403 let global = &*self.global();
404 let rooted_default_controller = DomRoot::from_ref(self);
405
406 stream.set_default_controller(&rooted_default_controller);
419
420 if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
421 let start_result = underlying_source
423 .call_start_algorithm(
424 cx,
425 Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
426 )
427 .unwrap_or_else(|| {
428 let promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
429 Ok(promise)
430 });
431
432 let start_promise = start_result?;
434
435 let handler = PromiseNativeHandler::new(
437 global,
438 Some(Box::new(StartAlgorithmFulfillmentHandler {
439 controller: Dom::from_ref(&rooted_default_controller),
440 })),
441 Some(Box::new(StartAlgorithmRejectionHandler {
442 controller: Dom::from_ref(&rooted_default_controller),
443 })),
444 CanGc::from_cx(cx),
445 );
446 let mut realm = enter_auto_realm(cx, global);
447 let cx = &mut realm.current_realm();
448 start_promise.append_native_handler(cx, &handler);
449 };
450
451 Ok(())
452 }
453
454 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
456 if let Some(underlying_source) = self.underlying_source.get() {
457 underlying_source.set_underlying_source_this_object(this_object);
458 }
459 }
460
461 fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
463 self.queue.dequeue_value(cx, Some(rval), can_gc);
464 }
465
466 fn should_call_pull(&self) -> bool {
468 let Some(stream) = self.stream.get() else {
472 debug!("`should_call_pull` called on a controller without a stream.");
473 return false;
474 };
475
476 if !self.can_close_or_enqueue() {
478 return false;
479 }
480
481 if !self.started.get() {
483 return false;
484 }
485
486 if stream.is_locked() && stream.get_num_read_requests() > 0 {
489 return true;
490 }
491
492 let desired_size = self.get_desired_size().expect("desiredSize is not null.");
495
496 if desired_size > 0. {
497 return true;
498 }
499
500 false
501 }
502
503 fn call_pull_if_needed(&self, cx: &mut JSContext) {
505 if !self.should_call_pull() {
508 return;
509 }
510
511 if self.pulling.get() {
513 self.pull_again.set(true);
515
516 return;
517 }
518
519 self.pulling.set(true);
521
522 let global = self.global();
525 let rooted_default_controller = DomRoot::from_ref(self);
526 let controller =
527 Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
528
529 let Some(underlying_source) = self.underlying_source.get() else {
530 return;
531 };
532 let handler = PromiseNativeHandler::new(
533 &global,
534 Some(Box::new(PullAlgorithmFulfillmentHandler {
535 controller: Dom::from_ref(&rooted_default_controller),
536 })),
537 Some(Box::new(PullAlgorithmRejectionHandler {
538 controller: Dom::from_ref(&rooted_default_controller),
539 })),
540 CanGc::from_cx(cx),
541 );
542
543 let mut realm = enter_auto_realm(cx, &*global);
544 let cx = &mut realm.current_realm();
545
546 let result = underlying_source
547 .call_pull_algorithm(cx, controller)
548 .unwrap_or_else(|| {
549 let promise = Promise::new_resolved(&global, cx.into(), (), CanGc::from_cx(cx));
550 Ok(promise)
551 });
552 let promise = result.unwrap_or_else(|error| {
553 rooted!(&in(cx) let mut rval = UndefinedValue());
554 error.to_jsval(cx.into(), &global, rval.handle_mut(), CanGc::from_cx(cx));
556 Promise::new_rejected(&global, cx.into(), rval.handle(), CanGc::from_cx(cx))
557 });
558 promise.append_native_handler(cx, &handler);
559 }
560
561 pub(crate) fn perform_cancel_steps(
563 &self,
564 cx: &mut JSContext,
565 global: &GlobalScope,
566 reason: SafeHandleValue,
567 ) -> Rc<Promise> {
568 self.queue.reset();
570
571 let underlying_source = self
572 .underlying_source
573 .get()
574 .expect("Controller should have a source when the cancel steps are called into.");
575 let result = underlying_source
577 .call_cancel_algorithm(cx, global, reason)
578 .unwrap_or_else(|| {
579 let promise = Promise::new2(cx, global);
580 promise.resolve_native(&(), CanGc::from_cx(cx));
581 Ok(promise)
582 });
583 let promise = result.unwrap_or_else(|error| {
584 rooted!(&in(cx) let mut rval = UndefinedValue());
585
586 error.to_jsval(cx.into(), global, rval.handle_mut(), CanGc::from_cx(cx));
587 let promise = Promise::new2(cx, global);
588 promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
589 promise
590 });
591
592 self.clear_algorithms();
594
595 promise
597 }
598
599 pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
601 let Some(stream) = self.stream.get() else {
604 return;
605 };
606
607 if !self.queue.is_empty() {
609 rooted!(&in(cx) let mut rval = UndefinedValue());
610 let result = RootedTraceableBox::new(Heap::default());
611 self.dequeue_value(cx.into(), rval.handle_mut(), CanGc::from_cx(cx));
612 result.set(*rval);
613
614 if self.close_requested.get() && self.queue.is_empty() {
616 self.clear_algorithms();
618
619 stream.close(cx);
621 } else {
622 self.call_pull_if_needed(cx);
624 }
625 read_request.chunk_steps(cx, result, &self.global());
627 } else {
628 stream.add_read_request(read_request);
630
631 self.call_pull_if_needed(cx);
633 }
634 }
635
636 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
638 Ok(())
640 }
641
642 #[expect(unsafe_code)]
644 pub(crate) fn enqueue(&self, cx: &mut JSContext, chunk: SafeHandleValue) -> Result<(), Error> {
645 if !self.can_close_or_enqueue() {
647 return Ok(());
648 }
649
650 let stream = self
651 .stream
652 .get()
653 .expect("Controller must have a stream when a chunk is enqueued.");
654
655 if stream.is_locked() && stream.get_num_read_requests() > 0 {
659 stream.fulfill_read_request(cx, chunk, false);
660 } else {
661 let strategy_size = {
666 let reference = self.strategy_size.borrow();
667 reference.clone()
668 };
669 let size = if let Some(strategy_size) = strategy_size {
670 let result = strategy_size.Call__(cx, chunk, ExceptionHandling::Rethrow);
673 match result {
674 Ok(size) => size,
676 Err(error) => {
677 rooted!(&in(cx) let mut rval = UndefinedValue());
679 unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
680
681 self.error(cx, rval.handle());
683
684 return Err(error);
687 },
688 }
689 } else {
690 0.
691 };
692
693 {
694 let res = self
696 .queue
697 .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
698 value: Heap::boxed(chunk.get()),
699 size,
700 }));
701 if let Err(error) = res {
702 throw_dom_exception(cx.into(), &self.global(), error, CanGc::from_cx(cx));
708
709 rooted!(&in(cx) let mut rval = UndefinedValue());
712 unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
713
714 self.error(cx, rval.handle());
716
717 return Err(Error::JSFailed);
721 }
722 }
723 }
724
725 self.call_pull_if_needed(cx);
727
728 Ok(())
729 }
730
731 pub(crate) fn enqueue_native(&self, cx: &mut JSContext, chunk: Vec<u8>) {
734 let stream = self
735 .stream
736 .get()
737 .expect("Controller must have a stream when a chunk is enqueued.");
738 if stream.is_locked() && stream.get_num_read_requests() > 0 {
739 rooted!(&in(cx) let mut rval = UndefinedValue());
740 EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(
741 cx.into(),
742 rval.handle_mut(),
743 CanGc::from_cx(cx),
744 );
745 stream.fulfill_read_request(cx, rval.handle(), false);
746 } else {
747 self.queue
748 .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
749 .expect("Enqueuing a chunk from Rust should not fail.");
750 }
751 }
752
753 pub(crate) fn in_memory(&self) -> bool {
755 let Some(underlying_source) = self.underlying_source.get() else {
756 return false;
757 };
758 underlying_source.in_memory()
759 }
760
761 pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
763 let underlying_source = self.underlying_source.get()?;
764 if underlying_source.in_memory() {
765 return self.queue.get_in_memory_bytes();
766 }
767 None
768 }
769
770 fn clear_algorithms(&self) {
772 self.underlying_source.set(None);
775
776 *self.strategy_size.borrow_mut() = None;
778 }
779
780 pub(crate) fn close(&self, cx: &mut JSContext) {
782 if !self.can_close_or_enqueue() {
784 return;
785 }
786
787 let Some(stream) = self.stream.get() else {
788 return;
789 };
790
791 self.close_requested.set(true);
793
794 if self.queue.is_empty() {
795 self.clear_algorithms();
797
798 stream.close(cx);
800 }
801 }
802
803 pub(crate) fn get_desired_size(&self) -> Option<f64> {
805 let stream = self.stream.get()?;
806
807 if stream.is_errored() {
809 return None;
810 }
811
812 if stream.is_closed() {
814 return Some(0.0);
815 }
816
817 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
819 Some(desired_size.clamp(desired_size, self.strategy_hwm))
820 }
821
822 pub(crate) fn can_close_or_enqueue(&self) -> bool {
824 let Some(stream) = self.stream.get() else {
825 return false;
826 };
827
828 if !self.close_requested.get() && stream.is_readable() {
830 return true;
831 }
832
833 false
835 }
836
837 pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
839 let Some(stream) = self.stream.get() else {
840 return;
841 };
842
843 if !stream.is_readable() {
845 return;
846 }
847
848 self.queue.reset();
850
851 self.clear_algorithms();
853
854 stream.error(cx, e);
855 }
856
857 pub(crate) fn has_backpressure(&self) -> bool {
859 !self.should_call_pull()
862 }
863}
864
865impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
866 for ReadableStreamDefaultController
867{
868 fn GetDesiredSize(&self) -> Option<f64> {
870 self.get_desired_size()
871 }
872
873 fn Close(&self, cx: &mut JSContext) -> Fallible<()> {
875 if !self.can_close_or_enqueue() {
876 return Err(Error::Type(c"Stream cannot be closed.".to_owned()));
879 }
880
881 self.close(cx);
883
884 Ok(())
885 }
886
887 fn Enqueue(&self, cx: &mut JSContext, chunk: SafeHandleValue) -> Fallible<()> {
889 if !self.can_close_or_enqueue() {
891 return Err(Error::Type(c"Stream cannot be enqueued to.".to_owned()));
892 }
893
894 self.enqueue(cx, chunk)
896 }
897
898 fn Error(&self, cx: &mut JSContext, e: SafeHandleValue) -> Fallible<()> {
900 self.error(cx, e);
901 Ok(())
902 }
903}