Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
4361abf
init
sundb Sep 22, 2025
2efaa56
try 1
sundb Sep 22, 2025
c521961
try 2
sundb Sep 22, 2025
95e2306
revert some code
sundb Sep 23, 2025
dbd4b2b
try 3
sundb Sep 23, 2025
9d1b0c5
Remove unused code
sundb Sep 23, 2025
2f39505
aof.c
sundb Sep 23, 2025
2f868f1
multi
sundb Sep 23, 2025
665f66a
Rename queue to list
sundb Sep 23, 2025
a0aedeb
try
sundb Sep 23, 2025
e75f46b
try
sundb Sep 23, 2025
9675858
add config
sundb Sep 23, 2025
79dda7d
parse inline buffer
sundb Sep 23, 2025
5011ed0
improve prefetch
sundb Sep 23, 2025
cd9a6bf
try
sundb Sep 23, 2025
f507381
Remove input_bytes
sundb Sep 23, 2025
949eea7
reploff_next
sundb Sep 23, 2025
20a129e
Refine
sundb Sep 23, 2025
41956c1
Remove unused code
sundb Sep 23, 2025
3a5e393
Fix crash
sundb Sep 23, 2025
fc238ee
try
sundb Sep 24, 2025
4e2faca
format
sundb Sep 24, 2025
e848c6f
format
sundb Sep 24, 2025
78a7815
fix
sundb Sep 24, 2025
4a58eb2
fix aof
sundb Sep 24, 2025
beda1bb
comment out tests
sundb Sep 24, 2025
3d36763
Remove cmdpoll
sundb Sep 24, 2025
3f259c6
improve
sundb Sep 24, 2025
d793176
fix aofrw
sundb Sep 24, 2025
c5c3454
fix aofrw
sundb Sep 24, 2025
e636831
uncomment tests
sundb Sep 24, 2025
8479541
uncomment tests
sundb Sep 24, 2025
9bec761
fix cluster
sundb Sep 24, 2025
b28914a
uncomment tests
sundb Sep 24, 2025
75d43c7
uncomment tests
sundb Sep 24, 2025
3c2bb75
uncomment tests
sundb Sep 24, 2025
3b9dfcd
uncomment tests
sundb Sep 24, 2025
e7fdcea
uncomment tests
sundb Sep 24, 2025
f8caef5
uncomment tests
sundb Sep 24, 2025
df36ec4
fix redefine
sundb Sep 24, 2025
c55bb6b
uncomment tests
sundb Sep 24, 2025
7ad0310
uncomment tests
sundb Sep 24, 2025
3bbfcae
fix cluster test
sundb Sep 25, 2025
b12e693
fix clusterSlotStatsAddNetworkBytesInForUserClient()
sundb Sep 25, 2025
7b6471c
fix slot-stats test
sundb Sep 25, 2025
a51d54d
Fix nested prefetch
sundb Sep 28, 2025
05796da
Merge branch 'unstable' into cmd-lookahead
sundb Sep 28, 2025
d0e73e3
try
sundb Sep 29, 2025
8d390ce
try
sundb Sep 29, 2025
5342af6
Fix crash
sundb Sep 29, 2025
1490b47
Fix memory leak
sundb Sep 29, 2025
73335b0
Fix
sundb Sep 29, 2025
eea8b94
Revert some code
sundb Sep 29, 2025
a4b968a
fix command fileter
sundb Sep 29, 2025
493925b
Naming
sundb Sep 29, 2025
5bc67ce
Refine
sundb Sep 29, 2025
8d082f4
Fix prefetch
sundb Sep 29, 2025
21e9939
rename parsed_cmd to lookedcmd
sundb Sep 29, 2025
ef5c053
Add ready len
sundb Sep 30, 2025
f19e174
Fix nested prefetch
sundb Sep 30, 2025
d84ffbd
refine
sundb Sep 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ locale-collate ""
# Snapshotting can be completely disabled with a single empty string argument
# as in following example:
#
# save ""
save ""
#
# Unless specified otherwise, by default Redis will save the DB:
# * After 3600 seconds (an hour) if at least 1 change was performed
Expand Down Expand Up @@ -2139,6 +2139,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60
#
# client-query-buffer-limit 1gb

