KAFKA-18654[2/2]: Transaction V2 retry add partitions on the server side when handling produce request. #18810
base: trunk
Conversation
  requestLocal
)

val retryTimeoutMs = config.addPartitionsToTxnConfig.addPartitionsToTxnMaxTimeoutMs()
Do we want this value to be separate from the request timeout? And if so, should it be strictly smaller than that value?
I think using the request timeout is good enough.
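For context, the config documentation shown further down notes that a value larger than request.timeout.ms will not be effective. Below is a minimal sketch of that capping idea, using a hypothetical helper name that is not taken from this PR:

```java
// Sketch only: a hypothetical helper (not from this PR) showing how the
// server-side retry budget could be bounded by the produce request timeout.
public final class RetryWindowSketch {

    static long effectiveRetryTimeoutMs(long addPartitionsToTxnMaxTimeoutMs, long requestTimeoutMs) {
        // A configured value larger than the request timeout cannot take effect,
        // because the produce request itself would expire first.
        return Math.min(addPartitionsToTxnMaxTimeoutMs, requestTimeoutMs);
    }

    public static void main(String[] args) {
        System.out.println(effectiveRetryTimeoutMs(100, 30_000));    // 100: default vs. a typical 30s request timeout
        System.out.println(effectiveRetryTimeoutMs(60_000, 30_000)); // 30000: capped by the request timeout
    }
}
```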
...dinator/src/main/java/org/apache/kafka/coordinator/transaction/AddPartitionsToTxnConfig.java
public final class AddPartitionsToTxnConfig {
    // The default config values for the server-side add partition to transaction operations.
    public static final String ADD_PARTITIONS_TO_TXN_MAX_TIMEOUT_MS_CONFIG = "add.partitions.to.txn.max.timeout.ms";
    public static final int ADD_PARTITIONS_TO_TXN_MAX_TIMEOUT_MS_DEFAULT = 100;
Why did we choose this value?
I'm also wondering, if someone wants to turn this feature off, what should the value be? 0 I suppose?
> I'm also wondering, if someone wants to turn this feature off, what should the value be? 0 I suppose?
I guess we would not disable only this feature? It is a critical part of TV2, so we may need to disable TV2 if this feature does not work.
The timeout should be:
- small enough not to exceed the client request timeout (which is controlled by the client, so we cannot make assumptions about it on the broker),
- large enough to cover the typical time to commit a transaction,
- large enough not to add latency to the overall call by forcing the client to retry (the default client backoff is 100ms, so for an outlier case where a transaction couldn't complete within 100ms, it seems fine if the client does another 100ms backoff).

Setting it to 0 will effectively turn off the retries on the broker.
We currently don't allow 0, because this config uses atLeast(1), which is why I asked. Though 1 is probably effectively the same as 0.
Thanks for the comments, updated to atLeast(0)
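For readers not familiar with Kafka's config validation, here is a minimal sketch of what an atLeast(0) bound looks like with org.apache.kafka.common.config.ConfigDef. The constant names mirror the ones in this file; the define(...) call, its importance level, and the doc string are illustrative rather than copied from the PR:

```java
import org.apache.kafka.common.config.ConfigDef;

// Illustrative only: defines the max-timeout config with an atLeast(0) bound,
// so 0 can be used to effectively switch off server-side retries.
public final class AddPartitionsToTxnConfigSketch {
    public static final String ADD_PARTITIONS_TO_TXN_MAX_TIMEOUT_MS_CONFIG = "add.partitions.to.txn.max.timeout.ms";
    public static final int ADD_PARTITIONS_TO_TXN_MAX_TIMEOUT_MS_DEFAULT = 100;

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(ADD_PARTITIONS_TO_TXN_MAX_TIMEOUT_MS_CONFIG,
                ConfigDef.Type.INT,
                ADD_PARTITIONS_TO_TXN_MAX_TIMEOUT_MS_DEFAULT,
                ConfigDef.Range.atLeast(0),   // was atLeast(1); 0 now effectively disables retries
                ConfigDef.Importance.MEDIUM,  // importance chosen for illustration only
                "Hypothetical doc string for the server-side add-partitions retry timeout.");
}
```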
"It will not be effective if it is larger than request.timeout.ms"; | ||
public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_CONFIG = "add.partitions.to.txn.retry.backoff.ms"; | ||
public static final int ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DEFAULT = 20; | ||
public static final String ADD_PARTITIONS_TO_TXN_RETRY_BACKOFF_MS_DOC = "The retry backoff when the server attempts" + |
nit: should we just mention this is a server-side backoff?
Added.
if (error != Errors.CONCURRENT_TRANSACTIONS) {
  assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
  return
This return is for the NOT_COORDINATOR case? I wonder if putting it in an if/else format would be a little easier to read than this early return.
val result = handleProduceAppend(replicaManager, tp0, transactionalRecords, origin = AppendOrigin.CLIENT,
  transactionalId = transactionalId, transactionSupportedOperation = addPartition)
val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
Does the times here account for the previous verify? In other words, should this be 2, or does the counter reset after the first verify is called?
For the NOT_COORDINATOR case, addOrVerifyTransaction is only called once.
For the CONCURRENT_TRANSACTIONS case, the first verify is consumed with the error; addOrVerifyTransaction is then called another time, which matches the second verify.
Right, I'm just wondering if the value in the append callback should be 2. Or if calling the first verify resets the counter.
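As background on the Mockito behavior in question: verify(mock, times(n)) counts every matching invocation recorded on the mock since it was created (or since a reset/clearInvocations), so an earlier verify does not reset the counter. A standalone sketch, unrelated to this test class:

```java
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.List;

// Standalone sketch showing that times(n) is cumulative across verify calls.
public class MockitoTimesSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> list = mock(List.class);

        list.add("first");
        verify(list, times(1)).add(anyString()); // one matching invocation so far

        list.add("second");
        // The earlier verify did not reset anything: there are now two matching
        // invocations in total, so times(2) is required here (times(1) would fail).
        verify(list, times(2)).add(anyString());
    }
}
```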
During the transaction commit phase, it is normal to hit a CONCURRENT_TRANSACTIONS error before the transaction markers are fully propagated. Instead of letting the client retry the produce request, it is better to retry on the server side.
https://issues.apache.org/jira/browse/KAFKA-18654
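Conceptually, the change moves the retry loop for the transient CONCURRENT_TRANSACTIONS error from the producer into the broker's produce path, bounded by the two configs discussed above. The sketch below only illustrates that retry-with-backoff shape under hypothetical names; it is not the PR's code, and a real broker would schedule the retry rather than sleep on a request handler thread:

```java
import java.util.function.Supplier;

// Conceptual sketch with hypothetical names; not the PR's implementation.
// It only shows the retry-with-backoff shape bounded by
// add.partitions.to.txn.max.timeout.ms and add.partitions.to.txn.retry.backoff.ms.
public final class ServerSideRetrySketch {

    enum Result { OK, CONCURRENT_TRANSACTIONS, OTHER_ERROR }

    static Result addPartitionsWithRetry(Supplier<Result> attempt,
                                         long maxTimeoutMs,
                                         long backoffMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxTimeoutMs;
        Result result = attempt.get();
        // Retry only the transient CONCURRENT_TRANSACTIONS error, and only while the
        // server-side retry budget lasts; any other error is returned immediately.
        while (result == Result.CONCURRENT_TRANSACTIONS && System.currentTimeMillis() < deadline) {
            Thread.sleep(backoffMs); // simplification: a broker would schedule this, not block
            result = attempt.get();
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Succeeds on the third attempt, well within the 100ms default budget.
        Result r = addPartitionsWithRetry(
            () -> ++calls[0] < 3 ? Result.CONCURRENT_TRANSACTIONS : Result.OK, 100, 20);
        System.out.println(r + " after " + calls[0] + " attempts");
    }
}
```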