1use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, UndefinedValue};
12use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
13
14use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
15use super::types::TransformStream;
16use crate::dom::bindings::callback::ExceptionHandling;
17use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
18 UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
19 UnderlyingSinkWriteCallback,
20};
21use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
24use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
25use crate::dom::globalscope::GlobalScope;
26use crate::dom::messageport::MessagePort;
27use crate::dom::promise::Promise;
28use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
29use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
30use crate::dom::types::{AbortController, AbortSignal};
31use crate::dom::writablestream::WritableStream;
32use crate::realms::{InRealm, enter_realm};
33use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
34
// Marker impl that lets values of this type be held in a `rooted!` slot,
// as `process_close` does before handing the handler to the promise.
impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}

/// Promise reaction attached by `process_close` for the case where the
/// promise returned by the sink's close algorithm is fulfilled.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct CloseAlgorithmFulfillmentHandler {
    /// The stream whose in-flight close request is finished by the callback.
    stream: Dom<WritableStream>,
}
44
45impl Callback for CloseAlgorithmFulfillmentHandler {
46 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
47 let stream = self.stream.as_rooted();
48
49 stream.finish_in_flight_close(cx, can_gc);
51 }
52}
53
// Marker impl that lets values of this type be held in a `rooted!` slot,
// as `process_close` does before handing the handler to the promise.
impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}

/// Promise reaction attached by `process_close` for the case where the
/// promise returned by the sink's close algorithm is rejected.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct CloseAlgorithmRejectionHandler {
    /// The stream whose in-flight close request is failed by the callback.
    stream: Dom<WritableStream>,
}
63
64impl Callback for CloseAlgorithmRejectionHandler {
65 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
66 let stream = self.stream.as_rooted();
67
68 let global = GlobalScope::from_safe_context(cx, realm);
69
70 stream.finish_in_flight_close_with_error(cx, &global, v, can_gc);
72 }
73}
74
// Marker impl that lets values of this type be held in a `rooted!` slot,
// as `setup` does before handing the handler to the promise.
impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}

/// Promise reaction attached by `setup` for the case where the start
/// promise is fulfilled.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct StartAlgorithmFulfillmentHandler {
    /// The controller that is marked as started by the callback.
    controller: Dom<WritableStreamDefaultController>,
}
84
85impl Callback for StartAlgorithmFulfillmentHandler {
86 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
89 let controller = self.controller.as_rooted();
90 let stream = controller
91 .stream
92 .get()
93 .expect("Controller should have a stream.");
94
95 assert!(stream.is_erroring() || stream.is_writable());
97
98 controller.started.set(true);
100
101 let global = GlobalScope::from_safe_context(cx, realm);
102
103 controller.advance_queue_if_needed(cx, &global, can_gc)
105 }
106}
107
// Marker impl that lets values of this type be held in a `rooted!` slot,
// as `setup` does before handing the handler to the promise.
impl js::gc::Rootable for StartAlgorithmRejectionHandler {}

/// Promise reaction attached by `setup` for the case where the start
/// promise is rejected.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct StartAlgorithmRejectionHandler {
    /// The controller whose stream receives the rejection.
    controller: Dom<WritableStreamDefaultController>,
}
117
118impl Callback for StartAlgorithmRejectionHandler {
119 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
122 let controller = self.controller.as_rooted();
123 let stream = controller
124 .stream
125 .get()
126 .expect("Controller should have a stream.");
127
128 assert!(stream.is_erroring() || stream.is_writable());
130
131 controller.started.set(true);
133
134 let global = GlobalScope::from_safe_context(cx, realm);
135
136 stream.deal_with_rejection(cx, &global, v, can_gc);
138 }
139}
140
// Marker impl that lets values of this type be held in a `rooted!` slot,
// as `call_write_algorithm` does before handing it to the promise.
impl js::gc::Rootable for TransferBackPressurePromiseReaction {}

