From c0e57fcbcedb22a9a9a730011c743ce1a599daf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franck=20P=C3=A9rignon?= <franck.perignon@imag.fr> Date: Mon, 15 Sep 2014 18:13:44 +0200 Subject: [PATCH] Clean redistribute op. --- .../hysop/operator/SAVE_REDIS/redistribute.py | 372 ----------------- .../SAVE_REDIS/redistribute_intercomm.py | 343 --------------- HySoP/hysop/operator/redistribute_inter.py | 268 ------------ HySoP/hysop/operator/redistribute_intra.py | 391 ++++++++---------- HySoP/hysop/operator/redistribute_new.py | 182 ++++++++ 5 files changed, 355 insertions(+), 1201 deletions(-) delete mode 100644 HySoP/hysop/operator/SAVE_REDIS/redistribute.py delete mode 100644 HySoP/hysop/operator/SAVE_REDIS/redistribute_intercomm.py delete mode 100644 HySoP/hysop/operator/redistribute_inter.py create mode 100644 HySoP/hysop/operator/redistribute_new.py diff --git a/HySoP/hysop/operator/SAVE_REDIS/redistribute.py b/HySoP/hysop/operator/SAVE_REDIS/redistribute.py deleted file mode 100644 index 608425c6b..000000000 --- a/HySoP/hysop/operator/SAVE_REDIS/redistribute.py +++ /dev/null @@ -1,372 +0,0 @@ -""" -@file redistribute.py -Setup for data transfer/redistribution between two parmes topologies. - -This operator is an inter operator which is supposed to define the process to -transfer variables and data from one operator to another. -This mainly concerns data redistribution if the two operators work on -different mpi topologies. - -When is it required to define an operator between op1 and op2? -If: -- the intersection between op1.output-variables and op2.input-variables - is not empty -- AND if the topology on which the variables of op1 are defined is different -from the one of op2 variables. - -Note Franck: this kind of operator may also be useful -to define the interpolation/filter process for data transfer for -a variable defined on several meshes. - -""" -from parmepy import __VERBOSE__ -from parmepy.constants import debug, PARMES_MPI_REAL, ORDERMPI, np, S_DIR -from parmepy.operator.continuous import Operator -from parmepy.mpi.bridge import Bridge -from parmepy.methods_keys import Support - - -class Redistribute(Operator): - """ - Interconnection between two operators. - SetUp will compute (or get if it already exists) a Bridge between two - topologies. - Apply redistributes data from opFrom topology to opTo topology. - - """ - @debug - def __init__(self, opFrom, opTo, name_suffix=None, component=None, **kwds): - - """ - Create an operator to distribute data between two mpi topologies for a - list of variables belonging to two operators. - - @param variables : the set of variables to be redistributed - @param opFrom : source operator - @param opTo : target (i.e.) the operator that handles the topology on - which data must be redistributed. - @param component: components of vector fields to consider (default: - None, all components are taken). 
- """ - super(Redistribute, self).__init__(**kwds) - vars_str = "_(" - for vv in self.variables: - vars_str += vv.name + "," - vars_str = vars_str[:-1] + ')' - if component is not None: - vars_str += S_DIR[component] - if name_suffix is None: - name_suffix = '' - self.name += vars_str + name_suffix - ## Source Operator - self.opFrom = opFrom - ## Targeted operator - self.opTo = opTo - - self.input = self.output = self.variables - self.evts = [] - self._toHost_fields = [] - self._toDevice_fields = [] - self._hasRequests = False - self.component = component - if component is None: - # All components are considered - self._range_components = lambda v: range(v.nbComponents) - else: - # Only the given component is considered - self._range_components = lambda v: [component] - self.r_request = {} - self.s_request = {} - self._r_types = {} - self._s_types = {} - for v in self.variables: - self._r_types[v] = {} - self._s_types[v] = {} - - # Enable desynchronization: the opTo operator must call the wait - # function of this redistribute. This operator has to know self. - self.opTo.addRedistributeRequirement(self) - - @debug - def setup(self): - """ - Computes intersection of two topologies. - - """ - # Then check if variables belong to both operators - # And check if variables have enought components. - for v in self.variables: - assert v in self.opFrom.variables and v in self.opTo.variables, \ - 'Redistribute error : one of the variable is not present\ - in both source and target operator.' - if self.component is not None: - assert self.component >= 0, 'component needs to be positive' - assert v.nbComponents > self.component, \ - 'Redistribute error : variable ' + str(v.name) + ' do not \ - have enough components (' + str(self.component) + ')' - assert self.opFrom.isUp() and self.opTo.isUp(), \ - """You should setup both opFrom and opTo operators - before any attempt to setup a redistribute operator.""" - - # Look for an operator operating on device. 
- try: - opFrom_is_device = \ - self.opFrom.method[Support].find('gpu') >= 0 - except KeyError: # op.method is a dict not containing Support in keys - opFrom_is_device = False - except IndexError: # op.method is a string - opFrom_is_device = False - except TypeError: # op.method is None - opFrom_is_device = False - try: - opTo_is_device = \ - self.opTo.method[Support].find('gpu') >= 0 - except KeyError: # op.method is a dict not containing Support in keys - opTo_is_device = False - except IndexError: # op.method is a sting - opTo_is_device = False - except TypeError: # op.method is None - opTo_is_device = False - - if not opFrom_is_device and not opTo_is_device: - # case: opFrom(host) --host--> opTo(host) - self.apply = self._host - self.wait = self._wait_host - else: - # Have on device operators - self.wait = self._wait_all - if opFrom_is_device and not opTo_is_device: - # case: opFrom(GPU) --toHost--host--> opTo(host) - self.apply = self._apply_toHost_host - elif not opFrom_is_device and opTo_is_device: - # case: opFrom(host) --host--toDevice--> opTo(GPU) - self.apply = self._apply_host_toDevice - else: - # case: opFrom(GPU) --toHost--host--toDevice--> opTo(host) - # Transfers are removed if variables are batched - if np.any([self.opFrom.discreteFields[v].isBatch - for v in self.variables] + - [self.opTo.discreteFields[v].isBatch - for v in self.variables]): - self.apply = self._host - else: - self.apply = self._apply_toHost_host_toDevice - - # Build bridges and toTransfer lists - self.bridges = {} - backup = None - lastvar = None - # Create bridges between topologies, for each variable. - for v in self.variables: - # Bridges creation - topofrom = self.opFrom.discreteFields[v].topology - topoto = self.opTo.discreteFields[v].topology - if backup is not None: - # Check if a similar bridge has not already been created. - if [topofrom, topoto] == backup: - self.bridges[v] = self.bridges[lastvar] - else: - self.bridges[v] = Bridge(topofrom, topoto) - backup = [topofrom, topoto] - lastvar = v - # toTransfer list completion - if opFrom_is_device: - self._toHost_fields.append(self.opFrom.discreteFields[v]) - if opTo_is_device: - self._toDevice_fields.append(self.opTo.discreteFields[v]) - - self._main_comm = self.opFrom.discreteFields[v].topology.parent() - self._main_rank = self._main_comm.Get_rank() - - # Flag telling if there will be some mpi data transfers. 
- self._useless_transfer = {} - for v in self.variables: - self._useless_transfer[v] = \ - (opFrom_is_device and opTo_is_device) and \ - len(self.bridges[v].recvFrom.keys()) == 0 and \ - len(self.bridges[v].sendTo.keys()) == 0 - - # Build MPI subarrays - dim = self.domain.dimension - for v in self.variables: - br = self.bridges[v] - vToShape = self.opTo.discreteFields[v].data[0].shape - vFromShape = self.opFrom.discreteFields[v].data[0].shape - for rk in br.recvFrom.keys(): - subvshape = tuple([br.recvFrom[rk][i].stop - - br.recvFrom[rk][i].start - for i in range(dim)]) - substart = tuple([br.recvFrom[rk][i].start - for i in range(dim)]) - self._r_types[v][rk] = \ - PARMES_MPI_REAL.Create_subarray(vToShape, - subvshape, - substart, - order=ORDERMPI) - self._r_types[v][rk].Commit() - for rk in br.sendTo.keys(): - subvshape = tuple([br.sendTo[rk][i].stop - - br.sendTo[rk][i].start - for i in range(dim)]) - substart = tuple([br.sendTo[rk][i].start - for i in range(dim)]) - self._s_types[v][rk] = \ - PARMES_MPI_REAL.Create_subarray(vFromShape, - subvshape, - substart, - order=ORDERMPI) - self._s_types[v][rk].Commit() - - self._is_uptodate = True - - def _apply_toHost_host_toDevice(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY toHOST+HOST+toDEVICE".format(self._main_rank)) - self._toHost() - self._wait_device() - self._host() - self._wait_host() - self._toDevice() - - def _apply_toHost_host(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY toHOST+HOST".format(self._main_rank)) - self._toHost() - self._wait_device() - self._host() - - def _apply_host_toDevice(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY HOST+toDEVICE".format(self._main_rank)) - self._host() - self._wait_host() - self._toDevice() - - def _toHost(self): - """ - Proceed with data transfer of variables from device to host - """ - for v in self.variables: - dv = self.opFrom.discreteFields[v] - if dv in self._toHost_fields: - if not self._useless_transfer[v]: - dv.toHost(self.component) - - def _toDevice(self): - """ - Proceed with data transfer of variables from device to host - """ - for v in self.variables: - dv = self.opTo.discreteFields[v] - if dv in self._toDevice_fields: - if not self._useless_transfer[v]: - dv.toDevice(self.component) - - def _host(self, simulation=None): - """ - Proceed with data redistribution from opFrom to opTo - """ - # TODO : - # - save a set of bridges in the domain and access them from operator - # - process all variables in one shot if they have the same topo - # (use buffers for mpi send/recv? ) - # - move MPI datatypes into the bridge? 
--> and free MPI type properly - if __VERBOSE__: - print ("{0} APPLY HOST".format(self._main_rank)) - self.r_request = {} - self.s_request = {} - for v in self.variables: - br = self.bridges[v] - # Apply for each component considered - for d in self._range_components(v): - if __VERBOSE__: - print ("{0} APPLY HOST".format(self._main_rank), - self.opFrom.discreteFields[v].name, '->', - self.opTo.discreteFields[v].name, S_DIR[d]) - vTo = self.opTo.discreteFields[v].data[d] - vFrom = self.opFrom.discreteFields[v].data[d] - v_name = self.opFrom.discreteFields[v].name + S_DIR[d] - if br.hasLocalInter: - vTo[br.ito] = vFrom[br.ifrom] - cRk = self._main_comm.Get_rank() - for rk in br.recvFrom.keys(): - recvtag = (cRk + 1) * 989 + (rk + 1) * 99 + (d + 1) * 88 - self.r_request[v_name + str(rk)] = \ - self._main_comm.Irecv([vTo, 1, self._r_types[v][rk]], - source=rk, tag=recvtag) - self._hasRequests = True - for rk in br.sendTo.keys(): - sendtag = (rk + 1) * 989 + (cRk + 1) * 99 + (d + 1) * 88 - self.s_request[v_name + str(rk)] = \ - self._main_comm.Issend([vFrom, 1, - self._s_types[v][rk]], - dest=rk, tag=sendtag) - self._hasRequests = True - - def _wait_host(self): - """ - MPI Barrier to wait for the end - of all communication requests. - """ - if __VERBOSE__: - print ("{0}", "WAIT MPI".format(self._main_rank), - self._hasRequests) - if self._hasRequests: - for rk in self.r_request.keys(): - self.r_request[rk].Wait() - for rk in self.s_request.keys(): - self.s_request[rk].Wait() - self._hasRequests = False - - def _wait_device(self): - if __VERBOSE__: - print ("{0}".format(self._main_rank), "WAITING OPENCL") - for dv in self._toDevice_fields + self._toHost_fields: - dv.wait() - - def _wait_all(self): - self._wait_host() - self._wait_device() - - def test(self, rsend=None, rrecv=None): - """ - if neither rsend or rrecv is given return - True if all communication request are complete - else check for sending to rsend or - receiving from rrecv. Process ranks - should be given in main_comm. - @param rsend : discrete variable name + S_DIR + rank of the process - to which a message has been sent - and for which we want to test - message completion. - @param rrecv : discrete variable name + S_DIR + rank of the process - from which a message has been receive - and for which we want to test - message completion. - """ - if(rsend is not None or rrecv is not None): - send_res = True - recv_res = True - if rsend is not None: - send_res = self.s_request[rsend].Test() - if rrecv is not None: - recv_res = self.r_request[rrecv].Test() - res = send_res and recv_res - else: - res = True - for rk in self.r_request.keys(): - res = self.r_request[rk].Test() - if not res: - return res - for rk in self.s_request.keys(): - res = self.s_request[rk].Test() - if not res: - return res - return res - - def addRedistributeRequirement(self, red): - raise ValueError( - "Cannot add a requirement to a Redistribute operator.") - - def getRedistributeRequirement(self): - return [] diff --git a/HySoP/hysop/operator/SAVE_REDIS/redistribute_intercomm.py b/HySoP/hysop/operator/SAVE_REDIS/redistribute_intercomm.py deleted file mode 100644 index 749723490..000000000 --- a/HySoP/hysop/operator/SAVE_REDIS/redistribute_intercomm.py +++ /dev/null @@ -1,343 +0,0 @@ -""" -@file redistribute_intercomm.py -Setup for data transfer/redistribution between a single parmes topology based -on different MPI communicators with null intersection (for example -by Comm_Split). One of the topology is labeled as the source and the other is -the destination. 
- -It relies on a Bridge_intercomm. -""" -from parmepy.constants import debug, PARMES_MPI_REAL, ORDERMPI, S_DIR, np -from parmepy import __VERBOSE__ -from parmepy.operator.continuous import Operator -from parmepy.mpi.topology import Bridge_intercomm -from parmepy.methods_keys import Support - - -class RedistributeIntercomm(Operator): - """ - Interconnection between two topologies on different sub set of MPI process. - SetUp will compute a Bridge_intercomm between a single topology. - Transfers data from topology of id_from to the id_to. - """ - @debug - def __init__(self, op_from, op_to, proc_tasks, - parent_comm, component=None, name_suffix='', **kwds): - """ - Create an operator to distribute data between two mpi topologies for a - list of variables. - - @param variables : the set of variables to be redistributed - @param topo : Parmes topology that differs across process of the - parent_comm MPI intracommunicator. - @param id_from : id of the task considered as input. - @param id_to : id of the task considered as output. - @param proc_tasks: python array specifying the task id of each of - the parent_comm MPI intracommunicator. - @param parent_comm : Parent communicator (Each process that use this - operator must be a member of the parent_comm) - @param component : Component to consider. - @remark : proc_tasks size and number of processus in parent_comm - must be equal. - """ - super(RedistributeIntercomm, self).__init__(**kwds) - vars_str = "_(" - for vv in self.variables: - vars_str += vv.name + "," - vars_str = vars_str[:-1] + ')' - if not component is None: - vars_str += S_DIR[component] - self.name += vars_str+name_suffix - assert parent_comm.Get_size() == len(proc_tasks), \ - "Parent communicator ({0})".format(parent_comm.Get_size()) + \ - " and size of the task id array " + \ - "({0}) are not equal".format(len(proc_tasks)) - self.opFrom = op_from - self.opTo = op_to - self.id_from = self.opFrom.task_id - self.id_to = self.opTo.task_id - self.parent_comm = parent_comm - self._dim = self.variables[0].domain.dimension - self.proc_tasks = proc_tasks - self.input = self.output = self.variables - self.component = component - if component is None: - # All components are considered - self._range_components = lambda v: range(v.nbComponents) - else: - # Only the given component is considered - self._range_components = lambda v: [component] - - self.bridges = {} - self.r_request = {} - self.s_request = {} - self._r_types = {} - self._s_types = {} - for v in self.variables: - self._r_types[v] = {} - self._s_types[v] = {} - self._toHost_fields = [] - self._toDevice_fields = [] - self._parent_rank = self.parent_comm.Get_rank() - self._my_rank = None - - def discretize(self): - - for v in self.variables: - if self.topology is None: - if self.proc_tasks[self._parent_rank] == self.id_from: - self.topology = self.opFrom.discreteFields[v].topology - else: - self.topology = self.opTo.discreteFields[v].topology - - self._my_rank = self.topology.comm.Get_rank() - self._dim = self.topology.domain.dimension - - for v in self.variables: - self.discreteFields[v] = v.discretize(self.topology) - - @debug - def setup(self): - """ - Computes intersection of topologies and set the MPI intercommunicator. - """ - assert self.topology.is_uptodate, \ - """You should setup topology - before any attempt to setup a redistribute operator.""" - - # Look for an operator opertating on device. 
- try: - opFrom_is_device = \ - self.opFrom.method[Support].find('gpu') >= 0 - except KeyError: # op.method is a dict not containing Support in keys - opFrom_is_device = False - except IndexError: # op.method is a sting - opFrom_is_device = False - except TypeError: # op.method is None - opFrom_is_device = False - try: - opTo_is_device = \ - self.opTo.method[Support].find('gpu') >= 0 - except KeyError: # op.method is a dict not containing Support in keys - opTo_is_device = False - except IndexError: # op.method is a sting - opTo_is_device = False - except TypeError: # op.method is None - opTo_is_device = False - - if not opFrom_is_device and not opTo_is_device: - # case: opFrom(host) --bridge--> opTo(host) - self._the_apply = self._apply_host - else: - # Have on device operators - if opFrom_is_device and not opTo_is_device: - # case: opFrom(GPU) --toHost--bridge--> opTo(host) - self._the_apply = self._apply_toHost_host - elif not opFrom_is_device and opTo_is_device: - # case: opFrom(host) --bridge--toDevice--> opTo(GPU) - self._the_apply = self._apply_host_toDevice - else: - # case: opFrom(GPU) --toHost--bridge--toDevice--> opTo(host) - # Transfers are removed if variables are batched - if np.any([self.opFrom.discreteFields[v].isBatch - for v in self.variables] + - [self.opTo.discreteFields[v].isBatch - for v in self.variables]): - self._the_apply = self._host - else: - self._the_apply = self._apply_toHost_host_toDevice - - # Build bridges and toTransfer lists - self.bridge = Bridge_intercomm(self.topology, self.parent_comm, - self.id_from, self.id_to, - self.proc_tasks) - - for v in self.variables: - # toTransfer list completion - if self.proc_tasks[self._parent_rank] == self.id_from: - if opFrom_is_device: - self._toHost_fields.append(self.opFrom.discreteFields[v]) - if self.proc_tasks[self._parent_rank] == self.id_to: - if opTo_is_device: - self._toDevice_fields.append(self.opTo.discreteFields[v]) - - for v in self.variables: - dv = v.discreteFields[self.topology] - transfers = self.bridge.transfers - # Set reception - if self.proc_tasks[self._parent_rank] == self.id_to: - for from_rk in transfers.keys(): - subshape = tuple( - [transfers[from_rk][i][1] - transfers[from_rk][i][0] - for i in range(self._dim)]) - substart = tuple( - [transfers[from_rk][i][0] for i in range(self._dim)]) - self._r_types[v][from_rk] = \ - PARMES_MPI_REAL.Create_subarray(dv.data[0].shape, - subshape, - substart, - order=ORDERMPI) - self._r_types[v][from_rk].Commit() - # Set Sending - if self.proc_tasks[self._parent_rank] == self.id_from: - for to_rk in transfers.keys(): - subshape = tuple( - [transfers[to_rk][i][1] - transfers[to_rk][i][0] - for i in range(self._dim)]) - substart = tuple( - [transfers[to_rk][i][0] for i in range(self._dim)]) - self._r_types[v][to_rk] = \ - PARMES_MPI_REAL.Create_subarray(dv.data[0].shape, - subshape, - substart, - order=ORDERMPI) - self._r_types[v][to_rk].Commit() - self._is_uptodate = True - - @debug - def apply(self, simulation=None): - """ - Apply this operator to its variables. - @param simulation : object that describes the simulation - parameters (time, time step, iteration number ...), see - parmepy.problem.simulation.Simulation for details. 
- """ - for req in self.requirements: - req.wait() - self._the_apply(simulation) - - def _apply_toHost_host_toDevice(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY toHOST+HOST+toDEVICE".format(self._parent_rank)) - if self.proc_tasks[self._parent_rank] == self.id_from: - self._toHost() - self._wait_device() - self._host() - self._wait_host() - if self.proc_tasks[self._parent_rank] == self.id_to: - self._toDevice() - self._wait_device() - - def _apply_toHost_host(self, simulation=None): - - if __VERBOSE__: - print ("{0} APPLY toHOST+HOST".format(self._parent_rank)) - if self.proc_tasks[self._parent_rank] == self.id_from: - self._toHost() - self._wait_device() - self._host() - self._wait_host() - - def _apply_host_toDevice(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY HOST+toDEVICE".format(self._parent_rank)) - self._host() - self._wait_host() - self.parent_comm.Barrier() - if self.proc_tasks[self._parent_rank] == self.id_to: - self._toDevice() - self._wait_device() - self.parent_comm.Barrier() - - def _apply_host(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY HOST".format(self._parent_rank)) - self._host() - self._wait_host() - - def _host(self, simulation=None): - """ - Proceed with data redistribution from opFrom to opTo - """ - self.parent_comm.Barrier() - self.r_request = {} - self.s_request = {} - for v in self.variables: - dv = v.discreteFields[self.topology] - transfers = self.bridge.transfers - for d in self._range_components(v): - v_name = dv.name + S_DIR[d] - # Set reception - if self.proc_tasks[self._parent_rank] == self.id_to: - for from_rk in transfers.keys(): - self.r_request[v_name + str(from_rk)] = \ - self.bridge.inter_comm.Irecv( - [dv.data[d], 1, self._r_types[v][from_rk]], - source=from_rk, tag=from_rk) - # Set Sending - if self.proc_tasks[self._parent_rank] == self.id_from: - for to_rk in transfers.keys(): - self.s_request[v_name + str(to_rk)] = \ - self.bridge.inter_comm.Issend( - [dv.data[d], 1, self._r_types[v][to_rk]], - dest=to_rk, tag=self._my_rank) - - def _toHost(self): - """ - Proceed with data transfer of variables from device to host - """ - if __VERBOSE__: - print ("{0} APPLY toHOST".format(self._parent_rank)) - for v in self.variables: - dv = self.opFrom.discreteFields[v] - if dv in self._toHost_fields: - dv.toHost(self.component) - - def _toDevice(self): - """ - Proceed with data transfer of variables from device to host - """ - if __VERBOSE__: - print ("{0} APPLY toDEVICE".format(self._parent_rank)) - for v in self.variables: - dv = self.opTo.discreteFields[v] - if dv in self._toDevice_fields: - dv.toDevice(self.component) - - def _wait_device(self): - if __VERBOSE__: - print ("{0} WAIT OPENCL".format(self._parent_rank)) - for dv in self._toDevice_fields + self._toHost_fields: - dv.wait() - - def _wait_host(self, simulation=None): - """Wait for requests completion.""" - if __VERBOSE__: - print ("{0} WAIT MPI".format(self._parent_rank)) - for rk in self.r_request: - self.r_request[rk].Wait() - for rk in self.s_request: - self.s_request[rk].Wait() - self.parent_comm.Barrier() - self.r_request = [] - self.s_request = [] - - def test(self, rsend=None, rrecv=None): - """ - if neither rsend or rrecv is given return - True if all communication request are complete - else check for sending to rsend or - receiving from rrecv. Process ranks - should be given in local communicator. 
- @param rsend : variable name + S_DIR + rank - @param rrecv : variable name + S_DIR + rank - """ - if(rsend is not None or rrecv is not None): - send_res = True - recv_res = True - if rsend is not None: - send_res = self.s_request[rsend].Test() - if rrecv is not None: - recv_res = self.r_request[rrecv].Test() - res = send_res and recv_res - else: - res = True - for rk in self.r_request.keys(): - res = self.r_request[rk].Test() - if not res: - return res - for rk in self.s_request.keys(): - res = self.s_request[rk].Test() - if not res: - return res - return res diff --git a/HySoP/hysop/operator/redistribute_inter.py b/HySoP/hysop/operator/redistribute_inter.py deleted file mode 100644 index 58fc60a5a..000000000 --- a/HySoP/hysop/operator/redistribute_inter.py +++ /dev/null @@ -1,268 +0,0 @@ -""" -@file redistribute_intercomm.py -Setup for data transfer/redistribution between a single parmes topology based -on different MPI communicators with null intersection (for example -by Comm_Split). One of the topology is labeled as the source and the other is -the destination. - -It relies on a Bridge_intercomm. -""" -from parmepy.constants import debug, S_DIR -from parmepy import __VERBOSE__ -from parmepy.mpi.newBridge import InterBridge -from parmepy.operator.redistribute import Redistribute -from parmepy.operator.continuous import Operator - - -class RedistributeInter(Redistribute): - """ - Interconnection between two topologies on different sub set of MPI process. - SetUp will compute a Bridge_intercomm between a single topology. - Transfers data from topology of id_from to the id_to. - """ - @debug - def __init__(self, proc_tasks, sourceId=None, targetId=None, **kwds): - """ - Create an operator to distribute data between two mpi topologies for a - list of variables. - - @param proc_tasks: python array specifying the task id of each of - the parent_comm MPI intracommunicator. - @remark : proc_tasks size and number of processus in parent_comm - must be equal. - """ - super(RedistributeInter, self).__init__(**kwds) - # Change task_id of the current operator: -1 means - # it belongs to several tasks. - self.task_id = -1 - assert 'parent_comm' in kwds, 'A parent communicor must be set.' - assert self._parent_comm.Get_size() == len(proc_tasks), \ - "Parent communicator ({0})".format(self._parent_comm.Get_size()) +\ - " and size of the task id array " + \ - "({0}) are not equal".format(len(proc_tasks)) - ## connectivity between tasks and procs - self.proc_tasks = proc_tasks - self._my_rank = None - self._sourceId = sourceId - self._targetId = targetId - msg = 'You must provide both source and target arguments.' - if 'source' in kwds: - assert 'target' in kwds, msg - if 'target' in kwds: - assert 'source' in kwds, msg - - # If ids can not be deduced from source/target : - if not isinstance(self._source, Operator): - msg = 'You must set sourceId and targetId in arguments.' - assert sourceId is not None, msg - assert targetId is not None, msg - - else: - msg = 'source/target ids arg are useless when ' - msg += 'sourceOp, targetOp are used.' 
- assert 'sourceId' not in kwds, msg - assert 'targetId' not in kwds, msg - self._sourceId = self._source.task_id - self._targetId = self._target.task_id - - def _vars_setup_fromdict(self): - """ - set vSource/vTarget dictionnaries and create bridges - @param varsDict: {v: (topoSource, topoTarget)_v, - w: (topoSource, topoTarget)_w, - ...} - - Warning : this routine can not be called during init, topologies may - not exist if the operator does not belong to the current - MPI task. - """ - msg = 'Too many arguments in Operator init/setup.' - msg += 'You must choose either variables = "list + source + target"' - msg += ' or variables = "dictionnary".' - assert self._source is None and self._target is None, msg - for v in self.variables: - topoFrom = self.variables[v][0] - topoTo = self.variables[v][1] - self.bridges[v] = InterBridge(topoFrom, topoTo, - self._sourceId, self._targetId, - proc_tasks=self.proc_tasks, - parent_comm=self._parent_comm) - - def _vars_setup_fromlist(self): - """ - set vSource/vTarget dictionnaries, when all source/target variables - have the same topology. - @param source_target : a tuple of topologies (1) or operators (2) - case 1 : - Redistribute(variables=[v, w, ...], source=topo1, target=topo2, ...) - case 2 : - Redistribute([v, w, ...], source=op1, target=op2, ...) - """ - from parmepy.operator.continuous import Operator - if not isinstance(self._source, Operator): - for v in self.variables: - self.variables[v] = (self._source, self._target) - self.bridges[v] = InterBridge(self._source, self._target, - self._sourceId, self._targetId, - self.proc_tasks, - self._parent_comm) - - ## elif isinstance(self._source, Operator): - ## for v in self.variables: - ## # check if variables belong to both operators - ## # And check if variables have enougth components. - ## self._checkOperators(v) - ## # Then get topologies and build the bridges - ## self.vSource[v] = self._source.discreteFields[v] - ## self.vTarget[v] = self._target.discreteFields[v] - ## topoSource = self.vSource[v].topology - ## topoTarget = self.vTarget[v].topology - ## self.variables[v] = (topoSource, topoTarget) - ## self.bridges[v] = InterBridge(topoSource, topoTarget) - else: - raise ValueError("Source/target type must be\ - either Cartesian or Operator.") - - @debug - def setup(self): - """ - Computes intersection of topologies and set the MPI intercommunicator. - """ - super(RedistributeInter, self).setup() - self._parent_rank = self._parent_comm.Get_rank() - - self._isSource = self.proc_tasks[self._parent_rank] == self._sourceId - self._isTarget = self.proc_tasks[self._parent_rank] == self._targetId - - self._the_apply = self._apply_host - - if self._isSource: - self._currentIndex = 0 - elif self._isTarget: - self._currentIndex = 1 - - for v in self.variables: - topology = self.variables[v][self._currentIndex] - dv = v.discreteFields[topology] - transfers = self.bridges[v].transfers - vShape = dv.data[0].shape - # Set derived types - self.bridges[v]._createSubArray(transfers, self._r_types[v], - vShape) - self._is_uptodate = True - - @debug - def apply(self, simulation=None): - """ - Apply this operator to its variables. - @param simulation : object that describes the simulation - parameters (time, time step, iteration number ...), see - parmepy.problem.simulation.Simulation for details. 
- """ - for req in self.requirements: - req.wait() - self._the_apply(simulation) - - def _apply_toHost_host_toDevice(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY toHOST+HOST+toDEVICE".format(self._parent_rank)) - if self.proc_tasks[self._parent_rank] == self.id_from: - self._toHost() - self._wait_device() - self._host() - self._wait_host() - if self.proc_tasks[self._parent_rank] == self.id_to: - self._toDevice() - self._wait_device() - - def _apply_toHost_host(self, simulation=None): - - if __VERBOSE__: - print ("{0} APPLY toHOST+HOST".format(self._parent_rank)) - if self.proc_tasks[self._parent_rank] == self.id_from: - self._toHost() - self._wait_device() - self._host() - self._wait_host() - - def _apply_host_toDevice(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY HOST+toDEVICE".format(self._parent_rank)) - self._host() - self._wait_host() - self._parent_comm.Barrier() - if self.proc_tasks[self._parent_rank] == self.id_to: - self._toDevice() - self._wait_device() - self._parent_comm.Barrier() - - def _apply_host(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY HOST".format(self._parent_rank)) - self._host() - self._wait_host() - - def _host(self, simulation=None): - """ - Proceed with data redistribution from opFrom to opTo - """ - self._parent_comm.Barrier() - self.r_request = {} - self.s_request = {} - - for v in self.variables: - topology = self.variables[v][self._currentIndex] - rank = topology.comm.Get_rank() - dv = v.discreteFields[topology] - transfers = self.bridges[v].transfers - for d in self._range_components(v): - v_name = dv.name + S_DIR[d] - # Set reception - if self._isSource: - for from_rk in transfers.keys(): - self.r_request[v_name + str(from_rk)] = \ - self.bridges[v].inter_comm.Irecv( - [dv.data[d], 1, self._r_types[v][from_rk]], - source=from_rk, tag=from_rk) - # Set Sending - if self._isTarget: - for to_rk in transfers.keys(): - self.s_request[v_name + str(to_rk)] = \ - self.bridges[v].inter_comm.Issend( - [dv.data[d], 1, self._r_types[v][to_rk]], - dest=to_rk, tag=rank) - - def _toHost(self): - """ - Proceed with data transfer of variables from device to host - """ - if __VERBOSE__: - print ("{0} APPLY toHOST".format(self._parent_rank)) - for v in self.variables: - dv = self.opFrom.discreteFields[v] - if dv in self._toHost_fields: - dv.toHost(self.component) - - def _toDevice(self): - """ - Proceed with data transfer of variables from device to host - """ - if __VERBOSE__: - print ("{0} APPLY toDEVICE".format(self._parent_rank)) - for v in self.variables: - dv = self.opTo.discreteFields[v] - if dv in self._toDevice_fields: - dv.toDevice(self.component) - - def _wait_host(self, simulation=None): - """Wait for requests completion.""" - if __VERBOSE__: - print ("{0} WAIT MPI".format(self._parent_rank)) - for rk in self.r_request: - self.r_request[rk].Wait() - for rk in self.s_request: - self.s_request[rk].Wait() - self._parent_comm.Barrier() - self.r_request = [] - self.s_request = [] - diff --git a/HySoP/hysop/operator/redistribute_intra.py b/HySoP/hysop/operator/redistribute_intra.py index 3da7c0f36..cdf20a7cc 100644 --- a/HySoP/hysop/operator/redistribute_intra.py +++ b/HySoP/hysop/operator/redistribute_intra.py @@ -1,238 +1,193 @@ """ @file redistribute_intra.py -Setup for data transfer/redistribution between two parmes topologies. - -This operator is an inter operator which is supposed to define the process to -transfer variables and data from one operator to another. 
-This mainly concerns data redistribution if the two operators work on -different mpi topologies. - -When is it required to define an operator between op1 and op2? -If: -- the intersection between op1.output-variables and op2.input-variables - is not empty -- AND if the topology on which the variables of op1 are defined is different -from the one of op2 variables. - -Note Franck: this kind of operator may also be useful -to define the interpolation/filter process for data transfer for -a variable defined on several meshes. - +Setup for data transfer/redistribution between two topologies +or operators inside the same mpi communicator. """ -from parmepy import __VERBOSE__ -from parmepy.constants import debug, PARMES_MPI_REAL, ORDERMPI, np, S_DIR -from parmepy.mpi.bridge import Bridge -from parmepy.methods_keys import Support from parmepy.operator.redistribute import Redistribute -from parmepy.operator.continuous import Tools -from parmepy.operator.continuous import Operator class RedistributeIntra(Redistribute): """ - Interconnection between two operators. - SetUp will compute (or get if it already exists) a Bridge between two - topologies. - Apply redistributes data from opFrom topology to opTo topology. - + Data transfer between two operators/topologies. + Source and target must: + - be defined on the same communicator + - work on the same number of mpi process + - work with the same global resolution """ - @debug def __init__(self, **kwds): - """ - Create an operator to distribute data between two mpi topologies for a - list of variables belonging to two operators. - - @param variables : the set of variables to be redistributed - @param opFrom : source operator - @param opTo : target (i.e.) the operator that handles the topology on - which data must be redistributed. - @param component: components of vector fields to consider (default: - None, all components are taken). + @param """ + # Base class initialisation super(RedistributeIntra, self).__init__(**kwds) - self._hasRequests = False - - # Enable desynchronization: the opTo operator must call the wait - # function of this redistribute. This operator has to know self. - ##self.opTo.addRedistributeRequirement(self) - - @debug - def setup(self): - """ - Computes intersection of two topologies. - - """ - - msg = 'Source and target objects must be of the same type, ' - msg += 'either topology or operator.' 
- assert self._source.__class__ == self._target.__class__, msg - super(RedistributeIntra, self).setup() - - for v in self.variables: - val = self.variables[v] - assert val[0].task_id() == val[1].task_id() - self.task_id = val[0].task_id() - - if isinstance(self._source, Operator): - source_isGPU = Tools.checkDevice(self._source) - target_isGPU = Tools.checkDevice(self._target) - else: - source_isGPU = False - target_isGPU = False - - if not source_isGPU and not target_isGPU: - # case: opFrom(host) --host--> opTo(host) - self._the_apply = self._host - self.wait = self._wait_host - else: - # Have on device operators - self.wait = self._wait_all - if source_isGPU and not target_isGPU: - # case: opFrom(GPU) --toHost--host--> opTo(host) - self._the_apply = self._apply_toHost_host - elif not source_isGPU and target_isGPU: - # case: opFrom(host) --host--toDevice--> opTo(GPU) - self._the_apply = self._apply_host_toDevice - else: - # case: opFrom(GPU) --toHost--host--toDevice--> opTo(host) - # Transfers are removed if variables are batched - if np.any([self.vSource[v].isBatch for v in self.variables] + - [self.vTarget[v].isBatch for v in self.variables]): - self._the_apply = self._host - else: - self._the_apply = self._apply_toHost_host_toDevice - - for v in self.variables: - # toTransfer list completion - if source_isGPU: - self._toHost_fields.append(self.opFrom.discreteFields[v]) - if target_isGPU: - self._toDevice_fields.append(self.opTo.discreteFields[v]) - - if self._parent_comm is None: - self._parent_comm = self.variables.values()[0][0].parent() - self._parent_rank = self._parent_comm.Get_rank() - - # Flag telling if there will be some mpi data transfers. - self._useless_transfer = {} - for v in self.variables: - self._useless_transfer[v] = (source_isGPU and target_isGPU) and \ - self.bridges[v].uselessTransfer() - - br = self.bridges[v] - vToShape = self.vTarget[v].data[0].shape - vFromShape = self.vSource[v].data[0].shape - br.createRecvSubArray(self._r_types[v], vToShape) - br.createSendSubArray(self._s_types[v], vFromShape) - - def _apply_toHost_host_toDevice(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY toHOST+HOST+toDEVICE".format(self._parent_rank)) - self._toHost() - self._wait_device() - self._host() - self._wait_host() - self._toDevice() - - def _apply_toHost_host(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY toHOST+HOST".format(self._parent_rank)) - self._toHost() - self._wait_device() - self._host() - - def _apply_host_toDevice(self, simulation=None): - if __VERBOSE__: - print ("{0} APPLY HOST+toDEVICE".format(self._parent_rank)) - self._host() - self._wait_host() - self._toDevice() - - def _toHost(self): + # Warning : comm from io_params will be used as + # reference for all mpi communication of this operator. + # --> rank computed in refcomm + # --> source and target must work inside refcomm + # If io_params is None, refcomm will COMM_WORLD. 
+ + # Dictionnary of discrete fields to be sent + self._vsource = {} + # Dictionnary of discrete fields to be overwritten + self._vtarget = {} + + # dictionnary which maps rank with mpi derived type + # for send operations + self._send = {} + # dictionnay which maps rank with mpi derived type + # for send operations + self._receive = {} + # dictionnary which map rank/field name with a + # receive request + self._r_request = None + # dictionnary which map rank/field name with a + # send request + self._s_request = None + + def setup(self, rwork=None, iwork=None): """ - Proceed with data transfer of variables from device to host + Build bridge ... """ + # At setup, source and topo must be either + # a parmepy.mpi.topology.Cartesian or + # a computational operator. + + msg = 'Redistribute error : undefined source of target.' + assert self._source is not None and self._target is not None, msg + + from parmepy.mpi.topology import Cartesian + source_is_topo = isinstance(self._source, Cartesian) + target_is_topo = isinstance(self._target, Cartesian) + + # Check input operators + if not source_is_topo: + self._check_operator(self._source) + + if not target_is_topo: + self._check_operator(self._target) + # target operator must wait for + # the end of this operator to apply. + self._run_till.append(self._target) + + t_source, t_target = self._set_variables_and_topos(source_is_topo, + target_is_topo) + + source_res = t_source.mesh.discretization.resolution + target_res = t_target.mesh.discretization.resolution + msg = 'Redistribute error: source and target must ' + msg += 'have the same global resolution.' + assert (source_res == target_res).all(), msg + + # Set the dictionnaries of source/target variables + self._vsource = {v: v.discretize(t_source) + for v in self.variables} + self._vtarget = {v: v.discretize(t_target) + for v in self.variables} + + # We can create the bridge + self.bridge = Bridge(t_source, t_target) + + # Add this operator into wait list of + # operators listed in run_till + for op in self._run_till: + op.addRedistributeRequirement(self) + + # Shape of reference is the shape of source/target mesh + vref = self.variables[0] + shape = self._vsource[vref].topology.mesh.resolution + self._send = self.bridge.sendTypes(shape) + shape = self._vtarget[vref].topology.mesh.resolution + self._receive = self.bridge.recvTypes(shape) + + self._is_uptodate = True + + def apply(self, simulation=None): + # Try different way to send vars? + # - Buffered : copy all data into a buffer and send/recv + # - Standard : one send/recv per component + + # --- Standard send/recv --- + br = self.bridge + + # reset send/recv requests + self._r_request = {} + self._s_request = {} + + basetag = self._mpis.rank + 1 + # Comm used for send/receive operations + # It must contains all proc. of source topo and + # target topo. 
+ refcomm = self.bridge.comm + # Loop over all required components of each variable for v in self.variables: - dv = self.vSource[v] - if dv in self._toHost_fields: - if not self._useless_transfer[v]: - dv.toHost(self.component) - - def _toDevice(self): - """ - Proceed with data transfer of variables from device to host - """ - for v in self.variables: - dv = self.vTarget[v] - if dv in self._toDevice_fields: - if not self._useless_transfer[v]: - dv.toDevice(self.component) - - def _host(self, simulation=None): - """ - Proceed with data redistribution from opFrom to opTo - """ - # TODO : - # - save a set of bridges in the domain and access them from operator - # - process all variables in one shot if they have the same topo - # (use buffers for mpi send/recv? ) - # - move MPI datatypes into the bridge? --> and free MPI type properly - if __VERBOSE__: - print ("{0} APPLY HOST".format(self._parent_rank)) - self.r_request = {} - self.s_request = {} - for v in self.variables: - br = self.bridges[v] - # Apply for each component considered for d in self._range_components(v): - if __VERBOSE__: - print ("{0} APPLY HOST".format(self._parent_rank), - self.vSource[v].name, '->', - self.vTarget[v].name, S_DIR[d]) - vTo = self.vTarget[v].data[d] - vFrom = self.vSource[v].data[d] - v_name = self.vSource[v].name + S_DIR[d] - if br.hasLocalInter: - vTo[br.ito] = vFrom[br.ifrom] - cRk = self._parent_comm.Get_rank() - for rk in br.recvFrom: - recvtag = (cRk + 1) * 989 + (rk + 1) * 99 + (d + 1) * 88 - self.r_request[v_name + str(rk)] = \ - self._parent_comm.Irecv([vTo, 1, self._r_types[v][rk]], - source=rk, tag=recvtag) - self._hasRequests = True - for rk in br.sendTo: - sendtag = (rk + 1) * 989 + (cRk + 1) * 99 + (d + 1) * 88 - self.s_request[v_name + str(rk)] = \ - self._parent_comm.Issend([vFrom, 1, - self._s_types[v][rk]], - dest=rk, tag=sendtag) - self._hasRequests = True - - def _wait_host(self): + v_name = v.name + S_DIR[d] + + # Deal with local copies of data + if br.hasLocalInter(): + vTo = self._vtarget[v].data[d] + vFrom = self._vsource[v].data[d] + vTo[br.localTargetInd()] = vFrom[br.localSourceInd()] + + # Transfers to other mpi processes + for rk in self._receive: + recvtag = basetag * 989 + (rk + 1) * 99 + (d + 1) * 88 + mpi_type = self._receive[rk] + vTo = self._vtarget[v].data[d] + self._r_request[v_name + str(rk)] = \ + refcomm.Irecv([vTo, 1, mpi_type], + source=rk, tag=recvtag) + self._has_requests = True + for rk in self._send: + sendtag = (rk + 1) * 989 + basetag * 99 + (d + 1) * 88 + mpi_type = self._send[rk] + vFrom = self._vsource[v].data[d] + self._s_request[v_name + str(rk)] = \ + refcomm.Issend([vFrom, 1, mpi_type], + dest=rk, tag=sendtag) + self._has_requests = True + + def wait(self): + if self._has_requests: + for rk in self._r_request: + self._r_request[rk].Wait() + for rk in self._s_request: + self._s_request[rk].Wait() + self._has_requests = False + + def test(self, rsend=None, rrecv=None): """ - MPI Barrier to wait for the end - of all communication requests. + if neither rsend or rrecv is given return + True if all communication request are complete + else check for sending to rsend or + receiving from rrecv. Process ranks + should be given in parent_comm. + @param rsend : discrete variable name + S_DIR + rank of the process + to which a message has been sent + and for which we want to test + message completion. + @param rrecv : discrete variable name + S_DIR + rank of the process + from which a message has been receive + and for which we want to test + message completion. 
""" - if __VERBOSE__: - print ("{0}", "WAIT MPI".format(self._parent_rank), - self._hasRequests) - if self._hasRequests: - for rk in self.r_request.keys(): - self.r_request[rk].Wait() - for rk in self.s_request.keys(): - self.s_request[rk].Wait() - self._hasRequests = False - - def _wait_all(self): - self._wait_host() - self._wait_device() - - def addRedistributeRequirement(self, red): - raise ValueError( - "Cannot add a requirement to a Redistribute_intra operator.") - - def getRedistributeRequirement(self): - return [] + if rsend is not None or rrecv is not None: + send_res = True + recv_res = True + if rsend is not None: + send_res = self._s_request[rsend].Test() + if rrecv is not None: + recv_res = self._r_request[rrecv].Test() + res = send_res and recv_res + else: + res = True + for rk in self._r_request.keys(): + res = self._r_request[rk].Test() + if not res: + return res + for rk in self._s_request.keys(): + res = self._s_request[rk].Test() + if not res: + return res + return res diff --git a/HySoP/hysop/operator/redistribute_new.py b/HySoP/hysop/operator/redistribute_new.py new file mode 100644 index 000000000..fe439755d --- /dev/null +++ b/HySoP/hysop/operator/redistribute_new.py @@ -0,0 +1,182 @@ +""" +@file redistribute_intra.py +Setup for data transfer/redistribution between two topologies. +""" + +from parmepy.operator.continuous import Operator +from parmepy.mpi.bridge_last import Bridge +from abc import ABCMeta, abstractmethod +from parmepy.constants import S_DIR + + +class Redistribute(Operator): + """ + Bare interface to redistribute operators + """ + + __metaclass__ = ABCMeta + + def __init__(self, source, target, component=None, + run_till=None, **kwds): + """ + + """ + # Base class initialisation + super(Redistribute, self).__init__(**kwds) + + # Object (may be an operator or a topology) which handles the + # fields to be transfered + self._source = source + # Object (may an operator or a topology) which handles the fields + # to be filled in from source. + self._target = target + + self.component = component + if component is None: + # All components are considered + self._range_components = lambda v: xrange(v.nbComponents) + else: + # Only the given component is considered + assert self.component >= 0, 'component value must be positive.' + self._range_components = lambda v: (self.component) + + ## Bridge between topology of source and topology of target + self.bridge = None + # True if some MPI operations are running for the current operator. + self._has_requests = False + # Which operator must wait for this one before + # any computation + # Exp : run_till = op1 means that op1 will + # wait for the end of this operator before + # op1 starts its apply. + if run_till is None: + run_till = [] + + assert isinstance(run_till, list) + self._run_till = run_till + + @abstractmethod + def setup(self, rwork=None, iwork=None): + """ + Check/set the list of variables to be distributed + + What must be set at setup? + ---> the list of continuous variables to be distributed + ---> the bridge (one for all variables, which means + that all vars must have the same topology in source + and the same topology in target. + ---> the list of discrete variables for source and + for target. 
+ """ + + def _check_operator(self, op): + """ + @param op : a computational operator + - check if op is really a computational operator + - discretize op + - check if all required variables (if any) belong to op + """ + from parmepy.operator.computational import Computational + assert isinstance(op, Computational) + op.discretize() + msg = 'The variables to be distributed ' + msg += 'do not belong to the input operator.' + if len(self.variables) > 0: + assert all(v in op.variables for v in self.variables), msg + + def _set_variables_and_topos(self, source_is_topo, target_is_topo): + """ + @param source_is_topo : true if source is a topo + @param target_is_topo : true if target is a topo + Internal function to set the list + of variables to be distributed + and to set source/target topologies + """ + # If variables are not set at init, + # they must be infered from source/target operators. + has_var = len(self.variables) > 0 + vlist = (v for v in self.variables) + + # both source and target are topologies. Variables required. + if source_is_topo and target_is_topo: + msg = 'Redistribute, a list of variables is required at init.' + assert has_var, msg + + self.variables = [v for v in vlist] + # Just connect source/target topo to build bridge + # and discrete fields dictionnaries. + topo_source = self._source + topo_target = self._target + + elif not source_is_topo and not target_is_topo: + # both source and target are operators + # --> intersection of their variables + vsource = self._source.variables + vtarget = self._target.variables + if not has_var: + vlist = (v for v in vsource if v in vtarget) + + self.variables = [v for v in vlist] + + # A reference variable, used to set topo source/target + vref = self.variables[0] + topo_source = vsource[vref] + topo_target = vtarget[vref] + # We ensure that all vars have + # the same topo in source/target. + for v in (v for v in self.variables if v is not vref): + assert vsource[v] is topo_source + assert vtarget[v] is topo_target + + elif source_is_topo: + # source = topo, target = operator + vtarget = self._target.variables + if not has_var: + vlist = (v for v in vtarget) + self.variables = [v for v in vlist] + + # A reference variable, used to set topo source/target + vref = self.variables[0] + topo_source = self._source + topo_target = self._target.variables[vref] + # We ensure that all vars have + # the same topo in source/target. + for v in (v for v in self.variables if v is not vref): + assert vtarget[v] is topo_target + + else: + # source = operator, target = topo + vsource = self._source.variables + if not has_var: + vlist = (v for v in vsource) + self.variables = [v for v in vlist] + + # A reference variable, used to set topo source/target + vref = self.variables[0] + topo_source = self._source.variables[vref] + topo_target = self._target + # We ensure that all vars have + # the same topo in source operator + for v in (v for v in self.variables if v is not vref): + assert vsource[v] is topo_source + + return topo_source, topo_target + + @abstractmethod + def apply(self, simulation=None): + """ + distribute data ... + """ + + @abstractmethod + def wait(self): + """ + Wait for remaing MPI requests of this operator. + """ + + def finalize(self): + # wait for all remaining comm + self.wait() + + def printComputeTime(self): + pass -- GitLab
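For reference, a minimal usage sketch of the redistribute API introduced by this patch. It relies only on the constructor arguments (source, target, variables, component, run_till) and the setup/apply/wait sequence defined in redistribute_new.py and redistribute_intra.py; the field and topology names (velocity, topo_fft, topo_advec) and the surrounding problem setup are placeholders, not part of the patch.

    from parmepy.operator.redistribute_intra import RedistributeIntra

    # Hypothetical objects, assumed to be built beforehand:
    #   velocity             : a parmepy continuous field defined on the domain
    #   topo_fft, topo_advec : two Cartesian topologies on the same communicator,
    #                          same number of processes, same global resolution
    redis = RedistributeIntra(source=topo_fft, target=topo_advec,
                              variables=[velocity])

    redis.setup()   # builds the Bridge and the MPI derived types (send/recv subarrays)
    redis.apply()   # local copies plus non-blocking Irecv/Issend for each component
    redis.wait()    # blocks until all pending requests of this operator complete

When source and/or target are computational operators instead of topologies, the variable list may be omitted and is inferred from the operators (intersection of their variables), and the target operator is appended to run_till so that it waits for this redistribution before its own apply.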