Skip to content
Snippets Groups Projects
Commit 05315266 authored by Jean-Baptiste Keck's avatar Jean-Baptiste Keck
Browse files

Merge branch 'mpi-operators' of...

Merge branch 'mpi-operators' of gricad-gitlab.univ-grenoble-alpes.fr:particle_methods/hysop into mpi-operators
parents e2d6d3c1 c9adaceb
No related branches found
No related tags found
1 merge request!16MPI operators
Pipeline #24772 failed
......@@ -16,6 +16,9 @@ def CS( data, coords, t, component):
chi = lambda x,y,z: np.sqrt((x-pos[0])*(x-pos[0])+(y-pos[1])*(y-pos[1])+0.*z)<=RADIUS
data[...] = 0.
data[chi(x,y,z)] = np.cos(t())
def init_u(data, coords, component):
(x,y,z) = coords
data[...] = 0.
def compute(args):
from hysop import Box, Simulation, Problem, Field, MPIParams, IO, IOParams, main_rank
......@@ -52,10 +55,10 @@ def compute(args):
# and configure how the code is generated and compiled at runtime.
# Create an explicit OpenCL context from user parameters
from hysop.backend.device.opencl.opencl_tools import get_or_create_opencl_env
from hysop.backend.device.opencl.opencl_tools import get_or_create_opencl_env, get_device_number
cl_env = get_or_create_opencl_env(mpi_params=mpi_params,
platform_id=args.cl_platform_id,
device_id=args.cl_device_id)
device_id=box.machine_rank%get_device_number() if args.cl_device_id is None else None)
# Configure OpenCL kernel generation and tuning (already done by HysopArgParser)
from hysop.methods import OpenClKernelConfig
......@@ -170,6 +173,7 @@ def compute(args):
t=t, dt=dt)
simu.write_parameters(t, fixedPoint.it_num,
filename='parameters.txt', precision=8)
problem.initialize_field(u, formula=init_u)
problem.solve(simu, dry_run=args.dry_run)
problem.finalize()
......
......@@ -89,7 +89,7 @@ def convert_device_type(device_type):
if (device_type is None):
return None
check_instance(device_type, DeviceType)
conversion = {
DeviceType.ALL: cl.device_type.ALL,
DeviceType.ACCELERATOR: cl.device_type.ACCELERATOR,
......@@ -112,14 +112,14 @@ def convert_precision(precision):
if (precision is None):
return None
check_instance(precision, Precision)
if precision == Precision.SAME:
msg='Cannot convert Precision.SAME to numpy dtype.'
raise ValueError(msg)
if precision == Precision.QUAD:
msg= 'Numpy does not support the 128-bit IEEE quad precision data type.'
raise RuntimeError(msg)
#TODO when long double will be supported check if device has np.float96 or np.float128 long doubles
# (ie padded to 3*32bits or 2*64bits)
conversion = {
......@@ -136,6 +136,11 @@ def convert_precision(precision):
return conversion[precision]
@static_vars(opencl_environments=dict())
def get_device_number(platform_id = None):
platform_id = first_not_None(platform_id, __DEFAULT_PLATFORM_ID__)
platform = get_platform(platform_id, strict=True)
return len(platform.get_devices())
@static_vars(opencl_environments=dict())
def get_or_create_opencl_env(mpi_params,
......@@ -146,35 +151,35 @@ def get_or_create_opencl_env(mpi_params,
**kargs):
"""
Create or an OpenClEnvironment from given parameters if it does not already exists.
All environements are kept alive (cached) in a dictionary local to this
All environements are kept alive (cached) in a dictionary local to this
function (ie. all opencl operators can share the same OpenClEnvironment).
"""
platform_id = first_not_None(platform_id, __DEFAULT_PLATFORM_ID__)
device_id = first_not_None(device_id, __DEFAULT_DEVICE_ID__)
device_type = first_not_None(device_type, DeviceType.ALL)
check_instance(mpi_params, MPIParams)
check_instance(platform_id, int)
check_instance(device_id, int)
check_instance(device_type, DeviceType, allow_none=True)
check_instance(gl_sharing, bool)
key = (mpi_params, platform_id, device_id, device_type, gl_sharing,)
opencl_envs = get_or_create_opencl_env.opencl_environments
if key in opencl_envs:
return opencl_envs[key]
from hysop.backend.device.opencl.opencl_env import OpenClEnvironment
env = OpenClEnvironment(platform_id=platform_id, device_id=device_id,
device_type=device_type, gl_sharing=gl_sharing, mpi_params=mpi_params,
env = OpenClEnvironment(platform_id=platform_id, device_id=device_id,
device_type=device_type, gl_sharing=gl_sharing, mpi_params=mpi_params,
**kargs)
opencl_envs[key] = env
return env
def create_queue(ctx, props=None):
"""
......
......@@ -12,13 +12,14 @@ from hysop.tools.handle import RegisteredObject, TaggedObjectView
from hysop.tools.types import check_instance
from hysop.tools.numpywrappers import npw
from hysop.symbolic.frame import SymbolicFrame
from hysop.deps import hashlib, np
class DomainView(TaggedObjectView):
"""Abstract base class for views on domains. """
__metaclass__ = ABCMeta
__slots__ = ('_domain', '_topology_state')
@debug
def __new__(cls, topology_state, domain=None, **kwds):
"""Create and initialize a DomainView."""
......@@ -41,7 +42,7 @@ class DomainView(TaggedObjectView):
def _get_topology_state(self):
"""Return the topology state altering this domain view."""
return self._topology_state
def _get_dim(self):
"""Return the dimension of the domain."""
return self._domain._dim
......@@ -61,6 +62,16 @@ class DomainView(TaggedObjectView):
def _get_task_rank(self):
"""Return the rank of the process in the task communicator."""
return self._domain._task_rank
def _get_machine_comm(self):
"""
Return the communicator that owns the current process.
This is the sub-communicator which has been obtained by splitting.
the parent communicator by machine name.
"""
return self._domain._machine_comm
def _get_machine_rank(self):
"""Return the rank of the process in the machine communicator."""
return self._domain._machine_rank
def _get_proc_tasks(self):
"""Return mapping between mpi process rank and task identifier."""
return self._domain._proc_tasks
......@@ -73,7 +84,7 @@ class DomainView(TaggedObjectView):
def _get_frame(self):
"""Get symbolic frame associated to this domain."""
return self._domain._frame
def task_on_proc(self, parent_rank):
"""Get task identifier for a given mpi process (parent communicator rank)."""
if parent_rank >= len(self._domain._proc_tasks):
......@@ -84,7 +95,7 @@ class DomainView(TaggedObjectView):
def current_task(self):
"""Get task number of the current mpi process."""
return self.task_on_proc(self._domain._parent_rank)
def is_on_task(self, params):
"""Test if the current process corresponds to param task."""
if isinstance(params, MPIParams):
......@@ -96,7 +107,7 @@ class DomainView(TaggedObjectView):
msg=msg.format(type(params))
raise TypeError(msg)
return (self.current_task() == task_id)
def print_topologies(self):
"""Print all topologies registered on the domain."""
print self.short_description() + ' defined the following topologies:'
......@@ -129,11 +140,11 @@ class DomainView(TaggedObjectView):
def __hash__(self):
return id(self._domain) ^ hash(self._topology_state)
def __str__(self):
"""Equivalent to self.long_description()"""
return self.long_description()
domain = property(_get_domain)
dim = property(_get_dim)
proc_tasks = property(_get_proc_tasks)
......@@ -141,18 +152,20 @@ class DomainView(TaggedObjectView):
parent_rank = property(_get_parent_rank)
task_comm = property(_get_task_comm)
task_rank = property(_get_task_rank)
machine_comm = property(_get_machine_comm)
machine_rank = property(_get_machine_rank)
registered_topologies = property(_get_registered_topologies)
frame = property(_get_frame)
class Domain(RegisteredObject):
"""Abstract base class for the description of physical domains. """
__metaclass__ = ABCMeta
@debug
def __new__(cls, dim, parent_comm=None, proc_tasks=None, **kwds):
"""
Create or get an existing physical domain of given dim on a specified MPI
Create or get an existing physical domain of given dim on a specified MPI
communicator and specific tasks.
Parameters
......@@ -190,33 +203,33 @@ class Domain(RegisteredObject):
-----
*Parent communicator is split according to proc_tasks.
*About MPI Tasks
proc_tasks[n] = 12 means that task 12 owns proc n
proc_tasks[n] = 12 means that task 12 owns proc n
or equivalently that proc n is dedicated to task 12.
*A dupped parent_comm will return another idenpendent domain instance,
because MPI communicators are hashed trough their python object id.
*A dupped parent_comm will return another idenpendent domain instance,
because MPI communicators are hashed trough their python object id.
"""
dim = int(dim)
parent_comm = parent_comm or main_comm
proc_tasks = proc_tasks or [HYSOP_DEFAULT_TASK_ID,] * parent_comm.Get_size()
proc_tasks = npw.asdimarray(proc_tasks)
assert proc_tasks.size == parent_comm.Get_size()
npw.set_readonly(proc_tasks)
# double check types, to be sure RegisteredObject will work as expected
check_instance(dim, int)
check_instance(parent_comm, MPI.Intracomm)
check_instance(proc_tasks, npw.ndarray, dtype=HYSOP_DIM)
obj = super(Domain,cls).__new__(cls, dim=dim, parent_comm=parent_comm,
obj = super(Domain,cls).__new__(cls, dim=dim, parent_comm=parent_comm,
proc_tasks=proc_tasks, tag_prefix='d', **kwds)
if not obj.obj_initialized:
obj.__initialize(dim, parent_comm, proc_tasks)
return obj
@debug
def __initialize(self, dim, parent_comm, proc_tasks):
parent_rank = parent_comm.Get_rank()
......@@ -227,15 +240,24 @@ class Domain(RegisteredObject):
else:
assert len(proc_tasks) == parent_size
task_comm = parent_comm.Split(color=proc_tasks[parent_rank],
key=parent_rank)
key=parent_rank)
task_rank = task_comm.Get_rank()
# Build a per-machine communicator in order to get a rank on local machines
# Split accoring to machine name hashed and converted to integer (strings generally differs only from a single character)
machine_comm = parent_comm.Split(
color=np.int32(int(hashlib.md5(MPI.Get_processor_name()).hexdigest(),16)%np.iinfo(np.int32).max),
key=parent_rank)
machine_rank = machine_comm.Get_rank()
self._dim = dim
self._parent_comm = parent_comm
self._parent_rank = parent_rank
self._task_comm = task_comm
self._task_rank = task_rank
self._machine_comm = machine_comm
self._machine_rank = machine_rank
self._proc_tasks = proc_tasks
self._registered_topologies = {}
self._frame = SymbolicFrame(dim=dim)
......@@ -271,5 +293,3 @@ class Domain(RegisteredObject):
def view(self, topology_state):
"""Return a view of this domain altered by some topology_state."""
pass
......@@ -17,18 +17,18 @@ class PlottingOperator(ComputationalGraphOperator):
def __init__(self, name=None,
dump_dir=None,
update_frequency=1,
save_frequency=100,
axes_shape=(1,),
update_frequency=1,
save_frequency=100,
axes_shape=(1,),
figsize=(30,18),
visu_rank=0,
fig=None,
fig=None,
axes=None,
**kwds):
import matplotlib
import matplotlib.pyplot as plt
check_instance(name, str)
check_instance(update_frequency, int, minval=0)
check_instance(save_frequency, int, minval=0)
......@@ -39,15 +39,15 @@ class PlottingOperator(ComputationalGraphOperator):
if (fig is None) ^ (axes is None):
msg='figure and axes should be specified at the same time.'
raise RuntimeError(msg)
dump_dir = first_not_None(dump_dir, IO.default_path())
imgpath = '{}/{}.png'.format(dump_dir, name)
if (fig is None):
fig, axes = plt.subplots(*axes_shape, figsize=figsize)
fig.canvas.mpl_connect('key_press_event', self.on_key_press)
fig.canvas.mpl_connect('close_event', self.on_close)
axes = npw.asarray(axes).reshape(axes_shape)
self.fig = fig
......@@ -79,7 +79,7 @@ class PlottingOperator(ComputationalGraphOperator):
def _save(self, simulation, **kwds):
if simulation.should_dump(frequency=self.save_frequency, with_last=True):
self.save(simulation=simulation, **kwds)
@abstractmethod
def update(self, **kwds):
pass
......@@ -87,7 +87,7 @@ class PlottingOperator(ComputationalGraphOperator):
def save(self, **kwds):
self.fig.savefig(self.imgpath, dpi=self.fig.dpi,
bbox_inches='tight')
def on_close(self, event):
self.running = False
......@@ -100,14 +100,15 @@ class PlottingOperator(ComputationalGraphOperator):
class ParameterPlotter(PlottingOperator):
"""
Base operator to plot parameters during runtime.
Base operator to plot parameters during runtime.
"""
def __init__(self, name, parameters, alloc_size=128,
def __init__(self, name, parameters, alloc_size=128,
fig=None, axes=None, shape=None, **kwds):
input_params = {}
if (fig is not None) and (axes is not None):
import matplotlib
custom_axes = True
axes_shape=None
check_instance(parameters, dict, keys=matplotlib.axes.Axes, values=dict)
......@@ -127,7 +128,7 @@ class ParameterPlotter(PlottingOperator):
else:
raise TypeError(type(parameters))
check_instance(_parameters, dict, keys=(int,tuple,list), values=(TensorParameter,list,tuple,dict))
parameters = {}
axes_shape = (1,)*2
for (pos,params) in _parameters.iteritems():
......@@ -158,7 +159,7 @@ class ParameterPlotter(PlottingOperator):
_params[_pname] = _p
parameters[pos] = _params
super(ParameterPlotter, self).__init__(name=name, input_params=input_params,
super(ParameterPlotter, self).__init__(name=name, input_params=input_params,
axes_shape=axes_shape, axes=axes, fig=fig, **kwds)
self.custom_axes = custom_axes
......@@ -198,7 +199,7 @@ class ParameterPlotter(PlottingOperator):
return self.axes[i]
else:
return self.axes.flatten()[i]
def update(self, simulation, **kwds):
# expand memory if required
if (self.counter+1>self.times.size):
......@@ -219,4 +220,3 @@ class ParameterPlotter(PlottingOperator):
lines[pos][p].set_xdata(times[:self.counter])
lines[pos][p].set_ydata(data[pos][p][:self.counter])
self.counter += 1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment