1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::{Heap, JSObject};
13use js::jsval::{JSVal, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::wrappers2::JS_GetPendingException;
16use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
17use js::typedarray::Uint8;
18use script_bindings::conversions::SafeToJSValConvertible;
19use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
20
21use crate::dom::bindings::buffer_source::create_buffer_source;
22use crate::dom::bindings::callback::ExceptionHandling;
23use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
24use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
25use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
26use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
27use crate::dom::bindings::reflector::DomGlobal;
28use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
29use crate::dom::bindings::trace::RootedTraceableBox;
30use crate::dom::globalscope::GlobalScope;
31use crate::dom::promise::Promise;
32use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
33use crate::dom::stream::readablestream::ReadableStream;
34use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
35use crate::dom::stream::underlyingsourcecontainer::{
36 UnderlyingSourceContainer, UnderlyingSourceType,
37};
38use crate::realms::enter_auto_realm;
39
40#[derive(Clone, JSTraceable, MallocSizeOf)]
43#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
44struct PullAlgorithmFulfillmentHandler {
45 controller: Dom<ReadableStreamDefaultController>,
46}
47
48impl Callback for PullAlgorithmFulfillmentHandler {
49 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
52 self.controller.pulling.set(false);
54
55 if self.controller.pull_again.get() {
57 self.controller.pull_again.set(false);
59
60 self.controller.call_pull_if_needed(cx);
62 }
63 }
64}
65
66#[derive(Clone, JSTraceable, MallocSizeOf)]
69#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
70struct PullAlgorithmRejectionHandler {
71 controller: Dom<ReadableStreamDefaultController>,
72}
73
74impl Callback for PullAlgorithmRejectionHandler {
75 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
78 self.controller.error(cx, v);
80 }
81}
82
83#[derive(Clone, JSTraceable, MallocSizeOf)]
86#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
87struct StartAlgorithmFulfillmentHandler {
88 controller: Dom<ReadableStreamDefaultController>,
89}
90
91impl Callback for StartAlgorithmFulfillmentHandler {
92 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
95 self.controller.started.set(true);
97
98 self.controller.call_pull_if_needed(cx);
100 }
101}
102
103#[derive(Clone, JSTraceable, MallocSizeOf)]
106#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
107struct StartAlgorithmRejectionHandler {
108 controller: Dom<ReadableStreamDefaultController>,
109}
110
111impl Callback for StartAlgorithmRejectionHandler {
112 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
115 self.controller.error(cx, v);
117 }
118}
119
120#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
122#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
123pub(crate) struct ValueWithSize {
124 #[ignore_malloc_size_of = "Heap is measured by mozjs"]
126 pub(crate) value: Box<Heap<JSVal>>,
127 pub(crate) size: f64,
129}
130
131#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
133#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
134pub(crate) enum EnqueuedValue {
135 Native(Box<[u8]>),
137 Js(ValueWithSize),
139 CloseSentinel,
141}
142
143impl EnqueuedValue {
144 fn size(&self) -> f64 {
145 match self {
146 EnqueuedValue::Native(v) => v.len() as f64,
147 EnqueuedValue::Js(v) => v.size,
148 EnqueuedValue::CloseSentinel => 0.,
151 }
152 }
153
154 fn to_jsval(&self, cx: &mut JSContext, rval: MutableHandleValue) {
155 match self {
156 EnqueuedValue::Native(chunk) => {
157 rooted!(&in(cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
158 create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut())
159 .expect("failed to create buffer source for native chunk.");
160 array_buffer_ptr.safe_to_jsval(cx, rval);
161 },
162 EnqueuedValue::Js(value_with_size) => value_with_size.value.safe_to_jsval(cx, rval),
163 EnqueuedValue::CloseSentinel => {
164 unreachable!("The close sentinel is never made available as a js val.")
165 },
166 }
167 }
168}
169
170fn is_non_negative_number(value: &EnqueuedValue) -> bool {
172 let value_with_size = match value {
173 EnqueuedValue::Native(_) => return true,
174 EnqueuedValue::Js(value_with_size) => value_with_size,
175 EnqueuedValue::CloseSentinel => return true,
176 };
177
178 if value_with_size.size.is_nan() {
183 return false;
184 }
185
186 if value_with_size.size.is_sign_negative() {
188 return false;
189 }
190
191 true
192}
193
194#[derive(Default, JSTraceable, MallocSizeOf)]
196#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
197pub(crate) struct QueueWithSizes {
198 queue: RefCell<VecDeque<EnqueuedValue>>,
199 pub(crate) total_size: Cell<f64>,
201}
202
203impl QueueWithSizes {
204 pub(crate) fn dequeue_value(&self, cx: &mut JSContext, rval: Option<MutableHandleValue>) {
208 {
209 let queue = self.queue.borrow();
210 let Some(value) = queue.front() else {
211 unreachable!("Buffer cannot be empty when dequeue value is called into.");
212 };
213 self.total_size.set(self.total_size.get() - value.size());
214 if let Some(rval) = rval {
215 value.to_jsval(cx, rval);
216 } else {
217 assert_eq!(value, &EnqueuedValue::CloseSentinel);
218 }
219 }
220 self.queue.borrow_mut().pop_front();
221 }
222
223 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
225 pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
226 if !is_non_negative_number(&value) {
228 return Err(Error::Range(
229 c"The size of the enqueued chunk is not a non-negative number.".to_owned(),
230 ));
231 }
232
233 if value.size().is_infinite() {
235 return Err(Error::Range(
236 c"The size of the enqueued chunk is infinite.".to_owned(),
237 ));
238 }
239
240 self.total_size.set(self.total_size.get() + value.size());
241 self.queue.borrow_mut().push_back(value);
242
243 Ok(())
244 }
245
246 pub(crate) fn is_empty(&self) -> bool {
247 self.queue.borrow().is_empty()
248 }
249
250 pub(crate) fn peek_queue_value(&self, cx: &mut JSContext, rval: MutableHandleValue) -> bool {
253 assert!(!self.is_empty());
258
259 let queue = self.queue.borrow();
261 let value_with_size = queue.front().expect("Queue is not empty.");
262 if let EnqueuedValue::CloseSentinel = value_with_size {
263 return true;
264 }
265
266 value_with_size.to_jsval(cx, rval);
268 false
269 }
270
271 fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
273 self.queue
274 .borrow()
275 .iter()
276 .try_fold(Vec::new(), |mut acc, value| match value {
277 EnqueuedValue::Native(chunk) => {
278 acc.extend(chunk.iter().copied());
279 Some(acc)
280 },
281 _ => {
282 warn!("get_in_memory_bytes called on a controller with non-native source.");
283 None
284 },
285 })
286 }
287
288 pub(crate) fn reset(&self) {
290 self.queue.borrow_mut().clear();
291 self.total_size.set(Default::default());
292 }
293}
294
295#[dom_struct]
297pub(crate) struct ReadableStreamDefaultController {
298 reflector_: Reflector,
299
300 queue: QueueWithSizes,
302
303 underlying_source: MutNullableDom<UnderlyingSourceContainer>,
309
310 stream: MutNullableDom<ReadableStream>,
311
312 strategy_hwm: f64,
314
315 #[ignore_malloc_size_of = "mozjs"]
317 strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
318
319 close_requested: Cell<bool>,
321
322 started: Cell<bool>,
324
325 pulling: Cell<bool>,
327
328 pull_again: Cell<bool>,
330}
331
332impl ReadableStreamDefaultController {
333 fn new_inherited(
334 strategy_hwm: f64,
335 strategy_size: Rc<QueuingStrategySize>,
336 underlying_source: &UnderlyingSourceContainer,
337 ) -> ReadableStreamDefaultController {
338 ReadableStreamDefaultController {
339 reflector_: Reflector::new(),
340 queue: Default::default(),
341 stream: MutNullableDom::new(None),
342 underlying_source: MutNullableDom::new(Some(underlying_source)),
343 strategy_hwm,
344 strategy_size: RefCell::new(Some(strategy_size)),
345 close_requested: Default::default(),
346 started: Default::default(),
347 pulling: Default::default(),
348 pull_again: Default::default(),
349 }
350 }
351
352 pub(crate) fn new(
353 cx: &mut JSContext,
354 global: &GlobalScope,
355 underlying_source: UnderlyingSourceType,
356 strategy_hwm: f64,
357 strategy_size: Rc<QueuingStrategySize>,
358 ) -> DomRoot<ReadableStreamDefaultController> {
359 let underlying_source = UnderlyingSourceContainer::new(cx, global, underlying_source);
360 reflect_dom_object_with_cx(
361 Box::new(ReadableStreamDefaultController::new_inherited(
362 strategy_hwm,
363 strategy_size,
364 &underlying_source,
365 )),
366 global,
367 cx,
368 )
369 }
370
371 pub(crate) fn setup(
373 &self,
374 cx: &mut JSContext,
375 stream: DomRoot<ReadableStream>,
376 ) -> Result<(), Error> {
377 stream.assert_no_controller();
379
380 self.stream.set(Some(&stream));
382
383 let global = &*self.global();
384 let rooted_default_controller = DomRoot::from_ref(self);
385
386 stream.set_default_controller(&rooted_default_controller);
399
400 if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
401 let start_result = underlying_source
403 .call_start_algorithm(
404 cx,
405 Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
406 )
407 .unwrap_or_else(|| {
408 let promise = Promise::new_resolved(cx, global, ());
409 Ok(promise)
410 });
411
412 let start_promise = start_result?;
414
415 let handler = PromiseNativeHandler::new(
417 cx,
418 global,
419 Some(Box::new(StartAlgorithmFulfillmentHandler {
420 controller: Dom::from_ref(&rooted_default_controller),
421 })),
422 Some(Box::new(StartAlgorithmRejectionHandler {
423 controller: Dom::from_ref(&rooted_default_controller),
424 })),
425 );
426 let mut realm = enter_auto_realm(cx, global);
427 let cx = &mut realm.current_realm();
428 start_promise.append_native_handler(cx, &handler);
429 };
430
431 Ok(())
432 }
433
434 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
436 if let Some(underlying_source) = self.underlying_source.get() {
437 underlying_source.set_underlying_source_this_object(this_object);
438 }
439 }
440
441 fn dequeue_value(&self, cx: &mut JSContext, rval: MutableHandleValue) {
443 self.queue.dequeue_value(cx, Some(rval));
444 }
445
446 fn should_call_pull(&self) -> bool {
448 let Some(stream) = self.stream.get() else {
452 debug!("`should_call_pull` called on a controller without a stream.");
453 return false;
454 };
455
456 if !self.can_close_or_enqueue() {
458 return false;
459 }
460
461 if !self.started.get() {
463 return false;
464 }
465
466 if stream.is_locked() && stream.get_num_read_requests() > 0 {
469 return true;
470 }
471
472 let desired_size = self.get_desired_size().expect("desiredSize is not null.");
475
476 if desired_size > 0. {
477 return true;
478 }
479
480 false
481 }
482
483 fn call_pull_if_needed(&self, cx: &mut JSContext) {
485 if !self.should_call_pull() {
488 return;
489 }
490
491 if self.pulling.get() {
493 self.pull_again.set(true);
495
496 return;
497 }
498
499 self.pulling.set(true);
501
502 let global = self.global();
505 let rooted_default_controller = DomRoot::from_ref(self);
506 let controller =
507 Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
508
509 let Some(underlying_source) = self.underlying_source.get() else {
510 return;
511 };
512 let handler = PromiseNativeHandler::new(
513 cx,
514 &global,
515 Some(Box::new(PullAlgorithmFulfillmentHandler {
516 controller: Dom::from_ref(&rooted_default_controller),
517 })),
518 Some(Box::new(PullAlgorithmRejectionHandler {
519 controller: Dom::from_ref(&rooted_default_controller),
520 })),
521 );
522
523 let mut realm = enter_auto_realm(cx, &*global);
524 let cx = &mut realm.current_realm();
525
526 let result = underlying_source
527 .call_pull_algorithm(cx, controller)
528 .unwrap_or_else(|| {
529 let promise = Promise::new_resolved(cx, &global, ());
530 Ok(promise)
531 });
532 let promise = result.unwrap_or_else(|error| {
533 rooted!(&in(cx) let mut rval = UndefinedValue());
534 error.to_jsval(cx, &global, rval.handle_mut());
536 Promise::new_rejected(cx, &global, rval.handle())
537 });
538 promise.append_native_handler(cx, &handler);
539 }
540
541 pub(crate) fn perform_cancel_steps(
543 &self,
544 cx: &mut JSContext,
545 global: &GlobalScope,
546 reason: SafeHandleValue,
547 ) -> Rc<Promise> {
548 self.queue.reset();
550
551 let underlying_source = self
552 .underlying_source
553 .get()
554 .expect("Controller should have a source when the cancel steps are called into.");
555 let result = underlying_source
557 .call_cancel_algorithm(cx, global, reason)
558 .unwrap_or_else(|| {
559 let promise = Promise::new(cx, global);
560 promise.resolve_native(cx, &());
561 Ok(promise)
562 });
563 let promise = result.unwrap_or_else(|error| {
564 rooted!(&in(cx) let mut rval = UndefinedValue());
565
566 error.to_jsval(cx, global, rval.handle_mut());
567 let promise = Promise::new(cx, global);
568 promise.reject_native(cx, &rval.handle());
569 promise
570 });
571
572 self.clear_algorithms();
574
575 promise
577 }
578
579 pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
581 let Some(stream) = self.stream.get() else {
584 return;
585 };
586
587 if !self.queue.is_empty() {
589 rooted!(&in(cx) let mut rval = UndefinedValue());
590 let result = RootedTraceableBox::new(Heap::default());
591 self.dequeue_value(cx, rval.handle_mut());
592 result.set(*rval);
593
594 if self.close_requested.get() && self.queue.is_empty() {
596 self.clear_algorithms();
598
599 stream.close(cx);
601 } else {
602 self.call_pull_if_needed(cx);
604 }
605 read_request.chunk_steps(cx, result, &self.global());
607 } else {
608 stream.add_read_request(read_request);
610
611 self.call_pull_if_needed(cx);
613 }
614 }
615
616 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
618 Ok(())
620 }
621
622 #[expect(unsafe_code)]
624 pub(crate) fn enqueue(&self, cx: &mut JSContext, chunk: SafeHandleValue) -> Result<(), Error> {
625 if !self.can_close_or_enqueue() {
627 return Ok(());
628 }
629
630 let stream = self
631 .stream
632 .get()
633 .expect("Controller must have a stream when a chunk is enqueued.");
634
635 if stream.is_locked() && stream.get_num_read_requests() > 0 {
639 stream.fulfill_read_request(cx, chunk, false);
640 } else {
641 let strategy_size = {
646 let reference = self.strategy_size.borrow();
647 reference.clone()
648 };
649 let size = if let Some(strategy_size) = strategy_size {
650 let result = strategy_size.Call__(cx, chunk, ExceptionHandling::Rethrow);
653 match result {
654 Ok(size) => size,
656 Err(error) => {
657 rooted!(&in(cx) let mut rval = UndefinedValue());
659 unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
660
661 self.error(cx, rval.handle());
663
664 return Err(error);
667 },
668 }
669 } else {
670 0.
671 };
672
673 {
674 let res = self
676 .queue
677 .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
678 value: Heap::boxed(chunk.get()),
679 size,
680 }));
681 if let Err(error) = res {
682 throw_dom_exception(cx, &self.global(), error);
688
689 rooted!(&in(cx) let mut rval = UndefinedValue());
692 unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
693
694 self.error(cx, rval.handle());
696
697 return Err(Error::JSFailed);
701 }
702 }
703 }
704
705 self.call_pull_if_needed(cx);
707
708 Ok(())
709 }
710
711 pub(crate) fn enqueue_native(&self, cx: &mut JSContext, chunk: Vec<u8>) {
714 let stream = self
715 .stream
716 .get()
717 .expect("Controller must have a stream when a chunk is enqueued.");
718 if stream.is_locked() && stream.get_num_read_requests() > 0 {
719 rooted!(&in(cx) let mut rval = UndefinedValue());
720 EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut());
721 stream.fulfill_read_request(cx, rval.handle(), false);
722 } else {
723 self.queue
724 .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
725 .expect("Enqueuing a chunk from Rust should not fail.");
726 }
727 }
728
729 pub(crate) fn in_memory(&self) -> bool {
731 let Some(underlying_source) = self.underlying_source.get() else {
732 return false;
733 };
734 underlying_source.in_memory()
735 }
736
737 pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
739 let underlying_source = self.underlying_source.get()?;
740 if underlying_source.in_memory() {
741 return self.queue.get_in_memory_bytes();
742 }
743 None
744 }
745
746 fn clear_algorithms(&self) {
748 self.underlying_source.set(None);
751
752 *self.strategy_size.borrow_mut() = None;
754 }
755
756 pub(crate) fn close(&self, cx: &mut JSContext) {
758 if !self.can_close_or_enqueue() {
760 return;
761 }
762
763 let Some(stream) = self.stream.get() else {
764 return;
765 };
766
767 self.close_requested.set(true);
769
770 if self.queue.is_empty() {
771 self.clear_algorithms();
773
774 stream.close(cx);
776 }
777 }
778
779 pub(crate) fn get_desired_size(&self) -> Option<f64> {
781 let stream = self.stream.get()?;
782
783 if stream.is_errored() {
785 return None;
786 }
787
788 if stream.is_closed() {
790 return Some(0.0);
791 }
792
793 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
795 Some(desired_size.clamp(desired_size, self.strategy_hwm))
796 }
797
798 pub(crate) fn can_close_or_enqueue(&self) -> bool {
800 let Some(stream) = self.stream.get() else {
801 return false;
802 };
803
804 if !self.close_requested.get() && stream.is_readable() {
806 return true;
807 }
808
809 false
811 }
812
813 pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
815 let Some(stream) = self.stream.get() else {
816 return;
817 };
818
819 if !stream.is_readable() {
821 return;
822 }
823
824 self.queue.reset();
826
827 self.clear_algorithms();
829
830 stream.error(cx, e);
831 }
832
833 pub(crate) fn has_backpressure(&self) -> bool {
835 !self.should_call_pull()
838 }
839}
840
841impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
842 for ReadableStreamDefaultController
843{
844 fn GetDesiredSize(&self) -> Option<f64> {
846 self.get_desired_size()
847 }
848
849 fn Close(&self, cx: &mut JSContext) -> Fallible<()> {
851 if !self.can_close_or_enqueue() {
852 return Err(Error::Type(c"Stream cannot be closed.".to_owned()));
855 }
856
857 self.close(cx);
859
860 Ok(())
861 }
862
863 fn Enqueue(&self, cx: &mut JSContext, chunk: SafeHandleValue) -> Fallible<()> {
865 if !self.can_close_or_enqueue() {
867 return Err(Error::Type(c"Stream cannot be enqueued to.".to_owned()));
868 }
869
870 self.enqueue(cx, chunk)
872 }
873
874 fn Error(&self, cx: &mut JSContext, e: SafeHandleValue) -> Fallible<()> {
876 self.error(cx, e);
877 Ok(())
878 }
879}