Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16524: Metrics for KIP-853 #18304

Merged
merged 22 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ class SharedServer(
raftManager = _raftManager
_raftManager.startup()

if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
brokerMetrics.addRaftMetrics(raftManager.client)
}
if (sharedServerConfig.processRoles.contains(ProcessRole.ControllerRole)) {
controllerServerMetrics.addRaftMetrics(raftManager.client)
}

metadataLoaderMetrics = if (brokerMetrics != null) {
new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.controller.metrics;

import org.apache.kafka.raft.KafkaRaftClient;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import com.yammer.metrics.core.Gauge;
Expand Down Expand Up @@ -59,6 +60,12 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private static final MetricName UNCLEAN_LEADER_ELECTIONS_PER_SEC = getMetricName(
"ControllerStats", "UncleanLeaderElectionsPerSec");

private static final MetricName IGNORED_STATIC_VOTERS = getMetricName(
"KafkaController", "IgnoredStaticVoters");

private static final MetricName IS_OBSERVER = getMetricName(
"KafkaController", "IsObserver");

private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
Expand Down Expand Up @@ -139,6 +146,22 @@ public Integer value() {
Optional.of(registry.get().newMeter(UNCLEAN_LEADER_ELECTIONS_PER_SEC, "elections", TimeUnit.SECONDS)));
}

public <T> void addRaftMetrics(KafkaRaftClient<T> client) {
registry.ifPresent(r -> r.newGauge(IGNORED_STATIC_VOTERS, new Gauge<Integer>() {
@Override
public Integer value() {
return client.ignoredStaticVoters();
}
}));

registry.ifPresent(r -> r.newGauge(IS_OBSERVER, new Gauge<Integer>() {
@Override
public Integer value() {
return client.isObserver();
}
}));
}

public void setFencedBrokerCount(int brokerCount) {
this.fencedBrokerCount.set(brokerCount);
}
Expand Down Expand Up @@ -255,7 +278,9 @@ public void close() {
PREFERRED_REPLICA_IMBALANCE_COUNT,
METADATA_ERROR_COUNT,
ZK_MIGRATION_STATE,
UNCLEAN_LEADER_ELECTIONS_PER_SEC
UNCLEAN_LEADER_ELECTIONS_PER_SEC,
IGNORED_STATIC_VOTERS,
IS_OBSERVER
).forEach(r::removeMetric));
}

Expand Down
12 changes: 12 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ private void onBecomeLeader(long currentTimeMs) {

resetConnections();
kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
kafkaRaftMetrics.addLeaderMetrics(state);
}

