Commit 84e08145 authored by lavocat's avatar lavocat
Browse files

Care on encoding. Related to #6

Text output by a hosted message may not be in UTF8. This is not the case
for TakTuk or our controlled programs. Callback from isengard now send
the content of the gathered output in binary and utf8 form.
When the output of a subprocess is gathered for the root node, it is
encoded in bas64. On the root, the user should take care of the final
encoding. The Executor task is now able to deal with other encoding than
UTF-8.
parent 60ac7526
import base64
import logging
import math
import traceback
......@@ -64,15 +65,15 @@ class RunningCommand:#TODO integrate node name
self.final_cllbck = callback_on_terminaison
self.final_cllbck_value = -1
def test_and_dispatch(self, txt):
if self.stdout_regex.match(txt) != None :
self.append_stdout(txt)
def test_and_dispatch(self, txt_utf8, txt_base_64):
if self.stdout_regex.match(txt_utf8) != None :
self.append_stdout(txt_utf8, txt_base_64)
return 0
elif self.stderr_regex.match(txt) != None :
self.append_stderr(txt)
elif self.stderr_regex.match(txt_utf8) != None :
self.append_stderr(txt_utf8, txt_base_64)
return 1
elif self.final_regex.match(txt) != None :
self.final(txt)
elif self.final_regex.match(txt_utf8) != None :
self.final(txt_utf8, txt_base_64)
if self.wait == 0 :
return 2
else :
......@@ -93,11 +94,11 @@ class RunningCommand:#TODO integrate node name
self.logger.debug("regex -> ^.*{}-{}: {} \(.*\): status >.*".format(self.name, self.dest, self.command))
return re.compile(regex)
def append_stdout(self, line):
self.stdout.append(line)
def append_stdout(self, line, line_64):
self.stdout.append(line_64)
def append_stderr(self, line):
self.stderr.append(line)
def append_stderr(self, line, line_64):
self.stderr.append(line_64)
# handle here ?
def node_deads(self, error_nodes, errors_rank) :
......@@ -117,9 +118,9 @@ class RunningCommand:#TODO integrate node name
return 1
return 0
def final(self, line):
def final(self, line, line64):
self.wait -= 1
self.finall = line
self.finall = line64
self.logger.debug(" TO WARN {} {} {} {} {} ".format(self.snode, self.snetid, self.dnode,
self.dnetid,self.final_cllbck_value))
if self.final_cllbck != -1 :
......@@ -153,27 +154,27 @@ class BrodcastRunningCommand(RunningCommand):
self.bstderr = dict()
self.logger.info("dest to wait broadcast command {}".format(self.wait))
def append_stdout(self, line):
def append_stdout(self, line, line_64):
identity = line.split(":")[0]
if identity not in self.bstdout :
self.bstdout[identity] = []
self.bstdout[identity].append(line)
self.bstdout[identity].append(line_64)
def append_stderr(self, line):
def append_stderr(self, line, line_64):
identity = line.split(":")[0]
if identity not in self.bstderr :
self.bstderr[identity] = []
self.bstderr[identity].append(line)
self.bstderr[identity].append(line_64)
def append_status(self, line) :
def append_status(self, line, line_64) :
identity = line.split(":")[0]
if identity not in self.bstatus :
self.bstatus[identity] = []
self.bstatus[identity].append(line)
self.bstatus[identity].append(line_64)
def final(self, line):
def final(self, line, line_64):
self.wait -= 1
self.append_status(line)
self.append_status(line, line_64)
if self.final_cllbck != -1 :
if self.snode != None :
self.final_cllbck(self.snode, self.snetid, self.dnode, self.dnetid,
......@@ -904,10 +905,12 @@ class Erebor:
def subprocess_callback(self, networkId, txt) :
dispactch = False
toremove = []
txt_base_64 = base64.b64encode(txt).decode(consts.encoding)
txt_utf8 = txt.decode(consts.encoding)
for i in range(0, len(self.running_commands)) :
running_command = self.running_commands[i]
if networkId == running_command.dnetid :
ret = running_command.test_and_dispatch(txt)
ret = running_command.test_and_dispatch(txt_utf8, txt_base_64)
if ret > -1 :
dispactch = True
if ret == 2 :
......
......@@ -105,10 +105,7 @@ class Network(Isengard) :
# is called when a subprocess write something
def _subprocess_callback(self, txt) :
if len(self.sbprc_callbacks) == 0 :
self.logger.debug(bcolors.WARNING+ txt +bcolors.ENDC)
else :
self.call_sbprc_callbacks(self.ID, txt)
self.call_sbprc_callbacks(self.ID, txt)
return json.dumps({consts.TYPE:consts.CONTROL, consts.VALUE:consts.ACK})
def bridge_dead(self) :
......@@ -151,8 +148,8 @@ class Network(Isengard) :
self.new_mpi_trans_network_has_come(decoded_data, sender)
# is called when something arrive from taktuk output
def taktuk_stdout_callback(self, txt) :
self.logger.info(bcolors.OKBLUE+ txt +bcolors.ENDC)
def taktuk_stdout_callback(self, txt, txt_8) :
self.logger.info(bcolors.OKBLUE+ txt_8 +bcolors.ENDC)
# is called when status information are displayed by taktuk
def _status_print(self, txt):
......
......@@ -367,8 +367,8 @@ class Isengard:
self.taktuk.register_callback(re.compile("^.*[0-9]*: .*$"), self.subprocess_callback)
#'connector=\"connector:$host;$peer;$line;$peer_position\\n\" '
def connector_error(self, txt):
self.add_to_queue(Event(self._connector_error, txt))
def connector_error(self, txt, txt_utf8):
self.add_to_queue(Event(self._connector_error, txt_utf8))
def _connector_error(self, txt):
self.spawned_lock.acquire()
......@@ -425,8 +425,8 @@ class Isengard:
return None, -1
#'state="state:$host;$position;$rank;$line;$peers_given\\n"'
def state_update(self, txt):
self.add_to_queue(Event(self._state_update, txt))
def state_update(self, txt, txt_utf8):
self.add_to_queue(Event(self._state_update, txt_utf8))
def _state_update(self, txt):
self.spawned_lock.acquire()
......@@ -534,11 +534,11 @@ class Isengard:
self.logger.info(bcolors.FAIL+"network update failure"+bcolors.ENDC)
# is called when something arrive from taktuk output
def taktuk_stdout_callback(self, txt) :
def taktuk_stdout_callback(self, txt, txt_8) :
self.logger.info(bcolors.OKBLUE+ txt +bcolors.ENDC)
def status_print(self, txt):
self.add_to_queue(Event(self._status_print, txt))
def status_print(self, txt, txt_utf_8):
self.add_to_queue(Event(self._status_print, txt_utf8))
def _status_print(self, txt):
self.logger.info(bcolors.UNDERLINE+ txt +bcolors.ENDC)
......@@ -557,11 +557,13 @@ class Isengard:
self.bridge.stop_select()
# is called when something arrive from the bridge
def subprocess_callback(self, txt) :
def subprocess_callback(self, txt, txt_utf8) :
# Do not give UTF-8 to subprocess out.
self.add_to_queue(Event(self._subprocess_callback, txt))
def _subprocess_callback(self, txt) :
print(bcolors.WARNING+ txt +bcolors.ENDC)
# Only in this non used function we can decode in utf8
print(bcolors.WARNING+ "{}".format(txt) +bcolors.ENDC)
# is called when something arrive from the bridge
def bridge_callback(self, txt) :
......
......@@ -67,22 +67,23 @@ class Wrapper(Thread):
# pexpect, even on linux systems one should look for \r\n
self.child.expect("\r\n", consts.timeout)
# According to pexpect doc, will only fetch the current match
stdline = self.child.before.decode(consts.encoding)
stdline = self.child.before
stdline8 = stdline.decode(consts.encoding)
#self.logger.debug(" TAKTUK LINE : "+stdline)
# Over the regex lists
hasMatched = False
for key,val in self.hashrf.items() :
if key.match(stdline) != None :
if key.match(stdline8) != None :
self.log_time(key.pattern)
val(stdline);
val(stdline, stdline8);
hasMatched = True
break
if not hasMatched :
if "option m" in stdline :
if "option m" in stdline8 :
self.log_time("option m")
else :
self.log_time("taktuk output")
self.default_callback(stdline)
self.default_callback(stdline, stdline8)
except pexpect.TIMEOUT:
pass
except pexpect.EOF:
......
import base64
import json
from yggdrasil.isengard import consts
from .group import Group
......@@ -28,15 +29,15 @@ class Broadcaster(Executor):
statuss = decoded_data[consts.STATUS]
nbfail = 0
for key, value in statuss.items() :
status = statuss[key]
status = base64.b64decode(statuss[key][0]).decode(self.encoding)
stdout = []
stderr = []
if key in stdouts :
stdout = stdouts[key]
if key in stderrs :
stderr = stderrs[key]
self.printt_decoded_data(stderr, stdout, status[0], key)
exstat = status[0].split("Exited with status")
self.printt_decoded_data(stderr, stdout, statuss[key][0], key)
exstat = status.split("Exited with status")
stat = int(exstat[len(exstat)-1])
if stat != 0 :
nbfail += 1
......
import json
import base64
from .task import Task
from .group import Group
from yggdrasil.isengard import consts
class Executor(Task) :
def __init__(self, on, end, name, timeout=-1) :
def __init__(self, on, end, name, timeout=-1, encoding=consts.encoding) :
Task.__init__(self, name, timeout)
self.group = on
self.end = end
self.group = on
self.end = end
self.encoding = encoding
def terminate_task(self) :
if self.end :
......@@ -24,10 +26,13 @@ class Executor(Task) :
print("---------- {} ----------".format(prefix))
value = stderr
for line in value :
line = base64.b64decode(line).decode(self.encoding)
line = line.split("error >")[1]
print ("{} {} : {}".format(prefix, "stderr", line))
value = stdout
for line in value :
line = base64.b64decode(line).decode(self.encoding)
line = line.split("output >")[1]
print ("{} {} : {}".format(prefix, "stdout", line))
print ("{} {} : {}".format(prefix, "status", status))
print ("{} {} : {}".format(prefix, "status",
base64.b64decode(status).decode(self.encoding)))
import base64
import json
from yggdrasil.isengard import consts
from .executor import Executor
......@@ -31,15 +32,15 @@ class ParrallelBroadcaster(Executor):
stderrs = decoded_data[consts.STDERR]
statuss = decoded_data[consts.STATUS]
for key, value in statuss.items() :
status = statuss[key]
status = base64.b64decode(statuss[key][0]).decode(self.encoding)
stdout = []
stderr = []
if key in stdouts :
stdout = stdouts[key]
if key in stderrs :
stderr = stderrs[key]
self.printt_decoded_data(stderr, stdout, status[0], key)
exstat = status[0].split("Exited with status")
self.printt_decoded_data(stderr, stdout, statuss[key][0], key)
exstat = status.split("Exited with status")
stat = int(exstat[len(exstat)-1])
if stat != 0 :
self.nbfail += 1
......
import base64
import sys
import json
from yggdrasil.isengard import consts
from .executor import Executor
......@@ -36,15 +38,15 @@ class SerialBroadcaster(Executor):
stderrs = decoded_data[consts.STDERR]
statuss = decoded_data[consts.STATUS]
for key, value in statuss.items() :
status = statuss[key]
status = base64.b64decode(statuss[key][0]).decode(self.encoding)
stdout = []
stderr = []
if key in stdouts :
stdout = stdouts[key]
if key in stderrs :
stderr = stderrs[key]
self.printt_decoded_data(stderr, stdout, status[0], key)
exstat = status[0].split("Exited with status")
self.printt_decoded_data(stderr, stdout, statuss[key][0], key)
exstat = status.split("Exited with status")
stat = int(exstat[len(exstat)-1])
if stat != 0 :
self.nbfail += 1
......
import base64
import json
from yggdrasil.isengard import consts
from .executor import Executor
......@@ -52,7 +53,7 @@ class Ventilator(Executor):
# to override
def command_done(self, data) :
decoded = json.loads(data)
status = decoded[consts.STATUS]
status = base64.b64decode(decoded[consts.STATUS]).decode(self.encoding)
exstat = status.split("Exited with status")
stat = int(exstat[len(exstat)-1])
if stat != 0 :
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment