1use std::cell::Cell;
6use std::collections::HashMap;
7use std::ptr::{self};
8use std::rc::Rc;
9
10use base::id::{MessagePortId, MessagePortIndex};
11use constellation_traits::TransformStreamData;
12use dom_struct::dom_struct;
13use js::jsapi::{Heap, IsPromiseObject, JSObject};
14use js::jsval::{JSVal, ObjectValue, UndefinedValue};
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
16use script_bindings::callback::ExceptionHandling;
17use script_bindings::realms::InRealm;
18
19use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize;
20use super::bindings::structuredclone::StructuredData;
21use super::bindings::transferable::Transferable;
22use super::messageport::MessagePort;
23use super::promisenativehandler::Callback;
24use super::readablestream::CrossRealmTransformReadable;
25use super::types::{TransformStreamDefaultController, WritableStream};
26use super::writablestream::CrossRealmTransformWritable;
27use crate::dom::bindings::cell::DomRefCell;
28use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
29use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
30use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
31use crate::dom::bindings::conversions::ConversionResult;
32use crate::dom::bindings::error::{Error, Fallible};
33use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
34use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
35use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::promise::Promise;
38use crate::dom::readablestream::{ReadableStream, create_readable_stream};
39use crate::dom::transformstreamdefaultcontroller::TransformerType;
40use crate::dom::types::PromiseNativeHandler;
41use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
42use crate::dom::writablestream::create_writable_stream;
43use crate::dom::writablestreamdefaultcontroller::UnderlyingSinkType;
44use crate::realms::enter_realm;
45use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
46
47impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}
48
49#[derive(JSTraceable, MallocSizeOf)]
52#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
53struct TransformBackPressureChangePromiseFulfillment {
54 #[ignore_malloc_size_of = "Rc is hard"]
56 result_promise: Rc<Promise>,
57
58 #[ignore_malloc_size_of = "mozjs"]
59 chunk: Box<Heap<JSVal>>,
60
61 writable: Dom<WritableStream>,
63
64 controller: Dom<TransformStreamDefaultController>,
65}
66
67impl Callback for TransformBackPressureChangePromiseFulfillment {
68 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
70 if self.writable.is_erroring() {
74 rooted!(in(*cx) let mut error = UndefinedValue());
75 self.writable.get_stored_error(error.handle_mut());
76 self.result_promise.reject(cx, error.handle(), can_gc);
77 return;
78 }
79
80 assert!(self.writable.is_writable());
82
83 rooted!(in(*cx) let mut chunk = UndefinedValue());
85 chunk.set(self.chunk.get());
86 let transform_result = self
87 .controller
88 .transform_stream_default_controller_perform_transform(
89 cx,
90 &self.writable.global(),
91 chunk.handle(),
92 can_gc,
93 )
94 .expect("perform transform failed");
95
96 let handler = PromiseNativeHandler::new(
99 &self.writable.global(),
100 Some(Box::new(PerformTransformFulfillment {
101 result_promise: self.result_promise.clone(),
102 })),
103 Some(Box::new(PerformTransformRejection {
104 result_promise: self.result_promise.clone(),
105 })),
106 can_gc,
107 );
108
109 let realm = enter_realm(&*self.writable.global());
110 let comp = InRealm::Entered(&realm);
111 transform_result.append_native_handler(&handler, comp, can_gc);
112 }
113}
114
115#[derive(JSTraceable, MallocSizeOf)]
116#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
117struct PerformTransformFulfillment {
120 #[ignore_malloc_size_of = "Rc is hard"]
121 result_promise: Rc<Promise>,
122}
123
124impl Callback for PerformTransformFulfillment {
125 fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
126 self.result_promise.resolve_native(&(), can_gc);
128 }
129}
130
131#[derive(JSTraceable, MallocSizeOf)]
132#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
133struct PerformTransformRejection {
136 #[ignore_malloc_size_of = "Rc is hard"]
137 result_promise: Rc<Promise>,
138}
139
140impl Callback for PerformTransformRejection {
141 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
142 self.result_promise.reject(cx, v, can_gc);
144 }
145}
146
147#[derive(JSTraceable, MallocSizeOf)]
148#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
149struct BackpressureChangeRejection {
152 #[ignore_malloc_size_of = "Rc is hard"]
153 result_promise: Rc<Promise>,
154}
155
156impl Callback for BackpressureChangeRejection {
157 fn callback(&self, cx: SafeJSContext, reason: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
158 self.result_promise.reject(cx, reason, can_gc);
159 }
160}
161
162impl js::gc::Rootable for CancelPromiseFulfillment {}
163
164#[derive(JSTraceable, MallocSizeOf)]
167#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
168struct CancelPromiseFulfillment {
169 readable: Dom<ReadableStream>,
170 controller: Dom<TransformStreamDefaultController>,
171 #[ignore_malloc_size_of = "mozjs"]
172 reason: Box<Heap<JSVal>>,
173}
174
175impl Callback for CancelPromiseFulfillment {
176 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
178 if self.readable.is_errored() {
180 rooted!(in(*cx) let mut error = UndefinedValue());
181 self.readable.get_stored_error(error.handle_mut());
182 self.controller
183 .get_finish_promise()
184 .expect("finish promise is not set")
185 .reject_native(&error.handle(), can_gc);
186 } else {
187 rooted!(in(*cx) let mut reason = UndefinedValue());
190 reason.set(self.reason.get());
191 self.readable
192 .get_default_controller()
193 .error(reason.handle(), can_gc);
194
195 self.controller
197 .get_finish_promise()
198 .expect("finish promise is not set")
199 .resolve_native(&(), can_gc);
200 }
201 }
202}
203
204impl js::gc::Rootable for CancelPromiseRejection {}
205
206#[derive(JSTraceable, MallocSizeOf)]
209#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
210struct CancelPromiseRejection {
211 readable: Dom<ReadableStream>,
212 controller: Dom<TransformStreamDefaultController>,
213}
214
215impl Callback for CancelPromiseRejection {
216 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
218 self.readable.get_default_controller().error(v, can_gc);
220
221 self.controller
223 .get_finish_promise()
224 .expect("finish promise is not set")
225 .reject(cx, v, can_gc);
226 }
227}
228
229impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
230
231#[derive(JSTraceable, MallocSizeOf)]
234#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
235struct SourceCancelPromiseFulfillment {
236 writeable: Dom<WritableStream>,
237 controller: Dom<TransformStreamDefaultController>,
238 stream: Dom<TransformStream>,
239 #[ignore_malloc_size_of = "mozjs"]
240 reason: Box<Heap<JSVal>>,
241}
242
243impl Callback for SourceCancelPromiseFulfillment {
244 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
246 let finish_promise = self
248 .controller
249 .get_finish_promise()
250 .expect("finish promise is not set");
251
252 let global = &self.writeable.global();
253 if self.writeable.is_errored() {
255 rooted!(in(*cx) let mut error = UndefinedValue());
256 self.writeable.get_stored_error(error.handle_mut());
257 finish_promise.reject(cx, error.handle(), can_gc);
258 } else {
259 rooted!(in(*cx) let mut reason = UndefinedValue());
262 reason.set(self.reason.get());
263 self.writeable.get_default_controller().error_if_needed(
264 cx,
265 reason.handle(),
266 global,
267 can_gc,
268 );
269
270 self.stream.unblock_write(global, can_gc);
272
273 finish_promise.resolve_native(&(), can_gc);
275 }
276 }
277}
278
279impl js::gc::Rootable for SourceCancelPromiseRejection {}
280
281#[derive(JSTraceable, MallocSizeOf)]
284#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
285struct SourceCancelPromiseRejection {
286 writeable: Dom<WritableStream>,
287 controller: Dom<TransformStreamDefaultController>,
288 stream: Dom<TransformStream>,
289}
290
291impl Callback for SourceCancelPromiseRejection {
292 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
294 let global = &self.writeable.global();
296
297 self.writeable
298 .get_default_controller()
299 .error_if_needed(cx, v, global, can_gc);
300
301 self.stream.unblock_write(global, can_gc);
303
304 self.controller
306 .get_finish_promise()
307 .expect("finish promise is not set")
308 .reject(cx, v, can_gc);
309 }
310}
311
312impl js::gc::Rootable for FlushPromiseFulfillment {}
313
314#[derive(JSTraceable, MallocSizeOf)]
317#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
318struct FlushPromiseFulfillment {
319 readable: Dom<ReadableStream>,
320 controller: Dom<TransformStreamDefaultController>,
321}
322
323impl Callback for FlushPromiseFulfillment {
324 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
326 let finish_promise = self
328 .controller
329 .get_finish_promise()
330 .expect("finish promise is not set");
331
332 if self.readable.is_errored() {
334 rooted!(in(*cx) let mut error = UndefinedValue());
335 self.readable.get_stored_error(error.handle_mut());
336 finish_promise.reject(cx, error.handle(), can_gc);
337 } else {
338 self.readable.get_default_controller().close(can_gc);
341
342 finish_promise.resolve_native(&(), can_gc);
344 }
345 }
346}
347
348impl js::gc::Rootable for FlushPromiseRejection {}
349#[derive(JSTraceable, MallocSizeOf)]
353#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
354struct FlushPromiseRejection {
355 readable: Dom<ReadableStream>,
356 controller: Dom<TransformStreamDefaultController>,
357}
358
359impl Callback for FlushPromiseRejection {
360 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
362 self.readable.get_default_controller().error(v, can_gc);
365
366 self.controller
368 .get_finish_promise()
369 .expect("finish promise is not set")
370 .reject(cx, v, can_gc);
371 }
372}
373
impl js::gc::Rootable for CrossRealmTransform {}

/// Either end of a cross-realm transform: wraps the readable-side or the
/// writable-side state defined in the `readablestream` / `writablestream`
/// modules.
// NOTE(review): presumably used by the message-port plumbing that backs
// transferred streams; no user of this enum is visible in this file — confirm.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum CrossRealmTransform {
    /// Readable-side cross-realm transform state.
    Readable(CrossRealmTransformReadable),
    /// Writable-side cross-realm transform state.
    Writable(CrossRealmTransformWritable),
}
386
/// DOM object backing the `TransformStream` interface: a writable side and a
/// readable side joined by a `TransformStreamDefaultController`.
#[dom_struct]
pub struct TransformStream {
    reflector_: Reflector,

    /// Current backpressure flag; only changed through `set_backpressure`.
    backpressure: Cell<bool>,

    /// Promise replaced on every `set_backpressure` call; the previous one (if
    /// any) is resolved at that point. `None` until the first call.
    #[ignore_malloc_size_of = "Rc is hard"]
    backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,

    /// The stream's controller; unset until
    /// `set_up_transform_stream_default_controller` runs.
    controller: MutNullableDom<TransformStreamDefaultController>,

    /// Initialised to `false`; not read or written elsewhere in this file.
    // NOTE(review): presumably reserved for transfer/detach bookkeeping — confirm.
    detached: Cell<bool>,

    /// Readable side; unset until `initialize` or `transfer_receive` assigns it.
    readable: MutNullableDom<ReadableStream>,

    /// Writable side; unset until `initialize` or `transfer_receive` assigns it.
    writable: MutNullableDom<WritableStream>,
}
411
412impl TransformStream {
413 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
414 fn new_inherited() -> TransformStream {
416 TransformStream {
417 reflector_: Reflector::new(),
418 backpressure: Default::default(),
419 backpressure_change_promise: DomRefCell::new(None),
420 controller: MutNullableDom::new(None),
421 detached: Cell::new(false),
422 readable: MutNullableDom::new(None),
423 writable: MutNullableDom::new(None),
424 }
425 }
426
427 pub(crate) fn new_with_proto(
428 global: &GlobalScope,
429 proto: Option<SafeHandleObject>,
430 can_gc: CanGc,
431 ) -> DomRoot<TransformStream> {
432 reflect_dom_object_with_proto(
433 Box::new(TransformStream::new_inherited()),
434 global,
435 proto,
436 can_gc,
437 )
438 }
439
440 pub(crate) fn set_up(
443 &self,
444 cx: SafeJSContext,
445 global: &GlobalScope,
446 transformer_type: TransformerType,
447 can_gc: CanGc,
448 ) -> Fallible<()> {
449 let writable_high_water_mark = 1.0;
451
452 let writable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
454
455 let readable_high_water_mark = 0.0;
457
458 let readable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
460
461 let start_promise = Promise::new_resolved(global, cx, (), can_gc);
468
469 self.initialize(
473 cx,
474 global,
475 start_promise.clone(),
476 writable_high_water_mark,
477 writable_size_algorithm,
478 readable_high_water_mark,
479 readable_size_algorithm,
480 can_gc,
481 )?;
482
483 let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
485
486 self.set_up_transform_stream_default_controller(&controller);
490
491 Ok(())
492 }
493
494 pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
495 self.controller.get().expect("controller is not set")
496 }
497
498 pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
499 self.writable.get().expect("writable stream is not set")
500 }
501
502 pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
503 self.readable.get().expect("readable stream is not set")
504 }
505
506 pub(crate) fn get_backpressure(&self) -> bool {
507 self.backpressure.get()
508 }
509
510 #[allow(clippy::too_many_arguments)]
512 fn initialize(
513 &self,
514 cx: SafeJSContext,
515 global: &GlobalScope,
516 start_promise: Rc<Promise>,
517 writable_high_water_mark: f64,
518 writable_size_algorithm: Rc<QueuingStrategySize>,
519 readable_high_water_mark: f64,
520 readable_size_algorithm: Rc<QueuingStrategySize>,
521 can_gc: CanGc,
522 ) -> Fallible<()> {
523 let writable = create_writable_stream(
535 cx,
536 global,
537 writable_high_water_mark,
538 writable_size_algorithm,
539 UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
540 can_gc,
541 )?;
542 self.writable.set(Some(&writable));
543
544 let readable = create_readable_stream(
556 global,
557 UnderlyingSourceType::Transform(Dom::from_ref(self), start_promise.clone()),
558 Some(readable_size_algorithm),
559 Some(readable_high_water_mark),
560 can_gc,
561 );
562 self.readable.set(Some(&readable));
563
564 self.set_backpressure(global, true, can_gc);
569
570 self.controller.set(None);
572
573 Ok(())
574 }
575
576 pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
578 assert!(self.backpressure.get() != backpressure);
580
581 if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
584 promise.resolve_native(&(), can_gc);
585 }
586
587 *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
589
590 self.backpressure.set(backpressure);
592 }
593
594 fn set_up_transform_stream_default_controller(
596 &self,
597 controller: &TransformStreamDefaultController,
598 ) {
599 assert!(self.controller.get().is_none());
604
605 controller.set_stream(self);
607
608 self.controller.set(Some(controller));
610
611 }
616
617 fn set_up_transform_stream_default_controller_from_transformer(
619 &self,
620 global: &GlobalScope,
621 transformer_obj: SafeHandleObject,
622 transformer: &Transformer,
623 can_gc: CanGc,
624 ) {
625 let transformer_type = TransformerType::new_from_js_transformer(transformer);
627 let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
628
629 controller.set_transform_obj(transformer_obj);
652
653 self.set_up_transform_stream_default_controller(&controller);
656 }
657
658 pub(crate) fn transform_stream_default_sink_write_algorithm(
660 &self,
661 cx: SafeJSContext,
662 global: &GlobalScope,
663 chunk: SafeHandleValue,
664 can_gc: CanGc,
665 ) -> Fallible<Rc<Promise>> {
666 assert!(self.writable.get().is_some());
668
669 let controller = self.controller.get().expect("controller is not set");
671
672 if self.backpressure.get() {
674 let backpressure_change_promise = self.backpressure_change_promise.borrow();
676
677 assert!(backpressure_change_promise.is_some());
679
680 let result_promise = Promise::new(global, can_gc);
682 rooted!(in(*cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
683 controller: Dom::from_ref(&controller),
684 writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
685 chunk: Heap::boxed(chunk.get()),
686 result_promise: result_promise.clone(),
687 }));
688
689 let handler = PromiseNativeHandler::new(
690 global,
691 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
692 Some(Box::new(BackpressureChangeRejection {
693 result_promise: result_promise.clone(),
694 })),
695 can_gc,
696 );
697 let realm = enter_realm(global);
698 let comp = InRealm::Entered(&realm);
699 backpressure_change_promise
700 .as_ref()
701 .expect("Promise must be some by now.")
702 .append_native_handler(&handler, comp, can_gc);
703
704 return Ok(result_promise);
705 }
706
707 controller.transform_stream_default_controller_perform_transform(cx, global, chunk, can_gc)
709 }
710
711 pub(crate) fn transform_stream_default_sink_abort_algorithm(
713 &self,
714 cx: SafeJSContext,
715 global: &GlobalScope,
716 reason: SafeHandleValue,
717 can_gc: CanGc,
718 ) -> Fallible<Rc<Promise>> {
719 let controller = self.controller.get().expect("controller is not set");
721
722 if let Some(finish_promise) = controller.get_finish_promise() {
724 return Ok(finish_promise);
725 }
726
727 let readable = self.readable.get().expect("readable stream is not set");
729
730 controller.set_finish_promise(Promise::new(global, can_gc));
732
733 let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
735
736 controller.clear_algorithms();
738
739 let handler = PromiseNativeHandler::new(
741 global,
742 Some(Box::new(CancelPromiseFulfillment {
743 readable: Dom::from_ref(&readable),
744 controller: Dom::from_ref(&controller),
745 reason: Heap::boxed(reason.get()),
746 })),
747 Some(Box::new(CancelPromiseRejection {
748 readable: Dom::from_ref(&readable),
749 controller: Dom::from_ref(&controller),
750 })),
751 can_gc,
752 );
753 let realm = enter_realm(global);
754 let comp = InRealm::Entered(&realm);
755 cancel_promise.append_native_handler(&handler, comp, can_gc);
756
757 let finish_promise = controller
759 .get_finish_promise()
760 .expect("finish promise is not set");
761 Ok(finish_promise)
762 }
763
764 pub(crate) fn transform_stream_default_sink_close_algorithm(
766 &self,
767 cx: SafeJSContext,
768 global: &GlobalScope,
769 can_gc: CanGc,
770 ) -> Fallible<Rc<Promise>> {
771 let controller = self
773 .controller
774 .get()
775 .ok_or(Error::Type("controller is not set".to_string()))?;
776
777 if let Some(finish_promise) = controller.get_finish_promise() {
779 return Ok(finish_promise);
780 }
781
782 let readable = self
784 .readable
785 .get()
786 .ok_or(Error::Type("readable stream is not set".to_string()))?;
787
788 controller.set_finish_promise(Promise::new(global, can_gc));
790
791 let flush_promise = controller.perform_flush(cx, global, can_gc)?;
793
794 controller.clear_algorithms();
796
797 let handler = PromiseNativeHandler::new(
799 global,
800 Some(Box::new(FlushPromiseFulfillment {
801 readable: Dom::from_ref(&readable),
802 controller: Dom::from_ref(&controller),
803 })),
804 Some(Box::new(FlushPromiseRejection {
805 readable: Dom::from_ref(&readable),
806 controller: Dom::from_ref(&controller),
807 })),
808 can_gc,
809 );
810
811 let realm = enter_realm(global);
812 let comp = InRealm::Entered(&realm);
813 flush_promise.append_native_handler(&handler, comp, can_gc);
814 let finish_promise = controller
816 .get_finish_promise()
817 .expect("finish promise is not set");
818 Ok(finish_promise)
819 }
820
821 pub(crate) fn transform_stream_default_source_cancel(
823 &self,
824 cx: SafeJSContext,
825 global: &GlobalScope,
826 reason: SafeHandleValue,
827 can_gc: CanGc,
828 ) -> Fallible<Rc<Promise>> {
829 let controller = self
831 .controller
832 .get()
833 .ok_or(Error::Type("controller is not set".to_string()))?;
834
835 if let Some(finish_promise) = controller.get_finish_promise() {
837 return Ok(finish_promise);
838 }
839
840 let writable = self
842 .writable
843 .get()
844 .ok_or(Error::Type("writable stream is not set".to_string()))?;
845
846 controller.set_finish_promise(Promise::new(global, can_gc));
848
849 let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
851
852 controller.clear_algorithms();
854
855 let handler = PromiseNativeHandler::new(
857 global,
858 Some(Box::new(SourceCancelPromiseFulfillment {
859 writeable: Dom::from_ref(&writable),
860 controller: Dom::from_ref(&controller),
861 stream: Dom::from_ref(self),
862 reason: Heap::boxed(reason.get()),
863 })),
864 Some(Box::new(SourceCancelPromiseRejection {
865 writeable: Dom::from_ref(&writable),
866 controller: Dom::from_ref(&controller),
867 stream: Dom::from_ref(self),
868 })),
869 can_gc,
870 );
871
872 let finish_promise = controller
874 .get_finish_promise()
875 .expect("finish promise is not set");
876 let realm = enter_realm(global);
877 let comp = InRealm::Entered(&realm);
878 cancel_promise.append_native_handler(&handler, comp, can_gc);
879 Ok(finish_promise)
880 }
881
882 pub(crate) fn transform_stream_default_source_pull(
884 &self,
885 global: &GlobalScope,
886 can_gc: CanGc,
887 ) -> Fallible<Rc<Promise>> {
888 assert!(self.backpressure.get());
890
891 assert!(self.backpressure_change_promise.borrow().is_some());
893
894 self.set_backpressure(global, false, can_gc);
896
897 Ok(self
899 .backpressure_change_promise
900 .borrow()
901 .clone()
902 .expect("Promise must be some by now."))
903 }
904
905 pub(crate) fn error_writable_and_unblock_write(
907 &self,
908 cx: SafeJSContext,
909 global: &GlobalScope,
910 error: SafeHandleValue,
911 can_gc: CanGc,
912 ) {
913 self.get_controller().clear_algorithms();
915
916 self.get_writable()
918 .get_default_controller()
919 .error_if_needed(cx, error, global, can_gc);
920
921 self.unblock_write(global, can_gc)
923 }
924
925 pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
927 if self.backpressure.get() {
929 self.set_backpressure(global, false, can_gc);
930 }
931 }
932
933 pub(crate) fn error(
935 &self,
936 cx: SafeJSContext,
937 global: &GlobalScope,
938 error: SafeHandleValue,
939 can_gc: CanGc,
940 ) {
941 self.get_readable()
943 .get_default_controller()
944 .error(error, can_gc);
945
946 self.error_writable_and_unblock_write(cx, global, error, can_gc);
948 }
949}
950
951#[allow(non_snake_case)]
952impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
953 #[allow(unsafe_code)]
955 fn Constructor(
956 cx: SafeJSContext,
957 global: &GlobalScope,
958 proto: Option<SafeHandleObject>,
959 can_gc: CanGc,
960 transformer: Option<*mut JSObject>,
961 writable_strategy: &QueuingStrategy,
962 readable_strategy: &QueuingStrategy,
963 ) -> Fallible<DomRoot<TransformStream>> {
964 rooted!(in(*cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
966
967 let transformer_dict = if !transformer_obj.is_null() {
970 rooted!(in(*cx) let obj_val = ObjectValue(transformer_obj.get()));
971 match Transformer::new(cx, obj_val.handle()) {
972 Ok(ConversionResult::Success(val)) => val,
973 Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
974 _ => {
975 return Err(Error::JSFailed);
976 },
977 }
978 } else {
979 Transformer::empty()
980 };
981
982 if !transformer_dict.readableType.handle().is_undefined() {
984 return Err(Error::Range("readableType is set".to_string()));
985 }
986
987 if !transformer_dict.writableType.handle().is_undefined() {
989 return Err(Error::Range("writableType is set".to_string()));
990 }
991
992 let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
994
995 let readable_size_algorithm = extract_size_algorithm(readable_strategy, can_gc);
997
998 let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
1000
1001 let writable_size_algorithm = extract_size_algorithm(writable_strategy, can_gc);
1003
1004 let start_promise = Promise::new(global, can_gc);
1006
1007 let stream = TransformStream::new_with_proto(global, proto, can_gc);
1010 stream.initialize(
1011 cx,
1012 global,
1013 start_promise.clone(),
1014 writable_high_water_mark,
1015 writable_size_algorithm,
1016 readable_high_water_mark,
1017 readable_size_algorithm,
1018 can_gc,
1019 )?;
1020
1021 stream.set_up_transform_stream_default_controller_from_transformer(
1023 global,
1024 transformer_obj.handle(),
1025 &transformer_dict,
1026 can_gc,
1027 );
1028
1029 if let Some(start) = &transformer_dict.start {
1033 rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
1034 rooted!(in(*cx) let mut result: JSVal);
1035 rooted!(in(*cx) let this_object = transformer_obj.get());
1036 start.Call_(
1037 &this_object.handle(),
1038 &stream.get_controller(),
1039 result.handle_mut(),
1040 ExceptionHandling::Rethrow,
1041 can_gc,
1042 )?;
1043 let is_promise = unsafe {
1044 if result.is_object() {
1045 result_object.set(result.to_object());
1046 IsPromiseObject(result_object.handle().into_handle())
1047 } else {
1048 false
1049 }
1050 };
1051 let promise = if is_promise {
1052 Promise::new_with_js_promise(result_object.handle(), cx)
1053 } else {
1054 Promise::new_resolved(global, cx, result.get(), can_gc)
1055 };
1056 start_promise.resolve_native(&promise, can_gc);
1057 } else {
1058 start_promise.resolve_native(&(), can_gc);
1060 };
1061
1062 Ok(stream)
1063 }
1064
1065 fn Readable(&self) -> DomRoot<ReadableStream> {
1067 self.readable.get().expect("readable stream is not set")
1069 }
1070
1071 fn Writable(&self) -> DomRoot<WritableStream> {
1073 self.writable.get().expect("writable stream is not set")
1075 }
1076}
1077
1078impl Transferable for TransformStream {
1080 type Index = MessagePortIndex;
1081 type Data = TransformStreamData;
1082
1083 fn transfer(&self) -> Fallible<(MessagePortId, TransformStreamData)> {
1085 let global = self.global();
1086 let realm = enter_realm(&*global);
1087 let comp = InRealm::Entered(&realm);
1088 let cx = GlobalScope::get_cx();
1089 let can_gc = CanGc::note();
1090
1091 let readable = self.get_readable();
1093
1094 let writable = self.get_writable();
1096
1097 if readable.is_locked() || writable.is_locked() {
1102 return Err(Error::DataClone(None));
1103 }
1104
1105 let port1 = MessagePort::new(&global, can_gc);
1107 global.track_message_port(&port1, None);
1108 let port1_peer = MessagePort::new(&global, can_gc);
1109 global.track_message_port(&port1_peer, None);
1110 global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1111
1112 let proxy_readable = ReadableStream::new_with_proto(&global, None, can_gc);
1113 proxy_readable.setup_cross_realm_transform_readable(cx, &port1, can_gc);
1114 proxy_readable
1115 .pipe_to(
1116 cx, &global, &writable, false, false, false, None, comp, can_gc,
1117 )
1118 .set_promise_is_handled();
1119
1120 let port2 = MessagePort::new(&global, can_gc);
1122 global.track_message_port(&port2, None);
1123 let port2_peer = MessagePort::new(&global, can_gc);
1124 global.track_message_port(&port2_peer, None);
1125 global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1126
1127 let proxy_writable = WritableStream::new_with_proto(&global, None, can_gc);
1128 proxy_writable.setup_cross_realm_transform_writable(cx, &port2, can_gc);
1129
1130 readable
1132 .pipe_to(
1133 cx,
1134 &global,
1135 &proxy_writable,
1136 false,
1137 false,
1138 false,
1139 None,
1140 comp,
1141 can_gc,
1142 )
1143 .set_promise_is_handled();
1144
1145 Ok((
1150 *port1_peer.message_port_id(),
1151 TransformStreamData {
1152 readable: port1_peer.transfer()?,
1153 writable: port2_peer.transfer()?,
1154 },
1155 ))
1156 }
1157
1158 fn transfer_receive(
1160 owner: &GlobalScope,
1161 _id: MessagePortId,
1162 data: TransformStreamData,
1163 ) -> Result<DomRoot<Self>, ()> {
1164 let can_gc = CanGc::note();
1165 let cx = GlobalScope::get_cx();
1166
1167 let port1 = MessagePort::transfer_receive(owner, data.readable.0, data.readable.1)?;
1168 let port2 = MessagePort::transfer_receive(owner, data.writable.0, data.writable.1)?;
1169
1170 let proxy_readable = ReadableStream::new_with_proto(owner, None, can_gc);
1174 proxy_readable.setup_cross_realm_transform_readable(cx, &port2, can_gc);
1175
1176 let proxy_writable = WritableStream::new_with_proto(owner, None, can_gc);
1180 proxy_writable.setup_cross_realm_transform_writable(cx, &port1, can_gc);
1181
1182 let stream = TransformStream::new_with_proto(owner, None, can_gc);
1188 stream.readable.set(Some(&proxy_readable));
1189 stream.writable.set(Some(&proxy_writable));
1190
1191 Ok(stream)
1192 }
1193
1194 fn serialized_storage<'a>(
1195 data: StructuredData<'a, '_>,
1196 ) -> &'a mut Option<HashMap<MessagePortId, Self::Data>> {
1197 match data {
1198 StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1199 StructuredData::Writer(w) => &mut w.transform_streams_port,
1200 }
1201 }
1202}