- replace "" with ''

- bugfix in seqread finding images with pattern 00-Pos_000_000
- cache ome metadata
- detect faulty time delta data in czi files
- read ome from path.ome.xml if this file exists
- add extract-ome command line option
This commit is contained in:
Wim Pomp
2024-04-02 18:23:28 +02:00
parent 7d06db4ecd
commit 41658bea79
11 changed files with 355 additions and 271 deletions

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
import multiprocessing import multiprocessing
import re import re
import warnings import warnings
@@ -12,13 +14,14 @@ from numbers import Number
from operator import truediv from operator import truediv
from pathlib import Path from pathlib import Path
from traceback import print_exc from traceback import print_exc
from typing import Any, Callable, Mapping, Optional
import numpy as np import numpy as np
import ome_types import ome_types
import yaml import yaml
from ome_types import model, ureg, OME from ome_types import OME, model, ureg
from pint import set_application_registry from pint import set_application_registry
from tiffwrite import IJTiffFile from tiffwrite import IFD, IJTiffFile
from tqdm.auto import tqdm from tqdm.auto import tqdm
from .jvm import JVM from .jvm import JVM
@@ -48,34 +51,34 @@ class ReaderNotFoundError(Exception):
class TransformTiff(IJTiffFile): class TransformTiff(IJTiffFile):
""" transform frames in a parallel process to speed up saving """ """ transform frames in a parallel process to speed up saving """
def __init__(self, image, *args, **kwargs): def __init__(self, image: Imread, *args: Any, **kwargs: Any) -> None:
self.image = image self.image = image
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def compress_frame(self, frame): def compress_frame(self, frame: tuple[int, int, int]) -> tuple[IFD, tuple[list[int], list[int]]]:
return super().compress_frame(np.asarray(self.image(*frame)).astype(self.dtype)) return super().compress_frame(np.asarray(self.image(*frame)).astype(self.dtype))
class DequeDict(OrderedDict): class DequeDict(OrderedDict):
def __init__(self, maxlen=None, *args, **kwargs): def __init__(self, maxlen: int = None, *args: Any, **kwargs: Any) -> None:
self.maxlen = maxlen self.maxlen = maxlen
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def __truncate__(self): def __truncate__(self) -> None:
if self.maxlen is not None: if self.maxlen is not None:
while len(self) > self.maxlen: while len(self) > self.maxlen:
self.popitem(False) self.popitem(False)
def __setitem__(self, *args, **kwargs): def __setitem__(self, *args: Any, **kwargs: Any) -> None:
super().__setitem__(*args, **kwargs) super().__setitem__(*args, **kwargs)
self.__truncate__() self.__truncate__()
def update(self, *args, **kwargs): def update(self, *args: Any, **kwargs: Any) -> None:
super().update(*args, **kwargs) super().update(*args, **kwargs)
self.__truncate__() self.__truncate__()
def find(obj, **kwargs): def find(obj: Mapping, **kwargs: Any) -> Any:
for item in obj: for item in obj:
try: try:
if all([getattr(item, key) == value for key, value in kwargs.items()]): if all([getattr(item, key) == value for key, value in kwargs.items()]):
@@ -84,14 +87,14 @@ def find(obj, **kwargs):
pass pass
def try_default(fun, default, *args, **kwargs): def try_default(fun: Callable, default: Any, *args: Any, **kwargs: Any) -> Any:
try: try:
return fun(*args, **kwargs) return fun(*args, **kwargs)
except Exception: # noqa except Exception: # noqa
return default return default
def get_ome(path): def bioformats_ome(path: str | Path) -> OME:
from .readers.bfread import jars from .readers.bfread import jars
try: try:
jvm = JVM(jars) # noqa jvm = JVM(jars) # noqa
@@ -109,24 +112,70 @@ def get_ome(path):
class Shape(tuple): class Shape(tuple):
def __new__(cls, shape, axes='yxczt'): def __new__(cls, shape: tuple[int] | Shape[int], axes: str = 'yxczt') -> Shape[int]:
if isinstance(shape, Shape): if isinstance(shape, Shape):
axes = shape.axes axes = shape.axes # type: ignore
instance = super().__new__(cls, shape) instance = super().__new__(cls, shape)
instance.axes = axes.lower() instance.axes = axes.lower()
return instance return instance # type: ignore
def __getitem__(self, n): def __getitem__(self, n: int | str) -> int | tuple[int]:
if isinstance(n, str): if isinstance(n, str):
if len(n) == 1: if len(n) == 1:
return self[self.axes.find(n.lower())] if n.lower() in self.axes else 1 return self[self.axes.find(n.lower())] if n.lower() in self.axes else 1
else: else:
return tuple(self[i] for i in n) return tuple(self[i] for i in n) # type: ignore
return super().__getitem__(n) return super().__getitem__(n)
@cached_property @cached_property
def yxczt(self): def yxczt(self) -> tuple[int, int, int, int, int]:
return tuple(self[i] for i in 'yxczt') return tuple(self[i] for i in 'yxczt') # type: ignore
class CachedPath(Path):
""" helper class for checking whether a file has changed, used by OmeCache """
def __init__(self, path: Path | str) -> None:
super().__init__(path)
if self.exists():
self._lstat = super().lstat() # save file metadata like creation time etc.
else:
self._lstat = None
def __eq__(self, other: Path | CachedPath) -> bool:
return super().__eq__(other) and self.lstat() == other.lstat()
def __hash__(self) -> int:
return hash((super().__hash__(), self.lstat()))
def lstat(self):
return self._lstat
class OmeCache(DequeDict):
""" prevent (potentially expensive) rereading of ome data by caching """
instance = None
def __new__(cls) -> OmeCache:
if cls.instance is None:
cls.instance = super().__new__(cls)
return cls.instance
def __init__(self) -> None:
super().__init__(64)
def __reduce__(self) -> tuple[type, tuple]:
return self.__class__, ()
def __getitem__(self, item: Path | CachedPath) -> OME:
return super().__getitem__(CachedPath(item))
def __setitem__(self, key: Path | CachedPath, value: OME) -> None:
super().__setitem__(CachedPath(key), value)
def __contains__(self, item: Path | CachedPath) -> bool:
return super().__contains__(CachedPath(item))
class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
@@ -246,7 +295,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
def __getitem__(self, n): def __getitem__(self, n):
""" slice like a numpy array but return an Imread instance """ """ slice like a numpy array but return an Imread instance """
if self.isclosed: if self.isclosed:
raise OSError("file is closed") raise OSError('file is closed')
if isinstance(n, (slice, Number)): # None = : if isinstance(n, (slice, Number)): # None = :
n = (n,) n = (n,)
elif isinstance(n, type(Ellipsis)): elif isinstance(n, type(Ellipsis)):
@@ -520,11 +569,11 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
@property @property
def summary(self): def summary(self):
""" gives a helpful summary of the recorded experiment """ """ gives a helpful summary of the recorded experiment """
s = [f"path/filename: {self.path}", s = [f'path/filename: {self.path}',
f"series/pos: {self.series}", f'series/pos: {self.series}',
f"reader: {self.base.__class__.__module__.split('.')[-1]}"] f"reader: {self.base.__class__.__module__.split('.')[-1]}"]
s.extend((f"dtype: {self.dtype}", s.extend((f'dtype: {self.dtype}',
f"shape ({self.axes}):".ljust(15) + f"{' x '.join(str(i) for i in self.shape)}")) f'shape ({self.axes}):'.ljust(15) + f"{' x '.join(str(i) for i in self.shape)}"))
if self.pxsize_um: if self.pxsize_um:
s.append(f'pixel size: {1000 * self.pxsize_um:.2f} nm') s.append(f'pixel size: {1000 * self.pxsize_um:.2f} nm')
if self.zstack and self.deltaz_um: if self.zstack and self.deltaz_um:
@@ -818,11 +867,13 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
return [self.get_channel(c) for c in czt[0]], *czt[1:] return [self.get_channel(c) for c in czt[0]], *czt[1:]
@staticmethod @staticmethod
def get_ome(path: [str, Path]) -> OME: def bioformats_ome(path: [str, Path]) -> OME:
""" Use java BioFormats to make an ome metadata structure. """ """ Use java BioFormats to make an ome metadata structure. """
with multiprocessing.get_context('spawn').Pool(1) as pool: with multiprocessing.get_context('spawn').Pool(1) as pool:
ome = pool.map(get_ome, (path,))[0] return pool.map(bioformats_ome, (path,))[0]
@staticmethod
def fix_ome(ome: OME) -> OME:
# fix ome if necessary # fix ome if necessary
for image in ome.images: for image in ome.images:
try: try:
@@ -838,9 +889,25 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
pass pass
return ome return ome
@staticmethod
def read_ome(path: [str, Path]) -> Optional[OME]:
path = Path(path)
if path.with_suffix('.ome.xml').exists():
return OME.from_xml(path.with_suffix('.ome.xml'))
def get_ome(self) -> OME:
""" overload this """
return self.bioformats_ome(self.path)
@cached_property @cached_property
def ome(self) -> OME: def ome(self) -> OME:
return self.get_ome(self.path) cache = OmeCache()
if self.path not in cache:
ome = self.read_ome(self.path)
if ome is None:
ome = self.get_ome()
cache[self.path] = self.fix_ome(ome)
return cache[self.path]
def is_noise(self, volume=None): def is_noise(self, volume=None):
""" True if volume only has noise """ """ True if volume only has noise """
@@ -885,7 +952,7 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC):
shape = [len(i) for i in n] shape = [len(i) for i in n]
with TransformTiff(self, fname.with_suffix('.tif'), shape, pixel_type, with TransformTiff(self, fname.with_suffix('.tif'), shape, pixel_type,
pxsize=self.pxsize_um, deltaz=self.deltaz_um, **kwargs) as tif: pxsize=self.pxsize_um, deltaz=self.deltaz_um, **kwargs) as tif:
for i, m in tqdm(zip(product(*[range(s) for s in shape]), product(*n)), # noqa for i, m in tqdm(zip(product(*[range(s) for s in shape]), product(*n)), # noqa
total=np.prod(shape), desc='Saving tiff', disable=not bar): total=np.prod(shape), desc='Saving tiff', disable=not bar):
tif.save(m, *i) tif.save(m, *i)
@@ -1000,7 +1067,7 @@ class AbstractReader(Imread, metaclass=ABCMeta):
self.open() self.open()
# extract some metadata from ome # extract some metadata from ome
instrument = self.ome.instruments[0] if self.ome.instruments else None instrument = self.ome.instruments[0] if self.ome.instruments else None
image = self.ome.images[self.series] image = self.ome.images[self.series if len(self.ome.images) > 1 else 0]
pixels = image.pixels pixels = image.pixels
self.shape = pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t self.shape = pixels.size_y, pixels.size_x, pixels.size_c, pixels.size_z, pixels.size_t
self.dtype = pixels.type.value if dtype is None else dtype self.dtype = pixels.type.value if dtype is None else dtype
@@ -1016,8 +1083,8 @@ class AbstractReader(Imread, metaclass=ABCMeta):
self.deltaz_um = None if self.deltaz is None else self.deltaz.to(self.ureg.um).m self.deltaz_um = None if self.deltaz is None else self.deltaz.to(self.ureg.um).m
else: else:
self.deltaz = self.deltaz_um = None self.deltaz = self.deltaz_um = None
if self.ome.images[self.series].objective_settings: if image.objective_settings:
self.objective = find(instrument.objectives, id=self.ome.images[self.series].objective_settings.id) self.objective = find(instrument.objectives, id=image.objective_settings.id)
else: else:
self.objective = None self.objective = None
try: try:
@@ -1130,6 +1197,7 @@ def main():
parser = ArgumentParser(description='Display info and save as tif') parser = ArgumentParser(description='Display info and save as tif')
parser.add_argument('file', help='image_file') parser.add_argument('file', help='image_file')
parser.add_argument('out', help='path to tif out', type=str, default=None, nargs='?') parser.add_argument('out', help='path to tif out', type=str, default=None, nargs='?')
parser.add_argument('-o', '--extract_ome', help='extract ome to xml file', action='store_true')
parser.add_argument('-r', '--register', help='register channels', action='store_true') parser.add_argument('-r', '--register', help='register channels', action='store_true')
parser.add_argument('-c', '--channel', help='channel', type=int, default=None) parser.add_argument('-c', '--channel', help='channel', type=int, default=None)
parser.add_argument('-z', '--zslice', help='z-slice', type=int, default=None) parser.add_argument('-z', '--zslice', help='z-slice', type=int, default=None)
@@ -1149,6 +1217,9 @@ def main():
print(f'File {args.out} exists already, add the -f flag if you want to overwrite it.') print(f'File {args.out} exists already, add the -f flag if you want to overwrite it.')
else: else:
im.save_as_tiff(out, args.channel, args.zslice, args.time, args.split) im.save_as_tiff(out, args.channel, args.zslice, args.time, args.split)
if args.extract_ome:
with open(im.path.with_suffix('.ome.xml'), 'w') as f:
f.write(im.ome.to_xml())
from .readers import * from .readers import *

View File

@@ -1,6 +1,6 @@
import re import re
import warnings
from abc import ABC from abc import ABC
from functools import cached_property
from io import BytesIO from io import BytesIO
from itertools import product from itertools import product
from pathlib import Path from pathlib import Path
@@ -173,18 +173,17 @@ class Reader(AbstractReader, ABC):
def close(self): def close(self):
self.reader.close() self.reader.close()
@cached_property def get_ome(self):
def ome(self):
xml = self.reader.metadata() xml = self.reader.metadata()
attachments = {i.attachment_entry.name: i.attachment_entry.data_segment() attachments = {i.attachment_entry.name: i.attachment_entry.data_segment()
for i in self.reader.attachments()} for i in self.reader.attachments()}
tree = etree.fromstring(xml) tree = etree.fromstring(xml)
metadata = tree.find("Metadata") metadata = tree.find('Metadata')
version = metadata.find("Version") version = metadata.find('Version')
if version is not None: if version is not None:
version = version.text version = version.text
else: else:
version = metadata.find("Experiment").attrib["Version"] version = metadata.find('Experiment').attrib['Version']
if version == '1.0': if version == '1.0':
return self.ome_10(tree, attachments) return self.ome_10(tree, attachments)
@@ -200,58 +199,58 @@ class Reader(AbstractReader, ABC):
ome = model.OME() ome = model.OME()
metadata = tree.find("Metadata") metadata = tree.find('Metadata')
information = metadata.find("Information") information = metadata.find('Information')
display_setting = metadata.find("DisplaySetting") display_setting = metadata.find('DisplaySetting')
ome.experimenters = [model.Experimenter(id="Experimenter:0", ome.experimenters = [model.Experimenter(id='Experimenter:0',
user_name=information.find("Document").find("UserName").text)] user_name=information.find('Document').find('UserName').text)]
instrument = information.find("Instrument") instrument = information.find('Instrument')
for _ in instrument.find("Microscopes"): for _ in instrument.find('Microscopes'):
ome.instruments.append(model.Instrument(id='Instrument:0')) ome.instruments.append(model.Instrument(id='Instrument:0'))
for detector in instrument.find("Detectors"): for detector in instrument.find('Detectors'):
try: try:
detector_type = model.Detector_Type(text(detector.find("Type")).upper() or "") detector_type = model.Detector_Type(text(detector.find('Type')).upper() or "")
except ValueError: except ValueError:
detector_type = model.Detector_Type.OTHER detector_type = model.Detector_Type.OTHER
ome.instruments[0].detectors.append( ome.instruments[0].detectors.append(
model.Detector( model.Detector(
id=detector.attrib["Id"].replace(' ', ''), model=text(detector.find("Manufacturer").find("Model")), id=detector.attrib['Id'].replace(' ', ''), model=text(detector.find('Manufacturer').find('Model')),
type=detector_type type=detector_type
)) ))
for objective in instrument.find("Objectives"): for objective in instrument.find('Objectives'):
ome.instruments[0].objectives.append( ome.instruments[0].objectives.append(
model.Objective( model.Objective(
id=objective.attrib["Id"], id=objective.attrib['Id'],
model=text(objective.find("Manufacturer").find("Model")), model=text(objective.find('Manufacturer').find('Model')),
immersion=text(objective.find("Immersion")), immersion=text(objective.find('Immersion')),
lens_na=float(text(objective.find("LensNA"))), lens_na=float(text(objective.find('LensNA'))),
nominal_magnification=float(text(objective.find("NominalMagnification"))))) nominal_magnification=float(text(objective.find('NominalMagnification')))))
for tubelens in instrument.find("TubeLenses"): for tubelens in instrument.find('TubeLenses'):
try: try:
nominal_magnification = float(re.findall(r'\d+(?:[,.]\d*)?', nominal_magnification = float(re.findall(r'\d+(?:[,.]\d*)?',
tubelens.attrib["Name"])[0].replace(',', '.')) tubelens.attrib['Name'])[0].replace(',', '.'))
except Exception: except Exception:
nominal_magnification = 1.0 nominal_magnification = 1.0
ome.instruments[0].objectives.append( ome.instruments[0].objectives.append(
model.Objective( model.Objective(
id=f'Objective:{tubelens.attrib["Id"]}', id=f"Objective:{tubelens.attrib['Id']}",
model=tubelens.attrib["Name"], model=tubelens.attrib['Name'],
nominal_magnification=nominal_magnification)) nominal_magnification=nominal_magnification))
for light_source in def_list(instrument.find("LightSources")): for light_source in def_list(instrument.find('LightSources')):
if light_source.find("LightSourceType").find("Laser") is not None: if light_source.find('LightSourceType').find('Laser') is not None:
ome.instruments[0].lasers.append( ome.instruments[0].lasers.append(
model.Laser( model.Laser(
id=f'LightSource:{light_source.attrib["Id"]}', id=f"LightSource:{light_source.attrib['Id']}",
power=float(text(light_source.find("Power"))), power=float(text(light_source.find('Power'))),
wavelength=float(light_source.attrib["Id"][-3:]))) wavelength=float(light_source.attrib['Id'][-3:])))
x_min = min([f.start[f.axes.index('X')] for f in self.filedict[0, 0, 0]]) x_min = min([f.start[f.axes.index('X')] for f in self.filedict[0, 0, 0]])
y_min = min([f.start[f.axes.index('Y')] for f in self.filedict[0, 0, 0]]) y_min = min([f.start[f.axes.index('Y')] for f in self.filedict[0, 0, 0]])
@@ -262,14 +261,14 @@ class Reader(AbstractReader, ABC):
size_c, size_z, size_t = (self.reader.shape[self.reader.axes.index(directory_entry)] size_c, size_z, size_t = (self.reader.shape[self.reader.axes.index(directory_entry)]
for directory_entry in 'CZT') for directory_entry in 'CZT')
image = information.find("Image") image = information.find('Image')
pixel_type = text(image.find("PixelType"), "Gray16") pixel_type = text(image.find('PixelType'), 'Gray16')
if pixel_type.startswith("Gray"): if pixel_type.startswith('Gray'):
pixel_type = "uint" + pixel_type[4:] pixel_type = 'uint' + pixel_type[4:]
objective_settings = image.find("ObjectiveSettings") objective_settings = image.find('ObjectiveSettings')
try: # TODO try: # TODO
scenes = image.find("Dimensions").find("S").find("Scenes") scenes = image.find('Dimensions').find('S').find('Scenes')
center_position = [float(pos) for pos in text(scenes[0].find("CenterPosition")).split(',')] center_position = [float(pos) for pos in text(scenes[0].find('CenterPosition')).split(',')]
except AttributeError: except AttributeError:
center_position = [0, 0] center_position = [0, 0]
um = model.UnitsLength.MICROMETER um = model.UnitsLength.MICROMETER
@@ -277,97 +276,103 @@ class Reader(AbstractReader, ABC):
ome.images.append( ome.images.append(
model.Image( model.Image(
id="Image:0", id='Image:0',
name=f'{text(information.find("Document").find("Name"))} #1', name=f"{text(information.find('Document').find('Name'))} #1",
pixels=model.Pixels( pixels=model.Pixels(
id="Pixels:0", size_x=size_x, size_y=size_y, id='Pixels:0', size_x=size_x, size_y=size_y,
size_c=size_c, size_z=size_z, size_t=size_t, size_c=size_c, size_z=size_z, size_t=size_t,
dimension_order="XYCZT", type=pixel_type, dimension_order='XYCZT', type=pixel_type,
significant_bits=int(text(image.find("ComponentBitCount"))), significant_bits=int(text(image.find('ComponentBitCount'))),
big_endian=False, interleaved=False, metadata_only=True), big_endian=False, interleaved=False, metadata_only=True),
experimenter_ref=model.ExperimenterRef(id='Experimenter:0'), experimenter_ref=model.ExperimenterRef(id='Experimenter:0'),
instrument_ref=model.InstrumentRef(id='Instrument:0'), instrument_ref=model.InstrumentRef(id='Instrument:0'),
objective_settings=model.ObjectiveSettings( objective_settings=model.ObjectiveSettings(
id=objective_settings.find("ObjectiveRef").attrib["Id"], id=objective_settings.find('ObjectiveRef').attrib['Id'],
medium=text(objective_settings.find("Medium")), medium=text(objective_settings.find('Medium')),
refractive_index=float(text(objective_settings.find("RefractiveIndex")))), refractive_index=float(text(objective_settings.find('RefractiveIndex')))),
stage_label=model.StageLabel( stage_label=model.StageLabel(
name=f"Scene position #0", name=f'Scene position #0',
x=center_position[0], x_unit=um, x=center_position[0], x_unit=um,
y=center_position[1], y_unit=um))) y=center_position[1], y_unit=um)))
for distance in metadata.find("Scaling").find("Items"): for distance in metadata.find('Scaling').find('Items'):
if distance.attrib["Id"] == "X": if distance.attrib['Id'] == 'X':
ome.images[0].pixels.physical_size_x = float(text(distance.find("Value"))) * 1e6 ome.images[0].pixels.physical_size_x = float(text(distance.find('Value'))) * 1e6
elif distance.attrib["Id"] == "Y": elif distance.attrib['Id'] == 'Y':
ome.images[0].pixels.physical_size_y = float(text(distance.find("Value"))) * 1e6 ome.images[0].pixels.physical_size_y = float(text(distance.find('Value'))) * 1e6
elif size_z > 1 and distance.attrib["Id"] == "Z": elif size_z > 1 and distance.attrib['Id'] == 'Z':
ome.images[0].pixels.physical_size_z = float(text(distance.find("Value"))) * 1e6 ome.images[0].pixels.physical_size_z = float(text(distance.find('Value'))) * 1e6
channels_im = {channel.attrib["Id"]: channel for channel in image.find("Dimensions").find("Channels")} channels_im = {channel.attrib['Id']: channel for channel in image.find('Dimensions').find('Channels')}
channels_ds = {channel.attrib["Id"]: channel for channel in display_setting.find("Channels")} channels_ds = {channel.attrib['Id']: channel for channel in display_setting.find('Channels')}
for idx, (key, channel) in enumerate(channels_im.items()): for idx, (key, channel) in enumerate(channels_im.items()):
detector_settings = channel.find("DetectorSettings") detector_settings = channel.find('DetectorSettings')
laser_scan_info = channel.find("LaserScanInfo") laser_scan_info = channel.find('LaserScanInfo')
detector = detector_settings.find("Detector") detector = detector_settings.find('Detector')
try: try:
binning = model.Binning(text(detector_settings.find("Binning"))) binning = model.Binning(text(detector_settings.find('Binning')))
except ValueError: except ValueError:
binning = model.Binning.OTHER binning = model.Binning.OTHER
light_sources_settings = channel.find("LightSourcesSettings") light_sources_settings = channel.find('LightSourcesSettings')
# no space in ome for multiple lightsources simultaneously # no space in ome for multiple lightsources simultaneously
if light_sources_settings is not None: if light_sources_settings is not None:
light_source_settings = light_sources_settings[0] light_source_settings = light_sources_settings[0]
light_source_settings = model.LightSourceSettings( light_source_settings = model.LightSourceSettings(
id="LightSource:" + "_".join([light_source_settings.find("LightSource").attrib["Id"] id='LightSource:' + '_'.join([light_source_settings.find('LightSource').attrib['Id']
for light_source_settings in light_sources_settings]), for light_source_settings in light_sources_settings]),
attenuation=float(text(light_source_settings.find("Attenuation"))), attenuation=float(text(light_source_settings.find('Attenuation'))),
wavelength=float(text(light_source_settings.find("Wavelength"))), wavelength=float(text(light_source_settings.find('Wavelength'))),
wavelength_unit=nm) wavelength_unit=nm)
else: else:
light_source_settings = None light_source_settings = None
ome.images[0].pixels.channels.append( ome.images[0].pixels.channels.append(
model.Channel( model.Channel(
id=f"Channel:{idx}", id=f'Channel:{idx}',
name=channel.attrib["Name"], name=channel.attrib['Name'],
acquisition_mode=text(channel.find("AcquisitionMode")).replace('SingleMoleculeLocalisation', acquisition_mode=text(channel.find('AcquisitionMode')).replace('SingleMoleculeLocalisation',
'SingleMoleculeImaging'), 'SingleMoleculeImaging'),
color=model.Color(text(channels_ds[channel.attrib["Id"]].find("Color"), 'white')), color=model.Color(text(channels_ds[channel.attrib['Id']].find('Color'), 'white')),
detector_settings=model.DetectorSettings( detector_settings=model.DetectorSettings(
id=detector.attrib["Id"].replace(" ", ""), id=detector.attrib['Id'].replace(' ', ""),
binning=binning), binning=binning),
emission_wavelength=i if (i := text(channel.find("EmissionWavelength"))) != '0' else '100', emission_wavelength=i if (i := text(channel.find('EmissionWavelength'))) != '0' else '100',
excitation_wavelength=text(channel.find("ExcitationWavelength")), excitation_wavelength=text(channel.find('ExcitationWavelength')),
# filter_set_ref=model.FilterSetRef(id=ome.instruments[0].filter_sets[filterset_idx].id), # filter_set_ref=model.FilterSetRef(id=ome.instruments[0].filter_sets[filterset_idx].id),
illumination_type=text(channel.find("IlluminationType")), illumination_type=text(channel.find('IlluminationType')),
light_source_settings=light_source_settings, light_source_settings=light_source_settings,
samples_per_pixel=int(text(laser_scan_info.find("Averaging"), "1")))) samples_per_pixel=int(text(laser_scan_info.find('Averaging'), '1'))))
exposure_times = [float(text(channel.find("LaserScanInfo").find("FrameTime"), "100")) for channel in exposure_times = [float(text(channel.find('LaserScanInfo').find('FrameTime'), '100')) for channel in
channels_im.values()] channels_im.values()]
delta_ts = attachments['TimeStamps'].data() delta_ts = attachments['TimeStamps'].data()
dt = np.diff(delta_ts)
if np.std(dt) / np.mean(dt) > 0.02:
dt = np.median(dt[dt > 0])
delta_ts = dt * np.arange(len(delta_ts))
warnings.warn(f'delta_t is inconsistent, using median value: {dt}')
for t, z, c in product(range(size_t), range(size_z), range(size_c)): for t, z, c in product(range(size_t), range(size_z), range(size_c)):
ome.images[0].pixels.planes.append( ome.images[0].pixels.planes.append(
model.Plane(the_c=c, the_z=z, the_t=t, delta_t=delta_ts[t], exposure_time=exposure_times[c])) model.Plane(the_c=c, the_z=z, the_t=t, delta_t=delta_ts[t], exposure_time=exposure_times[c]))
idx = 0 idx = 0
for layer in [] if (ml := metadata.find("Layers")) is None else ml: for layer in [] if (ml := metadata.find('Layers')) is None else ml:
rectangle = layer.find("Elements").find("Rectangle") rectangle = layer.find('Elements').find('Rectangle')
if rectangle is not None: if rectangle is not None:
geometry = rectangle.find("Geometry") geometry = rectangle.find('Geometry')
roi = model.ROI(id=f"ROI:{idx}", description=text(layer.find("Usage"))) roi = model.ROI(id=f'ROI:{idx}', description=text(layer.find('Usage')))
roi.union.append( roi.union.append(
model.Rectangle( model.Rectangle(
id='Shape:0:0', id='Shape:0:0',
height=float(text(geometry.find("Height"))), height=float(text(geometry.find('Height'))),
width=float(text(geometry.find("Width"))), width=float(text(geometry.find('Width'))),
x=float(text(geometry.find("Left"))), x=float(text(geometry.find('Left'))),
y=float(text(geometry.find("Top"))))) y=float(text(geometry.find('Top')))))
ome.rois.append(roi) ome.rois.append(roi)
ome.images[0].roi_refs.append(model.ROIRef(id=f"ROI:{idx}")) ome.images[0].roi_refs.append(model.ROIRef(id=f'ROI:{idx}'))
idx += 1 idx += 1
return ome return ome
@@ -380,71 +385,71 @@ class Reader(AbstractReader, ABC):
ome = model.OME() ome = model.OME()
metadata = tree.find("Metadata") metadata = tree.find('Metadata')
information = metadata.find("Information") information = metadata.find('Information')
display_setting = metadata.find("DisplaySetting") display_setting = metadata.find('DisplaySetting')
experiment = metadata.find("Experiment") experiment = metadata.find('Experiment')
acquisition_block = experiment.find("ExperimentBlocks").find("AcquisitionBlock") acquisition_block = experiment.find('ExperimentBlocks').find('AcquisitionBlock')
ome.experimenters = [model.Experimenter(id="Experimenter:0", ome.experimenters = [model.Experimenter(id='Experimenter:0',
user_name=information.find("User").find("DisplayName").text)] user_name=information.find('User').find('DisplayName').text)]
instrument = information.find("Instrument") instrument = information.find('Instrument')
ome.instruments.append(model.Instrument(id=instrument.attrib["Id"])) ome.instruments.append(model.Instrument(id=instrument.attrib['Id']))
for detector in instrument.find("Detectors"): for detector in instrument.find('Detectors'):
try: try:
detector_type = model.Detector_Type(text(detector.find("Type")).upper() or "") detector_type = model.Detector_Type(text(detector.find('Type')).upper() or "")
except ValueError: except ValueError:
detector_type = model.Detector_Type.OTHER detector_type = model.Detector_Type.OTHER
ome.instruments[0].detectors.append( ome.instruments[0].detectors.append(
model.Detector( model.Detector(
id=detector.attrib["Id"], model=text(detector.find("Manufacturer").find("Model")), id=detector.attrib['Id'], model=text(detector.find('Manufacturer').find('Model')),
amplification_gain=float(text(detector.find("AmplificationGain"))), amplification_gain=float(text(detector.find('AmplificationGain'))),
gain=float(text(detector.find("Gain"))), zoom=float(text(detector.find("Zoom"))), gain=float(text(detector.find('Gain'))), zoom=float(text(detector.find('Zoom'))),
type=detector_type type=detector_type
)) ))
for objective in instrument.find("Objectives"): for objective in instrument.find('Objectives'):
ome.instruments[0].objectives.append( ome.instruments[0].objectives.append(
model.Objective( model.Objective(
id=objective.attrib["Id"], id=objective.attrib['Id'],
model=text(objective.find("Manufacturer").find("Model")), model=text(objective.find('Manufacturer').find('Model')),
immersion=text(objective.find("Immersion")), immersion=text(objective.find('Immersion')),
lens_na=float(text(objective.find("LensNA"))), lens_na=float(text(objective.find('LensNA'))),
nominal_magnification=float(text(objective.find("NominalMagnification"))))) nominal_magnification=float(text(objective.find('NominalMagnification')))))
for light_source in def_list(instrument.find("LightSources")): for light_source in def_list(instrument.find('LightSources')):
if light_source.find("LightSourceType").find("Laser") is not None: if light_source.find('LightSourceType').find('Laser') is not None:
ome.instruments[0].lasers.append( ome.instruments[0].lasers.append(
model.Laser( model.Laser(
id=light_source.attrib["Id"], id=light_source.attrib['Id'],
model=text(light_source.find("Manufacturer").find("Model")), model=text(light_source.find('Manufacturer').find('Model')),
power=float(text(light_source.find("Power"))), power=float(text(light_source.find('Power'))),
wavelength=float( wavelength=float(
text(light_source.find("LightSourceType").find("Laser").find("Wavelength"))))) text(light_source.find('LightSourceType').find('Laser').find('Wavelength')))))
multi_track_setup = acquisition_block.find("MultiTrackSetup") multi_track_setup = acquisition_block.find('MultiTrackSetup')
for idx, tube_lens in enumerate({text(track_setup.find("TubeLensPosition")) for idx, tube_lens in enumerate({text(track_setup.find('TubeLensPosition'))
for track_setup in multi_track_setup}): for track_setup in multi_track_setup}):
ome.instruments[0].objectives.append( ome.instruments[0].objectives.append(
model.Objective(id=f"Objective:Tubelens:{idx}", model=tube_lens, model.Objective(id=f'Objective:Tubelens:{idx}', model=tube_lens,
nominal_magnification=float( nominal_magnification=float(
re.findall(r'\d+[,.]\d*', tube_lens)[0].replace(',', '.')) re.findall(r'\d+[,.]\d*', tube_lens)[0].replace(',', '.'))
)) ))
for idx, filter_ in enumerate({text(beam_splitter.find("Filter")) for idx, filter_ in enumerate({text(beam_splitter.find('Filter'))
for track_setup in multi_track_setup for track_setup in multi_track_setup
for beam_splitter in track_setup.find("BeamSplitters")}): for beam_splitter in track_setup.find('BeamSplitters')}):
ome.instruments[0].filter_sets.append( ome.instruments[0].filter_sets.append(
model.FilterSet(id=f"FilterSet:{idx}", model=filter_) model.FilterSet(id=f'FilterSet:{idx}', model=filter_)
) )
for idx, collimator in enumerate({text(track_setup.find("FWFOVPosition")) for idx, collimator in enumerate({text(track_setup.find('FWFOVPosition'))
for track_setup in multi_track_setup}): for track_setup in multi_track_setup}):
ome.instruments[0].filters.append(model.Filter(id=f"Filter:Collimator:{idx}", model=collimator)) ome.instruments[0].filters.append(model.Filter(id=f'Filter:Collimator:{idx}', model=collimator))
x_min = min([f.start[f.axes.index('X')] for f in self.filedict[0, 0, 0]]) x_min = min([f.start[f.axes.index('X')] for f in self.filedict[0, 0, 0]])
y_min = min([f.start[f.axes.index('Y')] for f in self.filedict[0, 0, 0]]) y_min = min([f.start[f.axes.index('Y')] for f in self.filedict[0, 0, 0]])
@@ -455,117 +460,123 @@ class Reader(AbstractReader, ABC):
size_c, size_z, size_t = (self.reader.shape[self.reader.axes.index(directory_entry)] size_c, size_z, size_t = (self.reader.shape[self.reader.axes.index(directory_entry)]
for directory_entry in 'CZT') for directory_entry in 'CZT')
image = information.find("Image") image = information.find('Image')
pixel_type = text(image.find("PixelType"), "Gray16") pixel_type = text(image.find('PixelType'), 'Gray16')
if pixel_type.startswith("Gray"): if pixel_type.startswith('Gray'):
pixel_type = "uint" + pixel_type[4:] pixel_type = 'uint' + pixel_type[4:]
objective_settings = image.find("ObjectiveSettings") objective_settings = image.find('ObjectiveSettings')
scenes = image.find("Dimensions").find("S").find("Scenes") scenes = image.find('Dimensions').find('S').find('Scenes')
positions = scenes[0].find("Positions")[0] positions = scenes[0].find('Positions')[0]
um = model.UnitsLength.MICROMETER um = model.UnitsLength.MICROMETER
nm = model.UnitsLength.NANOMETER nm = model.UnitsLength.NANOMETER
ome.images.append( ome.images.append(
model.Image( model.Image(
id="Image:0", id='Image:0',
name=f'{text(information.find("Document").find("Name"))} #1', name=f"{text(information.find('Document').find('Name'))} #1",
pixels=model.Pixels( pixels=model.Pixels(
id="Pixels:0", size_x=size_x, size_y=size_y, id='Pixels:0', size_x=size_x, size_y=size_y,
size_c=size_c, size_z=size_z, size_t=size_t, size_c=size_c, size_z=size_z, size_t=size_t,
dimension_order="XYCZT", type=pixel_type, dimension_order='XYCZT', type=pixel_type,
significant_bits=int(text(image.find("ComponentBitCount"))), significant_bits=int(text(image.find('ComponentBitCount'))),
big_endian=False, interleaved=False, metadata_only=True), big_endian=False, interleaved=False, metadata_only=True),
experimenter_ref=model.ExperimenterRef(id='Experimenter:0'), experimenter_ref=model.ExperimenterRef(id='Experimenter:0'),
instrument_ref=model.InstrumentRef(id='Instrument:0'), instrument_ref=model.InstrumentRef(id='Instrument:0'),
objective_settings=model.ObjectiveSettings( objective_settings=model.ObjectiveSettings(
id=objective_settings.find("ObjectiveRef").attrib["Id"], id=objective_settings.find('ObjectiveRef').attrib['Id'],
medium=text(objective_settings.find("Medium")), medium=text(objective_settings.find('Medium')),
refractive_index=float(text(objective_settings.find("RefractiveIndex")))), refractive_index=float(text(objective_settings.find('RefractiveIndex')))),
stage_label=model.StageLabel( stage_label=model.StageLabel(
name=f"Scene position #0", name=f'Scene position #0',
x=float(positions.attrib["X"]), x_unit=um, x=float(positions.attrib['X']), x_unit=um,
y=float(positions.attrib["Y"]), y_unit=um, y=float(positions.attrib['Y']), y_unit=um,
z=float(positions.attrib["Z"]), z_unit=um))) z=float(positions.attrib['Z']), z_unit=um)))
for distance in metadata.find("Scaling").find("Items"): for distance in metadata.find('Scaling').find('Items'):
if distance.attrib["Id"] == "X": if distance.attrib['Id'] == 'X':
ome.images[0].pixels.physical_size_x = float(text(distance.find("Value"))) * 1e6 ome.images[0].pixels.physical_size_x = float(text(distance.find('Value'))) * 1e6
elif distance.attrib["Id"] == "Y": elif distance.attrib['Id'] == 'Y':
ome.images[0].pixels.physical_size_y = float(text(distance.find("Value"))) * 1e6 ome.images[0].pixels.physical_size_y = float(text(distance.find('Value'))) * 1e6
elif size_z > 1 and distance.attrib["Id"] == "Z": elif size_z > 1 and distance.attrib['Id'] == 'Z':
ome.images[0].pixels.physical_size_z = float(text(distance.find("Value"))) * 1e6 ome.images[0].pixels.physical_size_z = float(text(distance.find('Value'))) * 1e6
channels_im = {channel.attrib["Id"]: channel for channel in image.find("Dimensions").find("Channels")} channels_im = {channel.attrib['Id']: channel for channel in image.find('Dimensions').find('Channels')}
channels_ds = {channel.attrib["Id"]: channel for channel in display_setting.find("Channels")} channels_ds = {channel.attrib['Id']: channel for channel in display_setting.find('Channels')}
channels_ts = {detector.attrib["Id"]: track_setup channels_ts = {detector.attrib['Id']: track_setup
for track_setup in for track_setup in
experiment.find("ExperimentBlocks").find("AcquisitionBlock").find("MultiTrackSetup") experiment.find('ExperimentBlocks').find('AcquisitionBlock').find('MultiTrackSetup')
for detector in track_setup.find("Detectors")} for detector in track_setup.find('Detectors')}
for idx, (key, channel) in enumerate(channels_im.items()): for idx, (key, channel) in enumerate(channels_im.items()):
detector_settings = channel.find("DetectorSettings") detector_settings = channel.find('DetectorSettings')
laser_scan_info = channel.find("LaserScanInfo") laser_scan_info = channel.find('LaserScanInfo')
detector = detector_settings.find("Detector") detector = detector_settings.find('Detector')
try: try:
binning = model.Binning(text(detector_settings.find("Binning"))) binning = model.Binning(text(detector_settings.find('Binning')))
except ValueError: except ValueError:
binning = model.Binning.OTHER binning = model.Binning.OTHER
filterset = text(channels_ts[key].find("BeamSplitters")[0].find("Filter")) filterset = text(channels_ts[key].find('BeamSplitters')[0].find('Filter'))
filterset_idx = [filterset.model for filterset in ome.instruments[0].filter_sets].index(filterset) filterset_idx = [filterset.model for filterset in ome.instruments[0].filter_sets].index(filterset)
light_sources_settings = channel.find("LightSourcesSettings") light_sources_settings = channel.find('LightSourcesSettings')
# no space in ome for multiple lightsources simultaneously # no space in ome for multiple lightsources simultaneously
if len(light_sources_settings) > idx: if len(light_sources_settings) > idx:
light_source_settings = light_sources_settings[idx] light_source_settings = light_sources_settings[idx]
else: else:
light_source_settings = light_sources_settings[0] light_source_settings = light_sources_settings[0]
light_source_settings = model.LightSourceSettings( light_source_settings = model.LightSourceSettings(
id=light_source_settings.find("LightSource").attrib["Id"], id=light_source_settings.find('LightSource').attrib['Id'],
attenuation=float(text(light_source_settings.find("Attenuation"))), attenuation=float(text(light_source_settings.find('Attenuation'))),
wavelength=float(text(light_source_settings.find("Wavelength"))), wavelength=float(text(light_source_settings.find('Wavelength'))),
wavelength_unit=nm) wavelength_unit=nm)
ome.images[0].pixels.channels.append( ome.images[0].pixels.channels.append(
model.Channel( model.Channel(
id=f"Channel:{idx}", id=f'Channel:{idx}',
name=channel.attrib["Name"], name=channel.attrib['Name'],
acquisition_mode=text(channel.find("AcquisitionMode")), acquisition_mode=text(channel.find('AcquisitionMode')),
color=model.Color(text(channels_ds[channel.attrib["Id"]].find("Color"), 'white')), color=model.Color(text(channels_ds[channel.attrib['Id']].find('Color'), 'white')),
detector_settings=model.DetectorSettings(id=detector.attrib["Id"], binning=binning), detector_settings=model.DetectorSettings(id=detector.attrib['Id'], binning=binning),
# emission_wavelength=text(channel.find("EmissionWavelength")), # TODO: fix # emission_wavelength=text(channel.find('EmissionWavelength')), # TODO: fix
excitation_wavelength=light_source_settings.wavelength, excitation_wavelength=light_source_settings.wavelength,
filter_set_ref=model.FilterSetRef(id=ome.instruments[0].filter_sets[filterset_idx].id), filter_set_ref=model.FilterSetRef(id=ome.instruments[0].filter_sets[filterset_idx].id),
illumination_type=text(channel.find("IlluminationType")), illumination_type=text(channel.find('IlluminationType')),
light_source_settings=light_source_settings, light_source_settings=light_source_settings,
samples_per_pixel=int(text(laser_scan_info.find("Averaging"))))) samples_per_pixel=int(text(laser_scan_info.find('Averaging')))))
exposure_times = [float(text(channel.find("LaserScanInfo").find("FrameTime"))) for channel in exposure_times = [float(text(channel.find('LaserScanInfo').find('FrameTime'))) for channel in
channels_im.values()] channels_im.values()]
delta_ts = attachments['TimeStamps'].data() delta_ts = attachments['TimeStamps'].data()
dt = np.diff(delta_ts)
if np.std(dt) / np.mean(dt) > 0.02:
dt = np.median(dt[dt > 0])
delta_ts = dt * np.arange(len(delta_ts))
warnings.warn(f'delta_t is inconsistent, using median value: {dt}')
for t, z, c in product(range(size_t), range(size_z), range(size_c)): for t, z, c in product(range(size_t), range(size_z), range(size_c)):
ome.images[0].pixels.planes.append( ome.images[0].pixels.planes.append(
model.Plane(the_c=c, the_z=z, the_t=t, delta_t=delta_ts[t], model.Plane(the_c=c, the_z=z, the_t=t, delta_t=delta_ts[t],
exposure_time=exposure_times[c], exposure_time=exposure_times[c],
position_x=float(positions.attrib["X"]), position_x_unit=um, position_x=float(positions.attrib['X']), position_x_unit=um,
position_y=float(positions.attrib["Y"]), position_y_unit=um, position_y=float(positions.attrib['Y']), position_y_unit=um,
position_z=float(positions.attrib["Z"]), position_z_unit=um)) position_z=float(positions.attrib['Z']), position_z_unit=um))
idx = 0 idx = 0
for layer in [] if (ml := metadata.find("Layers")) is None else ml: for layer in [] if (ml := metadata.find('Layers')) is None else ml:
rectangle = layer.find("Elements").find("Rectangle") rectangle = layer.find('Elements').find('Rectangle')
if rectangle is not None: if rectangle is not None:
geometry = rectangle.find("Geometry") geometry = rectangle.find('Geometry')
roi = model.ROI(id=f"ROI:{idx}", description=text(layer.find("Usage"))) roi = model.ROI(id=f'ROI:{idx}', description=text(layer.find('Usage')))
roi.union.append( roi.union.append(
model.Rectangle( model.Rectangle(
id='Shape:0:0', id='Shape:0:0',
height=float(text(geometry.find("Height"))), height=float(text(geometry.find('Height'))),
width=float(text(geometry.find("Width"))), width=float(text(geometry.find('Width'))),
x=float(text(geometry.find("Left"))), x=float(text(geometry.find('Left'))),
y=float(text(geometry.find("Top"))))) y=float(text(geometry.find('Top')))))
ome.rois.append(roi) ome.rois.append(roi)
ome.images[0].roi_refs.append(model.ROIRef(id=f"ROI:{idx}")) ome.images[0].roi_refs.append(model.ROIRef(id=f'ROI:{idx}'))
idx += 1 idx += 1
return ome return ome

