Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18530 Remove ZooKeeperInternals #18641

Merged
merged 22 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig

import java.net.{InetAddress, UnknownHostException}
import scala.jdk.CollectionConverters._
import scala.collection._

Expand Down Expand Up @@ -649,7 +650,7 @@ object ConfigCommand extends Logging {

if (hasEntityName && entityTypeVals.contains(ConfigType.IP)) {
Seq(entityName, ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipEntity =>
if (!DynamicConfig.Ip.isValidIpEntity(ipEntity))
if (!isValidIpEntity(ipEntity))
throw new IllegalArgumentException(s"The entity name for ${entityTypeVals.head} must be a valid IP or resolvable host, but it is: $ipEntity")
}
}
Expand Down Expand Up @@ -688,4 +689,13 @@ object ConfigCommand extends Logging {
}
}
}

def isValidIpEntity(ip: String): Boolean = {
try {
InetAddress.getByName(ip)
} catch {
case _: UnknownHostException => return false
}
true
}
}
44 changes: 19 additions & 25 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.ClientQuotaManagerConfig
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType, QuotaType, QuotaUtils, SensorAccess, ThrottleCallback, ThrottledChannel}
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.network.Session
Expand All @@ -55,7 +55,7 @@ object QuotaTypes {
object ClientQuotaManager {
// Purge sensors after 1 hour of inactivity
val InactiveSensorExpirationTimeSeconds = 3600

private val DefaultName = "<default>"
val DefaultClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(None, Some(DefaultClientIdEntity))
val DefaultUserQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None)
val DefaultUserClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity))
Expand All @@ -76,13 +76,13 @@ object ClientQuotaManager {

case object DefaultUserEntity extends BaseUserEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER
override def name: String = ZooKeeperInternals.DEFAULT_STRING
override def name: String = DefaultName
override def toString: String = "default user"
}

case object DefaultClientIdEntity extends ClientQuotaEntity.ConfigEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID
override def name: String = ZooKeeperInternals.DEFAULT_STRING
override def name: String = DefaultName
override def toString: String = "default client-id"
}

Expand All @@ -93,7 +93,7 @@ object ClientQuotaManager {

def sanitizedUser: String = userEntity.map {
case entity: UserEntity => entity.sanitizedUser
case DefaultUserEntity => ZooKeeperInternals.DEFAULT_STRING
case DefaultUserEntity => DefaultName
}.getOrElse("")

def clientId: String = clientIdEntity.map(_.name).getOrElse("")
Expand Down Expand Up @@ -260,7 +260,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
*/
def unrecordQuotaSensor(request: RequestChannel.Request, value: Double, timeMs: Long): Unit = {
val clientSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
clientSensors.quotaSensor.record(value * (-1), timeMs, false)
clientSensors.quotaSensor.record(value * -1, timeMs, false)
}

/**
Expand Down Expand Up @@ -403,12 +403,15 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
* for any of these levels.
*
* @param sanitizedUser user to override if quota applies to <user> or <user, client-id>
* @param clientId client to override if quota applies to <client-id> or <user, client-id>
* @param sanitizedClientId sanitized client ID to override if quota applies to <client-id> or <user, client-id>
* @param quota custom quota to apply or None if quota override is being removed
* @param userEntity user to override if quota applies to <user> or <user, client-id>
* @param clientEntity sanitized client entity to override if quota applies to <client-id> or <user, client-id>
* @param quota custom quota to apply or None if quota override is being removed
*/
def updateQuota(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]): Unit = {
def updateQuota(
userEntity: Option[BaseUserEntity],
clientEntity: Option[ClientQuotaEntity.ConfigEntity],
quota: Option[Quota]
): Unit = {
/*
* Acquire the write lock to apply changes in the quota objects.
* This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists).
Expand All @@ -418,29 +421,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
*/
lock.writeLock().lock()
try {
val userEntity = sanitizedUser.map {
case ZooKeeperInternals.DEFAULT_STRING => DefaultUserEntity
case user => UserEntity(user)
}
val clientIdEntity = sanitizedClientId.map {
case ZooKeeperInternals.DEFAULT_STRING => DefaultClientIdEntity
case _ => ClientIdEntity(clientId.getOrElse(throw new IllegalStateException("Client-id not provided")))
}
val quotaEntity = KafkaQuotaEntity(userEntity, clientIdEntity)
val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)

if (userEntity.nonEmpty) {
if (quotaEntity.clientIdEntity.nonEmpty)
quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
else
quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
} else if (clientIdEntity.nonEmpty)
} else if (clientEntity.nonEmpty)
quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled

quota match {
case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound)
case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
}
val updatedEntity = if (userEntity.contains(DefaultUserEntity) || clientIdEntity.contains(DefaultClientIdEntity))
val updatedEntity = if (userEntity.contains(DefaultUserEntity) || clientEntity.contains(DefaultClientIdEntity))
None // more than one entity may need updating, so `updateQuotaMetricConfigs` will go through all metrics
else
Some(quotaEntity)
Expand All @@ -452,9 +447,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}

/**
* Updates metrics configs. This is invoked when quota configs are updated in ZooKeeper
* or when partitions leaders change and custom callbacks that implement partition-based quotas
* have updated quotas.
* Updates metrics configs. This is invoked when quota configs are updated when partitions leaders change
* and custom callbacks that implement partition-based quotas have updated quotas.
*
* @param updatedQuotaEntity If set to one entity and quotas have only been enabled at one
* level, then an optimized update is performed with a single metric update. If None is provided,
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.{Collections, Properties}
import kafka.log.UnifiedLog
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.ClientMetricsManager
Expand Down Expand Up @@ -146,7 +146,7 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {
def processConfigChanges(brokerId: String, properties: Properties): Unit = {
if (brokerId == ZooKeeperInternals.DEFAULT_STRING)
if (brokerId.isEmpty)
brokerConfig.dynamicConfig.updateDefaultConfig(properties)
else if (brokerConfig.brokerId == brokerId.trim.toInt) {
brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ import scala.collection._
import scala.jdk.CollectionConverters._

/**
* Dynamic broker configurations are stored in ZooKeeper and may be defined at two levels:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in kraft mode, the two levels are still existent. could you please revise it instead of removing it?

Copy link
Contributor

@clolov clolov Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heya @m1a2st! I have a follow-up ask, while the two levels are still present they will no longer be under the path specified in this file since the path is ZK-specific. As long as we don't use this comment to generate documentation I am okay with this being removed in a subsequent PR. However, if we use this file to generate documentation could you change this as part of this PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think those comments in DynamicBrokerConfig are not used to generate documentation, but it will be better to correct it - as least, the description about zk path must be removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, then I will review once they have been changed 😊!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @clolov and @chia7712, update this document. If there is any idea, please let me know

* Dynamic broker configurations may be defined at two levels:
* <ul>
* <li>Per-broker configs persisted at <tt>/configs/brokers/{brokerId}</tt>: These can be described/altered
* using AdminClient using the resource name brokerId.</li>
* <li>Cluster-wide defaults persisted at <tt>/configs/brokers/&lt;default&gt;</tt>: These can be described/altered
* using AdminClient using an empty resource name.</li>
* <li>Per-broker configurations are persisted at the controller and can be described
* or altered using AdminClient with the resource name brokerId.</li>
* <li>Cluster-wide default configurations are persisted at the cluster level and can be
* described or altered using AdminClient with an empty resource name.</li>
* </ul>
* The order of precedence for broker configs is:
* <ol>
Expand Down Expand Up @@ -367,7 +367,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
error(s"$errorMessage: $invalidPropNames")
}
}
removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs configured in ZooKeeper will be ignored")
removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs will be ignored")
removeInvalidProps(securityConfigsWithoutListenerPrefix(props),
"Security configs can be dynamically updated only using listener prefix, base configs will be ignored")
if (!perBrokerConfig)
Expand Down
14 changes: 1 addition & 13 deletions core/src/main/scala/kafka/server/DynamicConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ package kafka.server

import kafka.server.DynamicBrokerConfig.AllDynamicConfigs

import java.net.{InetAddress, UnknownHostException}
import java.util.Properties
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.QuotaConfig

import java.util
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -83,17 +82,6 @@ object DynamicConfig {
def names: util.Set[String] = ipConfigs.names

def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false)

def isValidIpEntity(ip: String): Boolean = {
if (ip != ZooKeeperInternals.DEFAULT_STRING) {
try {
InetAddress.getByName(ip)
} catch {
case _: UnknownHostException => return false
}
}
true
}
}

object ClientMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
package kafka.server.metadata

import kafka.network.ConnectionQuotas
import kafka.server.ClientQuotaManager
import kafka.server.ClientQuotaManager.BaseUserEntity
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.ClientQuotaMetadataManager.transferToClientQuotaEntity
import kafka.utils.Logging
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.quota.ClientQuotaEntity.{ConfigEntity => ClientQuotaConfigEntity}

import java.net.{InetAddress, UnknownHostException}
import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta}
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.QuotaConfig

import scala.jdk.OptionConverters.RichOptionalDouble

Expand Down Expand Up @@ -145,27 +149,42 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
}

// Convert entity into Options with sanitized values for QuotaManagers
val (sanitizedUser, sanitizedClientId) = quotaEntity match {
case UserEntity(user) => (Some(Sanitizer.sanitize(user)), None)
case DefaultUserEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), None)
case ClientIdEntity(clientId) => (None, Some(Sanitizer.sanitize(clientId)))
case DefaultClientIdEntity => (None, Some(ZooKeeperInternals.DEFAULT_STRING))
case ExplicitUserExplicitClientIdEntity(user, clientId) => (Some(Sanitizer.sanitize(user)), Some(Sanitizer.sanitize(clientId)))
case ExplicitUserDefaultClientIdEntity(user) => (Some(Sanitizer.sanitize(user)), Some(ZooKeeperInternals.DEFAULT_STRING))
case DefaultUserExplicitClientIdEntity(clientId) => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(Sanitizer.sanitize(clientId)))
case DefaultUserDefaultClientIdEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING))
case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here")
}
val (userEntity, clientEntity) = transferToClientQuotaEntity(quotaEntity)

val quotaValue = newValue.map(new Quota(_, true))
try {
manager.updateQuota(
sanitizedUser = sanitizedUser,
clientId = sanitizedClientId.map(Sanitizer.desanitize),
sanitizedClientId = sanitizedClientId,
quota = quotaValue)
userEntity = userEntity,
clientEntity = clientEntity,
quota = quotaValue
)
} catch {
case t: Throwable => error(s"Failed to update user-client quota $quotaEntity", t)
}
}
}

object ClientQuotaMetadataManager {

def transferToClientQuotaEntity(quotaEntity: QuotaEntity): (Option[BaseUserEntity], Option[ClientQuotaConfigEntity]) = {
quotaEntity match {
case UserEntity(user) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), None)
case DefaultUserEntity =>
(Some(ClientQuotaManager.DefaultUserEntity), None)
case ClientIdEntity(clientId) =>
(None, Some(ClientQuotaManager.ClientIdEntity(clientId)))
case DefaultClientIdEntity =>
(None, Some(ClientQuotaManager.DefaultClientIdEntity))
case ExplicitUserExplicitClientIdEntity(user, clientId) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.ClientIdEntity(clientId)))
case ExplicitUserDefaultClientIdEntity(user) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.DefaultClientIdEntity))
case DefaultUserExplicitClientIdEntity(clientId) =>
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.ClientIdEntity(clientId)))
case DefaultUserDefaultClientIdEntity =>
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.DefaultClientIdEntity))
case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, GROUP, TOPIC}
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.fault.FaultHandler


Expand Down Expand Up @@ -78,7 +78,7 @@ class DynamicConfigPublisher(
// These are stored in KRaft with an empty name field.
info("Updating cluster configuration : " +
toLoggableProps(resource, props).mkString(","))
nodeConfigHandler.processConfigChanges(ZooKeeperInternals.DEFAULT_STRING, props)
nodeConfigHandler.processConfigChanges(resource.name(), props)
} catch {
case t: Throwable => faultHandler.handleFault("Error updating " +
s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,11 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// non-default value to trigger a new metric
val clientId = "test-client-1"
servers.foreach { server =>
server.quotaManagers.produce.updateQuota(None, Some(clientId), Some(clientId),
Some(Quota.upperBound(10000000)))
server.quotaManagers.produce.updateQuota(
None,
Some(ClientQuotaManager.ClientIdEntity(clientId)),
Some(Quota.upperBound(10000000))
)
}
val (producerThread, consumerThread) = startProduceConsume(retries = 0, groupProtocol, clientId)
TestUtils.waitUntilTrue(() => consumerThread.received >= 5, "Messages not sent")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
*/
package kafka.server.metadata

import kafka.server.ClientQuotaManager
import org.apache.kafka.image.ClientQuotaDelta
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertThrows}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable

Expand All @@ -33,4 +34,43 @@ class ClientQuotaMetadataManagerTest {
assertDoesNotThrow { new Executable { def execute(): Unit = manager.handleIpQuota(IpEntity("192.168.1.1"), new ClientQuotaDelta(null)) } }
assertDoesNotThrow { new Executable { def execute(): Unit = manager.handleIpQuota(IpEntity("2001:db8::1"), new ClientQuotaDelta(null)) } }
}

@Test
def testTransferToClientQuotaEntity(): Unit = {

assertThrows(classOf[IllegalStateException],() => ClientQuotaMetadataManager.transferToClientQuotaEntity(IpEntity("a")))
assertThrows(classOf[IllegalStateException],() => ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultIpEntity))
assertEquals(
(Some(ClientQuotaManager.UserEntity("user")), None),
ClientQuotaMetadataManager.transferToClientQuotaEntity(UserEntity("user"))
)
assertEquals(
(Some(ClientQuotaManager.DefaultUserEntity), None),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserEntity)
)
assertEquals(
(None, Some(ClientQuotaManager.ClientIdEntity("client"))),
ClientQuotaMetadataManager.transferToClientQuotaEntity(ClientIdEntity("client"))
)
assertEquals(
(None, Some(ClientQuotaManager.DefaultClientIdEntity)),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultClientIdEntity)
)
assertEquals(
(Some(ClientQuotaManager.UserEntity("user")), Some(ClientQuotaManager.ClientIdEntity("client"))),
ClientQuotaMetadataManager.transferToClientQuotaEntity(ExplicitUserExplicitClientIdEntity("user", "client"))
)
assertEquals(
(Some(ClientQuotaManager.UserEntity("user")), Some(ClientQuotaManager.DefaultClientIdEntity)),
ClientQuotaMetadataManager.transferToClientQuotaEntity(ExplicitUserDefaultClientIdEntity("user"))
)
assertEquals(
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.ClientIdEntity("client"))),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserExplicitClientIdEntity("client"))
)
assertEquals(
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.DefaultClientIdEntity)),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserDefaultClientIdEntity)
)
}
}
Loading