Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16524: Metrics for KIP-853 #18304

Merged
merged 22 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2040,8 +2040,12 @@ project(':raft') {
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':clients')
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':core')
testImplementation project(':server')
testImplementation project(':metadata')
jsancio marked this conversation as resolved.
Show resolved Hide resolved
testImplementation libs.jacksonDatabindYaml
testImplementation libs.junitJupiter
testImplementation libs.metrics
testImplementation libs.mockitoCore
testImplementation libs.jqwik
testImplementation testLog4j2Libs
Expand Down
4 changes: 4 additions & 0 deletions checkstyle/import-control.xml
jsancio marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,13 @@
</subpackage>

<subpackage name="raft">
<allow pkg="com.yammer.metrics" />
<allow pkg="kafka.raft" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.controller.metrics" />
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
Expand All @@ -477,6 +480,7 @@
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.server.fault"/>
Expand Down
35 changes: 35 additions & 0 deletions core/src/main/scala/kafka/raft/DefaultExternalKRaftMetrics.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.raft

import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.raft.ExternalKRaftMetrics
import org.apache.kafka.server.metrics.BrokerServerMetrics

/**
 * [[ExternalKRaftMetrics]] implementation that relays metric updates to a
 * [[BrokerServerMetrics]] and a [[ControllerMetadataMetrics]] instance.
 *
 * Either constructor argument may be `null`; a `null` argument is wrapped as `None`
 * and is simply skipped when an update is forwarded.
 *
 * @param brokerServerMetrics       broker-side metrics object, or `null` when absent
 * @param controllerMetadataMetrics controller-side metrics object, or `null` when absent
 */
class DefaultExternalKRaftMetrics(
  val brokerServerMetrics: BrokerServerMetrics,
  val controllerMetadataMetrics: ControllerMetadataMetrics
) extends ExternalKRaftMetrics {
  // Option(null) evaluates to None, so a missing metrics object needs no explicit null check.
  val brokerServerMetricsOpt: Option[BrokerServerMetrics] = Option(brokerServerMetrics)
  val controllerMetadataMetricsOpt: Option[ControllerMetadataMetrics] = Option(controllerMetadataMetrics)

  /** Forwards the call to the broker metrics and then to the controller metrics, when present. */
  override def setIgnoredStaticVoters(): Unit = {
    brokerServerMetricsOpt.foreach(_.setIgnoredStaticVoters())
    controllerMetadataMetricsOpt.foreach(_.setIgnoredStaticVoters())
  }
}
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog, TimingWheelExpirationService}
import org.apache.kafka.raft.{ExternalKRaftMetrics, Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog, TimingWheelExpirationService}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.Feature
import org.apache.kafka.server.common.serialization.RecordSerde
Expand Down Expand Up @@ -119,7 +119,8 @@ class KafkaRaftManager[T](
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
bootstrapServers: JCollection[InetSocketAddress],
localListeners: Endpoints,
fatalFaultHandler: FaultHandler
fatalFaultHandler: FaultHandler,
externalKRaftMetrics: ExternalKRaftMetrics
jsancio marked this conversation as resolved.
Show resolved Hide resolved
) extends RaftManager[T] with Logging {

val apiVersions = new ApiVersions()
Expand Down Expand Up @@ -158,7 +159,8 @@ class KafkaRaftManager[T](
client.initialize(
controllerQuorumVotersFuture.get(),
new FileQuorumStateStore(new File(dataDir, FileQuorumStateStore.DEFAULT_FILE_NAME)),
metrics
metrics,
externalKRaftMetrics
)
netChannel.start()
clientDriver.start()
Expand Down
7 changes: 5 additions & 2 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package kafka.server

import kafka.metrics.KafkaMetricsReporter
import kafka.raft.KafkaRaftManager
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager}
import kafka.server.Server.MetricsPrefix
import kafka.utils.{CoreUtils, Logging, VerifiableProperties}
import org.apache.kafka.common.metrics.Metrics
Expand Down Expand Up @@ -277,6 +277,8 @@ class SharedServer(
controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}

val externalKRaftMetrics = new DefaultExternalKRaftMetrics(brokerMetrics, controllerServerMetrics)

val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
clusterId,
sharedServerConfig,
Expand All @@ -290,7 +292,8 @@ class SharedServer(
controllerQuorumVotersFuture,
bootstrapServers,
listenerEndpoints,
raftManagerFaultHandler
raftManagerFaultHandler,
externalKRaftMetrics
)
raftManager = _raftManager
_raftManager.startup()
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package kafka.tools

import com.yammer.metrics.core.MetricsRegistry

import java.net.InetSocketAddress
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import joptsimple.{OptionException, OptionSpec}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.{KafkaRaftManager, RaftManager}
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager}
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.errors.InvalidConfigurationException
Expand All @@ -35,16 +37,19 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, RaftClient, QuorumConfig}
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.server.fault.ProcessTerminatingFaultHandler
import org.apache.kafka.server.metrics.BrokerServerMetrics
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils, ShutdownableThread}
import org.apache.kafka.snapshot.SnapshotReader

import java.util.Optional
import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -106,7 +111,11 @@ class TestRaftServer(
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
endpoints,
new ProcessTerminatingFaultHandler.Builder().build()
new ProcessTerminatingFaultHandler.Builder().build(),
new DefaultExternalKRaftMetrics(
new BrokerServerMetrics(metrics),
new ControllerMetadataMetrics(Optional.of(new MetricsRegistry()))
)
jsancio marked this conversation as resolved.
Show resolved Hide resolved
)

workloadGenerator = new RaftWorkloadGenerator(
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ class RaftManagerTest {
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
endpoints,
mock(classOf[FaultHandler])
mock(classOf[FaultHandler]),
new DefaultExternalKRaftMetrics(null, null)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -55,6 +56,9 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private static final MetricName UNCLEAN_LEADER_ELECTIONS_PER_SEC = getMetricName(
"ControllerStats", "UncleanLeaderElectionsPerSec");

private static final MetricName IGNORED_STATIC_VOTERS = getMetricName(
"KafkaController", "IgnoredStaticVoters");

private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
Expand All @@ -65,6 +69,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();

private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);
jsancio marked this conversation as resolved.
Show resolved Hide resolved


/**
* Create a new ControllerMetadataMetrics object.
Expand Down Expand Up @@ -117,6 +123,13 @@ public Integer value() {
}));
registry.ifPresent(r -> uncleanLeaderElectionMeter =
Optional.of(registry.get().newMeter(UNCLEAN_LEADER_ELECTIONS_PER_SEC, "elections", TimeUnit.SECONDS)));

registry.ifPresent(r -> r.newGauge(IGNORED_STATIC_VOTERS, new Gauge<Boolean>() {
jsancio marked this conversation as resolved.
Show resolved Hide resolved
@Override
public Boolean value() {
return ignoredStaticVoters();
}
}));
}

public void setFencedBrokerCount(int brokerCount) {
Expand Down Expand Up @@ -203,6 +216,13 @@ public void updateUncleanLeaderElection(int count) {
this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
}

/**
* Sets the ignoredStaticVoters flag to true. This method never sets it back to false.
* The flag's current value is what the IgnoredStaticVoters gauge registered by this class reports.
*/
public void setIgnoredStaticVoters() {
ignoredStaticVoters.set(true);
}
/**
* Returns the current value of the ignoredStaticVoters flag.
*
* @return true once {@link #setIgnoredStaticVoters()} has been called; the field is initialized to false
*/
public boolean ignoredStaticVoters() {
return ignoredStaticVoters.get();
}

@Override
public void close() {
registry.ifPresent(r -> Arrays.asList(
Expand All @@ -213,7 +233,8 @@ public void close() {
OFFLINE_PARTITION_COUNT,
PREFERRED_REPLICA_IMBALANCE_COUNT,
METADATA_ERROR_COUNT,
UNCLEAN_LEADER_ELECTIONS_PER_SEC
UNCLEAN_LEADER_ELECTIONS_PER_SEC,
IGNORED_STATIC_VOTERS
).forEach(r::removeMetric));
}

Expand Down
26 changes: 26 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/ExternalKRaftMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.raft;

/**
* Implementations of this interface store external metrics objects whose
* values are updated in the raft layer. They are not allowed to block.
*/
public interface ExternalKRaftMetrics {
/**
* Marks the "ignored static voters" condition on the external metrics objects.
* Takes no arguments and returns nothing; per the interface contract it must not block.
* NOTE(review): the call site and the exact condition that triggers it are not
* visible here; presumably the raft layer calls this when the statically
* configured voters are ignored. Confirm against the caller.
*/
void setIgnoredStaticVoters();
}
18 changes: 14 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -467,19 +467,24 @@ private void maybeFireLeaderChange() {
public void initialize(
Map<Integer, InetSocketAddress> voterAddresses,
QuorumStateStore quorumStateStore,
Metrics metrics
Metrics metrics,
ExternalKRaftMetrics externalKRaftMetrics
) {
VoterSet staticVoters = voterAddresses.isEmpty() ?
VoterSet.empty() :
VoterSet.fromInetSocketAddresses(channel.listenerName(), voterAddresses);

kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft");

partitionState = new KRaftControlRecordStateMachine(
staticVoters,
log,
serde,
BufferSupplier.create(),
MAX_BATCH_SIZE_BYTES,
logContext
logContext,
kafkaRaftMetrics,
externalKRaftMetrics
);
// Read the entire log
logger.info("Reading KRaft snapshot and log as part of the initialization");
Expand Down Expand Up @@ -531,10 +536,11 @@ public void initialize(
quorumStateStore,
time,
logContext,
random
random,
kafkaRaftMetrics
);

kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
kafkaRaftMetrics.initialize(quorum);
// All Raft voters are statically configured and known at startup
// so there are no unknown voter connections. Report this metric as 0.
kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
Expand Down Expand Up @@ -649,6 +655,7 @@ private void onBecomeLeader(long currentTimeMs) {

resetConnections();
kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
kafkaRaftMetrics.addLeaderMetrics();
jsancio marked this conversation as resolved.
Show resolved Hide resolved
}

private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
Expand Down Expand Up @@ -722,18 +729,21 @@ private void transitionToProspective(long currentTimeMs) {
/**
* Moves the quorum state to unattached and then runs the follow-up bookkeeping.
*
* @param epoch the epoch passed through to the quorum state transition
* @param leaderId the leader id, if any, passed through to the quorum state transition
*/
private void transitionToUnattached(int epoch, OptionalInt leaderId) {
quorum.transitionToUnattached(epoch, leaderId);
maybeFireLeaderChange();
// Drop the leader metrics after leaving the previous state.
// NOTE(review): presumably a no-op when this node was not the leader; confirm in KafkaRaftMetrics.
kafkaRaftMetrics.removeLeaderMetrics();
resetConnections();
}

/**
* Fails every request parked in the fetch purgatory, moves the quorum state to
* resigned, removes the leader metrics and resets connections.
*
* @param preferredSuccessors replica keys passed through to the quorum state transition
*/
private void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
// Pending fetches are completed with NOT_LEADER_OR_FOLLOWER before the state change.
fetchPurgatory.completeAllExceptionally(
Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
quorum.transitionToResigned(preferredSuccessors);
kafkaRaftMetrics.removeLeaderMetrics();
jsancio marked this conversation as resolved.
Show resolved Hide resolved
resetConnections();
}

private void onBecomeFollower(long currentTimeMs) {
kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
kafkaRaftMetrics.removeLeaderMetrics();

resetConnections();

Expand Down
Loading
Loading