1use std::cell::Cell;
6use std::ptr::{self};
7use std::rc::Rc;
8
9use base::id::{MessagePortId, MessagePortIndex};
10use constellation_traits::TransformStreamData;
11use dom_struct::dom_struct;
12use js::jsapi::{Heap, IsPromiseObject, JSObject};
13use js::jsval::{JSVal, ObjectValue, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle};
16use rustc_hash::FxHashMap;
17use script_bindings::callback::ExceptionHandling;
18use script_bindings::realms::InRealm;
19
20use super::readablestream::CrossRealmTransformReadable;
21use super::writablestream::CrossRealmTransformWritable;
22use crate::dom::bindings::cell::DomRefCell;
23use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::{
24 QueuingStrategy, QueuingStrategySize,
25};
26use crate::dom::bindings::codegen::Bindings::TransformStreamBinding::TransformStreamMethods;
27use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
28use crate::dom::bindings::conversions::ConversionResult;
29use crate::dom::bindings::error::{Error, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::structuredclone::StructuredData;
33use crate::dom::bindings::transferable::Transferable;
34use crate::dom::globalscope::GlobalScope;
35use crate::dom::messageport::MessagePort;
36use crate::dom::promise::Promise;
37use crate::dom::promisenativehandler::Callback;
38use crate::dom::readablestream::{ReadableStream, create_readable_stream};
39use crate::dom::stream::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
40use crate::dom::stream::transformstreamdefaultcontroller::TransformerType;
41use crate::dom::stream::underlyingsourcecontainer::UnderlyingSourceType;
42use crate::dom::stream::writablestream::create_writable_stream;
43use crate::dom::stream::writablestreamdefaultcontroller::UnderlyingSinkType;
44use crate::dom::types::{PromiseNativeHandler, TransformStreamDefaultController, WritableStream};
45use crate::realms::enter_realm;
46use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
47
48impl js::gc::Rootable for TransformBackPressureChangePromiseFulfillment {}
49
50#[derive(JSTraceable, MallocSizeOf)]
53#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
54struct TransformBackPressureChangePromiseFulfillment {
55 #[conditional_malloc_size_of]
57 result_promise: Rc<Promise>,
58
59 #[ignore_malloc_size_of = "mozjs"]
60 chunk: Box<Heap<JSVal>>,
61
62 writable: Dom<WritableStream>,
64
65 controller: Dom<TransformStreamDefaultController>,
66}
67
68impl Callback for TransformBackPressureChangePromiseFulfillment {
69 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
71 let can_gc = CanGc::from_cx(cx);
72 let cx: SafeJSContext = cx.into();
73 if self.writable.is_erroring() {
77 rooted!(in(*cx) let mut error = UndefinedValue());
78 self.writable.get_stored_error(error.handle_mut());
79 self.result_promise.reject(cx, error.handle(), can_gc);
80 return;
81 }
82
83 assert!(self.writable.is_writable());
85
86 rooted!(in(*cx) let mut chunk = UndefinedValue());
88 chunk.set(self.chunk.get());
89 let transform_result = self
90 .controller
91 .transform_stream_default_controller_perform_transform(
92 cx,
93 &self.writable.global(),
94 chunk.handle(),
95 can_gc,
96 )
97 .expect("perform transform failed");
98
99 let handler = PromiseNativeHandler::new(
102 &self.writable.global(),
103 Some(Box::new(PerformTransformFulfillment {
104 result_promise: self.result_promise.clone(),
105 })),
106 Some(Box::new(PerformTransformRejection {
107 result_promise: self.result_promise.clone(),
108 })),
109 can_gc,
110 );
111
112 let realm = enter_realm(&*self.writable.global());
113 let comp = InRealm::Entered(&realm);
114 transform_result.append_native_handler(&handler, comp, can_gc);
115 }
116}
117
118#[derive(JSTraceable, MallocSizeOf)]
119#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
120struct PerformTransformFulfillment {
123 #[conditional_malloc_size_of]
124 result_promise: Rc<Promise>,
125}
126
127impl Callback for PerformTransformFulfillment {
128 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
129 let can_gc = CanGc::from_cx(cx);
130 self.result_promise.resolve_native(&(), can_gc);
132 }
133}
134
135#[derive(JSTraceable, MallocSizeOf)]
136#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
137struct PerformTransformRejection {
140 #[conditional_malloc_size_of]
141 result_promise: Rc<Promise>,
142}
143
144impl Callback for PerformTransformRejection {
145 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
146 let can_gc = CanGc::from_cx(cx);
147 self.result_promise.reject(cx.into(), v, can_gc);
149 }
150}
151
152#[derive(JSTraceable, MallocSizeOf)]
153#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
154struct BackpressureChangeRejection {
157 #[conditional_malloc_size_of]
158 result_promise: Rc<Promise>,
159}
160
161impl Callback for BackpressureChangeRejection {
162 fn callback(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) {
163 let can_gc = CanGc::from_cx(cx);
164 self.result_promise.reject(cx.into(), reason, can_gc);
165 }
166}
167
168impl js::gc::Rootable for CancelPromiseFulfillment {}
169
170#[derive(JSTraceable, MallocSizeOf)]
173#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
174struct CancelPromiseFulfillment {
175 readable: Dom<ReadableStream>,
176 controller: Dom<TransformStreamDefaultController>,
177 #[ignore_malloc_size_of = "mozjs"]
178 reason: Box<Heap<JSVal>>,
179}
180
181impl Callback for CancelPromiseFulfillment {
182 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
184 let can_gc = CanGc::from_cx(cx);
185 let cx: SafeJSContext = cx.into();
186 if self.readable.is_errored() {
188 rooted!(in(*cx) let mut error = UndefinedValue());
189 self.readable.get_stored_error(error.handle_mut());
190 self.controller
191 .get_finish_promise()
192 .expect("finish promise is not set")
193 .reject_native(&error.handle(), can_gc);
194 } else {
195 rooted!(in(*cx) let mut reason = UndefinedValue());
198 reason.set(self.reason.get());
199 self.readable
200 .get_default_controller()
201 .error(reason.handle(), can_gc);
202
203 self.controller
205 .get_finish_promise()
206 .expect("finish promise is not set")
207 .resolve_native(&(), can_gc);
208 }
209 }
210}
211
212impl js::gc::Rootable for CancelPromiseRejection {}
213
214#[derive(JSTraceable, MallocSizeOf)]
217#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
218struct CancelPromiseRejection {
219 readable: Dom<ReadableStream>,
220 controller: Dom<TransformStreamDefaultController>,
221}
222
223impl Callback for CancelPromiseRejection {
224 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
226 let can_gc = CanGc::from_cx(cx);
227 let cx: SafeJSContext = cx.into();
228 self.readable.get_default_controller().error(v, can_gc);
230
231 self.controller
233 .get_finish_promise()
234 .expect("finish promise is not set")
235 .reject(cx, v, can_gc);
236 }
237}
238
239impl js::gc::Rootable for SourceCancelPromiseFulfillment {}
240
241#[derive(JSTraceable, MallocSizeOf)]
244#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
245struct SourceCancelPromiseFulfillment {
246 writeable: Dom<WritableStream>,
247 controller: Dom<TransformStreamDefaultController>,
248 stream: Dom<TransformStream>,
249 #[ignore_malloc_size_of = "mozjs"]
250 reason: Box<Heap<JSVal>>,
251}
252
253impl Callback for SourceCancelPromiseFulfillment {
254 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
256 let can_gc = CanGc::from_cx(cx);
257 let cx: SafeJSContext = cx.into();
258 let finish_promise = self
260 .controller
261 .get_finish_promise()
262 .expect("finish promise is not set");
263
264 let global = &self.writeable.global();
265 if self.writeable.is_errored() {
267 rooted!(in(*cx) let mut error = UndefinedValue());
268 self.writeable.get_stored_error(error.handle_mut());
269 finish_promise.reject(cx, error.handle(), can_gc);
270 } else {
271 rooted!(in(*cx) let mut reason = UndefinedValue());
274 reason.set(self.reason.get());
275 self.writeable.get_default_controller().error_if_needed(
276 cx,
277 reason.handle(),
278 global,
279 can_gc,
280 );
281
282 self.stream.unblock_write(global, can_gc);
284
285 finish_promise.resolve_native(&(), can_gc);
287 }
288 }
289}
290
291impl js::gc::Rootable for SourceCancelPromiseRejection {}
292
293#[derive(JSTraceable, MallocSizeOf)]
296#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
297struct SourceCancelPromiseRejection {
298 writeable: Dom<WritableStream>,
299 controller: Dom<TransformStreamDefaultController>,
300 stream: Dom<TransformStream>,
301}
302
303impl Callback for SourceCancelPromiseRejection {
304 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
306 let can_gc = CanGc::from_cx(cx);
307 let cx: SafeJSContext = cx.into();
308 let global = &self.writeable.global();
310
311 self.writeable
312 .get_default_controller()
313 .error_if_needed(cx, v, global, can_gc);
314
315 self.stream.unblock_write(global, can_gc);
317
318 self.controller
320 .get_finish_promise()
321 .expect("finish promise is not set")
322 .reject(cx, v, can_gc);
323 }
324}
325
326impl js::gc::Rootable for FlushPromiseFulfillment {}
327
328#[derive(JSTraceable, MallocSizeOf)]
331#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
332struct FlushPromiseFulfillment {
333 readable: Dom<ReadableStream>,
334 controller: Dom<TransformStreamDefaultController>,
335}
336
337impl Callback for FlushPromiseFulfillment {
338 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
340 let can_gc = CanGc::from_cx(cx);
341 let cx: SafeJSContext = cx.into();
342 let finish_promise = self
344 .controller
345 .get_finish_promise()
346 .expect("finish promise is not set");
347
348 if self.readable.is_errored() {
350 rooted!(in(*cx) let mut error = UndefinedValue());
351 self.readable.get_stored_error(error.handle_mut());
352 finish_promise.reject(cx, error.handle(), can_gc);
353 } else {
354 self.readable.get_default_controller().close(can_gc);
357
358 finish_promise.resolve_native(&(), can_gc);
360 }
361 }
362}
363
364impl js::gc::Rootable for FlushPromiseRejection {}
365#[derive(JSTraceable, MallocSizeOf)]
369#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
370struct FlushPromiseRejection {
371 readable: Dom<ReadableStream>,
372 controller: Dom<TransformStreamDefaultController>,
373}
374
375impl Callback for FlushPromiseRejection {
376 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
378 let can_gc = CanGc::from_cx(cx);
379 let cx: SafeJSContext = cx.into();
380 self.readable.get_default_controller().error(v, can_gc);
383
384 self.controller
386 .get_finish_promise()
387 .expect("finish promise is not set")
388 .reject(cx, v, can_gc);
389 }
390}
391
392impl js::gc::Rootable for CrossRealmTransform {}
393
394#[derive(Clone, JSTraceable, MallocSizeOf)]
397#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
398pub(crate) enum CrossRealmTransform {
399 Readable(CrossRealmTransformReadable),
401 Writable(CrossRealmTransformWritable),
403}
404
405#[dom_struct]
407pub struct TransformStream {
408 reflector_: Reflector,
409
410 backpressure: Cell<bool>,
412
413 #[conditional_malloc_size_of]
415 backpressure_change_promise: DomRefCell<Option<Rc<Promise>>>,
416
417 controller: MutNullableDom<TransformStreamDefaultController>,
419
420 detached: Cell<bool>,
422
423 readable: MutNullableDom<ReadableStream>,
425
426 writable: MutNullableDom<WritableStream>,
428}
429
430impl TransformStream {
431 fn new_inherited() -> TransformStream {
433 TransformStream {
434 reflector_: Reflector::new(),
435 backpressure: Default::default(),
436 backpressure_change_promise: DomRefCell::new(None),
437 controller: MutNullableDom::new(None),
438 detached: Cell::new(false),
439 readable: MutNullableDom::new(None),
440 writable: MutNullableDom::new(None),
441 }
442 }
443
444 pub(crate) fn new_with_proto(
445 global: &GlobalScope,
446 proto: Option<SafeHandleObject>,
447 can_gc: CanGc,
448 ) -> DomRoot<TransformStream> {
449 reflect_dom_object_with_proto(
450 Box::new(TransformStream::new_inherited()),
451 global,
452 proto,
453 can_gc,
454 )
455 }
456
457 pub(crate) fn set_up(
460 &self,
461 cx: SafeJSContext,
462 global: &GlobalScope,
463 transformer_type: TransformerType,
464 can_gc: CanGc,
465 ) -> Fallible<()> {
466 let writable_high_water_mark = 1.0;
468
469 let writable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
471
472 let readable_high_water_mark = 0.0;
474
475 let readable_size_algorithm = extract_size_algorithm(&Default::default(), can_gc);
477
478 let start_promise = Promise::new_resolved(global, cx, (), can_gc);
485
486 self.initialize(
490 cx,
491 global,
492 start_promise.clone(),
493 writable_high_water_mark,
494 writable_size_algorithm,
495 readable_high_water_mark,
496 readable_size_algorithm,
497 can_gc,
498 )?;
499
500 let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
502
503 self.set_up_transform_stream_default_controller(&controller);
507
508 Ok(())
509 }
510
511 pub(crate) fn get_controller(&self) -> DomRoot<TransformStreamDefaultController> {
512 self.controller.get().expect("controller is not set")
513 }
514
515 pub(crate) fn get_writable(&self) -> DomRoot<WritableStream> {
516 self.writable.get().expect("writable stream is not set")
517 }
518
519 pub(crate) fn get_readable(&self) -> DomRoot<ReadableStream> {
520 self.readable.get().expect("readable stream is not set")
521 }
522
523 pub(crate) fn get_backpressure(&self) -> bool {
524 self.backpressure.get()
525 }
526
527 #[allow(clippy::too_many_arguments)]
529 fn initialize(
530 &self,
531 cx: SafeJSContext,
532 global: &GlobalScope,
533 start_promise: Rc<Promise>,
534 writable_high_water_mark: f64,
535 writable_size_algorithm: Rc<QueuingStrategySize>,
536 readable_high_water_mark: f64,
537 readable_size_algorithm: Rc<QueuingStrategySize>,
538 can_gc: CanGc,
539 ) -> Fallible<()> {
540 let writable = create_writable_stream(
552 cx,
553 global,
554 writable_high_water_mark,
555 writable_size_algorithm,
556 UnderlyingSinkType::Transform(Dom::from_ref(self), start_promise.clone()),
557 can_gc,
558 )?;
559 self.writable.set(Some(&writable));
560
561 let readable = create_readable_stream(
573 global,
574 UnderlyingSourceType::Transform(Dom::from_ref(self), start_promise.clone()),
575 Some(readable_size_algorithm),
576 Some(readable_high_water_mark),
577 can_gc,
578 );
579 self.readable.set(Some(&readable));
580
581 self.set_backpressure(global, true, can_gc);
586
587 self.controller.set(None);
589
590 Ok(())
591 }
592
593 pub(crate) fn set_backpressure(&self, global: &GlobalScope, backpressure: bool, can_gc: CanGc) {
595 assert!(self.backpressure.get() != backpressure);
597
598 if let Some(promise) = self.backpressure_change_promise.borrow_mut().take() {
601 promise.resolve_native(&(), can_gc);
602 }
603
604 *self.backpressure_change_promise.borrow_mut() = Some(Promise::new(global, can_gc));
606
607 self.backpressure.set(backpressure);
609 }
610
611 fn set_up_transform_stream_default_controller(
613 &self,
614 controller: &TransformStreamDefaultController,
615 ) {
616 assert!(self.controller.get().is_none());
621
622 controller.set_stream(self);
624
625 self.controller.set(Some(controller));
627
628 }
633
634 fn set_up_transform_stream_default_controller_from_transformer(
636 &self,
637 global: &GlobalScope,
638 transformer_obj: SafeHandleObject,
639 transformer: &Transformer,
640 can_gc: CanGc,
641 ) {
642 let transformer_type = TransformerType::new_from_js_transformer(transformer);
644 let controller = TransformStreamDefaultController::new(global, transformer_type, can_gc);
645
646 controller.set_transform_obj(transformer_obj);
669
670 self.set_up_transform_stream_default_controller(&controller);
673 }
674
675 pub(crate) fn transform_stream_default_sink_write_algorithm(
677 &self,
678 cx: SafeJSContext,
679 global: &GlobalScope,
680 chunk: SafeHandleValue,
681 can_gc: CanGc,
682 ) -> Fallible<Rc<Promise>> {
683 assert!(self.writable.get().is_some());
685
686 let controller = self.controller.get().expect("controller is not set");
688
689 if self.backpressure.get() {
691 let backpressure_change_promise = self.backpressure_change_promise.borrow();
693
694 assert!(backpressure_change_promise.is_some());
696
697 let result_promise = Promise::new(global, can_gc);
699 rooted!(in(*cx) let mut fulfillment_handler = Some(TransformBackPressureChangePromiseFulfillment {
700 controller: Dom::from_ref(&controller),
701 writable: Dom::from_ref(&self.writable.get().expect("writable stream")),
702 chunk: Heap::boxed(chunk.get()),
703 result_promise: result_promise.clone(),
704 }));
705
706 let handler = PromiseNativeHandler::new(
707 global,
708 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
709 Some(Box::new(BackpressureChangeRejection {
710 result_promise: result_promise.clone(),
711 })),
712 can_gc,
713 );
714 let realm = enter_realm(global);
715 let comp = InRealm::Entered(&realm);
716 backpressure_change_promise
717 .as_ref()
718 .expect("Promise must be some by now.")
719 .append_native_handler(&handler, comp, can_gc);
720
721 return Ok(result_promise);
722 }
723
724 controller.transform_stream_default_controller_perform_transform(cx, global, chunk, can_gc)
726 }
727
728 pub(crate) fn transform_stream_default_sink_abort_algorithm(
730 &self,
731 cx: SafeJSContext,
732 global: &GlobalScope,
733 reason: SafeHandleValue,
734 can_gc: CanGc,
735 ) -> Fallible<Rc<Promise>> {
736 let controller = self.controller.get().expect("controller is not set");
738
739 if let Some(finish_promise) = controller.get_finish_promise() {
741 return Ok(finish_promise);
742 }
743
744 let readable = self.readable.get().expect("readable stream is not set");
746
747 controller.set_finish_promise(Promise::new(global, can_gc));
749
750 let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
752
753 controller.clear_algorithms();
755
756 let handler = PromiseNativeHandler::new(
758 global,
759 Some(Box::new(CancelPromiseFulfillment {
760 readable: Dom::from_ref(&readable),
761 controller: Dom::from_ref(&controller),
762 reason: Heap::boxed(reason.get()),
763 })),
764 Some(Box::new(CancelPromiseRejection {
765 readable: Dom::from_ref(&readable),
766 controller: Dom::from_ref(&controller),
767 })),
768 can_gc,
769 );
770 let realm = enter_realm(global);
771 let comp = InRealm::Entered(&realm);
772 cancel_promise.append_native_handler(&handler, comp, can_gc);
773
774 let finish_promise = controller
776 .get_finish_promise()
777 .expect("finish promise is not set");
778 Ok(finish_promise)
779 }
780
781 pub(crate) fn transform_stream_default_sink_close_algorithm(
783 &self,
784 cx: SafeJSContext,
785 global: &GlobalScope,
786 can_gc: CanGc,
787 ) -> Fallible<Rc<Promise>> {
788 let controller = self
790 .controller
791 .get()
792 .ok_or(Error::Type("controller is not set".to_string()))?;
793
794 if let Some(finish_promise) = controller.get_finish_promise() {
796 return Ok(finish_promise);
797 }
798
799 let readable = self
801 .readable
802 .get()
803 .ok_or(Error::Type("readable stream is not set".to_string()))?;
804
805 controller.set_finish_promise(Promise::new(global, can_gc));
807
808 let flush_promise = controller.perform_flush(cx, global, can_gc)?;
810
811 controller.clear_algorithms();
813
814 let handler = PromiseNativeHandler::new(
816 global,
817 Some(Box::new(FlushPromiseFulfillment {
818 readable: Dom::from_ref(&readable),
819 controller: Dom::from_ref(&controller),
820 })),
821 Some(Box::new(FlushPromiseRejection {
822 readable: Dom::from_ref(&readable),
823 controller: Dom::from_ref(&controller),
824 })),
825 can_gc,
826 );
827
828 let realm = enter_realm(global);
829 let comp = InRealm::Entered(&realm);
830 flush_promise.append_native_handler(&handler, comp, can_gc);
831 let finish_promise = controller
833 .get_finish_promise()
834 .expect("finish promise is not set");
835 Ok(finish_promise)
836 }
837
838 pub(crate) fn transform_stream_default_source_cancel(
840 &self,
841 cx: SafeJSContext,
842 global: &GlobalScope,
843 reason: SafeHandleValue,
844 can_gc: CanGc,
845 ) -> Fallible<Rc<Promise>> {
846 let controller = self
848 .controller
849 .get()
850 .ok_or(Error::Type("controller is not set".to_string()))?;
851
852 if let Some(finish_promise) = controller.get_finish_promise() {
854 return Ok(finish_promise);
855 }
856
857 let writable = self
859 .writable
860 .get()
861 .ok_or(Error::Type("writable stream is not set".to_string()))?;
862
863 controller.set_finish_promise(Promise::new(global, can_gc));
865
866 let cancel_promise = controller.perform_cancel(cx, global, reason, can_gc)?;
868
869 controller.clear_algorithms();
871
872 let handler = PromiseNativeHandler::new(
874 global,
875 Some(Box::new(SourceCancelPromiseFulfillment {
876 writeable: Dom::from_ref(&writable),
877 controller: Dom::from_ref(&controller),
878 stream: Dom::from_ref(self),
879 reason: Heap::boxed(reason.get()),
880 })),
881 Some(Box::new(SourceCancelPromiseRejection {
882 writeable: Dom::from_ref(&writable),
883 controller: Dom::from_ref(&controller),
884 stream: Dom::from_ref(self),
885 })),
886 can_gc,
887 );
888
889 let finish_promise = controller
891 .get_finish_promise()
892 .expect("finish promise is not set");
893 let realm = enter_realm(global);
894 let comp = InRealm::Entered(&realm);
895 cancel_promise.append_native_handler(&handler, comp, can_gc);
896 Ok(finish_promise)
897 }
898
899 pub(crate) fn transform_stream_default_source_pull(
901 &self,
902 global: &GlobalScope,
903 can_gc: CanGc,
904 ) -> Fallible<Rc<Promise>> {
905 assert!(self.backpressure.get());
907
908 assert!(self.backpressure_change_promise.borrow().is_some());
910
911 self.set_backpressure(global, false, can_gc);
913
914 Ok(self
916 .backpressure_change_promise
917 .borrow()
918 .clone()
919 .expect("Promise must be some by now."))
920 }
921
922 pub(crate) fn error_writable_and_unblock_write(
924 &self,
925 cx: SafeJSContext,
926 global: &GlobalScope,
927 error: SafeHandleValue,
928 can_gc: CanGc,
929 ) {
930 self.get_controller().clear_algorithms();
932
933 self.get_writable()
935 .get_default_controller()
936 .error_if_needed(cx, error, global, can_gc);
937
938 self.unblock_write(global, can_gc)
940 }
941
942 pub(crate) fn unblock_write(&self, global: &GlobalScope, can_gc: CanGc) {
944 if self.backpressure.get() {
946 self.set_backpressure(global, false, can_gc);
947 }
948 }
949
950 pub(crate) fn error(
952 &self,
953 cx: SafeJSContext,
954 global: &GlobalScope,
955 error: SafeHandleValue,
956 can_gc: CanGc,
957 ) {
958 self.get_readable()
960 .get_default_controller()
961 .error(error, can_gc);
962
963 self.error_writable_and_unblock_write(cx, global, error, can_gc);
965 }
966}
967
968impl TransformStreamMethods<crate::DomTypeHolder> for TransformStream {
969 #[expect(unsafe_code)]
971 fn Constructor(
972 cx: SafeJSContext,
973 global: &GlobalScope,
974 proto: Option<SafeHandleObject>,
975 can_gc: CanGc,
976 transformer: Option<*mut JSObject>,
977 writable_strategy: &QueuingStrategy,
978 readable_strategy: &QueuingStrategy,
979 ) -> Fallible<DomRoot<TransformStream>> {
980 rooted!(in(*cx) let transformer_obj = transformer.unwrap_or(ptr::null_mut()));
982
983 let transformer_dict = if !transformer_obj.is_null() {
986 rooted!(in(*cx) let obj_val = ObjectValue(transformer_obj.get()));
987 match Transformer::new(cx, obj_val.handle(), can_gc) {
988 Ok(ConversionResult::Success(val)) => val,
989 Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
990 _ => {
991 return Err(Error::JSFailed);
992 },
993 }
994 } else {
995 Transformer::empty()
996 };
997
998 if !transformer_dict.readableType.handle().is_undefined() {
1000 return Err(Error::Range("readableType is set".to_string()));
1001 }
1002
1003 if !transformer_dict.writableType.handle().is_undefined() {
1005 return Err(Error::Range("writableType is set".to_string()));
1006 }
1007
1008 let readable_high_water_mark = extract_high_water_mark(readable_strategy, 0.0)?;
1010
1011 let readable_size_algorithm = extract_size_algorithm(readable_strategy, can_gc);
1013
1014 let writable_high_water_mark = extract_high_water_mark(writable_strategy, 1.0)?;
1016
1017 let writable_size_algorithm = extract_size_algorithm(writable_strategy, can_gc);
1019
1020 let start_promise = Promise::new(global, can_gc);
1022
1023 let stream = TransformStream::new_with_proto(global, proto, can_gc);
1026 stream.initialize(
1027 cx,
1028 global,
1029 start_promise.clone(),
1030 writable_high_water_mark,
1031 writable_size_algorithm,
1032 readable_high_water_mark,
1033 readable_size_algorithm,
1034 can_gc,
1035 )?;
1036
1037 stream.set_up_transform_stream_default_controller_from_transformer(
1039 global,
1040 transformer_obj.handle(),
1041 &transformer_dict,
1042 can_gc,
1043 );
1044
1045 if let Some(start) = &transformer_dict.start {
1049 rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>());
1050 rooted!(in(*cx) let mut result: JSVal);
1051 rooted!(in(*cx) let this_object = transformer_obj.get());
1052 start.Call_(
1053 &this_object.handle(),
1054 &stream.get_controller(),
1055 result.handle_mut(),
1056 ExceptionHandling::Rethrow,
1057 can_gc,
1058 )?;
1059 let is_promise = unsafe {
1060 if result.is_object() {
1061 result_object.set(result.to_object());
1062 IsPromiseObject(result_object.handle().into_handle())
1063 } else {
1064 false
1065 }
1066 };
1067 let promise = if is_promise {
1068 Promise::new_with_js_promise(result_object.handle(), cx)
1069 } else {
1070 Promise::new_resolved(global, cx, result.get(), can_gc)
1071 };
1072 start_promise.resolve_native(&promise, can_gc);
1073 } else {
1074 start_promise.resolve_native(&(), can_gc);
1076 };
1077
1078 Ok(stream)
1079 }
1080
1081 fn Readable(&self) -> DomRoot<ReadableStream> {
1083 self.readable.get().expect("readable stream is not set")
1085 }
1086
1087 fn Writable(&self) -> DomRoot<WritableStream> {
1089 self.writable.get().expect("writable stream is not set")
1091 }
1092}
1093
1094impl Transferable for TransformStream {
1096 type Index = MessagePortIndex;
1097 type Data = TransformStreamData;
1098
1099 fn transfer(&self) -> Fallible<(MessagePortId, TransformStreamData)> {
1101 let global = self.global();
1102 let realm = enter_realm(&*global);
1103 let comp = InRealm::Entered(&realm);
1104 let cx = GlobalScope::get_cx();
1105 let can_gc = CanGc::note();
1106
1107 let readable = self.get_readable();
1109
1110 let writable = self.get_writable();
1112
1113 if readable.is_locked() || writable.is_locked() {
1118 return Err(Error::DataClone(None));
1119 }
1120
1121 let port1 = MessagePort::new(&global, can_gc);
1123 global.track_message_port(&port1, None);
1124 let port1_peer = MessagePort::new(&global, can_gc);
1125 global.track_message_port(&port1_peer, None);
1126 global.entangle_ports(*port1.message_port_id(), *port1_peer.message_port_id());
1127
1128 let proxy_readable = ReadableStream::new_with_proto(&global, None, can_gc);
1129 proxy_readable.setup_cross_realm_transform_readable(cx, &port1, can_gc);
1130 proxy_readable
1131 .pipe_to(
1132 cx, &global, &writable, false, false, false, None, comp, can_gc,
1133 )
1134 .set_promise_is_handled();
1135
1136 let port2 = MessagePort::new(&global, can_gc);
1138 global.track_message_port(&port2, None);
1139 let port2_peer = MessagePort::new(&global, can_gc);
1140 global.track_message_port(&port2_peer, None);
1141 global.entangle_ports(*port2.message_port_id(), *port2_peer.message_port_id());
1142
1143 let proxy_writable = WritableStream::new_with_proto(&global, None, can_gc);
1144 proxy_writable.setup_cross_realm_transform_writable(cx, &port2, can_gc);
1145
1146 readable
1148 .pipe_to(
1149 cx,
1150 &global,
1151 &proxy_writable,
1152 false,
1153 false,
1154 false,
1155 None,
1156 comp,
1157 can_gc,
1158 )
1159 .set_promise_is_handled();
1160
1161 Ok((
1166 *port1_peer.message_port_id(),
1167 TransformStreamData {
1168 readable: port1_peer.transfer()?,
1169 writable: port2_peer.transfer()?,
1170 },
1171 ))
1172 }
1173
1174 fn transfer_receive(
1176 owner: &GlobalScope,
1177 _id: MessagePortId,
1178 data: TransformStreamData,
1179 ) -> Result<DomRoot<Self>, ()> {
1180 let can_gc = CanGc::note();
1181 let cx = GlobalScope::get_cx();
1182
1183 let port1 = MessagePort::transfer_receive(owner, data.readable.0, data.readable.1)?;
1184 let port2 = MessagePort::transfer_receive(owner, data.writable.0, data.writable.1)?;
1185
1186 let proxy_readable = ReadableStream::new_with_proto(owner, None, can_gc);
1190 proxy_readable.setup_cross_realm_transform_readable(cx, &port2, can_gc);
1191
1192 let proxy_writable = WritableStream::new_with_proto(owner, None, can_gc);
1196 proxy_writable.setup_cross_realm_transform_writable(cx, &port1, can_gc);
1197
1198 let stream = TransformStream::new_with_proto(owner, None, can_gc);
1204 stream.readable.set(Some(&proxy_readable));
1205 stream.writable.set(Some(&proxy_writable));
1206
1207 Ok(stream)
1208 }
1209
1210 fn serialized_storage<'a>(
1211 data: StructuredData<'a, '_>,
1212 ) -> &'a mut Option<FxHashMap<MessagePortId, Self::Data>> {
1213 match data {
1214 StructuredData::Reader(r) => &mut r.transform_streams_port_impls,
1215 StructuredData::Writer(w) => &mut w.transform_streams_port,
1216 }
1217 }
1218}