use std::cell::Cell;
use std::rc::Rc;
use dom_struct::dom_struct;
use js::jsapi::Heap;
use js::jsval::{JSVal, UndefinedValue};
use js::rust::HandleValue as SafeHandleValue;
use super::bindings::reflector::reflect_dom_object;
use super::bindings::root::DomRoot;
use super::bindings::structuredclone;
use crate::dom::bindings::reflector::{DomObject, Reflector};
use crate::dom::bindings::root::Dom;
use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource;
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::readablestream::ReadableStream;
use crate::microtask::Microtask;
use crate::script_runtime::CanGc;
/// Payload of the microtask that `DefaultTeeReadRequest::enqueue_chunk_steps`
/// queues: a chunk value together with the read request that should process it.
#[derive(JSTraceable, MallocSizeOf)]
#[allow(crown::unrooted_must_root)]
pub struct DefaultTeeReadRequestMicrotask {
/// The chunk value passed to `DefaultTeeReadRequest::chunk_steps` when the
/// microtask runs.
#[ignore_malloc_size_of = "mozjs"]
chunk: Box<Heap<JSVal>>,
/// The read request whose `chunk_steps` is invoked with `chunk`.
tee_read_request: Dom<DefaultTeeReadRequest>,
}
impl DefaultTeeReadRequestMicrotask {
    /// Runs the deferred work carried by this microtask: hands the stored
    /// chunk to the stored read request's `chunk_steps`.
    pub fn microtask_chunk_steps(&self, can_gc: CanGc) {
        let Self {
            chunk,
            tee_read_request,
        } = self;
        tee_read_request.chunk_steps(chunk, can_gc)
    }
}
/// Read request used while teeing a readable stream into two branches.
///
/// It forwards chunks, close and error notifications to the two branch
/// streams, consulting a set of reference-counted boolean flags.
#[dom_struct]
pub struct DefaultTeeReadRequest {
reflector_: Reflector,
/// The stream that is cancelled by `stream_cancel` and whose global is used
/// for queuing microtasks and for structured-clone reads.
stream: Dom<ReadableStream>,
/// First branch; its default controller receives enqueue/close/error calls.
branch_1: Dom<ReadableStream>,
/// Second branch; its default controller receives enqueue/close/error calls.
branch_2: Dom<ReadableStream>,
/// Cleared by `chunk_steps`, `close_steps` and `error_steps`.
/// NOTE(review): presumably set by the tee underlying source when a read
/// starts — confirm against the caller.
#[ignore_malloc_size_of = "Rc"]
reading: Rc<Cell<bool>>,
/// Cleared at the start of `chunk_steps`; if it is set again by the time the
/// chunk has been enqueued, `chunk_steps` triggers another pull.
#[ignore_malloc_size_of = "Rc"]
read_again: Rc<Cell<bool>>,
/// When set, branch 1 is skipped for enqueue and close.
#[ignore_malloc_size_of = "Rc"]
canceled_1: Rc<Cell<bool>>,
/// When set, branch 2 is skipped for enqueue, close and cloning.
#[ignore_malloc_size_of = "Rc"]
canceled_2: Rc<Cell<bool>>,
/// When set (and branch 2 is not canceled), the chunk is structured-cloned
/// before being handed to branch 2.
#[ignore_malloc_size_of = "Rc"]
clone_for_branch_2: Rc<Cell<bool>>,
/// Resolved in `close_steps` when at least one branch has not been canceled.
#[ignore_malloc_size_of = "Rc"]
cancel_promise: Rc<Promise>,
/// Source whose `pull_algorithm` is invoked by `pull_algorithm`.
tee_underlying_source: Dom<DefaultTeeUnderlyingSource>,
}
impl DefaultTeeReadRequest {
#[allow(clippy::too_many_arguments)]
#[allow(crown::unrooted_must_root)]
pub fn new(
stream: &ReadableStream,
branch_1: &ReadableStream,
branch_2: &ReadableStream,
reading: Rc<Cell<bool>>,
read_again: Rc<Cell<bool>>,
canceled_1: Rc<Cell<bool>>,
canceled_2: Rc<Cell<bool>>,
clone_for_branch_2: Rc<Cell<bool>>,
cancel_promise: Rc<Promise>,
tee_underlying_source: &DefaultTeeUnderlyingSource,
can_gc: CanGc,
) -> DomRoot<Self> {
reflect_dom_object(
Box::new(DefaultTeeReadRequest {
reflector_: Reflector::new(),
stream: Dom::from_ref(stream),
branch_1: Dom::from_ref(branch_1),
branch_2: Dom::from_ref(branch_2),
reading,
read_again,
canceled_1,
canceled_2,
clone_for_branch_2,
cancel_promise,
tee_underlying_source: Dom::from_ref(tee_underlying_source),
}),
&*stream.global(),
can_gc,
)
}
pub fn stream_cancel(&self, reason: SafeHandleValue, can_gc: CanGc) {
self.stream.cancel(reason, can_gc);
}
pub fn enqueue_chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>) {
let tee_read_request_chunk = DefaultTeeReadRequestMicrotask {
chunk: Heap::boxed(*chunk.handle()),
tee_read_request: Dom::from_ref(self),
};
let global = self.stream.global();
let microtask_queue = global.microtask_queue();
let cx = GlobalScope::get_cx();
microtask_queue.enqueue(
Microtask::ReadableStreamTeeReadRequest(tee_read_request_chunk),
cx,
);
}
#[allow(unsafe_code)]
#[allow(clippy::borrowed_box)]
pub fn chunk_steps(&self, chunk: &Box<Heap<JSVal>>, can_gc: CanGc) {
self.read_again.set(false);
let chunk1 = chunk;
let chunk2 = chunk;
if !self.canceled_2.get() && self.clone_for_branch_2.get() {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut clone_result = UndefinedValue());
let data = structuredclone::write(
cx,
unsafe { SafeHandleValue::from_raw(chunk2.handle()) },
None,
)
.unwrap();
if structuredclone::read(&self.stream.global(), data, clone_result.handle_mut())
.is_err()
{
self.readable_stream_default_controller_error(
&self.branch_1,
clone_result.handle(),
);
self.readable_stream_default_controller_error(
&self.branch_2,
clone_result.handle(),
);
self.stream_cancel(clone_result.handle(), can_gc);
return;
} else {
chunk2.set(*clone_result);
}
}
if !self.canceled_1.get() {
self.readable_stream_default_controller_enqueue(
&self.branch_1,
unsafe { SafeHandleValue::from_raw(chunk1.handle()) },
can_gc,
);
}
if !self.canceled_2.get() {
self.readable_stream_default_controller_enqueue(
&self.branch_2,
unsafe { SafeHandleValue::from_raw(chunk2.handle()) },
can_gc,
);
}
self.reading.set(false);
if self.read_again.get() {
self.pull_algorithm(can_gc);
}
}
pub fn close_steps(&self) {
self.reading.set(false);
if !self.canceled_1.get() {
self.readable_stream_default_controller_close(&self.branch_1);
}
if !self.canceled_2.get() {
self.readable_stream_default_controller_close(&self.branch_2);
}
if !self.canceled_1.get() || !self.canceled_2.get() {
self.cancel_promise.resolve_native(&());
}
}
pub fn error_steps(&self) {
self.reading.set(false);
}
fn readable_stream_default_controller_enqueue(
&self,
stream: &ReadableStream,
chunk: SafeHandleValue,
can_gc: CanGc,
) {
stream
.get_default_controller()
.enqueue(GlobalScope::get_cx(), chunk, can_gc)
.expect("enqueue failed for stream controller in DefaultTeeReadRequest");
}
fn readable_stream_default_controller_close(&self, stream: &ReadableStream) {
stream.get_default_controller().close();
}
fn readable_stream_default_controller_error(
&self,
stream: &ReadableStream,
error: SafeHandleValue,
) {
stream.get_default_controller().error(error);
}
pub fn pull_algorithm(&self, can_gc: CanGc) {
self.tee_underlying_source.pull_algorithm(can_gc);
}
}