Commit 2f77cc68 authored by MERCIER Michael's avatar MERCIER Michael
Browse files

Refactor + add BatTask + killing progress

parent 5b76ee45
......@@ -6,6 +6,7 @@
#pragma once
#include <vector>
#include <map>
#include <string>
#include <rapidjson/document.h>
......@@ -16,6 +17,7 @@
struct BatsimContext;
struct Job;
struct BatTask;
/**
* @brief A simple structure used to identify one job
......@@ -230,6 +232,7 @@ struct SwitchMessage
struct KillingDoneMessage
{
std::vector<JobIdentifier> jobs_ids; //!< The IDs of the jobs which have been killed
std::map<JobIdentifier, BatTask *> jobs_progress; //!< Jobs list (recursive) amount of work done when killed
};
/**
......
......@@ -28,6 +28,73 @@ using namespace rapidjson;
XBT_LOG_NEW_DEFAULT_CATEGORY(jobs, "jobs"); //!< Logging
/**
* @brief Constructor for simple (leaf) task
*/
BatTask::BatTask(string profile_name, msg_task_t ptask, double computation_amount, double communication_amount)
{
this->profile_name = profile_name;
this->ptask = ptask;
this->computation_amount = computation_amount;
this->communication_amount = communication_amount;
}
/**
* @brief Constructor for composed task (with sub_tasks)
*/
BatTask::BatTask(string profile_name, std::vector<BatTask*> sub_tasks)
{
this->profile_name = profile_name;
this->sub_tasks = sub_tasks;
}
void BatTask::compute_leaf_progress()
{
xbt_assert(this->sub_tasks.empty(), "Leaf should not contains sub tasks");
double remaining_flops = MSG_task_get_flops_amount(this->ptask);
double remaining_comm = MSG_task_get_remaining_communication(this->ptask);
double comput_ratio = 0;
if (this->computation_amount > 0)
{
comput_ratio = remaining_flops / this->computation_amount;
}
double comm_ratio = 0;
if (this->communication_amount > 0)
{
comm_ratio = remaining_comm / this->communication_amount;
}
this->current_task_progress_ratio = 1 - std::max(comm_ratio, comput_ratio);
}
BatTask * BatTask::compute_tasks_progress()
{
if (this->ptask != nullptr)
{
this->compute_leaf_progress();
}
else
{
for (auto& task : sub_tasks)
{
task->compute_tasks_progress();
}
}
}
/**
* @brief Return the BatTask tree with progress
*/
BatTask* Job::compute_job_progress()
{
this->task->compute_tasks_progress();
return this->task;
}
Jobs::Jobs()
{
......
......@@ -16,6 +16,8 @@
#include "exact_numbers.hpp"
#include "machine_range.hpp"
using namespace std;
class Profiles;
class Workload;
......@@ -33,6 +35,27 @@ enum class JobState
,JOB_STATE_REJECTED //!< The job has been rejected by the scheduler.
};
/**
* @brief Internal batsim simulation taski: the job profile instanciation.
*/
struct BatTask
{
BatTask(string profile_name, msg_task_t ptask, double computation_amount, double communication_amount);
BatTask(string profile_name, std::vector<BatTask*> sub_tasks);
std::string profile_name; //!< The job profile name. The corresponding profile tells how the job should be computed
msg_task_t ptask = nullptr; //!< The final task to execute (only set for the leaf of the BatTask tree)
double computation_amount = 0;
double communication_amount = 0;
std::vector<BatTask*> sub_tasks; //!< List of sub task of this task to be executed sequentially or in parallel depending on the profile
unsigned int current_task_index = -1; //!< index in the sub_tasks vector of the current task
double current_task_progress_ratio = 0; //!< give the progress of the current task from 0 to 1
BatTask * compute_tasks_progress();
private:
void compute_leaf_progress();
};
/**
* @brief Represents a job
*/
......@@ -66,6 +89,10 @@ struct Job
JobState state; //!< The current state of the job
int return_code = -1; //!< The return code of the job
BatTask * task; //!< The task to be executed by this job
BatTask * compute_job_progress(); //<! return the task progression of the task tree
/**
* @brief Creates a new-allocated Job from a JSON description
* @param[in] json_desc The JSON description of the job
......
......@@ -6,10 +6,10 @@
#include "jobs_execution.hpp"
#include "jobs.hpp"
#include "task_execution.hpp"
#include <simgrid/plugins/energy.h>
#include <simgrid/msg.h>
#include <smpi/smpi.h>
XBT_LOG_NEW_DEFAULT_CATEGORY(jobs_execution, "jobs_execution"); //!< Logging
......@@ -39,11 +39,11 @@ int smpi_replay_process(int argc, char *argv[])
return 0;
}
int execute_profile(BatsimContext *context,
const std::string & profile_name,
const SchedulingAllocation * allocation,
CleanExecuteProfileData * cleanup_data,
double * remaining_time)
int execute_task(BatsimContext *context,
const std::string & profile_name,
const SchedulingAllocation * allocation,
CleanExecuteProfileData * cleanup_data,
double * remaining_time)
{
Workload * workload = context->workloads.at(allocation->job_id.workload_name);
Job * job = workload->jobs->at(allocation->job_id.job_number);
......@@ -56,229 +56,12 @@ int execute_profile(BatsimContext *context,
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS ||
profile->type == ProfileType::MSG_DATA_STAGING)
{
double * computation_amount = nullptr;
double * communication_amount = nullptr;
string task_name_prefix;
std::vector<msg_host_t> hosts_to_use = allocation->hosts;
if (profile->type == ProfileType::MSG_PARALLEL)
{
task_name_prefix = "p ";
MsgParallelProfileData * data = (MsgParallelProfileData *)profile->data;
// These amounts are deallocated by SG
computation_amount = xbt_new(double, nb_res);
communication_amount = xbt_new(double, nb_res*nb_res);
// Let us retrieve the matrices from the profile
memcpy(computation_amount, data->cpu, sizeof(double) * nb_res);
memcpy(communication_amount, data->com, sizeof(double) * nb_res * nb_res);
}
else if (profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS)
{
task_name_prefix = "phg ";
MsgParallelHomogeneousProfileData * data = (MsgParallelHomogeneousProfileData *)profile->data;
double cpu = data->cpu;
double com = data->com;
// These amounts are deallocated by SG
computation_amount = xbt_new(double, nb_res);
communication_amount = nullptr;
if(com > 0)
{
communication_amount = xbt_new(double, nb_res * nb_res);
}
// Let us fill the local computation and communication matrices
int k = 0;
for (int y = 0; y < nb_res; ++y)
{
computation_amount[y] = cpu;
if(communication_amount != nullptr)
{
for (int x = 0; x < nb_res; ++x)
{
if (x == y)
{
communication_amount[k++] = 0;
}
else
{
communication_amount[k++] = com;
}
}
}
}
}
else if (profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS)
int return_code = execute_msg_task(allocation, nb_res, job, remaining_time, profile, context, cleanup_data);
if (return_code != 0)
{
task_name_prefix = "pfs_tiers ";
MsgParallelHomogeneousPFSMultipleTiersProfileData * data = (MsgParallelHomogeneousPFSMultipleTiersProfileData *)profile->data;
double cpu = 0;
double size = data->size;
// The PFS machine will also be used
nb_res = nb_res + 1;
int pfs_id = nb_res - 1;
// Add the pfs_machine
switch(data->host)
{
case MsgParallelHomogeneousPFSMultipleTiersProfileData::Host::HPST:
hosts_to_use.push_back(context->machines.hpst_machine()->host);
break;
case MsgParallelHomogeneousPFSMultipleTiersProfileData::Host::LCST:
hosts_to_use.push_back(context->machines.pfs_machine()->host);
break;
default:
xbt_die("Should not be reached");
}
// These amounts are deallocated by SG
computation_amount = xbt_new(double, nb_res);
communication_amount = nullptr;
if(size > 0)
{
communication_amount = xbt_new(double, nb_res*nb_res);
}
// Let us fill the local computation and communication matrices
int k = 0;
for (int y = 0; y < nb_res; ++y)
{
computation_amount[y] = cpu;
if(communication_amount != nullptr)
{
for (int x = 0; x < nb_res; ++x)
{
switch(data->direction)
{
case MsgParallelHomogeneousPFSMultipleTiersProfileData::Direction::TO_STORAGE:
// Communications are done towards the PFS host, which is the last resource (to the storage)
if (x != pfs_id)
{
communication_amount[k++] = 0;
}
else
{
communication_amount[k++] = size;
}
break;
case MsgParallelHomogeneousPFSMultipleTiersProfileData::Direction::FROM_STORAGE:
// Communications are done towards the job allocation (from the storage)
if (x != pfs_id)
{
communication_amount[k++] = size;
}
else
{
communication_amount[k++] = 0;
}
break;
default:
xbt_die("Should not be reached");
}
}
}
}
return return_code;
}
else if (profile->type == ProfileType::MSG_DATA_STAGING)
{
task_name_prefix = "data_staging ";
MsgDataStagingProfileData * data = (MsgDataStagingProfileData *)profile->data;
double cpu = 0;
double size = data->size;
// The PFS machine will also be used
nb_res = 2;
int pfs_id = nb_res - 1;
hosts_to_use = std::vector<msg_host_t>();
// Add the pfs_machine
switch(data->direction)
{
case MsgDataStagingProfileData::Direction::LCST_TO_HPST:
hosts_to_use.push_back(context->machines.pfs_machine()->host);
hosts_to_use.push_back(context->machines.hpst_machine()->host);
break;
case MsgDataStagingProfileData::Direction::HPST_TO_LCST:
hosts_to_use.push_back(context->machines.hpst_machine()->host);
hosts_to_use.push_back(context->machines.pfs_machine()->host);
break;
default:
xbt_die("Should not be reached");
}
// These amounts are deallocated by SG
computation_amount = xbt_new(double, nb_res);
communication_amount = nullptr;
if(size > 0)
{
communication_amount = xbt_new(double, nb_res*nb_res);
}
// Let us fill the local computation and communication matrices
int k = 0;
for (int y = 0; y < nb_res; ++y)
{
computation_amount[y] = cpu;
if(communication_amount != nullptr)
{
for (int x = 0; x < nb_res; ++x)
{
// Communications are done towards the last resource
if (x != pfs_id)
{
communication_amount[k++] = 0;
}
else
{
communication_amount[k++] = size;
}
}
}
}
}
string task_name = task_name_prefix + to_string(job->number) + "'" + job->profile + "'";
XBT_INFO("Creating task '%s'", task_name.c_str());
msg_task_t ptask = MSG_parallel_task_create(task_name.c_str(),
nb_res,
hosts_to_use.data(),
computation_amount,
communication_amount, NULL);
// If the process gets killed, the following data may need to be freed
cleanup_data->task = ptask;
double time_before_execute = MSG_get_clock();
XBT_INFO("Executing task '%s'", MSG_task_get_name(ptask));
msg_error_t err = MSG_parallel_task_execute_with_timeout(ptask, *remaining_time);
*remaining_time = *remaining_time - (MSG_get_clock() - time_before_execute);
int ret = profile->return_code;
if (err == MSG_OK) {}
else if (err == MSG_TIMEOUT)
{
ret = -1;
}
else
{
xbt_die("A task execution had been stopped by an unhandled way (err = %d)", err);
}
XBT_INFO("Task '%s' finished", MSG_task_get_name(ptask));
MSG_task_destroy(ptask);
// The task has been executed, the data does need to be freed in the cleanup function anymore
cleanup_data->task = nullptr;
return ret;
return profile->return_code;
}
else if (profile->type == ProfileType::SEQUENCE)
{
......@@ -288,7 +71,7 @@ int execute_profile(BatsimContext *context,
{
for (unsigned int j = 0; j < data->sequence.size(); j++)
{
int ret_last_profile = execute_profile(context, data->sequence[j], allocation,
int ret_last_profile = execute_task(context, data->sequence[j], allocation,
cleanup_data, remaining_time);
if (ret_last_profile != 0)
{
......@@ -376,8 +159,8 @@ int execute_profile(BatsimContext *context,
if (profile_to_execute != "")
{
XBT_INFO("Execute profile: %s", profile_to_execute.c_str());
int ret_last_profile = execute_profile(context, profile_to_execute, allocation,
XBT_INFO("Instaciate task from profile: %s", profile_to_execute.c_str());
int ret_last_profile = execute_task(context, profile_to_execute, allocation,
cleanup_data, remaining_time);
if (ret_last_profile != 0)
{
......@@ -503,6 +286,39 @@ int delay_job(double sleeptime,
}
}
int execute_task_cleanup(void * unknown, void * data)
{
(void) unknown;
CleanExecuteProfileData * cleanup_data = (CleanExecuteProfileData *) data;
xbt_assert(cleanup_data != nullptr);
XBT_DEBUG("before freeing computation amount %p", cleanup_data->computation_amount);
xbt_free(cleanup_data->computation_amount);
XBT_DEBUG("before freeing communication amount %p", cleanup_data->communication_amount);
xbt_free(cleanup_data->communication_amount);
if (cleanup_data->exec_process_args != nullptr)
{
XBT_DEBUG("before deleting exec_process_args->allocation %p",
cleanup_data->exec_process_args->allocation);
delete cleanup_data->exec_process_args->allocation;
XBT_DEBUG("before deleting exec_process_args %p", cleanup_data->exec_process_args);
delete cleanup_data->exec_process_args;
}
if (cleanup_data->task != nullptr)
{
XBT_WARN("Not cleaning the task data to avoid a SG deadlock :(");
//MSG_task_destroy(cleanup_data->task);
}
XBT_DEBUG("before deleting cleanup_data %p", cleanup_data);
delete cleanup_data;
return 0;
}
int execute_job_process(int argc, char *argv[])
{
(void) argc;
......@@ -529,10 +345,13 @@ int execute_job_process(int argc, char *argv[])
args->context->machines.update_machines_on_job_run(job,
args->allocation->machine_ids,
args->context);
// Add a cleanup hook on the process
CleanExecuteProfileData * cleanup_data = new CleanExecuteProfileData;
cleanup_data->exec_process_args = args;
SIMIX_process_on_exit(MSG_process_self(), execute_profile_cleanup, cleanup_data);
job->return_code = execute_profile(args->context, job->profile, args->allocation, cleanup_data, &remaining_time);
SIMIX_process_on_exit(MSG_process_self(), execute_task_cleanup, cleanup_data);
// Execute the process
job->return_code = execute_task(args->context, job->profile, args->allocation, cleanup_data, &remaining_time);
if (job->return_code == 0)
{
XBT_INFO("Job %s finished in time (success)", job->id.c_str());
......@@ -623,38 +442,7 @@ int waiter_process(int argc, char *argv[])
return 0;
}
int execute_profile_cleanup(void * unknown, void * data)
{
(void) unknown;
CleanExecuteProfileData * cleanup_data = (CleanExecuteProfileData *) data;
xbt_assert(cleanup_data != nullptr);
XBT_DEBUG("before freeing computation amount %p", cleanup_data->computation_amount);
xbt_free(cleanup_data->computation_amount);
XBT_DEBUG("before freeing communication amount %p", cleanup_data->communication_amount);
xbt_free(cleanup_data->communication_amount);
if (cleanup_data->exec_process_args != nullptr)
{
XBT_DEBUG("before deleting exec_process_args->allocation %p",
cleanup_data->exec_process_args->allocation);
delete cleanup_data->exec_process_args->allocation;
XBT_DEBUG("before deleting exec_process_args %p", cleanup_data->exec_process_args);
delete cleanup_data->exec_process_args;
}
if (cleanup_data->task != nullptr)
{
XBT_WARN("Not cleaning the task data to avoid a SG deadlock :(");
//MSG_task_destroy(cleanup_data->task);
}
XBT_DEBUG("before deleting cleanup_data %p", cleanup_data);
delete cleanup_data;
return 0;
}
int killer_process(int argc, char *argv[])
{
......@@ -663,6 +451,9 @@ int killer_process(int argc, char *argv[])
KillerProcessArguments * args = (KillerProcessArguments *) MSG_process_get_data(MSG_process_self());
KillingDoneMessage * message = new KillingDoneMessage;
message->jobs_ids = args->jobs_ids;
for (const JobIdentifier & job_id : args->jobs_ids)
{
Job * job = args->context->workloads.job_at(job_id);
......@@ -675,6 +466,9 @@ int killer_process(int argc, char *argv[])
job->state == JobState::JOB_STATE_COMPLETED_FAILED,
"Bad kill: job %s is not running", job->id.c_str());
// Store job progress in the message
message->jobs_progress[job_id] = job->compute_job_progress();
if (job->state == JobState::JOB_STATE_RUNNING)
{
// Let's kill all the involved processes
......@@ -719,8 +513,6 @@ int killer_process(int argc, char *argv[])
}
}
KillingDoneMessage * message = new KillingDoneMessage;
message->jobs_ids = args->jobs_ids;
send_message("server", IPMessageType::KILLING_DONE, (void*)message);
delete args;
......
/**
* @file task_execution.cpp
* @brief Contains functions related to the execution of the MSG profile tasks
*/
#include <simgrid/msg.h>
#include "jobs.hpp"
#include "profiles.hpp"
#include "ipp.hpp"
#include "context.hpp"
#include "jobs_execution.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(task_execution, "task_execution"); //!< Logging
using namespace std;
void generate_msg_parallel_task(
string task_name_prefix,
double* computation_amount,
double* communication_amount,
int nb_res,
void* profile_data)
{
task_name_prefix = "p ";
MsgParallelProfileData* data = (MsgParallelProfileData*)profile_data;
// These amounts are deallocated by SG
computation_amount = xbt_new(double, nb_res);
communication_amount = xbt_new(double, nb_res* nb_res);
// Let us retrieve the matrices from the profile
memcpy(computation_amount, data->cpu, sizeof(double) * nb_res);
memcpy(communication_amount, data->com, sizeof(double) * nb_res * nb_res);
}
void generate_msg_parallel_homogeneous(
string task_name_prefix,
double* computation_amount,
double* communication_amount,
int nb_res,
void* profile_data)
{
task_name_prefix = "phg ";
MsgParallelHomogeneousProfileData* data
= (MsgParallelHomogeneousProfileData*)profile_data;
double cpu = data->cpu;
double com = data->com;
// These amounts are deallocated by SG
computation_amount = xbt_new(double, nb_res);
communication_amount = nullptr;
if (com > 0) {
communication_amount = xbt_new(double, nb_res* nb_res);
}
// Let us fill the local computation and communication matrices
int k = 0;
for (int y = 0; y < nb_res; ++y) {
computation_amount[y] = cpu;
if (communication_amount != nullptr) {
for (int x = 0; x < nb_res; ++x) {
if (x == y) {
communication_amount[k++] = 0;
} else {
communication_amount[k++] = com;
}
}
}
}
}
void generate_msg_parallel_homogeneous_with_pfs(
string task_name_prefix,
double* computation_amount,
double* communication_amount,
int nb_res,
void* profile_data,
BatsimContext* context,