Skip to content

Commit

Permalink
KAFKA-16525; Dynamic KRaft network manager and channel (#15986)
Browse files Browse the repository at this point in the history
Allow KRaft replicas to send requests to any node (Node) not just the nodes configured in the
controller.quorum.voters property. This flexibility is needed so KRaft can implement the
controller.quorum.voters configuration, send request to the dynamically changing set of voters and
send request to the leader endpoint (Node) discovered through the KRaft RPCs (specially
BeginQuorumEpoch request and Fetch response).

This was achieved by changing the RequestManager API to accept Node instead of just the replica ID.
Internally, the request manager tracks connection state using the Node.idString method to match the
connection management used by NetworkClient.

The API for RequestManager is also changed so that the ConnectState class is not exposed in the
API. This allows the request manager to reclaim heap memory for any connection that is ready.

The NetworkChannel was updated to receive the endpoint information (Node) through the outbound raft
request (RaftRequent.Outbound). This makes the network channel more flexible as it doesn't need to
be configured with the list of all possible endpoints. RaftRequest.Outbound and
RaftResponse.Inbound were updated to include the remote node instead of just the remote id.

The follower state tracked by KRaft replicas was updated to include both the leader id and the
leader's endpoint (Node). In this comment the node value is computed from the set of voters. In
future commit this will be updated so that it is sent through KRaft RPCs. For example
BeginQuorumEpoch request and Fetch response.

Support for configuring controller.quorum.bootstrap.servers was added. This includes changes to
KafkaConfig, QuorumConfig, etc. All of the tests using QuorumTestHarness were changed to use the
controller.quorum.bootstrap.servers instead of the controller.quorum.voters for the broker
configuration. Finally, the node id for the bootstrap server will be decreasing negative numbers
starting with -2.

Reviewers: Jason Gustafson <[email protected]>, Luke Chen <[email protected]>, Colin P. McCabe <[email protected]>
  • Loading branch information
jsancio authored Jun 3, 2024
1 parent 8a882a7 commit 459da47
Show file tree
Hide file tree
Showing 45 changed files with 2,158 additions and 985 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
Expand Down
18 changes: 11 additions & 7 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.nio.file.Paths
import java.util.OptionalInt
import java.util.concurrent.CompletableFuture
import java.util.{Map => JMap}
import java.util.{Collection => JCollection}
import kafka.log.LogManager
import kafka.log.UnifiedLog
import kafka.server.KafkaConfig
Expand Down Expand Up @@ -133,7 +134,7 @@ trait RaftManager[T] {

def replicatedLog: ReplicatedLog

def voterNode(id: Int, listener: String): Option[Node]
def voterNode(id: Int, listener: ListenerName): Option[Node]
}

class KafkaRaftManager[T](
Expand All @@ -147,6 +148,7 @@ class KafkaRaftManager[T](
metrics: Metrics,
threadNamePrefixOpt: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
bootstrapServers: JCollection[InetSocketAddress],
fatalFaultHandler: FaultHandler
) extends RaftManager[T] with Logging {

Expand Down Expand Up @@ -185,7 +187,6 @@ class KafkaRaftManager[T](
def startup(): Unit = {
client.initialize(
controllerQuorumVotersFuture.get(),
config.controllerListenerNames.head,
new FileQuorumStateStore(new File(dataDir, FileQuorumStateStore.DEFAULT_FILE_NAME)),
metrics
)
Expand Down Expand Up @@ -228,14 +229,15 @@ class KafkaRaftManager[T](
expirationService,
logContext,
clusterId,
bootstrapServers,
raftConfig
)
client
}

private def buildNetworkChannel(): KafkaNetworkChannel = {
val netClient = buildNetworkClient()
new KafkaNetworkChannel(time, netClient, config.quorumRequestTimeoutMs, threadNamePrefix)
val (listenerName, netClient) = buildNetworkClient()
new KafkaNetworkChannel(time, listenerName, netClient, config.quorumRequestTimeoutMs, threadNamePrefix)
}

private def createDataDir(): File = {
Expand All @@ -254,7 +256,7 @@ class KafkaRaftManager[T](
)
}

private def buildNetworkClient(): NetworkClient = {
private def buildNetworkClient(): (ListenerName, NetworkClient) = {
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(
controllerListenerName,
Expand Down Expand Up @@ -292,7 +294,7 @@ class KafkaRaftManager[T](
val reconnectBackoffMsMs = 500
val discoverBrokerVersions = true

new NetworkClient(
val networkClient = new NetworkClient(
selector,
new ManualMetadataUpdater(),
clientId,
Expand All @@ -309,13 +311,15 @@ class KafkaRaftManager[T](
apiVersions,
logContext
)

(controllerListenerName, networkClient)
}

override def leaderAndEpoch: LeaderAndEpoch = {
client.leaderAndEpoch
}

override def voterNode(id: Int, listener: String): Option[Node] = {
override def voterNode(id: Int, listener: ListenerName): Option[Node] = {
client.voterNode(id, listener).toScala
}
}
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ object KafkaConfig {

/** ********* Raft Quorum Configuration *********/
.define(QuorumConfig.QUORUM_VOTERS_CONFIG, LIST, QuorumConfig.DEFAULT_QUORUM_VOTERS, new QuorumConfig.ControllerQuorumVotersValidator(), HIGH, QuorumConfig.QUORUM_VOTERS_DOC)
.define(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, QuorumConfig.DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new QuorumConfig.ControllerQuorumBootstrapServersValidator(), HIGH, QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_DOC)
.define(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, QuorumConfig.DEFAULT_QUORUM_ELECTION_TIMEOUT_MS, null, HIGH, QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_DOC)
.define(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, INT, QuorumConfig.DEFAULT_QUORUM_FETCH_TIMEOUT_MS, null, HIGH, QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_DOC)
.define(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, QuorumConfig.DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS, null, HIGH, QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_DOC)
Expand Down Expand Up @@ -1055,6 +1056,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami

/** ********* Raft Quorum Configuration *********/
val quorumVoters = getList(QuorumConfig.QUORUM_VOTERS_CONFIG)
val quorumBootstrapServers = getList(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG)
val quorumElectionTimeoutMs = getInt(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG)
val quorumFetchTimeoutMs = getInt(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG)
val quorumElectionBackoffMs = getInt(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG)
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class KafkaRaftServer(
time,
metrics,
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
new StandardFaultHandlerFactory(),
)

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ import java.net.{InetAddress, SocketTimeoutException}
import java.nio.file.{Files, Paths}
import java.time.Duration
import java.util
import java.util.{Optional, OptionalInt, OptionalLong}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Optional, OptionalInt, OptionalLong}
import scala.collection.{Map, Seq}
import scala.compat.java8.OptionConverters.RichOptionForJava8
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -439,6 +439,7 @@ class KafkaServer(
metrics,
threadNamePrefix,
CompletableFuture.completedFuture(quorumVoters),
QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
fatalFaultHandler = new LoggingFaultHandler("raftManager", () => shutdown())
)
quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class RaftControllerNodeProvider(
val saslMechanism: String
) extends ControllerNodeProvider with Logging {

private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName.value())
private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName)

override def getControllerInfo(): ControllerInformation =
ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.flatMap(idToNode),
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import java.util.Arrays
import java.util.Optional
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.{Collection => JCollection}
import java.util.{Map => JMap}


Expand Down Expand Up @@ -94,6 +95,7 @@ class SharedServer(
val time: Time,
private val _metrics: Metrics,
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
val bootstrapServers: JCollection[InetSocketAddress],
val faultHandlerFactory: FaultHandlerFactory
) extends Logging {
private val logContext: LogContext = new LogContext(s"[SharedServer id=${sharedServerConfig.nodeId}] ")
Expand Down Expand Up @@ -265,6 +267,7 @@ class SharedServer(
metrics,
Some(s"kafka-${sharedServerConfig.nodeId}-raft"), // No dash expected at the end
controllerQuorumVotersFuture,
bootstrapServers,
raftManagerFaultHandler
)
raftManager = _raftManager
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ object StorageTool extends Logging {
metaPropertiesEnsemble.verify(metaProperties.clusterId(), metaProperties.nodeId(),
util.EnumSet.noneOf(classOf[VerificationFlag]))

System.out.println(s"metaPropertiesEnsemble=$metaPropertiesEnsemble")
stream.println(s"metaPropertiesEnsemble=$metaPropertiesEnsemble")
val copier = new MetaPropertiesEnsemble.Copier(metaPropertiesEnsemble)
if (!(ignoreFormatted || copier.logDirProps().isEmpty)) {
val firstLogDir = copier.logDirProps().keySet().iterator().next()
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class TestRaftServer(
metrics,
Some(threadNamePrefix),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
new ProcessTerminatingFaultHandler.Builder().build()
)

Expand Down
26 changes: 17 additions & 9 deletions core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,15 @@ public KafkaClusterTestKit build() throws Exception {
ThreadUtils.createThreadFactory("kafka-cluster-test-kit-executor-%d", false));
for (ControllerNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
SharedServer sharedServer = new SharedServer(createNodeConfig(node),
node.initialMetaPropertiesEnsemble(),
Time.SYSTEM,
new Metrics(),
connectFutureManager.future,
faultHandlerFactory);
SharedServer sharedServer = new SharedServer(
createNodeConfig(node),
node.initialMetaPropertiesEnsemble(),
Time.SYSTEM,
new Metrics(),
connectFutureManager.future,
Collections.emptyList(),
faultHandlerFactory
);
ControllerServer controller = null;
try {
controller = new ControllerServer(
Expand All @@ -267,13 +270,18 @@ public KafkaClusterTestKit build() throws Exception {
jointServers.put(node.id(), sharedServer);
}
for (BrokerNode node : nodes.brokerNodes().values()) {
SharedServer sharedServer = jointServers.computeIfAbsent(node.id(),
id -> new SharedServer(createNodeConfig(node),
SharedServer sharedServer = jointServers.computeIfAbsent(
node.id(),
id -> new SharedServer(
createNodeConfig(node),
node.initialMetaPropertiesEnsemble(),
Time.SYSTEM,
new Metrics(),
connectFutureManager.future,
faultHandlerFactory));
Collections.emptyList(),
faultHandlerFactory
)
);
BrokerServer broker = null;
try {
broker = new BrokerServer(sharedServer);
Expand Down
1 change: 0 additions & 1 deletion core/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,5 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN


# zkclient can be verbose, during debugging it is common to adjust it separately
log4j.logger.org.apache.zookeeper=WARN
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException, TimeoutException, WakeupException}
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.serialization._
import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition}
import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,15 @@ class KRaftQuorumImplementation(
metaPropertiesEnsemble.verify(Optional.of(clusterId),
OptionalInt.of(config.nodeId),
util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR))
val sharedServer = new SharedServer(config,
val sharedServer = new SharedServer(
config,
metaPropertiesEnsemble,
time,
new Metrics(),
controllerQuorumVotersFuture,
faultHandlerFactory)
controllerQuorumVotersFuture.get().values(),
faultHandlerFactory
)
var broker: BrokerServer = null
try {
broker = new BrokerServer(sharedServer)
Expand Down Expand Up @@ -371,12 +374,15 @@ abstract class QuorumTestHarness extends Logging {
metaPropertiesEnsemble.verify(Optional.of(metaProperties.clusterId().get()),
OptionalInt.of(nodeId),
util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR))
val sharedServer = new SharedServer(config,
val sharedServer = new SharedServer(
config,
metaPropertiesEnsemble,
Time.SYSTEM,
new Metrics(),
controllerQuorumVotersFuture,
faultHandlerFactory)
Collections.emptyList(),
faultHandlerFactory
)
var controllerServer: ControllerServer = null
try {
controllerServer = new ControllerServer(
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class KafkaConfigTest {

@Test
def testBrokerRoleNodeIdValidation(): Unit = {
// Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters
// Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters
val propertiesFile = new Properties
propertiesFile.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
propertiesFile.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
Expand All @@ -102,7 +102,7 @@ class KafkaConfigTest {

@Test
def testControllerRoleNodeIdValidation(): Unit = {
// Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters
// Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters
val propertiesFile = new Properties
propertiesFile.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
propertiesFile.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
Expand Down
1 change: 1 addition & 0 deletions core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class RaftManagerTest {
new Metrics(Time.SYSTEM),
Option.empty,
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers),
mock(classOf[FaultHandler])
)
}
Expand Down
20 changes: 19 additions & 1 deletion core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.server

import java.net.InetSocketAddress
import java.util
import java.util.{Collections, Properties}
import java.util.{Arrays, Collections, Properties}
import kafka.cluster.EndPoint
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils.assertBadConfigContainingMessage
Expand Down Expand Up @@ -1032,6 +1032,7 @@ class KafkaConfigTest {

// Raft Quorum Configs
case QuorumConfig.QUORUM_VOTERS_CONFIG => // ignore string
case QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG => // ignore string
case QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
Expand Down Expand Up @@ -1402,6 +1403,23 @@ class KafkaConfigTest {
assertEquals(expectedVoters, addresses)
}

@Test
def testParseQuorumBootstrapServers(): Unit = {
val expected = Arrays.asList(
InetSocketAddress.createUnresolved("kafka1", 9092),
InetSocketAddress.createUnresolved("kafka2", 9092)
)

val props = TestUtils.createBrokerConfig(0, null)
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092")

val addresses = QuorumConfig.parseBootstrapServers(
KafkaConfig.fromProps(props).quorumBootstrapServers
)

assertEquals(expected, addresses)
}

@Test
def testAcceptsLargeNodeIdForRaftBasedCase(): Unit = {
// Generation of Broker IDs is not supported when using Raft-based controller quorums,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import java.nio.ByteBuffer
import java.util
import java.util.Collections
import java.util.Optional
import java.util.Arrays
import java.util.Properties
import java.util.stream.IntStream
import kafka.log.{LogTestUtils, UnifiedLog}
import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
import kafka.server.{BrokerTopicStats, KafkaRaftServer}
Expand Down Expand Up @@ -338,7 +338,7 @@ class DumpLogSegmentsTest {
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
.setRawSnapshotWriter(metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)).get)
.setKraftVersion(1)
.setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(Arrays.asList(1, 2, 3), true))))
.setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true))))
.build(MetadataRecordSerde.INSTANCE)
) { snapshotWriter =>
snapshotWriter.append(metadataRecords.asJava)
Expand Down
Loading

0 comments on commit 459da47

Please sign in to comment.