script/dom/stream/
byteteereadintorequest.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::typedarray::ArrayBufferViewU8;
11
12use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
13use crate::dom::bindings::buffer_source::HeapBufferSource;
14use crate::dom::bindings::error::{ErrorToJsval, Fallible};
15use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
16use crate::dom::bindings::root::{Dom, DomRoot};
17use crate::dom::globalscope::GlobalScope;
18use crate::dom::promise::Promise;
19use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
20use crate::dom::stream::readablestream::ReadableStream;
21use crate::microtask::Microtask;
22use crate::script_runtime::CanGc;
23
24#[derive(JSTraceable, MallocSizeOf)]
25#[cfg_attr(crown, expect(crown::unrooted_must_root))]
26pub(crate) struct ByteTeeReadIntoRequestMicrotask {
27    #[ignore_malloc_size_of = "mozjs"]
28    chunk: HeapBufferSource<ArrayBufferViewU8>,
29    tee_read_request: Dom<ByteTeeReadIntoRequest>,
30}
31
32impl ByteTeeReadIntoRequestMicrotask {
33    pub(crate) fn microtask_chunk_steps(&self, cx: &mut js::context::JSContext) {
34        self.tee_read_request
35            .chunk_steps(self.chunk.clone(), cx)
36            .expect("Failed to enqueue chunk");
37    }
38}
39
40#[dom_struct]
41pub(crate) struct ByteTeeReadIntoRequest {
42    reflector_: Reflector,
43    for_branch2: bool,
44    byob_branch: Dom<ReadableStream>,
45    other_branch: Dom<ReadableStream>,
46    stream: Dom<ReadableStream>,
47    #[conditional_malloc_size_of]
48    read_again_for_branch_1: Rc<Cell<bool>>,
49    #[conditional_malloc_size_of]
50    read_again_for_branch_2: Rc<Cell<bool>>,
51    #[conditional_malloc_size_of]
52    reading: Rc<Cell<bool>>,
53    #[conditional_malloc_size_of]
54    canceled_1: Rc<Cell<bool>>,
55    #[conditional_malloc_size_of]
56    canceled_2: Rc<Cell<bool>>,
57    #[conditional_malloc_size_of]
58    cancel_promise: Rc<Promise>,
59    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
60}
61impl ByteTeeReadIntoRequest {
62    #[allow(clippy::too_many_arguments)]
63    pub(crate) fn new(
64        for_branch2: bool,
65        byob_branch: &ReadableStream,
66        other_branch: &ReadableStream,
67        stream: &ReadableStream,
68        read_again_for_branch_1: Rc<Cell<bool>>,
69        read_again_for_branch_2: Rc<Cell<bool>>,
70        reading: Rc<Cell<bool>>,
71        canceled_1: Rc<Cell<bool>>,
72        canceled_2: Rc<Cell<bool>>,
73        cancel_promise: Rc<Promise>,
74        tee_underlying_source: &ByteTeeUnderlyingSource,
75        global: &GlobalScope,
76        can_gc: CanGc,
77    ) -> DomRoot<Self> {
78        reflect_dom_object(
79            Box::new(ByteTeeReadIntoRequest {
80                reflector_: Reflector::new(),
81                for_branch2,
82                byob_branch: Dom::from_ref(byob_branch),
83                other_branch: Dom::from_ref(other_branch),
84                stream: Dom::from_ref(stream),
85                read_again_for_branch_1,
86                read_again_for_branch_2,
87                reading,
88                canceled_1,
89                canceled_2,
90                cancel_promise,
91                tee_underlying_source: Dom::from_ref(tee_underlying_source),
92            }),
93            global,
94            can_gc,
95        )
96    }
97
98    pub(crate) fn enqueue_chunk_steps(&self, chunk: HeapBufferSource<ArrayBufferViewU8>) {
99        // Queue a microtask to perform the following steps:
100        let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
101            chunk,
102            tee_read_request: Dom::from_ref(self),
103        };
104
105        self.global()
106            .enqueue_microtask(Microtask::ReadableStreamByteTeeReadIntoRequest(
107                byte_tee_read_request_chunk,
108            ));
109    }
110
111    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
112    #[allow(clippy::borrowed_box)]
113    pub(crate) fn chunk_steps(
114        &self,
115        chunk: HeapBufferSource<ArrayBufferViewU8>,
116        cx: &mut js::context::JSContext,
117    ) -> Fallible<()> {
118        // Set readAgainForBranch1 to false.
119        self.read_again_for_branch_1.set(false);
120
121        // Set readAgainForBranch2 to false.
122        self.read_again_for_branch_2.set(false);
123
124        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
125        let byob_canceled = if self.for_branch2 {
126            self.canceled_2.get()
127        } else {
128            self.canceled_1.get()
129        };
130
131        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
132        let other_canceled = if self.for_branch2 {
133            self.canceled_1.get()
134        } else {
135            self.canceled_2.get()
136        };
137
138        // If otherCanceled is false,
139        if !other_canceled {
140            // Let cloneResult be CloneAsUint8Array(chunk).
141            let clone_result = chunk.clone_as_uint8_array(cx.into());
142
143            // If cloneResult is an abrupt completion,
144            if let Err(error) = clone_result {
145                rooted!(&in(cx) let mut error_value = UndefinedValue());
146                error.to_jsval(
147                    cx.into(),
148                    &self.global(),
149                    error_value.handle_mut(),
150                    CanGc::from_cx(cx),
151                );
152
153                // Perform ! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]]).
154                let byob_branch_controller = self.byob_branch.get_byte_controller();
155                byob_branch_controller.error(error_value.handle(), CanGc::from_cx(cx));
156
157                // Perform ! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]]).
158                let other_branch_controller = self.other_branch.get_byte_controller();
159                other_branch_controller.error(error_value.handle(), CanGc::from_cx(cx));
160
161                // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
162                let cancel_result =
163                    self.stream
164                        .cancel(cx, &self.stream.global(), error_value.handle());
165                self.cancel_promise
166                    .resolve_native(&cancel_result, CanGc::from_cx(cx));
167
168                // Return.
169                return Ok(());
170            } else {
171                // Otherwise, let clonedChunk be cloneResult.[[Value]].
172                let cloned_chunk = clone_result.unwrap();
173
174                // If byobCanceled is false, perform !
175                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
176                if !byob_canceled {
177                    let byob_branch_controller = self.byob_branch.get_byte_controller();
178                    byob_branch_controller.respond_with_new_view(
179                        cx.into(),
180                        chunk,
181                        CanGc::from_cx(cx),
182                    )?;
183                }
184
185                // Perform ! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk).
186                let other_branch_controller = self.other_branch.get_byte_controller();
187                other_branch_controller.enqueue(cx, cloned_chunk)?;
188            }
189        } else if !byob_canceled {
190            // Otherwise, if byobCanceled is false, perform
191            // ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
192
193            let byob_branch_controller = self.byob_branch.get_byte_controller();
194            byob_branch_controller.respond_with_new_view(cx.into(), chunk, CanGc::from_cx(cx))?;
195        }
196
197        // Set reading to false.
198        self.reading.set(false);
199
200        // If readAgainForBranch1 is true, perform pull1Algorithm.
201        if self.read_again_for_branch_1.get() {
202            self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull1Algorithm));
203        } else if self.read_again_for_branch_2.get() {
204            // Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
205            self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull2Algorithm));
206        }
207
208        Ok(())
209    }
210
211    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A1>
212    pub(crate) fn close_steps(
213        &self,
214        chunk: Option<HeapBufferSource<ArrayBufferViewU8>>,
215        can_gc: CanGc,
216    ) -> Fallible<()> {
217        let cx = GlobalScope::get_cx();
218
219        // Set reading to false.
220        self.reading.set(false);
221
222        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
223        let byob_canceled = if self.for_branch2 {
224            self.canceled_2.get()
225        } else {
226            self.canceled_1.get()
227        };
228
229        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
230        let other_canceled = if self.for_branch2 {
231            self.canceled_1.get()
232        } else {
233            self.canceled_2.get()
234        };
235
236        // If byobCanceled is false, perform ! ReadableByteStreamControllerClose(byobBranch.[[controller]]).
237        if !byob_canceled {
238            let byob_branch_controller = self.byob_branch.get_byte_controller();
239            byob_branch_controller.close(cx, can_gc)?;
240        }
241
242        // If otherCanceled is false, perform ! ReadableByteStreamControllerClose(otherBranch.[[controller]]).
243        if !other_canceled {
244            let other_branch_controller = self.other_branch.get_byte_controller();
245            other_branch_controller.close(cx, can_gc)?;
246        }
247
248        // If chunk is not undefined,
249        if let Some(chunk_value) = chunk {
250            if chunk_value.is_undefined() {
251                // Nothing to respond with if the provided chunk is undefined.
252                // Continue with the remaining close steps.
253            } else {
254                let chunk = chunk_value;
255                // Assert: chunk.[[ByteLength]] is 0.
256                assert_eq!(chunk.byte_length(), 0);
257
258                // If byobCanceled is false, perform !
259                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
260                if !byob_canceled {
261                    let byob_branch_controller = self.byob_branch.get_byte_controller();
262                    byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
263                }
264
265                // If otherCanceled is false and otherBranch.[[controller]].[[pendingPullIntos]] is not empty,
266                // perform ! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0).
267                if !other_canceled {
268                    let other_branch_controller = self.other_branch.get_byte_controller();
269                    if other_branch_controller.get_pending_pull_intos_size() > 0 {
270                        other_branch_controller.respond(cx, 0, can_gc)?;
271                    }
272                }
273            }
274        }
275
276        // If byobCanceled is false or otherCanceled is false, resolve cancelPromise with undefined.
277        if !byob_canceled || !other_canceled {
278            self.cancel_promise.resolve_native(&(), can_gc);
279        }
280
281        Ok(())
282    }
283    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
284    pub(crate) fn error_steps(&self) {
285        // Set reading to false.
286        self.reading.set(false);
287    }
288
289    pub(crate) fn pull_algorithm(
290        &self,
291        cx: &mut js::context::JSContext,
292        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
293    ) {
294        self.tee_underlying_source
295            .pull_algorithm(byte_tee_pull_algorithm, CanGc::from_cx(cx));
296    }
297}