Skip to content

Commit

Permalink
KAFKA-16938 non-dynamic props gets corrupted due to circular referenc…
Browse files Browse the repository at this point in the history
…e between DynamicBrokerConfig and DynamicConfig. (#16302)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
frankvicky authored Jun 13, 2024
1 parent 6d1f8f8 commit 0a203a9
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 17 deletions.
16 changes: 2 additions & 14 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ object DynamicBrokerConfig {
AllDynamicConfigs.intersect(passwordConfigs)
}

private val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- DynamicConfig.Broker.names.asScala

def isPasswordConfig(name: String): Boolean = DynamicBrokerConfig.DynamicPasswordConfigs.exists(name.endsWith)

def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
Expand Down Expand Up @@ -168,7 +166,7 @@ object DynamicBrokerConfig {
}

private def nonDynamicConfigs(props: Properties): Set[String] = {
props.asScala.keySet.intersect(nonDynamicProps)
props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
}

private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = {
Expand All @@ -184,16 +182,6 @@ object DynamicBrokerConfig {
DynamicConfig.Broker.validate(baseProps)
}

private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = {
KafkaConfig.configKeys.forKeyValue { (configName, config) =>
if (AllDynamicConfigs.contains(configName)) {
configDef.define(config.name, config.`type`, config.defaultValue, config.validator,
config.importance, config.documentation, config.group, config.orderInGroup, config.width,
config.displayName, config.dependents, config.recommender)
}
}
}

private[server] def dynamicConfigUpdateModes: util.Map[String, String] = {
AllDynamicConfigs.map { name =>
val mode = if (PerBrokerConfigs.contains(name)) "per-broker" else "cluster-wide"
Expand Down Expand Up @@ -321,7 +309,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}

private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) {
val nonDynamic = configNames.intersect(nonDynamicProps)
val nonDynamic = configNames.intersect(DynamicConfig.Broker.nonDynamicProps)
require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs $nonDynamic")
}

Expand Down
18 changes: 15 additions & 3 deletions core/src/main/scala/kafka/server/DynamicConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package kafka.server

import kafka.server.DynamicBrokerConfig.AllDynamicConfigs

import java.net.{InetAddress, UnknownHostException}
import java.util.Properties
import org.apache.kafka.common.config.ConfigDef
Expand All @@ -30,10 +32,20 @@ import scala.jdk.CollectionConverters._
* and can only be set dynamically.
*/
object DynamicConfig {
object Broker {
private val brokerConfigs = {
val configs = QuotaConfigs.brokerQuotaConfigs()

// Filter and define all dynamic configurations
KafkaConfig.configKeys
.filter { case (configName, _) => AllDynamicConfigs.contains(configName) }
.foreach { case (_, config) => configs.define(config) }
configs
}

object Broker {
private val brokerConfigs = QuotaConfigs.brokerQuotaConfigs()
DynamicBrokerConfig.addDynamicConfigs(brokerConfigs)
// In order to avoid circular reference, all DynamicBrokerConfig's variables which are initialized by `DynamicConfig.Broker` should be moved to `DynamicConfig.Broker`.
// Otherwise, those variables of DynamicBrokerConfig will see intermediate state of `DynamicConfig.Broker`, because `brokerConfigs` is created by `DynamicBrokerConfig.AllDynamicConfigs`
val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- brokerConfigs.names.asScala

def configKeys: util.Map[String, ConfigDef.ConfigKey] = brokerConfigs.configKeys

Expand Down

0 comments on commit 0a203a9

Please sign in to comment.