1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use dom_struct::dom_struct;
12use js::jsapi::{Heap, JSObject};
13use js::jsval::{JSVal, ObjectValue, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{
16 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
17 MutableHandleValue as SafeMutableHandleValue,
18};
19use rustc_hash::FxHashMap;
20use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
21use script_bindings::conversions::SafeToJSValConvertible;
22use servo_base::id::{MessagePortId, MessagePortIndex};
23use servo_constellation_traits::MessagePortImpl;
24
25use crate::dom::bindings::cell::DomRefCell;
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
27 QueuingStrategy, QueuingStrategySize,
28};
29use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
30use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
31use crate::dom::bindings::conversions::ConversionResult;
32use crate::dom::bindings::error::{Error, Fallible};
33use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
34use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
35use crate::dom::bindings::structuredclone::StructuredData;
36use crate::dom::bindings::transferable::Transferable;
37use crate::dom::domexception::{DOMErrorName, DOMException};
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::messageport::MessagePort;
40use crate::dom::promise::Promise;
41use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
42use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
43use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
44use crate::dom::stream::writablestreamdefaultcontroller::{
45 UnderlyingSinkType, WritableStreamDefaultController,
46};
47use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
48use crate::realms::{InRealm, enter_auto_realm, enter_realm};
49use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
50
// Lets the handler be held in a `rooted!` slot (see `WritableStream::finish_erroring`).
impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}

/// Fulfillment reaction that `WritableStream::finish_erroring` attaches to the promise
/// returned by the controller's abort steps.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct AbortAlgorithmFulfillmentHandler {
    /// The stream being aborted.
    stream: Dom<WritableStream>,
    /// Promise of the pending abort request; resolved when the reaction runs.
    #[conditional_malloc_size_of]
    abort_request_promise: Rc<Promise>,
}
62
63impl Callback for AbortAlgorithmFulfillmentHandler {
64 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
65 self.abort_request_promise
67 .resolve_native(&(), CanGc::from_cx(cx));
68
69 self.stream
71 .as_rooted()
72 .reject_close_and_closed_promise_if_needed(cx);
73 }
74}
75
// Lets the handler be held in a `rooted!` slot (see `WritableStream::finish_erroring`).
impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}

/// Rejection reaction that `WritableStream::finish_erroring` attaches to the promise
/// returned by the controller's abort steps.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct AbortAlgorithmRejectionHandler {
    /// The stream being aborted.
    stream: Dom<WritableStream>,
    /// Promise of the pending abort request; rejected with the reaction's reason.
    #[conditional_malloc_size_of]
    abort_request_promise: Rc<Promise>,
}
87
88impl Callback for AbortAlgorithmRejectionHandler {
89 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
90 self.abort_request_promise
92 .reject_native(&reason, CanGc::from_cx(cx));
93
94 self.stream
96 .as_rooted()
97 .reject_close_and_closed_promise_if_needed(cx);
98 }
99}
100
// Lets a taken request be held in a `rooted!` slot while it is being settled.
impl js::gc::Rootable for PendingAbortRequest {}

/// Bookkeeping for an abort that has been requested but not yet settled.
/// Created by `WritableStream::abort` and consumed by the `finish_*` methods.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PendingAbortRequest {
    /// Promise handed back to the caller of `abort`.
    #[conditional_malloc_size_of]
    promise: Rc<Promise>,

    /// Abort reason; `abort` stores undefined here when the stream was already erroring.
    #[ignore_malloc_size_of = "mozjs"]
    reason: Box<Heap<JSVal>>,

    /// Whether the stream was in the `Erroring` state when the abort was requested.
    was_already_erroring: bool,
}
118
/// Lifecycle state of a [`WritableStream`]; a new stream starts as `Writable`.
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
pub(crate) enum WritableStreamState {
    /// Initial state.
    #[default]
    Writable,
    /// Set by `finish_in_flight_close` once the in-flight close request resolved.
    Closed,
    /// Set by `start_erroring`; an error is stored but not yet finalized.
    Erroring,
    /// Set by `finish_erroring`.
    Errored,
}
128
/// DOM object backing the `WritableStream` interface.
/// <https://streams.spec.whatwg.org/#ws-class>
#[dom_struct]
pub struct WritableStream {
    reflector_: Reflector,

