Commit 2d36fbe9 authored by Swann Perarnau's avatar Swann Perarnau
Browse files

Reorganize into typical python package

This is the somewhat standard python package source organization, should
make it easier later on.

Paths are not updated in this commit, meaning that most imports will
probably fail.
parent a5c0d108
include tox.ini
"""Yggdrasil
"""
from setuptools import setup, find_packages
setup(
name='yggdrasil',
version='0.1',
description="",
author="Thomas Lavocat",
author_email="tlavocat@april.org",
url='',
license='GPL3',
classifiers=[
'Development Status :: 3 - Alpha',
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
],
packages=find_packages(),
install_requires=['pyzmq', 'pexpect'],
scripts=['bin/erebor', 'bin/socket-bridge']
)
#!/usr/bin/env python3
import time
import traceback
import os
import fcntl
import getopt
import json
import queue
import struct
import sys
import socket
import select
from isengard import consts
from threading import Thread
from threading import Timer
from threading import Lock
HEADER_BUFFER = 64
SMALL_BUFFER = 20
MSG_LENGTH = 1
READ_ACCESS = 0
WRITE_ACCESS = 1
GET_INFO_ACCESS = 2
RECV_ACCESS = 3
TAKTUK_EMAXCD = 21
TAKTUK_TARGET_ANY = 0xFFFFFFFF
TAKTUK_TARGET_ALL = 0xFFFFFFFE
TAKTUK_TARGET_OUTPUT = 0xFFFFFFFD
TAKTUK_READ_SIZE = 8192
TAKTUK_WRITE_SIZE = 8192
TAKTUK_ACTION="A";
TAKTUK_WAIT="B";
TAKTUK_ID="C";
TAKTUK_EOF="D";
TAKTUK_TAKTUK_PERL="E";
TAKTUK_GATEWAY="F";
TAKTUK_GET="G";
TAKTUK_INVALID="H";
TAKTUK_INFO="I";
TAKTUK_COMMAND_SEND_TO="J";
TAKTUK_COMMAND_MESSAGE="K";
TAKTUK_OPTION="N";
TAKTUK_TIMEOUT="O";
TAKTUK_PUT="P";
TAKTUK_REDUCE_RESULT="Q";
TAKTUK_REDUCE="R";
TAKTUK_SPREAD="S";
TAKTUK_TAKTUK_CODE="T";
TAKTUK_UPDATE_FAILED="U";
TAKTUK_OPTIONS="V";
TAKTUK_WAIT_MESSAGE="W";
TAKTUK_RESIGN="X";
TAKTUK_ARGUMENTS="a";
TAKTUK_BROADCAST="b";
TAKTUK_DOWN="d";
TAKTUK_EXECUTE="e";
TAKTUK_FILE="f";
TAKTUK_GET_INFO="g";
TAKTUK_INPUT="i";
TAKTUK_KILL="k";
TAKTUK_MESSAGE="m";
TAKTUK_NUMBERED="n";
TAKTUK_OUTPUT="o";
TAKTUK_POSITION="p";
TAKTUK_QUIT="q";
TAKTUK_READY="r";
TAKTUK_STEAL="s";
TAKTUK_SEND_TO="t";
TAKTUK_FORWARD_UP="u";
TAKTUK_WORK="w";
TAKTUK_SYNCHRONIZE="x";
TAKTUK_PIPE="z";
# Reduce types
TAKTUK_REDUCE_COUNT = 'c';
TAKTUK_REDUCE_TREE = 't';
TAKTUK_REDUCE_WAIT = 'w';
# Class Bridge
# Author Thomas Lavocat
#
# Listen on a unix socket on one side and transfert messages to the taktuk
# network, and listen to the taktuk network and transfert the messages to the
# unix socket
class Bridge(Thread):
# need a valid opened socket
def __init__(self, controler_socket, debug_list, receive_callback=None):
Thread.__init__(self)
self.debug_list = debug_list
self.debug = "bridge" in self.debug_list
#connect to the taktuk network
self.ind = int(os.getenv("TAKTUK_CONTROL_READ"))
self.outd = int(os.getenv("TAKTUK_CONTROL_WRITE"))
self.rank = os.getenv("TAKTUK_RANK")
self.dead = False
#Opened for bytes transactions
self.file_out = os.fdopen(self.outd, "wb")
self.file_in = os.fdopen(self.ind, "rb")
flag = fcntl.fcntl(self.ind, fcntl.F_GETFL)
fcntl.fcntl(self.ind, fcntl.F_SETFL, flag | os.O_NONBLOCK)
self.taktuk_lock = Lock()
#acquire until thread is started (it wont be long)
self.taktuk_ready = False
self.taktuk_lock.acquire()
self.controler_socket = controler_socket
self.receive_callback = receive_callback
if self.controler_socket != None :
self.flushprint("connected to erebor")
self.controler_lock = Lock()
self.controler_ready = False
self.controler_lock.acquire()
self.daemon = True
self.poll = True
self.message_queues = {}
self.outputs = []
def flushprint(self, *args, **kwargs):
if self.debug :
print(*args, file=sys.stdout, **kwargs)
sys.stdout.flush()
# Write on taktuk every requested datas, this code is taken from C version
def insistent_write(self, data) :
size = len(data)
if self.dead :
return -1
if size == 0 :
return 0
try :
while self.poll and (size >0) and not self.dead and self.poll:
#self.flushprint("write")
result = self.file_out.write(data)
if result <= 0 or result == None:
return -1
size -= result
data = data [size:]
except :
return -1
self.file_out.flush()
return 0
# close the taktuk's file descriptors
def close(self):
self.file_out.close()
self.file_in.close()
def run(self):
epoll = select.epoll()
if self.controler_socket != None :
self.out_epoll = select.epoll()
self.out_epoll.register(self.controler_socket.fileno(), select.EPOLLOUT)
epoll.register(self.controler_socket.fileno(), select.EPOLLIN)
self.controler_ready = True
self.controler_lock.release()
epoll.register(self.ind, select.EPOLLIN)
self.taktuk_ready = True
self.taktuk_lock.release()
try:
connections = {};
while self.poll and not self.dead :
s_header_m = None
s_rest_header = 0
s_buffered_m = None
s_rest_to_read = 0
t_header_m = None
t_rest_header = 0
t_buffered_m = None
t_rest_to_read = 0
events = epoll.poll(consts.timeout)
#self.flushprint("epolling")
for fileno, event in events:
#self.flushprint("event to handle")
if event & select.EPOLLIN:
if (self.controler_socket != None and
fileno == self.controler_socket.fileno()) :
length = 0
# if we start a new message
if s_buffered_m == None :
#self.flushprint("new message")
data = bytearray()
# if we start a new header
if s_header_m == None :
#self.flushprint("new header")
data = self.controler_socket.recv(4)
# otherwise complete the previous header
else :
data = s_header_m + self.controler_socket.recv(s_rest_header)
# If we havent received enough to have a valid
# header, wait for the rest to come
if len(data) < 4 :
s_header_m = data
s_rest_header = 4 - len(s_header_m)
length = -1
# Otherwise, lets decode the rest of the message
else :
# unpack the packet length
length = struct.unpack_from('>i', data)[0]
#self.flushprint("header complete {}".format(length))
s_header_m = None
else :
length = s_rest_to_read
if length > -1 :
# read what we're supposed to
data = self.controler_socket.recv(length)
#self.flushprint("content read")
# if the received message is a full one, handle it,
# otherwise, store it for later
if len(data) == length :
#self.flushprint("good size ")
if s_buffered_m != None :
data = s_buffered_m + data
s_buffered_m = None
self._handle_message_from_socket(data)
else :
s_rest_to_read = length - len(data)
if s_buffered_m == None :
s_buffered_m = data
else :
s_buffered_m = s_buffered_m + data
if fileno == self.ind :
nbRead = 0
read_something = True
error = False
while self.poll and read_something and not error and not self.dead:
length = 0
# if we start a new message
if t_buffered_m == None :
data = bytearray()
# if we start a new header
if t_header_m == None :
data = self.file_in.read(4)
if data == None :
read_something = False
break
else :
if len(data) == 0 :
read_something = False
error = True
break
# otherwise complete the previous header
else :
temp = self.file_in.read(t_rest_header)
if data == None :
read_something = False
break
else :
if len(data) == 0 :
read_something = False
error = True
break
data = t_header_m + temp
# If we havent received enough to have a valid
# header, wait for the rest to come
if len(data) < 4 :
t_header_m = data
t_rest_header = 4 - len(t_header_m)
length = -1
# Otherwise, lets decode the rest of the message
else :
# unpack the packet length
length = struct.unpack_from('>i', data)[0]
t_header_m = None
else :
length = t_rest_to_read
if length > -1 :
# read what we're supposed to
t_buffered_m = data
data = self.file_in.read(length)
if data == None :
read_something = False
break
else :
if len(data) == 0 :
read_something = False
error = True
break
# if the received message is a full one, handle it,
# otherwise, store it for later
if len(data) == length :
if t_buffered_m != None :
data = t_buffered_m + data
t_buffered_m = None
self.extract_taktuk_message(data)
else :
t_rest_to_read = length - len(data)
if t_buffered_m == None :
t_buffered_m = data
else :
t_buffered_m = t_buffered_m + data
if read_something :
nbRead += 1
if nbRead == 0 or error :
self.bridge_dead()
else :
self.bridge_dead()
except Exception as e :
self.flushprint( "{}".format(e))
raise
finally:
self.bridge_dead()
self.flushprint("error")
def bridge_dead(self):
self.dead = True
self.poll = False
self.flushprint("bridge dead")
toforward = json.dumps({
consts.FROM:consts.INTERNAL,
consts.TYPE:consts.BRIDGED,
});
if self.controler_socket != None :
self.send_message_on_socket(toforward)
else :
self.receive_callback(toforward)
# Extract a message from taktuk and send it to the unix socket as a JSON
# message TODO
def extract_taktuk_message(self, to_unpack) :
#self.flushprint("\n get datas \n")
# The sent message by taktuk is of the following form
# code + size in the header
# so 4 bytes + 1
#self.flushprint("try to extract message {}".format(to_unpack))
offset = 0
length = struct.unpack_from('>I', to_unpack[offset:4])[0]
offset += 4
#self.flushprint("length : {}".format(length))
body_length = length - MSG_LENGTH
#self.flushprint("body_length {}".format(body_length))
code = struct.unpack_from('s', to_unpack[offset:offset+1])[0]
offset += 1
#self.flushprint("code {}".format(code))
# FROM TODO
from_size = struct.unpack_from('>I', to_unpack[offset:offset+4])[0]
offset += 4
#self.flushprint("from_size {}".format(from_size))
from_ = struct.unpack_from('s', to_unpack[offset:offset+from_size])[0]
offset += from_size
#self.flushprint("from {}".format(from_))
# READ final datas
body_length = body_length-5
data = to_unpack[offset:offset+body_length]
final_data = data.decode(consts.encoding)
offset += body_length
#self.flushprint("final_data : {}".format(final_data))
final_code = code.decode(consts.encoding)
final_from = from_.decode(consts.encoding)
#self.flushprint("body_length {}".format(body_length))
#self.flushprint("data : {}".format(data))
#self.flushprint("final_code : {}".format(final_code))
# build JSON to send
toforward = json.dumps({"from":final_from, "type":final_code,
"data": final_data});
#self.log_to_father("end decode tofoward {}".format(toforward))
#self.log_to_father("received message ".format(final_from))
# send the message to top layer
if self.controler_socket != None :
self.send_message_on_socket(toforward)
else :
self.receive_callback(toforward)
def write_on_taktuk(self, packed):
if not self.taktuk_ready :
self.taktuk_lock.acquire()
s = self.file_out
ret = self.insistent_write(packed)
return ret
# data need to be str format assuming utf-8
# Will be send using
def send_message_on_socket(self, data):
if not self.controler_ready :
self.controler_lock.acquire()
#self.flushprint(data)
to_send = self.pack(data)
rest_to_send = len(to_send)
try :
while self.poll and rest_to_send > 0 and not self.dead :
events = self.out_epoll.poll(consts.timeout)
for fileno, event in events:
if event & select.EPOLLOUT :
byteswritten = self.controler_socket.send(to_send)
if byteswritten < rest_to_send :
to_send = to_send[byteswritten:]
rest_to_send = rest_to_send - byteswritten
except:
self.bridge_dead()
self.flushprint("ouch !")
raise
# transform string to packed datas
# data need to be str format assuming utf-8
def pack(self, data):
# send the packed data and the packed length
data = bytes(data,consts.encoding)
return struct.pack(">i", len(data)) + data
# close the thread
def stop_select(self):
self.poll = False
# Send a log to the unix socket
def propagate_taktuk_infos(self) :
toforward = json.dumps({
consts.FROM:consts.INTERNAL,
consts.TYPE:consts.INFOS,
consts.RANK:self.rank,
});
if self.controler_socket != None :
self.send_message_on_socket(toforward)
else :
self.receive_callback(toforward)
# Send a log to the unix socket
def log_to_father(self, data) :
toforward = json.dumps({consts.FROM:consts.INTERNAL,
consts.TYPE:consts.LOG, consts.DATA: data});
if self.controler_socket != None :
self.send_message_on_socket(toforward)
else :
self.receive_callback(toforward)
# dest : string
# target : string
# command : letter
# data : string
# sync_bool : bool
def taktuk_send_message(self, dest, target, command, data, sync_bool) :
data = bytes(data, consts.encoding)
body_length=len(data)
send_to = TAKTUK_SEND_TO;
synchro = TAKTUK_SYNCHRONIZE
# The first four bytes of the header encode its own size not including
# the four bytes themselves
#
send_to = bytes(send_to ,consts.encoding)
dest = bytes(dest ,consts.encoding)
command = bytes(command ,consts.encoding)
target = bytes(target ,consts.encoding)
rank = bytes(self.rank,consts.encoding)
synchro = bytes(synchro ,consts.encoding)
#self.flushprint("send header "+str(send_to)+" "+str(rank)+" "+
# str(dest)+" "+str(target)+" "+str(body_length)+" "+str(command)
# +" "+str(data))
dest_size = len(dest)
target_size = len(target)
taktuk_from_size = len(rank)
# Do not take into account the first four bytes encoding the total size
# Unfortunately I have to compute size first to allocate my memory if
# needed ...
#
header_size = consts.UINT32
header_size += MSG_LENGTH
header_size += consts.UINT32 + dest_size
header_size += MSG_LENGTH
header_size += consts.UINT32 + target_size
header_size += consts.UINT32 + taktuk_from_size;
if sync_bool :
header_size += MSG_LENGTH
packed = struct.pack('>I', header_size-consts.UINT32+body_length)
if sync_bool :
packed += synchro
packed += send_to
packed += struct.pack('>I', dest_size)
packed += dest
packed += command
packed += struct.pack('>I', target_size)
packed += target
packed += struct.pack('>I', taktuk_from_size)
packed += rank
packed += data
assert len(packed) == header_size+body_length
#self.flushprint("need to send all {}".format(packed))
result = self.write_on_taktuk(packed);
return result;
# dest : string
# command : letter
# data : string
# sync_bool : bool
def taktuk_send_execute(self, dest, command, data, sync_bool) :
data = bytes(data, consts.encoding)
body_length=len(data)
send_to = TAKTUK_SEND_TO;
synchro = TAKTUK_SYNCHRONIZE
# The first four bytes of the header encode its own size not including
# the four bytes themselves
#
send_to = bytes(send_to ,consts.encoding)
dest = bytes(dest ,consts.encoding)
command = bytes(command ,consts.encoding)
synchro = bytes(synchro ,consts.encoding)
#self.flushprint("send header "+str(send_to)+" "+str(rank)+" "+
# str(dest)+" "+str(target)+" "+str(body_length)+" "+str(command)
# +" "+str(data))
dest_size = len(dest)
# Do not take into account the first four bytes encoding the total size
# Unfortunately I have to compute size first to allocate my memory if
# needed ...
#
header_size = consts.UINT32
header_size += MSG_LENGTH
header_size += consts.UINT32 + dest_size
header_size += MSG_LENGTH
if sync_bool :
header_size += MSG_LENGTH
packed = struct.pack('>I', header_size-consts.UINT32+body_length)
if sync_bool :
packed += synchro
packed += send_to
packed += struct.pack('>I', dest_size)
packed += dest
packed += command
packed += data
assert len(packed) == header_size+body_length
#self.flushprint("need to send all {}".format(packed))
result = self.write_on_taktuk(packed);