1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::gc::CustomAutoRooterGuard;
12use js::jsapi::Heap;
13use js::jsval::{JSVal, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
16use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
17use script_bindings::root::Dom;
18
19use super::bindings::buffer_source::{BufferSource, HeapBufferSource};
20use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
21use super::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
22use super::bindings::reflector::reflect_dom_object;
23use super::byteteereadintorequest::ByteTeeReadIntoRequest;
24use super::promisenativehandler::{Callback, PromiseNativeHandler};
25use super::readablebytestreamcontroller::ReadableByteStreamController;
26use super::readablestreamgenericreader::ReadableStreamGenericReader;
27use crate::dom::bindings::cell::DomRefCell;
28use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderMethods;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::{DomRoot, MutNullableDom};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::globalscope::GlobalScope;
34use crate::dom::promise::Promise;
35use crate::dom::readablestream::ReadableStream;
36use crate::realms::{InRealm, enter_realm};
37use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
38
/// A pending "read into" request queued on a BYOB reader.
///
/// Each variant reacts to the same three events (see `chunk_steps`,
/// `close_steps` and `error_steps`) but reports them to a different consumer.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub enum ReadIntoRequest {
    /// A request backed by a promise: the promise is resolved with a read
    /// result on chunk/close and rejected on error.
    Read(#[conditional_malloc_size_of] Rc<Promise>),
    /// A request whose events are forwarded to a `ByteTeeReadIntoRequest`
    /// (presumably created by the byte-stream tee code; confirm at the call site).
    ByteTee {
        /// Target that receives the forwarded chunk, close and error steps.
        byte_tee_read_into_request: Dom<ByteTeeReadIntoRequest>,
    },
}
48
49impl ReadIntoRequest {
50 pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
52 match self {
53 ReadIntoRequest::Read(promise) => {
54 promise.resolve_native(
57 &ReadableStreamReadResult {
58 done: Some(false),
59 value: chunk,
60 },
61 can_gc,
62 );
63 },
64 ReadIntoRequest::ByteTee {
65 byte_tee_read_into_request,
66 } => {
67 byte_tee_read_into_request.enqueue_chunk_steps(
68 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
69 RootedTraceableBox::from_box(Heap::boxed(chunk.get().to_object())),
70 )),
71 )
72 },
73 }
74 }
75
76 pub fn close_steps(&self, chunk: Option<RootedTraceableBox<Heap<JSVal>>>, can_gc: CanGc) {
78 match self {
79 ReadIntoRequest::Read(promise) => match chunk {
80 Some(chunk) => promise.resolve_native(
83 &ReadableStreamReadResult {
84 done: Some(true),
85 value: chunk,
86 },
87 can_gc,
88 ),
89 None => {
90 let result = RootedTraceableBox::new(Heap::default());
91 result.set(UndefinedValue());
92 promise.resolve_native(
93 &ReadableStreamReadResult {
94 done: Some(true),
95 value: result,
96 },
97 can_gc,
98 );
99 },
100 },
101 ReadIntoRequest::ByteTee {
102 byte_tee_read_into_request,
103 } => match chunk {
104 Some(chunk) => byte_tee_read_into_request
105 .close_steps(
106 Some(HeapBufferSource::<ArrayBufferViewU8>::new(
107 BufferSource::ArrayBufferView(RootedTraceableBox::from_box(
108 Heap::boxed(chunk.get().to_object()),
109 )),
110 )),
111 can_gc,
112 )
113 .expect("close steps should not fail"),
114 None => byte_tee_read_into_request
115 .close_steps(None, can_gc)
116 .expect("close steps should not fail"),
117 },
118 }
119 }
120
121 pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
123 match self {
124 ReadIntoRequest::Read(promise) => {
125 promise.reject_native(&e, can_gc)
128 },
129 ReadIntoRequest::ByteTee {
130 byte_tee_read_into_request,
131 } => {
132 byte_tee_read_into_request.error_steps();
133 },
134 }
135 }
136}
137
/// State for the rejection callback that a byte-stream tee attaches to a
/// reader's closed promise (see `byte_tee_append_native_handler_to_closed_promise`).
///
/// When the callback runs and the reader version still matches, it errors both
/// branch controllers and may resolve `cancel_promise`.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ByteTeeClosedPromiseRejectionHandler {
    /// Controller of the first tee branch; errored with the rejection reason.
    branch_1_controller: Dom<ReadableByteStreamController>,
    /// Controller of the second tee branch; errored with the rejection reason.
    branch_2_controller: Dom<ReadableByteStreamController>,
    /// Shared flag read by the callback; presumably set when branch 1 is canceled.
    #[ignore_malloc_size_of = "Rc"]
    canceled_1: Rc<Cell<bool>>,
    /// Shared flag read by the callback; presumably set when branch 2 is canceled.
    #[ignore_malloc_size_of = "Rc"]
    canceled_2: Rc<Cell<bool>>,
    /// Resolved by the callback unless both `canceled_*` flags are already set.
    #[ignore_malloc_size_of = "Rc"]
    cancel_promise: Rc<Promise>,
    /// Shared counter compared against `expected_version` when the callback runs.
    #[ignore_malloc_size_of = "Rc"]
    reader_version: Rc<Cell<u64>>,
    /// Value `reader_version` must still hold for the callback to act;
    /// otherwise the callback returns without doing anything.
    expected_version: u64,
}
155
156impl Callback for ByteTeeClosedPromiseRejectionHandler {
157 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
160 let can_gc = CanGc::from_cx(cx);
161 if self.reader_version.get() != self.expected_version {
163 return;
164 }
165
166 self.branch_1_controller.error(v, can_gc);
168
169 self.branch_2_controller.error(v, can_gc);
171
172 if !self.canceled_1.get() || !self.canceled_2.get() {
174 self.cancel_promise.resolve_native(&(), can_gc);
175 }
176 }
177}
178
/// DOM object implementing a "bring your own buffer" reader for a byte stream.
#[dom_struct]
pub(crate) struct ReadableStreamBYOBReader {
    /// Reflector linking this object to its script-side wrapper.
    reflector_: Reflector,

    /// The stream this reader is attached to, if any. `Read` rejects and
    /// `ReleaseLock` is a no-op while this is `None`.
    stream: MutNullableDom<ReadableStream>,

    /// Pending read-into requests in FIFO order: appended at the back by
    /// `add_read_into_request`, removed from the front by `remove_read_into_request`.
    read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,

    /// The reader's closed promise, handed out by `get_closed_promise` and
    /// replaced wholesale by `set_closed_promise`.
    #[conditional_malloc_size_of]
    closed_promise: DomRefCell<Rc<Promise>>,
}
193
194impl ReadableStreamBYOBReader {
195 fn new_with_proto(
196 global: &GlobalScope,
197 proto: Option<SafeHandleObject>,
198 can_gc: CanGc,
199 ) -> DomRoot<ReadableStreamBYOBReader> {
200 reflect_dom_object_with_proto(
201 Box::new(ReadableStreamBYOBReader::new_inherited(global, can_gc)),
202 global,
203 proto,
204 can_gc,
205 )
206 }
207
208 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamBYOBReader {
209 ReadableStreamBYOBReader {
210 reflector_: Reflector::new(),
211 stream: MutNullableDom::new(None),
212 read_into_requests: DomRefCell::new(Default::default()),
213 closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
214 }
215 }
216
217 pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
218 reflect_dom_object(
219 Box::new(Self::new_inherited(global, can_gc)),
220 global,
221 can_gc,
222 )
223 }
224
225 pub(crate) fn set_up(
227 &self,
228 stream: &ReadableStream,
229 global: &GlobalScope,
230 can_gc: CanGc,
231 ) -> Fallible<()> {
232 if stream.is_locked() {
234 return Err(Error::Type("stream is locked".to_owned()));
235 }
236
237 if !stream.has_byte_controller() {
239 return Err(Error::Type(
240 "stream controller is not a byte stream controller".to_owned(),
241 ));
242 }
243
244 self.generic_initialize(global, stream, can_gc);
246
247 self.read_into_requests.borrow_mut().clear();
249
250 Ok(())
251 }
252
253 pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
255 self.generic_release(can_gc)
257 .expect("Generic release failed");
258 let cx = GlobalScope::get_cx();
260 rooted!(in(*cx) let mut error = UndefinedValue());
261 Error::Type("Reader is released".to_owned()).to_jsval(
262 cx,
263 &self.global(),
264 error.handle_mut(),
265 can_gc,
266 );
267
268 self.error_read_into_requests(error.handle(), can_gc);
270 Ok(())
271 }
272
273 pub(crate) fn error_read_into_requests(&self, e: SafeHandleValue, can_gc: CanGc) {
275 self.closed_promise.borrow().reject_native(&e, can_gc);
277
278 self.closed_promise.borrow().set_promise_is_handled();
280
281 let mut read_into_requests = self.take_read_into_requests();
283
284 for request in read_into_requests.drain(0..) {
286 request.error_steps(e, can_gc);
288 }
289 }
290
291 fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
292 mem::take(&mut *self.read_into_requests.borrow_mut())
293 }
294
295 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
297 self.read_into_requests
298 .borrow_mut()
299 .push_back(read_request.clone());
300 }
301
302 pub(crate) fn cancel(&self, can_gc: CanGc) {
304 let mut read_into_requests = self.take_read_into_requests();
307 for request in read_into_requests.drain(0..) {
310 request.close_steps(None, can_gc);
312 }
313 }
314
315 pub(crate) fn close(&self, can_gc: CanGc) {
316 self.closed_promise.borrow().resolve_native(&(), can_gc);
318 }
319
320 pub(crate) fn read(
322 &self,
323 cx: SafeJSContext,
324 view: HeapBufferSource<ArrayBufferViewU8>,
325 min: u64,
326 read_into_request: &ReadIntoRequest,
327 can_gc: CanGc,
328 ) {
329 assert!(self.stream.get().is_some());
333
334 let stream = self.stream.get().unwrap();
335
336 stream.set_is_disturbed(true);
338 if stream.is_errored() {
340 let cx = GlobalScope::get_cx();
341 rooted!(in(*cx) let mut error = UndefinedValue());
342 stream.get_stored_error(error.handle_mut());
343
344 read_into_request.error_steps(error.handle(), can_gc);
345 } else {
346 stream.perform_pull_into(cx, read_into_request, view, min, can_gc);
349 }
350 }
351
352 pub(crate) fn get_num_read_into_requests(&self) -> usize {
353 self.read_into_requests.borrow().len()
354 }
355
356 pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
357 self.read_into_requests
358 .borrow_mut()
359 .pop_front()
360 .expect("read into requests is empty")
361 }
362
363 #[allow(clippy::too_many_arguments)]
364 pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
365 &self,
366 branch_1: &ReadableStream,
367 branch_2: &ReadableStream,
368 canceled_1: Rc<Cell<bool>>,
369 canceled_2: Rc<Cell<bool>>,
370 cancel_promise: Rc<Promise>,
371 reader_version: Rc<Cell<u64>>,
372 expected_version: u64,
373 can_gc: CanGc,
374 ) {
375 let branch_1_controller = branch_1.get_byte_controller();
376
377 let branch_2_controller = branch_2.get_byte_controller();
378
379 let global = self.global();
380 let handler = PromiseNativeHandler::new(
381 &global,
382 None,
383 Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
384 branch_1_controller: Dom::from_ref(&branch_1_controller),
385 branch_2_controller: Dom::from_ref(&branch_2_controller),
386 canceled_1,
387 canceled_2,
388 cancel_promise,
389 reader_version,
390 expected_version,
391 })),
392 can_gc,
393 );
394
395 let realm = enter_realm(&*global);
396 let comp = InRealm::Entered(&realm);
397
398 self.closed_promise
399 .borrow()
400 .append_native_handler(&handler, comp, can_gc);
401 }
402}
403
404impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
405 fn Constructor(
407 global: &GlobalScope,
408 proto: Option<SafeHandleObject>,
409 can_gc: CanGc,
410 stream: &ReadableStream,
411 ) -> Fallible<DomRoot<Self>> {
412 let reader = Self::new_with_proto(global, proto, can_gc);
413
414 Self::set_up(&reader, stream, global, can_gc)?;
416
417 Ok(reader)
418 }
419
420 fn Read(
422 &self,
423 view: CustomAutoRooterGuard<ArrayBufferView>,
424 options: &ReadableStreamBYOBReaderReadOptions,
425 can_gc: CanGc,
426 ) -> Rc<Promise> {
427 let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
428 let min = options.min;
429 let promise = Promise::new(&self.global(), can_gc);
431
432 let cx = GlobalScope::get_cx();
433 if view.byte_length() == 0 {
435 promise.reject_error(Error::Type("view byte length is 0".to_owned()), can_gc);
436 return promise;
437 }
438 if view.viewed_buffer_array_byte_length(cx) == 0 {
441 promise.reject_error(
442 Error::Type("viewed buffer byte length is 0".to_owned()),
443 can_gc,
444 );
445 return promise;
446 }
447
448 if view.is_detached_buffer(cx) {
451 promise.reject_error(Error::Type("view is detached".to_owned()), can_gc);
452 return promise;
453 }
454
455 if min == 0 {
457 promise.reject_error(Error::Type("min is 0".to_owned()), can_gc);
458 return promise;
459 }
460
461 if view.has_typed_array_name() {
463 if min > (view.get_typed_array_length() as u64) {
465 promise.reject_error(
466 Error::Range("min is greater than array length".to_owned()),
467 can_gc,
468 );
469 return promise;
470 }
471 } else {
472 if min > (view.byte_length() as u64) {
475 promise.reject_error(
476 Error::Range("min is greater than byte length".to_owned()),
477 can_gc,
478 );
479 return promise;
480 }
481 }
482
483 if self.stream.get().is_none() {
485 promise.reject_error(
486 Error::Type("min is greater than byte length".to_owned()),
487 can_gc,
488 );
489 return promise;
490 }
491
492 let read_into_request = ReadIntoRequest::Read(promise.clone());
503
504 self.read(cx, view, min, &read_into_request, can_gc);
506
507 promise
509 }
510
511 fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
513 if self.stream.get().is_none() {
514 return Ok(());
516 }
517
518 self.release(can_gc)
520 }
521
    /// Script-visible `closed` getter; returns whatever `closed()` yields.
    // NOTE(review): `closed()` is not defined in this file; presumably it is a
    // provided method of `ReadableStreamGenericReader` that returns the closed
    // promise. Confirm against that trait.
    fn Closed(&self) -> Rc<Promise> {
        self.closed()
    }
526
527 fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
529 self.generic_cancel(cx, &self.global(), reason, can_gc)
530 }
531}
532
533impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
534 fn get_closed_promise(&self) -> Rc<Promise> {
535 self.closed_promise.borrow().clone()
536 }
537
538 fn set_closed_promise(&self, promise: Rc<Promise>) {
539 *self.closed_promise.borrow_mut() = promise;
540 }
541
542 fn set_stream(&self, stream: Option<&ReadableStream>) {
543 self.stream.set(stream);
544 }
545
546 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
547 self.stream.get()
548 }
549
550 fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
551 Some(self)
552 }
553}