diff --git a/issfile/__init__.py b/issfile/__init__.py new file mode 100644 index 0000000..022a35a --- /dev/null +++ b/issfile/__init__.py @@ -0,0 +1,109 @@ +import sys +import zipfile +import xml.etree.ElementTree as ET +import numpy as np +import re +from struct import unpack +from tiffwrite import IJTiffFile +from tqdm.auto import tqdm +from itertools import product +from yaml import safe_load + + +class IssTrack: + def __init__(self, file): + self.zip = zipfile.ZipFile(file) + self.data = self.zip.open('data/PrimaryDecayData.bin') + self.metadata = ET.fromstring(self.zip.read('dataProps/Core.xml')) + dimensions = self.metadata.find('Dimensions') + size_t = int(dimensions.find('TimeSeriesCount').text) + size_c = int(dimensions.find('ChannelCount').text) + size_y = int(dimensions.find('FrameHeight').text) + size_x = int(dimensions.find('FrameWidth').text) + # X, Y, channels, n_images, n_carpets + self.shape = size_x, size_y, size_c, size_t // 2 + size_t % 2, size_t // 2 + self.exposure_time = float(self.metadata.find('FrameIntervalTime').text) + self.pxsize = float(self.metadata.find('Boundary').find('FrameWidth').text) / self.shape[0] + + self.alba_metadata = safe_load('\n'.join([IssTrack.parse_line(line) + for line in self.metadata.find('AlbaSystemSettings').find('withComments').text.splitlines()])) + particle_tracking = self.alba_metadata['ParticleTracking'] + self.points_per_orbit = particle_tracking['ScanCirclePointCount'] + self.n_orbits = particle_tracking['OrbitCountPerTrack'] + self.orbit_time = particle_tracking['PacketTrackTime_ms'] + self.start_times = [float(re.match(r'[.\d]+', value).group(0)) + for key, value in self.alba_metadata.items() + if re.match(r'Time Series (\d+) started at', key)] + self.time_interval = np.mean(np.diff(self.start_times)[1::2]) + self.delta = self.shape[2] * 2 ** int(np.ceil(np.log2(self.time_interval * 1000 / self.orbit_time))) + self.delta = 768 # TODO: figure out if this number is the same for all files + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.close() + + def close(self): + try: + self.data.close() + finally: + self.zip.close() + + def get_image(self, c, t): + assert c < self.shape[2] and t < self.shape[3], \ + f'carpet {c = }, {t = } not in shape {self.shape[2]}, {self.shape[3]}' + frame = c + 2 * t * self.shape[2] + frame_bytes = self.shape[0] * self.shape[1] * self.delta + data = [] + for address in range(frame * frame_bytes, (frame + 1) * frame_bytes, self.delta): + self.data.seek(address) + data.append(unpack('=2022.10.0'], + entry_points={'console_scripts': ['iss2tiff=issfile:main']} +)