1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use dom_struct::dom_struct;
12use js::context::JSContext;
13use js::jsapi::{Heap, JSObject};
14use js::jsval::{JSVal, ObjectValue, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{
17 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18 MutableHandleValue as SafeMutableHandleValue,
19};
20use rustc_hash::FxHashMap;
21use script_bindings::cell::DomRefCell;
22use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
23use script_bindings::conversions::SafeToJSValConvertible;
24use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
25use servo_base::id::{MessagePortId, MessagePortIndex};
26use servo_constellation_traits::MessagePortImpl;
27
28use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
29 QueuingStrategy, QueuingStrategySize,
30};
31use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
32use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
33use crate::dom::bindings::conversions::ConversionResult;
34use crate::dom::bindings::error::{Error, Fallible};
35use crate::dom::bindings::reflector::DomGlobal;
36use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
37use crate::dom::bindings::structuredclone::StructuredData;
38use crate::dom::bindings::transferable::Transferable;
39use crate::dom::domexception::{DOMErrorName, DOMException};
40use crate::dom::globalscope::GlobalScope;
41use crate::dom::messageport::MessagePort;
42use crate::dom::promise::Promise;
43use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
44use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
45use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
46use crate::dom::stream::writablestreamdefaultcontroller::{
47 UnderlyingSinkType, WritableStreamDefaultController,
48};
49use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
50use crate::realms::{InRealm, enter_auto_realm};
51use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
52
53impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
54
55#[derive(JSTraceable, MallocSizeOf)]
58#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
59struct AbortAlgorithmFulfillmentHandler {
60 stream: Dom<WritableStream>,
61 #[conditional_malloc_size_of]
62 abort_request_promise: Rc<Promise>,
63}
64
65impl Callback for AbortAlgorithmFulfillmentHandler {
66 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
67 self.abort_request_promise
69 .resolve_native(&(), CanGc::from_cx(cx));
70
71 self.stream
73 .as_rooted()
74 .reject_close_and_closed_promise_if_needed(cx);
75 }
76}
77
78impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
79
80#[derive(JSTraceable, MallocSizeOf)]
83#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
84struct AbortAlgorithmRejectionHandler {
85 stream: Dom<WritableStream>,
86 #[conditional_malloc_size_of]
87 abort_request_promise: Rc<Promise>,
88}
89
90impl Callback for AbortAlgorithmRejectionHandler {
91 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
92 self.abort_request_promise
94 .reject_native(&reason, CanGc::from_cx(cx));
95
96 self.stream
98 .as_rooted()
99 .reject_close_and_closed_promise_if_needed(cx);
100 }
101}
102
103impl js::gc::Rootable for PendingAbortRequest {}
104
105#[derive(JSTraceable, MallocSizeOf)]
107#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
108struct PendingAbortRequest {
109 #[conditional_malloc_size_of]
111 promise: Rc<Promise>,
112
113 #[ignore_malloc_size_of = "mozjs"]
115 reason: Box<Heap<JSVal>>,
116
117 was_already_erroring: bool,
119}
120
121#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
123pub(crate) enum WritableStreamState {
124 #[default]
125 Writable,
126 Closed,
127 Erroring,
128 Errored,
129}
130
131#[dom_struct]
133pub struct WritableStream {
134 reflector_: Reflector,
135
136 backpressure: Cell<bool>,
138
139 #[conditional_malloc_size_of]
141 close_request: DomRefCell<Option<Rc<Promise>>>,
142
143 controller: MutNullableDom<WritableStreamDefaultController>,
145
146 detached: Cell<bool>,
148
149 #[conditional_malloc_size_of]
151 in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
152
153 #[conditional_malloc_size_of]
155 in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
156
157 pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
159
160 state: Cell<WritableStreamState>,
162
163 #[ignore_malloc_size_of = "mozjs"]
165 stored_error: Heap<JSVal>,
166
167 writer: MutNullableDom<WritableStreamDefaultWriter>,
169
170 #[conditional_malloc_size_of]
172 write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
173}
174
175impl WritableStream {
176 fn new_inherited() -> WritableStream {
178 WritableStream {
179 reflector_: Reflector::new(),
180 backpressure: Default::default(),
181 close_request: Default::default(),
182 controller: Default::default(),
183 detached: Default::default(),
184 in_flight_write_request: Default::default(),
185 in_flight_close_request: Default::default(),
186 pending_abort_request: Default::default(),
187 state: Default::default(),
188 stored_error: Default::default(),
189 writer: Default::default(),
190 write_requests: Default::default(),
191 }
192 }
193
194 pub(crate) fn new_with_proto(
195 global: &GlobalScope,
196 proto: Option<SafeHandleObject>,
197 can_gc: CanGc,
198 ) -> DomRoot<WritableStream> {
199 reflect_dom_object_with_proto(
200 Box::new(WritableStream::new_inherited()),
201 global,
202 proto,
203 can_gc,
204 )
205 }
206
207 pub(crate) fn assert_no_controller(&self) {
210 assert!(self.controller.get().is_none());
211 }
212
213 pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
216 self.controller.set(Some(controller));
217 }
218
219 pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
220 self.controller.get().expect("Controller should be set.")
221 }
222
223 pub(crate) fn is_writable(&self) -> bool {
224 matches!(self.state.get(), WritableStreamState::Writable)
225 }
226
227 pub(crate) fn is_erroring(&self) -> bool {
228 matches!(self.state.get(), WritableStreamState::Erroring)
229 }
230
231 pub(crate) fn is_errored(&self) -> bool {
232 matches!(self.state.get(), WritableStreamState::Errored)
233 }
234
235 pub(crate) fn is_closed(&self) -> bool {
236 matches!(self.state.get(), WritableStreamState::Closed)
237 }
238
239 pub(crate) fn has_in_flight_write_request(&self) -> bool {
240 self.in_flight_write_request.borrow().is_some()
241 }
242
243 pub(crate) fn has_operations_marked_inflight(&self) -> bool {
245 let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
246 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
247
248 in_flight_write_requested || in_flight_close_requested
249 }
250
251 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
253 handle_mut.set(self.stored_error.get());
254 }
255
256 pub(crate) fn finish_erroring(&self, cx: &mut JSContext, global: &GlobalScope) {
258 assert!(self.is_erroring());
260
261 assert!(!self.has_operations_marked_inflight());
263
264 self.state.set(WritableStreamState::Errored);
266
267 let Some(controller) = self.controller.get() else {
269 unreachable!("Stream should have a controller.");
270 };
271 controller.perform_error_steps();
272
273 rooted!(&in(cx) let mut stored_error = UndefinedValue());
275 self.get_stored_error(stored_error.handle_mut());
276
277 let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
279 for request in write_requests {
280 request.reject(cx.into(), stored_error.handle(), CanGc::from_cx(cx));
282 }
283
284 if self.pending_abort_request.borrow().is_none() {
289 self.reject_close_and_closed_promise_if_needed(cx);
291
292 return;
294 }
295
296 rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
299 if let Some(pending_abort_request) = &*pending_abort_request {
300 if pending_abort_request.was_already_erroring {
302 pending_abort_request.promise.reject(
304 cx.into(),
305 stored_error.handle(),
306 CanGc::from_cx(cx),
307 );
308
309 self.reject_close_and_closed_promise_if_needed(cx);
311
312 return;
314 }
315
316 rooted!(&in(cx) let mut reason = UndefinedValue());
318 reason.set(pending_abort_request.reason.get());
319 let promise = controller.abort_steps(cx, global, reason.handle());
320
321 rooted!(&in(cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
323 stream: Dom::from_ref(self),
324 abort_request_promise: pending_abort_request.promise.clone(),
325 }));
326
327 rooted!(&in(cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
329 stream: Dom::from_ref(self),
330 abort_request_promise: pending_abort_request.promise.clone(),
331 }));
332
333 let handler = PromiseNativeHandler::new(
334 global,
335 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
336 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
337 CanGc::from_cx(cx),
338 );
339
340 let mut realm = enter_auto_realm(cx, global);
341 let cx = &mut realm.current_realm();
342 promise.append_native_handler(cx, &handler);
343 }
344 }
345
346 fn reject_close_and_closed_promise_if_needed(&self, cx: &mut JSContext) {
348 assert!(self.is_errored());
350
351 rooted!(&in(cx) let mut stored_error = UndefinedValue());
352 self.get_stored_error(stored_error.handle_mut());
353
354 let close_request = self.close_request.borrow_mut().take();
356 if let Some(close_request) = close_request {
357 assert!(self.in_flight_close_request.borrow().is_none());
359
360 close_request.reject_native(&stored_error.handle(), CanGc::from_cx(cx))
362
363 }
366
367 if let Some(writer) = self.writer.get() {
370 writer.reject_closed_promise_with_stored_error(cx, &stored_error.handle());
372
373 writer.set_close_promise_is_handled();
375 }
376 }
377
378 pub(crate) fn close_queued_or_in_flight(&self) -> bool {
380 let close_requested = self.close_request.borrow().is_some();
381 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
382
383 close_requested || in_flight_close_requested
384 }
385
386 pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
388 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
389 unreachable!("Stream should have a write request");
391 };
392
393 in_flight_write_request.resolve_native(&(), can_gc);
395
396 }
399
400 pub(crate) fn start_erroring(
402 &self,
403 cx: &mut JSContext,
404 global: &GlobalScope,
405 error: SafeHandleValue,
406 ) {
407 assert!(self.stored_error.get().is_undefined());
409
410 assert!(self.is_writable());
412
413 let Some(controller) = self.controller.get() else {
415 unreachable!("Stream should have a controller.");
417 };
418
419 self.state.set(WritableStreamState::Erroring);
421
422 self.stored_error.set(*error);
424
425 if let Some(writer) = self.writer.get() {
427 writer.ensure_ready_promise_rejected(global, error, CanGc::from_cx(cx));
429 }
430
431 if !self.has_operations_marked_inflight() && controller.started() {
433 self.finish_erroring(cx, global);
435 }
436 }
437
438 pub(crate) fn deal_with_rejection(
440 &self,
441 cx: &mut JSContext,
442 global: &GlobalScope,
443 error: SafeHandleValue,
444 ) {
445 if self.is_writable() {
449 self.start_erroring(cx, global, error);
451
452 return;
454 }
455
456 assert!(self.is_erroring());
458
459 self.finish_erroring(cx, global);
461 }
462
463 pub(crate) fn mark_first_write_request_in_flight(&self) {
465 let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
466 let mut write_requests = self.write_requests.borrow_mut();
467
468 assert!(in_flight_write_request.is_none());
470
471 assert!(!write_requests.is_empty());
473
474 let write_request = write_requests.pop_front().unwrap();
477
478 *in_flight_write_request = Some(write_request);
480 }
481
482 pub(crate) fn mark_close_request_in_flight(&self) {
484 let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
485 let mut close_request = self.close_request.borrow_mut();
486
487 assert!(in_flight_close_request.is_none());
489
490 assert!(close_request.is_some());
492
493 let close_request = close_request.take().unwrap();
496
497 *in_flight_close_request = Some(close_request);
499 }
500
501 pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
503 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
504 unreachable!("in_flight_close_request must be Some");
506 };
507
508 in_flight_close_request.resolve_native(&(), can_gc);
510
511 assert!(self.is_writable() || self.is_erroring());
516
517 if self.is_erroring() {
519 self.stored_error.set(UndefinedValue());
521
522 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
524 if let Some(pending_abort_request) = &*pending_abort_request {
525 pending_abort_request.promise.resolve_native(&(), can_gc);
527
528 }
531 }
532
533 self.state.set(WritableStreamState::Closed);
535
536 if let Some(writer) = self.writer.get() {
538 writer.resolve_closed_promise_with_undefined(can_gc);
541 }
542
543 assert!(self.pending_abort_request.borrow().is_none());
545
546 assert!(self.stored_error.get().is_undefined());
548 }
549
550 pub(crate) fn finish_in_flight_close_with_error(
552 &self,
553 cx: &mut JSContext,
554 global: &GlobalScope,
555 error: SafeHandleValue,
556 ) {
557 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
558 unreachable!("Inflight close request must be defined.");
560 };
561
562 in_flight_close_request.reject_native(&error, CanGc::from_cx(cx));
564
565 assert!(self.is_erroring() || self.is_writable());
570
571 rooted!(&in(cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
573 if let Some(pending_abort_request) = &*pending_abort_request {
574 pending_abort_request
576 .promise
577 .reject_native(&error, CanGc::from_cx(cx));
578
579 }
582
583 self.deal_with_rejection(cx, global, error);
585 }
586
587 pub(crate) fn finish_in_flight_write_with_error(
589 &self,
590 cx: &mut JSContext,
591 global: &GlobalScope,
592 error: SafeHandleValue,
593 ) {
594 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
595 unreachable!("Inflight write request must be defined.");
597 };
598
599 in_flight_write_request.reject_native(&error, CanGc::from_cx(cx));
601
602 assert!(self.is_erroring() || self.is_writable());
607
608 self.deal_with_rejection(cx, global, error);
610 }
611
612 pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
613 self.writer.get()
614 }
615
616 pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
617 self.writer.set(writer);
618 }
619
620 pub(crate) fn set_backpressure(&self, backpressure: bool) {
621 self.backpressure.set(backpressure);
622 }
623
624 pub(crate) fn get_backpressure(&self) -> bool {
625 self.backpressure.get()
626 }
627
628 pub(crate) fn is_locked(&self) -> bool {
630 self.get_writer().is_some()
633 }
634
635 pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
637 assert!(self.is_locked());
639
640 assert!(self.is_writable());
642
643 let promise = Promise::new(global, can_gc);
645
646 self.write_requests.borrow_mut().push_back(promise.clone());
648
649 promise
651 }
652
653 pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
655 self.controller.get()
656 }
657
658 pub(crate) fn abort(
660 &self,
661 cx: &mut CurrentRealm,
662 global: &GlobalScope,
663 provided_reason: SafeHandleValue,
664 ) -> Rc<Promise> {
665 if self.is_closed() || self.is_errored() {
667 return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
669 }
670
671 self.get_controller()
673 .expect("Stream must have a controller.")
674 .signal_abort(cx, provided_reason);
675
676 let state = self.state.get();
678
679 if matches!(
681 state,
682 WritableStreamState::Closed | WritableStreamState::Errored
683 ) {
684 return Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
685 }
686
687 if self.pending_abort_request.borrow().is_some() {
689 return self
691 .pending_abort_request
692 .borrow()
693 .as_ref()
694 .expect("Pending abort request must be Some.")
695 .promise
696 .clone();
697 }
698
699 assert!(self.is_writable() || self.is_erroring());
701
702 let mut was_already_erroring = false;
704 rooted!(&in(cx) let undefined_reason = UndefinedValue());
705
706 let reason = if self.is_erroring() {
708 was_already_erroring = true;
710
711 undefined_reason.handle()
713 } else {
714 provided_reason
716 };
717
718 let promise = Promise::new2(cx, global);
720
721 *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
726 promise: promise.clone(),
727 reason: Heap::boxed(reason.get()),
728 was_already_erroring,
729 });
730
731 if !was_already_erroring {
733 self.start_erroring(cx, global, reason);
735 }
736
737 promise
739 }
740
741 pub(crate) fn close(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
743 if self.is_closed() || self.is_errored() {
746 let promise = Promise::new2(cx, global);
748 promise.reject_error(
749 Error::Type(c"Stream is closed or errored.".to_owned()),
750 CanGc::from_cx(cx),
751 );
752 return promise;
753 }
754
755 assert!(self.is_writable() || self.is_erroring());
757
758 assert!(!self.close_queued_or_in_flight());
760
761 let promise = Promise::new2(cx, global);
763
764 *self.close_request.borrow_mut() = Some(promise.clone());
766
767 if let Some(writer) = self.writer.get() {
770 if self.get_backpressure() && self.is_writable() {
773 writer.resolve_ready_promise_with_undefined(CanGc::from_cx(cx));
775 }
776 }
777
778 let Some(controller) = self.controller.get() else {
780 unreachable!("Stream must have a controller.");
781 };
782 controller.close(cx, global);
783
784 promise
786 }
787
788 pub(crate) fn get_desired_size(&self) -> Option<f64> {
791 if self.is_errored() || self.is_erroring() {
797 return None;
798 }
799
800 if self.is_closed() {
802 return Some(0.);
803 }
804
805 let Some(controller) = self.controller.get() else {
806 unreachable!("Stream must have a controller.");
807 };
808 Some(controller.get_desired_size())
809 }
810
811 pub(crate) fn aquire_default_writer(
813 &self,
814 cx: SafeJSContext,
815 global: &GlobalScope,
816 can_gc: CanGc,
817 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
818 let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
820
821 writer.setup(cx, self, can_gc)?;
823
824 Ok(writer)
826 }
827
828 pub(crate) fn update_backpressure(
830 &self,
831 backpressure: bool,
832 global: &GlobalScope,
833 can_gc: CanGc,
834 ) {
835 self.is_writable();
837
838 assert!(!self.close_queued_or_in_flight());
840
841 let writer = self.get_writer();
843
844 if let Some(writer) = writer {
845 if backpressure != self.get_backpressure() {
847 if backpressure {
849 let promise = Promise::new(global, can_gc);
851 writer.set_ready_promise(promise);
852 } else {
853 assert!(!backpressure);
856 writer.resolve_ready_promise_with_undefined(can_gc);
858 }
859 }
860 }
861
862 self.set_backpressure(backpressure);
864 }
865
866 pub(crate) fn setup_cross_realm_transform_writable(
868 &self,
869 cx: &mut JSContext,
870 port: &MessagePort,
871 ) {
872 let port_id = port.message_port_id();
873 let global = self.global();
874
875 let size_algorithm =
881 extract_size_algorithm(&QueuingStrategy::default(), CanGc::from_cx(cx));
882
883 let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(
887 &global,
888 CanGc::from_cx(cx),
889 ))));
890
891 let controller = WritableStreamDefaultController::new(
893 &global,
894 UnderlyingSinkType::Transfer {
895 backpressure_promise: backpressure_promise.clone(),
896 port: Dom::from_ref(port),
897 },
898 1.0,
899 size_algorithm,
900 CanGc::from_cx(cx),
901 );
902
903 rooted!(&in(cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
906 controller: Dom::from_ref(&controller),
907 backpressure_promise,
908 });
909 global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
910
911 port.Start(cx);
913
914 controller
916 .setup(cx, &global, self)
917 .expect("Setup for transfer cannot fail");
918 }
919 #[allow(clippy::too_many_arguments)]
921 fn setup_from_underlying_sink(
922 &self,
923 cx: &mut JSContext,
924 global: &GlobalScope,
925 stream: &WritableStream,
926 underlying_sink_obj: SafeHandleObject,
927 underlying_sink: &UnderlyingSink,
928 strategy_hwm: f64,
929 strategy_size: Rc<QueuingStrategySize>,
930 ) -> Result<(), Error> {
931 let controller = WritableStreamDefaultController::new(
957 global,
958 UnderlyingSinkType::new_js(
959 underlying_sink.abort.clone(),
960 underlying_sink.start.clone(),
961 underlying_sink.close.clone(),
962 underlying_sink.write.clone(),
963 ),
964 strategy_hwm,
965 strategy_size,
966 CanGc::from_cx(cx),
967 );
968
969 controller.set_underlying_sink_this_object(underlying_sink_obj);
972
973 controller.setup(cx, global, stream)
975 }
976}
977
978#[cfg_attr(crown, expect(crown::unrooted_must_root))]
980pub(crate) fn create_writable_stream(
981 cx: &mut JSContext,
982 global: &GlobalScope,
983 writable_high_water_mark: f64,
984 writable_size_algorithm: Rc<QueuingStrategySize>,
985 underlying_sink_type: UnderlyingSinkType,
986) -> Fallible<DomRoot<WritableStream>> {
987 assert!(writable_high_water_mark >= 0.0);
989
990 let stream = WritableStream::new_with_proto(global, None, CanGc::from_cx(cx));
993
994 let controller = WritableStreamDefaultController::new(
996 global,
997 underlying_sink_type,
998 writable_high_water_mark,
999 writable_size_algorithm,
1000 CanGc::from_cx(cx),
1001 );
1002
1003 controller.setup(cx, global, &stream)?;
1006
1007 Ok(stream)
1009}
1010
1011impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1012 fn Constructor(
1014 cx: &mut JSContext,
1015 global: &GlobalScope,
1016 proto: Option<SafeHandleObject>,
1017 underlying_sink: Option<*mut JSObject>,
1018 strategy: &QueuingStrategy,
1019 ) -> Fallible<DomRoot<WritableStream>> {
1020 rooted!(&in(cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1022
1023 let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1026 rooted!(&in(cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1027 match UnderlyingSink::new(cx.into(), obj_val.handle(), CanGc::from_cx(cx)) {
1028 Ok(ConversionResult::Success(val)) => val,
1029 Ok(ConversionResult::Failure(error)) => {
1030 return Err(Error::Type(error.into_owned()));
1031 },
1032 _ => {
1033 return Err(Error::JSFailed);
1034 },
1035 }
1036 } else {
1037 UnderlyingSink::empty()
1038 };
1039
1040 if !underlying_sink_dict.type_.handle().is_undefined() {
1041 return Err(Error::Range(c"type is set".to_owned()));
1043 }
1044
1045 let stream = WritableStream::new_with_proto(global, proto, CanGc::from_cx(cx));
1047
1048 let size_algorithm = extract_size_algorithm(strategy, CanGc::from_cx(cx));
1050
1051 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1053
1054 stream.setup_from_underlying_sink(
1057 cx,
1058 global,
1059 &stream,
1060 underlying_sink_obj.handle(),
1061 &underlying_sink_dict,
1062 high_water_mark,
1063 size_algorithm,
1064 )?;
1065
1066 Ok(stream)
1067 }
1068
1069 fn Locked(&self) -> bool {
1071 self.is_locked()
1073 }
1074
1075 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
1077 let global = GlobalScope::from_current_realm(cx);
1078
1079 if self.is_locked() {
1081 let promise = Promise::new2(cx, &global);
1083 promise.reject_error(
1084 Error::Type(c"Stream is locked.".to_owned()),
1085 CanGc::from_cx(cx),
1086 );
1087 return promise;
1088 }
1089
1090 self.abort(cx, &global, reason)
1092 }
1093
1094 fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
1096 let global = GlobalScope::from_current_realm(cx);
1097
1098 if self.is_locked() {
1100 let promise = Promise::new2(cx, &global);
1102 promise.reject_error(
1103 Error::Type(c"Stream is locked.".to_owned()),
1104 CanGc::from_cx(cx),
1105 );
1106 return promise;
1107 }
1108
1109 if self.close_queued_or_in_flight() {
1111 let promise = Promise::new2(cx, &global);
1113 promise.reject_error(
1114 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
1115 CanGc::from_cx(cx),
1116 );
1117 return promise;
1118 }
1119
1120 self.close(cx, &global)
1122 }
1123
1124 fn GetWriter(
1126 &self,
1127 realm: InRealm,
1128 can_gc: CanGc,
1129 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1130 let cx = GlobalScope::get_cx();
1131 let global = GlobalScope::from_safe_context(cx, realm);
1132
1133 self.aquire_default_writer(cx, &global, can_gc)
1135 }
1136}
1137
1138impl js::gc::Rootable for CrossRealmTransformWritable {}
1139
1140#[derive(Clone, JSTraceable, MallocSizeOf)]
1144#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1145pub(crate) struct CrossRealmTransformWritable {
1146 controller: Dom<WritableStreamDefaultController>,
1148
1149 #[ignore_malloc_size_of = "nested Rc"]
1151 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1152}
1153
1154impl CrossRealmTransformWritable {
1155 #[expect(unsafe_code)]
1158 pub(crate) fn handle_message(
1159 &self,
1160 cx: &mut CurrentRealm,
1161 global: &GlobalScope,
1162 message: SafeHandleValue,
1163 ) {
1164 rooted!(&in(cx) let mut value = UndefinedValue());
1165 let type_string = unsafe {
1166 get_type_and_value_from_message(
1167 cx.into(),
1168 message,
1169 value.handle_mut(),
1170 CanGc::from_cx(cx),
1171 )
1172 };
1173
1174 if type_string == "error" {
1179 self.controller.error_if_needed(cx, value.handle(), global);
1181 }
1182
1183 let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1184
1185 if let Some(promise) = backpressure_promise {
1188 promise.resolve_native(&(), CanGc::from_cx(cx));
1190
1191 }
1194 }
1195
1196 pub(crate) fn handle_error(
1199 &self,
1200 cx: &mut CurrentRealm,
1201 global: &GlobalScope,
1202 port: &MessagePort,
1203 ) {
1204 let error = DOMException::new(global, DOMErrorName::DataCloneError, CanGc::from_cx(cx));
1206 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
1207 error.safe_to_jsval(cx.into(), rooted_error.handle_mut(), CanGc::from_cx(cx));
1208
1209 port.cross_realm_transform_send_error(cx, rooted_error.handle());
1211
1212 self.controller
1214 .error_if_needed(cx, rooted_error.handle(), global);
1215
1216 global.disentangle_port(cx, port);
1218 }
1219}
1220
1221impl Transferable for WritableStream {
1223 type Index = MessagePortIndex;
1224 type Data = MessagePortImpl;
1225
1226 fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, MessagePortImpl)> {
1228 if self.is_locked() {
1231 return Err(Error::DataClone(None));
1232 }
1233
1234 let global = self.global();
1235 let mut realm = enter_auto_realm(cx, &*global);
1236 let mut realm = realm.current_realm();
1237 let cx = &mut realm;
1238
1239 let port_1 = MessagePort::new(&global, CanGc::from_cx(cx));
1241 global.track_message_port(&port_1, None);
1242
1243 let port_2 = MessagePort::new(&global, CanGc::from_cx(cx));
1245 global.track_message_port(&port_2, None);
1246
1247 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1249
1250 let readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1252
1253 readable.setup_cross_realm_transform_readable(cx, &port_1);
1255
1256 let promise = readable.pipe_to(cx, &global, self, false, false, false, None);
1258
1259 promise.set_promise_is_handled();
1261
1262 port_2.transfer(cx)
1264 }
1265
1266 fn transfer_receive(
1268 cx: &mut JSContext,
1269 owner: &GlobalScope,
1270 id: MessagePortId,
1271 port_impl: MessagePortImpl,
1272 ) -> Result<DomRoot<Self>, ()> {
1273 let value = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1276
1277 let transferred_port = MessagePort::transfer_receive(cx, owner, id, port_impl)?;
1284
1285 value.setup_cross_realm_transform_writable(cx, &transferred_port);
1287 Ok(value)
1288 }
1289
1290 fn serialized_storage<'a>(
1292 data: StructuredData<'a, '_>,
1293 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1294 match data {
1295 StructuredData::Reader(r) => &mut r.port_impls,
1296 StructuredData::Writer(w) => &mut w.ports,
1297 }
1298 }
1299}