/// Fulfillment reaction on the backpressure promise of a `Transfer` sink,
/// created by `call_write_algorithm` for each written chunk.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransferBackPressurePromiseReaction {
    /// The promise returned from the write algorithm; settled by the
    /// callback according to whether posting the chunk succeeded.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,

    /// Slot shared with the `Transfer` sink; the callback stores a fresh
    /// promise in it.
    #[ignore_malloc_size_of = "nested Rc"]
    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,

    /// The chunk to post over the port.
    #[ignore_malloc_size_of = "mozjs"]
    chunk: Box<Heap<JSVal>>,

    /// The port the chunk is posted to.
    port: Dom<MessagePort>,
}
163
164impl Callback for TransferBackPressurePromiseReaction {
165 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
167 let global = self.result_promise.global();
168 let promise = Promise::new(&global, can_gc);
170 *self.backpressure_promise.borrow_mut() = Some(promise);
171
172 rooted!(in(*cx) let mut chunk = UndefinedValue());
174 chunk.set(self.chunk.get());
175 let result =
176 self.port
177 .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
178
179 if let Err(error) = result {
181 global.disentangle_port(&self.port, can_gc);
183
184 self.result_promise.reject_error(error, can_gc);
186 } else {
187 self.result_promise.resolve_native(&(), can_gc);
189 }
190 }
191}
192
// Marker impl that lets values of this type be held in a `rooted!` slot,
// as `process_write` does before handing the handler to the promise.
impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}

/// Promise reaction attached by `process_write` for the case where the
/// promise returned by the sink's write algorithm is fulfilled.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct WriteAlgorithmFulfillmentHandler {
    /// The controller whose queue is advanced by the callback.
    controller: Dom<WritableStreamDefaultController>,
}
202
203impl Callback for WriteAlgorithmFulfillmentHandler {
204 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
205 let controller = self.controller.as_rooted();
206 let stream = controller
207 .stream
208 .get()
209 .expect("Controller should have a stream.");
210
211 stream.finish_in_flight_write(can_gc);
213
214 assert!(stream.is_erroring() || stream.is_writable());
217
218 rooted!(in(*cx) let mut rval = UndefinedValue());
220 controller
221 .queue
222 .dequeue_value(cx, Some(rval.handle_mut()), can_gc);
223
224 let global = GlobalScope::from_safe_context(cx, realm);
225
226 if !stream.close_queued_or_in_flight() && stream.is_writable() {
228 let backpressure = controller.get_backpressure();
230
231 stream.update_backpressure(backpressure, &global, can_gc);
233 }
234
235 controller.advance_queue_if_needed(cx, &global, can_gc)
237 }
238}
239
// Marker impl that lets values of this type be held in a `rooted!` slot,
// as `process_write` does before handing the handler to the promise.
impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}

/// Promise reaction attached by `process_write` for the case where the
/// promise returned by the sink's write algorithm is rejected.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct WriteAlgorithmRejectionHandler {
    /// The controller whose stream receives the write failure.
    controller: Dom<WritableStreamDefaultController>,
}
249
250impl Callback for WriteAlgorithmRejectionHandler {
251 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
252 let controller = self.controller.as_rooted();
253 let stream = controller
254 .stream
255 .get()
256 .expect("Controller should have a stream.");
257
258 if stream.is_writable() {
260 controller.clear_algorithms();
262 }
263
264 let global = GlobalScope::from_safe_context(cx, realm);
265
266 stream.finish_in_flight_write_with_error(cx, &global, v, can_gc);
268 }
269}
270
/// The kind of sink behind a `WritableStreamDefaultController`, together
/// with the per-kind state its algorithms need.
#[derive(JSTraceable, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub enum UnderlyingSinkType {
    /// A sink backed by optional JS callbacks. Each slot is a `RefCell` so
    /// that `clear_algorithms` can empty it through `&self`.
    Js {
        /// Callback invoked by `abort_steps`.
        abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,

        /// Callback invoked by `start_algorithm`; not cleared by
        /// `clear_algorithms`.
        start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,

        /// Callback invoked by `call_close_algorithm`.
        close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,

        /// Callback invoked by `call_write_algorithm`.
        write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
    },
    /// A sink that forwards chunks, close and error messages over a
    /// `MessagePort`.
    Transfer {
        /// Promise that writes wait on before posting a chunk; shared with
        /// the reactions created by `call_write_algorithm`.
        backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
        /// The port the messages are posted to.
        port: Dom<MessagePort>,
    },
    /// A sink whose write/close/abort algorithms are delegated to a
    /// `TransformStream`; the promise is what `start_algorithm` returns.
    Transform(Dom<TransformStream>, Rc<Promise>),
}
297
298impl UnderlyingSinkType {
299 pub(crate) fn new_js(
300 abort: Option<Rc<UnderlyingSinkAbortCallback>>,
301 start: Option<Rc<UnderlyingSinkStartCallback>>,
302 close: Option<Rc<UnderlyingSinkCloseCallback>>,
303 write: Option<Rc<UnderlyingSinkWriteCallback>>,
304 ) -> Self {
305 UnderlyingSinkType::Js {
306 abort: RefCell::new(abort),
307 start: RefCell::new(start),
308 close: RefCell::new(close),
309 write: RefCell::new(write),
310 }
311 }
312}
313
/// Controller that drives a `WritableStream`: it queues written chunks and
/// feeds them, one at a time, to the algorithms of the underlying sink.
#[dom_struct]
pub struct WritableStreamDefaultController {
    reflector_: Reflector,

