Vous avez reçu un message "Your GitLab account has been locked ..." ? Pas d'inquiétude : lisez cet article https://docs.gricad-pages.univ-grenoble-alpes.fr/help/unlock/

Commit 11d4dce5 authored by lavocat's avatar lavocat Committed by Swann Perarnau
Browse files

Separate Isengard from network, related to issue #13

parent 1a4fac89
......@@ -3,3 +3,4 @@ from .consts import *
from .consts import bcolors
from .host_name_parser import Helper
from .event import Event
from .callback import CallbackObject
class CallbackObject:
def __init__(self, dnode, dnetid, snode, snetid, callback, value):
self.value = value
self.dnode = dnode
self.snode = snode
self.dnetid = dnetid
self.snetid = snetid
self.callback = callback
def fire(self, data):
self.callback(data, self.dnode, self.dnetid, self.snode, self.snetid,
self.value)
def copy_with_another_callback(self, callback):
return CallbackObject(self.dnode, self.dnetid, self.snode,
self.dnetid, callback, self.value)
def __str__(self):
return (" TO WARN {} {} {} {} {} ".format(self.snode, self.snetid,
self.dnode, self.dnetid,
self.value))
This diff is collapsed.
......@@ -180,27 +180,6 @@ class FrameworkControler:
snetid,
message)
# Display the network dnetid
#
# params
# synchro : the tree need to be synchronized or not for this action
# dnode : the node who will aply the order
# dnetid : the network of the node who will apply the order
# snode : the node who emit the order
# snetid : the network of the node who emit the order
def network_on(self, synchro, dnode, dnetid, snode, snetid, callback=None):
id_callback = -1
if callback != None :
id_callback = self.erebor.register_execute_callback(callback)
message = json.dumps({
consts.ACTION:consts.NETWORK,consts.SYNCHRO:consts.TRUE,
consts.CALLBACK:id_callback})
self.erebor.send_trans_network_message_to(dnode,
dnetid,
snode,
snetid,
message)
# Network update on dnode@dnetid
#
# params
......
import logging
import sys
import socket
import time
import re
import signal
import json
from .. import consts
from ..isengard.isengardc import Isengard
......@@ -11,68 +7,44 @@ from ..consts import bcolors
logger = logging.getLogger('yggdrasil')
class Network(Isengard) :
class Network():
def __init__(self, add_to_queue, root, ID, debug_list, loglv, logf,
taktuk_path, options="", log_time=None):
Isengard.__init__(self, add_to_queue, root, debug_list, loglv, logf,
taktuk_path, options, log_time)
taktuk_path, options="", log_time=None):
self.debug_list = debug_list
self.debug = "network" in self.debug_list
self.loglv = loglv
self.logf = logf
self.ID = ID
self.ID = ID
self.child_callbacks = []
self.order_callbacks = []
self.ntwrk_callbacks = []
self.mtrsn_callbacks = []
self.trsnt_callbacks = []
self.sbprc_callbacks = []
self.stats_callbacks = []
self.spawn_callbacks = []
self.lost_callbacks = []
self.dead_callbacks = []
self.ntwud_callbacks = []
self.ntwrn_callbacks = []
self.deadb_callbacks = []
self.gmsg_callbacks = []
self.state = consts.INIT
self.connector = Isengard(add_to_queue, root, debug_list, loglv, logf,
taktuk_path,
self.bridge_internal_callback,
self.bridge_isengard_callback,
self.bridge_generic_message_callback,
self.fire_network_ready_callback,
self.kill_needed,
options, log_time)
# Registration ############################################################
def one_shot_register_on_network_update(self, values):
self.ntwud_callbacks.append(values)
def one_shot_register_on_network_renumber(self, values):
self.ntwrn_callbacks.append(values)
def register_on_dead_bridge(self, values):
self.deadb_callbacks.append(values)
def register_on_bridge_generic_messages(self, values):
self.gmsg_callbacks.append(values)
def register_on_lost_nodes(self, values):
self.lost_callbacks.append(values)
def register_on_dead_nodes(self, values):
self.dead_callbacks.append(values)
def one_shot_register_on_spawn_nodes(self, values):
self.spawn_callbacks.append(values)
def one_shot_register_on_status_print(self, values):
self.stats_callbacks.append(values)
def register_on_subprocess_callback(self, register_callback):
self.sbprc_callbacks.append(register_callback)
def register_mpi_trans_network_callback(self, register_callback):
self.mtrsn_callbacks.append(register_callback)
def register_trans_network_callback(self, register_callback):
self.trsnt_callbacks.append(register_callback)
# network bootstrap
def register_network_bootstrap_callback(self, register_callback):
self.ntwrk_callbacks.append(register_callback)
......@@ -82,7 +54,7 @@ class Network(Isengard) :
def register_children_callback(self, register_callback):
self.child_callbacks.append(register_callback)
# Events ###################################################################
# Events ##################################################################
# When an order pop
def new_mpi_trans_network_has_come(self, order, rank):
......@@ -100,34 +72,28 @@ class Network(Isengard) :
def new_child_has_come(self, rank, ID):
self.call_connection_callbacks(rank, ID)
# is called when a subprocess write something
def _subprocess_callback(self, txt) :
self.call_sbprc_callbacks(self.ID, txt)
return json.dumps({consts.TYPE:consts.CONTROL, consts.VALUE:consts.ACK})
def bridge_dead(self) :
self.call_dead_bridge_calllbacks()
def bridge_generic_message_callback(self, txt) :
def bridge_generic_message_callback(self, txt):
logger.info("receive generic bridge message {}".format(txt))
self.call_gmsg_callbacks(txt)
# is called when something arrive from the bridge
def bridge_internal_callback(self, obj) :
if obj[consts.TYPE] == consts.INFOS :
def bridge_internal_callback(self, obj):
if obj[consts.TYPE] == consts.INFOS:
# Look for rank information
self.rank = obj[consts.RANK]
self.connector.rank = obj[consts.RANK]
logger.debug("my rank is "+self.rank)
# If not network_master, forward the information
self.notify_bootstrap(self.rank != "0")
elif obj[consts.TYPE] == consts.QUIT :
elif obj[consts.TYPE] == consts.QUIT:
self.new_order_has_come(json.loads(consts.QUIT_ORDER), "0")
# Display some logs
else :
logger.error(bcolors.FAIL+ "-> Bridge log : "+obj[consts.DATA]+bcolors.ENDC)
else:
logger.error(bcolors.FAIL + "-> Bridge log: "+obj[
consts.DATA]+bcolors.ENDC)
def bridge_isengard_callback(self, sender, decoded_data) :
if decoded_data[consts.TYPE] == consts.INFOS :
def bridge_isengard_callback(self, sender, decoded_data):
if decoded_data[consts.TYPE] == consts.INFOS:
# Get children's info
# It means a new one as shown
# transfert request
......@@ -136,41 +102,16 @@ class Network(Isengard) :
decoded_data[consts.RANK],
decoded_data[consts.ID])
# If it is an order
elif decoded_data[consts.TYPE] == consts.ORDER :
elif decoded_data[consts.TYPE] == consts.ORDER:
self.new_order_has_come(decoded_data, sender)
# If it is a trans-network communication
elif decoded_data[consts.TYPE] == consts.TNETWRK :
elif decoded_data[consts.TYPE] == consts.TNETWRK:
self.new_trans_network_has_come(decoded_data, sender)
elif decoded_data[consts.TYPE] == consts.MTNETWRK :
elif decoded_data[consts.TYPE] == consts.MTNETWRK:
self.new_mpi_trans_network_has_come(decoded_data, sender)
# is called when something arrive from taktuk output
def taktuk_stdout_callback(self, txt, txt_8) :
logger.info(bcolors.OKBLUE+ txt_8 +bcolors.ENDC)
# is called when status information are displayed by taktuk
def _status_print(self, txt):
logger.info(bcolors.WHITE+bcolors.UNDERLINE+ txt +bcolors.ENDC)
if txt.startswith(socket.gethostname()) :
self.call_stats_callbacks()
def fire_connection_lost(self, sp_node, error_nodes) :
self.call_lost_callbacks(sp_node, error_nodes)
def fire_node_dead(self, sp_node, error_nodes, errors_rank) :
self.call_dead_callbacks(error_nodes, errors_rank)
def fire_network_ready_callback(self, error_nodes):
self.call_spawn_callbacks(",".join(error_nodes))
def fire_network_renumber_callback(self):
self.call_ntwrn_callbacks()
def fire_network_update_failure_callback(self):
self.call_ntwud_callbacks("failure")
def fire_network_update_success_callback(self):
self.call_ntwud_callbacks("success")
pass
def kill_needed(self, n):
self.new_order_has_come(json.loads(consts.QUIT_ORDER), "0")
......@@ -178,104 +119,42 @@ class Network(Isengard) :
# Propagation ##############################################################
# Notify my bootstrap :
# Notify my bootstrap:
# -> to the registered callbacks
# -> to my network master (if it exsit : propagate = true)
# -> to my network master (if it exsit: propagate = true)
def notify_bootstrap(self, propagate):
self.state = consts.RUNING
data = json.dumps({
consts.FROM:self.rank,
consts.TYPE:consts.INFOS,
consts.RANK:self.rank,
consts.ID:self.ID
});
if propagate :
self.send_message_to("0", "all", data, False)
consts.FROM: self.rank,
consts.TYPE: consts.INFOS,
consts.RANK: self.rank,
consts.ID: self.ID
})
if propagate:
self.connector.send_message_to("0", "all", data, False)
self.call_ntwrk_callbacks(data)
def call_sbprc_callbacks(self, ID, txt) :
for callback in self.sbprc_callbacks :
callback(ID, txt)
def call_ntwrk_callbacks(self, order) :
for callback in self.ntwrk_callbacks :
# network bootstrap
def call_ntwrk_callbacks(self, order):
for callback in self.ntwrk_callbacks:
callback(order)
def call_order_callbacks(self, order,rank,ID) :
for callback in self.order_callbacks :
def call_order_callbacks(self, order, rank, ID):
for callback in self.order_callbacks:
callback(order, rank, ID)
def call_mpi_transnet_callbacks(self, order, rank) :
def call_mpi_transnet_callbacks(self, order, rank):
for callback in self.mtrsn_callbacks:
callback(order, rank)
def call_transnet_callbacks(self, order, rank) :
def call_transnet_callbacks(self, order, rank):
for callback in self.trsnt_callbacks:
callback(order, rank)
def call_connection_callbacks(self, rank, ID) :
for callback in self.child_callbacks :
def call_connection_callbacks(self, rank, ID):
for callback in self.child_callbacks:
callback(rank, ID)
def call_stats_callbacks(self) :
for c in self.stats_callbacks :
c.callback("", c.dnode, c.dnetid, c.snode, c.snetid, c.value)
self.stats_callbacks = []
def call_ntwud_callbacks(self, failure) :
for c in self.ntwud_callbacks :
c.callback(failure, c.dnode, c.dnetid, c.snode, c.snetid, c.value)
self.ntwud_callbacks = []
def call_ntwrn_callbacks(self) :
for c in self.ntwrn_callbacks :
c.callback("", c.dnode, c.dnetid, c.snode, c.snetid, c.value)
self.ntwrn_callbacks = []
def call_dead_bridge_calllbacks(self) :
for c in self.deadb_callbacks :
c(self)
def call_lost_callbacks(self, error_nodes, errors_rank) :
for c in self.lost_callbacks :
c(self.rank, self.ID, error_nodes, errors_rank)
def call_dead_callbacks(self, error_nodes, errors_rank) :
for c in self.dead_callbacks :
c(self.rank, self.ID, error_nodes, errors_rank)
def call_spawn_callbacks(self, error_nodes) :
for c in self.spawn_callbacks :
c.callback(error_nodes, c.dnode, c.dnetid, c.snode, c.snetid, c.value)
self.spawn_callbacks = []
def call_gmsg_callbacks(self, txt) :
for c in self.gmsg_callbacks :
def call_gmsg_callbacks(self, txt):
for c in self.gmsg_callbacks:
c(txt)
# Commands to manipulate Taktuk ############################################
def network_update(self, synch, taktuk=True, callback_object=None) :
# get notified on spawn
if callback_object != None :
self.one_shot_register_on_network_update(callback_object)
Isengard.network_update(self, synch, taktuk)
def network_renumber(self, synch, taktuk=True, callback_object=None) :
# get notified on spawn
if callback_object != None :
self.one_shot_register_on_network_renumber(callback_object)
Isengard.network_renumber(self, synch, taktuk)
def spawn_nodes(self, dest, nodes, synch, taktuk=True, callback_object=None):
# get notified on spawn
if callback_object != None :
self.one_shot_register_on_spawn_nodes(callback_object)
Isengard.spawn_nodes(self, dest, nodes, synch, taktuk)
def request_network_status(self, synch, taktuk=True, callback_object=None):
# get notified on status request ?
if callback_object != None :
self.one_shot_register_on_status_print(callback_object)
Isengard.request_network_status(self, synch, taktuk)
from .isengardc import Isengard
from .unix_socket_wrapper import Unix_socket
from .taktuk_wrapper import Wrapper
from .commands import BrodcastRunningCommand
from .commands import RunningCommand
import re
import logging
import json
from .. import consts
logger = logging.getLogger('yggdrasil')
class RunningCommand:
def __init__(self, name, dest, command, debug, loglv, logf,
callback_object=None):
self.name = name
self.dest = dest
self.command = command
self.stdout = []
self.stderr = []
self.finall = ""
self.wait = 1
self.debug = debug
self.loglv = loglv
self.logf = logf
self.stdout_regex = self.get_stdout_regex()
self.stderr_regex = self.get_stderr_regex()
self.final_regex = self.get_final_regex()
self.callback_object = callback_object
def test_and_dispatch(self, txt_utf8, txt_base_64):
if self.stdout_regex.match(txt_utf8) is not None:
self.append_stdout(txt_utf8, txt_base_64)
return 0
elif self.stderr_regex.match(txt_utf8) is not None:
self.append_stderr(txt_utf8, txt_base_64)
return 1
elif self.final_regex.match(txt_utf8) is not None:
self.final(txt_utf8, txt_base_64)
if self.wait == 0:
return 2
else:
return 0
else:
return -1
def get_stdout_regex(self):
regex = "^.*{}-{}: {} \(.*\): output >.*".format(self.name, self.dest,
self.command)
logger.debug("regex -> ^.*{}-{}: {} \(.*\):"
" output >.*".format(self.name, self.dest, self.command))
return re.compile(regex)
def get_stderr_regex(self):
regex = "^.*{}-{}: {} \(.*\): error >.*".format(self.name, self.dest,
self.command)
logger.debug("regex -> ^.*{}-{}: {} \(.*\):"
" error >.*".format(self.name, self.dest, self.command))
return re.compile(regex)
def get_final_regex(self):
regex = "^.*{}-{}: {} \(.*\): status >.*".format(self.name, self.dest,
self.command)
logger.debug("regex -> ^.*{}-{}: {} \(.*\):"
" status >.*".format(self.name, self.dest, self.command))
return re.compile(regex)
def append_stdout(self, line, line_64):
self.stdout.append(line_64)
def append_stderr(self, line, line_64):
self.stderr.append(line_64)
# handle here ?
def node_deads(self, error_nodes, errors_rank):
logger.warning("error nodes {}".format(errors_rank))
for i in range(0, len(error_nodes)):
rank = "X"
if i < len(errors_rank):
rank = errors_rank[i]
logger.warning("comp '{}' '{}'".format(rank, self.dest))
if self.dest == "[0-9]*" or rank in self.dest:
logger.error("dead")
self.final("dead")
if self.wait == 0:
return 2
else:
return 1
return 0
def final(self, line, line64):
self.wait -= 1
self.finall = line64
if self.callback_object is not None:
logger.debug(" TO WARN {} ".format(self.callback_object))
data = json.dumps({
consts.STDOUT: self.stdout,
consts.STDERR: self.stderr,
consts.STATUS: self.finall
})
self.callback_object.fire(data)
else:
self.print_self(self.stdout, self.stderr, self.finall)
def print_self(self, stdout, stderr, status):
for s in stdout:
logger.info(bcolors.OKGREEN+"COMMAND output > "+s+bcolors.ENDC)
for s in stderr:
logger.info(bcolors.OKGREEN+"COMMAND error > "+s+bcolors.ENDC)
logger.info(bcolors.OKGREEN+"COMMAND status > "+status+bcolors.ENDC)
class BrodcastRunningCommand(RunningCommand):
def __init__(self, dest_list, command, debug, logv, logf, callback_object):
RunningCommand.__init__(self, "", "[0-9]*", command, debug, logv, logf,
callback_object.copy_with_another_callback(
self.done))
self.wait = len(dest_list)-1
self.end_out = []
self.bstatus = dict()
self.bstdout = dict()
self.bstderr = dict()
self.my_callbac_object = callback_object
logger.debug("broadcast wait {}".format(self.wait))
logger.info("dest to wait broadcast command {}".format(self.wait))
def append_stdout(self, line, line_64):
identity = line.split(":")[0]
if identity not in self.bstdout:
self.bstdout[identity] = []
self.bstdout[identity].append(line_64)
def append_stderr(self, line, line_64):
identity = line.split(":")[0]
if identity not in self.bstderr:
self.bstderr[identity] = []
self.bstderr[identity].append(line_64)
def append_status(self, line, line_64):
identity = line.split(":")[0]
if identity not in self.bstatus:
self.bstatus[identity] = []
self.bstatus[identity].append(line_64)
def final(self, line, line_64):
self.wait -= 1
self.append_status(line, line_64)
self.callback_object.fire("")
def done(self, data, snode, snetid, dnode, dnetid, value):
if self.wait == 0:
final = json.dumps({
consts.STDOUT: self.bstdout,
consts.STDERR: self.bstderr,
consts.STATUS: self.bstatus
})
self.my_callbac_object.fire(final)
import base64
import logging
import json
import time
......@@ -12,6 +13,8 @@ from .taktuk_wrapper import Wrapper
from ..consts import bcolors
from .. import Helper
from .. import Event
from .commands import BrodcastRunningCommand
from .commands import RunningCommand
logger = logging.getLogger('yggdrasil')
......@@ -118,7 +121,12 @@ class Isengard:
# has to know if this instance is the root of all the network or not
# because a non-root owns no taktuk !
def __init__(self, add_to_queue, root, debug_list, loglv, logf, taktuk_path,
taktuk_options="", log_time=None):
c1, c2, c3, c4, c5, taktuk_options="", log_time=None):
self.internal = c1
self.bisengard = c2
self.bgeneric = c3
self.nready = c4
self.killn = c5
self.taktuk_path= taktuk_path
self.flog_time = log_time
self.debug_list = debug_list
......@@ -140,6 +148,7 @@ class Isengard:
self.spawned[socket.gethostname()+"-0"].wait_start = False
self.spawned[socket.gethostname()+"-0"].me = True
self.lastHB = time.time()
self.running_commands = []
if self.isRoot :
# launch taktuk
self.taktuk = Wrapper(self.taktuk_stdout_callback, debug_list,
......@@ -153,6 +162,14 @@ class Isengard:
self.bridge = Bridge(None, debug_list,self.loglv, self.logf, self.bridge_callback)
self.heaBeat= Timer(consts.heartBeat, self.heart_beat)
self.heaBeat.start()
#callbacks
self.ntwud_callbacks = []
self.ntwrn_callbacks = []
self.spawn_callbacks = []
self.lost_callbacks = []
self.dead_callbacks = []
self.deadb_callbacks = []
self.state = consts.INIT
def heart_beat(self):
kill_needed = False
......@@ -168,9 +185,9 @@ class Isengard:
self.heaBeat= Timer(consts.heartBeat, self.heart_beat)
self.heaBeat.start()
# To override
def kill_needed(self, n):
logger.info(bcolors.FAIL+"parent dead"+bcolors.ENDC)
self.killn(n)
def log_time(self, t):
if self.flog_time != None :
......@@ -229,6 +246,21 @@ class Isengard:
nodes.append(str(children.rank))