Commit 9080c603 authored by EXT Jean-Matthieu Etancelin

Fix tasks automatic communication

parent ae06ac0b
Part of 2 merge requests: !24 Resolve "Add python3.x support", !15 (WIP) Resolve "HySoP with tasks"
@@ -340,7 +340,6 @@ class GraphBuilder(object):
     comm = domain.parent_comm
     tcomm = domain.task_comm()
     current_task = domain.current_task()
-    gprint(current_task, available_elems, needed_elems)
     def _name_to_key(n, d):
         var = [_ for _ in d.keys() if isinstance(_, str) and _ == n]
@@ -399,19 +398,19 @@ class GraphBuilder(object):
     kwargs = {}
     s_topo, r_topo, comm_dir = (None, )*3
     ae, ne = [_name_to_key(p, _) for _ in available_elems, needed_elems]
-    if not ae is None:
+    if not ae is None and p in available_names:
         var = ae
         t = available_names[p]
         topo = available_elems[ae]
         comm_dir = 'src'
         s_topo = topo
-    if not ne is None:
+    if not ne is None and p in needed_names:
         var = ne
         t = needed_names[p]
         topo = needed_elems[ne]
         comm_dir = 'dest'
         r_topo = topo
-    if not (ae is None or ne is None):
+    if not (s_topo is None or r_topo is None):
         comm_dir = 'src&dest'
         t = None
     assert not comm_dir is None
@@ -434,7 +433,6 @@ class GraphBuilder(object):
         'target_topo': r_topo,
         'other_task_id': t,
     })
     yield kwargs
     # iterate over ComputationalNodes
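In short, the hunks above stop relying on the raw key lookup alone: a variable only gets a 'src' (resp. 'dest') side when its name is also present in available_names (resp. needed_names), and the combined 'src&dest' case is now derived from the resolved topologies rather than from the keys. A minimal standalone sketch of that resolution step, using hypothetical dictionaries in place of the GraphBuilder state, could look like this:

# Sketch only: 'available'/'needed' map variable names to topologies and
# 'available_names'/'needed_names' map variable names to the other task id;
# these are placeholders mirroring the attributes used in the diff above.
def resolve_comm_dir(name, available, needed, available_names, needed_names):
    s_topo, r_topo, comm_dir, task = None, None, None, None
    if name in available and name in available_names:
        # the variable is produced here: current task acts as the source
        comm_dir, s_topo, task = 'src', available[name], available_names[name]
    if name in needed and name in needed_names:
        # the variable is consumed here: current task acts as the destination
        comm_dir, r_topo, task = 'dest', needed[name], needed_names[name]
    if not (s_topo is None or r_topo is None):
        # both topologies resolved: this process sends and receives
        comm_dir, task = 'src&dest', None
    assert comm_dir is not None
    return comm_dir, s_topo, r_topo, task

# e.g. resolve_comm_dir('A', {'A': topo_a}, {}, {'A': 222}, {})
# -> ('src', topo_a, None, 222), with 222 the id of the other task involved.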
#!/usr/bin/env python2
import numpy as np
import sympy as sm
TASK_A = 111
TASK_B = 222
def compute(args):
    '''
    HySoP Tasks Example: initialize a field with a space and time dependent
    analytic formula on one task (A), then compute on another task (B).
    '''
    from hysop import Field, Box, IOParams, MPIParams, \
        Simulation, Problem, ScalarParameter
    from hysop.constants import HYSOP_DEFAULT_TASK_ID, Implementation
    from hysop.operators import CustomOperator, Dummy

    has_tasks = (args.proc_tasks is not None)

    # Define the domain
    npts = args.npts
    box = Box(origin=args.box_origin, length=args.box_length,
              dim=args.ndim, proc_tasks=args.proc_tasks)
    print box.proc_tasks

    # Define the parameters and fields (time parameter and analytic fields)
    t = ScalarParameter('t', dtype=args.dtype)
    A = Field(domain=box, name='A', dtype=args.dtype)
    B = Field(domain=box, name='B', dtype=args.dtype)
    C = Field(domain=box, name='C', dtype=args.dtype)

    # Build the mpi_params for each task
    # (keep in mind that some processes can be defined on both tasks).
    if has_tasks:
        mpi_params = {TASK_A: None, TASK_B: None}
        for tk in (TASK_A, TASK_B):
            #print tk, box.is_on_task(tk)
            mpi_params[tk] = MPIParams(comm=box.task_comm(tk), task_id=tk,
                                       on_task=box.is_on_task(tk))
        print "\n".join([" ** {}: {}".format(a, b)
                         for a, b in mpi_params.iteritems()])
    else:
        mpi_params = MPIParams(comm=box.task_comm(), task_id=HYSOP_DEFAULT_TASK_ID)
    impl = args.impl

    # Setup implementation specific variables
    def _get_op_kwds(tk):
        if has_tasks:
            op_kwds = {'mpi_params': mpi_params[tk]}
        else:
            op_kwds = {'mpi_params': mpi_params}
        if (impl is Implementation.OPENCL):
            # For the OpenCL implementation we need to setup the compute device
            # and configure how the code is generated and compiled at runtime.

            # Create an explicit OpenCL context from user parameters
            from hysop.backend.device.opencl.opencl_tools import get_or_create_opencl_env, get_device_number
            cl_env = get_or_create_opencl_env(
                mpi_params=op_kwds['mpi_params'],
                platform_id=args.cl_platform_id,
                device_id=box.machine_rank % get_device_number() if args.cl_device_id is None else None)

            # Configure OpenCL kernel generation and tuning (already done by HysopArgParser)
            from hysop.methods import OpenClKernelConfig
            method = {OpenClKernelConfig: args.opencl_kernel_config}

            # Setup OpenCL specific extra operator keyword arguments
            op_kwds['cl_env'] = cl_env
            op_kwds['method'] = method
        return op_kwds
    # The analytic initialization method depends on the chosen implementation
    if (impl is Implementation.PYTHON):
        def compute_scalar(data, coords, component):
            data[...] = np.sum(x*x for x in coords)
    elif (impl is Implementation.OPENCL):
        # With the OpenCL implementation the formula is a symbolic expression
        # built from the box coordinates (kernels are generated at runtime).
        compute_scalar = np.sum(xi*xi for xi in box.frame.coords)
    else:
        msg = 'Unknown implementation {}.'.format(impl)
        raise ValueError(msg)

    def custom_func1(_Ai, _Ao):
        _Ao.data[0][...] = np.cos(_Ai.data[0])

    def custom_func2(_A, _B):
        _B.data[0][...] = _A.data[0]**2

    def custom_func3(_B, _C, _A):
        _A.data[0][...] = _B.data[0] + _C.data[0]**2
    # TaskA      | TaskB
    #  op1 A->A  |
    #  op2 A->B  |
    #   `--B--,     // Inter-task communication step
    #         |   op3 B,C->A
    #   ,--A--'     // Inter-task communication step
    #  endA      |
    #
    # Note: without the endA operator, the second communication is not
    # inserted automatically, because the A field is an output for task A
    # and cannot be invalidated by op3 on the other task. Inter-task
    # invalidation during graph building is not yet implemented.
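    # These inter-task communication steps are resolved automatically at graph
    # build time: each variable available on one task and needed on the other
    # yields a communication descriptor (comm_dir 'src', 'dest' or 'src&dest',
    # source/target topologies and the other task id), cf. the GraphBuilder
    # change in this commit.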
    custom1 = CustomOperator(name='custom_op1',
                             func=custom_func1, invars=(A, ), outvars=(A, ),
                             variables={A: npts},
                             implementation=impl,
                             **_get_op_kwds(TASK_A))
    custom2 = CustomOperator(name='custom_op2',
                             func=custom_func2, invars=(A, ), outvars=(B, ),
                             variables={A: npts, B: npts},
                             implementation=impl,
                             **_get_op_kwds(TASK_A))
    custom3 = CustomOperator(name='custom_op3',
                             func=custom_func3, invars=(B, C), outvars=(A, ),
                             variables={A: npts, B: npts, C: npts},
                             implementation=impl,
                             **_get_op_kwds(TASK_B))
    endA = Dummy(name='endA',
                 variables={A: npts},
                 implementation=impl,
                 **_get_op_kwds(TASK_A))

    # Create the problem we want to solve and insert our operators
    problem = Problem()
    problem.insert(custom1, custom2, custom3, endA)
    problem.build(args)
    # Create a simulation and solve the problem
    # (do not forget to specify the time parameter here)
    simu = Simulation(start=args.tstart, end=args.tend,
                      nb_iter=args.nb_iter, dt0=args.dt,
                      max_iter=args.max_iter,
                      times_of_interest=args.dump_times,
                      t=t)

    def init_C(data, coords, component):
        data[...] = np.sin(np.sum(x*x for x in coords))

    # Finally initialize the fields on their respective tasks and solve the problem
    if not has_tasks or box.is_on_task(TASK_A):
        problem.initialize_field(A, formula=compute_scalar)
    if not has_tasks or box.is_on_task(TASK_B):
        problem.initialize_field(C, formula=init_C)
    problem.solve(simu, dry_run=args.dry_run)

    # Check the result: with A0 = sum(x*x) the initial value of A, the final
    # A = B + C**2 = cos(A0)**2 + sin(A0)**2 should be 1 everywhere.
    for dA in A.discrete_fields.values():
        if not np.allclose(dA.data[0], 1.):
            print np.min(dA.data[0]), np.max(dA.data[0])
        else:
            print("Ok")

    # Finalize
    problem.finalize()
if __name__ == '__main__':
    import mpi4py.MPI as mpi
    from hysop_examples.example_utils import HysopArgParser, colors

    class TasksHysopArgParser(HysopArgParser):
        def _add_main_args(self):
            args = super(TasksHysopArgParser, self)._add_main_args()
            args.add_argument('--proc-tasks', type=str,
                              action=self.eval, container=tuple, append=False,
                              dest='proc_tasks',
                              help='Domain proc_tasks parameter.')

        def _check_main_args(self, args):
            super(TasksHysopArgParser, self)._check_main_args(args)
            self._check_default(args, 'proc_tasks', tuple, allow_none=False)

    prog_name = 'tasks'
    default_dump_dir = '{}/hysop_examples/{}'.format(
        TasksHysopArgParser.tmp_dir(), prog_name)

    description = colors.color('HySoP Tasks Example: ', fg='blue', style='bold')
    description += ('Initialize a field with a space and time dependent analytic '
                    'formula on one task (A), then compute on another task (B).')

    parser = TasksHysopArgParser(prog_name=prog_name,
                                 description=description,
                                 default_dump_dir=default_dump_dir)

    # By default, only rank 0 is on TASK_A; all other ranks are on TASK_B.
    proc_tasks = tuple([TASK_A if _ == 0 else TASK_B
                        for _ in range(mpi.COMM_WORLD.Get_size())])
    parser.set_defaults(box_start=(0.0,), box_length=(2*np.pi,),
                        tstart=0.0, tend=10.0, nb_iter=1,
                        dump_freq=5, proc_tasks=proc_tasks)
    print(proc_tasks)

    parser.run(compute)
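For comparison, a minimal sketch (not part of the committed example) of an alternative rank-to-task mapping that splits the MPI ranks evenly between the two tasks instead of putting only rank 0 on TASK_A; it only assumes mpi4py and the task ids defined above:

import mpi4py.MPI as mpi

TASK_A, TASK_B = 111, 222
size = mpi.COMM_WORLD.Get_size()
# first half of the ranks on task A, second half on task B
proc_tasks = tuple(TASK_A if r < (size + 1) // 2 else TASK_B for r in range(size))
# with 4 processes this gives (111, 111, 222, 222); it can be passed either
# through parser.set_defaults(proc_tasks=proc_tasks) or via the --proc-tasks
# argument defined above.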