# Defines how many commands in each client pipeline to decode and prefetch
# lookahead 16

# In some scenarios client connections can hog up memory leading to OOM
# errors or data eviction. To avoid this we can cap the accumulated memory
# used by all client connections (all pubsub and normal clients). Once we
Expand Down
12 changes: 12 additions & 0 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1640,12 +1640,24 @@ int loadSingleAppendOnlyFile(char *filename) {
if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand)
{
/* queueMultiCommand requires a pendingCommand, so we create a "fake" one here
* for it to consume */
pendingCommand *pcmd = zmalloc(sizeof(pendingCommand));
initPendingCommand(pcmd);
addPengingCommand(&fakeClient->pending_cmds, pcmd);

pcmd->argc = argc;
pcmd->argv_len = argc;
pcmd->argv = argv;
pcmd->cmd = cmd;

/* Note: we don't have to attempt calling evalGetCommandFlags,
* since this is AOF, the checks in processCommand are not made
* anyway.*/
queueMultiCommand(fakeClient, cmd->flags);
} else {
cmd->proc(fakeClient);
fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */
}

/* The fake client should not have a reply */
Expand Down
3 changes: 1 addition & 2 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) {
* call reqresAppendResponse here (for clients blocked on key,
* unblockClientOnKey is called, which eventually calls processCommand,
* which calls reqresAppendResponse) */
reqresAppendResponse(c);
resetClient(c);
prepareForNextCommand(c);
}

/* Clear the flags, and put the client in the unblocked list so that
Expand Down
16 changes: 11 additions & 5 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,8 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
pendingCommand mc;
pendingCommand *mcp = &mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
existing_keys = 0;
int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */
Expand Down Expand Up @@ -1141,8 +1142,11 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* structure if the client is not in MULTI/EXEC state, this way
* we have a single codepath below. */
ms = &_ms;
_ms.commands = &mc;
_ms.commands = &mcp;
_ms.count = 1;

/* Properly initialize the fake pendingCommand */
initPendingCommand(&mc);
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;
Expand All @@ -1156,9 +1160,11 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
int margc, numkeys, j;
keyReference *keyindex;

mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
pendingCommand *pcmd = ms->commands[i];

mcmd = pcmd->cmd;
margc = pcmd->argc;
margv = pcmd->argv;

/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
if (!pubsubshard_included &&
Expand Down
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define INVALID_CLUSTER_SLOT (-1) /* Invalid slot number. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3217,6 +3217,7 @@ standardConfig static_configs[] = {
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-compatibility-sample-ratio", NULL, MODIFIABLE_CONFIG, 0, 100, server.cluster_compatibility_sample_ratio, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("lookahead", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.lookahead, REDIS_DEFAULT_LOOKAHEAD, INTEGER_CONFIG, NULL, NULL),

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
Expand Down
3 changes: 2 additions & 1 deletion src/iothread.c
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,12 @@ int processClientsFromIOThread(IOThread *t) {
/* Process the pending command and input buffer. */
if (!c->read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
c->flags |= CLIENT_PENDING_COMMAND;
c->flags |= CLIENT_IN_PREFETCH;
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
/* If the client is no longer valid, it must be freed safely. */
continue;
}
c->flags &= ~CLIENT_IN_PREFETCH;
}

