Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16524: Metrics for KIP-853 #18304

Merged
merged 22 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class DefaultExternalKRaftMetrics(
val controllerMetadataMetricsOpt: Option[ControllerMetadataMetrics]
) extends ExternalKRaftMetrics {

override def setIgnoredStaticVoters(): Unit = {
brokerServerMetricsOpt.foreach(metrics => metrics.setIgnoredStaticVoters())
controllerMetadataMetricsOpt.foreach(metrics => metrics.setIgnoredStaticVoters())
override def setIgnoredStaticVoters(ignoredStaticVoters: Boolean): Unit = {
brokerServerMetricsOpt.foreach(metrics => metrics.setIgnoredStaticVoters(ignoredStaticVoters))
controllerMetadataMetricsOpt.foreach(metrics => metrics.setIgnoredStaticVoters(ignoredStaticVoters))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ final class DefaultExternalKRaftMetricsTest {
assertFalse(brokerServerMetrics.ignoredStaticVoters())
assertFalse(controllerMetadataMetrics.ignoredStaticVoters())

metrics.setIgnoredStaticVoters()
metrics.setIgnoredStaticVoters(true)

assertTrue(brokerServerMetrics.ignoredStaticVoters())
assertTrue(controllerMetadataMetrics.ignoredStaticVoters())

metrics.setIgnoredStaticVoters()
metrics.setIgnoredStaticVoters(false)

assertTrue(brokerServerMetrics.ignoredStaticVoters())
assertTrue(controllerMetadataMetrics.ignoredStaticVoters())
assertFalse(brokerServerMetrics.ignoredStaticVoters())
assertFalse(controllerMetadataMetrics.ignoredStaticVoters())

metrics = new DefaultExternalKRaftMetrics(None, None)
metrics.setIgnoredStaticVoters()
metrics.setIgnoredStaticVoters(true)
jsancio marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ public void updateUncleanLeaderElection(int count) {
this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
}

public void setIgnoredStaticVoters() {
ignoredStaticVoters.set(true);
public void setIgnoredStaticVoters(boolean ignored) {
ignoredStaticVoters.set(ignored);
}

public boolean ignoredStaticVoters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
* values are updated in the raft layer. They are not allowed to block.
*/
public interface ExternalKRaftMetrics {
void setIgnoredStaticVoters();
void setIgnoredStaticVoters(boolean ignoredStaticVoters);
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ public void truncateNewEntries(long endOffset) {
}

kafkaRaftMetrics.updateNumVoters(voterSetHistory.lastValue().size());
if (staticVoterSet.isPresent() && voterSetHistory.lastEntry().isEmpty()) {
externalKRaftMetrics.setIgnoredStaticVoters(false);
}
}

/**
Expand Down Expand Up @@ -296,7 +299,7 @@ private void handleBatch(Batch<?> batch, OptionalLong overrideOffset) {
VoterSet voters = VoterSet.fromVotersRecord((VotersRecord) record.message());
kafkaRaftMetrics.updateNumVoters(voters.size());
if (staticVoterSet.isPresent()) {
externalKRaftMetrics.setIgnoredStaticVoters();
externalKRaftMetrics.setIgnoredStaticVoters(true);
}
logger.info("Latest set of voters is {} at offset {}", voters, currentOffset);
synchronized (voterSetHistory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,11 @@ public void testAddVoter() throws Exception {
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
checkLeaderMetricValues(3, 0, false, false, context);

// Expect reply for AddVoter request
context.pollUntilResponse();
context.assertSentAddVoterResponse(Errors.NONE);
checkLeaderMetricValues(3, 0, false, false, context);
}

@Test
Expand Down Expand Up @@ -1071,11 +1071,11 @@ public void testRemoveVoter() throws Exception {
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
checkLeaderMetricValues(2, 1, false, false, context);

// Expect reply for RemoveVoter request
context.pollUntilResponse();
context.assertSentRemoveVoterResponse(Errors.NONE);
checkLeaderMetricValues(2, 1, false, false, context);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public class MockExternalKRaftMetrics implements ExternalKRaftMetrics {
private boolean ignoredStaticVoters = false;

@Override
public void setIgnoredStaticVoters() {
ignoredStaticVoters = true;
public void setIgnoredStaticVoters(boolean ignoredStaticVoters) {
this.ignoredStaticVoters = ignoredStaticVoters;
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,6 @@ Builder withBootstrapSnapshot(Optional<VoterSet> voters) {
return this;
}

Builder setStartingVotersStatic() {
isStartingVotersStatic = true;
return this;
}

Builder withLocalListeners(Endpoints localListeners) {
this.localListeners = localListeners;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

final class KRaftControlRecordStateMachineTest {
private static Metrics metrics = new Metrics();
Expand All @@ -53,10 +51,6 @@ private static MockLog buildLog() {
return new MockLog(new TopicPartition("partition", 0), Uuid.randomUuid(), new LogContext());
}

private static KafkaMetric getNumberOfVoters(final Metrics metrics) {
return metrics.metrics().get(metrics.metricName("number-of-voters", "raft-metrics"));
}

private static KRaftControlRecordStateMachine buildPartitionListener(MockLog log, VoterSet staticVoterSet) {
return new KRaftControlRecordStateMachine(
staticVoterSet,
Expand All @@ -70,6 +64,15 @@ private static KRaftControlRecordStateMachine buildPartitionListener(MockLog log
);
}

private static void checkMetricValues(int expectedNumberOfVoters, boolean expectedIgnoredStaticVoters) {
assertEquals(expectedNumberOfVoters, getNumberOfVoters(metrics).metricValue());
assertEquals(expectedIgnoredStaticVoters, externalMetrics.getIgnoredStaticVoters());
}

private static KafkaMetric getNumberOfVoters(final Metrics metrics) {
return metrics.metrics().get(metrics.metricName("number-of-voters", "raft-metrics"));
}

@BeforeEach
public void resetMetrics() {
metrics = new Metrics();
Expand All @@ -88,8 +91,7 @@ void testEmptyPartition() {
partitionState.updateState();

assertEquals(voterSet, partitionState.lastVoterSet());
assertFalse(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, false);
}

@Test
Expand All @@ -102,8 +104,7 @@ void testEmptyPartitionWithNoStaticVoters() {
partitionState.updateState();

assertEquals(VoterSet.empty(), partitionState.lastVoterSet());
assertFalse(externalMetrics.getIgnoredStaticVoters());
assertEquals(0, getNumberOfVoters(metrics).metricValue());
checkMetricValues(0, false);
}

@Test
Expand All @@ -115,8 +116,7 @@ void testUpdateWithoutSnapshot() {

KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);

assertFalse(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, false);

// Append the kraft.version control record
KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
Expand Down Expand Up @@ -150,8 +150,7 @@ void testUpdateWithoutSnapshot() {
assertEquals(voterSet, partitionState.lastVoterSet());
assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1));
assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1));
assertTrue(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, true);
}

@Test
Expand All @@ -163,8 +162,7 @@ void testUpdateWithEmptySnapshot() {

KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);

assertFalse(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, false);

// Create a snapshot that doesn't have any kraft.version or voter set control records
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
Expand Down Expand Up @@ -206,8 +204,7 @@ void testUpdateWithEmptySnapshot() {
assertEquals(voterSet, partitionState.lastVoterSet());
assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1));
assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1));
assertTrue(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, true);
}

@Test
Expand All @@ -218,8 +215,7 @@ void testUpdateWithSnapshot() {

KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);

assertFalse(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, false);

// Create a snapshot that has kraft.version and voter set control records
KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
Expand All @@ -240,8 +236,7 @@ void testUpdateWithSnapshot() {
assertEquals(voterSet, partitionState.lastVoterSet());
assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1));
assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1));
assertTrue(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, true);
}

@Test
Expand All @@ -253,8 +248,7 @@ void testUpdateWithSnapshotAndLogOverride() {

KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);

assertFalse(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, false);

// Create a snapshot that has kraft.version and voter set control records
KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
Expand Down Expand Up @@ -289,8 +283,7 @@ void testUpdateWithSnapshotAndLogOverride() {
assertEquals(voterSet, partitionState.lastVoterSet());
assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1));
assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1));
assertTrue(externalMetrics.getIgnoredStaticVoters());
assertEquals(4, getNumberOfVoters(metrics).metricValue());
checkMetricValues(4, true);

// Check the voter set at the snapshot
assertEquals(Optional.of(snapshotVoterSet), partitionState.voterSetAtOffset(snapshotId.offset() - 1));
Expand All @@ -305,8 +298,7 @@ void testTruncateTo() {

KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);

assertFalse(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, false);

// Append the kraft.version control record
KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
Expand Down Expand Up @@ -353,25 +345,21 @@ void testTruncateTo() {
partitionState.updateState();

assertEquals(voterSet, partitionState.lastVoterSet());
assertTrue(externalMetrics.getIgnoredStaticVoters());
assertEquals(4, getNumberOfVoters(metrics).metricValue());
checkMetricValues(4, true);

// Truncate log and listener
log.truncateTo(voterSetOffset);
partitionState.truncateNewEntries(voterSetOffset);

assertEquals(firstVoterSet, partitionState.lastVoterSet());
assertTrue(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, true);

// Truncate the entire log
log.truncateTo(0);
partitionState.truncateNewEntries(0);

assertEquals(staticVoterSet, partitionState.lastVoterSet());
// TODO: should this metric be reporting false?
assertTrue(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, false);
}

@Test
Expand All @@ -383,8 +371,7 @@ void testTrimPrefixTo() {

KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, staticVoterSet);

assertFalse(externalMetrics.getIgnoredStaticVoters());
assertEquals(3, getNumberOfVoters(metrics).metricValue());
checkMetricValues(3, false);

// Append the kraft.version control record
long kraftVersionOffset = log.endOffset().offset();
Expand Down Expand Up @@ -433,8 +420,7 @@ void testTrimPrefixTo() {

assertEquals(voterSet, partitionState.lastVoterSet());
assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(kraftVersionOffset));
assertTrue(externalMetrics.getIgnoredStaticVoters());
assertEquals(4, getNumberOfVoters(metrics).metricValue());
checkMetricValues(4, true);

// Trim the prefix for the partition listener up to the kraft.version
partitionState.truncateOldEntries(kraftVersionOffset);
Expand All @@ -450,7 +436,6 @@ void testTrimPrefixTo() {
assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(kraftVersionOffset));
assertEquals(Optional.empty(), partitionState.voterSetAtOffset(firstVoterSetOffset));
assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(voterSetOffset));
assertTrue(externalMetrics.getIgnoredStaticVoters());
assertEquals(4, getNumberOfVoters(metrics).metricValue());
checkMetricValues(4, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ long lastAppliedTimestamp() {
return lastAppliedImageProvenance.get().lastContainedLogTimeMs();
}

public void setIgnoredStaticVoters() {
ignoredStaticVoters.set(true);
public void setIgnoredStaticVoters(boolean ignored) {
ignoredStaticVoters.set(ignored);
}

public boolean ignoredStaticVoters() {
Expand Down
Loading