Commit f05a526a authored by Millian Poquet
Browse files

[code] server cleanup: data in struct

parent 3d70beea
......@@ -28,50 +28,33 @@ int server_process(int argc, char *argv[])
ServerProcessArguments * args = (ServerProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context;
int nb_completed_jobs = 0;
int nb_submitted_jobs = 0;
int nb_submitters = 0;
int nb_submitters_finished = 0;
int nb_workflow_submitters_finished = 0;
int nb_running_jobs = 0;
int nb_switching_machines = 0;
int nb_waiters = 0;
int nb_killers = 0;
bool sched_ready = true;
bool all_jobs_submitted_and_completed = false;
bool end_of_simulation_sent = false;
// Let's store some information about the submitters
struct Submitter
{
string mailbox;
bool should_be_called_back;
};
map<string, Submitter*> submitters;
// Let's store the origin of some jobs
map<JobIdentifier, Submitter*> origin_of_jobs;
// Let's store the origin of wait queries
map<std::pair<int,double>, Submitter*> origin_of_wait_queries;
ServerData * data = new ServerData;
data->context = context;
// Let's tell the Decision process that the simulation is about to begin (and that some data can be read from the data storage)
context->proto_writer->append_simulation_begins(context->machines.nb_machines(), context->config_file, MSG_get_clock());
// Let's tell the Decision process that the simulation is about to begin
// (and that some data can be read from the data storage)
context->proto_writer->append_simulation_begins(context->machines.nb_machines(),
context->config_file, MSG_get_clock());
RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
req_rep_args->context = context;
req_rep_args->send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
context->proto_writer->clear();
MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
sched_ready = false;
MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process,
(void*)req_rep_args, MSG_host_self());
data->sched_ready = false;
// Simulation loop
while ((nb_submitters == 0) || (nb_submitters_finished < nb_submitters) ||
(nb_completed_jobs < nb_submitted_jobs) || (!sched_ready) ||
(nb_switching_machines > 0) || (nb_waiters > 0) || (nb_killers > 0) ||
(context->submission_sched_enabled && !context->submission_sched_finished))
while ((data->nb_submitters == 0) ||// To enter the loop
(data->nb_submitters_finished < data->nb_submitters) || // All submitters must have finished
(data->nb_completed_jobs < data->nb_submitted_jobs) || // All jobs must have finished
(!data->sched_ready) || // A scheduler answer is being injected into the simulation
(data->nb_switching_machines > 0) || // Some machines are switching state
(data->nb_waiters > 0) || // The scheduler requested to be called in the future
(data->nb_killers > 0) || // Some jobs are being killed
(context->submission_sched_enabled && // If dynamic job submission are enabled
!context->submission_sched_finished)) // The end of submissions must have been received
{
// Let's wait for a message from a node or from the request-reply process...
msg_task_t task_received = NULL;
......@@ -86,23 +69,24 @@ int server_process(int argc, char *argv[])
case IPMessageType::SUBMITTER_HELLO:
{
xbt_assert(task_data->data != nullptr);
xbt_assert(!all_jobs_submitted_and_completed,
xbt_assert(!data->all_jobs_submitted_and_completed,
"A new submitter said hello but all jobs have already been submitted and completed... Aborting.");
SubmitterHelloMessage * message = (SubmitterHelloMessage *) task_data->data;
xbt_assert(submitters.count(message->submitter_name) == 0,
xbt_assert(data->submitters.count(message->submitter_name) == 0,
"Invalid new submitter '%s': a submitter with the same name already exists!",
message->submitter_name.c_str());
nb_submitters++;
data->nb_submitters++;
Submitter * submitter = new Submitter;
ServerData::Submitter * submitter = new ServerData::Submitter;
submitter->mailbox = message->submitter_name;
submitter->should_be_called_back = message->enable_callback_on_job_completion;
submitters[message->submitter_name] = submitter;
data->submitters[message->submitter_name] = submitter;
XBT_INFO("New submitter said hello. Number of polite submitters: %d", nb_submitters);
XBT_INFO("New submitter said hello. Number of polite submitters: %d",
data->nb_submitters);
} break; // end of case SUBMITTER_HELLO
......@@ -111,21 +95,22 @@ int server_process(int argc, char *argv[])
xbt_assert(task_data->data != nullptr);
SubmitterByeMessage * message = (SubmitterByeMessage *) task_data->data;
xbt_assert(submitters.count(message->submitter_name) == 1);
delete submitters[message->submitter_name];
submitters.erase(message->submitter_name);
xbt_assert(data->submitters.count(message->submitter_name) == 1);
delete data->submitters[message->submitter_name];
data->submitters.erase(message->submitter_name);
nb_submitters_finished++;
data->nb_submitters_finished++;
if (message->is_workflow_submitter)
nb_workflow_submitters_finished++;
XBT_INFO("A submitted said goodbye. Number of finished submitters: %d", nb_submitters_finished);
data->nb_workflow_submitters_finished++;
XBT_INFO("A submitted said goodbye. Number of finished submitters: %d",
data->nb_submitters_finished);
if (!all_jobs_submitted_and_completed &&
nb_completed_jobs == nb_submitted_jobs &&
nb_submitters_finished == nb_submitters &&
if (!data->all_jobs_submitted_and_completed &&
data->nb_completed_jobs == data->nb_submitted_jobs &&
data->nb_submitters_finished == data->nb_submitters &&
(!context->submission_sched_enabled || context->submission_sched_finished))
{
all_jobs_submitted_and_completed = true;
data->all_jobs_submitted_and_completed = true;
XBT_INFO("It seems that all jobs have been submitted and completed!");
context->proto_writer->append_simulation_ends(MSG_get_clock());
......@@ -138,26 +123,26 @@ int server_process(int argc, char *argv[])
xbt_assert(task_data->data != nullptr);
JobCompletedMessage * message = (JobCompletedMessage *) task_data->data;
if (origin_of_jobs.count(message->job_id) == 1)
if (data->origin_of_jobs.count(message->job_id) == 1)
{
// Let's call back the submitter that submitted the job
SubmitterJobCompletionCallbackMessage * msg = new SubmitterJobCompletionCallbackMessage;
msg->job_id = message->job_id;
Submitter * submitter = origin_of_jobs.at(message->job_id);
ServerData::Submitter * submitter = data->origin_of_jobs.at(message->job_id);
dsend_message(submitter->mailbox, IPMessageType::SUBMITTER_CALLBACK, (void*) msg);
origin_of_jobs.erase(message->job_id);
data->origin_of_jobs.erase(message->job_id);
}
nb_running_jobs--;
xbt_assert(nb_running_jobs >= 0);
nb_completed_jobs++;
xbt_assert(nb_completed_jobs + nb_running_jobs <= nb_submitted_jobs);
data->nb_running_jobs--;
xbt_assert(data->nb_running_jobs >= 0);
data->nb_completed_jobs++;
xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
Job * job = context->workloads.job_at(message->job_id);
XBT_INFO("Job %s!%d COMPLETED. %d jobs completed so far",
job->workload->name.c_str(), job->number, nb_completed_jobs);
job->workload->name.c_str(), job->number, data->nb_completed_jobs);
string status = "UNKNOWN";
if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY)
......@@ -167,12 +152,12 @@ int server_process(int argc, char *argv[])
context->proto_writer->append_job_completed(message->job_id.to_string(), status, MSG_get_clock());
if (!all_jobs_submitted_and_completed &&
nb_completed_jobs == nb_submitted_jobs &&
nb_submitters_finished == nb_submitters &&
if (!data->all_jobs_submitted_and_completed &&
data->nb_completed_jobs == data->nb_submitted_jobs &&
data->nb_submitters_finished == data->nb_submitters &&
(!context->submission_sched_enabled || context->submission_sched_finished))
{
all_jobs_submitted_and_completed = true;
data->all_jobs_submitted_and_completed = true;
XBT_INFO("It seems that all jobs have been submitted and completed!");
context->proto_writer->append_simulation_ends(MSG_get_clock());
......@@ -183,7 +168,7 @@ int server_process(int argc, char *argv[])
{
// Ignore all submissions if -k was specified and all workflows have completed
if ((context->workflows.size() != 0) && (context->terminate_with_last_workflow) &&
(nb_workflow_submitters_finished == context->workflows.size()))
(data->nb_workflow_submitters_finished == context->workflows.size()))
{
XBT_INFO("Ignoring Job due to -k command-line option");
break;
......@@ -192,13 +177,13 @@ int server_process(int argc, char *argv[])
xbt_assert(task_data->data != nullptr);
JobSubmittedMessage * message = (JobSubmittedMessage *) task_data->data;
xbt_assert(submitters.count(message->submitter_name) == 1);
xbt_assert(data->submitters.count(message->submitter_name) == 1);
Submitter * submitter = submitters.at(message->submitter_name);
ServerData::Submitter * submitter = data->submitters.at(message->submitter_name);
if (submitter->should_be_called_back)
{
xbt_assert(origin_of_jobs.count(message->job_id) == 0);
origin_of_jobs[message->job_id] = submitter;
xbt_assert(data->origin_of_jobs.count(message->job_id) == 0);
data->origin_of_jobs[message->job_id] = submitter;
}
// Let's retrieve the Job from memory (or add it into memory if it is dynamic)
......@@ -209,9 +194,9 @@ int server_process(int argc, char *argv[])
// Update control information
job->state = JobState::JOB_STATE_SUBMITTED;
++nb_submitted_jobs;
++data->nb_submitted_jobs;
XBT_INFO("Job %s SUBMITTED. %d jobs submitted so far",
message->job_id.to_string().c_str(), nb_submitted_jobs);
message->job_id.to_string().c_str(), data->nb_submitted_jobs);
string job_json_description, profile_json_description;
......@@ -294,7 +279,7 @@ int server_process(int argc, char *argv[])
job->state = JobState::JOB_STATE_SUBMITTED;
// Let's update global states
++nb_submitted_jobs;
++data->nb_submitted_jobs;
if (context->submission_sched_ack)
{
......@@ -320,7 +305,7 @@ int server_process(int argc, char *argv[])
Job * job = context->workloads.job_at(message->job_id);
job->state = JobState::JOB_STATE_REJECTED;
nb_completed_jobs++;
data->nb_completed_jobs++;
XBT_INFO("Job %d (workload=%s) has been rejected",
job->number, job->workload->name.c_str());
......@@ -347,7 +332,7 @@ int server_process(int argc, char *argv[])
}
MSG_process_create("killer_process", killer_process, (void *) args, MSG_host_self());
++nb_killers;
++data->nb_killers;
} break; // end of case SCHED_KILL_JOB
case IPMessageType::SCHED_CALL_ME_LATER:
......@@ -355,14 +340,17 @@ int server_process(int argc, char *argv[])
xbt_assert(task_data->data != nullptr);
CallMeLaterMessage * message = (CallMeLaterMessage *) task_data->data;
xbt_assert(message->target_time > MSG_get_clock(), "You asked to be awaken in the past! (you ask: %f, it is: %f)", message->target_time, MSG_get_clock());
xbt_assert(message->target_time > MSG_get_clock(),
"You asked to be awaken in the past! (you ask: %f, it is: %f)",
message->target_time, MSG_get_clock());
WaiterProcessArguments * args = new WaiterProcessArguments;
args->target_time = message->target_time;
string pname = "waiter " + to_string(message->target_time);
MSG_process_create(pname.c_str(), waiter_process, (void*) args, context->machines.master_machine()->host);
++nb_waiters;
MSG_process_create(pname.c_str(), waiter_process, (void*) args,
context->machines.master_machine()->host);
++data->nb_waiters;
} break; // end of case SCHED_CALL_ME_LATER
case IPMessageType::PSTATE_MODIFICATION:
......@@ -427,7 +415,7 @@ int server_process(int argc, char *argv[])
string pname = "switch ON " + to_string(machine_id);
MSG_process_create(pname.c_str(), switch_off_machine_process, (void*)args, machine->host);
++nb_switching_machines;
++data->nb_switching_machines;
}
else
XBT_ERROR("Switching from a communication pstate to an invalid pstate on machine %d ('%s') : %d -> %d",
......@@ -448,7 +436,7 @@ int server_process(int argc, char *argv[])
string pname = "switch OFF " + to_string(machine_id);
MSG_process_create(pname.c_str(), switch_on_machine_process, (void*)args, machine->host);
++nb_switching_machines;
++data->nb_switching_machines;
}
else
XBT_ERROR("Machine %d ('%s') has an invalid pstate : %d", machine->id, machine->name.c_str(), curr_pstate);
......@@ -476,8 +464,8 @@ int server_process(int argc, char *argv[])
job->state = JobState::JOB_STATE_RUNNING;
nb_running_jobs++;
xbt_assert(nb_running_jobs <= nb_submitted_jobs);
data->nb_running_jobs++;
xbt_assert(data->nb_running_jobs <= data->nb_submitted_jobs);
if (!context->allow_time_sharing)
{
......@@ -536,12 +524,12 @@ int server_process(int argc, char *argv[])
case IPMessageType::WAITING_DONE:
{
context->proto_writer->append_requested_call(MSG_get_clock());
--nb_waiters;
--data->nb_waiters;
} break; // end of case WAITING_DONE
case IPMessageType::SCHED_READY:
{
sched_ready = true;
data->sched_ready = true;
} break; // end of case SCHED_READY
case IPMessageType::SCHED_WAIT_ANSWER:
......@@ -587,7 +575,7 @@ int server_process(int argc, char *argv[])
MSG_get_clock());
}
--nb_switching_machines;
--data->nb_switching_machines;
} break; // end of case SWITCHED_ON
case IPMessageType::SWITCHED_OFF:
......@@ -612,7 +600,7 @@ int server_process(int argc, char *argv[])
MSG_get_clock());
}
--nb_switching_machines;
--data->nb_switching_machines;
} break; // end of case SWITCHED_ON
case IPMessageType::SCHED_TELL_ME_ENERGY:
......@@ -643,10 +631,10 @@ int server_process(int argc, char *argv[])
if (job->state == JobState::JOB_STATE_COMPLETED_KILLED &&
job->kill_reason == "Killed from killer_process (probably requested by the decision process)")
{
nb_running_jobs--;
xbt_assert(nb_running_jobs >= 0);
nb_completed_jobs++;
xbt_assert(nb_completed_jobs + nb_running_jobs <= nb_submitted_jobs);
data->nb_running_jobs--;
xbt_assert(data->nb_running_jobs >= 0);
data->nb_completed_jobs++;
xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
really_killed_job_ids_str.push_back(job_id.to_string());
}
......@@ -657,14 +645,14 @@ int server_process(int argc, char *argv[])
boost::algorithm::join(really_killed_job_ids_str, ",").c_str());
context->proto_writer->append_job_killed(job_ids_str, MSG_get_clock());
--nb_killers;
--data->nb_killers;
if (!all_jobs_submitted_and_completed &&
nb_completed_jobs == nb_submitted_jobs &&
nb_submitters_finished == nb_submitters &&
if (!data->all_jobs_submitted_and_completed &&
data->nb_completed_jobs == data->nb_submitted_jobs &&
data->nb_submitters_finished == data->nb_submitters &&
(!context->submission_sched_enabled || context->submission_sched_finished))
{
all_jobs_submitted_and_completed = true;
data->all_jobs_submitted_and_completed = true;
XBT_INFO("It seems that all jobs have been submitted and completed!");
context->proto_writer->append_simulation_ends(MSG_get_clock());
......@@ -679,7 +667,9 @@ int server_process(int argc, char *argv[])
delete task_data;
MSG_task_destroy(task_received);
if (sched_ready && !end_of_simulation_sent && !context->proto_writer->is_empty())
if (data->sched_ready &&
!data->end_of_simulation_sent &&
!context->proto_writer->is_empty())
{
RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
req_rep_args->context = context;
......@@ -687,18 +677,19 @@ int server_process(int argc, char *argv[])
context->proto_writer->clear();
MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
sched_ready = false;
if (all_jobs_submitted_and_completed)
end_of_simulation_sent = true;
data->sched_ready = false;
if (data->all_jobs_submitted_and_completed)
data->end_of_simulation_sent = true;
}
} // end of while
XBT_INFO("Simulation is finished!");
bool simulation_is_completed = all_jobs_submitted_and_completed;
bool simulation_is_completed = data->all_jobs_submitted_and_completed;
(void) simulation_is_completed; // Avoids a warning if assertions are ignored
xbt_assert(simulation_is_completed, "Left simulation loop, but the simulation does NOT seem finished...");
delete data;
delete args;
return 0;
}
......@@ -5,6 +5,47 @@
#pragma once
#include <string>
#include <map>
#include "ipp.hpp"
struct BatsimContext;
/**
 * @brief Data associated with the server_process process
 * @details Groups the counters, flags and lookup tables that server_process
 *          reads and updates while it handles incoming messages.
 *          The struct has no destructor: the Submitter objects referenced by
 *          its maps are allocated and freed by server_process itself
 *          (created on SUBMITTER_HELLO, deleted on SUBMITTER_BYE).
 */
struct ServerData
{
    /**
     * @brief Data associated with the job submitters (used for callbacks)
     */
    struct Submitter
    {
        std::string mailbox; //!< The Submitter mailbox
        // Initialized like every other scalar of ServerData, so that a freshly
        // allocated Submitter never carries an indeterminate flag.
        bool should_be_called_back = false; //!< Whether the submitter should be notified on events.
    };

    BatsimContext * context = nullptr; //!< The BatsimContext

    int nb_completed_jobs = 0; //!< The number of completed jobs
    int nb_submitted_jobs = 0; //!< The number of submitted jobs
    int nb_submitters = 0; //!< The number of submitters
    int nb_submitters_finished = 0; //!< The number of finished submitters
    int nb_workflow_submitters_finished = 0; //!< The number of finished workflow submitters
    int nb_running_jobs = 0; //!< The number of jobs being executed
    int nb_switching_machines = 0; //!< The number of machines being switched
    int nb_waiters = 0; //!< The number of pending CALL_ME_LATER waiters
    int nb_killers = 0; //!< The number of killers

    bool sched_ready = true; //!< Whether the scheduler can be called now
    bool all_jobs_submitted_and_completed = false; //!< Whether all jobs (static and dynamic) have been submitted and completed.
    bool end_of_simulation_sent = false; //!< Whether the SIMULATION_ENDS event has been sent to the scheduler.

    std::map<std::string, Submitter*> submitters; //!< The submitters, indexed by submitter name
    // The pointers stored below are the ones held by 'submitters'.
    // Entries are erased by server_process once the completion callback has been sent.
    std::map<JobIdentifier, Submitter*> origin_of_jobs; //!< Stores whether a Submitter must be notified on job completion

    // NOTE(review): disabled member kept from the pre-refactoring code; nothing
    // in the visible server code references it -- confirm before removing it.
    //map<std::pair<int,double>, Submitter*> origin_of_wait_queries;
};
/**
* @brief Process used to orchestrate the simulation
* @param[in] argc The number of arguments
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment