1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::Heap;
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15use script_bindings::cell::DomRefCell;
16use script_bindings::reflector::{Reflector, reflect_dom_object, reflect_dom_object_with_proto};
17
18use super::byteteereadrequest::ByteTeeReadRequest;
19use super::readablebytestreamcontroller::ReadableByteStreamController;
20use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
21 ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
22};
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::DomGlobal;
25use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
26use crate::dom::bindings::trace::RootedTraceableBox;
27use crate::dom::globalscope::GlobalScope;
28use crate::dom::promise::Promise;
29use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
30use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
31use crate::dom::stream::defaultteereadrequest::DefaultTeeReadRequest;
32use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
33use crate::dom::types::ReadableStreamDefaultController;
34use crate::realms::enter_auto_realm;
35use crate::script_runtime::CanGc;
36
/// Callback a `ReadRequest::ReadLoop` invokes from its close steps; it receives
/// the JS context and the bytes accumulated by the loop.
type ReadAllBytesSuccessSteps = dyn Fn(&mut js::context::JSContext, &[u8]);
/// Callback a `ReadRequest::ReadLoop` invokes when a chunk cannot be converted to
/// bytes or when its error steps run; it receives the JS context and the error value.
type ReadAllBytesFailureSteps = dyn Fn(&mut js::context::JSContext, SafeHandleValue);
39
// Marker impl: opts `ContinueReadMicrotask` into the `js::gc::Rootable` trait.
impl js::gc::Rootable for ContinueReadMicrotask {}

