1use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::context::JSContext;
11use js::jsapi::{Heap, IsPromiseObject, JSObject};
12use js::jsval::{JSVal, ObjectValue, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
15use rustc_hash::FxHashMap;
16use script_bindings::callback::ExceptionHandling;
17use script_bindings::cell::DomRefCell;
18use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto_and_cx};
19use servo_base::id::{MessagePortId, MessagePortIndex};
20use servo_constellation_traits::TransformStreamData;
21
22use super::readablestream::CrossRealmTransformReadable;
23use super::writablestream::CrossRealmTransformWritable;
24use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
25 QueuingStrategy, QueuingStrategySize,
26};
27use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
28use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
29use crate::dom::bindings::conversions::ConversionResult;
30use crate::dom::bindings::error::{Error, Fallible};
31use crate::dom::bindings::reflector::DomGlobal;
32use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
33use crate::dom::bindings::structuredclone::StructuredData;
34use crate::dom::bindings::transferable::Transferable;
35use crate::dom::globalscope::GlobalScope;
36use crate::dom::messageport::MessagePort;
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::Callback;
39use crate::dom::readablestream::{ReadableStream, create_readable_stream};
40use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
41use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
42use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
43use crate::dom::stream::writablestream::create_writable_stream;
44use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
45use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
46use crate::realms::enter_auto_realm;
47
48impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}
49
50#[derive(JSTraceable, MallocSizeOf)]
53#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
54struct TransformBackPressureChangePromiseFulfillment {
55 #[conditional_malloc_size_of]
57 result_promise: Rc<Promise>,
58
59 #[ignore_malloc_size_of = "mozjs"]
60 chunk: Box<Heap<JSVal>>,
61
62 writable: Dom<WritableStream>,
64
65 controller: Dom<TransformStreamDefaultController>,
66}
67
68impl Callback for TransformBackPressureChangePromiseFulfillment {
69 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
71 if self.writable.is_erroring() {
75 rooted!(&in(cx) let mut error = UndefinedValue());
76 self.writable.get_stored_error(error.handle_mut());
77 self.result_promise.reject(cx, error.handle());
78 return;
79 }
80
81 assert!(self.writable.is_writable());
83
84 rooted!(&in(cx) let mut chunk = UndefinedValue());
86 chunk.set(self.chunk.get());
87 let transform_result = self
88 .controller
89 .transform_stream_default_controller_perform_transform(
90 cx,
91 &self.writable.global(),
92 chunk.handle(),
93 )
94 .expect("perform transform failed");
95
96 let handler = PromiseNativeHandler::new(
99 cx,
100 &self.writable.global(),
101 Some(Box::new(PerformTransformFulfillment {
102 result_promise: self.result_promise.clone(),
103 })),
104 Some(Box::new(PerformTransformRejection {
105 result_promise: self.result_promise.clone(),
106 })),
107 );
108
109 let mut realm = enter_auto_realm(cx, &*self.writable.global());
110 let realm = &mut realm.current_realm();
111 transform_result.append_native_handler(realm, &handler);
112 }
113}
114
/// Fulfillment reaction for the promise returned by the controller's
/// perform-transform step; installed by
/// `TransformBackPressureChangePromiseFulfillment::callback`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PerformTransformFulfillment {
    /// Promise that is resolved when this handler runs.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}

impl Callback for PerformTransformFulfillment {
    /// Resolves `result_promise` with `()`; the fulfillment value is ignored.
    fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
        self.result_promise.resolve_native(cx, &());
    }
}
130
/// Rejection reaction for the promise returned by the controller's
/// perform-transform step; installed by
/// `TransformBackPressureChangePromiseFulfillment::callback`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PerformTransformRejection {
    /// Promise that is rejected when this handler runs.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}

impl Callback for PerformTransformRejection {
    /// Rejects `result_promise` with the rejection value `v`, unchanged.
    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
        self.result_promise.reject(cx, v);
    }
}
146
/// Rejection reaction for the backpressure-change promise; installed by
/// `transform_stream_default_sink_write_algorithm` alongside
/// `TransformBackPressureChangePromiseFulfillment`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct BackpressureChangeRejection {
    /// Promise returned from the write algorithm; rejected when this runs.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}

impl Callback for BackpressureChangeRejection {
    /// Rejects `result_promise` with `reason`, unchanged.
    fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
        self.result_promise.reject(cx, reason);
    }
}
161
162impl js::gc::Rootable for CancelPromiseFulfillment {}
163
164#[derive(JSTraceable, MallocSizeOf)]
167#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
168struct CancelPromiseFulfillment {
169 readable: Dom<ReadableStream>,
170 controller: Dom<TransformStreamDefaultController>,
171 #[ignore_malloc_size_of = "mozjs"]
172 reason: Box<Heap<JSVal>>,
173}
174
175impl Callback for CancelPromiseFulfillment {
176 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
178 if self.readable.is_errored() {
180 rooted!(&in(cx) let mut error = UndefinedValue());
181 self.readable.get_stored_error(error.handle_mut());
182 self.controller
183 .get_finish_promise()
184 .expect("finish promise is not set")
185 .reject_native(cx, &error.handle());
186 } else {
187 rooted!(&in(cx) let mut reason = UndefinedValue());
190 reason.set(self.reason.get());
191 self.readable
192 .get_default_controller()
193 .error(cx, reason.handle());
194
195 self.controller
197 .get_finish_promise()
198 .expect("finish promise is not set")
199 .resolve_native(cx, &());
200 }
201 }
202}
203
204impl js::gc::Rootable for CancelPromiseRejection {}
205
206#[derive(JSTraceable, MallocSizeOf)]
209#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
210struct CancelPromiseRejection {
211 readable: Dom<ReadableStream>,
212 controller: Dom<TransformStreamDefaultController>,
213}
214
215impl Callback for CancelPromiseRejection {
216 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
218 self.readable.get_default_controller().error(cx, v);
220
221 self.controller
223 .get_finish_promise()
224 .expect("finish promise is not set")
225 .reject(cx, v);
226 }
227}
228
229impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
230
231#[derive(JSTraceable, MallocSizeOf)]
234#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
235struct SourceCancelPromiseFulfillment {
236 writeable: Dom<WritableStream>,
237 controller: Dom<TransformStreamDefaultController>,
238 stream: Dom<TransformStream>,
239 #[ignore_malloc_size_of = "mozjs"]
240 reason: Box<Heap<JSVal>>,
241}
242
243impl Callback for SourceCancelPromiseFulfillment {
244 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
246 let finish_promise = self
248 .controller
249 .get_finish_promise()
250 .expect("finish promise is not set");
251
252 let global = &self.writeable.global();
253 if self.writeable.is_errored() {
255 rooted!(&in(cx) let mut error = UndefinedValue());
256 self.writeable.get_stored_error(error.handle_mut());
257 finish_promise.reject(cx, error.handle());
258 } else {
259 rooted!(&in(cx) let mut reason = UndefinedValue());
262 reason.set(self.reason.get());
263 self.writeable
264 .get_default_controller()
265 .error_if_needed(cx, reason.handle(), global);
266
267 self.stream.unblock_write(cx, global);
269
270 finish_promise.resolve_native(cx, &());
272 }
273 }
274}
275
276impl js::gc::Rootable for SourceCancelPromiseRejection {}
277
278#[derive(JSTraceable, MallocSizeOf)]
281#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
282struct SourceCancelPromiseRejection {
283 writeable: Dom<WritableStream>,
284 controller: Dom<TransformStreamDefaultController>,
285 stream: Dom<TransformStream>,
286}
287
288impl Callback for SourceCancelPromiseRejection {
289 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
291 let global = &self.writeable.global();
293
294 self.writeable
295 .get_default_controller()
296 .error_if_needed(cx, v, global);
297
298 self.stream.unblock_write(cx, global);
300
301 self.controller
303 .get_finish_promise()
304 .expect("finish promise is not set")
305 .reject(cx, v);
306 }
307}
308
309impl js::gc::Rootable for FlushPromiseFulfillment {}
310
311#[derive(JSTraceable, MallocSizeOf)]
314#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
315struct FlushPromiseFulfillment {
316 readable: Dom<ReadableStream>,
317 controller: Dom<TransformStreamDefaultController>,
318}
319
320impl Callback for FlushPromiseFulfillment {
321 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
323 let finish_promise = self
325 .controller
326 .get_finish_promise()
327 .expect("finish promise is not set");
328
329 if self.readable.is_errored() {
331 rooted!(&in(cx) let mut error = UndefinedValue());
332 self.readable.get_stored_error(error.handle_mut());
333 finish_promise.reject(cx, error.handle());
334 } else {
335 self.readable.get_default_controller().close(cx);
338
339 finish_promise.resolve_native(cx, &());
341 }
342 }
343}
344
345impl js::gc::Rootable for FlushPromiseRejection {}
346#[derive(JSTraceable, MallocSizeOf)]
350#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
351struct FlushPromiseRejection {
352 readable: Dom<ReadableStream>,
353 controller: Dom<TransformStreamDefaultController>,
354}
355
356impl Callback for FlushPromiseRejection {
357 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
359 self.readable.get_default_controller().error(cx, v);
362
363 self.controller
365 .get_finish_promise()
366 .expect("finish promise is not set")
367 .reject(cx, v);
368 }
369}
370
impl js::gc::Rootable for CrossRealmTransform {}

/// Either side of a cross-realm transform: wraps the readable-side or the
/// writable-side state defined in the sibling `readablestream` /
/// `writablestream` modules.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum CrossRealmTransform {
    /// Readable-side cross-realm transform state.
    Readable(CrossRealmTransformReadable),
    /// Writable-side cross-realm transform state.
    Writable(CrossRealmTransformWritable),
}
383
/// DOM object backing the `TransformStream` interface: a writable side and a
/// readable side connected through a `TransformStreamDefaultController`.
#[dom_struct]
pub struct TransformStream {
    reflector_: Reflector,

    /// Current backpressure flag; starts as `false` and is flipped only via
    /// `set_backpressure`.
    backpressure: Cell<bool>,

    /// Promise replaced (and the previous one resolved) on every
    /// `set_backpressure` call; `None` until the first call.
    #[conditional_malloc_size_of]
    backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,

    /// The controller; `None` until
    /// `set_up_transform_stream_default_controller` runs.
    controller: MutNullableDom<TransformStreamDefaultController>,

    /// Initialized to `false`; not read or written by the code visible here.
    detached: Cell<bool>,

    /// Readable side; set by `initialize` or `transfer_receive`.
    readable: MutNullableDom<ReadableStream>,

    /// Writable side; set by `initialize` or `transfer_receive`.
    writable: MutNullableDom<WritableStream>,
}
408
409impl TransformStream {
410 fn new_inherited() -> TransformStream {
412 TransformStream {
413 reflector_: Reflector::new(),
414 backpressure: Default::default(),
415 backpressure_change_promise: DomRefCell::new(None),
416 controller: MutNullableDom::new(None),
417 detached: Cell::new(false),
418 readable: MutNullableDom::new(None),
419 writable: MutNullableDom::new(None),
420 }
421 }
422
    /// Creates a blank `TransformStream` and reflects it into `global`,
    /// using `proto` as the prototype when one is supplied.
    ///
    /// The returned stream has no controller, readable or writable yet.
    pub(crate) fn new_with_proto(
        cx: &mut JSContext,
        global: &GlobalScope,
        proto: Option<SafeHandleObject>,
    ) -> DomRoot<TransformStream> {
        reflect_dom_object_with_proto_and_cx(
            Box::new(TransformStream::new_inherited()),
            global,
            proto,
            cx,
        )
    }
435
436 pub(crate) fn set_up(
439 &self,
440 cx: &mut JSContext,
441 global: &GlobalScope,
442 transformer_type: TransformerType,
443 ) -> Fallible<()> {
444 let writable_high_water_mark = 1.0;
446
447 let writable_size_algorithm = extract_size_algorithm(cx, &Default::default());
449
450 let readable_high_water_mark = 0.0;
452
453 let readable_size_algorithm = extract_size_algorithm(cx, &Default::default());
455
456 let start_promise = Promise::new_resolved(cx, global, ());
463
464 self.initialize(
468 cx,
469 global,
470 start_promise,
471 writable_high_water_mark,
472 writable_size_algorithm,
473 readable_high_water_mark,
474 readable_size_algorithm,
475 )?;
476
477 let controller = TransformStreamDefaultController::new(cx, global, transformer_type);
479
480 self.set_up_transform_stream_default_controller(&controller);
484
485 Ok(())
486 }
487
    /// Returns the stream's controller.
    ///
    /// # Panics
    /// Panics if no controller has been set yet.
    pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
        self.controller.get().expect("controller is not set")
    }
491
    /// Returns the writable side of the stream.
    ///
    /// # Panics
    /// Panics if the writable side has not been set yet.
    pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
        self.writable.get().expect("writable stream is not set")
    }
495
    /// Returns the readable side of the stream.
    ///
    /// # Panics
    /// Panics if the readable side has not been set yet.
    pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
        self.readable.get().expect("readable stream is not set")
    }
499
    /// Returns the current value of the backpressure flag.
    pub(crate) fn get_backpressure(&self) -> bool {
        self.backpressure.get()
    }
503
    /// Creates and stores the writable and readable sides of this stream,
    /// turns backpressure on, and clears the controller slot.
    ///
    /// Both sides receive a handle to this stream together with
    /// `start_promise`; the writable side is created first.
    ///
    /// # Errors
    /// Propagates any error from `create_writable_stream`.
    ///
    /// # Panics
    /// Panics (via the assertion in `set_backpressure`) if backpressure is
    /// already `true` when this is called.
    #[expect(clippy::too_many_arguments)]
    fn initialize(
        &self,
        cx: &mut JSContext,
        global: &GlobalScope,
        start_promise: Rc<Promise>,
        writable_high_water_mark: f64,
        writable_size_algorithm: Rc<QueuingStrategySize>,
        readable_high_water_mark: f64,
        readable_size_algorithm: Rc<QueuingStrategySize>,
    ) -> Fallible<()> {
        // Writable side: its underlying sink is this transform stream.
        let writable = create_writable_stream(
            cx,
            global,
            writable_high_water_mark,
            writable_size_algorithm,
            UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
        )?;
        self.writable.set(Some(&writable));

        // Readable side: its underlying source is this transform stream.
        let readable = create_readable_stream(
            cx,
            global,
            UnderlyingSourceType::Transform(self, start_promise),
            Some(readable_size_algorithm),
            Some(readable_high_water_mark),
        );
        self.readable.set(Some(&readable));

        // The flag starts out `false`, so this flips it to `true` and
        // installs the first backpressure-change promise.
        self.set_backpressure(cx, global, true);

        // The controller is attached later by the set-up helpers.
        self.controller.set(None);

        Ok(())
    }
567
568 pub(crate) fn set_backpressure(
570 &self,
571 cx: &mut JSContext,
572 global: &GlobalScope,
573 backpressure: bool,
574 ) {
575 assert!(self.backpressure.get() != backpressure);
577
578 if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
581 promise.resolve_native(cx, &());
582 }
583
584 *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(cx, global));
586
587 self.backpressure.set(backpressure);
589 }
590
    /// Links `controller` and this stream to each other.
    ///
    /// # Panics
    /// Panics if this stream already has a controller.
    fn set_up_transform_stream_default_controller(
        &self,
        controller: &TransformStreamDefaultController,
    ) {
        assert!(self.controller.get().is_none());

        // Point the controller back at this stream...
        controller.set_stream(self);

        // ...and record the controller on the stream.
        self.controller.set(Some(controller));

    }
613
614 fn set_up_transform_stream_default_controller_from_transformer(
616 &self,
617 cx: &mut JSContext,
618 global: &GlobalScope,
619 transformer_obj: SafeHandleObject,
620 transformer: &Transformer,
621 ) {
622 let transformer_type = TransformerType::new_from_js_transformer(transformer);
624 let controller = TransformStreamDefaultController::new(cx, global, transformer_type);
625
626 controller.set_transform_obj(transformer_obj);
649
650 self.set_up_transform_stream_default_controller(&controller);
653 }
654
655 pub(crate) fn transform_stream_default_sink_write_algorithm(
657 &self,
658 cx: &mut JSContext,
659 global: &GlobalScope,
660 chunk: SafeHandleValue,
661 ) -> Fallible<Rc<Promise>> {
662 assert!(self.writable.get().is_some());
664
665 let controller = self.controller.get().expect("controller is not set");
667
668 if self.backpressure.get() {
670 let backpressure_change_promise = self.backpressure_change_promise.borrow();
672
673 assert!(backpressure_change_promise.is_some());
675
676 let result_promise = Promise::new(cx, global);
678 rooted!(&in(cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
679 controller: Dom::from_ref(&controller),
680 writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
681 chunk: Heap::boxed(chunk.get()),
682 result_promise: result_promise.clone(),
683 }));
684
685 let handler = PromiseNativeHandler::new(
686 cx,
687 global,
688 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
689 Some(Box::new(BackpressureChangeRejection {
690 result_promise: result_promise.clone(),
691 })),
692 );
693 let mut realm = enter_auto_realm(cx, global);
694 let realm = &mut realm.current_realm();
695 backpressure_change_promise
696 .as_ref()
697 .expect("Promise must be some by now.")
698 .append_native_handler(realm, &handler);
699
700 return Ok(result_promise);
701 }
702
703 controller.transform_stream_default_controller_perform_transform(cx, global, chunk)
705 }
706
707 pub(crate) fn transform_stream_default_sink_abort_algorithm(
709 &self,
710 cx: &mut JSContext,
711 global: &GlobalScope,
712 reason: SafeHandleValue,
713 ) -> Fallible<Rc<Promise>> {
714 let controller = self.controller.get().expect("controller is not set");
716
717 if let Some(finish_promise) = controller.get_finish_promise() {
719 return Ok(finish_promise);
720 }
721
722 let readable = self.readable.get().expect("readable stream is not set");
724
725 controller.set_finish_promise(Promise::new(cx, global));
727
728 let cancel_promise = controller.perform_cancel(cx, global, reason)?;
730
731 controller.clear_algorithms();
733
734 let handler = PromiseNativeHandler::new(
736 cx,
737 global,
738 Some(Box::new(CancelPromiseFulfillment {
739 readable: Dom::from_ref(&readable),
740 controller: Dom::from_ref(&controller),
741 reason: Heap::boxed(reason.get()),
742 })),
743 Some(Box::new(CancelPromiseRejection {
744 readable: Dom::from_ref(&readable),
745 controller: Dom::from_ref(&controller),
746 })),
747 );
748 let mut realm = enter_auto_realm(cx, global);
749 let cx = &mut realm.current_realm();
750 cancel_promise.append_native_handler(cx, &handler);
751
752 let finish_promise = controller
754 .get_finish_promise()
755 .expect("finish promise is not set");
756 Ok(finish_promise)
757 }
758
759 pub(crate) fn transform_stream_default_sink_close_algorithm(
761 &self,
762 cx: &mut JSContext,
763 global: &GlobalScope,
764 ) -> Fallible<Rc<Promise>> {
765 let controller = self
767 .controller
768 .get()
769 .ok_or(Error::Type(c"controller is not set".to_owned()))?;
770
771 if let Some(finish_promise) = controller.get_finish_promise() {
773 return Ok(finish_promise);
774 }
775
776 let readable = self
778 .readable
779 .get()
780 .ok_or(Error::Type(c"readable stream is not set".to_owned()))?;
781
782 controller.set_finish_promise(Promise::new(cx, global));
784
785 let flush_promise = controller.perform_flush(cx, global)?;
787
788 controller.clear_algorithms();
790
791 let handler = PromiseNativeHandler::new(
793 cx,
794 global,
795 Some(Box::new(FlushPromiseFulfillment {
796 readable: Dom::from_ref(&readable),
797 controller: Dom::from_ref(&controller),
798 })),
799 Some(Box::new(FlushPromiseRejection {
800 readable: Dom::from_ref(&readable),
801 controller: Dom::from_ref(&controller),
802 })),
803 );
804
805 let mut realm = enter_auto_realm(cx, global);
806 let realm = &mut realm.current_realm();
807 flush_promise.append_native_handler(realm, &handler);
808 let finish_promise = controller
810 .get_finish_promise()
811 .expect("finish promise is not set");
812 Ok(finish_promise)
813 }
814
815 pub(crate) fn transform_stream_default_source_cancel(
817 &self,
818 cx: &mut JSContext,
819 global: &GlobalScope,
820 reason: SafeHandleValue,
821 ) -> Fallible<Rc<Promise>> {
822 let controller = self
824 .controller
825 .get()
826 .ok_or(Error::Type(c"controller is not set".to_owned()))?;
827
828 if let Some(finish_promise) = controller.get_finish_promise() {
830 return Ok(finish_promise);
831 }
832
833 let writable = self
835 .writable
836 .get()
837 .ok_or(Error::Type(c"writable stream is not set".to_owned()))?;
838
839 controller.set_finish_promise(Promise::new(cx, global));
841
842 let cancel_promise = controller.perform_cancel(cx, global, reason)?;
844
845 controller.clear_algorithms();
847
848 let handler = PromiseNativeHandler::new(
850 cx,
851 global,
852 Some(Box::new(SourceCancelPromiseFulfillment {
853 writeable: Dom::from_ref(&writable),
854 controller: Dom::from_ref(&controller),
855 stream: Dom::from_ref(self),
856 reason: Heap::boxed(reason.get()),
857 })),
858 Some(Box::new(SourceCancelPromiseRejection {
859 writeable: Dom::from_ref(&writable),
860 controller: Dom::from_ref(&controller),
861 stream: Dom::from_ref(self),
862 })),
863 );
864
865 let finish_promise = controller
867 .get_finish_promise()
868 .expect("finish promise is not set");
869 let mut realm = enter_auto_realm(cx, global);
870 let cx = &mut realm.current_realm();
871 cancel_promise.append_native_handler(cx, &handler);
872 Ok(finish_promise)
873 }
874
875 pub(crate) fn transform_stream_default_source_pull(
877 &self,
878 cx: &mut JSContext,
879 global: &GlobalScope,
880 ) -> Fallible<Rc<Promise>> {
881 assert!(self.backpressure.get());
883
884 assert!(self.backpressure_change_promise.borrow().is_some());
886
887 self.set_backpressure(cx, global, false);
889
890 Ok(self
892 .backpressure_change_promise
893 .borrow()
894 .clone()
895 .expect("Promise must be some by now."))
896 }
897
898 pub(crate) fn error_writable_and_unblock_write(
900 &self,
901 cx: &mut JSContext,
902 global: &GlobalScope,
903 error: SafeHandleValue,
904 ) {
905 self.get_controller().clear_algorithms();
907
908 self.get_writable()
910 .get_default_controller()
911 .error_if_needed(cx, error, global);
912
913 self.unblock_write(cx, global)
915 }
916
    /// Turns backpressure off if it is currently on; otherwise does nothing.
    ///
    /// Turning it off resolves the pending backpressure-change promise (see
    /// `set_backpressure`), which is what lets a deferred write continue.
    pub(crate) fn unblock_write(&self, cx: &mut JSContext, global: &GlobalScope) {
        if self.backpressure.get() {
            self.set_backpressure(cx, global, false);
        }
    }
924
925 pub(crate) fn error(&self, cx: &mut JSContext, global: &GlobalScope, error: SafeHandleValue) {
927 self.get_readable()
929 .get_default_controller()
930 .error(cx, error);
931
932 self.error_writable_and_unblock_write(cx, global, error);
934 }
935}
936
937impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
    /// Script-visible constructor.
    ///
    /// Converts the optional `transformer` object into a `Transformer`
    /// dictionary, validates it, builds the stream from the two queuing
    /// strategies, attaches a controller, and finally invokes the
    /// transformer's `start` callback (if any), resolving the start promise
    /// with its result.
    ///
    /// # Errors
    /// * `Error::Type` if the transformer fails dictionary conversion with a
    ///   message, `Error::JSFailed` for any other conversion outcome.
    /// * `Error::Range` if `readableType` or `writableType` is not undefined.
    /// * Errors propagated from `extract_high_water_mark`, `initialize`, or
    ///   the `start` callback (called with `ExceptionHandling::Rethrow`).
    #[expect(unsafe_code)]
    fn Constructor(
        cx: &mut JSContext,
        global: &GlobalScope,
        proto: Option<SafeHandleObject>,
        transformer: Option<*mut JSObject>,
        writable_strategy: &QueuingStrategy,
        readable_strategy: &QueuingStrategy,
    ) -> Fallible<DomRoot<TransformStream>> {
        // A missing transformer is represented by a null object.
        rooted!(&in(cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));

        // Convert the object to the dictionary, or use an empty dictionary
        // when no transformer was supplied.
        let transformer_dict = if !transformer_obj.is_null() {
            rooted!(&in(cx) let obj_val = ObjectValue(transformer_obj.get()));
            match Transformer::new(cx, obj_val.handle()) {
                Ok(ConversionResult::Success(val)) => val,
                Ok(ConversionResult::Failure(error)) => {
                    return Err(Error::Type(error.into_owned()));
                },
                _ => {
                    return Err(Error::JSFailed);
                },
            }
        } else {
            Transformer::empty()
        };

        // `readableType` / `writableType` must both be left undefined.
        if !transformer_dict.readableType.handle().is_undefined() {
            return Err(Error::Range(c"readableType is set".to_owned()));
        }

        if !transformer_dict.writableType.handle().is_undefined() {
            return Err(Error::Range(c"writableType is set".to_owned()));
        }

        // Readable side defaults to a high-water mark of 0, writable to 1.
        let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;

        let readable_size_algorithm = extract_size_algorithm(cx, readable_strategy);

        let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;

        let writable_size_algorithm = extract_size_algorithm(cx, writable_strategy);

        // Pending until the transformer's `start` hook has been handled below.
        let start_promise = Promise::new(cx, global);

        let stream = TransformStream::new_with_proto(cx, global, proto);
        stream.initialize(
            cx,
            global,
            start_promise.clone(),
            writable_high_water_mark,
            writable_size_algorithm,
            readable_high_water_mark,
            readable_size_algorithm,
        )?;

        stream.set_up_transform_stream_default_controller_from_transformer(
            cx,
            global,
            transformer_obj.handle(),
            &transformer_dict,
        );

        if let Some(start) = &transformer_dict.start {
            // Call `start` with the transformer object as `this` and the
            // controller as its argument.
            rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
            rooted!(&in(cx) let mut result: JSVal);
            rooted!(&in(cx) let this_object = transformer_obj.get());
            start.Call_(
                cx,
                &this_object.handle(),
                &stream.get_controller(),
                result.handle_mut(),
                ExceptionHandling::Rethrow,
            )?;
            // SAFETY: `to_object` is only called after `is_object` returned
            // true, and `IsPromiseObject` receives a handle to the rooted
            // `result_object`.
            let is_promise = unsafe {
                if result.is_object() {
                    result_object.set(result.to_object());
                    IsPromiseObject(result_object.handle().into_handle())
                } else {
                    false
                }
            };
            // Wrap a returned promise directly; wrap anything else in an
            // already-resolved promise.
            let promise = if is_promise {
                Promise::new_with_js_promise(cx, result_object.handle())
            } else {
                Promise::new_resolved(cx, global, result.get())
            };
            start_promise.resolve_native(cx, &promise);
        } else {
            // No `start` hook: start completes immediately.
            start_promise.resolve_native(cx, &());
        };

        Ok(stream)
    }
