script/dom/stream/
defaultteereadrequest.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::Heap;
10use js::jsval::{JSVal, UndefinedValue};
11use js::realm::AutoRealm;
12use js::rust::HandleValue as SafeHandleValue;
13
14use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
15use crate::dom::bindings::root::{Dom, DomRoot};
16use crate::dom::bindings::structuredclone;
17use crate::dom::bindings::trace::RootedTraceableBox;
18use crate::dom::globalscope::GlobalScope;
19use crate::dom::promise::Promise;
20use crate::dom::stream::defaultteeunderlyingsource::DefaultTeeUnderlyingSource;
21use crate::dom::stream::readablestream::ReadableStream;
22use crate::microtask::{Microtask, MicrotaskRunnable};
23use crate::realms::enter_auto_realm;
24use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
25
26#[derive(JSTraceable, MallocSizeOf)]
27#[cfg_attr(crown, expect(crown::unrooted_must_root))]
28pub(crate) struct DefaultTeeReadRequestMicrotask {
29    #[ignore_malloc_size_of = "mozjs"]
30    chunk: Box<Heap<JSVal>>,
31    tee_read_request: Dom<DefaultTeeReadRequest>,
32}
33
34impl MicrotaskRunnable for DefaultTeeReadRequestMicrotask {
35    fn handler(&self, cx: &mut js::context::JSContext) {
36        self.tee_read_request.chunk_steps(cx, &self.chunk);
37    }
38
39    fn enter_realm<'cx>(&self, cx: &'cx mut js::context::JSContext) -> AutoRealm<'cx> {
40        enter_auto_realm(cx, &*self.tee_read_request)
41    }
42}
43
44#[dom_struct]
45/// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
46pub(crate) struct DefaultTeeReadRequest {
47    reflector_: Reflector,
48    stream: Dom<ReadableStream>,
49    branch_1: Dom<ReadableStream>,
50    branch_2: Dom<ReadableStream>,
51    #[conditional_malloc_size_of]
52    reading: Rc<Cell<bool>>,
53    #[conditional_malloc_size_of]
54    read_again: Rc<Cell<bool>>,
55    #[conditional_malloc_size_of]
56    canceled_1: Rc<Cell<bool>>,
57    #[conditional_malloc_size_of]
58    canceled_2: Rc<Cell<bool>>,
59    #[conditional_malloc_size_of]
60    clone_for_branch_2: Rc<Cell<bool>>,
61    #[conditional_malloc_size_of]
62    cancel_promise: Rc<Promise>,
63    tee_underlying_source: Dom<DefaultTeeUnderlyingSource>,
64}
65impl DefaultTeeReadRequest {
66    #[expect(clippy::too_many_arguments)]
67    pub(crate) fn new(
68        stream: &ReadableStream,
69        branch_1: &ReadableStream,
70        branch_2: &ReadableStream,
71        reading: Rc<Cell<bool>>,
72        read_again: Rc<Cell<bool>>,
73        canceled_1: Rc<Cell<bool>>,
74        canceled_2: Rc<Cell<bool>>,
75        clone_for_branch_2: Rc<Cell<bool>>,
76        cancel_promise: Rc<Promise>,
77        tee_underlying_source: &DefaultTeeUnderlyingSource,
78        can_gc: CanGc,
79    ) -> DomRoot<Self> {
80        reflect_dom_object(
81            Box::new(DefaultTeeReadRequest {
82                reflector_: Reflector::new(),
83                stream: Dom::from_ref(stream),
84                branch_1: Dom::from_ref(branch_1),
85                branch_2: Dom::from_ref(branch_2),
86                reading,
87                read_again,
88                canceled_1,
89                canceled_2,
90                clone_for_branch_2,
91                cancel_promise,
92                tee_underlying_source: Dom::from_ref(tee_underlying_source),
93            }),
94            &*stream.global(),
95            can_gc,
96        )
97    }
98    /// Call into cancel of the stream,
99    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
100    pub(crate) fn stream_cancel(
101        &self,
102        cx: SafeJSContext,
103        global: &GlobalScope,
104        reason: SafeHandleValue,
105        can_gc: CanGc,
106    ) {
107        self.stream.cancel(cx, global, reason, can_gc);
108    }
109    /// Enqueue a microtask to perform the chunk steps
110    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
111    pub(crate) fn enqueue_chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>) {
112        // Queue a microtask to perform the following steps:
113        let tee_read_request_chunk = DefaultTeeReadRequestMicrotask {
114            chunk: Heap::boxed(*chunk.handle()),
115            tee_read_request: Dom::from_ref(self),
116        };
117        self.stream
118            .global()
119            .enqueue_microtask(Microtask::ReadableStreamTeeReadRequest(
120                tee_read_request_chunk,
121            ));
122    }
123    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
124    #[expect(clippy::borrowed_box)]
125    pub(crate) fn chunk_steps(&self, cx: &mut js::context::JSContext, chunk: &Box<Heap<JSVal>>) {
126        let global = &self.stream.global();
127        // Set readAgain to false.
128        self.read_again.set(false);
129        // Let chunk1 and chunk2 be chunk.
130        let chunk1 = chunk;
131        let chunk2 = chunk;
132
133        rooted!(&in(cx) let chunk1_value = chunk1.get());
134        rooted!(&in(cx) let chunk2_value = chunk2.get());
135        // If canceled_2 is false and cloneForBranch2 is true,
136        if !self.canceled_2.get() && self.clone_for_branch_2.get() {
137            // Let cloneResult be StructuredClone(chunk2).
138            rooted!(&in(cx) let mut clone_result = UndefinedValue());
139            let data = structuredclone::write(cx.into(), chunk2_value.handle(), None).unwrap();
140            // If cloneResult is an abrupt completion,
141            if structuredclone::read(global, data, clone_result.handle_mut(), CanGc::from_cx(cx))
142                .is_err()
143            {
144                // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], cloneResult.[[Value]]).
145                self.readable_stream_default_controller_error(
146                    &self.branch_1,
147                    clone_result.handle(),
148                    CanGc::from_cx(cx),
149                );
150
151                // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], cloneResult.[[Value]]).
152                self.readable_stream_default_controller_error(
153                    &self.branch_2,
154                    clone_result.handle(),
155                    CanGc::from_cx(cx),
156                );
157                // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
158                self.stream_cancel(cx.into(), global, clone_result.handle(), CanGc::from_cx(cx));
159                // Return.
160                return;
161            } else {
162                // Otherwise, set chunk2 to cloneResult.[[Value]].
163                chunk2.set(*clone_result);
164            }
165        }
166        // If canceled_1 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_1.[[controller]], chunk1).
167        if !self.canceled_1.get() {
168            self.readable_stream_default_controller_enqueue(
169                &self.branch_1,
170                chunk1_value.handle(),
171                CanGc::from_cx(cx),
172            );
173        }
174        // If canceled_2 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_2.[[controller]], chunk2).
175        if !self.canceled_2.get() {
176            self.readable_stream_default_controller_enqueue(
177                &self.branch_2,
178                chunk2_value.handle(),
179                CanGc::from_cx(cx),
180            );
181        }
182        // Set reading to false.
183        self.reading.set(false);
184        // If readAgain is true, perform pullAlgorithm.
185        if self.read_again.get() {
186            self.pull_algorithm(CanGc::from_cx(cx));
187        }
188    }
189    /// <https://streams.spec.whatwg.org/#read-request-close-steps>
190    pub(crate) fn close_steps(&self, can_gc: CanGc) {
191        // Set reading to false.
192        self.reading.set(false);
193        // If canceled_1 is false, perform ! ReadableStreamDefaultControllerClose(branch_1.[[controller]]).
194        if !self.canceled_1.get() {
195            self.readable_stream_default_controller_close(&self.branch_1, can_gc);
196        }
197        // If canceled_2 is false, perform ! ReadableStreamDefaultControllerClose(branch_2.[[controller]]).
198        if !self.canceled_2.get() {
199            self.readable_stream_default_controller_close(&self.branch_2, can_gc);
200        }
201        // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined.
202        if !self.canceled_1.get() || !self.canceled_2.get() {
203            self.cancel_promise.resolve_native(&(), can_gc);
204        }
205    }
206    /// <https://streams.spec.whatwg.org/#read-request-error-steps>
207    pub(crate) fn error_steps(&self) {
208        // Set reading to false.
209        self.reading.set(false);
210    }
211    /// Call into enqueue of the default controller of a stream,
212    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
213    fn readable_stream_default_controller_enqueue(
214        &self,
215        stream: &ReadableStream,
216        chunk: SafeHandleValue,
217        can_gc: CanGc,
218    ) {
219        stream
220            .get_default_controller()
221            .enqueue(GlobalScope::get_cx(), chunk, can_gc)
222            .expect("enqueue failed for stream controller in DefaultTeeReadRequest");
223    }
224
225    /// Call into close of the default controller of a stream,
226    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
227    fn readable_stream_default_controller_close(&self, stream: &ReadableStream, can_gc: CanGc) {
228        stream.get_default_controller().close(can_gc);
229    }
230
231    /// Call into error of the default controller of stream,
232    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
233    fn readable_stream_default_controller_error(
234        &self,
235        stream: &ReadableStream,
236        error: SafeHandleValue,
237        can_gc: CanGc,
238    ) {
239        stream.get_default_controller().error(error, can_gc);
240    }
241
242    pub(crate) fn pull_algorithm(&self, can_gc: CanGc) {
243        self.tee_underlying_source.pull_algorithm(can_gc);
244    }
245}