1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19 MutableHandleValue as SafeMutableHandleValue,
20};
21use rustc_hash::FxHashMap;
22use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::cell::DomRefCell;
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
27 QueuingStrategy, QueuingStrategySize,
28};
29use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
30use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
31use crate::dom::bindings::conversions::ConversionResult;
32use crate::dom::bindings::error::{Error, Fallible};
33use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
34use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
35use crate::dom::bindings::structuredclone::StructuredData;
36use crate::dom::bindings::transferable::Transferable;
37use crate::dom::domexception::{DOMErrorName, DOMException};
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::messageport::MessagePort;
40use crate::dom::promise::Promise;
41use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
42use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
43use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
44use crate::dom::stream::writablestreamdefaultcontroller::{
45 UnderlyingSinkType, WritableStreamDefaultController,
46};
47use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
48use crate::realms::{InRealm, enter_auto_realm, enter_realm};
49use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
50
// Opts the handler into `js::gc::Rootable`; `WritableStream::finish_erroring` keeps it in a
// `rooted!` slot until it is boxed into a `PromiseNativeHandler`.
impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}

/// Fulfillment reaction that `WritableStream::finish_erroring` attaches to the promise
/// returned by the controller's abort steps.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct AbortAlgorithmFulfillmentHandler {
    /// The stream on which the abort was requested.
    stream: Dom<WritableStream>,
    /// Promise of the pending abort request; resolved when the callback runs.
    #[conditional_malloc_size_of]
    abort_request_promise: Rc<Promise>,
}
62
63impl Callback for AbortAlgorithmFulfillmentHandler {
64 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
65 let can_gc = CanGc::from_cx(cx);
66 let cx: SafeJSContext = cx.into();
67 self.abort_request_promise.resolve_native(&(), can_gc);
69
70 self.stream
72 .as_rooted()
73 .reject_close_and_closed_promise_if_needed(cx, can_gc);
74 }
75}
76
// Opts the handler into `js::gc::Rootable`; `WritableStream::finish_erroring` keeps it in a
// `rooted!` slot until it is boxed into a `PromiseNativeHandler`.
impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}

/// Rejection reaction that `WritableStream::finish_erroring` attaches to the promise
/// returned by the controller's abort steps.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct AbortAlgorithmRejectionHandler {
    /// The stream on which the abort was requested.
    stream: Dom<WritableStream>,
    /// Promise of the pending abort request; rejected with the reason when the callback runs.
    #[conditional_malloc_size_of]
    abort_request_promise: Rc<Promise>,
}
88
89impl Callback for AbortAlgorithmRejectionHandler {
90 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
91 let can_gc = CanGc::from_cx(cx);
92 let cx: SafeJSContext = cx.into();
93 self.abort_request_promise.reject_native(&reason, can_gc);
95
96 self.stream
98 .as_rooted()
99 .reject_close_and_closed_promise_if_needed(cx, can_gc);
100 }
101}
102
// Opts the request into `js::gc::Rootable`; the stream moves it into a `rooted!` slot when
// it takes it out of `pending_abort_request`.
impl js::gc::Rootable for PendingAbortRequest {}

/// An abort that was requested through `WritableStream::abort` and has not been settled yet.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PendingAbortRequest {
    /// Promise returned to the caller of `abort`.
    #[conditional_malloc_size_of]
    promise: Rc<Promise>,

    /// Reason later handed to the controller's abort steps. `abort` stores `undefined`
    /// here when the stream was already erroring.
    #[ignore_malloc_size_of = "mozjs"]
    reason: Box<Heap<JSVal>>,

    /// `true` when the stream was already in the `Erroring` state at the time of the request.
    was_already_erroring: bool,
}
120
/// Lifecycle state of a `WritableStream`.
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
pub(crate) enum WritableStreamState {
    /// Initial state of a newly created stream.
    #[default]
    Writable,
    /// Entered by `finish_in_flight_close`.
    Closed,
    /// Entered by `start_erroring`; an error is stored but `finish_erroring` has not run yet.
    Erroring,
    /// Entered by `finish_erroring`.
    Errored,
}
130
/// DOM object backing a writable stream
/// (<https://streams.spec.whatwg.org/#ws-class>).
#[dom_struct]
pub struct WritableStream {
    reflector_: Reflector,

