1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::typedarray::ArrayBufferViewU8;
11
12use super::bindings::reflector::reflect_dom_object;
13use super::bindings::root::DomRoot;
14use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
15use crate::dom::bindings::buffer_source::HeapBufferSource;
16use crate::dom::bindings::error::{ErrorToJsval, Fallible};
17use crate::dom::bindings::reflector::{DomGlobal, Reflector};
18use crate::dom::bindings::root::Dom;
19use crate::dom::byteteeunderlyingsource::ByteTeeUnderlyingSource;
20use crate::dom::globalscope::GlobalScope;
21use crate::dom::promise::Promise;
22use crate::dom::readablestream::ReadableStream;
23use crate::microtask::Microtask;
24use crate::script_runtime::CanGc;
25
26#[derive(JSTraceable, MallocSizeOf)]
27#[cfg_attr(crown, expect(crown::unrooted_must_root))]
28pub(crate) struct ByteTeeReadIntoRequestMicrotask {
29 #[ignore_malloc_size_of = "mozjs"]
30 chunk: HeapBufferSource<ArrayBufferViewU8>,
31 tee_read_request: Dom<ByteTeeReadIntoRequest>,
32}
33
34impl ByteTeeReadIntoRequestMicrotask {
35 pub(crate) fn microtask_chunk_steps(&self, can_gc: CanGc) {
36 self.tee_read_request
37 .chunk_steps(self.chunk.clone(), can_gc)
38 .expect("Failed to enqueue chunk");
39 }
40}
41
42#[dom_struct]
43pub(crate) struct ByteTeeReadIntoRequest {
44 reflector_: Reflector,
45 for_branch2: bool,
46 byob_branch: Dom<ReadableStream>,
47 other_branch: Dom<ReadableStream>,
48 stream: Dom<ReadableStream>,
49 #[ignore_malloc_size_of = "Rc"]
50 read_again_for_branch_1: Rc<Cell<bool>>,
51 #[ignore_malloc_size_of = "Rc"]
52 read_again_for_branch_2: Rc<Cell<bool>>,
53 #[ignore_malloc_size_of = "Rc"]
54 reading: Rc<Cell<bool>>,
55 #[ignore_malloc_size_of = "Rc"]
56 canceled_1: Rc<Cell<bool>>,
57 #[ignore_malloc_size_of = "Rc"]
58 canceled_2: Rc<Cell<bool>>,
59 #[ignore_malloc_size_of = "Rc"]
60 cancel_promise: Rc<Promise>,
61 tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
62}
63impl ByteTeeReadIntoRequest {
64 #[allow(clippy::too_many_arguments)]
65 pub(crate) fn new(
66 for_branch2: bool,
67 byob_branch: &ReadableStream,
68 other_branch: &ReadableStream,
69 stream: &ReadableStream,
70 read_again_for_branch_1: Rc<Cell<bool>>,
71 read_again_for_branch_2: Rc<Cell<bool>>,
72 reading: Rc<Cell<bool>>,
73 canceled_1: Rc<Cell<bool>>,
74 canceled_2: Rc<Cell<bool>>,
75 cancel_promise: Rc<Promise>,
76 tee_underlying_source: &ByteTeeUnderlyingSource,
77 global: &GlobalScope,
78 can_gc: CanGc,
79 ) -> DomRoot<Self> {
80 reflect_dom_object(
81 Box::new(ByteTeeReadIntoRequest {
82 reflector_: Reflector::new(),
83 for_branch2,
84 byob_branch: Dom::from_ref(byob_branch),
85 other_branch: Dom::from_ref(other_branch),
86 stream: Dom::from_ref(stream),
87 read_again_for_branch_1,
88 read_again_for_branch_2,
89 reading,
90 canceled_1,
91 canceled_2,
92 cancel_promise,
93 tee_underlying_source: Dom::from_ref(tee_underlying_source),
94 }),
95 global,
96 can_gc,
97 )
98 }
99
100 pub(crate) fn enqueue_chunk_steps(&self, chunk: HeapBufferSource<ArrayBufferViewU8>) {
101 let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
103 chunk,
104 tee_read_request: Dom::from_ref(self),
105 };
106
107 self.global()
108 .enqueue_microtask(Microtask::ReadableStreamByteTeeReadIntoRequest(
109 byte_tee_read_request_chunk,
110 ));
111 }
112
113 #[allow(clippy::borrowed_box)]
115 pub(crate) fn chunk_steps(
116 &self,
117 chunk: HeapBufferSource<ArrayBufferViewU8>,
118 can_gc: CanGc,
119 ) -> Fallible<()> {
120 let cx = GlobalScope::get_cx();
121
122 self.read_again_for_branch_1.set(false);
124
125 self.read_again_for_branch_2.set(false);
127
128 let byob_canceled = if self.for_branch2 {
130 self.canceled_2.get()
131 } else {
132 self.canceled_1.get()
133 };
134
135 let other_canceled = if self.for_branch2 {
137 self.canceled_1.get()
138 } else {
139 self.canceled_2.get()
140 };
141
142 if !other_canceled {
144 let clone_result = chunk.clone_as_uint8_array(cx);
146
147 if let Err(error) = clone_result {
149 rooted!(in(*cx) let mut error_value = UndefinedValue());
150 error
151 .clone()
152 .to_jsval(cx, &self.global(), error_value.handle_mut(), can_gc);
153
154 let byob_branch_controller = self.byob_branch.get_byte_controller();
156 byob_branch_controller.error(error_value.handle(), can_gc);
157
158 let other_branch_controller = self.other_branch.get_byte_controller();
160 other_branch_controller.error(error_value.handle(), can_gc);
161
162 let cancel_result =
164 self.stream
165 .cancel(cx, &self.stream.global(), error_value.handle(), can_gc);
166 self.cancel_promise.resolve_native(&cancel_result, can_gc);
167
168 return Ok(());
170 } else {
171 let cloned_chunk = clone_result.unwrap();
173
174 if !byob_canceled {
177 let byob_branch_controller = self.byob_branch.get_byte_controller();
178 byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
179 }
180
181 let other_branch_controller = self.other_branch.get_byte_controller();
183 other_branch_controller.enqueue(cx, cloned_chunk, can_gc)?;
184 }
185 } else if !byob_canceled {
186 let byob_branch_controller = self.byob_branch.get_byte_controller();
190 byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
191 }
192
193 self.reading.set(false);
195
196 if self.read_again_for_branch_1.get() {
198 self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull1Algorithm), can_gc);
199 } else if self.read_again_for_branch_2.get() {
200 self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull2Algorithm), can_gc);
202 }
203
204 Ok(())
205 }
206
207 pub(crate) fn close_steps(
209 &self,
210 chunk: Option<HeapBufferSource<ArrayBufferViewU8>>,
211 can_gc: CanGc,
212 ) -> Fallible<()> {
213 let cx = GlobalScope::get_cx();
214
215 self.reading.set(false);
217
218 let byob_canceled = if self.for_branch2 {
220 self.canceled_2.get()
221 } else {
222 self.canceled_1.get()
223 };
224
225 let other_canceled = if self.for_branch2 {
227 self.canceled_1.get()
228 } else {
229 self.canceled_2.get()
230 };
231
232 if !byob_canceled {
234 let byob_branch_controller = self.byob_branch.get_byte_controller();
235 byob_branch_controller.close(cx, can_gc)?;
236 }
237
238 if !other_canceled {
240 let other_branch_controller = self.other_branch.get_byte_controller();
241 other_branch_controller.close(cx, can_gc)?;
242 }
243
244 if let Some(chunk_value) = chunk {
246 if chunk_value.is_undefined() {
247 } else {
250 let chunk = chunk_value;
251 assert_eq!(chunk.byte_length(), 0);
253
254 if !byob_canceled {
257 let byob_branch_controller = self.byob_branch.get_byte_controller();
258 byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
259 }
260
261 if !other_canceled {
264 let other_branch_controller = self.other_branch.get_byte_controller();
265 if other_branch_controller.get_pending_pull_intos_size() > 0 {
266 other_branch_controller.respond(cx, 0, can_gc)?;
267 }
268 }
269 }
270 }
271
272 if !byob_canceled || !other_canceled {
274 self.cancel_promise.resolve_native(&(), can_gc);
275 }
276
277 Ok(())
278 }
279 pub(crate) fn error_steps(&self) {
281 self.reading.set(false);
283 }
284
285 pub(crate) fn pull_algorithm(
286 &self,
287 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
288 can_gc: CanGc,
289 ) {
290 self.tee_underlying_source
291 .pull_algorithm(byte_tee_pull_algorithm, can_gc);
292 }
293}