Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/pscom/pscom_con.c
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ pscom_con_t *pscom_con_create(pscom_sock_t *sock)
INIT_LIST_HEAD(&con->poll_reader.next);
INIT_LIST_HEAD(&con->poll_next_send);

INIT_LIST_HEAD(&con->shutdown_requested);

con->con_guard.fd = -1;
con->precon = NULL;
con->in.req = 0;
Expand Down
130 changes: 79 additions & 51 deletions lib/pscom/pscom_migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pscom_str_replace(char *search_str, char *replace_str, char *str)

static
int pscom_suspend_plugins(void)
{
{
struct list_head *pos_sock;
struct list_head *pos_con;
int arch;
Expand All @@ -82,17 +82,17 @@ int pscom_suspend_plugins(void)
/* suspend listen FD: */
pscom_suspend_listen(&sock->pub);

/* iterate over all connections */
/* iterate over all connections and post suspend request */
struct list_head *tmp_con;
list_for_each_safe(pos_con, tmp_con, &sock->connections) {
pscom_con_t *con = list_entry(pos_con,
pscom_con_t,
pscom_con_t,
next);

/* determine corresponding plugin */
arch = PSCOM_CON_TYPE2ARCH(con->pub.type);
plugin = pscom_plugin_by_archid(arch);

/* suspend all still pending on-demand connections, too */
if(con->pub.type == PSCOM_CON_TYPE_ONDEMAND) {
con->read_suspend(con);
Expand All @@ -104,43 +104,71 @@ int pscom_suspend_plugins(void)
continue;

/* shutdown the connection if not migratable */
if (plugin->properties &
if (plugin->properties &
PSCOM_PLUGIN_PROP_NOT_MIGRATABLE) {

pscom_con_shutdown(con);

/* wait for response */
while ( (con->read_is_suspended == 0) || (con->write_is_suspended == 0) ) {
con->read_start(con);
con->write_start(con);
pscom_test_any();
}

DPRINT(3, "%s:%u: send REQ to %s (type: %s)",
__FILE__,
__LINE__,
pscom_con_info_str(&con->pub.remote_con_info),
pscom_con_type_str(con->pub.type));
pscom_con_shutdown(con);
list_add_tail(&con->shutdown_requested, &sock->shutdown_connections);

}
}
}

/* wait for all connections to be suspended */
list_for_each(pos_sock, &pscom.sockets) {
pscom_sock_t *sock = list_entry(pos_sock, pscom_sock_t, next);

/* iterate over all connections and post suspend request */
struct list_head *tmp_con;
list_for_each_safe(pos_con, tmp_con, &sock->shutdown_connections) {
pscom_con_t *con = list_entry(pos_con,
pscom_con_t,
shutdown_requested);

DPRINT(3, "%s:%u: wait for ACK from %s (type: %s)",
__FILE__,
__LINE__,
pscom_con_info_str(&con->pub.remote_con_info),
pscom_con_type_str(con->pub.type));

while ( (con->read_is_suspended == 0) || (con->write_is_suspended == 0) ) {
con->read_start(con);
con->write_start(con);
pscom_test_any();
}

list_del_init(&con->shutdown_requested);
}
assert(list_empty(&sock->shutdown_connections));
}


/*
* Shutdown non-migratable plugins
*/

DPRINT(1, "%s %u: Find non-migratable plugins ...", __FILE__, __LINE__);
struct list_head *pos_plugin;
list_for_each(pos_plugin, &pscom_plugins) {
plugin = list_entry(pos_plugin, pscom_plugin_t, next);

if ((plugin->properties & PSCOM_PLUGIN_PROP_NOT_MIGRATABLE) &&
(plugin->destroy)) {
DPRINT(1,
"%s %u: Destroying '%s' ...",
__FILE__,
DPRINT(1,
"%s %u: Destroying '%s' ...",
__FILE__,
__LINE__,
plugin->name);
if(plugin->destroy) {
plugin->destroy();
}
DPRINT(1,
"%s %u: Successfully destroyed '%s'!",
__FILE__,
DPRINT(1,
"%s %u: Successfully destroyed '%s'!",
__FILE__,
__LINE__,
plugin->name);
}
Expand Down Expand Up @@ -178,10 +206,10 @@ int pscom_resume_plugins(void)
/* iterate over all connections */
list_for_each(pos_con, &sock->connections) {
pscom_con_t *con = list_entry(pos_con,
pscom_con_t,
pscom_con_t,
next);
/* resume connections */
pscom_resume_connection(&con->pub);
pscom_resume_connection(&con->pub);
}
}

Expand All @@ -198,8 +226,8 @@ int pscom_resume_non_migratable_plugins(void)
}

static
void pscom_message_callback(struct mosquitto *mosquitto_client,
void *arg,
void pscom_message_callback(struct mosquitto *mosquitto_client,
void *arg,
const struct mosquitto_message *message)
{
int pid = -1;
Expand All @@ -210,11 +238,11 @@ void pscom_message_callback(struct mosquitto *mosquitto_client,
if ((char*)message->payload) {
strcpy(payload, (char*)message->payload);

if (!strcmp(payload, "*")) {
if (!strcmp(payload, "*") || !strcmp(payload, "resume") || !strcmp(payload, "suspend")) {
pid = -2;
} else {
msg = strtok(payload, " ");

while (msg) {
sscanf(msg, "%d", &pid);
if (pid == my_pid)
Expand All @@ -225,7 +253,7 @@ void pscom_message_callback(struct mosquitto *mosquitto_client,
} else {
pid = -2;
}


if (pid == my_pid || pid == -2) {

Expand Down Expand Up @@ -263,10 +291,10 @@ void pscom_message_callback(struct mosquitto *mosquitto_client,
// assert(0);
break;

default:
default:
DPRINT(1, "%s %d: ERROR: Unknown migration state (%d). "
"Abort!",
__FILE__, __LINE__,
"Abort!",
__FILE__, __LINE__,
pscom.migration_state);
assert(0);
}
Expand Down Expand Up @@ -298,7 +326,7 @@ void pscom_report_to_migfra(const char *status)
strerror(errno));
exit(-1);
}

/* reset migration state */
pscom.migration_state = PSCOM_MIGRATION_INACTIVE;
}
Expand Down Expand Up @@ -348,11 +376,11 @@ void pscom_migration_handle_shutdown_req(void)
false);
if (err != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "%s %d: ERROR: Could not publish on '%s' - %d"
"(%d [%s])",
"(%d [%s])",
__FILE__, __LINE__,
PSCOM_MOSQUITTO_RESP_TOPIC,
err,
errno,
errno,
strerror(errno));
exit(-1);
}
Expand All @@ -362,7 +390,7 @@ void pscom_migration_handle_shutdown_req(void)
sched_yield();
}

/* resume the connections now */
/* resume the connections now */
pscom_migration_handle_resume_req();
}

Expand All @@ -373,32 +401,32 @@ int pscom_migration_init(void)
/* leave if feature should be disabled */
if (pscom.env.suspend_resume == 0)
return 0;

/* initialize libmosquitto */
if (mosquitto_lib_init() != MOSQ_ERR_SUCCESS) {
DPRINT(1, "%s %d: ERROR: Could not init libmosquitto ",
__FILE__, __LINE__);
return PSCOM_ERR_STDERROR;
}

/* create a new mosquitto client */
/* create a new mosquitto client */
char client_name[PSCOM_MOSQUITTO_CLIENT_NAME_LENGTH];
char my_pid[10];
sprintf(my_pid, "_%d", getpid());
gethostname(client_name, PSCOM_MOSQUITTO_CLIENT_NAME_LENGTH);
strcat(client_name, my_pid);
pscom_mosquitto_client = mosquitto_new(client_name,
pscom_mosquitto_client = mosquitto_new(client_name,
true,
NULL);
if (pscom_mosquitto_client == NULL) {
DPRINT(1, "%s %d: ERROR: Could create new mosquitto client "
"instance (%d [%s])",
"instance (%d [%s])",
__FILE__, __LINE__,
errno,
errno,
strerror(errno));
return PSCOM_ERR_STDERROR;
}

/* connect to mosquitto broker in BLOCKING manner */
int err;
err = mosquitto_connect(pscom_mosquitto_client,
Expand All @@ -407,10 +435,10 @@ int pscom_migration_init(void)
PSCOM_KEEP_ALIVE_INT);
if ( err != MOSQ_ERR_SUCCESS) {
DPRINT(1, "%s %d: ERROR: Could connect to the broker - %d"
"(%d [%s])",
"(%d [%s])",
__FILE__, __LINE__,
err,
errno,
errno,
strerror(errno));
return PSCOM_ERR_STDERROR;
} else {
Expand All @@ -436,11 +464,11 @@ int pscom_migration_init(void)
0);
if (err != MOSQ_ERR_SUCCESS) {
DPRINT(1, "%s %d: ERROR: Could not subscribe to '%s' - %d"
"(%d [%s])",
"(%d [%s])",
__FILE__, __LINE__,
pscom_mosquitto_req_topic,
err,
errno,
errno,
strerror(errno));
return PSCOM_ERR_STDERROR;
}
Expand All @@ -451,7 +479,7 @@ int pscom_migration_init(void)
/* set the subscription callback */
mosquitto_message_callback_set(pscom_mosquitto_client,
&pscom_message_callback);

/* start the communication loop */
err = mosquitto_loop_start(pscom_mosquitto_client);
if ( err != MOSQ_ERR_SUCCESS) {
Expand All @@ -470,17 +498,17 @@ int pscom_migration_cleanup(void)
{
int err;

/* unsubscribe from the migration command topic */
/* unsubscribe from the migration command topic */
err = mosquitto_unsubscribe(pscom_mosquitto_client,
NULL,
PSCOM_MOSQUITTO_REQ_TOPIC);
if (err != MOSQ_ERR_SUCCESS) {
DPRINT(1, "%s %d: ERROR: Could not unsubscribe from '%s' - %d"
"(%d [%s])",
"(%d [%s])",
__FILE__, __LINE__,
PSCOM_MOSQUITTO_REQ_TOPIC,
err,
errno,
errno,
strerror(errno));
return PSCOM_ERR_STDERROR;
}
Expand All @@ -506,7 +534,7 @@ int pscom_migration_cleanup(void)
return PSCOM_ERR_STDERROR;
}

/* destroy the mosquitto client */
/* destroy the mosquitto client */
mosquitto_destroy(pscom_mosquitto_client);

/* cleanup libmosquitto */
Expand Down
2 changes: 1 addition & 1 deletion lib/pscom/pscom_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#define PSCOM_MOSQUITTO_TOPIC_LENGTH 50
#define PSCOM_MOSQUITTO_REQ_TOPIC "fast/pscom/<hostname>/<pid>/request"
#define PSCOM_MOSQUITTO_RESP_TOPIC "fast/pscom/<hostname>/<pid>/response"
#define PSCOM_BROKER_HOST "devon"
#define PSCOM_BROKER_HOST "zerberus"
#define PSCOM_BROKER_PORT 1883
#define PSCOM_KEEP_ALIVE_INT 60

Expand Down
4 changes: 4 additions & 0 deletions lib/pscom/pscom_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ struct PSCOM_con
pscom_poll_reader_t poll_reader;
struct list_head poll_next_send; // used by pscom.poll_sender

struct list_head shutdown_requested; // user by S/R protocol

struct con_guard con_guard; // connection guard

struct {
Expand Down Expand Up @@ -346,6 +348,8 @@ struct PSCOM_sock
struct list_head groups; // List of pscom_group_t.next
struct list_head group_req_unknown; // List of pscom_req_t.next (requests with unknown group id)

struct list_head shutdown_connections; // List of pscom_con_t.shutdown_requested

struct pscom_listener {
ufd_info_t ufd_info; // TCP listen for new connections
unsigned usercnt; // Count the users of the listening fd. (keep fd open, if > 0)
Expand Down
1 change: 1 addition & 0 deletions lib/pscom/pscom_sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ pscom_sock_t *pscom_sock_create(unsigned int userdata_size)
INIT_LIST_HEAD(&sock->group_req_unknown);
INIT_LIST_HEAD(&sock->pendingioq);
INIT_LIST_HEAD(&sock->sendq_suspending);
INIT_LIST_HEAD(&sock->shutdown_connections);

sock->recv_req_cnt_any = 0;

Expand Down