1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::Heap;
10use js::jsval::{JSVal, UndefinedValue};
11use js::typedarray::ArrayBufferViewU8;
12use script_bindings::error::Fallible;
13
14use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
15use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
16use crate::dom::bindings::error::{Error, ErrorToJsval};
17use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
18use crate::dom::bindings::root::{Dom, DomRoot};
19use crate::dom::bindings::trace::RootedTraceableBox;
20use crate::dom::globalscope::GlobalScope;
21use crate::dom::promise::Promise;
22use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
23use crate::dom::stream::readablestream::ReadableStream;
24use crate::microtask::Microtask;
25use crate::script_runtime::CanGc;
26
27#[derive(JSTraceable, MallocSizeOf)]
28#[cfg_attr(crown, expect(crown::unrooted_must_root))]
29pub(crate) struct ByteTeeReadRequestMicrotask {
30 #[ignore_malloc_size_of = "mozjs"]
31 chunk: Box<Heap<JSVal>>,
32 tee_read_request: Dom<ByteTeeReadRequest>,
33}
34
35impl ByteTeeReadRequestMicrotask {
36 pub(crate) fn microtask_chunk_steps(&self, cx: &mut js::context::JSContext) {
37 self.tee_read_request
38 .chunk_steps(&self.chunk, cx)
39 .expect("ByteTeeReadRequestMicrotask::microtask_chunk_steps failed");
40 }
41}
42
43#[dom_struct]
44pub(crate) struct ByteTeeReadRequest {
46 reflector_: Reflector,
47 branch_1: Dom<ReadableStream>,
48 branch_2: Dom<ReadableStream>,
49 stream: Dom<ReadableStream>,
50 #[conditional_malloc_size_of]
51 read_again_for_branch_1: Rc<Cell<bool>>,
52 #[conditional_malloc_size_of]
53 read_again_for_branch_2: Rc<Cell<bool>>,
54 #[conditional_malloc_size_of]
55 reading: Rc<Cell<bool>>,
56 #[conditional_malloc_size_of]
57 canceled_1: Rc<Cell<bool>>,
58 #[conditional_malloc_size_of]
59 canceled_2: Rc<Cell<bool>>,
60 #[conditional_malloc_size_of]
61 cancel_promise: Rc<Promise>,
62 tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
63}
64impl ByteTeeReadRequest {
65 #[allow(clippy::too_many_arguments)]
66 pub(crate) fn new(
67 branch_1: &ReadableStream,
68 branch_2: &ReadableStream,
69 stream: &ReadableStream,
70 read_again_for_branch_1: Rc<Cell<bool>>,
71 read_again_for_branch_2: Rc<Cell<bool>>,
72 reading: Rc<Cell<bool>>,
73 canceled_1: Rc<Cell<bool>>,
74 canceled_2: Rc<Cell<bool>>,
75 cancel_promise: Rc<Promise>,
76 tee_underlying_source: &ByteTeeUnderlyingSource,
77 global: &GlobalScope,
78 can_gc: CanGc,
79 ) -> DomRoot<Self> {
80 reflect_dom_object(
81 Box::new(ByteTeeReadRequest {
82 reflector_: Reflector::new(),
83 branch_1: Dom::from_ref(branch_1),
84 branch_2: Dom::from_ref(branch_2),
85 stream: Dom::from_ref(stream),
86 read_again_for_branch_1,
87 read_again_for_branch_2,
88 reading,
89 canceled_1,
90 canceled_2,
91 cancel_promise,
92 tee_underlying_source: Dom::from_ref(tee_underlying_source),
93 }),
94 global,
95 can_gc,
96 )
97 }
98
99 pub(crate) fn enqueue_chunk_steps(
102 &self,
103 global: &GlobalScope,
104 chunk: RootedTraceableBox<Heap<JSVal>>,
105 ) {
106 let byte_tee_read_request_chunk = ByteTeeReadRequestMicrotask {
108 chunk: Heap::boxed(*chunk.handle()),
109 tee_read_request: Dom::from_ref(self),
110 };
111 global.enqueue_microtask(Microtask::ReadableStreamByteTeeReadRequest(
112 byte_tee_read_request_chunk,
113 ));
114 }
115
116 #[allow(clippy::borrowed_box)]
118 pub(crate) fn chunk_steps(
119 &self,
120 chunk: &Box<Heap<JSVal>>,
121 cx: &mut js::context::JSContext,
122 ) -> Fallible<()> {
123 self.read_again_for_branch_1.set(false);
125
126 self.read_again_for_branch_2.set(false);
128
129 let chunk1 = chunk;
131 let chunk2 = chunk;
132
133 let handle_clone_error = |cx: &mut js::context::JSContext, error: Error| {
135 rooted!(&in(cx) let mut error_value = UndefinedValue());
136 error.to_jsval(
137 cx.into(),
138 &self.global(),
139 error_value.handle_mut(),
140 CanGc::from_cx(cx),
141 );
142
143 let branch_1_controller = self.branch_1.get_byte_controller();
144 let branch_2_controller = self.branch_2.get_byte_controller();
145
146 branch_1_controller.error(error_value.handle(), CanGc::from_cx(cx));
147 branch_2_controller.error(error_value.handle(), CanGc::from_cx(cx));
148
149 let cancel_result = self
150 .stream
151 .cancel(cx, &self.stream.global(), error_value.handle());
152 self.cancel_promise
153 .resolve_native(&cancel_result, CanGc::from_cx(cx));
154 };
155
156 let chunk1_view = if !self.canceled_1.get() {
158 Some(HeapBufferSource::<ArrayBufferViewU8>::new(
159 BufferSource::ArrayBufferView(RootedTraceableBox::from_box(Heap::boxed(
160 chunk1.get().to_object(),
161 ))),
162 ))
163 } else {
164 None
165 };
166
167 let mut chunk2_view = None;
168
169 if !self.canceled_1.get() && !self.canceled_2.get() {
171 let chunk2_source =
173 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
174 RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
175 ));
176 let clone_result = chunk2_source.clone_as_uint8_array(cx.into());
177
178 if let Err(error) = clone_result {
180 handle_clone_error(cx, error);
181 return Ok(());
182 } else {
183 chunk2_view = clone_result.ok();
185 }
186 } else if !self.canceled_2.get() {
187 let chunk2_source =
189 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
190 RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
191 ));
192 match chunk2_source.clone_as_uint8_array(cx.into()) {
193 Ok(clone) => chunk2_view = Some(clone),
194 Err(error) => {
195 handle_clone_error(cx, error);
196 return Ok(());
197 },
198 }
199 }
200
201 if let Some(chunk1_view) = chunk1_view {
203 let branch_1_controller = self.branch_1.get_byte_controller();
204 branch_1_controller.enqueue(cx, chunk1_view)?;
205 }
206
207 if let Some(chunk2_view) = chunk2_view {
209 let branch_2_controller = self.branch_2.get_byte_controller();
210 branch_2_controller.enqueue(cx, chunk2_view)?;
211 }
212
213 self.reading.set(false);
215
216 if self.read_again_for_branch_1.get() {
218 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull1Algorithm));
219 } else if self.read_again_for_branch_2.get() {
220 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull2Algorithm));
222 }
223
224 Ok(())
225 }
226
227 pub(crate) fn close_steps(&self, can_gc: CanGc) -> Fallible<()> {
229 let cx = GlobalScope::get_cx();
230 let branch_1_controller = self.branch_1.get_byte_controller();
231 let branch_2_controller = self.branch_2.get_byte_controller();
232
233 self.reading.set(false);
235
236 if !self.canceled_1.get() {
238 branch_1_controller.close(cx, can_gc)?;
239 }
240
241 if !self.canceled_2.get() {
243 branch_2_controller.close(cx, can_gc)?;
244 }
245
246 if branch_1_controller.get_pending_pull_intos_size() > 0 {
249 branch_1_controller.respond(cx, 0, can_gc)?;
250 }
251
252 if branch_2_controller.get_pending_pull_intos_size() > 0 {
255 branch_2_controller.respond(cx, 0, can_gc)?;
256 }
257
258 if !self.canceled_1.get() || !self.canceled_2.get() {
260 self.cancel_promise.resolve_native(&(), can_gc);
261 }
262
263 Ok(())
264 }
265
266 pub(crate) fn error_steps(&self) {
268 self.reading.set(false);
270 }
271
272 pub(crate) fn pull_algorithm(
273 &self,
274 cx: &mut js::context::JSContext,
275 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
276 ) {
277 self.tee_underlying_source
278 .pull_algorithm(byte_tee_pull_algorithm, CanGc::from_cx(cx));
279 }
280}