server.cpp 35.5 KB
Newer Older
Millian Poquet's avatar
Millian Poquet committed
1
2
3
4
5
/**
 * @file server.cpp
 * @brief Contains functions related to the general orchestration of the simulation
 */

6
7
8
9
10
11
12
13
14
15
16
17
18
#include "server.hpp"

#include <string>

#include <boost/algorithm/string.hpp>

#include <simgrid/msg.h>

#include "context.hpp"
#include "ipp.hpp"
#include "network.hpp"
#include "jobs_execution.hpp"

Millian Poquet's avatar
Millian Poquet committed
19
XBT_LOG_NEW_DEFAULT_CATEGORY(server, "server"); //!< Logging
20
21
22

using namespace std;

23
int server_process(int argc, char *argv[])
24
25
26
27
28
29
30
{
    (void) argc;
    (void) argv;

    ServerProcessArguments * args = (ServerProcessArguments *) MSG_process_get_data(MSG_process_self());
    BatsimContext * context = args->context;

31
32
    ServerData * data = new ServerData;
    data->context = context;
33

34
35
    // Let's tell the Decision process that the simulation is about to begin
    // (and that some data can be read from the data storage)
36
    context->proto_writer->append_simulation_begins(context->machines,
37
38
39
                                                    context->config_file,
                                                    context->allow_time_sharing,
                                                    MSG_get_clock());
Millian Poquet's avatar
Millian Poquet committed
40

41
42
    RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
    req_rep_args->context = context;
Millian Poquet's avatar
Millian Poquet committed
43
44
    req_rep_args->send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
    context->proto_writer->clear();
45

46
47
48
    MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process,
                       (void*)req_rep_args, MSG_host_self());
    data->sched_ready = false;
49

50
51
52
53
54
55
56
    // Let's prepare a handler map to react on events
    std::map<IPMessageType, std::function<void(ServerData *, IPMessage *)>> handler_map;
    handler_map[IPMessageType::JOB_SUBMITTED] = server_on_job_submitted;
    handler_map[IPMessageType::JOB_SUBMITTED_BY_DP] = server_on_submit_job;
    handler_map[IPMessageType::JOB_COMPLETED] = server_on_job_completed;
    handler_map[IPMessageType::PSTATE_MODIFICATION] = server_on_pstate_modification;
    handler_map[IPMessageType::SCHED_EXECUTE_JOB] = server_on_execute_job;
57
    handler_map[IPMessageType::SCHED_CHANGE_JOB_STATE] = server_on_change_job_state;
58
59
60
61
62
63
64
65
66
67
68
    handler_map[IPMessageType::SCHED_REJECT_JOB] = server_on_reject_job;
    handler_map[IPMessageType::SCHED_KILL_JOB] = server_on_kill_jobs;
    handler_map[IPMessageType::SCHED_CALL_ME_LATER] = server_on_call_me_later;
    handler_map[IPMessageType::SCHED_TELL_ME_ENERGY] = server_on_sched_tell_me_energy;
    handler_map[IPMessageType::SCHED_WAIT_ANSWER] = server_on_sched_wait_answer;
    handler_map[IPMessageType::WAIT_QUERY] = server_on_wait_query;
    handler_map[IPMessageType::SCHED_READY] = server_on_sched_ready;
    handler_map[IPMessageType::WAITING_DONE] = server_on_waiting_done;
    handler_map[IPMessageType::KILLING_DONE] = server_on_killing_done;
    handler_map[IPMessageType::SUBMITTER_HELLO] = server_on_submitter_hello;
    handler_map[IPMessageType::SUBMITTER_BYE] = server_on_submitter_bye;
69
70
    handler_map[IPMessageType::SWITCHED_ON] = server_on_switched;
    handler_map[IPMessageType::SWITCHED_OFF] = server_on_switched;
71
    handler_map[IPMessageType::END_DYNAMIC_SUBMIT] = server_on_end_dynamic_submit;
72
    handler_map[IPMessageType::CONTINUE_DYNAMIC_SUBMIT] = server_on_continue_dynamic_submit;
73

74
    // Simulation loop
75
76
77
78
79
80
81
82
83
    while ((data->nb_submitters == 0) ||// To enter the loop
           (data->nb_submitters_finished < data->nb_submitters) || // All submitters must have finished
           (data->nb_completed_jobs < data->nb_submitted_jobs) || // All jobs must have finished
           (!data->sched_ready) || // A scheduler answer is being injected into the simulation
           (data->nb_switching_machines > 0) || // Some machines are switching state
           (data->nb_waiters > 0) || // The scheduler requested to be called in the future
           (data->nb_killers > 0) || // Some jobs are being killed
           (context->submission_sched_enabled && // If dynamic job submission are enabled
            !context->submission_sched_finished)) // The end of submissions must have been received
84
85
86
87
88
89
    {
        // Let's wait a message from a node or the request-reply process...
        msg_task_t task_received = NULL;
        IPMessage * task_data;
        MSG_task_receive(&(task_received), "server");

90
91
92
93
        // Let's handle the message
        task_data = (IPMessage *) MSG_task_get_data(task_received);
        XBT_INFO("Server received a message of type %s:",
                 ip_message_type_to_string(task_data->type).c_str());
94

95
96
97
98
99
        xbt_assert(handler_map.count(task_data->type) == 1,
                   "The server does not know how to handle message type %s.",
                   ip_message_type_to_string(task_data->type).c_str());
        auto handler_function = handler_map[task_data->type];
        handler_function(data, task_data);
100

101
102
103
        // Let's delete the message data
        delete task_data;
        MSG_task_destroy(task_received);
104

105
106
107
108
109
110
111
112
113
        // Let's send a message to the scheduler if needed
        if (data->sched_ready && // The scheduler must be ready
            !data->end_of_simulation_sent && // It will NOT be called if SIMULATION_ENDS has already been sent
            !context->proto_writer->is_empty()) // There must be something to send to it
        {
            RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
            req_rep_args->context = context;
            req_rep_args->send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
            context->proto_writer->clear();
114

115
116
117
            MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
            data->sched_ready = false;
            if (data->all_jobs_submitted_and_completed)
118
            {
119
                data->end_of_simulation_sent = true;
120
            }
121
        }
122

123
    } // end of while
124

125
126
127
128
    XBT_INFO("Simulation is finished!");
    bool simulation_is_completed = data->all_jobs_submitted_and_completed;
    (void) simulation_is_completed; // Avoids a warning if assertions are ignored
    xbt_assert(simulation_is_completed, "Left simulation loop, but the simulation does NOT seem finished...");
129

130
131
132
133
    delete data;
    delete args;
    return 0;
}
134

135
136
137
138
139
140
141
/**
 * @brief Registers a new submitter that announced itself to the server
 * @param[in,out] data The server data; its submitter count and submitter map are updated
 * @param[in] task_data The received message; its data must be a SubmitterHelloMessage
 */
void server_on_submitter_hello(ServerData * data,
                               IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    xbt_assert(!data->all_jobs_submitted_and_completed,
               "A new submitter said hello but all jobs have already been submitted and completed... Aborting.");

    const auto * hello = static_cast<SubmitterHelloMessage *>(task_data->data);
    const auto & name = hello->submitter_name;

    // Submitter names are used as keys, so they must be unique
    xbt_assert(data->submitters.count(name) == 0,
               "Invalid new submitter '%s': a submitter with the same name already exists!",
               name.c_str());

    ++data->nb_submitters;

    // The submitter's name is also used as its mailbox
    auto * new_submitter = new ServerData::Submitter;
    new_submitter->mailbox = name;
    new_submitter->should_be_called_back = hello->enable_callback_on_job_completion;
    data->submitters[name] = new_submitter;

    XBT_INFO("New submitter said hello. Number of polite submitters: %d",
             data->nb_submitters);
}
158

159
160
161
162
163
164
165
166
167
168
169
170
/**
 * @brief Unregisters a submitter that announced it has finished
 * @details The submitter is removed from the submitter map and freed, the counters of
 *          finished submitters are updated, then the end-of-simulation condition is re-evaluated.
 * @param[in,out] data The server data
 * @param[in] task_data The received message; its data must be a SubmitterByeMessage
 */
void server_on_submitter_bye(ServerData * data,
                             IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    const auto * message = static_cast<SubmitterByeMessage *>(task_data->data);

    // One lookup serves the existence check, the deletion and the removal
    auto submitter_it = data->submitters.find(message->submitter_name);
    xbt_assert(submitter_it != data->submitters.end(),
               "Unknown submitter '%s' said goodbye: it never said hello (or already left).",
               message->submitter_name.c_str());

    // Guarded so that builds without assertions do not dereference end()
    if (submitter_it != data->submitters.end())
    {
        delete submitter_it->second;
        data->submitters.erase(submitter_it);
    }

    data->nb_submitters_finished++;
    if (message->is_workflow_submitter)
    {
        data->nb_workflow_submitters_finished++;
    }

    XBT_INFO("A submitter said goodbye. Number of finished submitters: %d",
             data->nb_submitters_finished);

    check_submitted_and_completed(data);
}
179

180
181
182
183
184
/**
 * @brief Handles the completion of a job
 * @details Calls back the submitter registered as the origin of the job (if any), updates
 *          the running/completed job counters, forwards the completion to the decision
 *          process through the protocol writer, then re-evaluates the end-of-simulation condition.
 * @param[in,out] data The server data
 * @param[in] task_data The received message; its data must be a JobCompletedMessage
 */
void server_on_job_completed(ServerData * data,
                             IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    const auto * completion = static_cast<JobCompletedMessage *>(task_data->data);

    // Only jobs whose submitter asked for a callback are present in origin_of_jobs
    auto origin_it = data->origin_of_jobs.find(completion->job_id);
    if (origin_it != data->origin_of_jobs.end())
    {
        auto * callback = new SubmitterJobCompletionCallbackMessage;
        callback->job_id = completion->job_id;

        ServerData::Submitter * origin = origin_it->second;
        dsend_message(origin->mailbox, IPMessageType::SUBMITTER_CALLBACK, static_cast<void *>(callback));

        data->origin_of_jobs.erase(origin_it);
    }

    // Counters update
    data->nb_running_jobs--;
    xbt_assert(data->nb_running_jobs >= 0);
    data->nb_completed_jobs++;
    xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);

    Job * job = data->context->workloads.job_at(completion->job_id);

    XBT_INFO("Job %s!%d COMPLETED. %d jobs completed so far",
             job->workload->name.c_str(), job->number, data->nb_completed_jobs);

    // The status is derived from the job state (and from the kill reason for timeouts)
    const bool succeeded = (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY);
    const bool timed_out = !succeeded &&
                           (job->state == JobState::JOB_STATE_COMPLETED_KILLED) &&
                           (job->kill_reason == "Walltime reached");

    std::string status = "UNKNOWN";
    if (succeeded)
    {
        status = "SUCCESS";
    }
    else if (timed_out)
    {
        status = "TIMEOUT";
    }

    data->context->proto_writer->append_job_completed(completion->job_id.to_string(),
                                                      status,
                                                      job_state_to_string(job->state),
                                                      job->kill_reason,
                                                      MSG_get_clock());

    check_submitted_and_completed(data);
}
224

225
226
227
228
229
230
231
232
233
234
/**
 * @brief Handles the submission of a job coming from a submitter
 * @details Remembers which submitter must be called back on completion (when requested),
 *          marks the job as submitted and forwards the submission to the decision process
 *          through the protocol writer.
 * @param[in,out] data The server data
 * @param[in] task_data The received message; its data must be a JobSubmittedMessage
 */
void server_on_job_submitted(ServerData * data,
                             IPMessage * task_data)
{
    BatsimContext * context = data->context;

    // Ignore all submissions if -k was specified and all workflows have completed
    if ((context->workflows.size() != 0) && (context->terminate_with_last_workflow) &&
        (data->nb_workflow_submitters_finished == context->workflows.size()))
    {
        XBT_INFO("Ignoring Job due to -k command-line option");
        return;
    }

    xbt_assert(task_data->data != nullptr);
    const auto * submission = static_cast<JobSubmittedMessage *>(task_data->data);

    xbt_assert(data->submitters.count(submission->submitter_name) == 1);

    // The origin of the job is only stored if its submitter wants to be called back
    ServerData::Submitter * origin = data->submitters.at(submission->submitter_name);
    if (origin->should_be_called_back)
    {
        xbt_assert(data->origin_of_jobs.count(submission->job_id) == 0);
        data->origin_of_jobs[submission->job_id] = origin;
    }

    // The job must already be known by the workloads
    XBT_INFO("GOT JOB: %s %d\n", submission->job_id.workload_name.c_str(), submission->job_id.job_number);
    xbt_assert(context->workloads.job_exists(submission->job_id));
    Job * job = context->workloads.job_at(submission->job_id);
    job->id = submission->job_id.to_string();

    // Update control information
    job->state = JobState::JOB_STATE_SUBMITTED;
    ++data->nb_submitted_jobs;
    XBT_INFO("Job %s SUBMITTED. %d jobs submitted so far",
             submission->job_id.to_string().c_str(), data->nb_submitted_jobs);

    // Descriptions are left empty when redis is enabled
    std::string job_json_description;
    std::string profile_json_description;

    if (!context->redis_enabled)
    {
        job_json_description = job->json_description;
        if (context->submission_forward_profiles)
        {
            profile_json_description = job->workload->profiles->at(job->profile)->json_description;
        }
    }

    context->proto_writer->append_job_submitted(job->id, job_json_description,
                                                profile_json_description, MSG_get_clock());
}
274

275
276
277
278
279
280
281
/**
 * @brief Handles a request to change the power state (pstate) of a set of machines
 * @details The switch is registered in the context, traced if energy is used, then applied
 *          machine by machine:
 *          - computation -> computation: the pstate is set immediately;
 *          - computation -> sleep: a switch OFF process is spawned on the machine;
 *          - sleep -> computation: a switch ON process is spawned on the machine.
 *          Each spawned process increments data->nb_switching_machines.
 * @param[in,out] data The server data
 * @param[in] task_data The received message; its data must be a PStateModificationMessage
 */
void server_on_pstate_modification(ServerData * data,
                                   IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    PStateModificationMessage * message = static_cast<PStateModificationMessage *>(task_data->data);

    data->context->current_switches.add_switch(message->machine_ids, message->new_pstate);

    if (data->context->energy_used)
    {
        data->context->energy_tracer.add_pstate_change(MSG_get_clock(), message->machine_ids,
                                                       message->new_pstate);
    }

    // Let's quickly check whether this is a switchON or a switchOFF
    // Unknown transition states will be set to -42.
    // NOTE(review): only the first machine is inspected, which presumably assumes that all
    // machines of the request share the same pstate layout — confirm.
    int transition_state = -42;
    Machine * first_machine = data->context->machines[message->machine_ids.first_element()];
    if (first_machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE)
    {
        transition_state = -1; // means we are switching to a COMPUTATION_PSTATE
    }
    else if (first_machine->pstates[message->new_pstate] == PStateType::SLEEP_PSTATE)
    {
        transition_state = -2; // means we are switching to a SLEEP_PSTATE
    }

    if (data->context->energy_used)
    {
        // The pstate is set to an invalid one to know the machines are in transition.
        data->context->pstate_tracer.add_pstate_change(MSG_get_clock(), message->machine_ids,
                                                       transition_state);
    }

    // Let's mark that some switches have been requested
    data->context->nb_grouped_switches++;
    data->context->nb_machine_switches += message->machine_ids.size();

    for (auto machine_it = message->machine_ids.elements_begin();
         machine_it != message->machine_ids.elements_end();
         ++machine_it)
    {
        const int machine_id = *machine_it;
        Machine * machine = data->context->machines[machine_id];
        const int curr_pstate = MSG_host_get_pstate(machine->host);

        if (machine->pstates[curr_pstate] == PStateType::COMPUTATION_PSTATE)
        {
            if (machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE)
            {
                // Computation -> computation: applied right away, no dedicated process
                XBT_INFO("Switching machine %d ('%s') pstate : %d -> %d.", machine->id,
                         machine->name.c_str(), curr_pstate, message->new_pstate);
                MSG_host_set_pstate(machine->host, message->new_pstate);
                xbt_assert(MSG_host_get_pstate(machine->host) == message->new_pstate);

                // The decision process is notified once mark_switch_as_done returns true
                MachineRange all_switched_machines;
                if (data->context->current_switches.mark_switch_as_done(machine->id, message->new_pstate,
                                                                        all_switched_machines,
                                                                        data->context))
                {
                    data->context->proto_writer->append_resource_state_changed(all_switched_machines,
                                                                               std::to_string(message->new_pstate),
                                                                               MSG_get_clock());
                }
            }
            else if (machine->pstates[message->new_pstate] == PStateType::SLEEP_PSTATE)
            {
                // Computation -> sleep: handled by a dedicated switch OFF process
                machine->update_machine_state(MachineState::TRANSITING_FROM_COMPUTING_TO_SLEEPING);
                SwitchPStateProcessArguments * args = new SwitchPStateProcessArguments;
                args->context = data->context;
                args->machine_id = machine_id;
                args->new_pstate = message->new_pstate;

                // The process name matches the function it runs (the machine is switched OFF)
                const std::string pname = "switch OFF " + std::to_string(machine_id);
                MSG_process_create(pname.c_str(), switch_off_machine_process,
                                   static_cast<void *>(args), machine->host);

                ++data->nb_switching_machines;
            }
            else
            {
                XBT_ERROR("Switching from a computation pstate to an invalid pstate on machine %d ('%s') : %d -> %d",
                          machine->id, machine->name.c_str(), curr_pstate, message->new_pstate);
            }
        }
        else if (machine->pstates[curr_pstate] == PStateType::SLEEP_PSTATE)
        {
            xbt_assert(machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE,
                    "Switching from a sleep pstate to a non-computation pstate on machine %d ('%s') : %d -> %d, which is forbidden",
                    machine->id, machine->name.c_str(), curr_pstate, message->new_pstate);

            // Sleep -> computation: handled by a dedicated switch ON process
            machine->update_machine_state(MachineState::TRANSITING_FROM_SLEEPING_TO_COMPUTING);
            SwitchPStateProcessArguments * args = new SwitchPStateProcessArguments;
            args->context = data->context;
            args->machine_id = machine_id;
            args->new_pstate = message->new_pstate;

            // The process name matches the function it runs (the machine is switched ON)
            const std::string pname = "switch ON " + std::to_string(machine_id);
            MSG_process_create(pname.c_str(), switch_on_machine_process,
                               static_cast<void *>(args), machine->host);

            ++data->nb_switching_machines;
        }
        else
        {
            XBT_ERROR("Machine %d ('%s') has an invalid pstate : %d", machine->id, machine->name.c_str(), curr_pstate);
        }
    }

    if (data->context->trace_machine_states)
    {
        data->context->machine_state_tracer.write_machine_states(MSG_get_clock());
    }
}
385

386
387
388
389
390
391
392
/**
 * @brief Handles the end of a wait: tells the decision process that a requested call occurs
 * @param[in,out] data The server data; its number of waiters is decremented
 * @param[in] task_data Unused
 */
void server_on_waiting_done(ServerData * data, IPMessage * task_data)
{
    (void) task_data;

    BatsimContext * context = data->context;
    context->proto_writer->append_requested_call(MSG_get_clock());

    // One waiter fewer is pending
    data->nb_waiters--;
}
393

394
395
396
397
398
399
/**
 * @brief Marks the scheduler as ready
 * @param[in,out] data The server data; its sched_ready flag is set
 * @param[in] task_data Unused
 */
void server_on_sched_ready(ServerData * data, IPMessage * task_data)
{
    (void) task_data;

    data->sched_ready = true;
}
400

401
402
403
404
405
406
/**
 * @brief Forwards a scheduler wait answer to the submitter named in the message
 * @details The incoming message is copied and the copy is sent, because the server main
 *          loop deletes the received message right after its handler returns.
 * @param[in] data Unused
 * @param[in] task_data The received message; its data must be a SchedWaitAnswerMessage
 */
void server_on_sched_wait_answer(ServerData * data,
                                 IPMessage * task_data)
{
    (void) data;

    // The payload is dereferenced below: check it like the other handlers do
    xbt_assert(task_data->data != nullptr);
    const auto * received = static_cast<SchedWaitAnswerMessage *>(task_data->data);

    // Copy-constructs the forwarded message instead of default-constructing then assigning
    SchedWaitAnswerMessage * message = new SchedWaitAnswerMessage(*received);

    //    Submitter * submitter = origin_of_wait_queries.at({message->nb_resources,message->processing_time});
    dsend_message(message->submitter_name, IPMessageType::SCHED_WAIT_ANSWER, static_cast<void *>(message));
    //    origin_of_wait_queries.erase({message->nb_resources,message->processing_time});
}
412

413
414
415
416
417
418
419
/**
 * @brief Answers an energy query by sending the total consumed energy of the machines
 * @param[in,out] data The server data
 * @param[in] task_data Unused
 */
void server_on_sched_tell_me_energy(ServerData * data, IPMessage * task_data)
{
    (void) task_data;

    BatsimContext * context = data->context;
    const long double consumed_energy = context->machines.total_consumed_energy(context);
    context->proto_writer->append_query_reply_energy(consumed_energy, MSG_get_clock());
}
420

421
422
423
424
425
426
/**
 * @brief Handler of wait queries, which is not implemented yet
 * @details Always fails on an assertion. The intended implementation is kept below as comments.
 * @param[in] data Unused
 * @param[in] task_data Unused
 */
void server_on_wait_query(ServerData * data, IPMessage * task_data)
{
    (void) data;
    (void) task_data;

    //WaitQueryMessage * message = (WaitQueryMessage *) task_data->data;
    //    XBT_INFO("received : %s , %s\n", to_string(message->nb_resources).c_str(), to_string(message->processing_time).c_str());

    xbt_assert(false, "Unimplemented! TODO");

    //Submitter * submitter = submitters.at(message->submitter_name);
    //origin_of_wait_queries[{message->nb_resources,message->processing_time}] = submitter;
}
434

435
436
/**
 * @brief Handles the end of a machine switch (registered for both SWITCHED_ON and SWITCHED_OFF)
 * @details Marks the switch as done and, when mark_switch_as_done returns true, traces the
 *          machine states (if enabled) and notifies the decision process of the state change.
 * @param[in,out] data The server data; its number of switching machines is decremented
 * @param[in] task_data The received message; its data must be a SwitchMessage
 */
void server_on_switched(ServerData * data,
                        IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    const auto * switch_msg = static_cast<SwitchMessage *>(task_data->data);
    BatsimContext * context = data->context;

    // The machine must exist and must already be in the pstate announced by the message
    xbt_assert(context->machines.exists(switch_msg->machine_id));
    Machine * machine = context->machines[switch_msg->machine_id];
    (void) machine; // Avoids a warning if assertions are ignored
    xbt_assert(MSG_host_get_pstate(machine->host) == switch_msg->new_pstate);

    MachineRange all_switched_machines;
    const bool switch_completed =
        context->current_switches.mark_switch_as_done(switch_msg->machine_id, switch_msg->new_pstate,
                                                      all_switched_machines, context);
    if (switch_completed)
    {
        if (context->trace_machine_states)
        {
            context->machine_state_tracer.write_machine_states(MSG_get_clock());
        }

        context->proto_writer->append_resource_state_changed(all_switched_machines,
                                                             std::to_string(switch_msg->new_pstate),
                                                             MSG_get_clock());
    }

    data->nb_switching_machines--;
}
462

463
464
465
466
467
/**
 * @brief Handles the end of a killer process
 * @details Updates the job counters for the jobs that were really killed by the killer
 *          process, notifies the decision process that the kill request has been treated,
 *          then re-evaluates the end-of-simulation condition.
 * @param[in,out] data The server data; its number of killers is decremented
 * @param[in] task_data The received message; its data must be a KillingDoneMessage
 */
void server_on_killing_done(ServerData * data,
                            IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    const auto * done = static_cast<KillingDoneMessage *>(task_data->data);

    std::vector<std::string> requested_ids;        // Every job of the kill request
    std::vector<std::string> really_killed_ids;    // Only those killed by the killer process
    requested_ids.reserve(done->jobs_ids.size());

    for (const JobIdentifier & job_id : done->jobs_ids)
    {
        const std::string id_str = job_id.to_string();
        requested_ids.push_back(id_str);

        const Job * job = data->context->workloads.job_at(job_id);
        const bool killed_by_killer =
            job->state == JobState::JOB_STATE_COMPLETED_KILLED &&
            job->kill_reason == "Killed from killer_process (probably requested by the decision process)";

        if (!killed_by_killer)
        {
            continue;
        }

        data->nb_running_jobs--;
        xbt_assert(data->nb_running_jobs >= 0);
        data->nb_completed_jobs++;
        xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);

        really_killed_ids.push_back(id_str);
    }

    XBT_INFO("Jobs {%s} have been killed (the following ones have REALLY been killed: {%s})",
             boost::algorithm::join(requested_ids, ",").c_str(),
             boost::algorithm::join(really_killed_ids, ",").c_str());

    // All the requested jobs are reported, not only the really killed ones
    data->context->proto_writer->append_job_killed(requested_ids, MSG_get_clock());
    data->nb_killers--;

    check_submitted_and_completed(data);
}
499

500
501
502
503
504
/**
 * @brief Records that dynamic job submissions are finished
 * @param[in,out] data The server data
 * @param[in] task_data Unused
 */
void server_on_end_dynamic_submit(ServerData * data, IPMessage * task_data)
{
    (void) task_data;

    BatsimContext * context = data->context;
    context->submission_sched_finished = true;

    // The simulation may now be over
    check_submitted_and_completed(data);
}

509
510
511
512
513
514
515
516
517
/**
 * @brief Records that dynamic job submissions are not finished
 * @param[in,out] data The server data
 * @param[in] task_data Unused
 */
void server_on_continue_dynamic_submit(ServerData * data, IPMessage * task_data)
{
    (void) task_data;

    BatsimContext * context = data->context;
    context->submission_sched_finished = false;

    check_submitted_and_completed(data);
}

518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
/**
 * @brief Handles a job submitted by the decision process (dynamic submission)
 * @details Retrieves or creates the workload of the job, parses the job (and its profile
 *          if it is not known yet) from JSON, marks the job as submitted and, if
 *          acknowledgements are enabled, forwards the submission back to the decision process.
 *          When redis is enabled, the descriptions are read from the storage instead of the message.
 * @param[in,out] data The server data
 * @param[in,out] task_data The received message; its data must be a JobSubmittedByDPMessage
 *                          (its descriptions are filled from the storage when redis is enabled)
 */
void server_on_submit_job(ServerData * data,
                          IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    auto * message = static_cast<JobSubmittedByDPMessage *>(task_data->data);
    BatsimContext * context = data->context;

    xbt_assert(context->submission_sched_enabled,
               "Job submission coming from the decision process received but the option "
               "seems disabled... It can be activated by specifying a configuration file "
               "to Batsim.");

    xbt_assert(!context->workloads.job_exists(message->job_id),
               "Bad job submission received from the decision process: job %s already exists.",
               message->job_id.to_string().c_str());

    // The workload is created on its first use
    Workload * workload = nullptr;
    if (!context->workloads.exists(message->job_id.workload_name))
    {
        workload = new Workload(message->job_id.workload_name);
        context->workloads.insert_workload(workload->name, workload);
    }
    else
    {
        workload = context->workloads.at(message->job_id.workload_name);
    }

    // The job description comes either from redis or from the message itself, never both
    if (!context->redis_enabled)
    {
        xbt_assert(!message->job_description.empty(), "Internal error");
    }
    else
    {
        xbt_assert(message->job_description.empty(), "Internal error");
        const std::string job_key = RedisStorage::job_key(message->job_id);
        message->job_description = context->storage.get(job_key);
    }

    // Let's parse the user-submitted job
    XBT_INFO("Parsing user-submitted job %s", message->job_id.to_string().c_str());
    Job * job = Job::from_json(message->job_description, workload,
                               "Invalid JSON job submitted by the scheduler");
    workload->jobs->add_job(job);
    job->id = JobIdentifier(workload->name, job->number).to_string();

    // The profile is only parsed if the workload does not know it yet
    const bool profile_is_known = workload->profiles->exists(job->profile);
    if (!profile_is_known)
    {
        XBT_INFO("The profile of user-submitted job '%s' does not exist yet.",
                 job->profile.c_str());

        // Same rule as for the job description: redis or the message, never both
        if (!context->redis_enabled)
        {
            xbt_assert(!message->job_profile_description.empty(), "Internal error");
        }
        else
        {
            xbt_assert(message->job_profile_description.empty(), "Internal error");
            const std::string profile_key = RedisStorage::profile_key(message->job_id.workload_name,
                                                                      job->profile);
            message->job_profile_description = context->storage.get(profile_key);
        }

        Profile * profile = Profile::from_json(job->profile,
                                               message->job_profile_description,
                                               "Invalid JSON profile received from the scheduler");
        workload->profiles->add_profile(job->profile, profile);
    }
    // TODO? check profile collisions otherwise

    // State and counters update
    job->state = JobState::JOB_STATE_SUBMITTED;
    ++data->nb_submitted_jobs;

    // Nothing is sent back unless acknowledgements are enabled
    if (!context->submission_sched_ack)
    {
        return;
    }

    std::string job_json_description;
    std::string profile_json_description;

    if (!context->redis_enabled)
    {
        job_json_description = job->json_description;
        if (context->submission_forward_profiles)
        {
            profile_json_description = job->workload->profiles->at(job->profile)->json_description;
        }
    }

    context->proto_writer->append_job_submitted(job->id, job_json_description,
                                                profile_json_description,
                                                MSG_get_clock());
}
614

615
616
617
618
619
620
621
622
623
624
625
626
/**
 * @brief Handles a job state change requested by the decision process
 * @details Allowed transitions:
 *          - submitted -> running or rejected;
 *          - running -> completed (successfully or killed).
 *          Any other transition fails on an assertion. Counters, starting time and runtime
 *          are updated accordingly, then the end-of-simulation condition is re-evaluated.
 * @param[in,out] data The server data
 * @param[in] task_data The received message; its data must be a ChangeJobStateMessage
 */
void server_on_change_job_state(ServerData * data,
                          IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    const auto * change = static_cast<ChangeJobStateMessage *>(task_data->data);

    Job * job = data->context->workloads.job_at(change->job_id);

    XBT_INFO("Change job state: Job %d (workload=%s) to state %s (kill_Reason=%s)",
        job->number, job->workload->name.c_str(),
        change->job_state.c_str(), change->kill_reason.c_str());

    const JobState new_state = job_state_from_string(change->job_state);

    if (job->state == JobState::JOB_STATE_SUBMITTED)
    {
        if (new_state == JobState::JOB_STATE_RUNNING)
        {
            // The job starts now
            job->starting_time = MSG_get_clock();
            data->nb_running_jobs++;
            xbt_assert(data->nb_running_jobs <= data->nb_submitted_jobs);
        }
        else if (new_state == JobState::JOB_STATE_REJECTED)
        {
            // Rejected jobs are counted as completed
            data->nb_completed_jobs++;
            xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
        }
        else
        {
            xbt_assert(false, "Can only change the state of a submitted job to running or rejected");
        }
    }
    else if (job->state == JobState::JOB_STATE_RUNNING)
    {
        const bool job_finishes = (new_state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY) ||
                                  (new_state == JobState::JOB_STATE_COMPLETED_KILLED);
        if (job_finishes)
        {
            // The runtime is the time elapsed since the job started
            job->runtime = MSG_get_clock() - job->starting_time;
            data->nb_running_jobs--;
            xbt_assert(data->nb_running_jobs >= 0);
            data->nb_completed_jobs++;
            xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
        }
        else
        {
            xbt_assert(false, "Can only change the state of a running job to completed (successfully and killed)");
        }
    }
    else
    {
        xbt_assert(false, "Can only change the state of a submitted or running job.");
    }

    job->state = new_state;
    job->kill_reason = change->kill_reason;

    XBT_INFO("Job state changed: Job %d (workload=%s)",
        job->number, job->workload->name.c_str());

    check_submitted_and_completed(data);
}

671
672
673
674
675
/**
 * @brief Handles the rejection of a job by the decision process
 * @details The job is marked as rejected and counted as completed, then the
 *          end-of-simulation condition is re-evaluated.
 * @param[in,out] data The server data
 * @param[in] task_data The received message; its data must be a JobRejectedMessage
 */
void server_on_reject_job(ServerData * data,
                          IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    const auto * rejection = static_cast<JobRejectedMessage *>(task_data->data);

    Job * rejected_job = data->context->workloads.job_at(rejection->job_id);
    rejected_job->state = JobState::JOB_STATE_REJECTED;
    ++data->nb_completed_jobs;

    XBT_INFO("Job %d (workload=%s) has been rejected",
             rejected_job->number, rejected_job->workload->name.c_str());

    check_submitted_and_completed(data);
}
686

687
688
689
690
691
/**
 * @brief Handles a job kill request received by the server
 * @details Selects the jobs whose kill has not been requested yet, flags them
 *          and spawns a killer process in charge of them.
 * @param[in,out] data The server data (its number of killers is incremented)
 * @param[in] task_data The received message. Its data must be a KillJobMessage
 */
void server_on_kill_jobs(ServerData * data,
                         IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    KillJobMessage * message = (KillJobMessage *) task_data->data;

    // These arguments are handed to the killer process created below
    KillerProcessArguments * args = new KillerProcessArguments;
    args->context = data->context;

    for (const JobIdentifier & job_id : message->jobs_ids)
    {
        // Same existence check as the one done in server_on_execute_job before calling job_at
        xbt_assert(data->context->workloads.job_exists(job_id),
                   "Invalid KILL_JOB: job_id '%s' refers to a job which does NOT exist!",
                   job_id.to_string().c_str());

        Job * job = data->context->workloads.job_at(job_id);

        // Let's discard jobs whose kill has already been requested
        if (!job->kill_requested)
        {
            // Let's check the job state
            xbt_assert(job->state == JobState::JOB_STATE_RUNNING ||
                       job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY ||
                       job->state == JobState::JOB_STATE_COMPLETED_KILLED,
                       "Invalid KILL_JOB: job_id '%s' refers to a job not being executed nor completed.",
                       job_id.to_string().c_str());

            // Let's mark that the job kill has been requested
            job->kill_requested = true;

            // The job is included in the killer_process arguments
            args->jobs_ids.push_back(job_id);
        }
    }

    // The killer process is created even if no job has been selected.
    // NOTE(review): presumably so that every kill request gets acknowledged - to be confirmed.
    MSG_process_create("killer_process", killer_process, (void *) args, MSG_host_self());
    ++data->nb_killers;
}
721

722
723
724
725
726
/**
 * @brief Handles a "call me later" request received by the server
 * @details Spawns a waiter process, on the host of the master machine,
 *          which is given the requested target time.
 * @param[in,out] data The server data (its number of waiters is incremented)
 * @param[in] task_data The received message. Its data must be a CallMeLaterMessage
 */
void server_on_call_me_later(ServerData * data,
                             IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    const auto * request = static_cast<CallMeLaterMessage *>(task_data->data);
    const auto wake_up_time = request->target_time;

    // The requested time must be strictly in the future
    xbt_assert(wake_up_time > MSG_get_clock(),
               "You asked to be awaken in the past! (you ask: %f, it is: %f)",
               wake_up_time, MSG_get_clock());

    // These arguments are handed to the waiter process created below
    auto * waiter_args = new WaiterProcessArguments;
    waiter_args->target_time = wake_up_time;

    const std::string process_name = "waiter " + std::to_string(wake_up_time);
    MSG_process_create(process_name.c_str(), waiter_process,
                       static_cast<void *>(waiter_args),
                       data->context->machines.master_machine()->host);

    ++data->nb_waiters;
}
740

741
742
743
744
745
746
/**
 * @brief Handles a job execution request received by the server
 * @details Checks that the allocation is valid, marks the job as running,
 *          computes the hosts the job will use and spawns the process
 *          in charge of executing the job.
 * @param[in,out] data The server data (its number of running jobs is incremented)
 * @param[in] task_data The received message. Its data must be an ExecuteJobMessage
 */
