script/dom/
byteteeunderlyingsource.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{HandleValueArray, Heap, NewArrayObject, Value};
10use js::jsval::{ObjectValue, UndefinedValue};
11use js::rust::HandleValue as SafeHandleValue;
12use js::typedarray::ArrayBufferViewU8;
13
14use super::bindings::buffer_source::HeapBufferSource;
15use super::bindings::root::{DomRoot, MutNullableDom};
16use super::byteteereadintorequest::ByteTeeReadIntoRequest;
17use super::readablestream::ReaderType;
18use super::readablestreambyobreader::ReadIntoRequest;
19use super::types::ReadableStream;
20use crate::dom::bindings::error::{Error, Fallible};
21use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
22use crate::dom::bindings::root::Dom;
23use crate::dom::byteteereadrequest::ByteTeeReadRequest;
24use crate::dom::globalscope::GlobalScope;
25use crate::dom::promise::Promise;
26use crate::dom::readablestreamdefaultreader::ReadRequest;
27use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
28
29#[derive(JSTraceable, MallocSizeOf)]
30pub(crate) enum ByteTeeCancelAlgorithm {
31    Cancel1Algorithm,
32    Cancel2Algorithm,
33}
34
35#[derive(Clone, JSTraceable, MallocSizeOf)]
36pub(crate) enum ByteTeePullAlgorithm {
37    Pull1Algorithm,
38    Pull2Algorithm,
39}
40
41#[dom_struct]
42/// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
43pub(crate) struct ByteTeeUnderlyingSource {
44    reflector_: Reflector,
45    #[ignore_malloc_size_of = "Rc"]
46    reader: Rc<RefCell<ReaderType>>,
47    stream: Dom<ReadableStream>,
48    branch_1: MutNullableDom<ReadableStream>,
49    branch_2: MutNullableDom<ReadableStream>,
50    #[ignore_malloc_size_of = "Rc"]
51    read_again_for_branch_1: Rc<Cell<bool>>,
52    #[ignore_malloc_size_of = "Rc"]
53    read_again_for_branch_2: Rc<Cell<bool>>,
54    #[ignore_malloc_size_of = "Rc"]
55    reading: Rc<Cell<bool>>,
56    #[ignore_malloc_size_of = "Rc"]
57    canceled_1: Rc<Cell<bool>>,
58    #[ignore_malloc_size_of = "Rc"]
59    canceled_2: Rc<Cell<bool>>,
60    #[ignore_malloc_size_of = "Rc"]
61    #[allow(clippy::redundant_allocation)]
62    reason_1: Rc<Box<Heap<Value>>>,
63    #[ignore_malloc_size_of = "Rc"]
64    #[allow(clippy::redundant_allocation)]
65    reason_2: Rc<Box<Heap<Value>>>,
66    #[ignore_malloc_size_of = "Rc"]
67    cancel_promise: Rc<Promise>,
68    #[ignore_malloc_size_of = "Rc"]
69    reader_version: Rc<Cell<u64>>,
70    tee_cancel_algorithm: ByteTeeCancelAlgorithm,
71    byte_tee_pull_algorithm: ByteTeePullAlgorithm,
72}
73
74impl ByteTeeUnderlyingSource {
75    #[allow(clippy::too_many_arguments)]
76    #[allow(clippy::redundant_allocation)]
77    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
78    pub(crate) fn new(
79        reader: Rc<RefCell<ReaderType>>,
80        stream: &ReadableStream,
81        reading: Rc<Cell<bool>>,
82        read_again_for_branch_1: Rc<Cell<bool>>,
83        read_again_for_branch_2: Rc<Cell<bool>>,
84        canceled_1: Rc<Cell<bool>>,
85        canceled_2: Rc<Cell<bool>>,
86        reason_1: Rc<Box<Heap<Value>>>,
87        reason_2: Rc<Box<Heap<Value>>>,
88        cancel_promise: Rc<Promise>,
89        reader_version: Rc<Cell<u64>>,
90        tee_cancel_algorithm: ByteTeeCancelAlgorithm,
91        byte_tee_pull_algorithm: ByteTeePullAlgorithm,
92        can_gc: CanGc,
93    ) -> DomRoot<ByteTeeUnderlyingSource> {
94        reflect_dom_object(
95            Box::new(ByteTeeUnderlyingSource {
96                reflector_: Reflector::new(),
97                reader,
98                stream: Dom::from_ref(stream),
99                branch_1: MutNullableDom::new(None),
100                branch_2: MutNullableDom::new(None),
101                read_again_for_branch_1,
102                read_again_for_branch_2,
103                reading,
104                canceled_1,
105                canceled_2,
106                reason_1,
107                reason_2,
108                cancel_promise,
109                reader_version,
110                tee_cancel_algorithm,
111                byte_tee_pull_algorithm,
112            }),
113            &*stream.global(),
114            can_gc,
115        )
116    }
117
118    pub(crate) fn set_branch_1(&self, stream: &ReadableStream) {
119        self.branch_1.set(Some(stream));
120    }
121
122    pub(crate) fn set_branch_2(&self, stream: &ReadableStream) {
123        self.branch_2.set(Some(stream));
124    }
125
126    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
127    pub(crate) fn forward_reader_error(&self, this_reader: Rc<RefCell<ReaderType>>, can_gc: CanGc) {
128        let this_reader = this_reader.borrow_mut();
129        match &*this_reader {
130            ReaderType::Default(reader) => {
131                let expected_version = self.reader_version.get();
132                // Upon rejection of thisReader.[[closedPromise]] with reason r,
133                reader
134                    .get()
135                    .expect("Reader should be set.")
136                    .byte_tee_append_native_handler_to_closed_promise(
137                        &self.branch_1.get().expect("Branch 1 should be set."),
138                        &self.branch_2.get().expect("Branch 2 should be set."),
139                        self.canceled_1.clone(),
140                        self.canceled_2.clone(),
141                        self.cancel_promise.clone(),
142                        self.reader_version.clone(),
143                        expected_version,
144                        can_gc,
145                    );
146            },
147            ReaderType::BYOB(reader) => {
148                let expected_version = self.reader_version.get();
149                // Upon rejection of thisReader.[[closedPromise]] with reason r,
150                reader
151                    .get()
152                    .expect("Reader should be set.")
153                    .byte_tee_append_native_handler_to_closed_promise(
154                        &self.branch_1.get().expect("Branch 1 should be set."),
155                        &self.branch_2.get().expect("Branch 2 should be set."),
156                        self.canceled_1.clone(),
157                        self.canceled_2.clone(),
158                        self.cancel_promise.clone(),
159                        self.reader_version.clone(),
160                        expected_version,
161                        can_gc,
162                    );
163            },
164        }
165    }
166
167    pub(crate) fn pull_with_default_reader(
168        &self,
169        cx: SafeJSContext,
170        global: &GlobalScope,
171        can_gc: CanGc,
172    ) -> Fallible<()> {
173        let mut reader = self.reader.borrow_mut();
174        match &*reader {
175            ReaderType::BYOB(byte_reader) => {
176                // Assert: readIntoRequests is empty.
177                assert!(
178                    byte_reader
179                        .get()
180                        .expect("Reader should be set.")
181                        .get_num_read_into_requests() ==
182                        0
183                );
184
185                // Release BYOB reader.
186                byte_reader
187                    .get()
188                    .expect("Reader should be set.")
189                    .release(can_gc)?;
190
191                // Acquire default reader.
192                let default_reader = self
193                    .stream
194                    .acquire_default_reader(can_gc)
195                    .expect("AcquireReadableStreamDefaultReader should not fail");
196
197                *reader = ReaderType::Default(MutNullableDom::new(Some(&default_reader)));
198                self.reader_version
199                    .set(self.reader_version.get().wrapping_add(1));
200                drop(reader);
201
202                // Attach error forwarding for the new reader.
203                self.forward_reader_error(self.reader.clone(), can_gc);
204
205                // IMPORTANT: now actually perform the pull we were asked to do.
206                return self.pull_with_default_reader(cx, global, can_gc);
207            },
208            ReaderType::Default(reader) => {
209                let byte_tee_read_request = ByteTeeReadRequest::new(
210                    &self.branch_1.get().expect("Branch 1 should be set."),
211                    &self.branch_2.get().expect("Branch 2 should be set."),
212                    &self.stream,
213                    self.read_again_for_branch_1.clone(),
214                    self.read_again_for_branch_2.clone(),
215                    self.reading.clone(),
216                    self.canceled_1.clone(),
217                    self.canceled_2.clone(),
218                    self.cancel_promise.clone(),
219                    self,
220                    global,
221                    can_gc,
222                );
223
224                let read_request = ReadRequest::ByteTee {
225                    byte_tee_read_request: Dom::from_ref(&byte_tee_read_request),
226                };
227
228                reader
229                    .get()
230                    .expect("Reader should be set.")
231                    .read(cx, &read_request, can_gc);
232            },
233        }
234
235        Ok(())
236    }
237
238    pub(crate) fn pull_with_byob_reader(
239        &self,
240        view: HeapBufferSource<ArrayBufferViewU8>,
241        for_branch2: bool,
242        cx: SafeJSContext,
243        global: &GlobalScope,
244        can_gc: CanGc,
245    ) {
246        let mut reader = self.reader.borrow_mut();
247        match &*reader {
248            ReaderType::BYOB(reader) => {
249                // Let byobBranch be branch2 if forBranch2 is true, and branch1 otherwise.
250                let byob_branch = if for_branch2 {
251                    self.branch_2.get().expect("Branch 2 should be set.")
252                } else {
253                    self.branch_1.get().expect("Branch 1 should be set.")
254                };
255
256                // let otherBranch be branch2 if forBranch2 is false, and branch1 otherwise.
257                let other_branch = if for_branch2 {
258                    self.branch_1.get().expect("Branch 1 should be set.")
259                } else {
260                    self.branch_2.get().expect("Branch 2 should be set.")
261                };
262
263                // Let readIntoRequest be a read-into request with the following items:
264                let byte_tee_read_into_request = ByteTeeReadIntoRequest::new(
265                    for_branch2,
266                    &byob_branch,
267                    &other_branch,
268                    &self.stream,
269                    self.read_again_for_branch_1.clone(),
270                    self.read_again_for_branch_2.clone(),
271                    self.reading.clone(),
272                    self.canceled_1.clone(),
273                    self.canceled_2.clone(),
274                    self.cancel_promise.clone(),
275                    self,
276                    global,
277                    can_gc,
278                );
279
280                let read_into_request = ReadIntoRequest::ByteTee {
281                    byte_tee_read_into_request: Dom::from_ref(&byte_tee_read_into_request),
282                };
283
284                // Perform ! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest).
285                reader.get().expect("Reader should be set.").read(
286                    cx,
287                    view,
288                    1,
289                    &read_into_request,
290                    can_gc,
291                );
292            },
293            ReaderType::Default(default_reader) => {
294                // If reader implements ReadableStreamDefaultReader,
295                // Assert: reader.[[readRequests]] is empty.
296                assert!(
297                    default_reader
298                        .get()
299                        .expect("Reader should be set.")
300                        .get_num_read_requests() ==
301                        0
302                );
303
304                // Perform ! ReadableStreamDefaultReaderRelease(reader).
305                default_reader
306                    .get()
307                    .expect("Reader should be set.")
308                    .release(can_gc)
309                    .expect("Release should be successful.");
310
311                // Set reader to ! AcquireReadableStreamBYOBReader(stream).
312                let byob_reader = self
313                    .stream
314                    .acquire_byob_reader(can_gc)
315                    .expect("Reader should be set.");
316
317                *reader = ReaderType::BYOB(MutNullableDom::new(Some(&byob_reader)));
318                self.reader_version
319                    .set(self.reader_version.get().wrapping_add(1));
320
321                drop(reader);
322
323                // Perform forwardReaderError, given reader.
324                self.forward_reader_error(self.reader.clone(), can_gc);
325
326                // Retry the pull using the BYOB reader we just acquired.
327                self.pull_with_byob_reader(view, for_branch2, cx, global, can_gc);
328            },
329        }
330    }
331
332    /// Let pullAlgorithm be the following steps:
333    pub(crate) fn pull_algorithm(
334        &self,
335        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
336        can_gc: CanGc,
337    ) -> Rc<Promise> {
338        let cx = GlobalScope::get_cx();
339
340        let pull_algorithm =
341            byte_tee_pull_algorithm.unwrap_or(self.byte_tee_pull_algorithm.clone());
342
343        match pull_algorithm {
344            ByteTeePullAlgorithm::Pull1Algorithm => {
345                // If reading is true,
346                if self.reading.get() {
347                    // Set readAgainForBranch1 to true.
348                    self.read_again_for_branch_1.set(true);
349                    // Return a promise resolved with undefined.
350                    rooted!(in(*cx) let mut rval = UndefinedValue());
351                    return Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc);
352                }
353
354                // Set reading to true.
355                self.reading.set(true);
356
357                // Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch1.[[controller]]).
358                let byob_branch_controller = self
359                    .branch_1
360                    .get()
361                    .expect("Branch 1 should be set.")
362                    .get_byte_controller();
363                let byob_request = byob_branch_controller
364                    .get_byob_request(cx, can_gc)
365                    .expect("Byob request should be set.");
366
367                match byob_request {
368                    // If byobRequest is null, perform pullWithDefaultReader.
369                    None => {
370                        self.pull_with_default_reader(cx, &self.stream.global(), can_gc)
371                            .expect("Pull with default reader should be successful.");
372                    },
373                    Some(request) => {
374                        // Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and false.
375                        let view = request.get_view();
376
377                        self.pull_with_byob_reader(view, false, cx, &self.stream.global(), can_gc);
378                    },
379                }
380
381                // Return a promise resolved with undefined.
382                rooted!(in(*cx) let mut rval = UndefinedValue());
383                Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc)
384            },
385            ByteTeePullAlgorithm::Pull2Algorithm => {
386                // If reading is true,
387                if self.reading.get() {
388                    // Set readAgainForBranch2 to true.
389                    self.read_again_for_branch_2.set(true);
390
391                    // Return a promise resolved with undefined.
392                    rooted!(in(*cx) let mut rval = UndefinedValue());
393                    return Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc);
394                }
395
396                // Set reading to true.
397                self.reading.set(true);
398
399                // Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch2.[[controller]]).
400                let byob_branch_controller = self
401                    .branch_2
402                    .get()
403                    .expect("Branch 2 should be set.")
404                    .get_byte_controller();
405                let byob_request = byob_branch_controller
406                    .get_byob_request(cx, can_gc)
407                    .expect("Byob request should be set.");
408
409                match byob_request {
410                    None => {
411                        self.pull_with_default_reader(cx, &self.stream.global(), can_gc)
412                            .expect("Pull with default reader should be successful.");
413                    },
414                    Some(request) => {
415                        // Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and true.
416                        let view = request.get_view();
417
418                        self.pull_with_byob_reader(view, true, cx, &self.stream.global(), can_gc);
419                    },
420                }
421
422                // Return a promise resolved with undefined.
423                rooted!(in(*cx) let mut rval = UndefinedValue());
424                Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc)
425            },
426        }
427    }
428
429    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
430    /// Let cancel1Algorithm be the following steps, taking a reason argument
431    /// and
432    /// Let cancel2Algorithm be the following steps, taking a reason argument
433    pub(crate) fn cancel_algorithm(
434        &self,
435        reason: SafeHandleValue,
436        can_gc: CanGc,
437    ) -> Option<Result<Rc<Promise>, Error>> {
438        match self.tee_cancel_algorithm {
439            ByteTeeCancelAlgorithm::Cancel1Algorithm => {
440                // Set canceled1 to true.
441                self.canceled_1.set(true);
442
443                // Set reason1 to reason.
444                self.reason_1.set(reason.get());
445
446                // If canceled2 is true,
447                if self.canceled_2.get() {
448                    self.resolve_cancel_promise(can_gc);
449                }
450
451                // Return cancelPromise.
452                Some(Ok(self.cancel_promise.clone()))
453            },
454            ByteTeeCancelAlgorithm::Cancel2Algorithm => {
455                // Set canceled_2 to true.
456                self.canceled_2.set(true);
457
458                // Set reason_2 to reason.
459                self.reason_2.set(reason.get());
460
461                // If canceled_1 is true,
462                if self.canceled_1.get() {
463                    self.resolve_cancel_promise(can_gc);
464                }
465                // Return cancelPromise.
466                Some(Ok(self.cancel_promise.clone()))
467            },
468        }
469    }
470
471    #[expect(unsafe_code)]
472    fn resolve_cancel_promise(&self, can_gc: CanGc) {
473        // Let compositeReason be ! CreateArrayFromList(« reason_1, reason_2 »).
474        let cx = GlobalScope::get_cx();
475        rooted_vec!(let mut reasons_values);
476        reasons_values.push(self.reason_1.get());
477        reasons_values.push(self.reason_2.get());
478
479        let reasons_values_array = HandleValueArray::from(&reasons_values);
480        rooted!(in(*cx) let reasons = unsafe { NewArrayObject(*cx, &reasons_values_array) });
481        rooted!(in(*cx) let reasons_value = ObjectValue(reasons.get()));
482
483        // Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
484        let cancel_result =
485            self.stream
486                .cancel(cx, &self.stream.global(), reasons_value.handle(), can_gc);
487
488        // Resolve cancelPromise with cancelResult.
489        self.cancel_promise.resolve_native(&cancel_result, can_gc);
490    }
491}