1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use dom_struct::dom_struct;
12use js::jsapi::{Heap, JSObject};
13use js::jsval::{JSVal, ObjectValue, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{
16 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
17 MutableHandleValue as SafeMutableHandleValue,
18};
19use rustc_hash::FxHashMap;
20use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
21use script_bindings::conversions::SafeToJSValConvertible;
22use servo_base::id::{MessagePortId, MessagePortIndex};
23use servo_constellation_traits::MessagePortImpl;
24
25use crate::dom::bindings::cell::DomRefCell;
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
27 QueuingStrategy, QueuingStrategySize,
28};
29use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
30use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
31use crate::dom::bindings::conversions::ConversionResult;
32use crate::dom::bindings::error::{Error, Fallible};
33use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
34use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
35use crate::dom::bindings::structuredclone::StructuredData;
36use crate::dom::bindings::transferable::Transferable;
37use crate::dom::domexception::{DOMErrorName, DOMException};
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::messageport::MessagePort;
40use crate::dom::promise::Promise;
41use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
42use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
43use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
44use crate::dom::stream::writablestreamdefaultcontroller::{
45 UnderlyingSinkType, WritableStreamDefaultController,
46};
47use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
48use crate::realms::{InRealm, enter_auto_realm, enter_realm};
49use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
50
51impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
52
53#[derive(JSTraceable, MallocSizeOf)]
56#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
57struct AbortAlgorithmFulfillmentHandler {
58 stream: Dom<WritableStream>,
59 #[conditional_malloc_size_of]
60 abort_request_promise: Rc<Promise>,
61}
62
63impl Callback for AbortAlgorithmFulfillmentHandler {
64 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
65 self.abort_request_promise
67 .resolve_native(&(), CanGc::from_cx(cx));
68
69 self.stream
71 .as_rooted()
72 .reject_close_and_closed_promise_if_needed(cx);
73 }
74}
75
76impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
77
78#[derive(JSTraceable, MallocSizeOf)]
81#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
82struct AbortAlgorithmRejectionHandler {
83 stream: Dom<WritableStream>,
84 #[conditional_malloc_size_of]
85 abort_request_promise: Rc<Promise>,
86}
87
88impl Callback for AbortAlgorithmRejectionHandler {
89 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
90 self.abort_request_promise
92 .reject_native(&reason, CanGc::from_cx(cx));
93
94 self.stream
96 .as_rooted()
97 .reject_close_and_closed_promise_if_needed(cx);
98 }
99}
100
101impl js::gc::Rootable for PendingAbortRequest {}
102
103#[derive(JSTraceable, MallocSizeOf)]
105#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
106struct PendingAbortRequest {
107 #[conditional_malloc_size_of]
109 promise: Rc<Promise>,
110
111 #[ignore_malloc_size_of = "mozjs"]
113 reason: Box<Heap<JSVal>>,
114
115 was_already_erroring: bool,
117}
118
119#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
121pub(crate) enum WritableStreamState {
122 #[default]
123 Writable,
124 Closed,
125 Erroring,
126 Errored,
127}
128
129#[dom_struct]
131pub struct WritableStream {
132 reflector_: Reflector,
133
134 backpressure: Cell<bool>,
136
137 #[conditional_malloc_size_of]
139 close_request: DomRefCell<Option<Rc<Promise>>>,
140
141 controller: MutNullableDom<WritableStreamDefaultController>,
143
144 detached: Cell<bool>,
146
147 #[conditional_malloc_size_of]
149 in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
150
151 #[conditional_malloc_size_of]
153 in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
154
155 pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
157
158 state: Cell<WritableStreamState>,
160
161 #[ignore_malloc_size_of = "mozjs"]
163 stored_error: Heap<JSVal>,
164
165 writer: MutNullableDom<WritableStreamDefaultWriter>,
167
168 #[conditional_malloc_size_of]
170 write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
171}
172
173impl WritableStream {
174 fn new_inherited() -> WritableStream {
176 WritableStream {
177 reflector_: Reflector::new(),
178 backpressure: Default::default(),
179 close_request: Default::default(),
180 controller: Default::default(),
181 detached: Default::default(),
182 in_flight_write_request: Default::default(),
183 in_flight_close_request: Default::default(),
184 pending_abort_request: Default::default(),
185 state: Default::default(),
186 stored_error: Default::default(),
187 writer: Default::default(),
188 write_requests: Default::default(),
189 }
190 }
191
192 pub(crate) fn new_with_proto(
193 global: &GlobalScope,
194 proto: Option<SafeHandleObject>,
195 can_gc: CanGc,
196 ) -> DomRoot<WritableStream> {
197 reflect_dom_object_with_proto(
198 Box::new(WritableStream::new_inherited()),
199 global,
200 proto,
201 can_gc,
202 )
203 }
204
205 pub(crate) fn assert_no_controller(&self) {
208 assert!(self.controller.get().is_none());
209 }
210
211 pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
214 self.controller.set(Some(controller));
215 }
216
217 pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
218 self.controller.get().expect("Controller should be set.")
219 }
220
221 pub(crate) fn is_writable(&self) -> bool {
222 matches!(self.state.get(), WritableStreamState::Writable)
223 }
224
225 pub(crate) fn is_erroring(&self) -> bool {
226 matches!(self.state.get(), WritableStreamState::Erroring)
227 }
228
229 pub(crate) fn is_errored(&self) -> bool {
230 matches!(self.state.get(), WritableStreamState::Errored)
231 }
232
233 pub(crate) fn is_closed(&self) -> bool {
234 matches!(self.state.get(), WritableStreamState::Closed)
235 }
236
237 pub(crate) fn has_in_flight_write_request(&self) -> bool {
238 self.in_flight_write_request.borrow().is_some()
239 }
240
241 pub(crate) fn has_operations_marked_inflight(&self) -> bool {
243 let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
244 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
245
246 in_flight_write_requested || in_flight_close_requested
247 }
248
249 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
251 handle_mut.set(self.stored_error.get());
252 }
253
254 pub(crate) fn finish_erroring(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
256 assert!(self.is_erroring());
258
259 assert!(!self.has_operations_marked_inflight());
261
262 self.state.set(WritableStreamState::Errored);
264
265 let Some(controller) = self.controller.get() else {
267 unreachable!("Stream should have a controller.");
268 };
269 controller.perform_error_steps();
270
271 rooted!(&in(cx) let mut stored_error = UndefinedValue());
273 self.get_stored_error(stored_error.handle_mut());
274
275 let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
277 for request in write_requests {
278 request.reject(cx.into(), stored_error.handle(), CanGc::from_cx(cx));
280 }
281
282 if self.pending_abort_request.borrow().is_none() {
287 self.reject_close_and_closed_promise_if_needed(cx);
289
290 return;
292 }
293
294 rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
297 if let Some(pending_abort_request) = &*pending_abort_request {
298 if pending_abort_request.was_already_erroring {
300 pending_abort_request.promise.reject(
302 cx.into(),
303 stored_error.handle(),
304 CanGc::from_cx(cx),
305 );
306
307 self.reject_close_and_closed_promise_if_needed(cx);
309
310 return;
312 }
313
314 rooted!(&in(cx) let mut reason = UndefinedValue());
316 reason.set(pending_abort_request.reason.get());
317 let promise = controller.abort_steps(cx, global, reason.handle());
318
319 rooted!(&in(cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
321 stream: Dom::from_ref(self),
322 abort_request_promise: pending_abort_request.promise.clone(),
323 }));
324
325 rooted!(&in(cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
327 stream: Dom::from_ref(self),
328 abort_request_promise: pending_abort_request.promise.clone(),
329 }));
330
331 let handler = PromiseNativeHandler::new(
332 global,
333 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
334 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
335 CanGc::from_cx(cx),
336 );
337 let realm = enter_realm(global);
338 let comp = InRealm::Entered(&realm);
339 promise.append_native_handler(&handler, comp, CanGc::from_cx(cx));
340 }
341 }
342
343 fn reject_close_and_closed_promise_if_needed(&self, cx: &mut js::context::JSContext) {
345 assert!(self.is_errored());
347
348 rooted!(&in(cx) let mut stored_error = UndefinedValue());
349 self.get_stored_error(stored_error.handle_mut());
350
351 let close_request = self.close_request.borrow_mut().take();
353 if let Some(close_request) = close_request {
354 assert!(self.in_flight_close_request.borrow().is_none());
356
357 close_request.reject_native(&stored_error.handle(), CanGc::from_cx(cx))
359
360 }
363
364 if let Some(writer) = self.writer.get() {
367 writer.reject_closed_promise_with_stored_error(cx, &stored_error.handle());
369
370 writer.set_close_promise_is_handled();
372 }
373 }
374
375 pub(crate) fn close_queued_or_in_flight(&self) -> bool {
377 let close_requested = self.close_request.borrow().is_some();
378 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
379
380 close_requested || in_flight_close_requested
381 }
382
383 pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
385 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
386 unreachable!("Stream should have a write request");
388 };
389
390 in_flight_write_request.resolve_native(&(), can_gc);
392
393 }
396
397 pub(crate) fn start_erroring(
399 &self,
400 cx: &mut js::context::JSContext,
401 global: &GlobalScope,
402 error: SafeHandleValue,
403 ) {
404 assert!(self.stored_error.get().is_undefined());
406
407 assert!(self.is_writable());
409
410 let Some(controller) = self.controller.get() else {
412 unreachable!("Stream should have a controller.");
414 };
415
416 self.state.set(WritableStreamState::Erroring);
418
419 self.stored_error.set(*error);
421
422 if let Some(writer) = self.writer.get() {
424 writer.ensure_ready_promise_rejected(global, error, CanGc::from_cx(cx));
426 }
427
428 if !self.has_operations_marked_inflight() && controller.started() {
430 self.finish_erroring(cx, global);
432 }
433 }
434
435 pub(crate) fn deal_with_rejection(
437 &self,
438 cx: &mut js::context::JSContext,
439 global: &GlobalScope,
440 error: SafeHandleValue,
441 ) {
442 if self.is_writable() {
446 self.start_erroring(cx, global, error);
448
449 return;
451 }
452
453 assert!(self.is_erroring());
455
456 self.finish_erroring(cx, global);
458 }
459
460 pub(crate) fn mark_first_write_request_in_flight(&self) {
462 let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
463 let mut write_requests = self.write_requests.borrow_mut();
464
465 assert!(in_flight_write_request.is_none());
467
468 assert!(!write_requests.is_empty());
470
471 let write_request = write_requests.pop_front().unwrap();
474
475 *in_flight_write_request = Some(write_request);
477 }
478
479 pub(crate) fn mark_close_request_in_flight(&self) {
481 let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
482 let mut close_request = self.close_request.borrow_mut();
483
484 assert!(in_flight_close_request.is_none());
486
487 assert!(close_request.is_some());
489
490 let close_request = close_request.take().unwrap();
493
494 *in_flight_close_request = Some(close_request);
496 }
497
498 pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
500 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
501 unreachable!("in_flight_close_request must be Some");
503 };
504
505 in_flight_close_request.resolve_native(&(), can_gc);
507
508 assert!(self.is_writable() || self.is_erroring());
513
514 if self.is_erroring() {
516 self.stored_error.set(UndefinedValue());
518
519 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
521 if let Some(pending_abort_request) = &*pending_abort_request {
522 pending_abort_request.promise.resolve_native(&(), can_gc);
524
525 }
528 }
529
530 self.state.set(WritableStreamState::Closed);
532
533 if let Some(writer) = self.writer.get() {
535 writer.resolve_closed_promise_with_undefined(can_gc);
538 }
539
540 assert!(self.pending_abort_request.borrow().is_none());
542
543 assert!(self.stored_error.get().is_undefined());
545 }
546
547 pub(crate) fn finish_in_flight_close_with_error(
549 &self,
550 cx: &mut js::context::JSContext,
551 global: &GlobalScope,
552 error: SafeHandleValue,
553 ) {
554 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
555 unreachable!("Inflight close request must be defined.");
557 };
558
559 in_flight_close_request.reject_native(&error, CanGc::from_cx(cx));
561
562 assert!(self.is_erroring() || self.is_writable());
567
568 rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
570 if let Some(pending_abort_request) = &*pending_abort_request {
571 pending_abort_request
573 .promise
574 .reject_native(&error, CanGc::from_cx(cx));
575
576 }
579
580 self.deal_with_rejection(cx, global, error);
582 }
583
584 pub(crate) fn finish_in_flight_write_with_error(
586 &self,
587 cx: &mut js::context::JSContext,
588 global: &GlobalScope,
589 error: SafeHandleValue,
590 ) {
591 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
592 unreachable!("Inflight write request must be defined.");
594 };
595
596 in_flight_write_request.reject_native(&error, CanGc::from_cx(cx));
598
599 assert!(self.is_erroring() || self.is_writable());
604
605 self.deal_with_rejection(cx, global, error);
607 }
608
609 pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
610 self.writer.get()
611 }
612
613 pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
614 self.writer.set(writer);
615 }
616
617 pub(crate) fn set_backpressure(&self, backpressure: bool) {
618 self.backpressure.set(backpressure);
619 }
620
621 pub(crate) fn get_backpressure(&self) -> bool {
622 self.backpressure.get()
623 }
624
625 pub(crate) fn is_locked(&self) -> bool {
627 self.get_writer().is_some()
630 }
631
632 pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
634 assert!(self.is_locked());
636
637 assert!(self.is_writable());
639
640 let promise = Promise::new(global, can_gc);
642
643 self.write_requests.borrow_mut().push_back(promise.clone());
645
646 promise
648 }
649
650 pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
652 self.controller.get()
653 }
654
655 pub(crate) fn abort(
657 &self,
658 cx: &mut CurrentRealm,
659 global: &GlobalScope,
660 provided_reason: SafeHandleValue,
661 ) -> Rc<Promise> {
662 if self.is_closed() || self.is_errored() {
664 return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
666 }
667
668 self.get_controller()
670 .expect("Stream must have a controller.")
671 .signal_abort(cx, provided_reason);
672
673 let state = self.state.get();
675
676 if matches!(
678 state,
679 WritableStreamState::Closed | WritableStreamState::Errored
680 ) {
681 return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
682 }
683
684 if self.pending_abort_request.borrow().is_some() {
686 return self
688 .pending_abort_request
689 .borrow()
690 .as_ref()
691 .expect("Pending abort request must be Some.")
692 .promise
693 .clone();
694 }
695
696 assert!(self.is_writable() || self.is_erroring());
698
699 let mut was_already_erroring = false;
701 rooted!(&in(cx) let undefined_reason = UndefinedValue());
702
703 let reason = if self.is_erroring() {
705 was_already_erroring = true;
707
708 undefined_reason.handle()
710 } else {
711 provided_reason
713 };
714
715 let promise = Promise::new2(cx, global);
717
718 *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
723 promise: promise.clone(),
724 reason: Heap::boxed(reason.get()),
725 was_already_erroring,
726 });
727
728 if !was_already_erroring {
730 self.start_erroring(cx, global, reason);
732 }
733
734 promise
736 }
737
738 pub(crate) fn close(
740 &self,
741 cx: &mut js::context::JSContext,
742 global: &GlobalScope,
743 ) -> Rc<Promise> {
744 if self.is_closed() || self.is_errored() {
747 let promise = Promise::new2(cx, global);
749 promise.reject_error(
750 Error::Type(c"Stream is closed or errored.".to_owned()),
751 CanGc::from_cx(cx),
752 );
753 return promise;
754 }
755
756 assert!(self.is_writable() || self.is_erroring());
758
759 assert!(!self.close_queued_or_in_flight());
761
762 let promise = Promise::new2(cx, global);
764
765 *self.close_request.borrow_mut() = Some(promise.clone());
767
768 if let Some(writer) = self.writer.get() {
771 if self.get_backpressure() && self.is_writable() {
774 writer.resolve_ready_promise_with_undefined(CanGc::from_cx(cx));
776 }
777 }
778
779 let Some(controller) = self.controller.get() else {
781 unreachable!("Stream must have a controller.");
782 };
783 controller.close(cx, global);
784
785 promise
787 }
788
789 pub(crate) fn get_desired_size(&self) -> Option<f64> {
792 if self.is_errored() || self.is_erroring() {
798 return None;
799 }
800
801 if self.is_closed() {
803 return Some(0.);
804 }
805
806 let Some(controller) = self.controller.get() else {
807 unreachable!("Stream must have a controller.");
808 };
809 Some(controller.get_desired_size())
810 }
811
812 pub(crate) fn aquire_default_writer(
814 &self,
815 cx: SafeJSContext,
816 global: &GlobalScope,
817 can_gc: CanGc,
818 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
819 let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
821
822 writer.setup(cx, self, can_gc)?;
824
825 Ok(writer)
827 }
828
829 pub(crate) fn update_backpressure(
831 &self,
832 backpressure: bool,
833 global: &GlobalScope,
834 can_gc: CanGc,
835 ) {
836 self.is_writable();
838
839 assert!(!self.close_queued_or_in_flight());
841
842 let writer = self.get_writer();
844
845 if let Some(writer) = writer {
846 if backpressure != self.get_backpressure() {
848 if backpressure {
850 let promise = Promise::new(global, can_gc);
852 writer.set_ready_promise(promise);
853 } else {
854 assert!(!backpressure);
857 writer.resolve_ready_promise_with_undefined(can_gc);
859 }
860 }
861 }
862
863 self.set_backpressure(backpressure);
865 }
866
867 pub(crate) fn setup_cross_realm_transform_writable(
869 &self,
870 cx: &mut js::context::JSContext,
871 port: &MessagePort,
872 ) {
873 let port_id = port.message_port_id();
874 let global = self.global();
875
876 let size_algorithm =
882 extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
883
884 let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(
888 &global,
889 CanGc::from_cx(cx),
890 ))));
891
892 let controller = WritableStreamDefaultController::new(
894 &global,
895 UnderlyingSinkType::Transfer {
896 backpressure_promise: backpressure_promise.clone(),
897 port: Dom::from_ref(port),
898 },
899 1.0,
900 size_algorithm,
901 CanGc::from_cx(cx),
902 );
903
904 rooted!(&in(cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
907 controller: Dom::from_ref(&controller),
908 backpressure_promise,
909 });
910 global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
911
912 port.Start(cx);
914
915 controller
917 .setup(cx, &global, self)
918 .expect("Setup for transfer cannot fail");
919 }
920 #[allow(clippy::too_many_arguments)]
922 fn setup_from_underlying_sink(
923 &self,
924 cx: &mut js::context::JSContext,
925 global: &GlobalScope,
926 stream: &WritableStream,
927 underlying_sink_obj: SafeHandleObject,
928 underlying_sink: &UnderlyingSink,
929 strategy_hwm: f64,
930 strategy_size: Rc<QueuingStrategySize>,
931 ) -> Result<(), Error> {
932 let controller = WritableStreamDefaultController::new(
958 global,
959 UnderlyingSinkType::new_js(
960 underlying_sink.abort.clone(),
961 underlying_sink.start.clone(),
962 underlying_sink.close.clone(),
963 underlying_sink.write.clone(),
964 ),
965 strategy_hwm,
966 strategy_size,
967 CanGc::from_cx(cx),
968 );
969
970 controller.set_underlying_sink_this_object(underlying_sink_obj);
973
974 controller.setup(cx, global, stream)
976 }
977}
978
979#[cfg_attr(crown, expect(crown::unrooted_must_root))]
981pub(crate) fn create_writable_stream(
982 cx: &mut js::context::JSContext,
983 global: &GlobalScope,
984 writable_high_water_mark: f64,
985 writable_size_algorithm: Rc<QueuingStrategySize>,
986 underlying_sink_type: UnderlyingSinkType,
987) -> Fallible<DomRoot<WritableStream>> {
988 assert!(writable_high_water_mark >= 0.0);
990
991 let stream = WritableStream::new_with_proto(global, None, CanGc::from_cx(cx));
994
995 let controller = WritableStreamDefaultController::new(
997 global,
998 underlying_sink_type,
999 writable_high_water_mark,
1000 writable_size_algorithm,
1001 CanGc::from_cx(cx),
1002 );
1003
1004 controller.setup(cx, global, &stream)?;
1007
1008 Ok(stream)
1010}
1011
1012impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1013 fn Constructor(
1015 cx: &mut js::context::JSContext,
1016 global: &GlobalScope,
1017 proto: Option<SafeHandleObject>,
1018 underlying_sink: Option<*mut JSObject>,
1019 strategy: &QueuingStrategy,
1020 ) -> Fallible<DomRoot<WritableStream>> {
1021 rooted!(&in(cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1023
1024 let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1027 rooted!(&in(cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1028 match UnderlyingSink::new(cx.into(), obj_val.handle(), CanGc::from_cx(cx)) {
1029 Ok(ConversionResult::Success(val)) => val,
1030 Ok(ConversionResult::Failure(error)) => {
1031 return Err(Error::Type(error.into_owned()));
1032 },
1033 _ => {
1034 return Err(Error::JSFailed);
1035 },
1036 }
1037 } else {
1038 UnderlyingSink::empty()
1039 };
1040
1041 if !underlying_sink_dict.type_.handle().is_undefined() {
1042 return Err(Error::Range(c"type is set".to_owned()));
1044 }
1045
1046 let stream = WritableStream::new_with_proto(global, proto, CanGc::from_cx(cx));
1048
1049 let size_algorithm = extract_size_algorithm(strategy, CanGc::from_cx(cx));
1051
1052 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1054
1055 stream.setup_from_underlying_sink(
1058 cx,
1059 global,
1060 &stream,
1061 underlying_sink_obj.handle(),
1062 &underlying_sink_dict,
1063 high_water_mark,
1064 size_algorithm,
1065 )?;
1066
1067 Ok(stream)
1068 }
1069
1070 fn Locked(&self) -> bool {
1072 self.is_locked()
1074 }
1075
1076 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
1078 let global = GlobalScope::from_current_realm(cx);
1079
1080 if self.is_locked() {
1082 let promise = Promise::new2(cx, &global);
1084 promise.reject_error(
1085 Error::Type(c"Stream is locked.".to_owned()),
1086 CanGc::from_cx(cx),
1087 );
1088 return promise;
1089 }
1090
1091 self.abort(cx, &global, reason)
1093 }
1094
1095 fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
1097 let global = GlobalScope::from_current_realm(cx);
1098
1099 if self.is_locked() {
1101 let promise = Promise::new2(cx, &global);
1103 promise.reject_error(
1104 Error::Type(c"Stream is locked.".to_owned()),
1105 CanGc::from_cx(cx),
1106 );
1107 return promise;
1108 }
1109
1110 if self.close_queued_or_in_flight() {
1112 let promise = Promise::new2(cx, &global);
1114 promise.reject_error(
1115 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
1116 CanGc::from_cx(cx),
1117 );
1118 return promise;
1119 }
1120
1121 self.close(cx, &global)
1123 }
1124
1125 fn GetWriter(
1127 &self,
1128 realm: InRealm,
1129 can_gc: CanGc,
1130 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1131 let cx = GlobalScope::get_cx();
1132 let global = GlobalScope::from_safe_context(cx, realm);
1133
1134 self.aquire_default_writer(cx, &global, can_gc)
1136 }
1137}
1138
1139impl js::gc::Rootable for CrossRealmTransformWritable {}
1140
1141#[derive(Clone, JSTraceable, MallocSizeOf)]
1145#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1146pub(crate) struct CrossRealmTransformWritable {
1147 controller: Dom<WritableStreamDefaultController>,
1149
1150 #[ignore_malloc_size_of = "nested Rc"]
1152 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1153}
1154
1155impl CrossRealmTransformWritable {
1156 #[expect(unsafe_code)]
1159 pub(crate) fn handle_message(
1160 &self,
1161 cx: &mut CurrentRealm,
1162 global: &GlobalScope,
1163 message: SafeHandleValue,
1164 ) {
1165 rooted!(&in(cx) let mut value = UndefinedValue());
1166 let type_string = unsafe {
1167 get_type_and_value_from_message(
1168 cx.into(),
1169 message,
1170 value.handle_mut(),
1171 CanGc::from_cx(cx),
1172 )
1173 };
1174
1175 if type_string == "error" {
1180 self.controller.error_if_needed(cx, value.handle(), global);
1182 }
1183
1184 let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1185
1186 if let Some(promise) = backpressure_promise {
1189 promise.resolve_native(&(), CanGc::from_cx(cx));
1191
1192 }
1195 }
1196
1197 pub(crate) fn handle_error(
1200 &self,
1201 cx: &mut CurrentRealm,
1202 global: &GlobalScope,
1203 port: &MessagePort,
1204 ) {
1205 let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
1207 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1208 error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
1209
1210 port.cross_realm_transform_send_error(rooted_error.handle(), CanGc::from_cx(cx));
1212
1213 self.controller
1215 .error_if_needed(cx, rooted_error.handle(), global);
1216
1217 global.disentangle_port(cx, port);
1219 }
1220}
1221
1222impl Transferable for WritableStream {
1224 type Index = MessagePortIndex;
1225 type Data = MessagePortImpl;
1226
1227 fn transfer(
1229 &self,
1230 cx: &mut js::context::JSContext,
1231 ) -> Fallible<(MessagePortId, MessagePortImpl)> {
1232 if self.is_locked() {
1235 return Err(Error::DataClone(None));
1236 }
1237
1238 let global = self.global();
1239 let mut realm = enter_auto_realm(cx, &*global);
1240 let mut realm = realm.current_realm();
1241 let cx = &mut realm;
1242
1243 let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
1245 global.track_message_port(&port_1, None);
1246
1247 let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
1249 global.track_message_port(&port_2, None);
1250
1251 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1253
1254 let readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1256
1257 readable.setup_cross_realm_transform_readable(cx, &port_1);
1259
1260 let promise = readable.pipe_to(cx, &global, self, false, false, false, None);
1262
1263 promise.set_promise_is_handled();
1265
1266 port_2.transfer(cx)
1268 }
1269
1270 fn transfer_receive(
1272 cx: &mut js::context::JSContext,
1273 owner: &GlobalScope,
1274 id: MessagePortId,
1275 port_impl: MessagePortImpl,
1276 ) -> Result<DomRoot<Self>, ()> {
1277 let value = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1280
1281 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
1288
1289 value.setup_cross_realm_transform_writable(cx, &transferred_port);
1291 Ok(value)
1292 }
1293
1294 fn serialized_storage<'a>(
1296 data: StructuredData<'a, '_>,
1297 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1298 match data {
1299 StructuredData::Reader(r) => &mut r.port_impls,
1300 StructuredData::Writer(w) => &mut w.ports,
1301 }
1302 }
1303}