summary | refs | log | tree | commit | diff
path: root/daemon.c
diff options
context:
space:
mode:
Diffstat (limited to 'daemon.c')
-rw-r--r-- daemon.c 111
1 files changed, 55 insertions, 56 deletions
diff --git a/daemon.c b/daemon.c
index 5c50f94..bd0fa2c 100644
--- a/daemon.c
+++ b/daemon.c
@@ -8,9 +8,11 @@
#include <pthread.h>
#include "shared.h"
-#define NODES 4
+#define NODES 2
void *load_balance_thread(void *arg) {
+ NodeStatusArray *status_array = (NodeStatusArray *)arg;
+
while (1) {
pthread_mutex_lock(&status_array->mutex);
@@ -32,57 +34,51 @@ void *load_balance_thread(void *arg) {
}
float avg = total / NODES;
- if ((max - min) > (avg / 2)) {
+ if ((max - min) > (avg / 2) && avg>0.1) {
printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
- // Call external script/function to checkpoint, scp, and restart
- char cmd[512];
- snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d checkpointandbalance.sh %d &", min_id, max_id);
- printf("[daemon] Running on node%d: %s\n", node_id, cmd);
- system(cmd);
- char cmd[512];
- snprintf(cmd, sizeof(cmd), "ssh node%d \"./migrate_heavy_job.sh %d %d\"", max_id, max_id, min_id);
+ printf("[daemon] Threashold exceeded: (%6.2f - %6.2f) = %6.2f > ( %6.2f / 2) \n", max, min, (max - min), avg);
+ char cmd[512];
+
+ status_array->statuses[min_id].lport++;
+ status_array->statuses[max_id].lport--;
+ snprintf(cmd, sizeof(cmd),
+ "ssh node%d '~/dmtcp/bin/migrate_heavy_job.sh %d %d %d'",
+ max_id + 1, min_id + 1, status_array->statuses[min_id].lport, status_array->statuses[max_id].lport);
+ printf("[daemon] Running migration script on node%d: %s\n", max_id+1, cmd);
system(cmd);
- }
- pthread_mutex_unlock(&status_array->mutex);
- sleep(60);
+
}
- return NULL;
+ pthread_mutex_unlock(&status_array->mutex);
+
+ sleep(60);
}
+ return NULL;
+}
-void run_job_on_node(const char *binary_path, int node_id) {
+void run_job_on_node(const char *binary_path, int node_id, int port) {
char cmd[512];
- snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d %s &", node_id, binary_path);
- printf("[daemon] Running on node%d: %s\n", node_id, cmd);
+ snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_coordinator --port %d --daemon", node_id, port);
+ system(cmd);
+ snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_launch --coord-port %d python3 ~/dmtcp/bin/%s &", node_id, port, binary_path);
+ printf("[daemon] Disptached to node%d: %s\n", node_id, cmd);
system(cmd);
}
int main() {
int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
- if (shm_fd == -1) {
- perror("shm_open");
- return 1;
- }
+
ftruncate(shm_fd, sizeof(JobQueue));
JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
- if (queue == MAP_FAILED) {
- perror("mmap");
- return 1;
- }
- int stat_fd = shm_open(NODE_STATUS_SHM, O_RDWR, 0666);
- if (stat_fd == -1) {
- perror("shm_open NODE_STATUS_SHM");
- return 1;
- }
+
+ int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666);
+
NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
- if (status_array == MAP_FAILED) {
- perror("mmap NODE_STATUS_SHM");
- return 1;
- }
+
static int initialized = 0;
if (!initialized) {
@@ -93,39 +89,42 @@ int main() {
pthread_mutexattr_destroy(&attr);
queue->head = queue->tail = 0;
- // Initialize job_node array
- for (int i = 0; i < MAX_JOBS; i++) {
- queue->job_node[i] = -1;
- }
+
+
+ pthread_t load_thread; //LOADB_pthread
+ pthread_create(&load_thread, NULL, load_balance_thread, status_array);
initialized = 1;
}
int next_node_id = 0;
- while (1) {
- pthread_mutex_lock(&queue->mutex);
- while (queue->head != queue->tail) {
- int idx = queue->head;
- char job[MAX_PATH];
- strncpy(job, queue->jobs[idx], MAX_PATH);
-
- // Assign node to job
- queue->job_node[idx] = next_node_id;
+while (1) {
+ pthread_mutex_lock(&queue->mutex);
+ while (queue->head != queue->tail) {
+ int idx = queue->head;
+ char job[MAX_PATH];
+ strncpy(job, queue->jobs[idx], MAX_PATH);
- // Move head forward in the queue
- queue->head = (queue->head + 1) % MAX_JOBS;
+ int job_target_node = queue->job_nodes[idx];
+ int assigned_node = (job_target_node != -1) ? job_target_node : next_node_id;
- pthread_mutex_unlock(&queue->mutex);
-
- printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id, job);
- run_job_on_node(job, next_node_id);
+ pthread_mutex_unlock(&queue->mutex);
- next_node_id = (next_node_id + 1) % NODES;
+ printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, assigned_node, job);
+ status_array->statuses[assigned_node].lport++;
+ printf("[DEBUG] Dispatching job at index %d to node %d: %s\n", idx, assigned_node, job);
+ run_job_on_node(job, assigned_node + 1, status_array->statuses[assigned_node].lport);
- pthread_mutex_lock(&queue->mutex);
+ if (job_target_node == -1) {
+ next_node_id = (next_node_id + 1) % NODES; // round-robin advance only if not user-specified
}
- pthread_mutex_unlock(&queue->mutex);
- sleep(1);
+
+ pthread_mutex_lock(&queue->mutex);
+ queue->head = (queue->head + 1) % MAX_JOBS;
}
+ pthread_mutex_unlock(&queue->mutex);
+ sleep(1);
+}
+
return 0;
}