script/dom/
defaultteereadrequest.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{Heap, JSAutoRealm};
10use js::jsval::{JSVal, UndefinedValue};
11use js::rust::HandleValue as SafeHandleValue;
12
13use super::bindings::reflector::reflect_dom_object;
14use super::bindings::root::DomRoot;
15use super::bindings::structuredclone;
16use crate::dom::bindings::reflector::{DomGlobal, Reflector};
17use crate::dom::bindings::root::Dom;
18use crate::dom::bindings::trace::RootedTraceableBox;
19use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource;
20use crate::dom::globalscope::GlobalScope;
21use crate::dom::promise::Promise;
22use crate::dom::readablestream::ReadableStream;
23use crate::microtask::{Microtask, MicrotaskRunnable};
24use crate::realms::enter_realm;
25use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
26
27#[derive(JSTraceable, MallocSizeOf)]
28#[cfg_attr(crown, allow(crown::unrooted_must_root))]
29pub(crate) struct DefaultTeeReadRequestMicrotask {
30    #[ignore_malloc_size_of = "mozjs"]
31    chunk: Box<Heap<JSVal>>,
32    tee_read_request: Dom<DefaultTeeReadRequest>,
33}
34
35impl MicrotaskRunnable for DefaultTeeReadRequestMicrotask {
36    fn handler(&self, can_gc: CanGc) {
37        let cx = GlobalScope::get_cx();
38        self.tee_read_request.chunk_steps(cx, &self.chunk, can_gc);
39    }
40
41    fn enter_realm(&self) -> JSAutoRealm {
42        enter_realm(&*self.tee_read_request)
43    }
44}
45
46#[dom_struct]
47/// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2>
48pub(crate) struct DefaultTeeReadRequest {
49    reflector_: Reflector,
50    stream: Dom<ReadableStream>,
51    branch_1: Dom<ReadableStream>,
52    branch_2: Dom<ReadableStream>,
53    #[ignore_malloc_size_of = "Rc"]
54    reading: Rc<Cell<bool>>,
55    #[ignore_malloc_size_of = "Rc"]
56    read_again: Rc<Cell<bool>>,
57    #[ignore_malloc_size_of = "Rc"]
58    canceled_1: Rc<Cell<bool>>,
59    #[ignore_malloc_size_of = "Rc"]
60    canceled_2: Rc<Cell<bool>>,
61    #[ignore_malloc_size_of = "Rc"]
62    clone_for_branch_2: Rc<Cell<bool>>,
63    #[ignore_malloc_size_of = "Rc"]
64    cancel_promise: Rc<Promise>,
65    tee_underlying_source: Dom<DefaultTeeUnderlyingSource>,
66}
67impl DefaultTeeReadRequest {
68    #[allow(clippy::too_many_arguments)]
69    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
70    pub(crate) fn new(
71        stream: &ReadableStream,
72        branch_1: &ReadableStream,
73        branch_2: &ReadableStream,
74        reading: Rc<Cell<bool>>,
75        read_again: Rc<Cell<bool>>,
76        canceled_1: Rc<Cell<bool>>,
77        canceled_2: Rc<Cell<bool>>,
78        clone_for_branch_2: Rc<Cell<bool>>,
79        cancel_promise: Rc<Promise>,
80        tee_underlying_source: &DefaultTeeUnderlyingSource,
81        can_gc: CanGc,
82    ) -> DomRoot<Self> {
83        reflect_dom_object(
84            Box::new(DefaultTeeReadRequest {
85                reflector_: Reflector::new(),
86                stream: Dom::from_ref(stream),
87                branch_1: Dom::from_ref(branch_1),
88                branch_2: Dom::from_ref(branch_2),
89                reading,
90                read_again,
91                canceled_1,
92                canceled_2,
93                clone_for_branch_2,
94                cancel_promise,
95                tee_underlying_source: Dom::from_ref(tee_underlying_source),
96            }),
97            &*stream.global(),
98            can_gc,
99        )
100    }
101    /// Call into cancel of the stream,
102    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
103    pub(crate) fn stream_cancel(
104        &self,
105        cx: SafeJSContext,
106        global: &GlobalScope,
107        reason: SafeHandleValue,
108        can_gc: CanGc,
109    ) {
110        self.stream.cancel(cx, global, reason, can_gc);
111    }
112    /// Enqueue a microtask to perform the chunk steps
113    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
114    pub(crate) fn enqueue_chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>) {
115        // Queue a microtask to perform the following steps:
116        let tee_read_request_chunk = DefaultTeeReadRequestMicrotask {
117            chunk: Heap::boxed(*chunk.handle()),
118            tee_read_request: Dom::from_ref(self),
119        };
120        let global = self.stream.global();
121        let microtask_queue = global.microtask_queue();
122        let cx = GlobalScope::get_cx();
123        microtask_queue.enqueue(
124            Microtask::ReadableStreamTeeReadRequest(tee_read_request_chunk),
125            cx,
126        );
127    }
128    /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2>
129    #[allow(clippy::borrowed_box)]
130    pub(crate) fn chunk_steps(&self, cx: SafeJSContext, chunk: &Box<Heap<JSVal>>, can_gc: CanGc) {
131        let global = &self.stream.global();
132        // Set readAgain to false.
133        self.read_again.set(false);
134        // Let chunk1 and chunk2 be chunk.
135        let chunk1 = chunk;
136        let chunk2 = chunk;
137
138        rooted!(in(*cx) let chunk1_value = chunk1.get());
139        rooted!(in(*cx) let chunk2_value = chunk2.get());
140        // If canceled_2 is false and cloneForBranch2 is true,
141        if !self.canceled_2.get() && self.clone_for_branch_2.get() {
142            // Let cloneResult be StructuredClone(chunk2).
143            rooted!(in(*cx) let mut clone_result = UndefinedValue());
144            let data = structuredclone::write(cx, chunk2_value.handle(), None).unwrap();
145            // If cloneResult is an abrupt completion,
146            if structuredclone::read(global, data, clone_result.handle_mut()).is_err() {
147                // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], cloneResult.[[Value]]).
148                self.readable_stream_default_controller_error(
149                    &self.branch_1,
150                    clone_result.handle(),
151                    can_gc,
152                );
153
154                // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], cloneResult.[[Value]]).
155                self.readable_stream_default_controller_error(
156                    &self.branch_2,
157                    clone_result.handle(),
158                    can_gc,
159                );
160                // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
161                self.stream_cancel(cx, global, clone_result.handle(), can_gc);
162                // Return.
163                return;
164            } else {
165                // Otherwise, set chunk2 to cloneResult.[[Value]].
166                chunk2.set(*clone_result);
167            }
168        }
169        // If canceled_1 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_1.[[controller]], chunk1).
170        if !self.canceled_1.get() {
171            self.readable_stream_default_controller_enqueue(
172                &self.branch_1,
173                chunk1_value.handle(),
174                can_gc,
175            );
176        }
177        // If canceled_2 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_2.[[controller]], chunk2).
178        if !self.canceled_2.get() {
179            self.readable_stream_default_controller_enqueue(
180                &self.branch_2,
181                chunk2_value.handle(),
182                can_gc,
183            );
184        }
185        // Set reading to false.
186        self.reading.set(false);
187        // If readAgain is true, perform pullAlgorithm.
188        if self.read_again.get() {
189            self.pull_algorithm(can_gc);
190        }
191    }
192    /// <https://streams.spec.whatwg.org/#read-request-close-steps>
193    pub(crate) fn close_steps(&self, can_gc: CanGc) {
194        // Set reading to false.
195        self.reading.set(false);
196        // If canceled_1 is false, perform ! ReadableStreamDefaultControllerClose(branch_1.[[controller]]).
197        if !self.canceled_1.get() {
198            self.readable_stream_default_controller_close(&self.branch_1, can_gc);
199        }
200        // If canceled_2 is false, perform ! ReadableStreamDefaultControllerClose(branch_2.[[controller]]).
201        if !self.canceled_2.get() {
202            self.readable_stream_default_controller_close(&self.branch_2, can_gc);
203        }
204        // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined.
205        if !self.canceled_1.get() || !self.canceled_2.get() {
206            self.cancel_promise.resolve_native(&(), can_gc);
207        }
208    }
209    /// <https://streams.spec.whatwg.org/#read-request-error-steps>
210    pub(crate) fn error_steps(&self) {
211        // Set reading to false.
212        self.reading.set(false);
213    }
214    /// Call into enqueue of the default controller of a stream,
215    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue>
216    fn readable_stream_default_controller_enqueue(
217        &self,
218        stream: &ReadableStream,
219        chunk: SafeHandleValue,
220        can_gc: CanGc,
221    ) {
222        stream
223            .get_default_controller()
224            .enqueue(GlobalScope::get_cx(), chunk, can_gc)
225            .expect("enqueue failed for stream controller in DefaultTeeReadRequest");
226    }
227
228    /// Call into close of the default controller of a stream,
229    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close>
230    fn readable_stream_default_controller_close(&self, stream: &ReadableStream, can_gc: CanGc) {
231        stream.get_default_controller().close(can_gc);
232    }
233
234    /// Call into error of the default controller of stream,
235    /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error>
236    fn readable_stream_default_controller_error(
237        &self,
238        stream: &ReadableStream,
239        error: SafeHandleValue,
240        can_gc: CanGc,
241    ) {
242        stream.get_default_controller().error(error, can_gc);
243    }
244
245    pub(crate) fn pull_algorithm(&self, can_gc: CanGc) {
246        self.tee_underlying_source.pull_algorithm(can_gc);
247    }
248}