Vous avez reçu un message "Your GitLab account has been locked ..." ? Pas d'inquiétude : lisez cet article https://docs.gricad-pages.univ-grenoble-alpes.fr/help/unlock/

Commit 8bd973b1 authored by Thomas Lavocat's avatar Thomas Lavocat
Browse files

Bug fixes

* Close a mpi bridge upon destruction
* Cancel timer upon task ending
* add logs
parent 980e28ad
......@@ -35,29 +35,35 @@ class MPIHook(Thread) :
rep_socket = context.socket(zmq.PULL)
rep_socket.bind("ipc:///tmp/{}".format(self.port))
logger.debug(consts.bold("mpi -> reply socket ready to receive on {}").format(self.port))
while self.poll:
poller = zmq.Poller()
poller.register(rep_socket)
while self.poll: # TODO do actual polling pls
try :
logger.debug(consts.bold("wait to receive"))
rcv_message = rep_socket.recv()
logger.debug(consts.bold("received"))
logger.debug(consts.bold("received {}".format(rcv_message)))
decoded_message = self.unpack(rcv_message)
logger.debug(consts.bold("unpacked {}".format(decoded_message)))
# a new process is registering to me
if decoded_message[consts.TYPE] == consts.REG :
logger.debug(consts.bold("mpi -> new registration to do later"))
self.add_to_queue(Event(self.new_registration, decoded_message))
# a process wants to send a message to another one
if decoded_message[consts.TYPE] == consts.MPISEND :
logger.debug(consts.bold("mpi -> send a message later"))
self.add_to_queue(Event(self.send_message_to_distant_mpi_rank,
decoded_message))
if decoded_message[consts.TYPE] == consts.MESSAGE :
logger.debug(consts.bold("mpi -> send a ctrl message later"))
self.add_to_queue(Event(self.send_message_to_distant_erebor,
decoded_message))
# unlock the state of request socket
logger.debug(consts.bold("mpi -> done loop"))
events = dict(poller.poll(10000))
if events[rep_socket] == zmq.POLLIN :
rcv_message = rep_socket.recv()
logger.debug(consts.bold("received"))
logger.debug(consts.bold("received {}".format(rcv_message)))
decoded_message = self.unpack(rcv_message)
logger.debug(consts.bold("unpacked {}".format(decoded_message)))
# a new process is registering to me
if decoded_message[consts.TYPE] == consts.REG :
logger.debug(consts.bold("mpi -> new registration to do later"))
self.add_to_queue(Event(self.new_registration, decoded_message))
# a process wants to send a message to another one
if decoded_message[consts.TYPE] == consts.MPISEND :
logger.debug(consts.bold("mpi -> send a message later"))
self.add_to_queue(Event(self.send_message_to_distant_mpi_rank,
decoded_message))
if decoded_message[consts.TYPE] == consts.MESSAGE :
logger.debug(consts.bold("mpi -> send a ctrl message later"))
self.add_to_queue(Event(self.send_message_to_distant_erebor,
decoded_message))
# unlock the state of request socket
logger.debug(consts.bold("mpi -> done loop"))
else :
logger.debug(consts.bold("no messages"))
except Exception as e:
logger.warning(consts.bold("{}".format(e)))
......
......@@ -25,3 +25,11 @@ class MPIJail(NumberedGroup):
# to override
def start_mpi_jail_group(self) :
self.request_transition(consts.INIT_RUNNING)
def terminate_task(self) :
self.execute_when_transition_transition_possible(self.state+consts.DONE,
self.kill_mpi)
def kill_mpi(self):
self.framework.end_mpi_session(self.ID, self.root_r,self.root_n)
self.kill_group()
......@@ -84,8 +84,10 @@ class TaskProcessor(FrameworkControler):
for task in self.tasks :
task.request_transition(consts.IDLE_INIT)
## Add a task on the fly
#def add_task(self, task) :
# self.to_wait += 1
# self.tasks.append(task)
# self._init_task(task)
# Add a task on the fly
def add_task(self, task) :
task.set_framework(self)
self.job[task] = 1
task.register_on_state(self.task_state)
#task.register_on_transition(self.task_transition)
task.request_transition(consts.IDLE_INIT)
......@@ -148,7 +148,7 @@ class Task:
else :
self.waiting_callback_on_possible_transition[transition]=callback
else :
print("invalid transition asked")
print("!!!!!!!!!!! invalid transition asked {}".format(transition))
def request_transition(self, transition) :
"""
......@@ -170,7 +170,7 @@ class Task:
else :
self.waiting_transitions.append(transition)
else :
print("invalid transition asked")
print("!!!!!!!!!!! invalid transition asked {}".format(transition))
def check_dependencies_for_transition(self, transition) :
arrival_state = transition - self.state
......@@ -213,12 +213,20 @@ class Task:
self.launch_timer();
self.runing_state();
elif arrival_state == consts.DONE :
if self.timer != None :
self.timer.cancel()
self.done_state();
elif arrival_state == consts.ERROR :
if self.timer != None :
self.timer.cancel()
self.error_state();
elif arrival_state == consts.TIMEOUT :
if self.timer != None :
self.timer.cancel()
self.timeout_state();
elif arrival_state == consts.CANCELED :
if self.timer != None :
self.timer.cancel()
self.canceled_state();
# Methods below are to be overridden by sub classes. ----------------------
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment