1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use dom_struct::dom_struct;
12use js::context::JSContext;
13use js::jsapi::{Heap, JSObject};
14use js::jsval::{JSVal, ObjectValue, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{
17 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18 MutableHandleValue as SafeMutableHandleValue,
19};
20use rustc_hash::FxHashMap;
21use script_bindings::cell::DomRefCell;
22use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
23use script_bindings::conversions::SafeToJSValConvertible;
24use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto_and_cx};
25use servo_base::id::{MessagePortId, MessagePortIndex};
26use servo_constellation_traits::MessagePortImpl;
27
28use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
29 QueuingStrategy, QueuingStrategySize,
30};
31use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
32use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
33use crate::dom::bindings::conversions::ConversionResult;
34use crate::dom::bindings::error::{Error, Fallible};
35use crate::dom::bindings::reflector::DomGlobal;
36use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
37use crate::dom::bindings::structuredclone::StructuredData;
38use crate::dom::bindings::transferable::Transferable;
39use crate::dom::domexception::{DOMErrorName, DOMException};
40use crate::dom::globalscope::GlobalScope;
41use crate::dom::messageport::MessagePort;
42use crate::dom::promise::Promise;
43use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
44use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
45use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
46use crate::dom::stream::writablestreamdefaultcontroller::{
47 UnderlyingSinkType, WritableStreamDefaultController,
48};
49use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
50use crate::realms::enter_auto_realm;
51
52impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
53
54#[derive(JSTraceable, MallocSizeOf)]
57#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
58struct AbortAlgorithmFulfillmentHandler {
59 stream: Dom<WritableStream>,
60 #[conditional_malloc_size_of]
61 abort_request_promise: Rc<Promise>,
62}
63
64impl Callback for AbortAlgorithmFulfillmentHandler {
65 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
66 self.abort_request_promise.resolve_native(cx, &());
68
69 self.stream
71 .as_rooted()
72 .reject_close_and_closed_promise_if_needed(cx);
73 }
74}
75
76impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
77
78#[derive(JSTraceable, MallocSizeOf)]
81#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
82struct AbortAlgorithmRejectionHandler {
83 stream: Dom<WritableStream>,
84 #[conditional_malloc_size_of]
85 abort_request_promise: Rc<Promise>,
86}
87
88impl Callback for AbortAlgorithmRejectionHandler {
89 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
90 self.abort_request_promise.reject_native(cx, &reason);
92
93 self.stream
95 .as_rooted()
96 .reject_close_and_closed_promise_if_needed(cx);
97 }
98}
99
100impl js::gc::Rootable for PendingAbortRequest {}
101
102#[derive(JSTraceable, MallocSizeOf)]
104#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
105struct PendingAbortRequest {
106 #[conditional_malloc_size_of]
108 promise: Rc<Promise>,
109
110 #[ignore_malloc_size_of = "mozjs"]
112 reason: Box<Heap<JSVal>>,
113
114 was_already_erroring: bool,
116}
117
118#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
120pub(crate) enum WritableStreamState {
121 #[default]
122 Writable,
123 Closed,
124 Erroring,
125 Errored,
126}
127
128#[dom_struct]
130pub struct WritableStream {
131 reflector_: Reflector,
132
133 backpressure: Cell<bool>,
135
136 #[conditional_malloc_size_of]
138 close_request: DomRefCell<Option<Rc<Promise>>>,
139
140 controller: MutNullableDom<WritableStreamDefaultController>,
142
143 detached: Cell<bool>,
145
146 #[conditional_malloc_size_of]
148 in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
149
150 #[conditional_malloc_size_of]
152 in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
153
154 pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
156
157 state: Cell<WritableStreamState>,
159
160 #[ignore_malloc_size_of = "mozjs"]
162 stored_error: Heap<JSVal>,
163
164 writer: MutNullableDom<WritableStreamDefaultWriter>,
166
167 #[conditional_malloc_size_of]
169 write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
170}
171
172impl WritableStream {
173 fn new_inherited() -> WritableStream {
175 WritableStream {
176 reflector_: Reflector::new(),
177 backpressure: Default::default(),
178 close_request: Default::default(),
179 controller: Default::default(),
180 detached: Default::default(),
181 in_flight_write_request: Default::default(),
182 in_flight_close_request: Default::default(),
183 pending_abort_request: Default::default(),
184 state: Default::default(),
185 stored_error: Default::default(),
186 writer: Default::default(),
187 write_requests: Default::default(),
188 }
189 }
190
191 pub(crate) fn new_with_proto(
192 cx: &mut JSContext,
193 global: &GlobalScope,
194 proto: Option<SafeHandleObject>,
195 ) -> DomRoot<WritableStream> {
196 reflect_dom_object_with_proto_and_cx(
197 Box::new(WritableStream::new_inherited()),
198 global,
199 proto,
200 cx,
201 )
202 }
203
204 pub(crate) fn assert_no_controller(&self) {
207 assert!(self.controller.get().is_none());
208 }
209
210 pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
213 self.controller.set(Some(controller));
214 }
215
216 pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
217 self.controller.get().expect("Controller should be set.")
218 }
219
220 pub(crate) fn is_writable(&self) -> bool {
221 matches!(self.state.get(), WritableStreamState::Writable)
222 }
223
224 pub(crate) fn is_erroring(&self) -> bool {
225 matches!(self.state.get(), WritableStreamState::Erroring)
226 }
227
228 pub(crate) fn is_errored(&self) -> bool {
229 matches!(self.state.get(), WritableStreamState::Errored)
230 }
231
232 pub(crate) fn is_closed(&self) -> bool {
233 matches!(self.state.get(), WritableStreamState::Closed)
234 }
235
236 pub(crate) fn has_in_flight_write_request(&self) -> bool {
237 self.in_flight_write_request.borrow().is_some()
238 }
239
240 pub(crate) fn has_operations_marked_inflight(&self) -> bool {
242 let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
243 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
244
245 in_flight_write_requested || in_flight_close_requested
246 }
247
248 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
250 handle_mut.set(self.stored_error.get());
251 }
252
253 pub(crate) fn finish_erroring(&self, cx: &mut JSContext, global: &GlobalScope) {
255 assert!(self.is_erroring());
257
258 assert!(!self.has_operations_marked_inflight());
260
261 self.state.set(WritableStreamState::Errored);
263
264 let Some(controller) = self.controller.get() else {
266 unreachable!("Stream should have a controller.");
267 };
268 controller.perform_error_steps();
269
270 rooted!(&in(cx) let mut stored_error = UndefinedValue());
272 self.get_stored_error(stored_error.handle_mut());
273
274 let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
276 for request in write_requests {
277 request.reject(cx, stored_error.handle());
279 }
280
281 if self.pending_abort_request.borrow().is_none() {
286 self.reject_close_and_closed_promise_if_needed(cx);
288
289 return;
291 }
292
293 rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
296 if let Some(pending_abort_request) = &*pending_abort_request {
297 if pending_abort_request.was_already_erroring {
299 pending_abort_request
301 .promise
302 .reject(cx, stored_error.handle());
303
304 self.reject_close_and_closed_promise_if_needed(cx);
306
307 return;
309 }
310
311 rooted!(&in(cx) let mut reason = UndefinedValue());
313 reason.set(pending_abort_request.reason.get());
314 let promise = controller.abort_steps(cx, global, reason.handle());
315
316 rooted!(&in(cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
318 stream: Dom::from_ref(self),
319 abort_request_promise: pending_abort_request.promise.clone(),
320 }));
321
322 rooted!(&in(cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
324 stream: Dom::from_ref(self),
325 abort_request_promise: pending_abort_request.promise.clone(),
326 }));
327
328 let handler = PromiseNativeHandler::new(
329 cx,
330 global,
331 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
332 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
333 );
334
335 let mut realm = enter_auto_realm(cx, global);
336 let cx = &mut realm.current_realm();
337 promise.append_native_handler(cx, &handler);
338 }
339 }
340
341 fn reject_close_and_closed_promise_if_needed(&self, cx: &mut JSContext) {
343 assert!(self.is_errored());
345
346 rooted!(&in(cx) let mut stored_error = UndefinedValue());
347 self.get_stored_error(stored_error.handle_mut());
348
349 let close_request = self.close_request.borrow_mut().take();
351 if let Some(close_request) = close_request {
352 assert!(self.in_flight_close_request.borrow().is_none());
354
355 close_request.reject_native(cx, &stored_error.handle())
357
358 }
361
362 if let Some(writer) = self.writer.get() {
365 writer.reject_closed_promise_with_stored_error(cx, &stored_error.handle());
367
368 writer.set_close_promise_is_handled(cx);
370 }
371 }
372
373 pub(crate) fn close_queued_or_in_flight(&self) -> bool {
375 let close_requested = self.close_request.borrow().is_some();
376 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
377
378 close_requested || in_flight_close_requested
379 }
380
381 pub(crate) fn finish_in_flight_write(&self, cx: &mut JSContext) {
383 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
384 unreachable!("Stream should have a write request");
386 };
387
388 in_flight_write_request.resolve_native(cx, &());
390
391 }
394
395 pub(crate) fn start_erroring(
397 &self,
398 cx: &mut JSContext,
399 global: &GlobalScope,
400 error: SafeHandleValue,
401 ) {
402 assert!(self.stored_error.get().is_undefined());
404
405 assert!(self.is_writable());
407
408 let Some(controller) = self.controller.get() else {
410 unreachable!("Stream should have a controller.");
412 };
413
414 self.state.set(WritableStreamState::Erroring);
416
417 self.stored_error.set(*error);
419
420 if let Some(writer) = self.writer.get() {
422 writer.ensure_ready_promise_rejected(cx, global, error);
424 }
425
426 if !self.has_operations_marked_inflight() && controller.started() {
428 self.finish_erroring(cx, global);
430 }
431 }
432
433 pub(crate) fn deal_with_rejection(
435 &self,
436 cx: &mut JSContext,
437 global: &GlobalScope,
438 error: SafeHandleValue,
439 ) {
440 if self.is_writable() {
444 self.start_erroring(cx, global, error);
446
447 return;
449 }
450
451 assert!(self.is_erroring());
453
454 self.finish_erroring(cx, global);
456 }
457
458 pub(crate) fn mark_first_write_request_in_flight(&self) {
460 let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
461 let mut write_requests = self.write_requests.borrow_mut();
462
463 assert!(in_flight_write_request.is_none());
465
466 assert!(!write_requests.is_empty());
468
469 let write_request = write_requests.pop_front().unwrap();
472
473 *in_flight_write_request = Some(write_request);
475 }
476
477 pub(crate) fn mark_close_request_in_flight(&self) {
479 let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
480 let mut close_request = self.close_request.borrow_mut();
481
482 assert!(in_flight_close_request.is_none());
484
485 assert!(close_request.is_some());
487
488 let close_request = close_request.take().unwrap();
491
492 *in_flight_close_request = Some(close_request);
494 }
495
496 pub(crate) fn finish_in_flight_close(&self, cx: &mut JSContext) {
498 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
499 unreachable!("in_flight_close_request must be Some");
501 };
502
503 in_flight_close_request.resolve_native(cx, &());
505
506 assert!(self.is_writable() || self.is_erroring());
511
512 if self.is_erroring() {
514 self.stored_error.set(UndefinedValue());
516
517 rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
519 if let Some(pending_abort_request) = &*pending_abort_request {
520 pending_abort_request.promise.resolve_native(cx, &());
522
523 }
526 }
527
528 self.state.set(WritableStreamState::Closed);
530
531 if let Some(writer) = self.writer.get() {
533 writer.resolve_closed_promise_with_undefined(cx);
536 }
537
538 assert!(self.pending_abort_request.borrow().is_none());
540
541 assert!(self.stored_error.get().is_undefined());
543 }
544
545 pub(crate) fn finish_in_flight_close_with_error(
547 &self,
548 cx: &mut JSContext,
549 global: &GlobalScope,
550 error: SafeHandleValue,
551 ) {
552 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
553 unreachable!("Inflight close request must be defined.");
555 };
556
557 in_flight_close_request.reject_native(cx, &error);
559
560 assert!(self.is_erroring() || self.is_writable());
565
566 rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
568 if let Some(pending_abort_request) = &*pending_abort_request {
569 pending_abort_request.promise.reject_native(cx, &error);
571
572 }
575
576 self.deal_with_rejection(cx, global, error);
578 }
579
580 pub(crate) fn finish_in_flight_write_with_error(
582 &self,
583 cx: &mut JSContext,
584 global: &GlobalScope,
585 error: SafeHandleValue,
586 ) {
587 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
588 unreachable!("Inflight write request must be defined.");
590 };
591
592 in_flight_write_request.reject_native(cx, &error);
594
595 assert!(self.is_erroring() || self.is_writable());
600
601 self.deal_with_rejection(cx, global, error);
603 }
604
605 pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
606 self.writer.get()
607 }
608
609 pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
610 self.writer.set(writer);
611 }
612
613 pub(crate) fn set_backpressure(&self, backpressure: bool) {
614 self.backpressure.set(backpressure);
615 }
616
617 pub(crate) fn get_backpressure(&self) -> bool {
618 self.backpressure.get()
619 }
620
621 pub(crate) fn is_locked(&self) -> bool {
623 self.get_writer().is_some()
626 }
627
628 pub(crate) fn add_write_request(
630 &self,
631 cx: &mut JSContext,
632 global: &GlobalScope,
633 ) -> Rc<Promise> {
634 assert!(self.is_locked());
636
637 assert!(self.is_writable());
639
640 let promise = Promise::new(cx, global);
642
643 self.write_requests.borrow_mut().push_back(promise.clone());
645
646 promise
648 }
649
650 pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
652 self.controller.get()
653 }
654
655 pub(crate) fn abort(
657 &self,
658 cx: &mut CurrentRealm,
659 global: &GlobalScope,
660 provided_reason: SafeHandleValue,
661 ) -> Rc<Promise> {
662 if self.is_closed() || self.is_errored() {
664 return Promise::new_resolved(cx, global, ());
666 }
667
668 self.get_controller()
670 .expect("Stream must have a controller.")
671 .signal_abort(cx, provided_reason);
672
673 let state = self.state.get();
675
676 if matches!(
678 state,
679 WritableStreamState::Closed | WritableStreamState::Errored
680 ) {
681 return Promise::new_resolved(cx, global, ());
682 }
683
684 if self.pending_abort_request.borrow().is_some() {
686 return self
688 .pending_abort_request
689 .borrow()
690 .as_ref()
691 .expect("Pending abort request must be Some.")
692 .promise
693 .clone();
694 }
695
696 assert!(self.is_writable() || self.is_erroring());
698
699 let mut was_already_erroring = false;
701 rooted!(&in(cx) let undefined_reason = UndefinedValue());
702
703 let reason = if self.is_erroring() {
705 was_already_erroring = true;
707
708 undefined_reason.handle()
710 } else {
711 provided_reason
713 };
714
715 let promise = Promise::new(cx, global);
717
718 *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
723 promise: promise.clone(),
724 reason: Heap::boxed(reason.get()),
725 was_already_erroring,
726 });
727
728 if !was_already_erroring {
730 self.start_erroring(cx, global, reason);
732 }
733
734 promise
736 }
737
738 pub(crate) fn close(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
740 if self.is_closed() || self.is_errored() {
743 let promise = Promise::new(cx, global);
745 promise.reject_error(cx, Error::Type(c"Stream is closed or errored.".to_owned()));
746 return promise;
747 }
748
749 assert!(self.is_writable() || self.is_erroring());
751
752 assert!(!self.close_queued_or_in_flight());
754
755 let promise = Promise::new(cx, global);
757
758 *self.close_request.borrow_mut() = Some(promise.clone());
760
761 if let Some(writer) = self.writer.get() {
764 if self.get_backpressure() && self.is_writable() {
767 writer.resolve_ready_promise_with_undefined(cx);
769 }
770 }
771
772 let Some(controller) = self.controller.get() else {
774 unreachable!("Stream must have a controller.");
775 };
776 controller.close(cx, global);
777
778 promise
780 }
781
782 pub(crate) fn get_desired_size(&self) -> Option<f64> {
785 if self.is_errored() || self.is_erroring() {
791 return None;
792 }
793
794 if self.is_closed() {
796 return Some(0.);
797 }
798
799 let Some(controller) = self.controller.get() else {
800 unreachable!("Stream must have a controller.");
801 };
802 Some(controller.get_desired_size())
803 }
804
805 pub(crate) fn aquire_default_writer(
807 &self,
808 cx: &mut CurrentRealm,
809 global: &GlobalScope,
810 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
811 let writer = WritableStreamDefaultWriter::new(cx, global, None);
813
814 writer.setup(cx, self)?;
816
817 Ok(writer)
819 }
820
821 pub(crate) fn update_backpressure(
823 &self,
824 cx: &mut JSContext,
825 backpressure: bool,
826 global: &GlobalScope,
827 ) {
828 self.is_writable();
830
831 assert!(!self.close_queued_or_in_flight());
833
834 let writer = self.get_writer();
836
837 if let Some(writer) = writer {
838 if backpressure != self.get_backpressure() {
840 if backpressure {
842 let promise = Promise::new(cx, global);
844 writer.set_ready_promise(promise);
845 } else {
846 assert!(!backpressure);
849 writer.resolve_ready_promise_with_undefined(cx);
851 }
852 }
853 }
854
855 self.set_backpressure(backpressure);
857 }
858
859 pub(crate) fn setup_cross_realm_transform_writable(
861 &self,
862 cx: &mut JSContext,
863 port: &MessagePort,
864 ) {
865 let port_id = port.message_port_id();
866 let global = self.global();
867
868 let size_algorithm = extract_size_algorithm(cx, &QueuingStrategy::default());
874
875 let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(cx, &global))));
879
880 let controller = WritableStreamDefaultController::new(
882 cx,
883 &global,
884 UnderlyingSinkType::Transfer {
885 backpressure_promise: backpressure_promise.clone(),
886 port: Dom::from_ref(port),
887 },
888 1.0,
889 size_algorithm,
890 );
891
892 rooted!(&in(cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
895 controller: Dom::from_ref(&controller),
896 backpressure_promise,
897 });
898 global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
899
900 port.Start(cx);
902
903 controller
905 .setup(cx, &global, self)
906 .expect("Setup for transfer cannot fail");
907 }
908 #[allow(clippy::too_many_arguments)]
910 fn setup_from_underlying_sink(
911 &self,
912 cx: &mut JSContext,
913 global: &GlobalScope,
914 stream: &WritableStream,
915 underlying_sink_obj: SafeHandleObject,
916 underlying_sink: &UnderlyingSink,
917 strategy_hwm: f64,
918 strategy_size: Rc<QueuingStrategySize>,
919 ) -> Result<(), Error> {
920 let controller = WritableStreamDefaultController::new(
946 cx,
947 global,
948 UnderlyingSinkType::new_js(
949 underlying_sink.abort.clone(),
950 underlying_sink.start.clone(),
951 underlying_sink.close.clone(),
952 underlying_sink.write.clone(),
953 ),
954 strategy_hwm,
955 strategy_size,
956 );
957
958 controller.set_underlying_sink_this_object(underlying_sink_obj);
961
962 controller.setup(cx, global, stream)
964 }
965}
966
967#[cfg_attr(crown, expect(crown::unrooted_must_root))]
969pub(crate) fn create_writable_stream(
970 cx: &mut JSContext,
971 global: &GlobalScope,
972 writable_high_water_mark: f64,
973 writable_size_algorithm: Rc<QueuingStrategySize>,
974 underlying_sink_type: UnderlyingSinkType,
975) -> Fallible<DomRoot<WritableStream>> {
976 assert!(writable_high_water_mark >= 0.0);
978
979 let stream = WritableStream::new_with_proto(cx, global, None);
982
983 let controller = WritableStreamDefaultController::new(
985 cx,
986 global,
987 underlying_sink_type,
988 writable_high_water_mark,
989 writable_size_algorithm,
990 );
991
992 controller.setup(cx, global, &stream)?;
995
996 Ok(stream)
998}
999
1000impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1001 fn Constructor(
1003 cx: &mut JSContext,
1004 global: &GlobalScope,
1005 proto: Option<SafeHandleObject>,
1006 underlying_sink: Option<*mut JSObject>,
1007 strategy: &QueuingStrategy,
1008 ) -> Fallible<DomRoot<WritableStream>> {
1009 rooted!(&in(cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1011
1012 let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1015 rooted!(&in(cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1016 match UnderlyingSink::new(cx, obj_val.handle()) {
1017 Ok(ConversionResult::Success(val)) => val,
1018 Ok(ConversionResult::Failure(error)) => {
1019 return Err(Error::Type(error.into_owned()));
1020 },
1021 _ => {
1022 return Err(Error::JSFailed);
1023 },
1024 }
1025 } else {
1026 UnderlyingSink::empty()
1027 };
1028
1029 if !underlying_sink_dict.type_.handle().is_undefined() {
1030 return Err(Error::Range(c"type is set".to_owned()));
1032 }
1033
1034 let stream = WritableStream::new_with_proto(cx, global, proto);
1036
1037 let size_algorithm = extract_size_algorithm(cx, strategy);
1039
1040 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1042
1043 stream.setup_from_underlying_sink(
1046 cx,
1047 global,
1048 &stream,
1049 underlying_sink_obj.handle(),
1050 &underlying_sink_dict,
1051 high_water_mark,
1052 size_algorithm,
1053 )?;
1054
1055 Ok(stream)
1056 }
1057
1058 fn Locked(&self) -> bool {
1060 self.is_locked()
1062 }
1063
1064 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
1066 let global = GlobalScope::from_current_realm(cx);
1067
1068 if self.is_locked() {
1070 let promise = Promise::new(cx, &global);
1072 promise.reject_error(cx, Error::Type(c"Stream is locked.".to_owned()));
1073 return promise;
1074 }
1075
1076 self.abort(cx, &global, reason)
1078 }
1079
1080 fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
1082 let global = GlobalScope::from_current_realm(cx);
1083
1084 if self.is_locked() {
1086 let promise = Promise::new(cx, &global);
1088 promise.reject_error(cx, Error::Type(c"Stream is locked.".to_owned()));
1089 return promise;
1090 }
1091
1092 if self.close_queued_or_in_flight() {
1094 let promise = Promise::new(cx, &global);
1096 promise.reject_error(
1097 cx,
1098 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
1099 );
1100 return promise;
1101 }
1102
1103 self.close(cx, &global)
1105 }
1106
1107 fn GetWriter(
1109 &self,
1110 realm: &mut CurrentRealm,
1111 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1112 let global = GlobalScope::from_current_realm(realm);
1113
1114 self.aquire_default_writer(realm, &global)
1116 }
1117}
1118
1119impl js::gc::Rootable for CrossRealmTransformWritable {}
1120
1121#[derive(Clone, JSTraceable, MallocSizeOf)]
1125#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1126pub(crate) struct CrossRealmTransformWritable {
1127 controller: Dom<WritableStreamDefaultController>,
1129
1130 #[ignore_malloc_size_of = "nested Rc"]
1132 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1133}
1134
1135impl CrossRealmTransformWritable {
1136 pub(crate) fn handle_message(
1139 &self,
1140 cx: &mut CurrentRealm,
1141 global: &GlobalScope,
1142 message: SafeHandleValue,
1143 ) {
1144 rooted!(&in(cx) let mut value = UndefinedValue());
1145 let type_string = get_type_and_value_from_message(cx, message, value.handle_mut());
1146
1147 if type_string == "error" {
1152 self.controller.error_if_needed(cx, value.handle(), global);
1154 }
1155
1156 let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1157
1158 if let Some(promise) = backpressure_promise {
1161 promise.resolve_native(cx, &());
1163
1164 }
1167 }
1168
1169 pub(crate) fn handle_error(
1172 &self,
1173 cx: &mut CurrentRealm,
1174 global: &GlobalScope,
1175 port: &MessagePort,
1176 ) {
1177 let error = DOMException::new(cx, global, DOMErrorName::DataCloneError);
1179 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1180 error.safe_to_jsval(cx, rooted_error.handle_mut());
1181
1182 port.cross_realm_transform_send_error(cx, rooted_error.handle());
1184
1185 self.controller
1187 .error_if_needed(cx, rooted_error.handle(), global);
1188
1189 global.disentangle_port(cx, port);
1191 }
1192}
1193
1194impl Transferable for WritableStream {
1196 type Index = MessagePortIndex;
1197 type Data = MessagePortImpl;
1198
1199 fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, MessagePortImpl)> {
1201 if self.is_locked() {
1204 return Err(Error::DataClone(None));
1205 }
1206
1207 let global = self.global();
1208 let mut realm = enter_auto_realm(cx, &*global);
1209 let mut realm = realm.current_realm();
1210 let cx = &mut realm;
1211
1212 let port_1 = MessagePort::new(cx, &global);
1214 global.track_message_port(&port_1, None);
1215
1216 let port_2 = MessagePort::new(cx, &global);
1218 global.track_message_port(&port_2, None);
1219
1220 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1222
1223 let readable = ReadableStream::new_with_proto(cx, &global, None);
1225
1226 readable.setup_cross_realm_transform_readable(cx, &port_1);
1228
1229 let promise = readable.pipe_to(cx, &global, self, false, false, false, None);
1231
1232 promise.set_promise_is_handled(cx);
1234
1235 port_2.transfer(cx)
1237 }
1238
1239 fn transfer_receive(
1241 cx: &mut JSContext,
1242 owner: &GlobalScope,
1243 id: MessagePortId,
1244 port_impl: MessagePortImpl,
1245 ) -> Result<DomRoot<Self>, ()> {
1246 let value = WritableStream::new_with_proto(cx, owner, None);
1249
1250 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
1257
1258 value.setup_cross_realm_transform_writable(cx, &transferred_port);
1260 Ok(value)
1261 }
1262
1263 fn serialized_storage<'a>(
1265 data: StructuredData<'a, '_>,
1266 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1267 match data {
1268 StructuredData::Reader(r) => &mut r.port_impls,
1269 StructuredData::Writer(w) => &mut w.ports,
1270 }
1271 }
1272}