# -*- coding: utf-8 -*-
import configparser
import os
from datetime import datetime
import math
import h5py
import numpy as np
import psutil
import ast
import os
from configparser import ConfigParser
from collections import OrderedDict
# from processor import DldFlashDataframeCreator as DldFlashProcessor
# ================================================================================
"""Functions for calculation of pulse energy and pulse energy density of optical laser.
Calibration values taken from Pump beam energy converter 800 400.xls
Units are uJ for energy, um for beam diameter, uJ/cm^2 for energy density (and arb. for diode signal)
"""
def PulseEnergy400(Diode):
    """Convert a photodiode reading into the pulse energy of the 400nm laser in uJ.

    Parameter:
        Diode: numeric
            Measured value from photodiode (arb. units)

    Returns:
        The pulse energy, obtained from the linear calibration
        ``0.86 * (Diode * 0.0008010439 + 0.0352573)``.
    """
    slope = 0.0008010439
    intercept = 0.0352573
    calibrated = Diode * slope + intercept
    return 0.86 * calibrated
def PulseEnergy800(Diode):
    """Convert a photodiode reading into the pulse energy of the 800nm laser in uJ.

    Parameter:
        Diode: numeric
            Measured value from photodiode (arb. units)

    Returns:
        The pulse energy, obtained from the linear calibration
        ``0.86 * (Diode * 0.0009484577 + 0.1576)``.
    """
    slope = 0.0009484577
    intercept = 0.1576
    calibrated = Diode * slope + intercept
    return 0.86 * calibrated
def EnergyDensity400(Diode, Diameter=600):
    """Calculate the pulse energy density of the 400nm laser in uJ/cm^2.

    The pulse energy from ``PulseEnergy400`` is divided by the area of a
    circular spot with the given diameter.

    Parameters:
        Diode: numeric
            Measured value from photodiode (arb. units)
        Diameter: numeric | 600
            Beam diameter in um.
    """
    # 0.0001 converts the diameter from um to cm; half of it is the spot radius.
    half_width_cm = (Diameter * 0.0001) / 2
    spot_area = np.pi * np.square(half_width_cm)
    return PulseEnergy400(Diode) / spot_area
def EnergyDensity800(Diode, Diameter=600):
    """Calculate the pulse energy density of the 800nm laser in uJ/cm^2.

    The pulse energy from ``PulseEnergy800`` is divided by the area of a
    circular spot with the given diameter.

    Parameters:
        Diode: numeric
            Measured value from photodiode (arb. units)
        Diameter: numeric | 600
            Beam diameter in um.
    """
    # 0.0001 converts the diameter from um to cm; half of it is the spot radius.
    half_width_cm = (Diameter * 0.0001) / 2
    spot_area = np.pi * np.square(half_width_cm)
    return PulseEnergy800(Diode) / spot_area
# %% Settings
# ================================================================================
def parse_category(category, settings_file='default'):
    """ Parse the settings file and return all entries of one category.

    Args:
        category (str): title of the category
        settings_file (str): path to setting file. If set to 'default' it takes
            the first file called SETTINGS.ini found in the folder of this
            module or in one of its parent folders.

    Returns:
        dictionary containing name and value of all entries present in this
        category, or None (after printing a message) if the category does
        not exist. Values starting with "/" are kept as strings; all other
        values are converted with ``ast.literal_eval`` when possible and
        kept as the raw string otherwise.

    Raises:
        FileNotFoundError: if settings_file is 'default' and no SETTINGS.ini
            exists in any parent folder.

    Notes:
        Author: Steinn Ymir Agustsson <sagustss@uni-mainz.de>
    """
    settings = ConfigParser()
    if settings_file == 'default':
        start_path = os.path.dirname(os.path.abspath(__file__))
        current_path = start_path
        while not os.path.isfile(os.path.join(current_path, 'SETTINGS.ini')):
            parent_path = os.path.dirname(current_path)
            if parent_path == current_path:
                # Reached the filesystem root: stop instead of looping forever.
                raise FileNotFoundError(
                    'No SETTINGS.ini found in {} or any of its parent folders'.format(start_path))
            current_path = parent_path
        settings_file = os.path.join(current_path, 'SETTINGS.ini')
    settings.read(settings_file)

    try:
        section = settings[category]
    except KeyError:
        print('No category {} found in SETTINGS.ini'.format(category))
        return None

    cat_dict = {}
    for k, v in section.items():
        if v.startswith('/'):
            # Looks like an absolute path: keep it verbatim.
            cat_dict[k] = str(v)
            continue
        try:
            cat_dict[k] = ast.literal_eval(v)
        except (ValueError, SyntaxError):
            # Not a Python literal (plain words, empty value, ...): keep the raw text.
            cat_dict[k] = v
    return cat_dict
