1use std::cell::{Cell, RefCell};
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{HandleValueArray, Heap, NewArrayObject, Value};
10use js::jsval::{ObjectValue, UndefinedValue};
11use js::rust::HandleValue as SafeHandleValue;
12use js::typedarray::ArrayBufferViewU8;
13
14use super::bindings::buffer_source::HeapBufferSource;
15use super::bindings::root::{DomRoot, MutNullableDom};
16use super::byteteereadintorequest::ByteTeeReadIntoRequest;
17use super::readablestream::ReaderType;
18use super::readablestreambyobreader::ReadIntoRequest;
19use super::types::ReadableStream;
20use crate::dom::bindings::error::{Error, Fallible};
21use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
22use crate::dom::bindings::root::Dom;
23use crate::dom::byteteereadrequest::ByteTeeReadRequest;
24use crate::dom::globalscope::GlobalScope;
25use crate::dom::promise::Promise;
26use crate::dom::readablestreamdefaultreader::ReadRequest;
27use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
28
29#[derive(JSTraceable, MallocSizeOf)]
30pub(crate) enum ByteTeeCancelAlgorithm {
31 Cancel1Algorithm,
32 Cancel2Algorithm,
33}
34
35#[derive(Clone, JSTraceable, MallocSizeOf)]
36pub(crate) enum ByteTeePullAlgorithm {
37 Pull1Algorithm,
38 Pull2Algorithm,
39}
40
41#[dom_struct]
42pub(crate) struct ByteTeeUnderlyingSource {
44 reflector_: Reflector,
45 #[ignore_malloc_size_of = "Rc"]
46 reader: Rc<RefCell<ReaderType>>,
47 stream: Dom<ReadableStream>,
48 branch_1: MutNullableDom<ReadableStream>,
49 branch_2: MutNullableDom<ReadableStream>,
50 #[ignore_malloc_size_of = "Rc"]
51 read_again_for_branch_1: Rc<Cell<bool>>,
52 #[ignore_malloc_size_of = "Rc"]
53 read_again_for_branch_2: Rc<Cell<bool>>,
54 #[ignore_malloc_size_of = "Rc"]
55 reading: Rc<Cell<bool>>,
56 #[ignore_malloc_size_of = "Rc"]
57 canceled_1: Rc<Cell<bool>>,
58 #[ignore_malloc_size_of = "Rc"]
59 canceled_2: Rc<Cell<bool>>,
60 #[ignore_malloc_size_of = "Rc"]
61 #[allow(clippy::redundant_allocation)]
62 reason_1: Rc<Box<Heap<Value>>>,
63 #[ignore_malloc_size_of = "Rc"]
64 #[allow(clippy::redundant_allocation)]
65 reason_2: Rc<Box<Heap<Value>>>,
66 #[ignore_malloc_size_of = "Rc"]
67 cancel_promise: Rc<Promise>,
68 #[ignore_malloc_size_of = "Rc"]
69 reader_version: Rc<Cell<u64>>,
70 tee_cancel_algorithm: ByteTeeCancelAlgorithm,
71 byte_tee_pull_algorithm: ByteTeePullAlgorithm,
72}
73
74impl ByteTeeUnderlyingSource {
75 #[allow(clippy::too_many_arguments)]
76 #[allow(clippy::redundant_allocation)]
77 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
78 pub(crate) fn new(
79 reader: Rc<RefCell<ReaderType>>,
80 stream: &ReadableStream,
81 reading: Rc<Cell<bool>>,
82 read_again_for_branch_1: Rc<Cell<bool>>,
83 read_again_for_branch_2: Rc<Cell<bool>>,
84 canceled_1: Rc<Cell<bool>>,
85 canceled_2: Rc<Cell<bool>>,
86 reason_1: Rc<Box<Heap<Value>>>,
87 reason_2: Rc<Box<Heap<Value>>>,
88 cancel_promise: Rc<Promise>,
89 reader_version: Rc<Cell<u64>>,
90 tee_cancel_algorithm: ByteTeeCancelAlgorithm,
91 byte_tee_pull_algorithm: ByteTeePullAlgorithm,
92 can_gc: CanGc,
93 ) -> DomRoot<ByteTeeUnderlyingSource> {
94 reflect_dom_object(
95 Box::new(ByteTeeUnderlyingSource {
96 reflector_: Reflector::new(),
97 reader,
98 stream: Dom::from_ref(stream),
99 branch_1: MutNullableDom::new(None),
100 branch_2: MutNullableDom::new(None),
101 read_again_for_branch_1,
102 read_again_for_branch_2,
103 reading,
104 canceled_1,
105 canceled_2,
106 reason_1,
107 reason_2,
108 cancel_promise,
109 reader_version,
110 tee_cancel_algorithm,
111 byte_tee_pull_algorithm,
112 }),
113 &*stream.global(),
114 can_gc,
115 )
116 }
117
118 pub(crate) fn set_branch_1(&self, stream: &ReadableStream) {
119 self.branch_1.set(Some(stream));
120 }
121
122 pub(crate) fn set_branch_2(&self, stream: &ReadableStream) {
123 self.branch_2.set(Some(stream));
124 }
125
126 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
127 pub(crate) fn forward_reader_error(&self, this_reader: Rc<RefCell<ReaderType>>, can_gc: CanGc) {
128 let this_reader = this_reader.borrow_mut();
129 match &*this_reader {
130 ReaderType::Default(reader) => {
131 let expected_version = self.reader_version.get();
132 reader
134 .get()
135 .expect("Reader should be set.")
136 .byte_tee_append_native_handler_to_closed_promise(
137 &self.branch_1.get().expect("Branch 1 should be set."),
138 &self.branch_2.get().expect("Branch 2 should be set."),
139 self.canceled_1.clone(),
140 self.canceled_2.clone(),
141 self.cancel_promise.clone(),
142 self.reader_version.clone(),
143 expected_version,
144 can_gc,
145 );
146 },
147 ReaderType::BYOB(reader) => {
148 let expected_version = self.reader_version.get();
149 reader
151 .get()
152 .expect("Reader should be set.")
153 .byte_tee_append_native_handler_to_closed_promise(
154 &self.branch_1.get().expect("Branch 1 should be set."),
155 &self.branch_2.get().expect("Branch 2 should be set."),
156 self.canceled_1.clone(),
157 self.canceled_2.clone(),
158 self.cancel_promise.clone(),
159 self.reader_version.clone(),
160 expected_version,
161 can_gc,
162 );
163 },
164 }
165 }
166
167 pub(crate) fn pull_with_default_reader(
168 &self,
169 cx: SafeJSContext,
170 global: &GlobalScope,
171 can_gc: CanGc,
172 ) -> Fallible<()> {
173 let mut reader = self.reader.borrow_mut();
174 match &*reader {
175 ReaderType::BYOB(byte_reader) => {
176 assert!(
178 byte_reader
179 .get()
180 .expect("Reader should be set.")
181 .get_num_read_into_requests() ==
182 0
183 );
184
185 byte_reader
187 .get()
188 .expect("Reader should be set.")
189 .release(can_gc)?;
190
191 let default_reader = self
193 .stream
194 .acquire_default_reader(can_gc)
195 .expect("AcquireReadableStreamDefaultReader should not fail");
196
197 *reader = ReaderType::Default(MutNullableDom::new(Some(&default_reader)));
198 self.reader_version
199 .set(self.reader_version.get().wrapping_add(1));
200 drop(reader);
201
202 self.forward_reader_error(self.reader.clone(), can_gc);
204
205 return self.pull_with_default_reader(cx, global, can_gc);
207 },
208 ReaderType::Default(reader) => {
209 let byte_tee_read_request = ByteTeeReadRequest::new(
210 &self.branch_1.get().expect("Branch 1 should be set."),
211 &self.branch_2.get().expect("Branch 2 should be set."),
212 &self.stream,
213 self.read_again_for_branch_1.clone(),
214 self.read_again_for_branch_2.clone(),
215 self.reading.clone(),
216 self.canceled_1.clone(),
217 self.canceled_2.clone(),
218 self.cancel_promise.clone(),
219 self,
220 global,
221 can_gc,
222 );
223
224 let read_request = ReadRequest::ByteTee {
225 byte_tee_read_request: Dom::from_ref(&byte_tee_read_request),
226 };
227
228 reader
229 .get()
230 .expect("Reader should be set.")
231 .read(cx, &read_request, can_gc);
232 },
233 }
234
235 Ok(())
236 }
237
238 pub(crate) fn pull_with_byob_reader(
239 &self,
240 view: HeapBufferSource<ArrayBufferViewU8>,
241 for_branch2: bool,
242 cx: SafeJSContext,
243 global: &GlobalScope,
244 can_gc: CanGc,
245 ) {
246 let mut reader = self.reader.borrow_mut();
247 match &*reader {
248 ReaderType::BYOB(reader) => {
249 let byob_branch = if for_branch2 {
251 self.branch_2.get().expect("Branch 2 should be set.")
252 } else {
253 self.branch_1.get().expect("Branch 1 should be set.")
254 };
255
256 let other_branch = if for_branch2 {
258 self.branch_1.get().expect("Branch 1 should be set.")
259 } else {
260 self.branch_2.get().expect("Branch 2 should be set.")
261 };
262
263 let byte_tee_read_into_request = ByteTeeReadIntoRequest::new(
265 for_branch2,
266 &byob_branch,
267 &other_branch,
268 &self.stream,
269 self.read_again_for_branch_1.clone(),
270 self.read_again_for_branch_2.clone(),
271 self.reading.clone(),
272 self.canceled_1.clone(),
273 self.canceled_2.clone(),
274 self.cancel_promise.clone(),
275 self,
276 global,
277 can_gc,
278 );
279
280 let read_into_request = ReadIntoRequest::ByteTee {
281 byte_tee_read_into_request: Dom::from_ref(&byte_tee_read_into_request),
282 };
283
284 reader.get().expect("Reader should be set.").read(
286 cx,
287 view,
288 1,
289 &read_into_request,
290 can_gc,
291 );
292 },
293 ReaderType::Default(default_reader) => {
294 assert!(
297 default_reader
298 .get()
299 .expect("Reader should be set.")
300 .get_num_read_requests() ==
301 0
302 );
303
304 default_reader
306 .get()
307 .expect("Reader should be set.")
308 .release(can_gc)
309 .expect("Release should be successful.");
310
311 let byob_reader = self
313 .stream
314 .acquire_byob_reader(can_gc)
315 .expect("Reader should be set.");
316
317 *reader = ReaderType::BYOB(MutNullableDom::new(Some(&byob_reader)));
318 self.reader_version
319 .set(self.reader_version.get().wrapping_add(1));
320
321 drop(reader);
322
323 self.forward_reader_error(self.reader.clone(), can_gc);
325
326 self.pull_with_byob_reader(view, for_branch2, cx, global, can_gc);
328 },
329 }
330 }
331
332 pub(crate) fn pull_algorithm(
334 &self,
335 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
336 can_gc: CanGc,
337 ) -> Rc<Promise> {
338 let cx = GlobalScope::get_cx();
339
340 let pull_algorithm =
341 byte_tee_pull_algorithm.unwrap_or(self.byte_tee_pull_algorithm.clone());
342
343 match pull_algorithm {
344 ByteTeePullAlgorithm::Pull1Algorithm => {
345 if self.reading.get() {
347 self.read_again_for_branch_1.set(true);
349 rooted!(in(*cx) let mut rval = UndefinedValue());
351 return Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc);
352 }
353
354 self.reading.set(true);
356
357 let byob_branch_controller = self
359 .branch_1
360 .get()
361 .expect("Branch 1 should be set.")
362 .get_byte_controller();
363 let byob_request = byob_branch_controller
364 .get_byob_request(cx, can_gc)
365 .expect("Byob request should be set.");
366
367 match byob_request {
368 None => {
370 self.pull_with_default_reader(cx, &self.stream.global(), can_gc)
371 .expect("Pull with default reader should be successful.");
372 },
373 Some(request) => {
374 let view = request.get_view();
376
377 self.pull_with_byob_reader(view, false, cx, &self.stream.global(), can_gc);
378 },
379 }
380
381 rooted!(in(*cx) let mut rval = UndefinedValue());
383 Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc)
384 },
385 ByteTeePullAlgorithm::Pull2Algorithm => {
386 if self.reading.get() {
388 self.read_again_for_branch_2.set(true);
390
391 rooted!(in(*cx) let mut rval = UndefinedValue());
393 return Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc);
394 }
395
396 self.reading.set(true);
398
399 let byob_branch_controller = self
401 .branch_2
402 .get()
403 .expect("Branch 2 should be set.")
404 .get_byte_controller();
405 let byob_request = byob_branch_controller
406 .get_byob_request(cx, can_gc)
407 .expect("Byob request should be set.");
408
409 match byob_request {
410 None => {
411 self.pull_with_default_reader(cx, &self.stream.global(), can_gc)
412 .expect("Pull with default reader should be successful.");
413 },
414 Some(request) => {
415 let view = request.get_view();
417
418 self.pull_with_byob_reader(view, true, cx, &self.stream.global(), can_gc);
419 },
420 }
421
422 rooted!(in(*cx) let mut rval = UndefinedValue());
424 Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc)
425 },
426 }
427 }
428
429 pub(crate) fn cancel_algorithm(
434 &self,
435 reason: SafeHandleValue,
436 can_gc: CanGc,
437 ) -> Option<Result<Rc<Promise>, Error>> {
438 match self.tee_cancel_algorithm {
439 ByteTeeCancelAlgorithm::Cancel1Algorithm => {
440 self.canceled_1.set(true);
442
443 self.reason_1.set(reason.get());
445
446 if self.canceled_2.get() {
448 self.resolve_cancel_promise(can_gc);
449 }
450
451 Some(Ok(self.cancel_promise.clone()))
453 },
454 ByteTeeCancelAlgorithm::Cancel2Algorithm => {
455 self.canceled_2.set(true);
457
458 self.reason_2.set(reason.get());
460
461 if self.canceled_1.get() {
463 self.resolve_cancel_promise(can_gc);
464 }
465 Some(Ok(self.cancel_promise.clone()))
467 },
468 }
469 }
470
471 #[expect(unsafe_code)]
472 fn resolve_cancel_promise(&self, can_gc: CanGc) {
473 let cx = GlobalScope::get_cx();
475 rooted_vec!(let mut reasons_values);
476 reasons_values.push(self.reason_1.get());
477 reasons_values.push(self.reason_2.get());
478
479 let reasons_values_array = HandleValueArray::from(&reasons_values);
480 rooted!(in(*cx) let reasons = unsafe { NewArrayObject(*cx, &reasons_values_array) });
481 rooted!(in(*cx) let reasons_value = ObjectValue(reasons.get()));
482
483 let cancel_result =
485 self.stream
486 .cancel(cx, &self.stream.global(), reasons_value.handle(), can_gc);
487
488 self.cancel_promise.resolve_native(&cancel_result, can_gc);
490 }
491}