1use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, UndefinedValue};
12use js::realm::CurrentRealm;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
14
15use crate::dom::bindings::callback::ExceptionHandling;
16use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
17use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
18 UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
19 UnderlyingSinkWriteCallback,
20};
21use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
24use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
25use crate::dom::globalscope::GlobalScope;
26use crate::dom::messageport::MessagePort;
27use crate::dom::promise::Promise;
28use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
29use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
30use crate::dom::stream::writablestream::WritableStream;
31use crate::dom::types::{AbortController, AbortSignal, TransformStream};
32use crate::realms::{InRealm, enter_auto_realm};
33use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
34
35impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
36
37#[derive(Clone, JSTraceable, MallocSizeOf)]
40#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
41struct CloseAlgorithmFulfillmentHandler {
42 stream: Dom<WritableStream>,
43}
44
45impl Callback for CloseAlgorithmFulfillmentHandler {
46 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
47 let can_gc = CanGc::from_cx(cx);
48 let stream = self.stream.as_rooted();
49
50 stream.finish_in_flight_close(cx.into(), can_gc);
52 }
53}
54
55impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
56
57#[derive(Clone, JSTraceable, MallocSizeOf)]
60#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
61struct CloseAlgorithmRejectionHandler {
62 stream: Dom<WritableStream>,
63}
64
65impl Callback for CloseAlgorithmRejectionHandler {
66 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
67 let stream = self.stream.as_rooted();
68
69 let global = GlobalScope::from_current_realm(cx);
70
71 stream.finish_in_flight_close_with_error(cx, &global, v);
73 }
74}
75
76impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
77
78#[derive(Clone, JSTraceable, MallocSizeOf)]
81#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
82struct StartAlgorithmFulfillmentHandler {
83 controller: Dom<WritableStreamDefaultController>,
84}
85
86impl Callback for StartAlgorithmFulfillmentHandler {
87 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
90 let controller = self.controller.as_rooted();
91 let stream = controller
92 .stream
93 .get()
94 .expect("Controller should have a stream.");
95
96 assert!(stream.is_erroring() || stream.is_writable());
98
99 controller.started.set(true);
101
102 let global = GlobalScope::from_current_realm(cx);
103
104 controller.advance_queue_if_needed(cx, &global)
106 }
107}
108
109impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
110
111#[derive(Clone, JSTraceable, MallocSizeOf)]
114#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
115struct StartAlgorithmRejectionHandler {
116 controller: Dom<WritableStreamDefaultController>,
117}
118
119impl Callback for StartAlgorithmRejectionHandler {
120 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
123 let controller = self.controller.as_rooted();
124 let stream = controller
125 .stream
126 .get()
127 .expect("Controller should have a stream.");
128
129 assert!(stream.is_erroring() || stream.is_writable());
131
132 controller.started.set(true);
134
135 let global = GlobalScope::from_current_realm(cx);
136
137 stream.deal_with_rejection(cx, &global, v);
139 }
140}
141
142impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
143
144#[derive(JSTraceable, MallocSizeOf)]
147#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
148struct TransferBackPressurePromiseReaction {
149 #[conditional_malloc_size_of]
151 result_promise: Rc<Promise>,
152
153 #[ignore_malloc_size_of = "nested Rc"]
155 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
156
157 #[ignore_malloc_size_of = "mozjs"]
159 chunk: Box<Heap<JSVal>>,
160
161 port: Dom<MessagePort>,
163}
164
165impl Callback for TransferBackPressurePromiseReaction {
166 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
168 let can_gc = CanGc::from_cx(cx);
169 let global = self.result_promise.global();
170 let promise = Promise::new2(cx, &global);
172 *self.backpressure_promise.borrow_mut() = Some(promise);
173
174 rooted!(&in(cx) let mut chunk = UndefinedValue());
176 chunk.set(self.chunk.get());
177 let result =
178 self.port
179 .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
180
181 if let Err(error) = result {
183 global.disentangle_port(cx, &self.port);
185
186 self.result_promise.reject_error(error, can_gc);
188 } else {
189 self.result_promise.resolve_native(&(), can_gc);
191 }
192 }
193}
194
195impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
196
197#[derive(Clone, JSTraceable, MallocSizeOf)]
200#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
201struct WriteAlgorithmFulfillmentHandler {
202 controller: Dom<WritableStreamDefaultController>,
203}
204
205impl Callback for WriteAlgorithmFulfillmentHandler {
206 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
207 let can_gc = CanGc::from_cx(cx);
208 let controller = self.controller.as_rooted();
209 let stream = controller
210 .stream
211 .get()
212 .expect("Controller should have a stream.");
213
214 stream.finish_in_flight_write(can_gc);
216
217 assert!(stream.is_erroring() || stream.is_writable());
220
221 rooted!(&in(cx) let mut rval = UndefinedValue());
223 controller
224 .queue
225 .dequeue_value(cx.into(), Some(rval.handle_mut()), can_gc);
226
227 let global = GlobalScope::from_current_realm(cx);
228
229 if !stream.close_queued_or_in_flight() && stream.is_writable() {
231 let backpressure = controller.get_backpressure();
233
234 stream.update_backpressure(backpressure, &global, can_gc);
236 }
237
238 controller.advance_queue_if_needed(cx, &global)
240 }
241}
242
243impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
244
245#[derive(Clone, JSTraceable, MallocSizeOf)]
248#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
249struct WriteAlgorithmRejectionHandler {
250 controller: Dom<WritableStreamDefaultController>,
251}
252
253impl Callback for WriteAlgorithmRejectionHandler {
254 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
255 let controller = self.controller.as_rooted();
256 let stream = controller
257 .stream
258 .get()
259 .expect("Controller should have a stream.");
260
261 if stream.is_writable() {
263 controller.clear_algorithms();
265 }
266
267 let global = GlobalScope::from_current_realm(cx);
268
269 stream.finish_in_flight_write_with_error(cx, &global, v);
271 }
272}
273
274#[derive(JSTraceable, PartialEq)]
276#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
277pub enum UnderlyingSinkType {
278 Js {
280 abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
282
283 start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,
284
285 close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,
287
288 write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
290 },
291 Transfer {
294 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
295 port: Dom<MessagePort>,
296 },
297 Transform(Dom<TransformStream>, Rc<Promise>),
299}
300
301impl UnderlyingSinkType {
302 pub(crate) fn new_js(
303 abort: Option<Rc<UnderlyingSinkAbortCallback>>,
304 start: Option<Rc<UnderlyingSinkStartCallback>>,
305 close: Option<Rc<UnderlyingSinkCloseCallback>>,
306 write: Option<Rc<UnderlyingSinkWriteCallback>>,
307 ) -> Self {
308 UnderlyingSinkType::Js {
309 abort: RefCell::new(abort),
310 start: RefCell::new(start),
311 close: RefCell::new(close),
312 write: RefCell::new(write),
313 }
314 }
315}
316
317#[dom_struct]
319pub struct WritableStreamDefaultController {
320 reflector_: Reflector,
321
322 #[ignore_malloc_size_of = "underlying_sink_type"]
325 underlying_sink_type: UnderlyingSinkType,
326
327 #[ignore_malloc_size_of = "mozjs"]
329 underlying_sink_obj: Heap<*mut JSObject>,
330
331 queue: QueueWithSizes,
333
334 started: Cell<bool>,
336
337 strategy_hwm: f64,
339
340 #[ignore_malloc_size_of = "QueuingStrategySize"]
342 strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
343
344 stream: MutNullableDom<WritableStream>,
346
347 abort_controller: Dom<AbortController>,
349}
350
351impl WritableStreamDefaultController {
352 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
354 fn new_inherited(
355 global: &GlobalScope,
356 underlying_sink_type: UnderlyingSinkType,
357 strategy_hwm: f64,
358 strategy_size: Rc<QueuingStrategySize>,
359 can_gc: CanGc,
360 ) -> WritableStreamDefaultController {
361 WritableStreamDefaultController {
362 reflector_: Reflector::new(),
363 underlying_sink_type,
364 queue: Default::default(),
365 stream: Default::default(),
366 underlying_sink_obj: Default::default(),
367 strategy_hwm,
368 strategy_size: RefCell::new(Some(strategy_size)),
369 started: Default::default(),
370 abort_controller: Dom::from_ref(&AbortController::new_with_proto(global, None, can_gc)),
371 }
372 }
373
374 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
375 pub(crate) fn new(
376 global: &GlobalScope,
377 underlying_sink_type: UnderlyingSinkType,
378 strategy_hwm: f64,
379 strategy_size: Rc<QueuingStrategySize>,
380 can_gc: CanGc,
381 ) -> DomRoot<WritableStreamDefaultController> {
382 reflect_dom_object(
383 Box::new(WritableStreamDefaultController::new_inherited(
384 global,
385 underlying_sink_type,
386 strategy_hwm,
387 strategy_size,
388 can_gc,
389 )),
390 global,
391 can_gc,
392 )
393 }
394
395 pub(crate) fn started(&self) -> bool {
396 self.started.get()
397 }
398
399 pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
401 self.underlying_sink_obj.set(*this_object);
402 }
403
404 pub(crate) fn signal_abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
406 self.abort_controller.signal_abort(cx, reason);
407 }
408
409 fn clear_algorithms(&self) {
411 match &self.underlying_sink_type {
412 UnderlyingSinkType::Js {
413 abort,
414 start: _,
415 close,
416 write,
417 } => {
418 write.borrow_mut().take();
420
421 close.borrow_mut().take();
423
424 abort.borrow_mut().take();
426 },
427 UnderlyingSinkType::Transfer {
428 backpressure_promise,
429 ..
430 } => {
431 backpressure_promise.borrow_mut().take();
432 },
433 UnderlyingSinkType::Transform(_, _) => {
434 return;
435 },
436 }
437
438 self.strategy_size.borrow_mut().take();
440 }
441
442 pub(crate) fn setup(
444 &self,
445 cx: &mut js::context::JSContext,
446 global: &GlobalScope,
447 stream: &WritableStream,
448 ) -> Result<(), Error> {
449 stream.assert_no_controller();
454
455 self.stream.set(Some(stream));
457
458 stream.set_default_controller(self);
460
461 let backpressure = self.get_backpressure();
481
482 stream.update_backpressure(backpressure, global, CanGc::from_cx(cx));
484
485 let start_promise = self.start_algorithm(cx.into(), global, CanGc::from_cx(cx))?;
488
489 let rooted_default_controller = DomRoot::from_ref(self);
490
491 rooted!(&in(cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
493 controller: Dom::from_ref(&rooted_default_controller),
494 }));
495
496 rooted!(&in(cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
498 controller: Dom::from_ref(&rooted_default_controller),
499 }));
500
501 let handler = PromiseNativeHandler::new(
502 global,
503 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
504 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
505 CanGc::from_cx(cx),
506 );
507 let mut realm = enter_auto_realm(cx, global);
508 let cx = &mut realm.current_realm();
509
510 let in_realm_proof = cx.into();
511 let comp = InRealm::Already(&in_realm_proof);
512 start_promise.append_native_handler(&handler, comp, CanGc::from_cx(cx));
513
514 Ok(())
515 }
516
517 pub(crate) fn close(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
519 self.queue
521 .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
522 .expect("Enqueuing the close sentinel should not fail.");
523 self.advance_queue_if_needed(cx, global);
525 }
526
527 #[expect(unsafe_code)]
528 fn start_algorithm(
529 &self,
530 cx: SafeJSContext,
531 global: &GlobalScope,
532 can_gc: CanGc,
533 ) -> Fallible<Rc<Promise>> {
534 match &self.underlying_sink_type {
535 UnderlyingSinkType::Js {
536 start,
537 abort: _,
538 close: _,
539 write: _,
540 } => {
541 let algo = start.borrow().clone();
542 let start_promise = if let Some(start) = algo {
543 rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
544 rooted!(in(*cx) let mut result: JSVal);
545 rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
546 start.Call_(
547 &this_object.handle(),
548 self,
549 result.handle_mut(),
550 ExceptionHandling::Rethrow,
551 can_gc,
552 )?;
553 let is_promise = unsafe {
554 if result.is_object() {
555 result_object.set(result.to_object());
556 IsPromiseObject(result_object.handle().into_handle())
557 } else {
558 false
559 }
560 };
561 if is_promise {
562 Promise::new_with_js_promise(result_object.handle(), cx)
563 } else {
564 Promise::new_resolved(global, cx, result.get(), can_gc)
565 }
566 } else {
567 Promise::new_resolved(global, cx, (), can_gc)
569 };
570
571 Ok(start_promise)
572 },
573 UnderlyingSinkType::Transfer { .. } => {
574 Ok(Promise::new_resolved(global, cx, (), can_gc))
576 },
577 UnderlyingSinkType::Transform(_, start_promise) => {
578 Ok(start_promise.clone())
580 },
581 }
582 }
583
584 pub(crate) fn abort_steps(
586 &self,
587 cx: &mut js::context::JSContext,
588 global: &GlobalScope,
589 reason: SafeHandleValue,
590 ) -> Rc<Promise> {
591 let result = match &self.underlying_sink_type {
592 UnderlyingSinkType::Js {
593 abort,
594 start: _,
595 close: _,
596 write: _,
597 } => {
598 rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
599 let algo = abort.borrow().clone();
600 let result = if let Some(algo) = algo {
602 algo.Call_(
603 &this_object.handle(),
604 Some(reason),
605 ExceptionHandling::Rethrow,
606 CanGc::from_cx(cx),
607 )
608 } else {
609 Ok(Promise::new_resolved(
610 global,
611 cx.into(),
612 (),
613 CanGc::from_cx(cx),
614 ))
615 };
616 result.unwrap_or_else(|e| {
617 let promise = Promise::new(global, CanGc::from_cx(cx));
618 promise.reject_error(e, CanGc::from_cx(cx));
619 promise
620 })
621 },
622 UnderlyingSinkType::Transfer { port, .. } => {
623 let result =
628 port.pack_and_post_message_handling_error("error", reason, CanGc::from_cx(cx));
629
630 global.disentangle_port(cx, port);
632
633 let promise = Promise::new(global, CanGc::from_cx(cx));
634
635 if let Err(error) = result {
637 promise.reject_error(error, CanGc::from_cx(cx));
638 } else {
639 promise.resolve_native(&(), CanGc::from_cx(cx));
641 }
642 promise
643 },
644 UnderlyingSinkType::Transform(stream, _) => {
645 stream
647 .transform_stream_default_sink_abort_algorithm(
648 cx.into(),
649 global,
650 reason,
651 CanGc::from_cx(cx),
652 )
653 .expect("Transform stream default sink abort algorithm should not fail.")
654 },
655 };
656
657 self.clear_algorithms();
659
660 result
661 }
662
663 fn call_write_algorithm(
665 &self,
666 cx: &mut js::context::JSContext,
667 chunk: SafeHandleValue,
668 global: &GlobalScope,
669 ) -> Rc<Promise> {
670 match &self.underlying_sink_type {
671 UnderlyingSinkType::Js {
672 abort: _,
673 start: _,
674 close: _,
675 write,
676 } => {
677 rooted!(&in(cx) let this_object = self.underlying_sink_obj.get());
678 let algo = write.borrow().clone();
679 let result = if let Some(algo) = algo {
680 algo.Call_(
681 &this_object.handle(),
682 chunk,
683 self,
684 ExceptionHandling::Rethrow,
685 CanGc::from_cx(cx),
686 )
687 } else {
688 Ok(Promise::new_resolved(
689 global,
690 cx.into(),
691 (),
692 CanGc::from_cx(cx),
693 ))
694 };
695 result.unwrap_or_else(|e| {
696 let promise = Promise::new2(cx, global);
697 promise.reject_error(e, CanGc::from_cx(cx));
698 promise
699 })
700 },
701 UnderlyingSinkType::Transfer {
702 backpressure_promise,
703 port,
704 } => {
705 if backpressure_promise.borrow().is_none() {
711 let promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
712 *backpressure_promise.borrow_mut() = Some(promise);
713 }
714
715 let result_promise = Promise::new2(cx, global);
717 rooted!(&in(cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
718 port: port.clone(),
719 backpressure_promise: backpressure_promise.clone(),
720 chunk: Heap::boxed(chunk.get()),
721 result_promise: result_promise.clone(),
722 }));
723 let handler = PromiseNativeHandler::new(
724 global,
725 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
726 None,
727 CanGc::from_cx(cx),
728 );
729 let mut realm = enter_auto_realm(cx, global);
730 let realm = &mut realm.current_realm();
731 let in_realm_proof = realm.into();
732 let comp = InRealm::Already(&in_realm_proof);
733 backpressure_promise
734 .borrow()
735 .as_ref()
736 .expect("Promise must be some by now.")
737 .append_native_handler(&handler, comp, CanGc::from_cx(realm));
738 result_promise
739 },
740 UnderlyingSinkType::Transform(stream, _) => {
741 stream
743 .transform_stream_default_sink_write_algorithm(cx, global, chunk)
744 .expect("Transform stream default sink write algorithm should not fail.")
745 },
746 }
747 }
748
749 fn call_close_algorithm(
751 &self,
752 cx: &mut js::context::JSContext,
753 global: &GlobalScope,
754 ) -> Rc<Promise> {
755 match &self.underlying_sink_type {
756 UnderlyingSinkType::Js {
757 abort: _,
758 start: _,
759 close,
760 write: _,
761 } => {
762 rooted!(&in(cx) let mut this_object = ptr::null_mut::<JSObject>());
763 this_object.set(self.underlying_sink_obj.get());
764 let algo = close.borrow().clone();
765 let result = if let Some(algo) = algo {
766 algo.Call_(
767 &this_object.handle(),
768 ExceptionHandling::Rethrow,
769 CanGc::from_cx(cx),
770 )
771 } else {
772 Ok(Promise::new_resolved(
773 global,
774 cx.into(),
775 (),
776 CanGc::from_cx(cx),
777 ))
778 };
779 result.unwrap_or_else(|e| {
780 let promise = Promise::new2(cx, global);
781 promise.reject_error(e, CanGc::from_cx(cx));
782 promise
783 })
784 },
785 UnderlyingSinkType::Transfer { port, .. } => {
786 rooted!(&in(cx) let mut value = UndefinedValue());
791 port.pack_and_post_message("close", value.handle(), CanGc::from_cx(cx))
792 .expect("Sending close should not fail.");
793
794 global.disentangle_port(cx, port);
796
797 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
799 },
800 UnderlyingSinkType::Transform(stream, _) => {
801 stream
803 .transform_stream_default_sink_close_algorithm(cx, global)
804 .expect("Transform stream default sink close algorithm should not fail.")
805 },
806 }
807 }
808
809 pub(crate) fn process_close(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
811 let Some(stream) = self.stream.get() else {
813 unreachable!("Controller should have a stream");
814 };
815
816 stream.mark_close_request_in_flight();
818
819 self.queue
821 .dequeue_value(cx.into(), None, CanGc::from_cx(cx));
822
823 assert!(self.queue.is_empty());
825
826 let sink_close_promise = self.call_close_algorithm(cx, global);
828
829 self.clear_algorithms();
831
832 rooted!(&in(cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
834 stream: Dom::from_ref(&stream),
835 }));
836
837 rooted!(&in(cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
839 stream: Dom::from_ref(&stream),
840 }));
841
842 let handler = PromiseNativeHandler::new(
844 global,
845 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
846 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
847 CanGc::from_cx(cx),
848 );
849 let mut realm = enter_auto_realm(cx, global);
850 let realm = &mut realm.current_realm();
851 let in_realm_proof = realm.into();
852 let comp = InRealm::Already(&in_realm_proof);
853 sink_close_promise.append_native_handler(&handler, comp, CanGc::from_cx(realm));
854 }
855
856 fn advance_queue_if_needed(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
858 let Some(stream) = self.stream.get() else {
860 unreachable!("Controller should have a stream");
861 };
862
863 if !self.started.get() {
865 return;
866 }
867
868 if stream.has_in_flight_write_request() {
870 return;
871 }
872
873 assert!(!(stream.is_errored() || stream.is_closed()));
877
878 if stream.is_erroring() {
880 stream.finish_erroring(cx, global);
882
883 return;
885 }
886
887 rooted!(&in(cx) let mut value = UndefinedValue());
889 let is_closed = {
890 if self.queue.is_empty() {
892 return;
893 }
894 self.queue
895 .peek_queue_value(cx.into(), value.handle_mut(), CanGc::from_cx(cx))
896 };
897
898 if is_closed {
899 self.process_close(cx, global);
901 } else {
902 self.process_write(cx, value.handle(), global);
904 };
905 }
906
907 pub(crate) fn perform_error_steps(&self) {
909 self.queue.reset();
911 }
912
913 fn process_write(
915 &self,
916 cx: &mut js::context::JSContext,
917 chunk: SafeHandleValue,
918 global: &GlobalScope,
919 ) {
920 let Some(stream) = self.stream.get() else {
922 unreachable!("Controller should have a stream");
923 };
924
925 stream.mark_first_write_request_in_flight();
927
928 let sink_write_promise = self.call_write_algorithm(cx, chunk, global);
930
931 rooted!(&in(cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
933 controller: Dom::from_ref(self),
934 }));
935
936 rooted!(&in(cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
938 controller: Dom::from_ref(self),
939 }));
940
941 let handler = PromiseNativeHandler::new(
943 global,
944 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
945 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
946 CanGc::from_cx(cx),
947 );
948 let mut realm = enter_auto_realm(cx, global);
949 let realm = &mut realm.current_realm();
950 let in_realm_proof = realm.into();
951 let comp = InRealm::Already(&in_realm_proof);
952 sink_write_promise.append_native_handler(&handler, comp, CanGc::from_cx(realm));
953 }
954
955 pub(crate) fn get_desired_size(&self) -> f64 {
957 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
959 desired_size.clamp(desired_size, self.strategy_hwm)
960 }
961
962 fn get_backpressure(&self) -> bool {
964 let desired_size = self.get_desired_size();
966
967 desired_size == 0.0 || desired_size.is_sign_negative()
969 }
970
971 pub(crate) fn get_chunk_size(
973 &self,
974 cx: &mut js::context::JSContext,
975 global: &GlobalScope,
976 chunk: SafeHandleValue,
977 ) -> f64 {
978 let Some(strategy_size) = self.strategy_size.borrow().clone() else {
980 let Some(stream) = self.stream.get() else {
982 unreachable!("Controller should have a stream");
983 };
984 assert!(!stream.is_writable());
985
986 return 1.0;
988 };
989
990 let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, CanGc::from_cx(cx));
993
994 match result {
995 Ok(size) => size,
997 Err(error) => {
998 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1003 error.to_jsval(
1004 cx.into(),
1005 global,
1006 rooted_error.handle_mut(),
1007 CanGc::from_cx(cx),
1008 );
1009 self.error_if_needed(cx, rooted_error.handle(), global);
1010
1011 1.0
1013 },
1014 }
1015 }
1016
1017 pub(crate) fn write(
1019 &self,
1020 cx: &mut js::context::JSContext,
1021 global: &GlobalScope,
1022 chunk: SafeHandleValue,
1023 chunk_size: f64,
1024 ) {
1025 let enqueue_result = self
1027 .queue
1028 .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
1029 value: Heap::boxed(chunk.get()),
1030 size: chunk_size,
1031 }));
1032
1033 if let Err(error) = enqueue_result {
1035 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1038 error.to_jsval(
1039 cx.into(),
1040 global,
1041 rooted_error.handle_mut(),
1042 CanGc::from_cx(cx),
1043 );
1044 self.error_if_needed(cx, rooted_error.handle(), global);
1045
1046 return;
1048 }
1049
1050 let Some(stream) = self.stream.get() else {
1052 unreachable!("Controller should have a stream");
1053 };
1054
1055 if !stream.close_queued_or_in_flight() && stream.is_writable() {
1057 let backpressure = self.get_backpressure();
1059
1060 stream.update_backpressure(backpressure, global, CanGc::from_cx(cx));
1062 }
1063
1064 self.advance_queue_if_needed(cx, global);
1066 }
1067
1068 pub(crate) fn error_if_needed(
1070 &self,
1071 cx: &mut js::context::JSContext,
1072 error: SafeHandleValue,
1073 global: &GlobalScope,
1074 ) {
1075 let Some(stream) = self.stream.get() else {
1077 unreachable!("Controller should have a stream");
1078 };
1079
1080 if stream.is_writable() {
1082 self.error(cx, &stream, error, global);
1084 }
1085 }
1086
1087 fn error(
1089 &self,
1090 cx: &mut js::context::JSContext,
1091 stream: &WritableStream,
1092 e: SafeHandleValue,
1093 global: &GlobalScope,
1094 ) {
1095 assert!(stream.is_writable());
1100
1101 self.clear_algorithms();
1103
1104 stream.start_erroring(cx, global, e);
1106 }
1107}
1108
1109impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1110 for WritableStreamDefaultController
1111{
1112 fn Error(&self, cx: &mut CurrentRealm, e: SafeHandleValue) {
1114 let Some(stream) = self.stream.get() else {
1116 unreachable!("Controller should have a stream");
1117 };
1118
1119 if !stream.is_writable() {
1121 return;
1122 }
1123
1124 let global = GlobalScope::from_current_realm(cx);
1125
1126 self.error(cx, &stream, e, &global);
1128 }
1129
1130 fn Signal(&self) -> DomRoot<AbortSignal> {
1132 self.abort_controller.signal()
1134 }
1135}