1use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::context::JSContext;
11use js::jsapi::{Heap, IsPromiseObject, JSObject};
12use js::jsval::{JSVal, ObjectValue, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
15use rustc_hash::FxHashMap;
16use script_bindings::callback::ExceptionHandling;
17use script_bindings::cell::DomRefCell;
18use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
19use servo_base::id::{MessagePortId, MessagePortIndex};
20use servo_constellation_traits::TransformStreamData;
21
22use super::readablestream::CrossRealmTransformReadable;
23use super::writablestream::CrossRealmTransformWritable;
24use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
25 QueuingStrategy, QueuingStrategySize,
26};
27use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
28use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
29use crate::dom::bindings::conversions::ConversionResult;
30use crate::dom::bindings::error::{Error, Fallible};
31use crate::dom::bindings::reflector::DomGlobal;
32use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
33use crate::dom::bindings::structuredclone::StructuredData;
34use crate::dom::bindings::transferable::Transferable;
35use crate::dom::globalscope::GlobalScope;
36use crate::dom::messageport::MessagePort;
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::Callback;
39use crate::dom::readablestream::{ReadableStream, create_readable_stream};
40use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
41use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
42use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
43use crate::dom::stream::writablestream::create_writable_stream;
44use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
45use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
46use crate::realms::enter_auto_realm;
47use crate::script_runtime::CanGc;
48
49impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}
50
51#[derive(JSTraceable, MallocSizeOf)]
54#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
55struct TransformBackPressureChangePromiseFulfillment {
56 #[conditional_malloc_size_of]
58 result_promise: Rc<Promise>,
59
60 #[ignore_malloc_size_of = "mozjs"]
61 chunk: Box<Heap<JSVal>>,
62
63 writable: Dom<WritableStream>,
65
66 controller: Dom<TransformStreamDefaultController>,
67}
68
69impl Callback for TransformBackPressureChangePromiseFulfillment {
70 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
72 if self.writable.is_erroring() {
76 rooted!(&in(cx) let mut error = UndefinedValue());
77 self.writable.get_stored_error(error.handle_mut());
78 self.result_promise
79 .reject(cx.into(), error.handle(), CanGc::from_cx(cx));
80 return;
81 }
82
83 assert!(self.writable.is_writable());
85
86 rooted!(&in(cx) let mut chunk = UndefinedValue());
88 chunk.set(self.chunk.get());
89 let transform_result = self
90 .controller
91 .transform_stream_default_controller_perform_transform(
92 cx,
93 &self.writable.global(),
94 chunk.handle(),
95 )
96 .expect("perform transform failed");
97
98 let handler = PromiseNativeHandler::new(
101 &self.writable.global(),
102 Some(Box::new(PerformTransformFulfillment {
103 result_promise: self.result_promise.clone(),
104 })),
105 Some(Box::new(PerformTransformRejection {
106 result_promise: self.result_promise.clone(),
107 })),
108 CanGc::from_cx(cx),
109 );
110
111 let mut realm = enter_auto_realm(cx, &*self.writable.global());
112 let realm = &mut realm.current_realm();
113 transform_result.append_native_handler(realm, &handler);
114 }
115}
116
117#[derive(JSTraceable, MallocSizeOf)]
118#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
119struct PerformTransformFulfillment {
122 #[conditional_malloc_size_of]
123 result_promise: Rc<Promise>,
124}
125
126impl Callback for PerformTransformFulfillment {
127 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
128 let can_gc = CanGc::from_cx(cx);
129 self.result_promise.resolve_native(&(), can_gc);
131 }
132}
133
134#[derive(JSTraceable, MallocSizeOf)]
135#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
136struct PerformTransformRejection {
139 #[conditional_malloc_size_of]
140 result_promise: Rc<Promise>,
141}
142
143impl Callback for PerformTransformRejection {
144 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
145 let can_gc = CanGc::from_cx(cx);
146 self.result_promise.reject(cx.into(), v, can_gc);
148 }
149}
150
151#[derive(JSTraceable, MallocSizeOf)]
152#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
153struct BackpressureChangeRejection {
156 #[conditional_malloc_size_of]
157 result_promise: Rc<Promise>,
158}
159
160impl Callback for BackpressureChangeRejection {
161 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
162 let can_gc = CanGc::from_cx(cx);
163 self.result_promise.reject(cx.into(), reason, can_gc);
164 }
165}
166
167impl js::gc::Rootable for CancelPromiseFulfillment {}
168
169#[derive(JSTraceable, MallocSizeOf)]
172#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
173struct CancelPromiseFulfillment {
174 readable: Dom<ReadableStream>,
175 controller: Dom<TransformStreamDefaultController>,
176 #[ignore_malloc_size_of = "mozjs"]
177 reason: Box<Heap<JSVal>>,
178}
179
180impl Callback for CancelPromiseFulfillment {
181 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
183 if self.readable.is_errored() {
185 rooted!(&in(cx) let mut error = UndefinedValue());
186 self.readable.get_stored_error(error.handle_mut());
187 self.controller
188 .get_finish_promise()
189 .expect("finish promise is not set")
190 .reject_native(&error.handle(), CanGc::from_cx(cx));
191 } else {
192 rooted!(&in(cx) let mut reason = UndefinedValue());
195 reason.set(self.reason.get());
196 self.readable
197 .get_default_controller()
198 .error(cx, reason.handle());
199
200 self.controller
202 .get_finish_promise()
203 .expect("finish promise is not set")
204 .resolve_native(&(), CanGc::from_cx(cx));
205 }
206 }
207}
208
209impl js::gc::Rootable for CancelPromiseRejection {}
210
211#[derive(JSTraceable, MallocSizeOf)]
214#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
215struct CancelPromiseRejection {
216 readable: Dom<ReadableStream>,
217 controller: Dom<TransformStreamDefaultController>,
218}
219
220impl Callback for CancelPromiseRejection {
221 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
223 self.readable.get_default_controller().error(cx, v);
225
226 self.controller
228 .get_finish_promise()
229 .expect("finish promise is not set")
230 .reject(cx.into(), v, CanGc::from_cx(cx));
231 }
232}
233
234impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
235
236#[derive(JSTraceable, MallocSizeOf)]
239#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
240struct SourceCancelPromiseFulfillment {
241 writeable: Dom<WritableStream>,
242 controller: Dom<TransformStreamDefaultController>,
243 stream: Dom<TransformStream>,
244 #[ignore_malloc_size_of = "mozjs"]
245 reason: Box<Heap<JSVal>>,
246}
247
248impl Callback for SourceCancelPromiseFulfillment {
249 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
251 let finish_promise = self
253 .controller
254 .get_finish_promise()
255 .expect("finish promise is not set");
256
257 let global = &self.writeable.global();
258 if self.writeable.is_errored() {
260 rooted!(&in(cx) let mut error = UndefinedValue());
261 self.writeable.get_stored_error(error.handle_mut());
262 finish_promise.reject(cx.into(), error.handle(), CanGc::from_cx(cx));
263 } else {
264 rooted!(&in(cx) let mut reason = UndefinedValue());
267 reason.set(self.reason.get());
268 self.writeable
269 .get_default_controller()
270 .error_if_needed(cx, reason.handle(), global);
271
272 self.stream.unblock_write(global, CanGc::from_cx(cx));
274
275 finish_promise.resolve_native(&(), CanGc::from_cx(cx));
277 }
278 }
279}
280
281impl js::gc::Rootable for SourceCancelPromiseRejection {}
282
283#[derive(JSTraceable, MallocSizeOf)]
286#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
287struct SourceCancelPromiseRejection {
288 writeable: Dom<WritableStream>,
289 controller: Dom<TransformStreamDefaultController>,
290 stream: Dom<TransformStream>,
291}
292
293impl Callback for SourceCancelPromiseRejection {
294 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
296 let global = &self.writeable.global();
298
299 self.writeable
300 .get_default_controller()
301 .error_if_needed(cx, v, global);
302
303 self.stream.unblock_write(global, CanGc::from_cx(cx));
305
306 self.controller
308 .get_finish_promise()
309 .expect("finish promise is not set")
310 .reject(cx.into(), v, CanGc::from_cx(cx));
311 }
312}
313
314impl js::gc::Rootable for FlushPromiseFulfillment {}
315
316#[derive(JSTraceable, MallocSizeOf)]
319#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
320struct FlushPromiseFulfillment {
321 readable: Dom<ReadableStream>,
322 controller: Dom<TransformStreamDefaultController>,
323}
324
325impl Callback for FlushPromiseFulfillment {
326 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
328 let finish_promise = self
330 .controller
331 .get_finish_promise()
332 .expect("finish promise is not set");
333
334 if self.readable.is_errored() {
336 rooted!(&in(cx) let mut error = UndefinedValue());
337 self.readable.get_stored_error(error.handle_mut());
338 finish_promise.reject(cx.into(), error.handle(), CanGc::from_cx(cx));
339 } else {
340 self.readable.get_default_controller().close(cx);
343
344 finish_promise.resolve_native(&(), CanGc::from_cx(cx));
346 }
347 }
348}
349
350impl js::gc::Rootable for FlushPromiseRejection {}
351#[derive(JSTraceable, MallocSizeOf)]
355#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
356struct FlushPromiseRejection {
357 readable: Dom<ReadableStream>,
358 controller: Dom<TransformStreamDefaultController>,
359}
360
361impl Callback for FlushPromiseRejection {
362 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
364 self.readable.get_default_controller().error(cx, v);
367
368 self.controller
370 .get_finish_promise()
371 .expect("finish promise is not set")
372 .reject(cx.into(), v, CanGc::from_cx(cx));
373 }
374}
375
376impl js::gc::Rootable for CrossRealmTransform {}
377
378#[derive(Clone, JSTraceable, MallocSizeOf)]
381#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
382pub(crate) enum CrossRealmTransform {
383 Readable(CrossRealmTransformReadable),
385 Writable(CrossRealmTransformWritable),
387}
388
389#[dom_struct]
391pub struct TransformStream {
392 reflector_: Reflector,
393
394 backpressure: Cell<bool>,
396
397 #[conditional_malloc_size_of]
399 backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,
400
401 controller: MutNullableDom<TransformStreamDefaultController>,
403
404 detached: Cell<bool>,
406
407 readable: MutNullableDom<ReadableStream>,
409
410 writable: MutNullableDom<WritableStream>,
412}
413
414impl TransformStream {
415 fn new_inherited() -> TransformStream {
417 TransformStream {
418 reflector_: Reflector::new(),
419 backpressure: Default::default(),
420 backpressure_change_promise: DomRefCell::new(None),
421 controller: MutNullableDom::new(None),
422 detached: Cell::new(false),
423 readable: MutNullableDom::new(None),
424 writable: MutNullableDom::new(None),
425 }
426 }
427
428 pub(crate) fn new_with_proto(
429 global: &GlobalScope,
430 proto: Option<SafeHandleObject>,
431 can_gc: CanGc,
432 ) -> DomRoot<TransformStream> {
433 reflect_dom_object_with_proto(
434 Box::new(TransformStream::new_inherited()),
435 global,
436 proto,
437 can_gc,
438 )
439 }
440
441 pub(crate) fn set_up(
444 &self,
445 cx: &mut JSContext,
446 global: &GlobalScope,
447 transformer_type: TransformerType,
448 ) -> Fallible<()> {
449 let writable_high_water_mark = 1.0;
451
452 let writable_size_algorithm =
454 extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
455
456 let readable_high_water_mark = 0.0;
458
459 let readable_size_algorithm =
461 extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
462
463 let start_promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
470
471 self.initialize(
475 cx,
476 global,
477 start_promise,
478 writable_high_water_mark,
479 writable_size_algorithm,
480 readable_high_water_mark,
481 readable_size_algorithm,
482 )?;
483
484 let controller =
486 TransformStreamDefaultController::new(global, transformer_type, CanGc::from_cx(cx));
487
488 self.set_up_transform_stream_default_controller(&controller);
492
493 Ok(())
494 }
495
496 pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
497 self.controller.get().expect("controller is not set")
498 }
499
500 pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
501 self.writable.get().expect("writable stream is not set")
502 }
503
504 pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
505 self.readable.get().expect("readable stream is not set")
506 }
507
508 pub(crate) fn get_backpressure(&self) -> bool {
509 self.backpressure.get()
510 }
511
512 #[expect(clippy::too_many_arguments)]
514 fn initialize(
515 &self,
516 cx: &mut JSContext,
517 global: &GlobalScope,
518 start_promise: Rc<Promise>,
519 writable_high_water_mark: f64,
520 writable_size_algorithm: Rc<QueuingStrategySize>,
521 readable_high_water_mark: f64,
522 readable_size_algorithm: Rc<QueuingStrategySize>,
523 ) -> Fallible<()> {
524 let writable = create_writable_stream(
536 cx,
537 global,
538 writable_high_water_mark,
539 writable_size_algorithm,
540 UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
541 )?;
542 self.writable.set(Some(&writable));
543
544 let readable = create_readable_stream(
556 cx,
557 global,
558 UnderlyingSourceType::Transform(self, start_promise),
559 Some(readable_size_algorithm),
560 Some(readable_high_water_mark),
561 );
562 self.readable.set(Some(&readable));
563
564 self.set_backpressure(global, true, CanGc::from_cx(cx));
569
570 self.controller.set(None);
572
573 Ok(())
574 }
575
576 pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
578 assert!(self.backpressure.get() != backpressure);
580
581 if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
584 promise.resolve_native(&(), can_gc);
585 }
586
587 *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
589
590 self.backpressure.set(backpressure);
592 }
593
594 fn set_up_transform_stream_default_controller(
596 &self,
597 controller: &TransformStreamDefaultController,
598 ) {
599 assert!(self.controller.get().is_none());
604
605 controller.set_stream(self);
607
608 self.controller.set(Some(controller));
610
611 }
616
617 fn set_up_transform_stream_default_controller_from_transformer(
619 &self,
620 global: &GlobalScope,
621 transformer_obj: SafeHandleObject,
622 transformer: &Transformer,
623 can_gc: CanGc,
624 ) {
625 let transformer_type = TransformerType::new_from_js_transformer(transformer);
627 let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
628
629 controller.set_transform_obj(transformer_obj);
652
653 self.set_up_transform_stream_default_controller(&controller);
656 }
657
658 pub(crate) fn transform_stream_default_sink_write_algorithm(
660 &self,
661 cx: &mut JSContext,
662 global: &GlobalScope,
663 chunk: SafeHandleValue,
664 ) -> Fallible<Rc<Promise>> {
665 assert!(self.writable.get().is_some());
667
668 let controller = self.controller.get().expect("controller is not set");
670
671 if self.backpressure.get() {
673 let backpressure_change_promise = self.backpressure_change_promise.borrow();
675
676 assert!(backpressure_change_promise.is_some());
678
679 let result_promise = Promise::new2(cx, global);
681 rooted!(&in(cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
682 controller: Dom::from_ref(&controller),
683 writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
684 chunk: Heap::boxed(chunk.get()),
685 result_promise: result_promise.clone(),
686 }));
687
688 let handler = PromiseNativeHandler::new(
689 global,
690 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
691 Some(Box::new(BackpressureChangeRejection {
692 result_promise: result_promise.clone(),
693 })),
694 CanGc::from_cx(cx),
695 );
696 let mut realm = enter_auto_realm(cx, global);
697 let realm = &mut realm.current_realm();
698 backpressure_change_promise
699 .as_ref()
700 .expect("Promise must be some by now.")
701 .append_native_handler(realm, &handler);
702
703 return Ok(result_promise);
704 }
705
706 controller.transform_stream_default_controller_perform_transform(cx, global, chunk)
708 }
709
710 pub(crate) fn transform_stream_default_sink_abort_algorithm(
712 &self,
713 cx: &mut JSContext,
714 global: &GlobalScope,
715 reason: SafeHandleValue,
716 ) -> Fallible<Rc<Promise>> {
717 let controller = self.controller.get().expect("controller is not set");
719
720 if let Some(finish_promise) = controller.get_finish_promise() {
722 return Ok(finish_promise);
723 }
724
725 let readable = self.readable.get().expect("readable stream is not set");
727
728 controller.set_finish_promise(Promise::new2(cx, global));
730
731 let cancel_promise = controller.perform_cancel(cx, global, reason)?;
733
734 controller.clear_algorithms();
736
737 let handler = PromiseNativeHandler::new(
739 global,
740 Some(Box::new(CancelPromiseFulfillment {
741 readable: Dom::from_ref(&readable),
742 controller: Dom::from_ref(&controller),
743 reason: Heap::boxed(reason.get()),
744 })),
745 Some(Box::new(CancelPromiseRejection {
746 readable: Dom::from_ref(&readable),
747 controller: Dom::from_ref(&controller),
748 })),
749 CanGc::from_cx(cx),
750 );
751 let mut realm = enter_auto_realm(cx, global);
752 let cx = &mut realm.current_realm();
753 cancel_promise.append_native_handler(cx, &handler);
754
755 let finish_promise = controller
757 .get_finish_promise()
758 .expect("finish promise is not set");
759 Ok(finish_promise)
760 }
761
762 pub(crate) fn transform_stream_default_sink_close_algorithm(
764 &self,
765 cx: &mut JSContext,
766 global: &GlobalScope,
767 ) -> Fallible<Rc<Promise>> {
768 let controller = self
770 .controller
771 .get()
772 .ok_or(Error::Type(c"controller is not set".to_owned()))?;
773
774 if let Some(finish_promise) = controller.get_finish_promise() {
776 return Ok(finish_promise);
777 }
778
779 let readable = self
781 .readable
782 .get()
783 .ok_or(Error::Type(c"readable stream is not set".to_owned()))?;
784
785 controller.set_finish_promise(Promise::new2(cx, global));
787
788 let flush_promise = controller.perform_flush(cx, global)?;
790
791 controller.clear_algorithms();
793
794 let handler = PromiseNativeHandler::new(
796 global,
797 Some(Box::new(FlushPromiseFulfillment {
798 readable: Dom::from_ref(&readable),
799 controller: Dom::from_ref(&controller),
800 })),
801 Some(Box::new(FlushPromiseRejection {
802 readable: Dom::from_ref(&readable),
803 controller: Dom::from_ref(&controller),
804 })),
805 CanGc::from_cx(cx),
806 );
807
808 let mut realm = enter_auto_realm(cx, global);
809 let realm = &mut realm.current_realm();
810 flush_promise.append_native_handler(realm, &handler);
811 let finish_promise = controller
813 .get_finish_promise()
814 .expect("finish promise is not set");
815 Ok(finish_promise)
816 }
817
818 pub(crate) fn transform_stream_default_source_cancel(
820 &self,
821 cx: &mut JSContext,
822 global: &GlobalScope,
823 reason: SafeHandleValue,
824 ) -> Fallible<Rc<Promise>> {
825 let controller = self
827 .controller
828 .get()
829 .ok_or(Error::Type(c"controller is not set".to_owned()))?;
830
831 if let Some(finish_promise) = controller.get_finish_promise() {
833 return Ok(finish_promise);
834 }
835
836 let writable = self
838 .writable
839 .get()
840 .ok_or(Error::Type(c"writable stream is not set".to_owned()))?;
841
842 controller.set_finish_promise(Promise::new2(cx, global));
844
845 let cancel_promise = controller.perform_cancel(cx, global, reason)?;
847
848 controller.clear_algorithms();
850
851 let handler = PromiseNativeHandler::new(
853 global,
854 Some(Box::new(SourceCancelPromiseFulfillment {
855 writeable: Dom::from_ref(&writable),
856 controller: Dom::from_ref(&controller),
857 stream: Dom::from_ref(self),
858 reason: Heap::boxed(reason.get()),
859 })),
860 Some(Box::new(SourceCancelPromiseRejection {
861 writeable: Dom::from_ref(&writable),
862 controller: Dom::from_ref(&controller),
863 stream: Dom::from_ref(self),
864 })),
865 CanGc::from_cx(cx),
866 );
867
868 let finish_promise = controller
870 .get_finish_promise()
871 .expect("finish promise is not set");
872 let mut realm = enter_auto_realm(cx, global);
873 let cx = &mut realm.current_realm();
874 cancel_promise.append_native_handler(cx, &handler);
875 Ok(finish_promise)
876 }
877
878 pub(crate) fn transform_stream_default_source_pull(
880 &self,
881 global: &GlobalScope,
882 can_gc: CanGc,
883 ) -> Fallible<Rc<Promise>> {
884 assert!(self.backpressure.get());
886
887 assert!(self.backpressure_change_promise.borrow().is_some());
889
890 self.set_backpressure(global, false, can_gc);
892
893 Ok(self
895 .backpressure_change_promise
896 .borrow()
897 .clone()
898 .expect("Promise must be some by now."))
899 }
900
901 pub(crate) fn error_writable_and_unblock_write(
903 &self,
904 cx: &mut JSContext,
905 global: &GlobalScope,
906 error: SafeHandleValue,
907 ) {
908 self.get_controller().clear_algorithms();
910
911 self.get_writable()
913 .get_default_controller()
914 .error_if_needed(cx, error, global);
915
916 self.unblock_write(global, CanGc::from_cx(cx))
918 }
919
920 pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
922 if self.backpressure.get() {
924 self.set_backpressure(global, false, can_gc);
925 }
926 }
927
928 pub(crate) fn error(&self, cx: &mut JSContext, global: &GlobalScope, error: SafeHandleValue) {
930 self.get_readable()
932 .get_default_controller()
933 .error(cx, error);
934
935 self.error_writable_and_unblock_write(cx, global, error);
937 }
938}
939
940impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
941 #[expect(unsafe_code)]
943 fn Constructor(
944 cx: &mut JSContext,
945 global: &GlobalScope,
946 proto: Option<SafeHandleObject>,
947 transformer: Option<*mut JSObject>,
948 writable_strategy: &QueuingStrategy,
949 readable_strategy: &QueuingStrategy,
950 ) -> Fallible<DomRoot<TransformStream>> {
951 rooted!(&in(cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
953
954 let transformer_dict = if !transformer_obj.is_null() {
957 rooted!(&in(cx) let obj_val = ObjectValue(transformer_obj.get()));
958 match Transformer::new(cx.into(), obj_val.handle(), CanGc::from_cx(cx)) {
959 Ok(ConversionResult::Success(val)) => val,
960 Ok(ConversionResult::Failure(error)) => {
961 return Err(Error::Type(error.into_owned()));
962 },
963 _ => {
964 return Err(Error::JSFailed);
965 },
966 }
967 } else {
968 Transformer::empty()
969 };
970
971 if !transformer_dict.readableType.handle().is_undefined() {
973 return Err(Error::Range(c"readableType is set".to_owned()));
974 }
975
976 if !transformer_dict.writableType.handle().is_undefined() {
978 return Err(Error::Range(c"writableType is set".to_owned()));
979 }
980
981 let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
983
984 let readable_size_algorithm = extract_size_algorithm(readable_strategy, CanGc::from_cx(cx));
986
987 let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
989
990 let writable_size_algorithm = extract_size_algorithm(writable_strategy, CanGc::from_cx(cx));
992
993 let start_promise = Promise::new2(cx, global);
995
996 let stream = TransformStream::new_with_proto(global, proto, CanGc::from_cx(cx));
999 stream.initialize(
1000 cx,
1001 global,
1002 start_promise.clone(),
1003 writable_high_water_mark,
1004 writable_size_algorithm,
1005 readable_high_water_mark,
1006 readable_size_algorithm,
1007 )?;
1008
1009 stream.set_up_transform_stream_default_controller_from_transformer(
1011 global,
1012 transformer_obj.handle(),
1013 &transformer_dict,
1014 CanGc::from_cx(cx),
1015 );
1016
1017 if let Some(start) = &transformer_dict.start {
1021 rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
1022 rooted!(&in(cx) let mut result: JSVal);
1023 rooted!(&in(cx) let this_object = transformer_obj.get());
1024 start.Call_(
1025 cx,
1026 &this_object.handle(),
1027 &stream.get_controller(),
1028 result.handle_mut(),
1029 ExceptionHandling::Rethrow,
1030 )?;
1031 let is_promise = unsafe {
1032 if result.is_object() {
1033 result_object.set(result.to_object());
1034 IsPromiseObject(result_object.handle().into_handle())
1035 } else {
1036 false
1037 }
1038 };
1039 let promise = if is_promise {
1040 Promise::new_with_js_promise(result_object.handle(), cx.into())
1041 } else {
1042 Promise::new_resolved(global, cx.into(), result.get(), CanGc::from_cx(cx))
1043 };
1044 start_promise.resolve_native(&promise, CanGc::from_cx(cx));
1045 } else {
1046 start_promise.resolve_native(&(), CanGc::from_cx(cx));
1048 };
1049
1050 Ok(stream)
1051 }
1052
1053 fn Readable(&self) -> DomRoot<ReadableStream> {
1055 self.readable.get().expect("readable stream is not set")
1057 }
1058
1059 fn Writable(&self) -> DomRoot<WritableStream> {
1061 self.writable.get().expect("writable stream is not set")
1063 }
1064}
1065
1066impl Transferable for TransformStream {
1068 type Index = MessagePortIndex;
1069 type Data = TransformStreamData;
1070
1071 fn transfer(&self, cx: &mut JSContext) -> Fallible<(MessagePortId, TransformStreamData)> {
1073 let global = self.global();
1074 let mut realm = enter_auto_realm(cx, &*global);
1075 let mut realm = realm.current_realm();
1076 let cx = &mut realm;
1077
1078 let readable = self.get_readable();
1080
1081 let writable = self.get_writable();
1083
1084 if readable.is_locked() || writable.is_locked() {
1089 return Err(Error::DataClone(None));
1090 }
1091
1092 let port1 = MessagePort::new(&global, CanGc::from_cx(cx));
1094 global.track_message_port(&port1, None);
1095 let port1_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1096 global.track_message_port(&port1_peer, None);
1097 global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1098
1099 let proxy_readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1100 proxy_readable.setup_cross_realm_transform_readable(cx, &port1);
1101 proxy_readable
1102 .pipe_to(cx, &global, &writable, false, false, false, None)
1103 .set_promise_is_handled();
1104
1105 let port2 = MessagePort::new(&global, CanGc::from_cx(cx));
1107 global.track_message_port(&port2, None);
1108 let port2_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1109 global.track_message_port(&port2_peer, None);
1110 global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1111
1112 let proxy_writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1113 proxy_writable.setup_cross_realm_transform_writable(cx, &port2);
1114
1115 readable
1117 .pipe_to(cx, &global, &proxy_writable, false, false, false, None)
1118 .set_promise_is_handled();
1119
1120 Ok((
1125 *port1_peer.message_port_id(),
1126 TransformStreamData {
1127 readable: port1_peer.transfer(cx)?,
1128 writable: port2_peer.transfer(cx)?,
1129 },
1130 ))
1131 }
1132
1133 fn transfer_receive(
1135 cx: &mut JSContext,
1136 owner: &GlobalScope,
1137 _id: MessagePortId,
1138 data: TransformStreamData,
1139 ) -> Result<DomRoot<Self>, ()> {
1140 let port1 = MessagePort::transfer_receive(cx, owner, data.readable.0, data.readable.1)?;
1141 let port2 = MessagePort::transfer_receive(cx, owner, data.writable.0, data.writable.1)?;
1142
1143 let proxy_readable = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1147 proxy_readable.setup_cross_realm_transform_readable(cx, &port2);
1148
1149 let proxy_writable = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1153 proxy_writable.setup_cross_realm_transform_writable(cx, &port1);
1154
1155 let stream = TransformStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1161 stream.readable.set(Some(&proxy_readable));
1162 stream.writable.set(Some(&proxy_writable));
1163
1164 Ok(stream)
1165 }
1166
1167 fn serialized_storage<'a>(
1168 data: StructuredData<'a, '_>,
1169 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1170 match data {
1171 StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1172 StructuredData::Writer(w) => &mut w.transform_streams_port,
1173 }
1174 }
1175}