script/dom/
readablestreambyobreader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4#![allow(dead_code)]
5
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::gc::CustomAutoRooterGuard;
12use js::jsapi::Heap;
13use js::jsval::{JSVal, UndefinedValue};
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
16
17use super::bindings::buffer_source::HeapBufferSource;
18use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
19use super::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
20use super::bindings::reflector::reflect_dom_object;
21use super::readablestreamgenericreader::ReadableStreamGenericReader;
22use crate::dom::bindings::cell::DomRefCell;
23use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderMethods;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
25use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
26use crate::dom::bindings::root::{DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::readablestream::ReadableStream;
31use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
32
33/// <https://streams.spec.whatwg.org/#read-into-request>
34#[derive(Clone, JSTraceable, MallocSizeOf)]
35pub enum ReadIntoRequest {
36    /// <https://streams.spec.whatwg.org/#byob-reader-read>
37    Read(#[ignore_malloc_size_of = "Rc is hard"] Rc<Promise>),
38}
39
40impl ReadIntoRequest {
41    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps>
42    pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
43        // chunk steps, given chunk
44        // Resolve promise with «[ "value" → chunk, "done" → false ]».
45        match self {
46            ReadIntoRequest::Read(promise) => {
47                promise.resolve_native(
48                    &ReadableStreamReadResult {
49                        done: Some(false),
50                        value: chunk,
51                    },
52                    can_gc,
53                );
54            },
55        }
56    }
57
58    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps%E2%91%A0>
59    pub fn close_steps(&self, chunk: Option<RootedTraceableBox<Heap<JSVal>>>, can_gc: CanGc) {
60        // close steps, given chunk
61        // Resolve promise with «[ "value" → chunk, "done" → true ]».
62        match self {
63            ReadIntoRequest::Read(promise) => match chunk {
64                Some(chunk) => promise.resolve_native(
65                    &ReadableStreamReadResult {
66                        done: Some(true),
67                        value: chunk,
68                    },
69                    can_gc,
70                ),
71                None => {
72                    let result = RootedTraceableBox::new(Heap::default());
73                    result.set(UndefinedValue());
74                    promise.resolve_native(
75                        &ReadableStreamReadResult {
76                            done: Some(true),
77                            value: result,
78                        },
79                        can_gc,
80                    );
81                },
82            },
83        }
84    }
85
86    /// <https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps>
87    pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
88        // error steps, given e
89        // Reject promise with e.
90        match self {
91            ReadIntoRequest::Read(promise) => promise.reject_native(&e, can_gc),
92        }
93    }
94}
95
96/// <https://streams.spec.whatwg.org/#readablestreambyobreader>
97#[dom_struct]
98pub(crate) struct ReadableStreamBYOBReader {
99    reflector_: Reflector,
100
101    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-stream>
102    stream: MutNullableDom<ReadableStream>,
103
104    read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,
105
106    /// <https://streams.spec.whatwg.org/#readablestreamgenericreader-closedpromise>
107    #[ignore_malloc_size_of = "Rc is hard"]
108    closed_promise: DomRefCell<Rc<Promise>>,
109}
110
111impl ReadableStreamBYOBReader {
112    fn new_with_proto(
113        global: &GlobalScope,
114        proto: Option<SafeHandleObject>,
115        can_gc: CanGc,
116    ) -> DomRoot<ReadableStreamBYOBReader> {
117        reflect_dom_object_with_proto(
118            Box::new(ReadableStreamBYOBReader::new_inherited(global, can_gc)),
119            global,
120            proto,
121            can_gc,
122        )
123    }
124
125    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamBYOBReader {
126        ReadableStreamBYOBReader {
127            reflector_: Reflector::new(),
128            stream: MutNullableDom::new(None),
129            read_into_requests: DomRefCell::new(Default::default()),
130            closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
131        }
132    }
133
134    pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
135        reflect_dom_object(
136            Box::new(Self::new_inherited(global, can_gc)),
137            global,
138            can_gc,
139        )
140    }
141
142    /// <https://streams.spec.whatwg.org/#set-up-readable-stream-byob-reader>
143    pub(crate) fn set_up(
144        &self,
145        stream: &ReadableStream,
146        global: &GlobalScope,
147        can_gc: CanGc,
148    ) -> Fallible<()> {
149        // If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
150        if stream.is_locked() {
151            return Err(Error::Type("stream is locked".to_owned()));
152        }
153
154        // If stream.[[controller]] does not implement ReadableByteStreamController, throw a TypeError exception.
155        if !stream.has_byte_controller() {
156            return Err(Error::Type(
157                "stream controller is not a byte stream controller".to_owned(),
158            ));
159        }
160
161        // Perform ! ReadableStreamReaderGenericInitialize(reader, stream).
162        self.generic_initialize(global, stream, can_gc);
163
164        // Set reader.[[readIntoRequests]] to a new empty list.
165        self.read_into_requests.borrow_mut().clear();
166
167        Ok(())
168    }
169
170    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreaderrelease>
171    pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
172        // Perform ! ReadableStreamReaderGenericRelease(reader).
173        self.generic_release(can_gc)?;
174        // Let e be a new TypeError exception.
175        let cx = GlobalScope::get_cx();
176        rooted!(in(*cx) let mut error = UndefinedValue());
177        Error::Type("Reader is released".to_owned()).to_jsval(
178            cx,
179            &self.global(),
180            error.handle_mut(),
181            can_gc,
182        );
183
184        // Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
185        self.error_read_into_requests(error.handle(), can_gc);
186        Ok(())
187    }
188
189    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreadererrorreadintorequests>
190    pub(crate) fn error_read_into_requests(&self, e: SafeHandleValue, can_gc: CanGc) {
191        // Reject reader.[[closedPromise]] with e.
192        self.closed_promise.borrow().reject_native(&e, can_gc);
193
194        // Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
195        self.closed_promise.borrow().set_promise_is_handled();
196
197        // Let readRequests be reader.[[readRequests]].
198        let mut read_into_requests = self.take_read_into_requests();
199
200        // Set reader.[[readIntoRequests]] to a new empty list.
201        for request in read_into_requests.drain(0..) {
202            // Perform readIntoRequest’s error steps, given e.
203            request.error_steps(e, can_gc);
204        }
205    }
206
207    fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
208        mem::take(&mut *self.read_into_requests.borrow_mut())
209    }
210
211    /// <https://streams.spec.whatwg.org/#readable-stream-add-read-into-request>
212    pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
213        self.read_into_requests
214            .borrow_mut()
215            .push_back(read_request.clone());
216    }
217
218    /// <https://streams.spec.whatwg.org/#readable-stream-cancel>
219    pub(crate) fn cancel(&self, can_gc: CanGc) {
220        // If reader is not undefined and reader implements ReadableStreamBYOBReader,
221        // Let readIntoRequests be reader.[[readIntoRequests]].
222        let mut read_into_requests = self.take_read_into_requests();
223        // Set reader.[[readIntoRequests]] to an empty list.
224        // Perform readIntoRequest’s close steps, given undefined.
225        for request in read_into_requests.drain(0..) {
226            // Perform readIntoRequest’s close steps, given undefined.
227            request.close_steps(None, can_gc);
228        }
229    }
230
231    pub(crate) fn close(&self, can_gc: CanGc) {
232        // Resolve reader.[[closedPromise]] with undefined.
233        self.closed_promise.borrow().resolve_native(&(), can_gc);
234    }
235
236    /// <https://streams.spec.whatwg.org/#readable-stream-byob-reader-read>
237    pub(crate) fn read(
238        &self,
239        cx: SafeJSContext,
240        view: HeapBufferSource<ArrayBufferViewU8>,
241        options: &ReadableStreamBYOBReaderReadOptions,
242        read_into_request: &ReadIntoRequest,
243        can_gc: CanGc,
244    ) {
245        // Let stream be reader.[[stream]].
246
247        // Assert: stream is not undefined.
248        assert!(self.stream.get().is_some());
249
250        let stream = self.stream.get().unwrap();
251
252        // Set stream.[[disturbed]] to true.
253        stream.set_is_disturbed(true);
254        // If stream.[[state]] is "errored", perform readIntoRequest’s error steps given stream.[[storedError]].
255        if stream.is_errored() {
256            let cx = GlobalScope::get_cx();
257            rooted!(in(*cx) let mut error = UndefinedValue());
258            stream.get_stored_error(error.handle_mut());
259
260            read_into_request.error_steps(error.handle(), can_gc);
261        } else {
262            // Otherwise,
263            // perform ! ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest).
264            stream.perform_pull_into(cx, read_into_request, view, options, can_gc);
265        }
266    }
267
268    pub(crate) fn get_num_read_into_requests(&self) -> usize {
269        self.read_into_requests.borrow().len()
270    }
271
272    pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
273        self.read_into_requests
274            .borrow_mut()
275            .pop_front()
276            .expect("read into requests is empty")
277    }
278}
279
280impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
281    /// <https://streams.spec.whatwg.org/#byob-reader-constructor>
282    fn Constructor(
283        global: &GlobalScope,
284        proto: Option<SafeHandleObject>,
285        can_gc: CanGc,
286        stream: &ReadableStream,
287    ) -> Fallible<DomRoot<Self>> {
288        let reader = Self::new_with_proto(global, proto, can_gc);
289
290        // Perform ? SetUpReadableStreamBYOBReader(this, stream).
291        Self::set_up(&reader, stream, global, can_gc)?;
292
293        Ok(reader)
294    }
295
296    /// <https://streams.spec.whatwg.org/#byob-reader-read>
297    fn Read(
298        &self,
299        view: CustomAutoRooterGuard<ArrayBufferView>,
300        options: &ReadableStreamBYOBReaderReadOptions,
301        can_gc: CanGc,
302    ) -> Rc<Promise> {
303        let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
304
305        // Let promise be a new promise.
306        let promise = Promise::new(&self.global(), can_gc);
307
308        let cx = GlobalScope::get_cx();
309        // If view.[[ByteLength]] is 0, return a promise rejected with a TypeError exception.
310        if view.byte_length() == 0 {
311            promise.reject_error(Error::Type("view byte length is 0".to_owned()), can_gc);
312            return promise;
313        }
314        // If view.[[ViewedArrayBuffer]].[[ArrayBufferByteLength]] is 0,
315        // return a promise rejected with a TypeError exception.
316        if view.viewed_buffer_array_byte_length(cx) == 0 {
317            promise.reject_error(
318                Error::Type("viewed buffer byte length is 0".to_owned()),
319                can_gc,
320            );
321            return promise;
322        }
323
324        // If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true,
325        // return a promise rejected with a TypeError exception.
326        if view.is_detached_buffer(cx) {
327            promise.reject_error(Error::Type("view is detached".to_owned()), can_gc);
328            return promise;
329        }
330
331        // If options["min"] is 0, return a promise rejected with a TypeError exception.
332        if options.min == 0 {
333            promise.reject_error(Error::Type("min is 0".to_owned()), can_gc);
334            return promise;
335        }
336
337        // If view has a [[TypedArrayName]] internal slot,
338        if view.has_typed_array_name() {
339            // If options["min"] > view.[[ArrayLength]], return a promise rejected with a RangeError exception.
340            if options.min > (view.get_typed_array_length() as u64) {
341                promise.reject_error(
342                    Error::Range("min is greater than array length".to_owned()),
343                    can_gc,
344                );
345                return promise;
346            }
347        } else {
348            // Otherwise (i.e., it is a DataView),
349            // If options["min"] > view.[[ByteLength]], return a promise rejected with a RangeError exception.
350            if options.min > (view.byte_length() as u64) {
351                promise.reject_error(
352                    Error::Range("min is greater than byte length".to_owned()),
353                    can_gc,
354                );
355                return promise;
356            }
357        }
358
359        // If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
360        if self.stream.get().is_none() {
361            promise.reject_error(
362                Error::Type("min is greater than byte length".to_owned()),
363                can_gc,
364            );
365            return promise;
366        }
367
368        // Let readIntoRequest be a new read-into request with the following items:
369        //
370        // chunk steps, given chunk
371        // Resolve promise with «[ "value" → chunk, "done" → false ]».
372        //
373        // close steps, given chunk
374        // Resolve promise with «[ "value" → chunk, "done" → true ]».
375        //
376        // error steps, given e
377        // Reject promise with e
378        let read_into_request = ReadIntoRequest::Read(promise.clone());
379
380        // Perform ! ReadableStreamBYOBReaderRead(this, view, options["min"], readIntoRequest).
381        self.read(cx, view, options, &read_into_request, can_gc);
382
383        // Return promise.
384        promise
385    }
386
387    /// <https://streams.spec.whatwg.org/#byob-reader-release-lock>
388    fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
389        if self.stream.get().is_none() {
390            // If this.[[stream]] is undefined, return.
391            return Ok(());
392        }
393
394        // Perform !ReadableStreamBYOBReaderRelease(this).
395        self.release(can_gc)
396    }
397
398    /// <https://streams.spec.whatwg.org/#generic-reader-closed>
399    fn Closed(&self) -> Rc<Promise> {
400        self.closed()
401    }
402
403    /// <https://streams.spec.whatwg.org/#generic-reader-cancel>
404    fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
405        self.generic_cancel(cx, &self.global(), reason, can_gc)
406    }
407}
408
409impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
410    fn get_closed_promise(&self) -> Rc<Promise> {
411        self.closed_promise.borrow().clone()
412    }
413
414    fn set_closed_promise(&self, promise: Rc<Promise>) {
415        *self.closed_promise.borrow_mut() = promise;
416    }
417
418    fn set_stream(&self, stream: Option<&ReadableStream>) {
419        self.stream.set(stream);
420    }
421
422    fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
423        self.stream.get()
424    }
425
426    fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
427        Some(self)
428    }
429}