    /// Backpressure flag, read and written through `get_backpressure` / `set_backpressure`.
    backpressure: Cell<bool>,

    /// Promise of a close that was requested by `close` but is not yet in flight.
    #[conditional_malloc_size_of]
    close_request: DomRefCell<Option<Rc<Promise>>>,

    /// The stream's controller, installed via `set_default_controller`.
    controller: MutNullableDom<WritableStreamDefaultController>,

    // NOTE(review): not read or written by any method in this impl beyond
    // initialization — presumably tracks transfer detachment; confirm.
    detached: Cell<bool>,

    /// Write request moved here by `mark_first_write_request_in_flight`.
    #[conditional_malloc_size_of]
    in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,

    /// Close request moved here by `mark_close_request_in_flight`.
    #[conditional_malloc_size_of]
    in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,

    /// Abort recorded by `abort` that has not been settled yet.
    pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,

    /// Current lifecycle state.
    state: Cell<WritableStreamState>,

    /// Error recorded by `start_erroring`; undefined while no error is stored.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// The writer currently attached; `is_locked` reports whether this is set.
    writer: MutNullableDom<WritableStreamDefaultWriter>,

    /// Promises of queued writes, in the order `add_write_request` created them.
    #[conditional_malloc_size_of]
    write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
}
172
173impl WritableStream {
174 fn new_inherited() -> WritableStream {
176 WritableStream {
177 reflector_: Reflector::new(),
178 backpressure: Default::default(),
179 close_request: Default::default(),
180 controller: Default::default(),
181 detached: Default::default(),
182 in_flight_write_request: Default::default(),
183 in_flight_close_request: Default::default(),
184 pending_abort_request: Default::default(),
185 state: Default::default(),
186 stored_error: Default::default(),
187 writer: Default::default(),
188 write_requests: Default::default(),
189 }
190 }
191
    /// Creates a new stream and reflects it into `global`, using `proto` as the
    /// prototype when one is supplied.
    pub(crate) fn new_with_proto(
        global: &GlobalScope,
        proto: Option<SafeHandleObject>,
        can_gc: CanGc,
    ) -> DomRoot<WritableStream> {
        reflect_dom_object_with_proto(
            Box::new(WritableStream::new_inherited()),
            global,
            proto,
            can_gc,
        )
    }
204
    /// Panics if a controller has already been installed on this stream.
    pub(crate) fn assert_no_controller(&self) {
        assert!(self.controller.get().is_none());
    }
210
211 pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
214 self.controller.set(Some(controller));
215 }
216
217 pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
218 self.controller.get().expect("Controller should be set.")
219 }
220
221 pub(crate) fn is_writable(&self) -> bool {
222 matches!(self.state.get(), WritableStreamState::Writable)
223 }
224
225 pub(crate) fn is_erroring(&self) -> bool {
226 matches!(self.state.get(), WritableStreamState::Erroring)
227 }
228
229 pub(crate) fn is_errored(&self) -> bool {
230 matches!(self.state.get(), WritableStreamState::Errored)
231 }
232
233 pub(crate) fn is_closed(&self) -> bool {
234 matches!(self.state.get(), WritableStreamState::Closed)
235 }
236
237 pub(crate) fn has_in_flight_write_request(&self) -> bool {
238 self.in_flight_write_request.borrow().is_some()
239 }
240
241 pub(crate) fn has_operations_marked_inflight(&self) -> bool {
243 let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
244 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
245
246 in_flight_write_requested || in_flight_close_requested
247 }
248
249 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
251 handle_mut.set(self.stored_error.get());
252 }
253
    /// Completes the move from `Erroring` to `Errored`: runs the controller's error
    /// steps, rejects every queued write with the stored error, and then settles
    /// any pending abort request.
    /// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
    ///
    /// # Panics
    /// Panics if the stream is not `Erroring`, if an operation is still marked in
    /// flight, or if the stream has no controller.
    pub(crate) fn finish_erroring(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
        assert!(self.is_erroring());

        assert!(!self.has_operations_marked_inflight());

        self.state.set(WritableStreamState::Errored);

        let Some(controller) = self.controller.get() else {
            unreachable!("Stream should have a controller.");
        };
        controller.perform_error_steps();

        rooted!(&in(cx) let mut stored_error = UndefinedValue());
        self.get_stored_error(stored_error.handle_mut());

        // Move the queue out first so the `write_requests` borrow is released
        // before any promise is rejected.
        let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
        for request in write_requests {
            request.reject(cx.into(), stored_error.handle(), CanGc::from_cx(cx));
        }

        // No abort was requested: only the close/closed promises remain to be rejected.
        if self.pending_abort_request.borrow().is_none() {
            self.reject_close_and_closed_promise_if_needed(cx);

            return;
        }

        // Take the abort request out of the stream; it is settled exactly once below.
        rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
        if let Some(pending_abort_request) = &*pending_abort_request {
            // The abort was requested while already erroring: it fails with the stored error
            // and the controller's abort steps are not run.
            if pending_abort_request.was_already_erroring {
                pending_abort_request.promise.reject(
                    cx.into(),
                    stored_error.handle(),
                    CanGc::from_cx(cx),
                );

                self.reject_close_and_closed_promise_if_needed(cx);

                return;
            }

            // Otherwise run the controller's abort steps with the recorded reason.
            rooted!(&in(cx) let mut reason = UndefinedValue());
            reason.set(pending_abort_request.reason.get());
            let promise = controller.abort_steps(cx, global, reason.handle());

            // Both reactions settle the abort request's promise and then reject the
            // close/closed promises (see the two handler types' `callback`).
            rooted!(&in(cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
                stream: Dom::from_ref(self),
                abort_request_promise: pending_abort_request.promise.clone(),
            }));

            rooted!(&in(cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
                stream: Dom::from_ref(self),
                abort_request_promise: pending_abort_request.promise.clone(),
            }));

            // The handlers are moved out of their rooted slots and boxed for the native handler.
            let handler = PromiseNativeHandler::new(
                global,
                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
                rejection_handler.take().map(|h| Box::new(h) as Box<_>),
                CanGc::from_cx(cx),
            );
            let realm = enter_realm(global);
            let comp = InRealm::Entered(&realm);
            promise.append_native_handler(&handler, comp, CanGc::from_cx(cx));
        }
    }
342
343 fn reject_close_and_closed_promise_if_needed(&self, cx: &mut js::context::JSContext) {
345 assert!(self.is_errored());
347
348 rooted!(&in(cx) let mut stored_error = UndefinedValue());
349 self.get_stored_error(stored_error.handle_mut());
350
351 let close_request = self.close_request.borrow_mut().take();
353 if let Some(close_request) = close_request {
354 assert!(self.in_flight_close_request.borrow().is_none());
356
357 close_request.reject_native(&stored_error.handle(), CanGc::from_cx(cx))
359
360 }
363
364 if let Some(writer) = self.writer.get() {
367 writer.reject_closed_promise_with_stored_error(cx, &stored_error.handle());
369
370 writer.set_close_promise_is_handled();
372 }
373 }
374
375 pub(crate) fn close_queued_or_in_flight(&self) -> bool {
377 let close_requested = self.close_request.borrow().is_some();
378 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
379
380 close_requested || in_flight_close_requested
381 }
382
383 pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
385 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
386 unreachable!("Stream should have a write request");
388 };
389
390 in_flight_write_request.resolve_native(&(), can_gc);
392
393 }
396
397 pub(crate) fn start_erroring(
399 &self,
400 cx: &mut js::context::JSContext,
401 global: &GlobalScope,
402 error: SafeHandleValue,
403 ) {
404 assert!(self.stored_error.get().is_undefined());
406
407 assert!(self.is_writable());
409
410 let Some(controller) = self.controller.get() else {
412 unreachable!("Stream should have a controller.");
414 };
415
416 self.state.set(WritableStreamState::Erroring);
418
419 self.stored_error.set(*error);
421
422 if let Some(writer) = self.writer.get() {
424 writer.ensure_ready_promise_rejected(global, error, CanGc::from_cx(cx));
426 }
427
428 if !self.has_operations_marked_inflight() && controller.started() {
430 self.finish_erroring(cx, global);
432 }
433 }
434
435 pub(crate) fn deal_with_rejection(
437 &self,
438 cx: &mut js::context::JSContext,
439 global: &GlobalScope,
440 error: SafeHandleValue,
441 ) {
442 if self.is_writable() {
446 self.start_erroring(cx, global, error);
448
449 return;
451 }
452
453 assert!(self.is_erroring());
455
456 self.finish_erroring(cx, global);
458 }
459
460 pub(crate) fn mark_first_write_request_in_flight(&self) {
462 let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
463 let mut write_requests = self.write_requests.borrow_mut();
464
465 assert!(in_flight_write_request.is_none());
467
468 assert!(!write_requests.is_empty());
470
471 let write_request = write_requests.pop_front().unwrap();
474
475 *in_flight_write_request = Some(write_request);
477 }
478
479 pub(crate) fn mark_close_request_in_flight(&self) {
481 let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
482 let mut close_request = self.close_request.borrow_mut();
483
484 assert!(in_flight_close_request.is_none());
486
487 assert!(close_request.is_some());
489
490 let close_request = close_request.take().unwrap();
493
494 *in_flight_close_request = Some(close_request);
496 }
497
498 pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
500 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
501 unreachable!("in_flight_close_request must be Some");
503 };
504
505 in_flight_close_request.resolve_native(&(), can_gc);
507
508 assert!(self.is_writable() || self.is_erroring());
513
514 if self.is_erroring() {
516 self.stored_error.set(UndefinedValue());
518
519 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
521 if let Some(pending_abort_request) = &*pending_abort_request {
522 pending_abort_request.promise.resolve_native(&(), can_gc);
524
525 }
528 }
529
530 self.state.set(WritableStreamState::Closed);
532
533 if let Some(writer) = self.writer.get() {
535 writer.resolve_closed_promise_with_undefined(can_gc);
538 }
539
540 assert!(self.pending_abort_request.borrow().is_none());
542
543 assert!(self.stored_error.get().is_undefined());
545 }
546
547 pub(crate) fn finish_in_flight_close_with_error(
549 &self,
550 cx: &mut js::context::JSContext,
551 global: &GlobalScope,
552 error: SafeHandleValue,
553 ) {
554 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
555 unreachable!("Inflight close request must be defined.");
557 };
558
559 in_flight_close_request.reject_native(&error, CanGc::from_cx(cx));
561
562 assert!(self.is_erroring() || self.is_writable());
567
568 rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
570 if let Some(pending_abort_request) = &*pending_abort_request {
571 pending_abort_request
573 .promise
574 .reject_native(&error, CanGc::from_cx(cx));
575
576 }
579
580 self.deal_with_rejection(cx, global, error);
582 }
583
584 pub(crate) fn finish_in_flight_write_with_error(
586 &self,
587 cx: &mut js::context::JSContext,
588 global: &GlobalScope,
589 error: SafeHandleValue,
590 ) {
591 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
592 unreachable!("Inflight write request must be defined.");
594 };
595
596 in_flight_write_request.reject_native(&error, CanGc::from_cx(cx));
598
599 assert!(self.is_erroring() || self.is_writable());
604
605 self.deal_with_rejection(cx, global, error);
607 }
608
    /// Returns the writer currently attached to the stream, if any.
    pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
        self.writer.get()
    }
