Skip to content
Snippets Groups Projects
Commit 36ed9794 authored by EXT Jean-Matthieu Etancelin's avatar EXT Jean-Matthieu Etancelin
Browse files

remove old manual operator

parent 431ba7a5
No related branches found
No related tags found
1 merge request!55Resolve "HySoP tasks improvements (and fixes)"
# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Inter-task parameters exchange.
"""
import numpy as np
from hysop.core.graph.computational_node_frontend import ComputationalGraphNodeFrontend
from hysop.constants import Implementation, HYSOP_DIM
from hysop.backend.host.host_operator import HostOperator
from hysop.tools.htypes import check_instance
from hysop.parameters.scalar_parameter import ScalarParameter, TensorParameter
from hysop.tools.decorators import debug
from hysop.core.graph.graph import op_apply
from hysop import MPIParams, MPI
class PythonInterTaskParamComm(HostOperator):
    """Host operator broadcasting scalar/tensor parameter values from one task to another."""

    def __new__(cls, parameter, source_task, dest_task, domain, **kwds):
        return super().__new__(cls, **kwds)

    @debug
    def __init__(self, parameter, source_task, dest_task, domain, **kwds):
        """
        Communicate parameters through tasks.

        Parameters
        ----------
        parameter: tuple of ScalarParameter or TensorParameter
            Parameters to communicate.
        source_task: integer
            Task id to be used as source for the communication.
        dest_task: integer
            Task id to be used as destination for the communication.
        domain: domain object
            Provides task membership and inter-task communicators
            (is_on_task, task_intercomm, task_rank, task_root_in_parent).
        """
        check_instance(parameter, tuple, values=(ScalarParameter, TensorParameter))
        check_instance(source_task, (int, HYSOP_DIM))
        check_instance(dest_task, (int, HYSOP_DIM))
        input_fields, output_fields = {}, {}
        super().__init__(
            input_params={_.name: _ for _ in parameter},
            output_params={_.name: _ for _ in parameter},
            input_fields=input_fields,
            output_fields=output_fields,
            **kwds,
        )
        self.domain = domain
        self.source_task = source_task
        self.dest_task = dest_task
        self.task_is_source = domain.is_on_task(source_task)
        self.task_is_dest = domain.is_on_task(dest_task)
        # The communicator is obtained from the "other" task's id.
        self.inter_comm = domain.task_intercomm(
            dest_task if self.task_is_source else source_task
        )
        if self.inter_comm.is_inter:
            # Disjoint tasks with real inter-communicator
            self._the_apply = self._apply_intercomm
        elif self.inter_comm.is_intra:
            # Overlapping tasks using an intra-communicator from union of tasks procs
            self._the_apply = self._apply_intracomm
        else:
            # Fail early instead of raising AttributeError at apply() time.
            raise RuntimeError(
                "Task communicator is neither an inter- nor an intra-communicator."
            )
        # Group parameters by dtype so each dtype is exchanged in a single
        # contiguous buffer; sort by name for a deterministic ordering on
        # every process taking part in the broadcast.
        self._all_params_by_type = {}
        for p in sorted(self.parameters, key=lambda _: _.name):
            self._all_params_by_type.setdefault(p.dtype, []).append(p)
        self._send_temp_by_type = {
            t: np.zeros((len(params),), dtype=t)
            for t, params in self._all_params_by_type.items()
        }
        self._recv_temp_by_type = {
            t: np.zeros((len(params),), dtype=t)
            for t, params in self._all_params_by_type.items()
        }

    @op_apply
    def apply(self, **kwds):
        # Dispatch to the strategy selected in __init__.
        self._the_apply(**kwds)

    def _apply_intercomm(self, **kwds):
        """Disjoint tasks: an inter-communicator broadcast is needed.

        On the source task, rank 0 broadcasts with root=MPI.ROOT while the
        other source ranks pass MPI.PROC_NULL; destination ranks receive
        from root 0 of the remote group.
        """
        for t in self._all_params_by_type:
            if self.task_is_source:
                self._send_temp_by_type[t][...] = [
                    p() for p in self._all_params_by_type[t]
                ]
                self.inter_comm.bcast(
                    self._send_temp_by_type[t],
                    root=MPI.ROOT if self.domain.task_rank() == 0 else MPI.PROC_NULL,
                )
            if self.task_is_dest:
                self._recv_temp_by_type[t] = self.inter_comm.bcast(
                    self._send_temp_by_type[t], root=0
                )
                for p, v in zip(
                    self._all_params_by_type[t], self._recv_temp_by_type[t]
                ):
                    p.value = v

    def _apply_intracomm(self, **kwds):
        """Communicator is an intra-communicator defined as tasks' comm union.

        A single broadcast rooted at the source task's root rank is enough.
        """
        for t in self._all_params_by_type:
            if self.task_is_source and self.domain.task_rank() == 0:
                self._send_temp_by_type[t][...] = [
                    p() for p in self._all_params_by_type[t]
                ]
            self._recv_temp_by_type[t] = self.inter_comm.bcast(
                self._send_temp_by_type[t],
                self.domain.task_root_in_parent(self.source_task),
            )
            if self.task_is_dest:
                for p, v in zip(
                    self._all_params_by_type[t], self._recv_temp_by_type[t]
                ):
                    p.value = v

    @classmethod
    def supports_mpi(cls):
        return True
class InterTaskParamComm(ComputationalGraphNodeFrontend):
    """Frontend node for inter-task parameter exchange.

    Selects the concrete operator implementation; only a pure-Python
    backend is currently available.
    """

    __implementations = {Implementation.PYTHON: PythonInterTaskParamComm}

    @classmethod
    def implementations(cls):
        """Return the mapping from Implementation tag to operator class."""
        return cls.__implementations

    @classmethod
    def default_implementation(cls):
        """Return the implementation used when none is requested explicitly."""
        return Implementation.PYTHON
...@@ -16,7 +16,6 @@ src_python = [ ...@@ -16,7 +16,6 @@ src_python = [
'gradient.py', 'gradient.py',
'hdf_io.py', 'hdf_io.py',
'integrate.py', 'integrate.py',
'inter_task_param_comm.py',
'kinetic_energy.py', 'kinetic_energy.py',
'mean_field.py', 'mean_field.py',
'memory_reordering.py', 'memory_reordering.py',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment