diff --git a/hysop/backend/device/opencl/opencl_env.py b/hysop/backend/device/opencl/opencl_env.py index ed84e0f6843ac5654d7b24090764d0dfdfb294df..f42bc18fd9f20a0484afb83e86b22f9e95b21a20 100644 --- a/hysop/backend/device/opencl/opencl_env.py +++ b/hysop/backend/device/opencl/opencl_env.py @@ -174,7 +174,7 @@ device.opencl_c_version, bytes2str(device.global_mem_size)) self.device_id = device_id self.name = name - self._check_comm_devices() + #self._check_comm_devices() def build_typegen(self, precision, float_dump_mode, use_short_circuit_ops, unroll_loops): diff --git a/hysop/core/graph/computational_node.py b/hysop/core/graph/computational_node.py index 9bfc396f41f5968e9512b82474afe861cae987bc..777b6c0c7a7bdc16fec615dce7650a15e1d94d4c 100644 --- a/hysop/core/graph/computational_node.py +++ b/hysop/core/graph/computational_node.py @@ -371,7 +371,7 @@ class ComputationalGraphNode(OperatorBase): oparams = set(self.output_params.values()) parameters = tuple(iparams.union(oparams)) - if 'mpi_params' in self.__kwds: + if ('mpi_params' in self.__kwds) and ('ComputationalGraph' not in map(lambda c: c.__name__, self.__class__.__mro__)): mpi_params = self.__kwds['mpi_params'] for topo in set(self.input_fields.values() + self.output_fields.values()): if isinstance(topo, Topology) and (topo.mpi_params != mpi_params): diff --git a/hysop/problem.py b/hysop/problem.py index 2cda0b87aa123c3200d2dba98c2d11928036145b..23858550d5315198316c480683de0555b7b9956b 100644 --- a/hysop/problem.py +++ b/hysop/problem.py @@ -1,22 +1,28 @@ -import datetime -import sys +from __future__ import absolute_import +import operator, os, sys, datetime, warnings, shutil, tarfile +import numpy as np + from hysop.constants import Backend, MemoryOrdering +from hysop.tools.types import first_not_None, to_tuple from hysop.tools.string_utils import vprint_banner from hysop.tools.contexts import Timer from hysop.tools.decorators import debug +from hysop.tools.parameters import MPIParams +from 
hysop.tools.numerics import default_invalid_value from hysop.core.graph.computational_graph import ComputationalGraph from hysop.tools.string_utils import vprint_banner, vprint from hysop.core.mpi import main_rank, main_size, main_comm +from hysop.fields.cartesian_discrete_field import CartesianDiscreteScalarField +from hysop.domain.box import Box class Problem(ComputationalGraph): - def __init__(self, name=None, method=None, + def __init__(self, name=None, method=None, mpi_params=None, check_unique_clenv=True, **kwds): - super(Problem, self).__init__(name=name, method=method, **kwds) + mpi_params = first_not_None(mpi_params, MPIParams()) # enforce mpi params for problems + super(Problem, self).__init__(name=name, method=method, mpi_params=mpi_params, **kwds) self._do_check_unique_clenv = check_unique_clenv - mpi_params = kwds['mpi_params'] if 'mpi_params' in kwds else None - self._rank = main_rank if mpi_params is None else mpi_params.rank @debug def insert(self, *ops): @@ -125,23 +131,28 @@ class Problem(ComputationalGraph): @debug def solve(self, simu, dry_run=False, dbg=None, - report_freq=10, plot_freq=10, **kargs): + report_freq=10, plot_freq=10, checkpoint_io_params=None, + load_checkpoint=None, save_checkpoint=None, **kwds): + if dry_run: vprint() vprint_banner('** Dry-run requested, skipping simulation. 
**') return + + self.load_checkpoint(load_checkpoint, simu) + self.create_checkpoint_template(save_checkpoint, checkpoint_io_params) vprint('\nSolving problem...') - simu.initialize() with Timer() as tm: while not simu.is_over: vprint() simu.print_state() - self.apply(simulation=simu, dbg=dbg, **kargs) + self.apply(simulation=simu, dbg=dbg, **kwds) simu.advance(dbg=dbg, plot_freq=plot_freq) if report_freq and (simu.current_iteration % report_freq) == 0: self.profiler_report() + self.save_checkpoint(save_checkpoint, checkpoint_io_params, simu) avg_time = main_comm.allreduce(tm.interval) / main_size msg = ' Simulation took {} ({}s)' @@ -161,8 +172,299 @@ class Problem(ComputationalGraph): def final_report(self): self.profiler_report() + + @debug + def load_checkpoint(self, load_checkpoint, simu): + if (load_checkpoint is None): + simu.initialize() + return + + vprint('\nLoading problem checkpoint from \'{}\'...'.format(load_checkpoint)) + if not os.path.exists(load_checkpoint): + msg='Failed to load checkpoint \'{}\' because the file does not exist.' + raise RuntimeError(msg.format(load_checkpoint)) + + raise NotImplementedError + + @debug + def save_checkpoint(self, save_checkpoint, checkpoint_io_params, simu): + if (save_checkpoint is None): + return + + if (checkpoint_io_params is None): + msg='Save checkpoint has been set to \'{}\' but checkpoint_io_params has not been specified.' 
+ raise RuntimeError(msg.format(save_checkpoint)) + + if not checkpoint_io_params.should_dump(simu): + return + + save_checkpoint_tar = save_checkpoint+'.tar' + + vprint('>Exporting problem checkpoint to \'{}\':'.format(save_checkpoint_tar)) + is_io_leader = (checkpoint_io_params.io_leader == self.mpi_params.rank) + + # create a backup of last checkpoint in case things go wrong + if is_io_leader and os.path.exists(save_checkpoint_tar): + backup_checkpoint_tar = save_checkpoint_tar + '.bak' + if os.path.exists(backup_checkpoint_tar): + os.remove(backup_checkpoint_tar) + os.rename(save_checkpoint_tar, backup_checkpoint_tar) + else: + backup_checkpoint_tar = None + + # try to create the checkpoint directory, this is a collective MPI operation + try: + success, reason = self._dump_checkpoint(save_checkpoint, checkpoint_io_params, simu) + except Exception as e: + raise + success = False + reason = str(e) + success = main_comm.allreduce(int(success)) == main_comm.size + + # Compress checkpoint directory to tar (easier to copy between clusters) + # Note that there is no effective compression here, zarr already compressed field/param data + if success and is_io_leader and os.path.isdir(save_checkpoint): + try: + with tarfile.open(save_checkpoint_tar, 'w') as tf: + for (root, dirs, files) in os.walk(save_checkpoint): + for f in files: + fpath = os.path.join(root, f) + tf.add(fpath, arcname=fpath.replace(save_checkpoint+os.path.sep,'')) + if os.path.isfile(save_checkpoint_tar): + shutil.rmtree(save_checkpoint) + except: + raise + success = False + + success = main_comm.allreduce(int(success)) == main_comm.size + if not success: + from hysop.tools.warning import HysopDumpWarning + msg='Failed to dump checkpoint because: {}.'.format(reason) + warnings.warn(msg, HysopDumpWarning) + + # Something went wrong (I/O error or other) so we roll back to previous checkpoint (if there is one) + vprint(' | An error occured during checkpoint creation, rolling back to previous one...') 
+ if is_io_leader: + if os.path.exists(save_checkpoint): + shutil.rmtree(save_checkpoint) + if os.path.exists(save_checkpoint_tar): + os.remove(save_checkpoint_tar) + if (backup_checkpoint_tar is not None) and os.path.exists(backup_checkpoint_tar): + os.rename(backup_checkpoint_tar, save_checkpoint_tar) + + def create_checkpoint_template(self, save_checkpoint, checkpoint_io_params): + # Create groups of arrays on disk (only hierarchy and array metadata is stored in the template) + # /!\ ZipStores are not safe from multiple processes so we use a DirectoryStore + # that can then be tarred manually by io_leader. + + if (save_checkpoint is None): + return + + checkpoint_template = save_checkpoint + '.template' + self._checkpoint_template = checkpoint_template + del save_checkpoint + + vprint('\n>Creating checkpoint template as \'{}\'...'.format(checkpoint_template)) + mpi_params = self.mpi_params + comm = mpi_params.comm + io_leader = checkpoint_io_params.io_leader + is_io_leader = (io_leader == mpi_params.rank) + + # Array block data compressor + from numcodecs import blosc, Blosc + blosc.use_threads = (self.mpi_params.size == 1) # disable threads for multiple processes (can deadlock) + compressor = Blosc(cname='snappy', clevel=3, shuffle=Blosc.BITSHUFFLE) + + # Create a directory layout as a file on shared filesystem + import zarr + + if is_io_leader: + if os.path.exists(checkpoint_template): + shutil.rmtree(checkpoint_template) + store = zarr.DirectoryStore(path=checkpoint_template) + root = zarr.open_group(store=store, mode='w', path='') + fields = root.create_group('fields') + else: + store = None + root = None + fields = None + + for field in sorted(self.fields, key=operator.attrgetter('name')): + # we do not care about fields discretized only on temporary fields + if all(df.is_tmp for df in field.discrete_fields.values()): + continue + + if is_io_leader: + discrete_fields = fields.create_group(field.name) + else: + discrete_fields = None + + dim = field.dim + 
domain = field.domain._domain + + if isinstance(domain, Box): + if (discrete_fields is not None): + discrete_fields.attrs['domain'] = 'Box' + discrete_fields.attrs['dim'] = domain.dim + discrete_fields.attrs['origin'] = to_tuple(domain.origin) + discrete_fields.attrs['end'] = to_tuple(domain.end) + discrete_fields.attrs['length'] = to_tuple(domain.length) + else: + # for now we just handle Boxed domains + raise NotImplementedError + + for (k, topo) in enumerate(sorted(field.discrete_fields, key=operator.attrgetter('full_tag'))): + dfield = field.discrete_fields[topo] + mesh = topo.mesh._mesh + + # we do not care about temporary fields + if dfield.is_tmp: + continue + + if not isinstance(dfield, CartesianDiscreteScalarField): + # for now we just handle CartesianDiscreteScalarFields. + raise NotImplementedError + + global_resolution = topo.global_resolution # logical grid size + grid_resolution = topo.grid_resolution # effective grid size + ghosts = topo.ghosts + + # get local resolutions excluding ghosts + compute_resolutions = comm.gather(to_tuple(mesh.compute_resolution), root=io_leader) + + # is the current process handling a right boundary data block on a distributed axis? + is_at_right_boundary = (mesh.is_at_right_boundary*(mesh.proc_shape>1)).any() + is_at_right_boundary = np.asarray(comm.gather(is_at_right_boundary, root=io_leader)) + + if not is_io_leader: + continue + + # io_leader can now determine whether the cartesian discretization is uniformly distributed between processes or not + inner_compute_resolutions = tuple(compute_resolutions[i] for i in range(len(compute_resolutions)) if not is_at_right_boundary[i]) + grid_is_uniformly_distributed = all(res == inner_compute_resolutions[0] for res in inner_compute_resolutions) + grid_is_uniformly_distributed |= (topo.mpi_params.size == 1) + + if grid_is_uniformly_distributed: + # We divide the array in 'compute_resolution' chunks, no synchronization is required. 
+ # Here there is no need to use the process locker to write this array data. + # Each process writes its own independent block of data of size 'compute_resolution'. + should_sync = False + chunks = inner_compute_resolutions[0] + else: + # We divide the array in >=1MB chunks (chunks are given in terms of elements) + # Array chunks may overlap different processes so we need interprocess synchronization (slow) + should_sync = True + if dim == 1: + chunks = 1024*1024 # at least 1MB / chunk + elif dim == 2: + chunks = (1024,1024) # at least 1MB / chunk + elif dim == 3: + chunks = (64,128,128) # at least 1MB / chunk + else: + raise NotImplementedError(dim) + + # Create array (no memory is allocated here, even on disk because data blocks are empty) + dtype = dfield.dtype + shape = grid_resolution + + array = discrete_fields.create_dataset(name='topo_{:02d}'.format(k), + overwrite=False, data=None, synchronizer=None, + compressor=compressor, shape=shape, chunks=chunks, + dtype=dtype, fill_value=default_invalid_value(dtype)) + array.attrs['should_sync'] = should_sync + + # We cannot rely on discrete mesh name because of topology names + # so we save some field metadata to be able to differentiate between + # discrete fields with the exact same grid resolution. + # proc_shape and name are used as a last resort to differentiate discrete fields. + array.attrs['lboundaries'] = to_tuple(map(str, mesh.global_lboundaries)) + array.attrs['rboundaries'] = to_tuple(map(str, mesh.global_rboundaries)) + array.attrs['ghosts'] = to_tuple(mesh.ghosts) + array.attrs['proc_shape'] = to_tuple(mesh.proc_shape) + array.attrs['name'] = dfield.name + + if (root is not None): + vprint(root.tree()) + + try: + # some zarr store formats require a final close to flush data + if (root is not None): + root.close() + except AttributeError: + pass + + + def _dump_checkpoint(self, save_checkpoint, checkpoint_io_params, simu): + # Given a template, fill field and parameters data from all processes. 
+ # returns (bool, msg) where bool is True on success + + mpi_params = self.mpi_params + comm = mpi_params.comm + io_leader = checkpoint_io_params.io_leader + is_io_leader = (io_leader == mpi_params.rank) + + if not os.path.exists(self._checkpoint_template): + # checkpoint template may have been deleted by user during simulation + self.create_checkpoint_template(save_checkpoint, checkpoint_io_params) + if is_io_leader: + if os.path.exists(save_checkpoint): + shutil.rmtree(save_checkpoint) + shutil.copytree(self._checkpoint_template, save_checkpoint) + comm.Barrier() + + # Every process now loads the same dataset template + import zarr + store = zarr.DirectoryStore(save_checkpoint) + root = zarr.open_group(store=store, mode='r+', synchronizer=None, path='') + fields_group = root['fields'] + + for field in sorted(self.fields, key=operator.attrgetter('name')): + # we do not care about fields discretized only on temporary fields + if all(df.is_tmp for df in field.discrete_fields.values()): + continue + + field_group = fields_group[field.name] + for (k, topo) in enumerate(sorted(field.discrete_fields, key=operator.attrgetter('full_tag'))): + dfield = field.discrete_fields[topo] + mesh = topo.mesh._mesh + + if dfield.is_tmp: + # we do not care about temporary fields + continue + + dataset = 'topo_{:02d}'.format(k) + array = field_group[dataset] + should_sync = array.attrs['should_sync'] + + assert dfield.nb_components == 1 + assert (array.shape == mesh.grid_resolution).all(), (array.shape, mesh.grid_resolution) + assert array.dtype == dfield.dtype, (array.dtype, dfield.dtype) + if should_sync: + # Should not be required until we allow non-uniform discretizations + global_start = mesh.global_start + global_stop = mesh.global_stop + raise NotImplementedError('Synchronized multiprocess write has not been implemented yet.') + else: + assert (mesh.compute_resolution == array.chunks).all() or (mesh.is_at_right_boundary*(mesh.proc_shape>1)).any() + local_data = 
dfield.compute_data[0].get() + global_slices = mesh.global_compute_slices + array[global_slices] = local_data + + try: + # some zarr store formats require a final close to flush data + if (root is not None): + vprint(root.info) + root.close() + except AttributeError: + pass + + return True, None @debug def finalize(self): vprint('Finalizing problem...') super(Problem, self).finalize() + if (hasattr(self, '_checkpoint_template') + and (self._checkpoint_template is not None) + and os.path.exists(self._checkpoint_template)): + shutil.rmtree(self._checkpoint_template) diff --git a/hysop/tools/enum.py b/hysop/tools/enum.py index 176ee92b2ae2add2827259fee15ba3f785c93a0d..bf00ef36c0e0d7ce2041ac548ae3673d7d22bf37 100644 --- a/hysop/tools/enum.py +++ b/hysop/tools/enum.py @@ -242,7 +242,7 @@ class EnumFactory(object): def __hash__(self): return hash(self._field) - #pickling + # pickling def __reduce__(self): return (_EnumInstanceGenerator(), (name, self._field)) diff --git a/hysop/tools/parameters.py b/hysop/tools/parameters.py index 900a0f94301268bdb5560b2b11df63d26365258b..b59b80b8993904e0e7e0158e3b1c60275fc303a5 100755 --- a/hysop/tools/parameters.py +++ b/hysop/tools/parameters.py @@ -45,11 +45,12 @@ class MPIParams(namedtuple('MPIParams', ['comm', 'size', 'task_id', task_id=HYSOP_DEFAULT_TASK_ID, rank=main_rank, on_task=True): - if comm != MPI.COMM_NULL: + if (comm != MPI.COMM_NULL): rank = comm.Get_rank() size = comm.Get_size() else: rank = MPI.UNDEFINED + size = MPI.UNDEFINED return super(MPIParams, cls).__new__(cls, comm, size, task_id, rank, on_task) diff --git a/hysop/tools/warning.py b/hysop/tools/warning.py index cb040bf5b18d67b0b8e576daff91d4ad4e0cd136..ee8bb9246fb993ce85e8d1ee845f8cd7b7d75aa0 100644 --- a/hysop/tools/warning.py +++ b/hysop/tools/warning.py @@ -17,6 +17,12 @@ class HysopPerformanceWarning(HysopWarning): """ pass +class HysopDumpWarning(HysopWarning): + """ + Custom warning class for hysop I/O dumps. 
+ """ + pass + class HysopCacheWarning(HysopWarning): """ Custom warning class for hysop caching. diff --git a/hysop/topology/topology.py b/hysop/topology/topology.py index 6ced43bf2e155a6beaa1a466e7cf041b44d4531b..398ad19a199203e526b60b8b2e6013efb7785217 100644 --- a/hysop/topology/topology.py +++ b/hysop/topology/topology.py @@ -125,8 +125,6 @@ class TopologyView(TaggedObjectView): topology_state: :class:`~hysop.topology.topology.TopologyState` State that charaterizes the given view. - topo_id: int - The topology unique id. domain : :class:`~hysop.domain.domain.Domain` The geometry on which the topology is defined. backend: :class:`~hysop.core.arrays.array_backend.ArrayBackend` diff --git a/hysop_examples/example_utils.py b/hysop_examples/example_utils.py index 1e027bafec235968d497ac7bab5d63ce8392bf58..5cf37c6530c218eec47df3ed59a71c6e150cf63c 100644 --- a/hysop_examples/example_utils.py +++ b/hysop_examples/example_utils.py @@ -1003,19 +1003,20 @@ class HysopArgParser(argparse.ArgumentParser): description = ('Configure problem checkpoints I/O parameters, dumped checkpoints represent simulation states ' 'that can be loaded back to continue the simulation later on.') pargs = self.add_argument_group('{} I/O'.format(pname.upper()), description=description) - pargs.add_argument('-L', '--load-checkpoint', default=None, type=str, dest='load_checkpoint', + pargs.add_argument('-L', '--load-checkpoint', default=None, const='checkpoint', nargs='?', type=str, dest='load_checkpoint', help=('Begin simulation from this checkpoint. Can be given as fullpath or as a filename relative to --checkpoint-dump-dir. ' 'The given checkpoint has to be compatible with the problem it will be loaded to. ' 'This will only work if parameter names, variable names, operator names, discretization and global topology information remain unchanged. 
' - 'Operator ordering, boundary conditions, data ordering, data permutation and MPI layouts may be however be changed.')) - pargs.add_argument('-S', '--save-checkpoint', default=None, type=str, dest='save_checkpoint', + 'Operator ordering, boundary conditions, data ordering, data permutation and MPI layouts may be however be changed. ' + 'Defaults to {checkpoint_output_dir}/checkpoint.zip if no filename is specified.')) + pargs.add_argument('-S', '--save-checkpoint', default=None, const='checkpoint', nargs='?', type=str, dest='save_checkpoint', help=('Enable simulation checkpoints to be able to restart simulations from a specific point later on. ' 'Can be given as fullpath or as a filename relative to --checkpoint-dump-dir. ' 'Frequency or time of interests for checkpoints can be configured by using global FILE I/O parameters or ' 'specific --checkpoint-dump-* arguments which takes priority over global ones. ' 'Should not be to frequent for efficiency reasons. May be used in conjunction with --load-checkpoint, ' - 'in which case the starting checkpoint may be overwritten or lost in the case the same path are given. ' - 'Defaults to {checkpoint_output_dir}/checkpoint.zarr if no filename is specified.')) + 'in which case the starting checkpoint may be overwritten in the case the same path are given. ' + 'Defaults to {checkpoint_output_dir}/checkpoint.zip if no filename is specified.')) else: pargs = self.add_argument_group('{} I/O'.format(pname.upper())) @@ -1146,9 +1147,10 @@ class HysopArgParser(argparse.ArgumentParser): if args.force_ram_fs: args.enable_ram_fs = True - if args.dump_is_temporary: - msg='Dump is temporary but no postprocessing script has been supplied' - assert (args.postprocess_dump is not None), msg + + if args.dump_is_temporary and (args.postprocess_dump is None): + msg='Dump is temporary but no postprocessing script has been supplied.' 
+ self.error(msg) for pname in pnames: def _set_arg(args, argname, pname, prefix=''): @@ -1167,15 +1169,16 @@ class HysopArgParser(argparse.ArgumentParser): if getattr(args, '{}_force_ram_fs'.format(pname)): setattr(args, '{}_enable_ram_fs'.format(pname), True) if getattr(args, '{}_dump_is_temporary'.format(pname)): - msg='{} dump is temporary but no postprocessing script has been supplied'.format(pname) pd = getattr(args, '{}_postprocess_dump'.format(pname)) - assert (pd is not None), msg + if (pd is None): + msg='{} dump is temporary but no postprocessing script has been supplied'.format(pname) + self.error(msg) bname = '{}_dump'.format(pname) self._check_default(args, '{}_dir'.format(bname), str, allow_none=False) self._check_default(args, '{}_freq'.format(bname), int, allow_none=False) self._check_default(args, '{}_period'.format(bname), float, allow_none=True) - self._check_default(args, '{}_times'.format(bname), tuple, allow_none=False) + self._check_default(args, '{}_times'.format(bname), tuple, allow_none=True) self._check_default(args, '{}_tstart'.format(bname), float, allow_none=False) self._check_default(args, '{}_tend'.format(bname), float, allow_none=False) self._check_positive(args, '{}_freq'.format(bname), strict=False, allow_none=False) @@ -1205,14 +1208,15 @@ class HysopArgParser(argparse.ArgumentParser): dump_times = getattr(args, '{}_times'.format(bname)) dump_period = getattr(args, '{}_period'.format(bname)) - if (dump_times is not None) or ((dump_period is not None) and (dump_period > 0.0)): - dump_times = set() if (dump_times is None) else set(dump_times) - if (dump_period is not None) and (dump_period > 0.0): - dt = dump_period - ndumps = int(np.floor(T/dt)) + 1 - toi = tstart + np.arange(ndumps)*dt - dump_times.update(toi) + + dump_times = set() if (dump_times is None) else set(dump_times) + if (dump_period is not None) and (dump_period > 0.0): + dt = dump_period + ndumps = int(np.floor(T/dt)) + 1 + toi = tstart + np.arange(ndumps)*dt + 
dump_times.update(toi) dump_times = filter(lambda t: (t>=tstart) & (t<=tend), dump_times) + setattr(args, '{}_times'.format(bname), tuple(sorted(dump_times))) times_of_interest.update(dump_times) @@ -1240,6 +1244,10 @@ class HysopArgParser(argparse.ArgumentParser): self.error(msg) args.times_of_interest = times_of_interest + + # checkpoints + self._check_default(args, 'load_checkpoint', str, allow_none=True) + self._check_default(args, 'save_checkpoint', str, allow_none=True) def _add_graphical_io_args(self): graphical_io = self.add_argument_group('Graphical I/O') @@ -1436,7 +1444,7 @@ class HysopArgParser(argparse.ArgumentParser): self.error(msg) def _check_dir(self, args, argnames, allow_shared=False, allow_none=False, - assert_root_folder_exists=True): + enforce_shared=False, assert_root_folder_exists=True): if not isinstance(argnames, tuple): argnames = (argnames,) for argname in argnames: @@ -1469,7 +1477,12 @@ class HysopArgParser(argparse.ArgumentParser): self.error(msg) if (not allow_shared) and self.is_shared_fs(path): - msg='{} directory \'{}\' cannot be stored on a network file system.' + msg='{} directory \'{}\' cannot be stored on a shared network file system.' + msg=msg.format(argname, argvalue) + self.error(msg) + + if enforce_shared and not self.is_shared_fs(path): + msg='{} directory \'{}\' has to be stored on a shared network file system.' 
msg=msg.format(argname, argvalue) self.error(msg) @@ -1543,6 +1556,25 @@ class HysopArgParser(argparse.ArgumentParser): postprocess_dump = getattr(args, '{}_postprocess_dump'.format(pname)), hdf5_disable_compression = getattr(args, '{}_hdf5_disable_compression'.format(pname))) setattr(args, '{}_io_params'.format(pname), iop) + + load_checkpoint = args.load_checkpoint + if (load_checkpoint is not None): + if (os.path.sep not in load_checkpoint): + load_checkpoint = os.path.join(args.checkpoint_dump_dir, load_checkpoint) + if not os.path.isfile(load_checkpoint): + msg = 'Cannot load checkpoint \'{}\' because the file does not exist.' + self.error(msg.format(load_checkpoint)) + load_checkpoint = os.path.abspath(load_checkpoint) + args.load_checkpoint = load_checkpoint + + save_checkpoint = args.save_checkpoint + if (save_checkpoint is not None): + if (os.path.sep not in save_checkpoint): + save_checkpoint = os.path.join(args.checkpoint_dump_dir, save_checkpoint) + save_checkpoint = os.path.abspath(save_checkpoint) + args.checkpoint_dump_dir = os.path.dirname(save_checkpoint) + args.save_checkpoint = save_checkpoint + def _setup_implementation(self, args): from hysop.constants import Implementation @@ -1862,8 +1894,7 @@ class HysopHelpFormatter(ColorHelpFormatter): p &= (len(action.option_strings)<2) or (action.option_strings[1] not in blacklist) return p actions = filter(predicate, actions) - usage = super(HysopHelpFormatter, self)._format_usage(usage=usage, actions=actions, - groups=groups, prefix=prefix) + usage = self._format_usage_color_help_formatter(usage=usage, actions=actions, groups=groups, prefix=prefix) usage = usage.rstrip() opencl_parameters = colors.color('OPENCL_PARAMETERS', fg='green', style='bold') autotuner_parameters = colors.color('AUTOTUNER_PARAMETERS', fg='green', style='bold') @@ -1881,6 +1912,100 @@ class HysopHelpFormatter(ColorHelpFormatter): usage +='\n{s}[--help] [--version] [--hardware-info] [--hardware-summary]'.format(s=s) usage +='\n\n' 
return usage + + def _format_usage_color_help_formatter(self, usage, actions, groups, prefix): + from gettext import gettext as _ + from colors import strip_color + import re as _re + if prefix is None: + prefix = _('usage: ') + + # if usage is specified, use that + if usage is not None: + usage = usage % dict(prog=self._prog) + + # if no optionals or positionals are available, usage is just prog + elif usage is None and not actions: + usage = '%(prog)s' % dict(prog=self._prog) + + # if optionals and positionals are available, calculate usage + elif usage is None: + prog = '%(prog)s' % dict(prog=self._prog) + + # split optionals from positionals + optionals = [] + positionals = [] + for action in actions: + if action.option_strings: + optionals.append(action) + else: + positionals.append(action) + + # build full usage string + format = self._format_actions_usage + action_usage = format(optionals + positionals, groups) + usage = ' '.join([s for s in [prog, action_usage] if s]) + + # wrap the usage parts if it's too long + text_width = self._width - self._current_indent + if len(prefix) + len(strip_color(usage)) > text_width: + + # break usage into wrappable parts + part_regexp = r'\(.*?\)+|\[.*?\]+|\S+' + opt_usage = format(optionals, groups) + pos_usage = format(positionals, groups) + opt_parts = _re.findall(part_regexp, opt_usage) + pos_parts = _re.findall(part_regexp, pos_usage) + + # helper for wrapping lines + def get_lines(parts, indent, prefix=None): + lines = [] + line = [] + if prefix is not None: + line_len = len(prefix) - 1 + else: + line_len = len(indent) - 1 + for part in parts: + if line_len + 1 + len(strip_color(part)) > text_width and line: + lines.append(indent + ' '.join(line)) + line = [] + line_len = len(indent) - 1 + line.append(part) + line_len += len(strip_color(part)) + 1 + if line: + lines.append(indent + ' '.join(line)) + if prefix is not None: + lines[0] = lines[0][len(indent):] + return lines + + # if prog is short, follow it with 
optionals or positionals + len_prog = len(strip_color(prog)) + if len(prefix) + len_prog <= 0.75 * text_width: + indent = ' ' * (len(prefix) + len_prog + 1) + if opt_parts: + lines = get_lines([prog] + opt_parts, indent, prefix) + lines.extend(get_lines(pos_parts, indent)) + elif pos_parts: + lines = get_lines([prog] + pos_parts, indent, prefix) + else: + lines = [prog] + + # if prog is long, put it on its own line + else: + indent = ' ' * len(prefix) + parts = opt_parts + pos_parts + lines = get_lines(parts, indent) + if len(lines) > 1: + lines = [] + lines.extend(get_lines(opt_parts, indent)) + lines.extend(get_lines(pos_parts, indent)) + lines = [prog] + lines + + # join lines into usage + usage = '\n'.join(lines) + + # prefix with 'usage:' + return '%s%s\n\n' % (prefix, usage) def start_section(self, heading): heading = colors.color('[{}]'.format(heading.upper()), fg='yellow', style='bold') diff --git a/hysop_examples/examples/scalar_diffusion/scalar_diffusion.py b/hysop_examples/examples/scalar_diffusion/scalar_diffusion.py index a19e4c1f15cae21b5dd7c226231e96be6da56d1a..23e130e62c2b90e49da49b634497fb75e4ab803b 100755 --- a/hysop_examples/examples/scalar_diffusion/scalar_diffusion.py +++ b/hysop_examples/examples/scalar_diffusion/scalar_diffusion.py @@ -48,7 +48,7 @@ def compute(args): # Create the problem we want to solve and insert our # directional splitting subgraph. # Add a writer of input field at given frequency. - problem = Problem(method=method) + problem = Problem(method=method, mpi_params=mpi_params) if (impl is Implementation.FORTRAN): # Build the directional diffusion operator @@ -100,6 +100,16 @@ def compute(args): # display the graph on the given process rank. 
if args.display_graph: problem.display(args.visu_rank) + + # Attach a field debug dumper when requested + if args.debug_dump_target: + from hysop.tools.debug_dumper import DebugDumper + debug_dumper = DebugDumper( + path=args.debug_dump_dir, + name=args.debug_dump_target, + force_overwrite=True, enable_on_op_apply=True) + else: + debug_dumper = None # Initialize discrete scalar field problem.initialize_field(scalar, formula=init_scalar) @@ -112,7 +122,11 @@ def compute(args): dt=dt, dt0=args.dt) # Finally solve the problem - problem.solve(simu, dry_run=args.dry_run) + problem.solve(simu, dry_run=args.dry_run, + debug_dumper=debug_dumper, + load_checkpoint=args.load_checkpoint, + save_checkpoint=args.save_checkpoint, + checkpoint_io_params=args.checkpoint_io_params) # Finalize problem.finalize() diff --git a/hysop_examples/examples/taylor_green/taylor_green.py b/hysop_examples/examples/taylor_green/taylor_green.py index 603a28ba2d946270e4a0dc9a83af2048d98dc368..744e0696e34374fbe0f1bc8287d84c7637ef52e7 100644 --- a/hysop_examples/examples/taylor_green/taylor_green.py +++ b/hysop_examples/examples/taylor_green/taylor_green.py @@ -308,7 +308,11 @@ def compute(args): problem.initialize_field(vorti, formula=init_vorticity) # Finally solve the problem - problem.solve(simu, dry_run=args.dry_run, debug_dumper=debug_dumper) + problem.solve(simu, dry_run=args.dry_run, + debug_dumper=debug_dumper, + load_checkpoint=args.load_checkpoint, + save_checkpoint=args.save_checkpoint, + checkpoint_io_params=args.checkpoint_io_params) # Finalize problem.finalize() diff --git a/requirements.txt b/requirements.txt index 6728ee0a566ba3b83ae2d9ea15b508768f05de81..6d68dae12b9a8440c3bb7a90853366d668ce0919 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,3 +23,5 @@ configparser backports.tempfile backports.weakref networkx +zarr +numcodecs