1use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::context::JSContext;
11use js::jsapi::{Heap, IsPromiseObject, JSObject};
12use js::jsval::{JSVal, ObjectValue, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
15use rustc_hash::FxHashMap;
16use script_bindings::callback::ExceptionHandling;
17use script_bindings::cell::DomRefCell;
18use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
19use servo_base::id::{MessagePortId, MessagePortIndex};
20use servo_constellation_traits::TransformStreamData;
21
22use super::readablestream::CrossRealmTransformReadable;
23use super::writablestream::CrossRealmTransformWritable;
24use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
25 QueuingStrategy, QueuingStrategySize,
26};
27use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
28use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
29use crate::dom::bindings::conversions::ConversionResult;
30use crate::dom::bindings::error::{Error, Fallible};
31use crate::dom::bindings::reflector::DomGlobal;
32use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
33use crate::dom::bindings::structuredclone::StructuredData;
34use crate::dom::bindings::transferable::Transferable;
35use crate::dom::globalscope::GlobalScope;
36use crate::dom::messageport::MessagePort;
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::Callback;
39use crate::dom::readablestream::{ReadableStream, create_readable_stream};
40use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
41use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
42use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
43use crate::dom::stream::writablestream::create_writable_stream;
44use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
45use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
46use crate::realms::enter_auto_realm;
47use crate::script_runtime::CanGc;
48
49impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}
50
51#[derive(JSTraceable, MallocSizeOf)]
54#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
55struct TransformBackPressureChangePromiseFulfillment {
56 #[conditional_malloc_size_of]
58 result_promise: Rc<Promise>,
59
60 #[ignore_malloc_size_of = "mozjs"]
61 chunk: Box<Heap<JSVal>>,
62
63 writable: Dom<WritableStream>,
65
66 controller: Dom<TransformStreamDefaultController>,
67}
68
69impl Callback for TransformBackPressureChangePromiseFulfillment {
70 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
72 if self.writable.is_erroring() {
76 rooted!(&in(cx) let mut error = UndefinedValue());
77 self.writable.get_stored_error(error.handle_mut());
78 self.result_promise.reject_with_cx(cx, error.handle());
79 return;
80 }
81
82 assert!(self.writable.is_writable());
84
85 rooted!(&in(cx) let mut chunk = UndefinedValue());
87 chunk.set(self.chunk.get());
88 let transform_result = self
89 .controller
90 .transform_stream_default_controller_perform_transform(
91 cx,
92 &self.writable.global(),
93 chunk.handle(),
94 )
95 .expect("perform transform failed");
96
97 let handler = PromiseNativeHandler::new(
100 cx,
101 &self.writable.global(),
102 Some(Box::new(PerformTransformFulfillment {
103 result_promise: self.result_promise.clone(),
104 })),
105 Some(Box::new(PerformTransformRejection {
106 result_promise: self.result_promise.clone(),
107 })),
108 );
109
110 let mut realm = enter_auto_realm(cx, &*self.writable.global());
111 let realm = &mut realm.current_realm();
112 transform_result.append_native_handler(realm, &handler);
113 }
114}
115
116#[derive(JSTraceable, MallocSizeOf)]
117#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
118struct PerformTransformFulfillment {
121 #[conditional_malloc_size_of]
122 result_promise: Rc<Promise>,
123}
124
125impl Callback for PerformTransformFulfillment {
126 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
127 let can_gc = CanGc::from_cx(cx);
128 self.result_promise.resolve_native(&(), can_gc);
130 }
131}
132
133#[derive(JSTraceable, MallocSizeOf)]
134#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
135struct PerformTransformRejection {
138 #[conditional_malloc_size_of]
139 result_promise: Rc<Promise>,
140}
141
142impl Callback for PerformTransformRejection {
143 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
144 let can_gc = CanGc::from_cx(cx);
145 self.result_promise.reject(cx.into(), v, can_gc);
147 }
148}
149
150#[derive(JSTraceable, MallocSizeOf)]
151#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
152struct BackpressureChangeRejection {
155 #[conditional_malloc_size_of]
156 result_promise: Rc<Promise>,
157}
158
159impl Callback for BackpressureChangeRejection {
160 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
161 let can_gc = CanGc::from_cx(cx);
162 self.result_promise.reject(cx.into(), reason, can_gc);
163 }
164}
165
166impl js::gc::Rootable for CancelPromiseFulfillment {}
167
168#[derive(JSTraceable, MallocSizeOf)]
171#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
172struct CancelPromiseFulfillment {
173 readable: Dom<ReadableStream>,
174 controller: Dom<TransformStreamDefaultController>,
175 #[ignore_malloc_size_of = "mozjs"]
176 reason: Box<Heap<JSVal>>,
177}
178
179impl Callback for CancelPromiseFulfillment {
180 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
182 if self.readable.is_errored() {
184 rooted!(&in(cx) let mut error = UndefinedValue());
185 self.readable.get_stored_error(error.handle_mut());
186 self.controller
187 .get_finish_promise()
188 .expect("finish promise is not set")
189 .reject_native_with_cx(cx, &error.handle());
190 } else {
191 rooted!(&in(cx) let mut reason = UndefinedValue());
194 reason.set(self.reason.get());
195 self.readable
196 .get_default_controller()
197 .error(cx, reason.handle());
198
199 self.controller
201 .get_finish_promise()
202 .expect("finish promise is not set")
203 .resolve_native_with_cx(cx, &());
204 }
205 }
206}
207
208impl js::gc::Rootable for CancelPromiseRejection {}
209
210#[derive(JSTraceable, MallocSizeOf)]
213#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
214struct CancelPromiseRejection {
215 readable: Dom<ReadableStream>,
216 controller: Dom<TransformStreamDefaultController>,
217}
218
219impl Callback for CancelPromiseRejection {
220 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
222 self.readable.get_default_controller().error(cx, v);
224
225 self.controller
227 .get_finish_promise()
228 .expect("finish promise is not set")
229 .reject_with_cx(cx, v);
230 }
231}
232
233impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
234
235#[derive(JSTraceable, MallocSizeOf)]
238#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
239struct SourceCancelPromiseFulfillment {
240 writeable: Dom<WritableStream>,
241 controller: Dom<TransformStreamDefaultController>,
242 stream: Dom<TransformStream>,
243 #[ignore_malloc_size_of = "mozjs"]
244 reason: Box<Heap<JSVal>>,
245}
246
247impl Callback for SourceCancelPromiseFulfillment {
248 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
250 let finish_promise = self
252 .controller
253 .get_finish_promise()
254 .expect("finish promise is not set");
255
256 let global = &self.writeable.global();
257 if self.writeable.is_errored() {
259 rooted!(&in(cx) let mut error = UndefinedValue());
260 self.writeable.get_stored_error(error.handle_mut());
261 finish_promise.reject_with_cx(cx, error.handle());
262 } else {
263 rooted!(&in(cx) let mut reason = UndefinedValue());
266 reason.set(self.reason.get());
267 self.writeable
268 .get_default_controller()
269 .error_if_needed(cx, reason.handle(), global);
270
271 self.stream.unblock_write(global, CanGc::from_cx(cx));
273
274 finish_promise.resolve_native_with_cx(cx, &());
276 }
277 }
278}
279
280impl js::gc::Rootable for SourceCancelPromiseRejection {}
281
282#[derive(JSTraceable, MallocSizeOf)]
285#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
286struct SourceCancelPromiseRejection {
287 writeable: Dom<WritableStream>,
288 controller: Dom<TransformStreamDefaultController>,
289 stream: Dom<TransformStream>,
290}
291
292impl Callback for SourceCancelPromiseRejection {
293 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
295 let global = &self.writeable.global();
297
298 self.writeable
299 .get_default_controller()
300 .error_if_needed(cx, v, global);
301
302 self.stream.unblock_write(global, CanGc::from_cx(cx));
304
305 self.controller
307 .get_finish_promise()
308 .expect("finish promise is not set")
309 .reject_with_cx(cx, v);
310 }
311}
312
313impl js::gc::Rootable for FlushPromiseFulfillment {}
314
315#[derive(JSTraceable, MallocSizeOf)]
318#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
319struct FlushPromiseFulfillment {
320 readable: Dom<ReadableStream>,
321 controller: Dom<TransformStreamDefaultController>,
322}
323
324impl Callback for FlushPromiseFulfillment {
325 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
327 let finish_promise = self
329 .controller
330 .get_finish_promise()
331 .expect("finish promise is not set");
332
333 if self.readable.is_errored() {
335 rooted!(&in(cx) let mut error = UndefinedValue());
336 self.readable.get_stored_error(error.handle_mut());
337 finish_promise.reject_with_cx(cx, error.handle());
338 } else {
339 self.readable.get_default_controller().close(cx);
342
343 finish_promise.resolve_native_with_cx(cx, &());
345 }
346 }
347}
348
349impl js::gc::Rootable for FlushPromiseRejection {}
350#[derive(JSTraceable, MallocSizeOf)]
354#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
355struct FlushPromiseRejection {
356 readable: Dom<ReadableStream>,
357 controller: Dom<TransformStreamDefaultController>,
358}
359
360impl Callback for FlushPromiseRejection {
361 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
363 self.readable.get_default_controller().error(cx, v);
366
367 self.controller
369 .get_finish_promise()
370 .expect("finish promise is not set")
371 .reject_with_cx(cx, v);
372 }
373}
374
impl js::gc::Rootable for CrossRealmTransform {}

/// Wrapper over the two cross-realm transform types imported from the
/// readable- and writable-stream modules.
///
/// NOTE(review): this enum is not used in the visible part of this file;
/// confirm its role against the callers.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum CrossRealmTransform {
    /// Holds a [`CrossRealmTransformReadable`].
    Readable(CrossRealmTransformReadable),
    /// Holds a [`CrossRealmTransformWritable`].
    Writable(CrossRealmTransformWritable),
}
387
/// DOM object backing the `TransformStream` interface: a writable side and a
/// readable side tied together by a `TransformStreamDefaultController`.
#[dom_struct]
pub struct TransformStream {
    reflector_: Reflector,

    /// Current backpressure flag; starts as `false` and is flipped only through
    /// `set_backpressure`.
    backpressure: Cell<bool>,

    /// Promise replaced on every backpressure change; the previous promise (if
    /// any) is resolved at that point. `None` until `set_backpressure` first runs.
    #[conditional_malloc_size_of]
    backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,

    /// The controller, installed by `set_up_transform_stream_default_controller`.
    controller: MutNullableDom<TransformStreamDefaultController>,

    /// Initialized to `false`; not otherwise read or written in the visible
    /// part of this file.
    detached: Cell<bool>,

    /// Readable side; set by `initialize` or by `transfer_receive`.
    readable: MutNullableDom<ReadableStream>,

    /// Writable side; set by `initialize` or by `transfer_receive`.
    writable: MutNullableDom<WritableStream>,
}
412
413impl TransformStream {
414 fn new_inherited() -> TransformStream {
416 TransformStream {
417 reflector_: Reflector::new(),
418 backpressure: Default::default(),
419 backpressure_change_promise: DomRefCell::new(None),
420 controller: MutNullableDom::new(None),
421 detached: Cell::new(false),
422 readable: MutNullableDom::new(None),
423 writable: MutNullableDom::new(None),
424 }
425 }
426
    /// Creates an empty `TransformStream` and reflects it into `global`,
    /// passing the optional prototype `proto` through to
    /// `reflect_dom_object_with_proto`.
    pub(crate) fn new_with_proto(
        global: &GlobalScope,
        proto: Option<SafeHandleObject>,
        can_gc: CanGc,
    ) -> DomRoot<TransformStream> {
        reflect_dom_object_with_proto(
            Box::new(TransformStream::new_inherited()),
            global,
            proto,
            can_gc,
        )
    }
439
440 pub(crate) fn set_up(
443 &self,
444 cx: &mut JSContext,
445 global: &GlobalScope,
446 transformer_type: TransformerType,
447 ) -> Fallible<()> {
448 let writable_high_water_mark = 1.0;
450
451 let writable_size_algorithm =
453 extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
454
455 let readable_high_water_mark = 0.0;
457
458 let readable_size_algorithm =
460 extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
461
462 let start_promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
469
470 self.initialize(
474 cx,
475 global,
476 start_promise,
477 writable_high_water_mark,
478 writable_size_algorithm,
479 readable_high_water_mark,
480 readable_size_algorithm,
481 )?;
482
483 let controller =
485 TransformStreamDefaultController::new(global, transformer_type, CanGc::from_cx(cx));
486
487 self.set_up_transform_stream_default_controller(&controller);
491
492 Ok(())
493 }
494
    /// Returns the controller.
    ///
    /// Panics if no controller has been set.
    pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
        self.controller.get().expect("controller is not set")
    }

    /// Returns the writable side.
    ///
    /// Panics if the writable stream has not been set.
    pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
        self.writable.get().expect("writable stream is not set")
    }

    /// Returns the readable side.
    ///
    /// Panics if the readable stream has not been set.
    pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
        self.readable.get().expect("readable stream is not set")
    }

    /// Returns the current backpressure flag.
    pub(crate) fn get_backpressure(&self) -> bool {
        self.backpressure.get()
    }
