From 40288f41f4c1f54ffff92876985c8dea26dd19ca Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Keck <jean-baptiste.keck@imag.fr> Date: Sat, 17 Feb 2018 22:48:22 +0100 Subject: [PATCH] caching utils --- CMakeLists.txt | 22 +-- hysop/backend/device/opencl/clpeak.py | 155 +++++++++++++----- hysop/backend/device/opencl/opencl_device.py | 40 ++++- .../backend/device/opencl/opencl_platform.py | 2 +- hysop/backend/hardware/hwinfo.py | 18 +- hysop/tools/cache.py | 105 ++++++++++++ hysop/tools/hysop_ls.py | 29 ++-- hysop/tools/units.py | 3 + 8 files changed, 297 insertions(+), 77 deletions(-) create mode 100644 hysop/tools/cache.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 265e6fec0..1ea74125e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -131,20 +131,20 @@ set(HYSOP_LINK_LIBRARIES CACHE INTERNAL "List of external libraries.") find_package(PythonFull REQUIRED) include(FindPythonModule) # - python packages - -find_python_module(numpy REQUIRED) -find_python_module(scipy REQUIRED) -find_python_module(scitools REQUIRED) -find_python_module(h5py REQUIRED) -find_python_module(sympy REQUIRED) -find_python_module(psutil REQUIRED) -find_python_module(cpuinfo REQUIRED) -find_python_module(gmpy2 REQUIRED) +find_python_module(numpy REQUIRED) +find_python_module(scipy REQUIRED) +find_python_module(scitools REQUIRED) +find_python_module(h5py REQUIRED) +find_python_module(sympy REQUIRED) +find_python_module(psutil REQUIRED) +find_python_module(cpuinfo REQUIRED) +find_python_module(gmpy2 REQUIRED) find_python_module(subprocess32 REQUIRED) find_python_module(editdistance REQUIRED) -find_python_module(graph_tool REQUIRED) - +find_python_module(portalocker REQUIRED) +find_python_module(graph_tool REQUIRED) # --- OpenCL --- -find_python_module(pyopencl REQUIRED) +find_python_module(pyopencl REQUIRED) # --- MPI --- if(USE_MPI) find_package(MPI REQUIRED) diff --git a/hysop/backend/device/opencl/clpeak.py b/hysop/backend/device/opencl/clpeak.py index 294ee7427..9f7aa9b7f 100644 --- a/hysop/backend/device/opencl/clpeak.py +++ b/hysop/backend/device/opencl/clpeak.py @@ -1,10 +1,13 @@ -import tempfile, re, os, warnings +import tempfile, re, os, warnings, uuid, gzip, portalocker import subprocess32 as subprocess from xml.dom import minidom +from hysop.deps import pickle from hysop.tools.decorators import requires_cmd -from hysop.tools.units import bdw2str, flops2str +from hysop.tools.units import bdw2str, flops2str, iops2str from hysop.tools.warnings import HysopWarning +from hysop.tools.io_utils import IO +from hysop.tools.cache import load_attributes_from_cache, update_cache class ClPeakInfo(object): __FNULL = open(os.devnull, 'w') @@ -32,35 +35,66 @@ class ClPeakInfo(object): 'yflops': 1e24 } + __int_results = ('int', 'int2', 'int4', 'int8', 'int16') __float_results = ('float', 'float2', 'float4', 'float8', 'float16') __double_results = ('double', 'double2', 'double4', 'double8', 'double16') + + __cached_datakeys = ('_preferred_exec_params', + '_transfer_bandwidth_values', '_global_bdw_values', + '_sp_compute_values', '_dp_compute_values', '_int_compute_values') + + def _cache_file(self): + return IO.cache_path() + '/hardware/clpeak.pklz' + + def _load_from_cache(self, key): + filepath = self._cache_file() + success = load_attributes_from_cache(filepath, key, self, self.__cached_datakeys) + return success + + def _update_cache_data(self, key): + filepath = self._cache_file() + cached_data = dict(zip(self.__cached_datakeys, (getattr(self, cdk) + for cdk in self.__cached_datakeys))) + update_cache(filepath, key, cached_data) def __init__(self, platform_name, device_name, - platform_id, device_id, is_cpu): + platform_id, device_id, is_cpu, + override_cache=False): self.platform_name = platform_name self.device_name = device_name self.platform_id = platform_id self.device_id = device_id self.is_cpu = is_cpu - - self._preferred_exec_params = None - self._exec_node_id = None - self._exec_cpu_id = None - self._transfer_bandwidth_values = None - self._optimal_transfer_bandwidth_values = None - self._gather_transfer_bandwidth_info(is_cpu) + _uuid = uuid.getnode(); + key = (_uuid, platform_name, device_name, platform_id, device_id, is_cpu) + + if override_cache: + success = False + else: + success = self._load_from_cache(key) + + if not success: + self._exec_node_id = None + self._exec_cpu_id = None + self._preferred_exec_params = None + + self._transfer_bandwidth_values = None + self._gather_transfer_bandwidth_info(is_cpu) - self._global_bdw_values = None - self._mean_global_bdw = None - self._gather_global_bandwidth_info() + self._global_bdw_values = None + self._gather_global_bandwidth_info() - self._sp_compute_values = None - self._dp_compute_values = None - self._max_sp_compute = None - self._max_dp_compute = None - self._gather_compute_info() + self._sp_compute_values = None + self._dp_compute_values = None + self._int_compute_values = None + self._gather_compute_info() + + self._update_cache_data(key) + + del self._exec_node_id + del self._exec_cpu_id @property def has_memory_bandwidth(self): @@ -74,6 +108,9 @@ class ClPeakInfo(object): @property def has_double_precision_compute(self): return (self._dp_compute_values is not None) + @property + def has_integer_compute(self): + return (self._int_compute_values is not None) @property def preferred_exec_params(self): @@ -83,13 +120,19 @@ class ClPeakInfo(object): return self._transfer_bandwidth_values @property def optimal_transfer_bandwidth_values(self): - return self._optimal_transfer_bandwidth_values + if (self._transfer_bandwidth_values is None): + return None + else: + return self._transfer_bandwidth_values[self._preferred_exec_params] @property def global_bdw_values(self): return self._global_bdw_values @property def mean_global_bdw(self): - return self._mean_global_bdw + if (self._global_bdw_values is None): + return None + else: + return sum(self._global_bdw_values.values()) / len(self._global_bdw_values) @property def sp_compute_values(self): return self._sp_compute_values @@ -97,11 +140,26 @@ class ClPeakInfo(object): def dp_compute_values(self): return self._dp_compute_values @property + def int_compute_values(self): + return self._int_compute_values + @property def max_sp_compute(self): - return self._max_sp_compute + if (self._sp_compute_values is None): + return None + else: + return max(self._sp_compute_values.values()) @property def max_dp_compute(self): - return self._max_dp_compute + if (self._dp_compute_values is None): + return None + else: + return max(self._dp_compute_values.values()) + @property + def max_int_compute(self): + if (self._int_compute_values is None): + return None + else: + return max(self._int_compute_values.values()) def __str__(self): ss = [] @@ -118,9 +176,11 @@ class ClPeakInfo(object): ss += ['Read memory bandwidth (mapped): {}'.format(bdw2str(mapped_read))] ss += ['Write memory bandwidth (mapped): {}'.format(bdw2str(mapped_write))] if self.has_single_precision_compute: - ss += ['Single precision compute: {}'.format(flops2str(self.max_sp_compute))] + ss += ['Single precision compute: {} (SP)'.format(flops2str(self.max_sp_compute))] if self.has_double_precision_compute: - ss += ['Double precision compute: {}'.format(flops2str(self.max_dp_compute))] + ss += ['Double precision compute: {} (DP)'.format(flops2str(self.max_dp_compute))] + if self.has_integer_compute: + ss += ['Integer compute: {}'.format(iops2str(self.max_int_compute))] return '\n'.join(ss) def _gather_transfer_bandwidth_info(self, is_cpu): @@ -186,12 +246,10 @@ class ClPeakInfo(object): self._set_exec_params(*params[0]) self._preferred_exec_params = params[0] self._transfer_bandwidth_values = transfer_bandwidth - self._optimal_transfer_bandwidth_values = transfer_bandwidth[params[0]] else: self._set_exec_params(None, None) self._transfer_bandwidth_values = None self._preferred_exec_params = (None, None) - self._optimal_transfer_bandwidth_values = None def _gather_global_bandwidth_info(self): def handle_results(dev, cmd): @@ -209,31 +267,33 @@ class ClPeakInfo(object): assert len(attr)==1 attr = attr[0] global_bdw_values[element] = coeff * float(attr.firstChild.nodeValue.strip()) - mean_global_bdw = sum(global_bdw_values.values()) / len(global_bdw_values) self._global_bdw_values = global_bdw_values - self._mean_global_bdw = mean_global_bdw except AssertionError as e: msg='Failed to parse output of command:\n {}\n{}' msg=msg.format(cmd, e) warnings.warn(msg, HysopWarning) self._global_bdw_values = None - self._mean_global_bdw = None self._exec_clpeak(test='--global-bandwidth', handle_results=handle_results) def _gather_compute_info(self): - def handle_results(sp): + def handle_results(ctype): def _handle_results(dev, cmd): try: assert (dev is not None) - if sp: + compute_values = {} + if ctype=='float': tag='single_precision_compute' - compute_values = {} result_tags = self.__float_results - else: + elif ctype=='double': tag='double_precision_compute' - compute_values = {} result_tags = self.__double_results + elif ctype=='int': + tag='integer_compute' + result_tags = self.__int_results + else: + msg='Unkown ctype \'{}\'.'.format(ctype) + raise NotImplementedError(msg) compute = dev.getElementsByTagName(tag) assert len(compute)==1 compute = compute[0] @@ -245,24 +305,33 @@ class ClPeakInfo(object): assert len(attr)==1 attr = attr[0] compute_values[tag] = coeff * float(attr.firstChild.nodeValue.strip()) - if sp: + if ctype=='float': self._sp_compute_values = compute_values - self._max_sp_compute = max(compute_values.values()) - else: + elif ctype=='double': self._dp_compute_values = compute_values - self._max_dp_compute = max(compute_values.values()) + elif ctype=='int': + self._int_compute_values = compute_values + else: + msg='Unkown ctype \'{}\'.'.format(ctype) + raise NotImplementedError(msg) except AssertionError as e: msg='Failed to parse output of command:\n {}\n{}' msg=msg.format(cmd, e) warnings.warn(msg, HysopWarning) - if sp: - self._max_sp_compute = None + if ctype=='float': + self._sp_compute_values = None + elif ctype=='double': + self._dp_compute_values = None + elif ctype=='int': + self._int_compute_values = None else: - self._max_dp_compute = None + msg='Unkown ctype \'{}\'.'.format(ctype) + raise NotImplementedError(msg) return _handle_results - self._exec_clpeak(test='--compute-sp', handle_results=handle_results(True)) - self._exec_clpeak(test='--compute-dp', handle_results=handle_results(False)) + self._exec_clpeak(test='--compute-sp', handle_results=handle_results(ctype='float')) + self._exec_clpeak(test='--compute-dp', handle_results=handle_results(ctype='double')) + self._exec_clpeak(test='--compute-integer', handle_results=handle_results(ctype='int')) def _set_exec_params(self, membind, cpubind): self._exec_node_id = membind diff --git a/hysop/backend/device/opencl/opencl_device.py b/hysop/backend/device/opencl/opencl_device.py index eb5718792..b358d77c6 100644 --- a/hysop/backend/device/opencl/opencl_device.py +++ b/hysop/backend/device/opencl/opencl_device.py @@ -1,4 +1,4 @@ -import re +import re, fractions from hysop import vprint from hysop.deps import np from hysop.backend.device.opencl import cl @@ -7,6 +7,7 @@ from hysop.tools.units import bytes2str, freq2str, time2str from hysop.backend.device.logical_device import LogicalDevice, UnknownDeviceAttribute from hysop.tools.string_utils import prepend from hysop.backend.device.opencl.clpeak import ClPeakInfo +from hysop.tools.units import bdw2str, flops2str, iops2str def opencl_version_atleast(major, minor, returned=UnknownDeviceAttribute()): def decorator(f): @@ -133,6 +134,8 @@ class OpenClDevice(LogicalDevice): 'me_version_intel', 'num_simultaneous_interops_intel', 'simultaneous_interops_intel', 'max_atomic_counters_ext', ) + + __all_attrs = __attrs + ('_simd_lane_size', '_usable_local_mem_size', '_real_name') def __init__(self, platform, platform_handle, device_id, device_handle, hardware_topo, **kwds): @@ -176,7 +179,7 @@ class OpenClDevice(LogicalDevice): device_name=self._real_name, device_id=self.device_id, is_cpu=(self.type()==DeviceType.CPU)) - print info + self._clpeak_info = info def _extract_cl_version(self, device_handle): version = device_handle.version @@ -245,8 +248,39 @@ bytes2str(self.max_global_alloc_size()), bytes2str(self.local_mem_size(), decimal=False), 'yes, up to {} subdevices'.format(self.max_subdevices()) if self.has_device_partition_support() else 'no', 'no match', -ind=ind, inc=inc) +ind=ind, inc=inc) + msg += '\n'+self.clpeak_info_summary(indent=indent, increment=increment) return msg + + + def clpeak_info_summary(self, indent, increment): + ind=' '*indent + inc=' '*increment + info = self._clpeak_info + ss = ['{}{}|CLPEAK BENCHMARK:'.format(ind,inc)] + if info.has_memory_bandwidth: + ss += ['{{s}}global memory bandwidth: {}'.format(bdw2str(info.mean_global_bdw))] + if info.has_transfer_bandwdith: + otbv = info.optimal_transfer_bandwidth_values + read = otbv['enqueuereadbuffer'] + write = otbv['enqueuewritebuffer'] + mapped_read = otbv['memcpy_from_mapped_ptr'] + mapped_write = otbv['memcpy_to_mapped_ptr'] + ss += ['{{s}}read memory bandwidth: {}'.format(bdw2str(read))] + ss += ['{{s}}write memory bandwidth: {}'.format(bdw2str(write))] + ss += ['{{s}}read memory bandwidth (mapped): {}'.format(bdw2str(mapped_read))] + ss += ['{{s}}write memory bandwidth (mapped): {}'.format(bdw2str(mapped_write))] + if info.has_integer_compute: + ss += ['{{s}}integer compute: {}'.format(iops2str(info.max_int_compute))] + if info.has_single_precision_compute: + ss += ['{{s}}single precision compute: {} (SP)'.format(flops2str(info.max_sp_compute))] + if info.has_double_precision_compute: + ss += ['{{s}}double precision compute: {} (DP)'.format(flops2str(info.max_dp_compute))] + if (info.has_single_precision_compute and info.has_double_precision_compute): + ratio = info.max_dp_compute/info.max_sp_compute + ratio = fractions.Fraction(ratio).limit_denominator(64) + ss += ['{{s}}Double to float compute ratio: {} (DP/SP)'.format(ratio)] + return '\n'.join(ss).format(s=(ind+inc+inc+' |-')) def device_summary(self, indent=0, increment=2): ind=' '*indent diff --git a/hysop/backend/device/opencl/opencl_platform.py b/hysop/backend/device/opencl/opencl_platform.py index 39a609f24..427ec63af 100644 --- a/hysop/backend/device/opencl/opencl_platform.py +++ b/hysop/backend/device/opencl/opencl_platform.py @@ -43,7 +43,7 @@ class OpenClPlatform(Platform): ind=' '*indent inc=' '*increment new_indent = indent + 2*increment - devices = ''.join(x.to_string(indent=new_indent, increment=increment, short=True) + devices = '\n'.join(x.to_string(indent=new_indent, increment=increment, short=True) for x in self.logical_devices.values()) sep = '\n{ind}{inc}{inc}{inc}|'.format(ind=ind, inc=inc) extensions = sep + sep.join(e.strip() for e in sorted(self._extensions.split(' ')) if e not in ('', ' ', '\t', '\n')) diff --git a/hysop/backend/hardware/hwinfo.py b/hysop/backend/hardware/hwinfo.py index bb410bc59..244a18b6d 100644 --- a/hysop/backend/hardware/hwinfo.py +++ b/hysop/backend/hardware/hwinfo.py @@ -1,4 +1,5 @@ +import uuid from xml.etree import cElementTree as ElementTree from abc import abstractmethod, ABCMeta @@ -11,6 +12,8 @@ from hysop.tools.decorators import requires_cmd from hysop.tools.contexts import printoptions from hysop.tools.string_utils import prepend, camel2snake from hysop.tools.units import bytes2str +from hysop.tools.io_utils import IO +from hysop.tools.cache import load_data_from_cache, update_cache from hysop.backend.hardware.pci_ids import PCIIds from hysop.core.mpi import is_multihost, interhost_comm, host_rank @@ -250,10 +253,17 @@ class Topology(TopologyObject): @classmethod @requires_cmd('lstopo') - def parse(cls, pciids = PCIIds()): - topology = subprocess.check_output(['lstopo', '-l', '-v', '--no-caches', '--cpuset', '--of', 'xml']) - topology = ElementTree.fromstring(topology) - return Topology(parent=None, topo=topology, pciids=pciids) + def parse(cls, pciids = PCIIds(), override_cache=False): + key = uuid.getnode() + filepath = IO.cache_path()+'/hardware/topologies.pklz' + topology = load_data_from_cache(filepath=filepath, key=key) + if (topology is None) or (not isinstance(topology, Topology)) \ + or override_cache: + topology = subprocess.check_output(['lstopo', '-l', '-v', '--no-caches', '--cpuset', '--of', 'xml']) + topology = ElementTree.fromstring(topology) + topology = Topology(parent=None, topo=topology, pciids=pciids) + update_cache(filepath, key, topology) + return topology def __init__(self, parent, topo, pciids): self._machine = None diff --git a/hysop/tools/cache.py b/hysop/tools/cache.py new file mode 100644 index 000000000..0c77b343a --- /dev/null +++ b/hysop/tools/cache.py @@ -0,0 +1,105 @@ + +try: + import cPickle as pickle +except: + import pickle + +import gzip, portalocker, contextlib, os, errno + +@contextlib.contextmanager +def lock_file(filepath, mode, compressed=True, + timeout=5, check_interval=0.25): + """ + Opens a locked file with specified mode, possibly compressed. + """ + _dir = os.path.dirname(filepath) + try: + os.makedirs(_dir) + except OSError as e: + if (e.errno != errno.EEXIST): + raise + if not os.path.exists(filepath): + open(filepath, 'a').close() + with portalocker.Lock(filename=filepath, timeout=timeout, mode=mode) as fl: + if compressed: + with gzip.GzipFile(fileobj=fl) as f: + yield f + else: + yield fl + +@contextlib.contextmanager +def read_only_lock(filepath, compressed=True, + timeout=5, check_interval=0.25): + """Opens a locked read only file, possibly compressed.""" + with lock_file(filepath=filepath, mode='r', compressed=compressed, + timeout=timeout, check_interval=check_interval) as f: + yield f + +@contextlib.contextmanager +def write_only_lock(filepath, compressed=True, + timeout=5, check_interval=0.25): + """Opens a locked write only file, possibly compressed.""" + with lock_file(filepath=filepath, mode='w', compressed=compressed, + timeout=timeout, check_interval=check_interval) as f: + yield f + + +def load_cache(filepath, match_type=dict, on_fail={}, **kwds): + """Load pickled data from filepath atomically.""" + data = on_fail + with read_only_lock(filepath, **kwds) as f: + try: + data = pickle.load(f) + if not isinstance(data, match_type): + raise pickle.UnpicklingError + except (EOFError, pickle.UnpicklingError, AttributeError): + data = on_fail + return data + +def update_cache(filepath, key, data, match_type=dict, on_fail={}, **kwds): + """ + Update cache entry in given file atomically with a (key,data) pair. + Cached data is a pickled dictionnary. + """ + cached_data = load_cache(filepath=filepath, match_type=match_type, + on_fail=on_fail, **kwds) + cached_data[key] = data + with write_only_lock(filepath=filepath, **kwds) as f: + pickle.dump(cached_data, f) + +def load_data_from_cache(filepath, key, match_type=dict, on_fail={}, **kwds): + """Load cached data from a given file atomically with given key.""" + data = load_cache(filepath=filepath, match_type=match_type, on_fail=on_fail) + if (key in data): + return data[key] + else: + return None + +def load_attributes_from_cache(filepath, key, instance, attrs, **kwds): + """ + Load cached entries from a given file atomically. + + Cached data is assumed to be a dictionnary. + If key is present in pickled data, try to get all + given attributes data by keys given in attrs. + Set instance attributes with those values. + + If one attribute is missing or key is not present + in loaded data, set all values to None in instance + and return False. + + Return True on success. + """ + success = False + data = load_cache(filepath=filepath, match_type=dict, **kwds) + if (key in data): + success = True + data = data[key] + for attr in attrs: + if (attr not in data.keys()): + for attr in attrs: + setattr(instance, attr, None) + success = False + break + setattr(instance, attr, data[attr]) + return success diff --git a/hysop/tools/hysop_ls.py b/hysop/tools/hysop_ls.py index 27d793b8d..ef24950ac 100755 --- a/hysop/tools/hysop_ls.py +++ b/hysop/tools/hysop_ls.py @@ -1,5 +1,5 @@ -import sys, os, argparse, tempfile +import sys, os, argparse, tempfile, warnings # default caching directory tmp = tempfile.gettempdir() @@ -8,7 +8,7 @@ _user_hysop = os.path.expanduser('~/.cache/hysop') if os.path.isdir(_user_hysop): default_cache_dir = '{}/topology'.format(_user_hysop) else: - default_cache_dir = '/{}/hysop/topology'.format(tmp) + default_cache_dir = '{}/hysop/topology'.format(tmp) class BackendMask(object): # utility class to store reported backends @@ -133,7 +133,8 @@ class DeviceMask(object): if __name__ == '__main__': # build the argument parser - description = 'List information about local or distant cluster topology prior to a run.\nHardware informations are gathered using hwloc (lstopo), pyopencl and pycuda.' + description = 'List information about local or distant cluster topology prior to a run.' + description += '\nHardware informations are gathered using hwloc (lstopo), pyopencl and pycuda.' parser = argparse.ArgumentParser( prog='hysop-ls', description=description) @@ -294,32 +295,30 @@ if __name__ == '__main__': mpi_info.Set('env', '\n'.join(env)) comm = MPI.COMM_SELF.Spawn('python', info=mpi_info, - args=['-c', 'from hysop.tools.hysop_ls import collect_node_information; collect_node_information()']).Merge() + args=['-c', 'from hysop.tools.hysop_ls import collect_node_informations; collect_node_informations()']).Merge() if verbose: print 'Connection established.' print 'Gathering hardware topologies...' - print 'root gather {}'.format(comm.Get_rank()) topologies = comm.gather(None, root=0)[1:] - print 'root gathered' + if verbose: print 'Computing requested statistics...' - print topologies[0] hostfile.close() sys.exit(0) -def collect_node_information(): +def collect_node_informations(): from mpi4py import MPI from hysop.backend.hardware.hwinfo import PCIIds, Topology + from hysop.tools.warnings import HysopWarning comm = MPI.Comm.Get_parent().Merge() - process_rank = comm.Get_rank() - process_count = comm.Get_size() - process_host = MPI.Get_processor_name() - pciids = PCIIds() - topo = Topology.parse(pciids) - print 'child gather {}'.format(process_rank) + try: + topo = Topology.parse(pciids) + except: + msg='Failed to parse topology.' + warnings.warn(msg, HysopWarning) + topo = None comm.gather(topo, root=0) - print 'child gathered' sys.exit(0) diff --git a/hysop/tools/units.py b/hysop/tools/units.py index 32ef11b6e..f32b73145 100644 --- a/hysop/tools/units.py +++ b/hysop/tools/units.py @@ -43,6 +43,9 @@ def bdw2str(bdw,decimal=True,rounded=2): def flops2str(flops,decimal=True,rounded=2): return unit2str(flops,'FLOPS',decimal,rounded) +def iops2str(iops,decimal=True,rounded=2): + return unit2str(iops,'IOPS',decimal,rounded) + def freq2str(freq,decimal=True,rounded=2): return unit2str(freq,'Hz',decimal,rounded) -- GitLab