1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::typedarray::ArrayBufferViewU8;
11
12use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
13use crate::dom::bindings::buffer_source::HeapBufferSource;
14use crate::dom::bindings::error::{ErrorToJsval, Fallible};
15use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
16use crate::dom::bindings::root::{Dom, DomRoot};
17use crate::dom::globalscope::GlobalScope;
18use crate::dom::promise::Promise;
19use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
20use crate::dom::stream::readablestream::ReadableStream;
21use crate::microtask::Microtask;
22use crate::script_runtime::CanGc;
23
24#[derive(JSTraceable, MallocSizeOf)]
25#[cfg_attr(crown, expect(crown::unrooted_must_root))]
26pub(crate) struct ByteTeeReadIntoRequestMicrotask {
27 #[ignore_malloc_size_of = "mozjs"]
28 chunk: HeapBufferSource<ArrayBufferViewU8>,
29 tee_read_request: Dom<ByteTeeReadIntoRequest>,
30}
31
32impl ByteTeeReadIntoRequestMicrotask {
33 pub(crate) fn microtask_chunk_steps(&self, cx: &mut js::context::JSContext) {
34 self.tee_read_request
35 .chunk_steps(self.chunk.clone(), cx)
36 .expect("Failed to enqueue chunk");
37 }
38}
39
40#[dom_struct]
41pub(crate) struct ByteTeeReadIntoRequest {
42 reflector_: Reflector,
43 for_branch2: bool,
44 byob_branch: Dom<ReadableStream>,
45 other_branch: Dom<ReadableStream>,
46 stream: Dom<ReadableStream>,
47 #[conditional_malloc_size_of]
48 read_again_for_branch_1: Rc<Cell<bool>>,
49 #[conditional_malloc_size_of]
50 read_again_for_branch_2: Rc<Cell<bool>>,
51 #[conditional_malloc_size_of]
52 reading: Rc<Cell<bool>>,
53 #[conditional_malloc_size_of]
54 canceled_1: Rc<Cell<bool>>,
55 #[conditional_malloc_size_of]
56 canceled_2: Rc<Cell<bool>>,
57 #[conditional_malloc_size_of]
58 cancel_promise: Rc<Promise>,
59 tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
60}
61impl ByteTeeReadIntoRequest {
62 #[allow(clippy::too_many_arguments)]
63 pub(crate) fn new(
64 for_branch2: bool,
65 byob_branch: &ReadableStream,
66 other_branch: &ReadableStream,
67 stream: &ReadableStream,
68 read_again_for_branch_1: Rc<Cell<bool>>,
69 read_again_for_branch_2: Rc<Cell<bool>>,
70 reading: Rc<Cell<bool>>,
71 canceled_1: Rc<Cell<bool>>,
72 canceled_2: Rc<Cell<bool>>,
73 cancel_promise: Rc<Promise>,
74 tee_underlying_source: &ByteTeeUnderlyingSource,
75 global: &GlobalScope,
76 can_gc: CanGc,
77 ) -> DomRoot<Self> {
78 reflect_dom_object(
79 Box::new(ByteTeeReadIntoRequest {
80 reflector_: Reflector::new(),
81 for_branch2,
82 byob_branch: Dom::from_ref(byob_branch),
83 other_branch: Dom::from_ref(other_branch),
84 stream: Dom::from_ref(stream),
85 read_again_for_branch_1,
86 read_again_for_branch_2,
87 reading,
88 canceled_1,
89 canceled_2,
90 cancel_promise,
91 tee_underlying_source: Dom::from_ref(tee_underlying_source),
92 }),
93 global,
94 can_gc,
95 )
96 }
97
98 pub(crate) fn enqueue_chunk_steps(&self, chunk: HeapBufferSource<ArrayBufferViewU8>) {
99 let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
101 chunk,
102 tee_read_request: Dom::from_ref(self),
103 };
104
105 self.global()
106 .enqueue_microtask(Microtask::ReadableStreamByteTeeReadIntoRequest(
107 byte_tee_read_request_chunk,
108 ));
109 }
110
111 #[allow(clippy::borrowed_box)]
113 pub(crate) fn chunk_steps(
114 &self,
115 chunk: HeapBufferSource<ArrayBufferViewU8>,
116 cx: &mut js::context::JSContext,
117 ) -> Fallible<()> {
118 self.read_again_for_branch_1.set(false);
120
121 self.read_again_for_branch_2.set(false);
123
124 let byob_canceled = if self.for_branch2 {
126 self.canceled_2.get()
127 } else {
128 self.canceled_1.get()
129 };
130
131 let other_canceled = if self.for_branch2 {
133 self.canceled_1.get()
134 } else {
135 self.canceled_2.get()
136 };
137
138 if !other_canceled {
140 let clone_result = chunk.clone_as_uint8_array(cx.into());
142
143 if let Err(error) = clone_result {
145 rooted!(&in(cx) let mut error_value = UndefinedValue());
146 error.to_jsval(
147 cx.into(),
148 &self.global(),
149 error_value.handle_mut(),
150 CanGc::from_cx(cx),
151 );
152
153 let byob_branch_controller = self.byob_branch.get_byte_controller();
155 byob_branch_controller.error(error_value.handle(), CanGc::from_cx(cx));
156
157 let other_branch_controller = self.other_branch.get_byte_controller();
159 other_branch_controller.error(error_value.handle(), CanGc::from_cx(cx));
160
161 let cancel_result =
163 self.stream
164 .cancel(cx, &self.stream.global(), error_value.handle());
165 self.cancel_promise
166 .resolve_native(&cancel_result, CanGc::from_cx(cx));
167
168 return Ok(());
170 } else {
171 let cloned_chunk = clone_result.unwrap();
173
174 if !byob_canceled {
177 let byob_branch_controller = self.byob_branch.get_byte_controller();
178 byob_branch_controller.respond_with_new_view(
179 cx.into(),
180 chunk,
181 CanGc::from_cx(cx),
182 )?;
183 }
184
185 let other_branch_controller = self.other_branch.get_byte_controller();
187 other_branch_controller.enqueue(cx, cloned_chunk)?;
188 }
189 } else if !byob_canceled {
190 let byob_branch_controller = self.byob_branch.get_byte_controller();
194 byob_branch_controller.respond_with_new_view(cx.into(), chunk, CanGc::from_cx(cx))?;
195 }
196
197 self.reading.set(false);
199
200 if self.read_again_for_branch_1.get() {
202 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull1Algorithm));
203 } else if self.read_again_for_branch_2.get() {
204 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull2Algorithm));
206 }
207
208 Ok(())
209 }
210
211 pub(crate) fn close_steps(
213 &self,
214 chunk: Option<HeapBufferSource<ArrayBufferViewU8>>,
215 can_gc: CanGc,
216 ) -> Fallible<()> {
217 let cx = GlobalScope::get_cx();
218
219 self.reading.set(false);
221
222 let byob_canceled = if self.for_branch2 {
224 self.canceled_2.get()
225 } else {
226 self.canceled_1.get()
227 };
228
229 let other_canceled = if self.for_branch2 {
231 self.canceled_1.get()
232 } else {
233 self.canceled_2.get()
234 };
235
236 if !byob_canceled {
238 let byob_branch_controller = self.byob_branch.get_byte_controller();
239 byob_branch_controller.close(cx, can_gc)?;
240 }
241
242 if !other_canceled {
244 let other_branch_controller = self.other_branch.get_byte_controller();
245 other_branch_controller.close(cx, can_gc)?;
246 }
247
248 if let Some(chunk_value) = chunk {
250 if chunk_value.is_undefined() {
251 } else {
254 let chunk = chunk_value;
255 assert_eq!(chunk.byte_length(), 0);
257
258 if !byob_canceled {
261 let byob_branch_controller = self.byob_branch.get_byte_controller();
262 byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
263 }
264
265 if !other_canceled {
268 let other_branch_controller = self.other_branch.get_byte_controller();
269 if other_branch_controller.get_pending_pull_intos_size() > 0 {
270 other_branch_controller.respond(cx, 0, can_gc)?;
271 }
272 }
273 }
274 }
275
276 if !byob_canceled || !other_canceled {
278 self.cancel_promise.resolve_native(&(), can_gc);
279 }
280
281 Ok(())
282 }
283 pub(crate) fn error_steps(&self) {
285 self.reading.set(false);
287 }
288
289 pub(crate) fn pull_algorithm(
290 &self,
291 cx: &mut js::context::JSContext,
292 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
293 ) {
294 self.tee_underlying_source
295 .pull_algorithm(byte_tee_pull_algorithm, CanGc::from_cx(cx));
296 }
297}