Skip to main content

script/dom/stream/
byteteereadrequest.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::Heap;
11use js::jsval::{JSVal, UndefinedValue};
12use js::typedarray::ArrayBufferViewU8;
13use script_bindings::error::Fallible;
14use script_bindings::reflector::{Reflector, reflect_dom_object};
15
16use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
17use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
18use crate::dom::bindings::error::{Error, ErrorToJsval};
19use crate::dom::bindings::reflector::DomGlobal;
20use crate::dom::bindings::root::{Dom, DomRoot};
21use crate::dom::bindings::trace::RootedTraceableBox;
22use crate::dom::globalscope::GlobalScope;
23use crate::dom::promise::Promise;
24use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
25use crate::dom::stream::readablestream::ReadableStream;
26use crate::microtask::Microtask;
27use crate::script_runtime::CanGc;
28
29#[derive(JSTraceable, MallocSizeOf)]
30#[cfg_attr(crown, expect(crown::unrooted_must_root))]
31pub(crate) struct ByteTeeReadRequestMicrotask {
32    #[ignore_malloc_size_of = "mozjs"]
33    chunk: Box<Heap<JSVal>>,
34    tee_read_request: Dom<ByteTeeReadRequest>,
35}
36
37impl ByteTeeReadRequestMicrotask {
38    pub(crate) fn microtask_chunk_steps(&self, cx: &mut JSContext) {
39        self.tee_read_request
40            .chunk_steps(&self.chunk, cx)
41            .expect("ByteTeeReadRequestMicrotask::microtask_chunk_steps failed");
42    }
43}
44
45#[dom_struct]
46/// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
47pub(crate) struct ByteTeeReadRequest {
48    reflector_: Reflector,
49    branch_1: Dom<ReadableStream>,
50    branch_2: Dom<ReadableStream>,
51    stream: Dom<ReadableStream>,
52    #[conditional_malloc_size_of]
53    read_again_for_branch_1: Rc<Cell<bool>>,
54    #[conditional_malloc_size_of]
55    read_again_for_branch_2: Rc<Cell<bool>>,
56    #[conditional_malloc_size_of]
57    reading: Rc<Cell<bool>>,
58    #[conditional_malloc_size_of]
59    canceled_1: Rc<Cell<bool>>,
60    #[conditional_malloc_size_of]
61    canceled_2: Rc<Cell<bool>>,
62    #[conditional_malloc_size_of]
63    cancel_promise: Rc<Promise>,
64    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
65}
66impl ByteTeeReadRequest {
67    #[allow(clippy::too_many_arguments)]
68    pub(crate) fn new(
69        branch_1: &ReadableStream,
70        branch_2: &ReadableStream,
71        stream: &ReadableStream,
72        read_again_for_branch_1: Rc<Cell<bool>>,
73        read_again_for_branch_2: Rc<Cell<bool>>,
74        reading: Rc<Cell<bool>>,
75        canceled_1: Rc<Cell<bool>>,
76        canceled_2: Rc<Cell<bool>>,
77        cancel_promise: Rc<Promise>,
78        tee_underlying_source: &ByteTeeUnderlyingSource,
79        global: &GlobalScope,
80        can_gc: CanGc,
81    ) -> DomRoot<Self> {
82        reflect_dom_object(
83            Box::new(ByteTeeReadRequest {
84                reflector_: Reflector::new(),
85                branch_1: Dom::from_ref(branch_1),
86                branch_2: Dom::from_ref(branch_2),
87                stream: Dom::from_ref(stream),
88                read_again_for_branch_1,
89                read_again_for_branch_2,
90                reading,
91                canceled_1,
92                canceled_2,
93                cancel_promise,
94                tee_underlying_source: Dom::from_ref(tee_underlying_source),
95            }),
96            global,
97            can_gc,
98        )
99    }
100
101    /// Enqueue a microtask to perform the chunk steps
102    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
103    pub(crate) fn enqueue_chunk_steps(
104        &self,
105        global: &GlobalScope,
106        chunk: RootedTraceableBox<Heap<JSVal>>,
107    ) {
108        // Queue a microtask to perform the following steps:
109        let byte_tee_read_request_chunk = ByteTeeReadRequestMicrotask {
110            chunk: Heap::boxed(*chunk.handle()),
111            tee_read_request: Dom::from_ref(self),
112        };
113        global.enqueue_microtask(Microtask::ReadableStreamByteTeeReadRequest(
114            byte_tee_read_request_chunk,
115        ));
116    }
117
118    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A3>
119    #[allow(clippy::borrowed_box)]
120    pub(crate) fn chunk_steps(&self, chunk: &Box<Heap<JSVal>>, cx: &mut JSContext) -> Fallible<()> {
121        // Set readAgainForBranch1 to false.
122        self.read_again_for_branch_1.set(false);
123
124        // Set readAgainForBranch2 to false.
125        self.read_again_for_branch_2.set(false);
126
127        // Let chunk1 and chunk2 be chunk.
128        let chunk1 = chunk;
129        let chunk2 = chunk;
130
131        // Helper to surface clone failures exactly once
132        let handle_clone_error = |cx: &mut JSContext, error: Error| {
133            rooted!(&in(cx) let mut error_value = UndefinedValue());
134            error.to_jsval(
135                cx.into(),
136                &self.global(),
137                error_value.handle_mut(),
138                CanGc::from_cx(cx),
139            );
140
141            let branch_1_controller = self.branch_1.get_byte_controller();
142            let branch_2_controller = self.branch_2.get_byte_controller();
143
144            branch_1_controller.error(cx, error_value.handle());
145            branch_2_controller.error(cx, error_value.handle());
146
147            let cancel_result = self
148                .stream
149                .cancel(cx, &self.stream.global(), error_value.handle());
150            self.cancel_promise
151                .resolve_native(&cancel_result, CanGc::from_cx(cx));
152        };
153
154        // Prepare per branch chunks ahead of the spec enqueue steps.
155        let chunk1_view = if !self.canceled_1.get() {
156            Some(RootedTraceableBox::new(
157                HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
158                    Heap::boxed(chunk1.get().to_object()),
159                )),
160            ))
161        } else {
162            None
163        };
164
165        let mut chunk2_view = None;
166
167        // If canceled1 is false and canceled2 is false,
168        if !self.canceled_1.get() && !self.canceled_2.get() {
169            // Let cloneResult be CloneAsUint8Array(chunk).
170            let chunk2_source =
171                RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
172                    BufferSource::ArrayBufferView(Heap::boxed(chunk2.get().to_object())),
173                ));
174            let clone_result = chunk2_source.clone_as_uint8_array(cx);
175
176            // If cloneResult is an abrupt completion,
177            if let Err(error) = clone_result {
178                handle_clone_error(cx, error);
179                return Ok(());
180            } else {
181                // Otherwise, set chunk2 to cloneResult.[[Value]].
182                chunk2_view = clone_result.ok();
183            }
184        } else if !self.canceled_2.get() {
185            // Only branch2 needs data; clone once for it.
186            let chunk2_source =
187                RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
188                    BufferSource::ArrayBufferView(Heap::boxed(chunk2.get().to_object())),
189                ));
190            match chunk2_source.clone_as_uint8_array(cx) {
191                Ok(clone) => chunk2_view = Some(clone),
192                Err(error) => {
193                    handle_clone_error(cx, error);
194                    return Ok(());
195                },
196            }
197        }
198
199        // If canceled1 is false, perform ! ReadableByteStreamControllerEnqueue(branch1.[[controller]], chunk1).
200        if let Some(chunk1_view) = chunk1_view {
201            let branch_1_controller = self.branch_1.get_byte_controller();
202            branch_1_controller.enqueue(cx, chunk1_view)?;
203        }
204
205        // If canceled2 is false, perform ! ReadableByteStreamControllerEnqueue(branch2.[[controller]], chunk2).
206        if let Some(chunk2_view) = chunk2_view {
207            let branch_2_controller = self.branch_2.get_byte_controller();
208            branch_2_controller.enqueue(cx, chunk2_view)?;
209        }
210
211        // Set reading to false.
212        self.reading.set(false);
213
214        // If readAgainForBranch1 is true, perform pull1Algorithm.
215        if self.read_again_for_branch_1.get() {
216            self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull1Algorithm));
217        } else if self.read_again_for_branch_2.get() {
218            // Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
219            self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull2Algorithm));
220        }
221
222        Ok(())
223    }
224
225    /// <https://streams.spec.whatwg.org/#ref-for-read-request-close-steps%E2%91%A2>
226    pub(crate) fn close_steps(&self, cx: &mut JSContext) -> Fallible<()> {
227        let branch_1_controller = self.branch_1.get_byte_controller();
228        let branch_2_controller = self.branch_2.get_byte_controller();
229
230        // Set reading to false.
231        self.reading.set(false);
232
233        // If canceled1 is false, perform ! ReadableByteStreamControllerClose(branch1.[[controller]]).
234        if !self.canceled_1.get() {
235            branch_1_controller.close(cx)?;
236        }
237
238        // If canceled2 is false, perform ! ReadableByteStreamControllerClose(branch2.[[controller]]).
239        if !self.canceled_2.get() {
240            branch_2_controller.close(cx)?;
241        }
242
243        // If branch1.[[controller]].[[pendingPullIntos]] is not empty,
244        // perform ! ReadableByteStreamControllerRespond(branch1.[[controller]], 0).
245        if branch_1_controller.get_pending_pull_intos_size() > 0 {
246            branch_1_controller.respond(cx, 0)?;
247        }
248
249        // If branch2.[[controller]].[[pendingPullIntos]] is not empty,
250        // perform ! ReadableByteStreamControllerRespond(branch2.[[controller]], 0).
251        if branch_2_controller.get_pending_pull_intos_size() > 0 {
252            branch_2_controller.respond(cx, 0)?;
253        }
254
255        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
256        if !self.canceled_1.get() || !self.canceled_2.get() {
257            self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
258        }
259
260        Ok(())
261    }
262
263    /// <https://streams.spec.whatwg.org/#ref-for-read-request-error-steps%E2%91%A3>
264    pub(crate) fn error_steps(&self) {
265        // Set reading to false.
266        self.reading.set(false);
267    }
268
269    pub(crate) fn pull_algorithm(
270        &self,
271        cx: &mut JSContext,
272        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
273    ) {
274        self.tee_underlying_source
275            .pull_algorithm(cx, byte_tee_pull_algorithm);
276    }
277}