use std::cell::Cell;
use std::ptr::{self, NonNull};
use std::rc::Rc;
use dom_struct::dom_struct;
use js::conversions::ToJSValConvertible;
use js::jsapi::{Heap, JSObject};
use js::jsval::{JSVal, ObjectValue, UndefinedValue};
use js::rust::{
HandleObject as SafeHandleObject, HandleValue as SafeHandleValue,
MutableHandleValue as SafeMutableHandleValue,
};
use crate::dom::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategy;
use crate::dom::bindings::codegen::Bindings::ReadableStreamBinding::{
ReadableStreamGetReaderOptions, ReadableStreamMethods, ReadableStreamReaderMode,
};
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultReaderBinding::ReadableStreamDefaultReaderMethods;
use crate::dom::bindings::codegen::Bindings::ReadableStreamDefaultControllerBinding::ReadableStreamDefaultController_Binding::ReadableStreamDefaultControllerMethods;
use crate::dom::bindings::codegen::Bindings::UnderlyingSourceBinding::UnderlyingSource as JsUnderlyingSource;
use crate::dom::bindings::conversions::{ConversionBehavior, ConversionResult};
use crate::dom::bindings::error::Error;
use crate::dom::bindings::import::module::Fallible;
use crate::dom::bindings::import::module::UnionTypes::ReadableStreamDefaultReaderOrReadableStreamBYOBReader as ReadableStreamReader;
use crate::dom::bindings::reflector::{DomObject, Reflector, reflect_dom_object, reflect_dom_object_with_proto};
use crate::dom::bindings::root::{DomRoot, MutNullableDom, Dom};
use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::bindings::utils::get_dictionary_property;
use crate::dom::countqueuingstrategy::{extract_high_water_mark, extract_size_algorithm};
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::readablebytestreamcontroller::ReadableByteStreamController;
use crate::dom::readablestreambyobreader::ReadableStreamBYOBReader;
use crate::dom::readablestreamdefaultcontroller::ReadableStreamDefaultController;
use crate::dom::readablestreamdefaultreader::{ReadRequest, ReadableStreamDefaultReader};
use crate::dom::defaultteeunderlyingsource::TeeCancelAlgorithm;
use crate::dom::types::DefaultTeeUnderlyingSource;
use crate::dom::underlyingsourcecontainer::UnderlyingSourceType;
use crate::js::conversions::FromJSValConvertible;
use crate::realms::{enter_realm, InRealm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
/// Fulfillment callback used by `ReadableStream::cancel` on the promise
/// returned by the controller's cancel steps.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseFulfillmentHandler {
/// Promise settled by this handler.
#[ignore_malloc_size_of = "Rc are hard"]
result: Rc<Promise>,
}
impl Callback for SourceCancelPromiseFulfillmentHandler {
/// Resolves `result` with `()`; the fulfillment value `_v` is ignored.
fn callback(&self, _cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) {
self.result.resolve_native(&());
}
}
/// Rejection callback used by `ReadableStream::cancel` on the promise
/// returned by the controller's cancel steps.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct SourceCancelPromiseRejectionHandler {
/// Promise settled by this handler.
#[ignore_malloc_size_of = "Rc are hard"]
result: Rc<Promise>,
}
impl Callback for SourceCancelPromiseRejectionHandler {
/// Rejects `result` with the rejection value `v` it was called with.
fn callback(&self, _cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, _can_gc: CanGc) {
self.result.reject_native(&v);
}
}
/// State of a `ReadableStream`; a new stream starts out `Readable`.
#[derive(Clone, Copy, Debug, Default, JSTraceable, MallocSizeOf, PartialEq)]
pub enum ReadableStreamState {
/// Initial state (the `Default` value).
#[default]
Readable,
/// Set by `ReadableStream::close`.
Closed,
/// Set by `ReadableStream::error`.
Errored,
}
/// The controller slot of a stream. The variant selects the controller
/// flavour; the inner slot may be empty until a controller is set.
#[derive(JSTraceable, MallocSizeOf)]
#[crown::unrooted_must_root_lint::must_root]
pub enum ControllerType {
/// Slot for a `ReadableByteStreamController`.
Byte(MutNullableDom<ReadableByteStreamController>),
/// Slot for a `ReadableStreamDefaultController`.
Default(MutNullableDom<ReadableStreamDefaultController>),
}
/// The reader slot of a stream. The variant selects the reader flavour;
/// the inner slot is empty while no reader is attached.
#[derive(JSTraceable, MallocSizeOf)]
#[crown::unrooted_must_root_lint::must_root]
pub enum ReaderType {
/// Slot for a `ReadableStreamBYOBReader`.
#[allow(clippy::upper_case_acronyms)]
BYOB(MutNullableDom<ReadableStreamBYOBReader>),
/// Slot for a `ReadableStreamDefaultReader`.
Default(MutNullableDom<ReadableStreamDefaultReader>),
}
/// Creates a `ReadableStream` with a default controller fed by
/// `underlying_source_type`.
///
/// Missing `queuing_strategy` members fall back to a high water mark of
/// `1.0` and to the size algorithm extracted from an empty strategy.
///
/// # Panics
///
/// Panics if the high water mark is negative or NaN, or if setting up the
/// controller fails.
#[allow(crown::unrooted_must_root)]
fn create_readable_stream(
    global: &GlobalScope,
    underlying_source_type: UnderlyingSourceType,
    queuing_strategy: QueuingStrategy,
    can_gc: CanGc,
) -> DomRoot<ReadableStream> {
    let high_water_mark = queuing_strategy.highWaterMark.unwrap_or(1.0);

    // Build the fallback lazily: `unwrap_or` would construct the default
    // size algorithm even when the caller supplied one.
    let size_algorithm = queuing_strategy
        .size
        .unwrap_or_else(|| extract_size_algorithm(&QueuingStrategy::empty()));

    assert!(high_water_mark >= 0.0);

    // The stream starts with an empty default-controller slot; the slot is
    // expected to be filled during controller setup below.
    let stream = ReadableStream::new_with_proto(
        global,
        None,
        ControllerType::Default(MutNullableDom::new(None)),
        can_gc,
    );

    let controller = ReadableStreamDefaultController::new(
        global,
        underlying_source_type,
        high_water_mark,
        size_algorithm,
        can_gc,
    );

    controller
        .setup(stream.clone(), can_gc)
        .expect("Setup of default controller cannot fail");

    stream
}
/// DOM object backing the `ReadableStream` interface (see the
/// `ReadableStreamMethods` implementation in this file).
#[dom_struct]
pub struct ReadableStream {
reflector_: Reflector,
/// Controller slot; the variant is chosen when the stream is constructed.
controller: ControllerType,
/// Value recorded by `error`; read back by `get_stored_error` and `cancel`.
#[ignore_malloc_size_of = "mozjs"]
stored_error: Heap<JSVal>,
/// Flag set by `cancel` and `set_is_disturbed`.
disturbed: Cell<bool>,
/// Reader slot; the variant is derived from the controller variant in `new_inherited`.
reader: ReaderType,
/// Current state; starts as `Readable`.
state: Cell<ReadableStreamState>,
}
impl ReadableStream {
#[allow(crown::unrooted_must_root)]
fn new_inherited(controller: ControllerType) -> ReadableStream {
let reader = match &controller {
ControllerType::Default(_) => ReaderType::Default(MutNullableDom::new(None)),
ControllerType::Byte(_) => ReaderType::BYOB(MutNullableDom::new(None)),
};
ReadableStream {
reflector_: Reflector::new(),
controller,
stored_error: Heap::default(),
disturbed: Default::default(),
reader,
state: Cell::new(Default::default()),
}
}
/// Allocates a new stream around `controller` and reflects it into `global`,
/// using `proto` as the prototype when one is given.
#[allow(crown::unrooted_must_root)]
fn new_with_proto(
    global: &GlobalScope,
    proto: Option<SafeHandleObject>,
    controller: ControllerType,
    can_gc: CanGc,
) -> DomRoot<ReadableStream> {
    let boxed_stream = Box::new(ReadableStream::new_inherited(controller));
    reflect_dom_object_with_proto(boxed_stream, global, proto, can_gc)
}
/// Stores `controller` in the stream's default-controller slot.
///
/// Panics if the stream was created with a byte controller slot.
pub fn set_default_controller(&self, controller: &ReadableStreamDefaultController) {
    let ControllerType::Default(ref slot) = self.controller else {
        unreachable!("set_default_controller called in setup of default controller.")
    };
    slot.set(Some(controller));
}
/// Asserts that the controller slot, whatever its flavour, is still empty.
pub fn assert_no_controller(&self) {
    let has_no_controller = match &self.controller {
        ControllerType::Byte(slot) => slot.get().is_none(),
        ControllerType::Default(slot) => slot.get().is_none(),
    };
    assert!(has_no_controller);
}
/// Builds a stream over an in-memory source, enqueues all of `bytes` as one
/// native chunk, and closes the controller right away.
pub fn new_from_bytes(
    global: &GlobalScope,
    bytes: Vec<u8>,
    can_gc: CanGc,
) -> DomRoot<ReadableStream> {
    // The source records the byte count before `bytes` is handed over.
    let source = UnderlyingSourceType::Memory(bytes.len());
    let stream = ReadableStream::new_with_external_underlying_source(global, source, can_gc);

    stream.enqueue_native(bytes);
    stream.controller_close_native();

    stream
}
/// Builds a stream whose default controller is driven by a native `source`,
/// using a high water mark of `1.0` and the size algorithm of an empty
/// queuing strategy.
///
/// Panics if `source` is not native or if controller setup fails.
#[allow(crown::unrooted_must_root)]
pub fn new_with_external_underlying_source(
    global: &GlobalScope,
    source: UnderlyingSourceType,
    can_gc: CanGc,
) -> DomRoot<ReadableStream> {
    assert!(source.is_native());

    let empty_controller_slot = ControllerType::Default(MutNullableDom::new(None));
    let stream = ReadableStream::new_with_proto(global, None, empty_controller_slot, can_gc);

    let size_algorithm = extract_size_algorithm(&QueuingStrategy::empty());
    let controller =
        ReadableStreamDefaultController::new(global, source, 1.0, size_algorithm, can_gc);

    controller
        .setup(stream.clone(), can_gc)
        .expect("Setup of controller with external underlying source cannot fail");

    stream
}
/// Forwards to the default controller's release steps.
///
/// Panics if no controller is set; byte controllers are `todo!()`.
pub fn perform_release_steps(&self) {
    match &self.controller {
        ControllerType::Byte(_) => todo!(),
        ControllerType::Default(slot) => {
            let controller = slot.get().expect("Stream should have controller.");
            controller.perform_release_steps()
        },
    }
}
/// Forwards `read_request` to the default controller's pull steps.
///
/// Panics if no controller is set; byte controllers are `todo!()`.
pub fn perform_pull_steps(&self, read_request: &ReadRequest, can_gc: CanGc) {
    match &self.controller {
        ControllerType::Byte(_) => todo!(),
        ControllerType::Default(slot) => {
            let controller = slot.get().expect("Stream should have controller.");
            controller.perform_pull_steps(read_request, can_gc)
        },
    }
}
/// Queues `read_request` on the attached default reader.
///
/// Panics if the reader slot is BYOB, if no reader is attached, or if the
/// stream is not readable.
pub fn add_read_request(&self, read_request: &ReadRequest) {
    let ReaderType::Default(ref slot) = self.reader else {
        unreachable!("Adding a read request can only be done on a default reader.")
    };
    let Some(reader) = slot.get() else {
        panic!("Attempt to add a read request without having first acquired a reader.");
    };

    assert!(self.is_readable());
    reader.add_read_request(read_request);
}
/// Returns the JS object reflecting this stream as a non-null pointer.
///
/// Panics if the reflector holds a null object pointer.
pub fn get_js_stream(&self) -> NonNull<JSObject> {
    let raw_object = *self.reflector().get_jsobject();
    NonNull::new(raw_object).expect("Couldn't get a non-null pointer to JS stream object.")
}
/// Hands `bytes` to the default controller as a natively enqueued chunk.
///
/// Panics if no controller is set or if the controller slot is a byte one.
pub fn enqueue_native(&self, bytes: Vec<u8>) {
    match &self.controller {
        ControllerType::Default(slot) => {
            let controller = slot.get().expect("Stream should have controller.");
            controller.enqueue_native(bytes)
        },
        ControllerType::Byte(_) => unreachable!(
            "Enqueueing chunk to a stream from Rust on other than default controller"
        ),
    }
}
/// Moves a readable stream to the errored state, records `e` as the stored
/// error, and forwards `e` to the default reader when one is attached.
///
/// Panics if the stream is not readable; BYOB reader slots are `todo!()`.
pub fn error(&self, e: SafeHandleValue) {
    assert!(self.is_readable());

    self.state.set(ReadableStreamState::Errored);
    self.stored_error.set(e.get());

    match &self.reader {
        ReaderType::Default(slot) => {
            // Nothing more to do when no reader is attached.
            if let Some(reader) = slot.get() {
                reader.error(e);
            }
        },
        ReaderType::BYOB(_) => todo!(),
    }
}
/// Writes the stream's stored error value into `handle_mut`.
pub fn get_stored_error(&self, mut handle_mut: SafeMutableHandleValue) {
    let stored = self.stored_error.get();
    handle_mut.set(stored);
}
/// Converts `error` to a JS value in this stream's global and errors the
/// stream with it through `error`.
#[allow(unsafe_code)]
pub fn error_native(&self, error: Error) {
let cx = GlobalScope::get_cx();
// Rooted slot that receives the converted error value.
rooted!(in(*cx) let mut error_val = UndefinedValue());
unsafe { error.to_jsval(*cx, &self.global(), error_val.handle_mut()) };
self.error(error_val.handle());
}
/// Closes the default controller from native code; the result of `Close`
/// is discarded.
///
/// Panics if no controller is set or if the controller slot is a byte one.
pub fn controller_close_native(&self) {
    let ControllerType::Default(ref slot) = self.controller else {
        unreachable!("Native closing is only done on default controllers.")
    };
    let controller = slot.get().expect("Stream should have controller.");
    let _ = controller.Close();
}
/// Returns the default controller's answer to `in_memory`.
///
/// Panics if no controller is set or if the controller slot is a byte one.
pub fn in_memory(&self) -> bool {
    let ControllerType::Default(ref slot) = self.controller else {
        unreachable!("Checking if source is in memory for a stream with a non-default controller")
    };
    let controller = slot.get().expect("Stream should have controller.");
    controller.in_memory()
}
/// Returns whatever the default controller reports from
/// `get_in_memory_bytes`.
///
/// Panics if no controller is set or if the controller slot is a byte one.
pub fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
    let ControllerType::Default(ref slot) = self.controller else {
        unreachable!("Getting in-memory bytes for a stream with a non-default controller")
    };
    let controller = slot.get().expect("Stream should have controller.");
    controller.get_in_memory_bytes()
}
/// Creates a new default reader in this stream's global and sets it up
/// against this stream.
///
/// Returns the error from the reader's `set_up` if that step fails.
pub fn acquire_default_reader(
    &self,
    can_gc: CanGc,
) -> Fallible<DomRoot<ReadableStreamDefaultReader>> {
    let global = self.global();

    let unreflected = Box::new(ReadableStreamDefaultReader::new_inherited(&global, can_gc));
    let reader = reflect_dom_object(unreflected, &*global, can_gc);

    reader.set_up(self, &global, can_gc)?;
    Ok(reader)
}
/// Returns the stream's default controller.
///
/// Panics if no controller is set or if the controller slot is a byte one.
pub fn get_default_controller(&self) -> DomRoot<ReadableStreamDefaultController> {
    let ControllerType::Default(ref slot) = self.controller else {
        unreachable!("Getting default controller for a stream with a non-default controller")
    };
    slot.get().expect("Stream should have controller.")
}
/// Starts a read on the attached default reader and returns its promise.
///
/// Panics if no reader is attached or if the reader slot is BYOB.
pub fn read_a_chunk(&self, can_gc: CanGc) -> Rc<Promise> {
    let ReaderType::Default(ref slot) = self.reader else {
        unreachable!("Native reading of a chunk can only be done with a default reader.")
    };
    let Some(reader) = slot.get() else {
        unreachable!("Attempt to read stream chunk without having first acquired a reader.");
    };
    reader.Read(can_gc)
}
/// Releases the attached default reader.
///
/// Panics if no reader is attached or if the reader slot is BYOB.
pub fn stop_reading(&self) {
    let ReaderType::Default(ref slot) = self.reader else {
        unreachable!("Native stop reading can only be done with a default reader.")
    };
    let Some(reader) = slot.get() else {
        unreachable!("Attempt to stop reading without having first acquired a reader.");
    };
    reader.release();
}
/// Returns `true` when a reader of either flavour is attached.
pub fn is_locked(&self) -> bool {
    match &self.reader {
        ReaderType::BYOB(slot) => slot.get().is_some(),
        ReaderType::Default(slot) => slot.get().is_some(),
    }
}
/// Returns the current value of the `disturbed` flag.
pub fn is_disturbed(&self) -> bool {
self.disturbed.get()
}
/// Overwrites the `disturbed` flag with `disturbed`.
pub fn set_is_disturbed(&self, disturbed: bool) {
self.disturbed.set(disturbed);
}
/// Returns `true` when the stream state is `Closed`.
pub fn is_closed(&self) -> bool {
    matches!(self.state.get(), ReadableStreamState::Closed)
}
/// Returns `true` when the stream state is `Errored`.
pub fn is_errored(&self) -> bool {
    matches!(self.state.get(), ReadableStreamState::Errored)
}
/// Returns `true` when the stream state is `Readable`.
pub fn is_readable(&self) -> bool {
    matches!(self.state.get(), ReadableStreamState::Readable)
}
/// Returns `true` only when the reader slot is the default flavour and a
/// reader is attached to it.
pub fn has_default_reader(&self) -> bool {
    match &self.reader {
        ReaderType::BYOB(_) => false,
        ReaderType::Default(slot) => slot.get().is_some(),
    }
}
/// Returns how many read requests the attached default reader reports.
///
/// Panics if the stream has no default reader.
pub fn get_num_read_requests(&self) -> usize {
    assert!(self.has_default_reader());

    let ReaderType::Default(ref slot) = self.reader else {
        unreachable!(
            "Stream must have a default reader when get num read requests is called into."
        )
    };
    let reader = slot
        .get()
        .expect("Stream must have a reader when get num read requests is called into.");
    reader.get_num_read_requests()
}
/// Removes one read request from the default reader and completes it:
/// with its close steps when `done` is true, otherwise with its chunk
/// steps carrying `chunk`.
///
/// Panics if the stream has no default reader or the reader has no read
/// requests.
#[allow(crown::unrooted_must_root)]
pub fn fulfill_read_request(&self, chunk: SafeHandleValue, done: bool) {
    assert!(self.has_default_reader());

    let ReaderType::Default(ref slot) = self.reader else {
        unreachable!(
            "Stream must have a default reader when fulfill read requests is called into."
        )
    };
    let reader = slot
        .get()
        .expect("Stream must have a reader when a read request is fulfilled.");

    assert_ne!(reader.get_num_read_requests(), 0);
    let request = reader.remove_read_request();

    if done {
        request.close_steps();
        return;
    }

    // Copy the chunk into a traced heap slot before handing it over.
    let result = RootedTraceableBox::new(Heap::default());
    result.set(*chunk);
    request.chunk_steps(result);
}
/// Moves a readable stream to the closed state and closes the attached
/// default reader, if any.
///
/// Panics if the stream is not readable; BYOB reader slots are `todo!()`.
pub fn close(&self) {
    assert!(self.is_readable());
    self.state.set(ReadableStreamState::Closed);

    match &self.reader {
        ReaderType::Default(slot) => {
            // Nothing more to do when no reader is attached.
            if let Some(reader) = slot.get() {
                reader.close();
            }
        },
        ReaderType::BYOB(_) => todo!(),
    }
}
/// Cancels the stream with `reason` and returns a promise for the outcome.
///
/// The stream is marked disturbed first. An already-closed stream yields a
/// promise resolved with `()`; an errored stream yields a promise rejected
/// with the stored error. Otherwise the stream is closed, the default
/// controller's cancel steps are run, and the returned promise is settled
/// by native handlers attached to the promise those steps produce.
///
/// Panics if the controller slot is empty; byte controllers are `todo!()`.
/// See <https://streams.spec.whatwg.org/#readable-stream-cancel>.
#[allow(unsafe_code)]
pub fn cancel(&self, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
self.disturbed.set(true);
// Closed stream: nothing to cancel, resolve immediately.
if self.is_closed() {
let promise = Promise::new(&self.reflector_.global(), can_gc);
promise.resolve_native(&());
return promise;
}
// Errored stream: reject with the error recorded by `error`.
if self.is_errored() {
let promise = Promise::new(&self.reflector_.global(), can_gc);
unsafe {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut rval = UndefinedValue());
self.stored_error.to_jsval(*cx, rval.handle_mut());
promise.reject_native(&rval.handle());
return promise;
}
}
// Readable stream: close it before asking the controller to cancel.
self.close();
let source_cancel_promise = match self.controller {
ControllerType::Default(ref controller) => controller
.get()
.expect("Stream should have controller.")
.perform_cancel_steps(reason, can_gc),
ControllerType::Byte(_) => {
todo!()
},
};
let global = self.reflector_.global();
let result_promise = Promise::new(&global, can_gc);
// Both handlers settle `result_promise`: fulfillment resolves it with
// `()`, rejection forwards the rejection value.
let fulfillment_handler = Box::new(SourceCancelPromiseFulfillmentHandler {
result: result_promise.clone(),
});
let rejection_handler = Box::new(SourceCancelPromiseRejectionHandler {
result: result_promise.clone(),
});
let handler =
PromiseNativeHandler::new(&global, Some(fulfillment_handler), Some(rejection_handler));
// The handler is appended while inside the realm of this stream's global.
let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
source_cancel_promise.append_native_handler(&handler, comp, can_gc);
result_promise
}
/// Replaces the contents of the default reader slot with `new_reader`
/// (`None` clears it).
///
/// Panics if the reader slot is BYOB.
pub fn set_reader(&self, new_reader: Option<&ReadableStreamDefaultReader>) {
    let ReaderType::Default(ref slot) = self.reader else {
        unreachable!("Setting a reader can only be done on a default reader.")
    };
    slot.set(new_reader);
}
/// Tees a default-controller stream into two branches.
///
/// Acquires a default reader on this stream, builds two
/// `DefaultTeeUnderlyingSource`s that share the same bookkeeping state,
/// creates one branch stream per source, and registers handlers on the
/// reader's closed promise. Returns `[branch_1, branch_2]`.
///
/// Returns the error from `acquire_default_reader` if acquiring fails.
/// See <https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee>.
#[allow(crown::unrooted_must_root)]
fn default_tee(
&self,
clone_for_branch_2: bool,
can_gc: CanGc,
) -> Fallible<Vec<DomRoot<ReadableStream>>> {
let clone_for_branch_2 = Rc::new(Cell::new(clone_for_branch_2));
let reader = self.acquire_default_reader(can_gc)?;
self.set_reader(Some(&reader));
// State shared between both tee sources through `Rc` handles.
let reading = Rc::new(Cell::new(false));
let read_again = Rc::new(Cell::new(false));
let canceled_1 = Rc::new(Cell::new(false));
let canceled_2 = Rc::new(Cell::new(false));
let reason_1 = Rc::new(Heap::boxed(UndefinedValue()));
let reason_2 = Rc::new(Heap::boxed(UndefinedValue()));
let cancel_promise = Promise::new(&self.reflector_.global(), can_gc);
// Source for branch 1; it differs from the second source only in the cancel algorithm.
let tee_source_1 = DefaultTeeUnderlyingSource::new(
&reader,
self,
reading.clone(),
read_again.clone(),
canceled_1.clone(),
canceled_2.clone(),
clone_for_branch_2.clone(),
reason_1.clone(),
reason_2.clone(),
cancel_promise.clone(),
TeeCancelAlgorithm::Cancel1Algorithm,
can_gc,
);
let underlying_source_type_branch_1 =
UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_1));
// Source for branch 2; takes ownership of the remaining shared handles.
let tee_source_2 = DefaultTeeUnderlyingSource::new(
&reader,
self,
reading,
read_again,
canceled_1.clone(),
canceled_2.clone(),
clone_for_branch_2,
reason_1,
reason_2,
cancel_promise.clone(),
TeeCancelAlgorithm::Cancel2Algorithm,
can_gc,
);
let underlying_source_type_branch_2 =
UnderlyingSourceType::Tee(Dom::from_ref(&tee_source_2));
let branch_1 = create_readable_stream(
&self.reflector_.global(),
underlying_source_type_branch_1,
QueuingStrategy::empty(),
can_gc,
);
// Each source is told about both branches.
tee_source_1.set_branch_1(&branch_1);
tee_source_2.set_branch_1(&branch_1);
let branch_2 = create_readable_stream(
&self.reflector_.global(),
underlying_source_type_branch_2,
QueuingStrategy::empty(),
can_gc,
);
tee_source_1.set_branch_2(&branch_2);
tee_source_2.set_branch_2(&branch_2);
reader.append_native_handler_to_closed_promise(
&branch_1,
&branch_2,
canceled_1,
canceled_2,
cancel_promise,
can_gc,
);
Ok(vec![branch_1, branch_2])
}
/// Tees the stream, dispatching on the controller flavour.
///
/// Default-controller streams go through `default_tee`; byte controllers
/// are `todo!()`.
fn tee(
    &self,
    clone_for_branch_2: bool,
    can_gc: CanGc,
) -> Fallible<Vec<DomRoot<ReadableStream>>> {
    match &self.controller {
        ControllerType::Byte(_) => todo!(),
        ControllerType::Default(_) => self.default_tee(clone_for_branch_2, can_gc),
    }
}
}
impl ReadableStreamMethods<crate::DomTypeHolder> for ReadableStream {
fn Constructor(
cx: SafeJSContext,
global: &GlobalScope,
proto: Option<SafeHandleObject>,
can_gc: CanGc,
underlying_source: Option<*mut JSObject>,
strategy: &QueuingStrategy,
) -> Fallible<DomRoot<Self>> {
rooted!(in(*cx) let underlying_source_obj = underlying_source.unwrap_or(ptr::null_mut()));
let underlying_source_dict = if !underlying_source_obj.is_null() {
rooted!(in(*cx) let obj_val = ObjectValue(underlying_source_obj.get()));
match JsUnderlyingSource::new(cx, obj_val.handle()) {
Ok(ConversionResult::Success(val)) => val,
Ok(ConversionResult::Failure(error)) => return Err(Error::Type(error.to_string())),
_ => {
return Err(Error::JSFailed);
},
}
} else {
JsUnderlyingSource::empty()
};
let stream = if underlying_source_dict.type_.is_some() {
ReadableStream::new_with_proto(
global,
proto,
ControllerType::Byte(MutNullableDom::new(None)),
can_gc,
)
} else {
ReadableStream::new_with_proto(
global,
proto,
ControllerType::Default(MutNullableDom::new(None)),
can_gc,
)
};
if underlying_source_dict.type_.is_some() {
return Err(Error::Type("Bytes streams not implemented".to_string()));
} else {
let high_water_mark = extract_high_water_mark(strategy, 1.0)?;
let size_algorithm = extract_size_algorithm(strategy);
let controller = ReadableStreamDefaultController::new(
global,
UnderlyingSourceType::Js(underlying_source_dict, Heap::default()),
high_water_mark,
size_algorithm,
can_gc,
);
controller.set_underlying_source_this_object(underlying_source_obj.handle());
controller.setup(stream.clone(), can_gc)?;
};
Ok(stream)
}
/// <https://streams.spec.whatwg.org/#rs-locked>
///
/// Returns the result of `is_locked`.
fn Locked(&self) -> bool {
self.is_locked()
}
fn Cancel(&self, _cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Rc<Promise> {
if self.is_locked() {
let promise = Promise::new(&self.reflector_.global(), can_gc);
promise.reject_error(Error::Type("stream is not locked".to_owned()));
promise
} else {
self.cancel(reason, can_gc)
}
}
/// <https://streams.spec.whatwg.org/#rs-get-reader>
///
/// Without a `mode` in `options`, acquires and returns a default reader.
///
/// # Errors
///
/// - Any error from `acquire_default_reader`.
/// - `Error::Type` when a mode is given, because BYOB readers are not
///   implemented.
fn GetReader(
    &self,
    options: &ReadableStreamGetReaderOptions,
    can_gc: CanGc,
) -> Fallible<ReadableStreamReader> {
    // Bind the mode once instead of testing `is_none()` and unwrapping later.
    let Some(mode) = options.mode else {
        let reader = self.acquire_default_reader(can_gc)?;
        return Ok(ReadableStreamReader::ReadableStreamDefaultReader(reader));
    };

    assert!(mode == ReadableStreamReaderMode::Byob);
    Err(Error::Type(
        "AcquireReadableStreamBYOBReader is not implemented".to_owned(),
    ))
}
/// <https://streams.spec.whatwg.org/#rs-tee>
///
/// Delegates to `tee` with `clone_for_branch_2` set to `false`.
fn Tee(&self, can_gc: CanGc) -> Fallible<Vec<DomRoot<ReadableStream>>> {
self.tee(false, can_gc)
}
}
#[allow(unsafe_code)]
pub fn get_read_promise_done(cx: SafeJSContext, v: &SafeHandleValue) -> Result<bool, Error> {
if !v.is_object() {
return Err(Error::Type("Unknown format for done property.".to_string()));
}
unsafe {
rooted!(in(*cx) let object = v.to_object());
rooted!(in(*cx) let mut done = UndefinedValue());
match get_dictionary_property(*cx, object.handle(), "done", done.handle_mut()) {
Ok(true) => match bool::from_jsval(*cx, done.handle(), ()) {
Ok(ConversionResult::Success(val)) => Ok(val),
Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
_ => Err(Error::Type("Unknown format for done property.".to_string())),
},
Ok(false) => Err(Error::Type("Promise has no done property.".to_string())),
Err(()) => Err(Error::JSFailed),
}
}
}
#[allow(unsafe_code)]
pub fn get_read_promise_bytes(cx: SafeJSContext, v: &SafeHandleValue) -> Result<Vec<u8>, Error> {
if !v.is_object() {
return Err(Error::Type(
"Unknown format for for bytes read.".to_string(),
));
}
unsafe {
rooted!(in(*cx) let object = v.to_object());
rooted!(in(*cx) let mut bytes = UndefinedValue());
match get_dictionary_property(*cx, object.handle(), "value", bytes.handle_mut()) {
Ok(true) => {
match Vec::<u8>::from_jsval(*cx, bytes.handle(), ConversionBehavior::EnforceRange) {
Ok(ConversionResult::Success(val)) => Ok(val),
Ok(ConversionResult::Failure(error)) => Err(Error::Type(error.to_string())),
_ => Err(Error::Type("Unknown format for bytes read.".to_string())),
}
},
Ok(false) => Err(Error::Type("Promise has no value property.".to_string())),
Err(()) => Err(Error::JSFailed),
}
}
}