use std::rc::Rc;
use std::{ptr, str};
use encoding_rs::UTF_8;
use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
use ipc_channel::router::ROUTER;
use js::jsapi::{Heap, JSObject, JS_ClearPendingException, Value as JSValue};
use js::jsval::{JSVal, UndefinedValue};
use js::rust::wrappers::{JS_GetPendingException, JS_ParseJSON};
use js::rust::HandleValue;
use js::typedarray::{ArrayBuffer, CreateWith};
use mime::{self, Mime};
use net_traits::request::{
BodyChunkRequest, BodyChunkResponse, BodySource as NetBodySource, RequestBody,
};
use script_traits::serializable::BlobImpl;
use url::form_urlencoded;
use crate::dom::bindings::cell::DomRefCell;
use crate::dom::bindings::codegen::Bindings::BlobBinding::Blob_Binding::BlobMethods;
use crate::dom::bindings::codegen::Bindings::FormDataBinding::FormDataMethods;
use crate::dom::bindings::codegen::Bindings::XMLHttpRequestBinding::BodyInit;
use crate::dom::bindings::error::{Error, Fallible};
use crate::dom::bindings::refcounted::Trusted;
use crate::dom::bindings::reflector::DomObject;
use crate::dom::bindings::root::DomRoot;
use crate::dom::bindings::str::{DOMString, USVString};
use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::blob::{normalize_type_string, Blob};
use crate::dom::formdata::FormData;
use crate::dom::globalscope::GlobalScope;
use crate::dom::htmlformelement::{encode_multipart_form_data, generate_boundary};
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::readablestream::{get_read_promise_bytes, get_read_promise_done, ReadableStream};
use crate::dom::urlsearchparams::URLSearchParams;
use crate::realms::{enter_realm, AlreadyInRealm, InRealm};
use crate::script_runtime::{CanGc, JSContext};
use crate::task::TaskCanceller;
use crate::task_source::networking::NetworkingTaskSource;
use crate::task_source::{TaskSource, TaskSourceName};
/// The kind of source a body was extracted from.
/// See <https://fetch.spec.whatwg.org/#concept-body-source>.
#[derive(Clone, PartialEq)]
pub enum BodySource {
/// No source object. In this file it is used for bodies extracted from a
/// caller-supplied `ReadableStream`; such bodies cannot be re-sent on a re-direct.
Null,
/// The body was extracted from some other object (bytes, string, blob, form data, ...).
Object,
}
/// Why transmission of a body is being stopped.
enum StopReading {
/// Reading failed; `BodyChunkResponse::Error` is sent to the receiver.
Error,
/// The body is complete; `BodyChunkResponse::Done` is sent to the receiver.
Done,
}
/// State behind the IPC route that answers `BodyChunkRequest` messages for one request body.
#[derive(Clone)]
struct TransmitBodyConnectHandler {
/// Cross-thread handle to the body's stream; it is rooted inside queued tasks before use.
stream: Trusted<ReadableStream>,
/// Task source on which stream operations are queued.
task_source: NetworkingTaskSource,
/// Canceller passed along with every queued task.
canceller: TaskCanceller,
/// Sender for body chunks; `None` until a `Connect` request arrives, and again once reading stops.
bytes_sender: Option<IpcSender<BodyChunkResponse>>,
/// Sender on the request channel itself, handed to the promise handlers so they can
/// report `Done`/`Error` back to this route.
control_sender: IpcSender<BodyChunkRequest>,
/// The whole body, when the stream reported its bytes as available in memory.
in_memory: Option<Vec<u8>>,
/// Set once the in-memory bytes have been sent, so that the next chunk request ends the body.
in_memory_done: bool,
/// Kind of source the body was extracted from.
source: BodySource,
}
impl TransmitBodyConnectHandler {
    /// Creates a handler that is not yet connected: no chunk sender is stored
    /// and the in-memory bytes (if any) are marked as not yet sent.
    pub fn new(
        stream: Trusted<ReadableStream>,
        task_source: NetworkingTaskSource,
        canceller: TaskCanceller,
        control_sender: IpcSender<BodyChunkRequest>,
        in_memory: Option<Vec<u8>>,
        source: BodySource,
    ) -> TransmitBodyConnectHandler {
        TransmitBodyConnectHandler {
            stream,
            task_source,
            canceller,
            bytes_sender: None,
            control_sender,
            in_memory,
            in_memory_done: false,
            source,
        }
    }

