diff --git a/README.md b/README.md index 93b337e..74b969b 100644 --- a/README.md +++ b/README.md @@ -63,11 +63,11 @@ automatically recognize it and use it to open the appropriate file format. Image are required to implement the following methods: - staticmethod _can_open(path): return True if path can be opened by this reader -- property ome: reads metadata from file and adds them to an OME object imported -from the ome-types library - \_\_frame__(self, c, z, t): return the frame at channel=c, z-slice=z, time=t from the file Optional methods: +- get_ome: reads metadata from file and adds them to an OME object imported +from the ome-types library - open(self): maybe open some file handle - close(self): close any file handles diff --git a/ndbioimage/__init__.py b/ndbioimage/__init__.py index d0d806e..ae67ed2 100755 --- a/ndbioimage/__init__.py +++ b/ndbioimage/__init__.py @@ -256,11 +256,16 @@ class Imread(np.lib.mixins.NDArrayOperatorsMixin, ABC): return super().__new__(subclass) raise ReaderNotFoundError(f'No reader found for {path}.') - def __init__(self, base: Imread = None, - slice: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray] = None, # noqa - shape: tuple[int, ...] = (0, 0, 0, 0, 0), - dtype: DTypeLike = None, - frame_decorator: Callable[[Imread, np.ndarray, int, int, int], np.ndarray] = None) -> None: + def __init__(self, *args: Any, **kwargs: Any): + def parse(base: Imread = None, # noqa + slice: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray] = None, # noqa + shape: tuple[int, ...] = (0, 0, 0, 0, 0), # noqa + dtype: DTypeLike = None, # noqa + frame_decorator: Callable[[Imread, np.ndarray, int, int, int], np.ndarray] = None # noqa + ) -> tuple[Any, ...]: + return base, slice, shape, dtype, frame_decorator + + base, slice, shape, dtype, frame_decorator = parse(*args, **kwargs) # noqa self.base = base or self self.slice = slice self._shape = Shape(shape) diff --git a/ndbioimage/readers/cziread.py b/ndbioimage/readers/cziread.py index 67d16a5..083cd0e 100644 --- a/ndbioimage/readers/cziread.py +++ b/ndbioimage/readers/cziread.py @@ -1,16 +1,17 @@ import re import warnings from abc import ABC +from functools import cached_property from io import BytesIO from itertools import product from pathlib import Path -from typing import Any, TypeVar, Optional +from typing import Any, Callable, Optional, TypeVar import czifile import imagecodecs import numpy as np from lxml import etree -from ome_types import model, OME +from ome_types import OME, model from tifffile import repeat_nd from .. import AbstractReader @@ -172,417 +173,15 @@ class Reader(AbstractReader, ABC): filedict[c, z, t].append(directory_entry) else: filedict[c, z, t] = [directory_entry] + if len(filedict) == 0: + raise FileNotFoundError(f'Series {self.series} not found in {self.path}.') self.filedict = filedict # noqa def close(self) -> None: self.reader.close() def get_ome(self) -> OME: - xml = self.reader.metadata() - attachments = {i.attachment_entry.name: i.attachment_entry.data_segment() - for i in self.reader.attachments()} - tree = etree.fromstring(xml) - metadata = tree.find('Metadata') - version = metadata.find('Version') - if version is not None: - version = version.text - else: - version = metadata.find('Experiment').attrib['Version'] - - if version == '1.0': - return self.ome_10(tree, attachments) - elif version == '1.2': - return self.ome_12(tree, attachments) - - def ome_12(self, tree: etree, attachments: dict[str, Any]) -> OME: - def text(item: Optional[Element], default: str = "") -> str: - return default if item is None else item.text - - def def_list(item: Any) -> list[Any]: - return [] if item is None else item - - ome = OME() - - metadata = tree.find('Metadata') - - information = metadata.find('Information') - display_setting = metadata.find('DisplaySetting') - ome.experimenters = [model.Experimenter(id='Experimenter:0', - user_name=information.find('Document').find('UserName').text)] - - instrument = information.find('Instrument') - for _ in instrument.find('Microscopes'): - ome.instruments.append(model.Instrument(id='Instrument:0')) - - for detector in instrument.find('Detectors'): - try: - detector_type = model.Detector_Type(text(detector.find('Type')).upper() or "") - except ValueError: - detector_type = model.Detector_Type.OTHER - - ome.instruments[0].detectors.append( - model.Detector( - id=detector.attrib['Id'].replace(' ', ''), model=text(detector.find('Manufacturer').find('Model')), - type=detector_type - )) - - for objective in instrument.find('Objectives'): - ome.instruments[0].objectives.append( - model.Objective( - id=objective.attrib['Id'], - model=text(objective.find('Manufacturer').find('Model')), - immersion=text(objective.find('Immersion')), - lens_na=float(text(objective.find('LensNA'))), - nominal_magnification=float(text(objective.find('NominalMagnification'))))) - - for tubelens in instrument.find('TubeLenses'): - try: - nominal_magnification = float(re.findall(r'\d+(?:[,.]\d*)?', - tubelens.attrib['Name'])[0].replace(',', '.')) - except Exception: # noqa - nominal_magnification = 1.0 - - ome.instruments[0].objectives.append( - model.Objective( - id=f"Objective:{tubelens.attrib['Id']}", - model=tubelens.attrib['Name'], - nominal_magnification=nominal_magnification)) - - for light_source in def_list(instrument.find('LightSources')): - if light_source.find('LightSourceType').find('Laser') is not None: - ome.instruments[0].lasers.append( - model.Laser( - id=f"LightSource:{light_source.attrib['Id']}", - power=float(text(light_source.find('Power'))), - wavelength=float(light_source.attrib['Id'][-3:]))) - - x_min = min([f.start[f.axes.index('X')] for f in self.filedict[0, 0, 0]]) - y_min = min([f.start[f.axes.index('Y')] for f in self.filedict[0, 0, 0]]) - x_max = max([f.start[f.axes.index('X')] + f.shape[f.axes.index('X')] for f in self.filedict[0, 0, 0]]) - y_max = max([f.start[f.axes.index('Y')] + f.shape[f.axes.index('Y')] for f in self.filedict[0, 0, 0]]) - size_x = x_max - x_min - size_y = y_max - y_min - size_c, size_z, size_t = (self.reader.shape[self.reader.axes.index(directory_entry)] - for directory_entry in 'CZT') - - image = information.find('Image') - pixel_type = text(image.find('PixelType'), 'Gray16') - if pixel_type.startswith('Gray'): - pixel_type = 'uint' + pixel_type[4:] - objective_settings = image.find('ObjectiveSettings') - try: # TODO - scenes = image.find('Dimensions').find('S').find('Scenes') - center_position = [float(pos) for pos in text(scenes[0].find('CenterPosition')).split(',')] - except AttributeError: - center_position = [0, 0] - um = model.UnitsLength.MICROMETER - nm = model.UnitsLength.NANOMETER - - ome.images.append( - model.Image( - id='Image:0', - name=f"{text(information.find('Document').find('Name'))} #1", - pixels=model.Pixels( - id='Pixels:0', size_x=size_x, size_y=size_y, - size_c=size_c, size_z=size_z, size_t=size_t, - dimension_order='XYCZT', type=pixel_type, - significant_bits=int(text(image.find('ComponentBitCount'))), - big_endian=False, interleaved=False, metadata_only=True), - experimenter_ref=model.ExperimenterRef(id='Experimenter:0'), - instrument_ref=model.InstrumentRef(id='Instrument:0'), - objective_settings=model.ObjectiveSettings( - id=objective_settings.find('ObjectiveRef').attrib['Id'], - medium=text(objective_settings.find('Medium')), - refractive_index=float(text(objective_settings.find('RefractiveIndex')))), - stage_label=model.StageLabel( - name=f'Scene position #0', - x=center_position[0], x_unit=um, - y=center_position[1], y_unit=um))) - - for distance in metadata.find('Scaling').find('Items'): - if distance.attrib['Id'] == 'X': - ome.images[0].pixels.physical_size_x = float(text(distance.find('Value'))) * 1e6 - elif distance.attrib['Id'] == 'Y': - ome.images[0].pixels.physical_size_y = float(text(distance.find('Value'))) * 1e6 - elif size_z > 1 and distance.attrib['Id'] == 'Z': - ome.images[0].pixels.physical_size_z = float(text(distance.find('Value'))) * 1e6 - - channels_im = {channel.attrib['Id']: channel for channel in image.find('Dimensions').find('Channels')} - channels_ds = {channel.attrib['Id']: channel for channel in display_setting.find('Channels')} - - for idx, (key, channel) in enumerate(channels_im.items()): - detector_settings = channel.find('DetectorSettings') - laser_scan_info = channel.find('LaserScanInfo') - detector = detector_settings.find('Detector') - try: - binning = model.Binning(text(detector_settings.find('Binning'))) - except ValueError: - binning = model.Binning.OTHER - - light_sources_settings = channel.find('LightSourcesSettings') - # no space in ome for multiple lightsources simultaneously - if light_sources_settings is not None: - light_source_settings = light_sources_settings[0] - light_source_settings = model.LightSourceSettings( - id='LightSource:' + '_'.join([light_source_settings.find('LightSource').attrib['Id'] - for light_source_settings in light_sources_settings]), - attenuation=float(text(light_source_settings.find('Attenuation'))), - wavelength=float(text(light_source_settings.find('Wavelength'))), - wavelength_unit=nm) - else: - light_source_settings = None - - ome.images[0].pixels.channels.append( - model.Channel( - id=f'Channel:{idx}', - name=channel.attrib['Name'], - acquisition_mode=text(channel.find('AcquisitionMode')).replace('SingleMoleculeLocalisation', - 'SingleMoleculeImaging'), - color=model.Color(text(channels_ds[channel.attrib['Id']].find('Color'), 'white')), - detector_settings=model.DetectorSettings( - id=detector.attrib['Id'].replace(' ', ""), - binning=binning), - emission_wavelength=i if (i := text(channel.find('EmissionWavelength'))) != '0' else '100', - excitation_wavelength=text(channel.find('ExcitationWavelength')), - # filter_set_ref=model.FilterSetRef(id=ome.instruments[0].filter_sets[filterset_idx].id), - illumination_type=text(channel.find('IlluminationType')), - light_source_settings=light_source_settings, - samples_per_pixel=int(text(laser_scan_info.find('Averaging'), '1')))) - - exposure_times = [float(text(channel.find('LaserScanInfo').find('FrameTime'), '100')) for channel in - channels_im.values()] - delta_ts = attachments['TimeStamps'].data() - dt = np.diff(delta_ts) - if np.std(dt) / np.mean(dt) > 0.02: - dt = np.median(dt[dt > 0]) - delta_ts = dt * np.arange(len(delta_ts)) - warnings.warn(f'delta_t is inconsistent, using median value: {dt}') - - for t, z, c in product(range(size_t), range(size_z), range(size_c)): - ome.images[0].pixels.planes.append( - model.Plane(the_c=c, the_z=z, the_t=t, delta_t=delta_ts[t], exposure_time=exposure_times[c])) - - idx = 0 - for layer in [] if (ml := metadata.find('Layers')) is None else ml: - rectangle = layer.find('Elements').find('Rectangle') - if rectangle is not None: - geometry = rectangle.find('Geometry') - roi = model.ROI(id=f'ROI:{idx}', description=text(layer.find('Usage'))) - roi.union.append( - model.Rectangle( - id='Shape:0:0', - height=float(text(geometry.find('Height'))), - width=float(text(geometry.find('Width'))), - x=float(text(geometry.find('Left'))), - y=float(text(geometry.find('Top'))))) - ome.rois.append(roi) - ome.images[0].roi_refs.append(model.ROIRef(id=f'ROI:{idx}')) - idx += 1 - return ome - - def ome_10(self, tree: etree, attachments: dict[str, Any]) -> OME: - def text(item: Optional[Element], default: str = "") -> str: - return default if item is None else item.text - - def def_list(item: Any) -> list[Any]: - return [] if item is None else item - - ome = OME() - - metadata = tree.find('Metadata') - - information = metadata.find('Information') - display_setting = metadata.find('DisplaySetting') - experiment = metadata.find('Experiment') - acquisition_block = experiment.find('ExperimentBlocks').find('AcquisitionBlock') - - ome.experimenters = [model.Experimenter(id='Experimenter:0', - user_name=information.find('User').find('DisplayName').text)] - - instrument = information.find('Instrument') - ome.instruments.append(model.Instrument(id=instrument.attrib['Id'])) - - for detector in instrument.find('Detectors'): - try: - detector_type = model.Detector_Type(text(detector.find('Type')).upper() or "") - except ValueError: - detector_type = model.Detector_Type.OTHER - - ome.instruments[0].detectors.append( - model.Detector( - id=detector.attrib['Id'], model=text(detector.find('Manufacturer').find('Model')), - amplification_gain=float(text(detector.find('AmplificationGain'))), - gain=float(text(detector.find('Gain'))), zoom=float(text(detector.find('Zoom'))), - type=detector_type - )) - - for objective in instrument.find('Objectives'): - ome.instruments[0].objectives.append( - model.Objective( - id=objective.attrib['Id'], - model=text(objective.find('Manufacturer').find('Model')), - immersion=text(objective.find('Immersion')), - lens_na=float(text(objective.find('LensNA'))), - nominal_magnification=float(text(objective.find('NominalMagnification'))))) - - for light_source in def_list(instrument.find('LightSources')): - if light_source.find('LightSourceType').find('Laser') is not None: - ome.instruments[0].lasers.append( - model.Laser( - id=light_source.attrib['Id'], - model=text(light_source.find('Manufacturer').find('Model')), - power=float(text(light_source.find('Power'))), - wavelength=float( - text(light_source.find('LightSourceType').find('Laser').find('Wavelength'))))) - - multi_track_setup = acquisition_block.find('MultiTrackSetup') - for idx, tube_lens in enumerate({text(track_setup.find('TubeLensPosition')) - for track_setup in multi_track_setup}): - ome.instruments[0].objectives.append( - model.Objective(id=f'Objective:Tubelens:{idx}', model=tube_lens, - nominal_magnification=float( - re.findall(r'\d+[,.]\d*', tube_lens)[0].replace(',', '.')) - )) - - for idx, filter_ in enumerate({text(beam_splitter.find('Filter')) - for track_setup in multi_track_setup - for beam_splitter in track_setup.find('BeamSplitters')}): - ome.instruments[0].filter_sets.append( - model.FilterSet(id=f'FilterSet:{idx}', model=filter_) - ) - - for idx, collimator in enumerate({text(track_setup.find('FWFOVPosition')) - for track_setup in multi_track_setup}): - ome.instruments[0].filters.append(model.Filter(id=f'Filter:Collimator:{idx}', model=collimator)) - - x_min = min([f.start[f.axes.index('X')] for f in self.filedict[0, 0, 0]]) - y_min = min([f.start[f.axes.index('Y')] for f in self.filedict[0, 0, 0]]) - x_max = max([f.start[f.axes.index('X')] + f.shape[f.axes.index('X')] for f in self.filedict[0, 0, 0]]) - y_max = max([f.start[f.axes.index('Y')] + f.shape[f.axes.index('Y')] for f in self.filedict[0, 0, 0]]) - size_x = x_max - x_min - size_y = y_max - y_min - size_c, size_z, size_t = (self.reader.shape[self.reader.axes.index(directory_entry)] - for directory_entry in 'CZT') - - image = information.find('Image') - pixel_type = text(image.find('PixelType'), 'Gray16') - if pixel_type.startswith('Gray'): - pixel_type = 'uint' + pixel_type[4:] - objective_settings = image.find('ObjectiveSettings') - scenes = image.find('Dimensions').find('S').find('Scenes') - positions = scenes[0].find('Positions')[0] - um = model.UnitsLength.MICROMETER - nm = model.UnitsLength.NANOMETER - - ome.images.append( - model.Image( - id='Image:0', - name=f"{text(information.find('Document').find('Name'))} #1", - pixels=model.Pixels( - id='Pixels:0', size_x=size_x, size_y=size_y, - size_c=size_c, size_z=size_z, size_t=size_t, - dimension_order='XYCZT', type=pixel_type, - significant_bits=int(text(image.find('ComponentBitCount'))), - big_endian=False, interleaved=False, metadata_only=True), - experimenter_ref=model.ExperimenterRef(id='Experimenter:0'), - instrument_ref=model.InstrumentRef(id='Instrument:0'), - objective_settings=model.ObjectiveSettings( - id=objective_settings.find('ObjectiveRef').attrib['Id'], - medium=text(objective_settings.find('Medium')), - refractive_index=float(text(objective_settings.find('RefractiveIndex')))), - stage_label=model.StageLabel( - name=f'Scene position #0', - x=float(positions.attrib['X']), x_unit=um, - y=float(positions.attrib['Y']), y_unit=um, - z=float(positions.attrib['Z']), z_unit=um))) - - for distance in metadata.find('Scaling').find('Items'): - if distance.attrib['Id'] == 'X': - ome.images[0].pixels.physical_size_x = float(text(distance.find('Value'))) * 1e6 - elif distance.attrib['Id'] == 'Y': - ome.images[0].pixels.physical_size_y = float(text(distance.find('Value'))) * 1e6 - elif size_z > 1 and distance.attrib['Id'] == 'Z': - ome.images[0].pixels.physical_size_z = float(text(distance.find('Value'))) * 1e6 - - channels_im = {channel.attrib['Id']: channel for channel in image.find('Dimensions').find('Channels')} - channels_ds = {channel.attrib['Id']: channel for channel in display_setting.find('Channels')} - channels_ts = {detector.attrib['Id']: track_setup - for track_setup in - experiment.find('ExperimentBlocks').find('AcquisitionBlock').find('MultiTrackSetup') - for detector in track_setup.find('Detectors')} - - for idx, (key, channel) in enumerate(channels_im.items()): - detector_settings = channel.find('DetectorSettings') - laser_scan_info = channel.find('LaserScanInfo') - detector = detector_settings.find('Detector') - try: - binning = model.Binning(text(detector_settings.find('Binning'))) - except ValueError: - binning = model.Binning.OTHER - - filterset = text(channels_ts[key].find('BeamSplitters')[0].find('Filter')) - filterset_idx = [filterset.model for filterset in ome.instruments[0].filter_sets].index(filterset) - - light_sources_settings = channel.find('LightSourcesSettings') - # no space in ome for multiple lightsources simultaneously - if len(light_sources_settings) > idx: - light_source_settings = light_sources_settings[idx] - else: - light_source_settings = light_sources_settings[0] - light_source_settings = model.LightSourceSettings( - id=light_source_settings.find('LightSource').attrib['Id'], - attenuation=float(text(light_source_settings.find('Attenuation'))), - wavelength=float(text(light_source_settings.find('Wavelength'))), - wavelength_unit=nm) - - ome.images[0].pixels.channels.append( - model.Channel( - id=f'Channel:{idx}', - name=channel.attrib['Name'], - acquisition_mode=text(channel.find('AcquisitionMode')), - color=model.Color(text(channels_ds[channel.attrib['Id']].find('Color'), 'white')), - detector_settings=model.DetectorSettings(id=detector.attrib['Id'], binning=binning), - # emission_wavelength=text(channel.find('EmissionWavelength')), # TODO: fix - excitation_wavelength=light_source_settings.wavelength, - filter_set_ref=model.FilterSetRef(id=ome.instruments[0].filter_sets[filterset_idx].id), - illumination_type=text(channel.find('IlluminationType')), - light_source_settings=light_source_settings, - samples_per_pixel=int(text(laser_scan_info.find('Averaging'))))) - - exposure_times = [float(text(channel.find('LaserScanInfo').find('FrameTime'))) for channel in - channels_im.values()] - delta_ts = attachments['TimeStamps'].data() - dt = np.diff(delta_ts) - if np.std(dt) / np.mean(dt) > 0.02: - dt = np.median(dt[dt > 0]) - delta_ts = dt * np.arange(len(delta_ts)) - warnings.warn(f'delta_t is inconsistent, using median value: {dt}') - - for t, z, c in product(range(size_t), range(size_z), range(size_c)): - ome.images[0].pixels.planes.append( - model.Plane(the_c=c, the_z=z, the_t=t, delta_t=delta_ts[t], - exposure_time=exposure_times[c], - position_x=float(positions.attrib['X']), position_x_unit=um, - position_y=float(positions.attrib['Y']), position_y_unit=um, - position_z=float(positions.attrib['Z']), position_z_unit=um)) - - idx = 0 - for layer in [] if (ml := metadata.find('Layers')) is None else ml: - rectangle = layer.find('Elements').find('Rectangle') - if rectangle is not None: - geometry = rectangle.find('Geometry') - roi = model.ROI(id=f'ROI:{idx}', description=text(layer.find('Usage'))) - roi.union.append( - model.Rectangle( - id='Shape:0:0', - height=float(text(geometry.find('Height'))), - width=float(text(geometry.find('Width'))), - x=float(text(geometry.find('Left'))), - y=float(text(geometry.find('Top'))))) - ome.rois.append(roi) - ome.images[0].roi_refs.append(model.ROIRef(id=f'ROI:{idx}')) - idx += 1 - return ome + return OmeParse.get_ome(self.reader, self.filedict) def __frame__(self, c: int = 0, z: int = 0, t: int = 0) -> np.ndarray: f = np.zeros(self.base.shape['yx'], self.dtype) @@ -604,3 +203,392 @@ class Reader(AbstractReader, ABC): @staticmethod def get_index(directory_entry: czifile.DirectoryEntryDV, start: tuple[int]) -> list[tuple[int, int]]: return [(i - j, i - j + k) for i, j, k in zip(directory_entry.start, start, directory_entry.shape)] + + +class OmeParse: + size_x: int + size_y: int + size_c: int + size_z: int + size_t: int + + nm = model.UnitsLength.NANOMETER + um = model.UnitsLength.MICROMETER + + @classmethod + def get_ome(cls, reader: czifile.CziFile, filedict: dict[tuple[int, int, int], Any]) -> OME: + new = cls(reader, filedict) + new.parse() + return new.ome + + def __init__(self, reader: czifile.CziFile, filedict: dict[tuple[int, int, int], Any]) -> None: + self.reader = reader + self.filedict = filedict + xml = reader.metadata() + self.attachments = {i.attachment_entry.name: i.attachment_entry.data_segment() + for i in reader.attachments()} + self.tree = etree.fromstring(xml) + self.metadata = self.tree.find('Metadata') + version = self.metadata.find('Version') + if version is not None: + self.version = version.text + else: + self.version = self.metadata.find('Experiment').attrib['Version'] + + self.ome = OME() + self.information = self.metadata.find('Information') + self.display_setting = self.metadata.find('DisplaySetting') + self.experiment = self.metadata.find('Experiment') + self.acquisition_block = self.experiment.find('ExperimentBlocks').find('AcquisitionBlock') + self.instrument = self.information.find('Instrument') + self.image = self.information.find('Image') + + if self.version == '1.0': + self.experiment = self.metadata.find('Experiment') + self.acquisition_block = self.experiment.find('ExperimentBlocks').find('AcquisitionBlock') + self.multi_track_setup = self.acquisition_block.find('MultiTrackSetup') + else: + self.experiment = None + self.acquisition_block = None + self.multi_track_setup = None + + def parse(self) -> None: + self.get_experimenters() + self.get_instruments() + self.get_detectors() + self.get_objectives() + self.get_tubelenses() + self.get_light_sources() + self.get_filters() + self.get_pixels() + + @staticmethod + def text(item: Optional[Element], default: str = "") -> str: + return default if item is None else item.text + + @staticmethod + def def_list(item: Any) -> list[Any]: + return [] if item is None else item + + @staticmethod + def try_default(fun: Callable[[Any, ...], Any] | type, default: Any = None, *args: Any, **kwargs: Any) -> Any: + try: + return fun(*args, **kwargs) + except Exception: # noqa + return default + + def get_experimenters(self) -> None: + if self.version == '1.0': + self.ome.experimenters = [ + model.Experimenter(id='Experimenter:0', + user_name=self.information.find('User').find('DisplayName').text)] + elif self.version in ('1.1', '1.2'): + self.ome.experimenters = [ + model.Experimenter(id='Experimenter:0', + user_name=self.information.find('Document').find('UserName').text)] + + def get_instruments(self) -> None: + if self.version == '1.0': + self.ome.instruments.append(model.Instrument(id=self.instrument.attrib['Id'])) + elif self.version in ('1.1', '1.2'): + for _ in self.instrument.find('Microscopes'): + self.ome.instruments.append(model.Instrument(id='Instrument:0')) + + def get_detectors(self) -> None: + if self.version == '1.0': + for detector in self.instrument.find('Detectors'): + try: + detector_type = model.Detector_Type(self.text(detector.find('Type')).upper() or "") + except ValueError: + detector_type = model.Detector_Type.OTHER + + self.ome.instruments[0].detectors.append( + model.Detector( + id=detector.attrib['Id'], model=self.text(detector.find('Manufacturer').find('Model')), + amplification_gain=float(self.text(detector.find('AmplificationGain'))), + gain=float(self.text(detector.find('Gain'))), zoom=float(self.text(detector.find('Zoom'))), + type=detector_type + )) + elif self.version in ('1.1', '1.2'): + for detector in self.instrument.find('Detectors'): + try: + detector_type = model.Detector_Type(self.text(detector.find('Type')).upper() or "") + except ValueError: + detector_type = model.Detector_Type.OTHER + + self.ome.instruments[0].detectors.append( + model.Detector( + id=detector.attrib['Id'].replace(' ', ''), + model=self.text(detector.find('Manufacturer').find('Model')), + type=detector_type + )) + + def get_objectives(self) -> None: + for objective in self.instrument.find('Objectives'): + self.ome.instruments[0].objectives.append( + model.Objective( + id=objective.attrib['Id'], + model=self.text(objective.find('Manufacturer').find('Model')), + immersion=self.text(objective.find('Immersion')), + lens_na=float(self.text(objective.find('LensNA'))), + nominal_magnification=float(self.text(objective.find('NominalMagnification'))))) + + def get_tubelenses(self) -> None: + if self.version == '1.0': + for idx, tube_lens in enumerate({self.text(track_setup.find('TubeLensPosition')) + for track_setup in self.multi_track_setup}): + self.ome.instruments[0].objectives.append( + model.Objective(id=f'Objective:Tubelens:{idx}', model=tube_lens, + nominal_magnification=float( + re.findall(r'\d+[,.]\d*', tube_lens)[0].replace(',', '.')) + )) + elif self.version in ('1.1', '1.2'): + for tubelens in self.instrument.find('TubeLenses'): + try: + nominal_magnification = float(re.findall(r'\d+(?:[,.]\d*)?', + tubelens.attrib['Name'])[0].replace(',', '.')) + except Exception: # noqa + nominal_magnification = 1.0 + + self.ome.instruments[0].objectives.append( + model.Objective( + id=f"Objective:{tubelens.attrib['Id']}", + model=tubelens.attrib['Name'], + nominal_magnification=nominal_magnification)) + + def get_light_sources(self) -> None: + if self.version == '1.0': + for light_source in self.def_list(self.instrument.find('LightSources')): + if light_source.find('LightSourceType').find('Laser') is not None: + self.ome.instruments[0].lasers.append( + model.Laser( + id=light_source.attrib['Id'], + model=self.text(light_source.find('Manufacturer').find('Model')), + power=float(self.text(light_source.find('Power'))), + wavelength=float( + self.text(light_source.find('LightSourceType').find('Laser').find('Wavelength'))))) + elif self.version in ('1.1', '1.2'): + for light_source in self.def_list(self.instrument.find('LightSources')): + if light_source.find('LightSourceType').find('Laser') is not None: + self.ome.instruments[0].lasers.append( + model.Laser( + id=f"LightSource:{light_source.attrib['Id']}", + power=float(self.text(light_source.find('Power'))), + wavelength=float(light_source.attrib['Id'][-3:]))) + + def get_filters(self) -> None: + if self.version == '1.0': + for idx, filter_ in enumerate({self.text(beam_splitter.find('Filter')) + for track_setup in self.multi_track_setup + for beam_splitter in track_setup.find('BeamSplitters')}): + self.ome.instruments[0].filter_sets.append( + model.FilterSet(id=f'FilterSet:{idx}', model=filter_) + ) + + def get_pixels(self) -> None: + x_min = min([f.start[f.axes.index('X')] for f in self.filedict[0, 0, 0]]) + y_min = min([f.start[f.axes.index('Y')] for f in self.filedict[0, 0, 0]]) + x_max = max([f.start[f.axes.index('X')] + f.shape[f.axes.index('X')] for f in self.filedict[0, 0, 0]]) + y_max = max([f.start[f.axes.index('Y')] + f.shape[f.axes.index('Y')] for f in self.filedict[0, 0, 0]]) + self.size_x = x_max - x_min + self.size_y = y_max - y_min + self.size_c, self.size_z, self.size_t = (self.reader.shape[self.reader.axes.index(directory_entry)] + for directory_entry in 'CZT') + image = self.information.find('Image') + pixel_type = self.text(image.find('PixelType'), 'Gray16') + if pixel_type.startswith('Gray'): + pixel_type = 'uint' + pixel_type[4:] + objective_settings = image.find('ObjectiveSettings') + + self.ome.images.append( + model.Image( + id='Image:0', + name=f"{self.text(self.information.find('Document').find('Name'))} #1", + pixels=model.Pixels( + id='Pixels:0', size_x=self.size_x, size_y=self.size_y, + size_c=self.size_c, size_z=self.size_z, size_t=self.size_t, + dimension_order='XYCZT', type=pixel_type, + significant_bits=int(self.text(image.find('ComponentBitCount'))), + big_endian=False, interleaved=False, metadata_only=True), + experimenter_ref=model.ExperimenterRef(id='Experimenter:0'), + instrument_ref=model.InstrumentRef(id='Instrument:0'), + objective_settings=model.ObjectiveSettings( + id=objective_settings.find('ObjectiveRef').attrib['Id'], + medium=self.text(objective_settings.find('Medium')), + refractive_index=float(self.text(objective_settings.find('RefractiveIndex')))), + stage_label=model.StageLabel( + name=f'Scene position #0', + x=self.positions[0], x_unit=self.um, + y=self.positions[1], y_unit=self.um, + z=self.positions[2], z_unit=self.um))) + + for distance in self.metadata.find('Scaling').find('Items'): + if distance.attrib['Id'] == 'X': + self.ome.images[0].pixels.physical_size_x = float(self.text(distance.find('Value'))) * 1e6 + elif distance.attrib['Id'] == 'Y': + self.ome.images[0].pixels.physical_size_y = float(self.text(distance.find('Value'))) * 1e6 + elif self.size_z > 1 and distance.attrib['Id'] == 'Z': + self.ome.images[0].pixels.physical_size_z = float(self.text(distance.find('Value'))) * 1e6 + + @cached_property + def positions(self) -> tuple[float, float, Optional[float]]: + if self.version == '1.0': + scenes = self.image.find('Dimensions').find('S').find('Scenes') + positions = scenes[0].find('Positions')[0] + return float(positions.attrib['X']), float(positions.attrib['Y']), float(positions.attrib['Z']) + elif self.version in ('1.1', '1.2'): + try: # TODO + scenes = self.image.find('Dimensions').find('S').find('Scenes') + center_position = [float(pos) for pos in self.text(scenes[0].find('CenterPosition')).split(',')] + except AttributeError: + center_position = [0, 0] + return center_position[0], center_position[1], None + + @cached_property + def channels_im(self) -> dict: + return {channel.attrib['Id']: channel for channel in self.image.find('Dimensions').find('Channels')} + + @cached_property + def channels_ds(self) -> dict: + return {channel.attrib['Id']: channel for channel in self.display_setting.find('Channels')} + + @cached_property + def channels_ts(self) -> dict: + return {detector.attrib['Id']: track_setup + for track_setup in + self.experiment.find('ExperimentBlocks').find('AcquisitionBlock').find('MultiTrackSetup') + for detector in track_setup.find('Detectors')} + + def get_channels(self) -> None: + if self.version == '1.0': + for idx, (key, channel) in enumerate(self.channels_im.items()): + detector_settings = channel.find('DetectorSettings') + laser_scan_info = channel.find('LaserScanInfo') + detector = detector_settings.find('Detector') + try: + binning = model.Binning(self.text(detector_settings.find('Binning'))) + except ValueError: + binning = model.Binning.OTHER + + filterset = self.text(self.channels_ts[key].find('BeamSplitters')[0].find('Filter')) + filterset_idx = [filterset.model for filterset in self.ome.instruments[0].filter_sets].index(filterset) + + light_sources_settings = channel.find('LightSourcesSettings') + # no space in ome for multiple lightsources simultaneously + if len(light_sources_settings) > idx: + light_source_settings = light_sources_settings[idx] + else: + light_source_settings = light_sources_settings[0] + light_source_settings = model.LightSourceSettings( + id=light_source_settings.find('LightSource').attrib['Id'], + attenuation=float(self.text(light_source_settings.find('Attenuation'))), + wavelength=float(self.text(light_source_settings.find('Wavelength'))), + wavelength_unit=self.nm) + + self.ome.images[0].pixels.channels.append( + model.Channel( + id=f'Channel:{idx}', + name=channel.attrib['Name'], + acquisition_mode=self.text(channel.find('AcquisitionMode')), + color=model.Color(self.text(self.channels_ds[channel.attrib['Id']].find('Color'), 'white')), + detector_settings=model.DetectorSettings(id=detector.attrib['Id'], binning=binning), + # emission_wavelength=text(channel.find('EmissionWavelength')), # TODO: fix + excitation_wavelength=light_source_settings.wavelength, + filter_set_ref=model.FilterSetRef(id=self.ome.instruments[0].filter_sets[filterset_idx].id), + illumination_type=self.text(channel.find('IlluminationType')), + light_source_settings=light_source_settings, + samples_per_pixel=int(self.text(laser_scan_info.find('Averaging'))))) + elif self.version in ('1.1', '1.2'): + for idx, (key, channel) in enumerate(self.channels_im.items()): + detector_settings = channel.find('DetectorSettings') + laser_scan_info = channel.find('LaserScanInfo') + detector = detector_settings.find('Detector') + try: + color = model.Color(self.text(self.channels_ds[channel.attrib['Id']].find('Color'), 'white')) + except Exception: # noqa + color = None + try: + if (i := self.text(channel.find('EmissionWavelength'))) != '0': + emission_wavelength = float(i) + else: + emission_wavelength = None + except Exception: # noqa + emission_wavelength = None + if laser_scan_info is not None: + samples_per_pixel = int(self.text(laser_scan_info.find('Averaging'), '1')) + else: + samples_per_pixel = 1 + try: + binning = model.Binning(self.text(detector_settings.find('Binning'))) + except ValueError: + binning = model.Binning.OTHER + + light_sources_settings = channel.find('LightSourcesSettings') + # no space in ome for multiple lightsources simultaneously + if light_sources_settings is not None: + light_source_settings = light_sources_settings[0] + light_source_settings = model.LightSourceSettings( + id='LightSource:' + '_'.join([light_source_settings.find('LightSource').attrib['Id'] + for light_source_settings in light_sources_settings]), + attenuation=self.try_default(float, None, self.text(light_source_settings.find('Attenuation'))), + wavelength=self.try_default(float, None, self.text(light_source_settings.find('Wavelength'))), + wavelength_unit=self.nm) + else: + light_source_settings = None + + self.ome.images[0].pixels.channels.append( + model.Channel( + id=f'Channel:{idx}', + name=channel.attrib['Name'], + acquisition_mode=self.text(channel.find('AcquisitionMode')).replace( + 'SingleMoleculeLocalisation', 'SingleMoleculeImaging'), + color=color, + detector_settings=model.DetectorSettings( + id=detector.attrib['Id'].replace(' ', ""), + binning=binning), + emission_wavelength=emission_wavelength, + excitation_wavelength=self.try_default(float, None, + self.text(channel.find('ExcitationWavelength'))), + # filter_set_ref=model.FilterSetRef(id=ome.instruments[0].filter_sets[filterset_idx].id), + illumination_type=self.text(channel.find('IlluminationType')), + light_source_settings=light_source_settings, + samples_per_pixel=samples_per_pixel)) + + def get_planes(self) -> None: + try: + exposure_times = [float(self.text(channel.find('LaserScanInfo').find('FrameTime'))) + for channel in self.channels_im.values()] + except Exception: # noqa + exposure_times = [None] * len(self.channels_im) + delta_ts = self.attachments['TimeStamps'].data() + dt = np.diff(delta_ts) + if np.std(dt) / np.mean(dt) > 0.02: + dt = np.median(dt[dt > 0]) + delta_ts = dt * np.arange(len(delta_ts)) + warnings.warn(f'delta_t is inconsistent, using median value: {dt}') + + for t, z, c in product(range(self.size_t), range(self.size_z), range(self.size_c)): + self.ome.images[0].pixels.planes.append( + model.Plane(the_c=c, the_z=z, the_t=t, delta_t=delta_ts[t], + exposure_time=exposure_times[c], + position_x=self.positions[0], position_x_unit=self.um, + position_y=self.positions[1], position_y_unit=self.um, + position_z=self.positions[2], position_z_unit=self.um)) + + def get_annotations(self) -> None: + idx = 0 + for layer in [] if (ml := self.metadata.find('Layers')) is None else ml: + rectangle = layer.find('Elements').find('Rectangle') + if rectangle is not None: + geometry = rectangle.find('Geometry') + roi = model.ROI(id=f'ROI:{idx}', description=self.text(layer.find('Usage'))) + roi.union.append( + model.Rectangle( + id='Shape:0:0', + height=float(self.text(geometry.find('Height'))), + width=float(self.text(geometry.find('Width'))), + x=float(self.text(geometry.find('Left'))), + y=float(self.text(geometry.find('Top'))))) + self.ome.rois.append(roi) + self.ome.images[0].roi_refs.append(model.ROIRef(id=f'ROI:{idx}')) + idx += 1 diff --git a/pyproject.toml b/pyproject.toml index d1d5f0c..5b7bbec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "ndbioimage" -version = "2024.4.6" +version = "2024.4.7" description = "Bio image reading, metadata and some affine registration." authors = ["W. Pomp "] license = "GPLv3" @@ -12,7 +12,7 @@ exclude = ["ndbioimage/jars"] [tool.poetry.dependencies] python = "^3.10" -numpy = "*" +numpy = ">=1.20.0" pandas = "*" tifffile = "*" czifile = "2019.7.2"