1use std::cell::{Cell, RefCell};
6use std::collections::{HashMap, VecDeque};
7use std::mem;
8use std::ptr::{self};
9use std::rc::Rc;
10
11use base::id::{MessagePortId, MessagePortIndex};
12use constellation_traits::MessagePortImpl;
13use dom_struct::dom_struct;
14use js::jsapi::{Heap, JSObject};
15use js::jsval::{JSVal, ObjectValue, UndefinedValue};
16use js::rust::{
17 HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
18 MutableHandleValue as SafeMutableHandleValue,
19};
20use script_bindings::codegen::GenericBindings::MessagePortBinding::MessagePortMethods;
21use script_bindings::conversions::SafeToJSValConvertible;
22
23use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
24use crate::dom::bindings::cell::DomRefCell;
25use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
26use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::UnderlyingSink;
27use crate::dom::bindings::codegen::Bindings::WritableStreamBinding::WritableStreamMethods;
28use crate::dom::bindings::conversions::ConversionResult;
29use crate::dom::bindings::error::{Error, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::structuredclone::StructuredData;
33use crate::dom::bindings::transferable::Transferable;
34use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
35use crate::dom::domexception::{DOMErrorName, DOMException};
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::messageport::MessagePort;
38use crate::dom::promise::Promise;
39use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
40use crate::dom::readablestream::{ReadableStream, get_type_and_value_from_message};
41use crate::dom::writablestreamdefaultcontroller::{
42 UnderlyingSinkType, WritableStreamDefaultController,
43};
44use crate::dom::writablestreamdefaultwriter::WritableStreamDefaultWriter;
45use crate::realms::{InRealm, enter_realm};
46use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
47
48impl js::gc::Rootable for AbortAlgorithmFulfillmentHandler {}
49
50#[derive(JSTraceable, MallocSizeOf)]
53#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
54struct AbortAlgorithmFulfillmentHandler {
55 stream: Dom<WritableStream>,
56 #[ignore_malloc_size_of = "Rc is hard"]
57 abort_request_promise: Rc<Promise>,
58}
59
60impl Callback for AbortAlgorithmFulfillmentHandler {
61 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
62 self.abort_request_promise.resolve_native(&(), can_gc);
64
65 self.stream
67 .as_rooted()
68 .reject_close_and_closed_promise_if_needed(cx, can_gc);
69 }
70}
71
72impl js::gc::Rootable for AbortAlgorithmRejectionHandler {}
73
74#[derive(JSTraceable, MallocSizeOf)]
77#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
78struct AbortAlgorithmRejectionHandler {
79 stream: Dom<WritableStream>,
80 #[ignore_malloc_size_of = "Rc is hard"]
81 abort_request_promise: Rc<Promise>,
82}
83
84impl Callback for AbortAlgorithmRejectionHandler {
85 fn callback(&self, cx: SafeJSContext, reason: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
86 self.abort_request_promise.reject_native(&reason, can_gc);
88
89 self.stream
91 .as_rooted()
92 .reject_close_and_closed_promise_if_needed(cx, can_gc);
93 }
94}
95
96impl js::gc::Rootable for PendingAbortRequest {}
97
98#[derive(JSTraceable, MallocSizeOf)]
100#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
101struct PendingAbortRequest {
102 #[ignore_malloc_size_of = "Rc is hard"]
104 promise: Rc<Promise>,
105
106 #[ignore_malloc_size_of = "mozjs"]
108 reason: Box<Heap<JSVal>>,
109
110 was_already_erroring: bool,
112}
113
114#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf)]
116pub(crate) enum WritableStreamState {
117 #[default]
118 Writable,
119 Closed,
120 Erroring,
121 Errored,
122}
123
124#[dom_struct]
126pub struct WritableStream {
127 reflector_: Reflector,
128
129 backpressure: Cell<bool>,
131
132 #[ignore_malloc_size_of = "Rc is hard"]
134 close_request: DomRefCell<Option<Rc<Promise>>>,
135
136 controller: MutNullableDom<WritableStreamDefaultController>,
138
139 detached: Cell<bool>,
141
142 #[ignore_malloc_size_of = "Rc is hard"]
144 in_flight_write_request: DomRefCell<Option<Rc<Promise>>>,
145
146 #[ignore_malloc_size_of = "Rc is hard"]
148 in_flight_close_request: DomRefCell<Option<Rc<Promise>>>,
149
150 pending_abort_request: DomRefCell<Option<PendingAbortRequest>>,
152
153 state: Cell<WritableStreamState>,
155
156 #[ignore_malloc_size_of = "mozjs"]
158 stored_error: Heap<JSVal>,
159
160 writer: MutNullableDom<WritableStreamDefaultWriter>,
162
163 #[ignore_malloc_size_of = "Rc is hard"]
165 write_requests: DomRefCell<VecDeque<Rc<Promise>>>,
166}
167
168impl WritableStream {
169 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
170 fn new_inherited() -> WritableStream {
172 WritableStream {
173 reflector_: Reflector::new(),
174 backpressure: Default::default(),
175 close_request: Default::default(),
176 controller: Default::default(),
177 detached: Default::default(),
178 in_flight_write_request: Default::default(),
179 in_flight_close_request: Default::default(),
180 pending_abort_request: Default::default(),
181 state: Default::default(),
182 stored_error: Default::default(),
183 writer: Default::default(),
184 write_requests: Default::default(),
185 }
186 }
187
188 pub(crate) fn new_with_proto(
189 global: &GlobalScope,
190 proto: Option<SafeHandleObject>,
191 can_gc: CanGc,
192 ) -> DomRoot<WritableStream> {
193 reflect_dom_object_with_proto(
194 Box::new(WritableStream::new_inherited()),
195 global,
196 proto,
197 can_gc,
198 )
199 }
200
201 pub(crate) fn assert_no_controller(&self) {
204 assert!(self.controller.get().is_none());
205 }
206
207 pub(crate) fn set_default_controller(&self, controller: &WritableStreamDefaultController) {
210 self.controller.set(Some(controller));
211 }
212
213 #[allow(unused)]
214 pub(crate) fn get_default_controller(&self) -> DomRoot<WritableStreamDefaultController> {
215 self.controller.get().expect("Controller should be set.")
216 }
217
218 pub(crate) fn is_writable(&self) -> bool {
219 matches!(self.state.get(), WritableStreamState::Writable)
220 }
221
222 pub(crate) fn is_erroring(&self) -> bool {
223 matches!(self.state.get(), WritableStreamState::Erroring)
224 }
225
226 pub(crate) fn is_errored(&self) -> bool {
227 matches!(self.state.get(), WritableStreamState::Errored)
228 }
229
230 pub(crate) fn is_closed(&self) -> bool {
231 matches!(self.state.get(), WritableStreamState::Closed)
232 }
233
234 pub(crate) fn has_in_flight_write_request(&self) -> bool {
235 self.in_flight_write_request.borrow().is_some()
236 }
237
238 pub(crate) fn has_operations_marked_inflight(&self) -> bool {
240 let in_flight_write_requested = self.in_flight_write_request.borrow().is_some();
241 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
242
243 in_flight_write_requested || in_flight_close_requested
244 }
245
246 pub(crate) fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
248 handle_mut.set(self.stored_error.get());
249 }
250
251 pub(crate) fn finish_erroring(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
253 assert!(self.is_erroring());
255
256 assert!(!self.has_operations_marked_inflight());
258
259 self.state.set(WritableStreamState::Errored);
261
262 let Some(controller) = self.controller.get() else {
264 unreachable!("Stream should have a controller.");
265 };
266 controller.perform_error_steps();
267
268 rooted!(in(*cx) let mut stored_error = UndefinedValue());
270 self.get_stored_error(stored_error.handle_mut());
271
272 let write_requests = mem::take(&mut *self.write_requests.borrow_mut());
274 for request in write_requests {
275 request.reject(cx, stored_error.handle(), can_gc);
277 }
278
279 if self.pending_abort_request.borrow().is_none() {
284 self.reject_close_and_closed_promise_if_needed(cx, can_gc);
286
287 return;
289 }
290
291 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
294 if let Some(pending_abort_request) = &*pending_abort_request {
295 if pending_abort_request.was_already_erroring {
297 pending_abort_request
299 .promise
300 .reject(cx, stored_error.handle(), can_gc);
301
302 self.reject_close_and_closed_promise_if_needed(cx, can_gc);
304
305 return;
307 }
308
309 rooted!(in(*cx) let mut reason = UndefinedValue());
311 reason.set(pending_abort_request.reason.get());
312 let promise = controller.abort_steps(cx, global, reason.handle(), can_gc);
313
314 rooted!(in(*cx) let mut fulfillment_handler = Some(AbortAlgorithmFulfillmentHandler {
316 stream: Dom::from_ref(self),
317 abort_request_promise: pending_abort_request.promise.clone(),
318 }));
319
320 rooted!(in(*cx) let mut rejection_handler = Some(AbortAlgorithmRejectionHandler {
322 stream: Dom::from_ref(self),
323 abort_request_promise: pending_abort_request.promise.clone(),
324 }));
325
326 let handler = PromiseNativeHandler::new(
327 global,
328 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
329 rejection_handler.take().map(|h| Box::new(h) as Box<_>),
330 can_gc,
331 );
332 let realm = enter_realm(global);
333 let comp = InRealm::Entered(&realm);
334 promise.append_native_handler(&handler, comp, can_gc);
335 }
336 }
337
338 fn reject_close_and_closed_promise_if_needed(&self, cx: SafeJSContext, can_gc: CanGc) {
340 assert!(self.is_errored());
342
343 rooted!(in(*cx) let mut stored_error = UndefinedValue());
344 self.get_stored_error(stored_error.handle_mut());
345
346 let close_request = self.close_request.borrow_mut().take();
348 if let Some(close_request) = close_request {
349 assert!(self.in_flight_close_request.borrow().is_none());
351
352 close_request.reject_native(&stored_error.handle(), can_gc)
354
355 }
358
359 if let Some(writer) = self.writer.get() {
362 writer.reject_closed_promise_with_stored_error(&stored_error.handle(), can_gc);
364
365 writer.set_close_promise_is_handled();
367 }
368 }
369
370 pub(crate) fn close_queued_or_in_flight(&self) -> bool {
372 let close_requested = self.close_request.borrow().is_some();
373 let in_flight_close_requested = self.in_flight_close_request.borrow().is_some();
374
375 close_requested || in_flight_close_requested
376 }
377
378 pub(crate) fn finish_in_flight_write(&self, can_gc: CanGc) {
380 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
381 unreachable!("Stream should have a write request");
383 };
384
385 in_flight_write_request.resolve_native(&(), can_gc);
387
388 }
391
392 pub(crate) fn start_erroring(
394 &self,
395 cx: SafeJSContext,
396 global: &GlobalScope,
397 error: SafeHandleValue,
398 can_gc: CanGc,
399 ) {
400 assert!(self.stored_error.get().is_undefined());
402
403 assert!(self.is_writable());
405
406 let Some(controller) = self.controller.get() else {
408 unreachable!("Stream should have a controller.");
410 };
411
412 self.state.set(WritableStreamState::Erroring);
414
415 self.stored_error.set(*error);
417
418 if let Some(writer) = self.writer.get() {
420 writer.ensure_ready_promise_rejected(global, error, can_gc);
422 }
423
424 if !self.has_operations_marked_inflight() && controller.started() {
426 self.finish_erroring(cx, global, can_gc);
428 }
429 }
430
431 pub(crate) fn deal_with_rejection(
433 &self,
434 cx: SafeJSContext,
435 global: &GlobalScope,
436 error: SafeHandleValue,
437 can_gc: CanGc,
438 ) {
439 if self.is_writable() {
443 self.start_erroring(cx, global, error, can_gc);
445
446 return;
448 }
449
450 assert!(self.is_erroring());
452
453 self.finish_erroring(cx, global, can_gc);
455 }
456
457 pub(crate) fn mark_first_write_request_in_flight(&self) {
459 let mut in_flight_write_request = self.in_flight_write_request.borrow_mut();
460 let mut write_requests = self.write_requests.borrow_mut();
461
462 assert!(in_flight_write_request.is_none());
464
465 assert!(!write_requests.is_empty());
467
468 let write_request = write_requests.pop_front().unwrap();
471
472 *in_flight_write_request = Some(write_request);
474 }
475
476 pub(crate) fn mark_close_request_in_flight(&self) {
478 let mut in_flight_close_request = self.in_flight_close_request.borrow_mut();
479 let mut close_request = self.close_request.borrow_mut();
480
481 assert!(in_flight_close_request.is_none());
483
484 assert!(close_request.is_some());
486
487 let close_request = close_request.take().unwrap();
490
491 *in_flight_close_request = Some(close_request);
493 }
494
495 pub(crate) fn finish_in_flight_close(&self, cx: SafeJSContext, can_gc: CanGc) {
497 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
498 unreachable!("in_flight_close_request must be Some");
500 };
501
502 in_flight_close_request.resolve_native(&(), can_gc);
504
505 assert!(self.is_writable() || self.is_erroring());
510
511 if self.is_erroring() {
513 self.stored_error.set(UndefinedValue());
515
516 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
518 if let Some(pending_abort_request) = &*pending_abort_request {
519 pending_abort_request.promise.resolve_native(&(), can_gc);
521
522 }
525 }
526
527 self.state.set(WritableStreamState::Closed);
529
530 if let Some(writer) = self.writer.get() {
532 writer.resolve_closed_promise_with_undefined(can_gc);
535 }
536
537 assert!(self.pending_abort_request.borrow().is_none());
539
540 assert!(self.stored_error.get().is_undefined());
542 }
543
544 pub(crate) fn finish_in_flight_close_with_error(
546 &self,
547 cx: SafeJSContext,
548 global: &GlobalScope,
549 error: SafeHandleValue,
550 can_gc: CanGc,
551 ) {
552 let Some(in_flight_close_request) = self.in_flight_close_request.borrow_mut().take() else {
553 unreachable!("Inflight close request must be defined.");
555 };
556
557 in_flight_close_request.reject_native(&error, can_gc);
559
560 assert!(self.is_erroring() || self.is_writable());
565
566 rooted!(in(*cx) let pending_abort_request = self.pending_abort_request.borrow_mut().take());
568 if let Some(pending_abort_request) = &*pending_abort_request {
569 pending_abort_request.promise.reject_native(&error, can_gc);
571
572 }
575
576 self.deal_with_rejection(cx, global, error, can_gc);
578 }
579
580 pub(crate) fn finish_in_flight_write_with_error(
582 &self,
583 cx: SafeJSContext,
584 global: &GlobalScope,
585 error: SafeHandleValue,
586 can_gc: CanGc,
587 ) {
588 let Some(in_flight_write_request) = self.in_flight_write_request.borrow_mut().take() else {
589 unreachable!("Inflight write request must be defined.");
591 };
592
593 in_flight_write_request.reject_native(&error, can_gc);
595
596 assert!(self.is_erroring() || self.is_writable());
601
602 self.deal_with_rejection(cx, global, error, can_gc);
604 }
605
606 pub(crate) fn get_writer(&self) -> Option<DomRoot<WritableStreamDefaultWriter>> {
607 self.writer.get()
608 }
609
610 pub(crate) fn set_writer(&self, writer: Option<&WritableStreamDefaultWriter>) {
611 self.writer.set(writer);
612 }
613
614 pub(crate) fn set_backpressure(&self, backpressure: bool) {
615 self.backpressure.set(backpressure);
616 }
617
618 pub(crate) fn get_backpressure(&self) -> bool {
619 self.backpressure.get()
620 }
621
622 pub(crate) fn is_locked(&self) -> bool {
624 self.get_writer().is_some()
627 }
628
629 pub(crate) fn add_write_request(&self, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
631 assert!(self.is_locked());
633
634 assert!(self.is_writable());
636
637 let promise = Promise::new(global, can_gc);
639
640 self.write_requests.borrow_mut().push_back(promise.clone());
642
643 promise
645 }
646
647 pub(crate) fn get_controller(&self) -> Option<DomRoot<WritableStreamDefaultController>> {
649 self.controller.get()
650 }
651
652 pub(crate) fn abort(
654 &self,
655 cx: SafeJSContext,
656 global: &GlobalScope,
657 provided_reason: SafeHandleValue,
658 realm: InRealm,
659 can_gc: CanGc,
660 ) -> Rc<Promise> {
661 if self.is_closed() || self.is_errored() {
663 return Promise::new_resolved(global, cx, (), can_gc);
665 }
666
667 self.get_controller()
669 .expect("Stream must have a controller.")
670 .signal_abort(cx, provided_reason, realm, can_gc);
671
672 let state = self.state.get();
674
675 if matches!(
677 state,
678 WritableStreamState::Closed | WritableStreamState::Errored
679 ) {
680 return Promise::new_resolved(global, cx, (), can_gc);
681 }
682
683 if self.pending_abort_request.borrow().is_some() {
685 return self
687 .pending_abort_request
688 .borrow()
689 .as_ref()
690 .expect("Pending abort request must be Some.")
691 .promise
692 .clone();
693 }
694
695 assert!(self.is_writable() || self.is_erroring());
697
698 let mut was_already_erroring = false;
700 rooted!(in(*cx) let undefined_reason = UndefinedValue());
701
702 let reason = if self.is_erroring() {
704 was_already_erroring = true;
706
707 undefined_reason.handle()
709 } else {
710 provided_reason
712 };
713
714 let promise = Promise::new(global, can_gc);
716
717 *self.pending_abort_request.borrow_mut() = Some(PendingAbortRequest {
722 promise: promise.clone(),
723 reason: Heap::boxed(reason.get()),
724 was_already_erroring,
725 });
726
727 if !was_already_erroring {
729 self.start_erroring(cx, global, reason, can_gc);
731 }
732
733 promise
735 }
736
737 pub(crate) fn close(
739 &self,
740 cx: SafeJSContext,
741 global: &GlobalScope,
742 can_gc: CanGc,
743 ) -> Rc<Promise> {
744 if self.is_closed() || self.is_errored() {
747 let promise = Promise::new(global, can_gc);
749 promise.reject_error(
750 Error::Type("Stream is closed or errored.".to_string()),
751 can_gc,
752 );
753 return promise;
754 }
755
756 assert!(self.is_writable() || self.is_erroring());
758
759 assert!(!self.close_queued_or_in_flight());
761
762 let promise = Promise::new(global, can_gc);
764
765 *self.close_request.borrow_mut() = Some(promise.clone());
767
768 if let Some(writer) = self.writer.get() {
771 if self.get_backpressure() && self.is_writable() {
774 writer.resolve_ready_promise_with_undefined(can_gc);
776 }
777 }
778
779 let Some(controller) = self.controller.get() else {
781 unreachable!("Stream must have a controller.");
782 };
783 controller.close(cx, global, can_gc);
784
785 promise
787 }
788
789 pub(crate) fn get_desired_size(&self) -> Option<f64> {
792 if self.is_errored() || self.is_erroring() {
798 return None;
799 }
800
801 if self.is_closed() {
803 return Some(0.);
804 }
805
806 let Some(controller) = self.controller.get() else {
807 unreachable!("Stream must have a controller.");
808 };
809 Some(controller.get_desired_size())
810 }
811
812 pub(crate) fn aquire_default_writer(
814 &self,
815 cx: SafeJSContext,
816 global: &GlobalScope,
817 can_gc: CanGc,
818 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
819 let writer = WritableStreamDefaultWriter::new(global, None, can_gc);
821
822 writer.setup(cx, self, can_gc)?;
824
825 Ok(writer)
827 }
828
829 pub(crate) fn update_backpressure(
831 &self,
832 backpressure: bool,
833 global: &GlobalScope,
834 can_gc: CanGc,
835 ) {
836 self.is_writable();
838
839 assert!(!self.close_queued_or_in_flight());
841
842 let writer = self.get_writer();
844
845 if let Some(writer) = writer {
846 if backpressure != self.get_backpressure() {
848 if backpressure {
850 let promise = Promise::new(global, can_gc);
852 writer.set_ready_promise(promise);
853 } else {
854 assert!(!backpressure);
857 writer.resolve_ready_promise_with_undefined(can_gc);
859 }
860 }
861 }
862
863 self.set_backpressure(backpressure);
865 }
866
867 pub(crate) fn setup_cross_realm_transform_writable(
869 &self,
870 cx: SafeJSContext,
871 port: &MessagePort,
872 can_gc: CanGc,
873 ) {
874 let port_id = port.message_port_id();
875 let global = self.global();
876
877 let size_algorithm = extract_size_algorithm(&QueuingStrategy::default(), can_gc);
883
884 let backpressure_promise = Rc::new(RefCell::new(Some(Promise::new(&global, can_gc))));
888
889 let controller = WritableStreamDefaultController::new(
891 &global,
892 UnderlyingSinkType::Transfer {
893 backpressure_promise: backpressure_promise.clone(),
894 port: Dom::from_ref(port),
895 },
896 1.0,
897 size_algorithm,
898 can_gc,
899 );
900
901 rooted!(in(*cx) let cross_realm_transform_writable = CrossRealmTransformWritable {
904 controller: Dom::from_ref(&controller),
905 backpressure_promise: backpressure_promise.clone(),
906 });
907 global.note_cross_realm_transform_writable(&cross_realm_transform_writable, port_id);
908
909 port.Start(can_gc);
911
912 controller
914 .setup(cx, &global, self, can_gc)
915 .expect("Setup for transfer cannot fail");
916 }
917 #[allow(clippy::too_many_arguments)]
919 pub(crate) fn setup_from_underlying_sink(
920 &self,
921 cx: SafeJSContext,
922 global: &GlobalScope,
923 stream: &WritableStream,
924 underlying_sink_obj: SafeHandleObject,
925 underlying_sink: &UnderlyingSink,
926 strategy_hwm: f64,
927 strategy_size: Rc<QueuingStrategySize>,
928 can_gc: CanGc,
929 ) -> Result<(), Error> {
930 let controller = WritableStreamDefaultController::new(
956 global,
957 UnderlyingSinkType::new_js(
958 underlying_sink.abort.clone(),
959 underlying_sink.start.clone(),
960 underlying_sink.close.clone(),
961 underlying_sink.write.clone(),
962 ),
963 strategy_hwm,
964 strategy_size,
965 can_gc,
966 );
967
968 controller.set_underlying_sink_this_object(underlying_sink_obj);
971
972 controller.setup(cx, global, stream, can_gc)
974 }
975}
976
977#[cfg_attr(crown, allow(crown::unrooted_must_root))]
979pub(crate) fn create_writable_stream(
980 cx: SafeJSContext,
981 global: &GlobalScope,
982 writable_high_water_mark: f64,
983 writable_size_algorithm: Rc<QueuingStrategySize>,
984 underlying_sink_type: UnderlyingSinkType,
985 can_gc: CanGc,
986) -> Fallible<DomRoot<WritableStream>> {
987 assert!(writable_high_water_mark >= 0.0);
989
990 let stream = WritableStream::new_with_proto(global, None, can_gc);
993
994 let controller = WritableStreamDefaultController::new(
996 global,
997 underlying_sink_type,
998 writable_high_water_mark,
999 writable_size_algorithm,
1000 can_gc,
1001 );
1002
1003 controller.setup(cx, global, &stream, can_gc)?;
1006
1007 Ok(stream)
1009}
1010
1011impl WritableStreamMethods<crate::DomTypeHolder> for WritableStream {
1012 fn Constructor(
1014 cx: SafeJSContext,
1015 global: &GlobalScope,
1016 proto: Option<SafeHandleObject>,
1017 can_gc: CanGc,
1018 underlying_sink: Option<*mut JSObject>,
1019 strategy: &QueuingStrategy,
1020 ) -> Fallible<DomRoot<WritableStream>> {
1021 rooted!(in(*cx) let underlying_sink_obj = underlying_sink.unwrap_or(ptr::null_mut()));
1023
1024 let underlying_sink_dict = if !underlying_sink_obj.is_null() {
1027 rooted!(in(*cx) let obj_val = ObjectValue(underlying_sink_obj.get()));
1028 match UnderlyingSink::new(cx, obj_val.handle()) {
1029 Ok(ConversionResult::Success(val)) => val,
1030 Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
1031 _ => {
1032 return Err(Error::JSFailed);
1033 },
1034 }
1035 } else {
1036 UnderlyingSink::empty()
1037 };
1038
1039 if !underlying_sink_dict.type_.handle().is_undefined() {
1040 return Err(Error::Range("type is set".to_string()));
1042 }
1043
1044 let stream = WritableStream::new_with_proto(global, proto, can_gc);
1046
1047 let size_algorithm = extract_size_algorithm(strategy, can_gc);
1049
1050 let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
1052
1053 stream.setup_from_underlying_sink(
1056 cx,
1057 global,
1058 &stream,
1059 underlying_sink_obj.handle(),
1060 &underlying_sink_dict,
1061 high_water_mark,
1062 size_algorithm,
1063 can_gc,
1064 )?;
1065
1066 Ok(stream)
1067 }
1068
1069 fn Locked(&self) -> bool {
1071 self.is_locked()
1073 }
1074
1075 fn Abort(
1077 &self,
1078 cx: SafeJSContext,
1079 reason: SafeHandleValue,
1080 realm: InRealm,
1081 can_gc: CanGc,
1082 ) -> Rc<Promise> {
1083 let global = GlobalScope::from_safe_context(cx, realm);
1084
1085 if self.is_locked() {
1087 let promise = Promise::new(&global, can_gc);
1089 promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1090 return promise;
1091 }
1092
1093 self.abort(cx, &global, reason, realm, can_gc)
1095 }
1096
1097 fn Close(&self, realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
1099 let cx = GlobalScope::get_cx();
1100 let global = GlobalScope::from_safe_context(cx, realm);
1101
1102 if self.is_locked() {
1104 let promise = Promise::new(&global, can_gc);
1106 promise.reject_error(Error::Type("Stream is locked.".to_string()), can_gc);
1107 return promise;
1108 }
1109
1110 if self.close_queued_or_in_flight() {
1112 let promise = Promise::new(&global, can_gc);
1114 promise.reject_error(
1115 Error::Type("Stream has closed queued or in-flight".to_string()),
1116 can_gc,
1117 );
1118 return promise;
1119 }
1120
1121 self.close(cx, &global, can_gc)
1123 }
1124
1125 fn GetWriter(
1127 &self,
1128 realm: InRealm,
1129 can_gc: CanGc,
1130 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
1131 let cx = GlobalScope::get_cx();
1132 let global = GlobalScope::from_safe_context(cx, realm);
1133
1134 self.aquire_default_writer(cx, &global, can_gc)
1136 }
1137}
1138
1139impl js::gc::Rootable for CrossRealmTransformWritable {}
1140
1141#[derive(Clone, JSTraceable, MallocSizeOf)]
1145#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
1146pub(crate) struct CrossRealmTransformWritable {
1147 controller: Dom<WritableStreamDefaultController>,
1149
1150 #[ignore_malloc_size_of = "Rc is hard"]
1152 backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>,
1153}
1154
1155impl CrossRealmTransformWritable {
1156 #[allow(unsafe_code)]
1159 pub(crate) fn handle_message(
1160 &self,
1161 cx: SafeJSContext,
1162 global: &GlobalScope,
1163 message: SafeHandleValue,
1164 _realm: InRealm,
1165 can_gc: CanGc,
1166 ) {
1167 rooted!(in(*cx) let mut value = UndefinedValue());
1168 let type_string =
1169 unsafe { get_type_and_value_from_message(cx, message, value.handle_mut(), can_gc) };
1170
1171 if type_string == "error" {
1176 self.controller
1178 .error_if_needed(cx, value.handle(), global, can_gc);
1179 }
1180
1181 let backpressure_promise = self.backpressure_promise.borrow_mut().take();
1182
1183 if let Some(promise) = backpressure_promise {
1186 promise.resolve_native(&(), can_gc);
1188
1189 }
1192 }
1193
1194 pub(crate) fn handle_error(
1197 &self,
1198 cx: SafeJSContext,
1199 global: &GlobalScope,
1200 port: &MessagePort,
1201 _realm: InRealm,
1202 can_gc: CanGc,
1203 ) {
1204 let error = DOMException::new(global, DOMErrorName::DataCloneError, can_gc);
1206 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
1207 error.safe_to_jsval(cx, rooted_error.handle_mut());
1208
1209 port.cross_realm_transform_send_error(rooted_error.handle(), can_gc);
1211
1212 self.controller
1214 .error_if_needed(cx, rooted_error.handle(), global, can_gc);
1215
1216 global.disentangle_port(port, can_gc);
1218 }
1219}
1220
1221impl Transferable for WritableStream {
1223 type Index = MessagePortIndex;
1224 type Data = MessagePortImpl;
1225
1226 fn transfer(&self) -> Fallible<(MessagePortId, MessagePortImpl)> {
1228 if self.is_locked() {
1231 return Err(Error::DataClone(None));
1232 }
1233
1234 let global = self.global();
1235 let realm = enter_realm(&*global);
1236 let comp = InRealm::Entered(&realm);
1237 let cx = GlobalScope::get_cx();
1238 let can_gc = CanGc::note();
1239
1240 let port_1 = MessagePort::new(&global, can_gc);
1242 global.track_message_port(&port_1, None);
1243
1244 let port_2 = MessagePort::new(&global, can_gc);
1246 global.track_message_port(&port_2, None);
1247
1248 global.entangle_ports(*port_1.message_port_id(), *port_2.message_port_id());
1250
1251 let readable = ReadableStream::new_with_proto(&global, None, can_gc);
1253
1254 readable.setup_cross_realm_transform_readable(cx, &port_1, can_gc);
1256
1257 let promise = readable.pipe_to(cx, &global, self, false, false, false, None, comp, can_gc);
1259
1260 promise.set_promise_is_handled();
1262
1263 port_2.transfer()
1265 }
1266
1267 fn transfer_receive(
1269 owner: &GlobalScope,
1270 id: MessagePortId,
1271 port_impl: MessagePortImpl,
1272 ) -> Result<DomRoot<Self>, ()> {
1273 let cx = GlobalScope::get_cx();
1274 let can_gc = CanGc::note();
1275
1276 let value = WritableStream::new_with_proto(owner, None, can_gc);
1279
1280 let transferred_port = MessagePort::transfer_receive(owner, id, port_impl)?;
1287
1288 value.setup_cross_realm_transform_writable(cx, &transferred_port, can_gc);
1290 Ok(value)
1291 }
1292
1293 fn serialized_storage<'a>(
1295 data: StructuredData<'a, '_>,
1296 ) -> &'a mut Option<HashMap<MessagePortId, Self::Data>> {
1297 match data {
1298 StructuredData::Reader(r) => &mut r.port_impls,
1299 StructuredData::Writer(w) => &mut w.ports,
1300 }
1301 }
1302}