protocol.cpp 39.8 KB
Newer Older
1
2
3
4
#include "protocol.hpp"

#include <xbt.h>

5
6
#include <rapidjson/stringbuffer.h>

Millian Poquet's avatar
Millian Poquet committed
7
8
9
10
#include "context.hpp"
#include "jobs.hpp"
#include "network.hpp"

11
12
13
using namespace rapidjson;
using namespace std;

14
15
XBT_LOG_NEW_DEFAULT_CATEGORY(protocol, "protocol"); //!< Logging

16
17
/**
 * @brief Builds a JsonProtocolWriter
 * @param[in] context The Batsim context, kept for later use by the append methods
 */
JsonProtocolWriter::JsonProtocolWriter(BatsimContext * context) :
    _context(context),
    _alloc(_doc.GetAllocator())
{
    // The root of the message under construction is a JSON object
    _doc.SetObject();
}

/// Nothing to release explicitly: members clean up after themselves.
JsonProtocolWriter::~JsonProtocolWriter() = default;

27
void JsonProtocolWriter::append_requested_call(double date)
28
{
29
30
31
32
33
34
    /* {
      "timestamp": 25.5,
      "type": "REQUESTED_CALL",
      "data": {}
    } */

35
    xbt_assert(date >= _last_date, "Date inconsistency");
36
37
38
39
40
41
42
43
44
    _last_date = date;
    _is_empty = false;

    Value event(rapidjson::kObjectType);
    event.AddMember("timestamp", Value().SetDouble(date), _alloc);
    event.AddMember("type", Value().SetString("REQUESTED_CALL"), _alloc);
    event.AddMember("data", Value().SetObject(), _alloc);

    _events.PushBack(event, _alloc);
45
46
}

47
void JsonProtocolWriter::append_simulation_begins(Machines & machines,
48
                                                  const Document & configuration,
49
                                                  bool allow_time_sharing,
50
                                                  double date)
51
{
52
53
    /* {
      "timestamp": 0.0,
54
      "type": "SIMULATION_BEGINS",
55
56
57
58
59
      "data": {}
    } */

    xbt_assert(date >= _last_date, "Date inconsistency");
    _last_date = date;
Millian Poquet's avatar
Millian Poquet committed
60
    _is_empty = false;
61

62
63
64
    Value config(rapidjson::kObjectType);
    config.CopyFrom(configuration, _alloc);

65
    Value data(rapidjson::kObjectType);
66
    data.AddMember("nb_resources", Value().SetInt(machines.nb_machines()), _alloc);
67
    data.AddMember("allow_time_sharing", Value().SetBool(allow_time_sharing), _alloc);
68
    data.AddMember("config", config, _alloc);
69

70
71
72
73
    Value resources(rapidjson::kArrayType);
    resources.Reserve(machines.nb_machines(), _alloc);
    for (const Machine * machine : machines.machines())
    {
74
        resources.PushBack(machine_to_json_value(*machine), _alloc);
75
76
77
    }
    data.AddMember("resources_data", Value().CopyFrom(resources, _alloc), _alloc);

78
    if (machines.has_hpst_machine()) {
79
        data.AddMember("hpst_host", machine_to_json_value(*machines.hpst_machine()), _alloc);
80
81
82
83
84
85
    }

    if (machines.has_pfs_machine()) {
        data.AddMember("lcst_host", machine_to_json_value(*machines.pfs_machine()), _alloc);
    }

86
87
    Value event(rapidjson::kObjectType);
    event.AddMember("timestamp", Value().SetDouble(date), _alloc);
88
    event.AddMember("type", Value().SetString("SIMULATION_BEGINS"), _alloc);
89
    event.AddMember("data", data, _alloc);
90
91

    _events.PushBack(event, _alloc);
92
93
}

94
95
96
97
98
99
100
/**
 * @brief Builds the JSON description of a machine
 * @param[in] machine The machine to describe
 * @return A JSON object with the "id", "name", "state" and "properties" keys.
 *         All strings are copied using the writer's allocator.
 */
Value JsonProtocolWriter::machine_to_json_value(const Machine & machine)
{
    // Every property is exported as a string -> string pair
    Value props_json(rapidjson::kObjectType);
    for (auto const & property : machine.properties)
    {
        rapidjson::Value prop_name(property.first.c_str(), _alloc);
        rapidjson::Value prop_value(property.second.c_str(), _alloc);
        props_json.AddMember(prop_name, prop_value, _alloc);
    }

    Value id_json;
    id_json.SetInt(machine.id);

    Value name_json;
    name_json.SetString(machine.name.c_str(), _alloc);

    Value state_json;
    state_json.SetString(machine_state_to_string(machine.state).c_str(), _alloc);

    Value description(rapidjson::kObjectType);
    description.AddMember("id", id_json, _alloc);
    description.AddMember("name", name_json, _alloc);
    description.AddMember("state", state_json, _alloc);
    description.AddMember("properties", props_json, _alloc);

    return description;
}

112
113
void JsonProtocolWriter::append_simulation_ends(double date)
{
114
115
116
117
118
119
120
121
    /* {
      "timestamp": 0.0,
      "type": "SIMULATION_ENDS",
      "data": {}
    } */

    xbt_assert(date >= _last_date, "Date inconsistency");
    _last_date = date;
Millian Poquet's avatar
Millian Poquet committed
122
    _is_empty = false;
123
124
125
126
127
128
129

    Value event(rapidjson::kObjectType);
    event.AddMember("timestamp", Value().SetDouble(date), _alloc);
    event.AddMember("type", Value().SetString("SIMULATION_ENDS"), _alloc);
    event.AddMember("data", Value().SetObject(), _alloc);

    _events.PushBack(event, _alloc);
130
131
}

