Skip to main content

script/dom/stream/
byteteereadrequest.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::Heap;
11use js::jsval::{JSVal, UndefinedValue};
12use js::typedarray::ArrayBufferViewU8;
13use script_bindings::error::Fallible;
14use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
15
16use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
17use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
18use crate::dom::bindings::error::{Error, ErrorToJsval};
19use crate::dom::bindings::reflector::DomGlobal;
20use crate::dom::bindings::root::{Dom, DomRoot};
21use crate::dom::bindings::trace::RootedTraceableBox;
22use crate::dom::globalscope::GlobalScope;
23use crate::dom::promise::Promise;
24use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
25use crate::dom::stream::readablestream::ReadableStream;
26use crate::microtask::Microtask;
27
28#[derive(JSTraceable, MallocSizeOf)]
29#[cfg_attr(crown, expect(crown::unrooted_must_root))]
30pub(crate) struct ByteTeeReadRequestMicrotask {
31    #[ignore_malloc_size_of = "mozjs"]
32    chunk: Box<Heap<JSVal>>,
33    tee_read_request: Dom<ByteTeeReadRequest>,
34}
35
36impl ByteTeeReadRequestMicrotask {
37    pub(crate) fn microtask_chunk_steps(&self, cx: &mut JSContext) {
38        self.tee_read_request
39            .chunk_steps(&self.chunk, cx)
40            .expect("ByteTeeReadRequestMicrotask::microtask_chunk_steps failed");
41    }
42}
43
44#[dom_struct]
45/// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
46pub(crate) struct ByteTeeReadRequest {
47    reflector_: Reflector,
48    branch_1: Dom<ReadableStream>,
49    branch_2: Dom<ReadableStream>,
50    stream: Dom<ReadableStream>,
51    #[conditional_malloc_size_of]
52    read_again_for_branch_1: Rc<Cell<bool>>,
53    #[conditional_malloc_size_of]
54    read_again_for_branch_2: Rc<Cell<bool>>,
55    #[conditional_malloc_size_of]
56    reading: Rc<Cell<bool>>,
57    #[conditional_malloc_size_of]
58    canceled_1: Rc<Cell<bool>>,
59    #[conditional_malloc_size_of]
60    canceled_2: Rc<Cell<bool>>,
61    #[conditional_malloc_size_of]
62    cancel_promise: Rc<Promise>,
63    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
64}
65impl ByteTeeReadRequest {
66    #[allow(clippy::too_many_arguments)]
67    pub(crate) fn new(
68        cx: &mut JSContext,
69        branch_1: &ReadableStream,
70        branch_2: &ReadableStream,
71        stream: &ReadableStream,
72        read_again_for_branch_1: Rc<Cell<bool>>,
73        read_again_for_branch_2: Rc<Cell<bool>>,
74        reading: Rc<Cell<bool>>,
75        canceled_1: Rc<Cell<bool>>,
76        canceled_2: Rc<Cell<bool>>,
77        cancel_promise: Rc<Promise>,
78        tee_underlying_source: &ByteTeeUnderlyingSource,
79        global: &GlobalScope,
80    ) -> DomRoot<Self> {
81        reflect_dom_object_with_cx(
82            Box::new(ByteTeeReadRequest {
83                reflector_: Reflector::new(),
84                branch_1: Dom::from_ref(branch_1),
85                branch_2: Dom::from_ref(branch_2),
86                stream: Dom::from_ref(stream),
87                read_again_for_branch_1,
88                read_again_for_branch_2,
89                reading,
90                canceled_1,
91                canceled_2,
92                cancel_promise,
93                tee_underlying_source: Dom::from_ref(tee_underlying_source),
94            }),
95            global,
96            cx,
97        )
98    }
99
100    /// Enqueue a microtask to perform the chunk steps
101    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
102    pub(crate) fn enqueue_chunk_steps(
103        &self,
104        cx: &mut JSContext,
105        global: &GlobalScope,
106        chunk: RootedTraceableBox<Heap<JSVal>>,
107    ) {
108        // Queue a microtask to perform the following steps:
109        let byte_tee_read_request_chunk = ByteTeeReadRequestMicrotask {
110            chunk: Heap::boxed(*chunk.handle()),
111            tee_read_request: Dom::from_ref(self),
112        };
113        global.enqueue_microtask(
114            cx,
115            Microtask::ReadableStreamByteTeeReadRequest(byte_tee_read_request_chunk),
116        );
117    }
118
119    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A3>
120    #[allow(clippy::borrowed_box)]
121    pub(crate) fn chunk_steps(&self, chunk: &Box<Heap<JSVal>>, cx: &mut JSContext) -> Fallible<()> {
122        // Set readAgainForBranch1 to false.
123        self.read_again_for_branch_1.set(false);
124
125        // Set readAgainForBranch2 to false.
126        self.read_again_for_branch_2.set(false);
127
128        // Let chunk1 and chunk2 be chunk.
129        let chunk1 = chunk;
130        let chunk2 = chunk;
131
132        // Helper to surface clone failures exactly once
133        let handle_clone_error = |cx: &mut JSContext, error: Error| {
134            rooted!(&in(cx) let mut error_value = UndefinedValue());
135            error.to_jsval(cx, &self.global(), error_value.handle_mut());
136
137            let branch_1_controller = self.branch_1.get_byte_controller();
138            let branch_2_controller = self.branch_2.get_byte_controller();
139
140            branch_1_controller.error(cx, error_value.handle());
141            branch_2_controller.error(cx, error_value.handle());
142
143            let cancel_result = self
144                .stream
145                .cancel(cx, &self.stream.global(), error_value.handle());
146            self.cancel_promise.resolve_native(cx, &cancel_result);
147        };
148
149        // Prepare per branch chunks ahead of the spec enqueue steps.
150        let chunk1_view = if !self.canceled_1.get() {
151            Some(RootedTraceableBox::new(
152                HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
153                    Heap::boxed(chunk1.get().to_object()),
154                )),
155            ))
156        } else {
157            None
158        };
159
160        let mut chunk2_view = None;
161
162        // If canceled1 is false and canceled2 is false,
163        if !self.canceled_1.get() && !self.canceled_2.get() {
164            // Let cloneResult be CloneAsUint8Array(chunk).
165            let chunk2_source =
166                RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
167                    BufferSource::ArrayBufferView(Heap::boxed(chunk2.get().to_object())),
168                ));
169            let clone_result = chunk2_source.clone_as_uint8_array(cx);
170
171            // If cloneResult is an abrupt completion,
172            if let Err(error) = clone_result {
173                handle_clone_error(cx, error);
174                return Ok(());
175            } else {
176                // Otherwise, set chunk2 to cloneResult.[[Value]].
177                chunk2_view = clone_result.ok();
178            }
179        } else if !self.canceled_2.get() {
180            // Only branch2 needs data; clone once for it.
181            let chunk2_source =
182                RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
183                    BufferSource::ArrayBufferView(Heap::boxed(chunk2.get().to_object())),
184                ));
185            match chunk2_source.clone_as_uint8_array(cx) {
186                Ok(clone) => chunk2_view = Some(clone),
187                Err(error) => {
188                    handle_clone_error(cx, error);
189                    return Ok(());
190                },
191            }
192        }
193
194        // If canceled1 is false, perform ! ReadableByteStreamControllerEnqueue(branch1.[[controller]], chunk1).
195        if let Some(chunk1_view) = chunk1_view {
196            let branch_1_controller = self.branch_1.get_byte_controller();
197            branch_1_controller.enqueue(cx, chunk1_view)?;
198        }
199
200        // If canceled2 is false, perform ! ReadableByteStreamControllerEnqueue(branch2.[[controller]], chunk2).
201        if let Some(chunk2_view) = chunk2_view {
202            let branch_2_controller = self.branch_2.get_byte_controller();
203            branch_2_controller.enqueue(cx, chunk2_view)?;
204        }
205
206        // Set reading to false.
207        self.reading.set(false);
208
209        // If readAgainForBranch1 is true, perform pull1Algorithm.
210        if self.read_again_for_branch_1.get() {
211            self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull1Algorithm));
212        } else if self.read_again_for_branch_2.get() {
213            // Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
214            self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull2Algorithm));
215        }
216
217        Ok(())
218    }
219
220    /// <https://streams.spec.whatwg.org/#ref-for-read-request-close-steps%E2%91%A2>
221    pub(crate) fn close_steps(&self, cx: &mut JSContext) -> Fallible<()> {
222        let branch_1_controller = self.branch_1.get_byte_controller();
223        let branch_2_controller = self.branch_2.get_byte_controller();
224
225        // Set reading to false.
226        self.reading.set(false);
227
228        // If canceled1 is false, perform ! ReadableByteStreamControllerClose(branch1.[[controller]]).
229        if !self.canceled_1.get() {
230            branch_1_controller.close(cx)?;
231        }
232
233        // If canceled2 is false, perform ! ReadableByteStreamControllerClose(branch2.[[controller]]).
234        if !self.canceled_2.get() {
235            branch_2_controller.close(cx)?;
236        }
237
238        // If branch1.[[controller]].[[pendingPullIntos]] is not empty,
239        // perform ! ReadableByteStreamControllerRespond(branch1.[[controller]], 0).
240        if branch_1_controller.get_pending_pull_intos_size() > 0 {
241            branch_1_controller.respond(cx, 0)?;
242        }
243
244        // If branch2.[[controller]].[[pendingPullIntos]] is not empty,
245        // perform ! ReadableByteStreamControllerRespond(branch2.[[controller]], 0).
246        if branch_2_controller.get_pending_pull_intos_size() > 0 {
247            branch_2_controller.respond(cx, 0)?;
248        }
249
250        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
251        if !self.canceled_1.get() || !self.canceled_2.get() {
252            self.cancel_promise.resolve_native(cx, &());
253        }
254
255        Ok(())
256    }
257
258    /// <https://streams.spec.whatwg.org/#ref-for-read-request-error-steps%E2%91%A3>
259    pub(crate) fn error_steps(&self) {
260        // Set reading to false.
261        self.reading.set(false);
262    }
263
264    pub(crate) fn pull_algorithm(
265        &self,
266        cx: &mut JSContext,
267        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
268    ) {
269        self.tee_underlying_source
270            .pull_algorithm(cx, byte_tee_pull_algorithm);
271    }
272}