mpi_communicator_test.py 4.24 KB
Newer Older
tlavocat's avatar
tlavocat committed
1
2
3
4
#!/usr/bin/env python3
import sys
import os
import json
lavocat's avatar
lavocat committed
5
6
7
from yggdrasil.erebor   import runner
from yggdrasil.erebor   import FrameworkControler
from yggdrasil.isengard import consts
tlavocat's avatar
tlavocat committed
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109

class MPIComTest(FrameworkControler):
    """Test controller launching an emulated MPI server and several clients.

    ``start`` creates a "server" group, opens an MPI session on it and runs
    ``tests/mpi_server_emulator.py``; it then spawns ``nb_jobs`` client groups
    (``start_clients``), each running ``nb_process`` instances of
    ``tests/mpi_client_emulator.py``. Every finished command calls back into
    ``server_end``/``client_end``; once all of them have reported,
    ``decision_to_end`` terminates erebor and closes the controller.
    """

    def __init__(self, erebor, ID, node_list, tfile):
        FrameworkControler.__init__(self, erebor, ID, tfile)
        # NOTE(review): the ``node_list`` argument is ignored and a fixed
        # value is stored instead; confirm this hard-coding is intentional.
        self.node_list = "A,A,A,A"
        self.groups    = {}

    def dead_nodes(self, rank, ID, nodes) :
        """Callback registered in ``start``: report nodes erebor flags dead."""
        print("dead nodes [ from rank {} ID {}] {}".format(rank, ID, nodes))

    def network_up(self, ID) :
        """Count down pending groups and shut down once none remain.

        NOTE(review): ``self.to_wait_groups`` is never initialised in this
        class, and no visible code registers this callback. It is presumably
        set and invoked by ``FrameworkControler`` or left over from another
        test; confirm before relying on it.
        """
        self.to_wait_groups -= 1
        self.to_wait_pings   = len(self.groups) * len(self.groups)
        print("network UP {} rest to wait {}".format(ID,
            self.to_wait_groups))
        self.log_time("new group is UP")
        if self.to_wait_groups == 0 :
            self.log_time("all groups launched")
            self.erebor.terminate()
            self.close()

    def start(self, networkId):
        """Entry point once TakTuk is connected: bring up the server group.

        When the server group is up, an MPI session is opened on
        ``port_server``. When that session starts, the server emulator is
        executed and ``nb_jobs`` client groups are launched.
        """
        print("\n 'connected' \n")
        self.log_time("taktuk connected")
        # get notified on dead nodes
        self.erebor.on_dead_nodes(self.dead_nodes)

        port_server = 6000
        nb_jobs     = 5
        nb_process  = 10
        # One completion callback per client process, plus one for the server.
        self.global_wait = (nb_jobs * nb_process) + 1

        def group_server_up(ID):
            print("group server ready")
            # make it start a mpi session
            def mpi_session_started(data) :
                print("executing the client")
                cmd = self._emulator_path("mpi_server_emulator.py")
                # Pass the same port the MPI session was opened on rather
                # than a duplicated literal, so the two cannot drift apart.
                command = "{} -r 1 -p {} -c {}".format(
                    cmd, port_server, nb_jobs * nb_process)
                self.exec_on("0", command, consts.TRUE, "0", "server", "0",
                             "root", self.server_end)
                for i in range(nb_jobs) :
                    self.start_clients(nb_process, port_server, i)
            self.start_mpi_session("server", "0", "root", str(port_server),
                                   mpi_session_started)
        # start group server
        self.erebor.on_network_init("server", group_server_up)
        self.new_group_on("server", "", "0", "root", "0", "root")

    def start_clients(self, nb_process, port_server, rank) :
        """Create client group ``client<rank>`` and run ``nb_process`` clients.

        The group's MPI session listens on
        ``port_server + (rank + 1) * nb_process + 1``. This gives each client
        group its own port above the server's.
        """
        i = int(rank)
        cmd = self._emulator_path("mpi_client_emulator.py")
        port_client = int(port_server + ((i + 1) * nb_process)) + 1
        group_name  = "client{}".format(i)
        print("i {} port_client {} group_name {}".format(i, port_client,
            group_name))
        def group_client_up(ID):
            print("group {} ready".format(ID))
            # make it start a mpi session
            def mpi_session_started(data) :
                print("executing a simple command")
                for j in range(nb_process) :
                    command = "{} -r {} -p {} -s {} -n {}".format(
                                    cmd, j, port_client, "server", group_name)
                    self.exec_on("0", command, consts.TRUE, "0",
                                 group_name, "0", "root", self.client_end)
            self.start_mpi_session(group_name, "0", "root", str(port_client),
                                   mpi_session_started)
        # start group client
        self.erebor.on_network_init(group_name, group_client_up)
        self.new_group_on(group_name, "", "0", "root", "0", "root")

    @staticmethod
    def _emulator_path(script):
        """Return the path of ``tests/<script>`` under the launch directory.

        Prefers ``$PWD``, which is what the test has always used. Falls back
        to the process's current directory when PWD is not exported.
        """
        base = os.environ.get("PWD", os.getcwd())
        return base + "/tests/" + script

    @staticmethod
    def _after_marker(line, marker):
        """Return the text after the first ``marker`` in ``line``.

        If ``marker`` does not occur, the whole line is returned. Later
        occurrences of ``marker`` are kept in the result rather than cutting
        the line short.
        """
        _, found, rest = line.partition(marker)
        return rest if found else line

    def server_end(self, data) :
        """Completion callback for the server emulator command."""
        self.print_data(data, "server")
        self.decision_to_end()

    def client_end(self, data) :
        """Completion callback for each client emulator command."""
        self.print_data(data, "client")
        self.decision_to_end()

    def print_data(self, data, prefix) :
        """Pretty-print a JSON command result.

        ``data`` must decode to an object with ``stderr`` and ``stdout``
        (lists of lines) and ``status`` keys. The ``"error >"`` and
        ``"output >"`` markers are stripped from stderr and stdout lines
        respectively.
        """
        decoded = json.loads(data)
        for stream, marker in (("stderr", "error >"), ("stdout", "output >")):
            for line in decoded[stream] :
                print("{} {} : {}".format(prefix, stream,
                                          self._after_marker(line, marker)))
        print("{} {} : {}".format(prefix, "status", decoded["status"]))

    def decision_to_end(self) :
        """Count one finished command and shut down after the last one.

        Uses ``== 0`` (not ``<= 0``) so terminate/close run exactly once even
        if extra callbacks arrive afterwards.
        """
        self.global_wait -= 1
        if self.global_wait == 0 :
            self.erebor.terminate()
            self.close()

if __name__ == "__main__":
    # Hand argv and the controller class to yggdrasil's runner and propagate
    # its return value as the process exit status.
    sys.exit(runner(sys.argv, MPIComTest))