Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support SELECT,ZSET2 #43

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 180 additions & 6 deletions src/rmt_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#define REDIS_RDB_TYPE_SET 2
#define REDIS_RDB_TYPE_ZSET 3
#define REDIS_RDB_TYPE_HASH 4
#define REDIS_RDB_TYPE_ZSET2 5

/* Object types for encoded objects. */
#define REDIS_RDB_TYPE_HASH_ZIPMAP 9
Expand Down Expand Up @@ -2989,7 +2990,6 @@ redis_parse_req(struct msg *r)

if (str6icmp(m, 's', 'e', 'l', 'e', 'c', 't')) {
r->type = MSG_REQ_REDIS_SELECT;
r->noforward = 1;
break;
}

Expand Down Expand Up @@ -5469,6 +5469,7 @@ struct msg *redis_generate_msg_with_key_value(rmtContext *ctx, mbuf_base *mb,
}

msg_owner->nfrag = 0;
log_notice("sub_msg_count is: %u",sub_msg_count);
msg_owner->frag_seq = rmt_alloc(sub_msg_count * sizeof(msg));
if(msg_owner->frag_seq == NULL){
goto enomem;
Expand Down Expand Up @@ -5525,6 +5526,7 @@ struct msg *redis_generate_msg_with_key_value(rmtContext *ctx, mbuf_base *mb,
msg->type = MSG_REQ_REDIS_SADD;
break;
case REDIS_ZSET:
case REDIS_ZSET2:
ret = redis_msg_append_bulk_full(msg, REDIS_INSERT_ZSET,
rmt_strlen(REDIS_INSERT_ZSET));
msg->type = MSG_REQ_REDIS_ZADD;
Expand Down Expand Up @@ -5832,6 +5834,12 @@ static sds redis_rdb_file_load_double_str(redis_rdb *rdb) {
}
}

static sds redis_rdb_file_load_binary_double_str(redis_rdb *rdb) {
char buf[256];
if (redis_rdb_file_read(rdb,buf,8) != RMT_OK) return NULL;
return sdsnewlen(buf, 8);
}

static sds redis_rdb_file_load_lzf_str(redis_rdb *rdb) {
unsigned int len, clen;
unsigned char *c = NULL;
Expand Down Expand Up @@ -5939,7 +5947,7 @@ static struct array *redis_rdb_file_load_value(redis_rdb *rdb, int rdbtype)
str = array_push(value);
if ((*str = redis_rdb_file_load_str(rdb)) == NULL) goto error;
}
}else if (rdbtype == REDIS_RDB_TYPE_ZSET) {
}else if (rdbtype == REDIS_RDB_TYPE_ZSET || rdbtype == REDIS_RDB_TYPE_ZSET2) {
if ((len = redis_rdb_file_load_len(rdb,NULL)) == REDIS_RDB_LENERR) goto error;

value = redis_value_create((uint32_t)(2*len));
Expand All @@ -5950,9 +5958,17 @@ static struct array *redis_rdb_file_load_value(redis_rdb *rdb, int rdbtype)

while(len--) {
if ((elem1 = redis_rdb_file_load_str(rdb)) == NULL) goto error;
if ((elem2 = redis_rdb_file_load_double_str(rdb)) == NULL) {
sdsfree(elem1);
goto error;
if (rdbtype == REDIS_RDB_TYPE_ZSET) {
if ((elem2 = redis_rdb_file_load_double_str(rdb)) == NULL) {
sdsfree(elem1);
goto error;
}
} else {
if ((elem2 = redis_rdb_file_load_binary_double_str(rdb)) == NULL) {
log_error("the error key name: %s\n", elem1);
sdsfree(elem1);
goto error;
}
}

str = array_push(value);
Expand Down Expand Up @@ -6196,6 +6212,10 @@ static int redis_object_type_get_by_rdbtype(int dbtype)

return REDIS_STRING;
break;
case REDIS_RDB_TYPE_ZSET2:

return REDIS_ZSET2;
break;
case REDIS_RDB_TYPE_LIST:
case REDIS_RDB_TYPE_LIST_ZIPLIST:
case REDIS_RDB_TYPE_LIST_QUICKLIST:
Expand Down Expand Up @@ -6382,6 +6402,149 @@ int redis_key_value_send(redis_node *srnode, sds key,
return -1;
}

int redis_select_cmd_send(redis_node *srnode, const char *dbid, void *data){
int ret;
rmtContext *ctx = srnode->ctx;
redis_group *srgroup = srnode->owner;
mbuf_base *mb = srgroup->mb;
redis_group *trgroup = data;
redis_node *trnode;
redis_node **rnode;
struct msg *msg = NULL;
uint32_t i;
int mbuf_count = 0;

//get target node
rnode = array_get(trgroup->route, 0);
trnode = *rnode;
if (trnode == NULL){
log_error("ERROR: [cmd] backend node is NULL");
goto error;
}
msg = redis_generate_select_cmd_msg(ctx, mb, dbid);
if (msg == NULL){
log_error("ERROR: generate msg for select cmd error");
goto error;
}
if (msg->frag_seq == NULL){
mbuf_count += listLength(msg->data);
ret = prepare_send_msg(srnode, msg, trnode);
if (ret != RMT_OK){
log_error("ERROR1: prepare send select msg to node[%s] failed",trnode->addr);
goto error;
}
msg = NULL;
}
else{
for (i = 0; i < msg->nfrag; i++){
mbuf_count += listLength(msg->frag_seq[i]->data);
ret = prepare_send_msg(srnode, msg->frag_seq[i], trnode);
if (ret != RMT_OK){
log_error("ERROR2: prepare send select cmd to node[%s] failed",trnode->addr);
goto error;
}
msg->frag_seq[i] = NULL;
}
msg_put(msg);
msg_free(msg);
msg = NULL;
}
return mbuf_count;

error:

if (msg != NULL){
if (msg->frag_seq != NULL){
for (i = 0; i < msg->nfrag; i++){
if (msg->frag_seq[i] != NULL){
msg_put(msg->frag_seq[i]);
msg_free(msg->frag_seq[i]);
msg->frag_seq[i] = NULL;
}
}
}
msg_put(msg);
msg_free(msg);
}
return -1;
}

struct msg *redis_generate_select_cmd_msg(rmtContext *ctx, mbuf_base *mb, const char *dbid){
int ret;
struct msg *msg, *msg_owner;
uint32_t sub_msg_count = 0;
uint32_t field_count, i;

msg = NULL;
msg_owner = NULL;
/*msg_owner = msg_get(mb, 1, REDIS_DATA_TYPE_CMD);
if (msg_owner == NULL){
goto enomem;
}
msg_owner->nfrag = 0;
msg_owner->frag_seq = rmt_alloc()*/
next:
msg = msg_get(mb, 1, REDIS_DATA_TYPE_RDB);
if (msg == NULL){
goto enomem;
}
field_count = 2;
//add *2 reference aof protocol
ret = redis_msg_append_multi_bulk_len_full(msg, field_count);
if (ret != RMT_OK){
log_error("[cmd] cmd msg append bulk len error");
if (ret == RMT_ENOMEM){
goto enomem;
}
goto error;
}
// append select command to bulk.
ret = redis_msg_append_bulk_full(msg, "SELECT", rmt_strlen("SELECT"));
msg->type = MSG_REQ_REDIS_SELECT;
if (ret != RMT_OK){
log_error("[cmd] cmd msg append select error: %s",msg_type_string(msg->type));
if (ret == RMT_ENOMEM){
goto enomem;
}
goto error;
}
// append dbid to bulk, dbid type must be a string point
ret = redis_msg_append_bulk_full(msg, dbid, rmt_strlen(dbid));
if (ret != RMT_OK){
log_error("[cmd] cmd msg append dbid error");
if (ret == RMT_ENOMEM){
goto enomem;
}
goto error;
}
// set reply info
if (ctx->noreply){
msg->noreply = 1;
}
if (msg_owner == NULL){
return msg;
}

enomem:
log_error("ERROR: out of memory");

error:
if (msg != NULL){
msg_put(msg);
msg_free(msg);
}

if (msg_owner != NULL){
for (i = 0; i < msg_owner->nfrag; i++){
msg_put(msg_owner->frag_seq[i]);
msg_free(msg_owner->frag_seq[i]);
}
msg_put(msg_owner);
msg_free(msg_owner);
}
return NULL;
}

int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time)
{
int ret;
Expand Down Expand Up @@ -6441,7 +6604,7 @@ int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time)
}

rdb->rdbver = rmt_atoi(buf+len, 4);
if (rdb->rdbver < 1 || rdb->rdbver > REDIS_RDB_VERSION) {
if (rdb->rdbver < 1) {
log_error("ERROR: Can't handle RDB format version %d",
rdb->fname, rdb->rdbver);
goto error;
Expand Down Expand Up @@ -6501,6 +6664,17 @@ int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time)
rdb->fname);
goto eoferr;
}
else{
log_notice("fetch select db:[%u]",dbid);
char db[5];
sprintf(db,"%d",dbid);
ret = redis_select_cmd_send(srnode, db, trgroup);
if (ret < 0){
log_notice("send %s failed",db);
break;
}
mbuf_count += ret;
}

log_debug(LOG_INFO, "dbid: %d", dbid);
continue;
Expand Down
3 changes: 3 additions & 0 deletions src/rmt_redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4
#define REDIS_ZSET2 5

#define REDIS_REPLY_STATUS_OK "+OK\r\n"
#define REDIS_REPLY_STATUS_PONG "+PONG\r\n"
Expand Down Expand Up @@ -255,6 +256,8 @@ void redis_value_destroy(struct array *value);
char *get_redis_type_string(int type);

struct array *get_multi_bulk_array_from_mbuf_list(list *mbufs);
struct msg *redis_generate_select_cmd_msg(struct rmtContext *ctx, mbuf_base *mb, const char *dbid);
int redis_select_cmd_send(redis_node *srnode, const char *dbid, void *data);

#endif