Source code for processor.utilities.io

# -*- coding: utf-8 -*-

from . import misc
import os

import h5py
import numpy as np
import tifffile
import xarray as xr


default_units = {
    'dldPosX': 'step', 
    'dldPosY': 'step', 
    'dldTime': 'step', 
    'tofVoltage':'V',
    'extractorVoltage':'V',
    'extractorCurrent':'V',
    'cryoTemperature':'K',
    'sampleTemperature':'K',
    'dldTimeBinSize':'ns',
    'delayStage':'ps',
    'streakCamera':'ps',
    'bam':'ps',
    'monochromatorPhotonEnergy':'eV',
    'timeStamp':'s',
    'dldTime_corrected':'ns',
    'energy':'eV',
    'kx':'1/A',
    'ky':'1/A'}

[docs]def res_to_xarray(res, binNames, binAxes, metadata=None): """ creates a BinnedArray (xarray subclass) out of the given np.array Parameters: res: np.array nd array of binned data binNames (list): list of names of the binned axes binAxes (list): list of np.arrays with the values of the axes Returns: ba: BinnedArray (xarray) an xarray-like container with binned data, axis, and all available metadata """ dims = binNames coords = {} for name, vals in zip(binNames, binAxes): coords[name] = vals xres = xr.DataArray(res, dims=dims, coords=coords) for name in binNames: try: xres[name].attrs['unit'] = default_units[name] except KeyError: pass xres.attrs['units'] = 'counts' xres.attrs['long_name'] = 'photoelectron counts' if metadata is not None: for k, v in metadata.items(): xres.attrs[k] = v return xres
[docs]def save_binned(data, file_name, format='h5', path=None, mode='w'): """ Save a binned numpy array to h5 file. The file includes the axes (taken from the scheduled bins) and the delay stage histogram, if it exists. Parameters: file_name: str Name of the saved file. The extension '.h5' is automatically added. path: str | None File path. mode: str | 'w' Write mode of h5 file ('w' = write). """ _abort = False if path is None: path = misc.parse_setting('paths', 'DATA_H5_DIR') #TODO: generalise to get path from any settings, as in processor. if not os.path.isdir(path): # test if the path exists... answer = input("The folder {} doesn't exist," "do you want to create it? [y/n]".format(path)) if 'y' in answer: os.makedirs(path) else: _abort = True if format == 'h5': filename = '{}.h5'.format(file_name) elif 'tif' in format: filename = '{}.tiff'.format(file_name) if os.path.isfile(path + filename): answer = input("A file named {} already exists. Overwrite it? " "[y/n or r for rename]".format(filename)) if answer.lower() in ['y', 'yes', 'ja']: pass elif answer.lower() in ['n', 'no', 'nein']: _abort = True else: filename = input("choose a new name:") if not _abort: if format == 'h5': xarray_to_h5(data, path + filename, mode) elif 'tif' in format: to_tiff(data, path + filename)
[docs]def xarray_to_h5(data, faddr, mode='w'): """ Save xarray formatted data to hdf5 Args: data (xarray.DataArray): input data faddr (str): complete file name (including path) mode (str): hdf5 read/write mode Returns: """ with h5py.File(faddr, mode) as h5File: print(f'saving data to {faddr}') if 'pumpProbeTime' in data.dims: idx = data.dims.index('pumpProbeTime') pp_data = np.swapaxes(data.data, 0, idx) elif 'delayStage' in data.dims: idx = data.dims.index('delayStage') pp_data = np.swapaxes(data.data, 0, idx) else: pp_data = None # Saving data ff = h5File.create_group('binned') if pp_data is None: # in case there is no time axis, make a single dataset ff.create_dataset('f{:04d}'.format(0), data=data.data) else: for i in range(pp_data.shape[0]): # otherwise make a dataset for each time frame. ff.create_dataset('f{:04d}'.format(i), data=pp_data[i, ...]) # Saving axes aa = h5File.create_group("axes") # aa.create_dataset('axis_order', data=data.dims) ax_n = 1 for binName in data.dims: if binName in ['pumpProbeTime', 'delayStage']: ds = aa.create_dataset(f'ax0 - {binName}', data=data.coords[binName]) ds.attrs['name'] = binName else: ds = aa.create_dataset(f'ax{ax_n} - {binName}', data=data.coords[binName]) ds.attrs['name'] = binName ax_n += 1 # Saving delay histograms # hh = h5File.create_group("histograms") # if hasattr(data, 'delaystageHistogram'): # hh.create_dataset( # 'delaystageHistogram', # data=data.delaystageHistogram) # if hasattr(data, 'pumpProbeHistogram'): # hh.create_dataset( # 'pumpProbeHistogram', # data=data.pumpProbeHistogram)\ meta_group = h5File.create_group('metadata') for k, v in data.attrs.items(): g = meta_group.create_group(k) for kk, vv in v.items(): try: g.create_dataset(kk, data=vv) except TypeError: if isinstance(vv, dict): for kkk, vvv in vv.items(): g.create_dataset(f'{kk}/{kkk}', data=vvv) else: g.create_dataset(kk, data=str(vv)) print('Saving complete!')
[docs]def array_to_tiff(array,filename): """ Save array to imageJ compatible tiff stack. Args: array (np.array): array to save. 2D,3D or 4D filename (str): full path and name of file to save. """ if array.ndim == 2: out = np.expand_dims(array,[0,1,2,5]) elif array.ndim == 3: out = np.expand_dims(array,[0,2,5]) elif array.ndim == 4: out = np.expand_dims(array, [2, 5]) else: raise NotImplementedError('Only 2-3-4D arrays supported.') tifffile.imwrite(filename, out.astype(np.float32), imagej=True) print(f'Successfully saved array with shape {array.shape} to {filename}')
[docs]def xarray_to_tiff(data, filename, axis_dict=None): """ Save data to tiff file. Args: data: xarray.DataArray data to be saved. ImageJ likes tiff files with axis order as TZCYXS. Therefore, best axis order in input should be: Time, Energy, posY, posX. The channels 'C' and 'S' are automatically added and can be ignored. filename: str full path and name of file to save. axis_dict: dict name pairs for correct axis ordering. Keys should be any of T,Z,C,Y,X,S. The Corresponding value will be searched among the dimensions of the xarray, and placed in the right order for imagej stacks metadata. units: bool Not implemented. Will be used to set units in the tif stack TODO: expand imagej metadata to include physical units """ assert isinstance(data, xr.DataArray), 'Data must be an xarray.DataArray' dims_to_add = {'C': 1, 'S': 1} dims_order = [] if axis_dict is None: axis_dict = {'T': ['delayStage','pumpProbeTime','time'], 'Z': ['dldTime','energy'], 'C': ['C'], 'Y': ['dldPosY','ky'], 'X': ['dldPosX','kx'], 'S': ['S']} else: for key in ['T', 'Z', 'C', 'Y', 'X', 'S']: if key not in axis_dict.keys(): axis_dict[key] = key # Sort the dimensions in the correct order, and fill with one-point dimensions # the missing axes. for key in ['T', 'Z', 'C', 'Y', 'X', 'S']: axis_name_list = [name for name in axis_dict[key] if name in data.dims] if len(axis_name_list) > 1: raise AttributeError(f'Too many dimensions for {key} axis.') elif len(axis_name_list) == 1: dims_order.append(*axis_name_list) else: dims_to_add[key] = 1 dims_order.append(key) print(f'tif stack dimension order: {dims_order}') xres = data.expand_dims(dims_to_add) xres = xres.transpose(*dims_order) if '.tif' not in filename: filename += '.tif' try: tifffile.imwrite(filename, xres.values.astype(np.float32), imagej=True) except: tifffile.imsave(filename, xres.values.astype(np.float32), imagej=True) # resolution=(1./2.6755, 1./2.6755),metadata={'spacing': 3.947368, 'unit': 'um'}) print(f'Successfully saved {filename}')
[docs]def to_tiff(data,filename,axis_dict=None): """ save array to imagej tiff sack.""" if isinstance(data,xr.DataArray): xarray_to_tiff(data,filename,axis_dict=axis_dict) elif isinstance(data,np.array): array_to_tiff(data,filename) else: raise TypeError('Input data must be a numpy array or xarray DataArray')