Commit eb030606 authored by Millian Poquet's avatar Millian Poquet
Browse files

[code] cosmetics + refactor + doc

parent 9002f05c
......@@ -301,7 +301,6 @@ add_test(kill_progress
-bod /tmp/batsim_tests/kill_progress
-bwd ${CMAKE_SOURCE_DIR})
add_test(pybatsim_tests
${CMAKE_SOURCE_DIR}/tools/experiments/execute_instances.py
${CMAKE_SOURCE_DIR}/test/pybatsim_tests.yaml
......
......@@ -503,8 +503,9 @@ void PajeTracer::register_new_job(const Job *job)
// Let's create a state value corresponding to this job
nb_printed = snprintf(buf, buf_size,
"%d %s%s %s \"%s\" %s\n",
DEFINE_ENTITY_VALUE, jobPrefix, job->id.to_string().c_str(), machineState,
job->id.to_string().c_str(), _colors[job->number % (int)_colors.size()].c_str());
DEFINE_ENTITY_VALUE, jobPrefix, job->id.to_string().c_str(),
machineState, job->id.to_string().c_str(),
_colors[job->number % (int)_colors.size()].c_str());
xbt_assert(nb_printed < buf_size - 1,
"Writing error: buffer has been completely filled, some information might "
"have been lost. Please increase Batsim's output temporary buffers' size");
......
......@@ -201,8 +201,8 @@ struct SwitchMessage
*/
struct KillingDoneMessage
{
std::vector<JobIdentifier> jobs_ids; //!< The IDs of the jobs which have been killed
std::map<JobIdentifier, BatTask *> jobs_progress; //!< Jobs list (recursive) amount of work done when killed
std::vector<JobIdentifier> jobs_ids; //!< The IDs of the jobs whose kill has been requested
std::map<JobIdentifier, BatTask *> jobs_progress; //!< Stores the progress of the jobs that have really been killed.
};
/**
......
......@@ -29,77 +29,74 @@ using namespace rapidjson;
XBT_LOG_NEW_DEFAULT_CATEGORY(jobs, "jobs"); //!< Logging
/**
* @brief Constructor for simple task
*/
BatTask::BatTask(Job * parent_job, Profile * profile) {
this->parent_job = parent_job;
this->profile = profile;
BatTask::BatTask(Job * parent_job, Profile * profile) :
parent_job(parent_job),
profile(profile)
{
}
BatTask::~BatTask() {
for (auto &sub_btask : this->sub_tasks) {
BatTask::~BatTask()
{
for (auto & sub_btask : this->sub_tasks)
{
delete sub_btask;
sub_btask = nullptr;
}
}
void BatTask::compute_leaf_progress()
{
xbt_assert(this->sub_tasks.empty(), "Leaf should not contains sub tasks");
xbt_assert(sub_tasks.empty(), "Leaves should not contain sub tasks");
if (profile->is_parallel_task())
{
xbt_assert(ptask != nullptr, "Internal error");
// MSG task
if (this->ptask != nullptr) {
// WARNING: This is not returning the flops amount but the work quantity
//remaining from 1 (not started) to 0 (done)
this->current_task_progress_ratio = 1 - MSG_task_get_flops_amount(this->ptask);
// WARNING: This is not returning the flops amount but the remaining quantity of work
// from 1 (not started yet) to 0 (completely finished)
current_task_progress_ratio = 1 - MSG_task_get_flops_amount(ptask);
}
// delay task
else if (this->delay_task_start != -1)
else if (profile->type == ProfileType::DELAY)
{
double runtime = MSG_get_clock() - this->delay_task_start;
// manage empty delay job (why?!)
if (this->delay_task_required == 0)
xbt_assert(delay_task_start != -1, "Internal error");
double runtime = MSG_get_clock() - delay_task_start;
// Manages empty delay job (why?!)
if (delay_task_required == 0)
{
this->current_task_progress_ratio = 1;
current_task_progress_ratio = 1;
}
else
{
this->current_task_progress_ratio = runtime / this->delay_task_required;
current_task_progress_ratio = runtime / delay_task_required;
}
}
// Not implemented
else {
XBT_WARN("computing the progress of this type of task is not implemented");
else
{
XBT_WARN("Computing the progress of %s profiles is not implemented.",
profile_type_to_string(profile->type).c_str());
}
}
void BatTask::compute_tasks_progress()
{
if (this->sub_tasks.empty())
if (profile->type == ProfileType::SEQUENCE)
{
this->compute_leaf_progress();
sub_tasks[current_task_index]->compute_tasks_progress();
}
else
{
// compute only for current sequential task
sub_tasks[current_task_index]->compute_tasks_progress();
this->compute_leaf_progress();
}
}
/**
* @brief Return the BatTask tree with progress
*/
BatTask* Job::compute_job_progress()
{
if (this->task != nullptr) {
this->task->compute_tasks_progress();
}
return this->task;
xbt_assert(task != nullptr, "Internal error");
task->compute_tasks_progress();
return task;
}
......@@ -116,12 +113,12 @@ Jobs::~Jobs()
}
}
void Jobs::setProfiles(Profiles *profiles)
void Jobs::set_profiles(Profiles *profiles)
{
_profiles = profiles;
}
void Jobs::setWorkload(Workload *workload)
void Jobs::set_workload(Workload *workload)
{
_workload = workload;
}
......@@ -248,19 +245,20 @@ Job::~Job()
xbt_assert(execution_processes.size() == 0,
"Internal error: job %s has %d execution processes on destruction (should be 0).",
this->id.to_string().c_str(), (int)execution_processes.size());
if (this->task != nullptr)
if (task != nullptr)
{
delete this->task;
this->task = nullptr;
delete task;
task = nullptr;
}
}
bool Job::is_complete()
bool Job::is_complete() const
{
return (this->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY
|| this->state == JobState::JOB_STATE_COMPLETED_KILLED
|| this->state == JobState::JOB_STATE_COMPLETED_FAILED
|| this->state == JobState::JOB_STATE_COMPLETED_WALLTIME_REACHED);
return (state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY) ||
(state == JobState::JOB_STATE_COMPLETED_KILLED) ||
(state == JobState::JOB_STATE_COMPLETED_FAILED) ||
(state == JobState::JOB_STATE_COMPLETED_WALLTIME_REACHED);
}
// Do NOT remove namespaces in the arguments (to avoid doxygen warnings)
......
......@@ -60,47 +60,64 @@ bool operator<(const JobIdentifier & ji1, const JobIdentifier & ji2);
*/
enum class JobState
{
JOB_STATE_NOT_SUBMITTED //!< The job exists but cannot be scheduled yet.
,JOB_STATE_SUBMITTED //!< The job has been submitted, it can now be scheduled.
,JOB_STATE_RUNNING //!< The job has been scheduled and is currently being processed.
,JOB_STATE_COMPLETED_SUCCESSFULLY //!< The job execution finished before its walltime successfully.
,JOB_STATE_COMPLETED_FAILED //!< The job execution finished before its walltime but the job failed.
,JOB_STATE_COMPLETED_WALLTIME_REACHED //!< The job has reached his walltime and have been killed
,JOB_STATE_COMPLETED_KILLED //!< The job has been killed.
,JOB_STATE_REJECTED //!< The job has been rejected by the scheduler.
JOB_STATE_NOT_SUBMITTED //!< The job exists but cannot be scheduled yet.
,JOB_STATE_SUBMITTED //!< The job has been submitted, it can now be scheduled.
,JOB_STATE_RUNNING //!< The job has been scheduled and is currently being processed.
,JOB_STATE_COMPLETED_SUCCESSFULLY //!< The job execution finished before its walltime successfully.
,JOB_STATE_COMPLETED_FAILED //!< The job execution finished before its walltime but the job failed.
,JOB_STATE_COMPLETED_WALLTIME_REACHED //!< The job has reached its walltime and has been killed.
,JOB_STATE_COMPLETED_KILLED //!< The job has been killed.
,JOB_STATE_REJECTED //!< The job has been rejected by the scheduler.
};
/**
* @brief Internal batsim simulation task: the job profile instantiation.
* @brief Internal Batsim simulation task (corresponds to a job profile instantiation).
* @details Please note that this type is recursive since profiles can be composed.
*/
struct BatTask
{
/**
* brief Construct the batTask and store parent job and profile
* @param parent_job The parent job that own this task
* @param profile The task profile to be executed
* @brief BatTask Constructs a batTask and stores the associated job and profile
* @param[in] parent_job The job that owns the task
* @param[in] profile The profile that corresponds to the task
*/
BatTask(Job * parent_job, Profile * profile);
/**
* @brief BatTask destructor
* @details Recursively cleans subtasks
*/
~BatTask();
Job * parent_job; //!< The parent job that own this task
/**
* @brief Computes the current progress of a task
* @details This function does recursive calls if needed (composed tasks).
* compute_leaf_progress is called on leaves.
*/
void compute_tasks_progress();
private:
/**
* @brief Compute the progress of a leaf task
*/
void compute_leaf_progress();
public:
Job * parent_job; //!< The parent job that owns this task
Profile * profile; //!< The task profile. The corresponding profile tells how the job should be computed
// Manage MSG profiles
msg_task_t ptask = nullptr; //!< The final task to execute (only set for the leaf of the BatTask tree)
msg_task_t ptask = nullptr; //!< The final task to execute (only set for BatTask leaves with MSG profiles)
// manage Delay profile
double delay_task_start = -1; //!< Keep delay task starting time in order to compute progress afterwards
double delay_task_required = -1; //!< Keep delay task time requirement
double delay_task_start = -1; //!< Stores when the task started its execution, in order to compute its progress afterwards (only set for BatTask leaves with delay profiles)
double delay_task_required = -1; //!< Stores how long delay tasks should last (only set for BatTask leaves with delay profiles)
// manage sequential profile
vector<BatTask*> sub_tasks; //!< List of sub task of this task to be executed sequentially or in parallel depending on the profile
unsigned int current_task_index = -1; //!< index in the sub_tasks vector of the current task
double current_task_progress_ratio = 0; //!< give the progress of the current task from 0 to 1
void compute_tasks_progress(); //!< fill up the progress field (current_task_*) for this task and his sub tasks
private:
void compute_leaf_progress(); //!< Helper function for compute_task_progress
vector<BatTask*> sub_tasks; //!< List of sub tasks that must be executed sequentially. Only set for BatTask non-leaves with sequential profiles at the moment, but it may be used for parallel composition in the future.
unsigned int current_task_index = -1; //!< Index of the task that is currently being executed in the sub_tasks vector. Only set for BatTask non-leaves with sequential profiles.
double current_task_progress_ratio = 0; //!< Gives the progress of the current task from 0 to 1. Only set for BatTask leaves (it is computed by compute_leaf_progress).
};
......@@ -109,39 +126,43 @@ struct BatTask
*/
struct Job
{
/**
* @brief Destructor
*/
~Job();
// Batsim internals
Workload * workload = nullptr; //!< The workload the job belongs to
int number; //!< The job unique number within its workload
JobIdentifier id; //!< The job unique identifier
std::string profile; //!< The job profile name. The corresponding profile tells how the job should be computed
Rational submission_time; //!< The job submission time: The time at which the becomes available
Rational walltime; //!< The job walltime: if the job is executed for more than this amount of time, it will be killed
int required_nb_res; //!< The number of resources the job is requested to be executed on
std::string kill_reason; //!< If the job has been killed, the kill reason is stored in this variable
bool kill_requested = false; //!< Whether the job kill has been requested
std::set<msg_process_t> execution_processes; //!< The processes involved in running the job
BatTask * task = nullptr; //!< The root task be executed by this job (profile instantiation).
std::string json_description; //!< The JSON description of the job
long double consumed_energy; //!< The sum, for each machine on which the job has been allocated, of the consumed energy (in Joules) during the job execution time (consumed_energy_after_job_completion - consumed_energy_before_job_start)
std::set<msg_process_t> execution_processes; //!< The processes involved in running the job
std::deque<std::string> incoming_message_buffer; //!< The buffer for incoming messages from the scheduler.
Rational starting_time; //!< The time at which the job starts to be executed.
Rational runtime; //!< The amount of time during which the job has been executed
// Scheduler allocation
MachineRange allocation; //!< The machines on which the job has been executed.
std::vector<int> smpi_ranks_to_hosts_mapping; //!< If the job uses a SMPI profile, stores which host number each MPI rank should use. These numbers must be in [0,required_nb_res[.
// Current state
JobState state; //!< The current state of the job
int return_code = -1; //!< The return code of the job
Rational starting_time; //!< The time at which the job starts to be executed.
Rational runtime; //!< The amount of time during which the job has been executed.
std::string kill_reason; //!< If the job has been killed, the kill reason is stored in this variable
bool kill_requested = false; //!< Whether the job kill has been requested
long double consumed_energy; //!< The sum, for each machine on which the job has been allocated, of the consumed energy (in Joules) during the job execution time (consumed_energy_after_job_completion - consumed_energy_before_job_start)
BatTask * task = nullptr; //!< The task to be executed by this job
// User inputs
std::string profile; //!< The job profile name. The corresponding profile tells how the job should be computed
Rational submission_time; //!< The job submission time: The time at which the job becomes available
Rational walltime; //!< The job walltime: if the job is executed for more than this amount of time, it will be killed
int required_nb_res; //!< The number of resources the job is requested to be executed on
int return_code = -1; //!< The return code of the job
public:
/**
* @brief compute the task progression of this job
* @return the task tree with progress values filled up
* @brief Computes the task progression of this job
* @return The task progress tree with filled-up associated values
*/
BatTask * compute_job_progress();
......@@ -169,11 +190,10 @@ struct Job
Workload * workload,
const std::string & error_prefix = "Invalid JSON job");
/**
* @brief Check if a job is complete (successfully or not)
* @return true if the job is complete , false if the job has not
* started
* @brief Checks whether a job is complete (regardless of the job success)
* @return true if the job is complete (=has started then finished), false otherwise.
*/
bool is_complete();
bool is_complete() const;
};
......@@ -212,13 +232,13 @@ public:
* @brief Sets the profiles which are associated to the Jobs
* @param[in] profiles The profiles
*/
void setProfiles(Profiles * profiles);
void set_profiles(Profiles * profiles);
/**
* @brief Sets the Workload within which this Jobs instance exists
* @param[in] workload The Workload
*/
void setWorkload(Workload * workload);
void set_workload(Workload * workload);
/**
* @brief Loads the jobs from a JSON document
......
......@@ -39,19 +39,10 @@ int smpi_replay_process(int argc, char *argv[])
return 0;
}
/**
* @brief Execute a BatTask recursively regarding on its profile type
* @param[in,out] btask the task to execute
* @param[in] context usefull information about Batsim context
* @param[in] allocation the host to execute the task to
* @param[in,out] cleanup_data to give to simgrid cleanup hook
* @param[in,out] remaining_time remaining time of the current task
* @return the profile return code if successful, -1 if walltime reached
*/
int execute_task(BatTask * btask,
BatsimContext *context,
const SchedulingAllocation * allocation,
CleanExecuteProfileData * cleanup_data,
CleanExecuteTaskData * cleanup_data,
double * remaining_time)
{
Job * job = btask->parent_job;
......@@ -61,42 +52,49 @@ int execute_task(BatTask * btask,
// Init task
btask->parent_job = job;
if (profile->type == ProfileType::MSG_PARALLEL ||
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS ||
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS ||
profile->type == ProfileType::MSG_DATA_STAGING)
if (profile->is_parallel_task())
{
int return_code = execute_msg_task(btask, allocation, nb_res, remaining_time, context, cleanup_data);
int return_code = execute_msg_task(btask, allocation, nb_res, remaining_time,
context, cleanup_data);
if (return_code != 0)
{
return return_code;
}
return profile->return_code;
}
else if (profile->type == ProfileType::SEQUENCE)
{
SequenceProfileData * data = (SequenceProfileData *) profile->data;
// initialize BatTask list
//btask->sub_tasks = vector<BatTask*>();
for (int i = 0; i < data->repeat; i++)
// (Sequences can be repeated several times)
for (int sequence_iteration = 0; sequence_iteration < data->repeat; sequence_iteration++)
{
for (unsigned int j = 0; j < data->sequence.size(); j++)
for (unsigned int profile_index_in_sequence = 0;
profile_index_in_sequence < data->sequence.size();
profile_index_in_sequence++)
{
// conserve the index information in BatTask
btask->current_task_index = i * data->sequence.size() + j;
// Traces how the execution is going so that progress can be retrieved if needed
btask->current_task_index = sequence_iteration * data->sequence.size() +
profile_index_in_sequence;
BatTask * sub_btask = new BatTask(job,
job->workload->profiles->at(data->sequence[profile_index_in_sequence]));
btask->sub_tasks.push_back(sub_btask);
string task_name = "seq" + to_string(job->number) + "'" + job->profile + "'";
XBT_INFO("Creating sequential tasks '%s'", task_name.c_str());
BatTask * sub_btask = new BatTask(job, job->workload->profiles->at(data->sequence[j]));
btask->sub_tasks.push_back(sub_btask);
int ret_last_profile = execute_task(sub_btask, context, allocation,
cleanup_data, remaining_time);
cleanup_data, remaining_time);
// The whole sequence fails if a subtask fails
if (ret_last_profile != 0)
{
return ret_last_profile;
}
}
}
return profile->return_code;
}
else if (profile->type == ProfileType::SCHEDULER_SEND)
......@@ -133,7 +131,6 @@ int execute_task(BatTask * btask,
XBT_INFO("Waiting for message from scheduler");
while (true)
{
if (do_delay_task(data->polltime, remaining_time) == -1)
{
return -1;
......@@ -280,36 +277,36 @@ int execute_task(BatTask * btask,
int do_delay_task(double sleeptime, double * remaining_time)
{
if (sleeptime < *remaining_time)
{
XBT_INFO("Sleeping the whole task length");
MSG_process_sleep(sleeptime);
XBT_INFO("Sleeping done");
*remaining_time = *remaining_time - sleeptime;
return 0;
}
else
{
XBT_INFO("Sleeping until walltime");
MSG_process_sleep(*remaining_time);
XBT_INFO("Job has reached walltime");
*remaining_time = 0;
return -1;
}
if (sleeptime < *remaining_time)
{
XBT_INFO("Sleeping the whole task length");
MSG_process_sleep(sleeptime);
XBT_INFO("Sleeping done");
*remaining_time = *remaining_time - sleeptime;
return 0;
}
else
{
XBT_INFO("Sleeping until walltime");
MSG_process_sleep(*remaining_time);
XBT_INFO("Job has reached walltime");
*remaining_time = 0;
return -1;
}
}
/**
* @brief Hook function given to simgrid to cleanup the task after its
* execution ends
* @param unknown unknown
* @param[in,out] data structure to clean up (cast in * CleanExecuteProfileData)
* @param[in,out] data structure to clean up (cast to CleanExecuteTaskData *)
* @return always 0
*/
int execute_task_cleanup(void * unknown, void * data)
{
(void) unknown;
CleanExecuteProfileData * cleanup_data = (CleanExecuteProfileData *) data;
CleanExecuteTaskData * cleanup_data = (CleanExecuteTaskData *) data;
xbt_assert(cleanup_data != nullptr);
XBT_DEBUG("before freeing computation amount %p", cleanup_data->computation_amount);
......@@ -365,15 +362,16 @@ int execute_job_process(int argc, char *argv[])
args->allocation->machine_ids,
args->context);
// Add a cleanup hook on the process
CleanExecuteProfileData * cleanup_data = new CleanExecuteProfileData;
CleanExecuteTaskData * cleanup_data = new CleanExecuteTaskData;
cleanup_data->exec_process_args = args;
SIMIX_process_on_exit(MSG_process_self(), execute_task_cleanup, cleanup_data);
// Create root task
BatTask * root_task = new BatTask(job, workload->profiles->at(job->profile));
job->task = root_task;
job->task = new BatTask(job, workload->profiles->at(job->profile));
// Execute the process
job->return_code = execute_task(root_task, args->context, args->allocation, cleanup_data, &remaining_time);
job->return_code = execute_task(job->task, args->context, args->allocation,
cleanup_data, &remaining_time);
if (job->return_code == 0)
{
XBT_INFO("Job %s finished in time (success)", job->id.to_string().c_str());
......@@ -381,7 +379,8 @@ int execute_job_process(int argc, char *argv[])
}
else if (job->return_code > 0)
{
XBT_INFO("Job %s finished in time (failed)", job->id.to_string().c_str());
XBT_INFO("Job %s finished in time (failed: return_code=%d)",
job->id.to_string().c_str(), job->return_code);
job->state = JobState::JOB_STATE_COMPLETED_FAILED;
}
else
......@@ -490,14 +489,13 @@ int killer_process(int argc, char *argv[])
if (job->state == JobState::JOB_STATE_RUNNING)
{
BatTask * job_progress = job->compute_job_progress();
// consistency checks
if (profile->type == ProfileType::MSG_PARALLEL ||
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS ||
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS ||
profile->type == ProfileType::MSG_DATA_STAGING
)
// Consistency checks
if (profile->is_parallel_task() ||
profile->type == ProfileType::DELAY)
{
xbt_assert(job_progress != nullptr, "MSG profiles should contains jobs progress");
xbt_assert(job_progress != nullptr,
"MSG and delay profiles should contain jobs progress");
}
// Store job progress in the message
......
......@@ -9,10 +9,10 @@
#include "context.hpp"
/**
* @brief Structure used to clean the data of the execute_profile function if the process executing it gets killed
* @details This structure should be allocated (via new!) before running execute_profile (but not when doing recursive calls). It is freed by execute_profile_cleanup.
* @brief Structure used to clean the data of a profile execution if the process executing it gets killed
* @details This structure should be allocated (via new!) before running execute_task (but not when doing recursive calls). It is freed by execute_task_cleanup.
*/
struct CleanExecuteProfileData
struct CleanExecuteTaskData
{
double * computation_amount = nullptr; //!< The computation amount (may be null)
double * communication_amount = nullptr; //!< The communication amount (may be null)
......@@ -37,7 +37,7 @@ int killer_process(int argc, char *argv[]);
int smpi_replay_process(int argc, char *argv[]);
/**
* @brief Simulate the delay job profile (sleeping MSG process)
* @brief Simulates a delay profile (sleeps until finished or walltime)
* @param[in] sleeptime The time to sleep
* @param[in,out] remaining_time The remaining amount of time before walltime
* @return 0 if enough time is available, -1 in the case of a timeout
......@@ -45,12 +45,27 @@ int smpi_replay_process(int argc, char *argv[]);
int do_delay_task(double sleeptime, double * remaining_time);
/**
* @brief Is executed on execute_profile termination, allows to clean memory on kill
* @brief Executes a BatTask recursively, according to its profile type
* @param[in,out] btask The task to execute
* @param[in] context Useful information about the Batsim context
* @param[in] allocation The allocation (hosts) on which the task is executed
* @param[in,out] cleanup_data The data given to the SimGrid cleanup hook
* @param[in,out] remaining_time The remaining time of the current task
* @return The profile return code if successful, -1 if the walltime has been reached
*/
int execute_task(BatTask * btask,
BatsimContext *context,
const SchedulingAllocation * allocation,
CleanExecuteTaskData * cleanup_data,
double * remaining_time);
/**
* @brief Is executed on execute_task termination, allows to clean memory on kill
* @param[in] unknown An unknown argument (oops?)
* @param[in] data The data to free
* @return 0
*/
int execute_profile_cleanup(void * unknown, void * data);
int execute_task_cleanup(void * unknown, void * data);
/**
* @brief The process in charge of executing a job
......
......@@ -606,3 +606,50 @@ Profile *Profile::from_json(const std::string & profile_name,
return Profile::from_json(profile_name, doc, error_prefix, false);