1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::gc::CustomAutoRooterGuard;
13use js::jsapi::Heap;
14use js::jsval::{JSVal, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
17use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
18use script_bindings::root::Dom;
19
20use super::byteteereadintorequest::ByteTeeReadIntoRequest;
21use super::readablebytestreamcontroller::ReadableByteStreamController;
22use super::readablestreamgenericreader::ReadableStreamGenericReader;
23use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
24use crate::dom::bindings::cell::DomRefCell;
25use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::{
26 ReadableStreamBYOBReaderMethods, ReadableStreamBYOBReaderReadOptions,
27};
28use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::{
31 DomGlobal, Reflector, reflect_dom_object, reflect_dom_object_with_proto,
32};
33use crate::dom::bindings::root::{DomRoot, MutNullableDom};
34use crate::dom::bindings::trace::RootedTraceableBox;
35use crate::dom::globalscope::GlobalScope;
36use crate::dom::promise::Promise;
37use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
38use crate::dom::stream::readablestream::ReadableStream;
39use crate::realms::{InRealm, enter_realm};
40use crate::script_runtime::CanGc;
41
/// A pending BYOB read, modelled on the Streams specification's "read-into request":
/// <https://streams.spec.whatwg.org/#read-into-request>
///
/// The variant records who receives the outcome of the read.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub enum ReadIntoRequest {
    /// The outcome is delivered by resolving or rejecting this promise.
    Read(#[conditional_malloc_size_of] Rc<Promise>),
    /// The outcome is forwarded to a byte-tee read-into request instead of a promise.
    ByteTee {
        byte_tee_read_into_request: Dom<ByteTeeReadIntoRequest>,
    },
}
51
52impl ReadIntoRequest {
53 pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
55 match self {
56 ReadIntoRequest::Read(promise) => {
57 promise.resolve_native(
60 &ReadableStreamReadResult {
61 done: Some(false),
62 value: chunk,
63 },
64 can_gc,
65 );
66 },
67 ReadIntoRequest::ByteTee {
68 byte_tee_read_into_request,
69 } => {
70 byte_tee_read_into_request.enqueue_chunk_steps(
71 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
72 RootedTraceableBox::from_box(Heap::boxed(chunk.get().to_object())),
73 )),
74 )
75 },
76 }
77 }
78
79 pub fn close_steps(&self, cx: &mut JSContext, chunk: Option<RootedTraceableBox<Heap<JSVal>>>) {
81 match self {
82 ReadIntoRequest::Read(promise) => match chunk {
83 Some(chunk) => promise.resolve_native(
86 &ReadableStreamReadResult {
87 done: Some(true),
88 value: chunk,
89 },
90 CanGc::from_cx(cx),
91 ),
92 None => {
93 let result = RootedTraceableBox::new(Heap::default());
94 result.set(UndefinedValue());
95 promise.resolve_native(
96 &ReadableStreamReadResult {
97 done: Some(true),
98 value: result,
99 },
100 CanGc::from_cx(cx),
101 );
102 },
103 },
104 ReadIntoRequest::ByteTee {
105 byte_tee_read_into_request,
106 } => match chunk {
107 Some(chunk) => byte_tee_read_into_request
108 .close_steps(
109 cx,
110 Some(HeapBufferSource::<ArrayBufferViewU8>::new(
111 BufferSource::ArrayBufferView(RootedTraceableBox::from_box(
112 Heap::boxed(chunk.get().to_object()),
113 )),
114 )),
115 )
116 .expect("close steps should not fail"),
117 None => byte_tee_read_into_request
118 .close_steps(cx, None)
119 .expect("close steps should not fail"),
120 },
121 }
122 }
123
124 pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
126 match self {
127 ReadIntoRequest::Read(promise) => {
128 promise.reject_native(&e, can_gc)
131 },
132 ReadIntoRequest::ByteTee {
133 byte_tee_read_into_request,
134 } => {
135 byte_tee_read_into_request.error_steps();
136 },
137 }
138 }
139}
140
/// State captured by the rejection callback that
/// `byte_tee_append_native_handler_to_closed_promise` attaches to a reader's closed promise.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ByteTeeClosedPromiseRejectionHandler {
    /// Byte controller of the first branch; errored when the callback runs.
    branch_1_controller: Dom<ReadableByteStreamController>,
    /// Byte controller of the second branch; errored when the callback runs.
    branch_2_controller: Dom<ReadableByteStreamController>,
    /// Shared flag read by the callback.
    /// NOTE(review): presumably set once branch 1 has been canceled — confirm at the tee site.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// Shared flag read by the callback.
    /// NOTE(review): presumably set once branch 2 has been canceled — confirm at the tee site.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// Promise the callback resolves unless both `canceled_*` flags are set.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
    /// Shared counter compared against `expected_version` when the callback runs.
    #[conditional_malloc_size_of]
    reader_version: Rc<Cell<u64>>,
    /// Value `reader_version` must still hold for the callback to do anything.
    expected_version: u64,
}
158
159impl Callback for ByteTeeClosedPromiseRejectionHandler {
160 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
163 if self.reader_version.get() != self.expected_version {
165 return;
166 }
167
168 self.branch_1_controller.error(cx, v);
170
171 self.branch_2_controller.error(cx, v);
173
174 if !self.canceled_1.get() || !self.canceled_2.get() {
176 self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
177 }
178 }
179}
180
/// DOM object implementing the `ReadableStreamBYOBReader` interface:
/// <https://streams.spec.whatwg.org/#byob-reader-class>
#[dom_struct]
pub(crate) struct ReadableStreamBYOBReader {
    reflector_: Reflector,

