protocol.hpp 18.6 KB
Newer Older
1
2
#pragma once

Millian Poquet's avatar
Millian Poquet committed
3
#include <functional>
4
5
#include <vector>
#include <string>
Millian Poquet's avatar
Millian Poquet committed
6
#include <map>
7

8
#include <rapidjson/document.h>
9
#include <rapidjson/writer.h>
10

11
#include "machine_range.hpp"
12
#include "machines.hpp"
Millian Poquet's avatar
Millian Poquet committed
13
#include "ipp.hpp"
14

Millian Poquet's avatar
Millian Poquet committed
15
16
struct BatsimContext;

17
18
19
20
21
22
23
/**
 * @brief Custom rapidjson Writer that forces a fixed float writing precision
 * @details Doubles are formatted with printf's "%6f" conversion (minimum field
 *          width of 6, default precision of 6 digits after the decimal point)
 *          and pushed character by character into the output stream.
 */
template<typename OutputStream>
class Writer : public rapidjson::Writer<OutputStream>
{
public:
    /**
     * @brief Constructor
     * @param[in,out] os The output stream. Only its address is stored, so it must
     *                outlive this Writer.
     */
    explicit Writer(OutputStream& os) : rapidjson::Writer<OutputStream>(os), os_(&os)
    {
    }

    /**
     * @brief Adds a double in the output stream
     * @details The value is formatted into a fixed-size scratch buffer. If the
     *          formatted text does not fit (very large magnitudes), only the part
     *          that was actually stored is emitted and false is returned.
     * @param[in] d The double to add in the stream
     * @return true on success, false if formatting failed or the value was truncated
     */
    bool Double(double d)
    {
        this->Prefix(rapidjson::kNumberType);

        // A small automatic buffer is enough here: no heap allocation per number.
        const int buf_size = 32;
        char buffer[buf_size];

        // NOTE(review): printf-style formatting renders NaN and infinities as text
        // that is not valid JSON -- confirm callers never pass such values.
        const int ret = snprintf(buffer, buf_size, "%6f", d);
        RAPIDJSON_ASSERT(ret >= 1);
        RAPIDJSON_ASSERT(ret < buf_size - 1);

        // snprintf returns the length the complete text would have had, which can
        // exceed what was stored. Never read past the characters really written
        // (at most buf_size - 1, the last byte being the terminating NUL).
        int nb_stored = ret;
        if (nb_stored > buf_size - 1)
        {
            nb_stored = buf_size - 1;
        }

        for (int i = 0; i < nb_stored; ++i)
        {
            os_->Put(buffer[i]);
        }

        // A negative value means an encoding error; a value that does not fit
        // means the output was truncated. Both are failures.
        return (ret >= 1) && (ret < (buf_size - 1));
    }

private:
    OutputStream* os_; //!< The output stream
};

61
62
63
64
65
66
/**
 * @brief Does the interface between protocol semantics and message representation.
 */
class AbstractProtocolWriter
{
public:
67
68
69
    /**
     * @brief Destructor
     */
Millian Poquet's avatar
Millian Poquet committed
70
    virtual ~AbstractProtocolWriter() {}
71

72
73
    // Messages from Batsim to the Scheduler
    /**
74
     * @brief Appends a SIMULATION_BEGINS event.
75
     * @param[in] machines The machines usable to compute jobs
76
     * @param[in] configuration The simulation configuration
77
     * @param[in] allow_time_sharing Whether time sharing is enabled
78
79
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
80
    virtual void append_simulation_begins(Machines & machines,
81
                                          const rapidjson::Document & configuration,
82
                                          bool allow_time_sharing,
83
                                          double date) = 0;
84
85
86
87
88
89
90
91
92

    /**
     * @brief Appends a SIMULATION_ENDS event.
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
    virtual void append_simulation_ends(double date) = 0;

    /**
     * @brief Appends a JOB_SUBMITTED event.
Millian Poquet's avatar
Millian Poquet committed
93
     * @param[in] job_id The identifier of the submitted job.
94
95
96
     * @param[in] job_json_description The job JSON description (optional if redis is enabled)
     * @param[in] profile_json_description The profile JSON description (optional if redis is
     *            disabled or if profiles are not forwarded)
97
98
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
Millian Poquet's avatar
Millian Poquet committed
99
    virtual void append_job_submitted(const std::string & job_id,
100
101
                                      const std::string & job_json_description,
                                      const std::string & profile_json_description,
102
103
104
105
106
                                      double date) = 0;

    /**
     * @brief Appends a JOB_COMPLETED event.
     * @param[in] job_id The identifier of the job that has completed.
107
     * @param[in] job_status The job status
108
109
     * @param[in] job_state The job state
     * @param[in] kill_reason The kill reason (if any)
110
     * @param[in] return_code The job return code
111
112
113
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
    virtual void append_job_completed(const std::string & job_id,
114
                                      const std::string & job_status,
115
116
                                      const std::string & job_state,
                                      const std::string & kill_reason,
117
                                      int return_code,
118
119
120
121
122
123
124
                                      double date) = 0;

    /**
     * @brief Appends a JOB_KILLED event.
     * @param[in] job_ids The identifiers of the jobs that have been killed.
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
125
    virtual void append_job_killed(const std::vector<std::string> & job_ids,
126
127
                                   double date) = 0;

128
129
130
131
132
133
134
    /**
     * @brief Appends a FROM_JOB_MSG event.
     * @param[in] job_id The identifier of the job which sends the message.
     * @param[in] message The message to be sent to the scheduler.
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
    virtual void append_from_job_message(const std::string & job_id,
135
                                         const rapidjson::Document & message,
136
137
                                         double date) = 0;

138
139
140
141
142
143
144
145
146
147
148
    /**
     * @brief Appends a RESOURCE_STATE_CHANGED event.
     * @param[in] resources The resources whose state has changed.
     * @param[in] new_state The state the machines are now in.
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
    virtual void append_resource_state_changed(const MachineRange & resources,
                                               const std::string & new_state,
                                               double date) = 0;

    /**
149
150
     * @brief Appends a QUERY_REPLY (energy) event.
     * @param[in] consumed_energy The total consumed energy in joules
151
152
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
153
154
    virtual void append_query_reply_energy(double consumed_energy,
                                           double date) = 0;
155

156
157
158
159
160
161
    /**
     * @brief Appends a REQUESTED_CALL message.
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
    virtual void append_requested_call(double date) = 0;

162
163
    // Management functions
    /**
164
     * @brief Clears inner content. Should called directly after generate_current_message.
165
166
167
168
169
170
171
172
173
174
     */
    virtual void clear() = 0;

    /**
     * @brief Generates a string representation of the message containing all the events since the
     *        last call to clear.
     * @param[in] date The message date. Must be greater than or equal to the inner events dates.
     * @return A string representation of the events added since the last call to clear.
     */
    virtual std::string generate_current_message(double date) = 0;
175
176
177
178
179
180

    /**
     * @brief Returns whether the Writer has content
     * @return Whether the Writer has content
     */
    virtual bool is_empty() = 0;
181
182
};

183
184
185
/**
 * @brief The JSON implementation of the AbstractProtocolWriter
 */
186
187
188
class JsonProtocolWriter : public AbstractProtocolWriter
{
public:
189
190
    /**
     * @brief Creates an empty JsonProtocolWriter
191
     * @param[in,out] context The BatsimContext
192
     */
193
    explicit JsonProtocolWriter(BatsimContext * context);
194

195
196
197
198
199
200
    /**
     * @brief JsonProtocolWriter cannot be copied.
     * @param[in] other Another instance
     */
    JsonProtocolWriter(const JsonProtocolWriter & other) = delete;

201
202
203
204
205
    /**
     * @brief Destroys a JsonProtocolWriter
     */
    ~JsonProtocolWriter();

206
207
    // Messages from Batsim to the Scheduler
    /**
208
     * @brief Appends a SIMULATION_BEGINS event.
209
     * @param[in] machines The machines usable to compute jobs
210
     * @param[in] configuration The simulation configuration
211
     * @param[in] allow_time_sharing Whether time sharing is enabled
212
213
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
214
    void append_simulation_begins(Machines & machines,
215
                                  const rapidjson::Document & configuration,
216
                                  bool allow_time_sharing,
217
                                  double date);
218
219
220
221
222
223
224
225
226

    /**
     * @brief Appends a SIMULATION_ENDS event.
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
    void append_simulation_ends(double date);

    /**
     * @brief Appends a JOB_SUBMITTED event.
Millian Poquet's avatar
Millian Poquet committed
227
     * @param[in] job_id The identifier of the submitted job.
228
229
230
     * @param[in] job_json_description The job JSON description (optional if redis is enabled)
     * @param[in] profile_json_description The profile JSON description (optional if redis is
     *            disabled or if profiles are not forwarded)
231
232
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
Millian Poquet's avatar
Millian Poquet committed
233
    void append_job_submitted(const std::string & job_id,
234
235
                              const std::string & job_json_description,
                              const std::string & profile_json_description,
236
237
238
239
240
                              double date);

    /**
     * @brief Appends a JOB_COMPLETED event.
     * @param[in] job_id The identifier of the job that has completed.
241
     * @param[in] job_status The job status
242
243
     * @param[in] job_state The job state
     * @param[in] kill_reason The kill reason (if any)
244
     * @param[in] return_code The job return code
245
246
247
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
    void append_job_completed(const std::string & job_id,
248
                              const std::string & job_status,
249
250
                              const std::string & job_state,
                              const std::string & kill_reason,
251
                              int return_code,
252
253
254
255
256
257
258
                              double date);

    /**
     * @brief Appends a JOB_KILLED event.
     * @param[in] job_ids The identifiers of the jobs that have been killed.
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
259
    void append_job_killed(const std::vector<std::string> & job_ids,
260
261
                           double date);

262
263
264
265
266
267
268
    /**
     * @brief Appends a FROM_JOB_MSG event.
     * @param[in] job_id The identifier of the job which sends the message.
     * @param[in] message The message to be sent to the scheduler.
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
    void append_from_job_message(const std::string & job_id,
269
                                 const rapidjson::Document & message,
270
271
                                 double date);

272
273
274
275
276
277
278
279
280
281
282
    /**
     * @brief Appends a RESOURCE_STATE_CHANGED event.
     * @param[in] resources The resources whose state has changed.
     * @param[in] new_state The state the machines are now in.
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
    void append_resource_state_changed(const MachineRange & resources,
                                       const std::string & new_state,
                                       double date);

    /**
283
284
     * @brief Appends a QUERY_REPLY (energy) event.
     * @param[in] consumed_energy The total consumed energy in joules
285
286
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
287
288
    void append_query_reply_energy(double consumed_energy,
                                   double date);
289

290
291
292
293
294
295
    /**
     * @brief Appends a REQUESTED_CALL message.
     * @param[in] date The event date. Must be greater than or equal to the previous event.
     */
    void append_requested_call(double date);

296
297
    // Management functions
    /**
298
     * @brief Clears inner content. Should be called directly after generate_current_message.
299
300
301
302
303
304
305
306
307
308
     */
    void clear();

    /**
     * @brief Generates a string representation of the message containing all the events since the
     *        last call to clear.
     * @param[in] date The message date. Must be greater than or equal to the inner events dates.
     * @return A string representation of the events added since the last call to clear.
     */
    std::string generate_current_message(double date);
309
310
311
312
313
314
315

