1use std::cell::{Cell, RefCell};
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::{HandleValueArray, Heap, NewArrayObject, Value};
11use js::jsval::ObjectValue;
12use js::rust::HandleValue as SafeHandleValue;
13use js::typedarray::ArrayBufferViewU8;
14use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
15
16use super::byteteereadintorequest::ByteTeeReadIntoRequest;
17use super::readablestream::ReaderType;
18use super::readablestreambyobreader::ReadIntoRequest;
19use crate::dom::bindings::buffer_source::HeapBufferSource;
20use crate::dom::bindings::error::{Error, Fallible};
21use crate::dom::bindings::reflector::DomGlobal;
22use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
23use crate::dom::globalscope::GlobalScope;
24use crate::dom::promise::Promise;
25use crate::dom::stream::byteteereadrequest::ByteTeeReadRequest;
26use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
27use crate::dom::types::ReadableStream;
28
29#[derive(JSTraceable, MallocSizeOf)]
30pub(crate) enum ByteTeeCancelAlgorithm {
31 Cancel1Algorithm,
32 Cancel2Algorithm,
33}
34
35#[derive(Clone, JSTraceable, MallocSizeOf)]
36pub(crate) enum ByteTeePullAlgorithm {
37 Pull1Algorithm,
38 Pull2Algorithm,
39}
40
41#[dom_struct]
42pub(crate) struct ByteTeeUnderlyingSource {
44 reflector_: Reflector,
45 #[conditional_malloc_size_of]
46 reader: Rc<RefCell<ReaderType>>,
47 stream: Dom<ReadableStream>,
48 branch_1: MutNullableDom<ReadableStream>,
49 branch_2: MutNullableDom<ReadableStream>,
50 #[conditional_malloc_size_of]
51 read_again_for_branch_1: Rc<Cell<bool>>,
52 #[conditional_malloc_size_of]
53 read_again_for_branch_2: Rc<Cell<bool>>,
54 #[conditional_malloc_size_of]
55 reading: Rc<Cell<bool>>,
56 #[conditional_malloc_size_of]
57 canceled_1: Rc<Cell<bool>>,
58 #[conditional_malloc_size_of]
59 canceled_2: Rc<Cell<bool>>,
60 #[ignore_malloc_size_of = "Mozjs"]
61 reason_1: Rc<Heap<Value>>,
62 #[ignore_malloc_size_of = "Mozjs"]
63 reason_2: Rc<Heap<Value>>,
64 #[conditional_malloc_size_of]
65 cancel_promise: Rc<Promise>,
66 #[conditional_malloc_size_of]
67 reader_version: Rc<Cell<u64>>,
68 tee_cancel_algorithm: ByteTeeCancelAlgorithm,
69 byte_tee_pull_algorithm: ByteTeePullAlgorithm,
70}
71
72impl ByteTeeUnderlyingSource {
73 #[allow(clippy::too_many_arguments)]
74 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
75 pub(crate) fn new(
76 cx: &mut JSContext,
77 reader: Rc<RefCell<ReaderType>>,
78 stream: &ReadableStream,
79 reading: Rc<Cell<bool>>,
80 read_again_for_branch_1: Rc<Cell<bool>>,
81 read_again_for_branch_2: Rc<Cell<bool>>,
82 canceled_1: Rc<Cell<bool>>,
83 canceled_2: Rc<Cell<bool>>,
84 reason_1: Rc<Heap<Value>>,
85 reason_2: Rc<Heap<Value>>,
86 cancel_promise: Rc<Promise>,
87 reader_version: Rc<Cell<u64>>,
88 tee_cancel_algorithm: ByteTeeCancelAlgorithm,
89 byte_tee_pull_algorithm: ByteTeePullAlgorithm,
90 ) -> DomRoot<ByteTeeUnderlyingSource> {
91 reflect_dom_object_with_cx(
92 Box::new(ByteTeeUnderlyingSource {
93 reflector_: Reflector::new(),
94 reader,
95 stream: Dom::from_ref(stream),
96 branch_1: MutNullableDom::new(None),
97 branch_2: MutNullableDom::new(None),
98 read_again_for_branch_1,
99 read_again_for_branch_2,
100 reading,
101 canceled_1,
102 canceled_2,
103 reason_1,
104 reason_2,
105 cancel_promise,
106 reader_version,
107 tee_cancel_algorithm,
108 byte_tee_pull_algorithm,
109 }),
110 &*stream.global(),
111 cx,
112 )
113 }
114
115 pub(crate) fn set_branch_1(&self, stream: &ReadableStream) {
116 self.branch_1.set(Some(stream));
117 }
118
119 pub(crate) fn set_branch_2(&self, stream: &ReadableStream) {
120 self.branch_2.set(Some(stream));
121 }
122
123 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
124 pub(crate) fn forward_reader_error(
125 &self,
126 cx: &mut JSContext,
127 this_reader: Rc<RefCell<ReaderType>>,
128 ) {
129 let this_reader = this_reader.borrow_mut();
130 match &*this_reader {
131 ReaderType::Default(reader) => {
132 let expected_version = self.reader_version.get();
133 reader
135 .get()
136 .expect("Reader should be set.")
137 .byte_tee_append_native_handler_to_closed_promise(
138 cx,
139 &self.branch_1.get().expect("Branch 1 should be set."),
140 &self.branch_2.get().expect("Branch 2 should be set."),
141 self.canceled_1.clone(),
142 self.canceled_2.clone(),
143 self.cancel_promise.clone(),
144 self.reader_version.clone(),
145 expected_version,
146 );
147 },
148 ReaderType::BYOB(reader) => {
149 let expected_version = self.reader_version.get();
150 reader
152 .get()
153 .expect("Reader should be set.")
154 .byte_tee_append_native_handler_to_closed_promise(
155 cx,
156 &self.branch_1.get().expect("Branch 1 should be set."),
157 &self.branch_2.get().expect("Branch 2 should be set."),
158 self.canceled_1.clone(),
159 self.canceled_2.clone(),
160 self.cancel_promise.clone(),
161 self.reader_version.clone(),
162 expected_version,
163 );
164 },
165 }
166 }
167
168 fn pull_with_default_reader(&self, cx: &mut JSContext, global: &GlobalScope) -> Fallible<()> {
169 let mut reader = self.reader.borrow_mut();
170 match &*reader {
171 ReaderType::BYOB(byte_reader) => {
172 assert!(
174 byte_reader
175 .get()
176 .expect("Reader should be set.")
177 .get_num_read_into_requests() ==
178 0
179 );
180
181 byte_reader
183 .get()
184 .expect("Reader should be set.")
185 .release(cx)?;
186
187 let default_reader = self
189 .stream
190 .acquire_default_reader(cx)
191 .expect("AcquireReadableStreamDefaultReader should not fail");
192
193 *reader = ReaderType::Default(MutNullableDom::new(Some(&default_reader)));
194 self.reader_version
195 .set(self.reader_version.get().wrapping_add(1));
196 drop(reader);
197
198 self.forward_reader_error(cx, self.reader.clone());
200
201 return self.pull_with_default_reader(cx, global);
203 },
204 ReaderType::Default(reader) => {
205 let byte_tee_read_request = ByteTeeReadRequest::new(
206 cx,
207 &self.branch_1.get().expect("Branch 1 should be set."),
208 &self.branch_2.get().expect("Branch 2 should be set."),
209 &self.stream,
210 self.read_again_for_branch_1.clone(),
211 self.read_again_for_branch_2.clone(),
212 self.reading.clone(),
213 self.canceled_1.clone(),
214 self.canceled_2.clone(),
215 self.cancel_promise.clone(),
216 self,
217 global,
218 );
219
220 let read_request = ReadRequest::ByteTee {
221 byte_tee_read_request: Dom::from_ref(&byte_tee_read_request),
222 };
223
224 reader
225 .get()
226 .expect("Reader should be set.")
227 .read(cx, &read_request);
228 },
229 }
230
231 Ok(())
232 }
233
234 fn pull_with_byob_reader(
235 &self,
236 cx: &mut JSContext,
237 view: &HeapBufferSource<ArrayBufferViewU8>,
238 for_branch2: bool,
239 global: &GlobalScope,
240 ) {
241 let mut reader = self.reader.borrow_mut();
242 match &*reader {
243 ReaderType::BYOB(reader) => {
244 let byob_branch = if for_branch2 {
246 self.branch_2.get().expect("Branch 2 should be set.")
247 } else {
248 self.branch_1.get().expect("Branch 1 should be set.")
249 };
250
251 let other_branch = if for_branch2 {
253 self.branch_1.get().expect("Branch 1 should be set.")
254 } else {
255 self.branch_2.get().expect("Branch 2 should be set.")
256 };
257
258 let byte_tee_read_into_request = ByteTeeReadIntoRequest::new(
260 cx,
261 for_branch2,
262 &byob_branch,
263 &other_branch,
264 &self.stream,
265 self.read_again_for_branch_1.clone(),
266 self.read_again_for_branch_2.clone(),
267 self.reading.clone(),
268 self.canceled_1.clone(),
269 self.canceled_2.clone(),
270 self.cancel_promise.clone(),
271 self,
272 global,
273 );
274
275 let read_into_request = ReadIntoRequest::ByteTee {
276 byte_tee_read_into_request: Dom::from_ref(&byte_tee_read_into_request),
277 };
278
279 reader
281 .get()
282 .expect("Reader should be set.")
283 .read(cx, view, 1, &read_into_request);
284 },
285 ReaderType::Default(default_reader) => {
286 assert!(
289 default_reader
290 .get()
291 .expect("Reader should be set.")
292 .get_num_read_requests() ==
293 0
294 );
295
296 default_reader
298 .get()
299 .expect("Reader should be set.")
300 .release(cx)
301 .expect("Release should be successful.");
302
303 let byob_reader = self
305 .stream
306 .acquire_byob_reader(cx)
307 .expect("Reader should be set.");
308
309 *reader = ReaderType::BYOB(MutNullableDom::new(Some(&byob_reader)));
310 self.reader_version
311 .set(self.reader_version.get().wrapping_add(1));
312
313 drop(reader);
314
315 self.forward_reader_error(cx, self.reader.clone());
317
318 self.pull_with_byob_reader(cx, view, for_branch2, global);
320 },
321 }
322 }
323
324 pub(crate) fn pull_algorithm(
326 &self,
327 cx: &mut JSContext,
328 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
329 ) -> Rc<Promise> {
330 let pull_algorithm =
331 byte_tee_pull_algorithm.unwrap_or(self.byte_tee_pull_algorithm.clone());
332
333 match pull_algorithm {
334 ByteTeePullAlgorithm::Pull1Algorithm => {
335 if self.reading.get() {
337 self.read_again_for_branch_1.set(true);
339 return Promise::new_resolved(cx, &self.stream.global(), ());
341 }
342
343 self.reading.set(true);
345
346 let byob_branch_controller = self
348 .branch_1
349 .get()
350 .expect("Branch 1 should be set.")
351 .get_byte_controller();
352 let byob_request = byob_branch_controller
353 .get_byob_request(cx)
354 .expect("Byob request should be set.");
355
356 match byob_request {
357 None => {
359 self.pull_with_default_reader(cx, &self.stream.global())
360 .expect("Pull with default reader should be successful.");
361 },
362 Some(request) => {
363 let view = request.get_view();
365
366 self.pull_with_byob_reader(cx, &view, false, &self.stream.global());
367 },
368 }
369
370 Promise::new_resolved(cx, &self.stream.global(), ())
372 },
373 ByteTeePullAlgorithm::Pull2Algorithm => {
374 if self.reading.get() {
376 self.read_again_for_branch_2.set(true);
378
379 return Promise::new_resolved(cx, &self.stream.global(), ());
381 }
382
383 self.reading.set(true);
385
386 let byob_branch_controller = self
388 .branch_2
389 .get()
390 .expect("Branch 2 should be set.")
391 .get_byte_controller();
392 let byob_request = byob_branch_controller
393 .get_byob_request(cx)
394 .expect("Byob request should be set.");
395
396 match byob_request {
397 None => {
398 self.pull_with_default_reader(cx, &self.stream.global())
399 .expect("Pull with default reader should be successful.");
400 },
401 Some(request) => {
402 let view = request.get_view();
404
405 self.pull_with_byob_reader(cx, &view, true, &self.stream.global());
406 },
407 }
408
409 Promise::new_resolved(cx, &self.stream.global(), ())
411 },
412 }
413 }
414
415 pub(crate) fn cancel_algorithm(
420 &self,
421 cx: &mut JSContext,
422 reason: SafeHandleValue,
423 ) -> Option<Result<Rc<Promise>, Error>> {
424 match self.tee_cancel_algorithm {
425 ByteTeeCancelAlgorithm::Cancel1Algorithm => {
426 self.canceled_1.set(true);
428
429 self.reason_1.set(reason.get());
431
432 if self.canceled_2.get() {
434 self.resolve_cancel_promise(cx);
435 }
436
437 Some(Ok(self.cancel_promise.clone()))
439 },
440 ByteTeeCancelAlgorithm::Cancel2Algorithm => {
441 self.canceled_2.set(true);
443
444 self.reason_2.set(reason.get());
446
447 if self.canceled_1.get() {
449 self.resolve_cancel_promise(cx);
450 }
451 Some(Ok(self.cancel_promise.clone()))
453 },
454 }
455 }
456
457 #[expect(unsafe_code)]
458 fn resolve_cancel_promise(&self, cx: &mut JSContext) {
459 rooted_vec!(let mut reasons_values);
461 reasons_values.push(self.reason_1.get());
462 reasons_values.push(self.reason_2.get());
463
464 let reasons_values_array = HandleValueArray::from(&reasons_values);
465 rooted!(&in(cx) let reasons = unsafe { NewArrayObject(cx.raw_cx(), &reasons_values_array) });
466 rooted!(&in(cx) let reasons_value = ObjectValue(reasons.get()));
467
468 let cancel_result = self
470 .stream
471 .cancel(cx, &self.stream.global(), reasons_value.handle());
472
473 self.cancel_promise.resolve_native(cx, &cancel_result);
475 }
476}