1use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, UndefinedValue};
12use js::realm::CurrentRealm;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
14
15use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
16use super::types::TransformStream;
17use crate::dom::bindings::callback::ExceptionHandling;
18use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
19 UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
20 UnderlyingSinkWriteCallback,
21};
22use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
25use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
26use crate::dom::globalscope::GlobalScope;
27use crate::dom::messageport::MessagePort;
28use crate::dom::promise::Promise;
29use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
30use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
31use crate::dom::types::{AbortController, AbortSignal};
32use crate::dom::writablestream::WritableStream;
33use crate::realms::{InRealm, enter_realm};
34use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
35
36impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
37
38#[derive(Clone, JSTraceable, MallocSizeOf)]
41#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
42struct CloseAlgorithmFulfillmentHandler {
43 stream: Dom<WritableStream>,
44}
45
46impl Callback for CloseAlgorithmFulfillmentHandler {
47 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
48 let can_gc = CanGc::from_cx(cx);
49 let stream = self.stream.as_rooted();
50
51 stream.finish_in_flight_close(cx.into(), can_gc);
53 }
54}
55
56impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
57
58#[derive(Clone, JSTraceable, MallocSizeOf)]
61#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
62struct CloseAlgorithmRejectionHandler {
63 stream: Dom<WritableStream>,
64}
65
66impl Callback for CloseAlgorithmRejectionHandler {
67 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
68 let stream = self.stream.as_rooted();
69
70 let global = GlobalScope::from_current_realm(cx);
71
72 stream.finish_in_flight_close_with_error(cx.into(), &global, v, CanGc::from_cx(cx));
74 }
75}
76
77impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
78
79#[derive(Clone, JSTraceable, MallocSizeOf)]
82#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
83struct StartAlgorithmFulfillmentHandler {
84 controller: Dom<WritableStreamDefaultController>,
85}
86
87impl Callback for StartAlgorithmFulfillmentHandler {
88 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
91 let controller = self.controller.as_rooted();
92 let stream = controller
93 .stream
94 .get()
95 .expect("Controller should have a stream.");
96
97 assert!(stream.is_erroring() || stream.is_writable());
99
100 controller.started.set(true);
102
103 let global = GlobalScope::from_current_realm(cx);
104
105 controller.advance_queue_if_needed(cx.into(), &global, CanGc::from_cx(cx))
107 }
108}
109
110impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
111
112#[derive(Clone, JSTraceable, MallocSizeOf)]
115#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
116struct StartAlgorithmRejectionHandler {
117 controller: Dom<WritableStreamDefaultController>,
118}
119
120impl Callback for StartAlgorithmRejectionHandler {
121 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
124 let controller = self.controller.as_rooted();
125 let stream = controller
126 .stream
127 .get()
128 .expect("Controller should have a stream.");
129
130 assert!(stream.is_erroring() || stream.is_writable());
132
133 controller.started.set(true);
135
136 let global = GlobalScope::from_current_realm(cx);
137
138 stream.deal_with_rejection(cx.into(), &global, v, CanGc::from_cx(cx));
140 }
141}
142
143impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
144
145#[derive(JSTraceable, MallocSizeOf)]
148#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
149struct TransferBackPressurePromiseReaction {
150 #[conditional_malloc_size_of]
152 result_promise: Rc<Promise>,
153
154 #[ignore_malloc_size_of = "nested Rc"]
156 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
157
158 #[ignore_malloc_size_of = "mozjs"]
160 chunk: Box<Heap<JSVal>>,
161
162 port: Dom<MessagePort>,
164}
165
166impl Callback for TransferBackPressurePromiseReaction {
167 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
169 let can_gc = CanGc::from_cx(cx);
170 let global = self.result_promise.global();
171 let promise = Promise::new2(cx, &global);
173 *self.backpressure_promise.borrow_mut() = Some(promise);
174
175 rooted!(&in(cx) let mut chunk = UndefinedValue());
177 chunk.set(self.chunk.get());
178 let result =
179 self.port
180 .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
181
182 if let Err(error) = result {
184 global.disentangle_port(&self.port, can_gc);
186
187 self.result_promise.reject_error(error, can_gc);
189 } else {
190 self.result_promise.resolve_native(&(), can_gc);
192 }
193 }
194}
195
196impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
197
198#[derive(Clone, JSTraceable, MallocSizeOf)]
201#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
202struct WriteAlgorithmFulfillmentHandler {
203 controller: Dom<WritableStreamDefaultController>,
204}
205
206impl Callback for WriteAlgorithmFulfillmentHandler {
207 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
208 let can_gc = CanGc::from_cx(cx);
209 let controller = self.controller.as_rooted();
210 let stream = controller
211 .stream
212 .get()
213 .expect("Controller should have a stream.");
214
215 stream.finish_in_flight_write(can_gc);
217
218 assert!(stream.is_erroring() || stream.is_writable());
221
222 rooted!(&in(cx) let mut rval = UndefinedValue());
224 controller
225 .queue
226 .dequeue_value(cx.into(), Some(rval.handle_mut()), can_gc);
227
228 let global = GlobalScope::from_current_realm(cx);
229
230 if !stream.close_queued_or_in_flight() && stream.is_writable() {
232 let backpressure = controller.get_backpressure();
234
235 stream.update_backpressure(backpressure, &global, can_gc);
237 }
238
239 controller.advance_queue_if_needed(cx.into(), &global, can_gc)
241 }
242}
243
244impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
245
246#[derive(Clone, JSTraceable, MallocSizeOf)]
249#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
250struct WriteAlgorithmRejectionHandler {
251 controller: Dom<WritableStreamDefaultController>,
252}
253
254impl Callback for WriteAlgorithmRejectionHandler {
255 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
256 let controller = self.controller.as_rooted();
257 let stream = controller
258 .stream
259 .get()
260 .expect("Controller should have a stream.");
261
262 if stream.is_writable() {
264 controller.clear_algorithms();
266 }
267
268 let global = GlobalScope::from_current_realm(cx);
269
270 stream.finish_in_flight_write_with_error(cx.into(), &global, v, CanGc::from_cx(cx));
272 }
273}
274
/// The kind of underlying sink behind a `WritableStreamDefaultController`,
/// together with the state that kind needs to run its algorithms.
#[derive(JSTraceable, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub enum UnderlyingSinkType {
    /// A sink driven by optional callbacks. Each callback lives in a
    /// `RefCell` so it can be dropped later; `clear_algorithms` empties
    /// `write`, `close` and `abort` but leaves `start` alone.
    Js {
        /// Callback invoked by `abort_steps`, if present.
        abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,

        /// Callback invoked by `start_algorithm`, if present.
        start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,

        /// Callback invoked by `call_close_algorithm`, if present.
        close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,

        /// Callback invoked by `call_write_algorithm`, if present.
        write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
    },
    /// A sink that forwards everything over a message port
    /// (`"chunk"`, `"close"` and `"error"` messages are posted to it).
    Transfer {
        /// Promise each write waits on before posting its chunk. It is
        /// created lazily on the first write and replaced whenever a chunk
        /// is sent.
        backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
        /// The port messages are posted through.
        port: Dom<MessagePort>,
    },
    /// A sink whose write/close/abort algorithms are delegated to the given
    /// transform stream; the promise is returned as-is by `start_algorithm`.
    Transform(Dom<TransformStream>, Rc<Promise>),
}
301
302impl UnderlyingSinkType {
303 pub(crate) fn new_js(
304 abort: Option<Rc<UnderlyingSinkAbortCallback>>,
305 start: Option<Rc<UnderlyingSinkStartCallback>>,
306 close: Option<Rc<UnderlyingSinkCloseCallback>>,
307 write: Option<Rc<UnderlyingSinkWriteCallback>>,
308 ) -> Self {
309 UnderlyingSinkType::Js {
310 abort: RefCell::new(abort),
311 start: RefCell::new(start),
312 close: RefCell::new(close),
313 write: RefCell::new(write),
314 }
315 }
316}
317
/// DOM object controlling a `WritableStream`: it owns the queue of pending
/// chunks and drives the underlying sink's start/write/close/abort algorithms.
#[dom_struct]
pub struct WritableStreamDefaultController {
    reflector_: Reflector,