    /// Backpressure flag; read and written through `get_backpressure` / `set_backpressure`.
    backpressure: Cell<bool>,

    /// Promise created by `close`, waiting to be moved in flight.
    #[conditional_malloc_size_of]
    close_request: DomRefCell<Option<Rc<Promise>>>,

    /// Controller attached via `set_default_controller`.
    controller: MutNullableDom<WritableStreamDefaultController>,

    /// NOTE(review): not read or written anywhere in this section; presumably mirrors the
    /// spec's [[Detached]] slot — confirm.
    detached: Cell<bool>,

    /// Write request currently being processed, if any.
    #[conditional_malloc_size_of]
    in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,

    /// Close request currently being processed, if any.
    #[conditional_malloc_size_of]
    in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,

    /// Abort recorded by `abort` that has not been settled yet.
    pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,

    /// Current lifecycle state; starts as `Writable`.
    state: Cell<WritableStreamState>,

    /// Error recorded by `start_erroring`; reset to `undefined` by `finish_in_flight_close`.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// Writer currently attached to the stream; the stream counts as locked while this is set.
    writer: MutNullableDom<WritableStreamDefaultWriter>,

    /// Promises of queued writes, oldest first.
    #[conditional_malloc_size_of]
    write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
}
174
175impl WritableStream {
176 fn new_inherited() -> WritableStream {
178 WritableStream {
179 reflector_: Reflector::new(),
180 backpressure: Default::default(),
181 close_request: Default::default(),
182 controller: Default::default(),
183 detached: Default::default(),
184 in_flight_write_request: Default::default(),
185 in_flight_close_request: Default::default(),
186 pending_abort_request: Default::default(),
187 state: Default::default(),
188 stored_error: Default::default(),
189 writer: Default::default(),
190 write_requests: Default::default(),
191 }
192 }
193
    /// Creates a stream in its default state and reflects it into `global`, using `proto`
    /// as the prototype when one is given.
    pub(crate) fn new_with_proto(
        global: &GlobalScope,
        proto: Option<SafeHandleObject>,
        can_gc: CanGc,
    ) -> DomRoot<WritableStream> {
        // The boxed value is passed straight through rather than bound to a local.
        reflect_dom_object_with_proto(
            Box::new(WritableStream::new_inherited()),
            global,
            proto,
            can_gc,
        )
    }
206
207 pub(crate) fn assert_no_controller(&self) {
210 assert!(self.controller.get().is_none());
211 }
212
    /// Attaches `controller` to this stream, replacing whatever was stored before.
    pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
        self.controller.set(Some(controller));
    }
