diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3f00922a9e5eeb1d9a371e577d0d8ef6b8f5788b..2a8c2abe6f7320f947c07921e960a57ccc618186 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -36,7 +36,7 @@ option(WITH_TESTS "Enable testing. Default = OFF" ON)
 option(BUILD_SHARED_LIBS "Enable dynamic library build, default = ON." ON)
 option(USE_CXX "Expand hysop with some new functions from a generated c++ to python interface, wrapped into hysop.cpp2hysop module. Default = ON." OFF)
 option(WITH_SCALES "compile/create scales lib and link it with HySoP. Default = ON." ON)
-option(WITH_PARALLEL_HDF5 "Enable parallel hdf5 interface. Default = OFf." OFF)
+option(WITH_PARALLEL_COMPRESSED_HDF5 "Enable the parallel compressed hdf5 interface when the hdf5 library supports it. Default = ON." ON)
 option(WITH_FFTW "Link with fftw library (required for some HySoP solvers), default = ON" ON)
 option(WITH_EXTRAS "Link with some extra fortran libraries (like arnoldi solver), default = OFF" OFF)
 option(WITH_GPU "Use of GPU (required for some HySoP solvers), default = ON" ON)
@@ -232,13 +232,24 @@ if(WITH_EXTRAS)
 endif()
 
 # ========= Check parallel hdf5 availability =========
-if(WITH_PARALLEL_HDF5)
+if(WITH_PARALLEL_COMPRESSED_HDF5)
   execute_process(
-    COMMAND ${PYTHON_EXECUTABLE} -c "import h5py; print(h5py.get_config().mpi)"
-    OUTPUT_VARIABLE H5PY_PARALLEL_ENABLED)
-  if(H5PY_PARALLEL_ENABLED EQUAL "True")
-    message(FATAL_ERROR "h5py is not build with parallel support.")
+    COMMAND ${PYTHON_EXECUTABLE} -c "import h5py; print('.'.join(str(_) for _ in h5py.h5.get_libversion()))"
+    OUTPUT_VARIABLE LIB_HDF5_VERSION)
+  string(REGEX REPLACE "\n$" "" LIB_HDF5_VERSION "${LIB_HDF5_VERSION}")
+  execute_process(
+    COMMAND ${PYTHON_EXECUTABLE} -c "import h5py; print(h5py.h5.get_libversion() >= (1,10,2))"
+    OUTPUT_VARIABLE H5PY_PARALLEL_COMPRESSION_ENABLED)
+  string(REGEX REPLACE "\n$" "" H5PY_PARALLEL_COMPRESSION_ENABLED "${H5PY_PARALLEL_COMPRESSION_ENABLED}")
+  if(H5PY_PARALLEL_COMPRESSION_ENABLED STREQUAL "False")
+    message(WARNING "Your hdf5 library is too old to support parallel compression: the minimal required version is 1.10.2, but h5py was linked against version ${LIB_HDF5_VERSION}. Parallel HDF5 compression will be disabled.")
+    set(H5PY_PARALLEL_COMPRESSION_ENABLED "OFF")
+  else()
+    message(STATUS "Found h5py linked against libhdf5 version ${LIB_HDF5_VERSION}. Parallel HDF5 compression will be enabled.")
+    set(H5PY_PARALLEL_COMPRESSION_ENABLED "ON")
   endif()
+else()
+  set(H5PY_PARALLEL_COMPRESSION_ENABLED "OFF")
 endif()
 
 # ========= Check which opencl devices are available on the system =========
@@ -641,7 +652,7 @@ if(VERBOSE_MODE)
   message(STATUS " Project uses Scales : ${WITH_SCALES}")
   message(STATUS " Project uses FFTW : ${WITH_FFTW}")
   message(STATUS " Project uses GPU : ${WITH_GPU}")
-  message(STATUS " Project uses parallel hdf5 interface : ${WITH_PARALLEL_HDF5}")
+  message(STATUS " Project uses parallel compressed hdf5 : ${H5PY_PARALLEL_COMPRESSION_ENABLED}")
   message(STATUS " ${PROJECT_NAME} profile mode : ${PROFILE}")
   message(STATUS " ${PROJECT_NAME} debug mode : ${DEBUG}")
   message(STATUS " Enable -OO run? : ${OPTIM}")
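For reference, the two interpreter probes wired into execute_process() above behave as follows; a minimal sketch, assuming h5py is importable by ${PYTHON_EXECUTABLE}:

    # Minimal sketch of the CMake probes above (h5py.h5.get_libversion is the real h5py API).
    import h5py

    libversion = h5py.h5.get_libversion()        # e.g. (1, 10, 4)
    print('.'.join(str(v) for v in libversion))  # -> "1.10.4"

    # Parallel compression needs libhdf5 >= 1.10.2; this tuple comparison
    # is what H5PY_PARALLEL_COMPRESSION_ENABLED is set from.
    print(libversion >= (1, 10, 2))              # -> "True" or "False"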
diff --git a/hysop/__init__.py.in b/hysop/__init__.py.in
index c1b8f5f435dbac12cc21bbcb2ee9a2bd11403c53..ef29096df566f849a87b1898392d60ebed0a4346 100644
--- a/hysop/__init__.py.in
+++ b/hysop/__init__.py.in
@@ -33,7 +33,7 @@ __GPU_ENABLED__ = "@WITH_GPU@" is "ON"
 __FFTW_ENABLED__ = "@WITH_FFTW@" is "ON"
 __SCALES_ENABLED__ = "@WITH_SCALES@" is "ON"
 __OPTIMIZE__ = not __debug__
-__PARALLEL_HDF5__ = "@WITH_PARALLEL_HDF5@" is "ON"
+__H5PY_PARALLEL_COMPRESSION_ENABLED__ = ("@H5PY_PARALLEL_COMPRESSION_ENABLED@" == "ON")
 __VERBOSE__ = get_env('VERBOSE', ("@VERBOSE@" is "ON"))
 __DEBUG__ = get_env('DEBUG', ("@DEBUG@" is "ON"))
 
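A note on the new flag above: the CMake-substituted value is compared with `==` rather than `is`, because identity of equal string literals is a CPython interning detail, not a language guarantee. A quick illustration:

    flag = ''.join(['O', 'N'])  # equal to "ON", but a distinct object
    print(flag == 'ON')         # True  -- value comparison, always safe
    print(flag is 'ON')         # False -- identity comparison, fragile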
diff --git a/hysop/operator/hdf_io.py b/hysop/operator/hdf_io.py
index 1012844335f624c61bfc07bbb9799f3f0dc499fb..65238694793e294988ba6ac5635060948e3b7200 100755
--- a/hysop/operator/hdf_io.py
+++ b/hysop/operator/hdf_io.py
@@ -1,5 +1,3 @@
-# coding: utf-8
-
 """I/O operators
 
 .. currentmodule:: hysop.operator.hdf_io
@@ -11,7 +9,7 @@
 """
 import functools
 from abc import ABCMeta, abstractmethod
-from hysop import __PARALLEL_HDF5__
+from hysop import __H5PY_PARALLEL_COMPRESSION_ENABLED__
 from hysop.deps import h5py, sys
 from hysop.core.graph.graph import discretized
 from hysop.constants import DirectionLabels, HYSOP_REAL, Backend, TranspositionState
@@ -26,7 +24,6 @@ from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
 from hysop.core.memory.memory_request import MemoryRequest
 from hysop.topology.topology_descriptor import TopologyDescriptor
 
-
 class HDF_IO(ComputationalGraphOperator):
     """
     Abstract interface to read/write from/to hdf files, for
@@ -184,22 +181,28 @@ class HDF_IO(ComputationalGraphOperator):
 
     def discretize(self):
         super(HDF_IO, self).discretize()
-        self.topology = self.input_fields.values()[0]
+        topo = self.input_fields.values()[0]
+        use_local_hdf5 = (topo.cart_size == 1)
+        use_local_hdf5 |= (topo.proc_shape[0] == topo.cart_size)
 
-        refmesh = self.topology.mesh
+        self.topology = topo
+        self.use_local_hdf5 = use_local_hdf5
+        self.use_parallel_hdf5 = not use_local_hdf5
+
+        refmesh = topo.mesh
 
         # Global resolution for hdf5 output
         self._global_grid_resolution = refmesh.grid_resolution
         # Local resolution for hdf5 output
         self._local_grid_resolution = refmesh.local_resolution
-        self._all_local_grid_resolution = self.topology.cart_comm.gather(
+        self._all_local_grid_resolution = topo.cart_comm.gather(
             self._local_grid_resolution, root=self.io_params.io_leader)
 
         local_compute_slices = {}
         global_compute_slices = {}
         for (field, itopo) in self.input_fields.iteritems():
             mesh = itopo.mesh
-            assert (self.topology.domain._domain is itopo.domain._domain), 'domain mismatch'
+            assert (topo.domain._domain is itopo.domain._domain), 'domain mismatch'
             assert npw.array_equal(refmesh.grid_resolution, mesh.grid_resolution), 'global grid resolution mismatch'
             assert (mesh.on_proc == refmesh.on_proc)
             if mesh.on_proc:
@@ -212,10 +215,6 @@ class HDF_IO(ComputationalGraphOperator):
         self._global_compute_slices = global_compute_slices
         self.refmesh = refmesh
 
-        #def setup(self, work=None):
-            #super(HDF_IO, self).setup(work=work)
-        #No list of hdf dataset names provided by user ...
-
         name_prefix, name_postfix = self.name_prefix, self.name_postfix
         if (self.var_names is None):
             var_names = {}
@@ -229,7 +228,7 @@ class HDF_IO(ComputationalGraphOperator):
         else:
             for var in self.var_names:
                 # Discrete field associated to var
-                var_d = var.discretize(self.topology)
+                var_d = var.discretize(topo)
                 for d in xrange(var_d.nb_components):
                     name = name_prefix + self.var_names[var]
                     name += '_' + DirectionLabels[d] + name_postfix
@@ -241,19 +240,18 @@ class HDF_IO(ComputationalGraphOperator):
                     self._local_compute_slices[name] = self._local_compute_slices[f]
                     self._global_compute_slices[name] = self._global_compute_slices[f]
 
-    def open_hdf(self, count, mode):
+    def open_hdf(self, count, mode, compression='gzip'):
         filename = self._get_filename(count)
-        if self.topology.cart_size == 1 :
+        if (self.topology.cart_size == 1):
             self._hdf_file = h5py.File(filename, mode)
-            compression = 'gzip'
+        elif self.use_parallel_hdf5:
+            self._hdf_file = h5py.File(filename, mode, driver='mpio',
+                                       comm=self.topology.comm)
+            # Disable compression if the hdf5 library is older than 1.10.2 (checked in CMakeLists.txt).
+            if not __H5PY_PARALLEL_COMPRESSION_ENABLED__:
+                compression = None
         else:
-            if __PARALLEL_HDF5__:
-                self._hdf_file = h5py.File(filename, mode, driver='mpio',
-                                           comm=self.topology.comm)
-                compression = None
-            else:
-                self._hdf_file = h5py.File(filename.format(rk=self.topology.cart_rank), mode)
-                compression = 'gzip'
+            self._hdf_file = h5py.File(filename.format(rk=self.topology.cart_rank), mode)
         return compression
 
     @classmethod
@@ -263,6 +261,7 @@
     def supports_mpi(cls):
         return True
 
+
 class HDF_Writer(HDF_IO):
     """
     Print field(s) values on a given topo, in HDF5 format.
@@ -371,9 +370,10 @@ class HDF_Writer(HDF_IO):
         """Set output file name for current iteration"""
         msg = 'count < 0, simu must be initialized.'
         assert i >= 0, msg
-        if self.topology.cart_size == 1 or __PARALLEL_HDF5__:
+        if (self.topology.cart_size == 1) or self.use_parallel_hdf5:
            return self.io_params.filename + "_{0:05d}".format(i) + '.h5'
         else:
+            assert self.use_local_hdf5
            return self.io_params.filename + "_{0:05d}".format(i) + "_rk{rk:03d}.h5"
 
@@ -409,7 +409,7 @@ class HDF_Writer(HDF_IO):
 
         ds_names = self.dataset.keys()
         joinrkfiles = None
-        if self.topology.cart_size > 1 and not __PARALLEL_HDF5__:
+        if self.use_local_hdf5 and (self.topology.cart_size > 1):
             joinrkfiles = range(self.topology.cart_size)
         grid_attributes = XMF.prepare_grid_attributes(
             ds_names,
@@ -439,7 +439,7 @@ class HDF_Writer(HDF_IO):
             assert (f is not None)
             assert (lastp is not None)
             for (i, t) in self._xdmf_data_files:
-                if self.topology.cart_size == 1 or __PARALLEL_HDF5__:
+                if (self.topology.cart_size == 1) or self.use_parallel_hdf5:
                     filenames = {'filename': self._get_filename(i).split('/')[-1]}
                 else:
                     filenames = dict(('filename'+str(r), self._get_filename(i).format(rk=r).split('/')[-1]) for r in range(self.topology.cart_size))
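The hunk below switches dataset writes to collective mode whenever a compression filter is active, since parallel HDF5 only applies filters under collective I/O. A standalone sketch of the same pattern, assuming h5py was built with MPI support against libhdf5 >= 1.10.2 (file and dataset names here are illustrative):

    # Run with e.g.: mpirun -np 4 python sketch.py
    from mpi4py import MPI
    import h5py
    import numpy as np

    comm = MPI.COMM_WORLD
    f = h5py.File('sketch.h5', 'w', driver='mpio', comm=comm)
    # Dataset creation is collective: every rank declares the global shape.
    ds = f.create_dataset('field', (comm.size, 16), dtype=np.float64,
                          compression='gzip')
    # With a filter enabled, writes must be collective as well;
    # each rank fills its own slab of the global dataset.
    with ds.collective:
        ds[comm.rank, :] = np.full(16, comm.rank, dtype=np.float64)
    f.close()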
@@ -476,21 +476,29 @@ class HDF_Writer(HDF_IO):
 
         # Get the names of output input_fields and create the corresponding
         # datasets
-        if (self.topology.cart_size == 1) or __PARALLEL_HDF5__:
+        if self.use_local_hdf5:
             for name in self.dataset:
                 ds = self._hdf_file.create_dataset(name,
-                                                   self._global_grid_resolution,
+                                                   self._local_grid_resolution,
                                                    dtype=npw.float64,
                                                    compression=compression)
-                # In parallel, each proc must write at the right place of the dataset
-                ds[self._global_compute_slices[name]] = self._data_getters[name]()
-        else:
+                ds[...] = self._data_getters[name]().astype(npw.float64)
+        elif self.use_parallel_hdf5:
             for name in self.dataset:
                 ds = self._hdf_file.create_dataset(name,
-                                                   self._local_grid_resolution,
+                                                   self._global_grid_resolution,
                                                    dtype=npw.float64,
                                                    compression=compression)
-                ds[...] = self._data_getters[name]()
+                if (compression is None):
+                    # No collective write needed here because no filter is applied.
+                    ds[self._global_compute_slices[name]] = self._data_getters[name]().astype(npw.float64)
+                else:
+                    # Parallel compression requires collective dataset writes.
+                    with ds.collective:
+                        ds[self._global_compute_slices[name]] = self._data_getters[name]().astype(npw.float64)
+        else:
+            msg = 'Unknown HDF5 mode.'
+            raise RuntimeError(msg)
 
         # Collect datas required to write the xdmf file
         # --> add tuples (counter, time).
diff --git a/hysop/topology/cartesian_topology.py b/hysop/topology/cartesian_topology.py
index 6c862ecdada9491b3bedf7a4a9e4ac1fcfd5e6da..175c2e599337353a146aa581436f645a5daa3de6 100644
--- a/hysop/topology/cartesian_topology.py
+++ b/hysop/topology/cartesian_topology.py
@@ -582,7 +582,6 @@ class CartesianTopology(CartesianTopologyView, Topology):
         * All attributes are read-only properties.
 
         """
-
         # Get or create mpi parameters
         mpi_params = cls._create_mpi_params(mpi_params, domain, cl_env)
 
diff --git a/opencl_explore.py b/opencl_explore.py
index 6c3dc0547d90da58bfa9151cfa929a34b43d16ee..57618f37c96ab0143473ded0208e5e4143e22784 100644
--- a/opencl_explore.py
+++ b/opencl_explore.py
@@ -89,7 +89,7 @@ def explore(device_type=cl.device_type.GPU):
         default_platform, default_device = \
             get_defaults(device_type=device_type)
 
-    out += "Platforms informations:\n Id |"
+    out += "\nPlatforms information:\n Id |"
     for i, plt in enumerate(platforms):
         out += str(i) + ' ' * (p_str_max[i] - len(str(i))) + ' |'
     for i, plt_info in enumerate(platforms_info):
@@ -97,7 +97,7 @@ def explore(device_type=cl.device_type.GPU):
         for i_p, plt in enumerate(platforms):
             out += p_data[plt][i]
             out += ' ' * (p_str_max[i_p] - len(p_data[plt][i])) + ' |'
-    out += "\nDevices informations: \n Default device |"
+    out += "\n\nDevices information:\n Default device |"
     for i, dev in enumerate(all_devices):
         if i == default_device and d_data[dev][-1] == default_platform:
             out += "DEFAULT" + ' ' * (d_str_max[dev] - 7) + ' |'
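For context, the writer picks between the two output modes introduced in HDF_IO.discretize() above: one file per rank when the domain is only split along the first axis (the per-rank files are then stitched together through the XDMF index), and a single mpio-backed file otherwise. A sketch of that predicate with hypothetical (cart_size, proc_shape) values:

    def use_local_hdf5(cart_size, proc_shape):
        # Serial run, or a slab decomposition along the first axis only:
        # each rank owns a contiguous block and can write its own file.
        return (cart_size == 1) or (proc_shape[0] == cart_size)

    print(use_local_hdf5(1, (1, 1, 1)))  # True  -- serial, single local file
    print(use_local_hdf5(4, (4, 1, 1)))  # True  -- slabs, one file per rank
    print(use_local_hdf5(4, (2, 2, 1)))  # False -- parallel mpio file needed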