    /// Which kind of sink this controller drives, plus that sink's state.
    #[ignore_malloc_size_of = "underlying_sink_type"]
    underlying_sink_type: UnderlyingSinkType,

    /// Object used as `this` when calling the `Js` sink callbacks;
    /// set through `set_underlying_sink_this_object`.
    #[ignore_malloc_size_of = "mozjs"]
    underlying_sink_obj: Heap<*mut JSObject>,

    /// Pending chunks (and possibly a close sentinel) with their sizes.
    queue: QueueWithSizes,

    /// Becomes `true` once the start promise has settled, whether it
    /// fulfilled or rejected.
    started: Cell<bool>,

    /// High water mark used by `get_desired_size`.
    strategy_hwm: f64,

    /// Size callback used by `get_chunk_size`; emptied by `clear_algorithms`.
    #[ignore_malloc_size_of = "QueuingStrategySize"]
    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,

    /// The stream this controller belongs to; set in `setup`.
    stream: MutNullableDom<WritableStream>,

    /// Abort controller whose signal is exposed through `Signal` and
    /// triggered by `signal_abort`.
    abort_controller: Dom<AbortController>,
}
351
352impl WritableStreamDefaultController {
353 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
355 fn new_inherited(
356 global: &GlobalScope,
357 underlying_sink_type: UnderlyingSinkType,
358 strategy_hwm: f64,
359 strategy_size: Rc<QueuingStrategySize>,
360 can_gc: CanGc,
361 ) -> WritableStreamDefaultController {
362 WritableStreamDefaultController {
363 reflector_: Reflector::new(),
364 underlying_sink_type,
365 queue: Default::default(),
366 stream: Default::default(),
367 underlying_sink_obj: Default::default(),
368 strategy_hwm,
369 strategy_size: RefCell::new(Some(strategy_size)),
370 started: Default::default(),
371 abort_controller: Dom::from_ref(&AbortController::new_with_proto(global, None, can_gc)),
372 }
373 }
374
375 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
376 pub(crate) fn new(
377 global: &GlobalScope,
378 underlying_sink_type: UnderlyingSinkType,
379 strategy_hwm: f64,
380 strategy_size: Rc<QueuingStrategySize>,
381 can_gc: CanGc,
382 ) -> DomRoot<WritableStreamDefaultController> {
383 reflect_dom_object(
384 Box::new(WritableStreamDefaultController::new_inherited(
385 global,
386 underlying_sink_type,
387 strategy_hwm,
388 strategy_size,
389 can_gc,
390 )),
391 global,
392 can_gc,
393 )
394 }
395
    /// Returns whether the `started` flag has been set, which happens once
    /// the start promise has settled.
    pub(crate) fn started(&self) -> bool {
        self.started.get()
    }
399
    /// Stores the object that is later passed as `this` when the `Js` sink
    /// callbacks are called.
    pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
        self.underlying_sink_obj.set(*this_object);
    }
404
    /// Forwards an abort with the given `reason` to this controller's
    /// `AbortController`.
    pub(crate) fn signal_abort(
        &self,
        cx: SafeJSContext,
        reason: SafeHandleValue,
        realm: InRealm,
        can_gc: CanGc,
    ) {
        self.abort_controller
            .signal_abort(cx, reason, realm, can_gc);
    }
416
417 fn clear_algorithms(&self) {
419 match &self.underlying_sink_type {
420 UnderlyingSinkType::Js {
421 abort,
422 start: _,
423 close,
424 write,
425 } => {
426 write.borrow_mut().take();
428
429 close.borrow_mut().take();
431
432 abort.borrow_mut().take();
434 },
435 UnderlyingSinkType::Transfer {
436 backpressure_promise,
437 ..
438 } => {
439 backpressure_promise.borrow_mut().take();
440 },
441 UnderlyingSinkType::Transform(_, _) => {
442 return;
443 },
444 }
445
446 self.strategy_size.borrow_mut().take();
448 }
449
450 pub(crate) fn setup(
452 &self,
453 cx: SafeJSContext,
454 global: &GlobalScope,
455 stream: &WritableStream,
456 can_gc: CanGc,
457 ) -> Result<(), Error> {
458 stream.assert_no_controller();
463
464 self.stream.set(Some(stream));
466
467 stream.set_default_controller(self);
469
470 let backpressure = self.get_backpressure();
490
491 stream.update_backpressure(backpressure, global, can_gc);
493
494 let start_promise = self.start_algorithm(cx, global, can_gc)?;
497
498 let rooted_default_controller = DomRoot::from_ref(self);
499
500 rooted!(in(*cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
502 controller: Dom::from_ref(&rooted_default_controller),
503 }));
504
505 rooted!(in(*cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
507 controller: Dom::from_ref(&rooted_default_controller),
508 }));
509
510 let handler = PromiseNativeHandler::new(
511 global,
512 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
513 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
514 can_gc,
515 );
516 let realm = enter_realm(global);
517 let comp = InRealm::Entered(&realm);
518 start_promise.append_native_handler(&handler, comp, can_gc);
519
520 Ok(())
521 }
522
523 pub(crate) fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
525 self.queue
527 .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
528 .expect("Enqueuing the close sentinel should not fail.");
529 self.advance_queue_if_needed(cx, global, can_gc);
531 }
532
    /// Runs the sink's start algorithm and returns the promise that tracks it.
    ///
    /// * `Js`: calls the start callback (if any) with the stored sink object
    ///   as `this` and this controller as argument. A returned promise object
    ///   is wrapped as-is; any other value is wrapped in a resolved promise.
    ///   Without a callback, an already-resolved promise is returned.
    /// * `Transfer`: returns an already-resolved promise.
    /// * `Transform`: returns the start promise stored in the variant.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by the `Js` start callback.
    #[expect(unsafe_code)]
    fn start_algorithm(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        can_gc: CanGc,
    ) -> Fallible<Rc<Promise>> {
        match &self.underlying_sink_type {
            UnderlyingSinkType::Js {
                start,
                abort: _,
                close: _,
                write: _,
            } => {
                // Clone the callback out of the cell so no borrow is held
                // while the callback runs.
                let algo = start.borrow().clone();
                let start_promise = if let Some(start) = algo {
                    rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
                    rooted!(in(*cx) let mut result: JSVal);
                    rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
                    start.Call_(
                        &this_object.handle(),
                        self,
                        result.handle_mut(),
                        ExceptionHandling::Rethrow,
                        can_gc,
                    )?;
                    // SAFETY: `to_object` is only called after `is_object`
                    // returned true, and the object is stored in a rooted
                    // slot before being handed to `IsPromiseObject`.
                    let is_promise = unsafe {
                        if result.is_object() {
                            result_object.set(result.to_object());
                            IsPromiseObject(result_object.handle().into_handle())
                        } else {
                            false
                        }
                    };
                    if is_promise {
                        Promise::new_with_js_promise(result_object.handle(), cx)
                    } else {
                        Promise::new_resolved(global, cx, result.get(), can_gc)
                    }
                } else {
                    Promise::new_resolved(global, cx, (), can_gc)
                };

                Ok(start_promise)
            },
            UnderlyingSinkType::Transfer { .. } => {
                Ok(Promise::new_resolved(global, cx, (), can_gc))
            },
            UnderlyingSinkType::Transform(_, start_promise) => {
                Ok(start_promise.clone())
            },
        }
    }
