summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYour Name <you@example.com>2025-06-17 15:02:05 +0200
committerYour Name <you@example.com>2025-06-17 15:02:05 +0200
commit1445f8e759a0409f561e2c811960fd65b1b64611 (patch)
tree5074bbd1786fe167298bd372817966db735ee94b
initial
-rw-r--r--addjob.c41
-rw-r--r--agent.c95
-rw-r--r--daemon.c127
-rw-r--r--shared.h31
4 files changed, 294 insertions, 0 deletions
diff --git a/addjob.c b/addjob.c
new file mode 100644
index 0000000..db6a9d3
--- /dev/null
+++ b/addjob.c
@@ -0,0 +1,41 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include "shared.h"
+
+int main(int argc, char *argv[]) {
+ if (argc != 2) {
+ fprintf(stderr, "Usage: %s <path-to-job-binary>\n", argv[0]);
+ return 1;
+ }
+
+ int shm_fd = shm_open(SHM_NAME, O_RDWR, 0666);
+ if (shm_fd == -1) {
+ perror("shm_open");
+ return 1;
+ }
+
+ JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+ if (queue == MAP_FAILED) {
+ perror("mmap");
+ return 1;
+ }
+
+ pthread_mutex_lock(&queue->mutex);
+ if ((queue->tail + 1) % MAX_JOBS == queue->head) {
+ printf("[addjob] Queue full\n");
+ } else {
+ strncpy(queue->jobs[queue->tail], argv[1], MAX_PATH);
+ queue->tail = (queue->tail + 1) % MAX_JOBS;
+ printf("[addjob] Queued job: %s\n", argv[1]);
+ }
+ pthread_mutex_unlock(&queue->mutex);
+
+ munmap(queue, sizeof(JobQueue));
+ close(shm_fd);
+ return 0;
+}
diff --git a/agent.c b/agent.c
new file mode 100644
index 0000000..9a6a286
--- /dev/null
+++ b/agent.c
@@ -0,0 +1,95 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <string.h>
+#include <pthread.h>
+#include "shared.h"
+
+float get_load() {
+ double loads[1];
+ if (getloadavg(loads, 1) != -1) {
+ return (float)loads[0];
+ }
+ return -1.0; #!!!!!!!
+}
+
+int main() {
+
+MPI_Init(&argc, &argv);
+ int rank;
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+ NodeStatus status;
+ memset(&status, 0, sizeof(NodeStatus));
+ status.node_id = rank;
+
+ #=================================================
+ if (rank == 0) {
+ int shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666);
+ if (shm_fd == -1) {
+ perror("shm_open");
+ exit(1);
+ }
+ ftruncate(shm_fd, sizeof(NodeStatusArray));
+
+ NodeStatusArray *ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+ if (ns == MAP_FAILED) {
+ perror("mmap");
+ exit(1);
+ }
+
+ static int initialized = 0;
+ if (!initialized) {
+ pthread_mutexattr_t attr;
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+ pthread_mutex_init(&ns->mutex, &attr);
+ pthread_mutexattr_destroy(&attr);
+ for (int i = 0; i < MAX_NODES; i++) {
+ ns->statuses[i].node_id = i;
+ ns->statuses[i].running_jobs = 0;
+ ns->statuses[i].load = 0.0f;
+ }
+ initialized = 1;
+ }
+
+
+
+ }
+ while (1) {
+ if (rank == 1) {
+ status.loadavg = get_cpu_loadavg();
+ status.num_processes = 0; // done
+ printf("[agent] Rank %d reporting load %.2f\n", rank, status.loadavg);
+ //autocomplete
+ FILE *fp = popen("pgrep -f mpirun | wc -l", "r");
+ if (fp) {
+ fscanf(fp, "%d", status.num_processes);
+ pclose(fp);
+ } else {
+ status.num_processes = -1;
+ }
+ MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD);
+ } else if (rank == 0) {
+ NodeStatus recv;
+ MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+ printf("[daemon] Received load %.2f from node %d\n", recv.loadavg, recv.node_id);
+ pthread_mutex_lock(&ns->mutex);
+ ns->statuses[recv.node_id]=recv;
+ pthread_mutex_unlock(&ns->mutex);
+
+
+ }
+ sleep(5);
+ }
+
+ MPI_Finalize();
+ return 0;
+
+
+
+
+}
diff --git a/daemon.c b/daemon.c
new file mode 100644
index 0000000..ea1ffaa
--- /dev/null
+++ b/daemon.c
@@ -0,0 +1,127 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <pthread.h>
+#include "shared.h"
+
+#define NODES 4
+
+void *load_balance_thread(void *arg) {
+ while (1) {
+ pthread_mutex_lock(&status_array->mutex);
+
+ float total = 0;
+ float max = -1, min = 1e9;
+ int max_id = -1, min_id = -1;
+
+ for (int i = 0; i < NODES; i++) {
+ float load = status_array->statuses[i].load;
+ total += load;
+ if (load > max) {
+ max = load;
+ max_id = i;
+ }
+ if (load < min) {
+ min = load;
+ min_id = i;
+ }
+ }
+
+ float avg = total / NODES;
+ if ((max - min) > (avg / 2)) {
+ printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
+ // Call external script/function to checkpoint, scp, and restart
+ char cmd[512];
+ snprintf(cmd, sizeof(cmd), "ssh node%d \"./migrate_heavy_job.sh %d %d\"", max_id, max_id, min_id);
+ system(cmd);
+ }
+
+ pthread_mutex_unlock(&status_array->mutex);
+ sleep(60);
+ }
+ return NULL;
+}
+
+
+void run_job_on_node(const char *binary_path, int node_id) {
+ char cmd[512];
+ snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d %s &", node_id, binary_path);
+ printf("[daemon] Running on node%d: %s\n", node_id, cmd);
+ system(cmd);
+}
+
+int main() {
+ int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
+ if (shm_fd == -1) {
+ perror("shm_open");
+ return 1;
+ }
+ ftruncate(shm_fd, sizeof(JobQueue));
+
+ JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+ if (queue == MAP_FAILED) {
+ perror("mmap");
+ return 1;
+ }
+
+ int stat_fd = shm_open(NODE_STATUS_SHM, O_RDWR, 0666);
+ if (stat_fd == -1) {
+ perror("shm_open NODE_STATUS_SHM");
+ return 1;
+ }
+
+ NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
+ if (status_array == MAP_FAILED) {
+ perror("mmap NODE_STATUS_SHM");
+ return 1;
+ }
+
+ static int initialized = 0;
+ if (!initialized) {
+ pthread_mutexattr_t attr;
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+ pthread_mutex_init(&queue->mutex, &attr);
+ pthread_mutexattr_destroy(&attr);
+ queue->head = queue->tail = 0;
+
+ // Initialize job_node array
+ for (int i = 0; i < MAX_JOBS; i++) {
+ queue->job_node[i] = -1;
+ }
+ initialized = 1;
+ }
+
+ int next_node_id = 0;
+
+ while (1) {
+ pthread_mutex_lock(&queue->mutex);
+ while (queue->head != queue->tail) {
+ int idx = queue->head;
+ char job[MAX_PATH];
+ strncpy(job, queue->jobs[idx], MAX_PATH);
+
+ // Assign node to job
+ queue->job_node[idx] = next_node_id;
+
+ // Move head forward in the queue
+ queue->head = (queue->head + 1) % MAX_JOBS;
+
+ pthread_mutex_unlock(&queue->mutex);
+
+ printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id, job);
+ run_job_on_node(job, next_node_id);
+
+ next_node_id = (next_node_id + 1) % NODES;
+
+ pthread_mutex_lock(&queue->mutex);
+ }
+ pthread_mutex_unlock(&queue->mutex);
+ sleep(1);
+ }
+ return 0;
+}
diff --git a/shared.h b/shared.h
new file mode 100644
index 0000000..30c8ffe
--- /dev/null
+++ b/shared.h
@@ -0,0 +1,31 @@
+#ifndef SHARED_H
+#define SHARED_H
+
+#include <pthread.h>
+
+#define MAX_JOBS 100
+#define MAX_PATH 256
+#define MAX_NODES 4
+
+#define SHM_NAME "/job_queue_shm"
+#define SHM_NODE_STATUS "/node_status_shm"
+
+typedef struct {
+ pthread_mutex_t mutex;
+ char jobs[MAX_JOBS][MAX_PATH];
+ int head;
+ int tail;
+} JobQueue;
+
+typedef struct {
+ int node_id;
+ int running_jobs;
+ float load; // from /proc/[pid]/stat or getloadavg()
+} NodeStatus;
+
+typedef struct {
+ pthread_mutex_t mutex;
+ NodeStatus statuses[MAX_NODES];
+} NodeStatusArray;
+
+#endif