From 9080c603524a82fbd9baa166472091d632c0fbdf Mon Sep 17 00:00:00 2001
From: Jean-Matthieu Etancelin <jean-matthieu.etancelin@univ-pau.fr>
Date: Tue, 23 Mar 2021 12:01:30 +0100
Subject: [PATCH] Fix tasks automatic communication

---
 hysop/core/graph/graph_builder.py |   8 +-
 hysop_examples/tasks/tasks.py     | 177 ++++++++++++++++++++++++++++++
 2 files changed, 180 insertions(+), 5 deletions(-)
 create mode 100755 hysop_examples/tasks/tasks.py

diff --git a/hysop/core/graph/graph_builder.py b/hysop/core/graph/graph_builder.py
index 9fe384fdf..7764b221e 100644
--- a/hysop/core/graph/graph_builder.py
+++ b/hysop/core/graph/graph_builder.py
@@ -340,7 +340,6 @@ class GraphBuilder(object):
             comm = domain.parent_comm
             tcomm = domain.task_comm()
             current_task = domain.current_task()
-            gprint(current_task, available_elems, needed_elems)
 
             def _name_to_key(n, d):
                 var = [_ for _ in d.keys() if isinstance(_, str) and _ == n]
@@ -399,19 +398,19 @@ class GraphBuilder(object):
                 kwargs = {}
                 s_topo, r_topo, comm_dir = (None, )*3
                 ae, ne = [_name_to_key(p, _) for _ in available_elems, needed_elems]
-                if not ae is None:
+                if not ae is None and p in available_names:
                     var = ae
                     t = available_names[p]
                     topo = available_elems[ae]
                     comm_dir = 'src'
                     s_topo = topo
-                if not ne is None:
+                if not ne is None and p in needed_names:
                     var = ne
                     t = needed_names[p]
                     topo = needed_elems[ne]
                     comm_dir = 'dest'
                     r_topo = topo
-                if not (ae is None or ne is None):
+                if not (s_topo is None or r_topo is None):
                     comm_dir = 'src&dest'
                     t = None
                 assert not comm_dir is None
@@ -434,7 +433,6 @@ class GraphBuilder(object):
                                'target_topo': r_topo,
                                'other_task_id': t,
                                })
-
                 yield kwargs
 
         # iterate over ComputationalNodes
diff --git a/hysop_examples/tasks/tasks.py b/hysop_examples/tasks/tasks.py
new file mode 100755
index 000000000..1ce91f784
--- /dev/null
+++ b/hysop_examples/tasks/tasks.py
@@ -0,0 +1,177 @@
+#!/usr/bin/env python2
+import numpy as np
+import sympy as sm
+
+TASK_A = 111
+TASK_B = 222
+
+
+def compute(args):
+    '''
+    HySoP Tasks Example: Initialize a field with a space and time dependent analytic formula one task A, then compute on other task B
+    '''
+    from hysop import Field, Box, IOParams, MPIParams, \
+        Simulation, Problem, ScalarParameter
+    from hysop.constants import HYSOP_DEFAULT_TASK_ID, Implementation
+    from hysop.operators import CustomOperator, Dummy
+    has_tasks = not args.proc_tasks is None
+
+    # Define domain
+    npts = args.npts
+    box = Box(origin=args.box_origin, length=args.box_length, dim=args.ndim, proc_tasks=args.proc_tasks)
+    print box.proc_tasks
+
+    # Define parameters and field (time and analytic field)
+    t = ScalarParameter('t', dtype=args.dtype)
+    A = Field(domain=box, name='A', dtype=args.dtype)
+    B = Field(domain=box, name='B', dtype=args.dtype)
+    C = Field(domain=box, name='C', dtype=args.dtype)
+
+    # Build the mpi_params for each task (keep in mind tha some procs can be defined on both tasks)
+    if has_tasks:
+        mpi_params = {TASK_A: None, TASK_B: None}
+        for tk in (TASK_A, TASK_B):
+            #print tk, box.is_on_task(tk)
+            mpi_params[tk] = MPIParams(comm=box.task_comm(tk), task_id=tk, on_task=box.is_on_task(tk))
+            print "\n".join([" ** {}: {}".format(a, b) for a, b in mpi_params.iteritems()])
+    else:
+        mpi_params = MPIParams(comm=box.task_comm(), task_id=HYSOP_DEFAULT_TASK_ID)
+    impl = args.impl
+
+    # Setup implementation specific variables
+
+    def _get_op_kwds(tk):
+        if has_tasks:
+            op_kwds = {'mpi_params': mpi_params[tk]}
+        else:
+            op_kwds = {'mpi_params': mpi_params}
+        if (impl is Implementation.OPENCL):
+            # For the OpenCL implementation we need to setup the compute device
+            # and configure how the code is generated and compiled at runtime.
+
+            # Create an explicit OpenCL context from user parameters
+            from hysop.backend.device.opencl.opencl_tools import get_or_create_opencl_env, get_device_number
+            cl_env = get_or_create_opencl_env(
+                mpi_params=mpi_params[tk],
+                platform_id=args.cl_platform_id,
+                device_id=box.machine_rank % get_device_number() if args.cl_device_id is None else None)
+
+            # Configure OpenCL kernel generation and tuning (already done by HysopArgParser)
+            from hysop.methods import OpenClKernelConfig
+            method = {OpenClKernelConfig: args.opencl_kernel_config}
+
+            # Setup opencl specific extra operator keyword arguments
+            op_kwds['cl_env'] = cl_env
+            op_kwds['method'] = method
+        return op_kwds
+
+    # Analytic initialization method depends on chosen implementation
+    if (impl is Implementation.PYTHON):
+        def compute_scalar(data, coords, component):
+            data[...] = np.sum(x*x for x in coords)
+    elif (impl is Implementation.OPENCL):
+        compute_scalar *= np.sum(xi*xi for xi in box.frame.coords)
+    else:
+        msg = 'Unknown implementation {}.'.format(impl)
+
+    def custom_func1(_Ai, _Ao):
+        _Ai.data[0][...] = np.cos(_Ai.data[0])
+
+    def custom_func2(_A, _B):
+        _B.data[0][...] = _A.data[0]**2
+
+    def custom_func3(_B, _C, _A):
+        _A.data[0][...] = _B.data[0]+_C.data[0]**2
+
+# TaskA      |   TaskB
+# op1 A->A   |
+# op2 A->B   |
+#         `--B--,              // Inter-task communication step
+#            |  op3 B,C->A
+#         ,--A--'              // Inter-task communication step
+# endA       |
+
+# Note : without endA operator, the second communication is not
+# automatically inserted because A field is an output for task A
+# and cannot be invalidated by op3 on othre task. Inter-task
+# invalidation in graph building is not yet implemented
+
+    custom1 = CustomOperator(name='custom_op1',
+                             func=custom_func1, invars=(A, ), outvars=(A,),
+                             variables={A: npts, }, implementation=impl,
+                             **_get_op_kwds(TASK_A))
+    custom2 = CustomOperator(name='custom_op2',
+                             func=custom_func2, invars=(A, ), outvars=(B,),
+                             variables={A: npts, B: npts}, implementation=impl,
+                             **_get_op_kwds(TASK_A))
+    custom3 = CustomOperator(name='custom_op3',
+                             func=custom_func3, invars=(B, C), outvars=(A,),
+                             variables={A: npts, B: npts, C: npts}, implementation=impl,
+                             **_get_op_kwds(TASK_B))
+    endA = Dummy(name='endA',
+                 variables={A: npts, }, implementation=impl,
+                 **_get_op_kwds(TASK_A))
+
+    # Create the problem we want to solve and insert our operator
+    problem = Problem()
+    problem.insert(custom1, custom2, custom3, endA)
+    problem.build(args)
+
+    # Create a simulation and solve the problem
+    # (do not forget to specify the time parameter here)
+    simu = Simulation(start=args.tstart, end=args.tend,
+                      nb_iter=args.nb_iter, dt0=args.dt,
+                      max_iter=args.max_iter,
+                      times_of_interest=args.dump_times,
+                      t=t)
+
+    def init_C(data, coords, component):
+        data[...] = np.sin(np.sum(x*x for x in coords))
+
+    # Finally solve the problem
+    if not has_tasks or box.is_on_task(TASK_A):
+        problem.initialize_field(A, formula=compute_scalar)
+    if not has_tasks or box.is_on_task(TASK_B):
+        problem.initialize_field(C, formula=init_C)
+    problem.solve(simu, dry_run=args.dry_run)
+
+    for dA in A.discrete_fields.values():
+        if not np.allclose(dA.data[0], 1.):
+            print np.min(dA.data[0]), np.max(dA.data[0])
+        else:
+            print("Ok")
+
+    # Finalize
+    problem.finalize()
+
+
+if __name__ == '__main__':
+    import mpi4py.MPI as mpi
+    from hysop_examples.example_utils import HysopArgParser, colors
+
+    class TasksHysopArgParser(HysopArgParser):
+        def _add_main_args(self):
+            args = super(TasksHysopArgParser, self)._add_main_args()
+            args.add_argument('--proc-tasks', type=str,
+                              action=self.eval, container=tuple, append=False,
+                              dest='proc_tasks', help='Domain proc_task parameter.')
+
+        def _check_main_args(self, args):
+            super(TasksHysopArgParser, self)._check_main_args(args)
+            self._check_default(args, 'proc_tasks', tuple, allow_none=False)
+    prog_name = 'tasks'
+    default_dump_dir = '{}/hysop_examples/{}'.format(TasksHysopArgParser.tmp_dir(), prog_name)
+
+    description = colors.color('HySoP Task Example: ', fg='blue', style='bold')
+    description += 'Initialize a field with a space and time dependent analytic formula one task A, then compute on other task B.'
+
+    parser = TasksHysopArgParser(prog_name=prog_name,
+                                 description=description,
+                                 default_dump_dir=default_dump_dir)
+
+    proc_tasks = tuple([TASK_A if _ == 0 else TASK_B for _ in range(mpi.COMM_WORLD.Get_size())])
+    parser.set_defaults(box_start=(0.0,), box_length=(2*np.pi,),
+                        tstart=0.0, tend=10.0, nb_iter=1,
+                        dump_freq=5, proc_tasks=proc_tasks)
+    print(proc_tasks)
+    parser.run(compute)
-- 
GitLab