Files
ndbioimage/ndbioimage/readers/seqread.py
2026-03-26 16:39:16 +01:00

196 lines
8.0 KiB
Python

import re
import warnings
from abc import ABC
from datetime import datetime
from itertools import product
from pathlib import Path
import tifffile
import yaml
from ome_types import model
from ome_types.units import _quantity_property # noqa
from .. import AbstractReader
def lazy_property(function, field, *arg_fields):
def lazy(self):
if self.__dict__.get(field) is None:
self.__dict__[field] = function(*[getattr(self, arg_field) for arg_field in arg_fields])
try:
self.model_fields_set.add(field)
except Exception: # noqa
pass
return self.__dict__[field]
return property(lazy)
class Plane(model.Plane):
"""Lazily retrieve delta_t from metadata"""
def __init__(self, t0, file, **kwargs): # noqa
super().__init__(**kwargs)
# setting fields here because they would be removed by ome_types/pydantic after class definition
setattr(self.__class__, "delta_t", lazy_property(self.get_delta_t, "delta_t", "t0", "file"))
setattr(self.__class__, "delta_t_quantity", _quantity_property("delta_t"))
self.__dict__["t0"] = t0 # noqa
self.__dict__["file"] = file # noqa
@staticmethod
def get_delta_t(t0, file):
with tifffile.TiffFile(file) as tif:
info = yaml.safe_load(tif.pages[0].tags[50839].value["Info"])
return float((datetime.strptime(info["Time"], "%Y-%m-%d %H:%M:%S %z") - t0).seconds)
class Reader(AbstractReader, ABC):
priority = 10
@staticmethod
def _can_open(path):
pat = re.compile(r"(?:\d+-)?Pos.*", re.IGNORECASE)
return (
isinstance(path, Path)
and path.is_dir()
and (pat.match(path.name) or any(file.is_dir() and pat.match(file.stem) for file in path.iterdir()))
)
def get_ome(self):
ome = model.OME()
with tifffile.TiffFile(self.filedict[0, 0, 0]) as tif:
metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()}
if "Summary" in metadata["Info"] and "UserName" in metadata["Info"]["Summary"]:
ome.experimenters.append(
model.Experimenter(id="Experimenter:0", user_name=metadata["Info"]["Summary"]["UserName"])
)
objective_str = metadata["Info"]["ZeissObjectiveTurret-Label"]
ome.instruments.append(model.Instrument())
ome.instruments[0].objectives.append(
model.Objective(
id="Objective:0",
manufacturer="Zeiss",
model=objective_str,
nominal_magnification=float(re.findall(r"(\d+)x", objective_str)[0]),
lens_na=float(re.findall(r"/(\d\.\d+)", objective_str)[0]),
immersion=model.Objective_Immersion.OIL if "oil" in objective_str.lower() else None,
)
)
tubelens_str = metadata["Info"]["ZeissOptovar-Label"]
ome.instruments[0].objectives.append(
model.Objective(
id="Objective:Tubelens:0",
manufacturer="Zeiss",
model=tubelens_str,
nominal_magnification=float(re.findall(r"\d?\d*[,.]?\d+(?=x$)", tubelens_str)[0].replace(",", ".")),
)
)
ome.instruments[0].detectors.append(model.Detector(id="Detector:0", amplification_gain=100))
ome.instruments[0].filter_sets.append(
model.FilterSet(id="FilterSet:0", model=metadata["Info"]["ZeissReflectorTurret-Label"])
)
pxsize = metadata["Info"]["PixelSizeUm"]
pxsize_cam = 6.5 if "Hamamatsu" in metadata["Info"]["Core-Camera"] else None
if pxsize == 0:
pxsize = pxsize_cam / ome.instruments[0].objectives[0].nominal_magnification
pixel_type = metadata["Info"]["PixelType"].lower()
if pixel_type.startswith("gray"):
pixel_type = "uint" + pixel_type[4:]
else:
pixel_type = "uint16" # assume
size_c, size_z, size_t = (max(i) + 1 for i in zip(*self.filedict.keys()))
t0 = datetime.strptime(metadata["Info"]["Time"], "%Y-%m-%d %H:%M:%S %z")
ome.images.append(
model.Image(
pixels=model.Pixels(
size_c=size_c,
size_z=size_z,
size_t=size_t,
size_x=metadata["Info"]["Width"],
size_y=metadata["Info"]["Height"],
dimension_order="XYCZT", # type: ignore
type=pixel_type,
physical_size_x=pxsize,
physical_size_y=pxsize,
physical_size_z=metadata["Info"]["Summary"]["z-step_um"]
if "Summary" in metadata["Info"]
else None,
),
objective_settings=model.ObjectiveSettings(id="Objective:0"),
)
)
for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(
Plane(
t0,
self.filedict[c, z, t],
the_c=c,
the_z=z,
the_t=t,
exposure_time=metadata["Info"]["Exposure-ms"] / 1000,
)
)
# compare channel names from metadata with filenames
pattern_c = re.compile(r"img_\d{3,}_(.*)_\d{3,}$", re.IGNORECASE)
for c in range(size_c):
ome.images[0].pixels.channels.append(
model.Channel(
id=f"Channel:{c}",
name=pattern_c.findall(self.filedict[c, 0, 0].stem)[0],
detector_settings=model.DetectorSettings(
id="Detector:0", binning=metadata["Info"]["Hamamatsu_sCMOS-Binning"]
),
filter_set_ref=model.FilterSetRef(id="FilterSet:0"),
)
)
return ome
def open(self):
# /some_path/Pos4: path = /some_path, series = 4
# /some_path/5-Pos_001_005: path = /some_path/5-Pos_001_005, series = 0
if re.match(r"(?:\d+-)?Pos.*", self.path.name, re.IGNORECASE) is None:
pat = re.compile(rf"^(?:\d+-)?Pos{self.series}$", re.IGNORECASE)
files = sorted(file for file in self.path.iterdir() if pat.match(file.name))
if len(files):
path = files[0]
else:
raise FileNotFoundError(self.path / pat.pattern)
else:
path = self.path
pat = re.compile(r"^img_\d{3,}.*\d{3,}.*\.tif$", re.IGNORECASE)
filelist = sorted([file for file in path.iterdir() if pat.search(file.name)])
with tifffile.TiffFile(self.path / filelist[0]) as tif:
metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()}
# compare channel names from metadata with filenames
if "Summary" in metadata["Info"] and "ChNames" in metadata["Info"]["Summary"]:
cnamelist = metadata["Info"]["Summary"]["ChNames"]
elif (self.path.parent / "display_and_comments.txt").exists():
warnings.warn(f"{self.path} is missing some metadata")
with open(self.path.parent / "display_and_comments.txt") as f:
cnamelist = [channel["Name"] for channel in yaml.safe_load(f)["Channels"]]
else:
raise ValueError("could not find metadata describing the order of the channels")
cnamelist = [c for c in cnamelist if any([c in f.name for f in filelist])]
pattern_c = re.compile(r"img_\d{3,}_(.*)_\d{3,}$", re.IGNORECASE)
pattern_z = re.compile(r"(\d{3,})$")
pattern_t = re.compile(r"img_(\d{3,})", re.IGNORECASE)
self.filedict = {
(
cnamelist.index(pattern_c.findall(file.stem)[0]), # noqa
int(pattern_z.findall(file.stem)[0]),
int(pattern_t.findall(file.stem)[0]),
): file
for file in filelist
}
def __frame__(self, c=0, z=0, t=0):
return tifffile.imread(self.path / self.filedict[(c, z, t)])