path: root/daemon2.c
blob: bb1465efd5cf4e5c328ec15fe438230f0212e6ee
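/*
 * daemon2.c: dispatcher daemon for a two-node DMTCP cluster. It pulls
 * job binaries from a shared-memory queue, launches each one under a
 * dmtcp_coordinator on a round-robin node, and runs a background
 * thread that migrates work from overloaded to underloaded nodes.
 */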
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <pthread.h>
#include <errno.h>
#include "shared.h"

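/* Number of compute nodes managed by this daemon (mpirun hosts node1..nodeN). */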
#define NODES 2

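/*
 * Load-balancing thread: once a minute, scans the shared node-status
 * array and, if the most loaded node exceeds the least loaded one by
 * more than half the average load (and by an absolute margin of 0.3),
 * reserves a fresh coordinator port on the target node and invokes the
 * migration script on the overloaded node.
 */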
void *load_balance_thread(void *arg) {
    NodeStatusArray *status_array = (NodeStatusArray *)arg;
    const char *hostname = getenv("HOSTNAME") ? getenv("HOSTNAME") : "unknown";
    while (1) {
        pthread_mutex_lock(&status_array->mutex);
        printf("[daemon] Acquired mutex for load balancing on %s\n", hostname);

        float total = 0;
        float max = -1, min = 1e9;
        int max_id = -1, min_id = -1;

        for (int i = 0; i < NODES; i++) {
            float load = status_array->statuses[i].load;
            total += load;
            if (load > max) {
                max = load;
                max_id = i;
            }
            if (load < min) {
                min = load;
                min_id = i;
            }
        }

        float avg = total / NODES;
        char cmd[512] = {0};
        int new_min_port = -1;
        /* Migrate only when the imbalance is significant: the spread must
         * exceed half the average load and an absolute threshold of 0.3. */
        if (max_id != -1 && min_id != -1 && (max - min) > (avg / 2) && (max - min) > 0.3f) {
            printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
            new_min_port = status_array->statuses[min_id].lport + 1;
            status_array->statuses[min_id].lport = new_min_port;
        }

        pthread_mutex_unlock(&status_array->mutex);
        printf("[daemon] Released mutex for load balancing on %s\n", hostname);

        if (new_min_port != -1) {
            /* Run the migration script on the overloaded node; arguments are
             * the target node id and the new coordinator port. */
            int ret = snprintf(cmd, sizeof(cmd),
                               "mpirun -np 1 -host node%d ~/dmtcp/bin/migrate_heavy_job2.sh %d %d %d",
                               max_id + 1, min_id + 1, new_min_port, new_min_port);
            if (ret < 0 || (size_t)ret >= sizeof(cmd)) {
                fprintf(stderr, "[daemon] Command truncated in load_balance_thread\n");
            } else {
                printf("[daemon] Running migration script on node%d: %s\n", max_id + 1, cmd);
                if (system(cmd) != 0) {
                    fprintf(stderr, "[daemon] Migration script failed on node%d\n", max_id + 1);
                }
            }
        }

        sleep(60);
    }
    return NULL;
}

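/*
 * Start a dmtcp_coordinator on the given node at the given port, then
 * launch the job binary under dmtcp_launch attached to that coordinator.
 * Both steps go through mpirun so they execute on the remote node.
 */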
void run_job_on_node(const char *binary_path, int node_id, int port) {
    char cmd[512];
    int ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_coordinator --port %d --daemon", node_id, port);
    if (ret < 0 || (size_t)ret >= sizeof(cmd)) {
        fprintf(stderr, "[daemon] Command truncated in run_job_on_node (coordinator)\n");
        return;
    }
    if (system(cmd) != 0) {
        fprintf(stderr, "[daemon] Failed to start dmtcp_coordinator on node%d\n", node_id);
    }

    ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_launch --coord-port %d %s &", node_id, port, binary_path);
    if (ret < 0 || (size_t)ret >= sizeof(cmd)) {
        fprintf(stderr, "[daemon] Command truncated in run_job_on_node (launch)\n");
        return;
    }
    printf("[daemon] Running on node%d: %s\n", node_id, cmd);
    if (system(cmd) != 0) {
        fprintf(stderr, "[daemon] Failed to launch job on node%d\n", node_id);
    }
}

int main() {
    // Open or create job queue shared memory; O_EXCL tells us whether this
    // daemon created the segment and must therefore initialize it.
    int created = 1;
    int shm_fd = shm_open(SHM_NAME, O_CREAT | O_EXCL | O_RDWR, 0666);
    if (shm_fd == -1 && errno == EEXIST) {
        created = 0;
        shm_fd = shm_open(SHM_NAME, O_RDWR, 0666);
    }
    if (shm_fd == -1) {
        perror("[daemon] shm_open SHM_NAME");
        return 1;
    }
    if (ftruncate(shm_fd, sizeof(JobQueue)) == -1) {
        perror("[daemon] ftruncate SHM_NAME");
        close(shm_fd);
        return 1;
    }

    JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
    if (queue == MAP_FAILED) {
        perror("[daemon] mmap SHM_NAME");
        close(shm_fd);
        return 1;
    }

    // Open node status shared memory (created by agent.c rank 0)
    int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666);
    if (stat_fd == -1) {
        perror("[daemon] shm_open NODE_STATUS_SHM");
        munmap(queue, sizeof(JobQueue));
        close(shm_fd);
        return 1;
    }

    NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
    if (status_array == MAP_FAILED) {
        perror("[daemon] mmap NODE_STATUS_SHM");
        close(stat_fd);
        munmap(queue, sizeof(JobQueue));
        close(shm_fd);
        return 1;
    }

    // Initialize the queue only on first creation, so restarting the daemon
    // does not clobber a live queue.
    if (created) {
        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
        int err = pthread_mutex_init(&queue->mutex, &attr);
        if (err != 0) {
            fprintf(stderr, "[daemon] queue mutex initialization failed: %s\n", strerror(err));
            munmap(status_array, sizeof(NodeStatusArray));
            close(stat_fd);
            munmap(queue, sizeof(JobQueue));
            close(shm_fd);
            return 1;
        }
        pthread_mutexattr_destroy(&attr);
        queue->head = queue->tail = 0;

        // Initialize job_node array
        for (int i = 0; i < MAX_JOBS; i++) {
            queue->job_nodes[i] = -1;
        }
    }

    // Start load balancing thread
    pthread_t load_thread;
    int err = pthread_create(&load_thread, NULL, load_balance_thread, status_array);
    if (err != 0) {
        fprintf(stderr, "[daemon] pthread_create: %s\n", strerror(err));
        munmap(status_array, sizeof(NodeStatusArray));
        close(stat_fd);
        munmap(queue, sizeof(JobQueue));
        close(shm_fd);
        return 1;
    }

    int next_node_id = 0;

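    /*
     * Dispatch loop: drain the shared job queue, assigning jobs to nodes
     * round-robin. The queue mutex is dropped while a job is dispatched so
     * submitters are not blocked behind mpirun/system() calls.
     */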
    while (1) {
        pthread_mutex_lock(&queue->mutex);
        while (queue->head != queue->tail) {
            int idx = queue->head;
            char job[MAX_PATH];
            strncpy(job, queue->jobs[idx], MAX_PATH - 1);
            job[MAX_PATH - 1] = '\0'; // Ensure null-termination
            queue->job_nodes[idx] = next_node_id + 1; // Assign node to job

            queue->head = (queue->head + 1) % MAX_JOBS;
            pthread_mutex_unlock(&queue->mutex);

            printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id + 1, job);
            pthread_mutex_lock(&status_array->mutex);
            status_array->statuses[next_node_id].lport++;
            status_array->statuses[next_node_id].running_jobs++;
            pthread_mutex_unlock(&status_array->mutex);

            run_job_on_node(job, next_node_id + 1, status_array->statuses[next_node_id].lport);

            next_node_id = (next_node_id + 1) % NODES;

            pthread_mutex_lock(&queue->mutex);
        }
        pthread_mutex_unlock(&queue->mutex);
        sleep(1);
    }

    // Cleanup (unreachable due to infinite loop)
    pthread_join(load_thread, NULL);
    if (munmap(status_array, sizeof(NodeStatusArray)) == -1) {
        perror("[daemon] munmap NODE_STATUS_SHM");
    }
    if (close(stat_fd) == -1) {
        perror("[daemon] close NODE_STATUS_SHM");
    }
    if (munmap(queue, sizeof(JobQueue)) == -1) {
        perror("[daemon] munmap SHM_NAME");
    }
    if (close(shm_fd) == -1) {
        perror("[daemon] close SHM_NAME");
    }
    return 0;
}