Commit 7382b7e3 authored by Millian Poquet's avatar Millian Poquet
Browse files

[code,doc] Redis can be disabled (->segfault)

parent fa2d0acb
......@@ -6,7 +6,10 @@ default configuration:
```json
{
"redis": {
"enabled": true
"enabled": true,
"hostname": "127.0.0.1",
"port": 6379,
"prefix": "default"
},
"job_submission": {
"forward_profiles": true,
......
......@@ -155,7 +155,7 @@ acknowledgement has been requested. Without Redis enabled the job
description and optionnaly the profile are also transmitted.
- **data**: list of job id
- **example without redis**:
- **example with redis**:
```json
{
"timestamp": 10.0,
......@@ -163,7 +163,7 @@ description and optionnaly the profile are also transmitted.
"data": {"job_id": "w0!1"}
}
```
- **example with redis**:
- **example without redis**:
```json
{
"timestamp": 10.0,
......@@ -360,7 +360,7 @@ details.
- **data**: A job id (job id duplication is forbidden), classical job and
profile information (optional).
- **example redis** : the job description, and the profile description if
- **example with redis** : the job description, and the profile description if
it unknown to Batsim yet, must have been pushed into redis by the
scheduler before sending this message
```json
......
......@@ -43,6 +43,23 @@ using namespace std;
XBT_LOG_NEW_DEFAULT_CATEGORY(batsim, "batsim"); //!< Logging
//! Batsim default JSON configuration
string default_configuration = R"({
"redis": {
"enabled": true,
"hostname": "127.0.0.1",
"port": 6379,
"prefix": "default"
},
"job_submission": {
"forward_profiles": true,
"from_scheduler": {
"enabled": false,
"acknowledge": true
}
}
})";
/**
* @brief Checks whether a file exists
* @param[in] filename The file whose existence is checked<
......@@ -133,10 +150,12 @@ Execution context options:
--config-file <cfg_file> Configuration file name (optional). [default: None]
-s, --socket-endpoint <endpoint> The Decision process socket endpoint
Decision process [default: tcp://localhost:28000].
--redis-hostname <redis_host> The Redis server hostname
[default: 127.0.0.1]
--redis-port <redis_port> The Redis server port [default: 6379].
--redis-prefix <prefix> The Redis prefix [default: default].
--redis-hostname <redis_host> The Redis server hostname. Read from config file by default.
[default: None]
--redis-port <redis_port> The Redis server port. Read from config file by default.
[default: -1]
--redis-prefix <prefix> The Redis prefix. Read from config file by default.
[default: None]
Output options:
-e, --export <prefix> The export filename prefix used to generate
......@@ -308,23 +327,15 @@ Other options:
string file_content((istreambuf_iterator<char>(file)),
istreambuf_iterator<char>());
main_args.config_file.Parse(file_content.c_str());
xbt_assert(!main_args.config_file.HasParseError(),
"Invalid configuration file '%s': could not be parsed.",
config_filename.c_str());
}
else
{
string default_configuration = R"({
"redis": {
"enabled": true
},
"job_submission": {
"forward_profiles": true,
"from_scheduler": {
"enabled": false,
"acknowledge": true
}
}
})";
main_args.config_file.Parse(default_configuration.c_str());
xbt_assert(!main_args.config_file.HasParseError(),
"Invalid default configuration file : could not be parsed.");
}
......@@ -600,9 +611,15 @@ int main(int argc, char * argv[])
if (main_args.program_type == ProgramType::BATSIM)
{
// Let's prepare Redis's connection
context.storage.set_instance_key_prefix(main_args.redis_prefix);
context.storage.connect_to_server(main_args.redis_hostname, main_args.redis_port);
if (context.redis_enabled)
{
// Let's prepare Redis's connection
context.storage.set_instance_key_prefix(main_args.redis_prefix);
context.storage.connect_to_server(main_args.redis_hostname, main_args.redis_port);
// Let's store some metadata about the current instance in the data storage
context.storage.set("nb_res", std::to_string(context.machines.nb_machines()));
}
// Let's create the socket
context.zmq_socket = new zmq::socket_t(context.zmq_context, ZMQ_REQ);
......@@ -610,10 +627,7 @@ int main(int argc, char * argv[])
// Let's create the protocol reader and writer
context.proto_reader = new JsonProtocolReader(&context);
context.proto_writer = new JsonProtocolWriter;
// Let's store some metadata about the current instance in the data storage
context.storage.set("nb_res", std::to_string(context.machines.nb_machines()));
context.proto_writer = new JsonProtocolWriter(&context);
// Let's execute the initial processes
start_initial_simulation_processes(main_args, &context);
......@@ -650,96 +664,135 @@ int main(int argc, char * argv[])
}
void set_configuration(BatsimContext *context,
const MainArguments &main_args)
MainArguments & main_args)
{
context->platform_filename = main_args.platform_filename;
context->export_prefix = main_args.export_prefix;
context->workflow_nb_concurrent_jobs_limit = main_args.workflow_nb_concurrent_jobs_limit;
context->energy_used = main_args.energy_used;
context->allow_time_sharing = main_args.allow_time_sharing;
context->trace_schedule = main_args.enable_schedule_tracing;
context->trace_machine_states = main_args.enable_machine_state_tracing;
context->simulation_start_time = chrono::high_resolution_clock::now();
using namespace rapidjson;
context->terminate_with_last_workflow = main_args.terminate_with_last_workflow;
// ********************************************************
// Let's load default values from the default configuration
// ********************************************************
Document default_config_doc;
default_config_doc.Parse(default_configuration.c_str());
xbt_assert(!default_config_doc.HasParseError(),
"Invalid default configuration file : could not be parsed.");
// Let's read the JSON configuration
/* {
"redis": {
"enabled": true
},
"job_submission": {
"forward_profiles": true,
"from_scheduler": {
"enabled": false,
"acknowledge": true
}
}
} */
bool redis_enabled = default_config_doc["redis"]["enabled"].GetBool();
string redis_hostname = default_config_doc["redis"]["hostname"].GetString();
int redis_port = default_config_doc["redis"]["port"].GetInt();
string redis_prefix = default_config_doc["redis"]["prefix"].GetString();
// Default values
bool redis_enabled = true;
bool submission_forward_profiles = true;
bool submission_sched_enabled = false;
bool submission_sched_ack = true;
bool submission_forward_profiles = default_config_doc["job_submission"]["forward_profiles"].GetBool();
using namespace rapidjson;
bool submission_sched_enabled = default_config_doc["job_submission"]["from_scheduler"]["enabled"].GetBool();
bool submission_sched_ack = default_config_doc["job_submission"]["from_scheduler"]["acknowledge"].GetBool();
// **********************************
// Let's parse the configuration file
// **********************************
const Value & main_object = main_args.config_file;
xbt_assert(main_object.IsObject(), "Invalid JSON configuration: not an object.");
if(main_object.HasMember("redis"))
{
const Value & redis_object = main_object["redis"];
xbt_assert(redis_object.IsObject(), "Invalid JSON configuration: 'redis' value should be an object.");
xbt_assert(redis_object.IsObject(), "Invalid JSON configuration: ['redis'] should be an object.");
if (redis_object.HasMember("enabled"))
{
const Value & redis_enabled_value = redis_object["enabled"];
xbt_assert(redis_enabled_value.IsBool(), "Invalid JSON configuration: 'redis'['enabled'] should be a boolean.");
xbt_assert(redis_enabled_value.IsBool(), "Invalid JSON configuration: ['redis']['enabled'] should be a boolean.");
redis_enabled = redis_enabled_value.GetBool();
}
if (redis_object.HasMember("hostname"))
{
const Value & redis_hostname_value = redis_object["hostname"];
xbt_assert(redis_hostname_value.IsString(), "Invalid JSON configuration: ['redis']['hostname'] should be a string.");
redis_hostname = redis_hostname_value.GetString();
}
if (redis_object.HasMember("port"))
{
const Value & redis_port_value = redis_object["port"];
xbt_assert(redis_port_value.IsInt(), "Invalid JSON configuration: ['redis']['port'] should be an integer.");
redis_port = redis_port_value.GetInt();
}
if (redis_object.HasMember("prefix"))
{
const Value & redis_prefix_value = redis_object["prefix"];
xbt_assert(redis_prefix_value.IsString(), "Invalid JSON configuration: ['redis']['prefix'] should be a string.");
redis_prefix = redis_prefix_value.GetString();
}
}
if (main_object.HasMember("job_submission"))
{
const Value & job_submission_object = main_object["job_submission"];
xbt_assert(job_submission_object.IsObject(), "Invalid JSON configuration: 'redis'['job_submission'] should be an object.");
xbt_assert(job_submission_object.IsObject(), "Invalid JSON configuration: ['job_submission'] should be an object.");
if (job_submission_object.HasMember("forward_profiles"))
{
const Value & forward_profiles_value = job_submission_object["forward_profiles"];
xbt_assert(forward_profiles_value.IsBool(), "Invalid JSON configuration: 'redis'['job_submission']['forward_profiles'] should be a boolean.");
xbt_assert(forward_profiles_value.IsBool(), "Invalid JSON configuration: ['job_submission']['forward_profiles'] should be a boolean.");
submission_forward_profiles = forward_profiles_value.GetBool();
}
if (job_submission_object.HasMember("from_scheduler"))
{
const Value & from_sched_object = job_submission_object["from_scheduler"];
xbt_assert(from_sched_object.IsObject(), "Invalid JSON configuration: 'redis'['job_submission']['from_scheduler'] should be an object.");
xbt_assert(from_sched_object.IsObject(), "Invalid JSON configuration: ['job_submission']['from_scheduler'] should be an object.");
if (from_sched_object.HasMember("enabled"))
{
const Value & submission_sched_enabled_value = from_sched_object["enabled"];
xbt_assert(submission_sched_enabled_value.IsBool(), "Invalid JSON configuration: 'redis'['job_submission']['enabled'] should be a boolean.");
xbt_assert(submission_sched_enabled_value.IsBool(), "Invalid JSON configuration: ['job_submission']['enabled'] should be a boolean.");
submission_sched_enabled = submission_sched_enabled_value.GetBool();
}
if (from_sched_object.HasMember("acknowledge"))
{
const Value & submission_sched_ack_value = from_sched_object["acknowledge"];
xbt_assert(submission_sched_ack_value.IsBool(), "Invalid JSON configuration: 'redis'['job_submission']['acknowledge'] should be a boolean.");
xbt_assert(submission_sched_ack_value.IsBool(), "Invalid JSON configuration: ['job_submission']['acknowledge'] should be a boolean.");
submission_sched_ack = submission_sched_ack_value.GetBool();
}
}
}
// Let's write the values into the document to make sure they are all present
// *****************************************************************
// Let's override configuration values from main arguments if needed
// *****************************************************************
if (main_args.redis_hostname != "None")
redis_hostname = main_args.redis_hostname;
if (main_args.redis_port != -1)
redis_port = main_args.redis_port;
if (main_args.redis_prefix != "None")
redis_prefix = main_args.redis_prefix;
// *************************************
// Let's update the BatsimContext values
// *************************************
context->redis_enabled = redis_enabled;
context->submission_forward_profiles = submission_forward_profiles;
context->submission_sched_enabled = submission_sched_enabled;
context->submission_sched_ack = submission_sched_ack;
context->platform_filename = main_args.platform_filename;
context->export_prefix = main_args.export_prefix;
context->workflow_nb_concurrent_jobs_limit = main_args.workflow_nb_concurrent_jobs_limit;
context->energy_used = main_args.energy_used;
context->allow_time_sharing = main_args.allow_time_sharing;
context->trace_schedule = main_args.enable_schedule_tracing;
context->trace_machine_states = main_args.enable_machine_state_tracing;
context->simulation_start_time = chrono::high_resolution_clock::now();
context->terminate_with_last_workflow = main_args.terminate_with_last_workflow;
// *************************************
// Let's update the MainArguments values
// *************************************
main_args.redis_hostname = redis_hostname;
main_args.redis_port = redis_port;
main_args.redis_prefix = redis_prefix;
// *******************************************************************************
// Let's write the output config file (the one that will be sent to the scheduler)
// *******************************************************************************
......@@ -761,6 +814,19 @@ void set_configuration(BatsimContext *context,
if (mit_redis->value.FindMember("enabled") == mit_redis->value.MemberEnd())
mit_redis->value.AddMember("enabled", Value().SetBool(redis_enabled), alloc);
// redis->hostname
if (mit_redis->value.FindMember("hostname") == mit_redis->value.MemberEnd())
mit_redis->value.AddMember("hostname", Value().SetString(redis_hostname.c_str(), alloc), alloc);
// redis->port
if (mit_redis->value.FindMember("port") == mit_redis->value.MemberEnd())
mit_redis->value.AddMember("port", Value().SetInt(redis_port), alloc);
// redis->prefix
if (mit_redis->value.FindMember("prefix") == mit_redis->value.MemberEnd())
mit_redis->value.AddMember("prefix", Value().SetString(redis_prefix.c_str(), alloc), alloc);
// job_submission
auto mit_job_submission = context->config_file.FindMember("job_submission");
if (mit_job_submission == context->config_file.MemberEnd())
......
......@@ -147,4 +147,4 @@ void start_initial_simulation_processes(const MainArguments & main_args,
* @param[in] main_args Batsim arguments
*/
void set_configuration(BatsimContext * context,
const MainArguments & main_args);
MainArguments & main_args);
......@@ -107,8 +107,13 @@ int static_job_submitter_process(int argc, char *argv[])
string job_key = RedisStorage::job_key(job_id);
string profile_key = RedisStorage::profile_key(workload->name, job->profile);
XBT_INFO("IN STATIC JOB SUBMITTER: '%s'", job->json_description.c_str());
context->storage.set(job_key, job->json_description);
context->storage.set(profile_key, workload->profiles->at(job->profile)->json_description);
if (context->redis_enabled)
{
context->storage.set(job_key, job->json_description);
if (context->submission_forward_profiles)
context->storage.set(profile_key, workload->profiles->at(job->profile)->json_description);
}
// Let's now continue the simulation
JobSubmittedMessage * msg = new JobSubmittedMessage;
......@@ -284,7 +289,7 @@ static string submit_workflow_task_as_job(BatsimContext *context, string workflo
// Create JSON description of Job corresponding to Task
double walltime = task->execution_time + 10.0;
string json_description = std::string() + "{" +
string job_json_description = std::string() + "{" +
"\"id\": \"" + workload_name + "!" + std::to_string(job_number) + "\", " +
"\"subtime\":" + std::to_string(MSG_get_clock()) + ", " +
"\"walltime\":" + std::to_string(walltime) + ", " +
......@@ -292,12 +297,24 @@ static string submit_workflow_task_as_job(BatsimContext *context, string workflo
"\"profile\": \"" + profile_name + "\"" +
"}";
// Put the metadata about the job into the data storage
JobIdentifier job_id(workload_name, job_number);
string job_key = RedisStorage::job_key(job_id);
string profile_key = RedisStorage::profile_key(workflow_name, profile_name);
context->storage.set(job_key, json_description);
context->storage.set(profile_key, profile->json_description);
// Puts the job into memory
Job * job = Job::from_json(job_json_description, context->workloads.at(workload_name));
context->workloads.at(workload_name)->jobs->add_job(job);
if (context->redis_enabled)
{
// Put the metadata about the job into the data storage
JobIdentifier job_id(workload_name, job_number);
string job_key = RedisStorage::job_key(job_id);
context->storage.set(job_key, job_json_description);
if (context->submission_forward_profiles)
{
string profile_key = RedisStorage::profile_key(workflow_name, profile_name);
context->storage.set(profile_key, profile->json_description);
}
}
// Submit the job
JobSubmittedMessage * msg = new JobSubmittedMessage;
......
......@@ -239,6 +239,9 @@ Job * Job::from_json(const rapidjson::Value & json_desc, Workload * workload)
// Let's check that the new description is a valid JSON string
rapidjson::Document check_doc;
check_doc.Parse(j->json_description.c_str());
xbt_assert(!check_doc.HasParseError(),
"A problem occured when replacing the job_id by its WLOAD!job_number counterpart:"
"The output string '%s' is not valid JSON.", j->json_description.c_str());
xbt_assert(check_doc.IsObject(),
"A problem occured when replacing the job_id by its WLOAD!job_number counterpart: "
"The output string '%s' is not valid JSON.", j->json_description.c_str());
......@@ -282,6 +285,7 @@ Job * Job::from_json(const std::string & json_str, Workload *workload)
{
Document doc;
doc.Parse(json_str.c_str());
xbt_assert(!doc.HasParseError());
return Job::from_json(doc, workload);
}
......@@ -349,6 +349,7 @@ Profile *Profile::from_json(const std::string & profile_name, const std::string
{
Document doc;
doc.Parse(json_str.c_str());
xbt_assert(!doc.HasParseError());
return Profile::from_json(profile_name, doc, "not_a_real_file");
}
......@@ -13,8 +13,8 @@ using namespace std;
XBT_LOG_NEW_DEFAULT_CATEGORY(protocol, "protocol"); //!< Logging
JsonProtocolWriter::JsonProtocolWriter() :
_alloc(_doc.GetAllocator())
JsonProtocolWriter::JsonProtocolWriter(BatsimContext * context) :
_context(context), _alloc(_doc.GetAllocator())
{
_doc.SetObject();
}
......@@ -163,14 +163,32 @@ void JsonProtocolWriter::append_simulation_ends(double date)
}
void JsonProtocolWriter::append_job_submitted(const string & job_id,
const string & job_json_description,
const string & profile_json_description,
double date)
{
/* {
/* "with_redis": {
"timestamp": 10.0,
"type": "JOB_SUBMITTED",
"data": {
"job_ids": ["w0!1", "w0!2"]
}
},
"without_redis": {
"timestamp": 10.0,
"type": "JOB_SUBMITTED",
"data": {
"job_id": "dyn!my_new_job",
"job": {
"profile": "delay_10s",
"res": 1,
"id": "my_new_job",
"walltime": 12.0
},
"profile":{
"type": "delay",
"delay": 10
}
} */
xbt_assert(date >= _last_date, "Date inconsistency");
......@@ -180,6 +198,24 @@ void JsonProtocolWriter::append_job_submitted(const string & job_id,
Value data(rapidjson::kObjectType);
data.AddMember("job_id", Value().SetString(job_id.c_str(), _alloc), _alloc);
if (!_context->redis_enabled)
{
Document job_description_doc;
job_description_doc.Parse(job_json_description.c_str());
xbt_assert(!job_description_doc.HasParseError());
data.AddMember("job", Value().CopyFrom(job_description_doc, _alloc), _alloc);
if (_context->submission_forward_profiles)
{
Document profile_description_doc;
profile_description_doc.Parse(profile_json_description.c_str());
xbt_assert(!profile_description_doc.HasParseError());
data.AddMember("profile", Value().CopyFrom(profile_description_doc, _alloc), _alloc);
}
}
Value event(rapidjson::kObjectType);
event.AddMember("timestamp", Value().SetDouble(date), _alloc);
event.AddMember("type", Value().SetString("JOB_SUBMITTED"), _alloc);
......@@ -319,49 +355,6 @@ string JsonProtocolWriter::generate_current_message(double date)
return string(buffer.GetString());
}
bool test_json_writer()
{
AbstractProtocolWriter * proto_writer = new JsonProtocolWriter;
printf("EMPTY content:\n%s\n", proto_writer->generate_current_message(0).c_str());
proto_writer->clear();
proto_writer->append_nop(0);
printf("NOP content:\n%s\n", proto_writer->generate_current_message(42).c_str());
proto_writer->clear();
// proto_writer->append_simulation_begins(4, 10);
// printf("SIM_BEGINS content:\n%s\n", proto_writer->generate_current_message(42).c_str());
// proto_writer->clear();
proto_writer->append_simulation_ends(10);
printf("SIM_ENDS content:\n%s\n", proto_writer->generate_current_message(42).c_str());
proto_writer->clear();
proto_writer->append_job_submitted({"w0!j0", "w0!j1"}, 10);
printf("JOB_SUBMITTED content:\n%s\n", proto_writer->generate_current_message(42).c_str());
proto_writer->clear();
proto_writer->append_job_completed("w0!j0", "SUCCESS", 10);
printf("JOB_COMPLETED content:\n%s\n", proto_writer->generate_current_message(42).c_str());
proto_writer->clear();
proto_writer->append_job_killed({"w0!j0", "w0!j1"}, 10);
printf("JOB_KILLED content:\n%s\n", proto_writer->generate_current_message(42).c_str());
proto_writer->clear();
proto_writer->append_resource_state_changed(MachineRange::from_string_hyphen("1,3-5"), "42", 10);
printf("RESOURCE_STATE_CHANGED content:\n%s\n", proto_writer->generate_current_message(42).c_str());
proto_writer->clear();
proto_writer->append_query_reply_energy(65535, 10);
printf("QUERY_REPLY (energy) content:\n%s\n", proto_writer->generate_current_message(42).c_str());
proto_writer->clear();
delete proto_writer;
return true;
}
JsonProtocolReader::JsonProtocolReader(BatsimContext *context) :
......@@ -386,6 +379,7 @@ void JsonProtocolReader::parse_and_apply_message(const string &message)
rapidjson::Document doc;
doc.Parse(message.c_str());
xbt_assert(!doc.HasParseError(), "Invalid JSON message: could not be parsed");
xbt_assert(doc.IsObject(), "Invalid JSON message: not a JSON object");
xbt_assert(doc.HasMember("now"), "Invalid JSON message: no 'now' key");
......
......@@ -187,9 +187,14 @@ public:
/**
* @brief Appends a JOB_SUBMITTED event.
* @param[in] job_id The identifier of the submitted job.
* @param[in] job_json_description The job JSON description (optional if redis is enabled)
* @param[in] profile_json_description The profile JSON description (optional if redis is
* disabled or if profiles are not forwarded)
* @param[in] date The event date. Must be greater than or equal to the previous event.
*/
virtual void append_job_submitted(const std::string & job_id,
const std::string & job_json_description,
const std::string & profile_json_description,
double date) = 0;
/**
......@@ -257,8 +262,9 @@ class JsonProtocolWriter : public AbstractProtocolWriter
public:
/**
* @brief Creates an empty JsonProtocolWriter
* @param[in,out] context The BatsimContext
*/
JsonProtocolWriter();
JsonProtocolWriter(BatsimContext * context);
/**
* @brief Destroys a JsonProtocolWriter
......@@ -388,9 +394,14 @@ public:
/**
* @brief Appends a JOB_SUBMITTED event.
* @param[in] job_id The identifier of the submitted job.
* @param[in] job_json_description The job JSON description (optional if redis is enabled)
* @param[in] profile_json_description The profile JSON description (optional if redis is
* disabled or if profiles are not forwarded)
* @param[in] date The event date. Must be greater than or equal to the previous event.
*/
void append_job_submitted(const std::string & job_id,
const std::string & job_json_description,
const std::string & profile_json_description,
double date);
/**
......@@ -450,6 +461,7 @@ public:
bool is_empty() { return _is_empty; }
private:
BatsimContext * _context; //!< The BatsimContext
bool _is_empty = true; //!< Stores whether events have been pushed into the writer since last clear.
double _last_date = -1; //!< The date of the latest pushed event/message
rapidjson::Document _doc; //!< A rapidjson document
......@@ -459,11 +471,6 @@ private:
};
/**
* @brief Tests whether the JsonProtocolWriter behaves correctly
* @return Whether the JsonProtocolWriter behaves as expected
*/
bool test_json_writer();
/**
* @brief In charge of parsing a protocol message and injecting internal messages into the simulation
......@@ -514,6 +521,7 @@ public:
*/
void parse_and_apply_event(const rapidjson::Value & event_object, int event_number, double now);
/**
* @brief Handles a QUERY_REQUEST event
* @param[in] event_number The event number in [0,nb_events[.
......
......@@ -68,7 +68,6 @@ int server_process(int argc, char *argv[])
MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
sched_ready = false;
// Simulation loop
while ((nb_submitters == 0) || (nb_submitters_finished < nb_submitters) ||
(nb_completed_jobs < nb_submitted_jobs) || (!sched_ready) ||
......@@ -80,7 +79,7 @@ int server_process(int argc, char *argv[])