Commit 1eb78a38 authored by lavocat's avatar lavocat
Browse files

This repo only contains tests

parent acb1636d
project(c_wrapper C)
cmake_minimum_required(VERSION 3.0)
find_package(PkgConfig REQUIRED)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99")
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -Wall")
IF(CMAKE_INSTALL_PREFIX_INITIALIZED_TO_DEFAULT)
# Set install directory to ../install
set(CMAKE_INSTALL_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/install" CACHE PATH "CMAKE_INSTALL_PREFIX: Install path prefix, prepended onto install directories." FORCE)
endif(CMAKE_INSTALL_PREFIX_INITIALIZED_TO_DEFAULT)
option(BUILD_WITH_ZMQ "Build with ZeroMQ" ON)
if(BUILD_WITH_ZMQ)
mark_as_advanced(FORCE ZMQ_LIBRARY
ZMQ_INCLUDE_DIR)
find_path(ZMQ_DIR NAMES include/zmq.h)
find_path(ZMQ_INCLUDE_DIR NAMES zmq.h HINTS ${ZMQ_DIR}/include)
find_library(ZMQ_LIBRARY NAMES zmq libzmq HINTS ${ZMQ_DIR}/lib)
set(ZMQ_INCLUDE_DIRS ${ZMQ_INCLUDE_DIR})
include_directories( ${ZMQ_INCLUDE_DIR} )
if(ZMQ_LIBRARY)
set(EXTRA_LIBS ${EXTRA_LIBS}
${ZMQ_LIBRARY} )
set( ENV{PKG_CONFIG_PATH} "$ENV{PKG_CONFIG_PATH}:${ZMQ_DIR}/lib/pkgconfig" )
pkg_check_modules(ZMQ REQUIRED libzmq>=4.1.5)
endif(ZMQ_LIBRARY)
if(ZMQ_LIBRARY AND ZMQ_INCLUDE_DIRS)
set(ZMQ_FOUND TRUE)
message(STATUS "ZeroMQ Found")
add_definitions( -DBUILD_WITH_ZMQ )
else(ZMQ_LIBRARY AND ZMQ_INCLUDE_DIRS)
# Disable Option if missing dependencies
set(BUILD_WITH_ZMQ FALSE CACHE BOOL "Build with ZeroMQ" FORCE)
message(STATUS "ZeroMQ missing. BUILD_WITH_ZMQ option turned OFF")
endif(ZMQ_LIBRARY AND ZMQ_INCLUDE_DIRS)
endif(BUILD_WITH_ZMQ)
add_library (erebor_wrapper SHARED erebor_wrapper.c)
target_link_libraries (erebor_wrapper LINK_PUBLIC zmq)
add_executable (client client.c)
add_executable (server server.c)
target_link_libraries (client LINK_PUBLIC erebor_wrapper)
target_link_libraries (server LINK_PUBLIC erebor_wrapper)
# Install library
install(TARGETS erebor_wrapper DESTINATION ${CMAKE_INSTALL_PREFIX}/lib/)
# Install library headers
file(GLOB HEADERS *.h)
install(FILES ${HEADERS} DESTINATION ${CMAKE_INSTALL_PREFIX}/include/)
#include <stdio.h>
#include "erebor_wrapper.h"
#include <unistd.h>
#include <getopt.h>
#include <stdlib.h>
int main(int argc, char** argv){
int opt;
char* in_ipc;
char* out_ipc = malloc(100);
char* network;
char* server;
char* str_rank = "0";
int rank =0;
do
{
opt = getopt (argc, argv, "S:R:P:N:");
switch (opt) {
case 'S':
server = optarg;
break;
case 'R':
rank = atoi(optarg);
str_rank = optarg;
break;
case 'P':
in_ipc = optarg;
int iout_ipc = atoi(optarg) + 100 + rank;
sprintf(out_ipc, "%d", iout_ipc);
break;
case 'N':
network = optarg;
break;
}
} while (opt != -1);
printf("client %d %s %s %s %s\n", rank, in_ipc, out_ipc, network, server);
erebor* e = malloc(sizeof(erebor));
int ret = erebor_init_connection(str_rank, network, in_ipc, out_ipc, e);
if (ret == -1){
erebor_print_error();
}else{
printf("ret %d", ret);
erebor_send_to(e, "0", "server", "coucou");
char *dest = malloc(100);
char *group = malloc(99);
char *message = malloc(100);
erebor_recv(e, dest, group, message);
printf("received message %s from %s@%s\n", message, dest, group);
}
erebor_close(e);
}
#include "erebor_wrapper.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zmq.h>
#include <arpa/inet.h>
#include <stdbool.h>
static void *put_bytes(void *pos, const void *mem, size_t length);
static void *put_uint32(void *pos, uint32_t value);
int pack(int nb, char** params, char*pack);
int wait_size(char* datas, int* offset);
int send_to(erebor* erebor, char* dest, char*group, char* message, char* type, bool block);
int get_msg(erebor* erebor, char*dest, char* group, char*message, zmq_msg_t*msg);
void erebor_print_error(){
switch(errno){
case EAGAIN :
printf("Non-blocking mode was requested and the message cannot \
be sent at the moment.\n");
break;
case ENOTSUP :
printf("The zmq_send() operation is not supported by this socket type.\n");
break;
case EFSM :
printf("The zmq_send() operation cannot be performed on this socket \
at the moment due to the socket not being in the appropriate \
state. This error may occur with socket types that switch \
between several states, such as ZMQ_REP. \
See the messaging patterns section \
of zmq_socket(3) for more information.\n");
break;
case EINTR :
printf("The operation was interrupted by delivery of a signal before the message was sent.\n");
break;
case EHOSTUNREACH :
printf("The message cannot be routed.\n");
break;
case EINVAL :
printf("The endpoint supplied is invalid.");
break;
case EPROTONOSUPPORT :
printf("The requested transport protocol is not supported.");
break;
case ENOCOMPATPROTO :
printf("The requested transport protocol is not compatible with the socket type.");
break;
case ETERM :
printf("The 0MQ context associated with the specified socket was terminated.");
break;
case ENOTSOCK :
printf("The provided socket was invalid.");
break;
case EMTHREAD :
printf("No I/O thread is available to accomplish the task.");
break;
case EADDRINUSE :
printf("The requested address is already in use.");
break;
case EADDRNOTAVAIL :
printf("The requested address was not local.");
break;
case ENODEV :
printf("The requested address specifies a nonexistent interface.");
break;
}
}
static void *put_bytes(void *pos, const void *mem, size_t length) {
memcpy(pos, mem, length);
return ((char *) pos) + length;
}
static void *put_uint32(void *pos, uint32_t value) {
uint32_t to_be_sent;
char *src, *dst;
int size = sizeof(uint32_t);
to_be_sent = htonl(value);
src = (char *) &to_be_sent;
dst = (char *) pos;
while (size--)
*(dst++) = *(src++);
return dst;
}
int erebor_init_connection(char* str_rank, char* network, char* in_ipc, char* out_ipc, erebor* erebor)
{
erebor->buff = malloc(5);
erebor->rank = malloc(strlen(str_rank)+1);
erebor->network = malloc(strlen(network)+1);
erebor->context = zmq_ctx_new();
erebor->puller = zmq_socket (erebor->context, ZMQ_PULL);
erebor->pusher = zmq_socket (erebor->context, ZMQ_PUSH);
sprintf(erebor->rank, "%s", str_rank);
sprintf(erebor->network, "%s", network);
if(erebor->context == NULL)
return -1;
if(erebor->puller == NULL)
return -1;
if(erebor->pusher == NULL)
return -1;
char erebor_buffer[100];
//connect puller
sprintf (erebor_buffer, "ipc:///tmp/%s", out_ipc);
if (zmq_bind (erebor->puller, erebor_buffer) == -1){
return -1;
}
//printf("puller bound to %s\n", erebor_buffer);
//connect pusher
sprintf (erebor_buffer, "ipc:///tmp/%s", in_ipc);
if(zmq_connect (erebor->pusher, erebor_buffer) == -1){
return -1;
}
//printf("pusher connected to %s\n", erebor_buffer);
char* params[3];
params[0] = "register";
params[1] = erebor->rank;
params[2] = out_ipc;
//printf("type %s \n", params[0]);
//printf("rank %s \n", params[1]);
//printf("socket %s \n", params[2]);
int data_len = strlen(params[0])+strlen(params[1])+strlen(params[2])+12;
//printf("data len %d\n", data_len);
char* pck = malloc(data_len);
//printf("going to pack\n");
int len = pack(3, params, pck);
int ret = zmq_send (erebor->pusher, pck, len, 0);
free(pck);
return ret;
}
int pack(int nb_params, char** params, char*pck){
int len = 0;
char* tram = pck;
for(int i=0; i<nb_params; i++){
tram = put_uint32(tram, strlen(params[i]));
tram = put_bytes(tram, params[i], strlen(params[i]));
len += 4 + strlen(params[i]);
}
return len;
}
int wait_size(char* datas, int* offset){
char* place = &datas[*offset];
int* sizep = (int*) place;
int len = ntohl(sizep[0]);
*offset += 4;
return len;
}
int erebor_recv(erebor* erebor, char*dest, char* group, char*message){
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
if (rc != 0){
return rc;
}
rc = zmq_recvmsg (erebor->puller, &msg, 0);
if (rc == -1){
return rc;
}
else{
return get_msg(erebor, dest, group, message, &msg);
}
}
int erebor_non_block_recv(erebor* erebor, char*dest, char* group, char*message){
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
if (rc != 0){
return rc;
}
rc = zmq_recvmsg (erebor->puller, &msg, ZMQ_DONTWAIT);
if (rc == -1){
if(errno == EAGAIN){
return -2;
}
else{
return -1;
}
}else{
return get_msg(erebor, dest, group, message, &msg);
}
}
int get_msg(erebor* erebor, char*dest, char* group, char*message, zmq_msg_t*msg){
char* datas = (char*) zmq_msg_data (msg);
//printf("wait to receive dest\n");
int offset = 0;
//dest
int len = wait_size(datas, &offset);
memcpy(dest, &datas[offset], len);
offset += len;
dest[len] = 0x00;
//group
len = wait_size(datas, &offset);
memcpy(group, &datas[offset], len);
offset += len;
group[len] = 0x00;
//message
len = wait_size(datas, &offset);
memcpy(message, &datas[offset], len);
offset += len;
message[len] = 0x00;
zmq_msg_close (msg);
return 0;
}
int erebor_non_block_send_to(erebor* erebor, char* dest, char*group, char* message){
return send_to(erebor, dest, group, message, "mpi send", false);
}
int erebor_send_to(erebor* erebor, char* dest, char*group, char* message){
return send_to(erebor, dest, group, message, "mpi send", true);
}
int erebor_non_block_ctrl_msg_to(erebor* erebor, char*group, char* message){
return send_to(erebor, "0", group, message, "message", false);
}
int erebor_ctrl_msg_to(erebor* erebor, char*group, char* message){
return send_to(erebor, "0", group, message, "message", true);
}
int send_to(erebor* erebor, char* dest, char*group, char* message, char* type,
bool block){
char* params[6];
params[0] = type;
params[1] = dest;
params[2] = group;
params[3] = erebor->rank;
params[4] = erebor->network;
params[5] = message;
//printf("type %s \n", params[0]);
//printf("dest %s \n", params[1]);
//printf("group %s \n", params[2]);
//printf("rank %s \n", params[3]);
//printf("group %s \n", params[4]);
//printf("message %s \n", params[5]);
int data_len = strlen(params[0])+strlen(params[1])+strlen(params[2])+
strlen(params[3])+ strlen(params[4])+ strlen(params[5])+(6*4);
char* pck = malloc(data_len);
int len = pack(6, params, pck);
int ret = 0;
if(block){
ret = zmq_send (erebor->pusher, pck, len, 0);
free(pck);
return ret;
}else{
ret = zmq_send (erebor->pusher, pck, len, ZMQ_DONTWAIT);
if(ret == -1){
if(errno == EAGAIN){
return -2;
}else{
return -1;
}
}
free(pck);
return ret;
}
}
void erebor_close(erebor* erebor){
free(erebor->rank);
free(erebor->network);
zmq_close(erebor->pusher);
zmq_close(erebor->puller);
zmq_ctx_term(erebor->context);
}
#ifndef EREBOR
#define EREBOR
typedef struct erebor{
char *rank;
char *network;
void *context;
void *puller;
void *pusher;
char *buff;
} erebor;
/*
* Initialize a connection to an erebor daemon.
* str_rank is the MPI rank of this core
* network if the name of the group where this mpi_job is launched
* in_ipc is the name of the file to connect to the erebor daemon
* out_ipc is the name that will be given to the daemon in order to contact us.
* erebor is a pointer to an allocated erebor struct
*
* char* are copyed into the erebor data structure.
* They can be freed after this function call
* return -1 in case of error, erebor_print_error will give more indication
*/
int erebor_init_connection(char* str_rank, char* network, char* in_ipc, char* out_ipc, erebor* erebor);
/*
* Send a message to a foreign MPI rank on a foreign group
* erebor is a pointer to an initialized erebor struct
* dest is the MPI rank of the receiver
* group is the group name of the receiver
*
* char* can be freed after this function call
* return > 0 if a message sent
* return -1 for any other error, erebor_print_error will give more indication
*/
int erebor_send_to(erebor* erebor, char* dest, char* group, char* message);
/*
* Send a message to a foreign MPI rank on a foreign group
* erebor is a pointer to an initialized erebor struct
* dest is the MPI rank of the receiver
* group is the group name of the receiver
*
* char* can be freed after this function call
* return > 0 if a message sent
* return -2 if sending impossible
* return -1 for any other error, erebor_print_error will give more indication
*/
int erebor_non_block_send_to(erebor* erebor, char* dest, char* group, char* message);
/*
* Send a control message to an Erebor instance
* erebor is a pointer to an initialized erebor struct
* group is the group name of Erebor instance
*
* char* can be freed after this function call
* return > 0 if a message sent
* return -1 for any other error, erebor_print_error will give more indication
*/
int erebor_ctrl_msg_to(erebor* erebor, char* group, char* message);
/*
* Send a control message to an Erebor instance
* erebor is a pointer to an initialized erebor struct
* group is the group name of Erebor instance
*
* char* can be freed after this function call
* return > 0 if a message sent
* return -2 if sending impossible
* return -1 for any other error, erebor_print_error will give more indication
*/
int erebor_non_block_ctrl_msg_to(erebor* erebor, char* group, char* message);
/*
* Receive a message form erebor daemon
* erebor is a pointer to an initialized erebor struct
* dest preallocated buffer where the MPI rank of the sender will be written
* group preallocated buffer where the group of the sender will be written
* message preallocated buffer where the content of the message will be written
*
* return 0 if everything is fine
* return -1 otherwise, erebor_print_error will give more indication
*/
int erebor_recv(erebor* erebor, char*dest, char*group, char*message);
/*
* NON blocking receive
* erebor is a pointer to an initialized erebor struct
* dest preallocated buffer where the MPI rank of the sender will be written
* group preallocated buffer where the group of the sender will be written
* message preallocated buffer where the content of the message will be written
*
* return 0 if a message has arrived
* return -2 if no message was there
* return -1 for any other error, erebor_print_error will give more indication
*/
int erebor_non_block_recv(erebor* erebor, char*dest, char*group, char*message);
/*
* Close the connections.
* erebor is a pointer to an initialized erebor struct
*
* erebor data structure need to be freed afterward.
*/
void erebor_close(erebor* erebor);
/*
* Print a human readable error message from errno (from zmq errors)
*/
void erebor_print_error();
#endif
#include <stdio.h>
#include "erebor_wrapper.h"
#include <unistd.h>
#include <getopt.h>
#include <stdlib.h>
int main(int argc, char** argv){
int opt;
char* in_ipc;
char* out_ipc = malloc(100);
char* network;
int rank =0;
char* str_rank = "0";
int c =0;
do
{
opt = getopt (argc, argv, "P:N:C:");
switch (opt) {
case 'C':
c = atoi(optarg);
break;
case 'P':
in_ipc = optarg;
int iout_ipc = atoi(optarg) + 100 + rank;
sprintf(out_ipc, "%d", iout_ipc);
break;
case 'N':
network = optarg;
break;
}
} while (opt != -1);
printf("server %s %s %s %d\n", in_ipc, out_ipc, network, c);
erebor* e = malloc(sizeof(erebor));
int ret = erebor_init_connection(str_rank, network, in_ipc, out_ipc, e);
if (ret == -1){
erebor_print_error();
}else{
printf("ret %d\n", ret);
char *dest = malloc(100);
char *group = malloc(100);
char *message = malloc(100);
for(int i=0; i<c; i++){
printf("wait message %d\n", i);
erebor_recv(e, dest, group, message);
printf("received message %s from %s@%s\n", message, dest, group);
erebor_send_to(e, dest, group, "well received");
}
}
erebor_close(e);
}
from ereborc import Erebor
from encoder import MPIDecoder
from network import Network
from framework import FrameworkControler
from main import main
#!/usr/bin/env python3
import sys
import struct
from isengard import consts
class MPIDecoder:
def unpack(self, to_unpack) :
# length of the global message
offset = 0
fields = []
while offset < len(to_unpack) :
field_size = struct.unpack_from('>I', to_unpack[offset:offset+4])[0]
offset += 4
field = to_unpack[offset:offset+field_size]
offset += field_size
fields.append(field.decode(consts.encoding))
return fields
def pack(self, *fields) :
packed = None
for field in fields :
if packed == None :
packed = struct.pack('>I', len(field))
else :
packed += struct.pack('>I', len(field))
packed += bytes(field, consts.encoding)
return packed;
def main(argv):
d = MPIDecoder()
l = ["a", "b", "cd", "efghij", "a"]
e = d.pack(*l)
assert(d.unpack(e) == l)
if __name__ == "__main__":
sys.exit(main(sys