From 587c750b7be05a90be6eaac73f136431a874238a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franck=20P=C3=A9rignon?= <franck.perignon@imag.fr> Date: Mon, 23 May 2016 14:00:59 +0200 Subject: [PATCH] Fix tests, pep8 things (renaming ...). Push for CI, see gitlab for tests. --- hysop/mpi/bridge.py | 323 ++++++++++++-- hysop/mpi/bridge_inter.py | 135 ------ hysop/mpi/bridge_overlap.py | 115 ----- hysop/mpi/tests/test_bridge.py | 11 +- hysop/operator/redistribute.py | 504 +++++++++++++++++++++- hysop/operator/redistribute_inter.py | 183 -------- hysop/operator/redistribute_intra.py | 211 --------- hysop/operator/redistribute_overlap.py | 61 --- hysop/operator/tests/test_redistribute.py | 5 +- hysop/problem/problem.py | 173 ++++---- hysop/problem/problem_tasks.py | 3 +- hysop/problem/tests/test_problem.py | 0 hysop/problem/tests/test_simulation.py | 115 ++++- hysop/problem/tests/test_transport.py | 5 +- hysop/problem/transport.py | 9 +- hysop/tools/problem2dot.py | 3 +- hysop/tools/tests/test_profiler.py | 17 +- 17 files changed, 995 insertions(+), 878 deletions(-) delete mode 100644 hysop/mpi/bridge_inter.py delete mode 100644 hysop/mpi/bridge_overlap.py delete mode 100644 hysop/operator/redistribute_inter.py delete mode 100644 hysop/operator/redistribute_intra.py delete mode 100644 hysop/operator/redistribute_overlap.py delete mode 100644 hysop/problem/tests/test_problem.py diff --git a/hysop/mpi/bridge.py b/hysop/mpi/bridge.py index f07f590e5..e5d799771 100644 --- a/hysop/mpi/bridge.py +++ b/hysop/mpi/bridge.py @@ -1,10 +1,19 @@ -""" -@file bridge.py -Tools to compute the intersection between -two HySoP topologies. +"""Tools to compute the intersection between two topologies. + +`.. currentmodule : hysop.mpi.bridge + +* :class:`~Bridge` for topologies/operators defined + inside the same mpi communicator +* :class:`~BridgeInter` +* :class:`~BridgeOverlap` for topologies defined + inside the same mpi parent communicator and + with a different number of processes + """ from hysop.mpi.topology import Cartesian, TopoTools from hysop.tools.misc import Utils +from hysop.mpi import MPI +import hysop.tools.numpywrappers as npw class Bridge(object): @@ -13,9 +22,13 @@ class Bridge(object): """ def __init__(self, source, target): - """ - @param source : topology that owns the source mesh - @param target : topology of the targeted mesh + """Intersection between two topologies. + See users' manual for details + + Parameters + ---------- + source, target : :class:`~hysop.mpi.topology.Cartesian` + topologies that own the source mesh and targeted mesh """ # -- All dictionnaries belows used rank number (in parent comm) # as keys. -- @@ -43,7 +56,7 @@ class Bridge(object): self._build_send_recv_dict() def _check_topologies(self): - # First check if source and target are complient + """Check if source/target topologies exists and are complient""" msg = 'Bridge error, one or both topologies are None.' msg = 'Bridge error, input source/target must be topologies.' assert isinstance(self._source, Cartesian), msg @@ -59,9 +72,9 @@ class Bridge(object): self._rank = self.comm.Get_rank() def _build_send_recv_dict(self): - # Compute local intersections : i.e. find which grid points - # are on both source and target mesh. - + """Compute local (mpi) intersection of two topologies + i.e. find which grid points are on both source and target mesh. + """ # Get global indices of the mesh on source for all mpi processes. indices_source = TopoTools.gather_global_indices(self._source) @@ -104,27 +117,32 @@ class Bridge(object): self._recv_indices = {rk: convert(self._recv_indices[rk]) for rk in self._recv_indices} - def hasLocalInter(self): + def has_local_inter(self): + """True if local mesh points are also present on remote mesh + """ return self._rank in self._send_indices - def localSourceInd(self): + def local_source_ind(self): + """indices of points (in the local mesh) that + also belong to remote mesh + """ if self._rank in self._send_indices: return self._send_indices[self._rank] else: return {} - def localTargetInd(self): + def local_target_ind(self): + """indices of points (in the remote mesh) + that also belong to the local mesh + """ if self._rank in self._recv_indices: return self._recv_indices[self._rank] else: return {} - def recvTypes(self): - """ - Return the dictionnary of MPI derived types + def recv_types(self): + """Returns the dictionnary of MPI derived types received on targeted topology. - @param data_shape : shape (numpy-like) of the original array - @return : a dict of MPI types """ if self._recv_types is None: data_shape = self._target.mesh.resolution @@ -132,15 +150,270 @@ class Bridge(object): data_shape) return self._recv_types - def sendTypes(self): - """ - Return the dictionnary of MPI derived types - send from source topology. - @param data_shape : shape (numpy-like) of the original array - @return : a dict of MPI types + def send_types(self): + """Returns the dictionnary of MPI derived types sent by source topology. """ if self._send_types is None: data_shape = self._source.mesh.resolution self._send_types = TopoTools.create_subarray(self._send_indices, data_shape) return self._send_types + + +class BridgeInter(object): + """Intersection between two topologies defined + on two different mpi communicators. + """ + + def __init__(self, current, parent, source_id, target_id): + """Intersection between two topologies defined + on different mpi communicators (i.e. implies mpi intercomm) + See users' manual for details + + Parameters + ---------- + current : :class:`~hysop.mpi.topology.Cartesian` + parent : MPI.COMM + mpi communicator that must owns all the + processes involved in source and target. + source_id, target_id : int + mpi task ids for the source/target. + Required if source/target is None + else infered from source/target. + """ + + # The aim of a bridge if to compute the intersection of mesh grids + # on source topology with those on target topology, to be able to tell + # who must send/recv what to which process. + # This is done in steps: + # - the indices of grid points of each process are gathered + # onto the root process, for both source and target --> global_indices. + # We compute global indices (i.e. relative to the global grid) + # - an intercommunicator is used to broadcast these indices + # from source to the processes of target. + + # source task number + self.source_id = source_id + + # target task number + self.target_id = target_id + + assert isinstance(current, Cartesian) + assert isinstance(parent, MPI.Intracomm) + self._topology = current + # current task id + current_task = self._topology.domain.current_task() + + # True if current process is in the 'from' group' + task_is_source = current_task == self.source_id + + # True if current process is in the 'to' group + task_is_target = current_task == self.target_id + + # Ensure that current process belongs to one and only one task. + assert task_is_source or task_is_target + assert not(task_is_source and task_is_target) + + # Create an intercommunicator + # Create_intercomm attributes are: + # - local rank of leader process for current group (always 0) + # - parent communicator + # - rank of leader process in the remote group + + # rank of the first proc belonging to the remote task + # (used as remote leader) + proc_tasks = self._topology.domain.tasks_list() + if task_is_source: + remote_leader = proc_tasks.index(self.target_id) + elif task_is_target: + remote_leader = proc_tasks.index(self.source_id) + self.comm = self._topology.comm.Create_intercomm(0, parent, + remote_leader) + + current_indices, remote_indices = self._swap_indices() + + self._tranfer_indices = {} + current = current_indices[self._topology.rank] + for rk in remote_indices: + inter = Utils.intersl(current, remote_indices[rk]) + if inter is not None: + self._tranfer_indices[rk] = inter + + # Back to local indices + convert = self._topology.mesh.convert2local + self._tranfer_indices = {rk: convert(self._tranfer_indices[rk]) + for rk in self._tranfer_indices} + + self._transfer_types = None + + def _swap_indices(self): + """collect current/remote indices + """ + # First, we need to collect the global indices, as arrays + # since we need to broadcast them later. + current_indices = TopoTools.gather_global_indices(self._topology, + toslice=False) + # To allocate remote_indices array, we need the size of + # the remote communicator. + remote_size = self.comm.Get_remote_size() + dimension = self._topology.domain.dimension + remote_indices = npw.dim_zeros((dimension * 2, remote_size)) + # Then they are broadcasted to the remote communicator + rank = self._topology.rank + current_task = self._topology.domain.current_task() + if current_task is self.source_id: + # Local 0 broadcast current_indices to remote comm + if rank == 0: + self.comm.bcast(current_indices, root=MPI.ROOT) + else: + self.comm.bcast(current_indices, root=MPI.PROC_NULL) + # Get remote indices from remote comm + remote_indices = self.comm.bcast(remote_indices, root=0) + + elif current_task is self.target_id: + # Get remote indices from remote comm + remote_indices = self.comm.bcast(remote_indices, root=0) + # Local 0 broadcast current_indices to remote comm + if rank == 0: + self.comm.bcast(current_indices, root=MPI.ROOT) + else: + self.comm.bcast(current_indices, root=MPI.PROC_NULL) + + # Convert numpy arrays to dict of slices ... + current_indices = Utils.array_to_dict(current_indices) + remote_indices = Utils.array_to_dict(remote_indices) + + return current_indices, remote_indices + + def transfer_types(self): + """Return the dictionnary of MPI derived types + used for send (if on source) or receive (if on target) + """ + if self._transfer_types is None: + data_shape = self._topology.mesh.resolution + self._transfer_types = TopoTools.create_subarray( + self._tranfer_indices, data_shape) + return self._transfer_types + + +class BridgeOverlap(Bridge): + """ + Bridge between two topologies that: + - have a different number of mpi processes + - have common mpi processes + + i.e. something in between a standard bridge with intra-comm and + a bridge dealing with intercommunication. This is probably + a very pathologic case ... + + The main difference with a standard bridge is that + this one may be call on processes where either source + or target does not exist. + """ + def __init__(self, comm_ref=None, **kwds): + """Bridge between two topologies that: + * have a different number of mpi processes + * have common mpi processes + + Parameters + ---------- + comm_ref : MPI.COMM + mpi communicator used for all global communications. + It must include all processes of source and target. + If None, source.parent() is used. + + Notes + ----- + this is something in between a standard bridge with intra-comm and + a bridge dealing with intercommunication. This is probably + a very pathologic case ... + + The main difference with a standard bridge is that + this one may be call on processes where either source + or target does not exist. + + """ + self.comm = comm_ref + self.domain = None + super(BridgeOverlap, self).__init__(**kwds) + + def _check_topologies(self): + # First check if source and target are complient + if self.comm is None: + if self._source is not None: + self.comm = self._source.parent() + else: + self.comm = self._target.parent() + + # To build a bridge, all process in source/target must be in self.comm + # and there must be an overlap between source + # and target processes group. If not, turn to intercommunicator. + if self._source is not None and self._target is not None: + msg = 'BridgeOverlap error: mpi group from ' + msg += 'source and topo must overlap. If not ' + msg += 'BridgeInter will probably suits better.' + assert TopoTools.intersection_size(self._source.comm, + self._target.comm) > 0, msg + assert self._source.domain == self._target.domain + + if self._source is not None: + assert isinstance(self._source, Cartesian) + s_size = self._source.size + assert TopoTools.intersection_size(self._source.comm, + self.comm) == s_size + self.domain = self._source.domain + + if self._target is not None: + assert isinstance(self._target, Cartesian) + t_size = self._target.size + assert TopoTools.intersection_size(self._target.comm, + self.comm) == t_size + self.domain = self._target.domain + + self._rank = self.comm.Get_rank() + + def _build_send_recv_dict(self): + # Compute local intersections : i.e. find which grid points + # are on both source and target mesh. + indices_source = TopoTools.gather_global_indices_overlap(self._source, + self.comm, + self.domain) + indices_target = TopoTools.gather_global_indices_overlap(self._target, + self.comm, + self.domain) + + # From now on, we have indices_source[rk] = global indices (slice) + # of grid points of the source on process number rk in parent. + # And the same thing for indices_target. + dimension = self.domain.dimension + # Compute the intersections of the mesh on source with every mesh on + # target (i.e. for each mpi process). + if self._rank in indices_source: + current = indices_source[self._rank] + else: + current = [slice(None, None, None), ] * dimension + + for rk in indices_target: + inter = Utils.intersl(current, indices_target[rk]) + if inter is not None: + self._send_indices[rk] = inter + + if self._source is not None: + # Back to local indices + convert = self._source.mesh.convert2local + self._send_indices = {rk: convert(self._send_indices[rk]) + for rk in self._send_indices} + + if self._rank in indices_source: + current = indices_target[self._rank] + else: + current = [slice(None, None, None), ] * dimension + for rk in indices_source: + inter = Utils.intersl(current, indices_source[rk]) + if inter is not None: + self._recv_indices[rk] = inter + + if self._target is not None: + convert = self._target.mesh.convert2local + self._recv_indices = {rk: convert(self._recv_indices[rk]) + for rk in self._recv_indices} diff --git a/hysop/mpi/bridge_inter.py b/hysop/mpi/bridge_inter.py deleted file mode 100644 index 85f37b8a2..000000000 --- a/hysop/mpi/bridge_inter.py +++ /dev/null @@ -1,135 +0,0 @@ -""" -@file bridge.py -Tools to compute the intersection between -two HySoP topologies. -""" -from hysop.mpi.topology import Cartesian, TopoTools -from hysop.tools.misc import Utils -from hysop.mpi import MPI -import hysop.tools.numpywrappers as npw - - -class BridgeInter(object): - """ - todo - """ - - def __init__(self, current, parent, source_id, target_id): - """ - @param source : topology that owns the source mesh - @param target : topology of the targeted mesh - """ - - # The aim of a bridge if to compute the intersection of mesh grids - # on source topology with those on target topology, to be able to tell - # who must send/recv what to which process. - # This is done in steps: - # - the indices of grid points of each process are gathered - # onto the root process, for both source and target --> global_indices. - # We compute global indices (i.e. relative to the global grid) - # - an intercommunicator is used to broadcast these indices - # from source to the processes of target. - - ## source task number - self.source_id = source_id - - ## target task number - self.target_id = target_id - - assert isinstance(current, Cartesian) - assert isinstance(parent, MPI.Intracomm) - self._topology = current - # current task id - current_task = self._topology.domain.current_task() - - # True if current process is in the 'from' group' - task_is_source = current_task == self.source_id - - # True if current process is in the 'to' group - task_is_target = current_task == self.target_id - - # Ensure that current process belongs to one and only one task. - assert task_is_source or task_is_target - assert not(task_is_source and task_is_target) - - # Create an intercommunicator - # Create_intercomm attributes are: - # - local rank of leader process for current group (always 0) - # - parent communicator - # - rank of leader process in the remote group - - # rank of the first proc belonging to the remote task - # (used as remote leader) - proc_tasks = self._topology.domain.tasks_list() - if task_is_source: - remote_leader = proc_tasks.index(self.target_id) - elif task_is_target: - remote_leader = proc_tasks.index(self.source_id) - self.comm = self._topology.comm.Create_intercomm(0, parent, - remote_leader) - - current_indices, remote_indices = self._swap_indices() - - self._tranfer_indices = {} - current = current_indices[self._topology.rank] - for rk in remote_indices: - inter = Utils.intersl(current, remote_indices[rk]) - if inter is not None: - self._tranfer_indices[rk] = inter - - # Back to local indices - convert = self._topology.mesh.convert2local - self._tranfer_indices = {rk: convert(self._tranfer_indices[rk]) - for rk in self._tranfer_indices} - - self._transfer_types = None - - def _swap_indices(self): - # First, we need to collect the global indices, as arrays - # since we need to broadcast them later. - current_indices = TopoTools.gather_global_indices(self._topology, - toslice=False) - # To allocate remote_indices array, we need the size of - # the remote communicator. - remote_size = self.comm.Get_remote_size() - dimension = self._topology.domain.dimension - remote_indices = npw.dim_zeros((dimension * 2, remote_size)) - # Then they are broadcasted to the remote communicator - rank = self._topology.rank - current_task = self._topology.domain.current_task() - if current_task is self.source_id: - # Local 0 broadcast current_indices to remote comm - if rank == 0: - self.comm.bcast(current_indices, root=MPI.ROOT) - else: - self.comm.bcast(current_indices, root=MPI.PROC_NULL) - # Get remote indices from remote comm - remote_indices = self.comm.bcast(remote_indices, root=0) - - elif current_task is self.target_id: - # Get remote indices from remote comm - remote_indices = self.comm.bcast(remote_indices, root=0) - # Local 0 broadcast current_indices to remote comm - if rank == 0: - self.comm.bcast(current_indices, root=MPI.ROOT) - else: - self.comm.bcast(current_indices, root=MPI.PROC_NULL) - - # Convert numpy arrays to dict of slices ... - current_indices = Utils.array_to_dict(current_indices) - remote_indices = Utils.array_to_dict(remote_indices) - - return current_indices, remote_indices - - def transferTypes(self): - """ - Return the dictionnary of MPI derived types - used for send (if on source) or receive (if on target) - @param data_shape : shape (numpy-like) of the original array - @return : a dict of MPI types - """ - if self._transfer_types is None: - data_shape = self._topology.mesh.resolution - self._transfer_types = TopoTools.create_subarray( - self._tranfer_indices, data_shape) - return self._transfer_types diff --git a/hysop/mpi/bridge_overlap.py b/hysop/mpi/bridge_overlap.py deleted file mode 100644 index e4f75ba06..000000000 --- a/hysop/mpi/bridge_overlap.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -@file bridge.py -Tools to compute the intersection between -two HySoP topologies defined on the same comm but for a -different number of processes. -""" -from hysop.mpi.topology import Cartesian, TopoTools -from hysop.tools.misc import Utils -from hysop.mpi.bridge import Bridge - - -class BridgeOverlap(Bridge): - """ - Bridge between two topologies that: - - have a different number of mpi processes - - have common mpi processes - - i.e. something in between a standard bridge with intra-comm and - a bridge dealing with intercommunication. This is probably - a very pathologic case ... - - The main difference with a standard bridge is that - this one may be call on processes where either source - or target does not exist. - """ - def __init__(self, comm_ref=None, **kwds): - """ - @param comm_ref : mpi communicator used for all global communications. - It must include all processes of source and target. - If None, source.parent() is used. - """ - self.comm = comm_ref - self.domain = None - super(BridgeOverlap, self).__init__(**kwds) - - def _check_topologies(self): - # First check if source and target are complient - if self.comm is None: - if self._source is not None: - self.comm = self._source.parent() - else: - self.comm = self._target.parent() - - # To build a bridge, all process in source/target must be in self.comm - # and there must be an overlap between source - # and target processes group. If not, turn to intercommunicator. - if self._source is not None and self._target is not None: - msg = 'BridgeOverlap error: mpi group from ' - msg += 'source and topo must overlap. If not ' - msg += 'BridgeInter will probably suits better.' - assert TopoTools.intersection_size(self._source.comm, - self._target.comm) > 0, msg - assert self._source.domain == self._target.domain - - if self._source is not None: - assert isinstance(self._source, Cartesian) - s_size = self._source.size - assert TopoTools.intersection_size(self._source.comm, - self.comm) == s_size - self.domain = self._source.domain - - if self._target is not None: - assert isinstance(self._target, Cartesian) - t_size = self._target.size - assert TopoTools.intersection_size(self._target.comm, - self.comm) == t_size - self.domain = self._target.domain - - self._rank = self.comm.Get_rank() - - def _build_send_recv_dict(self): - # Compute local intersections : i.e. find which grid points - # are on both source and target mesh. - indices_source = TopoTools.gather_global_indices_overlap(self._source, - self.comm, - self.domain) - indices_target = TopoTools.gather_global_indices_overlap(self._target, - self.comm, - self.domain) - - # From now on, we have indices_source[rk] = global indices (slice) - # of grid points of the source on process number rk in parent. - # And the same thing for indices_target. - dimension = self.domain.dimension - # Compute the intersections of the mesh on source with every mesh on - # target (i.e. for each mpi process). - if self._rank in indices_source: - current = indices_source[self._rank] - else: - current = [slice(None, None, None), ] * dimension - - for rk in indices_target: - inter = Utils.intersl(current, indices_target[rk]) - if inter is not None: - self._send_indices[rk] = inter - - if self._source is not None: - # Back to local indices - convert = self._source.mesh.convert2local - self._send_indices = {rk: convert(self._send_indices[rk]) - for rk in self._send_indices} - - if self._rank in indices_source: - current = indices_target[self._rank] - else: - current = [slice(None, None, None), ] * dimension - for rk in indices_source: - inter = Utils.intersl(current, indices_source[rk]) - if inter is not None: - self._recv_indices[rk] = inter - - if self._target is not None: - convert = self._target.mesh.convert2local - self._recv_indices = {rk: convert(self._recv_indices[rk]) - for rk in self._recv_indices} diff --git a/hysop/mpi/tests/test_bridge.py b/hysop/mpi/tests/test_bridge.py index 5bfe802d8..936fb00c1 100755 --- a/hysop/mpi/tests/test_bridge.py +++ b/hysop/mpi/tests/test_bridge.py @@ -1,9 +1,10 @@ from hysop.domain.box import Box from hysop.tools.parameters import Discretization -from hysop.mpi.bridge import Bridge +from hysop.mpi.bridge import Bridge, BridgeInter, BridgeOverlap from hysop.mpi import main_size, main_comm -from hysop.mpi.bridge_overlap import BridgeOverlap -from hysop.mpi.bridge_inter import BridgeInter +from hysop.mpi.tests.utils import create_subtopos, create_inter_topos + + import math @@ -77,7 +78,7 @@ def test_bridgeInter2D(): CPU = 1 GPU = 4 bridge = BridgeInter(topo1, main_comm, source_id=CPU, target_id=GPU) - tr = bridge.transferTypes() + tr = bridge.transfer_types() assert bridge is not None assert isinstance(tr, dict) # We cannot really check something interesting, @@ -104,7 +105,7 @@ def test_bridgeInter3D(): CPU = 1 GPU = 4 bridge = BridgeInter(topo1, main_comm, source_id=CPU, target_id=GPU) - tr = bridge.transferTypes() + tr = bridge.transfer_types() assert bridge is not None assert isinstance(tr, dict) # We cannot really check something interesting, diff --git a/hysop/operator/redistribute.py b/hysop/operator/redistribute.py index f43866672..bf21b5eb2 100644 --- a/hysop/operator/redistribute.py +++ b/hysop/operator/redistribute.py @@ -1,17 +1,29 @@ -""" -@file redistribute.py -Abstract interface for data redistribution. +"""Setup for data transfer/redistribution between topologies or operators + +`.. currentmodule : hysop.operator.redistribute + +* :class:`~RedistributeIntra` for topologies/operators defined + inside the same mpi communicator +* :class:`~RedistributeInter` for topologies/operators defined + on two different mpi communicator +* :class:`~RedistributeOverlap` for topologies defined + inside the same mpi parent communicator and + with a different number of processes +* :class:`~Redistribute` abstract base class + """ from hysop.operator.continuous import Operator from abc import ABCMeta, abstractmethod from hysop.mpi.topology import Cartesian from hysop.operator.computational import Computational +from hysop.operator.continuous import opsetup, opapply +from hysop.mpi.bridge import Bridge, BridgeOverlap, BridgeInter +from hysop.constants import S_DIR, debug class Redistribute(Operator): - """ - Bare interface to redistribute operators + """Abstract interface to redistribute operators """ __metaclass__ = ABCMeta @@ -19,11 +31,17 @@ class Redistribute(Operator): def __init__(self, source, target, component=None, run_till=None, **kwds): """ - @param source : topology or computational operator - @param target : topology or computational operator - @param component : which component must be distributed (default = all) - @param run_till : a list of operators that must wait for the completion - of this redistribute before any apply. + Parameters + ---------- + source, target: :class:`~hysop.mpi.topology.Cartesian` or + :class:`~hysop.operator.computational.Computational + topologies or operators that own the source mesh and targeted mesh + component: int + which component of the field must be distributed (default = all) + run_till: list of :class:`~hysop.operator.computational.Computational + operators that must wait for the completion of this redistribute + before any apply. + """ # Base class initialisation super(Redistribute, self).__init__(**kwds) @@ -44,7 +62,7 @@ class Redistribute(Operator): assert self.component >= 0, 'component value must be positive.' self._range_components = lambda v: (self.component) - ## Bridge between topology of source and topology of target + # Bridge between topology of source and topology of target self.bridge = None # True if some MPI operations are running for the current operator. self._has_requests = False @@ -78,11 +96,16 @@ class Redistribute(Operator): super(Redistribute, self).setup(rwork, iwork) def _check_operator(self, op): - """ - @param op : a computational operator - - check if op is really a computational operator - - discretize op - - check if all required variables (if any) belong to op + """ ensure op properties: + * check if op is really a computational operator + * discretize op + * check if all required variables (if any) belong to op + + Parameters + ---------- + op : :class:`~hysop.operator.computational.Computational + :param: op : a computational operator + """ assert isinstance(op, Computational) op.discretize() @@ -156,10 +179,14 @@ class Redistribute(Operator): assert v.domain is self.domain def _set_topology(self, current): - """ - @param current: a topology or a computational operator - This function check if current is valid, fits with self.variables + """This function check if current is valid, fits with self.variables and get its topology to set self._topology. + + Parameters + ---------- + current : :class:`~hysop.mpi.topology.Cartesian` or + :class:`~hysop.mpi.operator.computational.Computational` + """ if isinstance(current, Cartesian): result = current @@ -182,3 +209,442 @@ class Redistribute(Operator): def computation_time(self): pass + + +class RedistributeIntra(Redistribute): + """Data transfer between two operators/topologies. + Source and target must: + - be defined on the same communicator + - work on the same number of mpi process + - work with the same global resolution + """ + + def __init__(self, **kwds): + """Data transfer between two operators/topologies defined on the + same communicator + + Source and target must: + * be defined on the same communicator + * work on the same number of mpi process + * work with the same global resolution + """ + + # Base class initialisation + super(RedistributeIntra, self).__init__(**kwds) + + # Warning : comm from io_params will be used as + # reference for all mpi communication of this operator. + # --> rank computed in refcomm + # --> source and target must work inside refcomm + # If io_params is None, refcomm will COMM_WORLD. + + # Dictionnary of discrete fields to be sent + self._vsource = {} + # Dictionnary of discrete fields to be overwritten + self._vtarget = {} + + # dictionnary which maps rank with mpi derived type + # for send operations + self._send = {} + # dictionnay which maps rank with mpi derived type + # for send operations + self._receive = {} + # dictionnary which map rank/field name with a + # receive request + self._r_request = None + # dictionnary which map rank/field name with a + # send request + self._s_request = None + + # Set list of variables and the domain. + self._set_variables() + # Set mpi related stuff + self._set_domain_and_tasks() + + @opsetup + def setup(self, rwork=None, iwork=None): + # At setup, source and topo must be either + # a hysop.mpi.topology.Cartesian or + # a computational operator. + + msg = 'Redistribute error : undefined source of target.' + assert self._source is not None and self._target is not None, msg + + t_source = self._set_topology(self._source) + t_target = self._set_topology(self._target) + + source_res = t_source.mesh.discretization.resolution + target_res = t_target.mesh.discretization.resolution + msg = 'Redistribute error: source and target must ' + msg += 'have the same global resolution.' + assert (source_res == target_res).all(), msg + + # Set the dictionnaries of source/target variables + self._vsource = {v: v.discretize(t_source) + for v in self.variables} + self._vtarget = {v: v.discretize(t_target) + for v in self.variables} + + # We can create the bridge + self.bridge = Bridge(t_source, t_target) + + # Shape of reference is the shape of source/target mesh + self._send = self.bridge.send_types() + self._receive = self.bridge.recv_types() + self._set_synchro() + self._is_uptodate = True + + def _set_synchro(self): + """ + Set who must wait for who ... + """ + # Check input operators + if isinstance(self._source, Computational): + # redistribute must wait for source if a variable of redistribute + # is an output from source. + for v in self.variables: + vout = v in self._source.output or False + if vout: + self.wait_for(self._source) + # And source must wait for redistribute + # if a variable of red. is an output from source. + self._source.wait_for(self) + + if isinstance(self._target, Computational): + # target operator must wait for + # the end of this operator to apply. + self._run_till.append(self._target) + + # Add this operator into wait list of + # operators listed in run_till + for op in self._run_till: + op.wait_for(self) + + self._is_uptodate = True + + def add_run_till_op(self, op): + """Add an operator to the wait list""" + self._run_till.append(op) + op.wait_for(self) + + @opapply + def apply(self, simulation=None): + # Try different way to send vars? + # - Buffered : copy all data into a buffer and send/recv + # - Standard : one send/recv per component + # --- Standard send/recv --- + br = self.bridge + + # reset send/recv requests + self._r_request = {} + self._s_request = {} + + basetag = self._mpis.rank + 1 + # Comm used for send/receive operations + # It must contains all proc. of source topo and + # target topo. + refcomm = self.bridge.comm + # Loop over all required components of each variable + for v in self.variables: + for d in self._range_components(v): + v_name = v.name + S_DIR[d] + + # Deal with local copies of data + if br.has_local_inter(): + vTo = self._vtarget[v].data[d] + vFrom = self._vsource[v].data[d] + vTo[br.local_target_ind()] = vFrom[br.local_source_ind()] + + # Transfers to other mpi processes + for rk in self._receive: + recvtag = basetag * 989 + (rk + 1) * 99 + (d + 1) * 88 + mpi_type = self._receive[rk] + vTo = self._vtarget[v].data[d] + self._r_request[v_name + str(rk)] = \ + refcomm.Irecv([vTo, 1, mpi_type], + source=rk, tag=recvtag) + self._has_requests = True + for rk in self._send: + sendtag = (rk + 1) * 989 + basetag * 99 + (d + 1) * 88 + mpi_type = self._send[rk] + vFrom = self._vsource[v].data[d] + self._s_request[v_name + str(rk)] = \ + refcomm.Issend([vFrom, 1, mpi_type], + dest=rk, tag=sendtag) + self._has_requests = True + + def wait(self): + if self._has_requests: + for rk in self._r_request: + self._r_request[rk].Wait() + for rk in self._s_request: + self._s_request[rk].Wait() + self._has_requests = False + + def test_requests(self): + res = True + for rk in self._r_request.keys(): + res = self._r_request[rk].Test() + if not res: + return res + for rk in self._s_request.keys(): + res = self._s_request[rk].Test() + if not res: + return res + + def test_single_request(self, rsend=None, rrecv=None): + """if neither rsend or rrecv is given return + True if all communication request are complete + else check for sending to rsend or receiving from rrecv. + Process ranks should be those in parent_comm. + + Parameters + ---------- + rsend : string + discrete variable name + S_DIR + rank of the process + to which a message has been sent + and for which we want to test message completion. + rrecv : string + discrete variable name + S_DIR + rank of the process + from which a message has been receive + and for which we want to test message completion. + + """ + if rsend is not None or rrecv is not None: + send_res = True + recv_res = True + if rsend is not None: + send_res = self._s_request[rsend].Test() + if rrecv is not None: + recv_res = self._r_request[rrecv].Test() + res = send_res and recv_res + return res + else: + return self.test_requests() + + +class RedistributeInter(Redistribute): + """Operator to redistribute data from one communicator to another. + Source/target may be either a topology or a computational operator. + It implies mpi inter-communications. + """ + + @debug + def __init__(self, parent, source_id=None, target_id=None, **kwds): + """redistribute data from one communicator to another. + Source/target may be either a topology or a computational operator. + It implies mpi inter-communications. + + Parameters + ---------- + parent : MPI.COMM + mpi communicator that must owns all the + processes involved in source and target. + source_id, target_id : int + mpi task ids for the source/target. + Required if source/target is None + else infered from source/target. + + See other required parameters in base class. + """ + super(RedistributeInter, self).__init__(**kwds) + + # parent communicator, that must contains all processes + # involved in source and target tasks. + self.parent = parent + + # set source and targets ids. + # They must be known before setup. + # Either they can be infered from source and target + # or must be set in argument list, if either source + # or target is undefined on the current process. + if self._source is None: + assert source_id is not None + + if self._target is None: + assert target_id is not None + + self._source_id = source_id + self._target_id = target_id + + # Set list of variables and domain. + self._set_variables() + # Set mpi related stuff + self._set_domain_and_tasks() + + # Domain is set, we can check if we are on source or target + current_task = self.domain.current_task() + self._is_source = current_task == self._source_id + self._is_target = current_task == self._target_id + assert self._is_target or self._is_source + assert not (self._is_target and self._is_source) + + nbprocs = len(self.domain.tasks_list()) + msg = "Parent communicator size and number of procs " + msg += "in domain differ." + assert parent.Get_size() == nbprocs, msg + + # the local topology. May be either source or target + # depending on the task of the current process. + self._topology = None + + # dictionnary which maps rank with mpi derived type + # used for send/recv operations (send on source, recv on target ...) + self._transfer_types = None + + # dictionnary which maps rank/field name with a + # send/recv request + self._requests = {} + + @debug + @opsetup + def setup(self, rwork=None, iwork=None): + # First of all, we need to get the current topology: + if self._is_source: + assert self._source is not None + self._topology = self._set_topology(self._source) + elif self._is_target: + assert self._target is not None + self._topology = self._set_topology(self._target) + + # Now we can build the bridge (intercomm) + self.bridge = BridgeInter(self._topology, self.parent, + self._source_id, self._target_id) + + # And get mpi derived types + self._transfer_types = self.bridge.transfer_types() + + self._set_synchro() + self._is_uptodate = True + + def _set_synchro(self): + """ + Set who must wait for who ... + """ + if self._is_source and isinstance(self._source, Computational): + # redistribute must wait for source if a variable of redistribute + # is an output from source. + for v in self.variables: + vout = v in self._source.output or False + if vout: + self.wait_for(self._source) + # And source must wait for redistribute + # if a variable of red. is an output from source. + self._source.wait_for(self) + + if self._is_target and isinstance(self._target, Computational): + # target operator must wait for + # the end of this operator to apply. + self._run_till.append(self._target) + + # Add this operator into wait list of + # operators listed in run_till + for op in self._run_till: + op.wait_for(self) + + def add_run_till_op(self, op): + """Add an operator to the wait list""" + if self._is_target: + self._run_till.append(op) + op.wait_for(self) + + @debug + @opapply + def apply(self, simulation=None): + # --- Standard send/recv --- + self._requests = {} + + # basetag = self._mpis.rank + 1 + # Comm used for send/receive operations + # It must contains all proc. of source topo and + # target topo. + refcomm = self.bridge.comm + # Map between rank and mpi types + # Loop over all required components of each variable + for v in self.variables: + rank = self._topology.comm.Get_rank() + for d in self._range_components(v): + v_name = v.name + S_DIR[d] + vtab = v.discreteFields[self._topology].data[d] + for rk in self._transfer_types: + if self._is_target: + # Set reception + self._requests[v_name + str(rk)] = \ + refcomm.Irecv([vtab[...], 1, + self._transfer_types[rk]], + source=rk, tag=rk) + if self._is_source: + self._requests[v_name + str(rk)] = \ + refcomm.Issend([vtab[...], 1, + self._transfer_types[rk]], + dest=rk, tag=rank) + self._has_requests = True + + def wait(self): + if self._has_requests: + for rk in self._requests: + self._requests[rk].Wait() + for v in self.variables: + for d in self._range_components(v): + vtab = v.discreteFields[self._topology].data[d] + self._has_requests = False + + def test_requests(self): + res = True + for rk in self._requests: + res = self._requests[rk].Test() + if not res: + return res + + +class RedistributeOverlap(RedistributeIntra): + """A specific redistribute where source and target do not work with the same + group of mpi processes. + Requirements : + - work only on topologies, not on operators + - same global resolution for both topologies + - group from source topology and target topology MUST overlap. + """ + + @opsetup + def setup(self, rwork=None, iwork=None): + """ + Check/set the list of variables to be distributed + + What must be set at setup? + ---> the list of continuous variables to be distributed + ---> the bridge (one for all variables, which means + that all vars must have the same topology in source + and the same topology in target. + ---> the list of discrete variables for source and + for target. + """ + if self._source is not None: + self._vsource = self._discrete_fields(self._source) + if self._target is not None: + self._vtarget = self._discrete_fields(self._target) + + # We can create the bridge + self.bridge = BridgeOverlap(source=self._source, target=self._target, + comm_ref=self._mpis.comm) + + # Build mpi derived types for send and receive operations. + # Shape of reference is the shape of source/target mesh + if self._source is not None: + self._send = self.bridge.send_types() + if self._target is not None: + self._receive = self.bridge.recv_types() + + self._set_synchro() + self._is_uptodate = True + + def _discrete_fields(self, topo): + """Return the dictionnary of discrete fields for topo + and the variables of this operator. + + Parameters + ---------- + topo : :class:`~hysop.mpi.topology.Cartesian` + """ + assert isinstance(topo, Cartesian) + return {v: v.discretize(topo) for v in self.variables} diff --git a/hysop/operator/redistribute_inter.py b/hysop/operator/redistribute_inter.py deleted file mode 100644 index 36cc97eca..000000000 --- a/hysop/operator/redistribute_inter.py +++ /dev/null @@ -1,183 +0,0 @@ -""" -@file redistribute_intercomm.py -Data transfer between two topologies/operators, defined on -different mpi tasks (i.e. intercommunication). -""" -from hysop.constants import debug, S_DIR -from hysop.mpi.bridge_inter import BridgeInter -from hysop.operator.redistribute import Redistribute -from hysop.operator.computational import Computational -from hysop.operator.continuous import opsetup, opapply - - -class RedistributeInter(Redistribute): - """ - Operator to redistribute data from one communicator to another. - Source/target may be either a topology or a computational operator. - It implies mpi inter-communications. - """ - @debug - def __init__(self, parent, source_id=None, target_id=None, **kwds): - """ - @param parent : parent mpi communicator that must owns all the - processes involved in source and target. - @param source_id: mpi task id for the source. - Required if source is None else infered from source. - @param target_id: mpi task id for the target. - Required if target is None else infered from target. - See other required parameters in base class. - """ - super(RedistributeInter, self).__init__(**kwds) - - ## parent communicator, that must contains all processes - ## involved in source and target tasks. - self.parent = parent - - # set source and targets ids. - # They must be known before setup. - # Either they can be infered from source and target - # or must be set in argument list, if either source - # or target is undefined on the current process. - if self._source is None: - assert source_id is not None - - if self._target is None: - assert target_id is not None - - self._source_id = source_id - self._target_id = target_id - - # Set list of variables and domain. - self._set_variables() - # Set mpi related stuff - self._set_domain_and_tasks() - - # Domain is set, we can check if we are on source or target - current_task = self.domain.current_task() - self._is_source = current_task == self._source_id - self._is_target = current_task == self._target_id - assert self._is_target or self._is_source - assert not (self._is_target and self._is_source) - - nbprocs = len(self.domain.tasks_list()) - msg = "Parent communicator size and number of procs " - msg += "in domain differ." - assert parent.Get_size() == nbprocs, msg - - # the local topology. May be either source or target - # depending on the task of the current process. - self._topology = None - - # dictionnary which maps rank with mpi derived type - # used for send/recv operations (send on source, recv on target ...) - self._transfer_types = None - - # dictionnary which maps rank/field name with a - # send/recv request - self._requests = {} - - @debug - @opsetup - def setup(self, rwork=None, iwork=None): - # First of all, we need to get the current topology: - if self._is_source: - assert self._source is not None - self._topology = self._set_topology(self._source) - elif self._is_target: - assert self._target is not None - self._topology = self._set_topology(self._target) - - # Now we can build the bridge (intercomm) - self.bridge = BridgeInter(self._topology, self.parent, - self._source_id, self._target_id) - - # And get mpi derived types - self._transfer_types = self.bridge.transferTypes() - - self._set_synchro() - self._is_uptodate = True - - def _set_synchro(self): - """ - Set who must wait for who ... - """ - if self._is_source and isinstance(self._source, Computational): - # redistribute must wait for source if a variable of redistribute - # is an output from source. - for v in self.variables: - vout = v in self._source.output or False - if vout: - self.wait_for(self._source) - # And source must wait for redistribute - # if a variable of red. is an output from source. - self._source.wait_for(self) - - if self._is_target and isinstance(self._target, Computational): - # target operator must wait for - # the end of this operator to apply. - self._run_till.append(self._target) - - # Add this operator into wait list of - # operators listed in run_till - for op in self._run_till: - op.wait_for(self) - - def add_run_till_op(self, op): - """Add an operator to the wait list""" - if self._is_target: - self._run_till.append(op) - op.wait_for(self) - - @debug - @opapply - def apply(self, simulation=None): - """ - Apply this operator to its variables. - @param simulation : object that describes the simulation - parameters (time, time step, iteration number ...), see - hysop.problem.simulation.Simulation for details. - """ - # --- Standard send/recv --- - self._requests = {} - - # basetag = self._mpis.rank + 1 - # Comm used for send/receive operations - # It must contains all proc. of source topo and - # target topo. - refcomm = self.bridge.comm - # Map between rank and mpi types - # Loop over all required components of each variable - for v in self.variables: - rank = self._topology.comm.Get_rank() - for d in self._range_components(v): - v_name = v.name + S_DIR[d] - vtab = v.discreteFields[self._topology].data[d] - for rk in self._transfer_types: - if self._is_target: - # Set reception - self._requests[v_name + str(rk)] = \ - refcomm.Irecv([vtab[...], 1, - self._transfer_types[rk]], - source=rk, tag=rk) - if self._is_source: - self._requests[v_name + str(rk)] = \ - refcomm.Issend([vtab[...], 1, - self._transfer_types[rk]], - dest=rk, tag=rank) - self._has_requests = True - - def wait(self): - if self._has_requests: - for rk in self._requests: - self._requests[rk].Wait() - for v in self.variables: - for d in self._range_components(v): - vtab = v.discreteFields[self._topology].data[d] - self._has_requests = False - - def test_requests(self): - res = True - for rk in self._requests: - res = self._requests[rk].Test() - if not res: - return res diff --git a/hysop/operator/redistribute_intra.py b/hysop/operator/redistribute_intra.py deleted file mode 100644 index c9062f9cf..000000000 --- a/hysop/operator/redistribute_intra.py +++ /dev/null @@ -1,211 +0,0 @@ -""" -@file redistribute_intra.py -Setup for data transfer/redistribution between two topologies -or operators inside the same mpi communicator. -""" -from hysop.operator.redistribute import Redistribute -from hysop.operator.continuous import opsetup, opapply -from hysop.mpi.bridge import Bridge -from hysop.constants import S_DIR - - -class RedistributeIntra(Redistribute): - """ - Data transfer between two operators/topologies. - Source and target must: - - be defined on the same communicator - - work on the same number of mpi process - - work with the same global resolution - """ - def __init__(self, **kwds): - # Base class initialisation - super(RedistributeIntra, self).__init__(**kwds) - - # Warning : comm from io_params will be used as - # reference for all mpi communication of this operator. - # --> rank computed in refcomm - # --> source and target must work inside refcomm - # If io_params is None, refcomm will COMM_WORLD. - - # Dictionnary of discrete fields to be sent - self._vsource = {} - # Dictionnary of discrete fields to be overwritten - self._vtarget = {} - - # dictionnary which maps rank with mpi derived type - # for send operations - self._send = {} - # dictionnay which maps rank with mpi derived type - # for send operations - self._receive = {} - # dictionnary which map rank/field name with a - # receive request - self._r_request = None - # dictionnary which map rank/field name with a - # send request - self._s_request = None - - # Set list of variables and the domain. - self._set_variables() - # Set mpi related stuff - self._set_domain_and_tasks() - - @opsetup - def setup(self, rwork=None, iwork=None): - # At setup, source and topo must be either - # a hysop.mpi.topology.Cartesian or - # a computational operator. - - msg = 'Redistribute error : undefined source of target.' - assert self._source is not None and self._target is not None, msg - - t_source = self._set_topology(self._source) - t_target = self._set_topology(self._target) - - source_res = t_source.mesh.discretization.resolution - target_res = t_target.mesh.discretization.resolution - msg = 'Redistribute error: source and target must ' - msg += 'have the same global resolution.' - assert (source_res == target_res).all(), msg - - # Set the dictionnaries of source/target variables - self._vsource = {v: v.discretize(t_source) - for v in self.variables} - self._vtarget = {v: v.discretize(t_target) - for v in self.variables} - - # We can create the bridge - self.bridge = Bridge(t_source, t_target) - - # Shape of reference is the shape of source/target mesh - self._send = self.bridge.sendTypes() - self._receive = self.bridge.recvTypes() - self._set_synchro() - self._is_uptodate = True - - def _set_synchro(self): - """ - Set who must wait for who ... - """ - from hysop.operator.computational import Computational - # Check input operators - if isinstance(self._source, Computational): - # redistribute must wait for source if a variable of redistribute - # is an output from source. - for v in self.variables: - vout = v in self._source.output or False - if vout: - self.wait_for(self._source) - # And source must wait for redistribute - # if a variable of red. is an output from source. - self._source.wait_for(self) - - if isinstance(self._target, Computational): - # target operator must wait for - # the end of this operator to apply. - self._run_till.append(self._target) - - # Add this operator into wait list of - # operators listed in run_till - for op in self._run_till: - op.wait_for(self) - - self._is_uptodate = True - - def add_run_till_op(self, op): - """Add an operator to the wait list""" - self._run_till.append(op) - op.wait_for(self) - - @opapply - def apply(self, simulation=None): - # Try different way to send vars? - # - Buffered : copy all data into a buffer and send/recv - # - Standard : one send/recv per component - # --- Standard send/recv --- - br = self.bridge - - # reset send/recv requests - self._r_request = {} - self._s_request = {} - - basetag = self._mpis.rank + 1 - # Comm used for send/receive operations - # It must contains all proc. of source topo and - # target topo. - refcomm = self.bridge.comm - # Loop over all required components of each variable - for v in self.variables: - for d in self._range_components(v): - v_name = v.name + S_DIR[d] - - # Deal with local copies of data - if br.hasLocalInter(): - vTo = self._vtarget[v].data[d] - vFrom = self._vsource[v].data[d] - vTo[br.localTargetInd()] = vFrom[br.localSourceInd()] - - # Transfers to other mpi processes - for rk in self._receive: - recvtag = basetag * 989 + (rk + 1) * 99 + (d + 1) * 88 - mpi_type = self._receive[rk] - vTo = self._vtarget[v].data[d] - self._r_request[v_name + str(rk)] = \ - refcomm.Irecv([vTo, 1, mpi_type], - source=rk, tag=recvtag) - self._has_requests = True - for rk in self._send: - sendtag = (rk + 1) * 989 + basetag * 99 + (d + 1) * 88 - mpi_type = self._send[rk] - vFrom = self._vsource[v].data[d] - self._s_request[v_name + str(rk)] = \ - refcomm.Issend([vFrom, 1, mpi_type], - dest=rk, tag=sendtag) - self._has_requests = True - - def wait(self): - if self._has_requests: - for rk in self._r_request: - self._r_request[rk].Wait() - for rk in self._s_request: - self._s_request[rk].Wait() - self._has_requests = False - - def test_requests(self): - res = True - for rk in self._r_request.keys(): - res = self._r_request[rk].Test() - if not res: - return res - for rk in self._s_request.keys(): - res = self._s_request[rk].Test() - if not res: - return res - - def test_single_request(self, rsend=None, rrecv=None): - """ - if neither rsend or rrecv is given return - True if all communication request are complete - else check for sending to rsend or - receiving from rrecv. Process ranks - should be given in parent_comm. - @param rsend : discrete variable name + S_DIR + rank of the process - to which a message has been sent - and for which we want to test - message completion. - @param rrecv : discrete variable name + S_DIR + rank of the process - from which a message has been receive - and for which we want to test - message completion. - """ - if rsend is not None or rrecv is not None: - send_res = True - recv_res = True - if rsend is not None: - send_res = self._s_request[rsend].Test() - if rrecv is not None: - recv_res = self._r_request[rrecv].Test() - res = send_res and recv_res - return res - else: - return self.test_requests() diff --git a/hysop/operator/redistribute_overlap.py b/hysop/operator/redistribute_overlap.py deleted file mode 100644 index 6206f2502..000000000 --- a/hysop/operator/redistribute_overlap.py +++ /dev/null @@ -1,61 +0,0 @@ -""" -@file redistribute_overlap.py -Setup for data transfer/redistribution between two topologies defined on the -same mpi parent communicator but with a different number of processes. -""" -from hysop.operator.continuous import opsetup -from hysop.operator.redistribute_intra import RedistributeIntra -from hysop.mpi.bridge_overlap import BridgeOverlap - - -class RedistributeOverlap(RedistributeIntra): - """ - A specific redistribute where source and target do not work with the same - group of mpi processes. - Requirements : - - work only on topologies, not on operators - - same global resolution for both topologies - - group from source topology and target topology MUST overlap. - """ - - @opsetup - def setup(self, rwork=None, iwork=None): - """ - Check/set the list of variables to be distributed - - What must be set at setup? - ---> the list of continuous variables to be distributed - ---> the bridge (one for all variables, which means - that all vars must have the same topology in source - and the same topology in target. - ---> the list of discrete variables for source and - for target. - """ - if self._source is not None: - self._vsource = self._discrete_fields(self._source) - if self._target is not None: - self._vtarget = self._discrete_fields(self._target) - - # We can create the bridge - self.bridge = BridgeOverlap(source=self._source, target=self._target, - comm_ref=self._mpis.comm) - - # Build mpi derived types for send and receive operations. - # Shape of reference is the shape of source/target mesh - if self._source is not None: - self._send = self.bridge.sendTypes() - if self._target is not None: - self._receive = self.bridge.recvTypes() - - self._set_synchro() - self._is_uptodate = True - - def _discrete_fields(self, topo): - """ - @param topo : a Cartesian HySoP topology - Return the dictionnary of discrete fields for topo - and the variables of this operator. - """ - from hysop.mpi.topology import Cartesian - assert isinstance(topo, Cartesian) - return {v: v.discretize(topo) for v in self.variables} diff --git a/hysop/operator/tests/test_redistribute.py b/hysop/operator/tests/test_redistribute.py index ce9cadda0..67198bc34 100755 --- a/hysop/operator/tests/test_redistribute.py +++ b/hysop/operator/tests/test_redistribute.py @@ -1,6 +1,5 @@ -from hysop.operator.redistribute_intra import RedistributeIntra -from hysop.operator.redistribute_inter import RedistributeInter -from hysop.operator.redistribute_overlap import RedistributeOverlap +from hysop.operator.redistribute import RedistributeIntra,\ + RedistributeInter, RedistributeOverlap from hysop.tools.parameters import Discretization, MPIParams from hysop import testsenv import hysop as pp diff --git a/hysop/problem/problem.py b/hysop/problem/problem.py index 9cb16aaba..1cfce768a 100644 --- a/hysop/problem/problem.py +++ b/hysop/problem/problem.py @@ -1,28 +1,22 @@ -""" -@file problem.py - -Complete problem description. +"""Description of a problem, i.e. a sequence of operators """ from hysop.constants import debug import cPickle -from hysop import __VERBOSE__ -from hysop.operator.redistribute import Redistribute -from hysop.operator.redistribute_intra import RedistributeIntra +from hysop import __VERBOSE__, __GPU_ENABLED__ +from hysop.operator.redistribute import Redistribute, RedistributeIntra from hysop.tools.profiler import profile, Profiler from hysop.mpi import main_rank -from hysop.gpu.gpu_transfer import DataTransfer +if __GPU_ENABLED__: + from hysop.gpu.gpu_transfer import DataTransfer +else: + DataTransfer = int class Problem(object): - """ - Problem representation. - - Contains several operators that apply on variables. - Variables are defined on different domains.\n - Each operator is set up and variables are initialized in a set up step.\n - To solve the problem, a loop over time-steps is launched. A step consists - in calling the apply method of each operators.\n - To finish, a finalize method is called.\ + """Problem representation. + + A problem is a sequence of operators applied on a set of + continuous fields. """ @debug @@ -31,52 +25,57 @@ class Problem(object): @debug def __init__(self, operators, simulation, - dumpFreq=100, name=None): + dump_freq=100, name=None): """ - Create a transport problem instance. - - @param operators : list of operators. - @param simulation : a hysop.simulation.Simulation object - to describe simulation parameters. - @param name : an id for the problem - @param dumpFreq : frequency of dump (i.e. saving to a file) - for the problem; set dumpFreq = -1 for no dumps. Default = 100. + + Parameters + ---------- + operators : list of :class:`~hysop.operator.continuous.Continuous` + sequence of operators used to define the problem + simulation : :class:`~hysop.problem.simulation.Simulation` + description of the time discretisation + dump_freq : real; optional + frequency of dump to file for the problem. + Set dump_freq = -1 for no dumps. Default = 100, + i.e. every 100 time steps. + name : string, optional + id for the problem """ - ## Problem name + # Problem name self.name = name - ## Problem operators + # Problem operators self.operators = operators - ## Computes time step and manage iterations + # Computes time step and manage iterations self.simulation = simulation + + # all variables must be defined on the same domain. vref = self.operators[0].variables.keys()[0] self.domain = vref.domain + msg = "All variables must be defined on the same domain." for op in self.operators: for v in (v for v in op.variables if v is not vref): print id(v.domain), id(self.domain) print v.domain, self.domain if self.domain is not v.domain: - raise ValueError("Problem must have only one " + - "domain for variables.") - ## A list of variables that must be initialized before - ## any call to op.apply() + raise ValueError(msg) + # A list of variables that must be initialized before + # any call to op.apply() self.input = [] - ## call to problem.dump frequency during apply. - if dumpFreq >= 0: - ## dump problem every self.dumpFreq iter - self.dumpFreq = dumpFreq - self._doDump = True + # call to problem.dump frequency during apply. + if dump_freq >= 0: + # dump problem every self.dump_freq iter + self.dump_freq = dump_freq else: - self._doDump = False - self.dumpFreq = 100000 + self.dump_freq = None - ## Id for the problem. Used for dump file name. + # Id for the problem. Used for dump file name. if name is None: self.name = 'HySoPPb' else: self.name = name - ## Object to store computational times of lower level functions + # Object to store computational times of lower level functions self.profiler = Profiler(self, self.domain.comm_task) - ## Default file name prefix for dump. + # Default file name prefix for dump. self.filename = str(self.name) self._filedump = self.filename + '_rk_' + str(main_rank) @@ -84,16 +83,15 @@ class Problem(object): # and when variables are initialized (i.e. after a call to pre_setup) # Note : 3 categories of op : computation (stretching, poisson ...), # and data distribution (Redistribute) - self._isReady = False + self._is_ready = False @debug @profile def setup(self): - """ - Prepare operators (create topologies, allocate memories ...) + """Prepare operators (create topologies, allocate memories ...) """ # Set up for 'computational' operators - if not self._isReady: + if not self._is_ready: self.pre_setup() print "Fin setup op" # for v in self.input: @@ -113,61 +111,61 @@ class Problem(object): print ("====") def pre_setup(self): + """Discretization and setup for computational operators + and fields initialization """ - - Partial setup : only for 'computational' operators - (i.e. excluding rendering, data distribution ...) - - Initialize variables. - """ - if self._isReady: - pass + if self._is_ready: + return for op in self.operators: if not isinstance(op, Redistribute) and \ not isinstance(op, DataTransfer): op.discretize() - - for op in self.operators: - if not isinstance(op, Redistribute) and \ - not isinstance(op, DataTransfer): op.setup() if __VERBOSE__ and main_rank == 0: print ("==== Variables initialization ====") - # Build variables list to initialize - # These are operators input variables that are not output of - # previous operators in the operator stack. - # Set the variables input topology as the the topology of the fist - # operator that uses this variable as input. + # Build the list of 'input' variables, that must be initialized. + # 'input' fields are variables that are not output of + # any previous operators in the operator sequence. self.input = [] + + # First, all operators input vars are appended to input list for op in self.operators: for v in op.input: if v not in self.input: self.input.append(v) - for op in self.operators[::-1]: - for v in op.output: - if v in self.input: + + # Then starting from the last op, vars which are 'input' of + # an operator AND output of a previous op are removed + for op in self.operators[-1::-1]: + for v in self.input: + if v in op.output: self.input.remove(v) - for v in op.input: - if not v in self.input: + if v in op.input: self.input.append(v) - self._isReady = True + self._is_ready = True @debug @profile def solve(self): - """ - Solve problem. + """Apply all operators of the problem, for all simulation + time steps + + * init simulation + * for each time step: + * apply all operators + * increment time step + * dump problem (for concerned values, depend on dump_freq) - Performs simulations iterations by calling each - operators of the list until timer ends.\n - At end of time step, call an io step.\n - Displays timings at simulation end. """ + # Init simulation self.simulation.initialize() if main_rank == 0: print ("\n\n Start solving ...") + # Run simulation while not self.simulation.is_over: if main_rank == 0: self.simulation.print_state() @@ -176,11 +174,12 @@ class Problem(object): if __VERBOSE__: print (main_rank, op.name) op.apply(self.simulation) - - testdump = \ - self.simulation.current_iteration % self.dumpFreq is 0 + testdump = False + if self.dump_freq is not None: + testdump = \ + self.simulation.current_iteration % self.dump_freq is 0 self.simulation.advance() - if self._doDump and testdump: + if testdump: self.dump() @debug @@ -221,11 +220,14 @@ class Problem(object): return s def dump(self, filename=None): - """ - Serialize some data of the problem to file + """Serialize some data of the problem to file (only data required for a proper restart, namely fields in self.input and simulation). - @param filename : prefix for output file. Real name = filename_rk_N, + + Parameters + ---------- + filename : string + prefix for output file. Real name = filename_rk_N, N being current process number. If None use default value from problem parameters (self.filename) """ @@ -262,10 +264,9 @@ class Problem(object): op.setup() def setDumpFreq(self, freq): + """set rate of problem.dump call (every 'rate' iteration) + :param : the frequency of output. <0 or None means no output. """ - set rate of problem.dump call (every 'rate' iteration) - @param freq : the frequency of output - """ - self.dumpFreq = freq + self.dump_freq = freq if freq < 0: - self._doDump = False + self.dump_freq = None diff --git a/hysop/problem/problem_tasks.py b/hysop/problem/problem_tasks.py index 2a5b2904b..1acbe8e93 100644 --- a/hysop/problem/problem_tasks.py +++ b/hysop/problem/problem_tasks.py @@ -5,8 +5,7 @@ same tasks. from hysop.constants import debug from hysop import __VERBOSE__ from hysop.problem.problem import Problem -from hysop.operator.redistribute_inter import RedistributeInter -from hysop.operator.redistribute_intra import RedistributeIntra +from hysop.operator.redistribute import RedistributeInter, RedistributeIntra from hysop.operator.redistribute import Redistribute from hysop.gpu.gpu_transfer import DataTransfer from hysop.tools.profiler import profile diff --git a/hysop/problem/tests/test_problem.py b/hysop/problem/tests/test_problem.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/hysop/problem/tests/test_simulation.py b/hysop/problem/tests/test_simulation.py index beb982de0..7528861ca 100755 --- a/hysop/problem/tests/test_simulation.py +++ b/hysop/problem/tests/test_simulation.py @@ -1,18 +1,91 @@ +"""Tests simulation time loop parameters """ -@file test_simulation.py -tests simulation incr and io_utils writer -""" -from hysop.problem.simulation import Simulation +from hysop.problem.simulation import Simulation, eps from hysop.tools.io_utils import Writer, IOParams, IO +from hysop.mpi import main_rank +import numpy as np + + +def run_simu(s): + i = 0 + tref = s.start + while not s.is_over: + assert s.current_iteration == i + assert s.time == tref + s.time_step + tref = s.time + i += 1 + s.advance() + + +def test_simu_default(): + s = Simulation(nb_iter=10) + assert s.time == 0. + assert s.end == 1. + assert np.allclose(s.time_step, 1. / 10., rtol=eps) + s.initialize() + assert np.allclose(s.time, s.time_step) + + run_simu(s) + + assert np.allclose(s.time, s.end) + assert s.current_iteration == 9 + assert s.nb_iter == 10 + + s.finalize() + assert np.allclose(s.time, s.end) + assert s.current_iteration == -1 + assert s.is_over -simu = Simulation(start=0.0, end=1.0, nb_iter=10) + s.initialize() + assert s.current_iteration == 0 + assert not s.is_over + assert np.allclose(s.time, 0.1) + assert s.time == 0.1 + + +def test_simu_2(): + s = Simulation(time_step=0.1, max_iter=5) + assert s.time == 0. + assert s.end == 1. + assert np.allclose(s.time_step, 1. / 10.) + s.initialize() + assert np.allclose(s.time, s.time_step) + + run_simu(s) + + assert s.current_iteration == 4 + assert np.allclose(s.time, s.start + 5 * s.time_step) + + +def test_simu_adapt(): + s = Simulation(time_step=0.1) + s.initialize() + i = 0 + tref = s.start + while not s.is_over: + assert s.current_iteration == i + assert s.time == tref + s.time_step + tref = s.time + if s.current_iteration == 5: + s.update_time_step(s.time_step * 0.5) + s.advance() + i += 1 + + assert np.allclose(s.time, s.end) + assert np.allclose(s.time_step, 0.05) + assert s.current_iteration == 13 def test_simu_incr(): + simu = Simulation(start=0.0, end=1.0, nb_iter=10) io_params = IOParams(filename='temp_test', frequency=2, fileformat=IO.ASCII) wr = Writer(io_params) - assert wr.do_write(simu.current_iteration) + leader = io_params.io_leader + if main_rank == leader: + assert wr.do_write(simu.current_iteration) + else: + assert not wr.do_write(simu.current_iteration) simu.initialize() @@ -20,37 +93,47 @@ def test_simu_incr(): count = 1 while not simu.is_over: - if count % 2 == 0: + if count % 2 == 0 and main_rank == leader: assert wr.do_write(simu.current_iteration) else: assert not wr.do_write(simu.current_iteration) - simu.print_state() simu.advance() count += 1 - assert simu.current_iteration == 10 + assert simu.current_iteration == 9 simu.finalize() - assert wr.do_write(simu.current_iteration) + + if main_rank == leader: + assert wr.do_write(simu.current_iteration) + else: + assert not wr.do_write(simu.current_iteration) def test_simu_incr2(): + simu = Simulation(start=0.0, end=1.0, nb_iter=10) io_params = IOParams(filename='temp_test', frequency=3, fileformat=IO.ASCII) + leader = io_params.io_leader wr = Writer(io_params) - assert wr.do_write(simu.current_iteration) - simu.time_step = 0.10000000001 + if main_rank == leader: + assert wr.do_write(simu.current_iteration) + else: + assert not wr.do_write(simu.current_iteration) + simu.update_time_step(0.10000000001) simu.initialize() assert not wr.do_write(simu.current_iteration) count = 1 while not simu.is_over: - if count % 3 == 0: + if count % 3 == 0 and main_rank == leader: assert wr.do_write(simu.current_iteration) else: assert not wr.do_write(simu.current_iteration) - simu.print_state() simu.advance() count += 1 - assert simu.current_iteration == 10 + assert simu.current_iteration == 9 simu.finalize() - assert wr.do_write(simu.current_iteration) + if main_rank == leader: + assert wr.do_write(simu.current_iteration) + else: + assert not wr.do_write(simu.current_iteration) diff --git a/hysop/problem/tests/test_transport.py b/hysop/problem/tests/test_transport.py index 3b4fbe77b..92d6f4302 100644 --- a/hysop/problem/tests/test_transport.py +++ b/hysop/problem/tests/test_transport.py @@ -56,9 +56,8 @@ def assertion(dim, boxLength, boxMin, nbElem, finalTime, time_step, velo = Field(domain=box, formula=v, vectorize_formula=True, name='Velocity', is_vector=True) advec = Advection(velo, scal, discretization=Discretization(nbElem)) - simu = Simulation(start=0.0, end=finalTime, time_step=time_step, max_iter=1) - print "velo dom ...", id(velo.domain) - print "scal dom ...", id(scal.domain) + simu = Simulation(start=0.0, end=finalTime, + time_step=time_step, max_iter=1) pb = TransportProblem([advec], simu) pb.setup() initial_scalar = npw.copy(scal.discreteFields.values()[0].data[0]) diff --git a/hysop/problem/transport.py b/hysop/problem/transport.py index 8cbfdbe19..7544a6adf 100644 --- a/hysop/problem/transport.py +++ b/hysop/problem/transport.py @@ -1,5 +1,6 @@ -""" -@file transport.py +"""Transport problem + +todo : proper description """ from hysop.problem.problem import Problem from hysop.operator.advection import Advection @@ -11,10 +12,10 @@ class TransportProblem(Problem): Transport problem description. """ def __init__(self, operators, simulation, - dumpFreq=100, name=None): + dump_freq=100, name=None): super(TransportProblem, self).__init__( operators, simulation, - dumpFreq=dumpFreq, name="TransportProblem") + dump_freq=dump_freq, name="TransportProblem") self.advection, self.velocity = None, None for op in self.operators: if isinstance(op, Advection): diff --git a/hysop/tools/problem2dot.py b/hysop/tools/problem2dot.py index 6eab223eb..a052a2f97 100644 --- a/hysop/tools/problem2dot.py +++ b/hysop/tools/problem2dot.py @@ -2,8 +2,7 @@ """ from hysop.operator.advection import Advection -from hysop.operator.redistribute import Redistribute -from hysop.operator.redistribute_inter import RedistributeInter +from hysop.operator.redistribute import Redistribute, RedistributeInter from hysop.mpi import main_rank import pydot colors = [ diff --git a/hysop/tools/tests/test_profiler.py b/hysop/tools/tests/test_profiler.py index 5823d1ec2..370b88969 100755 --- a/hysop/tools/tests/test_profiler.py +++ b/hysop/tools/tests/test_profiler.py @@ -1,11 +1,12 @@ -""" -Unitary tests for hysop.tools.profiler module +"""Unitary tests for hysop.tools.profiler module """ from hysop.tools.profiler import Profiler, profile, FProfiler, ftime from hysop.mpi import main_comm class A_class(object): + """A fake class to be profiled""" + def __init__(self): self.name = 'A_class' self.profiler = Profiler(self, main_comm) @@ -33,16 +34,16 @@ def test_profilers(): a.call() assert len(a.profiler._elems.keys()) == 2 assert a.n == 1 # the function have been called - assert a.profiler['call'].n == 1 + assert a.profiler['call'].nb_calls == 1 a.call() a.call_other() assert len(a.profiler._elems.keys()) == 3 assert a.n == 12 # the call and call_other functions have been called - assert a.profiler['call'].n == 2 - assert a.profiler['call_other'].n == 1 + assert a.profiler['call'].nb_calls == 2 + assert a.profiler['call_other'].nb_calls == 1 a.func() assert len(a.profiler._elems.keys()) == 3 assert a.n == 112 # the call and call_other functions have been called - assert a.profiler['call'].n == 2 - assert a.profiler['call_other'].n == 1 - assert a.profiler['manual'].n == 1 + assert a.profiler['call'].nb_calls == 2 + assert a.profiler['call_other'].nb_calls == 1 + assert a.profiler['manual'].nb_calls == 1 -- GitLab