def parse_setting(category, name, settings_file='default'):
    """ Parse the settings file and return a single value.

    Args:
        category: str
            title of the category
        name: str
            name of the parameter
        settings_file: str | 'default'
            path to setting file. If set to 'default' it takes the first file
            called SETTINGS.ini found in the folder of this module or in one
            of its parent folders.

    Returns:
        value of the parameter, None if parameter cannot be found. A value
        that is an existing directory is returned as a string; any other
        value is converted with ``ast.literal_eval`` when possible and
        returned as the raw string otherwise.

    Raises:
        FileNotFoundError: if settings_file is 'default' and no SETTINGS.ini
            exists in any parent folder.

    Notes:
        Author: Steinn Ymir Agustsson <sagustss@uni-mainz.de>
    """
    settings = ConfigParser()
    if settings_file == 'default':
        start_path = os.path.dirname(os.path.abspath(__file__))
        current_path = start_path
        while not os.path.isfile(os.path.join(current_path, 'SETTINGS.ini')):
            parent_path = os.path.dirname(current_path)
            if parent_path == current_path:
                # Reached the filesystem root: stop instead of looping forever.
                raise FileNotFoundError(
                    'No SETTINGS.ini found in {} or any of its parent folders'.format(start_path))
            current_path = parent_path
        settings_file = os.path.join(current_path, 'SETTINGS.ini')
    settings.read(settings_file)

    try:
        value = settings[category][name]
    except KeyError:
        print('No entry {} in category {} found in SETTINGS.ini'.format(name, category))
        return None

    if os.path.isdir(value):
        return str(value)
    try:
        return ast.literal_eval(value)
    except (ValueError, SyntaxError):
        # Not a Python literal: hand back the raw text.
        return str(value)
def write_setting(value, category, name, settings_file='default'):
    """ Write an entry in the settings file.

    The whole file is read, the entry is set and the file is rewritten with
    ``ConfigParser.write``.

    Args:
        value: value to store; it is converted with ``str`` before writing
        category (str): title of the category. It is created if it does not
            exist yet.
        name (str): name of the parameter
        settings_file (str): path to setting file. If set to 'default' it takes
            the first file called SETTINGS.ini found in the folder of this
            module or in one of its parent folders.

    Returns:
        None

    Raises:
        FileNotFoundError: if settings_file is 'default' and no SETTINGS.ini
            exists in any parent folder.

    Notes:
        Author: Steinn Ymir Agustsson <sagustss@uni-mainz.de>
    """
    settings = ConfigParser()
    if settings_file == 'default':
        start_path = os.path.dirname(os.path.abspath(__file__))
        current_path = start_path
        while not os.path.isfile(os.path.join(current_path, 'SETTINGS.ini')):
            parent_path = os.path.dirname(current_path)
            if parent_path == current_path:
                # Reached the filesystem root: stop instead of looping forever.
                raise FileNotFoundError(
                    'No SETTINGS.ini found in {} or any of its parent folders'.format(start_path))
            current_path = parent_path
        settings_file = os.path.join(current_path, 'SETTINGS.ini')
    settings.read(settings_file)

    # A new category must be created first, otherwise the assignment below
    # raises KeyError.
    if category not in settings:
        settings.add_section(category)
    settings[category][name] = str(value)

    with open(settings_file, 'w') as configfile:
        settings.write(configfile)
def parse_logbook(log_text):
    """ Parse a log book entry to read out metadata.

    Everything before the first line starting with ``FEL:`` is stored under
    the key 'comments'. The text from that line on is cut into sections
    separated by blank lines. The first line of a section is
    ``<name>:<status>``; every following line is ``<key> = <value>`` or
    ``<key>: <value>``.

    Args:
        log_text (str or file): file or plain text of the log book, in the
            "correct" format TODO: add example of log entry

    Returns:
        logDict (dict): Dictionary with the relevant metadata. It holds
            'comments' plus one OrderedDict per section. A section gets a
            'status' entry when its title line carries a non-empty status.
            Lines without a separator map their text to None. If the text
            contains no ``FEL:`` line only 'comments' is returned.

    Notes:
        Author: Steinn Ymir Agustsson <sagustss@uni-mainz.de>
    """
    assert isinstance(log_text, str) or os.path.isfile(log_text), 'Unrecognized format for logbook text'
    if os.path.isfile(log_text):
        with open(log_text, 'r') as f:
            text = f.read()
    else:
        text = log_text

    logDict = OrderedDict()
    t_split = text.split('\nFEL:')
    logDict['comments'] = t_split.pop(0)
    if not t_split:
        # No metadata block in this entry: only the free-text comment exists.
        return logDict

    # Only the text between the first and a possible second 'FEL:' marker is parsed.
    text = 'FEL:{}'.format(t_split[0])
    # Joining stripped lines with '|' turns every blank line into '||',
    # which is then used as the section separator.
    log_sections = '|'.join(x.strip() for x in text.split('\n')).split('||')

    for section in log_sections:
        section = section.lstrip('|')
        if not section:
            # Produced by runs of blank lines; carries no information.
            continue
        slist = section.split('|')
        title = slist[0].split(':')
        name = title[0]
        logDict[name] = OrderedDict()
        if len(title) > 1:
            status = title[1].strip()
            if status != '':
                logDict[name]['status'] = status

        for line in slist[1:]:
            if not line:
                # Trailing newline of the section: skip instead of adding a '' key.
                continue
            # NOTE: only the text between the first and second separator is
            # kept as value; anything after a further ':' or '=' is dropped.
            linelist = line.replace(':', '=').split('=')
            key = linelist[0].strip()
            logDict[name][key] = linelist[1].strip() if len(linelist) > 1 else None
    return logDict
# %% Math
# ================================================================================
def radius(df, center=(0, 0)):
    """ Calculate the distance of (df.posX, df.posY) from ``center``.

    Args:
        df: object exposing ``posX`` and ``posY`` attributes
        center: pair of numbers, x and y coordinate of the reference point

    Returns:
        sqrt((posX - center[0])^2 + (posY - center[1])^2)
    """
    offset_x = df.posX - center[0]
    offset_y = df.posY - center[1]
    squared_distance = np.square(offset_x) + np.square(offset_y)
    return np.sqrt(squared_distance)
def argnearest(array, val, rettype='vectorized'):
    """Find the coordinates of the nD array element nearest to a specified value.

    Args:
        array (np.array or array-like): Numeric data array
        val (int or float) : Look-up value
        rettype (:obj:`str`,optional): return type specification
            'vectorized' (default) denotes vectorized coordinates (integer)
            'coordinates' denotes multidimensional coordinates (tuple)

    Returns:
        argval (int or tuple): coordinate position. For 'vectorized' this is
            the index into the flattened array, for 'coordinates' the
            matching index tuple.

    Raises:
        ValueError: if rettype is not one of the two supported values.
    """
    if rettype not in ('vectorized', 'coordinates'):
        raise ValueError(
            "rettype must be 'vectorized' or 'coordinates', got {!r}".format(rettype))

    # asarray lets plain (nested) lists be used as well as numpy arrays.
    values = np.asarray(array)
    argval = np.argmin(np.abs(values - val))
    if rettype == 'vectorized':
        return argval
    return np.unravel_index(argval, values.shape)
# %% Data Input/Output
# ================================================================================
def parse_h5_keys(d, prefix=''):
    """ Recursively list the full paths of all leaf entries of a nested mapping.

    Args:
        d: object with ``keys()`` and item access, e.g. an open h5py file or
            group, or a nested dict.
        prefix (str): path prepended to every key of ``d``.

    Returns:
        list of str: '/'-separated paths. An entry counts as a leaf when it
        cannot be descended into. Containers without entries contribute
        nothing.
    """
    paths = []
    for key in d.keys():
        path = prefix + '/' + key
        try:
            children = parse_h5_keys(d[key], prefix=path)
        except Exception:
            # Anything that cannot be traversed like a group is reported as a leaf.
            paths.append(path)
        else:
            paths.extend(children)
    return paths