    /// Marks the in-memory bytes as not yet sent, so they can be sent again.
    pub fn reset_in_memory_done(&mut self) {
        self.in_memory_done = false;
    }

    /// Installs a new route on `chunk_request_receiver`, served by a clone of
    /// this handler whose in-memory bytes are marked as not yet sent.
    ///
    /// Unlike the initial route, `Chunk` requests on this route are answered
    /// by `transmit_source` rather than by reading the stream.
    fn re_extract(&mut self, chunk_request_receiver: IpcReceiver<BodyChunkRequest>) {
        let mut body_handler = self.clone();
        body_handler.reset_in_memory_done();
        ROUTER.add_route(
            chunk_request_receiver.to_opaque(),
            Box::new(move |message| {
                // Panics if the message cannot be deserialized as a `BodyChunkRequest`.
                let request = message.to().unwrap();
                match request {
                    BodyChunkRequest::Connect(sender) => {
                        body_handler.start_reading(sender);
                    },
                    BodyChunkRequest::Extract(receiver) => {
                        body_handler.re_extract(receiver);
                    },
                    BodyChunkRequest::Chunk => body_handler.transmit_source(),
                    BodyChunkRequest::Done => {
                        body_handler.stop_reading(StopReading::Done);
                    },
                    BodyChunkRequest::Error => {
                        body_handler.stop_reading(StopReading::Error);
                    },
                }
            }),
        );
    }

    /// Answers a chunk request on a re-extracted route by sending the whole
    /// in-memory body as a single chunk; the following request ends the body.
    ///
    /// # Panics
    ///
    /// Panics if the source is `BodySource::Null`, or if in-memory bytes are
    /// available but no chunk sender has been stored by `start_reading`.
    fn transmit_source(&mut self) {
        if self.in_memory_done {
            // The single chunk was already sent: finish the body.
            self.stop_reading(StopReading::Done);
            return;
        }
        if let BodySource::Null = self.source {
            panic!("ReadableStream(Null) sources should not re-direct.");
        }
        // One copy is needed because the bytes stay stored in `self` for any
        // later re-extraction; the copy is moved into the message as-is.
        if let Some(bytes) = self.in_memory.clone() {
            self.in_memory_done = true;
            let _ = self
                .bytes_sender
                .as_ref()
                .expect("No bytes sender to transmit source.")
                .send(BodyChunkResponse::Chunk(bytes));
            return;
        }
        warn!("Re-directs for file-based Blobs not supported yet.");
    }

    /// Stores the chunk sender and, for `BodySource::Null` bodies, queues a
    /// task that starts reading from the stream.
    fn start_reading(&mut self, sender: IpcSender<BodyChunkResponse>) {
        self.bytes_sender = Some(sender);
        if self.source == BodySource::Null {
            let stream = self.stream.clone();
            let _ = self.task_source.queue_with_canceller(
                task!(start_reading_request_body_stream: move || {
                    let rooted_stream = stream.root();
                    rooted_stream.start_reading().expect("Couldn't acquire a reader for the body stream.");
                }),
                &self.canceller,
            );
        }
    }

    /// Takes the stored chunk sender and uses it to send the final `Error` or
    /// `Done` response matching `reason`.
    ///
    /// # Panics
    ///
    /// Panics if no chunk sender is stored (never connected, or already stopped).
    fn stop_reading(&mut self, reason: StopReading) {
        let bytes_sender = self
            .bytes_sender
            .take()
            .expect("Stop reading called multiple times on TransmitBodyConnectHandler.");
        match reason {
            StopReading::Error => {
                let _ = bytes_sender.send(BodyChunkResponse::Error);
            },
            StopReading::Done => {
                let _ = bytes_sender.send(BodyChunkResponse::Done);
            },
        }
    }

