-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18530 Remove ZooKeeperInternals #18641
Changes from 20 commits
b7bbdff
a022b0c
6cef55f
72fc80b
8065ee7
ff19654
821acd0
2d16565
fe8b6ca
b81ae52
4ca7f31
a6a52e0
44274d4
e9b1e5b
1927166
b52aaed
b4b727d
a201ae9
dd871e2
13d9b7f
ed6353b
d098405
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,15 +18,19 @@ | |
package kafka.server.metadata | ||
|
||
import kafka.network.ConnectionQuotas | ||
import kafka.server.ClientQuotaManager | ||
import kafka.server.ClientQuotaManager.BaseUserEntity | ||
import kafka.server.QuotaFactory.QuotaManagers | ||
import kafka.server.metadata.ClientQuotaMetadataManager.transferToClientQuotaEntity | ||
import kafka.utils.Logging | ||
import org.apache.kafka.common.metrics.Quota | ||
import org.apache.kafka.common.quota.ClientQuotaEntity | ||
import org.apache.kafka.common.utils.Sanitizer | ||
import org.apache.kafka.server.quota.ClientQuotaEntity.{ConfigEntity => ClientQuotaConfigEntity} | ||
|
||
import java.net.{InetAddress, UnknownHostException} | ||
import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta} | ||
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals} | ||
import org.apache.kafka.server.config.QuotaConfig | ||
|
||
import scala.jdk.OptionConverters.RichOptionalDouble | ||
|
||
|
@@ -145,27 +149,42 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag | |
} | ||
|
||
// Convert entity into Options with sanitized values for QuotaManagers | ||
val (sanitizedUser, sanitizedClientId) = quotaEntity match { | ||
case UserEntity(user) => (Some(Sanitizer.sanitize(user)), None) | ||
case DefaultUserEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), None) | ||
case ClientIdEntity(clientId) => (None, Some(Sanitizer.sanitize(clientId))) | ||
case DefaultClientIdEntity => (None, Some(ZooKeeperInternals.DEFAULT_STRING)) | ||
case ExplicitUserExplicitClientIdEntity(user, clientId) => (Some(Sanitizer.sanitize(user)), Some(Sanitizer.sanitize(clientId))) | ||
case ExplicitUserDefaultClientIdEntity(user) => (Some(Sanitizer.sanitize(user)), Some(ZooKeeperInternals.DEFAULT_STRING)) | ||
case DefaultUserExplicitClientIdEntity(clientId) => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(Sanitizer.sanitize(clientId))) | ||
case DefaultUserDefaultClientIdEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING)) | ||
case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here") | ||
} | ||
val (userEntity, sanitizedClientEntity) = transferToClientQuotaEntity(quotaEntity) | ||
|
||
val quotaValue = newValue.map(new Quota(_, true)) | ||
try { | ||
manager.updateQuota( | ||
sanitizedUser = sanitizedUser, | ||
clientId = sanitizedClientId.map(Sanitizer.desanitize), | ||
sanitizedClientId = sanitizedClientId, | ||
quota = quotaValue) | ||
userEntity = userEntity, | ||
sanitizedClientEntity = sanitizedClientEntity, | ||
quota = quotaValue | ||
) | ||
} catch { | ||
case t: Throwable => error(s"Failed to update user-client quota $quotaEntity", t) | ||
} | ||
} | ||
} | ||
|
||
object ClientQuotaMetadataManager { | ||
|
||
def transferToClientQuotaEntity(quotaEntity: QuotaEntity): (Option[BaseUserEntity], Option[ClientQuotaConfigEntity]) = { | ||
quotaEntity match { | ||
case UserEntity(user) => | ||
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), None) | ||
case DefaultUserEntity => | ||
(Some(ClientQuotaManager.DefaultUserEntity), None) | ||
case ClientIdEntity(clientId) => | ||
(None, Some(ClientQuotaManager.ClientIdEntity(Sanitizer.sanitize(clientId)))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to sanitize data in kraft mode? the zk path is gone so we don't need to align the data for zk anymore. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After under discussion, We should remove this |
||
case DefaultClientIdEntity => | ||
(None, Some(ClientQuotaManager.DefaultClientIdEntity)) | ||
case ExplicitUserExplicitClientIdEntity(user, clientId) => | ||
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.ClientIdEntity(Sanitizer.sanitize(clientId)))) | ||
case ExplicitUserDefaultClientIdEntity(user) => | ||
(Some(ClientQuotaManager.UserEntity(Sanitizer.sanitize(user))), Some(ClientQuotaManager.DefaultClientIdEntity)) | ||
case DefaultUserExplicitClientIdEntity(clientId) => | ||
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.ClientIdEntity(Sanitizer.sanitize(clientId)))) | ||
case DefaultUserDefaultClientIdEntity => | ||
(Some(ClientQuotaManager.DefaultUserEntity), Some(ClientQuotaManager.DefaultClientIdEntity)) | ||
case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here") | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in kraft mode, the two levels are still existent. could you please revise it instead of removing it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Heya @m1a2st! I have a follow-up ask, while the two levels are still present they will no longer be under the path specified in this file since the path is ZK-specific. As long as we don't use this comment to generate documentation I am okay with this being removed in a subsequent PR. However, if we use this file to generate documentation could you change this as part of this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think those comments in DynamicBrokerConfig are not used to generate documentation, but it will be better to correct it - as least, the description about zk path must be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, then I will review once they have been changed 😊!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @clolov and @chia7712, update this document. If there is any idea, please let me know