- remove all readers but bioformats

- open folder with sequence
This commit is contained in:
Wim Pomp
2025-02-19 21:28:59 +01:00
parent 2247a994be
commit 87e9715f97
15 changed files with 266 additions and 1407 deletions

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "ndbioimage" name = "ndbioimage"
version = "2025.2.2" version = "2025.2.3"
edition = "2021" edition = "2021"
rust-version = "1.78.0" rust-version = "1.78.0"
authors = ["Wim Pomp <w.pomp@nki.nl>"] authors = ["Wim Pomp <w.pomp@nki.nl>"]

View File

@@ -1,10 +1,9 @@
from __future__ import annotations from __future__ import annotations
import multiprocessing
import os import os
import re import re
import warnings import warnings
from abc import ABC, ABCMeta, abstractmethod from abc import ABC
from argparse import ArgumentParser from argparse import ArgumentParser
from collections import OrderedDict from collections import OrderedDict
from datetime import datetime from datetime import datetime
@@ -18,11 +17,12 @@ from typing import Any, Callable, Generator, Iterable, Optional, Sequence, TypeV
import numpy as np import numpy as np
import yaml import yaml
from numpy.typing import ArrayLike, DTypeLike from numpy.typing import ArrayLike, DTypeLike
from ome_types import OME, from_xml, model, ureg from ome_types import OME, from_xml, ureg
from pint import set_application_registry from pint import set_application_registry
from tiffwrite import FrameInfo, IJTiffParallel from tiffwrite import FrameInfo, IJTiffParallel
from tqdm.auto import tqdm from tqdm.auto import tqdm
from .ndbioimage_rs import Reader, download_bioformats # noqa
from .transforms import Transform, Transforms # noqa: F401 from .transforms import Transform, Transforms # noqa: F401
try: try:
@@ -44,8 +44,11 @@ warnings.filterwarnings('ignore', 'Reference to unknown ID')
Number = int | float | np.integer | np.floating Number = int | float | np.integer | np.floating
class ReaderNotFoundError(Exception): for dep_file in (Path(__file__).parent / "deps").glob("*_"):
pass dep_file.rename(str(dep_file)[:-1])
if not list((Path(__file__).parent / "jassets").glob("bioformats*.jar")):
download_bioformats(True)
class TransformTiff(IJTiffParallel): class TransformTiff(IJTiffParallel):
@@ -96,12 +99,6 @@ def try_default(fun: Callable[..., R], default: Any, *args: Any, **kwargs: Any)
return default return default
def bioformats_ome(path: str | Path) -> OME:
from .rs import Reader
reader = Reader(str(path), 0)
return from_xml(reader.get_ome_xml(), parser='lxml')
class Shape(tuple): class Shape(tuple):
def __new__(cls, shape: Sequence[int] | Shape, axes: str = 'yxczt') -> Shape: def __new__(cls, shape: Sequence[int] | Shape, axes: str = 'yxczt') -> Shape:
if isinstance(shape, Shape): if isinstance(shape, Shape):
@@ -164,9 +161,9 @@ class OmeCache(DequeDict):
(path.with_suffix('.ome.xml').lstat() if path.with_suffix('.ome.xml').exists() else None)) (path.with_suffix('.ome.xml').lstat() if path.with_suffix('.ome.xml').exists() else None))
def get_positions(path: str | Path) -> Optional[list[int]]: def get_positions(path: str | Path) -> Optional[list[int]]: # noqa
subclass = AbstractReader.get_subclass(path) # TODO
return subclass.get_positions(AbstractReader.split_path_series(path)[0]) return None
class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
@@ -203,89 +200,181 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
<< [0.02, 0.0005] in % << [0.02, 0.0005] in %
See __init__ and other functions for more ideas. See __init__ and other functions for more ideas.
Subclassing:
Subclass AbstractReader to add more file types. A subclass should always have at least the following
methods:
staticmethod _can_open(path): returns True when the subclass can open the image in path
__frame__(self, c, z, t): this should return a single frame at channel c, slice z and time t
optional open(self): code to be run during initialization, e.g. to open a file handle
optional close(self): close the file in a proper way
optional class field priority: subclasses with lower priority will be tried first, default = 99
optional get_ome(self) -> OME: return an OME structure with metadata,
if not present bioformats will be used to generate an OME
Any other method can be overridden as needed
""" """
isclosed: Optional[bool] do_not_pickle = 'cache', 'reader'
channel_names: Optional[list[str]] ureg = ureg
series: Optional[int]
pxsize_um: Optional[float] def close(self) -> None:
deltaz_um: Optional[float] self.reader.close()
exposuretime_s: Optional[list[float]]
timeinterval: Optional[float]
binning: Optional[list[int]]
laserwavelengths: Optional[list[tuple[float]]]
laserpowers: Optional[list[tuple[float]]]
objective: Optional[model.Objective]
magnification: Optional[float]
tubelens: Optional[model.Objective]
filter: Optional[str]
powermode: Optional[str]
collimator: Optional[str]
tirfangle: Optional[list[float]]
gain: Optional[list[float]]
pcf: Optional[list[float]]
path: Path
__frame__: Callable[[int, int, int], np.ndarray]
@staticmethod @staticmethod
def get_subclass(path: Path | str | Any): def get_positions(path: str | Path) -> Optional[list[int]]: # noqa
if len(AbstractReader.__subclasses__()) == 0: # TODO
raise Exception('Restart python kernel please!') return None
path, _ = AbstractReader.split_path_series(path)
for subclass in sorted(AbstractReader.__subclasses__(), key=lambda subclass_: subclass_.priority):
if subclass._can_open(path): # noqa
do_not_pickle = (AbstractReader.do_not_pickle,) if isinstance(AbstractReader.do_not_pickle, str) \
else AbstractReader.do_not_pickle
subclass_do_not_pickle = (subclass.do_not_pickle,) if isinstance(subclass.do_not_pickle, str) \
else subclass.do_not_pickle if hasattr(subclass, 'do_not_pickle') else ()
subclass.do_not_pickle = set(do_not_pickle).union(set(subclass_do_not_pickle))
return subclass
raise ReaderNotFoundError(f'No reader found for {path}.')
def __init__(self, path: Path | str | Imread | Any = None, dtype: DTypeLike = None, axes: str = None) -> None:
def __new__(cls, path: Path | str | Imread | Any = None, dtype: DTypeLike = None, axes: str = None) -> Imread: super().__init__()
if cls is not Imread:
return super().__new__(cls)
if isinstance(path, Imread): if isinstance(path, Imread):
return path self.base = path.base or path
subclass = cls.get_subclass(path) else:
do_not_pickle = (AbstractReader.do_not_pickle,) if isinstance(AbstractReader.do_not_pickle, str) \ self.base = self
else AbstractReader.do_not_pickle self.isclosed = False
subclass_do_not_pickle = (subclass.do_not_pickle,) if isinstance(subclass.do_not_pickle, str) \ if isinstance(path, str):
else subclass.do_not_pickle if hasattr(subclass, 'do_not_pickle') else () path = Path(path)
subclass.do_not_pickle = set(do_not_pickle).union(set(subclass_do_not_pickle)) self.path, self.series = self.split_path_series(path)
return super().__new__(subclass) self.reader = Reader(str(self.path), int(self.series))
self.frame_decorator = None
def __init__(self, *args: Any, **kwargs: Any):
def parse(base: Imread = None, # noqa
slice: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray] = None, # noqa
shape: tuple[int, ...] = (0, 0, 0, 0, 0), # noqa
dtype: DTypeLike = None, # noqa
frame_decorator: Callable[[Imread, np.ndarray, int, int, int], np.ndarray] = None # noqa
) -> tuple[Any, ...]:
return base, slice, shape, dtype, frame_decorator
base, slice, shape, dtype, frame_decorator = parse(*args, **kwargs) # noqa
self.base = base or self
self.slice = slice
self._shape = Shape(shape)
self.dtype = dtype
self.frame_decorator = frame_decorator
self.transform = Transforms() self.transform = Transforms()
self.flags = dict(C_CONTIGUOUS=False, F_CONTIGUOUS=False, OWNDATA=False, WRITEABLE=False, if isinstance(path, Path) and path.exists():
ALIGNED=False, WRITEBACKIFCOPY=False, UPDATEIFCOPY=False) self.title = self.path.name
self.acquisitiondate = datetime.fromtimestamp(self.path.stat().st_mtime).strftime('%y-%m-%dT%H:%M:%S')
else: # ndarray
self.title = self.__class__.__name__
self.acquisitiondate = 'now'
self.pcf = None
self.powermode = None
self.collimator = None
self.tirfangle = None
self.cyllens = ['None', 'None']
self.duolink = 'None'
self.detector = [0, 1]
self.track = [0]
self.cache = DequeDict(16)
self.frameoffset = 0, 0 # how far apart the centers of frame and sensor are
# extract some metadata from ome
instrument = self.ome.instruments[0] if self.ome.instruments else None
image = self.ome.images[self.series if len(self.ome.images) > 1 else 0]
pixels = image.pixels
self._shape = Shape((pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t),
'yxczt')
self.base_shape = Shape((pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t),
'yxczt')
self.dtype = pixels.type.value if dtype is None else dtype
self.pxsize = pixels.physical_size_x_quantity
try:
self.exposuretime = tuple(find(image.pixels.planes, the_c=c).exposure_time_quantity
for c in range(self.shape['c']))
except AttributeError:
self.exposuretime = ()
if self.zstack:
self.deltaz = image.pixels.physical_size_z_quantity
self.deltaz_um = None if self.deltaz is None else self.deltaz.to(self.ureg.um).m
else:
self.deltaz = self.deltaz_um = None
if image.objective_settings:
self.objective = find(instrument.objectives, id=image.objective_settings.id)
else:
self.objective = None
try:
t0 = find(image.pixels.planes, the_c=0, the_t=0, the_z=0).delta_t
t1 = find(image.pixels.planes, the_c=0, the_t=self.shape['t'] - 1, the_z=0).delta_t
self.timeinterval = (t1 - t0) / (self.shape['t'] - 1) if self.shape['t'] > 1 and t1 > t0 else None
except AttributeError:
self.timeinterval = None
try:
self.binning = [int(i) for i in image.pixels.channels[0].detector_settings.binning.value.split('x')]
if self.pxsize is not None:
self.pxsize *= self.binning[0]
except (AttributeError, IndexError, ValueError):
self.binning = None
self.channel_names = [channel.name for channel in image.pixels.channels]
self.channel_names += [chr(97 + i) for i in range(len(self.channel_names), self.shape['c'])]
self.cnamelist = self.channel_names
try:
optovars = [objective for objective in instrument.objectives if 'tubelens' in objective.id.lower()]
except AttributeError:
optovars = []
if len(optovars) == 0:
self.tubelens = None
else:
self.tubelens = optovars[0]
if self.objective:
if self.tubelens:
self.magnification = self.objective.nominal_magnification * self.tubelens.nominal_magnification
else:
self.magnification = self.objective.nominal_magnification
self.NA = self.objective.lens_na
else:
self.magnification = None
self.NA = None
self.gain = [find(instrument.detectors, id=channel.detector_settings.id).amplification_gain
for channel in image.pixels.channels
if channel.detector_settings
and find(instrument.detectors, id=channel.detector_settings.id).amplification_gain]
self.laserwavelengths = [(channel.excitation_wavelength_quantity.to(self.ureg.nm).m,)
for channel in pixels.channels if channel.excitation_wavelength_quantity]
self.laserpowers = try_default(lambda: [(1 - channel.light_source_settings.attenuation,)
for channel in pixels.channels], [])
self.filter = try_default( # type: ignore
lambda: [find(instrument.filter_sets, id=channel.filter_set_ref.id).model
for channel in image.pixels.channels], None)
self.pxsize_um = None if self.pxsize is None else self.pxsize.to(self.ureg.um).m
self.exposuretime_s = [None if i is None else i.to(self.ureg.s).m for i in self.exposuretime]
if axes is None:
self.axes = ''.join(i for i in 'cztyx' if self.shape[i] > 1)
elif axes.lower() == 'full':
self.axes = 'cztyx'
else:
self.axes = axes
self.slice = [np.arange(s, dtype=int) for s in self.shape.yxczt]
m = self.extrametadata
if m is not None:
try:
self.cyllens = m['CylLens']
self.duolink = m['DLFilterSet'].split(' & ')[m['DLFilterChannel']]
if 'FeedbackChannels' in m:
self.feedback = m['FeedbackChannels']
else:
self.feedback = m['FeedbackChannel']
except Exception: # noqa
self.cyllens = ['None', 'None']
self.duolink = 'None'
self.feedback = []
try:
self.cyllenschannels = np.where([self.cyllens[self.detector[c]].lower() != 'none'
for c in range(self.shape['c'])])[0].tolist()
except Exception: # noqa
pass
try:
s = int(re.findall(r'_(\d{3})_', self.duolink)[0]) * ureg.nm
except Exception: # noqa
s = 561 * ureg.nm
try:
sigma = []
for c, d in enumerate(self.detector):
emission = (np.hstack(self.laserwavelengths[c]) + 22) * ureg.nm
sigma.append([emission[emission > s].max(initial=0), emission[emission < s].max(initial=0)][d])
sigma = np.hstack(sigma)
sigma[sigma == 0] = 600 * ureg.nm
sigma /= 2 * self.NA * self.pxsize
self.sigma = sigma.magnitude.tolist() # type: ignore
except Exception: # noqa
self.sigma = [2] * self.shape['c']
if not self.NA:
self.immersionN = 1
elif 1.5 < self.NA:
self.immersionN = 1.661
elif 1.3 < self.NA < 1.5:
self.immersionN = 1.518
elif 1 < self.NA < 1.3:
self.immersionN = 1.33
else:
self.immersionN = 1
p = re.compile(r'(\d+):(\d+)$')
try:
self.track, self.detector = zip(*[[int(i) for i in p.findall(find(
self.ome.images[self.series].pixels.channels, id=f'Channel:{c}').detector_settings.id)[0]]
for c in range(self.shape['c'])])
except Exception: # noqa
pass
def __call__(self, c: int = None, z: int = None, t: int = None, x: int = None, y: int = None) -> np.ndarray: def __call__(self, c: int = None, z: int = None, t: int = None, x: int = None, y: int = None) -> np.ndarray:
""" same as im[] but allowing keyword axes, but slices need to made with slice() or np.s_ """ """ same as im[] but allowing keyword axes, but slices need to made with slice() or np.s_ """
@@ -359,7 +448,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
# TODO: check output dimensionality when requested shape in some dimension is 1 # TODO: check output dimensionality when requested shape in some dimension is 1
if all([isinstance(s, Number) or a < 0 and s.size == 1 for s, a in zip(new_slice, axes_idx)]): if all([isinstance(s, Number) or a < 0 and s.size == 1 for s, a in zip(new_slice, axes_idx)]):
return self.block(*new_slice).item() return self.block(*new_slice).item() # type: ignore
else: else:
new = View(self) new = View(self)
new.slice = new_slice new.slice = new_slice
@@ -381,8 +470,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
def __setstate__(self, state: dict[str, Any]) -> None: def __setstate__(self, state: dict[str, Any]) -> None:
""" What happens during unpickling """ """ What happens during unpickling """
self.__dict__.update({key: value for key, value in state.items() if key != 'cache_size'}) self.__dict__.update({key: value for key, value in state.items() if key != 'cache_size'})
if isinstance(self, AbstractReader): self.reader = Reader(str(self.path), int(self.series))
self.open()
self.cache = DequeDict(state.get('cache_size', 16)) self.cache = DequeDict(state.get('cache_size', 16))
def __str__(self) -> str: def __str__(self) -> str:
@@ -406,7 +494,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
if dtype is not None: if dtype is not None:
block = block.astype(dtype) block = block.astype(dtype)
if block.ndim == 0: if block.ndim == 0:
return block.item() return block.item() # type: ignore
axes = ''.join(j for i, j in enumerate('yxczt') if i not in axes_squeeze) axes = ''.join(j for i, j in enumerate('yxczt') if i not in axes_squeeze)
return block.transpose([axes.find(i) for i in self.shape.axes if i in axes]) return block.transpose([axes.find(i) for i in self.shape.axes if i in axes])
@@ -415,6 +503,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
""" frame-wise application of np.argmin and np.argmax """ """ frame-wise application of np.argmin and np.argmax """
if axis is None: if axis is None:
value = arg = None value = arg = None
axes = ''.join(i for i in self.axes if i in 'yx') + 'czt'
for idx in product(range(self.shape['c']), range(self.shape['z']), range(self.shape['t'])): for idx in product(range(self.shape['c']), range(self.shape['z']), range(self.shape['t'])):
yxczt = (slice(None), slice(None)) + idx yxczt = (slice(None), slice(None)) + idx
in_idx = tuple(yxczt['yxczt'.find(i)] for i in self.axes) in_idx = tuple(yxczt['yxczt'.find(i)] for i in self.axes)
@@ -424,16 +513,20 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
if value is None: if value is None:
arg = new_arg + idx arg = new_arg + idx
value = new_value value = new_value
elif value == new_value:
a = np.ravel_multi_index([arg[axes.find(i)] for i in self.axes], self.shape)
b = np.ravel_multi_index([(new_arg + idx)[axes.find(i)] for i in self.axes], self.shape)
if b < a:
arg = new_arg + idx
else: else:
i = fun((value, new_value)) # type: ignore i = fun((value, new_value)) # type: ignore
arg = (arg, new_arg + idx)[i] arg = (arg, new_arg + idx)[i]
value = (value, new_value)[i] value = (value, new_value)[i]
axes = ''.join(i for i in self.axes if i in 'yx') + 'czt'
arg = np.ravel_multi_index([arg[axes.find(i)] for i in self.axes], self.shape) arg = np.ravel_multi_index([arg[axes.find(i)] for i in self.axes], self.shape)
if out is None: if out is None:
return arg return arg
else: else:
out.itemset(arg) out[:] = arg
return out return out
else: else:
if isinstance(axis, str): if isinstance(axis, str):
@@ -508,7 +601,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
if out is None: if out is None:
return res return res
else: else:
out.itemset(res) out[:] = res
return out return out
else: else:
if isinstance(axis, str): if isinstance(axis, str):
@@ -613,8 +706,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
def summary(self) -> str: def summary(self) -> str:
""" gives a helpful summary of the recorded experiment """ """ gives a helpful summary of the recorded experiment """
s = [f'path/filename: {self.path}', s = [f'path/filename: {self.path}',
f'series/pos: {self.series}', f'series/pos: {self.series}']
f"reader: {self.base.__class__.__module__.split('.')[-1]}"]
s.extend((f'dtype: {self.dtype}', s.extend((f'dtype: {self.dtype}',
f'shape ({self.axes}):'.ljust(15) + f"{' x '.join(str(i) for i in self.shape)}")) f'shape ({self.axes}):'.ljust(15) + f"{' x '.join(str(i) for i in self.shape)}"))
if self.pxsize_um: if self.pxsize_um:
@@ -754,7 +846,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
@wraps(np.ndarray.reshape) @wraps(np.ndarray.reshape)
def reshape(self, *args, **kwargs): def reshape(self, *args, **kwargs):
return np.asarray(self).reshape(*args, **kwargs) return np.asarray(self).reshape(*args, **kwargs) # noqa
@wraps(np.ndarray.squeeze) @wraps(np.ndarray.squeeze)
def squeeze(self, axes=None): def squeeze(self, axes=None):
@@ -859,7 +951,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
self.cache.move_to_end(key) self.cache.move_to_end(key)
f = self.cache[key] f = self.cache[key]
else: else:
f = self.transform[self.channel_names[c], t].frame(self.__frame__(c, z, t)) f = self.transform[self.channel_names[c], t].frame(self.reader.get_frame(int(c), int(z), int(t)))
if self.frame_decorator is not None: if self.frame_decorator is not None:
f = self.frame_decorator(self, f, c, z, t) f = self.frame_decorator(self, f, c, z, t)
if self.cache.maxlen: if self.cache.maxlen:
@@ -915,12 +1007,6 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
czt.append([k % self.shape[i] for k in n]) czt.append([k % self.shape[i] for k in n])
return [self.get_channel(c) for c in czt[0]], *czt[1:3] # type: ignore return [self.get_channel(c) for c in czt[0]], *czt[1:3] # type: ignore
@staticmethod
def bioformats_ome(path: [str, Path]) -> OME:
""" Use java BioFormats to make an ome metadata structure. """
with multiprocessing.get_context('spawn').Pool(1) as pool:
return pool.map(bioformats_ome, (path,))[0]
@staticmethod @staticmethod
def fix_ome(ome: OME) -> OME: def fix_ome(ome: OME) -> OME:
# fix ome if necessary # fix ome if necessary
@@ -945,8 +1031,8 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
return OME.from_xml(path.with_suffix('.ome.xml')) return OME.from_xml(path.with_suffix('.ome.xml'))
def get_ome(self) -> OME: def get_ome(self) -> OME:
""" overload this """ """ OME metadata structure """
return self.bioformats_ome(self.path) return from_xml(self.reader.get_ome_xml(), parser='lxml')
@cached_property @cached_property
def ome(self) -> OME: def ome(self) -> OME:
@@ -1116,8 +1202,14 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
class View(Imread, ABC): class View(Imread, ABC):
def __init__(self, base: Imread, dtype: DTypeLike = None) -> None: def __init__(self, base: Imread, dtype: DTypeLike = None) -> None:
super().__init__(base.base, base.slice, base.shape, dtype or base.dtype, base.frame_decorator) super().__init__(base)
self.dtype = dtype or base.dtype
self.slice = base.slice
self._shape = Shape(base.shape)
self.frame_decorator = base.frame_decorator
self.transform = base.transform self.transform = base.transform
self.flags = dict(C_CONTIGUOUS=False, F_CONTIGUOUS=False, OWNDATA=False, WRITEABLE=False,
ALIGNED=False, WRITEBACKIFCOPY=False, UPDATEIFCOPY=False)
def __getattr__(self, item: str) -> Any: def __getattr__(self, item: str) -> Any:
if not hasattr(self.base, item): if not hasattr(self.base, item):
@@ -1125,194 +1217,6 @@ class View(Imread, ABC):
return self.base.__getattribute__(item) return self.base.__getattribute__(item)
class AbstractReader(Imread, metaclass=ABCMeta):
priority = 99
do_not_pickle = 'cache'
ureg = ureg
@staticmethod
@abstractmethod
def _can_open(path: Path | str) -> bool:
""" Override this method, and return true when the subclass can open the file """
return False
@staticmethod
def get_positions(path: str | Path) -> Optional[list[int]]:
return None
@abstractmethod
def __frame__(self, c: int, z: int, t: int) -> np.ndarray:
""" Override this, return the frame at c, z, t """
return np.random.randint(0, 255, self.shape['yx'])
def open(self) -> None:
""" Optionally override this, open file handles etc.
filehandles cannot be pickled and should be marked such by setting do_not_pickle = 'file_handle_name' """
return
def close(self) -> None:
""" Optionally override this, close file handles etc. """
return
def __init__(self, path: Path | str | Imread | Any = None, dtype: DTypeLike = None, axes: str = None) -> None:
if isinstance(path, Imread):
return
super().__init__()
self.isclosed = False
if isinstance(path, str):
path = Path(path)
self.path, self.series = self.split_path_series(path)
if isinstance(path, Path) and path.exists():
self.title = self.path.name
self.acquisitiondate = datetime.fromtimestamp(self.path.stat().st_mtime).strftime('%y-%m-%dT%H:%M:%S')
else: # ndarray
self.title = self.__class__.__name__
self.acquisitiondate = 'now'
self.reader = None
self.pcf = None
self.powermode = None
self.collimator = None
self.tirfangle = None
self.cyllens = ['None', 'None']
self.duolink = 'None'
self.detector = [0, 1]
self.track = [0]
self.cache = DequeDict(16)
self.frameoffset = 0, 0 # how far apart the centers of frame and sensor are
self.open()
# extract some metadata from ome
instrument = self.ome.instruments[0] if self.ome.instruments else None
image = self.ome.images[self.series if len(self.ome.images) > 1 else 0]
pixels = image.pixels
self.shape = pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t
self.base_shape = Shape((pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t), 'yxczt')
self.dtype = pixels.type.value if dtype is None else dtype
self.pxsize = pixels.physical_size_x_quantity
try:
self.exposuretime = tuple(find(image.pixels.planes, the_c=c).exposure_time_quantity
for c in range(self.shape['c']))
except AttributeError:
self.exposuretime = ()
if self.zstack:
self.deltaz = image.pixels.physical_size_z_quantity
self.deltaz_um = None if self.deltaz is None else self.deltaz.to(self.ureg.um).m
else:
self.deltaz = self.deltaz_um = None
if image.objective_settings:
self.objective = find(instrument.objectives, id=image.objective_settings.id)
else:
self.objective = None
try:
t0 = find(image.pixels.planes, the_c=0, the_t=0, the_z=0).delta_t
t1 = find(image.pixels.planes, the_c=0, the_t=self.shape['t'] - 1, the_z=0).delta_t
self.timeinterval = (t1 - t0) / (self.shape['t'] - 1) if self.shape['t'] > 1 and t1 > t0 else None
except AttributeError:
self.timeinterval = None
try:
self.binning = [int(i) for i in image.pixels.channels[0].detector_settings.binning.value.split('x')]
if self.pxsize is not None:
self.pxsize *= self.binning[0]
except (AttributeError, IndexError, ValueError):
self.binning = None
self.channel_names = [channel.name for channel in image.pixels.channels]
self.channel_names += [chr(97 + i) for i in range(len(self.channel_names), self.shape['c'])]
self.cnamelist = self.channel_names
try:
optovars = [objective for objective in instrument.objectives if 'tubelens' in objective.id.lower()]
except AttributeError:
optovars = []
if len(optovars) == 0:
self.tubelens = None
else:
self.tubelens = optovars[0]
if self.objective:
if self.tubelens:
self.magnification = self.objective.nominal_magnification * self.tubelens.nominal_magnification
else:
self.magnification = self.objective.nominal_magnification
self.NA = self.objective.lens_na
else:
self.magnification = None
self.NA = None
self.gain = [find(instrument.detectors, id=channel.detector_settings.id).amplification_gain
for channel in image.pixels.channels
if channel.detector_settings
and find(instrument.detectors, id=channel.detector_settings.id).amplification_gain]
self.laserwavelengths = [(channel.excitation_wavelength_quantity.to(self.ureg.nm).m,)
for channel in pixels.channels if channel.excitation_wavelength_quantity]
self.laserpowers = try_default(lambda: [(1 - channel.light_source_settings.attenuation,)
for channel in pixels.channels], [])
self.filter = try_default( # type: ignore
lambda: [find(instrument.filter_sets, id=channel.filter_set_ref.id).model
for channel in image.pixels.channels], None)
self.pxsize_um = None if self.pxsize is None else self.pxsize.to(self.ureg.um).m
self.exposuretime_s = [None if i is None else i.to(self.ureg.s).m for i in self.exposuretime]
if axes is None:
self.axes = ''.join(i for i in 'cztyx' if self.shape[i] > 1)
elif axes.lower() == 'full':
self.axes = 'cztyx'
else:
self.axes = axes
self.slice = [np.arange(s, dtype=int) for s in self.shape.yxczt]
m = self.extrametadata
if m is not None:
try:
self.cyllens = m['CylLens']
self.duolink = m['DLFilterSet'].split(' & ')[m['DLFilterChannel']]
if 'FeedbackChannels' in m:
self.feedback = m['FeedbackChannels']
else:
self.feedback = m['FeedbackChannel']
except Exception: # noqa
self.cyllens = ['None', 'None']
self.duolink = 'None'
self.feedback = []
try:
self.cyllenschannels = np.where([self.cyllens[self.detector[c]].lower() != 'none'
for c in range(self.shape['c'])])[0].tolist()
except Exception: # noqa
pass
try:
s = int(re.findall(r'_(\d{3})_', self.duolink)[0]) * ureg.nm
except Exception: # noqa
s = 561 * ureg.nm
try:
sigma = []
for c, d in enumerate(self.detector):
emission = (np.hstack(self.laserwavelengths[c]) + 22) * ureg.nm
sigma.append([emission[emission > s].max(initial=0), emission[emission < s].max(initial=0)][d])
sigma = np.hstack(sigma)
sigma[sigma == 0] = 600 * ureg.nm
sigma /= 2 * self.NA * self.pxsize
self.sigma = sigma.magnitude.tolist() # type: ignore
except Exception: # noqa
self.sigma = [2] * self.shape['c']
if not self.NA:
self.immersionN = 1
elif 1.5 < self.NA:
self.immersionN = 1.661
elif 1.3 < self.NA < 1.5:
self.immersionN = 1.518
elif 1 < self.NA < 1.3:
self.immersionN = 1.33
else:
self.immersionN = 1
p = re.compile(r'(\d+):(\d+)$')
try:
self.track, self.detector = zip(*[[int(i) for i in p.findall(find(
self.ome.images[self.series].pixels.channels, id=f'Channel:{c}').detector_settings.id)[0]]
for c in range(self.shape['c'])])
except Exception: # noqa
pass
def main() -> None: def main() -> None:
parser = ArgumentParser(description='Display info and save as tif') parser = ArgumentParser(description='Display info and save as tif')
parser.add_argument('-v', '--version', action='version', version=__version__) parser.add_argument('-v', '--version', action='version', version=__version__)
@@ -1352,6 +1256,3 @@ def main() -> None:
f.write(im.ome.to_xml()) f.write(im.ome.to_xml())
if len(args.file) == 1: if len(args.file) == 1:
print(im.summary) print(im.summary)
from .readers import * # noqa

View File

@@ -1 +0,0 @@
__all__ = 'bfread', 'cziread', 'fijiread', 'ndread', 'seqread', 'tifread', 'metaseriesread'

View File

@@ -1,37 +0,0 @@
from __future__ import annotations
from abc import ABC
from pathlib import Path
import numpy as np
from .. import rs
from .. import AbstractReader
for file in (Path(__file__).parent / "deps").glob("*_"):
file.rename(str(file)[:-1])
if not list((Path(__file__).parent / "jassets").glob("bioformats*.jar")):
rs.download_bioformats(True)
class Reader(AbstractReader, ABC):
priority = 99 # panic and open with BioFormats
do_not_pickle = 'reader'
@staticmethod
def _can_open(path: Path) -> bool:
try:
_ = rs.Reader(path)
return True
except Exception:
return False
def open(self) -> None:
self.reader = rs.Reader(str(self.path), int(self.series))
def __frame__(self, c: int, z: int, t: int) -> np.ndarray:
return self.reader.get_frame(int(c), int(z), int(t))
def close(self) -> None:
self.reader.close()

View File

@@ -1,606 +0,0 @@
import re
import warnings
from abc import ABC
from functools import cached_property
from io import BytesIO
from itertools import product
from pathlib import Path
from typing import Any, Callable, Optional, TypeVar
import czifile
import imagecodecs
import numpy as np
from lxml import etree
from ome_types import OME, model
from tifffile import repeat_nd
from .. import AbstractReader
try:
# TODO: use zoom from imagecodecs implementation when available
from scipy.ndimage.interpolation import zoom
except ImportError:
try:
from ndimage.interpolation import zoom
except ImportError:
zoom = None
Element = TypeVar('Element')
def zstd_decode(data: bytes) -> bytes: # noqa
""" decode zstd bytes, copied from BioFormats ZeissCZIReader """
def read_var_int(stream: BytesIO) -> int: # noqa
a = stream.read(1)[0]
if a & 128:
b = stream.read(1)[0]
if b & 128:
c = stream.read(1)[0]
return (c << 14) | ((b & 127) << 7) | (a & 127)
return (b << 7) | (a & 127)
return a & 255
try:
with BytesIO(data) as stream:
size_of_header = read_var_int(stream)
high_low_unpacking = False
while stream.tell() < size_of_header:
chunk_id = read_var_int(stream)
# only one chunk ID defined so far
if chunk_id == 1:
high_low_unpacking = (stream.read(1)[0] & 1) == 1
else:
raise ValueError(f'Invalid chunk id: {chunk_id}')
pointer = stream.tell()
except Exception: # noqa
high_low_unpacking = False
pointer = 0
decoded = imagecodecs.zstd_decode(data[pointer:])
if high_low_unpacking:
second_half = len(decoded) // 2
return bytes([decoded[second_half + i // 2] if i % 2 else decoded[i // 2] for i in range(len(decoded))])
else:
return decoded
def data(self, raw: bool = False, resize: bool = True, order: int = 0) -> np.ndarray:
"""Read image data from file and return as numpy array."""
DECOMPRESS = czifile.czifile.DECOMPRESS # noqa
DECOMPRESS[5] = imagecodecs.zstd_decode
DECOMPRESS[6] = zstd_decode
de = self.directory_entry
fh = self._fh
if raw:
with fh.lock:
fh.seek(self.data_offset)
data = fh.read(self.data_size) # noqa
return data
if de.compression:
# if de.compression not in DECOMPRESS:
# raise ValueError('compression unknown or not supported')
with fh.lock:
fh.seek(self.data_offset)
data = fh.read(self.data_size) # noqa
data = DECOMPRESS[de.compression](data) # noqa
if de.compression == 2:
# LZW
data = np.fromstring(data, de.dtype) # noqa
elif de.compression in (5, 6):
# ZSTD
data = np.frombuffer(data, de.dtype) # noqa
else:
dtype = np.dtype(de.dtype)
with fh.lock:
fh.seek(self.data_offset)
data = fh.read_array(dtype, self.data_size // dtype.itemsize) # noqa
data = data.reshape(de.stored_shape) # noqa
if de.compression != 4 and de.stored_shape[-1] in (3, 4):
if de.stored_shape[-1] == 3:
# BGR -> RGB
data = data[..., ::-1] # noqa
else:
# BGRA -> RGBA
tmp = data[..., 0].copy()
data[..., 0] = data[..., 2]
data[..., 2] = tmp
if de.stored_shape == de.shape or not resize:
return data
# sub / supersampling
factors = [j / i for i, j in zip(de.stored_shape, de.shape)]
factors = [(int(round(f)) if abs(f - round(f)) < 0.0001 else f)
for f in factors]
# use repeat if possible
if order == 0 and all(isinstance(f, int) for f in factors):
data = repeat_nd(data, factors).copy() # noqa
data.shape = de.shape
return data
# remove leading dimensions with size 1 for speed
shape = list(de.stored_shape)
i = 0
for s in shape:
if s != 1:
break
i += 1
shape = shape[i:]
factors = factors[i:]
data.shape = shape
# resize RGB components separately for speed
if zoom is None:
raise ImportError("cannot import 'zoom' from scipy or ndimage")
if shape[-1] in (3, 4) and factors[-1] == 1.0:
factors = factors[:-1]
old = data
data = np.empty(de.shape, de.dtype[-2:]) # noqa
for i in range(shape[-1]):
data[..., i] = zoom(old[..., i], zoom=factors, order=order)
else:
data = zoom(data, zoom=factors, order=order) # noqa
data.shape = de.shape
return data
# monkeypatch zstd into czifile
czifile.czifile.SubBlockSegment.data = data
class Reader(AbstractReader, ABC):
priority = 0
do_not_pickle = 'reader', 'filedict'
@staticmethod
def _can_open(path: Path) -> bool:
return isinstance(path, Path) and path.suffix == '.czi'
def open(self) -> None:
self.reader = czifile.CziFile(self.path)
filedict = {}
for directory_entry in self.reader.filtered_subblock_directory:
idx = self.get_index(directory_entry, self.reader.start)
if 'S' not in self.reader.axes or self.series in range(*idx[self.reader.axes.index('S')]):
for c in range(*idx[self.reader.axes.index('C')]):
for z in range(*idx[self.reader.axes.index('Z')]):
for t in range(*idx[self.reader.axes.index('T')]):
if (c, z, t) in filedict:
filedict[c, z, t].append(directory_entry)
else:
filedict[c, z, t] = [directory_entry]
if len(filedict) == 0:
raise FileNotFoundError(f'Series {self.series} not found in {self.path}.')
self.filedict = filedict # noqa
def close(self) -> None:
self.reader.close()
def get_ome(self) -> OME:
return OmeParse.get_ome(self.reader, self.filedict)
def __frame__(self, c: int = 0, z: int = 0, t: int = 0) -> np.ndarray:
f = np.zeros(self.base_shape['yx'], self.dtype)
if (c, z, t) in self.filedict:
directory_entries = self.filedict[c, z, t]
x_min = min([f.start[f.axes.index('X')] for f in directory_entries])
y_min = min([f.start[f.axes.index('Y')] for f in directory_entries])
xy_min = {'X': x_min, 'Y': y_min}
for directory_entry in directory_entries:
subblock = directory_entry.data_segment()
tile = subblock.data(resize=True, order=0)
axes_min = [xy_min.get(ax, 0) for ax in directory_entry.axes]
index = [slice(i - j - m, i - j + k)
for i, j, k, m in zip(directory_entry.start, self.reader.start, tile.shape, axes_min)]
index = tuple(index[self.reader.axes.index(i)] for i in 'YX')
f[index] = tile.squeeze()
return f
@staticmethod
def get_index(directory_entry: czifile.DirectoryEntryDV, start: tuple[int]) -> list[tuple[int, int]]:
return [(i - j, i - j + k) for i, j, k in zip(directory_entry.start, start, directory_entry.shape)]
class OmeParse:
size_x: int
size_y: int
size_c: int
size_z: int
size_t: int
nm = model.UnitsLength.NANOMETER
um = model.UnitsLength.MICROMETER
@classmethod
def get_ome(cls, reader: czifile.CziFile, filedict: dict[tuple[int, int, int], Any]) -> OME:
new = cls(reader, filedict)
new.parse()
return new.ome
def __init__(self, reader: czifile.CziFile, filedict: dict[tuple[int, int, int], Any]) -> None:
self.reader = reader
self.filedict = filedict
xml = reader.metadata()
self.attachments = {i.attachment_entry.name: i.attachment_entry.data_segment()
for i in reader.attachments()}
self.tree = etree.fromstring(xml)
self.metadata = self.tree.find('Metadata')
version = self.metadata.find('Version')
if version is not None:
self.version = version.text
else:
self.version = self.metadata.find('Experiment').attrib['Version']
self.ome = OME()
self.information = self.metadata.find('Information')
self.display_setting = self.metadata.find('DisplaySetting')
self.experiment = self.metadata.find('Experiment')
self.acquisition_block = self.experiment.find('ExperimentBlocks').find('AcquisitionBlock')
self.instrument = self.information.find('Instrument')
self.image = self.information.find('Image')
if self.version == '1.0':
self.experiment = self.metadata.find('Experiment')
self.acquisition_block = self.experiment.find('ExperimentBlocks').find('AcquisitionBlock')
self.multi_track_setup = self.acquisition_block.find('MultiTrackSetup')
else:
self.experiment = None
self.acquisition_block = None
self.multi_track_setup = None
def parse(self) -> None:
self.get_experimenters()
self.get_instruments()
self.get_detectors()
self.get_objectives()
self.get_tubelenses()
self.get_light_sources()
self.get_filters()
self.get_pixels()
self.get_channels()
self.get_planes()
self.get_annotations()
@staticmethod
def text(item: Optional[Element], default: str = "") -> str:
return default if item is None else item.text
@staticmethod
def def_list(item: Any) -> list[Any]:
return [] if item is None else item
@staticmethod
def try_default(fun: Callable[[Any, ...], Any] | type, default: Any = None, *args: Any, **kwargs: Any) -> Any:
try:
return fun(*args, **kwargs)
except Exception: # noqa
return default
def get_experimenters(self) -> None:
if self.version == '1.0':
self.ome.experimenters = [
model.Experimenter(id='Experimenter:0',
user_name=self.information.find('User').find('DisplayName').text)]
elif self.version in ('1.1', '1.2'):
self.ome.experimenters = [
model.Experimenter(id='Experimenter:0',
user_name=self.information.find('Document').find('UserName').text)]
def get_instruments(self) -> None:
if self.version == '1.0':
self.ome.instruments.append(model.Instrument(id=self.instrument.attrib['Id']))
elif self.version in ('1.1', '1.2'):
for _ in self.instrument.find('Microscopes'):
self.ome.instruments.append(model.Instrument(id='Instrument:0'))
def get_detectors(self) -> None:
if self.version == '1.0':
for detector in self.instrument.find('Detectors'):
try:
detector_type = model.Detector_Type(self.text(detector.find('Type')).upper() or "")
except ValueError:
detector_type = model.Detector_Type.OTHER
self.ome.instruments[0].detectors.append(
model.Detector(
id=detector.attrib['Id'], model=self.text(detector.find('Manufacturer').find('Model')),
amplification_gain=float(self.text(detector.find('AmplificationGain'))),
gain=float(self.text(detector.find('Gain'))), zoom=float(self.text(detector.find('Zoom'))),
type=detector_type
))
elif self.version in ('1.1', '1.2'):
for detector in self.instrument.find('Detectors'):
try:
detector_type = model.Detector_Type(self.text(detector.find('Type')).upper() or "")
except ValueError:
detector_type = model.Detector_Type.OTHER
self.ome.instruments[0].detectors.append(
model.Detector(
id=detector.attrib['Id'].replace(' ', ''),
model=self.text(detector.find('Manufacturer').find('Model')),
type=detector_type
))
def get_objectives(self) -> None:
for objective in self.instrument.find('Objectives'):
self.ome.instruments[0].objectives.append(
model.Objective(
id=objective.attrib['Id'],
model=self.text(objective.find('Manufacturer').find('Model')),
immersion=self.text(objective.find('Immersion')), # type: ignore
lens_na=float(self.text(objective.find('LensNA'))),
nominal_magnification=float(self.text(objective.find('NominalMagnification')))))
def get_tubelenses(self) -> None:
if self.version == '1.0':
for idx, tube_lens in enumerate({self.text(track_setup.find('TubeLensPosition'))
for track_setup in self.multi_track_setup}):
try:
nominal_magnification = float(re.findall(r'\d+[,.]\d*', tube_lens)[0].replace(',', '.'))
except Exception: # noqa
nominal_magnification = 1.0
self.ome.instruments[0].objectives.append(
model.Objective(id=f'Objective:Tubelens:{idx}', model=tube_lens,
nominal_magnification=nominal_magnification))
elif self.version in ('1.1', '1.2'):
for tubelens in self.def_list(self.instrument.find('TubeLenses')):
try:
nominal_magnification = float(re.findall(r'\d+(?:[,.]\d*)?',
tubelens.attrib['Name'])[0].replace(',', '.'))
except Exception: # noqa
nominal_magnification = 1.0
self.ome.instruments[0].objectives.append(
model.Objective(
id=f"Objective:{tubelens.attrib['Id']}",
model=tubelens.attrib['Name'],
nominal_magnification=nominal_magnification))
def get_light_sources(self) -> None:
if self.version == '1.0':
for light_source in self.def_list(self.instrument.find('LightSources')):
try:
if light_source.find('LightSourceType').find('Laser') is not None:
self.ome.instruments[0].lasers.append(
model.Laser(
id=light_source.attrib['Id'],
model=self.text(light_source.find('Manufacturer').find('Model')),
power=float(self.text(light_source.find('Power'))),
wavelength=float(
self.text(light_source.find('LightSourceType').find('Laser').find('Wavelength')))))
except AttributeError:
pass
elif self.version in ('1.1', '1.2'):
for light_source in self.def_list(self.instrument.find('LightSources')):
try:
if light_source.find('LightSourceType').find('Laser') is not None:
self.ome.instruments[0].lasers.append(
model.Laser(
id=f"LightSource:{light_source.attrib['Id']}",
power=float(self.text(light_source.find('Power'))),
wavelength=float(light_source.attrib['Id'][-3:]))) # TODO: follow Id reference
except (AttributeError, ValueError):
pass
def get_filters(self) -> None:
if self.version == '1.0':
for idx, filter_ in enumerate({self.text(beam_splitter.find('Filter'))
for track_setup in self.multi_track_setup
for beam_splitter in track_setup.find('BeamSplitters')}):
self.ome.instruments[0].filter_sets.append(
model.FilterSet(id=f'FilterSet:{idx}', model=filter_)
)
def get_pixels(self) -> None:
x_min = min([f.start[f.axes.index('X')] for f in self.filedict[0, 0, 0]])
y_min = min([f.start[f.axes.index('Y')] for f in self.filedict[0, 0, 0]])
x_max = max([f.start[f.axes.index('X')] + f.shape[f.axes.index('X')] for f in self.filedict[0, 0, 0]])
y_max = max([f.start[f.axes.index('Y')] + f.shape[f.axes.index('Y')] for f in self.filedict[0, 0, 0]])
self.size_x = x_max - x_min
self.size_y = y_max - y_min
self.size_c, self.size_z, self.size_t = (self.reader.shape[self.reader.axes.index(directory_entry)]
for directory_entry in 'CZT')
image = self.information.find('Image')
pixel_type = self.text(image.find('PixelType'), 'Gray16')
if pixel_type.startswith('Gray'):
pixel_type = 'uint' + pixel_type[4:]
objective_settings = image.find('ObjectiveSettings')
self.ome.images.append(
model.Image(
id='Image:0',
name=f"{self.text(self.information.find('Document').find('Name'))} #1",
pixels=model.Pixels(
id='Pixels:0', size_x=self.size_x, size_y=self.size_y,
size_c=self.size_c, size_z=self.size_z, size_t=self.size_t,
dimension_order='XYCZT', type=pixel_type, # type: ignore
significant_bits=int(self.text(image.find('ComponentBitCount'))),
big_endian=False, interleaved=False, metadata_only=True), # type: ignore
experimenter_ref=model.ExperimenterRef(id='Experimenter:0'),
instrument_ref=model.InstrumentRef(id='Instrument:0'),
objective_settings=model.ObjectiveSettings(
id=objective_settings.find('ObjectiveRef').attrib['Id'],
medium=self.text(objective_settings.find('Medium')), # type: ignore
refractive_index=float(self.text(objective_settings.find('RefractiveIndex')))),
stage_label=model.StageLabel(
name=f'Scene position #0',
x=self.positions[0], x_unit=self.um,
y=self.positions[1], y_unit=self.um,
z=self.positions[2], z_unit=self.um)))
for distance in self.metadata.find('Scaling').find('Items'):
if distance.attrib['Id'] == 'X':
self.ome.images[0].pixels.physical_size_x = float(self.text(distance.find('Value'))) * 1e6
elif distance.attrib['Id'] == 'Y':
self.ome.images[0].pixels.physical_size_y = float(self.text(distance.find('Value'))) * 1e6
elif self.size_z > 1 and distance.attrib['Id'] == 'Z':
self.ome.images[0].pixels.physical_size_z = float(self.text(distance.find('Value'))) * 1e6
@cached_property
def positions(self) -> tuple[float, float, Optional[float]]:
if self.version == '1.0':
scenes = self.image.find('Dimensions').find('S').find('Scenes')
positions = scenes[0].find('Positions')[0]
return float(positions.attrib['X']), float(positions.attrib['Y']), float(positions.attrib['Z'])
elif self.version in ('1.1', '1.2'):
try: # TODO
scenes = self.image.find('Dimensions').find('S').find('Scenes')
center_position = [float(pos) for pos in self.text(scenes[0].find('CenterPosition')).split(',')]
except AttributeError:
center_position = [0, 0]
return center_position[0], center_position[1], None
@cached_property
def channels_im(self) -> dict:
return {channel.attrib['Id']: channel for channel in self.image.find('Dimensions').find('Channels')}
@cached_property
def channels_ds(self) -> dict:
return {channel.attrib['Id']: channel for channel in self.display_setting.find('Channels')}
@cached_property
def channels_ts(self) -> dict:
return {detector.attrib['Id']: track_setup
for track_setup in
self.experiment.find('ExperimentBlocks').find('AcquisitionBlock').find('MultiTrackSetup')
for detector in track_setup.find('Detectors')}
def get_channels(self) -> None:
if self.version == '1.0':
for idx, (key, channel) in enumerate(self.channels_im.items()):
detector_settings = channel.find('DetectorSettings')
laser_scan_info = channel.find('LaserScanInfo')
detector = detector_settings.find('Detector')
try:
binning = model.Binning(self.text(detector_settings.find('Binning')))
except ValueError:
binning = model.Binning.OTHER
filterset = self.text(self.channels_ts[key].find('BeamSplitters')[0].find('Filter'))
filterset_idx = [filterset.model for filterset in self.ome.instruments[0].filter_sets].index(filterset)
light_sources_settings = channel.find('LightSourcesSettings')
# no space in ome for multiple lightsources simultaneously
if len(light_sources_settings) > idx:
light_source_settings = light_sources_settings[idx]
else:
light_source_settings = light_sources_settings[0]
light_source_settings = model.LightSourceSettings(
id=light_source_settings.find('LightSource').attrib['Id'],
attenuation=float(self.text(light_source_settings.find('Attenuation'))),
wavelength=float(self.text(light_source_settings.find('Wavelength'))),
wavelength_unit=self.nm)
self.ome.images[0].pixels.channels.append(
model.Channel(
id=f'Channel:{idx}',
name=channel.attrib['Name'],
acquisition_mode=self.text(channel.find('AcquisitionMode')), # type: ignore
color=model.Color(self.text(self.channels_ds[channel.attrib['Id']].find('Color'), 'white')),
detector_settings=model.DetectorSettings(id=detector.attrib['Id'], binning=binning),
# emission_wavelength=text(channel.find('EmissionWavelength')), # TODO: fix
excitation_wavelength=light_source_settings.wavelength,
filter_set_ref=model.FilterSetRef(id=self.ome.instruments[0].filter_sets[filterset_idx].id),
illumination_type=self.text(channel.find('IlluminationType')), # type: ignore
light_source_settings=light_source_settings,
samples_per_pixel=int(self.text(laser_scan_info.find('Averaging')))))
elif self.version in ('1.1', '1.2'):
for idx, (key, channel) in enumerate(self.channels_im.items()):
detector_settings = channel.find('DetectorSettings')
laser_scan_info = channel.find('LaserScanInfo')
detector = detector_settings.find('Detector')
try:
color = model.Color(self.text(self.channels_ds[channel.attrib['Id']].find('Color'), 'white'))
except Exception: # noqa
color = None
try:
if (i := self.text(channel.find('EmissionWavelength'))) != '0':
emission_wavelength = float(i)
else:
emission_wavelength = None
except Exception: # noqa
emission_wavelength = None
if laser_scan_info is not None:
samples_per_pixel = int(self.text(laser_scan_info.find('Averaging'), '1'))
else:
samples_per_pixel = 1
try:
binning = model.Binning(self.text(detector_settings.find('Binning')))
except ValueError:
binning = model.Binning.OTHER
light_sources_settings = channel.find('LightSourcesSettings')
# no space in ome for multiple lightsources simultaneously
if light_sources_settings is not None:
light_source_settings = light_sources_settings[0]
light_source_settings = model.LightSourceSettings(
id='LightSource:' + '_'.join([light_source_settings.find('LightSource').attrib['Id']
for light_source_settings in light_sources_settings]),
attenuation=self.try_default(float, None, self.text(light_source_settings.find('Attenuation'))),
wavelength=self.try_default(float, None, self.text(light_source_settings.find('Wavelength'))),
wavelength_unit=self.nm)
else:
light_source_settings = None
self.ome.images[0].pixels.channels.append(
model.Channel(
id=f'Channel:{idx}',
name=channel.attrib['Name'],
acquisition_mode=self.text(channel.find('AcquisitionMode')).replace( # type: ignore
'SingleMoleculeLocalisation', 'SingleMoleculeImaging'),
color=color,
detector_settings=model.DetectorSettings(
id=detector.attrib['Id'].replace(' ', ""),
binning=binning),
emission_wavelength=emission_wavelength,
excitation_wavelength=self.try_default(float, None,
self.text(channel.find('ExcitationWavelength'))),
# filter_set_ref=model.FilterSetRef(id=ome.instruments[0].filter_sets[filterset_idx].id),
illumination_type=self.text(channel.find('IlluminationType')), # type: ignore
light_source_settings=light_source_settings,
samples_per_pixel=samples_per_pixel))
def get_planes(self) -> None:
try:
exposure_times = [float(self.text(channel.find('LaserScanInfo').find('FrameTime')))
for channel in self.channels_im.values()]
except Exception: # noqa
exposure_times = [None] * len(self.channels_im)
delta_ts = self.attachments['TimeStamps'].data()
dt = np.diff(delta_ts)
if len(dt) and np.std(dt) / np.mean(dt) > 0.02:
dt = np.median(dt[dt > 0])
delta_ts = dt * np.arange(len(delta_ts))
warnings.warn(f'delta_t is inconsistent, using median value: {dt}')
for t, z, c in product(range(self.size_t), range(self.size_z), range(self.size_c)):
self.ome.images[0].pixels.planes.append(
model.Plane(the_c=c, the_z=z, the_t=t, delta_t=delta_ts[t],
exposure_time=exposure_times[c],
position_x=self.positions[0], position_x_unit=self.um,
position_y=self.positions[1], position_y_unit=self.um,
position_z=self.positions[2], position_z_unit=self.um))
def get_annotations(self) -> None:
idx = 0
for layer in [] if (ml := self.metadata.find('Layers')) is None else ml:
rectangle = layer.find('Elements').find('Rectangle')
if rectangle is not None:
geometry = rectangle.find('Geometry')
roi = model.ROI(id=f'ROI:{idx}', description=self.text(layer.find('Usage')))
roi.union.append(
model.Rectangle(
id='Shape:0:0',
height=float(self.text(geometry.find('Height'))),
width=float(self.text(geometry.find('Width'))),
x=float(self.text(geometry.find('Left'))),
y=float(self.text(geometry.find('Top')))))
self.ome.rois.append(roi)
self.ome.images[0].roi_refs.append(model.ROIRef(id=f'ROI:{idx}'))
idx += 1

View File

@@ -1,59 +0,0 @@
from abc import ABC
from itertools import product
from pathlib import Path
from struct import unpack
from warnings import warn
import numpy as np
from ome_types import model
from tifffile import TiffFile
from .. import AbstractReader
class Reader(AbstractReader, ABC):
""" Can read some tif files written with Fiji which are broken because Fiji didn't finish writing. """
priority = 90
do_not_pickle = 'reader'
@staticmethod
def _can_open(path):
if isinstance(path, Path) and path.suffix in ('.tif', '.tiff'):
with TiffFile(path) as tif:
return tif.is_imagej and not tif.is_bigtiff
else:
return False
def __frame__(self, c, z, t): # Override this, return the frame at c, z, t
self.reader.filehandle.seek(self.offset + t * self.count)
return np.reshape(unpack(self.fmt, self.reader.filehandle.read(self.count)), self.base_shape['yx'])
def open(self):
warn(f'File {self.path.name} is probably damaged, opening with fijiread.')
self.reader = TiffFile(self.path)
assert self.reader.pages[0].compression == 1, 'Can only read uncompressed tiff files.'
assert self.reader.pages[0].samplesperpixel == 1, 'Can only read 1 sample per pixel.'
self.offset = self.reader.pages[0].dataoffsets[0] # noqa
self.count = self.reader.pages[0].databytecounts[0] # noqa
self.bytes_per_sample = self.reader.pages[0].bitspersample // 8 # noqa
self.fmt = self.reader.byteorder + self.count // self.bytes_per_sample * 'BHILQ'[self.bytes_per_sample - 1] # noqa
def close(self):
self.reader.close()
def get_ome(self):
size_y, size_x = self.reader.pages[0].shape
size_c, size_z = 1, 1
size_t = int(np.floor((self.reader.filehandle.size - self.reader.pages[0].dataoffsets[0]) / self.count))
pixel_type = model.PixelType(self.reader.pages[0].dtype.name)
ome = model.OME()
ome.instruments.append(model.Instrument())
ome.images.append(
model.Image(
pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y,
dimension_order='XYCZT', type=pixel_type),
objective_settings=model.ObjectiveSettings(id='Objective:0')))
for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=0))
return ome

View File

@@ -1,80 +0,0 @@
import re
from abc import ABC
from pathlib import Path
from typing import Optional
import tifffile
from ome_types import model
from ome_types.units import _quantity_property # noqa
from .. import AbstractReader
class Reader(AbstractReader, ABC):
priority = 20
do_not_pickle = 'last_tif'
@staticmethod
def _can_open(path):
return isinstance(path, Path) and (path.is_dir() or
(path.parent.is_dir() and path.name.lower().startswith('pos')))
@staticmethod
def get_positions(path: str | Path) -> Optional[list[int]]:
pat = re.compile(rf's(\d)_t\d+\.(tif|TIF)$')
return sorted({int(m.group(1)) for file in Path(path).iterdir() if (m := pat.search(file.name))})
def get_ome(self):
ome = model.OME()
tif = self.get_tif(0)
metadata = tif.metaseries_metadata
size_z = len(tif.pages)
page = tif.pages[0]
shape = {axis.lower(): size for axis, size in zip(page.axes, page.shape)}
size_x, size_y = shape['x'], shape['y']
ome.instruments.append(model.Instrument())
size_c = 1
size_t = max(self.filedict.keys()) + 1
pixel_type = f"uint{metadata['PlaneInfo']['bits-per-pixel']}"
ome.images.append(
model.Image(
pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t,
size_x=size_x, size_y=size_y,
dimension_order='XYCZT', type=pixel_type),
objective_settings=model.ObjectiveSettings(id='Objective:0')))
return ome
def open(self):
pat = re.compile(rf's{self.series}_t\d+\.(tif|TIF)$')
filelist = sorted([file for file in self.path.iterdir() if pat.search(file.name)])
pattern = re.compile(r't(\d+)$')
self.filedict = {int(pattern.search(file.stem).group(1)) - 1: file for file in filelist}
if len(self.filedict) == 0:
raise FileNotFoundError
self.last_tif = 0, tifffile.TiffFile(self.filedict[0])
def close(self) -> None:
self.last_tif[1].close()
def get_tif(self, t: int = None):
last_t, tif = self.last_tif
if (t is None or t == last_t) and not tif.filehandle.closed:
return tif
else:
tif.close()
tif = tifffile.TiffFile(self.filedict[t])
self.last_tif = t, tif
return tif
def __frame__(self, c=0, z=0, t=0):
tif = self.get_tif(t)
page = tif.pages[z]
if page.axes.upper() == 'YX':
return page.asarray()
elif page.axes.upper() == 'XY':
return page.asarray().T
else:
raise NotImplementedError(f'reading axes {page.axes} is not implemented')

