1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::rust::{
17 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18 MutableHandleValue as SafeMutableHandleValue,
19};
20use rustc_hash::FxHashMap;
21use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
22use script_bindings::conversions::SafeToJSValConvertible;
23
24use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
25use crate::dom::bindings::cell::DomRefCell;
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
27use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
28use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
29use crate::dom::bindings::conversions::ConversionResult;
30use crate::dom::bindings::error::{Error, Fallible};
31use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
32use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
33use crate::dom::bindings::structuredclone::StructuredData;
34use crate::dom::bindings::transferable::Transferable;
35use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
36use crate::dom::domexception::{DOMErrorName, DOMException};
37use crate::dom::globalscope::GlobalScope;
38use crate::dom::messageport::MessagePort;
39use crate::dom::promise::Promise;
40use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
41use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
42use crate::dom::writablestreamdefaultcontroller::{
43 UnderlyingSinkType, WritableStreamDefaultController,
44};
45use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
46use crate::realms::{InRealm, enter_realm};
47use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
48
49impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
50
51#[derive(JSTraceable, MallocSizeOf)]
54#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
55struct AbortAlgorithmFulfillmentHandler {
56 stream: Dom<WritableStream>,
57 #[ignore_malloc_size_of = "Rc is hard"]
58 abort_request_promise: Rc<Promise>,
59}
60
61impl Callback for AbortAlgorithmFulfillmentHandler {
62 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
63 self.abort_request_promise.resolve_native(&(), can_gc);
65
66 self.stream
68 .as_rooted()
69 .reject_close_and_closed_promise_if_needed(cx, can_gc);
70 }
71}
72
73impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
74
75#[derive(JSTraceable, MallocSizeOf)]
78#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
79struct AbortAlgorithmRejectionHandler {
80 stream: Dom<WritableStream>,
81 #[ignore_malloc_size_of = "Rc is hard"]
82 abort_request_promise: Rc<Promise>,
83}
84
85impl Callback for AbortAlgorithmRejectionHandler {
86 fn callback(&self, cx: SafeJSContext, reason: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
87 self.abort_request_promise.reject_native(&reason, can_gc);
89
90 self.stream
92 .as_rooted()
93 .reject_close_and_closed_promise_if_needed(cx, can_gc);
94 }
95}
96
97impl js::gc::Rootable for PendingAbortRequest {}
98
99#[derive(JSTraceable, MallocSizeOf)]
101#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
102struct PendingAbortRequest {
103 #[ignore_malloc_size_of = "Rc is hard"]
105 promise: Rc<Promise>,
106
107 #[ignore_malloc_size_of = "mozjs"]
109 reason: Box<Heap<JSVal>>,
110
111 was_already_erroring: bool,
113}
114
115#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
117pub(crate) enum WritableStreamState {
118 #[default]
119 Writable,
120 Closed,
121 Erroring,
122 Errored,
123}
124
125#[dom_struct]
127pub struct WritableStream {
128 reflector_: Reflector,
129
130 backpressure: Cell<bool>,
132
133 #[ignore_malloc_size_of = "Rc is hard"]
135 close_request: DomRefCell<Option<Rc<Promise>>>,
136
137 controller: MutNullableDom<WritableStreamDefaultController>,
139
140 detached: Cell<bool>,
142
143 #[ignore_malloc_size_of = "Rc is hard"]
145 in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
146
147 #[ignore_malloc_size_of = "Rc is hard"]
149 in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
150
151 pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
153
154 state: Cell<WritableStreamState>,
156
157 #[ignore_malloc_size_of = "mozjs"]
159 stored_error: Heap<JSVal>,
160
161 writer: MutNullableDom<WritableStreamDefaultWriter>,
163
164 #[ignore_malloc_size_of = "Rc is hard"]
166 write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
167}
168
169impl WritableStream {
170 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
171 fn new_inherited() -> WritableStream {
173 WritableStream {
174 reflector_: Reflector::new(),
175 backpressure: Default::default(),
176 close_request: Default::default(),
177 controller: Default::default(),
178 detached: Default::default(),
179 in_flight_write_request: Default::default(),
180 in_flight_close_request: Default::default(),
181 pending_abort_request: Default::default(),
182 state: Default::default(),
183 stored_error: Default::default(),
184 writer: Default::default(),
185 write_requests: Default::default(),
186 }
187 }
188
189 pub(crate) fn new_with_proto(
190 global: &GlobalScope,
191 proto: Option<SafeHandleObject>,
192 can_gc: CanGc,
193 ) -> DomRoot<WritableStream> {
194 reflect_dom_object_with_proto(
195 Box::new(WritableStream::new_inherited()),
196 global,
197 proto,
198 can_gc,
199 )
200 }
201
202 pub(crate) fn assert_no_controller(&self) {
205 assert!(self.controller.get().is_none());
206 }
207
208 pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
211 self.controller.set(Some(controller));
212 }
213
214 #[allow(unused)]
215 pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
216 self.controller.get().expect("Controller should be set.")
217 }
218
219 pub(crate) fn is_writable(&self) -> bool {
220 matches!(self.state.get(), WritableStreamState::Writable)
221 }
222
223 pub(crate) fn is_erroring(&self) -> bool {
224 matches!(self.state.get(), WritableStreamState::Erroring)
225 }
226
227 pub(crate) fn is_errored(&self) -> bool {
228 matches!(self.state.get(), WritableStreamState::Errored)
229 }
230
231 pub(crate) fn is_closed(&self) -> bool {
232 matches!(self.state.get(), WritableStreamState::Closed)
233 }
234
235 pub(crate) fn has_in_flight_write_request(&self) -> bool {
236 self.in_flight_write_request.borrow().is_some()
237 }
238
239 pub(crate) fn has_operations_marked_inflight(&self) -> bool {
241 let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
242 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
243
244 in_flight_write_requested || in_flight_close_requested
245 }
246
247 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
249 handle_mut.set(self.stored_error.get());
250 }
251
252 pub(crate) fn finish_erroring(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
254 assert!(self.is_erroring());
256
257 assert!(!self.has_operations_marked_inflight());
259
260 self.state.set(WritableStreamState::Errored);
262
263 let Some(controller) = self.controller.get() else {
265 unreachable!("Stream should have a controller.");
266 };
267 controller.perform_error_steps();
268
269 rooted!(in(*cx) let mut stored_error = UndefinedValue());
271 self.get_stored_error(stored_error.handle_mut());
272
273 let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
275 for request in write_requests {
276 request.reject(cx, stored_error.handle(), can_gc);
278 }
279
280 if self.pending_abort_request.borrow().is_none() {
285 self.reject_close_and_closed_promise_if_needed(cx, can_gc);
287
288 return;
290 }
291
292 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
295 if let Some(pending_abort_request) = &*pending_abort_request {
296 if pending_abort_request.was_already_erroring {
298 pending_abort_request
300 .promise
301 .reject(cx, stored_error.handle(), can_gc);
302
303 self.reject_close_and_closed_promise_if_needed(cx, can_gc);
305
306 return;
308 }
309
310 rooted!(in(*cx) let mut reason = UndefinedValue());
312 reason.set(pending_abort_request.reason.get());
313 let promise = controller.abort_steps(cx, global, reason.handle(), can_gc);
314
315 rooted!(in(*cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
317 stream: Dom::from_ref(self),
318 abort_request_promise: pending_abort_request.promise.clone(),
319 }));
320
321 rooted!(in(*cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
323 stream: Dom::from_ref(self),
324 abort_request_promise: pending_abort_request.promise.clone(),
325 }));
326
327 let handler = PromiseNativeHandler::new(
328 global,
329 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
330 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
331 can_gc,
332 );
333 let realm = enter_realm(global);
334 let comp = InRealm::Entered(&realm);
335 promise.append_native_handler(&handler, comp, can_gc);
336 }
337 }
338
339 fn reject_close_and_closed_promise_if_needed(&self, cx: SafeJSContext, can_gc: CanGc) {
341 assert!(self.is_errored());
343
344 rooted!(in(*cx) let mut stored_error = UndefinedValue());
345 self.get_stored_error(stored_error.handle_mut());
346
347 let close_request = self.close_request.borrow_mut().take();
349 if let Some(close_request) = close_request {
350 assert!(self.in_flight_close_request.borrow().is_none());
352
353 close_request.reject_native(&stored_error.handle(), can_gc)
355
356 }
359
360 if let Some(writer) = self.writer.get() {
363 writer.reject_closed_promise_with_stored_error(&stored_error.handle(), can_gc);
365
366 writer.set_close_promise_is_handled();
368 }
369 }
370
371 pub(crate) fn close_queued_or_in_flight(&self) -> bool {
373 let close_requested = self.close_request.borrow().is_some();
374 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
375
376 close_requested || in_flight_close_requested
377 }
378
379 pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
381 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
382 unreachable!("Stream should have a write request");
384 };
385
386 in_flight_write_request.resolve_native(&(), can_gc);
388
389 }
392
393 pub(crate) fn start_erroring(
395 &self,
396 cx: SafeJSContext,
397 global: &GlobalScope,
398 error: SafeHandleValue,
399 can_gc: CanGc,
400 ) {
401 assert!(self.stored_error.get().is_undefined());
403
404 assert!(self.is_writable());
406
407 let Some(controller) = self.controller.get() else {
409 unreachable!("Stream should have a controller.");
411 };
412
413 self.state.set(WritableStreamState::Erroring);
415
416 self.stored_error.set(*error);
418
419 if let Some(writer) = self.writer.get() {
421 writer.ensure_ready_promise_rejected(global, error, can_gc);
423 }
424
425 if !self.has_operations_marked_inflight() && controller.started() {
427 self.finish_erroring(cx, global, can_gc);
429 }
430 }
431
432 pub(crate) fn deal_with_rejection(
434 &self,
435 cx: SafeJSContext,
436 global: &GlobalScope,
437 error: SafeHandleValue,
438 can_gc: CanGc,
439 ) {
440 if self.is_writable() {
444 self.start_erroring(cx, global, error, can_gc);
446
447 return;
449 }
450
451 assert!(self.is_erroring());
453
454 self.finish_erroring(cx, global, can_gc);
456 }
457
458 pub(crate) fn mark_first_write_request_in_flight(&self) {
460 let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
461 let mut write_requests = self.write_requests.borrow_mut();
462
463 assert!(in_flight_write_request.is_none());
465
466 assert!(!write_requests.is_empty());
468
469 let write_request = write_requests.pop_front().unwrap();
472
473 *in_flight_write_request = Some(write_request);
475 }
476
477 pub(crate) fn mark_close_request_in_flight(&self) {
479 let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
480 let mut close_request = self.close_request.borrow_mut();
481
482 assert!(in_flight_close_request.is_none());
484
485 assert!(close_request.is_some());
487
488 let close_request = close_request.take().unwrap();
491
492 *in_flight_close_request = Some(close_request);
494 }
495
496 pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
498 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
499 unreachable!("in_flight_close_request must be Some");
501 };
502
503 in_flight_close_request.resolve_native(&(), can_gc);
505
506 assert!(self.is_writable() || self.is_erroring());
511
512 if self.is_erroring() {
514 self.stored_error.set(UndefinedValue());
516
517 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
519 if let Some(pending_abort_request) = &*pending_abort_request {
520 pending_abort_request.promise.resolve_native(&(), can_gc);
522
523 }
526 }
527
528 self.state.set(WritableStreamState::Closed);
530
531 if let Some(writer) = self.writer.get() {
533 writer.resolve_closed_promise_with_undefined(can_gc);
536 }
537
538 assert!(self.pending_abort_request.borrow().is_none());
540
541 assert!(self.stored_error.get().is_undefined());
543 }
544
545 pub(crate) fn finish_in_flight_close_with_error(
547 &self,
548 cx: SafeJSContext,
549 global: &GlobalScope,
550 error: SafeHandleValue,
551 can_gc: CanGc,
552 ) {
553 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
554 unreachable!("Inflight close request must be defined.");
556 };
557
558 in_flight_close_request.reject_native(&error, can_gc);
560
561 assert!(self.is_erroring() || self.is_writable());
566
567 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
569 if let Some(pending_abort_request) = &*pending_abort_request {
570 pending_abort_request.promise.reject_native(&error, can_gc);
572
573 }
576
577 self.deal_with_rejection(cx, global, error, can_gc);
579 }
580
581 pub(crate) fn finish_in_flight_write_with_error(
583 &self,
584 cx: SafeJSContext,
585 global: &GlobalScope,
586 error: SafeHandleValue,
587 can_gc: CanGc,
588 ) {
589 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
590 unreachable!("Inflight write request must be defined.");
592 };
593
594 in_flight_write_request.reject_native(&error, can_gc);
596
597 assert!(self.is_erroring() || self.is_writable());
602
603 self.deal_with_rejection(cx, global, error, can_gc);
605 }
606
607 pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
608 self.writer.get()
609 }
610
611 pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
612 self.writer.set(writer);
613 }
614
615 pub(crate) fn set_backpressure(&self, backpressure: bool) {
616 self.backpressure.set(backpressure);
617 }
618
619 pub(crate) fn get_backpressure(&self) -> bool {
620 self.backpressure.get()
621 }
622
623 pub(crate) fn is_locked(&self) -> bool {
625 self.get_writer().is_some()
628 }
629
630 pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
632 assert!(self.is_locked());
634
635 assert!(self.is_writable());
637
638 let promise = Promise::new(global, can_gc);
640
641 self.write_requests.borrow_mut().push_back(promise.clone());
643
644 promise
646 }
647
648 pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
650 self.controller.get()
651 }
652
653 pub(crate) fn abort(
655 &self,
656 cx: SafeJSContext,
657 global: &GlobalScope,
658 provided_reason: SafeHandleValue,
659 realm: InRealm,
660 can_gc: CanGc,
661 ) -> Rc<Promise> {
662 if self.is_closed() || self.is_errored() {
664 return Promise::new_resolved(global, cx, (), can_gc);
666 }
667
668 self.get_controller()
670 .expect("Stream must have a controller.")
671 .signal_abort(cx, provided_reason, realm, can_gc);
672
673 let state = self.state.get();
675
676 if matches!(
678 state,
679 WritableStreamState::Closed | WritableStreamState::Errored
680 ) {
681 return Promise::new_resolved(global, cx, (), can_gc);
682 }
683
684 if self.pending_abort_request.borrow().is_some() {
686 return self
688 .pending_abort_request
689 .borrow()
690 .as_ref()
691 .expect("Pending abort request must be Some.")
692 .promise
693 .clone();
694 }
695
696 assert!(self.is_writable() || self.is_erroring());
698
699 let mut was_already_erroring = false;
701 rooted!(in(*cx) let undefined_reason = UndefinedValue());
702
703 let reason = if self.is_erroring() {
705 was_already_erroring = true;
707
708 undefined_reason.handle()
710 } else {
711 provided_reason
713 };
714
715 let promise = Promise::new(global, can_gc);
717
718 *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
723 promise: promise.clone(),
724 reason: Heap::boxed(reason.get()),
725 was_already_erroring,
726 });
727
728 if !was_already_erroring {
730 self.start_erroring(cx, global, reason, can_gc);
732 }
733
734 promise
736 }
737
738 pub(crate) fn close(
740 &self,
741 cx: SafeJSContext,
742 global: &GlobalScope,
743 can_gc: CanGc,
744 ) -> Rc<Promise> {
745 if self.is_closed() || self.is_errored() {
748 let promise = Promise::new(global, can_gc);
750 promise.reject_error(
751 Error::Type("Stream is closed or errored.".to_string()),
752 can_gc,
753 );
754 return promise;
755 }
756
757 assert!(self.is_writable() || self.is_erroring());
759
760 assert!(!self.close_queued_or_in_flight());
762
763 let promise = Promise::new(global, can_gc);
765
766 *self.close_request.borrow_mut() = Some(promise.clone());
768
769 if let Some(writer) = self.writer.get() {
772 if self.get_backpressure() && self.is_writable() {
775 writer.resolve_ready_promise_with_undefined(can_gc);
777 }
778 }
779
780 let Some(controller) = self.controller.get() else {
782 unreachable!("Stream must have a controller.");
783 };
784 controller.close(cx, global, can_gc);
785
786 promise
788 }
789
790 pub(crate) fn get_desired_size(&self) -> Option<f64> {
793 if self.is_errored() || self.is_erroring() {
799 return None;
800 }
801
802 if self.is_closed() {
804 return Some(0.);
805 }
806
807 let Some(controller) = self.controller.get() else {
808 unreachable!("Stream must have a controller.");
809 };
810 Some(controller.get_desired_size())
811 }
812
813 pub(crate) fn aquire_default_writer(
815 &self,
816 cx: SafeJSContext,
817 global: &GlobalScope,
818 can_gc: CanGc,
819 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
820 let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
822
823 writer.setup(cx, self, can_gc)?;
825
826 Ok(writer)
828 }
829
830 pub(crate) fn update_backpressure(
832 &self,
833 backpressure: bool,
834 global: &GlobalScope,
835 can_gc: CanGc,
836 ) {
837 self.is_writable();
839
840 assert!(!self.close_queued_or_in_flight());
842
843 let writer = self.get_writer();
845
846 if let Some(writer) = writer {
847 if backpressure != self.get_backpressure() {
849 if backpressure {
851 let promise = Promise::new(global, can_gc);
853 writer.set_ready_promise(promise);
854 } else {
855 assert!(!backpressure);
858 writer.resolve_ready_promise_with_undefined(can_gc);
860 }
861 }
862 }
863
864 self.set_backpressure(backpressure);
866 }
867
868 pub(crate) fn setup_cross_realm_transform_writable(
870 &self,
871 cx: SafeJSContext,
872 port: &MessagePort,
873 can_gc: CanGc,
874 ) {
875 let port_id = port.message_port_id();
876 let global = self.global();
877
878 let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
884
885 let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(&global, can_gc))));
889
890 let controller = WritableStreamDefaultController::new(
892 &global,
893 UnderlyingSinkType::Transfer {
894 backpressure_promise: backpressure_promise.clone(),
895 port: Dom::from_ref(port),
896 },
897 1.0,
898 size_algorithm,
899 can_gc,
900 );
901
902 rooted!(in(*cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
905 controller: Dom::from_ref(&controller),
906 backpressure_promise: backpressure_promise.clone(),
907 });
908 global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
909
910 port.Start(can_gc);
912
913 controller
915 .setup(cx, &global, self, can_gc)
916 .expect("Setup for transfer cannot fail");
917 }
918 #[allow(clippy::too_many_arguments)]
920 pub(crate) fn setup_from_underlying_sink(
921 &self,
922 cx: SafeJSContext,
923 global: &GlobalScope,
924 stream: &WritableStream,
925 underlying_sink_obj: SafeHandleObject,
926 underlying_sink: &UnderlyingSink,
927 strategy_hwm: f64,
928 strategy_size: Rc<QueuingStrategySize>,
929 can_gc: CanGc,
930 ) -> Result<(), Error> {
931 let controller = WritableStreamDefaultController::new(
957 global,
958 UnderlyingSinkType::new_js(
959 underlying_sink.abort.clone(),
960 underlying_sink.start.clone(),
961 underlying_sink.close.clone(),
962 underlying_sink.write.clone(),
963 ),
964 strategy_hwm,
965 strategy_size,
966 can_gc,
967 );
968
969 controller.set_underlying_sink_this_object(underlying_sink_obj);
972
973 controller.setup(cx, global, stream, can_gc)
975 }
976}
977
978#[cfg_attr(crown, allow(crown::unrooted_must_root))]
980pub(crate) fn create_writable_stream(
981 cx: SafeJSContext,
982 global: &GlobalScope,
983 writable_high_water_mark: f64,
984 writable_size_algorithm: Rc<QueuingStrategySize>,
985 underlying_sink_type: UnderlyingSinkType,
986 can_gc: CanGc,
987) -> Fallible<DomRoot<WritableStream>> {
988 assert!(writable_high_water_mark >= 0.0);
990
991 let stream = WritableStream::new_with_proto(global, None, can_gc);
994
995 let controller = WritableStreamDefaultController::new(
997 global,
998 underlying_sink_type,
999 writable_high_water_mark,
1000 writable_size_algorithm,
1001 can_gc,
1002 );
1003
1004 controller.setup(cx, global, &stream, can_gc)?;
1007
1008 Ok(stream)
1010}
1011
1012impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1013 fn Constructor(
1015 cx: SafeJSContext,
1016 global: &GlobalScope,
1017 proto: Option<SafeHandleObject>,
1018 can_gc: CanGc,
1019 underlying_sink: Option<*mut JSObject>,
1020 strategy: &QueuingStrategy,
1021 ) -> Fallible<DomRoot<WritableStream>> {
1022 rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1024
1025 let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1028 rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1029 match UnderlyingSink::new(cx, obj_val.handle(), can_gc) {
1030 Ok(ConversionResult::Success(val)) => val,
1031 Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
1032 _ => {
1033 return Err(Error::JSFailed);
1034 },
1035 }
1036 } else {
1037 UnderlyingSink::empty()
1038 };
1039
1040 if !underlying_sink_dict.type_.handle().is_undefined() {
1041 return Err(Error::Range("type is set".to_string()));
1043 }
1044
1045 let stream = WritableStream::new_with_proto(global, proto, can_gc);
1047
1048 let size_algorithm = extract_size_algorithm(strategy, can_gc);
1050
1051 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1053
1054 stream.setup_from_underlying_sink(
1057 cx,
1058 global,
1059 &stream,
1060 underlying_sink_obj.handle(),
1061 &underlying_sink_dict,
1062 high_water_mark,
1063 size_algorithm,
1064 can_gc,
1065 )?;
1066
1067 Ok(stream)
1068 }
1069
1070 fn Locked(&self) -> bool {
1072 self.is_locked()
1074 }
1075
1076 fn Abort(
1078 &self,
1079 cx: SafeJSContext,
1080 reason: SafeHandleValue,
1081 realm: InRealm,
1082 can_gc: CanGc,
1083 ) -> Rc<Promise> {
1084 let global = GlobalScope::from_safe_context(cx, realm);
1085
1086 if self.is_locked() {
1088 let promise = Promise::new(&global, can_gc);
1090 promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1091 return promise;
1092 }
1093
1094 self.abort(cx, &global, reason, realm, can_gc)
1096 }
1097
1098 fn Close(&self, realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
1100 let cx = GlobalScope::get_cx();
1101 let global = GlobalScope::from_safe_context(cx, realm);
1102
1103 if self.is_locked() {
1105 let promise = Promise::new(&global, can_gc);
1107 promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1108 return promise;
1109 }
1110
1111 if self.close_queued_or_in_flight() {
1113 let promise = Promise::new(&global, can_gc);
1115 promise.reject_error(
1116 Error::Type("Stream has closed queued or in-flight".to_string()),
1117 can_gc,
1118 );
1119 return promise;
1120 }
1121
1122 self.close(cx, &global, can_gc)
1124 }
1125
1126 fn GetWriter(
1128 &self,
1129 realm: InRealm,
1130 can_gc: CanGc,
1131 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1132 let cx = GlobalScope::get_cx();
1133 let global = GlobalScope::from_safe_context(cx, realm);
1134
1135 self.aquire_default_writer(cx, &global, can_gc)
1137 }
1138}
1139
1140impl js::gc::Rootable for CrossRealmTransformWritable {}
1141
1142#[derive(Clone, JSTraceable, MallocSizeOf)]
1146#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1147pub(crate) struct CrossRealmTransformWritable {
1148 controller: Dom<WritableStreamDefaultController>,
1150
1151 #[ignore_malloc_size_of = "Rc is hard"]
1153 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1154}
1155
1156impl CrossRealmTransformWritable {
1157 #[allow(unsafe_code)]
1160 pub(crate) fn handle_message(
1161 &self,
1162 cx: SafeJSContext,
1163 global: &GlobalScope,
1164 message: SafeHandleValue,
1165 _realm: InRealm,
1166 can_gc: CanGc,
1167 ) {
1168 rooted!(in(*cx) let mut value = UndefinedValue());
1169 let type_string =
1170 unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
1171
1172 if type_string == "error" {
1177 self.controller
1179 .error_if_needed(cx, value.handle(), global, can_gc);
1180 }
1181
1182 let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1183
1184 if let Some(promise) = backpressure_promise {
1187 promise.resolve_native(&(), can_gc);
1189
1190 }
1193 }
1194
1195 pub(crate) fn handle_error(
1198 &self,
1199 cx: SafeJSContext,
1200 global: &GlobalScope,
1201 port: &MessagePort,
1202 _realm: InRealm,
1203 can_gc: CanGc,
1204 ) {
1205 let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
1207 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1208 error.safe_to_jsval(cx, rooted_error.handle_mut());
1209
1210 port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
1212
1213 self.controller
1215 .error_if_needed(cx, rooted_error.handle(), global, can_gc);
1216
1217 global.disentangle_port(port, can_gc);
1219 }
1220}
1221
1222impl Transferable for WritableStream {
1224 type Index = MessagePortIndex;
1225 type Data = MessagePortImpl;
1226
1227 fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
1229 if self.is_locked() {
1232 return Err(Error::DataClone(None));
1233 }
1234
1235 let global = self.global();
1236 let realm = enter_realm(&*global);
1237 let comp = InRealm::Entered(&realm);
1238 let cx = GlobalScope::get_cx();
1239 let can_gc = CanGc::note();
1240
1241 let port_1 = MessagePort::new(&global, can_gc);
1243 global.track_message_port(&port_1, None);
1244
1245 let port_2 = MessagePort::new(&global, can_gc);
1247 global.track_message_port(&port_2, None);
1248
1249 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1251
1252 let readable = ReadableStream::new_with_proto(&global, None, can_gc);
1254
1255 readable.setup_cross_realm_transform_readable(cx, &port_1, can_gc);
1257
1258 let promise = readable.pipe_to(cx, &global, self, false, false, false, None, comp, can_gc);
1260
1261 promise.set_promise_is_handled();
1263
1264 port_2.transfer()
1266 }
1267
1268 fn transfer_receive(
1270 owner: &GlobalScope,
1271 id: MessagePortId,
1272 port_impl: MessagePortImpl,
1273 ) -> Result<DomRoot<Self>, ()> {
1274 let cx = GlobalScope::get_cx();
1275 let can_gc = CanGc::note();
1276
1277 let value = WritableStream::new_with_proto(owner, None, can_gc);
1280
1281 let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
1288
1289 value.setup_cross_realm_transform_writable(cx, &transferred_port, can_gc);
1291 Ok(value)
1292 }
1293
1294 fn serialized_storage<'a>(
1296 data: StructuredData<'a, '_>,
1297 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1298 match data {
1299 StructuredData::Reader(r) => &mut r.port_impls,
1300 StructuredData::Writer(w) => &mut w.ports,
1301 }
1302 }
1303}