218
219 pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
220 self.controller.get().expect("Controller should be set.")
221 }
222
223 pub(crate) fn is_writable(&self) -> bool {
224 matches!(self.state.get(), WritableStreamState::Writable)
225 }
226
227 pub(crate) fn is_erroring(&self) -> bool {
228 matches!(self.state.get(), WritableStreamState::Erroring)
229 }
230
231 pub(crate) fn is_errored(&self) -> bool {
232 matches!(self.state.get(), WritableStreamState::Errored)
233 }
234
235 pub(crate) fn is_closed(&self) -> bool {
236 matches!(self.state.get(), WritableStreamState::Closed)
237 }
238
239 pub(crate) fn has_in_flight_write_request(&self) -> bool {
240 self.in_flight_write_request.borrow().is_some()
241 }
242
243 pub(crate) fn has_operations_marked_inflight(&self) -> bool {
245 let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
246 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
247
248 in_flight_write_requested || in_flight_close_requested
249 }
250
251 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
253 handle_mut.set(self.stored_error.get());
254 }
255
    /// <https://streams.spec.whatwg.org/#writable-stream-finish-erroring>
    ///
    /// Moves the stream from `Erroring` to `Errored`, rejects every queued write with the
    /// stored error, and then settles or starts processing the pending abort request.
    ///
    /// # Panics
    /// Panics if the stream is not erroring, if an operation is still in flight, or if no
    /// controller is attached.
    pub(crate) fn finish_erroring(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
        assert!(self.is_erroring());

        assert!(!self.has_operations_marked_inflight());

        self.state.set(WritableStreamState::Errored);

        let Some(controller) = self.controller.get() else {
            unreachable!("Stream should have a controller.");
        };
        controller.perform_error_steps();

        rooted!(in(*cx) let mut stored_error = UndefinedValue());
        self.get_stored_error(stored_error.handle_mut());

        // Empty the queue first so the `write_requests` borrow is released before any
        // promise is rejected.
        let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
        for request in write_requests {
            request.reject(cx, stored_error.handle(), can_gc);
        }

        // No abort was requested: only the close request and closed promise are left.
        if self.pending_abort_request.borrow().is_none() {
            self.reject_close_and_closed_promise_if_needed(cx, can_gc);

            return;
        }

        // Take the request out of the stream and keep it rooted while it is used.
        rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
        if let Some(pending_abort_request) = &*pending_abort_request {
            // The stream was already erroring when abort was called: the abort promise
            // is rejected with the stored error and the abort steps are not run.
            if pending_abort_request.was_already_erroring {
                pending_abort_request
                    .promise
                    .reject(cx, stored_error.handle(), can_gc);

                self.reject_close_and_closed_promise_if_needed(cx, can_gc);

                return;
            }

            // Run the controller's abort steps with the recorded reason.
            rooted!(in(*cx) let mut reason = UndefinedValue());
            reason.set(pending_abort_request.reason.get());
            let promise = controller.abort_steps(cx, global, reason.handle(), can_gc);

            // Both reactions settle the abort promise and then reject the close request
            // and closed promise (see the two handler `Callback` impls).
            rooted!(in(*cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
                stream: Dom::from_ref(self),
                abort_request_promise: pending_abort_request.promise.clone(),
            }));

            rooted!(in(*cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
                stream: Dom::from_ref(self),
                abort_request_promise: pending_abort_request.promise.clone(),
            }));

            let handler = PromiseNativeHandler::new(
                global,
                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
                rejection_handler.take().map(|h| Box::new(h) as Box<_>),
                can_gc,
            );
            let realm = enter_realm(global);
            let comp = InRealm::Entered(&realm);
            promise.append_native_handler(&handler, comp, can_gc);
        }
    }
342
343 fn reject_close_and_closed_promise_if_needed(&self, cx: SafeJSContext, can_gc: CanGc) {
345 assert!(self.is_errored());
347
348 rooted!(in(*cx) let mut stored_error = UndefinedValue());
349 self.get_stored_error(stored_error.handle_mut());
350
351 let close_request = self.close_request.borrow_mut().take();
353 if let Some(close_request) = close_request {
354 assert!(self.in_flight_close_request.borrow().is_none());
356
357 close_request.reject_native(&stored_error.handle(), can_gc)
359
360 }
363
364 if let Some(writer) = self.writer.get() {
367 writer.reject_closed_promise_with_stored_error(&stored_error.handle(), can_gc);
369
370 writer.set_close_promise_is_handled();
372 }
373 }
374
375 pub(crate) fn close_queued_or_in_flight(&self) -> bool {
377 let close_requested = self.close_request.borrow().is_some();
378 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
379
380 close_requested || in_flight_close_requested
381 }
382
383 pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
385 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
386 unreachable!("Stream should have a write request");
388 };
389
390 in_flight_write_request.resolve_native(&(), can_gc);
392
393 }
396
397 pub(crate) fn start_erroring(
399 &self,
400 cx: SafeJSContext,
401 global: &GlobalScope,
402 error: SafeHandleValue,
403 can_gc: CanGc,
404 ) {
405 assert!(self.stored_error.get().is_undefined());
407
408 assert!(self.is_writable());
410
411 let Some(controller) = self.controller.get() else {
413 unreachable!("Stream should have a controller.");
415 };
416
417 self.state.set(WritableStreamState::Erroring);
419
420 self.stored_error.set(*error);
422
423 if let Some(writer) = self.writer.get() {
425 writer.ensure_ready_promise_rejected(global, error, can_gc);
427 }
428
429 if !self.has_operations_marked_inflight() && controller.started() {
431 self.finish_erroring(cx, global, can_gc);
433 }
434 }
435
436 pub(crate) fn deal_with_rejection(
438 &self,
439 cx: SafeJSContext,
440 global: &GlobalScope,
441 error: SafeHandleValue,
442 can_gc: CanGc,
443 ) {
444 if self.is_writable() {
448 self.start_erroring(cx, global, error, can_gc);
450
451 return;
453 }
454
455 assert!(self.is_erroring());
457
458 self.finish_erroring(cx, global, can_gc);
460 }
461
462 pub(crate) fn mark_first_write_request_in_flight(&self) {
464 let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
465 let mut write_requests = self.write_requests.borrow_mut();
466
467 assert!(in_flight_write_request.is_none());
469
470 assert!(!write_requests.is_empty());
472
473 let write_request = write_requests.pop_front().unwrap();
476
477 *in_flight_write_request = Some(write_request);
479 }
480
481 pub(crate) fn mark_close_request_in_flight(&self) {
483 let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
484 let mut close_request = self.close_request.borrow_mut();
485
486 assert!(in_flight_close_request.is_none());
488
489 assert!(close_request.is_some());
491
492 let close_request = close_request.take().unwrap();
495
496 *in_flight_close_request = Some(close_request);
498 }
499
500 pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
502 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
503 unreachable!("in_flight_close_request must be Some");
505 };
506
507 in_flight_close_request.resolve_native(&(), can_gc);
509
510 assert!(self.is_writable() || self.is_erroring());
515
516 if self.is_erroring() {
518 self.stored_error.set(UndefinedValue());
520
521 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
523 if let Some(pending_abort_request) = &*pending_abort_request {
524 pending_abort_request.promise.resolve_native(&(), can_gc);
526
527 }
530 }
531
532 self.state.set(WritableStreamState::Closed);
534
535 if let Some(writer) = self.writer.get() {
537 writer.resolve_closed_promise_with_undefined(can_gc);
540 }
541
542 assert!(self.pending_abort_request.borrow().is_none());
544
545 assert!(self.stored_error.get().is_undefined());
547 }
548
549 pub(crate) fn finish_in_flight_close_with_error(
551 &self,
552 cx: SafeJSContext,
553 global: &GlobalScope,
554 error: SafeHandleValue,
555 can_gc: CanGc,
556 ) {
557 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
558 unreachable!("Inflight close request must be defined.");
560 };
561
562 in_flight_close_request.reject_native(&error, can_gc);
564
565 assert!(self.is_erroring() || self.is_writable());
570
571 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
573 if let Some(pending_abort_request) = &*pending_abort_request {
574 pending_abort_request.promise.reject_native(&error, can_gc);
576
577 }
580
581 self.deal_with_rejection(cx, global, error, can_gc);
583 }
584
585 pub(crate) fn finish_in_flight_write_with_error(
587 &self,
588 cx: SafeJSContext,
589 global: &GlobalScope,
590 error: SafeHandleValue,
591 can_gc: CanGc,
592 ) {
593 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
594 unreachable!("Inflight write request must be defined.");
596 };
597
598 in_flight_write_request.reject_native(&error, can_gc);
600
601 assert!(self.is_erroring() || self.is_writable());
606
607 self.deal_with_rejection(cx, global, error, can_gc);
609 }
610
    /// Returns the writer currently attached to the stream, if any.
    pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
        self.writer.get()
    }
614
    /// Attaches `writer` to the stream, or detaches the current one when given `None`.
    pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
        self.writer.set(writer);
    }
