summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xcode/agentbin18056 -> 0 bytes
-rwxr-xr-xcode/daemon2bin17888 -> 0 bytes
-rw-r--r--daemon2.c206
3 files changed, 0 insertions, 206 deletions
diff --git a/code/agent b/code/agent
deleted file mode 100755
index f4cf8c6..0000000
--- a/code/agent
+++ /dev/null
Binary files differ
diff --git a/code/daemon2 b/code/daemon2
deleted file mode 100755
index 66c3582..0000000
--- a/code/daemon2
+++ /dev/null
Binary files differ
diff --git a/daemon2.c b/daemon2.c
deleted file mode 100644
index bb1465e..0000000
--- a/daemon2.c
+++ /dev/null
@@ -1,206 +0,0 @@
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/mman.h>
-#include <sys/stat.h>
-#include <pthread.h>
-#include "shared.h"
-
-#define NODES 2
-
-void *load_balance_thread(void *arg) {
- NodeStatusArray *status_array = (NodeStatusArray *)arg;
- while (1) {
- pthread_mutex_lock(&status_array->mutex);
- printf("[daemon] Acquired mutex for load balancing on %s\n", getenv("HOSTNAME") ? getenv("HOSTNAME") : "unknown");
-
- float total = 0;
- float max = -1, min = 1e9;
- int max_id = -1, min_id = -1;
-
- for (int i = 0; i < NODES; i++) {
- float load = status_array->statuses[i].load;
- total += load;
- if (load > max) {
- max = load;
- max_id = i;
- }
- if (load < min) {
- min = load;
- min_id = i;
- }
- }
-
- float avg = total / NODES;
- char cmd[512] = {0};
- int new_min_port = -1;
- if ((max - min) > (avg / 2) && max_id != -1 && min_id != -1 && (max - min)>0.3) {
- printf("[daemon] Triggering migration: max load node %d -> min load node %d\n", max_id, min_id);
- new_min_port = status_array->statuses[min_id].lport + 1;
- status_array->statuses[min_id].lport = new_min_port;
- }
-
- pthread_mutex_unlock(&status_array->mutex);
- printf("[daemon] Released mutex for load balancing on %s\n", getenv("HOSTNAME") ? getenv("HOSTNAME") : "unknown");
-
- if (new_min_port != -1) {
- int ret = snprintf(cmd, sizeof(cmd),
- "mpirun -np 1 -host node%d ~/dmtcp/bin/migrate_heavy_job2.sh %d %d %d",
- max_id+1, min_id+1, new_min_port, new_min_port);
- if (ret >= sizeof(cmd)) {
- fprintf(stderr, "[daemon] Command truncated in load_balance_thread\n");
- } else {
- printf("[daemon] Running migration script on node%d: %s\n", max_id, cmd);
- if (system(cmd) != 0) {
- fprintf(stderr, "[daemon] Migration script failed on node%d\n", max_id);
- }
- }
- }
-
- sleep(60);
- }
- return NULL;
-}
-
-void run_job_on_node(const char *binary_path, int node_id, int port) {
- char cmd[512];
- int ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_coordinator --port %d --daemon", node_id, port);
- if (ret >= sizeof(cmd)) {
- fprintf(stderr, "[daemon] Command truncated in run_job_on_node (coordinator)\n");
- return;
- }
- if (system(cmd) != 0) {
- fprintf(stderr, "[daemon] Failed to start dmtcp_coordinator on node%d\n", node_id);
- }
-
- ret = snprintf(cmd, sizeof(cmd), "mpirun -np 1 -host node%d ~/dmtcp/bin/dmtcp_launch --coord-port %d %s &", node_id, port, binary_path);
- if (ret >= sizeof(cmd)) {
- fprintf(stderr, "[daemon] Command truncated in run_job_on_node (launch)\n");
- return;
- }
- printf("[daemon] Running on node%d: %s\n", node_id, cmd);
- if (system(cmd) != 0) {
- fprintf(stderr, "[daemon] Failed to launch job on node%d\n", node_id);
- }
-}
-
-int main() {
- // Open or create job queue shared memory
- int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
- if (shm_fd == -1) {
- perror("[daemon] shm_open SHM_NAME");
- return 1;
- }
- if (ftruncate(shm_fd, sizeof(JobQueue)) == -1) {
- perror("[daemon] ftruncate SHM_NAME");
- close(shm_fd);
- return 1;
- }
-
- JobQueue *queue = mmap(NULL, sizeof(JobQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
- if (queue == MAP_FAILED) {
- perror("[daemon] mmap SHM_NAME");
- close(shm_fd);
- return 1;
- }
-
- // Open node status shared memory (created by agent.c rank 0)
- int stat_fd = shm_open(SHM_NODE_STATUS, O_RDWR, 0666);
- if (stat_fd == -1) {
- perror("[daemon] shm_open NODE_STATUS_SHM");
- munmap(queue, sizeof(JobQueue));
- close(shm_fd);
- return 1;
- }
-
- NodeStatusArray *status_array = mmap(NULL, sizeof(NodeStatusArray), PROT_READ | PROT_WRITE, MAP_SHARED, stat_fd, 0);
- if (status_array == MAP_FAILED) {
- perror("[daemon] mmap NODE_STATUS_SHM");
- close(stat_fd);
- munmap(queue, sizeof(JobQueue));
- close(shm_fd);
- return 1;
- }
-
- static int initialized = 0;
- if (!initialized) {
- pthread_mutexattr_t attr;
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
- if (pthread_mutex_init(&queue->mutex, &attr) != 0) {
- perror("[daemon] queue mutex initialization failed");
- munmap(status_array, sizeof(NodeStatusArray));
- close(stat_fd);
- munmap(queue, sizeof(JobQueue));
- close(shm_fd);
- return 1;
- }
- pthread_mutexattr_destroy(&attr);
- queue->head = queue->tail = 0;
-
- // Initialize job_node array
- for (int i = 0; i < MAX_JOBS; i++) {
- queue->job_nodes[i] = -1;
- }
- initialized = 1;
- }
-
- // Start load balancing thread
- pthread_t load_thread;
- if (pthread_create(&load_thread, NULL, load_balance_thread, status_array) != 0) {
- perror("[daemon] pthread_create");
- munmap(status_array, sizeof(NodeStatusArray));
- close(stat_fd);
- munmap(queue, sizeof(JobQueue));
- close(shm_fd);
- return 1;
- }
-
- int next_node_id = 0;
-
- while (1) {
- pthread_mutex_lock(&queue->mutex);
- while (queue->head != queue->tail) {
- int idx = queue->head;
- char job[MAX_PATH];
- strncpy(job, queue->jobs[idx], MAX_PATH - 1);
- job[MAX_PATH - 1] = '\0'; // Ensure null-termination
- queue->job_nodes[idx] = next_node_id + 1; // Assign node to job
-
- queue->head = (queue->head + 1) % MAX_JOBS;
- pthread_mutex_unlock(&queue->mutex);
-
- printf("[daemon] Dispatching job at index %d to node %d: %s\n", idx, next_node_id + 1, job);
- pthread_mutex_lock(&status_array->mutex);
- status_array->statuses[next_node_id].lport++;
- status_array->statuses[next_node_id].running_jobs++;
- pthread_mutex_unlock(&status_array->mutex);
-
- run_job_on_node(job, next_node_id + 1, status_array->statuses[next_node_id].lport);
-
- next_node_id = (next_node_id + 1) % NODES;
-
- pthread_mutex_lock(&queue->mutex);
- }
- pthread_mutex_unlock(&queue->mutex);
- sleep(1);
- }
-
- // Cleanup (unreachable due to infinite loop)
- pthread_join(load_thread, NULL);
- if (munmap(status_array, sizeof(NodeStatusArray)) == -1) {
- perror("[daemon] munmap NODE_STATUS_SHM");
- }
- if (close(stat_fd) == -1) {
- perror("[daemon] close NODE_STATUS_SHM");
- }
- if (munmap(queue, sizeof(JobQueue)) == -1) {
- perror("[daemon] munmap SHM_NAME");
- }
- if (close(shm_fd) == -1) {
- perror("[daemon] close SHM_NAME");
- }
- return 0;
-}