diff options
Diffstat (limited to 'agent.c')
-rw-r--r-- | agent.c | 137 |
1 files changed, 73 insertions, 64 deletions
@@ -6,6 +6,8 @@ #include <sys/stat.h> #include <string.h> #include <pthread.h> +#include <sys/sysinfo.h> +#include <mpi.h> #include "shared.h" float get_load() { @@ -13,83 +15,90 @@ float get_load() { if (getloadavg(loads, 1) != -1) { return (float)loads[0]; } - return -1.0; #!!!!!!! + return -1.0; } -int main() { - -MPI_Init(&argc, &argv); - int rank; +int main(int argc, char *argv[]) { + MPI_Init(&argc, &argv); + int rank, size; MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); NodeStatus status; memset(&status, 0, sizeof(NodeStatus)); status.node_id = rank; - - #================================================= + status.lport = 7778; + + NodeStatusArray *ns = NULL; // only rank0 + int shm_fd; + if (rank == 0) { - int shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666); - if (shm_fd == -1) { - perror("shm_open"); - exit(1); - } - ftruncate(shm_fd, sizeof(NodeStatusArray)); - - NodeStatusArray *ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); - if (ns == MAP_FAILED) { - perror("mmap"); - exit(1); - } - - static int initialized = 0; - if (!initialized) { - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(&ns->mutex, &attr); - pthread_mutexattr_destroy(&attr); - for (int i = 0; i < MAX_NODES; i++) { - ns->statuses[i].node_id = i; - ns->statuses[i].running_jobs = 0; - ns->statuses[i].load = 0.0f; - } - initialized = 1; - } - - - - } - while (1) { - if (rank == 1) { - status.loadavg = get_cpu_loadavg(); - status.num_processes = 0; // done - printf("[agent] Rank %d reporting load %.2f\n", rank, status.loadavg); - //autocomplete - FILE *fp = popen("pgrep -f mpirun | wc -l", "r"); - if (fp) { - fscanf(fp, "%d", status.num_processes); - pclose(fp); - } else { - status.num_processes = -1; - } - MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD); - } else if (rank == 0) { - NodeStatus recv; - MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - printf("[daemon] Received load %.2f from node %d\n", recv.loadavg, recv.node_id); - pthread_mutex_lock(&ns->mutex); - ns->statuses[recv.node_id]=recv; - pthread_mutex_unlock(&ns->mutex); - - + //shm + shm_fd = shm_open(SHM_NODE_STATUS, O_CREAT | O_RDWR, 0666); + + + ftruncate(shm_fd, sizeof(NodeStatusArray)); + + ns = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); + + + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&ns->mutex, &attr); + pthread_mutexattr_destroy(&attr); + + for (int i = 0; i < MAX_NODES; i++) { + ns->statuses[i].node_id = i; + ns->statuses[i].running_jobs = 0; + ns->statuses[i].load = 0.0f; + ns->statuses[i].lport = 7778; } - sleep(5); } - MPI_Finalize(); - return 0; + MPI_Barrier(MPI_COMM_WORLD); + while (1) { + status.load = get_load(); + FILE *fp = popen("pgrep -f mpirun | wc -l", "r"); + if (fp) { + if (fscanf(fp, "%d", &status.running_jobs) != 1) { + status.running_jobs = -1; + } + pclose(fp); + } else { + status.running_jobs = -1; + } + + printf("[agent] Rank %d reporting load %.2f, running jobs %d\n", rank, status.load, status.running_jobs); + + if (rank == 0) { + pthread_mutex_lock(&ns->mutex); + ns->statuses[rank].running_jobs = status.running_jobs; + ns->statuses[rank].load = status.load; + pthread_mutex_unlock(&ns->mutex); + + for (int i = 1; i < size; i++) { + NodeStatus recv; + int ret = MPI_Recv(&recv, sizeof(NodeStatus), MPI_BYTE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + printf("[agent] Received load %.2f, running jobs %d from node %d\n", recv.load, recv.running_jobs, recv.node_id); + + pthread_mutex_lock(&ns->mutex); + ns->statuses[recv.node_id].running_jobs = recv.running_jobs; + ns->statuses[recv.node_id].load = recv.load; + pthread_mutex_unlock(&ns->mutex); + } + } else { + int ret = MPI_Send(&status, sizeof(NodeStatus), MPI_BYTE, 0, 0, MPI_COMM_WORLD); + } + + sleep(5); + } + + MPI_Finalize(); + return 0; } |