Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/adlist.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,22 @@ void listLinkNodeTail(list *list, listNode *node) {
list->len++;
}

/* like listAddNodeTail, with a pre-existing listNode item */
list *listAddTail(list *list, listNode *node)
{
if (list->len == 0) {
list->head = list->tail = node;
node->prev = node->next = NULL;
} else {
node->prev = list->tail;
node->next = NULL;
list->tail->next = node;
list->tail = node;
}
list->len++;
return list;
}

list *listInsertNode(list *list, listNode *old_node, void *value, int after) {
listNode *node;

Expand Down
1 change: 1 addition & 0 deletions src/adlist.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void listReleaseGeneric(void *list);
void listEmpty(list *list);
list *listAddNodeHead(list *list, void *value);
list *listAddNodeTail(list *list, void *value);
list *listAddTail(list *list, listNode *node);
list *listInsertNode(list *list, listNode *old_node, void *value, int after);
void listDelNode(list *list, listNode *node);
listIter *listGetIterator(list *list, int direction);
Expand Down
20 changes: 20 additions & 0 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1640,12 +1640,32 @@ int loadSingleAppendOnlyFile(char *filename) {
if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand)
{
/* queueMultiCommand requires a pendingCommand, so we create a "fake" one here
* for it to consume */
pendingCommand *pcmd = zmalloc(sizeof(pendingCommand));
initPendingCommand(pcmd);
listNode *next_pend = zmalloc(sizeof(listNode));
listAddTail(fakeClient->pending_cmds, next_pend);
next_pend->value = pcmd;

pcmd->argc = argc;
pcmd->argv_len = argc;
pcmd->argv = argv;
pcmd->cmd = cmd;

fakeClient->ready_pending_cmds += 1;

/* Note: we don't have to attempt calling evalGetCommandFlags,
* since this is AOF, the checks in processCommand are not made
* anyway.*/
queueMultiCommand(fakeClient, cmd->flags);

/* Since freeClientPendingCommands doesn't get called in this flow to free the queued
* command, we do it manually. */
freeClientPendingCommands(fakeClient, 1);
} else {
cmd->proc(fakeClient);
fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */
}

/* The fake client should not have a reply */
Expand Down
3 changes: 1 addition & 2 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) {
* call reqresAppendResponse here (for clients blocked on key,
* unblockClientOnKey is called, which eventually calls processCommand,
* which calls reqresAppendResponse) */
reqresAppendResponse(c);
resetClient(c);
prepareForNextCommand(c);
}

/* Clear the flags, and put the client in the unblocked list so that
Expand Down
47 changes: 20 additions & 27 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1113,8 +1113,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
pendingCommand mc;
initPendingCommand(&mc);
pendingCommand *mcp = &mc;
int i, slot = CLUSTER_INVALID_SLOT, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
existing_keys = 0;
int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */

Expand All @@ -1141,7 +1143,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* structure if the client is not in MULTI/EXEC state, this way
* we have a single codepath below. */
ms = &_ms;
_ms.commands = &mc;
_ms.commands = &mcp;
_ms.count = 1;
mc.argv = argv;
mc.argc = argc;
Expand All @@ -1153,12 +1155,12 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
for (i = 0; i < ms->count; i++) {
struct redisCommand *mcmd;
robj **margv;
int margc, numkeys, j;
keyReference *keyindex;
int j;

pendingCommand *pcmd = ms->commands[i];

mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
mcmd = pcmd->cmd;
margv = pcmd->argv;

/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
if (!pubsubshard_included &&
Expand All @@ -1167,28 +1169,29 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
pubsubshard_included = 1;
}

getKeysResult result = GETKEYS_RESULT_INIT;
numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
keyindex = result.keys;
for (j = 0; j < pcmd->keys_result.numkeys; j++) {
/* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */
if (pcmd->slot == CLUSTER_INVALID_SLOT) {
/* Error: multiple keys from different slots. */
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
}

for (j = 0; j < numkeys; j++) {
robj *thiskey = margv[keyindex[j].pos];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
robj *thiskey = margv[pcmd->keys_result.keys[j].pos];

if (firstkey == NULL) {
/* This is the first key we see. Check what is the slot
* and node. */
firstkey = thiskey;
slot = thisslot;
slot = pcmd->slot;
n = getNodeBySlot(slot);

/* Error: If a slot is not served, we are in "cluster down"
* state. However the state is yet to be updated, so this was
* not trapped earlier in processCommand(). Report the same
* error to the client. */
if (n == NULL) {
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
Expand All @@ -1207,15 +1210,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
importing_slot = 1;
}
} else {
/* If it is not the first key/channel, make sure it is exactly
* the same key/channel as the first we saw. */
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
}
if (importing_slot && !multiple_keys && !equalStringObjects(firstkey,thiskey)) {
/* Flag this request as one with multiple different
* keys/channels when the slot is in importing state. */
Expand All @@ -1236,7 +1230,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
else existing_keys++;
}
}
getKeysFreeResult(&result);
}

/* No key at all in command? then we can serve the request
Expand Down
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define CLUSTER_INVALID_SLOT (-1) /* Invalid slot number. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
Expand Down
25 changes: 25 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,8 @@ void debugCommand(client *c) {
" Enable or disable the main dict and expire dict resizing.",
"SCRIPT <LIST|<sha>>",
" Output SHA and content of all scripts or of a specific script with its SHA.",
"LOOKAHEAD",
" Low-level look-ahead information for all clients.",
"MARK-INTERNAL-CLIENT [UNMARK]",
" Promote the current connection to an internal connection.",
NULL
Expand Down Expand Up @@ -1088,6 +1090,29 @@ NULL
return;
}
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"lookahead")) {
/* Pause all IO threads to access data of clients safely, and pausing the
* specific IO thread will not repeatedly execute in catClientInfoString. */
int allpaused = 0;
if (server.io_threads_num > 1 && !server.crashing &&
pthread_equal(server.main_thread_id, pthread_self()))
{
allpaused = 1;
pauseAllIOThreads();
}

sds info = sdsempty();
listNode *ln;
listIter li;
listRewind(server.clients,&li);
while ((ln = listNext(&li)) != NULL) {
client *client = listNodeValue(ln);
info = sdscatprintf(info, "id: %lu pcmds: %d\n", client->id, client->ready_pending_cmds);
}

if (allpaused) resumeAllIOThreads();
addReplyVerbatim(c,info,strlen(info),"txt");
sdsfree(info);
} else if(!strcasecmp(c->argv[1]->ptr,"mark-internal-client") && c->argc < 4) {
if (c->argc == 2) {
c->flags |= CLIENT_INTERNAL;
Expand Down
34 changes: 30 additions & 4 deletions src/iothread.c
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,36 @@ int processClientsFromIOThread(IOThread *t) {

/* Process the pending command and input buffer. */
if (!c->read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
c->flags |= CLIENT_PENDING_COMMAND;
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
/* If the client is no longer valid, it must be freed safely. */
continue;
serverAssert(listLength(c->pending_cmds) > 0);

while (listLength(c->pending_cmds)) {
pendingCommand *curcmd = listFirst(c->pending_cmds)->value;

/* We populate the old client fields so we don't have to modify all existing logic to work with pendingCommands */
c->argc = curcmd->argc;
c->argv = curcmd->argv;
c->argv_len = curcmd->argv_len;
c->reploff_next = curcmd->reploff;
c->slot = curcmd->slot;
serverAssert(c->argv);

/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, it must be freed safely. */
continue;
}
}
serverAssert(listLength(c->pending_cmds) == 0);

/* Now process client if it has more data in it's buffer.
*
* Note: when a master client steps into this function,
* it can always satisfy this condition, because its querybuf
* contains data not applied. */
serverAssert(c->ready_pending_cmds == 0);
if (((c->querybuf && sdslen(c->querybuf) > 0))) {
if (processInputBuffer(c) == C_ERR)
continue;
}
}

Expand Down
22 changes: 11 additions & 11 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -382,18 +382,18 @@ int addCommandToBatch(client *c) {

batch->clients[batch->client_count++] = c;

if (likely(c->iolookedcmd)) {
// if (likely(c->iolookedcmd)) {
/* Get command's keys positions */
getKeysResult result = GETKEYS_RESULT_INIT;
int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
batch->keys_dicts[batch->key_count] =
kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0);
batch->key_count++;
}
getKeysFreeResult(&result);
}
// getKeysResult result = GETKEYS_RESULT_INIT;
// int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
// for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
// batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
// batch->keys_dicts[batch->key_count] =
// kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0);
// batch->key_count++;
// }
// getKeysFreeResult(&result);
// }

return C_OK;
}
9 changes: 3 additions & 6 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -674,11 +674,12 @@ void moduleReleaseTempClient(client *c) {
listEmpty(c->reply);
c->reply_bytes = 0;
c->duration = 0;
resetClient(c);
resetClient(c, -1);
serverAssert(c->all_argv_len_sum == 0);
c->bufpos = 0;
c->flags = CLIENT_MODULE;
c->user = NULL; /* Root user */
c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL;
c->cmd = c->lastcmd = c->realcmd = NULL;
if (c->bstate.async_rm_call_handle) {
RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */
Expand Down Expand Up @@ -11034,10 +11035,6 @@ void moduleCallCommandFilters(client *c) {
f->callback(&filter);
}

/* If the filter sets a new command, including command or subcommand,
* the command looked up in IO threads will be invalid. */
c->iolookedcmd = NULL;

c->argv = filter.argv;
c->argv_len = filter.argv_len;
c->argc = filter.argc;
Expand Down
Loading
Loading