1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::Heap;
12use js::jsval::{JSVal, UndefinedValue};
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15use script_bindings::cell::DomRefCell;
16use script_bindings::reflector::{Reflector, reflect_dom_object, reflect_dom_object_with_proto};
17
18use super::byteteereadrequest::ByteTeeReadRequest;
19use super::readablebytestreamcontroller::ReadableByteStreamController;
20use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
21 ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
22};
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::DomGlobal;
25use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
26use crate::dom::bindings::trace::RootedTraceableBox;
27use crate::dom::globalscope::GlobalScope;
28use crate::dom::promise::Promise;
29use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
30use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
31use crate::dom::stream::defaultteereadrequest::DefaultTeeReadRequest;
32use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
33use crate::dom::types::ReadableStreamDefaultController;
34use crate::realms::enter_auto_realm;
35use crate::script_runtime::CanGc;
36
37type ReadAllBytesSuccessSteps = dyn Fn(&mut js::context::JSContext, &[u8]);
38type ReadAllBytesFailureSteps = dyn Fn(&mut js::context::JSContext, SafeHandleValue);
39
40impl js::gc::Rootable for ContinueReadMicrotask {}
41
42#[derive(Clone, JSTraceable, MallocSizeOf)]
48#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
49struct ContinueReadMicrotask {
50 reader: Dom<ReadableStreamDefaultReader>,
51 request: ReadRequest,
52}
53
54impl Callback for ContinueReadMicrotask {
55 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
56 self.reader.read(cx, &self.request);
59 }
60}
61
62fn read_loop(
64 cx: &mut js::context::JSContext,
65 reader: &ReadableStreamDefaultReader,
66 success_steps: Rc<ReadAllBytesSuccessSteps>,
67 failure_steps: Rc<ReadAllBytesFailureSteps>,
68) {
69 let req = ReadRequest::ReadLoop {
74 success_steps,
75 failure_steps,
76 reader: Dom::from_ref(reader),
77 bytes: Rc::new(DomRefCell::new(Vec::new())),
78 };
79 reader.read(cx, &req);
81}
82
83#[derive(Clone, JSTraceable, MallocSizeOf)]
85pub(crate) enum ReadRequest {
86 Read(#[conditional_malloc_size_of] Rc<Promise>),
88 DefaultTee {
90 tee_read_request: Dom<DefaultTeeReadRequest>,
91 },
92 ReadLoop {
95 #[ignore_malloc_size_of = "dyn Fn"]
96 #[no_trace]
97 success_steps: Rc<ReadAllBytesSuccessSteps>,
98 #[ignore_malloc_size_of = "dyn Fn"]
99 #[no_trace]
100 failure_steps: Rc<ReadAllBytesFailureSteps>,
101 reader: Dom<ReadableStreamDefaultReader>,
102 #[conditional_malloc_size_of]
103 bytes: Rc<DomRefCell<Vec<u8>>>,
104 },
105 ByteTee {
106 byte_tee_read_request: Dom<ByteTeeReadRequest>,
107 },
108}
109
110impl ReadRequest {
111 pub(crate) fn chunk_steps(
113 &self,
114 cx: &mut js::context::JSContext,
115 chunk: RootedTraceableBox<Heap<JSVal>>,
116 global: &GlobalScope,
117 ) {
118 match self {
119 ReadRequest::Read(promise) => {
120 promise.resolve_native(
123 &ReadableStreamReadResult {
124 done: Some(false),
125 value: chunk,
126 },
127 CanGc::from_cx(cx),
128 );
129 },
130 ReadRequest::DefaultTee { tee_read_request } => {
131 tee_read_request.enqueue_chunk_steps(chunk);
132 },
133 ReadRequest::ByteTee {
134 byte_tee_read_request,
135 } => {
136 byte_tee_read_request.enqueue_chunk_steps(global, chunk);
137 },
138 ReadRequest::ReadLoop {
139 success_steps: _,
140 failure_steps,
141 reader,
142 bytes,
143 } => {
144 let global = reader.global();
146
147 match bytes_from_chunk_jsval(cx, &chunk) {
148 Ok(vec) => {
149 bytes.borrow_mut().extend_from_slice(&vec);
151
152 let tick = Promise::new2(cx, &global);
156 tick.resolve_native_with_cx(cx, &());
157
158 let handler = PromiseNativeHandler::new(
159 cx,
160 &global,
161 Some(Box::new(ContinueReadMicrotask {
162 reader: Dom::from_ref(reader),
163 request: self.clone(),
164 })),
165 None,
166 );
167
168 let mut realm = enter_auto_realm(cx, &*global);
169 let cx = &mut realm.current_realm();
170 tick.append_native_handler(cx, &handler);
171 },
172 Err(err) => {
173 rooted!(&in(cx) let mut v = UndefinedValue());
175 err.to_jsval(cx.into(), &global, v.handle_mut(), CanGc::from_cx(cx));
176 (failure_steps)(cx, v.handle());
177 },
178 }
179 },
180 }
181 }
182
183 pub(crate) fn close_steps(&self, cx: &mut js::context::JSContext) {
185 match self {
186 ReadRequest::Read(promise) => {
187 let result = RootedTraceableBox::new(Heap::default());
190 result.set(UndefinedValue());
191 promise.resolve_native(
192 &ReadableStreamReadResult {
193 done: Some(true),
194 value: result,
195 },
196 CanGc::from_cx(cx),
197 );
198 },
199 ReadRequest::DefaultTee { tee_read_request } => {
200 tee_read_request.close_steps(cx);
201 },
202 ReadRequest::ByteTee {
203 byte_tee_read_request,
204 } => {
205 byte_tee_read_request
206 .close_steps(cx)
207 .expect("ByteTeeReadRequest close steps should not fail");
208 },
209 ReadRequest::ReadLoop {
210 success_steps,
211 reader,
212 bytes,
213 ..
214 } => {
215 (success_steps)(cx, &bytes.borrow());
217
218 reader
219 .release(cx)
220 .expect("Releasing the read-all-bytes reader should succeed");
221 },
222 }
223 }
224
225 pub(crate) fn error_steps(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
227 match self {
228 ReadRequest::Read(promise) => {
229 promise.reject_native(&e, CanGc::from_cx(cx))
232 },
233 ReadRequest::DefaultTee { tee_read_request } => {
234 tee_read_request.error_steps();
235 },
236 ReadRequest::ByteTee {
237 byte_tee_read_request,
238 } => {
239 byte_tee_read_request.error_steps();
240 },
241 ReadRequest::ReadLoop {
242 failure_steps,
243 reader,
244 ..
245 } => {
246 (failure_steps)(cx, e);
248
249 reader
250 .release(cx)
251 .expect("Releasing the read-all-bytes reader should succeed");
252 },
253 }
254 }
255}
256
257#[derive(Clone, JSTraceable, MallocSizeOf)]
260#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
261struct ByteTeeClosedPromiseRejectionHandler {
262 branch_1_controller: Dom<ReadableByteStreamController>,
263 branch_2_controller: Dom<ReadableByteStreamController>,
264 #[conditional_malloc_size_of]
265 canceled_1: Rc<Cell<bool>>,
266 #[conditional_malloc_size_of]
267 canceled_2: Rc<Cell<bool>>,
268 #[conditional_malloc_size_of]
269 cancel_promise: Rc<Promise>,
270 #[conditional_malloc_size_of]
271 reader_version: Rc<Cell<u64>>,
272 expected_version: u64,
273}
274
275impl Callback for ByteTeeClosedPromiseRejectionHandler {
276 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
279 if self.reader_version.get() != self.expected_version {
281 return;
282 }
283
284 self.branch_1_controller.error(cx, v);
286
287 self.branch_2_controller.error(cx, v);
289
290 if !self.canceled_1.get() || !self.canceled_2.get() {
292 self.cancel_promise.resolve_native_with_cx(cx, &());
293 }
294 }
295}
296
297#[derive(Clone, JSTraceable, MallocSizeOf)]
300#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
301struct DefaultTeeClosedPromiseRejectionHandler {
302 branch_1_controller: Dom<ReadableStreamDefaultController>,
303 branch_2_controller: Dom<ReadableStreamDefaultController>,
304 #[conditional_malloc_size_of]
305 canceled_1: Rc<Cell<bool>>,
306 #[conditional_malloc_size_of]
307 canceled_2: Rc<Cell<bool>>,
308 #[conditional_malloc_size_of]
309 cancel_promise: Rc<Promise>,
310}
311
312impl Callback for DefaultTeeClosedPromiseRejectionHandler {
313 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
316 self.branch_1_controller.error(cx, v);
318 self.branch_2_controller.error(cx, v);
320
321 if !self.canceled_1.get() || !self.canceled_2.get() {
323 self.cancel_promise.resolve_native_with_cx(cx, &());
324 }
325 }
326}
327
328#[dom_struct]
330pub(crate) struct ReadableStreamDefaultReader {
331 reflector_: Reflector,
332
333 stream: MutNullableDom<ReadableStream>,
335
336 read_requests: DomRefCell<VecDeque<ReadRequest>>,
337
338 #[conditional_malloc_size_of]
340 closed_promise: DomRefCell<Rc<Promise>>,
341}
342
343impl ReadableStreamDefaultReader {
344 fn new_with_proto(
345 global: &GlobalScope,
346 proto: Option<SafeHandleObject>,
347 can_gc: CanGc,
348 ) -> DomRoot<ReadableStreamDefaultReader> {
349 reflect_dom_object_with_proto(
350 Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)),
351 global,
352 proto,
353 can_gc,
354 )
355 }
356
357 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamDefaultReader {
358 ReadableStreamDefaultReader {
359 reflector_: Reflector::new(),
360 stream: MutNullableDom::new(None),
361 read_requests: DomRefCell::new(Default::default()),
362 closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
363 }
364 }
365
366 pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamDefaultReader> {
367 reflect_dom_object(
368 Box::new(Self::new_inherited(global, can_gc)),
369 global,
370 can_gc,
371 )
372 }
373
374 pub(crate) fn set_up(
376 &self,
377 stream: &ReadableStream,
378 global: &GlobalScope,
379 can_gc: CanGc,
380 ) -> Fallible<()> {
381 if stream.is_locked() {
383 return Err(Error::Type(c"stream is locked".to_owned()));
384 }
385 self.generic_initialize(global, stream, can_gc);
388
389 self.read_requests.borrow_mut().clear();
391
392 Ok(())
393 }
394
395 pub(crate) fn close(&self, cx: &mut js::context::JSContext) {
397 self.closed_promise.borrow().resolve_native_with_cx(cx, &());
399 let mut read_requests = self.take_read_requests();
402 for request in read_requests.drain(0..) {
405 request.close_steps(cx);
407 }
408 }
409
410 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
412 self.read_requests
413 .borrow_mut()
414 .push_back(read_request.clone());
415 }
416
417 pub(crate) fn get_num_read_requests(&self) -> usize {
419 self.read_requests.borrow().len()
420 }
421
422 pub(crate) fn error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
424 self.closed_promise.borrow().reject_native_with_cx(cx, &e);
426
427 self.closed_promise.borrow().set_promise_is_handled();
429
430 self.error_read_requests(cx, e);
432 }
433
434 pub(crate) fn remove_read_request(&self) -> ReadRequest {
436 self.read_requests
437 .borrow_mut()
438 .pop_front()
439 .expect("Reader must have read request when remove is called into.")
440 }
441
442 pub(crate) fn release(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
444 self.generic_release(CanGc::from_cx(cx))
446 .expect("Generic release failed");
447 rooted!(&in(cx) let mut error = UndefinedValue());
449 Error::Type(c"Reader is released".to_owned()).to_jsval(
450 cx.into(),
451 &self.global(),
452 error.handle_mut(),
453 CanGc::from_cx(cx),
454 );
455
456 self.error_read_requests(cx, error.handle());
458 Ok(())
459 }
460
461 fn take_read_requests(&self) -> VecDeque<ReadRequest> {
462 mem::take(&mut *self.read_requests.borrow_mut())
463 }
464
465 fn error_read_requests(&self, cx: &mut js::context::JSContext, rval: SafeHandleValue) {
467 let mut read_requests = self.take_read_requests();
469
470 for request in read_requests.drain(0..) {
472 request.error_steps(cx, rval);
473 }
474 }
475
476 pub(crate) fn read(&self, cx: &mut js::context::JSContext, read_request: &ReadRequest) {
478 assert!(self.stream.get().is_some());
482
483 let stream = self.stream.get().unwrap();
484
485 stream.set_is_disturbed(true);
487 if stream.is_closed() {
489 read_request.close_steps(cx);
490 } else if stream.is_errored() {
491 rooted!(&in(cx) let mut error = UndefinedValue());
494 stream.get_stored_error(error.handle_mut());
495 read_request.error_steps(cx, error.handle());
496 } else {
497 assert!(stream.is_readable());
500 stream.perform_pull_steps(cx, read_request);
502 }
503 }
504
505 #[allow(clippy::too_many_arguments)]
508 pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
509 &self,
510 cx: &mut js::context::JSContext,
511 branch_1: &ReadableStream,
512 branch_2: &ReadableStream,
513 canceled_1: Rc<Cell<bool>>,
514 canceled_2: Rc<Cell<bool>>,
515 cancel_promise: Rc<Promise>,
516 reader_version: Rc<Cell<u64>>,
517 expected_version: u64,
518 ) {
519 let branch_1_controller = branch_1.get_byte_controller();
521 let branch_2_controller = branch_2.get_byte_controller();
522
523 let global = self.global();
524 let handler = PromiseNativeHandler::new(
525 cx,
526 &global,
527 None,
528 Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
529 branch_1_controller: Dom::from_ref(&branch_1_controller),
530 branch_2_controller: Dom::from_ref(&branch_2_controller),
531 canceled_1,
532 canceled_2,
533 cancel_promise,
534 reader_version,
535 expected_version,
536 })),
537 );
538
539 let mut realm = enter_auto_realm(cx, &*global);
540 let cx = &mut realm.current_realm();
541
542 self.closed_promise
543 .borrow()
544 .append_native_handler(cx, &handler);
545 }
546
547 pub(crate) fn default_tee_append_native_handler_to_closed_promise(
549 &self,
550 cx: &mut js::context::JSContext,
551 branch_1: &ReadableStream,
552 branch_2: &ReadableStream,
553 canceled_1: Rc<Cell<bool>>,
554 canceled_2: Rc<Cell<bool>>,
555 cancel_promise: Rc<Promise>,
556 ) {
557 let branch_1_controller = branch_1.get_default_controller();
558
559 let branch_2_controller = branch_2.get_default_controller();
560
561 let global = self.global();
562 let handler = PromiseNativeHandler::new(
563 cx,
564 &global,
565 None,
566 Some(Box::new(DefaultTeeClosedPromiseRejectionHandler {
567 branch_1_controller: Dom::from_ref(&branch_1_controller),
568 branch_2_controller: Dom::from_ref(&branch_2_controller),
569 canceled_1,
570 canceled_2,
571 cancel_promise,
572 })),
573 );
574
575 let mut realm = enter_auto_realm(cx, &*global);
576 let cx = &mut realm.current_realm();
577
578 self.closed_promise
579 .borrow()
580 .append_native_handler(cx, &handler);
581 }
582
583 pub(crate) fn read_all_bytes(
585 &self,
586 cx: &mut js::context::JSContext,
587 success_steps: Rc<ReadAllBytesSuccessSteps>,
588 failure_steps: Rc<ReadAllBytesFailureSteps>,
589 ) {
590 read_loop(cx, self, success_steps, failure_steps);
595 }
596
597 pub(crate) fn process_read_requests(
599 &self,
600 cx: &mut js::context::JSContext,
601 controller: DomRoot<ReadableByteStreamController>,
602 ) -> Fallible<()> {
603 while !self.read_requests.borrow().is_empty() {
605 if controller.get_queue_total_size() == 0.0 {
607 return Ok(());
608 }
609
610 let read_request = self.remove_read_request();
613
614 controller
616 .fill_read_request_from_queue(cx, &read_request)
617 .expect("Fill read request from queue failed");
618 }
619 Ok(())
620 }
621}
622
623impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
624 fn Constructor(
626 global: &GlobalScope,
627 proto: Option<SafeHandleObject>,
628 can_gc: CanGc,
629 stream: &ReadableStream,
630 ) -> Fallible<DomRoot<Self>> {
631 let reader = Self::new_with_proto(global, proto, can_gc);
632
633 Self::set_up(&reader, stream, global, can_gc)?;
635
636 Ok(reader)
637 }
638
639 fn Read(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
641 if self.stream.get().is_none() {
643 rooted!(&in(cx) let mut error = UndefinedValue());
644 Error::Type(c"stream is undefined".to_owned()).to_jsval(
645 cx.into(),
646 &self.global(),
647 error.handle_mut(),
648 CanGc::from_cx(cx),
649 );
650 return Promise::new_rejected(
651 &self.global(),
652 cx.into(),
653 error.handle(),
654 CanGc::from_cx(cx),
655 );
656 }
657 let promise = Promise::new2(cx, &self.global());
659
660 let read_request = ReadRequest::Read(promise.clone());
674
675 self.read(cx, &read_request);
677
678 promise
680 }
681
682 fn ReleaseLock(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
684 if self.stream.get().is_none() {
685 return Ok(());
687 }
688
689 self.release(cx)
691 }
692
693 fn Closed(&self) -> Rc<Promise> {
695 self.closed()
696 }
697
698 fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
700 self.generic_cancel(cx, &self.global(), reason)
701 }
702}
703
704impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
705 fn get_closed_promise(&self) -> Rc<Promise> {
706 self.closed_promise.borrow().clone()
707 }
708
709 fn set_closed_promise(&self, promise: Rc<Promise>) {
710 *self.closed_promise.borrow_mut() = promise;
711 }
712
713 fn set_stream(&self, stream: Option<&ReadableStream>) {
714 self.stream.set(stream);
715 }
716
717 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
718 self.stream.get()
719 }
720
721 fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
722 Some(self)
723 }
724}