#include #include #include #include #include #include #include #include #include "shared.h" float get_load() { double loads[1]; if (getloadavg(loads, 1) != -1) { return (float)loads[0]; } return -1.0; #!!!!!!! } int main() { MPI_Init(&argc, &argv); int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank); NodeStatus status; memset(&status, 0, sizeof(NodeStatus)); status.node_id = rank; #================================================= if (rank == 0) { int shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666); if (shm_fd == -1) { perror("shm_open"); exit(1); } ftruncate(shm_fd, sizeof(NodeStatusArray)); NodeStatusArray *ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); if (ns == MAP_FAILED) { perror("mmap"); exit(1); } static int initialized = 0; if (!initialized) { pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(&ns->mutex, &attr); pthread_mutexattr_destroy(&attr); for (int i = 0; i < MAX_NODES; i++) { ns->statuses[i].node_id = i; ns->statuses[i].running_jobs = 0; ns->statuses[i].load = 0.0f; } initialized = 1; } } while (1) { if (rank == 1) { status.loadavg = get_cpu_loadavg(); status.num_processes = 0; // done printf("[agent] Rank %d reporting load %.2f\n", rank, status.loadavg); //autocomplete FILE *fp = popen("pgrep -f mpirun | wc -l", "r"); if (fp) { fscanf(fp, "%d", status.num_processes); pclose(fp); } else { status.num_processes = -1; } MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD); } else if (rank == 0) { NodeStatus recv; MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); printf("[daemon] Received load %.2f from node %d\n", recv.loadavg, recv.node_id); pthread_mutex_lock(&ns->mutex); ns->statuses[recv.node_id]=recv; pthread_mutex_unlock(&ns->mutex); } sleep(5); } MPI_Finalize(); return 0; }