From ffd027553b7eb67acce4227534abf42edbcd960c Mon Sep 17 00:00:00 2001
From: Jean-Baptiste Keck <Jean-Baptiste.Keck@imag.fr>
Date: Fri, 24 Jul 2020 00:27:04 +0200
Subject: [PATCH] parameter checkpoint

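Parameters are now saved alongside fields in problem checkpoints: a
dedicated 'params' group is added to the zarr checkpoint template, and
ScalarParameter, TensorParameter and BufferParameter values are dumped
by the io_leader (there are no distributed parameter capabilities yet,
so the io_leader holds all parameter values). Checkpoint dumps now also
report elapsed time, compression ratio and uncompressed size, and zarr
keys are sanitized because '/' and '\' do not work well as zarr keys.

The AdaptiveTimeStep.push_*_criteria helpers gain a 'dtype' keyword
that is forwarded to the generated ScalarParameter (still defaulting
to HYSOP_REAL), so that criteria parameters can match the dtype of the
'dt' parameter. Usage, as in the updated Taylor-Green example:

    adapt_dt = AdaptiveTimeStep(dt, equivalent_CFL=True)
    dt_cfl = adapt_dt.push_cfl_criteria(cfl=args.cfl, dtype=dt.dtype,
                                        Fmin=min_max_U.Fmin,
                                        Fmax=min_max_U.Fmax,
                                        equivalent_CFL=True)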
---
 hysop/operator/adapt_timestep.py              |  21 +--
 hysop/parameters/__init__.py                  |   5 +-
 hysop/problem.py                              | 151 +++++++++++++-----
 .../examples/taylor_green/taylor_green.py     |  21 ++-
 4 files changed, 139 insertions(+), 59 deletions(-)
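
A minimal inspection sketch (not part of this patch): reading dumped
parameters back from an extracted checkpoint directory with zarr. The
'checkpoint' path and the 'dt' parameter name are assumptions made for
the sake of the example:

    import zarr
    # open the extracted checkpoint directory read-only (path is an assumption)
    store = zarr.DirectoryStore('checkpoint')
    root  = zarr.open_group(store=store, mode='r')
    print(root.tree())                # shows the params/ and fields/ hierarchy
    dt = root['params']['dt'][...]    # parameter values are plain zarr arrays
    print(root.attrs['nbytes'])       # total uncompressed payload size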

diff --git a/hysop/operator/adapt_timestep.py b/hysop/operator/adapt_timestep.py
index e2cbe93de..b547e6538 100755
--- a/hysop/operator/adapt_timestep.py
+++ b/hysop/operator/adapt_timestep.py
@@ -542,11 +542,11 @@ class AdaptiveTimeStep(ComputationalGraphNodeGenerator):
     def push_cst_criteria(self, cst, Finf=None,
             name=None, pretty_name=None,
             param_name=None, param_pretty_name=None,
-            parameter=None, quiet=False, **kwds):
+            parameter=None, quiet=False, dtype=None, **kwds):
 
         parameter = self._build_parameter(parameter=parameter, quiet=quiet,
                 name=param_name, pretty_name=param_pretty_name,
-                basename=name.replace('dt_', ''))
+                basename=name.replace('dt_', ''), dtype=dtype)
         criteria = ConstantTimestepCriteria(cst=cst, Finf=Finf,
             parameter=parameter, name=name, pretty_name=pretty_name, **kwds)
         self._push_criteria(parameter.name, criteria)
@@ -557,12 +557,12 @@ class AdaptiveTimeStep(ComputationalGraphNodeGenerator):
             param_name=None, param_pretty_name=None,
             parameter=None, quiet=False,
             relative_velocities=None,
-            equivalent_CFL=None, **kwds):
+            equivalent_CFL=None, dtype=None, **kwds):
         """
         See hysop.operator.adapt_timpestep.CflTimestepCriteria.
         """
         parameter = self._build_parameter(parameter=parameter, quiet=quiet,
-                name=param_name, pretty_name=param_pretty_name, basename='cfl')
+                name=param_name, pretty_name=param_pretty_name, basename='cfl', dtype=dtype)
         criteria = CflTimestepCriteria(cfl=cfl, Fmin=Fmin, Fmax=Fmax, Finf=Finf,
                                         dx=dx, parameter=parameter,
                                         name=name, pretty_name=pretty_name,
@@ -584,11 +584,11 @@ class AdaptiveTimeStep(ComputationalGraphNodeGenerator):
     def push_advection_criteria(self, lcfl, criteria, Finf=None, gradFinf=None,
             name=None, pretty_name=None,
             param_name=None, param_pretty_name=None,
-            parameter=None, quiet=False, **kwds):
+            parameter=None, quiet=False, dtype=None, **kwds):
         """
         See hysop.operator.adapt_timpestep.AdvectionTimestepCriteria.
         """
-        parameter = self._build_parameter(parameter=parameter, quiet=quiet,
+        parameter = self._build_parameter(parameter=parameter, quiet=quiet, dtype=dtype,
                 name=param_name, pretty_name=param_pretty_name, basename='lcfl_{}'.format(str(criteria).lower()))
         criteria = AdvectionTimestepCriteria(lcfl=lcfl, Finf=Finf, gradFinf=gradFinf,
             parameter=parameter, criteria=criteria,
@@ -603,24 +603,25 @@ class AdaptiveTimeStep(ComputationalGraphNodeGenerator):
         return self.push_advection_criteria(*args, **kwds)
 
     def push_stretching_criteria(self, criteria, gradFinf, cst=1.0,
-            name=None, pretty_name=None, parameter=None, quiet=False, **kwds):
+            name=None, pretty_name=None, parameter=None, quiet=False, dtype=None, **kwds):
         """
         See hysop.operator.adapt_timpestep.StretchingTimestepCriteria.
         """
         parameter = self._build_parameter(parameter=parameter, quiet=quiet,
-                name=name, pretty_name=pretty_name, basename='stretch')
+                name=name, pretty_name=pretty_name, basename='stretch', dtype=dtype)
         criteria = StretchingTimestepCriteria(cst=cst, parameter=parameter,
                 gradFinf=gradFinf, criteria=criteria, **kwds)
         self._push_criteria(parameter.name, criteria)
         return criteria.dt
 
     def _build_parameter(self, name=None, pretty_name=None, quiet=None,
-                            basename=None, parameter=None):
+                            basename=None, parameter=None, dtype=None):
         if (parameter is None):
             name         = first_not_None(name,'dt_{}'.format(basename))
             pretty_name  = first_not_None(pretty_name, name)
+            dtype        = first_not_None(dtype, HYSOP_REAL)
             parameter = ScalarParameter(name=name, pretty_name=pretty_name,
-                            quiet=quiet, dtype=HYSOP_REAL)
+                            quiet=quiet, dtype=dtype)
         return parameter
 
     def _push_criteria(self, name, criteria):
diff --git a/hysop/parameters/__init__.py b/hysop/parameters/__init__.py
index a8af4cd1a..77fa1da49 100644
--- a/hysop/parameters/__init__.py
+++ b/hysop/parameters/__init__.py
@@ -1,4 +1,5 @@
 
-from hysop.parameters.tensor_parameter  import TensorParameter
+from hysop.parameters.tensor_parameter import TensorParameter
 from hysop.parameters.scalar_parameter import ScalarParameter
-__all__ = ( ScalarParameter, TensorParameter )
+from hysop.parameters.buffer_parameter import BufferParameter
+__all__ = ('ScalarParameter', 'TensorParameter', 'BufferParameter')
diff --git a/hysop/problem.py b/hysop/problem.py
index 23858550d..2ad39c92a 100644
--- a/hysop/problem.py
+++ b/hysop/problem.py
@@ -9,10 +9,12 @@ from hysop.tools.contexts import Timer
 from hysop.tools.decorators import debug
 from hysop.tools.parameters import MPIParams
 from hysop.tools.numerics import default_invalid_value
+from hysop.tools.units import bytes2str, time2str
 from hysop.core.graph.computational_graph import ComputationalGraph
 from hysop.tools.string_utils import vprint_banner, vprint
-from hysop.core.mpi import main_rank, main_size, main_comm
+from hysop.core.mpi import main_rank, main_size, main_comm, Wtime
 from hysop.fields.cartesian_discrete_field import CartesianDiscreteScalarField
+from hysop.parameters import ScalarParameter, TensorParameter, BufferParameter
 from hysop.domain.box import Box
 
 
@@ -202,6 +204,7 @@ class Problem(ComputationalGraph):
 
         vprint('>Exporting problem checkpoint to \'{}\':'.format(save_checkpoint_tar))
         is_io_leader = (checkpoint_io_params.io_leader == self.mpi_params.rank)
+        start = Wtime()
         
         # create a backup of last checkpoint in the case things go wrong
         if is_io_leader and os.path.exists(save_checkpoint_tar):
@@ -214,7 +217,7 @@ class Problem(ComputationalGraph):
         
         # try to create the checkpoint directory, this is a collective MPI operation
         try:
-            success, reason = self._dump_checkpoint(save_checkpoint, checkpoint_io_params, simu)
+            success, reason, nbytes = self._dump_checkpoint(save_checkpoint, checkpoint_io_params, simu)
         except Exception as e:
             raise
             success = False
@@ -232,25 +235,44 @@ class Problem(ComputationalGraph):
                             tf.add(fpath, arcname=fpath.replace(save_checkpoint+os.path.sep,''))
                 if os.path.isfile(save_checkpoint_tar):
                     shutil.rmtree(save_checkpoint)
-            except:
-                raise
+                else:
+                    raise RuntimeError('Could not tar checkpoint data.')
+                elapsed = Wtime() - start
+                effective_nbytes = os.path.getsize(save_checkpoint_tar)
+                compression_ratio = max(1.0, float(nbytes)/effective_nbytes)
+                msg=' > Successfully dumped checkpoint in {} with a compression ratio of {:.1f} ({}).'
+                vprint(msg.format(time2str(elapsed), compression_ratio, bytes2str(effective_nbytes)))
+            except Exception as e:
                 success = False
-        
+                reason = str(e)
         success = main_comm.allreduce(int(success)) == main_comm.size
-        if not success:
-            from hysop.tools.warning import HysopDumpWarning
-            msg='Failed to dump checkpoint because: {}.'.format(reason)
-            warnings.warn(msg, HysopDumpWarning) 
 
-            # Something went wrong (I/O error or other) so we rollback to previous checkpoint (if there is one)
-            vprint(' | An error occured during checkpoint creation, rolling back to previous one...')
-            if is_io_leader:
-                if os.path.exists(save_checkpoint):
-                    shutil.rmtree(save_checkpoint)
-                if os.path.exists(save_checkpoint_tar):
-                    os.remove(save_checkpoint_tar)
-                if (backup_checkpoint_tar is not None) and os.path.exists(backup_checkpoint_tar):
-                    os.rename(backup_checkpoint_tar, save_checkpoint_tar)
+        if success:
+            return
+        
+        from hysop.tools.warning import HysopDumpWarning
+        msg='Failed to dump checkpoint because: {}.'.format(reason)
+        warnings.warn(msg, HysopDumpWarning) 
+
+        # Something went wrong (I/O error or other) so we rollback to previous checkpoint (if there is one)
+        vprint(' | An error occurred during checkpoint creation, rolling back to previous one...')
+        if is_io_leader:
+            if os.path.exists(save_checkpoint):
+                shutil.rmtree(save_checkpoint)
+            if os.path.exists(save_checkpoint_tar):
+                os.remove(save_checkpoint_tar)
+            if (backup_checkpoint_tar is not None) and os.path.exists(backup_checkpoint_tar):
+                os.rename(backup_checkpoint_tar, save_checkpoint_tar)
+        
+    
+    @staticmethod
+    def format_zarr_key(k):
+        # Note: keys that contain the special characters '/' and '\' do not work
+        # well with zarr, so we need to replace them with another character such as '_'.
+        # We cannot use utf8 characters such as u+2215 (division slash).
+        if (k is None):
+            return None
+        return k.replace('/', '_').replace('\\', '_')
 
     def create_checkpoint_template(self, save_checkpoint, checkpoint_io_params):
         # Create groups of arrays on disk (only hierarchy and array metadata is stored in the template)
@@ -273,29 +295,52 @@ class Problem(ComputationalGraph):
         # Array block data compressor
         from numcodecs import blosc, Blosc
         blosc.use_threads = (self.mpi_params.size == 1) # disable threads for multiple processes (can deadlock)
-        compressor = Blosc(cname='snappy', clevel=3, shuffle=Blosc.BITSHUFFLE)
+        compressor = Blosc(cname='snappy', clevel=4, shuffle=Blosc.BITSHUFFLE)
         
         # Create a directory layout as a file on shared filesystem
         import zarr
+
+        nbytes = 0 # total number of data bytes, without compression
         
         if is_io_leader:
             if os.path.exists(checkpoint_template):
                 shutil.rmtree(checkpoint_template)
             store  = zarr.DirectoryStore(path=checkpoint_template)
             root   = zarr.open_group(store=store, mode='w', path='')
+            params = root.create_group('params')
             fields = root.create_group('fields')
         else:
             store  = None
             root   = None
+            params = None
             fields = None
-
+        
+        # generate parameter arrays
+        for param in sorted(self.parameters, key=operator.attrgetter('name')):
+            if not is_io_leader:
+                continue
+            if isinstance(param, (ScalarParameter, TensorParameter, BufferParameter)):
+                # all those parameters store their data in a numpy ndarray so we're good
+                assert isinstance(param._value, np.ndarray), type(param._value)
+                value = param._value
+                array = params.create_dataset(name=self.format_zarr_key(param.name),
+                            overwrite=False, data=None, synchronizer=None, 
+                            compressor=compressor, shape=value.shape, chunks=None,
+                            dtype=value.dtype, fill_value=default_invalid_value(value.dtype))
+                array.attrs['kind'] = param.__class__.__name__
+                nbytes += value.nbytes
+            else:
+                msg = 'Cannot dump parameter of type {}.'.format(param.__class__.__name__)
+                raise NotImplementedError(msg)
+        
+        # generate discrete field arrays
         for field in sorted(self.fields, key=operator.attrgetter('name')):
             # we do not care about fields discretized only on temporary fields
             if all(df.is_tmp for df in field.discrete_fields.values()):
                 continue
 
             if is_io_leader:
-                discrete_fields = fields.create_group(field.name)
+                discrete_fields = fields.create_group(self.format_zarr_key(field.name))
             else:
                 discrete_fields = None
 
@@ -339,9 +384,12 @@ class Problem(ComputationalGraph):
                 if not is_io_leader:
                     continue
                 
-                # io_leader can now determine wether the cartesian discretization is uniformly distributed between processes or not
-                inner_compute_resolutions = tuple(compute_resolutions[i] for i in range(len(compute_resolutions)) if not is_at_right_boundary[i])
-                grid_is_uniformly_distributed = all(res == inner_compute_resolutions[0] for res in inner_compute_resolutions)
+                # io_leader can now determine whether the cartesian discretization is uniformly distributed
+                # between processes or not
+                inner_compute_resolutions = tuple(compute_resolutions[i] for i in range(len(compute_resolutions)) 
+                                                                         if not is_at_right_boundary[i])
+                grid_is_uniformly_distributed = all(res == inner_compute_resolutions[0] 
+                                                            for res in inner_compute_resolutions)
                 grid_is_uniformly_distributed |= (topo.mpi_params.size == 1)
 
                 if grid_is_uniformly_distributed:
@@ -366,13 +414,15 @@ class Problem(ComputationalGraph):
                 # Create array (no memory is allocated here, even on disk because data blocks are empty)
                 dtype = dfield.dtype
                 shape = grid_resolution
-
+                
+                # Keys are formatted for up to 100 topologies, which seems to be a decent
+                # upper limit on a per-field basis.
                 array = discrete_fields.create_dataset(name='topo_{:02d}'.format(k), 
                             overwrite=False, data=None, synchronizer=None, 
                             compressor=compressor, shape=shape, chunks=chunks, 
                             dtype=dtype, fill_value=default_invalid_value(dtype))
                 array.attrs['should_sync'] = should_sync
-                
+
                 # We cannot rely on discrete mesh name because of topology names
                 # so we save some field metadata to be able to differentiate between 
                 # discrete fields with the exact same grid resolution.
@@ -382,12 +432,17 @@ class Problem(ComputationalGraph):
                 array.attrs['ghosts']      = to_tuple(mesh.ghosts)
                 array.attrs['proc_shape']  = to_tuple(mesh.proc_shape)
                 array.attrs['name']        = dfield.name
+                
+                nbytes += np.prod(shape, dtype=np.int64) * dtype.itemsize
         
         if (root is not None):
+            root.attrs['nbytes'] = nbytes
+            msg=' => Maximum checkpoint size will be {}, excluding compression and metadata.'
             vprint(root.tree())
+            vprint(msg.format(bytes2str(nbytes)))
 
+        # some zarr store formats require a final close to flush data
         try:
-            # some zarr store formats require a final close to flush data
             if (root is not None):
                 root.close()
         except AttributeError:
@@ -416,14 +471,36 @@ class Problem(ComputationalGraph):
         import zarr 
         store = zarr.DirectoryStore(save_checkpoint)
         root  = zarr.open_group(store=store, mode='r+', synchronizer=None, path='')
+
+        nbytes = root.attrs['nbytes']
         fields_group = root['fields']
+        params_group = root['params']
+
+        # There are currently no distributed parameter capabilities, so the io_leader has to dump all parameters.
+        if is_io_leader:
+            msg = ' | dumping parameters...'
+            vprint(msg)
+            for param in sorted(self.parameters, key=operator.attrgetter('name')):
+                if isinstance(param, (ScalarParameter, TensorParameter, BufferParameter)):
+                    array = params_group[self.format_zarr_key(param.name)]
+                    assert array.attrs['kind'] == param.__class__.__name__
+                    assert array.dtype == param._value.dtype
+                    assert array.shape == param._value.shape
+                    array[...] = param._value
+                else:
+                    msg = 'Cannot dump parameter of type {}.'.format(param.__class__.__name__)
+                    raise NotImplementedError(msg)
         
+        # Unlike parameters, all processes participate in dumping fields.
         for field in sorted(self.fields, key=operator.attrgetter('name')):
             # we do not care about fields discretized only on temporary fields
             if all(df.is_tmp for df in field.discrete_fields.values()):
                 continue
+                
+            msg = ' | dumping field {}...'.format(field.pretty_name)
+            vprint(msg)
 
-            field_group = fields_group[field.name]
+            field_group = fields_group[self.format_zarr_key(field.name)]
             for (k, topo) in enumerate(sorted(field.discrete_fields, key=operator.attrgetter('full_tag'))):
                 dfield = field.discrete_fields[topo]
                 mesh   = topo.mesh._mesh
@@ -432,33 +509,33 @@ class Problem(ComputationalGraph):
                     # we do not care about temporary fields
                     continue
 
-                dataset = 'topo_{:02d}'.format(k)
+                dataset = 'topo_{:02d}'.format(k) # key has to match template
                 array = field_group[dataset]
                 should_sync = array.attrs['should_sync']
 
                 assert dfield.nb_components == 1
                 assert (array.shape == mesh.grid_resolution).all(), (array.shape, mesh.grid_resolution)
                 assert array.dtype == dfield.dtype, (array.dtype, dfield.dtype)
+
                 if should_sync: 
                     # Should not be required untill we allow non-uniform discretizations
                     global_start = mesh.global_start
                     global_stop  = mesh.global_stop
                     raise NotImplementedError('Synchronized multiprocess write has not been implemented yet.')
                 else:
-                    assert (mesh.compute_resolution == array.chunks).all() or (mesh.is_at_right_boundary*(mesh.proc_shape>1)).any()
+                    assert ((mesh.compute_resolution == array.chunks).all() 
+                         or (mesh.is_at_right_boundary*(mesh.proc_shape>1)).any())
                     local_data = dfield.compute_data[0].get()
                     global_slices = mesh.global_compute_slices
                     array[global_slices] = local_data
         
+        # Some zarr store formats require a final close to flush data
         try:
-            # some zarr store formats require a final close to flush data
-            if (root is not None):
-                vprint(root.info)
-                root.close()
+            root.close()
         except AttributeError:
             pass
 
-        return True, None
+        return True, None, nbytes
 
     @debug
     def finalize(self):
@@ -466,5 +543,7 @@ class Problem(ComputationalGraph):
         super(Problem, self).finalize()
         if (hasattr(self, '_checkpoint_template') 
           and (self._checkpoint_template is not None) 
-          and os.path.exists(self._checkpoint_template)):
+          and os.path.exists(self._checkpoint_template)
+          and self.mpi_params.rank==0):
             shutil.rmtree(self._checkpoint_template)
+
diff --git a/hysop_examples/examples/taylor_green/taylor_green.py b/hysop_examples/examples/taylor_green/taylor_green.py
index 744e0696e..2ee4bd75e 100644
--- a/hysop_examples/examples/taylor_green/taylor_green.py
+++ b/hysop_examples/examples/taylor_green/taylor_green.py
@@ -40,8 +40,8 @@ def compute(args):
                                 Enstrophy, MinMaxFieldStatistics, StrangSplitting,    \
                                 ParameterPlotter, Advection, MinMaxGradientStatistics
 
-    from hysop.methods import SpaceDiscretization, Remesh, TimeIntegrator, \
-                              ComputeGranularity, Interpolation
+    from hysop.methods import SpaceDiscretization, Remesh, TimeIntegrator, Interpolation
+
     ## IO paths
     spectral_path = IO.default_path() + '/spectral'
 
@@ -172,18 +172,18 @@ def compute(args):
     ### Adaptive timestep operator
     if args.variable_timestep:
         adapt_dt = AdaptiveTimeStep(dt, equivalent_CFL=True)
-        dt_cfl   = adapt_dt.push_cfl_criteria(cfl=args.cfl,
-                                              Fmin=min_max_U.Fmin,
-                                              Fmax=min_max_U.Fmax,
-                                              equivalent_CFL=True)
+        dt_cfl = adapt_dt.push_cfl_criteria(cfl=args.cfl, dtype=dt.dtype,
+                                            Fmin=min_max_U.Fmin,
+                                            Fmax=min_max_U.Fmax,
+                                            equivalent_CFL=True)
         dt_stretch = adapt_dt.push_stretching_criteria(gradFinf=min_max_gradU.Finf,
-                criteria=StretchingCriteria.GRAD_U)
+                criteria=StretchingCriteria.GRAD_U, dtype=dt.dtype)
         dt_lcfl0 = adapt_dt.push_advection_criteria(lcfl=args.lcfl, Finf=min_max_W.Finf,
-                                criteria=AdvectionCriteria.W_INF, name='LCFL0')
+                                criteria=AdvectionCriteria.W_INF, name='LCFL0', dtype=dt.dtype)
         dt_lcfl1 = adapt_dt.push_advection_criteria(lcfl=args.lcfl, gradFinf=min_max_gradU.Finf,
-                                criteria=AdvectionCriteria.GRAD_U, name='LCFL1')
+                                criteria=AdvectionCriteria.GRAD_U, name='LCFL1', dtype=dt.dtype)
         dt_lcfl2 = adapt_dt.push_advection_criteria(lcfl=args.lcfl, gradFinf=min_max_gradU.Finf,
-                                criteria=AdvectionCriteria.DEFORMATION, name='LCFL2')
+                                criteria=AdvectionCriteria.DEFORMATION, name='LCFL2', dtype=dt.dtype)
     else:
         adapt_dt = None
 
@@ -264,7 +264,6 @@ def compute(args):
     # accross all operators contained in the graph.
     method.update(
             {
-               ComputeGranularity:    args.compute_granularity,
                SpaceDiscretization:   args.fd_order,
                TimeIntegrator:        args.time_integrator,
                Remesh:                args.remesh_kernel,
-- 
GitLab