1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::Heap;
10use js::jsval::{JSVal, UndefinedValue};
11use js::typedarray::ArrayBufferViewU8;
12use script_bindings::error::Fallible;
13
14use super::bindings::reflector::reflect_dom_object;
15use super::bindings::root::DomRoot;
16use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
17use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
18use crate::dom::bindings::error::{Error, ErrorToJsval};
19use crate::dom::bindings::reflector::{DomGlobal, Reflector};
20use crate::dom::bindings::root::Dom;
21use crate::dom::bindings::trace::RootedTraceableBox;
22use crate::dom::byteteeunderlyingsource::ByteTeeUnderlyingSource;
23use crate::dom::globalscope::GlobalScope;
24use crate::dom::promise::Promise;
25use crate::dom::readablestream::ReadableStream;
26use crate::microtask::Microtask;
27use crate::script_runtime::CanGc;
28
/// Payload of a queued microtask that delivers one chunk to a [`ByteTeeReadRequest`].
///
/// Built by `ByteTeeReadRequest::enqueue_chunk_steps` and consumed by
/// `microtask_chunk_steps`, which forwards `chunk` to the request's `chunk_steps`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, expect(crown::unrooted_must_root))]
pub(crate) struct ByteTeeReadRequestMicrotask {
    /// The chunk value handed to `chunk_steps` when the microtask runs.
    #[ignore_malloc_size_of = "mozjs"]
    chunk: Box<Heap<JSVal>>,
    /// The read request whose `chunk_steps` is invoked with `chunk`.
    tee_read_request: Dom<ByteTeeReadRequest>,
}
36
37impl ByteTeeReadRequestMicrotask {
38 pub(crate) fn microtask_chunk_steps(&self, can_gc: CanGc) {
39 self.tee_read_request
40 .chunk_steps(&self.chunk, can_gc)
41 .expect("ByteTeeReadRequestMicrotask::microtask_chunk_steps failed");
42 }
43}
44
/// Read request used while teeing a readable byte stream into two branches.
///
/// It forwards chunks, close and error notifications to the two branch streams and
/// keeps the tee's bookkeeping flags up to date.
#[dom_struct]
pub(crate) struct ByteTeeReadRequest {
    reflector_: Reflector,
    /// First branch; its byte controller receives enqueued chunks, close and errors.
    branch_1: Dom<ReadableStream>,
    /// Second branch; its byte controller receives enqueued chunks, close and errors.
    branch_2: Dom<ReadableStream>,
    /// The stream that is canceled when cloning a chunk fails in `chunk_steps`.
    stream: Dom<ReadableStream>,
    /// Reset to `false` at the start of `chunk_steps`; if `true` once the chunk has been
    /// delivered, the pull algorithm is re-run for branch 1.
    /// NOTE(review): presumably set by the tee underlying source while a read is in
    /// flight — confirm against its pull algorithm.
    #[ignore_malloc_size_of = "Rc"]
    read_again_for_branch_1: Rc<Cell<bool>>,
    /// Same as `read_again_for_branch_1`, for branch 2 (checked only if branch 1's flag
    /// is `false`).
    #[ignore_malloc_size_of = "Rc"]
    read_again_for_branch_2: Rc<Cell<bool>>,
    /// Cleared (set to `false`) by the chunk, close and error steps.
    #[ignore_malloc_size_of = "Rc"]
    reading: Rc<Cell<bool>>,
    /// When `true`, branch 1 is skipped for enqueueing and closing.
    #[ignore_malloc_size_of = "Rc"]
    canceled_1: Rc<Cell<bool>>,
    /// When `true`, branch 2 is skipped for enqueueing and closing.
    #[ignore_malloc_size_of = "Rc"]
    canceled_2: Rc<Cell<bool>>,
    /// Resolved with the result of canceling `stream` on a clone failure, or with `()`
    /// in `close_steps` when at least one branch is not canceled.
    #[ignore_malloc_size_of = "Rc"]
    cancel_promise: Rc<Promise>,
    /// Underlying source that `pull_algorithm` delegates to.
    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
}
66impl ByteTeeReadRequest {
67 #[allow(clippy::too_many_arguments)]
68 pub(crate) fn new(
69 branch_1: &ReadableStream,
70 branch_2: &ReadableStream,
71 stream: &ReadableStream,
72 read_again_for_branch_1: Rc<Cell<bool>>,
73 read_again_for_branch_2: Rc<Cell<bool>>,
74 reading: Rc<Cell<bool>>,
75 canceled_1: Rc<Cell<bool>>,
76 canceled_2: Rc<Cell<bool>>,
77 cancel_promise: Rc<Promise>,
78 tee_underlying_source: &ByteTeeUnderlyingSource,
79 global: &GlobalScope,
80 can_gc: CanGc,
81 ) -> DomRoot<Self> {
82 reflect_dom_object(
83 Box::new(ByteTeeReadRequest {
84 reflector_: Reflector::new(),
85 branch_1: Dom::from_ref(branch_1),
86 branch_2: Dom::from_ref(branch_2),
87 stream: Dom::from_ref(stream),
88 read_again_for_branch_1,
89 read_again_for_branch_2,
90 reading,
91 canceled_1,
92 canceled_2,
93 cancel_promise,
94 tee_underlying_source: Dom::from_ref(tee_underlying_source),
95 }),
96 global,
97 can_gc,
98 )
99 }
100
101 pub(crate) fn enqueue_chunk_steps(
104 &self,
105 global: &GlobalScope,
106 chunk: RootedTraceableBox<Heap<JSVal>>,
107 ) {
108 let byte_tee_read_request_chunk = ByteTeeReadRequestMicrotask {
110 chunk: Heap::boxed(*chunk.handle()),
111 tee_read_request: Dom::from_ref(self),
112 };
113 global.enqueue_microtask(Microtask::ReadableStreamByteTeeReadRequest(
114 byte_tee_read_request_chunk,
115 ));
116 }
117
118 #[allow(clippy::borrowed_box)]
120 pub(crate) fn chunk_steps(&self, chunk: &Box<Heap<JSVal>>, can_gc: CanGc) -> Fallible<()> {
121 let cx = GlobalScope::get_cx();
122
123 self.read_again_for_branch_1.set(false);
125
126 self.read_again_for_branch_2.set(false);
128
129 let chunk1 = chunk;
131 let chunk2 = chunk;
132
133 let handle_clone_error = |error: Error| {
135 rooted!(in(*cx) let mut error_value = UndefinedValue());
136 error
137 .clone()
138 .to_jsval(cx, &self.global(), error_value.handle_mut(), can_gc);
139
140 let branch_1_controller = self.branch_1.get_byte_controller();
141 let branch_2_controller = self.branch_2.get_byte_controller();
142
143 branch_1_controller.error(error_value.handle(), can_gc);
144 branch_2_controller.error(error_value.handle(), can_gc);
145
146 let cancel_result =
147 self.stream
148 .cancel(cx, &self.stream.global(), error_value.handle(), can_gc);
149 self.cancel_promise.resolve_native(&cancel_result, can_gc);
150 };
151
152 let chunk1_view = if !self.canceled_1.get() {
154 Some(HeapBufferSource::<ArrayBufferViewU8>::new(
155 BufferSource::ArrayBufferView(RootedTraceableBox::from_box(Heap::boxed(
156 chunk1.get().to_object(),
157 ))),
158 ))
159 } else {
160 None
161 };
162
163 let mut chunk2_view = None;
164
165 if !self.canceled_1.get() && !self.canceled_2.get() {
167 let chunk2_source =
169 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
170 RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
171 ));
172 let clone_result = chunk2_source.clone_as_uint8_array(cx);
173
174 if let Err(error) = clone_result {
176 handle_clone_error(error);
177 return Ok(());
178 } else {
179 chunk2_view = clone_result.ok();
181 }
182 } else if !self.canceled_2.get() {
183 let chunk2_source =
185 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
186 RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
187 ));
188 match chunk2_source.clone_as_uint8_array(cx) {
189 Ok(clone) => chunk2_view = Some(clone),
190 Err(error) => {
191 handle_clone_error(error);
192 return Ok(());
193 },
194 }
195 }
196
197 if let Some(chunk1_view) = chunk1_view {
199 let branch_1_controller = self.branch_1.get_byte_controller();
200 branch_1_controller.enqueue(cx, chunk1_view, can_gc)?;
201 }
202
203 if let Some(chunk2_view) = chunk2_view {
205 let branch_2_controller = self.branch_2.get_byte_controller();
206 branch_2_controller.enqueue(cx, chunk2_view, can_gc)?;
207 }
208
209 self.reading.set(false);
211
212 if self.read_again_for_branch_1.get() {
214 self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull1Algorithm), can_gc);
215 } else if self.read_again_for_branch_2.get() {
216 self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull2Algorithm), can_gc);
218 }
219
220 Ok(())
221 }
222
223 pub(crate) fn close_steps(&self, can_gc: CanGc) -> Fallible<()> {
225 let cx = GlobalScope::get_cx();
226 let branch_1_controller = self.branch_1.get_byte_controller();
227 let branch_2_controller = self.branch_2.get_byte_controller();
228
229 self.reading.set(false);
231
232 if !self.canceled_1.get() {
234 branch_1_controller.close(cx, can_gc)?;
235 }
236
237 if !self.canceled_2.get() {
239 branch_2_controller.close(cx, can_gc)?;
240 }
241
242 if branch_1_controller.get_pending_pull_intos_size() > 0 {
245 branch_1_controller.respond(cx, 0, can_gc)?;
246 }
247
248 if branch_2_controller.get_pending_pull_intos_size() > 0 {
251 branch_2_controller.respond(cx, 0, can_gc)?;
252 }
253
254 if !self.canceled_1.get() || !self.canceled_2.get() {
256 self.cancel_promise.resolve_native(&(), can_gc);
257 }
258
259 Ok(())
260 }
261
262 pub(crate) fn error_steps(&self) {
264 self.reading.set(false);
266 }
267
268 pub(crate) fn pull_algorithm(
269 &self,
270 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
271 can_gc: CanGc,
272 ) {
273 self.tee_underlying_source
274 .pull_algorithm(byte_tee_pull_algorithm, can_gc);
275 }
276}