1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::Heap;
10use js::jsval::{JSVal, UndefinedValue};
11use js::typedarray::ArrayBufferViewU8;
12use script_bindings::error::Fallible;
13
14use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
15use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
16use crate::dom::bindings::error::{Error, ErrorToJsval};
17use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
18use crate::dom::bindings::root::{Dom, DomRoot};
19use crate::dom::bindings::trace::RootedTraceableBox;
20use crate::dom::globalscope::GlobalScope;
21use crate::dom::promise::Promise;
22use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
23use crate::dom::stream::readablestream::ReadableStream;
24use crate::microtask::Microtask;
25use crate::script_runtime::CanGc;
26
27#[derive(JSTraceable, MallocSizeOf)]
28#[cfg_attr(crown, expect(crown::unrooted_must_root))]
29pub(crate) struct ByteTeeReadRequestMicrotask {
30 #[ignore_malloc_size_of = "mozjs"]
31 chunk: Box<Heap<JSVal>>,
32 tee_read_request: Dom<ByteTeeReadRequest>,
33}
34
35impl ByteTeeReadRequestMicrotask {
36 pub(crate) fn microtask_chunk_steps(&self, can_gc: CanGc) {
37 self.tee_read_request
38 .chunk_steps(&self.chunk, can_gc)
39 .expect("ByteTeeReadRequestMicrotask::microtask_chunk_steps failed");
40 }
41}
42
43#[dom_struct]
44pub(crate) struct ByteTeeReadRequest {
46 reflector_: Reflector,
47 branch_1: Dom<ReadableStream>,
48 branch_2: Dom<ReadableStream>,
49 stream: Dom<ReadableStream>,
50 #[ignore_malloc_size_of = "Rc"]
51 read_again_for_branch_1: Rc<Cell<bool>>,
52 #[ignore_malloc_size_of = "Rc"]
53 read_again_for_branch_2: Rc<Cell<bool>>,
54 #[ignore_malloc_size_of = "Rc"]
55 reading: Rc<Cell<bool>>,
56 #[ignore_malloc_size_of = "Rc"]
57 canceled_1: Rc<Cell<bool>>,
58 #[ignore_malloc_size_of = "Rc"]
59 canceled_2: Rc<Cell<bool>>,
60 #[ignore_malloc_size_of = "Rc"]
61 cancel_promise: Rc<Promise>,
62 tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
63}
64impl ByteTeeReadRequest {
65 #[allow(clippy::too_many_arguments)]
66 pub(crate) fn new(
67 branch_1: &ReadableStream,
68 branch_2: &ReadableStream,
69 stream: &ReadableStream,
70 read_again_for_branch_1: Rc<Cell<bool>>,
71 read_again_for_branch_2: Rc<Cell<bool>>,
72 reading: Rc<Cell<bool>>,
73 canceled_1: Rc<Cell<bool>>,
74 canceled_2: Rc<Cell<bool>>,
75 cancel_promise: Rc<Promise>,
76 tee_underlying_source: &ByteTeeUnderlyingSource,
77 global: &GlobalScope,
78 can_gc: CanGc,
79 ) -> DomRoot<Self> {
80 reflect_dom_object(
81 Box::new(ByteTeeReadRequest {
82 reflector_: Reflector::new(),
83 branch_1: Dom::from_ref(branch_1),
84 branch_2: Dom::from_ref(branch_2),
85 stream: Dom::from_ref(stream),
86 read_again_for_branch_1,
87 read_again_for_branch_2,
88 reading,
89 canceled_1,
90 canceled_2,
91 cancel_promise,
92 tee_underlying_source: Dom::from_ref(tee_underlying_source),
93 }),
94 global,
95 can_gc,
96 )
97 }
98
99 pub(crate) fn enqueue_chunk_steps(
102 &self,
103 global: &GlobalScope,
104 chunk: RootedTraceableBox<Heap<JSVal>>,
105 ) {
106 let byte_tee_read_request_chunk = ByteTeeReadRequestMicrotask {
108 chunk: Heap::boxed(*chunk.handle()),
109 tee_read_request: Dom::from_ref(self),
110 };
111 global.enqueue_microtask(Microtask::ReadableStreamByteTeeReadRequest(
112 byte_tee_read_request_chunk,
113 ));
114 }
115
116 #[allow(clippy::borrowed_box)]
118 pub(crate) fn chunk_steps(&self, chunk: &Box<Heap<JSVal>>, can_gc: CanGc) -> Fallible<()> {
119 let cx = GlobalScope::get_cx();
120
121 self.read_again_for_branch_1.set(false);
123
124 self.read_again_for_branch_2.set(false);
126
127 let chunk1 = chunk;
129 let chunk2 = chunk;
130
131 let handle_clone_error = |error: Error| {
133 rooted!(in(*cx) let mut error_value = UndefinedValue());
134 error
135 .clone()
136 .to_jsval(cx, &self.global(), error_value.handle_mut(), can_gc);
137
138 let branch_1_controller = self.branch_1.get_byte_controller();
139 let branch_2_controller = self.branch_2.get_byte_controller();
140
141 branch_1_controller.error(error_value.handle(), can_gc);
142 branch_2_controller.error(error_value.handle(), can_gc);
143
144 let cancel_result =
145 self.stream
146 .cancel(cx, &self.stream.global(), error_value.handle(), can_gc);
147 self.cancel_promise.resolve_native(&cancel_result, can_gc);
148 };
149
150 let chunk1_view = if !self.canceled_1.get() {
152 Some(HeapBufferSource::<ArrayBufferViewU8>::new(
153 BufferSource::ArrayBufferView(RootedTraceableBox::from_box(Heap::boxed(
154 chunk1.get().to_object(),
155 ))),
156 ))
157 } else {
158 None
159 };
160
161 let mut chunk2_view = None;
162
163 if !self.canceled_1.get() && !self.canceled_2.get() {
165 let chunk2_source =
167 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
168 RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
169 ));
170 let clone_result = chunk2_source.clone_as_uint8_array(cx);
171
172 if let Err(error) = clone_result {
174 handle_clone_error(error);
175 return Ok(());
176 } else {
177 chunk2_view = clone_result.ok();
179 }
180 } else if !self.canceled_2.get() {
181 let chunk2_source =
183 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
184 RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
185 ));
186 match chunk2_source.clone_as_uint8_array(cx) {
187 Ok(clone) => chunk2_view = Some(clone),
188 Err(error) => {
189 handle_clone_error(error);
190 return Ok(());
191 },
192 }
193 }
194
195 if let Some(chunk1_view) = chunk1_view {
197 let branch_1_controller = self.branch_1.get_byte_controller();
198 branch_1_controller.enqueue(cx, chunk1_view, can_gc)?;
199 }
200
201 if let Some(chunk2_view) = chunk2_view {
203 let branch_2_controller = self.branch_2.get_byte_controller();
204 branch_2_controller.enqueue(cx, chunk2_view, can_gc)?;
205 }
206
207 self.reading.set(false);
209
210 if self.read_again_for_branch_1.get() {
212 self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull1Algorithm), can_gc);
213 } else if self.read_again_for_branch_2.get() {
214 self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull2Algorithm), can_gc);
216 }
217
218 Ok(())
219 }
220
221 pub(crate) fn close_steps(&self, can_gc: CanGc) -> Fallible<()> {
223 let cx = GlobalScope::get_cx();
224 let branch_1_controller = self.branch_1.get_byte_controller();
225 let branch_2_controller = self.branch_2.get_byte_controller();
226
227 self.reading.set(false);
229
230 if !self.canceled_1.get() {
232 branch_1_controller.close(cx, can_gc)?;
233 }
234
235 if !self.canceled_2.get() {
237 branch_2_controller.close(cx, can_gc)?;
238 }
239
240 if branch_1_controller.get_pending_pull_intos_size() > 0 {
243 branch_1_controller.respond(cx, 0, can_gc)?;
244 }
245
246 if branch_2_controller.get_pending_pull_intos_size() > 0 {
249 branch_2_controller.respond(cx, 0, can_gc)?;
250 }
251
252 if !self.canceled_1.get() || !self.canceled_2.get() {
254 self.cancel_promise.resolve_native(&(), can_gc);
255 }
256
257 Ok(())
258 }
259
260 pub(crate) fn error_steps(&self) {
262 self.reading.set(false);
264 }
265
266 pub(crate) fn pull_algorithm(
267 &self,
268 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
269 can_gc: CanGc,
270 ) {
271 self.tee_underlying_source
272 .pull_algorithm(byte_tee_pull_algorithm, can_gc);
273 }
274}