Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16524: Metrics for KIP-853 #18304

Merged
merged 22 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import org.apache.kafka.raft.ExternalKRaftMetrics
import org.apache.kafka.server.metrics.BrokerServerMetrics

class DefaultExternalKRaftMetrics(
val brokerServerMetricsOpt: Option[BrokerServerMetrics],
val controllerMetadataMetricsOpt: Option[ControllerMetadataMetrics]
val brokerServerMetrics: Option[BrokerServerMetrics],
val controllerMetadataMetrics: Option[ControllerMetadataMetrics]
) extends ExternalKRaftMetrics {

override def setIgnoredStaticVoters(ignoredStaticVoters: Boolean): Unit = {
brokerServerMetricsOpt.foreach(metrics => metrics.setIgnoredStaticVoters(ignoredStaticVoters))
controllerMetadataMetricsOpt.foreach(metrics => metrics.setIgnoredStaticVoters(ignoredStaticVoters))
brokerServerMetrics.foreach(metrics => metrics.setIgnoredStaticVoters(ignoredStaticVoters))
controllerMetadataMetrics.foreach(metrics => metrics.setIgnoredStaticVoters(ignoredStaticVoters))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ final class DefaultExternalKRaftMetricsTest {
def testDefaultExternalKRaftMetrics(): Unit = {
val brokerServerMetrics = new BrokerServerMetrics(new Metrics())
val controllerMetadataMetrics = new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()))
var metrics = new DefaultExternalKRaftMetrics(
val metrics = new DefaultExternalKRaftMetrics(
Option(brokerServerMetrics),
Option(controllerMetadataMetrics)
)
Expand All @@ -47,8 +47,11 @@ final class DefaultExternalKRaftMetricsTest {

assertFalse(brokerServerMetrics.ignoredStaticVoters())
assertFalse(controllerMetadataMetrics.ignoredStaticVoters())
}

metrics = new DefaultExternalKRaftMetrics(None, None)
@Test
def testEmptyDefaultExternalKRaftMetrics(): Unit = {
val metrics = new DefaultExternalKRaftMetrics(None, None)
metrics.setIgnoredStaticVoters(true)
}
}
2 changes: 2 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ protected LeaderState(
this.voterSetAtEpochStart = voterSetAtEpochStart;
this.offsetOfVotersAtEpochStart = offsetOfVotersAtEpochStart;
this.kraftVersionAtEpochStart = kraftVersionAtEpochStart;

kafkaRaftMetrics.addLeaderMetrics();
this.kafkaRaftMetrics = kafkaRaftMetrics;
}

Expand Down
1 change: 0 additions & 1 deletion raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,6 @@ public <T> LeaderState<T> transitionToLeader(long epochStartOffset, BatchAccumul
logContext,
kafkaRaftMetrics
);
kafkaRaftMetrics.addLeaderMetrics();

durableTransitionTo(state);
return state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public final class KRaftControlRecordStateMachine {
private volatile long nextOffset = STARTING_NEXT_OFFSET;
private final KafkaRaftMetrics kafkaRaftMetrics;
private final ExternalKRaftMetrics externalKRaftMetrics;
private final Optional<VoterSet> staticVoterSet;
private final VoterSet staticVoterSet;

/**
* Constructs an internal log listener
Expand Down Expand Up @@ -103,9 +103,11 @@ public KRaftControlRecordStateMachine(
this.logger = logContext.logger(this.getClass());
this.kafkaRaftMetrics = kafkaRaftMetrics;
this.externalKRaftMetrics = externalKRaftMetrics;
this.staticVoterSet = staticVoterSet.size() > 0 ? Optional.of(staticVoterSet) : Optional.empty();
this.staticVoterSet = staticVoterSet;

this.staticVoterSet.ifPresent(voters -> kafkaRaftMetrics.updateNumVoters(voters.size()));
if (staticVoterSet.size() > 0) {
kafkaRaftMetrics.updateNumVoters(staticVoterSet.size());
}
jsancio marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -130,7 +132,7 @@ public void truncateNewEntries(long endOffset) {
}

kafkaRaftMetrics.updateNumVoters(voterSetHistory.lastValue().size());
if (staticVoterSet.isPresent() && voterSetHistory.lastEntry().isEmpty()) {
if (staticVoterSet.size() > 0 && voterSetHistory.lastEntry().isEmpty()) {
jsancio marked this conversation as resolved.
Show resolved Hide resolved
externalKRaftMetrics.setIgnoredStaticVoters(false);
}
}
Expand Down Expand Up @@ -298,7 +300,7 @@ private void handleBatch(Batch<?> batch, OptionalLong overrideOffset) {
case KRAFT_VOTERS:
VoterSet voters = VoterSet.fromVotersRecord((VotersRecord) record.message());
kafkaRaftMetrics.updateNumVoters(voters.size());
if (staticVoterSet.isPresent()) {
if (staticVoterSet.size() > 0) {
jsancio marked this conversation as resolved.
Show resolved Hide resolved
externalKRaftMetrics.setIgnoredStaticVoters(true);
}
logger.info("Latest set of voters is {} at offset {}", voters, currentOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.stream.IntStream;
Expand Down
Loading