1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use dom_struct::dom_struct;
12use js::context::JSContext;
13use js::jsapi::{Heap, JSObject};
14use js::jsval::{JSVal, ObjectValue, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{
17 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18 MutableHandleValue as SafeMutableHandleValue,
19};
20use rustc_hash::FxHashMap;
21use script_bindings::cell::DomRefCell;
22use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
23use script_bindings::conversions::SafeToJSValConvertible;
24use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
25use servo_base::id::{MessagePortId, MessagePortIndex};
26use servo_constellation_traits::MessagePortImpl;
27
28use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
29 QueuingStrategy, QueuingStrategySize,
30};
31use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
32use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
33use crate::dom::bindings::conversions::ConversionResult;
34use crate::dom::bindings::error::{Error, Fallible};
35use crate::dom::bindings::reflector::DomGlobal;
36use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
37use crate::dom::bindings::structuredclone::StructuredData;
38use crate::dom::bindings::transferable::Transferable;
39use crate::dom::domexception::{DOMErrorName, DOMException};
40use crate::dom::globalscope::GlobalScope;
41use crate::dom::messageport::MessagePort;
42use crate::dom::promise::Promise;
43use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
44use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
45use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
46use crate::dom::stream::writablestreamdefaultcontroller::{
47 UnderlyingSinkType, WritableStreamDefaultController,
48};
49use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
50use crate::realms::{InRealm, enter_auto_realm};
51use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
52
53impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
54
55#[derive(JSTraceable, MallocSizeOf)]
58#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
59struct AbortAlgorithmFulfillmentHandler {
60 stream: Dom<WritableStream>,
61 #[conditional_malloc_size_of]
62 abort_request_promise: Rc<Promise>,
63}
64
65impl Callback for AbortAlgorithmFulfillmentHandler {
66 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
67 self.abort_request_promise.resolve_native_with_cx(cx, &());
69
70 self.stream
72 .as_rooted()
73 .reject_close_and_closed_promise_if_needed(cx);
74 }
75}
76
77impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
78
79#[derive(JSTraceable, MallocSizeOf)]
82#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
83struct AbortAlgorithmRejectionHandler {
84 stream: Dom<WritableStream>,
85 #[conditional_malloc_size_of]
86 abort_request_promise: Rc<Promise>,
87}
88
89impl Callback for AbortAlgorithmRejectionHandler {
90 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
91 self.abort_request_promise
93 .reject_native_with_cx(cx, &reason);
94
95 self.stream
97 .as_rooted()
98 .reject_close_and_closed_promise_if_needed(cx);
99 }
100}
101
102impl js::gc::Rootable for PendingAbortRequest {}
103
104#[derive(JSTraceable, MallocSizeOf)]
106#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
107struct PendingAbortRequest {
108 #[conditional_malloc_size_of]
110 promise: Rc<Promise>,
111
112 #[ignore_malloc_size_of = "mozjs"]
114 reason: Box<Heap<JSVal>>,
115
116 was_already_erroring: bool,
118}
119
/// Lifecycle state stored in `WritableStream::state`.
///
/// `Writable` is the `Default`, so a stream built through
/// `WritableStream::new_inherited` starts in that state.
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
pub(crate) enum WritableStreamState {
    /// Initial state; `add_write_request` and `start_erroring` assert it.
    #[default]
    Writable,
    /// Entered at the end of `finish_in_flight_close`.
    Closed,
    /// Entered by `start_erroring`; left through `finish_erroring` or
    /// `finish_in_flight_close`.
    Erroring,
    /// Entered by `finish_erroring`.
    Errored,
}
129
/// DOM object backing a writable stream.
///
/// All mutable state lives in interior-mutability cells because the methods in
/// this file only ever take `&self`.
#[dom_struct]
pub struct WritableStream {
    /// Reflector linking this object to its JS wrapper.
    reflector_: Reflector,

    /// Last backpressure value recorded through `set_backpressure`.
    backpressure: Cell<bool>,

    /// Promise created by `close`; moved into `in_flight_close_request` by
    /// `mark_close_request_in_flight`.
    #[conditional_malloc_size_of]
    close_request: DomRefCell<Option<Rc<Promise>>>,

