use std::convert::TryFrom;
use std::fmt::Debug;
use std::io::{Read, Seek};
use std::sync::mpsc;
use rayon_core::{ThreadPool, ThreadPoolBuildError};

use smallvec::alloc::sync::Arc;

use crate::block::{BlockIndex, UncompressedBlock};
use crate::block::chunk::{Chunk, TileCoordinates};
use crate::compression::Compression;
use crate::error::{Error, Result, u64_to_usize, UnitResult};
use crate::io::{PeekRead, Tracking};
use crate::meta::{MetaData, OffsetTables};
use crate::meta::header::Header;

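/// Decodes the meta data of an exr byte source and keeps the source
/// ready for reading the chunks that follow.
/// Use the methods of this object to decode the rest of the file.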
#[derive(Debug)]
pub struct Reader<R> {
    meta_data: MetaData,
    remaining_reader: PeekRead<Tracking<R>>,
}

impl<R: Read + Seek> Reader<R> {

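    /// Reads and validates the exr meta data from a byte source.
    /// The source should be buffered, as this performs many small reads.
    /// If `pedantic` is true, the meta data is checked more strictly.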
    pub fn read_from_buffered(read: R, pedantic: bool) -> Result<Self> {
        let mut remaining_reader = PeekRead::new(Tracking::new(read));
        let meta_data = MetaData::read_validated_from_buffered_peekable(&mut remaining_reader, pedantic)?;
        Ok(Self { meta_data, remaining_reader })
    }

    /// The decoded exr meta data from the file.
    pub fn meta_data(&self) -> &MetaData { &self.meta_data }

    /// The decoded exr headers from the file.
    pub fn headers(&self) -> &[Header] { &self.meta_data.headers }

    /// Obtain ownership of the decoded meta data.
    pub fn into_meta_data(self) -> MetaData { self.meta_data }

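    /// Prepare to read all the chunks from the file.
    /// Does not decode any chunks now, but returns a decoder.
    /// Reading all chunks reduces seeking the file, but some chunks might be read without being used.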
    pub fn all_chunks(mut self, pedantic: bool) -> Result<AllChunksReader<R>> {
        let total_chunk_count = {
            if pedantic {
                // read the offset tables and validate them against the header meta data
                let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;
                validate_offset_tables(self.meta_data.headers.as_slice(), &offset_tables, self.remaining_reader.byte_position())?;
                offset_tables.iter().map(|table| table.len()).sum()
            }
            else {
                // do not worry about the offset tables, just skip them
                usize::try_from(MetaData::skip_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?)
                    .expect("too large chunk count for this machine")
            }
        };

        Ok(AllChunksReader {
            meta_data: self.meta_data,
            remaining_chunks: 0 .. total_chunk_count,
            remaining_bytes: self.remaining_reader,
            pedantic
        })
    }

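    /// Prepare to read some of the chunks from the file.
    /// Does not decode any chunks now, but returns a decoder.
    /// Reading only some chunks may seek the file, potentially skipping many bytes.
    /// The filter is called with the meta data, the tile coordinates,
    /// and the block index of each chunk, and selects the chunks to be read.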
    pub fn filter_chunks(mut self, pedantic: bool, mut filter: impl FnMut(&MetaData, TileCoordinates, BlockIndex) -> bool) -> Result<FilteredChunksReader<R>> {
        let offset_tables = MetaData::read_offset_tables(&mut self.remaining_reader, &self.meta_data.headers)?;

        if pedantic {
            validate_offset_tables(
                self.meta_data.headers.as_slice(), &offset_tables,
                self.remaining_reader.byte_position()
            )?;
        }

        let mut filtered_offsets = Vec::with_capacity(
            (self.meta_data.headers.len() * 32).min(2 * 2048)
        );

        for (header_index, header) in self.meta_data.headers.iter().enumerate() {
            for (block_index, tile) in header.blocks_increasing_y_order().enumerate() {
                let data_indices = header.get_absolute_block_pixel_coordinates(tile.location)?;

                let block = BlockIndex {
                    layer: header_index,
                    level: tile.location.level_index,
                    pixel_position: data_indices.position.to_usize("data indices start")?,
                    pixel_size: data_indices.size,
                };

                if filter(&self.meta_data, tile.location, block) {
                    filtered_offsets.push(offset_tables[header_index][block_index])
                }
            }
        }

        // sort by offset, so that the file is read mostly continuously, with minimal seeking
        filtered_offsets.sort_unstable();

        if pedantic {
            // the offsets are sorted, so any duplicates must be direct neighbours
            if filtered_offsets.windows(2).any(|pair| pair[0] == pair[1]) {
                return Err(Error::invalid("chunk offset table"))
            }
        }

        Ok(FilteredChunksReader {
            meta_data: self.meta_data,
            expected_filtered_chunk_count: filtered_offsets.len(),
            remaining_filtered_chunk_indices: filtered_offsets.into_iter(),
            remaining_bytes: self.remaining_reader
        })
    }
}
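/// Check that all offsets in the tables point into the chunk section of the file.
/// This is a plausibility check, not a guarantee that each offset points to a valid chunk.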
fn validate_offset_tables(headers: &[Header], offset_tables: &OffsetTables, chunks_start_byte: usize) -> UnitResult {
    let max_pixel_bytes: usize = headers.iter() // when compressed, chunks are smaller than this maximum
        .map(|header| header.max_pixel_file_bytes())
        .sum();

    // check that each offset lies between the start of the chunk section
    // and the largest possible end of the pixel data
    let end_byte = chunks_start_byte + max_pixel_bytes;
    let is_invalid = offset_tables.iter().flatten()
        .map(|&offset| u64_to_usize(offset))
        .any(|chunk_start| chunk_start < chunks_start_byte || chunk_start > end_byte);

    if is_invalid { Err(Error::invalid("offset table")) }
    else { Ok(()) }
}


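/// Decodes the desired chunks and skips the unimportant chunks in the file.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Call `on_progress` to have a callback invoked with each block.
/// Also contains the image meta data.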
#[derive(Debug)]
pub struct FilteredChunksReader<R> {
    meta_data: MetaData,
    expected_filtered_chunk_count: usize,
    remaining_filtered_chunk_indices: std::vec::IntoIter<u64>,
    remaining_bytes: PeekRead<Tracking<R>>,
}

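/// Decodes all chunks in the file, in the order they appear, without seeking.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Call `on_progress` to have a callback invoked with each block.
/// Also contains the image meta data.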
#[derive(Debug)]
pub struct AllChunksReader<R> {
    meta_data: MetaData,
    remaining_chunks: std::ops::Range<usize>,
    remaining_bytes: PeekRead<Tracking<R>>,
    pedantic: bool,
}

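/// Wraps a chunks reader and reports the reading progress
/// to a callback while the chunks are read.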
#[derive(Debug)]
pub struct OnProgressChunksReader<R, F> {
    chunks_reader: R,
    decoded_chunks: usize,
    callback: F,
}

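/// Decode chunks in the file.
/// The decoded chunks can be decompressed by calling
/// `decompress_parallel`, `decompress_sequential`, or `sequential_decompressor`.
/// Call `on_progress` to have a callback invoked with each block.
/// Also contains the image meta data.
///
/// A minimal usage sketch, assuming the public `exr` crate paths
/// (the file name is a placeholder):
/// ```no_run
/// use exr::block::reader::{Reader, ChunksReader};
///
/// fn main() -> exr::error::UnitResult {
///     let file = std::io::BufReader::new(std::fs::File::open("image.exr")?);
///
///     Reader::read_from_buffered(file, false)?
///         .all_chunks(false)?
///         .on_progress(|progress| println!("progress: {:.2}", progress))
///         .decompress_parallel(false, |_meta_data, _uncompressed_block| {
///             // inspect or store each decompressed block here
///             Ok(())
///         })
/// }
/// ```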
pub trait ChunksReader: Sized + Iterator<Item=Result<Chunk>> + ExactSizeIterator {

    /// The decoded exr meta data from the file.
    fn meta_data(&self) -> &MetaData;

    /// The decoded exr headers from the file.
    fn headers(&self) -> &[Header] { &self.meta_data().headers }

    /// The number of chunks that this reader will return in total.
    /// Can be less than the total number of chunks in the file, if some chunks are skipped.
    fn expected_chunk_count(&self) -> usize;

    /// Read the next compressed chunk from the file.
    /// Equivalent to `.next()`, as this is also an iterator.
    /// Returns `None` if all chunks have been read.
    fn read_next_chunk(&mut self) -> Option<Result<Chunk>> { self.next() }

    /// Create a new reader that calls the provided progress
    /// callback for each chunk that is read from the file.
    /// The callback receives the fraction of chunks decoded so far,
    /// and is called with `1.0` once all chunks have been read.
    fn on_progress<F>(self, on_progress: F) -> OnProgressChunksReader<Self, F> where F: FnMut(f64) {
        OnProgressChunksReader { chunks_reader: self, callback: on_progress, decoded_chunks: 0 }
    }

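    /// Decompress all blocks in the file, using multiple cpu cores,
    /// and call the supplied closure for each resulting block.
    /// The order of the blocks is not deterministic.
    /// Falls back to sequential decompression if the file is not compressed
    /// or a thread pool cannot be created.
    /// You can also use `parallel_decompressor` to obtain an iterator instead.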
    fn decompress_parallel(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = match self.parallel_decompressor(pedantic) {
            Err(old_self) => return old_self.decompress_sequential(pedantic, insert_block),
            Ok(decompressor) => decompressor,
        };

        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Return an iterator that decompresses the chunks with multiple threads.
    /// The order of the blocks is not deterministic.
    /// Returns `Err(self)` if parallel decompression is not worthwhile or not available.
    /// Use `ParallelBlockDecompressor::new_with_thread_pool` if you want to use your own thread pool.
    fn parallel_decompressor(self, pedantic: bool) -> std::result::Result<ParallelBlockDecompressor<Self>, Self> {
        ParallelBlockDecompressor::new(self, pedantic)
    }

    /// Decompress all blocks in the file using only the current thread,
    /// and call the supplied closure for each resulting block.
    fn decompress_sequential(
        self, pedantic: bool,
        mut insert_block: impl FnMut(&MetaData, UncompressedBlock) -> UnitResult
    ) -> UnitResult
    {
        let mut decompressor = self.sequential_decompressor(pedantic);
        while let Some(block) = decompressor.next() {
            insert_block(decompressor.meta_data(), block?)?;
        }

        debug_assert_eq!(decompressor.len(), 0, "compressed blocks left after decompressing all blocks");
        Ok(())
    }

    /// Return an iterator that decompresses the chunks in this thread.
    /// You can alternatively use `parallel_decompressor` to use multiple threads.
    fn sequential_decompressor(self, pedantic: bool) -> SequentialBlockDecompressor<Self> {
        SequentialBlockDecompressor { remaining_chunks_reader: self, pedantic }
    }
}

impl<R, F> ChunksReader for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    fn meta_data(&self) -> &MetaData { self.chunks_reader.meta_data() }
    fn expected_chunk_count(&self) -> usize { self.chunks_reader.expected_chunk_count() }
}

impl<R, F> ExactSizeIterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {}
impl<R, F> Iterator for OnProgressChunksReader<R, F> where R: ChunksReader, F: FnMut(f64) {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        self.chunks_reader.next().map(|item|{
            {
                let total_chunks = self.expected_chunk_count() as f64;
                let callback = &mut self.callback;
                callback(self.decoded_chunks as f64 / total_chunks);
            }

            self.decoded_chunks += 1;
            item
        })
        .or_else(||{
            debug_assert_eq!(
                self.decoded_chunks, self.expected_chunk_count(),
                "chunks reader finished but not all chunks are decompressed"
            );

            let callback = &mut self.callback;
            callback(1.0);
            None
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.chunks_reader.size_hint()
    }
}

impl<R: Read + Seek> ChunksReader for AllChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.remaining_chunks.end }
}

impl<R: Read + Seek> ExactSizeIterator for AllChunksReader<R> {}
impl<R: Read + Seek> Iterator for AllChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as the file should contain (inferred from the meta data)
        let next_chunk = self.remaining_chunks.next()
            .map(|_| Chunk::read(&mut self.remaining_bytes, &self.meta_data));

        // if no chunks are left, but bytes remain in the file, return an error
        if self.pedantic && next_chunk.is_none() && self.remaining_bytes.peek_u8().is_ok() {
            return Some(Err(Error::invalid("end of file expected")));
        }

        next_chunk
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_chunks.len(), Some(self.remaining_chunks.len()))
    }
}

impl<R: Read + Seek> ChunksReader for FilteredChunksReader<R> {
    fn meta_data(&self) -> &MetaData { &self.meta_data }
    fn expected_chunk_count(&self) -> usize { self.expected_filtered_chunk_count }
}

impl<R: Read + Seek> ExactSizeIterator for FilteredChunksReader<R> {}
impl<R: Read + Seek> Iterator for FilteredChunksReader<R> {
    type Item = Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // read as many chunks as there are desired chunk offsets
        self.remaining_filtered_chunk_indices.next().map(|next_chunk_location|{
            // seek to the absolute byte offset of the next desired chunk
            self.remaining_bytes.skip_to(
                usize::try_from(next_chunk_location)
                    .expect("too large chunk position for this machine")
            )?;

            let meta_data = &self.meta_data;
            Chunk::read(&mut self.remaining_bytes, meta_data)
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining_filtered_chunk_indices.len(), Some(self.remaining_filtered_chunk_indices.len()))
    }
}

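/// Decompresses the chunks of a file in the current thread, one chunk at a time.
/// Also contains the image meta data.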
#[derive(Debug)]
pub struct SequentialBlockDecompressor<R: ChunksReader> {
    remaining_chunks_reader: R,
    pedantic: bool,
}

impl<R: ChunksReader> SequentialBlockDecompressor<R> {

    /// The decoded exr meta data from the file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks_reader.meta_data() }

    /// Read and decompress the next chunk in this thread.
    /// Returns `None` if all chunks have been decompressed.
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {
        self.remaining_chunks_reader.read_next_chunk().map(|compressed_chunk|{
            UncompressedBlock::decompress_chunk(compressed_chunk?, self.remaining_chunks_reader.meta_data(), self.pedantic)
        })
    }
}

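/// Decompresses the chunks of a file in parallel.
/// The first call to `next` fills the thread pool with jobs,
/// decompressing the next few blocks ahead of time.
/// These jobs will finish even if you stop reading more blocks.
/// Also contains the image meta data.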
#[derive(Debug)]
pub struct ParallelBlockDecompressor<R: ChunksReader> {
    remaining_chunks: R,
    sender: mpsc::Sender<Result<UncompressedBlock>>,
    receiver: mpsc::Receiver<Result<UncompressedBlock>>,
    currently_decompressing_count: usize,
    max_threads: usize,

    shared_meta_data_ref: Arc<MetaData>,
    pedantic: bool,

    pool: ThreadPool,
}

impl<R: ChunksReader> ParallelBlockDecompressor<R> {

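    /// Create a new decompressor that uses a default thread pool.
    /// Does not spawn any jobs yet; decompression starts with the first call to `next`.
    /// Returns `Err(chunks)` if parallel decompression should not be used.
    /// Use `new_with_thread_pool` to customize the thread pool.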
    pub fn new(chunks: R, pedantic: bool) -> std::result::Result<Self, R> {
        Self::new_with_thread_pool(chunks, pedantic, ||{
            rayon_core::ThreadPoolBuilder::new()
                .thread_name(|index| format!("OpenEXR Block Decompressor Thread #{}", index))
                .build()
        })
    }

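    /// Create a new decompressor with a custom thread pool.
    /// Returns `Err(chunks)` if parallel decompression should not be used,
    /// for example when the file is not compressed or the pool cannot be created.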
    pub fn new_with_thread_pool<CreatePool>(chunks: R, pedantic: bool, try_create_thread_pool: CreatePool)
        -> std::result::Result<Self, R>
        where CreatePool: FnOnce() -> std::result::Result<ThreadPool, ThreadPoolBuildError>
    {
        // if no compression is used in the file, don't use a thread pool
        if chunks.meta_data().headers.iter()
            .all(|head| head.compression == Compression::Uncompressed)
        {
            return Err(chunks);
        }

        // if the thread pool cannot be created, return the chunks
        // so that the caller can fall back to sequential decompression
        let pool = match try_create_thread_pool() {
            Ok(pool) => pool,
            Err(_) => return Err(chunks),
        };

        // keep a few more jobs in flight than there are threads,
        // so that the pool is never starved for work
        let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2;

        let (send, recv) = mpsc::channel();

        Ok(Self {
            shared_meta_data_ref: Arc::new(chunks.meta_data().clone()),
            currently_decompressing_count: 0,
            remaining_chunks: chunks,
            sender: send,
            receiver: recv,
            pedantic,
            max_threads,

            pool,
        })
    }

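    /// Fill the thread pool with decompression jobs,
    /// then return the first job that finishes.
    /// Returns `None` if all blocks have been decompressed.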
    pub fn decompress_next_block(&mut self) -> Option<Result<UncompressedBlock>> {

        // keep the pool filled with up to `max_threads` decompression jobs
        while self.currently_decompressing_count < self.max_threads {
            let block = self.remaining_chunks.next();
            if let Some(block) = block {
                let block = match block {
                    Ok(block) => block,
                    Err(error) => return Some(Err(error))
                };

                let sender = self.sender.clone();
                let meta = self.shared_meta_data_ref.clone();
                let pedantic = self.pedantic;

                self.currently_decompressing_count += 1;

                self.pool.spawn(move || {
                    let decompressed_or_err = UncompressedBlock::decompress_chunk(
                        block, &meta, pedantic
                    );

                    // if the decompressor was dropped early (for example, after
                    // an error in another thread), the receiver is gone;
                    // in that case, simply discard this result
                    let _ = sender.send(decompressed_or_err);
                });
            }
            else {
                // no chunks left to decompress
                break;
            }
        }

        if self.currently_decompressing_count > 0 {
            let next = self.receiver.recv()
                .expect("all decompressing senders hung up but more messages were expected");

            self.currently_decompressing_count -= 1;
            Some(next)
        }
        else {
            debug_assert!(self.receiver.try_recv().is_err(), "uncompressed chunks left in channel after decompressing all chunks");
            debug_assert_eq!(self.len(), 0, "compressed chunks left after decompressing all chunks");
            None
        }
    }

    /// The decoded exr meta data from the file.
    pub fn meta_data(&self) -> &MetaData { self.remaining_chunks.meta_data() }
}

impl<R: ChunksReader> ExactSizeIterator for SequentialBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for SequentialBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }
    fn size_hint(&self) -> (usize, Option<usize>) { self.remaining_chunks_reader.size_hint() }
}

impl<R: ChunksReader> ExactSizeIterator for ParallelBlockDecompressor<R> {}
impl<R: ChunksReader> Iterator for ParallelBlockDecompressor<R> {
    type Item = Result<UncompressedBlock>;
    fn next(&mut self) -> Option<Self::Item> { self.decompress_next_block() }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.remaining_chunks.len() + self.currently_decompressing_count;
        (remaining, Some(remaining))
    }
}