1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::Heap;
13use js::jsval::{JSVal, UndefinedValue};
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
16use script_bindings::cell::DomRefCell;
17use script_bindings::reflector::{
18 Reflector, reflect_dom_object_with_cx, reflect_dom_object_with_proto_and_cx,
19};
20
21use super::byteteereadrequest::ByteTeeReadRequest;
22use super::readablebytestreamcontroller::ReadableByteStreamController;
23use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
24 ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
25};
26use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
27use crate::dom::bindings::reflector::DomGlobal;
28use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
29use crate::dom::bindings::trace::RootedTraceableBox;
30use crate::dom::globalscope::GlobalScope;
31use crate::dom::promise::Promise;
32use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
33use crate::dom::readablestream::{ReadableStream, bytes_from_chunk_jsval};
34use crate::dom::stream::defaultteereadrequest::DefaultTeeReadRequest;
35use crate::dom::stream::readablestreamgenericreader::ReadableStreamGenericReader;
36use crate::dom::types::ReadableStreamDefaultController;
37use crate::realms::enter_auto_realm;
38use crate::script_runtime::CanGc;
39
40type ReadAllBytesSuccessSteps = dyn Fn(&mut js::context::JSContext, &[u8]);
41type ReadAllBytesFailureSteps = dyn Fn(&mut js::context::JSContext, SafeHandleValue);
42
43impl js::gc::Rootable for ContinueReadMicrotask {}
44
45#[derive(Clone, JSTraceable, MallocSizeOf)]
51#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
52struct ContinueReadMicrotask {
53 reader: Dom<ReadableStreamDefaultReader>,
54 request: ReadRequest,
55}
56
57impl Callback for ContinueReadMicrotask {
58 fn callback(&self, cx: &mut CurrentRealm, _v: SafeHandleValue) {
59 self.reader.read(cx, &self.request);
62 }
63}
64
65fn read_loop(
67 cx: &mut js::context::JSContext,
68 reader: &ReadableStreamDefaultReader,
69 success_steps: Rc<ReadAllBytesSuccessSteps>,
70 failure_steps: Rc<ReadAllBytesFailureSteps>,
71) {
72 let req = ReadRequest::ReadLoop {
77 success_steps,
78 failure_steps,
79 reader: Dom::from_ref(reader),
80 bytes: Rc::new(DomRefCell::new(Vec::new())),
81 };
82 reader.read(cx, &req);
84}
85
86#[derive(Clone, JSTraceable, MallocSizeOf)]
88pub(crate) enum ReadRequest {
89 Read(#[conditional_malloc_size_of] Rc<Promise>),
91 DefaultTee {
93 tee_read_request: Dom<DefaultTeeReadRequest>,
94 },
95 ReadLoop {
98 #[ignore_malloc_size_of = "dyn Fn"]
99 #[no_trace]
100 success_steps: Rc<ReadAllBytesSuccessSteps>,
101 #[ignore_malloc_size_of = "dyn Fn"]
102 #[no_trace]
103 failure_steps: Rc<ReadAllBytesFailureSteps>,
104 reader: Dom<ReadableStreamDefaultReader>,
105 #[conditional_malloc_size_of]
106 bytes: Rc<DomRefCell<Vec<u8>>>,
107 },
108 ByteTee {
109 byte_tee_read_request: Dom<ByteTeeReadRequest>,
110 },
111}
112
113impl ReadRequest {
114 pub(crate) fn chunk_steps(
116 &self,
117 cx: &mut js::context::JSContext,
118 chunk: RootedTraceableBox<Heap<JSVal>>,
119 global: &GlobalScope,
120 ) {
121 match self {
122 ReadRequest::Read(promise) => {
123 promise.resolve_native(
126 &ReadableStreamReadResult {
127 done: Some(false),
128 value: chunk,
129 },
130 CanGc::from_cx(cx),
131 );
132 },
133 ReadRequest::DefaultTee { tee_read_request } => {
134 tee_read_request.enqueue_chunk_steps(chunk);
135 },
136 ReadRequest::ByteTee {
137 byte_tee_read_request,
138 } => {
139 byte_tee_read_request.enqueue_chunk_steps(global, chunk);
140 },
141 ReadRequest::ReadLoop {
142 success_steps: _,
143 failure_steps,
144 reader,
145 bytes,
146 } => {
147 let global = reader.global();
149
150 match bytes_from_chunk_jsval(cx, &chunk) {
151 Ok(vec) => {
152 bytes.borrow_mut().extend_from_slice(&vec);
154
155 let tick = Promise::new2(cx, &global);
159 tick.resolve_native_with_cx(cx, &());
160
161 let handler = PromiseNativeHandler::new(
162 cx,
163 &global,
164 Some(Box::new(ContinueReadMicrotask {
165 reader: Dom::from_ref(reader),
166 request: self.clone(),
167 })),
168 None,
169 );
170
171 let mut realm = enter_auto_realm(cx, &*global);
172 let cx = &mut realm.current_realm();
173 tick.append_native_handler(cx, &handler);
174 },
175 Err(err) => {
176 rooted!(&in(cx) let mut v = UndefinedValue());
178 err.to_jsval(cx.into(), &global, v.handle_mut(), CanGc::from_cx(cx));
179 (failure_steps)(cx, v.handle());
180 },
181 }
182 },
183 }
184 }
185
186 pub(crate) fn close_steps(&self, cx: &mut js::context::JSContext) {
188 match self {
189 ReadRequest::Read(promise) => {
190 let result = RootedTraceableBox::new(Heap::default());
193 result.set(UndefinedValue());
194 promise.resolve_native(
195 &ReadableStreamReadResult {
196 done: Some(true),
197 value: result,
198 },
199 CanGc::from_cx(cx),
200 );
201 },
202 ReadRequest::DefaultTee { tee_read_request } => {
203 tee_read_request.close_steps(cx);
204 },
205 ReadRequest::ByteTee {
206 byte_tee_read_request,
207 } => {
208 byte_tee_read_request
209 .close_steps(cx)
210 .expect("ByteTeeReadRequest close steps should not fail");
211 },
212 ReadRequest::ReadLoop {
213 success_steps,
214 reader,
215 bytes,
216 ..
217 } => {
218 (success_steps)(cx, &bytes.borrow());
220
221 reader
222 .release(cx)
223 .expect("Releasing the read-all-bytes reader should succeed");
224 },
225 }
226 }
227
228 pub(crate) fn error_steps(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
230 match self {
231 ReadRequest::Read(promise) => {
232 promise.reject_native(&e, CanGc::from_cx(cx))
235 },
236 ReadRequest::DefaultTee { tee_read_request } => {
237 tee_read_request.error_steps();
238 },
239 ReadRequest::ByteTee {
240 byte_tee_read_request,
241 } => {
242 byte_tee_read_request.error_steps();
243 },
244 ReadRequest::ReadLoop {
245 failure_steps,
246 reader,
247 ..
248 } => {
249 (failure_steps)(cx, e);
251
252 reader
253 .release(cx)
254 .expect("Releasing the read-all-bytes reader should succeed");
255 },
256 }
257 }
258}
259
260#[derive(Clone, JSTraceable, MallocSizeOf)]
263#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
264struct ByteTeeClosedPromiseRejectionHandler {
265 branch_1_controller: Dom<ReadableByteStreamController>,
266 branch_2_controller: Dom<ReadableByteStreamController>,
267 #[conditional_malloc_size_of]
268 canceled_1: Rc<Cell<bool>>,
269 #[conditional_malloc_size_of]
270 canceled_2: Rc<Cell<bool>>,
271 #[conditional_malloc_size_of]
272 cancel_promise: Rc<Promise>,
273 #[conditional_malloc_size_of]
274 reader_version: Rc<Cell<u64>>,
275 expected_version: u64,
276}
277
278impl Callback for ByteTeeClosedPromiseRejectionHandler {
279 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
282 if self.reader_version.get() != self.expected_version {
284 return;
285 }
286
287 self.branch_1_controller.error(cx, v);
289
290 self.branch_2_controller.error(cx, v);
292
293 if !self.canceled_1.get() || !self.canceled_2.get() {
295 self.cancel_promise.resolve_native_with_cx(cx, &());
296 }
297 }
298}
299
300#[derive(Clone, JSTraceable, MallocSizeOf)]
303#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
304struct DefaultTeeClosedPromiseRejectionHandler {
305 branch_1_controller: Dom<ReadableStreamDefaultController>,
306 branch_2_controller: Dom<ReadableStreamDefaultController>,
307 #[conditional_malloc_size_of]
308 canceled_1: Rc<Cell<bool>>,
309 #[conditional_malloc_size_of]
310 canceled_2: Rc<Cell<bool>>,
311 #[conditional_malloc_size_of]
312 cancel_promise: Rc<Promise>,
313}
314
315impl Callback for DefaultTeeClosedPromiseRejectionHandler {
316 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
319 self.branch_1_controller.error(cx, v);
321 self.branch_2_controller.error(cx, v);
323
324 if !self.canceled_1.get() || !self.canceled_2.get() {
326 self.cancel_promise.resolve_native_with_cx(cx, &());
327 }
328 }
329}
330
331#[dom_struct]
333pub(crate) struct ReadableStreamDefaultReader {
334 reflector_: Reflector,
335
336 stream: MutNullableDom<ReadableStream>,
338
339 read_requests: DomRefCell<VecDeque<ReadRequest>>,
340
341 #[conditional_malloc_size_of]
343 closed_promise: DomRefCell<Rc<Promise>>,
344}
345
346impl ReadableStreamDefaultReader {
347 fn new_with_proto(
348 cx: &mut JSContext,
349 global: &GlobalScope,
350 proto: Option<SafeHandleObject>,
351 ) -> DomRoot<ReadableStreamDefaultReader> {
352 reflect_dom_object_with_proto_and_cx(
353 Box::new(ReadableStreamDefaultReader::new_inherited(cx, global)),
354 global,
355 proto,
356 cx,
357 )
358 }
359
360 fn new_inherited(cx: &mut JSContext, global: &GlobalScope) -> ReadableStreamDefaultReader {
361 ReadableStreamDefaultReader {
362 reflector_: Reflector::new(),
363 stream: MutNullableDom::new(None),
364 read_requests: DomRefCell::new(Default::default()),
365 closed_promise: DomRefCell::new(Promise::new2(cx, global)),
366 }
367 }
368
369 pub(crate) fn new(
370 cx: &mut JSContext,
371 global: &GlobalScope,
372 ) -> DomRoot<ReadableStreamDefaultReader> {
373 reflect_dom_object_with_cx(Box::new(Self::new_inherited(cx, global)), global, cx)
374 }
375
376 pub(crate) fn set_up(
378 &self,
379 cx: &mut JSContext,
380 stream: &ReadableStream,
381 global: &GlobalScope,
382 ) -> Fallible<()> {
383 if stream.is_locked() {
385 return Err(Error::Type(c"stream is locked".to_owned()));
386 }
387 self.generic_initialize(cx, global, stream);
390
391 self.read_requests.borrow_mut().clear();
393
394 Ok(())
395 }
396
397 pub(crate) fn close(&self, cx: &mut js::context::JSContext) {
399 self.closed_promise.borrow().resolve_native_with_cx(cx, &());
401 let mut read_requests = self.take_read_requests();
404 for request in read_requests.drain(0..) {
407 request.close_steps(cx);
409 }
410 }
411
412 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
414 self.read_requests
415 .borrow_mut()
416 .push_back(read_request.clone());
417 }
418
419 pub(crate) fn get_num_read_requests(&self) -> usize {
421 self.read_requests.borrow().len()
422 }
423
424 pub(crate) fn error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) {
426 self.closed_promise.borrow().reject_native_with_cx(cx, &e);
428
429 self.closed_promise.borrow().set_promise_is_handled();
431
432 self.error_read_requests(cx, e);
434 }
435
436 pub(crate) fn remove_read_request(&self) -> ReadRequest {
438 self.read_requests
439 .borrow_mut()
440 .pop_front()
441 .expect("Reader must have read request when remove is called into.")
442 }
443
444 pub(crate) fn release(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
446 self.generic_release(cx).expect("Generic release failed");
448 rooted!(&in(cx) let mut error = UndefinedValue());
450 Error::Type(c"Reader is released".to_owned()).to_jsval(
451 cx.into(),
452 &self.global(),
453 error.handle_mut(),
454 CanGc::from_cx(cx),
455 );
456
457 self.error_read_requests(cx, error.handle());
459 Ok(())
460 }
461
462 fn take_read_requests(&self) -> VecDeque<ReadRequest> {
463 mem::take(&mut *self.read_requests.borrow_mut())
464 }
465
466 fn error_read_requests(&self, cx: &mut js::context::JSContext, rval: SafeHandleValue) {
468 let mut read_requests = self.take_read_requests();
470
471 for request in read_requests.drain(0..) {
473 request.error_steps(cx, rval);
474 }
475 }
476
477 pub(crate) fn read(&self, cx: &mut js::context::JSContext, read_request: &ReadRequest) {
479 assert!(self.stream.get().is_some());
483
484 let stream = self.stream.get().unwrap();
485
486 stream.set_is_disturbed(true);
488 if stream.is_closed() {
490 read_request.close_steps(cx);
491 } else if stream.is_errored() {
492 rooted!(&in(cx) let mut error = UndefinedValue());
495 stream.get_stored_error(error.handle_mut());
496 read_request.error_steps(cx, error.handle());
497 } else {
498 assert!(stream.is_readable());
501 stream.perform_pull_steps(cx, read_request);
503 }
504 }
505
506 #[allow(clippy::too_many_arguments)]
509 pub(crate) fn byte_tee_append_native_handler_to_closed_promise(
510 &self,
511 cx: &mut js::context::JSContext,
512 branch_1: &ReadableStream,
513 branch_2: &ReadableStream,
514 canceled_1: Rc<Cell<bool>>,
515 canceled_2: Rc<Cell<bool>>,
516 cancel_promise: Rc<Promise>,
517 reader_version: Rc<Cell<u64>>,
518 expected_version: u64,
519 ) {
520 let branch_1_controller = branch_1.get_byte_controller();
522 let branch_2_controller = branch_2.get_byte_controller();
523
524 let global = self.global();
525 let handler = PromiseNativeHandler::new(
526 cx,
527 &global,
528 None,
529 Some(Box::new(ByteTeeClosedPromiseRejectionHandler {
530 branch_1_controller: Dom::from_ref(&branch_1_controller),
531 branch_2_controller: Dom::from_ref(&branch_2_controller),
532 canceled_1,
533 canceled_2,
534 cancel_promise,
535 reader_version,
536 expected_version,
537 })),
538 );
539
540 let mut realm = enter_auto_realm(cx, &*global);
541 let cx = &mut realm.current_realm();
542
543 self.closed_promise
544 .borrow()
545 .append_native_handler(cx, &handler);
546 }
547
548 pub(crate) fn default_tee_append_native_handler_to_closed_promise(
550 &self,
551 cx: &mut js::context::JSContext,
552 branch_1: &ReadableStream,
553 branch_2: &ReadableStream,
554 canceled_1: Rc<Cell<bool>>,
555 canceled_2: Rc<Cell<bool>>,
556 cancel_promise: Rc<Promise>,
557 ) {
558 let branch_1_controller = branch_1.get_default_controller();
559
560 let branch_2_controller = branch_2.get_default_controller();
561
562 let global = self.global();
563 let handler = PromiseNativeHandler::new(
564 cx,
565 &global,
566 None,
567 Some(Box::new(DefaultTeeClosedPromiseRejectionHandler {
568 branch_1_controller: Dom::from_ref(&branch_1_controller),
569 branch_2_controller: Dom::from_ref(&branch_2_controller),
570 canceled_1,
571 canceled_2,
572 cancel_promise,
573 })),
574 );
575
576 let mut realm = enter_auto_realm(cx, &*global);
577 let cx = &mut realm.current_realm();
578
579 self.closed_promise
580 .borrow()
581 .append_native_handler(cx, &handler);
582 }
583
584 pub(crate) fn read_all_bytes(
586 &self,
587 cx: &mut js::context::JSContext,
588 success_steps: Rc<ReadAllBytesSuccessSteps>,
589 failure_steps: Rc<ReadAllBytesFailureSteps>,
590 ) {
591 read_loop(cx, self, success_steps, failure_steps);
596 }
597
598 pub(crate) fn process_read_requests(
600 &self,
601 cx: &mut js::context::JSContext,
602 controller: DomRoot<ReadableByteStreamController>,
603 ) -> Fallible<()> {
604 while !self.read_requests.borrow().is_empty() {
606 if controller.get_queue_total_size() == 0.0 {
608 return Ok(());
609 }
610
611 let read_request = self.remove_read_request();
614
615 controller
617 .fill_read_request_from_queue(cx, &read_request)
618 .expect("Fill read request from queue failed");
619 }
620 Ok(())
621 }
622}
623
624impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
625 fn Constructor(
627 cx: &mut JSContext,
628 global: &GlobalScope,
629 proto: Option<SafeHandleObject>,
630 stream: &ReadableStream,
631 ) -> Fallible<DomRoot<Self>> {
632 let reader = Self::new_with_proto(cx, global, proto);
633
634 reader.set_up(cx, stream, global)?;
636
637 Ok(reader)
638 }
639
640 fn Read(&self, cx: &mut js::context::JSContext) -> Rc<Promise> {
642 if self.stream.get().is_none() {
644 rooted!(&in(cx) let mut error = UndefinedValue());
645 Error::Type(c"stream is undefined".to_owned()).to_jsval(
646 cx.into(),
647 &self.global(),
648 error.handle_mut(),
649 CanGc::from_cx(cx),
650 );
651 return Promise::new_rejected(cx, &self.global(), error.handle());
652 }
653 let promise = Promise::new2(cx, &self.global());
655
656 let read_request = ReadRequest::Read(promise.clone());
670
671 self.read(cx, &read_request);
673
674 promise
676 }
677
678 fn ReleaseLock(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
680 if self.stream.get().is_none() {
681 return Ok(());
683 }
684
685 self.release(cx)
687 }
688
689 fn Closed(&self) -> Rc<Promise> {
691 self.closed()
692 }
693
694 fn Cancel(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Rc<Promise> {
696 self.generic_cancel(cx, &self.global(), reason)
697 }
698}
699
700impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
701 fn get_closed_promise(&self) -> Rc<Promise> {
702 self.closed_promise.borrow().clone()
703 }
704
705 fn set_closed_promise(&self, promise: Rc<Promise>) {
706 *self.closed_promise.borrow_mut() = promise;
707 }
708
709 fn set_stream(&self, stream: Option<&ReadableStream>) {
710 self.stream.set(stream);
711 }
712
713 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
714 self.stream.get()
715 }
716
717 fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
718 Some(self)
719 }
720}