#include #include #include #include #include #include #include #include #include "shared.h" #define NODES 2 void *load_balance_thread(void *arg) { NodeStatusArray *status_array = (NodeStatusArray *)arg; while (1) { pthread_mutex_lock(&status_array->mutex); float total = 0; float max = -1, min = 1e9; int max_id = -1, min_id = -1; for (int i = 0; i < NODES; i++) { float load = status_array->statuses[i].load; total += load; if (load > max) { max = load; max_id = i; } if (load < min) { min = load; min_id = i; } } float avg = total / NODES; if ((max - min) > (avg / 2) && avg>0.1) { printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id); printf("[daemon] Threashold exceeded: (%6.2f - %6.2f) = %6.2f > ( %6.2f / 2) \n", max, min, (max - min), avg); char cmd[512]; status_array->statuses[min_id].lport++; status_array->statuses[max_id].lport--; snprintf(cmd, sizeof(cmd), "ssh node%d '~/dmtcp/bin/migrate_heavy_job.sh %d %d %d'", max_id + 1, min_id + 1, status_array->statuses[min_id].lport, status_array->statuses[max_id].lport); printf("[daemon] Running migration script on node%d: %s\n", max_id+1, cmd); system(cmd); } pthread_mutex_unlock(&status_array->mutex); sleep(60); } return NULL; } void run_job_on_node(const char *binary_path, int node_id, int port) { char cmd[512]; snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_coordinator --port %d --daemon", node_id, port); system(cmd); snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_launch --coord-port %d python3 ~/dmtcp/bin/%s &", node_id, port, binary_path); printf("[daemon] Disptached to node%d: %s\n", node_id, cmd); system(cmd); } int main() { int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666); ftruncate(shm_fd, sizeof(JobQueue)); JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666); NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0); static int initialized = 0; if (!initialized) { pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(&queue->mutex, &attr); pthread_mutexattr_destroy(&attr); queue->head = queue->tail = 0; pthread_t load_thread; //LOADB_pthread pthread_create(&load_thread, NULL, load_balance_thread, status_array); initialized = 1; } int next_node_id = 0; while (1) { pthread_mutex_lock(&queue->mutex); while (queue->head != queue->tail) { int idx = queue->head; char job[MAX_PATH]; strncpy(job, queue->jobs[idx], MAX_PATH); int job_target_node = queue->job_nodes[idx]; int assigned_node = (job_target_node != -1) ? job_target_node : next_node_id; pthread_mutex_unlock(&queue->mutex); printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, assigned_node, job); status_array->statuses[assigned_node].lport++; printf("[DEBUG] Dispatching job at index %d to node %d: %s\n", idx, assigned_node, job); run_job_on_node(job, assigned_node + 1, status_array->statuses[assigned_node].lport); if (job_target_node == -1) { next_node_id = (next_node_id + 1) % NODES; // round-robin advance only if not user-specified } pthread_mutex_lock(&queue->mutex); queue->head = (queue->head + 1) % MAX_JOBS; } pthread_mutex_unlock(&queue->mutex); sleep(1); } return 0; }