1use std::cell::Cell;
6use std::collections::VecDeque;
7use std::mem;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::gc::MutableHandle;
12use js::jsapi::Heap;
13use js::jsval::{JSVal, UndefinedValue};
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15
16use super::bindings::reflector::reflect_dom_object;
17use super::bindings::root::MutNullableDom;
18use super::readablebytestreamcontroller::ReadableByteStreamController;
19use super::types::ReadableStreamDefaultController;
20use crate::dom::bindings::cell::DomRefCell;
21use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::{
22 ReadableStreamDefaultReaderMethods, ReadableStreamReadResult,
23};
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
25use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
26use crate::dom::bindings::root::{Dom, DomRoot};
27use crate::dom::bindings::trace::RootedTraceableBox;
28use crate::dom::defaultteereadrequest::DefaultTeeReadRequest;
29use crate::dom::globalscope::GlobalScope;
30use crate::dom::promise::Promise;
31use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
32use crate::dom::readablestream::{ReadableStream, get_read_promise_bytes, get_read_promise_done};
33use crate::dom::readablestreamgenericreader::ReadableStreamGenericReader;
34use crate::realms::{InRealm, enter_realm};
35use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
36
37type ReadAllBytesSuccessSteps = dyn Fn(&[u8]);
38type ReadAllBytesFailureSteps = dyn Fn(SafeJSContext, SafeHandleValue);
39
40impl js::gc::Rootable for ReadLoopFulFillmentHandler {}
41
42#[derive(Clone, JSTraceable, MallocSizeOf)]
44#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
45struct ReadLoopFulFillmentHandler {
46 #[ignore_malloc_size_of = "Rc is hard"]
47 #[no_trace]
48 success_steps: Rc<ReadAllBytesSuccessSteps>,
49
50 #[ignore_malloc_size_of = "Rc is hard"]
51 #[no_trace]
52 failure_steps: Rc<ReadAllBytesFailureSteps>,
53
54 reader: Dom<ReadableStreamDefaultReader>,
55
56 #[ignore_malloc_size_of = "Rc is hard"]
57 bytes: Rc<DomRefCell<Vec<u8>>>,
58}
59
60impl Callback for ReadLoopFulFillmentHandler {
61 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
62 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) {
63 let global = self.reader.global();
64 let is_done = match get_read_promise_done(cx, &v, can_gc) {
65 Ok(is_done) => is_done,
66 Err(err) => {
67 self.reader
68 .release(can_gc)
69 .expect("Releasing the reader should succeed");
70 rooted!(in(*cx) let mut v = UndefinedValue());
71 err.to_jsval(cx, &global, v.handle_mut(), can_gc);
72 (self.failure_steps)(cx, v.handle());
73 return;
74 },
75 };
76
77 if is_done {
78 (self.success_steps)(&self.bytes.borrow());
81 self.reader
82 .release(can_gc)
83 .expect("Releasing the reader should succeed");
84 } else {
85 let chunk = match get_read_promise_bytes(cx, &v, can_gc) {
87 Ok(chunk) => chunk,
88 Err(err) => {
89 rooted!(in(*cx) let mut v = UndefinedValue());
91 err.to_jsval(cx, &global, v.handle_mut(), can_gc);
92 (self.failure_steps)(cx, v.handle());
93 self.reader
94 .release(can_gc)
95 .expect("Releasing the reader should succeed");
96 return;
97 },
98 };
99
100 self.bytes.borrow_mut().extend_from_slice(&chunk);
102
103 rooted!(in(*cx) let mut this = Some(self.clone()));
105 read_loop(
106 &global,
107 this.handle_mut(),
108 Box::new(ReadLoopRejectionHandler {
109 failure_steps: self.failure_steps.clone(),
110 }),
111 realm,
112 can_gc,
113 );
114 }
115 }
116}
117
118#[derive(Clone, JSTraceable, MallocSizeOf)]
119struct ReadLoopRejectionHandler {
121 #[ignore_malloc_size_of = "Rc is hard"]
122 #[no_trace]
123 failure_steps: Rc<ReadAllBytesFailureSteps>,
124}
125
126impl Callback for ReadLoopRejectionHandler {
127 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) {
129 (self.failure_steps)(cx, v);
131 }
132}
133
134fn read_loop(
136 global: &GlobalScope,
137 mut fulfillment_handler: MutableHandle<Option<ReadLoopFulFillmentHandler>>,
138 rejection_handler: Box<ReadLoopRejectionHandler>,
139 realm: InRealm,
140 can_gc: CanGc,
141) {
142 let read_promise = fulfillment_handler
149 .as_ref()
150 .expect("Fulfillment handler should be some.")
151 .reader
152 .Read(can_gc);
153
154 let handler = PromiseNativeHandler::new(
155 global,
156 fulfillment_handler.take().map(|h| Box::new(h) as Box<_>),
157 Some(rejection_handler),
158 can_gc,
159 );
160 read_promise.append_native_handler(&handler, realm, can_gc);
161}
162
163#[derive(Clone, JSTraceable, MallocSizeOf)]
165pub(crate) enum ReadRequest {
166 Read(#[ignore_malloc_size_of = "Rc is hard"] Rc<Promise>),
168 DefaultTee {
170 tee_read_request: Dom<DefaultTeeReadRequest>,
171 },
172}
173
174impl ReadRequest {
175 pub(crate) fn chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>, can_gc: CanGc) {
177 match self {
178 ReadRequest::Read(promise) => {
179 promise.resolve_native(
182 &ReadableStreamReadResult {
183 done: Some(false),
184 value: chunk,
185 },
186 can_gc,
187 );
188 },
189 ReadRequest::DefaultTee { tee_read_request } => {
190 tee_read_request.enqueue_chunk_steps(chunk);
191 },
192 }
193 }
194
195 pub(crate) fn close_steps(&self, can_gc: CanGc) {
197 match self {
198 ReadRequest::Read(promise) => {
199 let result = RootedTraceableBox::new(Heap::default());
202 result.set(UndefinedValue());
203 promise.resolve_native(
204 &ReadableStreamReadResult {
205 done: Some(true),
206 value: result,
207 },
208 can_gc,
209 );
210 },
211 ReadRequest::DefaultTee { tee_read_request } => {
212 tee_read_request.close_steps(can_gc);
213 },
214 }
215 }
216
217 pub(crate) fn error_steps(&self, e: SafeHandleValue, can_gc: CanGc) {
219 match self {
220 ReadRequest::Read(promise) => {
221 promise.reject_native(&e, can_gc)
224 },
225 ReadRequest::DefaultTee { tee_read_request } => {
226 tee_read_request.error_steps();
227 },
228 }
229 }
230}
231
232#[derive(Clone, JSTraceable, MallocSizeOf)]
235#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
236struct ClosedPromiseRejectionHandler {
237 branch_1_controller: Dom<ReadableStreamDefaultController>,
238 branch_2_controller: Dom<ReadableStreamDefaultController>,
239 #[ignore_malloc_size_of = "Rc"]
240 canceled_1: Rc<Cell<bool>>,
241 #[ignore_malloc_size_of = "Rc"]
242 canceled_2: Rc<Cell<bool>>,
243 #[ignore_malloc_size_of = "Rc"]
244 cancel_promise: Rc<Promise>,
245}
246
247impl Callback for ClosedPromiseRejectionHandler {
248 fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
251 let branch_1_controller = &self.branch_1_controller;
252 let branch_2_controller = &self.branch_2_controller;
253
254 branch_1_controller.error(v, can_gc);
256 branch_2_controller.error(v, can_gc);
258
259 if !self.canceled_1.get() || !self.canceled_2.get() {
261 self.cancel_promise.resolve_native(&(), can_gc);
262 }
263 }
264}
265
266#[dom_struct]
268pub(crate) struct ReadableStreamDefaultReader {
269 reflector_: Reflector,
270
271 stream: MutNullableDom<ReadableStream>,
273
274 read_requests: DomRefCell<VecDeque<ReadRequest>>,
275
276 #[ignore_malloc_size_of = "Rc is hard"]
278 closed_promise: DomRefCell<Rc<Promise>>,
279}
280
281impl ReadableStreamDefaultReader {
282 fn new_with_proto(
283 global: &GlobalScope,
284 proto: Option<SafeHandleObject>,
285 can_gc: CanGc,
286 ) -> DomRoot<ReadableStreamDefaultReader> {
287 reflect_dom_object_with_proto(
288 Box::new(ReadableStreamDefaultReader::new_inherited(global, can_gc)),
289 global,
290 proto,
291 can_gc,
292 )
293 }
294
295 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> ReadableStreamDefaultReader {
296 ReadableStreamDefaultReader {
297 reflector_: Reflector::new(),
298 stream: MutNullableDom::new(None),
299 read_requests: DomRefCell::new(Default::default()),
300 closed_promise: DomRefCell::new(Promise::new(global, can_gc)),
301 }
302 }
303
304 pub(crate) fn new(global: &GlobalScope, can_gc: CanGc) -> DomRoot<ReadableStreamDefaultReader> {
305 reflect_dom_object(
306 Box::new(Self::new_inherited(global, can_gc)),
307 global,
308 can_gc,
309 )
310 }
311
312 pub(crate) fn set_up(
314 &self,
315 stream: &ReadableStream,
316 global: &GlobalScope,
317 can_gc: CanGc,
318 ) -> Fallible<()> {
319 if stream.is_locked() {
321 return Err(Error::Type("stream is locked".to_owned()));
322 }
323 self.generic_initialize(global, stream, can_gc);
326
327 self.read_requests.borrow_mut().clear();
329
330 Ok(())
331 }
332
333 pub(crate) fn close(&self, can_gc: CanGc) {
335 self.closed_promise.borrow().resolve_native(&(), can_gc);
337 let mut read_requests = self.take_read_requests();
340 for request in read_requests.drain(0..) {
343 request.close_steps(can_gc);
345 }
346 }
347
348 pub(crate) fn add_read_request(&self, read_request: &ReadRequest) {
350 self.read_requests
351 .borrow_mut()
352 .push_back(read_request.clone());
353 }
354
355 pub(crate) fn get_num_read_requests(&self) -> usize {
357 self.read_requests.borrow().len()
358 }
359
360 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
362 self.closed_promise.borrow().reject_native(&e, can_gc);
364
365 self.closed_promise.borrow().set_promise_is_handled();
367
368 self.error_read_requests(e, can_gc);
370 }
371
372 pub(crate) fn remove_read_request(&self) -> ReadRequest {
374 self.read_requests
375 .borrow_mut()
376 .pop_front()
377 .expect("Reader must have read request when remove is called into.")
378 }
379
380 pub(crate) fn release(&self, can_gc: CanGc) -> Fallible<()> {
382 self.generic_release(can_gc)?;
384 let cx = GlobalScope::get_cx();
386 rooted!(in(*cx) let mut error = UndefinedValue());
387 Error::Type("Reader is released".to_owned()).to_jsval(
388 cx,
389 &self.global(),
390 error.handle_mut(),
391 can_gc,
392 );
393
394 self.error_read_requests(error.handle(), can_gc);
396 Ok(())
397 }
398
399 fn take_read_requests(&self) -> VecDeque<ReadRequest> {
400 mem::take(&mut *self.read_requests.borrow_mut())
401 }
402
403 fn error_read_requests(&self, rval: SafeHandleValue, can_gc: CanGc) {
405 let mut read_requests = self.take_read_requests();
407
408 for request in read_requests.drain(0..) {
410 request.error_steps(rval, can_gc);
411 }
412 }
413
414 pub(crate) fn read(&self, cx: SafeJSContext, read_request: &ReadRequest, can_gc: CanGc) {
416 assert!(self.stream.get().is_some());
420
421 let stream = self.stream.get().unwrap();
422
423 stream.set_is_disturbed(true);
425 if stream.is_closed() {
427 read_request.close_steps(can_gc);
428 } else if stream.is_errored() {
429 let cx = GlobalScope::get_cx();
432 rooted!(in(*cx) let mut error = UndefinedValue());
433 stream.get_stored_error(error.handle_mut());
434 read_request.error_steps(error.handle(), can_gc);
435 } else {
436 assert!(stream.is_readable());
439 stream.perform_pull_steps(cx, read_request, can_gc);
441 }
442 }
443
444 pub(crate) fn append_native_handler_to_closed_promise(
446 &self,
447 branch_1: &ReadableStream,
448 branch_2: &ReadableStream,
449 canceled_1: Rc<Cell<bool>>,
450 canceled_2: Rc<Cell<bool>>,
451 cancel_promise: Rc<Promise>,
452 can_gc: CanGc,
453 ) {
454 let branch_1_controller = branch_1.get_default_controller();
455
456 let branch_2_controller = branch_2.get_default_controller();
457
458 let global = self.global();
459 let handler = PromiseNativeHandler::new(
460 &global,
461 None,
462 Some(Box::new(ClosedPromiseRejectionHandler {
463 branch_1_controller: Dom::from_ref(&branch_1_controller),
464 branch_2_controller: Dom::from_ref(&branch_2_controller),
465 canceled_1,
466 canceled_2,
467 cancel_promise,
468 })),
469 can_gc,
470 );
471
472 let realm = enter_realm(&*global);
473 let comp = InRealm::Entered(&realm);
474
475 self.closed_promise
476 .borrow()
477 .append_native_handler(&handler, comp, can_gc);
478 }
479
480 pub(crate) fn read_all_bytes(
482 &self,
483 cx: SafeJSContext,
484 global: &GlobalScope,
485 success_steps: Rc<ReadAllBytesSuccessSteps>,
486 failure_steps: Rc<ReadAllBytesFailureSteps>,
487 realm: InRealm,
488 can_gc: CanGc,
489 ) {
490 rooted!(in(*cx) let mut fulfillment_handler = Some(ReadLoopFulFillmentHandler {
496 success_steps,
497 failure_steps: failure_steps.clone(),
498 reader: Dom::from_ref(self),
499 bytes: Rc::new(DomRefCell::new(Vec::new())),
500 }));
501 let rejection_handler = Box::new(ReadLoopRejectionHandler { failure_steps });
502 read_loop(
503 global,
504 fulfillment_handler.handle_mut(),
505 rejection_handler,
506 realm,
507 can_gc,
508 );
509 }
510
511 pub(crate) fn process_read_requests(
513 &self,
514 cx: SafeJSContext,
515 controller: DomRoot<ReadableByteStreamController>,
516 can_gc: CanGc,
517 ) -> Fallible<()> {
518 while !self.read_requests.borrow().is_empty() {
520 if controller.get_queue_total_size() == 0.0 {
522 return Ok(());
523 }
524
525 let read_request = self.remove_read_request();
528
529 controller.fill_read_request_from_queue(cx, &read_request, can_gc)?;
531 }
532 Ok(())
533 }
534}
535
536impl ReadableStreamDefaultReaderMethods<crate::DomTypeHolder> for ReadableStreamDefaultReader {
537 fn Constructor(
539 global: &GlobalScope,
540 proto: Option<SafeHandleObject>,
541 can_gc: CanGc,
542 stream: &ReadableStream,
543 ) -> Fallible<DomRoot<Self>> {
544 let reader = Self::new_with_proto(global, proto, can_gc);
545
546 Self::set_up(&reader, stream, global, can_gc)?;
548
549 Ok(reader)
550 }
551
552 fn Read(&self, can_gc: CanGc) -> Rc<Promise> {
554 let cx = GlobalScope::get_cx();
555 if self.stream.get().is_none() {
557 rooted!(in(*cx) let mut error = UndefinedValue());
558 Error::Type("stream is undefined".to_owned()).to_jsval(
559 cx,
560 &self.global(),
561 error.handle_mut(),
562 can_gc,
563 );
564 return Promise::new_rejected(&self.global(), cx, error.handle(), can_gc);
565 }
566 let promise = Promise::new(&self.global(), can_gc);
568
569 let read_request = ReadRequest::Read(promise.clone());
583
584 self.read(cx, &read_request, can_gc);
586
587 promise
589 }
590
591 fn ReleaseLock(&self, can_gc: CanGc) -> Fallible<()> {
593 if self.stream.get().is_none() {
594 return Ok(());
596 }
597
598 self.release(can_gc)
600 }
601
602 fn Closed(&self) -> Rc<Promise> {
604 self.closed()
605 }
606
607 fn Cancel(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
609 self.generic_cancel(cx, &self.global(), reason, can_gc)
610 }
611}
612
613impl ReadableStreamGenericReader for ReadableStreamDefaultReader {
614 fn get_closed_promise(&self) -> Rc<Promise> {
615 self.closed_promise.borrow().clone()
616 }
617
618 fn set_closed_promise(&self, promise: Rc<Promise>) {
619 *self.closed_promise.borrow_mut() = promise;
620 }
621
622 fn set_stream(&self, stream: Option<&ReadableStream>) {
623 self.stream.set(stream);
624 }
625
626 fn get_stream(&self) -> Option<DomRoot<ReadableStream>> {
627 self.stream.get()
628 }
629
630 fn as_default_reader(&self) -> Option<&ReadableStreamDefaultReader> {
631 Some(self)
632 }
633}