1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::Heap;
12use js::jsval::{JSVal, UndefinedValue};
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
14
15use super::bindings::reflector::reflect_dom_object;
16use super::bindings::root::MutNullableDom;
17use super::readablebytestreamcontroller::ReadableByteStreamController;
18use super::types::ReadableStreamDefaultController;
19use crate::dom::bindings::cell::DomRefCell;
20use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
21 ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
22};
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
25use crate::dom::bindings::root::{Dom, DomRoot};
26use crate::dom::bindings::trace::RootedTraceableBox;
27use crate::dom::defaultteereadrequest::DefaultTeeReadRequest;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
32use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader;
33use crate::realms::{InRealm, enter_realm};
34use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
35
36type ReadAllBytesSuccessSteps = dyn Fn(&[u8]);
37type ReadAllBytesFailureSteps = dyn Fn(SafeJSContext, SafeHandleValue);
38
39impl js::gc::Rootable for ContinueReadMicrotask {}
40
41#[derive(Clone, JSTraceable, MallocSizeOf)]
47#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
48struct ContinueReadMicrotask {
49 reader: Dom<ReadableStreamDefaultReader>,
50 request: ReadRequest,
51}
52
53impl Callback for ContinueReadMicrotask {
54 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
55 self.reader.read(cx, &self.request, can_gc);
58 }
59}
60
61fn read_loop(
63 reader: &ReadableStreamDefaultReader,
64 cx: SafeJSContext,
65 success_steps: Rc<ReadAllBytesSuccessSteps>,
66 failure_steps: Rc<ReadAllBytesFailureSteps>,
67 can_gc: CanGc,
68) {
69 let req = ReadRequest::ReadLoop {
74 success_steps,
75 failure_steps,
76 reader: Dom::from_ref(reader),
77 bytes: Rc::new(DomRefCell::new(Vec::new())),
78 };
79 reader.read(cx, &req, can_gc);
81}
82
83#[derive(Clone, JSTraceable, MallocSizeOf)]
85pub(crate) enum ReadRequest {
86 Read(#[conditional_malloc_size_of] Rc<Promise>),
88 DefaultTee {
90 tee_read_request: Dom<DefaultTeeReadRequest>,
91 },
92 ReadLoop {
95 #[ignore_malloc_size_of = "Rc is hard"]
96 #[no_trace]
97 success_steps: Rc<ReadAllBytesSuccessSteps>,
98 #[ignore_malloc_size_of = "Rc is hard"]
99 #[no_trace]
100 failure_steps: Rc<ReadAllBytesFailureSteps>,
101 reader: Dom<ReadableStreamDefaultReader>,
102 #[conditional_malloc_size_of]
103 bytes: Rc<DomRefCell<Vec<u8>>>,
104 },
105}
106
107impl ReadRequest {
108 pub(crate) fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
110 match self {
111 ReadRequest::Read(promise) => {
112 promise.resolve_native(
115 &ReadableStreamReadResult {
116 done: Some(false),
117 value: chunk,
118 },
119 can_gc,
120 );
121 },
122 ReadRequest::DefaultTee { tee_read_request } => {
123 tee_read_request.enqueue_chunk_steps(chunk);
124 },
125 ReadRequest::ReadLoop {
126 success_steps: _,
127 failure_steps,
128 reader,
129 bytes,
130 } => {
131 let cx = GlobalScope::get_cx();
133 let global = reader.global();
134
135 match bytes_from_chunk_jsval(cx, &chunk, can_gc) {
136 Ok(vec) => {
137 bytes.borrow_mut().extend_from_slice(&vec);
139
140 let tick = Promise::new(&global, can_gc);
144 tick.resolve_native(&(), can_gc);
145
146 let handler = PromiseNativeHandler::new(
147 &global,
148 Some(Box::new(ContinueReadMicrotask {
149 reader: Dom::from_ref(reader),
150 request: self.clone(),
151 })),
152 None,
153 can_gc,
154 );
155
156 let realm = enter_realm(&*global);
157 let comp = InRealm::Entered(&realm);
158 tick.append_native_handler(&handler, comp, can_gc);
159 },
160 Err(err) => {
161 rooted!(in(*cx) let mut v = UndefinedValue());
163 err.to_jsval(cx, &global, v.handle_mut(), can_gc);
164 (failure_steps)(cx, v.handle());
165 },
166 }
167 },
168 }
169 }
170
171 pub(crate) fn close_steps(&self, can_gc: CanGc) {
173 match self {
174 ReadRequest::Read(promise) => {
175 let result = RootedTraceableBox::new(Heap::default());
178 result.set(UndefinedValue());
179 promise.resolve_native(
180 &ReadableStreamReadResult {
181 done: Some(true),
182 value: result,
183 },
184 can_gc,
185 );
186 },
187 ReadRequest::DefaultTee { tee_read_request } => {
188 tee_read_request.close_steps(can_gc);
189 },
190 ReadRequest::ReadLoop {
191 success_steps,
192 reader,
193 bytes,
194 ..
195 } => {
196 (success_steps)(&bytes.borrow());
198
199 reader
200 .release(can_gc)
201 .expect("Releasing the read-all-bytes reader should succeed");
202 },
203 }
204 }
205
206 pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
208 match self {
209 ReadRequest::Read(promise) => {
210 promise.reject_native(&e, can_gc)
213 },
214 ReadRequest::DefaultTee { tee_read_request } => {
215 tee_read_request.error_steps();
216 },
217 ReadRequest::ReadLoop {
218 failure_steps,
219 reader,
220 ..
221 } => {
222 let cx = GlobalScope::get_cx();
224 (failure_steps)(cx, e);
225
226 reader
227 .release(can_gc)
228 .expect("Releasing the read-all-bytes reader should succeed");
229 },
230 }
231 }
232}
233
234#[derive(Clone, JSTraceable, MallocSizeOf)]
237#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
238struct ClosedPromiseRejectionHandler {
239 branch_1_controller: Dom<ReadableStreamDefaultController>,
240 branch_2_controller: Dom<ReadableStreamDefaultController>,
241 #[conditional_malloc_size_of]
242 canceled_1: Rc<Cell<bool>>,
243 #[conditional_malloc_size_of]
244 canceled_2: Rc<Cell<bool>>,
245 #[conditional_malloc_size_of]
246 cancel_promise: Rc<Promise>,
247}
248
249impl Callback for ClosedPromiseRejectionHandler {
250 fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
253 let branch_1_controller = &self.branch_1_controller;
254 let branch_2_controller = &self.branch_2_controller;
255
256 branch_1_controller.error(v, can_gc);
258 branch_2_controller.error(v, can_gc);
260
261 if !self.canceled_1.get() || !self.canceled_2.get() {
263 self.cancel_promise.resolve_native(&(), can_gc);
264 }
265 }
266}
267
268#[dom_struct]
270pub(crate) struct ReadableStreamDefaultReader {
271 reflector_: Reflector,
272
273 stream: MutNullableDom<ReadableStream>,
275
276 read_requests: DomRefCell<VecDeque<ReadRequest>>,
277
278 #[conditional_malloc_size_of]
280 closed_promise: DomRefCell<Rc<Promise>>,
281}
282
283impl ReadableStreamDefaultReader {
284 fn new_with_proto(
285 global: &GlobalScope,
286 proto: Option<SafeHandleObject>,
287 can_gc: CanGc,
288 ) -> DomRoot<ReadableStreamDefaultReader> {
289 reflect_dom_object_with_proto(
290 Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)),
291 global,
292 proto,
293 can_gc,
294 )
295 }
296
297 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamDefaultReader {
298 ReadableStreamDefaultReader {
299 reflector_: Reflector::new(),
300 stream: MutNullableDom::new(None),
301 read_requests: DomRefCell::new(Default::default()),
302 closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
303 }
304 }
305
306 pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamDefaultReader> {
307 reflect_dom_object(
308 Box::new(Self::new_inherited(global, can_gc)),
309 global,
310 can_gc,
311 )
312 }
313
314 pub(crate) fn set_up(
316 &self,
317 stream: &ReadableStream,
318 global: &GlobalScope,
319 can_gc: CanGc,
320 ) -> Fallible<()> {
321 if stream.is_locked() {
323 return Err(Error::Type("stream is locked".to_owned()));
324 }
325 self.generic_initialize(global, stream, can_gc);
328
329 self.read_requests.borrow_mut().clear();
331
332 Ok(())
333 }
334
335 pub(crate) fn close(&self, can_gc: CanGc) {
337 self.closed_promise.borrow().resolve_native(&(), can_gc);
339 let mut read_requests = self.take_read_requests();
342 for request in read_requests.drain(0..) {
345 request.close_steps(can_gc);
347 }
348 }
349
350 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
352 self.read_requests
353 .borrow_mut()
354 .push_back(read_request.clone());
355 }
356
357 pub(crate) fn get_num_read_requests(&self) -> usize {
359 self.read_requests.borrow().len()
360 }
361
362 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
364 self.closed_promise.borrow().reject_native(&e, can_gc);
366
367 self.closed_promise.borrow().set_promise_is_handled();
369
370 self.error_read_requests(e, can_gc);
372 }
373
374 pub(crate) fn remove_read_request(&self) -> ReadRequest {
376 self.read_requests
377 .borrow_mut()
378 .pop_front()
379 .expect("Reader must have read request when remove is called into.")
380 }
381
382 pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
384 self.generic_release(can_gc)?;
386 let cx = GlobalScope::get_cx();
388 rooted!(in(*cx) let mut error = UndefinedValue());
389 Error::Type("Reader is released".to_owned()).to_jsval(
390 cx,
391 &self.global(),
392 error.handle_mut(),
393 can_gc,
394 );
395
396 self.error_read_requests(error.handle(), can_gc);
398 Ok(())
399 }
400
401 fn take_read_requests(&self) -> VecDeque<ReadRequest> {
402 mem::take(&mut *self.read_requests.borrow_mut())
403 }
404
405 fn error_read_requests(&self, rval: SafeHandleValue, can_gc: CanGc) {
407 let mut read_requests = self.take_read_requests();
409
410 for request in read_requests.drain(0..) {
412 request.error_steps(rval, can_gc);
413 }
414 }
415
416 pub(crate) fn read(&self, cx: SafeJSContext, read_request: &ReadRequest, can_gc: CanGc) {
418 assert!(self.stream.get().is_some());
422
423 let stream = self.stream.get().unwrap();
424
425 stream.set_is_disturbed(true);
427 if stream.is_closed() {
429 read_request.close_steps(can_gc);
430 } else if stream.is_errored() {
431 let cx = GlobalScope::get_cx();
434 rooted!(in(*cx) let mut error = UndefinedValue());
435 stream.get_stored_error(error.handle_mut());
436 read_request.error_steps(error.handle(), can_gc);
437 } else {
438 assert!(stream.is_readable());
441 stream.perform_pull_steps(cx, read_request, can_gc);
443 }
444 }
445
446 pub(crate) fn append_native_handler_to_closed_promise(
448 &self,
449 branch_1: &ReadableStream,
450 branch_2: &ReadableStream,
451 canceled_1: Rc<Cell<bool>>,
452 canceled_2: Rc<Cell<bool>>,
453 cancel_promise: Rc<Promise>,
454 can_gc: CanGc,
455 ) {
456 let branch_1_controller = branch_1.get_default_controller();
457
458 let branch_2_controller = branch_2.get_default_controller();
459
460 let global = self.global();
461 let handler = PromiseNativeHandler::new(
462 &global,
463 None,
464 Some(Box::new(ClosedPromiseRejectionHandler {
465 branch_1_controller: Dom::from_ref(&branch_1_controller),
466 branch_2_controller: Dom::from_ref(&branch_2_controller),
467 canceled_1,
468 canceled_2,
469 cancel_promise,
470 })),
471 can_gc,
472 );
473
474 let realm = enter_realm(&*global);
475 let comp = InRealm::Entered(&realm);
476
477 self.closed_promise
478 .borrow()
479 .append_native_handler(&handler, comp, can_gc);
480 }
481
482 pub(crate) fn read_all_bytes(
484 &self,
485 cx: SafeJSContext,
486 success_steps: Rc<ReadAllBytesSuccessSteps>,
487 failure_steps: Rc<ReadAllBytesFailureSteps>,
488 can_gc: CanGc,
489 ) {
490 read_loop(self, cx, success_steps, failure_steps, can_gc);
495 }
496
497 pub(crate) fn process_read_requests(
499 &self,
500 cx: SafeJSContext,
501 controller: DomRoot<ReadableByteStreamController>,
502 can_gc: CanGc,
503 ) -> Fallible<()> {
504 while !self.read_requests.borrow().is_empty() {
506 if controller.get_queue_total_size() == 0.0 {
508 return Ok(());
509 }
510
511 let read_request = self.remove_read_request();
514
515 controller.fill_read_request_from_queue(cx, &read_request, can_gc)?;
517 }
518 Ok(())
519 }
520}
521
522impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
523 fn Constructor(
525 global: &GlobalScope,
526 proto: Option<SafeHandleObject>,
527 can_gc: CanGc,
528 stream: &ReadableStream,
529 ) -> Fallible<DomRoot<Self>> {
530 let reader = Self::new_with_proto(global, proto, can_gc);
531
532 Self::set_up(&reader, stream, global, can_gc)?;
534
535 Ok(reader)
536 }
537
538 fn Read(&self, can_gc: CanGc) -> Rc<Promise> {
540 let cx = GlobalScope::get_cx();
541 if self.stream.get().is_none() {
543 rooted!(in(*cx) let mut error = UndefinedValue());
544 Error::Type("stream is undefined".to_owned()).to_jsval(
545 cx,
546 &self.global(),
547 error.handle_mut(),
548 can_gc,
549 );
550 return Promise::new_rejected(&self.global(), cx, error.handle(), can_gc);
551 }
552 let promise = Promise::new(&self.global(), can_gc);
554
555 let read_request = ReadRequest::Read(promise.clone());
569
570 self.read(cx, &read_request, can_gc);
572
573 promise
575 }
576
577 fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
579 if self.stream.get().is_none() {
580 return Ok(());
582 }
583
584 self.release(can_gc)
586 }
587
588 fn Closed(&self) -> Rc<Promise> {
590 self.closed()
591 }
592
593 fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
595 self.generic_cancel(cx, &self.global(), reason, can_gc)
596 }
597}
598
599impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
600 fn get_closed_promise(&self) -> Rc<Promise> {
601 self.closed_promise.borrow().clone()
602 }
603
604 fn set_closed_promise(&self, promise: Rc<Promise>) {
605 *self.closed_promise.borrow_mut() = promise;
606 }
607
608 fn set_stream(&self, stream: Option<&ReadableStream>) {
609 self.stream.set(stream);
610 }
611
612 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
613 self.stream.get()
614 }
615
616 fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
617 Some(self)
618 }
619}