Commit 688ab155 authored by Steffen Lackner's avatar Steffen Lackner
Browse files

[code] Extend pfs implementation to consider multiple storage tiers

parent ee695595
......@@ -202,7 +202,12 @@ Other options:
without scheduler nor Redis.
--pfs-host <pfs_host> The name of the host, in <platform_file>,
which will be the parallel filesystem target
as data sink/source [default: pfs_host].
as data sink/source for the large-capacity
storage tier [default: pfs_host].
--hpst-host <hpst_host> The name of the host, in <platform_file>,
which will be the parallel filesystem target
as data sink/source for the high-performance
storage tier [default: hpst_host].
-h --help Shows this help.
--version Shows Batsim version.
)";
......@@ -434,6 +439,7 @@ Other options:
main_args.program_type = ProgramType::BATSIM;
}
main_args.pfs_host_name = args["--pfs-host"].asString();
main_args.hpst_host_name = args["--hpst-host"].asString();
return !error;
}
......
......@@ -95,7 +95,8 @@ struct MainArguments
// Other
bool allow_time_sharing; //!< Allows/forbids time sharing. Two jobs can run on the same machine if and only if time sharing is allowed.
ProgramType program_type; //!< The program type (Batsim or Batexec at the moment)
std::string pfs_host_name; //!< The name of the SimGrid host which serves as parallel file system
std::string pfs_host_name; //!< The name of the SimGrid host which serves as parallel file system (a.k.a. large-capacity storage tier)
std::string hpst_host_name; //!< The name of the SimGrid host which serves as the high-performance storage tier
};
/**
......
......@@ -51,7 +51,8 @@ int execute_profile(BatsimContext *context,
if (profile->type == ProfileType::MSG_PARALLEL ||
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS ||
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS0)
profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS ||
profile->type == ProfileType::MSG_DATA_STAGING)
{
double * computation_amount = nullptr;
double * communication_amount = nullptr;
......@@ -108,10 +109,10 @@ int execute_profile(BatsimContext *context,
}
}
}
else if (profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS0)
else if (profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS)
{
task_name_prefix = "pfs0 ";
MsgParallelHomogeneousPFS0ProfileData * data = (MsgParallelHomogeneousPFS0ProfileData *)profile->data;
task_name_prefix = "pfs_tiers ";
MsgParallelHomogeneousPFSMultipleTiersProfileData * data = (MsgParallelHomogeneousPFSMultipleTiersProfileData *)profile->data;
double cpu = 0;
double size = data->size;
......@@ -121,7 +122,16 @@ int execute_profile(BatsimContext *context,
int pfs_id = nb_res - 1;
// Add the pfs_machine
hosts_to_use.push_back(context->machines.pfs_machine()->host);
switch(data->host) {
case MsgParallelHomogeneousPFSMultipleTiersProfileData::Host::HPST:
hosts_to_use.push_back(context->machines.hpst_machine()->host);
break;
case MsgParallelHomogeneousPFSMultipleTiersProfileData::Host::LCST:
hosts_to_use.push_back(context->machines.pfs_machine()->host);
break;
default:
xbt_die("Should not be reached");
}
// These amounts are deallocated by SG
computation_amount = xbt_new(double, nb_res);
......@@ -140,19 +150,47 @@ int execute_profile(BatsimContext *context,
{
for (int x = 0; x < nb_res; ++x)
{
// Communications are done towards the PFS host, which is the last resource
if (x != pfs_id)
{
communication_amount[k++] = 0;
}
else
{
communication_amount[k++] = size;
switch(data->direction) {
case MsgParallelHomogeneousPFSMultipleTiersProfileData::Direction::TO_STORAGE:
// Communications are done towards the PFS host, which is the last resource (to the storage)
if (x != pfs_id)
{
communication_amount[k++] = 0;
}
else
{
communication_amount[k++] = size;
}
break;
case MsgParallelHomogeneousPFSMultipleTiersProfileData::Direction::FROM_STORAGE:
// Communications are done towards the job allocation (from the storage)
if (x != pfs_id)
{
communication_amount[k++] = size;
}
else
{
communication_amount[k++] = 0;
}
break;
default:
xbt_die("Should not be reached");
}
}
}
}
}
else if (profile->type == ProfileType::MSG_DATA_STAGING)
{
task_name_prefix = "data_staging ";
MsgDataStagingProfileData * data = (MsgDataStagingProfileData *)profile->data;
double cpu = 0;
double size = data->size;
// TODO
xbt_die("TODO");
}
string task_name = task_name_prefix + to_string(job->number) + "'" + job->profile + "'";
XBT_INFO("Creating task '%s'", task_name.c_str());
......
......@@ -54,12 +54,19 @@ Machines::~Machines()
_pfs_machine = nullptr;
}
if (_hpst_machine != nullptr)
{
delete _hpst_machine;
_hpst_machine = nullptr;
}
}
void Machines::create_machines(xbt_dynar_t hosts,
const BatsimContext *context,
const string & master_host_name,
const string & pfs_host_name,
const string & hpst_host_name,
int limit_machine_count)
{
xbt_assert(_machines.size() == 0, "Bad call to Machines::createMachines(): machines already created");
......@@ -211,7 +218,7 @@ void Machines::create_machines(xbt_dynar_t hosts,
"is currently forbidden (https://github.com/oar-team/batsim/issues/21).");
}
if ((machine->name != master_host_name) && (machine->name != pfs_host_name))
if ((machine->name != master_host_name) && (machine->name != pfs_host_name) && (machine->name != hpst_host_name))
{
machine->id = id;
++id;
......@@ -225,13 +232,22 @@ void Machines::create_machines(xbt_dynar_t hosts,
machine->id = -1;
_master_machine = machine;
}
else
else if (machine->name == pfs_host_name)
{
xbt_assert(_pfs_machine == nullptr, "There are two pfs hosts...");
machine->id = -2;
_pfs_machine = machine;
}
else if (machine->name == hpst_host_name)
{
xbt_assert(_hpst_machine == nullptr, "There are two hpst hosts...");
machine->id = -3;
_hpst_machine = machine;
}
else
{
xbt_die("Invalid machine found");
}
}
}
......@@ -241,6 +257,10 @@ void Machines::create_machines(xbt_dynar_t hosts,
{
XBT_WARN("Could not find pfs_host '%s'!", pfs_host_name.c_str());
}
if (_hpst_machine == nullptr)
{
XBT_WARN("Could not find hpst_host '%s'!", hpst_host_name.c_str());
}
sort_machines_by_ascending_name();
......@@ -321,6 +341,13 @@ const Machine *Machines::pfs_machine() const
return _pfs_machine;
}
const Machine *Machines::hpst_machine() const
{
    // The HPST host is optional in the platform: create_machines() only warns
    // when it is missing, so its absence is detected here, at first use.
    const Machine * const hpst = _hpst_machine;
    xbt_assert(hpst != nullptr,
               "Trying to access the hpst machine, which does not exist.");
    return hpst;
}
long double Machines::total_consumed_energy(const BatsimContext *context) const
......@@ -660,11 +687,13 @@ void create_machines(const MainArguments & main_args,
XBT_INFO("Creating the machines from platform file '%s'...", main_args.platform_filename.c_str());
XBT_INFO("Looking for master host '%s'", main_args.master_host_name.c_str());
XBT_INFO("Looking for parallel file system host '%s'", main_args.pfs_host_name.c_str());
XBT_INFO("Looking for parallel file system host (LCST) '%s'", main_args.pfs_host_name.c_str());
XBT_INFO("Looking for parallel file system host (HPST) '%s'", main_args.hpst_host_name.c_str());
xbt_dynar_t hosts = MSG_hosts_as_dynar();
context->machines.create_machines(hosts, context, main_args.master_host_name,
main_args.pfs_host_name, max_nb_machines_to_use);
main_args.pfs_host_name, main_args.hpst_host_name,
max_nb_machines_to_use);
xbt_dynar_free(&hosts);
XBT_INFO("The machines have been created successfully. There are %d computing machines.",
......
......@@ -133,13 +133,15 @@ public:
* @param[in] hosts The SimGrid hosts
* @param[in] context The Batsim Context
* @param[in] master_host_name The name of the host which should be used as the Master host
* @param[in] pfs_host_name The name of the host which should be used as the parallel filestem host
* @param[in] pfs_host_name The name of the host which should be used as the parallel filesystem host (large-capacity storage tier)
* @param[in] hpst_host_name The name of the host which should be used as the HPST host (high-performance storage tier)
* @param[in] limit_machine_count If set to -1, all the machines are used. If set to a strictly positive number N, only the first machines N will be used to compute jobs
*/
void create_machines(xbt_dynar_t hosts,
const BatsimContext * context,
const std::string & master_host_name,
const std::string & pfs_host_name,
const std::string & hpst_host_name,
int limit_machine_count = -1);
/**
......@@ -215,10 +217,18 @@ public:
/**
* @brief Returns a const pointer to the Parallel File System host machine
* for the large-capacity storage tier.
* @return A const pointer to the Parallel File System host machine
*/
const Machine * pfs_machine() const;
/**
* @brief Returns a const pointer to the Parallel File System host machine
* for the high-performance storage tier.
* @return A const pointer to the Parallel File System host machine
*/
const Machine * hpst_machine() const;
/**
* @brief Computes and returns the total consumed energy of all the computing machines
* @param[in] context The Batsim context
......@@ -256,6 +266,7 @@ private:
std::vector<Machine *> _machines; //!< The vector of computing machines
Machine * _master_machine = nullptr; //!< The Master host
Machine * _pfs_machine = nullptr; //!< The parallel file system host (large-capacity storage tier), or nullptr if it was not found
Machine * _hpst_machine = nullptr; //!< The high-performance storage tier host, or nullptr if it was not found
PajeTracer * _tracer = nullptr; //!< The PajeTracer
std::map<MachineState, int> _nb_machines_in_each_state; //!< Counts how many machines are in each state
};
......
......@@ -177,9 +177,18 @@ Profile::~Profile()
d = nullptr;
}
}
else if (type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS0)
else if (type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS)
{
MsgParallelHomogeneousPFS0ProfileData * d = (MsgParallelHomogeneousPFS0ProfileData *) data;
MsgParallelHomogeneousPFSMultipleTiersProfileData * d = (MsgParallelHomogeneousPFSMultipleTiersProfileData *) data;
if (d != nullptr)
{
delete d;
d = nullptr;
}
}
else if (type == ProfileType::MSG_DATA_STAGING)
{
MsgDataStagingProfileData * d = (MsgDataStagingProfileData *) data;
if (d != nullptr)
{
delete d;
......@@ -325,10 +334,53 @@ Profile *Profile::from_json(const std::string & profile_name,
profile->data = data;
}
else if (profile_type == "msg_par_hg_pfs0")
else if (profile_type == "msg_par_hg_pfs_tiers" || profile_type == "msg_par_hg_pfs0")
{
profile->type = ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS;
MsgParallelHomogeneousPFSMultipleTiersProfileData * data = new MsgParallelHomogeneousPFSMultipleTiersProfileData;
xbt_assert(json_desc.HasMember("size"), "%s: profile '%s' has no 'size' field",
error_prefix.c_str(), profile_name.c_str());
xbt_assert(json_desc["size"].IsNumber(), "%s: profile '%s' has a non-number 'size' field",
error_prefix.c_str(), profile_name.c_str());
data->size = json_desc["size"].GetDouble();
xbt_assert(data->size >= 0, "%s: profile '%s' has a non-positive 'size' field (%g)",
error_prefix.c_str(), profile_name.c_str(), data->size);
// Optional 'direction' field: selects whether the data flows from the job
// allocation to the storage ("to_storage") or the other way ("from_storage").
// When the field is absent, TO_STORAGE is used.
if (json_desc.HasMember("direction")) {
    // GetString() is only valid on string values, so validate the type first
    // (mirrors the IsNumber() check done on the 'size' field).
    xbt_assert(json_desc["direction"].IsString(), "%s: profile '%s' has a non-string 'direction' field",
               error_prefix.c_str(), profile_name.c_str());
    const std::string direction = json_desc["direction"].GetString();
    if (direction == "to_storage") {
        data->direction = MsgParallelHomogeneousPFSMultipleTiersProfileData::Direction::TO_STORAGE;
    } else if (direction == "from_storage") {
        data->direction = MsgParallelHomogeneousPFSMultipleTiersProfileData::Direction::FROM_STORAGE;
    } else {
        // The offending value is a string: it must be printed with %s (not %g,
        // which expects a double and is undefined behaviour with a char pointer).
        xbt_assert(false, "%s: profile '%s' has an invalid 'direction' field (%s)",
                   error_prefix.c_str(), profile_name.c_str(), direction.c_str());
    }
} else {
    data->direction = MsgParallelHomogeneousPFSMultipleTiersProfileData::Direction::TO_STORAGE;
}
// Optional 'host' field: selects which storage tier the transfer targets.
// "HPST" is the high-performance tier; "LCST" and "PFS" both designate the
// large-capacity tier. When the field is absent, LCST is used.
if (json_desc.HasMember("host")) {
    // GetString() is only valid on string values, so validate the type first
    // (mirrors the IsNumber() check done on the 'size' field).
    xbt_assert(json_desc["host"].IsString(), "%s: profile '%s' has a non-string 'host' field",
               error_prefix.c_str(), profile_name.c_str());
    const std::string host = json_desc["host"].GetString();
    if (host == "HPST") {
        data->host = MsgParallelHomogeneousPFSMultipleTiersProfileData::Host::HPST;
    } else if (host == "LCST" || host == "PFS") {
        data->host = MsgParallelHomogeneousPFSMultipleTiersProfileData::Host::LCST;
    } else {
        // The offending value is a string: it must be printed with %s (not %g,
        // which expects a double and is undefined behaviour with a char pointer).
        xbt_assert(false, "%s: profile '%s' has an invalid 'host' field (%s)",
                   error_prefix.c_str(), profile_name.c_str(), host.c_str());
    }
} else {
    data->host = MsgParallelHomogeneousPFSMultipleTiersProfileData::Host::LCST;
}
profile->data = data;
}
else if (profile_type == "data_staging")
{
profile->type = ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS0;
MsgParallelHomogeneousPFS0ProfileData * data = new MsgParallelHomogeneousPFS0ProfileData;
profile->type = ProfileType::MSG_DATA_STAGING;
MsgDataStagingProfileData * data = new MsgDataStagingProfileData;
xbt_assert(json_desc.HasMember("size"), "%s: profile '%s' has no 'size' field",
error_prefix.c_str(), profile_name.c_str());
......@@ -338,6 +390,18 @@ Profile *Profile::from_json(const std::string & profile_name,
xbt_assert(data->size >= 0, "%s: profile '%s' has a non-positive 'size' field (%g)",
error_prefix.c_str(), profile_name.c_str(), data->size);
// Mandatory 'direction' field: tells between which storage tiers the data is
// staged ("hpst_to_lcst" or "lcst_to_hpst"). There is no default value.
xbt_assert(json_desc.HasMember("direction"), "%s: profile '%s' has no 'direction' field",
           error_prefix.c_str(), profile_name.c_str());
// GetString() is only valid on string values, so validate the type first
// (mirrors the IsNumber() check done on the 'size' field).
xbt_assert(json_desc["direction"].IsString(), "%s: profile '%s' has a non-string 'direction' field",
           error_prefix.c_str(), profile_name.c_str());
const std::string direction = json_desc["direction"].GetString();
if (direction == "hpst_to_lcst") {
    data->direction = MsgDataStagingProfileData::Direction::HPST_TO_LCST;
} else if (direction == "lcst_to_hpst") {
    data->direction = MsgDataStagingProfileData::Direction::LCST_TO_HPST;
} else {
    // The offending value is a string: it must be printed with %s (not %g,
    // which expects a double and is undefined behaviour with a char pointer).
    xbt_assert(false, "%s: profile '%s' has an invalid 'direction' field (%s)",
               error_prefix.c_str(), profile_name.c_str(), direction.c_str());
}
profile->data = data;
}
else if (profile_type == "smpi")
......
......@@ -16,12 +16,13 @@
*/
enum class ProfileType
{
DELAY //!< The profile is a delay. Its data is of type DelayProfileData
,MSG_PARALLEL //!< The profile is composed of a computation vector and a communication matrix. Its data is of type MsgParallelProfileData
,MSG_PARALLEL_HOMOGENEOUS //!< The profile is a homogeneous MSG one. Its data is of type MsgParallelHomogeneousProfileData
,SMPI //!< The profile is a SimGrid MPI time-independent trace. Its data is of type SmpiProfileData
,SEQUENCE //!< The profile is non-atomic: it is composed of a sequence of other profiles
,MSG_PARALLEL_HOMOGENEOUS_PFS0 //!< The profile is a homogeneous MSG for simple parallel filesystem. Its data is of type MsgParallelHomogeneousPFS0ProfileData
DELAY //!< The profile is a delay. Its data is of type DelayProfileData
,MSG_PARALLEL //!< The profile is composed of a computation vector and a communication matrix. Its data is of type MsgParallelProfileData
,MSG_PARALLEL_HOMOGENEOUS //!< The profile is a homogeneous MSG one. Its data is of type MsgParallelHomogeneousProfileData
,SMPI //!< The profile is a SimGrid MPI time-independent trace. Its data is of type SmpiProfileData
,SEQUENCE //!< The profile is non-atomic: it is composed of a sequence of other profiles
,MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS //!< The profile is a homogeneous MSG for complex parallel filesystem access. Its data is of type MsgParallelHomogeneousPFSMultipleTiersProfileData
,MSG_DATA_STAGING //!< The profile is a MSG for moving data between the pfs hosts. Its data is of type MsgDataStagingProfileData
};
/**
......@@ -118,11 +119,37 @@ struct SequenceProfileData
};
/**
* @brief The data associated to MSG_PARALLEL_HOMOGENEOUS_PFS0 profiles
* @brief The data associated to MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS profiles
*/
struct MsgParallelHomogeneousPFS0ProfileData
struct MsgParallelHomogeneousPFSMultipleTiersProfileData
{
double size; //!< The size of data per compute node to transfer to pfs_machine (simulate a simple I/O traffic model)
enum class Direction {
TO_STORAGE,
FROM_STORAGE
};
enum class Host {
HPST,
LCST
};
double size; //!< The size of data per compute node to transfer to pfs_machine (simulate a simple I/O traffic model)
Direction direction; //!< Whether data should be transfered to the storage or from the storage to the nodes of the allocation
Host host; //!< Whether data should be transfered to/from the HPST storage or to/from the LCST storage
};
/**
* @brief The data associated to MSG_DATA_STAGING profiles
*/
struct MsgDataStagingProfileData
{
    /**
     * @brief The direction of a data staging transfer between the two storage tiers
     */
    enum class Direction {
        LCST_TO_HPST, //!< Data is moved from the large-capacity storage tier to the high-performance storage tier
        HPST_TO_LCST  //!< Data is moved from the high-performance storage tier to the large-capacity storage tier
    };
    // Members carry default initializers so that an instance created with a plain
    // `new MsgDataStagingProfileData` never exposes indeterminate values.
    double size = 0; //!< The size of data to transfer between the two PFS machines
    Direction direction = Direction::LCST_TO_HPST; //!< Whether data should be transferred to the HPST or from the HPST
};
/**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment