Vous avez reçu un message "Your GitLab account has been locked ..." ? Pas d'inquiétude : lisez cet article https://docs.gricad-pages.univ-grenoble-alpes.fr/help/unlock/

Commit 247e73a0 authored by Thomas Lavocat's avatar Thomas Lavocat
Browse files

Add a way to group tasks

parent 8bd973b1
......@@ -105,15 +105,15 @@ CANCELED = 70
#Authorized transitions
IDLE_INIT = IDLE+INIT
IDDLE_CANCEL = IDLE+CANCELED
INIT_RUNNING = INIT+RUNNING
INIT_ERROR = INIT+ERROR
INIT_CANCELED = INIT+CANCELED
RUNNING_DONE = RUNNING+DONE
RUNNING_ERROR = RUNNING+ERROR
RUNNING_TIMEOUT = RUNNING+TIMEOUT
RUNNING_CANCELED = RUNNING+CANCELED
IDLE_INIT = IDLE+INIT
IDDLE_CANCEL = IDLE+CANCELED
INIT_RUNNING = INIT+RUNNING
INIT_ERROR = INIT+ERROR
INIT_CANCELED = INIT+CANCELED
RUNNING_DONE = RUNNING+DONE
RUNNING_ERROR = RUNNING+ERROR
RUNNING_TIMEOUT = RUNNING+TIMEOUT
RUNNING_CANCELED = RUNNING+CANCELED
TERMINAISON_PENDING = "terminaison_pending"
class bcolors:
......
......@@ -3,3 +3,4 @@ from .encoder import MPIDecoder
from .network import Network
from .framework import FrameworkControler
from .main import runner
from .main import argless_runner
......@@ -6,9 +6,23 @@ import signal
import yggdrasil
from yggdrasil.erebor import Erebor
from ..log import configure_logger
import sys, traceback, threading, time
logger = logging.getLogger('yggdrasil')
def argless_runner(Controler):
erebor = Erebor(True,
"erebor,network,isengard,wrapper,unix_socket,bridge,mpi",
"CRITICAL", "", "", "")
def signal_handler(a, b):
erebor.terminate()
signal.signal(signal.SIGINT, signal_handler)
controler = Controler(erebor, "root", "", "")
# Start the test sample after the root network bootstrap
erebor.on_network_init("root", controler.start)
controler.bootstrap()
erebor.process_messages()
erebor.terminate()
def runner(argv, Controler=None):
print("start")
......@@ -159,6 +173,7 @@ def runner(argv, Controler=None):
else:
erebor.bootstrap(ID)
erebor.process_messages()
erebor.terminate()
except:
logger.fatal("Unexpected error: {}".format())
erebor.terminate()
......
......@@ -9,4 +9,4 @@ from .processor import TaskProcessor
from .ventilator import Ventilator
from .barrier import Barrier
from .message_receiver import MessageReceiver
from .tas_bag import TaskBag
import base64
import json
from .. import consts
from .task import Task
class TaskBag(Task) :
def __init__(self, name, task_list=None, timeout=-1, encoding=consts.encoding) :
Task.__init__(self, name, timeout)
self.final_state = consts.DONE
self.wait_done = 0
if task_list is not None :
self.set_task_list(task_list)
else :
self.task_list = []
def add_task(self, task) :
self.task_list.append(task)
self.wait_done += 1
task.register_on_state(self.sub_task_state_change)
self.add_dependency_for_transition(consts.INIT_RUNNING,
consts.INIT_RUNNING,
task)
def set_task_list(self, task_list):
self.task_list = task_list
self.wait_done = len(self.task_list)
for task in self.task_list :
task.register_on_state(self.sub_task_state_change)
self.add_dependency_for_transition(consts.INIT_RUNNING,
consts.INIT_RUNNING,
task)
def runing_state(self) :
pass
def sub_task_state_change(self, task, state) :
if state is consts.DONE :
self.wait_done -=1
if state is consts.CANCELED or state is consts.ERROR :
self.wait_done -=1
if self.final_state is consts.DONE :
self.final_state = state
for task in self.task_list :
if (task.state is not consts.CANCELED and task.state is not
consts.ERROR and task.state is not consts.DONE):
task.request_transition(task.state + consts.CANCELED)
if self.wait_done is 0 :
self.request_transition(self.state + self.final_state)
def add_all_dependency_for_transition(self, transition, task_t, task) :
self.add_dependency_for_transition(transition, task_t, task)
for t in self.task_list :
t.add_dependency_for_transition(transition, task_t, task)
def add_all_trigger_for_transition(self,transition, task_t, task) :
self.add_trigger_for_transition(transition, task_t, task)
for t in self.task_list :
t.add_trigger_for_transition(transition, task_t, task)
def add_all_dependency_for_state(self, transition, task_t, task) :
self.add_dependency_for_state(transition, task_t, task)
for t in self.task_list :
t.add_dependency_for_state(transition, task_t, task)
def add_all_trigger_for_state(self,transition, task_t, task) :
self.add_trigger_for_state(transition, task_t, task)
for t in self.task_list :
t.add_trigger_for_state(transition, task_t, task)
......@@ -12,12 +12,16 @@ class Task:
self.on_state = []
self.on_transition = []
# Keep track of the list of tasks from who we listen on notifications
self.listen_on = []
self.listen_on = []
self.listen_on_state= []
# for transitions
self.dependant_transitions = dict()
self.dependant_state = dict()
self.waiting_callback_on_possible_transition = dict()
self.waiting_transitions = []
self.waiting_state = []
self.trigger_transitions = dict()
self.trigger_states = dict()
self.timer = None
if timeout > -1 :
#print("timer launcher")
......@@ -39,6 +43,87 @@ class Task:
# Transitions -------------------------------------------------------------
def add_dependency_for_state(self, state, on_state, task):
if state not in self.dependant_state :
self.dependant_state[state] = dict()
if task not in self.dependant_state[state] :
self.dependant_state[state][task] = list()
self.dependant_state[state][task].append(on_state)
if task not in self.listen_on_state :
task.register_on_state(self.upon_foreign_state);
def add_trigger_for_state(self, state, on_state, task) :
if state not in self.trigger_states :
self.trigger_states[state] = dict()
if task not in self.trigger_states[state] :
self.trigger_states[state][task] = list()
self.trigger_states[state][task].append(on_state)
if task not in self.listen_on_state :
task.register_on_state(self.upon_foreign_state);
def upon_foreign_state(self, task, state) :
state_to_remove = []
for my_state, dependencies in self.dependant_state.items() :
dependency_to_remove = []
if task in dependencies :
if state in dependencies[task] :
dependency_to_remove.append(state)
for r in dependency_to_remove :
dependencies[task].remove(r)
if task in dependencies and len(dependencies[task]) == 0 :
del dependencies[task]
if len(dependencies) == 0 :
state_to_remove.append(my_state)
for t in state_to_remove :
del self.dependant_state[t]
for state in state_to_remove :
if state in self.waiting_state :
self.waiting_state.remove(state)
self.request_state(state)
state_to_remove = []
for my_state, dependencies in self.trigger_states.items() :
dependency_to_remove = []
if task in dependencies :
if state in dependencies[task] :
dependency_to_remove.append(state)
for r in dependency_to_remove :
dependencies[task].remove(r)
if task in dependencies and len(dependencies[task]) == 0 :
del dependencies[task]
if len(dependencies) == 0 :
state_to_remove.append(my_state)
for t in state_to_remove :
del self.trigger_states[t]
for state in state_to_remove :
self.request_state(state)
def request_state(self, state) :
"""
will try to apply the asked transition to the current task, if a
dependency is blocking the transition, it will wait until all needed
transitions on the requested tasks are done to be triggered
"""
if ( state is consts.RUNNING or
state is consts.CANCELED or
state is consts.DONE or
state is consts.ERROR ):
if self.check_dependencies_for_state(state) :
self.apply_state(state)
else :
self.waiting_state.append(transition)
else :
print("!!!!!!!!!!! invalid state asked {}".format(state))
def check_dependencies_for_state(self, state) :
# execute state code
if state not in self.dependant_state :
return True
return False
def apply_state(self, state) :
self.request_transition(state + self.state)
def add_dependency_for_transition(self, transition, on_transition, task):
"""
a dependency for a transition is a required transition on an other task.
......@@ -84,7 +169,7 @@ class Task:
dependency_to_remove.append(transition)
for r in dependency_to_remove :
dependencies[task].remove(r)
if len(dependencies[task]) == 0 :
if task in dependencies and len(dependencies[task]) == 0 :
del dependencies[task]
if len(dependencies) == 0 :
transition_to_remove.append(my_transition)
......@@ -114,7 +199,7 @@ class Task:
dependency_to_remove.append(transition)
for r in dependency_to_remove :
dependencies[task].remove(r)
if len(dependencies[task]) == 0 :
if task in dependencies and len(dependencies[task]) == 0 :
del dependencies[task]
if len(dependencies) == 0 :
transition_to_remove.append(my_transition)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment