Skip to content

Commit

Permalink
kafka/server: add consumer lag metrics
Browse files Browse the repository at this point in the history
Note that this commit contains only the metric infrastructure, i.e. the
probe and the mechanism to dynamically enable/disable these metrics.
A subsequent commit will implement the logic to populate the consumer
lag metrics data.
  • Loading branch information
IoannisRP committed Feb 3, 2025
1 parent de05a16 commit 53aa756
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 6 deletions.
18 changes: 16 additions & 2 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,13 @@ group::group(
, _conf(conf)
, _catchup_lock(std::move(catchup_lock))
, _partition(std::move(partition))
, _probe(_members, _static_members, _offsets)
, _probe(_members, _static_members, _offsets, _lag_metrics)
, _ctxlog(klog, *this)
, _ctx_txlog(cluster::txlog, *this)
, _md_serializer(std::move(serializer))
, _term(term)
, _enable_group_metrics(group_metrics)
, _enable_consumer_lag_metrics(conf.enable_consumer_group_lag_metrics.bind())
, _abort_interval_ms(config::shard_local_cfg()
.abort_timed_out_transactions_interval_ms.value())
, _tx_frontend(tx_frontend)
Expand Down Expand Up @@ -165,12 +166,13 @@ group::group(
, _conf(conf)
, _catchup_lock(std::move(catchup_lock))
, _partition(std::move(partition))
, _probe(_members, _static_members, _offsets)
, _probe(_members, _static_members, _offsets, _lag_metrics)
, _ctxlog(klog, *this)
, _ctx_txlog(cluster::txlog, *this)
, _md_serializer(std::move(serializer))
, _term(term)
, _enable_group_metrics(group_metrics)
, _enable_consumer_lag_metrics(conf.enable_consumer_group_lag_metrics.bind())
, _abort_interval_ms(config::shard_local_cfg()
.abort_timed_out_transactions_interval_ms.value())
, _tx_frontend(tx_frontend)
Expand Down Expand Up @@ -3620,7 +3622,19 @@ void group::setup_metrics() {
// Sets up the probe's metrics for this group. When group metrics are
// disabled nothing below runs: no watch is installed and no metric is
// registered.
if (!_enable_group_metrics) {
return;
}

// Callback attached to the consumer lag metrics config binding: it
// registers the lag metrics when the binding currently reads true and
// deregisters them otherwise.
// NOTE(review): presumably the binding invokes this on every change of the
// underlying property - confirm against the config binding implementation.
_enable_consumer_lag_metrics.watch([this]() {
if (_enable_consumer_lag_metrics()) {
_probe.register_consumer_lag_metrics(_id);
} else {
_probe.deregister_consumer_lag_metrics();
}
});

_probe.setup_public_metrics(_id);
// The watch above is only a callback; apply the current value of the
// binding here so the lag metrics are registered right away when enabled.
if (_enable_consumer_lag_metrics()) {
_probe.register_consumer_lag_metrics(_id);
}
}

} // namespace kafka
1 change: 1 addition & 0 deletions src/v/kafka/server/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,7 @@ class group final : public ss::enable_lw_shared_from_this<group> {
model::topic_partition,
std::unique_ptr<offset_metadata_with_probe>>
_offsets;
consumer_lag_metrics _lag_metrics;
group_probe<
model::topic_partition,
std::unique_ptr<offset_metadata_with_probe>>
Expand Down
55 changes: 52 additions & 3 deletions src/v/kafka/server/group_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ class group_offset_probe {
metrics::public_metric_groups _public_metrics;
};

/// Consumer lag values of one group, read by the "lag_sum" and "lag_max"
/// gauges that group_probe registers. Both fields start at zero.
struct consumer_lag_metrics {
    /// Sum of consumer group lag for all partitions in the group.
    size_t sum = 0;
    /// Maximum consumer group lag across all partitions in the group.
    size_t max = 0;
};

template<typename KeyType, typename ValType>
class group_probe {
using member_map = absl::node_hash_map<kafka::member_id, member_ptr>;
Expand All @@ -100,10 +105,14 @@ class group_probe {
explicit group_probe(
member_map& members,
static_member_map& static_members,
offsets_map& offsets) noexcept
offsets_map& offsets,
consumer_lag_metrics& lag_metrics) noexcept
: _members(members)
, _static_members(static_members)
, _offsets(offsets) {}
, _offsets(offsets)
, _lag_metrics(lag_metrics)
, _disable_public_metrics(
config::shard_local_cfg().disable_public_metrics()) {}

group_probe(const group_probe&) = delete;
group_probe& operator=(const group_probe&) = delete;
Expand All @@ -114,7 +123,7 @@ class group_probe {
void setup_public_metrics(const kafka::group_id& group_id) {
namespace sm = ss::metrics;

if (config::shard_local_cfg().disable_public_metrics()) {
if (_disable_public_metrics) {
return;
}

Expand All @@ -137,11 +146,51 @@ class group_probe {
labels)});
}

/// Registers the public "lag_sum" and "lag_max" gauges for this group.
///
/// Does nothing when public metrics are disabled or when the gauges are
/// already registered. Both gauges carry a namespaced "group" label built
/// from group_id and report their values through lambdas that read
/// _lag_metrics.
void register_consumer_lag_metrics(const kafka::group_id& group_id) {
    if (_disable_public_metrics || _public_metrics_consumer_lag.has_value()) {
        return;
    }

    namespace sm = ss::metrics;
    auto make_group_label = metrics::make_namespaced_label("group");
    std::vector<sm::label_instance> group_labels{make_group_label(group_id())};

    // Engage the optional; the gauges stay registered for as long as it
    // holds a value.
    auto& lag_metric_groups = _public_metrics_consumer_lag.emplace();

    lag_metric_groups.add_group(
      prometheus_sanitize::metrics_name("kafka:consumer:group"),
      {sm::make_gauge(
         "lag_sum",
         [this] { return _lag_metrics.sum; },
         sm::description(
           "Sum of consumer group lag for all partitions in a group"),
         group_labels),
       sm::make_gauge(
         "lag_max",
         [this] { return _lag_metrics.max; },
         sm::description(
           "Maximum consumer group lag across all partitions in a group"),
         group_labels)});
}
/// Removes the consumer lag gauges by destroying the metric groups object
/// that owns them. Calling this when nothing is registered is a no-op.
void deregister_consumer_lag_metrics() {
    _public_metrics_consumer_lag.reset();
}

private:
member_map& _members;
static_member_map& _static_members;
offsets_map& _offsets;
consumer_lag_metrics& _lag_metrics;
metrics::public_metric_groups _public_metrics;
// A different group is used for these as they need to be de-register-able.
// To deregister the metrics the public_metric_groups object needs to be
// destroyed. For this reason, a nullable type is being used here.
std::optional<metrics::public_metric_groups> _public_metrics_consumer_lag;
bool _disable_public_metrics;
};

} // namespace kafka
60 changes: 59 additions & 1 deletion tests/rptest/tests/consumer_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from rptest.clients.types import TopicSpec
from rptest.services.kafka_cli_consumer import KafkaCliConsumer
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, RedpandaService
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, RedpandaService, MetricsEndpoint
from rptest.services.rpk_producer import RpkProducer
from rptest.services.verifiable_consumer import VerifiableConsumer
from rptest.tests.redpanda_test import RedpandaTest
Expand Down Expand Up @@ -658,6 +658,64 @@ def test_consumer_static_member_update(self):
self.producer.wait()
self.producer.free()

    @cluster(num_nodes=6)
    @parametrize(enable_consumer_lag_metrics=True)
    @parametrize(enable_consumer_lag_metrics=False)
    def test_group_lag_metrics_dynamic_registration(
            self, enable_consumer_lag_metrics):
        """
        Test validating that consumer lag metrics can be dynamically enabled/disabled.

        The cluster starts with `enable_consumer_group_lag_metrics` set to
        `enable_consumer_lag_metrics`. Once a consumer group is stable, the test
        checks that the lag metrics are present on the public metrics endpoint
        exactly when the setting is on, then flips the setting and repeats the
        check with the opposite expectation.

        :param enable_consumer_lag_metrics: initial value of the
            `enable_consumer_group_lag_metrics` cluster config.
        """
        self.redpanda.set_cluster_config(
            {"enable_consumer_group_lag_metrics": enable_consumer_lag_metrics})

        self.create_topic(20)
        group = 'test-gr-1'
        # use 2 consumers
        consumers = self.create_consumers(2,
                                          self.topic_spec.name,
                                          group,
                                          static_members=False)

        self.start_producer()
        # wait for some messages
        wait_until(
            lambda: ConsumerGroupTest.group_consumed_at_least(
                consumers, 50 * len(consumers)), 30, 2)
        self.validate_group_state(group,
                                  expected_state="Stable",
                                  static_members=False)

        # Queries all started nodes for both lag metrics. `success` is True only
        # when the returned samples contain exactly the two requested metric
        # names; a None result counts as "metrics absent".
        def get_consumer_lag_metrics_from_nodes():
            patterns = [
                "redpanda_kafka_consumer_group_lag_max",
                "redpanda_kafka_consumer_group_lag_sum"
            ]
            samples = self.redpanda.metrics_samples(
                patterns, self.redpanda.started_nodes(),
                MetricsEndpoint.PUBLIC_METRICS)
            success = samples is not None and set(
                samples.keys()) == set(patterns)
            return success, samples

        # Phase 1: metric presence must match the initial configuration.
        has_consumer_lag_metrics, _ = get_consumer_lag_metrics_from_nodes()
        assert has_consumer_lag_metrics == enable_consumer_lag_metrics, f"Expected value '{enable_consumer_lag_metrics}' but got '{has_consumer_lag_metrics}'"

        # Phase 2: flip the setting at runtime and expect the metrics to follow.
        flipped_value = not enable_consumer_lag_metrics
        self.redpanda.set_cluster_config(
            {"enable_consumer_group_lag_metrics": flipped_value})
        has_consumer_lag_metrics, _ = get_consumer_lag_metrics_from_nodes()
        assert has_consumer_lag_metrics == flipped_value, f"Expected value '{flipped_value}' but got '{has_consumer_lag_metrics}'"

        # Tear down the producer and the consumers started by this test.
        self.producer.wait()
        self.producer.free()

        for c in consumers:
            c.stop()
            c.wait()
            c.free()


@dataclass
class OffsetAndMetadata():
Expand Down

0 comments on commit 53aa756

Please sign in to comment.