    /**
     * @brief Returns whether the Writer has content
     * @return Whether the Writer has content
     */
    bool is_empty() { return _is_empty; }

316
317
318
private:
    /**
     * @brief Converts a machine to a json value.
319
320
     * @param[in] machine The machine to be converted
     * @return The json value
321
322
323
     */
    rapidjson::Value machine_to_json_value(const Machine & machine);

324
private:
325
    BatsimContext * _context; //!< The BatsimContext
326
327
328
329
330
    bool _is_empty = true; //!< Stores whether events have been pushed into the writer since last clear.
    double _last_date = -1; //!< The date of the latest pushed event/message
    rapidjson::Document _doc; //!< A rapidjson document
    rapidjson::Document::AllocatorType & _alloc; //!< The allocated of _doc
    rapidjson::Value _events = rapidjson::Value(rapidjson::kArrayType); //!< A rapidjson array in which the events are pushed
331
    const std::vector<std::string> accepted_completion_statuses = {"SUCCESS", "FAILED", "TIMEOUT"}; //!< The list of accepted statuses for the JOB_COMPLETED message
332
};
333
334


Millian Poquet's avatar
Millian Poquet committed
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359

/**
 * @brief Interface of the objects that parse a protocol message and inject the
 *        corresponding internal messages into the simulation
 */
class AbstractProtocolReader
{
public:
    /**
     * @brief Destructor
     */
    virtual ~AbstractProtocolReader() = default;

    /**
     * @brief Parses a message and injects events in the simulation
     * @param[in] message The protocol message
     */
    virtual void parse_and_apply_message(const std::string & message) = 0;
};

/**
 * @brief In charge of parsing a JSON message and injecting messages into the simulation
 */
class JsonProtocolReader : public AbstractProtocolReader
{
public:
360
361
362
363
    /**
     * @brief Constructor
     * @param[in] context The BatsimContext
     */
364
    explicit JsonProtocolReader(BatsimContext * context);
365

366
367
368
369
370
371
    /**
     * @brief JsonProtocolReader cannot be copied.
     * @param[in] other Another instance
     */
    JsonProtocolReader(const JsonProtocolReader & other) = delete;

Millian Poquet's avatar
Millian Poquet committed
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
    /**
     * @brief Destructor
     */
    ~JsonProtocolReader();

    /**
     * @brief Parses a message and injects events in the simulation
     * @param[in] message The protocol message
     */
    void parse_and_apply_message(const std::string & message);

    /**
     * @brief Parses an event and injects it in the simulation
     * @param[in] event_object The event (JSON object)
     * @param[in] event_number The event number in [0,nb_events[.
     * @param[in] now The message timestamp
     */
    void parse_and_apply_event(const rapidjson::Value & event_object, int event_number, double now);

391

Millian Poquet's avatar
Millian Poquet committed
392
393
    /**
     * @brief Handles a QUERY_REQUEST event
394
     * @param[in] event_number The event number in [0,nb_events[.
Millian Poquet's avatar
Millian Poquet committed
395
396
397
398
399
     * @param[in] timestamp The event timestamp
     * @param[in] data_object The data associated with the event (JSON object)
     */
    void handle_query_request(int event_number, double timestamp, const rapidjson::Value & data_object);

Millian Poquet's avatar
Millian Poquet committed
400
401
    /**
     * @brief Handles a QUERY_REQUEST event
402
     * @param[in] event_number The event number in [0,nb_events[.
Millian Poquet's avatar
Millian Poquet committed
403
404
405
406
407
     * @param[in] timestamp The event timestamp
     * @param[in] data_object The data associated with the event (JSON object)
     */
    void handle_reject_job(int event_number, double timestamp, const rapidjson::Value & data_object);

408
409
    /**
     * @brief Handles an EXECUTE_JOB event
410
     * @param[in] event_number The event number in [0,nb_events[.
411
412
413
414
415
     * @param[in] timestamp The event timestamp
     * @param[in] data_object The data associated with the event (JSON object)
     */
    void handle_execute_job(int event_number, double timestamp, const rapidjson::Value & data_object);

416
417
418
419
420
421
422
423
    /**
     * @brief Handles an CHANGE_JOB_STATE event
     * @param[in] event_number The event number in [0,nb_events[.
     * @param[in] timestamp The event timestamp
     * @param[in] data_object The data associated with the event (JSON object)
     */
    void handle_change_job_state(int event_number, double timestamp, const rapidjson::Value & data_object);

424
425
    /**
     * @brief Handles a CALL_ME_LATER event
426
     * @param[in] event_number The event number in [0,nb_events[.
427
428
429
430
431
432
433
     * @param[in] timestamp The event timestamp
     * @param[in] data_object The data associated with the event (JSON object)
     */
    void handle_call_me_later(int event_number, double timestamp, const rapidjson::Value & data_object);

