script/dom/stream/
byteteereadintorequest.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::typedarray::ArrayBufferViewU8;
11
12use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
13use crate::dom::bindings::buffer_source::HeapBufferSource;
14use crate::dom::bindings::error::{ErrorToJsval, Fallible};
15use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
16use crate::dom::bindings::root::{Dom, DomRoot};
17use crate::dom::globalscope::GlobalScope;
18use crate::dom::promise::Promise;
19use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
20use crate::dom::stream::readablestream::ReadableStream;
21use crate::microtask::Microtask;
22use crate::script_runtime::CanGc;
23
24#[derive(JSTraceable, MallocSizeOf)]
25#[cfg_attr(crown, expect(crown::unrooted_must_root))]
26pub(crate) struct ByteTeeReadIntoRequestMicrotask {
27    #[ignore_malloc_size_of = "mozjs"]
28    chunk: HeapBufferSource<ArrayBufferViewU8>,
29    tee_read_request: Dom<ByteTeeReadIntoRequest>,
30}
31
32impl ByteTeeReadIntoRequestMicrotask {
33    pub(crate) fn microtask_chunk_steps(&self, cx: &mut js::context::JSContext) {
34        self.tee_read_request
35            .chunk_steps(self.chunk.clone(), cx)
36            .expect("Failed to enqueue chunk");
37    }
38}
39
40#[dom_struct]
41pub(crate) struct ByteTeeReadIntoRequest {
42    reflector_: Reflector,
43    for_branch2: bool,
44    byob_branch: Dom<ReadableStream>,
45    other_branch: Dom<ReadableStream>,
46    stream: Dom<ReadableStream>,
47    #[conditional_malloc_size_of]
48    read_again_for_branch_1: Rc<Cell<bool>>,
49    #[conditional_malloc_size_of]
50    read_again_for_branch_2: Rc<Cell<bool>>,
51    #[conditional_malloc_size_of]
52    reading: Rc<Cell<bool>>,
53    #[conditional_malloc_size_of]
54    canceled_1: Rc<Cell<bool>>,
55    #[conditional_malloc_size_of]
56    canceled_2: Rc<Cell<bool>>,
57    #[conditional_malloc_size_of]
58    cancel_promise: Rc<Promise>,
59    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
60}
61impl ByteTeeReadIntoRequest {
62    #[allow(clippy::too_many_arguments)]
63    pub(crate) fn new(
64        for_branch2: bool,
65        byob_branch: &ReadableStream,
66        other_branch: &ReadableStream,
67        stream: &ReadableStream,
68        read_again_for_branch_1: Rc<Cell<bool>>,
69        read_again_for_branch_2: Rc<Cell<bool>>,
70        reading: Rc<Cell<bool>>,
71        canceled_1: Rc<Cell<bool>>,
72        canceled_2: Rc<Cell<bool>>,
73        cancel_promise: Rc<Promise>,
74        tee_underlying_source: &ByteTeeUnderlyingSource,
75        global: &GlobalScope,
76        can_gc: CanGc,
77    ) -> DomRoot<Self> {
78        reflect_dom_object(
79            Box::new(ByteTeeReadIntoRequest {
80                reflector_: Reflector::new(),
81                for_branch2,
82                byob_branch: Dom::from_ref(byob_branch),
83                other_branch: Dom::from_ref(other_branch),
84                stream: Dom::from_ref(stream),
85                read_again_for_branch_1,
86                read_again_for_branch_2,
87                reading,
88                canceled_1,
89                canceled_2,
90                cancel_promise,
91                tee_underlying_source: Dom::from_ref(tee_underlying_source),
92            }),
93            global,
94            can_gc,
95        )
96    }
97
98    pub(crate) fn enqueue_chunk_steps(&self, chunk: HeapBufferSource<ArrayBufferViewU8>) {
99        // Queue a microtask to perform the following steps:
100        let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
101            chunk,
102            tee_read_request: Dom::from_ref(self),
103        };
104
105        self.global()
106            .enqueue_microtask(Microtask::ReadableStreamByteTeeReadIntoRequest(
107                byte_tee_read_request_chunk,
108            ));
109    }
110
111    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
112    #[allow(clippy::borrowed_box)]
113    pub(crate) fn chunk_steps(
114        &self,
115        chunk: HeapBufferSource<ArrayBufferViewU8>,
116        cx: &mut js::context::JSContext,
117    ) -> Fallible<()> {
118        // Set readAgainForBranch1 to false.
119        self.read_again_for_branch_1.set(false);
120
121        // Set readAgainForBranch2 to false.
122        self.read_again_for_branch_2.set(false);
123
124        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
125        let byob_canceled = if self.for_branch2 {
126            self.canceled_2.get()
127        } else {
128            self.canceled_1.get()
129        };
130
131        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
132        let other_canceled = if self.for_branch2 {
133            self.canceled_1.get()
134        } else {
135            self.canceled_2.get()
136        };
137
138        // If otherCanceled is false,
139        if !other_canceled {
140            // Let cloneResult be CloneAsUint8Array(chunk).
141            let clone_result = chunk.clone_as_uint8_array(cx.into());
142
143            // If cloneResult is an abrupt completion,
144            if let Err(error) = clone_result {
145                rooted!(&in(cx) let mut error_value = UndefinedValue());
146                error.clone().to_jsval(
147                    cx.into(),
148                    &self.global(),
149                    error_value.handle_mut(),
150                    CanGc::from_cx(cx),
151                );
152
153                // Perform ! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]]).
154                let byob_branch_controller = self.byob_branch.get_byte_controller();
155                byob_branch_controller.error(error_value.handle(), CanGc::from_cx(cx));
156
157                // Perform ! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]]).
158                let other_branch_controller = self.other_branch.get_byte_controller();
159                other_branch_controller.error(error_value.handle(), CanGc::from_cx(cx));
160
161                // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
162                let cancel_result = self.stream.cancel(
163                    cx.into(),
164                    &self.stream.global(),
165                    error_value.handle(),
166                    CanGc::from_cx(cx),
167                );
168                self.cancel_promise
169                    .resolve_native(&cancel_result, CanGc::from_cx(cx));
170
171                // Return.
172                return Ok(());
173            } else {
174                // Otherwise, let clonedChunk be cloneResult.[[Value]].
175                let cloned_chunk = clone_result.unwrap();
176
177                // If byobCanceled is false, perform !
178                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
179                if !byob_canceled {
180                    let byob_branch_controller = self.byob_branch.get_byte_controller();
181                    byob_branch_controller.respond_with_new_view(
182                        cx.into(),
183                        chunk,
184                        CanGc::from_cx(cx),
185                    )?;
186                }
187
188                // Perform ! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk).
189                let other_branch_controller = self.other_branch.get_byte_controller();
190                other_branch_controller.enqueue(cx.into(), cloned_chunk, CanGc::from_cx(cx))?;
191            }
192        } else if !byob_canceled {
193            // Otherwise, if byobCanceled is false, perform
194            // ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
195
196            let byob_branch_controller = self.byob_branch.get_byte_controller();
197            byob_branch_controller.respond_with_new_view(cx.into(), chunk, CanGc::from_cx(cx))?;
198        }
199
200        // Set reading to false.
201        self.reading.set(false);
202
203        // If readAgainForBranch1 is true, perform pull1Algorithm.
204        if self.read_again_for_branch_1.get() {
205            self.pull_algorithm(
206                Some(ByteTeePullAlgorithm::Pull1Algorithm),
207                CanGc::from_cx(cx),
208            );
209        } else if self.read_again_for_branch_2.get() {
210            // Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
211            self.pull_algorithm(
212                Some(ByteTeePullAlgorithm::Pull2Algorithm),
213                CanGc::from_cx(cx),
214            );
215        }
216
217        Ok(())
218    }
219
220    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A1>
221    pub(crate) fn close_steps(
222        &self,
223        chunk: Option<HeapBufferSource<ArrayBufferViewU8>>,
224        can_gc: CanGc,
225    ) -> Fallible<()> {
226        let cx = GlobalScope::get_cx();
227
228        // Set reading to false.
229        self.reading.set(false);
230
231        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
232        let byob_canceled = if self.for_branch2 {
233            self.canceled_2.get()
234        } else {
235            self.canceled_1.get()
236        };
237
238        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
239        let other_canceled = if self.for_branch2 {
240            self.canceled_1.get()
241        } else {
242            self.canceled_2.get()
243        };
244
245        // If byobCanceled is false, perform ! ReadableByteStreamControllerClose(byobBranch.[[controller]]).
246        if !byob_canceled {
247            let byob_branch_controller = self.byob_branch.get_byte_controller();
248            byob_branch_controller.close(cx, can_gc)?;
249        }
250
251        // If otherCanceled is false, perform ! ReadableByteStreamControllerClose(otherBranch.[[controller]]).
252        if !other_canceled {
253            let other_branch_controller = self.other_branch.get_byte_controller();
254            other_branch_controller.close(cx, can_gc)?;
255        }
256
257        // If chunk is not undefined,
258        if let Some(chunk_value) = chunk {
259            if chunk_value.is_undefined() {
260                // Nothing to respond with if the provided chunk is undefined.
261                // Continue with the remaining close steps.
262            } else {
263                let chunk = chunk_value;
264                // Assert: chunk.[[ByteLength]] is 0.
265                assert_eq!(chunk.byte_length(), 0);
266
267                // If byobCanceled is false, perform !
268                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
269                if !byob_canceled {
270                    let byob_branch_controller = self.byob_branch.get_byte_controller();
271                    byob_branch_controller.respond_with_new_view(cx, chunk, can_gc)?;
272                }
273
274                // If otherCanceled is false and otherBranch.[[controller]].[[pendingPullIntos]] is not empty,
275                // perform ! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0).
276                if !other_canceled {
277                    let other_branch_controller = self.other_branch.get_byte_controller();
278                    if other_branch_controller.get_pending_pull_intos_size() > 0 {
279                        other_branch_controller.respond(cx, 0, can_gc)?;
280                    }
281                }
282            }
283        }
284
285        // If byobCanceled is false or otherCanceled is false, resolve cancelPromise with undefined.
286        if !byob_canceled || !other_canceled {
287            self.cancel_promise.resolve_native(&(), can_gc);
288        }
289
290        Ok(())
291    }
292    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
293    pub(crate) fn error_steps(&self) {
294        // Set reading to false.
295        self.reading.set(false);
296    }
297
298    pub(crate) fn pull_algorithm(
299        &self,
300        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
301        can_gc: CanGc,
302    ) {
303        self.tee_underlying_source
304            .pull_algorithm(byte_tee_pull_algorithm, can_gc);
305    }
306}