Replicate batcher time based #24967

Draft · wants to merge 1 commit into base: dev
5 changes: 4 additions & 1 deletion src/v/raft/consensus.cc
@@ -123,7 +123,10 @@ consensus::consensus(
   , _client_protocol(client)
   , _leader_notification(std::move(cb))
   , _fstats(_self)
-  , _batcher(this, config::shard_local_cfg().raft_replicate_batch_window_size())
+  , _batcher(
+      this,
+      config::shard_local_cfg().raft_replicate_batch_window_size(),
+      scheduling_config.produce_sg)
   , _event_manager(this)
   , _probe(std::make_unique<probe>())
   , _ctxlog(group, _log->config().ntp())
4 changes: 3 additions & 1 deletion src/v/raft/group_manager.cc
@@ -27,6 +27,7 @@ namespace raft {
 
 group_manager::group_manager(
   model::node_id self,
+  ss::scheduling_group produce_sg,
   ss::scheduling_group raft_sg,
   group_manager::config_provider_fn cfg,
   recovery_memory_quota::config_provider_fn recovery_mem_cfg,
@@ -35,6 +36,7 @@ group_manager::group_manager(
   ss::sharded<coordinated_recovery_throttle>& recovery_throttle,
   ss::sharded<features::feature_table>& feature_table)
   : _self(self)
+  , _produce_sg(produce_sg)
   , _raft_sg(raft_sg)
   , _configuration(cfg())
   , _buffered_protocol(ss::make_shared<buffered_protocol>(
@@ -131,7 +133,7 @@ ss::future<ss::lw_shared_ptr<raft::consensus>> group_manager::create_group(
       raft::group_configuration(nodes, revision),
       raft::timeout_jitter(_configuration.election_timeout_ms),
       log,
-      scheduling_config(_raft_sg, raft_priority()),
+      scheduling_config(_raft_sg, raft_priority(), _produce_sg),
       _configuration.raft_io_timeout_ms,
       _configuration.enable_longest_log_detection,
       consensus_client_protocol(_buffered_protocol),
2 changes: 2 additions & 0 deletions src/v/raft/group_manager.h
@@ -56,6 +56,7 @@ class group_manager {
 
     group_manager(
       model::node_id self,
+      ss::scheduling_group produce_sg,
       ss::scheduling_group raft_scheduling_group,
       config_provider_fn,
       recovery_memory_quota::config_provider_fn recovery_mem_cfg,
@@ -109,6 +110,7 @@ class group_manager {
     do_shutdown(ss::lw_shared_ptr<consensus>, bool remove_persistent_state);
 
     model::node_id _self;
+    ss::scheduling_group _produce_sg;
     ss::scheduling_group _raft_sg;
     configuration _configuration;
     ss::shared_ptr<raft::buffered_protocol> _buffered_protocol;
62 changes: 42 additions & 20 deletions src/v/raft/replicate_batcher.cc
@@ -24,10 +24,18 @@
 
 namespace raft {
 using namespace std::chrono_literals; // NOLINT
-replicate_batcher::replicate_batcher(consensus* ptr, size_t cache_size)
+namespace {
+std::chrono::microseconds flush_interval = 2ms;
+}
+replicate_batcher::replicate_batcher(
+  consensus* ptr, size_t cache_size, ss::scheduling_group sg)
   : _ptr(ptr)
   , _max_batch_size_sem(cache_size, "raft/repl-batch")
-  , _max_batch_size(cache_size) {}
+  , _max_batch_size(cache_size)
+  , _sg(sg) {
+    _flush_timer.set_callback([this] { _trigger.signal(); });
+    flush_dispatch_loop();
+}
 
 replicate_stages replicate_batcher::replicate(
   std::optional<model::term_id> expected_term,
@@ -40,6 +48,12 @@ replicate_stages replicate_batcher::replicate(
       std::move(enqueued), expected_term, std::move(batches), opts);
     return {std::move(enqueued_f), std::move(f)};
 }
+void replicate_batcher::flush_dispatch_loop() {
+    return ssx::repeat_until_gate_closed(_bg, [this] {
+        return ss::with_scheduling_group(
+          _sg, [this] { return flush_dispatch(); });
+    });
+}
 
 ss::future<result<replicate_result>>
 replicate_batcher::cache_and_wait_for_result(
@@ -65,25 +79,12 @@ replicate_batcher::cache_and_wait_for_result(
      * replicate batcher stop method
      *
      */
-        if (!_flush_pending) {
-            _flush_pending = true;
-            ssx::background = ssx::spawn_with_gate_then(_bg, [this]() {
-                return _lock.get_units()
-                  .then([this](auto units) {
-                      return flush(std::move(units), false);
-                  })
-                  .handle_exception([this](const std::exception_ptr& e) {
-                      // an exception here is quite unlikely, since the flush()
-                      // method generally catches all its exceptions and
-                      // propagates them to the promises associated with the
-                      // items being flushed
-                      vlog(
-                        _ptr->_ctxlog.error,
-                        "Error in background flush: {}",
-                        e);
-                  });
-            });
+
+        if (!_flush_timer.armed()) {
+            _armed_at = std::chrono::steady_clock::now();
+            _flush_timer.arm(flush_interval);
         }
+        _trigger.signal();
     } catch (...) {
         // exception in caching phase
         enqueued.set_to_current_exception();
@@ -94,6 +95,7 @@
 }
 
 ss::future<> replicate_batcher::stop() {
+    _trigger.broken();
     return _bg.close().then([this] {
         // we keep a lock here to make sure that all inflight requests have
         // finished already
@@ -107,6 +109,26 @@ ss::future<> replicate_batcher::stop() {
     });
 }
 
+bool replicate_batcher::can_flush() const {
+    return !_item_cache.empty() && (_armed_at + flush_interval < std::chrono::steady_clock::now()
+        || (_max_batch_size - _max_batch_size_sem.available_units()) >= 512_KiB);
+}
+
+ss::future<> replicate_batcher::flush_dispatch() {
+    return _trigger.wait([this] { return can_flush(); }).then([this] {
+        _flush_pending = true;
+
+        return _lock.get_units()
+          .then([this](auto units) { return flush(std::move(units), false); })
+          .handle_exception([this](const std::exception_ptr& e) {
+              // an exception here is quite unlikely, since the flush()
+              // method generally catches all its exceptions and
+              // propagates them to the promises associated with the
+              // items being flushed
+              vlog(_ptr->_ctxlog.error, "Error in background flush: {}", e);
+          });
+    });
+}
 ss::future<replicate_batcher::item_ptr> replicate_batcher::do_cache(
   std::optional<model::term_id> expected_term,
   chunked_vector<model::record_batch> batches,
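The replicate_batcher.cc hunks above replace the old per-call background flush (one fiber spawned per cache_and_wait_for_result) with a single long-lived dispatch loop: the enqueue path and a 2ms timer both signal a condition variable, and the loop flushes once can_flush() reports that the time window elapsed or enough bytes are buffered. The sketch below shows that timer-plus-condition-variable pattern in plain Seastar. It is an illustration under assumptions, not the PR's code: the class name time_batcher, its enqueue/flush methods, and the byte counter are invented, and it uses seastar::do_until/with_gate where the PR relies on Redpanda's ssx helpers.

```cpp
#include <seastar/core/condition-variable.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/timer.hh>

#include <chrono>

using namespace std::chrono_literals;

class time_batcher {
public:
    explicit time_batcher(seastar::scheduling_group sg)
      : _sg(sg) {
        // The timer never flushes directly; it only wakes the loop, which
        // re-checks the flush condition itself.
        _timer.set_callback([this] { _cv.signal(); });
        // One long-lived dispatch fiber, tracked by the gate and pinned to
        // the dedicated scheduling group.
        (void)seastar::with_gate(_bg, [this] {
            return seastar::with_scheduling_group(
                     _sg, [this] { return dispatch_loop(); })
              .handle_exception([](std::exception_ptr) {
                  // broken_condition_variable on stop(); nothing to do.
              });
        });
    }

    void enqueue(size_t bytes) {
        _pending_bytes += bytes;
        if (!_timer.armed()) {
            _armed_at = std::chrono::steady_clock::now();
            _timer.arm(2ms); // start of a new batching window
        }
        _cv.signal(); // let the loop re-evaluate can_flush()
    }

    seastar::future<> stop() {
        _timer.cancel();
        _cv.broken(); // unblocks a waiting dispatch loop
        return _bg.close();
    }

private:
    bool can_flush() const {
        // Time-based OR size-based trigger, mirroring the PR's can_flush().
        return _pending_bytes > 0
               && (std::chrono::steady_clock::now() > _armed_at + 2ms
                   || _pending_bytes >= 512 * 1024);
    }

    seastar::future<> dispatch_loop() {
        return seastar::do_until(
          [this] { return _bg.is_closed(); },
          [this] {
              return _cv.wait([this] { return can_flush(); })
                .then([this] { return flush(); });
          });
    }

    seastar::future<> flush() {
        _pending_bytes = 0; // a real batcher drains queued items here
        return seastar::make_ready_future<>();
    }

    seastar::scheduling_group _sg;
    seastar::condition_variable _cv;
    seastar::timer<> _timer;
    std::chrono::steady_clock::time_point _armed_at{};
    size_t _pending_bytes{0};
    seastar::gate _bg;
};
```

The design point worth noting: because all flushes funnel through one fiber, they are serialized and accounted to the produce scheduling group, instead of being scattered across whichever fibers happened to enqueue.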
12 changes: 11 additions & 1 deletion src/v/raft/replicate_batcher.h
@@ -115,7 +115,8 @@ class replicate_batcher {
         ss::promise<result<replicate_result>> _promise;
     };
     using item_ptr = ss::lw_shared_ptr<item>;
-    explicit replicate_batcher(consensus* ptr, size_t cache_size);
+    explicit replicate_batcher(
+      consensus* ptr, size_t cache_size, ss::scheduling_group sg);
 
     replicate_batcher(replicate_batcher&&) noexcept = default;
     replicate_batcher& operator=(replicate_batcher&&) noexcept = delete;
@@ -133,6 +134,10 @@ class replicate_batcher {
     ss::future<> stop();
 
 private:
+    void flush_dispatch_loop();
+
+    bool can_flush() const;
+    ss::future<> flush_dispatch();
     ss::future<> do_flush(
       std::vector<item_ptr>,
       append_entries_request,
@@ -161,6 +166,11 @@ class replicate_batcher {
     size_t _max_batch_size;
     std::vector<item_ptr> _item_cache;
     mutex _lock{"replicate_batcher"};
+    ss::condition_variable _trigger;
+    ss::timer<> _flush_timer;
+    std::chrono::steady_clock::time_point _armed_at
+      = std::chrono::steady_clock::now();
+    ss::scheduling_group _sg;
     ss::gate _bg;
     // If true, a background flush must be pending. Used to coalesce
     // background flush requests, since one flush dequeues all items
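A note on the size half of the can_flush() predicate declared above: the batcher never counts buffered bytes directly; it infers them from the memory semaphore, since every cached item holds units of _max_batch_size_sem until it is flushed. A worked example with invented numbers:

```cpp
// Invented numbers; the real capacity comes from
// raft_replicate_batch_window_size().
size_t max_batch_size = 1024 * 1024;          // units the semaphore started with
size_t available = 640 * 1024;                // _max_batch_size_sem.available_units()
size_t buffered = max_batch_size - available; // 384 KiB currently cached
bool size_trigger = buffered >= 512 * 1024;   // false: wait for the 2ms timer instead
```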
14 changes: 10 additions & 4 deletions src/v/raft/types.h
@@ -742,20 +742,26 @@ struct scheduling_config {
       ss::scheduling_group default_sg,
       ss::io_priority_class default_iopc,
       ss::scheduling_group learner_recovery_sg,
-      ss::io_priority_class learner_recovery_iopc)
+      ss::io_priority_class learner_recovery_iopc,
+      ss::scheduling_group produce_sg)
       : default_sg(default_sg)
       , default_iopc(default_iopc)
       , learner_recovery_sg(learner_recovery_sg)
-      , learner_recovery_iopc(learner_recovery_iopc) {}
+      , learner_recovery_iopc(learner_recovery_iopc)
+      , produce_sg(produce_sg) {}
 
     scheduling_config(
-      ss::scheduling_group default_sg, ss::io_priority_class default_iopc)
-      : scheduling_config(default_sg, default_iopc, default_sg, default_iopc) {}
+      ss::scheduling_group default_sg,
+      ss::io_priority_class default_iopc,
+      ss::scheduling_group produce_sg)
+      : scheduling_config(
+          default_sg, default_iopc, default_sg, default_iopc, produce_sg) {}
 
     ss::scheduling_group default_sg;
     ss::io_priority_class default_iopc;
     ss::scheduling_group learner_recovery_sg;
     ss::io_priority_class learner_recovery_iopc;
+    ss::scheduling_group produce_sg;
 };
 
 std::ostream& operator<<(std::ostream& o, const consistency_level& l);
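To make the types.h change concrete: the new three-argument scheduling_config overload reuses the default scheduling group and IO priority for learner recovery. With hypothetical variables raft_sg, raft_prio, and produce_sg, the two constructions below are equivalent:

```cpp
// Convenience form, as group_manager::create_group now calls it:
raft::scheduling_config a(raft_sg, raft_prio, produce_sg);

// ...which delegates to the full five-argument form:
raft::scheduling_config b(raft_sg, raft_prio, raft_sg, raft_prio, produce_sg);
```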
1 change: 1 addition & 0 deletions src/v/redpanda/application.cc
@@ -1435,6 +1435,7 @@ void application::wire_up_redpanda_services(
     raft_group_manager
       .start(
         node_id,
+        sched_groups.produce_sg(),
         sched_groups.raft_sg(),
         [] {
             return raft::group_manager::configuration{
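The application.cc hunk implies that Redpanda's sched_groups registry already exposes a produce_sg(); this PR only threads it through to raft. For readers unfamiliar with Seastar scheduling groups, creating such a group from scratch looks roughly like this (the group name and share count are assumptions, not Redpanda's actual values):

```cpp
#include <seastar/core/scheduling.hh>

// Scheduling groups are created once, asynchronously, at startup; the shares
// value sets the group's relative CPU weight against other groups.
seastar::future<seastar::scheduling_group> make_produce_sg() {
    return seastar::create_scheduling_group("produce", /*shares=*/1000);
}
```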