Skip to content
Snippets Groups Projects
Commit 22a15d03 authored by EXT Jean-Matthieu Etancelin's avatar EXT Jean-Matthieu Etancelin
Browse files

update tasks example

parent dfea54d3
No related branches found
No related tags found
No related merge requests found
...@@ -67,6 +67,7 @@ def compute(args): ...@@ -67,6 +67,7 @@ def compute(args):
AdvectionCriteria, AdvectionCriteria,
HYSOP_REAL, HYSOP_REAL,
StretchingFormulation, StretchingFormulation,
HYSOP_DEFAULT_TASK_ID,
) )
from hysop.defaults import ( from hysop.defaults import (
VelocityField, VelocityField,
...@@ -139,27 +140,25 @@ def compute(args): ...@@ -139,27 +140,25 @@ def compute(args):
lcfl = 0.15 lcfl = 0.15
dump_freq = args.dump_freq dump_freq = args.dump_freq
# Get default MPI Parameters from domain (even for serial jobs) # Get the mpi parameters
mpi_params = MPIParams(comm=main_comm) has_tasks = not args.proc_tasks is None
mpi_params_s = MPIParams( if has_tasks:
comm=box.get_task_comm(TASK_SCALAR), mpi_params = {TASK_UW: None, TASK_SCALAR: None, HYSOP_DEFAULT_TASK_ID: None}
task_id=TASK_SCALAR, for tk in (TASK_UW, TASK_SCALAR, HYSOP_DEFAULT_TASK_ID):
on_task=TASK_SCALAR == args.proc_tasks[main_rank], mpi_params[tk] = MPIParams(
) comm=box.get_task_comm(tk), task_id=tk, on_task=box.is_on_task(tk)
mpi_params_uw = MPIParams( )
comm=box.get_task_comm(TASK_UW), else:
task_id=TASK_UW, mpi_params = MPIParams(comm=box.task_comm, task_id=HYSOP_DEFAULT_TASK_ID)
on_task=TASK_UW == args.proc_tasks[main_rank],
)
cl_env = None cl_env = None
if box.is_on_task(TASK_SCALAR): if box.is_on_task(TASK_SCALAR):
# Create an explicit OpenCL context from user parameters # Create an explicit OpenCL context from user parameters
cl_env = get_or_create_opencl_env( cl_env = get_or_create_opencl_env(
mpi_params=mpi_params_s, mpi_params=mpi_params[TASK_SCALAR],
platform_id=0, platform_id=0,
device_id=box.machine_rank % get_device_number(), device_id=box.machine_rank % get_device_number(),
) )
# Setup usual implementation specific variables # Setup usual implementation specific variables
impl = None impl = None
method = {} method = {}
...@@ -182,7 +181,7 @@ def compute(args): ...@@ -182,7 +181,7 @@ def compute(args):
advected_fields=(scal,), advected_fields=(scal,),
variables={velo: npts_uw, scal: npts_s}, variables={velo: npts_uw, scal: npts_s},
dt=dt, dt=dt,
mpi_params=mpi_params_s, mpi_params=mpi_params[TASK_SCALAR],
cl_env=cl_env, cl_env=cl_env,
) )
diffuse_scal = DirectionalDiffusion( diffuse_scal = DirectionalDiffusion(
...@@ -192,7 +191,7 @@ def compute(args): ...@@ -192,7 +191,7 @@ def compute(args):
coeffs=DIFF_COEFF_SCAL, coeffs=DIFF_COEFF_SCAL,
variables={scal: npts_s}, variables={scal: npts_s},
dt=dt, dt=dt,
mpi_params=mpi_params_s, mpi_params=mpi_params[TASK_SCALAR],
cl_env=cl_env, cl_env=cl_env,
) )
splitting_scal = StrangSplitting( splitting_scal = StrangSplitting(
...@@ -208,7 +207,7 @@ def compute(args): ...@@ -208,7 +207,7 @@ def compute(args):
advected_fields=(vorti,), advected_fields=(vorti,),
variables={velo: npts_uw, vorti: npts_uw}, variables={velo: npts_uw, vorti: npts_uw},
dt=dt, dt=dt,
mpi_params=mpi_params_uw, mpi_params=mpi_params[TASK_UW],
) )
# > Directional stretching # > Directional stretching
stretch = StaticDirectionalStretching( stretch = StaticDirectionalStretching(
...@@ -219,7 +218,7 @@ def compute(args): ...@@ -219,7 +218,7 @@ def compute(args):
vorticity=vorti, vorticity=vorti,
variables={velo: npts_uw, vorti: npts_uw}, variables={velo: npts_uw, vorti: npts_uw},
dt=dt, dt=dt,
mpi_params=mpi_params_uw, mpi_params=mpi_params[TASK_UW],
) )
# > Directional splitting operator subgraph # > Directional splitting operator subgraph
splitting = StrangSplitting( splitting = StrangSplitting(
...@@ -234,7 +233,7 @@ def compute(args): ...@@ -234,7 +233,7 @@ def compute(args):
Fin=vorti, Fin=vorti,
variables={vorti: npts_uw}, # topo_nogh}, variables={vorti: npts_uw}, # topo_nogh},
dt=dt, dt=dt,
mpi_params=mpi_params_uw, mpi_params=mpi_params[TASK_UW],
) )
# > Poisson operator to recover the velocity from the vorticity # > Poisson operator to recover the velocity from the vorticity
poisson = PoissonCurl( poisson = PoissonCurl(
...@@ -244,7 +243,7 @@ def compute(args): ...@@ -244,7 +243,7 @@ def compute(args):
vorticity=vorti, vorticity=vorti,
variables={velo: npts_uw, vorti: npts_uw}, variables={velo: npts_uw, vorti: npts_uw},
projection=None, projection=None,
mpi_params=mpi_params_uw, mpi_params=mpi_params[TASK_UW],
) )
# > Operator to compute the infinite norm of the velocity # > Operator to compute the infinite norm of the velocity
min_max_U = MinMaxFieldStatistics( min_max_U = MinMaxFieldStatistics(
...@@ -253,7 +252,7 @@ def compute(args): ...@@ -253,7 +252,7 @@ def compute(args):
Finf=True, Finf=True,
implementation=Implementation.PYTHON, implementation=Implementation.PYTHON,
variables={velo: npts_uw}, variables={velo: npts_uw},
mpi_params=mpi_params_uw, mpi_params=mpi_params[TASK_UW],
) )
# > Operator to compute the infinite norm of the vorticity # > Operator to compute the infinite norm of the vorticity
min_max_W = MinMaxFieldStatistics( min_max_W = MinMaxFieldStatistics(
...@@ -262,7 +261,7 @@ def compute(args): ...@@ -262,7 +261,7 @@ def compute(args):
Finf=True, Finf=True,
implementation=Implementation.PYTHON, implementation=Implementation.PYTHON,
variables={vorti: npts_uw}, variables={vorti: npts_uw},
mpi_params=mpi_params_uw, mpi_params=mpi_params[TASK_UW],
) )
# > Operator to compute the enstrophy # > Operator to compute the enstrophy
enstrophy_op = Enstrophy( enstrophy_op = Enstrophy(
...@@ -272,21 +271,24 @@ def compute(args): ...@@ -272,21 +271,24 @@ def compute(args):
WdotW=wdotw, WdotW=wdotw,
variables={vorti: npts_uw, wdotw: npts_uw}, variables={vorti: npts_uw, wdotw: npts_uw},
implementation=Implementation.PYTHON, implementation=Implementation.PYTHON,
mpi_params=mpi_params_uw, mpi_params=mpi_params[TASK_UW],
) )
# Adaptive timestep operator # Adaptive timestep operator
adapt_dt = AdaptiveTimeStep( adapt_dt = AdaptiveTimeStep(
dt, max_dt=dt_max, equivalent_CFL=True, mpi_params=mpi_params_uw dt, max_dt=dt_max, equivalent_CFL=True, mpi_params=mpi_params[TASK_UW]
) )
dt_cfl = adapt_dt.push_cfl_criteria( dt_cfl = adapt_dt.push_cfl_criteria(
cfl=cfl, Finf=min_max_U.Finf, equivalent_CFL=True, mpi_params=mpi_params_uw cfl=cfl,
Finf=min_max_U.Finf,
equivalent_CFL=True,
mpi_params=mpi_params[TASK_UW],
) )
dt_advec = adapt_dt.push_advection_criteria( dt_advec = adapt_dt.push_advection_criteria(
lcfl=lcfl, lcfl=lcfl,
Finf=min_max_W.Finf, Finf=min_max_W.Finf,
criteria=AdvectionCriteria.W_INF, criteria=AdvectionCriteria.W_INF,
mpi_params=mpi_params_uw, mpi_params=mpi_params[TASK_UW],
) )
dt_broadcast = InterTaskParamComm( dt_broadcast = InterTaskParamComm(
parameter=(dt,), domain=box, source_task=TASK_UW, dest_task=TASK_SCALAR parameter=(dt,), domain=box, source_task=TASK_UW, dest_task=TASK_SCALAR
...@@ -302,7 +304,8 @@ def compute(args): ...@@ -302,7 +304,8 @@ def compute(args):
with_last=True, with_last=True,
), ),
variables={velo: npts_uw}, variables={velo: npts_uw},
mpi_params=mpi_params_uw, var_names={velo[0]: "UX", velo[1]: "UY", velo[2]: "UZ"},
mpi_params=mpi_params[TASK_UW],
) )
dumpS = HDF_Writer( dumpS = HDF_Writer(
name="dumpS", name="dumpS",
...@@ -313,7 +316,8 @@ def compute(args): ...@@ -313,7 +316,8 @@ def compute(args):
with_last=True, with_last=True,
), ),
variables={scal: npts_s}, variables={scal: npts_s},
mpi_params=mpi_params_s, var_names={scal: "S"},
mpi_params=mpi_params[TASK_SCALAR],
) )
# Create the problem we want to solve and insert our # Create the problem we want to solve and insert our
...@@ -329,15 +333,15 @@ def compute(args): ...@@ -329,15 +333,15 @@ def compute(args):
Interpolation: Interpolation.LINEAR, Interpolation: Interpolation.LINEAR,
} }
) )
problem = Problem(method=method, mpi_params=mpi_params) problem = Problem(method=method, mpi_params=mpi_params[TASK_UW])
problem.insert( problem.insert(
advec, advec,
splitting, splitting,
diffuse, diffuse,
splitting_scal,
poisson, poisson,
splitting_scal, # task_scal
dumpS, # task_scal
dumpU, dumpU,
dumpS,
enstrophy_op, enstrophy_op,
min_max_U, min_max_U,
min_max_W, min_max_W,
...@@ -384,7 +388,7 @@ def compute(args): ...@@ -384,7 +388,7 @@ def compute(args):
if __name__ == "__main__": if __name__ == "__main__":
from examples.example_utils import HysopArgParser, colors from hysop_examples.argparser import HysopArgParser, colors
from hysop.core.mpi import MPI, main_size, main_rank, main_comm from hysop.core.mpi import MPI, main_size, main_rank, main_comm
class TurbulentScalarAdvectionArgParser(HysopArgParser): class TurbulentScalarAdvectionArgParser(HysopArgParser):
...@@ -406,9 +410,11 @@ if __name__ == "__main__": ...@@ -406,9 +410,11 @@ if __name__ == "__main__":
def _add_main_args(self): def _add_main_args(self):
args = super()._add_main_args() args = super()._add_main_args()
args.add_argument( args.add_argument(
"-pt",
"--proc-tasks", "--proc-tasks",
action="store_true", type=str,
action=self.eval,
container=tuple,
append=False,
dest="proc_tasks", dest="proc_tasks",
help="Specify the tasks for each proc.", help="Specify the tasks for each proc.",
) )
...@@ -425,8 +431,9 @@ if __name__ == "__main__": ...@@ -425,8 +431,9 @@ if __name__ == "__main__":
snpts=(128, 128, 256), snpts=(128, 128, 256),
dump_period=1.0, dump_period=1.0,
dump_freq=100, dump_freq=100,
proc_tasks=[ proc_tasks=tuple(
TASK_SCALAR if _ % main_size == 0 else TASK_UW for _ in range(main_size) (TASK_SCALAR, TASK_UW) if _ % main_size == 0 else (TASK_UW,)
], for _ in range(main_size)
),
) )
parser.run(compute) parser.run(compute)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment