Commit 5fb6c80c authored by Millian Poquet's avatar Millian Poquet
Browse files

Merge branch 'data_storage' of gitlab.inria.fr:batsim/batsim into data_storage

parents f5c628f1 777fc079
......@@ -57,6 +57,8 @@ the job ID of the job which just completed. This part is not mandatory, it depen
| 3+ | Z | Batsim->Sched | No content | Batsim tells the scheduler that the simulation is about to end (all jobs have been submitted and completed/rejected)
| 3+ | F | Batsim->Sched | MID1,MID2,MIDn | Batsim tells the scheduler that the given machines are in a failure state (crashed, no jobs can be computed on them). Each MIDk part can be a single machine ID or a closed interval MIDa-MIDb where MIDa <= MIDb
| 3+ | f | Batsim->Sched | MID1,MID2,MIDn | Batsim tells the scheduler that the given machines are no longer in a failure state (jobs can now be computed on them). Each MIDk part can be a single machine ID or a closed interval MIDa-MIDb where MIDa <= MIDb
| 4+ | Q | Batsim->Sched | SUB,REQ,TIME | Batsim queries the scheduler about potential waiting time for requested number of processors, for a given walltime. SUB is the submitter.
| 4+ | W | Sched->Batsim | SUB,REQ,TIME,WAIT | Scheduler notifies Batsim about potential waiting time for requested number of processors and walltime. SUB is the submitter.
# Message Examples #
......
......@@ -84,6 +84,12 @@ std::string ipMessageTypeToString(IPMessageType type)
case IPMessageType::SCHED_TELL_ME_ENERGY:
s = "SCHED_TELL_ME_ENERGY";
break;
case IPMessageType::SCHED_WAIT_ANSWER:
s = "SCHED_WAIT_ANSWER";
break;
case IPMessageType::WAIT_QUERY:
s = "WAIT_QUERY";
break;
case IPMessageType::SCHED_READY:
s = "SCHED_READY";
break;
......@@ -163,6 +169,16 @@ IPMessage::~IPMessage()
case IPMessageType::SCHED_TELL_ME_ENERGY:
{
} break;
case IPMessageType::WAIT_QUERY:
{
WaitQueryMessage * msg = (WaitQueryMessage *) data;
delete msg;
} break;
case IPMessageType::SCHED_WAIT_ANSWER:
{
SchedWaitAnswerMessage * msg = (SchedWaitAnswerMessage *) data;
delete msg;
} break;
case IPMessageType::SCHED_READY:
{
} break;
......
......@@ -58,6 +58,8 @@ enum class IPMessageType
,SCHED_NOP //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a NOP message).
,SCHED_NOP_ME_LATER //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a NOP_ME_LATTER message).
,SCHED_TELL_ME_ENERGY //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a TELL_ME_CONSUMED_ENERGY message).
,SCHED_WAIT_ANSWER //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a WAIT_ANSWER message).
,WAIT_QUERY //!< Server -> SchedulerHandler. The scheduler handler tells the server a scheduling event occured (a WAIT_ANSWER message).
,SCHED_READY //!< SchedulerHandler -> Server. The scheduler handler tells the server that the scheduler is ready (messages can be sent to it).
,WAITING_DONE //!< Waiter -> server. The waiter tells the server that the target time has been reached.
,SUBMITTER_HELLO //!< Submitter -> Server. The submitter tells it starts submitting to the server.
......@@ -153,6 +155,27 @@ struct NOPMeLaterMessage
double target_time; //!< The time at which Batsim should send a NOP message to the decision real process
};
/**
* @brief The content of the WaitQuery message
*/
struct WaitQueryMessage
{
std::string submitter_name; //!< The name of the submitter which submitted the job.
int nb_resources; //!< The number of resources for which we would like to know the waiting time
double processing_time; //!< The duration for which the resources would be used
};
/**
* @brief The content of the SchedWaitAnswer message
*/
struct SchedWaitAnswerMessage
{
std::string submitter_name; //!< The name of the submitter which submitted the job.
int nb_resources; //!< The number of resources for which we would like to know the waiting time
double processing_time; //!< The duration for which the resources would be used
double expected_time; //!< The expected waiting time supplied by the scheduler
};
/**
* @brief The content of the SwitchON message
*/
......
......@@ -135,6 +135,7 @@ int static_job_submitter_process(int argc, char *argv[])
static string submit_workflow_task_as_job(BatsimContext *context, string workflow_name, string submitter_name, Task *task);
static string wait_for_job_completion(string submitter_name);
static std::tuple<int,double,double> wait_for_query_answer(string submitter_name);
/* Ugly Global */
std::map<std::string, int> task_id_counters;
......@@ -159,7 +160,7 @@ int workflow_submitter_process(int argc, char *argv[])
const string submitter_name = args->workflow_name + "_submitter";
XBT_INFO("New Workflow submitter for workflow %s (start time = %lf)!",
args->workflow_name.c_str(),workflow->start_time);
args->workflow_name.c_str(),workflow->start_time);
/* Initializing my task_id counter */
task_id_counters[workflow->name] = 0;
......@@ -178,7 +179,7 @@ int workflow_submitter_process(int argc, char *argv[])
/* Wait until the workflow start-time */
if (workflow->start_time > MSG_get_clock()) {
XBT_INFO("Warning: already past workflow start time! (%lf)", workflow->start_time);
XBT_INFO("Warning: already past workflow start time! (%lf)", workflow->start_time);
}
MSG_process_sleep(MAX(0.0, workflow->start_time - MSG_get_clock()));
......@@ -199,14 +200,14 @@ int workflow_submitter_process(int argc, char *argv[])
/* Insert the task into the submitted_tasks map */
submitted_tasks[job_key] = task;
current_nb++;
current_nb++;
}
if(!submitted_tasks.empty()) /* we are done submitting tasks, wait for one to complete */
{
/* Wait for callback */
string completed_job_key = wait_for_job_completion(submitter_name);
current_nb--;
current_nb--;
//XBT_INFO("TASK # %s has completed!!!\n", completed_job_key.c_str());
......@@ -238,7 +239,7 @@ int workflow_submitter_process(int argc, char *argv[])
XBT_INFO("WORKFLOW_MAKESPAN %s %lf (depth = %d)\n", workflow->filename.c_str(), makespan, workflow->get_maximum_depth());
// This is a TERRIBLE exit, but the goal is to stop the simulation (don't keep simulated the workload beyond
// the worflow completion). This is much more brutal than the -k option. To be removed/commented-out later, but right
// now it saves a lot of time, and was obviously easy to implement.
// now it saves a lot of time, and was obviously easy to implement.
exit(0);
/* Goodbye */
......@@ -305,6 +306,19 @@ static string submit_workflow_task_as_job(BatsimContext *context, string workflo
msg->job_id.job_number = job_number;
send_message("server", IPMessageType::JOB_SUBMITTED, (void*)msg);
// HOWTO Test Wait Query
// WaitQueryMessage * message = new WaitQueryMessage;
// message->submitter_name = submitter_name;
// message->nb_resources = task->num_procs;
// message->processing_time = walltime;
// send_message("server", IPMessageType::WAIT_QUERY, (void*)message);
// HOWTO Test Answer
// std::tuple<int,double,double> answer;
// answer = wait_for_query_answer(submitter_name);
// XBT_INFO("Got my answer : %f", std::get<2>(answer));
(void)wait_for_query_answer; // Horrible hack to silence "unused" warning.
// Create an ID to return
string id_to_return = workload_name + "!" + std::to_string(job_number);
......@@ -329,9 +343,28 @@ static string wait_for_job_completion(string submitter_name) {
return notification_data->job_id.workload_name + "!" +
std::to_string(notification_data->job_id.job_number);
}
/**
* @brief TODO
* @param submitter_name TODO
* @return TODO
*/
static std::tuple<int,double,double> wait_for_query_answer(string submitter_name) {
msg_task_t task_notification = NULL;
IPMessage *task_notification_data;
MSG_task_receive(&(task_notification), submitter_name.c_str());
task_notification_data = (IPMessage *) MSG_task_get_data(task_notification);
SchedWaitAnswerMessage *res =
(SchedWaitAnswerMessage *) task_notification_data->data;
XBT_INFO("Returning : %d %f %f", res->nb_resources, res->processing_time, res->expected_time);
return {res->nb_resources, res->processing_time, res->expected_time};
}
int job_launcher_process(int argc, char *argv[])
{
(void) argc;
......
......@@ -459,6 +459,31 @@ int request_reply_scheduler_process(int argc, char *argv[])
send_message("server", IPMessageType::SCHED_TELL_ME_ENERGY);
} break; // End of case received_stamp == TELL_ME_CONSUMED_ENERGY
case ANSWER_WAIT:
{
xbt_assert(parts2.size() == 3, "Invalid event received ('%s'): messages to ask the waiting time must be composed of 3 parts separated by ':'",
event_string.c_str());
SchedWaitAnswerMessage * message = new SchedWaitAnswerMessage;
vector<string> parts3;
boost::split(parts3, parts2[2], boost::is_any_of(","), boost::token_compress_on);
// xbt_assert(parts3.size() == 3, "Invalid event received ('%s'): invalid wait answer message content ('%s'): it must be"
// " formated like R,T,W where R is the number of requested resource, T the requested walltime and W the waiting time",
// event_string.c_str(), parts2[2].c_str());
int nb_resources = std::stoi(parts3[1]);
double processing_time = std::stod(parts3[2]);
double expected_time = std::stod(parts3[3]);
message->submitter_name = parts3[0];
message->nb_resources = nb_resources;
message->processing_time = processing_time;
message->expected_time = expected_time;
send_message("server", IPMessageType::SCHED_WAIT_ANSWER, (void*) message);
} break; // End of case received_stamp == ANSWER_WAIT
default:
{
xbt_die("Invalid event received ('%s') : unhandled network stamp received ('%c')", event_string.c_str(), received_stamp);
......
......@@ -24,7 +24,9 @@ enum NetworkStamp : char
,NOP_ME_LATER = 'n' //!< Decision -> Batsim. The Decision real process wants to be awaken at a future simulation time
,TELL_ME_CONSUMED_ENERGY = 'E' //!< Decision -> Batsim. The Decision real process wants to know how much energy has been consumed on computing machines since the beginning of the simulation
,PSTATE_HAS_BEEN_SET = 'p' //!< Batsim -> Decision. Batsim acknowledges that the power state of one of several machines has been changed
,QUERY_WAIT = 'Q' //!< Batsim -> Decision. Batsim queries the Decision process on the waiting time to get a given amount of resources
,CONSUMED_ENERGY = 'e' //!< Batsim -> Decision. Batsim tells the Decision process how much energy has been used since the beginning of the simulation
,ANSWER_WAIT = 'W' //!< Decision -> Batsim. The Decision process answers to Batsim on the waiting time to get a given amount of resources
};
/**
......
......@@ -50,9 +50,12 @@ int uds_server_process(int argc, char *argv[])
map<string, Submitter*> submitters;
// Let's store the origin or some jobs
// Let's store the origin of some jobs
map<JobIdentifier, Submitter*> origin_of_jobs;
// Let's store the origin of wait queries
map<std::pair<int,double>, Submitter*> origin_of_wait_queries;
string send_buffer;
// Let's tell the Decision process that the simulation is about to begin (and that some data can be read from the data storage)
......@@ -430,6 +433,36 @@ int uds_server_process(int argc, char *argv[])
} break; // end of case SCHED_READY
case IPMessageType::SCHED_WAIT_ANSWER:
{
SchedWaitAnswerMessage * message = new SchedWaitAnswerMessage;
*message = *( (SchedWaitAnswerMessage *) task_data->data);
// Submitter * submitter = origin_of_wait_queries.at({message->nb_resources,message->processing_time});
dsend_message(message->submitter_name, IPMessageType::SCHED_WAIT_ANSWER, (void*) message);
// origin_of_wait_queries.erase({message->nb_resources,message->processing_time});
} break; // end of case SCHED_WAIT_ANSWER
case IPMessageType::WAIT_QUERY:
{
WaitQueryMessage * message = (WaitQueryMessage *) task_data->data;
// XBT_INFO("received : %s , %s\n", to_string(message->nb_resources).c_str(), to_string(message->processing_time).c_str());
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":Q:"
+ message->submitter_name.c_str() + ","
+ to_string(message->nb_resources).c_str() + ","
+ boost::lexical_cast<string>(message->processing_time).c_str();
// XBT_INFO("INFO!!! Message to send to scheduler : '%s'", send_buffer.c_str());
//Submitter * submitter = submitters.at(message->submitter_name);
//origin_of_wait_queries[{message->nb_resources,message->processing_time}] = submitter;
} break; // end of case WAIT_QUERY
case IPMessageType::SWITCHED_ON:
{
xbt_assert(task_data->data != nullptr);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment