script/dom/stream/
byteteereadrequest.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::Heap;
10use js::jsval::{JSVal, UndefinedValue};
11use js::typedarray::ArrayBufferViewU8;
12use script_bindings::error::Fallible;
13
14use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
15use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
16use crate::dom::bindings::error::{Error, ErrorToJsval};
17use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
18use crate::dom::bindings::root::{Dom, DomRoot};
19use crate::dom::bindings::trace::RootedTraceableBox;
20use crate::dom::globalscope::GlobalScope;
21use crate::dom::promise::Promise;
22use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
23use crate::dom::stream::readablestream::ReadableStream;
24use crate::microtask::Microtask;
25use crate::script_runtime::CanGc;
26
27#[derive(JSTraceable, MallocSizeOf)]
28#[cfg_attr(crown, expect(crown::unrooted_must_root))]
29pub(crate) struct ByteTeeReadRequestMicrotask {
30    #[ignore_malloc_size_of = "mozjs"]
31    chunk: Box<Heap<JSVal>>,
32    tee_read_request: Dom<ByteTeeReadRequest>,
33}
34
35impl ByteTeeReadRequestMicrotask {
36    pub(crate) fn microtask_chunk_steps(&self, cx: &mut js::context::JSContext) {
37        self.tee_read_request
38            .chunk_steps(&self.chunk, cx)
39            .expect("ByteTeeReadRequestMicrotask::microtask_chunk_steps failed");
40    }
41}
42
43#[dom_struct]
44/// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
45pub(crate) struct ByteTeeReadRequest {
46    reflector_: Reflector,
47    branch_1: Dom<ReadableStream>,
48    branch_2: Dom<ReadableStream>,
49    stream: Dom<ReadableStream>,
50    #[conditional_malloc_size_of]
51    read_again_for_branch_1: Rc<Cell<bool>>,
52    #[conditional_malloc_size_of]
53    read_again_for_branch_2: Rc<Cell<bool>>,
54    #[conditional_malloc_size_of]
55    reading: Rc<Cell<bool>>,
56    #[conditional_malloc_size_of]
57    canceled_1: Rc<Cell<bool>>,
58    #[conditional_malloc_size_of]
59    canceled_2: Rc<Cell<bool>>,
60    #[conditional_malloc_size_of]
61    cancel_promise: Rc<Promise>,
62    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
63}
64impl ByteTeeReadRequest {
65    #[allow(clippy::too_many_arguments)]
66    pub(crate) fn new(
67        branch_1: &ReadableStream,
68        branch_2: &ReadableStream,
69        stream: &ReadableStream,
70        read_again_for_branch_1: Rc<Cell<bool>>,
71        read_again_for_branch_2: Rc<Cell<bool>>,
72        reading: Rc<Cell<bool>>,
73        canceled_1: Rc<Cell<bool>>,
74        canceled_2: Rc<Cell<bool>>,
75        cancel_promise: Rc<Promise>,
76        tee_underlying_source: &ByteTeeUnderlyingSource,
77        global: &GlobalScope,
78        can_gc: CanGc,
79    ) -> DomRoot<Self> {
80        reflect_dom_object(
81            Box::new(ByteTeeReadRequest {
82                reflector_: Reflector::new(),
83                branch_1: Dom::from_ref(branch_1),
84                branch_2: Dom::from_ref(branch_2),
85                stream: Dom::from_ref(stream),
86                read_again_for_branch_1,
87                read_again_for_branch_2,
88                reading,
89                canceled_1,
90                canceled_2,
91                cancel_promise,
92                tee_underlying_source: Dom::from_ref(tee_underlying_source),
93            }),
94            global,
95            can_gc,
96        )
97    }
98
99    /// Enqueue a microtask to perform the chunk steps
100    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
101    pub(crate) fn enqueue_chunk_steps(
102        &self,
103        global: &GlobalScope,
104        chunk: RootedTraceableBox<Heap<JSVal>>,
105    ) {
106        // Queue a microtask to perform the following steps:
107        let byte_tee_read_request_chunk = ByteTeeReadRequestMicrotask {
108            chunk: Heap::boxed(*chunk.handle()),
109            tee_read_request: Dom::from_ref(self),
110        };
111        global.enqueue_microtask(Microtask::ReadableStreamByteTeeReadRequest(
112            byte_tee_read_request_chunk,
113        ));
114    }
115
116    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A3>
117    #[allow(clippy::borrowed_box)]
118    pub(crate) fn chunk_steps(
119        &self,
120        chunk: &Box<Heap<JSVal>>,
121        cx: &mut js::context::JSContext,
122    ) -> Fallible<()> {
123        // Set readAgainForBranch1 to false.
124        self.read_again_for_branch_1.set(false);
125
126        // Set readAgainForBranch2 to false.
127        self.read_again_for_branch_2.set(false);
128
129        // Let chunk1 and chunk2 be chunk.
130        let chunk1 = chunk;
131        let chunk2 = chunk;
132
133        // Helper to surface clone failures exactly once
134        let handle_clone_error = |cx: &mut js::context::JSContext, error: Error| {
135            rooted!(&in(cx) let mut error_value = UndefinedValue());
136            error.clone().to_jsval(
137                cx.into(),
138                &self.global(),
139                error_value.handle_mut(),
140                CanGc::from_cx(cx),
141            );
142
143            let branch_1_controller = self.branch_1.get_byte_controller();
144            let branch_2_controller = self.branch_2.get_byte_controller();
145
146            branch_1_controller.error(error_value.handle(), CanGc::from_cx(cx));
147            branch_2_controller.error(error_value.handle(), CanGc::from_cx(cx));
148
149            let cancel_result = self.stream.cancel(
150                cx.into(),
151                &self.stream.global(),
152                error_value.handle(),
153                CanGc::from_cx(cx),
154            );
155            self.cancel_promise
156                .resolve_native(&cancel_result, CanGc::from_cx(cx));
157        };
158
159        // Prepare per branch chunks ahead of the spec enqueue steps.
160        let chunk1_view = if !self.canceled_1.get() {
161            Some(HeapBufferSource::<ArrayBufferViewU8>::new(
162                BufferSource::ArrayBufferView(RootedTraceableBox::from_box(Heap::boxed(
163                    chunk1.get().to_object(),
164                ))),
165            ))
166        } else {
167            None
168        };
169
170        let mut chunk2_view = None;
171
172        // If canceled1 is false and canceled2 is false,
173        if !self.canceled_1.get() && !self.canceled_2.get() {
174            // Let cloneResult be CloneAsUint8Array(chunk).
175            let chunk2_source =
176                HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
177                    RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
178                ));
179            let clone_result = chunk2_source.clone_as_uint8_array(cx.into());
180
181            // If cloneResult is an abrupt completion,
182            if let Err(error) = clone_result {
183                handle_clone_error(cx, error);
184                return Ok(());
185            } else {
186                // Otherwise, set chunk2 to cloneResult.[[Value]].
187                chunk2_view = clone_result.ok();
188            }
189        } else if !self.canceled_2.get() {
190            // Only branch2 needs data; clone once for it.
191            let chunk2_source =
192                HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
193                    RootedTraceableBox::from_box(Heap::boxed(chunk2.get().to_object())),
194                ));
195            match chunk2_source.clone_as_uint8_array(cx.into()) {
196                Ok(clone) => chunk2_view = Some(clone),
197                Err(error) => {
198                    handle_clone_error(cx, error);
199                    return Ok(());
200                },
201            }
202        }
203
204        // If canceled1 is false, perform ! ReadableByteStreamControllerEnqueue(branch1.[[controller]], chunk1).
205        if let Some(chunk1_view) = chunk1_view {
206            let branch_1_controller = self.branch_1.get_byte_controller();
207            branch_1_controller.enqueue(cx.into(), chunk1_view, CanGc::from_cx(cx))?;
208        }
209
210        // If canceled2 is false, perform ! ReadableByteStreamControllerEnqueue(branch2.[[controller]], chunk2).
211        if let Some(chunk2_view) = chunk2_view {
212            let branch_2_controller = self.branch_2.get_byte_controller();
213            branch_2_controller.enqueue(cx.into(), chunk2_view, CanGc::from_cx(cx))?;
214        }
215
216        // Set reading to false.
217        self.reading.set(false);
218
219        // If readAgainForBranch1 is true, perform pull1Algorithm.
220        if self.read_again_for_branch_1.get() {
221            self.pull_algorithm(
222                Some(ByteTeePullAlgorithm::Pull1Algorithm),
223                CanGc::from_cx(cx),
224            );
225        } else if self.read_again_for_branch_2.get() {
226            // Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
227            self.pull_algorithm(
228                Some(ByteTeePullAlgorithm::Pull2Algorithm),
229                CanGc::from_cx(cx),
230            );
231        }
232
233        Ok(())
234    }
235
236    /// <https://streams.spec.whatwg.org/#ref-for-read-request-close-steps%E2%91%A2>
237    pub(crate) fn close_steps(&self, can_gc: CanGc) -> Fallible<()> {
238        let cx = GlobalScope::get_cx();
239        let branch_1_controller = self.branch_1.get_byte_controller();
240        let branch_2_controller = self.branch_2.get_byte_controller();
241
242        // Set reading to false.
243        self.reading.set(false);
244
245        // If canceled1 is false, perform ! ReadableByteStreamControllerClose(branch1.[[controller]]).
246        if !self.canceled_1.get() {
247            branch_1_controller.close(cx, can_gc)?;
248        }
249
250        // If canceled2 is false, perform ! ReadableByteStreamControllerClose(branch2.[[controller]]).
251        if !self.canceled_2.get() {
252            branch_2_controller.close(cx, can_gc)?;
253        }
254
255        // If branch1.[[controller]].[[pendingPullIntos]] is not empty,
256        // perform ! ReadableByteStreamControllerRespond(branch1.[[controller]], 0).
257        if branch_1_controller.get_pending_pull_intos_size() > 0 {
258            branch_1_controller.respond(cx, 0, can_gc)?;
259        }
260
261        // If branch2.[[controller]].[[pendingPullIntos]] is not empty,
262        // perform ! ReadableByteStreamControllerRespond(branch2.[[controller]], 0).
263        if branch_2_controller.get_pending_pull_intos_size() > 0 {
264            branch_2_controller.respond(cx, 0, can_gc)?;
265        }
266
267        // If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
268        if !self.canceled_1.get() || !self.canceled_2.get() {
269            self.cancel_promise.resolve_native(&(), can_gc);
270        }
271
272        Ok(())
273    }
274
275    /// <https://streams.spec.whatwg.org/#ref-for-read-request-error-steps%E2%91%A3>
276    pub(crate) fn error_steps(&self) {
277        // Set reading to false.
278        self.reading.set(false);
279    }
280
281    pub(crate) fn pull_algorithm(
282        &self,
283        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
284        can_gc: CanGc,
285    ) {
286        self.tee_underlying_source
287            .pull_algorithm(byte_tee_pull_algorithm, can_gc);
288    }
289}