Millian Poquet's avatar
Millian Poquet committed
132
void JsonProtocolWriter::append_job_submitted(const string & job_id,
133
134
                                              const string & job_json_description,
                                              const string & profile_json_description,
135
136
                                              double date)
{
137
    /* "with_redis": {
138
139
140
141
142
      "timestamp": 10.0,
      "type": "JOB_SUBMITTED",
      "data": {
        "job_ids": ["w0!1", "w0!2"]
      }
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
    },
    "without_redis": {
      "timestamp": 10.0,
      "type": "JOB_SUBMITTED",
      "data": {
        "job_id": "dyn!my_new_job",
        "job": {
          "profile": "delay_10s",
          "res": 1,
          "id": "my_new_job",
          "walltime": 12.0
        },
        "profile":{
          "type": "delay",
          "delay": 10
        }
159
160
161
162
    } */

    xbt_assert(date >= _last_date, "Date inconsistency");
    _last_date = date;
Millian Poquet's avatar
Millian Poquet committed
163
164
165
166
    _is_empty = false;

    Value data(rapidjson::kObjectType);
    data.AddMember("job_id", Value().SetString(job_id.c_str(), _alloc), _alloc);
167

168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
    if (!_context->redis_enabled)
    {
        Document job_description_doc;
        job_description_doc.Parse(job_json_description.c_str());
        xbt_assert(!job_description_doc.HasParseError());

        data.AddMember("job", Value().CopyFrom(job_description_doc, _alloc), _alloc);

        if (_context->submission_forward_profiles)
        {
            Document profile_description_doc;
            profile_description_doc.Parse(profile_json_description.c_str());
            xbt_assert(!profile_description_doc.HasParseError());

            data.AddMember("profile", Value().CopyFrom(profile_description_doc, _alloc), _alloc);
        }
    }

186
187
188
    Value event(rapidjson::kObjectType);
    event.AddMember("timestamp", Value().SetDouble(date), _alloc);
    event.AddMember("type", Value().SetString("JOB_SUBMITTED"), _alloc);
Millian Poquet's avatar
Millian Poquet committed
189
    event.AddMember("data", data, _alloc);
190
191

    _events.PushBack(event, _alloc);
192
193
}

194
195
void JsonProtocolWriter::append_job_completed(const string & job_id,
                                              const string & job_status,
196
197
                                              const string & job_state,
                                              const string & kill_reason,
198
199
                                              double date)
{
200
201
202
203
204
205
206
    /* {
      "timestamp": 10.0,
      "type": "JOB_COMPLETED",
      "data": {"job_id": "w0!1", "status": "SUCCESS"}
    } */

    xbt_assert(date >= _last_date, "Date inconsistency");
207
    xbt_assert(std::find(accepted_completion_statuses.begin(), accepted_completion_statuses.end(), job_status) != accepted_completion_statuses.end(),
208
209
               "Unsupported job status '%s'!", job_status.c_str());
    _last_date = date;
Millian Poquet's avatar
Millian Poquet committed
210
    _is_empty = false;
211
212
213
214

    Value data(rapidjson::kObjectType);
    data.AddMember("job_id", Value().SetString(job_id.c_str(), _alloc), _alloc);
    data.AddMember("status", Value().SetString(job_status.c_str(), _alloc), _alloc);
215
216
    data.AddMember("job_state", Value().SetString(job_state.c_str(), _alloc), _alloc);
    data.AddMember("kill_reason", Value().SetString(kill_reason.c_str(), _alloc), _alloc);
217
218
219
220
221
222
223

    Value event(rapidjson::kObjectType);
    event.AddMember("timestamp", Value().SetDouble(date), _alloc);
    event.AddMember("type", Value().SetString("JOB_COMPLETED"), _alloc);
    event.AddMember("data", data, _alloc);

    _events.PushBack(event, _alloc);
224
225
}

226
void JsonProtocolWriter::append_job_killed(const vector<string> & job_ids,
227
228
                                           double date)
{
229
230
231
232
233
234
235
236
    /* {
      "timestamp": 10.0,
      "type": "JOB_KILLED",
      "data": {"job_ids": ["w0!1", "w0!2"]}
    } */

    xbt_assert(date >= _last_date, "Date inconsistency");
    _last_date = date;
Millian Poquet's avatar
Millian Poquet committed
237
    _is_empty = false;
238
239
240
241
242
243
244
245

    Value event(rapidjson::kObjectType);
    event.AddMember("timestamp", Value().SetDouble(date), _alloc);
    event.AddMember("type", Value().SetString("JOB_KILLED"), _alloc);

    Value jobs(rapidjson::kArrayType);
    jobs.Reserve(job_ids.size(), _alloc);
    for (const string & job_id : job_ids)
246
    {
247
        jobs.PushBack(Value().SetString(job_id.c_str(), _alloc), _alloc);
248
    }
249
250
251
252

    event.AddMember("data", Value().SetObject().AddMember("job_ids", jobs, _alloc), _alloc);

    _events.PushBack(event, _alloc);
253
254
}

255
256
void JsonProtocolWriter::append_resource_state_changed(const MachineRange & resources,
                                                       const string & new_state,
257
258
                                                       double date)
{
259
260
261
262
263
264
265
266
    /* {
      "timestamp": 10.0,
      "type": "RESOURCE_STATE_CHANGED",
      "data": {"resources": "1 2 3-5", "state": "42"}
    } */

    xbt_assert(date >= _last_date, "Date inconsistency");
    _last_date = date;
Millian Poquet's avatar
Millian Poquet committed
267
    _is_empty = false;
268
269
270
271
272
273
274
275
276
277
278
279

    Value data(rapidjson::kObjectType);
    data.AddMember("resources",
                   Value().SetString(resources.to_string_hyphen(" ", "-").c_str(), _alloc), _alloc);
    data.AddMember("state", Value().SetString(new_state.c_str(), _alloc), _alloc);

    Value event(rapidjson::kObjectType);
    event.AddMember("timestamp", Value().SetDouble(date), _alloc);
    event.AddMember("type", Value().SetString("RESOURCE_STATE_CHANGED"), _alloc);
    event.AddMember("data", data, _alloc);

    _events.PushBack(event, _alloc);
280
281
}

282
283
void JsonProtocolWriter::append_query_reply_energy(double consumed_energy,
                                                   double date)