def save_H5_hyperstack(data_array, filename, path=None, overwrite=True):
    """ Saves an hdf5 file with 4D (Kx,Ky,E,Time) images for import in FIJI

    One dataset ``experiment/xyE_tstep<i>`` is written for every index along
    the last axis of ``data_array``.

    Parameters:
        data_array: numpy array
            4D data array, order must be Kx,Ky,Energy,Time.
        filename: str
            The name of the file to save
        path: str
            The path to where to save hdf5 file. If None, uses RESULTS_PATH
            from the [paths] section of the SETTINGS.ini in the current
            working directory. The folder is created if missing.
        overwrite: bool
            Selects the h5py open mode: "w" if true, "w-" (fail if the file
            exists) otherwise.
            NOTE: a file that already exists is never reused. A trailing
            "_<number>" is appended to the name until an unused one is
            found, so the open mode does not normally come into play.
    """
    mode = "w" if overwrite else "w-"  # "w-": fail if file exists

    if path is None:
        settings = configparser.ConfigParser()
        settings.read('SETTINGS.ini')
        path = settings['paths']['RESULTS_PATH']

    # os.path.join also works when `path` has no trailing separator.
    filepath = os.path.join(path, filename)

    if not os.path.isdir(path):
        os.makedirs(path)

    if os.path.exists(filepath):
        # create new files every time, with new trailing number
        i = 1
        new_filepath = "{}_{}".format(filepath, i)
        while os.path.exists(new_filepath):
            i += 1
            new_filepath = "{}_{}".format(filepath, i)
        filepath = new_filepath

    # The time axis is the LAST one; this is the axis indexed in the loop below.
    pumpProbeTimeSteps = data_array.shape[-1]
    print(
        'Creating HDF5 dataset with {} time steps'.format(pumpProbeTimeSteps))

    # The context manager closes the file even when writing fails.
    with h5py.File(filepath, mode) as f:
        for timeStep in range(pumpProbeTimeSteps):
            xyeData = data_array[..., timeStep]
            dset = f.create_dataset(
                "experiment/xyE_tstep{}".format(timeStep),
                xyeData.shape,
                dtype='float64')
            dset[...] = xyeData

    print("Created file " + filepath)
def load_binned_h5(file_name, mode='r', ret_type='list'):
    """ Load an HDF5 file saved with ``save_binned()`` method.

    Args:
        file_name (str): name of the file to load, including full path.
            The extension '.h5' is appended when missing.
        mode (:obj:`str`, optional): Read mode of h5 file ('r' = read).
        ret_type (:obj:`str`, optional): output format for axes and histograms:
            'list' generates a list of arrays, ordered as
            the corresponding dimensions in data. 'dict'
            generates a dictionary with the names of each axis.

    Returns:
        data np.array: Multidimensional data read from h5 file. With more
            than one frame the frames are stacked along the first axis.
        axes list or dict: The axes values associated with the read data.
        hist list or dict: Histogram values associated with the read data.

    Raises:
        ValueError: if ret_type is neither 'list' nor 'dict'.
    """
    if ret_type not in ('list', 'dict'):
        raise ValueError("ret_type must be 'list' or 'dict', got {!r}".format(ret_type))

    if file_name.endswith('.h5'):
        filename = file_name
    else:
        filename = '{}.h5'.format(file_name)

    with h5py.File(filename, mode) as h5File:
        # Retrieving binned data
        frames = h5File['frames']
        single_frame = len(frames) == 1
        if single_frame:
            data = np.array(frames['f0000'])
        else:
            data = np.array([np.array(frames[frame]) for frame in frames])

        # Retrieving axes
        axes = [0 for _ in range(data.ndim)]
        axes_d = {}
        for ax in h5File['axes/']:
            vals = h5File['axes/' + ax][()]
            # Without this the 'dict' return type would always be empty.
            axes_d[ax] = vals
            # Axis names look like 'ax<N> - <name>'; N is the position in `data`.
            idx = int(ax.split(' - ')[0][2:])
            if single_frame:  # shift index to compensate missing time dimension
                idx -= 1
            axes[idx] = vals

        # Retrieving delay histograms
        hists = []
        hists_d = {}
        for hist in h5File['histograms/']:
            hist_vals = h5File['histograms/' + hist][()]
            hists_d[hist] = hist_vals
            hists.append(hist_vals)

    if ret_type == 'list':
        return data, axes, hists
    return data, axes_d, hists_d
