1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::rust::wrappers::JS_GetPendingException;
14use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
15use js::typedarray::Uint8;
16use script_bindings::conversions::SafeToJSValConvertible;
17
18use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
19use super::bindings::root::Dom;
20use crate::dom::bindings::buffer_source::create_buffer_source;
21use crate::dom::bindings::callback::ExceptionHandling;
22use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
23use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
25use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
26use crate::dom::bindings::root::{DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestream::ReadableStream;
32use crate::dom::readablestreamdefaultreader::ReadRequest;
33use crate::dom::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
34use crate::realms::{InRealm, enter_realm};
35use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
36
37#[derive(Clone, JSTraceable, MallocSizeOf)]
40#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
41struct PullAlgorithmFulfillmentHandler {
42 controller: Dom<ReadableStreamDefaultController>,
43}
44
45impl Callback for PullAlgorithmFulfillmentHandler {
46 fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
49 self.controller.pulling.set(false);
51
52 if self.controller.pull_again.get() {
54 self.controller.pull_again.set(false);
56
57 self.controller.call_pull_if_needed(can_gc);
59 }
60 }
61}
62
63#[derive(Clone, JSTraceable, MallocSizeOf)]
66#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
67struct PullAlgorithmRejectionHandler {
68 controller: Dom<ReadableStreamDefaultController>,
69}
70
71impl Callback for PullAlgorithmRejectionHandler {
72 fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
75 self.controller.error(v, can_gc);
77 }
78}
79
80#[derive(Clone, JSTraceable, MallocSizeOf)]
83#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
84struct StartAlgorithmFulfillmentHandler {
85 controller: Dom<ReadableStreamDefaultController>,
86}
87
88impl Callback for StartAlgorithmFulfillmentHandler {
89 fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
92 self.controller.started.set(true);
94
95 self.controller.call_pull_if_needed(can_gc);
97 }
98}
99
100#[derive(Clone, JSTraceable, MallocSizeOf)]
103#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
104struct StartAlgorithmRejectionHandler {
105 controller: Dom<ReadableStreamDefaultController>,
106}
107
108impl Callback for StartAlgorithmRejectionHandler {
109 fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
112 self.controller.error(v, can_gc);
114 }
115}
116
117#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
119#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
120pub(crate) struct ValueWithSize {
121 #[ignore_malloc_size_of = "Heap is measured by mozjs"]
123 pub(crate) value: Box<Heap<JSVal>>,
124 pub(crate) size: f64,
126}
127
128#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
130#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
131pub(crate) enum EnqueuedValue {
132 Native(Box<[u8]>),
134 Js(ValueWithSize),
136 CloseSentinel,
138}
139
140impl EnqueuedValue {
141 fn size(&self) -> f64 {
142 match self {
143 EnqueuedValue::Native(v) => v.len() as f64,
144 EnqueuedValue::Js(v) => v.size,
145 EnqueuedValue::CloseSentinel => 0.,
148 }
149 }
150
151 fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
152 match self {
153 EnqueuedValue::Native(chunk) => {
154 rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
155 create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
156 .expect("failed to create buffer source for native chunk.");
157 array_buffer_ptr.safe_to_jsval(cx, rval);
158 },
159 EnqueuedValue::Js(value_with_size) => value_with_size.value.safe_to_jsval(cx, rval),
160 EnqueuedValue::CloseSentinel => {
161 unreachable!("The close sentinel is never made available as a js val.")
162 },
163 }
164 }
165}
166
167fn is_non_negative_number(value: &EnqueuedValue) -> bool {
169 let value_with_size = match value {
170 EnqueuedValue::Native(_) => return true,
171 EnqueuedValue::Js(value_with_size) => value_with_size,
172 EnqueuedValue::CloseSentinel => return true,
173 };
174
175 if value_with_size.size.is_nan() {
180 return false;
181 }
182
183 if value_with_size.size.is_sign_negative() {
185 return false;
186 }
187
188 true
189}
190
191#[derive(Default, JSTraceable, MallocSizeOf)]
193#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
194pub(crate) struct QueueWithSizes {
195 queue: VecDeque<EnqueuedValue>,
196 pub(crate) total_size: f64,
198}
199
200impl QueueWithSizes {
201 pub(crate) fn dequeue_value(
205 &mut self,
206 cx: SafeJSContext,
207 rval: Option<MutableHandleValue>,
208 can_gc: CanGc,
209 ) {
210 let Some(value) = self.queue.front() else {
211 unreachable!("Buffer cannot be empty when dequeue value is called into.");
212 };
213 self.total_size -= value.size();
214 if let Some(rval) = rval {
215 value.to_jsval(cx, rval, can_gc);
216 } else {
217 assert_eq!(value, &EnqueuedValue::CloseSentinel);
218 }
219 self.queue.pop_front();
220 }
221
222 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
224 pub(crate) fn enqueue_value_with_size(&mut self, value: EnqueuedValue) -> Result<(), Error> {
225 if !is_non_negative_number(&value) {
227 return Err(Error::Range(
228 "The size of the enqueued chunk is not a non-negative number.".to_string(),
229 ));
230 }
231
232 if value.size().is_infinite() {
234 return Err(Error::Range(
235 "The size of the enqueued chunk is infinite.".to_string(),
236 ));
237 }
238
239 self.total_size += value.size();
240 self.queue.push_back(value);
241
242 Ok(())
243 }
244
245 pub(crate) fn is_empty(&self) -> bool {
246 self.queue.is_empty()
247 }
248
249 pub(crate) fn peek_queue_value(
252 &self,
253 cx: SafeJSContext,
254 rval: MutableHandleValue,
255 can_gc: CanGc,
256 ) -> bool {
257 assert!(!self.is_empty());
262
263 let value_with_size = self.queue.front().expect("Queue is not empty.");
265 if let EnqueuedValue::CloseSentinel = value_with_size {
266 return true;
267 }
268
269 value_with_size.to_jsval(cx, rval, can_gc);
271 false
272 }
273
274 fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
276 self.queue
277 .iter()
278 .try_fold(Vec::new(), |mut acc, value| match value {
279 EnqueuedValue::Native(chunk) => {
280 acc.extend(chunk.iter().copied());
281 Some(acc)
282 },
283 _ => {
284 warn!("get_in_memory_bytes called on a controller with non-native source.");
285 None
286 },
287 })
288 }
289
290 pub(crate) fn reset(&mut self) {
292 self.queue.clear();
293 self.total_size = Default::default();
294 }
295}
296
297#[dom_struct]
299pub(crate) struct ReadableStreamDefaultController {
300 reflector_: Reflector,
301
302 queue: RefCell<QueueWithSizes>,
304
305 underlying_source: MutNullableDom<UnderlyingSourceContainer>,
311
312 stream: MutNullableDom<ReadableStream>,
313
314 strategy_hwm: f64,
316
317 #[ignore_malloc_size_of = "mozjs"]
319 strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
320
321 close_requested: Cell<bool>,
323
324 started: Cell<bool>,
326
327 pulling: Cell<bool>,
329
330 pull_again: Cell<bool>,
332}
333
334impl ReadableStreamDefaultController {
335 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
336 fn new_inherited(
337 global: &GlobalScope,
338 underlying_source_type: UnderlyingSourceType,
339 strategy_hwm: f64,
340 strategy_size: Rc<QueuingStrategySize>,
341 can_gc: CanGc,
342 ) -> ReadableStreamDefaultController {
343 ReadableStreamDefaultController {
344 reflector_: Reflector::new(),
345 queue: RefCell::new(Default::default()),
346 stream: MutNullableDom::new(None),
347 underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
348 global,
349 underlying_source_type,
350 can_gc,
351 ))),
352 strategy_hwm,
353 strategy_size: RefCell::new(Some(strategy_size)),
354 close_requested: Default::default(),
355 started: Default::default(),
356 pulling: Default::default(),
357 pull_again: Default::default(),
358 }
359 }
360
361 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
362 pub(crate) fn new(
363 global: &GlobalScope,
364 underlying_source: UnderlyingSourceType,
365 strategy_hwm: f64,
366 strategy_size: Rc<QueuingStrategySize>,
367 can_gc: CanGc,
368 ) -> DomRoot<ReadableStreamDefaultController> {
369 reflect_dom_object(
370 Box::new(ReadableStreamDefaultController::new_inherited(
371 global,
372 underlying_source,
373 strategy_hwm,
374 strategy_size,
375 can_gc,
376 )),
377 global,
378 can_gc,
379 )
380 }
381
382 pub(crate) fn setup(
384 &self,
385 stream: DomRoot<ReadableStream>,
386 can_gc: CanGc,
387 ) -> Result<(), Error> {
388 stream.assert_no_controller();
390
391 self.stream.set(Some(&stream));
393
394 let global = &*self.global();
395 let rooted_default_controller = DomRoot::from_ref(self);
396
397 stream.set_default_controller(&rooted_default_controller);
410
411 if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
412 let start_result = underlying_source
414 .call_start_algorithm(
415 Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
416 can_gc,
417 )
418 .unwrap_or_else(|| {
419 let promise = Promise::new(global, can_gc);
420 promise.resolve_native(&(), can_gc);
421 Ok(promise)
422 });
423
424 let start_promise = start_result?;
426
427 let handler = PromiseNativeHandler::new(
429 global,
430 Some(Box::new(StartAlgorithmFulfillmentHandler {
431 controller: Dom::from_ref(&rooted_default_controller),
432 })),
433 Some(Box::new(StartAlgorithmRejectionHandler {
434 controller: Dom::from_ref(&rooted_default_controller),
435 })),
436 can_gc,
437 );
438 let realm = enter_realm(global);
439 let comp = InRealm::Entered(&realm);
440 start_promise.append_native_handler(&handler, comp, can_gc);
441 };
442
443 Ok(())
444 }
445
446 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
448 if let Some(underlying_source) = self.underlying_source.get() {
449 underlying_source.set_underlying_source_this_object(this_object);
450 }
451 }
452
453 fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
455 let mut queue = self.queue.borrow_mut();
456 queue.dequeue_value(cx, Some(rval), can_gc);
457 }
458
459 fn should_call_pull(&self) -> bool {
461 let Some(stream) = self.stream.get() else {
465 debug!("`should_call_pull` called on a controller without a stream.");
466 return false;
467 };
468
469 if !self.can_close_or_enqueue() {
471 return false;
472 }
473
474 if !self.started.get() {
476 return false;
477 }
478
479 if stream.is_locked() && stream.get_num_read_requests() > 0 {
482 return true;
483 }
484
485 let desired_size = self.get_desired_size().expect("desiredSize is not null.");
488
489 if desired_size > 0. {
490 return true;
491 }
492
493 false
494 }
495
496 fn call_pull_if_needed(&self, can_gc: CanGc) {
498 if !self.should_call_pull() {
501 return;
502 }
503
504 if self.pulling.get() {
506 self.pull_again.set(true);
508
509 return;
510 }
511
512 self.pulling.set(true);
514
515 let global = self.global();
518 let rooted_default_controller = DomRoot::from_ref(self);
519 let controller =
520 Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
521
522 let Some(underlying_source) = self.underlying_source.get() else {
523 return;
524 };
525 let handler = PromiseNativeHandler::new(
526 &global,
527 Some(Box::new(PullAlgorithmFulfillmentHandler {
528 controller: Dom::from_ref(&rooted_default_controller),
529 })),
530 Some(Box::new(PullAlgorithmRejectionHandler {
531 controller: Dom::from_ref(&rooted_default_controller),
532 })),
533 can_gc,
534 );
535
536 let realm = enter_realm(&*global);
537 let comp = InRealm::Entered(&realm);
538 let result = underlying_source
539 .call_pull_algorithm(controller, &global, can_gc)
540 .unwrap_or_else(|| {
541 let promise = Promise::new(&global, can_gc);
542 promise.resolve_native(&(), can_gc);
543 Ok(promise)
544 });
545 let promise = result.unwrap_or_else(|error| {
546 let cx = GlobalScope::get_cx();
547 rooted!(in(*cx) let mut rval = UndefinedValue());
548 error
550 .clone()
551 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
552 let promise = Promise::new(&global, can_gc);
553 promise.reject_native(&rval.handle(), can_gc);
554 promise
555 });
556 promise.append_native_handler(&handler, comp, can_gc);
557 }
558
559 pub(crate) fn perform_cancel_steps(
561 &self,
562 cx: SafeJSContext,
563 global: &GlobalScope,
564 reason: SafeHandleValue,
565 can_gc: CanGc,
566 ) -> Rc<Promise> {
567 self.queue.borrow_mut().reset();
569
570 let underlying_source = self
571 .underlying_source
572 .get()
573 .expect("Controller should have a source when the cancel steps are called into.");
574 let result = underlying_source
576 .call_cancel_algorithm(cx, global, reason, can_gc)
577 .unwrap_or_else(|| {
578 let promise = Promise::new(global, can_gc);
579 promise.resolve_native(&(), can_gc);
580 Ok(promise)
581 });
582 let promise = result.unwrap_or_else(|error| {
583 rooted!(in(*cx) let mut rval = UndefinedValue());
584
585 error
586 .clone()
587 .to_jsval(cx, global, rval.handle_mut(), can_gc);
588 let promise = Promise::new(global, can_gc);
589 promise.reject_native(&rval.handle(), can_gc);
590 promise
591 });
592
593 self.clear_algorithms();
595
596 promise
598 }
599
600 pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
602 let Some(stream) = self.stream.get() else {
605 return;
606 };
607
608 if !self.queue.borrow().is_empty() {
610 let cx = GlobalScope::get_cx();
611 rooted!(in(*cx) let mut rval = UndefinedValue());
612 let result = RootedTraceableBox::new(Heap::default());
613 self.dequeue_value(cx, rval.handle_mut(), can_gc);
614 result.set(*rval);
615
616 if self.close_requested.get() && self.queue.borrow().is_empty() {
618 self.clear_algorithms();
620
621 stream.close(can_gc);
623 } else {
624 self.call_pull_if_needed(can_gc);
626 }
627 read_request.chunk_steps(result, can_gc);
629 } else {
630 stream.add_read_request(read_request);
632
633 self.call_pull_if_needed(can_gc);
635 }
636 }
637
638 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
640 Ok(())
642 }
643
644 #[allow(unsafe_code)]
646 pub(crate) fn enqueue(
647 &self,
648 cx: SafeJSContext,
649 chunk: SafeHandleValue,
650 can_gc: CanGc,
651 ) -> Result<(), Error> {
652 if !self.can_close_or_enqueue() {
654 return Ok(());
655 }
656
657 let stream = self
658 .stream
659 .get()
660 .expect("Controller must have a stream when a chunk is enqueued.");
661
662 if stream.is_locked() && stream.get_num_read_requests() > 0 {
666 stream.fulfill_read_request(chunk, false, can_gc);
667 } else {
668 let strategy_size = {
673 let reference = self.strategy_size.borrow();
674 reference.clone()
675 };
676 let size = if let Some(strategy_size) = strategy_size {
677 let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
680 match result {
681 Ok(size) => size,
683 Err(error) => {
684 rooted!(in(*cx) let mut rval = UndefinedValue());
686 unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
687
688 self.error(rval.handle(), can_gc);
690
691 return Err(error);
694 },
695 }
696 } else {
697 0.
698 };
699
700 {
701 let res = {
703 let mut queue = self.queue.borrow_mut();
704 queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
705 value: Heap::boxed(chunk.get()),
706 size,
707 }))
708 };
709 if let Err(error) = res {
710 throw_dom_exception(cx, &self.global(), error, can_gc);
716
717 rooted!(in(*cx) let mut rval = UndefinedValue());
720 unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };
721
722 self.error(rval.handle(), can_gc);
724
725 return Err(Error::JSFailed);
729 }
730 }
731 }
732
733 self.call_pull_if_needed(can_gc);
735
736 Ok(())
737 }
738
739 pub(crate) fn enqueue_native(&self, chunk: Vec<u8>, can_gc: CanGc) {
742 let stream = self
743 .stream
744 .get()
745 .expect("Controller must have a stream when a chunk is enqueued.");
746 if stream.is_locked() && stream.get_num_read_requests() > 0 {
747 let cx = GlobalScope::get_cx();
748 rooted!(in(*cx) let mut rval = UndefinedValue());
749 EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut(), can_gc);
750 stream.fulfill_read_request(rval.handle(), false, can_gc);
751 } else {
752 let mut queue = self.queue.borrow_mut();
753 queue
754 .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
755 .expect("Enqueuing a chunk from Rust should not fail.");
756 }
757 }
758
759 pub(crate) fn in_memory(&self) -> bool {
761 let Some(underlying_source) = self.underlying_source.get() else {
762 return false;
763 };
764 underlying_source.in_memory()
765 }
766
767 pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
769 let underlying_source = self.underlying_source.get()?;
770 if underlying_source.in_memory() {
771 return self.queue.borrow().get_in_memory_bytes();
772 }
773 None
774 }
775
776 fn clear_algorithms(&self) {
778 self.underlying_source.set(None);
781
782 *self.strategy_size.borrow_mut() = None;
784 }
785
786 pub(crate) fn close(&self, can_gc: CanGc) {
788 if !self.can_close_or_enqueue() {
790 return;
791 }
792
793 let Some(stream) = self.stream.get() else {
794 return;
795 };
796
797 self.close_requested.set(true);
799
800 if self.queue.borrow().is_empty() {
801 self.clear_algorithms();
803
804 stream.close(can_gc);
806 }
807 }
808
809 pub(crate) fn get_desired_size(&self) -> Option<f64> {
811 let stream = self.stream.get()?;
812
813 if stream.is_errored() {
815 return None;
816 }
817
818 if stream.is_closed() {
820 return Some(0.0);
821 }
822
823 let queue = self.queue.borrow();
825 let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX);
826 Some(desired_size.clamp(desired_size, self.strategy_hwm))
827 }
828
829 pub(crate) fn can_close_or_enqueue(&self) -> bool {
831 let Some(stream) = self.stream.get() else {
832 return false;
833 };
834
835 if !self.close_requested.get() && stream.is_readable() {
837 return true;
838 }
839
840 false
842 }
843
844 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
846 let Some(stream) = self.stream.get() else {
847 return;
848 };
849
850 if !stream.is_readable() {
852 return;
853 }
854
855 self.queue.borrow_mut().reset();
857
858 self.clear_algorithms();
860
861 stream.error(e, can_gc);
862 }
863
864 pub(crate) fn has_backpressure(&self) -> bool {
866 !self.should_call_pull()
869 }
870}
871
872impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
873 for ReadableStreamDefaultController
874{
875 fn GetDesiredSize(&self) -> Option<f64> {
877 self.get_desired_size()
878 }
879
880 fn Close(&self, can_gc: CanGc) -> Fallible<()> {
882 if !self.can_close_or_enqueue() {
883 return Err(Error::Type("Stream cannot be closed.".to_string()));
886 }
887
888 self.close(can_gc);
890
891 Ok(())
892 }
893
894 fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
896 if !self.can_close_or_enqueue() {
898 return Err(Error::Type("Stream cannot be enqueued to.".to_string()));
899 }
900
901 self.enqueue(cx, chunk, can_gc)
903 }
904
905 fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
907 self.error(e, can_gc);
908 Ok(())
909 }
910}