diff --git a/hysop_examples/examples/scalar_advection/turbulent_scalar_advection.py b/hysop_examples/examples/scalar_advection/turbulent_scalar_advection.py index f89da7c45b60416ba5f2a302d4cc19825ed9a8d6..7ccb3a96425080eed2dbc90184f94f462622d4d4 100644 --- a/hysop_examples/examples/scalar_advection/turbulent_scalar_advection.py +++ b/hysop_examples/examples/scalar_advection/turbulent_scalar_advection.py @@ -67,6 +67,7 @@ def compute(args): AdvectionCriteria, HYSOP_REAL, StretchingFormulation, + HYSOP_DEFAULT_TASK_ID, ) from hysop.defaults import ( VelocityField, @@ -139,27 +140,25 @@ def compute(args): lcfl = 0.15 dump_freq = args.dump_freq - # Get default MPI Parameters from domain (even for serial jobs) - mpi_params = MPIParams(comm=main_comm) - mpi_params_s = MPIParams( - comm=box.get_task_comm(TASK_SCALAR), - task_id=TASK_SCALAR, - on_task=TASK_SCALAR == args.proc_tasks[main_rank], - ) - mpi_params_uw = MPIParams( - comm=box.get_task_comm(TASK_UW), - task_id=TASK_UW, - on_task=TASK_UW == args.proc_tasks[main_rank], - ) - + # Get the mpi parameters + has_tasks = not args.proc_tasks is None + if has_tasks: + mpi_params = {TASK_UW: None, TASK_SCALAR: None, HYSOP_DEFAULT_TASK_ID: None} + for tk in (TASK_UW, TASK_SCALAR, HYSOP_DEFAULT_TASK_ID): + mpi_params[tk] = MPIParams( + comm=box.get_task_comm(tk), task_id=tk, on_task=box.is_on_task(tk) + ) + else: + mpi_params = MPIParams(comm=box.task_comm, task_id=HYSOP_DEFAULT_TASK_ID) cl_env = None if box.is_on_task(TASK_SCALAR): # Create an explicit OpenCL context from user parameters cl_env = get_or_create_opencl_env( - mpi_params=mpi_params_s, + mpi_params=mpi_params[TASK_SCALAR], platform_id=0, device_id=box.machine_rank % get_device_number(), ) + # Setup usual implementation specific variables impl = None method = {} @@ -182,7 +181,7 @@ def compute(args): advected_fields=(scal,), variables={velo: npts_uw, scal: npts_s}, dt=dt, - mpi_params=mpi_params_s, + mpi_params=mpi_params[TASK_SCALAR], cl_env=cl_env, ) diffuse_scal = DirectionalDiffusion( @@ -192,7 +191,7 @@ def compute(args): coeffs=DIFF_COEFF_SCAL, variables={scal: npts_s}, dt=dt, - mpi_params=mpi_params_s, + mpi_params=mpi_params[TASK_SCALAR], cl_env=cl_env, ) splitting_scal = StrangSplitting( @@ -208,7 +207,7 @@ def compute(args): advected_fields=(vorti,), variables={velo: npts_uw, vorti: npts_uw}, dt=dt, - mpi_params=mpi_params_uw, + mpi_params=mpi_params[TASK_UW], ) # > Directional stretching stretch = StaticDirectionalStretching( @@ -219,7 +218,7 @@ def compute(args): vorticity=vorti, variables={velo: npts_uw, vorti: npts_uw}, dt=dt, - mpi_params=mpi_params_uw, + mpi_params=mpi_params[TASK_UW], ) # > Directional splitting operator subgraph splitting = StrangSplitting( @@ -234,7 +233,7 @@ def compute(args): Fin=vorti, variables={vorti: npts_uw}, # topo_nogh}, dt=dt, - mpi_params=mpi_params_uw, + mpi_params=mpi_params[TASK_UW], ) # > Poisson operator to recover the velocity from the vorticity poisson = PoissonCurl( @@ -244,7 +243,7 @@ def compute(args): vorticity=vorti, variables={velo: npts_uw, vorti: npts_uw}, projection=None, - mpi_params=mpi_params_uw, + mpi_params=mpi_params[TASK_UW], ) # > Operator to compute the infinite norm of the velocity min_max_U = MinMaxFieldStatistics( @@ -253,7 +252,7 @@ def compute(args): Finf=True, implementation=Implementation.PYTHON, variables={velo: npts_uw}, - mpi_params=mpi_params_uw, + mpi_params=mpi_params[TASK_UW], ) # > Operator to compute the infinite norm of the vorticity min_max_W = MinMaxFieldStatistics( @@ -262,7 +261,7 @@ def compute(args): Finf=True, implementation=Implementation.PYTHON, variables={vorti: npts_uw}, - mpi_params=mpi_params_uw, + mpi_params=mpi_params[TASK_UW], ) # > Operator to compute the enstrophy enstrophy_op = Enstrophy( @@ -272,21 +271,24 @@ def compute(args): WdotW=wdotw, variables={vorti: npts_uw, wdotw: npts_uw}, implementation=Implementation.PYTHON, - mpi_params=mpi_params_uw, + mpi_params=mpi_params[TASK_UW], ) # Adaptive timestep operator adapt_dt = AdaptiveTimeStep( - dt, max_dt=dt_max, equivalent_CFL=True, mpi_params=mpi_params_uw + dt, max_dt=dt_max, equivalent_CFL=True, mpi_params=mpi_params[TASK_UW] ) dt_cfl = adapt_dt.push_cfl_criteria( - cfl=cfl, Finf=min_max_U.Finf, equivalent_CFL=True, mpi_params=mpi_params_uw + cfl=cfl, + Finf=min_max_U.Finf, + equivalent_CFL=True, + mpi_params=mpi_params[TASK_UW], ) dt_advec = adapt_dt.push_advection_criteria( lcfl=lcfl, Finf=min_max_W.Finf, criteria=AdvectionCriteria.W_INF, - mpi_params=mpi_params_uw, + mpi_params=mpi_params[TASK_UW], ) dt_broadcast = InterTaskParamComm( parameter=(dt,), domain=box, source_task=TASK_UW, dest_task=TASK_SCALAR @@ -302,7 +304,8 @@ def compute(args): with_last=True, ), variables={velo: npts_uw}, - mpi_params=mpi_params_uw, + var_names={velo[0]: "UX", velo[1]: "UY", velo[2]: "UZ"}, + mpi_params=mpi_params[TASK_UW], ) dumpS = HDF_Writer( name="dumpS", @@ -313,7 +316,8 @@ def compute(args): with_last=True, ), variables={scal: npts_s}, - mpi_params=mpi_params_s, + var_names={scal: "S"}, + mpi_params=mpi_params[TASK_SCALAR], ) # Create the problem we want to solve and insert our @@ -329,15 +333,15 @@ def compute(args): Interpolation: Interpolation.LINEAR, } ) - problem = Problem(method=method, mpi_params=mpi_params) + problem = Problem(method=method, mpi_params=mpi_params[TASK_UW]) problem.insert( advec, splitting, diffuse, - splitting_scal, poisson, + splitting_scal, # task_scal + dumpS, # task_scal dumpU, - dumpS, enstrophy_op, min_max_U, min_max_W, @@ -384,7 +388,7 @@ def compute(args): if __name__ == "__main__": - from examples.example_utils import HysopArgParser, colors + from hysop_examples.argparser import HysopArgParser, colors from hysop.core.mpi import MPI, main_size, main_rank, main_comm class TurbulentScalarAdvectionArgParser(HysopArgParser): @@ -406,9 +410,11 @@ if __name__ == "__main__": def _add_main_args(self): args = super()._add_main_args() args.add_argument( - "-pt", "--proc-tasks", - action="store_true", + type=str, + action=self.eval, + container=tuple, + append=False, dest="proc_tasks", help="Specify the tasks for each proc.", ) @@ -425,8 +431,9 @@ if __name__ == "__main__": snpts=(128, 128, 256), dump_period=1.0, dump_freq=100, - proc_tasks=[ - TASK_SCALAR if _ % main_size == 0 else TASK_UW for _ in range(main_size) - ], + proc_tasks=tuple( + (TASK_SCALAR, TASK_UW) if _ % main_size == 0 else (TASK_UW,) + for _ in range(main_size) + ), ) parser.run(compute)