612
    /// Attaches `writer` to the stream, or detaches the current one when `None`.
    pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
        self.writer.set(writer);
    }
616
    /// Overwrites the backpressure flag without touching the writer's ready promise
    /// (use `update_backpressure` for that).
    pub(crate) fn set_backpressure(&self, backpressure: bool) {
        self.backpressure.set(backpressure);
    }
620
    /// Returns the current backpressure flag.
    pub(crate) fn get_backpressure(&self) -> bool {
        self.backpressure.get()
    }
624
625 pub(crate) fn is_locked(&self) -> bool {
627 self.get_writer().is_some()
630 }
631
632 pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
634 assert!(self.is_locked());
636
637 assert!(self.is_writable());
639
640 let promise = Promise::new(global, can_gc);
642
643 self.write_requests.borrow_mut().push_back(promise.clone());
645
646 promise
648 }
649
    /// Returns the stream's controller, or `None` if none has been installed.
    /// Unlike `get_default_controller`, this does not panic.
    pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
        self.controller.get()
    }
654
    /// Requests an abort of the stream and returns the promise tracking it.
    /// <https://streams.spec.whatwg.org/#writable-stream-abort>
    ///
    /// Returns an already-resolved promise when the stream is `Closed` or `Errored`,
    /// and the existing promise when an abort is already pending.
    ///
    /// # Panics
    /// Panics if the stream has no controller.
    pub(crate) fn abort(
        &self,
        cx: &mut CurrentRealm,
        global: &GlobalScope,
        provided_reason: SafeHandleValue,
    ) -> Rc<Promise> {
        // Nothing to abort on a stream that already closed or errored.
        if self.is_closed() || self.is_errored() {
            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
        }

        self.get_controller()
            .expect("Stream must have a controller.")
            .signal_abort(cx, provided_reason);

        // The state is deliberately read again after `signal_abort` —
        // presumably because signalling can run script that changes it; confirm.
        let state = self.state.get();

        if matches!(
            state,
            WritableStreamState::Closed | WritableStreamState::Errored
        ) {
            return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
        }

        // A second abort shares the promise of the one already pending.
        if self.pending_abort_request.borrow().is_some() {
            return self
                .pending_abort_request
                .borrow()
                .as_ref()
                .expect("Pending abort request must be Some.")
                .promise
                .clone();
        }

        assert!(self.is_writable() || self.is_erroring());

        let mut was_already_erroring = false;
        rooted!(&in(cx) let undefined_reason = UndefinedValue());

        // When the stream is already erroring, the caller's reason is replaced by undefined.
        let reason = if self.is_erroring() {
            was_already_erroring = true;

            undefined_reason.handle()
        } else {
            provided_reason
        };

        let promise = Promise::new2(cx, global);

        // Record the request before erroring starts: `start_erroring` may call
        // `finish_erroring`, which consumes it.
        *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
            promise: promise.clone(),
            reason: Heap::boxed(reason.get()),
            was_already_erroring,
        });

        if !was_already_erroring {
            self.start_erroring(cx, global, reason);
        }

        promise
    }
737
738 pub(crate) fn close(
740 &self,
741 cx: &mut js::context::JSContext,
742 global: &GlobalScope,
743 ) -> Rc<Promise> {
744 if self.is_closed() || self.is_errored() {
747 let promise = Promise::new2(cx, global);
749 promise.reject_error(
750 Error::Type(c"Stream is closed or errored.".to_owned()),
751 CanGc::from_cx(cx),
752 );
753 return promise;
754 }
755
756 assert!(self.is_writable() || self.is_erroring());
758
759 assert!(!self.close_queued_or_in_flight());
761
762 let promise = Promise::new2(cx, global);
764
765 *self.close_request.borrow_mut() = Some(promise.clone());
767
768 if let Some(writer) = self.writer.get() {
771 if self.get_backpressure() && self.is_writable() {
774 writer.resolve_ready_promise_with_undefined(CanGc::from_cx(cx));
776 }
777 }
778
779 let Some(controller) = self.controller.get() else {
781 unreachable!("Stream must have a controller.");
782 };
783 controller.close(cx, global);
784
785 promise
787 }
788
789 pub(crate) fn get_desired_size(&self) -> Option<f64> {
792 if self.is_errored() || self.is_erroring() {
798 return None;
799 }
800
801 if self.is_closed() {
803 return Some(0.);
804 }
805
806 let Some(controller) = self.controller.get() else {
807 unreachable!("Stream must have a controller.");
808 };
809 Some(controller.get_desired_size())
810 }
811
812 pub(crate) fn aquire_default_writer(
814 &self,
815 cx: SafeJSContext,
816 global: &GlobalScope,
817 can_gc: CanGc,
818 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
819 let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
821
822 writer.setup(cx, self, can_gc)?;
824
825 Ok(writer)
827 }
828
829 pub(crate) fn update_backpressure(
831 &self,
832 backpressure: bool,
833 global: &GlobalScope,
834 can_gc: CanGc,
835 ) {
836 self.is_writable();
838
839 assert!(!self.close_queued_or_in_flight());
841
842 let writer = self.get_writer();
844
845 if let Some(writer) = writer {
846 if backpressure != self.get_backpressure() {
848 if backpressure {
850 let promise = Promise::new(global, can_gc);
852 writer.set_ready_promise(promise);
853 } else {
854 assert!(!backpressure);
857 writer.resolve_ready_promise_with_undefined(can_gc);
859 }
860 }
861 }
862
863 self.set_backpressure(backpressure);
865 }
866
    /// Sets this stream up as the writable end of a cross-realm transform that
    /// communicates over `port`.
    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
    ///
    /// # Panics
    /// Panics if the controller's `setup` returns an error.
    pub(crate) fn setup_cross_realm_transform_writable(
        &self,
        cx: &mut js::context::JSContext,
        port: &MessagePort,
    ) {
        let port_id = port.message_port_id();
        let global = self.global();

        // Size algorithm derived from a default (empty) queuing strategy.
        let size_algorithm =
            extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));

        // Shared between the controller's sink and the message handler registered below;
        // `CrossRealmTransformWritable::handle_message` takes and resolves it.
        let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(
            &global,
            CanGc::from_cx(cx),
        ))));

        // Controller with a `Transfer` sink and a high water mark of 1.0.
        let controller = WritableStreamDefaultController::new(
            &global,
            UnderlyingSinkType::Transfer {
                backpressure_promise: backpressure_promise.clone(),
                port: Dom::from_ref(port),
            },
            1.0,
            size_algorithm,
            CanGc::from_cx(cx),
        );

        // Register the handler state with the global, keyed by the port's id.
        rooted!(&in(cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
            controller: Dom::from_ref(&controller),
            backpressure_promise,
        });
        global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);

        port.Start(cx);

        controller
            .setup(cx.into(), &global, self, CanGc::from_cx(cx))
            .expect("Setup for transfer cannot fail");
    }
