Commit de1cd4f1 authored by Steffen Lackner's avatar Steffen Lackner
Browse files

[code] send the message from the job to the scheduler via the server

parent 92f5154d
......@@ -130,6 +130,9 @@ std::string ip_message_type_to_string(IPMessageType type)
break;
case IPMessageType::TO_JOB_MSG:
s = "TO_JOB_MSG";
break;
case IPMessageType::FROM_JOB_MSG:
s = "FROM_JOB_MSG";
}
return s;
......@@ -259,6 +262,11 @@ IPMessage::~IPMessage()
ToJobMessage * msg = (ToJobMessage *) data;
delete msg;
} break;
case IPMessageType::FROM_JOB_MSG:
{
FromJobMessage * msg = (FromJobMessage *) data;
delete msg;
} break;
}
data = nullptr;
......
......@@ -74,6 +74,7 @@ enum class IPMessageType
,END_DYNAMIC_SUBMIT //!< Scheduler -> Server. The scheduler tells the server that dynamic job submissions are finished.
,CONTINUE_DYNAMIC_SUBMIT //!< Scheduler -> Server. The scheduler tells the server that dynamic job submissions continue.
,TO_JOB_MSG //!< Scheduler -> Server. The scheduler sends a message to a job.
,FROM_JOB_MSG //!< Job -> Server. The job wants to send a message to the scheduler via the server.
};
/**
......@@ -238,6 +239,15 @@ struct ToJobMessage
std::string message; //!< The message to send to the job
};
/**
* @brief The content of the FromJobMessage message
*/
struct FromJobMessage
{
JobIdentifier job_id; //!< The JobIdentifier
std::string message; //!< The message to send to the scheduler
};
/**
* @brief The base struct sent in inter-process messages
*/
......
......@@ -47,6 +47,7 @@ int execute_profile(BatsimContext *context,
{
Workload * workload = context->workloads.at(allocation->job_id.workload_name);
Job * job = workload->jobs->at(allocation->job_id.job_number);
JobIdentifier job_id(workload->name, job->number);
Profile * profile = workload->profiles->at(profile_name);
int nb_res = job->required_nb_res;
......@@ -298,12 +299,17 @@ int execute_profile(BatsimContext *context,
{
SchedulerSendProfileData * data = (SchedulerSendProfileData *) profile->data;
if (delay_job(0.005, remaining_time) == -1)
return -1;
XBT_INFO("Sending message to the scheduler: %s", data->message.c_str());
context->proto_writer->append_from_job_message(job->id, data->message, MSG_get_clock());
FromJobMessage * message = new FromJobMessage;
message->job_id = job_id;
message->message = data->message;
send_message("server", IPMessageType::FROM_JOB_MSG, (void*)message);
if (delay_job(0.005, remaining_time) == -1)
return -1;
return profile->return_code;
}
else if (profile->type == ProfileType::SCHEDULER_RECV)
......
......@@ -56,6 +56,7 @@ int server_process(int argc, char *argv[])
handler_map[IPMessageType::SCHED_EXECUTE_JOB] = server_on_execute_job;
handler_map[IPMessageType::SCHED_CHANGE_JOB_STATE] = server_on_change_job_state;
handler_map[IPMessageType::TO_JOB_MSG] = server_on_to_job_msg;
handler_map[IPMessageType::FROM_JOB_MSG] = server_on_from_job_msg;
handler_map[IPMessageType::SCHED_REJECT_JOB] = server_on_reject_job;
handler_map[IPMessageType::SCHED_KILL_JOB] = server_on_kill_jobs;
handler_map[IPMessageType::SCHED_CALL_ME_LATER] = server_on_call_me_later;
......@@ -692,6 +693,25 @@ void server_on_to_job_msg(ServerData * data,
check_submitted_and_completed(data);
}
void server_on_from_job_msg(ServerData * data,
IPMessage * task_data)
{
xbt_assert(task_data->data != nullptr);
FromJobMessage * message = (FromJobMessage *) task_data->data;
Job * job = data->context->workloads.job_at(message->job_id);
XBT_INFO("Send message to scheduler: Job %d (workload=%s) message=%s",
job->number, job->workload->name.c_str(),
message->message.c_str());
data->context->proto_writer->append_from_job_message(message->job_id.to_string(),
message->message,
MSG_get_clock());
check_submitted_and_completed(data);
}
void server_on_reject_job(ServerData * data,
IPMessage * task_data)
{
......
......@@ -230,3 +230,11 @@ void server_on_change_job_state(ServerData * data,
*/
void server_on_to_job_msg(ServerData * data,
IPMessage * task_data);
/**
* @brief Server FROM_JOB_MSG handler
* @param[in,out] data The data associated with the server_process
* @param[in,out] task_data The data associated with the message the server received
*/
void server_on_from_job_msg(ServerData * data,
IPMessage * task_data);
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment