Skip to main content

script/dom/stream/
defaultteereadrequest.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::Heap;
11use js::jsval::{JSVal, UndefinedValue};
12use js::realm::AutoRealm;
13use js::rust::HandleValue as SafeHandleValue;
14use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
15
16use crate::dom::bindings::error::ErrorToJsval;
17use crate::dom::bindings::reflector::DomGlobal;
18use crate::dom::bindings::root::{Dom, DomRoot};
19use crate::dom::bindings::structuredclone;
20use crate::dom::bindings::trace::RootedTraceableBox;
21use crate::dom::globalscope::GlobalScope;
22use crate::dom::promise::Promise;
23use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeUnderlyingSource;
24use crate::dom::stream::readablestream::ReadableStream;
25use crate::microtask::{Microtask, MicrotaskRunnable};
26use crate::realms::enter_auto_realm;
27
28#[derive(JSTraceable, MallocSizeOf)]
29#[cfg_attr(crown, expect(crown::unrooted_must_root))]
30pub(crate) struct DefaultTeeReadRequestMicrotask {
31    #[ignore_malloc_size_of = "mozjs"]
32    chunk: Box<Heap<JSVal>>,
33    tee_read_request: Dom<DefaultTeeReadRequest>,
34}
35
36impl MicrotaskRunnable for DefaultTeeReadRequestMicrotask {
37    fn handler(&self, cx: &mut JSContext) {
38        self.tee_read_request.chunk_steps(cx, &self.chunk);
39    }
40
41    fn enter_realm<'cx>(&self, cx: &'cx mut js::context::JSContext) -> AutoRealm<'cx> {
42        enter_auto_realm(cx, &*self.tee_read_request)
43    }
44}
45
46#[dom_struct]
47/// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
48pub(crate) struct DefaultTeeReadRequest {
49    reflector_: Reflector,
50    stream: Dom<ReadableStream>,
51    branch_1: Dom<ReadableStream>,
52    branch_2: Dom<ReadableStream>,
53    #[conditional_malloc_size_of]
54    reading: Rc<Cell<bool>>,
55    #[conditional_malloc_size_of]
56    read_again: Rc<Cell<bool>>,
57    #[conditional_malloc_size_of]
58    canceled_1: Rc<Cell<bool>>,
59    #[conditional_malloc_size_of]
60    canceled_2: Rc<Cell<bool>>,
61    #[conditional_malloc_size_of]
62    clone_for_branch_2: Rc<Cell<bool>>,
63    #[conditional_malloc_size_of]
64    cancel_promise: Rc<Promise>,
65    tee_underlying_source: Dom<DefaultTeeUnderlyingSource>,
66}
67impl DefaultTeeReadRequest {
68    #[expect(clippy::too_many_arguments)]
69    pub(crate) fn new(
70        cx: &mut JSContext,
71        stream: &ReadableStream,
72        branch_1: &ReadableStream,
73        branch_2: &ReadableStream,
74        reading: Rc<Cell<bool>>,
75        read_again: Rc<Cell<bool>>,
76        canceled_1: Rc<Cell<bool>>,
77        canceled_2: Rc<Cell<bool>>,
78        clone_for_branch_2: Rc<Cell<bool>>,
79        cancel_promise: Rc<Promise>,
80        tee_underlying_source: &DefaultTeeUnderlyingSource,
81    ) -> DomRoot<Self> {
82        reflect_dom_object_with_cx(
83            Box::new(DefaultTeeReadRequest {
84                reflector_: Reflector::new(),
85                stream: Dom::from_ref(stream),
86                branch_1: Dom::from_ref(branch_1),
87                branch_2: Dom::from_ref(branch_2),
88                reading,
89                read_again,
90                canceled_1,
91                canceled_2,
92                clone_for_branch_2,
93                cancel_promise,
94                tee_underlying_source: Dom::from_ref(tee_underlying_source),
95            }),
96            &*stream.global(),
97            cx,
98        )
99    }
100    /// Call into cancel of the stream,
101    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
102    pub(crate) fn stream_cancel(
103        &self,
104        cx: &mut JSContext,
105        global: &GlobalScope,
106        reason: SafeHandleValue,
107    ) {
108        self.stream.cancel(cx, global, reason);
109    }
110    /// Enqueue a microtask to perform the chunk steps
111    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
112    pub(crate) fn enqueue_chunk_steps(
113        &self,
114        cx: &mut JSContext,
115        chunk: RootedTraceableBox<Heap<JSVal>>,
116    ) {
117        // Queue a microtask to perform the following steps:
118        let tee_read_request_chunk = DefaultTeeReadRequestMicrotask {
119            chunk: Heap::boxed(*chunk.handle()),
120            tee_read_request: Dom::from_ref(self),
121        };
122        self.stream.global().enqueue_microtask(
123            cx,
124            Microtask::ReadableStreamTeeReadRequest(tee_read_request_chunk),
125        );
126    }
127    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
128    #[expect(clippy::borrowed_box)]
129    pub(crate) fn chunk_steps(&self, cx: &mut JSContext, chunk: &Box<Heap<JSVal>>) {
130        let global = &self.stream.global();
131        // Set readAgain to false.
132        self.read_again.set(false);
133        // Let chunk1 and chunk2 be chunk.
134        rooted!(&in(cx) let chunk1_value = chunk.get());
135        rooted!(&in(cx) let mut chunk2_value = chunk.get());
136        // If canceled_2 is false and cloneForBranch2 is true,
137        if !self.canceled_2.get() && self.clone_for_branch_2.get() {
138            // Let cloneResult be StructuredClone(chunk2).
139            let data = match structuredclone::write(cx, chunk2_value.handle(), None) {
140                Ok(data) => data,
141                Err(error) => {
142                    // If cloneResult is an abrupt completion,
143                    rooted!(&in(cx) let mut error_value = UndefinedValue());
144                    error.to_jsval(cx, global, error_value.handle_mut());
145                    // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], cloneResult.[[Value]]).
146                    self.readable_stream_default_controller_error(
147                        cx,
148                        &self.branch_1,
149                        error_value.handle(),
150                    );
151
152                    // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], cloneResult.[[Value]]).
153                    self.readable_stream_default_controller_error(
154                        cx,
155                        &self.branch_2,
156                        error_value.handle(),
157                    );
158                    // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
159                    self.stream_cancel(cx, global, error_value.handle());
160                    // Return.
161                    return;
162                },
163            };
164            // If cloneResult is an abrupt completion,
165            if let Err(error) = structuredclone::read(cx, global, data, chunk2_value.handle_mut()) {
166                rooted!(&in(cx) let mut error_value = UndefinedValue());
167                error.to_jsval(cx, global, error_value.handle_mut());
168                // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], cloneResult.[[Value]]).
169                self.readable_stream_default_controller_error(
170                    cx,
171                    &self.branch_1,
172                    error_value.handle(),
173                );
174
175                // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], cloneResult.[[Value]]).
176                self.readable_stream_default_controller_error(
177                    cx,
178                    &self.branch_2,
179                    error_value.handle(),
180                );
181                // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
182                self.stream_cancel(cx, global, error_value.handle());
183                // Return.
184                return;
185            }
186        }
187        // If canceled_1 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_1.[[controller]], chunk1).
188        if !self.canceled_1.get() {
189            self.readable_stream_default_controller_enqueue(
190                cx,
191                &self.branch_1,
192                chunk1_value.handle(),
193            );
194        }
195        // If canceled_2 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_2.[[controller]], chunk2).
196        if !self.canceled_2.get() {
197            self.readable_stream_default_controller_enqueue(
198                cx,
199                &self.branch_2,
200                chunk2_value.handle(),
201            );
202        }
203        // Set reading to false.
204        self.reading.set(false);
205        // If readAgain is true, perform pullAlgorithm.
206        if self.read_again.get() {
207            self.pull_algorithm(cx);
208        }
209    }
210    /// <https://streams.spec.whatwg.org/#read-request-close-steps>
211    pub(crate) fn close_steps(&self, cx: &mut JSContext) {
212        // Set reading to false.
213        self.reading.set(false);
214        // If canceled_1 is false, perform ! ReadableStreamDefaultControllerClose(branch_1.[[controller]]).
215        if !self.canceled_1.get() {
216            self.readable_stream_default_controller_close(cx, &self.branch_1);
217        }
218        // If canceled_2 is false, perform ! ReadableStreamDefaultControllerClose(branch_2.[[controller]]).
219        if !self.canceled_2.get() {
220            self.readable_stream_default_controller_close(cx, &self.branch_2);
221        }
222        // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined.
223        if !self.canceled_1.get() || !self.canceled_2.get() {
224            self.cancel_promise.resolve_native(cx, &());
225        }
226    }
227    /// <https://streams.spec.whatwg.org/#read-request-error-steps>
228    pub(crate) fn error_steps(&self) {
229        // Set reading to false.
230        self.reading.set(false);
231    }
232    /// Call into enqueue of the default controller of a stream,
233    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
234    fn readable_stream_default_controller_enqueue(
235        &self,
236        cx: &mut JSContext,
237        stream: &ReadableStream,
238        chunk: SafeHandleValue,
239    ) {
240        stream
241            .get_default_controller()
242            .enqueue(cx, chunk)
243            .expect("enqueue failed for stream controller in DefaultTeeReadRequest");
244    }
245
246    /// Call into close of the default controller of a stream,
247    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
248    fn readable_stream_default_controller_close(
249        &self,
250        cx: &mut JSContext,
251        stream: &ReadableStream,
252    ) {
253        stream.get_default_controller().close(cx);
254    }
255
256    /// Call into error of the default controller of stream,
257    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
258    fn readable_stream_default_controller_error(
259        &self,
260        cx: &mut JSContext,
261        stream: &ReadableStream,
262        error: SafeHandleValue,
263    ) {
264        stream.get_default_controller().error(cx, error);
265    }
266
267    pub(crate) fn pull_algorithm(&self, cx: &mut JSContext) {
268        self.tee_underlying_source.pull_algorithm(cx);
269    }
270}