-rw-r--r--  addjob.c  |  42
-rw-r--r--  agent.c   |  87
-rw-r--r--  daemon.c  | 133
-rw-r--r--  shared.h  |  32
4 files changed, 294 insertions, 0 deletions
diff --git a/addjob.c b/addjob.c
new file mode 100644
index 0000000..db6a9d3
--- /dev/null
+++ b/addjob.c
@@ -0,0 +1,42 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include "shared.h"
+
+int main(int argc, char *argv[]) {
+    if (argc != 2) {
+        fprintf(stderr, "Usage: %s <path-to-job-binary>\n", argv[0]);
+        return 1;
+    }
+
+    int shm_fd = shm_open(SHM_NAME, O_RDWR, 0666);
+    if (shm_fd == -1) {
+        perror("shm_open");
+        return 1;
+    }
+
+    JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+    if (queue == MAP_FAILED) {
+        perror("mmap");
+        return 1;
+    }
+
+    pthread_mutex_lock(&queue->mutex);
+    if ((queue->tail + 1) % MAX_JOBS == queue->head) {
+        printf("[addjob] Queue full\n");
+    } else {
+        strncpy(queue->jobs[queue->tail], argv[1], MAX_PATH - 1);
+        queue->jobs[queue->tail][MAX_PATH - 1] = '\0';
+        queue->tail = (queue->tail + 1) % MAX_JOBS;
+        printf("[addjob] Queued job: %s\n", argv[1]);
+    }
+    pthread_mutex_unlock(&queue->mutex);
+
+    munmap(queue, sizeof(JobQueue));
+    close(shm_fd);
+    return 0;
+}
diff --git a/agent.c b/agent.c
new file mode 100644
--- /dev/null
+++ b/agent.c
@@ -0,0 +1,87 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <string.h>
+#include <pthread.h>
+#include <mpi.h>
+#include "shared.h"
+
+// Report the 1-minute load average, or -1.0 on failure.
+float get_load() {
+    double loads[1];
+    if (getloadavg(loads, 1) != -1) {
+        return (float)loads[0];
+    }
+    return -1.0f;
+}
+
+int main(int argc, char *argv[]) {
+    MPI_Init(&argc, &argv);
+
+    int rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+    NodeStatus status;
+    memset(&status, 0, sizeof(NodeStatus));
+    status.node_id = rank;
+
+    // Rank 0 owns the node-status shared memory segment and the
+    // process-shared mutex that protects it.
+    NodeStatusArray *ns = NULL;
+    if (rank == 0) {
+        int shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666);
+        if (shm_fd == -1) {
+            perror("shm_open");
+            exit(1);
+        }
+        ftruncate(shm_fd, sizeof(NodeStatusArray));
+
+        ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+        if (ns == MAP_FAILED) {
+            perror("mmap");
+            exit(1);
+        }
+
+        pthread_mutexattr_t attr;
+        pthread_mutexattr_init(&attr);
+        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+        pthread_mutex_init(&ns->mutex, &attr);
+        pthread_mutexattr_destroy(&attr);
+        for (int i = 0; i < MAX_NODES; i++) {
+            ns->statuses[i].node_id = i;
+            ns->statuses[i].running_jobs = 0;
+            ns->statuses[i].load = 0.0f;
+        }
+    }
+
+    while (1) {
+        if (rank == 1) {
+            status.load = get_load();
+            printf("[agent] Rank %d reporting load %.2f\n", rank, status.load);
+
+            // Count mpirun-launched processes as a rough running-jobs figure.
+            FILE *fp = popen("pgrep -f mpirun | wc -l", "r");
+            if (fp) {
+                fscanf(fp, "%d", &status.running_jobs);
+                pclose(fp);
+            } else {
+                status.running_jobs = -1;
+            }
+            MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD);
+        } else if (rank == 0) {
+            NodeStatus recv;
+            MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+            printf("[daemon] Received load %.2f from node %d\n", recv.load, recv.node_id);
+            pthread_mutex_lock(&ns->mutex);
+            ns->statuses[recv.node_id] = recv;
+            pthread_mutex_unlock(&ns->mutex);
+        }
+        sleep(5);
+    }
+
+    MPI_Finalize();
+    return 0;
+}
diff --git a/daemon.c b/daemon.c
new file mode 100644
index 0000000..ea1ffaa
--- /dev/null
+++ b/daemon.c
@@ -0,0 +1,133 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <pthread.h>
+#include "shared.h"
+
+#define NODES 4
+
+// Mapped once in main(); shared with load_balance_thread().
+static NodeStatusArray *status_array = NULL;
+
+void *load_balance_thread(void *arg) {
+    (void)arg;
+    while (1) {
+        pthread_mutex_lock(&status_array->mutex);
+
+        float total = 0;
+        float max = -1, min = 1e9;
+        int max_id = -1, min_id = -1;
+
+        for (int i = 0; i < NODES; i++) {
+            float load = status_array->statuses[i].load;
+            total += load;
+            if (load > max) {
+                max = load;
+                max_id = i;
+            }
+            if (load < min) {
+                min = load;
+                min_id = i;
+            }
+        }
+
+        float avg = total / NODES;
+        if ((max - min) > (avg / 2)) {
+            printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
+            // Call external script/function to checkpoint, scp, and restart
+            char cmd[512];
+            snprintf(cmd, sizeof(cmd), "ssh node%d \"./migrate_heavy_job.sh %d %d\"", max_id, max_id, min_id);
+            system(cmd);
+        }
+
+        pthread_mutex_unlock(&status_array->mutex);
+        sleep(60);
+    }
+    return NULL;
+}
+
+void run_job_on_node(const char *binary_path, int node_id) {
+    char cmd[512];
+    snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d %s &", node_id, binary_path);
+    printf("[daemon] Running on node%d: %s\n", node_id, cmd);
+    system(cmd);
+}
+
+int main() {
+    int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
+    if (shm_fd == -1) {
+        perror("shm_open");
+        return 1;
+    }
+    ftruncate(shm_fd, sizeof(JobQueue));
+
+    JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+    if (queue == MAP_FAILED) {
+        perror("mmap");
+        return 1;
+    }
+
+    // The node-status segment is created and initialized by the agent (rank 0).
+    int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666);
+    if (stat_fd == -1) {
+        perror("shm_open SHM_NODE_STATUS");
+        return 1;
+    }
+
+    status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
+    if (status_array == MAP_FAILED) {
+        perror("mmap SHM_NODE_STATUS");
+        return 1;
+    }
+
+    // Initialize the job queue and its process-shared mutex.
+    pthread_mutexattr_t attr;
+    pthread_mutexattr_init(&attr);
+    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+    pthread_mutex_init(&queue->mutex, &attr);
+    pthread_mutexattr_destroy(&attr);
+    queue->head = queue->tail = 0;
+
+    // Initialize job_node array
+    for (int i = 0; i < MAX_JOBS; i++) {
+        queue->job_node[i] = -1;
+    }
+
+    // Rebalance load across nodes in the background.
+    pthread_t balancer;
+    pthread_create(&balancer, NULL, load_balance_thread, NULL);
+
+    int next_node_id = 0;
+
+    while (1) {
+        pthread_mutex_lock(&queue->mutex);
+        while (queue->head != queue->tail) {
+            int idx = queue->head;
+            char job[MAX_PATH];
+            strncpy(job, queue->jobs[idx], MAX_PATH - 1);
+            job[MAX_PATH - 1] = '\0';
+
+            // Assign node to job
+            queue->job_node[idx] = next_node_id;
+
+            // Move head forward in the queue
+            queue->head = (queue->head + 1) % MAX_JOBS;
+
+            pthread_mutex_unlock(&queue->mutex);
+
+            printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id, job);
+            run_job_on_node(job, next_node_id);
+
+            next_node_id = (next_node_id + 1) % NODES;
+
+            pthread_mutex_lock(&queue->mutex);
+        }
+        pthread_mutex_unlock(&queue->mutex);
+        sleep(1);
+    }
+    return 0;
+}
diff --git a/shared.h b/shared.h
new file mode 100644
index 0000000..30c8ffe
--- /dev/null
+++ b/shared.h
@@ -0,0 +1,32 @@
+#ifndef SHARED_H
+#define SHARED_H
+
+#include <pthread.h>
+
+#define MAX_JOBS 100
+#define MAX_PATH 256
+#define MAX_NODES 4
+
+#define SHM_NAME "/job_queue_shm"
+#define SHM_NODE_STATUS "/node_status_shm"
+
+typedef struct {
+    pthread_mutex_t mutex;
+    char jobs[MAX_JOBS][MAX_PATH];
+    int job_node[MAX_JOBS];   // node assigned to each queued job (-1 = unassigned)
+    int head;
+    int tail;
+} JobQueue;
+
+typedef struct {
+    int node_id;
+    int running_jobs;
+    float load;               // from /proc/[pid]/stat or getloadavg()
+} NodeStatus;
+
+typedef struct {
+    pthread_mutex_t mutex;
+    NodeStatus statuses[MAX_NODES];
+} NodeStatusArray;
+
+#endif
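
Startup-order note: addjob.c opens the job-queue segment without O_CREAT, and daemon.c opens the node-status segment without O_CREAT, so the daemon must be running before jobs are queued and the agent's rank 0 must have created /node_status_shm before the daemon attaches. Below is a minimal sketch of a standalone bootstrap program, not part of this change (the file name init_shm.c and its layout are illustrative assumptions), that creates and initializes both segments up front so the three programs can attach in any order. It assumes the same shared.h and would be linked with -lpthread (and -lrt on older glibc).

/* init_shm.c -- hypothetical bootstrap, not part of this diff.
 * Creates both POSIX shm segments and their process-shared mutexes. */
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include "shared.h"

// Create (or reuse) a named segment, size it, and map it into this process.
static void *create_segment(const char *name, size_t size) {
    int fd = shm_open(name, O_CREAT | O_RDWR, 0666);
    if (fd == -1) { perror("shm_open"); return NULL; }
    if (ftruncate(fd, size) == -1) { perror("ftruncate"); close(fd); return NULL; }
    void *p = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);
    return (p == MAP_FAILED) ? NULL : p;
}

// Initialize a mutex that is usable across processes, not just threads.
static void init_shared_mutex(pthread_mutex_t *m) {
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(m, &attr);
    pthread_mutexattr_destroy(&attr);
}

int main(void) {
    JobQueue *q = create_segment(SHM_NAME, sizeof(JobQueue));
    NodeStatusArray *ns = create_segment(SHM_NODE_STATUS, sizeof(NodeStatusArray));
    if (!q || !ns) return 1;

    init_shared_mutex(&q->mutex);
    q->head = q->tail = 0;
    for (int i = 0; i < MAX_JOBS; i++) q->job_node[i] = -1;

    init_shared_mutex(&ns->mutex);
    for (int i = 0; i < MAX_NODES; i++) {
        ns->statuses[i].node_id = i;
        ns->statuses[i].running_jobs = 0;
        ns->statuses[i].load = 0.0f;
    }
    printf("[init_shm] shared memory segments initialized\n");
    return 0;
}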