1use std::cell::{Cell, RefCell};
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{HandleValueArray, Heap, NewArrayObject, Value};
10use js::jsval::{ObjectValue, UndefinedValue};
11use js::rust::HandleValue as SafeHandleValue;
12use js::typedarray::ArrayBufferViewU8;
13
14use super::byteteereadintorequest::ByteTeeReadIntoRequest;
15use super::readablestream::ReaderType;
16use super::readablestreambyobreader::ReadIntoRequest;
17use crate::dom::bindings::buffer_source::HeapBufferSource;
18use crate::dom::bindings::error::{Error, Fallible};
19use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
20use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
21use crate::dom::globalscope::GlobalScope;
22use crate::dom::promise::Promise;
23use crate::dom::stream::byteteereadrequest::ByteTeeReadRequest;
24use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
25use crate::dom::types::ReadableStream;
26use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
27
/// Selects which branch's cancel steps a [`ByteTeeUnderlyingSource`] runs in
/// `cancel_algorithm`.
#[derive(JSTraceable, MallocSizeOf)]
pub(crate) enum ByteTeeCancelAlgorithm {
    /// Cancel steps for branch 1: records `canceled_1` / `reason_1`.
    Cancel1Algorithm,
    /// Cancel steps for branch 2: records `canceled_2` / `reason_2`.
    Cancel2Algorithm,
}
33
/// Selects which branch's pull steps a [`ByteTeeUnderlyingSource`] runs in
/// `pull_algorithm`.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub(crate) enum ByteTeePullAlgorithm {
    /// Pull steps on behalf of branch 1.
    Pull1Algorithm,
    /// Pull steps on behalf of branch 2.
    Pull2Algorithm,
}
39
/// Underlying source backing a branch of a teed readable byte stream.
///
/// Most of the state lives behind `Rc` handles that are passed in through
/// `new`; presumably the same handles are shared with the sibling branch's
/// source and with the read requests created while pulling (confirm at the
/// call site that constructs the tee).
#[dom_struct]
pub(crate) struct ByteTeeUnderlyingSource {
    reflector_: Reflector,
    /// The reader currently attached to `stream` (default or BYOB). It is
    /// swapped in place by the `pull_with_*` methods.
    #[ignore_malloc_size_of = "Rc"]
    reader: Rc<RefCell<ReaderType>>,
    /// The stream that readers are acquired from and that is canceled once
    /// both branches have been canceled.
    stream: Dom<ReadableStream>,
    /// First branch; unset until `set_branch_1` is called.
    branch_1: MutNullableDom<ReadableStream>,
    /// Second branch; unset until `set_branch_2` is called.
    branch_2: MutNullableDom<ReadableStream>,
    /// Set when branch 1 asks to pull while a read is already in flight.
    #[ignore_malloc_size_of = "Rc"]
    read_again_for_branch_1: Rc<Cell<bool>>,
    /// Set when branch 2 asks to pull while a read is already in flight.
    #[ignore_malloc_size_of = "Rc"]
    read_again_for_branch_2: Rc<Cell<bool>>,
    /// Set by `pull_algorithm` when it starts a read.
    #[ignore_malloc_size_of = "Rc"]
    reading: Rc<Cell<bool>>,
    /// Set once branch 1's cancel steps have run.
    #[ignore_malloc_size_of = "Rc"]
    canceled_1: Rc<Cell<bool>>,
    /// Set once branch 2's cancel steps have run.
    #[ignore_malloc_size_of = "Rc"]
    canceled_2: Rc<Cell<bool>>,
    /// Cancel reason recorded for branch 1.
    // NOTE(review): the `Box` presumably keeps the `Heap<Value>` at a stable
    // address, which is why `redundant_allocation` is allowed — confirm.
    #[ignore_malloc_size_of = "Rc"]
    #[allow(clippy::redundant_allocation)]
    reason_1: Rc<Box<Heap<Value>>>,
    /// Cancel reason recorded for branch 2.
    #[ignore_malloc_size_of = "Rc"]
    #[allow(clippy::redundant_allocation)]
    reason_2: Rc<Box<Heap<Value>>>,
    /// Promise returned from `cancel_algorithm`; resolved with the result of
    /// canceling `stream` once both branches are canceled.
    #[ignore_malloc_size_of = "Rc"]
    cancel_promise: Rc<Promise>,
    /// Counter incremented (wrapping) every time `reader` is replaced.
    #[ignore_malloc_size_of = "Rc"]
    reader_version: Rc<Cell<u64>>,
    /// Which branch's cancel steps `cancel_algorithm` performs.
    tee_cancel_algorithm: ByteTeeCancelAlgorithm,
    /// Pull steps used by `pull_algorithm` when the caller passes `None`.
    byte_tee_pull_algorithm: ByteTeePullAlgorithm,
}
72
73impl ByteTeeUnderlyingSource {
74 #[allow(clippy::too_many_arguments)]
75 #[allow(clippy::redundant_allocation)]
76 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
77 pub(crate) fn new(
78 reader: Rc<RefCell<ReaderType>>,
79 stream: &ReadableStream,
80 reading: Rc<Cell<bool>>,
81 read_again_for_branch_1: Rc<Cell<bool>>,
82 read_again_for_branch_2: Rc<Cell<bool>>,
83 canceled_1: Rc<Cell<bool>>,
84 canceled_2: Rc<Cell<bool>>,
85 reason_1: Rc<Box<Heap<Value>>>,
86 reason_2: Rc<Box<Heap<Value>>>,
87 cancel_promise: Rc<Promise>,
88 reader_version: Rc<Cell<u64>>,
89 tee_cancel_algorithm: ByteTeeCancelAlgorithm,
90 byte_tee_pull_algorithm: ByteTeePullAlgorithm,
91 can_gc: CanGc,
92 ) -> DomRoot<ByteTeeUnderlyingSource> {
93 reflect_dom_object(
94 Box::new(ByteTeeUnderlyingSource {
95 reflector_: Reflector::new(),
96 reader,
97 stream: Dom::from_ref(stream),
98 branch_1: MutNullableDom::new(None),
99 branch_2: MutNullableDom::new(None),
100 read_again_for_branch_1,
101 read_again_for_branch_2,
102 reading,
103 canceled_1,
104 canceled_2,
105 reason_1,
106 reason_2,
107 cancel_promise,
108 reader_version,
109 tee_cancel_algorithm,
110 byte_tee_pull_algorithm,
111 }),
112 &*stream.global(),
113 can_gc,
114 )
115 }
116
117 pub(crate) fn set_branch_1(&self, stream: &ReadableStream) {
118 self.branch_1.set(Some(stream));
119 }
120
121 pub(crate) fn set_branch_2(&self, stream: &ReadableStream) {
122 self.branch_2.set(Some(stream));
123 }
124
125 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
126 pub(crate) fn forward_reader_error(&self, this_reader: Rc<RefCell<ReaderType>>, can_gc: CanGc) {
127 let this_reader = this_reader.borrow_mut();
128 match &*this_reader {
129 ReaderType::Default(reader) => {
130 let expected_version = self.reader_version.get();
131 reader
133 .get()
134 .expect("Reader should be set.")
135 .byte_tee_append_native_handler_to_closed_promise(
136 &self.branch_1.get().expect("Branch 1 should be set."),
137 &self.branch_2.get().expect("Branch 2 should be set."),
138 self.canceled_1.clone(),
139 self.canceled_2.clone(),
140 self.cancel_promise.clone(),
141 self.reader_version.clone(),
142 expected_version,
143 can_gc,
144 );
145 },
146 ReaderType::BYOB(reader) => {
147 let expected_version = self.reader_version.get();
148 reader
150 .get()
151 .expect("Reader should be set.")
152 .byte_tee_append_native_handler_to_closed_promise(
153 &self.branch_1.get().expect("Branch 1 should be set."),
154 &self.branch_2.get().expect("Branch 2 should be set."),
155 self.canceled_1.clone(),
156 self.canceled_2.clone(),
157 self.cancel_promise.clone(),
158 self.reader_version.clone(),
159 expected_version,
160 can_gc,
161 );
162 },
163 }
164 }
165
166 pub(crate) fn pull_with_default_reader(
167 &self,
168 cx: SafeJSContext,
169 global: &GlobalScope,
170 can_gc: CanGc,
171 ) -> Fallible<()> {
172 let mut reader = self.reader.borrow_mut();
173 match &*reader {
174 ReaderType::BYOB(byte_reader) => {
175 assert!(
177 byte_reader
178 .get()
179 .expect("Reader should be set.")
180 .get_num_read_into_requests() ==
181 0
182 );
183
184 byte_reader
186 .get()
187 .expect("Reader should be set.")
188 .release(can_gc)?;
189
190 let default_reader = self
192 .stream
193 .acquire_default_reader(can_gc)
194 .expect("AcquireReadableStreamDefaultReader should not fail");
195
196 *reader = ReaderType::Default(MutNullableDom::new(Some(&default_reader)));
197 self.reader_version
198 .set(self.reader_version.get().wrapping_add(1));
199 drop(reader);
200
201 self.forward_reader_error(self.reader.clone(), can_gc);
203
204 return self.pull_with_default_reader(cx, global, can_gc);
206 },
207 ReaderType::Default(reader) => {
208 let byte_tee_read_request = ByteTeeReadRequest::new(
209 &self.branch_1.get().expect("Branch 1 should be set."),
210 &self.branch_2.get().expect("Branch 2 should be set."),
211 &self.stream,
212 self.read_again_for_branch_1.clone(),
213 self.read_again_for_branch_2.clone(),
214 self.reading.clone(),
215 self.canceled_1.clone(),
216 self.canceled_2.clone(),
217 self.cancel_promise.clone(),
218 self,
219 global,
220 can_gc,
221 );
222
223 let read_request = ReadRequest::ByteTee {
224 byte_tee_read_request: Dom::from_ref(&byte_tee_read_request),
225 };
226
227 reader
228 .get()
229 .expect("Reader should be set.")
230 .read(cx, &read_request, can_gc);
231 },
232 }
233
234 Ok(())
235 }
236
237 pub(crate) fn pull_with_byob_reader(
238 &self,
239 view: HeapBufferSource<ArrayBufferViewU8>,
240 for_branch2: bool,
241 cx: SafeJSContext,
242 global: &GlobalScope,
243 can_gc: CanGc,
244 ) {
245 let mut reader = self.reader.borrow_mut();
246 match &*reader {
247 ReaderType::BYOB(reader) => {
248 let byob_branch = if for_branch2 {
250 self.branch_2.get().expect("Branch 2 should be set.")
251 } else {
252 self.branch_1.get().expect("Branch 1 should be set.")
253 };
254
255 let other_branch = if for_branch2 {
257 self.branch_1.get().expect("Branch 1 should be set.")
258 } else {
259 self.branch_2.get().expect("Branch 2 should be set.")
260 };
261
262 let byte_tee_read_into_request = ByteTeeReadIntoRequest::new(
264 for_branch2,
265 &byob_branch,
266 &other_branch,
267 &self.stream,
268 self.read_again_for_branch_1.clone(),
269 self.read_again_for_branch_2.clone(),
270 self.reading.clone(),
271 self.canceled_1.clone(),
272 self.canceled_2.clone(),
273 self.cancel_promise.clone(),
274 self,
275 global,
276 can_gc,
277 );
278
279 let read_into_request = ReadIntoRequest::ByteTee {
280 byte_tee_read_into_request: Dom::from_ref(&byte_tee_read_into_request),
281 };
282
283 reader.get().expect("Reader should be set.").read(
285 cx,
286 view,
287 1,
288 &read_into_request,
289 can_gc,
290 );
291 },
292 ReaderType::Default(default_reader) => {
293 assert!(
296 default_reader
297 .get()
298 .expect("Reader should be set.")
299 .get_num_read_requests() ==
300 0
301 );
302
303 default_reader
305 .get()
306 .expect("Reader should be set.")
307 .release(can_gc)
308 .expect("Release should be successful.");
309
310 let byob_reader = self
312 .stream
313 .acquire_byob_reader(can_gc)
314 .expect("Reader should be set.");
315
316 *reader = ReaderType::BYOB(MutNullableDom::new(Some(&byob_reader)));
317 self.reader_version
318 .set(self.reader_version.get().wrapping_add(1));
319
320 drop(reader);
321
322 self.forward_reader_error(self.reader.clone(), can_gc);
324
325 self.pull_with_byob_reader(view, for_branch2, cx, global, can_gc);
327 },
328 }
329 }
330
331 pub(crate) fn pull_algorithm(
333 &self,
334 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
335 can_gc: CanGc,
336 ) -> Rc<Promise> {
337 let cx = GlobalScope::get_cx();
338
339 let pull_algorithm =
340 byte_tee_pull_algorithm.unwrap_or(self.byte_tee_pull_algorithm.clone());
341
342 match pull_algorithm {
343 ByteTeePullAlgorithm::Pull1Algorithm => {
344 if self.reading.get() {
346 self.read_again_for_branch_1.set(true);
348 rooted!(in(*cx) let mut rval = UndefinedValue());
350 return Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc);
351 }
352
353 self.reading.set(true);
355
356 let byob_branch_controller = self
358 .branch_1
359 .get()
360 .expect("Branch 1 should be set.")
361 .get_byte_controller();
362 let byob_request = byob_branch_controller
363 .get_byob_request(cx, can_gc)
364 .expect("Byob request should be set.");
365
366 match byob_request {
367 None => {
369 self.pull_with_default_reader(cx, &self.stream.global(), can_gc)
370 .expect("Pull with default reader should be successful.");
371 },
372 Some(request) => {
373 let view = request.get_view();
375
376 self.pull_with_byob_reader(view, false, cx, &self.stream.global(), can_gc);
377 },
378 }
379
380 rooted!(in(*cx) let mut rval = UndefinedValue());
382 Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc)
383 },
384 ByteTeePullAlgorithm::Pull2Algorithm => {
385 if self.reading.get() {
387 self.read_again_for_branch_2.set(true);
389
390 rooted!(in(*cx) let mut rval = UndefinedValue());
392 return Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc);
393 }
394
395 self.reading.set(true);
397
398 let byob_branch_controller = self
400 .branch_2
401 .get()
402 .expect("Branch 2 should be set.")
403 .get_byte_controller();
404 let byob_request = byob_branch_controller
405 .get_byob_request(cx, can_gc)
406 .expect("Byob request should be set.");
407
408 match byob_request {
409 None => {
410 self.pull_with_default_reader(cx, &self.stream.global(), can_gc)
411 .expect("Pull with default reader should be successful.");
412 },
413 Some(request) => {
414 let view = request.get_view();
416
417 self.pull_with_byob_reader(view, true, cx, &self.stream.global(), can_gc);
418 },
419 }
420
421 rooted!(in(*cx) let mut rval = UndefinedValue());
423 Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc)
424 },
425 }
426 }
427
428 pub(crate) fn cancel_algorithm(
433 &self,
434 reason: SafeHandleValue,
435 can_gc: CanGc,
436 ) -> Option<Result<Rc<Promise>, Error>> {
437 match self.tee_cancel_algorithm {
438 ByteTeeCancelAlgorithm::Cancel1Algorithm => {
439 self.canceled_1.set(true);
441
442 self.reason_1.set(reason.get());
444
445 if self.canceled_2.get() {
447 self.resolve_cancel_promise(can_gc);
448 }
449
450 Some(Ok(self.cancel_promise.clone()))
452 },
453 ByteTeeCancelAlgorithm::Cancel2Algorithm => {
454 self.canceled_2.set(true);
456
457 self.reason_2.set(reason.get());
459
460 if self.canceled_1.get() {
462 self.resolve_cancel_promise(can_gc);
463 }
464 Some(Ok(self.cancel_promise.clone()))
466 },
467 }
468 }
469
    /// Cancels the original stream with both recorded reasons and resolves
    /// `cancel_promise` with the outcome.
    ///
    /// The two reasons are packed into a JS array `[reason_1, reason_2]`,
    /// which is passed to `stream.cancel` as the cancel reason. Called from
    /// `cancel_algorithm` once both branches have been canceled.
    #[expect(unsafe_code)]
    fn resolve_cancel_promise(&self, can_gc: CanGc) {
        let cx = GlobalScope::get_cx();
        // Gather the reasons in branch order inside a rooted vector.
        rooted_vec!(let mut reasons_values);
        reasons_values.push(self.reason_1.get());
        reasons_values.push(self.reason_2.get());

        // Turn the rooted values into a JS array object.
        let reasons_values_array = HandleValueArray::from(&reasons_values);
        // NOTE(review): soundness presumably rests on `cx` being a valid
        // context and `reasons_values` staying rooted for the duration of the
        // call; the returned object is not checked for null — confirm.
        rooted!(in(*cx) let reasons = unsafe { NewArrayObject(*cx, &reasons_values_array) });
        rooted!(in(*cx) let reasons_value = ObjectValue(reasons.get()));

        // Cancel the original stream with the composite reason.
        let cancel_result =
            self.stream
                .cancel(cx, &self.stream.global(), reasons_value.handle(), can_gc);

        // Settle the promise that both branches handed out from `cancel_algorithm`.
        self.cancel_promise.resolve_native(&cancel_result, can_gc);
    }
490}