Skip to content

Commit 7f19ba2

Browse files
committed
add event count throttle
1 parent 290b807 commit 7f19ba2

File tree

4 files changed

+44
-19
lines changed

4 files changed

+44
-19
lines changed

src/server/main_service.cc

-1
Original file line numberDiff line numberDiff line change
@@ -1327,7 +1327,6 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, SinkReplyBui
13271327

13281328
ServerState::tlocal()->RecordCmd(cntx->has_main_or_memcache_listener);
13291329
Transaction* tx = cntx->transaction;
1330-
13311330
auto& info = cntx->conn_state.tracking_info_;
13321331
const bool is_read_only = cid->opt_mask() & CO::READONLY;
13331332
if (tx) {

src/server/multi_command_squasher.cc

+7-6
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0;
6969
thread_local size_t MultiCommandSquasher::throttle_size_limit_ =
7070
absl::GetFlag(FLAGS_throttle_squashed);
7171

72+
thread_local util::fb2::EventCount MultiCommandSquasher::ec_;
73+
7274
MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
7375
Service* service, bool verify_commands, bool error_abort)
7476
: cmds_{cmds},
@@ -207,12 +209,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
207209

208210
local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args);
209211

210-
if (MultiCommandSquasher::IsMultiCommandSquasherOverLimit()) {
211-
auto* rb = (facade::RedisReplyBuilder*)&crb;
212-
rb->SendError("High Rss");
213-
} else {
214-
service_->InvokeCmd(cmd->Cid(), args, &crb, &local_cntx);
215-
}
212+
service_->InvokeCmd(cmd->Cid(), args, &crb, &local_cntx);
216213

217214
sinfo.replies.emplace_back(crb.Take());
218215
current_reply_size_.fetch_add(Size(sinfo.replies.back()), std::memory_order_relaxed);
@@ -233,6 +230,9 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
233230
if (order_.empty())
234231
return true;
235232

233+
MultiCommandSquasher::ec_.await(
234+
[]() { return !MultiCommandSquasher::IsMultiCommandSquasherOverLimit(); });
235+
236236
unsigned num_shards = 0;
237237
for (auto& sd : sharded_) {
238238
sd.replies.reserve(sd.cmds.size());
@@ -287,6 +287,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
287287
break;
288288
}
289289
current_reply_size_.fetch_sub(size, std::memory_order_relaxed);
290+
MultiCommandSquasher::ec_.notifyAll();
290291

291292
uint64_t after_reply = proactor->GetMonotonicTimeNs();
292293
ServerState::SafeTLocal()->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000;

src/server/multi_command_squasher.h

+9-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "facade/reply_capture.h"
88
#include "server/conn_context.h"
99
#include "server/main_service.h"
10+
#include "util/fibers/synchronization.h"
1011

1112
namespace dfly {
1213

@@ -33,9 +34,12 @@ class MultiCommandSquasher {
3334
}
3435

3536
static bool IsMultiCommandSquasherOverLimit() {
36-
LOG(INFO) << throttle_size_limit_ << " current reply suze " << current_reply_size_;
37-
return throttle_size_limit_ > 0 &&
38-
current_reply_size_.load(std::memory_order_relaxed) > throttle_size_limit_;
37+
const bool over_limit =
38+
throttle_size_limit_ > 0 &&
39+
current_reply_size_.load(std::memory_order_relaxed) > throttle_size_limit_;
40+
VLOG_IF(2, over_limit) << "MultiCommandSquasher overlimit: " << throttle_size_limit_
41+
<< " current reply size " << current_reply_size_;
42+
return over_limit;
3943
}
4044

4145
private:
@@ -99,6 +103,8 @@ class MultiCommandSquasher {
99103
// we increase size in one thread and decrease in another
100104
static atomic_uint64_t current_reply_size_;
101105
static thread_local size_t throttle_size_limit_;
106+
// Used to throttle when memory is tight
107+
static thread_local util::fb2::EventCount ec_;
102108
};
103109

104110
} // namespace dfly

tests/dragonfly/memory_test.py

+28-9
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import pytest
2+
import asyncio
23
from redis import asyncio as aioredis
34
from .utility import *
45
import logging
@@ -226,18 +227,36 @@ async def test_cache_eviction_with_rss_deny_oom(
226227

227228
@pytest.mark.asyncio
228229
async def test_throttle_on_commands_squashing_replies_bytes(df_factory: DflyInstanceFactory):
229-
df = df_factory.create(proactor_threads=2, throttle_squashed=1000000000)
230+
df = df_factory.create(
231+
proactor_threads=2, throttle_squashed=1_000_000_000, vmodule="multi_command_squasher=2"
232+
)
230233
df.start()
231234

232235
client = df.client()
233236
# 1gb
234237
await client.execute_command("debug populate 1 test 10000 rand type hash elements 100000")
235238

236-
await client.execute_command("multi")
237-
await client.execute_command("hgetall test:0")
238-
await client.execute_command("hgetall test:0")
239-
await client.execute_command("hgetall test:0")
240-
await client.execute_command("hgetall test:0")
241-
res = await client.execute_command("exec")
242-
assert type(res[1]) is redis.exceptions.ResponseError
243-
assert type(res[2]) is redis.exceptions.ResponseError
239+
async def poll():
240+
# At any point we should not cross this limit
241+
cl = df.client()
242+
await cl.execute_command("multi")
243+
await cl.execute_command("hgetall test:0")
244+
await cl.execute_command("exec")
245+
246+
# With the current approach this will overshoot
247+
# await client.execute_command("multi")
248+
# await client.execute_command("hgetall test:0")
249+
# await client.execute_command("hgetall test:0")
250+
# await client.execute_command("hgetall test:0")
251+
# await client.execute_command("hgetall test:0")
252+
# res = await client.execute_command("exec")
253+
tasks = []
254+
for i in range(50):
255+
tasks.append(asyncio.create_task(poll()))
256+
257+
for task in tasks:
258+
await task
259+
260+
df.stop()
261+
found = df.find_in_logs("MultiCommandSquasher overlimit: ")
262+
assert len(found) > 0

0 commit comments

Comments
 (0)