1use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::context::JSContext;
11use js::jsapi::{Heap, IsPromiseObject, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
15use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
16
17use crate::dom::bindings::callback::ExceptionHandling;
18use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
19use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
20 UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
21 UnderlyingSinkWriteCallback,
22};
23use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
25use crate::dom::bindings::reflector::DomGlobal;
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::globalscope::GlobalScope;
28use crate::dom::messageport::MessagePort;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
32use crate::dom::stream::writablestream::WritableStream;
33use crate::dom::types::{AbortController, AbortSignal, TransformStream};
34use crate::realms::enter_auto_realm;
35
36impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
37
38#[derive(Clone, JSTraceable, MallocSizeOf)]
41#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
42struct CloseAlgorithmFulfillmentHandler {
43 stream: Dom<WritableStream>,
44}
45
46impl Callback for CloseAlgorithmFulfillmentHandler {
47 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
48 let stream = self.stream.as_rooted();
49
50 stream.finish_in_flight_close(cx);
52 }
53}
54
55impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
56
57#[derive(Clone, JSTraceable, MallocSizeOf)]
60#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
61struct CloseAlgorithmRejectionHandler {
62 stream: Dom<WritableStream>,
63}
64
65impl Callback for CloseAlgorithmRejectionHandler {
66 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
67 let stream = self.stream.as_rooted();
68
69 let global = GlobalScope::from_current_realm(cx);
70
71 stream.finish_in_flight_close_with_error(cx, &global, v);
73 }
74}
75
76impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
77
78#[derive(Clone, JSTraceable, MallocSizeOf)]
81#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
82struct StartAlgorithmFulfillmentHandler {
83 controller: Dom<WritableStreamDefaultController>,
84}
85
86impl Callback for StartAlgorithmFulfillmentHandler {
87 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
90 let controller = self.controller.as_rooted();
91 let stream = controller
92 .stream
93 .get()
94 .expect("Controller should have a stream.");
95
96 assert!(stream.is_erroring() || stream.is_writable());
98
99 controller.started.set(true);
101
102 let global = GlobalScope::from_current_realm(cx);
103
104 controller.advance_queue_if_needed(cx, &global)
106 }
107}
108
109impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
110
111#[derive(Clone, JSTraceable, MallocSizeOf)]
114#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
115struct StartAlgorithmRejectionHandler {
116 controller: Dom<WritableStreamDefaultController>,
117}
118
119impl Callback for StartAlgorithmRejectionHandler {
120 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
123 let controller = self.controller.as_rooted();
124 let stream = controller
125 .stream
126 .get()
127 .expect("Controller should have a stream.");
128
129 assert!(stream.is_erroring() || stream.is_writable());
131
132 controller.started.set(true);
134
135 let global = GlobalScope::from_current_realm(cx);
136
137 stream.deal_with_rejection(cx, &global, v);
139 }
140}
141
142impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
143
144#[derive(JSTraceable, MallocSizeOf)]
147#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
148struct TransferBackPressurePromiseReaction {
149 #[conditional_malloc_size_of]
151 result_promise: Rc<Promise>,
152
153 #[ignore_malloc_size_of = "nested Rc"]
155 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
156
157 #[ignore_malloc_size_of = "mozjs"]
159 chunk: Box<Heap<JSVal>>,
160
161 port: Dom<MessagePort>,
163}
164
165impl Callback for TransferBackPressurePromiseReaction {
166 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
168 let global = self.result_promise.global();
169 let promise = Promise::new(cx, &global);
171 *self.backpressure_promise.borrow_mut() = Some(promise);
172
173 rooted!(&in(cx) let mut chunk = UndefinedValue());
175 chunk.set(self.chunk.get());
176 let result = self
177 .port
178 .pack_and_post_message_handling_error(cx, "chunk", chunk.handle());
179
180 if let Err(error) = result {
182 global.disentangle_port(cx, &self.port);
184
185 self.result_promise.reject_error(cx, error);
187 } else {
188 self.result_promise.resolve_native(cx, &());
190 }
191 }
192}
193
194impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
195
196#[derive(Clone, JSTraceable, MallocSizeOf)]
199#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
200struct WriteAlgorithmFulfillmentHandler {
201 controller: Dom<WritableStreamDefaultController>,
202}
203
204impl Callback for WriteAlgorithmFulfillmentHandler {
205 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
206 let controller = self.controller.as_rooted();
207 let stream = controller
208 .stream
209 .get()
210 .expect("Controller should have a stream.");
211
212 stream.finish_in_flight_write(cx);
214
215 assert!(stream.is_erroring() || stream.is_writable());
218
219 rooted!(&in(cx) let mut rval = UndefinedValue());
221 controller.queue.dequeue_value(cx, Some(rval.handle_mut()));
222
223 let global = GlobalScope::from_current_realm(cx);
224
225 if !stream.close_queued_or_in_flight() && stream.is_writable() {
227 let backpressure = controller.get_backpressure();
229
230 stream.update_backpressure(cx, backpressure, &global);
232 }
233
234 controller.advance_queue_if_needed(cx, &global)
236 }
237}
238
239impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
240
241#[derive(Clone, JSTraceable, MallocSizeOf)]
244#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
245struct WriteAlgorithmRejectionHandler {
246 controller: Dom<WritableStreamDefaultController>,
247}
248
249impl Callback for WriteAlgorithmRejectionHandler {
250 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
251 let controller = self.controller.as_rooted();
252 let stream = controller
253 .stream
254 .get()
255 .expect("Controller should have a stream.");
256
257 if stream.is_writable() {
259 controller.clear_algorithms();
261 }
262
263 let global = GlobalScope::from_current_realm(cx);
264
265 stream.finish_in_flight_write_with_error(cx, &global, v);
267 }
268}
269
270#[derive(JSTraceable, PartialEq)]
272#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
273pub enum UnderlyingSinkType {
274 Js {
276 abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
278
279 start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,
280
281 close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,
283
284 write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
286 },
287 Transfer {
290 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
291 port: Dom<MessagePort>,
292 },
293 Transform(Dom<TransformStream>, Rc<Promise>),
295}
296
297impl UnderlyingSinkType {
298 pub(crate) fn new_js(
299 abort: Option<Rc<UnderlyingSinkAbortCallback>>,
300 start: Option<Rc<UnderlyingSinkStartCallback>>,
301 close: Option<Rc<UnderlyingSinkCloseCallback>>,
302 write: Option<Rc<UnderlyingSinkWriteCallback>>,
303 ) -> Self {
304 UnderlyingSinkType::Js {
305 abort: RefCell::new(abort),
306 start: RefCell::new(start),
307 close: RefCell::new(close),
308 write: RefCell::new(write),
309 }
310 }
311}
312
313#[dom_struct]
315pub struct WritableStreamDefaultController {
316 reflector_: Reflector,
317
318 #[ignore_malloc_size_of = "underlying_sink_type"]
321 underlying_sink_type: UnderlyingSinkType,
322
323 #[ignore_malloc_size_of = "mozjs"]
325 underlying_sink_obj: Heap<*mut JSObject>,
326
327 queue: QueueWithSizes,
329
330 started: Cell<bool>,
332
333 strategy_hwm: f64,
335
336 #[ignore_malloc_size_of = "QueuingStrategySize"]
338 strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
339
340 stream: MutNullableDom<WritableStream>,
342
343 abort_controller: Dom<AbortController>,
345}
346
347impl WritableStreamDefaultController {
348 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
350 fn new_inherited(
351 cx: &mut JSContext,
352 global: &GlobalScope,
353 underlying_sink_type: UnderlyingSinkType,
354 strategy_hwm: f64,
355 strategy_size: Rc<QueuingStrategySize>,
356 ) -> WritableStreamDefaultController {
357 WritableStreamDefaultController {
358 reflector_: Reflector::new(),
359 underlying_sink_type,
360 queue: Default::default(),
361 stream: Default::default(),
362 underlying_sink_obj: Default::default(),
363 strategy_hwm,
364 strategy_size: RefCell::new(Some(strategy_size)),
365 started: Default::default(),
366 abort_controller: Dom::from_ref(&AbortController::new_with_proto(cx, global, None)),
367 }
368 }
369
370 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
371 pub(crate) fn new(
372 cx: &mut JSContext,
373 global: &GlobalScope,
374 underlying_sink_type: UnderlyingSinkType,
375 strategy_hwm: f64,
376 strategy_size: Rc<QueuingStrategySize>,
377 ) -> DomRoot<WritableStreamDefaultController> {
378 reflect_dom_object_with_cx(
379 Box::new(WritableStreamDefaultController::new_inherited(
380 cx,
381 global,
382 underlying_sink_type,
383 strategy_hwm,
384 strategy_size,
385 )),
386 global,
387 cx,
388 )
389 }
390
391 pub(crate) fn started(&self) -> bool {
392 self.started.get()
393 }
394
395 pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
397 self.underlying_sink_obj.set(*this_object);
398 }
399
400 pub(crate) fn signal_abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
402 self.abort_controller.signal_abort(cx, reason);
403 }
404
405 fn clear_algorithms(&self) {
407 match &self.underlying_sink_type {
408 UnderlyingSinkType::Js {
409 abort,
410 start: _,
411 close,
412 write,
413 } => {
414 write.borrow_mut().take();
416
417 close.borrow_mut().take();
419
420 abort.borrow_mut().take();
422 },
423 UnderlyingSinkType::Transfer {
424 backpressure_promise,
425 ..
426 } => {
427 backpressure_promise.borrow_mut().take();
428 },
429 UnderlyingSinkType::Transform(_, _) => {
430 return;
431 },
432 }
433
434 self.strategy_size.borrow_mut().take();
436 }
437
438 pub(crate) fn setup(
440 &self,
441 cx: &mut JSContext,
442 global: &GlobalScope,
443 stream: &WritableStream,
444 ) -> Result<(), Error> {
445 stream.assert_no_controller();
450
451 self.stream.set(Some(stream));
453
454 stream.set_default_controller(self);
456
457 let backpressure = self.get_backpressure();
477
478 stream.update_backpressure(cx, backpressure, global);
480
481 let start_promise = self.start_algorithm(cx, global)?;
484
485 let rooted_default_controller = DomRoot::from_ref(self);
486
487 rooted!(&in(cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
489 controller: Dom::from_ref(&rooted_default_controller),
490 }));
491
492 rooted!(&in(cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
494 controller: Dom::from_ref(&rooted_default_controller),
495 }));
496
497 let handler = PromiseNativeHandler::new(
498 cx,
499 global,
500 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
501 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
502 );
503 let mut realm = enter_auto_realm(cx, global);
504 let cx = &mut realm.current_realm();
505 start_promise.append_native_handler(cx, &handler);
506
507 Ok(())
508 }
509
510 pub(crate) fn close(&self, cx: &mut JSContext, global: &GlobalScope) {
512 self.queue
514 .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
515 .expect("Enqueuing the close sentinel should not fail.");
516 self.advance_queue_if_needed(cx, global);
518 }
519
520 #[expect(unsafe_code)]
521 fn start_algorithm(&self, cx: &mut JSContext, global: &GlobalScope) -> Fallible<Rc<Promise>> {
522 match &self.underlying_sink_type {
523 UnderlyingSinkType::Js {
524 start,
525 abort: _,
526 close: _,
527 write: _,
528 } => {
529 let algo = start.borrow().clone();
530 let start_promise = if let Some(start) = algo {
531 rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
532 rooted!(&in(cx) let mut result: JSVal);
533 rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
534 start.Call_(
535 cx,
536 &this_object.handle(),
537 self,
538 result.handle_mut(),
539 ExceptionHandling::Rethrow,
540 )?;
541 let is_promise = unsafe {
542 if result.is_object() {
543 result_object.set(result.to_object());
544 IsPromiseObject(result_object.handle().into_handle())
545 } else {
546 false
547 }
548 };
549 if is_promise {
550 Promise::new_with_js_promise(cx, result_object.handle())
551 } else {
552 Promise::new_resolved(cx, global, result.get())
553 }
554 } else {
555 Promise::new_resolved(cx, global, ())
557 };
558
559 Ok(start_promise)
560 },
561 UnderlyingSinkType::Transfer { .. } => {
562 Ok(Promise::new_resolved(cx, global, ()))
564 },
565 UnderlyingSinkType::Transform(_, start_promise) => {
566 Ok(start_promise.clone())
568 },
569 }
570 }
571
572 pub(crate) fn abort_steps(
574 &self,
575 cx: &mut JSContext,
576 global: &GlobalScope,
577 reason: SafeHandleValue,
578 ) -> Rc<Promise> {
579 let result = match &self.underlying_sink_type {
580 UnderlyingSinkType::Js {
581 abort,
582 start: _,
583 close: _,
584 write: _,
585 } => {
586 rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
587 let algo = abort.borrow().clone();
588 let result = if let Some(algo) = algo {
590 algo.Call_(
591 cx,
592 &this_object.handle(),
593 Some(reason),
594 ExceptionHandling::Rethrow,
595 )
596 } else {
597 Ok(Promise::new_resolved(cx, global, ()))
598 };
599 result.unwrap_or_else(|e| {
600 let promise = Promise::new(cx, global);
601 promise.reject_error(cx, e);
602 promise
603 })
604 },
605 UnderlyingSinkType::Transfer { port, .. } => {
606 let result = port.pack_and_post_message_handling_error(cx, "error", reason);
611
612 global.disentangle_port(cx, port);
614
615 let promise = Promise::new(cx, global);
616
617 if let Err(error) = result {
619 promise.reject_error(cx, error);
620 } else {
621 promise.resolve_native(cx, &());
623 }
624 promise
625 },
626 UnderlyingSinkType::Transform(stream, _) => {
627 stream
629 .transform_stream_default_sink_abort_algorithm(cx, global, reason)
630 .expect("Transform stream default sink abort algorithm should not fail.")
631 },
632 };
633
634 self.clear_algorithms();
636
637 result
638 }
639
640 fn call_write_algorithm(
642 &self,
643 cx: &mut JSContext,
644 chunk: SafeHandleValue,
645 global: &GlobalScope,
646 ) -> Rc<Promise> {
647 match &self.underlying_sink_type {
648 UnderlyingSinkType::Js {
649 abort: _,
650 start: _,
651 close: _,
652 write,
653 } => {
654 rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
655 let algo = write.borrow().clone();
656 let result = if let Some(algo) = algo {
657 algo.Call_(
658 cx,
659 &this_object.handle(),
660 chunk,
661 self,
662 ExceptionHandling::Rethrow,
663 )
664 } else {
665 Ok(Promise::new_resolved(cx, global, ()))
666 };
667 result.unwrap_or_else(|e| {
668 let promise = Promise::new(cx, global);
669 promise.reject_error(cx, e);
670 promise
671 })
672 },
673 UnderlyingSinkType::Transfer {
674 backpressure_promise,
675 port,
676 } => {
677 if backpressure_promise.borrow().is_none() {
683 let promise = Promise::new_resolved(cx, global, ());
684 *backpressure_promise.borrow_mut() = Some(promise);
685 }
686
687 let result_promise = Promise::new(cx, global);
689 rooted!(&in(cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
690 port: port.clone(),
691 backpressure_promise: backpressure_promise.clone(),
692 chunk: Heap::boxed(chunk.get()),
693 result_promise: result_promise.clone(),
694 }));
695 let handler = PromiseNativeHandler::new(
696 cx,
697 global,
698 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
699 None,
700 );
701 let mut realm = enter_auto_realm(cx, global);
702 let realm = &mut realm.current_realm();
703 backpressure_promise
704 .borrow()
705 .as_ref()
706 .expect("Promise must be some by now.")
707 .append_native_handler(realm, &handler);
708 result_promise
709 },
710 UnderlyingSinkType::Transform(stream, _) => {
711 stream
713 .transform_stream_default_sink_write_algorithm(cx, global, chunk)
714 .expect("Transform stream default sink write algorithm should not fail.")
715 },
716 }
717 }
718
719 fn call_close_algorithm(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
721 match &self.underlying_sink_type {
722 UnderlyingSinkType::Js {
723 abort: _,
724 start: _,
725 close,
726 write: _,
727 } => {
728 rooted!(&in(cx) let mut this_object = ptr::null_mut::<JSObject>());
729 this_object.set(self.underlying_sink_obj.get());
730 let algo = close.borrow().clone();
731 let result = if let Some(algo) = algo {
732 algo.Call_(cx, &this_object.handle(), ExceptionHandling::Rethrow)
733 } else {
734 Ok(Promise::new_resolved(cx, global, ()))
735 };
736 result.unwrap_or_else(|e| {
737 let promise = Promise::new(cx, global);
738 promise.reject_error(cx, e);
739 promise
740 })
741 },
742 UnderlyingSinkType::Transfer { port, .. } => {
743 rooted!(&in(cx) let mut value = UndefinedValue());
748 port.pack_and_post_message(cx, "close", value.handle())
749 .expect("Sending close should not fail.");
750
751 global.disentangle_port(cx, port);
753
754 Promise::new_resolved(cx, global, ())
756 },
757 UnderlyingSinkType::Transform(stream, _) => {
758 stream
760 .transform_stream_default_sink_close_algorithm(cx, global)
761 .expect("Transform stream default sink close algorithm should not fail.")
762 },
763 }
764 }
765
766 pub(crate) fn process_close(&self, cx: &mut JSContext, global: &GlobalScope) {
768 let Some(stream) = self.stream.get() else {
770 unreachable!("Controller should have a stream");
771 };
772
773 stream.mark_close_request_in_flight();
775
776 self.queue.dequeue_value(cx, None);
778
779 assert!(self.queue.is_empty());
781
782 let sink_close_promise = self.call_close_algorithm(cx, global);
784
785 self.clear_algorithms();
787
788 rooted!(&in(cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
790 stream: Dom::from_ref(&stream),
791 }));
792
793 rooted!(&in(cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
795 stream: Dom::from_ref(&stream),
796 }));
797
798 let handler = PromiseNativeHandler::new(
800 cx,
801 global,
802 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
803 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
804 );
805 let mut realm = enter_auto_realm(cx, global);
806 let realm = &mut realm.current_realm();
807 sink_close_promise.append_native_handler(realm, &handler);
808 }
809
810 fn advance_queue_if_needed(&self, cx: &mut JSContext, global: &GlobalScope) {
812 let Some(stream) = self.stream.get() else {
814 unreachable!("Controller should have a stream");
815 };
816
817 if !self.started.get() {
819 return;
820 }
821
822 if stream.has_in_flight_write_request() {
824 return;
825 }
826
827 assert!(!(stream.is_errored() || stream.is_closed()));
831
832 if stream.is_erroring() {
834 stream.finish_erroring(cx, global);
836
837 return;
839 }
840
841 rooted!(&in(cx) let mut value = UndefinedValue());
843 let is_closed = {
844 if self.queue.is_empty() {
846 return;
847 }
848 self.queue.peek_queue_value(cx, value.handle_mut())
849 };
850
851 if is_closed {
852 self.process_close(cx, global);
854 } else {
855 self.process_write(cx, value.handle(), global);
857 };
858 }
859
860 pub(crate) fn perform_error_steps(&self) {
862 self.queue.reset();
864 }
865
866 fn process_write(&self, cx: &mut JSContext, chunk: SafeHandleValue, global: &GlobalScope) {
868 let Some(stream) = self.stream.get() else {
870 unreachable!("Controller should have a stream");
871 };
872
873 stream.mark_first_write_request_in_flight();
875
876 let sink_write_promise = self.call_write_algorithm(cx, chunk, global);
878
879 rooted!(&in(cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
881 controller: Dom::from_ref(self),
882 }));
883
884 rooted!(&in(cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
886 controller: Dom::from_ref(self),
887 }));
888
889 let handler = PromiseNativeHandler::new(
891 cx,
892 global,
893 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
894 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
895 );
896 let mut realm = enter_auto_realm(cx, global);
897 let realm = &mut realm.current_realm();
898 sink_write_promise.append_native_handler(realm, &handler);
899 }
900
901 pub(crate) fn get_desired_size(&self) -> f64 {
903 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
905 desired_size.clamp(desired_size, self.strategy_hwm)
906 }
907
908 fn get_backpressure(&self) -> bool {
910 let desired_size = self.get_desired_size();
912
913 desired_size == 0.0 || desired_size.is_sign_negative()
915 }
916
917 pub(crate) fn get_chunk_size(
919 &self,
920 cx: &mut JSContext,
921 global: &GlobalScope,
922 chunk: SafeHandleValue,
923 ) -> f64 {
924 let Some(strategy_size) = self.strategy_size.borrow().clone() else {
926 let Some(stream) = self.stream.get() else {
928 unreachable!("Controller should have a stream");
929 };
930 assert!(!stream.is_writable());
931
932 return 1.0;
934 };
935
936 let result = strategy_size.Call__(cx, chunk, ExceptionHandling::Rethrow);
939
940 match result {
941 Ok(size) => size,
943 Err(error) => {
944 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
949 error.to_jsval(cx, global, rooted_error.handle_mut());
950 self.error_if_needed(cx, rooted_error.handle(), global);
951
952 1.0
954 },
955 }
956 }
957
958 pub(crate) fn write(
960 &self,
961 cx: &mut JSContext,
962 global: &GlobalScope,
963 chunk: SafeHandleValue,
964 chunk_size: f64,
965 ) {
966 let enqueue_result = self
968 .queue
969 .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
970 value: Heap::boxed(chunk.get()),
971 size: chunk_size,
972 }));
973
974 if let Err(error) = enqueue_result {
976 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
979 error.to_jsval(cx, global, rooted_error.handle_mut());
980 self.error_if_needed(cx, rooted_error.handle(), global);
981
982 return;
984 }
985
986 let Some(stream) = self.stream.get() else {
988 unreachable!("Controller should have a stream");
989 };
990
991 if !stream.close_queued_or_in_flight() && stream.is_writable() {
993 let backpressure = self.get_backpressure();
995
996 stream.update_backpressure(cx, backpressure, global);
998 }
999
1000 self.advance_queue_if_needed(cx, global);
1002 }
1003
1004 pub(crate) fn error_if_needed(
1006 &self,
1007 cx: &mut JSContext,
1008 error: SafeHandleValue,
1009 global: &GlobalScope,
1010 ) {
1011 let Some(stream) = self.stream.get() else {
1013 unreachable!("Controller should have a stream");
1014 };
1015
1016 if stream.is_writable() {
1018 self.error(cx, &stream, error, global);
1020 }
1021 }
1022
1023 fn error(
1025 &self,
1026 cx: &mut JSContext,
1027 stream: &WritableStream,
1028 e: SafeHandleValue,
1029 global: &GlobalScope,
1030 ) {
1031 assert!(stream.is_writable());
1036
1037 self.clear_algorithms();
1039
1040 stream.start_erroring(cx, global, e);
1042 }
1043}
1044
1045impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1046 for WritableStreamDefaultController
1047{
1048 fn Error(&self, cx: &mut CurrentRealm, e: SafeHandleValue) {
1050 let Some(stream) = self.stream.get() else {
1052 unreachable!("Controller should have a stream");
1053 };
1054
1055 if !stream.is_writable() {
1057 return;
1058 }
1059
1060 let global = GlobalScope::from_current_realm(cx);
1061
1062 self.error(cx, &stream, e, &global);
1064 }
1065
1066 fn Signal(&self) -> DomRoot<AbortSignal> {
1068 self.abort_controller.signal()
1070 }
1071}