Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16524: Metrics for KIP-853 #18304

Merged
merged 22 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.raft.internals.ExternalKRaftMetrics
import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.Feature
Expand Down Expand Up @@ -153,7 +154,8 @@ class KafkaRaftManager[T](
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
bootstrapServers: JCollection[InetSocketAddress],
localListeners: Endpoints,
fatalFaultHandler: FaultHandler
fatalFaultHandler: FaultHandler,
externalKRaftMetrics: ExternalKRaftMetrics
jsancio marked this conversation as resolved.
Show resolved Hide resolved
) extends RaftManager[T] with Logging {

val apiVersions = new ApiVersions()
Expand Down Expand Up @@ -192,7 +194,8 @@ class KafkaRaftManager[T](
client.initialize(
controllerQuorumVotersFuture.get(),
new FileQuorumStateStore(new File(dataDir, FileQuorumStateStore.DEFAULT_FILE_NAME)),
metrics
metrics,
externalKRaftMetrics
)
netChannel.start()
clientDriver.start()
Expand Down
13 changes: 5 additions & 8 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.metadata.ListenerInfo
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.raft.internals.ExternalKRaftMetrics
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
Expand Down Expand Up @@ -277,6 +278,8 @@ class SharedServer(
controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}

val externalKRaftMetrics = new ExternalKRaftMetrics(brokerMetrics, controllerServerMetrics)

val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
clusterId,
sharedServerConfig,
Expand All @@ -290,18 +293,12 @@ class SharedServer(
controllerQuorumVotersFuture,
bootstrapServers,
listenerEndpoints,
raftManagerFaultHandler
raftManagerFaultHandler,
externalKRaftMetrics
)
raftManager = _raftManager
_raftManager.startup()

if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
brokerMetrics.addRaftMetrics(raftManager.client)
}
if (sharedServerConfig.processRoles.contains(ProcessRole.ControllerRole)) {
controllerServerMetrics.addRaftMetrics(raftManager.client)
}

metadataLoaderMetrics = if (brokerMetrics != null) {
new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, RaftClient, QuorumConfig}
import org.apache.kafka.raft.internals.ExternalKRaftMetrics
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde
Expand Down Expand Up @@ -106,7 +107,8 @@ class TestRaftServer(
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
endpoints,
new ProcessTerminatingFaultHandler.Builder().build()
new ProcessTerminatingFaultHandler.Builder().build(),
new ExternalKRaftMetrics(null, null)
)

workloadGenerator = new RaftWorkloadGenerator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kafka.controller.metrics;

import org.apache.kafka.raft.KafkaRaftClient;
import org.apache.kafka.raft.internals.ExternalKRaftMetricIgnoredStaticVoters;
jsancio marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import com.yammer.metrics.core.Gauge;
Expand All @@ -28,6 +28,7 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -38,7 +39,7 @@
* IMPORTANT: Metrics which are managed by the QuorumController class itself should go in
* {@link org.apache.kafka.controller.metrics.QuorumControllerMetrics}, not here.
*/
public final class ControllerMetadataMetrics implements AutoCloseable {
public final class ControllerMetadataMetrics implements AutoCloseable, ExternalKRaftMetricIgnoredStaticVoters {
private static final MetricName FENCED_BROKER_COUNT = getMetricName(
"KafkaController", "FencedBrokerCount");
private static final MetricName ACTIVE_BROKER_COUNT = getMetricName(
Expand All @@ -63,9 +64,6 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private static final MetricName IGNORED_STATIC_VOTERS = getMetricName(
"KafkaController", "IgnoredStaticVoters");

private static final MetricName IS_OBSERVER = getMetricName(
"KafkaController", "IsObserver");

private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
Expand All @@ -78,6 +76,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private final AtomicInteger zkMigrationState = new AtomicInteger(-1);
private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();

private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);
jsancio marked this conversation as resolved.
Show resolved Hide resolved


/**
* Create a new ControllerMetadataMetrics object.
Expand Down Expand Up @@ -144,20 +144,11 @@ public Integer value() {

registry.ifPresent(r -> uncleanLeaderElectionMeter =
Optional.of(registry.get().newMeter(UNCLEAN_LEADER_ELECTIONS_PER_SEC, "elections", TimeUnit.SECONDS)));
}

public <T> void addRaftMetrics(KafkaRaftClient<T> client) {
registry.ifPresent(r -> r.newGauge(IGNORED_STATIC_VOTERS, new Gauge<Integer>() {
@Override
public Integer value() {
return client.ignoredStaticVoters();
}
}));

registry.ifPresent(r -> r.newGauge(IS_OBSERVER, new Gauge<Integer>() {
registry.ifPresent(r -> r.newGauge(IGNORED_STATIC_VOTERS, new Gauge<Boolean>() {
jsancio marked this conversation as resolved.
Show resolved Hide resolved
@Override
public Integer value() {
return client.isObserver();
public Boolean value() {
return ignoredStaticVoters();
}
}));
}
Expand Down Expand Up @@ -266,6 +257,13 @@ public void updateUncleanLeaderElection(int count) {
this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
}

public void switchIgnoredStaticVoters() {
ignoredStaticVoters.compareAndSet(false, true);
jsancio marked this conversation as resolved.
Show resolved Hide resolved
}
public boolean ignoredStaticVoters() {
return this.ignoredStaticVoters.get();
jsancio marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void close() {
registry.ifPresent(r -> Arrays.asList(
Expand All @@ -279,8 +277,7 @@ public void close() {
METADATA_ERROR_COUNT,
ZK_MIGRATION_STATE,
UNCLEAN_LEADER_ELECTIONS_PER_SEC,
IGNORED_STATIC_VOTERS,
IS_OBSERVER
IGNORED_STATIC_VOTERS
).forEach(r::removeMetric));
}

Expand Down
25 changes: 12 additions & 13 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.kafka.raft.internals.BlockingMessageQueue;
import org.apache.kafka.raft.internals.CloseListener;
import org.apache.kafka.raft.internals.DefaultRequestSender;
import org.apache.kafka.raft.internals.ExternalKRaftMetrics;
import org.apache.kafka.raft.internals.FuturePurgatory;
import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
import org.apache.kafka.raft.internals.KafkaRaftMetrics;
Expand Down Expand Up @@ -466,19 +467,24 @@ private void maybeFireLeaderChange() {
public void initialize(
Map<Integer, InetSocketAddress> voterAddresses,
QuorumStateStore quorumStateStore,
Metrics metrics
Metrics metrics,
ExternalKRaftMetrics externalKRaftMetrics
) {
VoterSet staticVoters = voterAddresses.isEmpty() ?
VoterSet.empty() :
VoterSet.fromInetSocketAddresses(channel.listenerName(), voterAddresses);

kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft");

partitionState = new KRaftControlRecordStateMachine(
staticVoters,
log,
serde,
BufferSupplier.create(),
MAX_BATCH_SIZE_BYTES,
logContext
logContext,
kafkaRaftMetrics,
externalKRaftMetrics
);
// Read the entire log
logger.info("Reading KRaft snapshot and log as part of the initialization");
Expand Down Expand Up @@ -530,10 +536,11 @@ public void initialize(
quorumStateStore,
time,
logContext,
random
random,
kafkaRaftMetrics
);

kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
kafkaRaftMetrics.initialize(quorum);
// All Raft voters are statically configured and known at startup
// so there are no unknown voter connections. Report this metric as 0.
kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
Expand Down Expand Up @@ -648,7 +655,7 @@ private void onBecomeLeader(long currentTimeMs) {

resetConnections();
kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
kafkaRaftMetrics.addLeaderMetrics(state);
kafkaRaftMetrics.addLeaderMetrics();
jsancio marked this conversation as resolved.
Show resolved Hide resolved
}

private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
Expand Down Expand Up @@ -3519,14 +3526,6 @@ public Optional<Node> voterNode(int id, ListenerName listenerName) {
return partitionState.lastVoterSet().voterNode(id, listenerName);
}

public int ignoredStaticVoters() {
return partitionState.lastVoterSetOffset().isPresent() ? 1 : 0;
}

public int isObserver() {
return quorum.isObserver() ? 1 : 0;
}

// Visible only for test
QuorumState quorum() {
// It's okay to return null since this method is only called by tests
Expand Down
41 changes: 32 additions & 9 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.internals.AddVoterHandlerState;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.KafkaRaftMetrics;
import org.apache.kafka.raft.internals.RemoveVoterHandlerState;
import org.apache.kafka.server.common.KRaftVersion;

Expand Down Expand Up @@ -81,6 +82,7 @@ public class LeaderState<T> implements EpochState {
private final int checkQuorumTimeoutMs;
private final Timer beginQuorumEpochTimer;
private final int beginQuorumEpochTimeoutMs;
private final KafkaRaftMetrics kafkaRaftMetrics;

// This is volatile because resignation can be requested from an external thread.
private volatile boolean resignRequested = false;
Expand All @@ -97,7 +99,8 @@ protected LeaderState(
BatchAccumulator<T> accumulator,
Endpoints localListeners,
int fetchTimeoutMs,
LogContext logContext
LogContext logContext,
KafkaRaftMetrics kafkaRaftMetrics
) {
this.localReplicaKey = localReplicaKey;
this.epoch = epoch;
Expand All @@ -122,6 +125,7 @@ protected LeaderState(
this.voterSetAtEpochStart = voterSetAtEpochStart;
this.offsetOfVotersAtEpochStart = offsetOfVotersAtEpochStart;
this.kraftVersionAtEpochStart = kraftVersionAtEpochStart;
this.kafkaRaftMetrics = kafkaRaftMetrics;
}

public long timeUntilBeginQuorumEpochTimerExpires(long currentTimeMs) {
Expand Down Expand Up @@ -217,6 +221,12 @@ public void resetAddVoterHandlerState(
.complete(RaftUtil.addVoterResponse(error, message))
);
addVoterHandlerState = state;
if (addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent()) {
kafkaRaftMetrics.updateUncommittedVoterChange(1);
} else {
kafkaRaftMetrics.updateUncommittedVoterChange(0);
}
jsancio marked this conversation as resolved.
Show resolved Hide resolved

}

public Optional<RemoveVoterHandlerState> removeVoterHandlerState() {
Expand All @@ -234,6 +244,11 @@ public void resetRemoveVoterHandlerState(
.complete(RaftUtil.removeVoterResponse(error, message))
);
removeVoterHandlerState = state;
if (addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent()) {
kafkaRaftMetrics.updateUncommittedVoterChange(1);
} else {
kafkaRaftMetrics.updateUncommittedVoterChange(0);
}
jsancio marked this conversation as resolved.
Show resolved Hide resolved
}

public long maybeExpirePendingOperation(long currentTimeMs) {
Expand Down Expand Up @@ -428,10 +443,6 @@ Map<ReplicaKey, ReplicaState> observerStates(final long currentTimeMs) {
return observerStates;
}

public int numObservers() {
return observerStates.size();
}

public Set<Integer> grantingVoters() {
return this.grantingVoters;
}
Expand Down Expand Up @@ -636,7 +647,10 @@ public long epochStartOffset() {
private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) {
ReplicaState state = voterStates.get(replicaKey.id());
if (state == null || !state.matchesKey(replicaKey)) {
observerStates.putIfAbsent(replicaKey, new ReplicaState(replicaKey, false, Endpoints.empty()));
LeaderState.ReplicaState previous = observerStates.putIfAbsent(replicaKey, new ReplicaState(replicaKey, false, Endpoints.empty()));
if (previous == null) {
kafkaRaftMetrics.updateNumObservers(observerStates.size());
}
jsancio marked this conversation as resolved.
Show resolved Hide resolved
return observerStates.get(replicaKey);
}
return state;
Expand All @@ -655,10 +669,13 @@ public Optional<ReplicaState> getReplicaState(ReplicaKey replicaKey) {
* Clear observer states that have not been active for a while and are not the leader.
*/
private void clearInactiveObservers(final long currentTimeMs) {
observerStates.entrySet().removeIf(integerReplicaStateEntry ->
boolean removed = observerStates.entrySet().removeIf(integerReplicaStateEntry ->
currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS &&
!integerReplicaStateEntry.getKey().equals(localReplicaKey)
);
if (removed) {
kafkaRaftMetrics.updateNumObservers(observerStates.size());
}
}

private boolean isVoter(ReplicaKey remoteReplicaKey) {
Expand All @@ -677,7 +694,10 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) {

// Remove the voter from the previous data structures
oldVoterStates.remove(voterNode.voterKey().id());
observerStates.remove(voterNode.voterKey());
LeaderState.ReplicaState previous = observerStates.remove(voterNode.voterKey());
if (previous != null) {
kafkaRaftMetrics.updateNumObservers(observerStates.size());
}

// Make sure that the replica key in the replica state matches the voter's
state.setReplicaKey(voterNode.voterKey());
Expand All @@ -691,7 +711,10 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) {
// Move any of the remaining old voters to observerStates
for (ReplicaState replicaStateEntry : oldVoterStates.values()) {
replicaStateEntry.clearListeners();
observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry);
LeaderState.ReplicaState previous = observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry);
if (previous == null) {
kafkaRaftMetrics.updateNumObservers(observerStates.size());
}
}
}

Expand Down
Loading
Loading