Skip to main content

script/dom/stream/
byteteeunderlyingsource.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::{HandleValueArray, Heap, NewArrayObject, Value};
11use js::jsval::ObjectValue;
12use js::rust::HandleValue as SafeHandleValue;
13use js::typedarray::ArrayBufferViewU8;
14use script_bindings::reflector::{Reflector, reflect_dom_object};
15
16use super::byteteereadintorequest::ByteTeeReadIntoRequest;
17use super::readablestream::ReaderType;
18use super::readablestreambyobreader::ReadIntoRequest;
19use crate::dom::bindings::buffer_source::HeapBufferSource;
20use crate::dom::bindings::error::{Error, Fallible};
21use crate::dom::bindings::reflector::DomGlobal;
22use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
23use crate::dom::globalscope::GlobalScope;
24use crate::dom::promise::Promise;
25use crate::dom::stream::byteteereadrequest::ByteTeeReadRequest;
26use crate::dom::stream::readablestreamdefaultreader::ReadRequest;
27use crate::dom::types::ReadableStream;
28use crate::script_runtime::CanGc;
29
/// Selects which branch's cancel steps a [`ByteTeeUnderlyingSource`] runs
/// when its `cancel_algorithm` is invoked.
#[derive(JSTraceable, MallocSizeOf)]
pub(crate) enum ByteTeeCancelAlgorithm {
    /// Run cancel1Algorithm: record branch 1 as canceled and store its reason.
    Cancel1Algorithm,
    /// Run cancel2Algorithm: record branch 2 as canceled and store its reason.
    Cancel2Algorithm,
}
35
/// Selects which branch's pull steps a [`ByteTeeUnderlyingSource`] runs
/// when its `pull_algorithm` is invoked.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub(crate) enum ByteTeePullAlgorithm {
    /// Run pull1Algorithm: pull on behalf of branch 1.
    Pull1Algorithm,
    /// Run pull2Algorithm: pull on behalf of branch 2.
    Pull2Algorithm,
}
41
#[dom_struct]
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee>
///
/// Underlying source for one branch of a teed readable byte stream. Most of
/// the state is held behind `Rc` handles that are passed in by the caller of
/// `new` and handed on to the read requests and closed-promise handlers
/// created by this type.
pub(crate) struct ByteTeeUnderlyingSource {
    reflector_: Reflector,
    /// The reader currently attached to `stream`; swapped between the default
    /// and BYOB variants by the pull helpers.
    #[conditional_malloc_size_of]
    reader: Rc<RefCell<ReaderType>>,
    /// The stream being teed; readers are acquired from it and it is the
    /// stream that gets canceled once both branches are canceled.
    stream: Dom<ReadableStream>,
    /// First branch; `None` until `set_branch_1` is called.
    branch_1: MutNullableDom<ReadableStream>,
    /// Second branch; `None` until `set_branch_2` is called.
    branch_2: MutNullableDom<ReadableStream>,
    /// Set when branch 1 asks to pull while a read is already in progress.
    #[conditional_malloc_size_of]
    read_again_for_branch_1: Rc<Cell<bool>>,
    /// Set when branch 2 asks to pull while a read is already in progress.
    #[conditional_malloc_size_of]
    read_again_for_branch_2: Rc<Cell<bool>>,
    /// True while a read started by a pull is in progress.
    #[conditional_malloc_size_of]
    reading: Rc<Cell<bool>>,
    /// True once branch 1 has been canceled.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// True once branch 2 has been canceled.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// Cancel reason recorded for branch 1.
    #[ignore_malloc_size_of = "Mozjs"]
    reason_1: Rc<Heap<Value>>,
    /// Cancel reason recorded for branch 2.
    #[ignore_malloc_size_of = "Mozjs"]
    reason_2: Rc<Heap<Value>>,
    /// Promise returned from the cancel algorithm; resolved with the result of
    /// canceling `stream` once both branches are canceled.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
    /// Counter bumped (wrapping) every time `reader` is replaced; the value at
    /// registration time is passed along with each closed-promise handler.
    #[conditional_malloc_size_of]
    reader_version: Rc<Cell<u64>>,
    /// Which cancel steps this source runs.
    tee_cancel_algorithm: ByteTeeCancelAlgorithm,
    /// Which pull steps this source runs when no override is supplied.
    byte_tee_pull_algorithm: ByteTeePullAlgorithm,
}
72
73impl ByteTeeUnderlyingSource {
74    #[allow(clippy::too_many_arguments)]
75    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
76    pub(crate) fn new(
77        reader: Rc<RefCell<ReaderType>>,
78        stream: &ReadableStream,
79        reading: Rc<Cell<bool>>,
80        read_again_for_branch_1: Rc<Cell<bool>>,
81        read_again_for_branch_2: Rc<Cell<bool>>,
82        canceled_1: Rc<Cell<bool>>,
83        canceled_2: Rc<Cell<bool>>,
84        reason_1: Rc<Heap<Value>>,
85        reason_2: Rc<Heap<Value>>,
86        cancel_promise: Rc<Promise>,
87        reader_version: Rc<Cell<u64>>,
88        tee_cancel_algorithm: ByteTeeCancelAlgorithm,
89        byte_tee_pull_algorithm: ByteTeePullAlgorithm,
90        can_gc: CanGc,
91    ) -> DomRoot<ByteTeeUnderlyingSource> {
92        reflect_dom_object(
93            Box::new(ByteTeeUnderlyingSource {
94                reflector_: Reflector::new(),
95                reader,
96                stream: Dom::from_ref(stream),
97                branch_1: MutNullableDom::new(None),
98                branch_2: MutNullableDom::new(None),
99                read_again_for_branch_1,
100                read_again_for_branch_2,
101                reading,
102                canceled_1,
103                canceled_2,
104                reason_1,
105                reason_2,
106                cancel_promise,
107                reader_version,
108                tee_cancel_algorithm,
109                byte_tee_pull_algorithm,
110            }),
111            &*stream.global(),
112            can_gc,
113        )
114    }
115
    /// Stores `stream` as branch 1 of the tee.
    ///
    /// The pull helpers and `forward_reader_error` panic with
    /// "Branch 1 should be set." if they run before this has been called.
    pub(crate) fn set_branch_1(&self, stream: &ReadableStream) {
        self.branch_1.set(Some(stream));
    }
119
    /// Stores `stream` as branch 2 of the tee.
    ///
    /// The pull helpers and `forward_reader_error` panic with
    /// "Branch 2 should be set." if they run before this has been called.
    pub(crate) fn set_branch_2(&self, stream: &ReadableStream) {
        self.branch_2.set(Some(stream));
    }
123
124    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
125    pub(crate) fn forward_reader_error(
126        &self,
127        cx: &mut JSContext,
128        this_reader: Rc<RefCell<ReaderType>>,
129    ) {
130        let this_reader = this_reader.borrow_mut();
131        match &*this_reader {
132            ReaderType::Default(reader) => {
133                let expected_version = self.reader_version.get();
134                // Upon rejection of thisReader.[[closedPromise]] with reason r,
135                reader
136                    .get()
137                    .expect("Reader should be set.")
138                    .byte_tee_append_native_handler_to_closed_promise(
139                        cx,
140                        &self.branch_1.get().expect("Branch 1 should be set."),
141                        &self.branch_2.get().expect("Branch 2 should be set."),
142                        self.canceled_1.clone(),
143                        self.canceled_2.clone(),
144                        self.cancel_promise.clone(),
145                        self.reader_version.clone(),
146                        expected_version,
147                    );
148            },
149            ReaderType::BYOB(reader) => {
150                let expected_version = self.reader_version.get();
151                // Upon rejection of thisReader.[[closedPromise]] with reason r,
152                reader
153                    .get()
154                    .expect("Reader should be set.")
155                    .byte_tee_append_native_handler_to_closed_promise(
156                        cx,
157                        &self.branch_1.get().expect("Branch 1 should be set."),
158                        &self.branch_2.get().expect("Branch 2 should be set."),
159                        self.canceled_1.clone(),
160                        self.canceled_2.clone(),
161                        self.cancel_promise.clone(),
162                        self.reader_version.clone(),
163                        expected_version,
164                    );
165            },
166        }
167    }
168
169    fn pull_with_default_reader(&self, cx: &mut JSContext, global: &GlobalScope) -> Fallible<()> {
170        let mut reader = self.reader.borrow_mut();
171        match &*reader {
172            ReaderType::BYOB(byte_reader) => {
173                // Assert: readIntoRequests is empty.
174                assert!(
175                    byte_reader
176                        .get()
177                        .expect("Reader should be set.")
178                        .get_num_read_into_requests() ==
179                        0
180                );
181
182                // Release BYOB reader.
183                byte_reader
184                    .get()
185                    .expect("Reader should be set.")
186                    .release(CanGc::from_cx(cx))?;
187
188                // Acquire default reader.
189                let default_reader = self
190                    .stream
191                    .acquire_default_reader(CanGc::from_cx(cx))
192                    .expect("AcquireReadableStreamDefaultReader should not fail");
193
194                *reader = ReaderType::Default(MutNullableDom::new(Some(&default_reader)));
195                self.reader_version
196                    .set(self.reader_version.get().wrapping_add(1));
197                drop(reader);
198
199                // Attach error forwarding for the new reader.
200                self.forward_reader_error(cx, self.reader.clone());
201
202                // IMPORTANT: now actually perform the pull we were asked to do.
203                return self.pull_with_default_reader(cx, global);
204            },
205            ReaderType::Default(reader) => {
206                let byte_tee_read_request = ByteTeeReadRequest::new(
207                    &self.branch_1.get().expect("Branch 1 should be set."),
208                    &self.branch_2.get().expect("Branch 2 should be set."),
209                    &self.stream,
210                    self.read_again_for_branch_1.clone(),
211                    self.read_again_for_branch_2.clone(),
212                    self.reading.clone(),
213                    self.canceled_1.clone(),
214                    self.canceled_2.clone(),
215                    self.cancel_promise.clone(),
216                    self,
217                    global,
218                    CanGc::from_cx(cx),
219                );
220
221                let read_request = ReadRequest::ByteTee {
222                    byte_tee_read_request: Dom::from_ref(&byte_tee_read_request),
223                };
224
225                reader
226                    .get()
227                    .expect("Reader should be set.")
228                    .read(cx, &read_request);
229            },
230        }
231
232        Ok(())
233    }
234
235    fn pull_with_byob_reader(
236        &self,
237        cx: &mut JSContext,
238        view: &HeapBufferSource<ArrayBufferViewU8>,
239        for_branch2: bool,
240        global: &GlobalScope,
241    ) {
242        let mut reader = self.reader.borrow_mut();
243        match &*reader {
244            ReaderType::BYOB(reader) => {
245                // Let byobBranch be branch2 if forBranch2 is true, and branch1 otherwise.
246                let byob_branch = if for_branch2 {
247                    self.branch_2.get().expect("Branch 2 should be set.")
248                } else {
249                    self.branch_1.get().expect("Branch 1 should be set.")
250                };
251
252                // let otherBranch be branch2 if forBranch2 is false, and branch1 otherwise.
253                let other_branch = if for_branch2 {
254                    self.branch_1.get().expect("Branch 1 should be set.")
255                } else {
256                    self.branch_2.get().expect("Branch 2 should be set.")
257                };
258
259                // Let readIntoRequest be a read-into request with the following items:
260                let byte_tee_read_into_request = ByteTeeReadIntoRequest::new(
261                    for_branch2,
262                    &byob_branch,
263                    &other_branch,
264                    &self.stream,
265                    self.read_again_for_branch_1.clone(),
266                    self.read_again_for_branch_2.clone(),
267                    self.reading.clone(),
268                    self.canceled_1.clone(),
269                    self.canceled_2.clone(),
270                    self.cancel_promise.clone(),
271                    self,
272                    global,
273                    CanGc::from_cx(cx),
274                );
275
276                let read_into_request = ReadIntoRequest::ByteTee {
277                    byte_tee_read_into_request: Dom::from_ref(&byte_tee_read_into_request),
278                };
279
280                // Perform ! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest).
281                reader
282                    .get()
283                    .expect("Reader should be set.")
284                    .read(cx, view, 1, &read_into_request);
285            },
286            ReaderType::Default(default_reader) => {
287                // If reader implements ReadableStreamDefaultReader,
288                // Assert: reader.[[readRequests]] is empty.
289                assert!(
290                    default_reader
291                        .get()
292                        .expect("Reader should be set.")
293                        .get_num_read_requests() ==
294                        0
295                );
296
297                // Perform ! ReadableStreamDefaultReaderRelease(reader).
298                default_reader
299                    .get()
300                    .expect("Reader should be set.")
301                    .release(cx)
302                    .expect("Release should be successful.");
303
304                // Set reader to ! AcquireReadableStreamBYOBReader(stream).
305                let byob_reader = self
306                    .stream
307                    .acquire_byob_reader(CanGc::from_cx(cx))
308                    .expect("Reader should be set.");
309
310                *reader = ReaderType::BYOB(MutNullableDom::new(Some(&byob_reader)));
311                self.reader_version
312                    .set(self.reader_version.get().wrapping_add(1));
313
314                drop(reader);
315
316                // Perform forwardReaderError, given reader.
317                self.forward_reader_error(cx, self.reader.clone());
318
319                // Retry the pull using the BYOB reader we just acquired.
320                self.pull_with_byob_reader(cx, view, for_branch2, global);
321            },
322        }
323    }
324
325    /// Let pullAlgorithm be the following steps:
326    pub(crate) fn pull_algorithm(
327        &self,
328        cx: &mut JSContext,
329        byte_tee_pull_algorithm: Option<ByteTeePullAlgorithm>,
330    ) -> Rc<Promise> {
331        let pull_algorithm =
332            byte_tee_pull_algorithm.unwrap_or(self.byte_tee_pull_algorithm.clone());
333
334        match pull_algorithm {
335            ByteTeePullAlgorithm::Pull1Algorithm => {
336                // If reading is true,
337                if self.reading.get() {
338                    // Set readAgainForBranch1 to true.
339                    self.read_again_for_branch_1.set(true);
340                    // Return a promise resolved with undefined.
341                    return Promise::new_resolved(
342                        &self.stream.global(),
343                        cx.into(),
344                        (),
345                        CanGc::from_cx(cx),
346                    );
347                }
348
349                // Set reading to true.
350                self.reading.set(true);
351
352                // Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch1.[[controller]]).
353                let byob_branch_controller = self
354                    .branch_1
355                    .get()
356                    .expect("Branch 1 should be set.")
357                    .get_byte_controller();
358                let byob_request = byob_branch_controller
359                    .get_byob_request(cx)
360                    .expect("Byob request should be set.");
361
362                match byob_request {
363                    // If byobRequest is null, perform pullWithDefaultReader.
364                    None => {
365                        self.pull_with_default_reader(cx, &self.stream.global())
366                            .expect("Pull with default reader should be successful.");
367                    },
368                    Some(request) => {
369                        // Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and false.
370                        let view = request.get_view();
371
372                        self.pull_with_byob_reader(cx, &view, false, &self.stream.global());
373                    },
374                }
375
376                // Return a promise resolved with undefined.
377                Promise::new_resolved(&self.stream.global(), cx.into(), (), CanGc::from_cx(cx))
378            },
379            ByteTeePullAlgorithm::Pull2Algorithm => {
380                // If reading is true,
381                if self.reading.get() {
382                    // Set readAgainForBranch2 to true.
383                    self.read_again_for_branch_2.set(true);
384
385                    // Return a promise resolved with undefined.
386                    return Promise::new_resolved(
387                        &self.stream.global(),
388                        cx.into(),
389                        (),
390                        CanGc::from_cx(cx),
391                    );
392                }
393
394                // Set reading to true.
395                self.reading.set(true);
396
397                // Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch2.[[controller]]).
398                let byob_branch_controller = self
399                    .branch_2
400                    .get()
401                    .expect("Branch 2 should be set.")
402                    .get_byte_controller();
403                let byob_request = byob_branch_controller
404                    .get_byob_request(cx)
405                    .expect("Byob request should be set.");
406
407                match byob_request {
408                    None => {
409                        self.pull_with_default_reader(cx, &self.stream.global())
410                            .expect("Pull with default reader should be successful.");
411                    },
412                    Some(request) => {
413                        // Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and true.
414                        let view = request.get_view();
415
416                        self.pull_with_byob_reader(cx, &view, true, &self.stream.global());
417                    },
418                }
419
420                // Return a promise resolved with undefined.
421                Promise::new_resolved(&self.stream.global(), cx.into(), (), CanGc::from_cx(cx))
422            },
423        }
424    }
425
426    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>
427    /// Let cancel1Algorithm be the following steps, taking a reason argument
428    /// and
429    /// Let cancel2Algorithm be the following steps, taking a reason argument
430    pub(crate) fn cancel_algorithm(
431        &self,
432        cx: &mut JSContext,
433        reason: SafeHandleValue,
434    ) -> Option<Result<Rc<Promise>, Error>> {
435        match self.tee_cancel_algorithm {
436            ByteTeeCancelAlgorithm::Cancel1Algorithm => {
437                // Set canceled1 to true.
438                self.canceled_1.set(true);
439
440                // Set reason1 to reason.
441                self.reason_1.set(reason.get());
442
443                // If canceled2 is true,
444                if self.canceled_2.get() {
445                    self.resolve_cancel_promise(cx);
446                }
447
448                // Return cancelPromise.
449                Some(Ok(self.cancel_promise.clone()))
450            },
451            ByteTeeCancelAlgorithm::Cancel2Algorithm => {
452                // Set canceled_2 to true.
453                self.canceled_2.set(true);
454
455                // Set reason_2 to reason.
456                self.reason_2.set(reason.get());
457
458                // If canceled_1 is true,
459                if self.canceled_1.get() {
460                    self.resolve_cancel_promise(cx);
461                }
462                // Return cancelPromise.
463                Some(Ok(self.cancel_promise.clone()))
464            },
465        }
466    }
467
468    #[expect(unsafe_code)]
469    fn resolve_cancel_promise(&self, cx: &mut JSContext) {
470        // Let compositeReason be ! CreateArrayFromList(« reason_1, reason_2 »).
471        rooted_vec!(let mut reasons_values);
472        reasons_values.push(self.reason_1.get());
473        reasons_values.push(self.reason_2.get());
474
475        let reasons_values_array = HandleValueArray::from(&reasons_values);
476        rooted!(&in(cx) let reasons = unsafe { NewArrayObject(cx.raw_cx(), &reasons_values_array) });
477        rooted!(&in(cx) let reasons_value = ObjectValue(reasons.get()));
478
479        // Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
480        let cancel_result = self
481            .stream
482            .cancel(cx, &self.stream.global(), reasons_value.handle());
483
484        // Resolve cancelPromise with cancelResult.
485        self.cancel_promise
486            .resolve_native(&cancel_result, CanGc::from_cx(cx));
487    }
488}