use std::convert::TryFrom;
use std::fmt::Debug;
use std::io::{Read, Seek};
use std::sync::mpsc;
use rayon_core::{ThreadPool, ThreadPoolBuildError};
use smallvec::alloc::sync::Arc;
use crate::block::{BlockIndex, UncompressedBlock};
use crate::block::chunk::{Chunk, TileCoordinates};
use crate::compression::Compression;
use crate::error::{Error, Result, u64_to_usize, UnitResult};
use crate::io::{PeekRead, Tracking};
use crate::meta::{MetaData, OffsetTables};
use crate::meta::header::Header;
/// Holds the decoded meta data of a file together with the byte source it was read from,
/// so that the chunks can be read from the same source afterwards.
#[derive(Debug)]
pub struct Reader<R> {
/// The meta data decoded in `read_from_buffered`.
meta_data: MetaData,
/// The byte source the meta data was read from, wrapped in `Tracking` and `PeekRead`.
/// The chunk readers created from this reader continue reading from it.
remaining_reader: PeekRead<Tracking<R>>, }
impl<R: Read + Seek> Reader<R> {
pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
let mut remaining_reader = PeekRead::new(Tracking::new(read));
let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
Ok(Self { meta_data, remaining_reader })
}
pub fn meta_data(&self) -> &MetaData { &self.meta_data }
pub fn headers(&self) -> &[Header] { &self.meta_data.headers }
pub fn into_meta_data(self) -> MetaData { self.meta_data }
pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> {
let total_chunk_count = {
if pedantic {
let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?;
offset_tables.iter().map(|table| table.len()).sum()
}
else {
usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?)
.expect("too large chunk count for this machine")
}
};
Ok(AllChunksReader {
meta_data: self.meta_data,
remaining_chunks: 0 .. total_chunk_count,
remaining_bytes: self.remaining_reader,
pedantic
})
}
pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
if pedantic {
validate_offset_tables(
self.meta_data.headers.as_slice(), &offset_tables,
self.remaining_reader.byte_position()
)?;
}
let mut filtered_offsets = Vec::with_capacity(
(self.meta_data.headers.len() * 32).min(2*2048)
);
for (header_index, header) in self.meta_data.headers.iter().enumerate() { for (block_index, tile) in header.blocks_increasing_y_order().enumerate() { let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;
let block = BlockIndex {
layer: header_index,
level: tile.location.level_index,
pixel_position: data_indices.position.to_usize("data indices start")?,
pixel_size: data_indices.size,
};
if filter(&self.meta_data, tile.location, block) {
filtered_offsets.push(offset_tables[header_index][block_index]) }
};
}
filtered_offsets.sort_unstable(); if pedantic {
if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
return Err(Error::invalid("chunk offset table"))
}
}
Ok(FilteredChunksReader {
meta_data: self.meta_data,
expected_filtered_chunk_count: filtered_offsets.len(),
remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
remaining_bytes: self.remaining_reader
})
}
}
/// Check that every entry of the offset tables lies within the plausible byte range of the chunks.
///
/// The range starts at `chunks_start_byte` and ends at `chunks_start_byte` plus the sum of
/// `max_pixel_file_bytes()` over all headers (both bounds inclusive).
///
/// The upper bound is computed with saturating arithmetic: the sizes are derived from the
/// headers, so an overflowing sum must not panic or wrap around to a too-small bound.
///
/// # Errors
/// Returns `Error::invalid("offset table")` if any offset lies outside of that range.
fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
    let max_pixel_bytes = headers.iter()
        .map(|header| header.max_pixel_file_bytes())
        .fold(0_usize, usize::saturating_add);

    let end_byte = chunks_start_byte.saturating_add(max_pixel_bytes);

    let is_invalid = offset_tables.iter().flatten()
        .map(|&offset| u64_to_usize(offset))
        .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte);

    if is_invalid { Err(Error::invalid("offset table")) }
    else { Ok(()) }
}
/// Reads only a selected subset of the chunks of a file.
/// Created by `Reader::filter_chunks`.
#[derive(Debug)]
pub struct FilteredChunksReader<R> {
/// The meta data of the file, taken over from the `Reader`.
meta_data: MetaData,
/// The number of chunks that were selected by the filter.
/// Stays constant while iterating.
expected_filtered_chunk_count: usize,
/// The offset table entries of the selected chunks that have not been read yet.
/// `filter_chunks` sorts them ascending; each one is passed to `skip_to` before its chunk is read.
remaining_filtered_chunk_indices: std::vec::IntoIter<u64>,
/// The byte source from which the chunks are read.
remaining_bytes: PeekRead<Tracking<R>>,
}
/// Reads all chunks of a file, one directly after another.
/// Created by `Reader::all_chunks`.
#[derive(Debug)]
pub struct AllChunksReader<R> {
/// The meta data of the file, taken over from the `Reader`.
meta_data: MetaData,
/// Counts the chunks that are still to be read.
/// Constructed as `0 .. total_chunk_count`, so `end` is the total count
/// and `len()` is the number of chunks not yet read.
remaining_chunks: std::ops::Range<usize>,
/// The byte source from which the chunks are read.
remaining_bytes: PeekRead<Tracking<R>>,
/// If set, `next` yields an error when a byte can still be peeked
/// after the last expected chunk has been read.
pedantic: bool,
}
/// Wraps another chunks reader and reports the reading progress to a callback.
/// Created by `ChunksReader::on_progress`.
#[derive(Debug)]
pub struct OnProgressChunksReader<R, F> {
/// The wrapped reader that actually produces the chunks.
chunks_reader: R,
/// The number of items the wrapped reader has yielded so far.
/// Incremented for every yielded item, whether it is `Ok` or `Err`.
decoded_chunks: usize,
/// Called with `decoded_chunks / expected_chunk_count` before each yielded item,
/// and with `1.0` whenever the wrapped reader returns `None`.
callback: F,
}
/// Common interface of all readers that yield the compressed chunks of a file.
///
/// Every implementor is an exact-size iterator over `Result<Chunk>`.
/// The provided methods add progress reporting and decompression on top of that.
pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator {

    /// The meta data that belongs to the chunks of this reader.
    fn meta_data(&self) -> &MetaData;

    /// The headers stored in the meta data of this reader.
    fn headers(&self) -> &[Header] {
        self.meta_data().headers.as_slice()
    }

    /// The total number of chunks this reader is expected to yield.
    fn expected_chunk_count(&self) -> usize;

    /// Read the next compressed chunk. Equivalent to calling `next()`.
    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> {
        self.next()
    }

    /// Wrap this reader such that `on_progress` is called while the chunks are being read.
    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
        OnProgressChunksReader {
            chunks_reader: self,
            decoded_chunks: 0,
            callback: on_progress,
        }
    }

    /// Decompress all chunks using a parallel decompressor and pass each block to `insert_block`.
    ///
    /// If no parallel decompressor can be created, this falls back to `decompress_sequential`.
    /// Stops at the first error, which is either a failed block or an error from `insert_block`.
    fn decompress_parallel(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = match self.parallel_decompressor(pedantic) {
            Ok(parallel) => parallel,

            // the reader is handed back on failure, so it can be used sequentially instead
            Err(unchanged_reader) => return unchanged_reader.decompress_sequential(pedantic, insert_block),
        };

        // not a `for` loop, because the decompressor is also borrowed inside of the body
        while let Some(decompressed) = decompressor.next() {
            let block = decompressed?;
            insert_block(decompressor.meta_data(), block)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Try to create a decompressor that decompresses multiple chunks at the same time.
    /// On failure, this reader is returned unchanged as the error value.
    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
        ParallelBlockDecompressor::new(self, pedantic)
    }

    /// Decompress all chunks one after another and pass each block to `insert_block`.
    ///
    /// Stops at the first error, which is either a failed block or an error from `insert_block`.
    fn decompress_sequential(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = self.sequential_decompressor(pedantic);

        // not a `for` loop, because the decompressor is also borrowed inside of the body
        while let Some(decompressed) = decompressor.next() {
            let block = decompressed?;
            insert_block(decompressor.meta_data(), block)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Create a decompressor that decompresses one chunk per call.
    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
        SequentialBlockDecompressor {
            remaining_chunks_reader: self,
            pedantic,
        }
    }
}
impl<R, F> ChunksReader for OnProgressChunksReader<R, F>
    where R: ChunksReader, F: FnMut(f64)
{
    /// The meta data of the wrapped reader.
    fn meta_data(&self) -> &MetaData {
        R::meta_data(&self.chunks_reader)
    }

    /// The expected chunk count of the wrapped reader.
    fn expected_chunk_count(&self) -> usize {
        R::expected_chunk_count(&self.chunks_reader)
    }
}
// The length is exact because `size_hint` is forwarded from the wrapped exact-size reader.
impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F>
    where R: ChunksReader, F: FnMut(f64)
{}

impl<R, F> Iterator for OnProgressChunksReader<R, F>
    where R: ChunksReader, F: FnMut(f64)
{
    type Item = Result<Chunk>;

    /// Yield the next item of the wrapped reader and report the progress.
    ///
    /// Before an item is returned, the callback receives the fraction of items yielded
    /// previously. When the wrapped reader is exhausted, the callback receives `1.0`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.chunks_reader.next() {
            Some(item) => {
                let progress = self.decoded_chunks as f64 / self.expected_chunk_count() as f64;
                (self.callback)(progress);

                self.decoded_chunks += 1;
                Some(item)
            },

            None => {
                debug_assert_eq!(
                    self.decoded_chunks, self.expected_chunk_count(),
                    "chunks reader finished but not all chunks are decompressed"
                );

                (self.callback)(1.0);
                None
            },
        }
    }

    /// The size hint of the wrapped reader.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.chunks_reader.size_hint()
    }
}
impl<R: Read + Seek> ChunksReader for AllChunksReader<R> {

    /// The meta data of the file.
    fn meta_data(&self) -> &MetaData {
        &self.meta_data
    }

    /// The total chunk count, which is the (never changing) end of the counting range.
    fn expected_chunk_count(&self) -> usize {
        self.remaining_chunks.end
    }
}
impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {}
impl<R: Read + Seek> Iterator for AllChunksReader<R> {
type Item = Result<Chunk>;
fn next(&mut self) -> Option<Self::Item> {
let next_chunk = self.remaining_chunks.next()
.map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data));
if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() {
return Some(Err(Error::invalid("end of file expected")));
}
next_chunk
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.remaining_chunks.len(), Some(self.remaining_chunks.len()))
}
}
impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> {

    /// The meta data of the file.
    fn meta_data(&self) -> &MetaData {
        &self.meta_data
    }

    /// The number of chunks that were selected when this reader was created.
    fn expected_chunk_count(&self) -> usize {
        self.expected_filtered_chunk_count
    }
}
impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {}
impl<R: Read + Seek> Iterator for FilteredChunksReader<R> {
type Item = Result<Chunk>;
fn next(&mut self) -> Option<Self::Item> {
self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
self.remaining_bytes.skip_to( usize::try_from(next_chunk_location)
.expect("too large chunk position for this machine")
)?;
let meta_data = &self.meta_data;
Chunk::read(&mut self.remaining_bytes, meta_data)
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
}
}
/// Decompresses the chunks of a chunks reader one at a time, on the calling thread.
/// Created by `ChunksReader::sequential_decompressor`.
#[derive(Debug)]
pub struct SequentialBlockDecompressor<R: ChunksReader> {
/// The reader that supplies the compressed chunks and the meta data.
remaining_chunks_reader: R,
/// Forwarded to `UncompressedBlock::decompress_chunk` for every chunk.
pedantic: bool,
}
impl<R: ChunksReader> SequentialBlockDecompressor<R> {

    /// The meta data of the underlying chunks reader.
    pub fn meta_data(&self) -> &MetaData {
        self.remaining_chunks_reader.meta_data()
    }

    /// Read the next chunk and decompress it.
    ///
    /// Returns `None` if the chunks reader is exhausted.
    /// A chunk that could not be read is passed on as an error without attempting to decompress.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
        let compressed_chunk = self.remaining_chunks_reader.read_next_chunk()?;

        Some(compressed_chunk.and_then(|chunk| {
            UncompressedBlock::decompress_chunk(chunk, self.remaining_chunks_reader.meta_data(), self.pedantic)
        }))
    }
}
/// Decompresses the chunks of a chunks reader on a thread pool.
/// Compressed chunks are read on the calling thread, decompressed in spawned jobs,
/// and the results are handed back through a channel.
#[derive(Debug)]
pub struct ParallelBlockDecompressor<R: ChunksReader> {
/// The reader that supplies the compressed chunks.
remaining_chunks: R,
/// Cloned into every spawned job, which uses it to send back its result.
sender: mpsc::Sender<Result<UncompressedBlock>>,
/// Receives the results of the spawned jobs.
receiver: mpsc::Receiver<Result<UncompressedBlock>>,
/// The number of jobs that were spawned but whose result has not been received yet.
currently_decompressing_count: usize,
/// Upper limit for `currently_decompressing_count`. Despite the name, this is not
/// the thread count of the pool: the constructor adds 2 to the (capped) thread count.
max_threads: usize,
/// A shared clone of the meta data, handed to every spawned job.
shared_meta_data_ref: Arc<MetaData>,
/// Forwarded to `UncompressedBlock::decompress_chunk` for every chunk.
pedantic: bool,
/// The thread pool on which the decompression jobs are spawned.
pool: ThreadPool,
}
impl<R: ChunksReader> ParallelBlockDecompressor<R> {

    /// Create a parallel decompressor that uses a newly built rayon thread pool.
    ///
    /// Returns the chunks reader unchanged as `Err` under the same conditions
    /// as `new_with_thread_pool`.
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
        let create_default_pool = || {
            rayon_core::ThreadPoolBuilder::new()
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
                .build()
        };

        Self::new_with_thread_pool(chunks, pedantic, create_default_pool)
    }

    /// Create a parallel decompressor that uses the thread pool produced by `try_create_thread_pool`.
    ///
    /// Returns the chunks reader unchanged as `Err` if every header is uncompressed
    /// (the pool is not even created in that case) or if the pool cannot be created.
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
        -> std::result::Result<Self, R>
        where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError>
    {
        let all_headers_uncompressed = chunks.meta_data().headers.iter()
            .all(|header| header.compression == Compression::Uncompressed);

        if all_headers_uncompressed {
            return Err(chunks);
        }

        let pool = match try_create_thread_pool() {
            Err(_) => return Err(chunks),
            Ok(pool) => pool,
        };

        // pool threads (at least one), capped by the chunk count, plus two
        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2;

        // must be cloned before `chunks` is moved into the struct
        let shared_meta_data_ref = Arc::new(chunks.meta_data().clone());

        let (sender, receiver) = mpsc::channel();

        Ok(Self {
            shared_meta_data_ref,
            currently_decompressing_count: 0,
            remaining_chunks: chunks,
            sender,
            receiver,
            pedantic,
            max_threads,
            pool,
        })
    }

    /// Spawn decompression jobs up to the limit, then wait for the next finished block.
    ///
    /// Blocks are returned in the order in which their results arrive on the channel.
    /// If reading a chunk fails, that error is returned immediately,
    /// even if results of previously spawned jobs are still outstanding.
    /// Returns `None` when no chunk is left and no job is outstanding.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
        while self.currently_decompressing_count < self.max_threads {
            let chunk = match self.remaining_chunks.next() {
                None => break,
                Some(Err(error)) => return Some(Err(error)),
                Some(Ok(chunk)) => chunk,
            };

            let result_sender = self.sender.clone();
            let meta_data = Arc::clone(&self.shared_meta_data_ref);
            let pedantic = self.pedantic;

            // counted before spawning, matched by the decrement after receiving
            self.currently_decompressing_count += 1;

            self.pool.spawn(move || {
                let outcome = UncompressedBlock::decompress_chunk(chunk, &meta_data, pedantic);

                // a failed send is deliberately ignored
                let _ = result_sender.send(outcome);
            });
        }

        if self.currently_decompressing_count == 0 {
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks");
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
            return None;
        }

        let finished_block = self.receiver.recv()
            .expect("all decompressing senders hung up but more messages were expected");

        self.currently_decompressing_count -= 1;
        Some(finished_block)
    }

    /// The meta data of the underlying chunks reader.
    pub fn meta_data(&self) -> &MetaData {
        self.remaining_chunks.meta_data()
    }
}
// The length is exact because `size_hint` is forwarded from the exact-size chunks reader.
impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {}

impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;

    /// Decompress the next block. Same as `decompress_next_block`.
    fn next(&mut self) -> Option<Self::Item> {
        self.decompress_next_block()
    }

    /// The size hint of the underlying chunks reader.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.remaining_chunks_reader.size_hint()
    }
}
// The length counts the chunks not yet read plus the jobs whose result is outstanding.
impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}

impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;

    /// Obtain the next decompressed block. Same as `decompress_next_block`.
    fn next(&mut self) -> Option<Self::Item> {
        self.decompress_next_block()
    }

    /// The number of chunks not yet read plus the number of outstanding jobs.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let unread_chunks = self.remaining_chunks.len();
        let outstanding = unread_chunks + self.currently_decompressing_count;
        (outstanding, Some(outstanding))
    }
}