diff --git a/Cargo.toml b/Cargo.toml index 09d970f..3eb797f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ndbioimage" -version = "2025.2.2" +version = "2025.2.3" edition = "2021" rust-version = "1.78.0" authors = ["Wim Pomp "] diff --git a/py/ndbioimage/__init__.py b/py/ndbioimage/__init__.py index ee439a6..fac8e54 100755 --- a/py/ndbioimage/__init__.py +++ b/py/ndbioimage/__init__.py @@ -1,10 +1,9 @@ from __future__ import annotations -import multiprocessing import os import re import warnings -from abc import ABC, ABCMeta, abstractmethod +from abc import ABC from argparse import ArgumentParser from collections import OrderedDict from datetime import datetime @@ -18,11 +17,12 @@ from typing import Any, Callable, Generator, Iterable, Optional, Sequence, TypeV import numpy as np import yaml from numpy.typing import ArrayLike, DTypeLike -from ome_types import OME, from_xml, model, ureg +from ome_types import OME, from_xml, ureg from pint import set_application_registry from tiffwrite import FrameInfo, IJTiffParallel from tqdm.auto import tqdm +from .ndbioimage_rs import Reader, download_bioformats # noqa from .transforms import Transform, Transforms # noqa: F401 try: @@ -44,8 +44,11 @@ warnings.filterwarnings('ignore', 'Reference to unknown ID') Number = int | float | np.integer | np.floating -class ReaderNotFoundError(Exception): - pass +for dep_file in (Path(__file__).parent / "deps").glob("*_"): + dep_file.rename(str(dep_file)[:-1]) + +if not list((Path(__file__).parent / "jassets").glob("bioformats*.jar")): + download_bioformats(True) class TransformTiff(IJTiffParallel): @@ -96,12 +99,6 @@ def try_default(fun: Callable[..., R], default: Any, *args: Any, **kwargs: Any) return default -def bioformats_ome(path: str | Path) -> OME: - from .rs import Reader - reader = Reader(str(path), 0) - return from_xml(reader.get_ome_xml(), parser='lxml') - - class Shape(tuple): def __new__(cls, shape: Sequence[int] | Shape, axes: str = 'yxczt') -> Shape: if isinstance(shape, Shape): @@ -164,9 +161,9 @@ class OmeCache(DequeDict): (path.with_suffix('.ome.xml').lstat() if path.with_suffix('.ome.xml').exists() else None)) -def get_positions(path: str | Path) -> Optional[list[int]]: - subclass = AbstractReader.get_subclass(path) - return subclass.get_positions(AbstractReader.split_path_series(path)[0]) +def get_positions(path: str | Path) -> Optional[list[int]]: # noqa + # TODO + return None class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): @@ -203,89 +200,181 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): << [0.02, 0.0005] in % See __init__ and other functions for more ideas. - - Subclassing: - Subclass AbstractReader to add more file types. A subclass should always have at least the following - methods: - staticmethod _can_open(path): returns True when the subclass can open the image in path - __frame__(self, c, z, t): this should return a single frame at channel c, slice z and time t - optional open(self): code to be run during initialization, e.g. to open a file handle - optional close(self): close the file in a proper way - optional class field priority: subclasses with lower priority will be tried first, default = 99 - optional get_ome(self) -> OME: return an OME structure with metadata, - if not present bioformats will be used to generate an OME - Any other method can be overridden as needed """ - isclosed: Optional[bool] - channel_names: Optional[list[str]] - series: Optional[int] - pxsize_um: Optional[float] - deltaz_um: Optional[float] - exposuretime_s: Optional[list[float]] - timeinterval: Optional[float] - binning: Optional[list[int]] - laserwavelengths: Optional[list[tuple[float]]] - laserpowers: Optional[list[tuple[float]]] - objective: Optional[model.Objective] - magnification: Optional[float] - tubelens: Optional[model.Objective] - filter: Optional[str] - powermode: Optional[str] - collimator: Optional[str] - tirfangle: Optional[list[float]] - gain: Optional[list[float]] - pcf: Optional[list[float]] - path: Path - __frame__: Callable[[int, int, int], np.ndarray] + do_not_pickle = 'cache', 'reader' + ureg = ureg + + def close(self) -> None: + self.reader.close() @staticmethod - def get_subclass(path: Path | str | Any): - if len(AbstractReader.__subclasses__()) == 0: - raise Exception('Restart python kernel please!') - path, _ = AbstractReader.split_path_series(path) - for subclass in sorted(AbstractReader.__subclasses__(), key=lambda subclass_: subclass_.priority): - if subclass._can_open(path): # noqa - do_not_pickle = (AbstractReader.do_not_pickle,) if isinstance(AbstractReader.do_not_pickle, str) \ - else AbstractReader.do_not_pickle - subclass_do_not_pickle = (subclass.do_not_pickle,) if isinstance(subclass.do_not_pickle, str) \ - else subclass.do_not_pickle if hasattr(subclass, 'do_not_pickle') else () - subclass.do_not_pickle = set(do_not_pickle).union(set(subclass_do_not_pickle)) - return subclass - raise ReaderNotFoundError(f'No reader found for {path}.') + def get_positions(path: str | Path) -> Optional[list[int]]: # noqa + # TODO + return None - - def __new__(cls, path: Path | str | Imread | Any = None, dtype: DTypeLike = None, axes: str = None) -> Imread: - if cls is not Imread: - return super().__new__(cls) + def __init__(self, path: Path | str | Imread | Any = None, dtype: DTypeLike = None, axes: str = None) -> None: + super().__init__() if isinstance(path, Imread): - return path - subclass = cls.get_subclass(path) - do_not_pickle = (AbstractReader.do_not_pickle,) if isinstance(AbstractReader.do_not_pickle, str) \ - else AbstractReader.do_not_pickle - subclass_do_not_pickle = (subclass.do_not_pickle,) if isinstance(subclass.do_not_pickle, str) \ - else subclass.do_not_pickle if hasattr(subclass, 'do_not_pickle') else () - subclass.do_not_pickle = set(do_not_pickle).union(set(subclass_do_not_pickle)) - return super().__new__(subclass) + self.base = path.base or path + else: + self.base = self + self.isclosed = False + if isinstance(path, str): + path = Path(path) + self.path, self.series = self.split_path_series(path) + self.reader = Reader(str(self.path), int(self.series)) + self.frame_decorator = None + self.transform = Transforms() + if isinstance(path, Path) and path.exists(): + self.title = self.path.name + self.acquisitiondate = datetime.fromtimestamp(self.path.stat().st_mtime).strftime('%y-%m-%dT%H:%M:%S') + else: # ndarray + self.title = self.__class__.__name__ + self.acquisitiondate = 'now' - def __init__(self, *args: Any, **kwargs: Any): - def parse(base: Imread = None, # noqa - slice: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray] = None, # noqa - shape: tuple[int, ...] = (0, 0, 0, 0, 0), # noqa - dtype: DTypeLike = None, # noqa - frame_decorator: Callable[[Imread, np.ndarray, int, int, int], np.ndarray] = None # noqa - ) -> tuple[Any, ...]: - return base, slice, shape, dtype, frame_decorator + self.pcf = None + self.powermode = None + self.collimator = None + self.tirfangle = None + self.cyllens = ['None', 'None'] + self.duolink = 'None' + self.detector = [0, 1] + self.track = [0] + self.cache = DequeDict(16) + self.frameoffset = 0, 0 # how far apart the centers of frame and sensor are - base, slice, shape, dtype, frame_decorator = parse(*args, **kwargs) # noqa - self.base = base or self - self.slice = slice - self._shape = Shape(shape) - self.dtype = dtype - self.frame_decorator = frame_decorator - self.transform = Transforms() - self.flags = dict(C_CONTIGUOUS=False, F_CONTIGUOUS=False, OWNDATA=False, WRITEABLE=False, - ALIGNED=False, WRITEBACKIFCOPY=False, UPDATEIFCOPY=False) + # extract some metadata from ome + instrument = self.ome.instruments[0] if self.ome.instruments else None + image = self.ome.images[self.series if len(self.ome.images) > 1 else 0] + pixels = image.pixels + self._shape = Shape((pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t), + 'yxczt') + self.base_shape = Shape((pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t), + 'yxczt') + self.dtype = pixels.type.value if dtype is None else dtype + self.pxsize = pixels.physical_size_x_quantity + try: + self.exposuretime = tuple(find(image.pixels.planes, the_c=c).exposure_time_quantity + for c in range(self.shape['c'])) + except AttributeError: + self.exposuretime = () + + if self.zstack: + self.deltaz = image.pixels.physical_size_z_quantity + self.deltaz_um = None if self.deltaz is None else self.deltaz.to(self.ureg.um).m + else: + self.deltaz = self.deltaz_um = None + if image.objective_settings: + self.objective = find(instrument.objectives, id=image.objective_settings.id) + else: + self.objective = None + try: + t0 = find(image.pixels.planes, the_c=0, the_t=0, the_z=0).delta_t + t1 = find(image.pixels.planes, the_c=0, the_t=self.shape['t'] - 1, the_z=0).delta_t + self.timeinterval = (t1 - t0) / (self.shape['t'] - 1) if self.shape['t'] > 1 and t1 > t0 else None + except AttributeError: + self.timeinterval = None + try: + self.binning = [int(i) for i in image.pixels.channels[0].detector_settings.binning.value.split('x')] + if self.pxsize is not None: + self.pxsize *= self.binning[0] + except (AttributeError, IndexError, ValueError): + self.binning = None + self.channel_names = [channel.name for channel in image.pixels.channels] + self.channel_names += [chr(97 + i) for i in range(len(self.channel_names), self.shape['c'])] + self.cnamelist = self.channel_names + try: + optovars = [objective for objective in instrument.objectives if 'tubelens' in objective.id.lower()] + except AttributeError: + optovars = [] + if len(optovars) == 0: + self.tubelens = None + else: + self.tubelens = optovars[0] + if self.objective: + if self.tubelens: + self.magnification = self.objective.nominal_magnification * self.tubelens.nominal_magnification + else: + self.magnification = self.objective.nominal_magnification + self.NA = self.objective.lens_na + else: + self.magnification = None + self.NA = None + + self.gain = [find(instrument.detectors, id=channel.detector_settings.id).amplification_gain + for channel in image.pixels.channels + if channel.detector_settings + and find(instrument.detectors, id=channel.detector_settings.id).amplification_gain] + self.laserwavelengths = [(channel.excitation_wavelength_quantity.to(self.ureg.nm).m,) + for channel in pixels.channels if channel.excitation_wavelength_quantity] + self.laserpowers = try_default(lambda: [(1 - channel.light_source_settings.attenuation,) + for channel in pixels.channels], []) + self.filter = try_default( # type: ignore + lambda: [find(instrument.filter_sets, id=channel.filter_set_ref.id).model + for channel in image.pixels.channels], None) + self.pxsize_um = None if self.pxsize is None else self.pxsize.to(self.ureg.um).m + self.exposuretime_s = [None if i is None else i.to(self.ureg.s).m for i in self.exposuretime] + + if axes is None: + self.axes = ''.join(i for i in 'cztyx' if self.shape[i] > 1) + elif axes.lower() == 'full': + self.axes = 'cztyx' + else: + self.axes = axes + self.slice = [np.arange(s, dtype=int) for s in self.shape.yxczt] + + m = self.extrametadata + if m is not None: + try: + self.cyllens = m['CylLens'] + self.duolink = m['DLFilterSet'].split(' & ')[m['DLFilterChannel']] + if 'FeedbackChannels' in m: + self.feedback = m['FeedbackChannels'] + else: + self.feedback = m['FeedbackChannel'] + except Exception: # noqa + self.cyllens = ['None', 'None'] + self.duolink = 'None' + self.feedback = [] + try: + self.cyllenschannels = np.where([self.cyllens[self.detector[c]].lower() != 'none' + for c in range(self.shape['c'])])[0].tolist() + except Exception: # noqa + pass + try: + s = int(re.findall(r'_(\d{3})_', self.duolink)[0]) * ureg.nm + except Exception: # noqa + s = 561 * ureg.nm + try: + sigma = [] + for c, d in enumerate(self.detector): + emission = (np.hstack(self.laserwavelengths[c]) + 22) * ureg.nm + sigma.append([emission[emission > s].max(initial=0), emission[emission < s].max(initial=0)][d]) + sigma = np.hstack(sigma) + sigma[sigma == 0] = 600 * ureg.nm + sigma /= 2 * self.NA * self.pxsize + self.sigma = sigma.magnitude.tolist() # type: ignore + except Exception: # noqa + self.sigma = [2] * self.shape['c'] + if not self.NA: + self.immersionN = 1 + elif 1.5 < self.NA: + self.immersionN = 1.661 + elif 1.3 < self.NA < 1.5: + self.immersionN = 1.518 + elif 1 < self.NA < 1.3: + self.immersionN = 1.33 + else: + self.immersionN = 1 + + p = re.compile(r'(\d+):(\d+)$') + try: + self.track, self.detector = zip(*[[int(i) for i in p.findall(find( + self.ome.images[self.series].pixels.channels, id=f'Channel:{c}').detector_settings.id)[0]] + for c in range(self.shape['c'])]) + except Exception: # noqa + pass def __call__(self, c: int = None, z: int = None, t: int = None, x: int = None, y: int = None) -> np.ndarray: """ same as im[] but allowing keyword axes, but slices need to made with slice() or np.s_ """ @@ -359,7 +448,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): # TODO: check output dimensionality when requested shape in some dimension is 1 if all([isinstance(s, Number) or a < 0 and s.size == 1 for s, a in zip(new_slice, axes_idx)]): - return self.block(*new_slice).item() + return self.block(*new_slice).item() # type: ignore else: new = View(self) new.slice = new_slice @@ -381,8 +470,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): def __setstate__(self, state: dict[str, Any]) -> None: """ What happens during unpickling """ self.__dict__.update({key: value for key, value in state.items() if key != 'cache_size'}) - if isinstance(self, AbstractReader): - self.open() + self.reader = Reader(str(self.path), int(self.series)) self.cache = DequeDict(state.get('cache_size', 16)) def __str__(self) -> str: @@ -406,7 +494,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): if dtype is not None: block = block.astype(dtype) if block.ndim == 0: - return block.item() + return block.item() # type: ignore axes = ''.join(j for i, j in enumerate('yxczt') if i not in axes_squeeze) return block.transpose([axes.find(i) for i in self.shape.axes if i in axes]) @@ -415,6 +503,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): """ frame-wise application of np.argmin and np.argmax """ if axis is None: value = arg = None + axes = ''.join(i for i in self.axes if i in 'yx') + 'czt' for idx in product(range(self.shape['c']), range(self.shape['z']), range(self.shape['t'])): yxczt = (slice(None), slice(None)) + idx in_idx = tuple(yxczt['yxczt'.find(i)] for i in self.axes) @@ -424,16 +513,20 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): if value is None: arg = new_arg + idx value = new_value + elif value == new_value: + a = np.ravel_multi_index([arg[axes.find(i)] for i in self.axes], self.shape) + b = np.ravel_multi_index([(new_arg + idx)[axes.find(i)] for i in self.axes], self.shape) + if b < a: + arg = new_arg + idx else: i = fun((value, new_value)) # type: ignore arg = (arg, new_arg + idx)[i] value = (value, new_value)[i] - axes = ''.join(i for i in self.axes if i in 'yx') + 'czt' arg = np.ravel_multi_index([arg[axes.find(i)] for i in self.axes], self.shape) if out is None: return arg else: - out.itemset(arg) + out[:] = arg return out else: if isinstance(axis, str): @@ -508,7 +601,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): if out is None: return res else: - out.itemset(res) + out[:] = res return out else: if isinstance(axis, str): @@ -613,8 +706,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): def summary(self) -> str: """ gives a helpful summary of the recorded experiment """ s = [f'path/filename: {self.path}', - f'series/pos: {self.series}', - f"reader: {self.base.__class__.__module__.split('.')[-1]}"] + f'series/pos: {self.series}'] s.extend((f'dtype: {self.dtype}', f'shape ({self.axes}):'.ljust(15) + f"{' x '.join(str(i) for i in self.shape)}")) if self.pxsize_um: @@ -754,7 +846,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): @wraps(np.ndarray.reshape) def reshape(self, *args, **kwargs): - return np.asarray(self).reshape(*args, **kwargs) + return np.asarray(self).reshape(*args, **kwargs) # noqa @wraps(np.ndarray.squeeze) def squeeze(self, axes=None): @@ -859,7 +951,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): self.cache.move_to_end(key) f = self.cache[key] else: - f = self.transform[self.channel_names[c], t].frame(self.__frame__(c, z, t)) + f = self.transform[self.channel_names[c], t].frame(self.reader.get_frame(int(c), int(z), int(t))) if self.frame_decorator is not None: f = self.frame_decorator(self, f, c, z, t) if self.cache.maxlen: @@ -915,12 +1007,6 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): czt.append([k % self.shape[i] for k in n]) return [self.get_channel(c) for c in czt[0]], *czt[1:3] # type: ignore - @staticmethod - def bioformats_ome(path: [str, Path]) -> OME: - """ Use java BioFormats to make an ome metadata structure. """ - with multiprocessing.get_context('spawn').Pool(1) as pool: - return pool.map(bioformats_ome, (path,))[0] - @staticmethod def fix_ome(ome: OME) -> OME: # fix ome if necessary @@ -945,8 +1031,8 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): return OME.from_xml(path.with_suffix('.ome.xml')) def get_ome(self) -> OME: - """ overload this """ - return self.bioformats_ome(self.path) + """ OME metadata structure """ + return from_xml(self.reader.get_ome_xml(), parser='lxml') @cached_property def ome(self) -> OME: @@ -1116,8 +1202,14 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): class View(Imread, ABC): def __init__(self, base: Imread, dtype: DTypeLike = None) -> None: - super().__init__(base.base, base.slice, base.shape, dtype or base.dtype, base.frame_decorator) + super().__init__(base) + self.dtype = dtype or base.dtype + self.slice = base.slice + self._shape = Shape(base.shape) + self.frame_decorator = base.frame_decorator self.transform = base.transform + self.flags = dict(C_CONTIGUOUS=False, F_CONTIGUOUS=False, OWNDATA=False, WRITEABLE=False, + ALIGNED=False, WRITEBACKIFCOPY=False, UPDATEIFCOPY=False) def __getattr__(self, item: str) -> Any: if not hasattr(self.base, item): @@ -1125,194 +1217,6 @@ class View(Imread, ABC): return self.base.__getattribute__(item) -class AbstractReader(Imread, metaclass=ABCMeta): - priority = 99 - do_not_pickle = 'cache' - ureg = ureg - - @staticmethod - @abstractmethod - def _can_open(path: Path | str) -> bool: - """ Override this method, and return true when the subclass can open the file """ - return False - - @staticmethod - def get_positions(path: str | Path) -> Optional[list[int]]: - return None - - @abstractmethod - def __frame__(self, c: int, z: int, t: int) -> np.ndarray: - """ Override this, return the frame at c, z, t """ - return np.random.randint(0, 255, self.shape['yx']) - - def open(self) -> None: - """ Optionally override this, open file handles etc. - filehandles cannot be pickled and should be marked such by setting do_not_pickle = 'file_handle_name' """ - return - - def close(self) -> None: - """ Optionally override this, close file handles etc. """ - return - - def __init__(self, path: Path | str | Imread | Any = None, dtype: DTypeLike = None, axes: str = None) -> None: - if isinstance(path, Imread): - return - super().__init__() - self.isclosed = False - if isinstance(path, str): - path = Path(path) - self.path, self.series = self.split_path_series(path) - if isinstance(path, Path) and path.exists(): - self.title = self.path.name - self.acquisitiondate = datetime.fromtimestamp(self.path.stat().st_mtime).strftime('%y-%m-%dT%H:%M:%S') - else: # ndarray - self.title = self.__class__.__name__ - self.acquisitiondate = 'now' - - self.reader = None - self.pcf = None - self.powermode = None - self.collimator = None - self.tirfangle = None - self.cyllens = ['None', 'None'] - self.duolink = 'None' - self.detector = [0, 1] - self.track = [0] - self.cache = DequeDict(16) - self.frameoffset = 0, 0 # how far apart the centers of frame and sensor are - - self.open() - # extract some metadata from ome - instrument = self.ome.instruments[0] if self.ome.instruments else None - image = self.ome.images[self.series if len(self.ome.images) > 1 else 0] - pixels = image.pixels - self.shape = pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t - self.base_shape = Shape((pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t), 'yxczt') - self.dtype = pixels.type.value if dtype is None else dtype - self.pxsize = pixels.physical_size_x_quantity - try: - self.exposuretime = tuple(find(image.pixels.planes, the_c=c).exposure_time_quantity - for c in range(self.shape['c'])) - except AttributeError: - self.exposuretime = () - - if self.zstack: - self.deltaz = image.pixels.physical_size_z_quantity - self.deltaz_um = None if self.deltaz is None else self.deltaz.to(self.ureg.um).m - else: - self.deltaz = self.deltaz_um = None - if image.objective_settings: - self.objective = find(instrument.objectives, id=image.objective_settings.id) - else: - self.objective = None - try: - t0 = find(image.pixels.planes, the_c=0, the_t=0, the_z=0).delta_t - t1 = find(image.pixels.planes, the_c=0, the_t=self.shape['t'] - 1, the_z=0).delta_t - self.timeinterval = (t1 - t0) / (self.shape['t'] - 1) if self.shape['t'] > 1 and t1 > t0 else None - except AttributeError: - self.timeinterval = None - try: - self.binning = [int(i) for i in image.pixels.channels[0].detector_settings.binning.value.split('x')] - if self.pxsize is not None: - self.pxsize *= self.binning[0] - except (AttributeError, IndexError, ValueError): - self.binning = None - self.channel_names = [channel.name for channel in image.pixels.channels] - self.channel_names += [chr(97 + i) for i in range(len(self.channel_names), self.shape['c'])] - self.cnamelist = self.channel_names - try: - optovars = [objective for objective in instrument.objectives if 'tubelens' in objective.id.lower()] - except AttributeError: - optovars = [] - if len(optovars) == 0: - self.tubelens = None - else: - self.tubelens = optovars[0] - if self.objective: - if self.tubelens: - self.magnification = self.objective.nominal_magnification * self.tubelens.nominal_magnification - else: - self.magnification = self.objective.nominal_magnification - self.NA = self.objective.lens_na - else: - self.magnification = None - self.NA = None - - self.gain = [find(instrument.detectors, id=channel.detector_settings.id).amplification_gain - for channel in image.pixels.channels - if channel.detector_settings - and find(instrument.detectors, id=channel.detector_settings.id).amplification_gain] - self.laserwavelengths = [(channel.excitation_wavelength_quantity.to(self.ureg.nm).m,) - for channel in pixels.channels if channel.excitation_wavelength_quantity] - self.laserpowers = try_default(lambda: [(1 - channel.light_source_settings.attenuation,) - for channel in pixels.channels], []) - self.filter = try_default( # type: ignore - lambda: [find(instrument.filter_sets, id=channel.filter_set_ref.id).model - for channel in image.pixels.channels], None) - self.pxsize_um = None if self.pxsize is None else self.pxsize.to(self.ureg.um).m - self.exposuretime_s = [None if i is None else i.to(self.ureg.s).m for i in self.exposuretime] - - if axes is None: - self.axes = ''.join(i for i in 'cztyx' if self.shape[i] > 1) - elif axes.lower() == 'full': - self.axes = 'cztyx' - else: - self.axes = axes - self.slice = [np.arange(s, dtype=int) for s in self.shape.yxczt] - - m = self.extrametadata - if m is not None: - try: - self.cyllens = m['CylLens'] - self.duolink = m['DLFilterSet'].split(' & ')[m['DLFilterChannel']] - if 'FeedbackChannels' in m: - self.feedback = m['FeedbackChannels'] - else: - self.feedback = m['FeedbackChannel'] - except Exception: # noqa - self.cyllens = ['None', 'None'] - self.duolink = 'None' - self.feedback = [] - try: - self.cyllenschannels = np.where([self.cyllens[self.detector[c]].lower() != 'none' - for c in range(self.shape['c'])])[0].tolist() - except Exception: # noqa - pass - try: - s = int(re.findall(r'_(\d{3})_', self.duolink)[0]) * ureg.nm - except Exception: # noqa - s = 561 * ureg.nm - try: - sigma = [] - for c, d in enumerate(self.detector): - emission = (np.hstack(self.laserwavelengths[c]) + 22) * ureg.nm - sigma.append([emission[emission > s].max(initial=0), emission[emission < s].max(initial=0)][d]) - sigma = np.hstack(sigma) - sigma[sigma == 0] = 600 * ureg.nm - sigma /= 2 * self.NA * self.pxsize - self.sigma = sigma.magnitude.tolist() # type: ignore - except Exception: # noqa - self.sigma = [2] * self.shape['c'] - if not self.NA: - self.immersionN = 1 - elif 1.5 < self.NA: - self.immersionN = 1.661 - elif 1.3 < self.NA < 1.5: - self.immersionN = 1.518 - elif 1 < self.NA < 1.3: - self.immersionN = 1.33 - else: - self.immersionN = 1 - - p = re.compile(r'(\d+):(\d+)$') - try: - self.track, self.detector = zip(*[[int(i) for i in p.findall(find( - self.ome.images[self.series].pixels.channels, id=f'Channel:{c}').detector_settings.id)[0]] - for c in range(self.shape['c'])]) - except Exception: # noqa - pass - - def main() -> None: parser = ArgumentParser(description='Display info and save as tif') parser.add_argument('-v', '--version', action='version', version=__version__) @@ -1352,6 +1256,3 @@ def main() -> None: f.write(im.ome.to_xml()) if len(args.file) == 1: print(im.summary) - - -from .readers import * # noqa diff --git a/py/ndbioimage/readers/__init__.py b/py/ndbioimage/readers/__init__.py deleted file mode 100644 index d4ca3bc..0000000 --- a/py/ndbioimage/readers/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__all__ = 'bfread', 'cziread', 'fijiread', 'ndread', 'seqread', 'tifread', 'metaseriesread' diff --git a/py/ndbioimage/readers/bfread.py b/py/ndbioimage/readers/bfread.py deleted file mode 100644 index bb36314..0000000 --- a/py/ndbioimage/readers/bfread.py +++ /dev/null @@ -1,37 +0,0 @@ -from __future__ import annotations - -from abc import ABC -from pathlib import Path - -import numpy as np - -from .. import rs -from .. import AbstractReader - -for file in (Path(__file__).parent / "deps").glob("*_"): - file.rename(str(file)[:-1]) - -if not list((Path(__file__).parent / "jassets").glob("bioformats*.jar")): - rs.download_bioformats(True) - - -class Reader(AbstractReader, ABC): - priority = 99 # panic and open with BioFormats - do_not_pickle = 'reader' - - @staticmethod - def _can_open(path: Path) -> bool: - try: - _ = rs.Reader(path) - return True - except Exception: - return False - - def open(self) -> None: - self.reader = rs.Reader(str(self.path), int(self.series)) - - def __frame__(self, c: int, z: int, t: int) -> np.ndarray: - return self.reader.get_frame(int(c), int(z), int(t)) - - def close(self) -> None: - self.reader.close() diff --git a/py/ndbioimage/readers/cziread.py b/py/ndbioimage/readers/cziread.py deleted file mode 100644 index c345d7b..0000000 --- a/py/ndbioimage/readers/cziread.py +++ /dev/null @@ -1,606 +0,0 @@ -import re -import warnings -from abc import ABC -from functools import cached_property -from io import BytesIO -from itertools import product -from pathlib import Path -from typing import Any, Callable, Optional, TypeVar - -import czifile -import imagecodecs -import numpy as np -from lxml import etree -from ome_types import OME, model -from tifffile import repeat_nd - -from .. import AbstractReader - -try: - # TODO: use zoom from imagecodecs implementation when available - from scipy.ndimage.interpolation import zoom -except ImportError: - try: - from ndimage.interpolation import zoom - except ImportError: - zoom = None - - -Element = TypeVar('Element') - - -def zstd_decode(data: bytes) -> bytes: # noqa - """ decode zstd bytes, copied from BioFormats ZeissCZIReader """ - def read_var_int(stream: BytesIO) -> int: # noqa - a = stream.read(1)[0] - if a & 128: - b = stream.read(1)[0] - if b & 128: - c = stream.read(1)[0] - return (c << 14) | ((b & 127) << 7) | (a & 127) - return (b << 7) | (a & 127) - return a & 255 - - try: - with BytesIO(data) as stream: - size_of_header = read_var_int(stream) - high_low_unpacking = False - while stream.tell() < size_of_header: - chunk_id = read_var_int(stream) - # only one chunk ID defined so far - if chunk_id == 1: - high_low_unpacking = (stream.read(1)[0] & 1) == 1 - else: - raise ValueError(f'Invalid chunk id: {chunk_id}') - pointer = stream.tell() - except Exception: # noqa - high_low_unpacking = False - pointer = 0 - - decoded = imagecodecs.zstd_decode(data[pointer:]) - if high_low_unpacking: - second_half = len(decoded) // 2 - return bytes([decoded[second_half + i // 2] if i % 2 else decoded[i // 2] for i in range(len(decoded))]) - else: - return decoded - - -def data(self, raw: bool = False, resize: bool = True, order: int = 0) -> np.ndarray: - """Read image data from file and return as numpy array.""" - DECOMPRESS = czifile.czifile.DECOMPRESS # noqa - DECOMPRESS[5] = imagecodecs.zstd_decode - DECOMPRESS[6] = zstd_decode - - de = self.directory_entry - fh = self._fh - if raw: - with fh.lock: - fh.seek(self.data_offset) - data = fh.read(self.data_size) # noqa - return data - if de.compression: - # if de.compression not in DECOMPRESS: - # raise ValueError('compression unknown or not supported') - with fh.lock: - fh.seek(self.data_offset) - data = fh.read(self.data_size) # noqa - data = DECOMPRESS[de.compression](data) # noqa - if de.compression == 2: - # LZW - data = np.fromstring(data, de.dtype) # noqa - elif de.compression in (5, 6): - # ZSTD - data = np.frombuffer(data, de.dtype) # noqa - else: - dtype = np.dtype(de.dtype) - with fh.lock: - fh.seek(self.data_offset) - data = fh.read_array(dtype, self.data_size // dtype.itemsize) # noqa - - data = data.reshape(de.stored_shape) # noqa - if de.compression != 4 and de.stored_shape[-1] in (3, 4): - if de.stored_shape[-1] == 3: - # BGR -> RGB - data = data[..., ::-1] # noqa - else: - # BGRA -> RGBA - tmp = data[..., 0].copy() - data[..., 0] = data[..., 2] - data[..., 2] = tmp - if de.stored_shape == de.shape or not resize: - return data - - # sub / supersampling - factors = [j / i for i, j in zip(de.stored_shape, de.shape)] - factors = [(int(round(f)) if abs(f - round(f)) < 0.0001 else f) - for f in factors] - - # use repeat if possible - if order == 0 and all(isinstance(f, int) for f in factors): - data = repeat_nd(data, factors).copy() # noqa - data.shape = de.shape - return data - - # remove leading dimensions with size 1 for speed - shape = list(de.stored_shape) - i = 0 - for s in shape: - if s != 1: - break - i += 1 - shape = shape[i:] - factors = factors[i:] - data.shape = shape - - # resize RGB components separately for speed - if zoom is None: - raise ImportError("cannot import 'zoom' from scipy or ndimage") - if shape[-1] in (3, 4) and factors[-1] == 1.0: - factors = factors[:-1] - old = data - data = np.empty(de.shape, de.dtype[-2:]) # noqa - for i in range(shape[-1]): - data[..., i] = zoom(old[..., i], zoom=factors, order=order) - else: - data = zoom(data, zoom=factors, order=order) # noqa - - data.shape = de.shape - return data - - -# monkeypatch zstd into czifile -czifile.czifile.SubBlockSegment.data = data - - -class Reader(AbstractReader, ABC): - priority = 0 - do_not_pickle = 'reader', 'filedict' - - @staticmethod - def _can_open(path: Path) -> bool: - return isinstance(path, Path) and path.suffix == '.czi' - - def open(self) -> None: - self.reader = czifile.CziFile(self.path) - filedict = {} - for directory_entry in self.reader.filtered_subblock_directory: - idx = self.get_index(directory_entry, self.reader.start) - if 'S' not in self.reader.axes or self.series in range(*idx[self.reader.axes.index('S')]): - for c in range(*idx[self.reader.axes.index('C')]): - for z in range(*idx[self.reader.axes.index('Z')]): - for t in range(*idx[self.reader.axes.index('T')]): - if (c, z, t) in filedict: - filedict[c, z, t].append(directory_entry) - else: - filedict[c, z, t] = [directory_entry] - if len(filedict) == 0: - raise FileNotFoundError(f'Series {self.series} not found in {self.path}.') - self.filedict = filedict # noqa - - def close(self) -> None: - self.reader.close() - - def get_ome(self) -> OME: - return OmeParse.get_ome(self.reader, self.filedict) - - def __frame__(self, c: int = 0, z: int = 0, t: int = 0) -> np.ndarray: - f = np.zeros(self.base_shape['yx'], self.dtype) - if (c, z, t) in self.filedict: - directory_entries = self.filedict[c, z, t] - x_min = min([f.start[f.axes.index('X')] for f in directory_entries]) - y_min = min([f.start[f.axes.index('Y')] for f in directory_entries]) - xy_min = {'X': x_min, 'Y': y_min} - for directory_entry in directory_entries: - subblock = directory_entry.data_segment() - tile = subblock.data(resize=True, order=0) - axes_min = [xy_min.get(ax, 0) for ax in directory_entry.axes] - index = [slice(i - j - m, i - j + k) - for i, j, k, m in zip(directory_entry.start, self.reader.start, tile.shape, axes_min)] - index = tuple(index[self.reader.axes.index(i)] for i in 'YX') - f[index] = tile.squeeze() - return f - - @staticmethod - def get_index(directory_entry: czifile.DirectoryEntryDV, start: tuple[int]) -> list[tuple[int, int]]: - return [(i - j, i - j + k) for i, j, k in zip(directory_entry.start, start, directory_entry.shape)] - - -class OmeParse: - size_x: int - size_y: int - size_c: int - size_z: int - size_t: int - - nm = model.UnitsLength.NANOMETER - um = model.UnitsLength.MICROMETER - - @classmethod - def get_ome(cls, reader: czifile.CziFile, filedict: dict[tuple[int, int, int], Any]) -> OME: - new = cls(reader, filedict) - new.parse() - return new.ome - - def __init__(self, reader: czifile.CziFile, filedict: dict[tuple[int, int, int], Any]) -> None: - self.reader = reader - self.filedict = filedict - xml = reader.metadata() - self.attachments = {i.attachment_entry.name: i.attachment_entry.data_segment() - for i in reader.attachments()} - self.tree = etree.fromstring(xml) - self.metadata = self.tree.find('Metadata') - version = self.metadata.find('Version') - if version is not None: - self.version = version.text - else: - self.version = self.metadata.find('Experiment').attrib['Version'] - - self.ome = OME() - self.information = self.metadata.find('Information') - self.display_setting = self.metadata.find('DisplaySetting') - self.experiment = self.metadata.find('Experiment') - self.acquisition_block = self.experiment.find('ExperimentBlocks').find('AcquisitionBlock') - self.instrument = self.information.find('Instrument') - self.image = self.information.find('Image') - - if self.version == '1.0': - self.experiment = self.metadata.find('Experiment') - self.acquisition_block = self.experiment.find('ExperimentBlocks').find('AcquisitionBlock') - self.multi_track_setup = self.acquisition_block.find('MultiTrackSetup') - else: - self.experiment = None - self.acquisition_block = None - self.multi_track_setup = None - - def parse(self) -> None: - self.get_experimenters() - self.get_instruments() - self.get_detectors() - self.get_objectives() - self.get_tubelenses() - self.get_light_sources() - self.get_filters() - self.get_pixels() - self.get_channels() - self.get_planes() - self.get_annotations() - - @staticmethod - def text(item: Optional[Element], default: str = "") -> str: - return default if item is None else item.text - - @staticmethod - def def_list(item: Any) -> list[Any]: - return [] if item is None else item - - @staticmethod - def try_default(fun: Callable[[Any, ...], Any] | type, default: Any = None, *args: Any, **kwargs: Any) -> Any: - try: - return fun(*args, **kwargs) - except Exception: # noqa - return default - - def get_experimenters(self) -> None: - if self.version == '1.0': - self.ome.experimenters = [ - model.Experimenter(id='Experimenter:0', - user_name=self.information.find('User').find('DisplayName').text)] - elif self.version in ('1.1', '1.2'): - self.ome.experimenters = [ - model.Experimenter(id='Experimenter:0', - user_name=self.information.find('Document').find('UserName').text)] - - def get_instruments(self) -> None: - if self.version == '1.0': - self.ome.instruments.append(model.Instrument(id=self.instrument.attrib['Id'])) - elif self.version in ('1.1', '1.2'): - for _ in self.instrument.find('Microscopes'): - self.ome.instruments.append(model.Instrument(id='Instrument:0')) - - def get_detectors(self) -> None: - if self.version == '1.0': - for detector in self.instrument.find('Detectors'): - try: - detector_type = model.Detector_Type(self.text(detector.find('Type')).upper() or "") - except ValueError: - detector_type = model.Detector_Type.OTHER - - self.ome.instruments[0].detectors.append( - model.Detector( - id=detector.attrib['Id'], model=self.text(detector.find('Manufacturer').find('Model')), - amplification_gain=float(self.text(detector.find('AmplificationGain'))), - gain=float(self.text(detector.find('Gain'))), zoom=float(self.text(detector.find('Zoom'))), - type=detector_type - )) - elif self.version in ('1.1', '1.2'): - for detector in self.instrument.find('Detectors'): - try: - detector_type = model.Detector_Type(self.text(detector.find('Type')).upper() or "") - except ValueError: - detector_type = model.Detector_Type.OTHER - - self.ome.instruments[0].detectors.append( - model.Detector( - id=detector.attrib['Id'].replace(' ', ''), - model=self.text(detector.find('Manufacturer').find('Model')), - type=detector_type - )) - - def get_objectives(self) -> None: - for objective in self.instrument.find('Objectives'): - self.ome.instruments[0].objectives.append( - model.Objective( - id=objective.attrib['Id'], - model=self.text(objective.find('Manufacturer').find('Model')), - immersion=self.text(objective.find('Immersion')), # type: ignore - lens_na=float(self.text(objective.find('LensNA'))), - nominal_magnification=float(self.text(objective.find('NominalMagnification'))))) - - def get_tubelenses(self) -> None: - if self.version == '1.0': - for idx, tube_lens in enumerate({self.text(track_setup.find('TubeLensPosition')) - for track_setup in self.multi_track_setup}): - try: - nominal_magnification = float(re.findall(r'\d+[,.]\d*', tube_lens)[0].replace(',', '.')) - except Exception: # noqa - nominal_magnification = 1.0 - - self.ome.instruments[0].objectives.append( - model.Objective(id=f'Objective:Tubelens:{idx}', model=tube_lens, - nominal_magnification=nominal_magnification)) - elif self.version in ('1.1', '1.2'): - for tubelens in self.def_list(self.instrument.find('TubeLenses')): - try: - nominal_magnification = float(re.findall(r'\d+(?:[,.]\d*)?', - tubelens.attrib['Name'])[0].replace(',', '.')) - except Exception: # noqa - nominal_magnification = 1.0 - - self.ome.instruments[0].objectives.append( - model.Objective( - id=f"Objective:{tubelens.attrib['Id']}", - model=tubelens.attrib['Name'], - nominal_magnification=nominal_magnification)) - - def get_light_sources(self) -> None: - if self.version == '1.0': - for light_source in self.def_list(self.instrument.find('LightSources')): - try: - if light_source.find('LightSourceType').find('Laser') is not None: - self.ome.instruments[0].lasers.append( - model.Laser( - id=light_source.attrib['Id'], - model=self.text(light_source.find('Manufacturer').find('Model')), - power=float(self.text(light_source.find('Power'))), - wavelength=float( - self.text(light_source.find('LightSourceType').find('Laser').find('Wavelength'))))) - except AttributeError: - pass - elif self.version in ('1.1', '1.2'): - for light_source in self.def_list(self.instrument.find('LightSources')): - try: - if light_source.find('LightSourceType').find('Laser') is not None: - self.ome.instruments[0].lasers.append( - model.Laser( - id=f"LightSource:{light_source.attrib['Id']}", - power=float(self.text(light_source.find('Power'))), - wavelength=float(light_source.attrib['Id'][-3:]))) # TODO: follow Id reference - except (AttributeError, ValueError): - pass - - def get_filters(self) -> None: - if self.version == '1.0': - for idx, filter_ in enumerate({self.text(beam_splitter.find('Filter')) - for track_setup in self.multi_track_setup - for beam_splitter in track_setup.find('BeamSplitters')}): - self.ome.instruments[0].filter_sets.append( - model.FilterSet(id=f'FilterSet:{idx}', model=filter_) - ) - - def get_pixels(self) -> None: - x_min = min([f.start[f.axes.index('X')] for f in self.filedict[0, 0, 0]]) - y_min = min([f.start[f.axes.index('Y')] for f in self.filedict[0, 0, 0]]) - x_max = max([f.start[f.axes.index('X')] + f.shape[f.axes.index('X')] for f in self.filedict[0, 0, 0]]) - y_max = max([f.start[f.axes.index('Y')] + f.shape[f.axes.index('Y')] for f in self.filedict[0, 0, 0]]) - self.size_x = x_max - x_min - self.size_y = y_max - y_min - self.size_c, self.size_z, self.size_t = (self.reader.shape[self.reader.axes.index(directory_entry)] - for directory_entry in 'CZT') - image = self.information.find('Image') - pixel_type = self.text(image.find('PixelType'), 'Gray16') - if pixel_type.startswith('Gray'): - pixel_type = 'uint' + pixel_type[4:] - objective_settings = image.find('ObjectiveSettings') - - self.ome.images.append( - model.Image( - id='Image:0', - name=f"{self.text(self.information.find('Document').find('Name'))} #1", - pixels=model.Pixels( - id='Pixels:0', size_x=self.size_x, size_y=self.size_y, - size_c=self.size_c, size_z=self.size_z, size_t=self.size_t, - dimension_order='XYCZT', type=pixel_type, # type: ignore - significant_bits=int(self.text(image.find('ComponentBitCount'))), - big_endian=False, interleaved=False, metadata_only=True), # type: ignore - experimenter_ref=model.ExperimenterRef(id='Experimenter:0'), - instrument_ref=model.InstrumentRef(id='Instrument:0'), - objective_settings=model.ObjectiveSettings( - id=objective_settings.find('ObjectiveRef').attrib['Id'], - medium=self.text(objective_settings.find('Medium')), # type: ignore - refractive_index=float(self.text(objective_settings.find('RefractiveIndex')))), - stage_label=model.StageLabel( - name=f'Scene position #0', - x=self.positions[0], x_unit=self.um, - y=self.positions[1], y_unit=self.um, - z=self.positions[2], z_unit=self.um))) - - for distance in self.metadata.find('Scaling').find('Items'): - if distance.attrib['Id'] == 'X': - self.ome.images[0].pixels.physical_size_x = float(self.text(distance.find('Value'))) * 1e6 - elif distance.attrib['Id'] == 'Y': - self.ome.images[0].pixels.physical_size_y = float(self.text(distance.find('Value'))) * 1e6 - elif self.size_z > 1 and distance.attrib['Id'] == 'Z': - self.ome.images[0].pixels.physical_size_z = float(self.text(distance.find('Value'))) * 1e6 - - @cached_property - def positions(self) -> tuple[float, float, Optional[float]]: - if self.version == '1.0': - scenes = self.image.find('Dimensions').find('S').find('Scenes') - positions = scenes[0].find('Positions')[0] - return float(positions.attrib['X']), float(positions.attrib['Y']), float(positions.attrib['Z']) - elif self.version in ('1.1', '1.2'): - try: # TODO - scenes = self.image.find('Dimensions').find('S').find('Scenes') - center_position = [float(pos) for pos in self.text(scenes[0].find('CenterPosition')).split(',')] - except AttributeError: - center_position = [0, 0] - return center_position[0], center_position[1], None - - @cached_property - def channels_im(self) -> dict: - return {channel.attrib['Id']: channel for channel in self.image.find('Dimensions').find('Channels')} - - @cached_property - def channels_ds(self) -> dict: - return {channel.attrib['Id']: channel for channel in self.display_setting.find('Channels')} - - @cached_property - def channels_ts(self) -> dict: - return {detector.attrib['Id']: track_setup - for track_setup in - self.experiment.find('ExperimentBlocks').find('AcquisitionBlock').find('MultiTrackSetup') - for detector in track_setup.find('Detectors')} - - def get_channels(self) -> None: - if self.version == '1.0': - for idx, (key, channel) in enumerate(self.channels_im.items()): - detector_settings = channel.find('DetectorSettings') - laser_scan_info = channel.find('LaserScanInfo') - detector = detector_settings.find('Detector') - try: - binning = model.Binning(self.text(detector_settings.find('Binning'))) - except ValueError: - binning = model.Binning.OTHER - - filterset = self.text(self.channels_ts[key].find('BeamSplitters')[0].find('Filter')) - filterset_idx = [filterset.model for filterset in self.ome.instruments[0].filter_sets].index(filterset) - - light_sources_settings = channel.find('LightSourcesSettings') - # no space in ome for multiple lightsources simultaneously - if len(light_sources_settings) > idx: - light_source_settings = light_sources_settings[idx] - else: - light_source_settings = light_sources_settings[0] - light_source_settings = model.LightSourceSettings( - id=light_source_settings.find('LightSource').attrib['Id'], - attenuation=float(self.text(light_source_settings.find('Attenuation'))), - wavelength=float(self.text(light_source_settings.find('Wavelength'))), - wavelength_unit=self.nm) - - self.ome.images[0].pixels.channels.append( - model.Channel( - id=f'Channel:{idx}', - name=channel.attrib['Name'], - acquisition_mode=self.text(channel.find('AcquisitionMode')), # type: ignore - color=model.Color(self.text(self.channels_ds[channel.attrib['Id']].find('Color'), 'white')), - detector_settings=model.DetectorSettings(id=detector.attrib['Id'], binning=binning), - # emission_wavelength=text(channel.find('EmissionWavelength')), # TODO: fix - excitation_wavelength=light_source_settings.wavelength, - filter_set_ref=model.FilterSetRef(id=self.ome.instruments[0].filter_sets[filterset_idx].id), - illumination_type=self.text(channel.find('IlluminationType')), # type: ignore - light_source_settings=light_source_settings, - samples_per_pixel=int(self.text(laser_scan_info.find('Averaging'))))) - elif self.version in ('1.1', '1.2'): - for idx, (key, channel) in enumerate(self.channels_im.items()): - detector_settings = channel.find('DetectorSettings') - laser_scan_info = channel.find('LaserScanInfo') - detector = detector_settings.find('Detector') - try: - color = model.Color(self.text(self.channels_ds[channel.attrib['Id']].find('Color'), 'white')) - except Exception: # noqa - color = None - try: - if (i := self.text(channel.find('EmissionWavelength'))) != '0': - emission_wavelength = float(i) - else: - emission_wavelength = None - except Exception: # noqa - emission_wavelength = None - if laser_scan_info is not None: - samples_per_pixel = int(self.text(laser_scan_info.find('Averaging'), '1')) - else: - samples_per_pixel = 1 - try: - binning = model.Binning(self.text(detector_settings.find('Binning'))) - except ValueError: - binning = model.Binning.OTHER - - light_sources_settings = channel.find('LightSourcesSettings') - # no space in ome for multiple lightsources simultaneously - if light_sources_settings is not None: - light_source_settings = light_sources_settings[0] - light_source_settings = model.LightSourceSettings( - id='LightSource:' + '_'.join([light_source_settings.find('LightSource').attrib['Id'] - for light_source_settings in light_sources_settings]), - attenuation=self.try_default(float, None, self.text(light_source_settings.find('Attenuation'))), - wavelength=self.try_default(float, None, self.text(light_source_settings.find('Wavelength'))), - wavelength_unit=self.nm) - else: - light_source_settings = None - - self.ome.images[0].pixels.channels.append( - model.Channel( - id=f'Channel:{idx}', - name=channel.attrib['Name'], - acquisition_mode=self.text(channel.find('AcquisitionMode')).replace( # type: ignore - 'SingleMoleculeLocalisation', 'SingleMoleculeImaging'), - color=color, - detector_settings=model.DetectorSettings( - id=detector.attrib['Id'].replace(' ', ""), - binning=binning), - emission_wavelength=emission_wavelength, - excitation_wavelength=self.try_default(float, None, - self.text(channel.find('ExcitationWavelength'))), - # filter_set_ref=model.FilterSetRef(id=ome.instruments[0].filter_sets[filterset_idx].id), - illumination_type=self.text(channel.find('IlluminationType')), # type: ignore - light_source_settings=light_source_settings, - samples_per_pixel=samples_per_pixel)) - - def get_planes(self) -> None: - try: - exposure_times = [float(self.text(channel.find('LaserScanInfo').find('FrameTime'))) - for channel in self.channels_im.values()] - except Exception: # noqa - exposure_times = [None] * len(self.channels_im) - delta_ts = self.attachments['TimeStamps'].data() - dt = np.diff(delta_ts) - if len(dt) and np.std(dt) / np.mean(dt) > 0.02: - dt = np.median(dt[dt > 0]) - delta_ts = dt * np.arange(len(delta_ts)) - warnings.warn(f'delta_t is inconsistent, using median value: {dt}') - - for t, z, c in product(range(self.size_t), range(self.size_z), range(self.size_c)): - self.ome.images[0].pixels.planes.append( - model.Plane(the_c=c, the_z=z, the_t=t, delta_t=delta_ts[t], - exposure_time=exposure_times[c], - position_x=self.positions[0], position_x_unit=self.um, - position_y=self.positions[1], position_y_unit=self.um, - position_z=self.positions[2], position_z_unit=self.um)) - - def get_annotations(self) -> None: - idx = 0 - for layer in [] if (ml := self.metadata.find('Layers')) is None else ml: - rectangle = layer.find('Elements').find('Rectangle') - if rectangle is not None: - geometry = rectangle.find('Geometry') - roi = model.ROI(id=f'ROI:{idx}', description=self.text(layer.find('Usage'))) - roi.union.append( - model.Rectangle( - id='Shape:0:0', - height=float(self.text(geometry.find('Height'))), - width=float(self.text(geometry.find('Width'))), - x=float(self.text(geometry.find('Left'))), - y=float(self.text(geometry.find('Top'))))) - self.ome.rois.append(roi) - self.ome.images[0].roi_refs.append(model.ROIRef(id=f'ROI:{idx}')) - idx += 1 diff --git a/py/ndbioimage/readers/fijiread.py b/py/ndbioimage/readers/fijiread.py deleted file mode 100644 index f11c0e7..0000000 --- a/py/ndbioimage/readers/fijiread.py +++ /dev/null @@ -1,59 +0,0 @@ -from abc import ABC -from itertools import product -from pathlib import Path -from struct import unpack -from warnings import warn - -import numpy as np -from ome_types import model -from tifffile import TiffFile - -from .. import AbstractReader - - -class Reader(AbstractReader, ABC): - """ Can read some tif files written with Fiji which are broken because Fiji didn't finish writing. """ - priority = 90 - do_not_pickle = 'reader' - - @staticmethod - def _can_open(path): - if isinstance(path, Path) and path.suffix in ('.tif', '.tiff'): - with TiffFile(path) as tif: - return tif.is_imagej and not tif.is_bigtiff - else: - return False - - def __frame__(self, c, z, t): # Override this, return the frame at c, z, t - self.reader.filehandle.seek(self.offset + t * self.count) - return np.reshape(unpack(self.fmt, self.reader.filehandle.read(self.count)), self.base_shape['yx']) - - def open(self): - warn(f'File {self.path.name} is probably damaged, opening with fijiread.') - self.reader = TiffFile(self.path) - assert self.reader.pages[0].compression == 1, 'Can only read uncompressed tiff files.' - assert self.reader.pages[0].samplesperpixel == 1, 'Can only read 1 sample per pixel.' - self.offset = self.reader.pages[0].dataoffsets[0] # noqa - self.count = self.reader.pages[0].databytecounts[0] # noqa - self.bytes_per_sample = self.reader.pages[0].bitspersample // 8 # noqa - self.fmt = self.reader.byteorder + self.count // self.bytes_per_sample * 'BHILQ'[self.bytes_per_sample - 1] # noqa - - def close(self): - self.reader.close() - - def get_ome(self): - size_y, size_x = self.reader.pages[0].shape - size_c, size_z = 1, 1 - size_t = int(np.floor((self.reader.filehandle.size - self.reader.pages[0].dataoffsets[0]) / self.count)) - pixel_type = model.PixelType(self.reader.pages[0].dtype.name) - ome = model.OME() - ome.instruments.append(model.Instrument()) - ome.images.append( - model.Image( - pixels=model.Pixels( - size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y, - dimension_order='XYCZT', type=pixel_type), - objective_settings=model.ObjectiveSettings(id='Objective:0'))) - for c, z, t in product(range(size_c), range(size_z), range(size_t)): - ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=0)) - return ome diff --git a/py/ndbioimage/readers/metaseriesread.py b/py/ndbioimage/readers/metaseriesread.py deleted file mode 100644 index f8087e4..0000000 --- a/py/ndbioimage/readers/metaseriesread.py +++ /dev/null @@ -1,80 +0,0 @@ -import re -from abc import ABC -from pathlib import Path -from typing import Optional - -import tifffile -from ome_types import model -from ome_types.units import _quantity_property # noqa - -from .. import AbstractReader - - -class Reader(AbstractReader, ABC): - priority = 20 - do_not_pickle = 'last_tif' - - @staticmethod - def _can_open(path): - return isinstance(path, Path) and (path.is_dir() or - (path.parent.is_dir() and path.name.lower().startswith('pos'))) - - @staticmethod - def get_positions(path: str | Path) -> Optional[list[int]]: - pat = re.compile(rf's(\d)_t\d+\.(tif|TIF)$') - return sorted({int(m.group(1)) for file in Path(path).iterdir() if (m := pat.search(file.name))}) - - def get_ome(self): - ome = model.OME() - tif = self.get_tif(0) - metadata = tif.metaseries_metadata - size_z = len(tif.pages) - page = tif.pages[0] - shape = {axis.lower(): size for axis, size in zip(page.axes, page.shape)} - size_x, size_y = shape['x'], shape['y'] - - ome.instruments.append(model.Instrument()) - - size_c = 1 - size_t = max(self.filedict.keys()) + 1 - pixel_type = f"uint{metadata['PlaneInfo']['bits-per-pixel']}" - ome.images.append( - model.Image( - pixels=model.Pixels( - size_c=size_c, size_z=size_z, size_t=size_t, - size_x=size_x, size_y=size_y, - dimension_order='XYCZT', type=pixel_type), - objective_settings=model.ObjectiveSettings(id='Objective:0'))) - return ome - - def open(self): - pat = re.compile(rf's{self.series}_t\d+\.(tif|TIF)$') - filelist = sorted([file for file in self.path.iterdir() if pat.search(file.name)]) - pattern = re.compile(r't(\d+)$') - self.filedict = {int(pattern.search(file.stem).group(1)) - 1: file for file in filelist} - if len(self.filedict) == 0: - raise FileNotFoundError - self.last_tif = 0, tifffile.TiffFile(self.filedict[0]) - - def close(self) -> None: - self.last_tif[1].close() - - def get_tif(self, t: int = None): - last_t, tif = self.last_tif - if (t is None or t == last_t) and not tif.filehandle.closed: - return tif - else: - tif.close() - tif = tifffile.TiffFile(self.filedict[t]) - self.last_tif = t, tif - return tif - - def __frame__(self, c=0, z=0, t=0): - tif = self.get_tif(t) - page = tif.pages[z] - if page.axes.upper() == 'YX': - return page.asarray() - elif page.axes.upper() == 'XY': - return page.asarray().T - else: - raise NotImplementedError(f'reading axes {page.axes} is not implemented') diff --git a/py/ndbioimage/readers/ndread.py b/py/ndbioimage/readers/ndread.py deleted file mode 100644 index 5a2b216..0000000 --- a/py/ndbioimage/readers/ndread.py +++ /dev/null @@ -1,53 +0,0 @@ -from abc import ABC -from itertools import product - -import numpy as np -from ome_types import model - -from .. import AbstractReader - - -class Reader(AbstractReader, ABC): - priority = 20 - - @staticmethod - def _can_open(path): - return isinstance(path, np.ndarray) and 1 <= path.ndim <= 5 - - def get_ome(self): - def shape(size_x=1, size_y=1, size_c=1, size_z=1, size_t=1): # noqa - return size_x, size_y, size_c, size_z, size_t - size_x, size_y, size_c, size_z, size_t = shape(*self.array.shape) - try: - pixel_type = model.PixelType(self.array.dtype.name) - except ValueError: - if self.array.dtype.name.startswith('int'): - pixel_type = model.PixelType('int32') - else: - pixel_type = model.PixelType('float') - - ome = model.OME() - ome.instruments.append(model.Instrument()) - ome.images.append( - model.Image( - pixels=model.Pixels( - size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y, - dimension_order='XYCZT', type=pixel_type), - objective_settings=model.ObjectiveSettings(id='Objective:0'))) - for c, z, t in product(range(size_c), range(size_z), range(size_t)): - ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=0)) - return ome - - def open(self): - if isinstance(self.path, np.ndarray): - self.array = np.array(self.path) - while self.array.ndim < 5: - self.array = np.expand_dims(self.array, -1) # noqa - self.path = 'numpy array' - - def __frame__(self, c, z, t): - frame = self.array[:, :, c, z, t] - if self.axes.find('y') > self.axes.find('x'): - return frame.T - else: - return frame diff --git a/py/ndbioimage/readers/seqread.py b/py/ndbioimage/readers/seqread.py deleted file mode 100644 index 4d74683..0000000 --- a/py/ndbioimage/readers/seqread.py +++ /dev/null @@ -1,146 +0,0 @@ -import re -from abc import ABC -from datetime import datetime -from itertools import product -from pathlib import Path - -import tifffile -import yaml -from ome_types import model -from ome_types.units import _quantity_property # noqa - -from .. import AbstractReader - - -def lazy_property(function, field, *arg_fields): - def lazy(self): - if self.__dict__.get(field) is None: - self.__dict__[field] = function(*[getattr(self, arg_field) for arg_field in arg_fields]) - try: - self.model_fields_set.add(field) - except Exception: # noqa - pass - return self.__dict__[field] - return property(lazy) - - -class Plane(model.Plane): - """ Lazily retrieve delta_t from metadata """ - def __init__(self, t0, file, **kwargs): # noqa - super().__init__(**kwargs) - # setting fields here because they would be removed by ome_types/pydantic after class definition - setattr(self.__class__, 'delta_t', lazy_property(self.get_delta_t, 'delta_t', 't0', 'file')) - setattr(self.__class__, 'delta_t_quantity', _quantity_property('delta_t')) - self.__dict__['t0'] = t0 # noqa - self.__dict__['file'] = file # noqa - - @staticmethod - def get_delta_t(t0, file): - with tifffile.TiffFile(file) as tif: - info = yaml.safe_load(tif.pages[0].tags[50839].value['Info']) - return float((datetime.strptime(info['Time'], '%Y-%m-%d %H:%M:%S %z') - t0).seconds) - - -class Reader(AbstractReader, ABC): - priority = 10 - - @staticmethod - def _can_open(path): - pat = re.compile(r'(?:\d+-)?Pos.*', re.IGNORECASE) - return (isinstance(path, Path) and path.is_dir() and - (pat.match(path.name) or any(file.is_dir() and pat.match(file.stem) for file in path.iterdir()))) - - def get_ome(self): - ome = model.OME() - with tifffile.TiffFile(self.filedict[0, 0, 0]) as tif: - metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()} - ome.experimenters.append( - model.Experimenter(id='Experimenter:0', user_name=metadata['Info']['Summary']['UserName'])) - objective_str = metadata['Info']['ZeissObjectiveTurret-Label'] - ome.instruments.append(model.Instrument()) - ome.instruments[0].objectives.append( - model.Objective( - id='Objective:0', manufacturer='Zeiss', model=objective_str, - nominal_magnification=float(re.findall(r'(\d+)x', objective_str)[0]), - lens_na=float(re.findall(r'/(\d\.\d+)', objective_str)[0]), - immersion=model.Objective_Immersion.OIL if 'oil' in objective_str.lower() else None)) - tubelens_str = metadata['Info']['ZeissOptovar-Label'] - ome.instruments[0].objectives.append( - model.Objective( - id='Objective:Tubelens:0', manufacturer='Zeiss', model=tubelens_str, - nominal_magnification=float(re.findall(r'\d?\d*[,.]?\d+(?=x$)', tubelens_str)[0].replace(',', '.')))) - ome.instruments[0].detectors.append( - model.Detector( - id='Detector:0', amplification_gain=100)) - ome.instruments[0].filter_sets.append( - model.FilterSet(id='FilterSet:0', model=metadata['Info']['ZeissReflectorTurret-Label'])) - - pxsize = metadata['Info']['PixelSizeUm'] - pxsize_cam = 6.5 if 'Hamamatsu' in metadata['Info']['Core-Camera'] else None - if pxsize == 0: - pxsize = pxsize_cam / ome.instruments[0].objectives[0].nominal_magnification - pixel_type = metadata['Info']['PixelType'].lower() - if pixel_type.startswith('gray'): - pixel_type = 'uint' + pixel_type[4:] - else: - pixel_type = 'uint16' # assume - - size_c, size_z, size_t = (max(i) + 1 for i in zip(*self.filedict.keys())) - t0 = datetime.strptime(metadata['Info']['Time'], '%Y-%m-%d %H:%M:%S %z') - ome.images.append( - model.Image( - pixels=model.Pixels( - size_c=size_c, size_z=size_z, size_t=size_t, - size_x=metadata['Info']['Width'], size_y=metadata['Info']['Height'], - dimension_order='XYCZT', # type: ignore - type=pixel_type, physical_size_x=pxsize, physical_size_y=pxsize, - physical_size_z=metadata['Info']['Summary']['z-step_um']), - objective_settings=model.ObjectiveSettings(id='Objective:0'))) - - for c, z, t in product(range(size_c), range(size_z), range(size_t)): - ome.images[0].pixels.planes.append( - Plane(t0, self.filedict[c, z, t], - the_c=c, the_z=z, the_t=t, exposure_time=metadata['Info']['Exposure-ms'] / 1000)) - - # compare channel names from metadata with filenames - pattern_c = re.compile(r'img_\d{3,}_(.*)_\d{3,}$', re.IGNORECASE) - for c in range(size_c): - ome.images[0].pixels.channels.append( - model.Channel( - id=f'Channel:{c}', name=pattern_c.findall(self.filedict[c, 0, 0].stem)[0], - detector_settings=model.DetectorSettings( - id='Detector:0', binning=metadata['Info']['Hamamatsu_sCMOS-Binning']), - filter_set_ref=model.FilterSetRef(id='FilterSet:0'))) - return ome - - def open(self): - # /some_path/Pos4: path = /some_path, series = 4 - # /some_path/5-Pos_001_005: path = /some_path/5-Pos_001_005, series = 0 - if re.match(r'(?:\d+-)?Pos.*', self.path.name, re.IGNORECASE) is None: - pat = re.compile(rf'^(?:\d+-)?Pos{self.series}$', re.IGNORECASE) - files = sorted(file for file in self.path.iterdir() if pat.match(file.name)) - if len(files): - path = files[0] - else: - raise FileNotFoundError(self.path / pat.pattern) - else: - path = self.path - - pat = re.compile(r'^img_\d{3,}.*\d{3,}.*\.tif$', re.IGNORECASE) - filelist = sorted([file for file in path.iterdir() if pat.search(file.name)]) - with tifffile.TiffFile(self.path / filelist[0]) as tif: - metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()} - - # compare channel names from metadata with filenames - cnamelist = metadata['Info']['Summary']['ChNames'] - cnamelist = [c for c in cnamelist if any([c in f.name for f in filelist])] - - pattern_c = re.compile(r'img_\d{3,}_(.*)_\d{3,}$', re.IGNORECASE) - pattern_z = re.compile(r'(\d{3,})$') - pattern_t = re.compile(r'img_(\d{3,})', re.IGNORECASE) - self.filedict = {(cnamelist.index(pattern_c.findall(file.stem)[0]), # noqa - int(pattern_z.findall(file.stem)[0]), - int(pattern_t.findall(file.stem)[0])): file for file in filelist} - - def __frame__(self, c=0, z=0, t=0): - return tifffile.imread(self.path / self.filedict[(c, z, t)]) diff --git a/py/ndbioimage/readers/tifread.py b/py/ndbioimage/readers/tifread.py deleted file mode 100644 index 5d3c033..0000000 --- a/py/ndbioimage/readers/tifread.py +++ /dev/null @@ -1,85 +0,0 @@ -from abc import ABC -from functools import cached_property -from itertools import product -from pathlib import Path - -import numpy as np -import tifffile -import yaml -from ome_types import model - -from .. import AbstractReader - - -class Reader(AbstractReader, ABC): - priority = 0 - do_not_pickle = 'reader' - - @staticmethod - def _can_open(path): - if isinstance(path, Path) and path.suffix in ('.tif', '.tiff'): - with tifffile.TiffFile(path) as tif: - return tif.is_imagej and tif.pages[-1]._nextifd() == 0 # noqa - else: - return False - - @cached_property - def metadata(self): - return {key: yaml.safe_load(value) if isinstance(value, str) else value - for key, value in self.reader.imagej_metadata.items()} - - def get_ome(self): - page = self.reader.pages[0] - size_y = page.imagelength - size_x = page.imagewidth - if self.p_ndim == 3: - size_c = page.samplesperpixel - size_t = self.metadata.get('frames', 1) # // C - else: - size_c = self.metadata.get('channels', 1) - size_t = self.metadata.get('frames', 1) - size_z = self.metadata.get('slices', 1) - if 282 in page.tags and 296 in page.tags and page.tags[296].value == 1: - f = page.tags[282].value - pxsize = f[1] / f[0] - else: - pxsize = None - - dtype = page.dtype.name - if dtype not in ('int8', 'int16', 'int32', 'uint8', 'uint16', 'uint32', - 'float', 'double', 'complex', 'double-complex', 'bit'): - dtype = 'float' - - interval_t = self.metadata.get('interval', 0) - - ome = model.OME() - ome.instruments.append(model.Instrument(id='Instrument:0')) - ome.instruments[0].objectives.append(model.Objective(id='Objective:0')) - ome.images.append( - model.Image( - id='Image:0', - pixels=model.Pixels( - id='Pixels:0', - size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y, - dimension_order='XYCZT', type=dtype, # type: ignore - physical_size_x=pxsize, physical_size_y=pxsize), - objective_settings=model.ObjectiveSettings(id='Objective:0'))) - for c, z, t in product(range(size_c), range(size_z), range(size_t)): - ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=interval_t * t)) - return ome - - def open(self): - self.reader = tifffile.TiffFile(self.path) - page = self.reader.pages[0] - self.p_ndim = page.ndim # noqa - if self.p_ndim == 3: - self.p_transpose = [i for i in [page.axes.find(j) for j in 'SYX'] if i >= 0] # noqa - - def close(self): - self.reader.close() - - def __frame__(self, c, z, t): - if self.p_ndim == 3: - return np.transpose(self.reader.asarray(z + t * self.base_shape['z']), self.p_transpose)[c] - else: - return self.reader.asarray(c + z * self.base_shape['c'] + t * self.base_shape['c'] * self.base_shape['z']) diff --git a/py/ndbioimage/rs.py b/py/ndbioimage/rs.py deleted file mode 100644 index 37ad889..0000000 --- a/py/ndbioimage/rs.py +++ /dev/null @@ -1,9 +0,0 @@ -from .ndbioimage_rs import Reader, download_bioformats # noqa -from pathlib import Path - - -if not list((Path(__file__).parent / "jassets").glob("bioformats*.jar")): - download_bioformats(True) - -for file in (Path(__file__).parent / "deps").glob("*_"): - file.rename(str(file)[:-1]) diff --git a/src/py.rs b/src/py.rs index e8b53f4..16f64e2 100644 --- a/src/py.rs +++ b/src/py.rs @@ -15,7 +15,18 @@ struct PyReader { impl PyReader { #[new] fn new(path: &str, series: usize) -> PyResult { - let path = PathBuf::from(path); + let mut path = PathBuf::from(path); + if path.is_dir() { + for file in path.read_dir()? { + if let Ok(f) = file { + let p = f.path(); + if f.path().is_file() & (p.extension() == Some("tif".as_ref())) { + path = p; + break; + } + } + } + } Ok(PyReader { reader: Reader::new(&path, series as i32)?, }) @@ -43,6 +54,11 @@ impl PyReader { fn get_ome_xml(&self) -> PyResult { Ok(self.reader.get_ome_xml()?) } + + fn close(&mut self) -> PyResult<()> { + self.reader.close()?; + Ok(()) + } } pub(crate) fn ndbioimage_file() -> anyhow::Result { diff --git a/tests/test_open.py b/tests/test_open.py index 133fbaa..4a161d8 100644 --- a/tests/test_open.py +++ b/tests/test_open.py @@ -1,27 +1,21 @@ import pickle -from multiprocessing import active_children from pathlib import Path import pytest -from ndbioimage import Imread, ReaderNotFoundError +from ndbioimage import Imread -@pytest.mark.parametrize('file', (Path(__file__).parent / 'files').iterdir()) +@pytest.mark.parametrize('file', + [file for file in (Path(__file__).parent / 'files').iterdir() if not file.suffix == '.pzl']) def test_open(file): - try: - with Imread(file) as im: - mean = im[dict(c=0, z=0, t=0)].mean() - b = pickle.dumps(im) - jm = pickle.loads(b) - assert jm[dict(c=0, z=0, t=0)].mean() == mean - v = im.view() - assert v[dict(c=0, z=0, t=0)].mean() == mean - b = pickle.dumps(v) - w = pickle.loads(b) - assert w[dict(c=0, z=0, t=0)].mean() == mean - except ReaderNotFoundError: - assert len(Imread.__subclasses__()), 'No subclasses for Imread found.' - - for child in active_children(): - child.kill() + with Imread(file) as im: + mean = im[dict(c=0, z=0, t=0)].mean() + b = pickle.dumps(im) + jm = pickle.loads(b) + assert jm[dict(c=0, z=0, t=0)].mean() == mean + v = im.view() + assert v[dict(c=0, z=0, t=0)].mean() == mean + b = pickle.dumps(v) + w = pickle.loads(b) + assert w[dict(c=0, z=0, t=0)].mean() == mean diff --git a/tests/test_slicing.py b/tests/test_slicing.py index 0cfdfca..fb2845c 100644 --- a/tests/test_slicing.py +++ b/tests/test_slicing.py @@ -1,20 +1,32 @@ +import tempfile from itertools import combinations_with_replacement from numbers import Number +from pathlib import Path import numpy as np import pytest +from tiffwrite import tiffwrite from ndbioimage import Imread -r = np.random.randint(0, 255, (64, 64, 2, 3, 4)) -im = Imread(r) -a = np.array(im) + +@pytest.fixture +def array(): + return np.random.randint(0, 255, (64, 64, 2, 3, 4), 'uint8') + +@pytest.fixture() +def image(array): + with tempfile.TemporaryDirectory() as folder: + file = Path(folder) / "test.tif" + tiffwrite(file, array, 'yxczt') + with Imread(file, axes='yxczt') as im: + yield im @pytest.mark.parametrize('s', combinations_with_replacement( (0, -1, 1, slice(None), slice(0, 1), slice(-1, 0), slice(1, 1)), 5)) -def test_slicing(s): - s_im, s_a = im[s], a[s] +def test_slicing(s, image, array): + s_im, s_a = image[s], array[s] if isinstance(s_a, Number): assert isinstance(s_im, Number) assert s_im == s_a diff --git a/tests/test_ufuncs.py b/tests/test_ufuncs.py index 7b00e09..2fa52b8 100644 --- a/tests/test_ufuncs.py +++ b/tests/test_ufuncs.py @@ -1,19 +1,31 @@ +import tempfile from itertools import product +from pathlib import Path import numpy as np import pytest +from tiffwrite import tiffwrite from ndbioimage import Imread -r = np.random.randint(0, 255, (64, 64, 2, 3, 4)) -im = Imread(r) -a = np.array(im) + +@pytest.fixture +def array(): + return np.random.randint(0, 255, (64, 64, 2, 3, 4), 'uint16') + +@pytest.fixture() +def image(array): + with tempfile.TemporaryDirectory() as folder: + file = Path(folder) / "test.tif" + tiffwrite(file, array, 'yxczt') + with Imread(file, axes='yxczt') as im: + yield im @pytest.mark.parametrize('fun_and_axis', product( (np.sum, np.nansum, np.min, np.nanmin, np.max, np.nanmax, np.argmin, np.argmax, np.mean, np.nanmean, np.var, np.nanvar, np.std, np.nanstd), (None, 0, 1, 2, 3, 4))) -def test_ufuncs(fun_and_axis): +def test_ufuncs(fun_and_axis, image, array): fun, axis = fun_and_axis - assert np.all(np.isclose(fun(im, axis), fun(a, axis))), \ + assert np.all(np.isclose(fun(image, axis), fun(array, axis))), \ f'function {fun.__name__} over axis {axis} does not give the correct result'