1use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, ObjectValue, UndefinedValue};
12use js::realm::CurrentRealm;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
14use rustc_hash::FxHashMap;
15use script_bindings::callback::ExceptionHandling;
16use script_bindings::realms::InRealm;
17use servo_base::id::{MessagePortId, MessagePortIndex};
18use servo_constellation_traits::TransformStreamData;
19
20use super::readablestream::CrossRealmTransformReadable;
21use super::writablestream::CrossRealmTransformWritable;
22use crate::dom::bindings::cell::DomRefCell;
23use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
24 QueuingStrategy, QueuingStrategySize,
25};
26use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
27use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
28use crate::dom::bindings::conversions::ConversionResult;
29use crate::dom::bindings::error::{Error, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::structuredclone::StructuredData;
33use crate::dom::bindings::transferable::Transferable;
34use crate::dom::globalscope::GlobalScope;
35use crate::dom::messageport::MessagePort;
36use crate::dom::promise::Promise;
37use crate::dom::promisenativehandler::Callback;
38use crate::dom::readablestream::{ReadableStream, create_readable_stream};
39use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
40use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
41use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
42use crate::dom::stream::writablestream::create_writable_stream;
43use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
44use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
45use crate::realms::{enter_auto_realm, enter_realm};
46use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
47
// Marker impl so the handler can be placed in a `rooted!` slot, as
// `transform_stream_default_sink_write_algorithm` does before boxing it.
impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}

/// Fulfillment handler attached to the stream's backpressure-change promise by
/// `transform_stream_default_sink_write_algorithm`.
///
/// It carries what is needed to run the postponed transform of `chunk` once
/// that promise fulfills, and to settle `result_promise` with the outcome.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransformBackPressureChangePromiseFulfillment {
    /// Promise returned by the sink write algorithm; settled by this handler
    /// or by the handlers it installs on the transform promise.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,

    /// The chunk whose transform was postponed.
    #[ignore_malloc_size_of = "mozjs"]
    chunk: Box<Heap<JSVal>>,

    /// Writable side of the stream; consulted for its state and stored error.
    writable: Dom<WritableStream>,

    /// Controller used to perform the transform.
    controller: Dom<TransformStreamDefaultController>,
}
67
68impl Callback for TransformBackPressureChangePromiseFulfillment {
69 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
71 if self.writable.is_erroring() {
75 rooted!(&in(cx) let mut error = UndefinedValue());
76 self.writable.get_stored_error(error.handle_mut());
77 self.result_promise
78 .reject(cx.into(), error.handle(), CanGc::from_cx(cx));
79 return;
80 }
81
82 assert!(self.writable.is_writable());
84
85 rooted!(&in(cx) let mut chunk = UndefinedValue());
87 chunk.set(self.chunk.get());
88 let transform_result = self
89 .controller
90 .transform_stream_default_controller_perform_transform(
91 cx,
92 &self.writable.global(),
93 chunk.handle(),
94 )
95 .expect("perform transform failed");
96
97 let handler = PromiseNativeHandler::new(
100 &self.writable.global(),
101 Some(Box::new(PerformTransformFulfillment {
102 result_promise: self.result_promise.clone(),
103 })),
104 Some(Box::new(PerformTransformRejection {
105 result_promise: self.result_promise.clone(),
106 })),
107 CanGc::from_cx(cx),
108 );
109
110 let mut realm = enter_auto_realm(cx, &*self.writable.global());
111 let realm = &mut realm.current_realm();
112 let in_realm_proof = realm.into();
113 let comp = InRealm::Already(&in_realm_proof);
114 transform_result.append_native_handler(&handler, comp, CanGc::from_cx(realm));
115 }
116}
117
/// Fulfillment handler installed on the promise returned by performing a
/// postponed transform (see `TransformBackPressureChangePromiseFulfillment`).
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PerformTransformFulfillment {
    /// Promise resolved with `()` when the transform promise fulfills.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
126
127impl Callback for PerformTransformFulfillment {
128 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
129 let can_gc = CanGc::from_cx(cx);
130 self.result_promise.resolve_native(&(), can_gc);
132 }
133}
134
/// Rejection handler installed on the promise returned by performing a
/// postponed transform (see `TransformBackPressureChangePromiseFulfillment`).
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PerformTransformRejection {
    /// Promise rejected with the transform promise's rejection value.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
143
144impl Callback for PerformTransformRejection {
145 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
146 let can_gc = CanGc::from_cx(cx);
147 self.result_promise.reject(cx.into(), v, can_gc);
149 }
150}
151
/// Rejection handler attached to the backpressure-change promise by
/// `transform_stream_default_sink_write_algorithm`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct BackpressureChangeRejection {
    /// Promise returned by the sink write algorithm; rejected with the reason
    /// the backpressure-change promise was rejected with.
    #[conditional_malloc_size_of]
    result_promise: Rc<Promise>,
}
160
161impl Callback for BackpressureChangeRejection {
162 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
163 let can_gc = CanGc::from_cx(cx);
164 self.result_promise.reject(cx.into(), reason, can_gc);
165 }
166}
167
// Opts this handler into `js::gc::Rootable`.
impl js::gc::Rootable for CancelPromiseFulfillment {}

