Skip to content

Commit

Permalink
KAFKA-18530 Remove ZooKeeperInternals (#18641)
Browse files Browse the repository at this point in the history
Since zk has been removed in 4.0, config handlers no longer need to handle the "<default>" value. This PR streamlines the config update process by eliminating the unnecessary string checks for "<default>"

Reviewers: Christo Lolov <[email protected]>, Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
m1a2st authored Feb 6, 2025
1 parent 34e7136 commit a3d9d88
Show file tree
Hide file tree
Showing 14 changed files with 332 additions and 222 deletions.
12 changes: 11 additions & 1 deletion core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig

import java.net.{InetAddress, UnknownHostException}
import scala.jdk.CollectionConverters._
import scala.collection._

Expand Down Expand Up @@ -649,7 +650,7 @@ object ConfigCommand extends Logging {

if (hasEntityName && entityTypeVals.contains(ConfigType.IP)) {
Seq(entityName, ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipEntity =>
if (!DynamicConfig.Ip.isValidIpEntity(ipEntity))
if (!isValidIpEntity(ipEntity))
throw new IllegalArgumentException(s"The entity name for ${entityTypeVals.head} must be a valid IP or resolvable host, but it is: $ipEntity")
}
}
Expand Down Expand Up @@ -688,4 +689,13 @@ object ConfigCommand extends Logging {
}
}
}

def isValidIpEntity(ip: String): Boolean = {
try {
InetAddress.getByName(ip)
} catch {
case _: UnknownHostException => return false
}
true
}
}
44 changes: 19 additions & 25 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.ClientQuotaManagerConfig
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType, QuotaType, QuotaUtils, SensorAccess, ThrottleCallback, ThrottledChannel}
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.network.Session
Expand All @@ -55,7 +55,7 @@ object QuotaTypes {
object ClientQuotaManager {
// Purge sensors after 1 hour of inactivity
val InactiveSensorExpirationTimeSeconds = 3600

private val DefaultName = "<default>"
val DefaultClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(None, Some(DefaultClientIdEntity))
val DefaultUserQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None)
val DefaultUserClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity))
Expand All @@ -76,13 +76,13 @@ object ClientQuotaManager {

case object DefaultUserEntity extends BaseUserEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER
override def name: String = ZooKeeperInternals.DEFAULT_STRING
override def name: String = DefaultName
override def toString: String = "default user"
}

case object DefaultClientIdEntity extends ClientQuotaEntity.ConfigEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID
override def name: String = ZooKeeperInternals.DEFAULT_STRING
override def name: String = DefaultName
override def toString: String = "default client-id"
}

Expand All @@ -93,7 +93,7 @@ object ClientQuotaManager {

def sanitizedUser: String = userEntity.map {
case entity: UserEntity => entity.sanitizedUser
case DefaultUserEntity => ZooKeeperInternals.DEFAULT_STRING
case DefaultUserEntity => DefaultName
}.getOrElse("")

def clientId: String = clientIdEntity.map(_.name).getOrElse("")
Expand Down Expand Up @@ -260,7 +260,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
*/
def unrecordQuotaSensor(request: RequestChannel.Request, value: Double, timeMs: Long): Unit = {
val clientSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
clientSensors.quotaSensor.record(value * (-1), timeMs, false)
clientSensors.quotaSensor.record(value * -1, timeMs, false)
}

/**
Expand Down Expand Up @@ -403,12 +403,15 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
* for any of these levels.
*
* @param sanitizedUser user to override if quota applies to <user> or <user, client-id>
* @param clientId client to override if quota applies to <client-id> or <user, client-id>
* @param sanitizedClientId sanitized client ID to override if quota applies to <client-id> or <user, client-id>
* @param quota custom quota to apply or None if quota override is being removed
* @param userEntity user to override if quota applies to <user> or <user, client-id>
* @param clientEntity sanitized client entity to override if quota applies to <client-id> or <user, client-id>
* @param quota custom quota to apply or None if quota override is being removed
*/
def updateQuota(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]): Unit = {
def updateQuota(
userEntity: Option[BaseUserEntity],
clientEntity: Option[ClientQuotaEntity.ConfigEntity],
quota: Option[Quota]
): Unit = {
/*
* Acquire the write lock to apply changes in the quota objects.
* This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists).
Expand All @@ -418,29 +421,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
*/
lock.writeLock().lock()
try {
val userEntity = sanitizedUser.map {
case ZooKeeperInternals.DEFAULT_STRING => DefaultUserEntity
case user => UserEntity(user)
}
val clientIdEntity = sanitizedClientId.map {
case ZooKeeperInternals.DEFAULT_STRING => DefaultClientIdEntity
case _ => ClientIdEntity(clientId.getOrElse(throw new IllegalStateException("Client-id not provided")))
}
val quotaEntity = KafkaQuotaEntity(userEntity, clientIdEntity)
val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)

if (userEntity.nonEmpty) {
if (quotaEntity.clientIdEntity.nonEmpty)
quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
else
quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
} else if (clientIdEntity.nonEmpty)
} else if (clientEntity.nonEmpty)
quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled

quota match {
case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound)
case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
}
val updatedEntity = if (userEntity.contains(DefaultUserEntity) || clientIdEntity.contains(DefaultClientIdEntity))
val updatedEntity = if (userEntity.contains(DefaultUserEntity) || clientEntity.contains(DefaultClientIdEntity))
None // more than one entity may need updating, so `updateQuotaMetricConfigs` will go through all metrics
else
Some(quotaEntity)
Expand All @@ -452,9 +447,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}

/**
* Updates metrics configs. This is invoked when quota configs are updated in ZooKeeper
* or when partitions leaders change and custom callbacks that implement partition-based quotas
* have updated quotas.
* Updates metrics configs. This is invoked when quota configs are updated when partitions leaders change
* and custom callbacks that implement partition-based quotas have updated quotas.
*
* @param updatedQuotaEntity If set to one entity and quotas have only been enabled at one
* level, then an optimized update is performed with a single metric update. If None is provided,
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.{Collections, Properties}
import kafka.log.UnifiedLog
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.ClientMetricsManager
Expand Down Expand Up @@ -146,7 +146,7 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {
def processConfigChanges(brokerId: String, properties: Properties): Unit = {
if (brokerId == ZooKeeperInternals.DEFAULT_STRING)
if (brokerId.isEmpty)
brokerConfig.dynamicConfig.updateDefaultConfig(properties)
else if (brokerConfig.brokerId == brokerId.trim.toInt) {
brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ import scala.collection._
import scala.jdk.CollectionConverters._

/**
* Dynamic broker configurations are stored in ZooKeeper and may be defined at two levels:
* Dynamic broker configurations may be defined at two levels:
* <ul>
* <li>Per-broker configs persisted at <tt>/configs/brokers/{brokerId}</tt>: These can be described/altered
* using AdminClient using the resource name brokerId.</li>
* <li>Cluster-wide defaults persisted at <tt>/configs/brokers/&lt;default&gt;</tt>: These can be described/altered
* using AdminClient using an empty resource name.</li>
* <li>Per-broker configurations are persisted at the controller and can be described
* or altered using AdminClient with the resource name brokerId.</li>
* <li>Cluster-wide default configurations are persisted at the cluster level and can be
* described or altered using AdminClient with an empty resource name.</li>
* </ul>
* The order of precedence for broker configs is:
* <ol>
Expand Down Expand Up @@ -367,7 +367,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
error(s"$errorMessage: $invalidPropNames")
}
}
removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs configured in ZooKeeper will be ignored")
removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs will be ignored")
removeInvalidProps(securityConfigsWithoutListenerPrefix(props),
"Security configs can be dynamically updated only using listener prefix, base configs will be ignored")
if (!perBrokerConfig)
Expand Down
14 changes: 1 addition & 13 deletions core/src/main/scala/kafka/server/DynamicConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ package kafka.server

import kafka.server.DynamicBrokerConfig.AllDynamicConfigs

import java.net.{InetAddress, UnknownHostException}
import java.util.Properties
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.QuotaConfig

import java.util
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -83,17 +82,6 @@ object DynamicConfig {
def names: util.Set[String] = ipConfigs.names

def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false)

def isValidIpEntity(ip: String): Boolean = {
if (ip != ZooKeeperInternals.DEFAULT_STRING) {
try {
InetAddress.getByName(ip)
} catch {
case _: UnknownHostException => return false
}
}
true
}
}

object ClientMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
package kafka.server.metadata

import kafka.network.ConnectionQuotas
import kafka.server.ClientQuotaManager
import kafka.server.ClientQuotaManager.BaseUserEntity
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.ClientQuotaMetadataManager.transferToClientQuotaEntity
import kafka.utils.Logging
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.quota.ClientQuotaEntity.{ConfigEntity => ClientQuotaConfigEntity}

import java.net.{InetAddress, UnknownHostException}
import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta}
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.QuotaConfig

import scala.jdk.OptionConverters.RichOptionalDouble

Expand Down Expand Up @@ -145,27 +149,42 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
}

// Convert entity into Options with sanitized values for QuotaManagers
val (sanitizedUser, sanitizedClientId) = quotaEntity match {
case UserEntity(user) => (Some(Sanitizer.sanitize(user)), None)
case DefaultUserEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), None)
case ClientIdEntity(clientId) => (None, Some(Sanitizer.sanitize(clientId)))
case DefaultClientIdEntity => (None, Some(ZooKeeperInternals.DEFAULT_STRING))
case ExplicitUserExplicitClientIdEntity(user, clientId) => (Some(Sanitizer.sanitize(user)), Some(Sanitizer.sanitize(clientId)))
case ExplicitUserDefaultClientIdEntity(user) => (Some(Sanitizer.sanitize(user)), Some(ZooKeeperInternals.DEFAULT_STRING))
case DefaultUserExplicitClientIdEntity(clientId) => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(Sanitizer.sanitize(clientId)))
case DefaultUserDefaultClientIdEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING))
case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here")
}
val (userEntity, clientEntity) = transferToClientQuotaEntity(quotaEntity)

val quotaValue = newValue.map(new Quota(_, true))
try {
manager.updateQuota(
sanitizedUser = sanitizedUser,
clientId = sanitizedClientId.map(Sanitizer.desanitize),
sanitizedClientId = sanitizedClientId,
quota = quotaValue)
userEntity = userEntity,
clientEntity = clientEntity,
quota = quotaValue
)
} catch {
case t: Throwable => error(s"Failed to update user-client quota $quotaEntity", t)
}
}
}

object ClientQuotaMetadataManager {

def transferToClientQuotaEntity(quotaEntity: QuotaEntity): (Option[BaseUserEntity], Option[ClientQuotaConfigEntity]) = {
quotaEntity match {
case UserEntity(user) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), None)
case DefaultUserEntity =>
(Some(ClientQuotaManager.DefaultUserEntity), None)
case ClientIdEntity(clientId) =>
(None, Some(ClientQuotaManager.ClientIdEntity(clientId)))
case DefaultClientIdEntity =>
(None, Some(ClientQuotaManager.DefaultClientIdEntity))
case ExplicitUserExplicitClientIdEntity(user, clientId) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.ClientIdEntity(clientId)))
case ExplicitUserDefaultClientIdEntity(user) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.DefaultClientIdEntity))
case DefaultUserExplicitClientIdEntity(clientId) =>
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.ClientIdEntity(clientId)))
case DefaultUserDefaultClientIdEntity =>
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.DefaultClientIdEntity))
case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, GROUP, TOPIC}
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.fault.FaultHandler


Expand Down Expand Up @@ -78,7 +78,7 @@ class DynamicConfigPublisher(
// These are stored in KRaft with an empty name field.
info("Updating cluster configuration : " +
toLoggableProps(resource, props).mkString(","))
nodeConfigHandler.processConfigChanges(ZooKeeperInternals.DEFAULT_STRING, props)
nodeConfigHandler.processConfigChanges(resource.name(), props)
} catch {
case t: Throwable => faultHandler.handleFault("Error updating " +
s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,11 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// non-default value to trigger a new metric
val clientId = "test-client-1"
servers.foreach { server =>
server.quotaManagers.produce.updateQuota(None, Some(clientId), Some(clientId),
Some(Quota.upperBound(10000000)))
server.quotaManagers.produce.updateQuota(
None,
Some(ClientQuotaManager.ClientIdEntity(clientId)),
Some(Quota.upperBound(10000000))
)
}
val (producerThread, consumerThread) = startProduceConsume(retries = 0, groupProtocol, clientId)
TestUtils.waitUntilTrue(() => consumerThread.received >= 5, "Messages not sent")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
*/
package kafka.server.metadata