589
590 pub(crate) fn abort_steps(
592 &self,
593 cx: SafeJSContext,
594 global: &GlobalScope,
595 reason: SafeHandleValue,
596 can_gc: CanGc,
597 ) -> Rc<Promise> {
598 let result = match &self.underlying_sink_type {
599 UnderlyingSinkType::Js {
600 abort,
601 start: _,
602 close: _,
603 write: _,
604 } => {
605 rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
606 let algo = abort.borrow().clone();
607 let result = if let Some(algo) = algo {
609 algo.Call_(
610 &this_object.handle(),
611 Some(reason),
612 ExceptionHandling::Rethrow,
613 can_gc,
614 )
615 } else {
616 Ok(Promise::new_resolved(global, cx, (), can_gc))
617 };
618 result.unwrap_or_else(|e| {
619 let promise = Promise::new(global, can_gc);
620 promise.reject_error(e, can_gc);
621 promise
622 })
623 },
624 UnderlyingSinkType::Transfer { port, .. } => {
625 let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
630
631 global.disentangle_port(port, can_gc);
633
634 let promise = Promise::new(global, can_gc);
635
636 if let Err(error) = result {
638 promise.reject_error(error, can_gc);
639 } else {
640 promise.resolve_native(&(), can_gc);
642 }
643 promise
644 },
645 UnderlyingSinkType::Transform(stream, _) => {
646 stream
648 .transform_stream_default_sink_abort_algorithm(cx, global, reason, can_gc)
649 .expect("Transform stream default sink abort algorithm should not fail.")
650 },
651 };
652
653 self.clear_algorithms();
655
656 result
657 }
658
    /// Runs the sink's write algorithm for `chunk` and returns the promise
    /// tracking the write.
    ///
    /// * `Js`: calls the write callback if present (a thrown error becomes a
    ///   rejected promise); otherwise returns a resolved promise.
    /// * `Transfer`: waits on the current backpressure promise and posts the
    ///   chunk from a `TransferBackPressurePromiseReaction`; the returned
    ///   promise is settled by that reaction.
    /// * `Transform`: delegates to the transform stream's sink write
    ///   algorithm and panics if that fails.
    fn call_write_algorithm(
        &self,
        cx: SafeJSContext,
        chunk: SafeHandleValue,
        global: &GlobalScope,
        can_gc: CanGc,
    ) -> Rc<Promise> {
        match &self.underlying_sink_type {
            UnderlyingSinkType::Js {
                abort: _,
                start: _,
                close: _,
                write,
            } => {
                rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
                // Clone out of the cell so no borrow is held during the call.
                let algo = write.borrow().clone();
                let result = if let Some(algo) = algo {
                    algo.Call_(
                        &this_object.handle(),
                        chunk,
                        self,
                        ExceptionHandling::Rethrow,
                        can_gc,
                    )
                } else {
                    Ok(Promise::new_resolved(global, cx, (), can_gc))
                };
                // Convert a thrown error into a rejected promise.
                result.unwrap_or_else(|e| {
                    let promise = Promise::new(global, can_gc);
                    promise.reject_error(e, can_gc);
                    promise
                })
            },
            UnderlyingSinkType::Transfer {
                backpressure_promise,
                port,
            } => {
                // Lazily create the first backpressure promise, already resolved.
                if backpressure_promise.borrow().is_none() {
                    let promise = Promise::new_resolved(global, cx, (), can_gc);
                    *backpressure_promise.borrow_mut() = Some(promise);
                }

                let result_promise = Promise::new(global, can_gc);
                // Only a fulfillment reaction is registered; it sends the chunk.
                rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
                    port: port.clone(),
                    backpressure_promise: backpressure_promise.clone(),
                    chunk: Heap::boxed(chunk.get()),
                    result_promise: result_promise.clone(),
                }));
                let handler = PromiseNativeHandler::new(
                    global,
                    fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
                    None,
                    can_gc,
                );
                let realm = enter_realm(global);
                let comp = InRealm::Entered(&realm);
                backpressure_promise
                    .borrow()
                    .as_ref()
                    .expect("Promise must be some by now.")
                    .append_native_handler(&handler, comp, can_gc);
                result_promise
            },
            UnderlyingSinkType::Transform(stream, _) => {
                stream
                    .transform_stream_default_sink_write_algorithm(cx, global, chunk, can_gc)
                    .expect("Transform stream default sink write algorithm should not fail.")
            },
        }
    }
738
739 fn call_close_algorithm(
741 &self,
742 cx: SafeJSContext,
743 global: &GlobalScope,
744 can_gc: CanGc,
745 ) -> Rc<Promise> {
746 match &self.underlying_sink_type {
747 UnderlyingSinkType::Js {
748 abort: _,
749 start: _,
750 close,
751 write: _,
752 } => {
753 rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
754 this_object.set(self.underlying_sink_obj.get());
755 let algo = close.borrow().clone();
756 let result = if let Some(algo) = algo {
757 algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
758 } else {
759 Ok(Promise::new_resolved(global, cx, (), can_gc))
760 };
761 result.unwrap_or_else(|e| {
762 let promise = Promise::new(global, can_gc);
763 promise.reject_error(e, can_gc);
764 promise
765 })
766 },
767 UnderlyingSinkType::Transfer { port, .. } => {
768 rooted!(in(*cx) let mut value = UndefinedValue());
773 port.pack_and_post_message("close", value.handle(), can_gc)
774 .expect("Sending close should not fail.");
775
776 global.disentangle_port(port, can_gc);
778
779 Promise::new_resolved(global, cx, (), can_gc)
781 },
782 UnderlyingSinkType::Transform(stream, _) => {
783 stream
785 .transform_stream_default_sink_close_algorithm(cx, global, can_gc)
786 .expect("Transform stream default sink close algorithm should not fail.")
787 },
788 }
789 }
790
791 pub(crate) fn process_close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
793 let Some(stream) = self.stream.get() else {
795 unreachable!("Controller should have a stream");
796 };
797
798 stream.mark_close_request_in_flight();
800
801 self.queue.dequeue_value(cx, None, can_gc);
803
804 assert!(self.queue.is_empty());
806
807 let sink_close_promise = self.call_close_algorithm(cx, global, can_gc);
809
810 self.clear_algorithms();
812
813 rooted!(in(*cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
815 stream: Dom::from_ref(&stream),
816 }));
817
818 rooted!(in(*cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
820 stream: Dom::from_ref(&stream),
821 }));
822
823 let handler = PromiseNativeHandler::new(
825 global,
826 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
827 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
828 can_gc,
829 );
830 let realm = enter_realm(global);
831 let comp = InRealm::Entered(&realm);
832 sink_close_promise.append_native_handler(&handler, comp, can_gc);
833 }
834
835 fn advance_queue_if_needed(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
837 let Some(stream) = self.stream.get() else {
839 unreachable!("Controller should have a stream");
840 };
841
842 if !self.started.get() {
844 return;
845 }
846
847 if stream.has_in_flight_write_request() {
849 return;
850 }
851
852 assert!(!(stream.is_errored() || stream.is_closed()));
856
857 if stream.is_erroring() {
859 stream.finish_erroring(cx, global, can_gc);
861
862 return;
864 }
865
866 rooted!(in(*cx) let mut value = UndefinedValue());
868 let is_closed = {
869 if self.queue.is_empty() {
871 return;
872 }
873 self.queue.peek_queue_value(cx, value.handle_mut(), can_gc)
874 };
875
876 if is_closed {
877 self.process_close(cx, global, can_gc);
879 } else {
880 self.process_write(cx, value.handle(), global, can_gc);
882 };
883 }
884
    /// Error steps of the controller: resets the queue.
    pub(crate) fn perform_error_steps(&self) {
        self.queue.reset();
    }