void server_on_execute_job(ServerData * data,
                           IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    ExecuteJobMessage * message = (ExecuteJobMessage *) task_data->data;
    SchedulingAllocation * allocation = message->allocation;

    // The allocation is dereferenced everywhere below: let's fail with a clear message if it is missing
    xbt_assert(allocation != nullptr,
               "Cannot execute a job: the received message has no allocation");

    xbt_assert(data->context->workloads.job_exists(allocation->job_id),
               "Trying to execute job '%s', which does NOT exist!",
               allocation->job_id.to_string().c_str());

    Job * job = data->context->workloads.job_at(allocation->job_id);
    // The state is explicitly converted to int: an enumeration value should not be given as is to a "%d"
    xbt_assert(job->state == JobState::JOB_STATE_SUBMITTED,
               "Cannot execute job '%s': its state (%d) is not JOB_STATE_SUBMITTED.",
               job->id.c_str(), static_cast<int>(job->state));

    job->state = JobState::JOB_STATE_RUNNING;

    data->nb_running_jobs++;
    xbt_assert(data->nb_running_jobs <= data->nb_submitted_jobs);

    if (!data->context->allow_time_sharing)
    {
        // Without time sharing, none of the allocated machines may already be computing jobs
        for (auto machine_id_it = allocation->machine_ids.elements_begin();
             machine_id_it != allocation->machine_ids.elements_end();
             ++machine_id_it)
        {
            int machine_id = *machine_id_it;
            const Machine * machine = data->context->machines[machine_id];
            (void) machine; // Avoids a warning if assertions are ignored
            xbt_assert(machine->jobs_being_computed.empty(),
                       "Invalid job allocation: machine %d ('%s') is currently computing jobs (these ones:"
                       " {%s}) whereas space sharing is forbidden. Space sharing can be enabled via an option,"
                       " try --help to display the available options", machine->id, machine->name.c_str(),
                       machine->jobs_being_computed_as_string().c_str());
        }
    }

    if (data->context->energy_used)
    {
        // Check that every machine is in a computation pstate
        for (auto machine_id_it = allocation->machine_ids.elements_begin();
             machine_id_it != allocation->machine_ids.elements_end();
             ++machine_id_it)
        {
            int machine_id = *machine_id_it;
            Machine * machine = data->context->machines[machine_id];
            int ps = MSG_host_get_pstate(machine->host);
            (void) ps; // Avoids a warning if assertions are ignored
            xbt_assert(machine->has_pstate(ps));
            xbt_assert(machine->pstates[ps] == PStateType::COMPUTATION_PSTATE,
                       "Invalid job allocation: machine %d ('%s') is not in a computation pstate (ps=%d)",
                       machine->id, machine->name.c_str(), ps);
            xbt_assert(machine->state == MachineState::COMPUTING || machine->state == MachineState::IDLE,
                       "Invalid job allocation: machine %d ('%s') cannot compute jobs now (the machine is"
                       " neither computing nor being idle)", machine->id, machine->name.c_str());
        }
    }

    // The mapping must contain exactly one entry per resource required by the job
    xbt_assert((int)allocation->mapping.size() == job->required_nb_res,
               "Invalid job %s allocation. The job requires %d machines but only %d were given (%s). "
               "Using a different number of machines is only allowed if a custom mapping is specified. "
               "This mapping must specify which allocated machine each executor should use.",
               job->id.c_str(), job->required_nb_res, (int)allocation->mapping.size(),
               allocation->machine_ids.to_string_hyphen().c_str());

    // Let's generate the hosts used by the job
    allocation->hosts.clear();
    allocation->hosts.reserve(job->required_nb_res);

    for (unsigned int executor_id = 0; executor_id < allocation->mapping.size(); ++executor_id)
    {
        // The mapping gives, for each executor, an index within the allocated machines.
        // NOTE(review): this index is not range-checked here - to be confirmed that it is validated upstream.
        int machine_id_within_allocated_resources = allocation->mapping[executor_id];
        int machine_id = allocation->machine_ids[machine_id_within_allocated_resources];

        allocation->hosts.push_back(data->context->machines[machine_id]->host);
    }

    xbt_assert((int)allocation->hosts.size() == job->required_nb_res,
               "Invalid number of hosts (expected %d, got %d)",
               job->required_nb_res, (int)allocation->hosts.size());

    // These arguments are handed to the execution process created below
    ExecuteJobProcessArguments * exec_args = new ExecuteJobProcessArguments;
    exec_args->context = data->context;
    exec_args->allocation = allocation;
    exec_args->notify_server_at_end = true;

    // The execution process is created on the host of the first allocated machine
    string pname = "job_" + job->id;
    msg_process_t process = MSG_process_create(pname.c_str(), execute_job_process,
                                               (void*)exec_args,
                                               data->context->machines[allocation->machine_ids.first_element()]->host);
    job->execution_processes.insert(process);
}
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843

/**
 * @brief Appends the simulation end event once all the jobs have been submitted and completed
 * @details Nothing is done unless all the submitters have finished, all the
 *          submitted jobs have been completed and, if dynamic submission is
 *          enabled, it has been finished. The event is appended at most once.
 * @param[in,out] data The server data
 */
void check_submitted_and_completed(ServerData * data)
{
    // Guard to prevent multiple SIMULATION_ENDS events
    if (data->all_jobs_submitted_and_completed)
        return;

    // All submitters must have finished
    if (data->nb_submitters_finished != data->nb_submitters)
        return;

    // All submitted jobs must have finished
    if (data->nb_completed_jobs != data->nb_submitted_jobs)
        return;

    // If dynamic submission is enabled, it must be finished
    BatsimContext * context = data->context;
    if (context->submission_sched_enabled && !context->submission_sched_finished)
        return;

    XBT_INFO("It seems that all jobs have been submitted and completed!");
    data->all_jobs_submitted_and_completed = true;

    context->proto_writer->append_simulation_ends(MSG_get_clock());
}