Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
4361abf
init
sundb Sep 22, 2025
2efaa56
try 1
sundb Sep 22, 2025
c521961
try 2
sundb Sep 22, 2025
95e2306
revert some code
sundb Sep 23, 2025
dbd4b2b
try 3
sundb Sep 23, 2025
9d1b0c5
Remove unused code
sundb Sep 23, 2025
2f39505
aof.c
sundb Sep 23, 2025
2f868f1
multi
sundb Sep 23, 2025
665f66a
Rename queue to list
sundb Sep 23, 2025
a0aedeb
try
sundb Sep 23, 2025
e75f46b
try
sundb Sep 23, 2025
9675858
add config
sundb Sep 23, 2025
79dda7d
parse inline buffer
sundb Sep 23, 2025
5011ed0
improve prefetch
sundb Sep 23, 2025
cd9a6bf
try
sundb Sep 23, 2025
f507381
Remove input_bytes
sundb Sep 23, 2025
949eea7
reploff_next
sundb Sep 23, 2025
20a129e
Refine
sundb Sep 23, 2025
41956c1
Remove unused code
sundb Sep 23, 2025
3a5e393
Fix crash
sundb Sep 23, 2025
fc238ee
try
sundb Sep 24, 2025
4e2faca
format
sundb Sep 24, 2025
e848c6f
format
sundb Sep 24, 2025
78a7815
fix
sundb Sep 24, 2025
4a58eb2
fix aof
sundb Sep 24, 2025
beda1bb
comment out tests
sundb Sep 24, 2025
3d36763
Remove cmdpoll
sundb Sep 24, 2025
3f259c6
improve
sundb Sep 24, 2025
d793176
fix aofrw
sundb Sep 24, 2025
c5c3454
fix aofrw
sundb Sep 24, 2025
e636831
uncomment tests
sundb Sep 24, 2025
8479541
uncomment tests
sundb Sep 24, 2025
9bec761
fix cluster
sundb Sep 24, 2025
b28914a
uncomment tests
sundb Sep 24, 2025
75d43c7
uncomment tests
sundb Sep 24, 2025
3c2bb75
uncomment tests
sundb Sep 24, 2025
3b9dfcd
uncomment tests
sundb Sep 24, 2025
e7fdcea
uncomment tests
sundb Sep 24, 2025
f8caef5
uncomment tests
sundb Sep 24, 2025
df36ec4
fix redefine
sundb Sep 24, 2025
c55bb6b
uncomment tests
sundb Sep 24, 2025
7ad0310
uncomment tests
sundb Sep 24, 2025
3bbfcae
fix cluster test
sundb Sep 25, 2025
b12e693
fix clusterSlotStatsAddNetworkBytesInForUserClient()
sundb Sep 25, 2025
7b6471c
fix slot-stats test
sundb Sep 25, 2025
a51d54d
Fix nested prefetch
sundb Sep 28, 2025
05796da
Merge branch 'unstable' into cmd-lookahead
sundb Sep 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ locale-collate ""
# Snapshotting can be completely disabled with a single empty string argument
# as in following example:
#
# save ""
save ""
#
# Unless specified otherwise, by default Redis will save the DB:
# * After 3600 seconds (an hour) if at least 1 change was performed
Expand Down Expand Up @@ -2139,6 +2139,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60
#
# client-query-buffer-limit 1gb

# Defines how many commands in each client pipeline to decode and prefetch
# lookahead 16

# In some scenarios client connections can hog up memory leading to OOM
# errors or data eviction. To avoid this we can cap the accumulated memory
# used by all client connections (all pubsub and normal clients). Once we
Expand Down
12 changes: 12 additions & 0 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1640,12 +1640,24 @@ int loadSingleAppendOnlyFile(char *filename) {
if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand)
{
/* queueMultiCommand requires a pendingCommand, so we create a "fake" one here
* for it to consume */
pendingCommand *pcmd = zmalloc(sizeof(pendingCommand));
initPendingCommand(pcmd);
addPengingCommand(&fakeClient->pending_cmds, pcmd);

pcmd->argc = argc;
pcmd->argv_len = argc;
pcmd->argv = argv;
pcmd->cmd = cmd;

/* Note: we don't have to attempt calling evalGetCommandFlags,
* since this is AOF, the checks in processCommand are not made
* anyway.*/
queueMultiCommand(fakeClient, cmd->flags);
} else {
cmd->proc(fakeClient);
fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */
}

/* The fake client should not have a reply */
Expand Down
3 changes: 1 addition & 2 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) {
* call reqresAppendResponse here (for clients blocked on key,
* unblockClientOnKey is called, which eventually calls processCommand,
* which calls reqresAppendResponse) */
reqresAppendResponse(c);
resetClient(c);
prepareForNextCommand(c);
}

/* Clear the flags, and put the client in the unblocked list so that
Expand Down
77 changes: 46 additions & 31 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1107,14 +1107,18 @@ void clusterCommand(client *c) {
*
* CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
* down but the user attempts to execute a command that addresses one or more keys. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code) {
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv,
uint64_t cmd_flags, int *error_code, int *precalculated_slot, getKeysResult *keys_result)
{
clusterNode *myself = getMyClusterNode();
clusterNode *n = NULL;
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
pendingCommand mc;
initPendingCommand(&mc);
pendingCommand *mcp = &mc;
int i, slot = CLUSTER_INVALID_SLOT, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
existing_keys = 0;
int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */

Expand All @@ -1141,54 +1145,75 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* structure if the client is not in MULTI/EXEC state, this way
* we have a single codepath below. */
ms = &_ms;
_ms.commands = &mc;
_ms.commands = &mcp;
_ms.count = 1;

/* Properly initialize the fake pendingCommand */
initPendingCommand(&mc);
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;
mc.keys_result = *keys_result;

/* Always extract keys for other logic, but use pre-calculated slot if provided */
if (keys_result->numkeys >= 0) {
if (*precalculated_slot != CLUSTER_INVALID_SLOT) {
mc.slot = *precalculated_slot;
}
}
}

/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
for (i = 0; i < ms->count; i++) {
struct redisCommand *mcmd;
robj **margv;
int margc, numkeys, j;
keyReference *keyindex;
int j;

pendingCommand *pcmd = ms->commands[i];

mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
mcmd = pcmd->cmd;
margv = pcmd->argv;

/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
if (!pubsubshard_included &&
doesCommandHaveChannelsWithFlags(mcmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE))
doesCommandHaveChannelsWithFlags(mcmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE) &&
mcmd->key_specs_num > 0)
{
pubsubshard_included = 1;
}

getKeysResult result = GETKEYS_RESULT_INIT;
numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
keyindex = result.keys;
/* If this command has keys/channels and we already have a slot,
* check if this command's slot matches */
if (pcmd->keys_result.numkeys > 0 && slot != CLUSTER_INVALID_SLOT && pcmd->slot != slot) {
/* Error: commands operate on keys from different slots */
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
}

for (j = 0; j < pcmd->keys_result.numkeys; j++) {
/* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */
if (pcmd->slot == CLUSTER_INVALID_SLOT) {
/* Error: multiple keys from different slots. */
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
}

for (j = 0; j < numkeys; j++) {
robj *thiskey = margv[keyindex[j].pos];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
robj *thiskey = margv[pcmd->keys_result.keys[j].pos];

if (firstkey == NULL) {
/* This is the first key we see. Check what is the slot
* and node. */
firstkey = thiskey;
slot = thisslot;
slot = pcmd->slot;
n = getNodeBySlot(slot);

/* Error: If a slot is not served, we are in "cluster down"
* state. However the state is yet to be updated, so this was
* not trapped earlier in processCommand(). Report the same
* error to the client. */
if (n == NULL) {
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
Expand All @@ -1207,15 +1232,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
importing_slot = 1;
}
} else {
/* If it is not the first key/channel, make sure it is exactly
* the same key/channel as the first we saw. */
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
}
if (importing_slot && !multiple_keys && !equalStringObjects(firstkey,thiskey)) {
/* Flag this request as one with multiple different
* keys/channels when the slot is in importing state. */
Expand All @@ -1236,7 +1252,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
else existing_keys++;
}
}
getKeysFreeResult(&result);
}

/* No key at all in command? then we can serve the request
Expand Down Expand Up @@ -1268,7 +1283,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
}

/* Return the hashslot by reference. */
if (hashslot) *hashslot = slot;
if (precalculated_slot) *precalculated_slot = slot;

/* MIGRATE always works in the context of the local node if the slot
* is open (migrating or importing state). We need to be able to freely
Expand Down
4 changes: 3 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define CLUSTER_INVALID_SLOT (-1) /* Invalid slot number. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
Expand Down Expand Up @@ -148,7 +149,8 @@ unsigned int countKeysInSlot(unsigned int slot);
int getSlotOrReply(client *c, robj *o);

/* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code);
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv,
uint64_t cmd_flags, int *error_code, int *precalculated_slot, getKeysResult *keys_result);
int clusterRedirectBlockedClientIfNeeded(client *c);
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code);
void migrateCloseTimedoutSockets(void);
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3217,6 +3217,7 @@ standardConfig static_configs[] = {
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-compatibility-sample-ratio", NULL, MODIFIABLE_CONFIG, 0, 100, server.cluster_compatibility_sample_ratio, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("lookahead", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.lookahead, REDIS_DEFAULT_LOOKAHEAD, INTEGER_CONFIG, NULL, NULL),

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
Expand Down
58 changes: 58 additions & 0 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -2971,6 +2971,64 @@ int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getK
return 0;
}

/* Extract slot number from keys in a keys_result structure and return to caller.
* Returns CLUSTER_INVALID_SLOT if keys belong to different slots (cross-slot error),
* or if there are no keys.
*/
int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) {
if (keys_result->numkeys == 0)
return CLUSTER_INVALID_SLOT;

if (!server.cluster_enabled)
return 0;

int first_slot = CLUSTER_INVALID_SLOT;
for (int j = 0; j < keys_result->numkeys; j++) {
robj *this_key = argv[keys_result->keys[j].pos];
int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr));

if (first_slot == CLUSTER_INVALID_SLOT)
first_slot = this_slot;
else if (first_slot != this_slot) {
return CLUSTER_INVALID_SLOT;
}
}
return first_slot;
}

/* Extract keys/channels from a command and calculate the cluster slot.
* Returns the number of keys/channels extracted.
* The slot number is returned by reference into *slot.
* If is_incomplete is not NULL, it will be set for key extraction.
*
* This function handles both regular commands (keys) and sharded pubsub
* commands (channels), but excludes regular pubsub commands which don't
* have slots.
*/
int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc,
getKeysResult *result, int *slot) {
int num_keys = -1;

if (!doesCommandHaveChannelsWithFlags(cmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) {
num_keys = getKeysFromCommandWithSpecs(cmd, argv, argc, GET_KEYSPEC_DEFAULT,
result);
} else {
/* Only extract channels for commands that have key_specs (sharded pubsub).
* Regular pubsub commands (PUBLISH, SUBSCRIBE) don't have slots. */
if (cmd->key_specs_num > 0) {
num_keys = getChannelsFromCommand(cmd, argv, argc, result);
} else {
num_keys = 0;
}
}

*slot = CLUSTER_INVALID_SLOT;
if (num_keys >= 0)
*slot = extractSlotFromKeysResult(argv, result);

return num_keys;
}

/* The base case is to use the keys position as given in the command table
* (firstkey, lastkey, step).
* This function works only on command with the legacy_range_key_spec,
Expand Down
1 change: 0 additions & 1 deletion src/iothread.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,6 @@ void initThreadedIO(void) {
exit(1);
}

prefetchCommandsBatchInit();

/* Spawn and initialize the I/O threads. */
for (int i = 1; i < server.io_threads_num; i++) {
Expand Down
21 changes: 11 additions & 10 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,9 @@ void prefetchCommands(void) {
*
* Returns C_OK if the command was added successfully, C_ERR otherwise. */
int addCommandToBatch(client *c) {
if (unlikely(!batch)) return C_ERR;
if (unlikely(!batch)) {
return C_ERR;
}

/* If the batch is full, process it.
* We also check the client count to handle cases where
Expand All @@ -382,18 +384,17 @@ int addCommandToBatch(client *c) {

batch->clients[batch->client_count++] = c;

if (likely(c->iolookedcmd)) {
/* Get command's keys positions */
getKeysResult result = GETKEYS_RESULT_INIT;
int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
pendingCommand *pcmd = c->pending_cmds.head;
while (pcmd != NULL) {
if (pcmd->parsing_incomplete) break;
for (int i = 0; i < pcmd->keys_result.numkeys && batch->key_count < batch->max_prefetch_size; i++) {
batch->keys[batch->key_count] = pcmd->argv[pcmd->keys_result.keys[i].pos];
batch->keys_dicts[batch->key_count] =
kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0);
kvstoreGetDict(c->db->keys, pcmd->slot > 0 ? pcmd->slot : 0);
batch->key_count++;
}
getKeysFreeResult(&result);
}
pcmd = pcmd->next;
}

return C_OK;
}
16 changes: 9 additions & 7 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -674,11 +674,12 @@ void moduleReleaseTempClient(client *c) {
listEmpty(c->reply);
c->reply_bytes = 0;
c->duration = 0;
resetClient(c);
resetClient(c, -1);
serverAssert(c->all_argv_len_sum == 0);
c->bufpos = 0;
c->flags = CLIENT_MODULE;
c->user = NULL; /* Root user */
c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL;
c->cmd = c->lastcmd = c->realcmd = NULL;
if (c->bstate.async_rm_call_handle) {
RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */
Expand Down Expand Up @@ -6656,7 +6657,12 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING);
const uint64_t cmd_flags = getCommandFlags(c);
if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,cmd_flags,&error_code) !=
int hashslot = CLUSTER_INVALID_SLOT;
/* Calculate slot beforehand for modules */
getKeysResult keys_result = GETKEYS_RESULT_INIT;
extractKeysAndSlot(c->cmd, c->argv, c->argc,
&keys_result, &hashslot);
if (getNodeByQuery(c,c->cmd,c->argv,cmd_flags,&error_code,&hashslot, &keys_result) !=
getMyClusterNode())
{
sds msg = NULL;
Expand Down Expand Up @@ -11034,10 +11040,6 @@ void moduleCallCommandFilters(client *c) {
f->callback(&filter);
}

/* If the filter sets a new command, including command or subcommand,
* the command looked up in IO threads will be invalid. */
c->iolookedcmd = NULL;

c->argv = filter.argv;
c->argv_len = filter.argv_len;
c->argc = filter.argc;
Expand Down
Loading
Loading