summary refs log tree commit diff
path: root/agent.c
diff options
context:
space:
mode:
Diffstat (limited to 'agent.c')
-rw-r--r-- agent.c 137
1 files changed, 73 insertions, 64 deletions
diff --git a/agent.c b/agent.c
index 9a6a286..3b0bd69 100644
--- a/agent.c
+++ b/agent.c
@@ -6,6 +6,8 @@
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
+#include <sys/sysinfo.h>
+#include <mpi.h>
#include "shared.h"
float get_load() {
@@ -13,83 +15,90 @@ float get_load() {
if (getloadavg(loads, 1) != -1) {
return (float)loads[0];
}
- return -1.0; #!!!!!!!
+ return -1.0;
}
-int main() {
-
-MPI_Init(&argc, &argv);
- int rank;
+/* Agent: each rank samples load and job count every 5 s; ranks > 0 send the */
+/* sample to rank 0, which stores it in the shared-memory NodeStatusArray.   */
+int main(int argc, char *argv[]) {
+    MPI_Init(&argc, &argv);
+    int rank, size;
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+    MPI_Comm_size(MPI_COMM_WORLD, &size);
     NodeStatus status;
     memset(&status, 0, sizeof(NodeStatus));
     status.node_id = rank;
-
- #=================================================
+    status.lport = 7778;
+    if (size > MAX_NODES) { // the shared table has one slot per rank
+        fprintf(stderr, "[agent] %d ranks do not fit in the status table\n", size);
+        MPI_Abort(MPI_COMM_WORLD, 1);
+    }
+
+    NodeStatusArray *ns = NULL; // only rank0
     if (rank == 0) {
- int shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666);
- if (shm_fd == -1) {
- perror("shm_open");
- exit(1);
- }
- ftruncate(shm_fd, sizeof(NodeStatusArray));
-
- NodeStatusArray *ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
- if (ns == MAP_FAILED) {
- perror("mmap");
- exit(1);
- }
-
- static int initialized = 0;
- if (!initialized) {
- pthread_mutexattr_t attr;
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
- pthread_mutex_init(&ns->mutex, &attr);
- pthread_mutexattr_destroy(&attr);
- for (int i = 0; i < MAX_NODES; i++) {
- ns->statuses[i].node_id = i;
- ns->statuses[i].running_jobs = 0;
- ns->statuses[i].load = 0.0f;
- }
- initialized = 1;
- }
-
-
-
- }
- while (1) {
- if (rank == 1) {
- status.loadavg = get_cpu_loadavg();
- status.num_processes = 0; // done
- printf("[agent] Rank %d reporting load %.2f\n", rank, status.loadavg);
- //autocomplete
- FILE *fp = popen("pgrep -f mpirun | wc -l", "r");
- if (fp) {
- fscanf(fp, "%d", status.num_processes);
- pclose(fp);
- } else {
- status.num_processes = -1;
- }
- MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD);
- } else if (rank == 0) {
- NodeStatus recv;
- MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
- printf("[daemon] Received load %.2f from node %d\n", recv.loadavg, recv.node_id);
- pthread_mutex_lock(&ns->mutex);
- ns->statuses[recv.node_id]=recv;
- pthread_mutex_unlock(&ns->mutex);
-
-
+        int shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666);
+        if (shm_fd == -1 || ftruncate(shm_fd, sizeof(NodeStatusArray)) == -1) { // fatal: no table to publish
+            perror("shm_open/ftruncate");
+            MPI_Abort(MPI_COMM_WORLD, 1);
+        }
+        ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
+        if (ns == MAP_FAILED) {
+            perror("mmap");
+            MPI_Abort(MPI_COMM_WORLD, 1);
+        }
+
+        pthread_mutexattr_t attr;
+        pthread_mutexattr_init(&attr);
+        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); // lockable from other processes
+        pthread_mutex_init(&ns->mutex, &attr);
+        pthread_mutexattr_destroy(&attr);
+
+        for (int i = 0; i < MAX_NODES; i++) {
+            ns->statuses[i].node_id = i;
+            ns->statuses[i].running_jobs = 0;
+            ns->statuses[i].load = 0.0f;
+            ns->statuses[i].lport = 7778;
         }
- sleep(5);
     }
- MPI_Finalize();
- return 0;
+    MPI_Barrier(MPI_COMM_WORLD); // no rank reports before rank 0 has set up the table
+    while (1) {
+        status.load = get_load();
+        FILE *fp = popen("pgrep -f mpirun | wc -l", "r");
+        if (fp) {
+            if (fscanf(fp, "%d", &status.running_jobs) != 1) {
+                status.running_jobs = -1;
+            }
+            pclose(fp);
+        } else {
+            status.running_jobs = -1;
+        }
+        printf("[agent] Rank %d reporting load %.2f, running jobs %d\n", rank, status.load, status.running_jobs);
+        if (rank == 0) {
+            pthread_mutex_lock(&ns->mutex);
+            ns->statuses[rank].running_jobs = status.running_jobs;
+            ns->statuses[rank].load = status.load;
+            pthread_mutex_unlock(&ns->mutex);
+            for (int i = 1; i < size; i++) {
+                NodeStatus recv;
+                MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+                printf("[agent] Received load %.2f, running jobs %d from node %d\n", recv.load, recv.running_jobs, recv.node_id);
+                pthread_mutex_lock(&ns->mutex);
+                ns->statuses[i].running_jobs = recv.running_jobs; // slot chosen by sender rank, not by received data
+                ns->statuses[i].load = recv.load;
+                pthread_mutex_unlock(&ns->mutex);
+            }
+        } else {
+            MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD);
+        }
+        sleep(5);
+    }
+    // Not reached: the reporting loop above never exits.
+    MPI_Finalize();
+    return 0;
 }