/**
 * @file server.cpp
 * @brief Contains functions related to the general orchestration of the simulation
 */

#include "server.hpp"

#include <string>

#include <boost/algorithm/string.hpp>

#include <simgrid/msg.h>

#include "context.hpp"
#include "ipp.hpp"
#include "network.hpp"
#include "jobs_execution.hpp"

XBT_LOG_NEW_DEFAULT_CATEGORY(server, "server"); //!< Logging

using namespace std;

int server_process(int argc, char *argv[])
24
25
26
27
28
29
30
31
32
33
34
{
    (void) argc;
    (void) argv;

    ServerProcessArguments * args = (ServerProcessArguments *) MSG_process_get_data(MSG_process_self());
    BatsimContext * context = args->context;

    int nb_completed_jobs = 0;
    int nb_submitted_jobs = 0;
    int nb_submitters = 0;
    int nb_submitters_finished = 0;
35
    int nb_workflow_submitters_finished = 0;
36
37
    int nb_running_jobs = 0;
    int nb_switching_machines = 0;
38
    int nb_waiters = 0;
39
    int nb_killers = 0;
40
    bool sched_ready = true;
41
    bool all_jobs_submitted_and_completed = false;
42
    bool end_of_simulation_sent = false;
43

44
45
46
47
48
49
50
51
52
    // Let's store some information about the submitters
    struct Submitter
    {
        string mailbox;
        bool should_be_called_back;
    };

    map<string, Submitter*> submitters;

53
    // Let's store the origin of some jobs
54
55
    map<JobIdentifier, Submitter*> origin_of_jobs;

56
57
58
    // Let's store the origin of wait queries
    map<std::pair<int,double>, Submitter*> origin_of_wait_queries;

59
    // Let's tell the Decision process that the simulation is about to begin (and that some data can be read from the data storage)
60
    context->proto_writer->append_simulation_begins(context->machines.nb_machines(), context->config_file, MSG_get_clock());
Millian Poquet's avatar
Millian Poquet committed
61

62
63
    RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
    req_rep_args->context = context;
Millian Poquet's avatar
Millian Poquet committed
64
65
    req_rep_args->send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
    context->proto_writer->clear();
66
67
68
69
70

    MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
    sched_ready = false;

    // Simulation loop
71
    while ((nb_submitters == 0) || (nb_submitters_finished < nb_submitters) ||
72
          (nb_completed_jobs < nb_submitted_jobs) || (!sched_ready) ||
73
74
          (nb_switching_machines > 0) || (nb_waiters > 0) || (nb_killers > 0) ||
          (context->submission_sched_enabled && !context->submission_sched_finished))
75
76
77
78
79
80
81
    {
        // Let's wait a message from a node or the request-reply process...
        msg_task_t task_received = NULL;
        IPMessage * task_data;
        MSG_task_receive(&(task_received), "server");
        task_data = (IPMessage *) MSG_task_get_data(task_received);

82
        XBT_INFO("Server received a message of type %s:", ip_message_type_to_string(task_data->type).c_str());
83
84
85
86
87

        switch (task_data->type)
        {
        case IPMessageType::SUBMITTER_HELLO:
        {
88
            xbt_assert(task_data->data != nullptr);
89
90
            xbt_assert(!all_jobs_submitted_and_completed,
                       "A new submitter said hello but all jobs have already been submitted and completed... Aborting.");
91
92
93
94
95
96
            SubmitterHelloMessage * message = (SubmitterHelloMessage *) task_data->data;

            xbt_assert(submitters.count(message->submitter_name) == 0,
                       "Invalid new submitter '%s': a submitter with the same name already exists!",
                       message->submitter_name.c_str());

97
            nb_submitters++;
98
99
100
101
102
103
104
105

            Submitter * submitter = new Submitter;
            submitter->mailbox = message->submitter_name;
            submitter->should_be_called_back = message->enable_callback_on_job_completion;

            submitters[message->submitter_name] = submitter;

            XBT_INFO("New submitter said hello. Number of polite submitters: %d", nb_submitters);
106
107
108
109
110

        } break; // end of case SUBMITTER_HELLO

        case IPMessageType::SUBMITTER_BYE:
        {
111
112
113
114
115
116
            xbt_assert(task_data->data != nullptr);
            SubmitterByeMessage * message = (SubmitterByeMessage *) task_data->data;

            xbt_assert(submitters.count(message->submitter_name) == 1);
            submitters.erase(message->submitter_name);

117
            nb_submitters_finished++;
Millian Poquet's avatar
Millian Poquet committed
118
119
            if (message->is_workflow_submitter)
                nb_workflow_submitters_finished++;
120
            XBT_INFO("A submitted said goodbye. Number of finished submitters: %d", nb_submitters_finished);
121

122
123
            if (!all_jobs_submitted_and_completed &&
                nb_completed_jobs == nb_submitted_jobs &&
124
125
                nb_submitters_finished == nb_submitters &&
                (!context->submission_sched_enabled || context->submission_sched_finished))
126
127
128
129
            {
                all_jobs_submitted_and_completed = true;
                XBT_INFO("It seems that all jobs have been submitted and completed!");

Millian Poquet's avatar
Millian Poquet committed
130
                context->proto_writer->append_simulation_ends(MSG_get_clock());
131
132
            }

133
134
135
136
137
138
139
        } break; // end of case SUBMITTER_BYE

        case IPMessageType::JOB_COMPLETED:
        {
            xbt_assert(task_data->data != nullptr);
            JobCompletedMessage * message = (JobCompletedMessage *) task_data->data;

140
141
142
143
144
145
146
            if (origin_of_jobs.count(message->job_id) == 1)
            {
                // Let's call the submitter which submitted the job back
                SubmitterJobCompletionCallbackMessage * msg = new SubmitterJobCompletionCallbackMessage;
                msg->job_id = message->job_id;

                Submitter * submitter = origin_of_jobs.at(message->job_id);
147
                dsend_message(submitter->mailbox, IPMessageType::SUBMITTER_CALLBACK, (void*) msg);
148
149
150
151

                origin_of_jobs.erase(message->job_id);
            }

152
153
154
            nb_running_jobs--;
            xbt_assert(nb_running_jobs >= 0);
            nb_completed_jobs++;
155
            xbt_assert(nb_completed_jobs + nb_running_jobs <= nb_submitted_jobs);
156
            Job * job = context->workloads.job_at(message->job_id);
157

158
159
            XBT_INFO("Job %s!%d COMPLETED. %d jobs completed so far",
                     job->workload->name.c_str(), job->number, nb_completed_jobs);
160

Millian Poquet's avatar
Millian Poquet committed
161
162
163
164
165
166
167
            string status = "UNKNOWN";
            if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY)
                status = "SUCCESS";
            else if (job->state == JobState::JOB_STATE_COMPLETED_KILLED && job->kill_reason == "Walltime reached")
                status = "TIMEOUT";

            context->proto_writer->append_job_completed(message->job_id.to_string(), status, MSG_get_clock());
168

169
170
            if (!all_jobs_submitted_and_completed &&
                nb_completed_jobs == nb_submitted_jobs &&
171
172
                nb_submitters_finished == nb_submitters &&
                (!context->submission_sched_enabled || context->submission_sched_finished))
173
174
175
176
            {
                all_jobs_submitted_and_completed = true;
                XBT_INFO("It seems that all jobs have been submitted and completed!");

Millian Poquet's avatar
Millian Poquet committed
177
                context->proto_writer->append_simulation_ends(MSG_get_clock());
178
            }
179
180
181
182
        } break; // end of case JOB_COMPLETED

        case IPMessageType::JOB_SUBMITTED:
        {
Millian Poquet's avatar
Millian Poquet committed
183
184
185
186
187
188
            // Ignore all submissions if -k was specified and all workflows have completed
            if ((context->workflows.size() != 0) && (context->terminate_with_last_workflow) &&
                    (nb_workflow_submitters_finished == context->workflows.size()))
            {
                XBT_INFO("Ignoring Job due to -k command-line option");
                break;
189
190
            }

191
192
193
            xbt_assert(task_data->data != nullptr);
            JobSubmittedMessage * message = (JobSubmittedMessage *) task_data->data;

194
195
196
197
198
199
200
201
202
            xbt_assert(submitters.count(message->submitter_name) == 1);

            Submitter * submitter = submitters.at(message->submitter_name);
            if (submitter->should_be_called_back)
            {
                xbt_assert(origin_of_jobs.count(message->job_id) == 0);
                origin_of_jobs[message->job_id] = submitter;
            }

203
            // Let's retrieve the Job from memory (or add it into memory if it is dynamic)
204
            XBT_INFO("GOT JOB: %s %d\n", message->job_id.workload_name.c_str(), message->job_id.job_number);
205
206
            xbt_assert(context->workloads.job_exists(message->job_id));
            Job * job = context->workloads.job_at(message->job_id);
207
            job->id = message->job_id.to_string();
208

209
210
211
            // Update control information
            job->state = JobState::JOB_STATE_SUBMITTED;
            nb_submitted_jobs++;
212
213
            XBT_INFO("Job %s SUBMITTED. %d jobs submitted so far",
                     message->job_id.to_string().c_str(), nb_submitted_jobs);
214

215
216
217
218
219
220
221
222
223
224
            string job_json_description, profile_json_description;

            if (!context->redis_enabled)
            {
                job_json_description = job->json_description;
                if (context->submission_forward_profiles)
                    profile_json_description = job->workload->profiles->at(job->profile)->json_description;
            }

            context->proto_writer->append_job_submitted(job->id, job_json_description, profile_json_description, MSG_get_clock());
225
226
        } break; // end of case JOB_SUBMITTED

227
228
229
230
231
        case IPMessageType::JOB_SUBMITTED_BY_DP:
        {
            xbt_assert(task_data->data != nullptr);
            JobSubmittedByDPMessage * message = (JobSubmittedByDPMessage *) task_data->data;

232
233
234
235
236
            xbt_assert(context->submission_sched_enabled,
                       "Job submission coming from the decision process received but the option "
                       "seems disabled... It can be activated by specifying a configuration file "
                       "to Batsim.");

237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
            xbt_assert(!context->workloads.job_exists(message->job_id),
                       "Bad job submission received from the decision process: job %s already exists.",
                       message->job_id.to_string().c_str());

            // Let's create the workload if it doesn't exist, or retrieve it otherwise
            Workload * workload = nullptr;
            if (context->workloads.exists(message->job_id.workload_name))
                workload = context->workloads.at(message->job_id.workload_name);
            else
            {
                workload = new Workload(message->job_id.workload_name);
                context->workloads.insert_workload(workload->name, workload);
            }

            // Let's parse the user-submitted job
            XBT_INFO("Parsing user-submitted job %s", message->job_id.to_string().c_str());
Millian Poquet's avatar
Millian Poquet committed
253
254
            Job * job = Job::from_json(message->job_description, workload,
                                       "Invalid JSON job submitted by the scheduler");
255
            workload->jobs->add_job(job);
256
            job->id = JobIdentifier(workload->name, job->number).to_string();
257
258
259
260

            // Let's parse the profile if needed
            if (!workload->profiles->exists(job->profile))
            {
Millian Poquet's avatar
Millian Poquet committed
261
262
263
264
265
266
                XBT_INFO("The profile of user-submitted job '%s' does not exist yet.",
                         job->profile.c_str());

                Profile * profile = Profile::from_json(job->profile,
                                                       message->job_profile_description,
                                                       "Invalid JSON profile received from the scheduler");
267
268
                workload->profiles->add_profile(job->profile, profile);
            }
Millian Poquet's avatar
Millian Poquet committed
269
            // TODO? check profile collisions otherwise
270

271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
            // Let's set the job state
            job->state = JobState::JOB_STATE_SUBMITTED;

            // Let's update global states
            ++nb_submitted_jobs;

            if (context->submission_sched_ack)
            {
                string job_json_description, profile_json_description;

                if (!context->redis_enabled)
                {
                    job_json_description = job->json_description;
                    if (context->submission_forward_profiles)
                        profile_json_description = job->workload->profiles->at(job->profile)->json_description;
                }

                context->proto_writer->append_job_submitted(job->id, job_json_description, profile_json_description, MSG_get_clock());
            }

291
292
        } break; // end of case JOB_SUBMITTED_BY_DP

293
        case IPMessageType::SCHED_REJECT_JOB:
294
295
296
297
        {
            xbt_assert(task_data->data != nullptr);
            JobRejectedMessage * message = (JobRejectedMessage *) task_data->data;

298
            Job * job = context->workloads.job_at(message->job_id);
299
300
301
            job->state = JobState::JOB_STATE_REJECTED;
            nb_completed_jobs++;

Millian Poquet's avatar
Millian Poquet committed
302
303
            XBT_INFO("Job %d (workload=%s) has been rejected",
                     job->number, job->workload->name.c_str());
304
305
        } break; // end of case SCHED_REJECTION

306
307
308
309
310
311
312
313
314
        case IPMessageType::SCHED_KILL_JOB:
        {
            xbt_assert(task_data->data != nullptr);
            KillJobMessage * message = (KillJobMessage *) task_data->data;

            KillerProcessArguments * args = new KillerProcessArguments;
            args->context = context;
            args->jobs_ids = message->jobs_ids;

315
316
317
            for (const JobIdentifier & job_id : args->jobs_ids)
            {
                Job * job = context->workloads.job_at(job_id);
Millian Poquet's avatar
Millian Poquet committed
318
                (void) job; // Avoids a warning if assertions are ignored
319
320
321
322
323
324
325
                xbt_assert(job->state == JobState::JOB_STATE_RUNNING ||
                           job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY ||
                           job->state == JobState::JOB_STATE_COMPLETED_KILLED,
                           "Invalid KILL_JOB: job_id '%s' refers to a job not being executed nor completed.",
                           job_id.to_string().c_str());
            }

326
            MSG_process_create("killer_process", killer_process, (void *) args, MSG_host_self());
327
            ++nb_killers;
328
329
        } break; // end of case SCHED_KILL_JOB

Millian Poquet's avatar
Millian Poquet committed
330
        case IPMessageType::SCHED_CALL_ME_LATER:
331
332
        {
            xbt_assert(task_data->data != nullptr);
Millian Poquet's avatar
Millian Poquet committed
333
            CallMeLaterMessage * message = (CallMeLaterMessage *) task_data->data;
334

335
336
            xbt_assert(message->target_time > MSG_get_clock(), "You asked to be awaken in the past! (you ask: %f, it is: %f)", message->target_time, MSG_get_clock());

337
338
339
340
            WaiterProcessArguments * args = new WaiterProcessArguments;
            args->target_time = message->target_time;

            string pname = "waiter " + to_string(message->target_time);
341
            MSG_process_create(pname.c_str(), waiter_process, (void*) args, context->machines.master_machine()->host);
342
            ++nb_waiters;
Millian Poquet's avatar
Millian Poquet committed
343
        } break; // end of case SCHED_CALL_ME_LATER
344
345
346
347
348
349

        case IPMessageType::PSTATE_MODIFICATION:
        {
            xbt_assert(task_data->data != nullptr);
            PStateModificationMessage * message = (PStateModificationMessage *) task_data->data;

350
            context->current_switches.add_switch(message->machine_ids, message->new_pstate);
351
            context->energy_tracer.add_pstate_change(MSG_get_clock(), message->machine_ids, message->new_pstate);
352
353
354
355
356
357
358
359
360
361
362
363
364

            // Let's quickly check whether this is a switchON or a switchOFF
            // Unknown transition states will be set to -42.
            int transition_state = -42;
            Machine * first_machine = context->machines[message->machine_ids.first_element()];
            if (first_machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE)
                transition_state = -1; // means we are switching to a COMPUTATION_PSTATE
            else if (first_machine->pstates[message->new_pstate] == PStateType::SLEEP_PSTATE)
                transition_state = -2; // means we are switching to a SLEEP_PSTATE

            // The pstate is set to an invalid one to know the machines are in transition.
            context->pstate_tracer.add_pstate_change(MSG_get_clock(), message->machine_ids,
                                                     transition_state);
365

366
367
368
            for (auto machine_it = message->machine_ids.elements_begin();
                 machine_it != message->machine_ids.elements_end();
                 ++machine_it)
369
            {
370
371
372
373
374
                const int machine_id = *machine_it;
                Machine * machine = context->machines[machine_id];
                int curr_pstate = MSG_host_get_pstate(machine->host);

                if (machine->pstates[curr_pstate] == PStateType::COMPUTATION_PSTATE)
375
                {
376
377
378
379
380
381
382
                    if (machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE)
                    {
                        XBT_INFO("Switching machine %d ('%s') pstate : %d -> %d.", machine->id,
                                 machine->name.c_str(), curr_pstate, message->new_pstate);
                        MSG_host_set_pstate(machine->host, message->new_pstate);
                        xbt_assert(MSG_host_get_pstate(machine->host) == message->new_pstate);

383
                        MachineRange all_switched_machines;
384
                        if (context->current_switches.mark_switch_as_done(machine->id, message->new_pstate,
385
                                                                          all_switched_machines, context))
386
                        {
387
388
389
                            context->proto_writer->append_resource_state_changed(all_switched_machines,
                                                                                 std::to_string(message->new_pstate),
                                                                                 MSG_get_clock());
390
                        }
391
392
393
                    }
                    else if (machine->pstates[message->new_pstate] == PStateType::SLEEP_PSTATE)
                    {
394
                        machine->update_machine_state(MachineState::TRANSITING_FROM_COMPUTING_TO_SLEEPING);
395
396
397
398
399
400
401
402
403
404
405
406
407
                        SwitchPStateProcessArguments * args = new SwitchPStateProcessArguments;
                        args->context = context;
                        args->machine_id = machine_id;
                        args->new_pstate = message->new_pstate;

                        string pname = "switch ON " + to_string(machine_id);
                        MSG_process_create(pname.c_str(), switch_off_machine_process, (void*)args, machine->host);

                        ++nb_switching_machines;
                    }
                    else
                        XBT_ERROR("Switching from a communication pstate to an invalid pstate on machine %d ('%s') : %d -> %d",
                                  machine->id, machine->name.c_str(), curr_pstate, message->new_pstate);
408
                }
409
                else if (machine->pstates[curr_pstate] == PStateType::SLEEP_PSTATE)
410
                {
411
412
413
414
                    xbt_assert(machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE,
                            "Switching from a sleep pstate to a non-computation pstate on machine %d ('%s') : %d -> %d, which is forbidden",
                            machine->id, machine->name.c_str(), curr_pstate, message->new_pstate);

415
                    machine->update_machine_state(MachineState::TRANSITING_FROM_SLEEPING_TO_COMPUTING);
416
417
                    SwitchPStateProcessArguments * args = new SwitchPStateProcessArguments;
                    args->context = context;
418
419
                    args->machine_id = machine_id;
                    args->new_pstate = message->new_pstate;
420

421
422
                    string pname = "switch OFF " + to_string(machine_id);
                    MSG_process_create(pname.c_str(), switch_on_machine_process, (void*)args, machine->host);
423
424
425
426

                    ++nb_switching_machines;
                }
                else
427
                    XBT_ERROR("Machine %d ('%s') has an invalid pstate : %d", machine->id, machine->name.c_str(), curr_pstate);
428
429
            }

430
431
432
            if (context->trace_machine_states)
                context->machine_state_tracer.write_machine_states(MSG_get_clock());

433
434
        } break; // end of case PSTATE_MODIFICATION

435
        case IPMessageType::SCHED_EXECUTE_JOB:
436
437
        {
            xbt_assert(task_data->data != nullptr);
438
            ExecuteJobMessage * message = (ExecuteJobMessage *) task_data->data;
439
            SchedulingAllocation * allocation = message->allocation;
440

441
442
            Job * job = context->workloads.job_at(allocation->job_id);
            job->state = JobState::JOB_STATE_RUNNING;
443

444
445
            nb_running_jobs++;
            xbt_assert(nb_running_jobs <= nb_submitted_jobs);
446

447
448
449
            if (!context->allow_time_sharing)
            {
                for (auto machine_id_it = allocation->machine_ids.elements_begin(); machine_id_it != allocation->machine_ids.elements_end(); ++machine_id_it)
450
                {
451
452
453
454
455
456
457
458
                    int machine_id = *machine_id_it;
                    const Machine * machine = context->machines[machine_id];
                    (void) machine; // Avoids a warning if assertions are ignored
                    xbt_assert(machine->jobs_being_computed.empty(),
                               "Invalid job allocation: machine %d ('%s') is currently computing jobs (these ones:"
                               " {%s}) whereas space sharing is forbidden. Space sharing can be enabled via an option,"
                               " try --help to display the available options", machine->id, machine->name.c_str(),
                               machine->jobs_being_computed_as_string().c_str());
459
                }
460
            }
461

462
463
464
465
            if (context->energy_used)
            {
                // Check that every machine is in a computation pstate
                for (auto machine_id_it = allocation->machine_ids.elements_begin(); machine_id_it != allocation->machine_ids.elements_end(); ++machine_id_it)
466
                {
467
468
469
470
471
472
473
474
475
476
477
                    int machine_id = *machine_id_it;
                    Machine * machine = context->machines[machine_id];
                    int ps = MSG_host_get_pstate(machine->host);
                    (void) ps; // Avoids a warning if assertions are ignored
                    xbt_assert(machine->has_pstate(ps));
                    xbt_assert(machine->pstates[ps] == PStateType::COMPUTATION_PSTATE,
                               "Invalid job allocation: machine %d ('%s') is not in a computation pstate (ps=%d)",
                               machine->id, machine->name.c_str(), ps);
                    xbt_assert(machine->state == MachineState::COMPUTING || machine->state == MachineState::IDLE,
                               "Invalid job allocation: machine %d ('%s') cannot compute jobs now (the machine is"
                               " neither computing nor being idle)", machine->id, machine->name.c_str());
478
                }
479
            }
480

481
482
483
484
485
486
487
488
            // Let's generate the hosts used by the job
            allocation->hosts.clear();
            allocation->hosts.reserve(allocation->machine_ids.size());
            int host_i = 0;
            for (auto machine_it = allocation->machine_ids.elements_begin(); machine_it != allocation->machine_ids.elements_end(); ++machine_it,++host_i)
            {
                int machine_id = *machine_it;
                allocation->hosts[host_i] = context->machines[machine_id]->host;
489
490
            }

491
492
493
494
495
496
497
498
            ExecuteJobProcessArguments * exec_args = new ExecuteJobProcessArguments;
            exec_args->context = context;
            exec_args->allocation = allocation;
            string pname = "job_" + job->id;
            msg_process_t process = MSG_process_create(pname.c_str(), execute_job_process,
                                                       (void*)exec_args,
                                                       context->machines[allocation->machine_ids.first_element()]->host);
            job->execution_processes.insert(process);
499
500
501
502
        } break; // end of case SCHED_ALLOCATION

        case IPMessageType::WAITING_DONE:
        {
Millian Poquet's avatar
Millian Poquet committed
503
            context->proto_writer->append_nop(MSG_get_clock());
504
            --nb_waiters;
505
506
507
508
509
510
511
        } break; // end of case WAITING_DONE

        case IPMessageType::SCHED_READY:
        {
            sched_ready = true;
        } break; // end of case SCHED_READY

512
513
        case IPMessageType::SCHED_WAIT_ANSWER:
        {
Millian Poquet's avatar
Millian Poquet committed
514
515
            SchedWaitAnswerMessage * message = new SchedWaitAnswerMessage;
            *message = *( (SchedWaitAnswerMessage *) task_data->data);
516

Millian Poquet's avatar
Millian Poquet committed
517
518
519
            //	  Submitter * submitter = origin_of_wait_queries.at({message->nb_resources,message->processing_time});
            dsend_message(message->submitter_name, IPMessageType::SCHED_WAIT_ANSWER, (void*) message);
            //	  origin_of_wait_queries.erase({message->nb_resources,message->processing_time});
520
521
522
523
        } break; // end of case SCHED_WAIT_ANSWER

        case IPMessageType::WAIT_QUERY:
        {
Millian Poquet's avatar
Millian Poquet committed
524
            //WaitQueryMessage * message = (WaitQueryMessage *) task_data->data;
525

Millian Poquet's avatar
Millian Poquet committed
526
            //	  XBT_INFO("received : %s , %s\n", to_string(message->nb_resources).c_str(), to_string(message->processing_time).c_str());
Millian Poquet's avatar
Millian Poquet committed
527
            xbt_assert(false, "Unimplemented! TODO");
528

Millian Poquet's avatar
Millian Poquet committed
529
530
            //Submitter * submitter = submitters.at(message->submitter_name);
            //origin_of_wait_queries[{message->nb_resources,message->processing_time}] = submitter;
531
532
        } break; // end of case WAIT_QUERY

533
534
535
        case IPMessageType::SWITCHED_ON:
        {
            xbt_assert(task_data->data != nullptr);
536
            SwitchONMessage * message = (SwitchONMessage *) task_data->data;
537

538
539
            xbt_assert(context->machines.exists(message->machine_id));
            Machine * machine = context->machines[message->machine_id];
540
            (void) machine; // Avoids a warning if assertions are ignored
541
542
            xbt_assert(MSG_host_get_pstate(machine->host) == message->new_pstate);

543
            MachineRange all_switched_machines;
544
            if (context->current_switches.mark_switch_as_done(message->machine_id, message->new_pstate,
545
                                                              all_switched_machines, context))
546
            {
547
548
549
                if (context->trace_machine_states)
                    context->machine_state_tracer.write_machine_states(MSG_get_clock());

550
551
552
                context->proto_writer->append_resource_state_changed(all_switched_machines,
                                                                     std::to_string(message->new_pstate),
                                                                     MSG_get_clock());
553
            }
554
555
556
557
558
559
560

            --nb_switching_machines;
        } break; // end of case SWITCHED_ON

        case IPMessageType::SWITCHED_OFF:
        {
            xbt_assert(task_data->data != nullptr);
561
            SwitchOFFMessage * message = (SwitchOFFMessage *) task_data->data;
562

563
564
            xbt_assert(context->machines.exists(message->machine_id));
            Machine * machine = context->machines[message->machine_id];
565
            (void) machine; // Avoids a warning if assertions are ignored
566
567
            xbt_assert(MSG_host_get_pstate(machine->host) == message->new_pstate);

568
            MachineRange all_switched_machines;
569
            if (context->current_switches.mark_switch_as_done(message->machine_id, message->new_pstate,
570
                                                              all_switched_machines, context))
571
            {
572
573
574
                if (context->trace_machine_states)
                    context->machine_state_tracer.write_machine_states(MSG_get_clock());

575
576
577
                context->proto_writer->append_resource_state_changed(all_switched_machines,
                                                                     std::to_string(message->new_pstate),
                                                                     MSG_get_clock());
578
            }
579
580
581

            --nb_switching_machines;
        } break; // end of case SWITCHED_ON
582
583
584
585

        case IPMessageType::SCHED_TELL_ME_ENERGY:
        {
            long double total_consumed_energy = context->machines.total_consumed_energy(context);
Millian Poquet's avatar
Millian Poquet committed
586
            context->proto_writer->append_query_reply_energy(total_consumed_energy, MSG_get_clock());
587
588
589
590
591
592
        } break; // end of case SCHED_TELL_ME_ENERGY

        case IPMessageType::SUBMITTER_CALLBACK:
        {
            xbt_assert(false, "The server received a SUBMITTER_CALLBACK message, which should not happen");
        } break; // end of case SUBMITTER_CALLBACK
593
594
595

        case IPMessageType::KILLING_DONE:
        {
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
            xbt_assert(task_data->data != nullptr);
            KillingDoneMessage * message = (KillingDoneMessage *) task_data->data;

            vector<string> job_ids_str;
            vector<string> really_killed_job_ids_str;
            job_ids_str.reserve(message->jobs_ids.size());

            for (const JobIdentifier & job_id : message->jobs_ids)
            {
                job_ids_str.push_back(job_id.to_string());

                const Job * job = context->workloads.job_at(job_id);
                if (job->state == JobState::JOB_STATE_COMPLETED_KILLED &&
                    job->kill_reason == "Killed from killer_process (probably requested by the decision process)")
                {
                    nb_running_jobs--;
                    xbt_assert(nb_running_jobs >= 0);
                    nb_completed_jobs++;
                    xbt_assert(nb_completed_jobs + nb_running_jobs <= nb_submitted_jobs);

                    really_killed_job_ids_str.push_back(job_id.to_string());
                }
            }

            XBT_INFO("Jobs {%s} have been killed (the following ones have REALLY been killed: {%s})",
                     boost::algorithm::join(job_ids_str, ",").c_str(),
                     boost::algorithm::join(really_killed_job_ids_str, ",").c_str());

            context->proto_writer->append_job_killed(job_ids_str, MSG_get_clock());
            --nb_killers;

            if (!all_jobs_submitted_and_completed &&
                nb_completed_jobs == nb_submitted_jobs &&
629
630
                nb_submitters_finished == nb_submitters &&
                (!context->submission_sched_enabled || context->submission_sched_finished))
631
632
633
634
635
636
            {
                all_jobs_submitted_and_completed = true;
                XBT_INFO("It seems that all jobs have been submitted and completed!");

                context->proto_writer->append_simulation_ends(MSG_get_clock());
            }
637
        } break; // end of case KILLING_DONE
638
639
640
641
642
        } // end of switch

        delete task_data;
        MSG_task_destroy(task_received);

Millian Poquet's avatar
Millian Poquet committed
643
        if (sched_ready && !context->proto_writer->is_empty() && !end_of_simulation_sent)
644
645
646
        {
            RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
            req_rep_args->context = context;
Millian Poquet's avatar
Millian Poquet committed
647
648
            req_rep_args->send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
            context->proto_writer->clear();
649
650
651

            MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
            sched_ready = false;
652
653
            if (all_jobs_submitted_and_completed)
                end_of_simulation_sent = true;
654
655
656
657
        }

    } // end of while

658
    XBT_INFO("Simulation is finished!");
659
    bool simulation_is_completed = all_jobs_submitted_and_completed;
Millian Poquet's avatar
Millian Poquet committed
660
    (void) simulation_is_completed; // Avoids a warning if assertions are ignored
661
    xbt_assert(simulation_is_completed, "Left simulation loop, but the simulation does NOT seem finished...");
662
663
664
665

    delete args;
    return 0;
}