script/dom/stream/
byteteereadintorequest.rs1use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsval::UndefinedValue;
11use js::typedarray::ArrayBufferViewU8;
12use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
13use script_bindings::trace::RootedTraceableBox;
14
15use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
16use crate::dom::bindings::buffer_source::HeapBufferSource;
17use crate::dom::bindings::error::{ErrorToJsval, Fallible};
18use crate::dom::bindings::refcounted::Trusted;
19use crate::dom::bindings::reflector::DomGlobal;
20use crate::dom::bindings::root::{Dom, DomRoot};
21use crate::dom::globalscope::GlobalScope;
22use crate::dom::promise::Promise;
23use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
24use crate::dom::stream::readablestream::ReadableStream;
25use crate::microtask::Microtask;
26
27#[derive(JSTraceable, MallocSizeOf)]
28pub(crate) struct ByteTeeReadIntoRequestMicrotask {
29 #[ignore_malloc_size_of = "mozjs"]
30 chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
31 tee_read_request: Trusted<ByteTeeReadIntoRequest>,
32}
33
34impl ByteTeeReadIntoRequestMicrotask {
35 pub(crate) fn microtask_chunk_steps(&self, cx: &mut JSContext) {
36 self.tee_read_request
37 .root()
38 .chunk_steps(&self.chunk, cx)
39 .expect("Failed to enqueue chunk");
40 }
41}
42
43#[dom_struct]
44pub(crate) struct ByteTeeReadIntoRequest {
45 reflector_: Reflector,
46 for_branch2: bool,
47 byob_branch: Dom<ReadableStream>,
48 other_branch: Dom<ReadableStream>,
49 stream: Dom<ReadableStream>,
50 #[conditional_malloc_size_of]
51 read_again_for_branch_1: Rc<Cell<bool>>,
52 #[conditional_malloc_size_of]
53 read_again_for_branch_2: Rc<Cell<bool>>,
54 #[conditional_malloc_size_of]
55 reading: Rc<Cell<bool>>,
56 #[conditional_malloc_size_of]
57 canceled_1: Rc<Cell<bool>>,
58 #[conditional_malloc_size_of]
59 canceled_2: Rc<Cell<bool>>,
60 #[conditional_malloc_size_of]
61 cancel_promise: Rc<Promise>,
62 tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
63}
64impl ByteTeeReadIntoRequest {
65 #[allow(clippy::too_many_arguments)]
66 pub(crate) fn new(
67 cx: &mut JSContext,
68 for_branch2: bool,
69 byob_branch: &ReadableStream,
70 other_branch: &ReadableStream,
71 stream: &ReadableStream,
72 read_again_for_branch_1: Rc<Cell<bool>>,
73 read_again_for_branch_2: Rc<Cell<bool>>,
74 reading: Rc<Cell<bool>>,
75 canceled_1: Rc<Cell<bool>>,
76 canceled_2: Rc<Cell<bool>>,
77 cancel_promise: Rc<Promise>,
78 tee_underlying_source: &ByteTeeUnderlyingSource,
79 global: &GlobalScope,
80 ) -> DomRoot<Self> {
81 reflect_dom_object_with_cx(
82 Box::new(ByteTeeReadIntoRequest {
83 reflector_: Reflector::new(),
84 for_branch2,
85 byob_branch: Dom::from_ref(byob_branch),
86 other_branch: Dom::from_ref(other_branch),
87 stream: Dom::from_ref(stream),
88 read_again_for_branch_1,
89 read_again_for_branch_2,
90 reading,
91 canceled_1,
92 canceled_2,
93 cancel_promise,
94 tee_underlying_source: Dom::from_ref(tee_underlying_source),
95 }),
96 global,
97 cx,
98 )
99 }
100
101 pub(crate) fn enqueue_chunk_steps(
102 &self,
103 cx: &mut JSContext,
104 chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
105 ) {
106 let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
108 chunk,
109 tee_read_request: Trusted::new(self),
110 };
111
112 self.global().enqueue_microtask(
113 cx,
114 Microtask::ReadableStreamByteTeeReadIntoRequest(byte_tee_read_request_chunk),
115 );
116 }
117
118 #[allow(clippy::borrowed_box)]
120 pub(crate) fn chunk_steps(
121 &self,
122 chunk: &HeapBufferSource<ArrayBufferViewU8>,
123 cx: &mut JSContext,
124 ) -> Fallible<()> {
125 self.read_again_for_branch_1.set(false);
127
128 self.read_again_for_branch_2.set(false);
130
131 let byob_canceled = if self.for_branch2 {
133 self.canceled_2.get()
134 } else {
135 self.canceled_1.get()
136 };
137
138 let other_canceled = if self.for_branch2 {
140 self.canceled_1.get()
141 } else {
142 self.canceled_2.get()
143 };
144
145 if !other_canceled {
147 let clone_result = chunk.clone_as_uint8_array(cx);
149
150 if let Err(error) = clone_result {
152 rooted!(&in(cx) let mut error_value = UndefinedValue());
153 error.to_jsval(cx, &self.global(), error_value.handle_mut());
154
155 let byob_branch_controller = self.byob_branch.get_byte_controller();
157 byob_branch_controller.error(cx, error_value.handle());
158
159 let other_branch_controller = self.other_branch.get_byte_controller();
161 other_branch_controller.error(cx, error_value.handle());
162
163 let cancel_result =
165 self.stream
166 .cancel(cx, &self.stream.global(), error_value.handle());
167 self.cancel_promise.resolve_native(cx, &cancel_result);
168
169 return Ok(());
171 } else {
172 let cloned_chunk = clone_result.unwrap();
174
175 if !byob_canceled {
178 let byob_branch_controller = self.byob_branch.get_byte_controller();
179 byob_branch_controller.respond_with_new_view(cx, chunk)?;
180 }
181
182 let other_branch_controller = self.other_branch.get_byte_controller();
184 other_branch_controller.enqueue(cx, cloned_chunk)?;
185 }
186 } else if !byob_canceled {
187 let byob_branch_controller = self.byob_branch.get_byte_controller();
191 byob_branch_controller.respond_with_new_view(cx, chunk)?;
192 }
193
194 self.reading.set(false);
196
197 if self.read_again_for_branch_1.get() {
199 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull1Algorithm));
200 } else if self.read_again_for_branch_2.get() {
201 self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull2Algorithm));
203 }
204
205 Ok(())
206 }
207
208 pub(crate) fn close_steps(
210 &self,
211 cx: &mut JSContext,
212 chunk: Option<RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>>,
213 ) -> Fallible<()> {
214 self.reading.set(false);
216
217 let byob_canceled = if self.for_branch2 {
219 self.canceled_2.get()
220 } else {
221 self.canceled_1.get()
222 };
223
224 let other_canceled = if self.for_branch2 {
226 self.canceled_1.get()
227 } else {
228 self.canceled_2.get()
229 };
230
231 if !byob_canceled {
233 let byob_branch_controller = self.byob_branch.get_byte_controller();
234 byob_branch_controller.close(cx)?;
235 }
236
237 if !other_canceled {
239 let other_branch_controller = self.other_branch.get_byte_controller();
240 other_branch_controller.close(cx)?;
241 }
242
243 if let Some(chunk_value) = chunk {
245 if chunk_value.is_undefined() {
246 } else {
249 let chunk = chunk_value;
250 assert_eq!(chunk.byte_length(), 0);
252
253 if !byob_canceled {
256 let byob_branch_controller = self.byob_branch.get_byte_controller();
257 byob_branch_controller.respond_with_new_view(cx, &chunk)?;
258 }
259
260 if !other_canceled {
263 let other_branch_controller = self.other_branch.get_byte_controller();
264 if other_branch_controller.get_pending_pull_intos_size() > 0 {
265 other_branch_controller.respond(cx, 0)?;
266 }
267 }
268 }
269 }
270
271 if !byob_canceled || !other_canceled {
273 self.cancel_promise.resolve_native(cx, &());
274 }
275
276 Ok(())
277 }
278 pub(crate) fn error_steps(&self) {
280 self.reading.set(false);
282 }
283
284 pub(crate) fn pull_algorithm(
285 &self,
286 cx: &mut JSContext,
287 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
288 ) {
289 self.tee_underlying_source
290 .pull_algorithm(cx, byte_tee_pull_algorithm);
291 }
292}