Files
ndbioimage/ndbioimage/__init__.py
2025-08-06 11:03:03 +02:00

1584 lines
61 KiB
Python
Executable File

from __future__ import annotations
import multiprocessing
import os
import re
import warnings
from abc import ABC, ABCMeta, abstractmethod
from argparse import ArgumentParser
from collections import OrderedDict
from datetime import datetime
from functools import cached_property, wraps
from importlib.metadata import version
from itertools import product
from operator import truediv
from pathlib import Path
from traceback import print_exc
from typing import Any, Callable, Generator, Iterable, Optional, Sequence, TypeVar
import numpy as np
import yaml
from numpy.typing import ArrayLike, DTypeLike
from ome_types import OME, from_xml, model, ureg
from pint import set_application_registry
from tiffwrite import FrameInfo, IJTiffParallel
from tqdm.auto import tqdm
from .jvm import JVM, JVMException # noqa: F401
from .transforms import Transform, Transforms # noqa: F401
try:
__version__ = version(Path(__file__).parent.name)
except Exception: # noqa
__version__ = "unknown"
try:
with open(Path(__file__).parent.parent / ".git" / "HEAD") as g:
head = g.read().split(":")[1].strip()
with open(Path(__file__).parent.parent / ".git" / head) as h:
__git_commit_hash__ = h.read().rstrip("\n")
except Exception: # noqa
__git_commit_hash__ = "unknown"
ureg.default_format = "~P"
set_application_registry(ureg)
warnings.filterwarnings("ignore", "Reference to unknown ID")
Number = int | float | np.integer | np.floating
class ReaderNotFoundError(Exception):
pass
class TransformTiff(IJTiffParallel):
"""transform frames in a parallel process to speed up saving"""
def __init__(self, image: Imread, *args: Any, **kwargs: Any) -> None:
self.image = image
super().__init__(*args, **kwargs)
def parallel(self, frame: tuple[int, int, int]) -> tuple[FrameInfo]:
return ((np.asarray(self.image(*frame)), 0, 0, 0),)
class DequeDict(OrderedDict):
def __init__(self, maxlen: int = None, *args: Any, **kwargs: Any) -> None:
self.maxlen = maxlen
super().__init__(*args, **kwargs)
def __setitem__(self, *args: Any, **kwargs: Any) -> None:
super().__setitem__(*args, **kwargs)
self.truncate()
def truncate(self) -> None:
if self.maxlen is not None:
while len(self) > self.maxlen:
self.popitem(False)
def update(self, *args: Any, **kwargs: Any) -> None:
super().update(*args, **kwargs) # type: ignore
self.truncate()
def find(obj: Sequence[Any], **kwargs: Any) -> Any:
for item in obj:
try:
if all([getattr(item, key) == value for key, value in kwargs.items()]):
return item
except AttributeError:
pass
R = TypeVar("R")
def try_default(fun: Callable[..., R], default: Any, *args: Any, **kwargs: Any) -> R:
try:
return fun(*args, **kwargs)
except Exception: # noqa
return default
def bioformats_ome(path: str | Path) -> OME:
from .readers.bfread import jars
try:
jvm = JVM(jars) # noqa
ome_meta = jvm.metadata_tools.createOMEXMLMetadata()
reader = jvm.image_reader()
reader.setMetadataStore(ome_meta)
reader.setId(str(path))
ome = from_xml(str(ome_meta.dumpXML()), parser="lxml")
except Exception: # noqa
print_exc()
ome = model.OME()
finally:
jvm.kill_vm() # noqa
return ome
class Shape(tuple):
def __new__(cls, shape: Sequence[int] | Shape, axes: str = "yxczt") -> Shape:
if isinstance(shape, Shape):
axes = shape.axes # type: ignore
new = super().__new__(cls, shape)
new.axes = axes.lower()
return new # type: ignore
def __getitem__(self, n: int | str) -> int | tuple[int]:
if isinstance(n, str):
if len(n) == 1:
return self[self.axes.find(n.lower())] if n.lower() in self.axes else 1
else:
return tuple(self[i] for i in n) # type: ignore
return super().__getitem__(n)
@cached_property
def yxczt(self) -> tuple[int, int, int, int, int]:
return tuple(self[i] for i in "yxczt") # type: ignore
class OmeCache(DequeDict):
"""prevent (potentially expensive) rereading of ome data by caching"""
instance = None
def __new__(cls) -> OmeCache:
if cls.instance is None:
cls.instance = super().__new__(cls)
return cls.instance
def __init__(self) -> None:
super().__init__(64)
def __reduce__(self) -> tuple[type, tuple]:
return self.__class__, ()
def __getitem__(self, path: Path | str | tuple) -> OME:
if isinstance(path, tuple):
return super().__getitem__(path)
else:
return super().__getitem__(self.path_and_lstat(path))
def __setitem__(self, path: Path | str | tuple, value: OME) -> None:
if isinstance(path, tuple):
super().__setitem__(path, value)
else:
super().__setitem__(self.path_and_lstat(path), value)
def __contains__(self, path: Path | str | tuple) -> bool:
if isinstance(path, tuple):
return super().__contains__(path)
else:
return super().__contains__(self.path_and_lstat(path))
@staticmethod
def path_and_lstat(path: str | Path) -> tuple[Path, Optional[os.stat_result], Optional[os.stat_result]]:
path = Path(path)
return (
path,
(path.lstat() if path.exists() else None),
(path.with_suffix(".ome.xml").lstat() if path.with_suffix(".ome.xml").exists() else None),
)
def get_positions(path: str | Path) -> Optional[list[int]]:
subclass = AbstractReader.get_subclass(path)
return subclass.get_positions(AbstractReader.split_path_series(path)[0])
class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
"""class to read image files, while taking good care of important metadata,
currently optimized for .czi files, but can open anything that bioformats can handle
path: path to the image file
optional:
axes: order of axes, default: cztyx, but omitting any axes with lenght 1
dtype: datatype to be used when returning frames
modify images on the fly with a decorator function:
define a function which takes an instance of this object, one image frame,
and the coordinates c, z, t as arguments, and one image frame as return
>> imread.frame_decorator = fun
then use imread as usually
Examples:
>> im = Imread('/path/to/file.image', axes='czt)
>> im
<< shows summary
>> im.shape
<< (15, 26, 1000, 1000)
>> im.axes
<< 'ztyx'
>> plt.imshow(im[1, 0])
<< plots frame at position z=1, t=0 (python type indexing)
>> plt.imshow(im[:, 0].max('z'))
<< plots max-z projection at t=0
>> im.pxsize_um
<< 0.09708737864077668 image-plane pixel size in um
>> im.laserwavelengths
<< [642, 488]
>> im.laserpowers
<< [0.02, 0.0005] in %
See __init__ and other functions for more ideas.
Subclassing:
Subclass AbstractReader to add more file types. A subclass should always have at least the following
methods:
staticmethod _can_open(path): returns True when the subclass can open the image in path
__frame__(self, c, z, t): this should return a single frame at channel c, slice z and time t
optional open(self): code to be run during initialization, e.g. to open a file handle
optional close(self): close the file in a proper way
optional class field priority: subclasses with lower priority will be tried first, default = 99
optional get_ome(self) -> OME: return an OME structure with metadata,
if not present bioformats will be used to generate an OME
Any other method can be overridden as needed
"""
isclosed: Optional[bool]
channel_names: Optional[list[str]]
series: Optional[int]
pxsize_um: Optional[float]
deltaz_um: Optional[float]
exposuretime_s: Optional[list[float]]
timeinterval: Optional[float]
binning: Optional[list[int]]
laserwavelengths: Optional[list[tuple[float]]]
laserpowers: Optional[list[tuple[float]]]
objective: Optional[model.Objective]
magnification: Optional[float]
tubelens: Optional[model.Objective]
filter: Optional[str]
powermode: Optional[str]
collimator: Optional[str]
tirfangle: Optional[list[float]]
gain: Optional[list[float]]
pcf: Optional[list[float]]
path: Path
__frame__: Callable[[int, int, int], np.ndarray]
@staticmethod
def get_subclass(path: Path | str | Any):
if len(AbstractReader.__subclasses__()) == 0:
raise Exception("Restart python kernel please!")
path, _ = AbstractReader.split_path_series(path)
for subclass in sorted(AbstractReader.__subclasses__(), key=lambda subclass_: subclass_.priority):
if subclass._can_open(path): # noqa
do_not_pickle = (
(AbstractReader.do_not_pickle,)
if isinstance(AbstractReader.do_not_pickle, str)
else AbstractReader.do_not_pickle
)
subclass_do_not_pickle = (
(subclass.do_not_pickle,)
if isinstance(subclass.do_not_pickle, str)
else subclass.do_not_pickle
if hasattr(subclass, "do_not_pickle")
else ()
)
subclass.do_not_pickle = set(do_not_pickle).union(set(subclass_do_not_pickle))
return subclass
raise ReaderNotFoundError(f"No reader found for {path}.")
def __new__(cls, path: Path | str | Imread | Any = None, dtype: DTypeLike = None, axes: str = None) -> Imread:
if cls is not Imread:
return super().__new__(cls)
if isinstance(path, Imread):
return path
subclass = cls.get_subclass(path)
do_not_pickle = (
(AbstractReader.do_not_pickle,)
if isinstance(AbstractReader.do_not_pickle, str)
else AbstractReader.do_not_pickle
)
subclass_do_not_pickle = (
(subclass.do_not_pickle,)
if isinstance(subclass.do_not_pickle, str)
else subclass.do_not_pickle
if hasattr(subclass, "do_not_pickle")
else ()
)
subclass.do_not_pickle = set(do_not_pickle).union(set(subclass_do_not_pickle))
return super().__new__(subclass)
def __init__(self, *args: Any, **kwargs: Any):
def parse(
base: Imread = None, # noqa
slice: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray] = None, # noqa
shape: tuple[int, ...] = (0, 0, 0, 0, 0), # noqa
dtype: DTypeLike = None, # noqa
frame_decorator: Callable[[Imread, np.ndarray, int, int, int], np.ndarray] = None, # noqa
) -> tuple[Any, ...]:
return base, slice, shape, dtype, frame_decorator
base, slice, shape, dtype, frame_decorator = parse(*args, **kwargs) # noqa
self.base = base or self
self.slice = slice
self._shape = Shape(shape)
self.dtype = dtype
self.frame_decorator = frame_decorator
self.transform = Transforms()
self.flags = dict(
C_CONTIGUOUS=False,
F_CONTIGUOUS=False,
OWNDATA=False,
WRITEABLE=False,
ALIGNED=False,
WRITEBACKIFCOPY=False,
UPDATEIFCOPY=False,
)
def __call__(self, c: int = None, z: int = None, t: int = None, x: int = None, y: int = None) -> np.ndarray:
"""same as im[] but allowing keyword axes, but slices need to made with slice() or np.s_"""
return self[{k: slice(v) if v is None else v for k, v in dict(c=c, z=z, t=t, x=x, y=y).items()}]
def __copy__(self) -> Imread:
return self.copy()
def __contains__(self, item: Number) -> bool:
def unique_yield(a: Iterable[Any], b: Iterable[Any]) -> Generator[Any, None, None]:
for k in a:
yield k
for k in b:
if k not in a:
yield k
for idx in unique_yield(
[key[:3] for key in self.cache.keys()],
product(range(self.shape["c"]), range(self.shape["z"]), range(self.shape["t"])),
):
yxczt = (slice(None), slice(None)) + idx
in_idx = tuple(yxczt["yxczt".find(i)] for i in self.axes)
if item in np.asarray(self[in_idx]):
return True
return False
def __enter__(self) -> Imread:
return self
def __exit__(self, *args: Any, **kwargs: Any) -> None:
if not self.isclosed:
self.isclosed = True
if hasattr(self, "close"):
self.close()
def __getitem__(
self,
n: int
| Sequence[int]
| Sequence[slice]
| slice
| type(Ellipsis)
| dict[str, int | Sequence[int] | Sequence[slice] | slice | type(Ellipsis)],
) -> Number | Imread | np.ndarray:
"""slice like a numpy array but return an Imread instance"""
if self.isclosed:
raise OSError("file is closed")
if isinstance(n, (slice, Number)): # None = :
n = (n,)
elif isinstance(n, type(Ellipsis)):
n = (None,) * len(self.axes)
elif isinstance(n, dict): # allow im[dict(z=0)] etc.
n = [n.get(i, slice(None)) for i in self.axes]
n = list(n)
# deal with ...
ell = [i for i, e in enumerate(n) if isinstance(e, type(Ellipsis))]
if len(ell) > 1:
raise IndexError("an index can only have a single ellipsis (...)")
if len(ell):
if len(n) > self.ndim:
n.remove(Ellipsis)
else:
n[ell[0]] = None
while len(n) < self.ndim:
n.insert(ell[0], None)
while len(n) < self.ndim:
n.append(None)
axes_idx = [self.shape.axes.find(i) for i in "yxczt"]
n = [n[j] if 0 <= j < len(n) else None for j in axes_idx] # reorder n
new_slice = []
for s, e in zip(self.slice, n):
if e is None:
new_slice.append(s)
else:
new_slice.append(s[e])
# TODO: check output dimensionality when requested shape in some dimension is 1
if all([isinstance(s, Number) or a < 0 and s.size == 1 for s, a in zip(new_slice, axes_idx)]):
return self.block(*new_slice).item()
else:
new = View(self)
new.slice = new_slice
new._shape = Shape([1 if isinstance(s, Number) else len(s) for s in new_slice])
new.axes = "".join(
j for j in self.axes if j in [i for i, s in zip("yxczt", new_slice) if not isinstance(s, Number)]
)
return new
def __getstate__(self) -> dict[str:Any]:
return {key: value for key, value in self.__dict__.items() if key not in self.do_not_pickle} | {
"cache_size": self.cache.maxlen
}
def __len__(self) -> int:
return self.shape[0]
def __repr__(self) -> str:
return self.summary
def __setstate__(self, state: dict[str, Any]) -> None:
"""What happens during unpickling"""
self.__dict__.update({key: value for key, value in state.items() if key != "cache_size"})
if isinstance(self, AbstractReader):
self.open()
self.cache = DequeDict(state.get("cache_size", 16))
def __str__(self) -> str:
return str(self.path)
# @property
# def __array_interface__(self) -> dict[str, Any]:
# return dict(shape=tuple(self.shape), typestr=self.dtype.str, version=3, data=self.tobytes())
def tobytes(self) -> bytes:
return self.flatten().tobytes()
def __array__(self, dtype: DTypeLike = None, copy: bool = None) -> np.ndarray:
if copy is False:
raise ValueError("`copy=False` isn't supported. A copy is always created.")
block = self.block(*self.slice)
axes_idx = [self.shape.axes.find(i) for i in "yxczt"]
axes_squeeze = tuple(
{i for i, j in enumerate(axes_idx) if j == -1}.union(
{i for i, j in enumerate(self.slice) if isinstance(j, Number)}
)
)
block = block.squeeze(axes_squeeze)
if dtype is not None:
block = block.astype(dtype)
if block.ndim == 0:
return block.item()
axes = "".join(j for i, j in enumerate("yxczt") if i not in axes_squeeze)
return block.transpose([axes.find(i) for i in self.shape.axes if i in axes])
def __array_arg_fun__(
self,
fun: Callable[[ArrayLike, Optional[int]], Number | np.ndarray],
axis: int | str = None,
out: np.ndarray = None,
) -> Number | np.ndarray:
"""frame-wise application of np.argmin and np.argmax"""
if axis is None:
value = arg = None
for idx in product(range(self.shape["c"]), range(self.shape["z"]), range(self.shape["t"])):
yxczt = (slice(None), slice(None)) + idx
in_idx = tuple(yxczt["yxczt".find(i)] for i in self.axes)
new = np.asarray(self[in_idx])
new_arg = np.unravel_index(fun(new), new.shape) # type: ignore
new_value = new[new_arg]
if value is None:
arg = new_arg + idx
value = new_value
else:
i = fun((value, new_value)) # type: ignore
arg = (arg, new_arg + idx)[i]
value = (value, new_value)[i]
axes = "".join(i for i in self.axes if i in "yx") + "czt"
arg = np.ravel_multi_index([arg[axes.find(i)] for i in self.axes], self.shape)
if out is None:
return arg
else:
out.itemset(arg)
return out
else:
if isinstance(axis, str):
axis_str, axis_idx = axis, self.axes.index(axis)
else:
axis_str, axis_idx = self.axes[axis], axis
if axis_str not in self.axes:
raise IndexError(f"Axis {axis_str} not in {self.axes}.")
out_shape = list(self.shape)
out_axes = list(self.axes)
out_shape.pop(axis_idx)
out_axes.pop(axis_idx)
if out is None:
out = np.zeros(out_shape, int)
if axis_str in "yx":
for idx in product(range(self.shape["c"]), range(self.shape["z"]), range(self.shape["t"])):
yxczt = (slice(None), slice(None)) + idx
out_idx = tuple(yxczt["yxczt".find(i)] for i in out_axes)
in_idx = tuple(yxczt["yxczt".find(i)] for i in self.axes)
new = self[in_idx]
out[out_idx] = fun(np.asarray(new), new.axes.find(axis_str))
else:
value = np.zeros(out.shape, self.dtype)
for idx in product(range(self.shape["c"]), range(self.shape["z"]), range(self.shape["t"])):
yxczt = (slice(None), slice(None)) + idx
out_idx = tuple(yxczt["yxczt".find(i)] for i in out_axes)
in_idx = tuple(yxczt["yxczt".find(i)] for i in self.axes)
new_value = self[in_idx]
new_arg = np.full_like(new_value, idx["czt".find(axis_str)])
if idx["czt".find(axis_str)] == 0:
value[out_idx] = new_value
out[out_idx] = new_arg
else:
old_value = value[out_idx]
i = fun((old_value, new_value), 0)
value[out_idx] = np.where(i, new_value, old_value)
out[out_idx] = np.where(i, new_arg, out[out_idx])
return out
def __array_fun__(
self,
funs: Sequence[Callable[[ArrayLike], Number | np.ndarray]],
axis: int | str = None,
dtype: DTypeLike = None,
out: np.ndarray = None,
keepdims: bool = False,
initials: list[Number | np.ndarray] = None,
where: bool | int | np.ndarray = True,
ffuns: Sequence[Callable[[ArrayLike], np.ndarray]] = None,
cfun: Callable[..., np.ndarray] = None,
) -> Number | np.ndarray:
"""frame-wise application of np.min, np.max, np.sum, np.mean and their nan equivalents"""
p = re.compile(r"\d")
dtype = self.dtype if dtype is None else np.dtype(dtype)
if initials is None:
initials = [None for _ in funs]
if ffuns is None:
ffuns = [None for _ in funs]
def ffun_(frame: ArrayLike) -> np.ndarray:
return np.asarray(frame)
ffuns = [ffun_ if ffun is None else ffun for ffun in ffuns]
if cfun is None:
def cfun(*res): # noqa
return res[0]
# TODO: smarter transforms
if axis is None:
for idx in product(range(self.shape["c"]), range(self.shape["z"]), range(self.shape["t"])):
yxczt = (slice(None), slice(None)) + idx
in_idx = tuple(yxczt["yxczt".find(i)] for i in self.axes)
w = where if where is None or isinstance(where, bool) else where[in_idx]
initials = [
fun(np.asarray(ffun(self[in_idx])), initial=initial, where=w) # type: ignore
for fun, ffun, initial in zip(funs, ffuns, initials)
]
res = cfun(*initials)
res = (np.round(res) if dtype.kind in "ui" else res).astype(p.sub("", dtype.name))
if keepdims:
res = np.array(res, dtype, ndmin=self.ndim)
if out is None:
return res
else:
out.itemset(res)
return out
else:
if isinstance(axis, str):
axis_str, axis_idx = axis, self.axes.index(axis)
else:
axis_idx = axis % self.ndim
axis_str = self.axes[axis_idx]
if axis_str not in self.axes:
raise IndexError(f"Axis {axis_str} not in {self.axes}.")
out_shape = list(self.shape)
out_axes = list(self.axes)
if not keepdims:
out_shape.pop(axis_idx)
out_axes.pop(axis_idx)
if out is None:
out = np.zeros(out_shape, dtype)
if axis_str in "yx":
yx = "yx" if self.axes.find("x") > self.axes.find("y") else "yx"
frame_ax = yx.find(axis_str)
for idx in product(range(self.shape["c"]), range(self.shape["z"]), range(self.shape["t"])):
yxczt = (slice(None), slice(None)) + idx
out_idx = tuple(yxczt["yxczt".find(i)] for i in out_axes)
in_idx = tuple(yxczt["yxczt".find(i)] for i in self.axes)
w = where if where is None or isinstance(where, bool) else where[in_idx]
res = cfun(
*[
fun(ffun(self[in_idx]), frame_ax, initial=initial, where=w) # type: ignore
for fun, ffun, initial in zip(funs, ffuns, initials)
]
)
out[out_idx] = (np.round(res) if out.dtype.kind in "ui" else res).astype(p.sub("", dtype.name))
else:
tmps = [np.zeros(out_shape) for _ in ffuns]
for idx in product(range(self.shape["c"]), range(self.shape["z"]), range(self.shape["t"])):
yxczt = (slice(None), slice(None)) + idx
out_idx = tuple(yxczt["yxczt".find(i)] for i in out_axes)
in_idx = tuple(yxczt["yxczt".find(i)] for i in self.axes)
if idx["czt".find(axis_str)] == 0:
w = where if where is None or isinstance(where, bool) else (where[in_idx],)
for tmp, fun, ffun, initial in zip(tmps, funs, ffuns, initials):
tmp[out_idx] = fun((ffun(self[in_idx]),), 0, initial=initial, where=w) # type: ignore
else:
w = (
where
if where is None or isinstance(where, bool)
else (np.ones_like(where[in_idx]), where[in_idx])
)
for tmp, fun, ffun in zip(tmps, funs, ffuns):
tmp[out_idx] = fun((tmp[out_idx], ffun(self[in_idx])), 0, where=w) # type: ignore
out[...] = (np.round(cfun(*tmps)) if out.dtype.kind in "ui" else cfun(*tmps)).astype(
p.sub("", dtype.name)
)
return out
@property
def axes(self) -> str:
return self.shape.axes
@axes.setter
def axes(self, value: str) -> None:
shape = self.shape[value]
if isinstance(shape, Number):
shape = (shape,)
self._shape = Shape(shape, value)
@property
def dtype(self) -> np.dtype:
return self._dtype
@dtype.setter
def dtype(self, value: DTypeLike) -> None:
self._dtype = np.dtype(value)
@cached_property
def extrametadata(self) -> Optional[Any]:
if isinstance(self.path, Path):
if self.path.with_suffix(".pzl2").exists():
pname = self.path.with_suffix(".pzl2")
elif self.path.with_suffix(".pzl").exists():
pname = self.path.with_suffix(".pzl")
else:
return None
try:
return self.get_config(pname)
except Exception: # noqa
return None
return None
@property
def ndim(self) -> int:
return len(self.shape)
@property
def size(self) -> int:
return np.prod(self.shape) # type: ignore
@property
def shape(self) -> Shape:
return self._shape
@shape.setter
def shape(self, value: Shape | tuple[int, ...]) -> None:
if isinstance(value, Shape):
self._shape = value
else:
self._shape = Shape([value["yxczt".find(i.lower())] for i in self.axes], self.axes)
@property
def summary(self) -> str:
"""gives a helpful summary of the recorded experiment"""
s = [
f"path/filename: {self.path}",
f"series/pos: {self.series}",
f"reader: {self.base.__class__.__module__.split('.')[-1]}",
]
s.extend(
(
f"dtype: {self.dtype}",
f"shape ({self.axes}):".ljust(15) + f"{' x '.join(str(i) for i in self.shape)}",
)
)
if self.pxsize_um:
s.append(f"pixel size: {1000 * self.pxsize_um:.2f} nm")
if self.zstack and self.deltaz_um:
s.append(f"z-interval: {1000 * self.deltaz_um:.2f} nm")
if self.exposuretime_s and not all(e is None for e in self.exposuretime_s):
s.append(f"exposuretime: {self.exposuretime_s[0]:.2f} s")
if self.timeseries and self.timeinterval:
s.append(f"time interval: {self.timeinterval:.3f} s")
if self.binning:
s.append("binning: {}x{}".format(*self.binning))
if self.laserwavelengths:
s.append(
"laser colors: "
+ " | ".join([" & ".join(len(w) * ("{:.0f}",)).format(*w) for w in self.laserwavelengths])
+ " nm"
)
if self.laserpowers:
s.append(
"laser powers: "
+ " | ".join(
[" & ".join(len(p) * ("{:.3g}",)).format(*[100 * i for i in p]) for p in self.laserpowers]
)
+ " %"
)
if self.objective and self.objective.model:
s.append(f"objective: {self.objective.model}")
if self.magnification:
s.append(f"magnification: {self.magnification}x")
if self.tubelens and self.tubelens.model:
s.append(f"tubelens: {self.tubelens.model}")
if self.filter:
s.append(f"filterset: {self.filter}")
if self.powermode:
s.append(f"powermode: {self.powermode}")
if self.collimator:
s.append("collimator: " + (" {}" * len(self.collimator)).format(*self.collimator))
if self.tirfangle:
s.append("TIRF angle: " + (" {:.2f}°" * len(self.tirfangle)).format(*self.tirfangle))
if self.gain:
s.append("gain: " + (" {:.0f}" * len(self.gain)).format(*self.gain))
if self.pcf:
s.append("pcf: " + (" {:.2f}" * len(self.pcf)).format(*self.pcf))
return "\n".join(s)
@property
def T(self) -> Imread: # noqa
return self.transpose() # type: ignore
@cached_property
def timeseries(self) -> bool:
return self.shape["t"] > 1
@cached_property
def zstack(self) -> bool:
return self.shape["z"] > 1
@wraps(np.ndarray.argmax)
def argmax(self, *args, **kwargs):
return self.__array_arg_fun__(np.argmax, *args, **kwargs)
@wraps(np.ndarray.argmin)
def argmin(self, *args, **kwargs):
return self.__array_arg_fun__(np.argmin, *args, **kwargs)
@wraps(np.ndarray.max)
def max(self, axis=None, out=None, keepdims=False, initial=None, where=True, **_):
return self.__array_fun__([np.max], axis, None, out, keepdims, [initial], where)
@wraps(np.ndarray.mean)
def mean(self, axis=None, dtype=None, out=None, keepdims=False, *, where=True, **_):
dtype = dtype or float
n = np.prod(self.shape) if axis is None else self.shape[axis]
def sfun(frame: ArrayLike) -> np.ndarray:
return np.asarray(frame).astype(float)
def cfun(res: np.ndarray) -> np.ndarray:
return res / n
return self.__array_fun__([np.sum], axis, dtype, out, keepdims, None, where, [sfun], cfun)
@wraps(np.ndarray.min)
def min(self, axis=None, out=None, keepdims=False, initial=None, where=True, **_):
return self.__array_fun__([np.min], axis, None, out, keepdims, [initial], where)
@wraps(np.moveaxis)
def moveaxis(self, source, destination):
raise NotImplementedError("moveaxis is not implemented")
@wraps(np.nanmax)
def nanmax(self, axis=None, out=None, keepdims=False, initial=None, where=True, **_):
return self.__array_fun__([np.nanmax], axis, None, out, keepdims, [initial], where)
@wraps(np.nanmean)
def nanmean(self, axis=None, dtype=None, out=None, keepdims=False, *, where=True, **_):
dtype = dtype or float
def sfun(frame):
return np.asarray(frame).astype(float)
def nfun(frame):
return np.invert(np.isnan(frame))
return self.__array_fun__([np.nansum, np.sum], axis, dtype, out, keepdims, None, where, (sfun, nfun), truediv)
@wraps(np.nanmin)
def nanmin(self, axis=None, out=None, keepdims=False, initial=None, where=True, **_):
return self.__array_fun__([np.nanmin], axis, None, out, keepdims, [initial], where)
@wraps(np.nansum)
def nansum(self, axis=None, dtype=None, out=None, keepdims=False, initial=None, where=True, **_):
return self.__array_fun__([np.nansum], axis, dtype, out, keepdims, [initial], where)
@wraps(np.nanstd)
def nanstd(self, axis=None, dtype=None, out=None, ddof=0, keepdims=None, *, where=None):
return self.nanvar(axis, dtype, out, ddof, keepdims, where=where, std=True)
@wraps(np.nanvar)
def nanvar(self, axis=None, dtype=None, out=None, ddof=0, keepdims=None, *, where=True, std=False):
dtype = dtype or float
def sfun(frame):
return np.asarray(frame).astype(float)
def s2fun(frame):
return np.asarray(frame).astype(float) ** 2
def nfun(frame):
return np.invert(np.isnan(frame))
if std:
def cfun(s, s2, n):
return np.sqrt((s2 - s**2 / n) / (n - ddof))
else:
def cfun(s, s2, n):
return (s2 - s**2 / n) / (n - ddof)
return self.__array_fun__(
[np.nansum, np.nansum, np.sum], axis, dtype, out, keepdims, None, where, (sfun, s2fun, nfun), cfun
)
@wraps(np.ndarray.flatten)
def flatten(self, *args, **kwargs):
return np.asarray(self).flatten(*args, **kwargs)
@wraps(np.ndarray.reshape)
def reshape(self, *args, **kwargs):
return np.asarray(self).reshape(*args, **kwargs)
@wraps(np.ndarray.squeeze)
def squeeze(self, axes=None):
new = self.copy()
if axes is None:
axes = tuple(i for i, j in enumerate(new.shape) if j == 1)
elif isinstance(axes, Number):
axes = (axes,)
else:
axes = tuple(new.axes.find(ax) if isinstance(ax, str) else ax for ax in axes)
if any([new.shape[ax] != 1 for ax in axes]):
raise ValueError("cannot select an axis to squeeze out which has size not equal to one")
new.axes = "".join(j for i, j in enumerate(new.axes) if i not in axes)
return new
@wraps(np.ndarray.std)
def std(self, axis=None, dtype=None, out=None, ddof=0, keepdims=None, *, where=True):
return self.var(axis, dtype, out, ddof, keepdims, where=where, std=True) # type: ignore
@wraps(np.ndarray.sum)
def sum(self, axis=None, dtype=None, out=None, keepdims=False, initial=None, where=True, **_):
return self.__array_fun__([np.sum], axis, dtype, out, keepdims, [initial], where)
@wraps(np.ndarray.swapaxes)
def swapaxes(self, axis1, axis2):
new = self.copy()
axes = new.axes
if isinstance(axis1, str):
axis1 = axes.find(axis1)
if isinstance(axis2, str):
axis2 = axes.find(axis2)
axes[axis1], axes[axis2] = axes[axis2], axes[axis1]
new.axes = axes
return new
@wraps(np.ndarray.transpose)
def transpose(self, *axes):
new = self.copy()
if not axes:
new.axes = new.axes[::-1]
else:
new.axes = "".join(ax if isinstance(ax, str) else new.axes[ax] for ax in axes)
return new
@wraps(np.ndarray.var)
def var(self, axis=None, dtype=None, out=None, ddof=0, keepdims=None, *, where=True, std=False):
dtype = dtype or float
n = np.prod(self.shape) if axis is None else self.shape[axis]
def sfun(frame):
return np.asarray(frame).astype(float)
def s2fun(frame):
return np.asarray(frame).astype(float) ** 2
if std:
def cfun(s, s2):
return np.sqrt((s2 - s**2 / n) / (n - ddof))
else:
def cfun(s, s2):
return (s2 - s**2 / n) / (n - ddof)
return self.__array_fun__([np.sum, np.sum], axis, dtype, out, keepdims, None, where, (sfun, s2fun), cfun)
def asarray(self) -> np.ndarray:
return self.__array__()
@wraps(np.ndarray.astype)
def astype(self, dtype, *_, **__):
new = self.copy()
new.dtype = dtype
return new
def block(
self,
y: int | Sequence[int] = None,
x: int | Sequence[int] = None,
c: int | Sequence[int] = None,
z: int | Sequence[int] = None,
t: int | Sequence[int] = None,
) -> np.ndarray:
"""returns 5D block of frames"""
y, x, c, z, t = (
np.arange(self.shape[i]) if e is None else np.array(e, ndmin=1) for i, e in zip("yxczt", (y, x, c, z, t))
)
d = np.empty((len(y), len(x), len(c), len(z), len(t)), self.dtype)
for (ci, cj), (zi, zj), (ti, tj) in product(enumerate(c), enumerate(z), enumerate(t)):
d[:, :, ci, zi, ti] = self.frame(cj, zj, tj)[y][:, x] # type: ignore
return d
def copy(self) -> View:
return View(self)
def data(self, c: int | Sequence[int] = 0, z: int | Sequence[int] = 0, t: int | Sequence[int] = 0) -> np.ndarray:
"""returns 3D stack of frames"""
c, z, t = (np.arange(self.shape[i]) if e is None else np.array(e, ndmin=1) for i, e in zip("czt", (c, z, t)))
return np.dstack([self.frame(ci, zi, ti) for ci, zi, ti in product(c, z, t)])
def frame(self, c: int = 0, z: int = 0, t: int = 0) -> np.ndarray:
"""returns single 2D frame"""
c = self.get_channel(c)
c %= self.base.shape["c"]
z %= self.base.shape["z"]
t %= self.base.shape["t"]
# cache last n (default 16) frames in memory for speed (~250x faster)
key = (c, z, t, self.transform, self.frame_decorator)
if self.cache.maxlen and key in self.cache:
self.cache.move_to_end(key)
f = self.cache[key]
else:
f = self.transform[self.channel_names[c], t].frame(self.__frame__(c, z, t))
if self.frame_decorator is not None:
f = self.frame_decorator(self, f, c, z, t)
if self.cache.maxlen:
self.cache[key] = f
if self.dtype is not None:
return f.copy().astype(self.dtype)
else:
return f.copy()
def get_channel(self, channel_name: str | int) -> int:
if not isinstance(channel_name, str):
return channel_name
else:
c = [i for i, c in enumerate(self.channel_names) if c.lower().startswith(channel_name.lower())]
assert len(c) > 0, f"Channel {c} not found in {self.channel_names}"
assert len(c) < 2, f"Channel {c} not unique in {self.channel_names}"
return c[0]
@staticmethod
def get_config(file: Path | str) -> Any:
"""Open a yml config file"""
loader = yaml.SafeLoader
loader.add_implicit_resolver(
r"tag:yaml.org,2002:float",
re.compile(
r"""^(?:
[-+]?([0-9][0-9_]*)\.[0-9_]*(?:[eE][-+]?[0-9]+)?
|[-+]?([0-9][0-9_]*)([eE][-+]?[0-9]+)
|\.[0-9_]+(?:[eE][-+][0-9]+)?
|[-+]?[0-9][0-9_]*(?::[0-5]?[0-9])+\.[0-9_]*
|[-+]?\\.(?:inf|Inf|INF)
|\.(?:nan|NaN|NAN))$""",
re.X,
),
list(r"-+0123456789."),
)
with open(file) as f:
return yaml.load(f, loader)
def get_czt(
self, c: int | Sequence[int], z: int | Sequence[int], t: int | Sequence[int]
) -> tuple[list[int], list[int], list[int]]:
czt = []
for i, n in zip("czt", (c, z, t)):
if n is None:
czt.append(list(range(self.shape[i])))
elif isinstance(n, range):
if n.stop < 0:
stop = n.stop % self.shape[i]
elif n.stop > self.shape[i]:
stop = self.shape[i]
else:
stop = n.stop
czt.append(list(range(n.start % self.shape[i], stop, n.step)))
elif isinstance(n, Number):
czt.append([n % self.shape[i]])
else:
czt.append([k % self.shape[i] for k in n])
return [self.get_channel(c) for c in czt[0]], *czt[1:3] # type: ignore
@staticmethod
def bioformats_ome(path: [str, Path]) -> OME:
"""Use java BioFormats to make an ome metadata structure."""
with multiprocessing.get_context("spawn").Pool(1) as pool:
return pool.map(bioformats_ome, (path,))[0]
@staticmethod
def fix_ome(ome: OME) -> OME:
# fix ome if necessary
for image in ome.images:
try:
if (
image.pixels.physical_size_z is None
and len(set([plane.the_z for plane in image.pixels.planes])) > 1
):
z = np.array(
[
(
plane.position_z * ureg.Quantity(plane.position_z_unit.value).to(ureg.m).magnitude,
plane.the_z,
)
for plane in image.pixels.planes
if plane.the_c == 0 and plane.the_t == 0
]
)
i = np.argsort(z[:, 1])
image.pixels.physical_size_z = np.nanmean(np.true_divide(*np.diff(z[i], axis=0).T)) * 1e6
image.pixels.physical_size_z_unit = "µm" # type: ignore
except Exception: # noqa
pass
return ome
@staticmethod
def read_ome(path: [str, Path]) -> Optional[OME]:
path = Path(path)
if path.with_suffix(".ome.xml").exists():
return OME.from_xml(path.with_suffix(".ome.xml"))
def get_ome(self) -> OME:
"""overload this"""
return self.bioformats_ome(self.path)
@cached_property
def ome(self) -> OME:
cache = OmeCache()
if self.path not in cache:
ome = self.read_ome(self.path)
if ome is None:
ome = self.get_ome()
cache[self.path] = self.fix_ome(ome)
return cache[self.path]
def is_noise(self, volume: ArrayLike = None) -> bool:
"""True if volume only has noise"""
if volume is None:
volume = self
fft = np.fft.fftn(volume)
corr = np.fft.fftshift(np.fft.ifftn(fft * fft.conj()).real / np.sum(volume**2))
return 1 - corr[tuple([0] * corr.ndim)] < 0.0067
@staticmethod
def kill_vm() -> None:
JVM().kill_vm()
def new(self, *args: Any, **kwargs: Any) -> View:
warnings.warn("Imread.new has been deprecated, use Imread.view instead.", DeprecationWarning, 2)
return self.view(*args, **kwargs)
def save_as_movie(
self,
fname: Path | str = None,
c: int | Sequence[int] = None,
z: int | Sequence[int] = None, # noqa
t: str | int | Sequence[int] = None, # noqa
colors: tuple[str] = None,
brightnesses: tuple[float] = None,
scale: int = None,
bar: bool = True,
speed: float = None,
) -> None:
"""saves the image as a mp4 or mkv file"""
from matplotlib.colors import to_rgb
from skvideo.io import FFmpegWriter
if t is None:
t = np.arange(self.shape["t"])
elif isinstance(t, str):
t = eval(f"np.arange(self.shape['t'])[{t}]")
elif np.isscalar(t):
t = (t,)
if speed is None:
speed = 25 / 7
def get_ab(tyx: Imread, p: tuple[float, float] = (1, 99)) -> tuple[float, float]:
s = tyx.flatten()
s = s[s > 0]
a, b = np.percentile(s, p)
if a == b:
a, b = np.min(s), np.max(s)
if a == b:
a, b = 0, 1
return a, b
def cframe(frame: ArrayLike, color: str, a: float, b: float, scale: float = 1) -> np.ndarray: # noqa
color = to_rgb(color)
frame = (frame - a) / (b - a)
frame = np.dstack([255 * frame * i for i in color])
return np.clip(np.round(frame), 0, 255).astype("uint8")
ab = list(zip(*[get_ab(i) for i in self.transpose("cztyx")])) # type: ignore
colors = colors or ("r", "g", "b")[: self.shape["c"]] + max(0, self.shape["c"] - 3) * ("w",)
brightnesses = brightnesses or (1,) * self.shape["c"]
scale = scale or 1
shape_x = 2 * ((self.shape["x"] * scale + 1) // 2)
shape_y = 2 * ((self.shape["y"] * scale + 1) // 2)
with FFmpegWriter(
str(fname).format(name=self.path.stem, path=str(self.path.parent)),
outputdict={
"-vcodec": "libx264",
"-preset": "veryslow",
"-pix_fmt": "yuv420p",
"-r": "7",
"-vf": f"setpts={speed}*PTS,scale={shape_x}:{shape_y}:flags=neighbor",
},
) as movie:
im = self.transpose("tzcyx") # type: ignore
for ti in tqdm(t, desc="Saving movie", disable=not bar):
movie.writeFrame(
np.max(
[
cframe(yx, c, a, b / s, scale)
for yx, a, b, c, s in zip(im[ti].max("z"), *ab, colors, brightnesses)
],
0,
)
)
def save_as_tiff(
self,
fname: Path | str = None,
c: int | Sequence[int] = None,
z: int | Sequence[int] = None,
t: int | Sequence[int] = None,
split: bool = False,
bar: bool = True,
pixel_type: str = "uint16",
compression: str = None,
**kwargs: Any,
) -> None:
"""saves the image as a tif file
split: split channels into different files"""
fname = Path(str(fname).format(name=self.path.stem, path=str(self.path.parent)))
if fname is None:
fname = self.path.with_suffix(".tif")
if fname == self.path:
raise FileExistsError(f"File {fname} exists already.")
if not isinstance(fname, Path):
fname = Path(fname)
if split:
for i in range(self.shape["c"]):
if self.timeseries:
self.save_as_tiff(
fname.with_name(f"{fname.stem}_C{i:01d}").with_suffix(".tif"),
i,
0,
None,
False,
bar,
pixel_type,
compression,
**kwargs,
)
else:
self.save_as_tiff(
fname.with_name(f"{fname.stem}_C{i:01d}").with_suffix(".tif"),
i,
None,
0,
False,
bar,
pixel_type,
compression,
**kwargs,
)
else:
n = [c, z, t]
for i, ax in enumerate("czt"):
if n[i] is None:
n[i] = range(self.shape[ax])
elif not isinstance(n[i], (tuple, list)):
n[i] = (n[i],)
shape = [len(i) for i in n]
with TransformTiff(
self,
fname.with_suffix(".tif"),
dtype=pixel_type,
pxsize=self.pxsize_um,
deltaz=self.deltaz_um,
compression=compression,
**kwargs,
) as tif:
for i, m in tqdm(
zip(product(*[range(s) for s in shape]), product(*n)), # noqa
total=np.prod(shape),
desc="Saving tiff",
disable=not bar,
):
tif.save(m, *i)
def with_transform(
self,
channels: bool = True,
drift: bool = False,
file: Path | str = None,
bead_files: Sequence[Path | str] = (),
) -> View:
"""returns a view where channels and/or frames are registered with an affine transformation
channels: True/False register channels using bead_files
drift: True/False register frames to correct drift
file: load registration from file with name file, default: transform.yml in self.path.parent
bead_files: files used to register channels, default: files in self.path.parent,
with names starting with 'beads'
"""
view = self.view()
if file is None:
file = Path(view.path.parent) / "transform.yml"
else:
file = Path(file)
if not bead_files:
try:
bead_files = Transforms.get_bead_files(view.path.parent)
except Exception: # noqa
if not file.exists():
raise Exception("No transform file and no bead file found.")
bead_files = ()
if channels:
try:
view.transform = Transforms.from_file(file, T=drift)
except Exception: # noqa
view.transform = Transforms().with_beads(view.cyllens, bead_files)
if drift:
view.transform = view.transform.with_drift(view)
view.transform.save(file.with_suffix(".yml"))
view.transform.save_channel_transform_tiff(bead_files, file.with_suffix(".tif"))
elif drift:
try:
view.transform = Transforms.from_file(file, C=False)
except Exception: # noqa
view.transform = Transforms().with_drift(self)
view.transform.adapt(view.frameoffset, view.shape.yxczt, view.channel_names)
return view
def set_cache_size(self, cache_size: int) -> None:
assert isinstance(cache_size, int) and cache_size >= 0
self.cache.maxlen = cache_size
self.cache.truncate()
@staticmethod
def split_path_series(path: Path | str) -> tuple[Path, int]:
if isinstance(path, str):
path = Path(path)
if isinstance(path, Path) and path.name.startswith("Pos") and path.name.lstrip("Pos").isdigit():
return path.parent, int(path.name.lstrip("Pos"))
return path, 0
def view(self, *args: Any, **kwargs: Any) -> View:
return View(self, *args, **kwargs)
class View(Imread, ABC):
def __init__(self, base: Imread, dtype: DTypeLike = None) -> None:
super().__init__(base.base, base.slice, base.shape, dtype or base.dtype, base.frame_decorator)
self.transform = base.transform
def __getattr__(self, item: str) -> Any:
if not hasattr(self.base, item):
raise AttributeError(f"{self.__class__} object has no attribute {item}")
return self.base.__getattribute__(item)
class AbstractReader(Imread, metaclass=ABCMeta):
priority = 99
do_not_pickle = "cache"
ureg = ureg
@staticmethod
@abstractmethod
def _can_open(path: Path | str) -> bool:
"""Override this method, and return true when the subclass can open the file"""
return False
@staticmethod
def get_positions(path: str | Path) -> Optional[list[int]]:
return None
@abstractmethod
def __frame__(self, c: int, z: int, t: int) -> np.ndarray:
"""Override this, return the frame at c, z, t"""
return np.random.randint(0, 255, self.shape["yx"])
def open(self) -> None:
"""Optionally override this, open file handles etc.
filehandles cannot be pickled and should be marked such by setting do_not_pickle = 'file_handle_name'"""
return
def close(self) -> None:
"""Optionally override this, close file handles etc."""
return
def __init__(self, path: Path | str | Imread | Any = None, dtype: DTypeLike = None, axes: str = None) -> None:
if isinstance(path, Imread):
return
super().__init__()
self.isclosed = False
if isinstance(path, str):
path = Path(path)
self.path, self.series = self.split_path_series(path)
if isinstance(path, Path) and path.exists():
self.title = self.path.name
self.acquisitiondate = datetime.fromtimestamp(self.path.stat().st_mtime).strftime("%y-%m-%dT%H:%M:%S")
else: # ndarray
self.title = self.__class__.__name__
self.acquisitiondate = "now"
self.reader = None
self.pcf = None
self.powermode = None
self.collimator = None
self.tirfangle = None
self.cyllens = ["None", "None"]
self.duolink = "None"
self.detector = [0, 1]
self.track = [0]
self.cache = DequeDict(16)
self.frameoffset = 0, 0 # how far apart the centers of frame and sensor are
self.open()
# extract some metadata from ome
instrument = self.ome.instruments[0] if self.ome.instruments else None
image = self.ome.images[self.series if len(self.ome.images) > 1 else 0]
pixels = image.pixels
self.shape = pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t
self.base_shape = Shape((pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t), "yxczt")
self.dtype = pixels.type.value if dtype is None else dtype
self.pxsize = pixels.physical_size_x_quantity
try:
self.exposuretime = tuple(
find(image.pixels.planes, the_c=c).exposure_time_quantity for c in range(self.shape["c"])
)
except AttributeError:
self.exposuretime = ()
if self.zstack:
self.deltaz = image.pixels.physical_size_z_quantity
self.deltaz_um = None if self.deltaz is None else self.deltaz.to(self.ureg.um).m
else:
self.deltaz = self.deltaz_um = None
if image.objective_settings:
self.objective = find(instrument.objectives, id=image.objective_settings.id)
else:
self.objective = None
try:
t0 = find(image.pixels.planes, the_c=0, the_t=0, the_z=0).delta_t
t1 = find(image.pixels.planes, the_c=0, the_t=self.shape["t"] - 1, the_z=0).delta_t
self.timeinterval = (t1 - t0) / (self.shape["t"] - 1) if self.shape["t"] > 1 and t1 > t0 else None
except AttributeError:
self.timeinterval = None
try:
self.binning = [int(i) for i in image.pixels.channels[0].detector_settings.binning.value.split("x")]
if self.pxsize is not None:
self.pxsize *= self.binning[0]
except (AttributeError, IndexError, ValueError):
self.binning = None
self.channel_names = [channel.name for channel in image.pixels.channels]
self.channel_names += [chr(97 + i) for i in range(len(self.channel_names), self.shape["c"])]
self.cnamelist = self.channel_names
try:
optovars = [objective for objective in instrument.objectives if "tubelens" in objective.id.lower()]
except AttributeError:
optovars = []
if len(optovars) == 0:
self.tubelens = None
else:
self.tubelens = optovars[0]
if self.objective:
if self.tubelens:
self.magnification = self.objective.nominal_magnification * self.tubelens.nominal_magnification
else:
self.magnification = self.objective.nominal_magnification
self.NA = self.objective.lens_na
else:
self.magnification = None
self.NA = None
self.gain = [
find(instrument.detectors, id=channel.detector_settings.id).amplification_gain
for channel in image.pixels.channels
if channel.detector_settings
and find(instrument.detectors, id=channel.detector_settings.id).amplification_gain
]
self.laserwavelengths = [
(channel.excitation_wavelength_quantity.to(self.ureg.nm).m,)
for channel in pixels.channels
if channel.excitation_wavelength_quantity
]
self.laserpowers = try_default(
lambda: [(1 - channel.light_source_settings.attenuation,) for channel in pixels.channels], []
)
self.filter = try_default( # type: ignore
lambda: [
find(instrument.filter_sets, id=channel.filter_set_ref.id).model for channel in image.pixels.channels
],
None,
)
self.pxsize_um = None if self.pxsize is None else self.pxsize.to(self.ureg.um).m
self.exposuretime_s = [None if i is None else i.to(self.ureg.s).m for i in self.exposuretime]
if axes is None:
self.axes = "".join(i for i in "cztyx" if self.shape[i] > 1)
elif axes.lower() == "full":
self.axes = "cztyx"
else:
self.axes = axes
self.slice = [np.arange(s, dtype=int) for s in self.shape.yxczt]
m = self.extrametadata
if m is not None:
try:
self.cyllens = m["CylLens"]
self.duolink = m["DLFilterSet"].split(" & ")[m["DLFilterChannel"]]
if "FeedbackChannels" in m:
self.feedback = m["FeedbackChannels"]
else:
self.feedback = m["FeedbackChannel"]
except Exception: # noqa
self.cyllens = ["None", "None"]
self.duolink = "None"
self.feedback = []
try:
self.cyllenschannels = np.where(
[self.cyllens[self.detector[c]].lower() != "none" for c in range(self.shape["c"])]
)[0].tolist()
except Exception: # noqa
pass
try:
s = int(re.findall(r"_(\d{3})_", self.duolink)[0]) * ureg.nm
except Exception: # noqa
s = 561 * ureg.nm
try:
sigma = []
for c, d in enumerate(self.detector):
emission = (np.hstack(self.laserwavelengths[c]) + 22) * ureg.nm
sigma.append([emission[emission > s].max(initial=0), emission[emission < s].max(initial=0)][d])
sigma = np.hstack(sigma)
sigma[sigma == 0] = 600 * ureg.nm
sigma /= 2 * self.NA * self.pxsize
self.sigma = sigma.magnitude.tolist() # type: ignore
except Exception: # noqa
self.sigma = [2] * self.shape["c"]
if not self.NA:
self.immersionN = 1
elif 1.5 < self.NA:
self.immersionN = 1.661
elif 1.3 < self.NA < 1.5:
self.immersionN = 1.518
elif 1 < self.NA < 1.3:
self.immersionN = 1.33
else:
self.immersionN = 1
p = re.compile(r"(\d+):(\d+)$")
try:
self.track, self.detector = zip(
*[
[
int(i)
for i in p.findall(
find(self.ome.images[self.series].pixels.channels, id=f"Channel:{c}").detector_settings.id
)[0]
]
for c in range(self.shape["c"])
]
)
except Exception: # noqa
pass
def main() -> None:
parser = ArgumentParser(description="Display info and save as tif")
parser.add_argument("-v", "--version", action="version", version=__version__)
parser.add_argument("file", help="image_file", type=str, nargs="*")
parser.add_argument(
"-w",
"--write",
help="path to tif/movie out, {folder}, {name} and {ext} take this from file in",
type=str,
default=None,
)
parser.add_argument("-o", "--extract_ome", help="extract ome to xml file", action="store_true")
parser.add_argument("-r", "--register", help="register channels", action="store_true")
parser.add_argument("-c", "--channel", help="channel", type=int, default=None)
parser.add_argument("-z", "--zslice", help="z-slice", type=int, default=None)
parser.add_argument("-t", "--time", help="time (frames) in python slicing notation", type=str, default=None)
parser.add_argument("-s", "--split", help="split channels", action="store_true")
parser.add_argument("-f", "--force", help="force overwrite", action="store_true")
parser.add_argument(
"--compression", help="compression when writing tiffs, zstd or deflate", type=str, default=None
)
parser.add_argument("-C", "--movie-colors", help="colors for channels in movie", type=str, nargs="*")
parser.add_argument("-V", "--movie_speed", help="speed of move, default = 25 / 7", type=float, default=None)
parser.add_argument("-B", "--movie-brightnesses", help="scale brightness of each channel", type=float, nargs="*")
parser.add_argument("-S", "--movie-scale", help="upscale movie xy size, int", type=float)
args = parser.parse_args()
for file in tqdm(args.file, desc="operating on files", disable=len(args.file) == 1):
file = Path(file)
with Imread(file) as im: # noqa
if args.register:
im = im.with_transform() # noqa
if args.write:
write = Path(args.write.format(folder=str(file.parent), name=file.stem, ext=file.suffix)).absolute() # noqa
write.parent.mkdir(parents=True, exist_ok=True)
if write.exists() and not args.force:
print(f"File {args.write} exists already, add the -f flag if you want to overwrite it.")
elif write.suffix in (".mkv", ".mp4", ".mov"):
im.save_as_movie(
write,
args.channel,
args.zslice,
args.time,
args.movie_colors,
args.movie_brightnesses,
args.movie_scale,
bar=len(args.file) == 1,
speed=args.movie_speed,
)
else:
im.save_as_tiff(
write,
args.channel,
args.zslice,
args.time,
args.split,
bar=len(args.file) == 1,
compression=args.compression,
)
if args.extract_ome:
with open(im.path.with_suffix(".ome.xml"), "w") as f:
f.write(im.ome.to_xml())
if len(args.file) == 1:
print(im.summary)
from .readers import * # noqa