From 3652947817ecd26562c7dbe584de66148d956862 Mon Sep 17 00:00:00 2001 From: Wim Pomp Date: Wed, 27 Mar 2024 16:03:34 +0100 Subject: [PATCH] - Tiffwrite is now fully typed. --- .github/workflows/mypy.yml | 21 +++ .github/workflows/pytest.yml | 2 +- README.md | 1 + pyproject.toml | 16 +- tests/test_multiple.py | 6 +- tests/test_single.py | 4 +- tiffwrite/__init__.py | 293 +++++++++++++++++++---------------- 7 files changed, 198 insertions(+), 145 deletions(-) create mode 100644 .github/workflows/mypy.yml diff --git a/.github/workflows/mypy.yml b/.github/workflows/mypy.yml new file mode 100644 index 0000000..40beeaa --- /dev/null +++ b/.github/workflows/mypy.yml @@ -0,0 +1,21 @@ +name: MyPy + +on: [push, pull_request] + +jobs: + mypy: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.10", "3.12"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: pip install mypy lxml geojson maturin python-dateutil types-python-dateutil lxml-stubs + - name: Test with mypy + run: mypy \ No newline at end of file diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 52c5485..59903e7 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -7,7 +7,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: ["3.10"] + python-version: ["3.10", "3.12"] os: [ubuntu-20.04, windows-2019, macOS-11] steps: diff --git a/README.md b/README.md index b4e75ff..752ffac 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +[![mypy](https://github.com/wimpomp/tiffwrite/actions/workflows/mypy.yml/badge.svg)](https://github.com/wimpomp/tiffwrite/actions/workflows/mypy.yml) [![pytest](https://github.com/wimpomp/tiffwrite/actions/workflows/pytest.yml/badge.svg)](https://github.com/wimpomp/tiffwrite/actions/workflows/pytest.yml) # Tiffwrite diff --git a/pyproject.toml b/pyproject.toml index 69effbc..b093ec4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tiffwrite" -version = "2024.2.2" +version = "2024.3.0" description = "Parallel tiff writer compatible with ImageJ." authors = ["Wim Pomp, Lenstra lab NKI "] license = "GPL-3.0-or-later" @@ -9,22 +9,30 @@ packages = [{include = "tiffwrite"}] repository = "https://github.com/wimpomp/tiffwrite" [tool.poetry.dependencies] -python = "^3.8" +python = "^3.10" tifffile = "*" imagecodecs = "*" numpy = "*" tqdm = "*" colorcet = "*" matplotlib = "*" -parfor = ">=2023.10.1" +parfor = ">=2024.3.0" pytest = { version = "*", optional = true } [tool.poetry.extras] -test = ["pytest"] +test = ["pytest", "mypy"] [tool.pytest.ini_options] filterwarnings = ["ignore:::(?!tiffwrite)"] +[tool.isort] +line_length = 119 + +[tool.mypy] +disable_error_code = ["import-untyped", "return"] +implicit_optional = true +exclude = ["build"] + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" diff --git a/tests/test_multiple.py b/tests/test_multiple.py index ce769c1..37e7abb 100644 --- a/tests/test_multiple.py +++ b/tests/test_multiple.py @@ -1,12 +1,14 @@ from contextlib import ExitStack from itertools import product +from pathlib import Path import numpy as np -from tiffwrite import IJTiffFile from tqdm.auto import tqdm +from tiffwrite import IJTiffFile -def test_mult(tmp_path): + +def test_mult(tmp_path: Path) -> None: shape = (2, 3, 5) paths = [tmp_path / f'test{i}.tif' for i in range(6)] with ExitStack() as stack: diff --git a/tests/test_single.py b/tests/test_single.py index b703a33..2d47470 100644 --- a/tests/test_single.py +++ b/tests/test_single.py @@ -1,10 +1,12 @@ from itertools import product +from pathlib import Path import numpy as np + from tiffwrite import IJTiffFile -def test_single(tmp_path): +def test_single(tmp_path: Path) -> None: path = tmp_path / 'test.tif' with IJTiffFile(path, (3, 4, 5)) as tif: for c, z, t in product(range(3), range(4), range(5)): diff --git a/tiffwrite/__init__.py b/tiffwrite/__init__.py index a2df2cb..0e04d26 100755 --- a/tiffwrite/__init__.py +++ b/tiffwrite/__init__.py @@ -1,4 +1,5 @@ -import os +from __future__ import annotations + import struct import warnings from collections.abc import Iterable @@ -10,12 +11,14 @@ from hashlib import sha1 from importlib.metadata import version from io import BytesIO from itertools import product -from numbers import Number +from pathlib import Path +from typing import Any, BinaryIO, Callable, Generator, Literal, Optional, Sequence import colorcet import numpy as np import tifffile from matplotlib import colors as mpl_colors +from numpy.typing import DTypeLike from parfor import ParPool, PoolSingleton from tqdm.auto import tqdm @@ -28,7 +31,8 @@ except Exception: # noqa __version__ = "unknown" -def tiffwrite(file, data, axes='TZCXY', dtype=None, bar=False, *args, **kwargs): +def tiffwrite(file: str | Path, data: np.ndarray, axes: str = 'TZCXY', dtype: DTypeLike = None, bar: bool = False, + *args: Any, **kwargs: Any) -> None: """ file: string; filename of the new tiff file data: 2 to 5D numpy array axes: string; order of dimensions in data, default: TZCXY for 5D, ZCXY for 4D, CXY for 3D, XY for 2D data @@ -47,20 +51,36 @@ def tiffwrite(file, data, axes='TZCXY', dtype=None, bar=False, *args, **kwargs): data = np.expand_dims(data, axis) shape = data.shape[:3] - with IJTiffFile(file, shape, data.dtype if dtype is None else dtype, *args, **kwargs) as f: + with IJTiffFile(file, shape, data.dtype if dtype is None else dtype, *args, **kwargs) as f: # type: ignore at_least_one = False - for n in tqdm(product(*[range(i) for i in shape]), total=np.prod(shape), desc='Saving tiff', disable=not bar): # noqa - if np.any(data[n]) or not at_least_one: # noqa - f.save(data[n], *n) # noqa + for n in tqdm(product(*[range(i) for i in shape]), total=np.prod(shape), desc='Saving tiff', disable=not bar): + if np.any(data[n]) or not at_least_one: + f.save(data[n], *n) at_least_one = True class Header: - def __init__(self, *args): - if len(args) == 1: - fh = args[0] + def __init__(self, filehandle_or_byteorder: BinaryIO | Literal['>', '<'] | None = None, + bigtiff: bool = True) -> None: + if filehandle_or_byteorder is None or isinstance(filehandle_or_byteorder, str): + self.byteorder = filehandle_or_byteorder or '<' + self.bigtiff = bigtiff + if self.bigtiff: + self.tagsize = 20 + self.tagnoformat = 'Q' + self.offsetsize = 8 + self.offsetformat = 'Q' + self.offset = 16 + else: + self.tagsize = 12 + self.tagnoformat = 'H' + self.offsetsize = 4 + self.offsetformat = 'I' + self.offset = 8 + else: + fh = filehandle_or_byteorder fh.seek(0) - self.byteorder = {b'II': '<', b'MM': '>'}[fh.read(2)] + self.byteorder = '>' if fh.read(2) == b'MM' else '<' self.bigtiff = {42: False, 43: True}[struct.unpack(self.byteorder + 'H', fh.read(2))[0]] if self.bigtiff: self.tagsize = 20 @@ -75,22 +95,8 @@ class Header: self.offsetformat = 'I' self.offsetsize = 4 self.offset = struct.unpack(self.byteorder + self.offsetformat, fh.read(self.offsetsize))[0] - else: - self.byteorder, self.bigtiff = args if len(args) == 2 else ('<', True) - if self.bigtiff: - self.tagsize = 20 - self.tagnoformat = 'Q' - self.offsetsize = 8 - self.offsetformat = 'Q' - self.offset = 16 - else: - self.tagsize = 12 - self.tagnoformat = 'H' - self.offsetsize = 4 - self.offsetformat = 'I' - self.offset = 8 - def write(self, fh): + def write(self, fh: BinaryIO) -> None: fh.write({'<': b'II', '>': b'MM'}[self.byteorder]) if self.bigtiff: fh.write(struct.pack(self.byteorder + 'H', 43)) @@ -103,28 +109,34 @@ class Header: class Tag: + Value = bytes | str | float | Fraction | Sequence[bytes | str | float | Fraction] tiff_tag_registry = tifffile.TiffTagRegistry({key: value.lower() for key, value in tifffile.TIFF.TAGS.items()}) @staticmethod - def to_tags(tags): - return {(key if isinstance(key, Number) else (int(key[3:]) if key.lower().startswith('tag') - else Tag.tiff_tag_registry[key.lower()])): - tag if isinstance(tag, Tag) else Tag(tag) for key, tag in tags.items()} + def from_dict(tags: dict[str | int, Value | Tag]) -> dict[int, Tag]: + return {(key if isinstance(key, int) + else (int(key[3:]) if key.lower().startswith('tag') + else Tag.tiff_tag_registry[key.lower()])): tag if isinstance(tag, Tag) else Tag(tag) + for key, tag in tags.items()} @staticmethod - def fraction(numerator=0, denominator=None): - return Fraction(numerator, denominator).limit_denominator(2 ** (31 if numerator < 0 or - (denominator is not None and denominator < 0) else 32) - 1) + def fraction(numerator: float = 0, denominator: int = None) -> Fraction: + return Fraction(numerator, denominator).limit_denominator( # type: ignore + 2 ** (31 if numerator < 0 or (denominator is not None and denominator < 0) else 32) - 1) - def __init__(self, ttype, value=None, offset=None): - self.fh = None - self.header = None - self.bytes_data = None + def __init__(self, ttype_or_value: str | Value, value: Value = None, + offset: int = None) -> None: + self._value: bytes | str | Sequence[bytes | str | float | Fraction] + self.fh: Optional[BinaryIO] = None + self.header: Optional[Header] = None + self.bytes_data: Optional[bytes] = None if value is None: - self.value = ttype - if all([isinstance(value, int) for value in self.value]): - min_value = np.min(self.value) - max_value = np.max(self.value) + self.value = ttype_or_value # type: ignore + if isinstance(self.value, (str, bytes)) or all([isinstance(value, (str, bytes)) for value in self.value]): + ttype = 'ascii' + elif all([isinstance(value, int) for value in self.value]): + min_value: int = np.min(self.value) # type: ignore + max_value: int = np.max(self.value) # type: ignore type_map = {'uint8': 'byte', 'int8': 'sbyte', 'uint16': 'short', 'int16': 'sshort', 'uint32': 'long', 'int32': 'slong', 'uint64': 'long8', 'int64': 'slong8'} for dtype, ttype in type_map.items(): @@ -132,16 +144,14 @@ class Tag: break else: ttype = 'undefined' - elif isinstance(self.value, (str, bytes)) or all([isinstance(value, (str, bytes)) for value in self.value]): - ttype = 'ascii' elif all([isinstance(value, Fraction) for value in self.value]): - if all([value.numerator < 0 or value.denominator < 0 for value in self.value]): + if all([value.numerator < 0 or value.denominator < 0 for value in self.value]): # type: ignore ttype = 'srational' else: ttype = 'rational' elif all([isinstance(value, (float, int)) for value in self.value]): - min_value = np.min(np.asarray(self.value)[np.isfinite(self.value)]) - max_value = np.max(np.asarray(self.value)[np.isfinite(self.value)]) + min_value = np.min(np.asarray(self.value)[np.isfinite(self.value)]) # type: ignore + max_value = np.max(np.asarray(self.value)[np.isfinite(self.value)]) # type: ignore type_map = {'float32': 'float', 'float64': 'double'} for dtype, ttype in type_map.items(): if np.finfo(dtype).min <= min_value and max_value <= np.finfo(dtype).max: @@ -152,53 +162,54 @@ class Tag: ttype = 'complex' else: ttype = 'undefined' - self.ttype = tifffile.TIFF.DATATYPES[ttype.upper()] + self.ttype = tifffile.TIFF.DATATYPES[ttype.upper()] # noqa else: - self.value = value - self.ttype = tifffile.TIFF.DATATYPES[ttype.upper()] if isinstance(ttype, str) else ttype + self.value = value # type: ignore + self.ttype = tifffile.TIFF.DATATYPES[ttype_or_value.upper()] if isinstance(ttype_or_value, str) \ + else ttype_or_value # type: ignore self.dtype = tifffile.TIFF.DATA_FORMATS[self.ttype] self.offset = offset self.type_check() @property - def value(self): + def value(self) -> bytes | str | Sequence[bytes | str | float | Fraction]: return self._value @value.setter - def value(self, value): + def value(self, value: Value) -> None: self._value = value if isinstance(value, Iterable) else (value,) - def __repr__(self): + def __repr__(self) -> str: if self.offset is None: - return f'{tifffile.TIFF.DATATYPES(self.ttype).name}: {self.value}' + return f'{tifffile.TIFF.DATATYPES(self.ttype).name}: {self.value!r}' else: - return f'{tifffile.TIFF.DATATYPES(self.ttype).name} @ {self.offset}: {self.value}' + return f'{tifffile.TIFF.DATATYPES(self.ttype).name} @ {self.offset}: {self.value!r}' - def type_check(self): + def type_check(self) -> None: try: self.bytes_and_count(Header()) except Exception: raise ValueError(f"tif tag type '{tifffile.TIFF.DATATYPES(self.ttype).name}' and " f"data type '{type(self.value[0]).__name__}' do not correspond") - def bytes_and_count(self, header): + def bytes_and_count(self, header: Header) -> tuple[bytes, int]: if isinstance(self.value, bytes): return self.value, len(self.value) // struct.calcsize(self.dtype) elif self.ttype in (2, 14): if isinstance(self.value, str): - bytes_value = self.value.encode('ascii') + b'\x00' # noqa + bytes_value = self.value.encode('ascii') + b'\x00' else: - bytes_value = b'\x00'.join([value.encode('ascii') for value in self.value]) + b'\x00' + bytes_value = b'\x00'.join([value.encode('ascii') for value in self.value]) + b'\x00' # type: ignore return bytes_value, len(bytes_value) elif self.ttype in (5, 10): - return b''.join([struct.pack(header.byteorder + self.dtype, + return b''.join([struct.pack(header.byteorder + self.dtype, # type: ignore *((value.denominator, value.numerator) if isinstance(value, Fraction) else value)) for value in self.value]), len(self.value) else: return b''.join([struct.pack(header.byteorder + self.dtype, value) for value in self.value]), \ len(self.value) - def write_tag(self, fh, key, header, offset=None): + def write_tag(self, fh: BinaryIO, key: int, header: Header, offset: int = None) -> None: self.fh = fh self.header = header if offset is None: @@ -220,8 +231,8 @@ class Tag: if empty_bytes: fh.write(empty_bytes * b'\x00') - def write_data(self, write=None): - if self.bytes_data: + def write_data(self, write: Callable[[BinaryIO, bytes], None] = None) -> None: + if self.bytes_data and self.fh is not None and self.header is not None and self.offset is not None: self.fh.seek(0, 2) if write is None: offset = self.write(self.bytes_data) @@ -230,24 +241,25 @@ class Tag: self.fh.seek(self.offset + self.header.tagsize - self.header.offsetsize) self.fh.write(struct.pack(self.header.byteorder + self.header.offsetformat, offset)) - def write(self, bytes_value): - if self.fh.tell() % 2: - self.fh.write(b'\x00') - offset = self.fh.tell() - self.fh.write(bytes_value) - return offset + def write(self, bytes_value: bytes) -> Optional[int]: + if self.fh is not None: + if self.fh.tell() % 2: + self.fh.write(b'\x00') + offset = self.fh.tell() + self.fh.write(bytes_value) + return offset - def copy(self): + def copy(self) -> Tag: return self.__class__(self.ttype, self.value[:], self.offset) class IFD(dict): - def __init__(self, fh=None): + def __init__(self, fh: BinaryIO = None) -> None: super().__init__() self.fh = fh - self.header = None - self.offset = None - self.where_to_write_next_ifd_offset = None + self.header: Optional[Header] = None + self.offset: Optional[int] = None + self.where_to_write_next_ifd_offset: Optional[int] = None if fh is not None: header = Header(fh) fh.seek(header.offset) @@ -279,11 +291,12 @@ class IFD(dict): fh.seek(caddr) if ttype == 1: - value = fh.read(count) + value: Tag.Value = fh.read(count) elif ttype == 2: value = fh.read(count).decode('ascii').rstrip('\x00') elif ttype in (5, 10): - value = [struct.unpack(header.byteorder + dtype, fh.read(dtypelen)) for _ in range(count)] + value = [struct.unpack(header.byteorder + dtype, fh.read(dtypelen)) # type: ignore + for _ in range(count)] else: value = [struct.unpack(header.byteorder + dtype, fh.read(dtypelen))[0] for _ in range(count)] @@ -293,20 +306,20 @@ class IFD(dict): self[code] = Tag(ttype, value, pos) fh.seek(header.offset) - def __setitem__(self, key, tag): + def __setitem__(self, key: str | int, tag: str | float | Fraction | Tag) -> None: super().__setitem__(Tag.tiff_tag_registry[key.lower()] if isinstance(key, str) else key, tag if isinstance(tag, Tag) else Tag(tag)) - def items(self): + def items(self) -> Generator[tuple[int, Tag], None, None]: # type: ignore[override] return ((key, self[key]) for key in sorted(self)) - def keys(self): + def keys(self) -> Generator[int, None, None]: # type: ignore[override] return (key for key in sorted(self)) - def values(self): + def values(self) -> Generator[Tag, None, None]: # type: ignore[override] return (self[key] for key in sorted(self)) - def write(self, fh, header, write=None): + def write(self, fh: BinaryIO, header: Header, write: Callable[[BinaryIO, bytes], None] = None) -> BinaryIO: self.fh = fh self.header = header if fh.seek(0, 2) % 2: @@ -321,11 +334,12 @@ class IFD(dict): tag.write_data(write) return fh - def write_offset(self, where_to_write_offset): - self.fh.seek(where_to_write_offset) - self.fh.write(struct.pack(self.header.byteorder + self.header.offsetformat, self.offset)) + def write_offset(self, where_to_write_offset: int) -> None: + if self.fh is not None and self.header is not None: + self.fh.seek(where_to_write_offset) + self.fh.write(struct.pack(self.header.byteorder + self.header.offsetformat, self.offset)) - def copy(self): + def copy(self) -> IFD: new = self.__class__() new.update({key: tag.copy() for key, tag in self.items()}) return new @@ -345,14 +359,17 @@ class IJTiffFile: or Copyright=Tag('ascii', 'Made by me'). See tiff_tag_registry.items(). wp@tl20200214 """ - def __init__(self, path, shape, dtype='uint16', colors=None, colormap=None, pxsize=None, deltaz=None, - timeinterval=None, compression=(50000, 22), comment=None, **extratags): + def __init__(self, path: str | Path, shape: tuple[int, int, int], dtype: DTypeLike = 'uint16', + colors: Sequence[str] = None, colormap: str = None, pxsize: float = None, + deltaz: float = None, timeinterval: float = None, + compression: tuple[int, int] = (50000, 22), comment: str = None, + **extratags: Tag.Value | Tag) -> None: assert len(shape) >= 3, 'please specify all c, z, t for the shape' assert len(shape) <= 3, 'please specify only c, z, t for the shape' assert np.dtype(dtype).char in 'BbHhf', 'datatype not supported' assert colors is None or colormap is None, 'cannot have colors and colormap simultaneously' - self.path = path + self.path = Path(path) self.shape = shape self.dtype = np.dtype(dtype) self.colors = colors @@ -362,45 +379,46 @@ class IJTiffFile: self.timeinterval = timeinterval self.compression = compression self.comment = comment - self.extratags = {} if extratags is None else Tag.to_tags(extratags) + self.extratags = {} if extratags is None else Tag.from_dict(extratags) # type: ignore if pxsize is not None: - pxsize = Tag.fraction(pxsize) - self.extratags.update({282: Tag(pxsize), 283: Tag(pxsize)}) + pxsize_fraction = Tag.fraction(pxsize) + self.extratags.update({282: Tag(pxsize_fraction), 283: Tag(pxsize_fraction)}) self.header = Header() - self.frames = [] self.spp = self.shape[0] if self.colormap is None and self.colors is None else 1 # samples/pixel self.nframes = np.prod(self.shape[1:]) if self.colormap is None and self.colors is None else np.prod(self.shape) - self.frame_extra_tags = {} - self.fh = FileHandle(path) + self.frame_extra_tags: dict[tuple[int, int, int], dict[int, Tag]] = {} + self.fh = FileHandle(self.path) self.hashes = PoolSingleton().manager.dict() self.pool = ParPool(self.compress_frame) self.main_process = True - with self.fh.lock() as fh: + with self.fh.lock() as fh: # noqa self.header.write(fh) - def __setstate__(self, state): + def __setstate__(self, state: dict[str, Any]) -> None: self.__dict__.update(state) self.main_process = False - def __hash__(self): + def __hash__(self) -> int: return hash(self.path) - def get_frame_number(self, n): + def get_frame_number(self, n: tuple[int, int, int]) -> tuple[int, int]: if self.colormap is None and self.colors is None: return n[1] + n[2] * self.shape[1], n[0] else: return n[0] + n[1] * self.shape[0] + n[2] * self.shape[0] * self.shape[1], 0 - def ij_tiff_frame(self, frame): - with BytesIO() as framedata: - with tifffile.TiffWriter(framedata, bigtiff=self.header.bigtiff, byteorder=self.header.byteorder) as t: + def ij_tiff_frame(self, frame: np.ndarray) -> bytes: + with BytesIO() as frame_bytes: + with tifffile.TiffWriter(frame_bytes, bigtiff=self.header.bigtiff, + byteorder=self.header.byteorder) as t: # type: ignore # predictor=True might save a few bytes, but requires the package imagecodes to save floats - t.write(frame, compression=self.compression, contiguous=True, predictor=False) - return framedata.getvalue() + t.write(frame, compression=self.compression, contiguous=True, predictor=False) # type: ignore + return frame_bytes.getvalue() - def save(self, frame, c, z, t, **extratags): + def save(self, frame: np.ndarray | Any, c: int, z: int, t: int, + **extratags: Tag.Value | Tag) -> None: """ save a 2d numpy array to the tiff at channel=c, slice=z, time=t, with optional extra tif tags """ assert (c, z, t) not in self.pool.tasks, f'frame {c} {z} {t} is added already' @@ -408,10 +426,10 @@ class IJTiffFile: 'frame {} {} {} is outside shape {} {} {}'.format(c, z, t, *self.shape) self.pool(frame.astype(self.dtype) if hasattr(frame, 'astype') else frame, handle=(c, z, t)) if extratags: - self.frame_extra_tags[(c, z, t)] = Tag.to_tags(extratags) + self.frame_extra_tags[(c, z, t)] = Tag.from_dict(extratags) # type: ignore @property - def description(self): + def description(self) -> bytes: desc = ['ImageJ=1.11a'] if self.colormap is None and self.colors is None: desc.extend((f'images={np.prod(self.shape[:1])}', f'slices={self.shape[1]}', f'frames={self.shape[2]}')) @@ -427,34 +445,35 @@ class IJTiffFile: desc.append(f'spacing={self.deltaz}') if self.timeinterval is not None: desc.append(f'interval={self.timeinterval}') - desc = [bytes(d, 'ascii') for d in desc] + desc_bytes = [bytes(d, 'ascii') for d in desc] if self.comment is not None: - desc.append(b'') + desc_bytes.append(b'') if isinstance(self.comment, bytes): - desc.append(self.comment) + desc_bytes.append(self.comment) else: - desc.append(bytes(self.comment, 'ascii')) - return b'\n'.join(desc) + desc_bytes.append(bytes(self.comment, 'ascii')) + return b'\n'.join(desc_bytes) @cached_property - def colormap_bytes(self): - colormap = getattr(colorcet, self.colormap) - colormap[0] = '#ffffff' - colormap[-1] = '#000000' - colormap = 65535 * np.array( - [[int(''.join(i), 16) for i in zip(*[iter(s[1:])] * 2)] for s in colormap]) // 255 - if np.dtype(self.dtype).itemsize == 2: - colormap = np.tile(colormap, 256).reshape((-1, 3)) - return b''.join([struct.pack(self.header.byteorder + 'H', c) for c in colormap.T.flatten()]) + def colormap_bytes(self) -> Optional[bytes]: + if self.colormap: + colormap = getattr(colorcet, self.colormap) + colormap[0] = '#ffffff' + colormap[-1] = '#000000' + colormap = 65535 * np.array( + [[int(''.join(i), 16) for i in zip(*[iter(s[1:])] * 2)] for s in colormap]) // 255 + if np.dtype(self.dtype).itemsize == 2: + colormap = np.tile(colormap, 256).reshape((-1, 3)) + return b''.join([struct.pack(self.header.byteorder + 'H', c) for c in colormap.T.flatten()]) @cached_property - def colors_bytes(self): + def colors_bytes(self) -> list[bytes]: return [b''.join([struct.pack(self.header.byteorder + 'H', c) for c in np.linspace(0, 65535 * np.array(mpl_colors.to_rgb(color)), 65536 if np.dtype(self.dtype).itemsize == 2 else 256, - dtype=int).T.flatten()]) for color in self.colors] + dtype=int).T.flatten()]) for color in self.colors] if self.colors else [] - def close(self): + def close(self) -> None: if self.main_process: ifds, strips = {}, {} for n in list(self.pool.tasks): @@ -462,7 +481,7 @@ class IJTiffFile: ifds[framenr], strips[(framenr, channel)] = self.pool[n] self.pool.close() - with self.fh.lock() as fh: + with self.fh.lock() as fh: # noqa for n, tags in self.frame_extra_tags.items(): framenr, channel = self.get_frame_number(n) ifds[framenr].update(tags) @@ -501,21 +520,21 @@ class IJTiffFile: warnings.warn('Some frames were not added to the tif file, either you forgot them, ' 'or an error occured and the tif file was closed prematurely.') - def __enter__(self): + def __enter__(self) -> IJTiffFile: return self - def __exit__(self, *args, **kwargs): + def __exit__(self, *args: Any, **kwargs: Any) -> None: self.close() @staticmethod - def hash_check(fh, bvalue, offset): + def hash_check(fh: BinaryIO, bvalue: bytes, offset: int) -> bool: addr = fh.tell() fh.seek(offset) same = bvalue == fh.read(len(bvalue)) fh.seek(addr) return same - def write(self, fh, bvalue): + def write(self, fh: BinaryIO, bvalue: bytes) -> int: hash_value = sha1(bvalue).hexdigest() # hash uses a random seed making hashes different in different processes if hash_value in self.hashes and self.hash_check(fh, bvalue, self.hashes[hash_value]): return self.hashes[hash_value] # reuse previously saved data @@ -527,17 +546,17 @@ class IJTiffFile: fh.write(bvalue) return offset - def compress_frame(self, frame): + def compress_frame(self, frame: np.ndarray) -> tuple[IFD, tuple[list[int], list[int]]]: """ This is run in a different process""" stripbytecounts, ifd, chunks = self.get_chunks(self.ij_tiff_frame(frame)) stripbyteoffsets = [] - with self.fh.lock() as fh: + with self.fh.lock() as fh: # noqa for chunk in chunks: stripbyteoffsets.append(self.write(fh, chunk)) return ifd, (stripbyteoffsets, stripbytecounts) @staticmethod - def get_chunks(frame): + def get_chunks(frame: bytes) -> tuple[list[int], IFD, list[bytes]]: with BytesIO(frame) as fh: ifd = IFD(fh) stripoffsets = ifd[273].value @@ -551,20 +570,20 @@ class IJTiffFile: class FileHandle: """ Process safe file handle """ - def __init__(self, name): + def __init__(self, path: Path) -> None: manager = PoolSingleton().manager - if os.path.exists(name): - os.remove(name) - with open(name, 'xb'): + if path.exists(): + path.unlink() + with open(path, 'xb'): pass - self.name = name + self.path = path self._lock = manager.RLock() self._pos = manager.Value('i', 0) @contextmanager - def lock(self): + def lock(self) -> Generator[BinaryIO, None, None]: with self._lock: - with open(self.name, 'rb+') as f: + with open(self.path, 'rb+') as f: try: f.seek(self._pos.value) yield f