1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::wrappers2::JS_GetPendingException;
15use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
16use js::typedarray::Uint8;
17use script_bindings::conversions::SafeToJSValConvertible;
18
19use crate::dom::bindings::buffer_source::create_buffer_source;
20use crate::dom::bindings::callback::ExceptionHandling;
21use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
22use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
23use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
25use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::stream::readablestream::ReadableStream;
32use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
33use crate::dom::stream::underlyingsourcecontainer::{
34 UnderlyingSourceContainer, UnderlyingSourceType,
35};
36use crate::realms::{InRealm, enter_realm};
37use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
38
39#[derive(Clone, JSTraceable, MallocSizeOf)]
42#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
43struct PullAlgorithmFulfillmentHandler {
44 controller: Dom<ReadableStreamDefaultController>,
45}
46
47impl Callback for PullAlgorithmFulfillmentHandler {
48 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
51 let can_gc = CanGc::from_cx(cx);
52 self.controller.pulling.set(false);
54
55 if self.controller.pull_again.get() {
57 self.controller.pull_again.set(false);
59
60 self.controller.call_pull_if_needed(can_gc);
62 }
63 }
64}
65
66#[derive(Clone, JSTraceable, MallocSizeOf)]
69#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
70struct PullAlgorithmRejectionHandler {
71 controller: Dom<ReadableStreamDefaultController>,
72}
73
74impl Callback for PullAlgorithmRejectionHandler {
75 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
78 let can_gc = CanGc::from_cx(cx);
79 self.controller.error(v, can_gc);
81 }
82}
83
84#[derive(Clone, JSTraceable, MallocSizeOf)]
87#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
88struct StartAlgorithmFulfillmentHandler {
89 controller: Dom<ReadableStreamDefaultController>,
90}
91
92impl Callback for StartAlgorithmFulfillmentHandler {
93 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
96 let can_gc = CanGc::from_cx(cx);
97 self.controller.started.set(true);
99
100 self.controller.call_pull_if_needed(can_gc);
102 }
103}
104
105#[derive(Clone, JSTraceable, MallocSizeOf)]
108#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
109struct StartAlgorithmRejectionHandler {
110 controller: Dom<ReadableStreamDefaultController>,
111}
112
113impl Callback for StartAlgorithmRejectionHandler {
114 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
117 let can_gc = CanGc::from_cx(cx);
118 self.controller.error(v, can_gc);
120 }
121}
122
123#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
125#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
126pub(crate) struct ValueWithSize {
127 #[ignore_malloc_size_of = "Heap is measured by mozjs"]
129 pub(crate) value: Box<Heap<JSVal>>,
130 pub(crate) size: f64,
132}
133
134#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
136#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
137pub(crate) enum EnqueuedValue {
138 Native(Box<[u8]>),
140 Js(ValueWithSize),
142 CloseSentinel,
144}
145
146impl EnqueuedValue {
147 fn size(&self) -> f64 {
148 match self {
149 EnqueuedValue::Native(v) => v.len() as f64,
150 EnqueuedValue::Js(v) => v.size,
151 EnqueuedValue::CloseSentinel => 0.,
154 }
155 }
156
157 fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
158 match self {
159 EnqueuedValue::Native(chunk) => {
160 rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
161 create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
162 .expect("failed to create buffer source for native chunk.");
163 array_buffer_ptr.safe_to_jsval(cx, rval, can_gc);
164 },
165 EnqueuedValue::Js(value_with_size) => {
166 value_with_size.value.safe_to_jsval(cx, rval, can_gc)
167 },
168 EnqueuedValue::CloseSentinel => {
169 unreachable!("The close sentinel is never made available as a js val.")
170 },
171 }
172 }
173}
174
175fn is_non_negative_number(value: &EnqueuedValue) -> bool {
177 let value_with_size = match value {
178 EnqueuedValue::Native(_) => return true,
179 EnqueuedValue::Js(value_with_size) => value_with_size,
180 EnqueuedValue::CloseSentinel => return true,
181 };
182
183 if value_with_size.size.is_nan() {
188 return false;
189 }
190
191 if value_with_size.size.is_sign_negative() {
193 return false;
194 }
195
196 true
197}
198
199#[derive(Default, JSTraceable, MallocSizeOf)]
201#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
202pub(crate) struct QueueWithSizes {
203 queue: RefCell<VecDeque<EnqueuedValue>>,
204 pub(crate) total_size: Cell<f64>,
206}
207
208impl QueueWithSizes {
209 pub(crate) fn dequeue_value(
213 &self,
214 cx: SafeJSContext,
215 rval: Option<MutableHandleValue>,
216 can_gc: CanGc,
217 ) {
218 {
219 let queue = self.queue.borrow();
220 let Some(value) = queue.front() else {
221 unreachable!("Buffer cannot be empty when dequeue value is called into.");
222 };
223 self.total_size.set(self.total_size.get() - value.size());
224 if let Some(rval) = rval {
225 value.to_jsval(cx, rval, can_gc);
226 } else {
227 assert_eq!(value, &EnqueuedValue::CloseSentinel);
228 }
229 }
230 self.queue.borrow_mut().pop_front();
231 }
232
233 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
235 pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
236 if !is_non_negative_number(&value) {
238 return Err(Error::Range(
239 c"The size of the enqueued chunk is not a non-negative number.".to_owned(),
240 ));
241 }
242
243 if value.size().is_infinite() {
245 return Err(Error::Range(
246 c"The size of the enqueued chunk is infinite.".to_owned(),
247 ));
248 }
249
250 self.total_size.set(self.total_size.get() + value.size());
251 self.queue.borrow_mut().push_back(value);
252
253 Ok(())
254 }
255
256 pub(crate) fn is_empty(&self) -> bool {
257 self.queue.borrow().is_empty()
258 }
259
260 pub(crate) fn peek_queue_value(
263 &self,
264 cx: SafeJSContext,
265 rval: MutableHandleValue,
266 can_gc: CanGc,
267 ) -> bool {
268 assert!(!self.is_empty());
273
274 let queue = self.queue.borrow();
276 let value_with_size = queue.front().expect("Queue is not empty.");
277 if let EnqueuedValue::CloseSentinel = value_with_size {
278 return true;
279 }
280
281 value_with_size.to_jsval(cx, rval, can_gc);
283 false
284 }
285
286 fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
288 self.queue
289 .borrow()
290 .iter()
291 .try_fold(Vec::new(), |mut acc, value| match value {
292 EnqueuedValue::Native(chunk) => {
293 acc.extend(chunk.iter().copied());
294 Some(acc)
295 },
296 _ => {
297 warn!("get_in_memory_bytes called on a controller with non-native source.");
298 None
299 },
300 })
301 }
302
303 pub(crate) fn reset(&self) {
305 self.queue.borrow_mut().clear();
306 self.total_size.set(Default::default());
307 }
308}
309
310#[dom_struct]
312pub(crate) struct ReadableStreamDefaultController {
313 reflector_: Reflector,
314
315 queue: QueueWithSizes,
317
318 underlying_source: MutNullableDom<UnderlyingSourceContainer>,
324
325 stream: MutNullableDom<ReadableStream>,
326
327 strategy_hwm: f64,
329
330 #[ignore_malloc_size_of = "mozjs"]
332 strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
333
334 close_requested: Cell<bool>,
336
337 started: Cell<bool>,
339
340 pulling: Cell<bool>,
342
343 pull_again: Cell<bool>,
345}
346
347impl ReadableStreamDefaultController {
348 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
349 fn new_inherited(
350 global: &GlobalScope,
351 underlying_source_type: UnderlyingSourceType,
352 strategy_hwm: f64,
353 strategy_size: Rc<QueuingStrategySize>,
354 can_gc: CanGc,
355 ) -> ReadableStreamDefaultController {
356 ReadableStreamDefaultController {
357 reflector_: Reflector::new(),
358 queue: Default::default(),
359 stream: MutNullableDom::new(None),
360 underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
361 global,
362 underlying_source_type,
363 can_gc,
364 ))),
365 strategy_hwm,
366 strategy_size: RefCell::new(Some(strategy_size)),
367 close_requested: Default::default(),
368 started: Default::default(),
369 pulling: Default::default(),
370 pull_again: Default::default(),
371 }
372 }
373
374 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
375 pub(crate) fn new(
376 global: &GlobalScope,
377 underlying_source: UnderlyingSourceType,
378 strategy_hwm: f64,
379 strategy_size: Rc<QueuingStrategySize>,
380 can_gc: CanGc,
381 ) -> DomRoot<ReadableStreamDefaultController> {
382 reflect_dom_object(
383 Box::new(ReadableStreamDefaultController::new_inherited(
384 global,
385 underlying_source,
386 strategy_hwm,
387 strategy_size,
388 can_gc,
389 )),
390 global,
391 can_gc,
392 )
393 }
394
395 pub(crate) fn setup(
397 &self,
398 stream: DomRoot<ReadableStream>,
399 can_gc: CanGc,
400 ) -> Result<(), Error> {
401 stream.assert_no_controller();
403
404 self.stream.set(Some(&stream));
406
407 let global = &*self.global();
408 let rooted_default_controller = DomRoot::from_ref(self);
409
410 stream.set_default_controller(&rooted_default_controller);
423
424 if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
425 let start_result = underlying_source
427 .call_start_algorithm(
428 Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
429 can_gc,
430 )
431 .unwrap_or_else(|| {
432 let promise = Promise::new(global, can_gc);
433 promise.resolve_native(&(), can_gc);
434 Ok(promise)
435 });
436
437 let start_promise = start_result?;
439
440 let handler = PromiseNativeHandler::new(
442 global,
443 Some(Box::new(StartAlgorithmFulfillmentHandler {
444 controller: Dom::from_ref(&rooted_default_controller),
445 })),
446 Some(Box::new(StartAlgorithmRejectionHandler {
447 controller: Dom::from_ref(&rooted_default_controller),
448 })),
449 can_gc,
450 );
451 let realm = enter_realm(global);
452 let comp = InRealm::Entered(&realm);
453 start_promise.append_native_handler(&handler, comp, can_gc);
454 };
455
456 Ok(())
457 }
458
459 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
461 if let Some(underlying_source) = self.underlying_source.get() {
462 underlying_source.set_underlying_source_this_object(this_object);
463 }
464 }
465
466 fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
468 self.queue.dequeue_value(cx, Some(rval), can_gc);
469 }
470
471 fn should_call_pull(&self) -> bool {
473 let Some(stream) = self.stream.get() else {
477 debug!("`should_call_pull` called on a controller without a stream.");
478 return false;
479 };
480
481 if !self.can_close_or_enqueue() {
483 return false;
484 }
485
486 if !self.started.get() {
488 return false;
489 }
490
491 if stream.is_locked() && stream.get_num_read_requests() > 0 {
494 return true;
495 }
496
497 let desired_size = self.get_desired_size().expect("desiredSize is not null.");
500
501 if desired_size > 0. {
502 return true;
503 }
504
505 false
506 }
507
508 fn call_pull_if_needed(&self, can_gc: CanGc) {
510 if !self.should_call_pull() {
513 return;
514 }
515
516 if self.pulling.get() {
518 self.pull_again.set(true);
520
521 return;
522 }
523
524 self.pulling.set(true);
526
527 let global = self.global();
530 let rooted_default_controller = DomRoot::from_ref(self);
531 let controller =
532 Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
533
534 let Some(underlying_source) = self.underlying_source.get() else {
535 return;
536 };
537 let handler = PromiseNativeHandler::new(
538 &global,
539 Some(Box::new(PullAlgorithmFulfillmentHandler {
540 controller: Dom::from_ref(&rooted_default_controller),
541 })),
542 Some(Box::new(PullAlgorithmRejectionHandler {
543 controller: Dom::from_ref(&rooted_default_controller),
544 })),
545 can_gc,
546 );
547
548 let realm = enter_realm(&*global);
549 let comp = InRealm::Entered(&realm);
550 let result = underlying_source
551 .call_pull_algorithm(controller, can_gc)
552 .unwrap_or_else(|| {
553 let promise = Promise::new(&global, can_gc);
554 promise.resolve_native(&(), can_gc);
555 Ok(promise)
556 });
557 let promise = result.unwrap_or_else(|error| {
558 let cx = GlobalScope::get_cx();
559 rooted!(in(*cx) let mut rval = UndefinedValue());
560 error.to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
562 let promise = Promise::new(&global, can_gc);
563 promise.reject_native(&rval.handle(), can_gc);
564 promise
565 });
566 promise.append_native_handler(&handler, comp, can_gc);
567 }
568
569 pub(crate) fn perform_cancel_steps(
571 &self,
572 cx: &mut js::context::JSContext,
573 global: &GlobalScope,
574 reason: SafeHandleValue,
575 ) -> Rc<Promise> {
576 self.queue.reset();
578
579 let underlying_source = self
580 .underlying_source
581 .get()
582 .expect("Controller should have a source when the cancel steps are called into.");
583 let result = underlying_source
585 .call_cancel_algorithm(cx, global, reason)
586 .unwrap_or_else(|| {
587 let promise = Promise::new2(cx, global);
588 promise.resolve_native(&(), CanGc::from_cx(cx));
589 Ok(promise)
590 });
591 let promise = result.unwrap_or_else(|error| {
592 rooted!(&in(cx) let mut rval = UndefinedValue());
593
594 error.to_jsval(cx.into(), global, rval.handle_mut(), CanGc::from_cx(cx));
595 let promise = Promise::new2(cx, global);
596 promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
597 promise
598 });
599
600 self.clear_algorithms();
602
603 promise
605 }
606
607 pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
609 let Some(stream) = self.stream.get() else {
612 return;
613 };
614
615 if !self.queue.is_empty() {
617 let cx = GlobalScope::get_cx();
618 rooted!(in(*cx) let mut rval = UndefinedValue());
619 let result = RootedTraceableBox::new(Heap::default());
620 self.dequeue_value(cx, rval.handle_mut(), can_gc);
621 result.set(*rval);
622
623 if self.close_requested.get() && self.queue.is_empty() {
625 self.clear_algorithms();
627
628 stream.close(can_gc);
630 } else {
631 self.call_pull_if_needed(can_gc);
633 }
634 read_request.chunk_steps(result, &self.global(), can_gc);
636 } else {
637 stream.add_read_request(read_request);
639
640 self.call_pull_if_needed(can_gc);
642 }
643 }
644
645 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
647 Ok(())
649 }
650
651 #[expect(unsafe_code)]
653 pub(crate) fn enqueue(
654 &self,
655 cx: &mut js::context::JSContext,
656 chunk: SafeHandleValue,
657 ) -> Result<(), Error> {
658 if !self.can_close_or_enqueue() {
660 return Ok(());
661 }
662
663 let stream = self
664 .stream
665 .get()
666 .expect("Controller must have a stream when a chunk is enqueued.");
667
668 if stream.is_locked() && stream.get_num_read_requests() > 0 {
672 stream.fulfill_read_request(chunk, false, CanGc::from_cx(cx));
673 } else {
674 let strategy_size = {
679 let reference = self.strategy_size.borrow();
680 reference.clone()
681 };
682 let size = if let Some(strategy_size) = strategy_size {
683 let result =
686 strategy_size.Call__(chunk, ExceptionHandling::Rethrow, CanGc::from_cx(cx));
687 match result {
688 Ok(size) => size,
690 Err(error) => {
691 rooted!(&in(cx) let mut rval = UndefinedValue());
693 unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
694
695 self.error(rval.handle(), CanGc::from_cx(cx));
697
698 return Err(error);
701 },
702 }
703 } else {
704 0.
705 };
706
707 {
708 let res = self
710 .queue
711 .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
712 value: Heap::boxed(chunk.get()),
713 size,
714 }));
715 if let Err(error) = res {
716 throw_dom_exception(cx.into(), &self.global(), error, CanGc::from_cx(cx));
722
723 rooted!(&in(cx) let mut rval = UndefinedValue());
726 unsafe { assert!(JS_GetPendingException(cx, rval.handle_mut())) };
727
728 self.error(rval.handle(), CanGc::from_cx(cx));
730
731 return Err(Error::JSFailed);
735 }
736 }
737 }
738
739 self.call_pull_if_needed(CanGc::from_cx(cx));
741
742 Ok(())
743 }
744
745 pub(crate) fn enqueue_native(&self, chunk: Vec<u8>, can_gc: CanGc) {
748 let stream = self
749 .stream
750 .get()
751 .expect("Controller must have a stream when a chunk is enqueued.");
752 if stream.is_locked() && stream.get_num_read_requests() > 0 {
753 let cx = GlobalScope::get_cx();
754 rooted!(in(*cx) let mut rval = UndefinedValue());
755 EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut(), can_gc);
756 stream.fulfill_read_request(rval.handle(), false, can_gc);
757 } else {
758 self.queue
759 .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
760 .expect("Enqueuing a chunk from Rust should not fail.");
761 }
762 }
763
764 pub(crate) fn in_memory(&self) -> bool {
766 let Some(underlying_source) = self.underlying_source.get() else {
767 return false;
768 };
769 underlying_source.in_memory()
770 }
771
772 pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
774 let underlying_source = self.underlying_source.get()?;
775 if underlying_source.in_memory() {
776 return self.queue.get_in_memory_bytes();
777 }
778 None
779 }
780
781 fn clear_algorithms(&self) {
783 self.underlying_source.set(None);
786
787 *self.strategy_size.borrow_mut() = None;
789 }
790
791 pub(crate) fn close(&self, can_gc: CanGc) {
793 if !self.can_close_or_enqueue() {
795 return;
796 }
797
798 let Some(stream) = self.stream.get() else {
799 return;
800 };
801
802 self.close_requested.set(true);
804
805 if self.queue.is_empty() {
806 self.clear_algorithms();
808
809 stream.close(can_gc);
811 }
812 }
813
814 pub(crate) fn get_desired_size(&self) -> Option<f64> {
816 let stream = self.stream.get()?;
817
818 if stream.is_errored() {
820 return None;
821 }
822
823 if stream.is_closed() {
825 return Some(0.0);
826 }
827
828 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
830 Some(desired_size.clamp(desired_size, self.strategy_hwm))
831 }
832
833 pub(crate) fn can_close_or_enqueue(&self) -> bool {
835 let Some(stream) = self.stream.get() else {
836 return false;
837 };
838
839 if !self.close_requested.get() && stream.is_readable() {
841 return true;
842 }
843
844 false
846 }
847
848 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
850 let Some(stream) = self.stream.get() else {
851 return;
852 };
853
854 if !stream.is_readable() {
856 return;
857 }
858
859 self.queue.reset();
861
862 self.clear_algorithms();
864
865 stream.error(e, can_gc);
866 }
867
868 pub(crate) fn has_backpressure(&self) -> bool {
870 !self.should_call_pull()
873 }
874}
875
876impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
877 for ReadableStreamDefaultController
878{
879 fn GetDesiredSize(&self) -> Option<f64> {
881 self.get_desired_size()
882 }
883
884 fn Close(&self, can_gc: CanGc) -> Fallible<()> {
886 if !self.can_close_or_enqueue() {
887 return Err(Error::Type(c"Stream cannot be closed.".to_owned()));
890 }
891
892 self.close(can_gc);
894
895 Ok(())
896 }
897
898 fn Enqueue(&self, cx: &mut js::context::JSContext, chunk: SafeHandleValue) -> Fallible<()> {
900 if !self.can_close_or_enqueue() {
902 return Err(Error::Type(c"Stream cannot be enqueued to.".to_owned()));
903 }
904
905 self.enqueue(cx, chunk)
907 }
908
909 fn Error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) -> Fallible<()> {
911 self.error(e, CanGc::from_cx(cx));
912 Ok(())
913 }
914}