Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18530 Remove ZooKeeperInternals #18641

Merged
merged 22 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.ClientQuotaManagerConfig
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType, QuotaType, QuotaUtils, SensorAccess, ThrottleCallback, ThrottledChannel}
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.network.Session
Expand All @@ -55,7 +55,7 @@ object QuotaTypes {
object ClientQuotaManager {
// Purge sensors after 1 hour of inactivity
val InactiveSensorExpirationTimeSeconds = 3600

val DefaultString = "<default>"
val DefaultClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(None, Some(DefaultClientIdEntity))
val DefaultUserQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None)
val DefaultUserClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity))
Expand All @@ -76,13 +76,13 @@ object ClientQuotaManager {

case object DefaultUserEntity extends BaseUserEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER
override def name: String = ZooKeeperInternals.DEFAULT_STRING
override def name: String = DefaultString
override def toString: String = "default user"
}

case object DefaultClientIdEntity extends ClientQuotaEntity.ConfigEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID
override def name: String = ZooKeeperInternals.DEFAULT_STRING
override def name: String = DefaultString
override def toString: String = "default client-id"
}

Expand All @@ -93,7 +93,7 @@ object ClientQuotaManager {

def sanitizedUser: String = userEntity.map {
case entity: UserEntity => entity.sanitizedUser
case DefaultUserEntity => ZooKeeperInternals.DEFAULT_STRING
case DefaultUserEntity => DefaultString
}.getOrElse("")

def clientId: String = clientIdEntity.map(_.name).getOrElse("")
Expand Down Expand Up @@ -260,7 +260,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
*/
def unrecordQuotaSensor(request: RequestChannel.Request, value: Double, timeMs: Long): Unit = {
val clientSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
clientSensors.quotaSensor.record(value * (-1), timeMs, false)
clientSensors.quotaSensor.record(value * -1, timeMs, false)
}

/**
Expand Down Expand Up @@ -404,11 +404,14 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* for any of these levels.
*
* @param sanitizedUser user to override if quota applies to <user> or <user, client-id>
* @param clientId client to override if quota applies to <client-id> or <user, client-id>
* @param sanitizedClientId sanitized client ID to override if quota applies to <client-id> or <user, client-id>
* @param quota custom quota to apply or None if quota override is being removed
*/
def updateQuota(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]): Unit = {
def updateQuota(
sanitizedUser: Option[BaseUserEntity],
sanitizedClientId: Option[ClientQuotaEntity.ConfigEntity],
quota: Option[Quota]
): Unit = {
/*
* Acquire the write lock to apply changes in the quota objects.
* This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists).
Expand All @@ -418,14 +421,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
*/
lock.writeLock().lock()
try {
val userEntity = sanitizedUser.map {
case ZooKeeperInternals.DEFAULT_STRING => DefaultUserEntity
case user => UserEntity(user)
}
val clientIdEntity = sanitizedClientId.map {
case ZooKeeperInternals.DEFAULT_STRING => DefaultClientIdEntity
case _ => ClientIdEntity(clientId.getOrElse(throw new IllegalStateException("Client-id not provided")))
}
val userEntity = getOrDefaultUser(sanitizedUser)
val clientIdEntity = getOrDefaultClient(sanitizedClientId)

val quotaEntity = KafkaQuotaEntity(userEntity, clientIdEntity)

if (userEntity.nonEmpty) {
Expand All @@ -451,10 +449,31 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
}

private def getOrDefaultClient(
sanitizedClientId: Option[ClientQuotaEntity.ConfigEntity]
): Option[ClientQuotaEntity.ConfigEntity] = {
if (sanitizedClientId.isEmpty)
None
else if (sanitizedClientId.get.name() == DefaultString)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in kraft, we don't use <default> so this check is weird. If this is used to fix test, then maybe we should revise the test

Some(DefaultClientIdEntity)
else {
val clientId = sanitizedClientId.map(s => Sanitizer.desanitize(s.name()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should handle the ClientIdEntity only.

Some(ClientIdEntity(clientId.getOrElse(throw new IllegalStateException("Client-id not provided"))))
}
}

private def getOrDefaultUser(sanitizedUser: Option[BaseUserEntity]): Option[BaseUserEntity] = {
if (sanitizedUser.isEmpty)
None
else if (sanitizedUser.get.name() == DefaultString)
Some(DefaultUserEntity)
else
sanitizedUser
}

/**
* Updates metrics configs. This is invoked when quota configs are updated in ZooKeeper
* or when partitions leaders change and custom callbacks that implement partition-based quotas
* have updated quotas.
* Updates metrics configs. This is invoked when quota configs are updated when partitions leaders change
* and custom callbacks that implement partition-based quotas have updated quotas.
*
* @param updatedQuotaEntity If set to one entity and quotas have only been enabled at one
* level, then an optimized update is performed with a single metric update. If None is provided,
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.{Collections, Properties}
import kafka.log.UnifiedLog
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.ClientMetricsManager
Expand Down Expand Up @@ -146,7 +146,7 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {
def processConfigChanges(brokerId: String, properties: Properties): Unit = {
if (brokerId == ZooKeeperInternals.DEFAULT_STRING)
if (brokerId.isEmpty)
brokerConfig.dynamicConfig.updateDefaultConfig(properties)
else if (brokerConfig.brokerId == brokerId.trim.toInt) {
brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
Expand Down
9 changes: 1 addition & 8 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ import scala.collection._
import scala.jdk.CollectionConverters._

/**
* Dynamic broker configurations are stored in ZooKeeper and may be defined at two levels:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in kraft mode, the two levels are still existent. could you please revise it instead of removing it?

Copy link
Contributor

@clolov clolov Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heya @m1a2st! I have a follow-up ask, while the two levels are still present they will no longer be under the path specified in this file since the path is ZK-specific. As long as we don't use this comment to generate documentation I am okay with this being removed in a subsequent PR. However, if we use this file to generate documentation could you change this as part of this PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think those comments in DynamicBrokerConfig are not used to generate documentation, but it will be better to correct it - as least, the description about zk path must be removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, then I will review once they have been changed 😊!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @clolov and @chia7712, update this document. If there is any idea, please let me know

* <ul>
* <li>Per-broker configs persisted at <tt>/configs/brokers/{brokerId}</tt>: These can be described/altered
* using AdminClient using the resource name brokerId.</li>
* <li>Cluster-wide defaults persisted at <tt>/configs/brokers/&lt;default&gt;</tt>: These can be described/altered
* using AdminClient using an empty resource name.</li>
* </ul>
* The order of precedence for broker configs is:
* <ol>
* <li>STATIC_BROKER_CONFIG: properties that broker is started up with, typically from server.properties file</li>
Expand Down Expand Up @@ -391,7 +384,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
error(s"$errorMessage: $invalidPropNames")
}
}
removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs configured in ZooKeeper will be ignored")
removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs will be ignored")
removeInvalidProps(securityConfigsWithoutListenerPrefix(props),
"Security configs can be dynamically updated only using listener prefix, base configs will be ignored")
if (!perBrokerConfig)
Expand Down
12 changes: 5 additions & 7 deletions core/src/main/scala/kafka/server/DynamicConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.net.{InetAddress, UnknownHostException}
import java.util.Properties
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.QuotaConfig

import java.util
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -85,12 +85,10 @@ object DynamicConfig {
def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false)

def isValidIpEntity(ip: String): Boolean = {
m1a2st marked this conversation as resolved.
Show resolved Hide resolved
if (ip != ZooKeeperInternals.DEFAULT_STRING) {
try {
InetAddress.getByName(ip)
} catch {
case _: UnknownHostException => return false
}
try {
InetAddress.getByName(ip)
} catch {
case _: UnknownHostException => return false
}
true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
package kafka.server.metadata

import kafka.network.ConnectionQuotas
import kafka.server.ClientQuotaManager
import kafka.server.ClientQuotaManager.BaseUserEntity
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.ClientQuotaMetadataManager.transferToClientQuotaEntity
import kafka.utils.Logging
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.quota.ClientQuotaEntity.{ConfigEntity => ClientQuotaConfigEntity}

import java.net.{InetAddress, UnknownHostException}
import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta}
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.QuotaConfig

import scala.jdk.OptionConverters.RichOptionalDouble

Expand Down Expand Up @@ -145,27 +149,43 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
}

// Convert entity into Options with sanitized values for QuotaManagers
val (sanitizedUser, sanitizedClientId) = quotaEntity match {
case UserEntity(user) => (Some(Sanitizer.sanitize(user)), None)
case DefaultUserEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), None)
case ClientIdEntity(clientId) => (None, Some(Sanitizer.sanitize(clientId)))
case DefaultClientIdEntity => (None, Some(ZooKeeperInternals.DEFAULT_STRING))
case ExplicitUserExplicitClientIdEntity(user, clientId) => (Some(Sanitizer.sanitize(user)), Some(Sanitizer.sanitize(clientId)))
case ExplicitUserDefaultClientIdEntity(user) => (Some(Sanitizer.sanitize(user)), Some(ZooKeeperInternals.DEFAULT_STRING))
case DefaultUserExplicitClientIdEntity(clientId) => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(Sanitizer.sanitize(clientId)))
case DefaultUserDefaultClientIdEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING))
case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here")
}
val (sanitizedUser, sanitizedClientId) = transferToClientQuotaEntity(quotaEntity)

val quotaValue = newValue.map(new Quota(_, true))
try {
manager.updateQuota(
sanitizedUser = sanitizedUser,
clientId = sanitizedClientId.map(Sanitizer.desanitize),
sanitizedClientId = sanitizedClientId,
quota = quotaValue)
quota = quotaValue
)
} catch {
case t: Throwable => error(s"Failed to update user-client quota $quotaEntity", t)
}
}
}

object ClientQuotaMetadataManager {

def transferToClientQuotaEntity(quotaEntity: QuotaEntity): (Option[BaseUserEntity], Option[ClientQuotaConfigEntity]) = {
val (sanitizedUser, sanitizedClientId) = quotaEntity match {
case UserEntity(user) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), None)
case DefaultUserEntity =>
(Some(ClientQuotaManager.DefaultUserEntity), None)
case ClientIdEntity(clientId) =>
(None, Some(ClientQuotaManager.ClientIdEntity(Sanitizer.sanitize(clientId))))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to sanitize data in kraft mode? the zk path is gone so we don't need to align the data for zk anymore.

Copy link
Contributor Author

@m1a2st m1a2st Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In sanitize method, It deal with * and + symbols. Therefore, I think it is not only handling matters related to Zookeeper.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After under discussion, We should remove this sanitize, it is unused in this path.

case DefaultClientIdEntity =>
(None, Some(ClientQuotaManager.DefaultClientIdEntity))
case ExplicitUserExplicitClientIdEntity(user, clientId) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.ClientIdEntity(Sanitizer.sanitize(clientId))))
case ExplicitUserDefaultClientIdEntity(user) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.DefaultClientIdEntity))
case DefaultUserExplicitClientIdEntity(clientId) =>
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.ClientIdEntity(Sanitizer.sanitize(clientId))))
case DefaultUserDefaultClientIdEntity =>
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.DefaultClientIdEntity))
case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here")
}
(sanitizedUser, sanitizedClientId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, GROUP, TOPIC}
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.fault.FaultHandler


Expand Down Expand Up @@ -78,7 +78,7 @@ class DynamicConfigPublisher(
// These are stored in KRaft with an empty name field.
info("Updating cluster configuration : " +
toLoggableProps(resource, props).mkString(","))
nodeConfigHandler.processConfigChanges(ZooKeeperInternals.DEFAULT_STRING, props)
nodeConfigHandler.processConfigChanges(resource.name(), props)
} catch {
case t: Throwable => faultHandler.handleFault("Error updating " +
s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,11 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
// non-default value to trigger a new metric
val clientId = "test-client-1"
servers.foreach { server =>
server.quotaManagers.produce.updateQuota(None, Some(clientId), Some(clientId),
Some(Quota.upperBound(10000000)))
server.quotaManagers.produce.updateQuota(
None,
Some(ClientQuotaManager.ClientIdEntity(clientId)),
Some(Quota.upperBound(10000000))
)
}
val (producerThread, consumerThread) = startProduceConsume(retries = 0, groupProtocol, clientId)
TestUtils.waitUntilTrue(() => consumerThread.received >= 5, "Messages not sent")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
*/
package kafka.server.metadata

import kafka.server.ClientQuotaManager
import org.apache.kafka.image.ClientQuotaDelta
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertThrows}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable

Expand All @@ -33,4 +34,43 @@ class ClientQuotaMetadataManagerTest {
assertDoesNotThrow { new Executable { def execute(): Unit = manager.handleIpQuota(IpEntity("192.168.1.1"), new ClientQuotaDelta(null)) } }
assertDoesNotThrow { new Executable { def execute(): Unit = manager.handleIpQuota(IpEntity("2001:db8::1"), new ClientQuotaDelta(null)) } }
}

@Test
def testTransferToClientQuotaEntity(): Unit = {

assertThrows(classOf[IllegalStateException],() => ClientQuotaMetadataManager.transferToClientQuotaEntity(IpEntity("a")))
assertThrows(classOf[IllegalStateException],() => ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultIpEntity))
assertEquals(
(Some(ClientQuotaManager.UserEntity("user")), None),
ClientQuotaMetadataManager.transferToClientQuotaEntity(UserEntity("user"))
)
assertEquals(
(Some(ClientQuotaManager.DefaultUserEntity), None),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserEntity)
)
assertEquals(
(None, Some(ClientQuotaManager.ClientIdEntity("client"))),
ClientQuotaMetadataManager.transferToClientQuotaEntity(ClientIdEntity("client"))
)
assertEquals(
(None, Some(ClientQuotaManager.DefaultClientIdEntity)),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultClientIdEntity)
)
assertEquals(
(Some(ClientQuotaManager.UserEntity("user")), Some(ClientQuotaManager.ClientIdEntity("client"))),
ClientQuotaMetadataManager.transferToClientQuotaEntity(ExplicitUserExplicitClientIdEntity("user", "client"))
)
assertEquals(
(Some(ClientQuotaManager.UserEntity("user")), Some(ClientQuotaManager.DefaultClientIdEntity)),
ClientQuotaMetadataManager.transferToClientQuotaEntity(ExplicitUserDefaultClientIdEntity("user"))
)
assertEquals(
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.ClientIdEntity("client"))),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserExplicitClientIdEntity("client"))
)
assertEquals(
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.DefaultClientIdEntity)),
ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserDefaultClientIdEntity)
)
}
}
Loading
Loading