Commit b362397d authored by Sylvain Guieu's avatar Sylvain Guieu

first commit of hrm tools

parents
import os
config_path = []
## by default look in the share directory
__module_dir__, _ = os.path.split(__file__)
__package_dir__, __pkg__ = os.path.split(__module_dir__)
config_path.append(os.path.join(__package_dir__, "share", __pkg__, "config"))
# also include the module/config directory in case it was not installed py setup.py
config_path.append(os.path.join(__module_dir__, "config"))
## Loockup table that give information on device properties
## So far this is just a loockup for column names in xls sheet
## DO NOT edit so far it is used to transform xls column name to python compatible names
##
level2:
xls_col_name: Level 2
ftype: 'str_choice'
level3:
xls_col_name: Level 3
ftype: 'str_choice'
unique_letter:
xls_col_name: Unique letter
ftype: 'str_choice'
elec_name:
xls_col_name: Elec name in IF map
ftype: 'str_choice'
id:
xls_col_name: 'Unique name (ID), max 10 characters'
other_id:
xls_col_name: Other proposal
pbs_code:
xls_col_name: PBS code
ftype: 'str'
device_name:
xls_col_name: Device name
also_called:
xls_col_name: "Also called\u2026"
device_type:
xls_col_name: HW device type
ftype: 'str_choice'
quantity:
xls_col_name: '#'
ftype: 'int'
quantity_comment:
xls_col_name: Comment (nb of devices)
motor_type:
xls_col_name: Motor type (if motor)
ftype: 'str_choice'
use_case:
xls_col_name: Use case
part:
xls_col_name: Part
ftype: 'str_choice'
operations_per_day:
xls_col_name: Operations per day
interlock:
xls_col_name: Interlock
external_sync:
xls_col_name: External sync
signal_frame_rate:
xls_col_name: Signal/frame rate (Hz)
thermal_domain:
xls_col_name: Thermal domain
rot_domain:
xls_col_name: Rot. domain
range:
xls_col_name: Range
accuracy:
xls_col_name: Accuracy
repeatability:
xls_col_name: Precision or repeatability
plc:
xls_col_name: PLC
ftype: 'str_choice'
physical_network:
xls_col_name: Physical network
ftype: 'str_choice'
wp_responsible:
xls_col_name: WP responsible for low-level SW control
ftype: 'str_choice'
additional_analog_sensor:
xls_col_name: Additional analog sensors
additional_analog_sensor_quantity:
xls_col_name: '#2'
additional_analog_sensor_quantity_comment:
xls_col_name: Comments (#)
controller:
xls_col_name: Controller
fcs_detail:
xls_col_name: Detail on FCS device if any
fcs_device_quantity:
xls_col_name: Nb of SW FCS devices
fits_category:
xls_col_name: FITS category
fits_kw:
xls_col_name: FITS KW
fits_name:
xls_col_name: FITS NAME. Prefix the name with a 4 letter mnemonic followed
by some optical characteristics (max 30 characters)
fits_sub_system:
xls_col_name: FITS sub-system
model_comment:
xls_col_name: Comments (model)
pbs_code:
xls_col_name: PBS code
power:
xls_col_name: Power
sensor_grouping:
xls_col_name: Sensor grouping
sw_control_master:
xls_col_name: SW control master
sw_device:
xls_col_name: SW device on instrument SW (WP31)
sw_supervision:
xls_col_name: Supervision SW
import re
filter_loockup = {}
class Filter:
def __init__(self, data):
self.data = data
def check(self, value):
raise NotImplementedError('check')
class LEFilter(Filter):
symbol = '<='
def check(self, value):
try:
v = float(value)
except (ValueError, TypeError):
return False
return v <= self.data
filter_loockup['<='] = LEFilter
class GEFilter(Filter):
symbol = '>='
def check(self, value):
try:
v = float(value)
except (ValueError, TypeError):
return False
return v >= self.data
filter_loockup['>='] = GEFilter
class LTFilter(Filter):
symbol = '<'
def check(self, value):
try:
v = float(value)
except (ValueError, TypeError):
return False
return v < self.data
filter_loockup['<'] = LTFilter
class GTFilter(Filter):
symbol = '>'
def check(self, value):
try:
v = float(value)
except (ValueError, TypeError):
return False
return v > self.data
filter_loockup['>'] = GTFilter
class EQFilter(Filter):
symbol = '=='
def check(self, value):
if isinstance(self.data, (float,int)):
try:
v = float(value)
except (ValueError, TypeError):
return v == self.data
return v == self.data
else:
return value == self.data
filter_loockup['=='] = EQFilter
class SMFilter(Filter):
symbol = '~'
def __init__(self, data, treshold=80):
"""
Args
----
data: string to compare with
treshold: percentil keeper of string match
"""
self.data = data
self.treshold = treshold
def check(self, value):
return similarity(self.data, value)>=self.treshold
filter_loockup['~'] = SMFilter
class REFilter(Filter):
symbol = '%'
def __init__(self, re_exp):
if isinstance(re_exp, re.Pattern):
self.data = re_exp
else:
self.data = re.compile('.*'+re_exp, re.IGNORECASE)
def check(self, value):
try:
return self.data.match(value) is not None
except TypeError:
return False
filter_loockup['%'] = REFilter
# ##############################################################
#
# Some functions
#
# ##############################################################
def _longest_common_substring(s1, s2):
m = [[0] * (1 + len(s2)) for i in range(1 + len(s1))]
longest, x_longest = 0, 0
for x in range(1, 1 + len(s1)):
for y in range(1, 1 + len(s2)):
if s1[x - 1] == s2[y - 1]:
m[x][y] = m[x - 1][y - 1] + 1
if m[x][y] > longest:
longest = m[x][y]
x_longest = x
else:
m[x][y] = 0
return s1[x_longest - longest: x_longest]
def similarity(s1, s2):
return 2. * len(_longest_common_substring(s1, s2)) / (len(s1) + len(s2)) * 100
from ..db import filters
from . import keys as K
def filter_device(devices, field, filter):
if hasattr(filter, "check"):
checker = filter.check
else:
if hasattr(filter, "__call__"):
checker = filter
else:
try:
s, data = filter
except (ValueError, TypeError):
raise ValueError('filter must be a 2 tuple, a function or a Filter object got a %s'%type(filter))
try:
FilterClass = filters.filter_loockup[s]
except KeyError:
raise ValueError('Unknown filter %r'%s)
else:
checker = FilterClass(data).check
if isinstance(field, (int,str)):
if field == "*":
try:
first_key = next(iter(devices))
except StopIteration: # empty dictionary
return {}
else:
fields = [k for k,d in devices[first_key].items() if isinstance(d, dict)]
return _filter_fields(devices, fields, checker)
else:
return _filter_one_field(devices, field, checker)
else:
return _filter_fields(devices, field, checker)
def _filter_one_field(devices, field, checker):
return {k:d for k,d in devices.items() if checker(d[field][K.VALUE])}
def _filter_fields(devices, fields, checker):
return {k:d for k,d in devices.items() if any( checker(d[field][K.VALUE]) for field in fields ) }
""" Provide classes to handle device list 'database' for friendly inline manipulation
"""
from . import api
from . import ioxls
from . import keys as K
class _FieldsHandler:
def fields(self):
""" iter on fields
fields can be interpreted as column name
each oteration return a Field object
"""
for field_dict in self._fields.values():
yield Field(field_dict)
def field_keys(self):
""" iter on field keys
fields can be interpreted as column name
each oteration return a str key value
"""
for field_name in self._fields.keys():
yield field_name
def field_items(self):
""" iter on field keys,Field object pairs
fields can be interpreted as column name
each oteration return a key, Field object pair
"""
for field_name, field_dict in self._fields.items():
yield field_name, Field(field_dict)
def get_field(self, field_name):
""" return a field object of the given field """
return Field(self._fields[field_name])
class _DevicesHandler:
def devices(self):
""" iter on device
each oteration return a Device object
"""
for device_dict in self._data.values():
yield Device(device_dict, self._fields)
def device_keys(self):
""" iter on device keys
each oteration return a str device key (device unique id)
"""
for device_name in self._data.keys():
yield device_name
def device_items(self):
""" iter on device
each oteration return a key, Device object pair
"""
for device_name,device_dict in self._data.items():
yield device_name, Device(device_dict, self._fields)
def get_device(self, device_name):
try:
device_dict = self._data[device_name]
except KeyError:
raise ValueError('device with name %r does not exists'%device_name)
else:
return Device(device_dict, self._fields)
def filter_device(self, field, filter):
devices_dict = api.filter_device(self._data, field, filter)
return Devices(devices_dict, self._fields)
def filter_device_or(self, *args):
if len(args)%2:
raise ValueError('expecting a even number of argument got %d'%len(args))
data = {}
for field, filter in zip(args[::2], args[1::2]):
data.update( api.filter_device(self._data, field, filter) )
return Devices(data, self._fields)
def filter_device_and(self, *args):
if len(args)%2:
raise ValueError('expecting a even number of argument got %d'%len(args))
data = self._data
for field, filter in zip(args[::2], args[1::2]):
data = api.filter_device(data, field, filter)
return Devices(data, self._fields)
class Devices(_FieldsHandler, _DevicesHandler):
""" Device List object handler
The object must be initialised witj the class method `.from_xls` from a xls device list file
>>> devices = Devices.from_xls('/path/to/HRM-00485_HCS_deviceList-3.xlsx')
Methods
-------
devices: iterator on Device objects
device_keys: iterator on device unique names (id)
device_items: iterator on 'device unique names'/'Device object' pair
get_device(device_name): return Device object of given name
fields: iterator on Field objects
field_keys: iterator on field unique names (id)
field_items: iterator on 'field unique names'/'Field object' pair
get_field(field_name): return Field object of given name
"""
_data = None
_fields = None
def __init__(self, data_dict, fields_dict):
self._data = data_dict
self._fields = fields_dict
def __getattr__(self, attr):
try:
return object.__getattribute__(self, attr)
except AttributeError:
pass
try:
return self.__dict__[attr]
except KeyError:
try:
return self.get_device(attr)
except ValueError:
raise AttributeError('%r is not an attribute of Devices neither a device name'%attr)
def __getitem__(self, item):
try:
return self.get_device(item)
except ValueError:
raise KeyError('%r'%item)
def __iter__(self):
for device in self.devices():
yield device
def __dir__(self):
return self.device_keys()
def __len__(self):
return len(self._data)
def keys(self):
return self.device_keys()
@classmethod
def from_xls(cls, file_path, version_cell=None, rules=None):
data_dict, fields_dist = ioxls.open_device_list(file_path, version_cell=version_cell, rules=rules)
return cls(data_dict, fields_dist)
class Field:
def __init__(self, field_dict):
self._field = field_dict
def parse(self, value):
""" parse a new value for this field
WARNING: some field property can be changed (e.g. 'choices' or 'min', 'max')
Use only this method just before adding a new value in the data.
"""
value = self._field[K.PARSER](self._field, value)
return value
@property
def name(self):
return self._field[K.NAME]
@property
def choices(self):
return self._field[K.CHOICES]
def __getattr__(self, attr):
try:
return self.__dict__[attr]
except KeyError:
try:
value = self._field[attr]
except KeyError:
raise AttributeError('%r is not an attribute of Field neither a property name'%attr)
else:
return value
def __getitem__(self, item):
try:
value = self._field[item]
except KeyError:
raise KeyError('%r'%item)
else:
return value
def __dir__(self):
return self._field.keys()
class Device(_FieldsHandler):
_data = None
_fields = None
def __init__(self, device_dict, fields_dict):
self._data = device_dict
self._fields = fields_dict
def keys(self):
return self.property_keys()
def properties(self):
""" iter on value properties
each iteration return a DeviceProperty object
"""
for field_name in self.field_keys():
yield DeviceProperty(self._data[field_name])
def property_keys(self):
""" iter on value property keys
each iteration return a str key
"""
for field_name in self.field_keys():
yield field_name
def property_items(self):
""" iter on value property keys
each iteration return a str key / DeviceProperty object pair
"""
for field_name in self.field_keys():
yield field_name, DeviceProperty(self._data[field_name])
def get_property(self, field_name):
try:
property_dict = self._data[field_name]
except KeyError:
raise ValueError('%r is not a property of device')
return DeviceProperty(property_dict)
def __getattr__(self, attr):
try:
return self.__dict__[attr]
except KeyError:
try:
deviceProperty = self.get_property(attr)
except ValueError:
raise AttributeError('%r is not an attribute of Device neither a property name'%attr)
else:
return deviceProperty.value
def __getitem__(self, item):
try:
deviceProperty = self.get_property(item)
except ValueError:
raise KeyError('%r'%item)
else:
return deviceProperty.value
def __repr__(self):
txt = []
for property in self.properties():
txt.append(repr(property))
return "\n".join(txt)
def __dir__(self):
return self._fields.keys()
class DeviceProperty:
def __init__(self, property_dict):
self._data = property_dict
@property
def field(self):
return Field(self._data[K.FIELD])
@property
def value(self):
return self._data[K.VALUE]
@property
def cell(self):
return self._data[K.CELL]
def __repr__(self):
return f"{self.field.name}: {self.value}"
def __dir__(self):
return self._data[K.FIELD].keys()
\ No newline at end of file
from . import keys as K
from ..io import ioconfig
#### ################################
#
# Function to load the devices yaml files.
# So far all data information are found inside the xls sheet however to work
# correctly more information about data fields are inside yaml file found in
# the config directory.
# The yaml files define the correspondance between xls column names and the more
# python compatible name of data fields as well as the type of data found in each
# columns.
#
#### ################################
###
# define some parser for the field types
FTYPE = 'ftype'
STR_CHOICE = 'str_choice'
INT = 'int'
STR = 'str'
_ftype_loockup = {}
def add_ftype(cls):
_ftype_loockup[cls.ftype] = cls
def get_ftype(ftype):
return _ftype_loockup[ftype]
class FType:
ftype = ''
def init_field(self, name, d):
# just a copy
field = dict(d)
field[K.PARSER] = self.parse_value
field.setdefault(K.CHOICES, None)
field[K.NAME] = name
return field
def parse_value(self, field, value):
return value
##
# a str_choice is a choice of string.
# the set of choices is expended when new data arrive inside the 'choices' keyword
# of the field property
class StrChoice(FType):
ftype = STR_CHOICE
def init_field(self, name, d):
field = FType.init_field(self, name, d)
field[K.CHOICES] = set()
field[K.PARSER] = self.parse_value
return field
def parse_value(self, field, value):
value = str(value)
field[K.CHOICES].add(value)
return value
add_ftype(StrChoice)
##
# juste parse as str
class Str(FType):
ftype = STR
def init_field(self, name, d):
field = FType.init_field(self, name, d)
field[K.PARSER] = self.parse_value
return field
def parse_value(self, field, value):
return str(value)
add_ftype(Str)
##
#