chore: avoid squashing when squashing_current_reply_size crosses limit #4924

Merged
merged 14 commits on Jul 8, 2025
25 changes: 21 additions & 4 deletions src/facade/dragonfly_connection.cc
@@ -98,6 +98,10 @@ ABSL_FLAG(uint32_t, max_busy_read_usec, 100,
"Maximum time we read and parse from "
"a socket without yielding. In microseconds.");

ABSL_FLAG(size_t, squashed_reply_size_limit, 0,
Collaborator


So as I understand, now we will not block the connection but just skip the squashing optimization, right?
Let's update the comment and the PR title/description.

Collaborator


Apart from this comment, LGTM.

"Max bytes allowed for squashing_current_reply_size. If this limit is reached, "
"connections dispatching pipelines won't squash them.");

using namespace util;
using namespace std;
using absl::GetFlag;
@@ -180,6 +184,8 @@ bool TrafficLogger::Write(iovec* blobs, size_t len) {
thread_local TrafficLogger tl_traffic_logger{};
thread_local base::Histogram* io_req_size_hist = nullptr;

thread_local const size_t reply_size_limit = absl::GetFlag(FLAGS_squashed_reply_size_limit);

void OpenTrafficLogger(string_view base_path) {
unique_lock lk{tl_traffic_logger.mutex};
if (tl_traffic_logger.log_file)
@@ -1158,7 +1164,7 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
last_interaction_ = time(nullptr);

// We might have blocked the dispatch queue from processing, wake it up.
if (dispatch_q_.size() > 0)
if (!dispatch_q_.empty())
cnd_.notify_one();
}
}
@@ -1632,7 +1638,8 @@ void Connection::AsyncFiber() {
bool squashing_enabled = squashing_threshold > 0;
bool threshold_reached = pending_pipeline_cmd_cnt_ > squashing_threshold;
bool are_all_plain_cmds = pending_pipeline_cmd_cnt_ == dispatch_q_.size();
if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) {
if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_ &&
!IsReplySizeOverLimit()) {
SquashPipeline();
} else {
MessageHandle msg = std::move(dispatch_q_.front());
@@ -2059,6 +2066,16 @@ void Connection::DecrNumConns() {
--stats_->num_conns_other;
}

bool Connection::IsReplySizeOverLimit() const {
std::atomic<size_t>& reply_sz = tl_facade_stats->reply_stats.squashing_current_reply_size;
Collaborator


.load(memory_order_relaxed)

Collaborator


Oh, why did you split this into 2 lines?

Contributor Author

@kostasrim Jun 24, 2025


.load(memory_order_relaxed)

We should synchronize with acquire and release semantics; otherwise the load might observe an older value in the modification order of the atomic variable.

I split it into two lines because otherwise the expression was too long.
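
For illustration, a minimal standalone sketch of the pairing being discussed; the writer side (a fetch_add with release ordering) is an assumption about how squashing_current_reply_size might be updated elsewhere, not code from this PR:

#include <atomic>
#include <cstddef>

std::atomic<std::size_t> squashing_current_reply_size{0};

// Writer side (hypothetical): account for bytes appended to a squashed reply.
// The release increment pairs with the acquire load in OverLimit().
void OnReplyBytesAppended(std::size_t n) {
  squashing_current_reply_size.fetch_add(n, std::memory_order_release);
}

// Reader side, mirroring the shape of IsReplySizeOverLimit().
bool OverLimit(std::size_t limit) {
  // acquire: synchronize with the release increment so we do not act on a stale value.
  std::size_t current = squashing_current_reply_size.load(std::memory_order_acquire);
  return limit != 0 && current > limit;
}

int main() {
  OnReplyBytesAppended(1 << 20);        // 1 MiB accounted
  return OverLimit(512 << 10) ? 0 : 1;  // exits 0: 1 MiB exceeds the 512 KiB limit
}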

size_t current = reply_sz.load(std::memory_order_acquire);
const bool over_limit = reply_size_limit != 0 && current > 0 && current > reply_size_limit;
// Log only every 10th occurrence. Otherwise, it can be too noisy on certain workloads in
// production instances.
LOG_EVERY_N(INFO, 10) << "MultiCommandSquasher overlimit: " << current << "/" << reply_size_limit;
return over_limit;
}

void Connection::SetMaxQueueLenThreadLocal(unsigned tid, uint32_t val) {
thread_queue_backpressure[tid].pipeline_queue_max_len = val;
thread_queue_backpressure[tid].pipeline_cnd.notify_all();
@@ -2089,7 +2106,7 @@ void Connection::EnsureMemoryBudget(unsigned tid) {

Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id,
uint32_t client_id)
: ptr_{ptr}, thread_id_{thread_id}, client_id_{client_id} {
: ptr_{std::move(ptr)}, thread_id_{thread_id}, client_id_{client_id} {
Collaborator


?

Contributor Author

@kostasrim Jun 24, 2025


Deep copying a shared_ptr increments the atomic reference counter, so we pay for an atomic operation. Moving avoids that by transferring the control-block pointer instead. I just saw the misuse and used move 😄
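
A minimal standalone sketch of the difference (not code from this PR):

#include <cassert>
#include <memory>
#include <utility>

int main() {
  auto ptr = std::make_shared<int>(42);

  std::shared_ptr<int> copy = ptr;  // copy: atomic increment of the shared ref count
  assert(ptr.use_count() == 2);

  std::shared_ptr<int> moved = std::move(copy);  // move: steals the control-block pointer,
  assert(ptr.use_count() == 2);                  // no atomic increment; `copy` is left empty
  assert(copy == nullptr);
  return 0;
}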

}

unsigned Connection::WeakRef::Thread() const {
@@ -2115,7 +2132,7 @@ uint32_t Connection::WeakRef::GetClientId() const {
return client_id_;
}

bool Connection::WeakRef::operator<(const WeakRef& other) {
bool Connection::WeakRef::operator<(const WeakRef& other) const {
return client_id_ < other.client_id_;
}

4 changes: 3 additions & 1 deletion src/facade/dragonfly_connection.h
@@ -196,7 +196,7 @@ class Connection : public util::Connection {
// Returns client id. Thread-safe.
uint32_t GetClientId() const;

bool operator<(const WeakRef& other);
bool operator<(const WeakRef& other) const;
bool operator==(const WeakRef& other) const;

private:
@@ -420,6 +420,8 @@ class Connection : public util::Connection {
void IncrNumConns();
void DecrNumConns();

bool IsReplySizeOverLimit() const;

std::deque<MessageHandle> dispatch_q_; // dispatch queue
util::fb2::CondVarAny cnd_; // dispatch queue waker
util::fb2::Fiber async_fb_; // async fiber (if started)
36 changes: 36 additions & 0 deletions tests/dragonfly/memory_test.py
@@ -1,4 +1,5 @@
import pytest
import asyncio
from redis import asyncio as aioredis
from .utility import *
import logging
@@ -222,3 +223,38 @@ async def test_cache_eviction_with_rss_deny_oom(
)
stats_info = await async_client.info("stats")
logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')


@pytest.mark.asyncio
async def test_throttle_on_commands_squashing_replies_bytes(df_factory: DflyInstanceFactory):
df = df_factory.create(
proactor_threads=2,
squashed_reply_size_limit=500_000_000,
vmodule="dragonfly_connection=2",
)
df.start()

client = df.client()
# 0.5gb
await client.execute_command("debug populate 64 test 3125 rand type hash elements 500")

async def poll():
# At any point we should not cross this limit
assert df.rss < 1_500_000_000
cl = df.client()
pipe = cl.pipeline(transaction=False)
for i in range(64):
pipe.execute_command(f"hgetall test:{i}")

await pipe.execute()

tasks = []
for i in range(20):
tasks.append(asyncio.create_task(poll()))

for task in tasks:
await task

df.stop()
found = df.find_in_logs("MultiCommandSquasher overlimit: ")
assert len(found) > 0