1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::typedarray::ArrayBufferViewU8;
11
12use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
13use crate::dom::bindings::buffer_source::HeapBufferSource;
14use crate::dom::bindings::error::{ErrorToJsval, Fallible};
15use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
16use crate::dom::bindings::root::{Dom, DomRoot};
17use crate::dom::globalscope::GlobalScope;
18use crate::dom::promise::Promise;
19use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
20use crate::dom::stream::readablestream::ReadableStream;
21use crate::microtask::Microtask;
22use crate::script_runtime::CanGc;
23
24#[derive(JSTraceable, MallocSizeOf)]
25#[cfg_attr(crown, expect(crown::unrooted_must_root))]
26pub(crate) struct ByteTeeReadIntoRequestMicrotask {
27 #[ignore_malloc_size_of = "mozjs"]
28 chunk: HeapBufferSource<ArrayBufferViewU8>,
29 tee_read_request: Dom<ByteTeeReadIntoRequest>,
30}
31
32impl ByteTeeReadIntoRequestMicrotask {
33 pub(crate) fn microtask_chunk_steps(&self, cx: &mut js::context::JSContext) {
34 self.tee_read_request
35 .chunk_steps(self.chunk.clone(), cx)
36 .expect("Failed to enqueue chunk");
37 }
38}
39
40#[dom_struct]
41pub(crate) struct ByteTeeReadIntoRequest {
42 reflector_: Reflector,
43 for_branch2: bool,
44 byob_branch: Dom<ReadableStream>,
45 other_branch: Dom<ReadableStream>,
46 stream: Dom<ReadableStream>,
47 #[conditional_malloc_size_of]
48 read_again_for_branch_1: Rc<Cell<bool>>,
49 #[conditional_malloc_size_of]
50 read_again_for_branch_2: Rc<Cell<bool>>,
51 #[conditional_malloc_size_of]
52 reading: Rc<Cell<bool>>,
53 #[conditional_malloc_size_of]
54 canceled_1: Rc<Cell<bool>>,
55 #[conditional_malloc_size_of]
56 canceled_2: Rc<Cell<bool>>,
57 #[conditional_malloc_size_of]
58 cancel_promise: Rc<Promise>,
59 tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
60}
61impl ByteTeeReadIntoRequest {
62 #[allow(clippy::too_many_arguments)]
63 pub(crate) fn new(
64 for_branch2: bool,
65 byob_branch: &ReadableStream,
66 other_branch: &ReadableStream,
67 stream: &ReadableStream,
68 read_again_for_branch_1: Rc<Cell<bool>>,
69 read_again_for_branch_2: Rc<Cell<bool>>,
70 reading: Rc<Cell<bool>>,
71 canceled_1: Rc<Cell<bool>>,
72 canceled_2: Rc<Cell<bool>>,
73 cancel_promise: Rc<Promise>,
74 tee_underlying_source: &ByteTeeUnderlyingSource,
75 global: &GlobalScope,
76 can_gc: CanGc,
77 ) -> DomRoot<Self> {
78 reflect_dom_object(
79 Box::new(ByteTeeReadIntoRequest {
80 reflector_: Reflector::new(),
81 for_branch2,
82 byob_branch: Dom::from_ref(byob_branch),
83 other_branch: Dom::from_ref(other_branch),
84 stream: Dom::from_ref(stream),
85 read_again_for_branch_1,
86 read_again_for_branch_2,
87 reading,
88 canceled_1,
89 canceled_2,
90 cancel_promise,
91 tee_underlying_source: Dom::from_ref(tee_underlying_source),
92 }),
93 global,
94 can_gc,
95 )
96 }
97
98 pub(crate) fn enqueue_chunk_steps(&self, chunk: HeapBufferSource<ArrayBufferViewU8>) {
99 let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
101 chunk,
102 tee_read_request: Dom::from_ref(self),
103 };
104
105 self.global()
106 .enqueue_microtask(Microtask::ReadableStreamByteTeeReadIntoRequest(
107 byte_tee_read_request_chunk,
108 ));
109 }
110
111 #[allow(clippy::borrowed_box)]
113 pub(crate) fn chunk_steps(
114 &self,
115 chunk: HeapBufferSource<ArrayBufferViewU8>,
116 cx: &mut js::context::JSContext,
117 ) -> Fallible<()> {
118 self.read_again_for_branch_1.set(false);
120
121 self.read_again_for_branch_2.set(false);
123
124 let byob_canceled = if self.for_branch2 {
126 self.canceled_2.get()
127 } else {
128 self.canceled_1.get()
129 };
130
131 let other_canceled = if self.for_branch2 {
133 self.canceled_1.get()
134 } else {
135 self.canceled_2.get()
136 };
137
138 if !other_canceled {
140 let clone_result = chunk.clone_as_uint8_array(cx.into());
142
143 if let Err(error) = clone_result {
145 rooted!(&in(cx) let mut error_value = UndefinedValue());
146 error.clone().to_jsval(
147 cx.into(),
148 &self.global(),
149 error_value.handle_mut(),
150 CanGc::from_cx(cx),
151 );
152
153 let byob_branch_controller = self.byob_branch.get_byte_controller();
155 byob_branch_controller.error(error_value.handle(), CanGc::from_cx(cx));
156
157 let other_branch_controller = self.other_branch.get_byte_controller();
159 other_branch_controller.error(error_value.handle(), CanGc::from_cx(cx));
160
161 let cancel_result = self.stream.cancel(
163 cx.into(),
164 &self.stream.global(),
165 error_value.handle(),
166 CanGc::from_cx(cx),
167 );
168 self.cancel_promise
169 .resolve_native(&cancel_result, CanGc::from_cx(cx));
170
171 return Ok(());
173 } else {
174 let cloned_chunk = clone_result.unwrap();
176
177 if !byob_canceled {
180 let byob_branch_controller = self.byob_branch.get_byte_controller();
181 byob_branch_controller.respond_with_new_view(
182 cx.into(),
183 chunk,
184 CanGc::from_cx(cx),
185 )?;
186 }
187
188 let other_branch_controller = self.other_branch.get_byte_controller();
190 other_branch_controller.enqueue(cx.into(), cloned_chunk, CanGc::from_cx(cx))?;
191 }
192 } else if !byob_canceled {
193 let byob_branch_controller = self.byob_branch.get_byte_controller();
197 byob_branch_controller.respond_with_new_view(cx.into(), chunk, CanGc::from_cx(cx))?;
198 }
199
200 self.reading.set(false);
202
203 if self.read_again_for_branch_1.get() {
205 self.pull_algorithm(
206 Some(ByteTeePullAlgorithm::Pull1Algorithm),
207 CanGc::from_cx(cx),
208 );
209 } else if self.read_again_for_branch_2.get() {
210 self.pull_algorithm(
212 Some(ByteTeePullAlgorithm::Pull2Algorithm),
213 CanGc::from_cx(cx),
214 );
215 }
216
217 Ok(())
218 }
219
220 pub(crate) fn close_steps(
222 &self,
223 chunk: Option<HeapBufferSource<ArrayBufferViewU8>>,
224 can_gc: CanGc,
225 ) -> Fallible<()> {
226 let cx = GlobalScope::get_cx();
227
228 self.reading.set(false);
230
231 let byob_canceled = if self.for_branch2 {
233 self.canceled_2.get()
234 } else {
235 self.canceled_1.get()
236 };
237
238 let other_canceled = if self.for_branch2 {
240 self.canceled_1.get()
241 } else {
242 self.canceled_2.get()
243 };
244
245 if !byob_canceled {
247 let byob_branch_controller = self.byob_branch.get_byte_controller();
248 byob_branch_controller.close(cx, can_gc)?;
249 }
250
251 if !other_canceled {
253 let other_branch_controller = self.other_branch.get_byte_controller();
254 other_branch_controller.close(cx, can_gc)?;
255 }
256
257 if let Some(chunk_value) = chunk {
259 if chunk_value.is_undefined() {
260 } else {
263 let chunk = chunk_value;
264 assert_eq!(chunk.byte_length(), 0);
266
267 if !byob_canceled {
270 let byob_branch_controller = self.byob_branch.get_byte_controller();
271 byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
272 }
273
274 if !other_canceled {
277 let other_branch_controller = self.other_branch.get_byte_controller();
278 if other_branch_controller.get_pending_pull_intos_size() > 0 {
279 other_branch_controller.respond(cx, 0, can_gc)?;
280 }
281 }
282 }
283 }
284
285 if !byob_canceled || !other_canceled {
287 self.cancel_promise.resolve_native(&(), can_gc);
288 }
289
290 Ok(())
291 }
292 pub(crate) fn error_steps(&self) {
294 self.reading.set(false);
296 }
297
298 pub(crate) fn pull_algorithm(
299 &self,
300 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
301 can_gc: CanGc,
302 ) {
303 self.tee_underlying_source
304 .pull_algorithm(byte_tee_pull_algorithm, can_gc);
305 }
306}