View File

@@ -1,5 +1,4 @@
from abc import ABC from abc import ABC
from functools import cached_property
from itertools import product from itertools import product
from pathlib import Path from pathlib import Path
from struct import unpack from struct import unpack
@@ -32,8 +31,8 @@ class Reader(AbstractReader, ABC):
def open(self): def open(self):
warn(f'File {self.path.name} is probably damaged, opening with fijiread.') warn(f'File {self.path.name} is probably damaged, opening with fijiread.')
self.reader = TiffFile(self.path) self.reader = TiffFile(self.path)
assert self.reader.pages[0].compression == 1, "Can only read uncompressed tiff files." assert self.reader.pages[0].compression == 1, 'Can only read uncompressed tiff files.'
assert self.reader.pages[0].samplesperpixel == 1, "Can only read 1 sample per pixel." assert self.reader.pages[0].samplesperpixel == 1, 'Can only read 1 sample per pixel.'
self.offset = self.reader.pages[0].dataoffsets[0] # noqa self.offset = self.reader.pages[0].dataoffsets[0] # noqa
self.count = self.reader.pages[0].databytecounts[0] # noqa self.count = self.reader.pages[0].databytecounts[0] # noqa
self.bytes_per_sample = self.reader.pages[0].bitspersample // 8 # noqa self.bytes_per_sample = self.reader.pages[0].bitspersample // 8 # noqa
@@ -42,8 +41,7 @@ class Reader(AbstractReader, ABC):
def close(self): def close(self):
self.reader.close() self.reader.close()
@cached_property def get_ome(self):
def ome(self):
size_y, size_x = self.reader.pages[0].shape size_y, size_x = self.reader.pages[0].shape
size_c, size_z = 1, 1 size_c, size_z = 1, 1
size_t = int(np.floor((self.reader.filehandle.size - self.reader.pages[0].dataoffsets[0]) / self.count)) size_t = int(np.floor((self.reader.filehandle.size - self.reader.pages[0].dataoffsets[0]) / self.count))
@@ -54,8 +52,8 @@ class Reader(AbstractReader, ABC):
model.Image( model.Image(
pixels=model.Pixels( pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y, size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y,
dimension_order="XYCZT", type=pixel_type), dimension_order='XYCZT', type=pixel_type),
objective_settings=model.ObjectiveSettings(id="Objective:0"))) objective_settings=model.ObjectiveSettings(id='Objective:0')))
for c, z, t in product(range(size_c), range(size_z), range(size_t)): for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=0)) ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=0))
return ome return ome