    /// Controller installed through `set_default_controller`.
    controller: MutNullableDom<WritableStreamDefaultController>,

    /// NOTE(review): not read or written anywhere in the visible part of this
    /// file; purpose to be confirmed against callers.
    detached: Cell<bool>,

    /// Write promise currently marked in flight, if any.
    #[conditional_malloc_size_of]
    in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,

    /// Close promise currently marked in flight, if any.
    #[conditional_malloc_size_of]
    in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,

    /// Abort request recorded by `abort` and consumed when the stream finishes
    /// erroring or finishes an in-flight close.
    pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,

    /// Current lifecycle state.
    state: Cell<WritableStreamState>,

    /// Error value recorded by `start_erroring`; reset to undefined by
    /// `finish_in_flight_close` when the stream was erroring.
    #[ignore_malloc_size_of = "mozjs"]
    stored_error: Heap<JSVal>,

    /// Writer currently attached through `set_writer`; `is_locked` reports
    /// whether one is present.
    writer: MutNullableDom<WritableStreamDefaultWriter>,

    /// Queue of write promises appended by `add_write_request` and popped from
    /// the front by `mark_first_write_request_in_flight`.
    #[conditional_malloc_size_of]
    write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
}
173
174impl WritableStream {
175 fn new_inherited() -> WritableStream {
177 WritableStream {
178 reflector_: Reflector::new(),
179 backpressure: Default::default(),
180 close_request: Default::default(),
181 controller: Default::default(),
182 detached: Default::default(),
183 in_flight_write_request: Default::default(),
184 in_flight_close_request: Default::default(),
185 pending_abort_request: Default::default(),
186 state: Default::default(),
187 stored_error: Default::default(),
188 writer: Default::default(),
189 write_requests: Default::default(),
190 }
191 }
192
193 pub(crate) fn new_with_proto(
194 global: &GlobalScope,
195 proto: Option<SafeHandleObject>,
196 can_gc: CanGc,
197 ) -> DomRoot<WritableStream> {
198 reflect_dom_object_with_proto(
199 Box::new(WritableStream::new_inherited()),
200 global,
201 proto,
202 can_gc,
203 )
204 }
205
206 pub(crate) fn assert_no_controller(&self) {
209 assert!(self.controller.get().is_none());
210 }
211
212 pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
215 self.controller.set(Some(controller));
216 }
217
218 pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
219 self.controller.get().expect("Controller should be set.")
220 }
221
222 pub(crate) fn is_writable(&self) -> bool {
223 matches!(self.state.get(), WritableStreamState::Writable)
224 }
225
226 pub(crate) fn is_erroring(&self) -> bool {
227 matches!(self.state.get(), WritableStreamState::Erroring)
228 }
229
230 pub(crate) fn is_errored(&self) -> bool {
231 matches!(self.state.get(), WritableStreamState::Errored)
232 }
233
234 pub(crate) fn is_closed(&self) -> bool {
235 matches!(self.state.get(), WritableStreamState::Closed)
236 }
237
238 pub(crate) fn has_in_flight_write_request(&self) -> bool {
239 self.in_flight_write_request.borrow().is_some()
240 }
241
242 pub(crate) fn has_operations_marked_inflight(&self) -> bool {
244 let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
245 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
246
247 in_flight_write_requested || in_flight_close_requested
248 }
249
250 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
252 handle_mut.set(self.stored_error.get());
253 }
254
    /// Moves an erroring stream to the errored state and settles everything
    /// that was waiting on it.
    ///
    /// Queued write requests are rejected with the stored error. If an abort is
    /// pending, its promise is either rejected immediately (when the stream was
    /// already erroring at abort time) or settled later by handlers attached to
    /// the promise returned from the controller's abort steps.
    ///
    /// # Panics
    ///
    /// Panics if the stream is not erroring, if a write or close is still
    /// marked in flight, or if the stream has no controller.
    pub(crate) fn finish_erroring(&self, cx: &mut JSContext, global: &GlobalScope) {
        // Precondition: only an erroring stream can finish erroring.
        assert!(self.is_erroring());

        // Precondition: nothing may still be marked in flight.
        assert!(!self.has_operations_marked_inflight());

        self.state.set(WritableStreamState::Errored);

        let Some(controller) = self.controller.get() else {
            unreachable!("Stream should have a controller.");
        };
        controller.perform_error_steps();

        // Root a copy of the stored error for use in the rejections below.
        rooted!(&in(cx) let mut stored_error = UndefinedValue());
        self.get_stored_error(stored_error.handle_mut());

        // Empty the queue first so the `RefCell` borrow is released before any
        // promise is rejected.
        let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
        for request in write_requests {
            request.reject_with_cx(cx, stored_error.handle());
        }

        // Without a pending abort there is nothing further to settle.
        if self.pending_abort_request.borrow().is_none() {
            self.reject_close_and_closed_promise_if_needed(cx);

            return;
        }

        // Take ownership of the pending abort request, clearing the field, and
        // keep it rooted for the remainder of the function.
        rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
        if let Some(pending_abort_request) = &*pending_abort_request {
            // The abort arrived while the stream was already erroring: reject
            // its promise with the stored error and skip the abort steps.
            if pending_abort_request.was_already_erroring {
                pending_abort_request
                    .promise
                    .reject_with_cx(cx, stored_error.handle());

                self.reject_close_and_closed_promise_if_needed(cx);

                return;
            }

            // Otherwise run the controller's abort steps with the recorded
            // reason.
            rooted!(&in(cx) let mut reason = UndefinedValue());
            reason.set(pending_abort_request.reason.get());
            let promise = controller.abort_steps(cx, global, reason.handle());

            // Both handlers share the abort request promise; each is rooted in
            // an `Option` so it can be moved out with `take()` below.
            rooted!(&in(cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
                stream: Dom::from_ref(self),
                abort_request_promise: pending_abort_request.promise.clone(),
            }));

            rooted!(&in(cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
                stream: Dom::from_ref(self),
                abort_request_promise: pending_abort_request.promise.clone(),
            }));

            let handler = PromiseNativeHandler::new(
                cx,
                global,
                fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
                rejection_handler.take().map(|h| Box::new(h) as Box<_>),
            );

            // Enter the realm of `global` before attaching the handler; `cx` is
            // shadowed by the realm-scoped context from here on.
            let mut realm = enter_auto_realm(cx, global);
            let cx = &mut realm.current_realm();
            promise.append_native_handler(cx, &handler);
        }
    }
