Commit 9002f05c authored by MERCIER Michael's avatar MERCIER Michael
Browse files

Refactor delay_job function + fix task prefix + Doc

parent 7155acf2
Pipeline #673 failed with stages
in 13 seconds
......@@ -76,11 +76,16 @@ enum class JobState
*/
struct BatTask
{
/**
* brief Construct the batTask and store parent job and profile
* @param parent_job The parent job that own this task
* @param profile The task profile to be executed
*/
BatTask(Job * parent_job, Profile * profile);
~BatTask();
Job * parent_job;
Profile * profile; //!< The job profile. The corresponding profile tells how the job should be computed
Job * parent_job; //!< The parent job that own this task
Profile * profile; //!< The task profile. The corresponding profile tells how the job should be computed
// Manage MSG profiles
msg_task_t ptask = nullptr; //!< The final task to execute (only set for the leaf of the BatTask tree)
// manage Delay profile
......@@ -91,10 +96,11 @@ struct BatTask
vector<BatTask*> sub_tasks; //!< List of sub task of this task to be executed sequentially or in parallel depending on the profile
unsigned int current_task_index = -1; //!< index in the sub_tasks vector of the current task
double current_task_progress_ratio = 0; //!< give the progress of the current task from 0 to 1
void compute_tasks_progress();
void compute_tasks_progress(); //!< fill up the progress field (current_task_*) for this task and his sub tasks
private:
void compute_leaf_progress();
void compute_leaf_progress(); //!< Helper function for compute_task_progress
};
......@@ -133,7 +139,11 @@ struct Job
BatTask * task = nullptr; //!< The task to be executed by this job
BatTask * compute_job_progress(); //<! return the task progression of the task tree
/**
* @brief compute the task progression of this job
* @return the task tree with progress values filled up
*/
BatTask * compute_job_progress();
/**
* @brief Creates a new-allocated Job from a JSON description
......@@ -159,8 +169,9 @@ struct Job
Workload * workload,
const std::string & error_prefix = "Invalid JSON job");
/**
* @brief Return true if the job has complete (successfully or not),
* false if the job has not started
* @brief Check if a job is complete (successfully or not)
* @return true if the job is complete , false if the job has not
* started
*/
bool is_complete();
};
......
......@@ -39,6 +39,15 @@ int smpi_replay_process(int argc, char *argv[])
return 0;
}
/**
* @brief Execute a BatTask recursively regarding on its profile type
* @param[in,out] btask the task to execute
* @param[in] context usefull information about Batsim context
* @param[in] allocation the host to execute the task to
* @param[in,out] cleanup_data to give to simgrid cleanup hook
* @param[in,out] remaining_time remaining time of the current task
* @return the profile return code if successful, -1 if walltime reached
*/
int execute_task(BatTask * btask,
BatsimContext *context,
const SchedulingAllocation * allocation,
......@@ -102,7 +111,7 @@ int execute_task(BatTask * btask,
send_message("server", IPMessageType::FROM_JOB_MSG, (void*)message);
if (delay_job(data->sleeptime, remaining_time) == -1)
if (do_delay_task(data->sleeptime, remaining_time) == -1)
{
return -1;
}
......@@ -124,7 +133,8 @@ int execute_task(BatTask * btask,
XBT_INFO("Waiting for message from scheduler");
while (true)
{
if (delay_job(data->polltime, remaining_time) == -1)
if (do_delay_task(data->polltime, remaining_time) == -1)
{
return -1;
}
......@@ -184,22 +194,11 @@ int execute_task(BatTask * btask,
btask->delay_task_start = MSG_get_clock();
btask->delay_task_required = data->delay;
if (data->delay < *remaining_time)
{
XBT_INFO("Sleeping the whole task length");
MSG_process_sleep(data->delay);
XBT_INFO("Sleeping done");
*remaining_time = *remaining_time - data->delay;
return profile->return_code;
}
else
if (do_delay_task(data->delay, remaining_time) == -1)
{
XBT_INFO("Sleeping until walltime");
MSG_process_sleep(*remaining_time);
XBT_INFO("Walltime reached");
*remaining_time = 0;
return -1;
}
return profile->return_code;
}
else if (profile->type == ProfileType::SMPI)
{
......@@ -279,24 +278,33 @@ int execute_task(BatTask * btask,
return 1;
}
int delay_job(double sleeptime,
double * remaining_time)
int do_delay_task(double sleeptime, double * remaining_time)
{
if (sleeptime < *remaining_time)
{
XBT_INFO("Sleeping the whole task length");
MSG_process_sleep(sleeptime);
XBT_INFO("Sleeping done");
*remaining_time = *remaining_time - sleeptime;
return 0;
}
else
{
XBT_INFO("Job has reached walltime");
XBT_INFO("Sleeping until walltime");
MSG_process_sleep(*remaining_time);
XBT_INFO("Job has reached walltime");
*remaining_time = 0;
return -1;
}
}
/**
* @brief Hook function given to simgrid to cleanup the task after its
* execution ends
* @param unknown unknown
* @param[in,out] data structure to clean up (cast in * CleanExecuteProfileData)
* @return always 0
*/
int execute_task_cleanup(void * unknown, void * data)
{
(void) unknown;
......
......@@ -36,29 +36,13 @@ int killer_process(int argc, char *argv[]);
*/
int smpi_replay_process(int argc, char *argv[]);
/**
* @brief Executes the profile of a job
* @param[in] context The Batsim Context
* @param[in] profile_name The name of the profile to execute
* @param[in] allocation The machines the job should be executed on
* @param[in,out] cleanup_data The data to clean on bad process termination (kill)
* @param[in,out] remaining_time The remaining amount of time before walltime
* @return 0
*/
int execute_profile(BatsimContext *context,
const std::string & profile_name,
const SchedulingAllocation * allocation,
CleanExecuteProfileData * cleanup_data,
double * remaining_time);
/**
* @brief Simulate the delay job profile (sleeping MSG process)
* @param[in] sleeptime The time to sleep
* @param[in,out] remaining_time The remaining amount of time before walltime
* @return 0 if enough time is available, -1 in the case of a timeout
*/
int delay_job(double sleeptime,
double * remaining_time);
int do_delay_task(double sleeptime, double * remaining_time);
/**
* @brief Is executed on execute_profile termination, allows to clean memory on kill
......
......@@ -121,6 +121,7 @@ public:
* @brief Appends a JOB_KILLED event.
* @param[in] job_ids The identifiers of the jobs that have been killed.
* @param[in] date The event date. Must be greater than or equal to the previous event.
* @param[in] job_progress contains for each job a progress tree
*/
virtual void append_job_killed(const std::vector<std::string> & job_ids,
const std::map<std::string, BatTask *> job_progress,
......@@ -256,6 +257,7 @@ public:
* @brief Appends a JOB_KILLED event.
* @param[in] job_ids The identifiers of the jobs that have been killed.
* @param[in] date The event date. Must be greater than or equal to the previous event.
* @param[in] job_progress contains for each job a progress tree
*/
void append_job_killed(const std::vector<std::string> & job_ids,
const std::map<std::string, BatTask *> job_progress,
......
......@@ -14,9 +14,17 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(task_execution, "task_execution"); //!< Logging
using namespace std;
/**
* @brief Generate the communication and computaion matrix for the msg
* parallel task profile. It also set the prefix name of the task.
* @param[out] task_name_prefix the prefix to add to the task name
* @param[out] computation_amount the computation matrix to be simulated by the msg task
* @param[out] communication_amount the communication matrix to be simulated by the msg task
* @param[in] nb_res the number of resources the task have to run on
* @param[in] profile_data the profile data
*/
void generate_msg_parallel_task(
string task_name_prefix,
string& task_name_prefix,
double*& computation_amount,
double*& communication_amount,
unsigned int nb_res,
......@@ -33,8 +41,18 @@ void generate_msg_parallel_task(
memcpy(communication_amount, data->com, sizeof(double) * nb_res * nb_res);
}
/**
* @brief Generate the communication and computaion matrix for the msg
* parallel homogeneous task profile. It also set the prefix name of the
* task.
* @param[out] task_name_prefix the prefix to add to the task name
* @param[out] computation_amount the computation matrix to be simulated by the msg task
* @param[out] communication_amount the communication matrix to be simulated by the msg task
* @param[in] nb_res the number of resources the task have to run on
* @param[in] profile_data the profile data
*/
void generate_msg_parallel_homogeneous(
string task_name_prefix,
string& task_name_prefix,
double*& computation_amount,
double*& communication_amount,
unsigned int nb_res,
......@@ -71,8 +89,21 @@ void generate_msg_parallel_homogeneous(
}
}
/**
* @brief Generate the communication and computaion matrix for the msg
* parallel homogeneous task profile with pfs.
* @details Note that the number of resource is also altered because of
* the pfs node that is addded. It also set the prefix name of the task.
* @param[out] task_name_prefix the prefix to add to the task name
* @param[out] computation_amount the computation matrix to be simulated by the msg task
* @param[out] communication_amount the communication matrix to be simulated by the msg task
* @param[in,out] nb_res the number of resources the task have to run on
* @param[in] profile_data the profile data
* @param[in,out] hosts_to_use the list of host to be used by the task
* @param[in] context the batsim context
*/
void generate_msg_parallel_homogeneous_with_pfs(
string task_name_prefix,
string& task_name_prefix,
double*& computation_amount,
double*& communication_amount,
unsigned int& nb_res,
......@@ -147,6 +178,20 @@ void generate_msg_parallel_homogeneous_with_pfs(
}
}
/**
* @brief Generate the communication and computaion matrix for the msg
* data staging task profile.
* @details Note that the number of resource is also altered because only
* the pfs and the hpst are involved in the transfer. It also set the prefix
* name of the task.
* @param[out] task_name_prefix the prefix to add to the task name
* @param[out] computation_amount the computation matrix to be simulated by the msg task
* @param[out] communication_amount the communication matrix to be simulated by the msg task
* @param[in,out] nb_res the number of resources the task have to run on
* @param[in] profile_data the profile data
* @param[in,out] hosts_to_use the list of host to be used by the task
* @param[in] context the batsim context
*/
void generate_msg_data_staginig_task(string task_name_prefix,
double*& computation_amount,
double*& communication_amount,
......
......@@ -6,6 +6,13 @@
/**
* @brief Execute tasks from profiles that use MSG simgrid model
* @param[in,out] btask task to be filled and compute
* @param[in] allocation the host to execute the task to
* @param[in] nb_res the number of resources the task have to run on
* @param[in,out] remaining_time remaining time of the current task
* @param[in] context usefull information about Batsim context
* @param[in,out] cleanup_data The data to clean on bad process termination (kill)
* @return the task profile return code of -1 it the task timeout
*/
int execute_msg_task(
BatTask * btask,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment