diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 9d8b2db9d316..3cfaf7ac2d2c 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -98,6 +98,10 @@ ABSL_FLAG(uint32_t, max_busy_read_usec, 100,
           "Maximum time we read and parse from "
           "a socket without yielding. In microseconds.");
 
+ABSL_FLAG(size_t, squashed_reply_size_limit, 0,
+          "Max bytes allowed for squashing_current_reply_size. If this limit is reached, "
+          "connections dispatching via pipelines will block until this value is decremented.");
+
 using namespace util;
 using namespace std;
 using absl::GetFlag;
@@ -180,6 +184,8 @@ bool TrafficLogger::Write(iovec* blobs, size_t len) {
 thread_local TrafficLogger tl_traffic_logger{};
 thread_local base::Histogram* io_req_size_hist = nullptr;
 
+thread_local const size_t reply_size_limit = absl::GetFlag(FLAGS_squashed_reply_size_limit);
+
 void OpenTrafficLogger(string_view base_path) {
   unique_lock lk{tl_traffic_logger.mutex};
   if (tl_traffic_logger.log_file)
@@ -1158,7 +1164,7 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
     last_interaction_ = time(nullptr);
 
     // We might have blocked the dispatch queue from processing, wake it up.
-    if (dispatch_q_.size() > 0)
+    if (!dispatch_q_.empty())
       cnd_.notify_one();
   }
 }
@@ -1632,7 +1638,8 @@ void Connection::AsyncFiber() {
       bool squashing_enabled = squashing_threshold > 0;
       bool threshold_reached = pending_pipeline_cmd_cnt_ > squashing_threshold;
       bool are_all_plain_cmds = pending_pipeline_cmd_cnt_ == dispatch_q_.size();
-      if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) {
+      if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_ &&
+          !IsReplySizeOverLimit()) {
         SquashPipeline();
       } else {
         MessageHandle msg = std::move(dispatch_q_.front());
@@ -2059,6 +2066,16 @@ void Connection::DecrNumConns() {
     --stats_->num_conns_other;
 }
 
+bool Connection::IsReplySizeOverLimit() const {
+  std::atomic<size_t>& reply_sz = tl_facade_stats->reply_stats.squashing_current_reply_size;
+  size_t current = reply_sz.load(std::memory_order_acquire);
+  const bool over_limit = reply_size_limit != 0 && current > 0 && current > reply_size_limit;
+  // Log only every 10th over-limit occurrence. Otherwise, it can be too noisy on certain
+  // workloads in production instances.
+  LOG_IF_EVERY_N(INFO, over_limit, 10) << "MultiCommandSquasher overlimit: " << current << "/" << reply_size_limit;
+  return over_limit;
+}
+
 void Connection::SetMaxQueueLenThreadLocal(unsigned tid, uint32_t val) {
   thread_queue_backpressure[tid].pipeline_queue_max_len = val;
   thread_queue_backpressure[tid].pipeline_cnd.notify_all();
 }
@@ -2089,7 +2106,7 @@ void Connection::EnsureMemoryBudget(unsigned tid) {
 
 Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id,
                              uint32_t client_id)
-    : ptr_{ptr}, thread_id_{thread_id}, client_id_{client_id} {
+    : ptr_{std::move(ptr)}, thread_id_{thread_id}, client_id_{client_id} {
 }
 
 unsigned Connection::WeakRef::Thread() const {
@@ -2115,7 +2132,7 @@ uint32_t Connection::WeakRef::GetClientId() const {
   return client_id_;
 }
 
-bool Connection::WeakRef::operator<(const WeakRef& other) {
+bool Connection::WeakRef::operator<(const WeakRef& other) const {
   return client_id_ < other.client_id_;
 }
 
diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h
index f92ca79f8a93..310b36c75f33 100644
--- a/src/facade/dragonfly_connection.h
+++ b/src/facade/dragonfly_connection.h
@@ -196,7 +196,7 @@ class Connection : public util::Connection {
     // Returns client id.Thread-safe.
     uint32_t GetClientId() const;
 
-    bool operator<(const WeakRef& other);
+    bool operator<(const WeakRef& other) const;
     bool operator==(const WeakRef& other) const;
 
    private:
@@ -420,6 +420,8 @@ class Connection : public util::Connection {
   void IncrNumConns();
   void DecrNumConns();
 
+  bool IsReplySizeOverLimit() const;
+
   std::deque<MessageHandle> dispatch_q_;  // dispatch queue
   util::fb2::CondVarAny cnd_;             // dispatch queue waker
   util::fb2::Fiber async_fb_;             // async fiber (if started)
diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py
index d871371dfb01..c15fd736debc 100644
--- a/tests/dragonfly/memory_test.py
+++ b/tests/dragonfly/memory_test.py
@@ -1,4 +1,5 @@
 import pytest
+import asyncio
 from redis import asyncio as aioredis
 from .utility import *
 import logging
@@ -222,3 +223,38 @@ async def test_cache_eviction_with_rss_deny_oom(
     )
     stats_info = await async_client.info("stats")
     logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
+
+
+@pytest.mark.asyncio
+async def test_throttle_on_commands_squashing_replies_bytes(df_factory: DflyInstanceFactory):
+    df = df_factory.create(
+        proactor_threads=2,
+        squashed_reply_size_limit=500_000_000,
+        vmodule="dragonfly_connection=2",
+    )
+    df.start()
+
+    client = df.client()
+    # 0.5gb
+    await client.execute_command("debug populate 64 test 3125 rand type hash elements 500")
+
+    async def poll():
+        # At any point we should not cross this limit
+        assert df.rss < 1_500_000_000
+        cl = df.client()
+        pipe = cl.pipeline(transaction=False)
+        for i in range(64):
+            pipe.execute_command(f"hgetall test:{i}")
+
+        await pipe.execute()
+
+    tasks = []
+    for i in range(20):
+        tasks.append(asyncio.create_task(poll()))
+
+    for task in tasks:
+        await task
+
+    df.stop()
+    found = df.find_in_logs("MultiCommandSquasher overlimit: ")
+    assert len(found) > 0
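
For reviewers unfamiliar with the squashing path, below is a minimal, self-contained sketch of the backpressure pattern this change relies on: a shared byte counter (squashing_current_reply_size in the patch) grows while squashed replies are in flight and shrinks once they are flushed, and the dispatch loop skips squashing while that counter sits above squashed_reply_size_limit, falling back to command-by-command dispatch. The flag defaults to 0, which keeps the check disabled. All names here (ReplyBytesGuard, kReplySizeLimit, Dispatch) are hypothetical, and the counter maintenance on the MultiCommandSquasher side is assumed from the flag description rather than shown in this diff.

// Standalone sketch of the reply-size backpressure pattern; hypothetical names,
// not Dragonfly's API.
#include <atomic>
#include <cstddef>
#include <iostream>
#include <memory>
#include <vector>

namespace sketch {

// Running total of bytes held by in-flight squashed replies. Kept atomic because,
// in the patch, other threads are assumed to update it while the connection
// fiber polls it (hence the acquire load in IsReplySizeOverLimit above).
std::atomic<size_t> squashed_reply_bytes{0};

// Mirrors --squashed_reply_size_limit; 0 disables the check.
constexpr size_t kReplySizeLimit = 500'000'000;

bool IsReplySizeOverLimit() {
  size_t current = squashed_reply_bytes.load(std::memory_order_acquire);
  return kReplySizeLimit != 0 && current > 0 && current > kReplySizeLimit;
}

// RAII accounting: bytes are charged while a squashed batch is in flight and
// released once the batch has been flushed to the socket.
class ReplyBytesGuard {
 public:
  explicit ReplyBytesGuard(size_t bytes) : bytes_(bytes) {
    squashed_reply_bytes.fetch_add(bytes_, std::memory_order_acq_rel);
  }
  ~ReplyBytesGuard() { squashed_reply_bytes.fetch_sub(bytes_, std::memory_order_acq_rel); }
  ReplyBytesGuard(const ReplyBytesGuard&) = delete;
  ReplyBytesGuard& operator=(const ReplyBytesGuard&) = delete;

 private:
  size_t bytes_;
};

// Dispatch loop: squash (batch) only while the running total is under the limit,
// otherwise fall back to per-command dispatch so reply memory stays bounded.
void Dispatch(const std::vector<size_t>& pending_reply_sizes) {
  std::vector<std::unique_ptr<ReplyBytesGuard>> in_flight;  // batches not yet flushed
  for (size_t reply_size : pending_reply_sizes) {
    if (!IsReplySizeOverLimit()) {
      in_flight.push_back(std::make_unique<ReplyBytesGuard>(reply_size));
      std::cout << "squashed batch of " << reply_size << " bytes\n";
    } else {
      std::cout << "over limit, dispatching " << reply_size << " bytes command by command\n";
    }
  }
  // in_flight guards are destroyed here, as if the socket finished writing.
}

}  // namespace sketch

int main() {
  // By the third command the in-flight total (600 MB) exceeds the 0.5 GB budget,
  // so that command is dispatched without squashing.
  sketch::Dispatch({300'000'000, 300'000'000, 50'000'000});
}

In the sketch, the fallback branch does not grow the shared counter, so a connection that is already over the limit trades throughput for bounded reply memory instead of making the overshoot worse.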