Skip to content

Commit

Permalink
KAFKA-16524; Metrics for KIP-853 (#18304)
Browse files Browse the repository at this point in the history
This change implement some of the metrics enumerated in KIP-853.

The KafkaRaftMetrics object now exposes number-of-voters, number-of-observers and uncommitted-voter-change. The number-of-observers and uncommitted-voter-change metrics are only present on the active controller or leader, since it does not make sense for other replicas to report these metrics.

In order to make these two metrics thread-safe, KafkaRaftMetrics needs to be passed into LeaderState, and therefore QuorumState. This introduces a circularity since the KafkaRaftMetrics constructor takes in QuorumState. To break the circularity for now, the logic using QuorumState will be moved to the KafkaRaftMetrics#initialize method.

The BrokerServerMetrics object now exposes ignored-static-voters. The ControllerServerMetrics object now exposes IgnoredStaticVoters. To implement both metrics for "ignored static voters", this PR introduces the ExternalKRaftMetrics interface, which allows for higher layer metrics objects to be accessible within the raft module.

Reviewers: José Armando García Sancio <[email protected]>
  • Loading branch information
kevin-wu24 authored and cmccabe committed Feb 4, 2025
1 parent 226532a commit 98d238a
Show file tree
Hide file tree
Showing 25 changed files with 681 additions and 87 deletions.
33 changes: 33 additions & 0 deletions core/src/main/scala/kafka/raft/DefaultExternalKRaftMetrics.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.raft

import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.raft.ExternalKRaftMetrics
import org.apache.kafka.server.metrics.BrokerServerMetrics

class DefaultExternalKRaftMetrics(
val brokerServerMetrics: Option[BrokerServerMetrics],
val controllerMetadataMetrics: Option[ControllerMetadataMetrics]
) extends ExternalKRaftMetrics {

override def setIgnoredStaticVoters(ignoredStaticVoters: Boolean): Unit = {
brokerServerMetrics.foreach(metrics => metrics.setIgnoredStaticVoters(ignoredStaticVoters))
controllerMetadataMetrics.foreach(metrics => metrics.setIgnoredStaticVoters(ignoredStaticVoters))
}
}
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog}
import org.apache.kafka.raft.{ExternalKRaftMetrics, Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.Feature
import org.apache.kafka.server.common.serialization.RecordSerde
Expand Down Expand Up @@ -115,6 +115,7 @@ class KafkaRaftManager[T](
topicId: Uuid,
time: Time,
metrics: Metrics,
externalKRaftMetrics: ExternalKRaftMetrics,
threadNamePrefixOpt: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
bootstrapServers: JCollection[InetSocketAddress],
Expand Down Expand Up @@ -158,7 +159,8 @@ class KafkaRaftManager[T](
client.initialize(
controllerQuorumVotersFuture.get(),
new FileQuorumStateStore(new File(dataDir, FileQuorumStateStore.DEFAULT_FILE_NAME)),
metrics
metrics,
externalKRaftMetrics
)
netChannel.start()
clientDriver.start()
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package kafka.server

import kafka.metrics.KafkaMetricsReporter
import kafka.raft.KafkaRaftManager
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager}
import kafka.server.Server.MetricsPrefix
import kafka.utils.{CoreUtils, Logging, VerifiableProperties}
import org.apache.kafka.common.metrics.Metrics
Expand Down Expand Up @@ -277,6 +277,8 @@ class SharedServer(
controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}

val externalKRaftMetrics = new DefaultExternalKRaftMetrics(Option(brokerMetrics), Option(controllerServerMetrics))

val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
clusterId,
sharedServerConfig,
Expand All @@ -286,6 +288,7 @@ class SharedServer(
KafkaRaftServer.MetadataTopicId,
time,
metrics,
externalKRaftMetrics,
Some(s"kafka-${sharedServerConfig.nodeId}-raft"), // No dash expected at the end
controllerQuorumVotersFuture,
bootstrapServers,
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import joptsimple.{OptionException, OptionSpec}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.{KafkaRaftManager, RaftManager}
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager}
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.errors.InvalidConfigurationException
Expand All @@ -36,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, RaftClient, QuorumConfig}
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde
Expand Down Expand Up @@ -102,6 +102,7 @@ class TestRaftServer(
topicId,
time,
metrics,
new DefaultExternalKRaftMetrics(None, None),
Some(threadNamePrefix),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.raft

import com.yammer.metrics.core.MetricsRegistry
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.server.metrics.BrokerServerMetrics
import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
import org.junit.jupiter.api.Test

import java.util.Optional

final class DefaultExternalKRaftMetricsTest {
@Test
def testDefaultExternalKRaftMetrics(): Unit = {
val brokerServerMetrics = new BrokerServerMetrics(new Metrics())
val controllerMetadataMetrics = new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()))
val metrics = new DefaultExternalKRaftMetrics(
Option(brokerServerMetrics),
Option(controllerMetadataMetrics)
)

assertFalse(brokerServerMetrics.ignoredStaticVoters())
assertFalse(controllerMetadataMetrics.ignoredStaticVoters())

metrics.setIgnoredStaticVoters(true)

assertTrue(brokerServerMetrics.ignoredStaticVoters())
assertTrue(controllerMetadataMetrics.ignoredStaticVoters())

metrics.setIgnoredStaticVoters(false)

assertFalse(brokerServerMetrics.ignoredStaticVoters())
assertFalse(controllerMetadataMetrics.ignoredStaticVoters())
}

@Test
def testEmptyDefaultExternalKRaftMetrics(): Unit = {
val metrics = new DefaultExternalKRaftMetrics(None, None)
metrics.setIgnoredStaticVoters(true)
}
}
1 change: 1 addition & 0 deletions core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class RaftManagerTest {
topicId,
Time.SYSTEM,
new Metrics(Time.SYSTEM),
new DefaultExternalKRaftMetrics(None, None),
Option.empty,
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -54,6 +55,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"KafkaController", "MetadataErrorCount");
private static final MetricName UNCLEAN_LEADER_ELECTIONS_PER_SEC = getMetricName(
"ControllerStats", "UncleanLeaderElectionsPerSec");
private static final MetricName IGNORED_STATIC_VOTERS = getMetricName(
"KafkaController", "IgnoredStaticVoters");

private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
Expand All @@ -64,7 +67,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private final AtomicInteger preferredReplicaImbalanceCount = new AtomicInteger(0);
private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();

private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);

/**
* Create a new ControllerMetadataMetrics object.
Expand Down Expand Up @@ -117,6 +120,13 @@ public Integer value() {
}));
registry.ifPresent(r -> uncleanLeaderElectionMeter =
Optional.of(registry.get().newMeter(UNCLEAN_LEADER_ELECTIONS_PER_SEC, "elections", TimeUnit.SECONDS)));

registry.ifPresent(r -> r.newGauge(IGNORED_STATIC_VOTERS, new Gauge<Integer>() {
@Override
public Integer value() {
return ignoredStaticVoters() ? 1 : 0;
}
}));
}

public void setFencedBrokerCount(int brokerCount) {
Expand Down Expand Up @@ -203,6 +213,14 @@ public void updateUncleanLeaderElection(int count) {
this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
}

public void setIgnoredStaticVoters(boolean ignored) {
ignoredStaticVoters.set(ignored);
}

public boolean ignoredStaticVoters() {
return ignoredStaticVoters.get();
}

@Override
public void close() {
registry.ifPresent(r -> Arrays.asList(
Expand All @@ -213,7 +231,8 @@ public void close() {
OFFLINE_PARTITION_COUNT,
PREFERRED_REPLICA_IMBALANCE_COUNT,
METADATA_ERROR_COUNT,
UNCLEAN_LEADER_ELECTIONS_PER_SEC
UNCLEAN_LEADER_ELECTIONS_PER_SEC,
IGNORED_STATIC_VOTERS
).forEach(r::removeMetric));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void testMetricNames() {
"kafka.controller:type=KafkaController,name=MetadataErrorCount",
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount",
"kafka.controller:type=KafkaController,name=IgnoredStaticVoters",
"kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec"
)));
}
Expand Down Expand Up @@ -190,4 +191,22 @@ public void testUpdateUncleanLeaderElection() {
registry.shutdown();
}
}

@Test
public void testIgnoredStaticVoters() {
MetricsRegistry registry = new MetricsRegistry();
try (ControllerMetadataMetrics metrics = new ControllerMetadataMetrics(Optional.of(registry))) {
@SuppressWarnings("unchecked")
Gauge<Integer> ignoredStaticVoters = (Gauge<Integer>) registry
.allMetrics()
.get(metricName("KafkaController", "IgnoredStaticVoters"));
assertEquals(0, ignoredStaticVoters.value());
metrics.setIgnoredStaticVoters(true);
assertEquals(1, ignoredStaticVoters.value());
metrics.setIgnoredStaticVoters(false);
assertEquals(0, ignoredStaticVoters.value());
} finally {
registry.shutdown();
}
}
}
26 changes: 26 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/ExternalKRaftMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.raft;

/**
* Implementations of this interface store external metrics objects whose
* values are updated in the raft layer. They are not allowed to block.
*/
public interface ExternalKRaftMetrics {
void setIgnoredStaticVoters(boolean ignoredStaticVoters);
}
14 changes: 10 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -467,19 +467,24 @@ private void maybeFireLeaderChange() {
public void initialize(
Map<Integer, InetSocketAddress> voterAddresses,
QuorumStateStore quorumStateStore,
Metrics metrics
Metrics metrics,
ExternalKRaftMetrics externalKRaftMetrics
) {
VoterSet staticVoters = voterAddresses.isEmpty() ?
VoterSet.empty() :
VoterSet.fromInetSocketAddresses(channel.listenerName(), voterAddresses);

kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft");

partitionState = new KRaftControlRecordStateMachine(
staticVoters,
log,
serde,
BufferSupplier.create(),
MAX_BATCH_SIZE_BYTES,
logContext
logContext,
kafkaRaftMetrics,
externalKRaftMetrics
);
// Read the entire log
logger.info("Reading KRaft snapshot and log as part of the initialization");
Expand Down Expand Up @@ -531,10 +536,11 @@ public void initialize(
quorumStateStore,
time,
logContext,
random
random,
kafkaRaftMetrics
);

kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
kafkaRaftMetrics.initialize(quorum);
// All Raft voters are statically configured and known at startup
// so there are no unknown voter connections. Report this metric as 0.
kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
Expand Down
Loading

0 comments on commit 98d238a

Please sign in to comment.