1use std::cell::{Cell, RefCell};
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::{HandleValueArray, Heap, NewArrayObject, Value};
11use js::jsval::ObjectValue;
12use js::rust::HandleValue as SafeHandleValue;
13use js::typedarray::ArrayBufferViewU8;
14use script_bindings::reflector::{Reflector, reflect_dom_object};
15
16use super::byteteereadintorequest::ByteTeeReadIntoRequest;
17use super::readablestream::ReaderType;
18use super::readablestreambyobreader::ReadIntoRequest;
19use crate::dom::bindings::buffer_source::HeapBufferSource;
20use crate::dom::bindings::error::{Error, Fallible};
21use crate::dom::bindings::reflector::DomGlobal;
22use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
23use crate::dom::globalscope::GlobalScope;
24use crate::dom::promise::Promise;
25use crate::dom::stream::byteteereadrequest::ByteTeeReadRequest;
26use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
27use crate::dom::types::ReadableStream;
28use crate::script_runtime::CanGc;
29
30#[derive(JSTraceable, MallocSizeOf)]
31pub(crate) enum ByteTeeCancelAlgorithm {
32 Cancel1Algorithm,
33 Cancel2Algorithm,
34}
35
36#[derive(Clone, JSTraceable, MallocSizeOf)]
37pub(crate) enum ByteTeePullAlgorithm {
38 Pull1Algorithm,
39 Pull2Algorithm,
40}
41
42#[dom_struct]
43pub(crate) struct ByteTeeUnderlyingSource {
45 reflector_: Reflector,
46 #[conditional_malloc_size_of]
47 reader: Rc<RefCell<ReaderType>>,
48 stream: Dom<ReadableStream>,
49 branch_1: MutNullableDom<ReadableStream>,
50 branch_2: MutNullableDom<ReadableStream>,
51 #[conditional_malloc_size_of]
52 read_again_for_branch_1: Rc<Cell<bool>>,
53 #[conditional_malloc_size_of]
54 read_again_for_branch_2: Rc<Cell<bool>>,
55 #[conditional_malloc_size_of]
56 reading: Rc<Cell<bool>>,
57 #[conditional_malloc_size_of]
58 canceled_1: Rc<Cell<bool>>,
59 #[conditional_malloc_size_of]
60 canceled_2: Rc<Cell<bool>>,
61 #[ignore_malloc_size_of = "Mozjs"]
62 reason_1: Rc<Heap<Value>>,
63 #[ignore_malloc_size_of = "Mozjs"]
64 reason_2: Rc<Heap<Value>>,
65 #[conditional_malloc_size_of]
66 cancel_promise: Rc<Promise>,
67 #[conditional_malloc_size_of]
68 reader_version: Rc<Cell<u64>>,
69 tee_cancel_algorithm: ByteTeeCancelAlgorithm,
70 byte_tee_pull_algorithm: ByteTeePullAlgorithm,
71}
72
73impl ByteTeeUnderlyingSource {
74 #[allow(clippy::too_many_arguments)]
75 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
76 pub(crate) fn new(
77 reader: Rc<RefCell<ReaderType>>,
78 stream: &ReadableStream,
79 reading: Rc<Cell<bool>>,
80 read_again_for_branch_1: Rc<Cell<bool>>,
81 read_again_for_branch_2: Rc<Cell<bool>>,
82 canceled_1: Rc<Cell<bool>>,
83 canceled_2: Rc<Cell<bool>>,
84 reason_1: Rc<Heap<Value>>,
85 reason_2: Rc<Heap<Value>>,
86 cancel_promise: Rc<Promise>,
87 reader_version: Rc<Cell<u64>>,
88 tee_cancel_algorithm: ByteTeeCancelAlgorithm,
89 byte_tee_pull_algorithm: ByteTeePullAlgorithm,
90 can_gc: CanGc,
91 ) -> DomRoot<ByteTeeUnderlyingSource> {
92 reflect_dom_object(
93 Box::new(ByteTeeUnderlyingSource {
94 reflector_: Reflector::new(),
95 reader,
96 stream: Dom::from_ref(stream),
97 branch_1: MutNullableDom::new(None),
98 branch_2: MutNullableDom::new(None),
99 read_again_for_branch_1,
100 read_again_for_branch_2,
101 reading,
102 canceled_1,
103 canceled_2,
104 reason_1,
105 reason_2,
106 cancel_promise,
107 reader_version,
108 tee_cancel_algorithm,
109 byte_tee_pull_algorithm,
110 }),
111 &*stream.global(),
112 can_gc,
113 )
114 }
115
116 pub(crate) fn set_branch_1(&self, stream: &ReadableStream) {
117 self.branch_1.set(Some(stream));
118 }
119
120 pub(crate) fn set_branch_2(&self, stream: &ReadableStream) {
121 self.branch_2.set(Some(stream));
122 }
123
124 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
125 pub(crate) fn forward_reader_error(
126 &self,
127 cx: &mut JSContext,
128 this_reader: Rc<RefCell<ReaderType>>,
129 ) {
130 let this_reader = this_reader.borrow_mut();
131 match &*this_reader {
132 ReaderType::Default(reader) => {
133 let expected_version = self.reader_version.get();
134 reader
136 .get()
137 .expect("Reader should be set.")
138 .byte_tee_append_native_handler_to_closed_promise(
139 cx,
140 &self.branch_1.get().expect("Branch 1 should be set."),
141 &self.branch_2.get().expect("Branch 2 should be set."),
142 self.canceled_1.clone(),
143 self.canceled_2.clone(),
144 self.cancel_promise.clone(),
145 self.reader_version.clone(),
146 expected_version,
147 );
148 },
149 ReaderType::BYOB(reader) => {
150 let expected_version = self.reader_version.get();
151 reader
153 .get()
154 .expect("Reader should be set.")
155 .byte_tee_append_native_handler_to_closed_promise(
156 cx,
157 &self.branch_1.get().expect("Branch 1 should be set."),
158 &self.branch_2.get().expect("Branch 2 should be set."),
159 self.canceled_1.clone(),
160 self.canceled_2.clone(),
161 self.cancel_promise.clone(),
162 self.reader_version.clone(),
163 expected_version,
164 );
165 },
166 }
167 }
168
169 fn pull_with_default_reader(&self, cx: &mut JSContext, global: &GlobalScope) -> Fallible<()> {
170 let mut reader = self.reader.borrow_mut();
171 match &*reader {
172 ReaderType::BYOB(byte_reader) => {
173 assert!(
175 byte_reader
176 .get()
177 .expect("Reader should be set.")
178 .get_num_read_into_requests() ==
179 0
180 );
181
182 byte_reader
184 .get()
185 .expect("Reader should be set.")
186 .release(CanGc::from_cx(cx))?;
187
188 let default_reader = self
190 .stream
191 .acquire_default_reader(CanGc::from_cx(cx))
192 .expect("AcquireReadableStreamDefaultReader should not fail");
193
194 *reader = ReaderType::Default(MutNullableDom::new(Some(&default_reader)));
195 self.reader_version
196 .set(self.reader_version.get().wrapping_add(1));
197 drop(reader);
198
199 self.forward_reader_error(cx, self.reader.clone());
201
202 return self.pull_with_default_reader(cx, global);
204 },
205 ReaderType::Default(reader) => {
206 let byte_tee_read_request = ByteTeeReadRequest::new(
207 &self.branch_1.get().expect("Branch 1 should be set."),
208 &self.branch_2.get().expect("Branch 2 should be set."),
209 &self.stream,
210 self.read_again_for_branch_1.clone(),
211 self.read_again_for_branch_2.clone(),
212 self.reading.clone(),
213 self.canceled_1.clone(),
214 self.canceled_2.clone(),
215 self.cancel_promise.clone(),
216 self,
217 global,
218 CanGc::from_cx(cx),
219 );
220
221 let read_request = ReadRequest::ByteTee {
222 byte_tee_read_request: Dom::from_ref(&byte_tee_read_request),
223 };
224
225 reader
226 .get()
227 .expect("Reader should be set.")
228 .read(cx, &read_request);
229 },
230 }
231
232 Ok(())
233 }
234
235 fn pull_with_byob_reader(
236 &self,
237 cx: &mut JSContext,
238 view: &HeapBufferSource<ArrayBufferViewU8>,
239 for_branch2: bool,
240 global: &GlobalScope,
241 ) {
242 let mut reader = self.reader.borrow_mut();
243 match &*reader {
244 ReaderType::BYOB(reader) => {
245 let byob_branch = if for_branch2 {
247 self.branch_2.get().expect("Branch 2 should be set.")
248 } else {
249 self.branch_1.get().expect("Branch 1 should be set.")
250 };
251
252 let other_branch = if for_branch2 {
254 self.branch_1.get().expect("Branch 1 should be set.")
255 } else {
256 self.branch_2.get().expect("Branch 2 should be set.")
257 };
258
259 let byte_tee_read_into_request = ByteTeeReadIntoRequest::new(
261 for_branch2,
262 &byob_branch,
263 &other_branch,
264 &self.stream,
265 self.read_again_for_branch_1.clone(),
266 self.read_again_for_branch_2.clone(),
267 self.reading.clone(),
268 self.canceled_1.clone(),
269 self.canceled_2.clone(),
270 self.cancel_promise.clone(),
271 self,
272 global,
273 CanGc::from_cx(cx),
274 );
275
276 let read_into_request = ReadIntoRequest::ByteTee {
277 byte_tee_read_into_request: Dom::from_ref(&byte_tee_read_into_request),
278 };
279
280 reader
282 .get()
283 .expect("Reader should be set.")
284 .read(cx, view, 1, &read_into_request);
285 },
286 ReaderType::Default(default_reader) => {
287 assert!(
290 default_reader
291 .get()
292 .expect("Reader should be set.")
293 .get_num_read_requests() ==
294 0
295 );
296
297 default_reader
299 .get()
300 .expect("Reader should be set.")
301 .release(cx)
302 .expect("Release should be successful.");
303
304 let byob_reader = self
306 .stream
307 .acquire_byob_reader(CanGc::from_cx(cx))
308 .expect("Reader should be set.");
309
310 *reader = ReaderType::BYOB(MutNullableDom::new(Some(&byob_reader)));
311 self.reader_version
312 .set(self.reader_version.get().wrapping_add(1));
313
314 drop(reader);
315
316 self.forward_reader_error(cx, self.reader.clone());
318
319 self.pull_with_byob_reader(cx, view, for_branch2, global);
321 },
322 }
323 }
324
325 pub(crate) fn pull_algorithm(
327 &self,
328 cx: &mut JSContext,
329 byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
330 ) -> Rc<Promise> {
331 let pull_algorithm =
332 byte_tee_pull_algorithm.unwrap_or(self.byte_tee_pull_algorithm.clone());
333
334 match pull_algorithm {
335 ByteTeePullAlgorithm::Pull1Algorithm => {
336 if self.reading.get() {
338 self.read_again_for_branch_1.set(true);
340 return Promise::new_resolved(
342 &self.stream.global(),
343 cx.into(),
344 (),
345 CanGc::from_cx(cx),
346 );
347 }
348
349 self.reading.set(true);
351
352 let byob_branch_controller = self
354 .branch_1
355 .get()
356 .expect("Branch 1 should be set.")
357 .get_byte_controller();
358 let byob_request = byob_branch_controller
359 .get_byob_request(cx)
360 .expect("Byob request should be set.");
361
362 match byob_request {
363 None => {
365 self.pull_with_default_reader(cx, &self.stream.global())
366 .expect("Pull with default reader should be successful.");
367 },
368 Some(request) => {
369 let view = request.get_view();
371
372 self.pull_with_byob_reader(cx, &view, false, &self.stream.global());
373 },
374 }
375
376 Promise::new_resolved(&self.stream.global(), cx.into(), (), CanGc::from_cx(cx))
378 },
379 ByteTeePullAlgorithm::Pull2Algorithm => {
380 if self.reading.get() {
382 self.read_again_for_branch_2.set(true);
384
385 return Promise::new_resolved(
387 &self.stream.global(),
388 cx.into(),
389 (),
390 CanGc::from_cx(cx),
391 );
392 }
393
394 self.reading.set(true);
396
397 let byob_branch_controller = self
399 .branch_2
400 .get()
401 .expect("Branch 2 should be set.")
402 .get_byte_controller();
403 let byob_request = byob_branch_controller
404 .get_byob_request(cx)
405 .expect("Byob request should be set.");
406
407 match byob_request {
408 None => {
409 self.pull_with_default_reader(cx, &self.stream.global())
410 .expect("Pull with default reader should be successful.");
411 },
412 Some(request) => {
413 let view = request.get_view();
415
416 self.pull_with_byob_reader(cx, &view, true, &self.stream.global());
417 },
418 }
419
420 Promise::new_resolved(&self.stream.global(), cx.into(), (), CanGc::from_cx(cx))
422 },
423 }
424 }
425
426 pub(crate) fn cancel_algorithm(
431 &self,
432 cx: &mut JSContext,
433 reason: SafeHandleValue,
434 ) -> Option<Result<Rc<Promise>, Error>> {
435 match self.tee_cancel_algorithm {
436 ByteTeeCancelAlgorithm::Cancel1Algorithm => {
437 self.canceled_1.set(true);
439
440 self.reason_1.set(reason.get());
442
443 if self.canceled_2.get() {
445 self.resolve_cancel_promise(cx);
446 }
447
448 Some(Ok(self.cancel_promise.clone()))
450 },
451 ByteTeeCancelAlgorithm::Cancel2Algorithm => {
452 self.canceled_2.set(true);
454
455 self.reason_2.set(reason.get());
457
458 if self.canceled_1.get() {
460 self.resolve_cancel_promise(cx);
461 }
462 Some(Ok(self.cancel_promise.clone()))
464 },
465 }
466 }
467
468 #[expect(unsafe_code)]
469 fn resolve_cancel_promise(&self, cx: &mut JSContext) {
470 rooted_vec!(let mut reasons_values);
472 reasons_values.push(self.reason_1.get());
473 reasons_values.push(self.reason_2.get());
474
475 let reasons_values_array = HandleValueArray::from(&reasons_values);
476 rooted!(&in(cx) let reasons = unsafe { NewArrayObject(cx.raw_cx(), &reasons_values_array) });
477 rooted!(&in(cx) let reasons_value = ObjectValue(reasons.get()));
478
479 let cancel_result = self
481 .stream
482 .cancel(cx, &self.stream.global(), reasons_value.handle());
483
484 self.cancel_promise
486 .resolve_native(&cancel_result, CanGc::from_cx(cx));
487 }
488}