import kafka.server.ClientQuotaManager
import org.apache.kafka.image.ClientQuotaDelta
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertThrows}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable

Expand All @@ -33,4 +34,43 @@ class ClientQuotaMetadataManagerTest {
assertDoesNotThrow { new Executable { def execute(): Unit = manager.handleIpQuota(IpEntity("192.168.1.1"), new ClientQuotaDelta(null)) } }
assertDoesNotThrow { new Executable { def execute(): Unit = manager.handleIpQuota(IpEntity("2001:db8::1"), new ClientQuotaDelta(null)) } }
}

@Test
def testTransferToClientQuotaEntity(): Unit = {

assertThrows(classOf[IllegalStateException],() => ClientQuotaMetadataManager.transferToClientQuotaEntity(IpEntity("a")))
assertThrows(classOf[IllegalStateException],() => ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultIpEntity))
assertEquals(
(Some(ClientQuotaManager.UserEntity("user")), None),
ClientQuotaMetadataManager.transferToClientQuotaEntity(UserEntity("user"))
)
assertEquals(
(Some(ClientQuotaManager.DefaultUserEntity), None),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserEntity)
)
assertEquals(
(None, Some(ClientQuotaManager.ClientIdEntity("client"))),
ClientQuotaMetadataManager.transferToClientQuotaEntity(ClientIdEntity("client"))
)
assertEquals(
(None, Some(ClientQuotaManager.DefaultClientIdEntity)),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultClientIdEntity)
)
assertEquals(
(Some(ClientQuotaManager.UserEntity("user")), Some(ClientQuotaManager.ClientIdEntity("client"))),
ClientQuotaMetadataManager.transferToClientQuotaEntity(ExplicitUserExplicitClientIdEntity("user", "client"))
)
assertEquals(
(Some(ClientQuotaManager.UserEntity("user")), Some(ClientQuotaManager.DefaultClientIdEntity)),
ClientQuotaMetadataManager.transferToClientQuotaEntity(ExplicitUserDefaultClientIdEntity("user"))
)
assertEquals(
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.ClientIdEntity("client"))),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserExplicitClientIdEntity("client"))
)
assertEquals(
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.DefaultClientIdEntity)),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserDefaultClientIdEntity)
)
}
}
Loading

0 comments on commit a3d9d88

Please sign in to comment.