diff --git a/src/ngx_stream_upsync_module.c b/src/ngx_stream_upsync_module.c index 5e71b11..8baccdb 100644 --- a/src/ngx_stream_upsync_module.c +++ b/src/ngx_stream_upsync_module.c @@ -36,7 +36,8 @@ typedef struct { #define NGX_STREAM_UPSYNC_CONSUL 0x0001 #define NGX_STREAM_UPSYNC_CONSUL_SERVICES 0x0002 -#define NGX_STREAM_UPSYNC_ETCD 0x0003 +#define NGX_STREAM_UPSYNC_CONSUL_HEALTH 0x0003 +#define NGX_STREAM_UPSYNC_ETCD 0x0004 typedef ngx_int_t (*ngx_stream_upsync_packet_init_pt) @@ -220,6 +221,8 @@ static ngx_int_t ngx_stream_upsync_check_index( static ngx_int_t ngx_stream_upsync_consul_parse_json(void *upsync_server); static ngx_int_t ngx_stream_upsync_consul_services_parse_json( void *upsync_server); +static ngx_int_t ngx_stream_upsync_consul_health_parse_json( + void *upsync_server); static ngx_int_t ngx_stream_upsync_etcd_parse_json(void *upsync_server); static ngx_int_t ngx_stream_upsync_check_key(u_char *key, ngx_str_t host); static void *ngx_stream_upsync_servers(ngx_cycle_t *cycle, @@ -346,6 +349,14 @@ static ngx_upsync_conf_t ngx_upsync_types[] = { ngx_stream_upsync_consul_services_parse_json, ngx_stream_upsync_clean_event }, + { ngx_string("consul_health"), + NGX_STREAM_UPSYNC_CONSUL_HEALTH, + ngx_stream_upsync_send_handler, + ngx_stream_upsync_recv_handler, + ngx_stream_upsync_consul_parse_init, + ngx_stream_upsync_consul_health_parse_json, + ngx_stream_upsync_clean_event }, + { ngx_string("etcd"), NGX_STREAM_UPSYNC_ETCD, ngx_stream_upsync_send_handler, @@ -733,7 +744,8 @@ ngx_stream_upsync_check_index(ngx_stream_upsync_server_t *upsync_server) upsync_type_conf = upsync_server->upscf->upsync_type_conf; if (upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL - || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_SERVICES) { + || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_SERVICES + || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_HEALTH) { for (i = 0; i < state.num_headers; i++) { if (ngx_memcmp(state.headers[i][0], NGX_INDEX_HEADER, @@ -862,6 +874,25 @@ ngx_stream_upsync_add_peers(ngx_cycle_t *cycle, peer->next = peers->peer; peers->peer = peer; +#if (NGX_STREAM_UPSTREAM_CHECK) + ngx_uint_t index; + ngx_addr_t *addrs; + + //TODO: it's a little trick, a issue with upstream_check_module + // add/del interface, not rely on addrs of check_peers; + addrs = ngx_pcalloc(ngx_cycle->pool, sizeof(ngx_addr_t)); + if (addrs == NULL) { + goto invalid; + } + addrs->name.data = peer->name.data; + addrs->name.len = peer->name.len; + addrs->sockaddr = peer->sockaddr; + addrs->socklen = peer->socklen; + + index = ngx_stream_upstream_check_add_dynamic_peer(cycle->pool, + uscf, addrs); + peer->check_index = index; +#endif w += server->weight; } w += peers->total_weight; @@ -1068,6 +1099,7 @@ ngx_stream_upsync_del_peers(ngx_cycle_t *cycle, if (uscf->peer.data == NULL) { return NGX_ERROR; } + peers = (ngx_stream_upstream_rr_peers_t *)uscf->peer.data; if (peers->number <= servers->nelts) { @@ -1088,6 +1120,11 @@ ngx_stream_upsync_del_peers(ngx_cycle_t *cycle, if (ngx_memn2cmp((u_char *) peer->sockaddr, (u_char *) server->addrs->sockaddr, len, len) == 0) { + +#if (NGX_STREAM_UPSTREAM_CHECK) + ngx_stream_upstream_check_delete_dynamic_peer( + peers->name, server->addrs); +#endif if (del_peer == NULL) { del_peer = peer; tmp_del_peer = peer; @@ -1202,6 +1239,9 @@ ngx_stream_upsync_replace_peers(ngx_cycle_t *cycle, peer->conns = 0; +#if (NGX_STREAM_UPSTREAM_CHECK) + peer->check_index = server->addrs->.check_index; +#endif peer->next = peers->peer; peers->peer = peer; @@ -1585,6 +1625,169 @@ ngx_stream_upsync_consul_services_parse_json(void *data) return NGX_OK; } +static ngx_int_t +ngx_stream_upsync_consul_health_parse_json(void *data) +{ + ngx_buf_t *buf; + ngx_stream_upsync_ctx_t *ctx; + ngx_stream_upsync_conf_t *upstream_conf = NULL; + ngx_stream_upsync_server_t *upsync_server = data; + ngx_int_t attr_value; + + ctx = &upsync_server->ctx; + buf = &ctx->body; + + cJSON *root = cJSON_Parse((char *)buf->pos); + if (root == NULL) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: root error"); + return NGX_ERROR; + } + + if (ngx_array_init(&ctx->upstream_conf, ctx->pool, 16, + sizeof(*upstream_conf)) != NGX_OK) + { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: array init error"); + cJSON_Delete(root); + return NGX_ERROR; + } + + cJSON *server_next; + for (server_next = root->child; server_next != NULL; + server_next = server_next->next) + { + cJSON *addr, *checks, *check_next, *node, *port, *service, *tags, *tag_next; + size_t addr_len, port_len; + u_char port_buf[8]; + + service = cJSON_GetObjectItem(server_next, "Service"); + + addr = cJSON_GetObjectItem(service, "Address"); + if (addr == NULL || addr->valuestring == NULL + || addr->valuestring[0] == '\0') + { + node = cJSON_GetObjectItem(server_next, "Node"); + addr = cJSON_GetObjectItem(node, "Address"); + if (addr == NULL || addr->valuestring == NULL) { + continue; + } + } + + port = cJSON_GetObjectItem(service, "Port"); + if (port == NULL || port->valueint < 1 || port->valueint > 65535) { + continue; + } + ngx_memzero(port_buf, 8); + ngx_sprintf(port_buf, "%d", port->valueint); + + addr_len = ngx_strlen(addr->valuestring); + port_len = ngx_strlen(port_buf); + + if (addr_len + port_len + 2 > NGX_SOCKADDRLEN) { + continue; + } + + upstream_conf = ngx_array_push(&ctx->upstream_conf); + if (upstream_conf == NULL) { + cJSON_Delete(root); + return NGX_ERROR; + } + ngx_memzero(upstream_conf, sizeof(*upstream_conf)); + + ngx_memcpy(upstream_conf->sockaddr, addr->valuestring, addr_len); + ngx_memcpy(upstream_conf->sockaddr + addr_len, ":", 1); + ngx_memcpy(upstream_conf->sockaddr + addr_len + 1, port_buf, port_len); + + /* default value, server attribute */ + upstream_conf->weight = 1; + upstream_conf->max_fails = 2; + upstream_conf->fail_timeout = 10; + + upstream_conf->down = 0; + upstream_conf->backup = 0; + + checks = cJSON_GetObjectItem(server_next, "Checks"); + + for (check_next = checks->child; check_next != NULL; + check_next = check_next->next) + { + cJSON *check_status; + check_status = cJSON_GetObjectItem(check_next, "Status"); + + if (check_status == NULL || check_status->valuestring == NULL + || check_status->valuestring[0] == '\0' + || ngx_strncmp(check_status->valuestring, "passing", 7) != 0) + { + upstream_conf->down = 1; + break; + } + } + + tags = cJSON_GetObjectItem(service, "Tags"); + if (tags == NULL) { + continue; + } + + for (tag_next = tags->child; tag_next != NULL; + tag_next = tag_next->next) + { + u_char *tag = (u_char *) tag_next->valuestring; + if (tag == NULL) { + continue; + } + if (ngx_strncmp(tag, "weight=", 7) == 0) { + attr_value = ngx_atoi(tag + 7, (size_t)ngx_strlen(tag) - 7); + + if (attr_value == NGX_ERROR || attr_value <= 0) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: \"weight\" value is " + "invalid, setting default value 1"); + continue; + } else { + upstream_conf->weight = attr_value; + } + } + if (ngx_strncmp(tag, "max_fails=", 10) == 0) { + attr_value = ngx_atoi(tag + 10, (size_t)ngx_strlen(tag) - 10); + + if (attr_value == NGX_ERROR || attr_value < 0) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: \"max_fails\" value is " + "invalid, setting default value 2"); + continue; + } else { + upstream_conf->max_fails = attr_value; + } + } + if (ngx_strncmp(tag, "fail_timeout=", 13) == 0) { + ngx_str_t value = {ngx_strlen(tag) - 13, tag + 13}; + attr_value = ngx_parse_time(&value, 1); + + if (attr_value == NGX_ERROR || attr_value < 0) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "upsync_parse_json: \"fail_timeout\" value is " + "invalid, setting default value 10"); + continue; + } else { + upstream_conf->fail_timeout = attr_value; + } + } + if (ngx_strncmp(tag, "down", 4) == 0 && tag[4] == '\0') { + upstream_conf->down = 1; + } + if (ngx_strncmp(tag, "backup", 6) == 0 && tag[6] == '\0') { + upstream_conf->backup = 1; + } + } + + } + + cJSON_Delete(root); + + return NGX_OK; +} + static ngx_int_t ngx_stream_upsync_etcd_parse_json(void *data) @@ -2529,7 +2732,8 @@ ngx_stream_upsync_send_handler(ngx_event_t *event) ngx_memzero(request, ngx_pagesize); if (upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL - || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_SERVICES) { + || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_SERVICES + || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_HEALTH) { ngx_sprintf(request, "GET %V?recurse&index=%uL HTTP/1.0\r\nHost: %V\r\n" "Accept: */*\r\n\r\n", &upscf->upsync_send, upsync_server->index, @@ -3354,6 +3558,7 @@ ngx_stream_upsync_clean_event(void *data) if (upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_SERVICES + || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_HEALTH || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_ETCD) { if (parser != NULL) { @@ -3447,6 +3652,7 @@ ngx_stream_upsync_clear_all_events(ngx_cycle_t *cycle) if (upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_SERVICES + || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_HEALTH || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_ETCD) { if (parser != NULL) { @@ -3608,7 +3814,8 @@ ngx_stream_client_send(ngx_stream_conf_client *client, ngx_memzero(request, ngx_pagesize); if (upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL - || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_SERVICES) { + || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_SERVICES + || upsync_type_conf->upsync_type == NGX_STREAM_UPSYNC_CONSUL_HEALTH) { ngx_sprintf(request, "GET %V?recurse&index=%uL HTTP/1.0\r\nHost: %V\r\n" "Accept: */*\r\n\r\n", &upscf->upsync_send, upsync_server->index, @@ -3823,7 +4030,7 @@ ngx_stream_upsync_show(ngx_stream_session_t *s) b_body->last = ngx_snprintf(b_body->last, b_body->end - b_body->last, "No upstreams defined\n"); } - + for (i = 0; i < umcf->upstreams.nelts; i++) { ngx_stream_upsync_show_upstream(uscfp[i], b_body); b_body->last = ngx_snprintf(b_body->last, b_body->end - b_body->last, "\n"); diff --git a/src/ngx_stream_upsync_module.h b/src/ngx_stream_upsync_module.h index 94bae49..3fbdb7a 100644 --- a/src/ngx_stream_upsync_module.h +++ b/src/ngx_stream_upsync_module.h @@ -48,6 +48,15 @@ #define NGX_STREAM_LB_HASH_MODULA 8 #define NGX_STREAM_LB_HASH_KETAMA 16 +#if (NGX_STREAM_UPSTREAM_CHECK) + +extern ngx_uint_t ngx_stream_upstream_check_add_dynamic_peer(ngx_pool_t *pool, + ngx_stream_upstream_srv_conf_t *uscf, ngx_addr_t *peer_addr); +extern void ngx_stream_upstream_check_delete_dynamic_peer(ngx_str_t *name, + ngx_addr_t *peer_addr); + +#endif + /******************************hash*********************************/