Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16524: Metrics for KIP-853 #18304

Merged
merged 22 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.raft.internals.ExternalKRaftMetrics
import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.Feature
Expand Down Expand Up @@ -153,7 +154,8 @@ class KafkaRaftManager[T](
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
bootstrapServers: JCollection[InetSocketAddress],
localListeners: Endpoints,
fatalFaultHandler: FaultHandler
fatalFaultHandler: FaultHandler,
externalKRaftMetrics: ExternalKRaftMetrics
jsancio marked this conversation as resolved.
Show resolved Hide resolved
) extends RaftManager[T] with Logging {

val apiVersions = new ApiVersions()
Expand Down Expand Up @@ -192,7 +194,8 @@ class KafkaRaftManager[T](
client.initialize(
controllerQuorumVotersFuture.get(),
new FileQuorumStateStore(new File(dataDir, FileQuorumStateStore.DEFAULT_FILE_NAME)),
metrics
metrics,
externalKRaftMetrics
)
netChannel.start()
clientDriver.start()
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.metadata.ListenerInfo
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.raft.internals.ExternalKRaftMetrics
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
Expand Down Expand Up @@ -277,6 +278,8 @@ class SharedServer(
controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}

val externalKRaftMetrics = new ExternalKRaftMetrics(brokerMetrics, controllerServerMetrics)

val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
clusterId,
sharedServerConfig,
Expand All @@ -290,7 +293,8 @@ class SharedServer(
controllerQuorumVotersFuture,
bootstrapServers,
listenerEndpoints,
raftManagerFaultHandler
raftManagerFaultHandler,
externalKRaftMetrics
)
raftManager = _raftManager
_raftManager.startup()
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, RaftClient, QuorumConfig}
import org.apache.kafka.raft.internals.ExternalKRaftMetrics
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde
Expand Down Expand Up @@ -106,7 +107,8 @@ class TestRaftServer(
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
endpoints,
new ProcessTerminatingFaultHandler.Builder().build()
new ProcessTerminatingFaultHandler.Builder().build(),
new ExternalKRaftMetrics(null, null)
)

workloadGenerator = new RaftWorkloadGenerator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.controller.metrics;

import org.apache.kafka.raft.internals.ExternalKRaftMetricIgnoredStaticVoters;
jsancio marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import com.yammer.metrics.core.Gauge;
Expand All @@ -27,6 +28,7 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -37,7 +39,7 @@
* IMPORTANT: Metrics which are managed by the QuorumController class itself should go in
* {@link org.apache.kafka.controller.metrics.QuorumControllerMetrics}, not here.
*/
public final class ControllerMetadataMetrics implements AutoCloseable {
public final class ControllerMetadataMetrics implements AutoCloseable, ExternalKRaftMetricIgnoredStaticVoters {
private static final MetricName FENCED_BROKER_COUNT = getMetricName(
"KafkaController", "FencedBrokerCount");
private static final MetricName ACTIVE_BROKER_COUNT = getMetricName(
Expand All @@ -59,6 +61,9 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private static final MetricName UNCLEAN_LEADER_ELECTIONS_PER_SEC = getMetricName(
"ControllerStats", "UncleanLeaderElectionsPerSec");

private static final MetricName IGNORED_STATIC_VOTERS = getMetricName(
"KafkaController", "IgnoredStaticVoters");

private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
Expand All @@ -71,6 +76,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private final AtomicInteger zkMigrationState = new AtomicInteger(-1);
private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();

private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);
jsancio marked this conversation as resolved.
Show resolved Hide resolved


/**
* Create a new ControllerMetadataMetrics object.
Expand Down Expand Up @@ -137,6 +144,13 @@ public Integer value() {

registry.ifPresent(r -> uncleanLeaderElectionMeter =
Optional.of(registry.get().newMeter(UNCLEAN_LEADER_ELECTIONS_PER_SEC, "elections", TimeUnit.SECONDS)));

registry.ifPresent(r -> r.newGauge(IGNORED_STATIC_VOTERS, new Gauge<Boolean>() {
jsancio marked this conversation as resolved.
Show resolved Hide resolved
@Override
public Boolean value() {
return ignoredStaticVoters();
}
}));
}

public void setFencedBrokerCount(int brokerCount) {
Expand Down Expand Up @@ -243,6 +257,13 @@ public void updateUncleanLeaderElection(int count) {
this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
}

public void switchIgnoredStaticVoters() {
ignoredStaticVoters.compareAndSet(false, true);
jsancio marked this conversation as resolved.
Show resolved Hide resolved
}
public boolean ignoredStaticVoters() {
return this.ignoredStaticVoters.get();
jsancio marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void close() {
registry.ifPresent(r -> Arrays.asList(
Expand All @@ -255,7 +276,8 @@ public void close() {
PREFERRED_REPLICA_IMBALANCE_COUNT,
METADATA_ERROR_COUNT,
ZK_MIGRATION_STATE,
UNCLEAN_LEADER_ELECTIONS_PER_SEC
UNCLEAN_LEADER_ELECTIONS_PER_SEC,
IGNORED_STATIC_VOTERS
).forEach(r::removeMetric));
}

Expand Down
19 changes: 15 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.kafka.raft.internals.BlockingMessageQueue;
import org.apache.kafka.raft.internals.CloseListener;
import org.apache.kafka.raft.internals.DefaultRequestSender;
import org.apache.kafka.raft.internals.ExternalKRaftMetrics;
import org.apache.kafka.raft.internals.FuturePurgatory;
import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
import org.apache.kafka.raft.internals.KafkaRaftMetrics;
Expand Down Expand Up @@ -466,19 +467,24 @@ private void maybeFireLeaderChange() {
public void initialize(
Map<Integer, InetSocketAddress> voterAddresses,
QuorumStateStore quorumStateStore,
Metrics metrics
Metrics metrics,
ExternalKRaftMetrics externalKRaftMetrics
) {
VoterSet staticVoters = voterAddresses.isEmpty() ?
VoterSet.empty() :
VoterSet.fromInetSocketAddresses(channel.listenerName(), voterAddresses);

kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft");

partitionState = new KRaftControlRecordStateMachine(
staticVoters,
log,
serde,
BufferSupplier.create(),
MAX_BATCH_SIZE_BYTES,
logContext
logContext,
kafkaRaftMetrics,
externalKRaftMetrics
);
// Read the entire log
logger.info("Reading KRaft snapshot and log as part of the initialization");
Expand Down Expand Up @@ -530,10 +536,11 @@ public void initialize(
quorumStateStore,
time,
logContext,
random
random,
kafkaRaftMetrics
);

kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
kafkaRaftMetrics.initialize(quorum);
// All Raft voters are statically configured and known at startup
// so there are no unknown voter connections. Report this metric as 0.
kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
Expand Down Expand Up @@ -648,6 +655,7 @@ private void onBecomeLeader(long currentTimeMs) {

resetConnections();
kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
kafkaRaftMetrics.addLeaderMetrics();
jsancio marked this conversation as resolved.
Show resolved Hide resolved
}

private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
Expand Down Expand Up @@ -682,13 +690,15 @@ private void transitionToCandidate(long currentTimeMs) {
private void transitionToUnattached(int epoch) {
quorum.transitionToUnattached(epoch);
maybeFireLeaderChange();
kafkaRaftMetrics.removeLeaderMetrics();
resetConnections();
}

private void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
fetchPurgatory.completeAllExceptionally(
Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
quorum.transitionToResigned(preferredSuccessors);
kafkaRaftMetrics.removeLeaderMetrics();
jsancio marked this conversation as resolved.
Show resolved Hide resolved
resetConnections();
}

Expand All @@ -698,6 +708,7 @@ private void transitionToUnattachedVoted(ReplicaKey candidateKey, int epoch) {

private void onBecomeFollower(long currentTimeMs) {
kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
kafkaRaftMetrics.removeLeaderMetrics();

resetConnections();

Expand Down
37 changes: 32 additions & 5 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.internals.AddVoterHandlerState;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.KafkaRaftMetrics;
import org.apache.kafka.raft.internals.RemoveVoterHandlerState;
import org.apache.kafka.server.common.KRaftVersion;

Expand Down Expand Up @@ -81,6 +82,7 @@ public class LeaderState<T> implements EpochState {
private final int checkQuorumTimeoutMs;
private final Timer beginQuorumEpochTimer;
private final int beginQuorumEpochTimeoutMs;
private final KafkaRaftMetrics kafkaRaftMetrics;

// This is volatile because resignation can be requested from an external thread.
private volatile boolean resignRequested = false;
Expand All @@ -97,7 +99,8 @@ protected LeaderState(
BatchAccumulator<T> accumulator,
Endpoints localListeners,
int fetchTimeoutMs,
LogContext logContext
LogContext logContext,
KafkaRaftMetrics kafkaRaftMetrics
) {
this.localReplicaKey = localReplicaKey;
this.epoch = epoch;
Expand All @@ -122,6 +125,7 @@ protected LeaderState(
this.voterSetAtEpochStart = voterSetAtEpochStart;
this.offsetOfVotersAtEpochStart = offsetOfVotersAtEpochStart;
this.kraftVersionAtEpochStart = kraftVersionAtEpochStart;
this.kafkaRaftMetrics = kafkaRaftMetrics;
}

public long timeUntilBeginQuorumEpochTimerExpires(long currentTimeMs) {
Expand Down Expand Up @@ -217,6 +221,12 @@ public void resetAddVoterHandlerState(
.complete(RaftUtil.addVoterResponse(error, message))
);
addVoterHandlerState = state;
if (addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent()) {
kafkaRaftMetrics.updateUncommittedVoterChange(1);
} else {
kafkaRaftMetrics.updateUncommittedVoterChange(0);
}
jsancio marked this conversation as resolved.
Show resolved Hide resolved

}

public Optional<RemoveVoterHandlerState> removeVoterHandlerState() {
Expand All @@ -234,6 +244,11 @@ public void resetRemoveVoterHandlerState(
.complete(RaftUtil.removeVoterResponse(error, message))
);
removeVoterHandlerState = state;
if (addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent()) {
kafkaRaftMetrics.updateUncommittedVoterChange(1);
} else {
kafkaRaftMetrics.updateUncommittedVoterChange(0);
}
jsancio marked this conversation as resolved.
Show resolved Hide resolved
}

public long maybeExpirePendingOperation(long currentTimeMs) {
Expand Down Expand Up @@ -632,7 +647,10 @@ public long epochStartOffset() {
private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) {
ReplicaState state = voterStates.get(replicaKey.id());
if (state == null || !state.matchesKey(replicaKey)) {
observerStates.putIfAbsent(replicaKey, new ReplicaState(replicaKey, false, Endpoints.empty()));
LeaderState.ReplicaState previous = observerStates.putIfAbsent(replicaKey, new ReplicaState(replicaKey, false, Endpoints.empty()));
if (previous == null) {
kafkaRaftMetrics.updateNumObservers(observerStates.size());
}
jsancio marked this conversation as resolved.
Show resolved Hide resolved
return observerStates.get(replicaKey);
}
return state;
Expand All @@ -651,10 +669,13 @@ public Optional<ReplicaState> getReplicaState(ReplicaKey replicaKey) {
* Clear observer states that have not been active for a while and are not the leader.
*/
private void clearInactiveObservers(final long currentTimeMs) {
observerStates.entrySet().removeIf(integerReplicaStateEntry ->
boolean removed = observerStates.entrySet().removeIf(integerReplicaStateEntry ->
currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS &&
!integerReplicaStateEntry.getKey().equals(localReplicaKey)
);
if (removed) {
kafkaRaftMetrics.updateNumObservers(observerStates.size());
}
}

private boolean isVoter(ReplicaKey remoteReplicaKey) {
Expand All @@ -673,7 +694,10 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) {

// Remove the voter from the previous data structures
oldVoterStates.remove(voterNode.voterKey().id());
observerStates.remove(voterNode.voterKey());
LeaderState.ReplicaState previous = observerStates.remove(voterNode.voterKey());
if (previous != null) {
kafkaRaftMetrics.updateNumObservers(observerStates.size());
}

// Make sure that the replica key in the replica state matches the voter's
state.setReplicaKey(voterNode.voterKey());
Expand All @@ -687,7 +711,10 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) {
// Move any of the remaining old voters to observerStates
for (ReplicaState replicaStateEntry : oldVoterStates.values()) {
replicaStateEntry.clearListeners();
observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry);
LeaderState.ReplicaState previous = observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry);
if (previous == null) {
kafkaRaftMetrics.updateNumObservers(observerStates.size());
}
}
}

Expand Down
10 changes: 8 additions & 2 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;

import org.apache.kafka.raft.internals.KafkaRaftMetrics;
import org.slf4j.Logger;

import java.io.IOException;
Expand Down Expand Up @@ -89,6 +90,8 @@ public class QuorumState {
private final int fetchTimeoutMs;
private final LogContext logContext;

private final KafkaRaftMetrics kafkaRaftMetrics;
jsancio marked this conversation as resolved.
Show resolved Hide resolved

private volatile EpochState state;

public QuorumState(
Expand All @@ -102,7 +105,8 @@ public QuorumState(
QuorumStateStore store,
Time time,
LogContext logContext,
Random random
Random random,
KafkaRaftMetrics kafkaRaftMetrics
) {
this.localId = localId;
this.localDirectoryId = localDirectoryId;
Expand All @@ -116,6 +120,7 @@ public QuorumState(
this.log = logContext.logger(QuorumState.class);
this.random = random;
this.logContext = logContext;
this.kafkaRaftMetrics = kafkaRaftMetrics;
}

private ElectionState readElectionState() {
Expand Down Expand Up @@ -604,7 +609,8 @@ public <T> LeaderState<T> transitionToLeader(long epochStartOffset, BatchAccumul
accumulator,
localListeners,
fetchTimeoutMs,
logContext
logContext,
kafkaRaftMetrics
);
durableTransitionTo(state);
return state;
Expand Down
Loading
Loading