summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYour Name <you@example.com>2025-07-15 02:55:46 +0000
committerYour Name <you@example.com>2025-07-15 02:55:46 +0000
commit082e3adb0f3fd13b2f9de827449f1ec1ed037bf3 (patch)
tree3241cfc4b8cfbea201909df643f1011f650d8a27
parent2dbcce88ab7a73304f770006bcae53461c6daddc (diff)
0.9 + nightly builds
-rwxr-xr-xaddjobbin0 -> 17272 bytes
-rw-r--r--addjob.c19
-rwxr-xr-xagentbin0 -> 18056 bytes
-rw-r--r--agent.c137
-rwxr-xr-xcode/agentbin0 -> 18056 bytes
-rwxr-xr-xcode/daemon2bin0 -> 17888 bytes
-rwxr-xr-xdaemonbin0 -> 17712 bytes
-rw-r--r--daemon.c111
-rwxr-xr-xdaemon2bin0 -> 17888 bytes
-rw-r--r--daemon2.c206
-rw-r--r--shared.h7
11 files changed, 355 insertions, 125 deletions
diff --git a/addjob b/addjob
new file mode 100755
index 0000000..a23d1f0
--- /dev/null
+++ b/addjob
Binary files differ
diff --git a/addjob.c b/addjob.c
index db6a9d3..6af7847 100644
--- a/addjob.c
+++ b/addjob.c
@@ -8,11 +8,20 @@
#include "shared.h"
int main(int argc, char *argv[]) {
- if (argc != 2) {
- fprintf(stderr, "Usage: %s <path-to-job-binary>\n", argv[0]);
+ if (argc != 2 && argc != 3) {
+ fprintf(stderr, "Usage: %s <path-to-job-binary> [node]\n", argv[0]);
return 1;
}
+ int target_node = -1;
+ if (argc == 3) {
+ target_node = atoi(argv[2]);
+ if (target_node < 0 || target_node >= MAX_NODES) {
+ fprintf(stderr, "[addjob] Invalid node ID: %d\n", target_node);
+ return 2;
+ }
+ }
+
int shm_fd = shm_open(SHM_NAME, O_RDWR, 0666);
if (shm_fd == -1) {
perror("shm_open");
@@ -30,8 +39,12 @@ int main(int argc, char *argv[]) {
printf("[addjob] Queue full\n");
} else {
strncpy(queue->jobs[queue->tail], argv[1], MAX_PATH);
+ queue->job_nodes[queue->tail] = target_node;
queue->tail = (queue->tail + 1) % MAX_JOBS;
- printf("[addjob] Queued job: %s\n", argv[1]);
+ printf("[addjob] Queued job: %s", argv[1]);
+ if (target_node != -1)
+ printf(" for node %d", target_node);
+ printf("\n");
}
pthread_mutex_unlock(&queue->mutex);
diff --git a/agent b/agent
new file mode 100755
index 0000000..f4cf8c6
--- /dev/null
+++ b/agent
Binary files differ
diff --git a/agent.c b/agent.c
index 9a6a286..3b0bd69 100644
--- a/agent.c
+++ b/agent.c
@@ -6,6 +6,8 @@
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
+#include <sys/sysinfo.h>
+#include <mpi.h>
#include "shared.h"
float get_load() {
@@ -13,83 +15,90 @@ float get_load() {
if (getloadavg(loads, 1) != -1) {
return (float)loads[0];
}
- return -1.0; #!!!!!!!
+ return -1.0;
}
-int main() {
-
-MPI_Init(&argc, &argv);
- int rank;
+int main(int argc, char *argv[]) {
+ MPI_Init(&argc, &argv);
+ int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+ MPI_Comm_size(MPI_COMM_WORLD, &size);
NodeStatus status;
memset(&status, 0, sizeof(NodeStatus));
status.node_id = rank;
-
- #=================================================
+ status.lport = 7778;
+
+ NodeStatusArray *ns = NULL; // only rank0
+ int shm_fd;
+
if (rank == 0) {
- int shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666);
- if (shm_fd == -1) {
- perror("shm_open");
- exit(1);
- }
- ftruncate(shm_fd, sizeof(NodeStatusArray));
-
- NodeStatusArray *ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
- if (ns == MAP_FAILED) {
- perror("mmap");
- exit(1);
- }
-
- static int initialized = 0;
- if (!initialized) {
- pthread_mutexattr_t attr;
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
- pthread_mutex_init(&ns->mutex, &attr);
- pthread_mutexattr_destroy(&attr);
- for (int i = 0; i < MAX_NODES; i++) {
- ns->statuses[i].node_id = i;
- ns->statuses[i].running_jobs = 0;
- ns->statuses[i].load = 0.0f;
- }
- initialized = 1;
- }
-
-
-
- }
- while (1) {
- if (rank == 1) {
- status.loadavg = get_cpu_loadavg();
- status.num_processes = 0; // done
- printf("[agent] Rank %d reporting load %.2f\n", rank, status.loadavg);
- //autocomplete
- FILE *fp = popen("pgrep -f mpirun | wc -l", "r");
- if (fp) {
- fscanf(fp, "%d", status.num_processes);
- pclose(fp);
- } else {
- status.num_processes = -1;
- }
- MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD);
- } else if (rank == 0) {
- NodeStatus recv;
- MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
- printf("[daemon] Received load %.2f from node %d\n", recv.loadavg, recv.node_id);
- pthread_mutex_lock(&ns->mutex);
- ns->statuses[recv.node_id]=recv;
- pthread_mutex_unlock(&ns->mutex);
-
-
+ //shm
+ shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666);
+
+
+ ftruncate(shm_fd, sizeof(NodeStatusArray));
+
+ ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+
+
+ pthread_mutexattr_t attr;
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+ pthread_mutex_init(&ns->mutex, &attr);
+ pthread_mutexattr_destroy(&attr);
+
+ for (int i = 0; i < MAX_NODES; i++) {
+ ns->statuses[i].node_id = i;
+ ns->statuses[i].running_jobs = 0;
+ ns->statuses[i].load = 0.0f;
+ ns->statuses[i].lport = 7778;
}
- sleep(5);
}
- MPI_Finalize();
- return 0;
+ MPI_Barrier(MPI_COMM_WORLD);
+ while (1) {
+ status.load = get_load();
+ FILE *fp = popen("pgrep -f mpirun | wc -l", "r");
+ if (fp) {
+ if (fscanf(fp, "%d", &status.running_jobs) != 1) {
+ status.running_jobs = -1;
+ }
+ pclose(fp);
+ } else {
+ status.running_jobs = -1;
+ }
+
+ printf("[agent] Rank %d reporting load %.2f, running jobs %d\n", rank, status.load, status.running_jobs);
+
+ if (rank == 0) {
+ pthread_mutex_lock(&ns->mutex);
+ ns->statuses[rank].running_jobs = status.running_jobs;
+ ns->statuses[rank].load = status.load;
+ pthread_mutex_unlock(&ns->mutex);
+
+ for (int i = 1; i < size; i++) {
+ NodeStatus recv;
+ int ret = MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+
+ printf("[agent] Received load %.2f, running jobs %d from node %d\n", recv.load, recv.running_jobs, recv.node_id);
+
+ pthread_mutex_lock(&ns->mutex);
+ ns->statuses[recv.node_id].running_jobs = recv.running_jobs;
+ ns->statuses[recv.node_id].load = recv.load;
+ pthread_mutex_unlock(&ns->mutex);
+ }
+ } else {
+ int ret = MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD);
+ }
+
+ sleep(5);
+ }
+
+ MPI_Finalize();
+ return 0;
}
diff --git a/code/agent b/code/agent
new file mode 100755
index 0000000..f4cf8c6
--- /dev/null
+++ b/code/agent
Binary files differ
diff --git a/code/daemon2 b/code/daemon2
new file mode 100755
index 0000000..66c3582
--- /dev/null
+++ b/code/daemon2
Binary files differ
diff --git a/daemon b/daemon
new file mode 100755
index 0000000..e351052
--- /dev/null
+++ b/daemon
Binary files differ
diff --git a/daemon.c b/daemon.c
index 5c50f94..bd0fa2c 100644
--- a/daemon.c
+++ b/daemon.c
@@ -8,9 +8,11 @@
#include <pthread.h>
#include "shared.h"
-#define NODES 4
+#define NODES 2
void *load_balance_thread(void *arg) {
+ NodeStatusArray *status_array = (NodeStatusArray *)arg;
+
while (1) {
pthread_mutex_lock(&status_array->mutex);
@@ -32,57 +34,51 @@ void *load_balance_thread(void *arg) {
}
float avg = total / NODES;
- if ((max - min) > (avg / 2)) {
+ if ((max - min) > (avg / 2) && avg>0.1) {
printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
- // Call external script/function to checkpoint, scp, and restart
- char cmd[512];
- snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d checkpointandbalance.sh %d &", min_id, max_id);
- printf("[daemon] Running on node%d: %s\n", node_id, cmd);
- system(cmd);
- char cmd[512];
- snprintf(cmd, sizeof(cmd), "ssh node%d \"./migrate_heavy_job.sh %d %d\"", max_id, max_id, min_id);
+ printf("[daemon] Threashold exceeded: (%6.2f - %6.2f) = %6.2f > ( %6.2f / 2) \n", max, min, (max - min), avg);
+ char cmd[512];
+
+ status_array->statuses[min_id].lport++;
+ status_array->statuses[max_id].lport--;
+ snprintf(cmd, sizeof(cmd),
+ "ssh node%d '~/dmtcp/bin/migrate_heavy_job.sh %d %d %d'",
+ max_id + 1, min_id + 1, status_array->statuses[min_id].lport, status_array->statuses[max_id].lport);
+ printf("[daemon] Running migration script on node%d: %s\n", max_id+1, cmd);
system(cmd);
- }
- pthread_mutex_unlock(&status_array->mutex);
- sleep(60);
+
}
- return NULL;
+ pthread_mutex_unlock(&status_array->mutex);
+
+ sleep(60);
}
+ return NULL;
+}
-void run_job_on_node(const char *binary_path, int node_id) {
+void run_job_on_node(const char *binary_path, int node_id, int port) {
char cmd[512];
- snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d %s &", node_id, binary_path);
- printf("[daemon] Running on node%d: %s\n", node_id, cmd);
+ snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_coordinator --port %d --daemon", node_id, port);
+ system(cmd);
+ snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_launch --coord-port %d python3 ~/dmtcp/bin/%s &", node_id, port, binary_path);
+ printf("[daemon] Disptached to node%d: %s\n", node_id, cmd);
system(cmd);
}
int main() {
int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
- if (shm_fd == -1) {
- perror("shm_open");
- return 1;
- }
+
ftruncate(shm_fd, sizeof(JobQueue));
JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
- if (queue == MAP_FAILED) {
- perror("mmap");
- return 1;
- }
- int stat_fd = shm_open(NODE_STATUS_SHM, O_RDWR, 0666);
- if (stat_fd == -1) {
- perror("shm_open NODE_STATUS_SHM");
- return 1;
- }
+
+ int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666);
+
NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
- if (status_array == MAP_FAILED) {
- perror("mmap NODE_STATUS_SHM");
- return 1;
- }
+
static int initialized = 0;
if (!initialized) {
@@ -93,39 +89,42 @@ int main() {
pthread_mutexattr_destroy(&attr);
queue->head = queue->tail = 0;
- // Initialize job_node array
- for (int i = 0; i < MAX_JOBS; i++) {
- queue->job_node[i] = -1;
- }
+
+
+ pthread_t load_thread; //LOADB_pthread
+ pthread_create(&load_thread, NULL, load_balance_thread, status_array);
initialized = 1;
}
int next_node_id = 0;
- while (1) {
- pthread_mutex_lock(&queue->mutex);
- while (queue->head != queue->tail) {
- int idx = queue->head;
- char job[MAX_PATH];
- strncpy(job, queue->jobs[idx], MAX_PATH);
-
- // Assign node to job
- queue->job_node[idx] = next_node_id;
+while (1) {
+ pthread_mutex_lock(&queue->mutex);
+ while (queue->head != queue->tail) {
+ int idx = queue->head;
+ char job[MAX_PATH];
+ strncpy(job, queue->jobs[idx], MAX_PATH);
- // Move head forward in the queue
- queue->head = (queue->head + 1) % MAX_JOBS;
+ int job_target_node = queue->job_nodes[idx];
+ int assigned_node = (job_target_node != -1) ? job_target_node : next_node_id;
- pthread_mutex_unlock(&queue->mutex);
-
- printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id, job);
- run_job_on_node(job, next_node_id);
+ pthread_mutex_unlock(&queue->mutex);
- next_node_id = (next_node_id + 1) % NODES;
+ printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, assigned_node, job);
+ status_array->statuses[assigned_node].lport++;
+ printf("[DEBUG] Dispatching job at index %d to node %d: %s\n", idx, assigned_node, job);
+ run_job_on_node(job, assigned_node + 1, status_array->statuses[assigned_node].lport);
- pthread_mutex_lock(&queue->mutex);
+ if (job_target_node == -1) {
+ next_node_id = (next_node_id + 1) % NODES; // round-robin advance only if not user-specified
}
- pthread_mutex_unlock(&queue->mutex);
- sleep(1);
+
+ pthread_mutex_lock(&queue->mutex);
+ queue->head = (queue->head + 1) % MAX_JOBS;
}
+ pthread_mutex_unlock(&queue->mutex);
+ sleep(1);
+}
+
return 0;
}
diff --git a/daemon2 b/daemon2
new file mode 100755
index 0000000..66c3582
--- /dev/null
+++ b/daemon2
Binary files differ
diff --git a/daemon2.c b/daemon2.c
new file mode 100644
index 0000000..bb1465e
--- /dev/null
+++ b/daemon2.c
@@ -0,0 +1,206 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <pthread.h>
+#include "shared.h"
+
+#define NODES 2
+
+void *load_balance_thread(void *arg) {
+ NodeStatusArray *status_array = (NodeStatusArray *)arg;
+ while (1) {
+ pthread_mutex_lock(&status_array->mutex);
+ printf("[daemon] Acquired mutex for load balancing on %s\n", getenv("HOSTNAME") ? getenv("HOSTNAME") : "unknown");
+
+ float total = 0;
+ float max = -1, min = 1e9;
+ int max_id = -1, min_id = -1;
+
+ for (int i = 0; i < NODES; i++) {
+ float load = status_array->statuses[i].load;
+ total += load;
+ if (load > max) {
+ max = load;
+ max_id = i;
+ }
+ if (load < min) {
+ min = load;
+ min_id = i;
+ }
+ }
+
+ float avg = total / NODES;
+ char cmd[512] = {0};
+ int new_min_port = -1;
+ if ((max - min) > (avg / 2) && max_id != -1 && min_id != -1 && (max - min)>0.3) {
+ printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
+ new_min_port = status_array->statuses[min_id].lport + 1;
+ status_array->statuses[min_id].lport = new_min_port;
+ }
+
+ pthread_mutex_unlock(&status_array->mutex);
+ printf("[daemon] Released mutex for load balancing on %s\n", getenv("HOSTNAME") ? getenv("HOSTNAME") : "unknown");
+
+ if (new_min_port != -1) {
+ int ret = snprintf(cmd, sizeof(cmd),
+ "mpirun -np 1 -host node%d ~/dmtcp/bin/migrate_heavy_job2.sh %d %d %d",
+ max_id+1, min_id+1, new_min_port, new_min_port);
+ if (ret >= sizeof(cmd)) {
+ fprintf(stderr, "[daemon] Command truncated in load_balance_thread\n");
+ } else {
+ printf("[daemon] Running migration script on node%d: %s\n", max_id, cmd);
+ if (system(cmd) != 0) {
+ fprintf(stderr, "[daemon] Migration script failed on node%d\n", max_id);
+ }
+ }
+ }
+
+ sleep(60);
+ }
+ return NULL;
+}
+
+void run_job_on_node(const char *binary_path, int node_id, int port) {
+ char cmd[512];
+ int ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_coordinator --port %d --daemon", node_id, port);
+ if (ret >= sizeof(cmd)) {
+ fprintf(stderr, "[daemon] Command truncated in run_job_on_node (coordinator)\n");
+ return;
+ }
+ if (system(cmd) != 0) {
+ fprintf(stderr, "[daemon] Failed to start dmtcp_coordinator on node%d\n", node_id);
+ }
+
+ ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_launch --coord-port %d %s &", node_id, port, binary_path);
+ if (ret >= sizeof(cmd)) {
+ fprintf(stderr, "[daemon] Command truncated in run_job_on_node (launch)\n");
+ return;
+ }
+ printf("[daemon] Running on node%d: %s\n", node_id, cmd);
+ if (system(cmd) != 0) {
+ fprintf(stderr, "[daemon] Failed to launch job on node%d\n", node_id);
+ }
+}
+
+int main() {
+ // Open or create job queue shared memory
+ int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
+ if (shm_fd == -1) {
+ perror("[daemon] shm_open SHM_NAME");
+ return 1;
+ }
+ if (ftruncate(shm_fd, sizeof(JobQueue)) == -1) {
+ perror("[daemon] ftruncate SHM_NAME");
+ close(shm_fd);
+ return 1;
+ }
+
+ JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+ if (queue == MAP_FAILED) {
+ perror("[daemon] mmap SHM_NAME");
+ close(shm_fd);
+ return 1;
+ }
+
+ // Open node status shared memory (created by agent.c rank 0)
+ int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666);
+ if (stat_fd == -1) {
+ perror("[daemon] shm_open NODE_STATUS_SHM");
+ munmap(queue, sizeof(JobQueue));
+ close(shm_fd);
+ return 1;
+ }
+
+ NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
+ if (status_array == MAP_FAILED) {
+ perror("[daemon] mmap NODE_STATUS_SHM");
+ close(stat_fd);
+ munmap(queue, sizeof(JobQueue));
+ close(shm_fd);
+ return 1;
+ }
+
+ static int initialized = 0;
+ if (!initialized) {
+ pthread_mutexattr_t attr;
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+ if (pthread_mutex_init(&queue->mutex, &attr) != 0) {
+ perror("[daemon] queue mutex initialization failed");
+ munmap(status_array, sizeof(NodeStatusArray));
+ close(stat_fd);
+ munmap(queue, sizeof(JobQueue));
+ close(shm_fd);
+ return 1;
+ }
+ pthread_mutexattr_destroy(&attr);
+ queue->head = queue->tail = 0;
+
+ // Initialize job_node array
+ for (int i = 0; i < MAX_JOBS; i++) {
+ queue->job_nodes[i] = -1;
+ }
+ initialized = 1;
+ }
+
+ // Start load balancing thread
+ pthread_t load_thread;
+ if (pthread_create(&load_thread, NULL, load_balance_thread, status_array) != 0) {
+ perror("[daemon] pthread_create");
+ munmap(status_array, sizeof(NodeStatusArray));
+ close(stat_fd);
+ munmap(queue, sizeof(JobQueue));
+ close(shm_fd);
+ return 1;
+ }
+
+ int next_node_id = 0;
+
+ while (1) {
+ pthread_mutex_lock(&queue->mutex);
+ while (queue->head != queue->tail) {
+ int idx = queue->head;
+ char job[MAX_PATH];
+ strncpy(job, queue->jobs[idx], MAX_PATH - 1);
+ job[MAX_PATH - 1] = '\0'; // Ensure null-termination
+ queue->job_nodes[idx] = next_node_id + 1; // Assign node to job
+
+ queue->head = (queue->head + 1) % MAX_JOBS;
+ pthread_mutex_unlock(&queue->mutex);
+
+ printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id + 1, job);
+ pthread_mutex_lock(&status_array->mutex);
+ status_array->statuses[next_node_id].lport++;
+ status_array->statuses[next_node_id].running_jobs++;
+ pthread_mutex_unlock(&status_array->mutex);
+
+ run_job_on_node(job, next_node_id + 1, status_array->statuses[next_node_id].lport);
+
+ next_node_id = (next_node_id + 1) % NODES;
+
+ pthread_mutex_lock(&queue->mutex);
+ }
+ pthread_mutex_unlock(&queue->mutex);
+ sleep(1);
+ }
+
+ // Cleanup (unreachable due to infinite loop)
+ pthread_join(load_thread, NULL);
+ if (munmap(status_array, sizeof(NodeStatusArray)) == -1) {
+ perror("[daemon] munmap NODE_STATUS_SHM");
+ }
+ if (close(stat_fd) == -1) {
+ perror("[daemon] close NODE_STATUS_SHM");
+ }
+ if (munmap(queue, sizeof(JobQueue)) == -1) {
+ perror("[daemon] munmap SHM_NAME");
+ }
+ if (close(shm_fd) == -1) {
+ perror("[daemon] close SHM_NAME");
+ }
+ return 0;
+}
diff --git a/shared.h b/shared.h
index 30c8ffe..05f85c6 100644
--- a/shared.h
+++ b/shared.h
@@ -5,7 +5,7 @@
#define MAX_JOBS 100
#define MAX_PATH 256
-#define MAX_NODES 4
+#define MAX_NODES 2
#define SHM_NAME "/job_queue_shm"
#define SHM_NODE_STATUS "/node_status_shm"
@@ -13,12 +13,15 @@
typedef struct {
pthread_mutex_t mutex;
char jobs[MAX_JOBS][MAX_PATH];
- int head;
+ int job_nodes[MAX_JOBS]; // NEW
+
+ int head;
int tail;
} JobQueue;
typedef struct {
int node_id;
+ int lport;
int running_jobs;
float load; // from /proc/[pid]/stat or getloadavg()
} NodeStatus;