Commit 649a6edc authored by Jean-Matthieu Etancelin, committed by Franck Pérignon

Improve multi-GPU advection profiling. Add overlapped communications in diffusion

parent b45621b9
@@ -3,7 +3,8 @@
 Diffusion on GPU
 """
-from parmepy.constants import debug, np, S_DIR, PARMES_MPI_REAL, ORDERMPI
+from parmepy.constants import debug, np, S_DIR, PARMES_MPI_REAL, ORDERMPI, \
+    PARMES_REAL, ORDER
 import parmepy.tools.numpywrappers as npw
 from parmepy.operator.discrete.discrete import DiscreteOperator
 from parmepy.gpu import cl
@@ -43,7 +44,7 @@ class GPUDiffusion(DiscreteOperator, GPUOperator):
         if alloc:
             self.size_global_alloc += self.field.mem_size
-        self.field_tmp=None
+        self.field_tmp = field_tmp
         topo = self.field.topology
         self._cutdir_list = np.where(topo.cutdir)[0].tolist()
@@ -56,6 +57,9 @@ class GPUDiffusion(DiscreteOperator, GPUOperator):
         self._to_recv = [None] * self.dim
         self._pitches_host = [None] * self.dim
         self._pitches_buff = [None] * self.dim
+        self._region_size = [None] * self.dim
+        self._r_orig = [None] * self.dim
+        self._br_orig = [None] * self.dim
         self.mpi_type_diff_l = {}
         self.mpi_type_diff_r = {}
         self.profiler += FProfiler('comm_diffusion')
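The new comm_diffusion counter tracks MPI exchange time (see the timing line at the end of the communication hunk below). parmepy's profiling classes are not part of this diff; a purely hypothetical stand-in, consistent only with the three usages visible here (`profiler += FProfiler(name)`, `profiler[name] += seconds`, `profiler += p`), might look like:

```python
# Hypothetical stand-in for parmepy's FProfiler/Profiler pair (not shown in
# this diff); it only mirrors the usage visible in the surrounding hunks.
class FProfiler(object):
    """A named, accumulating wall-clock counter."""
    def __init__(self, name):
        self.name = name
        self.total = 0.0

    def __iadd__(self, seconds):          # profiler['comm_diffusion'] += dt
        self.total += seconds
        return self


class Profiler(object):
    """`profiler += FProfiler(...)` registers a counter, `profiler[n]` looks it up."""
    def __init__(self):
        self._counters = {}

    def __iadd__(self, counter):          # self.profiler += FProfiler(...)
        self._counters[counter.name] = counter
        return self

    def __getitem__(self, name):
        return self._counters[name]

    def __setitem__(self, name, counter):  # needed because += rebinds the item
        self._counters[name] = counter
```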
@@ -72,16 +76,24 @@ class GPUDiffusion(DiscreteOperator, GPUOperator):
             # _to_recv[..., 0] contains [..., Nz] data (right ghosts)
             # _to_recv[..., 1] contains [..., -1] data (left ghosts)
             self._to_send[d] = npw.zeros(tuple(shape))
-            self._to_recv[d] = npw.zeros(tuple(shape))
+            _to_recv = npw.zeros(tuple(shape))
             self.mpi_type_diff_l[d] = PARMES_MPI_REAL.Create_subarray(
                 shape, shape_b, start_l, order=ORDERMPI)
             self.mpi_type_diff_l[d].Commit()
             self.mpi_type_diff_r[d] = PARMES_MPI_REAL.Create_subarray(
                 shape, shape_b, start_r, order=ORDERMPI)
             self.mpi_type_diff_r[d].Commit()
-            self._to_recv_buf[d] = cl.Buffer(
-                self.cl_env.ctx, cl.mem_flags.READ_WRITE,
-                size=self._to_recv[d].nbytes)
+            self._to_recv_buf[d] = self.cl_env.global_allocation(_to_recv)
+            self._to_recv[d], evt = cl.enqueue_map_buffer(
+                self.cl_env.queue,
+                self._to_recv_buf[d],
+                offset=0,
+                shape=shape,
+                dtype=PARMES_REAL,
+                flags=cl.map_flags.READ | cl.map_flags.WRITE,
+                is_blocking=False,
+                order=ORDER)
+            evt.wait()
             self._pitches_host[d] = (int(self._to_send[d][:, 0, 0].nbytes),
                                      int(self._to_send[d][:, :, 0].nbytes))
             self._pitches_buff[d] = (int(self.field.data[0][:, 0, 0].nbytes),
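This hunk combines two mechanisms: an MPI subarray datatype that addresses exactly one ghost layer of a host buffer, and a device buffer mapped into host memory with `enqueue_map_buffer` so that MPI can receive straight into the mapped view. A self-contained sketch of the same pattern follows; shapes, the context setup, and the float64/Fortran-order choices are assumptions, and a plain pyopencl Buffer with ALLOC_HOST_PTR stands in for parmepy's `cl_env.global_allocation`:

```python
# Hedged sketch, not parmepy code: sizes, dtype and ordering are assumptions.
import numpy as np
import pyopencl as cl
from mpi4py import MPI

nx, ny = 8, 8
shape = (nx, ny, 2)          # two ghost layers stacked along the last axis
layer = (nx, ny, 1)

# One MPI datatype per layer: sending/receiving "1 element" of the type
# moves exactly that layer of the (nx, ny, 2) buffer.
type_l = MPI.DOUBLE.Create_subarray(shape, layer, (0, 0, 0),
                                    order=MPI.ORDER_FORTRAN)
type_l.Commit()
type_r = MPI.DOUBLE.Create_subarray(shape, layer, (0, 0, 1),
                                    order=MPI.ORDER_FORTRAN)
type_r.Commit()

ctx = cl.create_some_context()
queue = cl.CommandQueue(ctx)
nbytes = int(np.prod(shape)) * np.dtype(np.float64).itemsize

# Device buffer whose storage can be mapped into the host address space.
recv_buf = cl.Buffer(ctx, cl.mem_flags.READ_WRITE | cl.mem_flags.ALLOC_HOST_PTR,
                     size=nbytes)
to_recv, evt = cl.enqueue_map_buffer(
    queue, recv_buf, cl.map_flags.READ | cl.map_flags.WRITE,
    0, shape, np.float64, order="F", is_blocking=False)
evt.wait()

# MPI can now receive directly into the mapped view, e.g.:
#   comm.Irecv([to_recv, 1, type_l], source=..., tag=...)
# after which cl.enqueue_copy(queue, recv_buf, to_recv) updates the device.
```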
@@ -94,6 +106,29 @@ class GPUDiffusion(DiscreteOperator, GPUOperator):
                 host_origin=(0, 0, 0),
                 region=(self._to_recv[d][0, 0, 0].nbytes, )).wait()
             self._cl_work_size += self._to_recv[d].nbytes
+            r_orig = [0, ] * self.dim
+            br_orig = [0, ] * self.dim
+            r_orig[d] = self.field.data[0].shape[d] - 1
+            br_orig[d] = 1
+            if d == 0:
+                r_orig[d] *= self._to_send[d][0, 0, 0].nbytes
+                br_orig[d] *= self._to_send[d][0, 0, 0].nbytes
+            self._r_orig[d] = tuple(r_orig)
+            self._br_orig[d] = tuple(br_orig)
+            l_sl = [slice(None), ] * 3
+            r_sl = [slice(None), ] * 3
+            l_sl[d] = slice(0, 1)
+            r_sl[d] = slice(1, 2)
+            l_sl = tuple(l_sl)
+            r_sl = tuple(r_sl)
+            self._region_size[d] = list(self.field.data[0].shape)
+            if d == 0:
+                self._region_size[d][0] = self._to_send[d][0, 0, 0].nbytes
+            else:
+                self._region_size[d][0] = self._to_send[d][:, 0, 0].nbytes
+            self._region_size[d][d] = 1
             self._compute = self._compute_diffusion_comm
         else:
             self._compute = self._compute_diffusion
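The origins and region precomputed here feed pyopencl's rectangular copy path (clEnqueueReadBufferRect under the hood), where the first component of an origin or region is expressed in bytes and the remaining ones in rows and slices, which is why only the d == 0 component is scaled by the item size. A standalone sketch of such a copy, with assumed shapes and context setup:

```python
# Hedged sketch of a rectangular device-to-host copy in the style used by the
# operator: the z = nz-1 layer of a Fortran-ordered 3D device array lands in
# layer 1 of a small (nx, ny, 2) host buffer. All names/sizes are illustrative.
import numpy as np
import pyopencl as cl

ctx = cl.create_some_context()
queue = cl.CommandQueue(ctx)

nx, ny, nz = 8, 8, 8
field = np.asfortranarray(np.random.rand(nx, ny, nz))
gpu_field = cl.Buffer(ctx, cl.mem_flags.READ_WRITE | cl.mem_flags.COPY_HOST_PTR,
                      hostbuf=field)

item = field.itemsize
to_send = np.zeros((nx, ny, 2), dtype=np.float64, order='F')
host_pitches = (nx * item, nx * ny * item)   # (row, slice) pitches in bytes
buff_pitches = (nx * item, nx * ny * item)

# Origins and regions mix units: first component in bytes, the rest in rows
# and slices. Here: read device slice z = nz-1 into host slice z = 1.
evt = cl.enqueue_copy(queue, to_send, gpu_field,
                      host_origin=(0, 0, 1),
                      buffer_origin=(0, 0, nz - 1),
                      host_pitches=host_pitches,
                      buffer_pitches=buff_pitches,
                      region=(nx * item, ny, 1),
                      is_blocking=False)
evt.wait()
assert np.allclose(to_send[:, :, 1], field[:, :, nz - 1])
```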
@@ -147,74 +182,66 @@ class GPUDiffusion(DiscreteOperator, GPUOperator):
         topo = self.field.topology
         first_cut_dir = topo.cutdir.tolist().index(True)
         wait_evt = []
+        send_l = [None, ] * self.dim
+        send_r = [None, ] * self.dim
+        recv_l = [None, ] * self.dim
+        recv_r = [None, ] * self.dim
+        e_l = [None, ] * self.dim
+        e_r = [None, ] * self.dim
         for d in self._cutdir_list:
-            r_orig = [0, ] * self.dim
-            br_orig = [0, ] * self.dim
-            r_orig[d] = self.field.data[0].shape[d] - 1
-            br_orig[d] = 1
-            if d == 0:
-                r_orig[d] *= self._to_send[d][0, 0, 0].nbytes
-                br_orig[d] *= self._to_send[d][0, 0, 0].nbytes
-            r_orig = tuple(r_orig)
-            br_orig = tuple(br_orig)
-            l_sl = [slice(None), ] * 3
-            r_sl = [slice(None), ] * 3
-            l_sl[d] = slice(0, 1)
-            r_sl[d] = slice(1, 2)
-            l_sl = tuple(l_sl)
-            r_sl = tuple(r_sl)
-            region_size = list(self.field.data[0].shape)
-            if d == 0:
-                region_size[0] = self._to_send[d][0, 0, 0].nbytes
-            else:
-                region_size[0] = self._to_send[d][:, 0, 0].nbytes
-            region_size[d] = 1
             wait_events = self.field.events
-            e_l = cl.enqueue_copy(self.cl_env.queue, self._to_send[d],
-                                  self.field.gpu_data[0],
-                                  host_origin=(0, 0, 0),
-                                  buffer_origin=(0, 0, 0),
-                                  host_pitches=self._pitches_host[d],
-                                  buffer_pitches=self._pitches_buff[d],
-                                  region=tuple(region_size),
-                                  wait_for=wait_events)
-            e_r = cl.enqueue_copy(self.cl_env.queue, self._to_send[d],
-                                  self.field.gpu_data[0],
-                                  host_origin=br_orig, buffer_origin=r_orig,
-                                  host_pitches=self._pitches_host[d],
-                                  buffer_pitches=self._pitches_buff[d],
-                                  region=tuple(region_size),
-                                  wait_for=wait_events)
+            e_l[d] = cl.enqueue_copy(self.cl_env.queue, self._to_send[d],
+                                     self.field.gpu_data[0],
+                                     host_origin=(0, 0, 0),
+                                     buffer_origin=(0, 0, 0),
+                                     host_pitches=self._pitches_host[d],
+                                     buffer_pitches=self._pitches_buff[d],
+                                     region=tuple(self._region_size[d]),
+                                     wait_for=wait_events,
+                                     is_blocking=False)
+            e_r[d] = cl.enqueue_copy(self.cl_env.queue, self._to_send[d],
+                                     self.field.gpu_data[0],
+                                     host_origin=self._br_orig[d],
+                                     buffer_origin=self._r_orig[d],
+                                     host_pitches=self._pitches_host[d],
+                                     buffer_pitches=self._pitches_buff[d],
+                                     region=tuple(self._region_size[d]),
+                                     wait_for=wait_events,
+                                     is_blocking=False)
+        for d in self._cutdir_list:
             # MPI send
-            r_send = []
-            r_recv = []
             R_rk = topo.neighbours[1, d - first_cut_dir]
             L_rk = topo.neighbours[0, d - first_cut_dir]
-            r_recv.append(self._comm.Irecv(
+            recv_r[d] = self._comm.Irecv(
                 [self._to_recv[d], 1, self.mpi_type_diff_l[d]],
-                source=R_rk, tag=123 + R_rk))
-            r_recv.append(self._comm.Irecv(
+                source=R_rk, tag=123 + R_rk + 19 * d)
+            recv_l[d] = self._comm.Irecv(
                 [self._to_recv[d], 1, self.mpi_type_diff_r[d]],
-                source=L_rk, tag=456 + L_rk))
-            e_l.wait()
-            e_r.wait()
-            r_send.append(self._comm.Issend(
+                source=L_rk, tag=456 + L_rk + 17 * d)
+        for d in self._cutdir_list:
+            R_rk = topo.neighbours[1, d - first_cut_dir]
+            L_rk = topo.neighbours[0, d - first_cut_dir]
+            e_l[d].wait()
+            e_r[d].wait()
+            send_l[d] = self._comm.Issend(
                 [self._to_send[d], 1, self.mpi_type_diff_l[d]],
-                dest=L_rk, tag=123 + self._comm_rank))
-            r_send.append(self._comm.Issend(
+                dest=L_rk, tag=123 + self._comm_rank + 19 * d)
+            send_r[d] = self._comm.Issend(
                 [self._to_send[d], 1, self.mpi_type_diff_r[d]],
-                dest=R_rk, tag=456 + self._comm_rank))
-            for r in r_send + r_recv:
-                r.Wait()
+                dest=R_rk, tag=456 + self._comm_rank + 17 * d)
+        for d in self._cutdir_list:
             # _to_recv[..., 0] contains [..., Nz] data (right ghosts)
             # _to_recv[..., 1] contains [..., -1] data (left ghosts)
+            send_r[d].Wait()
+            send_l[d].Wait()
+            recv_r[d].Wait()
+            recv_l[d].Wait()
             wait_evt.append(cl.enqueue_copy(self.cl_env.queue,
                                             self._to_recv_buf[d],
                                             self._to_recv[d],
-                                            self._to_recv[d]))
+                                            is_blocking=False))
         self.profiler['comm_diffusion'] += MPI.Wtime() - tc
         if len(self._cutdir_list) == 1:
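This restructuring is the classic communication/computation overlap pattern: post all receives, start all device-to-host copies, issue each send once its copy has completed, and defer the Wait() calls until the data is actually needed. Stripped of the OpenCL side, the MPI skeleton looks like the sketch below (ranks on a ring; buffer sizes are illustrative, and the per-dimension tag offsets of the form `19 * d` / `17 * d` used above are elided):

```python
# Minimal mpi4py sketch of the nonblocking ghost exchange skeleton
# (run with mpiexec -n 2 or more); device copies and ghost layouts elided.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
L_rk, R_rk = (rank - 1) % size, (rank + 1) % size

to_send = np.full(4, float(rank))
to_recv = np.empty(4)

# Post both receives first so that matching sends can complete eagerly.
recv_r = comm.Irecv(to_recv[0:2], source=R_rk, tag=123 + R_rk)
recv_l = comm.Irecv(to_recv[2:4], source=L_rk, tag=456 + L_rk)
# ... the real code waits on the device->host copy events here ...
send_l = comm.Issend(to_send[0:2], dest=L_rk, tag=123 + rank)
send_r = comm.Issend(to_send[2:4], dest=R_rk, tag=456 + rank)

# Independent work (e.g. interior computation) can proceed here.
MPI.Request.Waitall([send_l, send_r, recv_l, recv_r])
assert np.all(to_recv[0:2] == R_rk) and np.all(to_recv[2:4] == L_rk)
```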
@@ -710,6 +710,7 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
                 [self._s_l_buff[self._s_buff_block_slice[b]],
                  self._s_elem_block, PARMES_MPI_REAL],
                 dest=self._L_rk, tag=333 + self._comm_rank + 17 * b)
+        ctime_send_l = MPI.Wtime() - ctime
         # Fill and get the right buffer
         evt_comm_r = self._num_comm_r(wait_evts, dt)
@@ -725,12 +726,14 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
             is_blocking=False,
             wait_for=[evt_comm_r])
         # Send the right buffer
+        ctime = MPI.Wtime()
         for b in xrange(self._s_n_blocks):
             self._evt_get_r[b].wait()
             self._r_send[b] = self._comm.Issend(
                 [self._s_r_buff[self._s_buff_block_slice[b]],
                  self._s_elem_block, PARMES_MPI_REAL],
                 dest=self._R_rk, tag=888 + self._comm_rank + 19 * b)
+        ctime_send_r = MPI.Wtime() - ctime
         # remesh in-domain particles and get left-right layer
         evt = self._num_comm(wait_evts, dt)
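The added timing lines all follow one pattern: record `MPI.Wtime()` before a communication phase, store the delta, and fold the deltas into the profiler once per step (see the accumulation hunk further down). As a minimal illustration, with a plain dict standing in for parmepy's profiler:

```python
# Minimal sketch of the wall-clock bracketing added in this commit; the dict
# stands in for parmepy's Profiler/FProfiler machinery, and the elided bodies
# are the Issend loops shown in the hunks above.
from mpi4py import MPI

profiler = {'comm_cpu_remesh': 0.0}

ctime = MPI.Wtime()
# ... post Issend calls for the left buffer ...
ctime_send_l = MPI.Wtime() - ctime

ctime = MPI.Wtime()
# ... post Issend calls for the right buffer ...
ctime_send_r = MPI.Wtime() - ctime

# Accumulated once per time step, after the waits have been timed too:
profiler['comm_cpu_remesh'] += ctime_send_l + ctime_send_r
```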
@@ -758,13 +761,13 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
             is_blocking=False,
             wait_for=self.evt_num_remesh)
         self.profiler['comm_cpu_remesh'] += MPI.Wtime() - ctime
+        ctime = MPI.Wtime()
         # Wait MPI transfer of data from left, add them to local data and send back to device
         for b in xrange(self._s_n_blocks):
             self._r_send[b].Wait()
             self._l_recv[b].Wait()
         evt_get_locl.wait()
+        ctime_wait_l = MPI.Wtime() - ctime
         calctime = MPI.Wtime()
         self._s_locl_buff += self._s_froml_buff
@@ -781,10 +784,12 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
             is_blocking=False)
         # Wait MPI transfer of data from right, add them to local data and send back to device
+        ctime = MPI.Wtime()
         for b in xrange(self._s_n_blocks):
             self._l_send[b].Wait()
             self._r_recv[b].Wait()
         evt_get_locr.wait()
+        ctime_wait_r = MPI.Wtime() - ctime
         calctime = MPI.Wtime()
         self._s_locr_buff += self._s_fromr_buff
         self.profiler['comm_calc_remesh'] += MPI.Wtime() - calctime
@@ -805,6 +810,8 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
         self.fields_on_grid[0].events.append(evt_set_locr)
         self.fields_on_grid[0].events.append(evt_set_locl)
+        self.profiler['comm_cpu_remesh'] += ctime_wait_r + ctime_wait_l + \
+            ctime_send_r + ctime_send_l
         if CL_PROFILE:
             rmsh_gpu_time = 0.
@@ -839,13 +846,11 @@ class MultiGPUParticleAdvection(GPUParticleAdvection):
                   self.transpose_xz, self.transpose_xz_r,
                   self.num_advec,
                   self.num_remesh,
-                  self.num_advec_and_remesh]:
+                  self.num_advec_and_remesh,
+                  self.num_remesh_comm_l,
+                  self.num_remesh_comm_r,
+                  self.num_advec_and_remesh_comm_l,
+                  self.num_advec_and_remesh_comm_r]:
             if k is not None:
                 for p in k.profile:
                     self.profiler += p
-        if self._is2kernel:
-            self.profiler += self.num_remesh_comm_l.profile[0]
-            self.profiler += self.num_remesh_comm_r.profile[0]
-        else:
-            self.profiler += self.num_advec_and_remesh_comm_l.profile[0]
-            self.profiler += self.num_advec_and_remesh_comm_r.profile[0]
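This last hunk folds the communication kernels into the same list as the others, so the generic `if k is not None` loop replaces the special-cased `_is2kernel` branch. A reduced, self-contained illustration of the resulting aggregation:

```python
# Sketch of the aggregation loop: optional kernels sit in one list, None
# entries are skipped, every profile entry is accumulated. Kern is a
# hypothetical stand-in for a kernel object exposing a .profile list.
class Kern(object):
    def __init__(self, *times):
        self.profile = list(times)

total = 0.0
for k in [Kern(0.1, 0.2), None, Kern(0.3)]:
    if k is not None:
        for p in k.profile:
            total += p
print(total)  # 0.6
```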