private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
Expand Down Expand Up @@ -682,13 +683,15 @@ private void transitionToCandidate(long currentTimeMs) {
private void transitionToUnattached(int epoch) {
quorum.transitionToUnattached(epoch);
maybeFireLeaderChange();
kafkaRaftMetrics.removeLeaderMetrics();
resetConnections();
}

private void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
fetchPurgatory.completeAllExceptionally(
Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
quorum.transitionToResigned(preferredSuccessors);
kafkaRaftMetrics.removeLeaderMetrics();
jsancio marked this conversation as resolved.
Show resolved Hide resolved
resetConnections();
}

Expand All @@ -698,6 +701,7 @@ private void transitionToUnattachedVoted(ReplicaKey candidateKey, int epoch) {

private void onBecomeFollower(long currentTimeMs) {
kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
kafkaRaftMetrics.removeLeaderMetrics();

resetConnections();

Expand Down Expand Up @@ -3515,6 +3519,14 @@ public Optional<Node> voterNode(int id, ListenerName listenerName) {
return partitionState.lastVoterSet().voterNode(id, listenerName);
}

public int ignoredStaticVoters() {
jsancio marked this conversation as resolved.
Show resolved Hide resolved
return partitionState.lastVoterSetOffset().isPresent() ? 1 : 0;
}

public int isObserver() {
return quorum.isObserver() ? 1 : 0;
}
jsancio marked this conversation as resolved.
Show resolved Hide resolved

// Visible only for test
QuorumState quorum() {
// It's okay to return null since this method is only called by tests
Expand Down
4 changes: 4 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,10 @@ Map<ReplicaKey, ReplicaState> observerStates(final long currentTimeMs) {
return observerStates;
}

public int numObservers() {
return observerStates.size();
}

public Set<Integer> grantingVoters() {
return this.grantingVoters;
}
Expand Down
4 changes: 4 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ public boolean isVoter(ReplicaKey nodeKey) {
return partitionState.lastVoterSet().isVoter(nodeKey);
}

public int numVoters() {
return partitionState.lastVoterSet().size();
}

public boolean isObserver() {
return !isVoter();
}
Expand Down
7 changes: 7 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/VoterSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ public Set<VoterNode> voterNodes() {
return new HashSet<>(voters.values());
}

/**
* Returns size of the voter set.
jsancio marked this conversation as resolved.
Show resolved Hide resolved
*/
public int size() {
return voters.size();
}

/**
* Returns all of the endpoints for a voter id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.raft.LeaderState;
import org.apache.kafka.raft.LogOffsetMetadata;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.QuorumState;
Expand All @@ -51,7 +52,10 @@ public class KafkaRaftMetrics implements AutoCloseable {
private final MetricName highWatermarkMetricName;
private final MetricName logEndOffsetMetricName;
private final MetricName logEndEpochMetricName;
private final MetricName numObserversMetricName;
private final MetricName numUnknownVoterConnectionsMetricName;
private final MetricName numVotersMetricName;
private final MetricName uncommittedVoterChangeMetricName;
private final Sensor commitTimeSensor;
private final Sensor electionTimeSensor;
private final Sensor fetchRecordsSensor;
Expand Down Expand Up @@ -138,6 +142,14 @@ public KafkaRaftMetrics(Metrics metrics, String metricGrpPrefix, QuorumState sta
"Number of unknown voters whose connection information is not cached; would never be larger than quorum-size.");
metrics.addMetric(this.numUnknownVoterConnectionsMetricName, (mConfig, currentTimeMs) -> numUnknownVoterConnections);

this.numVotersMetricName = metrics.metricName("number-of-voters", metricGroupName, "Number of voters for a KRaft topic partition.");
metrics.addMetric(this.numVotersMetricName, (Gauge<Integer>) (mConfig, currentTimestamp) -> state.numVoters());

// These metrics should only be present on the leader, otherwise they do not make sense.
// They should be added when a replica becomes leader and removed when it is no longer leader.
this.numObserversMetricName = metrics.metricName("number-of-observers", metricGroupName, "Number of observers being tracked by the KRaft topic partition leader.");
this.uncommittedVoterChangeMetricName = metrics.metricName("uncommitted-voter-change", metricGroupName, "1 if there is a voter change that has not been committed, 0 otherwise.");

this.commitTimeSensor = metrics.sensor("commit-latency");
this.commitTimeSensor.add(metrics.metricName("commit-latency-avg", metricGroupName,
"The average time in milliseconds to commit an entry in the raft log."), new Avg());
Expand Down Expand Up @@ -215,6 +227,22 @@ public void maybeUpdateElectionLatency(long currentTimeMs) {
}
}

public <T> void addLeaderMetrics(LeaderState<T> leaderState) {
metrics.addMetric(numObserversMetricName, (Gauge<Integer>) (config, now) -> leaderState.numObservers());
metrics.addMetric(uncommittedVoterChangeMetricName, (Gauge<Integer>) (config, now) -> {
if (leaderState.addVoterHandlerState().isEmpty() && leaderState.removeVoterHandlerState().isEmpty()) {
return 0;
} else {
return 1;
}
});
}
jsancio marked this conversation as resolved.
Show resolved Hide resolved

public void removeLeaderMetrics() {
metrics.removeMetric(numObserversMetricName);
metrics.removeMetric(uncommittedVoterChangeMetricName);
}
jsancio marked this conversation as resolved.
Show resolved Hide resolved

@Override
public void close() {
jsancio marked this conversation as resolved.
Show resolved Hide resolved
Arrays.asList(
Expand All @@ -226,7 +254,10 @@ public void close() {
highWatermarkMetricName,
logEndOffsetMetricName,
logEndEpochMetricName,
numUnknownVoterConnectionsMetricName
numObserversMetricName,
numUnknownVoterConnectionsMetricName,
numVotersMetricName,
uncommittedVoterChangeMetricName
).forEach(metrics::removeMetric);

Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ControlRecordType;
Expand Down Expand Up @@ -63,6 +65,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class KafkaRaftClientReconfigTest {
Expand Down Expand Up @@ -336,6 +339,11 @@ public void testAddVoter() throws Exception {
context.becomeLeader();
int epoch = context.currentEpoch();

// Check expected metrics values for leader
assertEquals(2, getMetric(context.metrics, "number-of-voters").metricValue());
assertEquals(0, getMetric(context.metrics, "number-of-observers").metricValue());
assertEquals(0, getMetric(context.metrics, "uncommitted-voter-change").metricValue());

ReplicaKey newVoter = replicaKey(local.id() + 2, true);
InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
"localhost",
Expand All @@ -355,12 +363,13 @@ public void testAddVoter() throws Exception {
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));

// Catch up the new voter to the leader's LEO
// Catch up the new voter to the leader's LEO, the new voter is still an observer at this point
context.deliverRequest(
context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
assertEquals(1, getMetric(context.metrics, "number-of-observers").metricValue());

// Attempt to add new voter to the quorum
context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners));
Expand All @@ -386,6 +395,8 @@ public void testAddVoter() throws Exception {
context.client.poll();
// The new voter is now a voter after writing the VotersRecord to the log
assertTrue(context.client.quorum().isVoter(newVoter));
assertEquals(1, getMetric(context.metrics, "uncommitted-voter-change").metricValue());
assertEquals(3, getMetric(context.metrics, "number-of-voters").metricValue());

// Send a FETCH to increase the HWM and commit the new voter set
context.deliverRequest(
Expand All @@ -397,6 +408,8 @@ public void testAddVoter() throws Exception {
// Expect reply for AddVoter request
context.pollUntilResponse();
context.assertSentAddVoterResponse(Errors.NONE);
assertEquals(0, getMetric(context.metrics, "number-of-observers").metricValue());
assertEquals(0, getMetric(context.metrics, "uncommitted-voter-change").metricValue());
}

@Test
Expand Down Expand Up @@ -939,6 +952,11 @@ void testAddVoterFailsWhenLosingLeadership() throws Exception {
context.becomeLeader();
int epoch = context.currentEpoch();

// Check expected metrics values for leader
assertEquals(2, getMetric(context.metrics, "number-of-voters").metricValue());
assertEquals(0, getMetric(context.metrics, "number-of-observers").metricValue());
assertEquals(0, getMetric(context.metrics, "uncommitted-voter-change").metricValue());

ReplicaKey newVoter = replicaKey(local.id() + 2, true);
InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
"localhost",
Expand Down Expand Up @@ -970,6 +988,10 @@ void testAddVoterFailsWhenLosingLeadership() throws Exception {
context.client.resign(epoch);
context.pollUntilResponse();
context.assertSentAddVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER);

assertEquals(2, getMetric(context.metrics, "number-of-voters").metricValue());
assertNull(getMetric(context.metrics, "number-of-observers"));
assertNull(getMetric(context.metrics, "uncommitted-voter-change"));
}

@Test
Expand Down Expand Up @@ -1029,6 +1051,11 @@ public void testRemoveVoter() throws Exception {

assertTrue(context.client.quorum().isVoter(follower2));

// Check expected metrics values for leader
assertEquals(3, getMetric(context.metrics, "number-of-voters").metricValue());
assertEquals(0, getMetric(context.metrics, "number-of-observers").metricValue());
assertEquals(0, getMetric(context.metrics, "uncommitted-voter-change").metricValue());

// Establish a HWM and fence previous leaders
context.deliverRequest(
context.fetchRequest(epoch, follower1, context.log.endOffset().offset(), epoch, 0)
Expand All @@ -1043,9 +1070,11 @@ public void testRemoveVoter() throws Exception {
context.client.poll();
// Append the VotersRecord to the log
context.client.poll();
assertEquals(1, getMetric(context.metrics, "uncommitted-voter-change").metricValue());

// follower2 should not be a voter in the latest voter set
assertFalse(context.client.quorum().isVoter(follower2));
assertEquals(2, getMetric(context.metrics, "number-of-voters").metricValue());

// Send a FETCH to increase the HWM and commit the new voter set
context.deliverRequest(
Expand All @@ -1057,6 +1086,8 @@ public void testRemoveVoter() throws Exception {
// Expect reply for RemoveVoter request
context.pollUntilResponse();
context.assertSentRemoveVoterResponse(Errors.NONE);
assertEquals(1, getMetric(context.metrics, "number-of-observers").metricValue());
assertEquals(0, getMetric(context.metrics, "uncommitted-voter-change").metricValue());
}

@Test
Expand All @@ -1076,6 +1107,11 @@ public void testRemoveVoterIsLeader() throws Exception {
context.becomeLeader();
int epoch = context.currentEpoch();

// Check expected metrics values for leader
assertEquals(3, getMetric(context.metrics, "number-of-voters").metricValue());
assertEquals(0, getMetric(context.metrics, "number-of-observers").metricValue());
assertEquals(0, getMetric(context.metrics, "uncommitted-voter-change").metricValue());

// Establish a HWM and fence previous leaders
context.deliverRequest(
context.fetchRequest(epoch, follower1, context.log.endOffset().offset(), epoch, 0)
Expand All @@ -1093,6 +1129,8 @@ public void testRemoveVoterIsLeader() throws Exception {

// local should not be a voter in the latest voter set
assertFalse(context.client.quorum().isVoter(local));
assertEquals(2, getMetric(context.metrics, "number-of-voters").metricValue());
assertEquals(1, getMetric(context.metrics, "uncommitted-voter-change").metricValue());

// Send a FETCH request for follower1
context.deliverRequest(
Expand All @@ -1107,6 +1145,8 @@ public void testRemoveVoterIsLeader() throws Exception {
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
assertEquals(1, getMetric(context.metrics, "number-of-observers").metricValue());
assertEquals(0, getMetric(context.metrics, "uncommitted-voter-change").metricValue());

// Expect reply for RemoveVoter request
context.pollUntilResponse();
Expand All @@ -1122,6 +1162,8 @@ public void testRemoveVoterIsLeader() throws Exception {

// Calls to resign should be allowed and not throw an exception
context.client.resign(epoch);
assertNull(getMetric(context.metrics, "number-of-observers"));
assertNull(getMetric(context.metrics, "uncommitted-voter-change"));

// Election timeout is random number in [electionTimeoutMs, 2 * electionTimeoutMs)
context.time.sleep(2L * context.electionTimeoutMs());
Expand Down Expand Up @@ -2351,4 +2393,8 @@ private static ApiVersionsResponseData apiVersionsResponse(Errors error, Support
.setErrorCode(error.code())
.setSupportedFeatures(supportedFeatures);
}

private static KafkaMetric getMetric(final Metrics metrics, final String name) {
return metrics.metrics().get(metrics.metricName(name, "raft-metrics"));
}
}
Loading
Loading