1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::Heap;
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15
16use super::byteteereadrequest::ByteTeeReadRequest;
17use super::readablebytestreamcontroller::ReadableByteStreamController;
18use crate::dom::bindings::cell::DomRefCell;
19use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
20 ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
21};
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{
24 DomGlobal, Reflector, reflect_dom_object, reflect_dom_object_with_proto,
25};
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::globalscope::GlobalScope;
29use crate::dom::promise::Promise;
30use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
31use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
32use crate::dom::stream::defaultteereadrequest::DefaultTeeReadRequest;
33use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
34use crate::dom::types::ReadableStreamDefaultController;
35use crate::realms::{InRealm, enter_realm};
36use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
37
/// Callback invoked with the accumulated bytes when a read-all-bytes loop reaches the
/// end of the stream (called from `ReadRequest::close_steps`).
type ReadAllBytesSuccessSteps = dyn Fn(&[u8]);
/// Callback invoked with the JS context and an error value when a read-all-bytes loop
/// fails (called from `ReadRequest::chunk_steps` and `ReadRequest::error_steps`).
type ReadAllBytesFailureSteps = dyn Fn(SafeJSContext, SafeHandleValue);
40
// Marker impl with no items.
// NOTE(review): presumably required so the handler can be rooted by the JS GC
// machinery — confirm against the `js::gc::Rootable` documentation.
impl js::gc::Rootable for ContinueReadMicrotask {}
42
/// Promise fulfillment callback that resumes a read-all-bytes loop: when it fires it
/// calls `read` on `reader` with the stored `request`.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ContinueReadMicrotask {
    /// Reader on which the next read is performed.
    reader: Dom<ReadableStreamDefaultReader>,
    /// Read request handed back to `reader.read` when the callback runs.
    request: ReadRequest,
}
54
55impl Callback for ContinueReadMicrotask {
56 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
57 let can_gc = CanGc::from_cx(cx);
58 self.reader.read(cx.into(), &self.request, can_gc);
61 }
62}
63
64fn read_loop(
66 reader: &ReadableStreamDefaultReader,
67 cx: SafeJSContext,
68 success_steps: Rc<ReadAllBytesSuccessSteps>,
69 failure_steps: Rc<ReadAllBytesFailureSteps>,
70 can_gc: CanGc,
71) {
72 let req = ReadRequest::ReadLoop {
77 success_steps,
78 failure_steps,
79 reader: Dom::from_ref(reader),
80 bytes: Rc::new(DomRefCell::new(Vec::new())),
81 };
82 reader.read(cx, &req, can_gc);
84}
85
/// A pending read on a `ReadableStreamDefaultReader`.
///
/// Each variant decides what happens when a chunk arrives (`chunk_steps`), when the
/// stream closes (`close_steps`) and when the stream errors (`error_steps`).
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub(crate) enum ReadRequest {
    /// Read backed by a promise: resolved with a `ReadableStreamReadResult` on chunk
    /// or close, rejected on error.
    Read(#[conditional_malloc_size_of] Rc<Promise>),
    /// Read whose steps are forwarded to a `DefaultTeeReadRequest`.
    DefaultTee {
        tee_read_request: Dom<DefaultTeeReadRequest>,
    },
    /// Read-all-bytes loop state.
    ReadLoop {
        /// Called with the accumulated bytes in the close steps.
        #[ignore_malloc_size_of = "Rc is hard"]
        #[no_trace]
        success_steps: Rc<ReadAllBytesSuccessSteps>,
        /// Called with an error value when chunk conversion fails or in the error steps.
        #[ignore_malloc_size_of = "Rc is hard"]
        #[no_trace]
        failure_steps: Rc<ReadAllBytesFailureSteps>,
        /// Reader that is read from repeatedly and released once the loop ends.
        reader: Dom<ReadableStreamDefaultReader>,
        /// Bytes collected so far; shared between clones of this request.
        #[conditional_malloc_size_of]
        bytes: Rc<DomRefCell<Vec<u8>>>,
    },
    /// Read whose steps are forwarded to a `ByteTeeReadRequest`.
    ByteTee {
        byte_tee_read_request: Dom<ByteTeeReadRequest>,
    },
}
112
113impl ReadRequest {
114 pub(crate) fn chunk_steps(
116 &self,
117 chunk: RootedTraceableBox<Heap<JSVal>>,
118 global: &GlobalScope,
119 can_gc: CanGc,
120 ) {
121 match self {
122 ReadRequest::Read(promise) => {
123 promise.resolve_native(
126 &ReadableStreamReadResult {
127 done: Some(false),
128 value: chunk,
129 },
130 can_gc,
131 );
132 },
133 ReadRequest::DefaultTee { tee_read_request } => {
134 tee_read_request.enqueue_chunk_steps(chunk);
135 },
136 ReadRequest::ByteTee {
137 byte_tee_read_request,
138 } => {
139 byte_tee_read_request.enqueue_chunk_steps(global, chunk);
140 },
141 ReadRequest::ReadLoop {
142 success_steps: _,
143 failure_steps,
144 reader,
145 bytes,
146 } => {
147 let cx = GlobalScope::get_cx();
149 let global = reader.global();
150
151 match bytes_from_chunk_jsval(cx, &chunk, can_gc) {
152 Ok(vec) => {
153 bytes.borrow_mut().extend_from_slice(&vec);
155
156 let tick = Promise::new(&global, can_gc);
160 tick.resolve_native(&(), can_gc);
161
162 let handler = PromiseNativeHandler::new(
163 &global,
164 Some(Box::new(ContinueReadMicrotask {
165 reader: Dom::from_ref(reader),
166 request: self.clone(),
167 })),
168 None,
169 can_gc,
170 );
171
172 let realm = enter_realm(&*global);
173 let comp = InRealm::Entered(&realm);
174 tick.append_native_handler(&handler, comp, can_gc);
175 },
176 Err(err) => {
177 rooted!(in(*cx) let mut v = UndefinedValue());
179 err.to_jsval(cx, &global, v.handle_mut(), can_gc);
180 (failure_steps)(cx, v.handle());
181 },
182 }
183 },
184 }
185 }
186
187 pub(crate) fn close_steps(&self, can_gc: CanGc) {
189 match self {
190 ReadRequest::Read(promise) => {
191 let result = RootedTraceableBox::new(Heap::default());
194 result.set(UndefinedValue());
195 promise.resolve_native(
196 &ReadableStreamReadResult {
197 done: Some(true),
198 value: result,
199 },
200 can_gc,
201 );
202 },
203 ReadRequest::DefaultTee { tee_read_request } => {
204 tee_read_request.close_steps(can_gc);
205 },
206 ReadRequest::ByteTee {
207 byte_tee_read_request,
208 } => {
209 byte_tee_read_request
210 .close_steps(can_gc)
211 .expect("ByteTeeReadRequest close steps should not fail");
212 },
213 ReadRequest::ReadLoop {
214 success_steps,
215 reader,
216 bytes,
217 ..
218 } => {
219 (success_steps)(&bytes.borrow());
221
222 reader
223 .release(can_gc)
224 .expect("Releasing the read-all-bytes reader should succeed");
225 },
226 }
227 }
228
229 pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
231 match self {
232 ReadRequest::Read(promise) => {
233 promise.reject_native(&e, can_gc)
236 },
237 ReadRequest::DefaultTee { tee_read_request } => {
238 tee_read_request.error_steps();
239 },
240 ReadRequest::ByteTee {
241 byte_tee_read_request,
242 } => {
243 byte_tee_read_request.error_steps();
244 },
245 ReadRequest::ReadLoop {
246 failure_steps,
247 reader,
248 ..
249 } => {
250 let cx = GlobalScope::get_cx();
252 (failure_steps)(cx, e);
253
254 reader
255 .release(can_gc)
256 .expect("Releasing the read-all-bytes reader should succeed");
257 },
258 }
259 }
260}
261
/// Rejection callback attached to a reader's closed promise by
/// `byte_tee_append_native_handler_to_closed_promise`.
///
/// When it runs with a matching reader version it errors both branch controllers and
/// resolves `cancel_promise` unless both branches are already canceled.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ByteTeeClosedPromiseRejectionHandler {
    /// Controller of the first branch; errored with the rejection value.
    branch_1_controller: Dom<ReadableByteStreamController>,
    /// Controller of the second branch; errored with the rejection value.
    branch_2_controller: Dom<ReadableByteStreamController>,
    /// Shared flag read to decide whether `cancel_promise` is resolved.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// Shared flag read to decide whether `cancel_promise` is resolved.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// Promise resolved with `()` when at least one of the flags is still `false`.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
    /// Shared counter compared against `expected_version` when the callback runs.
    #[conditional_malloc_size_of]
    reader_version: Rc<Cell<u64>>,
    /// Version recorded when the handler was created; the callback does nothing if
    /// `reader_version` no longer equals it.
    expected_version: u64,
}
279
280impl Callback for ByteTeeClosedPromiseRejectionHandler {
281 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
284 let can_gc = CanGc::from_cx(cx);
285 if self.reader_version.get() != self.expected_version {
287 return;
288 }
289
290 self.branch_1_controller.error(v, can_gc);
292
293 self.branch_2_controller.error(v, can_gc);
295
296 if !self.canceled_1.get() || !self.canceled_2.get() {
298 self.cancel_promise.resolve_native(&(), can_gc);
299 }
300 }
301}
302
/// Rejection callback attached to a reader's closed promise by
/// `default_tee_append_native_handler_to_closed_promise`.
///
/// When it runs it errors both branch controllers and resolves `cancel_promise`
/// unless both branches are already canceled.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct DefaultTeeClosedPromiseRejectionHandler {
    /// Controller of the first branch; errored with the rejection value.
    branch_1_controller: Dom<ReadableStreamDefaultController>,
    /// Controller of the second branch; errored with the rejection value.
    branch_2_controller: Dom<ReadableStreamDefaultController>,
    /// Shared flag read to decide whether `cancel_promise` is resolved.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// Shared flag read to decide whether `cancel_promise` is resolved.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// Promise resolved with `()` when at least one of the flags is still `false`.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
}
317
318impl Callback for DefaultTeeClosedPromiseRejectionHandler {
319 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
322 let can_gc = CanGc::from_cx(cx);
323 self.branch_1_controller.error(v, can_gc);
325 self.branch_2_controller.error(v, can_gc);
327
328 if !self.canceled_1.get() || !self.canceled_2.get() {
330 self.cancel_promise.resolve_native(&(), can_gc);
331 }
332 }
333}
334
/// DOM object backing the `ReadableStreamDefaultReader` interface.
#[dom_struct]
pub(crate) struct ReadableStreamDefaultReader {
    reflector_: Reflector,

    /// Stream this reader is attached to; `None` when no stream is attached.
    stream: MutNullableDom<ReadableStream>,

    /// Pending read requests, appended at the back and removed from the front.
    read_requests: DomRefCell<VecDeque<ReadRequest>>,

    /// Promise resolved by `close` and rejected by `error`; it can be swapped out
    /// through `set_closed_promise`.
    #[conditional_malloc_size_of]
    closed_promise: DomRefCell<Rc<Promise>>,
}
349
350impl ReadableStreamDefaultReader {
351 fn new_with_proto(
352 global: &GlobalScope,
353 proto: Option<SafeHandleObject>,
354 can_gc: CanGc,
355 ) -> DomRoot<ReadableStreamDefaultReader> {
356 reflect_dom_object_with_proto(
357 Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)),
358 global,
359 proto,
360 can_gc,
361 )
362 }
363
364 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamDefaultReader {
365 ReadableStreamDefaultReader {
366 reflector_: Reflector::new(),
367 stream: MutNullableDom::new(None),
368 read_requests: DomRefCell::new(Default::default()),
369 closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
370 }
371 }
372
373 pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamDefaultReader> {
374 reflect_dom_object(
375 Box::new(Self::new_inherited(global, can_gc)),
376 global,
377 can_gc,
378 )
379 }
380
381 pub(crate) fn set_up(
383 &self,
384 stream: &ReadableStream,
385 global: &GlobalScope,
386 can_gc: CanGc,
387 ) -> Fallible<()> {
388 if stream.is_locked() {
390 return Err(Error::Type("stream is locked".to_owned()));
391 }
392 self.generic_initialize(global, stream, can_gc);
395
396 self.read_requests.borrow_mut().clear();
398
399 Ok(())
400 }
401
402 pub(crate) fn close(&self, can_gc: CanGc) {
404 self.closed_promise.borrow().resolve_native(&(), can_gc);
406 let mut read_requests = self.take_read_requests();
409 for request in read_requests.drain(0..) {
412 request.close_steps(can_gc);
414 }
415 }
416
417 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
419 self.read_requests
420 .borrow_mut()
421 .push_back(read_request.clone());
422 }
423
424 pub(crate) fn get_num_read_requests(&self) -> usize {
426 self.read_requests.borrow().len()
427 }
428
429 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
431 self.closed_promise.borrow().reject_native(&e, can_gc);
433
434 self.closed_promise.borrow().set_promise_is_handled();
436
437 self.error_read_requests(e, can_gc);
439 }
440
441 pub(crate) fn remove_read_request(&self) -> ReadRequest {
443 self.read_requests
444 .borrow_mut()
445 .pop_front()
446 .expect("Reader must have read request when remove is called into.")
447 }
448
449 pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
451 self.generic_release(can_gc)
453 .expect("Generic release failed");
454 let cx = GlobalScope::get_cx();
456 rooted!(in(*cx) let mut error = UndefinedValue());
457 Error::Type("Reader is released".to_owned()).to_jsval(
458 cx,
459 &self.global(),
460 error.handle_mut(),
461 can_gc,
462 );
463
464 self.error_read_requests(error.handle(), can_gc);
466 Ok(())
467 }
468
469 fn take_read_requests(&self) -> VecDeque<ReadRequest> {
470 mem::take(&mut *self.read_requests.borrow_mut())
471 }
472
473 fn error_read_requests(&self, rval: SafeHandleValue, can_gc: CanGc) {
475 let mut read_requests = self.take_read_requests();
477
478 for request in read_requests.drain(0..) {
480 request.error_steps(rval, can_gc);
481 }
482 }
483
484 pub(crate) fn read(&self, cx: SafeJSContext, read_request: &ReadRequest, can_gc: CanGc) {
486 assert!(self.stream.get().is_some());
490
491 let stream = self.stream.get().unwrap();
492
493 stream.set_is_disturbed(true);
495 if stream.is_closed() {
497 read_request.close_steps(can_gc);
498 } else if stream.is_errored() {
499 let cx = GlobalScope::get_cx();
502 rooted!(in(*cx) let mut error = UndefinedValue());
503 stream.get_stored_error(error.handle_mut());
504 read_request.error_steps(error.handle(), can_gc);
505 } else {
506 assert!(stream.is_readable());
509 stream.perform_pull_steps(cx, read_request, can_gc);
511 }
512 }
513
514 #[allow(clippy::too_many_arguments)]
517 pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
518 &self,
519 branch_1: &ReadableStream,
520 branch_2: &ReadableStream,
521 canceled_1: Rc<Cell<bool>>,
522 canceled_2: Rc<Cell<bool>>,
523 cancel_promise: Rc<Promise>,
524 reader_version: Rc<Cell<u64>>,
525 expected_version: u64,
526 can_gc: CanGc,
527 ) {
528 let branch_1_controller = branch_1.get_byte_controller();
530 let branch_2_controller = branch_2.get_byte_controller();
531
532 let global = self.global();
533 let handler = PromiseNativeHandler::new(
534 &global,
535 None,
536 Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
537 branch_1_controller: Dom::from_ref(&branch_1_controller),
538 branch_2_controller: Dom::from_ref(&branch_2_controller),
539 canceled_1,
540 canceled_2,
541 cancel_promise,
542 reader_version,
543 expected_version,
544 })),
545 can_gc,
546 );
547
548 let realm = enter_realm(&*global);
549 let comp = InRealm::Entered(&realm);
550
551 self.closed_promise
552 .borrow()
553 .append_native_handler(&handler, comp, can_gc);
554 }
555
556 pub(crate) fn default_tee_append_native_handler_to_closed_promise(
558 &self,
559 branch_1: &ReadableStream,
560 branch_2: &ReadableStream,
561 canceled_1: Rc<Cell<bool>>,
562 canceled_2: Rc<Cell<bool>>,
563 cancel_promise: Rc<Promise>,
564 can_gc: CanGc,
565 ) {
566 let branch_1_controller = branch_1.get_default_controller();
567
568 let branch_2_controller = branch_2.get_default_controller();
569
570 let global = self.global();
571 let handler = PromiseNativeHandler::new(
572 &global,
573 None,
574 Some(Box::new(DefaultTeeClosedPromiseRejectionHandler {
575 branch_1_controller: Dom::from_ref(&branch_1_controller),
576 branch_2_controller: Dom::from_ref(&branch_2_controller),
577 canceled_1,
578 canceled_2,
579 cancel_promise,
580 })),
581 can_gc,
582 );
583
584 let realm = enter_realm(&*global);
585 let comp = InRealm::Entered(&realm);
586
587 self.closed_promise
588 .borrow()
589 .append_native_handler(&handler, comp, can_gc);
590 }
591
592 pub(crate) fn read_all_bytes(
594 &self,
595 cx: SafeJSContext,
596 success_steps: Rc<ReadAllBytesSuccessSteps>,
597 failure_steps: Rc<ReadAllBytesFailureSteps>,
598 can_gc: CanGc,
599 ) {
600 read_loop(self, cx, success_steps, failure_steps, can_gc);
605 }
606
607 pub(crate) fn process_read_requests(
609 &self,
610 cx: SafeJSContext,
611 controller: DomRoot<ReadableByteStreamController>,
612 can_gc: CanGc,
613 ) -> Fallible<()> {
614 while !self.read_requests.borrow().is_empty() {
616 if controller.get_queue_total_size() == 0.0 {
618 return Ok(());
619 }
620
621 let read_request = self.remove_read_request();
624
625 controller
627 .fill_read_request_from_queue(cx, &read_request, can_gc)
628 .expect("Fill read request from queue failed");
629 }
630 Ok(())
631 }
632}
633
634impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
635 fn Constructor(
637 global: &GlobalScope,
638 proto: Option<SafeHandleObject>,
639 can_gc: CanGc,
640 stream: &ReadableStream,
641 ) -> Fallible<DomRoot<Self>> {
642 let reader = Self::new_with_proto(global, proto, can_gc);
643
644 Self::set_up(&reader, stream, global, can_gc)?;
646
647 Ok(reader)
648 }
649
650 fn Read(&self, can_gc: CanGc) -> Rc<Promise> {
652 let cx = GlobalScope::get_cx();
653 if self.stream.get().is_none() {
655 rooted!(in(*cx) let mut error = UndefinedValue());
656 Error::Type("stream is undefined".to_owned()).to_jsval(
657 cx,
658 &self.global(),
659 error.handle_mut(),
660 can_gc,
661 );
662 return Promise::new_rejected(&self.global(), cx, error.handle(), can_gc);
663 }
664 let promise = Promise::new(&self.global(), can_gc);
666
667 let read_request = ReadRequest::Read(promise.clone());
681
682 self.read(cx, &read_request, can_gc);
684
685 promise
687 }
688
689 fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
691 if self.stream.get().is_none() {
692 return Ok(());
694 }
695
696 self.release(can_gc)
698 }
699
700 fn Closed(&self) -> Rc<Promise> {
702 self.closed()
703 }
704
705 fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
707 self.generic_cancel(cx, &self.global(), reason, can_gc)
708 }
709}
710
711impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
712 fn get_closed_promise(&self) -> Rc<Promise> {
713 self.closed_promise.borrow().clone()
714 }
715
716 fn set_closed_promise(&self, promise: Rc<Promise>) {
717 *self.closed_promise.borrow_mut() = promise;
718 }
719
720 fn set_stream(&self, stream: Option<&ReadableStream>) {
721 self.stream.set(stream);
722 }
723
724 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
725 self.stream.get()
726 }
727
728 fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
729 Some(self)
730 }
731}