def get_available_runs(rootpath):  # TODO: store the resulting dictionary to improve performance.
    """ Collects the filepaths to the available experimental run data.

    Every folder below ``rootpath`` whose path contains 'fl1user2' is
    scanned. The run number is the fifth '_'-separated field of each file
    name. File names with fewer fields are ignored.

    Parameters:
        rootpath: str
            path where to look for data (recursive in subdirectories)

    Return:
        available_runs: dict
            dict with run numbers as keys (e.g. 'run12345') and path where to load data from as str.
            The path is the folder path with its last 8 characters
            (the length of 'fl1user2') removed. The first folder in which a
            run is seen wins.
    """
    available_runs = {}
    for dirpath, _dirnames, filenames in os.walk(rootpath):
        if 'fl1user2' not in dirpath:
            continue
        run_path = dirpath[:-8]
        for name in filenames:
            fields = name.split('_')
            if len(fields) < 5:
                # Not a run file; skip it without giving up on the other files.
                continue
            available_runs.setdefault(fields[4], run_path)
    return available_runs
def get_path_to_run(runNumber, rootpath):  # TODO: improve performance
    """ Look up where the data of a given run number is stored.

    Parameters:
        runNumber: str or int
            run number as integer or string.
        rootpath: str
            path where to look for data (recursive in subdirectories)

    Return:
        path: str
            the entry that ``get_available_runs(rootpath)`` holds for
            'run<runNumber>'.

    Raises:
        FileNotFoundError: if the run is not among the available runs.
    """
    run_paths = get_available_runs(rootpath)
    run_key = 'run{}'.format(runNumber)
    try:
        found = run_paths[run_key]
    except KeyError:
        raise FileNotFoundError('No run number {} under path {}'.format(runNumber, rootpath))
    return found
def availableParquet(parquet_dir=None):
    """ List the entries of the parquet folder whose name contains '_el'.

    Parameters:
        parquet_dir: str | None
            folder to inspect. If None, DATA_PARQUET_DIR from the [paths]
            section of SETTINGS.ini is used. The file is looked for next to
            this module first, then one folder above.

    Return:
        list of str: the matching entry names with their last three
        characters removed.
    """
    if parquet_dir is None:
        import configparser
        settings = configparser.ConfigParser()  # TODO: find a smarter way
        module_dir = os.path.dirname(__file__)
        local_ini = os.path.join(module_dir, 'SETTINGS.ini')
        if os.path.isfile(local_ini):
            settings.read(local_ini)
        else:
            settings.read(os.path.join(os.path.dirname(module_dir), 'SETTINGS.ini'))
        parquet_dir = settings['paths']['DATA_PARQUET_DIR']

    names = []
    for entry in os.listdir(parquet_dir):
        if '_el' in entry:
            names.append(entry[:-3])
    return names
# %% mathematical functions
# ================================================================================
def gaussian2D(x, y, amplitude, xo, yo, sigma_x, sigma_y, theta, offset):
    """ Evaluate a rotated two-dimensional Gaussian plus a constant offset.

    Args:
        x, y: coordinates at which the function is evaluated
        amplitude: height of the peak above ``offset``
        xo, yo: position of the peak centre (converted to float)
        sigma_x, sigma_y: widths along the two principal axes
        theta: rotation angle passed to ``np.cos`` / ``np.sin``
        offset: constant added to the result

    Returns:
        offset + amplitude * exp(-(a*dx^2 + 2*b*dx*dy + c*dy^2)) with
        dx = x - xo, dy = y - yo.
    """
    xo = float(xo)
    yo = float(yo)
    cos_sq = np.cos(theta)**2
    sin_sq = np.sin(theta)**2
    sin_double = np.sin(2*theta)
    # Coefficients of the quadratic form in the exponent.
    a = cos_sq/(2*sigma_x**2) + sin_sq/(2*sigma_y**2)
    b = -sin_double/(4*sigma_x**2) + sin_double/(4*sigma_y**2)
    c = sin_sq/(2*sigma_x**2) + cos_sq/(2*sigma_y**2)
    dx = x - xo
    dy = y - yo
    exponent = a*(dx**2) + 2*b*dx*dy + c*(dy**2)
    return offset + amplitude*np.exp(-exponent)
