Skip to content

Commit

Permalink
update the logic
Browse files Browse the repository at this point in the history
  • Loading branch information
m1a2st committed Jan 25, 2025
1 parent e78c476 commit 18d7801
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 27 deletions.
29 changes: 13 additions & 16 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.ClientQuotaManagerConfig
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType, QuotaType, QuotaUtils, SensorAccess, ThrottleCallback, ThrottledChannel}
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.network.Session
Expand All @@ -55,7 +55,7 @@ object QuotaTypes {
object ClientQuotaManager {
// Purge sensors after 1 hour of inactivity
val InactiveSensorExpirationTimeSeconds = 3600

private val DefaultString = "<default>"
val DefaultClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(None, Some(DefaultClientIdEntity))
val DefaultUserQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None)
val DefaultUserClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity))
Expand All @@ -76,13 +76,13 @@ object ClientQuotaManager {

case object DefaultUserEntity extends BaseUserEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER
override def name: String = ZooKeeperInternals.DEFAULT_STRING
override def name: String = DefaultString
override def toString: String = "default user"
}

case object DefaultClientIdEntity extends ClientQuotaEntity.ConfigEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID
override def name: String = ZooKeeperInternals.DEFAULT_STRING
override def name: String = DefaultString
override def toString: String = "default client-id"
}

Expand All @@ -93,7 +93,7 @@ object ClientQuotaManager {

def sanitizedUser: String = userEntity.map {
case entity: UserEntity => entity.sanitizedUser
case DefaultUserEntity => ZooKeeperInternals.DEFAULT_STRING
case DefaultUserEntity => DefaultUserEntity.name
}.getOrElse("")

def clientId: String = clientIdEntity.map(_.name).getOrElse("")
Expand Down Expand Up @@ -403,12 +403,17 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
* for any of these levels.
*
* @param sanitizedUser user to override if quota applies to <user> or <user, client-id>
* @param clientId client to override if quota applies to <client-id> or <user, client-id>
* @param userEntity user to override if quota applies to <user> or <user, client-id>
* @param clientIdEntity client to override if quota applies to <client-id> or <user, client-id>
* @param sanitizedClientId sanitized client ID to override if quota applies to <client-id> or <user, client-id>
* @param quota custom quota to apply or None if quota override is being removed
*/
def updateQuota(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]): Unit = {
def updateQuota(
userEntity: Option[BaseUserEntity],
clientIdEntity: Option[ClientQuotaEntity.ConfigEntity],
sanitizedClientId: Option[ClientQuotaEntity.ConfigEntity],
quota: Option[Quota]
): Unit = {
/*
* Acquire the write lock to apply changes in the quota objects.
* This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists).
Expand All @@ -418,14 +423,6 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
*/
lock.writeLock().lock()
try {
val userEntity = sanitizedUser.map {
case ZooKeeperInternals.DEFAULT_STRING => DefaultUserEntity
case user => UserEntity(user)
}
val clientIdEntity = sanitizedClientId.map {
case ZooKeeperInternals.DEFAULT_STRING => DefaultClientIdEntity
case _ => ClientIdEntity(clientId.getOrElse(throw new IllegalStateException("Client-id not provided")))
}
val quotaEntity = KafkaQuotaEntity(userEntity, clientIdEntity)

if (userEntity.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kafka.server.metadata

import kafka.network.ConnectionQuotas
import kafka.server.ClientQuotaManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.common.metrics.Quota
Expand All @@ -26,7 +27,7 @@ import org.apache.kafka.common.utils.Sanitizer

import java.net.{InetAddress, UnknownHostException}
import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta}
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
import org.apache.kafka.server.config.QuotaConfig

import scala.jdk.OptionConverters.RichOptionalDouble

Expand Down Expand Up @@ -146,22 +147,30 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag

// Convert entity into Options with sanitized values for QuotaManagers
val (sanitizedUser, sanitizedClientId) = quotaEntity match {
case UserEntity(user) => (Some(Sanitizer.sanitize(user)), None)
case DefaultUserEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), None)
case ClientIdEntity(clientId) => (None, Some(Sanitizer.sanitize(clientId)))
case DefaultClientIdEntity => (None, Some(ZooKeeperInternals.DEFAULT_STRING))
case ExplicitUserExplicitClientIdEntity(user, clientId) => (Some(Sanitizer.sanitize(user)), Some(Sanitizer.sanitize(clientId)))
case ExplicitUserDefaultClientIdEntity(user) => (Some(Sanitizer.sanitize(user)), Some(ZooKeeperInternals.DEFAULT_STRING))
case DefaultUserExplicitClientIdEntity(clientId) => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(Sanitizer.sanitize(clientId)))
case DefaultUserDefaultClientIdEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING))
case UserEntity(user) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), None)
case DefaultUserEntity =>
(Some(ClientQuotaManager.DefaultUserEntity), None)
case ClientIdEntity(clientId) =>
(None, Some(ClientQuotaManager.ClientIdEntity(Sanitizer.sanitize(clientId))))
case DefaultClientIdEntity =>
(None, Some(ClientQuotaManager.DefaultClientIdEntity))
case ExplicitUserExplicitClientIdEntity(user, clientId) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.ClientIdEntity(Sanitizer.sanitize(clientId))))
case ExplicitUserDefaultClientIdEntity(user) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.DefaultClientIdEntity))
case DefaultUserExplicitClientIdEntity(clientId) =>
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.ClientIdEntity(Sanitizer.sanitize(clientId))))
case DefaultUserDefaultClientIdEntity =>
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.DefaultClientIdEntity))
case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here")
}

val quotaValue = newValue.map(new Quota(_, true))
try {
manager.updateQuota(
sanitizedUser = sanitizedUser,
clientId = sanitizedClientId.map(Sanitizer.desanitize),
userEntity = sanitizedUser,
clientIdEntity = sanitizedClientId.map(id => Sanitizer.desanitize(id.name())).map(ClientQuotaManager.ClientIdEntity),
sanitizedClientId = sanitizedClientId,
quota = quotaValue)
} catch {
Expand Down

0 comments on commit 18d7801

Please sign in to comment.