From 36ed9794ec8a9dac4f61a9417db8df71c5c7f865 Mon Sep 17 00:00:00 2001
From: Jean-Matthieu Etancelin <jean-matthieu.etancelin@univ-pau.fr>
Date: Mon, 21 Oct 2024 14:46:35 +0200
Subject: [PATCH] remove old manual operator

---
 hysop/operator/inter_task_param_comm.py | 151 ------------------------
 hysop/operator/meson.build              |   1 -
 2 files changed, 152 deletions(-)
 delete mode 100644 hysop/operator/inter_task_param_comm.py

diff --git a/hysop/operator/inter_task_param_comm.py b/hysop/operator/inter_task_param_comm.py
deleted file mode 100644
index 907c9ac9c..000000000
--- a/hysop/operator/inter_task_param_comm.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# Copyright (c) HySoP 2011-2024
-#
-# This file is part of HySoP software.
-# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
-# for further info.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-"""
-Inter-task parameters exchange.
-"""
-import numpy as np
-from hysop.core.graph.computational_node_frontend import ComputationalGraphNodeFrontend
-from hysop.constants import Implementation, HYSOP_DIM
-from hysop.backend.host.host_operator import HostOperator
-from hysop.tools.htypes import check_instance
-from hysop.parameters.scalar_parameter import ScalarParameter, TensorParameter
-from hysop.tools.decorators import debug
-from hysop.core.graph.graph import op_apply
-from hysop import MPIParams, MPI
-
-
-class PythonInterTaskParamComm(HostOperator):
-    def __new__(cls, parameter, source_task, dest_task, domain, **kwds):
-        return super().__new__(cls, **kwds)
-
-    @debug
-    def __init__(self, parameter, source_task, dest_task, domain, **kwds):
-        """
-        Communicate parameter through tasks
-
-        parameter
-        ----------
-        parameter: tuple of ScalarParameter or TensorParameter
-            parameters to communicate
-        source_task: integer
-            task id to be used as source for communication
-        parent_task: integer
-            task id to be used as context for inter-communication
-
-        """
-        check_instance(parameter, tuple, values=(ScalarParameter, TensorParameter))
-        check_instance(source_task, (int, HYSOP_DIM))
-        check_instance(dest_task, (int, HYSOP_DIM))
-        input_fields, output_fields = {}, {}
-        super().__init__(
-            input_params={_.name: _ for _ in parameter},
-            output_params={_.name: _ for _ in parameter},
-            input_fields=input_fields,
-            output_fields=output_fields,
-            **kwds,
-        )
-        self.domain = domain
-        self.source_task = source_task
-        self.dest_task = dest_task
-        self.task_is_source = domain.is_on_task(source_task)
-        self.task_is_dest = domain.is_on_task(dest_task)
-        self.inter_comm = domain.task_intercomm(
-            dest_task if self.task_is_source else source_task
-        )
-        if self.inter_comm.is_inter:
-            # Disjoint tasks with real inter-communicator
-            self._the_apply = self._apply_intercomm
-        elif self.inter_comm.is_intra:
-            # Overlapping tasks using an intra-communicator fron union of tasks procs
-            self._the_apply = self._apply_intracomm
-
-        self._all_params_by_type = {}
-        for p in sorted(self.parameters, key=lambda _: _.name):
-            if not p.dtype in self._all_params_by_type:
-                self._all_params_by_type[p.dtype] = []
-            self._all_params_by_type[p.dtype].append(p)
-        self._send_temp_by_type = {
-            t: np.zeros((len(self._all_params_by_type[t]),), dtype=t)
-            for t in self._all_params_by_type.keys()
-        }
-        self._recv_temp_by_type = {
-            t: np.zeros((len(self._all_params_by_type[t]),), dtype=t)
-            for t in self._all_params_by_type.keys()
-        }
-
-    @op_apply
-    def apply(self, **kwds):
-        self._the_apply(**kwds)
-
-    def _apply_intercomm(self, **kwds):
-        """Disjoint tasks so inter-comm bcast is needed."""
-        for t in self._all_params_by_type.keys():
-            if self.task_is_source:
-                self._send_temp_by_type[t][...] = [
-                    p() for p in self._all_params_by_type[t]
-                ]
-                self.inter_comm.bcast(
-                    self._send_temp_by_type[t],
-                    root=MPI.ROOT if self.domain.task_rank() == 0 else MPI.PROC_NULL,
-                )
-            if self.task_is_dest:
-                self._recv_temp_by_type[t] = self.inter_comm.bcast(
-                    self._send_temp_by_type[t], root=0
-                )
-                for p, v in zip(
-                    self._all_params_by_type[t], self._recv_temp_by_type[t]
-                ):
-                    p.value = v
-
-    def _apply_intracomm(self, **kwds):
-        """Communicator is an intra-communicator defined as tasks' comm union.
-        Single broadcast is enough.
-        """
-        for t in self._all_params_by_type.keys():
-            if self.task_is_source and self.domain.task_rank() == 0:
-                self._send_temp_by_type[t][...] = [
-                    p() for p in self._all_params_by_type[t]
-                ]
-            self._recv_temp_by_type[t] = self.inter_comm.bcast(
-                self._send_temp_by_type[t],
-                self.domain.task_root_in_parent(self.source_task),
-            )
-            if self.task_is_dest:
-                for p, v in zip(
-                    self._all_params_by_type[t], self._recv_temp_by_type[t]
-                ):
-                    p.value = v
-
-    @classmethod
-    def supports_mpi(cls):
-        return True
-
-
-class InterTaskParamComm(ComputationalGraphNodeFrontend):
-
-    __implementations = {Implementation.PYTHON: PythonInterTaskParamComm}
-
-    @classmethod
-    def implementations(cls):
-        return cls.__implementations
-
-    @classmethod
-    def default_implementation(cls):
-        return Implementation.PYTHON
diff --git a/hysop/operator/meson.build b/hysop/operator/meson.build
index c466dd37a..725effa38 100644
--- a/hysop/operator/meson.build
+++ b/hysop/operator/meson.build
@@ -16,7 +16,6 @@ src_python = [
   'gradient.py',
   'hdf_io.py',
   'integrate.py',
-  'inter_task_param_comm.py',
   'kinetic_energy.py',
   'mean_field.py',
   'memory_reordering.py',
--
GitLab
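
For context on what this removal drops: the core of the deleted operator is the
inter-communicator broadcast in _apply_intercomm(), where the root rank of the
source task passes root=MPI.ROOT, its sibling ranks pass root=MPI.PROC_NULL,
and ranks on the destination task name the remote group's root. Below is a
minimal standalone sketch of that MPI pattern in plain mpi4py; the rank split,
tag, and payload are hypothetical and none of this is HySoP API. Run with,
e.g., mpirun -n 4:

    from mpi4py import MPI
    import numpy as np

    world = MPI.COMM_WORLD
    # Hypothetical layout: ranks 0-1 form the "source" task, ranks 2-3 the "dest" task.
    is_source = world.rank < 2
    task_comm = world.Split(color=0 if is_source else 1, key=world.rank)

    # Inter-communicator between the two disjoint task groups
    # (local leader is task-local rank 0; remote leader given as a world rank).
    remote_leader = 2 if is_source else 0
    inter = task_comm.Create_intercomm(0, world, remote_leader, tag=42)

    values = np.array([1.0, 2.0]) if is_source else None
    if is_source:
        # Only the task-local root actually sends; other source ranks pass PROC_NULL.
        inter.bcast(values, root=MPI.ROOT if task_comm.rank == 0 else MPI.PROC_NULL)
    else:
        # Receivers name the root's rank within the *remote* group.
        values = inter.bcast(None, root=0)
        print(f"world rank {world.rank} received {values}")

This mirrors the removed _apply_intercomm() branch; the overlapping-task case
(_apply_intracomm()) degenerates to a single ordinary bcast over the union
communicator, rooted at the source task's root rank.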