From 082e3adb0f3fd13b2f9de827449f1ec1ed037bf3 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Tue, 15 Jul 2025 02:55:46 +0000
Subject: 0.9 + nightly builds

---
 addjob       | Bin 0 -> 17272 bytes
 addjob.c     |  19 +++++-
 agent        | Bin 0 -> 18056 bytes
 agent.c      | 137 ++++++++++++++++++++-------------------
 code/agent   | Bin 0 -> 18056 bytes
 code/daemon2 | Bin 0 -> 17888 bytes
 daemon       | Bin 0 -> 17712 bytes
 daemon.c     | 111 ++++++++++++++++----------------
 daemon2      | Bin 0 -> 17888 bytes
 daemon2.c    | 206 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 shared.h     |   7 +-
 11 files changed, 355 insertions(+), 125 deletions(-)
 create mode 100755 addjob
 create mode 100755 agent
 create mode 100755 code/agent
 create mode 100755 code/daemon2
 create mode 100755 daemon
 create mode 100755 daemon2
 create mode 100644 daemon2.c

diff --git a/addjob b/addjob
new file mode 100755
index 0000000..a23d1f0
Binary files /dev/null and b/addjob differ
diff --git a/addjob.c b/addjob.c
index db6a9d3..6af7847 100644
--- a/addjob.c
+++ b/addjob.c
@@ -8,11 +8,20 @@
 #include "shared.h"
 
 int main(int argc, char *argv[]) {
-    if (argc != 2) {
-        fprintf(stderr, "Usage: %s \n", argv[0]);
+    if (argc != 2 && argc != 3) {
+        fprintf(stderr, "Usage: %s [node]\n", argv[0]);
         return 1;
     }
 
+    int target_node = -1;
+    if (argc == 3) {
+        target_node = atoi(argv[2]);
+        if (target_node < 0 || target_node >= MAX_NODES) {
+            fprintf(stderr, "[addjob] Invalid node ID: %d\n", target_node);
+            return 2;
+        }
+    }
+
     int shm_fd = shm_open(SHM_NAME, O_RDWR, 0666);
     if (shm_fd == -1) {
         perror("shm_open");
@@ -30,8 +39,12 @@ int main(int argc, char *argv[]) {
         printf("[addjob] Queue full\n");
     } else {
         strncpy(queue->jobs[queue->tail], argv[1], MAX_PATH);
+        queue->job_nodes[queue->tail] = target_node;
         queue->tail = (queue->tail + 1) % MAX_JOBS;
-        printf("[addjob] Queued job: %s\n", argv[1]);
+        printf("[addjob] Queued job: %s", argv[1]);
+        if (target_node != -1)
+            printf(" for node %d", target_node);
+        printf("\n");
     }
 
     pthread_mutex_unlock(&queue->mutex);
diff --git a/agent b/agent
new file mode 100755
index 0000000..f4cf8c6
Binary files /dev/null and b/agent differ
diff --git a/agent.c b/agent.c
index 9a6a286..3b0bd69 100644
--- a/agent.c
+++ b/agent.c
@@ -6,6 +6,8 @@
 #include 
 #include 
 #include 
+#include 
+#include 
 #include "shared.h"
 
 float get_load() {
@@ -13,83 +15,90 @@ float get_load() {
     if (getloadavg(loads, 1) != -1) {
         return (float)loads[0];
     }
-    return -1.0; #!!!!!!!
+    return -1.0;
 }
 
-int main() {
-
-MPI_Init(&argc, &argv);
-    int rank;
+int main(int argc, char *argv[]) {
+    MPI_Init(&argc, &argv);
+    int rank, size;
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+    MPI_Comm_size(MPI_COMM_WORLD, &size);
 
     NodeStatus status;
     memset(&status, 0, sizeof(NodeStatus));
     status.node_id = rank;
-
-    #=================================================
+    status.lport = 7778;
+
+    NodeStatusArray *ns = NULL; // only rank0
+    int shm_fd;
+
     if (rank == 0) {
-        int shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666);
-        if (shm_fd == -1) {
-            perror("shm_open");
-            exit(1);
-        }
-        ftruncate(shm_fd, sizeof(NodeStatusArray));
-
-        NodeStatusArray *ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
-        if (ns == MAP_FAILED) {
-            perror("mmap");
-            exit(1);
-        }
-
-        static int initialized = 0;
-        if (!initialized) {
-            pthread_mutexattr_t attr;
-            pthread_mutexattr_init(&attr);
-            pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
-            pthread_mutex_init(&ns->mutex, &attr);
-            pthread_mutexattr_destroy(&attr);
-            for (int i = 0; i < MAX_NODES; i++) {
-                ns->statuses[i].node_id = i;
-                ns->statuses[i].running_jobs = 0;
-                ns->statuses[i].load = 0.0f;
-            }
-            initialized = 1;
-        }
-
-
-
-    }
-    while (1) {
-        if (rank == 1) {
-            status.loadavg = get_cpu_loadavg();
-            status.num_processes = 0; // done
-            printf("[agent] Rank %d reporting load %.2f\n", rank, status.loadavg);
-            //autocomplete
-            FILE *fp = popen("pgrep -f mpirun | wc -l", "r");
-            if (fp) {
-                fscanf(fp, "%d", status.num_processes);
-                pclose(fp);
-            } else {
-                status.num_processes = -1;
-            }
-            MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD);
-        } else if (rank == 0) {
-            NodeStatus recv;
-            MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-            printf("[daemon] Received load %.2f from node %d\n", recv.loadavg, recv.node_id);
-            pthread_mutex_lock(&ns->mutex);
-            ns->statuses[recv.node_id]=recv;
-            pthread_mutex_unlock(&ns->mutex);
-
-
+        //shm
+        shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666);
+
+
+        ftruncate(shm_fd, sizeof(NodeStatusArray));
+
+        ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+
+
+        pthread_mutexattr_t attr;
+        pthread_mutexattr_init(&attr);
+        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+        pthread_mutex_init(&ns->mutex, &attr);
+        pthread_mutexattr_destroy(&attr);
+
+        for (int i = 0; i < MAX_NODES; i++) {
+            ns->statuses[i].node_id = i;
+            ns->statuses[i].running_jobs = 0;
+            ns->statuses[i].load = 0.0f;
+            ns->statuses[i].lport = 7778;
         }
-        sleep(5);
     }
-    MPI_Finalize();
-    return 0;
+    MPI_Barrier(MPI_COMM_WORLD);
 
+    while (1) {
+        status.load = get_load();
+        FILE *fp = popen("pgrep -f mpirun | wc -l", "r");
+        if (fp) {
+            if (fscanf(fp, "%d", &status.running_jobs) != 1) {
+                status.running_jobs = -1;
+            }
+            pclose(fp);
+        } else {
+            status.running_jobs = -1;
+        }
+
+        printf("[agent] Rank %d reporting load %.2f, running jobs %d\n", rank, status.load, status.running_jobs);
+
+        if (rank == 0) {
+            pthread_mutex_lock(&ns->mutex);
+            ns->statuses[rank].running_jobs = status.running_jobs;
+            ns->statuses[rank].load = status.load;
+            pthread_mutex_unlock(&ns->mutex);
+
+            for (int i = 1; i < size; i++) {
+                NodeStatus recv;
+                int ret = MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+
+                printf("[agent] Received load %.2f, running jobs %d from node %d\n", recv.load, recv.running_jobs, recv.node_id);
+
+                pthread_mutex_lock(&ns->mutex);
+                ns->statuses[recv.node_id].running_jobs = recv.running_jobs;
+                ns->statuses[recv.node_id].load = recv.load;
+                pthread_mutex_unlock(&ns->mutex);
+            }
+        } else {
+            int ret = MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD);
+        }
+
+        sleep(5);
+    }
+
+    MPI_Finalize();
+    return 0;
 }
diff --git a/code/agent b/code/agent
new file mode 100755
index 0000000..f4cf8c6
Binary files /dev/null and b/code/agent differ
diff --git a/code/daemon2 b/code/daemon2
new file mode 100755
index 0000000..66c3582
Binary files /dev/null and b/code/daemon2 differ
diff --git a/daemon b/daemon
new file mode 100755
index 0000000..e351052
Binary files /dev/null and b/daemon differ
diff --git a/daemon.c b/daemon.c
index 5c50f94..bd0fa2c 100644
--- a/daemon.c
+++ b/daemon.c
@@ -8,9 +8,11 @@
 #include 
 #include "shared.h"
 
-#define NODES 4
+#define NODES 2
 
 void *load_balance_thread(void *arg) {
+    NodeStatusArray *status_array = (NodeStatusArray *)arg;
+
     while (1) {
         pthread_mutex_lock(&status_array->mutex);
 
@@ -32,57 +34,51 @@ void *load_balance_thread(void *arg) {
         }
 
         float avg = total / NODES;
-        if ((max - min) > (avg / 2)) {
+        if ((max - min) > (avg / 2) && avg > 0.1) {
             printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
-            // Call external script/function to checkpoint, scp, and restart
-            char cmd[512];
-            snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d checkpointandbalance.sh %d &", min_id, max_id);
-            printf("[daemon] Running on node%d: %s\n", node_id, cmd);
-            system(cmd);
-            char cmd[512];
-            snprintf(cmd, sizeof(cmd), "ssh node%d \"./migrate_heavy_job.sh %d %d\"", max_id, max_id, min_id);
+            printf("[daemon] Threshold exceeded: (%6.2f - %6.2f) = %6.2f > (%6.2f / 2)\n", max, min, (max - min), avg);
+            char cmd[512];
+
+            status_array->statuses[min_id].lport++;
+            status_array->statuses[max_id].lport--;
+            snprintf(cmd, sizeof(cmd),
+                     "ssh node%d '~/dmtcp/bin/migrate_heavy_job.sh %d %d %d'",
+                     max_id + 1, min_id + 1, status_array->statuses[min_id].lport, status_array->statuses[max_id].lport);
+            printf("[daemon] Running migration script on node%d: %s\n", max_id + 1, cmd);
             system(cmd);
-        }
-        pthread_mutex_unlock(&status_array->mutex);
-        sleep(60);
+        }
 
-    return NULL;
+        pthread_mutex_unlock(&status_array->mutex);
+
+        sleep(60);
     }
+    return NULL;
+}
 
-void run_job_on_node(const char *binary_path, int node_id) {
+void run_job_on_node(const char *binary_path, int node_id, int port) {
     char cmd[512];
-    snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d %s &", node_id, binary_path);
-    printf("[daemon] Running on node%d: %s\n", node_id, cmd);
+    snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_coordinator --port %d --daemon", node_id, port);
+    system(cmd);
+    snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_launch --coord-port %d python3 ~/dmtcp/bin/%s &", node_id, port, binary_path);
+    printf("[daemon] Dispatched to node%d: %s\n", node_id, cmd);
     system(cmd);
 }
 
 int main() {
     int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
-    if (shm_fd == -1) {
-        perror("shm_open");
-        return 1;
-    }
+
     ftruncate(shm_fd, sizeof(JobQueue));
 
     JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
-    if (queue == MAP_FAILED) {
-        perror("mmap");
-        return 1;
-    }
-
-    int stat_fd = shm_open(NODE_STATUS_SHM, O_RDWR, 0666);
-    if (stat_fd == -1) {
-        perror("shm_open NODE_STATUS_SHM");
-        return 1;
-    }
+
+    int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666);
+
     NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
-    if (status_array == MAP_FAILED) {
-        perror("mmap NODE_STATUS_SHM");
-        return 1;
-    }
+
 
     static int initialized = 0;
     if (!initialized) {
@@ -93,39 +89,42 @@ int main() {
         pthread_mutexattr_destroy(&attr);
 
         queue->head = queue->tail = 0;
-        // Initialize job_node array
-        for (int i = 0; i < MAX_JOBS; i++) {
-            queue->job_node[i] = -1;
-        }
+
+
+        pthread_t load_thread; // LOADB_pthread
+        pthread_create(&load_thread, NULL, load_balance_thread, status_array);
 
         initialized = 1;
     }
 
     int next_node_id = 0;
 
-    while (1) {
-        pthread_mutex_lock(&queue->mutex);
-        while (queue->head != queue->tail) {
-            int idx = queue->head;
-            char job[MAX_PATH];
-            strncpy(job, queue->jobs[idx], MAX_PATH);
-
-            // Assign node to job
-            queue->job_node[idx] = next_node_id;
+    while (1) {
+        pthread_mutex_lock(&queue->mutex);
+        while (queue->head != queue->tail) {
+            int idx = queue->head;
+            char job[MAX_PATH];
+            strncpy(job, queue->jobs[idx], MAX_PATH);
 
-            // Move head forward in the queue
-            queue->head = (queue->head + 1) % MAX_JOBS;
+            int job_target_node = queue->job_nodes[idx];
+            int assigned_node = (job_target_node != -1) ? job_target_node : next_node_id;
 
-            pthread_mutex_unlock(&queue->mutex);
-
-            printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id, job);
-            run_job_on_node(job, next_node_id);
+            pthread_mutex_unlock(&queue->mutex);
 
-            next_node_id = (next_node_id + 1) % NODES;
+            printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, assigned_node, job);
+            status_array->statuses[assigned_node].lport++;
+            printf("[DEBUG] Dispatching job at index %d to node %d: %s\n", idx, assigned_node, job);
+            run_job_on_node(job, assigned_node + 1, status_array->statuses[assigned_node].lport);
 
-            pthread_mutex_lock(&queue->mutex);
+            if (job_target_node == -1) {
+                next_node_id = (next_node_id + 1) % NODES; // round-robin advance only if not user-specified
             }
-        pthread_mutex_unlock(&queue->mutex);
-        sleep(1);
+
+            pthread_mutex_lock(&queue->mutex);
+            queue->head = (queue->head + 1) % MAX_JOBS;
         }
+        pthread_mutex_unlock(&queue->mutex);
+        sleep(1);
+    }
 
     return 0;
 }
diff --git a/daemon2 b/daemon2
new file mode 100755
index 0000000..66c3582
Binary files /dev/null and b/daemon2 differ
diff --git a/daemon2.c b/daemon2.c
new file mode 100644
index 0000000..bb1465e
--- /dev/null
+++ b/daemon2.c
@@ -0,0 +1,206 @@
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include "shared.h"
+
+#define NODES 2
+
+void *load_balance_thread(void *arg) {
+    NodeStatusArray *status_array = (NodeStatusArray *)arg;
+    while (1) {
+        pthread_mutex_lock(&status_array->mutex);
+        printf("[daemon] Acquired mutex for load balancing on %s\n", getenv("HOSTNAME") ? getenv("HOSTNAME") : "unknown");
+
+        float total = 0;
+        float max = -1, min = 1e9;
+        int max_id = -1, min_id = -1;
+
+        for (int i = 0; i < NODES; i++) {
+            float load = status_array->statuses[i].load;
+            total += load;
+            if (load > max) {
+                max = load;
+                max_id = i;
+            }
+            if (load < min) {
+                min = load;
+                min_id = i;
+            }
+        }
+
+        float avg = total / NODES;
+        char cmd[512] = {0};
+        int new_min_port = -1;
+        if ((max - min) > (avg / 2) && max_id != -1 && min_id != -1 && (max - min) > 0.3) {
+            printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
+            new_min_port = status_array->statuses[min_id].lport + 1;
+            status_array->statuses[min_id].lport = new_min_port;
+        }
+
+        pthread_mutex_unlock(&status_array->mutex);
+        printf("[daemon] Released mutex for load balancing on %s\n", getenv("HOSTNAME") ? getenv("HOSTNAME") : "unknown");
+
+        if (new_min_port != -1) {
+            int ret = snprintf(cmd, sizeof(cmd),
+                               "mpirun -np 1 -host node%d ~/dmtcp/bin/migrate_heavy_job2.sh %d %d %d",
+                               max_id + 1, min_id + 1, new_min_port, new_min_port);
+            if (ret >= sizeof(cmd)) {
+                fprintf(stderr, "[daemon] Command truncated in load_balance_thread\n");
+            } else {
+                printf("[daemon] Running migration script on node%d: %s\n", max_id + 1, cmd);
+                if (system(cmd) != 0) {
+                    fprintf(stderr, "[daemon] Migration script failed on node%d\n", max_id + 1);
+                }
+            }
+        }
+
+        sleep(60);
+    }
+    return NULL;
+}
+
+void run_job_on_node(const char *binary_path, int node_id, int port) {
+    char cmd[512];
+    int ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_coordinator --port %d --daemon", node_id, port);
+    if (ret >= sizeof(cmd)) {
+        fprintf(stderr, "[daemon] Command truncated in run_job_on_node (coordinator)\n");
+        return;
+    }
+    if (system(cmd) != 0) {
+        fprintf(stderr, "[daemon] Failed to start dmtcp_coordinator on node%d\n", node_id);
+    }
+
+    ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_launch --coord-port %d %s &", node_id, port, binary_path);
+    if (ret >= sizeof(cmd)) {
+        fprintf(stderr, "[daemon] Command truncated in run_job_on_node (launch)\n");
+        return;
+    }
+    printf("[daemon] Running on node%d: %s\n", node_id, cmd);
+    if (system(cmd) != 0) {
+        fprintf(stderr, "[daemon] Failed to launch job on node%d\n", node_id);
+    }
+}
+
+int main() {
+    // Open or create job queue shared memory
+    int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
+    if (shm_fd == -1) {
+        perror("[daemon] shm_open SHM_NAME");
+        return 1;
+    }
+    if (ftruncate(shm_fd, sizeof(JobQueue)) == -1) {
+        perror("[daemon] ftruncate SHM_NAME");
+        close(shm_fd);
+        return 1;
+    }
+
+    JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+    if (queue == MAP_FAILED) {
+        perror("[daemon] mmap SHM_NAME");
+        close(shm_fd);
+        return 1;
+    }
+
+    // Open node status shared memory (created by agent.c rank 0)
+    int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666);
+    if (stat_fd == -1) {
+        perror("[daemon] shm_open NODE_STATUS_SHM");
+        munmap(queue, sizeof(JobQueue));
+        close(shm_fd);
+        return 1;
+    }
+
+    NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
+    if (status_array == MAP_FAILED) {
+        perror("[daemon] mmap NODE_STATUS_SHM");
+        close(stat_fd);
+        munmap(queue, sizeof(JobQueue));
+        close(shm_fd);
+        return 1;
+    }
+
+    static int initialized = 0;
+    if (!initialized) {
+        pthread_mutexattr_t attr;
+        pthread_mutexattr_init(&attr);
+        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+        if (pthread_mutex_init(&queue->mutex, &attr) != 0) {
+            perror("[daemon] queue mutex initialization failed");
+            munmap(status_array, sizeof(NodeStatusArray));
+            close(stat_fd);
+            munmap(queue, sizeof(JobQueue));
+            close(shm_fd);
+            return 1;
+        }
+        pthread_mutexattr_destroy(&attr);
+        queue->head = queue->tail = 0;
+
+        // Initialize job_nodes array
+        for (int i = 0; i < MAX_JOBS; i++) {
+            queue->job_nodes[i] = -1;
+        }
+        initialized = 1;
+    }
+
+    // Start load balancing thread
+    pthread_t load_thread;
+    if (pthread_create(&load_thread, NULL, load_balance_thread, status_array) != 0) {
+        perror("[daemon] pthread_create");
+        munmap(status_array, sizeof(NodeStatusArray));
+        close(stat_fd);
+        munmap(queue, sizeof(JobQueue));
+        close(shm_fd);
+        return 1;
+    }
+
+    int next_node_id = 0;
+
+    while (1) {
+        pthread_mutex_lock(&queue->mutex);
+        while (queue->head != queue->tail) {
+            int idx = queue->head;
+            char job[MAX_PATH];
+            strncpy(job, queue->jobs[idx], MAX_PATH - 1);
+            job[MAX_PATH - 1] = '\0'; // Ensure null-termination
+            queue->job_nodes[idx] = next_node_id + 1; // Assign node to job
+
+            queue->head = (queue->head + 1) % MAX_JOBS;
+            pthread_mutex_unlock(&queue->mutex);
+
+            printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id + 1, job);
+            pthread_mutex_lock(&status_array->mutex);
+            status_array->statuses[next_node_id].lport++;
+            status_array->statuses[next_node_id].running_jobs++;
+            pthread_mutex_unlock(&status_array->mutex);
+
+            run_job_on_node(job, next_node_id + 1, status_array->statuses[next_node_id].lport);
+
+            next_node_id = (next_node_id + 1) % NODES;
+
+            pthread_mutex_lock(&queue->mutex);
+        }
+        pthread_mutex_unlock(&queue->mutex);
+        sleep(1);
+    }
+
+    // Cleanup (unreachable due to infinite loop)
+    pthread_join(load_thread, NULL);
+    if (munmap(status_array, sizeof(NodeStatusArray)) == -1) {
+        perror("[daemon] munmap NODE_STATUS_SHM");
+    }
+    if (close(stat_fd) == -1) {
+        perror("[daemon] close NODE_STATUS_SHM");
+    }
+    if (munmap(queue, sizeof(JobQueue)) == -1) {
+        perror("[daemon] munmap SHM_NAME");
+    }
+    if (close(shm_fd) == -1) {
+        perror("[daemon] close SHM_NAME");
+    }
+    return 0;
+}
diff --git a/shared.h b/shared.h
index 30c8ffe..05f85c6 100644
--- a/shared.h
+++ b/shared.h
@@ -5,7 +5,7 @@
 
 #define MAX_JOBS 100
 #define MAX_PATH 256
-#define MAX_NODES 4
+#define MAX_NODES 2
 
 #define SHM_NAME "/job_queue_shm"
 #define SHM_NODE_STATUS "/node_status_shm"
@@ -13,12 +13,15 @@
 typedef struct {
     pthread_mutex_t mutex;
     char jobs[MAX_JOBS][MAX_PATH];
-    int head;
+    int job_nodes[MAX_JOBS]; // NEW
+
+    int head;
     int tail;
 } JobQueue;
 
 typedef struct {
     int node_id;
+    int lport;
     int running_jobs;
     float load; // from /proc/[pid]/stat or getloadavg()
 } NodeStatus;
--
cgit v1.2.3
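
A quick way to sanity-check what this patch publishes in shared memory is a small standalone monitor that attaches to /node_status_shm and dumps each NodeStatus. The sketch below is illustrative only and not part of the patch: the file name nodestat.c is hypothetical, and it assumes shared.h declares NodeStatusArray as the process-shared mutex plus NodeStatus statuses[MAX_NODES] that agent.c and the daemons access above.

/* nodestat.c -- illustrative monitor (hypothetical; not part of the patch).
 * Assumes shared.h defines NodeStatusArray { pthread_mutex_t mutex;
 * NodeStatus statuses[MAX_NODES]; } as used by agent.c above. */
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/mman.h>
#include "shared.h"

int main(void) {
    /* Attach read-write: locking the process-shared mutex writes to the mapping. */
    int fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666);
    if (fd == -1) { perror("shm_open"); return 1; }

    NodeStatusArray *ns = mmap(NULL, sizeof(NodeStatusArray),
                               PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (ns == MAP_FAILED) { perror("mmap"); close(fd); return 1; }

    /* Take the same mutex the agent uses so we never read a half-written entry. */
    pthread_mutex_lock(&ns->mutex);
    for (int i = 0; i < MAX_NODES; i++) {
        printf("node %d: load %.2f, running jobs %d, coord port %d\n",
               ns->statuses[i].node_id, ns->statuses[i].load,
               ns->statuses[i].running_jobs, ns->statuses[i].lport);
    }
    pthread_mutex_unlock(&ns->mutex);

    munmap(ns, sizeof(NodeStatusArray));
    close(fd);
    return 0;
}

Built with something like gcc nodestat.c -o nodestat -lpthread -lrt and run on the rank-0 host while the agent is up, it should show the 7778-based coordinator ports and the load figures the agents report every 5 seconds.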