284
{
285
286
287
288
289
290
291
292
    /* {
      "timestamp": 10.0,
      "type": "QUERY_REPLY",
      "data": {"energy_consumed": "12500" }
    } */

    xbt_assert(date >= _last_date, "Date inconsistency");
    _last_date = date;
Millian Poquet's avatar
Millian Poquet committed
293
    _is_empty = false;
294
295
296
297
298
299
300

    Value event(rapidjson::kObjectType);
    event.AddMember("timestamp", Value().SetDouble(date), _alloc);
    event.AddMember("type", Value().SetString("QUERY_REPLY"), _alloc);
    event.AddMember("data", Value().SetObject().AddMember("energy_consumed", Value().SetDouble(consumed_energy), _alloc), _alloc);

    _events.PushBack(event, _alloc);
301
302
303
304
}

void JsonProtocolWriter::clear()
{
305
306
307
308
    _is_empty = true;

    _doc.RemoveAllMembers();
    _events.SetArray();
309
310
}

311
/**
 * @brief Serializes the pending events into a JSON message
 * @details The events are moved into the root document, hence clear() must be
 *          called before this method can be called again.
 * @param[in] date The value of the "now" key. Asserted not to be lower than the last appended date
 * @return The JSON message, as a string
 */
string JsonProtocolWriter::generate_current_message(double date)
{
    xbt_assert(date >= _last_date, "Date inconsistency");
    xbt_assert(_events.IsArray(),
               "Successive calls to JsonProtocolWriter::generate_current_message without calling "
               "the clear() method is not supported");

    // Root object: {"now": ..., "events": [...]}
    Value now_json;
    now_json.SetDouble(date);
    _doc.AddMember("now", now_json, _alloc);
    _doc.AddMember("events", _events, _alloc);

    // Serialization into an in-memory buffer
    StringBuffer serialized;
    rapidjson::Writer<rapidjson::StringBuffer> json_writer(serialized);
    _doc.Accept(json_writer);

    const string message(serialized.GetString(), serialized.GetSize());
    return message;
}

Millian Poquet's avatar
Millian Poquet committed
331
332


333
334
/**
 * @brief Builds a JsonProtocolReader
 * @details Registers, for each supported event type, the method in charge of handling it.
 * @param[in] context The Batsim context
 */
JsonProtocolReader::JsonProtocolReader(BatsimContext *context) :
    context(context)
{
    auto & handlers = _type_to_handler_map;

    handlers["QUERY_REQUEST"]      = &JsonProtocolReader::handle_query_request;
    handlers["REJECT_JOB"]         = &JsonProtocolReader::handle_reject_job;
    handlers["EXECUTE_JOB"]        = &JsonProtocolReader::handle_execute_job;
    handlers["CHANGE_JOB_STATE"]   = &JsonProtocolReader::handle_change_job_state;
    handlers["CALL_ME_LATER"]      = &JsonProtocolReader::handle_call_me_later;
    handlers["KILL_JOB"]           = &JsonProtocolReader::handle_kill_job;
    handlers["SUBMIT_JOB"]         = &JsonProtocolReader::handle_submit_job;
    handlers["SET_RESOURCE_STATE"] = &JsonProtocolReader::handle_set_resource_state;
    handlers["NOTIFY"]             = &JsonProtocolReader::handle_notify;
}
Millian Poquet's avatar
Millian Poquet committed
346

347
348
/// Nothing to release explicitly: members clean up after themselves.
JsonProtocolReader::~JsonProtocolReader() = default;

void JsonProtocolReader::parse_and_apply_message(const string &message)
{
    rapidjson::Document doc;
    doc.Parse(message.c_str());

356
    xbt_assert(!doc.HasParseError(), "Invalid JSON message: could not be parsed");
Millian Poquet's avatar
Millian Poquet committed
357
358
359
    xbt_assert(doc.IsObject(), "Invalid JSON message: not a JSON object");

    xbt_assert(doc.HasMember("now"), "Invalid JSON message: no 'now' key");
360
    xbt_assert(doc["now"].IsNumber(), "Invalid JSON message: 'now' value should be a number.");
Millian Poquet's avatar
Millian Poquet committed
361
362
363
364
365
366
367
368
369
370
371
    double now = doc["now"].GetDouble();

    xbt_assert(doc.HasMember("events"), "Invalid JSON message: no 'events' key");
    const auto & events_array = doc["events"];
    xbt_assert(events_array.IsArray(), "Invalid JSON message: 'events' value should be an array.");

    for (unsigned int i = 0; i < events_array.Size(); ++i)
    {
        const auto & event_object = events_array[i];
        parse_and_apply_event(event_object, i, now);
    }
Millian Poquet's avatar
Millian Poquet committed
372

Millian Poquet's avatar
Millian Poquet committed
373
    send_message(now, "server", IPMessageType::SCHED_READY);
Millian Poquet's avatar
Millian Poquet committed
374
375
376
377
378
379
380
381
382
}

void JsonProtocolReader::parse_and_apply_event(const Value & event_object,
                                               int event_number,
                                               double now)
{
    xbt_assert(event_object.IsObject(), "Invalid JSON message: event %d should be an object.", event_number);

    xbt_assert(event_object.HasMember("timestamp"), "Invalid JSON message: event %d should have a 'timestamp' key.", event_number);
383
    xbt_assert(event_object["timestamp"].IsNumber(), "Invalid JSON message: timestamp of event %d should be a number", event_number);
Millian Poquet's avatar
Millian Poquet committed
384
385
    double timestamp = event_object["timestamp"].GetDouble();
    xbt_assert(timestamp <= now, "Invalid JSON message: timestamp %g of event %d should be lower than or equal to now=%g.", timestamp, event_number, now);
Millian Poquet's avatar
Millian Poquet committed
386
    (void) now; // Avoids a warning if assertions are ignored
Millian Poquet's avatar
Millian Poquet committed
387
388
389
390
391
392
393
394
395

    xbt_assert(event_object.HasMember("type"), "Invalid JSON message: event %d should have a 'type' key.", event_number);
    xbt_assert(event_object["type"].IsString(), "Invalid JSON message: event %d 'type' value should be a String", event_number);
    string type = event_object["type"].GetString();
    xbt_assert(_type_to_handler_map.find(type) != _type_to_handler_map.end(), "Invalid JSON message: event %d has an unknown 'type' value '%s'", event_number, type.c_str());

    xbt_assert(event_object.HasMember("data"), "Invalid JSON message: event %d should have a 'data' key.", event_number);
    const Value & data_object = event_object["data"];

396
397
    auto handler_function = _type_to_handler_map[type];
    handler_function(this, event_number, timestamp, data_object);
Millian Poquet's avatar
Millian Poquet committed
398
399
400
401
}

void JsonProtocolReader::handle_query_request(int event_number, double timestamp, const Value &data_object)
{
Millian Poquet's avatar
Millian Poquet committed
402
    (void) event_number; // Avoids a warning if assertions are ignored
Millian Poquet's avatar
Millian Poquet committed
403
404
405
406
407
408
409
410
    /* {
      "timestamp": 10.0,
      "type": "QUERY_REQUEST",
      "data": {
        "requests": {"consumed_energy": {}}
      }
    } */

Millian Poquet's avatar
Millian Poquet committed
411
412
413
414
415
    xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (QUERY_REQUEST) should be an object", event_number);
    xbt_assert(data_object.MemberCount() > 0, "Invalid JSON message: the 'data' value of event %d (QUERY_REQUEST) cannot be empty (size=%d)", event_number, (int)data_object.MemberCount());

    for (auto it = data_object.MemberBegin(); it != data_object.MemberEnd(); ++it)
    {
Millian Poquet's avatar
Millian Poquet committed
416
417
        const Value & key_value = it->name;
        const Value & value_object = it->value;
Millian Poquet's avatar
Millian Poquet committed
418
        (void) value_object; // Avoids a warning if assertions are ignored
Millian Poquet's avatar
Millian Poquet committed
419
420
421
422
423
424
425
426
427
428

        xbt_assert(key_value.IsString(), "Invalid JSON message: a key within the 'data' object of event %d (QUERY_REQUEST) is not a string", event_number);
        string key = key_value.GetString();
        xbt_assert(std::find(accepted_requests.begin(), accepted_requests.end(), key) != accepted_requests.end(), "Invalid JSON message: Unknown QUERY_REQUEST '%s' of event %d", key.c_str(), event_number);

        xbt_assert(value_object.IsObject(), "Invalid JSON message: the value of '%s' inside 'data' object of event %d (QUERY_REQUEST) is not an object", key.c_str(), event_number);

        if (key == "consumed_energy")
        {
            xbt_assert(value_object.ObjectEmpty(), "Invalid JSON message: the value of '%s' inside 'data' object of event %d (QUERY_REQUEST) should be empty", key.c_str(), event_number);
Millian Poquet's avatar
Millian Poquet committed
429
            send_message(timestamp, "server", IPMessageType::SCHED_TELL_ME_ENERGY);
Millian Poquet's avatar
Millian Poquet committed
430
431
432
433
        }
    }
}

Millian Poquet's avatar
Millian Poquet committed
434
435
436
void JsonProtocolReader::handle_reject_job(int event_number,
                                           double timestamp,
                                           const Value &data_object)
Millian Poquet's avatar
Millian Poquet committed
437
{
Millian Poquet's avatar
Millian Poquet committed
438
    (void) event_number; // Avoids a warning if assertions are ignored
Millian Poquet's avatar
Millian Poquet committed
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
    /* {
      "timestamp": 10.0,
      "type": "REJECT_JOB",
      "data": { "job_id": "w12!45" }
    } */

    xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (REJECT_JOB) should be an object", event_number);
    xbt_assert(data_object.MemberCount() == 1, "Invalid JSON message: the 'data' value of event %d (REJECT_JOB) should be of size 1 (size=%d)", event_number, (int)data_object.MemberCount());

    xbt_assert(data_object.HasMember("job_id"), "Invalid JSON message: the 'data' value of event %d (REJECT_JOB) should contain a 'job_id' key.", event_number);
    const Value & job_id_value = data_object["job_id"];
    xbt_assert(job_id_value.IsString(), "Invalid JSON message: the 'job_id' value in the 'data' value of event %d (REJECT_JOB) should be a string.", event_number);
    string job_id = job_id_value.GetString();

    JobRejectedMessage * message = new JobRejectedMessage;
    if (!identify_job_from_string(context, job_id, message->job_id))
    {
456
457
        xbt_assert(false, "Invalid JSON message: "
                          "Invalid job rejection received: The job identifier '%s' is not valid. "
Millian Poquet's avatar
Millian Poquet committed
458
459
460
461
462
463
                          "Job identifiers must be of the form [WORKLOAD_NAME!]JOB_ID. "
                          "If WORKLOAD_NAME! is omitted, WORKLOAD_NAME='static' is used. "
                          "Furthermore, the corresponding job must exist.", job_id.c_str());
    }

    Job * job = context->workloads.job_at(message->job_id);
Millian Poquet's avatar
Millian Poquet committed
464
    (void) job; // Avoids a warning if assertions are ignored
Millian Poquet's avatar
Millian Poquet committed
465
    xbt_assert(job->state == JobState::JOB_STATE_SUBMITTED,
466
               "Invalid JSON message: "
Millian Poquet's avatar
Millian Poquet committed
467
468
469
470
               "Invalid rejection received: job %s cannot be rejected at the present time."
               "For being rejected, a job must be submitted and not allocated yet.",
               job->id.c_str());

471
    send_message(timestamp, "server", IPMessageType::SCHED_REJECT_JOB, (void*) message);
Millian Poquet's avatar
Millian Poquet committed
472
473
}

Millian Poquet's avatar
Millian Poquet committed
474
475
476
void JsonProtocolReader::handle_execute_job(int event_number,
                                            double timestamp,
                                            const Value &data_object)
477
{
Millian Poquet's avatar
Millian Poquet committed
478
    (void) event_number; // Avoids a warning if assertions are ignored
479
480
481
482
483
484
485
486
487
488
    /* {
      "timestamp": 10.0,
      "type": "EXECUTE_JOB",
      "data": {
        "job_id": "w12!45",
        "alloc": "2-3",
        "mapping": {"0": "0", "1": "0", "2": "1", "3": "1"}
      }
    } */

489
    ExecuteJobMessage * message = new ExecuteJobMessage;
490
    message->allocation = new SchedulingAllocation;
491
492

    xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (EXECUTE_JOB) should be an object", event_number);
493
    xbt_assert(data_object.MemberCount() == 2 || data_object.MemberCount() == 3, "Invalid JSON message: the 'data' value of event %d (EXECUTE_JOB) should be of size in {2,3} (size=%d)", event_number, (int)data_object.MemberCount());
494
495
496
497
498
499
500
501
502
503
504

    // *************************
    // Job identifier management
    // *************************
    // Let's read it from the JSON message
    xbt_assert(data_object.HasMember("job_id"), "Invalid JSON message: the 'data' value of event %d (EXECUTE_JOB) should contain a 'job_id' key.", event_number);
    const Value & job_id_value = data_object["job_id"];
    xbt_assert(job_id_value.IsString(), "Invalid JSON message: the 'job_id' value in the 'data' value of event %d (EXECUTE_JOB) should be a string.", event_number);
    string job_id = job_id_value.GetString();

    // Let's retrieve the job identifier
505
506
    if (!identify_job_from_string(context, job_id, message->allocation->job_id,
                                  IdentifyJobReturnCondition::STRING_VALID))
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
    {
        xbt_assert(false, "Invalid JSON message: in event %d (EXECUTE_JOB): "
                          "The job identifier '%s' is not valid. "
                          "Job identifiers must be of the form [WORKLOAD_NAME!]JOB_ID. "
                          "If WORKLOAD_NAME! is omitted, WORKLOAD_NAME='static' is used. "
                          "Furthermore, the corresponding job must exist.",
                   event_number, job_id.c_str());
    }

    // *********************
    // Allocation management
    // *********************
    // Let's read it from the JSON message
    xbt_assert(data_object.HasMember("alloc"), "Invalid JSON message: the 'data' value of event %d (EXECUTE_JOB) should contain a 'alloc' key.", event_number);
    const Value & alloc_value = data_object["alloc"];
    xbt_assert(alloc_value.IsString(), "Invalid JSON message: the 'alloc' value in the 'data' value of event %d (EXECUTE_JOB) should be a string.", event_number);
    string alloc = alloc_value.GetString();

525
    message->allocation->machine_ids = MachineRange::from_string_hyphen(alloc, " ", "-", "Invalid JSON message received from the scheduler");
526
    int nb_allocated_resources = message->allocation->machine_ids.size();
Millian Poquet's avatar
Millian Poquet committed
527
    (void) nb_allocated_resources; // Avoids a warning if assertions are ignored
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
    xbt_assert(nb_allocated_resources > 0, "Invalid JSON message: in event %d (EXECUTE_JOB): the number of allocated resources should be strictly positive (got %d).", event_number, nb_allocated_resources);

    // *****************************
    // Mapping management (optional)
    // *****************************
    if (data_object.HasMember("mapping"))
    {
        const Value & mapping_value = data_object["mapping"];
        xbt_assert(mapping_value.IsObject(), "Invalid JSON message: the 'mapping' value in the 'data' value of event %d (EXECUTE_JOB) should be a string.", event_number);
        xbt_assert(mapping_value.MemberCount() > 0, "Invalid JSON: the 'mapping' value in the 'data' value of event %d (EXECUTE_JOB) must be a non-empty object", event_number);
        map<int,int> mapping_map;

        // Let's fill the map from the JSON description
        for (auto it = mapping_value.MemberBegin(); it != mapping_value.MemberEnd(); ++it)
        {
            const Value & key_value = it->name;
            const Value & value_value = it->value;

            xbt_assert(key_value.IsInt() || key_value.IsString(), "Invalid JSON message: Invalid 'mapping' of event %d (EXECUTE_JOB): a key is not an integer nor a string", event_number);
            xbt_assert(value_value.IsInt() || value_value.IsString(), "Invalid JSON message: Invalid 'mapping' of event %d (EXECUTE_JOB): a value is not an integer nor a string", event_number);

            int executor;
            int resource;

            try
            {
                if (key_value.IsInt())
555
                {
556
                    executor = key_value.GetInt();
557
                }
558
                else
559
                {
560
                    executor = std::stoi(key_value.GetString());
561
                }
562
563

                if (value_value.IsInt())
564
                {
565
                    resource = value_value.GetInt();
566
                }
567
                else
568
                {
569
                    resource = std::stoi(value_value.GetString());
570
                }
571
            }
572
            catch (const std::exception &)
573
574
            {
                xbt_assert(false, "Invalid JSON message: Invalid 'mapping' object of event %d (EXECUTE_JOB): all keys and values must be integers (or strings representing integers)", event_number);
575
                throw;
576
577
578
579
580
581
            }

            mapping_map[executor] = resource;
        }

        // Let's write the mapping as a vector (keys will be implicit between 0 and nb_executor-1)
582
        message->allocation->mapping.reserve(mapping_map.size());
583
584
585
586
        auto mit = mapping_map.begin();
        int nb_inserted = 0;

        xbt_assert(mit->first == nb_inserted, "Invalid JSON message: Invalid 'mapping' object of event %d (EXECUTE_JOB): no resource associated to executor %d.", event_number, nb_inserted);
587
        xbt_assert(mit->second >= 0 && mit->second < nb_allocated_resources, "Invalid JSON message: Invalid 'mapping' object of event %d (EXECUTE_JOB): executor %d should use the %d-th resource within the allocation, but there are only %d allocated resources.", event_number, mit->first, mit->second, nb_allocated_resources);
588
        message->allocation->mapping.push_back(mit->second);
589
590
591
592

        for (++mit, ++nb_inserted; mit != mapping_map.end(); ++mit, ++nb_inserted)
        {
            xbt_assert(mit->first == nb_inserted, "Invalid JSON message: Invalid 'mapping' object of event %d (EXECUTE_JOB): no resource associated to executor %d.", event_number, nb_inserted);
593
            xbt_assert(mit->second >= 0 && mit->second < nb_allocated_resources, "Invalid JSON message: Invalid 'mapping' object of event %d (EXECUTE_JOB): executor %d should use the %d-th resource within the allocation, but there are only %d allocated resources.", event_number, mit->first, mit->second, nb_allocated_resources);
594
            message->allocation->mapping.push_back(mit->second);
595
596
        }

597
        xbt_assert(message->allocation->mapping.size() == mapping_map.size());
598
    }
599
600
601
602
603
    else
    {
        // Default mapping
        message->allocation->mapping.resize(nb_allocated_resources);
        for (int i = 0; i < nb_allocated_resources; ++i)
Millian Poquet's avatar
Millian Poquet committed
604
        {
605
            message->allocation->mapping[i] = i;
Millian Poquet's avatar
Millian Poquet committed
606
        }
607
    }
608
609

    // Everything has been parsed correctly, let's inject the message into the simulation.
610
    send_message(timestamp, "server", IPMessageType::SCHED_EXECUTE_JOB, (void*) message);
611
612
}