View File

@@ -1,5 +1,4 @@
from abc import ABC from abc import ABC
from functools import cached_property
from itertools import product from itertools import product
import numpy as np import numpy as np
@@ -15,8 +14,7 @@ class Reader(AbstractReader, ABC):
def _can_open(path): def _can_open(path):
return isinstance(path, np.ndarray) and 1 <= path.ndim <= 5 return isinstance(path, np.ndarray) and 1 <= path.ndim <= 5
@cached_property def get_ome(self):
def ome(self):
def shape(size_x=1, size_y=1, size_c=1, size_z=1, size_t=1): # noqa def shape(size_x=1, size_y=1, size_c=1, size_z=1, size_t=1): # noqa
return size_x, size_y, size_c, size_z, size_t return size_x, size_y, size_c, size_z, size_t
size_x, size_y, size_c, size_z, size_t = shape(*self.array.shape) size_x, size_y, size_c, size_z, size_t = shape(*self.array.shape)
@@ -34,8 +32,8 @@ class Reader(AbstractReader, ABC):
model.Image( model.Image(
pixels=model.Pixels( pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y, size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y,
dimension_order="XYCZT", type=pixel_type), dimension_order='XYCZT', type=pixel_type),
objective_settings=model.ObjectiveSettings(id="Objective:0"))) objective_settings=model.ObjectiveSettings(id='Objective:0')))
for c, z, t in product(range(size_c), range(size_z), range(size_t)): for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=0)) ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=0))
return ome return ome

View File

@@ -1,7 +1,6 @@
import re import re
from abc import ABC from abc import ABC
from datetime import datetime from datetime import datetime
from functools import cached_property
from itertools import product from itertools import product
from pathlib import Path from pathlib import Path
@@ -39,7 +38,7 @@ class Plane(model.Plane):
def get_delta_t(t0, file): def get_delta_t(t0, file):
with tifffile.TiffFile(file) as tif: with tifffile.TiffFile(file) as tif:
info = yaml.safe_load(tif.pages[0].tags[50839].value['Info']) info = yaml.safe_load(tif.pages[0].tags[50839].value['Info'])
return float((datetime.strptime(info["Time"], "%Y-%m-%d %H:%M:%S %z") - t0).seconds) return float((datetime.strptime(info['Time'], '%Y-%m-%d %H:%M:%S %z') - t0).seconds)
class Reader(AbstractReader, ABC): class Reader(AbstractReader, ABC):
@@ -49,86 +48,87 @@ class Reader(AbstractReader, ABC):
def _can_open(path): def _can_open(path):
return isinstance(path, Path) and path.is_dir() return isinstance(path, Path) and path.is_dir()
@cached_property def get_ome(self):
def ome(self):
ome = model.OME() ome = model.OME()
with tifffile.TiffFile(self.filedict[0, 0, 0]) as tif: with tifffile.TiffFile(self.filedict[0, 0, 0]) as tif:
metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()} metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()}
ome.experimenters.append( ome.experimenters.append(
model.Experimenter(id="Experimenter:0", user_name=metadata["Info"]["Summary"]["UserName"])) model.Experimenter(id='Experimenter:0', user_name=metadata['Info']['Summary']['UserName']))
objective_str = metadata["Info"]["ZeissObjectiveTurret-Label"] objective_str = metadata['Info']['ZeissObjectiveTurret-Label']
ome.instruments.append(model.Instrument()) ome.instruments.append(model.Instrument())
ome.instruments[0].objectives.append( ome.instruments[0].objectives.append(
model.Objective( model.Objective(
id="Objective:0", manufacturer="Zeiss", model=objective_str, id='Objective:0', manufacturer='Zeiss', model=objective_str,
nominal_magnification=float(re.findall(r"(\d+)x", objective_str)[0]), nominal_magnification=float(re.findall(r'(\d+)x', objective_str)[0]),
lens_na=float(re.findall(r"/(\d\.\d+)", objective_str)[0]), lens_na=float(re.findall(r'/(\d\.\d+)', objective_str)[0]),
immersion=model.Objective_Immersion.OIL if 'oil' in objective_str.lower() else None)) immersion=model.Objective_Immersion.OIL if 'oil' in objective_str.lower() else None))
tubelens_str = metadata["Info"]["ZeissOptovar-Label"] tubelens_str = metadata['Info']['ZeissOptovar-Label']
ome.instruments[0].objectives.append( ome.instruments[0].objectives.append(
model.Objective( model.Objective(
id="Objective:Tubelens:0", manufacturer="Zeiss", model=tubelens_str, id='Objective:Tubelens:0', manufacturer='Zeiss', model=tubelens_str,
nominal_magnification=float(re.findall(r"\d?\d*[,.]?\d+(?=x$)", tubelens_str)[0].replace(",", ".")))) nominal_magnification=float(re.findall(r'\d?\d*[,.]?\d+(?=x$)', tubelens_str)[0].replace(',', '.'))))
ome.instruments[0].detectors.append( ome.instruments[0].detectors.append(
model.Detector( model.Detector(
id="Detector:0", amplification_gain=100)) id='Detector:0', amplification_gain=100))
ome.instruments[0].filter_sets.append( ome.instruments[0].filter_sets.append(
model.FilterSet(id='FilterSet:0', model=metadata["Info"]["ZeissReflectorTurret-Label"])) model.FilterSet(id='FilterSet:0', model=metadata['Info']['ZeissReflectorTurret-Label']))
pxsize = metadata["Info"]["PixelSizeUm"] pxsize = metadata['Info']['PixelSizeUm']
pxsize_cam = 6.5 if 'Hamamatsu' in metadata["Info"]["Core-Camera"] else None pxsize_cam = 6.5 if 'Hamamatsu' in metadata['Info']['Core-Camera'] else None
if pxsize == 0: if pxsize == 0:
pxsize = pxsize_cam / ome.instruments[0].objectives[0].nominal_magnification pxsize = pxsize_cam / ome.instruments[0].objectives[0].nominal_magnification
pixel_type = metadata["Info"]["PixelType"].lower() pixel_type = metadata['Info']['PixelType'].lower()
if pixel_type.startswith("gray"): if pixel_type.startswith('gray'):
pixel_type = "uint" + pixel_type[4:] pixel_type = 'uint' + pixel_type[4:]
else: else:
pixel_type = "uint16" # assume pixel_type = 'uint16' # assume
size_c, size_z, size_t = (max(i) + 1 for i in zip(*self.filedict.keys())) size_c, size_z, size_t = (max(i) + 1 for i in zip(*self.filedict.keys()))
t0 = datetime.strptime(metadata["Info"]["Time"], "%Y-%m-%d %H:%M:%S %z") t0 = datetime.strptime(metadata['Info']['Time'], '%Y-%m-%d %H:%M:%S %z')
ome.images.append( ome.images.append(
model.Image( model.Image(
pixels=model.Pixels( pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t, size_c=size_c, size_z=size_z, size_t=size_t,
size_x=metadata['Info']['Width'], size_y=metadata['Info']['Height'], size_x=metadata['Info']['Width'], size_y=metadata['Info']['Height'],
dimension_order="XYCZT", type=pixel_type, physical_size_x=pxsize, physical_size_y=pxsize, dimension_order='XYCZT', type=pixel_type, physical_size_x=pxsize, physical_size_y=pxsize,
physical_size_z=metadata["Info"]["Summary"]["z-step_um"]), physical_size_z=metadata['Info']['Summary']['z-step_um']),
objective_settings=model.ObjectiveSettings(id="Objective:0"))) objective_settings=model.ObjectiveSettings(id='Objective:0')))
for c, z, t in product(range(size_c), range(size_z), range(size_t)): for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append( ome.images[0].pixels.planes.append(
Plane(t0, self.filedict[c, z, t], Plane(t0, self.filedict[c, z, t],
the_c=c, the_z=z, the_t=t, exposure_time=metadata["Info"]["Exposure-ms"] / 1000)) the_c=c, the_z=z, the_t=t, exposure_time=metadata['Info']['Exposure-ms'] / 1000))
# compare channel names from metadata with filenames # compare channel names from metadata with filenames
pattern_c = re.compile(r"img_\d{3,}_(.*)_\d{3,}$") pattern_c = re.compile(r'img_\d{3,}_(.*)_\d{3,}$')
for c in range(size_c): for c in range(size_c):
ome.images[0].pixels.channels.append( ome.images[0].pixels.channels.append(
model.Channel( model.Channel(
id=f"Channel:{c}", name=pattern_c.findall(self.filedict[c, 0, 0].stem)[0], id=f'Channel:{c}', name=pattern_c.findall(self.filedict[c, 0, 0].stem)[0],
detector_settings=model.DetectorSettings( detector_settings=model.DetectorSettings(
id="Detector:0", binning=metadata["Info"]["Hamamatsu_sCMOS-Binning"]), id='Detector:0', binning=metadata['Info']['Hamamatsu_sCMOS-Binning']),
filter_set_ref=model.FilterSetRef(id='FilterSet:0'))) filter_set_ref=model.FilterSetRef(id='FilterSet:0')))
return ome return ome
def open(self): def open(self):
if re.match(r'(?:\d+-)?Pos.*', self.path.name) is None: pat = re.compile(r'(?:\d+-)?Pos.*')
path = self.path / f"Pos{self.series}" if pat.match(self.path.name) is None:
path = sorted(file for file in self.path.iterdir() if pat.match(file.name))[self.series]
else: else:
path = self.path path = self.path
filelist = sorted([file for file in path.iterdir() if re.search(r'^img_\d{3,}.*\d{3,}.*\.tif$', file.name)]) pat = re.compile(r'^img_\d{3,}.*\d{3,}.*\.tif$')
filelist = sorted([file for file in path.iterdir() if pat.search(file.name)])
with tifffile.TiffFile(self.path / filelist[0]) as tif: with tifffile.TiffFile(self.path / filelist[0]) as tif:
metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()} metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()}
# compare channel names from metadata with filenames # compare channel names from metadata with filenames
cnamelist = metadata["Info"]["Summary"]["ChNames"] cnamelist = metadata['Info']['Summary']['ChNames']
cnamelist = [c for c in cnamelist if any([c in f.name for f in filelist])] cnamelist = [c for c in cnamelist if any([c in f.name for f in filelist])]
pattern_c = re.compile(r"img_\d{3,}_(.*)_\d{3,}$") pattern_c = re.compile(r'img_\d{3,}_(.*)_\d{3,}$')
pattern_z = re.compile(r"(\d{3,})$") pattern_z = re.compile(r'(\d{3,})$')
pattern_t = re.compile(r"img_(\d{3,})") pattern_t = re.compile(r'img_(\d{3,})')
self.filedict = {(cnamelist.index(pattern_c.findall(file.stem)[0]), # noqa self.filedict = {(cnamelist.index(pattern_c.findall(file.stem)[0]), # noqa
int(pattern_z.findall(file.stem)[0]), int(pattern_z.findall(file.stem)[0]),
int(pattern_t.findall(file.stem)[0])): file for file in filelist} int(pattern_t.findall(file.stem)[0])): file for file in filelist}

View File

@@ -24,22 +24,21 @@ class Reader(AbstractReader, ABC):
return False return False
@cached_property @cached_property
def ome(self): def metadata(self):
metadata = {key: yaml.safe_load(value) if isinstance(value, str) else value return {key: yaml.safe_load(value) if isinstance(value, str) else value
for key, value in self.reader.imagej_metadata.items()} for key, value in self.reader.imagej_metadata.items()}
def get_ome(self):
page = self.reader.pages[0] page = self.reader.pages[0]
self.p_ndim = page.ndim # noqa
size_y = page.imagelength size_y = page.imagelength
size_x = page.imagewidth size_x = page.imagewidth
if self.p_ndim == 3: if self.p_ndim == 3:
size_c = page.samplesperpixel size_c = page.samplesperpixel
self.p_transpose = [i for i in [page.axes.find(j) for j in 'SYX'] if i >= 0] # noqa size_t = self.metadata.get('frames', 1) # // C
size_t = metadata.get('frames', 1) # // C
else: else:
size_c = metadata.get('channels', 1) size_c = self.metadata.get('channels', 1)
size_t = metadata.get('frames', 1) size_t = self.metadata.get('frames', 1)
size_z = metadata.get('slices', 1) size_z = self.metadata.get('slices', 1)
if 282 in page.tags and 296 in page.tags and page.tags[296].value == 1: if 282 in page.tags and 296 in page.tags and page.tags[296].value == 1:
f = page.tags[282].value f = page.tags[282].value
pxsize = f[1] / f[0] pxsize = f[1] / f[0]
@@ -51,7 +50,7 @@ class Reader(AbstractReader, ABC):
'float', 'double', 'complex', 'double-complex', 'bit'): 'float', 'double', 'complex', 'double-complex', 'bit'):
dtype = 'float' dtype = 'float'
interval_t = metadata.get('interval', 0) interval_t = self.metadata.get('interval', 0)
ome = model.OME() ome = model.OME()
ome.instruments.append(model.Instrument(id='Instrument:0')) ome.instruments.append(model.Instrument(id='Instrument:0'))
@@ -62,14 +61,18 @@ class Reader(AbstractReader, ABC):
pixels=model.Pixels( pixels=model.Pixels(
id='Pixels:0', id='Pixels:0',
size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y, size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y,
dimension_order="XYCZT", type=dtype, physical_size_x=pxsize, physical_size_y=pxsize), dimension_order='XYCZT', type=dtype, physical_size_x=pxsize, physical_size_y=pxsize),
objective_settings=model.ObjectiveSettings(id="Objective:0"))) objective_settings=model.ObjectiveSettings(id='Objective:0')))
for c, z, t in product(range(size_c), range(size_z), range(size_t)): for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=interval_t * t)) ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=interval_t * t))
return ome return ome
def open(self): def open(self):
self.reader = tifffile.TiffFile(self.path) self.reader = tifffile.TiffFile(self.path)
page = self.reader.pages[0]
self.p_ndim = page.ndim # noqa
if self.p_ndim == 3:
self.p_transpose = [i for i in [page.axes.find(j) for j in 'SYX'] if i >= 0] # noqa
def close(self): def close(self):
self.reader.close() self.reader.close()

