diff --git a/redis.conf b/redis.conf index 5d2b27ffbae..20e9a428406 100644 --- a/redis.conf +++ b/redis.conf @@ -433,7 +433,7 @@ locale-collate "" # Snapshotting can be completely disabled with a single empty string argument # as in following example: # -# save "" +save "" # # Unless specified otherwise, by default Redis will save the DB: # * After 3600 seconds (an hour) if at least 1 change was performed @@ -2139,6 +2139,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60 # # client-query-buffer-limit 1gb +# Defines how many commands in each client pipeline to decode and prefetch +# lookahead 16 + # In some scenarios client connections can hog up memory leading to OOM # errors or data eviction. To avoid this we can cap the accumulated memory # used by all client connections (all pubsub and normal clients). Once we diff --git a/src/aof.c b/src/aof.c index 8a9be94b61a..e0571290a5a 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1640,12 +1640,24 @@ int loadSingleAppendOnlyFile(char *filename) { if (fakeClient->flags & CLIENT_MULTI && fakeClient->cmd->proc != execCommand) { + /* queueMultiCommand requires a pendingCommand, so we create a "fake" one here + * for it to consume */ + pendingCommand *pcmd = zmalloc(sizeof(pendingCommand)); + initPendingCommand(pcmd); + addPengingCommand(&fakeClient->pending_cmds, pcmd); + + pcmd->argc = argc; + pcmd->argv_len = argc; + pcmd->argv = argv; + pcmd->cmd = cmd; + /* Note: we don't have to attempt calling evalGetCommandFlags, * since this is AOF, the checks in processCommand are not made * anyway.*/ queueMultiCommand(fakeClient, cmd->flags); } else { cmd->proc(fakeClient); + fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */ } /* The fake client should not have a reply */ diff --git a/src/blocked.c b/src/blocked.c index 8d15f9de3de..238a0aacc21 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -199,8 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) { * call reqresAppendResponse here (for clients blocked on key, * unblockClientOnKey is called, which eventually calls processCommand, * which calls reqresAppendResponse) */ - reqresAppendResponse(c); - resetClient(c); + prepareForNextCommand(c); } /* Clear the flags, and put the client in the unblocked list so that diff --git a/src/cluster.c b/src/cluster.c index 2f861b5c2fa..225d149b92b 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1113,7 +1113,8 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in robj *firstkey = NULL; int multiple_keys = 0; multiState *ms, _ms; - multiCmd mc; + pendingCommand mc; + pendingCommand *mcp = &mc; int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0, existing_keys = 0; int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */ @@ -1141,8 +1142,11 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * structure if the client is not in MULTI/EXEC state, this way * we have a single codepath below. */ ms = &_ms; - _ms.commands = &mc; + _ms.commands = &mcp; _ms.count = 1; + + /* Properly initialize the fake pendingCommand */ + initPendingCommand(&mc); mc.argv = argv; mc.argc = argc; mc.cmd = cmd; @@ -1156,9 +1160,11 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in int margc, numkeys, j; keyReference *keyindex; - mcmd = ms->commands[i].cmd; - margc = ms->commands[i].argc; - margv = ms->commands[i].argv; + pendingCommand *pcmd = ms->commands[i]; + + mcmd = pcmd->cmd; + margc = pcmd->argc; + margv = pcmd->argv; /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */ if (!pubsubshard_included && diff --git a/src/cluster.h b/src/cluster.h index 18b5bb46558..e9f4df7553d 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -22,6 +22,7 @@ #define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */ #define CLUSTER_SLOTS (1<read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) { c->flags |= CLIENT_PENDING_COMMAND; + c->flags |= CLIENT_IN_PREFETCH; if (processPendingCommandAndInputBuffer(c) == C_ERR) { /* If the client is no longer valid, it must be freed safely. */ continue; } + c->flags &= ~CLIENT_IN_PREFETCH; } /* We may have pending replies if io thread may not finish writing @@ -729,7 +731,6 @@ void initThreadedIO(void) { exit(1); } - prefetchCommandsBatchInit(); /* Spawn and initialize the I/O threads. */ for (int i = 1; i < server.io_threads_num; i++) { diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c index 8f3f77ef2d6..00d06222d69 100644 --- a/src/memory_prefetch.c +++ b/src/memory_prefetch.c @@ -382,18 +382,21 @@ int addCommandToBatch(client *c) { batch->clients[batch->client_count++] = c; - if (likely(c->iolookedcmd)) { - /* Get command's keys positions */ + pendingCommand *pcmd = c->pending_cmds.head; + while (pcmd != NULL) { + if (pcmd->parsing_incomplete || !pcmd->cmd || pcmd->flags) break; + getKeysResult result = GETKEYS_RESULT_INIT; - int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result); - for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) { - batch->keys[batch->key_count] = c->argv[result.keys[i].pos]; + int numkeys = getKeysFromCommand(pcmd->cmd, pcmd->argv, pcmd->argc, &result); + for (int i = 0; i < numkeys && batch->key_count < batch->max_prefetch_size; i++) { + batch->keys[batch->key_count] = pcmd->argv[result.keys[i].pos]; batch->keys_dicts[batch->key_count] = - kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0); + kvstoreGetDict(c->db->keys, pcmd->slot > 0 ? pcmd->slot : 0); batch->key_count++; } getKeysFreeResult(&result); - } + pcmd = pcmd->next; + } return C_OK; } diff --git a/src/module.c b/src/module.c index ab8cafb191a..1a6360f1a55 100644 --- a/src/module.c +++ b/src/module.c @@ -674,11 +674,12 @@ void moduleReleaseTempClient(client *c) { listEmpty(c->reply); c->reply_bytes = 0; c->duration = 0; - resetClient(c); + resetClient(c, -1); + serverAssert(c->all_argv_len_sum == 0); c->bufpos = 0; c->flags = CLIENT_MODULE; c->user = NULL; /* Root user */ - c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = NULL; if (c->bstate.async_rm_call_handle) { RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle; promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */ @@ -11035,12 +11036,21 @@ void moduleCallCommandFilters(client *c) { } /* If the filter sets a new command, including command or subcommand, - * the command looked up in IO threads will be invalid. */ - c->iolookedcmd = NULL; + * the command looked up will be invalid. */ + c->lookedcmd = NULL; c->argv = filter.argv; c->argv_len = filter.argv_len; c->argc = filter.argc; + + /* Update pending command if it exists. */ + pendingCommand *pcmd = c->current_pending_cmd; + if (pcmd) { + pcmd->argv = filter.argv; + pcmd->argc = filter.argc; + pcmd->argv_len = filter.argv_len; + pcmd->cmd = NULL; + } } /* Return the number of arguments a filtered command has. The number of diff --git a/src/multi.c b/src/multi.c index 8c5ec6f99e2..7bd4f259336 100644 --- a/src/multi.c +++ b/src/multi.c @@ -19,27 +19,19 @@ void initClientMultiState(client *c) { c->mstate.cmd_inv_flags = 0; c->mstate.argv_len_sums = 0; c->mstate.alloc_count = 0; + c->mstate.executing_cmd = -1; } /* Release all the resources associated with MULTI/EXEC state */ void freeClientMultiState(client *c) { - int j; - - for (j = 0; j < c->mstate.count; j++) { - int i; - multiCmd *mc = c->mstate.commands+j; - - for (i = 0; i < mc->argc; i++) - decrRefCount(mc->argv[i]); - zfree(mc->argv); + for (int i = 0; i < c->mstate.count; i++) { + freePendingCommand(c, c->mstate.commands[i]); } zfree(c->mstate.commands); } /* Add a new command into the MULTI commands queue */ void queueMultiCommand(client *c, uint64_t cmd_flags) { - multiCmd *mc; - /* No sense to waste memory if the transaction is already aborted. * this is useful in case client sends these in a pipeline, or doesn't * bother to read previous responses and didn't notice the multi was already @@ -49,29 +41,35 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) { if (c->mstate.count == 0) { /* If a client is using multi/exec, assuming it is used to execute at least * two commands. Hence, creating by default size of 2. */ - c->mstate.commands = zmalloc(sizeof(multiCmd)*2); + c->mstate.commands = zmalloc(sizeof(pendingCommand*)*2); c->mstate.alloc_count = 2; } if (c->mstate.count == c->mstate.alloc_count) { c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX; - c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count)); + c->mstate.commands = zrealloc(c->mstate.commands, sizeof(pendingCommand*)*(c->mstate.alloc_count)); } - mc = c->mstate.commands+c->mstate.count; - mc->cmd = c->cmd; - mc->argc = c->argc; - mc->argv = c->argv; - mc->argv_len = c->argv_len; + + /* Move the pending command into the multi-state. + * We leave the empty list node in 'pending_cmds' for freeClientPendingCommands to clean up + * later, but set the value to NULL to indicate it has been moved out and should not be freed. */ + pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds); + c->current_pending_cmd = NULL; + pendingCommand **mc = c->mstate.commands + c->mstate.count; + *mc = pcmd; c->mstate.count++; c->mstate.cmd_flags |= cmd_flags; c->mstate.cmd_inv_flags |= ~cmd_flags; - c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc; + c->mstate.argv_len_sums += (*mc)->argv_len_sum; + c->all_argv_len_sum -= (*mc)->argv_len_sum; + + (*mc)->argv_len_sum = 0; /* This is no longer tracked through all_argv_len_sum, so we don't want */ + /* to subtract it from there later. */ - /* Reset the client's args since we copied them into the mstate and shouldn't - * reference them from c anymore. */ + /* Reset the client's args since we moved them into the mstate and shouldn't + * reference them from 'c' anymore. */ c->argv = NULL; c->argc = 0; - c->argv_len_sum = 0; c->argv_len = 0; } @@ -129,6 +127,7 @@ void execCommand(client *c) { int j; robj **orig_argv; int orig_argc, orig_argv_len; + size_t orig_all_argv_len_sum; struct redisCommand *orig_cmd; if (!(c->flags & CLIENT_MULTI)) { @@ -172,12 +171,19 @@ void execCommand(client *c) { orig_argv_len = c->argv_len; orig_argc = c->argc; orig_cmd = c->cmd; + + /* Multi-state commands aren't tracked through all_argv_len_sum, so we don't want anything done while executing them to affect that field. + * Otherwise, we get inconsistencies and all_argv_len_sum doesn't go back to exactly 0 when the client is finished */ + orig_all_argv_len_sum = c->all_argv_len_sum; + + c->all_argv_len_sum = c->mstate.argv_len_sums; + addReplyArrayLen(c,c->mstate.count); for (j = 0; j < c->mstate.count; j++) { - c->argc = c->mstate.commands[j].argc; - c->argv = c->mstate.commands[j].argv; - c->argv_len = c->mstate.commands[j].argv_len; - c->cmd = c->realcmd = c->mstate.commands[j].cmd; + c->argc = c->mstate.commands[j]->argc; + c->argv = c->mstate.commands[j]->argv; + c->argv_len = c->mstate.commands[j]->argv_len; + c->cmd = c->realcmd = c->mstate.commands[j]->cmd; /* ACL permissions are also checked at the time of execution in case * they were changed after the commands were queued. */ @@ -207,6 +213,7 @@ void execCommand(client *c) { "This command is no longer allowed for the " "following reason: %s", reason); } else { + c->mstate.executing_cmd = j; if (c->id == CLIENT_ID_AOF) call(c,CMD_CALL_NONE); else @@ -216,10 +223,10 @@ void execCommand(client *c) { } /* Commands may alter argc/argv, restore mstate. */ - c->mstate.commands[j].argc = c->argc; - c->mstate.commands[j].argv = c->argv; - c->mstate.commands[j].argv_len = c->argv_len; - c->mstate.commands[j].cmd = c->cmd; + c->mstate.commands[j]->argc = c->argc; + c->mstate.commands[j]->argv = c->argv; + c->mstate.commands[j]->argv_len = c->argv_len; + c->mstate.commands[j]->cmd = c->cmd; } // restore old DENY_BLOCKING value @@ -230,6 +237,7 @@ void execCommand(client *c) { c->argv_len = orig_argv_len; c->argc = orig_argc; c->cmd = c->realcmd = orig_cmd; + c->all_argv_len_sum = orig_all_argv_len_sum; discardTransaction(c); server.in_exec = 0; @@ -485,6 +493,6 @@ size_t multiStateMemOverhead(client *c) { /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */ mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey)); /* Reserved memory for queued multi commands. */ - mem += c->mstate.alloc_count * sizeof(multiCmd); + mem += c->mstate.alloc_count * sizeof(pendingCommand); return mem; } diff --git a/src/networking.c b/src/networking.c index 9f4fec0d71b..a4a6752e4f5 100644 --- a/src/networking.c +++ b/src/networking.c @@ -19,6 +19,7 @@ #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" +#include "memory_prefetch.h" #include #include #include @@ -32,11 +33,14 @@ static inline int _clientHasPendingRepliesSlave(client *c); static inline int _clientHasPendingRepliesNonSlave(client *c); static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten); static inline int _writeToClientSlave(client *c, ssize_t *nwritten); +static int consumePendingCommand(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ __thread sds thread_reusable_qb = NULL; __thread int thread_reusable_qb_used = 0; /* Avoid multiple clients using reusable query * buffer due to nested command execution. */ +/* COMMAND_QUEUE_MIN_CAPACITY no longer needed with linked list implementation */ + /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute * the client output buffer size. */ @@ -162,12 +166,15 @@ client *createClient(connection *conn) { c->argc = 0; c->argv = NULL; c->argv_len = 0; - c->argv_len_sum = 0; + c->all_argv_len_sum = 0; + c->pending_cmds.head = c->pending_cmds.tail = NULL; + c->pending_cmds.len = c->pending_cmds.ready_len = 0; + c->current_pending_cmd = NULL; c->original_argc = 0; c->original_argv = NULL; c->deferred_objects = NULL; c->deferred_objects_num = 0; - c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = c->lookedcmd = NULL; c->cur_script = NULL; c->multibulklen = 0; c->bulklen = -1; @@ -183,6 +190,7 @@ client *createClient(connection *conn) { c->replstate = REPL_STATE_NONE; c->repl_start_cmd_stream_on_ack = 0; c->reploff = 0; + c->reploff_next = 0; c->read_reploff = 0; c->repl_applied = 0; c->repl_ack_off = 0; @@ -1528,8 +1536,7 @@ static inline void freeClientArgvInternal(client *c, int free_argv) { } c->argc = 0; c->cmd = NULL; - c->iolookedcmd = NULL; - c->argv_len_sum = 0; + c->lookedcmd = NULL; if (free_argv) { c->argv_len = 0; zfree(c->argv); @@ -1541,6 +1548,18 @@ void freeClientArgv(client *c) { freeClientArgvInternal(c, 1); } +void freeClientPendingCommands(client *c, int num_pcmds_to_free) { + /* (-1) means free all pending commands */ + if (num_pcmds_to_free == -1) + num_pcmds_to_free = c->pending_cmds.len; + + while (num_pcmds_to_free--) { + pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds); + serverAssert(pcmd); + freePendingCommand(c, pcmd); + } +} + /* Close all the slaves connections. This is useful in chained replication * when we resync with our own master and want to force all our slaves to * resync with us as well. */ @@ -1640,6 +1659,12 @@ void unlinkClient(client *c) { c->flags &= ~CLIENT_UNBLOCKED; } + freeClientPendingCommands(c, -1); + c->argv_len = 0; + c->argv = NULL; + c->argc = 0; + c->cmd = NULL; + /* Clear the tracking status. */ if (c->flags & CLIENT_TRACKING) disableTracking(c); } @@ -1821,7 +1846,6 @@ void freeClient(client *c) { listRelease(c->reply); zfree(c->buf); freeReplicaReferencedReplBuffer(c); - freeClientArgv(c); freeClientOriginalArgv(c); freeClientDeferredObjects(c, 1); if (c->deferred_reply_errors) @@ -1838,9 +1862,15 @@ void freeClient(client *c) { /* Unlink the client: this will close the socket, remove the I/O * handlers, and remove references of the client from different - * places where active clients may be referenced. */ + * places where active clients may be referenced. + * This will also clean all remaining pending commands in the client, + * as they are no longer valid. + */ unlinkClient(c); + freeClientMultiState(c); + serverAssert(c->pending_cmds.len == 0); + /* Master/slave cleanup Case 1: * we lost the connection with a slave. */ if (c->flags & CLIENT_SLAVE) { @@ -1895,7 +1925,7 @@ void freeClient(client *c) { if (c->name) decrRefCount(c->name); if (c->lib_name) decrRefCount(c->lib_name); if (c->lib_ver) decrRefCount(c->lib_ver); - freeClientMultiState(c); + serverAssert(c->all_argv_len_sum == 0); sdsfree(c->peerid); sdsfree(c->sockname); sdsfree(c->slave_addr); @@ -2282,14 +2312,35 @@ int handleClientsWithPendingWrites(void) { return processed; } -static inline void resetClientInternal(client *c, int free_argv) { - redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; - - freeClientArgvInternal(c, free_argv); - c->cur_script = NULL; +/* Prepare the client for the parsing of the next command. */ +void resetClientQbufState(client *c) { c->reqtype = 0; c->multibulklen = 0; c->bulklen = -1; +} + +static inline void resetClientInternal(client *c, int num_pcmds_to_free) { + redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; + + /* We may get here with no pending commands but with an argv that needs freeing. + * An example is in the case of modules (RM_Call) */ + if (c->current_pending_cmd) { + freeClientPendingCommands(c, num_pcmds_to_free); + if (c->pending_cmds.len == 0) + serverAssert(c->all_argv_len_sum == 0); + c->current_pending_cmd = NULL; + } else if (c->argv) { + freeClientArgvInternal(c, 1 /* free_argv */); + /* If we're dealing with a client that doesn't create pendingCommand structs (e.g.: a Lua client), + * clear the all_argv_len_sum counter so we don't get to freeing the client with it non-zero. */ + c->all_argv_len_sum = 0; + } + + c->argc = 0; + c->cmd = NULL; + c->argv_len = 0; + c->argv = NULL; + c->cur_script = NULL; c->slot = -1; c->cluster_compatibility_check_slot = -2; c->flags &= ~CLIENT_EXECUTING_COMMAND; @@ -2329,8 +2380,8 @@ static inline void resetClientInternal(client *c, int free_argv) { } /* resetClient prepare the client to process the next command */ -void resetClient(client *c) { - resetClientInternal(c, 1); +void resetClient(client *c, int num_pcmds_to_free) { + resetClientInternal(c, num_pcmds_to_free); } /* This function is used when we want to re-enter the event loop but there @@ -2373,14 +2424,14 @@ void unprotectClient(client *c) { * have a well formed command. The function also returns C_ERR when there is * a protocol error: in such a case the client structure is setup to reply * with the error and close the connection. */ -int processInlineBuffer(client *c) { +int parseInlineBuffer(client *c, pendingCommand *pcmd) { char *newline; int argc, j, linefeed_chars = 1; sds *argv, aux; size_t querylen; /* Search for end of line */ - newline = strchr(c->querybuf+c->qb_pos,'\n'); + newline = memchr(c->querybuf+c->qb_pos,'\n',sdslen(c->querybuf) - c->qb_pos); /* Nothing to do without a \r\n */ if (newline == NULL) { @@ -2428,20 +2479,18 @@ int processInlineBuffer(client *c) { /* Setup argv array on client structure */ if (argc) { - /* Create new argv if space is insufficient. */ - if (unlikely(argc > c->argv_len)) { - zfree(c->argv); - c->argv = zmalloc(sizeof(robj*)*argc); - c->argv_len = argc; - } - c->argv_len_sum = 0; + zfree(pcmd->argv); + pcmd->argv = zmalloc(sizeof(robj*)*argc); + pcmd->argv_len = argc; + pcmd->argv_len_sum = 0; } /* Create redis objects for all arguments. */ - for (c->argc = 0, j = 0; j < argc; j++) { - c->argv[c->argc] = createObject(OBJ_STRING,argv[j]); - c->argc++; - c->argv_len_sum += sdslen(argv[j]); + for (pcmd->argc = 0, j = 0; j < argc; j++) { + pcmd->argv[pcmd->argc] = createObject(OBJ_STRING,argv[j]); + pcmd->argc++; + pcmd->argv_len_sum += sdslen(argv[j]); + c->all_argv_len_sum += sdslen(argv[j]); } zfree(argv); @@ -2458,7 +2507,7 @@ int processInlineBuffer(client *c) { * Command) SET key value * Inline) SET key value\r\n */ - c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2); + pcmd->input_bytes = (pcmd->argv_len_sum + (pcmd->argc - 1) + 2); return C_OK; } @@ -2496,32 +2545,21 @@ static void setProtocolError(const char *errstr, client *c) { c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR); } -/* Process the query buffer for client 'c', setting up the client argument - * vector for command execution. Returns C_OK if after running the function - * the client has a well-formed ready to be processed command, otherwise - * C_ERR if there is still to read more buffer to get the full command. - * The function also returns C_ERR when there is a protocol error: in such a - * case the client structure is setup to reply with the error and close - * the connection. - * - * This function is called if processInputBuffer() detects that the next - * command is in RESP format, so the first byte in the command is found - * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ -int processMultibulkBuffer(client *c) { +static int parseMultibulk(client *c, pendingCommand *pcmd) { char *newline = NULL; int ok; long long ll; size_t querybuf_len = sdslen(c->querybuf); /* Cache sdslen */ if (c->multibulklen == 0) { - /* The client should have been reset */ - serverAssertWithInfo(c,NULL,c->argc == 0); + /* TODO: The client should have been reset */ + serverAssertWithInfo(c,NULL,pcmd->argc == 0); /* Multi bulk length cannot be read without a \r\n */ - newline = strchr(c->querybuf+c->qb_pos,'\r'); + newline = memchr(c->querybuf+c->qb_pos,'\r',sdslen(c->querybuf) - c->qb_pos); if (newline == NULL) { if (querybuf_len-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_MBULK_COUNT_STRING; + pcmd->flags = CLIENT_READ_TOO_BIG_MBULK_COUNT_STRING; } return C_ERR; } @@ -2536,10 +2574,10 @@ int processMultibulkBuffer(client *c) { size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos); ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); if (!ok || ll > INT_MAX) { - c->read_error = CLIENT_READ_INVALID_MULTIBUCK_LENGTH; + pcmd->flags = CLIENT_READ_INVALID_MULTIBUCK_LENGTH; return C_ERR; } else if (ll > 10 && authRequired(c)) { - c->read_error = CLIENT_READ_UNAUTH_MBUCK_COUNT; + pcmd->flags = CLIENT_READ_UNAUTH_MBUCK_COUNT; return C_ERR; } @@ -2548,19 +2586,12 @@ int processMultibulkBuffer(client *c) { if (ll <= 0) return C_OK; c->multibulklen = ll; + c->bulklen = -1; - /* Setup argv array on client structure. - * Create new argv in the following cases: - * 1) When the requested size is greater than the current size. - * 2) When the requested size is less than the current size, because - * we always allocate argv gradually with a maximum size of 1024, - * Therefore, if argv_len exceeds this limit, we always reallocate. */ - if (unlikely(c->multibulklen > c->argv_len || c->argv_len > 1024)) { - zfree(c->argv); - c->argv_len = min(c->multibulklen, 1024); - c->argv = zmalloc(sizeof(robj*)*c->argv_len); - } - c->argv_len_sum = 0; + zfree(pcmd->argv); + pcmd->argv_len = min(c->multibulklen, 1024); + pcmd->argv = zmalloc(sizeof(robj*)*(pcmd->argv_len)); + pcmd->argv_len_sum = 0; /* Per-slot network bytes-in calculation. * @@ -2593,17 +2624,17 @@ int processMultibulkBuffer(client *c) { * * The 1st component is calculated within the below line. * */ - c->net_input_bytes_curr_cmd += (multibulklen_slen + 3); + pcmd->input_bytes += (multibulklen_slen + 3); } serverAssertWithInfo(c,NULL,c->multibulklen > 0); while(c->multibulklen) { /* Read bulk length if unknown */ if (c->bulklen == -1) { - newline = strchr(c->querybuf+c->qb_pos,'\r'); + newline = memchr(c->querybuf+c->qb_pos,'\r',sdslen(c->querybuf) - c->qb_pos); if (newline == NULL) { if (querybuf_len-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_BUCK_COUNT_STRING; + pcmd->flags = CLIENT_READ_TOO_BIG_BUCK_COUNT_STRING; return C_ERR; } break; @@ -2614,7 +2645,7 @@ int processMultibulkBuffer(client *c) { break; if (c->querybuf[c->qb_pos] != '$') { - c->read_error = CLIENT_READ_EXPECTED_DOLLAR; + pcmd->flags = CLIENT_READ_EXPECTED_DOLLAR; return C_ERR; } @@ -2622,10 +2653,10 @@ int processMultibulkBuffer(client *c) { ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); if (!ok || ll < 0 || (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) { - c->read_error = CLIENT_READ_INVALID_BUCK_LENGTH; + pcmd->flags = CLIENT_READ_INVALID_BUCK_LENGTH; return C_ERR; } else if (ll > 16384 && authRequired(c)) { - c->read_error = CLIENT_READ_UNAUTH_BUCK_LENGTH; + pcmd->flags = CLIENT_READ_UNAUTH_BUCK_LENGTH; return C_ERR; } @@ -2659,7 +2690,9 @@ int processMultibulkBuffer(client *c) { } c->bulklen = ll; /* Per-slot network bytes-in calculation, 2nd component. */ - c->net_input_bytes_curr_cmd += (bulklen_slen + 3); + pcmd->input_bytes += (bulklen_slen + 3); + } else { + serverAssert(pcmd->parsing_incomplete); } /* Read bulk argument */ @@ -2667,10 +2700,9 @@ int processMultibulkBuffer(client *c) { break; } else { /* Check if we have space in argv, grow if needed */ - if (c->argc >= c->argv_len) { - serverAssert(c->argv_len); /* Ensure argv is not freed while the client is in the mid of parsing command. */ - c->argv_len = min(c->argv_len < INT_MAX/2 ? c->argv_len*2 : INT_MAX, c->argc+c->multibulklen); - c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len); + if (pcmd->argc >= pcmd->argv_len) { + pcmd->argv_len = min(pcmd->argv_len < INT_MAX/2 ? (pcmd->argv_len)*2 : INT_MAX, pcmd->argc+c->multibulklen); + pcmd->argv = zrealloc(pcmd->argv, sizeof(robj*)*(pcmd->argv_len)); } /* Optimization: if a non-master client's buffer contains JUST our bulk element @@ -2681,8 +2713,9 @@ int processMultibulkBuffer(client *c) { c->bulklen >= PROTO_MBULK_BIG_ARG && querybuf_len == (size_t)(c->bulklen+2)) { - c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); - c->argv_len_sum += c->bulklen; + (pcmd->argv)[(pcmd->argc)++] = createObject(OBJ_STRING,c->querybuf); + pcmd->argv_len_sum += c->bulklen; + c->all_argv_len_sum += c->bulklen; sdsIncrLen(c->querybuf,-2); /* remove CRLF */ /* Assume that if we saw a fat argument we'll see another one likely... * But only if that fat argument is not too big compared to the memory limit. */ @@ -2694,9 +2727,10 @@ int processMultibulkBuffer(client *c) { sdsclear(c->querybuf); querybuf_len = sdslen(c->querybuf); /* Update cached length */ } else { - c->argv[c->argc++] = + (pcmd->argv)[(pcmd->argc)++] = createStringObject(c->querybuf+c->qb_pos,c->bulklen); - c->argv_len_sum += c->bulklen; + pcmd->argv_len_sum += c->bulklen; + c->all_argv_len_sum += c->bulklen; c->qb_pos += c->bulklen+2; } c->bulklen = -1; @@ -2707,12 +2741,25 @@ int processMultibulkBuffer(client *c) { /* We're done when c->multibulk == 0 */ if (c->multibulklen == 0) { /* Per-slot network bytes-in calculation, 3rd and 4th components. */ - c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 2)); + pcmd->input_bytes += (pcmd->argv_len_sum + (pcmd->argc * 2)); + pcmd->parsing_incomplete = 0; return C_OK; } /* Still not ready to process the command */ - return C_ERR; + pcmd->parsing_incomplete = 1; + return C_OK; +} + +/* Prepare the client for executing the next command: + * + * 1. Append the response, if necessary. + * 2. Reset the client. + * 3. Update the all_argv_len_sum counter and advance the pending_cmd cyclic buffer. + */ +void prepareForNextCommand(client *c) { + reqresAppendResponse(c); + resetClientInternal(c, 1); } /* Perform necessary tasks after a command was executed: @@ -2730,14 +2777,14 @@ void commandProcessed(client *c) { * since we have not applied the command. */ if (c->flags & CLIENT_BLOCKED) return; - reqresAppendResponse(c); clusterSlotStatsAddNetworkBytesInForUserClient(c); - resetClientInternal(c, 0); + prepareForNextCommand(c); long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + serverAssert(c->reploff_next > 0); + c->reploff = c->reploff_next; } /* If the client is a master we need to compute the difference @@ -2810,7 +2857,7 @@ int processPendingCommandAndInputBuffer(client *c) { * Note: when a master client steps into this function, * it can always satisfy this condition, because its querybuf * contains data not applied. */ - if (c->querybuf && sdslen(c->querybuf) > 0) { + if ((c->querybuf && sdslen(c->querybuf) > 0) || c->pending_cmds.ready_len > 0) { return processInputBuffer(c); } return C_OK; @@ -2878,12 +2925,86 @@ void handleClientReadError(client *c) { sdsfree(bytes); break; } + case CLIENT_READ_COMMAND_NOT_FOUND: + case CLIENT_READ_BAD_ARITY: + /* These are command validation errors, not protocol errors. + * They are handled in processCommand() via commandCheckExistence() + * and commandCheckArity(), which generate appropriate error responses. + * No action needed here. */ + break; default: - serverPanic("Unknown client read error"); + serverPanic("Unknown client read error: %d", c->read_error); + break; + } +} + +void parseInputBuffer(client *c) { + /* We limit the lookahead for unauthenticated connections to 1. + * This is both to reduce memory overhead, and to prevent errors: AUTH can + * affect the handling of succeeding commands. Parsing of "large" + * unauthenticated multibulk commands is rejected, which would cause those + * commands to incorrectly return an error to the client. */ + const int lookahead = authRequired(c) ? 1 : server.lookahead; + + /* Parse up to lookahead commands */ + while (c->pending_cmds.ready_len < lookahead && c->querybuf && c->qb_pos < sdslen(c->querybuf)) { + /* Determine request type when unknown. */ + if (!c->reqtype) { + if (c->querybuf[c->qb_pos] == '*') { + c->reqtype = PROTO_REQ_MULTIBULK; + } else { + c->reqtype = PROTO_REQ_INLINE; + } + } + + pendingCommand *pcmd = NULL; + if (c->reqtype == PROTO_REQ_INLINE) { + pcmd = zmalloc(sizeof(pendingCommand)); + initPendingCommand(pcmd); + if (parseInlineBuffer(c, pcmd) == C_ERR && !pcmd->flags) { + /* If it fails but there are no errors, it means that it might just be + * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */ + freePendingCommand(c, pcmd); + return; + } + } else if (c->reqtype == PROTO_REQ_MULTIBULK) { + int incomplete = c->pending_cmds.tail && c->pending_cmds.tail->parsing_incomplete; + if (unlikely(incomplete)) { + pcmd = popPendingCommandFromHead(&c->pending_cmds); + } else { + pcmd = zmalloc(sizeof(pendingCommand)); + initPendingCommand(pcmd); + } + + if (parseMultibulk(c, pcmd) == C_ERR && !pcmd->flags) { + /* If it fails but there are no errors, it means that it might just be + * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */ + freePendingCommand(c, pcmd); + return; + } + } else { + serverPanic("Unknown request type"); + } + + addPengingCommand(&c->pending_cmds, pcmd); + if (unlikely(pcmd->flags || pcmd->parsing_incomplete)) break; + + if (!pcmd->parsing_incomplete) { + pcmd->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + preprocessCommand(c, pcmd); + resetClientQbufState(c); + } } } +/* Helper function to check if a read error is fatal (should stop processing) */ +static inline int isClientReadErrorFatal(int read_error) { + return read_error != 0 && + read_error != CLIENT_READ_COMMAND_NOT_FOUND && + read_error != CLIENT_READ_BAD_ARITY; +} + /* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be @@ -2891,7 +3012,8 @@ void handleClientReadError(client *c) { * return C_ERR in case the client was freed during the processing */ int processInputBuffer(client *c) { /* Keep processing while there is something in the input buffer */ - while(c->qb_pos < sdslen(c->querybuf)) { + while ((c->querybuf && c->qb_pos < sdslen(c->querybuf)) || + c->pending_cmds.ready_len > 0) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; @@ -2912,50 +3034,40 @@ int processInputBuffer(client *c) { * The same applies for clients we want to terminate ASAP. */ if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; - /* Determine request type when unknown. */ - if (!c->reqtype) { - if (c->querybuf[c->qb_pos] == '*') { - c->reqtype = PROTO_REQ_MULTIBULK; - } else { - c->reqtype = PROTO_REQ_INLINE; + /* If commands are queued up, pop from the queue first */ + if (!consumePendingCommand(c)) { + parseInputBuffer(c); + if (consumePendingCommand(c) == 0) break; + + if (c->running_tid == IOTHREAD_MAIN_THREAD_ID && !(c->flags & CLIENT_IN_PREFETCH)) { + /* Prefetch the commands. */ + resetCommandsBatch(); + addCommandToBatch(c); + prefetchCommands(); } } - if (c->reqtype == PROTO_REQ_INLINE) { - if (processInlineBuffer(c) != C_OK) { - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) - enqueuePendingClientsToMainThread(c, 0); - break; - } - } else if (c->reqtype == PROTO_REQ_MULTIBULK) { - if (processMultibulkBuffer(c) != C_OK) { - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) - enqueuePendingClientsToMainThread(c, 0); - break; - } - } else { - serverPanic("Unknown request type"); + if (c->read_error && c->read_error != CLIENT_READ_COMMAND_NOT_FOUND && + c->read_error != CLIENT_READ_BAD_ARITY) { + break; + } + + if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) { + enqueuePendingClientsToMainThread(c, 0); + break; } /* Multibulk processing could see a <= 0 length. */ - if (c->argc == 0) { - freeClientArgvInternal(c, 0); - c->reqtype = 0; - c->multibulklen = 0; - c->bulklen = -1; + if (!c->argc) { + /* A naked newline can be sent from masters as a keep-alive, or from slaves to refresh + * the last ACK time. In that case there's no command to actually execute. */ + prepareForNextCommand(c); } else { /* If we are in the context of an I/O thread, we can't really * execute the command here. All we can do is to flag the client * as one that needs to process the command. */ if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { c->io_flags |= CLIENT_IO_PENDING_COMMAND; - c->iolookedcmd = lookupCommand(c->argv, c->argc); - if (c->iolookedcmd && !commandCheckArity(c->iolookedcmd, c->argc, NULL)) { - /* The command was found, but the arity is invalid, reset it and let main - * thread handle. To avoid memory prefetching on an invalid command. */ - c->iolookedcmd = NULL; - } - c->slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc); enqueuePendingClientsToMainThread(c, 0); break; } @@ -2985,6 +3097,7 @@ int processInputBuffer(client *c) { * so the repl_applied is not equal to qb_pos. */ if (c->repl_applied) { sdsrange(c->querybuf,c->repl_applied,-1); + serverAssert(c->qb_pos >= (size_t)c->repl_applied); c->qb_pos -= c->repl_applied; c->repl_applied = 0; } @@ -3272,7 +3385,7 @@ sds catClientInfoString(sds s, client *client) { " watch=%i", (int) listLength(client->watched_keys), " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, - " argv-mem=%U", (unsigned long long) client->argv_len_sum, + " argv-mem=%U", (unsigned long long) client->all_argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, " rbp=%U", (unsigned long long) client->buf_peak, @@ -4181,14 +4294,51 @@ void rewriteClientCommandVector(client *c, int argc, ...) { void replaceClientCommandVector(client *c, int argc, robj **argv) { int j; retainOriginalCommandVector(c); + + /* We don't need to just fix the client argv, we also need to fix the pending command (same argv), + * But sometimes we reach here not from a real client, but from a Lua 'scriptRunCtx'. This flow bypasses the + * pending-command system entirely and uses c->argv directly. In this case there's no pending commands + * to update, so we skip that code. */ + pendingCommand *pcmd = NULL; + int is_mstate = 0; + if (c->mstate.executing_cmd < 0) { + is_mstate = 0; + if (c->pending_cmds.ready_len > 0) { + pcmd = c->pending_cmds.head; + serverAssert(!pcmd->parsing_incomplete); + } + } else { + is_mstate = 1; + serverAssert(c->mstate.executing_cmd < c->mstate.count); + pcmd = c->mstate.commands[c->mstate.executing_cmd]; + } + + if (pcmd) { + serverAssert(pcmd->argv == c->argv); + pcmd->argv = argv; + pcmd->argc = argc; + } freeClientArgv(c); c->argv = argv; c->argc = c->argv_len = argc; - c->argv_len_sum = 0; - for (j = 0; j < c->argc; j++) - if (c->argv[j]) - c->argv_len_sum += getStringObjectLen(c->argv[j]); + + if (!is_mstate) { /* multi-state does not track all_argv_len_sum, see code in queueMultiCommand */ + size_t new_argv_len_sum = 0; + for (j = 0; j < c->argc; j++) + if (c->argv[j]) + new_argv_len_sum += getStringObjectLen(c->argv[j]); + + if (!pcmd) { + c->all_argv_len_sum = new_argv_len_sum; + } else { + c->all_argv_len_sum -= pcmd->argv_len_sum; + pcmd->argv_len_sum = new_argv_len_sum; + c->all_argv_len_sum += pcmd->argv_len_sum; + } + } c->cmd = lookupCommandOrOriginal(c->argv,c->argc); + if (pcmd) + pcmd->cmd = c->cmd; serverAssertWithInfo(c,NULL,c->cmd != NULL); } @@ -4209,6 +4359,13 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { robj *oldval; retainOriginalCommandVector(c); + /* We don't need to just fix the client argv, we also need to fix the pending command (same argv), + * But sometimes we reach here not from a real client, but from a Lua 'scriptRunCtx'. This flow bypasses the + * pending-command system entirely and uses c->argv directly. In this case there's no pending commands + * to update, so we skip that code. */ + pendingCommand *pcmd = c->pending_cmds.head ? c->pending_cmds.head: NULL; + int update_pcmd = pcmd && pcmd->argv == c->argv; + /* We need to handle both extending beyond argc (just update it and * initialize the new element) or beyond argv_len (realloc is needed). */ @@ -4221,12 +4378,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { c->argv[i] = NULL; } oldval = c->argv[i]; - if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); + if (oldval) c->all_argv_len_sum -= getStringObjectLen(oldval); if (newval) { c->argv[i] = newval; incrRefCount(newval); - c->argv_len_sum += getStringObjectLen(newval); + c->all_argv_len_sum += getStringObjectLen(newval); } else { /* move the remaining arguments one step left */ for (int j = i+1; j < c->argc; j++) { @@ -4236,10 +4393,20 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { } if (oldval) decrRefCount(oldval); + if (update_pcmd) { + pcmd->argv = c->argv; + pcmd->argc = c->argc; + pcmd->argv_len = c->argv_len; + if (oldval) pcmd->argv_len_sum -= getStringObjectLen(oldval); + if (newval) pcmd->argv_len_sum += getStringObjectLen(newval); + } + /* If this is the command name make sure to fix c->cmd. */ if (i == 0) { c->cmd = lookupCommandOrOriginal(c->argv,c->argc); serverAssertWithInfo(c,NULL,c->cmd != NULL); + if (update_pcmd) + pcmd->cmd = c->cmd; } } @@ -4281,7 +4448,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to * spot problematic clients. */ - mem += c->argv_len_sum + sizeof(robj*)*c->argc; + mem += c->all_argv_len_sum + sizeof(robj*)*c->argc; mem += multiStateMemOverhead(c); /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers @@ -4715,3 +4882,97 @@ void evictClients(void) { } } } + +void initPendingCommand(pendingCommand *pcmd) { + memset(pcmd, 0, sizeof(pendingCommand)); + pcmd->slot = INVALID_CLUSTER_SLOT; +} + +void freePendingCommand(client *c, pendingCommand *pcmd) { + if (!pcmd) + return; + + if (pcmd->argv) { + for (int j = 0; j < pcmd->argc; j++) + decrRefCount(pcmd->argv[j]); + + zfree(pcmd->argv); + serverAssert(c->all_argv_len_sum >= pcmd->argv_len_sum); /* assert this doesn't try to go negative */ + c->all_argv_len_sum -= pcmd->argv_len_sum; + } + + zfree(pcmd); +} + +/* Add a command to the tail of the pending command list. */ +void addPengingCommand(pendingCommandList *queue, pendingCommand *cmd) { + cmd->next = NULL; + cmd->prev = queue->tail; + + if (queue->tail) { + queue->tail->next = cmd; + } else { + /* Queue was empty */ + queue->head = cmd; + } + + queue->tail = cmd; + queue->len++; + if (!cmd->parsing_incomplete) queue->ready_len++; +} + +pendingCommand *popPendingCommandFromHead(pendingCommandList *list) { + pendingCommand *cmd = list->head; + if (!cmd) return NULL; /* List is empty */ + + list->head = cmd->next; + if (list->head) { + list->head->prev = NULL; + } else { + /* Queue was empty */ + list->tail = NULL; + } + + cmd->next = cmd->prev = NULL; + list->len--; + if (!cmd->parsing_incomplete) list->ready_len--; + return cmd; +} + +pendingCommand *popPendingCommandFromTail(pendingCommandList *list) { + pendingCommand *cmd = list->tail; + if (!cmd) return NULL; /* List is empty */ + + list->tail = cmd->prev; + if (list->tail) { + list->tail->next = NULL; + } else { + /* Queue became empty */ + list->head = NULL; + } + + cmd->next = cmd->prev = NULL; + list->len--; + if (!cmd->parsing_incomplete) list->ready_len--; + return cmd; +} + +/* Consumes the first ready command from the pending command list and sets it as + * the client's current command. The command remains in the list but is marked as + * current. Returns 1 on success, 0 if no ready command is available (empty list + * or head command is still parsing). */ +static int consumePendingCommand(client *c) { + pendingCommand *curcmd = c->pending_cmds.head; + if (!curcmd || curcmd->parsing_incomplete) return 0; + + c->argc = curcmd->argc; + c->argv = curcmd->argv; + c->argv_len = curcmd->argv_len; + c->net_input_bytes_curr_cmd += curcmd->input_bytes; + c->reploff_next = curcmd->reploff; + c->slot = curcmd->slot; + c->lookedcmd = curcmd->cmd; + c->read_error = curcmd->flags; + c->current_pending_cmd = curcmd; + return 1; +} diff --git a/src/replication.c b/src/replication.c index 32921f19653..28bb27a2ddb 100644 --- a/src/replication.c +++ b/src/replication.c @@ -4199,12 +4199,14 @@ void replicationCacheMaster(client *c) { server.master->qb_pos = 0; server.master->repl_applied = 0; server.master->read_reploff = server.master->reploff; + server.master->reploff_next = 0; if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); c->sentlen = 0; c->reply_bytes = 0; c->bufpos = 0; - resetClient(c); + resetClient(c, -1); + resetClientQbufState(c); /* Save the master. Server.master will be set to null later by * replicationHandleMasterDisconnection(). */ diff --git a/src/script_lua.c b/src/script_lua.c index 2e8220743c3..fc3cf6e9b4d 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -974,7 +974,8 @@ static int luaRedisGenericCommand(lua_State *lua, int raise_error) { c->argc = c->argv_len = 0; c->user = NULL; c->argv = NULL; - resetClient(c); + c->all_argv_len_sum = 0; + resetClient(c, 1); inuse--; if (raise_error) { diff --git a/src/server.c b/src/server.c index 350572111a0..61914e5244e 100644 --- a/src/server.c +++ b/src/server.c @@ -872,23 +872,6 @@ int clientsCronResizeQueryBuffer(client *c) { return 0; } -/* If the client has been idle for too long, free the client's arguments. */ -int clientsCronFreeArgvIfIdle(client *c) { - /* If the client is in the middle of parsing a command, or if argv is in use - * (e.g. parsed in the IO thread but not yet executed, or blocked), exit ASAP. */ - if (!c->argv || c->multibulklen || c->argc) return 0; - - /* Free argv if the client has been idle for more than 2 seconds or if argv - * size is too large. */ - time_t idletime = server.unixtime - c->lastinteraction; - if (idletime > 2 || c->argv_len > 128) { - c->argv_len = 0; - zfree(c->argv); - c->argv = NULL; - } - return 0; -} - /* The client output buffer can be adjusted to better fit the memory requirements. * * the logic is: @@ -960,7 +943,7 @@ int CurrentPeakMemUsageSlot = 0; int clientsCronTrackExpansiveClients(client *c) { size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0; size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0; - size_t in_usage = qb_size + c->argv_len_sum + argv_size; + size_t in_usage = qb_size + c->all_argv_len_sum + argv_size; size_t out_usage = getClientOutputBufferMemoryUsage(c); /* Track the biggest values observed so far in this slot. */ @@ -1112,7 +1095,6 @@ int clientsCronRunClient(client *c) { * terminated. */ if (clientsCronHandleTimeout(c,now)) return 1; if (clientsCronResizeQueryBuffer(c)) return 1; - if (clientsCronFreeArgvIfIdle(c)) return 1; if (clientsCronResizeOutputBuffer(c,now)) return 1; if (clientsCronTrackExpansiveClients(c)) return 1; @@ -2980,6 +2962,8 @@ void initServer(void) { if (server.maxmemory_clients != 0) initServerClientMemUsageBuckets(); + + prefetchCommandsBatchInit(); } void initListeners(void) { @@ -4061,6 +4045,52 @@ uint64_t getCommandFlags(client *c) { return cmd_flags; } +void preprocessCommand(client *c, pendingCommand *pcmd) { + pcmd->slot = INVALID_CLUSTER_SLOT; + if (pcmd->argc == 0) + return; + + /* Check if we can reuse the last command instead of looking it up. + * The last command is either the penultimate pending command (if it exists), or c->lastcmd. */ + struct redisCommand *last_cmd = c->pending_cmds.tail->prev ? c->pending_cmds.head->cmd : c->lastcmd; + + if (isCommandReusable(last_cmd, pcmd->argv[0])) + pcmd->cmd = last_cmd; + else + pcmd->cmd = lookupCommand(pcmd->argv, pcmd->argc); + + if (!pcmd->cmd) { + pcmd->flags = CLIENT_READ_COMMAND_NOT_FOUND; + return; + } + + if ((pcmd->cmd->arity > 0 && pcmd->cmd->arity != pcmd->argc) || + (pcmd->argc < -pcmd->cmd->arity)) + { + pcmd->flags = CLIENT_READ_BAD_ARITY; + return; + } + + if (server.cluster_enabled) { + getKeysResult result = (getKeysResult)GETKEYS_RESULT_INIT; + int numkeys = getKeysFromCommand(pcmd->cmd, pcmd->argv, pcmd->argc, &result); + for (int i = 0; i < numkeys; i++) { + robj *thiskey = pcmd->argv[result.keys[i].pos]; + int thisslot = (int)keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr)); + if (pcmd->slot == INVALID_CLUSTER_SLOT) { + pcmd->slot = thisslot; + } else if (pcmd->slot != thisslot) { + serverLog(LL_NOTICE, "preprocessCommand: CROSS SLOT ERROR"); + /* Invalidate the slot to indicate that there is a cross-slot error */ + pcmd->slot = INVALID_CLUSTER_SLOT; + /* Cross slot error. */ + break; + } + } + getKeysFreeResult(&result); + } +} + /* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the @@ -4104,11 +4134,17 @@ int processCommand(client *c) { * we do not have to repeat the same checks */ if (!client_reprocessing_command) { /* check if we can reuse the last command instead of looking up if we already have that info */ - struct redisCommand *cmd = NULL; - if (isCommandReusable(c->lastcmd, c->argv[0])) - cmd = c->lastcmd; - else - cmd = c->iolookedcmd ? c->iolookedcmd : lookupCommand(c->argv, c->argc); + struct redisCommand *cmd = c->lookedcmd; + + /* The command may have been modified by modules (e.g., in CommandFilters callbacks), + * so we need to look it up again. */ + if (!cmd) { + if (isCommandReusable(c->lastcmd, c->argv[0])) + cmd = c->lastcmd; + else + cmd = lookupCommand(c->argv, c->argc); + } + if (!cmd) { /* Handle possible security attacks. */ if (!strcasecmp(c->argv[0]->ptr,"host:") || !strcasecmp(c->argv[0]->ptr,"post")) { @@ -4441,9 +4477,9 @@ int areCommandKeysInSameSlot(client *c, int *hashslot) { /* If client is in multi-exec, we need to check the slot of all keys * in the transaction. */ for (int i = 0; i < (ms ? ms->count : 1); i++) { - struct redisCommand *cmd = ms ? ms->commands[i].cmd : c->cmd; - robj **argv = ms ? ms->commands[i].argv : c->argv; - int argc = ms ? ms->commands[i].argc : c->argc; + struct redisCommand *cmd = ms ? ms->commands[i]->cmd : c->cmd; + robj **argv = ms ? ms->commands[i]->argv : c->argv; + int argc = ms ? ms->commands[i]->argc : c->argc; getKeysResult result = GETKEYS_RESULT_INIT; int numkeys = getKeysFromCommand(cmd, argv, argc, &result); @@ -6965,7 +7001,7 @@ void dismissClientMemory(client *c) { dismissMemory(c->buf, c->buf_usable_size); if (c->querybuf) dismissSds(c->querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ - if (c->argc && c->argv_len_sum/c->argc >= server.page_size) { + if (c->argc && c->all_argv_len_sum/c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { dismissObject(c->argv[i], 0); } diff --git a/src/server.h b/src/server.h index a5cf8a73fe0..482ffe656dc 100644 --- a/src/server.h +++ b/src/server.h @@ -202,6 +202,9 @@ struct hdr_histogram; * in order to make sure of not over provisioning more than 128 fds. */ #define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96) +/* Default lookahead value */ +#define REDIS_DEFAULT_LOOKAHEAD 16 + /* OOM Score Adjustment classes. */ #define CONFIG_OOM_MASTER 0 #define CONFIG_OOM_REPLICA 1 @@ -431,6 +434,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_REEXECUTING_COMMAND (1ULL<<50) /* The client is re-executing the command. */ #define CLIENT_REPL_RDB_CHANNEL (1ULL<<51) /* Client which is used for rdb delivery as part of rdb channel replication */ #define CLIENT_INTERNAL (1ULL<<52) /* Internal client connection */ +#define CLIENT_IN_PREFETCH (1ULL<<53) /* The client is in the prefetching batch. */ /* Any flag that does not let optimize FLUSH SYNC to run it in bg as blocking client ASYNC */ #define CLIENT_AVOID_BLOCKING_ASYNC_FLUSH (CLIENT_DENY_BLOCKING|CLIENT_MULTI|CLIENT_LUA_DEBUG|CLIENT_LUA_DEBUG_SYNC|CLIENT_MODULE) @@ -461,6 +465,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_READ_CONN_DISCONNECTED 11 #define CLIENT_READ_CONN_CLOSED 12 #define CLIENT_READ_REACHED_MAX_QUERYBUF 13 +#define CLIENT_READ_COMMAND_NOT_FOUND 14 +#define CLIENT_READ_BAD_ARITY 15 /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -1137,16 +1143,11 @@ typedef struct rdbLoadingCtx { functionsLibCtx* functions_lib_ctx; }rdbLoadingCtx; -/* Client MULTI/EXEC state */ -typedef struct multiCmd { - robj **argv; - int argv_len; - int argc; - struct redisCommand *cmd; -} multiCmd; - +typedef struct pendingCommand pendingCommand; typedef struct multiState { - multiCmd *commands; /* Array of MULTI commands */ + pendingCommand **commands; /* Array of pointers to MULTI commands */ + int executing_cmd; /* The index of the currently exeuted transaction + command (index in commands field) */ int count; /* Total number of MULTI commands */ int cmd_flags; /* The accumulated command flags OR-ed together. So if at least a command has a given flag, it @@ -1155,7 +1156,7 @@ typedef struct multiState { is possible to know if all the commands have a certain flag. */ size_t argv_len_sums; /* mem used by all commands arguments */ - int alloc_count; /* total number of multiCmd struct memory reserved. */ + int alloc_count; /* total number of pendingCommand struct memory reserved. */ } multiState; /* This structure holds the blocking operation state for a client. @@ -1204,6 +1205,14 @@ typedef struct readyList { robj *key; } readyList; +/* List of pending commands. */ +typedef struct pendingCommandList { + pendingCommand *head; + pendingCommand *tail; + int len; /* Number of commands in the list */ + int ready_len; /* Number of commands that are ready to be processed */ +} pendingCommandList; + /* This structure represents a Redis user. This is useful for ACLs, the * user is associated to the connection after the connection is authenticated. * If there is no associated user, the connection uses the default user. */ @@ -1343,11 +1352,13 @@ typedef struct client { int argv_len; /* Size of argv array (may be more than argc) */ int original_argc; /* Num of arguments of original command if arguments were rewritten. */ robj **original_argv; /* Arguments of original command if arguments were rewritten. */ - size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ + size_t all_argv_len_sum; /* Sum of lengths of objects in all pendingCommand argv lists */ + pendingCommandList pending_cmds; /* List of parsed pending commands */ + pendingCommand *current_pending_cmd; robj **deferred_objects; /* Array of deferred objects to free. */ int deferred_objects_num; /* Number of deferred objects to free. */ struct redisCommand *cmd, *lastcmd; /* Last command executed. */ - struct redisCommand *iolookedcmd; /* Command looked up in IO threads. */ + struct redisCommand *lookedcmd; /* Command looked up in lookahead. */ struct redisCommand *realcmd; /* The original command that was executed by the client, Used to update error stats in case the c->cmd was modified during the command invocation (like on GEOADD for example). */ @@ -1383,6 +1394,7 @@ typedef struct client { sds replpreamble; /* Replication DB preamble. */ long long read_reploff; /* Read replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */ + long long reploff_next; /* Next value to set for reploff when a command finishes executing */ long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */ @@ -1970,6 +1982,7 @@ struct redisServer { int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ + int lookahead; /* how many commands in each client pipeline to decode and prefetch */ int dbnum; /* Total number of configured DBs */ int supervised; /* 1 if supervised, 0 otherwise. */ int supervised_mode; /* See SUPERVISED_* */ @@ -2337,6 +2350,25 @@ typedef struct { } getKeysResult; #define GETKEYS_RESULT_INIT { 0, MAX_KEYS_BUFFER, {{0}}, NULL } +/* Parser state and parse result of a command from a client's input buffer. */ +struct pendingCommand { + int argc; /* Num of arguments of current command. */ + int argv_len; /* Size of argv array (may be more than argc) */ + robj **argv; /* Arguments of current command. */ + size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ + unsigned long long input_bytes; + struct redisCommand *cmd; + // getKeysResult keys_result; + long long reploff; /* c->reploff should be set to this value when the command is processed */ + int slot; /* The slot the command is executing against. Set to INVALID_CLUSTER_SLOT if no slot is being used or if + the command has a cross slot error */ + uint8_t flags; + int parsing_incomplete; + + struct pendingCommand *next; + struct pendingCommand *prev; +}; + /* Key specs definitions. * * Brief: This is a scheme that tries to describe the location @@ -2797,6 +2829,13 @@ void moduleDefragEnd(void); void *moduleGetHandleByName(char *modulename); int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd); +/* pcmd */ +void initPendingCommand(pendingCommand *pcmd); +void freePendingCommand(client *c, pendingCommand *pcmd); +void addPengingCommand(pendingCommandList *queue, pendingCommand *cmd); +pendingCommand *popPendingCommandFromHead(pendingCommandList *queue); +pendingCommand *popPendingCommandFromTail(pendingCommandList *queue); + /* Utils */ long long ustime(void); mstime_t mstime(void); @@ -2822,9 +2861,11 @@ void deauthenticateAndCloseClient(client *c); void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...); int beforeNextClient(client *c); void clearClientConnectionState(client *c); -void resetClient(client *c); +void resetClient(client *c, int num_pcmds_to_free); +void resetClientQbufState(client *c); void freeClientOriginalArgv(client *c); void freeClientArgv(client *c); +void freeClientPendingCommands(client *c, int num_pcmds_to_free); void tryDeferFreeClientObject(client *c, robj *o); void freeClientDeferredObjects(client *c, int free_array); void sendReplyToClient(connection *conn); @@ -3334,8 +3375,11 @@ void updatePeakMemory(size_t used_memory); size_t freeMemoryGetNotCountedMemory(void); int overMaxmemoryAfterAlloc(size_t moremem); uint64_t getCommandFlags(client *c); +void preprocessCommand(client *c, pendingCommand *pcmd); int processCommand(client *c); void commandProcessed(client *c); +void prepareForNextCommand(client *c); + int processPendingCommandAndInputBuffer(client *c); int processCommandAndResetClient(client *c); int areCommandKeysInSameSlot(client *c, int *hashslot); diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl index 0347ac65351..57b550ab727 100644 --- a/tests/unit/cluster/sharded-pubsub.tcl +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -29,7 +29,7 @@ start_cluster 1 1 {tags {external:skip cluster}} { $primary MULTI $primary SPUBLISH ch1 "hello" $primary GET foo - catch {[$primary EXEC]} err + catch {$primary EXEC} err assert_match {CROSSSLOT*} $err } diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 75bd96fbac1..5be6db0278d 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -76,6 +76,10 @@ run_solo {defrag} { } proc test_active_defrag {type} { + + # note: Disabling lookahead because it changes the number and order of allocations which interferes with defrag and causes tests to fail + r config set lookahead 1 + if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} { test "Active defrag main dictionary: $type" { r config set hz 100 @@ -777,6 +781,7 @@ run_solo {defrag} { if {$::verbose} { puts "frag $frag" } + puts 1111 assert {$frag >= $expected_frag} r config set latency-monitor-threshold 5