1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::ptr;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::rust::wrappers::JS_GetPendingException;
14use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue, MutableHandleValue};
15use js::typedarray::Uint8;
16use script_bindings::conversions::SafeToJSValConvertible;
17
18use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
19use super::bindings::root::Dom;
20use crate::dom::bindings::buffer_source::create_buffer_source;
21use crate::dom::bindings::callback::ExceptionHandling;
22use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultControllerMethods;
23use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible, throw_dom_exception};
25use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
26use crate::dom::bindings::root::{DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestream::ReadableStream;
32use crate::dom::readablestreamdefaultreader::ReadRequest;
33use crate::dom::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
34use crate::realms::{InRealm, enter_realm};
35use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
36
/// Reaction registered as the fulfillment side of the promise handler that
/// `call_pull_if_needed` attaches to the pull algorithm's promise.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PullAlgorithmFulfillmentHandler {
    /// Controller whose `pulling` / `pull_again` flags are updated by the callback.
    controller: Dom<ReadableStreamDefaultController>,
}
44
45impl Callback for PullAlgorithmFulfillmentHandler {
46 fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
49 self.controller.pulling.set(false);
51
52 if self.controller.pull_again.get() {
54 self.controller.pull_again.set(false);
56
57 self.controller.call_pull_if_needed(can_gc);
59 }
60 }
61}
62
/// Reaction registered as the rejection side of the promise handler that
/// `call_pull_if_needed` attaches to the pull algorithm's promise.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PullAlgorithmRejectionHandler {
    /// Controller that is errored with the rejection value.
    controller: Dom<ReadableStreamDefaultController>,
}
70
71impl Callback for PullAlgorithmRejectionHandler {
72 fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
75 self.controller.error(v, can_gc);
77 }
78}
79
/// Reaction registered as the fulfillment side of the promise handler that
/// `setup` attaches to the start algorithm's promise.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct StartAlgorithmFulfillmentHandler {
    /// Controller that is marked as started by the callback.
    controller: Dom<ReadableStreamDefaultController>,
}
87
88impl Callback for StartAlgorithmFulfillmentHandler {
89 fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
92 self.controller.started.set(true);
94
95 self.controller.call_pull_if_needed(can_gc);
97 }
98}
99
/// Reaction registered as the rejection side of the promise handler that
/// `setup` attaches to the start algorithm's promise.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct StartAlgorithmRejectionHandler {
    /// Controller that is errored with the rejection value.
    controller: Dom<ReadableStreamDefaultController>,
}
107
108impl Callback for StartAlgorithmRejectionHandler {
109 fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
112 self.controller.error(v, can_gc);
114 }
115}
116
/// A JS value together with the size recorded for it when it was enqueued.
#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct ValueWithSize {
    /// The enqueued JS value, stored in a boxed `Heap`.
    #[ignore_malloc_size_of = "Heap is measured by mozjs"]
    pub(crate) value: Box<Heap<JSVal>>,
    /// Size attributed to `value`; this is what `EnqueuedValue::size` reports
    /// for `Js` entries.
    pub(crate) size: f64,
}
127
/// One entry of a [`QueueWithSizes`].
#[derive(Debug, JSTraceable, MallocSizeOf, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum EnqueuedValue {
    /// Raw bytes enqueued from Rust; sized by byte length and turned into a
    /// `Uint8` buffer source when converted to a JS value.
    Native(Box<[u8]>),
    /// A JS value together with its recorded size.
    Js(ValueWithSize),
    /// Marker entry with size zero; it is never converted to a JS value and
    /// `peek_queue_value` reports it by returning `true`.
    CloseSentinel,
}
139
140impl EnqueuedValue {
141 fn size(&self) -> f64 {
142 match self {
143 EnqueuedValue::Native(v) => v.len() as f64,
144 EnqueuedValue::Js(v) => v.size,
145 EnqueuedValue::CloseSentinel => 0.,
148 }
149 }
150
151 fn to_jsval(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
152 match self {
153 EnqueuedValue::Native(chunk) => {
154 rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
155 create_buffer_source::<Uint8>(cx, chunk, array_buffer_ptr.handle_mut(), can_gc)
156 .expect("failed to create buffer source for native chunk.");
157 array_buffer_ptr.safe_to_jsval(cx, rval, can_gc);
158 },
159 EnqueuedValue::Js(value_with_size) => {
160 value_with_size.value.safe_to_jsval(cx, rval, can_gc)
161 },
162 EnqueuedValue::CloseSentinel => {
163 unreachable!("The close sentinel is never made available as a js val.")
164 },
165 }
166 }
167}
168
169fn is_non_negative_number(value: &EnqueuedValue) -> bool {
171 let value_with_size = match value {
172 EnqueuedValue::Native(_) => return true,
173 EnqueuedValue::Js(value_with_size) => value_with_size,
174 EnqueuedValue::CloseSentinel => return true,
175 };
176
177 if value_with_size.size.is_nan() {
182 return false;
183 }
184
185 if value_with_size.size.is_sign_negative() {
187 return false;
188 }
189
190 true
191}
192
/// A FIFO of [`EnqueuedValue`]s plus the running sum of their sizes.
#[derive(Default, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct QueueWithSizes {
    /// Entries in arrival order; new entries are pushed at the back and
    /// dequeued from the front.
    queue: RefCell<VecDeque<EnqueuedValue>>,
    /// Sum of the sizes of the entries currently queued; increased on enqueue,
    /// decreased on dequeue, and set back to the default by `reset`.
    pub(crate) total_size: Cell<f64>,
}
201
202impl QueueWithSizes {
203 pub(crate) fn dequeue_value(
207 &self,
208 cx: SafeJSContext,
209 rval: Option<MutableHandleValue>,
210 can_gc: CanGc,
211 ) {
212 {
213 let queue = self.queue.borrow();
214 let Some(value) = queue.front() else {
215 unreachable!("Buffer cannot be empty when dequeue value is called into.");
216 };
217 self.total_size.set(self.total_size.get() - value.size());
218 if let Some(rval) = rval {
219 value.to_jsval(cx, rval, can_gc);
220 } else {
221 assert_eq!(value, &EnqueuedValue::CloseSentinel);
222 }
223 }
224 self.queue.borrow_mut().pop_front();
225 }
226
227 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
229 pub(crate) fn enqueue_value_with_size(&self, value: EnqueuedValue) -> Result<(), Error> {
230 if !is_non_negative_number(&value) {
232 return Err(Error::Range(
233 "The size of the enqueued chunk is not a non-negative number.".to_string(),
234 ));
235 }
236
237 if value.size().is_infinite() {
239 return Err(Error::Range(
240 "The size of the enqueued chunk is infinite.".to_string(),
241 ));
242 }
243
244 self.total_size.set(self.total_size.get() + value.size());
245 self.queue.borrow_mut().push_back(value);
246
247 Ok(())
248 }
249
250 pub(crate) fn is_empty(&self) -> bool {
251 self.queue.borrow().is_empty()
252 }
253
254 pub(crate) fn peek_queue_value(
257 &self,
258 cx: SafeJSContext,
259 rval: MutableHandleValue,
260 can_gc: CanGc,
261 ) -> bool {
262 assert!(!self.is_empty());
267
268 let queue = self.queue.borrow();
270 let value_with_size = queue.front().expect("Queue is not empty.");
271 if let EnqueuedValue::CloseSentinel = value_with_size {
272 return true;
273 }
274
275 value_with_size.to_jsval(cx, rval, can_gc);
277 false
278 }
279
280 fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
282 self.queue
283 .borrow()
284 .iter()
285 .try_fold(Vec::new(), |mut acc, value| match value {
286 EnqueuedValue::Native(chunk) => {
287 acc.extend(chunk.iter().copied());
288 Some(acc)
289 },
290 _ => {
291 warn!("get_in_memory_bytes called on a controller with non-native source.");
292 None
293 },
294 })
295 }
296
297 pub(crate) fn reset(&self) {
299 self.queue.borrow_mut().clear();
300 self.total_size.set(Default::default());
301 }
302}
303
/// DOM object holding a chunk queue for a [`ReadableStream`] and the state
/// used to decide when to invoke the underlying source's pull algorithm.
#[dom_struct]
pub(crate) struct ReadableStreamDefaultController {
    reflector_: Reflector,