    /// The kind of sink this controller writes to, with its algorithms.
    #[ignore_malloc_size_of = "underlying_sink_type"]
    underlying_sink_type: UnderlyingSinkType,

    /// Object passed as `this` when the JS sink callbacks are called;
    /// set through `set_underlying_sink_this_object`.
    #[ignore_malloc_size_of = "mozjs"]
    underlying_sink_obj: Heap<*mut JSObject>,

    /// Pending chunks (and possibly a close sentinel) with their sizes.
    queue: QueueWithSizes,

    /// Set to true once the start promise has settled, whether it was
    /// fulfilled or rejected.
    started: Cell<bool>,

    /// High water mark used to compute the desired size.
    strategy_hwm: f64,

    /// Callback computing the size of a chunk; emptied by
    /// `clear_algorithms` (except for `Transform` sinks).
    #[ignore_malloc_size_of = "QueuingStrategySize"]
    strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,

    /// The stream being controlled; set by `setup`.
    stream: MutNullableDom<WritableStream>,

    /// Abort controller whose signal is exposed through `Signal()` and
    /// triggered through `signal_abort`.
    abort_controller: Dom<AbortController>,
}
347
348impl WritableStreamDefaultController {
349 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
351 fn new_inherited(
352 global: &GlobalScope,
353 underlying_sink_type: UnderlyingSinkType,
354 strategy_hwm: f64,
355 strategy_size: Rc<QueuingStrategySize>,
356 can_gc: CanGc,
357 ) -> WritableStreamDefaultController {
358 WritableStreamDefaultController {
359 reflector_: Reflector::new(),
360 underlying_sink_type,
361 queue: Default::default(),
362 stream: Default::default(),
363 underlying_sink_obj: Default::default(),
364 strategy_hwm,
365 strategy_size: RefCell::new(Some(strategy_size)),
366 started: Default::default(),
367 abort_controller: Dom::from_ref(&AbortController::new_with_proto(global, None, can_gc)),
368 }
369 }
370
371 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
372 pub(crate) fn new(
373 global: &GlobalScope,
374 underlying_sink_type: UnderlyingSinkType,
375 strategy_hwm: f64,
376 strategy_size: Rc<QueuingStrategySize>,
377 can_gc: CanGc,
378 ) -> DomRoot<WritableStreamDefaultController> {
379 reflect_dom_object(
380 Box::new(WritableStreamDefaultController::new_inherited(
381 global,
382 underlying_sink_type,
383 strategy_hwm,
384 strategy_size,
385 can_gc,
386 )),
387 global,
388 can_gc,
389 )
390 }
391
/// Returns whether the controller has been marked as started, which the
/// start promise reactions do once that promise settles.
pub(crate) fn started(&self) -> bool {
    self.started.get()
}
395
/// Stores the object that is later passed as `this` when the JS sink
/// callbacks (start, write, close, abort) are called.
pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
    self.underlying_sink_obj.set(*this_object);
}
400
/// Forwards an abort with the given `reason` to this controller's
/// `AbortController`, whose signal is the one returned by `Signal()`.
pub(crate) fn signal_abort(
    &self,
    cx: SafeJSContext,
    reason: SafeHandleValue,
    realm: InRealm,
    can_gc: CanGc,
) {
    self.abort_controller
        .signal_abort(cx, reason, realm, can_gc);
}
412
413 fn clear_algorithms(&self) {
415 match &self.underlying_sink_type {
416 UnderlyingSinkType::Js {
417 abort,
418 start: _,
419 close,
420 write,
421 } => {
422 write.borrow_mut().take();
424
425 close.borrow_mut().take();
427
428 abort.borrow_mut().take();
430 },
431 UnderlyingSinkType::Transfer {
432 backpressure_promise,
433 ..
434 } => {
435 backpressure_promise.borrow_mut().take();
436 },
437 UnderlyingSinkType::Transform(_, _) => {
438 return;
439 },
440 }
441
442 self.strategy_size.borrow_mut().take();
444 }
445
446 pub(crate) fn setup(
448 &self,
449 cx: SafeJSContext,
450 global: &GlobalScope,
451 stream: &WritableStream,
452 can_gc: CanGc,
453 ) -> Result<(), Error> {
454 stream.assert_no_controller();
459
460 self.stream.set(Some(stream));
462
463 stream.set_default_controller(self);
465
466 let backpressure = self.get_backpressure();
486
487 stream.update_backpressure(backpressure, global, can_gc);
489
490 let start_promise = self.start_algorithm(cx, global, can_gc)?;
493
494 let rooted_default_controller = DomRoot::from_ref(self);
495
496 rooted!(in(*cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
498 controller: Dom::from_ref(&rooted_default_controller),
499 }));
500
501 rooted!(in(*cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
503 controller: Dom::from_ref(&rooted_default_controller),
504 }));
505
506 let handler = PromiseNativeHandler::new(
507 global,
508 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
509 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
510 can_gc,
511 );
512 let realm = enter_realm(global);
513 let comp = InRealm::Entered(&realm);
514 start_promise.append_native_handler(&handler, comp, can_gc);
515
516 Ok(())
517 }
518
519 pub(crate) fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
521 self.queue
523 .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
524 .expect("Enqueuing the close sentinel should not fail.");
525 self.advance_queue_if_needed(cx, global, can_gc);
527 }
528
/// Runs the sink's start algorithm and returns the promise that `setup`
/// waits on.
///
/// * `Js`: calls the start callback (if any) with the stored sink object
///   as `this`. A returned promise object is wrapped as-is; any other
///   value yields a promise resolved with that value. Without a callback
///   the result is a promise resolved with `()`.
/// * `Transfer`: a promise resolved with `()`.
/// * `Transform`: a clone of the start promise stored in the variant.
///
/// # Errors
///
/// Returns the error from the JS start callback (called with
/// `ExceptionHandling::Rethrow`).
#[allow(unsafe_code)]
fn start_algorithm(
    &self,
    cx: SafeJSContext,
    global: &GlobalScope,
    can_gc: CanGc,
) -> Fallible<Rc<Promise>> {
    match &self.underlying_sink_type {
        UnderlyingSinkType::Js {
            start,
            abort: _,
            close: _,
            write: _,
        } => {
            // Clone the callback out of the cell; the `Ref` temporary is
            // gone by the time the callback is called.
            let algo = start.borrow().clone();
            let start_promise = if let Some(start) = algo {
                rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
                rooted!(in(*cx) let mut result: JSVal);
                rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
                start.Call_(
                    &this_object.handle(),
                    self,
                    result.handle_mut(),
                    ExceptionHandling::Rethrow,
                    can_gc,
                )?;
                // SAFETY: `result_object` is a rooted slot and is only set,
                // and only inspected, after `result.is_object()` returned
                // true. NOTE(review): presumably a rooted, non-null object
                // handle is all `IsPromiseObject` requires — confirm.
                let is_promise = unsafe {
                    if result.is_object() {
                        result_object.set(result.to_object());
                        IsPromiseObject(result_object.handle().into_handle())
                    } else {
                        false
                    }
                };
                if is_promise {
                    // The callback already returned a promise: wrap it.
                    Promise::new_with_js_promise(result_object.handle(), cx)
                } else {
                    // Any other return value becomes the resolution value.
                    Promise::new_resolved(global, cx, result.get(), can_gc)
                }
            } else {
                // No start callback was provided.
                Promise::new_resolved(global, cx, (), can_gc)
            };

            Ok(start_promise)
        },
        UnderlyingSinkType::Transfer { .. } => {
            Ok(Promise::new_resolved(global, cx, (), can_gc))
        },
        UnderlyingSinkType::Transform(_, start_promise) => {
            Ok(start_promise.clone())
        },
    }
}
585
586 pub(crate) fn abort_steps(
588 &self,
589 cx: SafeJSContext,
590 global: &GlobalScope,
591 reason: SafeHandleValue,
592 can_gc: CanGc,
593 ) -> Rc<Promise> {
594 let result = match &self.underlying_sink_type {
595 UnderlyingSinkType::Js {
596 abort,
597 start: _,
598 close: _,
599 write: _,
600 } => {
601 rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
602 let algo = abort.borrow().clone();
603 let result = if let Some(algo) = algo {
605 algo.Call_(
606 &this_object.handle(),
607 Some(reason),
608 ExceptionHandling::Rethrow,
609 can_gc,
610 )
611 } else {
612 Ok(Promise::new_resolved(global, cx, (), can_gc))
613 };
614 result.unwrap_or_else(|e| {
615 let promise = Promise::new(global, can_gc);
616 promise.reject_error(e, can_gc);
617 promise
618 })
619 },
620 UnderlyingSinkType::Transfer { port, .. } => {
621 let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
626
627 global.disentangle_port(port, can_gc);
629
630 let promise = Promise::new(global, can_gc);
631
632 if let Err(error) = result {
634 promise.reject_error(error, can_gc);
635 } else {
636 promise.resolve_native(&(), can_gc);
638 }
639 promise
640 },
641 UnderlyingSinkType::Transform(stream, _) => {
642 stream
644 .transform_stream_default_sink_abort_algorithm(cx, global, reason, can_gc)
645 .expect("Transform stream default sink abort algorithm should not fail.")
646 },
647 };
648
649 self.clear_algorithms();
651
652 result
653 }
654
/// Runs the sink's write algorithm for `chunk` and returns its promise.
///
/// * `Js`: calls the write callback (if any) with the stored sink object
///   as `this`; a callback error becomes a rejected promise and a missing
///   callback a promise resolved with `()`.
/// * `Transfer`: waits on the shared backpressure promise and posts the
///   chunk from a `TransferBackPressurePromiseReaction`; the returned
///   promise is settled by that reaction.
/// * `Transform`: delegates to the transform stream's sink write algorithm.
///
/// # Panics
///
/// Panics if the transform stream's sink write algorithm returns an error.
fn call_write_algorithm(
    &self,
    cx: SafeJSContext,
    chunk: SafeHandleValue,
    global: &GlobalScope,
    can_gc: CanGc,
) -> Rc<Promise> {
    match &self.underlying_sink_type {
        UnderlyingSinkType::Js {
            abort: _,
            start: _,
            close: _,
            write,
        } => {
            rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
            // Clone the callback out of the cell; the `Ref` temporary is
            // gone by the time the callback is called.
            let algo = write.borrow().clone();
            let result = if let Some(algo) = algo {
                algo.Call_(
                    &this_object.handle(),
                    chunk,
                    self,
                    ExceptionHandling::Rethrow,
                    can_gc,
                )
            } else {
                Ok(Promise::new_resolved(global, cx, (), can_gc))
            };
            // A callback error is reported through a rejected promise
            // rather than propagated to the caller.
            result.unwrap_or_else(|e| {
                let promise = Promise::new(global, can_gc);
                promise.reject_error(e, can_gc);
                promise
            })
        },
        UnderlyingSinkType::Transfer {
            backpressure_promise,
            port,
        } => {
            // Make sure there is a promise to wait on; the first write
            // (or a write after the slot was cleared) starts with one that
            // is already resolved.
            if backpressure_promise.borrow().is_none() {
                let promise = Promise::new_resolved(global, cx, (), can_gc);
                *backpressure_promise.borrow_mut() = Some(promise);
            }

            // Promise handed back to the caller; the reaction settles it.
            let result_promise = Promise::new(global, can_gc);
            rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
                port: port.clone(),
                backpressure_promise: backpressure_promise.clone(),
                chunk: Heap::boxed(chunk.get()),
                result_promise: result_promise.clone(),
            }));
            // Only a fulfillment reaction is registered; there is no
            // rejection handler.
            let handler = PromiseNativeHandler::new(
                global,
                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
                None,
                can_gc,
            );
            let realm = enter_realm(global);
            let comp = InRealm::Entered(&realm);
            // NOTE(review): the cell stays immutably borrowed for the
            // duration of `append_native_handler`, while the reaction
            // itself does `borrow_mut` on the same cell — this relies on
            // the reaction not running synchronously; confirm.
            backpressure_promise
                .borrow()
                .as_ref()
                .expect("Promise must be some by now.")
                .append_native_handler(&handler, comp, can_gc);
            result_promise
        },
        UnderlyingSinkType::Transform(stream, _) => {
            stream
                .transform_stream_default_sink_write_algorithm(cx, global, chunk, can_gc)
                .expect("Transform stream default sink write algorithm should not fail.")
        },
    }
}
734
735 fn call_close_algorithm(
737 &self,
738 cx: SafeJSContext,
739 global: &GlobalScope,
740 can_gc: CanGc,
741 ) -> Rc<Promise> {
742 match &self.underlying_sink_type {
743 UnderlyingSinkType::Js {
744 abort: _,
745 start: _,
746 close,
747 write: _,
748 } => {
749 rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
750 this_object.set(self.underlying_sink_obj.get());
751 let algo = close.borrow().clone();
752 let result = if let Some(algo) = algo {
753 algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
754 } else {
755 Ok(Promise::new_resolved(global, cx, (), can_gc))
756 };
757 result.unwrap_or_else(|e| {
758 let promise = Promise::new(global, can_gc);
759 promise.reject_error(e, can_gc);
760 promise
761 })
762 },
763 UnderlyingSinkType::Transfer { port, .. } => {
764 rooted!(in(*cx) let mut value = UndefinedValue());
769 port.pack_and_post_message("close", value.handle(), can_gc)
770 .expect("Sending close should not fail.");
771
772 global.disentangle_port(port, can_gc);
774
775 Promise::new_resolved(global, cx, (), can_gc)
777 },
778 UnderlyingSinkType::Transform(stream, _) => {
779 stream
781 .transform_stream_default_sink_close_algorithm(cx, global, can_gc)
782 .expect("Transform stream default sink close algorithm should not fail.")
783 },
784 }
785 }
786
787 pub(crate) fn process_close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
789 let Some(stream) = self.stream.get() else {
791 unreachable!("Controller should have a stream");
792 };
793
794 stream.mark_close_request_in_flight();
796
797 self.queue.dequeue_value(cx, None, can_gc);
799
800 assert!(self.queue.is_empty());
802
803 let sink_close_promise = self.call_close_algorithm(cx, global, can_gc);
805
806 self.clear_algorithms();
808
809 rooted!(in(*cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
811 stream: Dom::from_ref(&stream),
812 }));
813
814 rooted!(in(*cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
816 stream: Dom::from_ref(&stream),
817 }));
818
819 let handler = PromiseNativeHandler::new(
821 global,
822 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
823 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
824 can_gc,
825 );
826 let realm = enter_realm(global);
827 let comp = InRealm::Entered(&realm);
828 sink_close_promise.append_native_handler(&handler, comp, can_gc);
829 }
830
831 fn advance_queue_if_needed(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
833 let Some(stream) = self.stream.get() else {
835 unreachable!("Controller should have a stream");
836 };
837
838 if !self.started.get() {
840 return;
841 }
842
843 if stream.has_in_flight_write_request() {
845 return;
846 }
847
848 assert!(!(stream.is_errored() || stream.is_closed()));
852
853 if stream.is_erroring() {
855 stream.finish_erroring(cx, global, can_gc);
857
858 return;
860 }
861
862 rooted!(in(*cx) let mut value = UndefinedValue());
864 let is_closed = {
865 if self.queue.is_empty() {
867 return;
868 }
869 self.queue.peek_queue_value(cx, value.handle_mut(), can_gc)
870 };
871
872 if is_closed {
873 self.process_close(cx, global, can_gc);
875 } else {
876 self.process_write(cx, value.handle(), global, can_gc);
878 };
879 }
880
/// Error steps of the controller: resets the queue of pending entries.
pub(crate) fn perform_error_steps(&self) {
    self.queue.reset();
}
886
887 fn process_write(
889 &self,
890 cx: SafeJSContext,
891 chunk: SafeHandleValue,
892 global: &GlobalScope,
893 can_gc: CanGc,
894 ) {
895 let Some(stream) = self.stream.get() else {
897 unreachable!("Controller should have a stream");
898 };
899
900 stream.mark_first_write_request_in_flight();
902
903 let sink_write_promise = self.call_write_algorithm(cx, chunk, global, can_gc);
905
906 rooted!(in(*cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
908 controller: Dom::from_ref(self),
909 }));
910
911 rooted!(in(*cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
913 controller: Dom::from_ref(self),
914 }));
915
916 let handler = PromiseNativeHandler::new(
918 global,
919 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
920 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
921 can_gc,
922 );
923 let realm = enter_realm(global);
924 let comp = InRealm::Entered(&realm);
925 sink_write_promise.append_native_handler(&handler, comp, can_gc);
926 }
927
928 pub(crate) fn get_desired_size(&self) -> f64 {
930 let desired_size = self.strategy_hwm - self.queue.total_size.get().clamp(0.0, f64::MAX);
932 desired_size.clamp(desired_size, self.strategy_hwm)
933 }
934
935 fn get_backpressure(&self) -> bool {
937 let desired_size = self.get_desired_size();
939
940 desired_size == 0.0 || desired_size.is_sign_negative()
942 }
943
944 pub(crate) fn get_chunk_size(
946 &self,
947 cx: SafeJSContext,
948 global: &GlobalScope,
949 chunk: SafeHandleValue,
950 can_gc: CanGc,
951 ) -> f64 {
952 let Some(strategy_size) = self.strategy_size.borrow().clone() else {
954 let Some(stream) = self.stream.get() else {
956 unreachable!("Controller should have a stream");
957 };
958 assert!(!stream.is_writable());
959
960 return 1.0;
962 };
963
964 let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
967
968 match result {
969 Ok(size) => size,
971 Err(error) => {
972 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
977 error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
978 self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
979
980 1.0
982 },
983 }
984 }
985
986 pub(crate) fn write(
988 &self,
989 cx: SafeJSContext,
990 global: &GlobalScope,
991 chunk: SafeHandleValue,
992 chunk_size: f64,
993 can_gc: CanGc,
994 ) {
995 let enqueue_result = self
997 .queue
998 .enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
999 value: Heap::boxed(chunk.get()),
1000 size: chunk_size,
1001 }));
1002
1003 if let Err(error) = enqueue_result {
1005 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1008 error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
1009 self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
1010
1011 return;
1013 }
1014
1015 let Some(stream) = self.stream.get() else {
1017 unreachable!("Controller should have a stream");
1018 };
1019
1020 if !stream.close_queued_or_in_flight() && stream.is_writable() {
1022 let backpressure = self.get_backpressure();
1024
1025 stream.update_backpressure(backpressure, global, can_gc);
1027 }
1028
1029 self.advance_queue_if_needed(cx, global, can_gc);
1031 }
1032
1033 pub(crate) fn error_if_needed(
1035 &self,
1036 cx: SafeJSContext,
1037 error: SafeHandleValue,
1038 global: &GlobalScope,
1039 can_gc: CanGc,
1040 ) {
1041 let Some(stream) = self.stream.get() else {
1043 unreachable!("Controller should have a stream");
1044 };
1045
1046 if stream.is_writable() {
1048 self.error(&stream, cx, error, global, can_gc);
1050 }
1051 }
1052
/// Errors the controller: clears its algorithms and makes `stream` start
/// erroring with `e`.
///
/// Callers are expected to have checked the state first, as
/// `error_if_needed` and `Error` do.
///
/// # Panics
///
/// Panics if `stream` is not writable.
pub(crate) fn error(
    &self,
    stream: &WritableStream,
    cx: SafeJSContext,
    e: SafeHandleValue,
    global: &GlobalScope,
    can_gc: CanGc,
) {
    assert!(stream.is_writable());

    // The algorithms are dropped before the stream changes state.
    self.clear_algorithms();

    stream.start_erroring(cx, global, e, can_gc);
}
1074}
1075
1076impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1077 for WritableStreamDefaultController
1078{
1079 fn Error(&self, cx: SafeJSContext, e: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
1081 let Some(stream) = self.stream.get() else {
1083 unreachable!("Controller should have a stream");
1084 };
1085
1086 if !stream.is_writable() {
1088 return;
1089 }
1090
1091 let global = GlobalScope::from_safe_context(cx, realm);
1092
1093 self.error(&stream, cx, e, &global, can_gc);
1095 }
1096
1097 fn Signal(&self) -> DomRoot<AbortSignal> {
1099 self.abort_controller.signal()
1101 }
1102}