510
511 #[expect(clippy::too_many_arguments)]
513 fn initialize(
514 &self,
515 cx: &mut JSContext,
516 global: &GlobalScope,
517 start_promise: Rc<Promise>,
518 writable_high_water_mark: f64,
519 writable_size_algorithm: Rc<QueuingStrategySize>,
520 readable_high_water_mark: f64,
521 readable_size_algorithm: Rc<QueuingStrategySize>,
522 ) -> Fallible<()> {
523 let writable = create_writable_stream(
535 cx,
536 global,
537 writable_high_water_mark,
538 writable_size_algorithm,
539 UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
540 )?;
541 self.writable.set(Some(&writable));
542
543 let readable = create_readable_stream(
555 cx,
556 global,
557 UnderlyingSourceType::Transform(self, start_promise),
558 Some(readable_size_algorithm),
559 Some(readable_high_water_mark),
560 );
561 self.readable.set(Some(&readable));
562
563 self.set_backpressure(global, true, CanGc::from_cx(cx));
568
569 self.controller.set(None);
571
572 Ok(())
573 }
574
575 pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
577 assert!(self.backpressure.get() != backpressure);
579
580 if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
583 promise.resolve_native(&(), can_gc);
584 }
585
586 *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
588
589 self.backpressure.set(backpressure);
591 }
592
    /// Links `controller` and this stream to each other.
    ///
    /// Panics if this stream already has a controller.
    fn set_up_transform_stream_default_controller(
        &self,
        controller: &TransformStreamDefaultController,
    ) {
        assert!(self.controller.get().is_none());

        // Point the controller back at this stream first...
        controller.set_stream(self);

        // ...then record it as this stream's controller.
        self.controller.set(Some(controller));
    }
615
616 fn set_up_transform_stream_default_controller_from_transformer(
618 &self,
619 global: &GlobalScope,
620 transformer_obj: SafeHandleObject,
621 transformer: &Transformer,
622 can_gc: CanGc,
623 ) {
624 let transformer_type = TransformerType::new_from_js_transformer(transformer);
626 let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
627
628 controller.set_transform_obj(transformer_obj);
651
652 self.set_up_transform_stream_default_controller(&controller);
655 }
656
657 pub(crate) fn transform_stream_default_sink_write_algorithm(
659 &self,
660 cx: &mut JSContext,
661 global: &GlobalScope,
662 chunk: SafeHandleValue,
663 ) -> Fallible<Rc<Promise>> {
664 assert!(self.writable.get().is_some());
666
667 let controller = self.controller.get().expect("controller is not set");
669
670 if self.backpressure.get() {
672 let backpressure_change_promise = self.backpressure_change_promise.borrow();
674
675 assert!(backpressure_change_promise.is_some());
677
678 let result_promise = Promise::new2(cx, global);
680 rooted!(&in(cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
681 controller: Dom::from_ref(&controller),
682 writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
683 chunk: Heap::boxed(chunk.get()),
684 result_promise: result_promise.clone(),
685 }));
686
687 let handler = PromiseNativeHandler::new(
688 cx,
689 global,
690 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
691 Some(Box::new(BackpressureChangeRejection {
692 result_promise: result_promise.clone(),
693 })),
694 );
695 let mut realm = enter_auto_realm(cx, global);
696 let realm = &mut realm.current_realm();
697 backpressure_change_promise
698 .as_ref()
699 .expect("Promise must be some by now.")
700 .append_native_handler(realm, &handler);
701
702 return Ok(result_promise);
703 }
704
705 controller.transform_stream_default_controller_perform_transform(cx, global, chunk)
707 }
708
709 pub(crate) fn transform_stream_default_sink_abort_algorithm(
711 &self,
712 cx: &mut JSContext,
713 global: &GlobalScope,
714 reason: SafeHandleValue,
715 ) -> Fallible<Rc<Promise>> {
716 let controller = self.controller.get().expect("controller is not set");
718
719 if let Some(finish_promise) = controller.get_finish_promise() {
721 return Ok(finish_promise);
722 }
723
724 let readable = self.readable.get().expect("readable stream is not set");
726
727 controller.set_finish_promise(Promise::new2(cx, global));
729
730 let cancel_promise = controller.perform_cancel(cx, global, reason)?;
732
733 controller.clear_algorithms();
735
736 let handler = PromiseNativeHandler::new(
738 cx,
739 global,
740 Some(Box::new(CancelPromiseFulfillment {
741 readable: Dom::from_ref(&readable),
742 controller: Dom::from_ref(&controller),
743 reason: Heap::boxed(reason.get()),
744 })),
745 Some(Box::new(CancelPromiseRejection {
746 readable: Dom::from_ref(&readable),
747 controller: Dom::from_ref(&controller),
748 })),
749 );
750 let mut realm = enter_auto_realm(cx, global);
751 let cx = &mut realm.current_realm();
752 cancel_promise.append_native_handler(cx, &handler);
753
754 let finish_promise = controller
756 .get_finish_promise()
757 .expect("finish promise is not set");
758 Ok(finish_promise)
759 }
760
761 pub(crate) fn transform_stream_default_sink_close_algorithm(
763 &self,
764 cx: &mut JSContext,
765 global: &GlobalScope,
766 ) -> Fallible<Rc<Promise>> {
767 let controller = self
769 .controller
770 .get()
771 .ok_or(Error::Type(c"controller is not set".to_owned()))?;
772
773 if let Some(finish_promise) = controller.get_finish_promise() {
775 return Ok(finish_promise);
776 }
777
778 let readable = self
780 .readable
781 .get()
782 .ok_or(Error::Type(c"readable stream is not set".to_owned()))?;
783
784 controller.set_finish_promise(Promise::new2(cx, global));
786
787 let flush_promise = controller.perform_flush(cx, global)?;
789
790 controller.clear_algorithms();
792
793 let handler = PromiseNativeHandler::new(
795 cx,
796 global,
797 Some(Box::new(FlushPromiseFulfillment {
798 readable: Dom::from_ref(&readable),
799 controller: Dom::from_ref(&controller),
800 })),
801 Some(Box::new(FlushPromiseRejection {
802 readable: Dom::from_ref(&readable),
803 controller: Dom::from_ref(&controller),
804 })),
805 );
806
807 let mut realm = enter_auto_realm(cx, global);
808 let realm = &mut realm.current_realm();
809 flush_promise.append_native_handler(realm, &handler);
810 let finish_promise = controller
812 .get_finish_promise()
813 .expect("finish promise is not set");
814 Ok(finish_promise)
815 }
816
817 pub(crate) fn transform_stream_default_source_cancel(
819 &self,
820 cx: &mut JSContext,
821 global: &GlobalScope,
822 reason: SafeHandleValue,
823 ) -> Fallible<Rc<Promise>> {
824 let controller = self
826 .controller
827 .get()
828 .ok_or(Error::Type(c"controller is not set".to_owned()))?;
829
830 if let Some(finish_promise) = controller.get_finish_promise() {
832 return Ok(finish_promise);
833 }
834
835 let writable = self
837 .writable
838 .get()
839 .ok_or(Error::Type(c"writable stream is not set".to_owned()))?;
840
841 controller.set_finish_promise(Promise::new2(cx, global));
843
844 let cancel_promise = controller.perform_cancel(cx, global, reason)?;
846
847 controller.clear_algorithms();
849
850 let handler = PromiseNativeHandler::new(
852 cx,
853 global,
854 Some(Box::new(SourceCancelPromiseFulfillment {
855 writeable: Dom::from_ref(&writable),
856 controller: Dom::from_ref(&controller),
857 stream: Dom::from_ref(self),
858 reason: Heap::boxed(reason.get()),
859 })),
860 Some(Box::new(SourceCancelPromiseRejection {
861 writeable: Dom::from_ref(&writable),
862 controller: Dom::from_ref(&controller),
863 stream: Dom::from_ref(self),
864 })),
865 );
866
867 let finish_promise = controller
869 .get_finish_promise()
870 .expect("finish promise is not set");
871 let mut realm = enter_auto_realm(cx, global);
872 let cx = &mut realm.current_realm();
873 cancel_promise.append_native_handler(cx, &handler);
874 Ok(finish_promise)
875 }
876
877 pub(crate) fn transform_stream_default_source_pull(
879 &self,
880 global: &GlobalScope,
881 can_gc: CanGc,
882 ) -> Fallible<Rc<Promise>> {
883 assert!(self.backpressure.get());
885
886 assert!(self.backpressure_change_promise.borrow().is_some());
888
889 self.set_backpressure(global, false, can_gc);
891
892 Ok(self
894 .backpressure_change_promise
895 .borrow()
896 .clone()
897 .expect("Promise must be some by now."))
898 }
899
900 pub(crate) fn error_writable_and_unblock_write(
902 &self,
903 cx: &mut JSContext,
904 global: &GlobalScope,
905 error: SafeHandleValue,
906 ) {
907 self.get_controller().clear_algorithms();
909
910 self.get_writable()
912 .get_default_controller()
913 .error_if_needed(cx, error, global);
914
915 self.unblock_write(global, CanGc::from_cx(cx))
917 }
918
919 pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
921 if self.backpressure.get() {
923 self.set_backpressure(global, false, can_gc);
924 }
925 }
926
927 pub(crate) fn error(&self, cx: &mut JSContext, global: &GlobalScope, error: SafeHandleValue) {
929 self.get_readable()
931 .get_default_controller()
932 .error(cx, error);
933
934 self.error_writable_and_unblock_write(cx, global, error);
936 }
937}
938
939impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
940 #[expect(unsafe_code)]
942 fn Constructor(
943 cx: &mut JSContext,
944 global: &GlobalScope,
945 proto: Option<SafeHandleObject>,
946 transformer: Option<*mut JSObject>,
947 writable_strategy: &QueuingStrategy,
948 readable_strategy: &QueuingStrategy,
949 ) -> Fallible<DomRoot<TransformStream>> {
950 rooted!(&in(cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
952
953 let transformer_dict = if !transformer_obj.is_null() {
956 rooted!(&in(cx) let obj_val = ObjectValue(transformer_obj.get()));
957 match Transformer::new(cx, obj_val.handle()) {
958 Ok(ConversionResult::Success(val)) => val,
959 Ok(ConversionResult::Failure(error)) => {
960 return Err(Error::Type(error.into_owned()));
961 },
962 _ => {
963 return Err(Error::JSFailed);
964 },
965 }
966 } else {
967 Transformer::empty()
968 };
969
970 if !transformer_dict.readableType.handle().is_undefined() {
972 return Err(Error::Range(c"readableType is set".to_owned()));
973 }
974
975 if !transformer_dict.writableType.handle().is_undefined() {
977 return Err(Error::Range(c"writableType is set".to_owned()));
978 }
979
980 let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
982
983 let readable_size_algorithm = extract_size_algorithm(readable_strategy, CanGc::from_cx(cx));
985
986 let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
988
989 let writable_size_algorithm = extract_size_algorithm(writable_strategy, CanGc::from_cx(cx));
991
992 let start_promise = Promise::new2(cx, global);
994
995 let stream = TransformStream::new_with_proto(global, proto, CanGc::from_cx(cx));
998 stream.initialize(
999 cx,
1000 global,
1001 start_promise.clone(),
1002 writable_high_water_mark,
1003 writable_size_algorithm,
1004 readable_high_water_mark,
1005 readable_size_algorithm,
1006 )?;
1007
1008 stream.set_up_transform_stream_default_controller_from_transformer(
1010 global,
1011 transformer_obj.handle(),
1012 &transformer_dict,
1013 CanGc::from_cx(cx),
1014 );
1015
1016 if let Some(start) = &transformer_dict.start {
1020 rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
1021 rooted!(&in(cx) let mut result: JSVal);
1022 rooted!(&in(cx) let this_object = transformer_obj.get());
1023 start.Call_(
1024 cx,
1025 &this_object.handle(),
1026 &stream.get_controller(),
1027 result.handle_mut(),
1028 ExceptionHandling::Rethrow,
1029 )?;
1030 let is_promise = unsafe {
1031 if result.is_object() {
1032 result_object.set(result.to_object());
1033 IsPromiseObject(result_object.handle().into_handle())
1034 } else {
1035 false
1036 }
1037 };
1038 let promise = if is_promise {
1039 Promise::new_with_js_promise(result_object.handle(), cx.into())
1040 } else {
1041 Promise::new_resolved(global, cx.into(), result.get(), CanGc::from_cx(cx))
1042 };
1043 start_promise.resolve_native_with_cx(cx, &promise);
1044 } else {
1045 start_promise.resolve_native_with_cx(cx, &());
1047 };
1048
1049 Ok(stream)
1050 }
1051
1052 fn Readable(&self) -> DomRoot<ReadableStream> {
1054 self.readable.get().expect("readable stream is not set")
1056 }
1057
1058 fn Writable(&self) -> DomRoot<WritableStream> {
1060 self.writable.get().expect("writable stream is not set")
1062 }
1063}
1064
1065impl Transferable for TransformStream {
1067 type Index = MessagePortIndex;
1068 type Data = TransformStreamData;
1069
1070 fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, TransformStreamData)> {
1072 let global = self.global();
1073 let mut realm = enter_auto_realm(cx, &*global);
1074 let mut realm = realm.current_realm();
1075 let cx = &mut realm;
1076
1077 let readable = self.get_readable();
1079
1080 let writable = self.get_writable();
1082
1083 if readable.is_locked() || writable.is_locked() {
1088 return Err(Error::DataClone(None));
1089 }
1090
1091 let port1 = MessagePort::new(&global, CanGc::from_cx(cx));
1093 global.track_message_port(&port1, None);
1094 let port1_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1095 global.track_message_port(&port1_peer, None);
1096 global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1097
1098 let proxy_readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1099 proxy_readable.setup_cross_realm_transform_readable(cx, &port1);
1100 proxy_readable
1101 .pipe_to(cx, &global, &writable, false, false, false, None)
1102 .set_promise_is_handled();
1103
1104 let port2 = MessagePort::new(&global, CanGc::from_cx(cx));
1106 global.track_message_port(&port2, None);
1107 let port2_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1108 global.track_message_port(&port2_peer, None);
1109 global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1110
1111 let proxy_writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1112 proxy_writable.setup_cross_realm_transform_writable(cx, &port2);
1113
1114 readable
1116 .pipe_to(cx, &global, &proxy_writable, false, false, false, None)
1117 .set_promise_is_handled();
1118
1119 Ok((
1124 *port1_peer.message_port_id(),
1125 TransformStreamData {
1126 readable: port1_peer.transfer(cx)?,
1127 writable: port2_peer.transfer(cx)?,
1128 },
1129 ))
1130 }
1131
1132 fn transfer_receive(
1134 cx: &mut JSContext,
1135 owner: &GlobalScope,
1136 _id: MessagePortId,
1137 data: TransformStreamData,
1138 ) -> Result<DomRoot<Self>, ()> {
1139 let port1 = MessagePort::transfer_receive(cx, owner, data.readable.0, data.readable.1)?;
1140 let port2 = MessagePort::transfer_receive(cx, owner, data.writable.0, data.writable.1)?;
1141
1142 let proxy_readable = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1146 proxy_readable.setup_cross_realm_transform_readable(cx, &port2);
1147
1148 let proxy_writable = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1152 proxy_writable.setup_cross_realm_transform_writable(cx, &port1);
1153
1154 let stream = TransformStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1160 stream.readable.set(Some(&proxy_readable));
1161 stream.writable.set(Some(&proxy_writable));
1162
1163 Ok(stream)
1164 }
1165
1166 fn serialized_storage<'a>(
1167 data: StructuredData<'a, '_>,
1168 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1169 match data {
1170 StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1171 StructuredData::Writer(w) => &mut w.transform_streams_port,
1172 }
1173 }
1174}