Commit acc65bff authored by Franck Pérignon

Update redistribute and bridges. Temp

parent 1bb71a52
parmepy/mpi/bridge_inter.py
@@ -36,10 +36,10 @@ class BridgeInter(object):
         self.target_id = target_id
         assert isinstance(current, Cartesian)
         assert isinstance(parent, MPI.Intracomm)
         self._topology = current
         self._domain = current.domain
         # current task id
-        current_task = self._domain.currentTask()
+        current_task = self._topology.domain.currentTask()
         # True if current process is in the 'from' group'
         task_is_source = current_task() == self.source_id
@@ -59,7 +59,7 @@ class BridgeInter(object):
         # rank of the first proc belonging to the remote task
         # (used as remote leader)
-        proc_tasks = self._domain.tasks_list()
+        proc_tasks = self._topology.domain.tasks_list()
         if task_is_source:
             remote_leader = proc_tasks.index(self.target_id)
         elif task_is_target:
@@ -70,8 +70,7 @@ class BridgeInter(object):
         current_indices, remote_indices = self._swap_indices()
         self._tranfer_indices = {}
-        rank = self._topology.rank
-        current = current_indices[rank]
+        current = current_indices[self._topology.rank]
         for rk in remote_indices:
             inter = topotools.intersl(current, remote_indices[rk])
             if inter is not None:
@@ -92,11 +91,11 @@ class BridgeInter(object):
         # To allocate remote_indices array, we need the size of
         # the remote communicator.
         remote_size = self.comm.Get_remote_size()
-        dimension = self._domain.dimension
+        dimension = self._topology.domain.dimension
         remote_indices = npw.dim_zeros((dimension * 2, remote_size))
         # Then they are broadcasted to the remote communicator
         rank = self._topology.rank
-        current_task = self._domain.currentTask()
+        current_task = self._topology.domain.currentTask()
         if current_task is self.source_id:
             # Local 0 broadcast current_indices to remote comm
             if rank == 0:
@@ -121,7 +120,7 @@ class BridgeInter(object):
         return current_indices, remote_indices
 
-    def transferTypes(self, data_shape):
+    def transferTypes(self):
         """
         Return the dictionnary of MPI derived types
         used for send (if on source) or receive (if on target)
@@ -129,6 +128,7 @@ class BridgeInter(object):
         @return : a dict of MPI types
         """
         if self._transfer_types is None:
+            data_shape = self._topology.mesh.resolution
             self._transfer_types = topotools.createSubArray(
                 self._tranfer_indices, data_shape)
         return self._transfer_types
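The hunks above route all domain lookups through the stored topology and make transferTypes() compute the data shape itself from the topology mesh. For background, BridgeInter couples the source and target tasks through an MPI intercommunicator built from each task's local communicator and the rank of the remote task's leader in the parent communicator. A minimal standalone mpi4py sketch of that mechanism (hypothetical half/half task split, not parmepy code; run with an even number of ranks):

from mpi4py import MPI

parent = MPI.COMM_WORLD
half = parent.Get_size() // 2
# hypothetical split: first half of the ranks -> task 0, second half -> task 1
task = 0 if parent.Get_rank() < half else 1
local = parent.Split(color=task, key=parent.Get_rank())
# remote leader = rank, in the parent communicator, of the other task's first process
remote_leader = half if task == 0 else 0
# local leader is rank 0 of each local communicator
inter = local.Create_intercomm(0, parent, remote_leader, tag=99)
print("task %d sees %d remote processes" % (task, inter.Get_remote_size()))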
"""
@file redistribute_intercomm.py
Setup for data transfer/redistribution between two parmes topologies defined
on different MPI communicators with an empty intersection (for example
communicators obtained from Comm_Split). One topology is labeled as the
source and the other as the destination.
It relies on a BridgeInter (MPI intercommunicator bridge).
"""
from parmepy.constants import debug, S_DIR
from parmepy.mpi.bridge_inter import BridgeInter
from parmepy.operator.redistribute_new import Redistribute
from parmepy.operator.computational import Computational
from parmepy.mpi.topology import Cartesian
class RedistributeInter(Redistribute):
"""
Interconnection between two topologies defined on disjoint subsets of
MPI processes.
Setup builds a BridgeInter (an MPI intercommunicator bridge) between them.
Data are transferred from the topology of task source_id to the topology
of task target_id.
"""
@debug
def __init__(self, parent, source_id=None, target_id=None, **kwds):
"""
Create an operator to redistribute data between two MPI tasks for a
list of variables.
@param parent : MPI intracommunicator containing all processes
involved in the source and target tasks.
@param source_id, target_id : ids of the source and target tasks.
@remark : an id must be given explicitly whenever the corresponding
source or target object is undefined on the current process; otherwise
it is checked against source.task_id() or target.task_id().
"""
super(RedistributeInter, self).__init__(**kwds)
## parent communicator, which must contain all processes
## involved in the source and target tasks.
self.parent = parent
# set source and target ids.
# They must be known before setup.
# They can either be inferred from source and target,
# or must be given as arguments when source or
# target is undefined on the current process.
if self._source is None:
assert source_id is not None
else:
assert source_id == self._source.task_id()
if self._target is None:
assert target_id is not None
else:
assert target_id == self._target.task_id()
self._source_id = source_id
self._target_id = target_id
# Set list of variables and domain.
self._set_variables()
# Domain is set, we can check if we are on source or target
current_task = self.domain.currentTask()
self._is_source = current_task == self._source_id
self._is_target = current_task == self._target_id
assert self._is_target or self._is_source
assert not (self._is_target and self._is_source)
nbprocs = len(self.domain.tasks_list())
msg = "Parent communicator size and number of procs "
msg += "in domain differ."
assert parent.Get_size() == nbprocs, msg
# the local topology. May be either source or target
# depending on the task of the current process.
self._topology = None
self._transfer_types = None
self._requests = {}
def _set_variables(self):
"""
Check/set the list of variables processed by the current operator.
This can (must) be called at init.
"""
# Set list of variables.
# It depends on :
# - the type of source/target : Cartesian, Computational or None
# - the args variables : a list of variables or None
# Possible cases:
# - if source or target is None --> variables is required
# - if source and target are Cartesian --> variables is required
# - in all other cases, variables is optional.
# If variables are not set at init,
# they must be inferred from source/target operators.
has_var = len(self.variables) > 0
vlist = (v for v in self.variables)
if self._source is None or self._target is None:
assert len(self.variables) > 0
self.variables = [v for v in vlist]
else:
source_is_topo = isinstance(self._source, Cartesian)
target_is_topo = isinstance(self._target, Cartesian)
# both source and target are topologies. Variables required.
if source_is_topo and target_is_topo:
msg = 'Redistribute, a list of variables is required at init.'
assert has_var, msg
self.variables = [v for v in vlist]
elif not source_is_topo and not target_is_topo:
# both source and target are operators
# --> intersection of their variables
vsource = self._source.variables
vtarget = self._target.variables
if not has_var:
vlist = (v for v in vsource if v in vtarget)
self.variables = [v for v in vlist]
elif source_is_topo:
# source = topo, target = operator
vtarget = self._target.variables
if not has_var:
vlist = (v for v in vtarget)
self.variables = [v for v in vlist]
else:
# source = operator, target = topo
vsource = self._source.variables
if not has_var:
vlist = (v for v in vsource)
self.variables = [v for v in vlist]
assert len(self.variables) > 0
# All variables must have the same domain
self.domain = self.variables[0].domain
for v in self.variables:
assert v.domain is self.domain
@debug
def setup(self, rwork=None, iwork=None):
"""
Compute the intersection of topologies and set up the MPI intercommunicator.
"""
# First of all, we need to get the current topology:
if self._is_source:
self._set_topology(self._source)
elif self._is_target:
self._set_topology(self._target)
# Now we can build the bridge (intercomm)
self.bridge = BridgeInter(self._topology, self.parent,
self._source_id, self._target_id)
# And get mpi derived types
# (the data shape is taken from the topology mesh inside the bridge)
self._transfer_types = self.bridge.transferTypes()
# Register this operator as a requirement of
# every operator listed in run_till
for op in self._run_till:
op.addRedistributeRequirement(self)
self._is_uptodate = True
def _set_topology(self, current):
"""
@param current: a topology or a computational operator
This function checks that current is valid and consistent with
self.variables, and gets its topology to set self._topology.
"""
if isinstance(current, Cartesian):
self._topology = current
else:
assert isinstance(current, Computational)
msg = 'The variables to be distributed '
msg += 'do not belong to the input operator.'
assert all(v in current.variables for v in self.variables), msg
current.discretize()
vref = self.variables[0]
vcurrent = current.variables
self._topology = vcurrent[vref]
# We ensure that all vars have the same topo
# on the current side (source or target).
for v in (v for v in self.variables if v is not vref):
assert vcurrent[v] is self._topology
def apply(self, simulation=None):
"""
Apply this operator to its variables.
@param simulation : object that describes the simulation
parameters (time, time step, iteration number ...), see
parmepy.problem.simulation.Simulation for details.
"""
for req in self.requirements:
req.wait()
# --- Standard send/recv ---
self._requests = {}
# basetag = self._mpis.rank + 1
# Comm used for send/receive operations
# It must contain all processes of the source
# and target topologies.
refcomm = self.bridge.comm
# Map between rank and mpi types
# Loop over all required components of each variable
for v in self.variables:
rank = self._topology.comm.Get_rank()
for d in self._range_components(v):
v_name = v.name + S_DIR[d]
vtab = v.discreteFields[self._topology].data[d]
for rk in self._transfer_types:
if self._is_target:
# Set reception
self._requests[v_name + str(rk)] = \
refcomm.Irecv([vtab, 1, self._transfer_types[rk]],
source=rk, tag=rk)
if self._is_source:
self._requests[v_name + str(rk)] = \
refcomm.Issend([vtab, 1, self._transfer_types[rk]],
dest=rk, tag=rank)
self._has_requests = True
def wait(self):
if self._has_requests:
for rk in self._requests:
self._requests[rk].Wait()
self._has_requests = False
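The apply/wait pair above is the standard nonblocking exchange pattern: the target side posts Irecv, the source side posts Issend, and both keep the requests to complete them later with Wait. A self-contained mpi4py sketch of that pattern with plain numpy buffers (illustration only, not parmepy code):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
requests = {}
if rank == 0:
    buf = np.arange(8, dtype=np.float64)
    # synchronous-mode nonblocking send, completed later by Wait()
    requests['send0'] = comm.Issend([buf, MPI.DOUBLE], dest=1, tag=7)
elif rank == 1:
    buf = np.zeros(8, dtype=np.float64)
    requests['recv0'] = comm.Irecv([buf, MPI.DOUBLE], source=0, tag=7)
for req in requests.values():
    req.Wait()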
"""
@file redistribute_intra.py
Setup for data transfer/redistribution between two topologies.
"""
from parmepy.operator.continuous import Operator
from parmepy.mpi.bridge import Bridge
from abc import ABCMeta, abstractmethod
from parmepy.constants import S_DIR
from parmepy.operator import apply_decoration
class Redistribute(Operator):
"""
Bare interface to redistribute operators
"""
__metaclass__ = ABCMeta
def __init__(self, source, target, component=None,
run_till=None, **kwds):
"""
"""
# Base class initialisation
super(Redistribute, self).__init__(**kwds)
# Object (may be an operator or a topology) which handles the
# fields to be transferred
self._source = source
# Object (may be an operator or a topology) which handles the fields
# to be filled in from source.
self._target = target
self.component = component
if component is None:
# All components are considered
self._range_components = lambda v: xrange(v.nbComponents)
else:
# Only the given component is considered
assert self.component >= 0, 'component value must be non-negative.'
self._range_components = lambda v: (self.component,)
## Bridge between topology of source and topology of target
self.bridge = None
# True if some MPI operations are running for the current operator.
self._has_requests = False
# Operators that must wait for this one before
# any computation.
# Example : run_till = [op1] means that op1 will
# wait for the end of this operator before
# op1 starts its apply.
if run_till is None:
run_till = []
assert isinstance(run_till, list)
self._run_till = run_till
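# For instance (hypothetical operator names, purely illustrative):
#     redis = RedistributeIntra(source=advection, target=poisson,
#                               run_till=[poisson])
# makes poisson wait for the end of this redistribution before its
# own apply() starts.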
@abstractmethod
def setup(self, rwork=None, iwork=None):
"""
Check/set the list of variables to be distributed
What must be set at setup?
---> the list of continuous variables to be distributed
---> the bridge (one for all variables, which means
that all vars must have the same topology in source
and the same topology in target)
---> the list of discrete variables for source and
for target.
"""
def _check_operator(self, op):
"""
@param op : a computational operator
- check if op is really a computational operator
- discretize op
- check if all required variables (if any) belong to op
"""
from parmepy.operator.computational import Computational
assert isinstance(op, Computational)
op.discretize()
msg = 'The variables to be distributed '
msg += 'do not belong to the input operator.'
if len(self.variables) > 0:
assert all(v in op.variables for v in self.variables), msg
def _set_variables_and_topos(self, source_is_topo, target_is_topo):
"""
@param source_is_topo : true if source is a topo
@param target_is_topo : true if target is a topo
Internal function to set the list
of variables to be distributed
and to set source/target topologies
"""
# If variables are not set at init,
# they must be infered from source/target operators.
has_var = len(self.variables) > 0
vlist = (v for v in self.variables)
# both source and target are topologies. Variables required.
if source_is_topo and target_is_topo:
msg = 'Redistribute, a list of variables is required at init.'
assert has_var, msg
self.variables = [v for v in vlist]
# Just connect source/target topo to build bridge
# and discrete fields dictionaries.
topo_source = self._source
topo_target = self._target
elif not source_is_topo and not target_is_topo:
# both source and target are operators
# --> intersection of their variables
vsource = self._source.variables
vtarget = self._target.variables
if not has_var:
vlist = (v for v in vsource if v in vtarget)
self.variables = [v for v in vlist]
# A reference variable, used to set topo source/target
vref = self.variables[0]
topo_source = vsource[vref]
topo_target = vtarget[vref]
# We ensure that all vars have
# the same topo in source/target.
for v in (v for v in self.variables if v is not vref):
assert vsource[v] is topo_source
assert vtarget[v] is topo_target
elif source_is_topo:
# source = topo, target = operator
vtarget = self._target.variables
if not has_var:
vlist = (v for v in vtarget)
self.variables = [v for v in vlist]
# A reference variable, used to set topo source/target
vref = self.variables[0]
topo_source = self._source
topo_target = self._target.variables[vref]
# We ensure that all vars have
# the same topo in source/target.
for v in (v for v in self.variables if v is not vref):
assert vtarget[v] is topo_target
else:
# source = operator, target = topo
vsource = self._source.variables
if not has_var:
vlist = (v for v in vsource)
self.variables = [v for v in vlist]
# A reference variable, used to set topo source/target
vref = self.variables[0]
topo_source = self._source.variables[vref]
topo_target = self._target
# We ensure that all vars have
# the same topo in source operator
for v in (v for v in self.variables if v is not vref):
assert vsource[v] is topo_source
return topo_source, topo_target
@abstractmethod
def apply(self, simulation=None):
"""
distribute data ...
"""
@abstractmethod
def wait(self):
"""
Wait for the remaining MPI requests of this operator.
"""
def finalize(self):
# wait for all remaining comm
self.wait()
def printComputeTime(self):
pass
def get_profiling_info(self):
pass
class RedistributeIntra(Redistribute):
"""
Data redistribution between two topologies defined on the same set of
MPI processes (intra-communicator case).
"""
def __init__(self, **kwds):
"""
All keyword arguments (source, target, component, run_till, ...) are
forwarded to Redistribute.__init__.
"""
# Base class initialisation
super(RedistributeIntra, self).__init__(**kwds)
# Warning : comm from io_params will be used as
# reference for all mpi communication of this operator.
# --> rank computed in refcomm
# --> source and target must work inside refcomm
# If io_params is None, refcomm will be COMM_WORLD.
# Dictionary of discrete fields to be sent
self._vsource = {}
# Dictionary of discrete fields to be overwritten
self._vtarget = {}
# dictionary which maps rank to the mpi derived type
# used for send operations
self._send = {}
# dictionary which maps rank to the mpi derived type
# used for receive operations
self._receive = {}
# dictionary which maps rank/field name to a
# receive request
self._r_request = None
# dictionary which maps rank/field name to a
# send request
self._s_request = None
def setup(self, rwork=None, iwork=None):
"""
Build bridge ...
"""
# At setup, source and target must each be either
# a parmepy.mpi.topology.Cartesian or
# a computational operator.
msg = 'Redistribute error: undefined source or target.'
assert self._source is not None and self._target is not None, msg
from parmepy.mpi.topology import Cartesian
source_is_topo = isinstance(self._source, Cartesian)
target_is_topo = isinstance(self._target, Cartesian)
# Check input operators
if not source_is_topo:
self._check_operator(self._source)
if not target_is_topo:
self._check_operator(self._target)
# target operator must wait for
# the end of this operator to apply.
self._run_till.append(self._target)
t_source, t_target = self._set_variables_and_topos(source_is_topo,
target_is_topo)
source_res = t_source.mesh.discretization.resolution
target_res = t_target.mesh.discretization.resolution
msg = 'Redistribute error: source and target must '
msg += 'have the same global resolution.'
assert (source_res == target_res).all(), msg
# Set the dictionaries of source/target variables
self._vsource = {v: v.discretize(t_source)
for v in self.variables}
self._vtarget = {v: v.discretize(t_target)
for v in self.variables}
# We can create the bridge
self.bridge = Bridge(t_source, t_target)
# Register this operator as a requirement of
# every operator listed in run_till
for op in self._run_till:
op.addRedistributeRequirement(self)
# Shape of reference is the shape of source/target mesh
vref = self.variables[0]
shape = self._vsource[vref].topology.mesh.resolution
self._send = self.bridge.sendTypes(shape)
shape = self._vtarget[vref].topology.mesh.resolution
self._receive = self.bridge.recvTypes(shape)
self._is_uptodate = True
self._is_discretized = True
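# Note: the send/recv types obtained from the bridge are MPI derived
# datatypes that select, for each peer rank, the slab of the local field
# array to exchange. A minimal mpi4py sketch of such a type (hypothetical
# sizes, not the bridge's actual computation):
#
#     from mpi4py import MPI
#     full_shape = (16, 16, 16)   # local array resolution
#     sub_shape = (16, 4, 16)     # block exchanged with one peer
#     start = (0, 8, 0)           # where that block begins
#     subtype = MPI.DOUBLE.Create_subarray(full_shape, sub_shape, start,
#                                          order=MPI.ORDER_C)
#     subtype.Commit()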
def apply(self, simulation=None):
# Try different way to send vars?
# - Buffered : copy all data into a buffer and send/recv
# - Standard : one send/recv per component
# --- Standard send/recv ---
br = self.bridge
# reset send/recv requests
self._r_request = {}
self._s_request = {}
basetag = self._mpis.rank + 1
# Comm used for send/receive operations.
# It must contain all processes of the source
# and target topologies.
refcomm = self.bridge.comm
# Loop over all required components of each variable
for v in self.variables:
for d in self._range_components(v):
v_name = v.name + S_DIR[d]
# Deal with local copies of data
if br.hasLocalInter():
vTo = self._vtarget[v].data[d]
vFrom = self._vsource[v].data[d]
vTo[br.localTargetInd()] = vFrom[br.localSourceInd()]
# Transfers to other mpi processes
for rk in self._receive:
recvtag = basetag * 989 + (rk + 1) * 99 + (d + 1) * 88
mpi_type = self._receive[rk]
vTo = self._vtarget[v].data[d]
self._r_request[v_name + str(rk)] = \
refcomm.Irecv([vTo, 1, mpi_type],
source=rk, tag=recvtag)
self._has_requests = True
for rk in self._send:
sendtag = (rk + 1) * 989 + basetag * 99 + (d + 1) * 88
mpi_type = self._send[rk]
vFrom = self._vsource[v].data[d]
self._s_request[v_name + str(rk)] = \
refcomm.Issend([vFrom, 1, mpi_type],
dest=rk, tag=sendtag)
self._has_requests = True
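# Tag bookkeeping: with basetag = rank + 1, the receiver r posts
# recvtag = (r + 1) * 989 + (s + 1) * 99 + (d + 1) * 88 for a message
# coming from s, and the sender s computes exactly the same value as
# sendtag (its basetag is s + 1 and rk is r). Worked example for
# d = 0, s = 0, r = 2: both sides get 3 * 989 + 1 * 99 + 88 = 3154.
# This assumes ranks in self._mpis and in the bridge communicator coincide.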
def wait(self):
if self._has_requests:
for rk in self._r_request:
self._r_request[rk].Wait()
for rk in self._s_request:
self._s_request[rk].Wait()
self._has_requests = False
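# Typical use of RedistributeIntra (hypothetical operator and field names,
# assuming a 'variables' keyword handled by the operator base class;
# illustration only, not a verbatim parmepy example):
#
#     distr = RedistributeIntra(source=advection_op, target=poisson_op,
#                               variables=[vorticity])
#     distr.setup()
#     distr.apply(simulation)
#     distr.wait()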