script/dom/
byteteereadintorequest.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::typedarray::ArrayBufferViewU8;
11
12use super::bindings::reflector::reflect_dom_object;
13use super::bindings::root::DomRoot;
14use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
15use crate::dom::bindings::buffer_source::HeapBufferSource;
16use crate::dom::bindings::error::{ErrorToJsval, Fallible};
17use crate::dom::bindings::reflector::{DomGlobal, Reflector};
18use crate::dom::bindings::root::Dom;
19use crate::dom::byteteeunderlyingsource::ByteTeeUnderlyingSource;
20use crate::dom::globalscope::GlobalScope;
21use crate::dom::promise::Promise;
22use crate::dom::readablestream::ReadableStream;
23use crate::microtask::Microtask;
24use crate::script_runtime::CanGc;
25
26#[derive(JSTraceable, MallocSizeOf)]
27#[cfg_attr(crown, expect(crown::unrooted_must_root))]
28pub(crate) struct ByteTeeReadIntoRequestMicrotask {
29    #[ignore_malloc_size_of = "mozjs"]
30    chunk: HeapBufferSource<ArrayBufferViewU8>,
31    tee_read_request: Dom<ByteTeeReadIntoRequest>,
32}
33
34impl ByteTeeReadIntoRequestMicrotask {
35    pub(crate) fn microtask_chunk_steps(&self, can_gc: CanGc) {
36        self.tee_read_request
37            .chunk_steps(self.chunk.clone(), can_gc)
38            .expect("Failed to enqueue chunk");
39    }
40}
41
42#[dom_struct]
43pub(crate) struct ByteTeeReadIntoRequest {
44    reflector_: Reflector,
45    for_branch2: bool,
46    byob_branch: Dom<ReadableStream>,
47    other_branch: Dom<ReadableStream>,
48    stream: Dom<ReadableStream>,
49    #[ignore_malloc_size_of = "Rc"]
50    read_again_for_branch_1: Rc<Cell<bool>>,
51    #[ignore_malloc_size_of = "Rc"]
52    read_again_for_branch_2: Rc<Cell<bool>>,
53    #[ignore_malloc_size_of = "Rc"]
54    reading: Rc<Cell<bool>>,
55    #[ignore_malloc_size_of = "Rc"]
56    canceled_1: Rc<Cell<bool>>,
57    #[ignore_malloc_size_of = "Rc"]
58    canceled_2: Rc<Cell<bool>>,
59    #[ignore_malloc_size_of = "Rc"]
60    cancel_promise: Rc<Promise>,
61    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
62}
63impl ByteTeeReadIntoRequest {
64    #[allow(clippy::too_many_arguments)]
65    pub(crate) fn new(
66        for_branch2: bool,
67        byob_branch: &ReadableStream,
68        other_branch: &ReadableStream,
69        stream: &ReadableStream,
70        read_again_for_branch_1: Rc<Cell<bool>>,
71        read_again_for_branch_2: Rc<Cell<bool>>,
72        reading: Rc<Cell<bool>>,
73        canceled_1: Rc<Cell<bool>>,
74        canceled_2: Rc<Cell<bool>>,
75        cancel_promise: Rc<Promise>,
76        tee_underlying_source: &ByteTeeUnderlyingSource,
77        global: &GlobalScope,
78        can_gc: CanGc,
79    ) -> DomRoot<Self> {
80        reflect_dom_object(
81            Box::new(ByteTeeReadIntoRequest {
82                reflector_: Reflector::new(),
83                for_branch2,
84                byob_branch: Dom::from_ref(byob_branch),
85                other_branch: Dom::from_ref(other_branch),
86                stream: Dom::from_ref(stream),
87                read_again_for_branch_1,
88                read_again_for_branch_2,
89                reading,
90                canceled_1,
91                canceled_2,
92                cancel_promise,
93                tee_underlying_source: Dom::from_ref(tee_underlying_source),
94            }),
95            global,
96            can_gc,
97        )
98    }
99
100    pub(crate) fn enqueue_chunk_steps(&self, chunk: HeapBufferSource<ArrayBufferViewU8>) {
101        // Queue a microtask to perform the following steps:
102        let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
103            chunk,
104            tee_read_request: Dom::from_ref(self),
105        };
106
107        self.global()
108            .enqueue_microtask(Microtask::ReadableStreamByteTeeReadIntoRequest(
109                byte_tee_read_request_chunk,
110            ));
111    }
112
113    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
114    #[allow(clippy::borrowed_box)]
115    pub(crate) fn chunk_steps(
116        &self,
117        chunk: HeapBufferSource<ArrayBufferViewU8>,
118        can_gc: CanGc,
119    ) -> Fallible<()> {
120        let cx = GlobalScope::get_cx();
121
122        // Set readAgainForBranch1 to false.
123        self.read_again_for_branch_1.set(false);
124
125        // Set readAgainForBranch2 to false.
126        self.read_again_for_branch_2.set(false);
127
128        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
129        let byob_canceled = if self.for_branch2 {
130            self.canceled_2.get()
131        } else {
132            self.canceled_1.get()
133        };
134
135        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
136        let other_canceled = if self.for_branch2 {
137            self.canceled_1.get()
138        } else {
139            self.canceled_2.get()
140        };
141
142        // If otherCanceled is false,
143        if !other_canceled {
144            // Let cloneResult be CloneAsUint8Array(chunk).
145            let clone_result = chunk.clone_as_uint8_array(cx);
146
147            // If cloneResult is an abrupt completion,
148            if let Err(error) = clone_result {
149                rooted!(in(*cx) let mut error_value = UndefinedValue());
150                error
151                    .clone()
152                    .to_jsval(cx, &self.global(), error_value.handle_mut(), can_gc);
153
154                // Perform ! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]]).
155                let byob_branch_controller = self.byob_branch.get_byte_controller();
156                byob_branch_controller.error(error_value.handle(), can_gc);
157
158                // Perform ! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]]).
159                let other_branch_controller = self.other_branch.get_byte_controller();
160                other_branch_controller.error(error_value.handle(), can_gc);
161
162                // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
163                let cancel_result =
164                    self.stream
165                        .cancel(cx, &self.stream.global(), error_value.handle(), can_gc);
166                self.cancel_promise.resolve_native(&cancel_result, can_gc);
167
168                // Return.
169                return Ok(());
170            } else {
171                // Otherwise, let clonedChunk be cloneResult.[[Value]].
172                let cloned_chunk = clone_result.unwrap();
173
174                // If byobCanceled is false, perform !
175                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
176                if !byob_canceled {
177                    let byob_branch_controller = self.byob_branch.get_byte_controller();
178                    byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
179                }
180
181                // Perform ! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk).
182                let other_branch_controller = self.other_branch.get_byte_controller();
183                other_branch_controller.enqueue(cx, cloned_chunk, can_gc)?;
184            }
185        } else if !byob_canceled {
186            // Otherwise, if byobCanceled is false, perform
187            // ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
188
189            let byob_branch_controller = self.byob_branch.get_byte_controller();
190            byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
191        }
192
193        // Set reading to false.
194        self.reading.set(false);
195
196        // If readAgainForBranch1 is true, perform pull1Algorithm.
197        if self.read_again_for_branch_1.get() {
198            self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull1Algorithm), can_gc);
199        } else if self.read_again_for_branch_2.get() {
200            // Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
201            self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull2Algorithm), can_gc);
202        }
203
204        Ok(())
205    }
206
207    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A1>
208    pub(crate) fn close_steps(
209        &self,
210        chunk: Option<HeapBufferSource<ArrayBufferViewU8>>,
211        can_gc: CanGc,
212    ) -> Fallible<()> {
213        let cx = GlobalScope::get_cx();
214
215        // Set reading to false.
216        self.reading.set(false);
217
218        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
219        let byob_canceled = if self.for_branch2 {
220            self.canceled_2.get()
221        } else {
222            self.canceled_1.get()
223        };
224
225        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
226        let other_canceled = if self.for_branch2 {
227            self.canceled_1.get()
228        } else {
229            self.canceled_2.get()
230        };
231
232        // If byobCanceled is false, perform ! ReadableByteStreamControllerClose(byobBranch.[[controller]]).
233        if !byob_canceled {
234            let byob_branch_controller = self.byob_branch.get_byte_controller();
235            byob_branch_controller.close(cx, can_gc)?;
236        }
237
238        // If otherCanceled is false, perform ! ReadableByteStreamControllerClose(otherBranch.[[controller]]).
239        if !other_canceled {
240            let other_branch_controller = self.other_branch.get_byte_controller();
241            other_branch_controller.close(cx, can_gc)?;
242        }
243
244        // If chunk is not undefined,
245        if let Some(chunk_value) = chunk {
246            if chunk_value.is_undefined() {
247                // Nothing to respond with if the provided chunk is undefined.
248                // Continue with the remaining close steps.
249            } else {
250                let chunk = chunk_value;
251                // Assert: chunk.[[ByteLength]] is 0.
252                assert_eq!(chunk.byte_length(), 0);
253
254                // If byobCanceled is false, perform !
255                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
256                if !byob_canceled {
257                    let byob_branch_controller = self.byob_branch.get_byte_controller();
258                    byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
259                }
260
261                // If otherCanceled is false and otherBranch.[[controller]].[[pendingPullIntos]] is not empty,
262                // perform ! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0).
263                if !other_canceled {
264                    let other_branch_controller = self.other_branch.get_byte_controller();
265                    if other_branch_controller.get_pending_pull_intos_size() > 0 {
266                        other_branch_controller.respond(cx, 0, can_gc)?;
267                    }
268                }
269            }
270        }
271
272        // If byobCanceled is false or otherCanceled is false, resolve cancelPromise with undefined.
273        if !byob_canceled || !other_canceled {
274            self.cancel_promise.resolve_native(&(), can_gc);
275        }
276
277        Ok(())
278    }
279    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
280    pub(crate) fn error_steps(&self) {
281        // Set reading to false.
282        self.reading.set(false);
283    }
284
285    pub(crate) fn pull_algorithm(
286        &self,
287        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
288        can_gc: CanGc,
289    ) {
290        self.tee_underlying_source
291            .pull_algorithm(byte_tee_pull_algorithm, can_gc);
292    }
293}