View File

@@ -1,53 +0,0 @@
from abc import ABC
from itertools import product
import numpy as np
from ome_types import model
from .. import AbstractReader
class Reader(AbstractReader, ABC):
priority = 20
@staticmethod
def _can_open(path):
return isinstance(path, np.ndarray) and 1 <= path.ndim <= 5
def get_ome(self):
def shape(size_x=1, size_y=1, size_c=1, size_z=1, size_t=1): # noqa
return size_x, size_y, size_c, size_z, size_t
size_x, size_y, size_c, size_z, size_t = shape(*self.array.shape)
try:
pixel_type = model.PixelType(self.array.dtype.name)
except ValueError:
if self.array.dtype.name.startswith('int'):
pixel_type = model.PixelType('int32')
else:
pixel_type = model.PixelType('float')
ome = model.OME()
ome.instruments.append(model.Instrument())
ome.images.append(
model.Image(
pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y,
dimension_order='XYCZT', type=pixel_type),
objective_settings=model.ObjectiveSettings(id='Objective:0')))
for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=0))
return ome
def open(self):
if isinstance(self.path, np.ndarray):
self.array = np.array(self.path)
while self.array.ndim < 5:
self.array = np.expand_dims(self.array, -1) # noqa
self.path = 'numpy array'
def __frame__(self, c, z, t):
frame = self.array[:, :, c, z, t]
if self.axes.find('y') > self.axes.find('x'):
return frame.T
else:
return frame

View File

@@ -1,146 +0,0 @@
import re
from abc import ABC
from datetime import datetime
from itertools import product
from pathlib import Path
import tifffile
import yaml
from ome_types import model
from ome_types.units import _quantity_property # noqa
from .. import AbstractReader
def lazy_property(function, field, *arg_fields):
def lazy(self):
if self.__dict__.get(field) is None:
self.__dict__[field] = function(*[getattr(self, arg_field) for arg_field in arg_fields])
try:
self.model_fields_set.add(field)
except Exception: # noqa
pass
return self.__dict__[field]
return property(lazy)
class Plane(model.Plane):
""" Lazily retrieve delta_t from metadata """
def __init__(self, t0, file, **kwargs): # noqa
super().__init__(**kwargs)
# setting fields here because they would be removed by ome_types/pydantic after class definition
setattr(self.__class__, 'delta_t', lazy_property(self.get_delta_t, 'delta_t', 't0', 'file'))
setattr(self.__class__, 'delta_t_quantity', _quantity_property('delta_t'))
self.__dict__['t0'] = t0 # noqa
self.__dict__['file'] = file # noqa
@staticmethod
def get_delta_t(t0, file):
with tifffile.TiffFile(file) as tif:
info = yaml.safe_load(tif.pages[0].tags[50839].value['Info'])
return float((datetime.strptime(info['Time'], '%Y-%m-%d %H:%M:%S %z') - t0).seconds)
class Reader(AbstractReader, ABC):
priority = 10
@staticmethod
def _can_open(path):
pat = re.compile(r'(?:\d+-)?Pos.*', re.IGNORECASE)
return (isinstance(path, Path) and path.is_dir() and
(pat.match(path.name) or any(file.is_dir() and pat.match(file.stem) for file in path.iterdir())))
def get_ome(self):
ome = model.OME()
with tifffile.TiffFile(self.filedict[0, 0, 0]) as tif:
metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()}
ome.experimenters.append(
model.Experimenter(id='Experimenter:0', user_name=metadata['Info']['Summary']['UserName']))
objective_str = metadata['Info']['ZeissObjectiveTurret-Label']
ome.instruments.append(model.Instrument())
ome.instruments[0].objectives.append(
model.Objective(
id='Objective:0', manufacturer='Zeiss', model=objective_str,
nominal_magnification=float(re.findall(r'(\d+)x', objective_str)[0]),
lens_na=float(re.findall(r'/(\d\.\d+)', objective_str)[0]),
immersion=model.Objective_Immersion.OIL if 'oil' in objective_str.lower() else None))
tubelens_str = metadata['Info']['ZeissOptovar-Label']
ome.instruments[0].objectives.append(
model.Objective(
id='Objective:Tubelens:0', manufacturer='Zeiss', model=tubelens_str,
nominal_magnification=float(re.findall(r'\d?\d*[,.]?\d+(?=x$)', tubelens_str)[0].replace(',', '.'))))
ome.instruments[0].detectors.append(
model.Detector(
id='Detector:0', amplification_gain=100))
ome.instruments[0].filter_sets.append(
model.FilterSet(id='FilterSet:0', model=metadata['Info']['ZeissReflectorTurret-Label']))
pxsize = metadata['Info']['PixelSizeUm']
pxsize_cam = 6.5 if 'Hamamatsu' in metadata['Info']['Core-Camera'] else None
if pxsize == 0:
pxsize = pxsize_cam / ome.instruments[0].objectives[0].nominal_magnification
pixel_type = metadata['Info']['PixelType'].lower()
if pixel_type.startswith('gray'):
pixel_type = 'uint' + pixel_type[4:]
else:
pixel_type = 'uint16' # assume
size_c, size_z, size_t = (max(i) + 1 for i in zip(*self.filedict.keys()))
t0 = datetime.strptime(metadata['Info']['Time'], '%Y-%m-%d %H:%M:%S %z')
ome.images.append(
model.Image(
pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t,
size_x=metadata['Info']['Width'], size_y=metadata['Info']['Height'],
dimension_order='XYCZT', # type: ignore
type=pixel_type, physical_size_x=pxsize, physical_size_y=pxsize,
physical_size_z=metadata['Info']['Summary']['z-step_um']),
objective_settings=model.ObjectiveSettings(id='Objective:0')))
for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(
Plane(t0, self.filedict[c, z, t],
the_c=c, the_z=z, the_t=t, exposure_time=metadata['Info']['Exposure-ms'] / 1000))
# compare channel names from metadata with filenames
pattern_c = re.compile(r'img_\d{3,}_(.*)_\d{3,}$', re.IGNORECASE)
for c in range(size_c):
ome.images[0].pixels.channels.append(
model.Channel(
id=f'Channel:{c}', name=pattern_c.findall(self.filedict[c, 0, 0].stem)[0],
detector_settings=model.DetectorSettings(
id='Detector:0', binning=metadata['Info']['Hamamatsu_sCMOS-Binning']),
filter_set_ref=model.FilterSetRef(id='FilterSet:0')))
return ome
def open(self):
# /some_path/Pos4: path = /some_path, series = 4
# /some_path/5-Pos_001_005: path = /some_path/5-Pos_001_005, series = 0
if re.match(r'(?:\d+-)?Pos.*', self.path.name, re.IGNORECASE) is None:
pat = re.compile(rf'^(?:\d+-)?Pos{self.series}$', re.IGNORECASE)
files = sorted(file for file in self.path.iterdir() if pat.match(file.name))
if len(files):
path = files[0]
else:
raise FileNotFoundError(self.path / pat.pattern)
else:
path = self.path
pat = re.compile(r'^img_\d{3,}.*\d{3,}.*\.tif$', re.IGNORECASE)
filelist = sorted([file for file in path.iterdir() if pat.search(file.name)])
with tifffile.TiffFile(self.path / filelist[0]) as tif:
metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()}
# compare channel names from metadata with filenames
cnamelist = metadata['Info']['Summary']['ChNames']
cnamelist = [c for c in cnamelist if any([c in f.name for f in filelist])]
pattern_c = re.compile(r'img_\d{3,}_(.*)_\d{3,}$', re.IGNORECASE)
pattern_z = re.compile(r'(\d{3,})$')
pattern_t = re.compile(r'img_(\d{3,})', re.IGNORECASE)
self.filedict = {(cnamelist.index(pattern_c.findall(file.stem)[0]), # noqa
int(pattern_z.findall(file.stem)[0]),
int(pattern_t.findall(file.stem)[0])): file for file in filelist}
def __frame__(self, c=0, z=0, t=0):
return tifffile.imread(self.path / self.filedict[(c, z, t)])