def lorentzian2D(x, y, amp, mux, muy, g, c):
    """ Evaluate a two-dimensional Lorentzian-type peak plus a constant.

    Args:
        x, y: coordinates at which the function is evaluated
        amp: amplitude factor
        mux, muy: position of the peak centre
        g: width parameter
        c: constant added to the result

    Returns:
        |amp * g| / ((x - mux)^2 + (y - muy)^2 + g^2)^1.5 + c
    """
    squared_distance = (x - mux) ** 2 + (y - muy) ** 2 + g ** 2
    peak = np.abs(amp * g) / squared_distance ** 1.5
    return peak + c
def multi_lorentzian2D(M, *args):
    """ Sum of several ``lorentzian2D`` peaks.

    Args:
        M: pair ``(x, y)`` of coordinate arrays
        *args: flat sequence of peak parameters, five per peak in the order
            ``amp, mux, muy, g, c`` (the parameters of ``lorentzian2D``
            after x and y). Trailing values that do not fill a complete
            group of five are ignored.

    Returns:
        np.array: sum of all peaks; zeros of the shape of ``x`` if no
        complete parameter group is given.
    """
    x, y = M
    arr = np.zeros(x.shape)
    # lorentzian2D takes 5 parameters besides x and y.
    n = 5
    for i in range(len(args) // n):
        arr += lorentzian2D(x, y, *args[i * n:i * n + n])
    return arr
def multi_gaussian2D(M, *args):
    """ Sum of several ``gaussian2D`` peaks.

    Args:
        M: pair ``(x, y)`` of coordinates
        *args: flat sequence of peak parameters, seven per peak, passed to
            ``gaussian2D`` after x and y. Trailing values that do not fill a
            complete group of seven are ignored.

    Returns:
        The sum of all peaks, or None if no complete parameter group is given.
    """
    x, y = M
    n = 7
    total = None
    complete_groups = len(args) // n
    for start in range(0, complete_groups * n, n):
        peak = gaussian2D(x, y, *args[start:start + n])
        if total is None:
            total = peak
        else:
            total += peak
    return total
# %% String operations
# ================================================================================
def camelCaseIt(snake_case_string):
    """ Format a string in camel case

    The string is title-cased with ``str.title``, underscores are removed
    and the first character is lowered, e.g. 'snake_case_string' becomes
    'snakeCaseString'.

    Args:
        snake_case_string (str): text to convert

    Returns:
        str: the camel case version. It is the empty string if nothing is
        left after removing the underscores.
    """
    titleCaseVersion = snake_case_string.title().replace("_", "")
    if not titleCaseVersion:
        # Empty or underscore-only input: there is no first character to lower.
        return titleCaseVersion
    return titleCaseVersion[0].lower() + titleCaseVersion[1:]
def repr_byte_size(size_bytes):
    """ Represent in a string the size in Bytes in a compact format.

    Adapted from https://stackoverflow.com/questions/5194057/better-way-to-convert-file-sizes-in-python
    Follows same notation as Windows does for files. See: https://en.wikipedia.org/wiki/Mebibyte

    Args:
        size_bytes (int or float): size in bytes

    Returns:
        str: "0B" for zero, otherwise the value scaled by powers of 1024,
        rounded to two decimals and followed by the unit, e.g. "1.5 KB".
        Sizes smaller than one byte are reported in B, sizes beyond the
        largest unit in YB and negative sizes with a leading minus sign.
    """
    if size_bytes == 0:
        return "0B"
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    sign = "-" if size_bytes < 0 else ""
    magnitude = float(abs(size_bytes))
    # Repeated division keeps the unit index inside the table and avoids the
    # rounding problems of a floating point logarithm at exact powers of 1024.
    i = 0
    while magnitude >= 1024 and i < len(size_name) - 1:
        magnitude /= 1024
        i += 1
    s = round(magnitude, 2)
    return "%s%s %s" % (sign, s, size_name[i])
# %% plotting
# # ================================================================================
def plot_lines(data, normalization='None', range=None, color_range=(0, 1),
               x_label='', y_label='', xlim=None, ylim=None, savefig=False,
               save_dir='E:/data/FLASH/', save_name='fig', static_curve=None):
    """ Plot a series of curves, one per row of ``data``, with a colour gradient.

    Args:
        data (np.array): 2D array; every row is drawn as one curve. The
            array is not modified.
        normalization (str): 'sum' divides each curve by its sum, 'max' by
            its maximum; any other value leaves the curves unchanged.
        range (tuple): (first, last) row to plot, last excluded. All rows
            are plotted if None.
        color_range (tuple): interval of the 'coolwarm' colormap spread over
            the curves.
        x_label, y_label (str): axis labels.
        xlim, ylim (tuple): axis limits, left untouched if None.
        savefig (bool): if true, save the figure as
            ``<save_dir><save_name>.png``.
        save_dir (str): folder prefix of the saved figure. It is prepended
            as is, so it must end with a path separator.
        save_name (str): file name of the saved figure, without extension.
        static_curve: optional extra curve drawn dashed in black and
            labelled 'static'.
    """
    from matplotlib import pyplot as plt, cm

    _fig, axis = plt.subplots(1, 1, figsize=(8, 6), sharex=True)

    if range is None:
        from_ = 0
        to_ = len(data)
    else:
        from_ = range[0]
        to_ = range[1]
    n_curves = len(data[from_:to_, 0])
    print('Number of curves: {}'.format(n_curves))

    cm_subsection = np.linspace(color_range[0], color_range[1], n_curves)
    colors = [cm.coolwarm(1 - x) for x in cm_subsection]

    # `colors` already has exactly one entry per selected curve, so it is
    # walked from its start; only the row index is shifted by `from_`.
    for i, color in enumerate(colors):
        label = '{}'.format(i)
        curve = data[i + from_, :]
        # Out-of-place division: `curve` is a view of `data`, which must not
        # be altered by plotting.
        if normalization == 'sum':
            curve = curve / curve.sum()
        elif normalization == 'max':
            curve = curve / curve.max()
        axis.plot(curve, '-', color=color, label=label)

    if static_curve is not None:
        plt.plot(static_curve, '--', color='black', label='static')

    plt.grid()
    plt.legend(fontsize='large')
    plt.xlabel(x_label, fontsize='xx-large')
    plt.ylabel(y_label, fontsize='xx-large')
    plt.xticks(fontsize='large')
    plt.yticks(fontsize='large')
    if xlim is not None:
        plt.xlim(xlim[0], xlim[1])
    if ylim is not None:
        plt.ylim(ylim[0], ylim[1])

    if savefig:
        # `papertype` and `frameon` used to be passed as None here; they are
        # left out because recent Matplotlib releases reject them.
        plt.savefig('{}{}.png'.format(save_dir, save_name),
                    dpi=200, facecolor='w', edgecolor='w', orientation='portrait',
                    format=None, transparent=True, bbox_inches=None,
                    pad_inches=0.1)
    plt.show()
# ==================
# Methods by Steinn!
# ==================
def get_system_memory_status(print_=False):
    """ Report the system memory usage as given by ``psutil.virtual_memory()``.

    Args:
        print_ (bool): if true, also print every value; 'percent' is printed
            as a percentage, the other values divided by 2**30 as GB.

    Returns:
        dict: the fields 'total', 'available', 'percent', 'used' and 'free'
        of the psutil result.
    """
    mem_labels = ('total', 'available', 'percent', 'used', 'free')
    mem = psutil.virtual_memory()
    # Fields are read by name: the psutil result can carry more fields than
    # the five wanted here, so pairing them up by position is not safe.
    memstatus = {label: getattr(mem, label) for label in mem_labels}

    if print_:
        for key, value in memstatus.items():
            if key == 'percent':
                print('{}: {:0.3}%'.format(key, value))
            else:
                print('{}: {:0,.4} GB'.format(key, value / 2 ** 30))
    return memstatus
def read_and_binn(runNumber, *args, static_bunches=False, source='raw', save=True):
    """ Read one run with a ``DldFlashProcessor`` and bin it along the requested axes.

    Args:
        runNumber: run number assigned to the processor.
        *args: binning requests. Each is a sequence whose first element is
            'dldTime', 'delayStage' or 'dldPos'; the remaining elements are
            passed on to ``processor.addBinning``. 'dldPos' is applied to
            both 'dldPosX' and 'dldPosY'. Other names are ignored.
        static_bunches (bool): if True keep events with dldMicrobunchId
            above 400, otherwise those between 100 and 400 (both excluded).
        source (str): 'raw' reads the raw data; 'parquet' reads the stored
            dataframes and falls back to the raw data (storing the
            dataframes afterwards) if that fails.
        save (bool): if true, the binned data is computed with the save name
            'run<runNumber> - <axes>'.

    Returns:
        tuple: (result, axes, processor), where axes is
        ``processor.binRangeList``.
    """
    print(datetime.now())

    from processor import DldFlashDataframeCreator as DldFlashProcessor

    processor = DldFlashProcessor.DldFlashProcessor()
    processor.runNumber = runNumber
    if source == 'raw':
        processor.readData()
    elif source == 'parquet':
        try:
            processor.readDataframes()
        except Exception:
            # `Exception` rather than a bare except, so that Ctrl-C is not
            # mistaken for missing parquet data.
            print('No Parquet data found, loading raw data.')
            processor.readData()
            processor.storeDataframes()
    processor.postProcess()

    if static_bunches is True:
        processor.dd = processor.dd[processor.dd['dldMicrobunchId'] > 400]
    else:
        processor.dd = processor.dd[processor.dd['dldMicrobunchId'] > 100]
        processor.dd = processor.dd[processor.dd['dldMicrobunchId'] < 400]

    shortname = ''
    processor.resetBins()

    # When an axis is requested more than once the last request wins.
    dldTime = delayStage = dldPos = None
    for arg in args:
        if arg[0] == 'dldTime':
            dldTime = arg[1:]
        elif arg[0] == 'delayStage':
            delayStage = arg[1:]
        elif arg[0] == 'dldPos':
            dldPos = arg[1:]

    if dldTime:
        processor.addBinning('dldTime', *dldTime)
        shortname += 'E'
    if delayStage:
        processor.addBinning('delayStage', *delayStage)
        shortname += 'T'
    if dldPos:
        processor.addBinning('dldPosX', *dldPos)
        processor.addBinning('dldPosY', *dldPos)
        shortname += 'KxKy'

    if save:
        saveName = 'run{} - {}'.format(runNumber, shortname)
        result = processor.computeBinnedData(saveName=saveName)
    else:
        result = processor.computeBinnedData()
    axes = processor.binRangeList
    return result, axes, processor
def create_dataframes(runNumbers, *args):
    """ Creates a parquet dataframe for each run passed.

    Args:
        runNumbers: a single run number or a sequence of run numbers. A
            sequence passed here is not modified.
        *args: further run numbers, each either a single value or a list or
            tuple of values.

    Returns:
        fails: dictionary of runs and error which broke the dataframe generation
    """
    # Build a private list so that the caller's sequence is left untouched
    # and tuples work as well as lists.
    if isinstance(runNumbers, (str, bytes)):
        runs = [runNumbers]
    else:
        try:
            runs = list(runNumbers)
        except TypeError:
            # Not iterable: a single run number.
            runs = [runNumbers]

    for extra in args:
        if isinstance(extra, (list, tuple)):
            runs.extend(extra)
        else:
            runs.append(extra)

    fails = {}
    for run in runs:
        try:
            # Imported inside the try so that an import problem is reported
            # per run like any other failure.
            from processor import DldFlashDataframeCreator as DldFlashProcessor
            prc = DldFlashProcessor.DldFlashProcessor()
            prc.runNumber = run
            prc.readData()
            prc.storeDataframes()
            print('Stored dataframe for run {} in {}'.format(run, prc.DATA_PARQUET_DIR))
        except Exception as E:
            fails[run] = E

    for key, val in fails.items():
        print('{} failed with error {}'.format(key, val))
    return fails