#include #include #include #include #include #include #include #include #include #include #include "shared.h" float get_load() { double loads[1]; if (getloadavg(loads, 1) != -1) { return (float)loads[0]; } return -1.0; } int main(int argc, char *argv[]) { MPI_Init(&argc, &argv); int rank, size; MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); NodeStatus status; memset(&status, 0, sizeof(NodeStatus)); status.node_id = rank; status.lport = 7778; NodeStatusArray *ns = NULL; // only rank0 int shm_fd; if (rank == 0) { //shm shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666); ftruncate(shm_fd, sizeof(NodeStatusArray)); ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(&ns->mutex, &attr); pthread_mutexattr_destroy(&attr); for (int i = 0; i < MAX_NODES; i++) { ns->statuses[i].node_id = i; ns->statuses[i].running_jobs = 0; ns->statuses[i].load = 0.0f; ns->statuses[i].lport = 7778; } } MPI_Barrier(MPI_COMM_WORLD); while (1) { status.load = get_load(); FILE *fp = popen("pgrep -f mpirun | wc -l", "r"); if (fp) { if (fscanf(fp, "%d", &status.running_jobs) != 1) { status.running_jobs = -1; } pclose(fp); } else { status.running_jobs = -1; } printf("[agent] Rank %d reporting load %.2f, running jobs %d\n", rank, status.load, status.running_jobs); if (rank == 0) { pthread_mutex_lock(&ns->mutex); ns->statuses[rank].running_jobs = status.running_jobs; ns->statuses[rank].load = status.load; pthread_mutex_unlock(&ns->mutex); for (int i = 1; i < size; i++) { NodeStatus recv; int ret = MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); printf("[agent] Received load %.2f, running jobs %d from node %d\n", recv.load, recv.running_jobs, recv.node_id); pthread_mutex_lock(&ns->mutex); ns->statuses[recv.node_id].running_jobs = recv.running_jobs; ns->statuses[recv.node_id].load = recv.load; pthread_mutex_unlock(&ns->mutex); } } else { int ret = MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD); } sleep(5); } MPI_Finalize(); return 0; }