1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::gc::CustomAutoRooterGuard;
13use js::jsapi::Heap;
14use js::jsval::{JSVal, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
17use js::typedarray::{ArrayBufferView, ArrayBufferViewU8};
18use script_bindings::cell::DomRefCell;
19use script_bindings::reflector::{
20 Reflector, reflect_dom_object_with_cx, reflect_dom_object_with_proto_and_cx,
21};
22use script_bindings::root::Dom;
23
24use super::byteteereadintorequest::ByteTeeReadIntoRequest;
25use super::readablebytestreamcontroller::ReadableByteStreamController;
26use super::readablestreamgenericreader::ReadableStreamGenericReader;
27use crate::dom::bindings::buffer_source::{BufferSource, HeapBufferSource};
28use crate::dom::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::{
29 ReadableStreamBYOBReaderMethods, ReadableStreamBYOBReaderReadOptions,
30};
31use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamReadResult;
32use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
33use crate::dom::bindings::reflector::DomGlobal;
34use crate::dom::bindings::root::{DomRoot, MutNullableDom};
35use crate::dom::bindings::trace::RootedTraceableBox;
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
39use crate::dom::stream::readablestream::ReadableStream;
40use crate::realms::enter_auto_realm;
41
/// A pending "read into" request queued on a BYOB reader.
///
/// Each variant identifies who gets notified when `chunk_steps`,
/// `close_steps` or `error_steps` is run on the request.
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub enum ReadIntoRequest {
    /// A request backed by a promise: the promise is resolved with a
    /// `ReadableStreamReadResult` on chunk/close and rejected on error.
    Read(#[conditional_malloc_size_of] Rc<Promise>),
    /// A request whose chunk/close/error notifications are forwarded to a
    /// `ByteTeeReadIntoRequest`.
    ByteTee {
        byte_tee_read_into_request: Dom<ByteTeeReadIntoRequest>,
    },
}
51
52impl ReadIntoRequest {
53 pub fn chunk_steps(&self, cx: &mut JSContext, chunk: RootedTraceableBox<Heap<JSVal>>) {
55 match self {
56 ReadIntoRequest::Read(promise) => {
57 promise.resolve_native(
60 cx,
61 &ReadableStreamReadResult {
62 done: Some(false),
63 value: chunk,
64 },
65 );
66 },
67 ReadIntoRequest::ByteTee {
68 byte_tee_read_into_request,
69 } => byte_tee_read_into_request.enqueue_chunk_steps(
70 cx,
71 RootedTraceableBox::new(HeapBufferSource::<ArrayBufferViewU8>::new(
72 BufferSource::ArrayBufferView(Heap::boxed(chunk.get().to_object())),
73 )),
74 ),
75 }
76 }
77
78 pub fn close_steps(&self, cx: &mut JSContext, chunk: Option<RootedTraceableBox<Heap<JSVal>>>) {
80 match self {
81 ReadIntoRequest::Read(promise) => match chunk {
82 Some(chunk) => promise.resolve_native(
85 cx,
86 &ReadableStreamReadResult {
87 done: Some(true),
88 value: chunk,
89 },
90 ),
91 None => {
92 let result = RootedTraceableBox::new(Heap::default());
93 result.set(UndefinedValue());
94 promise.resolve_native(
95 cx,
96 &ReadableStreamReadResult {
97 done: Some(true),
98 value: result,
99 },
100 );
101 },
102 },
103 ReadIntoRequest::ByteTee {
104 byte_tee_read_into_request,
105 } => match chunk {
106 Some(chunk) => byte_tee_read_into_request
107 .close_steps(
108 cx,
109 Some(RootedTraceableBox::new(
110 HeapBufferSource::<ArrayBufferViewU8>::new(
111 BufferSource::ArrayBufferView(Heap::boxed(chunk.get().to_object())),
112 ),
113 )),
114 )
115 .expect("close steps should not fail"),
116 None => byte_tee_read_into_request
117 .close_steps(cx, None)
118 .expect("close steps should not fail"),
119 },
120 }
121 }
122
123 pub(crate) fn error_steps(&self, cx: &mut JSContext, e: SafeHandleValue) {
125 match self {
126 ReadIntoRequest::Read(promise) => {
127 promise.reject_native(cx, &e)
130 },
131 ReadIntoRequest::ByteTee {
132 byte_tee_read_into_request,
133 } => {
134 byte_tee_read_into_request.error_steps();
135 },
136 }
137 }
138}
139
/// Native promise callback that
/// `ReadableStreamBYOBReader::byte_tee_append_native_handler_to_closed_promise`
/// attaches to a reader's closed promise on behalf of a byte-stream tee.
///
/// When invoked, it errors both branch controllers and may resolve the
/// shared cancel promise; see the `Callback` implementation.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ByteTeeClosedPromiseRejectionHandler {
    /// Byte controller of the first tee branch; errored by the callback.
    branch_1_controller: Dom<ReadableByteStreamController>,
    /// Byte controller of the second tee branch; errored by the callback.
    branch_2_controller: Dom<ReadableByteStreamController>,
    /// Shared flag read by the callback when deciding whether to resolve
    /// `cancel_promise`.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// Shared flag read by the callback when deciding whether to resolve
    /// `cancel_promise`.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// Promise resolved by the callback unless both `canceled_*` flags are set.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
    /// Shared counter compared against `expected_version`; if the two differ
    /// when the callback runs, the callback does nothing.
    #[conditional_malloc_size_of]
    reader_version: Rc<Cell<u64>>,
    /// The `reader_version` value this handler was created for.
    expected_version: u64,
}
157
158impl Callback for ByteTeeClosedPromiseRejectionHandler {
159 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
162 if self.reader_version.get() != self.expected_version {
164 return;
165 }
166
167 self.branch_1_controller.error(cx, v);
169
170 self.branch_2_controller.error(cx, v);
172
173 if !self.canceled_1.get() || !self.canceled_2.get() {
175 self.cancel_promise.resolve_native(cx, &());
176 }
177 }
178}
179
#[dom_struct]
/// DOM object implementing the `ReadableStreamBYOBReader` interface: a
/// reader that fills caller-supplied views from a byte stream.
pub(crate) struct ReadableStreamBYOBReader {
    reflector_: Reflector,

    /// The stream this reader is attached to, or `None` when no stream is
    /// attached (`Read` rejects and `ReleaseLock` is a no-op in that case).
    stream: MutNullableDom<ReadableStream>,

    /// Pending read-into requests, appended at the back and removed from the
    /// front.
    read_into_requests: DomRefCell<VecDeque<ReadIntoRequest>>,

    /// Promise exposed through `Closed`; resolved by `close` and rejected by
    /// `error_read_into_requests`.
    #[conditional_malloc_size_of]
    closed_promise: DomRefCell<Rc<Promise>>,
}
194
195impl ReadableStreamBYOBReader {
196 fn new_with_proto(
197 cx: &mut JSContext,
198 global: &GlobalScope,
199 proto: Option<SafeHandleObject>,
200 ) -> DomRoot<ReadableStreamBYOBReader> {
201 reflect_dom_object_with_proto_and_cx(
202 Box::new(ReadableStreamBYOBReader::new_inherited(cx, global)),
203 global,
204 proto,
205 cx,
206 )
207 }
208
209 fn new_inherited(cx: &mut JSContext, global: &GlobalScope) -> ReadableStreamBYOBReader {
210 ReadableStreamBYOBReader {
211 reflector_: Reflector::new(),
212 stream: MutNullableDom::new(None),
213 read_into_requests: DomRefCell::new(Default::default()),
214 closed_promise: DomRefCell::new(Promise::new(cx, global)),
215 }
216 }
217
218 pub(crate) fn new(
219 cx: &mut JSContext,
220 global: &GlobalScope,
221 ) -> DomRoot<ReadableStreamBYOBReader> {
222 reflect_dom_object_with_cx(Box::new(Self::new_inherited(cx, global)), global, cx)
223 }
224
225 pub(crate) fn set_up(
227 &self,
228 cx: &mut JSContext,
229 stream: &ReadableStream,
230 global: &GlobalScope,
231 ) -> Fallible<()> {
232 if stream.is_locked() {
234 return Err(Error::Type(c"stream is locked".to_owned()));
235 }
236
237 if !stream.has_byte_controller() {
239 return Err(Error::Type(
240 c"stream controller is not a byte stream controller".to_owned(),
241 ));
242 }
243
244 self.generic_initialize(cx, global, stream);
246
247 self.read_into_requests.borrow_mut().clear();
249
250 Ok(())
251 }
252
253 pub(crate) fn release(&self, cx: &mut JSContext) -> Fallible<()> {
255 self.generic_release(cx).expect("Generic release failed");
257 rooted!(&in(cx) let mut error = UndefinedValue());
259 Error::Type(c"Reader is released".to_owned()).to_jsval(
260 cx,
261 &self.global(),
262 error.handle_mut(),
263 );
264
265 self.error_read_into_requests(cx, error.handle());
267 Ok(())
268 }
269
270 pub(crate) fn error_read_into_requests(&self, cx: &mut JSContext, e: SafeHandleValue) {
272 self.closed_promise.borrow().reject_native(cx, &e);
274
275 self.closed_promise.borrow().set_promise_is_handled(cx);
277
278 let mut read_into_requests = self.take_read_into_requests();
280
281 for request in read_into_requests.drain(0..) {
283 request.error_steps(cx, e);
285 }
286 }
287
288 fn take_read_into_requests(&self) -> VecDeque<ReadIntoRequest> {
289 mem::take(&mut *self.read_into_requests.borrow_mut())
290 }
291
292 pub(crate) fn add_read_into_request(&self, read_request: &ReadIntoRequest) {
294 self.read_into_requests
295 .borrow_mut()
296 .push_back(read_request.clone());
297 }
298
299 pub(crate) fn cancel(&self, cx: &mut JSContext) {
301 let mut read_into_requests = self.take_read_into_requests();
304 for request in read_into_requests.drain(0..) {
307 request.close_steps(cx, None);
309 }
310 }
311
312 pub(crate) fn close(&self, cx: &mut JSContext) {
313 self.closed_promise.borrow().resolve_native(cx, &());
315 }
316
317 pub(crate) fn read(
319 &self,
320 cx: &mut JSContext,
321 view: &HeapBufferSource<ArrayBufferViewU8>,
322 min: u64,
323 read_into_request: &ReadIntoRequest,
324 ) {
325 assert!(self.stream.get().is_some());
329
330 let stream = self.stream.get().unwrap();
331
332 stream.set_is_disturbed(true);
334 if stream.is_errored() {
336 rooted!(&in(cx) let mut error = UndefinedValue());
337 stream.get_stored_error(error.handle_mut());
338
339 read_into_request.error_steps(cx, error.handle());
340 } else {
341 stream.perform_pull_into(cx, read_into_request, view, min);
344 }
345 }
346
347 pub(crate) fn get_num_read_into_requests(&self) -> usize {
348 self.read_into_requests.borrow().len()
349 }
350
351 pub(crate) fn remove_read_into_request(&self) -> ReadIntoRequest {
352 self.read_into_requests
353 .borrow_mut()
354 .pop_front()
355 .expect("read into requests is empty")
356 }
357
358 #[allow(clippy::too_many_arguments)]
359 pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
360 &self,
361 cx: &mut JSContext,
362 branch_1: &ReadableStream,
363 branch_2: &ReadableStream,
364 canceled_1: Rc<Cell<bool>>,
365 canceled_2: Rc<Cell<bool>>,
366 cancel_promise: Rc<Promise>,
367 reader_version: Rc<Cell<u64>>,
368 expected_version: u64,
369 ) {
370 let branch_1_controller = branch_1.get_byte_controller();
371
372 let branch_2_controller = branch_2.get_byte_controller();
373
374 let global = self.global();
375 let handler = PromiseNativeHandler::new(
376 cx,
377 &global,
378 None,
379 Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
380 branch_1_controller: Dom::from_ref(&branch_1_controller),
381 branch_2_controller: Dom::from_ref(&branch_2_controller),
382 canceled_1,
383 canceled_2,
384 cancel_promise,
385 reader_version,
386 expected_version,
387 })),
388 );
389
390 let mut realm = enter_auto_realm(cx, &*global);
391 let cx = &mut realm.current_realm();
392
393 self.closed_promise
394 .borrow()
395 .append_native_handler(cx, &handler);
396 }
397}
398
399impl ReadableStreamBYOBReaderMethods<crate::DomTypeHolder> for ReadableStreamBYOBReader {
400 fn Constructor(
402 cx: &mut JSContext,
403 global: &GlobalScope,
404 proto: Option<SafeHandleObject>,
405 stream: &ReadableStream,
406 ) -> Fallible<DomRoot<Self>> {
407 let reader = Self::new_with_proto(cx, global, proto);
408
409 reader.set_up(cx, stream, global)?;
411
412 Ok(reader)
413 }
414
415 fn Read(
417 &self,
418 cx: &mut JSContext,
419 view: CustomAutoRooterGuard<ArrayBufferView>,
420 options: &ReadableStreamBYOBReaderReadOptions,
421 ) -> Rc<Promise> {
422 let view = HeapBufferSource::<ArrayBufferViewU8>::from_view(view);
423 let min = options.min;
424 let promise = Promise::new(cx, &self.global());
426
427 if view.byte_length() == 0 {
429 promise.reject_error(cx, Error::Type(c"view byte length is 0".to_owned()));
430 return promise;
431 }
432 if view.viewed_buffer_array_byte_length(cx) == 0 {
435 promise.reject_error(
436 cx,
437 Error::Type(c"viewed buffer byte length is 0".to_owned()),
438 );
439 return promise;
440 }
441
442 if view.is_detached_buffer(cx) {
445 promise.reject_error(cx, Error::Type(c"view is detached".to_owned()));
446 return promise;
447 }
448
449 if min == 0 {
451 promise.reject_error(cx, Error::Type(c"min is 0".to_owned()));
452 return promise;
453 }
454
455 if view.has_typed_array_name() {
457 if min > (view.get_typed_array_length() as u64) {
459 promise.reject_error(
460 cx,
461 Error::Range(c"min is greater than array length".to_owned()),
462 );
463 return promise;
464 }
465 } else {
466 if min > (view.byte_length() as u64) {
469 promise.reject_error(
470 cx,
471 Error::Range(c"min is greater than byte length".to_owned()),
472 );
473 return promise;
474 }
475 }
476
477 if self.stream.get().is_none() {
479 promise.reject_error(
480 cx,
481 Error::Type(c"min is greater than byte length".to_owned()),
482 );
483 return promise;
484 }
485
486 let read_into_request = ReadIntoRequest::Read(promise.clone());
497
498 self.read(cx, &view, min, &read_into_request);
500
501 promise
503 }
504
505 fn ReleaseLock(&self, cx: &mut JSContext) -> Fallible<()> {
507 if self.stream.get().is_none() {
508 return Ok(());
510 }
511
512 self.release(cx)
514 }
515
516 fn Closed(&self) -> Rc<Promise> {
518 self.closed()
519 }
520
521 fn Cancel(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Rc<Promise> {
523 self.generic_cancel(cx, &self.global(), reason)
524 }
525}
526
527impl ReadableStreamGenericReader for ReadableStreamBYOBReader {
528 fn get_closed_promise(&self) -> Rc<Promise> {
529 self.closed_promise.borrow().clone()
530 }
531
532 fn set_closed_promise(&self, promise: Rc<Promise>) {
533 *self.closed_promise.borrow_mut() = promise;
534 }
535
536 fn set_stream(&self, stream: Option<&ReadableStream>) {
537 self.stream.set(stream);
538 }
539
540 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
541 self.stream.get()
542 }
543
544 fn as_byob_reader(&self) -> Option<&ReadableStreamBYOBReader> {
545 Some(self)
546 }
547}