author    | Your Name <you@example.com> | 2025-07-15 02:55:46 +0000
committer | Your Name <you@example.com> | 2025-07-15 02:55:46 +0000
commit    | 082e3adb0f3fd13b2f9de827449f1ec1ed037bf3 (patch)
tree      | 3241cfc4b8cfbea201909df643f1011f650d8a27 /daemon.c
parent    | 2dbcce88ab7a73304f770006bcae53461c6daddc (diff)
0.9 + nightly builds
Diffstat (limited to 'daemon.c')
-rw-r--r-- | daemon.c | 111
1 file changed, 55 insertions(+), 56 deletions(-)
@@ -8,9 +8,11 @@
 #include <pthread.h>
 #include "shared.h"
 
-#define NODES 4
+#define NODES 2
 
 void *load_balance_thread(void *arg) {
+    NodeStatusArray *status_array = (NodeStatusArray *)arg;
+
     while (1) {
         pthread_mutex_lock(&status_array->mutex);
@@ -32,57 +34,51 @@ void *load_balance_thread(void *arg) {
         }
 
         float avg = total / NODES;
-        if ((max - min) > (avg / 2)) {
+        if ((max - min) > (avg / 2) && avg > 0.1) {
             printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
-            // Call external script/function to checkpoint, scp, and restart
-            char cmd[512];
-            snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d checkpointandbalance.sh %d &", min_id, max_id);
-            printf("[daemon] Running on node%d: %s\n", node_id, cmd);
-            system(cmd);
-            char cmd[512];
-            snprintf(cmd, sizeof(cmd), "ssh node%d \"./migrate_heavy_job.sh %d %d\"", max_id, max_id, min_id);
+            printf("[daemon] Threshold exceeded: (%6.2f - %6.2f) = %6.2f > (%6.2f / 2)\n", max, min, (max - min), avg);
+            char cmd[512];
+
+            status_array->statuses[min_id].lport++;
+            status_array->statuses[max_id].lport--;
+            snprintf(cmd, sizeof(cmd),
+                     "ssh node%d '~/dmtcp/bin/migrate_heavy_job.sh %d %d %d'",
+                     max_id + 1, min_id + 1, status_array->statuses[min_id].lport, status_array->statuses[max_id].lport);
+            printf("[daemon] Running migration script on node%d: %s\n", max_id + 1, cmd);
             system(cmd);
-        }
-        pthread_mutex_unlock(&status_array->mutex);
-        sleep(60);
+        }
-    return NULL;
+        pthread_mutex_unlock(&status_array->mutex);
+
+        sleep(60);
     }
+    return NULL;
+}
 
-void run_job_on_node(const char *binary_path, int node_id) {
+void run_job_on_node(const char *binary_path, int node_id, int port) {
     char cmd[512];
-    snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d %s &", node_id, binary_path);
-    printf("[daemon] Running on node%d: %s\n", node_id, cmd);
+    snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_coordinator --port %d --daemon", node_id, port);
+    system(cmd);
+    snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_launch --coord-port %d python3 ~/dmtcp/bin/%s &", node_id, port, binary_path);
+    printf("[daemon] Dispatched to node%d: %s\n", node_id, cmd);
     system(cmd);
 }
 
 int main() {
     int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
-    if (shm_fd == -1) {
-        perror("shm_open");
-        return 1;
-    }
+
     ftruncate(shm_fd, sizeof(JobQueue));
     JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
-    if (queue == MAP_FAILED) {
-        perror("mmap");
-        return 1;
-    }
-    int stat_fd = shm_open(NODE_STATUS_SHM, O_RDWR, 0666);
-    if (stat_fd == -1) {
-        perror("shm_open NODE_STATUS_SHM");
-        return 1;
-    }
+
+    int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666);
+
     NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
-    if (status_array == MAP_FAILED) {
-        perror("mmap NODE_STATUS_SHM");
-        return 1;
-    }
+
     static int initialized = 0;
     if (!initialized) {
@@ -93,39 +89,42 @@ int main() {
         pthread_mutexattr_destroy(&attr);
 
         queue->head = queue->tail = 0;
-        // Initialize job_node array
-        for (int i = 0; i < MAX_JOBS; i++) {
-            queue->job_node[i] = -1;
-        }
+
+
+        pthread_t load_thread; // LOADB_pthread
+        pthread_create(&load_thread, NULL, load_balance_thread, status_array);
         initialized = 1;
     }
 
     int next_node_id = 0;
-    while (1) {
-        pthread_mutex_lock(&queue->mutex);
-        while (queue->head != queue->tail) {
-            int idx = queue->head;
-            char job[MAX_PATH];
-            strncpy(job, queue->jobs[idx], MAX_PATH);
-
-            // Assign node to job
-            queue->job_node[idx] = next_node_id;
+    while (1) {
+        pthread_mutex_lock(&queue->mutex);
+        while (queue->head != queue->tail) {
+            int idx = queue->head;
+            char job[MAX_PATH];
+            strncpy(job, queue->jobs[idx], MAX_PATH);
-            // Move head forward in the queue
-            queue->head = (queue->head + 1) % MAX_JOBS;
+            int job_target_node = queue->job_nodes[idx];
+            int assigned_node = (job_target_node != -1) ? job_target_node : next_node_id;
-            pthread_mutex_unlock(&queue->mutex);
-
-            printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id, job);
-            run_job_on_node(job, next_node_id);
+            pthread_mutex_unlock(&queue->mutex);
-            next_node_id = (next_node_id + 1) % NODES;
+            printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, assigned_node, job);
+            status_array->statuses[assigned_node].lport++;
+            printf("[DEBUG] Dispatching job at index %d to node %d: %s\n", idx, assigned_node, job);
+            run_job_on_node(job, assigned_node + 1, status_array->statuses[assigned_node].lport);
-            pthread_mutex_lock(&queue->mutex);
+            if (job_target_node == -1) {
+                next_node_id = (next_node_id + 1) % NODES; // round-robin advance only if not user-specified
+            }
-        pthread_mutex_unlock(&queue->mutex);
-        sleep(1);
+
+            pthread_mutex_lock(&queue->mutex);
+            queue->head = (queue->head + 1) % MAX_JOBS;
         }
+        pthread_mutex_unlock(&queue->mutex);
+        sleep(1);
+    }
+
     return 0;
 }
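The daemon leans on several declarations from shared.h (JobQueue, NodeStatusArray, SHM_NAME, SHM_NODE_STATUS, MAX_JOBS, MAX_PATH) that are outside this diff. The sketch below shows what those declarations might look like, inferred only from how daemon.c uses them; the shm object names, array sizes, and the load field are assumptions, not the committed header.

```c
/* Hypothetical shared.h sketch, reconstructed from the fields daemon.c touches.
 * Object names, sizes, and the `load` field are assumptions. */
#ifndef SHARED_H
#define SHARED_H

#include <pthread.h>

#define SHM_NAME        "/job_queue"    /* assumed POSIX shm name for the job queue */
#define SHM_NODE_STATUS "/node_status"  /* assumed POSIX shm name for node status */
#define MAX_JOBS 64                     /* assumed ring-buffer capacity */
#define MAX_PATH 256                    /* assumed job-path length */
#define NODES    2                      /* matches the new value in this commit */

typedef struct {
    float load;  /* per-node load sampled elsewhere (assumed field) */
    int   lport; /* DMTCP coordinator port counter, bumped on dispatch/migration */
} NodeStatus;

typedef struct {
    pthread_mutex_t mutex;          /* expected to be PTHREAD_PROCESS_SHARED */
    NodeStatus      statuses[NODES];
} NodeStatusArray;

typedef struct {
    pthread_mutex_t mutex;
    int  head, tail;                /* ring-buffer indices used by the dispatch loop */
    char jobs[MAX_JOBS][MAX_PATH];  /* job names passed to dmtcp_launch */
    int  job_nodes[MAX_JOBS];       /* requested node index, or -1 for round-robin */
} JobQueue;

#endif /* SHARED_H */
```

As a quick sanity check of the new trigger condition in load_balance_thread: with two nodes reporting loads 0.90 and 0.20, avg = 0.55, max - min = 0.70 > avg / 2 = 0.275, and avg > 0.1, so the daemon would ssh to the most loaded node and run migrate_heavy_job.sh toward the least loaded one.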