Source code for processor.utilities.diagnostics

# -*- coding: utf-8 -*-
"""

@author: Steinn Ymir Agustsson
"""

from .misc import repr_byte_size
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import dask
from dask.diagnostics import ProgressBar


[docs]def channel_report(dd): """ Generates a table containing relevant statistical quantities on all available channels [EXPENSIVE!!] Args: dd (dask.DataFrame): Table to analyse Returns: df (pandas.DataFrame): table with computed values Notes: Author: Steinn Ymir Agustsson <sagustss@uni-mainz.de> """ print('creating channel report...') with ProgressBar(): chan_dict = {} for chan in dd.columns: vals = dd[chan].compute() clean = vals[np.isfinite(vals)] chan_dict[chan] = {} chan_dict[chan]['min'] = min(clean) chan_dict[chan]['max'] = max(clean) chan_dict[chan]['amp'] = chan_dict[chan]['max'] - chan_dict[chan]['min'] chan_dict[chan]['mean'] = np.mean(clean) chan_dict[chan]['std'] = np.std(clean) chan_dict[chan]['len'] = len(vals) chan_dict[chan]['len_nan'] = len(vals) - len(clean) chan_dict[chan]['max_loc'] = vals.idxmax() chan_dict[chan]['min_loc'] = vals.idxmin() df = pd.DataFrame.from_dict(chan_dict).T return df
[docs]def channel_statistics(dd,columns=None): """ Statistical overiview of the dataset. The report contains minimum, maximum, amplitude(max-min), mean and standard deviation of each column. Args: dd (dask.DataFrame): Table to analyse columns (list,None): list of columns to analyse, if None it checks all columns in the dataframe. Returns: df (pandas.DataFrame): table with computed values Notes: Author: Steinn Ymir Agustsson <sagustss@uni-mainz.de> """ print('creating channel report...') chan_dict = {} if columns is None: columns = dd.columns for chan in columns: try: vals = dd[chan] chan_dict[chan] = {} chan_dict[chan]['min'] = vals.min() chan_dict[chan]['max'] = vals.max() chan_dict[chan]['amp'] = chan_dict[chan]['max'] - chan_dict[chan]['min'] chan_dict[chan]['mean'] = vals.mean() chan_dict[chan]['std'] = vals.std() # chan_dict[chan]['len'] = vals.size # chan_dict[chan]['len_nan'] = vals.size - vals.isna().size except KeyError: print(f'No column {chan} in dataframe') with ProgressBar(): d, = dask.compute(chan_dict) return pd.DataFrame.from_dict(d)
[docs]def binned_array_size(processor): """ Prints the expected size in memory of the binned array, computed with the current binning parameters""" voxels = np.prod([len(x) for x in processor.binAxesList]) print(repr_byte_size(voxels.astype(np.float64) * 64.))
[docs]def plot_channels(processor): """ Generates a plot of each available channel. Useful for diagnostics on data conditions Args: processor (DldProcessor): processor instance """ cols = processor.dd.columns f, ax = plt.subplots(len(cols) // 2, 2, figsize=(15, 20)) for i, chan in enumerate(processor.dd.columns): vals = processor.dd[chan].compute() clean = vals[np.isfinite(vals)] start, stop, mean = clean.min(), clean.max(), clean.mean() if np.abs((start - mean) / (stop - mean)) > 5: start = stop - 2 * mean elif np.abs((stop - mean) / (start - mean)) > 5: stop = start + 2 * mean step = max((stop - start) / 1000, 1) x = processor.addBinning(chan, start, stop, step) res = processor.computeBinnedData() ax[i // 2, i % 2].set_title(chan, fontsize='x-large') if len(res) > 1: try: ax[i // 2, i % 2].plot(x, res) except ValueError: ax[i // 2, i % 2].plot(x, res[:-1]) processor.resetBins()
[docs]def plot_GMD_vs_bunches(self): """ not sure what this was done for... """ f, ax = plt.subplots(1, 2) self.resetBins() ubId = self.addBinning('dldMicrobunchId', 1, 500, 1) result_ubId = self.computeBinnedData() ax[0].plot(ubId, result_ubId / max(result_ubId), label='Photoelectron count') ax[0].set_title('Microbunch') self.resetBins() MbId_values = self.dd['macroBunchPulseId'].compute() clean = MbId_values[np.isfinite(MbId_values)] start, stop, mean = clean.min(), clean.max(), clean.mean() if np.abs((start - mean) / (stop - mean)) > 5: start = stop - 2 * mean elif np.abs((stop - mean) / (start - mean)) > 5: stop = start + 2 * mean step = max((stop - start) / 1000, 1) MbId = self.addBinning('macroBunchPulseId', start, stop, step) result_MbId = self.computeBinnedData() ax[1].plot(MbId, result_MbId / max(result_MbId), label='Photoelectron count') ax[1].set_title('Macrobunch') gmd_values = np.nan_to_num(self.dd['gmdBda'].compute()) ubId_values = self.dd['dldMicrobunchId'].compute() gmd_ubId = np.zeros_like(ubId) for g, ub in zip(gmd_values, ubId_values): ub_idx = int(ub) if 0 < ub_idx < len(gmd_ubId): gmd_ubId[ub_idx] += g gmd_MbId = np.zeros_like(MbId) for g, Mb in zip(gmd_values, MbId_values): Mb_idx = int(Mb) if 0 < Mb_idx < len(gmd_MbId): gmd_MbId[Mb_idx] += g ax[0].plot(ubId, gmd_ubId / max(gmd_ubId), label='GMD') ax[1].plot(MbId, gmd_MbId / max(gmd_MbId) + 1, label='GMD') for a in ax: a.legend() a.grid()