1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::wrappers::JS_GetPendingException;
15use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
16use js::typedarray::Uint8;
17use script_bindings::conversions::SafeToJSValConvertible;
18
19use crate::dom::bindings::buffer_source::create_buffer_source;
20use crate::dom::bindings::callback::ExceptionHandling;
21use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
22use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
23use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
25use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::stream::readablestream::ReadableStream;
32use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
33use crate::dom::stream::underlyingsourcecontainer::{
34 UnderlyingSourceContainer, UnderlyingSourceType,
35};
36use crate::realms::{InRealm, enter_realm};
37use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
38
39#[derive(Clone, JSTraceable, MallocSizeOf)]
42#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
43struct PullAlgorithmFulfillmentHandler {
44 controller: Dom<ReadableStreamDefaultController>,
45}
46
47impl Callback for PullAlgorithmFulfillmentHandler {
48 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
51 let can_gc = CanGc::from_cx(cx);
52 self.controller.pulling.set(false);
54
55 if self.controller.pull_again.get() {
57 self.controller.pull_again.set(false);
59
60 self.controller.call_pull_if_needed(can_gc);
62 }
63 }
64}
65
66#[derive(Clone, JSTraceable, MallocSizeOf)]
69#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
70struct PullAlgorithmRejectionHandler {
71 controller: Dom<ReadableStreamDefaultController>,
72}
73
74impl Callback for PullAlgorithmRejectionHandler {
75 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
78 let can_gc = CanGc::from_cx(cx);
79 self.controller.error(v, can_gc);
81 }
82}
83
84#[derive(Clone, JSTraceable, MallocSizeOf)]
87#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
88struct StartAlgorithmFulfillmentHandler {
89 controller: Dom<ReadableStreamDefaultController>,
90}
91
92impl Callback for StartAlgorithmFulfillmentHandler {
93 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
96 let can_gc = CanGc::from_cx(cx);
97 self.controller.started.set(true);
99
100 self.controller.call_pull_if_needed(can_gc);
102 }
103}
104
105#[derive(Clone, JSTraceable, MallocSizeOf)]
108#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
109struct StartAlgorithmRejectionHandler {
110 controller: Dom<ReadableStreamDefaultController>,
111}
112
113impl Callback for StartAlgorithmRejectionHandler {
114 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
117 let can_gc = CanGc::from_cx(cx);
118 self.controller.error(v, can_gc);
120 }
121}
122
/// A JS chunk stored in a queue together with the size recorded for it.
#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct ValueWithSize {
    /// The chunk itself, held in a boxed `Heap`.
    #[ignore_malloc_size_of = "Heap is measured by mozjs"]
    pub(crate) value: Box<Heap<JSVal>>,
    /// Size attributed to the chunk; this is what `EnqueuedValue::size`
    /// reports for a `Js` entry.
    pub(crate) size: f64,
}
133
/// One entry of a [`QueueWithSizes`].
#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum EnqueuedValue {
    /// Raw bytes; sized by their byte length.
    Native(Box<[u8]>),
    /// A JS value carrying an explicit size.
    Js(ValueWithSize),
    /// Marker entry of size zero; it is never converted to a JS value.
    CloseSentinel,
}
145
146impl EnqueuedValue {
147 fn size(&self) -> f64 {
148 match self {
149 EnqueuedValue::Native(v) => v.len() as f64,
150 EnqueuedValue::Js(v) => v.size,
151 EnqueuedValue::CloseSentinel => 0.,
154 }
155 }
156
157 fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
158 match self {
159 EnqueuedValue::Native(chunk) => {
160 rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
161 create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
162 .expect("failed to create buffer source for native chunk.");
163 array_buffer_ptr.safe_to_jsval(cx, rval, can_gc);
164 },
165 EnqueuedValue::Js(value_with_size) => {
166 value_with_size.value.safe_to_jsval(cx, rval, can_gc)
167 },
168 EnqueuedValue::CloseSentinel => {
169 unreachable!("The close sentinel is never made available as a js val.")
170 },
171 }
172 }
173}
174
175fn is_non_negative_number(value: &EnqueuedValue) -> bool {
177 let value_with_size = match value {
178 EnqueuedValue::Native(_) => return true,
179 EnqueuedValue::Js(value_with_size) => value_with_size,
180 EnqueuedValue::CloseSentinel => return true,
181 };
182
183 if value_with_size.size.is_nan() {
188 return false;
189 }
190
191 if value_with_size.size.is_sign_negative() {
193 return false;
194 }
195
196 true
197}
198
/// A FIFO of [`EnqueuedValue`]s together with the running sum of their sizes.
#[derive(Default, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct QueueWithSizes {
    /// Entries in insertion order; values are pushed at the back and popped
    /// from the front.
    queue: RefCell<VecDeque<EnqueuedValue>>,
    /// Sum of the sizes of all queued entries, updated on enqueue, dequeue
    /// and reset.
    pub(crate) total_size: Cell<f64>,
}
207
208impl QueueWithSizes {
209 pub(crate) fn dequeue_value(
213 &self,
214 cx: SafeJSContext,
215 rval: Option<MutableHandleValue>,
216 can_gc: CanGc,
217 ) {
218 {
219 let queue = self.queue.borrow();
220 let Some(value) = queue.front() else {
221 unreachable!("Buffer cannot be empty when dequeue value is called into.");
222 };
223 self.total_size.set(self.total_size.get() - value.size());
224 if let Some(rval) = rval {
225 value.to_jsval(cx, rval, can_gc);
226 } else {
227 assert_eq!(value, &EnqueuedValue::CloseSentinel);
228 }
229 }
230 self.queue.borrow_mut().pop_front();
231 }
232
233 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
235 pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
236 if !is_non_negative_number(&value) {
238 return Err(Error::Range(
239 "The size of the enqueued chunk is not a non-negative number.".to_string(),
240 ));
241 }
242
243 if value.size().is_infinite() {
245 return Err(Error::Range(
246 "The size of the enqueued chunk is infinite.".to_string(),
247 ));
248 }
249
250 self.total_size.set(self.total_size.get() + value.size());
251 self.queue.borrow_mut().push_back(value);
252
253 Ok(())
254 }
255
256 pub(crate) fn is_empty(&self) -> bool {
257 self.queue.borrow().is_empty()
258 }
259
260 pub(crate) fn peek_queue_value(
263 &self,
264 cx: SafeJSContext,
265 rval: MutableHandleValue,
266 can_gc: CanGc,
267 ) -> bool {
268 assert!(!self.is_empty());
273
274 let queue = self.queue.borrow();
276 let value_with_size = queue.front().expect("Queue is not empty.");
277 if let EnqueuedValue::CloseSentinel = value_with_size {
278 return true;
279 }
280
281 value_with_size.to_jsval(cx, rval, can_gc);
283 false
284 }
285
286 fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
288 self.queue
289 .borrow()
290 .iter()
291 .try_fold(Vec::new(), |mut acc, value| match value {
292 EnqueuedValue::Native(chunk) => {
293 acc.extend(chunk.iter().copied());
294 Some(acc)
295 },
296 _ => {
297 warn!("get_in_memory_bytes called on a controller with non-native source.");
298 None
299 },
300 })
301 }
302
303 pub(crate) fn reset(&self) {
305 self.queue.borrow_mut().clear();
306 self.total_size.set(Default::default());
307 }
308}
309
/// Controller that queues chunks for a [`ReadableStream`] and decides when
/// its underlying source should be pulled.
#[dom_struct]
pub(crate) struct ReadableStreamDefaultController {
    reflector_: Reflector,

    /// Chunks that have been enqueued but not yet handed to a reader.
    queue: QueueWithSizes,

    /// Container for the source's algorithms; set to `None` by
    /// `clear_algorithms`.
    underlying_source: MutNullableDom<UnderlyingSourceContainer>,

    /// The stream this controller is attached to; set in `setup`.
    stream: MutNullableDom<ReadableStream>,

    /// High water mark from which the queued total is subtracted to compute
    /// the desired size.
    strategy_hwm: f64,

    /// Callback used by `enqueue` to compute the size of a chunk; set to
    /// `None` by `clear_algorithms`.
    #[ignore_malloc_size_of = "mozjs"]
    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,

    /// Set by `close`; once true, closing and enqueuing are no longer allowed.
    close_requested: Cell<bool>,

    /// Set once the start algorithm's promise has fulfilled.
    started: Cell<bool>,

    /// True while a pull is in flight.
    pulling: Cell<bool>,

    /// True when a pull was requested while another one was in flight.
    pull_again: Cell<bool>,
}
346
347impl ReadableStreamDefaultController {
348 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
349 fn new_inherited(
350 global: &GlobalScope,
351 underlying_source_type: UnderlyingSourceType,
352 strategy_hwm: f64,
353 strategy_size: Rc<QueuingStrategySize>,
354 can_gc: CanGc,
355 ) -> ReadableStreamDefaultController {
356 ReadableStreamDefaultController {
357 reflector_: Reflector::new(),
358 queue: Default::default(),
359 stream: MutNullableDom::new(None),
360 underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
361 global,
362 underlying_source_type,
363 can_gc,
364 ))),
365 strategy_hwm,
366 strategy_size: RefCell::new(Some(strategy_size)),
367 close_requested: Default::default(),
368 started: Default::default(),
369 pulling: Default::default(),
370 pull_again: Default::default(),
371 }
372 }
373
374 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
375 pub(crate) fn new(
376 global: &GlobalScope,
377 underlying_source: UnderlyingSourceType,
378 strategy_hwm: f64,
379 strategy_size: Rc<QueuingStrategySize>,
380 can_gc: CanGc,
381 ) -> DomRoot<ReadableStreamDefaultController> {
382 reflect_dom_object(
383 Box::new(ReadableStreamDefaultController::new_inherited(
384 global,
385 underlying_source,
386 strategy_hwm,
387 strategy_size,
388 can_gc,
389 )),
390 global,
391 can_gc,
392 )
393 }
394
395 pub(crate) fn setup(
397 &self,
398 stream: DomRoot<ReadableStream>,
399 can_gc: CanGc,
400 ) -> Result<(), Error> {
401 stream.assert_no_controller();
403
404 self.stream.set(Some(&stream));
406
407 let global = &*self.global();
408 let rooted_default_controller = DomRoot::from_ref(self);
409
410 stream.set_default_controller(&rooted_default_controller);
423
424 if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
425 let start_result = underlying_source
427 .call_start_algorithm(
428 Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
429 can_gc,
430 )
431 .unwrap_or_else(|| {
432 let promise = Promise::new(global, can_gc);
433 promise.resolve_native(&(), can_gc);
434 Ok(promise)
435 });
436
437 let start_promise = start_result?;
439
440 let handler = PromiseNativeHandler::new(
442 global,
443 Some(Box::new(StartAlgorithmFulfillmentHandler {
444 controller: Dom::from_ref(&rooted_default_controller),
445 })),
446 Some(Box::new(StartAlgorithmRejectionHandler {
447 controller: Dom::from_ref(&rooted_default_controller),
448 })),
449 can_gc,
450 );
451 let realm = enter_realm(global);
452 let comp = InRealm::Entered(&realm);
453 start_promise.append_native_handler(&handler, comp, can_gc);
454 };
455
456 Ok(())
457 }
458
459 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
461 if let Some(underlying_source) = self.underlying_source.get() {
462 underlying_source.set_underlying_source_this_object(this_object);
463 }
464 }
465
466 fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
468 self.queue.dequeue_value(cx, Some(rval), can_gc);
469 }
470
471 fn should_call_pull(&self) -> bool {
473 let Some(stream) = self.stream.get() else {
477 debug!("`should_call_pull` called on a controller without a stream.");
478 return false;
479 };
480
481 if !self.can_close_or_enqueue() {
483 return false;
484 }
485
486 if !self.started.get() {
488 return false;
489 }
490
491 if stream.is_locked() && stream.get_num_read_requests() > 0 {
494 return true;
495 }
496
497 let desired_size = self.get_desired_size().expect("desiredSize is not null.");
500
501 if desired_size > 0. {
502 return true;
503 }
504
505 false
506 }
507
508 fn call_pull_if_needed(&self, can_gc: CanGc) {
510 if !self.should_call_pull() {
513 return;
514 }
515
516 if self.pulling.get() {
518 self.pull_again.set(true);
520
521 return;
522 }
523
524 self.pulling.set(true);
526
527 let global = self.global();
530 let rooted_default_controller = DomRoot::from_ref(self);
531 let controller =
532 Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
533
534 let Some(underlying_source) = self.underlying_source.get() else {
535 return;
536 };
537 let handler = PromiseNativeHandler::new(
538 &global,
539 Some(Box::new(PullAlgorithmFulfillmentHandler {
540 controller: Dom::from_ref(&rooted_default_controller),
541 })),
542 Some(Box::new(PullAlgorithmRejectionHandler {
543 controller: Dom::from_ref(&rooted_default_controller),
544 })),
545 can_gc,
546 );
547
548 let realm = enter_realm(&*global);
549 let comp = InRealm::Entered(&realm);
550 let result = underlying_source
551 .call_pull_algorithm(controller, &global, can_gc)
552 .unwrap_or_else(|| {
553 let promise = Promise::new(&global, can_gc);
554 promise.resolve_native(&(), can_gc);
555 Ok(promise)
556 });
557 let promise = result.unwrap_or_else(|error| {
558 let cx = GlobalScope::get_cx();
559 rooted!(in(*cx) let mut rval = UndefinedValue());
560 error
562 .clone()
563 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
564 let promise = Promise::new(&global, can_gc);
565 promise.reject_native(&rval.handle(), can_gc);
566 promise
567 });
568 promise.append_native_handler(&handler, comp, can_gc);
569 }
570
571 pub(crate) fn perform_cancel_steps(
573 &self,
574 cx: SafeJSContext,
575 global: &GlobalScope,
576 reason: SafeHandleValue,
577 can_gc: CanGc,
578 ) -> Rc<Promise> {
579 self.queue.reset();
581
582 let underlying_source = self
583 .underlying_source
584 .get()
585 .expect("Controller should have a source when the cancel steps are called into.");
586 let result = underlying_source
588 .call_cancel_algorithm(cx, global, reason, can_gc)
589 .unwrap_or_else(|| {
590 let promise = Promise::new(global, can_gc);
591 promise.resolve_native(&(), can_gc);
592 Ok(promise)
593 });
594 let promise = result.unwrap_or_else(|error| {
595 rooted!(in(*cx) let mut rval = UndefinedValue());
596
597 error
598 .clone()
599 .to_jsval(cx, global, rval.handle_mut(), can_gc);
600 let promise = Promise::new(global, can_gc);
601 promise.reject_native(&rval.handle(), can_gc);
602 promise
603 });
604
605 self.clear_algorithms();
607
608 promise
610 }
611
612 pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
614 let Some(stream) = self.stream.get() else {
617 return;
618 };
619
620 if !self.queue.is_empty() {
622 let cx = GlobalScope::get_cx();
623 rooted!(in(*cx) let mut rval = UndefinedValue());
624 let result = RootedTraceableBox::new(Heap::default());
625 self.dequeue_value(cx, rval.handle_mut(), can_gc);
626 result.set(*rval);
627
628 if self.close_requested.get() && self.queue.is_empty() {
630 self.clear_algorithms();
632
633 stream.close(can_gc);
635 } else {
636 self.call_pull_if_needed(can_gc);
638 }
639 read_request.chunk_steps(result, &self.global(), can_gc);
641 } else {
642 stream.add_read_request(read_request);
644
645 self.call_pull_if_needed(can_gc);
647 }
648 }
649
/// Release steps for this controller; there is nothing to do, so this always
/// returns `Ok(())`.
pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
    Ok(())
}
655
/// Enqueues `chunk`, or hands it straight to a waiting read request.
///
/// Returns `Ok(())` without doing anything when closing/enqueuing is no
/// longer possible. If the stream is locked and has pending read requests,
/// the chunk fulfills one of them directly. Otherwise its size is computed
/// with the strategy's size callback (or taken as zero when there is none)
/// and the chunk is added to the queue. A pull is attempted afterwards.
///
/// # Errors
///
/// If the size callback fails, the controller is errored with the pending JS
/// exception and the callback's error is returned. If the queue rejects the
/// size, a DOM exception is thrown, the controller is errored with it, and
/// `Error::JSFailed` is returned.
///
/// # Panics
///
/// Panics if the controller has no stream, or if no JS exception is pending
/// on either error path.
#[expect(unsafe_code)]
pub(crate) fn enqueue(
    &self,
    cx: SafeJSContext,
    chunk: SafeHandleValue,
    can_gc: CanGc,
) -> Result<(), Error> {
    if !self.can_close_or_enqueue() {
        return Ok(());
    }

    let stream = self
        .stream
        .get()
        .expect("Controller must have a stream when a chunk is enqueued.");

    if stream.is_locked() && stream.get_num_read_requests() > 0 {
        // A reader is waiting: bypass the queue entirely.
        stream.fulfill_read_request(chunk, false, can_gc);
    } else {
        // Clone the callback out of the `RefCell` so the borrow is released
        // before calling into it: the error path below reaches
        // `clear_algorithms`, which mutably borrows `strategy_size`.
        let strategy_size = {
            let reference = self.strategy_size.borrow();
            reference.clone()
        };
        let size = if let Some(strategy_size) = strategy_size {
            let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
            match result {
                Ok(size) => size,
                Err(error) => {
                    // Fetch the exception left pending by the callback and
                    // error the controller with it.
                    rooted!(in(*cx) let mut rval = UndefinedValue());
                    // NOTE(review): the soundness of this call presumably rests on
                    // `cx` being a live context and `rval` being rooted — confirm.
                    unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };

                    self.error(rval.handle(), can_gc);

                    return Err(error);
                },
            }
        } else {
            0.
        };

        {
            let res = self
                .queue
                .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
                    value: Heap::boxed(chunk.get()),
                    size,
                }));
            if let Err(error) = res {
                // Turn the Rust error into a pending JS exception so it can be
                // read back as a value for erroring the controller.
                throw_dom_exception(cx, &self.global(), error, can_gc);

                rooted!(in(*cx) let mut rval = UndefinedValue());
                // NOTE(review): same assumption as above — live `cx`, rooted `rval`.
                unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };

                self.error(rval.handle(), can_gc);

                return Err(Error::JSFailed);
            }
        }
    }

    self.call_pull_if_needed(can_gc);

    Ok(())
}
749
750 pub(crate) fn enqueue_native(&self, chunk: Vec<u8>, can_gc: CanGc) {
753 let stream = self
754 .stream
755 .get()
756 .expect("Controller must have a stream when a chunk is enqueued.");
757 if stream.is_locked() && stream.get_num_read_requests() > 0 {
758 let cx = GlobalScope::get_cx();
759 rooted!(in(*cx) let mut rval = UndefinedValue());
760 EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut(), can_gc);
761 stream.fulfill_read_request(rval.handle(), false, can_gc);
762 } else {
763 self.queue
764 .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
765 .expect("Enqueuing a chunk from Rust should not fail.");
766 }
767 }
768
769 pub(crate) fn in_memory(&self) -> bool {
771 let Some(underlying_source) = self.underlying_source.get() else {
772 return false;
773 };
774 underlying_source.in_memory()
775 }
776
777 pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
779 let underlying_source = self.underlying_source.get()?;
780 if underlying_source.in_memory() {
781 return self.queue.get_in_memory_bytes();
782 }
783 None
784 }
785
786 fn clear_algorithms(&self) {
788 self.underlying_source.set(None);
791
792 *self.strategy_size.borrow_mut() = None;
794 }
795
796 pub(crate) fn close(&self, can_gc: CanGc) {
798 if !self.can_close_or_enqueue() {
800 return;
801 }
802
803 let Some(stream) = self.stream.get() else {
804 return;
805 };
806
807 self.close_requested.set(true);
809
810 if self.queue.is_empty() {
811 self.clear_algorithms();
813
814 stream.close(can_gc);
816 }
817 }
818
819 pub(crate) fn get_desired_size(&self) -> Option<f64> {
821 let stream = self.stream.get()?;
822
823 if stream.is_errored() {
825 return None;
826 }
827
828 if stream.is_closed() {
830 return Some(0.0);
831 }
832
833 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
835 Some(desired_size.clamp(desired_size, self.strategy_hwm))
836 }
837
838 pub(crate) fn can_close_or_enqueue(&self) -> bool {
840 let Some(stream) = self.stream.get() else {
841 return false;
842 };
843
844 if !self.close_requested.get() && stream.is_readable() {
846 return true;
847 }
848
849 false
851 }
852
853 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
855 let Some(stream) = self.stream.get() else {
856 return;
857 };
858
859 if !stream.is_readable() {
861 return;
862 }
863
864 self.queue.reset();
866
867 self.clear_algorithms();
869
870 stream.error(e, can_gc);
871 }
872
873 pub(crate) fn has_backpressure(&self) -> bool {
875 !self.should_call_pull()
878 }
879}
880
881impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
882 for ReadableStreamDefaultController
883{
884 fn GetDesiredSize(&self) -> Option<f64> {
886 self.get_desired_size()
887 }
888
889 fn Close(&self, can_gc: CanGc) -> Fallible<()> {
891 if !self.can_close_or_enqueue() {
892 return Err(Error::Type("Stream cannot be closed.".to_string()));
895 }
896
897 self.close(can_gc);
899
900 Ok(())
901 }
902
903 fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
905 if !self.can_close_or_enqueue() {
907 return Err(Error::Type("Stream cannot be enqueued to.".to_string()));
908 }
909
910 self.enqueue(cx, chunk, can_gc)
912 }
913
914 fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
916 self.error(e, can_gc);
917 Ok(())
918 }
919}