1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::Heap;
12use js::jsval::{JSVal, UndefinedValue};
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
14
15use super::bindings::reflector::reflect_dom_object;
16use super::bindings::root::MutNullableDom;
17use super::readablebytestreamcontroller::ReadableByteStreamController;
18use super::types::ReadableStreamDefaultController;
19use crate::dom::bindings::cell::DomRefCell;
20use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
21 ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
22};
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
25use crate::dom::bindings::root::{Dom, DomRoot};
26use crate::dom::bindings::trace::RootedTraceableBox;
27use crate::dom::defaultteereadrequest::DefaultTeeReadRequest;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
32use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader;
33use crate::realms::{InRealm, enter_realm};
34use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
35
36type ReadAllBytesSuccessSteps = dyn Fn(&[u8]);
37type ReadAllBytesFailureSteps = dyn Fn(SafeJSContext, SafeHandleValue);
38
39impl js::gc::Rootable for ContinueReadMicrotask {}
40
41#[derive(Clone, JSTraceable, MallocSizeOf)]
47#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
48struct ContinueReadMicrotask {
49 reader: Dom<ReadableStreamDefaultReader>,
50 request: ReadRequest,
51}
52
53impl Callback for ContinueReadMicrotask {
54 fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
55 self.reader.read(cx, &self.request, can_gc);
58 }
59}
60
61fn read_loop(
63 reader: &ReadableStreamDefaultReader,
64 cx: SafeJSContext,
65 success_steps: Rc<ReadAllBytesSuccessSteps>,
66 failure_steps: Rc<ReadAllBytesFailureSteps>,
67 can_gc: CanGc,
68) {
69 let req = ReadRequest::ReadLoop {
74 success_steps,
75 failure_steps,
76 reader: Dom::from_ref(reader),
77 bytes: Rc::new(DomRefCell::new(Vec::new())),
78 };
79 reader.read(cx, &req, can_gc);
81}
82
83#[derive(Clone, JSTraceable, MallocSizeOf)]
85pub(crate) enum ReadRequest {
86 Read(#[conditional_malloc_size_of] Rc<Promise>),
88 DefaultTee {
90 tee_read_request: Dom<DefaultTeeReadRequest>,
91 },
92 ReadLoop {
95 #[ignore_malloc_size_of = "Rc is hard"]
96 #[no_trace]
97 success_steps: Rc<ReadAllBytesSuccessSteps>,
98 #[ignore_malloc_size_of = "Rc is hard"]
99 #[no_trace]
100 failure_steps: Rc<ReadAllBytesFailureSteps>,
101 reader: Dom<ReadableStreamDefaultReader>,
102 #[conditional_malloc_size_of]
103 bytes: Rc<DomRefCell<Vec<u8>>>,
104 },
105}
106
107impl ReadRequest {
108 pub(crate) fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
110 match self {
111 ReadRequest::Read(promise) => {
112 promise.resolve_native(
115 &ReadableStreamReadResult {
116 done: Some(false),
117 value: chunk,
118 },
119 can_gc,
120 );
121 },
122 ReadRequest::DefaultTee { tee_read_request } => {
123 tee_read_request.enqueue_chunk_steps(chunk);
124 },
125 ReadRequest::ReadLoop {
126 success_steps: _,
127 failure_steps,
128 reader,
129 bytes,
130 } => {
131 let cx = GlobalScope::get_cx();
133 let global = reader.global();
134
135 match bytes_from_chunk_jsval(cx, &chunk, can_gc) {
136 Ok(vec) => {
137 bytes.borrow_mut().extend_from_slice(&vec);
139
140 let tick = Promise::new(&global, can_gc);
144 tick.resolve_native(&(), can_gc);
145
146 let handler = PromiseNativeHandler::new(
147 &global,
148 Some(Box::new(ContinueReadMicrotask {
149 reader: Dom::from_ref(reader),
150 request: self.clone(),
151 })),
152 None,
153 can_gc,
154 );
155
156 let realm = enter_realm(&*global);
157 let comp = InRealm::Entered(&realm);
158 tick.append_native_handler(&handler, comp, can_gc);
159 },
160 Err(err) => {
161 rooted!(in(*cx) let mut v = UndefinedValue());
163 err.to_jsval(cx, &global, v.handle_mut(), can_gc);
164 (failure_steps)(cx, v.handle());
165 },
166 }
167 },
168 }
169 }
170
171 pub(crate) fn close_steps(&self, can_gc: CanGc) {
173 match self {
174 ReadRequest::Read(promise) => {
175 let result = RootedTraceableBox::new(Heap::default());
178 result.set(UndefinedValue());
179 promise.resolve_native(
180 &ReadableStreamReadResult {
181 done: Some(true),
182 value: result,
183 },
184 can_gc,
185 );
186 },
187 ReadRequest::DefaultTee { tee_read_request } => {
188 tee_read_request.close_steps(can_gc);
189 },
190 ReadRequest::ReadLoop {
191 success_steps,
192 reader,
193 bytes,
194 ..
195 } => {
196 (success_steps)(&bytes.borrow());
198
199 reader
200 .release(can_gc)
201 .expect("Releasing the read-all-bytes reader should succeed");
202 },
203 }
204 }
205
206 pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
208 match self {
209 ReadRequest::Read(promise) => {
210 promise.reject_native(&e, can_gc)
213 },
214 ReadRequest::DefaultTee { tee_read_request } => {
215 tee_read_request.error_steps();
216 },
217 ReadRequest::ReadLoop {
218 failure_steps,
219 reader,
220 ..
221 } => {
222 let cx = GlobalScope::get_cx();
224 (failure_steps)(cx, e);
225
226 reader
227 .release(can_gc)
228 .expect("Releasing the read-all-bytes reader should succeed");
229 },
230 }
231 }
232}
233
234#[derive(Clone, JSTraceable, MallocSizeOf)]
237#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
238struct ClosedPromiseRejectionHandler {
239 branch_1_controller: Dom<ReadableStreamDefaultController>,
240 branch_2_controller: Dom<ReadableStreamDefaultController>,
241 #[conditional_malloc_size_of]
242 canceled_1: Rc<Cell<bool>>,
243 #[conditional_malloc_size_of]
244 canceled_2: Rc<Cell<bool>>,
245 #[conditional_malloc_size_of]
246 cancel_promise: Rc<Promise>,
247}
248
249impl Callback for ClosedPromiseRejectionHandler {
250 fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
253 let branch_1_controller = &self.branch_1_controller;
254 let branch_2_controller = &self.branch_2_controller;
255
256 branch_1_controller.error(v, can_gc);
258 branch_2_controller.error(v, can_gc);
260
261 if !self.canceled_1.get() || !self.canceled_2.get() {
263 self.cancel_promise.resolve_native(&(), can_gc);
264 }
265 }
266}
267
268#[dom_struct]
270pub(crate) struct ReadableStreamDefaultReader {
271 reflector_: Reflector,
272
273 stream: MutNullableDom<ReadableStream>,
275
276 read_requests: DomRefCell<VecDeque<ReadRequest>>,
277
278 #[conditional_malloc_size_of]
280 closed_promise: DomRefCell<Rc<Promise>>,
281}
282
283impl ReadableStreamDefaultReader {
284 fn new_with_proto(
285 global: &GlobalScope,
286 proto: Option<SafeHandleObject>,
287 can_gc: CanGc,
288 ) -> DomRoot<ReadableStreamDefaultReader> {
289 reflect_dom_object_with_proto(
290 Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)),
291 global,
292 proto,
293 can_gc,
294 )
295 }
296
297 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamDefaultReader {
298 ReadableStreamDefaultReader {
299 reflector_: Reflector::new(),
300 stream: MutNullableDom::new(None),
301 read_requests: DomRefCell::new(Default::default()),
302 closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
303 }
304 }
305
306 pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamDefaultReader> {
307 reflect_dom_object(
308 Box::new(Self::new_inherited(global, can_gc)),
309 global,
310 can_gc,
311 )
312 }
313
314 pub(crate) fn set_up(
316 &self,
317 stream: &ReadableStream,
318 global: &GlobalScope,
319 can_gc: CanGc,
320 ) -> Fallible<()> {
321 if stream.is_locked() {
323 return Err(Error::Type("stream is locked".to_owned()));
324 }
325 self.generic_initialize(global, stream, can_gc);
328
329 self.read_requests.borrow_mut().clear();
331
332 Ok(())
333 }
334
335 pub(crate) fn close(&self, can_gc: CanGc) {
337 self.closed_promise.borrow().resolve_native(&(), can_gc);
339 let mut read_requests = self.take_read_requests();
342 for request in read_requests.drain(0..) {
345 request.close_steps(can_gc);
347 }
348 }
349
350 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
352 self.read_requests
353 .borrow_mut()
354 .push_back(read_request.clone());
355 }
356
357 pub(crate) fn get_num_read_requests(&self) -> usize {
359 self.read_requests.borrow().len()
360 }
361
362 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
364 self.closed_promise.borrow().reject_native(&e, can_gc);
366
367 self.closed_promise.borrow().set_promise_is_handled();
369
370 self.error_read_requests(e, can_gc);
372 }
373
374 pub(crate) fn remove_read_request(&self) -> ReadRequest {
376 self.read_requests
377 .borrow_mut()
378 .pop_front()
379 .expect("Reader must have read request when remove is called into.")
380 }
381
382 pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
384 self.generic_release(can_gc)
386 .expect("Generic release failed");
387 let cx = GlobalScope::get_cx();
389 rooted!(in(*cx) let mut error = UndefinedValue());
390 Error::Type("Reader is released".to_owned()).to_jsval(
391 cx,
392 &self.global(),
393 error.handle_mut(),
394 can_gc,
395 );
396
397 self.error_read_requests(error.handle(), can_gc);
399 Ok(())
400 }
401
402 fn take_read_requests(&self) -> VecDeque<ReadRequest> {
403 mem::take(&mut *self.read_requests.borrow_mut())
404 }
405
406 fn error_read_requests(&self, rval: SafeHandleValue, can_gc: CanGc) {
408 let mut read_requests = self.take_read_requests();
410
411 for request in read_requests.drain(0..) {
413 request.error_steps(rval, can_gc);
414 }
415 }
416
417 pub(crate) fn read(&self, cx: SafeJSContext, read_request: &ReadRequest, can_gc: CanGc) {
419 assert!(self.stream.get().is_some());
423
424 let stream = self.stream.get().unwrap();
425
426 stream.set_is_disturbed(true);
428 if stream.is_closed() {
430 read_request.close_steps(can_gc);
431 } else if stream.is_errored() {
432 let cx = GlobalScope::get_cx();
435 rooted!(in(*cx) let mut error = UndefinedValue());
436 stream.get_stored_error(error.handle_mut());
437 read_request.error_steps(error.handle(), can_gc);
438 } else {
439 assert!(stream.is_readable());
442 stream.perform_pull_steps(cx, read_request, can_gc);
444 }
445 }
446
447 pub(crate) fn append_native_handler_to_closed_promise(
449 &self,
450 branch_1: &ReadableStream,
451 branch_2: &ReadableStream,
452 canceled_1: Rc<Cell<bool>>,
453 canceled_2: Rc<Cell<bool>>,
454 cancel_promise: Rc<Promise>,
455 can_gc: CanGc,
456 ) {
457 let branch_1_controller = branch_1.get_default_controller();
458
459 let branch_2_controller = branch_2.get_default_controller();
460
461 let global = self.global();
462 let handler = PromiseNativeHandler::new(
463 &global,
464 None,
465 Some(Box::new(ClosedPromiseRejectionHandler {
466 branch_1_controller: Dom::from_ref(&branch_1_controller),
467 branch_2_controller: Dom::from_ref(&branch_2_controller),
468 canceled_1,
469 canceled_2,
470 cancel_promise,
471 })),
472 can_gc,
473 );
474
475 let realm = enter_realm(&*global);
476 let comp = InRealm::Entered(&realm);
477
478 self.closed_promise
479 .borrow()
480 .append_native_handler(&handler, comp, can_gc);
481 }
482
483 pub(crate) fn read_all_bytes(
485 &self,
486 cx: SafeJSContext,
487 success_steps: Rc<ReadAllBytesSuccessSteps>,
488 failure_steps: Rc<ReadAllBytesFailureSteps>,
489 can_gc: CanGc,
490 ) {
491 read_loop(self, cx, success_steps, failure_steps, can_gc);
496 }
497
498 pub(crate) fn process_read_requests(
500 &self,
501 cx: SafeJSContext,
502 controller: DomRoot<ReadableByteStreamController>,
503 can_gc: CanGc,
504 ) -> Fallible<()> {
505 while !self.read_requests.borrow().is_empty() {
507 if controller.get_queue_total_size() == 0.0 {
509 return Ok(());
510 }
511
512 let read_request = self.remove_read_request();
515
516 controller
518 .fill_read_request_from_queue(cx, &read_request, can_gc)
519 .expect("Fill read request from queue failed");
520 }
521 Ok(())
522 }
523}
524
525impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
526 fn Constructor(
528 global: &GlobalScope,
529 proto: Option<SafeHandleObject>,
530 can_gc: CanGc,
531 stream: &ReadableStream,
532 ) -> Fallible<DomRoot<Self>> {
533 let reader = Self::new_with_proto(global, proto, can_gc);
534
535 Self::set_up(&reader, stream, global, can_gc)?;
537
538 Ok(reader)
539 }
540
541 fn Read(&self, can_gc: CanGc) -> Rc<Promise> {
543 let cx = GlobalScope::get_cx();
544 if self.stream.get().is_none() {
546 rooted!(in(*cx) let mut error = UndefinedValue());
547 Error::Type("stream is undefined".to_owned()).to_jsval(
548 cx,
549 &self.global(),
550 error.handle_mut(),
551 can_gc,
552 );
553 return Promise::new_rejected(&self.global(), cx, error.handle(), can_gc);
554 }
555 let promise = Promise::new(&self.global(), can_gc);
557
558 let read_request = ReadRequest::Read(promise.clone());
572
573 self.read(cx, &read_request, can_gc);
575
576 promise
578 }
579
580 fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
582 if self.stream.get().is_none() {
583 return Ok(());
585 }
586
587 self.release(can_gc)
589 }
590
591 fn Closed(&self) -> Rc<Promise> {
593 self.closed()
594 }
595
596 fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
598 self.generic_cancel(cx, &self.global(), reason, can_gc)
599 }
600}
601
602impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
603 fn get_closed_promise(&self) -> Rc<Promise> {
604 self.closed_promise.borrow().clone()
605 }
606
607 fn set_closed_promise(&self, promise: Rc<Promise>) {
608 *self.closed_promise.borrow_mut() = promise;
609 }
610
611 fn set_stream(&self, stream: Option<&ReadableStream>) {
612 self.stream.set(stream);
613 }
614
615 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
616 self.stream.get()
617 }
618
619 fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
620 Some(self)
621 }
622}