618
    /// Stores the backpressure flag without touching the writer's ready promise
    /// (`update_backpressure` handles the promise).
    pub(crate) fn set_backpressure(&self, backpressure: bool) {
        self.backpressure.set(backpressure);
    }
622
    /// Returns the current backpressure flag.
    pub(crate) fn get_backpressure(&self) -> bool {
        self.backpressure.get()
    }
626
627 pub(crate) fn is_locked(&self) -> bool {
629 self.get_writer().is_some()
632 }
633
634 pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
636 assert!(self.is_locked());
638
639 assert!(self.is_writable());
641
642 let promise = Promise::new(global, can_gc);
644
645 self.write_requests.borrow_mut().push_back(promise.clone());
647
648 promise
650 }
651
    /// Returns the attached controller, or `None` when none has been set. Unlike
    /// `get_default_controller`, this never panics.
    pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
        self.controller.get()
    }
656
657 pub(crate) fn abort(
659 &self,
660 cx: &mut CurrentRealm,
661 global: &GlobalScope,
662 provided_reason: SafeHandleValue,
663 ) -> Rc<Promise> {
664 if self.is_closed() || self.is_errored() {
666 return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
668 }
669
670 self.get_controller()
672 .expect("Stream must have a controller.")
673 .signal_abort(cx, provided_reason);
674
675 let state = self.state.get();
677
678 if matches!(
680 state,
681 WritableStreamState::Closed | WritableStreamState::Errored
682 ) {
683 return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
684 }
685
686 if self.pending_abort_request.borrow().is_some() {
688 return self
690 .pending_abort_request
691 .borrow()
692 .as_ref()
693 .expect("Pending abort request must be Some.")
694 .promise
695 .clone();
696 }
697
698 assert!(self.is_writable() || self.is_erroring());
700
701 let mut was_already_erroring = false;
703 rooted!(&in(cx) let undefined_reason = UndefinedValue());
704
705 let reason = if self.is_erroring() {
707 was_already_erroring = true;
709
710 undefined_reason.handle()
712 } else {
713 provided_reason
715 };
716
717 let promise = Promise::new2(cx, global);
719
720 *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
725 promise: promise.clone(),
726 reason: Heap::boxed(reason.get()),
727 was_already_erroring,
728 });
729
730 if !was_already_erroring {
732 self.start_erroring(cx.into(), global, reason, CanGc::from_cx(cx));
734 }
735
736 promise
738 }
739
740 pub(crate) fn close(
742 &self,
743 cx: SafeJSContext,
744 global: &GlobalScope,
745 can_gc: CanGc,
746 ) -> Rc<Promise> {
747 if self.is_closed() || self.is_errored() {
750 let promise = Promise::new(global, can_gc);
752 promise.reject_error(
753 Error::Type(c"Stream is closed or errored.".to_owned()),
754 can_gc,
755 );
756 return promise;
757 }
758
759 assert!(self.is_writable() || self.is_erroring());
761
762 assert!(!self.close_queued_or_in_flight());
764
765 let promise = Promise::new(global, can_gc);
767
768 *self.close_request.borrow_mut() = Some(promise.clone());
770
771 if let Some(writer) = self.writer.get() {
774 if self.get_backpressure() && self.is_writable() {
777 writer.resolve_ready_promise_with_undefined(can_gc);
779 }
780 }
781
782 let Some(controller) = self.controller.get() else {
784 unreachable!("Stream must have a controller.");
785 };
786 controller.close(cx, global, can_gc);
787
788 promise
790 }
791
792 pub(crate) fn get_desired_size(&self) -> Option<f64> {
795 if self.is_errored() || self.is_erroring() {
801 return None;
802 }
803
804 if self.is_closed() {
806 return Some(0.);
807 }
808
809 let Some(controller) = self.controller.get() else {
810 unreachable!("Stream must have a controller.");
811 };
812 Some(controller.get_desired_size())
813 }
814
815 pub(crate) fn aquire_default_writer(
817 &self,
818 cx: SafeJSContext,
819 global: &GlobalScope,
820 can_gc: CanGc,
821 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
822 let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
824
825 writer.setup(cx, self, can_gc)?;
827
828 Ok(writer)
830 }
831
832 pub(crate) fn update_backpressure(
834 &self,
835 backpressure: bool,
836 global: &GlobalScope,
837 can_gc: CanGc,
838 ) {
839 self.is_writable();
841
842 assert!(!self.close_queued_or_in_flight());
844
845 let writer = self.get_writer();
847
848 if let Some(writer) = writer {
849 if backpressure != self.get_backpressure() {
851 if backpressure {
853 let promise = Promise::new(global, can_gc);
855 writer.set_ready_promise(promise);
856 } else {
857 assert!(!backpressure);
860 writer.resolve_ready_promise_with_undefined(can_gc);
862 }
863 }
864 }
865
866 self.set_backpressure(backpressure);
868 }
869
    /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable>
    ///
    /// Sets this stream up as the writable end of a cross-realm transform over `port`:
    /// creates a controller with a `Transfer` sink, registers the message-handling state
    /// with the global under the port's id, starts the port, and finally runs the
    /// controller setup.
    ///
    /// # Panics
    /// Panics if the controller setup fails.
    pub(crate) fn setup_cross_realm_transform_writable(
        &self,
        cx: SafeJSContext,
        port: &MessagePort,
        can_gc: CanGc,
    ) {
        let port_id = port.message_port_id();
        let global = self.global();

        // Size algorithm extracted from a default (empty) queuing strategy.
        let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);

        // Shared between the sink and the message handler; starts out as a pending promise.
        let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(&global, can_gc))));

        let controller = WritableStreamDefaultController::new(
            &global,
            UnderlyingSinkType::Transfer {
                backpressure_promise: backpressure_promise.clone(),
                port: Dom::from_ref(port),
            },
            // High water mark.
            1.0,
            size_algorithm,
            can_gc,
        );

        // Register the state that `handle_message` / `handle_error` operate on.
        rooted!(in(*cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
            controller: Dom::from_ref(&controller),
            backpressure_promise: backpressure_promise.clone(),
        });
        global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);

        port.Start(can_gc);

        controller
            .setup(cx, &global, self, can_gc)
            .expect("Setup for transfer cannot fail");
    }
