summaryrefslogtreecommitdiff
path: root/daemon.c
blob: ea1ffaadd56dda786abb8df29856e785cbefab70 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <pthread.h>
#include "shared.h"

/* Number of compute nodes the daemon dispatches to; they are addressed as
 * hosts node0 .. node<NODES-1>. Override at build time with -DNODES=<n>. */
#ifndef NODES
#define NODES 4
#endif
#if NODES <= 0
#error "NODES must be a positive integer (it is used as a divisor and modulus)"
#endif

void *load_balance_thread(void *arg) {
    while (1) {
        pthread_mutex_lock(&status_array->mutex);

        float total = 0;
        float max = -1, min = 1e9;
        int max_id = -1, min_id = -1;

        for (int i = 0; i < NODES; i++) {
            float load = status_array->statuses[i].load;
            total += load;
            if (load > max) {
                max = load;
                max_id = i;
            }
            if (load < min) {
                min = load;
                min_id = i;
            }
        }

        float avg = total / NODES;
        if ((max - min) > (avg / 2)) {
            printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
            // Call external script/function to checkpoint, scp, and restart
            char cmd[512];
            snprintf(cmd, sizeof(cmd), "ssh node%d \"./migrate_heavy_job.sh %d %d\"", max_id, max_id, min_id);
            system(cmd);
        }

        pthread_mutex_unlock(&status_array->mutex);
        sleep(60);
    }
    return NULL;
}


/*
 * Launch `binary_path` on host node<node_id> as
 *     mpirun -np 1 -host node<node_id> <binary_path>
 * without waiting for the job to finish.
 *
 * The path originates in the shared-memory job queue, which any process able
 * to open it may write. It is therefore passed to mpirun as a single argv
 * entry via exec, instead of being pasted into a /bin/sh command line. The
 * previous system("... &") allowed shell injection and silently truncated
 * long paths to fit a 512-byte buffer.
 * NOTE(review): a queue entry is now treated as exactly one path. Entries
 * that relied on the shell splitting "prog arg1 arg2" into words will no
 * longer work that way.
 *
 * A double fork detaches mpirun. Like the old trailing `&`, the daemon does
 * not wait for the job, and the job never becomes the daemon's zombie.
 * Failures are reported on stderr; the function itself returns nothing.
 */
void run_job_on_node(const char *binary_path, int node_id) {
    if (binary_path == NULL || binary_path[0] == '\0') {
        fprintf(stderr, "[daemon] Refusing to run empty job on node%d\n", node_id);
        return;
    }

    char host[32];
    snprintf(host, sizeof(host), "node%d", node_id);
    printf("[daemon] Running on %s: mpirun -np 1 -host %s %s\n", host, host, binary_path);
    fflush(stdout); /* keep the log line ahead of any output from mpirun */

    pid_t child = fork();
    if (child == -1) {
        perror("fork");
        return;
    }
    if (child == 0) {
        /* Intermediate child: start the real job, then exit immediately so
         * the grandchild is reparented and never needs reaping by us. */
        pid_t grandchild = fork();
        if (grandchild == 0) {
            execlp("mpirun", "mpirun", "-np", "1", "-host", host, binary_path, (char *)NULL);
            perror("execlp mpirun");
            _exit(127);
        }
        _exit(grandchild == -1 ? 1 : 0);
    }

    /* Reap the short-lived intermediate child. */
    int status = 0;
    pid_t reaped;
    do {
        reaped = waitpid(child, &status, 0);
    } while (reaped == -1 && errno == EINTR);

    if (reaped == -1) {
        perror("waitpid");
    } else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
        fprintf(stderr, "[daemon] Failed to launch job on %s: %s\n", host, binary_path);
    }
}

/*
 * Job-dispatch daemon entry point.
 *
 * 1. Creates or opens the job-queue shared memory object SHM_NAME, sizes it
 *    to a JobQueue and maps it.
 * 2. Maps the node-status object NODE_STATUS_SHM, which must already exist.
 * 3. Resets the queue (shared mutex, head/tail, job_node[]).
 * 4. Once per second, drains all pending jobs. Each job goes to
 *    run_job_on_node() on nodes 0..NODES-1 in round-robin order, and the
 *    chosen node is recorded in queue->job_node[].
 *
 * Returns 1 on any setup failure; otherwise runs forever.
 */
int main(void) {
    int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
    if (shm_fd == -1) {
        perror("shm_open");
        return 1;
    }
    /* A newly created object has length 0. It must be sized before mapping,
     * or touching the mapping faults. */
    if (ftruncate(shm_fd, sizeof(JobQueue)) == -1) {
        perror("ftruncate");
        close(shm_fd);
        return 1;
    }

    JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
    if (queue == MAP_FAILED) {
        perror("mmap");
        close(shm_fd);
        return 1;
    }
    close(shm_fd); /* the mapping stays valid without the descriptor */

    int stat_fd = shm_open(NODE_STATUS_SHM, O_RDWR, 0666);
    if (stat_fd == -1) {
        perror("shm_open NODE_STATUS_SHM");
        return 1;
    }

    NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
    if (status_array == MAP_FAILED) {
        perror("mmap NODE_STATUS_SHM");
        close(stat_fd);
        return 1;
    }
    close(stat_fd);
    /* NOTE(review): status_array is mapped but not used below, and the
     * balancer (load_balance_thread) is never started in this file. Confirm
     * whether it is meant to be launched here. */

    /* The daemon owns the queue: on every start it (re)initializes the
     * process-shared mutex and empties the ring. The former
     * `static int initialized` guard was always 0 at this point (main runs
     * once per process), so it never skipped this work and was removed.
     * NOTE(review): re-initializing a mutex that another process currently
     * holds is undefined behavior, and jobs queued before startup are
     * discarded. Confirm that producers attach only after the daemon is up. */
    pthread_mutexattr_t attr;
    int err = pthread_mutexattr_init(&attr);
    if (err == 0) {
        err = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
        if (err == 0) {
            err = pthread_mutex_init(&queue->mutex, &attr);
        }
        pthread_mutexattr_destroy(&attr);
    }
    if (err != 0) {
        fprintf(stderr, "[daemon] Cannot initialize shared queue mutex: %s\n", strerror(err));
        return 1;
    }
    queue->head = queue->tail = 0;

    for (int i = 0; i < MAX_JOBS; i++) {
        queue->job_node[i] = -1; /* -1: no node assigned to this slot yet */
    }

    int next_node_id = 0;

    for (;;) {
        pthread_mutex_lock(&queue->mutex);
        while (queue->head != queue->tail) {
            int idx = queue->head;
            char job[MAX_PATH];
            /* Bounded read of the shared slot that always NUL-terminates.
             * strncpy(job, ..., MAX_PATH) left `job` unterminated whenever
             * the slot was full. */
            snprintf(job, sizeof(job), "%.*s", (int)(sizeof(job) - 1), queue->jobs[idx]);

            /* Record the assignment and consume the slot while still locked. */
            queue->job_node[idx] = next_node_id;
            queue->head = (queue->head + 1) % MAX_JOBS;

            /* Release the lock while launching so producers can keep
             * enqueuing. */
            pthread_mutex_unlock(&queue->mutex);

            printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id, job);
            run_job_on_node(job, next_node_id);

            next_node_id = (next_node_id + 1) % NODES;

            pthread_mutex_lock(&queue->mutex);
        }
        pthread_mutex_unlock(&queue->mutex);
        sleep(1);
    }
}