- multiprocessing options

This commit is contained in:
Wim Pomp
2022-10-12 17:01:29 +02:00
parent 1e0c897e4c
commit 86e4b28499
2 changed files with 13 additions and 4 deletions

View File

@@ -5,7 +5,7 @@ with open('README.md', 'r') as fh:
setuptools.setup(
name='tiffwrite',
version='2022.10.0',
version='2022.10.1',
author='Wim Pomp @ Lenstra lab NKI',
author_email='w.pomp@nki.nl',
description='Parallel tiff writer compatible with ImageJ.',

View File

@@ -331,7 +331,7 @@ class IJTiffFile:
wp@tl20200214
"""
def __init__(self, path, shape, dtype='uint16', colors=None, colormap=None, pxsize=None, deltaz=None,
timeinterval=None, compression=(8, 9), comment=None, **extratags):
timeinterval=None, compression=(8, 9), comment=None, processes=None, **extratags):
assert len(shape) >= 3, 'please specify all c, z, t for the shape'
assert len(shape) <= 3, 'please specify only c, z, t for the shape'
assert np.dtype(dtype).char in 'BbHhf', 'datatype not supported'
@@ -366,13 +366,16 @@ class IJTiffFile:
self.frames_added = []
self.frames_written = []
self.main_pid = multiprocessing.current_process().pid
self.pool_manager = PoolManager(self)
self.pool_manager = PoolManager(self, processes)
with self.fh.lock() as fh:
self.header.write(fh)
def __hash__(self):
return hash(self.path)
def update(self):
""" To be overloaded, is called when a frame has been written. """
def get_frame_number(self, n):
if self.colormap is None and self.colors is None:
return n[1] + n[2] * self.shape[1], n[0]
@@ -634,6 +637,7 @@ class PoolManager:
self.tifs[file].ifds[framenr] = ifd
self.tifs[file].strips[(framenr, channel)] = strip
self.tifs[file].frames_written.append(n)
self.tifs[file].update()
def add_frame(self, *args):
if not self.is_alive:
@@ -644,7 +648,12 @@ class PoolManager:
def start_pool(self):
self.is_alive = True
nframes = sum([np.prod(tif.shape) for tif in self.tifs.values()])
self.processes = self.processes or max(2, min(multiprocessing.cpu_count() // 6, nframes))
if self.processes is None:
self.processes = max(2, min(multiprocessing.cpu_count() // 6, nframes))
elif self.processes == 'all':
self.processes = max(2, min(multiprocessing.cpu_count(), nframes))
else:
self.processes = self.processes
self.queue = multiprocessing.Queue(10 * self.processes)
self.ifd_queue = multiprocessing.Queue(10 * self.processes)
self.error_queue = multiprocessing.Queue()