#!/usr/bin/env python
# -*- coding: utf-8 -*-

# The MIT License (MIT)
# Copyright (c) 2015-2020 Daniel Schick
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.

from . import config
import logging

import numpy as np
import collections
import matplotlib.pyplot as plt
import matplotlib as mpl
import re
from uncertainties import unumpy
from .helpers import bin_data

__all__ = ['Evaluation']

__docformat__ = 'restructuredtext'


class Evaluation(object):
    """Evaluation

    Main class for evaluating data.
    The raw data is accessed via a ``Source`` object.
    The evaluation allows to bin data, calculate errors and propagate them.
    There is also an interface to ``lmfit`` for easy batch-fitting.

    Args:
        source (Source): raw data source.

    Attributes:
        log (logging.logger): logger instance from logging.
        clist (list[str]): list of counter names to evaluate.
        cdef (dict{str:str}): dict of predefined counter names and
            definitions.
        xcol (str): counter or motor for x-axis.
        t0 (float): approx. time zero for delay scans to determine the
            unpumped region of the data for normalization.
        custom_counters (list[str]): list of custom counters - default is [].
        math_keys (list[str]): list of keywords which are evaluated as numpy
            functions.
        statistic_type (str): 'gauss' for normal averaging,
            'poisson' for counting statistics.
        propagate_errors (bool): propagate errors for dependent counters.

    """

    def __init__(self, source):
        self.log = logging.getLogger(__name__)
        self.log.setLevel(config.LOG_LEVEL)
        self.source = source
        self.clist = []
        self.cdef = {}
        self.xcol = ''
        self.t0 = 0
        self.custom_counters = []
        self.math_keys = ['mean', 'sum', 'diff', 'max', 'min', 'round',
                          'abs', 'sin', 'cos', 'tan', 'arcsin', 'arccos',
                          'arctan', 'pi', 'exp', 'log', 'log10', 'sqrt',
                          'sign']
        self.statistic_type = 'gauss'
        self.propagate_errors = True
        self.apply_data_filter = False
        self.data_filters = ['evaluatable statement']
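
    # A minimal usage sketch (``my_source`` stands for any pyEvalData
    # ``Source`` instance; all counter names below are hypothetical):
    #
    #     >>> ev = Evaluation(my_source)
    #     >>> ev.cdef = {'det': 'detector', 'mon': 'monitor'}
    #     >>> ev.clist = ['det/mon']   # normalized detector signal
    #     >>> ev.xcol = 'delay'
    #     >>> ev.t0 = 0.0              # time zero for norm2one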
    def get_clist(self):
        """get_clist

        Returns a list of counters as defined by the user.
        If the counters were defined in a ``dict`` it will be converted to
        a ``list`` for backwards compatibility.

        Returns:
            clist (list[str]): list of counter names to evaluate.

        """
        if isinstance(self.clist, dict):
            # the clist property is a dict, so return its keys as list
            clist = list(self.clist.keys())
        else:
            clist = list(self.clist)
        return clist
    def traverse_counters(self, clist, source_cols=''):
        """traverse_counters

        Traverse all counters and replace all predefined counter definitions.
        It also returns a list of the included source counters for error
        propagation.

        Args:
            clist (list[str]): initial counter list.
            source_cols (list[str], optional): counters in the raw source
                data.

        Returns:
            (tuple):
            - *resolved_counters (list[str])* - resolved counters.
            - *source_counters (list[str])* - all source counters in the
              resolved counters.

        """
        resolved_counters = []
        source_counters = []

        for counter_name in clist:
            # resolve each counter in the clist
            counter_string, res_source_counters = \
                self.resolve_counter_name(counter_name, source_cols)

            resolved_counters.append(counter_string)
            source_counters.extend(res_source_counters)

        return resolved_counters, list(set(source_counters))
    def resolve_counter_name(self, col_name, source_cols=''):
        """resolve_counter_name

        Replace all predefined counter definitions in a given counter name.
        The function works recursively.

        Args:
            col_name (str): initial counter string.
            source_cols (list[str], optional): columns in the source data.

        Returns:
            (tuple):
            - *col_string (str)* - resolved counter string.
            - *source_counters (list[str])* - source counters in the
              col_string.

        """
        recall = False  # boolean to stop recursive calls
        source_counters = []
        col_string = col_name

        for find_cdef in self.cdef.keys():
            # check for all predefined counters
            search_pattern = r'\b' + find_cdef + r'\b'
            if re.search(search_pattern, col_string) is not None:
                if self.cdef[find_cdef] in source_cols:
                    # this counter definition is a base source counter
                    source_counters.append(self.cdef[find_cdef])
                # found a predefined counter
                # recursive call if predefined counter must be resolved again
                recall = True
                # replace the counter definition in the string
                (col_string, _) = re.subn(search_pattern,
                                          '(' + self.cdef[find_cdef] + ')',
                                          col_string)

        if recall:
            # do the recursive call
            col_string, rec_source_counters = self.resolve_counter_name(
                col_string, source_cols)
            source_counters.extend(rec_source_counters)

        for find_cdef in source_cols:
            # check for all base source counters
            search_pattern = r'\b' + find_cdef + r'\b'
            if re.search(search_pattern, col_string) is not None:
                source_counters.append(find_cdef)

        return col_string, source_counters
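
    # Sketch of the recursive resolution (hypothetical definitions; the
    # order of the returned source counters may vary, as duplicates are
    # removed via a set in ``traverse_counters``):
    #
    #     >>> ev.cdef = {'norm': 'det/mon', 'det': 'detector'}
    #     >>> ev.traverse_counters(['norm'], ['detector', 'mon'])
    #     (['((detector)/mon)'], ['detector', 'mon'])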
    def col_string_to_eval_string(self, col_string, array_name='spec_data'):
        """col_string_to_eval_string

        Use regular expressions in order to generate an evaluable string
        from the counter string in order to append the new counter to the
        spec data.

        Args:
            col_string (str): definition of the counter.
            array_name (str, optional): name of the data array in the
                evaluated string - default is 'spec_data'.

        Returns:
            eval_string (str): evaluable string to add the new counter
            to the spec data.

        """
        # search for alphanumeric counter names in col_string
        iterator = re.finditer(
            r'([0-9]*[a-zA-Z\_]+[0-9]*[a-zA-Z]*)*', col_string)
        # these are keys which should not be replaced but evaluated
        math_keys = list(self.math_keys)
        keys = math_keys.copy()

        for key in iterator:
            # traverse all found counter names
            if len(key.group()) > 0:
                # the match is > 0
                if not key.group() in keys:
                    # the counter name is not in the keys list
                    # remember this counter name in the key list in order
                    # not to replace it again
                    keys.append(key.group())
                    # the actual replacement
                    (col_string, _) = re.subn(
                        r'\b' + key.group() + r'\b',
                        array_name + '[\'' + key.group() + '\']',
                        col_string)

        # add 'np.' prefix to numpy functions/math keys
        for mk in math_keys:
            if mk != '0x0001FFFF':
                (col_string, _) = re.subn(r'\b' + mk + r'\b',
                                          'np.' + mk, col_string)
        return col_string
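
    # Example of the string rewriting (counter names are hypothetical;
    # 'log' is a math key, so it is prefixed with 'np.' instead of being
    # replaced by an array access):
    #
    #     >>> ev.col_string_to_eval_string('log(det)/mon')
    #     "np.log(spec_data['det'])/spec_data['mon']"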
    def add_custom_counters(self, spec_data, scan_num, source_counters):
        """add_custom_counters

        Add custom counters to the spec data array.
        This is a stub for child classes.

        Args:
            spec_data (ndarray): data array from the spec scan.
            scan_num (int): scan number of the spec scan.
            source_counters (list[str]): list of the source counters and
                custom counters from the clist and xcol.

        Returns:
            spec_data (ndarray): updated data array from the spec scan.

        """
        return spec_data
    def filter_data(self, data):
        """filter_data

        Apply all filters in ``data_filters`` to the scan data and return
        only the rows for which every filter statement evaluates to True.

        Args:
            data (ndarray): structured data array of a scan.

        Returns:
            data (ndarray): filtered structured data array.

        """
        res = []
        for data_filter in self.data_filters:
            name, _ = self.resolve_counter_name(data_filter)
            idx = eval(self.col_string_to_eval_string(name,
                                                      array_name='data'))
            if len(res) == 0:
                res = idx
            else:
                res = np.logical_and(res, idx)

        data_list = []
        dtype_list = []
        for name in data.dtype.names:
            data_list.append(data[name][res])
            dtype_list.append((name, data[name][res].dtype,
                               data[name][res].shape))
        return np.core.records.fromarrays(data_list, dtype=dtype_list)
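
    # Filters are plain counter expressions which are combined by a
    # logical AND (hypothetical counter names; note that the statements
    # are passed through ``eval``):
    #
    #     >>> ev.data_filters = ['mon > 1000', 'temperature < 150']
    #     >>> ev.apply_data_filter = True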
    def get_scan_data(self, scan_num):
        """get_scan_data

        Read the data of a single scan from the source and apply the data
        filter if enabled.

        Args:
            scan_num (int): scan number of the scan.

        Returns:
            data (ndarray): structured data array of the scan.

        """
        data, meta = self.source.get_scan_data(scan_num)
        if self.apply_data_filter:
            data = self.filter_data(data)
        return data
    def get_scan_list_data(self, scan_list):
        """get_scan_list_data

        Read the data of a list of scans from the source and apply the data
        filter to each of them if enabled.

        Args:
            scan_list (list[int]): list of scan numbers.

        Returns:
            data_list (list[ndarray]): list of structured data arrays.

        """
        data_list, meta_list = self.source.get_scan_list_data(scan_list)
        if self.apply_data_filter:
            for i, data in enumerate(data_list):
                data_list[i] = self.filter_data(data)
        return data_list
    def avg_N_bin_scans(self, scan_list, xgrid=np.array([]), binning=True):
        """avg_N_bin_scans

        Averages data defined by the counter list, clist, onto an optional
        xgrid. If no xgrid is given the x-axis data of the first scan in
        the list is used instead.

        Args:
            scan_list (list[int]): list of scan numbers.
            xgrid (ndarray, optional): grid to bin the data to -
                default is empty so use the x-axis of the first scan.
            binning (bool, optional): enable binning of the data -
                default is True.

        Returns:
            (tuple):
            - *avg_data (ndarray)* - averaged data for the scan list.
            - *std_data (ndarray)* - standard deviation of the data for the
              scan list.
            - *err_data (ndarray)* - error of the data for the scan list.
            - *name (str)* - name of the data set.

        """
        # generate the name of the data set from the source name and
        # scan_list
        name = self.source.name + " #{0:04d}".format(scan_list[0])

        # get the counters which should be evaluated
        clist = self.get_clist()
        if not clist:
            raise Exception('No clist is defined. Do not know what to plot!')
        # process also the xcol as counter in order to allow for newly
        # defined xcols
        if not self.xcol:
            raise Exception('No xcol is defined. Do not know what to plot!')
        if self.xcol not in clist:
            clist.append(self.xcol)

        source_cols = []
        concat_data = np.array([])

        data_list = self.get_scan_list_data(scan_list)
        for i, (spec_data, scan_num) in enumerate(zip(data_list, scan_list)):
            # traverse the scan list and read data
            if i == 0 or len(source_cols) == 0:
                # we need to evaluate this only once
                # these are the base spec counters which are present in the
                # data file plus custom counters
                source_cols = list(
                    set(list(spec_data.dtype.names) + self.custom_counters))

                # resolve the clist and retrieve the resolved counters and
                # the necessary base spec counters for error propagation
                resolved_counters, source_counters = self.traverse_counters(
                    clist, source_cols)

                # counter names and resolved strings for further calculations
                if self.statistic_type == 'poisson' or self.propagate_errors:
                    # for error propagation we just need the base spec
                    # counters and the xcol
                    col_names = source_counters[:]
                    col_strings = source_counters[:]
                    # add the xcol to both lists
                    col_names.append(self.xcol)
                    col_strings.append(
                        resolved_counters[clist.index(self.xcol)])
                else:
                    # we need to average the resolved counters
                    col_names = clist[:]
                    col_strings = resolved_counters[:]

                # create the dtype of the return array
                dtypes = []
                for col_name in clist:
                    dtypes.append((col_name, '<f8'))

            # add custom counters if defined
            spec_data = self.add_custom_counters(spec_data, scan_num,
                                                 source_counters)

            data = np.array([])
            # read data into data array
            for col_string, col_name in zip(col_strings, col_names):
                # traverse the counters in the clist and append to data if
                # not already present
                eval_string = self.col_string_to_eval_string(
                    col_string, array_name='spec_data')

                if len(data) == 0:
                    data = np.array(eval(eval_string),
                                    dtype=[(col_name, float)])
                elif col_name not in data.dtype.names:
                    data = eval('np.lib.recfunctions.append_fields(data,\''
                                + col_name + '\',data=(' + eval_string
                                + '), dtypes=float, asrecarray=True, '
                                + 'usemask=True)')

            if i > 0:
                # this is not the first scan in the list so append the data
                # to the concatenated data array
                concat_data = np.concatenate((concat_data, data), axis=0)
            else:
                concat_data = data

        if len(xgrid) == 0:
            # if no xgrid is given we use the x-axis data of the first scan
            # instead
            xgrid = concat_data[self.xcol]

        # remove xcol from clist and resolved counters for further treatment
        del resolved_counters[clist.index(self.xcol)]
        clist.remove(self.xcol)

        try:
            # bin the concatenated data to the xgrid
            # if a custom counter was calculated it might have a different
            # length than the spec counters which will result in an error
            # while binning data from a default spec counter and a custom
            # counter.
            if binning:
                xgrid_reduced, _, _, _, _, _, _, _, _ = bin_data(
                    concat_data[self.xcol], concat_data[self.xcol], xgrid)
            else:
                xgrid_reduced = xgrid
            # create empty arrays for averages, std and errors
            avg_data = np.recarray(np.shape(xgrid_reduced)[0], dtype=dtypes)
            std_data = np.recarray(np.shape(xgrid_reduced)[0], dtype=dtypes)
            err_data = np.recarray(np.shape(xgrid_reduced)[0], dtype=dtypes)

            if self.statistic_type == 'poisson':
                bin_stat = 'sum'
            else:  # gauss
                bin_stat = 'mean'

            if binning:
                if self.statistic_type == 'poisson' or self.propagate_errors:
                    # propagate errors using the uncertainties package

                    # create empty dict for uncertainties data arrays
                    unc_data_err = {}
                    unc_data_std = {}

                    for col in source_counters:
                        # for all cols in the clist bin the data to the
                        # xgrid and calculate the averages, stds and errors
                        y, avg_data[self.xcol], yerr, err_data[self.xcol], \
                            ystd, std_data[self.xcol], _, _, _ = bin_data(
                                concat_data[col],
                                concat_data[self.xcol],
                                xgrid_reduced,
                                statistic=bin_stat)

                        # add spec base counters to uncData arrays
                        # the uncertainty package cannot handle masked arrays
                        # e.g. for divisions in the clist
                        # --> convert all base counter results to np.array()
                        unc_data_std[col] = unumpy.uarray(np.array(y),
                                                          np.array(ystd))
                        unc_data_err[col] = unumpy.uarray(np.array(y),
                                                          np.array(yerr))

                    for col_name, col_string in zip(clist, resolved_counters):
                        eval_string = self.col_string_to_eval_string(
                            col_string, array_name='unc_data_err')
                        temp = eval(eval_string)
                        avg_data[col_name] = unumpy.nominal_values(temp)
                        err_data[col_name] = unumpy.std_devs(temp)

                        eval_string = self.col_string_to_eval_string(
                            col_string, array_name='unc_data_std')
                        temp = eval(eval_string)
                        std_data[col_name] = unumpy.std_devs(temp)
                else:
                    # no error propagation but averaging of individual scans
                    for col in clist:
                        # for all cols in the clist bin the data to the
                        # xgrid and calculate the averages, stds and errors
                        avg_data[col], avg_data[self.xcol], err_data[col], \
                            err_data[self.xcol], std_data[col], \
                            std_data[self.xcol], _, _, _ = bin_data(
                                concat_data[col],
                                concat_data[self.xcol],
                                xgrid_reduced,
                                statistic=bin_stat)
            else:
                for col_name, col_string in zip(clist, resolved_counters):
                    eval_string = self.col_string_to_eval_string(
                        col_string, array_name='spec_data')
                    temp = eval(eval_string)
                    avg_data[col_name] = temp
                    avg_data[self.xcol] = concat_data[self.xcol]
                    err_data[col_name] = 0
                    err_data[self.xcol] = 0
                    std_data[col_name] = 0
                    std_data[self.xcol] = 0
        except Exception:
            print('xcol and ycol must have the same length --> probably you '
                  'try plotting a custom counter together with a spec '
                  'counter')
            raise

        return avg_data, std_data, err_data, name
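
    # Usage sketch (hypothetical scan numbers; the fields of the returned
    # record arrays are the counters of the clist plus the xcol):
    #
    #     >>> avg, std, err, name = ev.avg_N_bin_scans([101, 102, 103])
    #     >>> avg[ev.xcol], avg['det/mon']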
    def plot_scans(self, scan_list, ylims=[], xlims=[], fig_size=[],
                   xgrid=[], yerr='std', xerr='std', norm2one=False,
                   binning=True, label_text='', title_text='',
                   skip_plot=False, grid_on=True, ytext='', xtext='',
                   fmt='-o'):
        """plot_scans

        Plot a list of scans from the data source.
        Various plot parameters are provided.
        The plotted data are returned.

        Args:
            scan_list (list[int]): list of scan numbers.
            ylims (ndarray, optional): ylim for the plot.
            xlims (ndarray, optional): xlim for the plot.
            fig_size (ndarray, optional): figure size of the figure.
            xgrid (ndarray, optional): grid to bin the data to -
                default is empty so use the x-axis of the first scan.
            yerr (str, optional): type of the errors in y:
                [err, std, none] - default is 'std'.
            xerr (str, optional): type of the errors in x:
                [err, std, none] - default is 'std'.
            norm2one (bool, optional): norm transient data to 1 for t < t0 -
                default is False.
            binning (bool, optional): enable binning of the data -
                default is True.
            label_text (str, optional): label of the plot - default is none.
            title_text (str, optional): title of the figure - default is
                none.
            skip_plot (bool, optional): skip plotting, just return data -
                default is False.
            grid_on (bool, optional): add grid to plot - default is True.
            ytext (str, optional): y-label of the plot - default is none.
            xtext (str, optional): x-label of the plot - default is none.
            fmt (str, optional): format string of the plot - default is
                '-o'.

        Returns:
            (tuple):
            - *y2plot (OrderedDict)* - y-data which was plotted.
            - *x2plot (ndarray)* - x-data which was plotted.
            - *yerr2plot (OrderedDict)* - y-error which was plotted.
            - *xerr2plot (ndarray)* - x-error which was plotted.
            - *name (str)* - name of the data set.

        """
        # initialize the y-data as ordered dict in order to allow for
        # multiple counters at the same time
        y2plot = collections.OrderedDict()
        yerr2plot = collections.OrderedDict()

        # get the averaged data, stds and errors for the scan list and the
        # xgrid
        avg_data, std_data, err_data, name = self.avg_N_bin_scans(
            scan_list, xgrid=xgrid, binning=binning)

        # set the error data
        if xerr == 'std':
            xerr_data = std_data
        elif xerr == 'err':
            xerr_data = err_data
        else:
            xerr_data = np.zeros_like(std_data)

        if yerr == 'std':
            yerr_data = std_data
        elif yerr == 'err':
            yerr_data = err_data
        else:
            yerr_data = np.zeros_like(std_data)

        # set x-data and errors
        x2plot = avg_data[self.xcol]
        xerr2plot = xerr_data[self.xcol]

        # plot all keys in the clist
        clist = self.get_clist()
        for col in clist:
            # traverse the counter list
            # save the counter data and errors in the ordered dictionary
            y2plot[col] = avg_data[col]
            yerr2plot[col] = yerr_data[col]

            if norm2one:
                # normalize the y-data to 1 for t < t0
                # just makes sense for delay scans
                before_zero = y2plot[col][x2plot <= self.t0]
                y2plot[col] = y2plot[col]/np.mean(before_zero)
                yerr2plot[col] = yerr2plot[col]/np.mean(before_zero)

            if len(label_text) == 0:
                # if no label_text is given use the counter name
                lt = col
            else:
                if len(clist) > 1:
                    # for multiple counters add the counter name to the label
                    lt = label_text + ' | ' + col
                else:
                    # for a single counter just use the label_text
                    lt = label_text

            if not skip_plot:
                # plot the errorbar for each counter
                if (xerr == 'none') & (yerr == 'none'):
                    plt.plot(x2plot, y2plot[col], fmt, label=lt)
                else:
                    plt.errorbar(x2plot, y2plot[col], fmt=fmt, label=lt,
                                 xerr=xerr2plot, yerr=yerr2plot[col])

        if not skip_plot:
            # add a legend, labels, title and set the limits and grid
            plt.legend(frameon=True, loc=0, numpoints=1)
            plt.xlabel(self.xcol)
            if xlims:
                plt.xlim(xlims)
            if ylims:
                plt.ylim(ylims)
            if len(title_text) > 0:
                plt.title(title_text)
            else:
                plt.title(name)
            if len(xtext) > 0:
                plt.xlabel(xtext)
            if len(ytext) > 0:
                plt.ylabel(ytext)
            if grid_on:
                plt.grid(True)

        return y2plot, x2plot, yerr2plot, xerr2plot, name
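
    # Usage sketch (hypothetical scan numbers):
    #
    #     >>> y, x, yerr, xerr, name = ev.plot_scans(
    #     ...     [101, 102, 103], yerr='err', norm2one=True)
    #     >>> plt.show()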
    def plot_mesh_scan(self, scan_num, skip_plot=False, grid_on=False,
                       ytext='', xtext='', levels=20, cbar=True):
        """plot_mesh_scan

        Plot a single mesh scan from the data source.
        Various plot parameters are provided.
        The plotted data are returned.

        Args:
            scan_num (int): scan number of the spec scan.
            skip_plot (bool, optional): skip plotting, just return data -
                default is False.
            grid_on (bool, optional): add grid to plot - default is False.
            ytext (str, optional): y-label of the plot - default is none.
            xtext (str, optional): x-label of the plot - default is none.
            levels (int, optional): levels of the contour plot -
                default is 20.
            cbar (bool, optional): add colorbar to plot - default is True.

        Returns:
            (tuple):
            - *xx, yy, zz* - x, y, z data which was plotted.

        """
        from matplotlib.mlab import griddata
        from matplotlib import gridspec

        # read data from the data source
        try:
            # try to read the data of this scan
            spec_data = self.get_scan_data(scan_num)
        except Exception:
            print('Scan #' + str(scan_num) + ' not found, skipping')

        dt = spec_data.dtype
        dt = dt.descr

        xmotor = dt[0][0]
        ymotor = dt[1][0]

        X = spec_data[xmotor]
        Y = spec_data[ymotor]

        xx = np.sort(np.unique(X))
        yy = np.sort(np.unique(Y))

        clist = self.get_clist()

        if len(clist) > 1:
            print('WARNING: Only the first counter of the clist is plotted.')

        Z = spec_data[clist[0]]

        zz = griddata(X, Y, Z, xx, yy, interp='linear')

        if not skip_plot:
            if cbar:
                gs = gridspec.GridSpec(4, 2,
                                       width_ratios=[3, 1],
                                       height_ratios=[0.2, 0.1, 1, 3])
                k = 4
            else:
                gs = gridspec.GridSpec(2, 2,
                                       width_ratios=[3, 1],
                                       height_ratios=[1, 3])
                k = 0

            ax1 = plt.subplot(gs[0+k])

            plt.plot(xx, np.mean(zz, 0), label='mean')
            plt.plot(xx, zz[np.argmax(np.mean(zz, 1)), :], label='peak')
            plt.xlim([min(xx), max(xx)])
            plt.legend(loc=0)
            ax1.xaxis.tick_top()
            if grid_on:
                plt.grid(True)

            plt.subplot(gs[2+k])

            plt.contourf(xx, yy, zz, levels, cmap='viridis')

            plt.xlabel(xmotor)
            plt.ylabel(ymotor)

            if len(xtext) > 0:
                plt.xlabel(xtext)

            if len(ytext) > 0:
                plt.ylabel(ytext)

            if grid_on:
                plt.grid(True)

            if cbar:
                cb = plt.colorbar(cax=plt.subplot(gs[0]),
                                  orientation='horizontal')
                cb.ax.xaxis.set_ticks_position('top')
                cb.ax.xaxis.set_label_position('top')

            ax4 = plt.subplot(gs[3+k])

            plt.plot(np.mean(zz, 1), yy)
            plt.plot(zz[:, np.argmax(np.mean(zz, 0))], yy)
            plt.ylim([np.min(yy), np.max(yy)])

            ax4.yaxis.tick_right()
            if grid_on:
                plt.grid(True)

        return xx, yy, zz
    def plot_scan_sequence(self, scan_sequence, ylims=[], xlims=[],
                           fig_size=[], xgrid=[], yerr='std', xerr='std',
                           norm2one=False, binning=True, sequence_type='',
                           label_text='', title_text='', skip_plot=False,
                           grid_on=True, ytext='', xtext='', fmt='-o'):
        """plot_scan_sequence

        Plot a sequence of scan lists from the data source.
        Various plot parameters are provided.
        The plotted data are returned.

        Args:
            scan_sequence (list[list/tuple[list[int], int/str]]): sequence
                of scan lists and parameters.
            ylims (ndarray, optional): ylim for the plot.
            xlims (ndarray, optional): xlim for the plot.
            fig_size (ndarray, optional): figure size of the figure.
            xgrid (ndarray, optional): grid to bin the data to -
                default is empty so use the x-axis of the first scan.
            yerr (str, optional): type of the errors in y:
                [err, std, none] - default is 'std'.
            xerr (str, optional): type of the errors in x:
                [err, std, none] - default is 'std'.
            norm2one (bool, optional): norm transient data to 1 for t < t0 -
                default is False.
            binning (bool, optional): enable binning of the data -
                default is True.
            sequence_type (str, optional): type of the sequence: [fluence,
                delay, energy, theta, temperature, position, voltage,
                current, scans, none, text] - default is enumeration.
            label_text (str, optional): label of the plot - default is none.
            title_text (str, optional): title of the figure - default is
                none.
            skip_plot (bool, optional): skip plotting, just return data -
                default is False.
            grid_on (bool, optional): add grid to plot - default is True.
            ytext (str, optional): y-label of the plot - default is none.
            xtext (str, optional): x-label of the plot - default is none.
            fmt (str, optional): format string of the plot - default is
                '-o'.

        Returns:
            (tuple):
            - *sequence_data (OrderedDict)* - dictionary of the averaged
              scan data.
            - *parameters (list[str, float])* - parameters of the sequence.
            - *names (list[str])* - list of names of each data set.
            - *label_texts (list[str])* - list of labels for each data set.

        """
        # initialize the return data
        sequence_data = collections.OrderedDict()
        names = []
        label_texts = []
        parameters = []

        for i, (scan_list, parameter) in enumerate(scan_sequence):
            # traverse the scan sequence
            parameters.append(parameter)
            # format the parameter as label text of this plot if no label
            # text is given
            if len(label_text) == 0:
                if sequence_type == 'fluence':
                    lt = str.format('{:.2f} mJ/cm²', parameter)
                elif sequence_type == 'delay':
                    lt = str.format('{:.2f} ps', parameter)
                elif sequence_type == 'energy':
                    lt = str.format('{:.2f} eV', parameter)
                elif sequence_type == 'theta':
                    lt = str.format('{:.2f} deg', parameter)
                elif sequence_type == 'temperature':
                    lt = str.format('{:.2f} K', parameter)
                elif sequence_type == 'position':
                    lt = str.format('{:.2f} mm', parameter)
                elif sequence_type == 'voltage':
                    lt = str.format('{:.2f} V', parameter)
                elif sequence_type == 'current':
                    lt = str.format('{:.2f} A', parameter)
                elif sequence_type == 'scans':
                    lt = str(scan_list)
                elif sequence_type == 'none':
                    # no parameter for single scans
                    lt = ''
                elif sequence_type == 'text':
                    # parameter is a string
                    lt = parameter
                else:
                    # no sequence type is given --> enumerate
                    lt = str.format('#{}', i+1)
            else:
                lt = label_text[i]

            # get the plot data for the scan list
            y2plot, x2plot, yerr2plot, xerr2plot, name = self.plot_scans(
                scan_list,
                ylims=ylims,
                xlims=xlims,
                fig_size=fig_size,
                xgrid=xgrid,
                yerr=yerr,
                xerr=xerr,
                norm2one=norm2one,
                binning=binning,
                label_text=lt,
                title_text=title_text,
                skip_plot=skip_plot,
                grid_on=grid_on,
                ytext=ytext,
                xtext=xtext,
                fmt=fmt)

            if self.xcol not in sequence_data.keys():
                # if the xcol is not in the return data dict - add the key
                sequence_data[self.xcol] = []
                sequence_data[self.xcol + 'Err'] = []

            # add the x-axis data to the return data dict
            sequence_data[self.xcol].append(x2plot)
            sequence_data[self.xcol + 'Err'].append(xerr2plot)

            for counter in y2plot:
                # traverse all counters in the data set
                if counter not in sequence_data.keys():
                    # if the counter is not in the return data dict -
                    # add the key
                    sequence_data[counter] = []
                    sequence_data[counter + 'Err'] = []

                # add the counter data to the return data dict
                sequence_data[counter].append(y2plot[counter])
                sequence_data[counter + 'Err'].append(yerr2plot[counter])

            # append names and labels to their lists
            names.append(name)
            label_texts.append(lt)

        return sequence_data, parameters, names, label_texts
    def export_scan_sequence(self, scan_sequence, path, fileName, yerr='std',
                             xerr='std', xgrid=[], norm2one=False,
                             binning=True):
        """export_scan_sequence

        Exports the data for each scan list in the sequence as an
        individual file.

        Args:
            scan_sequence (list[list/tuple[list[int], int/str]]): sequence
                of scan lists and parameters.
            path (str): path of the file to export to.
            fileName (str): name of the file to export to.
            yerr (str, optional): type of the errors in y:
                [err, std, none] - default is 'std'.
            xerr (str, optional): type of the errors in x:
                [err, std, none] - default is 'std'.
            xgrid (ndarray, optional): grid to bin the data to -
                default is empty so use the x-axis of the first scan.
            norm2one (bool, optional): norm transient data to 1 for t < t0 -
                default is False.
            binning (bool, optional): enable binning of the data -
                default is True.

        """
        # get scan_sequence data without plotting
        sequence_data, parameters, names, label_texts = \
            self.plot_scan_sequence(
                scan_sequence,
                xgrid=xgrid,
                yerr=yerr,
                xerr=xerr,
                norm2one=norm2one,
                binning=binning,
                skip_plot=True)

        for i, label_text in enumerate(label_texts):
            # traverse the sequence
            header = ''
            saveData = []
            for counter in sequence_data:
                # traverse all counters in the data
                # build the file header
                header = header + counter + '\t '
                # build the data matrix
                saveData.append(sequence_data[counter][i])
            # save data with header to text file
            np.savetxt('{:s}/{:s}_{:s}.dat'.format(
                path, fileName,
                "".join(x for x in label_text if x.isalnum())),
                np.r_[saveData].T, delimiter='\t', header=header)
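
    # Usage sketch, writing one .dat file per sequence entry into an
    # existing folder (path and file name are hypothetical):
    #
    #     >>> ev.export_scan_sequence(seq, './export', 'sample')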
    def fit_scans(self, scans, mod, pars, ylims=[], xlims=[], fig_size=[],
                  xgrid=[], yerr='std', xerr='std', norm2one=False,
                  binning=True, sequence_type='text', label_text='',
                  title_text='', ytext='', xtext='', select='',
                  fit_report=0, show_single=False, weights=False,
                  fit_method='leastsq', offset_t0=False,
                  plot_separate=False, grid_on=True, fmt='o'):
        """fit_scans

        Fit, plot, and return the data of scans.
        This is just a wrapper for the fit_scan_sequence method.

        """
        scan_sequence = [[scans, '']]
        return self.fit_scan_sequence(scan_sequence, mod, pars, ylims,
                                      xlims, fig_size, xgrid, yerr, xerr,
                                      norm2one, binning, 'none', label_text,
                                      title_text, ytext, xtext, select,
                                      fit_report, show_single, weights,
                                      fit_method, offset_t0, plot_separate,
                                      grid_on, fmt=fmt)
    def fit_scan_sequence(self, scan_sequence, mod, pars, ylims=[],
                          xlims=[], fig_size=[], xgrid=[], yerr='std',
                          xerr='std', norm2one=False, binning=True,
                          sequence_type='', label_text='', title_text='',
                          ytext='', xtext='', select='', fit_report=0,
                          show_single=False, weights=False,
                          fit_method='leastsq', offset_t0=False,
                          plot_separate=False, grid_on=True,
                          last_res_as_par=False, sequence_data=[], fmt='o'):
        """fit_scan_sequence

        Fit, plot, and return the data of a scan sequence.

        Args:
            scan_sequence (list[list/tuple[list[int], int/str]]): sequence
                of scan lists and parameters.
            mod (Model[lmfit]): lmfit model for fitting the data.
            pars (Parameters[lmfit]): lmfit parameters for fitting the data.
            ylims (ndarray, optional): ylim for the plot.
            xlims (ndarray, optional): xlim for the plot.
            fig_size (ndarray, optional): figure size of the figure.
            xgrid (ndarray, optional): grid to bin the data to -
                default is empty so use the x-axis of the first scan.
            yerr (str, optional): type of the errors in y:
                [err, std, none] - default is 'std'.
            xerr (str, optional): type of the errors in x:
                [err, std, none] - default is 'std'.
            norm2one (bool, optional): norm transient data to 1 for t < t0 -
                default is False.
            binning (bool, optional): enable binning of the data -
                default is True.
            sequence_type (str, optional): type of the sequence: [fluence,
                delay, energy, theta, ...] - default is enumeration.
            label_text (str, optional): label of the plot - default is none.
            title_text (str, optional): title of the figure - default is
                none.
            ytext (str, optional): y-label of the plot - default is none.
            xtext (str, optional): x-label of the plot - default is none.
            select (str, optional): string to evaluate as select statement
                for the fit region - default is none.
            fit_report (int, optional): set the fit reporting level:
                [0: none, 1: basic, 2: full] - default is 0.
            show_single (bool, optional): plot each fit separately -
                default is False.
            weights (bool, optional): use weights for fitting -
                default is False.
            fit_method (str, optional): method to use for fitting; refer to
                lmfit - default is 'leastsq'.
            offset_t0 (bool, optional): offset time scans by the fitted t0
                parameter - default is False.
            plot_separate (bool, optional): use a single plot for each
                counter - default is False.
            grid_on (bool, optional): add grid to plot - default is True.
            last_res_as_par (bool, optional): use the last fit result as
                start values for the next fit - default is False.
            sequence_data (ndarray, optional): actual experimental data are
                externally given - default is empty.
            fmt (str, optional): format string of the plot - default is 'o'.

        Returns:
            (tuple):
            - *res (dict[ndarray])* - fit results.
            - *parameters (ndarray)* - parameters of the sequence.
            - *sequence_data (OrderedDict)* - dictionary of the averaged
              scan data.

        """
        # get the last open figure number
        main_fig_num = self.get_last_fig_number()

        if not fig_size:
            # use default figure size if none is given
            fig_size = mpl.rcParams['figure.figsize']

        # initialization of returns
        res = {}  # initialize the results dict

        for i, counter in enumerate(self.get_clist()):
            # traverse all counters in the counter list to initialize the
            # returns
            # results for this counter is again a dict
            res[counter] = {}

            if isinstance(pars, (list, tuple)):
                # the fit parameters might be individual for each counter
                _pars = pars[i]
            else:
                _pars = pars

            for pname, par in _pars.items():
                # add a dict key for each fit parameter in the result dict
                res[counter][pname] = []
                res[counter][pname + 'Err'] = []

            # add some more results
            res[counter]['chisqr'] = []
            res[counter]['redchi'] = []
            res[counter]['CoM'] = []
            res[counter]['int'] = []
            res[counter]['fit'] = []

        if len(sequence_data) > 0:
            # get only the parameters
            _, parameters, names, label_texts = self.plot_scan_sequence(
                scan_sequence,
                ylims=ylims, xlims=xlims, fig_size=fig_size, xgrid=xgrid,
                yerr=yerr, xerr=xerr, norm2one=norm2one, binning=True,
                sequence_type=sequence_type, label_text=label_text,
                title_text=title_text, skip_plot=True)
        else:
            # get the sequence data and parameters
            sequence_data, parameters, names, label_texts = \
                self.plot_scan_sequence(
                    scan_sequence,
                    ylims=ylims, xlims=xlims, fig_size=fig_size,
                    xgrid=xgrid, yerr=yerr, xerr=xerr, norm2one=norm2one,
                    binning=True, sequence_type=sequence_type,
                    label_text=label_text, title_text=title_text,
                    skip_plot=True)

        # this is the number of different counters
        num_sub_plots = len(self.get_clist())

        # fitting and plotting the data
        l_plot = 1  # counter for single plots

        for i, parameter in enumerate(parameters):
            # traverse all parameters of the sequence
            lt = label_texts[i]
            name = names[i]

            x2plot = sequence_data[self.xcol][i]
            xerr2plot = sequence_data[self.xcol + 'Err'][i]

            if fit_report > 0:
                # plot for basic and full fit reporting
                print('')
                print('='*10 + ' Parameter: ' + lt + ' ' + '='*15)

            j = 0  # counter for counters ;)
            k = 1  # counter for subplots
            for counter in sequence_data:
                # traverse all counters in the sequence
                # plot only y counters - next is the corresponding error
                if j >= 2 and j % 2 == 0:
                    # add the counter name to the label for non-separate
                    # plots
                    if sequence_type == 'none':
                        _lt = counter
                    else:
                        if plot_separate or num_sub_plots == 1:
                            _lt = lt
                        else:
                            _lt = lt + ' | ' + counter

                    # get the fit models and fit parameters if they are
                    # lists/tuples
                    if isinstance(mod, (list, tuple)):
                        _mod = mod[k-1]
                    else:
                        _mod = mod

                    if last_res_as_par and i > 0:
                        # use last results as start values for pars
                        _pars = pars
                        for pname, par in pars.items():
                            _pars[pname].value = res[counter][pname][i-1]
                    else:
                        if isinstance(pars, (list, tuple)):
                            _pars = pars[k-1]
                        else:
                            _pars = pars

                    # get the actual y-data and -errors for plotting and
                    # fitting
                    y2plot = sequence_data[counter][i]
                    yerr2plot = sequence_data[counter + 'Err'][i]

                    # evaluate the select statement
                    if select == '':
                        # select all
                        sel = np.ones_like(y2plot, dtype=bool)
                    else:
                        sel = eval(select)

                    # execute the select statement
                    y2plot = y2plot[sel]
                    x2plot = x2plot[sel]
                    yerr2plot = yerr2plot[sel]
                    xerr2plot = xerr2plot[sel]

                    # remove nans
                    mask = ~np.isnan(y2plot)
                    y2plot = y2plot[mask]
                    x2plot = x2plot[mask]
                    yerr2plot = yerr2plot[mask]
                    xerr2plot = xerr2plot[mask]

                    # do the fitting with or without weighting the data
                    if weights:
                        out = _mod.fit(y2plot, _pars, x=x2plot,
                                       weights=1/yerr2plot,
                                       method=fit_method,
                                       nan_policy='propagate')
                    else:
                        out = _mod.fit(y2plot, _pars, x=x2plot,
                                       method=fit_method,
                                       nan_policy='propagate')

                    if fit_report > 0:
                        # for basic and full fit reporting
                        print('')
                        print('-'*10 + ' ' + counter + ': ' + '-'*15)
                        for key in out.best_values:
                            print('{:>12}: {:>10.4e} '.format(
                                key, out.best_values[key]))

                    # set the x-offset for delay scans - the offset
                    # parameter in the fit must be called 't0'
                    if offset_t0:
                        offsetX = out.best_values['t0']
                    else:
                        offsetX = 0

                    plt.figure(main_fig_num)  # select the main figure

                    if plot_separate:
                        # use subplot for separate plotting
                        plt.subplot(
                            (num_sub_plots+num_sub_plots % 2)//2, 2, k)

                    # plot the fit and the data as errorbars
                    x2plotFit = np.linspace(
                        np.min(x2plot), np.max(x2plot), 10000)
                    plot = plt.plot(x2plotFit-offsetX,
                                    out.eval(x=x2plotFit),
                                    '-', lw=2, alpha=1)
                    plt.errorbar(x2plot-offsetX, y2plot, fmt=fmt,
                                 xerr=xerr2plot, yerr=yerr2plot, label=_lt,
                                 alpha=0.25, color=plot[0].get_color())

                    if len(parameters) > 5:
                        # move the legend outside the plot for more than
                        # 5 sequence parameters
                        plt.legend(bbox_to_anchor=(0., 1.08, 1, .102),
                                   frameon=True, loc=3, numpoints=1,
                                   ncol=3, mode="expand", borderaxespad=0.)
                    else:
                        plt.legend(frameon=True, loc=0, numpoints=1)

                    # set the axis limits, title, labels and grid
                    if xlims:
                        plt.xlim(xlims)
                    if ylims:
                        plt.ylim(ylims)
                    if len(title_text) > 0:
                        if isinstance(title_text, (list, tuple)):
                            plt.title(title_text[k-1])
                        else:
                            plt.title(title_text)
                    else:
                        plt.title(name)

                    if len(xtext) > 0:
                        plt.xlabel(xtext)
                    if len(ytext) > 0:
                        if isinstance(ytext, (list, tuple)):
                            plt.ylabel(ytext[k-1])
                        else:
                            plt.ylabel(ytext)
                    if grid_on:
                        plt.grid(True)

                    # show the single fits and residuals
                    if show_single:
                        plt.figure(main_fig_num+l_plot, figsize=fig_size)
                        gs = mpl.gridspec.GridSpec(
                            2, 1, height_ratios=[1, 3], hspace=0.1)
                        ax1 = plt.subplot(gs[0])
                        markerline, stemlines, baseline = plt.stem(
                            x2plot-offsetX, out.residual, markerfmt=' ',
                            use_line_collection=True)
                        plt.setp(stemlines, 'color',
                                 plot[0].get_color(), 'linewidth', 2,
                                 alpha=0.5)
                        plt.setp(baseline, 'color', 'k', 'linewidth', 0)

                        ax1.xaxis.tick_top()
                        ax1.yaxis.set_major_locator(plt.MaxNLocator(3))
                        plt.ylabel('Residuals')
                        if xlims:
                            plt.xlim(xlims)
                        if ylims:
                            plt.ylim(ylims)
                        if len(xtext) > 0:
                            plt.xlabel(xtext)
                        if grid_on:
                            plt.grid(True)
                        if len(title_text) > 0:
                            if isinstance(title_text, (list, tuple)):
                                plt.title(title_text[k-1])
                            else:
                                plt.title(title_text)
                        else:
                            plt.title(name)

                        ax2 = plt.subplot(gs[1])
                        x2plotFit = np.linspace(
                            np.min(x2plot), np.max(x2plot), 1000)
                        ax2.plot(x2plotFit-offsetX, out.eval(x=x2plotFit),
                                 '-', lw=2, alpha=1,
                                 color=plot[0].get_color())
                        ax2.errorbar(x2plot-offsetX, y2plot, fmt=fmt,
                                     xerr=xerr2plot, yerr=yerr2plot,
                                     label=_lt, alpha=0.25,
                                     color=plot[0].get_color())
                        plt.legend(frameon=True, loc=0, numpoints=1)
                        if xlims:
                            plt.xlim(xlims)
                        if ylims:
                            plt.ylim(ylims)
                        if len(xtext) > 0:
                            plt.xlabel(xtext)
                        if len(ytext) > 0:
                            if isinstance(ytext, (list, tuple)):
                                plt.ylabel(ytext[k-1])
                            else:
                                plt.ylabel(ytext)
                        if grid_on:
                            plt.grid(True)
                        l_plot += 1

                    if fit_report > 1:
                        # for full fit reporting
                        print('_'*40)
                        print(out.fit_report())

                    # add the fit results to the returns
                    for pname, par in _pars.items():
                        res[counter][pname] = np.append(
                            res[counter][pname], out.best_values[pname])
                        res[counter][pname + 'Err'] = np.append(
                            res[counter][pname + 'Err'],
                            out.params[pname].stderr)

                    res[counter]['chisqr'] = np.append(
                        res[counter]['chisqr'], out.chisqr)
                    res[counter]['redchi'] = np.append(
                        res[counter]['redchi'], out.redchi)
                    res[counter]['CoM'] = np.append(
                        res[counter]['CoM'],
                        sum(y2plot*x2plot)/sum(y2plot))
                    res[counter]['int'] = np.append(
                        res[counter]['int'], sum(y2plot))
                    res[counter]['fit'] = np.append(res[counter]['fit'],
                                                    out)

                    k += 1

                j += 1

        plt.figure(main_fig_num)  # set as active figure

        return res, parameters, sequence_data
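
    # A minimal batch-fitting sketch with ``lmfit`` (model choice and
    # start parameters are illustrative only):
    #
    #     >>> from lmfit.models import GaussianModel
    #     >>> mod = GaussianModel()
    #     >>> pars = mod.make_params(amplitude=1, center=0, sigma=0.1)
    #     >>> res, params, data = ev.fit_scan_sequence(
    #     ...     seq, mod, pars, sequence_type='fluence', fit_report=1)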
# move to the end for plotting
    def get_last_fig_number(self):
        """get_last_fig_number

        Return the last figure number of all opened figures for plotting
        data in the same figure during for-loops.

        Returns:
            fig_number (int): last figure number of all opened figures.

        """
        try:
            # get the number of all opened figures
            fig_number = mpl._pylab_helpers.Gcf.get_active().num
        except Exception:
            # there are no figures open
            fig_number = 1
        return fig_number
    def get_next_fig_number(self):
        """get_next_fig_number

        Return the number of the next available figure.

        Returns:
            next_fig_number (int): next figure number of all opened figures.

        """
        return self.get_last_fig_number() + 1