1use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, UndefinedValue};
12use js::realm::CurrentRealm;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
14
15use crate::dom::bindings::callback::ExceptionHandling;
16use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
17use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
18 UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
19 UnderlyingSinkWriteCallback,
20};
21use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
24use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
25use crate::dom::globalscope::GlobalScope;
26use crate::dom::messageport::MessagePort;
27use crate::dom::promise::Promise;
28use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
29use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
30use crate::dom::stream::writablestream::WritableStream;
31use crate::dom::types::{AbortController, AbortSignal, TransformStream};
32use crate::realms::{InRealm, enter_realm};
33use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
34
35impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
36
37#[derive(Clone, JSTraceable, MallocSizeOf)]
40#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
41struct CloseAlgorithmFulfillmentHandler {
42 stream: Dom<WritableStream>,
43}
44
45impl Callback for CloseAlgorithmFulfillmentHandler {
46 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
47 let can_gc = CanGc::from_cx(cx);
48 let stream = self.stream.as_rooted();
49
50 stream.finish_in_flight_close(cx.into(), can_gc);
52 }
53}
54
55impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
56
57#[derive(Clone, JSTraceable, MallocSizeOf)]
60#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
61struct CloseAlgorithmRejectionHandler {
62 stream: Dom<WritableStream>,
63}
64
65impl Callback for CloseAlgorithmRejectionHandler {
66 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
67 let stream = self.stream.as_rooted();
68
69 let global = GlobalScope::from_current_realm(cx);
70
71 stream.finish_in_flight_close_with_error(cx.into(), &global, v, CanGc::from_cx(cx));
73 }
74}
75
76impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
77
78#[derive(Clone, JSTraceable, MallocSizeOf)]
81#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
82struct StartAlgorithmFulfillmentHandler {
83 controller: Dom<WritableStreamDefaultController>,
84}
85
86impl Callback for StartAlgorithmFulfillmentHandler {
87 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
90 let controller = self.controller.as_rooted();
91 let stream = controller
92 .stream
93 .get()
94 .expect("Controller should have a stream.");
95
96 assert!(stream.is_erroring() || stream.is_writable());
98
99 controller.started.set(true);
101
102 let global = GlobalScope::from_current_realm(cx);
103
104 controller.advance_queue_if_needed(cx.into(), &global, CanGc::from_cx(cx))
106 }
107}
108
109impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
110
111#[derive(Clone, JSTraceable, MallocSizeOf)]
114#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
115struct StartAlgorithmRejectionHandler {
116 controller: Dom<WritableStreamDefaultController>,
117}
118
119impl Callback for StartAlgorithmRejectionHandler {
120 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
123 let controller = self.controller.as_rooted();
124 let stream = controller
125 .stream
126 .get()
127 .expect("Controller should have a stream.");
128
129 assert!(stream.is_erroring() || stream.is_writable());
131
132 controller.started.set(true);
134
135 let global = GlobalScope::from_current_realm(cx);
136
137 stream.deal_with_rejection(cx.into(), &global, v, CanGc::from_cx(cx));
139 }
140}
141
142impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
143
144#[derive(JSTraceable, MallocSizeOf)]
147#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
148struct TransferBackPressurePromiseReaction {
149 #[conditional_malloc_size_of]
151 result_promise: Rc<Promise>,
152
153 #[ignore_malloc_size_of = "nested Rc"]
155 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
156
157 #[ignore_malloc_size_of = "mozjs"]
159 chunk: Box<Heap<JSVal>>,
160
161 port: Dom<MessagePort>,
163}
164
165impl Callback for TransferBackPressurePromiseReaction {
166 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
168 let can_gc = CanGc::from_cx(cx);
169 let global = self.result_promise.global();
170 let promise = Promise::new2(cx, &global);
172 *self.backpressure_promise.borrow_mut() = Some(promise);
173
174 rooted!(&in(cx) let mut chunk = UndefinedValue());
176 chunk.set(self.chunk.get());
177 let result =
178 self.port
179 .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
180
181 if let Err(error) = result {
183 global.disentangle_port(&self.port, can_gc);
185
186 self.result_promise.reject_error(error, can_gc);
188 } else {
189 self.result_promise.resolve_native(&(), can_gc);
191 }
192 }
193}
194
195impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
196
197#[derive(Clone, JSTraceable, MallocSizeOf)]
200#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
201struct WriteAlgorithmFulfillmentHandler {
202 controller: Dom<WritableStreamDefaultController>,
203}
204
205impl Callback for WriteAlgorithmFulfillmentHandler {
206 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
207 let can_gc = CanGc::from_cx(cx);
208 let controller = self.controller.as_rooted();
209 let stream = controller
210 .stream
211 .get()
212 .expect("Controller should have a stream.");
213
214 stream.finish_in_flight_write(can_gc);
216
217 assert!(stream.is_erroring() || stream.is_writable());
220
221 rooted!(&in(cx) let mut rval = UndefinedValue());
223 controller
224 .queue
225 .dequeue_value(cx.into(), Some(rval.handle_mut()), can_gc);
226
227 let global = GlobalScope::from_current_realm(cx);
228
229 if !stream.close_queued_or_in_flight() && stream.is_writable() {
231 let backpressure = controller.get_backpressure();
233
234 stream.update_backpressure(backpressure, &global, can_gc);
236 }
237
238 controller.advance_queue_if_needed(cx.into(), &global, can_gc)
240 }
241}
242
243impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
244
245#[derive(Clone, JSTraceable, MallocSizeOf)]
248#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
249struct WriteAlgorithmRejectionHandler {
250 controller: Dom<WritableStreamDefaultController>,
251}
252
253impl Callback for WriteAlgorithmRejectionHandler {
254 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
255 let controller = self.controller.as_rooted();
256 let stream = controller
257 .stream
258 .get()
259 .expect("Controller should have a stream.");
260
261 if stream.is_writable() {
263 controller.clear_algorithms();
265 }
266
267 let global = GlobalScope::from_current_realm(cx);
268
269 stream.finish_in_flight_write_with_error(cx.into(), &global, v, CanGc::from_cx(cx));
271 }
272}
273
274#[derive(JSTraceable, PartialEq)]
276#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
277pub enum UnderlyingSinkType {
278 Js {
280 abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
282
283 start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,
284
285 close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,
287
288 write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
290 },
291 Transfer {
294 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
295 port: Dom<MessagePort>,
296 },
297 Transform(Dom<TransformStream>, Rc<Promise>),
299}
300
301impl UnderlyingSinkType {
302 pub(crate) fn new_js(
303 abort: Option<Rc<UnderlyingSinkAbortCallback>>,
304 start: Option<Rc<UnderlyingSinkStartCallback>>,
305 close: Option<Rc<UnderlyingSinkCloseCallback>>,
306 write: Option<Rc<UnderlyingSinkWriteCallback>>,
307 ) -> Self {
308 UnderlyingSinkType::Js {
309 abort: RefCell::new(abort),
310 start: RefCell::new(start),
311 close: RefCell::new(close),
312 write: RefCell::new(write),
313 }
314 }
315}
316
317#[dom_struct]
319pub struct WritableStreamDefaultController {
320 reflector_: Reflector,
321
322 #[ignore_malloc_size_of = "underlying_sink_type"]
325 underlying_sink_type: UnderlyingSinkType,
326
327 #[ignore_malloc_size_of = "mozjs"]
329 underlying_sink_obj: Heap<*mut JSObject>,
330
331 queue: QueueWithSizes,
333
334 started: Cell<bool>,
336
337 strategy_hwm: f64,
339
340 #[ignore_malloc_size_of = "QueuingStrategySize"]
342 strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
343
344 stream: MutNullableDom<WritableStream>,
346
347 abort_controller: Dom<AbortController>,
349}
350
351impl WritableStreamDefaultController {
352 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
354 fn new_inherited(
355 global: &GlobalScope,
356 underlying_sink_type: UnderlyingSinkType,
357 strategy_hwm: f64,
358 strategy_size: Rc<QueuingStrategySize>,
359 can_gc: CanGc,
360 ) -> WritableStreamDefaultController {
361 WritableStreamDefaultController {
362 reflector_: Reflector::new(),
363 underlying_sink_type,
364 queue: Default::default(),
365 stream: Default::default(),
366 underlying_sink_obj: Default::default(),
367 strategy_hwm,
368 strategy_size: RefCell::new(Some(strategy_size)),
369 started: Default::default(),
370 abort_controller: Dom::from_ref(&AbortController::new_with_proto(global, None, can_gc)),
371 }
372 }
373
374 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
375 pub(crate) fn new(
376 global: &GlobalScope,
377 underlying_sink_type: UnderlyingSinkType,
378 strategy_hwm: f64,
379 strategy_size: Rc<QueuingStrategySize>,
380 can_gc: CanGc,
381 ) -> DomRoot<WritableStreamDefaultController> {
382 reflect_dom_object(
383 Box::new(WritableStreamDefaultController::new_inherited(
384 global,
385 underlying_sink_type,
386 strategy_hwm,
387 strategy_size,
388 can_gc,
389 )),
390 global,
391 can_gc,
392 )
393 }
394
395 pub(crate) fn started(&self) -> bool {
396 self.started.get()
397 }
398
399 pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
401 self.underlying_sink_obj.set(*this_object);
402 }
403
404 pub(crate) fn signal_abort(
406 &self,
407 cx: SafeJSContext,
408 reason: SafeHandleValue,
409 realm: InRealm,
410 can_gc: CanGc,
411 ) {
412 self.abort_controller
413 .signal_abort(cx, reason, realm, can_gc);
414 }
415
416 fn clear_algorithms(&self) {
418 match &self.underlying_sink_type {
419 UnderlyingSinkType::Js {
420 abort,
421 start: _,
422 close,
423 write,
424 } => {
425 write.borrow_mut().take();
427
428 close.borrow_mut().take();
430
431 abort.borrow_mut().take();
433 },
434 UnderlyingSinkType::Transfer {
435 backpressure_promise,
436 ..
437 } => {
438 backpressure_promise.borrow_mut().take();
439 },
440 UnderlyingSinkType::Transform(_, _) => {
441 return;
442 },
443 }
444
445 self.strategy_size.borrow_mut().take();
447 }
448
449 pub(crate) fn setup(
451 &self,
452 cx: SafeJSContext,
453 global: &GlobalScope,
454 stream: &WritableStream,
455 can_gc: CanGc,
456 ) -> Result<(), Error> {
457 stream.assert_no_controller();
462
463 self.stream.set(Some(stream));
465
466 stream.set_default_controller(self);
468
469 let backpressure = self.get_backpressure();
489
490 stream.update_backpressure(backpressure, global, can_gc);
492
493 let start_promise = self.start_algorithm(cx, global, can_gc)?;
496
497 let rooted_default_controller = DomRoot::from_ref(self);
498
499 rooted!(in(*cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
501 controller: Dom::from_ref(&rooted_default_controller),
502 }));
503
504 rooted!(in(*cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
506 controller: Dom::from_ref(&rooted_default_controller),
507 }));
508
509 let handler = PromiseNativeHandler::new(
510 global,
511 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
512 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
513 can_gc,
514 );
515 let realm = enter_realm(global);
516 let comp = InRealm::Entered(&realm);
517 start_promise.append_native_handler(&handler, comp, can_gc);
518
519 Ok(())
520 }
521
522 pub(crate) fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
524 self.queue
526 .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
527 .expect("Enqueuing the close sentinel should not fail.");
528 self.advance_queue_if_needed(cx, global, can_gc);
530 }
531
532 #[expect(unsafe_code)]
533 fn start_algorithm(
534 &self,
535 cx: SafeJSContext,
536 global: &GlobalScope,
537 can_gc: CanGc,
538 ) -> Fallible<Rc<Promise>> {
539 match &self.underlying_sink_type {
540 UnderlyingSinkType::Js {
541 start,
542 abort: _,
543 close: _,
544 write: _,
545 } => {
546 let algo = start.borrow().clone();
547 let start_promise = if let Some(start) = algo {
548 rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
549 rooted!(in(*cx) let mut result: JSVal);
550 rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
551 start.Call_(
552 &this_object.handle(),
553 self,
554 result.handle_mut(),
555 ExceptionHandling::Rethrow,
556 can_gc,
557 )?;
558 let is_promise = unsafe {
559 if result.is_object() {
560 result_object.set(result.to_object());
561 IsPromiseObject(result_object.handle().into_handle())
562 } else {
563 false
564 }
565 };
566 if is_promise {
567 Promise::new_with_js_promise(result_object.handle(), cx)
568 } else {
569 Promise::new_resolved(global, cx, result.get(), can_gc)
570 }
571 } else {
572 Promise::new_resolved(global, cx, (), can_gc)
574 };
575
576 Ok(start_promise)
577 },
578 UnderlyingSinkType::Transfer { .. } => {
579 Ok(Promise::new_resolved(global, cx, (), can_gc))
581 },
582 UnderlyingSinkType::Transform(_, start_promise) => {
583 Ok(start_promise.clone())
585 },
586 }
587 }
588
589 pub(crate) fn abort_steps(
591 &self,
592 cx: SafeJSContext,
593 global: &GlobalScope,
594 reason: SafeHandleValue,
595 can_gc: CanGc,
596 ) -> Rc<Promise> {
597 let result = match &self.underlying_sink_type {
598 UnderlyingSinkType::Js {
599 abort,
600 start: _,
601 close: _,
602 write: _,
603 } => {
604 rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
605 let algo = abort.borrow().clone();
606 let result = if let Some(algo) = algo {
608 algo.Call_(
609 &this_object.handle(),
610 Some(reason),
611 ExceptionHandling::Rethrow,
612 can_gc,
613 )
614 } else {
615 Ok(Promise::new_resolved(global, cx, (), can_gc))
616 };
617 result.unwrap_or_else(|e| {
618 let promise = Promise::new(global, can_gc);
619 promise.reject_error(e, can_gc);
620 promise
621 })
622 },
623 UnderlyingSinkType::Transfer { port, .. } => {
624 let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
629
630 global.disentangle_port(port, can_gc);
632
633 let promise = Promise::new(global, can_gc);
634
635 if let Err(error) = result {
637 promise.reject_error(error, can_gc);
638 } else {
639 promise.resolve_native(&(), can_gc);
641 }
642 promise
643 },
644 UnderlyingSinkType::Transform(stream, _) => {
645 stream
647 .transform_stream_default_sink_abort_algorithm(cx, global, reason, can_gc)
648 .expect("Transform stream default sink abort algorithm should not fail.")
649 },
650 };
651
652 self.clear_algorithms();
654
655 result
656 }
657
658 fn call_write_algorithm(
660 &self,
661 cx: SafeJSContext,
662 chunk: SafeHandleValue,
663 global: &GlobalScope,
664 can_gc: CanGc,
665 ) -> Rc<Promise> {
666 match &self.underlying_sink_type {
667 UnderlyingSinkType::Js {
668 abort: _,
669 start: _,
670 close: _,
671 write,
672 } => {
673 rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
674 let algo = write.borrow().clone();
675 let result = if let Some(algo) = algo {
676 algo.Call_(
677 &this_object.handle(),
678 chunk,
679 self,
680 ExceptionHandling::Rethrow,
681 can_gc,
682 )
683 } else {
684 Ok(Promise::new_resolved(global, cx, (), can_gc))
685 };
686 result.unwrap_or_else(|e| {
687 let promise = Promise::new(global, can_gc);
688 promise.reject_error(e, can_gc);
689 promise
690 })
691 },
692 UnderlyingSinkType::Transfer {
693 backpressure_promise,
694 port,
695 } => {
696 if backpressure_promise.borrow().is_none() {
702 let promise = Promise::new_resolved(global, cx, (), can_gc);
703 *backpressure_promise.borrow_mut() = Some(promise);
704 }
705
706 let result_promise = Promise::new(global, can_gc);
708 rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
709 port: port.clone(),
710 backpressure_promise: backpressure_promise.clone(),
711 chunk: Heap::boxed(chunk.get()),
712 result_promise: result_promise.clone(),
713 }));
714 let handler = PromiseNativeHandler::new(
715 global,
716 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
717 None,
718 can_gc,
719 );
720 let realm = enter_realm(global);
721 let comp = InRealm::Entered(&realm);
722 backpressure_promise
723 .borrow()
724 .as_ref()
725 .expect("Promise must be some by now.")
726 .append_native_handler(&handler, comp, can_gc);
727 result_promise
728 },
729 UnderlyingSinkType::Transform(stream, _) => {
730 stream
732 .transform_stream_default_sink_write_algorithm(cx, global, chunk, can_gc)
733 .expect("Transform stream default sink write algorithm should not fail.")
734 },
735 }
736 }
737
738 fn call_close_algorithm(
740 &self,
741 cx: SafeJSContext,
742 global: &GlobalScope,
743 can_gc: CanGc,
744 ) -> Rc<Promise> {
745 match &self.underlying_sink_type {
746 UnderlyingSinkType::Js {
747 abort: _,
748 start: _,
749 close,
750 write: _,
751 } => {
752 rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
753 this_object.set(self.underlying_sink_obj.get());
754 let algo = close.borrow().clone();
755 let result = if let Some(algo) = algo {
756 algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
757 } else {
758 Ok(Promise::new_resolved(global, cx, (), can_gc))
759 };
760 result.unwrap_or_else(|e| {
761 let promise = Promise::new(global, can_gc);
762 promise.reject_error(e, can_gc);
763 promise
764 })
765 },
766 UnderlyingSinkType::Transfer { port, .. } => {
767 rooted!(in(*cx) let mut value = UndefinedValue());
772 port.pack_and_post_message("close", value.handle(), can_gc)
773 .expect("Sending close should not fail.");
774
775 global.disentangle_port(port, can_gc);
777
778 Promise::new_resolved(global, cx, (), can_gc)
780 },
781 UnderlyingSinkType::Transform(stream, _) => {
782 stream
784 .transform_stream_default_sink_close_algorithm(cx, global, can_gc)
785 .expect("Transform stream default sink close algorithm should not fail.")
786 },
787 }
788 }
789
790 pub(crate) fn process_close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
792 let Some(stream) = self.stream.get() else {
794 unreachable!("Controller should have a stream");
795 };
796
797 stream.mark_close_request_in_flight();
799
800 self.queue.dequeue_value(cx, None, can_gc);
802
803 assert!(self.queue.is_empty());
805
806 let sink_close_promise = self.call_close_algorithm(cx, global, can_gc);
808
809 self.clear_algorithms();
811
812 rooted!(in(*cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
814 stream: Dom::from_ref(&stream),
815 }));
816
817 rooted!(in(*cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
819 stream: Dom::from_ref(&stream),
820 }));
821
822 let handler = PromiseNativeHandler::new(
824 global,
825 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
826 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
827 can_gc,
828 );
829 let realm = enter_realm(global);
830 let comp = InRealm::Entered(&realm);
831 sink_close_promise.append_native_handler(&handler, comp, can_gc);
832 }
833
834 fn advance_queue_if_needed(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
836 let Some(stream) = self.stream.get() else {
838 unreachable!("Controller should have a stream");
839 };
840
841 if !self.started.get() {
843 return;
844 }
845
846 if stream.has_in_flight_write_request() {
848 return;
849 }
850
851 assert!(!(stream.is_errored() || stream.is_closed()));
855
856 if stream.is_erroring() {
858 stream.finish_erroring(cx, global, can_gc);
860
861 return;
863 }
864
865 rooted!(in(*cx) let mut value = UndefinedValue());
867 let is_closed = {
868 if self.queue.is_empty() {
870 return;
871 }
872 self.queue.peek_queue_value(cx, value.handle_mut(), can_gc)
873 };
874
875 if is_closed {
876 self.process_close(cx, global, can_gc);
878 } else {
879 self.process_write(cx, value.handle(), global, can_gc);
881 };
882 }
883
884 pub(crate) fn perform_error_steps(&self) {
886 self.queue.reset();
888 }
889
890 fn process_write(
892 &self,
893 cx: SafeJSContext,
894 chunk: SafeHandleValue,
895 global: &GlobalScope,
896 can_gc: CanGc,
897 ) {
898 let Some(stream) = self.stream.get() else {
900 unreachable!("Controller should have a stream");
901 };
902
903 stream.mark_first_write_request_in_flight();
905
906 let sink_write_promise = self.call_write_algorithm(cx, chunk, global, can_gc);
908
909 rooted!(in(*cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
911 controller: Dom::from_ref(self),
912 }));
913
914 rooted!(in(*cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
916 controller: Dom::from_ref(self),
917 }));
918
919 let handler = PromiseNativeHandler::new(
921 global,
922 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
923 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
924 can_gc,
925 );
926 let realm = enter_realm(global);
927 let comp = InRealm::Entered(&realm);
928 sink_write_promise.append_native_handler(&handler, comp, can_gc);
929 }
930
931 pub(crate) fn get_desired_size(&self) -> f64 {
933 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
935 desired_size.clamp(desired_size, self.strategy_hwm)
936 }
937
938 fn get_backpressure(&self) -> bool {
940 let desired_size = self.get_desired_size();
942
943 desired_size == 0.0 || desired_size.is_sign_negative()
945 }
946
947 pub(crate) fn get_chunk_size(
949 &self,
950 cx: SafeJSContext,
951 global: &GlobalScope,
952 chunk: SafeHandleValue,
953 can_gc: CanGc,
954 ) -> f64 {
955 let Some(strategy_size) = self.strategy_size.borrow().clone() else {
957 let Some(stream) = self.stream.get() else {
959 unreachable!("Controller should have a stream");
960 };
961 assert!(!stream.is_writable());
962
963 return 1.0;
965 };
966
967 let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
970
971 match result {
972 Ok(size) => size,
974 Err(error) => {
975 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
980 error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
981 self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
982
983 1.0
985 },
986 }
987 }
988
989 pub(crate) fn write(
991 &self,
992 cx: SafeJSContext,
993 global: &GlobalScope,
994 chunk: SafeHandleValue,
995 chunk_size: f64,
996 can_gc: CanGc,
997 ) {
998 let enqueue_result = self
1000 .queue
1001 .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
1002 value: Heap::boxed(chunk.get()),
1003 size: chunk_size,
1004 }));
1005
1006 if let Err(error) = enqueue_result {
1008 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1011 error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
1012 self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
1013
1014 return;
1016 }
1017
1018 let Some(stream) = self.stream.get() else {
1020 unreachable!("Controller should have a stream");
1021 };
1022
1023 if !stream.close_queued_or_in_flight() && stream.is_writable() {
1025 let backpressure = self.get_backpressure();
1027
1028 stream.update_backpressure(backpressure, global, can_gc);
1030 }
1031
1032 self.advance_queue_if_needed(cx, global, can_gc);
1034 }
1035
1036 pub(crate) fn error_if_needed(
1038 &self,
1039 cx: SafeJSContext,
1040 error: SafeHandleValue,
1041 global: &GlobalScope,
1042 can_gc: CanGc,
1043 ) {
1044 let Some(stream) = self.stream.get() else {
1046 unreachable!("Controller should have a stream");
1047 };
1048
1049 if stream.is_writable() {
1051 self.error(&stream, cx, error, global, can_gc);
1053 }
1054 }
1055
1056 pub(crate) fn error(
1058 &self,
1059 stream: &WritableStream,
1060 cx: SafeJSContext,
1061 e: SafeHandleValue,
1062 global: &GlobalScope,
1063 can_gc: CanGc,
1064 ) {
1065 assert!(stream.is_writable());
1070
1071 self.clear_algorithms();
1073
1074 stream.start_erroring(cx, global, e, can_gc);
1076 }
1077}
1078
1079impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1080 for WritableStreamDefaultController
1081{
1082 fn Error(&self, cx: SafeJSContext, e: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
1084 let Some(stream) = self.stream.get() else {
1086 unreachable!("Controller should have a stream");
1087 };
1088
1089 if !stream.is_writable() {
1091 return;
1092 }
1093
1094 let global = GlobalScope::from_safe_context(cx, realm);
1095
1096 self.error(&stream, cx, e, &global, can_gc);
1098 }
1099
1100 fn Signal(&self) -> DomRoot<AbortSignal> {
1102 self.abort_controller.signal()
1104 }
1105}