1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <pthread.h>
#include "shared.h"
#define NODES 4
// Periodic load-balancing worker (pthread start-routine signature).
//
// Once every 60 seconds it scans the load of all NODES entries in the status
// table and, when the gap between the highest and lowest load exceeds half of
// the average load, runs `migrate_heavy_job.sh` on the most loaded node over
// ssh, passing the source and destination node ids.
//
// arg: unused.
// Returns: never returns in practice (infinite loop); NULL is for the signature.
//
// NOTE(review): `status_array` is not defined in this block; it must be a
// file-scope/global pointer (presumably declared via shared.h). main() in this
// file only declares a *local* with the same name — confirm the global is
// actually assigned before this thread is started.
void *load_balance_thread(void *arg) {
    (void)arg;

    while (1) {
        float total = 0;
        float max = -1, min = 1e9;
        int max_id = -1, min_id = -1;

        // Take a snapshot of the loads under the lock, and nothing else: the
        // migration below blocks on ssh and must not stall every other process
        // that needs the status mutex.
        pthread_mutex_lock(&status_array->mutex);
        for (int i = 0; i < NODES; i++) {
            float load = status_array->statuses[i].load;
            total += load;
            if (load > max) {
                max = load;
                max_id = i;
            }
            if (load < min) {
                min = load;
                min_id = i;
            }
        }
        pthread_mutex_unlock(&status_array->mutex);

        float avg = total / NODES;

        // max_id/min_id stay -1 when no load fell inside the (-1, 1e9) search
        // window; never build a command for a non-existent "node-1".
        int have_pair = (max_id >= 0 && min_id >= 0 && max_id != min_id);
        if (have_pair && (max - min) > (avg / 2)) {
            printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
            // Call external script/function to checkpoint, scp, and restart
            char cmd[512];
            snprintf(cmd, sizeof(cmd), "ssh node%d \"./migrate_heavy_job.sh %d %d\"", max_id, max_id, min_id);
            if (system(cmd) == -1) {
                perror("system");
            }
        }

        sleep(60);
    }
    return NULL;
}
// Writes `src` into `dst` as one single-quoted POSIX shell word.
// An embedded single quote is emitted as '\'' (close quote, escaped quote,
// reopen quote). Returns 0 on success, -1 if `dst_size` is too small.
static int quote_shell_arg(char *dst, size_t dst_size, const char *src) {
    size_t pos = 0;

    if (dst_size < 3) {  // opening quote + closing quote + NUL
        return -1;
    }
    dst[pos++] = '\'';
    for (const char *p = src; *p != '\0'; p++) {
        size_t need = (*p == '\'') ? 4 : 1;
        // Always keep room for the closing quote and the terminator.
        if (pos + need + 2 > dst_size) {
            return -1;
        }
        if (*p == '\'') {
            memcpy(dst + pos, "'\\''", 4);
        } else {
            dst[pos] = *p;
        }
        pos += need;
    }
    dst[pos++] = '\'';
    dst[pos] = '\0';
    return 0;
}

// Launches `binary_path` on node `node_id` through `mpirun -np 1 -host node<N>`.
//
// The command line ends in `&`, so the shell backgrounds mpirun and system()
// returns without waiting for the job to finish.
//
// binary_path: path of the program to run. It is passed to the shell as a
//              single quoted word, so spaces or shell metacharacters in the
//              path cannot split the command or inject extra commands.
// node_id:     numeric suffix used to form the host name "node<node_id>".
//
// On an empty/NULL path, an over-long path, or a failed system() call the
// problem is reported on stderr and nothing further is done.
void run_job_on_node(const char *binary_path, int node_id) {
    char quoted[4096];
    char cmd[sizeof(quoted) + 64];

    if (binary_path == NULL || binary_path[0] == '\0') {
        fprintf(stderr, "[daemon] Refusing to run empty job path on node%d\n", node_id);
        return;
    }
    if (quote_shell_arg(quoted, sizeof(quoted), binary_path) != 0) {
        fprintf(stderr, "[daemon] Job path too long for node%d, not started\n", node_id);
        return;
    }

    int len = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d %s &", node_id, quoted);
    if (len < 0 || (size_t)len >= sizeof(cmd)) {
        // A truncated command would lose the trailing '&' and block the daemon.
        fprintf(stderr, "[daemon] Command too long for node%d, not started\n", node_id);
        return;
    }

    printf("[daemon] Running on node%d: %s\n", node_id, cmd);
    if (system(cmd) == -1) {
        perror("system");
    }
}
// Daemon entry point.
//
// 1. Creates/opens the job-queue shared memory object (SHM_NAME), sizes it to
//    one JobQueue and maps it.
// 2. Opens and maps the existing node-status object (NODE_STATUS_SHM); startup
//    fails if it does not exist.
// 3. (Re)initializes the queue: process-shared mutex, head/tail = 0 and every
//    job_node entry = -1. This happens on every start of the daemon.
// 4. Loops forever: once per second drains the queue, assigning jobs to nodes
//    round-robin (0 .. NODES-1) and launching each with run_job_on_node().
//
// Returns 1 on any setup failure; otherwise never returns.
int main(void) {
    int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
    if (shm_fd == -1) {
        perror("shm_open");
        return 1;
    }
    // Without a successful resize, touching the mapping below could SIGBUS.
    if (ftruncate(shm_fd, sizeof(JobQueue)) == -1) {
        perror("ftruncate");
        close(shm_fd);
        return 1;
    }
    JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
    if (queue == MAP_FAILED) {
        perror("mmap");
        close(shm_fd);
        return 1;
    }
    close(shm_fd);  // the mapping stays valid after the descriptor is closed

    int stat_fd = shm_open(NODE_STATUS_SHM, O_RDWR, 0666);
    if (stat_fd == -1) {
        perror("shm_open NODE_STATUS_SHM");
        return 1;
    }
    NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
    if (status_array == MAP_FAILED) {
        perror("mmap NODE_STATUS_SHM");
        close(stat_fd);
        return 1;
    }
    close(stat_fd);
    // NOTE(review): this local is not used again in main(), and
    // load_balance_thread() is never started here even though it reads a
    // `status_array` of its own — confirm whether the thread should be created
    // and whether this mapping was meant to be stored in a file-scope pointer.
    (void)status_array;

    // Initialize the queue. The mutex must be process-shared because it lives
    // in shared memory and is locked by other processes as well.
    pthread_mutexattr_t attr;
    int rc = pthread_mutexattr_init(&attr);
    if (rc != 0) {
        fprintf(stderr, "pthread_mutexattr_init: %s\n", strerror(rc));
        return 1;
    }
    rc = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    if (rc == 0) {
        rc = pthread_mutex_init(&queue->mutex, &attr);
    }
    pthread_mutexattr_destroy(&attr);
    if (rc != 0) {
        fprintf(stderr, "queue mutex initialization failed: %s\n", strerror(rc));
        return 1;
    }
    queue->head = queue->tail = 0;
    // Initialize job_node array
    for (int i = 0; i < MAX_JOBS; i++) {
        queue->job_node[i] = -1;
    }

    int next_node_id = 0;
    while (1) {
        pthread_mutex_lock(&queue->mutex);
        while (queue->head != queue->tail) {
            int idx = queue->head;
            char job[MAX_PATH];
            // strncpy does not terminate when the source fills the buffer, so
            // copy one byte less and terminate explicitly.
            strncpy(job, queue->jobs[idx], MAX_PATH - 1);
            job[MAX_PATH - 1] = '\0';
            // Assign node to job
            queue->job_node[idx] = next_node_id;
            // Move head forward in the queue
            queue->head = (queue->head + 1) % MAX_JOBS;
            // Launch outside the lock so producers are not blocked meanwhile.
            pthread_mutex_unlock(&queue->mutex);
            printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id, job);
            run_job_on_node(job, next_node_id);
            next_node_id = (next_node_id + 1) % NODES;
            pthread_mutex_lock(&queue->mutex);
        }
        pthread_mutex_unlock(&queue->mutex);
        sleep(1);
    }
    return 0;
}
|