| author | Your Name <you@example.com> | 2025-07-15 02:55:46 +0000 |
|---|---|---|
| committer | Your Name <you@example.com> | 2025-07-15 02:55:46 +0000 |
| commit | 082e3adb0f3fd13b2f9de827449f1ec1ed037bf3 (patch) | |
| tree | 3241cfc4b8cfbea201909df643f1011f650d8a27 /daemon2.c | |
| parent | 2dbcce88ab7a73304f770006bcae53461c6daddc (diff) | |
0.9 + nightly builds
Diffstat (limited to 'daemon2.c')
-rw-r--r-- | daemon2.c | 206 |
1 file changed, 206 insertions, 0 deletions
diff --git a/daemon2.c b/daemon2.c
new file mode 100644
index 0000000..bb1465e
--- /dev/null
+++ b/daemon2.c
@@ -0,0 +1,206 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <pthread.h>
+#include "shared.h"
+
+#define NODES 2
+
+void *load_balance_thread(void *arg) {
+    NodeStatusArray *status_array = (NodeStatusArray *)arg;
+    while (1) {
+        pthread_mutex_lock(&status_array->mutex);
+        printf("[daemon] Acquired mutex for load balancing on %s\n", getenv("HOSTNAME") ? getenv("HOSTNAME") : "unknown");
+
+        float total = 0;
+        float max = -1, min = 1e9;
+        int max_id = -1, min_id = -1;
+
+        for (int i = 0; i < NODES; i++) {
+            float load = status_array->statuses[i].load;
+            total += load;
+            if (load > max) {
+                max = load;
+                max_id = i;
+            }
+            if (load < min) {
+                min = load;
+                min_id = i;
+            }
+        }
+
+        float avg = total / NODES;
+        char cmd[512] = {0};
+        int new_min_port = -1;
+        if ((max - min) > (avg / 2) && max_id != -1 && min_id != -1 && (max - min) > 0.3) {
+            printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
+            new_min_port = status_array->statuses[min_id].lport + 1;
+            status_array->statuses[min_id].lport = new_min_port;
+        }
+
+        pthread_mutex_unlock(&status_array->mutex);
+        printf("[daemon] Released mutex for load balancing on %s\n", getenv("HOSTNAME") ? getenv("HOSTNAME") : "unknown");
+
+        if (new_min_port != -1) {
+            int ret = snprintf(cmd, sizeof(cmd),
+                "mpirun -np 1 -host node%d ~/dmtcp/bin/migrate_heavy_job2.sh %d %d %d",
+                max_id + 1, min_id + 1, new_min_port, new_min_port);
+            if (ret < 0 || (size_t)ret >= sizeof(cmd)) {
+                fprintf(stderr, "[daemon] Command truncated in load_balance_thread\n");
+            } else {
+                printf("[daemon] Running migration script on node%d: %s\n", max_id + 1, cmd);
+                if (system(cmd) != 0) {
+                    fprintf(stderr, "[daemon] Migration script failed on node%d\n", max_id + 1);
+                }
+            }
+        }
+
+        sleep(60);
+    }
+    return NULL;
+}
+
+void run_job_on_node(const char *binary_path, int node_id, int port) {
+    char cmd[512];
+    int ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_coordinator --port %d --daemon", node_id, port);
+    if (ret < 0 || (size_t)ret >= sizeof(cmd)) {
+        fprintf(stderr, "[daemon] Command truncated in run_job_on_node (coordinator)\n");
+        return;
+    }
+    if (system(cmd) != 0) {
+        fprintf(stderr, "[daemon] Failed to start dmtcp_coordinator on node%d\n", node_id);
+    }
+
+    ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_launch --coord-port %d %s &", node_id, port, binary_path);
+    if (ret < 0 || (size_t)ret >= sizeof(cmd)) {
+        fprintf(stderr, "[daemon] Command truncated in run_job_on_node (launch)\n");
+        return;
+    }
+    printf("[daemon] Running on node%d: %s\n", node_id, cmd);
+    if (system(cmd) != 0) {
+        fprintf(stderr, "[daemon] Failed to launch job on node%d\n", node_id);
+    }
+}
+
+int main() {
+    // Open or create job queue shared memory
+    int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
+    if (shm_fd == -1) {
+        perror("[daemon] shm_open SHM_NAME");
+        return 1;
+    }
+    if (ftruncate(shm_fd, sizeof(JobQueue)) == -1) {
+        perror("[daemon] ftruncate SHM_NAME");
+        close(shm_fd);
+        return 1;
+    }
+
+    JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+    if (queue == MAP_FAILED) {
+        perror("[daemon] mmap SHM_NAME");
+        close(shm_fd);
+        return 1;
+    }
+
+    // Open node status shared memory (created by agent.c rank 0)
+    int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666);
+    if (stat_fd == -1) {
+        perror("[daemon] shm_open NODE_STATUS_SHM");
+        munmap(queue, sizeof(JobQueue));
+        close(shm_fd);
+        return 1;
+    }
+
+    NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
+    if (status_array == MAP_FAILED) {
+        perror("[daemon] mmap NODE_STATUS_SHM");
+        close(stat_fd);
+        munmap(queue, sizeof(JobQueue));
+        close(shm_fd);
+        return 1;
+    }
+
+    static int initialized = 0; // always 0 in a fresh process, so queue state is re-initialized on every daemon start
+    if (!initialized) {
+        pthread_mutexattr_t attr;
+        pthread_mutexattr_init(&attr);
+        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+        if (pthread_mutex_init(&queue->mutex, &attr) != 0) {
+            perror("[daemon] queue mutex initialization failed");
+            munmap(status_array, sizeof(NodeStatusArray));
+            close(stat_fd);
+            munmap(queue, sizeof(JobQueue));
+            close(shm_fd);
+            return 1;
+        }
+        pthread_mutexattr_destroy(&attr);
+        queue->head = queue->tail = 0;
+
+        // Initialize job_nodes array
+        for (int i = 0; i < MAX_JOBS; i++) {
+            queue->job_nodes[i] = -1;
+        }
+        initialized = 1;
+    }
+
+    // Start load balancing thread
+    pthread_t load_thread;
+    if (pthread_create(&load_thread, NULL, load_balance_thread, status_array) != 0) {
+        perror("[daemon] pthread_create");
+        munmap(status_array, sizeof(NodeStatusArray));
+        close(stat_fd);
+        munmap(queue, sizeof(JobQueue));
+        close(shm_fd);
+        return 1;
+    }
+
+    int next_node_id = 0;
+
+    while (1) {
+        pthread_mutex_lock(&queue->mutex);
+        while (queue->head != queue->tail) {
+            int idx = queue->head;
+            char job[MAX_PATH];
+            strncpy(job, queue->jobs[idx], MAX_PATH - 1);
+            job[MAX_PATH - 1] = '\0'; // Ensure null-termination
+            queue->job_nodes[idx] = next_node_id + 1; // Assign node to job
+
+            queue->head = (queue->head + 1) % MAX_JOBS;
+            pthread_mutex_unlock(&queue->mutex);
+
+            printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id + 1, job);
+            pthread_mutex_lock(&status_array->mutex);
+            status_array->statuses[next_node_id].lport++;
+            status_array->statuses[next_node_id].running_jobs++;
+            pthread_mutex_unlock(&status_array->mutex);
+
+            run_job_on_node(job, next_node_id + 1, status_array->statuses[next_node_id].lport);
+
+            next_node_id = (next_node_id + 1) % NODES;
+
+            pthread_mutex_lock(&queue->mutex);
+        }
+        pthread_mutex_unlock(&queue->mutex);
+        sleep(1);
+    }
+
+    // Cleanup (unreachable due to infinite loop)
+    pthread_join(load_thread, NULL);
+    if (munmap(status_array, sizeof(NodeStatusArray)) == -1) {
+        perror("[daemon] munmap NODE_STATUS_SHM");
+    }
+    if (close(stat_fd) == -1) {
+        perror("[daemon] close NODE_STATUS_SHM");
+    }
+    if (munmap(queue, sizeof(JobQueue)) == -1) {
+        perror("[daemon] munmap SHM_NAME");
+    }
+    if (close(shm_fd) == -1) {
+        perror("[daemon] close SHM_NAME");
+    }
+    return 0;
+}