1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::typedarray::ArrayBufferViewU8;
11
12use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
13use crate::dom::bindings::buffer_source::HeapBufferSource;
14use crate::dom::bindings::error::{ErrorToJsval, Fallible};
15use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
16use crate::dom::bindings::root::{Dom, DomRoot};
17use crate::dom::globalscope::GlobalScope;
18use crate::dom::promise::Promise;
19use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
20use crate::dom::stream::readablestream::ReadableStream;
21use crate::microtask::Microtask;
22use crate::script_runtime::CanGc;
23
24#[derive(JSTraceable, MallocSizeOf)]
25#[cfg_attr(crown, expect(crown::unrooted_must_root))]
26pub(crate) struct ByteTeeReadIntoRequestMicrotask {
27 #[ignore_malloc_size_of = "mozjs"]
28 chunk: HeapBufferSource<ArrayBufferViewU8>,
29 tee_read_request: Dom<ByteTeeReadIntoRequest>,
30}
31
32impl ByteTeeReadIntoRequestMicrotask {
33 pub(crate) fn microtask_chunk_steps(&self, can_gc: CanGc) {
34 self.tee_read_request
35 .chunk_steps(self.chunk.clone(), can_gc)
36 .expect("Failed to enqueue chunk");
37 }
38}
39
40#[dom_struct]
41pub(crate) struct ByteTeeReadIntoRequest {
42 reflector_: Reflector,
43 for_branch2: bool,
44 byob_branch: Dom<ReadableStream>,
45 other_branch: Dom<ReadableStream>,
46 stream: Dom<ReadableStream>,
47 #[ignore_malloc_size_of = "Rc"]
48 read_again_for_branch_1: Rc<Cell<bool>>,
49 #[ignore_malloc_size_of = "Rc"]
50 read_again_for_branch_2: Rc<Cell<bool>>,
51 #[ignore_malloc_size_of = "Rc"]
52 reading: Rc<Cell<bool>>,
53 #[ignore_malloc_size_of = "Rc"]
54 canceled_1: Rc<Cell<bool>>,
55 #[ignore_malloc_size_of = "Rc"]
56 canceled_2: Rc<Cell<bool>>,
57 #[ignore_malloc_size_of = "Rc"]
58 cancel_promise: Rc<Promise>,
59 tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
60}
61impl ByteTeeReadIntoRequest {
62 #[allow(clippy::too_many_arguments)]
63 pub(crate) fn new(
64 for_branch2: bool,
65 byob_branch: &ReadableStream,
66 other_branch: &ReadableStream,
67 stream: &ReadableStream,
68 read_again_for_branch_1: Rc<Cell<bool>>,
69 read_again_for_branch_2: Rc<Cell<bool>>,
70 reading: Rc<Cell<bool>>,
71 canceled_1: Rc<Cell<bool>>,
72 canceled_2: Rc<Cell<bool>>,
73 cancel_promise: Rc<Promise>,
74 tee_underlying_source: &ByteTeeUnderlyingSource,
75 global: &GlobalScope,
76 can_gc: CanGc,
77 ) -> DomRoot<Self> {
78 reflect_dom_object(
79 Box::new(ByteTeeReadIntoRequest {
80 reflector_: Reflector::new(),
81 for_branch2,
82 byob_branch: Dom::from_ref(byob_branch),
83 other_branch: Dom::from_ref(other_branch),
84 stream: Dom::from_ref(stream),
85 read_again_for_branch_1,
86 read_again_for_branch_2,
87 reading,
88 canceled_1,
89 canceled_2,
90 cancel_promise,
91 tee_underlying_source: Dom::from_ref(tee_underlying_source),
92 }),
93 global,
94 can_gc,
95 )
96 }
97
98 pub(crate) fn enqueue_chunk_steps(&self, chunk: HeapBufferSource<ArrayBufferViewU8>) {
99 let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
101 chunk,
102 tee_read_request: Dom::from_ref(self),
103 };
104
105 self.global()
106 .enqueue_microtask(Microtask::ReadableStreamByteTeeReadIntoRequest(
107 byte_tee_read_request_chunk,
108 ));
109 }
110
111 #[allow(clippy::borrowed_box)]
113 pub(crate) fn chunk_steps(
114 &self,
115 chunk: HeapBufferSource<ArrayBufferViewU8>,
116 can_gc: CanGc,
117 ) -> Fallible<()> {
118 let cx = GlobalScope::get_cx();
119
120 self.read_again_for_branch_1.set(false);
122
123 self.read_again_for_branch_2.set(false);
125
126 let byob_canceled = if self.for_branch2 {
128 self.canceled_2.get()
129 } else {
130 self.canceled_1.get()
131 };
132
133 let other_canceled = if self.for_branch2 {
135 self.canceled_1.get()
136 } else {
137 self.canceled_2.get()
138 };
139
140 if !other_canceled {
142 let clone_result = chunk.clone_as_uint8_array(cx);
144
145 if let Err(error) = clone_result {
147 rooted!(in(*cx) let mut error_value = UndefinedValue());
148 error
149 .clone()
150 .to_jsval(cx, &self.global(), error_value.handle_mut(), can_gc);
151
152 let byob_branch_controller = self.byob_branch.get_byte_controller();
154 byob_branch_controller.error(error_value.handle(), can_gc);
155
156 let other_branch_controller = self.other_branch.get_byte_controller();
158 other_branch_controller.error(error_value.handle(), can_gc);
159
160 let cancel_result =
162 self.stream
163 .cancel(cx, &self.stream.global(), error_value.handle(), can_gc);
164 self.cancel_promise.resolve_native(&cancel_result, can_gc);
165
166 return Ok(());
168 } else {
169 let cloned_chunk = clone_result.unwrap();
171
172 if !byob_canceled {
175 let byob_branch_controller = self.byob_branch.get_byte_controller();
176 byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
177 }
178
179 let other_branch_controller = self.other_branch.get_byte_controller();
181 other_branch_controller.enqueue(cx, cloned_chunk, can_gc)?;
182 }
183 } else if !byob_canceled {
184 let byob_branch_controller = self.byob_branch.get_byte_controller();
188 byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
189 }
190
191 self.reading.set(false);
193
194 if self.read_again_for_branch_1.get() {
196 self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull1Algorithm), can_gc);
197 } else if self.read_again_for_branch_2.get() {
198 self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull2Algorithm), can_gc);
200 }
201
202 Ok(())
203 }
204
205 pub(crate) fn close_steps(
207 &self,
208 chunk: Option<HeapBufferSource<ArrayBufferViewU8>>,
209 can_gc: CanGc,
210 ) -> Fallible<()> {
211 let cx = GlobalScope::get_cx();
212
213 self.reading.set(false);
215
216 let byob_canceled = if self.for_branch2 {
218 self.canceled_2.get()
219 } else {
220 self.canceled_1.get()
221 };
222
223 let other_canceled = if self.for_branch2 {
225 self.canceled_1.get()
226 } else {
227 self.canceled_2.get()
228 };
229
230 if !byob_canceled {
232 let byob_branch_controller = self.byob_branch.get_byte_controller();
233 byob_branch_controller.close(cx, can_gc)?;
234 }
235
236 if !other_canceled {
238 let other_branch_controller = self.other_branch.get_byte_controller();
239 other_branch_controller.close(cx, can_gc)?;
240 }
241
242 if let Some(chunk_value) = chunk {
244 if chunk_value.is_undefined() {
245 } else {
248 let chunk = chunk_value;
249 assert_eq!(chunk.byte_length(), 0);
251
252 if !byob_canceled {
255 let byob_branch_controller = self.byob_branch.get_byte_controller();
256 byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
257 }
258
259 if !other_canceled {
262 let other_branch_controller = self.other_branch.get_byte_controller();
263 if other_branch_controller.get_pending_pull_intos_size() > 0 {
264 other_branch_controller.respond(cx, 0, can_gc)?;
265 }
266 }
267 }
268 }
269
270 if !byob_canceled || !other_canceled {
272 self.cancel_promise.resolve_native(&(), can_gc);
273 }
274
275 Ok(())
276 }
277 pub(crate) fn error_steps(&self) {
279 self.reading.set(false);
281 }
282
283 pub(crate) fn pull_algorithm(
284 &self,
285 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
286 can_gc: CanGc,
287 ) {
288 self.tee_underlying_source
289 .pull_algorithm(byte_tee_pull_algorithm, can_gc);
290 }
291}