1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::wrappers2::JS_GetPendingException;
15use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
16use js::typedarray::Uint8;
17use script_bindings::conversions::SafeToJSValConvertible;
18
19use crate::dom::bindings::buffer_source::create_buffer_source;
20use crate::dom::bindings::callback::ExceptionHandling;
21use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
22use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
23use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
25use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::stream::readablestream::ReadableStream;
32use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
33use crate::dom::stream::underlyingsourcecontainer::{
34 UnderlyingSourceContainer, UnderlyingSourceType,
35};
36use crate::realms::{InRealm, enter_auto_realm, enter_realm};
37use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
38
39#[derive(Clone, JSTraceable, MallocSizeOf)]
42#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
43struct PullAlgorithmFulfillmentHandler {
44 controller: Dom<ReadableStreamDefaultController>,
45}
46
47impl Callback for PullAlgorithmFulfillmentHandler {
48 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
51 let can_gc = CanGc::from_cx(cx);
52 self.controller.pulling.set(false);
54
55 if self.controller.pull_again.get() {
57 self.controller.pull_again.set(false);
59
60 self.controller.call_pull_if_needed(can_gc);
62 }
63 }
64}
65
66#[derive(Clone, JSTraceable, MallocSizeOf)]
69#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
70struct PullAlgorithmRejectionHandler {
71 controller: Dom<ReadableStreamDefaultController>,
72}
73
74impl Callback for PullAlgorithmRejectionHandler {
75 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
78 let can_gc = CanGc::from_cx(cx);
79 self.controller.error(v, can_gc);
81 }
82}
83
84#[derive(Clone, JSTraceable, MallocSizeOf)]
87#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
88struct StartAlgorithmFulfillmentHandler {
89 controller: Dom<ReadableStreamDefaultController>,
90}
91
92impl Callback for StartAlgorithmFulfillmentHandler {
93 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
96 let can_gc = CanGc::from_cx(cx);
97 self.controller.started.set(true);
99
100 self.controller.call_pull_if_needed(can_gc);
102 }
103}
104
105#[derive(Clone, JSTraceable, MallocSizeOf)]
108#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
109struct StartAlgorithmRejectionHandler {
110 controller: Dom<ReadableStreamDefaultController>,
111}
112
113impl Callback for StartAlgorithmRejectionHandler {
114 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
117 let can_gc = CanGc::from_cx(cx);
118 self.controller.error(v, can_gc);
120 }
121}
122
123#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
125#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
126pub(crate) struct ValueWithSize {
127 #[ignore_malloc_size_of = "Heap is measured by mozjs"]
129 pub(crate) value: Box<Heap<JSVal>>,
130 pub(crate) size: f64,
132}
133
134#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
136#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
137pub(crate) enum EnqueuedValue {
138 Native(Box<[u8]>),
140 Js(ValueWithSize),
142 CloseSentinel,
144}
145
146impl EnqueuedValue {
147 fn size(&self) -> f64 {
148 match self {
149 EnqueuedValue::Native(v) => v.len() as f64,
150 EnqueuedValue::Js(v) => v.size,
151 EnqueuedValue::CloseSentinel => 0.,
154 }
155 }
156
157 fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
158 match self {
159 EnqueuedValue::Native(chunk) => {
160 rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
161 create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
162 .expect("failed to create buffer source for native chunk.");
163 array_buffer_ptr.safe_to_jsval(cx, rval, can_gc);
164 },
165 EnqueuedValue::Js(value_with_size) => {
166 value_with_size.value.safe_to_jsval(cx, rval, can_gc)
167 },
168 EnqueuedValue::CloseSentinel => {
169 unreachable!("The close sentinel is never made available as a js val.")
170 },
171 }
172 }
173}
174
175fn is_non_negative_number(value: &EnqueuedValue) -> bool {
177 let value_with_size = match value {
178 EnqueuedValue::Native(_) => return true,
179 EnqueuedValue::Js(value_with_size) => value_with_size,
180 EnqueuedValue::CloseSentinel => return true,
181 };
182
183 if value_with_size.size.is_nan() {
188 return false;
189 }
190
191 if value_with_size.size.is_sign_negative() {
193 return false;
194 }
195
196 true
197}
198
199#[derive(Default, JSTraceable, MallocSizeOf)]
201#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
202pub(crate) struct QueueWithSizes {
203 queue: RefCell<VecDeque<EnqueuedValue>>,
204 pub(crate) total_size: Cell<f64>,
206}
207
208impl QueueWithSizes {
209 pub(crate) fn dequeue_value(
213 &self,
214 cx: SafeJSContext,
215 rval: Option<MutableHandleValue>,
216 can_gc: CanGc,
217 ) {
218 {
219 let queue = self.queue.borrow();
220 let Some(value) = queue.front() else {
221 unreachable!("Buffer cannot be empty when dequeue value is called into.");
222 };
223 self.total_size.set(self.total_size.get() - value.size());
224 if let Some(rval) = rval {
225 value.to_jsval(cx, rval, can_gc);
226 } else {
227 assert_eq!(value, &EnqueuedValue::CloseSentinel);
228 }
229 }
230 self.queue.borrow_mut().pop_front();
231 }
232
233 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
235 pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
236 if !is_non_negative_number(&value) {
238 return Err(Error::Range(
239 c"The size of the enqueued chunk is not a non-negative number.".to_owned(),
240 ));
241 }
242
243 if value.size().is_infinite() {
245 return Err(Error::Range(
246 c"The size of the enqueued chunk is infinite.".to_owned(),
247 ));
248 }
249
250 self.total_size.set(self.total_size.get() + value.size());
251 self.queue.borrow_mut().push_back(value);
252
253 Ok(())
254 }
255
256 pub(crate) fn is_empty(&self) -> bool {
257 self.queue.borrow().is_empty()
258 }
259
260 pub(crate) fn peek_queue_value(
263 &self,
264 cx: SafeJSContext,
265 rval: MutableHandleValue,
266 can_gc: CanGc,
267 ) -> bool {
268 assert!(!self.is_empty());
273
274 let queue = self.queue.borrow();
276 let value_with_size = queue.front().expect("Queue is not empty.");
277 if let EnqueuedValue::CloseSentinel = value_with_size {
278 return true;
279 }
280
281 value_with_size.to_jsval(cx, rval, can_gc);
283 false
284 }
285
286 fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
288 self.queue
289 .borrow()
290 .iter()
291 .try_fold(Vec::new(), |mut acc, value| match value {
292 EnqueuedValue::Native(chunk) => {
293 acc.extend(chunk.iter().copied());
294 Some(acc)
295 },
296 _ => {
297 warn!("get_in_memory_bytes called on a controller with non-native source.");
298 None
299 },
300 })
301 }
302
303 pub(crate) fn reset(&self) {
305 self.queue.borrow_mut().clear();
306 self.total_size.set(Default::default());
307 }
308}
309
310#[dom_struct]
312pub(crate) struct ReadableStreamDefaultController {
313 reflector_: Reflector,
314
315 queue: QueueWithSizes,
317
318 underlying_source: MutNullableDom<UnderlyingSourceContainer>,
324
325 stream: MutNullableDom<ReadableStream>,
326
327 strategy_hwm: f64,
329
330 #[ignore_malloc_size_of = "mozjs"]
332 strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
333
334 close_requested: Cell<bool>,
336
337 started: Cell<bool>,
339
340 pulling: Cell<bool>,
342
343 pull_again: Cell<bool>,
345}
346
347impl ReadableStreamDefaultController {
348 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
349 fn new_inherited(
350 global: &GlobalScope,
351 underlying_source_type: UnderlyingSourceType,
352 strategy_hwm: f64,
353 strategy_size: Rc<QueuingStrategySize>,
354 can_gc: CanGc,
355 ) -> ReadableStreamDefaultController {
356 ReadableStreamDefaultController {
357 reflector_: Reflector::new(),
358 queue: Default::default(),
359 stream: MutNullableDom::new(None),
360 underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
361 global,
362 underlying_source_type,
363 can_gc,
364 ))),
365 strategy_hwm,
366 strategy_size: RefCell::new(Some(strategy_size)),
367 close_requested: Default::default(),
368 started: Default::default(),
369 pulling: Default::default(),
370 pull_again: Default::default(),
371 }
372 }
373
374 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
375 pub(crate) fn new(
376 global: &GlobalScope,
377 underlying_source: UnderlyingSourceType,
378 strategy_hwm: f64,
379 strategy_size: Rc<QueuingStrategySize>,
380 can_gc: CanGc,
381 ) -> DomRoot<ReadableStreamDefaultController> {
382 reflect_dom_object(
383 Box::new(ReadableStreamDefaultController::new_inherited(
384 global,
385 underlying_source,
386 strategy_hwm,
387 strategy_size,
388 can_gc,
389 )),
390 global,
391 can_gc,
392 )
393 }
394
395 pub(crate) fn setup(
397 &self,
398 cx: &mut js::context::JSContext,
399 stream: DomRoot<ReadableStream>,
400 ) -> Result<(), Error> {
401 stream.assert_no_controller();
403
404 self.stream.set(Some(&stream));
406
407 let global = &*self.global();
408 let rooted_default_controller = DomRoot::from_ref(self);
409
410 stream.set_default_controller(&rooted_default_controller);
423
424 if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
425 let start_result = underlying_source
427 .call_start_algorithm(
428 cx,
429 Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
430 )
431 .unwrap_or_else(|| {
432 let promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
433 Ok(promise)
434 });
435
436 let start_promise = start_result?;
438
439 let handler = PromiseNativeHandler::new(
441 global,
442 Some(Box::new(StartAlgorithmFulfillmentHandler {
443 controller: Dom::from_ref(&rooted_default_controller),
444 })),
445 Some(Box::new(StartAlgorithmRejectionHandler {
446 controller: Dom::from_ref(&rooted_default_controller),
447 })),
448 CanGc::from_cx(cx),
449 );
450 let mut realm = enter_auto_realm(cx, global);
451 let cx = &mut realm.current_realm();
452 let in_realm_proof = cx.into();
453 let comp = InRealm::Already(&in_realm_proof);
454 start_promise.append_native_handler(&handler, comp, CanGc::from_cx(cx));
455 };
456
457 Ok(())
458 }
459
460 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
462 if let Some(underlying_source) = self.underlying_source.get() {
463 underlying_source.set_underlying_source_this_object(this_object);
464 }
465 }
466
467 fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
469 self.queue.dequeue_value(cx, Some(rval), can_gc);
470 }
471
472 fn should_call_pull(&self) -> bool {
474 let Some(stream) = self.stream.get() else {
478 debug!("`should_call_pull` called on a controller without a stream.");
479 return false;
480 };
481
482 if !self.can_close_or_enqueue() {
484 return false;
485 }
486
487 if !self.started.get() {
489 return false;
490 }
491
492 if stream.is_locked() && stream.get_num_read_requests() > 0 {
495 return true;
496 }
497
498 let desired_size = self.get_desired_size().expect("desiredSize is not null.");
501
502 if desired_size > 0. {
503 return true;
504 }
505
506 false
507 }
508
509 fn call_pull_if_needed(&self, can_gc: CanGc) {
511 if !self.should_call_pull() {
514 return;
515 }
516
517 if self.pulling.get() {
519 self.pull_again.set(true);
521
522 return;
523 }
524
525 self.pulling.set(true);
527
528 let global = self.global();
531 let rooted_default_controller = DomRoot::from_ref(self);
532 let controller =
533 Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
534
535 let Some(underlying_source) = self.underlying_source.get() else {
536 return;
537 };
538 let handler = PromiseNativeHandler::new(
539 &global,
540 Some(Box::new(PullAlgorithmFulfillmentHandler {
541 controller: Dom::from_ref(&rooted_default_controller),
542 })),
543 Some(Box::new(PullAlgorithmRejectionHandler {
544 controller: Dom::from_ref(&rooted_default_controller),
545 })),
546 can_gc,
547 );
548
549 let realm = enter_realm(&*global);
550 let comp = InRealm::Entered(&realm);
551 let result = underlying_source
552 .call_pull_algorithm(controller, can_gc)
553 .unwrap_or_else(|| {
554 let promise = Promise::new(&global, can_gc);
555 promise.resolve_native(&(), can_gc);
556 Ok(promise)
557 });
558 let promise = result.unwrap_or_else(|error| {
559 let cx = GlobalScope::get_cx();
560 rooted!(in(*cx) let mut rval = UndefinedValue());
561 error.to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
563 let promise = Promise::new(&global, can_gc);
564 promise.reject_native(&rval.handle(), can_gc);
565 promise
566 });
567 promise.append_native_handler(&handler, comp, can_gc);
568 }
569
570 pub(crate) fn perform_cancel_steps(
572 &self,
573 cx: &mut js::context::JSContext,
574 global: &GlobalScope,
575 reason: SafeHandleValue,
576 ) -> Rc<Promise> {
577 self.queue.reset();
579
580 let underlying_source = self
581 .underlying_source
582 .get()
583 .expect("Controller should have a source when the cancel steps are called into.");
584 let result = underlying_source
586 .call_cancel_algorithm(cx, global, reason)
587 .unwrap_or_else(|| {
588 let promise = Promise::new2(cx, global);
589 promise.resolve_native(&(), CanGc::from_cx(cx));
590 Ok(promise)
591 });
592 let promise = result.unwrap_or_else(|error| {
593 rooted!(&in(cx) let mut rval = UndefinedValue());
594
595 error.to_jsval(cx.into(), global, rval.handle_mut(), CanGc::from_cx(cx));
596 let promise = Promise::new2(cx, global);
597 promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
598 promise
599 });
600
601 self.clear_algorithms();
603
604 promise
606 }
607
608 pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
610 let Some(stream) = self.stream.get() else {
613 return;
614 };
615
616 if !self.queue.is_empty() {
618 let cx = GlobalScope::get_cx();
619 rooted!(in(*cx) let mut rval = UndefinedValue());
620 let result = RootedTraceableBox::new(Heap::default());
621 self.dequeue_value(cx, rval.handle_mut(), can_gc);
622 result.set(*rval);
623
624 if self.close_requested.get() && self.queue.is_empty() {
626 self.clear_algorithms();
628
629 stream.close(can_gc);
631 } else {
632 self.call_pull_if_needed(can_gc);
634 }
635 read_request.chunk_steps(result, &self.global(), can_gc);
637 } else {
638 stream.add_read_request(read_request);
640
641 self.call_pull_if_needed(can_gc);
643 }
644 }
645
646 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
648 Ok(())
650 }
651
652 #[expect(unsafe_code)]
654 pub(crate) fn enqueue(
655 &self,
656 cx: &mut js::context::JSContext,
657 chunk: SafeHandleValue,
658 ) -> Result<(), Error> {
659 if !self.can_close_or_enqueue() {
661 return Ok(());
662 }
663
664 let stream = self
665 .stream
666 .get()
667 .expect("Controller must have a stream when a chunk is enqueued.");
668
669 if stream.is_locked() && stream.get_num_read_requests() > 0 {
673 stream.fulfill_read_request(chunk, false, CanGc::from_cx(cx));
674 } else {
675 let strategy_size = {
680 let reference = self.strategy_size.borrow();
681 reference.clone()
682 };
683 let size = if let Some(strategy_size) = strategy_size {
684 let result =
687 strategy_size.Call__(chunk, ExceptionHandling::Rethrow, CanGc::from_cx(cx));
688 match result {
689 Ok(size) => size,
691 Err(error) => {
692 rooted!(&in(cx) let mut rval = UndefinedValue());
694 unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
695
696 self.error(rval.handle(), CanGc::from_cx(cx));
698
699 return Err(error);
702 },
703 }
704 } else {
705 0.
706 };
707
708 {
709 let res = self
711 .queue
712 .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
713 value: Heap::boxed(chunk.get()),
714 size,
715 }));
716 if let Err(error) = res {
717 throw_dom_exception(cx.into(), &self.global(), error, CanGc::from_cx(cx));
723
724 rooted!(&in(cx) let mut rval = UndefinedValue());
727 unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
728
729 self.error(rval.handle(), CanGc::from_cx(cx));
731
732 return Err(Error::JSFailed);
736 }
737 }
738 }
739
740 self.call_pull_if_needed(CanGc::from_cx(cx));
742
743 Ok(())
744 }
745
746 pub(crate) fn enqueue_native(&self, chunk: Vec<u8>, can_gc: CanGc) {
749 let stream = self
750 .stream
751 .get()
752 .expect("Controller must have a stream when a chunk is enqueued.");
753 if stream.is_locked() && stream.get_num_read_requests() > 0 {
754 let cx = GlobalScope::get_cx();
755 rooted!(in(*cx) let mut rval = UndefinedValue());
756 EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut(), can_gc);
757 stream.fulfill_read_request(rval.handle(), false, can_gc);
758 } else {
759 self.queue
760 .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
761 .expect("Enqueuing a chunk from Rust should not fail.");
762 }
763 }
764
765 pub(crate) fn in_memory(&self) -> bool {
767 let Some(underlying_source) = self.underlying_source.get() else {
768 return false;
769 };
770 underlying_source.in_memory()
771 }
772
773 pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
775 let underlying_source = self.underlying_source.get()?;
776 if underlying_source.in_memory() {
777 return self.queue.get_in_memory_bytes();
778 }
779 None
780 }
781
782 fn clear_algorithms(&self) {
784 self.underlying_source.set(None);
787
788 *self.strategy_size.borrow_mut() = None;
790 }
791
792 pub(crate) fn close(&self, can_gc: CanGc) {
794 if !self.can_close_or_enqueue() {
796 return;
797 }
798
799 let Some(stream) = self.stream.get() else {
800 return;
801 };
802
803 self.close_requested.set(true);
805
806 if self.queue.is_empty() {
807 self.clear_algorithms();
809
810 stream.close(can_gc);
812 }
813 }
814
815 pub(crate) fn get_desired_size(&self) -> Option<f64> {
817 let stream = self.stream.get()?;
818
819 if stream.is_errored() {
821 return None;
822 }
823
824 if stream.is_closed() {
826 return Some(0.0);
827 }
828
829 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
831 Some(desired_size.clamp(desired_size, self.strategy_hwm))
832 }
833
834 pub(crate) fn can_close_or_enqueue(&self) -> bool {
836 let Some(stream) = self.stream.get() else {
837 return false;
838 };
839
840 if !self.close_requested.get() && stream.is_readable() {
842 return true;
843 }
844
845 false
847 }
848
849 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
851 let Some(stream) = self.stream.get() else {
852 return;
853 };
854
855 if !stream.is_readable() {
857 return;
858 }
859
860 self.queue.reset();
862
863 self.clear_algorithms();
865
866 stream.error(e, can_gc);
867 }
868
869 pub(crate) fn has_backpressure(&self) -> bool {
871 !self.should_call_pull()
874 }
875}
876
877impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
878 for ReadableStreamDefaultController
879{
880 fn GetDesiredSize(&self) -> Option<f64> {
882 self.get_desired_size()
883 }
884
885 fn Close(&self, can_gc: CanGc) -> Fallible<()> {
887 if !self.can_close_or_enqueue() {
888 return Err(Error::Type(c"Stream cannot be closed.".to_owned()));
891 }
892
893 self.close(can_gc);
895
896 Ok(())
897 }
898
899 fn Enqueue(&self, cx: &mut js::context::JSContext, chunk: SafeHandleValue) -> Fallible<()> {
901 if !self.can_close_or_enqueue() {
903 return Err(Error::Type(c"Stream cannot be enqueued to.".to_owned()));
904 }
905
906 self.enqueue(cx, chunk)
908 }
909
910 fn Error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) -> Fallible<()> {
912 self.error(e, CanGc::from_cx(cx));
913 Ok(())
914 }
915}