Skip to main content

script/dom/stream/
byteteereadintorequest.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsval::UndefinedValue;
11use js::typedarray::ArrayBufferViewU8;
12use script_bindings::reflector::{Reflector, reflect_dom_object};
13use script_bindings::trace::RootedTraceableBox;
14
15use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
16use crate::dom::bindings::buffer_source::HeapBufferSource;
17use crate::dom::bindings::error::{ErrorToJsval, Fallible};
18use crate::dom::bindings::refcounted::Trusted;
19use crate::dom::bindings::reflector::DomGlobal;
20use crate::dom::bindings::root::{Dom, DomRoot};
21use crate::dom::globalscope::GlobalScope;
22use crate::dom::promise::Promise;
23use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
24use crate::dom::stream::readablestream::ReadableStream;
25use crate::microtask::Microtask;
26use crate::script_runtime::CanGc;
27
28#[derive(JSTraceable, MallocSizeOf)]
29pub(crate) struct ByteTeeReadIntoRequestMicrotask {
30    #[ignore_malloc_size_of = "mozjs"]
31    chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
32    tee_read_request: Trusted<ByteTeeReadIntoRequest>,
33}
34
35impl ByteTeeReadIntoRequestMicrotask {
36    pub(crate) fn microtask_chunk_steps(&self, cx: &mut JSContext) {
37        self.tee_read_request
38            .root()
39            .chunk_steps(&self.chunk, cx)
40            .expect("Failed to enqueue chunk");
41    }
42}
43
44#[dom_struct]
45pub(crate) struct ByteTeeReadIntoRequest {
46    reflector_: Reflector,
47    for_branch2: bool,
48    byob_branch: Dom<ReadableStream>,
49    other_branch: Dom<ReadableStream>,
50    stream: Dom<ReadableStream>,
51    #[conditional_malloc_size_of]
52    read_again_for_branch_1: Rc<Cell<bool>>,
53    #[conditional_malloc_size_of]
54    read_again_for_branch_2: Rc<Cell<bool>>,
55    #[conditional_malloc_size_of]
56    reading: Rc<Cell<bool>>,
57    #[conditional_malloc_size_of]
58    canceled_1: Rc<Cell<bool>>,
59    #[conditional_malloc_size_of]
60    canceled_2: Rc<Cell<bool>>,
61    #[conditional_malloc_size_of]
62    cancel_promise: Rc<Promise>,
63    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
64}
65impl ByteTeeReadIntoRequest {
66    #[allow(clippy::too_many_arguments)]
67    pub(crate) fn new(
68        for_branch2: bool,
69        byob_branch: &ReadableStream,
70        other_branch: &ReadableStream,
71        stream: &ReadableStream,
72        read_again_for_branch_1: Rc<Cell<bool>>,
73        read_again_for_branch_2: Rc<Cell<bool>>,
74        reading: Rc<Cell<bool>>,
75        canceled_1: Rc<Cell<bool>>,
76        canceled_2: Rc<Cell<bool>>,
77        cancel_promise: Rc<Promise>,
78        tee_underlying_source: &ByteTeeUnderlyingSource,
79        global: &GlobalScope,
80        can_gc: CanGc,
81    ) -> DomRoot<Self> {
82        reflect_dom_object(
83            Box::new(ByteTeeReadIntoRequest {
84                reflector_: Reflector::new(),
85                for_branch2,
86                byob_branch: Dom::from_ref(byob_branch),
87                other_branch: Dom::from_ref(other_branch),
88                stream: Dom::from_ref(stream),
89                read_again_for_branch_1,
90                read_again_for_branch_2,
91                reading,
92                canceled_1,
93                canceled_2,
94                cancel_promise,
95                tee_underlying_source: Dom::from_ref(tee_underlying_source),
96            }),
97            global,
98            can_gc,
99        )
100    }
101
102    pub(crate) fn enqueue_chunk_steps(
103        &self,
104        chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
105    ) {
106        // Queue a microtask to perform the following steps:
107        let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
108            chunk,
109            tee_read_request: Trusted::new(self),
110        };
111
112        self.global()
113            .enqueue_microtask(Microtask::ReadableStreamByteTeeReadIntoRequest(
114                byte_tee_read_request_chunk,
115            ));
116    }
117
118    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
119    #[allow(clippy::borrowed_box)]
120    pub(crate) fn chunk_steps(
121        &self,
122        chunk: &HeapBufferSource<ArrayBufferViewU8>,
123        cx: &mut JSContext,
124    ) -> Fallible<()> {
125        // Set readAgainForBranch1 to false.
126        self.read_again_for_branch_1.set(false);
127
128        // Set readAgainForBranch2 to false.
129        self.read_again_for_branch_2.set(false);
130
131        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
132        let byob_canceled = if self.for_branch2 {
133            self.canceled_2.get()
134        } else {
135            self.canceled_1.get()
136        };
137
138        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
139        let other_canceled = if self.for_branch2 {
140            self.canceled_1.get()
141        } else {
142            self.canceled_2.get()
143        };
144
145        // If otherCanceled is false,
146        if !other_canceled {
147            // Let cloneResult be CloneAsUint8Array(chunk).
148            let clone_result = chunk.clone_as_uint8_array(cx);
149
150            // If cloneResult is an abrupt completion,
151            if let Err(error) = clone_result {
152                rooted!(&in(cx) let mut error_value = UndefinedValue());
153                error.to_jsval(
154                    cx.into(),
155                    &self.global(),
156                    error_value.handle_mut(),
157                    CanGc::from_cx(cx),
158                );
159
160                // Perform ! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]]).
161                let byob_branch_controller = self.byob_branch.get_byte_controller();
162                byob_branch_controller.error(cx, error_value.handle());
163
164                // Perform ! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]]).
165                let other_branch_controller = self.other_branch.get_byte_controller();
166                other_branch_controller.error(cx, error_value.handle());
167
168                // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
169                let cancel_result =
170                    self.stream
171                        .cancel(cx, &self.stream.global(), error_value.handle());
172                self.cancel_promise
173                    .resolve_native(&cancel_result, CanGc::from_cx(cx));
174
175                // Return.
176                return Ok(());
177            } else {
178                // Otherwise, let clonedChunk be cloneResult.[[Value]].
179                let cloned_chunk = clone_result.unwrap();
180
181                // If byobCanceled is false, perform !
182                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
183                if !byob_canceled {
184                    let byob_branch_controller = self.byob_branch.get_byte_controller();
185                    byob_branch_controller.respond_with_new_view(cx, chunk)?;
186                }
187
188                // Perform ! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk).
189                let other_branch_controller = self.other_branch.get_byte_controller();
190                other_branch_controller.enqueue(cx, cloned_chunk)?;
191            }
192        } else if !byob_canceled {
193            // Otherwise, if byobCanceled is false, perform
194            // ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
195
196            let byob_branch_controller = self.byob_branch.get_byte_controller();
197            byob_branch_controller.respond_with_new_view(cx, chunk)?;
198        }
199
200        // Set reading to false.
201        self.reading.set(false);
202
203        // If readAgainForBranch1 is true, perform pull1Algorithm.
204        if self.read_again_for_branch_1.get() {
205            self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull1Algorithm));
206        } else if self.read_again_for_branch_2.get() {
207            // Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
208            self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull2Algorithm));
209        }
210
211        Ok(())
212    }
213
214    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A1>
215    pub(crate) fn close_steps(
216        &self,
217        cx: &mut JSContext,
218        chunk: Option<RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>>,
219    ) -> Fallible<()> {
220        // Set reading to false.
221        self.reading.set(false);
222
223        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
224        let byob_canceled = if self.for_branch2 {
225            self.canceled_2.get()
226        } else {
227            self.canceled_1.get()
228        };
229
230        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
231        let other_canceled = if self.for_branch2 {
232            self.canceled_1.get()
233        } else {
234            self.canceled_2.get()
235        };
236
237        // If byobCanceled is false, perform ! ReadableByteStreamControllerClose(byobBranch.[[controller]]).
238        if !byob_canceled {
239            let byob_branch_controller = self.byob_branch.get_byte_controller();
240            byob_branch_controller.close(cx)?;
241        }
242
243        // If otherCanceled is false, perform ! ReadableByteStreamControllerClose(otherBranch.[[controller]]).
244        if !other_canceled {
245            let other_branch_controller = self.other_branch.get_byte_controller();
246            other_branch_controller.close(cx)?;
247        }
248
249        // If chunk is not undefined,
250        if let Some(chunk_value) = chunk {
251            if chunk_value.is_undefined() {
252                // Nothing to respond with if the provided chunk is undefined.
253                // Continue with the remaining close steps.
254            } else {
255                let chunk = chunk_value;
256                // Assert: chunk.[[ByteLength]] is 0.
257                assert_eq!(chunk.byte_length(), 0);
258
259                // If byobCanceled is false, perform !
260                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
261                if !byob_canceled {
262                    let byob_branch_controller = self.byob_branch.get_byte_controller();
263                    byob_branch_controller.respond_with_new_view(cx, &chunk)?;
264                }
265
266                // If otherCanceled is false and otherBranch.[[controller]].[[pendingPullIntos]] is not empty,
267                // perform ! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0).
268                if !other_canceled {
269                    let other_branch_controller = self.other_branch.get_byte_controller();
270                    if other_branch_controller.get_pending_pull_intos_size() > 0 {
271                        other_branch_controller.respond(cx, 0)?;
272                    }
273                }
274            }
275        }
276
277        // If byobCanceled is false or otherCanceled is false, resolve cancelPromise with undefined.
278        if !byob_canceled || !other_canceled {
279            self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
280        }
281
282        Ok(())
283    }
284    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
285    pub(crate) fn error_steps(&self) {
286        // Set reading to false.
287        self.reading.set(false);
288    }
289
290    pub(crate) fn pull_algorithm(
291        &self,
292        cx: &mut JSContext,
293        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
294    ) {
295        self.tee_underlying_source
296            .pull_algorithm(cx, byte_tee_pull_algorithm);
297    }
298}