script/dom/stream/
byteteereadintorequest.rs1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsval::UndefinedValue;
11use js::typedarray::ArrayBufferViewU8;
12use script_bindings::reflector::{Reflector, reflect_dom_object};
13use script_bindings::trace::RootedTraceableBox;
14
15use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
16use crate::dom::bindings::buffer_source::HeapBufferSource;
17use crate::dom::bindings::error::{ErrorToJsval, Fallible};
18use crate::dom::bindings::refcounted::Trusted;
19use crate::dom::bindings::reflector::DomGlobal;
20use crate::dom::bindings::root::{Dom, DomRoot};
21use crate::dom::globalscope::GlobalScope;
22use crate::dom::promise::Promise;
23use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
24use crate::dom::stream::readablestream::ReadableStream;
25use crate::microtask::Microtask;
26use crate::script_runtime::CanGc;
27
28#[derive(JSTraceable, MallocSizeOf)]
29pub(crate) struct ByteTeeReadIntoRequestMicrotask {
30 #[ignore_malloc_size_of = "mozjs"]
31 chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
32 tee_read_request: Trusted<ByteTeeReadIntoRequest>,
33}
34
35impl ByteTeeReadIntoRequestMicrotask {
36 pub(crate) fn microtask_chunk_steps(&self, cx: &mut JSContext) {
37 self.tee_read_request
38 .root()
39 .chunk_steps(&self.chunk, cx)
40 .expect("Failed to enqueue chunk");
41 }
42}
43
44#[dom_struct]
45pub(crate) struct ByteTeeReadIntoRequest {
46 reflector_: Reflector,
47 for_branch2: bool,
48 byob_branch: Dom<ReadableStream>,
49 other_branch: Dom<ReadableStream>,
50 stream: Dom<ReadableStream>,
51 #[conditional_malloc_size_of]
52 read_again_for_branch_1: Rc<Cell<bool>>,
53 #[conditional_malloc_size_of]
54 read_again_for_branch_2: Rc<Cell<bool>>,
55 #[conditional_malloc_size_of]
56 reading: Rc<Cell<bool>>,
57 #[conditional_malloc_size_of]
58 canceled_1: Rc<Cell<bool>>,
59 #[conditional_malloc_size_of]
60 canceled_2: Rc<Cell<bool>>,
61 #[conditional_malloc_size_of]
62 cancel_promise: Rc<Promise>,
63 tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
64}
65impl ByteTeeReadIntoRequest {
66 #[allow(clippy::too_many_arguments)]
67 pub(crate) fn new(
68 for_branch2: bool,
69 byob_branch: &ReadableStream,
70 other_branch: &ReadableStream,
71 stream: &ReadableStream,
72 read_again_for_branch_1: Rc<Cell<bool>>,
73 read_again_for_branch_2: Rc<Cell<bool>>,
74 reading: Rc<Cell<bool>>,
75 canceled_1: Rc<Cell<bool>>,
76 canceled_2: Rc<Cell<bool>>,
77 cancel_promise: Rc<Promise>,
78 tee_underlying_source: &ByteTeeUnderlyingSource,
79 global: &GlobalScope,
80 can_gc: CanGc,
81 ) -> DomRoot<Self> {
82 reflect_dom_object(
83 Box::new(ByteTeeReadIntoRequest {
84 reflector_: Reflector::new(),
85 for_branch2,
86 byob_branch: Dom::from_ref(byob_branch),
87 other_branch: Dom::from_ref(other_branch),
88 stream: Dom::from_ref(stream),
89 read_again_for_branch_1,
90 read_again_for_branch_2,
91 reading,
92 canceled_1,
93 canceled_2,
94 cancel_promise,
95 tee_underlying_source: Dom::from_ref(tee_underlying_source),
96 }),
97 global,
98 can_gc,
99 )
100 }
101
102 pub(crate) fn enqueue_chunk_steps(
103 &self,
104 chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
105 ) {
106 let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
108 chunk,
109 tee_read_request: Trusted::new(self),
110 };
111
112 self.global()
113 .enqueue_microtask(Microtask::ReadableStreamByteTeeReadIntoRequest(
114 byte_tee_read_request_chunk,
115 ));
116 }
117
118 #[allow(clippy::borrowed_box)]
120 pub(crate) fn chunk_steps(
121 &self,
122 chunk: &HeapBufferSource<ArrayBufferViewU8>,
123 cx: &mut JSContext,
124 ) -> Fallible<()> {
125 self.read_again_for_branch_1.set(false);
127
128 self.read_again_for_branch_2.set(false);
130
131 let byob_canceled = if self.for_branch2 {
133 self.canceled_2.get()
134 } else {
135 self.canceled_1.get()
136 };
137
138 let other_canceled = if self.for_branch2 {
140 self.canceled_1.get()
141 } else {
142 self.canceled_2.get()
143 };
144
145 if !other_canceled {
147 let clone_result = chunk.clone_as_uint8_array(cx);
149
150 if let Err(error) = clone_result {
152 rooted!(&in(cx) let mut error_value = UndefinedValue());
153 error.to_jsval(
154 cx.into(),
155 &self.global(),
156 error_value.handle_mut(),
157 CanGc::from_cx(cx),
158 );
159
160 let byob_branch_controller = self.byob_branch.get_byte_controller();
162 byob_branch_controller.error(cx, error_value.handle());
163
164 let other_branch_controller = self.other_branch.get_byte_controller();
166 other_branch_controller.error(cx, error_value.handle());
167
168 let cancel_result =
170 self.stream
171 .cancel(cx, &self.stream.global(), error_value.handle());
172 self.cancel_promise
173 .resolve_native(&cancel_result, CanGc::from_cx(cx));
174
175 return Ok(());
177 } else {
178 let cloned_chunk = clone_result.unwrap();
180
181 if !byob_canceled {
184 let byob_branch_controller = self.byob_branch.get_byte_controller();
185 byob_branch_controller.respond_with_new_view(cx, chunk)?;
186 }
187
188 let other_branch_controller = self.other_branch.get_byte_controller();
190 other_branch_controller.enqueue(cx, cloned_chunk)?;
191 }
192 } else if !byob_canceled {
193 let byob_branch_controller = self.byob_branch.get_byte_controller();
197 byob_branch_controller.respond_with_new_view(cx, chunk)?;
198 }
199
200 self.reading.set(false);
202
203 if self.read_again_for_branch_1.get() {
205 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull1Algorithm));
206 } else if self.read_again_for_branch_2.get() {
207 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull2Algorithm));
209 }
210
211 Ok(())
212 }
213
214 pub(crate) fn close_steps(
216 &self,
217 cx: &mut JSContext,
218 chunk: Option<RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>>,
219 ) -> Fallible<()> {
220 self.reading.set(false);
222
223 let byob_canceled = if self.for_branch2 {
225 self.canceled_2.get()
226 } else {
227 self.canceled_1.get()
228 };
229
230 let other_canceled = if self.for_branch2 {
232 self.canceled_1.get()
233 } else {
234 self.canceled_2.get()
235 };
236
237 if !byob_canceled {
239 let byob_branch_controller = self.byob_branch.get_byte_controller();
240 byob_branch_controller.close(cx)?;
241 }
242
243 if !other_canceled {
245 let other_branch_controller = self.other_branch.get_byte_controller();
246 other_branch_controller.close(cx)?;
247 }
248
249 if let Some(chunk_value) = chunk {
251 if chunk_value.is_undefined() {
252 } else {
255 let chunk = chunk_value;
256 assert_eq!(chunk.byte_length(), 0);
258
259 if !byob_canceled {
262 let byob_branch_controller = self.byob_branch.get_byte_controller();
263 byob_branch_controller.respond_with_new_view(cx, &chunk)?;
264 }
265
266 if !other_canceled {
269 let other_branch_controller = self.other_branch.get_byte_controller();
270 if other_branch_controller.get_pending_pull_intos_size() > 0 {
271 other_branch_controller.respond(cx, 0)?;
272 }
273 }
274 }
275 }
276
277 if !byob_canceled || !other_canceled {
279 self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
280 }
281
282 Ok(())
283 }
284 pub(crate) fn error_steps(&self) {
286 self.reading.set(false);
288 }
289
290 pub(crate) fn pull_algorithm(
291 &self,
292 cx: &mut JSContext,
293 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
294 ) {
295 self.tee_underlying_source
296 .pull_algorithm(cx, byte_tee_pull_algorithm);
297 }
298}