1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{Heap, JSAutoRealm};
10use js::jsval::{JSVal, UndefinedValue};
11use js::rust::HandleValue as SafeHandleValue;
12
13use super::bindings::reflector::reflect_dom_object;
14use super::bindings::root::DomRoot;
15use super::bindings::structuredclone;
16use crate::dom::bindings::reflector::{DomGlobal, Reflector};
17use crate::dom::bindings::root::Dom;
18use crate::dom::bindings::trace::RootedTraceableBox;
19use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource;
20use crate::dom::globalscope::GlobalScope;
21use crate::dom::promise::Promise;
22use crate::dom::readablestream::ReadableStream;
23use crate::microtask::{Microtask, MicrotaskRunnable};
24use crate::realms::enter_realm;
25use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
26
27#[derive(JSTraceable, MallocSizeOf)]
28#[cfg_attr(crown, allow(crown::unrooted_must_root))]
29pub(crate) struct DefaultTeeReadRequestMicrotask {
30 #[ignore_malloc_size_of = "mozjs"]
31 chunk: Box<Heap<JSVal>>,
32 tee_read_request: Dom<DefaultTeeReadRequest>,
33}
34
35impl MicrotaskRunnable for DefaultTeeReadRequestMicrotask {
36 fn handler(&self, can_gc: CanGc) {
37 let cx = GlobalScope::get_cx();
38 self.tee_read_request.chunk_steps(cx, &self.chunk, can_gc);
39 }
40
41 fn enter_realm(&self) -> JSAutoRealm {
42 enter_realm(&*self.tee_read_request)
43 }
44}
45
46#[dom_struct]
47pub(crate) struct DefaultTeeReadRequest {
49 reflector_: Reflector,
50 stream: Dom<ReadableStream>,
51 branch_1: Dom<ReadableStream>,
52 branch_2: Dom<ReadableStream>,
53 #[ignore_malloc_size_of = "Rc"]
54 reading: Rc<Cell<bool>>,
55 #[ignore_malloc_size_of = "Rc"]
56 read_again: Rc<Cell<bool>>,
57 #[ignore_malloc_size_of = "Rc"]
58 canceled_1: Rc<Cell<bool>>,
59 #[ignore_malloc_size_of = "Rc"]
60 canceled_2: Rc<Cell<bool>>,
61 #[ignore_malloc_size_of = "Rc"]
62 clone_for_branch_2: Rc<Cell<bool>>,
63 #[ignore_malloc_size_of = "Rc"]
64 cancel_promise: Rc<Promise>,
65 tee_underlying_source: Dom<DefaultTeeUnderlyingSource>,
66}
67impl DefaultTeeReadRequest {
68 #[allow(clippy::too_many_arguments)]
69 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
70 pub(crate) fn new(
71 stream: &ReadableStream,
72 branch_1: &ReadableStream,
73 branch_2: &ReadableStream,
74 reading: Rc<Cell<bool>>,
75 read_again: Rc<Cell<bool>>,
76 canceled_1: Rc<Cell<bool>>,
77 canceled_2: Rc<Cell<bool>>,
78 clone_for_branch_2: Rc<Cell<bool>>,
79 cancel_promise: Rc<Promise>,
80 tee_underlying_source: &DefaultTeeUnderlyingSource,
81 can_gc: CanGc,
82 ) -> DomRoot<Self> {
83 reflect_dom_object(
84 Box::new(DefaultTeeReadRequest {
85 reflector_: Reflector::new(),
86 stream: Dom::from_ref(stream),
87 branch_1: Dom::from_ref(branch_1),
88 branch_2: Dom::from_ref(branch_2),
89 reading,
90 read_again,
91 canceled_1,
92 canceled_2,
93 clone_for_branch_2,
94 cancel_promise,
95 tee_underlying_source: Dom::from_ref(tee_underlying_source),
96 }),
97 &*stream.global(),
98 can_gc,
99 )
100 }
101 pub(crate) fn stream_cancel(
104 &self,
105 cx: SafeJSContext,
106 global: &GlobalScope,
107 reason: SafeHandleValue,
108 can_gc: CanGc,
109 ) {
110 self.stream.cancel(cx, global, reason, can_gc);
111 }
112 pub(crate) fn enqueue_chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>) {
115 let tee_read_request_chunk = DefaultTeeReadRequestMicrotask {
117 chunk: Heap::boxed(*chunk.handle()),
118 tee_read_request: Dom::from_ref(self),
119 };
120 let global = self.stream.global();
121 let microtask_queue = global.microtask_queue();
122 let cx = GlobalScope::get_cx();
123 microtask_queue.enqueue(
124 Microtask::ReadableStreamTeeReadRequest(tee_read_request_chunk),
125 cx,
126 );
127 }
128 #[allow(clippy::borrowed_box)]
130 pub(crate) fn chunk_steps(&self, cx: SafeJSContext, chunk: &Box<Heap<JSVal>>, can_gc: CanGc) {
131 let global = &self.stream.global();
132 self.read_again.set(false);
134 let chunk1 = chunk;
136 let chunk2 = chunk;
137
138 rooted!(in(*cx) let chunk1_value = chunk1.get());
139 rooted!(in(*cx) let chunk2_value = chunk2.get());
140 if !self.canceled_2.get() && self.clone_for_branch_2.get() {
142 rooted!(in(*cx) let mut clone_result = UndefinedValue());
144 let data = structuredclone::write(cx, chunk2_value.handle(), None).unwrap();
145 if structuredclone::read(global, data, clone_result.handle_mut()).is_err() {
147 self.readable_stream_default_controller_error(
149 &self.branch_1,
150 clone_result.handle(),
151 can_gc,
152 );
153
154 self.readable_stream_default_controller_error(
156 &self.branch_2,
157 clone_result.handle(),
158 can_gc,
159 );
160 self.stream_cancel(cx, global, clone_result.handle(), can_gc);
162 return;
164 } else {
165 chunk2.set(*clone_result);
167 }
168 }
169 if !self.canceled_1.get() {
171 self.readable_stream_default_controller_enqueue(
172 &self.branch_1,
173 chunk1_value.handle(),
174 can_gc,
175 );
176 }
177 if !self.canceled_2.get() {
179 self.readable_stream_default_controller_enqueue(
180 &self.branch_2,
181 chunk2_value.handle(),
182 can_gc,
183 );
184 }
185 self.reading.set(false);
187 if self.read_again.get() {
189 self.pull_algorithm(can_gc);
190 }
191 }
192 pub(crate) fn close_steps(&self, can_gc: CanGc) {
194 self.reading.set(false);
196 if !self.canceled_1.get() {
198 self.readable_stream_default_controller_close(&self.branch_1, can_gc);
199 }
200 if !self.canceled_2.get() {
202 self.readable_stream_default_controller_close(&self.branch_2, can_gc);
203 }
204 if !self.canceled_1.get() || !self.canceled_2.get() {
206 self.cancel_promise.resolve_native(&(), can_gc);
207 }
208 }
209 pub(crate) fn error_steps(&self) {
211 self.reading.set(false);
213 }
214 fn readable_stream_default_controller_enqueue(
217 &self,
218 stream: &ReadableStream,
219 chunk: SafeHandleValue,
220 can_gc: CanGc,
221 ) {
222 stream
223 .get_default_controller()
224 .enqueue(GlobalScope::get_cx(), chunk, can_gc)
225 .expect("enqueue failed for stream controller in DefaultTeeReadRequest");
226 }
227
228 fn readable_stream_default_controller_close(&self, stream: &ReadableStream, can_gc: CanGc) {
231 stream.get_default_controller().close(can_gc);
232 }
233
234 fn readable_stream_default_controller_error(
237 &self,
238 stream: &ReadableStream,
239 error: SafeHandleValue,
240 can_gc: CanGc,
241 ) {
242 stream.get_default_controller().error(error, can_gc);
243 }
244
245 pub(crate) fn pull_algorithm(&self, can_gc: CanGc) {
246 self.tee_underlying_source.pull_algorithm(can_gc);
247 }
248}