From 649a6edc44342771cf94002177d3160efe02fde7 Mon Sep 17 00:00:00 2001
From: Jean-Matthieu Etancelin <jean-matthieu.etancelin@univ-reims.fr>
Date: Wed, 15 Oct 2014 16:20:31 +0200
Subject: [PATCH] Improve multi-gpu advection profiling. Add overlapped
 communications in diffusion

---
 HySoP/hysop/gpu/gpu_diffusion.py              | 145 +++++++++++-------
 .../hysop/gpu/multi_gpu_particle_advection.py |  23 +--
 2 files changed, 100 insertions(+), 68 deletions(-)

diff --git a/HySoP/hysop/gpu/gpu_diffusion.py b/HySoP/hysop/gpu/gpu_diffusion.py
index 12a5a515b..4131d9d28 100644
--- a/HySoP/hysop/gpu/gpu_diffusion.py
+++ b/HySoP/hysop/gpu/gpu_diffusion.py
@@ -3,7 +3,8 @@
 
 Diffusion on GPU
 """
-from parmepy.constants import debug, np, S_DIR, PARMES_MPI_REAL, ORDERMPI
+from parmepy.constants import debug, np, S_DIR, PARMES_MPI_REAL, ORDERMPI, \
+    PARMES_REAL, ORDER
 import parmepy.tools.numpywrappers as npw
 from parmepy.operator.discrete.discrete import DiscreteOperator
 from parmepy.gpu import cl
@@ -43,7 +44,7 @@ class GPUDiffusion(DiscreteOperator, GPUOperator):
         if alloc:
             self.size_global_alloc += self.field.mem_size
 
-        self.field_tmp=None
+        self.field_tmp = field_tmp
 
         topo = self.field.topology
         self._cutdir_list = np.where(topo.cutdir)[0].tolist()
@@ -56,6 +57,9 @@ class GPUDiffusion(DiscreteOperator, GPUOperator):
             self._to_recv = [None] * self.dim
             self._pitches_host = [None] * self.dim
             self._pitches_buff = [None] * self.dim
+            self._region_size = [None] * self.dim
+            self._r_orig = [None] * self.dim
+            self._br_orig = [None] * self.dim
             self.mpi_type_diff_l = {}
             self.mpi_type_diff_r = {}
             self.profiler += FProfiler('comm_diffusion')
@@ -72,16 +76,24 @@ class GPUDiffusion(DiscreteOperator, GPUOperator):
                 # _to_recv[..., 0] contains [..., Nz] data (right ghosts)
                 # _to_recv[..., 1] contains [..., -1] data (left ghosts)
                 self._to_send[d] = npw.zeros(tuple(shape))
-                self._to_recv[d] = npw.zeros(tuple(shape))
+                _to_recv = npw.zeros(tuple(shape))
                 self.mpi_type_diff_l[d] = PARMES_MPI_REAL.Create_subarray(
                     shape, shape_b, start_l, order=ORDERMPI)
                 self.mpi_type_diff_l[d].Commit()
                 self.mpi_type_diff_r[d] = PARMES_MPI_REAL.Create_subarray(
                     shape, shape_b, start_r, order=ORDERMPI)
                 self.mpi_type_diff_r[d].Commit()
-                self._to_recv_buf[d] = cl.Buffer(
-                    self.cl_env.ctx, cl.mem_flags.READ_WRITE,
-                    size=self._to_recv[d].nbytes)
+                self._to_recv_buf[d] = self.cl_env.global_allocation(_to_recv)
+                self._to_recv[d], evt = cl.enqueue_map_buffer(
+                    self.cl_env.queue,
+                    self._to_recv_buf[d],
+                    offset=0,
+                    shape=shape,
+                    dtype=PARMES_REAL,
+                    flags=cl.map_flags.READ | cl.map_flags.WRITE,
+                    is_blocking=False,
+                    order=ORDER)
+                evt.wait()
                 self._pitches_host[d] = (int(self._to_send[d][:, 0, 0].nbytes),
                                          int(self._to_send[d][:, :, 0].nbytes))
                 self._pitches_buff[d] = (int(self.field.data[0][:, 0, 0].nbytes),
@@ -94,6 +106,29 @@ class GPUDiffusion(DiscreteOperator, GPUOperator):
                     host_origin=(0, 0, 0),
                     region=(self._to_recv[d][0, 0, 0].nbytes, )).wait()
                 self._cl_work_size += self._to_recv[d].nbytes
+
+                r_orig = [0, ] * self.dim
+                br_orig = [0, ] * self.dim
+                r_orig[d] = self.field.data[0].shape[d] - 1
+                br_orig[d] = 1
+                if d == 0:
+                    r_orig[d] *= self._to_send[d][0, 0, 0].nbytes
+                    br_orig[d] *= self._to_send[d][0, 0, 0].nbytes
+                self._r_orig[d] = tuple(r_orig)
+                self._br_orig[d] = tuple(br_orig)
+                l_sl = [slice(None), ] * 3
+                r_sl = [slice(None), ] * 3
+                l_sl[d] = slice(0, 1)
+                r_sl[d] = slice(1, 2)
+                l_sl = tuple(l_sl)
+                r_sl = tuple(r_sl)
+                self._region_size[d] = list(self.field.data[0].shape)
+                if d == 0:
+                    self._region_size[d][0] = self._to_send[d][0, 0, 0].nbytes
+                else:
+                    self._region_size[d][0] = self._to_send[d][:, 0, 0].nbytes
+                    self._region_size[d][d] = 1
+
             self._compute = self._compute_diffusion_comm
         else:
             self._compute = self._compute_diffusion
@@ -147,74 +182,66 @@ class GPUDiffusion(DiscreteOperator, GPUOperator):
         topo = self.field.topology
         first_cut_dir = topo.cutdir.tolist().index(True)
         wait_evt = []
+        send_l = [None, ] * self.dim
+        send_r = [None, ] * self.dim
+        recv_l = [None, ] * self.dim
+        recv_r = [None, ] * self.dim
+        e_l = [None, ] * self.dim
+        e_r = [None, ] * self.dim
         for d in self._cutdir_list:
-            r_orig = [0, ] * self.dim
-            br_orig = [0, ] * self.dim
-            r_orig[d] = self.field.data[0].shape[d] - 1
-            br_orig[d] = 1
-            if d == 0:
-                r_orig[d] *= self._to_send[d][0, 0, 0].nbytes
-                br_orig[d] *= self._to_send[d][0, 0, 0].nbytes
-            r_orig = tuple(r_orig)
-            br_orig = tuple(br_orig)
-            l_sl = [slice(None), ] * 3
-            r_sl = [slice(None), ] * 3
-            l_sl[d] = slice(0, 1)
-            r_sl[d] = slice(1, 2)
-            l_sl = tuple(l_sl)
-            r_sl = tuple(r_sl)
-
-            region_size = list(self.field.data[0].shape)
-            if d == 0:
-                region_size[0] = self._to_send[d][0, 0, 0].nbytes
-            else:
-                region_size[0] = self._to_send[d][:, 0, 0].nbytes
-                region_size[d] = 1
-
             wait_events = self.field.events
-            e_l = cl.enqueue_copy(self.cl_env.queue, self._to_send[d],
-                                  self.field.gpu_data[0],
-                                  host_origin=(0, 0, 0),
-                                  buffer_origin=(0, 0, 0),
-                                  host_pitches=self._pitches_host[d],
-                                  buffer_pitches=self._pitches_buff[d],
-                                  region=tuple(region_size),
-                                  wait_for=wait_events)
-            e_r = cl.enqueue_copy(self.cl_env.queue, self._to_send[d],
-                                  self.field.gpu_data[0],
-                                  host_origin=br_orig, buffer_origin=r_orig,
-                                  host_pitches=self._pitches_host[d],
-                                  buffer_pitches=self._pitches_buff[d],
-                                  region=tuple(region_size),
-                                  wait_for=wait_events)
+            e_l[d] = cl.enqueue_copy(self.cl_env.queue, self._to_send[d],
+                                     self.field.gpu_data[0],
+                                     host_origin=(0, 0, 0),
+                                     buffer_origin=(0, 0, 0),
+                                     host_pitches=self._pitches_host[d],
+                                     buffer_pitches=self._pitches_buff[d],
+                                     region=tuple(self._region_size[d]),
+                                     wait_for=wait_events,
+                                     is_blocking=False)
+            e_r[d] = cl.enqueue_copy(self.cl_env.queue, self._to_send[d],
+                                     self.field.gpu_data[0],
+                                     host_origin=self._br_orig[d],
+                                     buffer_origin=self._r_orig[d],
+                                     host_pitches=self._pitches_host[d],
+                                     buffer_pitches=self._pitches_buff[d],
+                                     region=tuple(self._region_size[d]),
+                                     wait_for=wait_events,
+                                     is_blocking=False)
 
+        for d in self._cutdir_list:
             # MPI send
-            r_send = []
-            r_recv = []
             R_rk = topo.neighbours[1, d - first_cut_dir]
             L_rk = topo.neighbours[0, d - first_cut_dir]
-            r_recv.append(self._comm.Irecv(
+            recv_r[d] = self._comm.Irecv(
                 [self._to_recv[d], 1, self.mpi_type_diff_l[d]],
-                source=R_rk, tag=123 + R_rk))
-            r_recv.append(self._comm.Irecv(
+                source=R_rk, tag=123 + R_rk + 19 * d)
+            recv_l[d] = self._comm.Irecv(
                 [self._to_recv[d], 1, self.mpi_type_diff_r[d]],
-                source=L_rk, tag=456 + L_rk))
-            e_l.wait()
-            e_r.wait()
-            r_send.append(self._comm.Issend(
+                source=L_rk, tag=456 + L_rk + 17 * d)
+        for d in self._cutdir_list:
+            R_rk = topo.neighbours[1, d - first_cut_dir]
+            L_rk = topo.neighbours[0, d - first_cut_dir]
+            e_l[d].wait()
+            e_r[d].wait()
+            send_l[d] = self._comm.Issend(
                 [self._to_send[d], 1, self.mpi_type_diff_l[d]],
-                dest=L_rk, tag=123 + self._comm_rank))
-            r_send.append(self._comm.Issend(
+                dest=L_rk, tag=123 + self._comm_rank + 19 * d)
+            send_r[d] = self._comm.Issend(
                 [self._to_send[d], 1, self.mpi_type_diff_r[d]],
-                dest=R_rk, tag=456 + self._comm_rank))
-            for r in r_send + r_recv:
-                r.Wait()
+                dest=R_rk, tag=456 + self._comm_rank + 17 * d)
 
+        for d in self._cutdir_list:
             # _to_recv[..., 0] contains [..., Nz] data (right ghosts)
             # _to_recv[..., 1] contains [..., -1] data (left ghosts)
+            send_r[d].Wait()
+            send_l[d].Wait()
+            recv_r[d].Wait()
+            recv_l[d].Wait()
             wait_evt.append(cl.enqueue_copy(self.cl_env.queue,
                                             self._to_recv_buf[d],
-                                            self._to_recv[d]))
+                                            self._to_recv[d],
+                                            is_blocking=False))
         self.profiler['comm_diffusion'] += MPI.Wtime() - tc
 
         if len(self._cutdir_list) == 1:
diff --git a/HySoP/hysop/gpu/multi_gpu_particle_advection.py b/HySoP/hysop/gpu/multi_gpu_particle_advection.py
index 7ecc839c8..b05f17395 100644
--- a/HySoP/hysop/gpu/multi_gpu_particle_advection.py
+++ b/HySoP/hysop/gpu/multi_gpu_particle_advection.py
@@ -710,6 +710,7 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
                 [self._s_l_buff[self._s_buff_block_slice[b]],
                  self._s_elem_block, PARMES_MPI_REAL],
                 dest=self._L_rk, tag=333 + self._comm_rank + 17 * b)
+        ctime_send_l = MPI.Wtime() - ctime
 
         # Fill and get the right buffer
         evt_comm_r = self._num_comm_r(wait_evts, dt)
@@ -725,12 +726,14 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
                 is_blocking=False,
                 wait_for=[evt_comm_r])
         # Send the right buffer
+        ctime = MPI.Wtime()
         for b in xrange(self._s_n_blocks):
             self._evt_get_r[b].wait()
             self._r_send[b] = self._comm.Issend(
                 [self._s_r_buff[self._s_buff_block_slice[b]],
                  self._s_elem_block, PARMES_MPI_REAL],
                 dest=self._R_rk, tag=888 + self._comm_rank + 19 * b)
+        ctime_send_r = MPI.Wtime() - ctime
 
         # remesh in-domain particles and get left-right layer
         evt = self._num_comm(wait_evts, dt)
@@ -758,13 +761,13 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
             is_blocking=False,
             wait_for=self.evt_num_remesh)
 
-        self.profiler['comm_cpu_remesh'] += MPI.Wtime() - ctime
-
+        ctime = MPI.Wtime()
         # Wait MPI transfer of data from left, add them to local data and send back to device
         for b in xrange(self._s_n_blocks):
             self._r_send[b].Wait()
             self._l_recv[b].Wait()
         evt_get_locl.wait()
+        ctime_wait_l = MPI.Wtime() - ctime
 
         calctime = MPI.Wtime()
         self._s_locl_buff += self._s_froml_buff
@@ -781,10 +784,12 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
             is_blocking=False)
 
         # Wait MPI transfer of data from right, add them to local data and send back to device
+        ctime = MPI.Wtime()
         for b in xrange(self._s_n_blocks):
             self._l_send[b].Wait()
             self._r_recv[b].Wait()
         evt_get_locr.wait()
+        ctime_wait_r = MPI.Wtime() - ctime
         calctime = MPI.Wtime()
         self._s_locr_buff += self._s_fromr_buff
         self.profiler['comm_calc_remesh'] += MPI.Wtime() - calctime
@@ -805,6 +810,8 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
 
         self.fields_on_grid[0].events.append(evt_set_locr)
         self.fields_on_grid[0].events.append(evt_set_locl)
+        self.profiler['comm_cpu_remesh'] += ctime_wait_r + ctime_wait_l + \
+            ctime_send_r + ctime_send_l
 
         if CL_PROFILE:
             rmsh_gpu_time = 0.
@@ -839,13 +846,11 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
                   self.transpose_xz, self.transpose_xz_r,
                   self.num_advec,
                   self.num_remesh,
-                  self.num_advec_and_remesh]:
+                  self.num_advec_and_remesh,
+                  self.num_remesh_comm_l,
+                  self.num_remesh_comm_r,
+                  self.num_advec_and_remesh_comm_l,
+                  self.num_advec_and_remesh_comm_r]:
             if k is not None:
                 for p in k.profile:
                     self.profiler += p
-        if self._is2kernel:
-            self.profiler += self.num_remesh_comm_l.profile[0]
-            self.profiler += self.num_remesh_comm_r.profile[0]
-        else:
-            self.profiler += self.num_advec_and_remesh_comm_l.profile[0]
-            self.profiler += self.num_advec_and_remesh_comm_r.profile[0]
-- 
GitLab