- add metaseriesread

- add function to get all positions/series in a file
- make sure mp4 dimensions are even
This commit is contained in:
Wim Pomp
2024-09-13 11:48:38 +02:00
parent af600633cf
commit eea24e17ef
6 changed files with 120 additions and 18 deletions

View File

@@ -177,6 +177,11 @@ class OmeCache(DequeDict):
(path.with_suffix('.ome.xml').lstat() if path.with_suffix('.ome.xml').exists() else None)) (path.with_suffix('.ome.xml').lstat() if path.with_suffix('.ome.xml').exists() else None))
def get_positions(path: str | Path) -> Optional[list[int]]:
subclass = AbstractReader.get_subclass(path)
return subclass.get_positions(AbstractReader.split_path_series(path)[0])
class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
""" class to read image files, while taking good care of important metadata, """ class to read image files, while taking good care of important metadata,
currently optimized for .czi files, but can open anything that bioformats can handle currently optimized for .czi files, but can open anything that bioformats can handle
@@ -246,13 +251,10 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
pcf: Optional[list[float]] pcf: Optional[list[float]]
__frame__: Callable[[int, int, int], np.ndarray] __frame__: Callable[[int, int, int], np.ndarray]
def __new__(cls, path: Path | str | Imread | Any = None, dtype: DTypeLike = None, axes: str = None) -> Imread: @staticmethod
if cls is not Imread: def get_subclass(path: Path | str | Any):
return super().__new__(cls)
if len(AbstractReader.__subclasses__()) == 0: if len(AbstractReader.__subclasses__()) == 0:
raise Exception('Restart python kernel please!') raise Exception('Restart python kernel please!')
if isinstance(path, Imread):
return path
path, _ = AbstractReader.split_path_series(path) path, _ = AbstractReader.split_path_series(path)
for subclass in sorted(AbstractReader.__subclasses__(), key=lambda subclass_: subclass_.priority): for subclass in sorted(AbstractReader.__subclasses__(), key=lambda subclass_: subclass_.priority):
if subclass._can_open(path): # noqa if subclass._can_open(path): # noqa
@@ -261,10 +263,23 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
subclass_do_not_pickle = (subclass.do_not_pickle,) if isinstance(subclass.do_not_pickle, str) \ subclass_do_not_pickle = (subclass.do_not_pickle,) if isinstance(subclass.do_not_pickle, str) \
else subclass.do_not_pickle if hasattr(subclass, 'do_not_pickle') else () else subclass.do_not_pickle if hasattr(subclass, 'do_not_pickle') else ()
subclass.do_not_pickle = set(do_not_pickle).union(set(subclass_do_not_pickle)) subclass.do_not_pickle = set(do_not_pickle).union(set(subclass_do_not_pickle))
return subclass
return super().__new__(subclass)
raise ReaderNotFoundError(f'No reader found for {path}.') raise ReaderNotFoundError(f'No reader found for {path}.')
def __new__(cls, path: Path | str | Imread | Any = None, dtype: DTypeLike = None, axes: str = None) -> Imread:
if cls is not Imread:
return super().__new__(cls)
if isinstance(path, Imread):
return path
subclass = cls.get_subclass(path)
do_not_pickle = (AbstractReader.do_not_pickle,) if isinstance(AbstractReader.do_not_pickle, str) \
else AbstractReader.do_not_pickle
subclass_do_not_pickle = (subclass.do_not_pickle,) if isinstance(subclass.do_not_pickle, str) \
else subclass.do_not_pickle if hasattr(subclass, 'do_not_pickle') else ()
subclass.do_not_pickle = set(do_not_pickle).union(set(subclass_do_not_pickle))
return super().__new__(subclass)
def __init__(self, *args: Any, **kwargs: Any): def __init__(self, *args: Any, **kwargs: Any):
def parse(base: Imread = None, # noqa def parse(base: Imread = None, # noqa
slice: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray] = None, # noqa slice: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray] = None, # noqa
@@ -990,11 +1005,13 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
colors = colors or ('r', 'g', 'b')[:self.shape['c']] + max(0, self.shape['c'] - 3) * ('w',) colors = colors or ('r', 'g', 'b')[:self.shape['c']] + max(0, self.shape['c'] - 3) * ('w',)
brightnesses = brightnesses or (1,) * self.shape['c'] brightnesses = brightnesses or (1,) * self.shape['c']
scale = scale or 1 scale = scale or 1
shape_x = 2 * ((self.shape['x'] * scale + 1) // 2)
shape_y = 2 * ((self.shape['y'] * scale + 1) // 2)
with FFmpegWriter( with FFmpegWriter(
str(fname).format(name=self.path.stem, path=str(self.path.parent)), str(fname).format(name=self.path.stem, path=str(self.path.parent)),
outputdict={'-vcodec': 'libx264', '-preset': 'veryslow', '-pix_fmt': 'yuv420p', '-r': '7', outputdict={'-vcodec': 'libx264', '-preset': 'veryslow', '-pix_fmt': 'yuv420p', '-r': '7',
'-vf': f'setpts={25 / 7}*PTS,' '-vf': f'setpts={25 / 7}*PTS,scale={shape_x}:{shape_y}:flags=neighbor'}
f'scale={self.shape["x"] * scale}:{self.shape["y"] * scale}:flags=neighbor'}
) as movie: ) as movie:
im = self.transpose('tzcyx') im = self.transpose('tzcyx')
for t in trange(self.shape['t'], desc='Saving movie', disable=not bar): for t in trange(self.shape['t'], desc='Saving movie', disable=not bar):
@@ -1109,6 +1126,10 @@ class AbstractReader(Imread, metaclass=ABCMeta):
""" Override this method, and return true when the subclass can open the file """ """ Override this method, and return true when the subclass can open the file """
return False return False
@staticmethod
def get_positions(path: str | Path) -> Optional[list[int]]:
return None
@abstractmethod @abstractmethod
def __frame__(self, c: int, z: int, t: int) -> np.ndarray: def __frame__(self, c: int, z: int, t: int) -> np.ndarray:
""" Override this, return the frame at c, z, t """ """ Override this, return the frame at c, z, t """
@@ -1296,7 +1317,7 @@ def main() -> None:
parser.add_argument('-C', '--movie-colors', help='colors for channels in movie', type=str, nargs='*') parser.add_argument('-C', '--movie-colors', help='colors for channels in movie', type=str, nargs='*')
parser.add_argument('-B', '--movie-brightnesses', help='scale brightness of each channel', parser.add_argument('-B', '--movie-brightnesses', help='scale brightness of each channel',
type=float, nargs='*') type=float, nargs='*')
parser.add_argument('-S', '--movie-scale', help='upscale movie xy size, int', type=int) parser.add_argument('-S', '--movie-scale', help='upscale movie xy size, int', type=float)
args = parser.parse_args() args = parser.parse_args()
for file in tqdm(args.file, desc='operating on files', disable=len(args.file) == 1): for file in tqdm(args.file, desc='operating on files', disable=len(args.file) == 1):
@@ -1308,7 +1329,7 @@ def main() -> None:
write = Path(args.write.format(folder=str(file.parent), name=file.stem, ext=file.suffix)).absolute() # noqa write = Path(args.write.format(folder=str(file.parent), name=file.stem, ext=file.suffix)).absolute() # noqa
write.parent.mkdir(parents=True, exist_ok=True) write.parent.mkdir(parents=True, exist_ok=True)
if write.exists() and not args.force: if write.exists() and not args.force:
print(f'File {args.out} exists already, add the -f flag if you want to overwrite it.') print(f'File {args.write} exists already, add the -f flag if you want to overwrite it.')
elif write.suffix in ('.mkv', '.mp4'): elif write.suffix in ('.mkv', '.mp4'):
im.save_as_movie(write, args.channel, args.zslice, args.time, args.movie_colors, im.save_as_movie(write, args.channel, args.zslice, args.time, args.movie_colors,
args.movie_brightnesses, args.movie_scale, bar=len(args.file) == 1) args.movie_brightnesses, args.movie_scale, bar=len(args.file) == 1)

View File

@@ -1 +1 @@
__all__ = 'bfread', 'cziread', 'fijiread', 'ndread', 'seqread', 'tifread' __all__ = 'bfread', 'cziread', 'fijiread', 'ndread', 'seqread', 'tifread', 'metaseriesread'

View File

@@ -0,0 +1,80 @@
import re
from abc import ABC
from pathlib import Path
from typing import Optional
import tifffile
from ome_types import model
from ome_types.units import _quantity_property # noqa
from .. import AbstractReader
class Reader(AbstractReader, ABC):
priority = 20
do_not_pickle = 'last_tif'
@staticmethod
def _can_open(path):
return isinstance(path, Path) and (path.is_dir() or
(path.parent.is_dir() and path.name.lower().startswith('pos')))
@staticmethod
def get_positions(path: str | Path) -> Optional[list[int]]:
pat = re.compile(rf's(\d)_t\d+\.(tif|TIF)$')
return sorted({int(m.group(1)) for file in Path(path).iterdir() if (m := pat.search(file.name))})
def get_ome(self):
ome = model.OME()
tif = self.get_tif(0)
metadata = tif.metaseries_metadata
size_z = len(tif.pages)
page = tif.pages[0]
shape = {axis.lower(): size for axis, size in zip(page.axes, page.shape)}
size_x, size_y = shape['x'], shape['y']
ome.instruments.append(model.Instrument())
size_c = 1
size_t = max(self.filedict.keys()) + 1
pixel_type = f"uint{metadata['PlaneInfo']['bits-per-pixel']}"
ome.images.append(
model.Image(
pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t,
size_x=size_x, size_y=size_y,
dimension_order='XYCZT', type=pixel_type),
objective_settings=model.ObjectiveSettings(id='Objective:0')))
return ome
def open(self):
pat = re.compile(rf's{self.series}_t\d+\.(tif|TIF)$')
filelist = sorted([file for file in self.path.iterdir() if pat.search(file.name)])
pattern = re.compile(r't(\d+)$')
self.filedict = {int(pattern.search(file.stem).group(1)) - 1: file for file in filelist}
if len(self.filedict) == 0:
raise FileNotFoundError
self.last_tif = 0, tifffile.TiffFile(self.filedict[0])
def close(self) -> None:
self.last_tif[1].close()
def get_tif(self, t: int = None):
last_t, tif = self.last_tif
if (t is None or t == last_t) and not tif.filehandle.closed:
return tif
else:
tif.close()
tif = tifffile.TiffFile(self.filedict[t])
self.last_tif = t, tif
return tif
def __frame__(self, c=0, z=0, t=0):
tif = self.get_tif(t)
page = tif.pages[z]
if page.axes.upper() == 'YX':
return page.asarray()
elif page.axes.upper() == 'XY':
return page.asarray().T
else:
raise NotImplementedError(f'reading axes {page.axes} is not implemented')

View File

@@ -46,9 +46,6 @@ class Reader(AbstractReader, ABC):
self.path = 'numpy array' self.path = 'numpy array'
def __frame__(self, c, z, t): def __frame__(self, c, z, t):
# xyczt = (slice(None), slice(None), c, z, t)
# in_idx = tuple(xyczt['xyczt'.find(i)] for i in self.axes)
# print(f'{in_idx = }')
frame = self.array[:, :, c, z, t] frame = self.array[:, :, c, z, t]
if self.axes.find('y') > self.axes.find('x'): if self.axes.find('y') > self.axes.find('x'):
return frame.T return frame.T

View File

@@ -46,7 +46,11 @@ class Reader(AbstractReader, ABC):
@staticmethod @staticmethod
def _can_open(path): def _can_open(path):
return isinstance(path, Path) and path.is_dir() if isinstance(path, Path) and path.is_dir():
files = [file for file in path.iterdir() if file.name.lower().startswith('pos')]
return len(files) > 0 and files[0].is_dir()
else:
return False
def get_ome(self): def get_ome(self):
ome = model.OME() ome = model.OME()
@@ -117,7 +121,7 @@ class Reader(AbstractReader, ABC):
else: else:
path = self.path path = self.path
pat = re.compile(r'^img_\d{3,}.*\d{3,}.*\.tif$') pat = re.compile(r'^img_\d{3,}.*\d{3,}.*\.(tif|TIF)$')
filelist = sorted([file for file in path.iterdir() if pat.search(file.name)]) filelist = sorted([file for file in path.iterdir() if pat.search(file.name)])
with tifffile.TiffFile(self.path / filelist[0]) as tif: with tifffile.TiffFile(self.path / filelist[0]) as tif:
metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()} metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()}

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "ndbioimage" name = "ndbioimage"
version = "2024.9.0" version = "2024.9.1"
description = "Bio image reading, metadata and some affine registration." description = "Bio image reading, metadata and some affine registration."
authors = ["W. Pomp <w.pomp@nki.nl>"] authors = ["W. Pomp <w.pomp@nki.nl>"]
license = "GPLv3" license = "GPLv3"