From 1445f8e759a0409f561e2c811960fd65b1b64611 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 17 Jun 2025 15:02:05 +0200 Subject: initial --- agent.c | 95 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 agent.c (limited to 'agent.c') diff --git a/agent.c b/agent.c new file mode 100644 index 0000000..9a6a286 --- /dev/null +++ b/agent.c @@ -0,0 +1,95 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include "shared.h" + +float get_load() { + double loads[1]; + if (getloadavg(loads, 1) != -1) { + return (float)loads[0]; + } + return -1.0; #!!!!!!! +} + +int main() { + +MPI_Init(&argc, &argv); + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + + NodeStatus status; + memset(&status, 0, sizeof(NodeStatus)); + status.node_id = rank; + + #================================================= + if (rank == 0) { + int shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666); + if (shm_fd == -1) { + perror("shm_open"); + exit(1); + } + ftruncate(shm_fd, sizeof(NodeStatusArray)); + + NodeStatusArray *ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); + if (ns == MAP_FAILED) { + perror("mmap"); + exit(1); + } + + static int initialized = 0; + if (!initialized) { + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&ns->mutex, &attr); + pthread_mutexattr_destroy(&attr); + for (int i = 0; i < MAX_NODES; i++) { + ns->statuses[i].node_id = i; + ns->statuses[i].running_jobs = 0; + ns->statuses[i].load = 0.0f; + } + initialized = 1; + } + + + + } + while (1) { + if (rank == 1) { + status.loadavg = get_cpu_loadavg(); + status.num_processes = 0; // done + printf("[agent] Rank %d reporting load %.2f\n", rank, status.loadavg); + //autocomplete + FILE *fp = popen("pgrep -f mpirun | wc -l", "r"); + if (fp) { + fscanf(fp, "%d", status.num_processes); + pclose(fp); + } else { + status.num_processes = -1; + } + MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD); + } else if (rank == 0) { + NodeStatus recv; + MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + printf("[daemon] Received load %.2f from node %d\n", recv.loadavg, recv.node_id); + pthread_mutex_lock(&ns->mutex); + ns->statuses[recv.node_id]=recv; + pthread_mutex_unlock(&ns->mutex); + + + } + sleep(5); + } + + MPI_Finalize(); + return 0; + + + + +} -- cgit v1.2.3