1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::Heap;
11use js::jsval::{JSVal, UndefinedValue};
12use js::typedarray::ArrayBufferViewU8;
13use script_bindings::error::Fallible;
14use script_bindings::reflector::{Reflector, reflect_dom_object};
15
16use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
17use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
18use crate::dom::bindings::error::{Error, ErrorToJsval};
19use crate::dom::bindings::reflector::DomGlobal;
20use crate::dom::bindings::root::{Dom, DomRoot};
21use crate::dom::bindings::trace::RootedTraceableBox;
22use crate::dom::globalscope::GlobalScope;
23use crate::dom::promise::Promise;
24use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
25use crate::dom::stream::readablestream::ReadableStream;
26use crate::microtask::Microtask;
27use crate::script_runtime::CanGc;
28
29#[derive(JSTraceable, MallocSizeOf)]
30#[cfg_attr(crown, expect(crown::unrooted_must_root))]
31pub(crate) struct ByteTeeReadRequestMicrotask {
32 #[ignore_malloc_size_of = "mozjs"]
33 chunk: Box<Heap<JSVal>>,
34 tee_read_request: Dom<ByteTeeReadRequest>,
35}
36
37impl ByteTeeReadRequestMicrotask {
38 pub(crate) fn microtask_chunk_steps(&self, cx: &mut JSContext) {
39 self.tee_read_request
40 .chunk_steps(&self.chunk, cx)
41 .expect("ByteTeeReadRequestMicrotask::microtask_chunk_steps failed");
42 }
43}
44
45#[dom_struct]
46pub(crate) struct ByteTeeReadRequest {
48 reflector_: Reflector,
49 branch_1: Dom<ReadableStream>,
50 branch_2: Dom<ReadableStream>,
51 stream: Dom<ReadableStream>,
52 #[conditional_malloc_size_of]
53 read_again_for_branch_1: Rc<Cell<bool>>,
54 #[conditional_malloc_size_of]
55 read_again_for_branch_2: Rc<Cell<bool>>,
56 #[conditional_malloc_size_of]
57 reading: Rc<Cell<bool>>,
58 #[conditional_malloc_size_of]
59 canceled_1: Rc<Cell<bool>>,
60 #[conditional_malloc_size_of]
61 canceled_2: Rc<Cell<bool>>,
62 #[conditional_malloc_size_of]
63 cancel_promise: Rc<Promise>,
64 tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
65}
66impl ByteTeeReadRequest {
67 #[allow(clippy::too_many_arguments)]
68 pub(crate) fn new(
69 branch_1: &ReadableStream,
70 branch_2: &ReadableStream,
71 stream: &ReadableStream,
72 read_again_for_branch_1: Rc<Cell<bool>>,
73 read_again_for_branch_2: Rc<Cell<bool>>,
74 reading: Rc<Cell<bool>>,
75 canceled_1: Rc<Cell<bool>>,
76 canceled_2: Rc<Cell<bool>>,
77 cancel_promise: Rc<Promise>,
78 tee_underlying_source: &ByteTeeUnderlyingSource,
79 global: &GlobalScope,
80 can_gc: CanGc,
81 ) -> DomRoot<Self> {
82 reflect_dom_object(
83 Box::new(ByteTeeReadRequest {
84 reflector_: Reflector::new(),
85 branch_1: Dom::from_ref(branch_1),
86 branch_2: Dom::from_ref(branch_2),
87 stream: Dom::from_ref(stream),
88 read_again_for_branch_1,
89 read_again_for_branch_2,
90 reading,
91 canceled_1,
92 canceled_2,
93 cancel_promise,
94 tee_underlying_source: Dom::from_ref(tee_underlying_source),
95 }),
96 global,
97 can_gc,
98 )
99 }
100
101 pub(crate) fn enqueue_chunk_steps(
104 &self,
105 global: &GlobalScope,
106 chunk: RootedTraceableBox<Heap<JSVal>>,
107 ) {
108 let byte_tee_read_request_chunk = ByteTeeReadRequestMicrotask {
110 chunk: Heap::boxed(*chunk.handle()),
111 tee_read_request: Dom::from_ref(self),
112 };
113 global.enqueue_microtask(Microtask::ReadableStreamByteTeeReadRequest(
114 byte_tee_read_request_chunk,
115 ));
116 }
117
118 #[allow(clippy::borrowed_box)]
120 pub(crate) fn chunk_steps(&self, chunk: &Box<Heap<JSVal>>, cx: &mut JSContext) -> Fallible<()> {
121 self.read_again_for_branch_1.set(false);
123
124 self.read_again_for_branch_2.set(false);
126
127 let chunk1 = chunk;
129 let chunk2 = chunk;
130
131 let handle_clone_error = |cx: &mut JSContext, error: Error| {
133 rooted!(&in(cx) let mut error_value = UndefinedValue());
134 error.to_jsval(
135 cx.into(),
136 &self.global(),
137 error_value.handle_mut(),
138 CanGc::from_cx(cx),
139 );
140
141 let branch_1_controller = self.branch_1.get_byte_controller();
142 let branch_2_controller = self.branch_2.get_byte_controller();
143
144 branch_1_controller.error(cx, error_value.handle());
145 branch_2_controller.error(cx, error_value.handle());
146
147 let cancel_result = self
148 .stream
149 .cancel(cx, &self.stream.global(), error_value.handle());
150 self.cancel_promise
151 .resolve_native(&cancel_result, CanGc::from_cx(cx));
152 };
153
154 let chunk1_view = if !self.canceled_1.get() {
156 Some(RootedTraceableBox::new(
157 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
158 Heap::boxed(chunk1.get().to_object()),
159 )),
160 ))
161 } else {
162 None
163 };
164
165 let mut chunk2_view = None;
166
167 if !self.canceled_1.get() && !self.canceled_2.get() {
169 let chunk2_source =
171 RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
172 BufferSource::ArrayBufferView(Heap::boxed(chunk2.get().to_object())),
173 ));
174 let clone_result = chunk2_source.clone_as_uint8_array(cx);
175
176 if let Err(error) = clone_result {
178 handle_clone_error(cx, error);
179 return Ok(());
180 } else {
181 chunk2_view = clone_result.ok();
183 }
184 } else if !self.canceled_2.get() {
185 let chunk2_source =
187 RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
188 BufferSource::ArrayBufferView(Heap::boxed(chunk2.get().to_object())),
189 ));
190 match chunk2_source.clone_as_uint8_array(cx) {
191 Ok(clone) => chunk2_view = Some(clone),
192 Err(error) => {
193 handle_clone_error(cx, error);
194 return Ok(());
195 },
196 }
197 }
198
199 if let Some(chunk1_view) = chunk1_view {
201 let branch_1_controller = self.branch_1.get_byte_controller();
202 branch_1_controller.enqueue(cx, chunk1_view)?;
203 }
204
205 if let Some(chunk2_view) = chunk2_view {
207 let branch_2_controller = self.branch_2.get_byte_controller();
208 branch_2_controller.enqueue(cx, chunk2_view)?;
209 }
210
211 self.reading.set(false);
213
214 if self.read_again_for_branch_1.get() {
216 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull1Algorithm));
217 } else if self.read_again_for_branch_2.get() {
218 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull2Algorithm));
220 }
221
222 Ok(())
223 }
224
225 pub(crate) fn close_steps(&self, cx: &mut JSContext) -> Fallible<()> {
227 let branch_1_controller = self.branch_1.get_byte_controller();
228 let branch_2_controller = self.branch_2.get_byte_controller();
229
230 self.reading.set(false);
232
233 if !self.canceled_1.get() {
235 branch_1_controller.close(cx)?;
236 }
237
238 if !self.canceled_2.get() {
240 branch_2_controller.close(cx)?;
241 }
242
243 if branch_1_controller.get_pending_pull_intos_size() > 0 {
246 branch_1_controller.respond(cx, 0)?;
247 }
248
249 if branch_2_controller.get_pending_pull_intos_size() > 0 {
252 branch_2_controller.respond(cx, 0)?;
253 }
254
255 if !self.canceled_1.get() || !self.canceled_2.get() {
257 self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
258 }
259
260 Ok(())
261 }
262
263 pub(crate) fn error_steps(&self) {
265 self.reading.set(false);
267 }
268
269 pub(crate) fn pull_algorithm(
270 &self,
271 cx: &mut JSContext,
272 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
273 ) {
274 self.tee_underlying_source
275 .pull_algorithm(cx, byte_tee_pull_algorithm);
276 }
277}