From 857bde7f4b7ccbaeefdd7e4a8e299828f2a66d3e Mon Sep 17 00:00:00 2001
From: Jean-Baptiste Keck <jean-baptiste.keck@imag.fr>
Date: Sat, 3 Mar 2018 21:57:04 +0100
Subject: [PATCH] working mpi hdf writers

---
 CMakeLists.txt                               |   4 +-
 examples/analytic/analytic.py                |   5 +-
 examples/example_utils.py                    | 107 ++++++++++++++++--
 hysop/__init__.py                            |  24 +---
 hysop/__init__.py.in                         |  18 ---
 hysop/backend/device/kernel_autotuner.py     |  37 +++---
 hysop/backend/device/opencl/clpeak.py        |   3 +-
 .../backend/host/python/operator/analytic.py |   7 +-
 hysop/core/graph/computational_node.py       |   4 +-
 hysop/numerics/remesh/kernel_generator.py    |  66 ++++------
 hysop/numerics/stencil/stencil_generator.py  |  62 +++------
 hysop/operator/hdf_io.py                     |  42 ++++---
 hysop/tools/cache.py                         |  28 ++---
 hysop/topology/cartesian_topology.py         |   5 +
 hysop/topology/topology.py                   |   5 +
 15 files changed, 210 insertions(+), 207 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 429539e66..03693ec8f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -73,7 +73,7 @@ else()
 endif()
 
 if(NOT USE_MPI)
-  message(FATAL_ERROR "No-mpi version of hysop is broken, please enable mpi.")
+  message(FATAL_ERROR "The non-MPI version of hysop has been deprecated, please enable MPI.")
 endif()
 
 # Fortran interface is not used by default.
@@ -143,9 +143,9 @@ find_python_module(subprocess32 REQUIRED)
 find_python_module(editdistance REQUIRED)
 find_python_module(portalocker REQUIRED)
 find_python_module(colors.py REQUIRED)
+find_python_module(tee REQUIRED)
 find_python_module(argparse_color_formatter REQUIRED)
 find_python_module(graph_tool REQUIRED)
-# --- OpenCL ---
 find_python_module(pyopencl REQUIRED)
 # --- MPI ---
 if(USE_MPI)
diff --git a/examples/analytic/analytic.py b/examples/analytic/analytic.py
index 7f4dff235..7aca0e986 100755
--- a/examples/analytic/analytic.py
+++ b/examples/analytic/analytic.py
@@ -116,6 +116,5 @@ if __name__=='__main__':
     parser.set_defaults(box_start=(0.0,), box_length=(2*np.pi,),
                         tstart=0.0, tend=10.0, nb_iter=100,
                         dump_freq=5)
-    args = parser.parse()
-
-    compute(args)
+
+    parser.run(compute)
diff --git a/examples/example_utils.py b/examples/example_utils.py
index 41efcfa2e..74f005a22 100644
--- a/examples/example_utils.py
+++ b/examples/example_utils.py
@@ -1,4 +1,4 @@
-import os, argparse, tempfile, colors, textwrap, warnings
+import os, argparse, tempfile, colors, textwrap, warnings, contextlib, tee, re, errno
 from argparse_color_formatter import ColorHelpFormatter
 
 class HysopArgumentWarning(UserWarning):
@@ -20,7 +20,13 @@ class SplitAppendAction(argparse._AppendAction):
         assert isinstance(values, str), type(values)
         for c in ('(','{','[',']','}',')'):
             values = values.replace(c, '')
-        values = tuple(self._convert(v) for v in values.split(self._separator))
+        try:
+            values = tuple(self._convert(v) for v in values.split(self._separator))
+        except Exception:
+            msg='Failed to convert \'{}\' to {} of {}s for parameter {}.'
+            msg=msg.format(values, self._container.__name__, self._convert.__name__,
+                    self.dest)
+            parser.error(msg)
         assert len(values)>0
         if self._append:
             items = argparse._ensure_value(namespace, self.dest, self._container())
@@ -138,11 +144,84 @@ class HysopArgParser(argparse.ArgumentParser):
 
         self._setup_implementation(args)
 
-        hysop.redirect_stderr(args.stderr)
-        hysop.redirect_stdout(args.stdout)
-
         return args
 
+    def run(self, program, **kwds):
+        from mpi4py import MPI
+        size = MPI.COMM_WORLD.Get_size()
+        rank = MPI.COMM_WORLD.Get_rank()
+
+        args = self.parse()
+        args.stdout = self._fmt_filename(args.stdout, rank, size, args)
+        args.stderr = self._fmt_filename(args.stderr, rank, size, args)
+
+        self._rmfile(args.stdout)
+        self._rmfile(args.stderr)
+
+        with self.redirect_stdout(rank, args), self.redirect_stderr(rank, args):
+            program(args, **kwds)
+
+    @staticmethod
+    def _fmt_filename(filename, rank, size, args):
+        return filename.format(rank=rank, size=size, dpath=args.dump_dir)
+
+    @staticmethod
+    def _color_filter(msg):
+        return re.sub(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]', "", msg)
+
+    @staticmethod
+    def _null_filter(msg):
+        return None
+
+    @staticmethod
+    def _mkdir(path):
+        path = os.path.dirname(os.path.realpath(path))
+        try:
+            os.makedirs(path)
+        except OSError as e:
+            if (e.errno != errno.EEXIST):
+                raise
+
+    @staticmethod
+    def _rmfile(path):
+        try:
+            os.remove(path)
+        except OSError as e:
+            if (e.errno != errno.ENOENT):
+                raise
+
+    @contextlib.contextmanager
+    def redirect_stdout(self, rank, args):
+        redirect_to_terminal = (rank in args.tee_ranks)
+
+        file_filters = [self._color_filter]
+        if redirect_to_terminal:
+            stream_filters = []
+        else:
+            stream_filters = [self._null_filter]
+
+        self._mkdir(args.stdout)
+        with tee.StdoutTee(args.stdout, mode='a',
+                file_filters=file_filters,
+                stream_filters=stream_filters):
+            yield
+
+    @contextlib.contextmanager
+    def redirect_stderr(self, rank, args):
+        redirect_to_terminal = (rank in args.tee_ranks)
+
+        file_filters = [self._color_filter]
+        if redirect_to_terminal:
+            stream_filters = []
+        else:
+            stream_filters = [self._null_filter]
+
+        self._mkdir(args.stderr)
+        with tee.StderrTee(args.stderr, mode='a',
+                file_filters=file_filters,
+                stream_filters=stream_filters):
+            yield
+
     def _add_positional_args(self):
         pass
 
@@ -437,12 +516,17 @@ class HysopArgParser(argparse.ArgumentParser):
         term_io = self.add_argument_group('Terminal I/O')
         msg0=('All {rank} and {size} occurences are replaced by the MPI rank and '
              +'MPI communicator size. {dpath} is replaced by default output path.')
-        term_io.add_argument('-stdout', '--std-out', type=str, default=None,
-                dest='stdout',
-                help='Redirect stdout to this file after hysop has been initialized. '+msg0)
-        term_io.add_argument('-stderr', '--std-err', type=str, default=None,
+        #+'{program} and {hostname} are replaced by actual program and host name.')
+        term_io.add_argument('-stdout', '--std-out', type=str, default='{dpath}/p{size}/{rank}.out',
+                dest='stdout',
+                help='Redirect stdout to this file. '+msg0)
+        term_io.add_argument('-stderr', '--std-err', type=str, default='{dpath}/p{size}/{rank}.out',
                 dest='stderr',
-                help='Redirect stderr to this file after hysop has been initialized. '+msg0),
+                help='Redirect stderr to this file. '+msg0)
+        term_io.add_argument('-tee', '--tee-ranks', type=str, default={0},
+                action=self.split, container=set, append=False, convert=int,
+                dest='tee_ranks',
+                help='Tee stdout and stderr of specified MPI ranks to terminal.')
         term_io.add_argument('-V', '--verbose', action='store_true', default=None,
                 dest='verbose',
                 help='Enable verbosity. Overrides HYSOP_VERBOSE.')
@@ -470,7 +554,8 @@
         return term_io
 
     def _check_term_io_args(self, args):
-        self._check_default(args, ('stdout','stderr'), str, allow_none=True)
+        self._check_default(args, ('stdout','stderr'), str, allow_none=False)
+        self._check_default(args, 'tee_ranks', set, allow_none=False)
         self._check_default(args, ('verbose', 'debug', 'profile', 'kernel_debug'),
                 bool, allow_none=True)
         self._check_default(args, 'trace', set, allow_none=True)
diff --git a/hysop/__init__.py b/hysop/__init__.py
index 418275749..d7b0eb17f 100644
--- a/hysop/__init__.py
+++ b/hysop/__init__.py
@@ -85,24 +85,6 @@ def reset():
     from hysop.tools.handle import RegisteredObject
     RegisteredObject._RegisteredObject__reset()
 
-def redirect_stdout(stdout):
-    if (stdout is not None):
-        stdout = stdout.format(rank=main_rank, size=main_size, dpath=default_path)
-        vprint('*Redirecting stdout to \'{}\'.'.format(stdout))
-        d = os.path.dirname(stdout)
-        if not os.path.exists(d):
-            os.makedirs(d)
-        sys.stdout = open(stdout, 'w')
-
-def redirect_stderr(stderr):
-    if (stderr is not None):
-        stderr = stderr.format(rank=main_rank, size=main_size, dpath=default_path)
-        vprint('*Redirecting stderr to \'{}\'.'.format(stderr))
-        d = os.path.dirname(stderr)
-        if not os.path.exists(d):
-            os.makedirs(d)
-        sys.stderr = open(stderr, 'w')
-
 # override warning printer
 __formatwarning = warnings.formatwarning
 def __new_formatwarning(message, category, filename, lineno, line=None):
@@ -146,12 +128,12 @@ msg_start = '\nStarting {} version {}'.format(package_name, version)
 if __MPI_ENABLED__:
     msg_start += (' with {} mpi process(es) on {} host(s) '+
             'providing {} shared memory node(s).').format(main_size,interhost_size,intershm_size)
-vprint(msg_start)
+mprint(msg_start)
 
 default_path = IO.default_path()
 cache_path = IO.default_cache_path()
 
 msg_io =  '\n*Default path for all i/o is \'{}\'.'.format(default_path)
 msg_io += '\n*Default path for caching is \'{}\'.'.format(cache_path)
-vprint(msg_io)
-vprint()
+mprint(msg_io)
+mprint()
diff --git a/hysop/__init__.py.in b/hysop/__init__.py.in
index 5b604a8e3..113cbbcc3 100644
--- a/hysop/__init__.py.in
+++ b/hysop/__init__.py.in
@@ -85,24 +85,6 @@ def reset():
     from hysop.tools.handle import RegisteredObject
     RegisteredObject._RegisteredObject__reset()
 
-def redirect_stdout(stdout):
-    if (stdout is not None):
-        stdout = stdout.format(rank=main_rank, size=main_size, dpath=default_path)
-        vprint('*Redirecting stdout to \'{}\'.'.format(stdout))
-        d = os.path.dirname(stdout)
-        if not os.path.exists(d):
-            os.makedirs(d)
-        sys.stdout = open(stdout, 'w')
-
-def redirect_stderr(stderr):
-    if (stderr is not None):
-        stderr = stderr.format(rank=main_rank, size=main_size, dpath=default_path)
-        vprint('*Redirecting stderr to \'{}\'.'.format(stderr))
-        d = os.path.dirname(stderr)
-        if not os.path.exists(d):
-            os.makedirs(d)
-        sys.stderr = open(stderr, 'w')
-
 # override warning printer
 __formatwarning = warnings.formatwarning
 def __new_formatwarning(message, category, filename, lineno, line=None):
diff --git a/hysop/backend/device/kernel_autotuner.py b/hysop/backend/device/kernel_autotuner.py
index b742e8a8f..94ebf7d78 100644
--- a/hysop/backend/device/kernel_autotuner.py +++ b/hysop/backend/device/kernel_autotuner.py @@ -7,6 +7,7 @@ from hysop.tools.types import check_instance from hysop.tools.io_utils import IO from hysop.tools.misc import previous_pow2 from hysop.tools.numpywrappers import npw +from hysop.tools.cache import load_cache, update_cache from hysop.backend.device.autotunable_kernel import AutotunableKernel, \ AutotunerWorkConfiguration from hysop.backend.device.kernel_statistics import KernelStatistics @@ -18,11 +19,6 @@ class KernelGenerationError(RuntimeError): class KernelAutotuner(object): __metaclass__ = ABCMeta - cache_dir = IO.cache_path() + '/kernel_autotuner' - config_file = cache_dir+'/configs.pklz' - if not os.path.exists(cache_dir): - os.makedirs(cache_dir) - FULL_RESULTS_KEY = '__FULL_RESULTS__' DUMP_LAST_TUNED_KERNEL = False STORE_FULL_KERNEL_SOURCES = False @@ -30,28 +26,33 @@ class KernelAutotuner(object): @staticmethod def _hash_func(): return hashlib.new('sha256') + + @staticmethod + def cache_dir(): + cache_dir = IO.cache_path() + '/kernel_autotuner' + return cache_dir + + def cache_file(self): + cache_file = '{}/{}.pklz'.format(self.cache_dir(), self.name.replace(' ','_')) + return cache_file def _reload_cache(self, extra_kwds_hash): - cache_file = '{}/{}.pklz'.format(KernelAutotuner.cache_dir,self.name.replace(' ','_')) - try: - with gzip.open(cache_file, 'rb') as f: - self.all_results = pickle.loads(f.read()) - if self.verbose: - print - print self.indent(1)+'>Loading cached results from \'{}\'.'.format(cache_file) - except: - self.all_results = {} + cache_file = self.cache_file() + if self.verbose: + print + print self.indent(1)+'>Loading cached results from \'{}\'.'.format(cache_file) + self.all_results = load_cache(cache_file) config_key = self.autotuner_config_key() config_key += (extra_kwds_hash,) + self.config_key = config_key self.results = self.all_results.setdefault(config_key, {}) - self.cache_file = cache_file return self.results def _dump_cache(self, silent=False): - with gzip.open(self.cache_file, 'wb') as f: - pickle.dump(self.all_results, f) + cache_file = self.cache_file() if (not silent) and (self.verbose>1): - print self.indent(1)+'>Caching results to \'{}\'.'.format(self.cache_file) + print self.indent(1)+'>Caching results to \'{}\'.'.format(cache_file) + update_cache(cache_file, self.config_key, self.results) def __init__(self, name, tunable_kernel, **kwds): """ diff --git a/hysop/backend/device/opencl/clpeak.py b/hysop/backend/device/opencl/clpeak.py index f762d4248..dba14738d 100644 --- a/hysop/backend/device/opencl/clpeak.py +++ b/hysop/backend/device/opencl/clpeak.py @@ -46,7 +46,8 @@ class ClPeakInfo(object): '_transfer_bandwidth_values', '_global_bdw_values', '_sp_compute_values', '_dp_compute_values', '_int_compute_values') - def _cache_file(self): + @classmethod + def _cache_file(cls): return IO.cache_path() + '/hardware/clpeak.pklz' def _load_from_cache(self, key): diff --git a/hysop/backend/host/python/operator/analytic.py b/hysop/backend/host/python/operator/analytic.py index 6584f57cd..1fa057ff0 100644 --- a/hysop/backend/host/python/operator/analytic.py +++ b/hysop/backend/host/python/operator/analytic.py @@ -83,10 +83,15 @@ class PythonAnalyticField(HostOperator): for (field, dfield) in self.input_discrete_fields: assert field.name not in extra_kwds, field.name extra_kwds[field.name] = dfield.data + self.dfield = dfield @op_apply def apply(self, **kwds): """Apply analytic formula.""" super(PythonAnalyticField, self).apply(**kwds) 
self.formula(**self.extra_kwds) - + self.dfield.exchange_ghosts() + + @classmethod + def supports_mpi(cls): + return True diff --git a/hysop/core/graph/computational_node.py b/hysop/core/graph/computational_node.py index 6005c773a..dd8fa4628 100644 --- a/hysop/core/graph/computational_node.py +++ b/hysop/core/graph/computational_node.py @@ -405,8 +405,8 @@ class ComputationalGraphNode(OperatorBase): msg+='\n *{} -> {}'.format(field.field_tag, topo.full_tag) raise NotImplementedError(msg) if (self._is_distributed) and (not cls.supports_mpi()): - msg='MPI multi-process has not been implemented in graph operator \'{}\' yet!' - msg=msg.format(self.name) + msg='\nMPI multi-process has not been implemented in graph operator \'{}\' yet!\n' + msg=msg.format(type(self)) raise NotImplementedError(msg) diff --git a/hysop/numerics/remesh/kernel_generator.py b/hysop/numerics/remesh/kernel_generator.py index 3b56d16a1..c0cdf2acb 100644 --- a/hysop/numerics/remesh/kernel_generator.py +++ b/hysop/numerics/remesh/kernel_generator.py @@ -1,36 +1,8 @@ -from hysop.deps import os, hashlib, pickle, sm, np, gzip, sp +from hysop.deps import os, hashlib, sm, np, gzip, sp from hysop.tools.io_utils import IO from hysop.tools.numerics import mpq,mpfr,mpqize,f2q - -_cache_dir = IO.cache_path() + '/numerics' -_cache_file = _cache_dir + '/remesh.pklz' - -def _load_cached_kernels(): - if not os.path.exists(_cache_dir): - os.makedirs(_cache_dir) - if not os.path.isfile(_cache_file): - data = {} - else: - try: - with gzip.open(_cache_file, 'rb') as f: - data = pickle.load(f) - except IOError, EOFError: - print 'warning: Could not read remeshing kernel cache.' - os.remove(_cache_file) - data = {} - return data - -def _export_kernels(): - with gzip.open(_cache_file, 'wb') as f: - pickle.dump(_cached_kernels,f) - -def _hash_kernel_key(n,r,deg,Ms,H,remesh): - s = '{}_{}_{}_{}_{}_{}'.format(n,r,deg,Ms,H,int(remesh)) - return hashlib.sha256(s).hexdigest() - -_cached_kernels = _load_cached_kernels() - +from hysop.tools.cache import load_data_from_cache, update_cache class Kernel(object): def __init__(self, register=False, verbose=False, split_polys=False, **kargs): @@ -53,7 +25,17 @@ class Kernel(object): self._build(verbose, split_polys) self._hashable = True + + @classmethod + def hash_kernel_key(cls,n,r,deg,Ms,H,remesh): + s = '{}_{}_{}_{}_{}_{}'.format(n,r,deg,Ms,H,int(remesh)) + return hashlib.sha256(s).hexdigest() + @classmethod + def cache_file(cls): + _cache_dir = IO.cache_path() + '/numerics' + _cache_file = _cache_dir + '/remesh.pklz' + return _cache_file def __eq__(self, other): if not isinstance(other, ExplicitRungeKutta): @@ -70,7 +52,7 @@ class Kernel(object): return not eq def __hash__(self): - h = _hash_kernel_key(self.n,self.r,self.deg,self.Ms,self.H,self.remesh) + h = self.hash_kernel_key(self.n,self.r,self.deg,self.Ms,self.H,self.remesh) return hash((h,self.split_polys)) def _build(self,verbose,split_polys): @@ -192,12 +174,9 @@ class Kernel(object): if verbose: print ' All done.' 
-    def _register(self,dic):
-        h = _hash_kernel_key(self.n,self.r,self.deg,self.Ms,self.H,self.remesh)
-        if h in _cached_kernels.keys():
-            raise KeyError('Current kernel was already cached or kernels key clash...')
-        _cached_kernels[h] = dic
-        _export_kernels()
+    def _register(self, dic):
+        key = self.hash_kernel_key(self.n,self.r,self.deg,self.Ms,self.H,self.remesh)
+        update_cache(self.cache_file(), key, dic)
 
     def __call__(self,x):
         Ms = self.Ms
@@ -210,6 +189,8 @@ class SymmetricKernelGenerator(object):
        preserves the first discrete moments of a given symmetric stencil H.
        If H is None, an interpolation stencil is generated.
     """
+
+    SINGULAR_SYSTEM = {}
 
     def __init__(self,verbose=False):
         self.verbose = verbose
@@ -281,19 +262,19 @@
         Mh = mpqize(np.asarray(Mh))
 
         # check if kernel was not already cached
-        h = _hash_kernel_key(n,r,deg,Ms,H,remesh)
+        key = Kernel.hash_kernel_key(n,r,deg,Ms,H,remesh)
         if override_cache:
             if verbose:
                 print ' Cache overwrite requested.'
         else:
             if verbose:
                 print ' Checking if kernel was already computed... ',
-            if h in _cached_kernels:
+            data = load_data_from_cache(Kernel.cache_file(), key)
+            if (data is not None):
                 if verbose:
                     print 'True'
                     print ' Loading kernel from cache.'
-                data = _cached_kernels[h]
-                if (data is None):
+                if (data == self.SINGULAR_SYSTEM):
                     raise RuntimeError('Could not solve linear system.')
                 kernel = cls(verbose=verbose,register=False,split_polys=split_polys,**data)
                 return kernel
@@ -396,8 +377,7 @@
         if len(sol)!=len(unknowns):
             if verbose:
                 print 'sol=',sol
-            _cached_kernels[h] = None
-            _export_kernels()
+            update_cache(Kernel.cache_file(), key, self.SINGULAR_SYSTEM)
             raise RuntimeError('Could not solve linear system.')
         elif verbose:
             print ' => System solved.'
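Both generators above now funnel their on-disk caching through the shared helpers in hysop/tools/cache.py instead of hand-rolled gzip+pickle code. A minimal sketch of the read-through/write-back pattern they follow, assuming only what this patch itself shows: load_data_from_cache(cache_file, key) returns the cached value or None on a miss, and update_cache(cache_file, key, value) performs a locked read-modify-write. The cached_solve helper and its arguments are illustrative, not part of hysop:

    import hashlib

    from hysop.tools.cache import load_data_from_cache, update_cache

    def cached_solve(cache_file, problem_repr, expensive_solve):
        # Stable cache key derived from the problem description, as in
        # Kernel.hash_kernel_key() and StencilGenerator._hash_equations().
        key = hashlib.sha256(problem_repr).hexdigest()
        data = load_data_from_cache(cache_file, key)
        if data is None:
            # Cache miss: compute, then persist under a file lock so that
            # concurrent MPI processes cannot clobber each other's entries.
            data = expensive_solve()
            update_cache(cache_file, key, data)
        return data

The SINGULAR_SYSTEM sentinel above follows the same pattern for failures, so an unsolvable linear system is cached once instead of being re-attempted on every run.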
diff --git a/hysop/numerics/stencil/stencil_generator.py b/hysop/numerics/stencil/stencil_generator.py
index d77551152..14794e185 100644
--- a/hysop/numerics/stencil/stencil_generator.py
+++ b/hysop/numerics/stencil/stencil_generator.py
@@ -10,6 +10,7 @@ from hysop.tools.misc import prod
 from hysop.tools.io_utils import IO
 from hysop.tools.numerics import MPQ, MPZ, MPFR, F2Q, mpqize
 from hysop.tools.types import extend_array
+from hysop.tools.cache import update_cache, load_data_from_cache
 from hysop.tools.sympy_utils import tensor_symbol, tensor_xreplace, \
                                     factor_split, build_eqs_from_dicts
 
@@ -169,10 +170,12 @@ class StencilGenerator(object):
     CROSS  = 1
     DIAG   = 2
     CUSTOM = 99
-
-    #caching
-    _cache_dir  = IO.cache_path() + '/numerics'
-    _cache_file = _cache_dir + '/stencil.pklz'
+
+    @classmethod
+    def cache_file(cls):
+        _cache_dir  = IO.cache_path() + '/numerics'
+        _cache_file = _cache_dir + '/stencil.pklz'
+        return _cache_file
 
     def __init__(self, config=None, cache_override=False):
         """
@@ -258,51 +261,18 @@
         self._config = config
         self._cache_override = cache_override
 
-        self._init_cache()
-
-    def _init_cache(self):
-        if not os.path.exists(StencilGenerator._cache_dir):
-            os.makedirs(StencilGenerator._cache_dir)
-        self.results = StencilGenerator._load_cache()
-
     @staticmethod
     def _hash_algorithm():
         import hashlib
         return hashlib.new('sha512')
 
-    @staticmethod
-    def _hash_equations(eqs):
-        hasher = StencilGenerator._hash_algorithm()
+    @classmethod
+    def _hash_equations(cls, eqs):
+        hasher = cls._hash_algorithm()
         for e in eqs:
             hasher.update(str(e))
         return hasher.hexdigest()
 
-    @staticmethod
-    def _load_cache():
-        if os.path.isfile(StencilGenerator._cache_file):
-            try:
-                with gzip.open(StencilGenerator._cache_file, 'rb') as f:
-                    results = pickle.loads(f.read())
-            except EOFError, IOError:
-                os.remove(StencilGenerator._cache_file)
-                results = {}
-        else:
-            results = {}
-        return results
-
-    @staticmethod
-    def _dump_cache(results):
-        with gzip.open(StencilGenerator._cache_file, 'wb') as f:
-            pickle.dump(results, f)
-
-    @staticmethod
-    def _update_cache(results):
-        cached_results = StencilGenerator._load_cache()
-        cached_results.update(results)
-        StencilGenerator._dump_cache(cached_results)
-        return cached_results
-
     def get_config(self):
         """
         Get current stencil generator configuration state.
@@ -452,19 +422,17 @@
         stencil = Stencil(S,origin,order)
         i = 0
         sol = {}
+        cache_file = self.cache_file()
         while(stencil.variables().intersection(svars)):
             eqs_key = StencilGenerator._hash_equations(eqs)
-            if (self._cache_override) or (eqs_key not in self.results.keys()):
+            nsol = load_data_from_cache(cache_file, eqs_key)
+            if (self._cache_override) or (nsol is None):
                 nsol = sm.solve(eqs,svars)
-                self.results[eqs_key] = nsol
-                self.results = StencilGenerator._update_cache(self.results)
-            else:
-                nsol = self.results[eqs_key]
-
+                update_cache(cache_file, eqs_key, nsol)
             if not nsol:
                 break
             sol.update(nsol)
-            S = tensor_xreplace(S,sol)
+            S = tensor_xreplace(S, sol)
 
         err = 0
         while err==0:
diff --git a/hysop/operator/hdf_io.py b/hysop/operator/hdf_io.py
index a32ff6bc0..ceef898f5 100755
--- a/hysop/operator/hdf_io.py
+++ b/hysop/operator/hdf_io.py
@@ -112,9 +112,9 @@ class HDF_IO(ComputationalGraphOperator):
 
         # Local topology, that MUST be common to all input_fields.
         self.topology = None
-        self._slices = None
+        self._local_compute_slices = None
         self._global_resolution = None
-        self._sl = None
+        self._global_compute_slices = None
         # Dictionnary of discrete fields. Key = name in hdf file,
         # Value = discrete field
         self.dataset = {}
@@ -149,16 +149,13 @@
 
         # Global resolution for hdf5 output (warning : this must
         # be the whole domain resolution, not the subset resolution)
-        self._global_resolution = list(refmesh.grid_resolution)
-        self._slices = refmesh.local_compute_slices
+        self._global_resolution = refmesh.grid_resolution
+        self._local_compute_slices = refmesh.local_compute_slices
         if refmesh.on_proc:
-            sl = list(refmesh.global_compute_slices)
+            global_compute_slices = refmesh.global_compute_slices
         else:
-            sl = [slice(0, 0) for _ in xrange(self.domain.dim)]
-        # Reverse order, to fit with xdmf req.
-        self._global_resolution.reverse()
-        sl.reverse()
-        self._sl = tuple(sl)
+            global_compute_slices = tuple(slice(0, 0) for _ in xrange(self.domain.dim))
+        self._global_compute_slices = global_compute_slices
 
     def setup(self, work=None):
         super(HDF_IO, self).setup(work=work)
@@ -345,6 +342,7 @@
         # rank.
         self._count = simu.current_iteration
         compression = self.open_hdf(self._count, mode='w')
+
         # Get the names of output input_fields and create the corresponding
         # datasets
         for name in self.dataset:
@@ -352,18 +350,21 @@
             ds = self._hdf_file.create_dataset(name,
                                                self._global_resolution,
                                                dtype=HYSOP_REAL,
                                                compression=compression)
-            # In parallel, each proc must write at the right place
-            # of the dataset --> use self._slices.
-            ds[self._sl] = npw.asrealarray(self.dataset[name].get()[self._slices])
+            # In parallel, each proc must write at the right place of the dataset
+            data = self.dataset[name].get()
+            ds[self._global_compute_slices] = npw.asrealarray(data[self._local_compute_slices])
+
         # Collect datas required to write the xdmf file
         # --> add tuples (counter, time).
-        msg = 'You cannot write two hdf files for the same '
-        msg += '(time, var) set. '
-        msg += 'If you want to save a field two times for '
-        msg += 'a single time value, please use two hdf_writer operators.'
-        assert simu.time != self._last_written_time, msg
+        if (simu.time == self._last_written_time):
+            msg = 'You cannot write two hdf files for the same '
+            msg += '(time, var) set. '
+            msg += 'If you want to save a field two times for '
+            msg += 'a single time value, please use two hdf_writer operators.'
+            raise RuntimeError(msg)
         self._xdmf_data_files.append((self._count, simu.time))
         self._last_written_time = simu.time
+        self._hdf_file.close()
 
     def _step_HDF5_XMF(self, simu):
@@ -416,13 +417,10 @@
             msg = 'You try to read a dataset not present in hdf file : '
             for name in self.dataset:
                 assert name in dsnames, msg + name
-            # Read data
-            self.dataset[name][self._slices] = self._hdf_file[name][self._sl]
+                self.dataset[name][self._local_compute_slices] = self._hdf_file[name][self._global_compute_slices]
             self._hdf_file.close()
-            # Set to None to check if it is closed in finalize
             self._hdf_file = None
-            # Do we need a CPU->GPU transfer here?
 
     @discretized
     def dataset_names(self):
diff --git a/hysop/tools/cache.py b/hysop/tools/cache.py
index 998c2c92d..451b6c849 100644
--- a/hysop/tools/cache.py
+++ b/hysop/tools/cache.py
@@ -20,7 +20,7 @@ if (machine_id in (None,'')):
 
 @contextlib.contextmanager
 def lock_file(filepath, mode, compressed=True,
-        timeout=30, check_interval=0.1):
+        timeout=20, check_interval=0.1):
     """
     Opens a locked file with specified mode, possibly compressed.
""" @@ -34,30 +34,22 @@ def lock_file(filepath, mode, compressed=True, raise if not os.path.exists(filepath): open(filepath, 'a').close() - with portalocker.Lock(filename=filepath, timeout=timeout, mode=mode, check_interval=check_interval) as fl: + with portalocker.Lock(filename=filepath, timeout=timeout, mode=mode, + check_interval=check_interval) as fl: if compressed: with gzip.GzipFile(fileobj=fl) as f: yield f else: yield fl - except portalocker.exceptions.LockException: - if os.path.exists(filepath): - try: - os.remove(filepath) - open(filepath, 'a').close() - except: - pass - with portalocker.Lock(filename=filepath, timeout=timeout, mode=mode, check_interval=check_interval) as fl: - if compressed: - with gzip.GzipFile(fileobj=fl) as f: - yield f - else: - yield fl - + except portalocker.exceptions.LockException as e: + msg='\nFATAL ERROR: Could not obtain lock for file \'{}\' after waiting for {}s.\n' + msg=msg.format(filepath, timeout) + print msg + raise e @contextlib.contextmanager def read_only_lock(filepath, compressed=True, - timeout=30, check_interval=0.1): + timeout=20, check_interval=0.1): """Opens a locked read only file, possibly compressed.""" with lock_file(filepath=filepath, mode='r', compressed=compressed, timeout=timeout, check_interval=check_interval) as f: @@ -65,7 +57,7 @@ def read_only_lock(filepath, compressed=True, @contextlib.contextmanager def write_only_lock(filepath, compressed=True, - timeout=30, check_interval=0.1): + timeout=20, check_interval=0.1): """Opens a locked write only file, possibly compressed.""" with lock_file(filepath=filepath, mode='w', compressed=compressed, timeout=timeout, check_interval=check_interval) as f: diff --git a/hysop/topology/cartesian_topology.py b/hysop/topology/cartesian_topology.py index 15524b684..70f61f49c 100644 --- a/hysop/topology/cartesian_topology.py +++ b/hysop/topology/cartesian_topology.py @@ -231,6 +231,9 @@ class CartesianTopologyView(TopologyView): def _get_ghosts(self): """Returns ghosts of the discretization.""" return self._proc_transposed(self._topology._discretization.ghosts) + def _get_comm(self): + """Return the communicator of this topology.""" + return self._get_cart_comm() def _get_cart_comm(self): """MPI cartesian topology intracommunicator.""" return self._topology._cart_comm @@ -323,6 +326,8 @@ class CartesianTopologyView(TopologyView): global_resolution = property(_get_global_resolution) ghosts = property(_get_ghosts) + comm = property(_get_comm) + cart_comm = property(_get_cart_comm) cart_dim = property(_get_cart_dim) cart_size = property(_get_cart_size) diff --git a/hysop/topology/topology.py b/hysop/topology/topology.py index 2b639d6e3..c69ebe237 100644 --- a/hysop/topology/topology.py +++ b/hysop/topology/topology.py @@ -182,6 +182,11 @@ class TopologyView(TaggedObjectView): """The geometry dimension on which the topology is defined.""" return self._topology._domain.dim + @abstractmethod + def _get_comm(self): + """Return the communicator of this topology.""" + pass + def _get_parent(self): """Return the communicator used to build this topology.""" return self._topology._mpi_params.comm -- GitLab