1use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use dom_struct::dom_struct;
10use js::jsapi::{Heap, IsPromiseObject, JSObject};
11use js::jsval::{JSVal, ObjectValue, UndefinedValue};
12use js::realm::CurrentRealm;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
14use rustc_hash::FxHashMap;
15use script_bindings::callback::ExceptionHandling;
16use script_bindings::realms::InRealm;
17use servo_base::id::{MessagePortId, MessagePortIndex};
18use servo_constellation_traits::TransformStreamData;
19
20use super::readablestream::CrossRealmTransformReadable;
21use super::writablestream::CrossRealmTransformWritable;
22use crate::dom::bindings::cell::DomRefCell;
23use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
24 QueuingStrategy, QueuingStrategySize,
25};
26use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
27use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
28use crate::dom::bindings::conversions::ConversionResult;
29use crate::dom::bindings::error::{Error, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::structuredclone::StructuredData;
33use crate::dom::bindings::transferable::Transferable;
34use crate::dom::globalscope::GlobalScope;
35use crate::dom::messageport::MessagePort;
36use crate::dom::promise::Promise;
37use crate::dom::promisenativehandler::Callback;
38use crate::dom::readablestream::{ReadableStream, create_readable_stream};
39use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
40use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
41use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
42use crate::dom::stream::writablestream::create_writable_stream;
43use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
44use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
45use crate::realms::{enter_auto_realm, enter_realm};
46use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
47
48impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}
49
50#[derive(JSTraceable, MallocSizeOf)]
53#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
54struct TransformBackPressureChangePromiseFulfillment {
55 #[conditional_malloc_size_of]
57 result_promise: Rc<Promise>,
58
59 #[ignore_malloc_size_of = "mozjs"]
60 chunk: Box<Heap<JSVal>>,
61
62 writable: Dom<WritableStream>,
64
65 controller: Dom<TransformStreamDefaultController>,
66}
67
68impl Callback for TransformBackPressureChangePromiseFulfillment {
69 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
71 if self.writable.is_erroring() {
75 rooted!(&in(cx) let mut error = UndefinedValue());
76 self.writable.get_stored_error(error.handle_mut());
77 self.result_promise
78 .reject(cx.into(), error.handle(), CanGc::from_cx(cx));
79 return;
80 }
81
82 assert!(self.writable.is_writable());
84
85 rooted!(&in(cx) let mut chunk = UndefinedValue());
87 chunk.set(self.chunk.get());
88 let transform_result = self
89 .controller
90 .transform_stream_default_controller_perform_transform(
91 cx,
92 &self.writable.global(),
93 chunk.handle(),
94 )
95 .expect("perform transform failed");
96
97 let handler = PromiseNativeHandler::new(
100 &self.writable.global(),
101 Some(Box::new(PerformTransformFulfillment {
102 result_promise: self.result_promise.clone(),
103 })),
104 Some(Box::new(PerformTransformRejection {
105 result_promise: self.result_promise.clone(),
106 })),
107 CanGc::from_cx(cx),
108 );
109
110 let mut realm = enter_auto_realm(cx, &*self.writable.global());
111 let realm = &mut realm.current_realm();
112 let in_realm_proof = realm.into();
113 let comp = InRealm::Already(&in_realm_proof);
114 transform_result.append_native_handler(&handler, comp, CanGc::from_cx(realm));
115 }
116}
117
118#[derive(JSTraceable, MallocSizeOf)]
119#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
120struct PerformTransformFulfillment {
123 #[conditional_malloc_size_of]
124 result_promise: Rc<Promise>,
125}
126
127impl Callback for PerformTransformFulfillment {
128 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
129 let can_gc = CanGc::from_cx(cx);
130 self.result_promise.resolve_native(&(), can_gc);
132 }
133}
134
135#[derive(JSTraceable, MallocSizeOf)]
136#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
137struct PerformTransformRejection {
140 #[conditional_malloc_size_of]
141 result_promise: Rc<Promise>,
142}
143
144impl Callback for PerformTransformRejection {
145 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
146 let can_gc = CanGc::from_cx(cx);
147 self.result_promise.reject(cx.into(), v, can_gc);
149 }
150}
151
152#[derive(JSTraceable, MallocSizeOf)]
153#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
154struct BackpressureChangeRejection {
157 #[conditional_malloc_size_of]
158 result_promise: Rc<Promise>,
159}
160
161impl Callback for BackpressureChangeRejection {
162 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
163 let can_gc = CanGc::from_cx(cx);
164 self.result_promise.reject(cx.into(), reason, can_gc);
165 }
166}
167
168impl js::gc::Rootable for CancelPromiseFulfillment {}
169
170#[derive(JSTraceable, MallocSizeOf)]
173#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
174struct CancelPromiseFulfillment {
175 readable: Dom<ReadableStream>,
176 controller: Dom<TransformStreamDefaultController>,
177 #[ignore_malloc_size_of = "mozjs"]
178 reason: Box<Heap<JSVal>>,
179}
180
181impl Callback for CancelPromiseFulfillment {
182 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
184 let can_gc = CanGc::from_cx(cx);
185 let cx: SafeJSContext = cx.into();
186 if self.readable.is_errored() {
188 rooted!(in(*cx) let mut error = UndefinedValue());
189 self.readable.get_stored_error(error.handle_mut());
190 self.controller
191 .get_finish_promise()
192 .expect("finish promise is not set")
193 .reject_native(&error.handle(), can_gc);
194 } else {
195 rooted!(in(*cx) let mut reason = UndefinedValue());
198 reason.set(self.reason.get());
199 self.readable
200 .get_default_controller()
201 .error(reason.handle(), can_gc);
202
203 self.controller
205 .get_finish_promise()
206 .expect("finish promise is not set")
207 .resolve_native(&(), can_gc);
208 }
209 }
210}
211
212impl js::gc::Rootable for CancelPromiseRejection {}
213
214#[derive(JSTraceable, MallocSizeOf)]
217#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
218struct CancelPromiseRejection {
219 readable: Dom<ReadableStream>,
220 controller: Dom<TransformStreamDefaultController>,
221}
222
223impl Callback for CancelPromiseRejection {
224 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
226 let can_gc = CanGc::from_cx(cx);
227 let cx: SafeJSContext = cx.into();
228 self.readable.get_default_controller().error(v, can_gc);
230
231 self.controller
233 .get_finish_promise()
234 .expect("finish promise is not set")
235 .reject(cx, v, can_gc);
236 }
237}
238
239impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
240
241#[derive(JSTraceable, MallocSizeOf)]
244#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
245struct SourceCancelPromiseFulfillment {
246 writeable: Dom<WritableStream>,
247 controller: Dom<TransformStreamDefaultController>,
248 stream: Dom<TransformStream>,
249 #[ignore_malloc_size_of = "mozjs"]
250 reason: Box<Heap<JSVal>>,
251}
252
253impl Callback for SourceCancelPromiseFulfillment {
254 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
256 let finish_promise = self
258 .controller
259 .get_finish_promise()
260 .expect("finish promise is not set");
261
262 let global = &self.writeable.global();
263 if self.writeable.is_errored() {
265 rooted!(&in(cx) let mut error = UndefinedValue());
266 self.writeable.get_stored_error(error.handle_mut());
267 finish_promise.reject(cx.into(), error.handle(), CanGc::from_cx(cx));
268 } else {
269 rooted!(&in(cx) let mut reason = UndefinedValue());
272 reason.set(self.reason.get());
273 self.writeable
274 .get_default_controller()
275 .error_if_needed(cx, reason.handle(), global);
276
277 self.stream.unblock_write(global, CanGc::from_cx(cx));
279
280 finish_promise.resolve_native(&(), CanGc::from_cx(cx));
282 }
283 }
284}
285
286impl js::gc::Rootable for SourceCancelPromiseRejection {}
287
288#[derive(JSTraceable, MallocSizeOf)]
291#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
292struct SourceCancelPromiseRejection {
293 writeable: Dom<WritableStream>,
294 controller: Dom<TransformStreamDefaultController>,
295 stream: Dom<TransformStream>,
296}
297
298impl Callback for SourceCancelPromiseRejection {
299 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
301 let global = &self.writeable.global();
303
304 self.writeable
305 .get_default_controller()
306 .error_if_needed(cx, v, global);
307
308 self.stream.unblock_write(global, CanGc::from_cx(cx));
310
311 self.controller
313 .get_finish_promise()
314 .expect("finish promise is not set")
315 .reject(cx.into(), v, CanGc::from_cx(cx));
316 }
317}
318
319impl js::gc::Rootable for FlushPromiseFulfillment {}
320
321#[derive(JSTraceable, MallocSizeOf)]
324#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
325struct FlushPromiseFulfillment {
326 readable: Dom<ReadableStream>,
327 controller: Dom<TransformStreamDefaultController>,
328}
329
330impl Callback for FlushPromiseFulfillment {
331 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
333 let can_gc = CanGc::from_cx(cx);
334 let cx: SafeJSContext = cx.into();
335 let finish_promise = self
337 .controller
338 .get_finish_promise()
339 .expect("finish promise is not set");
340
341 if self.readable.is_errored() {
343 rooted!(in(*cx) let mut error = UndefinedValue());
344 self.readable.get_stored_error(error.handle_mut());
345 finish_promise.reject(cx, error.handle(), can_gc);
346 } else {
347 self.readable.get_default_controller().close(can_gc);
350
351 finish_promise.resolve_native(&(), can_gc);
353 }
354 }
355}
356
357impl js::gc::Rootable for FlushPromiseRejection {}
358#[derive(JSTraceable, MallocSizeOf)]
362#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
363struct FlushPromiseRejection {
364 readable: Dom<ReadableStream>,
365 controller: Dom<TransformStreamDefaultController>,
366}
367
368impl Callback for FlushPromiseRejection {
369 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
371 let can_gc = CanGc::from_cx(cx);
372 let cx: SafeJSContext = cx.into();
373 self.readable.get_default_controller().error(v, can_gc);
376
377 self.controller
379 .get_finish_promise()
380 .expect("finish promise is not set")
381 .reject(cx, v, can_gc);
382 }
383}
384
385impl js::gc::Rootable for CrossRealmTransform {}
386
387#[derive(Clone, JSTraceable, MallocSizeOf)]
390#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
391pub(crate) enum CrossRealmTransform {
392 Readable(CrossRealmTransformReadable),
394 Writable(CrossRealmTransformWritable),
396}
397
398#[dom_struct]
400pub struct TransformStream {
401 reflector_: Reflector,
402
403 backpressure: Cell<bool>,
405
406 #[conditional_malloc_size_of]
408 backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,
409
410 controller: MutNullableDom<TransformStreamDefaultController>,
412
413 detached: Cell<bool>,
415
416 readable: MutNullableDom<ReadableStream>,
418
419 writable: MutNullableDom<WritableStream>,
421}
422
423impl TransformStream {
424 fn new_inherited() -> TransformStream {
426 TransformStream {
427 reflector_: Reflector::new(),
428 backpressure: Default::default(),
429 backpressure_change_promise: DomRefCell::new(None),
430 controller: MutNullableDom::new(None),
431 detached: Cell::new(false),
432 readable: MutNullableDom::new(None),
433 writable: MutNullableDom::new(None),
434 }
435 }
436
437 pub(crate) fn new_with_proto(
438 global: &GlobalScope,
439 proto: Option<SafeHandleObject>,
440 can_gc: CanGc,
441 ) -> DomRoot<TransformStream> {
442 reflect_dom_object_with_proto(
443 Box::new(TransformStream::new_inherited()),
444 global,
445 proto,
446 can_gc,
447 )
448 }
449
450 pub(crate) fn set_up(
453 &self,
454 cx: SafeJSContext,
455 global: &GlobalScope,
456 transformer_type: TransformerType,
457 can_gc: CanGc,
458 ) -> Fallible<()> {
459 let writable_high_water_mark = 1.0;
461
462 let writable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
464
465 let readable_high_water_mark = 0.0;
467
468 let readable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
470
471 let start_promise = Promise::new_resolved(global, cx, (), can_gc);
478
479 self.initialize(
483 cx,
484 global,
485 start_promise,
486 writable_high_water_mark,
487 writable_size_algorithm,
488 readable_high_water_mark,
489 readable_size_algorithm,
490 can_gc,
491 )?;
492
493 let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
495
496 self.set_up_transform_stream_default_controller(&controller);
500
501 Ok(())
502 }
503
504 pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
505 self.controller.get().expect("controller is not set")
506 }
507
508 pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
509 self.writable.get().expect("writable stream is not set")
510 }
511
512 pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
513 self.readable.get().expect("readable stream is not set")
514 }
515
516 pub(crate) fn get_backpressure(&self) -> bool {
517 self.backpressure.get()
518 }
519
520 #[allow(clippy::too_many_arguments)]
522 fn initialize(
523 &self,
524 cx: SafeJSContext,
525 global: &GlobalScope,
526 start_promise: Rc<Promise>,
527 writable_high_water_mark: f64,
528 writable_size_algorithm: Rc<QueuingStrategySize>,
529 readable_high_water_mark: f64,
530 readable_size_algorithm: Rc<QueuingStrategySize>,
531 can_gc: CanGc,
532 ) -> Fallible<()> {
533 let writable = create_writable_stream(
545 cx,
546 global,
547 writable_high_water_mark,
548 writable_size_algorithm,
549 UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
550 can_gc,
551 )?;
552 self.writable.set(Some(&writable));
553
554 let readable = create_readable_stream(
566 global,
567 UnderlyingSourceType::Transform(Dom::from_ref(self), start_promise),
568 Some(readable_size_algorithm),
569 Some(readable_high_water_mark),
570 can_gc,
571 );
572 self.readable.set(Some(&readable));
573
574 self.set_backpressure(global, true, can_gc);
579
580 self.controller.set(None);
582
583 Ok(())
584 }
585
586 pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
588 assert!(self.backpressure.get() != backpressure);
590
591 if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
594 promise.resolve_native(&(), can_gc);
595 }
596
597 *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
599
600 self.backpressure.set(backpressure);
602 }
603
604 fn set_up_transform_stream_default_controller(
606 &self,
607 controller: &TransformStreamDefaultController,
608 ) {
609 assert!(self.controller.get().is_none());
614
615 controller.set_stream(self);
617
618 self.controller.set(Some(controller));
620
621 }
626
627 fn set_up_transform_stream_default_controller_from_transformer(
629 &self,
630 global: &GlobalScope,
631 transformer_obj: SafeHandleObject,
632 transformer: &Transformer,
633 can_gc: CanGc,
634 ) {
635 let transformer_type = TransformerType::new_from_js_transformer(transformer);
637 let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
638
639 controller.set_transform_obj(transformer_obj);
662
663 self.set_up_transform_stream_default_controller(&controller);
666 }
667
668 pub(crate) fn transform_stream_default_sink_write_algorithm(
670 &self,
671 cx: &mut js::context::JSContext,
672 global: &GlobalScope,
673 chunk: SafeHandleValue,
674 ) -> Fallible<Rc<Promise>> {
675 assert!(self.writable.get().is_some());
677
678 let controller = self.controller.get().expect("controller is not set");
680
681 if self.backpressure.get() {
683 let backpressure_change_promise = self.backpressure_change_promise.borrow();
685
686 assert!(backpressure_change_promise.is_some());
688
689 let result_promise = Promise::new2(cx, global);
691 rooted!(&in(cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
692 controller: Dom::from_ref(&controller),
693 writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
694 chunk: Heap::boxed(chunk.get()),
695 result_promise: result_promise.clone(),
696 }));
697
698 let handler = PromiseNativeHandler::new(
699 global,
700 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
701 Some(Box::new(BackpressureChangeRejection {
702 result_promise: result_promise.clone(),
703 })),
704 CanGc::from_cx(cx),
705 );
706 let mut realm = enter_auto_realm(cx, global);
707 let realm = &mut realm.current_realm();
708 let in_realm_proof = realm.into();
709 let comp = InRealm::Already(&in_realm_proof);
710 backpressure_change_promise
711 .as_ref()
712 .expect("Promise must be some by now.")
713 .append_native_handler(&handler, comp, CanGc::from_cx(realm));
714
715 return Ok(result_promise);
716 }
717
718 controller.transform_stream_default_controller_perform_transform(cx, global, chunk)
720 }
721
722 pub(crate) fn transform_stream_default_sink_abort_algorithm(
724 &self,
725 cx: SafeJSContext,
726 global: &GlobalScope,
727 reason: SafeHandleValue,
728 can_gc: CanGc,
729 ) -> Fallible<Rc<Promise>> {
730 let controller = self.controller.get().expect("controller is not set");
732
733 if let Some(finish_promise) = controller.get_finish_promise() {
735 return Ok(finish_promise);
736 }
737
738 let readable = self.readable.get().expect("readable stream is not set");
740
741 controller.set_finish_promise(Promise::new(global, can_gc));
743
744 let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
746
747 controller.clear_algorithms();
749
750 let handler = PromiseNativeHandler::new(
752 global,
753 Some(Box::new(CancelPromiseFulfillment {
754 readable: Dom::from_ref(&readable),
755 controller: Dom::from_ref(&controller),
756 reason: Heap::boxed(reason.get()),
757 })),
758 Some(Box::new(CancelPromiseRejection {
759 readable: Dom::from_ref(&readable),
760 controller: Dom::from_ref(&controller),
761 })),
762 can_gc,
763 );
764 let realm = enter_realm(global);
765 let comp = InRealm::Entered(&realm);
766 cancel_promise.append_native_handler(&handler, comp, can_gc);
767
768 let finish_promise = controller
770 .get_finish_promise()
771 .expect("finish promise is not set");
772 Ok(finish_promise)
773 }
774
775 pub(crate) fn transform_stream_default_sink_close_algorithm(
777 &self,
778 cx: &mut js::context::JSContext,
779 global: &GlobalScope,
780 ) -> Fallible<Rc<Promise>> {
781 let controller = self
783 .controller
784 .get()
785 .ok_or(Error::Type(c"controller is not set".to_owned()))?;
786
787 if let Some(finish_promise) = controller.get_finish_promise() {
789 return Ok(finish_promise);
790 }
791
792 let readable = self
794 .readable
795 .get()
796 .ok_or(Error::Type(c"readable stream is not set".to_owned()))?;
797
798 controller.set_finish_promise(Promise::new2(cx, global));
800
801 let flush_promise = controller.perform_flush(cx, global)?;
803
804 controller.clear_algorithms();
806
807 let handler = PromiseNativeHandler::new(
809 global,
810 Some(Box::new(FlushPromiseFulfillment {
811 readable: Dom::from_ref(&readable),
812 controller: Dom::from_ref(&controller),
813 })),
814 Some(Box::new(FlushPromiseRejection {
815 readable: Dom::from_ref(&readable),
816 controller: Dom::from_ref(&controller),
817 })),
818 CanGc::from_cx(cx),
819 );
820
821 let mut realm = enter_auto_realm(cx, global);
822 let realm = &mut realm.current_realm();
823 let in_realm_proof = realm.into();
824 let comp = InRealm::Already(&in_realm_proof);
825 flush_promise.append_native_handler(&handler, comp, CanGc::from_cx(realm));
826 let finish_promise = controller
828 .get_finish_promise()
829 .expect("finish promise is not set");
830 Ok(finish_promise)
831 }
832
833 pub(crate) fn transform_stream_default_source_cancel(
835 &self,
836 cx: SafeJSContext,
837 global: &GlobalScope,
838 reason: SafeHandleValue,
839 can_gc: CanGc,
840 ) -> Fallible<Rc<Promise>> {
841 let controller = self
843 .controller
844 .get()
845 .ok_or(Error::Type(c"controller is not set".to_owned()))?;
846
847 if let Some(finish_promise) = controller.get_finish_promise() {
849 return Ok(finish_promise);
850 }
851
852 let writable = self
854 .writable
855 .get()
856 .ok_or(Error::Type(c"writable stream is not set".to_owned()))?;
857
858 controller.set_finish_promise(Promise::new(global, can_gc));
860
861 let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
863
864 controller.clear_algorithms();
866
867 let handler = PromiseNativeHandler::new(
869 global,
870 Some(Box::new(SourceCancelPromiseFulfillment {
871 writeable: Dom::from_ref(&writable),
872 controller: Dom::from_ref(&controller),
873 stream: Dom::from_ref(self),
874 reason: Heap::boxed(reason.get()),
875 })),
876 Some(Box::new(SourceCancelPromiseRejection {
877 writeable: Dom::from_ref(&writable),
878 controller: Dom::from_ref(&controller),
879 stream: Dom::from_ref(self),
880 })),
881 can_gc,
882 );
883
884 let finish_promise = controller
886 .get_finish_promise()
887 .expect("finish promise is not set");
888 let realm = enter_realm(global);
889 let comp = InRealm::Entered(&realm);
890 cancel_promise.append_native_handler(&handler, comp, can_gc);
891 Ok(finish_promise)
892 }
893
894 pub(crate) fn transform_stream_default_source_pull(
896 &self,
897 global: &GlobalScope,
898 can_gc: CanGc,
899 ) -> Fallible<Rc<Promise>> {
900 assert!(self.backpressure.get());
902
903 assert!(self.backpressure_change_promise.borrow().is_some());
905
906 self.set_backpressure(global, false, can_gc);
908
909 Ok(self
911 .backpressure_change_promise
912 .borrow()
913 .clone()
914 .expect("Promise must be some by now."))
915 }
916
917 pub(crate) fn error_writable_and_unblock_write(
919 &self,
920 cx: &mut js::context::JSContext,
921 global: &GlobalScope,
922 error: SafeHandleValue,
923 ) {
924 self.get_controller().clear_algorithms();
926
927 self.get_writable()
929 .get_default_controller()
930 .error_if_needed(cx, error, global);
931
932 self.unblock_write(global, CanGc::from_cx(cx))
934 }
935
936 pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
938 if self.backpressure.get() {
940 self.set_backpressure(global, false, can_gc);
941 }
942 }
943
944 pub(crate) fn error(
946 &self,
947 cx: &mut js::context::JSContext,
948 global: &GlobalScope,
949 error: SafeHandleValue,
950 ) {
951 self.get_readable()
953 .get_default_controller()
954 .error(error, CanGc::from_cx(cx));
955
956 self.error_writable_and_unblock_write(cx, global, error);
958 }
959}
960
961impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
962 #[expect(unsafe_code)]
964 fn Constructor(
965 cx: SafeJSContext,
966 global: &GlobalScope,
967 proto: Option<SafeHandleObject>,
968 can_gc: CanGc,
969 transformer: Option<*mut JSObject>,
970 writable_strategy: &QueuingStrategy,
971 readable_strategy: &QueuingStrategy,
972 ) -> Fallible<DomRoot<TransformStream>> {
973 rooted!(in(*cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
975
976 let transformer_dict = if !transformer_obj.is_null() {
979 rooted!(in(*cx) let obj_val = ObjectValue(transformer_obj.get()));
980 match Transformer::new(cx, obj_val.handle(), can_gc) {
981 Ok(ConversionResult::Success(val)) => val,
982 Ok(ConversionResult::Failure(error)) => {
983 return Err(Error::Type(error.into_owned()));
984 },
985 _ => {
986 return Err(Error::JSFailed);
987 },
988 }
989 } else {
990 Transformer::empty()
991 };
992
993 if !transformer_dict.readableType.handle().is_undefined() {
995 return Err(Error::Range(c"readableType is set".to_owned()));
996 }
997
998 if !transformer_dict.writableType.handle().is_undefined() {
1000 return Err(Error::Range(c"writableType is set".to_owned()));
1001 }
1002
1003 let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
1005
1006 let readable_size_algorithm = extract_size_algorithm(readable_strategy, can_gc);
1008
1009 let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
1011
1012 let writable_size_algorithm = extract_size_algorithm(writable_strategy, can_gc);
1014
1015 let start_promise = Promise::new(global, can_gc);
1017
1018 let stream = TransformStream::new_with_proto(global, proto, can_gc);
1021 stream.initialize(
1022 cx,
1023 global,
1024 start_promise.clone(),
1025 writable_high_water_mark,
1026 writable_size_algorithm,
1027 readable_high_water_mark,
1028 readable_size_algorithm,
1029 can_gc,
1030 )?;
1031
1032 stream.set_up_transform_stream_default_controller_from_transformer(
1034 global,
1035 transformer_obj.handle(),
1036 &transformer_dict,
1037 can_gc,
1038 );
1039
1040 if let Some(start) = &transformer_dict.start {
1044 rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
1045 rooted!(in(*cx) let mut result: JSVal);
1046 rooted!(in(*cx) let this_object = transformer_obj.get());
1047 start.Call_(
1048 &this_object.handle(),
1049 &stream.get_controller(),
1050 result.handle_mut(),
1051 ExceptionHandling::Rethrow,
1052 can_gc,
1053 )?;
1054 let is_promise = unsafe {
1055 if result.is_object() {
1056 result_object.set(result.to_object());
1057 IsPromiseObject(result_object.handle().into_handle())
1058 } else {
1059 false
1060 }
1061 };
1062 let promise = if is_promise {
1063 Promise::new_with_js_promise(result_object.handle(), cx)
1064 } else {
1065 Promise::new_resolved(global, cx, result.get(), can_gc)
1066 };
1067 start_promise.resolve_native(&promise, can_gc);
1068 } else {
1069 start_promise.resolve_native(&(), can_gc);
1071 };
1072
1073 Ok(stream)
1074 }
1075
1076 fn Readable(&self) -> DomRoot<ReadableStream> {
1078 self.readable.get().expect("readable stream is not set")
1080 }
1081
1082 fn Writable(&self) -> DomRoot<WritableStream> {
1084 self.writable.get().expect("writable stream is not set")
1086 }
1087}
1088
1089impl Transferable for TransformStream {
1091 type Index = MessagePortIndex;
1092 type Data = TransformStreamData;
1093
1094 fn transfer(
1096 &self,
1097 cx: &mut js::context::JSContext,
1098 ) -> Fallible<(MessagePortId, TransformStreamData)> {
1099 let global = self.global();
1100 let mut realm = enter_auto_realm(cx, &*global);
1101 let mut realm = realm.current_realm();
1102 let cx = &mut realm;
1103
1104 let readable = self.get_readable();
1106
1107 let writable = self.get_writable();
1109
1110 if readable.is_locked() || writable.is_locked() {
1115 return Err(Error::DataClone(None));
1116 }
1117
1118 let port1 = MessagePort::new(&global, CanGc::from_cx(cx));
1120 global.track_message_port(&port1, None);
1121 let port1_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1122 global.track_message_port(&port1_peer, None);
1123 global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1124
1125 let proxy_readable = ReadableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1126 proxy_readable.setup_cross_realm_transform_readable(cx, &port1);
1127 proxy_readable
1128 .pipe_to(cx, &global, &writable, false, false, false, None)
1129 .set_promise_is_handled();
1130
1131 let port2 = MessagePort::new(&global, CanGc::from_cx(cx));
1133 global.track_message_port(&port2, None);
1134 let port2_peer = MessagePort::new(&global, CanGc::from_cx(cx));
1135 global.track_message_port(&port2_peer, None);
1136 global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1137
1138 let proxy_writable = WritableStream::new_with_proto(&global, None, CanGc::from_cx(cx));
1139 proxy_writable.setup_cross_realm_transform_writable(cx, &port2);
1140
1141 readable
1143 .pipe_to(cx, &global, &proxy_writable, false, false, false, None)
1144 .set_promise_is_handled();
1145
1146 Ok((
1151 *port1_peer.message_port_id(),
1152 TransformStreamData {
1153 readable: port1_peer.transfer(cx)?,
1154 writable: port2_peer.transfer(cx)?,
1155 },
1156 ))
1157 }
1158
1159 fn transfer_receive(
1161 cx: &mut js::context::JSContext,
1162 owner: &GlobalScope,
1163 _id: MessagePortId,
1164 data: TransformStreamData,
1165 ) -> Result<DomRoot<Self>, ()> {
1166 let port1 = MessagePort::transfer_receive(cx, owner, data.readable.0, data.readable.1)?;
1167 let port2 = MessagePort::transfer_receive(cx, owner, data.writable.0, data.writable.1)?;
1168
1169 let proxy_readable = ReadableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1173 proxy_readable.setup_cross_realm_transform_readable(cx, &port2);
1174
1175 let proxy_writable = WritableStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1179 proxy_writable.setup_cross_realm_transform_writable(cx, &port1);
1180
1181 let stream = TransformStream::new_with_proto(owner, None, CanGc::from_cx(cx));
1187 stream.readable.set(Some(&proxy_readable));
1188 stream.writable.set(Some(&proxy_writable));
1189
1190 Ok(stream)
1191 }
1192
1193 fn serialized_storage<'a>(
1194 data: StructuredData<'a, '_>,
1195 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1196 match data {
1197 StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1198 StructuredData::Writer(w) => &mut w.transform_streams_port,
1199 }
1200 }
1201}