/// Fulfillment handler attached to the cancel promise by
/// `transform_stream_default_sink_abort_algorithm`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct CancelPromiseFulfillment {
    /// Readable side of the stream; checked for an errored state and errored
    /// with `reason` otherwise.
    readable: Dom<ReadableStream>,
    /// Controller whose finish promise is settled by this handler.
    controller: Dom<TransformStreamDefaultController>,
    /// The abort reason captured when the handler was created.
    #[ignore_malloc_size_of = "mozjs"]
    reason: Box<Heap<JSVal>>,
}
180
181impl Callback for CancelPromiseFulfillment {
182 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
184 let can_gc = CanGc::from_cx(cx);
185 let cx: SafeJSContext = cx.into();
186 if self.readable.is_errored() {
188 rooted!(in(*cx) let mut error = UndefinedValue());
189 self.readable.get_stored_error(error.handle_mut());
190 self.controller
191 .get_finish_promise()
192 .expect("finish promise is not set")
193 .reject_native(&error.handle(), can_gc);
194 } else {
195 rooted!(in(*cx) let mut reason = UndefinedValue());
198 reason.set(self.reason.get());
199 self.readable
200 .get_default_controller()
201 .error(reason.handle(), can_gc);
202
203 self.controller
205 .get_finish_promise()
206 .expect("finish promise is not set")
207 .resolve_native(&(), can_gc);
208 }
209 }
210}
211
// Opts this handler into `js::gc::Rootable`.
impl js::gc::Rootable for CancelPromiseRejection {}

/// Rejection handler attached to the cancel promise by
/// `transform_stream_default_sink_abort_algorithm`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct CancelPromiseRejection {
    /// Readable side of the stream; errored with the rejection value.
    readable: Dom<ReadableStream>,
    /// Controller whose finish promise is rejected by this handler.
    controller: Dom<TransformStreamDefaultController>,
}
222
223impl Callback for CancelPromiseRejection {
224 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
226 let can_gc = CanGc::from_cx(cx);
227 let cx: SafeJSContext = cx.into();
228 self.readable.get_default_controller().error(v, can_gc);
230
231 self.controller
233 .get_finish_promise()
234 .expect("finish promise is not set")
235 .reject(cx, v, can_gc);
236 }
237}
238
// Opts this handler into `js::gc::Rootable`.
impl js::gc::Rootable for SourceCancelPromiseFulfillment {}

/// Fulfillment handler attached to the cancel promise by
/// `transform_stream_default_source_cancel`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct SourceCancelPromiseFulfillment {
    /// Writable side of the stream; checked for an errored state and errored
    /// with `reason` (if needed) otherwise.
    writeable: Dom<WritableStream>,
    /// Controller whose finish promise is settled by this handler.
    controller: Dom<TransformStreamDefaultController>,
    /// The transform stream itself, used to unblock pending writes.
    stream: Dom<TransformStream>,
    /// The cancel reason captured when the handler was created.
    #[ignore_malloc_size_of = "mozjs"]
    reason: Box<Heap<JSVal>>,
}
252
253impl Callback for SourceCancelPromiseFulfillment {
254 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
256 let finish_promise = self
258 .controller
259 .get_finish_promise()
260 .expect("finish promise is not set");
261
262 let global = &self.writeable.global();
263 if self.writeable.is_errored() {
265 rooted!(&in(cx) let mut error = UndefinedValue());
266 self.writeable.get_stored_error(error.handle_mut());
267 finish_promise.reject(cx.into(), error.handle(), CanGc::from_cx(cx));
268 } else {
269 rooted!(&in(cx) let mut reason = UndefinedValue());
272 reason.set(self.reason.get());
273 self.writeable
274 .get_default_controller()
275 .error_if_needed(cx, reason.handle(), global);
276
277 self.stream.unblock_write(global, CanGc::from_cx(cx));
279
280 finish_promise.resolve_native(&(), CanGc::from_cx(cx));
282 }
283 }
284}
285
// Opts this handler into `js::gc::Rootable`.
impl js::gc::Rootable for SourceCancelPromiseRejection {}

/// Rejection handler attached to the cancel promise by
/// `transform_stream_default_source_cancel`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct SourceCancelPromiseRejection {
    /// Writable side of the stream; errored (if needed) with the rejection value.
    writeable: Dom<WritableStream>,
    /// Controller whose finish promise is rejected by this handler.
    controller: Dom<TransformStreamDefaultController>,
    /// The transform stream itself, used to unblock pending writes.
    stream: Dom<TransformStream>,
}
297
298impl Callback for SourceCancelPromiseRejection {
299 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
301 let global = &self.writeable.global();
303
304 self.writeable
305 .get_default_controller()
306 .error_if_needed(cx, v, global);
307
308 self.stream.unblock_write(global, CanGc::from_cx(cx));
310
311 self.controller
313 .get_finish_promise()
314 .expect("finish promise is not set")
315 .reject(cx.into(), v, CanGc::from_cx(cx));
316 }
317}
318
// Opts this handler into `js::gc::Rootable`.
impl js::gc::Rootable for FlushPromiseFulfillment {}

/// Fulfillment handler attached to the flush promise by
/// `transform_stream_default_sink_close_algorithm`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct FlushPromiseFulfillment {
    /// Readable side of the stream; checked for an errored state and closed otherwise.
    readable: Dom<ReadableStream>,
    /// Controller whose finish promise is settled by this handler.
    controller: Dom<TransformStreamDefaultController>,
}
329
330impl Callback for FlushPromiseFulfillment {
331 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
333 let can_gc = CanGc::from_cx(cx);
334 let cx: SafeJSContext = cx.into();
335 let finish_promise = self
337 .controller
338 .get_finish_promise()
339 .expect("finish promise is not set");
340
341 if self.readable.is_errored() {
343 rooted!(in(*cx) let mut error = UndefinedValue());
344 self.readable.get_stored_error(error.handle_mut());
345 finish_promise.reject(cx, error.handle(), can_gc);
346 } else {
347 self.readable.get_default_controller().close(can_gc);
350
351 finish_promise.resolve_native(&(), can_gc);
353 }
354 }
355}
356
// Opts this handler into `js::gc::Rootable`.
impl js::gc::Rootable for FlushPromiseRejection {}
/// Rejection handler attached to the flush promise by
/// `transform_stream_default_sink_close_algorithm`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct FlushPromiseRejection {
    /// Readable side of the stream; errored with the rejection value.
    readable: Dom<ReadableStream>,
    /// Controller whose finish promise is rejected by this handler.
    controller: Dom<TransformStreamDefaultController>,
}
367
368impl Callback for FlushPromiseRejection {
369 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
371 let can_gc = CanGc::from_cx(cx);
372 let cx: SafeJSContext = cx.into();
373 self.readable.get_default_controller().error(v, can_gc);
376
377 self.controller
379 .get_finish_promise()
380 .expect("finish promise is not set")
381 .reject(cx, v, can_gc);
382 }
383}
384
// Opts the enum into `js::gc::Rootable`.
impl js::gc::Rootable for CrossRealmTransform {}

/// Wraps either the readable-side or the writable-side cross-realm transform
/// state, whose types come from the sibling `readablestream` and
/// `writablestream` modules.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) enum CrossRealmTransform {
    /// Readable-side state.
    Readable(CrossRealmTransformReadable),
    /// Writable-side state.
    Writable(CrossRealmTransformWritable),
}
397
/// DOM object backing the `TransformStream` interface: a writable side and a
/// readable side, plus the controller and backpressure state connecting them.
#[dom_struct]
pub struct TransformStream {
    reflector_: Reflector,

    /// Current backpressure flag; only changed through `set_backpressure`.
    backpressure: Cell<bool>,

    /// Promise replaced on every `set_backpressure` call; the previous one, if
    /// any, is resolved at that point.
    #[conditional_malloc_size_of]
    backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,

    /// The stream's controller; cleared by `initialize` and installed by
    /// `set_up_transform_stream_default_controller`.
    controller: MutNullableDom<TransformStreamDefaultController>,

    /// Initialised to `false`; not otherwise read or written in the code
    /// visible in this file.
    detached: Cell<bool>,

    /// Readable side, set by `initialize` or `transfer_receive`.
    readable: MutNullableDom<ReadableStream>,

    /// Writable side, set by `initialize` or `transfer_receive`.
    writable: MutNullableDom<WritableStream>,
}
422
423impl TransformStream {
424 fn new_inherited() -> TransformStream {
426 TransformStream {
427 reflector_: Reflector::new(),
428 backpressure: Default::default(),
429 backpressure_change_promise: DomRefCell::new(None),
430 controller: MutNullableDom::new(None),
431 detached: Cell::new(false),
432 readable: MutNullableDom::new(None),
433 writable: MutNullableDom::new(None),
434 }
435 }
436
437 pub(crate) fn new_with_proto(
438 global: &GlobalScope,
439 proto: Option<SafeHandleObject>,
440 can_gc: CanGc,
441 ) -> DomRoot<TransformStream> {
442 reflect_dom_object_with_proto(
443 Box::new(TransformStream::new_inherited()),
444 global,
445 proto,
446 can_gc,
447 )
448 }
449
450 pub(crate) fn set_up(
453 &self,
454 cx: &mut js::context::JSContext,
455 global: &GlobalScope,
456 transformer_type: TransformerType,
457 ) -> Fallible<()> {
458 let writable_high_water_mark = 1.0;
460
461 let writable_size_algorithm =
463 extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
464
465 let readable_high_water_mark = 0.0;
467
468 let readable_size_algorithm =
470 extract_size_algorithm(&Default::default(), CanGc::from_cx(cx));
471
472 let start_promise = Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx));
479
480 self.initialize(
484 cx,
485 global,
486 start_promise,
487 writable_high_water_mark,
488 writable_size_algorithm,
489 readable_high_water_mark,
490 readable_size_algorithm,
491 )?;
492
493 let controller =
495 TransformStreamDefaultController::new(global, transformer_type, CanGc::from_cx(cx));
496
497 self.set_up_transform_stream_default_controller(&controller);
501
502 Ok(())
503 }
504
505 pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
506 self.controller.get().expect("controller is not set")
507 }
508
509 pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
510 self.writable.get().expect("writable stream is not set")
511 }
512
513 pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
514 self.readable.get().expect("readable stream is not set")
515 }
516
517 pub(crate) fn get_backpressure(&self) -> bool {
518 self.backpressure.get()
519 }
520
521 #[expect(clippy::too_many_arguments)]
523 fn initialize(
524 &self,
525 cx: &mut js::context::JSContext,
526 global: &GlobalScope,
527 start_promise: Rc<Promise>,
528 writable_high_water_mark: f64,
529 writable_size_algorithm: Rc<QueuingStrategySize>,
530 readable_high_water_mark: f64,
531 readable_size_algorithm: Rc<QueuingStrategySize>,
532 ) -> Fallible<()> {
533 let writable = create_writable_stream(
545 cx,
546 global,
547 writable_high_water_mark,
548 writable_size_algorithm,
549 UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
550 )?;
551 self.writable.set(Some(&writable));
552
553 let readable = create_readable_stream(
565 cx,
566 global,
567 UnderlyingSourceType::Transform(Dom::from_ref(self), start_promise),
568 Some(readable_size_algorithm),
569 Some(readable_high_water_mark),
570 );
571 self.readable.set(Some(&readable));
572
573 self.set_backpressure(global, true, CanGc::from_cx(cx));
578
579 self.controller.set(None);
581
582 Ok(())
583 }
584
585 pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
587 assert!(self.backpressure.get() != backpressure);
589
590 if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
593 promise.resolve_native(&(), can_gc);
594 }
595
596 *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
598
599 self.backpressure.set(backpressure);
601 }
602
    /// Links `controller` and this stream to each other.
    ///
    /// # Panics
    /// Panics if the stream already has a controller.
    fn set_up_transform_stream_default_controller(
        &self,
        controller: &TransformStreamDefaultController,
    ) {
        // A stream may only be given a controller once.
        assert!(self.controller.get().is_none());

        // Point the controller back at this stream...
        controller.set_stream(self);

        // ...and record it as the stream's controller.
        self.controller.set(Some(controller));

    }
625
626 fn set_up_transform_stream_default_controller_from_transformer(
628 &self,
629 global: &GlobalScope,
630 transformer_obj: SafeHandleObject,
631 transformer: &Transformer,
632 can_gc: CanGc,
633 ) {
634 let transformer_type = TransformerType::new_from_js_transformer(transformer);
636 let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
637
638 controller.set_transform_obj(transformer_obj);
661
662 self.set_up_transform_stream_default_controller(&controller);
665 }
666
667 pub(crate) fn transform_stream_default_sink_write_algorithm(
669 &self,
670 cx: &mut js::context::JSContext,
671 global: &GlobalScope,
672 chunk: SafeHandleValue,
673 ) -> Fallible<Rc<Promise>> {
674 assert!(self.writable.get().is_some());
676
677 let controller = self.controller.get().expect("controller is not set");
679
680 if self.backpressure.get() {
682 let backpressure_change_promise = self.backpressure_change_promise.borrow();
684
685 assert!(backpressure_change_promise.is_some());
687
688 let result_promise = Promise::new2(cx, global);
690 rooted!(&in(cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
691 controller: Dom::from_ref(&controller),
692 writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
693 chunk: Heap::boxed(chunk.get()),
694 result_promise: result_promise.clone(),
695 }));
696
697 let handler = PromiseNativeHandler::new(
698 global,
699 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
700 Some(Box::new(BackpressureChangeRejection {
701 result_promise: result_promise.clone(),
702 })),
703 CanGc::from_cx(cx),
704 );
705 let mut realm = enter_auto_realm(cx, global);
706 let realm = &mut realm.current_realm();
707 let in_realm_proof = realm.into();
708 let comp = InRealm::Already(&in_realm_proof);
709 backpressure_change_promise
710 .as_ref()
711 .expect("Promise must be some by now.")
712 .append_native_handler(&handler, comp, CanGc::from_cx(realm));
713
714 return Ok(result_promise);
715 }
716
717 controller.transform_stream_default_controller_perform_transform(cx, global, chunk)
719 }
720
721 pub(crate) fn transform_stream_default_sink_abort_algorithm(
723 &self,
724 cx: SafeJSContext,
725 global: &GlobalScope,
726 reason: SafeHandleValue,
727 can_gc: CanGc,
728 ) -> Fallible<Rc<Promise>> {
729 let controller = self.controller.get().expect("controller is not set");
731
732 if let Some(finish_promise) = controller.get_finish_promise() {
734 return Ok(finish_promise);
735 }
736
737 let readable = self.readable.get().expect("readable stream is not set");
739
740 controller.set_finish_promise(Promise::new(global, can_gc));
742
743 let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
745
746 controller.clear_algorithms();
748
749 let handler = PromiseNativeHandler::new(
751 global,
752 Some(Box::new(CancelPromiseFulfillment {
753 readable: Dom::from_ref(&readable),
754 controller: Dom::from_ref(&controller),
755 reason: Heap::boxed(reason.get()),
756 })),
757 Some(Box::new(CancelPromiseRejection {
758 readable: Dom::from_ref(&readable),
759 controller: Dom::from_ref(&controller),
760 })),
761 can_gc,
762 );
763 let realm = enter_realm(global);
764 let comp = InRealm::Entered(&realm);
765 cancel_promise.append_native_handler(&handler, comp, can_gc);
766
767 let finish_promise = controller
769 .get_finish_promise()
770 .expect("finish promise is not set");
771 Ok(finish_promise)
772 }
773
774 pub(crate) fn transform_stream_default_sink_close_algorithm(
776 &self,
777 cx: &mut js::context::JSContext,
778 global: &GlobalScope,
779 ) -> Fallible<Rc<Promise>> {
780 let controller = self
782 .controller
783 .get()
784 .ok_or(Error::Type(c"controller is not set".to_owned()))?;
785
786 if let Some(finish_promise) = controller.get_finish_promise() {
788 return Ok(finish_promise);
789 }
790
791 let readable = self
793 .readable
794 .get()
795 .ok_or(Error::Type(c"readable stream is not set".to_owned()))?;
796
797 controller.set_finish_promise(Promise::new2(cx, global));
799
800 let flush_promise = controller.perform_flush(cx, global)?;
802
803 controller.clear_algorithms();
805
806 let handler = PromiseNativeHandler::new(
808 global,
809 Some(Box::new(FlushPromiseFulfillment {
810 readable: Dom::from_ref(&readable),
811 controller: Dom::from_ref(&controller),
812 })),
813 Some(Box::new(FlushPromiseRejection {
814 readable: Dom::from_ref(&readable),
815 controller: Dom::from_ref(&controller),
816 })),
817 CanGc::from_cx(cx),
818 );
819
820 let mut realm = enter_auto_realm(cx, global);
821 let realm = &mut realm.current_realm();
822 let in_realm_proof = realm.into();
823 let comp = InRealm::Already(&in_realm_proof);
824 flush_promise.append_native_handler(&handler, comp, CanGc::from_cx(realm));
825 let finish_promise = controller
827 .get_finish_promise()
828 .expect("finish promise is not set");
829 Ok(finish_promise)
830 }
831
832 pub(crate) fn transform_stream_default_source_cancel(
834 &self,
835 cx: SafeJSContext,
836 global: &GlobalScope,
837 reason: SafeHandleValue,
838 can_gc: CanGc,
839 ) -> Fallible<Rc<Promise>> {
840 let controller = self
842 .controller
843 .get()
844 .ok_or(Error::Type(c"controller is not set".to_owned()))?;
845
846 if let Some(finish_promise) = controller.get_finish_promise() {
848 return Ok(finish_promise);
849 }
850
851 let writable = self
853 .writable
854 .get()
855 .ok_or(Error::Type(c"writable stream is not set".to_owned()))?;
856
857 controller.set_finish_promise(Promise::new(global, can_gc));
859
860 let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
862
863 controller.clear_algorithms();
865
866 let handler = PromiseNativeHandler::new(
868 global,
869 Some(Box::new(SourceCancelPromiseFulfillment {
870 writeable: Dom::from_ref(&writable),
871 controller: Dom::from_ref(&controller),
872 stream: Dom::from_ref(self),
873 reason: Heap::boxed(reason.get()),
874 })),
875 Some(Box::new(SourceCancelPromiseRejection {
876 writeable: Dom::from_ref(&writable),
877 controller: Dom::from_ref(&controller),
878 stream: Dom::from_ref(self),
879 })),
880 can_gc,
881 );
882
883 let finish_promise = controller
885 .get_finish_promise()
886 .expect("finish promise is not set");
887 let realm = enter_realm(global);
888 let comp = InRealm::Entered(&realm);
889 cancel_promise.append_native_handler(&handler, comp, can_gc);
890 Ok(finish_promise)
891 }
892
893 pub(crate) fn transform_stream_default_source_pull(
895 &self,
896 global: &GlobalScope,
897 can_gc: CanGc,
898 ) -> Fallible<Rc<Promise>> {
899 assert!(self.backpressure.get());
901
902 assert!(self.backpressure_change_promise.borrow().is_some());
904
905 self.set_backpressure(global, false, can_gc);
907
908 Ok(self
910 .backpressure_change_promise
911 .borrow()
912 .clone()
913 .expect("Promise must be some by now."))
914 }
915
916 pub(crate) fn error_writable_and_unblock_write(
918 &self,
919 cx: &mut js::context::JSContext,
920 global: &GlobalScope,
921 error: SafeHandleValue,
922 ) {
923 self.get_controller().clear_algorithms();
925
926 self.get_writable()
928 .get_default_controller()
929 .error_if_needed(cx, error, global);
930
931 self.unblock_write(global, CanGc::from_cx(cx))
933 }
934
935 pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
937 if self.backpressure.get() {
939 self.set_backpressure(global, false, can_gc);
940 }
941 }
942
943 pub(crate) fn error(
945 &self,
946 cx: &mut js::context::JSContext,
947 global: &GlobalScope,
948 error: SafeHandleValue,
949 ) {
950 self.get_readable()
952 .get_default_controller()
953 .error(error, CanGc::from_cx(cx));
954
955 self.error_writable_and_unblock_write(cx, global, error);
957 }
958}
959
960impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
    /// Constructor for `TransformStream`.
    ///
    /// Converts the optional `transformer` object into a `Transformer`
    /// dictionary, rejects transformers that set `readableType` or
    /// `writableType`, extracts both queuing strategies, initialises the
    /// stream and its controller, and finally runs the transformer's `start`
    /// callback (if any) to settle the start promise.
    ///
    /// # Errors
    /// - `Error::Type` / `Error::JSFailed` if the transformer cannot be converted.
    /// - `Error::Range` if `readableType` or `writableType` is not undefined.
    /// - Any error from extracting the high water marks, from `initialize`,
    ///   or rethrown from the `start` callback.
    #[expect(unsafe_code)]
    fn Constructor(
        cx: &mut js::context::JSContext,
        global: &GlobalScope,
        proto: Option<SafeHandleObject>,
        transformer: Option<*mut JSObject>,
        writable_strategy: &QueuingStrategy,
        readable_strategy: &QueuingStrategy,
    ) -> Fallible<DomRoot<TransformStream>> {
        // A missing transformer is represented by a null object.
        rooted!(&in(cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));

        // Convert the JS object into a dictionary, or use an empty one when absent.
        let transformer_dict = if !transformer_obj.is_null() {
            rooted!(&in(cx) let obj_val = ObjectValue(transformer_obj.get()));
            match Transformer::new(cx.into(), obj_val.handle(), CanGc::from_cx(cx)) {
                Ok(ConversionResult::Success(val)) => val,
                Ok(ConversionResult::Failure(error)) => {
                    return Err(Error::Type(error.into_owned()));
                },
                _ => {
                    return Err(Error::JSFailed);
                },
            }
        } else {
            Transformer::empty()
        };

        // Neither `readableType` nor `writableType` may be set.
        if !transformer_dict.readableType.handle().is_undefined() {
            return Err(Error::Range(c"readableType is set".to_owned()));
        }

        if !transformer_dict.writableType.handle().is_undefined() {
            return Err(Error::Range(c"writableType is set".to_owned()));
        }

        // Readable side defaults to a high water mark of 0, writable side to 1.
        let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;

        let readable_size_algorithm = extract_size_algorithm(readable_strategy, CanGc::from_cx(cx));

        let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;

        let writable_size_algorithm = extract_size_algorithm(writable_strategy, CanGc::from_cx(cx));

        // Pending promise shared by both sides; settled at the end of this function.
        let start_promise = Promise::new2(cx, global);

        let stream = TransformStream::new_with_proto(global, proto, CanGc::from_cx(cx));
        stream.initialize(
            cx,
            global,
            start_promise.clone(),
            writable_high_water_mark,
            writable_size_algorithm,
            readable_high_water_mark,
            readable_size_algorithm,
        )?;

        stream.set_up_transform_stream_default_controller_from_transformer(
            global,
            transformer_obj.handle(),
            &transformer_dict,
            CanGc::from_cx(cx),
        );

        if let Some(start) = &transformer_dict.start {
            rooted!(&in(cx) let mut result_object = ptr::null_mut::<JSObject>());
            rooted!(&in(cx) let mut result: JSVal);
            rooted!(&in(cx) let this_object = transformer_obj.get());
            // Invoke `start` with the transformer as `this` and the controller
            // as argument; a thrown exception is propagated to the caller.
            start.Call_(
                &this_object.handle(),
                &stream.get_controller(),
                result.handle_mut(),
                ExceptionHandling::Rethrow,
                CanGc::from_cx(cx),
            )?;
            // SAFETY (review note): `to_object` is only reached after
            // `is_object()` returned true, and the object is stored in the
            // rooted `result_object` before being passed to `IsPromiseObject`.
            let is_promise = unsafe {
                if result.is_object() {
                    result_object.set(result.to_object());
                    IsPromiseObject(result_object.handle().into_handle())
                } else {
                    false
                }
            };
            // Wrap an existing JS promise, or create one resolved with the result.
            let promise = if is_promise {
                Promise::new_with_js_promise(result_object.handle(), cx.into())
            } else {
                Promise::new_resolved(global, cx.into(), result.get(), CanGc::from_cx(cx))
            };
            start_promise.resolve_native(&promise, CanGc::from_cx(cx));
        } else {
            // No `start` callback: the start promise resolves immediately.
            start_promise.resolve_native(&(), CanGc::from_cx(cx));
        };

        Ok(stream)
    }
1072
1073 fn Readable(&self) -> DomRoot<ReadableStream> {
1075 self.readable.get().expect("readable stream is not set")
1077 }
1078
1079 fn Writable(&self) -> DomRoot<WritableStream> {
1081 self.writable.get().expect("writable stream is not set")
1083 }
1084}
1085
1086impl Transferable for TransformStream {
    /// Index type; the same one used for message ports.
    type Index = MessagePortIndex;
    /// Serialized payload: the transferred data of the two message ports
    /// created in `transfer`.
    type Data = TransformStreamData;
1090
1091 fn transfer(
1093 &self,
1094 cx: &mut js::context::JSContext,
1095 ) -> Fallible<(MessagePortId, TransformStreamData)> {
1096 let global = self.global();
1097 let mut realm = enter_auto_realm(cx, &*global);
1098 let mut realm = realm.current_realm();
1099 let cx = &mut realm;
1100
1101 let readable = self.get_readable();
1103
1104 let writable = self.get_writable();
1106
1107 if readable.is_locked() || writable.is_locked() {
1112 return Err(Error::DataClone(None));
1113 }
1114
1115 let port1 = MessagePort::new(&global, CanGc::from_cx(cx));
1117 global.track_message_port(&port1, None);
1118 let port1_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1119 global.track_message_port(&port1_peer, None);
1120 global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1121
1122 let proxy_readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1123 proxy_readable.setup_cross_realm_transform_readable(cx, &port1);
1124 proxy_readable
1125 .pipe_to(cx, &global, &writable, false, false, false, None)
1126 .set_promise_is_handled();
1127
1128 let port2 = MessagePort::new(&global, CanGc::from_cx(cx));
1130 global.track_message_port(&port2, None);
1131 let port2_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1132 global.track_message_port(&port2_peer, None);
1133 global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1134
1135 let proxy_writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1136 proxy_writable.setup_cross_realm_transform_writable(cx, &port2);
1137
1138 readable
1140 .pipe_to(cx, &global, &proxy_writable, false, false, false, None)
1141 .set_promise_is_handled();
1142
1143 Ok((
1148 *port1_peer.message_port_id(),
1149 TransformStreamData {
1150 readable: port1_peer.transfer(cx)?,
1151 writable: port2_peer.transfer(cx)?,
1152 },
1153 ))
1154 }
1155
1156 fn transfer_receive(
1158 cx: &mut js::context::JSContext,
1159 owner: &GlobalScope,
1160 _id: MessagePortId,
1161 data: TransformStreamData,
1162 ) -> Result<DomRoot<Self>, ()> {
1163 let port1 = MessagePort::transfer_receive(cx, owner, data.readable.0, data.readable.1)?;
1164 let port2 = MessagePort::transfer_receive(cx, owner, data.writable.0, data.writable.1)?;
1165
1166 let proxy_readable = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1170 proxy_readable.setup_cross_realm_transform_readable(cx, &port2);
1171
1172 let proxy_writable = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1176 proxy_writable.setup_cross_realm_transform_writable(cx, &port1);
1177
1178 let stream = TransformStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1184 stream.readable.set(Some(&proxy_readable));
1185 stream.writable.set(Some(&proxy_writable));
1186
1187 Ok(stream)
1188 }
1189
1190 fn serialized_storage<'a>(
1191 data: StructuredData<'a, '_>,
1192 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1193 match data {
1194 StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1195 StructuredData::Writer(w) => &mut w.transform_streams_port,
1196 }
1197 }
1198}