/// Promise callback that re-issues a read: when invoked it calls `reader.read`
/// with the stored `request`.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ContinueReadMicrotask {
    /// Reader on which the read is performed.
    reader: Dom<ReadableStreamDefaultReader>,
    /// Read request passed back to the reader.
    request: ReadRequest,
}
53
54impl Callback for ContinueReadMicrotask {
55 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
56 self.reader.read(cx, &self.request);
59 }
60}
61
62fn read_loop(
64 cx: &mut js::context::JSContext,
65 reader: &ReadableStreamDefaultReader,
66 success_steps: Rc<ReadAllBytesSuccessSteps>,
67 failure_steps: Rc<ReadAllBytesFailureSteps>,
68) {
69 let req = ReadRequest::ReadLoop {
74 success_steps,
75 failure_steps,
76 reader: Dom::from_ref(reader),
77 bytes: Rc::new(DomRefCell::new(Vec::new())),
78 };
79 reader.read(cx, &req);
81}
82
/// A pending read on a `ReadableStreamDefaultReader`.
///
/// Each variant decides how chunk, close and error notifications are delivered
/// (see `chunk_steps`, `close_steps` and `error_steps`).
#[derive(Clone, JSTraceable, MallocSizeOf)]
pub(crate) enum ReadRequest {
    /// A read backed by a promise: the promise is resolved with a
    /// `ReadableStreamReadResult` on chunk/close and rejected on error.
    Read(#[conditional_malloc_size_of] Rc<Promise>),
    /// A read whose notifications are forwarded to a `DefaultTeeReadRequest`.
    DefaultTee {
        tee_read_request: Dom<DefaultTeeReadRequest>,
    },
    /// A read that accumulates every chunk's bytes and reports the outcome
    /// through callbacks.
    ReadLoop {
        /// Called with the accumulated bytes from the close steps.
        #[ignore_malloc_size_of = "dyn Fn"]
        #[no_trace]
        success_steps: Rc<ReadAllBytesSuccessSteps>,
        /// Called with the error value when the loop fails.
        #[ignore_malloc_size_of = "dyn Fn"]
        #[no_trace]
        failure_steps: Rc<ReadAllBytesFailureSteps>,
        /// Reader the loop reads from; released from the close and error steps.
        reader: Dom<ReadableStreamDefaultReader>,
        /// Bytes collected so far, shared between clones of this request.
        #[conditional_malloc_size_of]
        bytes: Rc<DomRefCell<Vec<u8>>>,
    },
    /// A read whose notifications are forwarded to a `ByteTeeReadRequest`.
    ByteTee {
        byte_tee_read_request: Dom<ByteTeeReadRequest>,
    },
}
109
110impl ReadRequest {
111 pub(crate) fn chunk_steps(
113 &self,
114 cx: &mut js::context::JSContext,
115 chunk: RootedTraceableBox<Heap<JSVal>>,
116 global: &GlobalScope,
117 ) {
118 match self {
119 ReadRequest::Read(promise) => {
120 promise.resolve_native(
123 &ReadableStreamReadResult {
124 done: Some(false),
125 value: chunk,
126 },
127 CanGc::from_cx(cx),
128 );
129 },
130 ReadRequest::DefaultTee { tee_read_request } => {
131 tee_read_request.enqueue_chunk_steps(chunk);
132 },
133 ReadRequest::ByteTee {
134 byte_tee_read_request,
135 } => {
136 byte_tee_read_request.enqueue_chunk_steps(global, chunk);
137 },
138 ReadRequest::ReadLoop {
139 success_steps: _,
140 failure_steps,
141 reader,
142 bytes,
143 } => {
144 let global = reader.global();
146
147 match bytes_from_chunk_jsval(cx.into(), &chunk, CanGc::from_cx(cx)) {
148 Ok(vec) => {
149 bytes.borrow_mut().extend_from_slice(&vec);
151
152 let tick = Promise::new(&global, CanGc::from_cx(cx));
156 tick.resolve_native(&(), CanGc::from_cx(cx));
157
158 let handler = PromiseNativeHandler::new(
159 &global,
160 Some(Box::new(ContinueReadMicrotask {
161 reader: Dom::from_ref(reader),
162 request: self.clone(),
163 })),
164 None,
165 CanGc::from_cx(cx),
166 );
167
168 let mut realm = enter_auto_realm(cx, &*global);
169 let cx = &mut realm.current_realm();
170 tick.append_native_handler(cx, &handler);
171 },
172 Err(err) => {
173 rooted!(&in(cx) let mut v = UndefinedValue());
175 err.to_jsval(cx.into(), &global, v.handle_mut(), CanGc::from_cx(cx));
176 (failure_steps)(cx, v.handle());
177 },
178 }
179 },
180 }
181 }
182
183 pub(crate) fn close_steps(&self, cx: &mut js::context::JSContext) {
185 match self {
186 ReadRequest::Read(promise) => {
187 let result = RootedTraceableBox::new(Heap::default());
190 result.set(UndefinedValue());
191 promise.resolve_native(
192 &ReadableStreamReadResult {
193 done: Some(true),
194 value: result,
195 },
196 CanGc::from_cx(cx),
197 );
198 },
199 ReadRequest::DefaultTee { tee_read_request } => {
200 tee_read_request.close_steps(cx);
201 },
202 ReadRequest::ByteTee {
203 byte_tee_read_request,
204 } => {
205 byte_tee_read_request
206 .close_steps(cx)
207 .expect("ByteTeeReadRequest close steps should not fail");
208 },
209 ReadRequest::ReadLoop {
210 success_steps,
211 reader,
212 bytes,
213 ..
214 } => {
215 (success_steps)(cx, &bytes.borrow());
217
218 reader
219 .release(cx)
220 .expect("Releasing the read-all-bytes reader should succeed");
221 },
222 }
223 }
224
225 pub(crate) fn error_steps(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
227 match self {
228 ReadRequest::Read(promise) => {
229 promise.reject_native(&e, CanGc::from_cx(cx))
232 },
233 ReadRequest::DefaultTee { tee_read_request } => {
234 tee_read_request.error_steps();
235 },
236 ReadRequest::ByteTee {
237 byte_tee_read_request,
238 } => {
239 byte_tee_read_request.error_steps();
240 },
241 ReadRequest::ReadLoop {
242 failure_steps,
243 reader,
244 ..
245 } => {
246 (failure_steps)(cx, e);
248
249 reader
250 .release(cx)
251 .expect("Releasing the read-all-bytes reader should succeed");
252 },
253 }
254 }
255}
256
/// Rejection handler attached to a reader's closed promise by
/// `byte_tee_append_native_handler_to_closed_promise`.
///
/// When invoked for the expected reader version, it errors both branch
/// controllers and may resolve `cancel_promise` (see its `Callback` impl).
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct ByteTeeClosedPromiseRejectionHandler {
    /// Byte controller of the first branch.
    branch_1_controller: Dom<ReadableByteStreamController>,
    /// Byte controller of the second branch.
    branch_2_controller: Dom<ReadableByteStreamController>,
    /// Shared flag read by the callback for the first branch.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// Shared flag read by the callback for the second branch.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// Promise resolved with undefined unless both flags are set.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
    /// Shared, mutable version counter compared against `expected_version`.
    #[conditional_malloc_size_of]
    reader_version: Rc<Cell<u64>>,
    /// Version this handler was created for; the callback is a no-op when
    /// `reader_version` holds a different value.
    expected_version: u64,
}
274
275impl Callback for ByteTeeClosedPromiseRejectionHandler {
276 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
279 if self.reader_version.get() != self.expected_version {
281 return;
282 }
283
284 self.branch_1_controller.error(cx, v);
286
287 self.branch_2_controller.error(cx, v);
289
290 if !self.canceled_1.get() || !self.canceled_2.get() {
292 self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
293 }
294 }
295}
296
/// Rejection handler attached to a reader's closed promise by
/// `default_tee_append_native_handler_to_closed_promise`.
///
/// When invoked it errors both branch controllers and may resolve
/// `cancel_promise` (see its `Callback` impl).
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct DefaultTeeClosedPromiseRejectionHandler {
    /// Default controller of the first branch.
    branch_1_controller: Dom<ReadableStreamDefaultController>,
    /// Default controller of the second branch.
    branch_2_controller: Dom<ReadableStreamDefaultController>,
    /// Shared flag read by the callback for the first branch.
    #[conditional_malloc_size_of]
    canceled_1: Rc<Cell<bool>>,
    /// Shared flag read by the callback for the second branch.
    #[conditional_malloc_size_of]
    canceled_2: Rc<Cell<bool>>,
    /// Promise resolved with undefined unless both flags are set.
    #[conditional_malloc_size_of]
    cancel_promise: Rc<Promise>,
}
311
312impl Callback for DefaultTeeClosedPromiseRejectionHandler {
313 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
316 self.branch_1_controller.error(cx, v);
318 self.branch_2_controller.error(cx, v);
320
321 if !self.canceled_1.get() || !self.canceled_2.get() {
323 self.cancel_promise.resolve_native(&(), CanGc::from_cx(cx));
324 }
325 }
326}
327
/// DOM object backing a default reader for a `ReadableStream`.
#[dom_struct]
pub(crate) struct ReadableStreamDefaultReader {
    reflector_: Reflector,