View File

@@ -88,7 +88,7 @@ class Transforms(dict):
return hash(frozenset((*self.__dict__.items(), *self.items()))) return hash(frozenset((*self.__dict__.items(), *self.items())))
def save(self, file): def save(self, file):
with open(Path(file).with_suffix(".yml"), 'w') as f: with open(Path(file).with_suffix('.yml'), 'w') as f:
yaml.safe_dump(self.asdict(), f, default_flow_style=None) yaml.safe_dump(self.asdict(), f, default_flow_style=None)
def copy(self): def copy(self):
@@ -136,7 +136,7 @@ class Transforms(dict):
raise TypeError('Not a pandas DataFrame or Series.') raise TypeError('Not a pandas DataFrame or Series.')
def with_beads(self, cyllens, bead_files): def with_beads(self, cyllens, bead_files):
assert len(bead_files) > 0, "At least one file is needed to calculate the registration." assert len(bead_files) > 0, 'At least one file is needed to calculate the registration.'
transforms = [self.calculate_channel_transforms(file, cyllens) for file in bead_files] transforms = [self.calculate_channel_transforms(file, cyllens) for file in bead_files]
for key in {key for transform in transforms for key in transform.keys()}: for key in {key for transform in transforms for key in transform.keys()}:
new_transforms = [transform[key] for transform in transforms if key in transform] new_transforms = [transform[key] for transform in transforms if key in transform]

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "ndbioimage" name = "ndbioimage"
version = "2024.3.6" version = "2024.4.0"
description = "Bio image reading, metadata and some affine registration." description = "Bio image reading, metadata and some affine registration."
authors = ["W. Pomp <w.pomp@nki.nl>"] authors = ["W. Pomp <w.pomp@nki.nl>"]
license = "GPLv3" license = "GPLv3"
@@ -22,7 +22,7 @@ pint = "*"
tqdm = "*" tqdm = "*"
lxml = "*" lxml = "*"
pyyaml = "*" pyyaml = "*"
parfor = ">=2023.10.1" parfor = ">=2024.3.0"
JPype1 = "*" JPype1 = "*"
SimpleITK-SimpleElastix = "*" SimpleITK-SimpleElastix = "*"
scikit-image = "*" scikit-image = "*"

View File

@@ -3,6 +3,7 @@ from multiprocessing import active_children
from pathlib import Path from pathlib import Path
import pytest import pytest
from ndbioimage import Imread, ReaderNotFoundError from ndbioimage import Imread, ReaderNotFoundError

View File

@@ -3,6 +3,7 @@ from numbers import Number
import numpy as np import numpy as np
import pytest import pytest
from ndbioimage import Imread from ndbioimage import Imread
r = np.random.randint(0, 255, (64, 64, 2, 3, 4)) r = np.random.randint(0, 255, (64, 64, 2, 3, 4))

View File

@@ -2,6 +2,7 @@ from itertools import product
import numpy as np import numpy as np
import pytest import pytest
from ndbioimage import Imread from ndbioimage import Imread
r = np.random.randint(0, 255, (64, 64, 2, 3, 4)) r = np.random.randint(0, 255, (64, 64, 2, 3, 4))