workload.cpp 10.1 KB
Newer Older
1
2
3
4
5
/**
 * @file workload.cpp
 * @brief Contains workload-related functions
 */

6
7
8
9
10
11
12
#include "workload.hpp"

#include <fstream>
#include <streambuf>

#include <rapidjson/document.h>

13
14
#include <smpi/smpi.h>

15
16
17
#include "context.hpp"
#include "jobs.hpp"
#include "profiles.hpp"
18
#include "jobs_execution.hpp"
19
20
21
22

using namespace std;
using namespace rapidjson;

Millian Poquet's avatar
Millian Poquet committed
23
XBT_LOG_NEW_DEFAULT_CATEGORY(workload, "workload"); //!< Logging
24

25
Workload::Workload(std::string arg_name)
26
27
28
29
30
31
{
    jobs = new Jobs;
    profiles = new Profiles;

    jobs->setProfiles(profiles);
    jobs->setWorkload(this);
32
    this->name = arg_name;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
}

Workload::~Workload()
{
    delete jobs;
    delete profiles;

    jobs = nullptr;
    profiles = nullptr;
}

void Workload::load_from_json(const std::string &json_filename, int &nb_machines)
{
    XBT_INFO("Loading JSON workload '%s'...", json_filename.c_str());
    // Let the file content be placed in a string
    ifstream ifile(json_filename);
    xbt_assert(ifile.is_open(), "Cannot read file '%s'", json_filename.c_str());
    string content;

    ifile.seekg(0, ios::end);
    content.reserve(ifile.tellg());
    ifile.seekg(0, ios::beg);

    content.assign((std::istreambuf_iterator<char>(ifile)),
                std::istreambuf_iterator<char>());

    // JSON document creation
    Document doc;
    doc.Parse(content.c_str());
62
    xbt_assert(doc.IsObject(), "Invalid JSON file '%s': not a JSON object", json_filename.c_str());
63
64
65
66
67
68
69
70
71
72
73
74

    // Let's try to read the number of machines in the JSON document
    xbt_assert(doc.HasMember("nb_res"), "Invalid JSON file '%s': the 'nb_res' field is missing", json_filename.c_str());
    const Value & nb_res_node = doc["nb_res"];
    xbt_assert(nb_res_node.IsInt(), "Invalid JSON file '%s': the 'nb_res' field is not an integer", json_filename.c_str());
    nb_machines = nb_res_node.GetInt();
    xbt_assert(nb_machines > 0, "Invalid JSON file '%s': the value of the 'nb_res' field is invalid (%d)",
               json_filename.c_str(), nb_machines);

    jobs->load_from_json(doc, json_filename);
    profiles->load_from_json(doc, json_filename);

75
76
    XBT_INFO("JSON workload parsed sucessfully. Read %d jobs and %d profiles.",
             jobs->nb_jobs(), profiles->nb_profiles());
77
78
79
80
81
82
83
    XBT_INFO("Checking workload validity...");
    check_validity();
    XBT_INFO("Workload seems to be valid.");
}

void Workload::register_smpi_applications()
{
84
85
    XBT_INFO("Registering SMPI applications of workload '%s'...", name.c_str());

86
87
88
    for (auto mit : jobs->jobs())
    {
        Job * job = mit.second;
89
90
91
92
93
94
95
        Profile * profile = (*profiles)[job->profile];

        if (profile->type == ProfileType::SMPI)
        {
            SmpiProfileData * data = (SmpiProfileData *) profile->data;

            string job_id_str = name + "!" + to_string(job->number);
Millian Poquet's avatar
Millian Poquet committed
96
97
            XBT_INFO("Registering app. instance='%s', nb_process=%d",
                     job_id_str.c_str(), (int) data->trace_filenames.size());
98
99
            SMPI_app_instance_register(job_id_str.c_str(), smpi_replay_process, data->trace_filenames.size());
        }
100
    }
101
102

    XBT_INFO("SMPI applications of workload '%s' have been registered.", name.c_str());
103
104
105
}

void Workload::check_validity()
106
107
{
    // Let's check that every SEQUENCE-typed profile points to existing profiles
108
    for (auto mit : profiles->profiles())
109
110
111
112
113
114
    {
        Profile * profile = mit.second;
        if (profile->type == ProfileType::SEQUENCE)
        {
            SequenceProfileData * data = (SequenceProfileData *) profile->data;
            for (const auto & prof : data->sequence)
115
116
            {
                (void) prof; // Avoids a warning if assertions are ignored
117
                xbt_assert(profiles->exists(prof),
118
119
120
                           "Invalid composed profile '%s': the used profile '%s' does not exist",
                           mit.first.c_str(), prof.c_str());
            }
121
122
123
124
125
126
127
        }
    }

    // TODO : check that there are no circular calls between composed profiles...
    // TODO: compute the constraint of the profile number of resources, to check if it match the jobs that use it

    // Let's check that the profile of each job exists
128
    for (auto mit : jobs->jobs())
129
130
    {
        Job * job = mit.second;
131
132
133
        xbt_assert(profiles->exists(job->profile),
                   "Invalid job %d: the associated profile '%s' does not exist",
                   job->number, job->profile.c_str());
134

135
        const Profile * profile = profiles->at(job->profile);
136
137
138
        if (profile->type == ProfileType::MSG_PARALLEL)
        {
            MsgParallelProfileData * data = (MsgParallelProfileData *) profile->data;
139
            (void) data; // Avoids a warning if assertions are ignored
140
141
142
143
            xbt_assert(data->nb_res == job->required_nb_res,
                       "Invalid job %d: the requested number of resources (%d) do NOT match"
                       " the number of resources of the associated profile '%s' (%d)",
                       job->number, job->required_nb_res, job->profile.c_str(), data->nb_res);
144
145
146
147
148
        }
        else if (profile->type == ProfileType::SEQUENCE)
        {
            // TODO: check if the number of resources matches a resource-constrained composed profile
        }
149
150
151
152
153
154
155
156
157
    }
}



Workloads::Workloads()
{

}
158

159
160
161
162
163
164
Workloads::~Workloads()
{
    for (auto mit : _workloads)
    {
        Workload * workload = mit.second;
        delete workload;
165
    }
166
    _workloads.clear();
167
168
}

169
Workload *Workloads::operator[](const std::string &workload_name)
170
{
171
172
    return at(workload_name);
}
173

174
175
176
177
const Workload *Workloads::operator[](const std::string &workload_name) const
{
    return at(workload_name);
}
178

179
180
181
182
183
Workload *Workloads::at(const std::string &workload_name)
{
    xbt_assert(exists(workload_name));
    return _workloads.at(workload_name);
}
184

185
186
187
188
189
const Workload *Workloads::at(const std::string &workload_name) const
{
    xbt_assert(exists(workload_name));
    return _workloads.at(workload_name);
}
190

191
192
193
194
Job *Workloads::job_at(const std::string &workload_name, int job_number)
{
    return at(workload_name)->jobs->at(job_number);
}
195

196
197
198
199
const Job *Workloads::job_at(const std::string &workload_name, int job_number) const
{
    return at(workload_name)->jobs->at(job_number);
}
200

201
202
203
Job *Workloads::job_at(const JobIdentifier &job_id)
{
    return at(job_id.workload_name)->jobs->at(job_id.job_number);
204
}
205

206
const Job *Workloads::job_at(const JobIdentifier &job_id) const
207
{
208
209
210
211
212
213
214
215
216
217
218
219
220
221
    return at(job_id.workload_name)->jobs->at(job_id.job_number);
}

void Workloads::insert_workload(const std::string &workload_name, Workload *workload)
{
    xbt_assert(!exists(workload_name));
    xbt_assert(!exists(workload->name));

    workload->name = workload_name;
    _workloads[workload_name] = workload;
}

bool Workloads::exists(const std::string &workload_name) const
{
Henri C's avatar
Bug--    
Henri C committed
222

223
    return _workloads.count(workload_name) == 1;
224
}
225

226
bool Workloads::contains_smpi_job() const
227
{
228
    for (auto mit : _workloads)
229
    {
230
231
232
        Workload * workload = mit.second;
        if (workload->jobs->contains_smpi_job())
            return true;
233
    }
234
235
236
237
238
239
240
241
242
243
244
245
246

    return false;
}

void Workloads::register_smpi_applications()
{
    for (auto mit : _workloads)
    {
        Workload * workload = mit.second;
        workload->register_smpi_applications();
    }
}

247
248
249
250
251
bool Workloads::job_exists(const std::string &workload_name, const int job_number)
{
    if (!exists(workload_name))
        return false;

252
253
254
255
256
257
258
259
    if (!at(workload_name)->jobs->exists(job_number))
        return false;

    const Job * job = at(workload_name)->jobs->at(job_number);
    if (!at(workload_name)->profiles->exists(job->profile))
        return false;

    return true;
260
261
262
263
}

bool Workloads::job_exists(const JobIdentifier &job_id)
{
264
265
266
267
268
269
270
271
272
273
274
    return job_exists(job_id.workload_name, job_id.job_number);
}

Job *Workloads::add_job_if_not_exists(const JobIdentifier &job_id, BatsimContext *context)
{
    xbt_assert(this == &context->workloads,
               "Bad Workloads::add_job_if_not_exists call: The given context "
               "does not match the Workloads instance (this=%p, &context->workloads=%p",
               this, &context->workloads);

    // If the job already exists, let's just return it
275
    if (job_exists(job_id)) {
276
        return job_at(job_id);
277
278
    }

279
280
281

    // Let's create a Workload if needed
    Workload * workload = nullptr;
282
    if (!exists(job_id.workload_name))
283
    {
284
285
      workload = new Workload(job_id.workload_name);
      //        workload->name = job_id.workload_name;
286
287
288
289
290
        insert_workload(job_id.workload_name, workload);
    }
    else
        workload = at(job_id.workload_name);

291

292
293
294
    // Let's retrieve the job information from the data storage
    string job_key = RedisStorage::job_key(job_id);
    string job_json_description = context->storage.get(job_key);
295
    XBT_INFO("JOB INFO: %s", job_json_description.c_str());
296
297
298
299
300

    // Let's create a Job if needed
    Job * job = nullptr;
    if (!workload->jobs->exists(job_id.job_number))
    {
301
        XBT_INFO("CREATING JOB %d FOR WORKLOAD %s",job_id.job_number, workload->name.c_str());
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
        job = Job::from_json(job_json_description, workload);
        xbt_assert(job_id.job_number == job->number,
                   "Cannot add dynamic job %s!%d: JSON job number mismatch (%d)",
                   job_id.workload_name.c_str(), job_id.job_number, job->number);

        // Let's insert the Job in the data structure
        workload->jobs->add_job(job);
    }
    else
        job = workload->jobs->at(job_id.job_number);

    // Let's retrieve the profile information from the data storage
    string profile_key = RedisStorage::profile_key(workload->name, job->profile);
    string profile_json_description = context->storage.get(profile_key);

    // Let's create a Profile if needed
    Profile * profile = nullptr;
    if (!workload->profiles->exists(job->profile))
    {
        profile = Profile::from_json(job->profile, profile_json_description);

        // Let's insert the Profile in the data structure
        workload->profiles->add_profile(job->profile, profile);
    }
    else
        profile = workload->profiles->at(job->profile);
328

329
330
    // TODO: check job & profile consistency (nb_res, etc.)
    return job;
331
332
333
334
335
336
337
338
339
340
}

std::map<std::string, Workload *> &Workloads::workloads()
{
    return _workloads;
}

const std::map<std::string, Workload *> &Workloads::workloads() const
{
    return _workloads;
341
}