From 19e3be21f08be3d539d0c72b511a3e8f469fa182 Mon Sep 17 00:00:00 2001
From: Jean-Baptiste Keck <Jean-Baptiste.Keck@imag.fr>
Date: Sun, 26 Jul 2020 02:18:46 +0200
Subject: [PATCH] fix examples times_of_interest, fix graph builder io_params, fix useless folder generation, fix argparser io_params, add hdf5_disable_slicing, set io_params=False by default

---
 hysop/core/graph/computational_node.py        |  12 +-
 hysop/core/graph/continuous.py                |   8 +-
 hysop/core/graph/graph_builder.py             |  19 +-
 hysop/operator/hdf_io.py                      |   8 +-
 hysop/problem.py                              | 452 +++++++++++++++---
 hysop/simulation.py                           |  50 +-
 hysop/tools/cache.py                          |  11 +-
 hysop/tools/debug_dumper.py                   |  16 +-
 hysop/tools/io_utils.py                       |  29 +-
 hysop_examples/example_utils.py               |  67 ++-
 hysop_examples/examples/analytic/analytic.py  |   2 +-
 .../examples/bubble/periodic_bubble.py        |   2 +-
 .../bubble/periodic_bubble_levelset.py        |   2 +-
 .../periodic_bubble_levelset_penalization.py  |   2 +-
 .../examples/bubble/periodic_jet_levelset.py  |   2 +-
 .../examples/cylinder/oscillating_cylinder.py |   2 +-
 .../examples/fixed_point/heat_equation.py     |   2 +-
 .../flow_around_sphere/flow_around_sphere.py  |   2 +-
 .../multiresolution/scalar_advection.py       |   2 +-
 .../particles_above_salt_bc.py                |   2 +-
 .../particles_above_salt_bc_3d.py             |   2 +-
 .../particles_above_salt_periodic.py          |   2 +-
 .../particles_above_salt_symmetrized.py       |   2 +-
 .../scalar_advection/scalar_advection.py      |   2 +-
 .../scalar_diffusion/scalar_diffusion.py      |   2 +-
 .../sediment_deposit/sediment_deposit.py      |   2 +-
 .../sediment_deposit_levelset.py              |   2 +-
 .../examples/shear_layer/shear_layer.py       |   2 +-
 .../examples/taylor_green/taylor_green.py     |   2 +-
 .../taylor_green/taylor_green_cpuFortran.py   |   2 +-
 30 files changed, 548 insertions(+), 164 deletions(-)

diff --git a/hysop/core/graph/computational_node.py b/hysop/core/graph/computational_node.py
index 777b6c0c7..89f7a1fe0 100644
--- a/hysop/core/graph/computational_node.py
+++ b/hysop/core/graph/computational_node.py
@@ -270,7 +270,7 @@ class ComputationalGraphNode(OperatorBase):
             # => defer initialization of base class until full initialization.
from hysop.core.graph.computational_graph import ComputationalGraph check_instance(self, ComputationalGraph) - io_params = kwds.get('io_params', True) + io_params = kwds.get('io_params', False) self.io_params = io_params self._set_io() @@ -381,7 +381,7 @@ class ComputationalGraphNode(OperatorBase): msg += '\n *field: {}'.format(topo.mpi_params) msg += '\n' raise RuntimeError(msg) - + super(ComputationalGraphNode, self).__init__(name=self.name, fields=fields, tensor_fields=tfields, @@ -1058,12 +1058,12 @@ class ComputationalGraphNode(OperatorBase): io_params = self.io_params if (io_params is None): - msg = 'io_params was never set for operator {}.'.format(self.name) + msg = 'io_params was never set for operator {}, please pass io_params to dump_inputs().'.format(self.name) raise RuntimeError(msg) - frequency = first_not_None(frequency, io_params.frequency) + frequency = first_not_None(frequency, io_params.frequency) fileformat = first_not_None(fileformat, io_params.fileformat) - io_leader = first_not_None(io_leader, io_params.io_leader) + io_leader = first_not_None(io_leader, io_params.io_leader) if (filename is not None): pass @@ -1075,7 +1075,7 @@ class ComputationalGraphNode(OperatorBase): io_params = IOParams(filename=filename, frequency=frequency, fileformat=fileformat, io_leader=io_leader) - + self._input_fields_to_dump.append((fields, io_params, op_kwds)) def dump_outputs(self, fields=None, io_params=None, diff --git a/hysop/core/graph/continuous.py b/hysop/core/graph/continuous.py index e6e313cd2..ae2e358bc 100755 --- a/hysop/core/graph/continuous.py +++ b/hysop/core/graph/continuous.py @@ -1,6 +1,7 @@ """Common interface for all continuous operators. """ +import os from abc import ABCMeta, abstractmethod from hysop import __PROFILE__, vprint, dprint @@ -25,7 +26,7 @@ class OperatorBase(TaggedObject): @debug def __init__(self, name, fields, tensor_fields, parameters, mpi_params=None, - io_params=True, + io_params=False, **kwds): """ Parameters @@ -104,13 +105,12 @@ class OperatorBase(TaggedObject): if (iopar is not None): if isinstance(iopar, bool): if (iopar is True): - filename='{}/{}'.format(IO.default_path(), self.name) + filename=os.path.join(IO.default_path(), self.name) self.io_params = IOParams(filename, fileformat=IO.HDF5) else: self.io_params = None elif isinstance(iopar, IOParams): - msg = 'Error, wrong file format for operator output.' 
- assert self.io_params.fileformat is IO.HDF5, msg + pass else: raise TypeError('Error, wrong type for io_params.') diff --git a/hysop/core/graph/graph_builder.py b/hysop/core/graph/graph_builder.py index e1614af36..f3810a432 100644 --- a/hysop/core/graph/graph_builder.py +++ b/hysop/core/graph/graph_builder.py @@ -381,13 +381,8 @@ class GraphBuilder(object): target_topo = self.output_fields[field] variables = {field: target_topo} - io_params = IOParams( - filename='{}_{}_out'.format(io_params.filename, field.name), - frequency=io_params.frequency, - fileformat=io_params.fileformat, - io_leader=io_params.io_leader) - op = HDF_Writer(io_params=io_params, variables=variables, - **op_kwds) + io_params = io_params.clone(filename='{}_{}_out'.format(io_params.filename, field.name)) + op = HDF_Writer(io_params=io_params, variables=variables, **op_kwds) op.initialize(topgraph_method=self.target_node.method) op.get_and_set_field_requirements() opnode = self.new_node(op, None, current_level, @@ -574,12 +569,7 @@ class GraphBuilder(object): if (input_fields_to_dump is not None): for (fields, io_params, op_kwds) in input_fields_to_dump: if (not fields) or (field in fields): - io_params = IOParams( - filename='{}_{}_in'.format(io_params.filename, - field.name), - frequency=io_params.frequency, - fileformat=io_params.fileformat, - io_leader=io_params.io_leader) + io_params = io_params.clone(filename='{}_{}_in'.format(io_params.filename, field.name)) self.dump_ifield = (io_params, op_kwds) break @@ -831,8 +821,7 @@ class GraphBuilder(object): from hysop.operator.hdf_io import HDF_Writer io_params, op_kwds = self.dump_ifield variables = {ifield: target_topo} - writer_op = HDF_Writer(io_params=io_params, variables=variables, - **op_kwds) + writer_op = HDF_Writer(io_params=io_params, variables=variables, **op_kwds) writer_op.initialize(topgraph_method=self.method) writer_op.get_and_set_field_requirements() writer_opnode = self.add_vertex(graph, writer_op) diff --git a/hysop/operator/hdf_io.py b/hysop/operator/hdf_io.py index 9631efb97..b9a13393f 100755 --- a/hysop/operator/hdf_io.py +++ b/hysop/operator/hdf_io.py @@ -187,7 +187,7 @@ class HDF_IO(ComputationalGraphOperator): super(HDF_IO, self).discretize() topo = self.input_fields.values()[0] use_local_hdf5 = (topo.cart_size == 1) - use_local_hdf5 |= (topo.proc_shape[0] == topo.cart_size) and (topo.cart_size <= 16) + use_local_hdf5 |= (topo.proc_shape[0] == topo.cart_size) and (topo.cart_size <= 16) and (not self.io_params.hdf5_disable_slicing) # XDMF JOIN do not support more than 16 arguments self.topology = topo @@ -510,14 +510,16 @@ class HDF_Writer(HDF_IO): ds = self._hdf_file.create_dataset(name, self._local_grid_resolution, dtype=npw.float64, - compression=compression) + compression=compression, + track_times=False) # required if we want to compare checksums in tests ds[...] 
= self._data_getters[name]().astype(npw.float64) elif self.use_parallel_hdf5: for name in self.dataset: ds = self._hdf_file.create_dataset(name, self._global_grid_resolution, dtype=npw.float64, - compression=compression) + compression=compression, + track_times=False) # required if we want to compare checksums in tests if (compression is None): # no need for collective here because we do not use any filter ds[self._global_compute_slices[name]] = self._data_getters[name]().astype(npw.float64) diff --git a/hysop/problem.py b/hysop/problem.py index 2ad39c92a..e11679584 100644 --- a/hysop/problem.py +++ b/hysop/problem.py @@ -1,9 +1,9 @@ from __future__ import absolute_import -import operator, os, sys, datetime, warnings, shutil, tarfile +import operator, os, sys, datetime, warnings, shutil, tarfile, uuid import numpy as np from hysop.constants import Backend, MemoryOrdering -from hysop.tools.types import first_not_None, to_tuple +from hysop.tools.types import first_not_None, to_tuple, to_list from hysop.tools.string_utils import vprint_banner from hysop.tools.contexts import Timer from hysop.tools.decorators import debug @@ -25,6 +25,8 @@ class Problem(ComputationalGraph): mpi_params = first_not_None(mpi_params, MPIParams()) # enforce mpi params for problems super(Problem, self).__init__(name=name, method=method, mpi_params=mpi_params, **kwds) self._do_check_unique_clenv = check_unique_clenv + self._checkpoint_template = None + self._checkpoint_compressor = None @debug def insert(self, *ops): @@ -141,20 +143,21 @@ class Problem(ComputationalGraph): vprint_banner('** Dry-run requested, skipping simulation. **') return - self.load_checkpoint(load_checkpoint, simu) + simu.initialize() + self.create_checkpoint_template(save_checkpoint, checkpoint_io_params) + self.load_checkpoint(load_checkpoint, checkpoint_io_params, simu) vprint('\nSolving problem...') - with Timer() as tm: while not simu.is_over: vprint() simu.print_state() self.apply(simulation=simu, dbg=dbg, **kwds) simu.advance(dbg=dbg, plot_freq=plot_freq) + self.save_checkpoint(save_checkpoint, checkpoint_io_params, simu) if report_freq and (simu.current_iteration % report_freq) == 0: self.profiler_report() - self.save_checkpoint(save_checkpoint, checkpoint_io_params, simu) avg_time = main_comm.allreduce(tm.interval) / main_size msg = ' Simulation took {} ({}s)' @@ -174,20 +177,69 @@ class Problem(ComputationalGraph): def final_report(self): self.profiler_report() + + @debug + def finalize(self): + vprint('Finalizing problem...') + super(Problem, self).finalize() + if ((self._checkpoint_template is not None) + and os.path.exists(self._checkpoint_template) + and self.mpi_params.rank==0): + try: + shutil.rmtree(self._checkpoint_template) + except OSError: + pass + @debug - def load_checkpoint(self, load_checkpoint, simu): + def load_checkpoint(self, load_checkpoint, checkpoint_io_params, simu): if (load_checkpoint is None): - simu.initialize() return - - vprint('\nLoading problem checkpoint from \'{}\'...'.format(load_checkpoint)) + + vprint('\n>Loading problem checkpoint from \'{}\'...'.format(load_checkpoint)) if not os.path.exists(load_checkpoint): msg='Failed to load checkpoint \'{}\' because the file does not exist.' 
             raise RuntimeError(msg.format(load_checkpoint))
+
+        mpi_params = self.mpi_params
+        comm = mpi_params.comm
+        io_leader = checkpoint_io_params.io_leader
+        is_io_leader = (io_leader == self.mpi_params.rank)
+        start = Wtime()
+
+        if os.path.isfile(load_checkpoint):
+            if load_checkpoint.endswith('.tar'):
+                if is_io_leader:
+                    load_checkpoint_dir = os.path.join(os.path.dirname(load_checkpoint),
+                                                       os.path.basename(load_checkpoint).replace('.tar', ''))
+                    while os.path.exists(load_checkpoint_dir):
+                        # ok, use another directory name to avoid data loss...
+                        load_checkpoint_dir = os.path.join(os.path.dirname(load_checkpoint),
+                                                           '{}'.format(uuid.uuid4().hex))
+                    tf = tarfile.open(load_checkpoint, mode='r')
+                    tf.extractall(path=load_checkpoint_dir)
+                else:
+                    load_checkpoint_dir = None
+                load_checkpoint_dir = comm.bcast(load_checkpoint_dir, root=io_leader)
+                should_remove_dir = True
+            else:
+                msg='Can only load a checkpoint with a .tar extension.'
+                raise NotImplementedError(msg)
+        else:
+            load_checkpoint_dir = load_checkpoint
+            should_remove_dir = False
+
+        # here we want hysop to crash on unsuccessful import
+        self._import_checkpoint(load_checkpoint_dir, checkpoint_io_params, simu)
 
-        raise NotImplementedError
+        if is_io_leader and should_remove_dir:
+            shutil.rmtree(load_checkpoint_dir)
+
+        ellapsed = Wtime() - start
+        msg=' > Successfully imported checkpoint in {}.'
+        vprint(msg.format(time2str(ellapsed)))
+
     @debug
     def save_checkpoint(self, save_checkpoint, checkpoint_io_params, simu):
         if (save_checkpoint is None):
@@ -199,12 +251,29 @@
 
         if not checkpoint_io_params.should_dump(simu):
             return
-
-        save_checkpoint_tar = save_checkpoint+'.tar'
+
+        io_leader = checkpoint_io_params.io_leader
+        is_io_leader = (io_leader == self.mpi_params.rank)
+        start = Wtime()
+
+        # Checkpoint is first exported as a directory containing a hierarchy of arrays (field and parameter data + metadata).
+        # This folder is then tarred (without any form of compression) so that a checkpoint consists of a single movable file.
+        # Data is already compressed during data export by the zarr module, using the blosc compressor (snappy, clevel=3).
+        assert save_checkpoint.endswith('.tar')
+        save_checkpoint_tar = save_checkpoint
+        if is_io_leader:
+            save_checkpoint_dir = os.path.join(os.path.dirname(save_checkpoint),
+                                               os.path.basename(save_checkpoint).replace('.tar', ''))
+            while os.path.exists(save_checkpoint_dir):
+                # ok, use another directory name to avoid data loss...
+                save_checkpoint_dir = os.path.join(os.path.dirname(save_checkpoint),
+                                                   '{}'.format(uuid.uuid4().hex))
+        else:
+            save_checkpoint_dir = None
+        save_checkpoint_dir = self.mpi_params.comm.bcast(save_checkpoint_dir, root=io_leader)
+        del save_checkpoint
 
         vprint('>Exporting problem checkpoint to \'{}\':'.format(save_checkpoint_tar))
-        is_io_leader = (checkpoint_io_params.io_leader == self.mpi_params.rank)
-        start = Wtime();
 
         # create a backup of last checkpoint in the case things go wrong
         if is_io_leader and os.path.exists(save_checkpoint_tar):
@@ -217,63 +286,55 @@
 
         # try to create the checkpoint directory, this is a collective MPI operation
         try:
-            success, reason, nbytes = self._dump_checkpoint(save_checkpoint, checkpoint_io_params, simu)
+            success, reason, nbytes = self._export_checkpoint(save_checkpoint_dir, checkpoint_io_params, simu)
         except Exception as e:
             raise
             success = False
             reason = str(e)
         success = main_comm.allreduce(int(success)) == main_comm.size
 
-        # Compress checkpoint directory to tar (easier to copy between clusters)
-        # Note that there is not effective compression here, zarr already compressed field/param data
-        if success and is_io_leader and os.path.isdir(save_checkpoint):
+        # Compress checkpoint directory to tar (easier to copy/move between clusters)
+        # Note that there is no effective compression here, zarr already compressed field/param data
+        if success and is_io_leader and os.path.isdir(save_checkpoint_dir):
             try:
                 with tarfile.open(save_checkpoint_tar, 'w') as tf:
-                    for (root, dirs, files) in os.walk(save_checkpoint):
+                    for (root, dirs, files) in os.walk(save_checkpoint_dir):
                         for f in files:
                             fpath = os.path.join(root, f)
-                            tf.add(fpath, arcname=fpath.replace(save_checkpoint+os.path.sep,''))
+                            tf.add(fpath, arcname=fpath.replace(save_checkpoint_dir+os.path.sep,''))
                 if os.path.isfile(save_checkpoint_tar):
-                    shutil.rmtree(save_checkpoint)
+                    shutil.rmtree(save_checkpoint_dir)
                 else:
-                    raise RuntimeError('Could not tar checkpoint data.')
+                    raise RuntimeError('Could not tar checkpoint datadir.')
                 ellapsed = Wtime() - start
                 effective_nbytes = os.path.getsize(save_checkpoint_tar)
                 compression_ratio = max(1.0, float(nbytes)/effective_nbytes)
-                msg=' > Successfully dumped checkpoint in {} with a compression ratio of {:.1f} ({}).'
+                msg=' > Successfully exported checkpoint in {} with a compression ratio of {:.1f} ({}).'
                 vprint(msg.format(time2str(ellapsed), compression_ratio, bytes2str(effective_nbytes)))
             except Exception as e:
                 success = False
                 reason = str(e)
         success = main_comm.allreduce(int(success)) == main_comm.size
-
+        if success:
+            if (backup_checkpoint_tar is not None) and os.path.isfile(backup_checkpoint_tar) and is_io_leader:
+                os.remove(backup_checkpoint_tar)
             return
 
         from hysop.tools.warning import HysopDumpWarning
-        msg='Failed to dump checkpoint because: {}.'.format(reason)
+        msg='Failed to export checkpoint because: {}.'.format(reason)
         warnings.warn(msg, HysopDumpWarning)
 
         # Something went wrong (I/O error or other) so we rollback to previous checkpoint (if there is one)
-        vprint(' | An error occured during checkpoint creation, rolling back to previous one...')
+        vprint(' | An error occurred during checkpoint creation, rolling back to previous checkpoint...')
         if is_io_leader:
-            if os.path.exists(save_checkpoint):
-                shutil.rmtree(save_checkpoint)
+            if os.path.exists(save_checkpoint_dir):
+                shutil.rmtree(save_checkpoint_dir)
             if os.path.exists(save_checkpoint_tar):
                 os.remove(save_checkpoint_tar)
             if (backup_checkpoint_tar is not None) and os.path.exists(backup_checkpoint_tar):
                 os.rename(backup_checkpoint_tar, save_checkpoint_tar)
-
-    @staticmethod
-    def format_zarr_key(k):
-        # note keys that contains the special characters '/' and '\' do not work well with zarr
-        # so we need to replace it by another character such as '_'.
-        # We cannot use utf8 characters such as u+2215 (division slash).
-        if (k is None):
-            return None
-        return k.replace('/', '_').replace('\\', '_')
-
     def create_checkpoint_template(self, save_checkpoint, checkpoint_io_params):
         # Create groups of arrays on disk (only hierarchy and array metadata is stored in the template)
         # /!\ ZipStores are not safe from multiple processes so we use a DirectoryStore
@@ -281,21 +342,32 @@
 
         if (save_checkpoint is None):
             return
-
-        checkpoint_template = save_checkpoint + '.template'
-        self._checkpoint_template = checkpoint_template
-        del save_checkpoint
-
-        vprint('\n>Creating checkpoint template as \'{}\'...'.format(checkpoint_template))
+
         mpi_params = self.mpi_params
         comm = mpi_params.comm
         io_leader = checkpoint_io_params.io_leader
         is_io_leader = (io_leader == mpi_params.rank)
+        assert save_checkpoint.endswith('.tar'), save_checkpoint
+        if is_io_leader:
+            checkpoint_template = os.path.join(os.path.dirname(save_checkpoint),
+                                               os.path.basename(save_checkpoint).replace('.tar', '.template'))
+            while os.path.exists(checkpoint_template):
+                # ok, use another directory name to avoid data loss...
+                checkpoint_template = os.path.join(os.path.dirname(save_checkpoint),
+                                                   '{}'.format(uuid.uuid4().hex))
+        else:
+            checkpoint_template = None
+        checkpoint_template = comm.bcast(checkpoint_template, root=io_leader)
+        self._checkpoint_template = checkpoint_template
+        del save_checkpoint
+
+        vprint('\n>Creating checkpoint template as \'{}\'...'.format(checkpoint_template))
 
         # Array block data compressor
         from numcodecs import blosc, Blosc
         blosc.use_threads = (self.mpi_params.size == 1) # disable threads for multiple processes (can deadlock)
-        compressor = Blosc(cname='snappy', clevel=4, shuffle=Blosc.BITSHUFFLE)
+        compressor = Blosc(cname='snappy', clevel=3, shuffle=Blosc.BITSHUFFLE)
+        self._checkpoint_compressor = compressor
 
         # Create a directory layout as a file on shared filesystem
         import zarr
@@ -306,16 +378,21 @@
             if os.path.exists(checkpoint_template):
                 shutil.rmtree(checkpoint_template)
             store = zarr.DirectoryStore(path=checkpoint_template)
-            root = zarr.open_group(store=store, mode='w', path='')
+            root = zarr.open_group(store=store, mode='w', path='data')
             params = root.create_group('params')
             fields = root.create_group('fields')
+            simulation = root.create_group('simulation')
         else:
             store = None
             root = None
             params = None
             fields = None
-
-        # generate parameter arrays
+            simulation = None
+
+        # Generate parameter arrays
+        # Here we expect that each process stores parameters that are in sync
+        # For each parameter we assume that the same values are broadcast to all processes
+        # even if it is not enforced by the library (should cover most current use cases...)
         for param in sorted(self.parameters, key=operator.attrgetter('name')):
             if not is_io_leader:
                 continue
@@ -330,10 +407,11 @@
                 array.attrs['kind'] = param.__class__.__name__
                 nbytes += value.nbytes
             else:
-                msg = 'Cannot dump parameter of type {}.'.format(param.__class__.__name__)
+                msg = 'Cannot export parameter of type {}.'.format(param.__class__.__name__)
                 raise NotImplementedError(msg)
 
-        # generate discrete field arrays
+        # Generate discrete field arrays
+        # Here we assume that each process has a non-empty chunk of data
         for field in sorted(self.fields, key=operator.attrgetter('name')):
             # we do not care about fields discretized only on temporary fields
             if all(df.is_tmp for df in field.discrete_fields.values()):
@@ -449,7 +527,7 @@
 
             pass
 
-    def _dump_checkpoint(self, save_checkpoint, checkpoint_io_params, simu):
+    def _export_checkpoint(self, save_checkpoint_dir, checkpoint_io_params, simu):
         # Given a template, fill field and parameters data from all processes.
         # returns (bool, msg) where bool is True on success
 
@@ -461,20 +539,27 @@
         if not os.path.exists(self._checkpoint_template):
             # checkpoint template may have been deleted by user during simulation
             self.create_checkpoint_template(save_checkpoint, checkpoint_io_params)
+        compressor = self._checkpoint_compressor
+
         if is_io_leader:
-            if os.path.exists(save_checkpoint):
-                shutil.rmtree(save_checkpoint)
-            shutil.copytree(self._checkpoint_template, save_checkpoint)
+            if os.path.exists(save_checkpoint_dir):
+                shutil.rmtree(save_checkpoint_dir)
+            shutil.copytree(self._checkpoint_template, save_checkpoint_dir)
         comm.Barrier()
 
         #Every process now loads the same dataset template
         import zarr
-        store = zarr.DirectoryStore(save_checkpoint)
-        root = zarr.open_group(store=store, mode='r+', synchronizer=None, path='')
+        store = zarr.DirectoryStore(save_checkpoint_dir)
+        root = zarr.open_group(store=store, mode='r+', synchronizer=None, path='data')
         nbytes = root.attrs['nbytes']
         fields_group = root['fields']
         params_group = root['params']
+        simu_group = root['simulation']
+
+        # Export simulation data
+        if is_io_leader:
+            simu.save_checkpoint(simu_group, mpi_params, checkpoint_io_params, compressor=compressor)
 
         # Currently there is no distributed parameter capabilities so io_leader has to dump all parameters
         if is_io_leader:
@@ -527,7 +612,7 @@
                     or (mesh.is_at_right_boundary*(mesh.proc_shape>1)).any())
                 local_data = dfield.compute_data[0].get()
                 global_slices = mesh.global_compute_slices
-                array[global_slices] = local_data
+                array[global_slices] = local_data # ok, every process writes to an independent data block
 
         # Some zarr store formats require a final close to flush data
         try:
@@ -536,14 +621,249 @@
             pass
 
         return True, None, nbytes
+
+
+    def _import_checkpoint(self, load_checkpoint_dir, checkpoint_io_params, simu, strict_check=False):
+        if not os.path.isdir(load_checkpoint_dir):
+            msg='Cannot find directory \'{}\'.'.format(load_checkpoint_dir)
+            raise RuntimeError(msg)
+
+        # On data import, there is no need to synchronize read-only arrays
+        # so we are good with multiple processes reading overlapping data blocks
+
+        import zarr
+        store = zarr.DirectoryStore(load_checkpoint_dir)
+        self.mpi_params.comm.Barrier()
+
+        try:
+            root = zarr.open_group(store=store, mode='r', synchronizer=None, path='data')
+            params = root['params']
+            fields = root['fields']
+            simulation = root['simulation']
+        except:
+            msg='An error occurred during checkpoint import.'
+            vprint(msg)
+            vprint()
+            raise
+
+        def raise_error(msg):
+            msg = ' | error: {}\n'.format(msg)
+            vprint(msg)
+            msg = 'FATAL ERROR: Failed to import checkpoint, check logs for more information.'
+            raise RuntimeError(msg)
+        def raise_warning(msg):
+            msg = ' | warning: {}'.format(msg)
+            vprint(msg)
+
+        if strict_check:
+            raise_warning = raise_error
+
+        def load_array_data(array, dfield):
+            mesh = dfield.mesh._mesh
+            assert np.equal(array.shape, mesh.grid_resolution).all()
+
+            # compare attributes but ignore name because this can be annoying
+            attr_names = ('left boundaries', 'right boundaries', 'ghost layers', 'process shape', 'datatype')
+            array_attributes = (array.attrs['lboundaries'], array.attrs['rboundaries'], array.attrs['ghosts'],
+                                array.attrs['proc_shape'], array.dtype)
+            dfield_attributes = (list(map(str, mesh.global_lboundaries)), list(map(str, mesh.global_rboundaries)),
+                                 list(mesh.ghosts), list(mesh.proc_shape), dfield.dtype)
+
+            for (name, lhs, rhs) in zip(attr_names, array_attributes, dfield_attributes):
+                if lhs==rhs:
+                    continue
+                msg='{} do not match with checkpointed field {}, loaded {} {} but expected {}.'
+                msg=msg.format(name, dfield.field.name, name, lhs, rhs)
+                raise_warning(msg)
+
+            global_slices = mesh.global_compute_slices
+            data = np.asarray(array[global_slices], dtype=dfield.dtype)
+            dfield.compute_data[0][...] = data
+            dfield.exchange_ghosts()
+
+        # Import parameters, hopefully parameter names match the ones in the checkpoint
+        msg = ' | importing parameters...'
+        vprint(msg)
+        for param in sorted(self.parameters, key=operator.attrgetter('name')):
+            key = self.format_zarr_key(param.name)
+
+            if (key not in params):
+                msg='Checkpoint directory \'{}\' does not contain any data for parameter {}.'
+                msg=msg.format(load_checkpoint_dir, param.name)
+                raise_error(msg)
+
+            array = params[key]
+
+            if array.attrs['kind'] != param.__class__.__name__:
+                msg='Parameter kind does not match with checkpointed parameter {}, loaded kind {} but expected {}.'
+                msg=msg.format(param.name, array.attrs['kind'], param.__class__.__name__)
+                raise_error(msg)
+
+            if isinstance(param, (ScalarParameter, TensorParameter, BufferParameter)):
+                value = param._value
+
+                if (array.shape != value.shape):
+                    msg='Parameter shape does not match with checkpointed parameter {}, loaded shape {} but expected {}.'
+                    msg=msg.format(param.name, array.shape, value.shape)
+                    raise_error(msg)
+
+                if (array.dtype != value.dtype):
+                    msg='Parameter datatype does not match with checkpointed parameter {}, loaded dtype {} but expected {}.'
+                    msg=msg.format(param.name, array.dtype, value.dtype)
+                    raise_warning(msg)
+
+                value[...] = array[...]
+            else:
+                msg = 'Cannot import parameter of type {}.'.format(param.__class__.__name__)
+                raise NotImplementedError(msg)
+
+        # Import simulation data after parameters are up to date
+        simu.load_checkpoint(simulation, self.mpi_params, checkpoint_io_params, strict_check)
+
+        # Import discrete fields, this is a bit more tricky because topologies or simply topology
+        # names can change. Moreover, there is currently no guarantee that the same operator graph is
+        # generated for the exact same problem configuration each time. We just emit user warnings
+        # when the topologies we match do not exactly match the checkpointed ones.
+        for field in sorted(self.fields, key=operator.attrgetter('name')):
+            domain = field.domain._domain
+
+            # we do not care about fields discretized only on temporary fields
+            if all(df.is_tmp for df in field.discrete_fields.values()):
+                continue
+
+            msg = ' | importing field {}...'.format(field.pretty_name)
+            vprint(msg)
+
+            field_key = self.format_zarr_key(field.name)
+            if (field_key not in fields):
+                msg='Checkpoint directory \'{}\' does not contain any data for field {}.'
+                msg=msg.format(load_checkpoint_dir, field.name)
+                raise_error(msg)
+
+            dfields = fields[field_key]
+
+            # check that domain matches
+            if dfields.attrs['domain'] != domain.__class__.__name__:
+                msg='Domain kind does not match with checkpointed field {}, loaded kind {} but expected {}.'
+                msg=msg.format(field.name, dfields.attrs['domain'], domain.__class__.__name__)
+                raise_error(msg)
+            if dfields.attrs['dim'] != domain.dim:
+                msg='Domain dim does not match with checkpointed field {}, loaded dim {} but expected {}.'
+                msg=msg.format(field.name, dfields.attrs['dim'], domain.dim)
+                raise_error(msg)
+            if dfields.attrs['origin'] != to_list(domain.origin):
+                msg='Domain origin does not match with checkpointed field {}, loaded origin {} but expected {}.'
+                msg=msg.format(field.name, dfields.attrs['origin'], domain.origin)
+                raise_error(msg)
+            if dfields.attrs['end'] != to_list(domain.end):
+                msg='Domain end does not match with checkpointed field {}, loaded end {} but expected {}.'
+                msg=msg.format(field.name, dfields.attrs['end'], domain.end)
+                raise_error(msg)
+            if dfields.attrs['length'] != to_list(domain.length):
+                msg='Domain length does not match with checkpointed field {}, loaded length {} but expected {}.'
+                msg=msg.format(field.name, dfields.attrs['length'], domain.length)
+                raise_error(msg)
+
+            for (k, topo) in enumerate(sorted(field.discrete_fields, key=operator.attrgetter('full_tag'))):
+                dfield = field.discrete_fields[topo]
+                mesh = topo.mesh._mesh
+
+                # we do not care about temporary fields
+                if dfield.is_tmp:
+                    continue
+
+                # for now we just handle CartesianDiscreteScalarFields.
+                if not isinstance(dfield, CartesianDiscreteScalarField):
+                    raise NotImplementedError
+
+                # first we need to exactly match global grid resolution
+                candidates = tuple(filter(lambda d: np.equal(d.shape, mesh.grid_resolution).all(), dfields.values()))
+                if len(candidates)==0:
+                    msg='Could not find any topology with shape {} for field {}, available discretizations are: {}.'
+                    msg=msg.format(to_tuple(mesh.grid_resolution), field.name,
+                                   ', '.join(set(str(d.shape) for d in dfields.values())))
+                    raise_error(msg)
+                elif len(candidates)==1:
+                    load_array_data(candidates[0], dfield)
+                    continue
+
+                # Here multiple topologies have the exact same grid resolution so we try to match boundary conditions
+                old_candidates = candidates
+                candidates = tuple(filter(lambda d: d.attrs['lboundaries'] == to_tuple(map(str, mesh.global_lboundaries)), candidates))
+                candidates = tuple(filter(lambda d: d.attrs['rboundaries'] == to_tuple(map(str, mesh.global_rboundaries)), candidates))
+                if len(candidates)==0:
+                    # ok, the user changed the boundary conditions, we ignore boundary condition information
+                    candidates = old_candidates
+                elif len(candidates)==1:
+                    load_array_data(candidates[0], dfield)
+                    continue
+
+                # From now on multiple topologies have the same grid resolution and boundary conditions
+                # We try to match exact ghost count, the user likely did not change the order of the methods.
+                old_candidates = candidates
+                candidates = tuple(filter(lambda d: d.attrs['ghosts'] == to_tuple(mesh.ghosts), candidates))
+                if len(candidates)==0:
+                    # ok, the user made a change that affected ghosts, we ignore the ghost condition
+                    candidates = old_candidates
+                elif len(candidates)==1:
+                    load_array_data(candidates[0], dfield)
+                    continue
+
+                # Now we try to differentiate by using zero ghost info (ghosts may change with method order, but zero-ghost is very specific)
+                # Topologies containing zero ghost layers usually target Fortran topologies for FFT operators or methods that do not require any ghosts.
+                old_candidates = candidates
+                candidates = tuple(filter(lambda d: (np.equal(d.attrs['ghosts'],0) == (mesh.ghosts==0)).all(), candidates))
+                if len(candidates)==0:
+                    # ok, we ignore the zero-ghost condition
+                    candidates = old_candidates
+                elif len(candidates)==1:
+                    load_array_data(candidates[0], dfield)
+                    continue
+
+                # Now we try to match exact topology shape (the MPICart grid of processes)
+                # We try this late because the user may run the simulation again with a different number of processes.
+                old_candidates = candidates
+                candidates = tuple(filter(lambda d: d.attrs['proc_shape'] == to_tuple(mesh.proc_shape), candidates))
+                if len(candidates)==0:
+                    # ok, we ignore the proc shape
+                    candidates = old_candidates
+                elif len(candidates)==1:
+                    load_array_data(candidates[0], dfield)
+                    continue
+
+                # Now we try to differentiate by using topo splitting info (axes on which data is distributed)
+                # This again is very specific and can differentiate topologies used for spectral transforms.
+                old_candidates = candidates
+                candidates = tuple(filter(lambda d: (np.greater(d.attrs['proc_shape'],1) == (mesh.proc_shape>1)).all(), candidates))
+                if len(candidates)==0:
+                    # ok, we ignore the MPI data splitting condition
+                    candidates = old_candidates
+                elif len(candidates)==1:
+                    load_array_data(candidates[0], dfield)
+                    continue
+
+                # Ok now, our last hope is to match the discrete field name
+                old_candidates = candidates
+                candidates = tuple(filter(lambda d: d.attrs['name'] == dfield.name, candidates))
+                if len(candidates)==0:
+                    # ok, we ignore the name
+                    candidates = old_candidates
+                elif len(candidates)==1:
+                    load_array_data(candidates[0], dfield)
+                    continue
+
+                assert len(candidates) > 1, 'Something went wrong.'
+
+                msg='Could not discriminate checkpointed topologies for field {}, got {} candidates remaining.'
+                msg=msg.format(field.name, len(candidates))
+                raise_error(msg)
+
+
+    @staticmethod
+    def format_zarr_key(k):
+        # note that keys containing the special characters '/' and '\' do not work well with zarr,
+        # so we need to replace them with another character such as '_'.
+        # We cannot use utf8 characters such as u+2215 (division slash).
+ if (k is None): + return None + return k.replace('/', '_').replace('\\', '_') - @debug - def finalize(self): - vprint('Finalizing problem...') - super(Problem, self).finalize() - if (hasattr(self, '_checkpoint_template') - and (self._checkpoint_template is not None) - and os.path.exists(self._checkpoint_template) - and self.mpi_params.rank==0): - shutil.rmtree(self._checkpoint_template) diff --git a/hysop/simulation.py b/hysop/simulation.py index e133082df..917d3f2e4 100644 --- a/hysop/simulation.py +++ b/hysop/simulation.py @@ -28,17 +28,17 @@ Usage io.apply(s) """ +import numpy as np from abc import ABCMeta, abstractmethod from hysop import dprint, vprint from hysop.constants import HYSOP_REAL from hysop.deps import sys, os from hysop.parameters.scalar_parameter import ScalarParameter -from hysop.tools.types import first_not_None, to_set +from hysop.tools.types import first_not_None, to_set, check_instance from hysop.tools.numpywrappers import npw from hysop.tools.io_utils import IO, IOParams from hysop.tools.string_utils import vprint_banner from hysop.core.mpi import main_rank, main_comm -import numpy as np class Simulation(object): @@ -238,11 +238,11 @@ class Simulation(object): self.is_over = True return - self._comm.Barrier() # should - self.t.set_value(self.tkp1) + self._comm.Barrier() + self.update_time(self.tkp1) all_t = self._comm.gather(self.t(), root=0) - if self._rank == 0: + if (self._rank == 0): assert np.allclose(all_t, all_t[0]) self.is_time_of_interest = False @@ -255,7 +255,7 @@ class Simulation(object): if abs(self.tkp1 - self.end) <= self.tol: self._next_is_last = True elif (self.target_time_of_interest is not None) and \ - (self.tkp1 >= self.target_time_of_interest): + (self.tkp1+self.tol >= self.target_time_of_interest): msg = '** Next iteration is a time of interest, clamping dt to achieve t={}. 
**' msg = msg.format(self.target_time_of_interest) vprint() @@ -319,6 +319,9 @@ class Simulation(object): """ self.dt.set_value(dt) + + def update_time(self, t): + self.t.set_value(t) def initialize(self): """(Re)set simulation to initial values @@ -326,7 +329,7 @@ class Simulation(object): """ tstart, tend = self.start, self.end times_of_interest = self.times_of_interest - + self.toi_counter = 0 self.next_time_of_interest() self.is_time_of_interest = False @@ -343,11 +346,10 @@ class Simulation(object): dt0 = min(self._dt0, self.target_time_of_interest-tstart) else: dt0 = self._dt0 - - self.t.set_value(tstart) + + self.update_time(tstart) self.update_time_step(dt0) self.tkp1 = tstart + self.time_step - assert self.tkp1 <= tend if abs(self.tkp1 - self.end) <= self.tol: @@ -424,6 +426,34 @@ class Simulation(object): params = _params self._parameters_to_write.append((io_params, params, kwds)) + def save_checkpoint(self, datagroup, checkpoint_mpi_params, checkpoint_io_params, compressor): + import zarr + check_instance(datagroup, zarr.hierarchy.Group) + is_io_leader = (checkpoint_mpi_params.rank == checkpoint_io_params.io_leader) + if is_io_leader: + # we need to export simulation parameter values because they + # may not be part of global problem parameters + datagroup.attrs['t'] = float(self.t()) + datagroup.attrs['dt'] = float(self.dt()) + for attrname in ('current_iteration', 'tkp1', 'time'): + data = getattr(self, attrname) + try: + data = data.item() + except AttributeError: + pass + datagroup.attrs[attrname] = data + + def load_checkpoint(self, datagroup, checkpoint_mpi_params, checkpoint_io_params, strict_check): + import zarr + check_instance(datagroup, zarr.hierarchy.Group) + self.times_of_interest = tuple(sorted(filter(lambda t: t>=datagroup.attrs['time'], self.times_of_interest))) + self.toi_counter = 0 + self.next_time_of_interest() + self.t._value[...] = datagroup.attrs['t'] # silent parameter update + self.dt._value[...] 
= datagroup.attrs['dt'] # silent parameter update
+        for attrname in ('current_iteration', 'tkp1', 'time'):
+            setattr(self, attrname, datagroup.attrs[attrname])
+
     def __str__(self):
         s = "Simulation parameters : "
         s += "from " + str(self.start) + ' to ' + str(self.end)
diff --git a/hysop/tools/cache.py b/hysop/tools/cache.py
index 6f3d98899..44a998434 100644
--- a/hysop/tools/cache.py
+++ b/hysop/tools/cache.py
@@ -27,11 +27,12 @@ def lock_file(filepath, mode, compressed=True,
     _dir = os.path.dirname(filepath)
 
     try:
-        try:
-            os.makedirs(_dir)
-        except OSError as e:
-            if (e.errno != errno.EEXIST):
-                raise
+        if not os.path.isdir(_dir):
+            try:
+                os.makedirs(_dir)
+            except OSError as e:
+                if (e.errno != errno.EEXIST):
+                    raise
         if not os.path.exists(filepath):
             open(filepath, 'a').close()
         with portalocker.Lock(filename=filepath, timeout=timeout, mode=mode,
diff --git a/hysop/tools/debug_dumper.py b/hysop/tools/debug_dumper.py
index a7b51459a..868179b3f 100644
--- a/hysop/tools/debug_dumper.py
+++ b/hysop/tools/debug_dumper.py
@@ -12,11 +12,15 @@ from hysop.fields.discrete_field import DiscreteScalarFieldView
 
 class DebugDumper(object):
-    def __init__(self, name, path='/tmp/hysop/debug', force_overwrite=False,
+    def __init__(self, name, path, force_overwrite=False,
                  enable_on_op_apply=False, dump_precision=10,
                  comm=MPI.COMM_WORLD, io_leader=0):
-        directory = path+'/'+name
-        blobs_directory = directory + '/data'
+        assert isinstance(name, str), name
+        assert isinstance(path, str), path
+        directory = os.path.join(path, name)
+        blobs_directory = os.path.join(directory, 'data')
+        if not os.path.isdir(blobs_directory) and comm.rank==0:
+            os.makedirs(blobs_directory)
 
         self.name = name
         self.directory = directory
@@ -61,11 +65,11 @@ class DebugDumper(object):
         return '{:<4} {:<10} {:<20} {:<40} {:<20} {:<20} {:<20} {:<20} {:<20} {:<20} {}'.format(
                 id_, iteration, time, tag, min_, max_, mean, variance, dtype, shape, description)
 
-    def print_header(self):
+    def print_header(self, with_datetime=False):
         now = datetime.datetime.now()
-        self.runfile.write('DEBUG DUMP {} ({})\n'.format(
+        self.runfile.write('DEBUG DUMP {}{}\n'.format(
             self.name,
-            now.strftime("%Y-%m-%d %H:%M")))
+            ' ({})'.format(now.strftime("%Y-%m-%d %H:%M")) if with_datetime else ''))
         self.runfile.write(self.lformat('id', 'iteration', 'time', 'tag', 'min', 'max',
             'mean', 'variance', 'dtype', 'shape', 'description'))
diff --git a/hysop/tools/io_utils.py b/hysop/tools/io_utils.py
index 74fadac74..16d197b31 100755
--- a/hysop/tools/io_utils.py
+++ b/hysop/tools/io_utils.py
@@ -236,7 +236,8 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath',
                                        'io_leader', 'visu_leader', 'with_last',
                                        'enable_ram_fs', 'force_ram_fs',
                                        'dump_is_temporary', 'postprocess_dump', 'append',
-                                       'hdf5_disable_compression', 'disk_filepath', 'kwds'])):
+                                       'hdf5_disable_compression', 'hdf5_disable_slicing',
+                                       'disk_filepath', 'kwds'])):
     """
     A struct to handle I/O files parameters
 
@@ -278,6 +279,9 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath',
     hdf5_disable_compression: bool
         Disable compression for HDF5 outputs (when available).
         Can be used to accelerate in RAM postprocessing.
+    hdf5_disable_slicing: bool
+        Disable slicing for HDF5 outputs (when available).
+        May reduce performance but avoids HDF5 file fragmentation.
append : bool, optional Tell if appended (on xmf files, when using hdf format) kwds: dict @@ -298,8 +302,8 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', io_leader=0, visu_leader=0, with_last=False, enable_ram_fs=False, force_ram_fs=False, dump_is_temporary=False, postprocess_dump=None, - hdf5_disable_compression=False, append=False, - **kwds): + hdf5_disable_compression=False, hdf5_disable_slicing=False, + append=False, **kwds): dump_tstart = first_not_None(dump_tstart, -np.inf) dump_tend = first_not_None(dump_tend, +np.inf) @@ -320,6 +324,7 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', check_instance(dump_is_temporary, bool) check_instance(postprocess_dump, str, allow_none=True) check_instance(hdf5_disable_compression, bool) + check_instance(hdf5_disable_slicing, bool) check_instance(append, bool) if dump_func: assert callable(dump_func), "given function must be callable" @@ -389,15 +394,16 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', io_leader, visu_leader, with_last, enable_ram_fs, force_ram_fs, dump_is_temporary, postprocess_dump, append, - hdf5_disable_compression, disk_filepath, kwds) + hdf5_disable_compression, hdf5_disable_slicing, + disk_filepath, kwds) def should_dump(self, simulation): - if self.dump_func is not None: + if (self.dump_func is not None): return self.dump_func(simulation) frequency = self.frequency t = simulation.t() dump = (frequency >= 0) and (self.with_last and simulation._next_is_last) - if (t < self.dump_tstart) or (t > self.dump_tend): + if (t < self.dump_tstart - simulation.tol) or (t > self.dump_tend + simulation.tol): return dump if (frequency >= 0) and simulation.is_time_of_interest: if isinstance(t, np.float32): @@ -411,12 +417,14 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', return dump def clone(self, **kwds): - keys = ('filename', 'frequency', 'fileformat', + keys = ('filename', + 'frequency', 'fileformat', 'dump_times', 'dump_tstart', 'dump_tend', 'dump_func', 'io_leader', 'visu_leader', 'with_last', 'enable_ram_fs', 'force_ram_fs', - 'dump_is_temporary', 'postprocess_dump', 'append', - 'hdf5_disable_compression', 'kwds') + 'dump_is_temporary', 'postprocess_dump', + 'hdf5_disable_compression', 'hdf5_disable_slicing', + 'append', 'kwds') diff = set(kwds.keys()).difference(keys) if diff: @@ -432,7 +440,6 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', all_kwds[k] = kwds.get(k, getattr(self, k)) all_kwds['filepath'] = kwds.get('filepath', getattr(self, 'disk_filepath')) - return IOParams(**all_kwds) @property @@ -459,6 +466,7 @@ force_ram_fs: {} dump_is_tmp: {} post_process: {} hdf5_no_compr: {} +hdf5_no_slice: {} append: {} extra_kwds: {}'''.format( self.filename, self.filepath, self.fileformat, @@ -468,6 +476,7 @@ extra_kwds: {}'''.format( self.dump_is_temporary, self.postprocess_dump, self.hdf5_disable_compression, + self.hdf5_disable_slicing, self.append, self.kwds) return prefix+('\n'+prefix).join(ss.split('\n')) diff --git a/hysop_examples/example_utils.py b/hysop_examples/example_utils.py index 5cf37c653..d2687d657 100644 --- a/hysop_examples/example_utils.py +++ b/hysop_examples/example_utils.py @@ -210,6 +210,7 @@ class HysopArgParser(argparse.ArgumentParser): assert isinstance(confirm_deletion, bool) if confirm_deletion: for ddir in dump_dirs: + # tar should be kept for checkpoint dumps self._rmfiles(ddir, 'txt') self._rmfiles(ddir, 'out') self._rmfiles(ddir, 'log') @@ -974,6 +975,8 @@ class HysopArgParser(argparse.ArgumentParser): 
         file_io.add_argument('--dump-tend', type=float, default=None,
                 dest='dump_tend',
                 help='Set global end time at which output are dumped for all IO params. Defaults to simulation end.')
+        file_io.add_argument('--dump-last', action='store_true', dest='dump_last',
+                help='If set, always dump on last simulation iteration.')
         file_io.add_argument('--dump-postprocess', type=str, default=None,
                 dest='postprocess_dump',
                 help=('Run a custom command after I/O dump: '
@@ -992,6 +995,11 @@
         file_io.add_argument('--hdf5-disable-compression', default=False, action='store_true',
                 dest='hdf5_disable_compression',
                 help='Disable compression for HDF5 outputs (when available).')
+        file_io.add_argument('--hdf5-disable-slicing', default=False, action='store_true',
+                dest='hdf5_disable_slicing',
+                help=('Disable HDF5 slicing that is obtained with XDMF JOIN. '
+                      'May reduce performance when HDF5 slicing applies (<= 16 processes, slab topologies). '
+                      'Enabling this option guarantees a single HDF5 file for all processes per dump.'))
 
         # list of additional named io_params to be generated
         assert (generate_io_params is not None), generate_io_params
@@ -1003,20 +1011,20 @@
             description = ('Configure problem checkpoints I/O parameters, dumped checkpoints represent simulation states '
                            'that can be loaded back to continue the simulation later on.')
             pargs = self.add_argument_group('{} I/O'.format(pname.upper()), description=description)
-            pargs.add_argument('-L', '--load-checkpoint', default=None, const='checkpoint', nargs='?', type=str, dest='load_checkpoint',
+            pargs.add_argument('-L', '--load-checkpoint', default=None, const='checkpoint.tar', nargs='?', type=str, dest='load_checkpoint',
                     help=('Begin simulation from this checkpoint. Can be given as fullpath or as a filename relative to --checkpoint-dump-dir. '
                           'The given checkpoint has to be compatible with the problem it will be loaded to. '
                           'This will only work if parameter names, variable names, operator names, discretization and global topology information remain unchanged. '
                           'Operator ordering, boundary conditions, data ordering, data permutation and MPI layouts may be however be changed. '
-                          'Defaults to {checkpoint_output_dir}/checkpoint.zip if no filename is specified.'))
-            pargs.add_argument('-S', '--save-checkpoint', default=None, const='checkpoint', nargs='?', type=str, dest='save_checkpoint',
+                          'Defaults to {checkpoint_output_dir}/checkpoint.tar if no filename is specified.'))
+            pargs.add_argument('-S', '--save-checkpoint', default=None, const='checkpoint.tar', nargs='?', type=str, dest='save_checkpoint',
                     help=('Enable simulation checkpoints to be able to restart simulations from a specific point later on. '
                           'Can be given as fullpath or as a filename relative to --checkpoint-dump-dir. '
                           'Frequency or time of interests for checkpoints can be configured by using global FILE I/O parameters or '
                           'specific --checkpoint-dump-* arguments which takes priority over global ones. '
                           'Should not be to frequent for efficiency reasons. May be used in conjunction with --load-checkpoint, '
                           'in which case the starting checkpoint may be overwritten in the case the same path are given.
' - 'Defaults to {checkpoint_output_dir}/checkpoint.zip if no filename is specified.')) + 'Defaults to {checkpoint_output_dir}/checkpoint.tar if no filename is specified.')) else: pargs = self.add_argument_group('{} I/O'.format(pname.upper())) @@ -1042,28 +1050,34 @@ class HysopArgParser(argparse.ArgumentParser): type=float, default=None, dest='{}_dump_tstart'.format(pname), help='Set starting time at which output are dumped for IO parameter \'{}\'.'.format(pname)) + pargs.add_argument('--{}-dump-last'.format(pname), action='store_true', + dest='{}_dump_last'.format(pname), + help='If set, always dump on last simulation iteration for IO parameter \'{}\''.format(pname)) pargs.add_argument('--{}-dump-tend'.format(pname), type=float, default=None, dest='{}_dump_tend'.format(pname), help='Set end time at which output are dumped for IO parameter \'{}\'.'.format(pname)) - file_io.add_argument('--{}-dump-postprocess'.format(pname), type=str, default=None, + pargs.add_argument('--{}-dump-postprocess'.format(pname), type=str, default=None, dest='{}_postprocess_dump'.format(pname), help=('Run a custom command after {} I/O dump: '.format(pname) +'command FILENAME\n' +'See hysop/tools/postprocess_dump.sh for an example of post processing script.\n' +'{} I/O can be postprocessed directly from RAM by setting --enable-ram-fs.'.format(pname))) - file_io.add_argument('--{}-dump-is-temporary'.format(pname), default=None, action='store_true', + pargs.add_argument('--{}-dump-is-temporary'.format(pname), default=None, action='store_true', dest='{}_dump_is_temporary'.format(pname), help='Delete {} data files after callback has been executed. Best used with --enable-ram-fs and --dump-post-process.'.format(pname)) - file_io.add_argument('--{}-enable-ram-fs'.format(pname), default=None, action='store_true', + pargs.add_argument('--{}-enable-ram-fs'.format(pname), default=None, action='store_true', dest='{}_enable_ram_fs'.format(pname), help='Dump I/O directly into RAM (if possible), else fallback to --dump-dir unless --force-ram-fs has been set.') - file_io.add_argument('--{}-force-ram-fs'.format(pname), default=None, action='store_true', + pargs.add_argument('--{}-force-ram-fs'.format(pname), default=None, action='store_true', dest='{}_force_ram_fs'.format(pname), help='Dump {} I/O directly into RAM (if possible), else raise an EnvironmentError. Implies --enable-ram-fs. When enabled --dump-dir is ignored.'.format(pname)) - file_io.add_argument('--{}-hdf5-disable-compression'.format(pname), default=None, action='store_true', + pargs.add_argument('--{}-hdf5-disable-compression'.format(pname), default=None, action='store_true', dest='{}_hdf5_disable_compression'.format(pname), help='Disable compression for {} HDF5 outputs (when available).'.format(pname)) + pargs.add_argument('--{}-hdf5-disable-slicing'.format(pname), default=False, action='store_true', + dest='{}_hdf5_disable_slicing'.format(pname), + help='Disable HDF5 slicing that is obtained with XDMF JOIN for {}.'.format(pname)) setattr(file_io, '{}_io'.format(pname), pargs) file_io.add_argument('--cache-dir', type=str, default=None, @@ -1074,9 +1088,9 @@ class HysopArgParser(argparse.ArgumentParser): dest='override_cache', help='Ignore cached data.') file_io.add_argument('--debug-dump-dir', type=str, - default='{}/hysop/debug'.format(self.tmp_dir()), + default=None, dest='debug_dump_dir', - help=('Target root directory for debug dumps. Debug dumps will appear into <dump dir>/<target>.')) + help=('Target root directory for debug dumps. 
Debug dumps will appear into <dump dir>/<target>. Defaults to global hysop dump_dir.'))
         file_io.add_argument('--debug-dump-target', type=str, default=None,
                 dest='debug_dump_target',
                 help=('Tag for field debug dumps. Debug dumps will appear into <dump dir>/<target>.'))
@@ -1098,13 +1112,14 @@
         self._check_default(args, ('cache_dir', 'postprocess_dump'), str, allow_none=True)
         self._check_dir(args, 'cache_dir', allow_shared=False, allow_none=True)
         self._check_default(args, ('no_interactive', 'dump_is_temporary',
-                                   'enable_ram_fs', 'force_ram_fs', 'hdf5_disable_compression'),
+                                   'enable_ram_fs', 'force_ram_fs', 'hdf5_disable_compression', 'hdf5_disable_slicing'),
                             bool, allow_none=False)
 
         self._check_default(args, 'dump_dir', str, allow_none=False)
         self._check_default(args, 'dump_freq', int, allow_none=True)
         self._check_default(args, 'dump_period', float, allow_none=True)
         self._check_default(args, 'dump_times', tuple, allow_none=True)
+        self._check_default(args, 'dump_last', bool, allow_none=False)
         self._check_positive(args, 'dump_freq', strict=False, allow_none=False)
         self._check_dir(args, 'dump_dir', allow_shared=True, allow_none=True)
@@ -1162,9 +1177,9 @@
                 setattr(args, vname, default_value)
                 value = getattr(args, vname)
             return value
-        for argname in ('dir','freq','period','times','tstart','tend','is_temporary'):
+        for argname in ('dir','freq','period','last','times','tstart','tend','is_temporary'):
             _set_arg(args, argname, pname, prefix='dump_')
-        for argname in ('enable_ram_fs', 'force_ram_fs', 'hdf5_disable_compression', 'postprocess_dump'):
+        for argname in ('enable_ram_fs', 'force_ram_fs', 'hdf5_disable_compression', 'hdf5_disable_slicing', 'postprocess_dump'):
             _set_arg(args, argname, pname)
         if getattr(args, '{}_force_ram_fs'.format(pname)):
             setattr(args, '{}_enable_ram_fs'.format(pname), True)
@@ -1179,12 +1194,13 @@
         self._check_default(args, '{}_freq'.format(bname), int, allow_none=False)
         self._check_default(args, '{}_period'.format(bname), float, allow_none=True)
         self._check_default(args, '{}_times'.format(bname), tuple, allow_none=True)
+        self._check_default(args, '{}_last'.format(bname), bool, allow_none=True)
         self._check_default(args, '{}_tstart'.format(bname), float, allow_none=False)
         self._check_default(args, '{}_tend'.format(bname), float, allow_none=False)
         self._check_positive(args, '{}_freq'.format(bname), strict=False, allow_none=False)
         self._check_default(args, tuple(map(lambda k: '{}_{}'.format('{}'.format(pname), k),
                                        ('dump_is_temporary',
-                                        'enable_ram_fs', 'force_ram_fs', 'hdf5_disable_compression'))),
+                                        'enable_ram_fs', 'force_ram_fs', 'hdf5_disable_compression', 'hdf5_disable_slicing'))),
                             bool, allow_none=False)
         self._check_dir(args, '{}_dir'.format(bname), allow_shared=True, allow_none=False)
@@ -1244,7 +1260,7 @@
             self.error(msg)
 
         args.times_of_interest = times_of_interest
-
+        # checkpoints
         self._check_default(args, 'load_checkpoint', str, allow_none=True)
         self._check_default(args, 'save_checkpoint', str, allow_none=True)
@@ -1537,28 +1553,34 @@
         args.io_params = IOParams(filename=None, filepath=args.dump_dir,
                             frequency=args.dump_freq, dump_times=args.dump_times,
-                            dump_tstart=args.dump_tstart, dump_tend=args.dump_tend,
+                            dump_tstart=args.dump_tstart, dump_tend=args.dump_tend, with_last=args.dump_last,
                             enable_ram_fs=args.enable_ram_fs,
force_ram_fs=args.force_ram_fs, dump_is_temporary=args.dump_is_temporary, postprocess_dump=args.postprocess_dump, - hdf5_disable_compression=args.hdf5_disable_compression) + hdf5_disable_compression=args.hdf5_disable_compression, + hdf5_disable_slicing=args.hdf5_disable_slicing) for pname in self.generate_io_params: iop = IOParams(filename=None, filepath = getattr(args, '{}_dump_dir'.format(pname)), frequency = getattr(args, '{}_dump_freq'.format(pname)), dump_times = getattr(args, '{}_dump_times'.format(pname)), + with_last = getattr(args, '{}_dump_last'.format(pname)) or args.dump_last, dump_tstart = getattr(args, '{}_dump_tstart'.format(pname)), dump_tend = getattr(args, '{}_dump_tend'.format(pname)), enable_ram_fs = getattr(args, '{}_enable_ram_fs'.format(pname)), force_ram_fs = getattr(args, '{}_force_ram_fs'.format(pname)), dump_is_temporary = getattr(args, '{}_dump_is_temporary'.format(pname)), postprocess_dump = getattr(args, '{}_postprocess_dump'.format(pname)), - hdf5_disable_compression = getattr(args, '{}_hdf5_disable_compression'.format(pname))) + hdf5_disable_compression = getattr(args, '{}_hdf5_disable_compression'.format(pname)), + hdf5_disable_slicing = getattr(args, '{}_hdf5_disable_slicing'.format(pname))) setattr(args, '{}_io_params'.format(pname), iop) load_checkpoint = args.load_checkpoint if (load_checkpoint is not None): + if not load_checkpoint.endswith('.tar'): + msg='Load checkpoint filename has to end with .tar, got \'{}\'.' + self.error(msg.format(load_checkpoint)) if (os.path.sep not in load_checkpoint): load_checkpoint = os.path.join(args.checkpoint_dump_dir, load_checkpoint) if not os.path.isfile(load_checkpoint): @@ -1569,11 +1591,18 @@ class HysopArgParser(argparse.ArgumentParser): save_checkpoint = args.save_checkpoint if (save_checkpoint is not None): + if not save_checkpoint.endswith('.tar'): + msg='Save checkpoint filename has to end with .tar, got \'{}\'.' 
+ self.error(msg.format(save_checkpoint)) if (os.path.sep not in save_checkpoint): save_checkpoint = os.path.join(args.checkpoint_dump_dir, save_checkpoint) save_checkpoint = os.path.abspath(save_checkpoint) args.checkpoint_dump_dir = os.path.dirname(save_checkpoint) args.save_checkpoint = save_checkpoint + + # debug dumps + if (args.debug_dump_dir is None): + args.debug_dump_dir = args.dump_dir def _setup_implementation(self, args): diff --git a/hysop_examples/examples/analytic/analytic.py b/hysop_examples/examples/analytic/analytic.py index 4d7f1d3f8..548165fac 100755 --- a/hysop_examples/examples/analytic/analytic.py +++ b/hysop_examples/examples/analytic/analytic.py @@ -108,7 +108,7 @@ def compute(args): simu = Simulation(start=args.tstart, end=args.tend, nb_iter=args.nb_iter, dt0=args.dt, max_iter=args.max_iter, - times_of_interest=args.dump_times, + times_of_interest=args.times_of_interest, t=t) # Finally solve the problem diff --git a/hysop_examples/examples/bubble/periodic_bubble.py b/hysop_examples/examples/bubble/periodic_bubble.py index 99c7358ba..76c67733a 100644 --- a/hysop_examples/examples/bubble/periodic_bubble.py +++ b/hysop_examples/examples/bubble/periodic_bubble.py @@ -275,7 +275,7 @@ def compute(args): simu = Simulation(start=args.tstart, end=args.tend, nb_iter=args.nb_iter, max_iter=args.max_iter, - dt0=args.dt, times_of_interest=args.dump_times, + dt0=args.dt, times_of_interest=args.times_of_interest, t=t, dt=dt) simu.write_parameters(t, dt_cfl, dt_advec, dt, enstrophy, rhov, muv, diff --git a/hysop_examples/examples/bubble/periodic_bubble_levelset.py b/hysop_examples/examples/bubble/periodic_bubble_levelset.py index 7574769f0..df11b891a 100644 --- a/hysop_examples/examples/bubble/periodic_bubble_levelset.py +++ b/hysop_examples/examples/bubble/periodic_bubble_levelset.py @@ -276,7 +276,7 @@ def compute(args): simu = Simulation(start=args.tstart, end=args.tend, nb_iter=args.nb_iter, max_iter=args.max_iter, - dt0=args.dt, times_of_interest=args.dump_times, + dt0=args.dt, times_of_interest=args.times_of_interest, t=t, dt=dt) simu.write_parameters(t, dt_cfl, dt_advec, dt, enstrophy, rhov, muv, diff --git a/hysop_examples/examples/bubble/periodic_bubble_levelset_penalization.py b/hysop_examples/examples/bubble/periodic_bubble_levelset_penalization.py index 1d6a21b8c..848886963 100644 --- a/hysop_examples/examples/bubble/periodic_bubble_levelset_penalization.py +++ b/hysop_examples/examples/bubble/periodic_bubble_levelset_penalization.py @@ -317,7 +317,7 @@ def compute(args): simu = Simulation(start=args.tstart, end=args.tend, nb_iter=args.nb_iter, max_iter=args.max_iter, - dt0=args.dt, times_of_interest=args.dump_times, + dt0=args.dt, times_of_interest=args.times_of_interest, t=t, dt=dt) simu.write_parameters(t, dt_cfl, dt_advec, dt, enstrophy, rhov, muv, diff --git a/hysop_examples/examples/bubble/periodic_jet_levelset.py b/hysop_examples/examples/bubble/periodic_jet_levelset.py index 40d2ecebc..479c6469e 100644 --- a/hysop_examples/examples/bubble/periodic_jet_levelset.py +++ b/hysop_examples/examples/bubble/periodic_jet_levelset.py @@ -265,7 +265,7 @@ def compute(args): simu = Simulation(start=args.tstart, end=args.tend, nb_iter=args.nb_iter, max_iter=args.max_iter, - dt0=args.dt, times_of_interest=args.dump_times, + dt0=args.dt, times_of_interest=args.times_of_interest, t=t, dt=dt) simu.write_parameters(t, dt_cfl, dt_advec, dt, enstrophy, rhov, diff --git a/hysop_examples/examples/cylinder/oscillating_cylinder.py 
diff --git a/hysop_examples/examples/cylinder/oscillating_cylinder.py b/hysop_examples/examples/cylinder/oscillating_cylinder.py
index 8975a9029..a19d4f94d 100644
--- a/hysop_examples/examples/cylinder/oscillating_cylinder.py
+++ b/hysop_examples/examples/cylinder/oscillating_cylinder.py
@@ -240,7 +240,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
 
     simu.write_parameters(t, dt_cfl, dt_advec, dt,
                           min_max_U.Finf, min_max_W.Finf,
diff --git a/hysop_examples/examples/fixed_point/heat_equation.py b/hysop_examples/examples/fixed_point/heat_equation.py
index 5ed1ad646..7804d6231 100644
--- a/hysop_examples/examples/fixed_point/heat_equation.py
+++ b/hysop_examples/examples/fixed_point/heat_equation.py
@@ -187,7 +187,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
 
     simu.write_parameters(t, fixedPoint.it_num,
                           filename='parameters.txt', precision=8)
diff --git a/hysop_examples/examples/flow_around_sphere/flow_around_sphere.py b/hysop_examples/examples/flow_around_sphere/flow_around_sphere.py
index 4732e599d..16a239382 100644
--- a/hysop_examples/examples/flow_around_sphere/flow_around_sphere.py
+++ b/hysop_examples/examples/flow_around_sphere/flow_around_sphere.py
@@ -309,7 +309,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
 
     simu.write_parameters(t, dt_cfl, dt_advec, dt, enstrophy, flowrate,
                           min_max_U.Finf, min_max_W.Finf, adapt_dt.equivalent_CFL,
diff --git a/hysop_examples/examples/multiresolution/scalar_advection.py b/hysop_examples/examples/multiresolution/scalar_advection.py
index f7cfbf51d..f41cf0a00 100644
--- a/hysop_examples/examples/multiresolution/scalar_advection.py
+++ b/hysop_examples/examples/multiresolution/scalar_advection.py
@@ -178,7 +178,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      times_of_interest=args.dump_times,
+                      times_of_interest=args.times_of_interest,
                       dt=dt, dt0=dt0)
 
     # Finally solve the problem
diff --git a/hysop_examples/examples/particles_above_salt/particles_above_salt_bc.py b/hysop_examples/examples/particles_above_salt/particles_above_salt_bc.py
index 1229af4ad..77c7ace66 100644
--- a/hysop_examples/examples/particles_above_salt/particles_above_salt_bc.py
+++ b/hysop_examples/examples/particles_above_salt/particles_above_salt_bc.py
@@ -302,7 +302,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
 
     simu.write_parameters(t, dt_cfl, dt_advec, dt,
                           min_max_U.Finf, min_max_W.Finf, adapt_dt.equivalent_CFL,
diff --git a/hysop_examples/examples/particles_above_salt/particles_above_salt_bc_3d.py b/hysop_examples/examples/particles_above_salt/particles_above_salt_bc_3d.py
index 00b82718e..0e4abfd2d 100644
--- a/hysop_examples/examples/particles_above_salt/particles_above_salt_bc_3d.py
+++ b/hysop_examples/examples/particles_above_salt/particles_above_salt_bc_3d.py
@@ -318,7 +318,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
 
     simu.write_parameters(t, dt_cfl, dt_advec, dt,
                           min_max_U.Finf, min_max_W.Finf, adapt_dt.equivalent_CFL,
diff --git a/hysop_examples/examples/particles_above_salt/particles_above_salt_periodic.py b/hysop_examples/examples/particles_above_salt/particles_above_salt_periodic.py
index 49e315386..e028fa051 100644
--- a/hysop_examples/examples/particles_above_salt/particles_above_salt_periodic.py
+++ b/hysop_examples/examples/particles_above_salt/particles_above_salt_periodic.py
@@ -312,7 +312,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
 
     simu.write_parameters(t, dt_cfl, dt_advec, dt,
                           min_max_U.Finf, min_max_W.Finf, adapt_dt.equivalent_CFL,
diff --git a/hysop_examples/examples/particles_above_salt/particles_above_salt_symmetrized.py b/hysop_examples/examples/particles_above_salt/particles_above_salt_symmetrized.py
index ff6fb85cc..a261d3c4f 100644
--- a/hysop_examples/examples/particles_above_salt/particles_above_salt_symmetrized.py
+++ b/hysop_examples/examples/particles_above_salt/particles_above_salt_symmetrized.py
@@ -300,7 +300,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
 
     simu.write_parameters(t, dt_cfl, dt_advec, dt,
                           min_max_U.Finf, min_max_W.Finf, adapt_dt.equivalent_CFL,
diff --git a/hysop_examples/examples/scalar_advection/scalar_advection.py b/hysop_examples/examples/scalar_advection/scalar_advection.py
index a43204e41..008db85b2 100644
--- a/hysop_examples/examples/scalar_advection/scalar_advection.py
+++ b/hysop_examples/examples/scalar_advection/scalar_advection.py
@@ -131,7 +131,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      times_of_interest=args.dump_times,
+                      times_of_interest=args.times_of_interest,
                       dt=dt, dt0=dt0)
 
     # Finally solve the problem
diff --git a/hysop_examples/examples/scalar_diffusion/scalar_diffusion.py b/hysop_examples/examples/scalar_diffusion/scalar_diffusion.py
index 23e130e62..cddacb6a5 100755
--- a/hysop_examples/examples/scalar_diffusion/scalar_diffusion.py
+++ b/hysop_examples/examples/scalar_diffusion/scalar_diffusion.py
@@ -118,7 +118,7 @@ def compute(args):
     # (do not forget to specify the dt parameter here)
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      times_of_interest=args.dump_times,
+                      times_of_interest=args.times_of_interest,
                       dt=dt, dt0=args.dt)
 
     # Finally solve the problem
diff --git a/hysop_examples/examples/sediment_deposit/sediment_deposit.py b/hysop_examples/examples/sediment_deposit/sediment_deposit.py
index 8bae7c71c..9b67663e7 100644
--- a/hysop_examples/examples/sediment_deposit/sediment_deposit.py
+++ b/hysop_examples/examples/sediment_deposit/sediment_deposit.py
@@ -314,7 +314,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
 
     simu.write_parameters(t, dt_cfl, dt_advec, dt,
                           min_max_U.Finf, min_max_W.Finf, adapt_dt.equivalent_CFL,
diff --git a/hysop_examples/examples/sediment_deposit/sediment_deposit_levelset.py b/hysop_examples/examples/sediment_deposit/sediment_deposit_levelset.py
index 9adbca147..fb1848c67 100644
--- a/hysop_examples/examples/sediment_deposit/sediment_deposit_levelset.py
+++ b/hysop_examples/examples/sediment_deposit/sediment_deposit_levelset.py
@@ -375,7 +375,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
 
     simu.write_parameters(t, dt_cfl, dt_advec, dt,
                           min_max_U.Finf, min_max_W.Finf, adapt_dt.equivalent_CFL,
diff --git a/hysop_examples/examples/shear_layer/shear_layer.py b/hysop_examples/examples/shear_layer/shear_layer.py
index c746c2f6e..63dfebfb0 100644
--- a/hysop_examples/examples/shear_layer/shear_layer.py
+++ b/hysop_examples/examples/shear_layer/shear_layer.py
@@ -178,7 +178,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
 
     simu.write_parameters(t, dt,
                           filename='parameters.txt', precision=4)
diff --git a/hysop_examples/examples/taylor_green/taylor_green.py b/hysop_examples/examples/taylor_green/taylor_green.py
index 2ee4bd75e..b0c9d728e 100644
--- a/hysop_examples/examples/taylor_green/taylor_green.py
+++ b/hysop_examples/examples/taylor_green/taylor_green.py
@@ -285,7 +285,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
     params = (t, dt, enstrophy,)
     if args.variable_timestep:
diff --git a/hysop_examples/examples/taylor_green/taylor_green_cpuFortran.py b/hysop_examples/examples/taylor_green/taylor_green_cpuFortran.py
index 22e3f8a05..b6a65931b 100644
--- a/hysop_examples/examples/taylor_green/taylor_green_cpuFortran.py
+++ b/hysop_examples/examples/taylor_green/taylor_green_cpuFortran.py
@@ -205,7 +205,7 @@ def compute(args):
 
     simu = Simulation(start=args.tstart, end=args.tend,
                       nb_iter=args.nb_iter, max_iter=args.max_iter,
-                      dt0=args.dt, times_of_interest=args.dump_times,
+                      dt0=args.dt, times_of_interest=args.times_of_interest,
                       t=t, dt=dt)
 
     simu.write_parameters(t, dt_cfl, dt_advec, dt, enstrophy,
                           min_max_U.Finf, min_max_W.Finf, adapt_dt.equivalent_CFL,
-- 
GitLab
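
Note on the new I/O knobs: besides the example fixes, this patch threads a new
`hdf5_disable_slicing` option (mirroring the existing
`hdf5_disable_compression`) and a `with_last` dump option through `IOParams`.
A minimal sketch of setting them directly, assuming `IO` and `IOParams` live in
`hysop.tools.io_utils` as suggested by the file list above; all values are
illustrative:

    # Sketch only: keyword names come from the HysopArgParser hunk in this
    # patch; the concrete values are made up for illustration.
    from hysop.tools.io_utils import IO, IOParams

    iop = IOParams(filename='fields',
                   fileformat=IO.HDF5,
                   frequency=10,            # dump every 10 iterations
                   dump_times=(0.5, 1.0),   # extra dump times
                   with_last=True,          # assumed: also dump the final iteration
                   hdf5_disable_compression=True,
                   hdf5_disable_slicing=True)  # assumed: write datasets whole, not slice-by-slice
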