Vous avez reçu un message "Your GitLab account has been locked ..." ? Pas d'inquiétude : lisez cet article https://docs.gricad-pages.univ-grenoble-alpes.fr/help/unlock/

Commit c4aec808 authored by Thomas Lavocat
Browse files

Optionally launch JAIL with TCP instead of IPC

Because Java has no native support for Unix domain sockets...
parent 5d02c3ce
import json
#constants
ISIPC = "ISIPC"
GETSPWE = "get spawn errors"
GETSPWN = "get spawn"
STOPMPI = "stop mpi bridge"
......
......@@ -511,6 +511,7 @@ class Erebor:
elif decoded_message[consts.ACTION] == consts.MPIDEPL :
on_network = self.networks.get(decoded_message[consts.NETWORK])
port = decoded_message[consts.PORT]
is_ipc = decoded_message[consts.ISIPC]
# init the routing table for this group
self.mpi_routing_table[on_network] = dict()
# TODO handle the case where a child may already be launched on
......@@ -525,7 +526,7 @@ class Erebor:
# Will be called when a replication is done.
def after_children_startup(child) :
network = self.networks.get(child.networkId)
self.start_MPI_bridge_on(child, network.ID, port)
self.start_MPI_bridge_on(child, network.ID, port, is_ipc)
self.mpi_launching_to_wait[network] -= 1
# when everyone has bootstrap
if self.mpi_launching_to_wait[network] == 0 :
......@@ -540,7 +541,7 @@ class Erebor:
after_children_startup)
self.launch_erebor_on(str(spawn.rank), on_network)
else :
self.start_mpi_bridhe(on_network.ID, port)
self.start_mpi_bridhe(on_network.ID, port, is_ipc)
# in case of a one-element group, still propagate the
# correct deployment information
if len(spawned) == 1 :
......@@ -822,6 +823,7 @@ class Erebor:
elif order[consts.ORDER] == consts.STRTMPI :
networkId = order[consts.NETWORK]
port = order[consts.PORT]
is_ipc = order[consts.ISIPC]
self.start_mpi_bridhe(networkId, port)
# If I'm asked to stop the MPI bridge
elif order[consts.ORDER] == consts.STOPMPI :
......@@ -1012,7 +1014,7 @@ class Erebor:
network.connector.register_on_dead_bridge(self.dead_bridge_callback)
# Start a new network, for whom I am the master
def start_mpi_bridhe(self, networkId, port) :
def start_mpi_bridhe(self, networkId, port, is_ipc) :
network = self.networks.get(networkId)
if network != None :
self.mpi_bridges[network] = MPIHook(self.add_to_queue,
......@@ -1022,7 +1024,8 @@ class Erebor:
network,
self.debug_list,
self.loglv,
self.logf)
self.logf,
is_ipc)
self.mpi_bridges[network].start()
# Start a new network, for whom I am the master
......@@ -1094,14 +1097,15 @@ class Erebor:
# Ask the Erebor children to start an MPIBridge on the given networkId
# -> only works with direct children
def start_MPI_bridge_on(self, children, networkId, port) :
def start_MPI_bridge_on(self, children, networkId, port, is_ipc) :
network = self.networks.get(children.networkId)
if network != None :
order = json.dumps({
consts.TYPE:consts.ORDER,
consts.ORDER:consts.STRTMPI,
consts.NETWORK:networkId,
consts.PORT:port
consts.PORT:port,
consts.ISIPC:is_ipc
});
network.connector.send_message_to(children.rank, children.target, order, False)
......
......@@ -332,13 +332,14 @@ class FrameworkControler:
# port : the port for zmq on the hook
# callback : the function that will be executed at the end of the
# operation
def start_mpi_session(self, dnetid, snode, snetid, port, callback):
def start_mpi_session(self, dnetid, snode, snetid, port, callback, is_ipc=consts.TRUE):
id_callback = self.erebor.register_execute_callback(callback)
message = json.dumps({
consts.ACTION:consts.MPIDEPL,
consts.PORT:port,
consts.CALLBACK:str(id_callback),
consts.NETWORK:dnetid})
consts.NETWORK:dnetid,
consts.ISIPC:is_ipc})
self.erebor.send_trans_network_message_to("0",
dnetid,
snode,
......
......@@ -11,7 +11,7 @@ logger = logging.getLogger('yggdrasil')
class MPIHook(Thread) :
def __init__(self, add_to_queue, trans_hook, mpi_trans_hook, port, network,
debug_list, loglv, logf) :
debug_list, loglv, logf, is_ipc) :
Thread.__init__(self)
self.debug_list = debug_list
self.decoder = MPIDecoder()
......@@ -28,12 +28,16 @@ class MPIHook(Thread) :
self.daemon = True
self.send_trans_network_message_to = trans_hook
self.send_mpi_trans_network_message_to = mpi_trans_hook
self.is_ipc = is_ipc == consts.TRUE
def run(self) :
# A context per thread
context = zmq.Context()
rep_socket = context.socket(zmq.PULL)
rep_socket.bind("ipc:///tmp/{}".format(self.port))
if self.is_ipc :
rep_socket.bind("ipc:///tmp/{}".format(self.port))
else :
rep_socket.bind("tcp://*:{}".format(self.port))
logger.debug(consts.bold("mpi -> reply socket ready to receive on {}").format(self.port))
poller = zmq.Poller()
poller.register(rep_socket)
......
......@@ -6,12 +6,15 @@ class MPIJail(NumberedGroup):
server_number = 6000
def __init__(self, name, ID, node_list, base_rank, base_network, timeout=-1):
def __init__(self, name, ID, node_list, base_rank, base_network, tcp=False, timeout=-1):
NumberedGroup.__init__(self, name, ID, node_list, base_rank,
base_network, timeout)
MPIJail.server_number+=1
self.server_number = MPIJail.server_number
self.server_socket = "{}".format(self.server_number)
self.is_ipc = consts.TRUE
if tcp :
self.is_ipc = consts.FALSE
def start_numbered_group(self) :
def mpi_session_started(data) :
......@@ -20,7 +23,8 @@ class MPIJail(NumberedGroup):
self.root_r,
self.root_n,
self.server_socket,
mpi_session_started)
mpi_session_started,
self.is_ipc)
# to override
def start_mpi_jail_group(self) :
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment