1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::Heap;
10use js::jsval::{JSVal, UndefinedValue};
11use js::typedarray::ArrayBufferViewU8;
12use script_bindings::error::Fallible;
13
14use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
15use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
16use crate::dom::bindings::error::{Error, ErrorToJsval};
17use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
18use crate::dom::bindings::root::{Dom, DomRoot};
19use crate::dom::bindings::trace::RootedTraceableBox;
20use crate::dom::globalscope::GlobalScope;
21use crate::dom::promise::Promise;
22use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
23use crate::dom::stream::readablestream::ReadableStream;
24use crate::microtask::Microtask;
25use crate::script_runtime::CanGc;
26
27#[derive(JSTraceable, MallocSizeOf)]
28#[cfg_attr(crown, expect(crown::unrooted_must_root))]
29pub(crate) struct ByteTeeReadRequestMicrotask {
30 #[ignore_malloc_size_of = "mozjs"]
31 chunk: Box<Heap<JSVal>>,
32 tee_read_request: Dom<ByteTeeReadRequest>,
33}
34
35impl ByteTeeReadRequestMicrotask {
36 pub(crate) fn microtask_chunk_steps(&self, cx: &mut js::context::JSContext) {
37 self.tee_read_request
38 .chunk_steps(&self.chunk, cx)
39 .expect("ByteTeeReadRequestMicrotask::microtask_chunk_steps failed");
40 }
41}
42
43#[dom_struct]
44pub(crate) struct ByteTeeReadRequest {
46 reflector_: Reflector,
47 branch_1: Dom<ReadableStream>,
48 branch_2: Dom<ReadableStream>,
49 stream: Dom<ReadableStream>,
50 #[conditional_malloc_size_of]
51 read_again_for_branch_1: Rc<Cell<bool>>,
52 #[conditional_malloc_size_of]
53 read_again_for_branch_2: Rc<Cell<bool>>,
54 #[conditional_malloc_size_of]
55 reading: Rc<Cell<bool>>,
56 #[conditional_malloc_size_of]
57 canceled_1: Rc<Cell<bool>>,
58 #[conditional_malloc_size_of]
59 canceled_2: Rc<Cell<bool>>,
60 #[conditional_malloc_size_of]
61 cancel_promise: Rc<Promise>,
62 tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
63}
64impl ByteTeeReadRequest {
65 #[allow(clippy::too_many_arguments)]
66 pub(crate) fn new(
67 branch_1: &ReadableStream,
68 branch_2: &ReadableStream,
69 stream: &ReadableStream,
70 read_again_for_branch_1: Rc<Cell<bool>>,
71 read_again_for_branch_2: Rc<Cell<bool>>,
72 reading: Rc<Cell<bool>>,
73 canceled_1: Rc<Cell<bool>>,
74 canceled_2: Rc<Cell<bool>>,
75 cancel_promise: Rc<Promise>,
76 tee_underlying_source: &ByteTeeUnderlyingSource,
77 global: &GlobalScope,
78 can_gc: CanGc,
79 ) -> DomRoot<Self> {
80 reflect_dom_object(
81 Box::new(ByteTeeReadRequest {
82 reflector_: Reflector::new(),
83 branch_1: Dom::from_ref(branch_1),
84 branch_2: Dom::from_ref(branch_2),
85 stream: Dom::from_ref(stream),
86 read_again_for_branch_1,
87 read_again_for_branch_2,
88 reading,
89 canceled_1,
90 canceled_2,
91 cancel_promise,
92 tee_underlying_source: Dom::from_ref(tee_underlying_source),
93 }),
94 global,
95 can_gc,
96 )
97 }
98
99 pub(crate) fn enqueue_chunk_steps(
102 &self,
103 global: &GlobalScope,
104 chunk: RootedTraceableBox<Heap<JSVal>>,
105 ) {
106 let byte_tee_read_request_chunk = ByteTeeReadRequestMicrotask {
108 chunk: Heap::boxed(*chunk.handle()),
109 tee_read_request: Dom::from_ref(self),
110 };
111 global.enqueue_microtask(Microtask::ReadableStreamByteTeeReadRequest(
112 byte_tee_read_request_chunk,
113 ));
114 }
115
116 #[allow(clippy::borrowed_box)]
118 pub(crate) fn chunk_steps(
119 &self,
120 chunk: &Box<Heap<JSVal>>,
121 cx: &mut js::context::JSContext,
122 ) -> Fallible<()> {
123 self.read_again_for_branch_1.set(false);
125
126 self.read_again_for_branch_2.set(false);
128
129 let chunk1 = chunk;
131 let chunk2 = chunk;
132
133 let handle_clone_error = |cx: &mut js::context::JSContext, error: Error| {
135 rooted!(&in(cx) let mut error_value = UndefinedValue());
136 error.clone().to_jsval(
137 cx.into(),
138 &self.global(),
139 error_value.handle_mut(),
140 CanGc::from_cx(cx),
141 );
142
143 let branch_1_controller = self.branch_1.get_byte_controller();
144 let branch_2_controller = self.branch_2.get_byte_controller();
145
146 branch_1_controller.error(error_value.handle(), CanGc::from_cx(cx));
147 branch_2_controller.error(error_value.handle(), CanGc::from_cx(cx));
148
149 let cancel_result = self.stream.cancel(
150 cx.into(),
151 &self.stream.global(),
152 error_value.handle(),
153 CanGc::from_cx(cx),
154 );
155 self.cancel_promise
156 .resolve_native(&cancel_result, CanGc::from_cx(cx));
157 };
158
159 let chunk1_view = if !self.canceled_1.get() {
161 Some(HeapBufferSource::<ArrayBufferViewU8>::new(
162 BufferSource::ArrayBufferView(RootedTraceableBox::from_box(Heap::boxed(
163 chunk1.get().to_object(),
164 ))),
165 ))
166 } else {
167 None
168 };
169
170 let mut chunk2_view = None;
171
172 if !self.canceled_1.get() && !self.canceled_2.get() {
174 let chunk2_source =
176 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
177 RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
178 ));
179 let clone_result = chunk2_source.clone_as_uint8_array(cx.into());
180
181 if let Err(error) = clone_result {
183 handle_clone_error(cx, error);
184 return Ok(());
185 } else {
186 chunk2_view = clone_result.ok();
188 }
189 } else if !self.canceled_2.get() {
190 let chunk2_source =
192 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
193 RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
194 ));
195 match chunk2_source.clone_as_uint8_array(cx.into()) {
196 Ok(clone) => chunk2_view = Some(clone),
197 Err(error) => {
198 handle_clone_error(cx, error);
199 return Ok(());
200 },
201 }
202 }
203
204 if let Some(chunk1_view) = chunk1_view {
206 let branch_1_controller = self.branch_1.get_byte_controller();
207 branch_1_controller.enqueue(cx.into(), chunk1_view, CanGc::from_cx(cx))?;
208 }
209
210 if let Some(chunk2_view) = chunk2_view {
212 let branch_2_controller = self.branch_2.get_byte_controller();
213 branch_2_controller.enqueue(cx.into(), chunk2_view, CanGc::from_cx(cx))?;
214 }
215
216 self.reading.set(false);
218
219 if self.read_again_for_branch_1.get() {
221 self.pull_algorithm(
222 Some(ByteTeePullAlgorithm::Pull1Algorithm),
223 CanGc::from_cx(cx),
224 );
225 } else if self.read_again_for_branch_2.get() {
226 self.pull_algorithm(
228 Some(ByteTeePullAlgorithm::Pull2Algorithm),
229 CanGc::from_cx(cx),
230 );
231 }
232
233 Ok(())
234 }
235
236 pub(crate) fn close_steps(&self, can_gc: CanGc) -> Fallible<()> {
238 let cx = GlobalScope::get_cx();
239 let branch_1_controller = self.branch_1.get_byte_controller();
240 let branch_2_controller = self.branch_2.get_byte_controller();
241
242 self.reading.set(false);
244
245 if !self.canceled_1.get() {
247 branch_1_controller.close(cx, can_gc)?;
248 }
249
250 if !self.canceled_2.get() {
252 branch_2_controller.close(cx, can_gc)?;
253 }
254
255 if branch_1_controller.get_pending_pull_intos_size() > 0 {
258 branch_1_controller.respond(cx, 0, can_gc)?;
259 }
260
261 if branch_2_controller.get_pending_pull_intos_size() > 0 {
264 branch_2_controller.respond(cx, 0, can_gc)?;
265 }
266
267 if !self.canceled_1.get() || !self.canceled_2.get() {
269 self.cancel_promise.resolve_native(&(), can_gc);
270 }
271
272 Ok(())
273 }
274
275 pub(crate) fn error_steps(&self) {
277 self.reading.set(false);
279 }
280
281 pub(crate) fn pull_algorithm(
282 &self,
283 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
284 can_gc: CanGc,
285 ) {
286 self.tee_underlying_source
287 .pull_algorithm(byte_tee_pull_algorithm, can_gc);
288 }
289}