1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::Heap;
13use js::jsval::{JSVal, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
16use script_bindings::cell::DomRefCell;
17use script_bindings::reflector::{
18 Reflector, reflect_dom_object_with_cx, reflect_dom_object_with_proto_and_cx,
19};
20
21use super::byteteereadrequest::ByteTeeReadRequest;
22use super::readablebytestreamcontroller::ReadableByteStreamController;
23use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
24 ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
25};
26use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
27use crate::dom::bindings::reflector::DomGlobal;
28use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
29use crate::dom::bindings::trace::RootedTraceableBox;
30use crate::dom::globalscope::GlobalScope;
31use crate::dom::promise::Promise;
32use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
33use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
34use crate::dom::stream::defaultteereadrequest::DefaultTeeReadRequest;
35use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
36use crate::dom::types::ReadableStreamDefaultController;
37use crate::realms::enter_auto_realm;
38
39type ReadAllBytesSuccessSteps = dyn Fn(&mut js::context::JSContext, &[u8]);
40type ReadAllBytesFailureSteps = dyn Fn(&mut js::context::JSContext, SafeHandleValue);
41
42impl js::gc::Rootable for ContinueReadMicrotask {}
43
44#[derive(Clone, JSTraceable, MallocSizeOf)]
50#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
51struct ContinueReadMicrotask {
52 reader: Dom<ReadableStreamDefaultReader>,
53 request: ReadRequest,
54}
55
56impl Callback for ContinueReadMicrotask {
57 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
58 self.reader.read(cx, &self.request);
61 }
62}
63
64fn read_loop(
66 cx: &mut js::context::JSContext,
67 reader: &ReadableStreamDefaultReader,
68 success_steps: Rc<ReadAllBytesSuccessSteps>,
69 failure_steps: Rc<ReadAllBytesFailureSteps>,
70) {
71 let req = ReadRequest::ReadLoop {
76 success_steps,
77 failure_steps,
78 reader: Dom::from_ref(reader),
79 bytes: Rc::new(DomRefCell::new(Vec::new())),
80 };
81 reader.read(cx, &req);
83}
84
85#[derive(Clone, JSTraceable, MallocSizeOf)]
87pub(crate) enum ReadRequest {
88 Read(#[conditional_malloc_size_of] Rc<Promise>),
90 DefaultTee {
92 tee_read_request: Dom<DefaultTeeReadRequest>,
93 },
94 ReadLoop {
97 #[ignore_malloc_size_of = "dyn Fn"]
98 #[no_trace]
99 success_steps: Rc<ReadAllBytesSuccessSteps>,
100 #[ignore_malloc_size_of = "dyn Fn"]
101 #[no_trace]
102 failure_steps: Rc<ReadAllBytesFailureSteps>,
103 reader: Dom<ReadableStreamDefaultReader>,
104 #[conditional_malloc_size_of]
105 bytes: Rc<DomRefCell<Vec<u8>>>,
106 },
107 ByteTee {
108 byte_tee_read_request: Dom<ByteTeeReadRequest>,
109 },
110}
111
112impl ReadRequest {
113 pub(crate) fn chunk_steps(
115 &self,
116 cx: &mut js::context::JSContext,
117 chunk: RootedTraceableBox<Heap<JSVal>>,
118 global: &GlobalScope,
119 ) {
120 match self {
121 ReadRequest::Read(promise) => {
122 promise.resolve_native(
125 cx,
126 &ReadableStreamReadResult {
127 done: Some(false),
128 value: chunk,
129 },
130 );
131 },
132 ReadRequest::DefaultTee { tee_read_request } => {
133 tee_read_request.enqueue_chunk_steps(cx, chunk);
134 },
135 ReadRequest::ByteTee {
136 byte_tee_read_request,
137 } => {
138 byte_tee_read_request.enqueue_chunk_steps(cx, global, chunk);
139 },
140 ReadRequest::ReadLoop {
141 success_steps: _,
142 failure_steps,
143 reader,
144 bytes,
145 } => {
146 let global = reader.global();
148
149 match bytes_from_chunk_jsval(cx, &chunk) {
150 Ok(vec) => {
151 bytes.borrow_mut().extend_from_slice(&vec);
153
154 let tick = Promise::new(cx, &global);
158 tick.resolve_native(cx, &());
159
160 let handler = PromiseNativeHandler::new(
161 cx,
162 &global,
163 Some(Box::new(ContinueReadMicrotask {
164 reader: Dom::from_ref(reader),
165 request: self.clone(),
166 })),
167 None,
168 );
169
170 let mut realm = enter_auto_realm(cx, &*global);
171 let cx = &mut realm.current_realm();
172 tick.append_native_handler(cx, &handler);
173 },
174 Err(err) => {
175 rooted!(&in(cx) let mut v = UndefinedValue());
177 err.to_jsval(cx, &global, v.handle_mut());
178 (failure_steps)(cx, v.handle());
179 },
180 }
181 },
182 }
183 }
184
185 pub(crate) fn close_steps(&self, cx: &mut js::context::JSContext) {
187 match self {
188 ReadRequest::Read(promise) => {
189 let result = RootedTraceableBox::new(Heap::default());
192 result.set(UndefinedValue());
193 promise.resolve_native(
194 cx,
195 &ReadableStreamReadResult {
196 done: Some(true),
197 value: result,
198 },
199 );
200 },
201 ReadRequest::DefaultTee { tee_read_request } => {
202 tee_read_request.close_steps(cx);
203 },
204 ReadRequest::ByteTee {
205 byte_tee_read_request,
206 } => {
207 byte_tee_read_request
208 .close_steps(cx)
209 .expect("ByteTeeReadRequest close steps should not fail");
210 },
211 ReadRequest::ReadLoop {
212 success_steps,
213 reader,
214 bytes,
215 ..
216 } => {
217 (success_steps)(cx, &bytes.borrow());
219
220 reader
221 .release(cx)
222 .expect("Releasing the read-all-bytes reader should succeed");
223 },
224 }
225 }
226
227 pub(crate) fn error_steps(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
229 match self {
230 ReadRequest::Read(promise) => {
231 promise.reject_native(cx, &e)
234 },
235 ReadRequest::DefaultTee { tee_read_request } => {
236 tee_read_request.error_steps();
237 },
238 ReadRequest::ByteTee {
239 byte_tee_read_request,
240 } => {
241 byte_tee_read_request.error_steps();
242 },
243 ReadRequest::ReadLoop {
244 failure_steps,
245 reader,
246 ..
247 } => {
248 (failure_steps)(cx, e);
250
251 reader
252 .release(cx)
253 .expect("Releasing the read-all-bytes reader should succeed");
254 },
255 }
256 }
257}
258
259#[derive(Clone, JSTraceable, MallocSizeOf)]
262#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
263struct ByteTeeClosedPromiseRejectionHandler {
264 branch_1_controller: Dom<ReadableByteStreamController>,
265 branch_2_controller: Dom<ReadableByteStreamController>,
266 #[conditional_malloc_size_of]
267 canceled_1: Rc<Cell<bool>>,
268 #[conditional_malloc_size_of]
269 canceled_2: Rc<Cell<bool>>,
270 #[conditional_malloc_size_of]
271 cancel_promise: Rc<Promise>,
272 #[conditional_malloc_size_of]
273 reader_version: Rc<Cell<u64>>,
274 expected_version: u64,
275}
276
277impl Callback for ByteTeeClosedPromiseRejectionHandler {
278 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
281 if self.reader_version.get() != self.expected_version {
283 return;
284 }
285
286 self.branch_1_controller.error(cx, v);
288
289 self.branch_2_controller.error(cx, v);
291
292 if !self.canceled_1.get() || !self.canceled_2.get() {
294 self.cancel_promise.resolve_native(cx, &());
295 }
296 }
297}
298
299#[derive(Clone, JSTraceable, MallocSizeOf)]
302#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
303struct DefaultTeeClosedPromiseRejectionHandler {
304 branch_1_controller: Dom<ReadableStreamDefaultController>,
305 branch_2_controller: Dom<ReadableStreamDefaultController>,
306 #[conditional_malloc_size_of]
307 canceled_1: Rc<Cell<bool>>,
308 #[conditional_malloc_size_of]
309 canceled_2: Rc<Cell<bool>>,
310 #[conditional_malloc_size_of]
311 cancel_promise: Rc<Promise>,
312}
313
314impl Callback for DefaultTeeClosedPromiseRejectionHandler {
315 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
318 self.branch_1_controller.error(cx, v);
320 self.branch_2_controller.error(cx, v);
322
323 if !self.canceled_1.get() || !self.canceled_2.get() {
325 self.cancel_promise.resolve_native(cx, &());
326 }
327 }
328}
329
330#[dom_struct]
332pub(crate) struct ReadableStreamDefaultReader {
333 reflector_: Reflector,
334
335 stream: MutNullableDom<ReadableStream>,
337
338 read_requests: DomRefCell<VecDeque<ReadRequest>>,
339
340 #[conditional_malloc_size_of]
342 closed_promise: DomRefCell<Rc<Promise>>,
343}
344
345impl ReadableStreamDefaultReader {
346 fn new_with_proto(
347 cx: &mut JSContext,
348 global: &GlobalScope,
349 proto: Option<SafeHandleObject>,
350 ) -> DomRoot<ReadableStreamDefaultReader> {
351 reflect_dom_object_with_proto_and_cx(
352 Box::new(ReadableStreamDefaultReader::new_inherited(cx, global)),
353 global,
354 proto,
355 cx,
356 )
357 }
358
359 fn new_inherited(cx: &mut JSContext, global: &GlobalScope) -> ReadableStreamDefaultReader {
360 ReadableStreamDefaultReader {
361 reflector_: Reflector::new(),
362 stream: MutNullableDom::new(None),
363 read_requests: DomRefCell::new(Default::default()),
364 closed_promise: DomRefCell::new(Promise::new(cx, global)),
365 }
366 }
367
368 pub(crate) fn new(
369 cx: &mut JSContext,
370 global: &GlobalScope,
371 ) -> DomRoot<ReadableStreamDefaultReader> {
372 reflect_dom_object_with_cx(Box::new(Self::new_inherited(cx, global)), global, cx)
373 }
374
375 pub(crate) fn set_up(
377 &self,
378 cx: &mut JSContext,
379 stream: &ReadableStream,
380 global: &GlobalScope,
381 ) -> Fallible<()> {
382 if stream.is_locked() {
384 return Err(Error::Type(c"stream is locked".to_owned()));
385 }
386 self.generic_initialize(cx, global, stream);
389
390 self.read_requests.borrow_mut().clear();
392
393 Ok(())
394 }
395
396 pub(crate) fn close(&self, cx: &mut js::context::JSContext) {
398 self.closed_promise.borrow().resolve_native(cx, &());
400 let mut read_requests = self.take_read_requests();
403 for request in read_requests.drain(0..) {
406 request.close_steps(cx);
408 }
409 }
410
411 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
413 self.read_requests
414 .borrow_mut()
415 .push_back(read_request.clone());
416 }
417
418 pub(crate) fn get_num_read_requests(&self) -> usize {
420 self.read_requests.borrow().len()
421 }
422
423 pub(crate) fn error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
425 self.closed_promise.borrow().reject_native(cx, &e);
427
428 self.closed_promise.borrow().set_promise_is_handled(cx);
430
431 self.error_read_requests(cx, e);
433 }
434
435 pub(crate) fn remove_read_request(&self) -> ReadRequest {
437 self.read_requests
438 .borrow_mut()
439 .pop_front()
440 .expect("Reader must have read request when remove is called into.")
441 }
442
443 pub(crate) fn release(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
445 self.generic_release(cx).expect("Generic release failed");
447 rooted!(&in(cx) let mut error = UndefinedValue());
449 Error::Type(c"Reader is released".to_owned()).to_jsval(
450 cx,
451 &self.global(),
452 error.handle_mut(),
453 );
454
455 self.error_read_requests(cx, error.handle());
457 Ok(())
458 }
459
460 fn take_read_requests(&self) -> VecDeque<ReadRequest> {
461 mem::take(&mut *self.read_requests.borrow_mut())
462 }
463
464 fn error_read_requests(&self, cx: &mut js::context::JSContext, rval: SafeHandleValue) {
466 let mut read_requests = self.take_read_requests();
468
469 for request in read_requests.drain(0..) {
471 request.error_steps(cx, rval);
472 }
473 }
474
475 pub(crate) fn read(&self, cx: &mut js::context::JSContext, read_request: &ReadRequest) {
477 assert!(self.stream.get().is_some());
481
482 let stream = self.stream.get().unwrap();
483
484 stream.set_is_disturbed(true);
486 if stream.is_closed() {
488 read_request.close_steps(cx);
489 } else if stream.is_errored() {
490 rooted!(&in(cx) let mut error = UndefinedValue());
493 stream.get_stored_error(error.handle_mut());
494 read_request.error_steps(cx, error.handle());
495 } else {
496 assert!(stream.is_readable());
499 stream.perform_pull_steps(cx, read_request);
501 }
502 }
503
504 #[allow(clippy::too_many_arguments)]
507 pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
508 &self,
509 cx: &mut js::context::JSContext,
510 branch_1: &ReadableStream,
511 branch_2: &ReadableStream,
512 canceled_1: Rc<Cell<bool>>,
513 canceled_2: Rc<Cell<bool>>,
514 cancel_promise: Rc<Promise>,
515 reader_version: Rc<Cell<u64>>,
516 expected_version: u64,
517 ) {
518 let branch_1_controller = branch_1.get_byte_controller();
520 let branch_2_controller = branch_2.get_byte_controller();
521
522 let global = self.global();
523 let handler = PromiseNativeHandler::new(
524 cx,
525 &global,
526 None,
527 Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
528 branch_1_controller: Dom::from_ref(&branch_1_controller),
529 branch_2_controller: Dom::from_ref(&branch_2_controller),
530 canceled_1,
531 canceled_2,
532 cancel_promise,
533 reader_version,
534 expected_version,
535 })),
536 );
537
538 let mut realm = enter_auto_realm(cx, &*global);
539 let cx = &mut realm.current_realm();
540
541 self.closed_promise
542 .borrow()
543 .append_native_handler(cx, &handler);
544 }
545
546 pub(crate) fn default_tee_append_native_handler_to_closed_promise(
548 &self,
549 cx: &mut js::context::JSContext,
550 branch_1: &ReadableStream,
551 branch_2: &ReadableStream,
552 canceled_1: Rc<Cell<bool>>,
553 canceled_2: Rc<Cell<bool>>,
554 cancel_promise: Rc<Promise>,
555 ) {
556 let branch_1_controller = branch_1.get_default_controller();
557
558 let branch_2_controller = branch_2.get_default_controller();
559
560 let global = self.global();
561 let handler = PromiseNativeHandler::new(
562 cx,
563 &global,
564 None,
565 Some(Box::new(DefaultTeeClosedPromiseRejectionHandler {
566 branch_1_controller: Dom::from_ref(&branch_1_controller),
567 branch_2_controller: Dom::from_ref(&branch_2_controller),
568 canceled_1,
569 canceled_2,
570 cancel_promise,
571 })),
572 );
573
574 let mut realm = enter_auto_realm(cx, &*global);
575 let cx = &mut realm.current_realm();
576
577 self.closed_promise
578 .borrow()
579 .append_native_handler(cx, &handler);
580 }
581
582 pub(crate) fn read_all_bytes(
584 &self,
585 cx: &mut js::context::JSContext,
586 success_steps: Rc<ReadAllBytesSuccessSteps>,
587 failure_steps: Rc<ReadAllBytesFailureSteps>,
588 ) {
589 read_loop(cx, self, success_steps, failure_steps);
594 }
595
596 pub(crate) fn process_read_requests(
598 &self,
599 cx: &mut js::context::JSContext,
600 controller: DomRoot<ReadableByteStreamController>,
601 ) -> Fallible<()> {
602 while !self.read_requests.borrow().is_empty() {
604 if controller.get_queue_total_size() == 0.0 {
606 return Ok(());
607 }
608
609 let read_request = self.remove_read_request();
612
613 controller
615 .fill_read_request_from_queue(cx, &read_request)
616 .expect("Fill read request from queue failed");
617 }
618 Ok(())
619 }
620}
621
622impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
623 fn Constructor(
625 cx: &mut JSContext,
626 global: &GlobalScope,
627 proto: Option<SafeHandleObject>,
628 stream: &ReadableStream,
629 ) -> Fallible<DomRoot<Self>> {
630 let reader = Self::new_with_proto(cx, global, proto);
631
632 reader.set_up(cx, stream, global)?;
634
635 Ok(reader)
636 }
637
638 fn Read(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
640 if self.stream.get().is_none() {
642 rooted!(&in(cx) let mut error = UndefinedValue());
643 Error::Type(c"stream is undefined".to_owned()).to_jsval(
644 cx,
645 &self.global(),
646 error.handle_mut(),
647 );
648 return Promise::new_rejected(cx, &self.global(), error.handle());
649 }
650 let promise = Promise::new(cx, &self.global());
652
653 let read_request = ReadRequest::Read(promise.clone());
667
668 self.read(cx, &read_request);
670
671 promise
673 }
674
675 fn ReleaseLock(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
677 if self.stream.get().is_none() {
678 return Ok(());
680 }
681
682 self.release(cx)
684 }
685
686 fn Closed(&self) -> Rc<Promise> {
688 self.closed()
689 }
690
691 fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
693 self.generic_cancel(cx, &self.global(), reason)
694 }
695}
696
697impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
698 fn get_closed_promise(&self) -> Rc<Promise> {
699 self.closed_promise.borrow().clone()
700 }
701
702 fn set_closed_promise(&self, promise: Rc<Promise>) {
703 *self.closed_promise.borrow_mut() = promise;
704 }
705
706 fn set_stream(&self, stream: Option<&ReadableStream>) {
707 self.stream.set(stream);
708 }
709
710 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
711 self.stream.get()
712 }
713
714 fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
715 Some(self)
716 }
717}