Commit 7155acf2 authored by MERCIER Michael

debug kill progress + test + job state walltime reached

Also add more tests on sequential jobs
parent ff912a0d
......@@ -295,6 +295,13 @@ add_test(kill_multiple
-bod /tmp/batsim_tests/kill_multiple
-bwd ${CMAKE_SOURCE_DIR})
add_test(kill_progress
${CMAKE_SOURCE_DIR}/tools/experiments/execute_instances.py
${CMAKE_SOURCE_DIR}/test/test_kill_progress.yaml
-bod /tmp/batsim_tests/kill_progress
-bwd ${CMAKE_SOURCE_DIR})
add_test(pybatsim_tests
${CMAKE_SOURCE_DIR}/tools/experiments/execute_instances.py
${CMAKE_SOURCE_DIR}/test/pybatsim_tests.yaml
......
......@@ -704,7 +704,7 @@ void export_jobs_to_csv(const std::string &filename, const BatsimContext *contex
{
Job * job = mit.second;
if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY || job->state == JobState::JOB_STATE_COMPLETED_KILLED)
if (job->is_complete())
{
char * buf = nullptr;
int success = (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY);
......@@ -787,8 +787,7 @@ void export_schedule_to_csv(const std::string &filename, const BatsimContext *co
Job * job = mit.second;
nb_jobs++;
if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY ||
job->state == JobState::JOB_STATE_COMPLETED_KILLED)
if (job->is_complete())
{
nb_jobs_finished++;
......
......@@ -37,40 +37,56 @@ BatTask::BatTask(Job * parent_job, Profile * profile) {
this->profile = profile;
}
BatTask::~BatTask() {
for (auto &sub_btask : this->sub_tasks) {
delete sub_btask;
sub_btask = nullptr;
}
}
void BatTask::compute_leaf_progress()
{
xbt_assert(this->sub_tasks.empty(), "A leaf task should not contain sub tasks");
double remaining_flops = MSG_task_get_flops_amount(this->ptask);
double remaining_comm = MSG_task_get_remaining_communication(this->ptask);
double comput_ratio = 0;
if (this->computation_amount > 0)
{
comput_ratio = remaining_flops / this->computation_amount;
// MSG task
if (this->ptask != nullptr) {
// WARNING: this does not return the flops amount but the quantity of work
// remaining, from 1 (not started) to 0 (done)
this->current_task_progress_ratio = 1 - MSG_task_get_flops_amount(this->ptask);
}
double comm_ratio = 0;
if (this->communication_amount > 0)
// delay task
else if (this->delay_task_start != -1)
{
comm_ratio = remaining_comm / this->communication_amount;
double runtime = MSG_get_clock() - this->delay_task_start;
// manage empty delay job (why?!)
if (this->delay_task_required == 0)
{
this->current_task_progress_ratio = 1;
}
else
{
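// e.g., a 20 s delay task killed 5 s after its start reports a progress of 0.25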
this->current_task_progress_ratio = runtime / this->delay_task_required;
}
}
// Not implemented
else {
XBT_WARN("computing the progress of this type of task is not implemented");
}
this->current_task_progress_ratio = 1 - std::max(comm_ratio, comput_ratio);
}
void BatTask::compute_tasks_progress()
{
if (this->ptask != nullptr)
if (this->sub_tasks.empty())
{
this->compute_leaf_progress();
}
else
{
for (auto& task : sub_tasks)
{
task->compute_tasks_progress();
}
// compute only for current sequential task
sub_tasks[current_task_index]->compute_tasks_progress();
}
}
......@@ -79,11 +95,14 @@ void BatTask::compute_tasks_progress()
*/
BatTask* Job::compute_job_progress()
{
this->task->compute_tasks_progress();
if (this->task != nullptr) {
this->task->compute_tasks_progress();
}
return this->task;
}
Jobs::Jobs()
{
......@@ -229,6 +248,19 @@ Job::~Job()
xbt_assert(execution_processes.size() == 0,
"Internal error: job %s has %d execution processes on destruction (should be 0).",
this->id.to_string().c_str(), (int)execution_processes.size());
if (this->task != nullptr)
{
delete this->task;
this->task = nullptr;
}
}
bool Job::is_complete()
{
return (this->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY
|| this->state == JobState::JOB_STATE_COMPLETED_KILLED
|| this->state == JobState::JOB_STATE_COMPLETED_FAILED
|| this->state == JobState::JOB_STATE_COMPLETED_WALLTIME_REACHED);
}
// Do NOT remove namespaces in the arguments (to avoid doxygen warnings)
......@@ -407,6 +439,9 @@ string job_state_to_string(const JobState & state)
case JobState::JOB_STATE_COMPLETED_FAILED:
job_state = "COMPLETED_FAILED";
break;
case JobState::JOB_STATE_COMPLETED_WALLTIME_REACHED:
job_state = "COMPLETED_WALLTIME_REACHED";
break;
case JobState::JOB_STATE_COMPLETED_KILLED:
job_state = "COMPLETED_KILLED";
break;
......@@ -445,6 +480,10 @@ JobState job_state_from_string(const std::string & state)
{
new_state = JobState::JOB_STATE_COMPLETED_KILLED;
}
else if (state == "COMPLETED_WALLTIME_REACHED")
{
new_state = JobState::JOB_STATE_COMPLETED_WALLTIME_REACHED;
}
else if (state == "REJECTED")
{
new_state = JobState::JOB_STATE_REJECTED;
......
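The kill path relies on this tree: compute_job_progress() fills current_task_progress_ratio on the leaf that is actually running. As a rough sketch (not part of the commit, helper name made up), a consumer could extract that leaf's progress from the returned tree like this, using only the BatTask fields shown in this diff:

double current_leaf_progress(const BatTask * btask)
{
    // A leaf directly carries the ratio filled in by compute_leaf_progress()
    if (btask->sub_tasks.empty())
        return btask->current_task_progress_ratio;

    // For a sequential profile, only the sub task at current_task_index is running
    return current_leaf_progress(btask->sub_tasks[btask->current_task_index]);
}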
......@@ -65,23 +65,29 @@ enum class JobState
,JOB_STATE_RUNNING //!< The job has been scheduled and is currently being processed.
,JOB_STATE_COMPLETED_SUCCESSFULLY //!< The job execution finished before its walltime successfully.
,JOB_STATE_COMPLETED_FAILED //!< The job execution finished before its walltime but the job failed.
,JOB_STATE_COMPLETED_WALLTIME_REACHED //!< The job has reached its walltime and has been killed.
,JOB_STATE_COMPLETED_KILLED //!< The job has been killed.
,JOB_STATE_REJECTED //!< The job has been rejected by the scheduler.
};
/**
* @brief Internal batsim simulation taski: the job profile instanciation.
* @brief Internal batsim simulation task: the job profile instantiation.
*/
struct BatTask
{
BatTask(Job * parent_job, Profile * profile);
~BatTask();
Job * parent_job;
Profile * profile; //!< The job profile. The corresponding profile tells how the job should be computed
// Manage MSG profiles
msg_task_t ptask = nullptr; //!< The final task to execute (only set for the leaf of the BatTask tree)
double computation_amount = 0;
double communication_amount = 0;
// manage Delay profile
double delay_task_start = -1; //!< Keep delay task starting time in order to compute progress afterwards
double delay_task_required = -1; //!< Keep delay task time requirement
// manage sequential profile
vector<BatTask*> sub_tasks; //!< List of sub-tasks of this task, to be executed sequentially or in parallel depending on the profile
unsigned int current_task_index = -1; //!< Index in the sub_tasks vector of the current task
double current_task_progress_ratio = 0; //!< Progress of the current task, from 0 to 1
......@@ -125,7 +131,7 @@ struct Job
JobState state; //!< The current state of the job
int return_code = -1; //!< The return code of the job
BatTask * task; //!< The task to be executed by this job
BatTask * task = nullptr; //!< The task to be executed by this job
BatTask * compute_job_progress(); //!< Computes the progress of the job's task tree and returns its root
......@@ -152,6 +158,11 @@ struct Job
static Job * from_json(const std::string & json_str,
Workload * workload,
const std::string & error_prefix = "Invalid JSON job");
/**
* @brief Returns true if the job has completed (successfully or not),
* false otherwise
*/
bool is_complete();
};
......
......@@ -181,6 +181,9 @@ int execute_task(BatTask * btask,
{
DelayProfileData * data = (DelayProfileData *) profile->data;
btask->delay_task_start = MSG_get_clock();
btask->delay_task_required = data->delay;
if (data->delay < *remaining_time)
{
XBT_INFO("Sleeping the whole task length");
......@@ -360,6 +363,7 @@ int execute_job_process(int argc, char *argv[])
// Create root task
BatTask * root_task = new BatTask(job, workload->profiles->at(job->profile));
job->task = root_task;
// Execute the process
job->return_code = execute_task(root_task, args->context, args->allocation, cleanup_data, &remaining_time);
if (job->return_code == 0)
......@@ -376,7 +380,7 @@ int execute_job_process(int argc, char *argv[])
{
XBT_INFO("Job %s had been killed (walltime %g reached)",
job->id.to_string().c_str(), (double) job->walltime);
job->state = JobState::JOB_STATE_COMPLETED_KILLED;
job->state = JobState::JOB_STATE_COMPLETED_WALLTIME_REACHED;
job->kill_reason = "Walltime reached";
if (args->context->trace_schedule)
{
......@@ -384,8 +388,6 @@ int execute_job_process(int argc, char *argv[])
MSG_get_clock(), true);
}
}
// cleanup task
delete root_task;
args->context->machines.update_machines_on_job_end(job, args->allocation->machine_ids,
args->context);
......@@ -472,17 +474,27 @@ int killer_process(int argc, char *argv[])
Profile * profile = args->context->workloads.at(job_id.workload_name)->profiles->at(job->profile);
(void) profile;
xbt_assert(job->state == JobState::JOB_STATE_RUNNING ||
job->state == JobState::JOB_STATE_COMPLETED_KILLED ||
job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY ||
job->state == JobState::JOB_STATE_COMPLETED_FAILED,
"Bad kill: job %s is not running", job->id.to_string().c_str());
// Store job progress in the message
message->jobs_progress[job_id] = job->compute_job_progress();
xbt_assert(! (job->state == JobState::JOB_STATE_REJECTED ||
job->state == JobState::JOB_STATE_SUBMITTED ||
job->state == JobState::JOB_STATE_NOT_SUBMITTED),
"Bad kill: job %s has not been started", job->id.to_string().c_str());
if (job->state == JobState::JOB_STATE_RUNNING)
{
BatTask * job_progress = job->compute_job_progress();
// consistency checks
if (profile->type == ProfileType::MSG_PARALLEL ||
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS ||
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS ||
profile->type == ProfileType::MSG_DATA_STAGING
)
{
xbt_assert(job_progress != nullptr, "MSG profiles should contain job progress");
}
// Store job progress in the message
message->jobs_progress[job_id] = job_progress;
// Let's kill all the involved processes
xbt_assert(job->execution_processes.size() > 0);
for (msg_process_t process : job->execution_processes)
......
......@@ -247,11 +247,14 @@ void JsonProtocolWriter::append_job_completed(const string & job_id,
_events.PushBack(event, _alloc);
}
Value generate_task_tree(BatTask * task_tree, rapidjson::Document::AllocatorType & _alloc)
/**
* @brief Generates the JSON task tree with progress, using _alloc for allocation
*/
Value generate_task_tree(BatTask* task_tree, rapidjson::Document::AllocatorType & _alloc)
{
Value task(rapidjson::kObjectType);
// create task tree
if (task_tree->ptask != nullptr)
// add final task (leaf) progress
if (task_tree->ptask != nullptr || task_tree->delay_task_start != -1)
{
task.AddMember("profile", Value().SetString(task_tree->profile->name.c_str(), _alloc), _alloc);
task.AddMember("progress", Value().SetDouble(task_tree->current_task_progress_ratio), _alloc);
......@@ -330,11 +333,16 @@ void JsonProtocolWriter::append_job_killed(const vector<string> & job_ids,
{
jobs.PushBack(Value().SetString(job_id.c_str(), _alloc), _alloc);
// compute task progress tree
progress.AddMember(Value().SetString(job_id.c_str(), _alloc),
generate_task_tree(job_progress.at(job_id), _alloc), _alloc);
if (job_progress.at(job_id) != nullptr) {
progress.AddMember(Value().SetString(job_id.c_str(), _alloc),
generate_task_tree(job_progress.at(job_id), _alloc), _alloc);
}
}
event.AddMember("data", Value().SetObject().AddMember("job_ids", jobs, _alloc), _alloc);
Value data(rapidjson::kObjectType);
data.AddMember("job_ids", jobs, _alloc);
data.AddMember("job_progress", progress, _alloc);
event.AddMember("data", data, _alloc);
_events.PushBack(event, _alloc);
}
......
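After this change, a JOB_KILLED event carries a job_progress object alongside job_ids. The sketch below is not part of the commit: it shows how a client might read that field with rapidjson. The sample message and the job id format are illustrative, and only the leaf case ("profile" + "progress", as produced by generate_task_tree above) is shown; jobs that were already complete when the kill arrived have no entry, hence the HasMember check.

#include <rapidjson/document.h>
#include <cstdio>

int main()
{
    // Illustrative JOB_KILLED event (field names follow append_job_killed above)
    const char * sample = R"({
        "type": "JOB_KILLED",
        "data": {
            "job_ids": ["w0!1"],
            "job_progress": {"w0!1": {"profile": "delay10", "progress": 0.42}}
        }
    })";

    rapidjson::Document doc;
    doc.Parse(sample);

    const rapidjson::Value & data = doc["data"];
    for (const auto & job : data["job_ids"].GetArray())
    {
        const char * job_id = job.GetString();
        // Already-completed jobs are skipped by the nullptr check in append_job_killed
        if (data["job_progress"].HasMember(job_id))
            printf("%s: %g\n", job_id, data["job_progress"][job_id]["progress"].GetDouble());
    }
    return 0;
}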
......@@ -215,7 +215,7 @@ void server_on_job_completed(ServerData * data,
{
status = "FAILED";
}
else if (job->state == JobState::JOB_STATE_COMPLETED_KILLED && job->kill_reason == "Walltime reached")
else if (job->state == JobState::JOB_STATE_COMPLETED_WALLTIME_REACHED)
{
status = "TIMEOUT";
}
......@@ -486,9 +486,11 @@ void server_on_killing_done(ServerData * data,
{
job_ids_str.push_back(job_id.to_string());
// store job progress from BatTask tree in str
jobs_progress_str[job_id.to_string()] = message->jobs_progress[job_id];
const Job * job = data->context->workloads.job_at(job_id);
if (job->state == JobState::JOB_STATE_COMPLETED_KILLED &&
job->kill_reason == "Killed from killer_process (probably requested by the decision process)")
if (job->state == JobState::JOB_STATE_COMPLETED_KILLED)
{
data->nb_running_jobs--;
xbt_assert(data->nb_running_jobs >= 0);
......@@ -496,9 +498,8 @@ void server_on_killing_done(ServerData * data,
xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
really_killed_job_ids_str.push_back(job_id.to_string());
}
// compute job progress from BatTask tree in str
jobs_progress_str[job_id.to_string()] = message->jobs_progress[job_id];
}
XBT_INFO("Jobs {%s} have been killed (the following ones have REALLY been killed: {%s})",
......@@ -665,6 +666,7 @@ void server_on_change_job_state(ServerData * data,
{
case JobState::JOB_STATE_COMPLETED_SUCCESSFULLY:
case JobState::JOB_STATE_COMPLETED_FAILED:
case JobState::JOB_STATE_COMPLETED_WALLTIME_REACHED:
case JobState::JOB_STATE_COMPLETED_KILLED:
job->runtime = MSG_get_clock() - job->starting_time;
data->nb_running_jobs--;
......@@ -762,9 +764,7 @@ void server_on_kill_jobs(ServerData * data,
if (!job->kill_requested)
{
// Let's check the job state
xbt_assert(job->state == JobState::JOB_STATE_RUNNING ||
job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY ||
job->state == JobState::JOB_STATE_COMPLETED_KILLED,
xbt_assert(job->state == JobState::JOB_STATE_RUNNING || job->is_complete(),
"Invalid KILL_JOB: job_id '%s' refers to a job not being executed nor completed.",
job_id.to_string().c_str());
......
......@@ -19,7 +19,7 @@ void generate_msg_parallel_task(
string task_name_prefix,
double*& computation_amount,
double*& communication_amount,
int nb_res,
unsigned int nb_res,
void* profile_data)
{
task_name_prefix = "p ";
......@@ -37,7 +37,7 @@ void generate_msg_parallel_homogeneous(
string task_name_prefix,
double*& computation_amount,
double*& communication_amount,
int nb_res,
unsigned int nb_res,
void* profile_data)
{
task_name_prefix = "phg ";
......@@ -56,10 +56,10 @@ void generate_msg_parallel_homogeneous(
// Let us fill the local computation and communication matrices
int k = 0;
for (int y = 0; y < nb_res; ++y) {
for (unsigned int y = 0; y < nb_res; ++y) {
computation_amount[y] = cpu;
if (communication_amount != nullptr) {
for (int x = 0; x < nb_res; ++x) {
for (unsigned int x = 0; x < nb_res; ++x) {
if (x == y) {
communication_amount[k] = 0;
} else {
......@@ -75,10 +75,10 @@ void generate_msg_parallel_homogeneous_with_pfs(
string task_name_prefix,
double*& computation_amount,
double*& communication_amount,
int& nb_res,
unsigned int& nb_res,
void* profile_data,
BatsimContext* context,
std::vector<msg_host_t> hosts_to_use)
std::vector<msg_host_t>& hosts_to_use)
{
task_name_prefix = "pfs_tiers ";
MsgParallelHomogeneousPFSMultipleTiersProfileData* data
......@@ -89,7 +89,7 @@ void generate_msg_parallel_homogeneous_with_pfs(
// The PFS machine will also be used
nb_res = nb_res + 1;
int pfs_id = nb_res - 1;
unsigned int pfs_id = nb_res - 1;
// Add the pfs_machine
switch (data->host) {
......@@ -112,10 +112,10 @@ void generate_msg_parallel_homogeneous_with_pfs(
// Let us fill the local computation and communication matrices
int k = 0;
for (int y = 0; y < nb_res; ++y) {
for (unsigned int y = 0; y < nb_res; ++y) {
computation_amount[y] = cpu;
if (communication_amount != nullptr) {
for (int x = 0; x < nb_res; ++x) {
for (unsigned int x = 0; x < nb_res; ++x) {
switch (data->direction) {
case MsgParallelHomogeneousPFSMultipleTiersProfileData::
Direction::TO_STORAGE:
......@@ -150,10 +150,10 @@ void generate_msg_parallel_homogeneous_with_pfs(
void generate_msg_data_staginig_task(string task_name_prefix,
double*& computation_amount,
double*& communication_amount,
int& nb_res,
unsigned int& nb_res,
void* profile_data,
BatsimContext* context,
std::vector<msg_host_t> hosts_to_use)
std::vector<msg_host_t>& hosts_to_use)
{
task_name_prefix = "data_staging ";
MsgDataStagingProfileData* data = (MsgDataStagingProfileData*)profile_data;
......@@ -163,7 +163,7 @@ void generate_msg_data_staginig_task(string task_name_prefix,
// The PFS machine will also be used
nb_res = 2;
int pfs_id = nb_res - 1;
unsigned int pfs_id = nb_res - 1;
// reset the alloc to use only IO nodes
hosts_to_use = std::vector<msg_host_t>();
......@@ -191,10 +191,10 @@ void generate_msg_data_staginig_task(string task_name_prefix,
// Let us fill the local computation and communication matrices
int k = 0;
for (int y = 0; y < nb_res; ++y) {
for (unsigned int y = 0; y < nb_res; ++y) {
computation_amount[y] = cpu;
if (communication_amount != nullptr) {
for (int x = 0; x < nb_res; ++x) {
for (unsigned int x = 0; x < nb_res; ++x) {
// Communications are done towards the last resource
if (x != pfs_id) {
communication_amount[k] = 0;
......@@ -210,7 +210,7 @@ void generate_msg_data_staginig_task(string task_name_prefix,
int execute_msg_task(
BatTask * btask,
const SchedulingAllocation* allocation,
int nb_res,
unsigned int nb_res,
double * remaining_time,
BatsimContext * context,
CleanExecuteProfileData * cleanup_data)
......@@ -262,8 +262,11 @@ int execute_msg_task(
// Create the MSG task
string task_name
= task_name_prefix + to_string(btask->parent_job->number) + "'" + btask->profile->name + "'";
XBT_INFO("Creating MSG task '%s'", task_name.c_str());
xbt_assert(nb_res == ((unsigned int)hosts_to_use.size()),
"the number of resources (%d) is not equal to the number of hosts (%lu)",
nb_res, hosts_to_use.size());
XBT_INFO("Creating MSG task '%s' on %d resources", task_name.c_str(), nb_res);
msg_task_t ptask = MSG_parallel_task_create(task_name.c_str(), nb_res,
hosts_to_use.data(), computation_amount, communication_amount, NULL);
......@@ -272,18 +275,6 @@ int execute_msg_task(
// Keep track of the task to get information on kill
btask->ptask = ptask;
if (computation_amount != nullptr) {
for (int i=0; i < nb_res; i++) {
btask->computation_amount += computation_amount[i];
}
}
if (communication_amount != nullptr) {
for (int i=0; i < nb_res; i++) {
for (int j=0; j < nb_res; j++) {
btask->communication_amount += communication_amount[(i * nb_res) + j];
}
}
}
// Execute the MSG task (blocking)
double time_before_execute = MSG_get_clock();
......
......@@ -10,7 +10,7 @@
int execute_msg_task(
BatTask * btask,
const SchedulingAllocation* allocation,
int nb_res,
unsigned int nb_res,
double * remaining_time,
BatsimContext * context,
CleanExecuteProfileData * cleanup_data);
base_output_directory: /tmp/batsim_tests/kill_progress
base_variables:
batsim_dir: ${base_working_directory}
implicit_instances:
implicit:
sweep:
platform :
- {"name":"small", "filename":"${batsim_dir}/platforms/small_platform.xml", "master_host":"master_host"}
workload :
- {"name":"tiny_compute", "filename": "${batsim_dir}/workload_profiles/one_computation_job.json"}
- {"name":"tiny_delay", "filename": "${batsim_dir}/workload_profiles/one_delay_job.json"}
- {"name":"test_all", "filename": "${batsim_dir}/workload_profiles/test_workload_profile.json"}
algo:
- {"name":"killer", "sched_name":"killer"}
delay_before_kill: [0,5]
nb_kills_per_job: [1]
generic_instance:
timeout: 10
working_directory: ${base_working_directory}
output_directory: ${base_output_directory}/results/${instance_id}
batsim_command: batsim -p ${platform[filename]} -w ${workload[filename]} -e ${output_directory}/out -m ${platform[master_host]} -vnetwork-only
sched_command: batsched -v ${algo[sched_name]} --variant_options_filepath ${output_directory}/sched_input.json
commands_before_execution:
# Generate sched input
- |
#!/usr/bin/env bash
cat > ${output_directory}/sched_input.json << EOF
{
"nb_kills_per_job": ${nb_kills_per_job},
"delay_before_kill": ${delay_before_kill}
}
EOF
commands_after_execution:
- |
cat ${output_directory}/batsim.stderr | \grep -o "Sending '.*'" | \cut -d "'" -f2 | \grep 'JOB_KILLED' > ${output_directory}/messages.txt
# Let's check that job progress is present for all killed jobs
- |
#!/usr/bin/env bash
source ${output_directory}/variables.bash
cat > ${output_directory}/jobs_analysis.py <<EOF
#!/usr/bin/env python3
import json
import pandas as pd
import re
import sys
# workload = json.load(open('${workload[filename]}'))
msg_file = open('${output_directory}/messages.txt')
msg_str = msg_file.read()
msgs = msg_str.strip().split('\n')
errors = []
for msg in msgs:
json_msg = json.loads(msg)
for event in json_msg['events']:
if event['type'] == 'JOB_KILLED':
job_data = event['data']
if 'job_progress' not in job_data:
errors.append("{} does not contain job progress\n".format(job_data))
#TODO test whether this job is msg, delay or composed
#and check the progress content type accordingly
if errors:
print(''.join(errors))
sys.exit(1)
else:
sys.exit(0)
EOF
- chmod +x ${output_directory}/jobs_analysis.py
- ${output_directory}/jobs_analysis.py
commands_before_instances:
- ${batsim_dir}/test/is_batsim_dir.py ${base_working_directory}
- ${batsim_dir}/test/clean_output_dir.py ${base_output_directory}
......@@ -55,14 +55,14 @@ implicit_instances:
for _,job in jobs.iterrows():
jid, jsuc, jrt = job['job_id'], job['success'], job['execution_time']
if jid in {1,2,3,4}:
if jid in {1,2,3,4,5}:
if jsuc != 1:
print('Job {} should be successful...'.format(jid))
exit_status = 1
if not isclose(jrt, 30, abs_tol=epsilon):
print('Job {} should take {} s (took {})'.format(jid, 30, jrt))
exit_status = 1
elif jid in {11,12,13,14}:
elif jid in {11,12,13,14,15}:
if jsuc != 0:
print('Job {} should NOT be successful...'.format(jid))
exit_status = 1
......@@ -73,8 +73,8 @@ implicit_instances:
print('Job {} is unknown!'.format(jid))
exit_status = 1