1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::Heap;
11use js::jsval::{JSVal, UndefinedValue};
12use js::typedarray::ArrayBufferViewU8;
13use script_bindings::error::Fallible;
14use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
15
16use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
17use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
18use crate::dom::bindings::error::{Error, ErrorToJsval};
19use crate::dom::bindings::reflector::DomGlobal;
20use crate::dom::bindings::root::{Dom, DomRoot};
21use crate::dom::bindings::trace::RootedTraceableBox;
22use crate::dom::globalscope::GlobalScope;
23use crate::dom::promise::Promise;
24use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
25use crate::dom::stream::readablestream::ReadableStream;
26use crate::microtask::Microtask;
27
28#[derive(JSTraceable, MallocSizeOf)]
29#[cfg_attr(crown, expect(crown::unrooted_must_root))]
30pub(crate) struct ByteTeeReadRequestMicrotask {
31 #[ignore_malloc_size_of = "mozjs"]
32 chunk: Box<Heap<JSVal>>,
33 tee_read_request: Dom<ByteTeeReadRequest>,
34}
35
36impl ByteTeeReadRequestMicrotask {
37 pub(crate) fn microtask_chunk_steps(&self, cx: &mut JSContext) {
38 self.tee_read_request
39 .chunk_steps(&self.chunk, cx)
40 .expect("ByteTeeReadRequestMicrotask::microtask_chunk_steps failed");
41 }
42}
43
44#[dom_struct]
45pub(crate) struct ByteTeeReadRequest {
47 reflector_: Reflector,
48 branch_1: Dom<ReadableStream>,
49 branch_2: Dom<ReadableStream>,
50 stream: Dom<ReadableStream>,
51 #[conditional_malloc_size_of]
52 read_again_for_branch_1: Rc<Cell<bool>>,
53 #[conditional_malloc_size_of]
54 read_again_for_branch_2: Rc<Cell<bool>>,
55 #[conditional_malloc_size_of]
56 reading: Rc<Cell<bool>>,
57 #[conditional_malloc_size_of]
58 canceled_1: Rc<Cell<bool>>,
59 #[conditional_malloc_size_of]
60 canceled_2: Rc<Cell<bool>>,
61 #[conditional_malloc_size_of]
62 cancel_promise: Rc<Promise>,
63 tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
64}
65impl ByteTeeReadRequest {
66 #[allow(clippy::too_many_arguments)]
67 pub(crate) fn new(
68 cx: &mut JSContext,
69 branch_1: &ReadableStream,
70 branch_2: &ReadableStream,
71 stream: &ReadableStream,
72 read_again_for_branch_1: Rc<Cell<bool>>,
73 read_again_for_branch_2: Rc<Cell<bool>>,
74 reading: Rc<Cell<bool>>,
75 canceled_1: Rc<Cell<bool>>,
76 canceled_2: Rc<Cell<bool>>,
77 cancel_promise: Rc<Promise>,
78 tee_underlying_source: &ByteTeeUnderlyingSource,
79 global: &GlobalScope,
80 ) -> DomRoot<Self> {
81 reflect_dom_object_with_cx(
82 Box::new(ByteTeeReadRequest {
83 reflector_: Reflector::new(),
84 branch_1: Dom::from_ref(branch_1),
85 branch_2: Dom::from_ref(branch_2),
86 stream: Dom::from_ref(stream),
87 read_again_for_branch_1,
88 read_again_for_branch_2,
89 reading,
90 canceled_1,
91 canceled_2,
92 cancel_promise,
93 tee_underlying_source: Dom::from_ref(tee_underlying_source),
94 }),
95 global,
96 cx,
97 )
98 }
99
100 pub(crate) fn enqueue_chunk_steps(
103 &self,
104 cx: &mut JSContext,
105 global: &GlobalScope,
106 chunk: RootedTraceableBox<Heap<JSVal>>,
107 ) {
108 let byte_tee_read_request_chunk = ByteTeeReadRequestMicrotask {
110 chunk: Heap::boxed(*chunk.handle()),
111 tee_read_request: Dom::from_ref(self),
112 };
113 global.enqueue_microtask(
114 cx,
115 Microtask::ReadableStreamByteTeeReadRequest(byte_tee_read_request_chunk),
116 );
117 }
118
119 #[allow(clippy::borrowed_box)]
121 pub(crate) fn chunk_steps(&self, chunk: &Box<Heap<JSVal>>, cx: &mut JSContext) -> Fallible<()> {
122 self.read_again_for_branch_1.set(false);
124
125 self.read_again_for_branch_2.set(false);
127
128 let chunk1 = chunk;
130 let chunk2 = chunk;
131
132 let handle_clone_error = |cx: &mut JSContext, error: Error| {
134 rooted!(&in(cx) let mut error_value = UndefinedValue());
135 error.to_jsval(cx, &self.global(), error_value.handle_mut());
136
137 let branch_1_controller = self.branch_1.get_byte_controller();
138 let branch_2_controller = self.branch_2.get_byte_controller();
139
140 branch_1_controller.error(cx, error_value.handle());
141 branch_2_controller.error(cx, error_value.handle());
142
143 let cancel_result = self
144 .stream
145 .cancel(cx, &self.stream.global(), error_value.handle());
146 self.cancel_promise.resolve_native(cx, &cancel_result);
147 };
148
149 let chunk1_view = if !self.canceled_1.get() {
151 Some(RootedTraceableBox::new(
152 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
153 Heap::boxed(chunk1.get().to_object()),
154 )),
155 ))
156 } else {
157 None
158 };
159
160 let mut chunk2_view = None;
161
162 if !self.canceled_1.get() && !self.canceled_2.get() {
164 let chunk2_source =
166 RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
167 BufferSource::ArrayBufferView(Heap::boxed(chunk2.get().to_object())),
168 ));
169 let clone_result = chunk2_source.clone_as_uint8_array(cx);
170
171 if let Err(error) = clone_result {
173 handle_clone_error(cx, error);
174 return Ok(());
175 } else {
176 chunk2_view = clone_result.ok();
178 }
179 } else if !self.canceled_2.get() {
180 let chunk2_source =
182 RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
183 BufferSource::ArrayBufferView(Heap::boxed(chunk2.get().to_object())),
184 ));
185 match chunk2_source.clone_as_uint8_array(cx) {
186 Ok(clone) => chunk2_view = Some(clone),
187 Err(error) => {
188 handle_clone_error(cx, error);
189 return Ok(());
190 },
191 }
192 }
193
194 if let Some(chunk1_view) = chunk1_view {
196 let branch_1_controller = self.branch_1.get_byte_controller();
197 branch_1_controller.enqueue(cx, chunk1_view)?;
198 }
199
200 if let Some(chunk2_view) = chunk2_view {
202 let branch_2_controller = self.branch_2.get_byte_controller();
203 branch_2_controller.enqueue(cx, chunk2_view)?;
204 }
205
206 self.reading.set(false);
208
209 if self.read_again_for_branch_1.get() {
211 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull1Algorithm));
212 } else if self.read_again_for_branch_2.get() {
213 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull2Algorithm));
215 }
216
217 Ok(())
218 }
219
220 pub(crate) fn close_steps(&self, cx: &mut JSContext) -> Fallible<()> {
222 let branch_1_controller = self.branch_1.get_byte_controller();
223 let branch_2_controller = self.branch_2.get_byte_controller();
224
225 self.reading.set(false);
227
228 if !self.canceled_1.get() {
230 branch_1_controller.close(cx)?;
231 }
232
233 if !self.canceled_2.get() {
235 branch_2_controller.close(cx)?;
236 }
237
238 if branch_1_controller.get_pending_pull_intos_size() > 0 {
241 branch_1_controller.respond(cx, 0)?;
242 }
243
244 if branch_2_controller.get_pending_pull_intos_size() > 0 {
247 branch_2_controller.respond(cx, 0)?;
248 }
249
250 if !self.canceled_1.get() || !self.canceled_2.get() {
252 self.cancel_promise.resolve_native(cx, &());
253 }
254
255 Ok(())
256 }
257
258 pub(crate) fn error_steps(&self) {
260 self.reading.set(false);
262 }
263
264 pub(crate) fn pull_algorithm(
265 &self,
266 cx: &mut JSContext,
267 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
268 ) {
269 self.tee_underlying_source
270 .pull_algorithm(cx, byte_tee_pull_algorithm);
271 }
272}