Millian Poquet's avatar
Millian Poquet committed
613
614
615
void JsonProtocolReader::handle_call_me_later(int event_number,
                                              double timestamp,
                                              const Value &data_object)
616
617
618
619
620
621
622
{
    /* {
      "timestamp": 10.0,
      "type": "CALL_ME_LATER",
      "data": {"timestamp": 25.5}
    } */

Millian Poquet's avatar
Millian Poquet committed
623
    CallMeLaterMessage * message = new CallMeLaterMessage;
624
625
626
627
628
629

    xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (CALL_ME_LATER) should be an object", event_number);
    xbt_assert(data_object.MemberCount() == 1, "Invalid JSON message: the 'data' value of event %d (CALL_ME_LATER) should be of size 1 (size=%d)", event_number, (int)data_object.MemberCount());

    xbt_assert(data_object.HasMember("timestamp"), "Invalid JSON message: the 'data' value of event %d (CALL_ME_LATER) should contain a 'timestamp' key.", event_number);
    const Value & timestamp_value = data_object["timestamp"];
630
    xbt_assert(timestamp_value.IsNumber(), "Invalid JSON message: the 'timestamp' value in the 'data' value of event %d (CALL_ME_LATER) should be a number.", event_number);
631
632
633
    message->target_time = timestamp_value.GetDouble();

    if (message->target_time < MSG_get_clock())
634
    {
635
        XBT_WARN("Event %d (CALL_ME_LATER) asks to be called at time %g but it is already reached", event_number, message->target_time);
636
    }
637

Millian Poquet's avatar
Millian Poquet committed
638
    send_message(timestamp, "server", IPMessageType::SCHED_CALL_ME_LATER, (void*) message);
639
640
}

Millian Poquet's avatar
Millian Poquet committed
641
642
643
void JsonProtocolReader::handle_set_resource_state(int event_number,
                                                   double timestamp,
                                                   const Value &data_object)
644
{
Millian Poquet's avatar
Millian Poquet committed
645
    (void) event_number; // Avoids a warning if assertions are ignored
646
647
648
649
650
651
652
653
654
655
656
    /* {
      "timestamp": 10.0,
      "type": "SET_RESOURCE_STATE",
      "data": {"resources": "1 2 3-5", "state": "42"}
    } */
    PStateModificationMessage * message = new PStateModificationMessage;

    // ********************
    // Resources management
    // ********************
    // Let's read it from the JSON message
657
658
659
    xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (SET_RESOURCE_STATE) should be an object", event_number);
    xbt_assert(data_object.MemberCount() == 2, "Invalid JSON message: the 'data' value of event %d (SET_RESOURCE_STATE) should be of size 2 (size=%d)", event_number, (int)data_object.MemberCount());

660
661
662
663
664
    xbt_assert(data_object.HasMember("resources"), "Invalid JSON message: the 'data' value of event %d (SET_RESOURCE_STATE) should contain a 'resources' key.", event_number);
    const Value & resources_value = data_object["resources"];
    xbt_assert(resources_value.IsString(), "Invalid JSON message: the 'resources' value in the 'data' value of event %d (SET_RESOURCE_STATE) should be a string.", event_number);
    string resources = resources_value.GetString();

665
    message->machine_ids = MachineRange::from_string_hyphen(resources, " ", "-", "Invalid JSON message received from the scheduler");
666
    int nb_allocated_resources = message->machine_ids.size();
Millian Poquet's avatar
Millian Poquet committed
667
    (void) nb_allocated_resources; // Avoids a warning if assertions are ignored
668
669
670
671
672
673
674
    xbt_assert(nb_allocated_resources > 0, "Invalid JSON message: in event %d (SET_RESOURCE_STATE): the number of allocated resources should be strictly positive (got %d).", event_number, nb_allocated_resources);

    // State management
    xbt_assert(data_object.HasMember("state"), "Invalid JSON message: the 'data' value of event %d (SET_RESOURCE_STATE) should contain a 'state' key.", event_number);
    const Value & state_value = data_object["state"];
    xbt_assert(state_value.IsString(), "Invalid JSON message: the 'state' value in the 'data' value of event %d (SET_RESOURCE_STATE) should be a string.", event_number);
    string state_value_string = state_value.GetString();
675
676
677
678
    try
    {
        message->new_pstate = std::stoi(state_value_string);
    }
679
    catch(const std::exception &)
680
681
    {
        xbt_assert(false, "Invalid JSON message: the 'state' value in the 'data' value of event %d (SET_RESOURCE_STATE) should be a string corresponding to an integer (got '%s')", event_number, state_value_string.c_str());
682
        throw;
683
684
    }

Millian Poquet's avatar
Millian Poquet committed
685
    send_message(timestamp, "server", IPMessageType::PSTATE_MODIFICATION, (void*) message);
686
687
}

688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
/*
 * Handles a CHANGE_JOB_STATE event received from the scheduler.
 *
 * The 'data' object must contain string members 'job_id' and 'job_state'.
 * 'job_state' must be one of the names listed in the check below. An optional
 * string member 'kill_reason' may be given; a non-empty reason is only
 * accepted together with the COMPLETED_KILLED state. Once validated, a
 * SCHED_CHANGE_JOB_STATE message is sent to the "server" mailbox.
 *
 * event_number: index of the event in the received message (used in diagnostics)
 * timestamp:    date of the event, forwarded to send_message
 * data_object:  the 'data' value of the event
 */
void JsonProtocolReader::handle_change_job_state(int event_number,
                                                 double timestamp,
                                                 const Value &data_object)
{
    (void) event_number; // Avoids a warning if assertions are ignored
    /* {
      "timestamp": 42.0,
      "type": "CHANGE_JOB_STATE",
      "data": {
            "job_id": "w12!45",
            "job_state": "COMPLETED_KILLED",
            "kill_reason": "Sub-jobs were killed."
      }
    } */

    xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (CHANGE_JOB_STATE) should be an object", event_number);

    // Job identifier
    xbt_assert(data_object.HasMember("job_id"), "Invalid JSON message: the 'data' value of event %d (CHANGE_JOB_STATE) should have a 'job_id' key", event_number);
    const Value & job_id_value = data_object["job_id"];
    xbt_assert(job_id_value.IsString(), "Invalid JSON message: in event %d (CHANGE_JOB_STATE): ['data']['job_id'] should be a string", event_number);
    string job_id = job_id_value.GetString();

    // Requested state
    xbt_assert(data_object.HasMember("job_state"), "Invalid JSON message: the 'data' value of event %d (CHANGE_JOB_STATE) should have a 'job_state' key", event_number);
    const Value & job_state_value = data_object["job_state"];
    xbt_assert(job_state_value.IsString(), "Invalid JSON message: in event %d (CHANGE_JOB_STATE): ['data']['job_state'] should be a string", event_number);
    string job_state = job_state_value.GetString();

    if (job_state != "NOT_SUBMITTED"
            && job_state != "SUBMITTED"
            && job_state != "RUNNING"
            && job_state != "COMPLETED_SUCCESSFULLY"
            && job_state != "COMPLETED_KILLED"
            && job_state != "REJECTED") {
        xbt_assert(false, "Invalid JSON message: in event %d (CHANGE_JOB_STATE): ['data']['job_state'] must be one of: NOT_SUBMITTED, SUBMITTED, RUNNING, COMPLETED_SUCCESSFULLY, COMPLETED_KILLED, REJECTED", event_number);
    }

    // Optional kill reason (stays empty when the key is absent)
    string kill_reason;
    if (data_object.HasMember("kill_reason")) {
        const Value & kill_reason_value = data_object["kill_reason"];
        // The event name in this diagnostic used to read "CHANGE_kill_reason".
        xbt_assert(kill_reason_value.IsString(), "Invalid JSON message: in event %d (CHANGE_JOB_STATE): ['data']['kill_reason'] should be a string", event_number);
        kill_reason = kill_reason_value.GetString();

        if (kill_reason != "" && job_state != "COMPLETED_KILLED") {
            xbt_assert(false, "Invalid JSON message: in event %d (CHANGE_JOB_STATE): ['data']['kill_reason'] is only allowed if the job_state is COMPLETED_KILLED", event_number);
        }
    }

    ChangeJobStateMessage * message = new ChangeJobStateMessage;

    // Resolves the textual identifier into message->job_id.
    if (!identify_job_from_string(context, job_id, message->job_id))
    {
        xbt_assert(false, "Invalid JSON message: "
                          "Invalid job change job state received: The job identifier '%s' is not valid. "
                          "Job identifiers must be of the form [WORKLOAD_NAME!]JOB_ID. "
                          "If WORKLOAD_NAME! is omitted, WORKLOAD_NAME='static' is used. "
                          "Furthermore, the corresponding job must exist.", job_id.c_str());
    }

    message->job_state = job_state;
    message->kill_reason = kill_reason;

    send_message(timestamp, "server", IPMessageType::SCHED_CHANGE_JOB_STATE, (void *) message);
}

Millian Poquet's avatar
Millian Poquet committed
752
753
754
void JsonProtocolReader::handle_notify(int event_number,
                                       double timestamp,
                                       const Value &data_object)
755
{
Millian Poquet's avatar
Millian Poquet committed
756
    (void) event_number; // Avoids a warning if assertions are ignored
757
758
759
760
761
762
763
764
765
766
767
768
769
770
    /* {
      "timestamp": 42.0,
      "type": "NOTIFY",
      "data": { "type": "submission_finished" }
    } */

    xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (NOTIFY) should be an object", event_number);

    xbt_assert(data_object.HasMember("type"), "Invalid JSON message: the 'data' value of event %d (NOTIFY) should have a 'type' key", event_number);
    const Value & notify_type_value = data_object["type"];
    xbt_assert(notify_type_value.IsString(), "Invalid JSON message: in event %d (NOTIFY): ['data']['type'] should be a string", event_number);
    string notify_type = notify_type_value.GetString();

    if (notify_type == "submission_finished")
771
    {
772
        send_message(timestamp, "server", IPMessageType::END_DYNAMIC_SUBMIT);
773
    }
774
775
776
777
    else if (notify_type == "continue_submission")
    {
        send_message(timestamp, "server", IPMessageType::CONTINUE_DYNAMIC_SUBMIT);
    }
778
    else
779
    {
780
        xbt_assert(false, "Unknown NOTIFY type received ('%s').", notify_type.c_str());
781
    }
782

783
784
785
    (void) timestamp;
}

Millian Poquet's avatar
Millian Poquet committed
786
787
788
void JsonProtocolReader::handle_submit_job(int event_number,
                                           double timestamp,
                                           const Value &data_object)
789
{
Millian Poquet's avatar
Millian Poquet committed
790
    (void) event_number; // Avoids a warning if assertions are ignored
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
    /* "with_redis": {
      "timestamp": 10.0,
      "type": "SUBMIT_JOB",
      "data": {
        "job_id": "w12!45",
      }
    },
    "without_redis": {
      "timestamp": 10.0,
      "type": "SUBMIT_JOB",
      "data": {
        "job_id": "dyn!my_new_job",
        "job":{
          "profile": "delay_10s",
          "res": 1,
          "id": "my_new_job",
          "walltime": 12.0
        },
        "profile":{
          "type": "delay",
          "delay": 10
        }
      }
    } */

    JobSubmittedByDPMessage * message = new JobSubmittedByDPMessage;

    xbt_assert(context->submission_sched_enabled, "Invalid JSON message: dynamic job submission received but the option seems disabled...");

    xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (SUBMIT_JOB) should be an object", event_number);

    xbt_assert(data_object.HasMember("job_id"), "Invalid JSON message: the 'data' value of event %d (SUBMIT_JOB) should have a 'job_id' key", event_number);
    const Value & job_id_value = data_object["job_id"];
    xbt_assert(job_id_value.IsString(), "Invalid JSON message: in event %d (SUBMIT_JOB): ['data']['job_id'] should be a string", event_number);
    string job_id = job_id_value.GetString();

827
828
    if (!identify_job_from_string(context, job_id, message->job_id,
                                  IdentifyJobReturnCondition::STRING_VALID__JOB_DOES_NOT_EXISTS))
829
    {
830
        xbt_assert(false, "Invalid JSON message: in event %d (SUBMIT_JOB): job_id '%s' seems invalid (already exists?)", event_number, job_id.c_str());
831
    }
832
833
834

    if (data_object.HasMember("job"))
    {
Millian Poquet's avatar
Millian Poquet committed
835
836
        xbt_assert(!context->redis_enabled, "Invalid JSON message: in event %d (SUBMIT_JOB): 'job' object is given but redis seems disabled...", event_number);

837
838
839
        const Value & job_object = data_object["job"];
        xbt_assert(job_object.IsObject(), "Invalid JSON message: in event %d (SUBMIT_JOB): ['data']['job'] should be an object", event_number);

840
        StringBuffer buffer;
841
842
843
        ::Writer<rapidjson::StringBuffer> writer(buffer);
        job_object.Accept(writer);

844
        message->job_description = string(buffer.GetString(), buffer.GetSize());
845
846
    }
    else
847
    {
848
        xbt_assert(context->redis_enabled, "Invalid JSON message: in event %d (SUBMIT_JOB): ['data']['job'] is unset but redis seems enabled...", event_number);
849
    }
850
851
852

    if (data_object.HasMember("profile"))
    {
Millian Poquet's avatar
Millian Poquet committed
853
854
        xbt_assert(!context->redis_enabled, "Invalid JSON message: in event %d (SUBMIT_JOB): 'profile' object is given but redis seems disabled...", event_number);

855
856
857
        const Value & profile_object = data_object["profile"];
        xbt_assert(profile_object.IsObject(), "Invalid JSON message: in event %d (SUBMIT_JOB): ['data']['profile'] should be an object", event_number);

858
        StringBuffer buffer;
859
860
861
        ::Writer<rapidjson::StringBuffer> writer(buffer);
        profile_object.Accept(writer);

862
        message->job_profile_description = string(buffer.GetString(), buffer.GetSize());
863
864
865
    }

    send_message(timestamp, "server", IPMessageType::JOB_SUBMITTED_BY_DP, (void *) message);
866
867
}

Millian Poquet's avatar
Millian Poquet committed
868
869
870
void JsonProtocolReader::handle_kill_job(int event_number,
                                         double timestamp,
                                         const Value &data_object)
871
{
Millian Poquet's avatar
Millian Poquet committed
872
    (void) event_number; // Avoids a warning if assertions are ignored
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
    /* {
      "timestamp": 10.0,
      "type": "KILL_JOB",
      "data": {"job_ids": ["w0!1", "w0!2"]}
    } */

    KillJobMessage * message = new KillJobMessage;

    xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (KILL_JOB) should be an object", event_number);
    xbt_assert(data_object.MemberCount() == 1, "Invalid JSON message: the 'data' value of event %d (KILL_JOB) should be of size 1 (size=%d)", event_number, (int)data_object.MemberCount());

    xbt_assert(data_object.HasMember("job_ids"), "Invalid JSON message: the 'data' value of event %d (KILL_JOB) should contain a 'job_ids' key.", event_number);
    const Value & job_ids_array = data_object["job_ids"];
    xbt_assert(job_ids_array.IsArray(), "Invalid JSON message: the 'job_ids' value in the 'data' value of event %d (KILL_JOB) should be an array.", event_number);
    xbt_assert(job_ids_array.Size() > 0, "Invalid JSON message: the 'job_ids' array in the 'data' value of event %d (KILL_JOB) should be non-empty.", event_number);
888
    message->jobs_ids.resize(job_ids_array.Size());
889
890
891
892
893

    for (unsigned int i = 0; i < job_ids_array.Size(); ++i)
    {
        const Value & job_id_value = job_ids_array[i];
        if (!identify_job_from_string(context, job_id_value.GetString(), message->jobs_ids[i]))
894
        {
895
            xbt_assert(false, "Invalid JSON message: in event %d (KILL_JOB): job_id %d ('%s') is invalid.", event_number, i, message->jobs_ids[i].to_string().c_str());
896
        }
897
898
899
    }

    send_message(timestamp, "server", IPMessageType::SCHED_KILL_JOB, (void *) message);
900
901
}

Millian Poquet's avatar
Millian Poquet committed
902
void JsonProtocolReader::send_message(double when,
Millian Poquet's avatar
Millian Poquet committed
903
904
                                      const string &destination_mailbox,
                                      IPMessageType type,
Millian Poquet's avatar
Millian Poquet committed
905
906
                                      void *data,
                                      bool detached) const
Millian Poquet's avatar
Millian Poquet committed
907
{
908
    // Let's wait until "when" time is reached
Millian Poquet's avatar
Millian Poquet committed
909
910
    double current_time = MSG_get_clock();
    if (when > current_time)
911
    {
Millian Poquet's avatar
Millian Poquet committed
912
        MSG_process_sleep(when - current_time);
913
    }
Millian Poquet's avatar
Millian Poquet committed
914

915
    // Let's actually send the message
Millian Poquet's avatar
Millian Poquet committed
916
    generic_send_message(destination_mailbox, type, data, detached);
Millian Poquet's avatar
Millian Poquet committed
917
}