1049
1050 fn Readable(&self) -> DomRoot<ReadableStream> {
1052 self.readable.get().expect("readable stream is not set")
1054 }
1055
1056 fn Writable(&self) -> DomRoot<WritableStream> {
1058 self.writable.get().expect("writable stream is not set")
1060 }
1061}
1062
1063impl Transferable for TransformStream {
    /// Transferred transform streams are keyed by message-port index.
    type Index = MessagePortIndex;
    /// Serialized form: the transferred readable-side and writable-side ports.
    type Data = TransformStreamData;
1067
    /// Transfers this stream by bridging both of its sides over two freshly
    /// entangled message-port pairs.
    ///
    /// A proxy readable on `port1` is piped into this stream's writable, and
    /// this stream's readable is piped into a proxy writable on `port2`. The
    /// two peer ports are then transferred and returned as the stream's data,
    /// keyed by the id of `port1_peer`.
    ///
    /// # Errors
    /// Returns `Error::DataClone` if either side is locked, and propagates
    /// any error from transferring the peer ports.
    ///
    /// # Panics
    /// Panics if the readable or writable side is not set.
    fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, TransformStreamData)> {
        // Everything below runs inside this stream's global's realm.
        let global = self.global();
        let mut realm = enter_auto_realm(cx, &*global);
        let mut realm = realm.current_realm();
        let cx = &mut realm;

        let readable = self.get_readable();

        let writable = self.get_writable();

        // A locked side cannot be transferred.
        if readable.is_locked() || writable.is_locked() {
            return Err(Error::DataClone(None));
        }

        // First port pair: data arriving on `port1` feeds our writable.
        let port1 = MessagePort::new(cx, &global);
        global.track_message_port(&port1, None);
        let port1_peer = MessagePort::new(cx, &global);
        global.track_message_port(&port1_peer, None);
        global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());

        let proxy_readable = ReadableStream::new_with_proto(cx, &global, None);
        proxy_readable.setup_cross_realm_transform_readable(cx, &port1);
        proxy_readable
            .pipe_to(cx, &global, &writable, false, false, false, None)
            .set_promise_is_handled(cx);

        // Second port pair: our readable's output is sent out over `port2`.
        let port2 = MessagePort::new(cx, &global);
        global.track_message_port(&port2, None);
        let port2_peer = MessagePort::new(cx, &global);
        global.track_message_port(&port2_peer, None);
        global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());

        let proxy_writable = WritableStream::new_with_proto(cx, &global, None);
        proxy_writable.setup_cross_realm_transform_writable(cx, &port2);

        readable
            .pipe_to(cx, &global, &proxy_writable, false, false, false, None)
            .set_promise_is_handled(cx);

        // Hand the peer ends to the receiver; `transfer_receive` rebuilds the
        // stream from them.
        Ok((
            *port1_peer.message_port_id(),
            TransformStreamData {
                readable: port1_peer.transfer(cx)?,
                writable: port2_peer.transfer(cx)?,
            },
        ))
    }
1129
1130 fn transfer_receive(
1132 cx: &mut JSContext,
1133 owner: &GlobalScope,
1134 _id: MessagePortId,
1135 data: TransformStreamData,
1136 ) -> Result<DomRoot<Self>, ()> {
1137 let port1 = MessagePort::transfer_receive(cx, owner, data.readable.0, data.readable.1)?;
1138 let port2 = MessagePort::transfer_receive(cx, owner, data.writable.0, data.writable.1)?;
1139
1140 let proxy_readable = ReadableStream::new_with_proto(cx, owner, None);
1144 proxy_readable.setup_cross_realm_transform_readable(cx, &port2);
1145
1146 let proxy_writable = WritableStream::new_with_proto(cx, owner, None);
1150 proxy_writable.setup_cross_realm_transform_writable(cx, &port1);
1151
1152 let stream = TransformStream::new_with_proto(cx, owner, None);
1158 stream.readable.set(Some(&proxy_readable));
1159 stream.writable.set(Some(&proxy_writable));
1160
1161 Ok(stream)
1162 }
1163
    /// Selects the map that holds transferred transform-stream data:
    /// `transform_streams_port_impls` on a reader, `transform_streams_port`
    /// on a writer.
    fn serialized_storage<'a>(
        data: StructuredData<'a, '_>,
    ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
        match data {
            StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
            StructuredData::Writer(w) => &mut w.transform_streams_port,
        }
    }
1172}