From 8c0c77a7956eaad909d31a774302c9c6eab1a7cd Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Keck <Jean-Baptiste.Keck@imag.fr> Date: Fri, 9 Mar 2018 17:20:35 +0100 Subject: [PATCH] rect views --- hysop/backend/device/opencl/opencl_array.py | 32 +- .../opencl/opencl_copy_kernel_launchers.py | 25 +- .../device/opencl/opencl_kernel_launcher.py | 123 +++-- .../operator/directional/advection_dir.py | 77 +--- hysop/backend/host/host_array_backend.py | 13 +- hysop/fields/cartesian_discrete_field.py | 22 +- hysop/fields/ghost_exchangers.py | 421 ++++++++++++------ hysop/fields/tests/test_cartesian.py | 29 +- hysop/fields/tests/test_cartesian.sh | 2 +- 9 files changed, 447 insertions(+), 297 deletions(-) diff --git a/hysop/backend/device/opencl/opencl_array.py b/hysop/backend/device/opencl/opencl_array.py index df0d27932..a3fddfe21 100644 --- a/hysop/backend/device/opencl/opencl_array.py +++ b/hysop/backend/device/opencl/opencl_array.py @@ -1,7 +1,7 @@ import numpy as np -from hysop.tools.types import check_instance +from hysop.tools.types import check_instance, first_not_None from hysop.backend.device.opencl import cl, clArray, clTools from hysop.backend.device.opencl.opencl_env import OpenClEnvironment from hysop.backend.device.opencl.opencl_array_backend import OpenClArrayBackend @@ -115,7 +115,21 @@ class OpenClArray(Array): Returns a HostArray, view or copy of this array. """ queue = self.backend.check_queue(queue) - host_array = self._call('get', queue=queue, ary=ary, async=async) + if self.flags.forc: + host_array = self._call('get', queue=queue, ary=ary, async=async) + else: + from hysop.backend.device.opencl.opencl_copy_kernel_launchers import \ + OpenClCopyBufferRectLauncher + assert not async + if (ary is not None): + host_array = ary + else: + host_array = self.backend.host_array_backend.empty_like(self) + kl = OpenClCopyBufferRectLauncher.from_slices(varname='buffer', src=self, + dst=host_array) + evt = kl(queue=queue) + evt.wait() + if handle: return host_array._handle if (host_array.size == 1): @@ -125,7 +139,6 @@ class OpenClArray(Array): return host_array._handle[0] return host_array - # event managment def events(self): """ @@ -286,13 +299,22 @@ class OpenClArray(Array): queue=queue, async=async, **kwds) def setitem(self, subscript, value, queue=None): + queue = first_not_None(queue, self.default_queue) if np.isscalar(value): a = self[subscript] if (a is not None): a.fill(value=value, queue=queue) else: - self.handle.setitem(subscript=subscript, value=value, - queue=queue) + try: + self.handle.setitem(subscript=subscript, value=value, + queue=queue) + except ValueError: + from hysop.backend.device.opencl.opencl_copy_kernel_launchers import \ + OpenClCopyBufferRectLauncher + kl = OpenClCopyBufferRectLauncher.from_slices(varname='buffer', src=value, + dst=self, dst_slices=subscript) + evt = kl(queue=queue) + evt.wait() def __setitem__(self, *args): self.setitem(*args) diff --git a/hysop/backend/device/opencl/opencl_copy_kernel_launchers.py b/hysop/backend/device/opencl/opencl_copy_kernel_launchers.py index 5d18e1b2c..7835d6845 100644 --- a/hysop/backend/device/opencl/opencl_copy_kernel_launchers.py +++ b/hysop/backend/device/opencl/opencl_copy_kernel_launchers.py @@ -200,7 +200,8 @@ class OpenClCopyDevice2DeviceLauncher(OpenClCopyBufferLauncher): check_instance(dst_device_offset, (int, np.integer), allow_none=True) check_instance(byte_count, (int, np.integer), allow_none=True) assert (src_nbytes is None) or (dst_nbytes is None) or (src_nbytes == dst_nbytes) - super(OpenClCopyDevice2DeviceLauncher, self).__init__(varname=varname, src=src, dst=dst, + super(OpenClCopyDevice2DeviceLauncher, self).__init__(varname=varname, + src=src, dst=dst, src_device_offset=src_device_offset, dst_device_offset=dst_device_offset, byte_count=byte_count) @@ -331,21 +332,26 @@ class OpenClCopyBufferRectLauncher(OpenClCopyKernelLauncher): def _compute_region(cls, a, indices): # compute nelems and parameters check_instance(indices, tuple, values=tuple, size=a.ndim) + start_offset = 0 if isinstance(a, (np.ndarray,)): data = a elif isinstance(a, (HostArray,)): data = a.handle else: - data = a.data + try: + data = a.data + except clArray.ArrayHasOffsetError: + data = a.base_data + start_offset = a.offset if isinstance(a, Array): a = a.handle shape = a.shape strides = a.strides dtype = a.dtype - estart = tuple( idx[0] for idx in indices ) - estop = tuple( idx[1] for idx in indices ) - estep = tuple( idx[2] for idx in indices ) + estart = tuple( idx[0] for idx in indices ) + estop = tuple( idx[1] for idx in indices ) + estep = tuple( idx[2] for idx in indices ) assert len(shape) == len(strides) == len(estep) == len(estart) == len(estop) _estart, _estop, _estep = (npw.asintegerarray(_) for _ in (estart, estop, estep)) @@ -379,9 +385,8 @@ class OpenClCopyBufferRectLauncher(OpenClCopyKernelLauncher): assert pitches[-1] == dtype.itemsize pitches = pitches[:-1] region[-1] *= dtype.itemsize - origin[-1] *= dtype.itemsize + origin[-1] *= dtype.itemsize + start_offset - # ptr+=offset return data, region, origin, pitches @@ -411,8 +416,10 @@ class OpenClCopyBufferRectLauncher(OpenClCopyKernelLauncher): dst.shape, dst.dtype, dst_slices, '{}', '{}') - src_slices, src_dtype, src_nelems, src_bytes, src_indices = cls._format_slices(src, src_slices) - dst_slices, dst_dtype, dst_nelems, dst_bytes, dst_indices = cls._format_slices(dst, dst_slices) + src_slices, src_dtype, src_nelems, src_bytes, src_indices = \ + cls._format_slices(src, src_slices) + dst_slices, dst_dtype, dst_nelems, dst_bytes, dst_indices = \ + cls._format_slices(dst, dst_slices) msg0 = msg0.format(src_slices, dst_slices) if (src_bytes != dst_bytes): diff --git a/hysop/backend/device/opencl/opencl_kernel_launcher.py b/hysop/backend/device/opencl/opencl_kernel_launcher.py index 694851313..ceb7709eb 100644 --- a/hysop/backend/device/opencl/opencl_kernel_launcher.py +++ b/hysop/backend/device/opencl/opencl_kernel_launcher.py @@ -37,7 +37,8 @@ class OpenClKernelListLauncher(object): def push_copy_host_device(self, varname, src, dst, src_device_offset=None, dst_device_offset=None, byte_count=None): """Shortcut for OpenClCopyBuffer kernels creation.""" - from hysop.backend.device.opencl.opencl_copy_kernel_launchers import OpenClCopyBufferLauncher + from hysop.backend.device.opencl.opencl_copy_kernel_launchers import \ + OpenClCopyBufferLauncher kernel = OpenClCopyBufferLauncher(varname=varname, src=src, dst=dst, byte_count=byte_count, src_device_offset=src_device_offset, dst_device_offset=dst_device_offset) @@ -46,7 +47,8 @@ class OpenClKernelListLauncher(object): def push_copy_host_to_device(self, varname, src, dst, dst_device_offset=None): """Shortcut for OpenClCopyHost2Device kernels creation.""" - from hysop.backend.device.opencl.opencl_copy_kernel_launchers import OpenClCopyHost2DeviceLauncher + from hysop.backend.device.opencl.opencl_copy_kernel_launchers import \ + OpenClCopyHost2DeviceLauncher kernel = OpenClCopyHost2DeviceLauncher(varname=varname, src=src, dst=dst, dst_device_offset=dst_device_offset) self.push_kernels(kernel) @@ -54,7 +56,8 @@ class OpenClKernelListLauncher(object): def push_copy_device_to_host(self, varname, src, dst, src_device_offset=None): """Shortcut for OpenClCopyDevice2Host kernels creation.""" - from hysop.backend.device.opencl.opencl_copy_kernel_launchers import OpenClCopyDevice2HostLauncher + from hysop.backend.device.opencl.opencl_copy_kernel_launchers import \ + OpenClCopyDevice2HostLauncher kernel = OpenClCopyDevice2HostLauncher(varname=varname, src=src, dst=dst, src_device_offset=src_device_offset) @@ -64,7 +67,8 @@ class OpenClKernelListLauncher(object): def push_copy_device_to_device(self, varname, src, dst, src_device_offset=None, dst_device_offset=None, byte_count=None): """Shortcut for OpenClCopyDevice2Device kernels creation.""" - from hysop.backend.device.opencl.opencl_copy_kernel_launchers import OpenClCopyDevice2DeviceLauncher + from hysop.backend.device.opencl.opencl_copy_kernel_launchers import\ + OpenClCopyDevice2DeviceLauncher kernel = OpenClCopyDevice2DeviceLauncher(varname=varname, src=src, dst=dst, byte_count=byte_count, src_device_offset=src_device_offset, dst_device_offset=dst_device_offset) @@ -76,24 +80,28 @@ class OpenClKernelListLauncher(object): Push OpenClKernelLaunchers into the list. None values are ignored for convenience. """ - for kernel in kernel_launchers: - if (kernel is None): + for launcher in kernel_launchers: + if (launcher is None): pass - elif isinstance(kernel, OpenClKernelLauncherI): - if not kernel.global_size_configured(): + elif isinstance(launcher, LauncherI): + if not launcher.global_size_configured(): msg='OpenClKernelLauncher {} global_work_size has not been configured.' - msg=msg.format(kernel.name) + msg=msg.format(launcher.name) raise RuntimeError(msg) - if isinstance(kernel, OpenClParametrizedKernelLauncher): - parameters = {k: v[1] for (k,v) in kernel.parameters_map.iteritems()} - self._update_parameters_from_parametrized_kernel(kernel, parameters) - self._kernels += (kernel,) - elif isinstance(kernel, OpenClKernelListLauncher): - self._update_parameters_from_kernel_list_launcher(kernel) - self._kernels += kernel._kernels + if isinstance(launcher, OpenClParametrizedKernelLauncher): + parameters = {k: v[1] for (k,v) in launcher.parameters_map.iteritems()} + self._update_parameters_from_parametrized_kernel(launcher, parameters) + elif isinstance(launcher, HostLauncherI): + parameters = launcher.parameters() + self._update_parameters_from_parametrized_kernel(launcher, parameters) + self._kernels += (launcher,) + elif isinstance(launcher, OpenClKernelListLauncher): + self._update_parameters_from_kernel_list_launcher(launcher) + self._kernels += launcher._kernels else: - msg='Expected an OpenClKernelLauncher or a OpenClKernelListLauncher but got a {}.' - msg=msg.format(type(kernel)) + msg='Expected an OpenClKernelLauncher or a OpenClKernelListLauncher ' + msg+='but got a {}.' + msg=msg.format(type(launcher)) raise TypeError(msg) return self @@ -136,8 +144,11 @@ class OpenClKernelListLauncher(object): return evt def _update_parameters_from_parametrized_kernel(self, kernel, parameters): - """Update parameters of this kernel list launcher from a OpenClParametrizedKernelLauncher.""" - check_instance(kernel, OpenClParametrizedKernelLauncher) + """ + Update parameters of this kernel list launcher from a + OpenClParametrizedKernelLauncher (or HostLauncherI). + """ + check_instance(kernel, (OpenClParametrizedKernelLauncher, HostLauncherI)) check_instance(parameters, dict, keys=str, values=(type,npw.dtype)) sparameters = self._parameters for (pname, ptype) in parameters.iteritems(): @@ -196,15 +207,12 @@ class OpenClKernelListLauncher(object): statistics = property(_get_statistics) - -class OpenClKernelLauncherI(object): +class LauncherI(object): """ - Interface for any object that has the ability to enqueue a OpenCL kernel - without extra arguments. + Interface for any object that has the ability to be a launcher. """ - __metaclass__ = ABCMeta - + def __init__(self, name, **kwds): """ Create a OpenClKernelLauncher. @@ -216,7 +224,7 @@ class OpenClKernelLauncherI(object): kwds: dict Base class arguments. """ - super(OpenClKernelLauncherI, self).__init__(**kwds) + super(LauncherI, self).__init__(**kwds) check_instance(name, str) self._name = name self._events = () @@ -240,10 +248,6 @@ class OpenClKernelLauncherI(object): self._kernel_statistics += stats self._events = running return self._kernel_statistics - - name = property(_get_name) - events = property(_get_events) - statistics = property(_get_statistics) def _register_event(self, queue, evt): """ @@ -252,7 +256,21 @@ class OpenClKernelLauncherI(object): """ if (cl.command_queue_properties.PROFILING_ENABLE & queue.properties): self._events += (evt,) + + name = property(_get_name) + events = property(_get_events) + statistics = property(_get_statistics) + @abstractmethod + def __call__(self, queue=None, wait_for=None, **kwds): + """ + Launch with a specific queue. + Wait for wait_for events before computing. + If queue has profiling enabled, events are pushed into a local list of events + to compute kernel statistics when self.statistics is fetched. + """ + pass + @abstractmethod def global_size_configured(self): """ @@ -261,6 +279,12 @@ class OpenClKernelLauncherI(object): """ pass +class OpenClKernelLauncherI(LauncherI): + """ + Interface for any object that has the ability to enqueue a OpenCL kernel + without extra arguments. + """ + @abstractmethod def __call__(self, queue=None, wait_for=None, global_work_size=None, local_work_size=None, **kwds): @@ -277,25 +301,35 @@ class OpenClKernelLauncherI(object): if not __KERNEL_DEBUG__ or __TRACE_KERNELS__: return if isinstance(arg_type, npw.dtype) or \ - (isinstance(arg_type, tuple) and len(arg_type)==1 and isinstance(arg_type[0], npw.dtype)): + (isinstance(arg_type, tuple) and len(arg_type)==1 + and isinstance(arg_type[0], npw.dtype)): dtype = arg_type if isinstance(arg_type, npw.dtype) else arg_type[0] good = isinstance(arg, npw.ndarray) if not good: - msg='Argument {}::{} at id {} at does not match required type np.ndarray, got {} instead.' + msg='Argument {}::{} at id {} at does not match required type np.ndarray, ' + msg+='got {} instead.' msg=msg.format(self.name, arg_name, arg_id, dtype, type(arg)) raise RuntimeError(msg) good = (arg.dtype==dtype) if not good: - msg='Argument {}::{} at id {} at does not match required dtype {}, got {} instead.' + msg='Argument {}::{} at id {} at does not match required dtype {}, ' + msg+='got {} instead.' msg=msg.format(self.name, arg_name, arg_id, dtype, arg.dtype) raise RuntimeError(msg) else: good = isinstance(arg, arg_type) if not good: - msg='Argument {}::{} at id {} at does not match required type {}, got {} instead.' + msg='Argument {}::{} at id {} at does not match required type {}, ' + msg+='got {} instead.' msg=msg.format(self.name, arg_name, arg_id, arg_type, type(arg)) raise RuntimeError(msg) +class HostLauncherI(LauncherI): + def parameters(self): + return {} + + def global_size_configured(self): + return True class OpenClKernelLauncher(OpenClKernelLauncherI): """ @@ -475,8 +509,10 @@ class OpenClParametrizedKernelLauncher(OpenClKernelLauncher): pindexes = tuple(x[0] for x in parameters_map.values()) aindexes = tuple(x[0] for x in args_list) - assert len(pindexes) == len(set(pindexes)), 'Arguments at same position: {}'.format(parameters_map) - assert len(aindexes) == len(set(aindexes)), 'Arguments at same position: {}'.format(parameters_map) + assert len(pindexes) == len(set(pindexes)), \ + 'Arguments at same position: {}'.format(parameters_map) + assert len(aindexes) == len(set(aindexes)), \ + 'Arguments at same position: {}'.format(parameters_map) if set(pindexes).intersection(aindexes): msg='Overlap between parameters indexes and default argument indexes.' msg+='\nparameters: {}\ndefault args: {}'.format(parameters_map, args_list) @@ -570,7 +606,8 @@ class OpenClIterativeKernelLauncher(OpenClParametrizedKernelLauncher): """ check_instance(args_list, tuple, values=tuple) check_instance(parameters_map, dict, keys=str, values=tuple) - check_instance(iterated_parameters, dict, keys=str, values=OpenClKernelParameterGenerator) + check_instance(iterated_parameters, dict, keys=str, + values=OpenClKernelParameterGenerator) iterated_parameter_arg_ids = () iterated_parameter_arg_names = () @@ -630,11 +667,14 @@ class OpenClIterativeKernelLauncher(OpenClParametrizedKernelLauncher): arg_names = self.iterated_parameter_arg_names for i,args in enumerate(self.iter_parameters()): if __KERNEL_DEBUG__ or __TRACE_KERNELS__: - apply_msg = self._apply_msg.format('{}', global_work_size, local_work_size, '{}') - print apply_msg.format(' | ', ', '.join('{}={}'.format(pname, pval) for (pname,pval) in zip(arg_names, args))) + apply_msg = self._apply_msg.format('{}', global_work_size, local_work_size, + '{}') + print apply_msg.format(' | ', ', '.join('{}={}'.format(pname, pval) + for (pname,pval) in zip(arg_names, args))) - for arg_id, arg_name, arg_type, arg_value in zip(arg_ids, arg_names, arg_types, args): + for arg_id, arg_name, arg_type, arg_value in zip(arg_ids, arg_names, + arg_types, args): self.check_kernel_arg(arg_value, arg_id, arg_name, arg_types) kernel.set_arg(arg_id, arg_value) @@ -651,3 +691,4 @@ class OpenClIterativeKernelLauncher(OpenClParametrizedKernelLauncher): else: evt = None return evt + diff --git a/hysop/backend/device/opencl/operator/directional/advection_dir.py b/hysop/backend/device/opencl/operator/directional/advection_dir.py index 695663759..bd273551a 100644 --- a/hysop/backend/device/opencl/operator/directional/advection_dir.py +++ b/hysop/backend/device/opencl/operator/directional/advection_dir.py @@ -170,79 +170,14 @@ class OpenClDirectionalAdvection(DirectionalAdvectionBase, OpenClDirectionalOper def _collect_redistribute_kernels(self, work): remesh_ghosts = self.remesh_ghosts - dsinputs = self.dadvected_fields_in dsoutputs = self.dadvected_fields_out + kl = OpenClKernelListLauncher(name='accumulate_and_exchange_ghosts') for sout in dsoutputs.values(): - all_outer_ghosts = sout.get_outer_ghost_slices() - all_inner_ghosts = sout.get_inner_ghost_slices() - remesh_outer_ghosts = sout.get_outer_ghost_slices(remesh_ghosts) - remesh_inner_ghosts = sout.get_inner_ghost_slices(remesh_ghosts) - nprocs = sout.mesh.proc_shape - ghosts = sout.mesh.ghosts - - (lhs, rhs) = work.get_buffer(self, sout.name+'_ghost_layers') - - kl = OpenClKernelListLauncher(name='accumulate_and_exchange_ghosts') - for i in xrange(sout.nb_components): - log, rog, oshape = remesh_outer_ghosts[-1] - lig, rig, ishape = remesh_inner_ghosts[-1] - si = sout[i] - varname = sout.name+'_{}'.format(i) - assert (ishape==oshape).all() - if (nprocs[-1] == 1): - # accumulate right outer ghosts to left inner ghosts - kl += OpenClCopyBufferRectLauncher.from_slices(varname=varname+'_left_inner_ghosts', - src=si, src_slices=lig, dst=lhs) - kl += OpenClCopyBufferRectLauncher.from_slices(varname=varname+'_right_outer_ghosts', - src=si, src_slices=rog, dst=rhs) - kl += sout.backend.add(x1=lhs, x2=rhs, out=lhs, build_kernel_launcher=True) - kl += OpenClCopyBufferRectLauncher.from_slices(varname=varname+'_accumulate_rg', - src=lhs, dst=si, dst_slices=lig) - - # accumulate left outer ghosts to right inner ghosts - kl += OpenClCopyBufferRectLauncher.from_slices(varname=varname+'_left_outer_ghosts', - src=si, src_slices=log, dst=lhs) - kl += OpenClCopyBufferRectLauncher.from_slices(varname=varname+'_right_inner_ghosts', - src=si, src_slices=rig, dst=rhs) - kl += sout.backend.add(x1=lhs, x2=rhs, out=lhs, build_kernel_launcher=True) - kl += OpenClCopyBufferRectLauncher.from_slices(varname=varname+'_accumulate_lg', - src=lhs, dst=si, dst_slices=rig) - else: - msg='MPI exchange not implemented yet.' - raise RuntimeError(msg) - - for i in xrange(sout.nb_components): - si = sout[i] - vname = sout.name+'_{}'.format(i) - for direction in xrange(sout.dim): - if ghosts[direction]==0: - continue - dirlabel = DirectionLabels[sout.dim-direction-1] - (flhs, frhs) = work.get_buffer(self, sout.name+'_full_ghost_layers_'+dirlabel) - varname = vname+'_{}'.format(dirlabel) - lsrc, rsrc, ishape = all_inner_ghosts[direction] - ldst, rdst, oshape = all_outer_ghosts[direction] - assert (ishape==oshape).all() - if (nprocs[direction] == 1): - # exchange all left inner ghosts to right outer ghosts - kl += OpenClCopyBufferRectLauncher.from_slices(varname=varname+'_to_right_ghosts_buffer', - src=si, src_slices=lsrc, dst=frhs) - kl += OpenClCopyBufferRectLauncher.from_slices(varname=varname+'_from_right_ghosts_buffer', - src=frhs, dst=si, dst_slices=rdst) - - # exchange all right inner ghosts to left outer ghosts - kl += OpenClCopyBufferRectLauncher.from_slices(varname=varname+'_to_left_ghosts_buffer', - src=si, src_slices=rsrc, dst=flhs) - kl += OpenClCopyBufferRectLauncher.from_slices(varname=varname+'_from_left_ghosts_buffer', - src=flhs, dst=si, dst_slices=ldst) - else: - msg='MPI exchange not implemented yet.' - raise RuntimeError(msg) - - self.accumulate_and_exchange = kl - return kl - - + print '{}'.format(sout.name) + # kl += sout.accumulate_ghosts(directions=-1, ghosts=remesh_ghosts, + # build_launcher=True) + kl += sout.exchange_ghosts(build_launcher=True) + return kl @op_apply def apply(self, **kargs): diff --git a/hysop/backend/host/host_array_backend.py b/hysop/backend/host/host_array_backend.py index 0b12b172f..db151b085 100644 --- a/hysop/backend/host/host_array_backend.py +++ b/hysop/backend/host/host_array_backend.py @@ -246,11 +246,14 @@ class HostArrayBackend(ArrayBackend): """ self._unsupported_argument('empty_like', 'subok', subok, True) if (order is None) or (order == MemoryOrdering.SAME_ORDER): - if a.flags['C_CONTIGUOUS']: - order = MemoryOrdering.C_CONTIGUOUS - elif a.flags['F_CONTIGUOUS']: - order = MemoryOrdering.FORTRAN_CONTIGUOUS - else: + try: + if a.flags['C_CONTIGUOUS']: + order = MemoryOrdering.C_CONTIGUOUS + elif a.flags['F_CONTIGUOUS']: + order = MemoryOrdering.FORTRAN_CONTIGUOUS + else: + order = default_order + except AttributeError: order = default_order return self.empty( shape = a.shape, diff --git a/hysop/fields/cartesian_discrete_field.py b/hysop/fields/cartesian_discrete_field.py index e7212170f..db021f048 100644 --- a/hysop/fields/cartesian_discrete_field.py +++ b/hysop/fields/cartesian_discrete_field.py @@ -529,18 +529,10 @@ CartesianDiscreteFieldView (id={}, tag={}) msg=msg.format(backend.kind) vprint(msg) - # print 'BEFORE EXCHANGE' - # self.print_with_ghosts(outer_ghosts=None) - # print 'BEFORE EXCHANGE' - # self.print_with_ghosts() if exchange_ghosts: evt = self.exchange_ghosts() if (evt is not None): evt.wait() - # print 'AFTER EXCHANGE' - # self.print_with_ghosts() - # print 'AFTER EXCHANGE' - # self.print_with_ghosts(outer_ghosts=None) def integrate(self, data=None, components=None): """Sum all the values in the mesh.""" @@ -672,7 +664,7 @@ CartesianDiscreteFieldView (id={}, tag={}) def exchange_ghosts(self, components=None, directions=None, ghosts=None, ghost_op=None, exchange_method=None, - evt=None, **kwds): + evt=None, build_launcher=False, **kwds): """ Exchange ghosts using cached ghost exchangers which are built at first use. ie. Exchange every ghosts components of self.data using current topology state. @@ -692,9 +684,17 @@ CartesianDiscreteFieldView (id={}, tag={}) ghost_op=ghost_op, components=components, directions=directions, exchange_method=exchange_method) ghost_exchanger = self._dfield._ghost_exchangers[key] - new_evt = ghost_exchanger.exchange_ghosts(**kwds) + if build_launcher: + assert (evt is None) + return ghost_exchanger._build_launcher() + else: + new_evt = ghost_exchanger(**kwds) else: - new_evt = None + if build_launcher: + assert (evt is None) + return None + else: + new_evt = None return first_not_None(new_evt, evt) def build_ghost_exchanger(self, name=None, components=None, directions=None, diff --git a/hysop/fields/ghost_exchangers.py b/hysop/fields/ghost_exchangers.py index 32078a002..55ccad788 100644 --- a/hysop/fields/ghost_exchangers.py +++ b/hysop/fields/ghost_exchangers.py @@ -9,6 +9,7 @@ from hysop.core.arrays.all import HostArray, OpenClArray from hysop.core.mpi import MPI from hysop.core.mpi.topo_tools import TopoTools from hysop.backend.device.opencl import cl, clArray +from hysop.backend.device.opencl.opencl_kernel_launcher import HostLauncherI class GhostExchanger(object): def __init__(self, name, topology, data, ghost_op): @@ -32,10 +33,10 @@ class GhostExchanger(object): self.backend = data[0].backend if hasattr(data[0], 'backend') else topology.backend self.host_backend = self.backend.host_array_backend self.base_tag = int(hashlib.sha1(name).hexdigest(), 16) % (104729) - self.name = name self.ghost_op = ghost_op self.base_dtype = base_dtype self.base_mpi_type = base_mpi_type + self.name = name class CartesianDiscreteFieldGhostExchanger(GhostExchanger): @@ -45,7 +46,7 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): exchange_method=None): ghost_op = first_not_None(ghost_op, GhostOperation.EXCHANGE) - + if ghost_op is GhostOperation.EXCHANGE: exchange_method = first_not_None(exchange_method, ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W) @@ -84,20 +85,39 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): if (kind == Backend.HOST): for d in data: assert isinstance(d, (np.ndarray, HostArray)), type(d) - self.exchange_ghosts = self.exchange_ghosts_on_host_backend elif (kind == Backend.OPENCL): for d in data: assert isinstance(d, (clArray.Array, OpenClArray)), type(d) - self.exchange_ghosts = self.exchange_ghosts_on_opencl_backend else: msg='Unknown topology array backend kind {}.'.format(kind) raise ValueError(msg) self.kind = kind - self._python_launcher = None - self._cl_launcher = None + self._launcher = None + + def __call__(self, **kwds): + """Exchange ghosts on specified data.""" + if (self._launcher is None): + self._build_launcher() + if (self.kind is Backend.OPENCL): + kwds.setdefault('queue', self.backend.cl_env.default_queue) + return self._launcher(**kwds) - def build_python_ghost_exchanger(self): + def _build_launcher(self): + if (self._launcher is not None): + return self._launcher + if (self.kind is Backend.HOST): + self._build_python_launcher() + elif (self.kind is Backend.OPENCL): + self._build_opencl_launcher() + else: + msg='Unknown array backend kind {}.'.format(self.kind) + raise NotImplementedError(msg) + assert (self._launcher is not None) + return self._launcher + + def _build_python_launcher(self): + assert (self._launcher is None) ghost_op = self.ghost_op comm = self.topology.comm local_rank = self.topology.cart_rank @@ -117,20 +137,34 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): return tag local_exchange = [] - target_buffers = [] + has_mpi_exchange = False # Call kwds depends on exchange method used (Send-Recv, Window', 'Neighbor_AlltoAll') - isend_kwds = [] # one call per direction, per dimension, per data buffer - irecv_kwds = [] # one call per direction, per dimension, per data buffer - all_to_all_v_kwds = None # only one call - all_to_all_w_kwds = [] # one cell per data buffer + isend_kwds = [] # one call per direction, per dimension, per data buffer + irecv_kwds = [] # one call per direction, per dimension, per data buffer + i_dst_buffers = [] + i_src_buffers = [] - has_mpi_exchange = False - all_to_all_v_send_requests = {} - all_to_all_v_recv_requests = {} + v_kwds = None # only one call for all buffers + v_send_requests = {} + v_recv_requests = {} + v_src_buffers = () + v_dst_buffers = () + v_send_buffer = None + v_recv_buffer = None + + w_kwds = [] # one cell per data buffer + w_src_buffers = [] + w_dst_buffers = [] + w_send_buffer = [] + w_recv_buffer = [] + + from_buffer = False + to_buffer = False + for (i,buf) in enumerate(self.data): - all_to_all_w_send_requests = {} - all_to_all_w_recv_requests = {} + w_send_requests = {} + w_recv_requests = {} for d in self.directions: if self.ghosts[d]==0: continue @@ -142,7 +176,7 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): nprocs = proc_shape[d] has_mpi_exchange |= (nprocs > 1) - + if (nprocs == 1): # We need to exchange with ourselves local_exchange.append((buf,outer_right,inner_left)) @@ -158,13 +192,13 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): if nprocs==2: # switch left and right in 2 proc periodic case outer_right, outer_left = outer_left, outer_right - all_to_all_v_send_requests.setdefault(('left',left_rank), []) \ + v_send_requests.setdefault(('left',left_rank), []) \ .append(buf[inner_left]) - all_to_all_v_send_requests.setdefault(('right',right_rank), []) \ + v_send_requests.setdefault(('right',right_rank), []) \ .append(buf[inner_right]) - all_to_all_v_recv_requests.setdefault(('left',left_rank), []) \ + v_recv_requests.setdefault(('left',left_rank), []) \ .append(buf[outer_left]) - all_to_all_v_recv_requests.setdefault(('right',right_rank), []) \ + v_recv_requests.setdefault(('right',right_rank), []) \ .append(buf[outer_right]) elif (exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W): # Send and receive to and from every neigbours @@ -172,14 +206,16 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): if nprocs==2: # switch left and right in 2 proc periodic case outer_right, outer_left = outer_left, outer_right - all_to_all_w_send_requests.setdefault(('left',left_rank), []) \ + w_send_requests.setdefault(('left',left_rank), []) \ .append((buf, inner_left)) - all_to_all_w_send_requests.setdefault(('right',right_rank), []) \ + w_send_requests.setdefault(('right',right_rank), []) \ .append((buf, inner_right)) - all_to_all_w_recv_requests.setdefault(('left',left_rank), []) \ + w_recv_requests.setdefault(('left',left_rank), []) \ .append((buf, outer_left)) - all_to_all_w_recv_requests.setdefault(('right',right_rank), []) \ + w_recv_requests.setdefault(('right',right_rank), []) \ .append((buf, outer_right)) + from_buffer = False + to_buffer = False elif (exchange_method is ExchangeMethod.ISEND_IRECV): # Exchanges with left neighour assert (left_rank != local_rank) and (left_rank != -1), left_rank @@ -238,13 +274,13 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): if nprocs==2: # switch left and right in 2 proc periodic case inner_right, inner_left = inner_left, inner_right - all_to_all_v_send_requests.setdefault(('left',left_rank), []) \ + v_send_requests.setdefault(('left',left_rank), []) \ .append(buf[outer_left]) - all_to_all_v_send_requests.setdefault(('right',right_rank), []) \ + v_send_requests.setdefault(('right',right_rank), []) \ .append(buf[outer_right]) - all_to_all_v_recv_requests.setdefault(('left',left_rank), []) \ + v_recv_requests.setdefault(('left',left_rank), []) \ .append(buf[inner_left]) - all_to_all_v_recv_requests.setdefault(('right',right_rank), []) \ + v_recv_requests.setdefault(('right',right_rank), []) \ .append(buf[inner_right]) elif (exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W): # Send and receive to and from every neigbours @@ -254,14 +290,16 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): inner_right, inner_left = inner_left, inner_right left_rank = neighbour_ranks[0,d] right_rank = neighbour_ranks[1,d] - all_to_all_w_send_requests.setdefault(('left',left_rank), []) \ + w_send_requests.setdefault(('left',left_rank), []) \ .append((buf, outer_left)) - all_to_all_w_send_requests.setdefault(('right',right_rank), []) \ + w_send_requests.setdefault(('right',right_rank), []) \ .append((buf, outer_right)) - all_to_all_w_recv_requests.setdefault(('left',left_rank), []) \ + w_recv_requests.setdefault(('left',left_rank), []) \ .append((buf, inner_left)) - all_to_all_w_recv_requests.setdefault(('right',right_rank), []) \ + w_recv_requests.setdefault(('right',right_rank), []) \ .append((buf, inner_right)) + from_buffer = False + to_buffer = True elif (exchange_method is ExchangeMethod.ISEND_IRECV): # Exchanges with left neighour left_rank = neighbour_ranks[0,d] @@ -285,7 +323,7 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): 'source':left_rank, 'tag':recvtag} irecv_kwds.append(recv_kwds) - target_buffers.append((tmp, buf, inner_left)) + i_dst_buffers.append((tmp, buf, inner_left)) # Exchanges with right neighour right_rank = neighbour_ranks[1,d] @@ -309,7 +347,7 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): 'source':right_rank, 'tag':recvtag} irecv_kwds.append(recv_kwds) - target_buffers.append((tmp, buf, inner_right)) + i_dst_buffers.append((tmp, buf, inner_right)) else: msg='Unknown MPI exchange method {}.'.format(exchange_method) raise NotImplementedError(msg) @@ -322,73 +360,115 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): sendtypes, recvtypes = (),() # subarray types sendcounts, recvcounts = (),() # in type counts sdispls, rdispls = (),() # in bytes + sdispl, rdispl = 0, 0 + scount, rcount = 0, 0 + + src_buffers, dst_buffers = (), () for direction in xrange(comm.Get_dim()): for (tag,rank) in zip(('left','right'), comm.Shift(direction, 1)): - if ((tag,rank) in all_to_all_w_send_requests): - requests = all_to_all_w_send_requests[(tag,rank)] + displ = 0 + if ((tag,rank) in w_send_requests): + requests = w_send_requests[(tag,rank)] assert len(requests)==1 (src,slc) = requests[0] assert src is buf assert src.dtype == base_dtype - mpi_type = TopoTools.create_subarray(slc, src.shape, - mpi_type=base_mpi_type) - send_count = 1 + if from_buffer: + mpi_type = base_mpi_type + send_count = src[slc].size + displ = src[slc].nbytes + vslc = slice(scount, scount+send_count) + src_buffers += ((src[slc],vslc),) + else: + mpi_type = TopoTools.create_subarray(slc, src.shape, + mpi_type=base_mpi_type) + send_count = 1 else: mpi_type = base_mpi_type send_count = 0 sendtypes += (mpi_type,) sendcounts += (send_count,) - sdispls += (0,) - if ((tag,rank) in all_to_all_w_recv_requests): - requests = all_to_all_w_recv_requests[(tag,rank)] + sdispls += (sdispl,) + sdispl += displ + scount += send_count + + displ = 0 + if ((tag,rank) in w_recv_requests): + requests = w_recv_requests[(tag,rank)] assert len(requests)==1 (dst,slc) = requests[0] assert dst is buf assert dst.dtype == base_dtype - mpi_type = TopoTools.create_subarray(slc, dst.shape, - mpi_type=base_mpi_type) - recv_count = 1 + if to_buffer: + mpi_type = base_mpi_type + recv_count = dst[slc].size + displ = dst[slc].nbytes + vslc = slice(rcount, rcount+recv_count) + dst_buffers += ((dst[slc],vslc),) + else: + mpi_type = TopoTools.create_subarray(slc, dst.shape, + mpi_type=base_mpi_type) + recv_count = 1 else: mpi_type = base_mpi_type recv_count = 0 recvtypes += (mpi_type,) recvcounts += (recv_count,) - rdispls += (0,) - all_to_all_w_kwds.append( { - 'sendbuf': [buf.handle, sendcounts, sdispls, sendtypes], - 'recvbuf': [buf.handle, recvcounts, rdispls, recvtypes] + rdispls += (rdispl,) + rdispl += displ + rcount += recv_count + if from_buffer: + assert (scount > 0), scount + send_size = scount + send_buffer = self.backend.empty(shape=(send_size,), dtype=base_dtype) + sendbuf = send_buffer.handle + else: + send_buffer = None + sendbuf = buf.handle + if to_buffer: + assert (rcount > 0), rcount + recv_size = rcount + recv_buffer = self.backend.empty(shape=(recv_size,), dtype=base_dtype) + recvbuf = recv_buffer.handle + else: + recv_buffer = None + recvbuf = buf.handle + w_src_buffers.append(src_buffers) + w_send_buffer.append(send_buffer) + w_dst_buffers.append(dst_buffers) + w_recv_buffer.append(recv_buffer) + w_kwds.append( { + 'sendbuf': [sendbuf, sendcounts, sdispls, sendtypes], + 'recvbuf': [recvbuf, recvcounts, rdispls, recvtypes] } ) # handle all_to_all_v kwds (one call for all buffers and all neighbours) - all_to_all_v_send_buffers = () - all_to_all_v_recv_buffers = () - all_to_all_v_send_buffer = None - all_to_all_v_recv_buffer = None if has_mpi_exchange and (exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V): - assert all_to_all_v_send_requests - assert all_to_all_v_recv_requests + assert v_send_requests + assert v_recv_requests sendtype, recvtype = base_mpi_type, base_mpi_type sendcounts, recvcounts = (),() # in type counts sdispls, rdispls = (),() # in type counts send_disp, recv_disp = 0,0 # in type counts - send_buffers, recv_buffers = (), () + src_buffers, dst_buffers = (), () + send_buffer, recv_buffer = None, None for direction in xrange(comm.Get_dim()): for (tag,rank) in zip(('left','right'), comm.Shift(direction, 1)): send_count, recv_count = 0, 0 - if ((tag,rank) in all_to_all_v_send_requests): - for src in all_to_all_v_send_requests[(tag,rank)]: + if ((tag,rank) in v_send_requests): + for src in v_send_requests[(tag,rank)]: assert src.dtype == base_dtype slc = slice(send_disp+send_count, send_disp+send_count+src.size) - send_buffers += ((src,slc),) + src_buffers += ((src,slc),) send_count += src.size sendcounts += (send_count,) sdispls += (send_disp,) send_disp += send_count - if ((tag,rank) in all_to_all_v_recv_requests): - for dst in all_to_all_v_recv_requests[(tag,rank)]: + if ((tag,rank) in v_recv_requests): + for dst in v_recv_requests[(tag,rank)]: assert dst.dtype == base_dtype slc = slice(recv_disp+recv_count, recv_disp+recv_count+dst.size) - recv_buffers += ((dst,slc),) + dst_buffers += ((dst,slc),) recv_count += dst.size recvcounts += (recv_count,) rdispls += (recv_disp,) @@ -401,40 +481,42 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): recv_buffer = self.backend.empty(shape=(recv_size,), dtype=base_dtype) assert send_buffer.dtype==base_dtype assert recv_buffer.dtype==base_dtype - all_to_all_v_kwds = { + v_kwds = { 'sendbuf': [send_buffer.handle, sendcounts, sdispls, sendtype], 'recvbuf': [recv_buffer.handle, recvcounts, rdispls, recvtype] } - all_to_all_v_send_buffers = send_buffers - all_to_all_v_recv_buffers = recv_buffers - all_to_all_v_send_buffer = send_buffer - all_to_all_v_recv_buffer = recv_buffer + v_src_buffers = src_buffers + v_dst_buffers = dst_buffers + v_send_buffer = send_buffer + v_recv_buffer = recv_buffer if ghost_op is GhostOperation.EXCHANGE: if has_mpi_exchange: if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W: - assert len(all_to_all_w_kwds)>0 + assert len(w_kwds)>0 + assert (from_buffer is False) + assert (to_buffer is False) def python_launcher(): evts = [] - for kwds in all_to_all_w_kwds: + for kwds in w_kwds: evt = comm.Ineighbor_alltoallw(**kwds) evts.append(evt) for (buf,outer,inner) in local_exchange: buf[outer] = buf[inner] MPI.Request.Waitall(evts) elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V: - assert all_to_all_v_kwds + assert v_kwds def python_launcher(): - if all_to_all_v_kwds: - for (src,slc) in all_to_all_v_send_buffers: - all_to_all_v_send_buffer[slc] = src.ravel() - evt = comm.Ineighbor_alltoallv(**all_to_all_v_kwds) + if v_kwds: + for (src,slc) in v_src_buffers: + v_send_buffer[slc] = src.ravel() + evt = comm.Ineighbor_alltoallv(**v_kwds) for (buf,outer,inner) in local_exchange: buf[outer] = buf[inner] - if all_to_all_v_kwds: + if v_kwds: evt.wait() - for (dst,slc) in all_to_all_v_recv_buffers: - dst[...] = all_to_all_v_recv_buffer[slc].reshape(dst.shape) + for (dst,slc) in v_dst_buffers: + dst[...] = v_recv_buffer[slc].reshape(dst.shape) elif exchange_method is ExchangeMethod.ISEND_IRECV: def python_launcher(): evts = [] @@ -457,18 +539,32 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): elif ghost_op is GhostOperation.ACCUMULATE: if has_mpi_exchange: if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W: - raise NotImplementedError(msg) + assert (from_buffer is False) + assert (to_buffer is True) + def python_launcher(): + evts = [] + for (kwds,send_buffer,src_buffers) in \ + zip(w_kwds, w_send_buffer, w_src_buffers): + for (src,slc) in src_buffers: + send_buffer[slc] = src.ravel() + evt = comm.Ineighbor_alltoallw(**kwds) + evts.append(evt) + for (buf,outer,inner) in local_exchange: + buf[inner] += buf[outer] + for idx in iter_mpi_requests(evts): + for (dst,slc) in w_dst_buffers[idx]: + dst[...] += w_recv_buffer[idx][slc].reshape(dst.shape) elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V: - assert all_to_all_v_kwds + assert v_kwds def python_launcher(): - for (src,slc) in all_to_all_v_send_buffers: - all_to_all_v_send_buffer[slc] = src.ravel() - evt = comm.Ineighbor_alltoallv(**all_to_all_v_kwds) + for (src,slc) in v_src_buffers: + v_send_buffer[slc] = src.ravel() + evt = comm.Ineighbor_alltoallv(**v_kwds) for (buf,outer,inner) in local_exchange: buf[inner] += buf[outer] evt.wait() - for (dst,slc) in all_to_all_v_recv_buffers: - dst[...] += all_to_all_v_recv_buffer[slc].reshape(dst.shape) + for (dst,slc) in v_dst_buffers: + dst[...] += v_recv_buffer[slc].reshape(dst.shape) elif exchange_method is ExchangeMethod.ISEND_IRECV: def python_launcher(): send_evts = [] @@ -482,7 +578,7 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): for (buf,outer,inner) in local_exchange: buf[inner] += buf[outer] for idx in iter_mpi_requests(recv_evts): - (tmp, buf, inner) = target_buffers[idx] + (tmp, buf, inner) = i_dst_buffers[idx] buf[inner] += tmp MPI.Request.Waitall(send_evts) else: @@ -496,67 +592,110 @@ class CartesianDiscreteFieldGhostExchanger(GhostExchanger): msg='Unknown GhostOperation {}.'.format(ghost_op) raise NotImplementedError(msg) - self._python_launcher = python_launcher + self._launcher = python_launcher - def exchange_ghosts_on_host_backend(self): - """Exchange ghosts on specified host data.""" - # print 'EXCHANGE GHOSTS' - # print self.ghost_op, self.directions, self.ghosts - # print 'BEFORE EXCHANGE' - # print self.data[0] - if (self._python_launcher is None): - self.build_python_ghost_exchanger() - self._python_launcher() - # print 'AFTER EXCHANGE' - # print self.data[0] - - def build_opencl_kernel_launcher(self): + def _build_opencl_launcher(self): + assert (self._launcher is None) from hysop.backend.device.opencl.opencl_kernel_launcher import OpenClKernelListLauncher from hysop.backend.device.opencl.opencl_copy_kernel_launchers \ import OpenClCopyBufferRectLauncher - kl = OpenClKernelListLauncher(name='exchange_ghosts_{}'.format(self.name)) dim = self.dim - for (i, buf) in enumerate(self.data): - vname = '{}_{}'.format(self.name, i) - for d in self.directions: - if self.ghosts[d]==0: - continue - inner_left, inner_right, inner_shape = self.inner_ghosts[d] - outer_left, outer_right, outer_shape = self.outer_ghosts[d] - assert (inner_shape==outer_shape).all() - dirlabel = DirectionLabels[dim-d-1] - varname = vname+'_{}'.format(dirlabel) - if (self.nprocs[d] == 1): - # some opencl platforms reject inplace buffer copies so we use a tmp buffer - # to perform local ghost exchanges - tmp = self.backend.empty(shape=outer_shape, dtype=buf.dtype) + ghost_op = self.ghost_op + proc_shape = self.topology.proc_shape - # exchange all left inner ghosts to right outer ghosts - kl += OpenClCopyBufferRectLauncher.from_slices( - varname=varname+'_to_right_ghosts_buffer', - src=buf, src_slices=inner_left, dst=tmp) - kl += OpenClCopyBufferRectLauncher.from_slices( - varname=varname+'_from_right_ghosts_buffer', - src=tmp, dst=buf, dst_slices=outer_right) + has_mpi_exchange = False - # exchange all right inner ghosts to left outer ghosts - kl += OpenClCopyBufferRectLauncher.from_slices( - varname=varname+'_to_left_ghosts_buffer', - src=buf, src_slices=inner_right, dst=tmp) - kl += OpenClCopyBufferRectLauncher.from_slices( - varname=varname+'_from_left_ghosts_buffer', - src=tmp, dst=buf, dst_slices=outer_left) - else: - msg='MPI exchange not implemented yet.' - raise RuntimeError(msg) - self._cl_launcher = kl - - def exchange_ghosts_on_opencl_backend(self, queue=None): - """Exchange ghosts on specified opencl device data.""" - queue = first_not_None(queue, self.backend.cl_env.default_queue) - if (self._cl_launcher is None): - self.build_opencl_kernel_launcher() - return self._cl_launcher(queue=queue) + if ghost_op is GhostOperation.EXCHANGE: + kl = OpenClKernelListLauncher(name='exchange_ghosts_{}'.format(self.name)) + for (i, buf) in enumerate(self.data): + vname = '{}_{}'.format(self.name, i) + for d in self.directions: + if self.ghosts[d]==0: + continue + inner_left, inner_right, inner_shape = self.inner_ghosts[d] + outer_left, outer_right, outer_shape = self.outer_ghosts[d] + assert (inner_shape==outer_shape).all() + dirlabel = DirectionLabels[dim-d-1] + varname = vname+'_{}'.format(dirlabel) + + nprocs = proc_shape[d] + has_mpi_exchange |= (nprocs > 1) + + if (nprocs == 1): + # some opencl platforms reject inplace buffer copies + # so we use a tmp buffer + # to perform local ghost exchanges + tmp = self.backend.empty(shape=outer_shape, dtype=buf.dtype) + + # exchange all left inner ghosts to right outer ghosts + kl += OpenClCopyBufferRectLauncher.from_slices( + varname=varname+'_to_right_ghosts_buffer', + src=buf, src_slices=inner_left, dst=tmp) + kl += OpenClCopyBufferRectLauncher.from_slices( + varname=varname+'_from_right_ghosts_buffer', + src=tmp, dst=buf, dst_slices=outer_right) + + print 'INNER LEFT', inner_left + print 'OUTER_RIGHT', outer_right + + # exchange all right inner ghosts to left outer ghosts + kl += OpenClCopyBufferRectLauncher.from_slices( + varname=varname+'_to_left_ghosts_buffer', + src=buf, src_slices=inner_right, dst=tmp) + kl += OpenClCopyBufferRectLauncher.from_slices( + varname=varname+'_from_left_ghosts_buffer', + src=tmp, dst=buf, dst_slices=outer_left) + else: + msg='MPI exchange not implemented yet.' + raise RuntimeError(msg) + elif ghost_op is GhostOperation.ACCUMULATE: + kl = OpenClKernelListLauncher(name='accumulate_ghosts_{}'.format(self.name)) + for (i, buf) in enumerate(self.data): + vname = '{}_{}'.format(self.name, i) + for d in self.directions: + if self.ghosts[d]==0: + continue + + nprocs = proc_shape[d] + has_mpi_exchange |= (nprocs > 1) + + if (nprocs == 1): + inner_left, inner_right, inner_shape = self.inner_ghosts[d] + outer_left, outer_right, outer_shape = self.outer_ghosts[d] + assert (inner_shape==outer_shape).all() + dirlabel = DirectionLabels[dim-d-1] + varname = vname+'_{}'.format(dirlabel) + ltmp = self.backend.empty(shape=inner_shape, dtype=buf.dtype) + rtmp = self.backend.empty(shape=inner_shape, dtype=buf.dtype) + # accumulate right outer ghosts to left inner ghosts + kl += OpenClCopyBufferRectLauncher.from_slices( + varname=varname+'_left_inner_ghosts', + src=buf, src_slices=inner_left, dst=ltmp) + kl += OpenClCopyBufferRectLauncher.from_slices( + varname=varname+'_right_outer_ghosts', + src=buf, src_slices=outer_right, dst=rtmp) + kl += self.backend.add(x1=ltmp, x2=rtmp, out=ltmp, + build_kernel_launcher=True) + kl += OpenClCopyBufferRectLauncher.from_slices( + varname=varname+'_accumulate_lg', + src=ltmp, dst=buf, dst_slices=inner_left) + + # accumulate left outer ghosts to right inner ghosts + kl += OpenClCopyBufferRectLauncher.from_slices( + varname=varname+'_left_outer_ghosts', + src=buf, src_slices=outer_left, dst=ltmp) + kl += OpenClCopyBufferRectLauncher.from_slices( + varname=varname+'_right_inner_ghosts', + src=buf, src_slices=inner_right, dst=rtmp) + kl += self.backend.add(x1=ltmp, x2=rtmp, out=ltmp, + build_kernel_launcher=True) + kl += OpenClCopyBufferRectLauncher.from_slices( + varname=varname+'_accumulate_rg', + src=ltmp, dst=buf, dst_slices=inner_right) + else: + msg='Unknown MPI exchange method {}.'.format(exchange_method) + raise NotImplementedError(msg) + self._launcher = kl diff --git a/hysop/fields/tests/test_cartesian.py b/hysop/fields/tests/test_cartesian.py index fc22f2f38..e22f3fb4d 100644 --- a/hysop/fields/tests/test_cartesian.py +++ b/hysop/fields/tests/test_cartesian.py @@ -138,7 +138,7 @@ def test_serial_initialization_3d(): def test_mpi_ghost_exchange(comm): rank = comm.Get_rank() size = comm.Get_size() - dtypes = (np.float16, np.float32, np.float64, + dtypes = (np.float32, np.float32, np.float64, np.int16, np.int32, np.int64, np.uint16, np.uint32, np.uint64) assert size-1 < len(dtypes) @@ -172,7 +172,7 @@ def test_mpi_ghost_exchange(comm): if rank==0: print ' *cart_shape: {}'.format(shape) topo = CartesianTopology(domain=domain, discretization=discretization, - backend=Backend.HOST, cart_shape=shape) + backend=Backend.OPENCL, cart_shape=shape) assert (topo.proc_shape==shape).all() def ghost_base(i,d,rank,local_dir): @@ -180,7 +180,7 @@ def test_mpi_ghost_exchange(comm): def ghost_vals(buf,i,d,rank,local_dir): if (buf is None): return None - base = ghost_base(i,d,rank,local_dir) + base = np.full_like(buf, fill_value=ghost_base(i,d,rank,local_dir)) I = np.ix_(*tuple(np.arange(buf.shape[direction], dtype=buf.dtype) for direction in xrange(buf.ndim))) return base + I[d] @@ -203,8 +203,8 @@ def test_mpi_ghost_exchange(comm): lghosts, rghosts, shape = dF.inner_ghost_slices[d] _lghosts, _rghosts, shape = dF.outer_ghost_slices[d] for (i,data) in enumerate(dF.data): - data[lghosts] = ghost_vals(data[lghosts], i,d,rank,0) - data[rghosts] = ghost_vals(data[rghosts], i,d,rank,1) + data[lghosts] = ghost_vals(data[lghosts].get(), i,d,rank,0) + data[rghosts] = ghost_vals(data[rghosts].get(), i,d,rank,1) data[_lghosts] = 0 data[_rghosts] = 0 @@ -297,7 +297,8 @@ def test_mpi_ghost_accumulate(comm): if rank==0: print ' |{} COMPONENT(S)'.format(F.nb_components) for exchange_method in (ExchangeMethod.ISEND_IRECV, - ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V): + ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V, + ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W,): if rank==0: print ' ExchangeMethod.{:<25} |'.format(exchange_method), sys.stdout.flush() @@ -311,7 +312,8 @@ def test_mpi_ghost_accumulate(comm): for directions in all_directions: if rank==0: if directions: - print ''.join(DirectionLabels[dim-1-d] for d in directions), + print ''.join(DirectionLabels[dim-1-d] + for d in directions), else: print '--', sys.stdout.flush() @@ -323,7 +325,8 @@ def test_mpi_ghost_accumulate(comm): continue (iview,ishape) = all_inner_ghost_slices[ndirections][directions][displacements] (oview,oshape) = all_outer_ghost_slices[ndirections][directions][displacements] - data[oview] = ghost_vals(oshape, rank, directions, displacements, i) + data[oview] = ghost_vals(oshape, rank, directions, + displacements, i) dF.exchange_ghosts(directions=directions, exchange_method=exchange_method, @@ -373,10 +376,10 @@ if __name__ == '__main__': size = comm.Get_size() with test_context(): - #if (size==1): - #test_serial_initialization_1d() - #test_serial_initialization_2d() - #test_serial_initialization_3d() + if (size==0): + test_serial_initialization_1d() + test_serial_initialization_2d() + test_serial_initialization_3d() test_mpi_ghost_exchange(comm=comm) - #test_mpi_ghost_accumulate(comm=comm) + # test_mpi_ghost_accumulate(comm=comm) diff --git a/hysop/fields/tests/test_cartesian.sh b/hysop/fields/tests/test_cartesian.sh index 2ebd8f8a2..b0218498d 100755 --- a/hysop/fields/tests/test_cartesian.sh +++ b/hysop/fields/tests/test_cartesian.sh @@ -21,6 +21,6 @@ popd > /dev/null TEST_FILE=$SCRIPT_PATH/test_cartesian.py for i in 1 2 3 4 5 6 7 8 9; do -#for i in 1 2; do +#for i in 2; do mpirun --oversubscribe -np $i python2 $TEST_FILE done -- GitLab