342
343 fn reject_close_and_closed_promise_if_needed(&self, cx: &mut JSContext) {
345 assert!(self.is_errored());
347
348 rooted!(&in(cx) let mut stored_error = UndefinedValue());
349 self.get_stored_error(stored_error.handle_mut());
350
351 let close_request = self.close_request.borrow_mut().take();
353 if let Some(close_request) = close_request {
354 assert!(self.in_flight_close_request.borrow().is_none());
356
357 close_request.reject_native(&stored_error.handle(), CanGc::from_cx(cx))
359
360 }
363
364 if let Some(writer) = self.writer.get() {
367 writer.reject_closed_promise_with_stored_error(cx, &stored_error.handle());
369
370 writer.set_close_promise_is_handled();
372 }
373 }
374
375 pub(crate) fn close_queued_or_in_flight(&self) -> bool {
377 let close_requested = self.close_request.borrow().is_some();
378 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
379
380 close_requested || in_flight_close_requested
381 }
382
383 pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
385 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
386 unreachable!("Stream should have a write request");
388 };
389
390 in_flight_write_request.resolve_native(&(), can_gc);
392
393 }
396
397 pub(crate) fn start_erroring(
399 &self,
400 cx: &mut JSContext,
401 global: &GlobalScope,
402 error: SafeHandleValue,
403 ) {
404 assert!(self.stored_error.get().is_undefined());
406
407 assert!(self.is_writable());
409
410 let Some(controller) = self.controller.get() else {
412 unreachable!("Stream should have a controller.");
414 };
415
416 self.state.set(WritableStreamState::Erroring);
418
419 self.stored_error.set(*error);
421
422 if let Some(writer) = self.writer.get() {
424 writer.ensure_ready_promise_rejected(global, error, CanGc::from_cx(cx));
426 }
427
428 if !self.has_operations_marked_inflight() && controller.started() {
430 self.finish_erroring(cx, global);
432 }
433 }
434
435 pub(crate) fn deal_with_rejection(
437 &self,
438 cx: &mut JSContext,
439 global: &GlobalScope,
440 error: SafeHandleValue,
441 ) {
442 if self.is_writable() {
446 self.start_erroring(cx, global, error);
448
449 return;
451 }
452
453 assert!(self.is_erroring());
455
456 self.finish_erroring(cx, global);
458 }
459
460 pub(crate) fn mark_first_write_request_in_flight(&self) {
462 let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
463 let mut write_requests = self.write_requests.borrow_mut();
464
465 assert!(in_flight_write_request.is_none());
467
468 assert!(!write_requests.is_empty());
470
471 let write_request = write_requests.pop_front().unwrap();
474
475 *in_flight_write_request = Some(write_request);
477 }
478
479 pub(crate) fn mark_close_request_in_flight(&self) {
481 let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
482 let mut close_request = self.close_request.borrow_mut();
483
484 assert!(in_flight_close_request.is_none());
486
487 assert!(close_request.is_some());
489
490 let close_request = close_request.take().unwrap();
493
494 *in_flight_close_request = Some(close_request);
496 }
497
498 pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
500 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
501 unreachable!("in_flight_close_request must be Some");
503 };
504
505 in_flight_close_request.resolve_native(&(), can_gc);
507
508 assert!(self.is_writable() || self.is_erroring());
513
514 if self.is_erroring() {
516 self.stored_error.set(UndefinedValue());
518
519 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
521 if let Some(pending_abort_request) = &*pending_abort_request {
522 pending_abort_request.promise.resolve_native(&(), can_gc);
524
525 }
528 }
529
530 self.state.set(WritableStreamState::Closed);
532
533 if let Some(writer) = self.writer.get() {
535 writer.resolve_closed_promise_with_undefined(can_gc);
538 }
539
540 assert!(self.pending_abort_request.borrow().is_none());
542
543 assert!(self.stored_error.get().is_undefined());
545 }
546
547 pub(crate) fn finish_in_flight_close_with_error(
549 &self,
550 cx: &mut JSContext,
551 global: &GlobalScope,
552 error: SafeHandleValue,
553 ) {
554 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
555 unreachable!("Inflight close request must be defined.");
557 };
558
559 in_flight_close_request.reject_native_with_cx(cx, &error);
561
562 assert!(self.is_erroring() || self.is_writable());
567
568 rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
570 if let Some(pending_abort_request) = &*pending_abort_request {
571 pending_abort_request
573 .promise
574 .reject_native_with_cx(cx, &error);
575
576 }
579
580 self.deal_with_rejection(cx, global, error);
582 }
583
584 pub(crate) fn finish_in_flight_write_with_error(
586 &self,
587 cx: &mut JSContext,
588 global: &GlobalScope,
589 error: SafeHandleValue,
590 ) {
591 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
592 unreachable!("Inflight write request must be defined.");
594 };
595
596 in_flight_write_request.reject_native_with_cx(cx, &error);
598
599 assert!(self.is_erroring() || self.is_writable());
604
605 self.deal_with_rejection(cx, global, error);
607 }
608
609 pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
610 self.writer.get()
611 }
612
613 pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
614 self.writer.set(writer);
615 }
616
617 pub(crate) fn set_backpressure(&self, backpressure: bool) {
618 self.backpressure.set(backpressure);
619 }
620
621 pub(crate) fn get_backpressure(&self) -> bool {
622 self.backpressure.get()
623 }
624
625 pub(crate) fn is_locked(&self) -> bool {
627 self.get_writer().is_some()
630 }
631
632 pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
634 assert!(self.is_locked());
636
637 assert!(self.is_writable());
639
640 let promise = Promise::new(global, can_gc);
642
643 self.write_requests.borrow_mut().push_back(promise.clone());
645
646 promise
648 }
649
650 pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
652 self.controller.get()
653 }
654
655 pub(crate) fn abort(
657 &self,
658 cx: &mut CurrentRealm,
659 global: &GlobalScope,
660 provided_reason: SafeHandleValue,
661 ) -> Rc<Promise> {
662 if self.is_closed() || self.is_errored() {
664 return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
666 }
667
668 self.get_controller()
670 .expect("Stream must have a controller.")
671 .signal_abort(cx, provided_reason);
672
673 let state = self.state.get();
675
676 if matches!(
678 state,
679 WritableStreamState::Closed | WritableStreamState::Errored
680 ) {
681 return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
682 }
683
684 if self.pending_abort_request.borrow().is_some() {
686 return self
688 .pending_abort_request
689 .borrow()
690 .as_ref()
691 .expect("Pending abort request must be Some.")
692 .promise
693 .clone();
694 }
695
696 assert!(self.is_writable() || self.is_erroring());
698
699 let mut was_already_erroring = false;
701 rooted!(&in(cx) let undefined_reason = UndefinedValue());
702
703 let reason = if self.is_erroring() {
705 was_already_erroring = true;
707
708 undefined_reason.handle()
710 } else {
711 provided_reason
713 };
714
715 let promise = Promise::new2(cx, global);
717
718 *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
723 promise: promise.clone(),
724 reason: Heap::boxed(reason.get()),
725 was_already_erroring,
726 });
727
728 if !was_already_erroring {
730 self.start_erroring(cx, global, reason);
732 }
733
734 promise
736 }
737
738 pub(crate) fn close(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
740 if self.is_closed() || self.is_errored() {
743 let promise = Promise::new2(cx, global);
745 promise
746 .reject_error_with_cx(cx, Error::Type(c"Stream is closed or errored.".to_owned()));
747 return promise;
748 }
749
750 assert!(self.is_writable() || self.is_erroring());
752
753 assert!(!self.close_queued_or_in_flight());
755
756 let promise = Promise::new2(cx, global);
758
759 *self.close_request.borrow_mut() = Some(promise.clone());
761
762 if let Some(writer) = self.writer.get() {
765 if self.get_backpressure() && self.is_writable() {
768 writer.resolve_ready_promise_with_undefined(CanGc::from_cx(cx));
770 }
771 }
772
773 let Some(controller) = self.controller.get() else {
775 unreachable!("Stream must have a controller.");
776 };
777 controller.close(cx, global);
778
779 promise
781 }
782
783 pub(crate) fn get_desired_size(&self) -> Option<f64> {
786 if self.is_errored() || self.is_erroring() {
792 return None;
793 }
794
795 if self.is_closed() {
797 return Some(0.);
798 }
799
800 let Some(controller) = self.controller.get() else {
801 unreachable!("Stream must have a controller.");
802 };
803 Some(controller.get_desired_size())
804 }
805
806 pub(crate) fn aquire_default_writer(
808 &self,
809 cx: SafeJSContext,
810 global: &GlobalScope,
811 can_gc: CanGc,
812 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
813 let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
815
816 writer.setup(cx, self, can_gc)?;
818
819 Ok(writer)
821 }
822
823 pub(crate) fn update_backpressure(
825 &self,
826 backpressure: bool,
827 global: &GlobalScope,
828 can_gc: CanGc,
829 ) {
830 self.is_writable();
832
833 assert!(!self.close_queued_or_in_flight());
835
836 let writer = self.get_writer();
838
839 if let Some(writer) = writer {
840 if backpressure != self.get_backpressure() {
842 if backpressure {
844 let promise = Promise::new(global, can_gc);
846 writer.set_ready_promise(promise);
847 } else {
848 assert!(!backpressure);
851 writer.resolve_ready_promise_with_undefined(can_gc);
853 }
854 }
855 }
856
857 self.set_backpressure(backpressure);
859 }
860
861 pub(crate) fn setup_cross_realm_transform_writable(
863 &self,
864 cx: &mut JSContext,
865 port: &MessagePort,
866 ) {
867 let port_id = port.message_port_id();
868 let global = self.global();
869
870 let size_algorithm =
876 extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
877
878 let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new2(cx, &global))));
882
883 let controller = WritableStreamDefaultController::new(
885 &global,
886 UnderlyingSinkType::Transfer {
887 backpressure_promise: backpressure_promise.clone(),
888 port: Dom::from_ref(port),
889 },
890 1.0,
891 size_algorithm,
892 CanGc::from_cx(cx),
893 );
894
895 rooted!(&in(cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
898 controller: Dom::from_ref(&controller),
899 backpressure_promise,
900 });
901 global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
902
903 port.Start(cx);
905
906 controller
908 .setup(cx, &global, self)
909 .expect("Setup for transfer cannot fail");
910 }
911 #[allow(clippy::too_many_arguments)]
913 fn setup_from_underlying_sink(
914 &self,
915 cx: &mut JSContext,
916 global: &GlobalScope,
917 stream: &WritableStream,
918 underlying_sink_obj: SafeHandleObject,
919 underlying_sink: &UnderlyingSink,
920 strategy_hwm: f64,
921 strategy_size: Rc<QueuingStrategySize>,
922 ) -> Result<(), Error> {
923 let controller = WritableStreamDefaultController::new(
949 global,
950 UnderlyingSinkType::new_js(
951 underlying_sink.abort.clone(),
952 underlying_sink.start.clone(),
953 underlying_sink.close.clone(),
954 underlying_sink.write.clone(),
955 ),
956 strategy_hwm,
957 strategy_size,
958 CanGc::from_cx(cx),
959 );
960
961 controller.set_underlying_sink_this_object(underlying_sink_obj);
964
965 controller.setup(cx, global, stream)
967 }
968}
969
970#[cfg_attr(crown, expect(crown::unrooted_must_root))]
972pub(crate) fn create_writable_stream(
973 cx: &mut JSContext,
974 global: &GlobalScope,
975 writable_high_water_mark: f64,
976 writable_size_algorithm: Rc<QueuingStrategySize>,
977 underlying_sink_type: UnderlyingSinkType,
978) -> Fallible<DomRoot<WritableStream>> {
979 assert!(writable_high_water_mark >= 0.0);
981
982 let stream = WritableStream::new_with_proto(global, None, CanGc::from_cx(cx));
985
986 let controller = WritableStreamDefaultController::new(
988 global,
989 underlying_sink_type,
990 writable_high_water_mark,
991 writable_size_algorithm,
992 CanGc::from_cx(cx),
993 );
994
995 controller.setup(cx, global, &stream)?;
998
999 Ok(stream)
1001}
1002
1003impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1004 fn Constructor(
1006 cx: &mut JSContext,
1007 global: &GlobalScope,
1008 proto: Option<SafeHandleObject>,
1009 underlying_sink: Option<*mut JSObject>,
1010 strategy: &QueuingStrategy,
1011 ) -> Fallible<DomRoot<WritableStream>> {
1012 rooted!(&in(cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1014
1015 let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1018 rooted!(&in(cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1019 match UnderlyingSink::new(cx, obj_val.handle()) {
1020 Ok(ConversionResult::Success(val)) => val,
1021 Ok(ConversionResult::Failure(error)) => {
1022 return Err(Error::Type(error.into_owned()));
1023 },
1024 _ => {
1025 return Err(Error::JSFailed);
1026 },
1027 }
1028 } else {
1029 UnderlyingSink::empty()
1030 };
1031
1032 if !underlying_sink_dict.type_.handle().is_undefined() {
1033 return Err(Error::Range(c"type is set".to_owned()));
1035 }
1036
1037 let stream = WritableStream::new_with_proto(global, proto, CanGc::from_cx(cx));
1039
1040 let size_algorithm = extract_size_algorithm(strategy, CanGc::from_cx(cx));
1042
1043 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1045
1046 stream.setup_from_underlying_sink(
1049 cx,
1050 global,
1051 &stream,
1052 underlying_sink_obj.handle(),
1053 &underlying_sink_dict,
1054 high_water_mark,
1055 size_algorithm,
1056 )?;
1057
1058 Ok(stream)
1059 }
1060
1061 fn Locked(&self) -> bool {
1063 self.is_locked()
1065 }
1066
1067 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
1069 let global = GlobalScope::from_current_realm(cx);
1070
1071 if self.is_locked() {
1073 let promise = Promise::new2(cx, &global);
1075 promise.reject_error_with_cx(cx, Error::Type(c"Stream is locked.".to_owned()));
1076 return promise;
1077 }
1078
1079 self.abort(cx, &global, reason)
1081 }
1082
1083 fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
1085 let global = GlobalScope::from_current_realm(cx);
1086
1087 if self.is_locked() {
1089 let promise = Promise::new2(cx, &global);
1091 promise.reject_error_with_cx(cx, Error::Type(c"Stream is locked.".to_owned()));
1092 return promise;
1093 }
1094
1095 if self.close_queued_or_in_flight() {
1097 let promise = Promise::new2(cx, &global);
1099 promise.reject_error_with_cx(
1100 cx,
1101 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
1102 );
1103 return promise;
1104 }
1105
1106 self.close(cx, &global)
1108 }
1109
1110 fn GetWriter(
1112 &self,
1113 realm: InRealm,
1114 can_gc: CanGc,
1115 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1116 let cx = GlobalScope::get_cx();
1117 let global = GlobalScope::from_safe_context(cx, realm);
1118
1119 self.aquire_default_writer(cx, &global, can_gc)
1121 }
1122}
1123
1124impl js::gc::Rootable for CrossRealmTransformWritable {}
1125
1126#[derive(Clone, JSTraceable, MallocSizeOf)]
1130#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1131pub(crate) struct CrossRealmTransformWritable {
1132 controller: Dom<WritableStreamDefaultController>,
1134
1135 #[ignore_malloc_size_of = "nested Rc"]
1137 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1138}
1139
1140impl CrossRealmTransformWritable {
1141 pub(crate) fn handle_message(
1144 &self,
1145 cx: &mut CurrentRealm,
1146 global: &GlobalScope,
1147 message: SafeHandleValue,
1148 ) {
1149 rooted!(&in(cx) let mut value = UndefinedValue());
1150 let type_string = get_type_and_value_from_message(cx, message, value.handle_mut());
1151
1152 if type_string == "error" {
1157 self.controller.error_if_needed(cx, value.handle(), global);
1159 }
1160
1161 let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1162
1163 if let Some(promise) = backpressure_promise {
1166 promise.resolve_native_with_cx(cx, &());
1168
1169 }
1172 }
1173
1174 pub(crate) fn handle_error(
1177 &self,
1178 cx: &mut CurrentRealm,
1179 global: &GlobalScope,
1180 port: &MessagePort,
1181 ) {
1182 let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
1184 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1185 error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
1186
1187 port.cross_realm_transform_send_error(cx, rooted_error.handle());
1189
1190 self.controller
1192 .error_if_needed(cx, rooted_error.handle(), global);
1193
1194 global.disentangle_port(cx, port);
1196 }
1197}
1198
1199impl Transferable for WritableStream {
1201 type Index = MessagePortIndex;
1202 type Data = MessagePortImpl;
1203
1204 fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, MessagePortImpl)> {
1206 if self.is_locked() {
1209 return Err(Error::DataClone(None));
1210 }
1211
1212 let global = self.global();
1213 let mut realm = enter_auto_realm(cx, &*global);
1214 let mut realm = realm.current_realm();
1215 let cx = &mut realm;
1216
1217 let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
1219 global.track_message_port(&port_1, None);
1220
1221 let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
1223 global.track_message_port(&port_2, None);
1224
1225 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1227
1228 let readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1230
1231 readable.setup_cross_realm_transform_readable(cx, &port_1);
1233
1234 let promise = readable.pipe_to(cx, &global, self, false, false, false, None);
1236
1237 promise.set_promise_is_handled();
1239
1240 port_2.transfer(cx)
1242 }
1243
1244 fn transfer_receive(
1246 cx: &mut JSContext,
1247 owner: &GlobalScope,
1248 id: MessagePortId,
1249 port_impl: MessagePortImpl,
1250 ) -> Result<DomRoot<Self>, ()> {
1251 let value = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1254
1255 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
1262
1263 value.setup_cross_realm_transform_writable(cx, &transferred_port);
1265 Ok(value)
1266 }
1267
1268 fn serialized_storage<'a>(
1270 data: StructuredData<'a, '_>,
1271 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1272 match data {
1273 StructuredData::Reader(r) => &mut r.port_impls,
1274 StructuredData::Writer(w) => &mut w.ports,
1275 }
1276 }
1277}