Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Thomas Lavocat
yggdrasil
Commits
f09ef1f2
Commit
f09ef1f2
authored
Jul 24, 2017
by
lavocat
Browse files
Create entry-points on binaries
parent
92426c09
Changes
4
Show whitespace changes
Inline
Side-by-side
bin/socket-bridge
deleted
100755 → 0
View file @
92426c09
#!/usr/bin/env python3
import
time
import
traceback
import
os
import
fcntl
import
getopt
import
json
import
queue
import
struct
import
sys
import
socket
import
select
import
yggdrasil
from
yggdrasil.isengard
import
consts
from
threading
import
Thread
from
threading
import
Timer
from
threading
import
Lock
HEADER_BUFFER
=
64
SMALL_BUFFER
=
20
MSG_LENGTH
=
1
READ_ACCESS
=
0
WRITE_ACCESS
=
1
GET_INFO_ACCESS
=
2
RECV_ACCESS
=
3
TAKTUK_EMAXCD
=
21
TAKTUK_TARGET_ANY
=
0xFFFFFFFF
TAKTUK_TARGET_ALL
=
0xFFFFFFFE
TAKTUK_TARGET_OUTPUT
=
0xFFFFFFFD
TAKTUK_READ_SIZE
=
8192
TAKTUK_WRITE_SIZE
=
8192
TAKTUK_ACTION
=
"A"
;
TAKTUK_WAIT
=
"B"
;
TAKTUK_ID
=
"C"
;
TAKTUK_EOF
=
"D"
;
TAKTUK_TAKTUK_PERL
=
"E"
;
TAKTUK_GATEWAY
=
"F"
;
TAKTUK_GET
=
"G"
;
TAKTUK_INVALID
=
"H"
;
TAKTUK_INFO
=
"I"
;
TAKTUK_COMMAND_SEND_TO
=
"J"
;
TAKTUK_COMMAND_MESSAGE
=
"K"
;
TAKTUK_OPTION
=
"N"
;
TAKTUK_TIMEOUT
=
"O"
;
TAKTUK_PUT
=
"P"
;
TAKTUK_REDUCE_RESULT
=
"Q"
;
TAKTUK_REDUCE
=
"R"
;
TAKTUK_SPREAD
=
"S"
;
TAKTUK_TAKTUK_CODE
=
"T"
;
TAKTUK_UPDATE_FAILED
=
"U"
;
TAKTUK_OPTIONS
=
"V"
;
TAKTUK_WAIT_MESSAGE
=
"W"
;
TAKTUK_RESIGN
=
"X"
;
TAKTUK_ARGUMENTS
=
"a"
;
TAKTUK_BROADCAST
=
"b"
;
TAKTUK_DOWN
=
"d"
;
TAKTUK_EXECUTE
=
"e"
;
TAKTUK_FILE
=
"f"
;
TAKTUK_GET_INFO
=
"g"
;
TAKTUK_INPUT
=
"i"
;
TAKTUK_KILL
=
"k"
;
TAKTUK_MESSAGE
=
"m"
;
TAKTUK_NUMBERED
=
"n"
;
TAKTUK_OUTPUT
=
"o"
;
TAKTUK_POSITION
=
"p"
;
TAKTUK_QUIT
=
"q"
;
TAKTUK_READY
=
"r"
;
TAKTUK_STEAL
=
"s"
;
TAKTUK_SEND_TO
=
"t"
;
TAKTUK_FORWARD_UP
=
"u"
;
TAKTUK_WORK
=
"w"
;
TAKTUK_SYNCHRONIZE
=
"x"
;
TAKTUK_PIPE
=
"z"
;
# Reduce types
TAKTUK_REDUCE_COUNT
=
'c'
;
TAKTUK_REDUCE_TREE
=
't'
;
TAKTUK_REDUCE_WAIT
=
'w'
;
# Class Bridge
# Author Thomas Lavocat
#
# Listen on a unix socket on one side and transfert messages to the taktuk
# network, and listen to the taktuk network and transfert the messages to the
# unix socket
class
Bridge
(
Thread
):
# need a valid opened socket
def
__init__
(
self
,
controler_socket
,
debug_list
,
receive_callback
=
None
):
Thread
.
__init__
(
self
)
self
.
debug_list
=
debug_list
self
.
debug
=
"bridge"
in
self
.
debug_list
#connect to the taktuk network
self
.
ind
=
int
(
os
.
getenv
(
"TAKTUK_CONTROL_READ"
))
self
.
outd
=
int
(
os
.
getenv
(
"TAKTUK_CONTROL_WRITE"
))
self
.
rank
=
os
.
getenv
(
"TAKTUK_RANK"
)
self
.
dead
=
False
#Opened for bytes transactions
self
.
file_out
=
os
.
fdopen
(
self
.
outd
,
"wb"
)
self
.
file_in
=
os
.
fdopen
(
self
.
ind
,
"rb"
)
flag
=
fcntl
.
fcntl
(
self
.
ind
,
fcntl
.
F_GETFL
)
fcntl
.
fcntl
(
self
.
ind
,
fcntl
.
F_SETFL
,
flag
|
os
.
O_NONBLOCK
)
self
.
taktuk_lock
=
Lock
()
#acquire until thread is started (it wont be long)
self
.
taktuk_ready
=
False
self
.
taktuk_lock
.
acquire
()
self
.
controler_socket
=
controler_socket
self
.
receive_callback
=
receive_callback
if
self
.
controler_socket
!=
None
:
self
.
flushprint
(
"connected to erebor"
)
self
.
controler_lock
=
Lock
()
self
.
controler_ready
=
False
self
.
controler_lock
.
acquire
()
self
.
daemon
=
True
self
.
poll
=
True
self
.
message_queues
=
{}
self
.
outputs
=
[]
def
flushprint
(
self
,
*
args
,
**
kwargs
):
if
self
.
debug
:
print
(
*
args
,
file
=
sys
.
stdout
,
**
kwargs
)
sys
.
stdout
.
flush
()
# Write on taktuk every requested datas, this code is taken from C version
def
insistent_write
(
self
,
data
)
:
size
=
len
(
data
)
if
self
.
dead
:
return
-
1
if
size
==
0
:
return
0
try
:
while
self
.
poll
and
(
size
>
0
)
and
not
self
.
dead
and
self
.
poll
:
#self.flushprint("write")
result
=
self
.
file_out
.
write
(
data
)
if
result
<=
0
or
result
==
None
:
return
-
1
size
-=
result
data
=
data
[
size
:]
except
:
return
-
1
self
.
file_out
.
flush
()
return
0
# close the taktuk's file descriptors
def
close
(
self
):
self
.
file_out
.
close
()
self
.
file_in
.
close
()
def
run
(
self
):
epoll
=
select
.
epoll
()
if
self
.
controler_socket
!=
None
:
self
.
out_epoll
=
select
.
epoll
()
self
.
out_epoll
.
register
(
self
.
controler_socket
.
fileno
(),
select
.
EPOLLOUT
)
epoll
.
register
(
self
.
controler_socket
.
fileno
(),
select
.
EPOLLIN
)
self
.
controler_ready
=
True
self
.
controler_lock
.
release
()
epoll
.
register
(
self
.
ind
,
select
.
EPOLLIN
)
self
.
taktuk_ready
=
True
self
.
taktuk_lock
.
release
()
try
:
connections
=
{};
while
self
.
poll
and
not
self
.
dead
:
s_header_m
=
None
s_rest_header
=
0
s_buffered_m
=
None
s_rest_to_read
=
0
t_header_m
=
None
t_rest_header
=
0
t_buffered_m
=
None
t_rest_to_read
=
0
events
=
epoll
.
poll
(
consts
.
timeout
)
#self.flushprint("epolling")
for
fileno
,
event
in
events
:
#self.flushprint("event to handle")
if
event
&
select
.
EPOLLIN
:
if
(
self
.
controler_socket
!=
None
and
fileno
==
self
.
controler_socket
.
fileno
())
:
length
=
0
# if we start a new message
if
s_buffered_m
==
None
:
#self.flushprint("new message")
data
=
bytearray
()
# if we start a new header
if
s_header_m
==
None
:
#self.flushprint("new header")
data
=
self
.
controler_socket
.
recv
(
4
)
# otherwise complete the previous header
else
:
data
=
s_header_m
+
self
.
controler_socket
.
recv
(
s_rest_header
)
# If we havent received enough to have a valid
# header, wait for the rest to come
if
len
(
data
)
<
4
:
s_header_m
=
data
s_rest_header
=
4
-
len
(
s_header_m
)
length
=
-
1
# Otherwise, lets decode the rest of the message
else
:
# unpack the packet length
length
=
struct
.
unpack_from
(
'>i'
,
data
)[
0
]
#self.flushprint("header complete {}".format(length))
s_header_m
=
None
else
:
length
=
s_rest_to_read
if
length
>
-
1
:
# read what we're supposed to
data
=
self
.
controler_socket
.
recv
(
length
)
#self.flushprint("content read")
# if the received message is a full one, handle it,
# otherwise, store it for later
if
len
(
data
)
==
length
:
#self.flushprint("good size ")
if
s_buffered_m
!=
None
:
data
=
s_buffered_m
+
data
s_buffered_m
=
None
self
.
_handle_message_from_socket
(
data
)
else
:
s_rest_to_read
=
length
-
len
(
data
)
if
s_buffered_m
==
None
:
s_buffered_m
=
data
else
:
s_buffered_m
=
s_buffered_m
+
data
if
fileno
==
self
.
ind
:
nbRead
=
0
read_something
=
True
error
=
False
while
self
.
poll
and
read_something
and
not
error
and
not
self
.
dead
:
length
=
0
# if we start a new message
if
t_buffered_m
==
None
:
data
=
bytearray
()
# if we start a new header
if
t_header_m
==
None
:
data
=
self
.
file_in
.
read
(
4
)
if
data
==
None
:
read_something
=
False
break
else
:
if
len
(
data
)
==
0
:
read_something
=
False
error
=
True
break
# otherwise complete the previous header
else
:
temp
=
self
.
file_in
.
read
(
t_rest_header
)
if
data
==
None
:
read_something
=
False
break
else
:
if
len
(
data
)
==
0
:
read_something
=
False
error
=
True
break
data
=
t_header_m
+
temp
# If we havent received enough to have a valid
# header, wait for the rest to come
if
len
(
data
)
<
4
:
t_header_m
=
data
t_rest_header
=
4
-
len
(
t_header_m
)
length
=
-
1
# Otherwise, lets decode the rest of the message
else
:
# unpack the packet length
length
=
struct
.
unpack_from
(
'>i'
,
data
)[
0
]
t_header_m
=
None
else
:
length
=
t_rest_to_read
if
length
>
-
1
:
# read what we're supposed to
t_buffered_m
=
data
data
=
self
.
file_in
.
read
(
length
)
if
data
==
None
:
read_something
=
False
break
else
:
if
len
(
data
)
==
0
:
read_something
=
False
error
=
True
break
# if the received message is a full one, handle it,
# otherwise, store it for later
if
len
(
data
)
==
length
:
if
t_buffered_m
!=
None
:
data
=
t_buffered_m
+
data
t_buffered_m
=
None
self
.
extract_taktuk_message
(
data
)
else
:
t_rest_to_read
=
length
-
len
(
data
)
if
t_buffered_m
==
None
:
t_buffered_m
=
data
else
:
t_buffered_m
=
t_buffered_m
+
data
if
read_something
:
nbRead
+=
1
if
nbRead
==
0
or
error
:
self
.
bridge_dead
()
else
:
self
.
bridge_dead
()
except
Exception
as
e
:
self
.
flushprint
(
"{}"
.
format
(
e
))
raise
finally
:
self
.
bridge_dead
()
self
.
flushprint
(
"error"
)
def
bridge_dead
(
self
):
self
.
dead
=
True
self
.
poll
=
False
self
.
flushprint
(
"bridge dead"
)
toforward
=
json
.
dumps
({
consts
.
FROM
:
consts
.
INTERNAL
,
consts
.
TYPE
:
consts
.
BRIDGED
,
});
if
self
.
controler_socket
!=
None
:
self
.
send_message_on_socket
(
toforward
)
else
:
self
.
receive_callback
(
toforward
)
# Extract a message from taktuk and send it to the unix socket as a JSON
# message TODO
def
extract_taktuk_message
(
self
,
to_unpack
)
:
#self.flushprint("\n get datas \n")
# The sent message by taktuk is of the following form
# code + size in the header
# so 4 bytes + 1
#self.flushprint("try to extract message {}".format(to_unpack))
offset
=
0
length
=
struct
.
unpack_from
(
'>I'
,
to_unpack
[
offset
:
4
])[
0
]
offset
+=
4
#self.flushprint("length : {}".format(length))
body_length
=
length
-
MSG_LENGTH
#self.flushprint("body_length {}".format(body_length))
code
=
struct
.
unpack_from
(
's'
,
to_unpack
[
offset
:
offset
+
1
])[
0
]
offset
+=
1
#self.flushprint("code {}".format(code))
# FROM TODO
from_size
=
struct
.
unpack_from
(
'>I'
,
to_unpack
[
offset
:
offset
+
4
])[
0
]
offset
+=
4
#self.flushprint("from_size {}".format(from_size))
from_
=
struct
.
unpack_from
(
's'
,
to_unpack
[
offset
:
offset
+
from_size
])[
0
]
offset
+=
from_size
#self.flushprint("from {}".format(from_))
# READ final datas
body_length
=
body_length
-
5
data
=
to_unpack
[
offset
:
offset
+
body_length
]
final_data
=
data
.
decode
(
consts
.
encoding
)
offset
+=
body_length
#self.flushprint("final_data : {}".format(final_data))
final_code
=
code
.
decode
(
consts
.
encoding
)
final_from
=
from_
.
decode
(
consts
.
encoding
)
#self.flushprint("body_length {}".format(body_length))
#self.flushprint("data : {}".format(data))
#self.flushprint("final_code : {}".format(final_code))
# build JSON to send
toforward
=
json
.
dumps
({
"from"
:
final_from
,
"type"
:
final_code
,
"data"
:
final_data
});
#self.log_to_father("end decode tofoward {}".format(toforward))
#self.log_to_father("received message ".format(final_from))
# send the message to top layer
if
self
.
controler_socket
!=
None
:
self
.
send_message_on_socket
(
toforward
)
else
:
self
.
receive_callback
(
toforward
)
def
write_on_taktuk
(
self
,
packed
):
if
not
self
.
taktuk_ready
:
self
.
taktuk_lock
.
acquire
()
s
=
self
.
file_out
ret
=
self
.
insistent_write
(
packed
)
return
ret
# data need to be str format assuming utf-8
# Will be send using
def
send_message_on_socket
(
self
,
data
):
if
not
self
.
controler_ready
:
self
.
controler_lock
.
acquire
()
#self.flushprint(data)
to_send
=
self
.
pack
(
data
)
rest_to_send
=
len
(
to_send
)
try
:
while
self
.
poll
and
rest_to_send
>
0
and
not
self
.
dead
:
events
=
self
.
out_epoll
.
poll
(
consts
.
timeout
)
for
fileno
,
event
in
events
:
if
event
&
select
.
EPOLLOUT
:
byteswritten
=
self
.
controler_socket
.
send
(
to_send
)
if
byteswritten
<
rest_to_send
:
to_send
=
to_send
[
byteswritten
:]
rest_to_send
=
rest_to_send
-
byteswritten
except
:
self
.
bridge_dead
()
self
.
flushprint
(
"ouch !"
)
raise
# transform string to packed datas
# data need to be str format assuming utf-8
def
pack
(
self
,
data
):
# send the packed data and the packed length
data
=
bytes
(
data
,
consts
.
encoding
)
return
struct
.
pack
(
">i"
,
len
(
data
))
+
data
# close the thread
def
stop_select
(
self
):
self
.
poll
=
False
# Send a log to the unix socket
def
propagate_taktuk_infos
(
self
)
:
toforward
=
json
.
dumps
({
consts
.
FROM
:
consts
.
INTERNAL
,
consts
.
TYPE
:
consts
.
INFOS
,
consts
.
RANK
:
self
.
rank
,
});
if
self
.
controler_socket
!=
None
:
self
.
send_message_on_socket
(
toforward
)
else
:
self
.
receive_callback
(
toforward
)
# Send a log to the unix socket
def
log_to_father
(
self
,
data
)
:
toforward
=
json
.
dumps
({
consts
.
FROM
:
consts
.
INTERNAL
,
consts
.
TYPE
:
consts
.
LOG
,
consts
.
DATA
:
data
});
if
self
.
controler_socket
!=
None
:
self
.
send_message_on_socket
(
toforward
)
else
:
self
.
receive_callback
(
toforward
)
# dest : string
# target : string
# command : letter
# data : string
# sync_bool : bool
def
taktuk_send_message
(
self
,
dest
,
target
,
command
,
data
,
sync_bool
)
:
data
=
bytes
(
data
,
consts
.
encoding
)
body_length
=
len
(
data
)
send_to
=
TAKTUK_SEND_TO
;
synchro
=
TAKTUK_SYNCHRONIZE
# The first four bytes of the header encode its own size not including
# the four bytes themselves
#
send_to
=
bytes
(
send_to
,
consts
.
encoding
)
dest
=
bytes
(
dest
,
consts
.
encoding
)
command
=
bytes
(
command
,
consts
.
encoding
)
target
=
bytes
(
target
,
consts
.
encoding
)
rank
=
bytes
(
self
.
rank
,
consts
.
encoding
)
synchro
=
bytes
(
synchro
,
consts
.
encoding
)
#self.flushprint("send header "+str(send_to)+" "+str(rank)+" "+
# str(dest)+" "+str(target)+" "+str(body_length)+" "+str(command)
# +" "+str(data))
dest_size
=
len
(
dest
)
target_size
=
len
(
target
)
taktuk_from_size
=
len
(
rank
)
# Do not take into account the first four bytes encoding the total size
# Unfortunately I have to compute size first to allocate my memory if
# needed ...
#
header_size
=
consts
.
UINT32
header_size
+=
MSG_LENGTH
header_size
+=
consts
.
UINT32
+
dest_size
header_size
+=
MSG_LENGTH
header_size
+=
consts
.
UINT32
+
target_size
header_size
+=
consts
.
UINT32
+
taktuk_from_size
;
if
sync_bool
:
header_size
+=
MSG_LENGTH
packed
=
struct
.
pack
(
'>I'
,
header_size
-
consts
.
UINT32
+
body_length
)
if
sync_bool
:
packed
+=
synchro
packed
+=
send_to
packed
+=
struct
.
pack
(
'>I'
,
dest_size
)
packed
+=
dest
packed
+=
command
packed
+=
struct
.
pack
(
'>I'
,
target_size
)
packed
+=
target
packed
+=
struct
.
pack
(
'>I'
,
taktuk_from_size
)
packed
+=
rank
packed
+=
data
assert
len
(
packed
)
==
header_size
+
body_length
#self.flushprint("need to send all {}".format(packed))
result
=
self
.
write_on_taktuk
(
packed
);
return
result
;
# dest : string
# command : letter
# data : string
# sync_bool : bool
def
taktuk_send_execute
(
self
,
dest
,
command
,
data
,
sync_bool
)
:
data
=
bytes
(
data
,
consts
.
encoding
)
body_length
=
len
(
data
)
send_to
=
TAKTUK_SEND_TO
;
synchro
=
TAKTUK_SYNCHRONIZE
# The first four bytes of the header encode its own size not including
# the four bytes themselves
#
send_to
=
bytes
(
send_to
,
consts
.
encoding
)
dest
=
bytes
(
dest
,
consts
.
encoding
)
command
=
bytes
(
command
,
consts
.
encoding
)
synchro
=
bytes
(
synchro
,
consts
.
encoding
)
#self.flushprint("send header "+str(send_to)+" "+str(rank)+" "+
# str(dest)+" "+str(target)+" "+str(body_length)+" "+str(command)
# +" "+str(data))
dest_size
=
len
(
dest
)
# Do not take into account the first four bytes encoding the total size
# Unfortunately I have to compute size first to allocate my memory if
# needed ...
#
header_size
=
consts
.
UINT32
header_size
+=
MSG_LENGTH
header_size
+=
consts
.
UINT32
+
dest_size
header_size
+=
MSG_LENGTH
if
sync_bool
:
header_size
+=
MSG_LENGTH
packed
=
struct
.
pack
(
'>I'
,
header_size
-
consts
.
UINT32
+
body_length
)
if
sync_bool
:
packed
+=
synchro
packed
+=
send_to
packed
+=
struct
.
pack
(
'>I'
,
dest_size
)
packed
+=
dest
packed
+=
command
packed
+=
data
assert
len
(
packed
)
==
header_size
+
body_length
#self.flushprint("need to send all {}".format(packed))
result
=
self
.
write_on_taktuk
(
packed
);
return
result
;
# dest : string
# command : letter
# option : letter
# data : string
# sync_bool : bool
def
taktuk_send_spawn_nodes
(
self
,
dest
,
command
,
option
,
data
,
sync_bool
)
:
data
=
bytes
(
data
,
consts
.
encoding
)
body_length
=
len
(
data
)
send_to
=
TAKTUK_SEND_TO
;
synchro
=
TAKTUK_SYNCHRONIZE
# The first four bytes of the header encode its own size not including
# the four bytes themselves
#
send_to
=
bytes
(
send_to
,
consts
.
encoding
)
dest
=
bytes
(
dest
,
consts
.
encoding
)
command
=
bytes
(
command
,
consts
.
encoding
)
option
=
bytes
(
option
,
consts
.
encoding
)
synchro
=
bytes
(
synchro
,
consts
.
encoding
)
#self.flushprint("send header "+str(send_to)+" "+str(rank)+" "+
# str(dest)+" "+str(target)+" "+str(body_length)+" "+str(command)
# +" "+str(data))
dest_size
=
len
(
dest
)
# Do not take into account the first four bytes encoding the total size
# Unfortunately I have to compute size first to allocate my memory if
# needed ...
#
header_size
=
consts
.
UINT32
header_size
+=
MSG_LENGTH
header_size
+=
consts
.
UINT32
+
dest_size
header_size
+=
MSG_LENGTH
header_size
+=
MSG_LENGTH
if
sync_bool
:
header_size
+=
MSG_LENGTH
packed
=
struct
.
pack
(
'>I'
,
header_size
-
consts
.
UINT32
+
body_length
)
if
sync_bool
:
packed
+=
synchro
packed
+=
send_to
packed
+=
struct
.
pack
(
'>I'
,
dest_size
)
packed
+=
dest
packed
+=
command
packed
+=
option
packed
+=
data
assert
len
(
packed
)
==
header_size
+
body_length
#self.flushprint("need to send all {}".format(packed))
result
=
self
.
write_on_taktuk
(
packed
);
return
result
;
# sync_bool : bool
def
taktuk_send_network
(
self
,
sync_bool
)
:
send_to
=
TAKTUK_REDUCE
command
=
TAKTUK_SEND_TO
synchro
=
TAKTUK_SYNCHRONIZE
# The first four bytes of the header encode its own size not including
# the four bytes themselves
#
send_to
=
bytes
(
send_to
,
consts
.
encoding
)
command
=
bytes
(
command
,
consts
.
encoding
)
synchro
=
bytes
(
synchro
,
consts
.
encoding
)
# Do not take into account the first four bytes encoding the total size
# Unfortunately I have to compute size first to allocate my memory if
# needed ...
#
header_size
=
consts
.
UINT32
header_size
+=
MSG_LENGTH
header_size
+=
MSG_LENGTH
if
sync_bool
:
header_size
+=
MSG_LENGTH