diff --git a/ci/scripts/build_and_test.sh b/ci/scripts/build_and_test.sh index e1dde33c1ac31275443047ce1585233b527108cb..7d916c2bb484992d02ae4bf6b98e1547b061e8df 100755 --- a/ci/scripts/build_and_test.sh +++ b/ci/scripts/build_and_test.sh @@ -38,6 +38,7 @@ cp -r /hysop "${HYSOP_DIR}" cd "${HYSOP_DIR}" ${SCRIPT_DIR}/version.sh +rm -rf "${HYSOP_BUILD_DIR}" meson setup ${HYSOP_BUILD_DIR} meson compile -C ${HYSOP_BUILD_DIR} meson install -C ${HYSOP_BUILD_DIR} diff --git a/ci/scripts/run_examples.sh b/ci/scripts/run_examples.sh index b69abfcadfbd9b47f0366706b98d06fe7bc806db..b34c08797d71079bb2098af8b17bcdf7f146e112 100755 --- a/ci/scripts/run_examples.sh +++ b/ci/scripts/run_examples.sh @@ -52,3 +52,24 @@ example_test "bubble/periodic_bubble_levelset_penalization.py" #LLVM bug for DP example_test "bubble/periodic_jet_levelset.py" example_test "particles_above_salt/particles_above_salt_periodic.py" example_test "particles_above_salt/particles_above_salt_symmetrized.py" + + +# Tasks examples (parallel) +MPIRUN_EXECUTABLE=${MPIRUN_EXECUTABLE:-mpirun} +MPIRUN_TASKS_OPTION='-np' +if [ "${MPIRUN_EXECUTABLE}" = "srun" ]; then MPIRUN_TASKS_OPTION='-n'; fi +if [[ ${MPIRUN_EXECUTABLE} == *"mpirun"* ]]; then MPIRUN_TASKS_OPTION='--oversubscribe '${MPIRUN_TASKS_OPTION}; fi +MPIRUN_FAIL_EARLY="-mca orte_abort_on_non_zero_status 1" +MPIRUN_ARGS="${MPIRUN_FAIL_EARLY} ${MPIRUN_TASKS_OPTION} 2" +COMMON_EXAMPLE_OPTIONS='-VNC -cp float -maxit 2 --autotuner-max-candidates 1 --save-checkpoint --checkpoint-dump-freq 0 --checkpoint-dump-period 0 --checkpoint-dump-last --checkpoint-dump-times' +example_test() { + test=$1 + echo + echo "EXAMPLE $1" + echo "========$(printf '=%.0s' `seq ${#1}`)" + ${PYTHON_EXECUTABLE} -Wd "${EXAMPLE_DIR}/${1}" -d16,16,32 -sd 32,32,64 ${COMMON_EXAMPLE_OPTIONS} + ${MPIRUN_EXECUTABLE} ${MPIRUN_ARGS} ${PYTHON_EXECUTABLE} -Wd "${EXAMPLE_DIR}/${1}" -d16,16,32 -sd 32,32,64 --proc-tasks '(111,222),(111,)' ${COMMON_EXAMPLE_OPTIONS} + ${MPIRUN_EXECUTABLE} ${MPIRUN_ARGS} 
${PYTHON_EXECUTABLE} -Wd "${EXAMPLE_DIR}/${1}" -d16,16,32 -sd 32,32,64 --proc-tasks '111,222' ${COMMON_EXAMPLE_OPTIONS} + echo +} +example_test "scalar_advection/turbulent_scalar_advection.py" diff --git a/ci/utils/run_ci.sh b/ci/utils/run_ci.sh index 816aa7d057fc98a884e90178570b094990f5f333..3a13a9683253b2a3abaf07b296a5a07f477023c7 100755 --- a/ci/utils/run_ci.sh +++ b/ci/utils/run_ci.sh @@ -22,11 +22,10 @@ set -feu -o pipefail SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" DOCKER_IMAGE_TAG=${1:-ci_cpu_intel} HYSOP_REGISTRY_URL='gricad-registry.univ-grenoble-alpes.fr' -DOCKER_IMG="${HYSOP_REGISTRY_URL}/particle_methods/hysop/${DOCKER_IMAGE_TAG}:latest" CONTAINER_ID='hysop_build_and_test' -if [[ ${DOCKER_IMAGE_TAG} == *_nvidia ]]; then - EXTRA_ARGS="--gpus all" +if [[ ${DOCKER_IMAGE_TAG} == *_nvidia* ]]; then + EXTRA_ARGS="--runtime=nvidia --gpus all" else EXTRA_ARGS="" fi @@ -37,13 +36,8 @@ function remove_img() { } trap remove_img INT TERM EXIT KILL -remove_img - -docker create --cap-add=SYS_PTRACE ${EXTRA_ARGS} -v "${SCRIPT_DIR}/../..:/hysop:ro" --name="${CONTAINER_ID}" -it "${DOCKER_IMG}" - +docker create --cap-add=SYS_PTRACE ${EXTRA_ARGS} --name="${CONTAINER_ID}" -it -v "${SCRIPT_DIR}/../..:/hysop:ro" "${HYSOP_REGISTRY_URL}/particle_methods/hysop/${DOCKER_IMAGE_TAG}:latest" docker start "${CONTAINER_ID}" - -docker exec "${CONTAINER_ID}" /hysop/ci/scripts/build_and_test.sh - -# on test success, upload hysop cache to the docker images -docker commit "${CONTAINER_ID}" "${DOCKER_IMG}" +# An interactive shell (bash -i) is requested only so that micromamba gets activated. 
Interactivity is not needed +docker exec -it "${CONTAINER_ID}" bash -i -c "MESON_TESTTHREADS=1 /hysop/ci/scripts/build_and_test.sh" +docker exec -it "${CONTAINER_ID}" bash -i -c "MESON_TESTTHREADS=1 EXAMPLE_DIR=/hysop/hysop_examples/examples /hysop/ci/scripts/run_examples.sh" diff --git a/hysop/core/graph/graph_builder.py b/hysop/core/graph/graph_builder.py index f832e928d34377c853826e81f2b07bc92c56e786..2ed1e4eb9549eb70bc720a0d4715af04f77d5c54 100644 --- a/hysop/core/graph/graph_builder.py +++ b/hysop/core/graph/graph_builder.py @@ -472,17 +472,22 @@ class GraphBuilder: continue else: ot_needs.append(_n) - ot_provide = can_provide = [ - _ for _ in ot_needs if _ in available_names - ] + can_provide = [_ for _ in ot_needs if _ in available_names] + to_remove = [] for prov in can_provide: - available_names[prov] = needed_elems[ - _name_to_key(prov, needed_elems) - ].task_id - for _op in ot_provide: - needed_names[_op] = available_elems[ - _name_to_key(_op, available_elems) - ].task_id + ae = available_elems[ + _name_to_key(prov, available_elems) + ] + ne = needed_elems[_name_to_key(prov, needed_elems)] + if ae.task_id != ot and ne.task_id == ot: + available_names[prov] = ne.task_id + needed_names[prov] = ae.task_id + else: + to_remove.append(prov) + for rm in to_remove: + can_provide.remove(rm) + available_names[rm] = None + needed_names[rm] = None else: comm.isend( list(needed_names.keys()), @@ -944,43 +949,18 @@ class GraphBuilder: available_elems.update(self.output_params) # Find redistribute candidates - mgs = " >[IT] Current task {} parameters and fields : {}" - gprint( - mgs.format( - "can communicate", - ", ".join( - { - _ if not hasattr(_, "name") else _.name - for _ in available_elems.keys() - } - - self._intertasks_exchanged - ), - ) - ) - gprint( - mgs.format( - "needs", - ", ".join( - { - _ if not hasattr(_, "name") else _.name - for _ in needed_elems.keys() - } - - self._intertasks_exchanged - ), - ) - ) - - for _k in available_elems: - if _k in 
needed_elems.keys(): - needed_elems.pop(_k) for it_redistribute_kwargs in __find_elements_to_redistribute( available_elems, needed_elems ): if it_redistribute_kwargs: - node = RedistributeInter(**it_redistribute_kwargs) + if "variable" in it_redistribute_kwargs: + node = RedistributeInter(**it_redistribute_kwargs) + else: + node = RedistributeInterParam(**it_redistribute_kwargs) node_id = len(target_node.nodes) target_node.push_nodes(node) - node.initialize() + if isinstance(node, RedistributeInter): + node.initialize() gprint( " >Handling node {}: {} {}".format( node_id, node.name, node.__class__ diff --git a/hysop/core/mpi/redistribute.py b/hysop/core/mpi/redistribute.py index a1600861778abbb4b77df8af80fb2cfe7457bdc8..6086c557199ba001909bf348f5f3316c425e18f3 100644 --- a/hysop/core/mpi/redistribute.py +++ b/hysop/core/mpi/redistribute.py @@ -855,8 +855,6 @@ class RedistributeInterParam(ComputationalGraphOperator): self.target_task = other_task_id if target_topo is None else target_topo.task_id self.task_is_source = domain.is_on_task(self.source_task) self.task_is_target = domain.is_on_task(self.target_task) - print(self.task_is_source, self.source_task, source_topo) - print(self.task_is_target, self.target_task, target_topo) if self.task_is_source: assert source_topo.on_task if self.task_is_target: diff --git a/hysop/core/tests/test_checkpoint.sh b/hysop/core/tests/test_checkpoint.sh index ffa9194c5d3c03c7a2e9f8c59ef72c041f369e96..d2ab000b183336c974a92871548104d00d0f6607 100755 --- a/hysop/core/tests/test_checkpoint.sh +++ b/hysop/core/tests/test_checkpoint.sh @@ -24,7 +24,7 @@ MPIRUN_EXECUTABLE=${MPIRUN_EXECUTABLE:-mpirun} MPIRUN_ARGS='-np' MPIRUN_FAIL_EARLY="-mca orte_abort_on_non_zero_status 1" if [ "${MPIRUN_EXECUTABLE}" = "srun" ]; then MPIRUN_ARGS='-n'; fi -if [ "${MPIRUN_EXECUTABLE}" = "mpirun" ]; then MPIRUN_ARGS='--oversubscribe '${MPIRUN_ARGS}; fi +if [[ ${MPIRUN_EXECUTABLE} == *"mpirun"* ]]; then MPIRUN_ARGS='--oversubscribe '${MPIRUN_ARGS}; fi 
MPIRUN_ARGS="${MPIRUN_FAIL_EARLY} ${MPIRUN_ARGS}" SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" diff --git a/hysop/core/tests/test_tasks.sh b/hysop/core/tests/test_tasks.sh index fcae4e9ab18781ff179a7482cea0b57b1074d9a6..2efc8b7308d517855738e7813fb428565cd06e7a 100755 --- a/hysop/core/tests/test_tasks.sh +++ b/hysop/core/tests/test_tasks.sh @@ -23,7 +23,7 @@ PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE:-python3} MPIRUN_EXECUTABLE=${MPIRUN_EXECUTABLE:-mpirun} MPIRUN_TASKS_OPTION='-np' if [ "${MPIRUN_EXECUTABLE}" = "srun" ]; then MPIRUN_TASKS_OPTION='-n'; fi -if [ "${MPIRUN_EXECUTABLE}" = "mpirun" ]; then MPIRUN_TASKS_OPTION='--oversubscribe '${MPIRUN_TASKS_OPTION}; fi +if [[ ${MPIRUN_EXECUTABLE} == *"mpirun"* ]]; then MPIRUN_TASKS_OPTION='--oversubscribe '${MPIRUN_TASKS_OPTION}; fi MPIRUN_FAIL_EARLY="-mca orte_abort_on_non_zero_status 1" MPIRUN_ARGS="${MPIRUN_FAIL_EARLY} ${MPIRUN_TASKS_OPTION} 4" diff --git a/hysop/operator/tests/test_custom_symbolic.py b/hysop/operator/tests/test_custom_symbolic.py index ac3458c3b326049d44e3dbee502ff388c19f46af..07ce879a79362eec004a743a914bf8e78053277f 100644 --- a/hysop/operator/tests/test_custom_symbolic.py +++ b/hysop/operator/tests/test_custom_symbolic.py @@ -422,9 +422,9 @@ class TestCustomSymbolic: refin["f"][field] = ifield.sdata.get().handle.copy() refin["dfields"][field] = ifield in_names.append(field.name) - for pname, param in problem.input_params.items(): - refin["p"][pname] = param.value - in_names.append(pname) + for param in problem.input_params: + refin["p"][param.name] = param.value + in_names.append(param.name) refout = {"f": {}, "p": {}, "dfields": {}} out_names = [] @@ -440,11 +440,11 @@ class TestCustomSymbolic: res = res[ofield.compute_slices] refout["f"][field] += (res,) out_names.append(field.name + f"::{i}") - for pname, param in problem.output_params.items(): - refout["p"][pname] = compute_outputs( + for param in problem.output_params: + refout["p"][param.name] = 
compute_outputs( refin["f"], refin["p"], refin["dfields"], param, None ) - out_names.append(pname) + out_names.append(param.name) problem.apply(**apply_kwds) diff --git a/hysop/problem.py b/hysop/problem.py index b332cdfe8ac1a76a663ee65a015c8cbf01c67c67..c5a33b6941af48a2a9cc270fe78067c5e12a501f 100644 --- a/hysop/problem.py +++ b/hysop/problem.py @@ -89,8 +89,11 @@ class Problem(ComputationalGraph): msg = f" Problem {msg} achieved, exiting ! " vprint_banner(msg, at_border=2) sys.exit(0) - size = self.mpi_params.size - avg_time = self.mpi_params.comm.allreduce(tm.interval) / size + comm = self.mpi_params.comm + if self.domain.has_tasks: + comm = self.domain.parent_comm + size = comm.Get_size() + avg_time = comm.allreduce(tm.interval) / size msg = " Problem building took {} ({}s)" if size > 1: msg += f", averaged over {size} ranks. " @@ -263,8 +266,11 @@ class Problem(ComputationalGraph): if report_freq and (simu.current_iteration % report_freq) == 0: self.profiler_report() - size = self.mpi_params.size - avg_time = self.mpi_params.comm.allreduce(tm.interval) / size + comm = self.mpi_params.comm + if self.domain.has_tasks: + comm = self.domain.parent_comm + size = comm.Get_size() + avg_time = comm.allreduce(tm.interval) / size msg = " Simulation took {} ({}s)" if size > 1: msg += f", averaged over {size} ranks. 
" diff --git a/hysop_examples/examples/scalar_advection/turbulent_scalar_advection.py b/hysop_examples/examples/scalar_advection/turbulent_scalar_advection.py index 7ccb3a96425080eed2dbc90184f94f462622d4d4..d2b428ced1772036a307556cde0904f7f3b4031a 100644 --- a/hysop_examples/examples/scalar_advection/turbulent_scalar_advection.py +++ b/hysop_examples/examples/scalar_advection/turbulent_scalar_advection.py @@ -21,6 +21,7 @@ # Example from JM Etancelin PhD (section 6.3) import os import numpy as np +import mpi4py.MPI as MPI pi = np.pi cos = np.cos @@ -28,6 +29,9 @@ sin = np.sin TASK_UW = 111 TASK_SCALAR = 222 +if MPI.COMM_WORLD.Get_size() == 1: + TASK_UW = 999 + TASK_SCALAR = 999 def compute(args): @@ -58,7 +62,6 @@ def compute(args): StrangSplitting, ParameterPlotter, DirectionalAdvection, - InterTaskParamComm, HDF_Writer, ) from hysop.topology.cartesian_topology import CartesianTopology @@ -290,9 +293,6 @@ def compute(args): criteria=AdvectionCriteria.W_INF, mpi_params=mpi_params[TASK_UW], ) - dt_broadcast = InterTaskParamComm( - parameter=(dt,), domain=box, source_task=TASK_UW, dest_task=TASK_SCALAR - ) # > Outputs dumpU = HDF_Writer( @@ -352,11 +352,12 @@ def compute(args): # Create a simulation # (do not forget to specify the t and dt parameters here) simu = Simulation( - start=0.0, - end=5.0, - max_iter=9999, + start=args.tstart, + end=args.tend, + nb_iter=args.nb_iter, + max_iter=args.max_iter, dt0=1e-5, - times_of_interest=(4.5,), + times_of_interest=args.times_of_interest, t=t, dt=dt, ) @@ -374,8 +375,8 @@ def compute(args): ) # Initialize fields on tasks where needed - problem.initialize_field(velo, formula=init_velocity) if box.is_on_task(TASK_UW): + problem.initialize_field(velo, formula=init_velocity) problem.initialize_field(vorti, formula=init_vorticity) if box.is_on_task(TASK_SCALAR): problem.initialize_field(scal, formula=init_scal) @@ -427,6 +428,7 @@ if __name__ == "__main__": ndim=3, tstart=0.0, tend=12.0, + times_of_interest=(4.5,), npts=(64, 64, 
128), snpts=(128, 128, 256), dump_period=1.0, diff --git a/hysop_examples/examples/tasks/tasks.py b/hysop_examples/examples/tasks/tasks.py index c6fa055f398eaf09cbe0790bf8985f8af02398db..2034b7702ee84c52b2d01ddac7f156758add7416 100755 --- a/hysop_examples/examples/tasks/tasks.py +++ b/hysop_examples/examples/tasks/tasks.py @@ -57,10 +57,13 @@ def compute(args): ) # Define parameters and field (time and analytic field) - t = ScalarParameter("t", dtype=args.dtype) + t = ScalarParameter("t", dtype=args.dtype, initial_value=0.0) + p = ScalarParameter("p", dtype=args.dtype, initial_value=1.0, quiet=False) + q = ScalarParameter("q", dtype=args.dtype, initial_value=1.0, quiet=False) A = Field(domain=box, name="A", dtype=args.dtype) B = Field(domain=box, name="B", dtype=args.dtype) C = Field(domain=box, name="C", dtype=args.dtype) + D = Field(domain=box, name="D", dtype=args.dtype) if has_tasks_overlapping: Ap = Field(domain=box, name="Ap", dtype=args.dtype) else: @@ -124,22 +127,27 @@ def compute(args): else: msg = f"Unknown implementation {impl}." - def custom_func1(_Ai, _Ao): - _Ai.data[0][...] = np.cos(_Ai.data[0]) + def custom_func1(_Ai, _Ao, _p, _q): + _p.value = args.dtype(np.min(_Ai.data[0])) + _q.value = args.dtype(np.min(_Ai.data[0])) + _Ao.data[0][...] = np.cos(_Ai.data[0]) - def custom_func2(_A, _B): + def custom_func2(_A, _B, _C): _B.data[0][...] = _A.data[0] ** 2 + _C.data[0][...] = 1.0 - def custom_func3(_B, _C, _A): - _A.data[0][...] = _B.data[0] + _C.data[0] ** 2 + def custom_func3(_B, _C, _D, _p, _q, _A): + _A.data[0][...] 
= ( + _B.data[0] + (_C.data[0] + _D.data[0] - 1.0) ** 2 + _p() + _q() + ) - # TaskA | TaskB - # op1 A->A | - # op2 A->B | - # `--B--, // Inter-task communication step - # | op3 B,C->A - # ,--A--' // Inter-task communication step - # endA A->A | + # TaskA | TaskB + # op1 A->A,p,q | + # op2 A->B,C | + # `--B,C,p,q--, // Inter-task communication step + # | op3 p,q,B,C,D->A + # ,-----A---' // Inter-task communication step + # endA A->A | # Note : without endA operator, the second communication is not # automatically inserted because A field is an output for task A @@ -154,7 +162,7 @@ def compute(args): name="custom_op1", func=custom_func1, invars=(A,), - outvars=(A,), + outvars=(p, q, A), variables={ A: npts, }, @@ -165,17 +173,17 @@ def compute(args): name="custom_op2", func=custom_func2, invars=(A,), - outvars=(B,), - variables={A: npts, B: npts}, + outvars=(B, C), + variables={A: npts, B: npts, C: npts}, implementation=impl, **_get_op_kwds(TASK_A), ) custom3 = CustomOperator( name="custom_op3", func=custom_func3, - invars=(B, C), + invars=(p, q, B, C, D), outvars=(Ap,), - variables={Ap: npts, B: npts, C: npts}, + variables={Ap: npts, B: npts, C: npts, D: npts}, implementation=impl, **_get_op_kwds(TASK_B), ) @@ -220,14 +228,14 @@ def compute(args): t=t, ) - def init_C(data, coords, component): + def init_D(data, coords, component): data[...] = np.sin(sum(x * x for x in coords)) # Finally solve the problem if not has_tasks or box.is_on_task(TASK_A): problem.initialize_field(A, formula=compute_scalar) if not has_tasks or box.is_on_task(TASK_B): - problem.initialize_field(C, formula=init_C) + problem.initialize_field(D, formula=init_D) problem.solve(simu, dry_run=args.dry_run) for dA in set(A.discrete_fields.values()).union(set(Ap.discrete_fields.values())):