View File

@@ -1,85 +0,0 @@
from abc import ABC
from functools import cached_property
from itertools import product
from pathlib import Path
import numpy as np
import tifffile
import yaml
from ome_types import model
from .. import AbstractReader
class Reader(AbstractReader, ABC):
priority = 0
do_not_pickle = 'reader'
@staticmethod
def _can_open(path):
if isinstance(path, Path) and path.suffix in ('.tif', '.tiff'):
with tifffile.TiffFile(path) as tif:
return tif.is_imagej and tif.pages[-1]._nextifd() == 0 # noqa
else:
return False
@cached_property
def metadata(self):
return {key: yaml.safe_load(value) if isinstance(value, str) else value
for key, value in self.reader.imagej_metadata.items()}
def get_ome(self):
page = self.reader.pages[0]
size_y = page.imagelength
size_x = page.imagewidth
if self.p_ndim == 3:
size_c = page.samplesperpixel
size_t = self.metadata.get('frames', 1) # // C
else:
size_c = self.metadata.get('channels', 1)
size_t = self.metadata.get('frames', 1)
size_z = self.metadata.get('slices', 1)
if 282 in page.tags and 296 in page.tags and page.tags[296].value == 1:
f = page.tags[282].value
pxsize = f[1] / f[0]
else:
pxsize = None
dtype = page.dtype.name
if dtype not in ('int8', 'int16', 'int32', 'uint8', 'uint16', 'uint32',
'float', 'double', 'complex', 'double-complex', 'bit'):
dtype = 'float'
interval_t = self.metadata.get('interval', 0)
ome = model.OME()
ome.instruments.append(model.Instrument(id='Instrument:0'))
ome.instruments[0].objectives.append(model.Objective(id='Objective:0'))
ome.images.append(
model.Image(
id='Image:0',
pixels=model.Pixels(
id='Pixels:0',
size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y,
dimension_order='XYCZT', type=dtype, # type: ignore
physical_size_x=pxsize, physical_size_y=pxsize),
objective_settings=model.ObjectiveSettings(id='Objective:0')))
for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=interval_t * t))
return ome
def open(self):
self.reader = tifffile.TiffFile(self.path)
page = self.reader.pages[0]
self.p_ndim = page.ndim # noqa
if self.p_ndim == 3:
self.p_transpose = [i for i in [page.axes.find(j) for j in 'SYX'] if i >= 0] # noqa
def close(self):
self.reader.close()
def __frame__(self, c, z, t):
if self.p_ndim == 3:
return np.transpose(self.reader.asarray(z + t * self.base_shape['z']), self.p_transpose)[c]
else:
return self.reader.asarray(c + z * self.base_shape['c'] + t * self.base_shape['c'] * self.base_shape['z'])

View File

@@ -1,9 +0,0 @@
from .ndbioimage_rs import Reader, download_bioformats # noqa
from pathlib import Path
if not list((Path(__file__).parent / "jassets").glob("bioformats*.jar")):
download_bioformats(True)
for file in (Path(__file__).parent / "deps").glob("*_"):
file.rename(str(file)[:-1])

View File

@@ -15,7 +15,18 @@ struct PyReader {
impl PyReader { impl PyReader {
#[new] #[new]
fn new(path: &str, series: usize) -> PyResult<Self> { fn new(path: &str, series: usize) -> PyResult<Self> {
let path = PathBuf::from(path); let mut path = PathBuf::from(path);
if path.is_dir() {
for file in path.read_dir()? {
if let Ok(f) = file {
let p = f.path();
if f.path().is_file() & (p.extension() == Some("tif".as_ref())) {
path = p;
break;
}
}
}
}
Ok(PyReader { Ok(PyReader {
reader: Reader::new(&path, series as i32)?, reader: Reader::new(&path, series as i32)?,
}) })
@@ -43,6 +54,11 @@ impl PyReader {
fn get_ome_xml(&self) -> PyResult<String> { fn get_ome_xml(&self) -> PyResult<String> {
Ok(self.reader.get_ome_xml()?) Ok(self.reader.get_ome_xml()?)
} }
fn close(&mut self) -> PyResult<()> {
self.reader.close()?;
Ok(())
}
} }
pub(crate) fn ndbioimage_file() -> anyhow::Result<PathBuf> { pub(crate) fn ndbioimage_file() -> anyhow::Result<PathBuf> {

View File

@@ -1,15 +1,14 @@
import pickle import pickle
from multiprocessing import active_children
from pathlib import Path from pathlib import Path
import pytest import pytest
from ndbioimage import Imread, ReaderNotFoundError from ndbioimage import Imread
@pytest.mark.parametrize('file', (Path(__file__).parent / 'files').iterdir()) @pytest.mark.parametrize('file',
[file for file in (Path(__file__).parent / 'files').iterdir() if not file.suffix == '.pzl'])
def test_open(file): def test_open(file):
try:
with Imread(file) as im: with Imread(file) as im:
mean = im[dict(c=0, z=0, t=0)].mean() mean = im[dict(c=0, z=0, t=0)].mean()
b = pickle.dumps(im) b = pickle.dumps(im)
@@ -20,8 +19,3 @@ def test_open(file):
b = pickle.dumps(v) b = pickle.dumps(v)
w = pickle.loads(b) w = pickle.loads(b)
assert w[dict(c=0, z=0, t=0)].mean() == mean assert w[dict(c=0, z=0, t=0)].mean() == mean
except ReaderNotFoundError:
assert len(Imread.__subclasses__()), 'No subclasses for Imread found.'
for child in active_children():
child.kill()

View File

@@ -1,20 +1,32 @@
import tempfile
from itertools import combinations_with_replacement from itertools import combinations_with_replacement
from numbers import Number from numbers import Number
from pathlib import Path
import numpy as np import numpy as np
import pytest import pytest
from tiffwrite import tiffwrite
from ndbioimage import Imread from ndbioimage import Imread
r = np.random.randint(0, 255, (64, 64, 2, 3, 4))
im = Imread(r) @pytest.fixture
a = np.array(im) def array():
return np.random.randint(0, 255, (64, 64, 2, 3, 4), 'uint8')
@pytest.fixture()
def image(array):
with tempfile.TemporaryDirectory() as folder:
file = Path(folder) / "test.tif"
tiffwrite(file, array, 'yxczt')
with Imread(file, axes='yxczt') as im:
yield im
@pytest.mark.parametrize('s', combinations_with_replacement( @pytest.mark.parametrize('s', combinations_with_replacement(
(0, -1, 1, slice(None), slice(0, 1), slice(-1, 0), slice(1, 1)), 5)) (0, -1, 1, slice(None), slice(0, 1), slice(-1, 0), slice(1, 1)), 5))
def test_slicing(s): def test_slicing(s, image, array):
s_im, s_a = im[s], a[s] s_im, s_a = image[s], array[s]
if isinstance(s_a, Number): if isinstance(s_a, Number):
assert isinstance(s_im, Number) assert isinstance(s_im, Number)
assert s_im == s_a assert s_im == s_a

View File

@@ -1,19 +1,31 @@
import tempfile
from itertools import product from itertools import product
from pathlib import Path
import numpy as np import numpy as np
import pytest import pytest
from tiffwrite import tiffwrite
from ndbioimage import Imread from ndbioimage import Imread
r = np.random.randint(0, 255, (64, 64, 2, 3, 4))
im = Imread(r) @pytest.fixture
a = np.array(im) def array():
return np.random.randint(0, 255, (64, 64, 2, 3, 4), 'uint16')
@pytest.fixture()
def image(array):
with tempfile.TemporaryDirectory() as folder:
file = Path(folder) / "test.tif"
tiffwrite(file, array, 'yxczt')
with Imread(file, axes='yxczt') as im:
yield im
@pytest.mark.parametrize('fun_and_axis', product( @pytest.mark.parametrize('fun_and_axis', product(
(np.sum, np.nansum, np.min, np.nanmin, np.max, np.nanmax, np.argmin, np.argmax, (np.sum, np.nansum, np.min, np.nanmin, np.max, np.nanmax, np.argmin, np.argmax,
np.mean, np.nanmean, np.var, np.nanvar, np.std, np.nanstd), (None, 0, 1, 2, 3, 4))) np.mean, np.nanmean, np.var, np.nanvar, np.std, np.nanstd), (None, 0, 1, 2, 3, 4)))
def test_ufuncs(fun_and_axis): def test_ufuncs(fun_and_axis, image, array):
fun, axis = fun_and_axis fun, axis = fun_and_axis
assert np.all(np.isclose(fun(im, axis), fun(a, axis))), \ assert np.all(np.isclose(fun(image, axis), fun(array, axis))), \
f'function {fun.__name__} over axis {axis} does not give the correct result' f'function {fun.__name__} over axis {axis} does not give the correct result'