/* We may have pending replies if io thread may not finish writing
Expand Down Expand Up @@ -729,7 +731,6 @@ void initThreadedIO(void) {
exit(1);
}

prefetchCommandsBatchInit();

/* Spawn and initialize the I/O threads. */
for (int i = 1; i < server.io_threads_num; i++) {
Expand Down
17 changes: 10 additions & 7 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -382,18 +382,21 @@ int addCommandToBatch(client *c) {

batch->clients[batch->client_count++] = c;

if (likely(c->iolookedcmd)) {
/* Get command's keys positions */
pendingCommand *pcmd = c->pending_cmds.head;
while (pcmd != NULL) {
if (pcmd->parsing_incomplete || !pcmd->cmd || pcmd->flags) break;

getKeysResult result = GETKEYS_RESULT_INIT;
int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
int numkeys = getKeysFromCommand(pcmd->cmd, pcmd->argv, pcmd->argc, &result);
for (int i = 0; i < numkeys && batch->key_count < batch->max_prefetch_size; i++) {
batch->keys[batch->key_count] = pcmd->argv[result.keys[i].pos];
batch->keys_dicts[batch->key_count] =
kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0);
kvstoreGetDict(c->db->keys, pcmd->slot > 0 ? pcmd->slot : 0);
batch->key_count++;
}
getKeysFreeResult(&result);
}
pcmd = pcmd->next;
}

return C_OK;
}
18 changes: 14 additions & 4 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -674,11 +674,12 @@ void moduleReleaseTempClient(client *c) {
listEmpty(c->reply);
c->reply_bytes = 0;
c->duration = 0;
resetClient(c);
resetClient(c, -1);
serverAssert(c->all_argv_len_sum == 0);
c->bufpos = 0;
c->flags = CLIENT_MODULE;
c->user = NULL; /* Root user */
c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL;
c->cmd = c->lastcmd = c->realcmd = NULL;
if (c->bstate.async_rm_call_handle) {
RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */
Expand Down Expand Up @@ -11035,12 +11036,21 @@ void moduleCallCommandFilters(client *c) {
}

/* If the filter sets a new command, including command or subcommand,
* the command looked up in IO threads will be invalid. */
c->iolookedcmd = NULL;
* the command looked up will be invalid. */
c->lookedcmd = NULL;

c->argv = filter.argv;
c->argv_len = filter.argv_len;
c->argc = filter.argc;

/* Update pending command if it exists. */
pendingCommand *pcmd = c->current_pending_cmd;
if (pcmd) {
pcmd->argv = filter.argv;
pcmd->argc = filter.argc;
pcmd->argv_len = filter.argv_len;
pcmd->cmd = NULL;
}
}

/* Return the number of arguments a filtered command has. The number of
Expand Down
70 changes: 39 additions & 31 deletions src/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,19 @@ void initClientMultiState(client *c) {
c->mstate.cmd_inv_flags = 0;
c->mstate.argv_len_sums = 0;
c->mstate.alloc_count = 0;
c->mstate.executing_cmd = -1;
}

/* Release all the resources associated with MULTI/EXEC state */
void freeClientMultiState(client *c) {
int j;

for (j = 0; j < c->mstate.count; j++) {
int i;
multiCmd *mc = c->mstate.commands+j;

for (i = 0; i < mc->argc; i++)
decrRefCount(mc->argv[i]);
zfree(mc->argv);
for (int i = 0; i < c->mstate.count; i++) {
freePendingCommand(c, c->mstate.commands[i]);
}
zfree(c->mstate.commands);
}

/* Add a new command into the MULTI commands queue */
void queueMultiCommand(client *c, uint64_t cmd_flags) {
multiCmd *mc;

/* No sense to waste memory if the transaction is already aborted.
* this is useful in case client sends these in a pipeline, or doesn't
* bother to read previous responses and didn't notice the multi was already
Expand All @@ -49,29 +41,35 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
if (c->mstate.count == 0) {
/* If a client is using multi/exec, assuming it is used to execute at least
* two commands. Hence, creating by default size of 2. */
c->mstate.commands = zmalloc(sizeof(multiCmd)*2);
c->mstate.commands = zmalloc(sizeof(pendingCommand*)*2);
c->mstate.alloc_count = 2;
}
if (c->mstate.count == c->mstate.alloc_count) {
c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count));
c->mstate.commands = zrealloc(c->mstate.commands, sizeof(pendingCommand*)*(c->mstate.alloc_count));
}
mc = c->mstate.commands+c->mstate.count;
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = c->argv;
mc->argv_len = c->argv_len;

