From 68c6a0ac9a34ddf2771e9488a78a16aa760be2e2 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Keck <Jean-Baptiste.Keck@imag.fr> Date: Thu, 10 Oct 2019 15:40:54 +0200 Subject: [PATCH] add support for in memory HDF5 post-processing --- examples/example_utils.py | 109 +++++++++--- .../opencl/opencl_autotunable_kernel.py | 6 + hysop/operator/hdf_io.py | 70 +++++--- hysop/tools/io_utils.py | 155 ++++++++++++++---- hysop/tools/postprocess_dump.sh | 24 +++ requirements.txt | 3 +- 6 files changed, 298 insertions(+), 69 deletions(-) create mode 100755 hysop/tools/postprocess_dump.sh diff --git a/examples/example_utils.py b/examples/example_utils.py index 638918a46..ede181d4f 100644 --- a/examples/example_utils.py +++ b/examples/example_utils.py @@ -515,7 +515,7 @@ class HysopArgParser(argparse.ArgumentParser): help=('Specify timestep instead of a number of iterations '+ '(has priority over number of iterations).'+ ' This will be the initial timestep when using adaptive timestep.')) - simu.add_argument('-fts', '--fixed-timestep', action='store_true', + simu.add_argument('-fts', '--fixed-timestep', default=False, action='store_true', dest='fixed_timestep', help='Disable variable timestepping. In this case, timestep has to be specified with -dt or -nb_iter.') simu.add_argument('-mindt', '--min-timestep', type=float, @@ -637,7 +637,7 @@ class HysopArgParser(argparse.ArgumentParser): default='spectral', dest='vorticity_diffusion_mode', help='Enforce either spectral or directional diffusion with finite differences for vorticity. Vorticity is a special case for diffusion because vorticity diffusion can be computed at the same time as the Poisson operator which recovers the velocity. Spectral diffusion of vorticity when divergence-free field projection is enabled is virtually free. The default value is spectral diffusion mode.') method.add_argument('--enable-diffusion-substepping', - dest='enable_diffusion_substepping', action='store_true', + dest='enable_diffusion_substepping', default=False, action='store_true', help='Do not restrict timestep because of finite difference directional diffusion CFL but enforce substepping inside the operator depending on current timestep.') return method @@ -815,7 +815,7 @@ class HysopArgParser(argparse.ArgumentParser): dest='autotuner_dump_dir', help='Configure kernel autotuner dump directory.') autotuner.add_argument('--autotuner-cache-override', - action='store_true', + default=False, action='store_true', dest='autotuner_cache_override', help=('Override kernel autotuner cached data. Best kernels candidates will be stored in ' + 'a temporary directory instead of persistant system-wide cache directory.')) @@ -836,19 +836,19 @@ class HysopArgParser(argparse.ArgumentParser): dest='autotuner_verbose', help='Configure kernel autotuner kernel verbosity (0 to 5).') autotuner.add_argument('--autotuner-debug', - action='store_true', + default=False, action='store_true', dest='autotuner_debug', help='Configure kernel autotuner kernel debug flag.') autotuner.add_argument('--autotuner-dump-kernels', - action='store_true', + default=False, action='store_true', dest='autotuner_dump_kernels', help='Configure kernel autotuner kernel source dumping.') autotuner.add_argument('--autotuner-dump-isolation', - action='store_true', + default=False, action='store_true', dest='autotuner_dump_isolation', help='Configure kernel autotuner to generate oclgrind kernel isolation files for each optimal kernel.') autotuner.add_argument('--autotuner-dump-hash-logs', - action='store_true', + default=False, action='store_true', dest='autotuner_dump_hash_logs', help=('Configure kernel autotuner to generate kernel extra keywords hash logs ' +'for kernel caching debugging purposes.')) @@ -859,11 +859,11 @@ class HysopArgParser(argparse.ArgumentParser): +'A kernel that matches becomes candidate for statistics, postprocessing, source and isolation dump, if enabled. ' +'If not specified, all kernels are considered by default by using the generic \'.*\' pattern.')) autotuner.add_argument('--autotuner-plot-statistics', - action='store_true', + default=False, action='store_true', dest='autotuner_plot_statistics', help='Compute and plot tuning statistics for all tuned kernels.') autotuner.add_argument('--autotuner-bench-kernels', - action='store_true', + default=False, action='store_true', dest='autotuner_bench_kernels', help=('Enable standard bench mode for kernels: search without max candidates ' +'at maximum verbosity with cache override and nruns=8. ' @@ -970,6 +970,24 @@ class HysopArgParser(argparse.ArgumentParser): file_io.add_argument('--dump-tend', type=float, default=None, dest='dump_tend', help='Set global end time at which output are dumped for all IO params. Defaults to simulation end.') + file_io.add_argument('--dump-postprocess', type=str, default=None, + dest='postprocess_dump', + help=('Run a custom command after I/O dump: ' + +'command FILENAME\n' + +'See hysop/tools/postprocess_dump.sh for an example of post processing script.\n' + +'I/O can be postprocessed directly from RAM by setting --enable-ram-fs.')) + file_io.add_argument('--dump-is-temporary', default=False, action='store_true', + dest='dump_is_temporary', + help='Delete dumped data files after callback has been executed. Best used with --enable-ram-fs and --dump-post-process.') + file_io.add_argument('--enable-ram-fs', default=False, action='store_true', + dest='enable_ram_fs', + help='Dump I/O directly into RAM (if possible), else fallback to --dump-dir unless --force-ram-fs has been set.') + file_io.add_argument('--force-ram-fs', default=False, action='store_true', + dest='force_ram_fs', + help='Dump I/O directly into RAM (if possible), else raise an EnvironmentError. Implies --enable-ram-fs. When enabled --dump-dir is ignored.') + file_io.add_argument('--hdf5-disable-compression', default=False, action='store_true', + dest='hdf5_disable_compression', + help='Disable compression for HDF5 outputs (when available).') # list of additional named io_params to be generated generate_io_params = generate_io_params or () @@ -1002,6 +1020,24 @@ class HysopArgParser(argparse.ArgumentParser): type=float, default=None, dest='{}_dump_tend'.format(pname), help='Set end time at which output are dumped for IO parameter \'{}\'.'.format(pname)) + file_io.add_argument('--{}-dump-postprocess'.format(pname), type=str, default=None, + dest='{}_postprocess_dump'.format(pname), + help=('Run a custom command after {} I/O dump: '.format(pname) + +'command FILENAME\n' + +'See hysop/tools/postprocess_dump.sh for an example of post processing script.\n' + +'{} I/O can be postprocessed directly from RAM by setting --enable-ram-fs.'.format(pname))) + file_io.add_argument('--{}-dump-is-temporary'.format(pname), default=None, action='store_true', + dest='{}_dump_is_temporary'.format(pname), + help='Delete {} data files after callback has been executed. Best used with --enable-ram-fs and --dump-post-process.'.format(pname)) + file_io.add_argument('--{}-enable-ram-fs'.format(pname), default=None, action='store_true', + dest='{}_enable_ram_fs'.format(pname), + help='Dump I/O directly into RAM (if possible), else fallback to --dump-dir unless --force-ram-fs has been set.') + file_io.add_argument('--{}-force-ram-fs'.format(pname), default=None, action='store_true', + dest='{}_force_ram_fs'.format(pname), + help='Dump {} I/O directly into RAM (if possible), else raise an EnvironmentError. Implies --enable-ram-fs. When enabled --dump-dir is ignored.'.format(pname)) + file_io.add_argument('--{}-hdf5-disable-compression'.format(pname), default=None, action='store_true', + dest='{}_hdf5_disable_compression'.format(pname), + help='Disable compression for {} HDF5 outputs (when available).'.format(pname)) setattr(file_io, '{}_io'.format(pname), pargs) file_io.add_argument('--cache-dir', type=str, default=None, @@ -1033,9 +1069,11 @@ class HysopArgParser(argparse.ArgumentParser): self._check_default(args, ('debug_dump_dir', 'debug_dump_target'), str, allow_none=True) self._check_default(args, 'override_cache', bool, allow_none=True) self._check_default(args, 'clean', bool, allow_none=True) - self._check_default(args, 'cache_dir', str, allow_none=True) + self._check_default(args, ('cache_dir', 'postprocess_dump'), str, allow_none=True) self._check_dir(args, 'cache_dir', allow_shared=False, allow_none=True) - self._check_default(args, 'no_interactive', bool, allow_none=False) + self._check_default(args, ('no_interactive', 'dump_is_temporary', + 'enable_ram_fs', 'force_ram_fs', 'hdf5_disable_compression'), + bool, allow_none=False) self._check_default(args, 'dump_dir', str, allow_none=False) self._check_default(args, 'dump_freq', int, allow_none=True) @@ -1081,15 +1119,32 @@ class HysopArgParser(argparse.ArgumentParser): args.dump_times = tuple(sorted(args.dump_times)) times_of_interest.update(args.dump_times) + if args.force_ram_fs: + args.enable_ram_fs = True + if args.dump_is_temporary: + msg='Dump is temporary but no postprocessing script has been supplied' + assert (args.postprocess_dump is not None), msg + for pname in pnames: - for argname in ('dir','freq','period','times','tstart','tend'): - bname = 'dump_{}'.format(argname) + def _set_arg(args, argname, pname, prefix=''): + bname = '{}{}'.format(prefix, argname) vname = '{}_{}'.format(pname, bname) default_value = getattr(args, bname) actual_value = getattr(args, vname) if (actual_value is None): setattr(args, vname, default_value) value = getattr(args, vname) + return value + for argname in ('dir','freq','period','times','tstart','tend','is_temporary'): + _set_arg(args, argname, pname, prefix='dump_') + for argname in ('enable_ram_fs', 'force_ram_fs', 'hdf5_disable_compression', 'postprocess_dump'): + _set_arg(args, argname, pname) + if getattr(args, '{}_force_ram_fs'.format(pname)): + setattr(args, '{}_enable_ram_fs'.format(pname), True) + if getattr(args, '{}_dump_is_temporary'.format(pname)): + msg='{} dump is temporary but no postprocessing script has been supplied'.format(pname) + pd = getattr(args, '{}_postprocess_dump'.format(pname)) + assert (pd is not None), msg bname = '{}_dump'.format(pname) self._check_default(args, '{}_dir'.format(bname), str, allow_none=False) @@ -1099,6 +1154,10 @@ class HysopArgParser(argparse.ArgumentParser): self._check_default(args, '{}_tstart'.format(bname), float, allow_none=False) self._check_default(args, '{}_tend'.format(bname), float, allow_none=False) self._check_positive(args, '{}_freq'.format(bname), strict=False, allow_none=False) + self._check_default(args, + tuple(map(lambda k: '{}_{}'.format('{}'.format(pname), k), ('dump_is_temporary', + 'enable_ram_fs', 'force_ram_fs', 'hdf5_disable_compression'))), + bool, allow_none=False) self._check_dir(args, '{}_dir'.format(bname), allow_shared=True, allow_none=False) ststart = '{}_tstart'.format(bname) @@ -1282,7 +1341,7 @@ class HysopArgParser(argparse.ArgumentParser): def _check_default(self, args, argnames, types, allow_none=False): if not isinstance(argnames, tuple): argnames = (argnames,) - assert all(isinstance(a, str) for a in argnames) + assert all(isinstance(a, str) for a in argnames), argnames if not isinstance(types, tuple): types = (types,) assert all(isinstance(a, type) for a in types) @@ -1440,14 +1499,24 @@ class HysopArgParser(argparse.ArgumentParser): args.io_params = IOParams(filename=None, filepath=args.dump_dir, frequency=args.dump_freq, dump_times=args.dump_times, - dump_tstart=args.dump_tstart, dump_tend=args.dump_tend) + dump_tstart=args.dump_tstart, dump_tend=args.dump_tend, + enable_ram_fs=args.enable_ram_fs, force_ram_fs=args.force_ram_fs, + dump_is_temporary=args.dump_is_temporary, + postprocess_dump=args.postprocess_dump, + hdf5_disable_compression=args.hdf5_disable_compression) + for pname in self.generate_io_params: iop = IOParams(filename=None, - filepath = getattr(args, '{}_dump_dir'.format(pname)), - frequency = getattr(args, '{}_dump_freq'.format(pname)), - dump_times = getattr(args, '{}_dump_times'.format(pname)), - dump_tstart = getattr(args, '{}_dump_tstart'.format(pname)), - dump_tend = getattr(args, '{}_dump_tend'.format(pname))) + filepath = getattr(args, '{}_dump_dir'.format(pname)), + frequency = getattr(args, '{}_dump_freq'.format(pname)), + dump_times = getattr(args, '{}_dump_times'.format(pname)), + dump_tstart = getattr(args, '{}_dump_tstart'.format(pname)), + dump_tend = getattr(args, '{}_dump_tend'.format(pname)), + enable_ram_fs = getattr(args, '{}_enable_ram_fs'.format(pname)), + force_ram_fs = getattr(args, '{}_force_ram_fs'.format(pname)), + dump_is_temporary = getattr(args, '{}_dump_is_temporary'.format(pname)), + postprocess_dump = getattr(args, '{}_postprocess_dump'.format(pname)), + hdf5_disable_compression = getattr(args, '{}_hdf5_disable_compression'.format(pname))) setattr(args, '{}_io_params'.format(pname), iop) def _setup_implementation(self, args): diff --git a/hysop/backend/device/opencl/opencl_autotunable_kernel.py b/hysop/backend/device/opencl/opencl_autotunable_kernel.py index 4099ccf88..50756c8be 100644 --- a/hysop/backend/device/opencl/opencl_autotunable_kernel.py +++ b/hysop/backend/device/opencl/opencl_autotunable_kernel.py @@ -174,6 +174,11 @@ class OpenClAutotunableKernel(AutotunableKernel): print('POSTPROCESSING KERNEL {}:\n'.format(autotuner.name) + ' '.join(command)) try: subprocess.check_call(command) + except OSError as e: + msg="\nFATAL ERROR: Could not find or execute postprocessing script '{}'.".format(command[0]) + print msg + print + raise except subprocess.CalledProcessError as e: if (e.returncode == 10): msg="Postprocessing script has requested to stop the simulation (return code 10), exiting." @@ -183,6 +188,7 @@ class OpenClAutotunableKernel(AutotunableKernel): msg='\nFATAL ERROR: Failed to call autotuner postprocessing command.\n{}\n' msg=msg.format(' '.join(command)) print(msg) + print raise kernel = OpenClKernel(name=autotuner.name, program=program, diff --git a/hysop/operator/hdf_io.py b/hysop/operator/hdf_io.py index ecf8a2f78..fbb2b6448 100755 --- a/hysop/operator/hdf_io.py +++ b/hysop/operator/hdf_io.py @@ -7,9 +7,9 @@ * :class:`~HDF_IO` abstract interface for hdf io classes """ -import functools +import subprocess, sys, os, functools from abc import ABCMeta, abstractmethod -from hysop import __H5PY_PARALLEL_COMPRESSION_ENABLED__ +from hysop import __H5PY_PARALLEL_COMPRESSION_ENABLED__, vprint from hysop.deps import h5py, sys from hysop.core.graph.graph import discretized from hysop.constants import DirectionLabels, HYSOP_REAL, Backend, TranspositionState @@ -254,8 +254,9 @@ class HDF_IO(ComputationalGraphOperator): if not __H5PY_PARALLEL_COMPRESSION_ENABLED__: compression=None else: - self._hdf_file = h5py.File(filename.format(rk=self.topology.cart_rank), mode) - return compression + filename = filename.format(rk=self.topology.cart_rank) + self._hdf_file = h5py.File(filename, mode) + return (filename, compression) @classmethod def supports_multiple_topologies(cls): @@ -269,7 +270,7 @@ class HDF_Writer(HDF_IO): """ Print field(s) values on a given topo, in HDF5 format. """ - def __init__(self, variables, xmfalways=True, + def __init__(self, variables, name=None, pretty_name=None, **kwds): """ Write some fields data into hdf/xmdf files. @@ -277,10 +278,6 @@ class HDF_Writer(HDF_IO): Parameters ---------- - xmfalways : boolean, optional - true if xmf output must be updated at the same time - an hdf5 file is created (i.e. at each time step), - default=True kwds : base class arguments """ @@ -296,10 +293,7 @@ class HDF_Writer(HDF_IO): # count the number of calls self._count = 0 - if xmfalways: - self.step = self._step_HDF5_XMF - else: - self.step = self._step_HDF5 + self.step = self._step_HDF5 self._xdmf_data_files = [] # filename = prefix_N, N = counter value self._get_filename = self._input_fname @@ -366,8 +360,10 @@ class HDF_Writer(HDF_IO): def finalize(self): if self._xmf_file: - self.updateXMFFile() + filename = self._xmf_file.name self._xmf_file.close() + vprint('>Deleting XMF file {}...'.format(filename)) + os.remove(filename) def _input_fname(self, i): """Set output file name for current iteration""" @@ -475,7 +471,8 @@ class HDF_Writer(HDF_IO): # of the current output (count) and on the current process # rank. self._count = simu.current_iteration - compression = self.open_hdf(self._count, mode='w') + (filename, compression) = self.open_hdf(self._count, mode='w') + vprint('>Dumping HDF5 data to {}...'.format(filename)) # Get the names of output input_fields and create the corresponding # datasets @@ -512,13 +509,48 @@ class HDF_Writer(HDF_IO): raise RuntimeError(msg) self._xdmf_data_files.append((self._count, simu.t())) self._last_written_time = simu.t() - + self._hdf_file.close() - - def _step_HDF5_XMF(self, simu): - self._step_HDF5(simu) self.updateXMFFile() + if self.io_params.postprocess_dump: + postprocess_cmd = self.io_params.postprocess_dump + op_name = self.name + actual_filepath = self.io_params.filepath + disk_filepath = self.io_params.disk_filepath + xmf_file = self._xmf_file.name + hdf_file = filename + hdf_is_tmp = self.io_params.dump_is_temporary + + vprint('>Executing postprocessing script: {}'.format(postprocess_cmd)) + + # execute command OP_NAME ACTUAL_FILEPATH DISK_FILEPATH XMF_FILE HDF5_FILE IS_TMP + command = [str(postprocess_cmd), + str(op_name), str(actual_filepath), str(disk_filepath), + str(xmf_file), str(hdf_file), str(hdf_is_tmp)] + try: + subprocess.check_call(command) + except OSError as e: + msg="\nFATAL ERROR: Could not find or execute postprocessing script '{}'.".format(command[0]) + print msg + print + raise + except subprocess.CalledProcessError as e: + if (e.returncode == 10): + msg="Postprocessing script has requested to stop the simulation (return code 10), exiting." + vprint(msg) + sys.exit(0) + else: + msg='\nFATAL ERROR: Failed to call I/O postprocessing command.\n{}\n' + msg=msg.format(' '.join(command)) + print(msg) + print + raise + + if self.io_params.dump_is_temporary: + vprint('>Deleting HDF5 data {}...'.format(filename)) + os.remove(filename) + del self._xdmf_data_files[:] class HDF_Reader(HDF_IO): """ diff --git a/hysop/tools/io_utils.py b/hysop/tools/io_utils.py index 1209d9161..c837711e7 100755 --- a/hysop/tools/io_utils.py +++ b/hysop/tools/io_utils.py @@ -60,6 +60,31 @@ class IO(object): """ assert (cls._default_path is not None), 'default path has not been set.' return cls._default_path + + @classmethod + def default_ram_path(cls): + """Get the current default path used for io in memory. + + Returns + ------- + string + the default value of the current RAM i/o path. + """ + try: + import memory_tempfile + except ImportError as e: + print + print e + print + msg='You are trying to use a RAM filesystem but the \'mempory_tempfile\' is not present on your system.' + msg+='Get it from https://gitlab.com/keckj/memory-tempfile.' + raise RuntimeError(msg) + mt = memory_tempfile.MemoryTempfile(fallback=True) + if mt.found_mem_tempdir(): + return mt.gettempdir() + else: + return None + @staticmethod def check_dir(filepath, io_rank=0, comm=None): @@ -94,7 +119,7 @@ class IO(object): assert isinstance(pathdir, str) IO._default_path = pathdir IO.check_dir(IO._default_path) - + @classmethod def default_cache_path(cls): from hysop import get_env @@ -127,6 +152,10 @@ class IO(object): if IO._cache_path is None: IO.set_cache_path(IO.default_cache_path()) return IO._cache_path + + @classmethod + def ram_path(cls): + return cls.default_ram_path() @classmethod def get_tmp_dir(cls, key): @@ -195,7 +224,9 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', 'dump_times_fp32', 'dump_times_fp64', 'dump_tstart', 'dump_tend', 'io_leader', 'visu_leader', - 'kwds'])): + 'enable_ram_fs', 'force_ram_fs', + 'dump_is_temporary', 'postprocess_dump', + 'hdf5_disable_compression', 'disk_filepath', 'kwds'])): """ A struct to handle I/O files parameters @@ -219,6 +250,20 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', Rank of the mpi process dealing with the io. Default is 0. visu_leader : int Rank of the mpi process dealing with the graphical io. Default is 0. + enable_ram_fs: bool + Instruct the dumper to write directly to RAM, fallback to filepath/filename when this is not possible. + force_ram_fs: bool + Force the dumper to write directly to RAM, and raise an error when this is not possible (filepath/filename are ignored). + Implies enable_ram_fs. + dump_is_temporary: bool + Instruct the dumper to delete dumped data from disk or RAM after postprocessing script has been called. + Implies that a postprocessing script is supplied. + postprocess_dump: str + Path to a postprocessing script that will be called after dump. + See hysop/tools/postprocess_dump.sh for an example of post processing script. + hdf5_disable_compression: bool + Disable compression for HDF5 outputs (when available). + Can be used to accelerate in RAM postprocessing. kwds: dict Custom extra keyword arguments to pass to operators @@ -235,24 +280,51 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', frequency=1, fileformat=None, dump_times=None, dump_tstart=None, dump_tend=None, io_leader=0, visu_leader=0, + enable_ram_fs=False, force_ram_fs=False, + dump_is_temporary=False, postprocess_dump=None, + hdf5_disable_compression=False, **kwds): - + dump_tstart = first_not_None(dump_tstart, -np.inf) dump_tend = first_not_None(dump_tend, +np.inf) fileformat = first_not_None(fileformat, IO.HDF5) + dump_times = first_not_None(dump_times, ()) - dump_times = first_not_None(dump_times, ()) - if (dump_times is not None): - check_instance(dump_times, tuple, values=(float,np.float64)) - dump_times_fp64 = tuple(map(np.float64, dump_times)) - dump_times_fp32 = tuple(map(np.float32, dump_times)) - else: - dump_times_fp32 = () - dump_times_fp64 = () - + check_instance(filename, str, allow_none=True) + check_instance(filepath, str, allow_none=True) check_instance(frequency, (int,long)) - check_instance(dump_tstart, float) - check_instance(dump_tend, float) + check_instance(dump_times, tuple, values=(float,np.float64)) + check_instance(dump_tstart, (int, long, float, np.float64)) + check_instance(dump_tend, (int, long, float, np.float64)) + check_instance(io_leader, (int,long)) + check_instance(visu_leader, (int,long)) + check_instance(enable_ram_fs, bool) + check_instance(force_ram_fs, bool) + check_instance(dump_is_temporary, bool) + check_instance(postprocess_dump, str, allow_none=True) + check_instance(hdf5_disable_compression, bool) + + frequency = int(frequency) + dump_tstart = float(dump_tstart) + dump_tend = float(dump_tend) + io_leader = int(io_leader) + visu_leader = int(visu_leader) + + dump_times_fp64 = tuple(map(np.float64, dump_times)) + dump_times_fp32 = tuple(map(np.float32, dump_times)) + + ram_path = IO.ram_path() + if force_ram_fs: + enable_ram_fs = True + if (ram_path is None): + raise RuntimeError('Could not find any memory based filesystem (ramfs, tmpfs).') + + disk_filepath = None + if enable_ram_fs and (ram_path is not None): + if filename: + assert not os.path.isabs(filename), filename + disk_filepath = filepath + filepath = ram_path # Filename is absolute path, filepath arg is ignored. if filename: @@ -276,14 +348,23 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', filepath = os.path.abspath(filepath) else: filepath = IO.default_path() - IO.check_dir(filepath) + + if (disk_filepath is None): + disk_filepath = filepath + + if dump_is_temporary: + msg='Dump is temporary but no postprocessing script has been supplied' + assert (postprocess_dump is not None), msg + return super(IOParams, cls).__new__(cls, filename, filepath, frequency, fileformat, dump_times_fp32, dump_times_fp64, dump_tstart, dump_tend, io_leader, visu_leader, - kwds) + enable_ram_fs, force_ram_fs, + dump_is_temporary, postprocess_dump, + hdf5_disable_compression, disk_filepath, kwds) def should_dump(self, simulation, with_last=False): frequency = self.frequency @@ -303,9 +384,12 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', return dump def clone(self, **kwds): - keys = ('filename', 'filepath', 'frequency', 'fileformat', + keys = ('filename', 'frequency', 'fileformat', 'dump_times', 'dump_tstart', 'dump_tend', - 'io_leader', 'visu_leader', 'kwds') + 'io_leader', 'visu_leader', + 'enable_ram_fs', 'force_ram_fs', + 'dump_is_temporary', 'postprocess_dump', + 'hdf5_disable_compression', 'kwds') diff = set(kwds.keys()).difference(keys) if diff: @@ -319,6 +403,9 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', all_kwds[k] = v else: all_kwds[k] = kwds.get(k, getattr(self, k)) + + all_kwds['filepath'] = kwds.get('filepath', getattr(self, 'disk_filepath')) + return IOParams(**all_kwds) @property @@ -330,19 +417,29 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', def to_string(self, prefix=''): ss=\ -'''filename: {} -filepath: {} -fileformat: {} -frequency: {} -dump_times: {} -dump_tstart: {} -dump_tend: {} -io_leader: {} -visu_leader: {} -extra_kwds: {}'''.format( +'''filename: {} +filepath: {} +fileformat: {} +frequency: {} +dump_times: {} +dump_tstart: {} +dump_tend: {} +io_leader: {} +visu_leader: {} +enable_ram_fs: {} +force_ram_fs: {} +dump_is_tmp: {} +post_process: {} +hdf5_no_compr: {} +extra_kwds: {}'''.format( self.filename, self.filepath, self.fileformat, self.frequency, self.dump_times, self.dump_tstart, self.dump_tend, - self.io_leader, self.visu_leader, self.kwds) + self.io_leader, self.visu_leader, + self.enable_ram_fs, self.force_ram_fs, + self.dump_is_temporary, + self.postprocess_dump, + self.hdf5_disable_compression, + self.kwds) return prefix+('\n'+prefix).join(ss.split('\n')) diff --git a/hysop/tools/postprocess_dump.sh b/hysop/tools/postprocess_dump.sh new file mode 100755 index 000000000..b8f9f7d54 --- /dev/null +++ b/hysop/tools/postprocess_dump.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Example of I/O post processing script. +# Input arguments are: +# OP_NAME +# ACTUAL_FILEPATH DISK_FILEPATH +# XMF_FILE HDF5_FILE +# IS_TMP +# See example hysop/examples/example_utils.py interface and '--dump-postprocess' argument. + +set -e +if [ "$#" -ne 6 ]; then + echo "Script expected 6 parameters." + exit 1 +fi + +OP_NAME=${1} +ACTUAL_FILEPATH=${2} +DISK_FILEPATH=${3} +XMF_FILE=${4} +HDF5_FILE=${5} +IS_TMP=${6} + +echo ">Successfully postprocessed dump '$OP_NAME'." +exit 0 diff --git a/requirements.txt b/requirements.txt index cc4d20816..f99a62727 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,7 +11,6 @@ editdistance portalocker tee ansicolors -backports.weakref argparse_color_formatter primefac pybind11 @@ -21,3 +20,5 @@ mpi4py matplotlib numba configparser +backports.tempfile +backports.weakref -- GitLab