#!/usr/bin/env python
# -*- coding: utf-8 -*-
# The MIT License (MIT)
# Copyright (c) 2015-2021 Daniel Schick
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.
from .. import config
import logging
from .scan import Scan
import os.path as path
from numpy.core.records import fromarrays
import nexusformat.nexus as nxs
__all__ = ['Source']
__docformat__ = 'restructuredtext'
[docs]class Source(object):
"""Source
Class of default source implementation.
Args:
file_name (str): file name including extension,
can include regex pattern.
file_path (str, optional): file path - defaults to ``./``.
Keyword Args:
start_scan_number (uint): start of scan numbers to parse.
stop_scan_number (uint): stop of scan numbers to parse.
This number is included.
nexus_file_name (str): name for generated nexus file.
nexus_file_name_postfix (str): postfix for nexus file name.
nexus_file_path (str): path for generated nexus file.
read_all_data (bool): read all data on parsing.
If false, data will be read only on demand.
read_and_forget (bool): clear data after read to save memory.
update_before_read (bool): always update from source
before reading scan data.
use_nexus (bool): use nexus file to join/compress raw data.
force_overwrite (bool): forced re-read of raw source and
re-generated of nexus file.
Attributes:
log (logging.logger): logger instance from logging.
name (str): name of the source
scan_dict (dict(scan)): dict of scan objects with
key being the scan number.
start_scan_number (uint): start of scan numbers to parse.
stop_scan_number (uint): stop of scan numbers to parse.
This number is included.
file_name (str): file name including extension,
can include regex pattern.
file_path (str, optional): file path - defaults to ``./``.
nexus_file_name (str): name for generated nexus file.
nexus_file_name_postfix (str): postfix for nexus file name.
nexus_file_path (str): path for generated nexus file.
nexus_file_exists(bool): if nexus file exists.
read_all_data (bool): read all data on parsing.
read_and_forget (bool): clear data after read to save memory.
update_before_read (bool): always update from source
before reading scan data.
use_nexus (bool): use nexus file to join/compress raw data.
force_overwrite (bool): forced re-read of raw source and
re-generated of nexus file.
"""
def __init__(self, file_name, file_path='./', **kwargs):
self.log = logging.getLogger(__name__)
self.log.setLevel(config.LOG_LEVEL)
self.name = file_name
self.scan_dict = {}
self._start_scan_number = 0
self._stop_scan_number = -1
self.start_scan_number = kwargs.get('start_scan_number', 0)
self.stop_scan_number = kwargs.get('stop_scan_number', -1)
self.file_name = file_name
self.file_path = file_path
self.nexus_file_name_postfix = kwargs.get('nexus_file_name_postfix',
'.pyevaldata')
self.nexus_file_name = kwargs.get('nexus_file_name', self.file_name)
self.nexus_file_path = kwargs.get('nexus_file_path', self.file_path)
self.check_nexus_file_exists()
self.read_all_data = kwargs.get('read_all_data', False)
self.read_and_forget = kwargs.get('read_and_forget', False)
self.update_before_read = kwargs.get('update_before_read', False)
self.use_nexus = kwargs.get('use_nexus', True)
self.force_overwrite = kwargs.get('force_overwrite', False)
# update from the source
self.update()
def __getattr__(self, attr):
"""__getattr__
Allows to access scans as source attributes.
Returns:
scan (Scan): scan object.
"""
if attr.startswith("scan"):
index = attr[4:]
try:
scan_number = int(index)
except ValueError:
raise ValueError('Scan number must be convertable to an integer!')
return self.get_scan(scan_number)
else:
raise AttributeError('\'{:s}\' has no attribute \'{:s}\''.format(__name__, attr))
def __len__(self):
"""Returns length of ``scan_dict``"""
return self.scan_dict.__len__()
[docs] def update(self, scan_number_list=[]):
"""update
update the ``scan_dict`` either from the raw source file/folder
or from the nexus file.
The optional ``scan_number_list`` runs the update only if required
for the included scan.
Attributes:
scan_number_list (list[int]): explicit list of scans
"""
if not isinstance(scan_number_list, list):
scan_number_list = [int(scan_number_list)]
last_scan_number = self.get_last_scan_number()
if (len(scan_number_list) == 0) \
or (last_scan_number in scan_number_list) \
or any(list(set(scan_number_list) - set(self.scan_dict.keys()))):
self.log.info('Update source')
if self.use_nexus:
self.log.debug('Updating from nexus')
# do not combine cases for better flow control
if not self.nexus_file_exists:
self.log.debug('nexus file does not exist')
self.parse_raw()
self.save_all_scans_to_nexus()
elif self.update_before_read:
self.log.debug('Update before read')
self.parse_raw()
self.save_all_scans_to_nexus()
elif self.force_overwrite:
self.log.debug('Force overwrite')
self.parse_raw()
self.save_all_scans_to_nexus()
else:
self.parse_nexus()
else:
self.log.debug('Updating from raw source')
self.parse_raw()
else:
self.log.debug('Skipping update for scans {:s} '
'which are already present in '
'scan_dict.'.format(str(scan_number_list)))
[docs] def parse_raw(self):
"""parse_raw
Parse the raw source file/folder and populate the `scan_dict`.
"""
raise NotImplementedError('Needs to be implemented!')
[docs] def parse_nexus(self):
"""parse_nexus
Parse the nexus file and populate the `scan_dict`.
"""
self.log.info('parse_nexus')
nxs_file_path = path.join(self.nexus_file_path, self.nexus_file_name)
try:
nxs_file = nxs.nxload(nxs_file_path, mode='r')
except nxs.NeXusError:
raise nxs.NeXusError('NeXus file \'{:s}\' does not exist!'.format(nxs_file_path))
with nxs_file.nxfile:
for entry in nxs_file:
# check for scan number in given range
if (nxs_file[entry].number >= self.start_scan_number) and \
((nxs_file[entry].number <= self.stop_scan_number) or
(self.stop_scan_number == -1)):
last_scan_number = self.get_last_scan_number()
# check if Scan needs to be re-created
# if scan is not present, its the last one, or force overwrite
if (nxs_file[entry].number not in self.scan_dict.keys()) or \
(nxs_file[entry].number >= last_scan_number) or \
self.force_overwrite:
# create scan object
init_mopo = {}
for field in nxs_file[entry].init_mopo:
init_mopo[field] = nxs_file[entry]['init_mopo'][field]
scan = Scan(int(nxs_file[entry].number),
cmd=nxs_file[entry].cmd,
date=nxs_file[entry].date,
time=nxs_file[entry].time,
int_time=float(nxs_file[entry].int_time),
header=nxs_file[entry].header,
init_mopo=init_mopo)
self.scan_dict[nxs_file[entry].number] = scan
# check if the data needs to be read as well
if self.read_all_data:
self.read_scan_data(self.scan_dict[nxs_file[entry].number])
[docs] def check_nexus_file_exists(self):
"""check_nexus_file_exists
Check if the nexus file is present and set `self.nexus_file_exists`.
"""
if path.exists(path.join(self.nexus_file_path, self.nexus_file_name)):
self.nexus_file_exists = True
else:
self.nexus_file_exists = False
[docs] def get_last_scan_number(self):
"""get_last_scan_number
Return the number of the last scan in the `scan_dict`.
If the `scan_dict` is empty return 0.
"""
try:
return sorted(self.scan_dict.keys())[-1]
except IndexError:
return 0
[docs] def get_all_scan_numbers(self):
"""get_all_scan_numbers
Return the all scan number from the `scan_dict`.
"""
try:
return sorted(self.scan_dict.keys())
except IndexError:
return 0
[docs] def get_scan(self, scan_number, read_data=True, dismiss_update=False):
"""get_scan
Returns a scan object from the scan dict determined by the scan_number.
Args:
scan_number (uint): number of the scan.
read_data (bool, optional): read data from source.
Defaults to `False`.
dismiss_update (bool, optional): Dismiss update even if set as
object attribute. Defaults to `False`.
Returns:
scan (Scan): scan object.
"""
self.log.debug('get_scan')
if self.update_before_read and not dismiss_update:
self.update(scan_number)
try:
scan = self.scan_dict[scan_number]
except KeyError:
raise KeyError('Scan #{:d} not found in scan dict.'.format(scan_number))
if read_data:
self.read_scan_data(scan)
return scan
[docs] def get_scan_list(self, scan_number_list, read_data=True):
"""get_scan_list
Returns a list of scan object from the `scan_dict` determined by
the list of scan_number.
Args:
scan_number_list (list(uint)): list of numbers of the scan.
read_data (bool, optional): read data from source.
Defaults to `False`.
Returns:
scans (list(Scan)): list of scan object.
"""
self.log.debug('get_scan_list')
if self.update_before_read:
self.update(scan_number_list)
scans = []
for scan_number in scan_number_list:
scan = self.get_scan(scan_number, read_data, dismiss_update=True)
scans.append(scan)
return scans
[docs] def get_scan_data(self, scan_number):
"""get_scan_data
Returns data and meta information from a scan object from the `scan_dict`
determined by the scan_number.
Args:
scan_number (uint): number of the scan.
Returns:
data (numpy.recarray[float]): scan data.
meta (dict()): scan meta information.
"""
self.log.debug('get_scan_data')
scan = self.get_scan(scan_number)
if scan.data is not None:
data = scan.data.copy()
else:
data = None
meta = scan.meta.copy()
if self.read_and_forget:
scan.clear_data()
return data, meta
[docs] def get_scan_list_data(self, scan_number_list):
"""get_scan_list_data
Returns data and meta information for a list of scan objects from
the `scan_dict` determined by the scan_numbers.
Args:
scan_number_list (list(uint)): list of numbers of the scan.
Returns:
data (list(numpy.recarray[float])): list of scan data.
meta (list(dict())): list scan meta information.
"""
self.log.debug('get_scan_list_data')
data_list = []
meta_list = []
for scan in self.get_scan_list(scan_number_list):
data_list.append(scan.data.copy())
meta_list.append(scan.meta.copy())
if self.read_and_forget:
scan.clear_data()
return data_list, meta_list
[docs] def read_scan_data(self, scan):
"""read_scan_data
Reads the data for a given scan object.
Args:
scan (Scan): scan object.
"""
self.log.debug('read_scan_data for scan #{:d}'.format(scan.number))
last_scan_number = self.get_last_scan_number()
if (scan.data is None) or \
(scan.number >= last_scan_number) or self.force_overwrite:
if self.use_nexus:
self.read_nexus_scan_data(scan)
else:
self.read_raw_scan_data(scan)
else:
self.log.debug('data not updated for scan #{:d}'.format(scan.number))
[docs] def read_raw_scan_data(self, scan):
"""read_raw_scan_data
Reads the data for a given scan object from raw source.
Args:
scan (Scan): scan object.
"""
raise NotImplementedError('Needs to be implemented!')
[docs] def read_nexus_scan_data(self, scan):
"""read_nexus_scan_data
Reads the data for a given scan object from the nexus file.
Args:
scan (Scan): scan object.
"""
self.log.debug('read_nexus_scan_data for scan #{:d}'.format(scan.number))
# try to open the file
nxs_file_path = path.join(self.nexus_file_path, self.nexus_file_name)
try:
nxs_file = nxs.nxload(nxs_file_path, mode='r')
except nxs.NeXusError:
raise nxs.NeXusError('NeXus file \'{:s}\' does not exist!'.format(nxs_file_path))
entry_name = 'entry{:d}'.format(scan.number)
# try to enter entry
try:
entry = nxs_file[entry_name]
except nxs.NeXusError:
self.log.exception('Entry #{:d} not present in NeXus file!'.format(scan.number))
return
# iterate through data fields
data_list = []
dtype_list = []
for field in entry.data:
data_list.append(entry.data[field])
dtype_list.append((field, entry.data[field].dtype, entry.data[field].shape))
if len(data_list) > 0:
scan.data = fromarrays(data_list, dtype=dtype_list)
else:
scan.data = None
[docs] def clear_scan_data(self, scan):
"""clear_scan_data
Clear the data for a given scan object.
Args:
scan (Scan): scan object.
"""
self.log.debug('clear_scan_data')
scan.clear_data()
[docs] def read_all_scan_data(self):
"""read_all_scan_data
Reads the data for all scan objects in the `scan_dict` from source.
"""
self.log.debug('read_all_scan_data')
for scan_number, scan in self.scan_dict.items():
self.read_scan_data(scan)
[docs] def clear_all_scan_data(self):
"""clear_all_scan_data
Clears the data for all scan objects in the `scan_dict`.
"""
self.log.debug('clear_all_scan_data')
for scan_number, scan in self.scan_dict.items():
self.clear_scan_data(scan)
[docs] def save_scan_to_nexus(self, scan, nxs_file=''):
"""save_scan_to_nexus
Saves a scan to the nexus file.
"""
if nxs_file == '':
nxs_file = self.get_nexus_file()
entry_name = 'entry{:d}'.format(scan.number)
# evaluate if we need to forget the data again
if scan.data is None:
clear_data = True
else:
clear_data = False
# read the raw data
self.read_raw_scan_data(scan)
self.log.info('save_scan_to_nexus for scan #{:d}'.format(scan.number))
with nxs_file.nxfile:
# if the entry already exists, it must be deleted in advance
try:
del nxs_file[entry_name]
except nxs.NeXusError:
pass
# (re-)create entry
entry = nxs_file[entry_name] = nxs.NXentry()
# iterate meta information
for key, value in scan.meta.items():
if key == 'init_mopo':
# create dedicated collection for initial motor positions
entry['init_mopo'] = nxs.NXcollection()
# iterate through initial motor positions
for mopo_key, mopo_value in scan.meta['init_mopo'].items():
entry.init_mopo[mopo_key] = nxs.NXfield(mopo_value)
else:
# add meta information as attribute to entry
entry.attrs[key] = value
# create dedicated collection for data
entry['data'] = nxs.NXcollection()
# check if there is any data present at all
if scan.data is not None:
# iterate data
for col in scan.data.dtype.names:
entry.data[col] = nxs.NXfield(scan.data[col])
# clear data of the scan if it was not present before
# or read and forget
if clear_data or self.read_and_forget:
scan.clear_data()
[docs] def save_all_scans_to_nexus(self):
"""save_all_scans_to_nexus
Saves all scan objects in the `scan_dict` to the nexus file.
"""
self.log.info('save_all_scans_to_nexus')
nxs_file = self.get_nexus_file()
try:
last_scan_in_nexus = sorted(int(num.strip('entry')) for num in nxs_file.keys())[-1]
except IndexError:
last_scan_in_nexus = -1
for scan_number, scan in self.scan_dict.items():
entry_name = 'entry{:d}'.format(scan.number)
try:
_ = nxs_file[entry_name]
scan_in_nexus = True
except (KeyError, nxs.NeXusError):
scan_in_nexus = False
if (not scan_in_nexus) or (scan.number >= last_scan_in_nexus) \
or self.force_overwrite:
self.save_scan_to_nexus(scan, nxs_file)
[docs] def get_nexus_file(self, mode='rw'):
"""get_nexus_file
Return the file handle to the NeXus file in a given ``mode```.
Args:
mode (str, optional): file mode. defaults to 'rw'.
Returns:
nxs_file (NXFile): file handle to NeXus file.
"""
self.log.debug('get_nexus_file')
try:
nxs_file = nxs.nxload(path.join(self.nexus_file_path, self.nexus_file_name), mode='rw')
except nxs.NeXusError:
nxs.NXroot().save(path.join(self.nexus_file_path, self.nexus_file_name))
nxs_file = nxs.nxload(path.join(self.nexus_file_path, self.nexus_file_name), mode='rw')
return nxs_file
@property
def nexus_file_name(self):
return self._nexus_file_name
@nexus_file_name.setter
def nexus_file_name(self, nexus_file_name):
self._nexus_file_name = nexus_file_name + self.nexus_file_name_postfix + '.nxs'
@property
def start_scan_number(self):
return self._start_scan_number
@start_scan_number.setter
def start_scan_number(self, start_scan_number):
if start_scan_number < 0:
self.log.warning('start_scan_number must not be negative!')
return
elif (start_scan_number > self.stop_scan_number) and (self.stop_scan_number > -1):
self.log.warning('start_scan_number must be <= stop_scan_number!')
return
else:
self._start_scan_number = start_scan_number
@property
def stop_scan_number(self):
return self._stop_scan_number
@stop_scan_number.setter
def stop_scan_number(self, stop_scan_number):
if stop_scan_number < -1:
self.log.warning('stop_scan_number cannot be smaller than -1!')
return
elif (stop_scan_number < self.start_scan_number) and (stop_scan_number > -1):
self.log.warning('stop_scan_number must be >= start_scan_number!')
return
else:
self._stop_scan_number = stop_scan_number