1use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::realm::CurrentRealm;
17use js::rust::{
18 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
19 MutableHandleValue as SafeMutableHandleValue,
20};
21use rustc_hash::FxHashMap;
22use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
23use script_bindings::conversions::SafeToJSValConvertible;
24
25use crate::dom::bindings::cell::DomRefCell;
26use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
27 QueuingStrategy, QueuingStrategySize,
28};
29use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
30use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
31use crate::dom::bindings::conversions::ConversionResult;
32use crate::dom::bindings::error::{Error, Fallible};
33use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
34use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
35use crate::dom::bindings::structuredclone::StructuredData;
36use crate::dom::bindings::transferable::Transferable;
37use crate::dom::domexception::{DOMErrorName, DOMException};
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::messageport::MessagePort;
40use crate::dom::promise::Promise;
41use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
42use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
43use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
44use crate::dom::stream::writablestreamdefaultcontroller::{
45 UnderlyingSinkType, WritableStreamDefaultController,
46};
47use crate::dom::stream::writablestreamdefaultwriter::WritableStreamDefaultWriter;
48use crate::realms::{InRealm, enter_realm};
49use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
50
51impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
52
53#[derive(JSTraceable, MallocSizeOf)]
56#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
57struct AbortAlgorithmFulfillmentHandler {
58 stream: Dom<WritableStream>,
59 #[conditional_malloc_size_of]
60 abort_request_promise: Rc<Promise>,
61}
62
63impl Callback for AbortAlgorithmFulfillmentHandler {
64 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
65 let can_gc = CanGc::from_cx(cx);
66 let cx: SafeJSContext = cx.into();
67 self.abort_request_promise.resolve_native(&(), can_gc);
69
70 self.stream
72 .as_rooted()
73 .reject_close_and_closed_promise_if_needed(cx, can_gc);
74 }
75}
76
77impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
78
79#[derive(JSTraceable, MallocSizeOf)]
82#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
83struct AbortAlgorithmRejectionHandler {
84 stream: Dom<WritableStream>,
85 #[conditional_malloc_size_of]
86 abort_request_promise: Rc<Promise>,
87}
88
89impl Callback for AbortAlgorithmRejectionHandler {
90 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
91 let can_gc = CanGc::from_cx(cx);
92 let cx: SafeJSContext = cx.into();
93 self.abort_request_promise.reject_native(&reason, can_gc);
95
96 self.stream
98 .as_rooted()
99 .reject_close_and_closed_promise_if_needed(cx, can_gc);
100 }
101}
102
103impl js::gc::Rootable for PendingAbortRequest {}
104
105#[derive(JSTraceable, MallocSizeOf)]
107#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
108struct PendingAbortRequest {
109 #[conditional_malloc_size_of]
111 promise: Rc<Promise>,
112
113 #[ignore_malloc_size_of = "mozjs"]
115 reason: Box<Heap<JSVal>>,
116
117 was_already_erroring: bool,
119}
120
121#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
123pub(crate) enum WritableStreamState {
124 #[default]
125 Writable,
126 Closed,
127 Erroring,
128 Errored,
129}
130
131#[dom_struct]
133pub struct WritableStream {
134 reflector_: Reflector,
135
136 backpressure: Cell<bool>,
138
139 #[conditional_malloc_size_of]
141 close_request: DomRefCell<Option<Rc<Promise>>>,
142
143 controller: MutNullableDom<WritableStreamDefaultController>,
145
146 detached: Cell<bool>,
148
149 #[conditional_malloc_size_of]
151 in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
152
153 #[conditional_malloc_size_of]
155 in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
156
157 pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
159
160 state: Cell<WritableStreamState>,
162
163 #[ignore_malloc_size_of = "mozjs"]
165 stored_error: Heap<JSVal>,
166
167 writer: MutNullableDom<WritableStreamDefaultWriter>,
169
170 #[conditional_malloc_size_of]
172 write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
173}
174
175impl WritableStream {
176 fn new_inherited() -> WritableStream {
178 WritableStream {
179 reflector_: Reflector::new(),
180 backpressure: Default::default(),
181 close_request: Default::default(),
182 controller: Default::default(),
183 detached: Default::default(),
184 in_flight_write_request: Default::default(),
185 in_flight_close_request: Default::default(),
186 pending_abort_request: Default::default(),
187 state: Default::default(),
188 stored_error: Default::default(),
189 writer: Default::default(),
190 write_requests: Default::default(),
191 }
192 }
193
194 pub(crate) fn new_with_proto(
195 global: &GlobalScope,
196 proto: Option<SafeHandleObject>,
197 can_gc: CanGc,
198 ) -> DomRoot<WritableStream> {
199 reflect_dom_object_with_proto(
200 Box::new(WritableStream::new_inherited()),
201 global,
202 proto,
203 can_gc,
204 )
205 }
206
207 pub(crate) fn assert_no_controller(&self) {
210 assert!(self.controller.get().is_none());
211 }
212
213 pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
216 self.controller.set(Some(controller));
217 }
218
219 pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
220 self.controller.get().expect("Controller should be set.")
221 }
222
223 pub(crate) fn is_writable(&self) -> bool {
224 matches!(self.state.get(), WritableStreamState::Writable)
225 }
226
227 pub(crate) fn is_erroring(&self) -> bool {
228 matches!(self.state.get(), WritableStreamState::Erroring)
229 }
230
231 pub(crate) fn is_errored(&self) -> bool {
232 matches!(self.state.get(), WritableStreamState::Errored)
233 }
234
235 pub(crate) fn is_closed(&self) -> bool {
236 matches!(self.state.get(), WritableStreamState::Closed)
237 }
238
239 pub(crate) fn has_in_flight_write_request(&self) -> bool {
240 self.in_flight_write_request.borrow().is_some()
241 }
242
243 pub(crate) fn has_operations_marked_inflight(&self) -> bool {
245 let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
246 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
247
248 in_flight_write_requested || in_flight_close_requested
249 }
250
251 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
253 handle_mut.set(self.stored_error.get());
254 }
255
256 pub(crate) fn finish_erroring(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
258 assert!(self.is_erroring());
260
261 assert!(!self.has_operations_marked_inflight());
263
264 self.state.set(WritableStreamState::Errored);
266
267 let Some(controller) = self.controller.get() else {
269 unreachable!("Stream should have a controller.");
270 };
271 controller.perform_error_steps();
272
273 rooted!(in(*cx) let mut stored_error = UndefinedValue());
275 self.get_stored_error(stored_error.handle_mut());
276
277 let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
279 for request in write_requests {
280 request.reject(cx, stored_error.handle(), can_gc);
282 }
283
284 if self.pending_abort_request.borrow().is_none() {
289 self.reject_close_and_closed_promise_if_needed(cx, can_gc);
291
292 return;
294 }
295
296 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
299 if let Some(pending_abort_request) = &*pending_abort_request {
300 if pending_abort_request.was_already_erroring {
302 pending_abort_request
304 .promise
305 .reject(cx, stored_error.handle(), can_gc);
306
307 self.reject_close_and_closed_promise_if_needed(cx, can_gc);
309
310 return;
312 }
313
314 rooted!(in(*cx) let mut reason = UndefinedValue());
316 reason.set(pending_abort_request.reason.get());
317 let promise = controller.abort_steps(cx, global, reason.handle(), can_gc);
318
319 rooted!(in(*cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
321 stream: Dom::from_ref(self),
322 abort_request_promise: pending_abort_request.promise.clone(),
323 }));
324
325 rooted!(in(*cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
327 stream: Dom::from_ref(self),
328 abort_request_promise: pending_abort_request.promise.clone(),
329 }));
330
331 let handler = PromiseNativeHandler::new(
332 global,
333 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
334 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
335 can_gc,
336 );
337 let realm = enter_realm(global);
338 let comp = InRealm::Entered(&realm);
339 promise.append_native_handler(&handler, comp, can_gc);
340 }
341 }
342
343 fn reject_close_and_closed_promise_if_needed(&self, cx: SafeJSContext, can_gc: CanGc) {
345 assert!(self.is_errored());
347
348 rooted!(in(*cx) let mut stored_error = UndefinedValue());
349 self.get_stored_error(stored_error.handle_mut());
350
351 let close_request = self.close_request.borrow_mut().take();
353 if let Some(close_request) = close_request {
354 assert!(self.in_flight_close_request.borrow().is_none());
356
357 close_request.reject_native(&stored_error.handle(), can_gc)
359
360 }
363
364 if let Some(writer) = self.writer.get() {
367 writer.reject_closed_promise_with_stored_error(&stored_error.handle(), can_gc);
369
370 writer.set_close_promise_is_handled();
372 }
373 }
374
375 pub(crate) fn close_queued_or_in_flight(&self) -> bool {
377 let close_requested = self.close_request.borrow().is_some();
378 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
379
380 close_requested || in_flight_close_requested
381 }
382
383 pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
385 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
386 unreachable!("Stream should have a write request");
388 };
389
390 in_flight_write_request.resolve_native(&(), can_gc);
392
393 }
396
397 pub(crate) fn start_erroring(
399 &self,
400 cx: SafeJSContext,
401 global: &GlobalScope,
402 error: SafeHandleValue,
403 can_gc: CanGc,
404 ) {
405 assert!(self.stored_error.get().is_undefined());
407
408 assert!(self.is_writable());
410
411 let Some(controller) = self.controller.get() else {
413 unreachable!("Stream should have a controller.");
415 };
416
417 self.state.set(WritableStreamState::Erroring);
419
420 self.stored_error.set(*error);
422
423 if let Some(writer) = self.writer.get() {
425 writer.ensure_ready_promise_rejected(global, error, can_gc);
427 }
428
429 if !self.has_operations_marked_inflight() && controller.started() {
431 self.finish_erroring(cx, global, can_gc);
433 }
434 }
435
436 pub(crate) fn deal_with_rejection(
438 &self,
439 cx: SafeJSContext,
440 global: &GlobalScope,
441 error: SafeHandleValue,
442 can_gc: CanGc,
443 ) {
444 if self.is_writable() {
448 self.start_erroring(cx, global, error, can_gc);
450
451 return;
453 }
454
455 assert!(self.is_erroring());
457
458 self.finish_erroring(cx, global, can_gc);
460 }
461
462 pub(crate) fn mark_first_write_request_in_flight(&self) {
464 let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
465 let mut write_requests = self.write_requests.borrow_mut();
466
467 assert!(in_flight_write_request.is_none());
469
470 assert!(!write_requests.is_empty());
472
473 let write_request = write_requests.pop_front().unwrap();
476
477 *in_flight_write_request = Some(write_request);
479 }
480
481 pub(crate) fn mark_close_request_in_flight(&self) {
483 let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
484 let mut close_request = self.close_request.borrow_mut();
485
486 assert!(in_flight_close_request.is_none());
488
489 assert!(close_request.is_some());
491
492 let close_request = close_request.take().unwrap();
495
496 *in_flight_close_request = Some(close_request);
498 }
499
500 pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
502 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
503 unreachable!("in_flight_close_request must be Some");
505 };
506
507 in_flight_close_request.resolve_native(&(), can_gc);
509
510 assert!(self.is_writable() || self.is_erroring());
515
516 if self.is_erroring() {
518 self.stored_error.set(UndefinedValue());
520
521 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
523 if let Some(pending_abort_request) = &*pending_abort_request {
524 pending_abort_request.promise.resolve_native(&(), can_gc);
526
527 }
530 }
531
532 self.state.set(WritableStreamState::Closed);
534
535 if let Some(writer) = self.writer.get() {
537 writer.resolve_closed_promise_with_undefined(can_gc);
540 }
541
542 assert!(self.pending_abort_request.borrow().is_none());
544
545 assert!(self.stored_error.get().is_undefined());
547 }
548
549 pub(crate) fn finish_in_flight_close_with_error(
551 &self,
552 cx: SafeJSContext,
553 global: &GlobalScope,
554 error: SafeHandleValue,
555 can_gc: CanGc,
556 ) {
557 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
558 unreachable!("Inflight close request must be defined.");
560 };
561
562 in_flight_close_request.reject_native(&error, can_gc);
564
565 assert!(self.is_erroring() || self.is_writable());
570
571 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
573 if let Some(pending_abort_request) = &*pending_abort_request {
574 pending_abort_request.promise.reject_native(&error, can_gc);
576
577 }
580
581 self.deal_with_rejection(cx, global, error, can_gc);
583 }
584
585 pub(crate) fn finish_in_flight_write_with_error(
587 &self,
588 cx: SafeJSContext,
589 global: &GlobalScope,
590 error: SafeHandleValue,
591 can_gc: CanGc,
592 ) {
593 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
594 unreachable!("Inflight write request must be defined.");
596 };
597
598 in_flight_write_request.reject_native(&error, can_gc);
600
601 assert!(self.is_erroring() || self.is_writable());
606
607 self.deal_with_rejection(cx, global, error, can_gc);
609 }
610
611 pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
612 self.writer.get()
613 }
614
615 pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
616 self.writer.set(writer);
617 }
618
619 pub(crate) fn set_backpressure(&self, backpressure: bool) {
620 self.backpressure.set(backpressure);
621 }
622
623 pub(crate) fn get_backpressure(&self) -> bool {
624 self.backpressure.get()
625 }
626
627 pub(crate) fn is_locked(&self) -> bool {
629 self.get_writer().is_some()
632 }
633
634 pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
636 assert!(self.is_locked());
638
639 assert!(self.is_writable());
641
642 let promise = Promise::new(global, can_gc);
644
645 self.write_requests.borrow_mut().push_back(promise.clone());
647
648 promise
650 }
651
652 pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
654 self.controller.get()
655 }
656
657 pub(crate) fn abort(
659 &self,
660 cx: SafeJSContext,
661 global: &GlobalScope,
662 provided_reason: SafeHandleValue,
663 realm: InRealm,
664 can_gc: CanGc,
665 ) -> Rc<Promise> {
666 if self.is_closed() || self.is_errored() {
668 return Promise::new_resolved(global, cx, (), can_gc);
670 }
671
672 self.get_controller()
674 .expect("Stream must have a controller.")
675 .signal_abort(cx, provided_reason, realm, can_gc);
676
677 let state = self.state.get();
679
680 if matches!(
682 state,
683 WritableStreamState::Closed | WritableStreamState::Errored
684 ) {
685 return Promise::new_resolved(global, cx, (), can_gc);
686 }
687
688 if self.pending_abort_request.borrow().is_some() {
690 return self
692 .pending_abort_request
693 .borrow()
694 .as_ref()
695 .expect("Pending abort request must be Some.")
696 .promise
697 .clone();
698 }
699
700 assert!(self.is_writable() || self.is_erroring());
702
703 let mut was_already_erroring = false;
705 rooted!(in(*cx) let undefined_reason = UndefinedValue());
706
707 let reason = if self.is_erroring() {
709 was_already_erroring = true;
711
712 undefined_reason.handle()
714 } else {
715 provided_reason
717 };
718
719 let promise = Promise::new(global, can_gc);
721
722 *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
727 promise: promise.clone(),
728 reason: Heap::boxed(reason.get()),
729 was_already_erroring,
730 });
731
732 if !was_already_erroring {
734 self.start_erroring(cx, global, reason, can_gc);
736 }
737
738 promise
740 }
741
742 pub(crate) fn close(
744 &self,
745 cx: SafeJSContext,
746 global: &GlobalScope,
747 can_gc: CanGc,
748 ) -> Rc<Promise> {
749 if self.is_closed() || self.is_errored() {
752 let promise = Promise::new(global, can_gc);
754 promise.reject_error(
755 Error::Type("Stream is closed or errored.".to_string()),
756 can_gc,
757 );
758 return promise;
759 }
760
761 assert!(self.is_writable() || self.is_erroring());
763
764 assert!(!self.close_queued_or_in_flight());
766
767 let promise = Promise::new(global, can_gc);
769
770 *self.close_request.borrow_mut() = Some(promise.clone());
772
773 if let Some(writer) = self.writer.get() {
776 if self.get_backpressure() && self.is_writable() {
779 writer.resolve_ready_promise_with_undefined(can_gc);
781 }
782 }
783
784 let Some(controller) = self.controller.get() else {
786 unreachable!("Stream must have a controller.");
787 };
788 controller.close(cx, global, can_gc);
789
790 promise
792 }
793
794 pub(crate) fn get_desired_size(&self) -> Option<f64> {
797 if self.is_errored() || self.is_erroring() {
803 return None;
804 }
805
806 if self.is_closed() {
808 return Some(0.);
809 }
810
811 let Some(controller) = self.controller.get() else {
812 unreachable!("Stream must have a controller.");
813 };
814 Some(controller.get_desired_size())
815 }
816
817 pub(crate) fn aquire_default_writer(
819 &self,
820 cx: SafeJSContext,
821 global: &GlobalScope,
822 can_gc: CanGc,
823 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
824 let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
826
827 writer.setup(cx, self, can_gc)?;
829
830 Ok(writer)
832 }
833
834 pub(crate) fn update_backpressure(
836 &self,
837 backpressure: bool,
838 global: &GlobalScope,
839 can_gc: CanGc,
840 ) {
841 self.is_writable();
843
844 assert!(!self.close_queued_or_in_flight());
846
847 let writer = self.get_writer();
849
850 if let Some(writer) = writer {
851 if backpressure != self.get_backpressure() {
853 if backpressure {
855 let promise = Promise::new(global, can_gc);
857 writer.set_ready_promise(promise);
858 } else {
859 assert!(!backpressure);
862 writer.resolve_ready_promise_with_undefined(can_gc);
864 }
865 }
866 }
867
868 self.set_backpressure(backpressure);
870 }
871
872 pub(crate) fn setup_cross_realm_transform_writable(
874 &self,
875 cx: SafeJSContext,
876 port: &MessagePort,
877 can_gc: CanGc,
878 ) {
879 let port_id = port.message_port_id();
880 let global = self.global();
881
882 let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
888
889 let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(&global, can_gc))));
893
894 let controller = WritableStreamDefaultController::new(
896 &global,
897 UnderlyingSinkType::Transfer {
898 backpressure_promise: backpressure_promise.clone(),
899 port: Dom::from_ref(port),
900 },
901 1.0,
902 size_algorithm,
903 can_gc,
904 );
905
906 rooted!(in(*cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
909 controller: Dom::from_ref(&controller),
910 backpressure_promise: backpressure_promise.clone(),
911 });
912 global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
913
914 port.Start(can_gc);
916
917 controller
919 .setup(cx, &global, self, can_gc)
920 .expect("Setup for transfer cannot fail");
921 }
922 #[allow(clippy::too_many_arguments)]
924 pub(crate) fn setup_from_underlying_sink(
925 &self,
926 cx: SafeJSContext,
927 global: &GlobalScope,
928 stream: &WritableStream,
929 underlying_sink_obj: SafeHandleObject,
930 underlying_sink: &UnderlyingSink,
931 strategy_hwm: f64,
932 strategy_size: Rc<QueuingStrategySize>,
933 can_gc: CanGc,
934 ) -> Result<(), Error> {
935 let controller = WritableStreamDefaultController::new(
961 global,
962 UnderlyingSinkType::new_js(
963 underlying_sink.abort.clone(),
964 underlying_sink.start.clone(),
965 underlying_sink.close.clone(),
966 underlying_sink.write.clone(),
967 ),
968 strategy_hwm,
969 strategy_size,
970 can_gc,
971 );
972
973 controller.set_underlying_sink_this_object(underlying_sink_obj);
976
977 controller.setup(cx, global, stream, can_gc)
979 }
980}
981
982#[cfg_attr(crown, expect(crown::unrooted_must_root))]
984pub(crate) fn create_writable_stream(
985 cx: SafeJSContext,
986 global: &GlobalScope,
987 writable_high_water_mark: f64,
988 writable_size_algorithm: Rc<QueuingStrategySize>,
989 underlying_sink_type: UnderlyingSinkType,
990 can_gc: CanGc,
991) -> Fallible<DomRoot<WritableStream>> {
992 assert!(writable_high_water_mark >= 0.0);
994
995 let stream = WritableStream::new_with_proto(global, None, can_gc);
998
999 let controller = WritableStreamDefaultController::new(
1001 global,
1002 underlying_sink_type,
1003 writable_high_water_mark,
1004 writable_size_algorithm,
1005 can_gc,
1006 );
1007
1008 controller.setup(cx, global, &stream, can_gc)?;
1011
1012 Ok(stream)
1014}
1015
1016impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1017 fn Constructor(
1019 cx: SafeJSContext,
1020 global: &GlobalScope,
1021 proto: Option<SafeHandleObject>,
1022 can_gc: CanGc,
1023 underlying_sink: Option<*mut JSObject>,
1024 strategy: &QueuingStrategy,
1025 ) -> Fallible<DomRoot<WritableStream>> {
1026 rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1028
1029 let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1032 rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1033 match UnderlyingSink::new(cx, obj_val.handle(), can_gc) {
1034 Ok(ConversionResult::Success(val)) => val,
1035 Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
1036 _ => {
1037 return Err(Error::JSFailed);
1038 },
1039 }
1040 } else {
1041 UnderlyingSink::empty()
1042 };
1043
1044 if !underlying_sink_dict.type_.handle().is_undefined() {
1045 return Err(Error::Range("type is set".to_string()));
1047 }
1048
1049 let stream = WritableStream::new_with_proto(global, proto, can_gc);
1051
1052 let size_algorithm = extract_size_algorithm(strategy, can_gc);
1054
1055 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1057
1058 stream.setup_from_underlying_sink(
1061 cx,
1062 global,
1063 &stream,
1064 underlying_sink_obj.handle(),
1065 &underlying_sink_dict,
1066 high_water_mark,
1067 size_algorithm,
1068 can_gc,
1069 )?;
1070
1071 Ok(stream)
1072 }
1073
1074 fn Locked(&self) -> bool {
1076 self.is_locked()
1078 }
1079
1080 fn Abort(
1082 &self,
1083 cx: SafeJSContext,
1084 reason: SafeHandleValue,
1085 realm: InRealm,
1086 can_gc: CanGc,
1087 ) -> Rc<Promise> {
1088 let global = GlobalScope::from_safe_context(cx, realm);
1089
1090 if self.is_locked() {
1092 let promise = Promise::new(&global, can_gc);
1094 promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1095 return promise;
1096 }
1097
1098 self.abort(cx, &global, reason, realm, can_gc)
1100 }
1101
1102 fn Close(&self, realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
1104 let cx = GlobalScope::get_cx();
1105 let global = GlobalScope::from_safe_context(cx, realm);
1106
1107 if self.is_locked() {
1109 let promise = Promise::new(&global, can_gc);
1111 promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1112 return promise;
1113 }
1114
1115 if self.close_queued_or_in_flight() {
1117 let promise = Promise::new(&global, can_gc);
1119 promise.reject_error(
1120 Error::Type("Stream has closed queued or in-flight".to_string()),
1121 can_gc,
1122 );
1123 return promise;
1124 }
1125
1126 self.close(cx, &global, can_gc)
1128 }
1129
1130 fn GetWriter(
1132 &self,
1133 realm: InRealm,
1134 can_gc: CanGc,
1135 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1136 let cx = GlobalScope::get_cx();
1137 let global = GlobalScope::from_safe_context(cx, realm);
1138
1139 self.aquire_default_writer(cx, &global, can_gc)
1141 }
1142}
1143
1144impl js::gc::Rootable for CrossRealmTransformWritable {}
1145
1146#[derive(Clone, JSTraceable, MallocSizeOf)]
1150#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1151pub(crate) struct CrossRealmTransformWritable {
1152 controller: Dom<WritableStreamDefaultController>,
1154
1155 #[ignore_malloc_size_of = "nested Rc"]
1157 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1158}
1159
1160impl CrossRealmTransformWritable {
1161 #[expect(unsafe_code)]
1164 pub(crate) fn handle_message(
1165 &self,
1166 cx: SafeJSContext,
1167 global: &GlobalScope,
1168 message: SafeHandleValue,
1169 _realm: InRealm,
1170 can_gc: CanGc,
1171 ) {
1172 rooted!(in(*cx) let mut value = UndefinedValue());
1173 let type_string =
1174 unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
1175
1176 if type_string == "error" {
1181 self.controller
1183 .error_if_needed(cx, value.handle(), global, can_gc);
1184 }
1185
1186 let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1187
1188 if let Some(promise) = backpressure_promise {
1191 promise.resolve_native(&(), can_gc);
1193
1194 }
1197 }
1198
1199 pub(crate) fn handle_error(
1202 &self,
1203 cx: SafeJSContext,
1204 global: &GlobalScope,
1205 port: &MessagePort,
1206 _realm: InRealm,
1207 can_gc: CanGc,
1208 ) {
1209 let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
1211 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1212 error.safe_to_jsval(cx, rooted_error.handle_mut(), can_gc);
1213
1214 port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
1216
1217 self.controller
1219 .error_if_needed(cx, rooted_error.handle(), global, can_gc);
1220
1221 global.disentangle_port(port, can_gc);
1223 }
1224}
1225
1226impl Transferable for WritableStream {
1228 type Index = MessagePortIndex;
1229 type Data = MessagePortImpl;
1230
1231 fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
1233 if self.is_locked() {
1236 return Err(Error::DataClone(None));
1237 }
1238
1239 let global = self.global();
1240 let realm = enter_realm(&*global);
1241 let comp = InRealm::Entered(&realm);
1242 let cx = GlobalScope::get_cx();
1243 let can_gc = CanGc::note();
1244
1245 let port_1 = MessagePort::new(&global, can_gc);
1247 global.track_message_port(&port_1, None);
1248
1249 let port_2 = MessagePort::new(&global, can_gc);
1251 global.track_message_port(&port_2, None);
1252
1253 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1255
1256 let readable = ReadableStream::new_with_proto(&global, None, can_gc);
1258
1259 readable.setup_cross_realm_transform_readable(cx, &port_1, can_gc);
1261
1262 let promise = readable.pipe_to(cx, &global, self, false, false, false, None, comp, can_gc);
1264
1265 promise.set_promise_is_handled();
1267
1268 port_2.transfer()
1270 }
1271
1272 fn transfer_receive(
1274 owner: &GlobalScope,
1275 id: MessagePortId,
1276 port_impl: MessagePortImpl,
1277 ) -> Result<DomRoot<Self>, ()> {
1278 let cx = GlobalScope::get_cx();
1279 let can_gc = CanGc::note();
1280
1281 let value = WritableStream::new_with_proto(owner, None, can_gc);
1284
1285 let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
1292
1293 value.setup_cross_realm_transform_writable(cx, &transferred_port, can_gc);
1295 Ok(value)
1296 }
1297
1298 fn serialized_storage<'a>(
1300 data: StructuredData<'a, '_>,
1301 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1302 match data {
1303 StructuredData::Reader(r) => &mut r.port_impls,
1304 StructuredData::Writer(w) => &mut w.ports,
1305 }
1306 }
1307}