1#![allow(dead_code)]
5
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::gc::CustomAutoRooterGuard;
12use js::jsapi::Heap;
13use js::jsval::{JSVal, UndefinedValue};
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
16
17use super::bindings::buffer_source::HeapBufferSource;
18use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
19use super::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
20use super::bindings::reflector::reflect_dom_object;
21use super::readablestreamgenericreader::ReadableStreamGenericReader;
22use crate::dom::bindings::cell::DomRefCell;
23use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderMethods;
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
25use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
26use crate::dom::bindings::root::{DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::readablestream::ReadableStream;
31use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
32
33#[derive(Clone, JSTraceable, MallocSizeOf)]
35pub enum ReadIntoRequest {
36 Read(#[ignore_malloc_size_of = "Rc is hard"] Rc<Promise>),
38}
39
40impl ReadIntoRequest {
41 pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
43 match self {
46 ReadIntoRequest::Read(promise) => {
47 promise.resolve_native(
48 &ReadableStreamReadResult {
49 done: Some(false),
50 value: chunk,
51 },
52 can_gc,
53 );
54 },
55 }
56 }
57
58 pub fn close_steps(&self, chunk: Option<RootedTraceableBox<Heap<JSVal>>>, can_gc: CanGc) {
60 match self {
63 ReadIntoRequest::Read(promise) => match chunk {
64 Some(chunk) => promise.resolve_native(
65 &ReadableStreamReadResult {
66 done: Some(true),
67 value: chunk,
68 },
69 can_gc,
70 ),
71 None => {
72 let result = RootedTraceableBox::new(Heap::default());
73 result.set(UndefinedValue());
74 promise.resolve_native(
75 &ReadableStreamReadResult {
76 done: Some(true),
77 value: result,
78 },
79 can_gc,
80 );
81 },
82 },
83 }
84 }
85
86 pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
88 match self {
91 ReadIntoRequest::Read(promise) => promise.reject_native(&e, can_gc),
92 }
93 }
94}
95
96#[dom_struct]
98pub(crate) struct ReadableStreamBYOBReader {
99 reflector_: Reflector,
100
101 stream: MutNullableDom<ReadableStream>,
103
104 read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,
105
106 #[ignore_malloc_size_of = "Rc is hard"]
108 closed_promise: DomRefCell<Rc<Promise>>,
109}
110
111impl ReadableStreamBYOBReader {
112 fn new_with_proto(
113 global: &GlobalScope,
114 proto: Option<SafeHandleObject>,
115 can_gc: CanGc,
116 ) -> DomRoot<ReadableStreamBYOBReader> {
117 reflect_dom_object_with_proto(
118 Box::new(ReadableStreamBYOBReader::new_inherited(global, can_gc)),
119 global,
120 proto,
121 can_gc,
122 )
123 }
124
125 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamBYOBReader {
126 ReadableStreamBYOBReader {
127 reflector_: Reflector::new(),
128 stream: MutNullableDom::new(None),
129 read_into_requests: DomRefCell::new(Default::default()),
130 closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
131 }
132 }
133
134 pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
135 reflect_dom_object(
136 Box::new(Self::new_inherited(global, can_gc)),
137 global,
138 can_gc,
139 )
140 }
141
142 pub(crate) fn set_up(
144 &self,
145 stream: &ReadableStream,
146 global: &GlobalScope,
147 can_gc: CanGc,
148 ) -> Fallible<()> {
149 if stream.is_locked() {
151 return Err(Error::Type("stream is locked".to_owned()));
152 }
153
154 if !stream.has_byte_controller() {
156 return Err(Error::Type(
157 "stream controller is not a byte stream controller".to_owned(),
158 ));
159 }
160
161 self.generic_initialize(global, stream, can_gc);
163
164 self.read_into_requests.borrow_mut().clear();
166
167 Ok(())
168 }
169
170 pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
172 self.generic_release(can_gc)?;
174 let cx = GlobalScope::get_cx();
176 rooted!(in(*cx) let mut error = UndefinedValue());
177 Error::Type("Reader is released".to_owned()).to_jsval(
178 cx,
179 &self.global(),
180 error.handle_mut(),
181 can_gc,
182 );
183
184 self.error_read_into_requests(error.handle(), can_gc);
186 Ok(())
187 }
188
189 pub(crate) fn error_read_into_requests(&self, e: SafeHandleValue, can_gc: CanGc) {
191 self.closed_promise.borrow().reject_native(&e, can_gc);
193
194 self.closed_promise.borrow().set_promise_is_handled();
196
197 let mut read_into_requests = self.take_read_into_requests();
199
200 for request in read_into_requests.drain(0..) {
202 request.error_steps(e, can_gc);
204 }
205 }
206
207 fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
208 mem::take(&mut *self.read_into_requests.borrow_mut())
209 }
210
211 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
213 self.read_into_requests
214 .borrow_mut()
215 .push_back(read_request.clone());
216 }
217
218 pub(crate) fn cancel(&self, can_gc: CanGc) {
220 let mut read_into_requests = self.take_read_into_requests();
223 for request in read_into_requests.drain(0..) {
226 request.close_steps(None, can_gc);
228 }
229 }
230
231 pub(crate) fn close(&self, can_gc: CanGc) {
232 self.closed_promise.borrow().resolve_native(&(), can_gc);
234 }
235
236 pub(crate) fn read(
238 &self,
239 cx: SafeJSContext,
240 view: HeapBufferSource<ArrayBufferViewU8>,
241 options: &ReadableStreamBYOBReaderReadOptions,
242 read_into_request: &ReadIntoRequest,
243 can_gc: CanGc,
244 ) {
245 assert!(self.stream.get().is_some());
249
250 let stream = self.stream.get().unwrap();
251
252 stream.set_is_disturbed(true);
254 if stream.is_errored() {
256 let cx = GlobalScope::get_cx();
257 rooted!(in(*cx) let mut error = UndefinedValue());
258 stream.get_stored_error(error.handle_mut());
259
260 read_into_request.error_steps(error.handle(), can_gc);
261 } else {
262 stream.perform_pull_into(cx, read_into_request, view, options, can_gc);
265 }
266 }
267
268 pub(crate) fn get_num_read_into_requests(&self) -> usize {
269 self.read_into_requests.borrow().len()
270 }
271
272 pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
273 self.read_into_requests
274 .borrow_mut()
275 .pop_front()
276 .expect("read into requests is empty")
277 }
278}
279
280impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
281 fn Constructor(
283 global: &GlobalScope,
284 proto: Option<SafeHandleObject>,
285 can_gc: CanGc,
286 stream: &ReadableStream,
287 ) -> Fallible<DomRoot<Self>> {
288 let reader = Self::new_with_proto(global, proto, can_gc);
289
290 Self::set_up(&reader, stream, global, can_gc)?;
292
293 Ok(reader)
294 }
295
296 fn Read(
298 &self,
299 view: CustomAutoRooterGuard<ArrayBufferView>,
300 options: &ReadableStreamBYOBReaderReadOptions,
301 can_gc: CanGc,
302 ) -> Rc<Promise> {
303 let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
304
305 let promise = Promise::new(&self.global(), can_gc);
307
308 let cx = GlobalScope::get_cx();
309 if view.byte_length() == 0 {
311 promise.reject_error(Error::Type("view byte length is 0".to_owned()), can_gc);
312 return promise;
313 }
314 if view.viewed_buffer_array_byte_length(cx) == 0 {
317 promise.reject_error(
318 Error::Type("viewed buffer byte length is 0".to_owned()),
319 can_gc,
320 );
321 return promise;
322 }
323
324 if view.is_detached_buffer(cx) {
327 promise.reject_error(Error::Type("view is detached".to_owned()), can_gc);
328 return promise;
329 }
330
331 if options.min == 0 {
333 promise.reject_error(Error::Type("min is 0".to_owned()), can_gc);
334 return promise;
335 }
336
337 if view.has_typed_array_name() {
339 if options.min > (view.get_typed_array_length() as u64) {
341 promise.reject_error(
342 Error::Range("min is greater than array length".to_owned()),
343 can_gc,
344 );
345 return promise;
346 }
347 } else {
348 if options.min > (view.byte_length() as u64) {
351 promise.reject_error(
352 Error::Range("min is greater than byte length".to_owned()),
353 can_gc,
354 );
355 return promise;
356 }
357 }
358
359 if self.stream.get().is_none() {
361 promise.reject_error(
362 Error::Type("min is greater than byte length".to_owned()),
363 can_gc,
364 );
365 return promise;
366 }
367
368 let read_into_request = ReadIntoRequest::Read(promise.clone());
379
380 self.read(cx, view, options, &read_into_request, can_gc);
382
383 promise
385 }
386
387 fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
389 if self.stream.get().is_none() {
390 return Ok(());
392 }
393
394 self.release(can_gc)
396 }
397
398 fn Closed(&self) -> Rc<Promise> {
400 self.closed()
401 }
402
403 fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
405 self.generic_cancel(cx, &self.global(), reason, can_gc)
406 }
407}
408
409impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
410 fn get_closed_promise(&self) -> Rc<Promise> {
411 self.closed_promise.borrow().clone()
412 }
413
414 fn set_closed_promise(&self, promise: Rc<Promise>) {
415 *self.closed_promise.borrow_mut() = promise;
416 }
417
418 fn set_stream(&self, stream: Option<&ReadableStream>) {
419 self.stream.set(stream);
420 }
421
422 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
423 self.stream.get()
424 }
425
426 fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
427 Some(self)
428 }
429}