    /// Chunks that have been enqueued but not yet read.
    queue: QueueWithSizes,

    /// Container through which the start, pull and cancel algorithms are
    /// invoked; set to `None` by `clear_algorithms`.
    underlying_source: MutNullableDom<UnderlyingSourceContainer>,

    /// The stream this controller is attached to; populated by `setup`.
    stream: MutNullableDom<ReadableStream>,

    /// High water mark from which `get_desired_size` subtracts the queue total.
    strategy_hwm: f64,

    /// Callback used by `enqueue` to size chunks; set to `None` by
    /// `clear_algorithms`.
    #[ignore_malloc_size_of = "mozjs"]
    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,

    /// Set by `close`; once set, `can_close_or_enqueue` returns `false`.
    close_requested: Cell<bool>,

    /// Set when the start algorithm's promise fulfills; `should_call_pull`
    /// returns `false` until then.
    started: Cell<bool>,

    /// `true` from the moment a pull is issued until its promise fulfills.
    pulling: Cell<bool>,

    /// Set when a pull is wanted while another is still in flight; consumed
    /// by the pull fulfillment handler.
    pull_again: Cell<bool>,
}
340
341impl ReadableStreamDefaultController {
342 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
343 fn new_inherited(
344 global: &GlobalScope,
345 underlying_source_type: UnderlyingSourceType,
346 strategy_hwm: f64,
347 strategy_size: Rc<QueuingStrategySize>,
348 can_gc: CanGc,
349 ) -> ReadableStreamDefaultController {
350 ReadableStreamDefaultController {
351 reflector_: Reflector::new(),
352 queue: Default::default(),
353 stream: MutNullableDom::new(None),
354 underlying_source: MutNullableDom::new(Some(&*UnderlyingSourceContainer::new(
355 global,
356 underlying_source_type,
357 can_gc,
358 ))),
359 strategy_hwm,
360 strategy_size: RefCell::new(Some(strategy_size)),
361 close_requested: Default::default(),
362 started: Default::default(),
363 pulling: Default::default(),
364 pull_again: Default::default(),
365 }
366 }
367
368 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
369 pub(crate) fn new(
370 global: &GlobalScope,
371 underlying_source: UnderlyingSourceType,
372 strategy_hwm: f64,
373 strategy_size: Rc<QueuingStrategySize>,
374 can_gc: CanGc,
375 ) -> DomRoot<ReadableStreamDefaultController> {
376 reflect_dom_object(
377 Box::new(ReadableStreamDefaultController::new_inherited(
378 global,
379 underlying_source,
380 strategy_hwm,
381 strategy_size,
382 can_gc,
383 )),
384 global,
385 can_gc,
386 )
387 }
388
389 pub(crate) fn setup(
391 &self,
392 stream: DomRoot<ReadableStream>,
393 can_gc: CanGc,
394 ) -> Result<(), Error> {
395 stream.assert_no_controller();
397
398 self.stream.set(Some(&stream));
400
401 let global = &*self.global();
402 let rooted_default_controller = DomRoot::from_ref(self);
403
404 stream.set_default_controller(&rooted_default_controller);
417
418 if let Some(underlying_source) = rooted_default_controller.underlying_source.get() {
419 let start_result = underlying_source
421 .call_start_algorithm(
422 Controller::ReadableStreamDefaultController(rooted_default_controller.clone()),
423 can_gc,
424 )
425 .unwrap_or_else(|| {
426 let promise = Promise::new(global, can_gc);
427 promise.resolve_native(&(), can_gc);
428 Ok(promise)
429 });
430
431 let start_promise = start_result?;
433
434 let handler = PromiseNativeHandler::new(
436 global,
437 Some(Box::new(StartAlgorithmFulfillmentHandler {
438 controller: Dom::from_ref(&rooted_default_controller),
439 })),
440 Some(Box::new(StartAlgorithmRejectionHandler {
441 controller: Dom::from_ref(&rooted_default_controller),
442 })),
443 can_gc,
444 );
445 let realm = enter_realm(global);
446 let comp = InRealm::Entered(&realm);
447 start_promise.append_native_handler(&handler, comp, can_gc);
448 };
449
450 Ok(())
451 }
452
453 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
455 if let Some(underlying_source) = self.underlying_source.get() {
456 underlying_source.set_underlying_source_this_object(this_object);
457 }
458 }
459
460 fn dequeue_value(&self, cx: SafeJSContext, rval: MutableHandleValue, can_gc: CanGc) {
462 self.queue.dequeue_value(cx, Some(rval), can_gc);
463 }
464
465 fn should_call_pull(&self) -> bool {
467 let Some(stream) = self.stream.get() else {
471 debug!("`should_call_pull` called on a controller without a stream.");
472 return false;
473 };
474
475 if !self.can_close_or_enqueue() {
477 return false;
478 }
479
480 if !self.started.get() {
482 return false;
483 }
484
485 if stream.is_locked() && stream.get_num_read_requests() > 0 {
488 return true;
489 }
490
491 let desired_size = self.get_desired_size().expect("desiredSize is not null.");
494
495 if desired_size > 0. {
496 return true;
497 }
498
499 false
500 }
501
502 fn call_pull_if_needed(&self, can_gc: CanGc) {
504 if !self.should_call_pull() {
507 return;
508 }
509
510 if self.pulling.get() {
512 self.pull_again.set(true);
514
515 return;
516 }
517
518 self.pulling.set(true);
520
521 let global = self.global();
524 let rooted_default_controller = DomRoot::from_ref(self);
525 let controller =
526 Controller::ReadableStreamDefaultController(rooted_default_controller.clone());
527
528 let Some(underlying_source) = self.underlying_source.get() else {
529 return;
530 };
531 let handler = PromiseNativeHandler::new(
532 &global,
533 Some(Box::new(PullAlgorithmFulfillmentHandler {
534 controller: Dom::from_ref(&rooted_default_controller),
535 })),
536 Some(Box::new(PullAlgorithmRejectionHandler {
537 controller: Dom::from_ref(&rooted_default_controller),
538 })),
539 can_gc,
540 );
541
542 let realm = enter_realm(&*global);
543 let comp = InRealm::Entered(&realm);
544 let result = underlying_source
545 .call_pull_algorithm(controller, &global, can_gc)
546 .unwrap_or_else(|| {
547 let promise = Promise::new(&global, can_gc);
548 promise.resolve_native(&(), can_gc);
549 Ok(promise)
550 });
551 let promise = result.unwrap_or_else(|error| {
552 let cx = GlobalScope::get_cx();
553 rooted!(in(*cx) let mut rval = UndefinedValue());
554 error
556 .clone()
557 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
558 let promise = Promise::new(&global, can_gc);
559 promise.reject_native(&rval.handle(), can_gc);
560 promise
561 });
562 promise.append_native_handler(&handler, comp, can_gc);
563 }
564
565 pub(crate) fn perform_cancel_steps(
567 &self,
568 cx: SafeJSContext,
569 global: &GlobalScope,
570 reason: SafeHandleValue,
571 can_gc: CanGc,
572 ) -> Rc<Promise> {
573 self.queue.reset();
575
576 let underlying_source = self
577 .underlying_source
578 .get()
579 .expect("Controller should have a source when the cancel steps are called into.");
580 let result = underlying_source
582 .call_cancel_algorithm(cx, global, reason, can_gc)
583 .unwrap_or_else(|| {
584 let promise = Promise::new(global, can_gc);
585 promise.resolve_native(&(), can_gc);
586 Ok(promise)
587 });
588 let promise = result.unwrap_or_else(|error| {
589 rooted!(in(*cx) let mut rval = UndefinedValue());
590
591 error
592 .clone()
593 .to_jsval(cx, global, rval.handle_mut(), can_gc);
594 let promise = Promise::new(global, can_gc);
595 promise.reject_native(&rval.handle(), can_gc);
596 promise
597 });
598
599 self.clear_algorithms();
601
602 promise
604 }
605
606 pub(crate) fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
608 let Some(stream) = self.stream.get() else {
611 return;
612 };
613
614 if !self.queue.is_empty() {
616 let cx = GlobalScope::get_cx();
617 rooted!(in(*cx) let mut rval = UndefinedValue());
618 let result = RootedTraceableBox::new(Heap::default());
619 self.dequeue_value(cx, rval.handle_mut(), can_gc);
620 result.set(*rval);
621
622 if self.close_requested.get() && self.queue.is_empty() {
624 self.clear_algorithms();
626
627 stream.close(can_gc);
629 } else {
630 self.call_pull_if_needed(can_gc);
632 }
633 read_request.chunk_steps(result, can_gc);
635 } else {
636 stream.add_read_request(read_request);
638
639 self.call_pull_if_needed(can_gc);
641 }
642 }
643
    /// Release steps for this controller: there is nothing to do, so this
    /// always returns `Ok(())`.
    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
        Ok(())
    }
649
    /// Enqueues the JS value `chunk` on this controller.
    ///
    /// Returns `Ok(())` without doing anything when `can_close_or_enqueue()`
    /// is `false`. When the stream is locked and has pending read requests,
    /// the chunk is handed directly to `fulfill_read_request`. Otherwise the
    /// chunk is sized with the strategy's size callback (size `0.` when no
    /// callback is stored) and pushed onto the queue. On success,
    /// `call_pull_if_needed` is invoked.
    ///
    /// # Errors
    ///
    /// Returns the size callback's error, or `Error::JSFailed` when the queue
    /// rejects the size. In both cases the stream is first errored with the
    /// pending JS exception.
    ///
    /// # Panics
    ///
    /// Panics when the controller has no stream, or when no JS exception is
    /// pending on one of the failure paths.
    #[allow(unsafe_code)]
    pub(crate) fn enqueue(
        &self,
        cx: SafeJSContext,
        chunk: SafeHandleValue,
        can_gc: CanGc,
    ) -> Result<(), Error> {
        // Enqueuing on a controller that cannot accept chunks is a silent no-op.
        if !self.can_close_or_enqueue() {
            return Ok(());
        }

        let stream = self
            .stream
            .get()
            .expect("Controller must have a stream when a chunk is enqueued.");

        if stream.is_locked() && stream.get_num_read_requests() > 0 {
            // A reader is waiting: bypass the queue entirely.
            stream.fulfill_read_request(chunk, false, can_gc);
        } else {
            // Clone the callback out of the cell so the borrow is released
            // before the callback is invoked.
            let strategy_size = {
                let reference = self.strategy_size.borrow();
                reference.clone()
            };
            let size = if let Some(strategy_size) = strategy_size {
                // `Rethrow` is requested and the error arm below reads the
                // exception back from the context.
                let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
                match result {
                    Ok(size) => size,
                    Err(error) => {
                        // Fetch the pending exception and error the stream with it.
                        rooted!(in(*cx) let mut rval = UndefinedValue());
                        unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };

                        self.error(rval.handle(), can_gc);

                        return Err(error);
                    },
                }
            } else {
                0.
            };

            {
                let res = self
                    .queue
                    .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
                        value: Heap::boxed(chunk.get()),
                        size,
                    }));
                if let Err(error) = res {
                    // Turn the Rust error into a pending JS exception so it can
                    // be read back as a value for erroring the stream.
                    throw_dom_exception(cx, &self.global(), error, can_gc);

                    rooted!(in(*cx) let mut rval = UndefinedValue());
                    unsafe { assert!(JS_GetPendingException(*cx, rval.handle_mut())) };

                    self.error(rval.handle(), can_gc);

                    return Err(Error::JSFailed);
                }
            }
        }

        self.call_pull_if_needed(can_gc);

        Ok(())
    }
