Commit ff912a0d authored by MERCIER Michael's avatar MERCIER Michael
Browse files

Big refactor for Jobs + BatTask

convert jobs id in JobIdentifier
make BatTask own the profile
parent c1f846b0
......@@ -492,7 +492,7 @@ void PajeTracer::add_job_launching(const Job * job,
void PajeTracer::register_new_job(const Job *job)
{
xbt_assert(_jobs.find(job) == _jobs.end(),
"Cannot register new job %s: it already exists", job->id.c_str());
"Cannot register new job %s: it already exists", job->id.to_string().c_str());
const int buf_size = 256;
int nb_printed;
......@@ -503,14 +503,14 @@ void PajeTracer::register_new_job(const Job *job)
// Let's create a state value corresponding to this job
nb_printed = snprintf(buf, buf_size,
"%d %s%s %s \"%s\" %s\n",
DEFINE_ENTITY_VALUE, jobPrefix, job->id.c_str(), machineState,
job->id.c_str(), _colors[job->number % (int)_colors.size()].c_str());
DEFINE_ENTITY_VALUE, jobPrefix, job->id.to_string().c_str(), machineState,
job->id.to_string().c_str(), _colors[job->number % (int)_colors.size()].c_str());
xbt_assert(nb_printed < buf_size - 1,
"Writing error: buffer has been completely filled, some information might "
"have been lost. Please increase Batsim's output temporary buffers' size");
_wbuf->append_text(buf);
_jobs[job] = jobPrefix + job->id;
_jobs[job] = jobPrefix + job->id.to_string();
free(buf);
}
......@@ -575,7 +575,7 @@ void PajeTracer::add_job_kill(const Job *job, const MachineRange & used_machine_
// Let's add a kill event associated with the scheduler
nb_printed = snprintf(buf, buf_size,
"%d %lf %s %s \"%s\"\n",
NEW_EVENT, time, killEventKiller, killer, job->id.c_str());
NEW_EVENT, time, killEventKiller, killer, job->id.to_string().c_str());
xbt_assert(nb_printed < buf_size - 1,
"Writing error: buffer has been completely filled, some information might "
"have been lost. Please increase Batsim's output temporary buffers' size");
......@@ -590,7 +590,7 @@ void PajeTracer::add_job_kill(const Job *job, const MachineRange & used_machine_
nb_printed = snprintf(buf, buf_size,
"%d %lf %s %s%d \"%s\"\n",
NEW_EVENT, time, killEventMachine, machinePrefix, machine_id,
job->id.c_str());
job->id.to_string().c_str());
xbt_assert(nb_printed < buf_size - 1,
"Writing error: buffer has been completely filled, some information might "
"have been lost. Please increase Batsim's output temporary buffers' size");
......
......@@ -15,40 +15,10 @@
#include "machine_range.hpp"
struct BatsimContext;
struct Job;
struct BatTask;
/**
* @brief A simple structure used to identify one job
*/
struct JobIdentifier
{
/**
* @brief Creates a JobIdentifier
* @param[in] workload_name The workload name
* @param[in] job_number The job number
*/
JobIdentifier(const std::string & workload_name = "", int job_number = -1);
#include "jobs.hpp"
std::string workload_name; //!< The name of the workload the job belongs to
int job_number; //!< The job unique number inside its workload
/**
* @brief Returns a string representation of the JobIdentifier.
* @details Output format is WORKLOAD_NAME!JOB_NUMBER
* @return A string representation of the JobIdentifier.
*/
std::string to_string() const;
};
struct BatsimContext;
/**
* @brief Compares two JobIdentifier thanks to their string representations
* @param[in] ji1 The first JobIdentifier
* @param[in] ji2 The second JobIdentifier
* @return ji1.to_string() < ji2.to_string()
*/
bool operator<(const JobIdentifier & ji1, const JobIdentifier & ji2);
/**
* @brief Stores the different types of inter-process messages
......
......@@ -427,7 +427,7 @@ int batexec_job_launcher_process(int argc, char *argv[])
exec_args->context = context;
exec_args->allocation = alloc;
exec_args->notify_server_at_end = false;
string pname = "job" + job->id;
string pname = "job" + job->id.to_string();
msg_process_t process = MSG_process_create(pname.c_str(), execute_job_process,
(void*) exec_args,
context->machines[alloc->machine_ids.first_element()]->host);
......
......@@ -28,24 +28,13 @@ using namespace rapidjson;
XBT_LOG_NEW_DEFAULT_CATEGORY(jobs, "jobs"); //!< Logging
/**
* @brief Constructor for simple (leaf) task
*/
BatTask::BatTask(string profile_name, msg_task_t ptask, double computation_amount, double communication_amount)
{
this->profile_name = profile_name;
this->ptask = ptask;
this->computation_amount = computation_amount;
this->communication_amount = communication_amount;
}
/**
* @brief Constructor for composed task (with sub_tasks)
* @brief Constructor for simple task
*/
BatTask::BatTask(string profile_name, std::vector<BatTask*> sub_tasks)
{
this->profile_name = profile_name;
this->sub_tasks = sub_tasks;
BatTask::BatTask(Job * parent_job, Profile * profile) {
this->parent_job = parent_job;
this->profile = profile;
}
void BatTask::compute_leaf_progress()
......@@ -70,7 +59,7 @@ void BatTask::compute_leaf_progress()
this->current_task_progress_ratio = 1 - std::max(comm_ratio, comput_ratio);
}
BatTask * BatTask::compute_tasks_progress()
void BatTask::compute_tasks_progress()
{
if (this->ptask != nullptr)
{
......@@ -239,7 +228,7 @@ Job::~Job()
{
xbt_assert(execution_processes.size() == 0,
"Internal error: job %s has %d execution processes on destruction (should be 0).",
this->id.c_str(), (int)execution_processes.size());
this->id.to_string().c_str(), (int)execution_processes.size());
}
// Do NOT remove namespaces in the arguments (to avoid doxygen warnings)
......
......@@ -19,7 +19,41 @@
using namespace std;
class Profiles;
struct Profile;
class Workload;
struct Job;
/**
* @brief A simple structure used to identify one job
*/
struct JobIdentifier
{
/**
* @brief Creates a JobIdentifier
* @param[in] workload_name The workload name
* @param[in] job_number The job number
*/
JobIdentifier(const std::string & workload_name = "", int job_number = -1);
std::string workload_name; //!< The name of the workload the job belongs to
int job_number; //!< The job unique number inside its workload
/**
* @brief Returns a string representation of the JobIdentifier.
* @details Output format is WORKLOAD_NAME!JOB_NUMBER
* @return A string representation of the JobIdentifier.
*/
std::string to_string() const;
};
/**
* @brief Compares two JobIdentifier thanks to their string representations
* @param[in] ji1 The first JobIdentifier
* @param[in] ji2 The second JobIdentifier
* @return ji1.to_string() < ji2.to_string()
*/
bool operator<(const JobIdentifier & ji1, const JobIdentifier & ji2);
/**
* @brief Contains the different states a job can be in
......@@ -35,27 +69,29 @@ enum class JobState
,JOB_STATE_REJECTED //!< The job has been rejected by the scheduler.
};
/**
* @brief Internal batsim simulation taski: the job profile instanciation.
*/
struct BatTask
{
BatTask(string profile_name, msg_task_t ptask, double computation_amount, double communication_amount);
BatTask(string profile_name, std::vector<BatTask*> sub_tasks);
BatTask(Job * parent_job, Profile * profile);
std::string profile_name; //!< The job profile name. The corresponding profile tells how the job should be computed
Job * parent_job;
Profile * profile; //!< The job profile. The corresponding profile tells how the job should be computed
msg_task_t ptask = nullptr; //!< The final task to execute (only set for the leaf of the BatTask tree)
double computation_amount = 0;
double communication_amount = 0;
std::vector<BatTask*> sub_tasks; //!< List of sub task of this task to be executed sequentially or in parallel depending on the profile
vector<BatTask*> sub_tasks; //!< List of sub task of this task to be executed sequentially or in parallel depending on the profile
unsigned int current_task_index = -1; //!< index in the sub_tasks vector of the current task
double current_task_progress_ratio = 0; //!< give the progress of the current task from 0 to 1
BatTask * compute_tasks_progress();
void compute_tasks_progress();
private:
void compute_leaf_progress();
};
/**
* @brief Represents a job
*/
......@@ -65,7 +101,7 @@ struct Job
Workload * workload = nullptr; //!< The workload the job belongs to
int number; //!< The job unique number within its workload
std::string id; //!< The job unique identifier
JobIdentifier id; //!< The job unique identifier
std::string profile; //!< The job profile name. The corresponding profile tells how the job should be computed
Rational submission_time; //!< The job submission time: The time at which the becomes available
Rational walltime; //!< The job walltime: if the job is executed for more than this amount of time, it will be killed
......@@ -118,6 +154,7 @@ struct Job
const std::string & error_prefix = "Invalid JSON job");
};
/**
* @brief Compares job thanks to their submission times
* @param[in] a The first job
......
......@@ -39,24 +39,25 @@ int smpi_replay_process(int argc, char *argv[])
return 0;
}
int execute_task(BatsimContext *context,
const std::string & profile_name,
int execute_task(BatTask * btask,
BatsimContext *context,
const SchedulingAllocation * allocation,
CleanExecuteProfileData * cleanup_data,
double * remaining_time)
{
Workload * workload = context->workloads.at(allocation->job_id.workload_name);
Job * job = workload->jobs->at(allocation->job_id.job_number);
JobIdentifier job_id(workload->name, job->number);
Profile * profile = workload->profiles->at(profile_name);
int nb_res = job->required_nb_res;
Job * job = btask->parent_job;
Profile * profile = btask->profile;
int nb_res = btask->parent_job->required_nb_res;
// Init task
btask->parent_job = job;
if (profile->type == ProfileType::MSG_PARALLEL ||
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS ||
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS ||
profile->type == ProfileType::MSG_DATA_STAGING)
{
int return_code = execute_msg_task(allocation, nb_res, job, remaining_time, profile, context, cleanup_data);
int return_code = execute_msg_task(btask, allocation, nb_res, remaining_time, context, cleanup_data);
if (return_code != 0)
{
return return_code;
......@@ -67,11 +68,19 @@ int execute_task(BatsimContext *context,
{
SequenceProfileData * data = (SequenceProfileData *) profile->data;
// initialize BatTask list
//btask->sub_tasks = vector<BatTask*>();
for (int i = 0; i < data->repeat; i++)
{
for (unsigned int j = 0; j < data->sequence.size(); j++)
{
int ret_last_profile = execute_task(context, data->sequence[j], allocation,
// conserve the index information in BatTask
btask->current_task_index = i * data->sequence.size() + j;
string task_name = "seq" + to_string(job->number) + "'" + job->profile + "'";
XBT_INFO("Creating sequential tasks '%s'", task_name.c_str());
BatTask * sub_btask = new BatTask(job, job->workload->profiles->at(data->sequence[j]));
btask->sub_tasks.push_back(sub_btask);
int ret_last_profile = execute_task(sub_btask, context, allocation,
cleanup_data, remaining_time);
if (ret_last_profile != 0)
{
......@@ -88,7 +97,7 @@ int execute_task(BatsimContext *context,
XBT_INFO("Sending message to the scheduler");
FromJobMessage * message = new FromJobMessage;
message->job_id = job_id;
message->job_id = job->id;
message->message.CopyFrom(data->message, message->message.GetAllocator());
send_message("server", IPMessageType::FROM_JOB_MSG, (void*)message);
......@@ -160,8 +169,7 @@ int execute_task(BatsimContext *context,
if (profile_to_execute != "")
{
XBT_INFO("Instaciate task from profile: %s", profile_to_execute.c_str());
int ret_last_profile = execute_task(context, profile_to_execute, allocation,
cleanup_data, remaining_time);
int ret_last_profile = execute_task(btask, context, allocation, cleanup_data, remaining_time);
if (ret_last_profile != 0)
{
return ret_last_profile;
......@@ -213,7 +221,7 @@ int execute_task(BatsimContext *context,
xbt_assert(nb_ranks == (int) job->smpi_ranks_to_hosts_mapping.size(),
"Invalid job %s: SMPI ranks_to_host mapping has an invalid size, as it should "
"use %d MPI ranks but the ranking states that there are %d ranks.",
job->id.c_str(), nb_ranks, (int) job->smpi_ranks_to_hosts_mapping.size());
job->id.to_string().c_str(), nb_ranks, (int) job->smpi_ranks_to_hosts_mapping.size());
for (int i = 0; i < nb_ranks; ++i)
{
......@@ -263,7 +271,7 @@ int execute_task(BatsimContext *context,
}
else
xbt_die("Cannot execute job %s: the profile type '%s' is unknown",
job->id.c_str(), job->profile.c_str());
job->id.to_string().c_str(), job->profile.c_str());
return 1;
}
......@@ -350,22 +358,24 @@ int execute_job_process(int argc, char *argv[])
cleanup_data->exec_process_args = args;
SIMIX_process_on_exit(MSG_process_self(), execute_task_cleanup, cleanup_data);
// Create root task
BatTask * root_task = new BatTask(job, workload->profiles->at(job->profile));
// Execute the process
job->return_code = execute_task(args->context, job->profile, args->allocation, cleanup_data, &remaining_time);
job->return_code = execute_task(root_task, args->context, args->allocation, cleanup_data, &remaining_time);
if (job->return_code == 0)
{
XBT_INFO("Job %s finished in time (success)", job->id.c_str());
XBT_INFO("Job %s finished in time (success)", job->id.to_string().c_str());
job->state = JobState::JOB_STATE_COMPLETED_SUCCESSFULLY;
}
else if (job->return_code > 0)
{
XBT_INFO("Job %s finished in time (failed)", job->id.c_str());
XBT_INFO("Job %s finished in time (failed)", job->id.to_string().c_str());
job->state = JobState::JOB_STATE_COMPLETED_FAILED;
}
else
{
XBT_INFO("Job %s had been killed (walltime %g reached)",
job->id.c_str(), (double) job->walltime);
job->id.to_string().c_str(), (double) job->walltime);
job->state = JobState::JOB_STATE_COMPLETED_KILLED;
job->kill_reason = "Walltime reached";
if (args->context->trace_schedule)
......@@ -374,13 +384,15 @@ int execute_job_process(int argc, char *argv[])
MSG_get_clock(), true);
}
}
// cleanup task
delete root_task;
args->context->machines.update_machines_on_job_end(job, args->allocation->machine_ids,
args->context);
job->runtime = MSG_get_clock() - job->starting_time;
if (job->runtime == 0)
{
XBT_WARN("Job '%s' computed in null time. Putting epsilon instead.", job->id.c_str());
XBT_WARN("Job '%s' computed in null time. Putting epsilon instead.", job->id.to_string().c_str());
job->runtime = Rational(1e-5);
}
......@@ -464,7 +476,7 @@ int killer_process(int argc, char *argv[])
job->state == JobState::JOB_STATE_COMPLETED_KILLED ||
job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY ||
job->state == JobState::JOB_STATE_COMPLETED_FAILED,
"Bad kill: job %s is not running", job->id.c_str());
"Bad kill: job %s is not running", job->id.to_string().c_str());
// Store job progress in the message
message->jobs_progress[job_id] = job->compute_job_progress();
......@@ -490,11 +502,11 @@ int killer_process(int argc, char *argv[])
job->runtime = (Rational)MSG_get_clock() - job->starting_time;
xbt_assert(job->runtime >= 0, "Negative runtime of killed job '%s' (%g)!",
job->id.c_str(), (double)job->runtime);
job->id.to_string().c_str(), (double)job->runtime);
if (job->runtime == 0)
{
XBT_WARN("Killed job '%s' has a null runtime. Putting epsilon instead.",
job->id.c_str());
job->id.to_string().c_str());
job->runtime = Rational(1e-5);
}
......
......@@ -573,7 +573,7 @@ void Machine::display_machine(bool is_energy_used) const
vector<string> jobs_vector;
for (auto & job : jobs_being_computed)
{
jobs_vector.push_back(job->id);
jobs_vector.push_back(job->id.to_string());
}
string str = "Machine\n";
......@@ -628,7 +628,7 @@ string Machine::jobs_being_computed_as_string() const
for (auto & job : jobs_being_computed)
{
jobs_strings.push_back(job->id);
jobs_strings.push_back(job->id.to_string());
}
return boost::algorithm::join(jobs_strings, ", ");
......
......@@ -229,6 +229,7 @@ Profile *Profile::from_json(const std::string & profile_name,
(void) error_prefix; // Avoids a warning if assertions are ignored
Profile * profile = new Profile;
profile->name = profile_name;
xbt_assert(json_desc.IsObject(), "%s: profile '%s' value must be an object",
error_prefix.c_str(), profile_name.c_str());
......
......@@ -40,6 +40,7 @@ struct Profile
ProfileType type; //!< The type of the profile
void * data; //!< The associated data
std::string json_description; //!< The JSON description of the profile
std::string name; //!< the profile unique name
int return_code = 0; //!< The return code of this profile's execution (SUCCESS == 0)
/**
......
......@@ -247,14 +247,71 @@ void JsonProtocolWriter::append_job_completed(const string & job_id,
_events.PushBack(event, _alloc);
}
Value generate_task_tree(BatTask * task_tree, rapidjson::Document::AllocatorType & _alloc)
{
Value task(rapidjson::kObjectType);
// create task tree
if (task_tree->ptask != nullptr)
{
task.AddMember("profile", Value().SetString(task_tree->profile->name.c_str(), _alloc), _alloc);
task.AddMember("progress", Value().SetDouble(task_tree->current_task_progress_ratio), _alloc);
}
else
{
task.AddMember("profile", Value().SetString(task_tree->profile->name.c_str(), _alloc), _alloc);
task.AddMember("current_task_index", Value().SetInt(task_tree->current_task_index), _alloc);
BatTask * btask = task_tree->sub_tasks[task_tree->current_task_index];
task.AddMember("current_task", generate_task_tree(btask, _alloc), _alloc);
}
return task;
}
void JsonProtocolWriter::append_job_killed(const vector<string> & job_ids,
const map<string, BatTask *> job_progress,
double date)
{
/* {
/*
{
"timestamp": 10.0,
"type": "JOB_KILLED",
"data": {"job_ids": ["w0!1", "w0!2"]}
} */
"data": {
"job_ids": ["w0!1", "w0!2"],
"job_progress":
{
// simple job
"w0!1": {"profile": "my_simple_profile", "progress": 0.52},
// sequential job
"w0!2":
{
"profile": "my_sequential_profile",
"current_task_index": 3,
"current_task":
{
"profile": "my_simple_profile",
"progress": 0.52
}
},
// composed sequential job
"w0!3:
{
"profile": "my_composed_profile",
"current_task_index": 2,
"current_task":
{
"profile": "my_sequential_profile",
"current_task_index": 3,
"current_task":
{
"profile": "my_simple_profile",
"progress": 0.52
}
}
},
}
}
}
*/
xbt_assert(date >= _last_date, "Date inconsistency");
_last_date = date;
......@@ -266,9 +323,15 @@ void JsonProtocolWriter::append_job_killed(const vector<string> & job_ids,
Value jobs(rapidjson::kArrayType);
jobs.Reserve(job_ids.size(), _alloc);
for (const string & job_id : job_ids)
Value progress(rapidjson::kObjectType);
for (const string& job_id : job_ids)
{
jobs.PushBack(Value().SetString(job_id.c_str(), _alloc), _alloc);
// compute task progress tree
progress.AddMember(Value().SetString(job_id.c_str(), _alloc),
generate_task_tree(job_progress.at(job_id), _alloc), _alloc);
}
event.AddMember("data", Value().SetObject().AddMember("job_ids", jobs, _alloc), _alloc);
......@@ -521,7 +584,7 @@ void JsonProtocolReader::handle_reject_job(int event_number,
"Invalid JSON message: "
"Invalid rejection received: job %s cannot be rejected at the present time."
"For being rejected, a job must be submitted and not allocated yet.",
job->id.c_str());
job->id.to_string().c_str());
send_message(timestamp, "server", IPMessageType::SCHED_REJECT_JOB, (void*) message);
}
......
......@@ -123,6 +123,7 @@ public:
* @param[in] date The event date. Must be greater than or equal to the previous event.
*/
virtual void append_job_killed(const std::vector<std::string> & job_ids,
const std::map<std::string, BatTask *> job_progress,
double date) = 0;
/**
......@@ -257,6 +258,7 @@ public:
* @param[in] date The event date. Must be greater than or equal to the previous event.
*/
void append_job_killed(const std::vector<std::string> & job_ids,
const std::map<std::string, BatTask *> job_progress,
double date);
/**
......
......@@ -257,7 +257,7 @@ void server_on_job_submitted(ServerData * data,
XBT_INFO("GOT JOB: %s %d\n", message->job_id.workload_name.c_str(), message->job_id.job_number);
xbt_assert(data->context->workloads.job_exists(message->job_id));
Job * job = data->context->workloads.job_at(message->job_id);
job->id = message->job_id.to_string();
job->id = message->job_id;
// Update control information
job->state = JobState::JOB_STATE_SUBMITTED;
......@@ -276,7 +276,7 @@ void server_on_job_submitted(ServerData * data,
}
}
data->context->proto_writer->append_job_submitted(job->id, job_json_description,
data->context->proto_writer->append_job_submitted(job->id.to_string(), job_json_description,
profile_json_description, MSG_get_clock());
}
......@@ -479,7 +479,9 @@ void server_on_killing_done(ServerData * data,
vector<string> job_ids_str;
vector<string> really_killed_job_ids_str;
job_ids_str.reserve(message->jobs_ids.size());
map<string, BatTask *> jobs_progress_str;
// manage job Id list
for (const JobIdentifier & job_id : message->jobs_ids)
{
job_ids_str.push_back(job_id.to_string());
......@@ -495,13 +497,15 @@ void server_on_killing_done(ServerData * data,
really_killed_job_ids_str.push_back(job_id.to_string());
}
// compute job progress from BatTask tree in str
jobs_progress_str[job_id.to_string()] = message->jobs_progress[job_id];
}
XBT_INFO("Jobs {%s} have been killed (the following ones have REALLY been killed: {%s})",
boost::algorithm::join(job_ids_str, ",").c_str(),
boost::algorithm::join(really_killed_job_ids_str, ",").c_str());
data->context->proto_writer->append_job_killed(job_ids_str, MSG_get_clock());
data->context->proto_writer->append_job_killed(job_ids_str, jobs_progress_str, MSG_get_clock());
--data->nb_killers;
check_submitted_and_completed(data);
......@@ -569,7 +573,7 @@ void server_on_submit_job(ServerData * data,
Job * job = Job::from_json(message->job_description, workload,
"Invalid JSON job submitted by the scheduler");
workload->jobs->add_job(job);
job->id = JobIdentifier(workload->name, job->number).to_string();
job->id = JobIdentifier(workload->name, job->number);
// Let's parse the profile if needed
if (!workload->profiles->exists(job->profile))
......@@ -616,7 +620,7 @@ void server_on_submit_job(ServerData * data,
}
}
data->context->proto_writer->append_job_submitted(job->id, job_json_description,
data->context->proto_writer->append_job_submitted(job->id.to_string(), job_json_description,
profile_json_description,
MSG_get_clock());
}
......@@ -809,7 +813,7 @@ void server_on_execute_job(ServerData * data,
Job * job = data->context->workloads.job_at(allocation->job_id);
xbt_assert(job->state == JobState::JOB_STATE_SUBMITTED,
"Cannot execute job '%s': its state (%d) is not JOB_STATE_SUBMITTED.",
job->id.c_str(), job->state);
job->id.to_string().c_str(), job->state);
job->state = JobState::JOB_STATE_RUNNING;
......@@ -854,7 +858,7 @@ void server_on_execute_job(ServerData * data,
"Invalid job %s allocation. The job requires %d machines but only %d were given (%s). "
"Using a different number of machines is only allowed if a custom mapping is specified. "
"This mapping must specify which allocated machine each executor should use.",
job->id.c_str(), job->