Vous avez reçu un message "Your GitLab account has been locked ..." ? Pas d'inquiétude : lisez cet article https://docs.gricad-pages.univ-grenoble-alpes.fr/help/unlock/

Commit a945080a authored by Thomas Lavocat's avatar Thomas Lavocat
Browse files

New task lib

Providing better event handling and on transitions triggers/dependencies
possibilities.
parent 017c1b35
......@@ -95,18 +95,27 @@ heartBeat= 30
MAXQ = 500
#states
BOOSTRAP = "bootstrap"
INIT = "init"
INITd = "initialized"
RUNING = "running"
SICK = "sick"
DONE = "done"
ERROR = "error"
TIMEOUT = "timeout"
CANCELED = "canceled"
TERMINAISON_PENDING = "terminaison pending"
IDLE = 1
INIT = 2
RUNNING = 4
DONE = 40
ERROR = 50
TIMEOUT = 60
CANCELED = 70
#Authorized transitions
IDLE_INIT = IDLE+INIT
IDDLE_CANCEL = IDLE+CANCELED
INIT_RUNNING = INIT+RUNNING
INIT_ERROR = INIT+ERROR
INIT_CANCELED = INIT+CANCELED
RUNNING_DONE = RUNNING+DONE
RUNNING_ERROR = RUNNING+ERROR
RUNNING_TIMEOUT = RUNNING+TIMEOUT
RUNNING_CANCELED = RUNNING+CANCELED
TERMINAISON_PENDING = "terminaison_pending"
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
......@@ -127,5 +136,9 @@ def blue(str) :
def green(str) :
return bold(bcolors.OKGREEN+str+bcolors.ENDC)
def orange(str) :
return bold(bcolors.WARNING+str+bcolors.ENDC)
def error(str) :
return bold(bcolors.FAIL+str+bcolors.ENDC)
......@@ -667,7 +667,7 @@ class Erebor:
if network != None :
logger.debug("local network routing {}".format(mpi_dnode))
# check my rank to see is i'm equivalent to mpi_dnode
if network.state == consts.RUNING :
if network.state == consts.RUNNING :
mpi_hook = self.mpi_bridges[network]
# If I am the destination
if mpi_hook.is_destination_for(mpi_dnode) :
......@@ -716,7 +716,7 @@ class Erebor:
if network != None :
logger.debug("local network routing {} {}".format(dnode, network.ID))
logger.debug("check my rank to see is i'm equivalent to dnode")
if network.state == consts.RUNING :
if network.state == consts.RUNNING :
dnode_list = self.helper.build_list("["+dnode.replace("/",",")+"]")
logger.debug(" node list {}".format(dnode_list))
# If I am the destination
......@@ -954,7 +954,7 @@ class Erebor:
else :
self.connect_to_master_network(networkId)
# update state
self.state = consts.RUNING
self.state = consts.RUNNING
# Connect to an existing network, for whom I am a slave
# There can only be one of those
......
......@@ -123,7 +123,7 @@ class Network():
# -> to the registered callbacks
# -> to my network master (if it exsit: propagate = true)
def notify_bootstrap(self, propagate):
self.state = consts.RUNING
self.state = consts.RUNNING
data = json.dumps({
consts.FROM: self.rank,
consts.TYPE: consts.INFOS,
......
......@@ -28,12 +28,20 @@ class Barrier(MessageReceiver):
self.group_to_end = group_to_end
self.how = how
def terminate_task(self):
self.group_to_end.terminate_task()
MessageReceiver.terminate_task(self)
def canceled_state(self):
if self.end :
self.group.terminate_task()
def error_state(self):
if self.end :
self.group.terminate_task()
def timeout_state(self) :
if self.end :
self.group.terminate_task()
def done_state(self):
if self.end :
self.group.terminate_task()
def trigger(self, data) :
print(data)
self.how = self.how -1
if self.how == 0 :
self.terminate_task()
return self.how == 0
......@@ -12,7 +12,7 @@ class Broadcaster(Executor):
self.command = command
self.fail_threshold = fail_threshold
def run(self) :
def runing_state(self) :
self.framework.broadcast_exec_on("0",
self.command,
consts.TRUE,
......@@ -42,6 +42,6 @@ class Broadcaster(Executor):
if stat != 0 :
nbfail += 1
if nbfail <= self.fail_threshold :
Executor.terminate_task(self)
self.request_transition(consts.RUNNING_DONE)
else :
self.notify_error()
self.request_transition(consts.RUNNING_ERROR)
......@@ -5,6 +5,9 @@ from .group import Group
from .. import consts
class Executor(Task) :
"""
Abstract class
"""
def __init__(self, on, end, name, timeout=-1, encoding=consts.encoding) :
Task.__init__(self, name, timeout)
......@@ -12,10 +15,18 @@ class Executor(Task) :
self.end = end
self.encoding = encoding
def terminate_task(self) :
def canceled_state(self):
if self.end :
self.group.terminate_task()
def error_state(self):
if self.end :
self.group.terminate_task()
def timeout_state(self) :
if self.end :
self.group.terminate_task()
def done_state(self):
if self.end :
self.group.terminate_task()
Task.terminate_task(self)
def print_data(self, data, prefix) :
decoded = json.loads(data)
......@@ -24,6 +35,7 @@ class Executor(Task) :
def printt_decoded_data(self, stderr, stdout, status, prefix) :
print("---------- {} ----------".format(prefix))
print("-> Task : {} ".format(self.name))
value = stderr
for line in value :
line = base64.b64decode(line).decode(self.encoding)
......
......@@ -15,15 +15,13 @@ class Group(Task):
self.base = "0" # A group is always attached to the 0
self.ID = ID # ID of the group
self.node_list = node_list # nodes composing the group
self.state = consts.BOOSTRAP # init state
self.max_boot_error= max_boot_error
def dead_nodes(self, rank, ID, nodes) :
if ID == self.ID :
if self.state == consts.RUNING :
self.notify_sick()
self.notify_sick()
def initialize(self) :
def init_state(self) :
self.framework.erebor.on_dead_nodes(self.dead_nodes)
# called when the new group named ID is started on base
def group_up(networkId):
......@@ -57,12 +55,20 @@ class Group(Task):
self.framework.new_group_on(self.ID, "", self.base_rank,
self.base_network, self.root_r, self.root_r);
def runing_state (self):
pass
# to override
def start_group(self) :
self._run(self.name)
self.request_transition(consts.INIT_RUNNING)
def notify_sick(self) :
self.request_transition(self.state+consts.ERROR)
def terminate_task(self) :
self.framework.erebor.on_network_shutdown(self.ID, self.notify_done)
self.framework.delete_network(self.ID, "0", self.ID, self.root_r,
self.root_n)
Task.terminate_task(self)
def notify_done(self, data) :
self.request_transition(self.state+consts.DONE)
......@@ -29,21 +29,14 @@ class MessageReceiver(Task):
self.local_ID = local_ID
self.message_regex = re.compile(pattern)
def initialize(self) :
if len(self.run_dependency) == 0 :
self._run(self.name)
def terminate_task(self) :
Task.terminate_task(self)
def run(self) :
def runing_state(self) :
self.network = self.framework.get_network(self.local_ID)
self.network.register_on_bridge_generic_messages(self.receive_gmsg)
def receive_gmsg(self, data) :
if self.state == consts.RUNING :
if self.state == consts.RUNNING :
decoded_message = json.loads(data)
decoded_data = json.loads(decoded_message[consts.DATA])
if self.message_regex.match(decoded_data[consts.DATA]) != None :
if self.callback(decoded_data[consts.DATA]) :
self.terminate_task()
self.request_transition(consts.RUNNING_DONE)
......@@ -14,7 +14,7 @@ class ParrallelBroadcaster(Executor):
self.fail_threshold = fail_threshold
self.nbfail = 0
def run(self) :
def runing_state(self) :
for command in self.commands :
self.framework.broadcast_exec_on("0",
command,
......@@ -47,6 +47,6 @@ class ParrallelBroadcaster(Executor):
self.commandsw -=1
if self.commandsw == 0 :
if self.nbfail <= self.fail_threshold :
Executor.terminate_task(self)
self.request_transition(consts.RUNNING_DONE)
else :
self.notify_error()
self.request_transition(consts.RUNNING_ERROR)
......@@ -19,7 +19,7 @@ class MPIExecutor(Executor):
self.command = "{} -N {} -P {}".format(command, self.group.ID,
self.group.server_number)
def run(self) :
def runing_state(self) :
self.framework.exec_on(self.exec_rank,
self.command,
consts.TRUE,
......@@ -35,4 +35,4 @@ class MPIExecutor(Executor):
#self.framework.end_mpi_session(self.group.ID,
# self.group.root_r,
# self.group.root_n)
Executor.terminate_task(self)
self.request_transition(consts.RUNNING_DONE)
......@@ -24,4 +24,4 @@ class MPIJail(NumberedGroup):
# to override
def start_mpi_jail_group(self) :
self._run(self.name)
self.request_transition(consts.INIT_RUNNING)
......@@ -14,7 +14,7 @@ class SerialBroadcaster(Executor):
self.fail_threshold = fail_threshold
self.nbfail = 0
def run(self) :
def runing_state(self) :
self.exec_one()
def exec_one(self) :
......@@ -53,8 +53,8 @@ class SerialBroadcaster(Executor):
self.commandsw -=1
if self.commandsw == 0 :
if self.nbfail <= self.fail_threshold :
Executor.terminate_task(self)
self.request_transition(consts.RUNNING_DONE)
else :
self.notify_error()
self.request_transition(consts.RUNNING_ERROR)
else :
self.exec_one()
......@@ -23,4 +23,4 @@ class NumberedGroup(Group):
# to override
def start_numbered_group(self) :
self._run(self.name)
self.request_transition(consts.INIT_RUNNING)
......@@ -12,63 +12,80 @@ from .. import consts
class TaskProcessor(FrameworkControler):
def __init__(self, erebor, ID, node_list, tfile=None):
FrameworkControler.__init__(self, erebor, ID, tfile)
self.groups = dict()
self.job = {}
self.tasks = []
self.register_tasks()
self.job = {}
for task in self.tasks :
task.set_framework(self)
self.groups[task.name] = task
self.job[task.name] = 1
task.register_done_callback(self.task_done)
task.register_sick_callback(self.task_sick)
task.register_run_callback(self.task_running)
task.register_err_callback(self.task_error)
task.register_timeout_callback(self.task_timeout)
task.register_cance_callback(self.task_cancel)
self.job[task] = 1
task.register_on_state(self.task_state)
#task.register_on_transition(self.task_transition)
# to override
def register_tasks(self) :
pass
def end(self, name) :
self.job[name] = 0
def end(self, task) :
self.job[task] = 0
to_wait = sum(self.job.values())
if to_wait == 0 :
self.erebor.terminate()
def task_running(self, name) :
print(consts.blue("{} is {}".format(name, self.groups[name].state)))
def task_transition(self, task, transition) :
# Print a colored log
if transition == consts.IDLE_INIT :
print(consts.blue("{} transit {}".format(task.name, "idle -> init")))
elif transition == consts.IDDLE_CANCEL :
print(consts.blue("{} transit {}".format(task.name, "idle -> cancel")))
elif transition == consts.INIT_RUNNING :
print(consts.blue("{} transit {}".format(task.name, "init -> running")))
elif transition == consts.INIT_ERROR :
print(consts.blue("{} transit {}".format(task.name, "init -> error")))
elif transition == consts.INIT_CANCELED :
print(consts.blue("{} transit {}".format(task.name, "init -> cancel")))
elif transition == consts.RUNNING_DONE:
print(consts.blue("{} transit {}".format(task.name, "running -> done")))
elif transition == consts.RUNNING_TIMEOUT :
print(consts.orange("{} transit {}".format(task.name, "running -> timeout")))
elif transition == consts.RUNNING_CANCELED :
print(consts.orange("{} transit {}".format(task.name, "running -> canceled")))
elif transition == consts.RUNNING_ERROR :
print(consts.error("{} transit {}".format(task.name, "running -> error")))
else:
print(consts.error("{} transit {}".format(task.name, "{} Unknown and its a major issue".format(transition))))
def task_error(self, name) :
print(consts.green("{} is {}".format(name, self.groups[name].state)))
self.end(name)
def task_timeout(self, name) :
print(consts.green("{} is {}".format(name, self.groups[name].state)))
self.end(name)
def task_sick(self, name) :
print(consts.green("{} is {}".format(name, self.groups[name].state)))
self.end(name)
def task_done(self, name) :
print(consts.green("{} is {}".format(name, self.groups[name].state)))
self.end(name)
def task_cancel(self, name) :
print(consts.green("{} is {}".format(name, self.groups[name].state)))
self.end(name)
def task_state(self, task, state) :
# Print a colored log
if state == consts.IDLE :
print(consts.blue("{} is {}".format(task.name, "idle")))
elif state == consts.INIT :
print(consts.blue("{} is {}".format(task.name, "init")))
elif state == consts.RUNNING :
print(consts.blue("{} is {}".format(task.name, "running")))
elif state == consts.DONE :
print(consts.green("{} is {}".format(task.name, "done")))
elif state == consts.TIMEOUT :
print(consts.orange("{} is {}".format(task.name, "timeout")))
elif state == consts.CANCELED :
print(consts.orange("{} is {}".format(task.name, "canceled")))
elif state == consts.ERROR :
print(consts.error("{} is {}".format(task.name, "error")))
else:
print(consts.error("{} is {}".format(task.name, "Unknown and its a major issue")))
# If the task is in a terminal state, decrement its wait value and
# terminate if necessary
if state >= consts.DONE :
self.end(task)
# Run some tests
def start(self, networkId):
self.networkId = networkId
print("processor start")
for task in self.tasks :
task.init_task()
task.request_transition(consts.IDLE_INIT)
# Add a task on the fly
def add_task(self, task) :
self.to_wait += 1
self.tasks.append(task)
self._init_task(task)
## Add a task on the fly
#def add_task(self, task) :
# self.to_wait += 1
# self.tasks.append(task)
# self._init_task(task)
......@@ -2,42 +2,21 @@ from threading import Timer
from .. import consts
from .. import Event
class LaunchDependency :
def __init__(self, name) :
self.dependency = set()
self.tasks = dict()
self.name = name
def add_dependency(self, task, state) :
self.tasks[task.name] = task
#print("{} add dependency on {} on state {}".format(self.name, task.name, state))
self.dependency.add(task.name+"@"+state)
def check_depency(self, task_name) :
if task_name in self.tasks :
task = self.tasks[task_name]
current= task.state
if task_name+"@"+current in self.dependency :
#print("{} found {}".format(self.name, task_name+"@"+current))
self.dependency.remove(task.name+"@"+current)
#print("{} rest : {}".format(self.name, len(self.dependency)))
return len(self.dependency) == 0
class Task:
def __init__(self, name, timeout=-1, root_r="0", root_n="root"):
self.name = name # Way to identify the node
self.root_r = root_r # Node where root erebor is running
self.root_n = root_n # Name of the root erebor instance
self.state = consts.BOOSTRAP # init state
self.sick_callback = []
self.errr_callback = []
self.done_callback = []
self.run_callback = []
self.timo_callback = []
self.cncl_callback = []
self.state = consts.IDLE # init state
self.on_state = []
self.on_transition = []
# Keep track of the list of tasks from who we listen on notifications
self.listen_on = []
# for transitions
self.dependant_transitions = dict()
self.waiting_transitions = []
self.trigger_transitions = dict()
self.timer = None
if timeout > -1 :
#print("timer launcher")
......@@ -48,188 +27,206 @@ class Task:
def set_framework(self, framework):
self.framework = framework
# callback registration
def register_sick_callback(self, callback) :
self.sick_callback.append(callback);
return self
def register_done_callback(self, callback) :
self.done_callback.append(callback);
return self
def register_run_callback(self, callback) :
self.run_callback.append(callback);
return self
def register_err_callback(self, callback) :
self.errr_callback.append(callback);
return self
def register_timeout_callback(self, callback) :
self.timo_callback.append(callback);
# callback registration ---------------------------------------------------
def register_on_state(self, callback) :
self.on_state.append(callback);
return self
def register_cance_callback(self, callback) :
self.cncl_callback.append(callback);
def register_on_transition(self, callback) :
self.on_transition.append(callback);
return self
# initialization and running
# Transitions -------------------------------------------------------------
def add_dependency_for_transition(self, transition, on_transition, task):
"""
a dependency for a transition is a required transition on an other task.
this transition must appear before the current task engage its
transition. Otherwise, the current task transition will be blocked until
the requested transition applied.
Many dependencies can be applied for a transition.
If many dependencies are applied, all of them must appear for the
current task transition to be possible. Eventually, the last of the
dependencies will trigger the current task transition.
"""
if transition not in self.dependant_transitions :
self.dependant_transitions[transition] = dict()
if task not in self.dependant_transitions[transition] :
self.dependant_transitions[transition][task] = list()
self.dependant_transitions[transition][task].append(on_transition)
if task not in self.listen_on :
task.register_on_transition(self.upon_foreign_transition);
def add_trigger_for_transition(self, transition, on_transition, task) :
"""
a trigger, is one or many other transitions that will generate a
transition for the current task. Trigger transitions are not
dependencies. It can be used to cancel a task if another task has gone
error or timeout.
"""
if transition not in self.trigger_transitions :
self.trigger_transitions[transition] = dict()
if task not in self.trigger_transitions[transition] :
self.trigger_transitions[transition][task] = list()
self.trigger_transitions[transition][task].append(on_transition)
if task not in self.listen_on :
task.register_on_transition(self.upon_foreign_transition);
def upon_foreign_transition(self, task, transition) :
# for depedent transitions
# Propagate the foreign transition
transition_to_remove = []
for my_transition, dependencies in self.dependant_transitions.items() :
dependency_to_remove = []
if task in dependencies :
if transition in dependencies[task] :
dependency_to_remove.append(transition)
for r in dependency_to_remove :
dependencies[task].remove(r)
if len(dependencies[task]) == 0 :
del dependencies[task]
if len(dependencies) == 0 :
transition_to_remove.append(my_transition)
for t in transition_to_remove :
del self.dependant_transitions[t]
# any dependency are now isolated in the can_trigger list. It's not a
# reason to execute them yet, only transitions that have been flagged as
# waiting_transitions are triggered, the other ones will simply be
# executed on their natural turn.
for transition in transition_to_remove :
if transition in self.waiting_transitions :
self.waiting_transitions.remove(transition)
self.request_transition(transition)
# For triggers
# Propagate the foreign transition
transition_to_remove = []
for my_transition, dependencies in self.trigger_transitions.items() :
dependency_to_remove = []
if task in dependencies :
if transition in dependencies[task] :
dependency_to_remove.append(transition)
for r in dependency_to_remove :
dependencies[task].remove(r)
if len(dependencies[task]) == 0 :
del dependencies[task]
if len(dependencies) == 0 :
transition_to_remove.append(my_transition)
for t in transition_to_remove :
del self.trigger_transitions[t]
for transition in transition_to_remove :
self.request_transition(transition)
def request_transition(self, transition) :
"""