Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
3e2e1ba
lookahead
sundb Sep 30, 2025
2481632
Fix complain
sundb Sep 30, 2025
742cb79
Revert code in processInputBuffer to avoid too many conflicts
sundb Oct 1, 2025
e0ff33a
Rename CLIENT_IN_PREFETCH to CLIENT_IN_MEMORY_PREFETCH
sundb Oct 1, 2025
77d7c5b
no longer use flags instead of read_error for pendingCommand
sundb Oct 1, 2025
4de9dd9
Update comment
sundb Oct 1, 2025
2094895
Remove unused code
sundb Oct 1, 2025
1222287
Remove unused code
sundb Oct 1, 2025
682b40a
Rename methods
sundb Oct 1, 2025
bcfcf93
Refine
sundb Oct 1, 2025
13dd132
Fix the issue that read_error wasn't handled correctly with io thread
sundb Oct 2, 2025
7bffad2
Remove consumePendingCommand()
sundb Oct 2, 2025
60be3f4
Fix mistake comment out
sundb Oct 2, 2025
16d2682
Move the calucation of lookahead outside of if
sundb Oct 2, 2025
f080aa6
Avoid the client calling the shutdownCommand again after unblocking a…
sundb Oct 3, 2025
1888ca1
Reset client for blocked shutdown client
sundb Oct 4, 2025
61d639a
Fix race condition for DefaultUser->flags
sundb Oct 4, 2025
77c2b72
Make user->flags atomic
sundb Oct 5, 2025
59ed640
Revert strchr()
sundb Oct 5, 2025
ed08a67
Add update_slot_stats argument to update cluster slot stats for prepa…
sundb Oct 5, 2025
4dd99d4
call clusterSlotStatsAddNetworkBytesInForUserClient() before resetCli…
sundb Oct 5, 2025
8cfae3f
use the slot from preprocess() in getNodeByQuery()
sundb Oct 8, 2025
5c9460a
add CLIENT_READ_CROSS_SLOT for isClientReadErrorFatal()
sundb Oct 8, 2025
02ede13
add CLIENT_READ_CROSS_SLOT for isClientReadErrorFatal()
sundb Oct 8, 2025
d205048
simplify the using of isClientReadErrorFatal()
sundb Oct 8, 2025
3911dc1
cache the key result int pending command
sundb Oct 9, 2025
7f8aabf
Smoking test
sundb Oct 9, 2025
2b01213
Fix client not being properly reset after shutdown cancellation
sundb Oct 9, 2025
c1ebab3
Cleanup
sundb Oct 9, 2025
4e6a357
Fix unnecessary memory prefetch when iothread is disabled
sundb Oct 9, 2025
0250415
Use cached getKeysResult for other places
sundb Oct 9, 2025
47a20c4
Update outdated comment
sundb Oct 9, 2025
3687e98
Merge remote-tracking branch 'origin/fix-unblock-shutdown' into comma…
sundb Oct 9, 2025
1ad4005
Change the way to get getKeysResult in preprocess()
sundb Oct 10, 2025
c046a62
delay the execution of preprocessCommand if possible
sundb Oct 10, 2025
5d69e99
Merge remote-tracking branch 'origin/unstable' into command-lookahead…
sundb Oct 10, 2025
a636e93
Remove unnecessary changes
sundb Oct 10, 2025
423cc20
Set pcmd->flags to 0 after calling command filter
sundb Oct 10, 2025
2f99832
Update src/server.h
sundb Oct 10, 2025
07e3ab5
Update src/server.h
sundb Oct 10, 2025
3cc52a6
spell
sundb Oct 10, 2025
04e383b
Merge remote-tracking branch 'origin/unstable' into command-lookahead…
sundb Oct 12, 2025
c5defde
Skip CLIENT_UNBLOCKED client for processInputBuffer()
sundb Oct 12, 2025
894938c
Remove unused code
sundb Oct 12, 2025
56a71bc
Fix wrongly last_cmd in preprocessCommand()
sundb Oct 12, 2025
ce3eb45
Add pending command pool
sundb Oct 14, 2025
199510b
Free pending command pool if idle for 2 secs
sundb Oct 14, 2025
e3cf556
Change the client's pending command pool to global pending command pool
sundb Oct 15, 2025
4e99598
Update comment
sundb Oct 16, 2025
efe3a39
Refine comment
sundb Oct 16, 2025
c547e8c
Incr the max size of pendimg command pool, and shrink it regular
sundb Oct 17, 2025
1d8c292
Simplify the code
sundb Oct 20, 2025
1cb020a
indentation
sundb Oct 20, 2025
2255dda
Merge remote-tracking branch 'origin/unstable' into command-lookahead…
sundb Oct 20, 2025
45d3cd4
code style
sundb Oct 20, 2025
7ca5db7
Defensively prevent the cmd pool from shrinking too small
sundb Oct 21, 2025
a0674a1
Apply suggestion from @ShooterIT
sundb Oct 21, 2025
a5eee02
Apply suggestion from @ShooterIT
sundb Oct 21, 2025
b97101d
Apply suggestion from @ShooterIT
sundb Oct 21, 2025
8984656
Avoid partial prefetch and incr client if success
sundb Oct 21, 2025
3f1c108
Fix the missing of handing the CLIENT_READ_CROSS_SLOT read error
sundb Oct 21, 2025
0c8c77c
Update src/networking.c
sundb Oct 21, 2025
6d34543
defer releasing pending command
sundb Oct 22, 2025
e6f606c
Refine
sundb Oct 22, 2025
8f7041e
Add comment
sundb Oct 22, 2025
f7c8e38
Add test for CROSSSLOT Keys
sundb Oct 22, 2025
a538c22
Add test for the expansion and shrinking of pending command pool
sundb Oct 22, 2025
b92cce8
Fix spell and test failure
sundb Oct 22, 2025
46850c2
Merge remote-tracking branch 'origin/unstable' into command-lookahead…
sundb Oct 22, 2025
0779f1f
eliminate CLIENT_IN_MEMORY_PREFETCH
sundb Oct 22, 2025
1804bbc
General deferred free object
sundb Oct 22, 2025
7b976d8
Fix wrongly c->deferred_objects_num++
sundb Oct 22, 2025
f156460
unnecessary change
sundb Oct 22, 2025
6a5d7eb
code style
sundb Oct 22, 2025
0d34e6a
Rename freeClientDeferredObject to freeDeferredObject to avoid confusion
sundb Oct 22, 2025
1a874f1
code review
sundb Oct 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2174,6 +2174,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60
#
# client-query-buffer-limit 1gb

# Defines how many commands in each client pipeline to decode and prefetch
# lookahead 16

# In some scenarios client connections can hog up memory leading to OOM
# errors or data eviction. To avoid this we can cap the accumulated memory
# used by all client connections (all pubsub and normal clients). Once we
Expand Down
35 changes: 17 additions & 18 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,7 @@ user *ACLCreateUser(const char *name, size_t namelen) {
if (raxFind(Users,(unsigned char*)name,namelen,NULL)) return NULL;
user *u = zmalloc(sizeof(*u));
u->name = sdsnewlen(name,namelen);
u->flags = USER_FLAG_DISABLED;
u->flags |= USER_FLAG_SANITIZE_PAYLOAD;
atomicSet(u->flags, USER_FLAG_DISABLED | USER_FLAG_SANITIZE_PAYLOAD);
u->passwords = listCreate();
u->acl_string = NULL;
listSetMatchMethod(u->passwords,ACLListMatchSds);
Expand Down Expand Up @@ -1289,22 +1288,18 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
if (oplen == -1) oplen = strlen(op);
if (oplen == 0) return C_OK; /* Empty string is a no-operation. */
if (!strcasecmp(op,"on")) {
u->flags |= USER_FLAG_ENABLED;
u->flags &= ~USER_FLAG_DISABLED;
atomicSet(u->flags, (u->flags | USER_FLAG_ENABLED) & ~USER_FLAG_DISABLED);
} else if (!strcasecmp(op,"off")) {
u->flags |= USER_FLAG_DISABLED;
u->flags &= ~USER_FLAG_ENABLED;
atomicSet(u->flags, (u->flags | USER_FLAG_DISABLED) & ~USER_FLAG_ENABLED);
} else if (!strcasecmp(op,"skip-sanitize-payload")) {
u->flags |= USER_FLAG_SANITIZE_PAYLOAD_SKIP;
u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD;
atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD_SKIP) & ~USER_FLAG_SANITIZE_PAYLOAD);
} else if (!strcasecmp(op,"sanitize-payload")) {
u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD_SKIP;
u->flags |= USER_FLAG_SANITIZE_PAYLOAD;
atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD) & ~USER_FLAG_SANITIZE_PAYLOAD_SKIP);
} else if (!strcasecmp(op,"nopass")) {
u->flags |= USER_FLAG_NOPASS;
atomicSet(u->flags, u->flags | USER_FLAG_NOPASS);
listEmpty(u->passwords);
} else if (!strcasecmp(op,"resetpass")) {
u->flags &= ~USER_FLAG_NOPASS;
atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS);
listEmpty(u->passwords);
} else if (op[0] == '>' || op[0] == '#') {
sds newpass;
Expand All @@ -1324,7 +1319,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
listAddNodeTail(u->passwords,newpass);
else
sdsfree(newpass);
u->flags &= ~USER_FLAG_NOPASS;
atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS);
} else if (op[0] == '<' || op[0] == '!') {
sds delpass;
if (op[0] == '<') {
Expand Down Expand Up @@ -1852,7 +1847,7 @@ int ACLUserCheckChannelPerm(user *u, sds channel, int is_pattern) {
* If the command fails an ACL check, idxptr will be to set to the first argv entry that
* causes the failure, either 0 if the command itself fails or the idx of the key/channel
* that causes the failure */
int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, int *idxptr) {
int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, getKeysResult *key_result, int *idxptr) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have two concerns that i'd like you to look into:

  1. maybe even if the command pre-processing it disabled, we can somehow let ACL, and Cluster share the same getkeys result, i.e. by using lazy creation of the result. i.e. the first one that needs it, gets it, and stores it so that the second one can use it. that is assuming the command pre-processing isn't always enabled.
  2. i don't recall how far we took this effort in ROF, but i understand we did take a different path (e.g. not passing it as an explicit argument to all these functions, and using a different way / flag to detect if it was computed or not). i don't mind merging your version when we get to it, replacing what we have in ROF (it'll be some extra work, but it'll be a one time effort). so i just wanna be sure what the approach you're taking here is also suitable there, and had only benefits, and no drawbacks.

listIter li;
listNode *ln;

Expand All @@ -1869,14 +1864,18 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
* calls to prevent duplicate lookups. */
aclKeyResultCache cache;
initACLKeyResultCache(&cache);
if (key_result) {
cache.keys = *key_result;
cache.keys_init = 1;
}

/* Check each selector sequentially */
listRewind(u->selectors,&li);
while((ln = listNext(&li))) {
aclSelector *s = (aclSelector *) listNodeValue(ln);
int acl_retval = ACLSelectorCheckCmd(s, cmd, argv, argc, &local_idxptr, &cache);
if (acl_retval == ACL_OK) {
cleanupACLKeyResultCache(&cache);
if (!key_result) cleanupACLKeyResultCache(&cache);
return ACL_OK;
}
if (acl_retval > relevant_error ||
Expand All @@ -1888,13 +1887,13 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
}

*idxptr = last_idx;
cleanupACLKeyResultCache(&cache);
if (!key_result) cleanupACLKeyResultCache(&cache);
return relevant_error;
}

/* High level API for checking if a client can execute the queued up command */
int ACLCheckAllPerm(client *c, int *idxptr) {
return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, idxptr);
return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, getClientCachedKeyResult(c), idxptr);
}

/* If 'new' can access all channels 'original' could then return NULL;
Expand Down Expand Up @@ -3144,7 +3143,7 @@ void aclCommand(client *c) {
}

int idx;
int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, &idx);
int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, NULL, &idx);
if (result != ACL_OK) {
sds err = getAclErrorMessage(result, u, cmd, c->argv[idx+3]->ptr, 1);
addReplyBulkSds(c, err);
Expand Down
12 changes: 12 additions & 0 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1641,12 +1641,24 @@ int loadSingleAppendOnlyFile(char *filename) {
if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand)
{
/* queueMultiCommand requires a pendingCommand, so we create a "fake" one here
* for it to consume */
pendingCommand *pcmd = zmalloc(sizeof(pendingCommand));
initPendingCommand(pcmd);
addPendingCommand(&fakeClient->pending_cmds, pcmd);

pcmd->argc = argc;
pcmd->argv_len = argc;
pcmd->argv = argv;
pcmd->cmd = cmd;

/* Note: we don't have to attempt calling evalGetCommandFlags,
* since this is AOF, the checks in processCommand are not made
* anyway.*/
queueMultiCommand(fakeClient, cmd->flags);
} else {
cmd->proc(fakeClient);
fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */
}

/* The fake client should not have a reply */
Expand Down
3 changes: 1 addition & 2 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ void processUnblockedClients(void) {
* call reqresAppendResponse here (for clients blocked on key,
* unblockClientOnKey is called, which eventually calls processCommand,
* which calls reqresAppendResponse) */
reqresAppendResponse(c);
resetClient(c);
prepareForNextCommand(c, 0);
}

if (c->flags & CLIENT_MODULE) {
Expand Down
82 changes: 68 additions & 14 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,31 @@ void clusterCommand(client *c) {
}
}

/* Extract slot number from keys in a keys_result structure and return to caller.
* Returns INVALID_CLUSTER_SLOT if keys belong to different slots (cross-slot error),
* or if there are no keys.
*/
int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) {
if (keys_result->numkeys == 0)
return INVALID_CLUSTER_SLOT;

if (!server.cluster_enabled)
return 0;

int first_slot = INVALID_CLUSTER_SLOT;
for (int j = 0; j < keys_result->numkeys; j++) {
robj *this_key = argv[keys_result->keys[j].pos];
int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr));

if (first_slot == INVALID_CLUSTER_SLOT)
first_slot = this_slot;
else if (first_slot != this_slot) {
return INVALID_CLUSTER_SLOT;
}
}
return first_slot;
}

/* Return the pointer to the cluster node that is able to serve the command.
* For the function to succeed the command should only target either:
*
Expand Down Expand Up @@ -1118,13 +1143,16 @@ void clusterCommand(client *c) {
*
* CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
* down but the user attempts to execute a command that addresses one or more keys. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code) {
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot,
getKeysResult *keys_result, uint8_t read_error, uint64_t cmd_flags, int *error_code)
{
clusterNode *myself = getMyClusterNode();
clusterNode *n = NULL;
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
pendingCommand mc;
pendingCommand *mcp = &mc;
Comment on lines +1154 to +1155

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't see that we're using the pre-calculated slot number.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done with 8cfae3f

int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
existing_keys = 0;
int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */
Expand Down Expand Up @@ -1152,24 +1180,35 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* structure if the client is not in MULTI/EXEC state, this way
* we have a single codepath below. */
ms = &_ms;
_ms.commands = &mc;
_ms.commands = &mcp;
_ms.count = 1;

/* Properly initialize the fake pendingCommand */
initPendingCommand(&mc);
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;
mc.slot = hashslot ? *hashslot : INVALID_CLUSTER_SLOT;
mc.read_error = read_error;
if (keys_result) {
mc.keys_result = *keys_result;
mc.flags |= PENDING_CMD_KEYS_RESULT_VALID;
}
}

/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
for (i = 0; i < ms->count; i++) {
struct redisCommand *mcmd;
robj **margv;
int margc, numkeys, j;
int margc, j;
keyReference *keyindex;

mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
pendingCommand *pcmd = ms->commands[i];

mcmd = pcmd->cmd;
margc = pcmd->argc;
margv = pcmd->argv;

/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
if (!pubsubshard_included &&
Expand All @@ -1178,14 +1217,29 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
pubsubshard_included = 1;
}

/* If we have a cached keys result from preprocessCommand(), use it.
* Otherwise, extract keys result. */
int use_cache_keys_result = pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID;
getKeysResult result = GETKEYS_RESULT_INIT;
numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
if (use_cache_keys_result)
result = pcmd->keys_result;
else
getKeysFromCommand(mcmd,margv,margc,&result);
keyindex = result.keys;

for (j = 0; j < numkeys; j++) {
for (j = 0; j < result.numkeys; j++) {
/* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */
if (pcmd->read_error == CLIENT_READ_CROSS_SLOT) {
/* Error: multiple keys from different slots. */
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
}

robj *thiskey = margv[keyindex[j].pos];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
int thisslot = pcmd->slot;
if (thisslot == INVALID_CLUSTER_SLOT)
thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr));

if (firstkey == NULL) {
/* This is the first key we see. Check what is the slot
Expand All @@ -1199,7 +1253,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* not trapped earlier in processCommand(). Report the same
* error to the client. */
if (n == NULL) {
getKeysFreeResult(&result);
if (!use_cache_keys_result) getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
Expand All @@ -1222,7 +1276,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* the same key/channel as the first we saw. */
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
getKeysFreeResult(&result);
if (!use_cache_keys_result) getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
Expand All @@ -1247,7 +1301,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
else existing_keys++;
}
}
getKeysFreeResult(&result);
if (!use_cache_keys_result) getKeysFreeResult(&result);
}

/* No key at all in command? then we can serve the request
Expand Down
5 changes: 4 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define INVALID_CLUSTER_SLOT (-1) /* Invalid slot number. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
Expand Down Expand Up @@ -158,7 +159,9 @@ int clusterCanAccessKeysInSlot(int slot);
struct slotRangeArray *clusterGetLocalSlotRanges(void);

/* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code);
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot,
getKeysResult *result, uint8_t read_error, uint64_t cmd_flags, int *error_code);
int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result);
int clusterRedirectBlockedClientIfNeeded(client *c);
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code);
void migrateCloseTimedoutSockets(void);
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3218,6 +3218,7 @@ standardConfig static_configs[] = {
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-compatibility-sample-ratio", NULL, MODIFIABLE_CONFIG, 0, 100, server.cluster_compatibility_sample_ratio, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-slot-migration-max-archived-tasks", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 1, INT_MAX, server.asm_max_archived_tasks, 32, INTEGER_CONFIG, NULL, NULL),
createIntConfig("lookahead", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.lookahead, REDIS_DEFAULT_LOOKAHEAD, INTEGER_CONFIG, NULL, NULL),

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
Expand Down
34 changes: 33 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link
* Besides, we never free a string object in BIO threads, so, even with
* lazyfree-lazy-server-del enabled, a fallback to main thread freeing
* due to defer free failure doesn't go against the config intention. */
tryDeferFreeClientObject(server.current_client, old);
tryDeferFreeClientObject(server.current_client, DEFERRED_OBJECT_TYPE_ROBJ, old);
} else if (server.lazyfree_lazy_server_del) {
freeObjAsync(key, old, db->id);
} else {
Expand Down Expand Up @@ -3134,6 +3134,38 @@ int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getK
return 0;
}

/* Extract keys/channels from a command and calculate the cluster slot.
* Returns the number of keys/channels extracted.
* The slot number is returned by reference into *slot.
* If is_incomplete is not NULL, it will be set for key extraction.
*
* This function handles both regular commands (keys) and sharded pubsub
* commands (channels), but excludes regular pubsub commands which don't
* have slots.
*/
int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc,
getKeysResult *result, int *slot) {
int num_keys = -1;

if (!doesCommandHaveChannelsWithFlags(cmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) {
num_keys = getKeysFromCommandWithSpecs(cmd, argv, argc, GET_KEYSPEC_DEFAULT, result);
} else {
/* Only extract channels for commands that have key_specs (sharded pubsub).
* Regular pubsub commands (PUBLISH, SUBSCRIBE) don't have slots. */
if (cmd->key_specs_num > 0) {
num_keys = getChannelsFromCommand(cmd, argv, argc, result);
} else {
num_keys = 0;
}
}

*slot = INVALID_CLUSTER_SLOT;
if (num_keys >= 0)
*slot = extractSlotFromKeysResult(argv, result);

return num_keys;
}

/* The base case is to use the keys position as given in the command table
* (firstkey, lastkey, step).
* This function works only on command with the legacy_range_key_spec,
Expand Down
Loading
Loading