- some more threading

This commit is contained in:
Wim Pomp
2024-10-09 20:30:45 +02:00
parent 984df9441a
commit 7678585bba
3 changed files with 81 additions and 44 deletions

1
.gitignore vendored
View File

@@ -7,3 +7,4 @@
/target/ /target/
/Cargo.lock /Cargo.lock
/foo.tif /foo.tif
*.so

View File

@@ -1,4 +1,4 @@
#[cfg(not(feature = "nopython"))] // #[cfg(not(feature = "nopython"))]
mod py; mod py;
use std::cmp::Ordering; use std::cmp::Ordering;
@@ -10,6 +10,8 @@ use num::{Complex, Rational32, Zero};
use ndarray::{s, Array2}; use ndarray::{s, Array2};
use num::traits::ToBytes; use num::traits::ToBytes;
use std::hash::{DefaultHasher, Hash, Hasher}; use std::hash::{DefaultHasher, Hash, Hasher};
use std::thread;
use std::thread::JoinHandle;
use chrono::Utc; use chrono::Utc;
use zstd::stream::encode_all; use zstd::stream::encode_all;
use rayon::prelude::*; use rayon::prelude::*;
@@ -235,6 +237,16 @@ impl Tag {
} }
struct CompressedFrame {
bytes: Vec<Vec<u8>>,
image_width: u32,
image_length: u32,
tile_size: usize,
bits_per_sample: u16,
sample_format: u16
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct Frame { struct Frame {
tileoffsets: Vec<u64>, tileoffsets: Vec<u64>,
@@ -245,7 +257,6 @@ struct Frame {
sample_format: u16, sample_format: u16,
tile_width: u16, tile_width: u16,
tile_length: u16, tile_length: u16,
extra_tags: Vec<Tag>
} }
impl Frame { impl Frame {
@@ -255,7 +266,7 @@ impl Frame {
) -> Self { ) -> Self {
Frame { Frame {
tileoffsets, tilebytecounts, image_width, image_length, bits_per_sample, tileoffsets, tilebytecounts, image_width, image_length, bits_per_sample,
sample_format, tile_width, tile_length, extra_tags: Vec::new() sample_format, tile_width, tile_length
} }
} }
} }
@@ -310,6 +321,7 @@ pub struct IJTiffFile {
file: File, file: File,
frames: HashMap<(usize, u8), Frame>, frames: HashMap<(usize, u8), Frame>,
hashes: HashMap<u64, u64>, hashes: HashMap<u64, u64>,
threads: HashMap<(usize, u8), JoinHandle<CompressedFrame>>,
pub shape: (usize, usize, usize), pub shape: (usize, usize, usize),
pub n_frames: usize, pub n_frames: usize,
pub samples_per_pixel: u8, pub samples_per_pixel: u8,
@@ -318,7 +330,8 @@ pub struct IJTiffFile {
pub comment: Option<String>, pub comment: Option<String>,
pub delta_z: Option<f64>, pub delta_z: Option<f64>,
pub timeinterval: Option<f64>, pub timeinterval: Option<f64>,
pub extra_tags: Option<Vec<Tag>> pub extra_tags: Vec<Tag>,
pub extra_tags_frame: HashMap<usize, Vec<Tag>>
} }
impl Drop for IJTiffFile { impl Drop for IJTiffFile {
@@ -344,9 +357,10 @@ impl IJTiffFile {
} else { } else {
(1, shape.0 * shape.1 * shape.2) (1, shape.0 * shape.1 * shape.2)
}; };
Ok(IJTiffFile { file, frames: HashMap::new(), hashes: HashMap::new(), shape, n_frames, Ok(IJTiffFile { file, frames: HashMap::new(), hashes: HashMap::new(),
threads: HashMap::new(), shape, n_frames,
samples_per_pixel: spp, colormap: None, colors: None, comment: None, delta_z: None, samples_per_pixel: spp, colormap: None, colors: None, comment: None, delta_z: None,
timeinterval: None, extra_tags: None } ) timeinterval: None, extra_tags: Vec::new(), extra_tags_frame: HashMap::new() } )
} }
pub fn description(&self) -> String { pub fn description(&self) -> String {
@@ -418,39 +432,64 @@ impl IJTiffFile {
} }
} }
pub fn save<T: Bytes + Clone + Zero>(&mut self, frame: Array2<T>, c: usize, z: usize, t: usize, pub fn save<T>(&mut self, frame: Array2<T>, c: usize, z: usize, t: usize,
extra_tags: Option<Vec<Tag>>) -> Result<()> { extra_tags: Option<Vec<Tag>>) -> Result<()>
self.compress_frame(frame.reversed_axes(), c, z, t, extra_tags)?; where T: Bytes + Clone + Send + Sync + Zero + 'static {
let key = self.get_frame_number(c, z, t);
if let Some(extra_tags) = extra_tags {
if let Some(extra_tags_frame) = self.extra_tags_frame.get_mut(&key.0) {
extra_tags_frame.extend(extra_tags);
} else {
self.extra_tags_frame.insert(key.0, extra_tags);
}
}
self.compress_frame(frame.reversed_axes(), key)?;
Ok(()) Ok(())
} }
fn compress_frame<T: Bytes + Clone + Zero>(&mut self, frame: Array2<T>, fn compress_frame<T>(&mut self, frame: Array2<T>, key: (usize, u8)) -> Result<()>
c: usize, z: usize, t: usize, where T: Bytes + Clone + Zero + Send + 'static {
extra_tags: Option<Vec<Tag>>) -> Result<()> { fn compress<T>(frame: Array2<T>) -> CompressedFrame
where T: Bytes + Clone + Zero {
let image_width = frame.shape()[0] as u32; let image_width = frame.shape()[0] as u32;
let image_length = frame.shape()[1] as u32; let image_length = frame.shape()[1] as u32;
let tile_size = 2usize.pow(((image_width as f64 * image_length as f64 / 64f64).log2() / 2f64).round() as u32).max(16).min(1024); let tile_size = 2usize.pow(((image_width as f64 * image_length as f64 / 64f64
let mut tileoffsets = Vec::new(); ).log2() / 2f64).round() as u32).max(16).min(1024);
let mut tilebytecounts = Vec::new();
let tiles = IJTiffFile::tile(frame.reversed_axes(), tile_size); let tiles = IJTiffFile::tile(frame.reversed_axes(), tile_size);
let byte_tiles: Vec<Vec<u8>> = tiles.into_iter().map( let byte_tiles: Vec<Vec<u8>> = tiles.into_iter().map(
|tile| tile.map(|x| x.bytes()).into_iter().flatten().collect() |tile| tile.map(|x| x.bytes()).into_iter().flatten().collect()
).collect(); ).collect();
for tile in byte_tiles.into_par_iter().map(|x| encode_all(&*x, 3)).collect::<Vec<_>>() { let bytes = byte_tiles.into_par_iter().map(|x| encode_all(&*x, 3).unwrap()).collect::<Vec<_>>();
if let Ok(bytes) = tile { CompressedFrame { bytes, image_width, image_length, tile_size,
tilebytecounts.push(bytes.len() as u64); bits_per_sample: T::BITS_PER_SAMPLE, sample_format: T::SAMPLE_FORMAT }
tileoffsets.push(self.write(&bytes)?); }
self.threads.insert(key, thread::spawn(move || compress(frame)));
for key in self.threads.keys().cloned().collect::<Vec<(usize, u8)>>() {
if self.threads[&key].is_finished() {
} }
} }
let mut frame = Frame::new(tileoffsets, tilebytecounts, image_width, image_length, for key in self.threads.keys().cloned().collect::<Vec<(usize, u8)>>() {
T::BITS_PER_SAMPLE, T::SAMPLE_FORMAT, tile_size as u16, tile_size as u16); if self.threads[&key].is_finished() {
if let Some(tags) = extra_tags { if let Some(thread) = self.threads.remove(&key) {
for tag in tags { self.write_frame(thread.join().unwrap(), key)?;
frame.extra_tags.push(tag);
} }
} }
self.frames.insert(self.get_frame_number(c, z, t), frame); }
Ok(())
}
fn write_frame(&mut self, frame: CompressedFrame, key: (usize, u8)) -> Result<()> {
let mut tileoffsets = Vec::new();
let mut tilebytecounts = Vec::new();
for tile in frame.bytes {
tilebytecounts.push(tile.len() as u64);
tileoffsets.push(self.write(&tile)?);
}
let frame = Frame::new(tileoffsets, tilebytecounts, frame.image_width, frame.image_length,
frame.bits_per_sample, frame.sample_format, frame.tile_size as u16, frame.tile_size as u16);
self.frames.insert(key, frame);
Ok(()) Ok(())
} }
@@ -499,6 +538,11 @@ impl IJTiffFile {
} }
fn close(&mut self) -> Result<()> { fn close(&mut self) -> Result<()> {
for key in self.threads.keys().cloned().collect::<Vec<(usize, u8)>>() {
if let Some(thread) = self.threads.remove(&key) {
self.write_frame(thread.join().unwrap(), key)?;
}
}
let mut where_to_write_next_ifd_offset = OFFSET - OFFSET_SIZE as u64; let mut where_to_write_next_ifd_offset = OFFSET - OFFSET_SIZE as u64;
let mut warn = false; let mut warn = false;
for frame_number in 0..self.n_frames { for frame_number in 0..self.n_frames {
@@ -547,10 +591,10 @@ impl IJTiffFile {
ifd.push_tag(Tag::short(284, vec![2])) ifd.push_tag(Tag::short(284, vec![2]))
} }
} }
ifd.extend_tags(frame.extra_tags.to_owned()); if self.extra_tags_frame.contains_key(&frame_number) {
if let Some(extra_tags) = &self.extra_tags { ifd.extend_tags(self.extra_tags_frame[&frame_number].to_owned());
ifd.extend_tags(extra_tags.to_owned());
} }
ifd.extend_tags(self.extra_tags.to_owned());
ifd.push_tag(Tag::ascii(306, &format!("{}", Utc::now().format("%Y:%m:%d %H:%M:%S")))); ifd.push_tag(Tag::ascii(306, &format!("{}", Utc::now().format("%Y:%m:%d %H:%M:%S"))));
where_to_write_next_ifd_offset = ifd.write(self, where_to_write_next_ifd_offset)?; where_to_write_next_ifd_offset = ifd.write(self, where_to_write_next_ifd_offset)?;
} else { } else {

View File

@@ -144,21 +144,13 @@ impl PyIJTiffFile {
fn append_extra_tag(&mut self, tag: PyTag) { fn append_extra_tag(&mut self, tag: PyTag) {
if let Some(ijtifffile) = self.ijtifffile.as_mut() { if let Some(ijtifffile) = self.ijtifffile.as_mut() {
if let Some(extra_tags) = ijtifffile.extra_tags.as_mut() { ijtifffile.extra_tags.push(tag.tag);
extra_tags.push(tag.tag);
} else {
ijtifffile.extra_tags = Some(vec![tag.tag]);
}
} }
} }
fn extend_extra_tags(&mut self, tags: Vec<PyTag>) { fn extend_extra_tags(&mut self, tags: Vec<PyTag>) {
if let Some(ijtifffile) = self.ijtifffile.as_mut() { if let Some(ijtifffile) = self.ijtifffile.as_mut() {
if let Some(extra_tags) = ijtifffile.extra_tags.as_mut() { ijtifffile.extra_tags.extend(tags.into_iter().map(|x| x.tag));
extra_tags.extend(tags.into_iter().map(|x| x.tag));
} else {
ijtifffile.extra_tags = Some(tags.into_iter().map(|x| x.tag).collect());
}
} }
} }