diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 1cf2a800de989..edf0ff3bb6fa7 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -36,6 +36,7 @@ import org.apache.kafka.server.config.{ConfigType, QuotaConfig} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.storage.internals.log.LogConfig +import java.net.{InetAddress, UnknownHostException} import scala.jdk.CollectionConverters._ import scala.collection._ @@ -649,7 +650,7 @@ object ConfigCommand extends Logging { if (hasEntityName && entityTypeVals.contains(ConfigType.IP)) { Seq(entityName, ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipEntity => - if (!DynamicConfig.Ip.isValidIpEntity(ipEntity)) + if (!isValidIpEntity(ipEntity)) throw new IllegalArgumentException(s"The entity name for ${entityTypeVals.head} must be a valid IP or resolvable host, but it is: $ipEntity") } } @@ -688,4 +689,13 @@ object ConfigCommand extends Logging { } } } + + def isValidIpEntity(ip: String): Boolean = { + try { + InetAddress.getByName(ip) + } catch { + case _: UnknownHostException => return false + } + true + } } diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index f621d0dbcf437..d8346e6ab85ea 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Sanitizer, Time} -import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ZooKeeperInternals} +import org.apache.kafka.server.config.ClientQuotaManagerConfig import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType, QuotaType, QuotaUtils, SensorAccess, ThrottleCallback, ThrottledChannel} import org.apache.kafka.server.util.ShutdownableThread import org.apache.kafka.network.Session @@ -55,7 +55,7 @@ object QuotaTypes { object ClientQuotaManager { // Purge sensors after 1 hour of inactivity val InactiveSensorExpirationTimeSeconds = 3600 - + private val DefaultName = "" val DefaultClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(None, Some(DefaultClientIdEntity)) val DefaultUserQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None) val DefaultUserClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity)) @@ -76,13 +76,13 @@ object ClientQuotaManager { case object DefaultUserEntity extends BaseUserEntity { override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER - override def name: String = ZooKeeperInternals.DEFAULT_STRING + override def name: String = DefaultName override def toString: String = "default user" } case object DefaultClientIdEntity extends ClientQuotaEntity.ConfigEntity { override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID - override def name: String = ZooKeeperInternals.DEFAULT_STRING + override def name: String = DefaultName override def toString: String = "default client-id" } @@ -93,7 +93,7 @@ object ClientQuotaManager { def sanitizedUser: String = userEntity.map { case entity: UserEntity => entity.sanitizedUser - case DefaultUserEntity => ZooKeeperInternals.DEFAULT_STRING + case DefaultUserEntity => DefaultName }.getOrElse("") def clientId: String = clientIdEntity.map(_.name).getOrElse("") @@ -260,7 +260,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, */ def unrecordQuotaSensor(request: RequestChannel.Request, value: Double, timeMs: Long): Unit = { val clientSensors = getOrCreateQuotaSensors(request.session, request.header.clientId) - clientSensors.quotaSensor.record(value * (-1), timeMs, false) + clientSensors.quotaSensor.record(value * -1, timeMs, false) } /** @@ -403,12 +403,15 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * Overrides quotas for , or or the dynamic defaults * for any of these levels. * - * @param sanitizedUser user to override if quota applies to or - * @param clientId client to override if quota applies to or - * @param sanitizedClientId sanitized client ID to override if quota applies to or - * @param quota custom quota to apply or None if quota override is being removed + * @param userEntity user to override if quota applies to or + * @param clientEntity sanitized client entity to override if quota applies to or + * @param quota custom quota to apply or None if quota override is being removed */ - def updateQuota(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]): Unit = { + def updateQuota( + userEntity: Option[BaseUserEntity], + clientEntity: Option[ClientQuotaEntity.ConfigEntity], + quota: Option[Quota] + ): Unit = { /* * Acquire the write lock to apply changes in the quota objects. * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists). @@ -418,29 +421,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, */ lock.writeLock().lock() try { - val userEntity = sanitizedUser.map { - case ZooKeeperInternals.DEFAULT_STRING => DefaultUserEntity - case user => UserEntity(user) - } - val clientIdEntity = sanitizedClientId.map { - case ZooKeeperInternals.DEFAULT_STRING => DefaultClientIdEntity - case _ => ClientIdEntity(clientId.getOrElse(throw new IllegalStateException("Client-id not provided"))) - } - val quotaEntity = KafkaQuotaEntity(userEntity, clientIdEntity) + val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity) if (userEntity.nonEmpty) { if (quotaEntity.clientIdEntity.nonEmpty) quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled else quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled - } else if (clientIdEntity.nonEmpty) + } else if (clientEntity.nonEmpty) quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled quota match { case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity) } - val updatedEntity = if (userEntity.contains(DefaultUserEntity) || clientIdEntity.contains(DefaultClientIdEntity)) + val updatedEntity = if (userEntity.contains(DefaultUserEntity) || clientEntity.contains(DefaultClientIdEntity)) None // more than one entity may need updating, so `updateQuotaMetricConfigs` will go through all metrics else Some(quotaEntity) @@ -452,9 +447,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } /** - * Updates metrics configs. This is invoked when quota configs are updated in ZooKeeper - * or when partitions leaders change and custom callbacks that implement partition-based quotas - * have updated quotas. + * Updates metrics configs. This is invoked when quota configs are updated when partitions leaders change + * and custom callbacks that implement partition-based quotas have updated quotas. * * @param updatedQuotaEntity If set to one entity and quotas have only been enabled at one * level, then an optimized update is performed with a single metric update. If None is provided, diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index aba4f9f77672d..0892afcfc6f0e 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -21,7 +21,7 @@ import java.util.{Collections, Properties} import kafka.log.UnifiedLog import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.Logging -import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals} +import org.apache.kafka.server.config.QuotaConfig import org.apache.kafka.common.metrics.Quota._ import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.server.ClientMetricsManager @@ -146,7 +146,7 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging { def processConfigChanges(brokerId: String, properties: Properties): Unit = { - if (brokerId == ZooKeeperInternals.DEFAULT_STRING) + if (brokerId.isEmpty) brokerConfig.dynamicConfig.updateDefaultConfig(properties) else if (brokerConfig.brokerId == brokerId.trim.toInt) { brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 5887bd98c1bc7..aed5c57cbeb1d 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -46,12 +46,12 @@ import scala.collection._ import scala.jdk.CollectionConverters._ /** - * Dynamic broker configurations are stored in ZooKeeper and may be defined at two levels: + * Dynamic broker configurations may be defined at two levels: *
    - *
  • Per-broker configs persisted at /configs/brokers/{brokerId}: These can be described/altered - * using AdminClient using the resource name brokerId.
  • - *
  • Cluster-wide defaults persisted at /configs/brokers/<default>: These can be described/altered - * using AdminClient using an empty resource name.
  • + *
  • Per-broker configurations are persisted at the controller and can be described + * or altered using AdminClient with the resource name brokerId.
  • + *
  • Cluster-wide default configurations are persisted at the cluster level and can be + * described or altered using AdminClient with an empty resource name.
  • *
* The order of precedence for broker configs is: *
    @@ -367,7 +367,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging error(s"$errorMessage: $invalidPropNames") } } - removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs configured in ZooKeeper will be ignored") + removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs will be ignored") removeInvalidProps(securityConfigsWithoutListenerPrefix(props), "Security configs can be dynamically updated only using listener prefix, base configs will be ignored") if (!perBrokerConfig) diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala index 988c567f1cfe7..7a401ec1eb426 100644 --- a/core/src/main/scala/kafka/server/DynamicConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicConfig.scala @@ -19,11 +19,10 @@ package kafka.server import kafka.server.DynamicBrokerConfig.AllDynamicConfigs -import java.net.{InetAddress, UnknownHostException} import java.util.Properties import org.apache.kafka.common.config.ConfigDef import org.apache.kafka.coordinator.group.GroupConfig -import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals} +import org.apache.kafka.server.config.QuotaConfig import java.util import scala.jdk.CollectionConverters._ @@ -83,17 +82,6 @@ object DynamicConfig { def names: util.Set[String] = ipConfigs.names def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false) - - def isValidIpEntity(ip: String): Boolean = { - if (ip != ZooKeeperInternals.DEFAULT_STRING) { - try { - InetAddress.getByName(ip) - } catch { - case _: UnknownHostException => return false - } - } - true - } } object ClientMetrics { diff --git a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala index a6d3133f18697..8fae9941b4112 100644 --- a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala +++ b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala @@ -18,15 +18,19 @@ package kafka.server.metadata import kafka.network.ConnectionQuotas +import kafka.server.ClientQuotaManager +import kafka.server.ClientQuotaManager.BaseUserEntity import kafka.server.QuotaFactory.QuotaManagers +import kafka.server.metadata.ClientQuotaMetadataManager.transferToClientQuotaEntity import kafka.utils.Logging import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.quota.ClientQuotaEntity import org.apache.kafka.common.utils.Sanitizer +import org.apache.kafka.server.quota.ClientQuotaEntity.{ConfigEntity => ClientQuotaConfigEntity} import java.net.{InetAddress, UnknownHostException} import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta} -import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals} +import org.apache.kafka.server.config.QuotaConfig import scala.jdk.OptionConverters.RichOptionalDouble @@ -145,27 +149,42 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag } // Convert entity into Options with sanitized values for QuotaManagers - val (sanitizedUser, sanitizedClientId) = quotaEntity match { - case UserEntity(user) => (Some(Sanitizer.sanitize(user)), None) - case DefaultUserEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), None) - case ClientIdEntity(clientId) => (None, Some(Sanitizer.sanitize(clientId))) - case DefaultClientIdEntity => (None, Some(ZooKeeperInternals.DEFAULT_STRING)) - case ExplicitUserExplicitClientIdEntity(user, clientId) => (Some(Sanitizer.sanitize(user)), Some(Sanitizer.sanitize(clientId))) - case ExplicitUserDefaultClientIdEntity(user) => (Some(Sanitizer.sanitize(user)), Some(ZooKeeperInternals.DEFAULT_STRING)) - case DefaultUserExplicitClientIdEntity(clientId) => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(Sanitizer.sanitize(clientId))) - case DefaultUserDefaultClientIdEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING)) - case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here") - } + val (userEntity, clientEntity) = transferToClientQuotaEntity(quotaEntity) val quotaValue = newValue.map(new Quota(_, true)) try { manager.updateQuota( - sanitizedUser = sanitizedUser, - clientId = sanitizedClientId.map(Sanitizer.desanitize), - sanitizedClientId = sanitizedClientId, - quota = quotaValue) + userEntity = userEntity, + clientEntity = clientEntity, + quota = quotaValue + ) } catch { case t: Throwable => error(s"Failed to update user-client quota $quotaEntity", t) } } } + +object ClientQuotaMetadataManager { + + def transferToClientQuotaEntity(quotaEntity: QuotaEntity): (Option[BaseUserEntity], Option[ClientQuotaConfigEntity]) = { + quotaEntity match { + case UserEntity(user) => + (Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), None) + case DefaultUserEntity => + (Some(ClientQuotaManager.DefaultUserEntity), None) + case ClientIdEntity(clientId) => + (None, Some(ClientQuotaManager.ClientIdEntity(clientId))) + case DefaultClientIdEntity => + (None, Some(ClientQuotaManager.DefaultClientIdEntity)) + case ExplicitUserExplicitClientIdEntity(user, clientId) => + (Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.ClientIdEntity(clientId))) + case ExplicitUserDefaultClientIdEntity(user) => + (Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.DefaultClientIdEntity)) + case DefaultUserExplicitClientIdEntity(clientId) => + (Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.ClientIdEntity(clientId))) + case DefaultUserDefaultClientIdEntity => + (Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.DefaultClientIdEntity)) + case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here") + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala index 763aa387962ee..6904921fb0cfa 100644 --- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala @@ -24,7 +24,7 @@ import kafka.utils.Logging import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, GROUP, TOPIC} import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.{MetadataDelta, MetadataImage} -import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals} +import org.apache.kafka.server.config.ConfigType import org.apache.kafka.server.fault.FaultHandler @@ -78,7 +78,7 @@ class DynamicConfigPublisher( // These are stored in KRaft with an empty name field. info("Updating cluster configuration : " + toLoggableProps(resource, props).mkString(",")) - nodeConfigHandler.processConfigChanges(ZooKeeperInternals.DEFAULT_STRING, props) + nodeConfigHandler.processConfigChanges(resource.name(), props) } catch { case t: Throwable => faultHandler.handleFault("Error updating " + s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 49a0ebc21f4bf..f4de50d7cd8a3 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -934,8 +934,11 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup // non-default value to trigger a new metric val clientId = "test-client-1" servers.foreach { server => - server.quotaManagers.produce.updateQuota(None, Some(clientId), Some(clientId), - Some(Quota.upperBound(10000000))) + server.quotaManagers.produce.updateQuota( + None, + Some(ClientQuotaManager.ClientIdEntity(clientId)), + Some(Quota.upperBound(10000000)) + ) } val (producerThread, consumerThread) = startProduceConsume(retries = 0, groupProtocol, clientId) TestUtils.waitUntilTrue(() => consumerThread.received >= 5, "Messages not sent") diff --git a/core/src/test/scala/kafka/server/metadata/ClientQuotaMetadataManagerTest.scala b/core/src/test/scala/kafka/server/metadata/ClientQuotaMetadataManagerTest.scala index 03d8d30cea056..9d36cae25c239 100644 --- a/core/src/test/scala/kafka/server/metadata/ClientQuotaMetadataManagerTest.scala +++ b/core/src/test/scala/kafka/server/metadata/ClientQuotaMetadataManagerTest.scala @@ -16,8 +16,9 @@ */ package kafka.server.metadata +import kafka.server.ClientQuotaManager import org.apache.kafka.image.ClientQuotaDelta -import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows} +import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertThrows} import org.junit.jupiter.api.Test import org.junit.jupiter.api.function.Executable @@ -33,4 +34,43 @@ class ClientQuotaMetadataManagerTest { assertDoesNotThrow { new Executable { def execute(): Unit = manager.handleIpQuota(IpEntity("192.168.1.1"), new ClientQuotaDelta(null)) } } assertDoesNotThrow { new Executable { def execute(): Unit = manager.handleIpQuota(IpEntity("2001:db8::1"), new ClientQuotaDelta(null)) } } } + + @Test + def testTransferToClientQuotaEntity(): Unit = { + + assertThrows(classOf[IllegalStateException],() => ClientQuotaMetadataManager.transferToClientQuotaEntity(IpEntity("a"))) + assertThrows(classOf[IllegalStateException],() => ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultIpEntity)) + assertEquals( + (Some(ClientQuotaManager.UserEntity("user")), None), + ClientQuotaMetadataManager.transferToClientQuotaEntity(UserEntity("user")) + ) + assertEquals( + (Some(ClientQuotaManager.DefaultUserEntity), None), + ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserEntity) + ) + assertEquals( + (None, Some(ClientQuotaManager.ClientIdEntity("client"))), + ClientQuotaMetadataManager.transferToClientQuotaEntity(ClientIdEntity("client")) + ) + assertEquals( + (None, Some(ClientQuotaManager.DefaultClientIdEntity)), + ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultClientIdEntity) + ) + assertEquals( + (Some(ClientQuotaManager.UserEntity("user")), Some(ClientQuotaManager.ClientIdEntity("client"))), + ClientQuotaMetadataManager.transferToClientQuotaEntity(ExplicitUserExplicitClientIdEntity("user", "client")) + ) + assertEquals( + (Some(ClientQuotaManager.UserEntity("user")), Some(ClientQuotaManager.DefaultClientIdEntity)), + ClientQuotaMetadataManager.transferToClientQuotaEntity(ExplicitUserDefaultClientIdEntity("user")) + ) + assertEquals( + (Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.ClientIdEntity("client"))), + ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserExplicitClientIdEntity("client")) + ) + assertEquals( + (Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.DefaultClientIdEntity)), + ClientQuotaMetadataManager.transferToClientQuotaEntity(DefaultUserDefaultClientIdEntity) + ) + } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 4a4107226dcfb..6c268d3c3fbd9 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -16,11 +16,12 @@ */ package kafka.server +import kafka.server.ClientQuotaManager.BaseUserEntity + import java.net.InetAddress import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.utils.Sanitizer -import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ZooKeeperInternals} +import org.apache.kafka.server.config.ClientQuotaManagerConfig import org.apache.kafka.network.Session import org.apache.kafka.server.quota.QuotaType import org.junit.jupiter.api.Assertions._ @@ -34,8 +35,16 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { try { // Case 1: Update the quota. Assert that the new quota value is returned - clientQuotaManager.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(2000, true))) - clientQuotaManager.updateQuota(client2.configUser, client2.configClientId, client2.sanitizedConfigClientId, Some(new Quota(4000, true))) + clientQuotaManager.updateQuota( + client1.configUser, + client1.configClientEntity, + Some(new Quota(2000, true)) + ) + clientQuotaManager.updateQuota( + client2.configUser, + client2.configClientEntity, + Some(new Quota(4000, true)) + ) assertEquals(Long.MaxValue.toDouble, clientQuotaManager.quota(randomClient.user, randomClient.clientId).bound, 0.0, "Default producer quota should be " + Long.MaxValue.toDouble) @@ -50,22 +59,38 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created. // p1 should not longer be throttled after the quota change - clientQuotaManager.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(3000, true))) + clientQuotaManager.updateQuota( + client1.configUser, + client1.configClientEntity, + Some(new Quota(3000, true)) + ) assertEquals(3000, clientQuotaManager.quota(client1.user, client1.clientId).bound, 0.0, "Should return the newly overridden value (3000)") throttleTimeMs = maybeRecord(clientQuotaManager, client1.user, client1.clientId, 0) assertEquals(0, throttleTimeMs, s"throttleTimeMs should be 0. was $throttleTimeMs") // Case 3: Change quota back to default. Should be throttled again - clientQuotaManager.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(500, true))) + clientQuotaManager.updateQuota( + client1.configUser, + client1.configClientEntity, + Some(new Quota(500, true)) + ) assertEquals(500, clientQuotaManager.quota(client1.user, client1.clientId).bound, 0.0, "Should return the default value (500)") throttleTimeMs = maybeRecord(clientQuotaManager, client1.user, client1.clientId, 0) assertTrue(throttleTimeMs > 0, s"throttleTimeMs should be > 0. was $throttleTimeMs") // Case 4: Set high default quota, remove p1 quota. p1 should no longer be throttled - clientQuotaManager.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, None) - clientQuotaManager.updateQuota(defaultConfigClient.configUser, defaultConfigClient.configClientId, defaultConfigClient.sanitizedConfigClientId, Some(new Quota(4000, true))) + clientQuotaManager.updateQuota( + client1.configUser, + client1.configClientEntity, + None + ) + clientQuotaManager.updateQuota( + defaultConfigClient.configUser, + defaultConfigClient.configClientEntity, + Some(new Quota(4000, true)) + ) assertEquals(4000, clientQuotaManager.quota(client1.user, client1.clientId).bound, 0.0, "Should return the newly overridden value (4000)") throttleTimeMs = maybeRecord(clientQuotaManager, client1.user, client1.clientId, 1000 * config.numQuotaSamples) @@ -76,68 +101,15 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { } } - /** - * Tests parsing for quotas. - * Quota overrides persisted in ZooKeeper in /config/clients/, default persisted in /config/clients/ - */ - @Test - def testClientIdQuotaParsing(): Unit = { - val client1 = UserClient("ANONYMOUS", "p1", None, Some("p1")) - val client2 = UserClient("ANONYMOUS", "p2", None, Some("p2")) - val randomClient = UserClient("ANONYMOUS", "random-client-id", None, None) - val defaultConfigClient = UserClient("", "", None, Some(ZooKeeperInternals.DEFAULT_STRING)) - testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient) - } - - /** - * Tests parsing for quotas. - * Quota overrides persisted in ZooKeeper in /config/users/, default persisted in /config/users/ - */ - @Test - def testUserQuotaParsing(): Unit = { - val client1 = UserClient("User1", "p1", Some("User1"), None) - val client2 = UserClient("User2", "p2", Some("User2"), None) - val randomClient = UserClient("RandomUser", "random-client-id", None, None) - val defaultConfigClient = UserClient("", "", Some(ZooKeeperInternals.DEFAULT_STRING), None) - val config = new ClientQuotaManagerConfig() - testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient) - } - - /** - * Tests parsing for quotas. - * Quotas persisted in ZooKeeper in /config/users//clients/, default in /config/users//clients/ - */ - @Test - def testUserClientIdQuotaParsing(): Unit = { - val client1 = UserClient("User1", "p1", Some("User1"), Some("p1")) - val client2 = UserClient("User2", "p2", Some("User2"), Some("p2")) - val randomClient = UserClient("RandomUser", "random-client-id", None, None) - val defaultConfigClient = UserClient("", "", Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING)) - val config = new ClientQuotaManagerConfig() - testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient) - } - /** * Tests parsing for quotas when client-id default quota properties are set. */ @Test def testUserQuotaParsingWithDefaultClientIdQuota(): Unit = { - val client1 = UserClient("User1", "p1", Some("User1"), None) - val client2 = UserClient("User2", "p2", Some("User2"), None) - val randomClient = UserClient("RandomUser", "random-client-id", None, None) - val defaultConfigClient = UserClient("", "", Some(ZooKeeperInternals.DEFAULT_STRING), None) - testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient) - } - - /** - * Tests parsing for quotas when client-id default quota properties are set. - */ - @Test - def testUserClientQuotaParsingIdWithDefaultClientIdQuota(): Unit = { - val client1 = UserClient("User1", "p1", Some("User1"), Some("p1")) - val client2 = UserClient("User2", "p2", Some("User2"), Some("p2")) + val client1 = UserClient("User1", "p1", Some(ClientQuotaManager.UserEntity("User1")), None) + val client2 = UserClient("User2", "p2", Some(ClientQuotaManager.UserEntity("User2")), None) val randomClient = UserClient("RandomUser", "random-client-id", None, None) - val defaultConfigClient = UserClient("", "", Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING)) + val defaultConfigClient = UserClient("", "", Some(ClientQuotaManager.DefaultUserEntity), None) testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient) } @@ -168,7 +140,11 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { assertEquals(Double.MaxValue, clientQuotaManager.getMaxValueInQuotaWindow(userSession, "client1"), 0.01) // Set default quota config - clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), None, None, Some(new Quota(10, true))) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.DefaultUserEntity), + None, + Some(new Quota(10, true)) + ) assertEquals(10 * numFullQuotaWindows, clientQuotaManager.getMaxValueInQuotaWindow(userSession, "client1"), 0.01) } finally { clientQuotaManager.shutdown() @@ -186,11 +162,19 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { checkQuota(clientQuotaManager, "userA", "client1", Long.MaxValue, 1000, expectThrottle = false) // Set default quota config - clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), None, None, Some(new Quota(10, true))) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.DefaultUserEntity), + None, + Some(new Quota(10, true)) + ) checkQuota(clientQuotaManager, "userA", "client1", 10, 1000, expectThrottle = true) // Remove default quota config, back to no quotas - clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), None, None, None) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.DefaultUserEntity), + None, + None + ) checkQuota(clientQuotaManager, "userA", "client1", Long.MaxValue, 1000, expectThrottle = false) } finally { clientQuotaManager.shutdown() @@ -205,11 +189,19 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { try { // Set quota config - clientQuotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(10, true))) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + None, + Some(new Quota(10, true)) + ) checkQuota(clientQuotaManager, "userA", "client1", 10, 1000, expectThrottle = true) // Remove quota config, back to no quotas - clientQuotaManager.updateQuota(Some("userA"), None, None, None) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + None, + None + ) checkQuota(clientQuotaManager, "userA", "client1", Long.MaxValue, 1000, expectThrottle = false) } finally { clientQuotaManager.shutdown() @@ -224,11 +216,19 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { try { // Set quota config - clientQuotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(10, true))) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + Some(ClientQuotaManager.ClientIdEntity("client1")), + Some(new Quota(10, true)) + ) checkQuota(clientQuotaManager, "userA", "client1", 10, 1000, expectThrottle = true) // Remove quota config, back to no quotas - clientQuotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), None) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + Some(ClientQuotaManager.ClientIdEntity("client1")), + None + ) checkQuota(clientQuotaManager, "userA", "client1", Long.MaxValue, 1000, expectThrottle = false) } finally { clientQuotaManager.shutdown() @@ -241,16 +241,56 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { metrics, QuotaType.PRODUCE, time, "") try { - clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), None, None, Some(new Quota(1000, true))) - clientQuotaManager.updateQuota(None, Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(new Quota(2000, true))) - clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(new Quota(3000, true))) - clientQuotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(4000, true))) - clientQuotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(5000, true))) - clientQuotaManager.updateQuota(Some("userB"), None, None, Some(new Quota(6000, true))) - clientQuotaManager.updateQuota(Some("userB"), Some("client1"), Some("client1"), Some(new Quota(7000, true))) - clientQuotaManager.updateQuota(Some("userB"), Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(new Quota(8000, true))) - clientQuotaManager.updateQuota(Some("userC"), None, None, Some(new Quota(10000, true))) - clientQuotaManager.updateQuota(None, Some("client1"), Some("client1"), Some(new Quota(9000, true))) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.DefaultUserEntity), + None, + Some(new Quota(1000, true)) + ) + clientQuotaManager.updateQuota( + None, + Some(ClientQuotaManager.DefaultClientIdEntity), + Some(new Quota(2000, true)) + ) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.DefaultUserEntity), + Some(ClientQuotaManager.DefaultClientIdEntity), + Some(new Quota(3000, true)) + ) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + None, + Some(new Quota(4000, true)) + ) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + Some(ClientQuotaManager.ClientIdEntity("client1")), + Some(new Quota(5000, true)) + ) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userB")), + None, + Some(new Quota(6000, true)) + ) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userB")), + Some(ClientQuotaManager.ClientIdEntity("client1")), + Some(new Quota(7000, true)) + ) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userB")), + Some(ClientQuotaManager.DefaultClientIdEntity), + Some(new Quota(8000, true)) + ) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userC")), + None, + Some(new Quota(10000, true)) + ) + clientQuotaManager.updateQuota( + None, + Some(ClientQuotaManager.ClientIdEntity("client1")), + Some(new Quota(9000, true)) + ) checkQuota(clientQuotaManager, "userA", "client1", 5000, 4500, expectThrottle = false) // quota takes precedence over checkQuota(clientQuotaManager, "userA", "client2", 4000, 4500, expectThrottle = true) // quota takes precedence over and defaults @@ -266,32 +306,64 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { checkQuota(clientQuotaManager, "userE", "client1", 3000, 2500, expectThrottle = false) // Remove default quota config, revert to default - clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), None) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.DefaultUserEntity), + Some(ClientQuotaManager.DefaultClientIdEntity), + None + ) checkQuota(clientQuotaManager, "userD", "client1", 1000, 0, expectThrottle = false) // Metrics tags changed, restart counter checkQuota(clientQuotaManager, "userE", "client4", 1000, 1500, expectThrottle = true) checkQuota(clientQuotaManager, "userF", "client4", 1000, 800, expectThrottle = false) // Default quota shared across clients of user checkQuota(clientQuotaManager, "userF", "client5", 1000, 800, expectThrottle = true) // Remove default quota config, revert to default - clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), None, None, None) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.DefaultUserEntity), + None, + None + ) checkQuota(clientQuotaManager, "userF", "client4", 2000, 0, expectThrottle = false) // Default quota shared across client-id of all users checkQuota(clientQuotaManager, "userF", "client5", 2000, 0, expectThrottle = false) checkQuota(clientQuotaManager, "userF", "client5", 2000, 2500, expectThrottle = true) checkQuota(clientQuotaManager, "userG", "client5", 2000, 0, expectThrottle = true) // Update quotas - clientQuotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(8000, true))) - clientQuotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(10000, true))) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + None, + Some(new Quota(8000, true)) + ) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + Some(ClientQuotaManager.ClientIdEntity("client1")), + Some(new Quota(10000, true)) + ) checkQuota(clientQuotaManager, "userA", "client2", 8000, 0, expectThrottle = false) checkQuota(clientQuotaManager, "userA", "client2", 8000, 4500, expectThrottle = true) // Throttled due to sum of new and earlier values checkQuota(clientQuotaManager, "userA", "client1", 10000, 0, expectThrottle = false) checkQuota(clientQuotaManager, "userA", "client1", 10000, 6000, expectThrottle = true) - clientQuotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), None) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + Some(ClientQuotaManager.ClientIdEntity("client1")), + None + ) checkQuota(clientQuotaManager, "userA", "client6", 8000, 0, expectThrottle = true) // Throttled due to shared user quota - clientQuotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), Some(new Quota(11000, true))) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + Some(ClientQuotaManager.ClientIdEntity("client6")), + Some(new Quota(11000, true)) + ) checkQuota(clientQuotaManager, "userA", "client6", 11000, 8500, expectThrottle = false) - clientQuotaManager.updateQuota(Some("userA"), Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(new Quota(12000, true))) - clientQuotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), None) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + Some(ClientQuotaManager.DefaultClientIdEntity), + Some(new Quota(12000, true)) + ) + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + Some(ClientQuotaManager.ClientIdEntity("client6")), + None + ) checkQuota(clientQuotaManager, "userA", "client6", 12000, 4000, expectThrottle = true) // Throttled due to sum of new and earlier values } finally { @@ -304,10 +376,13 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.PRODUCE, time, "") val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", "")) try { - clientQuotaManager.updateQuota(None, Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), - Some(new Quota(500, true))) + clientQuotaManager.updateQuota( + None, + Some(ClientQuotaManager.DefaultClientIdEntity), + Some(new Quota(500, true)) + ) - // We have 10 second windows. Make sure that there is no quota violation + // We have 10 seconds windows. Make sure that there is no quota violation // if we produce under the quota for (_ <- 0 until 10) { assertEquals(0, maybeRecord(clientQuotaManager, "ANONYMOUS", "unknown", 400)) @@ -352,8 +427,11 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { def testExpireThrottleTimeSensor(): Unit = { val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.PRODUCE, time, "") try { - clientQuotaManager.updateQuota(None, Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), - Some(new Quota(500, true))) + clientQuotaManager.updateQuota( + None, + Some(ClientQuotaManager.DefaultClientIdEntity), + Some(new Quota(500, true)) + ) maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100) // remove the throttle time sensor @@ -374,8 +452,11 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { def testExpireQuotaSensors(): Unit = { val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.PRODUCE, time, "") try { - clientQuotaManager.updateQuota(None, Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), - Some(new Quota(500, true))) + clientQuotaManager.updateQuota( + None, + Some(ClientQuotaManager.DefaultClientIdEntity), + Some(new Quota(500, true)) + ) maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100) // remove all the sensors @@ -401,8 +482,11 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.PRODUCE, time, "") val clientId = "client@#$%" try { - clientQuotaManager.updateQuota(None, Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), - Some(new Quota(500, true))) + clientQuotaManager.updateQuota( + None, + Some(ClientQuotaManager.DefaultClientIdEntity), + Some(new Quota(500, true)) + ) maybeRecord(clientQuotaManager, "ANONYMOUS", clientId, 100) @@ -417,10 +501,10 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { } } - private case class UserClient(user: String, clientId: String, configUser: Option[String] = None, configClientId: Option[String] = None) { - // The class under test expects only sanitized client configs. We pass both the default value (which should not be - // sanitized to ensure it remains unique) and non-default values, so we need to take care in generating the sanitized - // client ID - def sanitizedConfigClientId = configClientId.map(x => if (x == ZooKeeperInternals.DEFAULT_STRING) ZooKeeperInternals.DEFAULT_STRING else Sanitizer.sanitize(x)) - } + private case class UserClient( + user: String, + clientId: String, + configUser: Option[BaseUserEntity] = None, + configClientEntity: Option[ClientQuotaManager.ClientIdEntity] = None + ) } diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala index cec9289e69140..8c30f749427fc 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent} import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse} import org.apache.kafka.common.test.ClusterInstance -import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals} +import org.apache.kafka.server.config.QuotaConfig import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Disabled @@ -517,20 +517,6 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) { )) } - @ClusterTest - def testClientQuotasWithDefaultName(): Unit = { - // An entity using the name associated with the default entity name. The entity's name should be sanitized so - // that it does not conflict with the default entity name. - val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> ZooKeeperInternals.DEFAULT_STRING).asJava) - alterEntityQuotas(entity, Map(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0)), validateOnly = false) - verifyDescribeEntityQuotas(entity, Map(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0)) - - // This should not match. - val result = describeClientQuotas( - ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.CLIENT_ID)).asJava)) - assert(result.isEmpty) - } - private def verifyDescribeEntityQuotas(entity: ClientQuotaEntity, quotas: Map[String, Double]): Unit = { TestUtils.tryUntilNoAssertionError(waitTime = 5000L) { val components = entity.entries.asScala.map { case (entityType, entityName) => diff --git a/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala index 2b5471653d9a9..368280d235453 100644 --- a/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala @@ -30,11 +30,15 @@ class ClientRequestQuotaManagerTest extends BaseClientQuotaManagerTest { @Test def testRequestPercentageQuotaViolation(): Unit = { val clientRequestQuotaManager = new ClientRequestQuotaManager(config, metrics, time, "", Optional.empty()) - clientRequestQuotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some("test-client"), Some(Quota.upperBound(1))) + clientRequestQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("ANONYMOUS")), + Some(ClientQuotaManager.ClientIdEntity("test-client")), + Some(Quota.upperBound(1)) + ) val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", QuotaType.REQUEST.toString, "")) def millisToPercent(millis: Double) = millis * 1000 * 1000 * ClientRequestQuotaManager.NANOS_TO_PERCENTAGE_PER_SECOND try { - // We have 10 second windows. Make sure that there is no quota violation + // We have 10 seconds windows. Make sure that there is no quota violation // if we are under the quota for (_ <- 0 until 10) { assertEquals(0, maybeRecord(clientRequestQuotaManager, "ANONYMOUS", "test-client", millisToPercent(4))) diff --git a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala index 8fa4a290cc7a8..2a7cfe35a85b0 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaManagerTest.scala @@ -145,8 +145,11 @@ class ControllerMutationQuotaManagerTest extends BaseClientQuotaManagerTest { @Test def testControllerMutationQuotaViolation(): Unit = { withQuotaManager { quotaManager => - quotaManager.updateQuota(Some(User), Some(ClientId), Some(ClientId), - Some(Quota.upperBound(10))) + quotaManager.updateQuota( + Some(User).map(s => ClientQuotaManager.UserEntity(s)), + Some(ClientQuotaManager.ClientIdEntity(ClientId)), + Some(Quota.upperBound(10)) + ) val queueSizeMetric = metrics.metrics().get( metrics.metricName("queue-size", QuotaType.CONTROLLER_MUTATION.toString, "")) @@ -204,8 +207,11 @@ class ControllerMutationQuotaManagerTest extends BaseClientQuotaManagerTest { @Test def testNewStrictQuotaForReturnsStrictQuotaWhenQuotaIsEnabled(): Unit = { withQuotaManager { quotaManager => - quotaManager.updateQuota(Some(User), Some(ClientId), Some(ClientId), - Some(Quota.upperBound(10))) + quotaManager.updateQuota( + Some(User).map(s => ClientQuotaManager.UserEntity(s)), + Some(ClientQuotaManager.ClientIdEntity(ClientId)), + Some(Quota.upperBound(10)) + ) val quota = quotaManager.newStrictQuotaFor(buildSession(User), ClientId) assertTrue(quota.isInstanceOf[StrictControllerMutationQuota]) @@ -223,8 +229,11 @@ class ControllerMutationQuotaManagerTest extends BaseClientQuotaManagerTest { @Test def testNewPermissiveQuotaForReturnsStrictQuotaWhenQuotaIsEnabled(): Unit = { withQuotaManager { quotaManager => - quotaManager.updateQuota(Some(User), Some(ClientId), Some(ClientId), - Some(Quota.upperBound(10))) + quotaManager.updateQuota( + Some(User).map(s => ClientQuotaManager.UserEntity(s)), + Some(ClientQuotaManager.ClientIdEntity(ClientId)), + Some(Quota.upperBound(10)) + ) val quota = quotaManager.newPermissiveQuotaFor(buildSession(User), ClientId) assertTrue(quota.isInstanceOf[PermissiveControllerMutationQuota]) } diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ZooKeeperInternals.java b/server-common/src/main/java/org/apache/kafka/server/config/ZooKeeperInternals.java deleted file mode 100644 index 72ac563ae52f8..0000000000000 --- a/server-common/src/main/java/org/apache/kafka/server/config/ZooKeeperInternals.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.server.config; - -public class ZooKeeperInternals { - /** - * This string is used in ZooKeeper in several places to indicate a default entity type. - * For example, default user quotas are stored under /config/users/<default> - * Note that AdminClient does not use this to indicate a default, nor do records in KRaft mode. - * This constant will go away in Apache Kafka 4.0 with the end of ZK mode. - */ - public static final String DEFAULT_STRING = ""; -}