    /// The stream this reader is attached to; `None` when it has no stream.
    stream: MutNullableDom<ReadableStream>,

    /// Pending read-into requests in FIFO order: appended at the back by
    /// `add_read_into_request`, taken from the front by `remove_read_into_request`.
    read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,

    /// Promise handed out by `get_closed_promise`; resolved by `close` and rejected by
    /// `error_read_into_requests`.
    #[conditional_malloc_size_of]
    closed_promise: DomRefCell<Rc<Promise>>,
}
195
196impl ReadableStreamBYOBReader {
197 fn new_with_proto(
198 global: &GlobalScope,
199 proto: Option<SafeHandleObject>,
200 can_gc: CanGc,
201 ) -> DomRoot<ReadableStreamBYOBReader> {
202 reflect_dom_object_with_proto(
203 Box::new(ReadableStreamBYOBReader::new_inherited(global, can_gc)),
204 global,
205 proto,
206 can_gc,
207 )
208 }
209
210 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamBYOBReader {
211 ReadableStreamBYOBReader {
212 reflector_: Reflector::new(),
213 stream: MutNullableDom::new(None),
214 read_into_requests: DomRefCell::new(Default::default()),
215 closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
216 }
217 }
218
219 pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
220 reflect_dom_object(
221 Box::new(Self::new_inherited(global, can_gc)),
222 global,
223 can_gc,
224 )
225 }
226
227 pub(crate) fn set_up(
229 &self,
230 stream: &ReadableStream,
231 global: &GlobalScope,
232 can_gc: CanGc,
233 ) -> Fallible<()> {
234 if stream.is_locked() {
236 return Err(Error::Type(c"stream is locked".to_owned()));
237 }
238
239 if !stream.has_byte_controller() {
241 return Err(Error::Type(
242 c"stream controller is not a byte stream controller".to_owned(),
243 ));
244 }
245
246 self.generic_initialize(global, stream, can_gc);
248
249 self.read_into_requests.borrow_mut().clear();
251
252 Ok(())
253 }
254
255 pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
257 self.generic_release(can_gc)
259 .expect("Generic release failed");
260 let cx = GlobalScope::get_cx();
262 rooted!(in(*cx) let mut error = UndefinedValue());
263 Error::Type(c"Reader is released".to_owned()).to_jsval(
264 cx,
265 &self.global(),
266 error.handle_mut(),
267 can_gc,
268 );
269
270 self.error_read_into_requests(error.handle(), can_gc);
272 Ok(())
273 }
274
275 pub(crate) fn error_read_into_requests(&self, e: SafeHandleValue, can_gc: CanGc) {
277 self.closed_promise.borrow().reject_native(&e, can_gc);
279
280 self.closed_promise.borrow().set_promise_is_handled();
282
283 let mut read_into_requests = self.take_read_into_requests();
285
286 for request in read_into_requests.drain(0..) {
288 request.error_steps(e, can_gc);
290 }
291 }
292
293 fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
294 mem::take(&mut *self.read_into_requests.borrow_mut())
295 }
296
297 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
299 self.read_into_requests
300 .borrow_mut()
301 .push_back(read_request.clone());
302 }
303
304 pub(crate) fn cancel(&self, cx: &mut JSContext) {
306 let mut read_into_requests = self.take_read_into_requests();
309 for request in read_into_requests.drain(0..) {
312 request.close_steps(cx, None);
314 }
315 }
316
317 pub(crate) fn close(&self, can_gc: CanGc) {
318 self.closed_promise.borrow().resolve_native(&(), can_gc);
320 }
321
322 pub(crate) fn read(
324 &self,
325 cx: &mut JSContext,
326 view: HeapBufferSource<ArrayBufferViewU8>,
327 min: u64,
328 read_into_request: &ReadIntoRequest,
329 ) {
330 assert!(self.stream.get().is_some());
334
335 let stream = self.stream.get().unwrap();
336
337 stream.set_is_disturbed(true);
339 if stream.is_errored() {
341 rooted!(&in(cx) let mut error = UndefinedValue());
342 stream.get_stored_error(error.handle_mut());
343
344 read_into_request.error_steps(error.handle(), CanGc::from_cx(cx));
345 } else {
346 stream.perform_pull_into(cx, read_into_request, view, min);
349 }
350 }
351
352 pub(crate) fn get_num_read_into_requests(&self) -> usize {
353 self.read_into_requests.borrow().len()
354 }
355
356 pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
357 self.read_into_requests
358 .borrow_mut()
359 .pop_front()
360 .expect("read into requests is empty")
361 }
362
363 #[allow(clippy::too_many_arguments)]
364 pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
365 &self,
366 branch_1: &ReadableStream,
367 branch_2: &ReadableStream,
368 canceled_1: Rc<Cell<bool>>,
369 canceled_2: Rc<Cell<bool>>,
370 cancel_promise: Rc<Promise>,
371 reader_version: Rc<Cell<u64>>,
372 expected_version: u64,
373 can_gc: CanGc,
374 ) {
375 let branch_1_controller = branch_1.get_byte_controller();
376
377 let branch_2_controller = branch_2.get_byte_controller();
378
379 let global = self.global();
380 let handler = PromiseNativeHandler::new(
381 &global,
382 None,
383 Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
384 branch_1_controller: Dom::from_ref(&branch_1_controller),
385 branch_2_controller: Dom::from_ref(&branch_2_controller),
386 canceled_1,
387 canceled_2,
388 cancel_promise,
389 reader_version,
390 expected_version,
391 })),
392 can_gc,
393 );
394
395 let realm = enter_realm(&*global);
396 let comp = InRealm::Entered(&realm);
397
398 self.closed_promise
399 .borrow()
400 .append_native_handler(&handler, comp, can_gc);
401 }
402}
403
404impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
405 fn Constructor(
407 global: &GlobalScope,
408 proto: Option<SafeHandleObject>,
409 can_gc: CanGc,
410 stream: &ReadableStream,
411 ) -> Fallible<DomRoot<Self>> {
412 let reader = Self::new_with_proto(global, proto, can_gc);
413
414 Self::set_up(&reader, stream, global, can_gc)?;
416
417 Ok(reader)
418 }
419
420 fn Read(
422 &self,
423 cx: &mut JSContext,
424 view: CustomAutoRooterGuard<ArrayBufferView>,
425 options: &ReadableStreamBYOBReaderReadOptions,
426 ) -> Rc<Promise> {
427 let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
428 let min = options.min;
429 let promise = Promise::new2(cx, &self.global());
431
432 if view.byte_length() == 0 {
434 promise.reject_error(
435 Error::Type(c"view byte length is 0".to_owned()),
436 CanGc::from_cx(cx),
437 );
438 return promise;
439 }
440 if view.viewed_buffer_array_byte_length(cx.into()) == 0 {
443 promise.reject_error(
444 Error::Type(c"viewed buffer byte length is 0".to_owned()),
445 CanGc::from_cx(cx),
446 );
447 return promise;
448 }
449
450 if view.is_detached_buffer(cx.into()) {
453 promise.reject_error(
454 Error::Type(c"view is detached".to_owned()),
455 CanGc::from_cx(cx),
456 );
457 return promise;
458 }
459
460 if min == 0 {
462 promise.reject_error(Error::Type(c"min is 0".to_owned()), CanGc::from_cx(cx));
463 return promise;
464 }
465
466 if view.has_typed_array_name() {
468 if min > (view.get_typed_array_length() as u64) {
470 promise.reject_error(
471 Error::Range(c"min is greater than array length".to_owned()),
472 CanGc::from_cx(cx),
473 );
474 return promise;
475 }
476 } else {
477 if min > (view.byte_length() as u64) {
480 promise.reject_error(
481 Error::Range(c"min is greater than byte length".to_owned()),
482 CanGc::from_cx(cx),
483 );
484 return promise;
485 }
486 }
487
488 if self.stream.get().is_none() {
490 promise.reject_error(
491 Error::Type(c"min is greater than byte length".to_owned()),
492 CanGc::from_cx(cx),
493 );
494 return promise;
495 }
496
497 let read_into_request = ReadIntoRequest::Read(promise.clone());
508
509 self.read(cx, view, min, &read_into_request);
511
512 promise
514 }
515
516 fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
518 if self.stream.get().is_none() {
519 return Ok(());
521 }
522
523 self.release(can_gc)
525 }
526
527 fn Closed(&self) -> Rc<Promise> {
529 self.closed()
530 }
531
532 fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
534 self.generic_cancel(cx, &self.global(), reason)
535 }
536}
537
538impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
539 fn get_closed_promise(&self) -> Rc<Promise> {
540 self.closed_promise.borrow().clone()
541 }
542
543 fn set_closed_promise(&self, promise: Rc<Promise>) {
544 *self.closed_promise.borrow_mut() = promise;
545 }
546
547 fn set_stream(&self, stream: Option<&ReadableStream>) {
548 self.stream.set(stream);
549 }
550
551 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
552 self.stream.get()
553 }
554
555 fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
556 Some(self)
557 }
558}