920 #[allow(clippy::too_many_arguments)]
922 pub(crate) fn setup_from_underlying_sink(
923 &self,
924 cx: SafeJSContext,
925 global: &GlobalScope,
926 stream: &WritableStream,
927 underlying_sink_obj: SafeHandleObject,
928 underlying_sink: &UnderlyingSink,
929 strategy_hwm: f64,
930 strategy_size: Rc<QueuingStrategySize>,
931 can_gc: CanGc,
932 ) -> Result<(), Error> {
933 let controller = WritableStreamDefaultController::new(
959 global,
960 UnderlyingSinkType::new_js(
961 underlying_sink.abort.clone(),
962 underlying_sink.start.clone(),
963 underlying_sink.close.clone(),
964 underlying_sink.write.clone(),
965 ),
966 strategy_hwm,
967 strategy_size,
968 can_gc,
969 );
970
971 controller.set_underlying_sink_this_object(underlying_sink_obj);
974
975 controller.setup(cx, global, stream, can_gc)
977 }
978}
979
980#[cfg_attr(crown, expect(crown::unrooted_must_root))]
982pub(crate) fn create_writable_stream(
983 cx: SafeJSContext,
984 global: &GlobalScope,
985 writable_high_water_mark: f64,
986 writable_size_algorithm: Rc<QueuingStrategySize>,
987 underlying_sink_type: UnderlyingSinkType,
988 can_gc: CanGc,
989) -> Fallible<DomRoot<WritableStream>> {
990 assert!(writable_high_water_mark >= 0.0);
992
993 let stream = WritableStream::new_with_proto(global, None, can_gc);
996
997 let controller = WritableStreamDefaultController::new(
999 global,
1000 underlying_sink_type,
1001 writable_high_water_mark,
1002 writable_size_algorithm,
1003 can_gc,
1004 );
1005
1006 controller.setup(cx, global, &stream, can_gc)?;
1009
1010 Ok(stream)
1012}
1013
1014impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
    /// `new WritableStream(underlyingSink, strategy)`.
    /// <https://streams.spec.whatwg.org/#ws-constructor>
    ///
    /// # Errors
    /// - `Error::Type` when the sink object fails dictionary conversion.
    /// - `Error::JSFailed` when the conversion itself returns an error.
    /// - `Error::Range` when the sink dictionary's `type` member is not undefined.
    /// - Any error from extracting the high water mark or from controller setup.
    fn Constructor(
        cx: SafeJSContext,
        global: &GlobalScope,
        proto: Option<SafeHandleObject>,
        can_gc: CanGc,
        underlying_sink: Option<*mut JSObject>,
        strategy: &QueuingStrategy,
    ) -> Fallible<DomRoot<WritableStream>> {
        // A missing sink is represented by a null object.
        rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));

        // Convert the sink object to its dictionary form; a null sink yields the empty dictionary.
        let underlying_sink_dict = if !underlying_sink_obj.is_null() {
            rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
            match UnderlyingSink::new(cx, obj_val.handle(), can_gc) {
                Ok(ConversionResult::Success(val)) => val,
                Ok(ConversionResult::Failure(error)) => {
                    return Err(Error::Type(error.into_owned()));
                },
                _ => {
                    return Err(Error::JSFailed);
                },
            }
        } else {
            UnderlyingSink::empty()
        };

        // Any value other than undefined for `type` is rejected.
        if !underlying_sink_dict.type_.handle().is_undefined() {
            return Err(Error::Range(c"type is set".to_owned()));
        }

        let stream = WritableStream::new_with_proto(global, proto, can_gc);

        let size_algorithm = extract_size_algorithm(strategy, can_gc);

        // 1.0 is the default passed to the high-water-mark extraction.
        let high_water_mark = extract_high_water_mark(strategy, 1.0)?;

        stream.setup_from_underlying_sink(
            cx,
            global,
            &stream,
            underlying_sink_obj.handle(),
            &underlying_sink_dict,
            high_water_mark,
            size_algorithm,
            can_gc,
        )?;

        Ok(stream)
    }
