script/dom/stream/
byteteeunderlyingsource.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{HandleValueArray, Heap, NewArrayObject, Value};
10use js::jsval::{ObjectValue, UndefinedValue};
11use js::rust::HandleValue as SafeHandleValue;
12use js::typedarray::ArrayBufferViewU8;
13
14use super::byteteereadintorequest::ByteTeeReadIntoRequest;
15use super::readablestream::ReaderType;
16use super::readablestreambyobreader::ReadIntoRequest;
17use crate::dom::bindings::buffer_source::HeapBufferSource;
18use crate::dom::bindings::error::{Error, Fallible};
19use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
20use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
21use crate::dom::globalscope::GlobalScope;
22use crate::dom::promise::Promise;
23use crate::dom::stream::byteteereadrequest::ByteTeeReadRequest;
24use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
25use crate::dom::types::ReadableStream;
26use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
27
/// Selects which of the two tee cancel algorithms a
/// [`ByteTeeUnderlyingSource`] runs from `cancel_algorithm`.
#[derive(JSTraceable, MallocSizeOf)]
pub(crate) enum ByteTeeCancelAlgorithm {
    /// Marks branch 1 as canceled and records its reason.
    Cancel1Algorithm,
    /// Marks branch 2 as canceled and records its reason.
    Cancel2Algorithm,
}
33
/// Selects which of the two tee pull algorithms a
/// [`ByteTeeUnderlyingSource`] runs from `pull_algorithm`.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub(crate) enum ByteTeePullAlgorithm {
    /// Pull on behalf of branch 1.
    Pull1Algorithm,
    /// Pull on behalf of branch 2.
    Pull2Algorithm,
}
39
#[dom_struct]
/// Underlying source backing one branch of a teed readable byte stream.
///
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
///
/// The `Rc`-wrapped fields are handed in by the caller of `new`, so they can be
/// shared with other holders of the same tee state.
pub(crate) struct ByteTeeUnderlyingSource {
    reflector_: Reflector,
    /// Reader currently used on `stream`; replaced when a pull needs the other
    /// reader kind (default vs. BYOB).
    #[ignore_malloc_size_of = "Rc"]
    reader: Rc<RefCell<ReaderType>>,
    /// The stream being teed; readers are acquired from it and it is the stream
    /// that eventually gets canceled.
    stream: Dom<ReadableStream>,
    /// First branch; null until `set_branch_1` is called.
    branch_1: MutNullableDom<ReadableStream>,
    /// Second branch; null until `set_branch_2` is called.
    branch_2: MutNullableDom<ReadableStream>,
    /// Set when branch 1 asks to pull while a read is already in progress.
    #[ignore_malloc_size_of = "Rc"]
    read_again_for_branch_1: Rc<Cell<bool>>,
    /// Set when branch 2 asks to pull while a read is already in progress.
    #[ignore_malloc_size_of = "Rc"]
    read_again_for_branch_2: Rc<Cell<bool>>,
    /// Set to true by `pull_algorithm` when it starts a read.
    // NOTE(review): nothing in this file resets it; presumably the read requests
    // holding a clone of this flag do — confirm.
    #[ignore_malloc_size_of = "Rc"]
    reading: Rc<Cell<bool>>,
    /// Whether branch 1 has been canceled.
    #[ignore_malloc_size_of = "Rc"]
    canceled_1: Rc<Cell<bool>>,
    /// Whether branch 2 has been canceled.
    #[ignore_malloc_size_of = "Rc"]
    canceled_2: Rc<Cell<bool>>,
    /// Cancel reason recorded for branch 1.
    #[ignore_malloc_size_of = "Rc"]
    #[allow(clippy::redundant_allocation)]
    reason_1: Rc<Box<Heap<Value>>>,
    /// Cancel reason recorded for branch 2.
    #[ignore_malloc_size_of = "Rc"]
    #[allow(clippy::redundant_allocation)]
    reason_2: Rc<Box<Heap<Value>>>,
    /// Promise returned from `cancel_algorithm`; resolved with the result of
    /// canceling `stream` once both branches are canceled.
    #[ignore_malloc_size_of = "Rc"]
    cancel_promise: Rc<Promise>,
    /// Counter bumped (wrapping) every time `reader` is replaced; its value at
    /// registration time is passed to the reader's closed-promise handler.
    // NOTE(review): presumably lets handlers of a released reader detect that
    // they are stale — confirm against the handler implementation.
    #[ignore_malloc_size_of = "Rc"]
    reader_version: Rc<Cell<u64>>,
    /// Which cancel algorithm `cancel_algorithm` runs for this source.
    tee_cancel_algorithm: ByteTeeCancelAlgorithm,
    /// Pull algorithm used when `pull_algorithm` is called without an override.
    byte_tee_pull_algorithm: ByteTeePullAlgorithm,
}
72
73impl ByteTeeUnderlyingSource {
74    #[allow(clippy::too_many_arguments)]
75    #[allow(clippy::redundant_allocation)]
76    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
77    pub(crate) fn new(
78        reader: Rc<RefCell<ReaderType>>,
79        stream: &ReadableStream,
80        reading: Rc<Cell<bool>>,
81        read_again_for_branch_1: Rc<Cell<bool>>,
82        read_again_for_branch_2: Rc<Cell<bool>>,
83        canceled_1: Rc<Cell<bool>>,
84        canceled_2: Rc<Cell<bool>>,
85        reason_1: Rc<Box<Heap<Value>>>,
86        reason_2: Rc<Box<Heap<Value>>>,
87        cancel_promise: Rc<Promise>,
88        reader_version: Rc<Cell<u64>>,
89        tee_cancel_algorithm: ByteTeeCancelAlgorithm,
90        byte_tee_pull_algorithm: ByteTeePullAlgorithm,
91        can_gc: CanGc,
92    ) -> DomRoot<ByteTeeUnderlyingSource> {
93        reflect_dom_object(
94            Box::new(ByteTeeUnderlyingSource {
95                reflector_: Reflector::new(),
96                reader,
97                stream: Dom::from_ref(stream),
98                branch_1: MutNullableDom::new(None),
99                branch_2: MutNullableDom::new(None),
100                read_again_for_branch_1,
101                read_again_for_branch_2,
102                reading,
103                canceled_1,
104                canceled_2,
105                reason_1,
106                reason_2,
107                cancel_promise,
108                reader_version,
109                tee_cancel_algorithm,
110                byte_tee_pull_algorithm,
111            }),
112            &*stream.global(),
113            can_gc,
114        )
115    }
116
    /// Installs `stream` as the first tee branch.
    pub(crate) fn set_branch_1(&self, stream: &ReadableStream) {
        self.branch_1.set(Some(stream));
    }
120
    /// Installs `stream` as the second tee branch.
    pub(crate) fn set_branch_2(&self, stream: &ReadableStream) {
        self.branch_2.set(Some(stream));
    }
124
125    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
126    pub(crate) fn forward_reader_error(&self, this_reader: Rc<RefCell<ReaderType>>, can_gc: CanGc) {
127        let this_reader = this_reader.borrow_mut();
128        match &*this_reader {
129            ReaderType::Default(reader) => {
130                let expected_version = self.reader_version.get();
131                // Upon rejection of thisReader.[[closedPromise]] with reason r,
132                reader
133                    .get()
134                    .expect("Reader should be set.")
135                    .byte_tee_append_native_handler_to_closed_promise(
136                        &self.branch_1.get().expect("Branch 1 should be set."),
137                        &self.branch_2.get().expect("Branch 2 should be set."),
138                        self.canceled_1.clone(),
139                        self.canceled_2.clone(),
140                        self.cancel_promise.clone(),
141                        self.reader_version.clone(),
142                        expected_version,
143                        can_gc,
144                    );
145            },
146            ReaderType::BYOB(reader) => {
147                let expected_version = self.reader_version.get();
148                // Upon rejection of thisReader.[[closedPromise]] with reason r,
149                reader
150                    .get()
151                    .expect("Reader should be set.")
152                    .byte_tee_append_native_handler_to_closed_promise(
153                        &self.branch_1.get().expect("Branch 1 should be set."),
154                        &self.branch_2.get().expect("Branch 2 should be set."),
155                        self.canceled_1.clone(),
156                        self.canceled_2.clone(),
157                        self.cancel_promise.clone(),
158                        self.reader_version.clone(),
159                        expected_version,
160                        can_gc,
161                    );
162            },
163        }
164    }
165
166    pub(crate) fn pull_with_default_reader(
167        &self,
168        cx: SafeJSContext,
169        global: &GlobalScope,
170        can_gc: CanGc,
171    ) -> Fallible<()> {
172        let mut reader = self.reader.borrow_mut();
173        match &*reader {
174            ReaderType::BYOB(byte_reader) => {
175                // Assert: readIntoRequests is empty.
176                assert!(
177                    byte_reader
178                        .get()
179                        .expect("Reader should be set.")
180                        .get_num_read_into_requests() ==
181                        0
182                );
183
184                // Release BYOB reader.
185                byte_reader
186                    .get()
187                    .expect("Reader should be set.")
188                    .release(can_gc)?;
189
190                // Acquire default reader.
191                let default_reader = self
192                    .stream
193                    .acquire_default_reader(can_gc)
194                    .expect("AcquireReadableStreamDefaultReader should not fail");
195
196                *reader = ReaderType::Default(MutNullableDom::new(Some(&default_reader)));
197                self.reader_version
198                    .set(self.reader_version.get().wrapping_add(1));
199                drop(reader);
200
201                // Attach error forwarding for the new reader.
202                self.forward_reader_error(self.reader.clone(), can_gc);
203
204                // IMPORTANT: now actually perform the pull we were asked to do.
205                return self.pull_with_default_reader(cx, global, can_gc);
206            },
207            ReaderType::Default(reader) => {
208                let byte_tee_read_request = ByteTeeReadRequest::new(
209                    &self.branch_1.get().expect("Branch 1 should be set."),
210                    &self.branch_2.get().expect("Branch 2 should be set."),
211                    &self.stream,
212                    self.read_again_for_branch_1.clone(),
213                    self.read_again_for_branch_2.clone(),
214                    self.reading.clone(),
215                    self.canceled_1.clone(),
216                    self.canceled_2.clone(),
217                    self.cancel_promise.clone(),
218                    self,
219                    global,
220                    can_gc,
221                );
222
223                let read_request = ReadRequest::ByteTee {
224                    byte_tee_read_request: Dom::from_ref(&byte_tee_read_request),
225                };
226
227                reader
228                    .get()
229                    .expect("Reader should be set.")
230                    .read(cx, &read_request, can_gc);
231            },
232        }
233
234        Ok(())
235    }
236
237    pub(crate) fn pull_with_byob_reader(
238        &self,
239        view: HeapBufferSource<ArrayBufferViewU8>,
240        for_branch2: bool,
241        cx: SafeJSContext,
242        global: &GlobalScope,
243        can_gc: CanGc,
244    ) {
245        let mut reader = self.reader.borrow_mut();
246        match &*reader {
247            ReaderType::BYOB(reader) => {
248                // Let byobBranch be branch2 if forBranch2 is true, and branch1 otherwise.
249                let byob_branch = if for_branch2 {
250                    self.branch_2.get().expect("Branch 2 should be set.")
251                } else {
252                    self.branch_1.get().expect("Branch 1 should be set.")
253                };
254
255                // let otherBranch be branch2 if forBranch2 is false, and branch1 otherwise.
256                let other_branch = if for_branch2 {
257                    self.branch_1.get().expect("Branch 1 should be set.")
258                } else {
259                    self.branch_2.get().expect("Branch 2 should be set.")
260                };
261
262                // Let readIntoRequest be a read-into request with the following items:
263                let byte_tee_read_into_request = ByteTeeReadIntoRequest::new(
264                    for_branch2,
265                    &byob_branch,
266                    &other_branch,
267                    &self.stream,
268                    self.read_again_for_branch_1.clone(),
269                    self.read_again_for_branch_2.clone(),
270                    self.reading.clone(),
271                    self.canceled_1.clone(),
272                    self.canceled_2.clone(),
273                    self.cancel_promise.clone(),
274                    self,
275                    global,
276                    can_gc,
277                );
278
279                let read_into_request = ReadIntoRequest::ByteTee {
280                    byte_tee_read_into_request: Dom::from_ref(&byte_tee_read_into_request),
281                };
282
283                // Perform ! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest).
284                reader.get().expect("Reader should be set.").read(
285                    cx,
286                    view,
287                    1,
288                    &read_into_request,
289                    can_gc,
290                );
291            },
292            ReaderType::Default(default_reader) => {
293                // If reader implements ReadableStreamDefaultReader,
294                // Assert: reader.[[readRequests]] is empty.
295                assert!(
296                    default_reader
297                        .get()
298                        .expect("Reader should be set.")
299                        .get_num_read_requests() ==
300                        0
301                );
302
303                // Perform ! ReadableStreamDefaultReaderRelease(reader).
304                default_reader
305                    .get()
306                    .expect("Reader should be set.")
307                    .release(can_gc)
308                    .expect("Release should be successful.");
309
310                // Set reader to ! AcquireReadableStreamBYOBReader(stream).
311                let byob_reader = self
312                    .stream
313                    .acquire_byob_reader(can_gc)
314                    .expect("Reader should be set.");
315
316                *reader = ReaderType::BYOB(MutNullableDom::new(Some(&byob_reader)));
317                self.reader_version
318                    .set(self.reader_version.get().wrapping_add(1));
319
320                drop(reader);
321
322                // Perform forwardReaderError, given reader.
323                self.forward_reader_error(self.reader.clone(), can_gc);
324
325                // Retry the pull using the BYOB reader we just acquired.
326                self.pull_with_byob_reader(view, for_branch2, cx, global, can_gc);
327            },
328        }
329    }
330
331    /// Let pullAlgorithm be the following steps:
332    pub(crate) fn pull_algorithm(
333        &self,
334        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
335        can_gc: CanGc,
336    ) -> Rc<Promise> {
337        let cx = GlobalScope::get_cx();
338
339        let pull_algorithm =
340            byte_tee_pull_algorithm.unwrap_or(self.byte_tee_pull_algorithm.clone());
341
342        match pull_algorithm {
343            ByteTeePullAlgorithm::Pull1Algorithm => {
344                // If reading is true,
345                if self.reading.get() {
346                    // Set readAgainForBranch1 to true.
347                    self.read_again_for_branch_1.set(true);
348                    // Return a promise resolved with undefined.
349                    rooted!(in(*cx) let mut rval = UndefinedValue());
350                    return Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc);
351                }
352
353                // Set reading to true.
354                self.reading.set(true);
355
356                // Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch1.[[controller]]).
357                let byob_branch_controller = self
358                    .branch_1
359                    .get()
360                    .expect("Branch 1 should be set.")
361                    .get_byte_controller();
362                let byob_request = byob_branch_controller
363                    .get_byob_request(cx, can_gc)
364                    .expect("Byob request should be set.");
365
366                match byob_request {
367                    // If byobRequest is null, perform pullWithDefaultReader.
368                    None => {
369                        self.pull_with_default_reader(cx, &self.stream.global(), can_gc)
370                            .expect("Pull with default reader should be successful.");
371                    },
372                    Some(request) => {
373                        // Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and false.
374                        let view = request.get_view();
375
376                        self.pull_with_byob_reader(view, false, cx, &self.stream.global(), can_gc);
377                    },
378                }
379
380                // Return a promise resolved with undefined.
381                rooted!(in(*cx) let mut rval = UndefinedValue());
382                Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc)
383            },
384            ByteTeePullAlgorithm::Pull2Algorithm => {
385                // If reading is true,
386                if self.reading.get() {
387                    // Set readAgainForBranch2 to true.
388                    self.read_again_for_branch_2.set(true);
389
390                    // Return a promise resolved with undefined.
391                    rooted!(in(*cx) let mut rval = UndefinedValue());
392                    return Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc);
393                }
394
395                // Set reading to true.
396                self.reading.set(true);
397
398                // Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch2.[[controller]]).
399                let byob_branch_controller = self
400                    .branch_2
401                    .get()
402                    .expect("Branch 2 should be set.")
403                    .get_byte_controller();
404                let byob_request = byob_branch_controller
405                    .get_byob_request(cx, can_gc)
406                    .expect("Byob request should be set.");
407
408                match byob_request {
409                    None => {
410                        self.pull_with_default_reader(cx, &self.stream.global(), can_gc)
411                            .expect("Pull with default reader should be successful.");
412                    },
413                    Some(request) => {
414                        // Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and true.
415                        let view = request.get_view();
416
417                        self.pull_with_byob_reader(view, true, cx, &self.stream.global(), can_gc);
418                    },
419                }
420
421                // Return a promise resolved with undefined.
422                rooted!(in(*cx) let mut rval = UndefinedValue());
423                Promise::new_resolved(&self.stream.global(), cx, rval.handle(), can_gc)
424            },
425        }
426    }
427
428    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
429    /// Let cancel1Algorithm be the following steps, taking a reason argument
430    /// and
431    /// Let cancel2Algorithm be the following steps, taking a reason argument
432    pub(crate) fn cancel_algorithm(
433        &self,
434        reason: SafeHandleValue,
435        can_gc: CanGc,
436    ) -> Option<Result<Rc<Promise>, Error>> {
437        match self.tee_cancel_algorithm {
438            ByteTeeCancelAlgorithm::Cancel1Algorithm => {
439                // Set canceled1 to true.
440                self.canceled_1.set(true);
441
442                // Set reason1 to reason.
443                self.reason_1.set(reason.get());
444
445                // If canceled2 is true,
446                if self.canceled_2.get() {
447                    self.resolve_cancel_promise(can_gc);
448                }
449
450                // Return cancelPromise.
451                Some(Ok(self.cancel_promise.clone()))
452            },
453            ByteTeeCancelAlgorithm::Cancel2Algorithm => {
454                // Set canceled_2 to true.
455                self.canceled_2.set(true);
456
457                // Set reason_2 to reason.
458                self.reason_2.set(reason.get());
459
460                // If canceled_1 is true,
461                if self.canceled_1.get() {
462                    self.resolve_cancel_promise(can_gc);
463                }
464                // Return cancelPromise.
465                Some(Ok(self.cancel_promise.clone()))
466            },
467        }
468    }
469
470    #[expect(unsafe_code)]
471    fn resolve_cancel_promise(&self, can_gc: CanGc) {
472        // Let compositeReason be ! CreateArrayFromList(« reason_1, reason_2 »).
473        let cx = GlobalScope::get_cx();
474        rooted_vec!(let mut reasons_values);
475        reasons_values.push(self.reason_1.get());
476        reasons_values.push(self.reason_2.get());
477
478        let reasons_values_array = HandleValueArray::from(&reasons_values);
479        rooted!(in(*cx) let reasons = unsafe { NewArrayObject(*cx, &reasons_values_array) });
480        rooted!(in(*cx) let reasons_value = ObjectValue(reasons.get()));
481
482        // Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
483        let cancel_result =
484            self.stream
485                .cancel(cx, &self.stream.global(), reasons_value.handle(), can_gc);
486
487        // Resolve cancelPromise with cancelResult.
488        self.cancel_promise.resolve_native(&cancel_result, can_gc);
489    }
490}