Commit d0c38c52 authored by Millian Poquet's avatar Millian Poquet
Browse files

[smpi] map rank->host added in jobs + test

parent 87ff7d12
......@@ -224,6 +224,12 @@ add_test(smpi
-bod /tmp/batsim_tests/smpi
-bwd ${CMAKE_SOURCE_DIR})
add_test(smpi_mapping
${CMAKE_SOURCE_DIR}/tools/experiments/execute_instances.py
${CMAKE_SOURCE_DIR}/test/test_smpi_mapping.yaml
-bod /tmp/batsim_tests/smpi_mapping
-bwd ${CMAKE_SOURCE_DIR})
add_test(smpi_batexec
${CMAKE_SOURCE_DIR}/tools/experiments/execute_instances.py
${CMAKE_SOURCE_DIR}/test/test_smpi_batexec.yaml
......
......@@ -237,6 +237,30 @@ Job * Job::from_json(const rapidjson::Value & json_desc, Workload * workload)
"A problem occured when replacing the job_id by its WLOAD!job_number counterpart: "
"The output JSON '%s' has a non-string 'id' field.", j->json_description.c_str());
if (json_desc.HasMember("smpi_ranks_to_hosts_mapping"))
{
xbt_assert(json_desc["smpi_ranks_to_hosts_mapping"].IsArray(),
"Invalid JSON: job %d has a non-array 'smpi_ranks_to_hosts_mapping' field",
j->number);
const auto & mapping_array = json_desc["smpi_ranks_to_hosts_mapping"];
j->smpi_ranks_to_hosts_mapping.resize(mapping_array.Size());
for (unsigned int i = 0; i < mapping_array.Size(); ++i)
{
xbt_assert(mapping_array[i].IsInt(),
"Invalid JSON: job %d has a bad 'smpi_ranks_to_hosts_mapping' field: rank "
"%d does not point to an integral number", j->number, i);
int host_number = mapping_array[i].GetInt();
xbt_assert(host_number >= 0 && host_number < j->required_nb_res,
"Invalid JSON: job %d has a bad 'smpi_ranks_to_hosts_mapping' field: rank "
"%d has an invalid value %d : should be in [0,%d[", j->number, i,
host_number, j->required_nb_res);
j->smpi_ranks_to_hosts_mapping[i] = host_number;
}
}
XBT_DEBUG("Loaded job %d from workload %s", (int) j->number, j->workload->name.c_str() );
return j;
......
......@@ -49,6 +49,7 @@ struct Job
Rational starting_time; //!< The time at which the job starts to be executed.
Rational runtime; //!< The amount of time during which the job has been executed
MachineRange allocation; //!< The machines on which the job has been executed.
std::vector<int> smpi_ranks_to_hosts_mapping; //!< If the job uses a SMPI profile, stores which host number each MPI rank should use. These numbers must be in [0,required_nb_res[.
JobState state; //!< The current state of the job
/**
......
......@@ -174,10 +174,30 @@ int execute_profile(BatsimContext *context,
SmpiProfileData * data = (SmpiProfileData *) profile->data;
msg_sem_t sem = MSG_sem_init(1);
for (int i = 0; i < nb_res; ++i)
int nb_ranks = data->trace_filenames.size();
// Let's use the default mapping is none is provided (round-robin on hosts, as we do not
// know the number of cores on each host)
if (job->smpi_ranks_to_hosts_mapping.empty())
{
job->smpi_ranks_to_hosts_mapping.resize(nb_ranks);
int host_to_use = 0;
for (int i = 0; i < nb_ranks; ++i)
{
job->smpi_ranks_to_hosts_mapping[i] = host_to_use;
++host_to_use %= job->required_nb_res; // ++ is done first
}
}
xbt_assert(nb_ranks == (int) job->smpi_ranks_to_hosts_mapping.size(),
"Invalid job %s: SMPI ranks_to_host mapping has an invalid size, as it should "
"use %d MPI ranks but the ranking states that there are %d ranks.",
job->id, nb_ranks, (int) job->smpi_ranks_to_hosts_mapping.size());
for (int i = 0; i < nb_ranks; ++i)
{
char *str_instance_id = NULL;
int ret = asprintf(&str_instance_id, "%d", job->number);
int ret = asprintf(&str_instance_id, "%s!%d", job->workload->name.c_str(), job->number);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
char *str_rank_id = NULL;
......@@ -194,13 +214,17 @@ int execute_profile(BatsimContext *context,
argv[2] = str_rank_id; // Rank Id
argv[3] = xbt_strdup((char*) data->trace_filenames[i].c_str());
argv[4] = xbt_strdup("0"); //
if (i==0)
{
MSG_process_create_with_arguments(str_pname, smpi_replay_process, sem, allocation->hosts[i], 5, argv );
} else
{
MSG_process_create_with_arguments(str_pname, smpi_replay_process, NULL, allocation->hosts[i], 5, argv );
}
msg_host_t host_to_use = allocation->hosts[job->smpi_ranks_to_hosts_mapping[i]];
msg_sem_t sem_to_use = NULL;
XBT_INFO("Hello!");
if (i == 0)
sem_to_use = sem;
MSG_process_create_with_arguments(str_pname, smpi_replay_process, sem_to_use,
host_to_use, 5, argv);
// todo: avoid memory leaks
free(str_pname);
......
......@@ -86,9 +86,16 @@ void Workload::register_smpi_applications()
for (auto mit : jobs->jobs())
{
Job * job = mit.second;
string job_id_str = to_string(job->number);
XBT_INFO("Registering app. instance='%s', nb_process=%d", job_id_str.c_str(), job->required_nb_res);
SMPI_app_instance_register(job_id_str.c_str(), smpi_replay_process, job->required_nb_res);
Profile * profile = (*profiles)[job->profile];
if (profile->type == ProfileType::SMPI)
{
SmpiProfileData * data = (SmpiProfileData *) profile->data;
string job_id_str = name + "!" + to_string(job->number);
XBT_INFO("Registering app. instance='%s', nb_process=%d", job_id_str.c_str(), data->trace_filenames.size());
SMPI_app_instance_register(job_id_str.c_str(), smpi_replay_process, data->trace_filenames.size());
}
}
XBT_INFO("SMPI applications of workload '%s' have been registered.", name.c_str());
......
# This script should be called from Batsim's root directory
# If needed, the working directory of this script can be specified within this file
#base_working_directory: ~/proj/batsim
# If needed, the output directory of this script can be specified within this file
base_output_directory: /tmp/batsim_tests/smpi_mapping
base_variables:
batsim_dir: ${base_working_directory}
implicit_instances:
implicit:
sweep:
platform :
- {"name":"small", "filename":"${batsim_dir}/platforms/small_platform.xml"}
workload :
- {"name":"mapping", "filename":"${batsim_dir}/workload_profiles/test_smpi_mapping.json"}
pybatsim_algo:
- {"name":"filler", "algo_name":"fillerSched"}
generic_instance:
timeout: 10
working_directory: ${base_working_directory}
output_directory: ${base_output_directory}/results/${pybatsim_algo[name]}_${workload[name]}_${platform[name]}
batsim_command: batsim -p ${platform[filename]} -w ${workload[filename]} -e ${output_directory}/out --redis-prefix ${instance_id} --mmax-workload
sched_command: python2 ${batsim_dir}/schedulers/pybatsim/launcher.py ${pybatsim_algo[algo_name]} --redis-prefix ${instance_id}
commands_after_execution:
# Let's check that the jobs have consistent execution times
- |
#!/bin/bash
source ${output_directory}/variables.bash
cat > ${output_directory}/jobs_analysis.py <<EOF
#!/usr/bin/python2
from __future__ import print_function
import pandas as pd
import sys
# Let's get when jobs have been released
jobs = pd.read_csv('${output_directory}/out_jobs.csv')
# Jobs 1 and 2 should have similar execution times
a = 0
b = 1
if abs(jobs['execution_time'][a] - jobs['execution_time'][b]) > 1:
print("Jobs {id1} and {id2} should have similar runtimes, but "
"they have not ({runtime1}, {runtime2}).".format(
id1 = jobs['job_id'][a], id2= jobs['job_id'][b],
runtime1= jobs['execution_time'][a],
runtime2= jobs['execution_time'][b]))
sys.exit(1)
else:
print("OK. Jobs {id1} and {id2} have similar runtimes "
"({runtime1}, {runtime2}).".format(
id1 = jobs['job_id'][a], id2= jobs['job_id'][b],
runtime1= jobs['execution_time'][a],
runtime2= jobs['execution_time'][b]))
# Jobs 3 and 4 should have similar execution times
a = 2
b = 3
if abs(jobs['execution_time'][a] - jobs['execution_time'][b]) > 1:
print("Jobs {id1} and {id2} should have similar runtimes, but "
"they have not ({runtime1}, {runtime2}).".format(
id1 = jobs['job_id'][a], id2= jobs['job_id'][b],
runtime1= jobs['execution_time'][a],
runtime2= jobs['execution_time'][b]))
sys.exit(1)
else:
print("OK. Jobs {id1} and {id2} have similar runtimes "
"({runtime1}, {runtime2}).".format(
id1 = jobs['job_id'][a], id2= jobs['job_id'][b],
runtime1= jobs['execution_time'][a],
runtime2= jobs['execution_time'][b]))
a = 0
b = 2
if abs(jobs['execution_time'][a] - jobs['execution_time'][b]) <= 100:
print("OK. Jobs {id1} and {id2} should have different runtimes,"
" but they have not ({runtime1}, {runtime2}).".format(
id1 = jobs['job_id'][a], id2= jobs['job_id'][b],
runtime1= jobs['execution_time'][a],
runtime2= jobs['execution_time'][b]))
sys.exit(1)
else:
print("OK. Jobs {id1} and {id2} have different runtimes "
"({runtime1}, {runtime2}).".format(
id1 = jobs['job_id'][a], id2= jobs['job_id'][b],
runtime1= jobs['execution_time'][a],
runtime2= jobs['execution_time'][b]))
sys.exit(0)
EOF
- python2 ${output_directory}/jobs_analysis.py
commands_before_instances:
- ${batsim_dir}/test/is_batsim_dir.py ${base_working_directory}
- ${batsim_dir}/test/clean_output_dir.py ${base_output_directory}
{
"version": 0,
"command:": "",
"date": "Tue, 11 Mar 2015 9:44:30 +0100",
"description": "workload with profile file for test",
"nb_res": 4,
"jobs": [
{"id":1, "subtime":1, "walltime": 30e9, "res": 2, "profile": "1",
"description": "default mapping (round robin on nodes)"},
{"id":2, "subtime":1e13, "walltime": 30e9, "res":2, "profile":"1",
"NOT_A_smpi_ranks_to_hosts_mapping":[0,1],
"description": "should be the same as the default mapping"},
{"id":3, "subtime":2e13, "walltime": 30e9, "res":1, "profile":"1",
"description": "execute it on a single node"},
{"id":4, "subtime":3e13, "walltime": 30e9, "res":1, "profile":"1",
"NOT_A_smpi_ranks_to_hosts_mapping":[0,0],
"description": "should be the same as 3"}
],
"profiles": {
"1": {
"type": "smpi",
"mpi_size": 2,
"trace": "smpi/mixed_comm_comp/traces.txt"
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment