1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::gc::CustomAutoRooterGuard;
13use js::jsapi::Heap;
14use js::jsval::{JSVal, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
17use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
18use script_bindings::cell::DomRefCell;
19use script_bindings::reflector::{Reflector, reflect_dom_object, reflect_dom_object_with_proto};
20use script_bindings::root::Dom;
21
22use super::byteteereadintorequest::ByteTeeReadIntoRequest;
23use super::readablebytestreamcontroller::ReadableByteStreamController;
24use super::readablestreamgenericreader::ReadableStreamGenericReader;
25use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
26use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::{
27 ReadableStreamBYOBReaderMethods, ReadableStreamBYOBReaderReadOptions,
28};
29use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
30use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
31use crate::dom::bindings::reflector::DomGlobal;
32use crate::dom::bindings::root::{DomRoot, MutNullableDom};
33use crate::dom::bindings::trace::RootedTraceableBox;
34use crate::dom::globalscope::GlobalScope;
35use crate::dom::promise::Promise;
36use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
37use crate::dom::stream::readablestream::ReadableStream;
38use crate::realms::enter_auto_realm;
39use crate::script_runtime::CanGc;
40
/// A pending "read into" request queued on a `ReadableStreamBYOBReader`.
///
/// The variant decides who is notified when `chunk_steps`, `close_steps` or
/// `error_steps` run for the request.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub enum ReadIntoRequest {
    /// A request backed by a promise: the chunk and close steps resolve it with a
    /// `ReadableStreamReadResult`, the error steps reject it.
    Read(#[conditional_malloc_size_of] Rc<Promise>),
    /// A request whose chunk, close and error steps are forwarded to a
    /// `ByteTeeReadIntoRequest`.
    ByteTee {
        /// The tee-side request object that receives the forwarded steps.
        byte_tee_read_into_request: Dom<ByteTeeReadIntoRequest>,
    },
}
50
51impl ReadIntoRequest {
52 pub fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
54 match self {
55 ReadIntoRequest::Read(promise) => {
56 promise.resolve_native(
59 &ReadableStreamReadResult {
60 done: Some(false),
61 value: chunk,
62 },
63 can_gc,
64 );
65 },
66 ReadIntoRequest::ByteTee {
67 byte_tee_read_into_request,
68 } => byte_tee_read_into_request.enqueue_chunk_steps(RootedTraceableBox::new(
69 HeapBufferSource::<ArrayBufferViewU8>::new(BufferSource::ArrayBufferView(
70 Heap::boxed(chunk.get().to_object()),
71 )),
72 )),
73 }
74 }
75
76 pub fn close_steps(&self, cx: &mut JSContext, chunk: Option<RootedTraceableBox<Heap<JSVal>>>) {
78 match self {
79 ReadIntoRequest::Read(promise) => match chunk {
80 Some(chunk) => promise.resolve_native(
83 &ReadableStreamReadResult {
84 done: Some(true),
85 value: chunk,
86 },
87 CanGc::from_cx(cx),
88 ),
89 None => {
90 let result = RootedTraceableBox::new(Heap::default());
91 result.set(UndefinedValue());
92 promise.resolve_native(
93 &ReadableStreamReadResult {
94 done: Some(true),
95 value: result,
96 },
97 CanGc::from_cx(cx),
98 );
99 },
100 },
101 ReadIntoRequest::ByteTee {
102 byte_tee_read_into_request,
103 } => match chunk {
104 Some(chunk) => byte_tee_read_into_request
105 .close_steps(
106 cx,
107 Some(RootedTraceableBox::new(
108 HeapBufferSource::<ArrayBufferViewU8>::new(
109 BufferSource::ArrayBufferView(Heap::boxed(chunk.get().to_object())),
110 ),
111 )),
112 )
113 .expect("close steps should not fail"),
114 None => byte_tee_read_into_request
115 .close_steps(cx, None)
116 .expect("close steps should not fail"),
117 },
118 }
119 }
120
121 pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
123 match self {
124 ReadIntoRequest::Read(promise) => {
125 promise.reject_native(&e, can_gc)
128 },
129 ReadIntoRequest::ByteTee {
130 byte_tee_read_into_request,
131 } => {
132 byte_tee_read_into_request.error_steps();
133 },
134 }
135 }
136}
137
/// State captured for the native handler that
/// `byte_tee_append_native_handler_to_closed_promise` attaches to a reader's closed
/// promise while a byte stream is being teed.
///
/// When its callback runs and the reader version still matches, it errors both
/// branch controllers and, unless both branches are already canceled, resolves
/// `cancel_promise`.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ByteTeeClosedPromiseRejectionHandler {
    /// Controller of the first tee branch; errored by the callback.
    branch_1_controller: Dom<ReadableByteStreamController>,
    /// Controller of the second tee branch; errored by the callback.
    branch_2_controller: Dom<ReadableByteStreamController>,
    /// Shared flag read by the callback to tell whether branch 1 was canceled.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// Shared flag read by the callback to tell whether branch 2 was canceled.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// Promise resolved by the callback when at least one branch is not canceled.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
    /// Shared, mutable version counter compared against `expected_version`.
    #[conditional_malloc_size_of]
    reader_version: Rc<Cell<u64>>,
    /// Value `reader_version` must still hold for the callback to act; on a
    /// mismatch the callback returns without doing anything.
    expected_version: u64,
}
155
156impl Callback for ByteTeeClosedPromiseRejectionHandler {
157 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
160 if self.reader_version.get() != self.expected_version {
162 return;
163 }
164
165 self.branch_1_controller.error(cx, v);
167
168 self.branch_2_controller.error(cx, v);
170
171 if !self.canceled_1.get() || !self.canceled_2.get() {
173 self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
174 }
175 }
176}
177
/// DOM object implementing the `ReadableStreamBYOBReader` interface: a reader that
/// is attached to a byte-controlled `ReadableStream` and reads into views
/// supplied by the caller.
#[dom_struct]
pub(crate) struct ReadableStreamBYOBReader {
    /// Reflector linking this object to its JS wrapper.
    reflector_: Reflector,

    /// The stream this reader is attached to, or `None` when it has no stream.
    stream: MutNullableDom<ReadableStream>,

    /// Pending read-into requests in FIFO order: appended at the back by
    /// `add_read_into_request`, removed from the front by `remove_read_into_request`.
    read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,

    /// The reader's closed promise: resolved by `close`, rejected by
    /// `error_read_into_requests`, and replaceable through `set_closed_promise`.
    #[conditional_malloc_size_of]
    closed_promise: DomRefCell<Rc<Promise>>,
}
192
193impl ReadableStreamBYOBReader {
194 fn new_with_proto(
195 global: &GlobalScope,
196 proto: Option<SafeHandleObject>,
197 can_gc: CanGc,
198 ) -> DomRoot<ReadableStreamBYOBReader> {
199 reflect_dom_object_with_proto(
200 Box::new(ReadableStreamBYOBReader::new_inherited(global, can_gc)),
201 global,
202 proto,
203 can_gc,
204 )
205 }
206
207 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamBYOBReader {
208 ReadableStreamBYOBReader {
209 reflector_: Reflector::new(),
210 stream: MutNullableDom::new(None),
211 read_into_requests: DomRefCell::new(Default::default()),
212 closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
213 }
214 }
215
216 pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamBYOBReader> {
217 reflect_dom_object(
218 Box::new(Self::new_inherited(global, can_gc)),
219 global,
220 can_gc,
221 )
222 }
223
224 pub(crate) fn set_up(
226 &self,
227 stream: &ReadableStream,
228 global: &GlobalScope,
229 can_gc: CanGc,
230 ) -> Fallible<()> {
231 if stream.is_locked() {
233 return Err(Error::Type(c"stream is locked".to_owned()));
234 }
235
236 if !stream.has_byte_controller() {
238 return Err(Error::Type(
239 c"stream controller is not a byte stream controller".to_owned(),
240 ));
241 }
242
243 self.generic_initialize(global, stream, can_gc);
245
246 self.read_into_requests.borrow_mut().clear();
248
249 Ok(())
250 }
251
252 pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
254 self.generic_release(can_gc)
256 .expect("Generic release failed");
257 let cx = GlobalScope::get_cx();
259 rooted!(in(*cx) let mut error = UndefinedValue());
260 Error::Type(c"Reader is released".to_owned()).to_jsval(
261 cx,
262 &self.global(),
263 error.handle_mut(),
264 can_gc,
265 );
266
267 self.error_read_into_requests(error.handle(), can_gc);
269 Ok(())
270 }
271
272 pub(crate) fn error_read_into_requests(&self, e: SafeHandleValue, can_gc: CanGc) {
274 self.closed_promise.borrow().reject_native(&e, can_gc);
276
277 self.closed_promise.borrow().set_promise_is_handled();
279
280 let mut read_into_requests = self.take_read_into_requests();
282
283 for request in read_into_requests.drain(0..) {
285 request.error_steps(e, can_gc);
287 }
288 }
289
290 fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
291 mem::take(&mut *self.read_into_requests.borrow_mut())
292 }
293
294 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
296 self.read_into_requests
297 .borrow_mut()
298 .push_back(read_request.clone());
299 }
300
301 pub(crate) fn cancel(&self, cx: &mut JSContext) {
303 let mut read_into_requests = self.take_read_into_requests();
306 for request in read_into_requests.drain(0..) {
309 request.close_steps(cx, None);
311 }
312 }
313
314 pub(crate) fn close(&self, can_gc: CanGc) {
315 self.closed_promise.borrow().resolve_native(&(), can_gc);
317 }
318
319 pub(crate) fn read(
321 &self,
322 cx: &mut JSContext,
323 view: &HeapBufferSource<ArrayBufferViewU8>,
324 min: u64,
325 read_into_request: &ReadIntoRequest,
326 ) {
327 assert!(self.stream.get().is_some());
331
332 let stream = self.stream.get().unwrap();
333
334 stream.set_is_disturbed(true);
336 if stream.is_errored() {
338 rooted!(&in(cx) let mut error = UndefinedValue());
339 stream.get_stored_error(error.handle_mut());
340
341 read_into_request.error_steps(error.handle(), CanGc::from_cx(cx));
342 } else {
343 stream.perform_pull_into(cx, read_into_request, view, min);
346 }
347 }
348
349 pub(crate) fn get_num_read_into_requests(&self) -> usize {
350 self.read_into_requests.borrow().len()
351 }
352
353 pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
354 self.read_into_requests
355 .borrow_mut()
356 .pop_front()
357 .expect("read into requests is empty")
358 }
359
360 #[allow(clippy::too_many_arguments)]
361 pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
362 &self,
363 cx: &mut JSContext,
364 branch_1: &ReadableStream,
365 branch_2: &ReadableStream,
366 canceled_1: Rc<Cell<bool>>,
367 canceled_2: Rc<Cell<bool>>,
368 cancel_promise: Rc<Promise>,
369 reader_version: Rc<Cell<u64>>,
370 expected_version: u64,
371 ) {
372 let branch_1_controller = branch_1.get_byte_controller();
373
374 let branch_2_controller = branch_2.get_byte_controller();
375
376 let global = self.global();
377 let handler = PromiseNativeHandler::new(
378 &global,
379 None,
380 Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
381 branch_1_controller: Dom::from_ref(&branch_1_controller),
382 branch_2_controller: Dom::from_ref(&branch_2_controller),
383 canceled_1,
384 canceled_2,
385 cancel_promise,
386 reader_version,
387 expected_version,
388 })),
389 CanGc::from_cx(cx),
390 );
391
392 let mut realm = enter_auto_realm(cx, &*global);
393 let cx = &mut realm.current_realm();
394
395 self.closed_promise
396 .borrow()
397 .append_native_handler(cx, &handler);
398 }
399}
400
401impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
402 fn Constructor(
404 global: &GlobalScope,
405 proto: Option<SafeHandleObject>,
406 can_gc: CanGc,
407 stream: &ReadableStream,
408 ) -> Fallible<DomRoot<Self>> {
409 let reader = Self::new_with_proto(global, proto, can_gc);
410
411 Self::set_up(&reader, stream, global, can_gc)?;
413
414 Ok(reader)
415 }
416
417 fn Read(
419 &self,
420 cx: &mut JSContext,
421 view: CustomAutoRooterGuard<ArrayBufferView>,
422 options: &ReadableStreamBYOBReaderReadOptions,
423 ) -> Rc<Promise> {
424 let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
425 let min = options.min;
426 let promise = Promise::new2(cx, &self.global());
428
429 if view.byte_length() == 0 {
431 promise.reject_error(
432 Error::Type(c"view byte length is 0".to_owned()),
433 CanGc::from_cx(cx),
434 );
435 return promise;
436 }
437 if view.viewed_buffer_array_byte_length(cx.into()) == 0 {
440 promise.reject_error(
441 Error::Type(c"viewed buffer byte length is 0".to_owned()),
442 CanGc::from_cx(cx),
443 );
444 return promise;
445 }
446
447 if view.is_detached_buffer(cx.into()) {
450 promise.reject_error(
451 Error::Type(c"view is detached".to_owned()),
452 CanGc::from_cx(cx),
453 );
454 return promise;
455 }
456
457 if min == 0 {
459 promise.reject_error(Error::Type(c"min is 0".to_owned()), CanGc::from_cx(cx));
460 return promise;
461 }
462
463 if view.has_typed_array_name() {
465 if min > (view.get_typed_array_length() as u64) {
467 promise.reject_error(
468 Error::Range(c"min is greater than array length".to_owned()),
469 CanGc::from_cx(cx),
470 );
471 return promise;
472 }
473 } else {
474 if min > (view.byte_length() as u64) {
477 promise.reject_error(
478 Error::Range(c"min is greater than byte length".to_owned()),
479 CanGc::from_cx(cx),
480 );
481 return promise;
482 }
483 }
484
485 if self.stream.get().is_none() {
487 promise.reject_error(
488 Error::Type(c"min is greater than byte length".to_owned()),
489 CanGc::from_cx(cx),
490 );
491 return promise;
492 }
493
494 let read_into_request = ReadIntoRequest::Read(promise.clone());
505
506 self.read(cx, &view, min, &read_into_request);
508
509 promise
511 }
512
513 fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
515 if self.stream.get().is_none() {
516 return Ok(());
518 }
519
520 self.release(can_gc)
522 }
523
524 fn Closed(&self) -> Rc<Promise> {
526 self.closed()
527 }
528
529 fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
531 self.generic_cancel(cx, &self.global(), reason)
532 }
533}
534
535impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
536 fn get_closed_promise(&self) -> Rc<Promise> {
537 self.closed_promise.borrow().clone()
538 }
539
540 fn set_closed_promise(&self, promise: Rc<Promise>) {
541 *self.closed_promise.borrow_mut() = promise;
542 }
543
544 fn set_stream(&self, stream: Option<&ReadableStream>) {
545 self.stream.set(stream);
546 }
547
548 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
549 self.stream.get()
550 }
551
552 fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
553 Some(self)
554 }
555}