    /// Answers a chunk request on the initial route.
    ///
    /// In-memory bodies are sent whole as one chunk. Otherwise a task is
    /// queued that reads a chunk from the stream and attaches native promise
    /// handlers to forward the outcome.
    ///
    /// # Panics
    ///
    /// Panics if no chunk sender has been stored by `start_reading`.
    fn transmit_body_chunk(&mut self) {
        if self.in_memory_done {
            // The single in-memory chunk was already sent: finish the body.
            self.stop_reading(StopReading::Done);
            return;
        }
        let stream = self.stream.clone();
        let control_sender = self.control_sender.clone();
        let bytes_sender = self
            .bytes_sender
            .clone()
            .expect("No bytes sender to transmit chunk.");
        if let Some(bytes) = self.in_memory.clone() {
            let _ = bytes_sender.send(BodyChunkResponse::Chunk(bytes));
            // The next chunk request will take the `in_memory_done` branch above.
            self.in_memory_done = true;
            return;
        }
        let _ = self.task_source.queue_with_canceller(
            task!(setup_native_body_promise_handler: move || {
                let rooted_stream = stream.root();
                let global = rooted_stream.global();
                let promise = rooted_stream.read_a_chunk();
                let promise_handler = Box::new(TransmitBodyPromiseHandler {
                    bytes_sender: bytes_sender.clone(),
                    stream: rooted_stream.clone(),
                    control_sender: control_sender.clone(),
                });
                let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler {
                    bytes_sender,
                    stream: rooted_stream,
                    control_sender,
                });
                let handler =
                    PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler));
                let realm = enter_realm(&*global);
                let comp = InRealm::Entered(&realm);
                promise.append_native_handler(&handler, comp);
            }),
            &self.canceller,
        );
    }
}
/// Fulfillment handler for a read promise created while transmitting a request body.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct TransmitBodyPromiseHandler {
/// Sender on which a successfully read chunk is forwarded.
#[ignore_malloc_size_of = "Channels are hard"]
#[no_trace]
bytes_sender: IpcSender<BodyChunkResponse>,
/// The stream being read; reading is stopped on it when the body ends or fails.
stream: DomRoot<ReadableStream>,
/// Sender used to report `Done`/`Error` on the request channel.
#[ignore_malloc_size_of = "Channels are hard"]
#[no_trace]
control_sender: IpcSender<BodyChunkRequest>,
}
impl Callback for TransmitBodyPromiseHandler {
fn callback(&self, cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) {
let is_done = match get_read_promise_done(cx, &v) {
Ok(is_done) => is_done,
Err(_) => {
let _ = self.control_sender.send(BodyChunkRequest::Done);
return self.stream.stop_reading();
},
};
if is_done {
let _ = self.control_sender.send(BodyChunkRequest::Done);
return self.stream.stop_reading();
}
let chunk = match get_read_promise_bytes(cx, &v) {
Ok(chunk) => chunk,
Err(_) => {
let _ = self.control_sender.send(BodyChunkRequest::Error);
return self.stream.stop_reading();
},
};
let _ = self.bytes_sender.send(BodyChunkResponse::Chunk(chunk));
}
}
/// Rejection handler for a read promise created while transmitting a request body.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct TransmitBodyPromiseRejectionHandler {
/// Chunk sender; not used by the callback implemented in this file.
#[ignore_malloc_size_of = "Channels are hard"]
#[no_trace]
bytes_sender: IpcSender<BodyChunkResponse>,
/// The stream being read; reading is stopped on it when the promise rejects.
stream: DomRoot<ReadableStream>,
/// Sender used to report `Error` on the request channel.
#[ignore_malloc_size_of = "Channels are hard"]
#[no_trace]
control_sender: IpcSender<BodyChunkRequest>,
}
impl Callback for TransmitBodyPromiseRejectionHandler {
fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, _can_gc: CanGc) {
let _ = self.control_sender.send(BodyChunkRequest::Error);
self.stream.stop_reading();
}
}
/// The result of extracting a body from a value (see `Extractable`).
pub struct ExtractedBody {
/// Stream from which the body bytes are read.
pub stream: DomRoot<ReadableStream>,
/// Kind of source the body was extracted from.
pub source: BodySource,
/// Length of the body in bytes, when it is known up front.
pub total_bytes: Option<usize>,
/// Content type implied by the extracted value, if any.
pub content_type: Option<DOMString>,
}
impl ExtractedBody {
pub fn into_net_request_body(self) -> (RequestBody, DomRoot<ReadableStream>) {
let ExtractedBody {
stream,
total_bytes,
content_type: _,
source,
} = self;
let (chunk_request_sender, chunk_request_receiver) = ipc::channel().unwrap();
let trusted_stream = Trusted::new(&*stream);
let global = stream.global();
let task_source = global.networking_task_source();
let canceller = global.task_canceller(TaskSourceName::Networking);
let in_memory = stream.get_in_memory_bytes();
let net_source = match source {
BodySource::Null => NetBodySource::Null,
_ => NetBodySource::Object,
};
let mut body_handler = TransmitBodyConnectHandler::new(
trusted_stream,
task_source,
canceller,
chunk_request_sender.clone(),
in_memory,
source,
);
ROUTER.add_route(
chunk_request_receiver.to_opaque(),
Box::new(move |message| {
let request = message.to().unwrap();
match request {
BodyChunkRequest::Connect(sender) => {
body_handler.start_reading(sender);
},
BodyChunkRequest::Extract(receiver) => {
body_handler.re_extract(receiver);
},
BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(),
BodyChunkRequest::Done => {
body_handler.stop_reading(StopReading::Done);
},
BodyChunkRequest::Error => {
body_handler.stop_reading(StopReading::Error);
},
}
}),
);
let request_body = RequestBody::new(chunk_request_sender, net_source, total_bytes);
(request_body, stream)
}
pub fn in_memory(&self) -> bool {
self.stream.in_memory()
}
}
/// Conversion of a value into an `ExtractedBody`.
pub trait Extractable {
/// Extracts a body from `self`. The implementations in this file use `global`
/// to create the backing stream when one has to be built from bytes.
fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody>;
}
impl Extractable for BodyInit {
fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody> {
match self {
BodyInit::String(ref s) => s.extract(global),
BodyInit::URLSearchParams(ref usp) => usp.extract(global),
BodyInit::Blob(ref b) => b.extract(global),
BodyInit::FormData(ref formdata) => formdata.extract(global),
BodyInit::ArrayBuffer(ref typedarray) => {
let bytes = typedarray.to_vec();
let total_bytes = bytes.len();
let stream = ReadableStream::new_from_bytes(global, bytes);
Ok(ExtractedBody {
stream,
total_bytes: Some(total_bytes),
content_type: None,
source: BodySource::Object,
})
},
BodyInit::ArrayBufferView(ref typedarray) => {
let bytes = typedarray.to_vec();
let total_bytes = bytes.len();
let stream = ReadableStream::new_from_bytes(global, bytes);
Ok(ExtractedBody {
stream,
total_bytes: Some(total_bytes),
content_type: None,
source: BodySource::Object,
})
},
BodyInit::ReadableStream(stream) => {
if stream.is_locked() || stream.is_disturbed() {
return Err(Error::Type(
"The body's stream is disturbed or locked".to_string(),
));
}
Ok(ExtractedBody {
stream: stream.clone(),
total_bytes: None,
content_type: None,
source: BodySource::Null,
})
},
}
}
}
impl Extractable for Vec<u8> {
    /// Copies the bytes into an in-memory stream; no content type is implied.
    fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody> {
        let copy = self.to_vec();
        Ok(ExtractedBody {
            total_bytes: Some(self.len()),
            stream: ReadableStream::new_from_bytes(global, copy),
            content_type: None,
            source: BodySource::Object,
        })
    }
}
impl Extractable for Blob {
    /// Uses the blob's own stream and size; the blob's type becomes the content
    /// type unless it is empty.
    fn extract(&self, _global: &GlobalScope) -> Fallible<ExtractedBody> {
        let content_type = match self.Type() {
            blob_type if blob_type.as_ref().is_empty() => None,
            blob_type => Some(blob_type),
        };
        let total_bytes = Some(self.Size() as usize);
        let stream = self.get_stream();
        Ok(ExtractedBody {
            stream,
            total_bytes,
            content_type,
            source: BodySource::Object,
        })
    }
}
impl Extractable for DOMString {
    /// Copies the string's bytes into an in-memory stream, with a UTF-8
    /// plain-text content type.
    fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody> {
        let payload = self.as_bytes().to_vec();
        let total_bytes = Some(payload.len());
        Ok(ExtractedBody {
            content_type: Some(DOMString::from("text/plain;charset=UTF-8")),
            stream: ReadableStream::new_from_bytes(global, payload),
            total_bytes,
            source: BodySource::Object,
        })
    }
}
impl Extractable for FormData {
    /// Encodes the entries as multipart form data under a freshly generated
    /// boundary, which is also written into the content type.
    fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody> {
        let boundary = generate_boundary();
        let mut datums = self.datums();
        let encoded = encode_multipart_form_data(&mut datums, boundary.clone(), UTF_8);
        let total_bytes = Some(encoded.len());
        let header = format!("multipart/form-data;boundary={}", boundary);
        Ok(ExtractedBody {
            stream: ReadableStream::new_from_bytes(global, encoded),
            total_bytes,
            content_type: Some(DOMString::from(header)),
            source: BodySource::Object,
        })
    }
}
impl Extractable for URLSearchParams {
    /// Serializes the parameters into an in-memory stream, with a UTF-8
    /// url-encoded form content type.
    fn extract(&self, global: &GlobalScope) -> Fallible<ExtractedBody> {
        let serialized = self.serialize_utf8().into_bytes();
        let total_bytes = Some(serialized.len());
        Ok(ExtractedBody {
            content_type: Some(DOMString::from(
                "application/x-www-form-urlencoded;charset=UTF-8",
            )),
            stream: ReadableStream::new_from_bytes(global, serialized),
            total_bytes,
            source: BodySource::Object,
        })
    }
}
/// The representation into which consumed body bytes are packaged.
#[derive(Clone, Copy, JSTraceable, MallocSizeOf)]
pub enum BodyType {
/// Package the bytes as a `Blob`.
Blob,
/// Parse the bytes as form data.
FormData,
/// Parse the bytes as JSON.
Json,
/// Decode the bytes as text.
Text,
/// Copy the bytes into an `ArrayBuffer`.
ArrayBuffer,
}
/// The outcome of packaging consumed body bytes.
pub enum FetchedData {
/// Decoded text.
Text(String),
/// Parsed JSON value.
Json(RootedTraceableBox<Heap<JSValue>>),
/// Blob built from the bytes.
BlobData(DomRoot<Blob>),
/// Form data parsed from the bytes.
FormData(DomRoot<FormData>),
/// Array buffer object holding a copy of the bytes.
ArrayBuffer(RootedTraceableBox<Heap<*mut JSObject>>),
/// Exception captured when JSON parsing failed; the result promise is rejected with it.
JSException(RootedTraceableBox<Heap<JSVal>>),
}
/// Rejection handler for the read promises created while consuming a body.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct ConsumeBodyPromiseRejectionHandler {
/// The promise returned to the caller of `consume_body`; rejected when a read rejects.
#[ignore_malloc_size_of = "Rc are hard"]
result_promise: Rc<Promise>,
}
impl Callback for ConsumeBodyPromiseRejectionHandler {
    /// Passes the rejection value of the read promise on to the result promise.
    fn callback(&self, cx: JSContext, v: HandleValue, _realm: InRealm, _can_gc: CanGc) {
        let Self { result_promise } = self;
        result_promise.reject(cx, v);
    }
}
/// Fulfillment handler for the read promises created while consuming a body.
/// Each callback moves the accumulated state into the handler for the next read.
#[derive(Clone, JSTraceable, MallocSizeOf)]
struct ConsumeBodyPromiseHandler {
/// The promise returned to the caller of `consume_body`.
#[ignore_malloc_size_of = "Rc are hard"]
result_promise: Rc<Promise>,
/// The stream being read; the callback panics if this is `None`.
stream: Option<DomRoot<ReadableStream>>,
/// Requested packaging; taken out when the result is produced or handed to the next handler.
body_type: DomRefCell<Option<BodyType>>,
/// Raw MIME type bytes of the body; taken out like `body_type`.
mime_type: DomRefCell<Option<Vec<u8>>>,
/// Bytes read so far; taken out like `body_type`.
bytes: DomRefCell<Option<Vec<u8>>>,
}
impl ConsumeBodyPromiseHandler {
    /// Packages the accumulated bytes as the requested body type and settles
    /// the result promise with the outcome.
    ///
    /// # Panics
    ///
    /// Panics if the body type, MIME type or bytes were already taken out of
    /// this handler.
    fn resolve_result_promise(&self, cx: JSContext, can_gc: CanGc) {
        let promise = &self.result_promise;
        let body_type = self.body_type.borrow_mut().take().unwrap();
        let mime_type = self.mime_type.borrow_mut().take().unwrap();
        let body = self.bytes.borrow_mut().take().unwrap();

        let data = match run_package_data_algorithm(cx, body, body_type, mime_type, can_gc) {
            Ok(data) => data,
            Err(err) => return promise.reject_error(err),
        };

        match data {
            FetchedData::Text(s) => promise.resolve_native(&USVString(s)),
            FetchedData::Json(j) => promise.resolve_native(&j),
            FetchedData::BlobData(b) => promise.resolve_native(&b),
            FetchedData::FormData(f) => promise.resolve_native(&f),
            FetchedData::ArrayBuffer(a) => promise.resolve_native(&a),
            // A captured exception rejects the promise instead of resolving it.
            FetchedData::JSException(e) => promise.reject_native(&e.handle()),
        };
    }
}
impl Callback for ConsumeBodyPromiseHandler {
    /// Handles one fulfilled read while consuming a body.
    ///
    /// When the read reports the end of the stream, the result promise is
    /// settled. Otherwise the chunk is appended to the accumulated bytes and
    /// another read is started, with the state moved into a new handler.
    ///
    /// # Panics
    ///
    /// Panics if the handler has no stream, or if the accumulated bytes were
    /// already taken out of it.
    fn callback(&self, cx: JSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
        let stream = self
            .stream
            .as_ref()
            .expect("ConsumeBodyPromiseHandler has no stream in callback.");

        // Shared failure path: stop reading, then reject the result promise.
        let abort = |err| {
            stream.stop_reading();
            self.result_promise.reject_error(err)
        };

        match get_read_promise_done(cx, &v) {
            Err(err) => return abort(err),
            Ok(true) => return self.resolve_result_promise(cx, can_gc),
            Ok(false) => {},
        }

        let chunk = match get_read_promise_bytes(cx, &v) {
            Ok(chunk) => chunk,
            Err(err) => return abort(err),
        };

        let mut collected = self
            .bytes
            .borrow_mut()
            .take()
            .expect("No bytes for ConsumeBodyPromiseHandler.");
        collected.extend_from_slice(&chunk);

        let global = stream.global();
        let next_read = stream.read_a_chunk();

        // Hand the accumulated state over to the handler of the next read.
        let on_fulfilled = Box::new(ConsumeBodyPromiseHandler {
            result_promise: self.result_promise.clone(),
            stream: self.stream.clone(),
            body_type: DomRefCell::new(self.body_type.borrow_mut().take()),
            mime_type: DomRefCell::new(self.mime_type.borrow_mut().take()),
            bytes: DomRefCell::new(Some(collected)),
        });
        let on_rejected = Box::new(ConsumeBodyPromiseRejectionHandler {
            result_promise: self.result_promise.clone(),
        });
        let handler = PromiseNativeHandler::new(&global, Some(on_fulfilled), Some(on_rejected));

        let realm = enter_realm(&*global);
        next_read.append_native_handler(&handler, InRealm::Entered(&realm));
    }
}
/// Starts consuming the body of `object` as `body_type`.
///
/// Returns a promise created in the current realm. It is rejected right away
/// with a `TypeError` when the body is disturbed or locked; otherwise it is
/// settled once the body has been read and packaged.
#[allow(crown::unrooted_must_root)]
pub fn consume_body<T: BodyMixin + DomObject>(object: &T, body_type: BodyType) -> Rc<Promise> {
    let in_realm_proof = AlreadyInRealm::assert();
    let promise = Promise::new_in_current_realm(InRealm::Already(&in_realm_proof));

    let unusable = object.is_disturbed() || object.is_locked();
    if unusable {
        promise.reject_error(Error::Type(
            "The body's stream is disturbed or locked".to_string(),
        ));
    } else {
        consume_body_with_promise(
            object,
            body_type,
            promise.clone(),
            InRealm::Already(&in_realm_proof),
        );
    }

    promise
}
/// Reads the body of `object` and settles `promise` with the bytes packaged
/// as `body_type`.
///
/// An object without a body is treated as having an empty in-memory body.
/// If reading cannot be started on the stream, `promise` is rejected with a
/// `TypeError`. Otherwise the first read is issued here and the remaining
/// work happens in `ConsumeBodyPromiseHandler`.
#[allow(crown::unrooted_must_root)]
fn consume_body_with_promise<T: BodyMixin + DomObject>(
    object: &T,
    body_type: BodyType,
    promise: Rc<Promise>,
    comp: InRealm,
) {
    // Looked up once and reused for both the fallback stream and the handler.
    let global = object.global();
    let stream = match object.body() {
        Some(stream) => stream,
        None => ReadableStream::new_from_bytes(&global, Vec::new()),
    };
    if stream.start_reading().is_err() {
        return promise.reject_error(Error::Type(
            "The response's stream is disturbed or locked".to_string(),
        ));
    }
    let read_promise = stream.read_a_chunk();
    let promise_handler = Box::new(ConsumeBodyPromiseHandler {
        result_promise: promise.clone(),
        stream: Some(stream),
        body_type: DomRefCell::new(Some(body_type)),
        mime_type: DomRefCell::new(Some(object.get_mime_type())),
        // Accumulator for the chunks delivered by successive reads.
        bytes: DomRefCell::new(Some(vec![])),
    });
    let rejection_handler = Box::new(ConsumeBodyPromiseRejectionHandler {
        result_promise: promise,
    });
    let handler =
        PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler));
    read_promise.append_native_handler(&handler, comp);
}
/// Packages `bytes` as the representation selected by `body_type`.
///
/// `mime_type` is only passed on to the blob and form-data cases. The global
/// scope is looked up from `cx` before dispatching, whatever the body type.
fn run_package_data_algorithm(
    cx: JSContext,
    bytes: Vec<u8>,
    body_type: BodyType,
    mime_type: Vec<u8>,
    can_gc: CanGc,
) -> Fallible<FetchedData> {
    let in_realm_proof = AlreadyInRealm::assert_for_cx(cx);
    let global = GlobalScope::from_safe_context(cx, InRealm::Already(&in_realm_proof));
    let mime = mime_type.as_slice();

    match body_type {
        BodyType::ArrayBuffer => run_array_buffer_data_algorithm(cx, bytes),
        BodyType::Blob => run_blob_data_algorithm(&global, bytes, mime, can_gc),
        BodyType::FormData => run_form_data_algorithm(&global, bytes, mime),
        BodyType::Json => run_json_data_algorithm(cx, bytes),
        BodyType::Text => run_text_data_algorithm(bytes),
    }
}
/// Decodes `bytes` as UTF-8 text.
///
/// A single leading UTF-8 byte order mark is removed, as the Encoding
/// Standard's "UTF-8 decode" requires. Invalid sequences are replaced with
/// U+FFFD. This never returns an error.
fn run_text_data_algorithm(bytes: Vec<u8>) -> Fallible<FetchedData> {
    const UTF8_BOM: &[u8] = b"\xEF\xBB\xBF";
    // Only a BOM at the very start is dropped; later occurrences are content.
    let text_bytes = bytes.strip_prefix(UTF8_BOM).unwrap_or(bytes.as_slice());
    Ok(FetchedData::Text(
        String::from_utf8_lossy(text_bytes).into_owned(),
    ))
}
/// Parses `bytes` as JSON.
///
/// The bytes are decoded as UTF-8 first: a single leading byte order mark is
/// removed and invalid sequences become U+FFFD. Without the BOM removal the
/// parser would be handed U+FEFF as the first character of the text.
///
/// A parse failure is not an `Err`: the pending exception is captured,
/// cleared, and returned as `FetchedData::JSException`.
///
/// # Panics
///
/// Panics if parsing fails and no pending exception can be retrieved.
#[allow(unsafe_code)]
fn run_json_data_algorithm(cx: JSContext, bytes: Vec<u8>) -> Fallible<FetchedData> {
    const UTF8_BOM: &[u8] = b"\xEF\xBB\xBF";
    let json_bytes = bytes.strip_prefix(UTF8_BOM).unwrap_or(bytes.as_slice());
    let json_text = String::from_utf8_lossy(json_bytes);
    let json_text: Vec<u16> = json_text.encode_utf16().collect();
    rooted!(in(*cx) let mut rval = UndefinedValue());
    // SAFETY: `json_text` stays alive for the whole block and the pointer/length
    // pair describes that buffer; `rval` and `exception` are rooted before their
    // handles are passed on. Assumes `cx` is a valid context - TODO confirm.
    unsafe {
        if !JS_ParseJSON(
            *cx,
            json_text.as_ptr(),
            // NOTE(review): this cast truncates for texts longer than `u32::MAX`
            // UTF-16 code units.
            json_text.len() as u32,
            rval.handle_mut(),
        ) {
            rooted!(in(*cx) let mut exception = UndefinedValue());
            assert!(JS_GetPendingException(*cx, exception.handle_mut()));
            JS_ClearPendingException(*cx);
            return Ok(FetchedData::JSException(RootedTraceableBox::from_box(
                Heap::boxed(exception.get()),
            )));
        }
        let rooted_heap = RootedTraceableBox::from_box(Heap::boxed(rval.get()));
        Ok(FetchedData::Json(rooted_heap))
    }
}
/// Packages `bytes` as a `Blob` whose type is the normalized form of `mime`.
///
/// A `mime` that is not valid UTF-8 is treated as the empty string. This
/// never returns an error.
fn run_blob_data_algorithm(
    root: &GlobalScope,
    bytes: Vec<u8>,
    mime: &[u8],
    can_gc: CanGc,
) -> Fallible<FetchedData> {
    let mime_string = String::from_utf8(mime.to_vec()).unwrap_or_default();
    let blob_impl = BlobImpl::new_from_bytes(bytes, normalize_type_string(&mime_string));
    Ok(FetchedData::BlobData(Blob::new(root, blob_impl, can_gc)))
}
fn run_form_data_algorithm(
root: &GlobalScope,
bytes: Vec<u8>,
mime: &[u8],
) -> Fallible<FetchedData> {
let mime_str = if let Ok(s) = str::from_utf8(mime) {
s
} else {
""
};
let mime: Mime = mime_str
.parse()
.map_err(|_| Error::Type("Inappropriate MIME-type for Body".to_string()))?;
if mime.type_() == mime::APPLICATION && mime.subtype() == mime::WWW_FORM_URLENCODED {
let entries = form_urlencoded::parse(&bytes);
let formdata = FormData::new(None, root);
for (k, e) in entries {
formdata.Append(USVString(k.into_owned()), USVString(e.into_owned()));
}
return Ok(FetchedData::FormData(formdata));
}
Err(Error::Type("Inappropriate MIME-type for Body".to_string()))
}
#[allow(unsafe_code)]
pub fn run_array_buffer_data_algorithm(cx: JSContext, bytes: Vec<u8>) -> Fallible<FetchedData> {
rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
let arraybuffer = unsafe {
ArrayBuffer::create(
*cx,
CreateWith::Slice(&bytes),
array_buffer_ptr.handle_mut(),
)
};
if arraybuffer.is_err() {
return Err(Error::JSFailed);
}
let rooted_heap = RootedTraceableBox::from_box(Heap::boxed(array_buffer_ptr.get()));
Ok(FetchedData::ArrayBuffer(rooted_heap))
}
/// Accessors that `consume_body` needs from an object that carries a body.
pub trait BodyMixin {
/// Whether the body is disturbed; `consume_body` rejects its promise when this is true.
fn is_disturbed(&self) -> bool;
/// The body's stream, if any; `consume_body_with_promise` substitutes an empty
/// in-memory stream for `None`.
fn body(&self) -> Option<DomRoot<ReadableStream>>;
/// Whether the body is locked; `consume_body` rejects its promise when this is true.
fn is_locked(&self) -> bool;
/// Raw bytes of the body's MIME type, passed on to the blob and form-data packaging.
fn get_mime_type(&self) -> Vec<u8>;
}