Skip to content

Commit

Permalink
fix some error test
Browse files Browse the repository at this point in the history
  • Loading branch information
m1a2st committed Jan 30, 2025
1 parent 99482b3 commit cf4b307
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 6 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object QuotaTypes {
object ClientQuotaManager {
// Purge sensors after 1 hour of inactivity
val InactiveSensorExpirationTimeSeconds = 3600
private val DefaultString = "<default>"
val DefaultString = "<default>"
val DefaultClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(None, Some(DefaultClientIdEntity))
val DefaultUserQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None)
val DefaultUserClientIdQuotaEntity: KafkaQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity))
Expand Down Expand Up @@ -423,7 +423,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
*/
lock.writeLock().lock()
try {
val quotaEntity = KafkaQuotaEntity(userEntity, clientIdEntity)
val quotaEntity = KafkaQuotaEntity(userEntity, sanitizedClientId.orElse(clientIdEntity))

if (userEntity.nonEmpty) {
if (quotaEntity.clientIdEntity.nonEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
}

// Convert entity into Options with sanitized values for QuotaManagers
val (sanitizedUser, sanitizedClientId) = quotaEntity match {
val (userEntity, sanitizedClientId) = quotaEntity match {
case UserEntity(user) =>
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), None)
case DefaultUserEntity =>
Expand All @@ -169,8 +169,11 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
val quotaValue = newValue.map(new Quota(_, true))
try {
manager.updateQuota(
userEntity = sanitizedUser,
clientIdEntity = sanitizedClientId.map(id => Sanitizer.desanitize(id.name())).map(ClientQuotaManager.ClientIdEntity),
userEntity = userEntity,
clientIdEntity = sanitizedClientId
.map(id => Sanitizer.desanitize(id.name()))
.map(ClientQuotaManager.ClientIdEntity)
.orElse(throw new IllegalStateException("Client-id not provided")),
sanitizedClientId = sanitizedClientId,
quota = quotaValue)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
def testClientQuotasWithDefaultName(): Unit = {
// An entity using the name associated with the default entity name. The entity's name should be sanitized so
// that it does not conflict with the default entity name.
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> "").asJava)
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> ClientQuotaManager.DefaultString).asJava)
alterEntityQuotas(entity, Map(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0)), validateOnly = false)
verifyDescribeEntityQuotas(entity, Map(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0))

Expand Down

0 comments on commit cf4b307

Please sign in to comment.