diff --git a/hysop/operator/adapt_timestep.py b/hysop/operator/adapt_timestep.py index e2cbe93dec9923568cb06d68994fe894252acb8b..b547e65382a45d6e81a66643352508259ae367b2 100755 --- a/hysop/operator/adapt_timestep.py +++ b/hysop/operator/adapt_timestep.py @@ -542,11 +542,11 @@ class AdaptiveTimeStep(ComputationalGraphNodeGenerator): def push_cst_criteria(self, cst, Finf=None, name=None, pretty_name=None, param_name=None, param_pretty_name=None, - parameter=None, quiet=False, **kwds): + parameter=None, quiet=False, dtype=None, **kwds): parameter = self._build_parameter(parameter=parameter, quiet=quiet, name=param_name, pretty_name=param_pretty_name, - basename=name.replace('dt_', '')) + basename=name.replace('dt_', ''), dtype=dtype) criteria = ConstantTimestepCriteria(cst=cst, Finf=Finf, parameter=parameter, name=name, pretty_name=pretty_name, **kwds) self._push_criteria(parameter.name, criteria) @@ -557,12 +557,12 @@ class AdaptiveTimeStep(ComputationalGraphNodeGenerator): param_name=None, param_pretty_name=None, parameter=None, quiet=False, relative_velocities=None, - equivalent_CFL=None, **kwds): + equivalent_CFL=None, dtype=None, **kwds): """ See hysop.operator.adapt_timpestep.CflTimestepCriteria. """ parameter = self._build_parameter(parameter=parameter, quiet=quiet, - name=param_name, pretty_name=param_pretty_name, basename='cfl') + name=param_name, pretty_name=param_pretty_name, basename='cfl', dtype=dtype) criteria = CflTimestepCriteria(cfl=cfl, Fmin=Fmin, Fmax=Fmax, Finf=Finf, dx=dx, parameter=parameter, name=name, pretty_name=pretty_name, @@ -584,11 +584,11 @@ class AdaptiveTimeStep(ComputationalGraphNodeGenerator): def push_advection_criteria(self, lcfl, criteria, Finf=None, gradFinf=None, name=None, pretty_name=None, param_name=None, param_pretty_name=None, - parameter=None, quiet=False, **kwds): + parameter=None, quiet=False, dtype=None, **kwds): """ See hysop.operator.adapt_timpestep.AdvectionTimestepCriteria. """ - parameter = self._build_parameter(parameter=parameter, quiet=quiet, + parameter = self._build_parameter(parameter=parameter, quiet=quiet, dtype=dtype, name=param_name, pretty_name=param_pretty_name, basename='lcfl_{}'.format(str(criteria).lower())) criteria = AdvectionTimestepCriteria(lcfl=lcfl, Finf=Finf, gradFinf=gradFinf, parameter=parameter, criteria=criteria, @@ -603,24 +603,25 @@ class AdaptiveTimeStep(ComputationalGraphNodeGenerator): return self.push_advection_criteria(*args, **kwds) def push_stretching_criteria(self, criteria, gradFinf, cst=1.0, - name=None, pretty_name=None, parameter=None, quiet=False, **kwds): + name=None, pretty_name=None, parameter=None, quiet=False, dtype=None, **kwds): """ See hysop.operator.adapt_timpestep.StretchingTimestepCriteria. """ parameter = self._build_parameter(parameter=parameter, quiet=quiet, - name=name, pretty_name=pretty_name, basename='stretch') + name=name, pretty_name=pretty_name, basename='stretch', dtype=dtype) criteria = StretchingTimestepCriteria(cst=cst, parameter=parameter, gradFinf=gradFinf, criteria=criteria, **kwds) self._push_criteria(parameter.name, criteria) return criteria.dt def _build_parameter(self, name=None, pretty_name=None, quiet=None, - basename=None, parameter=None): + basename=None, parameter=None, dtype=None): if (parameter is None): name = first_not_None(name,'dt_{}'.format(basename)) pretty_name = first_not_None(pretty_name, name) + dtype = first_not_None(dtype, HYSOP_REAL) parameter = ScalarParameter(name=name, pretty_name=pretty_name, - quiet=quiet, dtype=HYSOP_REAL) + quiet=quiet, dtype=dtype) return parameter def _push_criteria(self, name, criteria): diff --git a/hysop/parameters/__init__.py b/hysop/parameters/__init__.py index a8af4cd1a4042d1bc570fa39e8fe089d3ddc4936..77fa1da49b07ed37ff0fb23844eee9e13f663e0d 100644 --- a/hysop/parameters/__init__.py +++ b/hysop/parameters/__init__.py @@ -1,4 +1,5 @@ -from hysop.parameters.tensor_parameter import TensorParameter +from hysop.parameters.tensor_parameter import TensorParameter from hysop.parameters.scalar_parameter import ScalarParameter -__all__ = ( ScalarParameter, TensorParameter ) +from hysop.parameters.buffer_parameter import BufferParameter +__all__ = ( ScalarParameter, TensorParameter, BufferParameter) diff --git a/hysop/problem.py b/hysop/problem.py index 23858550d5315198316c480683de0555b7b9956b..2ad39c92a24c524e86fc384e683d3992e7a68f05 100644 --- a/hysop/problem.py +++ b/hysop/problem.py @@ -9,10 +9,12 @@ from hysop.tools.contexts import Timer from hysop.tools.decorators import debug from hysop.tools.parameters import MPIParams from hysop.tools.numerics import default_invalid_value +from hysop.tools.units import bytes2str, time2str from hysop.core.graph.computational_graph import ComputationalGraph from hysop.tools.string_utils import vprint_banner, vprint -from hysop.core.mpi import main_rank, main_size, main_comm +from hysop.core.mpi import main_rank, main_size, main_comm, Wtime from hysop.fields.cartesian_discrete_field import CartesianDiscreteScalarField +from hysop.parameters import ScalarParameter, TensorParameter, BufferParameter from hysop.domain.box import Box @@ -202,6 +204,7 @@ class Problem(ComputationalGraph): vprint('>Exporting problem checkpoint to \'{}\':'.format(save_checkpoint_tar)) is_io_leader = (checkpoint_io_params.io_leader == self.mpi_params.rank) + start = Wtime(); # create a backup of last checkpoint in the case things go wrong if is_io_leader and os.path.exists(save_checkpoint_tar): @@ -214,7 +217,7 @@ class Problem(ComputationalGraph): # try to create the checkpoint directory, this is a collective MPI operation try: - success, reason = self._dump_checkpoint(save_checkpoint, checkpoint_io_params, simu) + success, reason, nbytes = self._dump_checkpoint(save_checkpoint, checkpoint_io_params, simu) except Exception as e: raise success = False @@ -232,25 +235,44 @@ class Problem(ComputationalGraph): tf.add(fpath, arcname=fpath.replace(save_checkpoint+os.path.sep,'')) if os.path.isfile(save_checkpoint_tar): shutil.rmtree(save_checkpoint) - except: - raise + else: + raise RuntimeError('Could not tar checkpoint data.') + ellapsed = Wtime() - start + effective_nbytes = os.path.getsize(save_checkpoint_tar) + compression_ratio = max(1.0, float(nbytes)/effective_nbytes) + msg=' > Successfully dumped checkpoint in {} with a compression ratio of {:.1f} ({}).' + vprint(msg.format(time2str(ellapsed), compression_ratio, bytes2str(effective_nbytes))) + except Exception as e: success = False - + reason = str(e) success = main_comm.allreduce(int(success)) == main_comm.size - if not success: - from hysop.tools.warning import HysopDumpWarning - msg='Failed to dump checkpoint because: {}.'.format(reason) - warnings.warn(msg, HysopDumpWarning) - # Something went wrong (I/O error or other) so we rollback to previous checkpoint (if there is one) - vprint(' | An error occured during checkpoint creation, rolling back to previous one...') - if is_io_leader: - if os.path.exists(save_checkpoint): - shutil.rmtree(save_checkpoint) - if os.path.exists(save_checkpoint_tar): - os.remove(save_checkpoint_tar) - if (backup_checkpoint_tar is not None) and os.path.exists(backup_checkpoint_tar): - os.rename(backup_checkpoint_tar, save_checkpoint_tar) + if success: + return + + from hysop.tools.warning import HysopDumpWarning + msg='Failed to dump checkpoint because: {}.'.format(reason) + warnings.warn(msg, HysopDumpWarning) + + # Something went wrong (I/O error or other) so we rollback to previous checkpoint (if there is one) + vprint(' | An error occured during checkpoint creation, rolling back to previous one...') + if is_io_leader: + if os.path.exists(save_checkpoint): + shutil.rmtree(save_checkpoint) + if os.path.exists(save_checkpoint_tar): + os.remove(save_checkpoint_tar) + if (backup_checkpoint_tar is not None) and os.path.exists(backup_checkpoint_tar): + os.rename(backup_checkpoint_tar, save_checkpoint_tar) + + + @staticmethod + def format_zarr_key(k): + # note keys that contains the special characters '/' and '\' do not work well with zarr + # so we need to replace it by another character such as '_'. + # We cannot use utf8 characters such as u+2215 (division slash). + if (k is None): + return None + return k.replace('/', '_').replace('\\', '_') def create_checkpoint_template(self, save_checkpoint, checkpoint_io_params): # Create groups of arrays on disk (only hierarchy and array metadata is stored in the template) @@ -273,29 +295,52 @@ class Problem(ComputationalGraph): # Array block data compressor from numcodecs import blosc, Blosc blosc.use_threads = (self.mpi_params.size == 1) # disable threads for multiple processes (can deadlock) - compressor = Blosc(cname='snappy', clevel=3, shuffle=Blosc.BITSHUFFLE) + compressor = Blosc(cname='snappy', clevel=4, shuffle=Blosc.BITSHUFFLE) # Create a directory layout as a file on shared filesystem import zarr + + nbytes = 0 # count number of total data bytes without compression if is_io_leader: if os.path.exists(checkpoint_template): shutil.rmtree(checkpoint_template) store = zarr.DirectoryStore(path=checkpoint_template) root = zarr.open_group(store=store, mode='w', path='') + params = root.create_group('params') fields = root.create_group('fields') else: store = None root = None + params = None fields = None - + + # generate parameter arrays + for param in sorted(self.parameters, key=operator.attrgetter('name')): + if not is_io_leader: + continue + if isinstance(param, (ScalarParameter, TensorParameter, BufferParameter)): + # all those parameters store their data in a numpy ndarray so we're good + assert isinstance(param._value, np.ndarray), type(param._value) + value = param._value + array = params.create_dataset(name=self.format_zarr_key(param.name), + overwrite=False, data=None, synchronizer=None, + compressor=compressor, shape=value.shape, chunks=None, + dtype=value.dtype, fill_value=default_invalid_value(value.dtype)) + array.attrs['kind'] = param.__class__.__name__ + nbytes += value.nbytes + else: + msg = 'Cannot dump parameter of type {}.'.format(param.__class__.__name__) + raise NotImplementedError(msg) + + # generate discrete field arrays for field in sorted(self.fields, key=operator.attrgetter('name')): # we do not care about fields discretized only on temporary fields if all(df.is_tmp for df in field.discrete_fields.values()): continue if is_io_leader: - discrete_fields = fields.create_group(field.name) + discrete_fields = fields.create_group(self.format_zarr_key(field.name)) else: discrete_fields = None @@ -339,9 +384,12 @@ class Problem(ComputationalGraph): if not is_io_leader: continue - # io_leader can now determine wether the cartesian discretization is uniformly distributed between processes or not - inner_compute_resolutions = tuple(compute_resolutions[i] for i in range(len(compute_resolutions)) if not is_at_right_boundary[i]) - grid_is_uniformly_distributed = all(res == inner_compute_resolutions[0] for res in inner_compute_resolutions) + # io_leader can now determine wether the cartesian discretization is uniformly distributed + # between processes or not + inner_compute_resolutions = tuple(compute_resolutions[i] for i in range(len(compute_resolutions)) + if not is_at_right_boundary[i]) + grid_is_uniformly_distributed = all(res == inner_compute_resolutions[0] + for res in inner_compute_resolutions) grid_is_uniformly_distributed |= (topo.mpi_params.size == 1) if grid_is_uniformly_distributed: @@ -366,13 +414,15 @@ class Problem(ComputationalGraph): # Create array (no memory is allocated here, even on disk because data blocks are empty) dtype = dfield.dtype shape = grid_resolution - + + # We scale the keys up to 100 topologies, which seams to be a pretty decent upper limit + # on a per field basis. array = discrete_fields.create_dataset(name='topo_{:02d}'.format(k), overwrite=False, data=None, synchronizer=None, compressor=compressor, shape=shape, chunks=chunks, dtype=dtype, fill_value=default_invalid_value(dtype)) array.attrs['should_sync'] = should_sync - + # We cannot rely on discrete mesh name because of topology names # so we save some field metadata to be able to differentiate between # discrete fields with the exact same grid resolution. @@ -382,12 +432,17 @@ class Problem(ComputationalGraph): array.attrs['ghosts'] = to_tuple(mesh.ghosts) array.attrs['proc_shape'] = to_tuple(mesh.proc_shape) array.attrs['name'] = dfield.name + + nbytes += np.prod(shape, dtype=np.int64) * dtype.itemsize if (root is not None): + root.attrs['nbytes'] = nbytes + msg=' => Maximum checkpoint size will be {}, without compression and metadata.' vprint(root.tree()) + vprint(msg.format(bytes2str(nbytes))) + # some zarr store formats require a final close to flush data try: - # some zarr store formats require a final close to flush data if (root is not None): root.close() except AttributeError: @@ -416,14 +471,36 @@ class Problem(ComputationalGraph): import zarr store = zarr.DirectoryStore(save_checkpoint) root = zarr.open_group(store=store, mode='r+', synchronizer=None, path='') + + nbytes = root.attrs['nbytes'] fields_group = root['fields'] + params_group = root['params'] + + # Currently there is no distributed parameter capabilities so io_leader has to dump all parameters + if is_io_leader: + msg = ' | dumping parameters...' + vprint(msg) + for param in sorted(self.parameters, key=operator.attrgetter('name')): + if isinstance(param, (ScalarParameter, TensorParameter, BufferParameter)): + array = params_group[self.format_zarr_key(param.name)] + assert array.attrs['kind'] == param.__class__.__name__ + assert array.dtype == param._value.dtype + assert array.shape == param._value.shape + array[...] = param._value + else: + msg = 'Cannot dump parameter of type {}.'.format(param.__class__.__name__) + raise NotImplementedError(msg) + # Unlike parameter all processes participate for fields for field in sorted(self.fields, key=operator.attrgetter('name')): # we do not care about fields discretized only on temporary fields if all(df.is_tmp for df in field.discrete_fields.values()): continue + + msg = ' | dumping field {}...'.format(field.pretty_name) + vprint(msg) - field_group = fields_group[field.name] + field_group = fields_group[self.format_zarr_key(field.name)] for (k, topo) in enumerate(sorted(field.discrete_fields, key=operator.attrgetter('full_tag'))): dfield = field.discrete_fields[topo] mesh = topo.mesh._mesh @@ -432,33 +509,33 @@ class Problem(ComputationalGraph): # we do not care about temporary fields continue - dataset = 'topo_{:02d}'.format(k) + dataset = 'topo_{:02d}'.format(k) # key has to match template array = field_group[dataset] should_sync = array.attrs['should_sync'] assert dfield.nb_components == 1 assert (array.shape == mesh.grid_resolution).all(), (array.shape, mesh.grid_resolution) assert array.dtype == dfield.dtype, (array.dtype, dfield.dtype) + if should_sync: # Should not be required untill we allow non-uniform discretizations global_start = mesh.global_start global_stop = mesh.global_stop raise NotImplementedError('Synchronized multiprocess write has not been implemented yet.') else: - assert (mesh.compute_resolution == array.chunks).all() or (mesh.is_at_right_boundary*(mesh.proc_shape>1)).any() + assert ((mesh.compute_resolution == array.chunks).all() + or (mesh.is_at_right_boundary*(mesh.proc_shape>1)).any()) local_data = dfield.compute_data[0].get() global_slices = mesh.global_compute_slices array[global_slices] = local_data + # Some zarr store formats require a final close to flush data try: - # some zarr store formats require a final close to flush data - if (root is not None): - vprint(root.info) - root.close() + root.close() except AttributeError: pass - return True, None + return True, None, nbytes @debug def finalize(self): @@ -466,5 +543,7 @@ class Problem(ComputationalGraph): super(Problem, self).finalize() if (hasattr(self, '_checkpoint_template') and (self._checkpoint_template is not None) - and os.path.exists(self._checkpoint_template)): + and os.path.exists(self._checkpoint_template) + and self.mpi_params.rank==0): shutil.rmtree(self._checkpoint_template) + diff --git a/hysop_examples/examples/taylor_green/taylor_green.py b/hysop_examples/examples/taylor_green/taylor_green.py index 744e0696e34374fbe0f1bc8287d84c7637ef52e7..2ee4bd75e70284ad9abc7b3fc1d5880c9fbafc7a 100644 --- a/hysop_examples/examples/taylor_green/taylor_green.py +++ b/hysop_examples/examples/taylor_green/taylor_green.py @@ -40,8 +40,8 @@ def compute(args): Enstrophy, MinMaxFieldStatistics, StrangSplitting, \ ParameterPlotter, Advection, MinMaxGradientStatistics - from hysop.methods import SpaceDiscretization, Remesh, TimeIntegrator, \ - ComputeGranularity, Interpolation + from hysop.methods import SpaceDiscretization, Remesh, TimeIntegrator, Interpolation + ## IO paths spectral_path = IO.default_path() + '/spectral' @@ -172,18 +172,18 @@ def compute(args): ### Adaptive timestep operator if args.variable_timestep: adapt_dt = AdaptiveTimeStep(dt, equivalent_CFL=True) - dt_cfl = adapt_dt.push_cfl_criteria(cfl=args.cfl, - Fmin=min_max_U.Fmin, - Fmax=min_max_U.Fmax, - equivalent_CFL=True) + dt_cfl = adapt_dt.push_cfl_criteria(cfl=args.cfl, dtype=dt.dtype, + Fmin=min_max_U.Fmin, + Fmax=min_max_U.Fmax, + equivalent_CFL=True) dt_stretch = adapt_dt.push_stretching_criteria(gradFinf=min_max_gradU.Finf, - criteria=StretchingCriteria.GRAD_U) + criteria=StretchingCriteria.GRAD_U, dtype=dt.dtype) dt_lcfl0 = adapt_dt.push_advection_criteria(lcfl=args.lcfl, Finf=min_max_W.Finf, - criteria=AdvectionCriteria.W_INF, name='LCFL0') + criteria=AdvectionCriteria.W_INF, name='LCFL0', dtype=dt.dtype) dt_lcfl1 = adapt_dt.push_advection_criteria(lcfl=args.lcfl, gradFinf=min_max_gradU.Finf, - criteria=AdvectionCriteria.GRAD_U, name='LCFL1') + criteria=AdvectionCriteria.GRAD_U, name='LCFL1', dtype=dt.dtype) dt_lcfl2 = adapt_dt.push_advection_criteria(lcfl=args.lcfl, gradFinf=min_max_gradU.Finf, - criteria=AdvectionCriteria.DEFORMATION, name='LCFL2') + criteria=AdvectionCriteria.DEFORMATION, name='LCFL2', dtype=dt.dtype) else: adapt_dt = None @@ -264,7 +264,6 @@ def compute(args): # accross all operators contained in the graph. method.update( { - ComputeGranularity: args.compute_granularity, SpaceDiscretization: args.fd_order, TimeIntegrator: args.time_integrator, Remesh: args.remesh_kernel,