From 7678585bba86635e25635141ce0a8384dd7dc2e3 Mon Sep 17 00:00:00 2001 From: Wim Pomp Date: Wed, 9 Oct 2024 20:30:45 +0200 Subject: [PATCH] - some more threading --- .gitignore | 1 + src/lib.rs | 112 +++++++++++++++++++++++++++++++++++++---------------- src/py.rs | 12 +----- 3 files changed, 81 insertions(+), 44 deletions(-) diff --git a/.gitignore b/.gitignore index 889ae51..cb58d10 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ /target/ /Cargo.lock /foo.tif +*.so diff --git a/src/lib.rs b/src/lib.rs index e900cf5..758632b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#[cfg(not(feature = "nopython"))] +// #[cfg(not(feature = "nopython"))] mod py; use std::cmp::Ordering; @@ -10,6 +10,8 @@ use num::{Complex, Rational32, Zero}; use ndarray::{s, Array2}; use num::traits::ToBytes; use std::hash::{DefaultHasher, Hash, Hasher}; +use std::thread; +use std::thread::JoinHandle; use chrono::Utc; use zstd::stream::encode_all; use rayon::prelude::*; @@ -235,6 +237,16 @@ impl Tag { } +struct CompressedFrame { + bytes: Vec>, + image_width: u32, + image_length: u32, + tile_size: usize, + bits_per_sample: u16, + sample_format: u16 +} + + #[derive(Clone, Debug)] struct Frame { tileoffsets: Vec, @@ -245,7 +257,6 @@ struct Frame { sample_format: u16, tile_width: u16, tile_length: u16, - extra_tags: Vec } impl Frame { @@ -255,7 +266,7 @@ impl Frame { ) -> Self { Frame { tileoffsets, tilebytecounts, image_width, image_length, bits_per_sample, - sample_format, tile_width, tile_length, extra_tags: Vec::new() + sample_format, tile_width, tile_length } } } @@ -310,6 +321,7 @@ pub struct IJTiffFile { file: File, frames: HashMap<(usize, u8), Frame>, hashes: HashMap, + threads: HashMap<(usize, u8), JoinHandle>, pub shape: (usize, usize, usize), pub n_frames: usize, pub samples_per_pixel: u8, @@ -318,7 +330,8 @@ pub struct IJTiffFile { pub comment: Option, pub delta_z: Option, pub timeinterval: Option, - pub extra_tags: Option> + pub extra_tags: Vec, + pub extra_tags_frame: HashMap> } impl Drop for IJTiffFile { @@ -344,9 +357,10 @@ impl IJTiffFile { } else { (1, shape.0 * shape.1 * shape.2) }; - Ok(IJTiffFile { file, frames: HashMap::new(), hashes: HashMap::new(), shape, n_frames, + Ok(IJTiffFile { file, frames: HashMap::new(), hashes: HashMap::new(), + threads: HashMap::new(), shape, n_frames, samples_per_pixel: spp, colormap: None, colors: None, comment: None, delta_z: None, - timeinterval: None, extra_tags: None } ) + timeinterval: None, extra_tags: Vec::new(), extra_tags_frame: HashMap::new() } ) } pub fn description(&self) -> String { @@ -418,39 +432,64 @@ impl IJTiffFile { } } - pub fn save(&mut self, frame: Array2, c: usize, z: usize, t: usize, - extra_tags: Option>) -> Result<()> { - self.compress_frame(frame.reversed_axes(), c, z, t, extra_tags)?; + pub fn save(&mut self, frame: Array2, c: usize, z: usize, t: usize, + extra_tags: Option>) -> Result<()> + where T: Bytes + Clone + Send + Sync + Zero + 'static { + let key = self.get_frame_number(c, z, t); + if let Some(extra_tags) = extra_tags { + if let Some(extra_tags_frame) = self.extra_tags_frame.get_mut(&key.0) { + extra_tags_frame.extend(extra_tags); + } else { + self.extra_tags_frame.insert(key.0, extra_tags); + } + } + self.compress_frame(frame.reversed_axes(), key)?; Ok(()) } - fn compress_frame(&mut self, frame: Array2, - c: usize, z: usize, t: usize, - extra_tags: Option>) -> Result<()> { - let image_width = frame.shape()[0] as u32; - let image_length = frame.shape()[1] as u32; - let tile_size = 2usize.pow(((image_width as f64 * image_length as f64 / 64f64).log2() / 2f64).round() as u32).max(16).min(1024); - let mut tileoffsets = Vec::new(); - let mut tilebytecounts = Vec::new(); - let tiles = IJTiffFile::tile(frame.reversed_axes(), tile_size); - let byte_tiles: Vec> = tiles.into_iter().map( - |tile| tile.map(|x| x.bytes()).into_iter().flatten().collect() - ).collect(); - for tile in byte_tiles.into_par_iter().map(|x| encode_all(&*x, 3)).collect::>() { - if let Ok(bytes) = tile { - tilebytecounts.push(bytes.len() as u64); - tileoffsets.push(self.write(&bytes)?); + fn compress_frame(&mut self, frame: Array2, key: (usize, u8)) -> Result<()> + where T: Bytes + Clone + Zero + Send + 'static { + fn compress(frame: Array2) -> CompressedFrame + where T: Bytes + Clone + Zero { + let image_width = frame.shape()[0] as u32; + let image_length = frame.shape()[1] as u32; + let tile_size = 2usize.pow(((image_width as f64 * image_length as f64 / 64f64 + ).log2() / 2f64).round() as u32).max(16).min(1024); + let tiles = IJTiffFile::tile(frame.reversed_axes(), tile_size); + let byte_tiles: Vec> = tiles.into_iter().map( + |tile| tile.map(|x| x.bytes()).into_iter().flatten().collect() + ).collect(); + let bytes = byte_tiles.into_par_iter().map(|x| encode_all(&*x, 3).unwrap()).collect::>(); + CompressedFrame { bytes, image_width, image_length, tile_size, + bits_per_sample: T::BITS_PER_SAMPLE, sample_format: T::SAMPLE_FORMAT } + } + self.threads.insert(key, thread::spawn(move || compress(frame))); + for key in self.threads.keys().cloned().collect::>() { + if self.threads[&key].is_finished() { + } } - let mut frame = Frame::new(tileoffsets, tilebytecounts, image_width, image_length, - T::BITS_PER_SAMPLE, T::SAMPLE_FORMAT, tile_size as u16, tile_size as u16); - if let Some(tags) = extra_tags { - for tag in tags { - frame.extra_tags.push(tag); + for key in self.threads.keys().cloned().collect::>() { + if self.threads[&key].is_finished() { + if let Some(thread) = self.threads.remove(&key) { + self.write_frame(thread.join().unwrap(), key)?; + } } } - self.frames.insert(self.get_frame_number(c, z, t), frame); + Ok(()) + } + + fn write_frame(&mut self, frame: CompressedFrame, key: (usize, u8)) -> Result<()> { + let mut tileoffsets = Vec::new(); + let mut tilebytecounts = Vec::new(); + for tile in frame.bytes { + tilebytecounts.push(tile.len() as u64); + tileoffsets.push(self.write(&tile)?); + } + let frame = Frame::new(tileoffsets, tilebytecounts, frame.image_width, frame.image_length, + frame.bits_per_sample, frame.sample_format, frame.tile_size as u16, frame.tile_size as u16); + self.frames.insert(key, frame); Ok(()) } @@ -499,6 +538,11 @@ impl IJTiffFile { } fn close(&mut self) -> Result<()> { + for key in self.threads.keys().cloned().collect::>() { + if let Some(thread) = self.threads.remove(&key) { + self.write_frame(thread.join().unwrap(), key)?; + } + } let mut where_to_write_next_ifd_offset = OFFSET - OFFSET_SIZE as u64; let mut warn = false; for frame_number in 0..self.n_frames { @@ -547,10 +591,10 @@ impl IJTiffFile { ifd.push_tag(Tag::short(284, vec![2])) } } - ifd.extend_tags(frame.extra_tags.to_owned()); - if let Some(extra_tags) = &self.extra_tags { - ifd.extend_tags(extra_tags.to_owned()); + if self.extra_tags_frame.contains_key(&frame_number) { + ifd.extend_tags(self.extra_tags_frame[&frame_number].to_owned()); } + ifd.extend_tags(self.extra_tags.to_owned()); ifd.push_tag(Tag::ascii(306, &format!("{}", Utc::now().format("%Y:%m:%d %H:%M:%S")))); where_to_write_next_ifd_offset = ifd.write(self, where_to_write_next_ifd_offset)?; } else { diff --git a/src/py.rs b/src/py.rs index 5da17e5..3c91f4b 100644 --- a/src/py.rs +++ b/src/py.rs @@ -144,21 +144,13 @@ impl PyIJTiffFile { fn append_extra_tag(&mut self, tag: PyTag) { if let Some(ijtifffile) = self.ijtifffile.as_mut() { - if let Some(extra_tags) = ijtifffile.extra_tags.as_mut() { - extra_tags.push(tag.tag); - } else { - ijtifffile.extra_tags = Some(vec![tag.tag]); - } + ijtifffile.extra_tags.push(tag.tag); } } fn extend_extra_tags(&mut self, tags: Vec) { if let Some(ijtifffile) = self.ijtifffile.as_mut() { - if let Some(extra_tags) = ijtifffile.extra_tags.as_mut() { - extra_tags.extend(tags.into_iter().map(|x| x.tag)); - } else { - ijtifffile.extra_tags = Some(tags.into_iter().map(|x| x.tag).collect()); - } + ijtifffile.extra_tags.extend(tags.into_iter().map(|x| x.tag)); } }