Skip to main content

script/dom/stream/
byteteeunderlyingsource.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::{HandleValueArray, Heap, NewArrayObject, Value};
11use js::jsval::ObjectValue;
12use js::rust::HandleValue as SafeHandleValue;
13use js::typedarray::ArrayBufferViewU8;
14use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
15
16use super::byteteereadintorequest::ByteTeeReadIntoRequest;
17use super::readablestream::ReaderType;
18use super::readablestreambyobreader::ReadIntoRequest;
19use crate::dom::bindings::buffer_source::HeapBufferSource;
20use crate::dom::bindings::error::{Error, Fallible};
21use crate::dom::bindings::reflector::DomGlobal;
22use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
23use crate::dom::globalscope::GlobalScope;
24use crate::dom::promise::Promise;
25use crate::dom::stream::byteteereadrequest::ByteTeeReadRequest;
26use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
27use crate::dom::types::ReadableStream;
28
29#[derive(JSTraceable, MallocSizeOf)]
30pub(crate) enum ByteTeeCancelAlgorithm {
31    Cancel1Algorithm,
32    Cancel2Algorithm,
33}
34
35#[derive(Clone, JSTraceable, MallocSizeOf)]
36pub(crate) enum ByteTeePullAlgorithm {
37    Pull1Algorithm,
38    Pull2Algorithm,
39}
40
41#[dom_struct]
42/// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
43pub(crate) struct ByteTeeUnderlyingSource {
44    reflector_: Reflector,
45    #[conditional_malloc_size_of]
46    reader: Rc<RefCell<ReaderType>>,
47    stream: Dom<ReadableStream>,
48    branch_1: MutNullableDom<ReadableStream>,
49    branch_2: MutNullableDom<ReadableStream>,
50    #[conditional_malloc_size_of]
51    read_again_for_branch_1: Rc<Cell<bool>>,
52    #[conditional_malloc_size_of]
53    read_again_for_branch_2: Rc<Cell<bool>>,
54    #[conditional_malloc_size_of]
55    reading: Rc<Cell<bool>>,
56    #[conditional_malloc_size_of]
57    canceled_1: Rc<Cell<bool>>,
58    #[conditional_malloc_size_of]
59    canceled_2: Rc<Cell<bool>>,
60    #[ignore_malloc_size_of = "Mozjs"]
61    reason_1: Rc<Heap<Value>>,
62    #[ignore_malloc_size_of = "Mozjs"]
63    reason_2: Rc<Heap<Value>>,
64    #[conditional_malloc_size_of]
65    cancel_promise: Rc<Promise>,
66    #[conditional_malloc_size_of]
67    reader_version: Rc<Cell<u64>>,
68    tee_cancel_algorithm: ByteTeeCancelAlgorithm,
69    byte_tee_pull_algorithm: ByteTeePullAlgorithm,
70}
71
72impl ByteTeeUnderlyingSource {
73    #[allow(clippy::too_many_arguments)]
74    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
75    pub(crate) fn new(
76        cx: &mut JSContext,
77        reader: Rc<RefCell<ReaderType>>,
78        stream: &ReadableStream,
79        reading: Rc<Cell<bool>>,
80        read_again_for_branch_1: Rc<Cell<bool>>,
81        read_again_for_branch_2: Rc<Cell<bool>>,
82        canceled_1: Rc<Cell<bool>>,
83        canceled_2: Rc<Cell<bool>>,
84        reason_1: Rc<Heap<Value>>,
85        reason_2: Rc<Heap<Value>>,
86        cancel_promise: Rc<Promise>,
87        reader_version: Rc<Cell<u64>>,
88        tee_cancel_algorithm: ByteTeeCancelAlgorithm,
89        byte_tee_pull_algorithm: ByteTeePullAlgorithm,
90    ) -> DomRoot<ByteTeeUnderlyingSource> {
91        reflect_dom_object_with_cx(
92            Box::new(ByteTeeUnderlyingSource {
93                reflector_: Reflector::new(),
94                reader,
95                stream: Dom::from_ref(stream),
96                branch_1: MutNullableDom::new(None),
97                branch_2: MutNullableDom::new(None),
98                read_again_for_branch_1,
99                read_again_for_branch_2,
100                reading,
101                canceled_1,
102                canceled_2,
103                reason_1,
104                reason_2,
105                cancel_promise,
106                reader_version,
107                tee_cancel_algorithm,
108                byte_tee_pull_algorithm,
109            }),
110            &*stream.global(),
111            cx,
112        )
113    }
114
115    pub(crate) fn set_branch_1(&self, stream: &ReadableStream) {
116        self.branch_1.set(Some(stream));
117    }
118
119    pub(crate) fn set_branch_2(&self, stream: &ReadableStream) {
120        self.branch_2.set(Some(stream));
121    }
122
123    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
124    pub(crate) fn forward_reader_error(
125        &self,
126        cx: &mut JSContext,
127        this_reader: Rc<RefCell<ReaderType>>,
128    ) {
129        let this_reader = this_reader.borrow_mut();
130        match &*this_reader {
131            ReaderType::Default(reader) => {
132                let expected_version = self.reader_version.get();
133                // Upon rejection of thisReader.[[closedPromise]] with reason r,
134                reader
135                    .get()
136                    .expect("Reader should be set.")
137                    .byte_tee_append_native_handler_to_closed_promise(
138                        cx,
139                        &self.branch_1.get().expect("Branch 1 should be set."),
140                        &self.branch_2.get().expect("Branch 2 should be set."),
141                        self.canceled_1.clone(),
142                        self.canceled_2.clone(),
143                        self.cancel_promise.clone(),
144                        self.reader_version.clone(),
145                        expected_version,
146                    );
147            },
148            ReaderType::BYOB(reader) => {
149                let expected_version = self.reader_version.get();
150                // Upon rejection of thisReader.[[closedPromise]] with reason r,
151                reader
152                    .get()
153                    .expect("Reader should be set.")
154                    .byte_tee_append_native_handler_to_closed_promise(
155                        cx,
156                        &self.branch_1.get().expect("Branch 1 should be set."),
157                        &self.branch_2.get().expect("Branch 2 should be set."),
158                        self.canceled_1.clone(),
159                        self.canceled_2.clone(),
160                        self.cancel_promise.clone(),
161                        self.reader_version.clone(),
162                        expected_version,
163                    );
164            },
165        }
166    }
167
168    fn pull_with_default_reader(&self, cx: &mut JSContext, global: &GlobalScope) -> Fallible<()> {
169        let mut reader = self.reader.borrow_mut();
170        match &*reader {
171            ReaderType::BYOB(byte_reader) => {
172                // Assert: readIntoRequests is empty.
173                assert!(
174                    byte_reader
175                        .get()
176                        .expect("Reader should be set.")
177                        .get_num_read_into_requests() ==
178                        0
179                );
180
181                // Release BYOB reader.
182                byte_reader
183                    .get()
184                    .expect("Reader should be set.")
185                    .release(cx)?;
186
187                // Acquire default reader.
188                let default_reader = self
189                    .stream
190                    .acquire_default_reader(cx)
191                    .expect("AcquireReadableStreamDefaultReader should not fail");
192
193                *reader = ReaderType::Default(MutNullableDom::new(Some(&default_reader)));
194                self.reader_version
195                    .set(self.reader_version.get().wrapping_add(1));
196                drop(reader);
197
198                // Attach error forwarding for the new reader.
199                self.forward_reader_error(cx, self.reader.clone());
200
201                // IMPORTANT: now actually perform the pull we were asked to do.
202                return self.pull_with_default_reader(cx, global);
203            },
204            ReaderType::Default(reader) => {
205                let byte_tee_read_request = ByteTeeReadRequest::new(
206                    cx,
207                    &self.branch_1.get().expect("Branch 1 should be set."),
208                    &self.branch_2.get().expect("Branch 2 should be set."),
209                    &self.stream,
210                    self.read_again_for_branch_1.clone(),
211                    self.read_again_for_branch_2.clone(),
212                    self.reading.clone(),
213                    self.canceled_1.clone(),
214                    self.canceled_2.clone(),
215                    self.cancel_promise.clone(),
216                    self,
217                    global,
218                );
219
220                let read_request = ReadRequest::ByteTee {
221                    byte_tee_read_request: Dom::from_ref(&byte_tee_read_request),
222                };
223
224                reader
225                    .get()
226                    .expect("Reader should be set.")
227                    .read(cx, &read_request);
228            },
229        }
230
231        Ok(())
232    }
233
234    fn pull_with_byob_reader(
235        &self,
236        cx: &mut JSContext,
237        view: &HeapBufferSource<ArrayBufferViewU8>,
238        for_branch2: bool,
239        global: &GlobalScope,
240    ) {
241        let mut reader = self.reader.borrow_mut();
242        match &*reader {
243            ReaderType::BYOB(reader) => {
244                // Let byobBranch be branch2 if forBranch2 is true, and branch1 otherwise.
245                let byob_branch = if for_branch2 {
246                    self.branch_2.get().expect("Branch 2 should be set.")
247                } else {
248                    self.branch_1.get().expect("Branch 1 should be set.")
249                };
250
251                // let otherBranch be branch2 if forBranch2 is false, and branch1 otherwise.
252                let other_branch = if for_branch2 {
253                    self.branch_1.get().expect("Branch 1 should be set.")
254                } else {
255                    self.branch_2.get().expect("Branch 2 should be set.")
256                };
257
258                // Let readIntoRequest be a read-into request with the following items:
259                let byte_tee_read_into_request = ByteTeeReadIntoRequest::new(
260                    cx,
261                    for_branch2,
262                    &byob_branch,
263                    &other_branch,
264                    &self.stream,
265                    self.read_again_for_branch_1.clone(),
266                    self.read_again_for_branch_2.clone(),
267                    self.reading.clone(),
268                    self.canceled_1.clone(),
269                    self.canceled_2.clone(),
270                    self.cancel_promise.clone(),
271                    self,
272                    global,
273                );
274
275                let read_into_request = ReadIntoRequest::ByteTee {
276                    byte_tee_read_into_request: Dom::from_ref(&byte_tee_read_into_request),
277                };
278
279                // Perform ! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest).
280                reader
281                    .get()
282                    .expect("Reader should be set.")
283                    .read(cx, view, 1, &read_into_request);
284            },
285            ReaderType::Default(default_reader) => {
286                // If reader implements ReadableStreamDefaultReader,
287                // Assert: reader.[[readRequests]] is empty.
288                assert!(
289                    default_reader
290                        .get()
291                        .expect("Reader should be set.")
292                        .get_num_read_requests() ==
293                        0
294                );
295
296                // Perform ! ReadableStreamDefaultReaderRelease(reader).
297                default_reader
298                    .get()
299                    .expect("Reader should be set.")
300                    .release(cx)
301                    .expect("Release should be successful.");
302
303                // Set reader to ! AcquireReadableStreamBYOBReader(stream).
304                let byob_reader = self
305                    .stream
306                    .acquire_byob_reader(cx)
307                    .expect("Reader should be set.");
308
309                *reader = ReaderType::BYOB(MutNullableDom::new(Some(&byob_reader)));
310                self.reader_version
311                    .set(self.reader_version.get().wrapping_add(1));
312
313                drop(reader);
314
315                // Perform forwardReaderError, given reader.
316                self.forward_reader_error(cx, self.reader.clone());
317
318                // Retry the pull using the BYOB reader we just acquired.
319                self.pull_with_byob_reader(cx, view, for_branch2, global);
320            },
321        }
322    }
323
324    /// Let pullAlgorithm be the following steps:
325    pub(crate) fn pull_algorithm(
326        &self,
327        cx: &mut JSContext,
328        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
329    ) -> Rc<Promise> {
330        let pull_algorithm =
331            byte_tee_pull_algorithm.unwrap_or(self.byte_tee_pull_algorithm.clone());
332
333        match pull_algorithm {
334            ByteTeePullAlgorithm::Pull1Algorithm => {
335                // If reading is true,
336                if self.reading.get() {
337                    // Set readAgainForBranch1 to true.
338                    self.read_again_for_branch_1.set(true);
339                    // Return a promise resolved with undefined.
340                    return Promise::new_resolved(cx, &self.stream.global(), ());
341                }
342
343                // Set reading to true.
344                self.reading.set(true);
345
346                // Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch1.[[controller]]).
347                let byob_branch_controller = self
348                    .branch_1
349                    .get()
350                    .expect("Branch 1 should be set.")
351                    .get_byte_controller();
352                let byob_request = byob_branch_controller
353                    .get_byob_request(cx)
354                    .expect("Byob request should be set.");
355
356                match byob_request {
357                    // If byobRequest is null, perform pullWithDefaultReader.
358                    None => {
359                        self.pull_with_default_reader(cx, &self.stream.global())
360                            .expect("Pull with default reader should be successful.");
361                    },
362                    Some(request) => {
363                        // Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and false.
364                        let view = request.get_view();
365
366                        self.pull_with_byob_reader(cx, &view, false, &self.stream.global());
367                    },
368                }
369
370                // Return a promise resolved with undefined.
371                Promise::new_resolved(cx, &self.stream.global(), ())
372            },
373            ByteTeePullAlgorithm::Pull2Algorithm => {
374                // If reading is true,
375                if self.reading.get() {
376                    // Set readAgainForBranch2 to true.
377                    self.read_again_for_branch_2.set(true);
378
379                    // Return a promise resolved with undefined.
380                    return Promise::new_resolved(cx, &self.stream.global(), ());
381                }
382
383                // Set reading to true.
384                self.reading.set(true);
385
386                // Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch2.[[controller]]).
387                let byob_branch_controller = self
388                    .branch_2
389                    .get()
390                    .expect("Branch 2 should be set.")
391                    .get_byte_controller();
392                let byob_request = byob_branch_controller
393                    .get_byob_request(cx)
394                    .expect("Byob request should be set.");
395
396                match byob_request {
397                    None => {
398                        self.pull_with_default_reader(cx, &self.stream.global())
399                            .expect("Pull with default reader should be successful.");
400                    },
401                    Some(request) => {
402                        // Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and true.
403                        let view = request.get_view();
404
405                        self.pull_with_byob_reader(cx, &view, true, &self.stream.global());
406                    },
407                }
408
409                // Return a promise resolved with undefined.
410                Promise::new_resolved(cx, &self.stream.global(), ())
411            },
412        }
413    }
414
415    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
416    /// Let cancel1Algorithm be the following steps, taking a reason argument
417    /// and
418    /// Let cancel2Algorithm be the following steps, taking a reason argument
419    pub(crate) fn cancel_algorithm(
420        &self,
421        cx: &mut JSContext,
422        reason: SafeHandleValue,
423    ) -> Option<Result<Rc<Promise>, Error>> {
424        match self.tee_cancel_algorithm {
425            ByteTeeCancelAlgorithm::Cancel1Algorithm => {
426                // Set canceled1 to true.
427                self.canceled_1.set(true);
428
429                // Set reason1 to reason.
430                self.reason_1.set(reason.get());
431
432                // If canceled2 is true,
433                if self.canceled_2.get() {
434                    self.resolve_cancel_promise(cx);
435                }
436
437                // Return cancelPromise.
438                Some(Ok(self.cancel_promise.clone()))
439            },
440            ByteTeeCancelAlgorithm::Cancel2Algorithm => {
441                // Set canceled_2 to true.
442                self.canceled_2.set(true);
443
444                // Set reason_2 to reason.
445                self.reason_2.set(reason.get());
446
447                // If canceled_1 is true,
448                if self.canceled_1.get() {
449                    self.resolve_cancel_promise(cx);
450                }
451                // Return cancelPromise.
452                Some(Ok(self.cancel_promise.clone()))
453            },
454        }
455    }
456
457    #[expect(unsafe_code)]
458    fn resolve_cancel_promise(&self, cx: &mut JSContext) {
459        // Let compositeReason be ! CreateArrayFromList(« reason_1, reason_2 »).
460        rooted_vec!(let mut reasons_values);
461        reasons_values.push(self.reason_1.get());
462        reasons_values.push(self.reason_2.get());
463
464        let reasons_values_array = HandleValueArray::from(&reasons_values);
465        rooted!(&in(cx) let reasons = unsafe { NewArrayObject(cx.raw_cx(), &reasons_values_array) });
466        rooted!(&in(cx) let reasons_value = ObjectValue(reasons.get()));
467
468        // Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
469        let cancel_result = self
470            .stream
471            .cancel(cx, &self.stream.global(), reasons_value.handle());
472
473        // Resolve cancelPromise with cancelResult.
474        self.cancel_promise.resolve_native(cx, &cancel_result);
475    }
476}