Commit 92c53d9b authored by Millian Poquet's avatar Millian Poquet
Browse files

Merge branch 'coalloc' into coalloc_to_master

parents de0f92dc 6fe904da
......@@ -879,14 +879,14 @@ void export_schedule_to_csv(const std::string &filename, const BatsimContext *co
output_map["nb_computing_machines"] = to_string(context->machines.nb_machines());
XBT_INFO("jobs=%d, finished=%d, success=%d, killed=%d, success_rate=%lf",
nb_jobs, nb_jobs_finished, nb_jobs_success, nb_jobs_killed, success_rate);
XBT_INFO("makespan=%lf, scheduling_time=%lf, mean_waiting_time=%lf, mean_turnaround_time=%lf,"
nb_jobs, nb_jobs_finished, nb_jobs_success, nb_jobs_killed, success_rate);
XBT_INFO("makespan=%lf, scheduling_time=%lf, mean_waiting_time=%lf, mean_turnaround_time=%lf, "
"mean_slowdown=%lf, max_waiting_time=%lf, max_turnaround_time=%lf, max_slowdown=%lf",
(double)makespan, (double)seconds_used_by_scheduler,
mean_waiting_time, mean_turnaround_time, mean_slowdown,
(double)mean_waiting_time, (double)mean_turnaround_time, mean_slowdown,
(double)max_waiting_time, (double)max_turnaround_time, (double)max_slowdown);
XBT_INFO("mean_machines_running=%lf, max_machines_running=%lf",
mean_time_running, max_time_running);
(double)mean_time_running, (double)max_time_running);
Rational total_consumed_energy = context->energy_last_job_completion - context->energy_first_job_submission;
output_map["consumed_joules"] = to_string((double) total_consumed_energy);
......
......@@ -127,6 +127,12 @@ std::string ip_message_type_to_string(IPMessageType type)
break;
case IPMessageType::CONTINUE_DYNAMIC_SUBMIT:
s = "CONTINUE_DYNAMIC_SUBMIT";
break;
case IPMessageType::TO_JOB_MSG:
s = "TO_JOB_MSG";
break;
case IPMessageType::FROM_JOB_MSG:
s = "FROM_JOB_MSG";
}
return s;
......@@ -251,6 +257,16 @@ IPMessage::~IPMessage()
case IPMessageType::CONTINUE_DYNAMIC_SUBMIT:
{
} break;
case IPMessageType::TO_JOB_MSG:
{
ToJobMessage * msg = (ToJobMessage *) data;
delete msg;
} break;
case IPMessageType::FROM_JOB_MSG:
{
FromJobMessage * msg = (FromJobMessage *) data;
delete msg;
} break;
}
data = nullptr;
......
......@@ -8,6 +8,8 @@
#include <vector>
#include <string>
#include <rapidjson/document.h>
#include <simgrid/msg.h>
#include "machine_range.hpp"
......@@ -73,6 +75,8 @@ enum class IPMessageType
,SWITCHED_OFF //!< SwitcherOFF -> Server. The switcherOFF process tells the server the machine pstate has been changed.
,END_DYNAMIC_SUBMIT //!< Scheduler -> Server. The scheduler tells the server that dynamic job submissions are finished.
,CONTINUE_DYNAMIC_SUBMIT //!< Scheduler -> Server. The scheduler tells the server that dynamic job submissions continue.
,TO_JOB_MSG //!< Scheduler -> Server. The scheduler sends a message to a job.
,FROM_JOB_MSG //!< Job -> Server. The job wants to send a message to the scheduler via the server.
};
/**
......@@ -228,6 +232,24 @@ struct KillingDoneMessage
std::vector<JobIdentifier> jobs_ids; //!< The IDs of the jobs which have been killed
};
/**
* @brief The content of the ToJobMessage message
*/
struct ToJobMessage
{
JobIdentifier job_id; //!< The JobIdentifier
std::string message; //!< The message to send to the job
};
/**
* @brief The content of the FromJobMessage message
*/
struct FromJobMessage
{
JobIdentifier job_id; //!< The JobIdentifier
rapidjson::Document message; //!< The message to send to the scheduler
};
/**
* @brief The base struct sent in inter-process messages
*/
......
......@@ -348,6 +348,9 @@ string job_state_to_string(JobState state)
case JobState::JOB_STATE_COMPLETED_SUCCESSFULLY:
job_state = "COMPLETED_SUCCESSFULLY";
break;
case JobState::JOB_STATE_COMPLETED_FAILED:
job_state = "COMPLETED_FAILED";
break;
case JobState::JOB_STATE_COMPLETED_KILLED:
job_state = "COMPLETED_KILLED";
break;
......@@ -378,6 +381,10 @@ JobState job_state_from_string(std::string state)
{
new_state = JobState::JOB_STATE_COMPLETED_SUCCESSFULLY;
}
else if (state == "COMPLETED_FAILED")
{
new_state = JobState::JOB_STATE_COMPLETED_FAILED;
}
else if (state == "COMPLETED_KILLED")
{
new_state = JobState::JOB_STATE_COMPLETED_KILLED;
......
......@@ -7,6 +7,7 @@
#include <map>
#include <vector>
#include <deque>
#include <rapidjson/document.h>
......@@ -26,7 +27,8 @@ enum class JobState
JOB_STATE_NOT_SUBMITTED //!< The job exists but cannot be scheduled yet.
,JOB_STATE_SUBMITTED //!< The job has been submitted, it can now be scheduled.
,JOB_STATE_RUNNING //!< The job has been scheduled and is currently being processed.
,JOB_STATE_COMPLETED_SUCCESSFULLY //!< The job execution finished before its walltime.
,JOB_STATE_COMPLETED_SUCCESSFULLY //!< The job execution finished before its walltime successfully.
,JOB_STATE_COMPLETED_FAILED //!< The job execution finished before its walltime but the job failed.
,JOB_STATE_COMPLETED_KILLED //!< The job has been killed.
,JOB_STATE_REJECTED //!< The job has been rejected by the scheduler.
};
......@@ -55,11 +57,14 @@ struct Job
long double consumed_energy; //!< The sum, for each machine on which the job has been allocated, of the consumed energy (in Joules) during the job execution time (consumed_energy_after_job_completion - consumed_energy_before_job_start)
std::deque<std::string> incoming_message_buffer; //!< The buffer for incoming messages from the scheduler.
Rational starting_time; //!< The time at which the job starts to be executed.
Rational runtime; //!< The amount of time during which the job has been executed
MachineRange allocation; //!< The machines on which the job has been executed.
std::vector<int> smpi_ranks_to_hosts_mapping; //!< If the job uses a SMPI profile, stores which host number each MPI rank should use. These numbers must be in [0,required_nb_res[.
JobState state; //!< The current state of the job
int return_code = -1; //!< The return code of the job
/**
* @brief Creates a new-allocated Job from a JSON description
......
......@@ -2,6 +2,7 @@
* @file jobs_execution.cpp
* @brief Contains functions related to the execution of the jobs
*/
#include <regex>
#include "jobs_execution.hpp"
#include "jobs.hpp"
......@@ -46,6 +47,7 @@ int execute_profile(BatsimContext *context,
{
Workload * workload = context->workloads.at(allocation->job_id.workload_name);
Job * job = workload->jobs->at(allocation->job_id.job_number);
JobIdentifier job_id(workload->name, job->number);
Profile * profile = workload->profiles->at(profile_name);
int nb_res = job->required_nb_res;
......@@ -259,11 +261,11 @@ int execute_profile(BatsimContext *context,
msg_error_t err = MSG_parallel_task_execute_with_timeout(ptask, *remaining_time);
*remaining_time = *remaining_time - (MSG_get_clock() - time_before_execute);
int ret = 1;
int ret = profile->return_code;
if (err == MSG_OK) {}
else if (err == MSG_TIMEOUT)
{
ret = 0;
ret = -1;
}
else
{
......@@ -286,14 +288,85 @@ int execute_profile(BatsimContext *context,
{
for (unsigned int j = 0; j < data->sequence.size(); j++)
{
if (execute_profile(context, data->sequence[j], allocation,
cleanup_data, remaining_time) == 0)
int ret_last_profile = execute_profile(context, data->sequence[j], allocation,
cleanup_data, remaining_time);
if (ret_last_profile != 0)
{
return 0;
return ret_last_profile;
}
}
}
return 1;
return profile->return_code;
}
else if (profile->type == ProfileType::SCHEDULER_SEND)
{
SchedulerSendProfileData * data = (SchedulerSendProfileData *) profile->data;
XBT_INFO("Sending message to the scheduler");
FromJobMessage * message = new FromJobMessage;
message->job_id = job_id;
message->message.CopyFrom(data->message, message->message.GetAllocator());
send_message("server", IPMessageType::FROM_JOB_MSG, (void*)message);
if (delay_job(0.005, remaining_time) == -1)
return -1;
return profile->return_code;
}
else if (profile->type == ProfileType::SCHEDULER_RECV)
{
SchedulerRecvProfileData * data = (SchedulerRecvProfileData *) profile->data;
string profile_to_execute = "";
bool has_messages = false;
XBT_INFO("Trying to receive message from scheduler");
if (job->incoming_message_buffer.empty()) {
if (data->on_timeout == "") {
XBT_INFO("Waiting for message from scheduler");
while (true) {
if (delay_job(0.005, remaining_time) == -1)
return -1;
if (!job->incoming_message_buffer.empty()) {
XBT_INFO("Finally got message from scheduler");
has_messages = true;
break;
}
}
} else {
XBT_INFO("Timeout on waiting for message from scheduler");
profile_to_execute = data->on_timeout;
}
} else {
has_messages = true;
}
if (has_messages) {
string first_message = job->incoming_message_buffer.front();
job->incoming_message_buffer.pop_front();
regex msg_regex(data->regex);
if (regex_match(first_message, msg_regex)) {
XBT_INFO("Message from scheduler matches");
profile_to_execute = data->on_success;
} else {
XBT_INFO("Message from scheduler does not match");
profile_to_execute = data->on_failure;
}
}
if (profile_to_execute != "") {
XBT_INFO("Execute profile: %s", profile_to_execute.c_str());
int ret_last_profile = execute_profile(context, profile_to_execute, allocation,
cleanup_data, remaining_time);
if (ret_last_profile != 0) {
return ret_last_profile;
}
}
return profile->return_code;
}
else if (profile->type == ProfileType::DELAY)
{
......@@ -305,7 +378,7 @@ int execute_profile(BatsimContext *context,
MSG_process_sleep(data->delay);
XBT_INFO("Sleeping done");
*remaining_time = *remaining_time - data->delay;
return 1;
return profile->return_code;
}
else
{
......@@ -313,7 +386,7 @@ int execute_profile(BatsimContext *context,
MSG_process_sleep(*remaining_time);
XBT_INFO("Walltime reached");
*remaining_time = 0;
return 0;
return -1;
}
}
else if (profile->type == ProfileType::SMPI)
......@@ -385,13 +458,31 @@ int execute_profile(BatsimContext *context,
}
MSG_sem_acquire(sem);
free(sem);
return 1;
return profile->return_code;
}
else
xbt_die("Cannot execute job %s: the profile type '%s' is unknown",
job->id.c_str(), job->profile.c_str());
return 0;
return 1;
}
int delay_job(double sleeptime,
double * remaining_time)
{
if (sleeptime < *remaining_time)
{
MSG_process_sleep(sleeptime);
*remaining_time = *remaining_time - sleeptime;
return 0;
}
else
{
XBT_INFO("Job has reached walltime");
MSG_process_sleep(*remaining_time);
*remaining_time = 0;
return -1;
}
}
int execute_job_process(int argc, char *argv[])
......@@ -423,11 +514,17 @@ int execute_job_process(int argc, char *argv[])
CleanExecuteProfileData * cleanup_data = new CleanExecuteProfileData;
cleanup_data->exec_process_args = args;
SIMIX_process_on_exit(MSG_process_self(), execute_profile_cleanup, cleanup_data);
if (execute_profile(args->context, job->profile, args->allocation, cleanup_data, &remaining_time) == 1)
job->return_code = execute_profile(args->context, job->profile, args->allocation, cleanup_data, &remaining_time);
if (job->return_code == 0)
{
XBT_INFO("Job %s finished in time", job->id.c_str());
XBT_INFO("Job %s finished in time (success)", job->id.c_str());
job->state = JobState::JOB_STATE_COMPLETED_SUCCESSFULLY;
}
else if (job->return_code > 0)
{
XBT_INFO("Job %s finished in time (failed)", job->id.c_str());
job->state = JobState::JOB_STATE_COMPLETED_FAILED;
}
else
{
XBT_INFO("Job %s had been killed (walltime %g reached)",
......@@ -556,7 +653,8 @@ int killer_process(int argc, char *argv[])
xbt_assert(job->state == JobState::JOB_STATE_RUNNING ||
job->state == JobState::JOB_STATE_COMPLETED_KILLED ||
job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY,
job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY ||
job->state == JobState::JOB_STATE_COMPLETED_FAILED,
"Bad kill: job %s is not running", job->id.c_str());
if (job->state == JobState::JOB_STATE_RUNNING)
......
......@@ -51,6 +51,15 @@ int execute_profile(BatsimContext *context,
CleanExecuteProfileData * cleanup_data,
double * remaining_time);
/**
* @brief Simulate the delay job profile (sleeping MSG process)
* @param[in] sleeptime The time to sleep
* @param[in,out] remaining_time The remaining amount of time before walltime
* @return 0 if enough time is available, -1 in the case of a timeout
*/
int delay_job(double sleeptime,
double * remaining_time);
/**
* @brief Is executed on execute_profile termination, allows to clean memory on kill
* @param[in] unknown An unknown argument (oops?)
......
......@@ -195,6 +195,24 @@ Profile::~Profile()
d = nullptr;
}
}
else if (type == ProfileType::SCHEDULER_SEND)
{
SchedulerSendProfileData * d = (SchedulerSendProfileData *) data;
if (d != nullptr)
{
delete d;
d = nullptr;
}
}
else if (type == ProfileType::SCHEDULER_RECV)
{
SchedulerRecvProfileData * d = (SchedulerRecvProfileData *) data;
if (d != nullptr)
{
delete d;
d = nullptr;
}
}
else
{
XBT_ERROR("Deletion of an unknown profile type (%d)", type);
......@@ -220,6 +238,13 @@ Profile *Profile::from_json(const std::string & profile_name,
error_prefix.c_str(), profile_name.c_str());
string profile_type = json_desc["type"].GetString();
int return_code = 0;
if (json_desc.HasMember("ret")) {
return_code = json_desc["ret"].GetInt();
}
profile->return_code = return_code;
if (profile_type == "delay")
{
profile->type = ProfileType::DELAY;
......@@ -311,11 +336,14 @@ Profile *Profile::from_json(const std::string & profile_name,
profile->type = ProfileType::SEQUENCE;
SequenceProfileData * data = new SequenceProfileData;
xbt_assert(json_desc.HasMember("nb"), "%s: profile '%s' has no 'nb' field",
error_prefix.c_str(), profile_name.c_str());
xbt_assert(json_desc["nb"].IsInt(), "%s: profile '%s' has a non-integral 'nb' field",
int repeat = 1;
if (json_desc.HasMember("nb")) {
xbt_assert(json_desc["nb"].IsInt(), "%s: profile '%s' has a non-integral 'nb' field",
error_prefix.c_str(), profile_name.c_str());
data->repeat = json_desc["nb"].GetInt();
repeat = json_desc["nb"].GetInt();
}
data->repeat = repeat;
xbt_assert(data->repeat > 0, "%s: profile '%s' has a non-strictly-positive 'nb' field (%d)",
error_prefix.c_str(), profile_name.c_str(), data->repeat);
......@@ -434,6 +462,47 @@ Profile *Profile::from_json(const std::string & profile_name,
error_prefix.c_str(), profile_name.c_str(), direction.c_str());
}
profile->data = data;
}
else if (profile_type == "send")
{
profile->type = ProfileType::SCHEDULER_SEND;
SchedulerSendProfileData * data = new SchedulerSendProfileData;
xbt_assert(json_desc.HasMember("msg"), "%s: profile '%s' has no 'msg' field",
error_prefix.c_str(), profile_name.c_str());
xbt_assert(json_desc["msg"].IsObject(), "%s: profile '%s' field 'msg' is no object",
error_prefix.c_str(), profile_name.c_str());
data->message.CopyFrom(json_desc["msg"], data->message.GetAllocator());
profile->data = data;
}
else if (profile_type == "recv")
{
profile->type = ProfileType::SCHEDULER_RECV;
SchedulerRecvProfileData * data = new SchedulerRecvProfileData;
data->regex = string(".*");
if (json_desc.HasMember("regex")) {
data->regex = json_desc["regex"].GetString();
}
data->on_success = string("");
if (json_desc.HasMember("success")) {
data->on_success = json_desc["success"].GetString();
}
data->on_failure = string("");
if (json_desc.HasMember("failure")) {
data->on_failure = json_desc["failure"].GetString();
}
data->on_timeout = string("");
if (json_desc.HasMember("timeout")) {
data->on_timeout = json_desc["timeout"].GetString();
}
profile->data = data;
}
else if (profile_type == "smpi")
......
......@@ -23,6 +23,8 @@ enum class ProfileType
,SEQUENCE //!< The profile is non-atomic: it is composed of a sequence of other profiles
,MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS //!< The profile is a homogeneous MSG for complex parallel filesystem access. Its data is of type MsgParallelHomogeneousPFSMultipleTiersProfileData
,MSG_DATA_STAGING //!< The profile is a MSG for moving data between the pfs hosts. Its data is of type DataStagingProfileData
,SCHEDULER_SEND //!< The profile is a profile simulating a message sent to the scheduler. Its data is of type SchedulerSendProfileData
,SCHEDULER_RECV //!< The profile receives a message from the scheduler and can execute a profile based on a value comparison of the message. Its data is of type SchedulerRecvProfileData
};
/**
......@@ -38,6 +40,7 @@ struct Profile
ProfileType type; //!< The type of the profile
void * data; //!< The associated data
std::string json_description; //!< The JSON description of the profile
int return_code = 0; //!< The return code of this profile's execution (SUCCESS == 0)
/**
* @brief Creates a new-allocated Profile from a JSON description
......@@ -164,6 +167,26 @@ struct MsgDataStagingProfileData
Direction direction; //!< Whether data should be transfered to the HPST or from the HPST
};
/**
* @brief The data associated to SCHEDULER_SEND profiles
*/
struct SchedulerSendProfileData
{
rapidjson::Document message; //!< The message being sent to the scheduler
};
/**
* @brief The data associated to SCHEDULER_RECV profiles
*/
struct SchedulerRecvProfileData
{
std::string regex; //!< The regex which is tested for matching
std::string on_success; //!< The profile to execute if it matches
std::string on_failure; //!< The profile to execute if it does not match
std::string on_timeout; //!< The profile to execute if no message is in the buffer (i.e. the scheduler has not answered in time). Can be omitted which will result that the job will wait until its walltime is reached.
};
/**
* @brief Used to handles all the profiles of one workload
*/
......
......@@ -212,6 +212,7 @@ void JsonProtocolWriter::append_job_completed(const string & job_id,
const string & job_status,
const string & job_state,
const string & kill_reason,
int return_code,
double date)
{
/* {
......@@ -235,6 +236,7 @@ void JsonProtocolWriter::append_job_completed(const string & job_id,
data.AddMember("job_id", Value().SetString(job_id.c_str(), _alloc), _alloc);
data.AddMember("status", Value().SetString(job_status.c_str(), _alloc), _alloc);
data.AddMember("job_state", Value().SetString(job_state.c_str(), _alloc), _alloc);
data.AddMember("return_code", Value().SetInt(return_code), _alloc);
data.AddMember("kill_reason", Value().SetString(kill_reason.c_str(), _alloc), _alloc);
Value event(rapidjson::kObjectType);
......@@ -274,6 +276,36 @@ void JsonProtocolWriter::append_job_killed(const vector<string> & job_ids,
_events.PushBack(event, _alloc);
}
void JsonProtocolWriter::append_from_job_message(const string & job_id,
const Document & message,
double date)
{
/* {
"timestamp": 10.0,
"type": "FROM_JOB_MSG",
"data": {
"job_id": "w0!1",
"msg": "some_message"
}
} */
xbt_assert(date >= _last_date, "Date inconsistency");
_last_date = date;
_is_empty = false;
Value data(rapidjson::kObjectType);
data.AddMember("job_id",
Value().SetString(job_id.c_str(), _alloc), _alloc);
data.AddMember("msg", Value().CopyFrom(message, _alloc), _alloc);
Value event(rapidjson::kObjectType);
event.AddMember("timestamp", Value().SetDouble(date), _alloc);
event.AddMember("type", Value().SetString("FROM_JOB_MSG"), _alloc);
event.AddMember("data", data, _alloc);
_events.PushBack(event, _alloc);
}
void JsonProtocolWriter::append_resource_state_changed(const MachineRange & resources,
const string & new_state,
double date)
......@@ -364,6 +396,7 @@ JsonProtocolReader::JsonProtocolReader(BatsimContext *context) :
_type_to_handler_map["SUBMIT_JOB"] = &JsonProtocolReader::handle_submit_job;
_type_to_handler_map["SET_RESOURCE_STATE"] = &JsonProtocolReader::handle_set_resource_state;
_type_to_handler_map["NOTIFY"] = &JsonProtocolReader::handle_notify;
_type_to_handler_map["TO_JOB_MSG"] = &JsonProtocolReader::handle_to_job_msg;
}
JsonProtocolReader::~JsonProtocolReader()
......@@ -812,6 +845,47 @@ void JsonProtocolReader::handle_notify(int event_number,
(void) timestamp;
}
void JsonProtocolReader::handle_to_job_msg(int event_number,
double timestamp,
const Value &data_object)
{
(void) event_number; // Avoids a warning if assertions are ignored
/* {
"timestamp": 42.0,
"type": "TO_JOB_MSG",
"data": {
"job_id": "w!0",
"msg": "Some answer"
}
} */
xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (TO_JOB_MSG) should be an object", event_number);
xbt_assert(data_object.HasMember("job_id"), "Invalid JSON message: the 'data' value of event %d (TO_JOB_MSG) should have a 'job_id' key", event_number);
const Value & job_id_value = data_object["job_id"];
xbt_assert(job_id_value.IsString(), "Invalid JSON message: in event %d (TO_JOB_MSG): ['data']['job_id'] should be a string", event_number);
string job_id = job_id_value.GetString();
xbt_assert(data_object.HasMember("msg"), "Invalid JSON msg: the 'data' value of event %d (TO_JOB_MSG) should have a 'msg' key", event_number);
const Value & msg_value = data_object["msg"];
xbt_assert(msg_value.IsString(), "Invalid JSON msg: in event %d (TO_JOB_MSG): ['data']['msg'] should be a string", event_number);
string msg = msg_value.GetString();
ToJobMessage * message = new ToJobMessage;
if (!identify_job_from_string(context, job_id, message->job_id))
{
xbt_assert(false, "Invalid JSON message: "
"Invalid job change job state received: The job identifier '%s' is not valid. "
"Job identifiers must be of the form [WORKLOAD_NAME!]JOB_ID. "
"If WORKLOAD_NAME! is omitted, WORKLOAD_NAME='static' is used. "
"Furthermore, the corresponding job must exist.", job_id.c_str());
}
message->message = msg;
send_message(timestamp, "server", IPMessageType::TO_JOB_MSG, (void *) message);
}
void JsonProtocolReader::handle_submit_job(int event_number,
double timestamp,
const Value &data_object)
......
......@@ -113,6 +113,7 @@ public:
const std::string & job_status,
const std::string & job_state,
const std::string & kill_reason,
int return_code,
double date) = 0;