// script/dom/stream/byteteereadintorequest.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::typedarray::ArrayBufferViewU8;
11
12use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
13use crate::dom::bindings::buffer_source::HeapBufferSource;
14use crate::dom::bindings::error::{ErrorToJsval, Fallible};
15use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
16use crate::dom::bindings::root::{Dom, DomRoot};
17use crate::dom::globalscope::GlobalScope;
18use crate::dom::promise::Promise;
19use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
20use crate::dom::stream::readablestream::ReadableStream;
21use crate::microtask::Microtask;
22use crate::script_runtime::CanGc;
23
24#[derive(JSTraceable, MallocSizeOf)]
25#[cfg_attr(crown, expect(crown::unrooted_must_root))]
26pub(crate) struct ByteTeeReadIntoRequestMicrotask {
27    #[ignore_malloc_size_of = "mozjs"]
28    chunk: HeapBufferSource<ArrayBufferViewU8>,
29    tee_read_request: Dom<ByteTeeReadIntoRequest>,
30}
31
32impl ByteTeeReadIntoRequestMicrotask {
33    pub(crate) fn microtask_chunk_steps(&self, can_gc: CanGc) {
34        self.tee_read_request
35            .chunk_steps(self.chunk.clone(), can_gc)
36            .expect("Failed to enqueue chunk");
37    }
38}
39
40#[dom_struct]
41pub(crate) struct ByteTeeReadIntoRequest {
42    reflector_: Reflector,
43    for_branch2: bool,
44    byob_branch: Dom<ReadableStream>,
45    other_branch: Dom<ReadableStream>,
46    stream: Dom<ReadableStream>,
47    #[ignore_malloc_size_of = "Rc"]
48    read_again_for_branch_1: Rc<Cell<bool>>,
49    #[ignore_malloc_size_of = "Rc"]
50    read_again_for_branch_2: Rc<Cell<bool>>,
51    #[ignore_malloc_size_of = "Rc"]
52    reading: Rc<Cell<bool>>,
53    #[ignore_malloc_size_of = "Rc"]
54    canceled_1: Rc<Cell<bool>>,
55    #[ignore_malloc_size_of = "Rc"]
56    canceled_2: Rc<Cell<bool>>,
57    #[ignore_malloc_size_of = "Rc"]
58    cancel_promise: Rc<Promise>,
59    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
60}
61impl ByteTeeReadIntoRequest {
62    #[allow(clippy::too_many_arguments)]
63    pub(crate) fn new(
64        for_branch2: bool,
65        byob_branch: &ReadableStream,
66        other_branch: &ReadableStream,
67        stream: &ReadableStream,
68        read_again_for_branch_1: Rc<Cell<bool>>,
69        read_again_for_branch_2: Rc<Cell<bool>>,
70        reading: Rc<Cell<bool>>,
71        canceled_1: Rc<Cell<bool>>,
72        canceled_2: Rc<Cell<bool>>,
73        cancel_promise: Rc<Promise>,
74        tee_underlying_source: &ByteTeeUnderlyingSource,
75        global: &GlobalScope,
76        can_gc: CanGc,
77    ) -> DomRoot<Self> {
78        reflect_dom_object(
79            Box::new(ByteTeeReadIntoRequest {
80                reflector_: Reflector::new(),
81                for_branch2,
82                byob_branch: Dom::from_ref(byob_branch),
83                other_branch: Dom::from_ref(other_branch),
84                stream: Dom::from_ref(stream),
85                read_again_for_branch_1,
86                read_again_for_branch_2,
87                reading,
88                canceled_1,
89                canceled_2,
90                cancel_promise,
91                tee_underlying_source: Dom::from_ref(tee_underlying_source),
92            }),
93            global,
94            can_gc,
95        )
96    }
97
98    pub(crate) fn enqueue_chunk_steps(&self, chunk: HeapBufferSource<ArrayBufferViewU8>) {
99        // Queue a microtask to perform the following steps:
100        let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
101            chunk,
102            tee_read_request: Dom::from_ref(self),
103        };
104
105        self.global()
106            .enqueue_microtask(Microtask::ReadableStreamByteTeeReadIntoRequest(
107                byte_tee_read_request_chunk,
108            ));
109    }
110
111    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
112    #[allow(clippy::borrowed_box)]
113    pub(crate) fn chunk_steps(
114        &self,
115        chunk: HeapBufferSource<ArrayBufferViewU8>,
116        can_gc: CanGc,
117    ) -> Fallible<()> {
118        let cx = GlobalScope::get_cx();
119
120        // Set readAgainForBranch1 to false.
121        self.read_again_for_branch_1.set(false);
122
123        // Set readAgainForBranch2 to false.
124        self.read_again_for_branch_2.set(false);
125
126        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
127        let byob_canceled = if self.for_branch2 {
128            self.canceled_2.get()
129        } else {
130            self.canceled_1.get()
131        };
132
133        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
134        let other_canceled = if self.for_branch2 {
135            self.canceled_1.get()
136        } else {
137            self.canceled_2.get()
138        };
139
140        // If otherCanceled is false,
141        if !other_canceled {
142            // Let cloneResult be CloneAsUint8Array(chunk).
143            let clone_result = chunk.clone_as_uint8_array(cx);
144
145            // If cloneResult is an abrupt completion,
146            if let Err(error) = clone_result {
147                rooted!(in(*cx) let mut error_value = UndefinedValue());
148                error
149                    .clone()
150                    .to_jsval(cx, &self.global(), error_value.handle_mut(), can_gc);
151
152                // Perform ! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]]).
153                let byob_branch_controller = self.byob_branch.get_byte_controller();
154                byob_branch_controller.error(error_value.handle(), can_gc);
155
156                // Perform ! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]]).
157                let other_branch_controller = self.other_branch.get_byte_controller();
158                other_branch_controller.error(error_value.handle(), can_gc);
159
160                // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
161                let cancel_result =
162                    self.stream
163                        .cancel(cx, &self.stream.global(), error_value.handle(), can_gc);
164                self.cancel_promise.resolve_native(&cancel_result, can_gc);
165
166                // Return.
167                return Ok(());
168            } else {
169                // Otherwise, let clonedChunk be cloneResult.[[Value]].
170                let cloned_chunk = clone_result.unwrap();
171
172                // If byobCanceled is false, perform !
173                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
174                if !byob_canceled {
175                    let byob_branch_controller = self.byob_branch.get_byte_controller();
176                    byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
177                }
178
179                // Perform ! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk).
180                let other_branch_controller = self.other_branch.get_byte_controller();
181                other_branch_controller.enqueue(cx, cloned_chunk, can_gc)?;
182            }
183        } else if !byob_canceled {
184            // Otherwise, if byobCanceled is false, perform
185            // ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
186
187            let byob_branch_controller = self.byob_branch.get_byte_controller();
188            byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
189        }
190
191        // Set reading to false.
192        self.reading.set(false);
193
194        // If readAgainForBranch1 is true, perform pull1Algorithm.
195        if self.read_again_for_branch_1.get() {
196            self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull1Algorithm), can_gc);
197        } else if self.read_again_for_branch_2.get() {
198            // Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
199            self.pull_algorithm(Some(ByteTeePullAlgorithm::Pull2Algorithm), can_gc);
200        }
201
202        Ok(())
203    }
204
205    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A1>
206    pub(crate) fn close_steps(
207        &self,
208        chunk: Option<HeapBufferSource<ArrayBufferViewU8>>,
209        can_gc: CanGc,
210    ) -> Fallible<()> {
211        let cx = GlobalScope::get_cx();
212
213        // Set reading to false.
214        self.reading.set(false);
215
216        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
217        let byob_canceled = if self.for_branch2 {
218            self.canceled_2.get()
219        } else {
220            self.canceled_1.get()
221        };
222
223        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
224        let other_canceled = if self.for_branch2 {
225            self.canceled_1.get()
226        } else {
227            self.canceled_2.get()
228        };
229
230        // If byobCanceled is false, perform ! ReadableByteStreamControllerClose(byobBranch.[[controller]]).
231        if !byob_canceled {
232            let byob_branch_controller = self.byob_branch.get_byte_controller();
233            byob_branch_controller.close(cx, can_gc)?;
234        }
235
236        // If otherCanceled is false, perform ! ReadableByteStreamControllerClose(otherBranch.[[controller]]).
237        if !other_canceled {
238            let other_branch_controller = self.other_branch.get_byte_controller();
239            other_branch_controller.close(cx, can_gc)?;
240        }
241
242        // If chunk is not undefined,
243        if let Some(chunk_value) = chunk {
244            if chunk_value.is_undefined() {
245                // Nothing to respond with if the provided chunk is undefined.
246                // Continue with the remaining close steps.
247            } else {
248                let chunk = chunk_value;
249                // Assert: chunk.[[ByteLength]] is 0.
250                assert_eq!(chunk.byte_length(), 0);
251
252                // If byobCanceled is false, perform !
253                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
254                if !byob_canceled {
255                    let byob_branch_controller = self.byob_branch.get_byte_controller();
256                    byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
257                }
258
259                // If otherCanceled is false and otherBranch.[[controller]].[[pendingPullIntos]] is not empty,
260                // perform ! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0).
261                if !other_canceled {
262                    let other_branch_controller = self.other_branch.get_byte_controller();
263                    if other_branch_controller.get_pending_pull_intos_size() > 0 {
264                        other_branch_controller.respond(cx, 0, can_gc)?;
265                    }
266                }
267            }
268        }
269
270        // If byobCanceled is false or otherCanceled is false, resolve cancelPromise with undefined.
271        if !byob_canceled || !other_canceled {
272            self.cancel_promise.resolve_native(&(), can_gc);
273        }
274
275        Ok(())
276    }
277    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
278    pub(crate) fn error_steps(&self) {
279        // Set reading to false.
280        self.reading.set(false);
281    }
282
283    pub(crate) fn pull_algorithm(
284        &self,
285        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
286        can_gc: CanGc,
287    ) {
288        self.tee_underlying_source
289            .pull_algorithm(byte_tee_pull_algorithm, can_gc);
290    }
291}