/* Move the pending command into the multi-state.
* We leave the empty list node in 'pending_cmds' for freeClientPendingCommands to clean up
* later, but set the value to NULL to indicate it has been moved out and should not be freed. */
pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds);
c->current_pending_cmd = NULL;
pendingCommand **mc = c->mstate.commands + c->mstate.count;
*mc = pcmd;

c->mstate.count++;
c->mstate.cmd_flags |= cmd_flags;
c->mstate.cmd_inv_flags |= ~cmd_flags;
c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc;
c->mstate.argv_len_sums += (*mc)->argv_len_sum;
c->all_argv_len_sum -= (*mc)->argv_len_sum;

(*mc)->argv_len_sum = 0; /* This is no longer tracked through all_argv_len_sum, so we don't want */
/* to subtract it from there later. */

/* Reset the client's args since we copied them into the mstate and shouldn't
* reference them from c anymore. */
/* Reset the client's args since we moved them into the mstate and shouldn't
* reference them from 'c' anymore. */
c->argv = NULL;
c->argc = 0;
c->argv_len_sum = 0;
c->argv_len = 0;
}

Expand Down Expand Up @@ -129,6 +127,7 @@ void execCommand(client *c) {
int j;
robj **orig_argv;
int orig_argc, orig_argv_len;
size_t orig_all_argv_len_sum;
struct redisCommand *orig_cmd;

if (!(c->flags & CLIENT_MULTI)) {
Expand Down Expand Up @@ -172,12 +171,19 @@ void execCommand(client *c) {
orig_argv_len = c->argv_len;
orig_argc = c->argc;
orig_cmd = c->cmd;

/* Multi-state commands aren't tracked through all_argv_len_sum, so we don't want anything done while executing them to affect that field.
* Otherwise, we get inconsistencies and all_argv_len_sum doesn't go back to exactly 0 when the client is finished */
orig_all_argv_len_sum = c->all_argv_len_sum;

c->all_argv_len_sum = c->mstate.argv_len_sums;

addReplyArrayLen(c,c->mstate.count);
for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->argv_len = c->mstate.commands[j].argv_len;
c->cmd = c->realcmd = c->mstate.commands[j].cmd;
c->argc = c->mstate.commands[j]->argc;
c->argv = c->mstate.commands[j]->argv;
c->argv_len = c->mstate.commands[j]->argv_len;
c->cmd = c->realcmd = c->mstate.commands[j]->cmd;

/* ACL permissions are also checked at the time of execution in case
* they were changed after the commands were queued. */
Expand Down Expand Up @@ -207,6 +213,7 @@ void execCommand(client *c) {
"This command is no longer allowed for the "
"following reason: %s", reason);
} else {
c->mstate.executing_cmd = j;
if (c->id == CLIENT_ID_AOF)
call(c,CMD_CALL_NONE);
else
Expand All @@ -216,10 +223,10 @@ void execCommand(client *c) {
}

/* Commands may alter argc/argv, restore mstate. */
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].argv_len = c->argv_len;
c->mstate.commands[j].cmd = c->cmd;
c->mstate.commands[j]->argc = c->argc;
c->mstate.commands[j]->argv = c->argv;
c->mstate.commands[j]->argv_len = c->argv_len;
c->mstate.commands[j]->cmd = c->cmd;
}

// restore old DENY_BLOCKING value
Expand All @@ -230,6 +237,7 @@ void execCommand(client *c) {
c->argv_len = orig_argv_len;
c->argc = orig_argc;
c->cmd = c->realcmd = orig_cmd;
c->all_argv_len_sum = orig_all_argv_len_sum;
discardTransaction(c);

server.in_exec = 0;
Expand Down Expand Up @@ -485,6 +493,6 @@ size_t multiStateMemOverhead(client *c) {
/* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */
mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey));
/* Reserved memory for queued multi commands. */
mem += c->mstate.alloc_count * sizeof(multiCmd);
mem += c->mstate.alloc_count * sizeof(pendingCommand);
return mem;
}
Loading
Loading