Compare commits

13 Commits
rs ... main

Author SHA1 Message Date
w.pomp
e5eac07b7b - bugfix 2026-01-26 17:22:11 +01:00
w.pomp
8ff52f5af5 - main_channel and default_transform arguments for Imread.with_transform 2026-01-26 17:03:27 +01:00
w.pomp
066a39719a - match ome.tif sequence metadata based on position number, with backup by taking any metadata found (with warning) 2026-01-16 11:32:00 +01:00
w.pomp
776b5204c4 - remove python 3.14 test as SimpleITK-SimpleElastix is not yet available 2025-11-27 15:45:55 +01:00
w.pomp
e27a0f2657 - cziread: deal with missing exposure times 2025-11-26 17:24:48 +01:00
Wim Pomp
7fe1d189e5 - fix reading time interval when defined not in s in ome
- search all ome.tif for metadata in order
2025-10-15 20:29:07 +02:00
w.pomp
1b5febc35b - add option to skip autoscaling brightness when saving as mp4
- let coords_pandas also deal with polars dataframes
2025-09-25 15:20:00 +02:00
Wim Pomp
1fe3b3c824 - Make bioformats optional because jpype can cause problems
- Ruff format
2025-08-06 11:03:03 +02:00
Wim Pomp
3346ed3a48 - Can pycharm please stop adding wrong imports please? 2025-03-19 16:14:28 +01:00
Wim Pomp
b7dadb645e - read metadata from first file in ome tiff sequence 2025-03-19 16:06:07 +01:00
Wim Pomp
0ac22aff87 - pyproject towards PEP621
- catch errors reading metadata in tifread
- read dimension order from ome
2025-03-19 15:04:57 +01:00
Wim Pomp
6daa372ccf - allow None in Transform.from_dict
- None transform parameters -> unit parameters/matrix
2025-02-14 15:45:43 +01:00
Wim Pomp
cb52e77c34 - allow None in Transform.from_dict 2025-02-14 15:00:55 +01:00
19 changed files with 1393 additions and 849 deletions

View File

@@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12", "3.13"]
steps:
- uses: actions/checkout@v4

1
.gitignore vendored
View File

@@ -10,3 +10,4 @@
/tests/files/*
/poetry.lock
/dist/
/uv.lock

View File

@@ -13,17 +13,19 @@ Currently, it supports imagej tif files, czi files, micromanager tif sequences a
## Installation
One of:
```
pip install ndbioimage
```
### Installation with option to write mp4 or mkv:
Work in progress! Make sure ffmpeg is installed.
```
pip install ndbioimage[bioformats]
pip install ndbioimage[write]
pip install ndbioimage[bioformats, write]
```
- bioformats: use [bio-formats](https://www.openmicroscopy.org/bio-formats/)
as fallback when other readers cannot open a file.
- write: write an image file into a mp4 or mkv file. Work in progress! Make sure ffmpeg is installed.
## Usage
### Python

File diff suppressed because it is too large Load Diff

View File

@@ -7,10 +7,12 @@ class JVMException(Exception):
try:
class JVM:
"""There can be only one java virtual machine per python process,
so this is a singleton class to manage the jvm.
"""
_instance = None
vm_started = False
vm_killed = False
@@ -24,7 +26,7 @@ try:
def __init__(self, jars=None):
if not self.vm_started and not self.vm_killed:
try:
jar_path = Path(__file__).parent / 'jars'
jar_path = Path(__file__).parent / "jars"
if jars is None:
jars = {}
for jar, src in jars.items():
@@ -33,6 +35,7 @@ try:
classpath = [str(jar_path / jar) for jar in jars.keys()]
import jpype
jpype.startJVM(classpath=classpath)
except Exception: # noqa
self.vm_started = False
@@ -56,11 +59,11 @@ try:
pass
if self.vm_killed:
raise Exception('The JVM was killed before, and cannot be restarted in this Python process.')
raise Exception("The JVM was killed before, and cannot be restarted in this Python process.")
@staticmethod
def download(src, dest):
print(f'Downloading {dest.name} to {dest}.')
print(f"Downloading {dest.name} to {dest}.")
dest.parent.mkdir(exist_ok=True)
dest.write_bytes(request.urlopen(src).read())
@@ -69,6 +72,7 @@ try:
self = cls._instance
if self is not None and self.vm_started and not self.vm_killed:
import jpype
jpype.shutdownJVM() # noqa
self.vm_started = False
self.vm_killed = True

View File

@@ -1 +1,6 @@
__all__ = 'bfread', 'cziread', 'fijiread', 'ndread', 'seqread', 'tifread', 'metaseriesread'
from .. import JVM
if JVM is None:
__all__ = "cziread", "fijiread", "ndread", "seqread", "tifread", "metaseriesread"
else:
__all__ = "bfread", "cziread", "fijiread", "ndread", "seqread", "tifread", "metaseriesread"

View File

@@ -8,13 +8,15 @@ import numpy as np
from .. import JVM, AbstractReader, JVMException
jars = {'bioformats_package.jar': 'https://downloads.openmicroscopy.org/bio-formats/latest/artifacts/'
'bioformats_package.jar'}
jars = {
"bioformats_package.jar": "https://downloads.openmicroscopy.org/bio-formats/latest/artifacts/"
"bioformats_package.jar"
}
class JVMReader:
def __init__(self, path: Path, series: int) -> None:
mp = multiprocessing.get_context('spawn')
mp = multiprocessing.get_context("spawn")
self.path = path
self.series = series
self.queue_in = mp.Queue()
@@ -23,7 +25,7 @@ class JVMReader:
self.process = mp.Process(target=self.run)
self.process.start()
status, message = self.queue_out.get()
if status == 'status' and message == 'started':
if status == "status" and message == "started":
self.is_alive = True
else:
raise JVMException(message)
@@ -45,7 +47,7 @@ class JVMReader:
def frame(self, c: int, z: int, t: int) -> np.ndarray:
self.queue_in.put((c, z, t))
status, message = self.queue_out.get()
if status == 'frame':
if status == "frame":
return message
else:
raise JVMException(message)
@@ -74,20 +76,20 @@ class JVMReader:
elif pixel_type == jvm.format_tools.UINT8:
dtype = np.uint8
elif pixel_type == jvm.format_tools.UINT16:
dtype = '<u2' if little_endian else '>u2'
dtype = "<u2" if little_endian else ">u2"
elif pixel_type == jvm.format_tools.INT16:
dtype = '<i2' if little_endian else '>i2'
dtype = "<i2" if little_endian else ">i2"
elif pixel_type == jvm.format_tools.UINT32:
dtype = '<u4' if little_endian else '>u4'
dtype = "<u4" if little_endian else ">u4"
elif pixel_type == jvm.format_tools.INT32:
dtype = '<i4' if little_endian else '>i4'
dtype = "<i4" if little_endian else ">i4"
elif pixel_type == jvm.format_tools.FLOAT:
dtype = '<f4' if little_endian else '>f4'
dtype = "<f4" if little_endian else ">f4"
elif pixel_type == jvm.format_tools.DOUBLE:
dtype = '<f8' if little_endian else '>f8'
dtype = "<f8" if little_endian else ">f8"
else:
dtype = None
self.queue_out.put(('status', 'started'))
self.queue_out.put(("status", "started"))
while not self.done.is_set():
try:
@@ -116,8 +118,10 @@ class JVMReader:
image.shape = (height, width, 3)
del rdr
elif reader.getSizeC() > 1:
images = [np.frombuffer(open_bytes_func(reader.getIndex(z, i, t)), dtype)
for i in range(reader.getSizeC())]
images = [
np.frombuffer(open_bytes_func(reader.getIndex(z, i, t)), dtype)
for i in range(reader.getSizeC())
]
image = np.dstack(images)
image.shape = (height, width, reader.getSizeC())
# if not channel_names is None:
@@ -161,13 +165,13 @@ class JVMReader:
image.shape = (height, width)
if image.ndim == 3:
self.queue_out.put(('frame', image[..., c]))
self.queue_out.put(("frame", image[..., c]))
else:
self.queue_out.put(('frame', image))
self.queue_out.put(("frame", image))
except queues.Empty: # noqa
continue
except (Exception,):
self.queue_out.put(('error', format_exc()))
self.queue_out.put(("error", format_exc()))
finally:
if jvm is not None:
jvm.kill_vm()
@@ -189,13 +193,14 @@ class Reader(AbstractReader, ABC):
"""This class is used as a last resort, when we don't have another way to open the file. We don't like it
because it requires the java vm.
"""
priority = 99 # panic and open with BioFormats
do_not_pickle = 'reader', 'key', 'jvm'
do_not_pickle = "reader", "key", "jvm"
@staticmethod
def _can_open(path: Path) -> bool:
"""Use java BioFormats to make an ome metadata structure."""
with multiprocessing.get_context('spawn').Pool(1) as pool:
with multiprocessing.get_context("spawn").Pool(1) as pool:
return pool.map(can_open, (path,))[0]
def open(self) -> None:

View File

@@ -26,11 +26,12 @@ except ImportError:
zoom = None
Element = TypeVar('Element')
Element = TypeVar("Element")
def zstd_decode(data: bytes) -> bytes: # noqa
"""decode zstd bytes, copied from BioFormats ZeissCZIReader"""
def read_var_int(stream: BytesIO) -> int: # noqa
a = stream.read(1)[0]
if a & 128:
@@ -51,7 +52,7 @@ def zstd_decode(data: bytes) -> bytes: # noqa
if chunk_id == 1:
high_low_unpacking = (stream.read(1)[0] & 1) == 1
else:
raise ValueError(f'Invalid chunk id: {chunk_id}')
raise ValueError(f"Invalid chunk id: {chunk_id}")
pointer = stream.tell()
except Exception: # noqa
high_low_unpacking = False
@@ -112,8 +113,7 @@ def data(self, raw: bool = False, resize: bool = True, order: int = 0) -> np.nda
# sub / supersampling
factors = [j / i for i, j in zip(de.stored_shape, de.shape)]
factors = [(int(round(f)) if abs(f - round(f)) < 0.0001 else f)
for f in factors]
factors = [(int(round(f)) if abs(f - round(f)) < 0.0001 else f) for f in factors]
# use repeat if possible
if order == 0 and all(isinstance(f, int) for f in factors):
@@ -154,27 +154,27 @@ czifile.czifile.SubBlockSegment.data = data
class Reader(AbstractReader, ABC):
priority = 0
do_not_pickle = 'reader', 'filedict'
do_not_pickle = "reader", "filedict"
@staticmethod
def _can_open(path: Path) -> bool:
return isinstance(path, Path) and path.suffix == '.czi'
return isinstance(path, Path) and path.suffix == ".czi"
def open(self) -> None:
self.reader = czifile.CziFile(self.path)
filedict = {}
for directory_entry in self.reader.filtered_subblock_directory:
idx = self.get_index(directory_entry, self.reader.start)
if 'S' not in self.reader.axes or self.series in range(*idx[self.reader.axes.index('S')]):
for c in range(*idx[self.reader.axes.index('C')]):
for z in range(*idx[self.reader.axes.index('Z')]):
for t in range(*idx[self.reader.axes.index('T')]):
if "S" not in self.reader.axes or self.series in range(*idx[self.reader.axes.index("S")]):
for c in range(*idx[self.reader.axes.index("C")]):
for z in range(*idx[self.reader.axes.index("Z")]):
for t in range(*idx[self.reader.axes.index("T")]):
if (c, z, t) in filedict:
filedict[c, z, t].append(directory_entry)
else:
filedict[c, z, t] = [directory_entry]
if len(filedict) == 0:
raise FileNotFoundError(f'Series {self.series} not found in {self.path}.')
raise FileNotFoundError(f"Series {self.series} not found in {self.path}.")
self.filedict = filedict # noqa
def close(self) -> None:
@@ -184,19 +184,21 @@ class Reader(AbstractReader, ABC):
return OmeParse.get_ome(self.reader, self.filedict)
def __frame__(self, c: int = 0, z: int = 0, t: int = 0) -> np.ndarray:
f = np.zeros(self.base_shape['yx'], self.dtype)
f = np.zeros(self.base_shape["yx"], self.dtype)
if (c, z, t) in self.filedict:
directory_entries = self.filedict[c, z, t]
x_min = min([f.start[f.axes.index('X')] for f in directory_entries])
y_min = min([f.start[f.axes.index('Y')] for f in directory_entries])
xy_min = {'X': x_min, 'Y': y_min}
x_min = min([f.start[f.axes.index("X")] for f in directory_entries])
y_min = min([f.start[f.axes.index("Y")] for f in directory_entries])
xy_min = {"X": x_min, "Y": y_min}
for directory_entry in directory_entries:
subblock = directory_entry.data_segment()
tile = subblock.data(resize=True, order=0)
axes_min = [xy_min.get(ax, 0) for ax in directory_entry.axes]
index = [slice(i - j - m, i - j + k)
for i, j, k, m in zip(directory_entry.start, self.reader.start, tile.shape, axes_min)]
index = tuple(index[self.reader.axes.index(i)] for i in 'YX')
index = [
slice(i - j - m, i - j + k)
for i, j, k, m in zip(directory_entry.start, self.reader.start, tile.shape, axes_min)
]
index = tuple(index[self.reader.axes.index(i)] for i in "YX")
f[index] = tile.squeeze()
return f
@@ -225,28 +227,27 @@ class OmeParse:
self.reader = reader
self.filedict = filedict
xml = reader.metadata()
self.attachments = {i.attachment_entry.name: i.attachment_entry.data_segment()
for i in reader.attachments()}
self.attachments = {i.attachment_entry.name: i.attachment_entry.data_segment() for i in reader.attachments()}
self.tree = etree.fromstring(xml)
self.metadata = self.tree.find('Metadata')
version = self.metadata.find('Version')
self.metadata = self.tree.find("Metadata")
version = self.metadata.find("Version")
if version is not None:
self.version = version.text
else:
self.version = self.metadata.find('Experiment').attrib['Version']
self.version = self.metadata.find("Experiment").attrib["Version"]
self.ome = OME()
self.information = self.metadata.find('Information')
self.display_setting = self.metadata.find('DisplaySetting')
self.experiment = self.metadata.find('Experiment')
self.acquisition_block = self.experiment.find('ExperimentBlocks').find('AcquisitionBlock')
self.instrument = self.information.find('Instrument')
self.image = self.information.find('Image')
self.information = self.metadata.find("Information")
self.display_setting = self.metadata.find("DisplaySetting")
self.experiment = self.metadata.find("Experiment")
self.acquisition_block = self.experiment.find("ExperimentBlocks").find("AcquisitionBlock")
self.instrument = self.information.find("Instrument")
self.image = self.information.find("Image")
if self.version == '1.0':
self.experiment = self.metadata.find('Experiment')
self.acquisition_block = self.experiment.find('ExperimentBlocks').find('AcquisitionBlock')
self.multi_track_setup = self.acquisition_block.find('MultiTrackSetup')
if self.version == "1.0":
self.experiment = self.metadata.find("Experiment")
self.acquisition_block = self.experiment.find("ExperimentBlocks").find("AcquisitionBlock")
self.multi_track_setup = self.acquisition_block.find("MultiTrackSetup")
else:
self.experiment = None
self.acquisition_block = None
@@ -281,326 +282,396 @@ class OmeParse:
return default
def get_experimenters(self) -> None:
if self.version == '1.0':
if self.version == "1.0":
self.ome.experimenters = [
model.Experimenter(id='Experimenter:0',
user_name=self.information.find('User').find('DisplayName').text)]
elif self.version in ('1.1', '1.2'):
model.Experimenter(
id="Experimenter:0", user_name=self.information.find("User").find("DisplayName").text
)
]
elif self.version in ("1.1", "1.2"):
self.ome.experimenters = [
model.Experimenter(id='Experimenter:0',
user_name=self.information.find('Document').find('UserName').text)]
model.Experimenter(
id="Experimenter:0", user_name=self.information.find("Document").find("UserName").text
)
]
def get_instruments(self) -> None:
if self.version == '1.0':
self.ome.instruments.append(model.Instrument(id=self.instrument.attrib['Id']))
elif self.version in ('1.1', '1.2'):
for _ in self.instrument.find('Microscopes'):
self.ome.instruments.append(model.Instrument(id='Instrument:0'))
if self.version == "1.0":
self.ome.instruments.append(model.Instrument(id=self.instrument.attrib["Id"]))
elif self.version in ("1.1", "1.2"):
for _ in self.instrument.find("Microscopes"):
self.ome.instruments.append(model.Instrument(id="Instrument:0"))
def get_detectors(self) -> None:
if self.version == '1.0':
for detector in self.instrument.find('Detectors'):
if self.version == "1.0":
for detector in self.instrument.find("Detectors"):
try:
detector_type = model.Detector_Type(self.text(detector.find('Type')).upper() or "")
detector_type = model.Detector_Type(self.text(detector.find("Type")).upper() or "")
except ValueError:
detector_type = model.Detector_Type.OTHER
self.ome.instruments[0].detectors.append(
model.Detector(
id=detector.attrib['Id'], model=self.text(detector.find('Manufacturer').find('Model')),
amplification_gain=float(self.text(detector.find('AmplificationGain'))),
gain=float(self.text(detector.find('Gain'))), zoom=float(self.text(detector.find('Zoom'))),
type=detector_type
))
elif self.version in ('1.1', '1.2'):
for detector in self.instrument.find('Detectors'):
id=detector.attrib["Id"],
model=self.text(detector.find("Manufacturer").find("Model")),
amplification_gain=float(self.text(detector.find("AmplificationGain"))),
gain=float(self.text(detector.find("Gain"))),
zoom=float(self.text(detector.find("Zoom"))),
type=detector_type,
)
)
elif self.version in ("1.1", "1.2"):
for detector in self.instrument.find("Detectors"):
try:
detector_type = model.Detector_Type(self.text(detector.find('Type')).upper() or "")
detector_type = model.Detector_Type(self.text(detector.find("Type")).upper() or "")
except ValueError:
detector_type = model.Detector_Type.OTHER
self.ome.instruments[0].detectors.append(
model.Detector(
id=detector.attrib['Id'].replace(' ', ''),
model=self.text(detector.find('Manufacturer').find('Model')),
type=detector_type
))
id=detector.attrib["Id"].replace(" ", ""),
model=self.text(detector.find("Manufacturer").find("Model")),
type=detector_type,
)
)
def get_objectives(self) -> None:
for objective in self.instrument.find('Objectives'):
for objective in self.instrument.find("Objectives"):
self.ome.instruments[0].objectives.append(
model.Objective(
id=objective.attrib['Id'],
model=self.text(objective.find('Manufacturer').find('Model')),
immersion=self.text(objective.find('Immersion')), # type: ignore
lens_na=float(self.text(objective.find('LensNA'))),
nominal_magnification=float(self.text(objective.find('NominalMagnification')))))
id=objective.attrib["Id"],
model=self.text(objective.find("Manufacturer").find("Model")),
immersion=self.text(objective.find("Immersion")), # type: ignore
lens_na=float(self.text(objective.find("LensNA"))),
nominal_magnification=float(self.text(objective.find("NominalMagnification"))),
)
)
def get_tubelenses(self) -> None:
if self.version == '1.0':
for idx, tube_lens in enumerate({self.text(track_setup.find('TubeLensPosition'))
for track_setup in self.multi_track_setup}):
if self.version == "1.0":
for idx, tube_lens in enumerate(
{self.text(track_setup.find("TubeLensPosition")) for track_setup in self.multi_track_setup}
):
try:
nominal_magnification = float(re.findall(r'\d+[,.]\d*', tube_lens)[0].replace(',', '.'))
nominal_magnification = float(re.findall(r"\d+[,.]\d*", tube_lens)[0].replace(",", "."))
except Exception: # noqa
nominal_magnification = 1.0
self.ome.instruments[0].objectives.append(
model.Objective(id=f'Objective:Tubelens:{idx}', model=tube_lens,
nominal_magnification=nominal_magnification))
elif self.version in ('1.1', '1.2'):
for tubelens in self.def_list(self.instrument.find('TubeLenses')):
model.Objective(
id=f"Objective:Tubelens:{idx}", model=tube_lens, nominal_magnification=nominal_magnification
)
)
elif self.version in ("1.1", "1.2"):
for tubelens in self.def_list(self.instrument.find("TubeLenses")):
try:
nominal_magnification = float(re.findall(r'\d+(?:[,.]\d*)?',
tubelens.attrib['Name'])[0].replace(',', '.'))
nominal_magnification = float(
re.findall(r"\d+(?:[,.]\d*)?", tubelens.attrib["Name"])[0].replace(",", ".")
)
except Exception: # noqa
nominal_magnification = 1.0
self.ome.instruments[0].objectives.append(
model.Objective(
id=f"Objective:{tubelens.attrib['Id']}",
model=tubelens.attrib['Name'],
nominal_magnification=nominal_magnification))
model=tubelens.attrib["Name"],
nominal_magnification=nominal_magnification,
)
)
def get_light_sources(self) -> None:
if self.version == '1.0':
for light_source in self.def_list(self.instrument.find('LightSources')):
if self.version == "1.0":
for light_source in self.def_list(self.instrument.find("LightSources")):
try:
if light_source.find('LightSourceType').find('Laser') is not None:
if light_source.find("LightSourceType").find("Laser") is not None:
self.ome.instruments[0].lasers.append(
model.Laser(
id=light_source.attrib['Id'],
model=self.text(light_source.find('Manufacturer').find('Model')),
power=float(self.text(light_source.find('Power'))),
id=light_source.attrib["Id"],
model=self.text(light_source.find("Manufacturer").find("Model")),
power=float(self.text(light_source.find("Power"))),
wavelength=float(
self.text(light_source.find('LightSourceType').find('Laser').find('Wavelength')))))
self.text(light_source.find("LightSourceType").find("Laser").find("Wavelength"))
),
)
)
except AttributeError:
pass
elif self.version in ('1.1', '1.2'):
for light_source in self.def_list(self.instrument.find('LightSources')):
elif self.version in ("1.1", "1.2"):
for light_source in self.def_list(self.instrument.find("LightSources")):
try:
if light_source.find('LightSourceType').find('Laser') is not None:
if light_source.find("LightSourceType").find("Laser") is not None:
self.ome.instruments[0].lasers.append(
model.Laser(
id=f"LightSource:{light_source.attrib['Id']}",
power=float(self.text(light_source.find('Power'))),
wavelength=float(light_source.attrib['Id'][-3:]))) # TODO: follow Id reference
power=float(self.text(light_source.find("Power"))),
wavelength=float(light_source.attrib["Id"][-3:]),
)
) # TODO: follow Id reference
except (AttributeError, ValueError):
pass
def get_filters(self) -> None:
if self.version == '1.0':
for idx, filter_ in enumerate({self.text(beam_splitter.find('Filter'))
if self.version == "1.0":
for idx, filter_ in enumerate(
{
self.text(beam_splitter.find("Filter"))
for track_setup in self.multi_track_setup
for beam_splitter in track_setup.find('BeamSplitters')}):
self.ome.instruments[0].filter_sets.append(
model.FilterSet(id=f'FilterSet:{idx}', model=filter_)
)
for beam_splitter in track_setup.find("BeamSplitters")
}
):
self.ome.instruments[0].filter_sets.append(model.FilterSet(id=f"FilterSet:{idx}", model=filter_))
def get_pixels(self) -> None:
x_min = min([f.start[f.axes.index('X')] for f in self.filedict[0, 0, 0]])
y_min = min([f.start[f.axes.index('Y')] for f in self.filedict[0, 0, 0]])
x_max = max([f.start[f.axes.index('X')] + f.shape[f.axes.index('X')] for f in self.filedict[0, 0, 0]])
y_max = max([f.start[f.axes.index('Y')] + f.shape[f.axes.index('Y')] for f in self.filedict[0, 0, 0]])
x_min = min([f.start[f.axes.index("X")] for f in self.filedict[0, 0, 0]])
y_min = min([f.start[f.axes.index("Y")] for f in self.filedict[0, 0, 0]])
x_max = max([f.start[f.axes.index("X")] + f.shape[f.axes.index("X")] for f in self.filedict[0, 0, 0]])
y_max = max([f.start[f.axes.index("Y")] + f.shape[f.axes.index("Y")] for f in self.filedict[0, 0, 0]])
self.size_x = x_max - x_min
self.size_y = y_max - y_min
self.size_c, self.size_z, self.size_t = (self.reader.shape[self.reader.axes.index(directory_entry)]
for directory_entry in 'CZT')
image = self.information.find('Image')
pixel_type = self.text(image.find('PixelType'), 'Gray16')
if pixel_type.startswith('Gray'):
pixel_type = 'uint' + pixel_type[4:]
objective_settings = image.find('ObjectiveSettings')
self.size_c, self.size_z, self.size_t = (
self.reader.shape[self.reader.axes.index(directory_entry)] for directory_entry in "CZT"
)
image = self.information.find("Image")
pixel_type = self.text(image.find("PixelType"), "Gray16")
if pixel_type.startswith("Gray"):
pixel_type = "uint" + pixel_type[4:]
objective_settings = image.find("ObjectiveSettings")
self.ome.images.append(
model.Image(
id='Image:0',
id="Image:0",
name=f"{self.text(self.information.find('Document').find('Name'))} #1",
pixels=model.Pixels(
id='Pixels:0', size_x=self.size_x, size_y=self.size_y,
size_c=self.size_c, size_z=self.size_z, size_t=self.size_t,
dimension_order='XYCZT', type=pixel_type, # type: ignore
significant_bits=int(self.text(image.find('ComponentBitCount'))),
big_endian=False, interleaved=False, metadata_only=True), # type: ignore
experimenter_ref=model.ExperimenterRef(id='Experimenter:0'),
instrument_ref=model.InstrumentRef(id='Instrument:0'),
id="Pixels:0",
size_x=self.size_x,
size_y=self.size_y,
size_c=self.size_c,
size_z=self.size_z,
size_t=self.size_t,
dimension_order="XYCZT",
type=pixel_type, # type: ignore
significant_bits=int(self.text(image.find("ComponentBitCount"))),
big_endian=False,
interleaved=False,
metadata_only=True,
), # type: ignore
experimenter_ref=model.ExperimenterRef(id="Experimenter:0"),
instrument_ref=model.InstrumentRef(id="Instrument:0"),
objective_settings=model.ObjectiveSettings(
id=objective_settings.find('ObjectiveRef').attrib['Id'],
medium=self.text(objective_settings.find('Medium')), # type: ignore
refractive_index=float(self.text(objective_settings.find('RefractiveIndex')))),
id=objective_settings.find("ObjectiveRef").attrib["Id"],
medium=self.text(objective_settings.find("Medium")), # type: ignore
refractive_index=float(self.text(objective_settings.find("RefractiveIndex"))),
),
stage_label=model.StageLabel(
name=f'Scene position #0',
x=self.positions[0], x_unit=self.um,
y=self.positions[1], y_unit=self.um,
z=self.positions[2], z_unit=self.um)))
name=f"Scene position #0",
x=self.positions[0],
x_unit=self.um,
y=self.positions[1],
y_unit=self.um,
z=self.positions[2],
z_unit=self.um,
),
)
)
for distance in self.metadata.find('Scaling').find('Items'):
if distance.attrib['Id'] == 'X':
self.ome.images[0].pixels.physical_size_x = float(self.text(distance.find('Value'))) * 1e6
elif distance.attrib['Id'] == 'Y':
self.ome.images[0].pixels.physical_size_y = float(self.text(distance.find('Value'))) * 1e6
elif self.size_z > 1 and distance.attrib['Id'] == 'Z':
self.ome.images[0].pixels.physical_size_z = float(self.text(distance.find('Value'))) * 1e6
for distance in self.metadata.find("Scaling").find("Items"):
if distance.attrib["Id"] == "X":
self.ome.images[0].pixels.physical_size_x = float(self.text(distance.find("Value"))) * 1e6
elif distance.attrib["Id"] == "Y":
self.ome.images[0].pixels.physical_size_y = float(self.text(distance.find("Value"))) * 1e6
elif self.size_z > 1 and distance.attrib["Id"] == "Z":
self.ome.images[0].pixels.physical_size_z = float(self.text(distance.find("Value"))) * 1e6
@cached_property
def positions(self) -> tuple[float, float, Optional[float]]:
if self.version == '1.0':
scenes = self.image.find('Dimensions').find('S').find('Scenes')
positions = scenes[0].find('Positions')[0]
return float(positions.attrib['X']), float(positions.attrib['Y']), float(positions.attrib['Z'])
elif self.version in ('1.1', '1.2'):
if self.version == "1.0":
scenes = self.image.find("Dimensions").find("S").find("Scenes")
positions = scenes[0].find("Positions")[0]
return float(positions.attrib["X"]), float(positions.attrib["Y"]), float(positions.attrib["Z"])
elif self.version in ("1.1", "1.2"):
try: # TODO
scenes = self.image.find('Dimensions').find('S').find('Scenes')
center_position = [float(pos) for pos in self.text(scenes[0].find('CenterPosition')).split(',')]
scenes = self.image.find("Dimensions").find("S").find("Scenes")
center_position = [float(pos) for pos in self.text(scenes[0].find("CenterPosition")).split(",")]
except AttributeError:
center_position = [0, 0]
return center_position[0], center_position[1], None
@cached_property
def channels_im(self) -> dict:
return {channel.attrib['Id']: channel for channel in self.image.find('Dimensions').find('Channels')}
return {channel.attrib["Id"]: channel for channel in self.image.find("Dimensions").find("Channels")}
@cached_property
def channels_ds(self) -> dict:
return {channel.attrib['Id']: channel for channel in self.display_setting.find('Channels')}
return {channel.attrib["Id"]: channel for channel in self.display_setting.find("Channels")}
@cached_property
def channels_ts(self) -> dict:
return {detector.attrib['Id']: track_setup
for track_setup in
self.experiment.find('ExperimentBlocks').find('AcquisitionBlock').find('MultiTrackSetup')
for detector in track_setup.find('Detectors')}
return {
detector.attrib["Id"]: track_setup
for track_setup in self.experiment.find("ExperimentBlocks")
.find("AcquisitionBlock")
.find("MultiTrackSetup")
for detector in track_setup.find("Detectors")
}
def get_channels(self) -> None:
if self.version == '1.0':
if self.version == "1.0":
for idx, (key, channel) in enumerate(self.channels_im.items()):
detector_settings = channel.find('DetectorSettings')
laser_scan_info = channel.find('LaserScanInfo')
detector = detector_settings.find('Detector')
detector_settings = channel.find("DetectorSettings")
laser_scan_info = channel.find("LaserScanInfo")
detector = detector_settings.find("Detector")
try:
binning = model.Binning(self.text(detector_settings.find('Binning')))
binning = model.Binning(self.text(detector_settings.find("Binning")))
except ValueError:
binning = model.Binning.OTHER
filterset = self.text(self.channels_ts[key].find('BeamSplitters')[0].find('Filter'))
filterset = self.text(self.channels_ts[key].find("BeamSplitters")[0].find("Filter"))
filterset_idx = [filterset.model for filterset in self.ome.instruments[0].filter_sets].index(filterset)
light_sources_settings = channel.find('LightSourcesSettings')
light_sources_settings = channel.find("LightSourcesSettings")
# no space in ome for multiple lightsources simultaneously
if len(light_sources_settings) > idx:
light_source_settings = light_sources_settings[idx]
else:
light_source_settings = light_sources_settings[0]
light_source_settings = model.LightSourceSettings(
id=light_source_settings.find('LightSource').attrib['Id'],
attenuation=float(self.text(light_source_settings.find('Attenuation'))),
wavelength=float(self.text(light_source_settings.find('Wavelength'))),
wavelength_unit=self.nm)
id=light_source_settings.find("LightSource").attrib["Id"],
attenuation=float(self.text(light_source_settings.find("Attenuation"))),
wavelength=float(self.text(light_source_settings.find("Wavelength"))),
wavelength_unit=self.nm,
)
self.ome.images[0].pixels.channels.append(
model.Channel(
id=f'Channel:{idx}',
name=channel.attrib['Name'],
acquisition_mode=self.text(channel.find('AcquisitionMode')), # type: ignore
color=model.Color(self.text(self.channels_ds[channel.attrib['Id']].find('Color'), 'white')),
detector_settings=model.DetectorSettings(id=detector.attrib['Id'], binning=binning),
id=f"Channel:{idx}",
name=channel.attrib["Name"],
acquisition_mode=self.text(channel.find("AcquisitionMode")), # type: ignore
color=model.Color(self.text(self.channels_ds[channel.attrib["Id"]].find("Color"), "white")),
detector_settings=model.DetectorSettings(id=detector.attrib["Id"], binning=binning),
# emission_wavelength=text(channel.find('EmissionWavelength')), # TODO: fix
excitation_wavelength=light_source_settings.wavelength,
filter_set_ref=model.FilterSetRef(id=self.ome.instruments[0].filter_sets[filterset_idx].id),
illumination_type=self.text(channel.find('IlluminationType')), # type: ignore
illumination_type=self.text(channel.find("IlluminationType")), # type: ignore
light_source_settings=light_source_settings,
samples_per_pixel=int(self.text(laser_scan_info.find('Averaging')))))
elif self.version in ('1.1', '1.2'):
samples_per_pixel=int(self.text(laser_scan_info.find("Averaging"))),
)
)
elif self.version in ("1.1", "1.2"):
for idx, (key, channel) in enumerate(self.channels_im.items()):
detector_settings = channel.find('DetectorSettings')
laser_scan_info = channel.find('LaserScanInfo')
detector = detector_settings.find('Detector')
detector_settings = channel.find("DetectorSettings")
laser_scan_info = channel.find("LaserScanInfo")
detector = detector_settings.find("Detector")
try:
color = model.Color(self.text(self.channels_ds[channel.attrib['Id']].find('Color'), 'white'))
color = model.Color(self.text(self.channels_ds[channel.attrib["Id"]].find("Color"), "white"))
except Exception: # noqa
color = None
try:
if (i := self.text(channel.find('EmissionWavelength'))) != '0':
if (i := self.text(channel.find("EmissionWavelength"))) != "0":
emission_wavelength = float(i)
else:
emission_wavelength = None
except Exception: # noqa
emission_wavelength = None
if laser_scan_info is not None:
samples_per_pixel = int(self.text(laser_scan_info.find('Averaging'), '1'))
samples_per_pixel = int(self.text(laser_scan_info.find("Averaging"), "1"))
else:
samples_per_pixel = 1
try:
binning = model.Binning(self.text(detector_settings.find('Binning')))
binning = model.Binning(self.text(detector_settings.find("Binning")))
except ValueError:
binning = model.Binning.OTHER
light_sources_settings = channel.find('LightSourcesSettings')
light_sources_settings = channel.find("LightSourcesSettings")
# no space in ome for multiple lightsources simultaneously
if light_sources_settings is not None:
light_source_settings = light_sources_settings[0]
light_source_settings = model.LightSourceSettings(
id='LightSource:' + '_'.join([light_source_settings.find('LightSource').attrib['Id']
for light_source_settings in light_sources_settings]),
attenuation=self.try_default(float, None, self.text(light_source_settings.find('Attenuation'))),
wavelength=self.try_default(float, None, self.text(light_source_settings.find('Wavelength'))),
wavelength_unit=self.nm)
id="LightSource:"
+ "_".join(
[
light_source_settings.find("LightSource").attrib["Id"]
for light_source_settings in light_sources_settings
]
),
attenuation=self.try_default(
float, None, self.text(light_source_settings.find("Attenuation"))
),
wavelength=self.try_default(float, None, self.text(light_source_settings.find("Wavelength"))),
wavelength_unit=self.nm,
)
else:
light_source_settings = None
self.ome.images[0].pixels.channels.append(
model.Channel(
id=f'Channel:{idx}',
name=channel.attrib['Name'],
acquisition_mode=self.text(channel.find('AcquisitionMode')).replace( # type: ignore
'SingleMoleculeLocalisation', 'SingleMoleculeImaging'),
id=f"Channel:{idx}",
name=channel.attrib["Name"],
acquisition_mode=self.text(channel.find("AcquisitionMode")).replace( # type: ignore
"SingleMoleculeLocalisation", "SingleMoleculeImaging"
),
color=color,
detector_settings=model.DetectorSettings(
id=detector.attrib['Id'].replace(' ', ""),
binning=binning),
id=detector.attrib["Id"].replace(" ", ""), binning=binning
),
emission_wavelength=emission_wavelength,
excitation_wavelength=self.try_default(float, None,
self.text(channel.find('ExcitationWavelength'))),
excitation_wavelength=self.try_default(
float, None, self.text(channel.find("ExcitationWavelength"))
),
# filter_set_ref=model.FilterSetRef(id=ome.instruments[0].filter_sets[filterset_idx].id),
illumination_type=self.text(channel.find('IlluminationType')), # type: ignore
illumination_type=self.text(channel.find("IlluminationType")), # type: ignore
light_source_settings=light_source_settings,
samples_per_pixel=samples_per_pixel))
samples_per_pixel=samples_per_pixel,
)
)
def get_planes(self) -> None:
try:
exposure_times = [float(self.text(channel.find('LaserScanInfo').find('FrameTime')))
for channel in self.channels_im.values()]
exposure_times = [
float(self.text(channel.find("LaserScanInfo").find("FrameTime")))
for channel in self.channels_im.values()
]
except Exception: # noqa
exposure_times = [None] * len(self.channels_im)
delta_ts = self.attachments['TimeStamps'].data()
delta_ts = self.attachments["TimeStamps"].data()
dt = np.diff(delta_ts)
if len(dt) and np.std(dt) / np.mean(dt) > 0.02:
dt = np.median(dt[dt > 0])
delta_ts = dt * np.arange(len(delta_ts))
warnings.warn(f'delta_t is inconsistent, using median value: {dt}')
warnings.warn(f"delta_t is inconsistent, using median value: {dt}")
for t, z, c in product(range(self.size_t), range(self.size_z), range(self.size_c)):
self.ome.images[0].pixels.planes.append(
model.Plane(the_c=c, the_z=z, the_t=t, delta_t=delta_ts[t],
exposure_time=exposure_times[c],
position_x=self.positions[0], position_x_unit=self.um,
position_y=self.positions[1], position_y_unit=self.um,
position_z=self.positions[2], position_z_unit=self.um))
model.Plane(
the_c=c,
the_z=z,
the_t=t,
delta_t=delta_ts[t],
exposure_time=exposure_times[min(c, len(exposure_times) - 1)] if len(exposure_times) > 0 else None,
position_x=self.positions[0],
position_x_unit=self.um,
position_y=self.positions[1],
position_y_unit=self.um,
position_z=self.positions[2],
position_z_unit=self.um,
)
)
def get_annotations(self) -> None:
idx = 0
for layer in [] if (ml := self.metadata.find('Layers')) is None else ml:
rectangle = layer.find('Elements').find('Rectangle')
for layer in [] if (ml := self.metadata.find("Layers")) is None else ml:
rectangle = layer.find("Elements").find("Rectangle")
if rectangle is not None:
geometry = rectangle.find('Geometry')
roi = model.ROI(id=f'ROI:{idx}', description=self.text(layer.find('Usage')))
geometry = rectangle.find("Geometry")
roi = model.ROI(id=f"ROI:{idx}", description=self.text(layer.find("Usage")))
roi.union.append(
model.Rectangle(
id='Shape:0:0',
height=float(self.text(geometry.find('Height'))),
width=float(self.text(geometry.find('Width'))),
x=float(self.text(geometry.find('Left'))),
y=float(self.text(geometry.find('Top')))))
id="Shape:0:0",
height=float(self.text(geometry.find("Height"))),
width=float(self.text(geometry.find("Width"))),
x=float(self.text(geometry.find("Left"))),
y=float(self.text(geometry.find("Top"))),
)
)
self.ome.rois.append(roi)
self.ome.images[0].roi_refs.append(model.ROIRef(id=f'ROI:{idx}'))
self.ome.images[0].roi_refs.append(model.ROIRef(id=f"ROI:{idx}"))
idx += 1

View File

@@ -13,12 +13,13 @@ from .. import AbstractReader
class Reader(AbstractReader, ABC):
"""Can read some tif files written with Fiji which are broken because Fiji didn't finish writing."""
priority = 90
do_not_pickle = 'reader'
do_not_pickle = "reader"
@staticmethod
def _can_open(path):
if isinstance(path, Path) and path.suffix in ('.tif', '.tiff'):
if isinstance(path, Path) and path.suffix in (".tif", ".tiff"):
with TiffFile(path) as tif:
return tif.is_imagej and not tif.is_bigtiff
else:
@@ -26,17 +27,17 @@ class Reader(AbstractReader, ABC):
def __frame__(self, c, z, t): # Override this, return the frame at c, z, t
self.reader.filehandle.seek(self.offset + t * self.count)
return np.reshape(unpack(self.fmt, self.reader.filehandle.read(self.count)), self.base_shape['yx'])
return np.reshape(unpack(self.fmt, self.reader.filehandle.read(self.count)), self.base_shape["yx"])
def open(self):
warn(f'File {self.path.name} is probably damaged, opening with fijiread.')
warn(f"File {self.path.name} is probably damaged, opening with fijiread.")
self.reader = TiffFile(self.path)
assert self.reader.pages[0].compression == 1, 'Can only read uncompressed tiff files.'
assert self.reader.pages[0].samplesperpixel == 1, 'Can only read 1 sample per pixel.'
assert self.reader.pages[0].compression == 1, "Can only read uncompressed tiff files."
assert self.reader.pages[0].samplesperpixel == 1, "Can only read 1 sample per pixel."
self.offset = self.reader.pages[0].dataoffsets[0] # noqa
self.count = self.reader.pages[0].databytecounts[0] # noqa
self.bytes_per_sample = self.reader.pages[0].bitspersample // 8 # noqa
self.fmt = self.reader.byteorder + self.count // self.bytes_per_sample * 'BHILQ'[self.bytes_per_sample - 1] # noqa
self.fmt = self.reader.byteorder + self.count // self.bytes_per_sample * "BHILQ"[self.bytes_per_sample - 1] # noqa
def close(self):
self.reader.close()
@@ -51,9 +52,17 @@ class Reader(AbstractReader, ABC):
ome.images.append(
model.Image(
pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y,
dimension_order='XYCZT', type=pixel_type),
objective_settings=model.ObjectiveSettings(id='Objective:0')))
size_c=size_c,
size_z=size_z,
size_t=size_t,
size_x=size_x,
size_y=size_y,
dimension_order="XYCZT",
type=pixel_type,
),
objective_settings=model.ObjectiveSettings(id="Objective:0"),
)
)
for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=0))
return ome

View File

@@ -12,16 +12,17 @@ from .. import AbstractReader
class Reader(AbstractReader, ABC):
priority = 20
do_not_pickle = 'last_tif'
do_not_pickle = "last_tif"
@staticmethod
def _can_open(path):
return isinstance(path, Path) and (path.is_dir() or
(path.parent.is_dir() and path.name.lower().startswith('pos')))
return isinstance(path, Path) and (
path.is_dir() or (path.parent.is_dir() and path.name.lower().startswith("pos"))
)
@staticmethod
def get_positions(path: str | Path) -> Optional[list[int]]:
pat = re.compile(rf's(\d)_t\d+\.(tif|TIF)$')
pat = re.compile(rf"s(\d)_t\d+\.(tif|TIF)$")
return sorted({int(m.group(1)) for file in Path(path).iterdir() if (m := pat.search(file.name))})
def get_ome(self):
@@ -31,7 +32,7 @@ class Reader(AbstractReader, ABC):
size_z = len(tif.pages)
page = tif.pages[0]
shape = {axis.lower(): size for axis, size in zip(page.axes, page.shape)}
size_x, size_y = shape['x'], shape['y']
size_x, size_y = shape["x"], shape["y"]
ome.instruments.append(model.Instrument())
@@ -41,16 +42,23 @@ class Reader(AbstractReader, ABC):
ome.images.append(
model.Image(
pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t,
size_x=size_x, size_y=size_y,
dimension_order='XYCZT', type=pixel_type),
objective_settings=model.ObjectiveSettings(id='Objective:0')))
size_c=size_c,
size_z=size_z,
size_t=size_t,
size_x=size_x,
size_y=size_y,
dimension_order="XYCZT",
type=pixel_type,
),
objective_settings=model.ObjectiveSettings(id="Objective:0"),
)
)
return ome
def open(self):
pat = re.compile(rf's{self.series}_t\d+\.(tif|TIF)$')
pat = re.compile(rf"s{self.series}_t\d+\.(tif|TIF)$")
filelist = sorted([file for file in self.path.iterdir() if pat.search(file.name)])
pattern = re.compile(r't(\d+)$')
pattern = re.compile(r"t(\d+)$")
self.filedict = {int(pattern.search(file.stem).group(1)) - 1: file for file in filelist}
if len(self.filedict) == 0:
raise FileNotFoundError
@@ -72,9 +80,9 @@ class Reader(AbstractReader, ABC):
def __frame__(self, c=0, z=0, t=0):
tif = self.get_tif(t)
page = tif.pages[z]
if page.axes.upper() == 'YX':
if page.axes.upper() == "YX":
return page.asarray()
elif page.axes.upper() == 'XY':
elif page.axes.upper() == "XY":
return page.asarray().T
else:
raise NotImplementedError(f'reading axes {page.axes} is not implemented')
raise NotImplementedError(f"reading axes {page.axes} is not implemented")

View File

@@ -17,23 +17,32 @@ class Reader(AbstractReader, ABC):
def get_ome(self):
def shape(size_x=1, size_y=1, size_c=1, size_z=1, size_t=1): # noqa
return size_x, size_y, size_c, size_z, size_t
size_x, size_y, size_c, size_z, size_t = shape(*self.array.shape)
try:
pixel_type = model.PixelType(self.array.dtype.name)
except ValueError:
if self.array.dtype.name.startswith('int'):
pixel_type = model.PixelType('int32')
if self.array.dtype.name.startswith("int"):
pixel_type = model.PixelType("int32")
else:
pixel_type = model.PixelType('float')
pixel_type = model.PixelType("float")
ome = model.OME()
ome.instruments.append(model.Instrument())
ome.images.append(
model.Image(
pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y,
dimension_order='XYCZT', type=pixel_type),
objective_settings=model.ObjectiveSettings(id='Objective:0')))
size_c=size_c,
size_z=size_z,
size_t=size_t,
size_x=size_x,
size_y=size_y,
dimension_order="XYCZT",
type=pixel_type,
),
objective_settings=model.ObjectiveSettings(id="Objective:0"),
)
)
for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=0))
return ome
@@ -43,11 +52,11 @@ class Reader(AbstractReader, ABC):
self.array = np.array(self.path)
while self.array.ndim < 5:
self.array = np.expand_dims(self.array, -1) # noqa
self.path = 'numpy array'
self.path = "numpy array"
def __frame__(self, c, z, t):
frame = self.array[:, :, c, z, t]
if self.axes.find('y') > self.axes.find('x'):
if self.axes.find("y") > self.axes.find("x"):
return frame.T
else:
return frame

View File

@@ -21,24 +21,26 @@ def lazy_property(function, field, *arg_fields):
except Exception: # noqa
pass
return self.__dict__[field]
return property(lazy)
class Plane(model.Plane):
"""Lazily retrieve delta_t from metadata"""
def __init__(self, t0, file, **kwargs): # noqa
super().__init__(**kwargs)
# setting fields here because they would be removed by ome_types/pydantic after class definition
setattr(self.__class__, 'delta_t', lazy_property(self.get_delta_t, 'delta_t', 't0', 'file'))
setattr(self.__class__, 'delta_t_quantity', _quantity_property('delta_t'))
self.__dict__['t0'] = t0 # noqa
self.__dict__['file'] = file # noqa
setattr(self.__class__, "delta_t", lazy_property(self.get_delta_t, "delta_t", "t0", "file"))
setattr(self.__class__, "delta_t_quantity", _quantity_property("delta_t"))
self.__dict__["t0"] = t0 # noqa
self.__dict__["file"] = file # noqa
@staticmethod
def get_delta_t(t0, file):
with tifffile.TiffFile(file) as tif:
info = yaml.safe_load(tif.pages[0].tags[50839].value['Info'])
return float((datetime.strptime(info['Time'], '%Y-%m-%d %H:%M:%S %z') - t0).seconds)
info = yaml.safe_load(tif.pages[0].tags[50839].value["Info"])
return float((datetime.strptime(info["Time"], "%Y-%m-%d %H:%M:%S %z") - t0).seconds)
class Reader(AbstractReader, ABC):
@@ -46,78 +48,108 @@ class Reader(AbstractReader, ABC):
@staticmethod
def _can_open(path):
pat = re.compile(r'(?:\d+-)?Pos.*', re.IGNORECASE)
return (isinstance(path, Path) and path.is_dir() and
(pat.match(path.name) or any(file.is_dir() and pat.match(file.stem) for file in path.iterdir())))
pat = re.compile(r"(?:\d+-)?Pos.*", re.IGNORECASE)
return (
isinstance(path, Path)
and path.is_dir()
and (pat.match(path.name) or any(file.is_dir() and pat.match(file.stem) for file in path.iterdir()))
)
def get_ome(self):
ome = model.OME()
with tifffile.TiffFile(self.filedict[0, 0, 0]) as tif:
metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()}
ome.experimenters.append(
model.Experimenter(id='Experimenter:0', user_name=metadata['Info']['Summary']['UserName']))
objective_str = metadata['Info']['ZeissObjectiveTurret-Label']
model.Experimenter(id="Experimenter:0", user_name=metadata["Info"]["Summary"]["UserName"])
)
objective_str = metadata["Info"]["ZeissObjectiveTurret-Label"]
ome.instruments.append(model.Instrument())
ome.instruments[0].objectives.append(
model.Objective(
id='Objective:0', manufacturer='Zeiss', model=objective_str,
nominal_magnification=float(re.findall(r'(\d+)x', objective_str)[0]),
lens_na=float(re.findall(r'/(\d\.\d+)', objective_str)[0]),
immersion=model.Objective_Immersion.OIL if 'oil' in objective_str.lower() else None))
tubelens_str = metadata['Info']['ZeissOptovar-Label']
id="Objective:0",
manufacturer="Zeiss",
model=objective_str,
nominal_magnification=float(re.findall(r"(\d+)x", objective_str)[0]),
lens_na=float(re.findall(r"/(\d\.\d+)", objective_str)[0]),
immersion=model.Objective_Immersion.OIL if "oil" in objective_str.lower() else None,
)
)
tubelens_str = metadata["Info"]["ZeissOptovar-Label"]
ome.instruments[0].objectives.append(
model.Objective(
id='Objective:Tubelens:0', manufacturer='Zeiss', model=tubelens_str,
nominal_magnification=float(re.findall(r'\d?\d*[,.]?\d+(?=x$)', tubelens_str)[0].replace(',', '.'))))
ome.instruments[0].detectors.append(
model.Detector(
id='Detector:0', amplification_gain=100))
id="Objective:Tubelens:0",
manufacturer="Zeiss",
model=tubelens_str,
nominal_magnification=float(re.findall(r"\d?\d*[,.]?\d+(?=x$)", tubelens_str)[0].replace(",", ".")),
)
)
ome.instruments[0].detectors.append(model.Detector(id="Detector:0", amplification_gain=100))
ome.instruments[0].filter_sets.append(
model.FilterSet(id='FilterSet:0', model=metadata['Info']['ZeissReflectorTurret-Label']))
model.FilterSet(id="FilterSet:0", model=metadata["Info"]["ZeissReflectorTurret-Label"])
)
pxsize = metadata['Info']['PixelSizeUm']
pxsize_cam = 6.5 if 'Hamamatsu' in metadata['Info']['Core-Camera'] else None
pxsize = metadata["Info"]["PixelSizeUm"]
pxsize_cam = 6.5 if "Hamamatsu" in metadata["Info"]["Core-Camera"] else None
if pxsize == 0:
pxsize = pxsize_cam / ome.instruments[0].objectives[0].nominal_magnification
pixel_type = metadata['Info']['PixelType'].lower()
if pixel_type.startswith('gray'):
pixel_type = 'uint' + pixel_type[4:]
pixel_type = metadata["Info"]["PixelType"].lower()
if pixel_type.startswith("gray"):
pixel_type = "uint" + pixel_type[4:]
else:
pixel_type = 'uint16' # assume
pixel_type = "uint16" # assume
size_c, size_z, size_t = (max(i) + 1 for i in zip(*self.filedict.keys()))
t0 = datetime.strptime(metadata['Info']['Time'], '%Y-%m-%d %H:%M:%S %z')
t0 = datetime.strptime(metadata["Info"]["Time"], "%Y-%m-%d %H:%M:%S %z")
ome.images.append(
model.Image(
pixels=model.Pixels(
size_c=size_c, size_z=size_z, size_t=size_t,
size_x=metadata['Info']['Width'], size_y=metadata['Info']['Height'],
dimension_order='XYCZT', # type: ignore
type=pixel_type, physical_size_x=pxsize, physical_size_y=pxsize,
physical_size_z=metadata['Info']['Summary']['z-step_um']),
objective_settings=model.ObjectiveSettings(id='Objective:0')))
size_c=size_c,
size_z=size_z,
size_t=size_t,
size_x=metadata["Info"]["Width"],
size_y=metadata["Info"]["Height"],
dimension_order="XYCZT", # type: ignore
type=pixel_type,
physical_size_x=pxsize,
physical_size_y=pxsize,
physical_size_z=metadata["Info"]["Summary"]["z-step_um"],
),
objective_settings=model.ObjectiveSettings(id="Objective:0"),
)
)
for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(
Plane(t0, self.filedict[c, z, t],
the_c=c, the_z=z, the_t=t, exposure_time=metadata['Info']['Exposure-ms'] / 1000))
Plane(
t0,
self.filedict[c, z, t],
the_c=c,
the_z=z,
the_t=t,
exposure_time=metadata["Info"]["Exposure-ms"] / 1000,
)
)
# compare channel names from metadata with filenames
pattern_c = re.compile(r'img_\d{3,}_(.*)_\d{3,}$', re.IGNORECASE)
pattern_c = re.compile(r"img_\d{3,}_(.*)_\d{3,}$", re.IGNORECASE)
for c in range(size_c):
ome.images[0].pixels.channels.append(
model.Channel(
id=f'Channel:{c}', name=pattern_c.findall(self.filedict[c, 0, 0].stem)[0],
id=f"Channel:{c}",
name=pattern_c.findall(self.filedict[c, 0, 0].stem)[0],
detector_settings=model.DetectorSettings(
id='Detector:0', binning=metadata['Info']['Hamamatsu_sCMOS-Binning']),
filter_set_ref=model.FilterSetRef(id='FilterSet:0')))
id="Detector:0", binning=metadata["Info"]["Hamamatsu_sCMOS-Binning"]
),
filter_set_ref=model.FilterSetRef(id="FilterSet:0"),
)
)
return ome
def open(self):
# /some_path/Pos4: path = /some_path, series = 4
# /some_path/5-Pos_001_005: path = /some_path/5-Pos_001_005, series = 0
if re.match(r'(?:\d+-)?Pos.*', self.path.name, re.IGNORECASE) is None:
pat = re.compile(rf'^(?:\d+-)?Pos{self.series}$', re.IGNORECASE)
if re.match(r"(?:\d+-)?Pos.*", self.path.name, re.IGNORECASE) is None:
pat = re.compile(rf"^(?:\d+-)?Pos{self.series}$", re.IGNORECASE)
files = sorted(file for file in self.path.iterdir() if pat.match(file.name))
if len(files):
path = files[0]
@@ -126,21 +158,26 @@ class Reader(AbstractReader, ABC):
else:
path = self.path
pat = re.compile(r'^img_\d{3,}.*\d{3,}.*\.tif$', re.IGNORECASE)
pat = re.compile(r"^img_\d{3,}.*\d{3,}.*\.tif$", re.IGNORECASE)
filelist = sorted([file for file in path.iterdir() if pat.search(file.name)])
with tifffile.TiffFile(self.path / filelist[0]) as tif:
metadata = {key: yaml.safe_load(value) for key, value in tif.pages[0].tags[50839].value.items()}
# compare channel names from metadata with filenames
cnamelist = metadata['Info']['Summary']['ChNames']
cnamelist = metadata["Info"]["Summary"]["ChNames"]
cnamelist = [c for c in cnamelist if any([c in f.name for f in filelist])]
pattern_c = re.compile(r'img_\d{3,}_(.*)_\d{3,}$', re.IGNORECASE)
pattern_z = re.compile(r'(\d{3,})$')
pattern_t = re.compile(r'img_(\d{3,})', re.IGNORECASE)
self.filedict = {(cnamelist.index(pattern_c.findall(file.stem)[0]), # noqa
pattern_c = re.compile(r"img_\d{3,}_(.*)_\d{3,}$", re.IGNORECASE)
pattern_z = re.compile(r"(\d{3,})$")
pattern_t = re.compile(r"img_(\d{3,})", re.IGNORECASE)
self.filedict = {
(
cnamelist.index(pattern_c.findall(file.stem)[0]), # noqa
int(pattern_z.findall(file.stem)[0]),
int(pattern_t.findall(file.stem)[0])): file for file in filelist}
int(pattern_t.findall(file.stem)[0]),
): file
for file in filelist
}
def __frame__(self, c=0, z=0, t=0):
return tifffile.imread(self.path / self.filedict[(c, z, t)])

View File

@@ -1,3 +1,5 @@
import re
import warnings
from abc import ABC
from functools import cached_property
from itertools import product
@@ -6,18 +8,18 @@ from pathlib import Path
import numpy as np
import tifffile
import yaml
from ome_types import model
from ome_types import from_xml, model
from .. import AbstractReader
from .. import AbstractReader, try_default
class Reader(AbstractReader, ABC):
priority = 0
do_not_pickle = 'reader'
do_not_pickle = "reader"
@staticmethod
def _can_open(path):
if isinstance(path, Path) and path.suffix in ('.tif', '.tiff'):
if isinstance(path, Path) and path.suffix in (".tif", ".tiff"):
with tifffile.TiffFile(path) as tif:
return tif.is_imagej and tif.pages[-1]._nextifd() == 0 # noqa
else:
@@ -25,20 +27,74 @@ class Reader(AbstractReader, ABC):
@cached_property
def metadata(self):
return {key: yaml.safe_load(value) if isinstance(value, str) else value
for key, value in self.reader.imagej_metadata.items()}
return {
key: try_default(yaml.safe_load, value, value) if isinstance(value, str) else value
for key, value in self.reader.imagej_metadata.items()
}
def get_ome(self):
if self.reader.is_ome:
pos_number_pat = re.compile(r"\d+")
def get_pos_number(s):
return [int(i) for i in pos_number_pat.findall(s)]
match = re.match(r"^(.*)(pos[\d_]+)(.*)$", self.path.name, flags=re.IGNORECASE)
if match is not None and len(match.groups()) == 3:
a, b, c = match.groups()
pat = re.compile(f"^{re.escape(a)}" + re.sub(r"\d+", r"\\d+", b) + f"{re.escape(c)}$")
backup_ome = []
backup_backup_ome = []
pos_number = get_pos_number(b)
for file in sorted(self.path.parent.iterdir(), key=lambda i: (len(i.name), i.name)):
if pat.match(file.name):
with tifffile.TiffFile(file) as tif:
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UserWarning)
ome = from_xml(tif.ome_metadata)
backup_backup_ome.extend(ome.images)
try:
backup_ome.extend(
[
image
for image in ome.images
if pos_number == get_pos_number(image.stage_label.name)
]
)
except ValueError:
pass
ome.images = [image for image in ome.images if b == image.stage_label.name]
if ome.images:
return ome
if backup_ome:
ome.images = [backup_ome[0]]
warnings.warn(
"could not find the ome.tif file containing the metadata with an exact match, "
f"matched {ome.images[0].stage_label.name} with {b} instead, "
"did you rename the file?"
)
return ome
if backup_backup_ome:
ome.images = [backup_backup_ome[0]]
warnings.warn(
"could not find the ome.tif file containing the metadata, "
f"used metadata from {ome.images[0].name} instead, "
"did you rename the file"
)
return ome
warnings.warn("could not find the ome.tif file containing the metadata")
page = self.reader.pages[0]
size_y = page.imagelength
size_x = page.imagewidth
if self.p_ndim == 3:
size_c = page.samplesperpixel
size_t = self.metadata.get('frames', 1) # // C
size_t = self.metadata.get("frames", 1) # // C
else:
size_c = self.metadata.get('channels', 1)
size_t = self.metadata.get('frames', 1)
size_z = self.metadata.get('slices', 1)
size_c = self.metadata.get("channels", 1)
size_t = self.metadata.get("frames", 1)
size_z = self.metadata.get("slices", 1)
if 282 in page.tags and 296 in page.tags and page.tags[296].value == 1:
f = page.tags[282].value
pxsize = f[1] / f[0]
@@ -46,40 +102,69 @@ class Reader(AbstractReader, ABC):
pxsize = None
dtype = page.dtype.name
if dtype not in ('int8', 'int16', 'int32', 'uint8', 'uint16', 'uint32',
'float', 'double', 'complex', 'double-complex', 'bit'):
dtype = 'float'
if dtype not in (
"int8",
"int16",
"int32",
"uint8",
"uint16",
"uint32",
"float",
"double",
"complex",
"double-complex",
"bit",
):
dtype = "float"
interval_t = self.metadata.get('interval', 0)
interval_t = self.metadata.get("interval", 0)
ome = model.OME()
ome.instruments.append(model.Instrument(id='Instrument:0'))
ome.instruments[0].objectives.append(model.Objective(id='Objective:0'))
ome.instruments.append(model.Instrument(id="Instrument:0"))
ome.instruments[0].objectives.append(model.Objective(id="Objective:0"))
ome.images.append(
model.Image(
id='Image:0',
id="Image:0",
pixels=model.Pixels(
id='Pixels:0',
size_c=size_c, size_z=size_z, size_t=size_t, size_x=size_x, size_y=size_y,
dimension_order='XYCZT', type=dtype, # type: ignore
physical_size_x=pxsize, physical_size_y=pxsize),
objective_settings=model.ObjectiveSettings(id='Objective:0')))
id="Pixels:0",
size_c=size_c,
size_z=size_z,
size_t=size_t,
size_x=size_x,
size_y=size_y,
dimension_order="XYCZT",
type=dtype, # type: ignore
physical_size_x=pxsize,
physical_size_y=pxsize,
),
objective_settings=model.ObjectiveSettings(id="Objective:0"),
)
)
for c, z, t in product(range(size_c), range(size_z), range(size_t)):
ome.images[0].pixels.planes.append(model.Plane(the_c=c, the_z=z, the_t=t, delta_t=interval_t * t))
return ome
def open(self):
self.reader = tifffile.TiffFile(self.path)
page = self.reader.pages[0]
page = self.reader.pages.first
self.p_ndim = page.ndim # noqa
if self.p_ndim == 3:
self.p_transpose = [i for i in [page.axes.find(j) for j in 'SYX'] if i >= 0] # noqa
self.p_transpose = [i for i in [page.axes.find(j) for j in "SYX"] if i >= 0] # noqa
else:
self.p_transpose = [i for i in [page.axes.find(j) for j in "YX"] if i >= 0] # noqa
def close(self):
self.reader.close()
def __frame__(self, c, z, t):
def __frame__(self, c: int, z: int, t: int):
dimension_order = self.ome.images[0].pixels.dimension_order.value
if self.p_ndim == 3:
return np.transpose(self.reader.asarray(z + t * self.base_shape['z']), self.p_transpose)[c]
axes = "".join([ax.lower() for ax in dimension_order if ax.lower() in "zt"])
ct = {"z": z, "t": t}
n = sum([ct[ax] * np.prod(self.base_shape[axes[:i]]) for i, ax in enumerate(axes)])
return np.transpose(self.reader.asarray(int(n)), self.p_transpose)[int(c)]
else:
return self.reader.asarray(c + z * self.base_shape['c'] + t * self.base_shape['c'] * self.base_shape['z'])
axes = "".join([ax.lower() for ax in dimension_order if ax.lower() in "czt"])
czt = {"c": c, "z": z, "t": t}
n = sum([czt[ax] * np.prod(self.base_shape[axes[:i]]) for i, ax in enumerate(axes)])
return np.transpose(self.reader.asarray(int(n)), self.p_transpose)

View File

@@ -1,6 +1,4 @@
#Insight Transform File V1.0
#Transform 0
Transform: CompositeTransform_double_2_2
#Transform 1
Transform: AffineTransform_double_2_2
Parameters: 1 0 0 1 0 0

View File

@@ -16,12 +16,17 @@ except ImportError:
sitk = None
try:
from pandas import DataFrame, Series, concat
import pandas as pd
except ImportError:
DataFrame, Series, concat = None, None, None
pd = None
try:
import polars as pl
except ImportError:
pl = None
if hasattr(yaml, 'full_load'):
if hasattr(yaml, "full_load"):
yamlload = yaml.full_load
else:
yamlload = yaml.load
@@ -34,7 +39,7 @@ class Transforms(dict):
@classmethod
def from_file(cls, file, C=True, T=True):
with open(Path(file).with_suffix('.yml')) as f:
with open(Path(file).with_suffix(".yml")) as f:
return cls.from_dict(yamlload(f), C, T)
@classmethod
@@ -42,7 +47,7 @@ class Transforms(dict):
new = cls()
for key, value in d.items():
if isinstance(key, str) and C:
new[key.replace(r'\:', ':').replace('\\\\', '\\')] = Transform.from_dict(value)
new[key.replace(r"\:", ":").replace("\\\\", "\\")] = Transform.from_dict(value)
elif T:
new[key] = Transform.from_dict(value)
return new
@@ -69,8 +74,10 @@ class Transforms(dict):
return new
def asdict(self):
return {key.replace('\\', '\\\\').replace(':', r'\:') if isinstance(key, str) else key: value.asdict()
for key, value in self.items()}
return {
key.replace("\\", "\\\\").replace(":", r"\:") if isinstance(key, str) else key: value.asdict()
for key, value in self.items()
}
def __getitem__(self, item):
return np.prod([self[i] for i in item[::-1]]) if isinstance(item, tuple) else super().__getitem__(item)
@@ -88,7 +95,7 @@ class Transforms(dict):
return hash(frozenset((*self.__dict__.items(), *self.items())))
def save(self, file):
with open(Path(file).with_suffix('.yml'), 'w') as f:
with open(Path(file).with_suffix(".yml"), "w") as f:
yaml.safe_dump(self.asdict(), f, default_flow_style=None)
def copy(self):
@@ -109,8 +116,9 @@ class Transforms(dict):
transform_channels = {key for key in self.keys() if isinstance(key, str)}
if set(channel_names) - transform_channels:
mapping = key_map(channel_names, transform_channels)
warnings.warn(f'The image file and the transform do not have the same channels,'
f' creating a mapping: {mapping}')
warnings.warn(
f"The image file and the transform do not have the same channels, creating a mapping: {mapping}"
)
for key_im, key_t in mapping.items():
self[key_im] = self[key_t]
@@ -123,21 +131,27 @@ class Transforms(dict):
return inverse
def coords_pandas(self, array, channel_names, columns=None):
if isinstance(array, DataFrame):
return concat([self.coords_pandas(row, channel_names, columns) for _, row in array.iterrows()], axis=1).T
elif isinstance(array, Series):
if pd is None:
raise ImportError("pandas is not available")
if isinstance(array, pd.DataFrame):
return pd.concat(
[self.coords_pandas(row, channel_names, columns) for _, row in array.iterrows()], axis=1
).T
elif isinstance(array, pd.Series):
key = []
if 'C' in array:
key.append(channel_names[int(array['C'])])
if 'T' in array:
key.append(int(array['T']))
if "C" in array:
key.append(channel_names[int(array["C"])])
if "T" in array:
key.append(int(array["T"]))
return self[tuple(key)].coords(array, columns)
else:
raise TypeError('Not a pandas DataFrame or Series.')
raise TypeError("Not a pandas DataFrame or Series.")
def with_beads(self, cyllens, bead_files):
assert len(bead_files) > 0, 'At least one file is needed to calculate the registration.'
transforms = [self.calculate_channel_transforms(file, cyllens) for file in bead_files]
def with_beads(self, cyllens, bead_files, main_channel=None, default_transform=None):
assert len(bead_files) > 0, "At least one file is needed to calculate the registration."
transforms = [
self.calculate_channel_transforms(file, cyllens, main_channel, default_transform) for file in bead_files
]
for key in {key for transform in transforms for key in transform.keys()}:
new_transforms = [transform[key] for transform in transforms if key in transform]
if len(new_transforms) == 1:
@@ -145,16 +159,18 @@ class Transforms(dict):
else:
self[key] = Transform()
self[key].parameters = np.mean([t.parameters for t in new_transforms], 0)
self[key].dparameters = (np.std([t.parameters for t in new_transforms], 0) /
np.sqrt(len(new_transforms))).tolist()
self[key].dparameters = (
np.std([t.parameters for t in new_transforms], 0) / np.sqrt(len(new_transforms))
).tolist()
return self
@staticmethod
def get_bead_files(path):
from . import Imread
files = []
for file in path.iterdir():
if file.name.lower().startswith('beads'):
if file.name.lower().startswith("beads"):
try:
with Imread(file):
files.append(file)
@@ -162,92 +178,97 @@ class Transforms(dict):
pass
files = sorted(files)
if not files:
raise Exception('No bead file found!')
raise Exception("No bead file found!")
checked_files = []
for file in files:
try:
if file.is_dir():
file /= 'Pos0'
file /= "Pos0"
with Imread(file): # check for errors opening the file
checked_files.append(file)
except (Exception,):
continue
if not checked_files:
raise Exception('No bead file found!')
raise Exception("No bead file found!")
return checked_files
@staticmethod
def calculate_channel_transforms(bead_file, cyllens):
def calculate_channel_transforms(bead_file, cyllens, main_channel=None, default_transform=None):
"""When no channel is not transformed by a cylindrical lens, assume that the image is scaled by a factor 1.162
in the horizontal direction"""
from . import Imread
with Imread(bead_file, axes='zcyx') as im: # noqa
max_ims = im.max('z')
with Imread(bead_file, axes="zcyx") as im: # noqa
max_ims = im.max("z")
goodch = [c for c, max_im in enumerate(max_ims) if not im.is_noise(max_im)]
if not goodch:
goodch = list(range(len(max_ims)))
untransformed = [c for c in range(im.shape['c']) if cyllens[im.detector[c]].lower() == 'none']
untransformed = [c for c in range(im.shape["c"]) if cyllens[im.detector[c]].lower() == "none"]
good_and_untrans = sorted(set(goodch) & set(untransformed))
if main_channel is None:
if good_and_untrans:
masterch = good_and_untrans[0]
main_channel = good_and_untrans[0]
else:
masterch = goodch[0]
main_channel = goodch[0]
transform = Transform()
if not good_and_untrans:
matrix = transform.matrix
if default_transform is None:
matrix[0, 0] = 0.86
else:
for i, t in zip(((0, 0), (0, 1), (1, 0), (1, 1), (0, 2), (1, 2)), default_transform):
matrix[i] = t
transform.matrix = matrix
transforms = Transforms()
for c in tqdm(goodch, desc='Calculating channel transforms'): # noqa
if c == masterch:
for c in tqdm(goodch, desc="Calculating channel transforms"): # noqa
if c == main_channel:
transforms[im.channel_names[c]] = transform
else:
transforms[im.channel_names[c]] = Transform.register(max_ims[masterch], max_ims[c]) * transform
transforms[im.channel_names[c]] = Transform.register(max_ims[main_channel], max_ims[c]) * transform
return transforms
@staticmethod
def save_channel_transform_tiff(bead_files, tiffile):
def save_channel_transform_tiff(bead_files, tiffile, default_transform=None):
from . import Imread
n_channels = 0
for file in bead_files:
with Imread(file) as im:
n_channels = max(n_channels, im.shape['c'])
n_channels = max(n_channels, im.shape["c"])
with IJTiffFile(tiffile) as tif:
for t, file in enumerate(bead_files):
with Imread(file) as im:
with Imread(file).with_transform() as jm:
for c in range(im.shape['c']):
tif.save(np.hstack((im(c=c, t=0).max('z'), jm(c=c, t=0).max('z'))), c, 0, t)
with Imread(file).with_transform(default_transform=default_transform) as jm:
for c in range(im.shape["c"]):
tif.save(np.hstack((im(c=c, t=0).max("z"), jm(c=c, t=0).max("z"))), c, 0, t)
def with_drift(self, im):
"""Calculate shifts relative to the first frame
divide the sequence into groups,
compare each frame to the frame in the middle of the group and compare these middle frames to each other
"""
im = im.transpose('tzycx')
t_groups = [list(chunk) for chunk in Chunks(range(im.shape['t']), size=round(np.sqrt(im.shape['t'])))]
im = im.transpose("tzycx")
t_groups = [list(chunk) for chunk in Chunks(range(im.shape["t"]), size=round(np.sqrt(im.shape["t"])))]
t_keys = [int(np.round(np.mean(t_group))) for t_group in t_groups]
t_pairs = [(int(np.round(np.mean(t_group))), frame) for t_group in t_groups for frame in t_group]
t_pairs.extend(zip(t_keys, t_keys[1:]))
fmaxz_keys = {t_key: filters.gaussian(im[t_key].max('z'), 5) for t_key in t_keys}
fmaxz_keys = {t_key: filters.gaussian(im[t_key].max("z"), 5) for t_key in t_keys}
def fun(t_key_t, im, fmaxz_keys):
t_key, t = t_key_t
if t_key == t:
return 0, 0
else:
fmaxz = filters.gaussian(im[t].max('z'), 5)
return Transform.register(fmaxz_keys[t_key], fmaxz, 'translation').parameters[4:]
fmaxz = filters.gaussian(im[t].max("z"), 5)
return Transform.register(fmaxz_keys[t_key], fmaxz, "translation").parameters[4:]
shifts = np.array(pmap(fun, t_pairs, (im, fmaxz_keys), desc='Calculating image shifts.'))
shifts = np.array(pmap(fun, t_pairs, (im, fmaxz_keys), desc="Calculating image shifts."))
shift_keys_cum = np.zeros(2)
for shift_keys, t_group in zip(np.vstack((-shifts[0], shifts[im.shape['t']:])), t_groups):
for shift_keys, t_group in zip(np.vstack((-shifts[0], shifts[im.shape["t"] :])), t_groups):
shift_keys_cum += shift_keys
shifts[t_group] += shift_keys_cum
for i, shift in enumerate(shifts[:im.shape['t']]):
for i, shift in enumerate(shifts[: im.shape["t"]]):
self[i] = Transform.from_shift(shift)
return self
@@ -257,9 +278,9 @@ class Transform:
if sitk is None:
self.transform = None
else:
self.transform = sitk.ReadTransform(str(Path(__file__).parent / 'transform.txt'))
self.dparameters = [0., 0., 0., 0., 0., 0.]
self.shape = [512., 512.]
self.transform = sitk.ReadTransform(str(Path(__file__).parent / "transform.txt"))
self.dparameters = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
self.shape = [512.0, 512.0]
self.origin = [255.5, 255.5]
self._last, self._inverse = None, None
@@ -276,10 +297,11 @@ class Transform:
def register(cls, fix, mov, kind=None):
"""kind: 'affine', 'translation', 'rigid'"""
if sitk is None:
raise ImportError('SimpleElastix is not installed: '
'https://simpleelastix.readthedocs.io/GettingStarted.html')
raise ImportError(
"SimpleElastix is not installed: https://simpleelastix.readthedocs.io/GettingStarted.html"
)
new = cls()
kind = kind or 'affine'
kind = kind or "affine"
new.shape = fix.shape
fix, mov = new.cast_image(fix), new.cast_image(mov)
# TODO: implement RigidTransform
@@ -290,16 +312,16 @@ class Transform:
tfilter.SetParameterMap(sitk.GetDefaultParameterMap(kind))
tfilter.Execute()
transform = tfilter.GetTransformParameterMap()[0]
if kind == 'affine':
new.parameters = [float(t) for t in transform['TransformParameters']]
new.shape = [float(t) for t in transform['Size']]
new.origin = [float(t) for t in transform['CenterOfRotationPoint']]
elif kind == 'translation':
new.parameters = [1.0, 0.0, 0.0, 1.0] + [float(t) for t in transform['TransformParameters']]
new.shape = [float(t) for t in transform['Size']]
if kind == "affine":
new.parameters = [float(t) for t in transform["TransformParameters"]]
new.shape = [float(t) for t in transform["Size"]]
new.origin = [float(t) for t in transform["CenterOfRotationPoint"]]
elif kind == "translation":
new.parameters = [1.0, 0.0, 0.0, 1.0] + [float(t) for t in transform["TransformParameters"]]
new.shape = [float(t) for t in transform["Size"]]
new.origin = [(t - 1) / 2 for t in new.shape]
else:
raise NotImplementedError(f'{kind} tranforms not implemented (yet)')
raise NotImplementedError(f"{kind} transforms not implemented (yet)")
new.dparameters = 6 * [np.nan]
return new
@@ -315,16 +337,24 @@ class Transform:
@classmethod
def from_file(cls, file):
with open(Path(file).with_suffix('.yml')) as f:
with open(Path(file).with_suffix(".yml")) as f:
return cls.from_dict(yamlload(f))
@classmethod
def from_dict(cls, d):
new = cls()
new.origin = [float(i) for i in d['CenterOfRotationPoint']]
new.parameters = [float(i) for i in d['TransformParameters']]
new.dparameters = [float(i) for i in d['dTransformParameters']] if 'dTransformParameters' in d else 6 * [np.nan]
new.shape = [float(i) for i in d['Size']]
new.origin = None if d["CenterOfRotationPoint"] is None else [float(i) for i in d["CenterOfRotationPoint"]]
new.parameters = (
(1.0, 0.0, 0.0, 1.0, 0.0, 0.0)
if d["TransformParameters"] is None
else [float(i) for i in d["TransformParameters"]]
)
new.dparameters = (
[(0.0, 0.0, 0.0, 0.0, 0.0, 0.0) if i is None else float(i) for i in d["dTransformParameters"]]
if "dTransformParameters" in d
else 6 * [np.nan] and d["dTransformParameters"] is not None
)
new.shape = None if d["Size"] is None else [None if i is None else float(i) for i in d["Size"]]
return new
def __mul__(self, other): # TODO: take care of dmatrix
@@ -357,9 +387,9 @@ class Transform:
@property
def matrix(self):
return np.array(((*self.parameters[:2], self.parameters[4]),
(*self.parameters[2:4], self.parameters[5]),
(0, 0, 1)))
return np.array(
((*self.parameters[:2], self.parameters[4]), (*self.parameters[2:4], self.parameters[5]), (0, 0, 1))
)
@matrix.setter
def matrix(self, value):
@@ -368,9 +398,9 @@ class Transform:
@property
def dmatrix(self):
return np.array(((*self.dparameters[:2], self.dparameters[4]),
(*self.dparameters[2:4], self.dparameters[5]),
(0, 0, 0)))
return np.array(
((*self.dparameters[:2], self.dparameters[4]), (*self.dparameters[2:4], self.dparameters[5]), (0, 0, 0))
)
@dmatrix.setter
def dmatrix(self, value):
@@ -381,6 +411,8 @@ class Transform:
def parameters(self):
if self.transform is not None:
return list(self.transform.GetParameters())
else:
return [1.0, 0.0, 0.0, 1.0, 0.0, 0.0]
@parameters.setter
def parameters(self, value):
@@ -416,18 +448,23 @@ class Transform:
self.shape = shape[:2]
def asdict(self):
return {'CenterOfRotationPoint': self.origin, 'Size': self.shape, 'TransformParameters': self.parameters,
'dTransformParameters': np.nan_to_num(self.dparameters, nan=1e99).tolist()}
return {
"CenterOfRotationPoint": self.origin,
"Size": self.shape,
"TransformParameters": self.parameters,
"dTransformParameters": np.nan_to_num(self.dparameters, nan=1e99).tolist(),
}
def frame(self, im, default=0):
if self.is_unity():
return im
else:
if sitk is None:
raise ImportError('SimpleElastix is not installed: '
'https://simpleelastix.readthedocs.io/GettingStarted.html')
raise ImportError(
"SimpleElastix is not installed: https://simpleelastix.readthedocs.io/GettingStarted.html"
)
dtype = im.dtype
im = im.astype('float')
im = im.astype("float")
intp = sitk.sitkBSpline if np.issubdtype(dtype, np.floating) else sitk.sitkNearestNeighbor
return self.cast_array(sitk.Resample(self.cast_image(im), self.transform, intp, default)).astype(dtype)
@@ -437,14 +474,21 @@ class Transform:
"""
if self.is_unity():
return array.copy()
elif DataFrame is not None and isinstance(array, (DataFrame, Series)):
columns = columns or ['x', 'y']
elif pd is not None and isinstance(array, (pd.DataFrame, pd.Series)):
columns = columns or ["x", "y"]
array = array.copy()
if isinstance(array, DataFrame):
if isinstance(array, pd.DataFrame):
array[columns] = self.coords(np.atleast_2d(array[columns].to_numpy()))
elif isinstance(array, Series):
elif isinstance(array, pd.Series):
array[columns] = self.coords(np.atleast_2d(array[columns].to_numpy()))[0]
return array
elif pl is not None and isinstance(array, (pl.DataFrame, pl.LazyFrame)):
columns = columns or ["x", "y"]
if isinstance(array, pl.DataFrame):
xy = self.coords(np.atleast_2d(array.select(columns).to_numpy()))
elif isinstance(array, pl.LazyFrame):
xy = self.coords(np.atleast_2d(array.select(columns).collect().to_numpy()))
return array.with_columns(**{c: i for c, i in zip(columns, xy.T)})
else: # somehow we need to use the inverse here to get the same effect as when using self.frame
return np.array([self.inverse.transform.TransformPoint(i.tolist()) for i in np.asarray(array)])
@@ -452,7 +496,7 @@ class Transform:
"""save the parameters of the transform calculated
with affine_registration to a yaml file
"""
if not file[-3:] == 'yml':
file += '.yml'
with open(file, 'w') as f:
if not file[-3:] == "yml":
file += ".yml"
with open(file, "w") as f:
yaml.safe_dump(self.asdict(), f, default_flow_style=None)

View File

@@ -1,46 +1,43 @@
[tool.poetry]
[project]
name = "ndbioimage"
version = "2025.1.2"
version = "2026.1.2"
description = "Bio image reading, metadata and some affine registration."
authors = ["W. Pomp <w.pomp@nki.nl>"]
license = "GPLv3"
authors = [
{ name = "W. Pomp", email = "w.pomp@nki.nl" }
]
license = { text = "GPL-3.0-or-later"}
readme = "README.md"
keywords = ["bioformats", "imread", "numpy", "metadata"]
include = ["transform.txt"]
repository = "https://github.com/wimpomp/ndbioimage"
requires-python = ">=3.10"
exclude = ["ndbioimage/jars"]
[tool.poetry.dependencies]
python = "^3.10"
numpy = ">=1.20.0"
pandas = "*"
tifffile = "*"
czifile = "2019.7.2"
tiffwrite = ">=2024.12.1"
ome-types = ">=0.4.0"
pint = "*"
tqdm = "*"
lxml = "*"
pyyaml = "*"
parfor = ">=2025.1.0"
JPype1 = "*"
SimpleITK-SimpleElastix = [
{ version = "*", python = "<3.12" },
{ version = "*", python = ">=3.12", markers = "sys_platform != 'darwin'" },
{ version = "*", python = ">=3.12", markers = "platform_machine == 'aarch64'" },
dependencies = [
"czifile == 2019.7.2",
"imagecodecs",
"lxml",
"numpy >= 1.20",
"ome-types",
"pandas",
"parfor >= 2025.1.0",
"pint",
"pyyaml",
"SimpleITK-SimpleElastix; sys_platform != 'darwin'",
"scikit-image",
"tifffile <= 2025.1.10",
"tiffwrite >= 2024.12.1",
"tqdm",
]
scikit-image = "*"
imagecodecs = "*"
xsdata = "^23" # until pydantic is up-to-date
matplotlib = { version = "*", optional = true }
scikit-video = { version = "*", optional = true }
pytest = { version = "*", optional = true }
[tool.poetry.extras]
[project.optional-dependencies]
test = ["pytest"]
write = ["matplotlib", "scikit-video"]
bioformats = ["JPype1"]
[tool.poetry.scripts]
[project.urls]
repository = "https://github.com/wimpomp/ndbioimage"
[project.scripts]
ndbioimage = "ndbioimage:main"
[tool.pytest.ini_options]
@@ -49,6 +46,10 @@ filterwarnings = ["ignore:::(colorcet)"]
[tool.isort]
line_length = 119
[tool.ruff]
line-length = 119
indent-width = 4
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -7,7 +7,7 @@ import pytest
from ndbioimage import Imread, ReaderNotFoundError
@pytest.mark.parametrize('file', (Path(__file__).parent / 'files').iterdir())
@pytest.mark.parametrize("file", (Path(__file__).parent / "files").iterdir())
def test_open(file):
try:
with Imread(file) as im:
@@ -21,7 +21,7 @@ def test_open(file):
w = pickle.loads(b)
assert w[dict(c=0, z=0, t=0)].mean() == mean
except ReaderNotFoundError:
assert len(Imread.__subclasses__()), 'No subclasses for Imread found.'
assert len(Imread.__subclasses__()), "No subclasses for Imread found."
for child in active_children():
child.kill()

View File

@@ -11,8 +11,9 @@ im = Imread(r)
a = np.array(im)
@pytest.mark.parametrize('s', combinations_with_replacement(
(0, -1, 1, slice(None), slice(0, 1), slice(-1, 0), slice(1, 1)), 5))
@pytest.mark.parametrize(
"s", combinations_with_replacement((0, -1, 1, slice(None), slice(0, 1), slice(-1, 0), slice(1, 1)), 5)
)
def test_slicing(s):
s_im, s_a = im[s], a[s]
if isinstance(s_a, Number):

View File

@@ -10,10 +10,30 @@ im = Imread(r)
a = np.array(im)
@pytest.mark.parametrize('fun_and_axis', product(
(np.sum, np.nansum, np.min, np.nanmin, np.max, np.nanmax, np.argmin, np.argmax,
np.mean, np.nanmean, np.var, np.nanvar, np.std, np.nanstd), (None, 0, 1, 2, 3, 4)))
@pytest.mark.parametrize(
"fun_and_axis",
product(
(
np.sum,
np.nansum,
np.min,
np.nanmin,
np.max,
np.nanmax,
np.argmin,
np.argmax,
np.mean,
np.nanmean,
np.var,
np.nanvar,
np.std,
np.nanstd,
),
(None, 0, 1, 2, 3, 4),
),
)
def test_ufuncs(fun_and_axis):
fun, axis = fun_and_axis
assert np.all(np.isclose(fun(im, axis), fun(a, axis))), \
f'function {fun.__name__} over axis {axis} does not give the correct result'
assert np.all(np.isclose(fun(im, axis), fun(a, axis))), (
f"function {fun.__name__} over axis {axis} does not give the correct result"
)