    /**
     * @brief Handles a SET_RESOURCE_STATE event
434
     * @param[in] event_number The event number in [0,nb_events[.
435
436
437
438
439
     * @param[in] timestamp The event timestamp
     * @param[in] data_object The data associated with the event (JSON object)
     */
    void handle_set_resource_state(int event_number, double timestamp, const rapidjson::Value & data_object);

440
441
    /**
     * @brief Handles a NOTIFY event
442
     * @param[in] event_number The event number in [0,nb_events[.
443
444
445
446
447
     * @param[in] timestamp The event timestamp
     * @param[in] data_object The data associated with the event (JSON object)
     */
    void handle_notify(int event_number, double timestamp, const rapidjson::Value & data_object);

448
449
450
451
452
453
454
455
    /**
     * @brief Handles a TO_JOB_MSG event
     * @param[in] event_number The event number in [0,nb_events[.
     * @param[in] timestamp The event timestamp
     * @param[in] data_object The data associated with the event (JSON object)
     */
    void handle_to_job_msg(int event_number, double timestamp, const rapidjson::Value & data_object);

456
457
    /**
     * @brief Handles a SUBMIT_JOB event
458
     * @param[in] event_number The event number in [0,nb_events[.
459
460
461
462
463
464
465
     * @param[in] timestamp The event timestamp
     * @param[in] data_object The data associated with the event (JSON object)
     */
    void handle_submit_job(int event_number, double timestamp, const rapidjson::Value & data_object);

    /**
     * @brief Handles a KILL_JOB event
466
     * @param[in] event_number The event number in [0,nb_events[.
467
468
469
470
471
     * @param[in] timestamp The event timestamp
     * @param[in] data_object The data associated with the event (JSON object)
     */
    void handle_kill_job(int event_number, double timestamp, const rapidjson::Value & data_object);

Millian Poquet's avatar
Millian Poquet committed
472
private:
473
474
475
476
477
478
    /**
     * @brief Sends a message at a given time, sleeping to reach the given time if needed
     * @param[in] when The date at which the message should be sent
     * @param[in] destination_mailbox The destination mailbox
     * @param[in] type The message type
     * @param[in] data The message data
Millian Poquet's avatar
Millian Poquet committed
479
     * @param[in] detached Whether the send should be detached
480
     */
Millian Poquet's avatar
Millian Poquet committed
481
    void send_message(double when,
482
483
                      const std::string & destination_mailbox,
                      IPMessageType type,
Millian Poquet's avatar
Millian Poquet committed
484
485
                      void * data = nullptr,
                      bool detached = false) const;
Millian Poquet's avatar
Millian Poquet committed
486
487

private:
488
    //! Maps message types to their handler functions
Millian Poquet's avatar
Millian Poquet committed
489
    std::map<std::string, std::function<void(JsonProtocolReader*, int, double, const rapidjson::Value&)>> _type_to_handler_map;
490
491
    std::vector<std::string> accepted_requests = {"consumed_energy"}; //!< The currently acceptes requests for the QUERY_REQUEST message
    BatsimContext * context = nullptr; //!< The BatsimContext
Millian Poquet's avatar
Millian Poquet committed
492
};