1073
1074 fn Locked(&self) -> bool {
1076 self.is_locked()
1078 }
1079
1080 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
1082 let global = GlobalScope::from_current_realm(cx);
1083
1084 if self.is_locked() {
1086 let promise = Promise::new2(cx, &global);
1088 promise.reject_error(
1089 Error::Type(c"Stream is locked.".to_owned()),
1090 CanGc::from_cx(cx),
1091 );
1092 return promise;
1093 }
1094
1095 self.abort(cx, &global, reason)
1097 }
1098
1099 fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
1101 let global = GlobalScope::from_current_realm(cx);
1102
1103 if self.is_locked() {
1105 let promise = Promise::new2(cx, &global);
1107 promise.reject_error(
1108 Error::Type(c"Stream is locked.".to_owned()),
1109 CanGc::from_cx(cx),
1110 );
1111 return promise;
1112 }
1113
1114 if self.close_queued_or_in_flight() {
1116 let promise = Promise::new2(cx, &global);
1118 promise.reject_error(
1119 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
1120 CanGc::from_cx(cx),
1121 );
1122 return promise;
1123 }
1124
1125 self.close(cx, &global)
1127 }
1128
1129 fn GetWriter(
1131 &self,
1132 realm: InRealm,
1133 can_gc: CanGc,
1134 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1135 let cx = GlobalScope::get_cx();
1136 let global = GlobalScope::from_safe_context(cx, realm);
1137
1138 self.aquire_default_writer(cx, &global, can_gc)
1140 }
1141}
1142
// Lets the value be held in a `rooted!` slot
// (see `WritableStream::setup_cross_realm_transform_writable`).
impl js::gc::Rootable for CrossRealmTransformWritable {}

/// State needed to react to port messages for the writable end of a cross-realm
/// transform; registered with the global in
/// `WritableStream::setup_cross_realm_transform_writable`.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformWritable {
    /// Controller of the transferred stream.
    controller: Dom<WritableStreamDefaultController>,

    /// Backpressure promise shared with the controller's `Transfer` sink;
    /// `None` once `handle_message` has taken and resolved it.
    #[ignore_malloc_size_of = "nested Rc"]
    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
}
1158
1159impl CrossRealmTransformWritable {
    /// Handles a message received on the port of the cross-realm transform.
    ///
    /// A message of type `"error"` errors the controller (if needed) with the
    /// message's value. For every message, the backpressure promise is then taken
    /// and resolved if it is still set.
    #[expect(unsafe_code)]
    pub(crate) fn handle_message(
        &self,
        cx: &mut CurrentRealm,
        global: &GlobalScope,
        message: SafeHandleValue,
    ) {
        rooted!(&in(cx) let mut value = UndefinedValue());
        // NOTE(review): the safety contract of `get_type_and_value_from_message`
        // is not visible here — confirm which invariants this call relies on.
        let type_string = unsafe {
            get_type_and_value_from_message(
                cx.into(),
                message,
                value.handle_mut(),
                CanGc::from_cx(cx),
            )
        };

        if type_string == "error" {
            self.controller.error_if_needed(cx, value.handle(), global);
        }

        // Take the promise out first so the `RefCell` borrow ends before resolving.
        let backpressure_promise = self.backpressure_promise.borrow_mut().take();

        // Runs regardless of the message type checked above.
        if let Some(promise) = backpressure_promise {
            promise.resolve_native(&(), CanGc::from_cx(cx));

        }
    }
1200
1201 pub(crate) fn handle_error(
1204 &self,
1205 cx: &mut CurrentRealm,
1206 global: &GlobalScope,
1207 port: &MessagePort,
1208 ) {
1209 let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
1211 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1212 error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
1213
1214 port.cross_realm_transform_send_error(rooted_error.handle(), CanGc::from_cx(cx));
1216
1217 self.controller
1219 .error_if_needed(cx, rooted_error.handle(), global);
1220
1221 global.disentangle_port(port, CanGc::from_cx(cx));
1223 }
1224}
1225
1226impl Transferable for WritableStream {
    /// A transferred stream is identified by a message-port index.
    type Index = MessagePortIndex;
    /// The transferred payload is the implementation of a message port
    /// (see `transfer`, which returns the result of transferring a port).
    type Data = MessagePortImpl;
1230
    /// Transfers the stream: creates an entangled port pair, pipes a cross-realm
    /// readable (fed by the first port) into this stream, and returns the result
    /// of transferring the second port.
    /// <https://streams.spec.whatwg.org/#ws-transfer>
    ///
    /// # Errors
    /// Returns `Error::DataClone` when the stream is locked; otherwise propagates
    /// the result of transferring the second port.
    fn transfer(
        &self,
        cx: &mut js::context::JSContext,
    ) -> Fallible<(MessagePortId, MessagePortImpl)> {
        // A locked stream cannot be transferred.
        if self.is_locked() {
            return Err(Error::DataClone(None));
        }

        // Everything below runs inside the realm of this stream's global;
        // `cx` is shadowed by the realm-bound context.
        let global = self.global();
        let mut realm = enter_auto_realm(cx, &*global);
        let mut realm = realm.current_realm();
        let cx = &mut realm;

        let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
        global.track_message_port(&port_1, None);

        let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
        global.track_message_port(&port_2, None);

        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());

        let readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));

        readable.setup_cross_realm_transform_readable(cx, &port_1);

        // Forward everything arriving on `port_1` into this stream.
        let promise = readable.pipe_to(cx, &global, self, false, false, false, None);

        // The pipe's promise is never observed, so mark it handled.
        promise.set_promise_is_handled();

        port_2.transfer(cx)
    }
1273
1274 fn transfer_receive(
1276 cx: &mut js::context::JSContext,
1277 owner: &GlobalScope,
1278 id: MessagePortId,
1279 port_impl: MessagePortImpl,
1280 ) -> Result<DomRoot<Self>, ()> {
1281 let value = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1284
1285 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
1292
1293 value.setup_cross_realm_transform_writable(cx, &transferred_port);
1295 Ok(value)
1296 }
1297
    /// Selects where transferred port implementations are stored: `port_impls`
    /// on a structured-clone reader, `ports` on a writer.
    fn serialized_storage<'a>(
        data: StructuredData<'a, '_>,
    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
        match data {
            StructuredData::Reader(r) => &mut r.port_impls,
            StructuredData::Writer(w) => &mut w.ports,
        }
    }
1307}