Source code for pyEvalData.io.sardana_nexus

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# The MIT License (MIT)
# Copyright (c) 2015-2021 Daniel Schick
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.


from numpy.core.records import fromarrays
import nexusformat.nexus as nxs
import os

from .source import Source
from .scan import Scan

__all__ = ['SardanaNeXus']

__docformat__ = 'restructuredtext'


[docs]class SardanaNeXus(Source): """SardanaNeXus Source implementation for Sardana NeXus files. Args: file_name (str): file name including extension, can include regex pattern. file_path (str, optional): file path - defaults to ``./``. Keyword Args: start_scan_number (uint): start of scan numbers to parse. stop_scan_number (uint): stop of scan numbers to parse. This number is included. nexus_file_name (str): name for generated nexus file. nexus_file_name_postfix (str): postfix for nexus file name. nexus_file_path (str): path for generated nexus file. read_all_data (bool): read all data on parsing. If false, data will be read only on demand. read_and_forget (bool): clear data after read to save memory. update_before_read (bool): always update from source before reading scan data. use_nexus (bool): use nexus file to join/compress raw data. force_overwrite (bool): forced re-read of raw source and re-generated of nexus file. Attributes: log (logging.logger): logger instance from logging. name (str): name of the source scan_dict (dict(scan)): dict of scan objects with key being the scan number. start_scan_number (uint): start of scan numbers to parse. stop_scan_number (uint): stop of scan numbers to parse. This number is included. file_name (str): file name including extension, can include regex pattern. file_path (str, optional): file path - defaults to ``./``. nexus_file_name (str): name for generated nexus file. nexus_file_name_postfix (str): postfix for nexus file name. nexus_file_path (str): path for generated nexus file. nexus_file_exists(bool): if nexus file exists. read_all_data (bool): read all data on parsing. read_and_forget (bool): clear data after read to save memory. update_before_read (bool): always update from source before reading scan data. use_nexus (bool): use nexus file to join/compress raw data. force_overwrite (bool): forced re-read of raw source and re-generated of nexus file. """ def __init__(self, file_name, file_path, **kwargs): super().__init__(file_name, file_path, **kwargs)
[docs] def parse_raw(self): """parse_raw Parse the Sardana NeXus file and populate the `scan_dict`. """ self.log.info('parse_raw') nxs_file_path = os.path.join(self.file_path, self.file_name) try: os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE" nxs_file = nxs.nxload(nxs_file_path, mode='r') except nxs.NeXusError: raise nxs.NeXusError('Sardana NeXus file \'{:s}\' does not exist!'.format( nxs_file_path)) with nxs_file.nxfile: for entry in nxs_file: # check for scan number in given range entry_number = int(nxs_file[entry].entry_identifier) if (entry_number >= self.start_scan_number) and \ ((entry_number <= self.stop_scan_number) or (self.stop_scan_number == -1)): last_scan_number = self.get_last_scan_number() # check if Scan needs to be re-created # if scan is not present, its the last one, or force overwrite if (entry_number not in self.scan_dict.keys()) or \ (entry_number >= last_scan_number) or \ self.force_overwrite: # create scan object init_mopo = {} for field in nxs_file[entry].measurement.pre_scan_snapshot: init_mopo[field] = \ nxs_file[entry]['measurement/pre_scan_snapshot'][field] scan = Scan(int(entry_number), cmd=nxs_file[entry].title, date=nxs_file[entry].start_time, time=nxs_file[entry].start_time, int_time=float(0), header='', init_mopo=init_mopo) self.scan_dict[entry_number] = scan # check if the data needs to be read as well if self.read_all_data: self.read_scan_data(self.scan_dict[entry_number])
[docs] def read_raw_scan_data(self, scan): """read_raw_scan_data Reads the data for a given scan object from Sardana NeXus file. Args: scan (Scan): scan object. """ self.log.info('read_raw_scan_data for scan #{:d}'.format(scan.number)) # try to open the file nxs_file_path = os.path.join(self.file_path, self.file_name) try: os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE" nxs_file = nxs.nxload(nxs_file_path, mode='r') except nxs.NeXusError: raise nxs.NeXusError('Sardana NeXus file \'{:s}\' does not exist!'.format( nxs_file_path)) entry_name = 'entry{:d}'.format(scan.number) # try to enter entry try: entry = nxs_file[entry_name] except nxs.NeXusError: self.log.exception('Entry #{:d} not present in NeXus file!'.format(scan.number)) return # iterate through data fields data_list = [] dtype_list = [] for field in entry.measurement: # do not add data which is already in the pre-scan snapshot # that is tricky if it is in the snapshot and scanned ... if field != 'pre_scan_snapshot': data_list.append(entry.measurement[field]) dtype_list.append((field, entry.measurement[field].dtype, entry.measurement[field].shape)) if len(data_list) > 0: scan.data = fromarrays(data_list, dtype=dtype_list) else: scan.date = None