890
891 fn process_write(
893 &self,
894 cx: SafeJSContext,
895 chunk: SafeHandleValue,
896 global: &GlobalScope,
897 can_gc: CanGc,
898 ) {
899 let Some(stream) = self.stream.get() else {
901 unreachable!("Controller should have a stream");
902 };
903
904 stream.mark_first_write_request_in_flight();
906
907 let sink_write_promise = self.call_write_algorithm(cx, chunk, global, can_gc);
909
910 rooted!(in(*cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
912 controller: Dom::from_ref(self),
913 }));
914
915 rooted!(in(*cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
917 controller: Dom::from_ref(self),
918 }));
919
920 let handler = PromiseNativeHandler::new(
922 global,
923 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
924 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
925 can_gc,
926 );
927 let realm = enter_realm(global);
928 let comp = InRealm::Entered(&realm);
929 sink_write_promise.append_native_handler(&handler, comp, can_gc);
930 }
931
932 pub(crate) fn get_desired_size(&self) -> f64 {
934 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
936 desired_size.clamp(desired_size, self.strategy_hwm)
937 }
938
939 fn get_backpressure(&self) -> bool {
941 let desired_size = self.get_desired_size();
943
944 desired_size == 0.0 || desired_size.is_sign_negative()
946 }
947
948 pub(crate) fn get_chunk_size(
950 &self,
951 cx: SafeJSContext,
952 global: &GlobalScope,
953 chunk: SafeHandleValue,
954 can_gc: CanGc,
955 ) -> f64 {
956 let Some(strategy_size) = self.strategy_size.borrow().clone() else {
958 let Some(stream) = self.stream.get() else {
960 unreachable!("Controller should have a stream");
961 };
962 assert!(!stream.is_writable());
963
964 return 1.0;
966 };
967
968 let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
971
972 match result {
973 Ok(size) => size,
975 Err(error) => {
976 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
981 error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
982 self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
983
984 1.0
986 },
987 }
988 }
989
990 pub(crate) fn write(
992 &self,
993 cx: SafeJSContext,
994 global: &GlobalScope,
995 chunk: SafeHandleValue,
996 chunk_size: f64,
997 can_gc: CanGc,
998 ) {
999 let enqueue_result = self
1001 .queue
1002 .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
1003 value: Heap::boxed(chunk.get()),
1004 size: chunk_size,
1005 }));
1006
1007 if let Err(error) = enqueue_result {
1009 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1012 error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
1013 self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
1014
1015 return;
1017 }
1018
1019 let Some(stream) = self.stream.get() else {
1021 unreachable!("Controller should have a stream");
1022 };
1023
1024 if !stream.close_queued_or_in_flight() && stream.is_writable() {
1026 let backpressure = self.get_backpressure();
1028
1029 stream.update_backpressure(backpressure, global, can_gc);
1031 }
1032
1033 self.advance_queue_if_needed(cx, global, can_gc);
1035 }
1036
1037 pub(crate) fn error_if_needed(
1039 &self,
1040 cx: SafeJSContext,
1041 error: SafeHandleValue,
1042 global: &GlobalScope,
1043 can_gc: CanGc,
1044 ) {
1045 let Some(stream) = self.stream.get() else {
1047 unreachable!("Controller should have a stream");
1048 };
1049
1050 if stream.is_writable() {
1052 self.error(&stream, cx, error, global, can_gc);
1054 }
1055 }
1056
    /// Errors the controller: clears its algorithms and tells `stream` to
    /// start erroring with `e`.
    ///
    /// Panics if `stream` is not writable; callers are expected to check
    /// that first.
    pub(crate) fn error(
        &self,
        stream: &WritableStream,
        cx: SafeJSContext,
        e: SafeHandleValue,
        global: &GlobalScope,
        can_gc: CanGc,
    ) {
        assert!(stream.is_writable());

        // Algorithms are dropped before the stream begins erroring.
        self.clear_algorithms();

        stream.start_erroring(cx, global, e, can_gc);
    }
1078}
1079
1080impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1081 for WritableStreamDefaultController
1082{
1083 fn Error(&self, cx: SafeJSContext, e: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
1085 let Some(stream) = self.stream.get() else {
1087 unreachable!("Controller should have a stream");
1088 };
1089
1090 if !stream.is_writable() {
1092 return;
1093 }
1094
1095 let global = GlobalScope::from_safe_context(cx, realm);
1096
1097 self.error(&stream, cx, e, &global, can_gc);
1099 }
1100
1101 fn Signal(&self) -> DomRoot<AbortSignal> {
1103 self.abort_controller.signal()
1105 }
1106}