1use std::cell::{Cell, RefCell};
6use std::ptr;
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, UndefinedValue};
12use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
13
14use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
15use super::types::TransformStream;
16use crate::dom::bindings::callback::ExceptionHandling;
17use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{
18 UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback,
19 UnderlyingSinkWriteCallback,
20};
21use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods;
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
24use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
25use crate::dom::globalscope::GlobalScope;
26use crate::dom::messageport::MessagePort;
27use crate::dom::promise::Promise;
28use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
29use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize};
30use crate::dom::types::{AbortController, AbortSignal};
31use crate::dom::writablestream::WritableStream;
32use crate::realms::{InRealm, enter_realm};
33use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
34
35impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {}
36
37#[derive(Clone, JSTraceable, MallocSizeOf)]
40#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
41struct CloseAlgorithmFulfillmentHandler {
42 stream: Dom<WritableStream>,
43}
44
45impl Callback for CloseAlgorithmFulfillmentHandler {
46 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
47 let stream = self.stream.as_rooted();
48
49 stream.finish_in_flight_close(cx, can_gc);
51 }
52}
53
54impl js::gc::Rootable for CloseAlgorithmRejectionHandler {}
55
56#[derive(Clone, JSTraceable, MallocSizeOf)]
59#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
60struct CloseAlgorithmRejectionHandler {
61 stream: Dom<WritableStream>,
62}
63
64impl Callback for CloseAlgorithmRejectionHandler {
65 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
66 let stream = self.stream.as_rooted();
67
68 let global = GlobalScope::from_safe_context(cx, realm);
69
70 stream.finish_in_flight_close_with_error(cx, &global, v, can_gc);
72 }
73}
74
75impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {}
76
77#[derive(Clone, JSTraceable, MallocSizeOf)]
80#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
81struct StartAlgorithmFulfillmentHandler {
82 controller: Dom<WritableStreamDefaultController>,
83}
84
85impl Callback for StartAlgorithmFulfillmentHandler {
86 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
89 let controller = self.controller.as_rooted();
90 let stream = controller
91 .stream
92 .get()
93 .expect("Controller should have a stream.");
94
95 assert!(stream.is_erroring() || stream.is_writable());
97
98 controller.started.set(true);
100
101 let global = GlobalScope::from_safe_context(cx, realm);
102
103 controller.advance_queue_if_needed(cx, &global, can_gc)
105 }
106}
107
108impl js::gc::Rootable for StartAlgorithmRejectionHandler {}
109
110#[derive(Clone, JSTraceable, MallocSizeOf)]
113#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
114struct StartAlgorithmRejectionHandler {
115 controller: Dom<WritableStreamDefaultController>,
116}
117
118impl Callback for StartAlgorithmRejectionHandler {
119 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
122 let controller = self.controller.as_rooted();
123 let stream = controller
124 .stream
125 .get()
126 .expect("Controller should have a stream.");
127
128 assert!(stream.is_erroring() || stream.is_writable());
130
131 controller.started.set(true);
133
134 let global = GlobalScope::from_safe_context(cx, realm);
135
136 stream.deal_with_rejection(cx, &global, v, can_gc);
138 }
139}
140
141impl js::gc::Rootable for TransferBackPressurePromiseReaction {}
142
143#[derive(JSTraceable, MallocSizeOf)]
146#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
147struct TransferBackPressurePromiseReaction {
148 #[ignore_malloc_size_of = "Rc is hard"]
150 result_promise: Rc<Promise>,
151
152 #[ignore_malloc_size_of = "Rc is hard"]
154 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
155
156 #[ignore_malloc_size_of = "mozjs"]
158 chunk: Box<Heap<JSVal>>,
159
160 port: Dom<MessagePort>,
162}
163
164impl Callback for TransferBackPressurePromiseReaction {
165 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
167 let global = self.result_promise.global();
168 *self.backpressure_promise.borrow_mut() = Some(Promise::new(&global, can_gc));
170
171 rooted!(in(*cx) let mut chunk = UndefinedValue());
173 chunk.set(self.chunk.get());
174 let result =
175 self.port
176 .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc);
177
178 if let Err(error) = result {
180 global.disentangle_port(&self.port, can_gc);
182
183 self.result_promise.reject_error(error, can_gc);
185 } else {
186 self.result_promise.resolve_native(&(), can_gc);
188 }
189 }
190}
191
192impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {}
193
194#[derive(Clone, JSTraceable, MallocSizeOf)]
197#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
198struct WriteAlgorithmFulfillmentHandler {
199 controller: Dom<WritableStreamDefaultController>,
200}
201
202impl Callback for WriteAlgorithmFulfillmentHandler {
203 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
204 let controller = self.controller.as_rooted();
205 let stream = controller
206 .stream
207 .get()
208 .expect("Controller should have a stream.");
209
210 stream.finish_in_flight_write(can_gc);
212
213 assert!(stream.is_erroring() || stream.is_writable());
216
217 {
219 rooted!(in(*cx) let mut rval = UndefinedValue());
220 let mut queue = controller.queue.borrow_mut();
221 queue.dequeue_value(cx, Some(rval.handle_mut()), can_gc);
222 }
223
224 let global = GlobalScope::from_safe_context(cx, realm);
225
226 if !stream.close_queued_or_in_flight() && stream.is_writable() {
228 let backpressure = controller.get_backpressure();
230
231 stream.update_backpressure(backpressure, &global, can_gc);
233 }
234
235 controller.advance_queue_if_needed(cx, &global, can_gc)
237 }
238}
239
240impl js::gc::Rootable for WriteAlgorithmRejectionHandler {}
241
242#[derive(Clone, JSTraceable, MallocSizeOf)]
245#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
246struct WriteAlgorithmRejectionHandler {
247 controller: Dom<WritableStreamDefaultController>,
248}
249
250impl Callback for WriteAlgorithmRejectionHandler {
251 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
252 let controller = self.controller.as_rooted();
253 let stream = controller
254 .stream
255 .get()
256 .expect("Controller should have a stream.");
257
258 if stream.is_writable() {
260 controller.clear_algorithms();
262 }
263
264 let global = GlobalScope::from_safe_context(cx, realm);
265
266 stream.finish_in_flight_write_with_error(cx, &global, v, can_gc);
268 }
269}
270
271#[derive(JSTraceable, PartialEq)]
273#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
274pub enum UnderlyingSinkType {
275 Js {
277 abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>,
279
280 start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>,
281
282 close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>,
284
285 write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>,
287 },
288 Transfer {
291 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
292 port: Dom<MessagePort>,
293 },
294 Transform(Dom<TransformStream>, Rc<Promise>),
296}
297
298impl UnderlyingSinkType {
299 pub(crate) fn new_js(
300 abort: Option<Rc<UnderlyingSinkAbortCallback>>,
301 start: Option<Rc<UnderlyingSinkStartCallback>>,
302 close: Option<Rc<UnderlyingSinkCloseCallback>>,
303 write: Option<Rc<UnderlyingSinkWriteCallback>>,
304 ) -> Self {
305 UnderlyingSinkType::Js {
306 abort: RefCell::new(abort),
307 start: RefCell::new(start),
308 close: RefCell::new(close),
309 write: RefCell::new(write),
310 }
311 }
312}
313
314#[dom_struct]
316pub struct WritableStreamDefaultController {
317 reflector_: Reflector,
318
319 #[ignore_malloc_size_of = "underlying_sink_type"]
322 underlying_sink_type: UnderlyingSinkType,
323
324 #[ignore_malloc_size_of = "mozjs"]
326 underlying_sink_obj: Heap<*mut JSObject>,
327
328 queue: RefCell<QueueWithSizes>,
330
331 started: Cell<bool>,
333
334 strategy_hwm: f64,
336
337 #[ignore_malloc_size_of = "Rc is hard"]
339 strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>,
340
341 stream: MutNullableDom<WritableStream>,
343
344 abort_controller: Dom<AbortController>,
346}
347
348impl WritableStreamDefaultController {
349 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
351 fn new_inherited(
352 global: &GlobalScope,
353 underlying_sink_type: UnderlyingSinkType,
354 strategy_hwm: f64,
355 strategy_size: Rc<QueuingStrategySize>,
356 can_gc: CanGc,
357 ) -> WritableStreamDefaultController {
358 WritableStreamDefaultController {
359 reflector_: Reflector::new(),
360 underlying_sink_type,
361 queue: Default::default(),
362 stream: Default::default(),
363 underlying_sink_obj: Default::default(),
364 strategy_hwm,
365 strategy_size: RefCell::new(Some(strategy_size)),
366 started: Default::default(),
367 abort_controller: Dom::from_ref(&AbortController::new_with_proto(global, None, can_gc)),
368 }
369 }
370
371 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
372 pub(crate) fn new(
373 global: &GlobalScope,
374 underlying_sink_type: UnderlyingSinkType,
375 strategy_hwm: f64,
376 strategy_size: Rc<QueuingStrategySize>,
377 can_gc: CanGc,
378 ) -> DomRoot<WritableStreamDefaultController> {
379 reflect_dom_object(
380 Box::new(WritableStreamDefaultController::new_inherited(
381 global,
382 underlying_sink_type,
383 strategy_hwm,
384 strategy_size,
385 can_gc,
386 )),
387 global,
388 can_gc,
389 )
390 }
391
392 pub(crate) fn started(&self) -> bool {
393 self.started.get()
394 }
395
396 pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) {
398 self.underlying_sink_obj.set(*this_object);
399 }
400
401 pub(crate) fn signal_abort(
403 &self,
404 cx: SafeJSContext,
405 reason: SafeHandleValue,
406 realm: InRealm,
407 can_gc: CanGc,
408 ) {
409 self.abort_controller
410 .signal_abort(cx, reason, realm, can_gc);
411 }
412
413 fn clear_algorithms(&self) {
415 match &self.underlying_sink_type {
416 UnderlyingSinkType::Js {
417 abort,
418 start: _,
419 close,
420 write,
421 } => {
422 write.borrow_mut().take();
424
425 close.borrow_mut().take();
427
428 abort.borrow_mut().take();
430 },
431 UnderlyingSinkType::Transfer {
432 backpressure_promise,
433 ..
434 } => {
435 backpressure_promise.borrow_mut().take();
436 },
437 UnderlyingSinkType::Transform(_, _) => {
438 return;
439 },
440 }
441
442 self.strategy_size.borrow_mut().take();
444 }
445
446 pub(crate) fn setup(
448 &self,
449 cx: SafeJSContext,
450 global: &GlobalScope,
451 stream: &WritableStream,
452 can_gc: CanGc,
453 ) -> Result<(), Error> {
454 stream.assert_no_controller();
459
460 self.stream.set(Some(stream));
462
463 stream.set_default_controller(self);
465
466 let backpressure = self.get_backpressure();
486
487 stream.update_backpressure(backpressure, global, can_gc);
489
490 let start_promise = self.start_algorithm(cx, global, can_gc)?;
493
494 let rooted_default_controller = DomRoot::from_ref(self);
495
496 rooted!(in(*cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler {
498 controller: Dom::from_ref(&rooted_default_controller),
499 }));
500
501 rooted!(in(*cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler {
503 controller: Dom::from_ref(&rooted_default_controller),
504 }));
505
506 let handler = PromiseNativeHandler::new(
507 global,
508 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
509 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
510 can_gc,
511 );
512 let realm = enter_realm(global);
513 let comp = InRealm::Entered(&realm);
514 start_promise.append_native_handler(&handler, comp, can_gc);
515
516 Ok(())
517 }
518
519 pub(crate) fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
521 {
523 let mut queue = self.queue.borrow_mut();
524 queue
525 .enqueue_value_with_size(EnqueuedValue::CloseSentinel)
526 .expect("Enqueuing the close sentinel should not fail.");
527 }
528 self.advance_queue_if_needed(cx, global, can_gc);
530 }
531
532 #[allow(unsafe_code)]
533 fn start_algorithm(
534 &self,
535 cx: SafeJSContext,
536 global: &GlobalScope,
537 can_gc: CanGc,
538 ) -> Fallible<Rc<Promise>> {
539 match &self.underlying_sink_type {
540 UnderlyingSinkType::Js {
541 start,
542 abort: _,
543 close: _,
544 write: _,
545 } => {
546 let algo = start.borrow().clone();
547 let start_promise = if let Some(start) = algo {
548 rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
549 rooted!(in(*cx) let mut result: JSVal);
550 rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
551 start.Call_(
552 &this_object.handle(),
553 self,
554 result.handle_mut(),
555 ExceptionHandling::Rethrow,
556 can_gc,
557 )?;
558 let is_promise = unsafe {
559 if result.is_object() {
560 result_object.set(result.to_object());
561 IsPromiseObject(result_object.handle().into_handle())
562 } else {
563 false
564 }
565 };
566 if is_promise {
567 Promise::new_with_js_promise(result_object.handle(), cx)
568 } else {
569 Promise::new_resolved(global, cx, result.get(), can_gc)
570 }
571 } else {
572 Promise::new_resolved(global, cx, (), can_gc)
574 };
575
576 Ok(start_promise)
577 },
578 UnderlyingSinkType::Transfer { .. } => {
579 Ok(Promise::new_resolved(global, cx, (), can_gc))
581 },
582 UnderlyingSinkType::Transform(_, start_promise) => {
583 Ok(start_promise.clone())
585 },
586 }
587 }
588
589 pub(crate) fn abort_steps(
591 &self,
592 cx: SafeJSContext,
593 global: &GlobalScope,
594 reason: SafeHandleValue,
595 can_gc: CanGc,
596 ) -> Rc<Promise> {
597 let result = match &self.underlying_sink_type {
598 UnderlyingSinkType::Js {
599 abort,
600 start: _,
601 close: _,
602 write: _,
603 } => {
604 rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
605 let algo = abort.borrow().clone();
606 let result = if let Some(algo) = algo {
608 algo.Call_(
609 &this_object.handle(),
610 Some(reason),
611 ExceptionHandling::Rethrow,
612 can_gc,
613 )
614 } else {
615 Ok(Promise::new_resolved(global, cx, (), can_gc))
616 };
617 result.unwrap_or_else(|e| {
618 let promise = Promise::new(global, can_gc);
619 promise.reject_error(e, can_gc);
620 promise
621 })
622 },
623 UnderlyingSinkType::Transfer { port, .. } => {
624 let result = port.pack_and_post_message_handling_error("error", reason, can_gc);
629
630 global.disentangle_port(port, can_gc);
632
633 let promise = Promise::new(global, can_gc);
634
635 if let Err(error) = result {
637 promise.reject_error(error, can_gc);
638 } else {
639 promise.resolve_native(&(), can_gc);
641 }
642 promise
643 },
644 UnderlyingSinkType::Transform(stream, _) => {
645 stream
647 .transform_stream_default_sink_abort_algorithm(cx, global, reason, can_gc)
648 .expect("Transform stream default sink abort algorithm should not fail.")
649 },
650 };
651
652 self.clear_algorithms();
654
655 result
656 }
657
658 fn call_write_algorithm(
660 &self,
661 cx: SafeJSContext,
662 chunk: SafeHandleValue,
663 global: &GlobalScope,
664 can_gc: CanGc,
665 ) -> Rc<Promise> {
666 match &self.underlying_sink_type {
667 UnderlyingSinkType::Js {
668 abort: _,
669 start: _,
670 close: _,
671 write,
672 } => {
673 rooted!(in(*cx) let this_object = self.underlying_sink_obj.get());
674 let algo = write.borrow().clone();
675 let result = if let Some(algo) = algo {
676 algo.Call_(
677 &this_object.handle(),
678 chunk,
679 self,
680 ExceptionHandling::Rethrow,
681 can_gc,
682 )
683 } else {
684 Ok(Promise::new_resolved(global, cx, (), can_gc))
685 };
686 result.unwrap_or_else(|e| {
687 let promise = Promise::new(global, can_gc);
688 promise.reject_error(e, can_gc);
689 promise
690 })
691 },
692 UnderlyingSinkType::Transfer {
693 backpressure_promise,
694 port,
695 } => {
696 {
700 let mut backpressure_promise = backpressure_promise.borrow_mut();
703 if backpressure_promise.is_none() {
704 *backpressure_promise = Some(Promise::new_resolved(global, cx, (), can_gc));
705 }
706 }
707
708 let result_promise = Promise::new(global, can_gc);
710 rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction {
711 port: port.clone(),
712 backpressure_promise: backpressure_promise.clone(),
713 chunk: Heap::boxed(chunk.get()),
714 result_promise: result_promise.clone(),
715 }));
716 let handler = PromiseNativeHandler::new(
717 global,
718 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
719 None,
720 can_gc,
721 );
722 let realm = enter_realm(global);
723 let comp = InRealm::Entered(&realm);
724 backpressure_promise
725 .borrow()
726 .as_ref()
727 .expect("Promise must be some by now.")
728 .append_native_handler(&handler, comp, can_gc);
729 result_promise
730 },
731 UnderlyingSinkType::Transform(stream, _) => {
732 stream
734 .transform_stream_default_sink_write_algorithm(cx, global, chunk, can_gc)
735 .expect("Transform stream default sink write algorithm should not fail.")
736 },
737 }
738 }
739
740 fn call_close_algorithm(
742 &self,
743 cx: SafeJSContext,
744 global: &GlobalScope,
745 can_gc: CanGc,
746 ) -> Rc<Promise> {
747 match &self.underlying_sink_type {
748 UnderlyingSinkType::Js {
749 abort: _,
750 start: _,
751 close,
752 write: _,
753 } => {
754 rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>());
755 this_object.set(self.underlying_sink_obj.get());
756 let algo = close.borrow().clone();
757 let result = if let Some(algo) = algo {
758 algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc)
759 } else {
760 Ok(Promise::new_resolved(global, cx, (), can_gc))
761 };
762 result.unwrap_or_else(|e| {
763 let promise = Promise::new(global, can_gc);
764 promise.reject_error(e, can_gc);
765 promise
766 })
767 },
768 UnderlyingSinkType::Transfer { port, .. } => {
769 rooted!(in(*cx) let mut value = UndefinedValue());
774 port.pack_and_post_message("close", value.handle(), can_gc)
775 .expect("Sending close should not fail.");
776
777 global.disentangle_port(port, can_gc);
779
780 Promise::new_resolved(global, cx, (), can_gc)
782 },
783 UnderlyingSinkType::Transform(stream, _) => {
784 stream
786 .transform_stream_default_sink_close_algorithm(cx, global, can_gc)
787 .expect("Transform stream default sink close algorithm should not fail.")
788 },
789 }
790 }
791
792 pub(crate) fn process_close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
794 let Some(stream) = self.stream.get() else {
796 unreachable!("Controller should have a stream");
797 };
798
799 stream.mark_close_request_in_flight();
801
802 {
804 let mut queue = self.queue.borrow_mut();
805 queue.dequeue_value(cx, None, can_gc);
806 }
807
808 assert!(self.queue.borrow().is_empty());
810
811 let sink_close_promise = self.call_close_algorithm(cx, global, can_gc);
813
814 self.clear_algorithms();
816
817 rooted!(in(*cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler {
819 stream: Dom::from_ref(&stream),
820 }));
821
822 rooted!(in(*cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler {
824 stream: Dom::from_ref(&stream),
825 }));
826
827 let handler = PromiseNativeHandler::new(
829 global,
830 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
831 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
832 can_gc,
833 );
834 let realm = enter_realm(global);
835 let comp = InRealm::Entered(&realm);
836 sink_close_promise.append_native_handler(&handler, comp, can_gc);
837 }
838
839 fn advance_queue_if_needed(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
841 let Some(stream) = self.stream.get() else {
843 unreachable!("Controller should have a stream");
844 };
845
846 if !self.started.get() {
848 return;
849 }
850
851 if stream.has_in_flight_write_request() {
853 return;
854 }
855
856 assert!(!(stream.is_errored() || stream.is_closed()));
860
861 if stream.is_erroring() {
863 stream.finish_erroring(cx, global, can_gc);
865
866 return;
868 }
869
870 rooted!(in(*cx) let mut value = UndefinedValue());
872 let is_closed = {
873 let queue = self.queue.borrow_mut();
874
875 if queue.is_empty() {
877 return;
878 }
879 queue.peek_queue_value(cx, value.handle_mut(), can_gc)
880 };
881
882 if is_closed {
883 self.process_close(cx, global, can_gc);
885 } else {
886 self.process_write(cx, value.handle(), global, can_gc);
888 };
889 }
890
891 pub(crate) fn perform_error_steps(&self) {
893 self.queue.borrow_mut().reset();
895 }
896
897 fn process_write(
899 &self,
900 cx: SafeJSContext,
901 chunk: SafeHandleValue,
902 global: &GlobalScope,
903 can_gc: CanGc,
904 ) {
905 let Some(stream) = self.stream.get() else {
907 unreachable!("Controller should have a stream");
908 };
909
910 stream.mark_first_write_request_in_flight();
912
913 let sink_write_promise = self.call_write_algorithm(cx, chunk, global, can_gc);
915
916 rooted!(in(*cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler {
918 controller: Dom::from_ref(self),
919 }));
920
921 rooted!(in(*cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler {
923 controller: Dom::from_ref(self),
924 }));
925
926 let handler = PromiseNativeHandler::new(
928 global,
929 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
930 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
931 can_gc,
932 );
933 let realm = enter_realm(global);
934 let comp = InRealm::Entered(&realm);
935 sink_write_promise.append_native_handler(&handler, comp, can_gc);
936 }
937
938 pub(crate) fn get_desired_size(&self) -> f64 {
940 let queue = self.queue.borrow();
942 let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX);
943 desired_size.clamp(desired_size, self.strategy_hwm)
944 }
945
946 fn get_backpressure(&self) -> bool {
948 let desired_size = self.get_desired_size();
950
951 desired_size == 0.0 || desired_size.is_sign_negative()
953 }
954
955 pub(crate) fn get_chunk_size(
957 &self,
958 cx: SafeJSContext,
959 global: &GlobalScope,
960 chunk: SafeHandleValue,
961 can_gc: CanGc,
962 ) -> f64 {
963 let Some(strategy_size) = self.strategy_size.borrow().clone() else {
965 let Some(stream) = self.stream.get() else {
967 unreachable!("Controller should have a stream");
968 };
969 assert!(!stream.is_writable());
970
971 return 1.0;
973 };
974
975 let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc);
978
979 match result {
980 Ok(size) => size,
982 Err(error) => {
983 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
988 error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
989 self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
990
991 1.0
993 },
994 }
995 }
996
997 pub(crate) fn write(
999 &self,
1000 cx: SafeJSContext,
1001 global: &GlobalScope,
1002 chunk: SafeHandleValue,
1003 chunk_size: f64,
1004 can_gc: CanGc,
1005 ) {
1006 let enqueue_result = {
1008 let mut queue = self.queue.borrow_mut();
1009 queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize {
1010 value: Heap::boxed(chunk.get()),
1011 size: chunk_size,
1012 }))
1013 };
1014
1015 if let Err(error) = enqueue_result {
1017 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1020 error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
1021 self.error_if_needed(cx, rooted_error.handle(), global, can_gc);
1022
1023 return;
1025 }
1026
1027 let Some(stream) = self.stream.get() else {
1029 unreachable!("Controller should have a stream");
1030 };
1031
1032 if !stream.close_queued_or_in_flight() && stream.is_writable() {
1034 let backpressure = self.get_backpressure();
1036
1037 stream.update_backpressure(backpressure, global, can_gc);
1039 }
1040
1041 self.advance_queue_if_needed(cx, global, can_gc);
1043 }
1044
1045 pub(crate) fn error_if_needed(
1047 &self,
1048 cx: SafeJSContext,
1049 error: SafeHandleValue,
1050 global: &GlobalScope,
1051 can_gc: CanGc,
1052 ) {
1053 let Some(stream) = self.stream.get() else {
1055 unreachable!("Controller should have a stream");
1056 };
1057
1058 if stream.is_writable() {
1060 self.error(&stream, cx, error, global, can_gc);
1062 }
1063 }
1064
1065 pub(crate) fn error(
1067 &self,
1068 stream: &WritableStream,
1069 cx: SafeJSContext,
1070 e: SafeHandleValue,
1071 global: &GlobalScope,
1072 can_gc: CanGc,
1073 ) {
1074 assert!(stream.is_writable());
1079
1080 self.clear_algorithms();
1082
1083 stream.start_erroring(cx, global, e, can_gc);
1085 }
1086}
1087
1088impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder>
1089 for WritableStreamDefaultController
1090{
1091 fn Error(&self, cx: SafeJSContext, e: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
1093 let Some(stream) = self.stream.get() else {
1095 unreachable!("Controller should have a stream");
1096 };
1097
1098 if !stream.is_writable() {
1100 return;
1101 }
1102
1103 let global = GlobalScope::from_safe_context(cx, realm);
1104
1105 self.error(&stream, cx, e, &global, can_gc);
1107 }
1108
1109 fn Signal(&self) -> DomRoot<AbortSignal> {
1111 self.abort_controller.signal()
1113 }
1114}