Skip to main content

script/dom/stream/
byteteereadintorequest.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsval::UndefinedValue;
11use js::typedarray::ArrayBufferViewU8;
12use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
13use script_bindings::trace::RootedTraceableBox;
14
15use super::byteteeunderlyingsource::ByteTeePullAlgorithm;
16use crate::dom::bindings::buffer_source::HeapBufferSource;
17use crate::dom::bindings::error::{ErrorToJsval, Fallible};
18use crate::dom::bindings::refcounted::Trusted;
19use crate::dom::bindings::reflector::DomGlobal;
20use crate::dom::bindings::root::{Dom, DomRoot};
21use crate::dom::globalscope::GlobalScope;
22use crate::dom::promise::Promise;
23use crate::dom::stream::byteteeunderlyingsource::ByteTeeUnderlyingSource;
24use crate::dom::stream::readablestream::ReadableStream;
25use crate::microtask::Microtask;
26
27#[derive(JSTraceable, MallocSizeOf)]
28pub(crate) struct ByteTeeReadIntoRequestMicrotask {
29    #[ignore_malloc_size_of = "mozjs"]
30    chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
31    tee_read_request: Trusted<ByteTeeReadIntoRequest>,
32}
33
34impl ByteTeeReadIntoRequestMicrotask {
35    pub(crate) fn microtask_chunk_steps(&self, cx: &mut JSContext) {
36        self.tee_read_request
37            .root()
38            .chunk_steps(&self.chunk, cx)
39            .expect("Failed to enqueue chunk");
40    }
41}
42
43#[dom_struct]
44pub(crate) struct ByteTeeReadIntoRequest {
45    reflector_: Reflector,
46    for_branch2: bool,
47    byob_branch: Dom<ReadableStream>,
48    other_branch: Dom<ReadableStream>,
49    stream: Dom<ReadableStream>,
50    #[conditional_malloc_size_of]
51    read_again_for_branch_1: Rc<Cell<bool>>,
52    #[conditional_malloc_size_of]
53    read_again_for_branch_2: Rc<Cell<bool>>,
54    #[conditional_malloc_size_of]
55    reading: Rc<Cell<bool>>,
56    #[conditional_malloc_size_of]
57    canceled_1: Rc<Cell<bool>>,
58    #[conditional_malloc_size_of]
59    canceled_2: Rc<Cell<bool>>,
60    #[conditional_malloc_size_of]
61    cancel_promise: Rc<Promise>,
62    tee_underlying_source: Dom<ByteTeeUnderlyingSource>,
63}
64impl ByteTeeReadIntoRequest {
65    #[allow(clippy::too_many_arguments)]
66    pub(crate) fn new(
67        cx: &mut JSContext,
68        for_branch2: bool,
69        byob_branch: &ReadableStream,
70        other_branch: &ReadableStream,
71        stream: &ReadableStream,
72        read_again_for_branch_1: Rc<Cell<bool>>,
73        read_again_for_branch_2: Rc<Cell<bool>>,
74        reading: Rc<Cell<bool>>,
75        canceled_1: Rc<Cell<bool>>,
76        canceled_2: Rc<Cell<bool>>,
77        cancel_promise: Rc<Promise>,
78        tee_underlying_source: &ByteTeeUnderlyingSource,
79        global: &GlobalScope,
80    ) -> DomRoot<Self> {
81        reflect_dom_object_with_cx(
82            Box::new(ByteTeeReadIntoRequest {
83                reflector_: Reflector::new(),
84                for_branch2,
85                byob_branch: Dom::from_ref(byob_branch),
86                other_branch: Dom::from_ref(other_branch),
87                stream: Dom::from_ref(stream),
88                read_again_for_branch_1,
89                read_again_for_branch_2,
90                reading,
91                canceled_1,
92                canceled_2,
93                cancel_promise,
94                tee_underlying_source: Dom::from_ref(tee_underlying_source),
95            }),
96            global,
97            cx,
98        )
99    }
100
101    pub(crate) fn enqueue_chunk_steps(
102        &self,
103        cx: &mut JSContext,
104        chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
105    ) {
106        // Queue a microtask to perform the following steps:
107        let byte_tee_read_request_chunk = ByteTeeReadIntoRequestMicrotask {
108            chunk,
109            tee_read_request: Trusted::new(self),
110        };
111
112        self.global().enqueue_microtask(
113            cx,
114            Microtask::ReadableStreamByteTeeReadIntoRequest(byte_tee_read_request_chunk),
115        );
116    }
117
118    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps%E2%91%A0>
119    #[allow(clippy::borrowed_box)]
120    pub(crate) fn chunk_steps(
121        &self,
122        chunk: &HeapBufferSource<ArrayBufferViewU8>,
123        cx: &mut JSContext,
124    ) -> Fallible<()> {
125        // Set readAgainForBranch1 to false.
126        self.read_again_for_branch_1.set(false);
127
128        // Set readAgainForBranch2 to false.
129        self.read_again_for_branch_2.set(false);
130
131        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
132        let byob_canceled = if self.for_branch2 {
133            self.canceled_2.get()
134        } else {
135            self.canceled_1.get()
136        };
137
138        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
139        let other_canceled = if self.for_branch2 {
140            self.canceled_1.get()
141        } else {
142            self.canceled_2.get()
143        };
144
145        // If otherCanceled is false,
146        if !other_canceled {
147            // Let cloneResult be CloneAsUint8Array(chunk).
148            let clone_result = chunk.clone_as_uint8_array(cx);
149
150            // If cloneResult is an abrupt completion,
151            if let Err(error) = clone_result {
152                rooted!(&in(cx) let mut error_value = UndefinedValue());
153                error.to_jsval(cx, &self.global(), error_value.handle_mut());
154
155                // Perform ! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]]).
156                let byob_branch_controller = self.byob_branch.get_byte_controller();
157                byob_branch_controller.error(cx, error_value.handle());
158
159                // Perform ! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]]).
160                let other_branch_controller = self.other_branch.get_byte_controller();
161                other_branch_controller.error(cx, error_value.handle());
162
163                // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
164                let cancel_result =
165                    self.stream
166                        .cancel(cx, &self.stream.global(), error_value.handle());
167                self.cancel_promise.resolve_native(cx, &cancel_result);
168
169                // Return.
170                return Ok(());
171            } else {
172                // Otherwise, let clonedChunk be cloneResult.[[Value]].
173                let cloned_chunk = clone_result.unwrap();
174
175                // If byobCanceled is false, perform !
176                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
177                if !byob_canceled {
178                    let byob_branch_controller = self.byob_branch.get_byte_controller();
179                    byob_branch_controller.respond_with_new_view(cx, chunk)?;
180                }
181
182                // Perform ! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk).
183                let other_branch_controller = self.other_branch.get_byte_controller();
184                other_branch_controller.enqueue(cx, cloned_chunk)?;
185            }
186        } else if !byob_canceled {
187            // Otherwise, if byobCanceled is false, perform
188            // ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
189
190            let byob_branch_controller = self.byob_branch.get_byte_controller();
191            byob_branch_controller.respond_with_new_view(cx, chunk)?;
192        }
193
194        // Set reading to false.
195        self.reading.set(false);
196
197        // If readAgainForBranch1 is true, perform pull1Algorithm.
198        if self.read_again_for_branch_1.get() {
199            self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull1Algorithm));
200        } else if self.read_again_for_branch_2.get() {
201            // Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
202            self.pull_algorithm(cx, Some(ByteTeePullAlgorithm::Pull2Algorithm));
203        }
204
205        Ok(())
206    }
207
208    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A1>
209    pub(crate) fn close_steps(
210        &self,
211        cx: &mut JSContext,
212        chunk: Option<RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>>,
213    ) -> Fallible<()> {
214        // Set reading to false.
215        self.reading.set(false);
216
217        // Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
218        let byob_canceled = if self.for_branch2 {
219            self.canceled_2.get()
220        } else {
221            self.canceled_1.get()
222        };
223
224        // Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
225        let other_canceled = if self.for_branch2 {
226            self.canceled_1.get()
227        } else {
228            self.canceled_2.get()
229        };
230
231        // If byobCanceled is false, perform ! ReadableByteStreamControllerClose(byobBranch.[[controller]]).
232        if !byob_canceled {
233            let byob_branch_controller = self.byob_branch.get_byte_controller();
234            byob_branch_controller.close(cx)?;
235        }
236
237        // If otherCanceled is false, perform ! ReadableByteStreamControllerClose(otherBranch.[[controller]]).
238        if !other_canceled {
239            let other_branch_controller = self.other_branch.get_byte_controller();
240            other_branch_controller.close(cx)?;
241        }
242
243        // If chunk is not undefined,
244        if let Some(chunk_value) = chunk {
245            if chunk_value.is_undefined() {
246                // Nothing to respond with if the provided chunk is undefined.
247                // Continue with the remaining close steps.
248            } else {
249                let chunk = chunk_value;
250                // Assert: chunk.[[ByteLength]] is 0.
251                assert_eq!(chunk.byte_length(), 0);
252
253                // If byobCanceled is false, perform !
254                // ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
255                if !byob_canceled {
256                    let byob_branch_controller = self.byob_branch.get_byte_controller();
257                    byob_branch_controller.respond_with_new_view(cx, &chunk)?;
258                }
259
260                // If otherCanceled is false and otherBranch.[[controller]].[[pendingPullIntos]] is not empty,
261                // perform ! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0).
262                if !other_canceled {
263                    let other_branch_controller = self.other_branch.get_byte_controller();
264                    if other_branch_controller.get_pending_pull_intos_size() > 0 {
265                        other_branch_controller.respond(cx, 0)?;
266                    }
267                }
268            }
269        }
270
271        // If byobCanceled is false or otherCanceled is false, resolve cancelPromise with undefined.
272        if !byob_canceled || !other_canceled {
273            self.cancel_promise.resolve_native(cx, &());
274        }
275
276        Ok(())
277    }
278    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps%E2%91%A0>
279    pub(crate) fn error_steps(&self) {
280        // Set reading to false.
281        self.reading.set(false);
282    }
283
284    pub(crate) fn pull_algorithm(
285        &self,
286        cx: &mut JSContext,
287        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
288    ) {
289        self.tee_underlying_source
290            .pull_algorithm(cx, byte_tee_pull_algorithm);
291    }
292}