diff options
author | Your Name <you@example.com> | 2025-07-15 03:05:08 +0000 |
---|---|---|
committer | Your Name <you@example.com> | 2025-07-15 03:05:08 +0000 |
commit | 765501cdde04cba4b0cef061f77247e883b2bce9 (patch) | |
tree | 435f8d0ae68f31b3983f41b14596e87ae680a9b2 | |
parent | 022baafc0d7cd190887d2a98327a1536f1ad2784 (diff) |
-rwxr-xr-x | code/agent | bin | 18056 -> 0 bytes | |||
-rwxr-xr-x | code/daemon2 | bin | 17888 -> 0 bytes | |||
-rw-r--r-- | daemon2.c | 206 |
3 files changed, 0 insertions, 206 deletions
diff --git a/code/agent b/code/agent Binary files differdeleted file mode 100755 index f4cf8c6..0000000 --- a/code/agent +++ /dev/null diff --git a/code/daemon2 b/code/daemon2 Binary files differdeleted file mode 100755 index 66c3582..0000000 --- a/code/daemon2 +++ /dev/null diff --git a/daemon2.c b/daemon2.c deleted file mode 100644 index bb1465e..0000000 --- a/daemon2.c +++ /dev/null @@ -1,206 +0,0 @@ -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> -#include <fcntl.h> -#include <sys/mman.h> -#include <sys/stat.h> -#include <pthread.h> -#include "shared.h" - -#define NODES 2 - -void *load_balance_thread(void *arg) { - NodeStatusArray *status_array = (NodeStatusArray *)arg; - while (1) { - pthread_mutex_lock(&status_array->mutex); - printf("[daemon] Acquired mutex for load balancing on %s\n", getenv("HOSTNAME") ? getenv("HOSTNAME") : "unknown"); - - float total = 0; - float max = -1, min = 1e9; - int max_id = -1, min_id = -1; - - for (int i = 0; i < NODES; i++) { - float load = status_array->statuses[i].load; - total += load; - if (load > max) { - max = load; - max_id = i; - } - if (load < min) { - min = load; - min_id = i; - } - } - - float avg = total / NODES; - char cmd[512] = {0}; - int new_min_port = -1; - if ((max - min) > (avg / 2) && max_id != -1 && min_id != -1 && (max - min)>0.3) { - printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id); - new_min_port = status_array->statuses[min_id].lport + 1; - status_array->statuses[min_id].lport = new_min_port; - } - - pthread_mutex_unlock(&status_array->mutex); - printf("[daemon] Released mutex for load balancing on %s\n", getenv("HOSTNAME") ? getenv("HOSTNAME") : "unknown"); - - if (new_min_port != -1) { - int ret = snprintf(cmd, sizeof(cmd), - "mpirun -np 1 -host node%d ~/dmtcp/bin/migrate_heavy_job2.sh %d %d %d", - max_id+1, min_id+1, new_min_port, new_min_port); - if (ret >= sizeof(cmd)) { - fprintf(stderr, "[daemon] Command truncated in load_balance_thread\n"); - } else { - printf("[daemon] Running migration script on node%d: %s\n", max_id, cmd); - if (system(cmd) != 0) { - fprintf(stderr, "[daemon] Migration script failed on node%d\n", max_id); - } - } - } - - sleep(60); - } - return NULL; -} - -void run_job_on_node(const char *binary_path, int node_id, int port) { - char cmd[512]; - int ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_coordinator --port %d --daemon", node_id, port); - if (ret >= sizeof(cmd)) { - fprintf(stderr, "[daemon] Command truncated in run_job_on_node (coordinator)\n"); - return; - } - if (system(cmd) != 0) { - fprintf(stderr, "[daemon] Failed to start dmtcp_coordinator on node%d\n", node_id); - } - - ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_launch --coord-port %d %s &", node_id, port, binary_path); - if (ret >= sizeof(cmd)) { - fprintf(stderr, "[daemon] Command truncated in run_job_on_node (launch)\n"); - return; - } - printf("[daemon] Running on node%d: %s\n", node_id, cmd); - if (system(cmd) != 0) { - fprintf(stderr, "[daemon] Failed to launch job on node%d\n", node_id); - } -} - -int main() { - // Open or create job queue shared memory - int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666); - if (shm_fd == -1) { - perror("[daemon] shm_open SHM_NAME"); - return 1; - } - if (ftruncate(shm_fd, sizeof(JobQueue)) == -1) { - perror("[daemon] ftruncate SHM_NAME"); - close(shm_fd); - return 1; - } - - JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); - if (queue == MAP_FAILED) { - perror("[daemon] mmap SHM_NAME"); - close(shm_fd); - return 1; - } - - // Open node status shared memory (created by agent.c rank 0) - int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666); - if (stat_fd == -1) { - perror("[daemon] shm_open NODE_STATUS_SHM"); - munmap(queue, sizeof(JobQueue)); - close(shm_fd); - return 1; - } - - NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0); - if (status_array == MAP_FAILED) { - perror("[daemon] mmap NODE_STATUS_SHM"); - close(stat_fd); - munmap(queue, sizeof(JobQueue)); - close(shm_fd); - return 1; - } - - static int initialized = 0; - if (!initialized) { - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); - if (pthread_mutex_init(&queue->mutex, &attr) != 0) { - perror("[daemon] queue mutex initialization failed"); - munmap(status_array, sizeof(NodeStatusArray)); - close(stat_fd); - munmap(queue, sizeof(JobQueue)); - close(shm_fd); - return 1; - } - pthread_mutexattr_destroy(&attr); - queue->head = queue->tail = 0; - - // Initialize job_node array - for (int i = 0; i < MAX_JOBS; i++) { - queue->job_nodes[i] = -1; - } - initialized = 1; - } - - // Start load balancing thread - pthread_t load_thread; - if (pthread_create(&load_thread, NULL, load_balance_thread, status_array) != 0) { - perror("[daemon] pthread_create"); - munmap(status_array, sizeof(NodeStatusArray)); - close(stat_fd); - munmap(queue, sizeof(JobQueue)); - close(shm_fd); - return 1; - } - - int next_node_id = 0; - - while (1) { - pthread_mutex_lock(&queue->mutex); - while (queue->head != queue->tail) { - int idx = queue->head; - char job[MAX_PATH]; - strncpy(job, queue->jobs[idx], MAX_PATH - 1); - job[MAX_PATH - 1] = '\0'; // Ensure null-termination - queue->job_nodes[idx] = next_node_id + 1; // Assign node to job - - queue->head = (queue->head + 1) % MAX_JOBS; - pthread_mutex_unlock(&queue->mutex); - - printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id + 1, job); - pthread_mutex_lock(&status_array->mutex); - status_array->statuses[next_node_id].lport++; - status_array->statuses[next_node_id].running_jobs++; - pthread_mutex_unlock(&status_array->mutex); - - run_job_on_node(job, next_node_id + 1, status_array->statuses[next_node_id].lport); - - next_node_id = (next_node_id + 1) % NODES; - - pthread_mutex_lock(&queue->mutex); - } - pthread_mutex_unlock(&queue->mutex); - sleep(1); - } - - // Cleanup (unreachable due to infinite loop) - pthread_join(load_thread, NULL); - if (munmap(status_array, sizeof(NodeStatusArray)) == -1) { - perror("[daemon] munmap NODE_STATUS_SHM"); - } - if (close(stat_fd) == -1) { - perror("[daemon] close NODE_STATUS_SHM"); - } - if (munmap(queue, sizeof(JobQueue)) == -1) { - perror("[daemon] munmap SHM_NAME"); - } - if (close(shm_fd) == -1) { - perror("[daemon] close SHM_NAME"); - } - return 0; -} |