920 #[allow(clippy::too_many_arguments)]
922 pub(crate) fn setup_from_underlying_sink(
923 &self,
924 cx: SafeJSContext,
925 global: &GlobalScope,
926 stream: &WritableStream,
927 underlying_sink_obj: SafeHandleObject,
928 underlying_sink: &UnderlyingSink,
929 strategy_hwm: f64,
930 strategy_size: Rc<QueuingStrategySize>,
931 can_gc: CanGc,
932 ) -> Result<(), Error> {
933 let controller = WritableStreamDefaultController::new(
959 global,
960 UnderlyingSinkType::new_js(
961 underlying_sink.abort.clone(),
962 underlying_sink.start.clone(),
963 underlying_sink.close.clone(),
964 underlying_sink.write.clone(),
965 ),
966 strategy_hwm,
967 strategy_size,
968 can_gc,
969 );
970
971 controller.set_underlying_sink_this_object(underlying_sink_obj);
974
975 controller.setup(cx, global, stream, can_gc)
977 }
978}
979
980#[cfg_attr(crown, expect(crown::unrooted_must_root))]
982pub(crate) fn create_writable_stream(
983 cx: SafeJSContext,
984 global: &GlobalScope,
985 writable_high_water_mark: f64,
986 writable_size_algorithm: Rc<QueuingStrategySize>,
987 underlying_sink_type: UnderlyingSinkType,
988 can_gc: CanGc,
989) -> Fallible<DomRoot<WritableStream>> {
990 assert!(writable_high_water_mark >= 0.0);
992
993 let stream = WritableStream::new_with_proto(global, None, can_gc);
996
997 let controller = WritableStreamDefaultController::new(
999 global,
1000 underlying_sink_type,
1001 writable_high_water_mark,
1002 writable_size_algorithm,
1003 can_gc,
1004 );
1005
1006 controller.setup(cx, global, &stream, can_gc)?;
1009
1010 Ok(stream)
1012}
1013
1014impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1015 fn Constructor(
1017 cx: SafeJSContext,
1018 global: &GlobalScope,
1019 proto: Option<SafeHandleObject>,
1020 can_gc: CanGc,
1021 underlying_sink: Option<*mut JSObject>,
1022 strategy: &QueuingStrategy,
1023 ) -> Fallible<DomRoot<WritableStream>> {
1024 rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1026
1027 let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1030 rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1031 match UnderlyingSink::new(cx, obj_val.handle(), can_gc) {
1032 Ok(ConversionResult::Success(val)) => val,
1033 Ok(ConversionResult::Failure(error)) => {
1034 return Err(Error::Type(error.into_owned()));
1035 },
1036 _ => {
1037 return Err(Error::JSFailed);
1038 },
1039 }
1040 } else {
1041 UnderlyingSink::empty()
1042 };
1043
1044 if !underlying_sink_dict.type_.handle().is_undefined() {
1045 return Err(Error::Range(c"type is set".to_owned()));
1047 }
1048
1049 let stream = WritableStream::new_with_proto(global, proto, can_gc);
1051
1052 let size_algorithm = extract_size_algorithm(strategy, can_gc);
1054
1055 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1057
1058 stream.setup_from_underlying_sink(
1061 cx,
1062 global,
1063 &stream,
1064 underlying_sink_obj.handle(),
1065 &underlying_sink_dict,
1066 high_water_mark,
1067 size_algorithm,
1068 can_gc,
1069 )?;
1070
1071 Ok(stream)
1072 }
1073
    /// <https://streams.spec.whatwg.org/#ws-locked>
    ///
    /// Reports whether a writer is attached to the stream.
    fn Locked(&self) -> bool {
        self.is_locked()
    }
