1use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::context::JSContext;
11use js::jsapi::{Heap, IsPromiseObject, JSObject};
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
15use script_bindings::reflector::{Reflector, reflect_dom_object};
16
17use crate::dom::bindings::callback::ExceptionHandling;
18use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
19use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
20 UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
21 UnderlyingSinkWriteCallback,
22};
23use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
25use crate::dom::bindings::reflector::DomGlobal;
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::globalscope::GlobalScope;
28use crate::dom::messageport::MessagePort;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
32use crate::dom::stream::writablestream::WritableStream;
33use crate::dom::types::{AbortController, AbortSignal, TransformStream};
34use crate::realms::enter_auto_realm;
35use crate::script_runtime::CanGc;
36
37impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
38
39#[derive(Clone, JSTraceable, MallocSizeOf)]
42#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
43struct CloseAlgorithmFulfillmentHandler {
44 stream: Dom<WritableStream>,
45}
46
47impl Callback for CloseAlgorithmFulfillmentHandler {
48 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
49 let can_gc = CanGc::from_cx(cx);
50 let stream = self.stream.as_rooted();
51
52 stream.finish_in_flight_close(cx.into(), can_gc);
54 }
55}
56
57impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
58
59#[derive(Clone, JSTraceable, MallocSizeOf)]
62#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
63struct CloseAlgorithmRejectionHandler {
64 stream: Dom<WritableStream>,
65}
66
67impl Callback for CloseAlgorithmRejectionHandler {
68 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
69 let stream = self.stream.as_rooted();
70
71 let global = GlobalScope::from_current_realm(cx);
72
73 stream.finish_in_flight_close_with_error(cx, &global, v);
75 }
76}
77
78impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
79
80#[derive(Clone, JSTraceable, MallocSizeOf)]
83#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
84struct StartAlgorithmFulfillmentHandler {
85 controller: Dom<WritableStreamDefaultController>,
86}
87
88impl Callback for StartAlgorithmFulfillmentHandler {
89 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
92 let controller = self.controller.as_rooted();
93 let stream = controller
94 .stream
95 .get()
96 .expect("Controller should have a stream.");
97
98 assert!(stream.is_erroring() || stream.is_writable());
100
101 controller.started.set(true);
103
104 let global = GlobalScope::from_current_realm(cx);
105
106 controller.advance_queue_if_needed(cx, &global)
108 }
109}
110
111impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
112
113#[derive(Clone, JSTraceable, MallocSizeOf)]
116#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
117struct StartAlgorithmRejectionHandler {
118 controller: Dom<WritableStreamDefaultController>,
119}
120
121impl Callback for StartAlgorithmRejectionHandler {
122 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
125 let controller = self.controller.as_rooted();
126 let stream = controller
127 .stream
128 .get()
129 .expect("Controller should have a stream.");
130
131 assert!(stream.is_erroring() || stream.is_writable());
133
134 controller.started.set(true);
136
137 let global = GlobalScope::from_current_realm(cx);
138
139 stream.deal_with_rejection(cx, &global, v);
141 }
142}
143
144impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
145
146#[derive(JSTraceable, MallocSizeOf)]
149#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
150struct TransferBackPressurePromiseReaction {
151 #[conditional_malloc_size_of]
153 result_promise: Rc<Promise>,
154
155 #[ignore_malloc_size_of = "nested Rc"]
157 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
158
159 #[ignore_malloc_size_of = "mozjs"]
161 chunk: Box<Heap<JSVal>>,
162
163 port: Dom<MessagePort>,
165}
166
167impl Callback for TransferBackPressurePromiseReaction {
168 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
170 let can_gc = CanGc::from_cx(cx);
171 let global = self.result_promise.global();
172 let promise = Promise::new2(cx, &global);
174 *self.backpressure_promise.borrow_mut() = Some(promise);
175
176 rooted!(&in(cx) let mut chunk = UndefinedValue());
178 chunk.set(self.chunk.get());
179 let result = self
180 .port
181 .pack_and_post_message_handling_error(cx, "chunk", chunk.handle());
182
183 if let Err(error) = result {
185 global.disentangle_port(cx, &self.port);
187
188 self.result_promise.reject_error(error, can_gc);
190 } else {
191 self.result_promise.resolve_native(&(), can_gc);
193 }
194 }
195}
196
197impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
198
199#[derive(Clone, JSTraceable, MallocSizeOf)]
202#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
203struct WriteAlgorithmFulfillmentHandler {
204 controller: Dom<WritableStreamDefaultController>,
205}
206
207impl Callback for WriteAlgorithmFulfillmentHandler {
208 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
209 let can_gc = CanGc::from_cx(cx);
210 let controller = self.controller.as_rooted();
211 let stream = controller
212 .stream
213 .get()
214 .expect("Controller should have a stream.");
215
216 stream.finish_in_flight_write(can_gc);
218
219 assert!(stream.is_erroring() || stream.is_writable());
222
223 rooted!(&in(cx) let mut rval = UndefinedValue());
225 controller
226 .queue
227 .dequeue_value(cx.into(), Some(rval.handle_mut()), can_gc);
228
229 let global = GlobalScope::from_current_realm(cx);
230
231 if !stream.close_queued_or_in_flight() && stream.is_writable() {
233 let backpressure = controller.get_backpressure();
235
236 stream.update_backpressure(backpressure, &global, can_gc);
238 }
239
240 controller.advance_queue_if_needed(cx, &global)
242 }
243}
244
245impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
246
247#[derive(Clone, JSTraceable, MallocSizeOf)]
250#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
251struct WriteAlgorithmRejectionHandler {
252 controller: Dom<WritableStreamDefaultController>,
253}
254
255impl Callback for WriteAlgorithmRejectionHandler {
256 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
257 let controller = self.controller.as_rooted();
258 let stream = controller
259 .stream
260 .get()
261 .expect("Controller should have a stream.");
262
263 if stream.is_writable() {
265 controller.clear_algorithms();
267 }
268
269 let global = GlobalScope::from_current_realm(cx);
270
271 stream.finish_in_flight_write_with_error(cx, &global, v);
273 }
274}
275
276#[derive(JSTraceable, PartialEq)]
278#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
279pub enum UnderlyingSinkType {
280 Js {
282 abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
284
285 start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,
286
287 close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,
289
290 write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
292 },
293 Transfer {
296 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
297 port: Dom<MessagePort>,
298 },
299 Transform(Dom<TransformStream>, Rc<Promise>),
301}
302
303impl UnderlyingSinkType {
304 pub(crate) fn new_js(
305 abort: Option<Rc<UnderlyingSinkAbortCallback>>,
306 start: Option<Rc<UnderlyingSinkStartCallback>>,
307 close: Option<Rc<UnderlyingSinkCloseCallback>>,
308 write: Option<Rc<UnderlyingSinkWriteCallback>>,
309 ) -> Self {
310 UnderlyingSinkType::Js {
311 abort: RefCell::new(abort),
312 start: RefCell::new(start),
313 close: RefCell::new(close),
314 write: RefCell::new(write),
315 }
316 }
317}
318
319#[dom_struct]
321pub struct WritableStreamDefaultController {
322 reflector_: Reflector,
323
324 #[ignore_malloc_size_of = "underlying_sink_type"]
327 underlying_sink_type: UnderlyingSinkType,
328
329 #[ignore_malloc_size_of = "mozjs"]
331 underlying_sink_obj: Heap<*mut JSObject>,
332
333 queue: QueueWithSizes,
335
336 started: Cell<bool>,
338
339 strategy_hwm: f64,
341
342 #[ignore_malloc_size_of = "QueuingStrategySize"]
344 strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
345
346 stream: MutNullableDom<WritableStream>,
348
349 abort_controller: Dom<AbortController>,
351}
352
353impl WritableStreamDefaultController {
354 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
356 fn new_inherited(
357 global: &GlobalScope,
358 underlying_sink_type: UnderlyingSinkType,
359 strategy_hwm: f64,
360 strategy_size: Rc<QueuingStrategySize>,
361 can_gc: CanGc,
362 ) -> WritableStreamDefaultController {
363 WritableStreamDefaultController {
364 reflector_: Reflector::new(),
365 underlying_sink_type,
366 queue: Default::default(),
367 stream: Default::default(),
368 underlying_sink_obj: Default::default(),
369 strategy_hwm,
370 strategy_size: RefCell::new(Some(strategy_size)),
371 started: Default::default(),
372 abort_controller: Dom::from_ref(&AbortController::new_with_proto(global, None, can_gc)),
373 }
374 }
375
376 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
377 pub(crate) fn new(
378 global: &GlobalScope,
379 underlying_sink_type: UnderlyingSinkType,
380 strategy_hwm: f64,
381 strategy_size: Rc<QueuingStrategySize>,
382 can_gc: CanGc,
383 ) -> DomRoot<WritableStreamDefaultController> {
384 reflect_dom_object(
385 Box::new(WritableStreamDefaultController::new_inherited(
386 global,
387 underlying_sink_type,
388 strategy_hwm,
389 strategy_size,
390 can_gc,
391 )),
392 global,
393 can_gc,
394 )
395 }
396
397 pub(crate) fn started(&self) -> bool {
398 self.started.get()
399 }
400
401 pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
403 self.underlying_sink_obj.set(*this_object);
404 }
405
406 pub(crate) fn signal_abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
408 self.abort_controller.signal_abort(cx, reason);
409 }
410
411 fn clear_algorithms(&self) {
413 match &self.underlying_sink_type {
414 UnderlyingSinkType::Js {
415 abort,
416 start: _,
417 close,
418 write,
419 } => {
420 write.borrow_mut().take();
422
423 close.borrow_mut().take();
425
426 abort.borrow_mut().take();
428 },
429 UnderlyingSinkType::Transfer {
430 backpressure_promise,
431 ..
432 } => {
433 backpressure_promise.borrow_mut().take();
434 },
435 UnderlyingSinkType::Transform(_, _) => {
436 return;
437 },
438 }
439
440 self.strategy_size.borrow_mut().take();
442 }
443
444 pub(crate) fn setup(
446 &self,
447 cx: &mut JSContext,
448 global: &GlobalScope,
449 stream: &WritableStream,
450 ) -> Result<(), Error> {
451 stream.assert_no_controller();
456
457 self.stream.set(Some(stream));
459
460 stream.set_default_controller(self);
462
463 let backpressure = self.get_backpressure();
483
484 stream.update_backpressure(backpressure, global, CanGc::from_cx(cx));
486
487 let start_promise = self.start_algorithm(cx, global)?;
490
491 let rooted_default_controller = DomRoot::from_ref(self);
492
493 rooted!(&in(cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
495 controller: Dom::from_ref(&rooted_default_controller),
496 }));
497
498 rooted!(&in(cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
500 controller: Dom::from_ref(&rooted_default_controller),
501 }));
502
503 let handler = PromiseNativeHandler::new(
504 global,
505 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
506 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
507 CanGc::from_cx(cx),
508 );
509 let mut realm = enter_auto_realm(cx, global);
510 let cx = &mut realm.current_realm();
511 start_promise.append_native_handler(cx, &handler);
512
513 Ok(())
514 }
515
516 pub(crate) fn close(&self, cx: &mut JSContext, global: &GlobalScope) {
518 self.queue
520 .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
521 .expect("Enqueuing the close sentinel should not fail.");
522 self.advance_queue_if_needed(cx, global);
524 }
525
526 #[expect(unsafe_code)]
527 fn start_algorithm(&self, cx: &mut JSContext, global: &GlobalScope) -> Fallible<Rc<Promise>> {
528 match &self.underlying_sink_type {
529 UnderlyingSinkType::Js {
530 start,
531 abort: _,
532 close: _,
533 write: _,
534 } => {
535 let algo = start.borrow().clone();
536 let start_promise = if let Some(start) = algo {
537 rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
538 rooted!(&in(cx) let mut result: JSVal);
539 rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
540 start.Call_(
541 cx,
542 &this_object.handle(),
543 self,
544 result.handle_mut(),
545 ExceptionHandling::Rethrow,
546 )?;
547 let is_promise = unsafe {
548 if result.is_object() {
549 result_object.set(result.to_object());
550 IsPromiseObject(result_object.handle().into_handle())
551 } else {
552 false
553 }
554 };
555 if is_promise {
556 Promise::new_with_js_promise(result_object.handle(), cx.into())
557 } else {
558 Promise::new_resolved(global, cx.into(), result.get(), CanGc::from_cx(cx))
559 }
560 } else {
561 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
563 };
564
565 Ok(start_promise)
566 },
567 UnderlyingSinkType::Transfer { .. } => {
568 Ok(Promise::new_resolved(
570 global,
571 cx.into(),
572 (),
573 CanGc::from_cx(cx),
574 ))
575 },
576 UnderlyingSinkType::Transform(_, start_promise) => {
577 Ok(start_promise.clone())
579 },
580 }
581 }
582
583 pub(crate) fn abort_steps(
585 &self,
586 cx: &mut JSContext,
587 global: &GlobalScope,
588 reason: SafeHandleValue,
589 ) -> Rc<Promise> {
590 let result = match &self.underlying_sink_type {
591 UnderlyingSinkType::Js {
592 abort,
593 start: _,
594 close: _,
595 write: _,
596 } => {
597 rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
598 let algo = abort.borrow().clone();
599 let result = if let Some(algo) = algo {
601 algo.Call_(
602 cx,
603 &this_object.handle(),
604 Some(reason),
605 ExceptionHandling::Rethrow,
606 )
607 } else {
608 Ok(Promise::new_resolved(
609 global,
610 cx.into(),
611 (),
612 CanGc::from_cx(cx),
613 ))
614 };
615 result.unwrap_or_else(|e| {
616 let promise = Promise::new(global, CanGc::from_cx(cx));
617 promise.reject_error(e, CanGc::from_cx(cx));
618 promise
619 })
620 },
621 UnderlyingSinkType::Transfer { port, .. } => {
622 let result = port.pack_and_post_message_handling_error(cx, "error", reason);
627
628 global.disentangle_port(cx, port);
630
631 let promise = Promise::new(global, CanGc::from_cx(cx));
632
633 if let Err(error) = result {
635 promise.reject_error(error, CanGc::from_cx(cx));
636 } else {
637 promise.resolve_native(&(), CanGc::from_cx(cx));
639 }
640 promise
641 },
642 UnderlyingSinkType::Transform(stream, _) => {
643 stream
645 .transform_stream_default_sink_abort_algorithm(cx, global, reason)
646 .expect("Transform stream default sink abort algorithm should not fail.")
647 },
648 };
649
650 self.clear_algorithms();
652
653 result
654 }
655
656 fn call_write_algorithm(
658 &self,
659 cx: &mut JSContext,
660 chunk: SafeHandleValue,
661 global: &GlobalScope,
662 ) -> Rc<Promise> {
663 match &self.underlying_sink_type {
664 UnderlyingSinkType::Js {
665 abort: _,
666 start: _,
667 close: _,
668 write,
669 } => {
670 rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
671 let algo = write.borrow().clone();
672 let result = if let Some(algo) = algo {
673 algo.Call_(
674 cx,
675 &this_object.handle(),
676 chunk,
677 self,
678 ExceptionHandling::Rethrow,
679 )
680 } else {
681 Ok(Promise::new_resolved(
682 global,
683 cx.into(),
684 (),
685 CanGc::from_cx(cx),
686 ))
687 };
688 result.unwrap_or_else(|e| {
689 let promise = Promise::new2(cx, global);
690 promise.reject_error(e, CanGc::from_cx(cx));
691 promise
692 })
693 },
694 UnderlyingSinkType::Transfer {
695 backpressure_promise,
696 port,
697 } => {
698 if backpressure_promise.borrow().is_none() {
704 let promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
705 *backpressure_promise.borrow_mut() = Some(promise);
706 }
707
708 let result_promise = Promise::new2(cx, global);
710 rooted!(&in(cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
711 port: port.clone(),
712 backpressure_promise: backpressure_promise.clone(),
713 chunk: Heap::boxed(chunk.get()),
714 result_promise: result_promise.clone(),
715 }));
716 let handler = PromiseNativeHandler::new(
717 global,
718 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
719 None,
720 CanGc::from_cx(cx),
721 );
722 let mut realm = enter_auto_realm(cx, global);
723 let realm = &mut realm.current_realm();
724 backpressure_promise
725 .borrow()
726 .as_ref()
727 .expect("Promise must be some by now.")
728 .append_native_handler(realm, &handler);
729 result_promise
730 },
731 UnderlyingSinkType::Transform(stream, _) => {
732 stream
734 .transform_stream_default_sink_write_algorithm(cx, global, chunk)
735 .expect("Transform stream default sink write algorithm should not fail.")
736 },
737 }
738 }
739
740 fn call_close_algorithm(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
742 match &self.underlying_sink_type {
743 UnderlyingSinkType::Js {
744 abort: _,
745 start: _,
746 close,
747 write: _,
748 } => {
749 rooted!(&in(cx) let mut this_object = ptr::null_mut::<JSObject>());
750 this_object.set(self.underlying_sink_obj.get());
751 let algo = close.borrow().clone();
752 let result = if let Some(algo) = algo {
753 algo.Call_(cx, &this_object.handle(), ExceptionHandling::Rethrow)
754 } else {
755 Ok(Promise::new_resolved(
756 global,
757 cx.into(),
758 (),
759 CanGc::from_cx(cx),
760 ))
761 };
762 result.unwrap_or_else(|e| {
763 let promise = Promise::new2(cx, global);
764 promise.reject_error(e, CanGc::from_cx(cx));
765 promise
766 })
767 },
768 UnderlyingSinkType::Transfer { port, .. } => {
769 rooted!(&in(cx) let mut value = UndefinedValue());
774 port.pack_and_post_message(cx, "close", value.handle())
775 .expect("Sending close should not fail.");
776
777 global.disentangle_port(cx, port);
779
780 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
782 },
783 UnderlyingSinkType::Transform(stream, _) => {
784 stream
786 .transform_stream_default_sink_close_algorithm(cx, global)
787 .expect("Transform stream default sink close algorithm should not fail.")
788 },
789 }
790 }
791
792 pub(crate) fn process_close(&self, cx: &mut JSContext, global: &GlobalScope) {
794 let Some(stream) = self.stream.get() else {
796 unreachable!("Controller should have a stream");
797 };
798
799 stream.mark_close_request_in_flight();
801
802 self.queue
804 .dequeue_value(cx.into(), None, CanGc::from_cx(cx));
805
806 assert!(self.queue.is_empty());
808
809 let sink_close_promise = self.call_close_algorithm(cx, global);
811
812 self.clear_algorithms();
814
815 rooted!(&in(cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
817 stream: Dom::from_ref(&stream),
818 }));
819
820 rooted!(&in(cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
822 stream: Dom::from_ref(&stream),
823 }));
824
825 let handler = PromiseNativeHandler::new(
827 global,
828 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
829 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
830 CanGc::from_cx(cx),
831 );
832 let mut realm = enter_auto_realm(cx, global);
833 let realm = &mut realm.current_realm();
834 sink_close_promise.append_native_handler(realm, &handler);
835 }
836
837 fn advance_queue_if_needed(&self, cx: &mut JSContext, global: &GlobalScope) {
839 let Some(stream) = self.stream.get() else {
841 unreachable!("Controller should have a stream");
842 };
843
844 if !self.started.get() {
846 return;
847 }
848
849 if stream.has_in_flight_write_request() {
851 return;
852 }
853
854 assert!(!(stream.is_errored() || stream.is_closed()));
858
859 if stream.is_erroring() {
861 stream.finish_erroring(cx, global);
863
864 return;
866 }
867
868 rooted!(&in(cx) let mut value = UndefinedValue());
870 let is_closed = {
871 if self.queue.is_empty() {
873 return;
874 }
875 self.queue
876 .peek_queue_value(cx.into(), value.handle_mut(), CanGc::from_cx(cx))
877 };
878
879 if is_closed {
880 self.process_close(cx, global);
882 } else {
883 self.process_write(cx, value.handle(), global);
885 };
886 }
887
888 pub(crate) fn perform_error_steps(&self) {
890 self.queue.reset();
892 }
893
894 fn process_write(&self, cx: &mut JSContext, chunk: SafeHandleValue, global: &GlobalScope) {
896 let Some(stream) = self.stream.get() else {
898 unreachable!("Controller should have a stream");
899 };
900
901 stream.mark_first_write_request_in_flight();
903
904 let sink_write_promise = self.call_write_algorithm(cx, chunk, global);
906
907 rooted!(&in(cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
909 controller: Dom::from_ref(self),
910 }));
911
912 rooted!(&in(cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
914 controller: Dom::from_ref(self),
915 }));
916
917 let handler = PromiseNativeHandler::new(
919 global,
920 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
921 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
922 CanGc::from_cx(cx),
923 );
924 let mut realm = enter_auto_realm(cx, global);
925 let realm = &mut realm.current_realm();
926 sink_write_promise.append_native_handler(realm, &handler);
927 }
928
929 pub(crate) fn get_desired_size(&self) -> f64 {
931 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
933 desired_size.clamp(desired_size, self.strategy_hwm)
934 }
935
936 fn get_backpressure(&self) -> bool {
938 let desired_size = self.get_desired_size();
940
941 desired_size == 0.0 || desired_size.is_sign_negative()
943 }
944
945 pub(crate) fn get_chunk_size(
947 &self,
948 cx: &mut JSContext,
949 global: &GlobalScope,
950 chunk: SafeHandleValue,
951 ) -> f64 {
952 let Some(strategy_size) = self.strategy_size.borrow().clone() else {
954 let Some(stream) = self.stream.get() else {
956 unreachable!("Controller should have a stream");
957 };
958 assert!(!stream.is_writable());
959
960 return 1.0;
962 };
963
964 let result = strategy_size.Call__(cx, chunk, ExceptionHandling::Rethrow);
967
968 match result {
969 Ok(size) => size,
971 Err(error) => {
972 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
977 error.to_jsval(
978 cx.into(),
979 global,
980 rooted_error.handle_mut(),
981 CanGc::from_cx(cx),
982 );
983 self.error_if_needed(cx, rooted_error.handle(), global);
984
985 1.0
987 },
988 }
989 }
990
991 pub(crate) fn write(
993 &self,
994 cx: &mut JSContext,
995 global: &GlobalScope,
996 chunk: SafeHandleValue,
997 chunk_size: f64,
998 ) {
999 let enqueue_result = self
1001 .queue
1002 .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
1003 value: Heap::boxed(chunk.get()),
1004 size: chunk_size,
1005 }));
1006
1007 if let Err(error) = enqueue_result {
1009 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1012 error.to_jsval(
1013 cx.into(),
1014 global,
1015 rooted_error.handle_mut(),
1016 CanGc::from_cx(cx),
1017 );
1018 self.error_if_needed(cx, rooted_error.handle(), global);
1019
1020 return;
1022 }
1023
1024 let Some(stream) = self.stream.get() else {
1026 unreachable!("Controller should have a stream");
1027 };
1028
1029 if !stream.close_queued_or_in_flight() && stream.is_writable() {
1031 let backpressure = self.get_backpressure();
1033
1034 stream.update_backpressure(backpressure, global, CanGc::from_cx(cx));
1036 }
1037
1038 self.advance_queue_if_needed(cx, global);
1040 }
1041
1042 pub(crate) fn error_if_needed(
1044 &self,
1045 cx: &mut JSContext,
1046 error: SafeHandleValue,
1047 global: &GlobalScope,
1048 ) {
1049 let Some(stream) = self.stream.get() else {
1051 unreachable!("Controller should have a stream");
1052 };
1053
1054 if stream.is_writable() {
1056 self.error(cx, &stream, error, global);
1058 }
1059 }
1060
1061 fn error(
1063 &self,
1064 cx: &mut JSContext,
1065 stream: &WritableStream,
1066 e: SafeHandleValue,
1067 global: &GlobalScope,
1068 ) {
1069 assert!(stream.is_writable());
1074
1075 self.clear_algorithms();
1077
1078 stream.start_erroring(cx, global, e);
1080 }
1081}
1082
1083impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1084 for WritableStreamDefaultController
1085{
1086 fn Error(&self, cx: &mut CurrentRealm, e: SafeHandleValue) {
1088 let Some(stream) = self.stream.get() else {
1090 unreachable!("Controller should have a stream");
1091 };
1092
1093 if !stream.is_writable() {
1095 return;
1096 }
1097
1098 let global = GlobalScope::from_current_realm(cx);
1099
1100 self.error(cx, &stream, e, &global);
1102 }
1103
1104 fn Signal(&self) -> DomRoot<AbortSignal> {
1106 self.abort_controller.signal()
1108 }
1109}