    /// Stream this reader is attached to, or `None` when it has no stream.
    stream: MutNullableDom<ReadableStream>,

    /// Queue of pending read requests, served front-first.
    read_requests: DomRefCell<VecDeque<ReadRequest>>,

    /// Promise resolved by `close` and rejected by `error`; it can be swapped
    /// out through `set_closed_promise`.
    #[conditional_malloc_size_of]
    closed_promise: DomRefCell<Rc<Promise>>,
}
342
343impl ReadableStreamDefaultReader {
344 fn new_with_proto(
345 global: &GlobalScope,
346 proto: Option<SafeHandleObject>,
347 can_gc: CanGc,
348 ) -> DomRoot<ReadableStreamDefaultReader> {
349 reflect_dom_object_with_proto(
350 Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)),
351 global,
352 proto,
353 can_gc,
354 )
355 }
356
357 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamDefaultReader {
358 ReadableStreamDefaultReader {
359 reflector_: Reflector::new(),
360 stream: MutNullableDom::new(None),
361 read_requests: DomRefCell::new(Default::default()),
362 closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
363 }
364 }
365
366 pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamDefaultReader> {
367 reflect_dom_object(
368 Box::new(Self::new_inherited(global, can_gc)),
369 global,
370 can_gc,
371 )
372 }
373
374 pub(crate) fn set_up(
376 &self,
377 stream: &ReadableStream,
378 global: &GlobalScope,
379 can_gc: CanGc,
380 ) -> Fallible<()> {
381 if stream.is_locked() {
383 return Err(Error::Type(c"stream is locked".to_owned()));
384 }
385 self.generic_initialize(global, stream, can_gc);
388
389 self.read_requests.borrow_mut().clear();
391
392 Ok(())
393 }
394
395 pub(crate) fn close(&self, cx: &mut js::context::JSContext) {
397 self.closed_promise
399 .borrow()
400 .resolve_native(&(), CanGc::from_cx(cx));
401 let mut read_requests = self.take_read_requests();
404 for request in read_requests.drain(0..) {
407 request.close_steps(cx);
409 }
410 }
411
412 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
414 self.read_requests
415 .borrow_mut()
416 .push_back(read_request.clone());
417 }
418
419 pub(crate) fn get_num_read_requests(&self) -> usize {
421 self.read_requests.borrow().len()
422 }
423
424 pub(crate) fn error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
426 self.closed_promise
428 .borrow()
429 .reject_native(&e, CanGc::from_cx(cx));
430
431 self.closed_promise.borrow().set_promise_is_handled();
433
434 self.error_read_requests(cx, e);
436 }
437
438 pub(crate) fn remove_read_request(&self) -> ReadRequest {
440 self.read_requests
441 .borrow_mut()
442 .pop_front()
443 .expect("Reader must have read request when remove is called into.")
444 }
445
446 pub(crate) fn release(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
448 self.generic_release(CanGc::from_cx(cx))
450 .expect("Generic release failed");
451 rooted!(&in(cx) let mut error = UndefinedValue());
453 Error::Type(c"Reader is released".to_owned()).to_jsval(
454 cx.into(),
455 &self.global(),
456 error.handle_mut(),
457 CanGc::from_cx(cx),
458 );
459
460 self.error_read_requests(cx, error.handle());
462 Ok(())
463 }
464
465 fn take_read_requests(&self) -> VecDeque<ReadRequest> {
466 mem::take(&mut *self.read_requests.borrow_mut())
467 }
468
469 fn error_read_requests(&self, cx: &mut js::context::JSContext, rval: SafeHandleValue) {
471 let mut read_requests = self.take_read_requests();
473
474 for request in read_requests.drain(0..) {
476 request.error_steps(cx, rval);
477 }
478 }
479
480 pub(crate) fn read(&self, cx: &mut js::context::JSContext, read_request: &ReadRequest) {
482 assert!(self.stream.get().is_some());
486
487 let stream = self.stream.get().unwrap();
488
489 stream.set_is_disturbed(true);
491 if stream.is_closed() {
493 read_request.close_steps(cx);
494 } else if stream.is_errored() {
495 rooted!(&in(cx) let mut error = UndefinedValue());
498 stream.get_stored_error(error.handle_mut());
499 read_request.error_steps(cx, error.handle());
500 } else {
501 assert!(stream.is_readable());
504 stream.perform_pull_steps(cx, read_request);
506 }
507 }
508
509 #[allow(clippy::too_many_arguments)]
512 pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
513 &self,
514 cx: &mut js::context::JSContext,
515 branch_1: &ReadableStream,
516 branch_2: &ReadableStream,
517 canceled_1: Rc<Cell<bool>>,
518 canceled_2: Rc<Cell<bool>>,
519 cancel_promise: Rc<Promise>,
520 reader_version: Rc<Cell<u64>>,
521 expected_version: u64,
522 ) {
523 let branch_1_controller = branch_1.get_byte_controller();
525 let branch_2_controller = branch_2.get_byte_controller();
526
527 let global = self.global();
528 let handler = PromiseNativeHandler::new(
529 &global,
530 None,
531 Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
532 branch_1_controller: Dom::from_ref(&branch_1_controller),
533 branch_2_controller: Dom::from_ref(&branch_2_controller),
534 canceled_1,
535 canceled_2,
536 cancel_promise,
537 reader_version,
538 expected_version,
539 })),
540 CanGc::from_cx(cx),
541 );
542
543 let mut realm = enter_auto_realm(cx, &*global);
544 let cx = &mut realm.current_realm();
545
546 self.closed_promise
547 .borrow()
548 .append_native_handler(cx, &handler);
549 }
550
551 pub(crate) fn default_tee_append_native_handler_to_closed_promise(
553 &self,
554 cx: &mut js::context::JSContext,
555 branch_1: &ReadableStream,
556 branch_2: &ReadableStream,
557 canceled_1: Rc<Cell<bool>>,
558 canceled_2: Rc<Cell<bool>>,
559 cancel_promise: Rc<Promise>,
560 ) {
561 let branch_1_controller = branch_1.get_default_controller();
562
563 let branch_2_controller = branch_2.get_default_controller();
564
565 let global = self.global();
566 let handler = PromiseNativeHandler::new(
567 &global,
568 None,
569 Some(Box::new(DefaultTeeClosedPromiseRejectionHandler {
570 branch_1_controller: Dom::from_ref(&branch_1_controller),
571 branch_2_controller: Dom::from_ref(&branch_2_controller),
572 canceled_1,
573 canceled_2,
574 cancel_promise,
575 })),
576 CanGc::from_cx(cx),
577 );
578
579 let mut realm = enter_auto_realm(cx, &*global);
580 let cx = &mut realm.current_realm();
581
582 self.closed_promise
583 .borrow()
584 .append_native_handler(cx, &handler);
585 }
586
    /// Reads from this reader in a loop, accumulating the bytes of every chunk.
    ///
    /// Delegates to `read_loop`: `success_steps` receives the accumulated bytes
    /// when the read request's close steps run, and `failure_steps` receives the
    /// error value if the loop fails.
    pub(crate) fn read_all_bytes(
        &self,
        cx: &mut js::context::JSContext,
        success_steps: Rc<ReadAllBytesSuccessSteps>,
        failure_steps: Rc<ReadAllBytesFailureSteps>,
    ) {
        read_loop(cx, self, success_steps, failure_steps);
    }
600
601 pub(crate) fn process_read_requests(
603 &self,
604 cx: &mut js::context::JSContext,
605 controller: DomRoot<ReadableByteStreamController>,
606 ) -> Fallible<()> {
607 while !self.read_requests.borrow().is_empty() {
609 if controller.get_queue_total_size() == 0.0 {
611 return Ok(());
612 }
613
614 let read_request = self.remove_read_request();
617
618 controller
620 .fill_read_request_from_queue(cx, &read_request)
621 .expect("Fill read request from queue failed");
622 }
623 Ok(())
624 }
625}
626
627impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
628 fn Constructor(
630 global: &GlobalScope,
631 proto: Option<SafeHandleObject>,
632 can_gc: CanGc,
633 stream: &ReadableStream,
634 ) -> Fallible<DomRoot<Self>> {
635 let reader = Self::new_with_proto(global, proto, can_gc);
636
637 Self::set_up(&reader, stream, global, can_gc)?;
639
640 Ok(reader)
641 }
642
643 fn Read(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
645 if self.stream.get().is_none() {
647 rooted!(&in(cx) let mut error = UndefinedValue());
648 Error::Type(c"stream is undefined".to_owned()).to_jsval(
649 cx.into(),
650 &self.global(),
651 error.handle_mut(),
652 CanGc::from_cx(cx),
653 );
654 return Promise::new_rejected(
655 &self.global(),
656 cx.into(),
657 error.handle(),
658 CanGc::from_cx(cx),
659 );
660 }
661 let promise = Promise::new2(cx, &self.global());
663
664 let read_request = ReadRequest::Read(promise.clone());
678
679 self.read(cx, &read_request);
681
682 promise
684 }
685
686 fn ReleaseLock(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
688 if self.stream.get().is_none() {
689 return Ok(());
691 }
692
693 self.release(cx)
695 }
696
697 fn Closed(&self) -> Rc<Promise> {
699 self.closed()
700 }
701
702 fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
704 self.generic_cancel(cx, &self.global(), reason)
705 }
706}
707
708impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
709 fn get_closed_promise(&self) -> Rc<Promise> {
710 self.closed_promise.borrow().clone()
711 }
712
713 fn set_closed_promise(&self, promise: Rc<Promise>) {
714 *self.closed_promise.borrow_mut() = promise;
715 }
716
717 fn set_stream(&self, stream: Option<&ReadableStream>) {
718 self.stream.set(stream);
719 }
720
721 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
722 self.stream.get()
723 }
724
725 fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
726 Some(self)
727 }
728}