script/dom/
byteteereadrequest.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::Heap;
10use js::jsval::{JSVal, UndefinedValue};
11use js::typedarray::ArrayBufferViewU8;
12use script_bindings::error::Fallible;
13
14use super::bindings::reflector::reflect_dom_object;
15use super::bindings::root::DomRoot;
16use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
17use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
18use crate::dom::bindings::error::{Error, ErrorToJsval};
19use crate::dom::bindings::reflector::{DomGlobal, Reflector};
20use crate::dom::bindings::root::Dom;
21use crate::dom::bindings::trace::RootedTraceableBox;
22use crate::dom::byteteeunderlyingsource::ByteTeeUnderlyingSource;
23use crate::dom::globalscope::GlobalScope;
24use crate::dom::promise::Promise;
25use crate::dom::readablestream::ReadableStream;
26use crate::microtask::Microtask;
27use crate::script_runtime::CanGc;
28
29#[derive(JSTraceable, MallocSizeOf)]
30#[cfg_attr(crown, expect(crown::unrooted_must_root))]
31pub(crate) struct ByteTeeReadRequestMicrotask {
32    #[ignore_malloc_size_of = "mozjs"]
33    chunk: Box<Heap<JSVal>>,
34    tee_read_request: Dom<ByteTeeReadRequest>,
35}
36
37impl ByteTeeReadRequestMicrotask {
38    pub(crate) fn microtask_chunk_steps(&self, can_gc: CanGc) {
39        self.tee_read_request
40            .chunk_steps(&self.chunk, can_gc)
41            .expect("ByteTeeReadRequestMicrotask::microtask_chunk_steps failed");
42    }
43}
44
45#[dom_struct]
46/// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
47pub(crate) struct ByteTeeReadRequest {
48    reflector_: Reflector,
49    branch_1: Dom<ReadableStream>,
50    branch_2: Dom<ReadableStream>,
51    stream: Dom<ReadableStream>,
52    #[ignore_malloc_size_of = "Rc"]
53    read_again_for_branch_1: Rc<Cell<bool>>,
54    #[ignore_malloc_size_of = "Rc"]
55    read_again_for_branch_2: Rc<Cell<bool>>,
56    #[ignore_malloc_size_of = "Rc"]
57    reading: Rc<Cell<bool>>,
58    #[ignore_malloc_size_of = "Rc"]
59    canceled_1: Rc<Cell<bool>>,
60    #[ignore_malloc_size_of = "Rc"]
61    canceled_2: Rc<Cell<bool>>,
62    #[ignore_malloc_size_of = "Rc"]
63    cancel_promise: Rc<Promise>,
64    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
65}
66impl ByteTeeReadRequest {
67    #[allow(clippy::too_many_arguments)]
68    pub(crate) fn new(
69        branch_1: &ReadableStream,
70        branch_2: &ReadableStream,
71        stream: &ReadableStream,
72        read_again_for_branch_1: Rc<Cell<bool>>,
73        read_again_for_branch_2: Rc<Cell<bool>>,
74        reading: Rc<Cell<bool>>,
75        canceled_1: Rc<Cell<bool>>,
76        canceled_2: Rc<Cell<bool>>,
77        cancel_promise: Rc<Promise>,
78        tee_underlying_source: &ByteTeeUnderlyingSource,
79        global: &GlobalScope,
80        can_gc: CanGc,
81    ) -> DomRoot<Self> {
82        reflect_dom_object(
83            Box::new(ByteTeeReadRequest {
84                reflector_: Reflector::new(),
85                branch_1: Dom::from_ref(branch_1),
86                branch_2: Dom::from_ref(branch_2),
87                stream: Dom::from_ref(stream),
88                read_again_for_branch_1,
89                read_again_for_branch_2,
90                reading,
91                canceled_1,
92                canceled_2,
93                cancel_promise,
94                tee_underlying_source: Dom::from_ref(tee_underlying_source),
95            }),
96            global,
97            can_gc,
98        )
99    }
100
101    /// Enqueue a microtask to perform the chunk steps
102    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
103    pub(crate) fn enqueue_chunk_steps(
104        &self,
105        global: &GlobalScope,
106        chunk: RootedTraceableBox<Heap<JSVal>>,
107    ) {
108        // Queue a microtask to perform the following steps:
109        let byte_tee_read_request_chunk = ByteTeeReadRequestMicrotask {
110            chunk: Heap::boxed(*chunk.handle()),
111            tee_read_request: Dom::from_ref(self),
112        };
113        global.enqueue_microtask(Microtask::ReadableStreamByteTeeReadRequest(
114            byte_tee_read_request_chunk,
115        ));
116    }
117
118    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A3>
119    #[allow(clippy::borrowed_box)]
120    pub(crate) fn chunk_steps(&self, chunk: &Box<Heap<JSVal>>, can_gc: CanGc) -> Fallible<()> {
121        let cx = GlobalScope::get_cx();
122
123        // Set readAgainForBranch1 to false.
124        self.read_again_for_branch_1.set(false);
125
126        // Set readAgainForBranch2 to false.
127        self.read_again_for_branch_2.set(false);
128
129        // Let chunk1 and chunk2 be chunk.
130        let chunk1 = chunk;
131        let chunk2 = chunk;
132
133        // Helper to surface clone failures exactly once
134        let handle_clone_error = |error: Error| {
135            rooted!(in(*cx) let mut error_value = UndefinedValue());
136            error
137                .clone()
138                .to_jsval(cx, &self.global(), error_value.handle_mut(), can_gc);
139
140            let branch_1_controller = self.branch_1.get_byte_controller();
141            let branch_2_controller = self.branch_2.get_byte_controller();
142
143            branch_1_controller.error(error_value.handle(), can_gc);
144            branch_2_controller.error(error_value.handle(), can_gc);
145
146            let cancel_result =
147                self.stream
148                    .cancel(cx, &self.stream.global(), error_value.handle(), can_gc);
149            self.cancel_promise.resolve_native(&cancel_result, can_gc);
150        };
151
152        // Prepare per branch chunks ahead of the spec enqueue steps.
153        let chunk1_view = if !self.canceled_1.get() {
154            Some(HeapBufferSource::<ArrayBufferViewU8>::new(
155                BufferSource::ArrayBufferView(RootedTraceableBox::from_box(Heap::boxed(
156                    chunk1.get().to_object(),
157                ))),
158            ))
159        } else {
160            None
161        };
162
163        let mut chunk2_view = None;
164
165        // If canceled1 is false and canceled2 is false,
166        if !self.canceled_1.get() && !self.canceled_2.get() {
167            // Let cloneResult be CloneAsUint8Array(chunk).
168            let chunk2_source =
169                HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
170                    RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
171                ));
172            let clone_result = chunk2_source.clone_as_uint8_array(cx);
173
174            // If cloneResult is an abrupt completion,
175            if let Err(error) = clone_result {
176                handle_clone_error(error);
177                return Ok(());
178            } else {
179                // Otherwise, set chunk2 to cloneResult.[[Value]].
180                chunk2_view = clone_result.ok();
181            }
182        } else if !self.canceled_2.get() {
183            // Only branch2 needs data; clone once for it.
184            let chunk2_source =
185                HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
186                    RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
187                ));
188            match chunk2_source.clone_as_uint8_array(cx) {
189                Ok(clone) => chunk2_view = Some(clone),
190                Err(error) => {
191                    handle_clone_error(error);
192                    return Ok(());
193                },
194            }
195        }
196
197        // If canceled1 is false, perform ! ReadableByteStreamControllerEnqueue(branch1.[[controller]], chunk1).
198        if let Some(chunk1_view) = chunk1_view {
199            let branch_1_controller = self.branch_1.get_byte_controller();
200            branch_1_controller.enqueue(cx, chunk1_view, can_gc)?;
201        }
202
203        // If canceled2 is false, perform ! ReadableByteStreamControllerEnqueue(branch2.[[controller]], chunk2).
204        if let Some(chunk2_view) = chunk2_view {
205            let branch_2_controller = self.branch_2.get_byte_controller();
206            branch_2_controller.enqueue(cx, chunk2_view, can_gc)?;
207        }
208
209        // Set reading to false.
210        self.reading.set(false);
211
212        // If readAgainForBranch1 is true, perform pull1Algorithm.
213        if self.read_again_for_branch_1.get() {
214            self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull1Algorithm), can_gc);
215        } else if self.read_again_for_branch_2.get() {
216            // Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
217            self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull2Algorithm), can_gc);
218        }
219
220        Ok(())
221    }
222
223    /// <https://streams.spec.whatwg.org/#ref-for-read-request-close-steps%E2%91%A2>
224    pub(crate) fn close_steps(&self, can_gc: CanGc) -> Fallible<()> {
225        let cx = GlobalScope::get_cx();
226        let branch_1_controller = self.branch_1.get_byte_controller();
227        let branch_2_controller = self.branch_2.get_byte_controller();
228
229        // Set reading to false.
230        self.reading.set(false);
231
232        // If canceled1 is false, perform ! ReadableByteStreamControllerClose(branch1.[[controller]]).
233        if !self.canceled_1.get() {
234            branch_1_controller.close(cx, can_gc)?;
235        }
236
237        // If canceled2 is false, perform ! ReadableByteStreamControllerClose(branch2.[[controller]]).
238        if !self.canceled_2.get() {
239            branch_2_controller.close(cx, can_gc)?;
240        }
241
242        // If branch1.[[controller]].[[pendingPullIntos]] is not empty,
243        // perform ! ReadableByteStreamControllerRespond(branch1.[[controller]], 0).
244        if branch_1_controller.get_pending_pull_intos_size() > 0 {
245            branch_1_controller.respond(cx, 0, can_gc)?;
246        }
247
248        // If branch2.[[controller]].[[pendingPullIntos]] is not empty,
249        // perform ! ReadableByteStreamControllerRespond(branch2.[[controller]], 0).
250        if branch_2_controller.get_pending_pull_intos_size() > 0 {
251            branch_2_controller.respond(cx, 0, can_gc)?;
252        }
253
254        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
255        if !self.canceled_1.get() || !self.canceled_2.get() {
256            self.cancel_promise.resolve_native(&(), can_gc);
257        }
258
259        Ok(())
260    }
261
262    /// <https://streams.spec.whatwg.org/#ref-for-read-request-error-steps%E2%91%A3>
263    pub(crate) fn error_steps(&self) {
264        // Set reading to false.
265        self.reading.set(false);
266    }
267
268    pub(crate) fn pull_algorithm(
269        &self,
270        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
271        can_gc: CanGc,
272    ) {
273        self.tee_underlying_source
274            .pull_algorithm(byte_tee_pull_algorithm, can_gc);
275    }
276}