diff --git a/lib/pscom/pscom_con.c b/lib/pscom/pscom_con.c index 5d12ffeb..17c96900 100644 --- a/lib/pscom/pscom_con.c +++ b/lib/pscom/pscom_con.c @@ -387,6 +387,8 @@ pscom_con_t *pscom_con_create(pscom_sock_t *sock) INIT_LIST_HEAD(&con->poll_reader.next); INIT_LIST_HEAD(&con->poll_next_send); + INIT_LIST_HEAD(&con->shutdown_requested); + con->con_guard.fd = -1; con->precon = NULL; con->in.req = 0; diff --git a/lib/pscom/pscom_migrate.c b/lib/pscom/pscom_migrate.c index e42e2423..e4a112dd 100644 --- a/lib/pscom/pscom_migrate.c +++ b/lib/pscom/pscom_migrate.c @@ -65,7 +65,7 @@ pscom_str_replace(char *search_str, char *replace_str, char *str) static int pscom_suspend_plugins(void) -{ +{ struct list_head *pos_sock; struct list_head *pos_con; int arch; @@ -82,17 +82,17 @@ int pscom_suspend_plugins(void) /* suspend listen FD: */ pscom_suspend_listen(&sock->pub); - /* iterate over all connections */ + /* iterate over all connections and post suspend request */ struct list_head *tmp_con; list_for_each_safe(pos_con, tmp_con, &sock->connections) { pscom_con_t *con = list_entry(pos_con, - pscom_con_t, + pscom_con_t, next); - + /* determine corresponding plugin */ arch = PSCOM_CON_TYPE2ARCH(con->pub.type); plugin = pscom_plugin_by_archid(arch); - + /* suspend all still pending on-demand connections, too */ if(con->pub.type == PSCOM_CON_TYPE_ONDEMAND) { con->read_suspend(con); @@ -104,25 +104,53 @@ int pscom_suspend_plugins(void) continue; /* shutdown the connection if not migratable */ - if (plugin->properties & + if (plugin->properties & PSCOM_PLUGIN_PROP_NOT_MIGRATABLE) { - - pscom_con_shutdown(con); - - /* wait for response */ - while ( (con->read_is_suspended == 0) || (con->write_is_suspended == 0) ) { - con->read_start(con); - con->write_start(con); - pscom_test_any(); - } + + DPRINT(3, "%s:%u: send REQ to %s (type: %s)", + __FILE__, + __LINE__, + pscom_con_info_str(&con->pub.remote_con_info), + pscom_con_type_str(con->pub.type)); + pscom_con_shutdown(con); + list_add_tail(&con->shutdown_requested, &sock->shutdown_connections); + + } + } + } + + /* wait for all connections to be suspended */ + list_for_each(pos_sock, &pscom.sockets) { + pscom_sock_t *sock = list_entry(pos_sock, pscom_sock_t, next); + + /* iterate over all connections and post suspend request */ + struct list_head *tmp_con; + list_for_each_safe(pos_con, tmp_con, &sock->shutdown_connections) { + pscom_con_t *con = list_entry(pos_con, + pscom_con_t, + shutdown_requested); + + DPRINT(3, "%s:%u: wait for ACK from %s (type: %s)", + __FILE__, + __LINE__, + pscom_con_info_str(&con->pub.remote_con_info), + pscom_con_type_str(con->pub.type)); + + while ( (con->read_is_suspended == 0) || (con->write_is_suspended == 0) ) { + con->read_start(con); + con->write_start(con); + pscom_test_any(); } + + list_del_init(&con->shutdown_requested); } + assert(list_empty(&sock->shutdown_connections)); } + /* * Shutdown non-migratable plugins */ - DPRINT(1, "%s %u: Find non-migratable plugins ...", __FILE__, __LINE__); struct list_head *pos_plugin; list_for_each(pos_plugin, &pscom_plugins) { @@ -130,17 +158,17 @@ int pscom_suspend_plugins(void) if ((plugin->properties & PSCOM_PLUGIN_PROP_NOT_MIGRATABLE) && (plugin->destroy)) { - DPRINT(1, - "%s %u: Destroying '%s' ...", - __FILE__, + DPRINT(1, + "%s %u: Destroying '%s' ...", + __FILE__, __LINE__, plugin->name); if(plugin->destroy) { plugin->destroy(); } - DPRINT(1, - "%s %u: Successfully destroyed '%s'!", - __FILE__, + DPRINT(1, + "%s %u: Successfully destroyed '%s'!", + __FILE__, __LINE__, plugin->name); } @@ -178,10 +206,10 @@ int pscom_resume_plugins(void) /* iterate over all connections */ list_for_each(pos_con, &sock->connections) { pscom_con_t *con = list_entry(pos_con, - pscom_con_t, + pscom_con_t, next); /* resume connections */ - pscom_resume_connection(&con->pub); + pscom_resume_connection(&con->pub); } } @@ -198,8 +226,8 @@ int pscom_resume_non_migratable_plugins(void) } static -void pscom_message_callback(struct mosquitto *mosquitto_client, - void *arg, +void pscom_message_callback(struct mosquitto *mosquitto_client, + void *arg, const struct mosquitto_message *message) { int pid = -1; @@ -210,11 +238,11 @@ void pscom_message_callback(struct mosquitto *mosquitto_client, if ((char*)message->payload) { strcpy(payload, (char*)message->payload); - if (!strcmp(payload, "*")) { + if (!strcmp(payload, "*") || !strcmp(payload, "resume") || !strcmp(payload, "suspend")) { pid = -2; } else { msg = strtok(payload, " "); - + while (msg) { sscanf(msg, "%d", &pid); if (pid == my_pid) @@ -225,7 +253,7 @@ void pscom_message_callback(struct mosquitto *mosquitto_client, } else { pid = -2; } - + if (pid == my_pid || pid == -2) { @@ -263,10 +291,10 @@ void pscom_message_callback(struct mosquitto *mosquitto_client, // assert(0); break; - default: + default: DPRINT(1, "%s %d: ERROR: Unknown migration state (%d). " - "Abort!", - __FILE__, __LINE__, + "Abort!", + __FILE__, __LINE__, pscom.migration_state); assert(0); } @@ -298,7 +326,7 @@ void pscom_report_to_migfra(const char *status) strerror(errno)); exit(-1); } - + /* reset migration state */ pscom.migration_state = PSCOM_MIGRATION_INACTIVE; } @@ -348,11 +376,11 @@ void pscom_migration_handle_shutdown_req(void) false); if (err != MOSQ_ERR_SUCCESS) { fprintf(stderr, "%s %d: ERROR: Could not publish on '%s' - %d" - "(%d [%s])", + "(%d [%s])", __FILE__, __LINE__, PSCOM_MOSQUITTO_RESP_TOPIC, err, - errno, + errno, strerror(errno)); exit(-1); } @@ -362,7 +390,7 @@ void pscom_migration_handle_shutdown_req(void) sched_yield(); } - /* resume the connections now */ + /* resume the connections now */ pscom_migration_handle_resume_req(); } @@ -373,7 +401,7 @@ int pscom_migration_init(void) /* leave if feature should be disabled */ if (pscom.env.suspend_resume == 0) return 0; - + /* initialize libmosquitto */ if (mosquitto_lib_init() != MOSQ_ERR_SUCCESS) { DPRINT(1, "%s %d: ERROR: Could not init libmosquitto ", @@ -381,24 +409,24 @@ int pscom_migration_init(void) return PSCOM_ERR_STDERROR; } - /* create a new mosquitto client */ + /* create a new mosquitto client */ char client_name[PSCOM_MOSQUITTO_CLIENT_NAME_LENGTH]; char my_pid[10]; sprintf(my_pid, "_%d", getpid()); gethostname(client_name, PSCOM_MOSQUITTO_CLIENT_NAME_LENGTH); strcat(client_name, my_pid); - pscom_mosquitto_client = mosquitto_new(client_name, + pscom_mosquitto_client = mosquitto_new(client_name, true, NULL); if (pscom_mosquitto_client == NULL) { DPRINT(1, "%s %d: ERROR: Could create new mosquitto client " - "instance (%d [%s])", + "instance (%d [%s])", __FILE__, __LINE__, - errno, + errno, strerror(errno)); return PSCOM_ERR_STDERROR; } - + /* connect to mosquitto broker in BLOCKING manner */ int err; err = mosquitto_connect(pscom_mosquitto_client, @@ -407,10 +435,10 @@ int pscom_migration_init(void) PSCOM_KEEP_ALIVE_INT); if ( err != MOSQ_ERR_SUCCESS) { DPRINT(1, "%s %d: ERROR: Could connect to the broker - %d" - "(%d [%s])", + "(%d [%s])", __FILE__, __LINE__, err, - errno, + errno, strerror(errno)); return PSCOM_ERR_STDERROR; } else { @@ -436,11 +464,11 @@ int pscom_migration_init(void) 0); if (err != MOSQ_ERR_SUCCESS) { DPRINT(1, "%s %d: ERROR: Could not subscribe to '%s' - %d" - "(%d [%s])", + "(%d [%s])", __FILE__, __LINE__, pscom_mosquitto_req_topic, err, - errno, + errno, strerror(errno)); return PSCOM_ERR_STDERROR; } @@ -451,7 +479,7 @@ int pscom_migration_init(void) /* set the subscription callback */ mosquitto_message_callback_set(pscom_mosquitto_client, &pscom_message_callback); - + /* start the communication loop */ err = mosquitto_loop_start(pscom_mosquitto_client); if ( err != MOSQ_ERR_SUCCESS) { @@ -470,17 +498,17 @@ int pscom_migration_cleanup(void) { int err; - /* unsubscribe from the migration command topic */ + /* unsubscribe from the migration command topic */ err = mosquitto_unsubscribe(pscom_mosquitto_client, NULL, PSCOM_MOSQUITTO_REQ_TOPIC); if (err != MOSQ_ERR_SUCCESS) { DPRINT(1, "%s %d: ERROR: Could not unsubscribe from '%s' - %d" - "(%d [%s])", + "(%d [%s])", __FILE__, __LINE__, PSCOM_MOSQUITTO_REQ_TOPIC, err, - errno, + errno, strerror(errno)); return PSCOM_ERR_STDERROR; } @@ -506,7 +534,7 @@ int pscom_migration_cleanup(void) return PSCOM_ERR_STDERROR; } - /* destroy the mosquitto client */ + /* destroy the mosquitto client */ mosquitto_destroy(pscom_mosquitto_client); /* cleanup libmosquitto */ diff --git a/lib/pscom/pscom_migrate.h b/lib/pscom/pscom_migrate.h index 84205883..7ca4216c 100644 --- a/lib/pscom/pscom_migrate.h +++ b/lib/pscom/pscom_migrate.h @@ -24,7 +24,7 @@ #define PSCOM_MOSQUITTO_TOPIC_LENGTH 50 #define PSCOM_MOSQUITTO_REQ_TOPIC "fast/pscom///request" #define PSCOM_MOSQUITTO_RESP_TOPIC "fast/pscom///response" -#define PSCOM_BROKER_HOST "devon" +#define PSCOM_BROKER_HOST "zerberus" #define PSCOM_BROKER_PORT 1883 #define PSCOM_KEEP_ALIVE_INT 60 diff --git a/lib/pscom/pscom_priv.h b/lib/pscom/pscom_priv.h index 87768b48..980e32f3 100644 --- a/lib/pscom/pscom_priv.h +++ b/lib/pscom/pscom_priv.h @@ -291,6 +291,8 @@ struct PSCOM_con pscom_poll_reader_t poll_reader; struct list_head poll_next_send; // used by pscom.poll_sender + struct list_head shutdown_requested; // user by S/R protocol + struct con_guard con_guard; // connection guard struct { @@ -346,6 +348,8 @@ struct PSCOM_sock struct list_head groups; // List of pscom_group_t.next struct list_head group_req_unknown; // List of pscom_req_t.next (requests with unknown group id) + struct list_head shutdown_connections; // List of pscom_con_t.shutdown_requested + struct pscom_listener { ufd_info_t ufd_info; // TCP listen for new connections unsigned usercnt; // Count the users of the listening fd. (keep fd open, if > 0) diff --git a/lib/pscom/pscom_sock.c b/lib/pscom/pscom_sock.c index c05dd983..56e6ef62 100644 --- a/lib/pscom/pscom_sock.c +++ b/lib/pscom/pscom_sock.c @@ -154,6 +154,7 @@ pscom_sock_t *pscom_sock_create(unsigned int userdata_size) INIT_LIST_HEAD(&sock->group_req_unknown); INIT_LIST_HEAD(&sock->pendingioq); INIT_LIST_HEAD(&sock->sendq_suspending); + INIT_LIST_HEAD(&sock->shutdown_connections); sock->recv_req_cnt_any = 0;