- Use spawn in stead of fork so that any jvm will not exist in any child processes and block them from stopping.
- Use poetry for install.
This commit is contained in:
23
.github/workflows/pytest.yml
vendored
Normal file
23
.github/workflows/pytest.yml
vendored
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
name: PyTest
|
||||||
|
|
||||||
|
on: workflow_call
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
pytest:
|
||||||
|
runs-on: ${{ matrix.os }}
|
||||||
|
strategy:
|
||||||
|
matrix:
|
||||||
|
python-version: ["3.8", "3.10"]
|
||||||
|
os: [ubuntu-20.04, windows-2019, macOS-11]
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v3
|
||||||
|
- name: Set up Python ${{ matrix.python-version }}
|
||||||
|
uses: actions/setup-python@v4
|
||||||
|
with:
|
||||||
|
python-version: ${{ matrix.python-version }}
|
||||||
|
- name: Install dependencies
|
||||||
|
run: |
|
||||||
|
pip install .[test]
|
||||||
|
- name: Test with pytest
|
||||||
|
run: pytest
|
||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,3 +2,4 @@
|
|||||||
/dist/
|
/dist/
|
||||||
/.idea/
|
/.idea/
|
||||||
/tiffwrite.egg-info/
|
/tiffwrite.egg-info/
|
||||||
|
/.pytest_cache/
|
||||||
|
|||||||
29
pyproject.toml
Normal file
29
pyproject.toml
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
[tool.poetry]
|
||||||
|
name = "tiffwrite"
|
||||||
|
version = "2023.3.0"
|
||||||
|
description = "Parallel tiff writer compatible with ImageJ."
|
||||||
|
authors = ["Wim Pomp, Lenstra lab NKI <w.pomp@nki.nl>"]
|
||||||
|
license = "GPLv3"
|
||||||
|
readme = "README.md"
|
||||||
|
packages = [{include = "tiffwrite"}]
|
||||||
|
repository = "https://github.com/wimpomp/tiffwrite"
|
||||||
|
classifiers = [
|
||||||
|
"Programming Language :: Python :: 3",
|
||||||
|
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
|
||||||
|
"Operating System :: OS Independent",
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.poetry.dependencies]
|
||||||
|
python = "^3.7"
|
||||||
|
tifffile = "*"
|
||||||
|
numpy = "*"
|
||||||
|
tqdm = "*"
|
||||||
|
colorcet = "*"
|
||||||
|
matplotlib = "*"
|
||||||
|
|
||||||
|
[tool.poetry.group.test.dependencies]
|
||||||
|
pytest = "*"
|
||||||
|
|
||||||
|
[build-system]
|
||||||
|
requires = ["poetry-core"]
|
||||||
|
build-backend = "poetry.core.masonry.api"
|
||||||
23
setup.py
23
setup.py
@@ -1,23 +0,0 @@
|
|||||||
import setuptools
|
|
||||||
|
|
||||||
with open('README.md', 'r') as fh:
|
|
||||||
long_description = fh.read()
|
|
||||||
|
|
||||||
setuptools.setup(
|
|
||||||
name='tiffwrite',
|
|
||||||
version='2022.10.2',
|
|
||||||
author='Wim Pomp @ Lenstra lab NKI',
|
|
||||||
author_email='w.pomp@nki.nl',
|
|
||||||
description='Parallel tiff writer compatible with ImageJ.',
|
|
||||||
long_description=long_description,
|
|
||||||
long_description_content_type='text/markdown',
|
|
||||||
url='https://github.com/wimpomp/tiffwrite',
|
|
||||||
packages=setuptools.find_packages(),
|
|
||||||
classifiers=[
|
|
||||||
'Programming Language :: Python :: 3',
|
|
||||||
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
|
|
||||||
'Operating System :: OS Independent',
|
|
||||||
],
|
|
||||||
python_requires='>=3.7',
|
|
||||||
install_requires=['tifffile', 'numpy', 'tqdm', 'colorcet', 'matplotlib'],
|
|
||||||
)
|
|
||||||
16
tests/test_multiple.py
Normal file
16
tests/test_multiple.py
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
import numpy as np
|
||||||
|
from tiffwrite import IJTiffFile
|
||||||
|
from itertools import product
|
||||||
|
from contextlib import ExitStack
|
||||||
|
from tqdm.auto import tqdm
|
||||||
|
|
||||||
|
|
||||||
|
def test_mult(tmp_path):
|
||||||
|
shape = (3, 5, 12)
|
||||||
|
paths = [tmp_path / f'test{i}.tif' for i in range(8)]
|
||||||
|
with ExitStack() as stack:
|
||||||
|
tifs = [stack.enter_context(IJTiffFile(path, shape)) for path in paths]
|
||||||
|
for c, z, t in tqdm(product(range(shape[0]), range(shape[1]), range(shape[2])), total=np.prod(shape)):
|
||||||
|
for tif in tifs:
|
||||||
|
tif.save(np.random.randint(0, 255, (1024, 1024)), c, z, t)
|
||||||
|
assert all([path.exists() for path in paths])
|
||||||
@@ -1,14 +1,11 @@
|
|||||||
#!/usr/bin/python
|
|
||||||
import tiffwrite
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
from tiffwrite import IJTiffFile
|
||||||
from itertools import product
|
from itertools import product
|
||||||
|
|
||||||
|
|
||||||
def test():
|
def test_single(tmp_path):
|
||||||
with tiffwrite.IJTiffWriter('test.tif', (3, 4, 5)) as tif:
|
path = tmp_path / 'test.tif'
|
||||||
|
with IJTiffFile(path, (3, 4, 5)) as tif:
|
||||||
for c, z, t in product(range(3), range(4), range(5)):
|
for c, z, t in product(range(3), range(4), range(5)):
|
||||||
tif.save(np.random.randint(0, 255, (64, 64)), c, z, t)
|
tif.save(np.random.randint(0, 255, (64, 64)), c, z, t)
|
||||||
|
assert path.exists()
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
test()
|
|
||||||
@@ -4,6 +4,7 @@ import colorcet
|
|||||||
import struct
|
import struct
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
|
from multiprocessing import queues
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from tqdm.auto import tqdm
|
from tqdm.auto import tqdm
|
||||||
from itertools import product
|
from itertools import product
|
||||||
@@ -16,8 +17,16 @@ from datetime import datetime
|
|||||||
from matplotlib import colors as mpl_colors
|
from matplotlib import colors as mpl_colors
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from warnings import warn
|
from warnings import warn
|
||||||
|
from importlib.metadata import version
|
||||||
|
|
||||||
__all__ = ['IJTiffFile', 'Tag', 'tiffwrite']
|
|
||||||
|
__all__ = ["IJTiffFile", "Tag", "tiffwrite"]
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
__version__ = version("tiffwrite")
|
||||||
|
except Exception:
|
||||||
|
__version__ = "unknown"
|
||||||
|
|
||||||
|
|
||||||
def tiffwrite(file, data, axes='TZCXY', dtype=None, bar=False, *args, **kwargs):
|
def tiffwrite(file, data, axes='TZCXY', dtype=None, bar=False, *args, **kwargs):
|
||||||
@@ -357,16 +366,15 @@ class IJTiffFile:
|
|||||||
self.spp = self.shape[0] if self.colormap is None and self.colors is None else 1 # samples/pixel
|
self.spp = self.shape[0] if self.colormap is None and self.colors is None else 1 # samples/pixel
|
||||||
self.nframes = np.prod(self.shape[1:]) if self.colormap is None and self.colors is None else np.prod(self.shape)
|
self.nframes = np.prod(self.shape[1:]) if self.colormap is None and self.colors is None else np.prod(self.shape)
|
||||||
self.offsets = {}
|
self.offsets = {}
|
||||||
self.fh = FileHandle(path)
|
|
||||||
manager = multiprocessing.Manager()
|
|
||||||
self.hashes = manager.dict()
|
|
||||||
self.strips = {}
|
self.strips = {}
|
||||||
self.ifds = {}
|
self.ifds = {}
|
||||||
self.frame_extra_tags = {}
|
self.frame_extra_tags = {}
|
||||||
self.frames_added = []
|
self.frames_added = []
|
||||||
self.frames_written = []
|
self.frames_written = []
|
||||||
self.main_pid = multiprocessing.current_process().pid
|
|
||||||
self.pool_manager = PoolManager(self, processes)
|
self.pool_manager = PoolManager(self, processes)
|
||||||
|
self.fh = FileHandle(path)
|
||||||
|
self.hashes = Manager().manager.dict()
|
||||||
|
self.main_pid = Manager().mp.current_process().pid
|
||||||
with self.fh.lock() as fh:
|
with self.fh.lock() as fh:
|
||||||
self.header.write(fh)
|
self.header.write(fh)
|
||||||
|
|
||||||
@@ -374,7 +382,7 @@ class IJTiffFile:
|
|||||||
return hash(self.path)
|
return hash(self.path)
|
||||||
|
|
||||||
def update(self):
|
def update(self):
|
||||||
""" To be overloaded, is called when a frame has been written. """
|
""" To be overloaded, will be called when a frame has been written. """
|
||||||
|
|
||||||
def get_frame_number(self, n):
|
def get_frame_number(self, n):
|
||||||
if self.colormap is None and self.colors is None:
|
if self.colormap is None and self.colors is None:
|
||||||
@@ -396,7 +404,8 @@ class IJTiffFile:
|
|||||||
assert all([0 <= i < s for i, s in zip((c, z, t), self.shape)]), \
|
assert all([0 <= i < s for i, s in zip((c, z, t), self.shape)]), \
|
||||||
'frame {} {} {} is outside shape {} {} {}'.format(c, z, t, *self.shape)
|
'frame {} {} {} is outside shape {} {} {}'.format(c, z, t, *self.shape)
|
||||||
self.frames_added.append((c, z, t))
|
self.frames_added.append((c, z, t))
|
||||||
self.pool_manager.add_frame(self.path, frame.astype(self.dtype), (c, z, t))
|
self.pool_manager.add_frame(self.path,
|
||||||
|
frame.astype(self.dtype) if isinstance(frame, np.ndarray) else frame, (c, z, t))
|
||||||
if extratags:
|
if extratags:
|
||||||
self.frame_extra_tags[(c, z, t)] = Tag.to_tags(extratags)
|
self.frame_extra_tags[(c, z, t)] = Tag.to_tags(extratags)
|
||||||
|
|
||||||
@@ -498,7 +507,7 @@ class IJTiffFile:
|
|||||||
dtype=int).T.flatten()]) for color in self.colors]
|
dtype=int).T.flatten()]) for color in self.colors]
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if multiprocessing.current_process().pid == self.main_pid:
|
if Manager().mp.current_process().pid == self.main_pid:
|
||||||
self.pool_manager.close(self)
|
self.pool_manager.close(self)
|
||||||
with self.fh.lock() as fh:
|
with self.fh.lock() as fh:
|
||||||
if len(self.frames_added) == 0:
|
if len(self.frames_added) == 0:
|
||||||
@@ -547,6 +556,9 @@ class IJTiffFile:
|
|||||||
def __exit__(self, *args, **kwargs):
|
def __exit__(self, *args, **kwargs):
|
||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
|
def clean(self, *args, **kwargs):
|
||||||
|
""" To be overloaded, will be called when the parallel pool is closing. """
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def hash_check(fh, bvalue, offset):
|
def hash_check(fh, bvalue, offset):
|
||||||
addr = fh.tell()
|
addr = fh.tell()
|
||||||
@@ -589,6 +601,20 @@ class IJTiffFile:
|
|||||||
return stripbytecounts, ifd, chunks
|
return stripbytecounts, ifd, chunks
|
||||||
|
|
||||||
|
|
||||||
|
class Manager:
|
||||||
|
instance = None
|
||||||
|
|
||||||
|
def __new__(cls, *args, **kwargs):
|
||||||
|
if cls.instance is None:
|
||||||
|
cls.instance = super().__new__(cls)
|
||||||
|
return cls.instance
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
if not hasattr(self, 'mp'):
|
||||||
|
self.mp = multiprocessing.get_context('spawn')
|
||||||
|
self.manager = self.mp.Manager()
|
||||||
|
|
||||||
|
|
||||||
class PoolManager:
|
class PoolManager:
|
||||||
instance = None
|
instance = None
|
||||||
|
|
||||||
@@ -644,20 +670,21 @@ class PoolManager:
|
|||||||
self.queue.put(args)
|
self.queue.put(args)
|
||||||
|
|
||||||
def start_pool(self):
|
def start_pool(self):
|
||||||
|
mp = Manager().mp
|
||||||
self.is_alive = True
|
self.is_alive = True
|
||||||
nframes = sum([np.prod(tif.shape) for tif in self.tifs.values()])
|
nframes = sum([np.prod(tif.shape) for tif in self.tifs.values()])
|
||||||
if self.processes is None:
|
if self.processes is None:
|
||||||
self.processes = max(2, min(multiprocessing.cpu_count() // 6, nframes))
|
self.processes = max(2, min(mp.cpu_count() // 6, nframes))
|
||||||
elif self.processes == 'all':
|
elif self.processes == 'all':
|
||||||
self.processes = max(2, min(multiprocessing.cpu_count(), nframes))
|
self.processes = max(2, min(mp.cpu_count(), nframes))
|
||||||
else:
|
else:
|
||||||
self.processes = self.processes
|
self.processes = self.processes
|
||||||
self.queue = multiprocessing.Queue(10 * self.processes)
|
self.queue = mp.Queue(10 * self.processes)
|
||||||
self.ifd_queue = multiprocessing.Queue(10 * self.processes)
|
self.ifd_queue = mp.Queue(10 * self.processes)
|
||||||
self.error_queue = multiprocessing.Queue()
|
self.error_queue = mp.Queue()
|
||||||
self.offsets_queue = multiprocessing.Queue()
|
self.offsets_queue = mp.Queue()
|
||||||
self.done = multiprocessing.Event()
|
self.done = mp.Event()
|
||||||
self.pool = multiprocessing.Pool(self.processes, self.run)
|
self.pool = mp.Pool(self.processes, self.run)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
""" Only this is run in parallel processes. """
|
""" Only this is run in parallel processes. """
|
||||||
@@ -666,22 +693,25 @@ class PoolManager:
|
|||||||
try:
|
try:
|
||||||
file, frame, n = self.queue.get(True, 0.02)
|
file, frame, n = self.queue.get(True, 0.02)
|
||||||
self.ifd_queue.put((file, n, *self.tifs[file].compress_frame(frame)))
|
self.ifd_queue.put((file, n, *self.tifs[file].compress_frame(frame)))
|
||||||
except multiprocessing.queues.Empty:
|
except queues.Empty:
|
||||||
continue
|
continue
|
||||||
except Exception:
|
except Exception:
|
||||||
print_exc()
|
print_exc()
|
||||||
self.error_queue.put(format_exc())
|
self.error_queue.put(format_exc())
|
||||||
|
finally:
|
||||||
|
for tif in self.tifs.values():
|
||||||
|
tif.clean()
|
||||||
|
|
||||||
|
|
||||||
class FileHandle:
|
class FileHandle:
|
||||||
""" Process safe file handle """
|
""" Process safe file handle """
|
||||||
def __init__(self, name):
|
def __init__(self, name):
|
||||||
|
manager = Manager().manager
|
||||||
if os.path.exists(name):
|
if os.path.exists(name):
|
||||||
os.remove(name)
|
os.remove(name)
|
||||||
with open(name, 'xb'):
|
with open(name, 'xb'):
|
||||||
pass
|
pass
|
||||||
self.name = name
|
self.name = name
|
||||||
manager = multiprocessing.Manager()
|
|
||||||
self._lock = manager.RLock()
|
self._lock = manager.RLock()
|
||||||
self._pos = manager.Value('i', 0)
|
self._pos = manager.Value('i', 0)
|
||||||
|
|
||||||
@@ -698,56 +728,3 @@ class FileHandle:
|
|||||||
self._pos.value = f.tell()
|
self._pos.value = f.tell()
|
||||||
f.close()
|
f.close()
|
||||||
self._lock.release()
|
self._lock.release()
|
||||||
|
|
||||||
|
|
||||||
class IJTiffWriter:
|
|
||||||
def __init__(self, file, shape, dtype='uint16', colormap=None, nP=None, extratags=None, pxsize=None):
|
|
||||||
warn('IJTiffWriter is deprecated and will be removed in a future version, use IJTiffFile instead',
|
|
||||||
DeprecationWarning)
|
|
||||||
files = [file] if isinstance(file, str) else file
|
|
||||||
shapes = [shape] if isinstance(shape[0], Number) else shape # CZT
|
|
||||||
dtypes = [np.dtype(dtype)] if isinstance(dtype, (str, np.dtype)) else [np.dtype(d) for d in dtype]
|
|
||||||
colormaps = [colormap] if colormap is None or isinstance(colormap, str) else colormap
|
|
||||||
extratagss = [extratags] if extratags is None or isinstance(extratags, dict) else extratags
|
|
||||||
pxsizes = [pxsize] if pxsize is None or isinstance(pxsize, Number) else pxsize
|
|
||||||
|
|
||||||
nFiles = len(files)
|
|
||||||
if not len(shapes) == nFiles:
|
|
||||||
shapes *= nFiles
|
|
||||||
if not len(dtypes) == nFiles:
|
|
||||||
dtypes *= nFiles
|
|
||||||
if not len(colormaps) == nFiles:
|
|
||||||
colormaps *= nFiles
|
|
||||||
if not len(extratagss) == nFiles:
|
|
||||||
extratagss *= nFiles
|
|
||||||
if not len(pxsizes) == nFiles:
|
|
||||||
pxsizes *= nFiles
|
|
||||||
|
|
||||||
self.files = {file: IJTiffFile(file, shape, dtype, None, colormap, pxsize, **(extratags or {}))
|
|
||||||
for file, shape, dtype, colormap, pxsize, extratags
|
|
||||||
in zip(files, shapes, dtypes, colormaps, pxsizes, extratagss)}
|
|
||||||
|
|
||||||
assert np.all([len(s) == 3 for s in shapes]), 'please specify all c, z, t for the shape'
|
|
||||||
assert np.all([d.char in 'BbHhf' for d in dtypes]), 'datatype not supported'
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, *args, **kwargs):
|
|
||||||
self.close()
|
|
||||||
|
|
||||||
def save(self, file, frame, *n):
|
|
||||||
if isinstance(file, np.ndarray): # save to first/only file
|
|
||||||
n = (frame, *n)
|
|
||||||
frame = file
|
|
||||||
file = next(iter(self.files.keys()))
|
|
||||||
elif isinstance(file, Number):
|
|
||||||
file = list(self.files.keys())[int(file)]
|
|
||||||
self.files[file].save(frame, *n)
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
for file in self.files.values():
|
|
||||||
try:
|
|
||||||
file.close()
|
|
||||||
except Exception:
|
|
||||||
print_exc()
|
|
||||||
|
|||||||
Reference in New Issue
Block a user