1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::wrappers::JS_GetPendingException;
15use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
16use js::typedarray::Uint8;
17use script_bindings::conversions::SafeToJSValConvertible;
18
19use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
20use super::bindings::root::Dom;
21use crate::dom::bindings::buffer_source::create_buffer_source;
22use crate::dom::bindings::callback::ExceptionHandling;
23use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
24use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
25use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
26use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
27use crate::dom::bindings::root::{DomRoot, MutNullableDom};
28use crate::dom::bindings::trace::RootedTraceableBox;
29use crate::dom::globalscope::GlobalScope;
30use crate::dom::promise::Promise;
31use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
32use crate::dom::readablestream::ReadableStream;
33use crate::dom::readablestreamdefaultreader::ReadRequest;
34use crate::dom::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
35use crate::realms::{InRealm, enter_realm};
36use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
37
38#[derive(Clone, JSTraceable, MallocSizeOf)]
41#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
42struct PullAlgorithmFulfillmentHandler {
43 controller: Dom<ReadableStreamDefaultController>,
44}
45
46impl Callback for PullAlgorithmFulfillmentHandler {
47 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
50 let can_gc = CanGc::from_cx(cx);
51 self.controller.pulling.set(false);
53
54 if self.controller.pull_again.get() {
56 self.controller.pull_again.set(false);
58
59 self.controller.call_pull_if_needed(can_gc);
61 }
62 }
63}
64
65#[derive(Clone, JSTraceable, MallocSizeOf)]
68#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
69struct PullAlgorithmRejectionHandler {
70 controller: Dom<ReadableStreamDefaultController>,
71}
72
73impl Callback for PullAlgorithmRejectionHandler {
74 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
77 let can_gc = CanGc::from_cx(cx);
78 self.controller.error(v, can_gc);
80 }
81}
82
83#[derive(Clone, JSTraceable, MallocSizeOf)]
86#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
87struct StartAlgorithmFulfillmentHandler {
88 controller: Dom<ReadableStreamDefaultController>,
89}
90
91impl Callback for StartAlgorithmFulfillmentHandler {
92 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
95 let can_gc = CanGc::from_cx(cx);
96 self.controller.started.set(true);
98
99 self.controller.call_pull_if_needed(can_gc);
101 }
102}
103
104#[derive(Clone, JSTraceable, MallocSizeOf)]
107#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
108struct StartAlgorithmRejectionHandler {
109 controller: Dom<ReadableStreamDefaultController>,
110}
111
112impl Callback for StartAlgorithmRejectionHandler {
113 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
116 let can_gc = CanGc::from_cx(cx);
117 self.controller.error(v, can_gc);
119 }
120}
121
122#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
124#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
125pub(crate) struct ValueWithSize {
126 #[ignore_malloc_size_of = "Heap is measured by mozjs"]
128 pub(crate) value: Box<Heap<JSVal>>,
129 pub(crate) size: f64,
131}
132
133#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
135#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
136pub(crate) enum EnqueuedValue {
137 Native(Box<[u8]>),
139 Js(ValueWithSize),
141 CloseSentinel,
143}
144
145impl EnqueuedValue {
146 fn size(&self) -> f64 {
147 match self {
148 EnqueuedValue::Native(v) => v.len() as f64,
149 EnqueuedValue::Js(v) => v.size,
150 EnqueuedValue::CloseSentinel => 0.,
153 }
154 }
155
156 fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
157 match self {
158 EnqueuedValue::Native(chunk) => {
159 rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
160 create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
161 .expect("failed to create buffer source for native chunk.");
162 array_buffer_ptr.safe_to_jsval(cx, rval, can_gc);
163 },
164 EnqueuedValue::Js(value_with_size) => {
165 value_with_size.value.safe_to_jsval(cx, rval, can_gc)
166 },
167 EnqueuedValue::CloseSentinel => {
168 unreachable!("The close sentinel is never made available as a js val.")
169 },
170 }
171 }
172}
173
174fn is_non_negative_number(value: &EnqueuedValue) -> bool {
176 let value_with_size = match value {
177 EnqueuedValue::Native(_) => return true,
178 EnqueuedValue::Js(value_with_size) => value_with_size,
179 EnqueuedValue::CloseSentinel => return true,
180 };
181
182 if value_with_size.size.is_nan() {
187 return false;
188 }
189
190 if value_with_size.size.is_sign_negative() {
192 return false;
193 }
194
195 true
196}
197
198#[derive(Default, JSTraceable, MallocSizeOf)]
200#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
201pub(crate) struct QueueWithSizes {
202 queue: RefCell<VecDeque<EnqueuedValue>>,
203 pub(crate) total_size: Cell<f64>,
205}
206
207impl QueueWithSizes {
208 pub(crate) fn dequeue_value(
212 &self,
213 cx: SafeJSContext,
214 rval: Option<MutableHandleValue>,
215 can_gc: CanGc,
216 ) {
217 {
218 let queue = self.queue.borrow();
219 let Some(value) = queue.front() else {
220 unreachable!("Buffer cannot be empty when dequeue value is called into.");
221 };
222 self.total_size.set(self.total_size.get() - value.size());
223 if let Some(rval) = rval {
224 value.to_jsval(cx, rval, can_gc);
225 } else {
226 assert_eq!(value, &EnqueuedValue::CloseSentinel);
227 }
228 }
229 self.queue.borrow_mut().pop_front();
230 }
231
232 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
234 pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
235 if !is_non_negative_number(&value) {
237 return Err(Error::Range(
238 "The size of the enqueued chunk is not a non-negative number.".to_string(),
239 ));
240 }
241
242 if value.size().is_infinite() {
244 return Err(Error::Range(
245 "The size of the enqueued chunk is infinite.".to_string(),
246 ));
247 }
248
249 self.total_size.set(self.total_size.get() + value.size());
250 self.queue.borrow_mut().push_back(value);
251
252 Ok(())
253 }
254
255 pub(crate) fn is_empty(&self) -> bool {
256 self.queue.borrow().is_empty()
257 }
258
259 pub(crate) fn peek_queue_value(
262 &self,
263 cx: SafeJSContext,
264 rval: MutableHandleValue,
265 can_gc: CanGc,
266 ) -> bool {
267 assert!(!self.is_empty());
272
273 let queue = self.queue.borrow();
275 let value_with_size = queue.front().expect("Queue is not empty.");
276 if let EnqueuedValue::CloseSentinel = value_with_size {
277 return true;
278 }
279
280 value_with_size.to_jsval(cx, rval, can_gc);
282 false
283 }
284
285 fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
287 self.queue
288 .borrow()
289 .iter()
290 .try_fold(Vec::new(), |mut acc, value| match value {
291 EnqueuedValue::Native(chunk) => {
292 acc.extend(chunk.iter().copied());
293 Some(acc)
294 },
295 _ => {
296 warn!("get_in_memory_bytes called on a controller with non-native source.");
297 None
298 },
299 })
300 }
301
302 pub(crate) fn reset(&self) {
304 self.queue.borrow_mut().clear();
305 self.total_size.set(Default::default());
306 }
307}
308
309#[dom_struct]
311pub(crate) struct ReadableStreamDefaultController {
312 reflector_: Reflector,
313
314 queue: QueueWithSizes,
316
317 underlying_source: MutNullableDom<UnderlyingSourceContainer>,
323
324 stream: MutNullableDom<ReadableStream>,
325
326 strategy_hwm: f64,
328
329 #[ignore_malloc_size_of = "mozjs"]
331 strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
332
333 close_requested: Cell<bool>,
335
336 started: Cell<bool>,
338
339 pulling: Cell<bool>,
341
342 pull_again: Cell<bool>,
344}
345
346impl ReadableStreamDefaultController {
347 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
348 fn new_inherited(
349 global: &GlobalScope,
350 underlying_source_type: UnderlyingSourceType,
351 strategy_hwm: f64,
352 strategy_size: Rc<QueuingStrategySize>,
353 can_gc: CanGc,
354 ) -> ReadableStreamDefaultController {
355 ReadableStreamDefaultController {
356 reflector_: Reflector::new(),
357 queue: Default::default(),
358 stream: MutNullableDom::new(None),
359 underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
360 global,
361 underlying_source_type,
362 can_gc,
363 ))),
364 strategy_hwm,
365 strategy_size: RefCell::new(Some(strategy_size)),
366 close_requested: Default::default(),
367 started: Default::default(),
368 pulling: Default::default(),
369 pull_again: Default::default(),
370 }
371 }
372
373 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
374 pub(crate) fn new(
375 global: &GlobalScope,
376 underlying_source: UnderlyingSourceType,
377 strategy_hwm: f64,
378 strategy_size: Rc<QueuingStrategySize>,
379 can_gc: CanGc,
380 ) -> DomRoot<ReadableStreamDefaultController> {
381 reflect_dom_object(
382 Box::new(ReadableStreamDefaultController::new_inherited(
383 global,
384 underlying_source,
385 strategy_hwm,
386 strategy_size,
387 can_gc,
388 )),
389 global,
390 can_gc,
391 )
392 }
393
394 pub(crate) fn setup(
396 &self,
397 stream: DomRoot<ReadableStream>,
398 can_gc: CanGc,
399 ) -> Result<(), Error> {
400 stream.assert_no_controller();
402
403 self.stream.set(Some(&stream));
405
406 let global = &*self.global();
407 let rooted_default_controller = DomRoot::from_ref(self);
408
409 stream.set_default_controller(&rooted_default_controller);
422
423 if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
424 let start_result = underlying_source
426 .call_start_algorithm(
427 Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
428 can_gc,
429 )
430 .unwrap_or_else(|| {
431 let promise = Promise::new(global, can_gc);
432 promise.resolve_native(&(), can_gc);
433 Ok(promise)
434 });
435
436 let start_promise = start_result?;
438
439 let handler = PromiseNativeHandler::new(
441 global,
442 Some(Box::new(StartAlgorithmFulfillmentHandler {
443 controller: Dom::from_ref(&rooted_default_controller),
444 })),
445 Some(Box::new(StartAlgorithmRejectionHandler {
446 controller: Dom::from_ref(&rooted_default_controller),
447 })),
448 can_gc,
449 );
450 let realm = enter_realm(global);
451 let comp = InRealm::Entered(&realm);
452 start_promise.append_native_handler(&handler, comp, can_gc);
453 };
454
455 Ok(())
456 }
457
458 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
460 if let Some(underlying_source) = self.underlying_source.get() {
461 underlying_source.set_underlying_source_this_object(this_object);
462 }
463 }
464
465 fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
467 self.queue.dequeue_value(cx, Some(rval), can_gc);
468 }
469
470 fn should_call_pull(&self) -> bool {
472 let Some(stream) = self.stream.get() else {
476 debug!("`should_call_pull` called on a controller without a stream.");
477 return false;
478 };
479
480 if !self.can_close_or_enqueue() {
482 return false;
483 }
484
485 if !self.started.get() {
487 return false;
488 }
489
490 if stream.is_locked() && stream.get_num_read_requests() > 0 {
493 return true;
494 }
495
496 let desired_size = self.get_desired_size().expect("desiredSize is not null.");
499
500 if desired_size > 0. {
501 return true;
502 }
503
504 false
505 }
506
507 fn call_pull_if_needed(&self, can_gc: CanGc) {
509 if !self.should_call_pull() {
512 return;
513 }
514
515 if self.pulling.get() {
517 self.pull_again.set(true);
519
520 return;
521 }
522
523 self.pulling.set(true);
525
526 let global = self.global();
529 let rooted_default_controller = DomRoot::from_ref(self);
530 let controller =
531 Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
532
533 let Some(underlying_source) = self.underlying_source.get() else {
534 return;
535 };
536 let handler = PromiseNativeHandler::new(
537 &global,
538 Some(Box::new(PullAlgorithmFulfillmentHandler {
539 controller: Dom::from_ref(&rooted_default_controller),
540 })),
541 Some(Box::new(PullAlgorithmRejectionHandler {
542 controller: Dom::from_ref(&rooted_default_controller),
543 })),
544 can_gc,
545 );
546
547 let realm = enter_realm(&*global);
548 let comp = InRealm::Entered(&realm);
549 let result = underlying_source
550 .call_pull_algorithm(controller, &global, can_gc)
551 .unwrap_or_else(|| {
552 let promise = Promise::new(&global, can_gc);
553 promise.resolve_native(&(), can_gc);
554 Ok(promise)
555 });
556 let promise = result.unwrap_or_else(|error| {
557 let cx = GlobalScope::get_cx();
558 rooted!(in(*cx) let mut rval = UndefinedValue());
559 error
561 .clone()
562 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
563 let promise = Promise::new(&global, can_gc);
564 promise.reject_native(&rval.handle(), can_gc);
565 promise
566 });
567 promise.append_native_handler(&handler, comp, can_gc);
568 }
569
570 pub(crate) fn perform_cancel_steps(
572 &self,
573 cx: SafeJSContext,
574 global: &GlobalScope,
575 reason: SafeHandleValue,
576 can_gc: CanGc,
577 ) -> Rc<Promise> {
578 self.queue.reset();
580
581 let underlying_source = self
582 .underlying_source
583 .get()
584 .expect("Controller should have a source when the cancel steps are called into.");
585 let result = underlying_source
587 .call_cancel_algorithm(cx, global, reason, can_gc)
588 .unwrap_or_else(|| {
589 let promise = Promise::new(global, can_gc);
590 promise.resolve_native(&(), can_gc);
591 Ok(promise)
592 });
593 let promise = result.unwrap_or_else(|error| {
594 rooted!(in(*cx) let mut rval = UndefinedValue());
595
596 error
597 .clone()
598 .to_jsval(cx, global, rval.handle_mut(), can_gc);
599 let promise = Promise::new(global, can_gc);
600 promise.reject_native(&rval.handle(), can_gc);
601 promise
602 });
603
604 self.clear_algorithms();
606
607 promise
609 }
610
611 pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
613 let Some(stream) = self.stream.get() else {
616 return;
617 };
618
619 if !self.queue.is_empty() {
621 let cx = GlobalScope::get_cx();
622 rooted!(in(*cx) let mut rval = UndefinedValue());
623 let result = RootedTraceableBox::new(Heap::default());
624 self.dequeue_value(cx, rval.handle_mut(), can_gc);
625 result.set(*rval);
626
627 if self.close_requested.get() && self.queue.is_empty() {
629 self.clear_algorithms();
631
632 stream.close(can_gc);
634 } else {
635 self.call_pull_if_needed(can_gc);
637 }
638 read_request.chunk_steps(result, &self.global(), can_gc);
640 } else {
641 stream.add_read_request(read_request);
643
644 self.call_pull_if_needed(can_gc);
646 }
647 }
648
649 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
651 Ok(())
653 }
654
655 #[expect(unsafe_code)]
657 pub(crate) fn enqueue(
658 &self,
659 cx: SafeJSContext,
660 chunk: SafeHandleValue,
661 can_gc: CanGc,
662 ) -> Result<(), Error> {
663 if !self.can_close_or_enqueue() {
665 return Ok(());
666 }
667
668 let stream = self
669 .stream
670 .get()
671 .expect("Controller must have a stream when a chunk is enqueued.");
672
673 if stream.is_locked() && stream.get_num_read_requests() > 0 {
677 stream.fulfill_read_request(chunk, false, can_gc);
678 } else {
679 let strategy_size = {
684 let reference = self.strategy_size.borrow();
685 reference.clone()
686 };
687 let size = if let Some(strategy_size) = strategy_size {
688 let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
691 match result {
692 Ok(size) => size,
694 Err(error) => {
695 rooted!(in(*cx) let mut rval = UndefinedValue());
697 unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
698
699 self.error(rval.handle(), can_gc);
701
702 return Err(error);
705 },
706 }
707 } else {
708 0.
709 };
710
711 {
712 let res = self
714 .queue
715 .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
716 value: Heap::boxed(chunk.get()),
717 size,
718 }));
719 if let Err(error) = res {
720 throw_dom_exception(cx, &self.global(), error, can_gc);
726
727 rooted!(in(*cx) let mut rval = UndefinedValue());
730 unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
731
732 self.error(rval.handle(), can_gc);
734
735 return Err(Error::JSFailed);
739 }
740 }
741 }
742
743 self.call_pull_if_needed(can_gc);
745
746 Ok(())
747 }
748
749 pub(crate) fn enqueue_native(&self, chunk: Vec<u8>, can_gc: CanGc) {
752 let stream = self
753 .stream
754 .get()
755 .expect("Controller must have a stream when a chunk is enqueued.");
756 if stream.is_locked() && stream.get_num_read_requests() > 0 {
757 let cx = GlobalScope::get_cx();
758 rooted!(in(*cx) let mut rval = UndefinedValue());
759 EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut(), can_gc);
760 stream.fulfill_read_request(rval.handle(), false, can_gc);
761 } else {
762 self.queue
763 .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
764 .expect("Enqueuing a chunk from Rust should not fail.");
765 }
766 }
767
768 pub(crate) fn in_memory(&self) -> bool {
770 let Some(underlying_source) = self.underlying_source.get() else {
771 return false;
772 };
773 underlying_source.in_memory()
774 }
775
776 pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
778 let underlying_source = self.underlying_source.get()?;
779 if underlying_source.in_memory() {
780 return self.queue.get_in_memory_bytes();
781 }
782 None
783 }
784
785 fn clear_algorithms(&self) {
787 self.underlying_source.set(None);
790
791 *self.strategy_size.borrow_mut() = None;
793 }
794
795 pub(crate) fn close(&self, can_gc: CanGc) {
797 if !self.can_close_or_enqueue() {
799 return;
800 }
801
802 let Some(stream) = self.stream.get() else {
803 return;
804 };
805
806 self.close_requested.set(true);
808
809 if self.queue.is_empty() {
810 self.clear_algorithms();
812
813 stream.close(can_gc);
815 }
816 }
817
818 pub(crate) fn get_desired_size(&self) -> Option<f64> {
820 let stream = self.stream.get()?;
821
822 if stream.is_errored() {
824 return None;
825 }
826
827 if stream.is_closed() {
829 return Some(0.0);
830 }
831
832 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
834 Some(desired_size.clamp(desired_size, self.strategy_hwm))
835 }
836
837 pub(crate) fn can_close_or_enqueue(&self) -> bool {
839 let Some(stream) = self.stream.get() else {
840 return false;
841 };
842
843 if !self.close_requested.get() && stream.is_readable() {
845 return true;
846 }
847
848 false
850 }
851
852 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
854 let Some(stream) = self.stream.get() else {
855 return;
856 };
857
858 if !stream.is_readable() {
860 return;
861 }
862
863 self.queue.reset();
865
866 self.clear_algorithms();
868
869 stream.error(e, can_gc);
870 }
871
872 pub(crate) fn has_backpressure(&self) -> bool {
874 !self.should_call_pull()
877 }
878}
879
880impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
881 for ReadableStreamDefaultController
882{
883 fn GetDesiredSize(&self) -> Option<f64> {
885 self.get_desired_size()
886 }
887
888 fn Close(&self, can_gc: CanGc) -> Fallible<()> {
890 if !self.can_close_or_enqueue() {
891 return Err(Error::Type("Stream cannot be closed.".to_string()));
894 }
895
896 self.close(can_gc);
898
899 Ok(())
900 }
901
902 fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
904 if !self.can_close_or_enqueue() {
906 return Err(Error::Type("Stream cannot be enqueued to.".to_string()));
907 }
908
909 self.enqueue(cx, chunk, can_gc)
911 }
912
913 fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
915 self.error(e, can_gc);
916 Ok(())
917 }
918}