743
744 pub(crate) fn enqueue_native(&self, chunk: Vec<u8>, can_gc: CanGc) {
747 let stream = self
748 .stream
749 .get()
750 .expect("Controller must have a stream when a chunk is enqueued.");
751 if stream.is_locked() && stream.get_num_read_requests() > 0 {
752 let cx = GlobalScope::get_cx();
753 rooted!(in(*cx) let mut rval = UndefinedValue());
754 EnqueuedValue::Native(chunk.into_boxed_slice()).to_jsval(cx, rval.handle_mut(), can_gc);
755 stream.fulfill_read_request(rval.handle(), false, can_gc);
756 } else {
757 self.queue
758 .enqueue_value_with_size(EnqueuedValue::Native(chunk.into_boxed_slice()))
759 .expect("Enqueuing a chunk from Rust should not fail.");
760 }
761 }
762
763 pub(crate) fn in_memory(&self) -> bool {
765 let Some(underlying_source) = self.underlying_source.get() else {
766 return false;
767 };
768 underlying_source.in_memory()
769 }
770
771 pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
773 let underlying_source = self.underlying_source.get()?;
774 if underlying_source.in_memory() {
775 return self.queue.get_in_memory_bytes();
776 }
777 None
778 }
779
780 fn clear_algorithms(&self) {
782 self.underlying_source.set(None);
785
786 *self.strategy_size.borrow_mut() = None;
788 }
789
790 pub(crate) fn close(&self, can_gc: CanGc) {
792 if !self.can_close_or_enqueue() {
794 return;
795 }
796
797 let Some(stream) = self.stream.get() else {
798 return;
799 };
800
801 self.close_requested.set(true);
803
804 if self.queue.is_empty() {
805 self.clear_algorithms();
807
808 stream.close(can_gc);
810 }
811 }
812
813 pub(crate) fn get_desired_size(&self) -> Option<f64> {
815 let stream = self.stream.get()?;
816
817 if stream.is_errored() {
819 return None;
820 }
821
822 if stream.is_closed() {
824 return Some(0.0);
825 }
826
827 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
829 Some(desired_size.clamp(desired_size, self.strategy_hwm))
830 }
831
832 pub(crate) fn can_close_or_enqueue(&self) -> bool {
834 let Some(stream) = self.stream.get() else {
835 return false;
836 };
837
838 if !self.close_requested.get() && stream.is_readable() {
840 return true;
841 }
842
843 false
845 }
846
847 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
849 let Some(stream) = self.stream.get() else {
850 return;
851 };
852
853 if !stream.is_readable() {
855 return;
856 }
857
858 self.queue.reset();
860
861 self.clear_algorithms();
863
864 stream.error(e, can_gc);
865 }
866
867 pub(crate) fn has_backpressure(&self) -> bool {
869 !self.should_call_pull()
872 }
873}
874
875impl ReadableStreamDefaultControllerMethods<crate::DomTypeHolder>
876 for ReadableStreamDefaultController
877{
878 fn GetDesiredSize(&self) -> Option<f64> {
880 self.get_desired_size()
881 }
882
883 fn Close(&self, can_gc: CanGc) -> Fallible<()> {
885 if !self.can_close_or_enqueue() {
886 return Err(Error::Type("Stream cannot be closed.".to_string()));
889 }
890
891 self.close(can_gc);
893
894 Ok(())
895 }
896
897 fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
899 if !self.can_close_or_enqueue() {
901 return Err(Error::Type("Stream cannot be enqueued to.".to_string()));
902 }
903
904 self.enqueue(cx, chunk, can_gc)
906 }
907
908 fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
910 self.error(e, can_gc);
911 Ok(())
912 }
913}