1079
1080 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
1082 let global = GlobalScope::from_current_realm(cx);
1083
1084 if self.is_locked() {
1086 let promise = Promise::new2(cx, &global);
1088 promise.reject_error(
1089 Error::Type(c"Stream is locked.".to_owned()),
1090 CanGc::from_cx(cx),
1091 );
1092 return promise;
1093 }
1094
1095 self.abort(cx, &global, reason)
1097 }
1098
1099 fn Close(&self, realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
1101 let cx = GlobalScope::get_cx();
1102 let global = GlobalScope::from_safe_context(cx, realm);
1103
1104 if self.is_locked() {
1106 let promise = Promise::new(&global, can_gc);
1108 promise.reject_error(Error::Type(c"Stream is locked.".to_owned()), can_gc);
1109 return promise;
1110 }
1111
1112 if self.close_queued_or_in_flight() {
1114 let promise = Promise::new(&global, can_gc);
1116 promise.reject_error(
1117 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
1118 can_gc,
1119 );
1120 return promise;
1121 }
1122
1123 self.close(cx, &global, can_gc)
1125 }
1126
    /// <https://streams.spec.whatwg.org/#ws-get-writer>
    ///
    /// Creates a default writer for this stream in the global of `realm`; errors from
    /// `aquire_default_writer` are returned unchanged.
    fn GetWriter(
        &self,
        realm: InRealm,
        can_gc: CanGc,
    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
        let cx = GlobalScope::get_cx();
        let global = GlobalScope::from_safe_context(cx, realm);

        self.aquire_default_writer(cx, &global, can_gc)
    }
1139}
1140
// Opts the type into `js::gc::Rootable`; `setup_cross_realm_transform_writable` builds it
// inside a `rooted!` slot.
impl js::gc::Rootable for CrossRealmTransformWritable {}

/// State registered with the global by
/// `WritableStream::setup_cross_realm_transform_writable`; its methods react to messages
/// and errors arriving on the associated port.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct CrossRealmTransformWritable {
    /// Controller of the writable side; receives errors via `error_if_needed`.
    controller: Dom<WritableStreamDefaultController>,

    /// Backpressure promise shared with the controller's `Transfer` sink; taken and
    /// resolved by `handle_message`.
    #[ignore_malloc_size_of = "nested Rc"]
    backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
}
1156
1157impl CrossRealmTransformWritable {
1158 #[expect(unsafe_code)]
1161 pub(crate) fn handle_message(
1162 &self,
1163 cx: SafeJSContext,
1164 global: &GlobalScope,
1165 message: SafeHandleValue,
1166 _realm: InRealm,
1167 can_gc: CanGc,
1168 ) {
1169 rooted!(in(*cx) let mut value = UndefinedValue());
1170 let type_string =
1171 unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
1172
1173 if type_string == "error" {
1178 self.controller
1180 .error_if_needed(cx, value.handle(), global, can_gc);
1181 }
1182
1183 let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1184
1185 if let Some(promise) = backpressure_promise {
1188 promise.resolve_native(&(), can_gc);
1190
1191 }
1194 }
1195
1196 pub(crate) fn handle_error(
1199 &self,
1200 cx: SafeJSContext,
1201 global: &GlobalScope,
1202 port: &MessagePort,
1203 _realm: InRealm,
1204 can_gc: CanGc,
1205 ) {
1206 let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
1208 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1209 error.safe_to_jsval(cx, rooted_error.handle_mut(), can_gc);
1210
1211 port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
1213
1214 self.controller
1216 .error_if_needed(cx, rooted_error.handle(), global, can_gc);
1217
1218 global.disentangle_port(port, can_gc);
1220 }
1221}
1222
1223impl Transferable for WritableStream {
1225 type Index = MessagePortIndex;
1226 type Data = MessagePortImpl;
1227
    /// <https://streams.spec.whatwg.org/#ws-transfer>
    ///
    /// Transfers the stream by creating an entangled pair of message ports, piping a
    /// cross-realm readable (backed by the first port) into this stream, and transferring
    /// the second port.
    ///
    /// # Errors
    /// Returns `Error::DataClone` when the stream is locked; otherwise returns the result
    /// of transferring the second port.
    fn transfer(
        &self,
        cx: &mut js::context::JSContext,
    ) -> Fallible<(MessagePortId, MessagePortImpl)> {
        if self.is_locked() {
            return Err(Error::DataClone(None));
        }

        // Everything below runs inside the realm of the stream's own global; `cx` is
        // shadowed by the realm-scoped context.
        let global = self.global();
        let mut realm = enter_auto_realm(cx, &*global);
        let mut realm = realm.current_realm();
        let cx = &mut realm;

        let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
        global.track_message_port(&port_1, None);

        let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
        global.track_message_port(&port_2, None);

        global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());

        let readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));

        readable.setup_cross_realm_transform_readable(cx.into(), &port_1, CanGc::from_cx(cx));

        // NOTE(review): the three `false` arguments are presumably the preventClose /
        // preventAbort / preventCancel flags and `None` the abort signal — confirm
        // against `ReadableStream::pipe_to`.
        let promise = readable.pipe_to(cx, &global, self, false, false, false, None);

        // Mark the pipe promise as handled.
        promise.set_promise_is_handled();

        port_2.transfer(cx)
    }
1270
1271 fn transfer_receive(
1273 cx: &mut js::context::JSContext,
1274 owner: &GlobalScope,
1275 id: MessagePortId,
1276 port_impl: MessagePortImpl,
1277 ) -> Result<DomRoot<Self>, ()> {
1278 let value = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1281
1282 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
1289
1290 value.setup_cross_realm_transform_writable(
1292 cx.into(),
1293 &transferred_port,
1294 CanGc::from_cx(cx),
1295 );
1296 Ok(value)
1297 }
1298
1299 fn serialized_storage<'a>(
1301 data: StructuredData<'a, '_>,
1302 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1303 match data {
1304 StructuredData::Reader(r) => &mut r.port_impls,
1305 StructuredData::Writer(w) => &mut w.ports,
1306 }
1307 }
1308}