Commit 054e9122 authored by Swann Perarnau's avatar Swann Perarnau
Browse files

Refactor the code to use singleton logging

This is more in line with the way the logging module is meant to be
used.
We still need to create new objects in a few places (entry points) and
propagate the config to newly created processes elsewhere.

This is still missing module filtering that was working before. I will
create a new issue for that.

Closes #11
parent 2396f4d6
This diff is collapsed.
......@@ -5,7 +5,9 @@ import sys
import signal
import yggdrasil
from yggdrasil.erebor import Erebor
from ..isengard import create_logger
from ..isengard.log import configure_logger
logger = logging.getLogger('yggdrasil')
def runner(argv, Controler=None):
......@@ -125,20 +127,7 @@ def runner(argv, Controler=None):
"unix_socket", "bridge", "mpi"]
else :
log = log.split(",")
# Transform a log value to its integer value
if loglv in ["-l", "--debug-level"] :
if value == "CRITICAL" :
loglv = logging.CRITICAL
elif value == "ERROR":
loglv = logging.ERROR
elif value == "WARNING":
loglv = logging.WARNING
elif value == "INFO":
loglv = logging.INFO
elif value == "DEBUG":
loglv = logging.DEBUG
else :
loglv = logging.NOTSET
# Parse the host file
hosts = args.file_list
if hosts != "" :
......@@ -153,7 +142,7 @@ def runner(argv, Controler=None):
tpath = tpath+"/"
# Create logger
logger = create_logger('yggdrasil.erebor.main', loglv, logf)
configure_logger(logger, log, loglv, logf)
logger.info("Option parsing done")
erebor = Erebor(root, log, loglv, logf, epath, tpath)
......@@ -177,4 +166,5 @@ def runner(argv, Controler=None):
def main() :
logger.setLevel(logging.INFO)
runner(sys.argv)
......@@ -6,9 +6,10 @@ import zmq
from ..isengard import consts
from .encoder import MPIDecoder
from threading import Thread
from ..isengard import create_logger
from ..isengard.socket_bridge import Bridge
logger = logging.getLogger('yggdrasil')
class Event :
def __init__(self, f, data) :
self.function = f
......@@ -31,10 +32,6 @@ class MPIHook(Thread) :
self.loglv = loglv
self.logf = logf
self.debug = "mpi" in self.debug_list
self.logger = create_logger('yggdrasil.erebor.mpibridge', loglv,
logf)
if not self.debug :
self.logger.setLevel(logging.NOTSET)
self.context = zmq.Context()
self.poll = True
self.subscriber = dict()
......@@ -50,36 +47,36 @@ class MPIHook(Thread) :
context = zmq.Context()
rep_socket = context.socket(zmq.PULL)
rep_socket.bind("ipc:///tmp/{}".format(self.port))
self.logger.debug(consts.bold("mpi -> reply socket ready to receive"))
logger.debug(consts.bold("mpi -> reply socket ready to receive"))
while self.poll:
try :
self.logger.debug(consts.bold("wait to receive"))
logger.debug(consts.bold("wait to receive"))
rcv_message = rep_socket.recv()
self.logger.debug(consts.bold("received"))
self.logger.debug(consts.bold("received {}".format(rcv_message)))
logger.debug(consts.bold("received"))
logger.debug(consts.bold("received {}".format(rcv_message)))
decoded_message = self.unpack(rcv_message)
self.logger.debug(consts.bold("unpacked {}".format(decoded_message)))
logger.debug(consts.bold("unpacked {}".format(decoded_message)))
# a new process is registering to me
if decoded_message[consts.TYPE] == consts.REG :
self.logger.debug(consts.bold("mpi -> new registration to do later"))
logger.debug(consts.bold("mpi -> new registration to do later"))
self.add_to_queue(Event(self.new_registration, decoded_message))
# a process wants to send a message to another one
if decoded_message[consts.TYPE] == consts.MPISEND :
self.logger.debug(consts.bold("mpi -> send a message later"))
logger.debug(consts.bold("mpi -> send a message later"))
self.add_to_queue(Event(self.send_message_to_distant_mpi_rank,
decoded_message))
if decoded_message[consts.TYPE] == consts.MESSAGE :
self.logger.debug(consts.bold("mpi -> send a ctrl message later"))
logger.debug(consts.bold("mpi -> send a ctrl message later"))
self.add_to_queue(Event(self.send_message_to_distant_erebor,
decoded_message))
# unlock the state of request socket
self.logger.debug(consts.bold("mpi -> done loop"))
logger.debug(consts.bold("mpi -> done loop"))
except Exception as e:
self.logger.warning(consts.bold("{}".format(e)))
logger.warning(consts.bold("{}".format(e)))
def unpack(self, to_unpack) :
fields = self.decoder.unpack(to_unpack)
self.logger.debug(consts.bold("unpack {}".format(fields)))
logger.debug(consts.bold("unpack {}".format(fields)))
if fields[0] == consts.REG :
return {
consts.TYPE:consts.REG,
......@@ -87,7 +84,7 @@ class MPIHook(Thread) :
consts.PORT:fields[2]
}
else :
self.logger.debug(consts.bold(fields[0]))
logger.debug(consts.bold(fields[0]))
return {
consts.TYPE:fields[0],
consts.DNODE:fields[1],
......@@ -103,7 +100,7 @@ class MPIHook(Thread) :
def new_registration(self, decoded_message) :
# register the rank
rank = decoded_message[consts.RANK]
self.logger.info(consts.bold("-> new registration {}".format(rank)))
logger.info(consts.bold("-> new registration {}".format(rank)))
socket = self.context.socket(zmq.PUSH)
socket.connect("ipc:///tmp/{}".format(decoded_message[consts.PORT]))
self.subscriber[rank] = socket
......@@ -116,10 +113,10 @@ class MPIHook(Thread) :
message)
def send_message_to_local_mpi_rank(self, mpi_rank, dnode, dnetid, message) :
self.logger.info(consts.bold("mpi -> found rank to send data"))
logger.info(consts.bold("mpi -> found rank to send data"))
socket = self.subscriber[mpi_rank]
socket.send(self.decoder.pack(dnode, dnetid, message))
self.logger.debug(consts.bold("mpi -> data sent"))
logger.debug(consts.bold("mpi -> data sent"))
def send_message_to_distant_mpi_rank(self, decoded_message) :
dnode = decoded_message[consts.DNODE]
......@@ -127,7 +124,7 @@ class MPIHook(Thread) :
snode = decoded_message[consts.SNODE]
snet = decoded_message[consts.SNETID]
data = decoded_message[consts.DATA]
self.logger.info(consts.bold("mpi -> request sending mpi trans network message"))
logger.info(consts.bold("mpi -> request sending mpi trans network message"))
self.send_mpi_trans_network_message_to(dnode,dnet,snode,snet,data)
def send_message_to_distant_erebor(self, decoded_message) :
......@@ -143,7 +140,7 @@ class MPIHook(Thread) :
consts.DATA:data,
consts.SYNCHRO:consts.FALSE
})
self.logger.info(consts.bold("mpi -> request sending a trans network message"))
logger.info(consts.bold("mpi -> request sending a trans network message"))
self.send_trans_network_message_to(dnode,dnet,snode,snet,message)
def kill(self) :
......
......@@ -5,11 +5,12 @@ import time
import re
import signal
import json
from ..isengard import create_logger
from ..isengard import consts
from ..isengard.isengardc import Isengard
from ..isengard.consts import bcolors
logger = logging.getLogger('yggdrasil')
class Network(Isengard) :
def __init__(self, add_to_queue, root, ID, debug_list, loglv, logf,
......@@ -20,10 +21,6 @@ class Network(Isengard) :
self.debug = "network" in self.debug_list
self.loglv = loglv
self.logf = logf
self.logger = create_logger('yggdrasil.erebor.network', loglv,
logf)
if not self.debug :
self.logger.setLevel(logging.NOTSET)
self.ID = ID
self.child_callbacks = []
self.order_callbacks = []
......@@ -112,7 +109,7 @@ class Network(Isengard) :
self.call_dead_bridge_calllbacks()
def bridge_generic_message_callback(self, txt) :
self.logger.info("receive generic bridge message {}".format(txt))
logger.info("receive generic bridge message {}".format(txt))
self.call_gmsg_callbacks(txt)
# is called when something arrive from the bridge
......@@ -120,14 +117,14 @@ class Network(Isengard) :
if obj[consts.TYPE] == consts.INFOS :
# Look for rank information
self.rank = obj[consts.RANK]
self.logger.debug("my rank is "+self.rank)
logger.debug("my rank is "+self.rank)
# If not network_master, forward the information
self.notify_bootstrap(self.rank != "0")
elif obj[consts.TYPE] == consts.QUIT :
self.new_order_has_come(json.loads(consts.QUIT_ORDER), "0")
# Display some logs
else :
self.logger.error(bcolors.FAIL+ "-> Bridge log : "+obj[consts.DATA]+bcolors.ENDC)
logger.error(bcolors.FAIL+ "-> Bridge log : "+obj[consts.DATA]+bcolors.ENDC)
def bridge_isengard_callback(self, sender, decoded_data) :
if decoded_data[consts.TYPE] == consts.INFOS :
......@@ -149,11 +146,11 @@ class Network(Isengard) :
# is called when something arrive from taktuk output
def taktuk_stdout_callback(self, txt, txt_8) :
self.logger.info(bcolors.OKBLUE+ txt_8 +bcolors.ENDC)
logger.info(bcolors.OKBLUE+ txt_8 +bcolors.ENDC)
# is called when status information are displayed by taktuk
def _status_print(self, txt):
self.logger.info(bcolors.WHITE+bcolors.UNDERLINE+ txt +bcolors.ENDC)
logger.info(bcolors.WHITE+bcolors.UNDERLINE+ txt +bcolors.ENDC)
if txt.startswith(socket.gethostname()) :
self.call_stats_callbacks()
......
from .log import create_logger
from .log import configure_logger
from .consts import *
from .consts import bcolors
from .isengardc import Isengard
......
......@@ -6,7 +6,6 @@ import re
import sys
import queue
import multiprocessing
from . import create_logger
from threading import Lock
from threading import Timer
from . import consts
......@@ -15,6 +14,8 @@ from .socket_bridge import Bridge
from .taktuk_wrapper import Wrapper
from .consts import bcolors
logger = logging.getLogger('yggdrasil')
HOST = 0
PEER = 1
POSITION = 1
......@@ -282,11 +283,6 @@ class Isengard:
self.debug_list = debug_list
self.loglv = loglv
self.logf = logf
self.debug = "isengard" in self.debug_list
self.logger = create_logger('yggdrasil.isengard.isengardc', loglv,
logf)
if not self.debug :
self.logger.setLevel(logging.NOTSET)
self.isRoot = root
self.spawned_lock = Lock()
self.spawned = dict()
......@@ -333,14 +329,14 @@ class Isengard:
# To override
def kill_needed(self, n):
self.logger.info(bcolors.FAIL+"parent dead"+bcolors.ENDC)
logger.info(bcolors.FAIL+"parent dead"+bcolors.ENDC)
def log_time(self, t):
if self.flog_time != None :
self.flog_time(t)
def send_to_taktuk(self, data):
#self.logger.debug("command to taktuk {}".format(data))
#logger.debug("command to taktuk {}".format(data))
if self.isRoot :
self.socket.send_message(data)
else :
......@@ -373,7 +369,7 @@ class Isengard:
def _connector_error(self, txt):
self.spawned_lock.acquire()
arguments = txt.split("connector:")[1].split(";")
self.logger.warning(consts.bold("connector error {}".format(arguments)))
logger.warning(consts.bold("connector error {}".format(arguments)))
if "Possible precedence issue with control " not in arguments[2] :
sp_node, index = self.get_wait_spawned(arguments[PEER])
if index > -1 :
......@@ -417,9 +413,9 @@ class Isengard:
return list(self.error_nodes_s)
def get_wait_spawned(self, name) :
#self.logger.debug("get wait spawn {}".format(name))
#logger.debug("get wait spawn {}".format(name))
for i in range(0, len(self.wait_spawn)) :
#self.logger.debug("spawn {}".format(name))
#logger.debug("spawn {}".format(name))
if self.wait_spawn[i].name == name :
return self.wait_spawn[i], i
return None, -1
......@@ -432,9 +428,9 @@ class Isengard:
self.spawned_lock.acquire()
#self.log_time("state message")
arguments = txt.split("state:")[1].split(";")
#self.logger.debug(arguments)
#logger.debug(arguments)
sp_node_key = arguments[HOST]+"-"+arguments[POSITION]
#self.logger.debug("key {}".format(sp_node_key))
#logger.debug("key {}".format(sp_node_key))
sp_node = None
if sp_node_key in self.spawned :
sp_node = self.spawned[sp_node_key]
......@@ -443,10 +439,10 @@ class Isengard:
if sp_node != None :
self.log_time("node appear")
self.spawned[sp_node_key] = sp_node
self.logger.debug("spawned nodes -> {}".format(len(self.spawned)))
logger.debug("spawned nodes -> {}".format(len(self.spawned)))
del self.wait_spawn[index]
else :
self.logger.error("this should not happen {} -> {} {}".format(txt, sp_node_key, self.spawned.keys()))
logger.error("this should not happen {} -> {} {}".format(txt, sp_node_key, self.spawned.keys()))
self.spawned_lock.release()
return
nodes = sp_node.update(arguments[POSITION],
......@@ -456,7 +452,7 @@ class Isengard:
#handle the case where, a just start nodes is lost having jobs to
#launch. Mark then as dead
if len(nodes) > 0 :
self.logger.debug("nodes {}".format(nodes))
logger.debug("nodes {}".format(nodes))
for node in nodes :
node = node.split(":")[0]
self.dead_node(self.spawned[node])
......@@ -465,7 +461,7 @@ class Isengard:
self.spawned_lock.release()
def propagate_state_info(self, sp_node) :
#self.logger.debug(sp_node.to_string())
#logger.debug(sp_node.to_string())
if sp_node.wait_start :
if sp_node.is_ready() or sp_node.is_failed() :
# if we were waiting for this node to start
......@@ -509,47 +505,47 @@ class Isengard:
# To override
def fire_connection_lost(self, sp_node, error_nodes) :
self.logger.info(bcolors.FAIL+"connection_lost {} {}".format(
logger.info(bcolors.FAIL+"connection_lost {} {}".format(
sp_node.name, error_nodes)+bcolors.ENDC)
# To override
def fire_node_dead(self, sp_node, error_nodes, ranks) :
self.logger.info(bcolors.FAIL+"node dead {} {}".format(
logger.info(bcolors.FAIL+"node dead {} {}".format(
sp_node.name, error_nodes)+bcolors.ENDC)
# To override
def fire_network_renumber_callback(self) :
self.logger.info(bcolors.FAIL+"network renumber_done"+bcolors.ENDC)
logger.info(bcolors.FAIL+"network renumber_done"+bcolors.ENDC)
# To override
def fire_network_ready_callback(self, error_nodes) :
self.logger.info(bcolors.FAIL+"network ready {}".format(error_nodes)+bcolors.ENDC)
logger.info(bcolors.FAIL+"network ready {}".format(error_nodes)+bcolors.ENDC)
# To override
def fire_network_update_success_callback(self):
self.logger.info(bcolors.FAIL+"network update success"+bcolors.ENDC)
logger.info(bcolors.FAIL+"network update success"+bcolors.ENDC)
# To override
def fire_network_update_failure_callback(self):
self.logger.info(bcolors.FAIL+"network update failure"+bcolors.ENDC)
logger.info(bcolors.FAIL+"network update failure"+bcolors.ENDC)
# is called when something arrive from taktuk output
def taktuk_stdout_callback(self, txt, txt_8) :
self.logger.info(bcolors.OKBLUE+ txt +bcolors.ENDC)
logger.info(bcolors.OKBLUE+ txt +bcolors.ENDC)
def status_print(self, txt, txt_utf_8):
self.add_to_queue(Event(self._status_print, txt_utf8))
def _status_print(self, txt):
self.logger.info(bcolors.UNDERLINE+ txt +bcolors.ENDC)
logger.info(bcolors.UNDERLINE+ txt +bcolors.ENDC)
# shutdown the network
def terminate(self):
if self.isRoot :
self.socket.shutdown()
self.logger.info("socket shutdown")
logger.info("socket shutdown")
self.taktuk.shutdown()
self.logger.info("taktuk shutdown")
logger.info("taktuk shutdown")
else :
self.hb_lock.acquire()
self.heaBeat.cancel()
......@@ -587,10 +583,10 @@ class Isengard:
try :
decoded_data = json.loads(envelope[consts.DATA])
if decoded_data[consts.TYPE] == consts.HEARTBT:
self.logger.info("heart beat received")
logger.info("heart beat received")
self.send_message_to(local_from,"all",consts.HBR,True)
elif decoded_data[consts.TYPE] == consts.HEARTBTR:
self.logger.info("heart beat answer received")
logger.info("heart beat answer received")
self.lastHB = time.time()
else :
return self.bridge_isengard_callback(local_from,
......@@ -600,7 +596,7 @@ class Isengard:
except ValueError :
return self.bridge_generic_message_callback(txt)
except ValueError :
self.logger.warning(json.dumps({
logger.warning(json.dumps({
consts.TYPE:consts.CONTROL,
consts.VALUE:consts.INVJSON}))
......@@ -622,25 +618,25 @@ class Isengard:
def kill_all(self, rank):
if self.isRoot :
self.logger.info(" kill "+rank)
logger.info(" kill "+rank)
self.taktuk.send_command(rank+" kill target all")
def spawn_nodes(self, dest, nodes, synch, taktuk=True):
self.spawned_lock.acquire()
#self.logger.debug("spawn request received")
#logger.debug("spawn request received")
# parse the asked nodes following those rules :
for node in self.helper.build_list(nodes) :
# create an object to follow the updates of the node
sp_node = Spawned(node)
self.wait_spawn.append(sp_node)
# self.logger.debug("add node {}".format(node))
# logger.debug("add node {}".format(node))
# add the nodes to the wait_start_list, this list will be used to
# propagate the state information.
self.to_wait_start = len(self.wait_spawn)
#self.logger.debug("lock release {}".format(len(self.wait_spawn)))
#logger.debug("lock release {}".format(len(self.wait_spawn)))
self.spawned_lock.release()
self.final_spawn_nodes(dest, nodes, synch, taktuk)
#self.logger.debug("spawn request send to taktuk")
#logger.debug("spawn request send to taktuk")
def final_spawn_nodes(self, dest, nodes, synch, taktuk=True):
if taktuk :
......
import logging
def create_logger(name, loglv, logf) :
logger = logging.getLogger(name)
logger.setLevel(loglv)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
if logf != "" :
def configure_logger(logger, log, loglv, logf):
FMT = '%(asctime)s - %(name)s - %(module)s - %(levelname)s - %(message)s'
formatter = logging.Formatter(FMT)
if logf:
fh = logging.FileHandler(logf)
fh.setLevel(loglv)
fh.setFormatter(formatter)
logger.addHandler(fh)
ch = logging.StreamHandler()
ch.setLevel(loglv)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.setLevel(loglv)
return logger
......@@ -13,12 +13,14 @@ import sys
import socket
import select
import yggdrasil
from . import create_logger
from . import configure_logger
from . import consts
from threading import Thread
from threading import Timer
from threading import Lock
logger = logging.getLogger('yggdrasil')
HEADER_BUFFER = 64
SMALL_BUFFER = 20
MSG_LENGTH = 1
......@@ -102,9 +104,6 @@ class Bridge(Thread):
self.debug = "bridge" in self.debug_list
self.loglv = loglv
self.logf = logf
self.logger = create_logger('yggdrasil.isengard.bridge', loglv, logf)
if not self.debug :
self.logger.setLevel(logging.NOTSET)
#connect to the taktuk network
self.ind = int(os.getenv("TAKTUK_CONTROL_READ"))
self.outd = int(os.getenv("TAKTUK_CONTROL_WRITE"))
......@@ -124,7 +123,7 @@ class Bridge(Thread):
self.controler_socket = controler_socket
self.receive_callback = receive_callback
if self.controler_socket != None :
self.logger.info("connected to erebor")
logger.info("connected to erebor")
self.controler_lock = Lock()
self.controler_ready = False
self.controler_lock.acquire()
......@@ -143,7 +142,7 @@ class Bridge(Thread):
return 0
try :
while self.poll and (size >0) and not self.dead and self.poll:
#self.logger.debug("write")
#logger.debug("write")
result = self.file_out.write(data)
if result <= 0 or result == None:
return -1
......@@ -183,20 +182,20 @@ class Bridge(Thread):
t_buffered_m = None
t_rest_to_read = 0
events = epoll.poll(consts.timeout)
#self.logger.debug("epolling")
#logger.debug("epolling")
for fileno, event in events:
#self.logger.debug("event to handle")
#logger.debug("event to handle")
if event & select.EPOLLIN:
if (self.controler_socket != None and
fileno == self.controler_socket.fileno()) :
length = 0
# if we start a new message
if s_buffered_m == None :
#self.logger.debug("new message")
#logger.debug("new message")
data = bytearray()
# if we start a new header
if s_header_m == None :
#self.logger.debug("new header")
#logger.debug("new header")
data = self.controler_socket.recv(4)
# otherwise complete the previous header
else :
......@@ -211,18 +210,18 @@ class Bridge(Thread):
else :
# unpack the packet length
length = struct.unpack_from('>i', data)[0]
#self.logger.debug("header complete {}".format(length))
#logger.debug("header complete {}".format(length))
s_header_m = None
else :
length = s_rest_to_read
if length > -1 :
# read what we're supposed to
data = self.controler_socket.recv(length)
#self.logger.debug("content read")
#logger.debug("content read")
# if the received message is a full one, handle it,
# otherwise, store it for later
if len(data) == length :
#self.logger.debug("good size ")
#logger.debug("good size ")
if s_buffered_m != None :
data = s_buffered_m + data
s_buffered_m = None
......@@ -310,16 +309,16 @@ class Bridge(Thread):
else :
self.bridge_dead()
except Exception as e :
self.logger.error( "{}".format(e))
logger.error( "{}".format(e))
raise
finally:
self.bridge_dead()
self.logger.fatal("error")
logger.fatal("error")
def bridge_dead(self):
self.dead = True
self.poll = False
self.logger.fatal("bridge dead")
logger.fatal("bridge dead")
toforward = json.dumps({
consts.FROM:consts.INTERNAL,
consts.TYPE:consts.BRIDGED,
......@@ -332,38 +331,38 @@ class Bridge(Thread):
# Extract a message from taktuk and send it to the unix socket as a JSON
# message TODO
def extract_taktuk_message(self, to_unpack) :
#self.logger.debug("\n get datas \n")
#logger.debug("\n get datas \n")
# The sent message by taktuk is of the following form
# code + size in the header
# so 4 bytes + 1
#self.logger.debug("try to extract message {}".format(to_unpack))
#logger.debug("try to extract message {}".format(to_unpack))
offset = 0
length = struct.unpack_from('>I', to_unpack[offset:4])[0]
offset += 4
#self.logger.debug("length : {}".format(length))
#logger.debug("length : {}".format(length))
body_length = length - MSG_LENGTH
#self.logger.debug("body_length {}".format(body_length))
#logger.debug("body_length {}".format(body_length))
code = struct.unpack_from('s', to_unpack[offset:offset+1])[0]
offset += 1