script/dom/stream/
byteteereadrequest.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::Heap;
10use js::jsval::{JSVal, UndefinedValue};
11use js::typedarray::ArrayBufferViewU8;
12use script_bindings::error::Fallible;
13
14use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
15use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
16use crate::dom::bindings::error::{Error, ErrorToJsval};
17use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
18use crate::dom::bindings::root::{Dom, DomRoot};
19use crate::dom::bindings::trace::RootedTraceableBox;
20use crate::dom::globalscope::GlobalScope;
21use crate::dom::promise::Promise;
22use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
23use crate::dom::stream::readablestream::ReadableStream;
24use crate::microtask::Microtask;
25use crate::script_runtime::CanGc;
26
27#[derive(JSTraceable, MallocSizeOf)]
28#[cfg_attr(crown, expect(crown::unrooted_must_root))]
29pub(crate) struct ByteTeeReadRequestMicrotask {
30    #[ignore_malloc_size_of = "mozjs"]
31    chunk: Box<Heap<JSVal>>,
32    tee_read_request: Dom<ByteTeeReadRequest>,
33}
34
35impl ByteTeeReadRequestMicrotask {
36    pub(crate) fn microtask_chunk_steps(&self, can_gc: CanGc) {
37        self.tee_read_request
38            .chunk_steps(&self.chunk, can_gc)
39            .expect("ByteTeeReadRequestMicrotask::microtask_chunk_steps failed");
40    }
41}
42
43#[dom_struct]
44/// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
45pub(crate) struct ByteTeeReadRequest {
46    reflector_: Reflector,
47    branch_1: Dom<ReadableStream>,
48    branch_2: Dom<ReadableStream>,
49    stream: Dom<ReadableStream>,
50    #[ignore_malloc_size_of = "Rc"]
51    read_again_for_branch_1: Rc<Cell<bool>>,
52    #[ignore_malloc_size_of = "Rc"]
53    read_again_for_branch_2: Rc<Cell<bool>>,
54    #[ignore_malloc_size_of = "Rc"]
55    reading: Rc<Cell<bool>>,
56    #[ignore_malloc_size_of = "Rc"]
57    canceled_1: Rc<Cell<bool>>,
58    #[ignore_malloc_size_of = "Rc"]
59    canceled_2: Rc<Cell<bool>>,
60    #[ignore_malloc_size_of = "Rc"]
61    cancel_promise: Rc<Promise>,
62    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
63}
64impl ByteTeeReadRequest {
65    #[allow(clippy::too_many_arguments)]
66    pub(crate) fn new(
67        branch_1: &ReadableStream,
68        branch_2: &ReadableStream,
69        stream: &ReadableStream,
70        read_again_for_branch_1: Rc<Cell<bool>>,
71        read_again_for_branch_2: Rc<Cell<bool>>,
72        reading: Rc<Cell<bool>>,
73        canceled_1: Rc<Cell<bool>>,
74        canceled_2: Rc<Cell<bool>>,
75        cancel_promise: Rc<Promise>,
76        tee_underlying_source: &ByteTeeUnderlyingSource,
77        global: &GlobalScope,
78        can_gc: CanGc,
79    ) -> DomRoot<Self> {
80        reflect_dom_object(
81            Box::new(ByteTeeReadRequest {
82                reflector_: Reflector::new(),
83                branch_1: Dom::from_ref(branch_1),
84                branch_2: Dom::from_ref(branch_2),
85                stream: Dom::from_ref(stream),
86                read_again_for_branch_1,
87                read_again_for_branch_2,
88                reading,
89                canceled_1,
90                canceled_2,
91                cancel_promise,
92                tee_underlying_source: Dom::from_ref(tee_underlying_source),
93            }),
94            global,
95            can_gc,
96        )
97    }
98
99    /// Enqueue a microtask to perform the chunk steps
100    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
101    pub(crate) fn enqueue_chunk_steps(
102        &self,
103        global: &GlobalScope,
104        chunk: RootedTraceableBox<Heap<JSVal>>,
105    ) {
106        // Queue a microtask to perform the following steps:
107        let byte_tee_read_request_chunk = ByteTeeReadRequestMicrotask {
108            chunk: Heap::boxed(*chunk.handle()),
109            tee_read_request: Dom::from_ref(self),
110        };
111        global.enqueue_microtask(Microtask::ReadableStreamByteTeeReadRequest(
112            byte_tee_read_request_chunk,
113        ));
114    }
115
116    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A3>
117    #[allow(clippy::borrowed_box)]
118    pub(crate) fn chunk_steps(&self, chunk: &Box<Heap<JSVal>>, can_gc: CanGc) -> Fallible<()> {
119        let cx = GlobalScope::get_cx();
120
121        // Set readAgainForBranch1 to false.
122        self.read_again_for_branch_1.set(false);
123
124        // Set readAgainForBranch2 to false.
125        self.read_again_for_branch_2.set(false);
126
127        // Let chunk1 and chunk2 be chunk.
128        let chunk1 = chunk;
129        let chunk2 = chunk;
130
131        // Helper to surface clone failures exactly once
132        let handle_clone_error = |error: Error| {
133            rooted!(in(*cx) let mut error_value = UndefinedValue());
134            error
135                .clone()
136                .to_jsval(cx, &self.global(), error_value.handle_mut(), can_gc);
137
138            let branch_1_controller = self.branch_1.get_byte_controller();
139            let branch_2_controller = self.branch_2.get_byte_controller();
140
141            branch_1_controller.error(error_value.handle(), can_gc);
142            branch_2_controller.error(error_value.handle(), can_gc);
143
144            let cancel_result =
145                self.stream
146                    .cancel(cx, &self.stream.global(), error_value.handle(), can_gc);
147            self.cancel_promise.resolve_native(&cancel_result, can_gc);
148        };
149
150        // Prepare per branch chunks ahead of the spec enqueue steps.
151        let chunk1_view = if !self.canceled_1.get() {
152            Some(HeapBufferSource::<ArrayBufferViewU8>::new(
153                BufferSource::ArrayBufferView(RootedTraceableBox::from_box(Heap::boxed(
154                    chunk1.get().to_object(),
155                ))),
156            ))
157        } else {
158            None
159        };
160
161        let mut chunk2_view = None;
162
163        // If canceled1 is false and canceled2 is false,
164        if !self.canceled_1.get() && !self.canceled_2.get() {
165            // Let cloneResult be CloneAsUint8Array(chunk).
166            let chunk2_source =
167                HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
168                    RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
169                ));
170            let clone_result = chunk2_source.clone_as_uint8_array(cx);
171
172            // If cloneResult is an abrupt completion,
173            if let Err(error) = clone_result {
174                handle_clone_error(error);
175                return Ok(());
176            } else {
177                // Otherwise, set chunk2 to cloneResult.[[Value]].
178                chunk2_view = clone_result.ok();
179            }
180        } else if !self.canceled_2.get() {
181            // Only branch2 needs data; clone once for it.
182            let chunk2_source =
183                HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
184                    RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
185                ));
186            match chunk2_source.clone_as_uint8_array(cx) {
187                Ok(clone) => chunk2_view = Some(clone),
188                Err(error) => {
189                    handle_clone_error(error);
190                    return Ok(());
191                },
192            }
193        }
194
195        // If canceled1 is false, perform ! ReadableByteStreamControllerEnqueue(branch1.[[controller]], chunk1).
196        if let Some(chunk1_view) = chunk1_view {
197            let branch_1_controller = self.branch_1.get_byte_controller();
198            branch_1_controller.enqueue(cx, chunk1_view, can_gc)?;
199        }
200
201        // If canceled2 is false, perform ! ReadableByteStreamControllerEnqueue(branch2.[[controller]], chunk2).
202        if let Some(chunk2_view) = chunk2_view {
203            let branch_2_controller = self.branch_2.get_byte_controller();
204            branch_2_controller.enqueue(cx, chunk2_view, can_gc)?;
205        }
206
207        // Set reading to false.
208        self.reading.set(false);
209
210        // If readAgainForBranch1 is true, perform pull1Algorithm.
211        if self.read_again_for_branch_1.get() {
212            self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull1Algorithm), can_gc);
213        } else if self.read_again_for_branch_2.get() {
214            // Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
215            self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull2Algorithm), can_gc);
216        }
217
218        Ok(())
219    }
220
221    /// <https://streams.spec.whatwg.org/#ref-for-read-request-close-steps%E2%91%A2>
222    pub(crate) fn close_steps(&self, can_gc: CanGc) -> Fallible<()> {
223        let cx = GlobalScope::get_cx();
224        let branch_1_controller = self.branch_1.get_byte_controller();
225        let branch_2_controller = self.branch_2.get_byte_controller();
226
227        // Set reading to false.
228        self.reading.set(false);
229
230        // If canceled1 is false, perform ! ReadableByteStreamControllerClose(branch1.[[controller]]).
231        if !self.canceled_1.get() {
232            branch_1_controller.close(cx, can_gc)?;
233        }
234
235        // If canceled2 is false, perform ! ReadableByteStreamControllerClose(branch2.[[controller]]).
236        if !self.canceled_2.get() {
237            branch_2_controller.close(cx, can_gc)?;
238        }
239
240        // If branch1.[[controller]].[[pendingPullIntos]] is not empty,
241        // perform ! ReadableByteStreamControllerRespond(branch1.[[controller]], 0).
242        if branch_1_controller.get_pending_pull_intos_size() > 0 {
243            branch_1_controller.respond(cx, 0, can_gc)?;
244        }
245
246        // If branch2.[[controller]].[[pendingPullIntos]] is not empty,
247        // perform ! ReadableByteStreamControllerRespond(branch2.[[controller]], 0).
248        if branch_2_controller.get_pending_pull_intos_size() > 0 {
249            branch_2_controller.respond(cx, 0, can_gc)?;
250        }
251
252        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
253        if !self.canceled_1.get() || !self.canceled_2.get() {
254            self.cancel_promise.resolve_native(&(), can_gc);
255        }
256
257        Ok(())
258    }
259
260    /// <https://streams.spec.whatwg.org/#ref-for-read-request-error-steps%E2%91%A3>
261    pub(crate) fn error_steps(&self) {
262        // Set reading to false.
263        self.reading.set(false);
264    }
265
266    pub(crate) fn pull_algorithm(
267        &self,
268        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
269        can_gc: CanGc,
270    ) {
271        self.tee_underlying_source
272            .pull_algorithm(byte_tee_pull_algorithm, can_gc);
273    }
274}