Vous avez reçu un message "Your GitLab account has been locked ..." ? Pas d'inquiétude : lisez cet article https://docs.gricad-pages.univ-grenoble-alpes.fr/help/unlock/

Commit 65560360 authored by Thomas Lavocat's avatar Thomas Lavocat
Browse files

New dependencies mechanism.

Can do AND and OR.
parent a896fe16
......@@ -10,3 +10,6 @@ from .ventilator import Ventilator
from .barrier import Barrier
from .message_receiver import MessageReceiver
from .tas_bag import TaskBag
from .task import Task
from .task import Dependency
from .task import DependencyEvent
......@@ -91,3 +91,11 @@ class TaskProcessor(FrameworkControler):
task.register_on_state(self.task_state)
#task.register_on_transition(self.task_transition)
task.request_transition(consts.IDLE_INIT)
def add_start_all(self, tasks) :
for task in tasks :
task.set_framework(self)
self.job[task] = 1
task.register_on_state(self.task_state)
for task in tasks :
task.request_transition(consts.IDLE_INIT)
......@@ -2,6 +2,55 @@ from threading import Timer
from .. import consts
from .. import Event
class Dependency :
AND = 1
OR = 0
def __init__(self, logic_operator, listDependencies) :
self.logic_operator = logic_operator
self.listDependencies = listDependencies
def update(self, task, transition) :
for sub_dependency in self.listDependencies :
sub_dependency.update(task, transition)
def can_run(self) :
if self.logic_operator == Dependency.AND :
b = 1
for sub_dependency in self.listDependencies :
b = b * sub_dependency.can_run()
else :
b = 0
for sub_dependency in self.listDependencies :
b = b + sub_dependency.can_run()
if b > 1 :
b = 1
return b
def return_all_tasks(self) :
tasks = []
for sub_dependency in self.listDependencies :
for task in sub_dependency.return_all_tasks() :
tasks.append(task)
return tasks
class DependencyEvent :
def __init__(self, task, transition) :
self.task = task
self.transition = transition
self.state = 0
def update(self, task, transition) :
if self.task == task and self.transition == transition :
self.state = 1
def can_run(self) :
return self.state
def return_all_tasks(self) :
return [self.task]
class Task:
def __init__(self, name, timeout=-1, root_r="0", root_n="root"):
......@@ -43,170 +92,51 @@ class Task:
# Transitions -------------------------------------------------------------
def add_dependency_for_state(self, state, on_state, task):
if state not in self.dependant_state :
self.dependant_state[state] = dict()
if task not in self.dependant_state[state] :
self.dependant_state[state][task] = list()
self.dependant_state[state][task].append(on_state)
if task not in self.listen_on_state :
task.register_on_state(self.upon_foreign_state);
def add_trigger_for_state(self, state, on_state, task) :
if state not in self.trigger_states :
self.trigger_states[state] = dict()
if task not in self.trigger_states[state] :
self.trigger_states[state][task] = list()
self.trigger_states[state][task].append(on_state)
if task not in self.listen_on_state :
task.register_on_state(self.upon_foreign_state);
def upon_foreign_state(self, task, state) :
state_to_remove = []
for my_state, dependencies in self.dependant_state.items() :
dependency_to_remove = []
if task in dependencies :
if state in dependencies[task] :
dependency_to_remove.append(state)
for r in dependency_to_remove :
dependencies[task].remove(r)
if task in dependencies and len(dependencies[task]) == 0 :
del dependencies[task]
if len(dependencies) == 0 :
state_to_remove.append(my_state)
for t in state_to_remove :
del self.dependant_state[t]
for state in state_to_remove :
if state in self.waiting_state :
self.waiting_state.remove(state)
self.request_state(state)
state_to_remove = []
for my_state, dependencies in self.trigger_states.items() :
dependency_to_remove = []
if task in dependencies :
if state in dependencies[task] :
dependency_to_remove.append(state)
for r in dependency_to_remove :
dependencies[task].remove(r)
if task in dependencies and len(dependencies[task]) == 0 :
del dependencies[task]
if len(dependencies) == 0 :
state_to_remove.append(my_state)
for t in state_to_remove :
del self.trigger_states[t]
for state in state_to_remove :
self.request_state(state)
def request_state(self, state) :
"""
will try to apply the asked transition to the current task, if a
dependency is blocking the transition, it will wait until all needed
transitions on the requested tasks are done to be triggered
"""
if ( state is consts.RUNNING or
state is consts.CANCELED or
state is consts.DONE or
state is consts.ERROR ):
if self.check_dependencies_for_state(state) :
self.apply_state(state)
else :
self.waiting_state.append(transition)
else :
print("!!!!!!!!!!! invalid state asked {}".format(state))
def check_dependencies_for_state(self, state) :
# execute state code
if state not in self.dependant_state :
return True
return False
def set_dependency_for_transition(self, transition, dependency) :
self.dependant_transitions[transition] = dependency
for task in dependency.return_all_tasks() :
task.register_on_transition(self.upon_foreign_transition);
def apply_state(self, state) :
self.request_transition(state + self.state)
def get_dependency_for_transition(self, transition) :
if transition in self.dependant_transitions :
return self.dependant_transitions[transition]
else :
return None
def add_dependency_for_transition(self, transition, on_transition, task):
"""
a dependency for a transition is a required transition on an other task.
this transition must appear before the current task engage its
transition. Otherwise, the current task transition will be blocked until
the requested transition applied.
Many dependencies can be applied for a transition.
If many dependencies are applied, all of them must appear for the
current task transition to be possible. Eventually, the last of the
dependencies will trigger the current task transition.
"""
if transition not in self.dependant_transitions :
self.dependant_transitions[transition] = dict()
if task not in self.dependant_transitions[transition] :
self.dependant_transitions[transition][task] = list()
self.dependant_transitions[transition][task].append(on_transition)
if task not in self.listen_on :
def set_trigger_for_transition(self, transition, dependency) :
self.trigger_transitions[transition] = dependency
for task in dependency.return_all_tasks() :
task.register_on_transition(self.upon_foreign_transition);
def add_trigger_for_transition(self, transition, on_transition, task) :
"""
a trigger, is one or many other transitions that will generate a
transition for the current task. Trigger transitions are not
dependencies. It can be used to cancel a task if another task has gone
error or timeout.
"""
if transition not in self.trigger_transitions :
self.trigger_transitions[transition] = dict()
if task not in self.trigger_transitions[transition] :
self.trigger_transitions[transition][task] = list()
self.trigger_transitions[transition][task].append(on_transition)
if task not in self.listen_on :
task.register_on_transition(self.upon_foreign_transition);
def get_trigger_for_transition(self, transition) :
if transition in self.trigger_transitions :
return self.trigger_transitions[transition]
else :
return None
def upon_foreign_transition(self, task, transition) :
# for depedent transitions
# Propagate the foreign transition
transition_to_remove = []
for my_transition, dependencies in self.dependant_transitions.items() :
dependency_to_remove = []
if task in dependencies :
if transition in dependencies[task] :
dependency_to_remove.append(transition)
for r in dependency_to_remove :
dependencies[task].remove(r)
if task in dependencies and len(dependencies[task]) == 0 :
del dependencies[task]
if len(dependencies) == 0 :
transition_to_remove.append(my_transition)
for t in transition_to_remove :
del self.dependant_transitions[t]
# any dependency are now isolated in the can_trigger list. It's not a
# reason to execute them yet, only transitions that have been flagged as
# waiting_transitions are triggered, the other ones will simply be
# executed on their natural turn.
for transition in transition_to_remove :
if transition in self.waiting_transitions :
self.waiting_transitions.remove(transition)
self.request_transition(transition)
# check code that need to be executed when a transition is
# possible
if transition in self.waiting_callback_on_possible_transition :
toRemove = []
for my_transition, dependency in self.dependant_transitions.items() :
dependency.update(task, transition)
if dependency.can_run() == 1:
toRemove.append(my_transition)
for my_transition in toRemove :
del self.dependant_transitions[my_transition]
if my_transition in self.waiting_transitions :
self.waiting_transitions.remove(my_transition)
self.request_transition(my_transition)
if my_transition in self.waiting_callback_on_possible_transition :
callback = self.waiting_callback_on_possible_transition[transition]
del self.waiting_callback_on_possible_transition[transition]
callback()
# For triggers
# Propagate the foreign transition
transition_to_remove = []
for my_transition, dependencies in self.trigger_transitions.items() :
dependency_to_remove = []
if task in dependencies :
if transition in dependencies[task] :
dependency_to_remove.append(transition)
for r in dependency_to_remove :
dependencies[task].remove(r)
if task in dependencies and len(dependencies[task]) == 0 :
del dependencies[task]
if len(dependencies) == 0 :
transition_to_remove.append(my_transition)
for t in transition_to_remove :
del self.trigger_transitions[t]
for transition in transition_to_remove :
self.request_transition(transition)
toRemove = []
for my_transition, dependency in self.trigger_transitions.items() :
dependency.update(task, transition)
if dependency.can_run() == 1 :
toRemove.append(my_transition)
for my_transition in toRemove :
del self.trigger_transitions[my_transition]
self.apply_transition(my_transition)
def execute_when_transition_transition_possible(self, transition, callback) :
"""
......@@ -233,7 +163,7 @@ class Task:
else :
self.waiting_callback_on_possible_transition[transition]=callback
else :
print("!!!!!!!!!!! invalid transition asked {}".format(transition))
print("{} !!!!!!!!!!! invalid transition asked {}".format(self.name, transition))
def request_transition(self, transition) :
"""
......@@ -255,7 +185,7 @@ class Task:
else :
self.waiting_transitions.append(transition)
else :
print("!!!!!!!!!!! invalid transition asked {}".format(transition))
print("{} !!!!!!!!!!! invalid transition asked {}".format(self.name, transition))
def check_dependencies_for_transition(self, transition) :
arrival_state = transition - self.state
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment