KAFKA-18723; Better handle invalid records during replication #18852

Open · wants to merge 14 commits into base: trunk

Conversation

@jsancio (Member) commented Feb 10, 2025

For the KRaft implementation there is a race between the network thread, which reads bytes from the log segments, and the KRaft driver thread, which truncates the log and appends records to it. This race can cause the network thread to send corrupted records or inconsistent records. The corrupted records case is handled by catching and logging the CorruptRecordException. The inconsistent records case is handled by only appending record batches whose partition leader epoch is less than or equal to the fetching replica's epoch, and only if the epoch didn't change between the request and the response.

For the ISR implementation there is also a race between the network thread and the replica fetcher thread, which truncates the log and appends records to the log. This race can cause the network thread to send corrupted records or inconsistent records. The replica fetcher thread already handles the corrupted records case. The inconsistent records case is handled by only appending record batches whose partition leader epoch is less than or equal to the leader epoch in the FETCH request.
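Both fixes reduce to the same guard on the append path: skip any record batch whose partition leader epoch is greater than the leader epoch the FETCH was issued under. A minimal Scala sketch of that predicate (hedged; the PR's actual helper, discussed below under the name hasHigherPartitionLeaderEpoch, additionally checks that the append origin is replication):

```scala
import org.apache.kafka.common.record.RecordBatch

// Sketch: a batch written under a newer leader epoch than the epoch used for
// the FETCH request must come from a stale response, so it is skipped rather
// than appended.
def hasHigherPartitionLeaderEpoch(batch: RecordBatch, leaderEpoch: Int): Boolean =
  batch.partitionLeaderEpoch() != RecordBatch.NO_PARTITION_LEADER_EPOCH &&
    batch.partitionLeaderEpoch() > leaderEpoch
```

Once the predicate fires for one batch, the remaining batches in the response are skipped as well, since they were read from the log segments after the truncation that the stale response raced with.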

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added core Kafka Broker kraft build Gradle build or GitHub Actions clients labels Feb 10, 2025
@jsancio jsancio changed the title KAFKA-18723; Better handling invalid records during replication KAFKA-18723; Better handle invalid records during replication Feb 10, 2025
@junrao (Contributor) left a comment

@jsancio : Thanks for the PR. Made a pass of non-testing files. Left a few comments.

s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
}
/* During replication of uncommitted data it is possible for the remote replica to send record batches after it lost
* leadership. This can happend if sending FETCH responses is slowed because there is a race between sending the FETCH
Contributor:

typo happend

Contributor:

The part about sending FETCH responses being slow can be read in an inaccurate way - the current wording seems to suggest the response is slow because of the race condition. What about instead:
This can happen if sending FETCH responses is slow. There is a race...

Member Author:

Fixed both suggestions.

@@ -333,7 +336,9 @@ abstract class AbstractFetcherThread(name: String,
// In this case, we only want to process the fetch response if the partition state is ready for fetch and
// the current offset is the same as the offset requested.
val fetchPartitionData = sessionPartitions.get(topicPartition)
if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
if (fetchPartitionData != null &&
Contributor:

It's possible that the fetch response is for an old leader epoch. It would be useful to further validate if the leader epoch in the fetch request matches the leader epoch in the current fetch state.

Member Author:

Yeah. I thought about this when I was implementing the PR. I think we have two options:

  1. Always append up to the currentLeaderEpoch, the FETCH request's currentLeaderEpoch if the request version supports it or the locally recorded currentLeaderEpoch if the FETCH request version doesn't support the currentLeaderEpoch field. This is what this PR implements.
  2. Only append records up to the currentLeaderEpoch if the local replica's currentLeaderEpoch still matches the leader epoch when the FETCH request was created and sent.

I think they are both correct. Option 1 accepts and handles a superset of the FETCH responses that option 2 can handle. I figured that if both are correct, it is better to progress faster and with fewer FETCH RPCs. What do you think?
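To make option 1 concrete, here is a hedged sketch of the epoch selection (hypothetical helper; the FETCH request's currentLeaderEpoch is modeled as optional because old request versions do not carry the field):

```scala
import java.util.Optional

// Option 1 sketched: bound the append by the epoch carried in the FETCH
// request when the version supports it, otherwise fall back to the epoch
// recorded locally for the replica.
def epochBoundForAppend(
  requestCurrentLeaderEpoch: Optional[Integer], // absent on old FETCH versions
  locallyRecordedEpoch: Int
): Int =
  if (requestCurrentLeaderEpoch.isPresent) requestCurrentLeaderEpoch.get
  else locallyRecordedEpoch
```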

def appendRecordsToFollowerOrFutureReplica(
records: MemoryRecords,
isFuture: Boolean,
maxEpoch: Int
Contributor:

maxEpoch => leaderEpochForReplica?

Member Author:

Fixed.

@@ -1159,6 +1177,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE)
}

/**
* Return true if the record batch should not be appending to the log.
Contributor:

Return true if the record batch should not be appending to the log => Return true if the record batch has a higher leader epoch than the replica?

Member Author:

Fixed.

@@ -1786,8 +1788,16 @@ private boolean handleFetchResponse(
}
} else {
Records records = FetchResponse.recordsOrFail(partitionResponse);
if (records.sizeInBytes() > 0) {
try {
// TODO: make sure to test corrupted records in kafka metadata log
Contributor:

Should this be removed?

appendAsFollower(records);
} catch (CorruptRecordException | InvalidRecordException e) {
// TODO: this should log up to 265 bytes from the records
Contributor:

Is this done yet?

Comment on lines 1186 to 1187
* @return true if the append reason is replication and the partition leader epoch is greater
* than the leader epoch, otherwise false
Contributor:

distinction between partition leader epoch and leader epoch not very clear

Member Author:

Fair. I improved the wording.

*/
skipRemainingBatches = skipRemainingBatches || hasInvalidPartitionLeaderEpoch(batch, origin, leaderEpoch);
if (skipRemainingBatches) {
info(s"Skipping batch $batch because origin is $origin and leader epoch is $leaderEpoch")
Contributor:

perhaps the log message should also include batch.partitionLeaderEpoch() (e.g. so an operator can compare the epochs)

Member Author:

Yes. $batch should include the partition leader epoch. This is why I updated the toString implementation for DefaultRecordBatch to include the leader epoch of the batch.

https://github.com/apache/kafka/pull/18852/files#diff-c11736eb30dd10f1b56fb894c3efaf2bc724a9306004a71e8b3bd46d46f26ee5R505

Contributor:

ah missed that you changed the toString impl to include epoch, thanks!

}

@ParameterizedTest
@ArgumentsSource(classOf[InvalidMemoryRecordsProvider])
Contributor:

maybe I'm missing something - should we also have a test here where we call log.appendAsFollower(records, epoch) where epoch is less than one of the epochs in the records batch? it could be a malformed batch and we could check that the test does not throw CorruptRecordException

Member Author:

Yes. You are correct, we need to test the case when the leader epoch is invalid. I added tests for that case to MockLogTest, KafkaMetadataLogTest and UnifiedLogTest.
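A hedged sketch of what such a test can look like (modeled on the UnifiedLogTest helpers quoted elsewhere in this PR; createLog, logDir, and logConfig are assumed fixtures):

```scala
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

@Test
def testAppendAsFollowerSkipsHigherEpochBatches(): Unit = {
  val log = createLog(logDir, logConfig) // assumed test fixtures
  val followerEpoch = 1

  // The batch claims a partition leader epoch newer than the follower's.
  val records = MemoryRecords.withRecords(
    0L,
    Compression.NONE,
    followerEpoch + 1, // the batch's partition leader epoch
    new SimpleRecord("record from a stale response".getBytes)
  )

  log.appendAsFollower(records, followerEpoch)

  // The batch is skipped: nothing is appended and nothing throws.
  assertEquals(0L, log.logEndOffset)
}
```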

return buffer;
}

private static ByteBuffer largeMagic() {
Contributor:

nit: should this just be incorrectMagic

Member Author:

A large magic is one kind of incorrect magic; a negative number is another. That is why I added negativeMagic and largeMagic.
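For context, a hedged sketch of how the two cases can be constructed (illustrative; the real provider builds complete batch buffers, and the batch header layout places the magic byte at offset 16, after the base offset, batch length, and partition leader epoch):

```scala
import java.nio.ByteBuffer
import org.apache.kafka.common.record.DefaultRecordBatch

// Build a minimal batch header whose magic byte is invalid.
def bufferWithMagic(magic: Byte): ByteBuffer = {
  val buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD)
  buffer.putLong(57)                    // non-zero base offset, unlikely to match the LEO
  buffer.putInt(buffer.capacity() - 12) // batch length
  buffer.putInt(-1)                     // partition leader epoch
  buffer.put(magic)                     // the invalid magic under test
  buffer.position(0)
  buffer
}

val negativeMagic = bufferWithMagic((-1).toByte) // below the valid range
val largeMagic = bufferWithMagic(Byte.MaxValue)  // above the valid range
```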

Comment on lines 1189 to 1190
* @return true if the append reason is replication and the batch's partition leader epoch is
* greater than the leader epoch, otherwise false
Contributor:

nit: maybe this could be further adjusted to
and the batch's partition leader epoch is greater than specified leaderEpoch, otherwise false

Member Author:

Done.

@junrao (Contributor) left a comment

@jsancio : Thanks for the updated PR. A few more comments.

/* During replication of uncommitted data it is possible for the remote replica to send record batches after it lost
* leadership. This can happen if sending FETCH responses is slow. There is a race between sending the FETCH
* response and the replica truncating and appending to the log. The replicating replica resolves this issue by only
* persisting up to the partition leader epoch of the leader when the FETCH request was handled. See KAFKA-18723 for
Contributor:

persisting up to the partition leader epoch of the leader when the FETCH request was handled => persisting up to the current leader epoch used in the fetch request

Member Author:

Done.

* @return true if the append reason is replication and the batch's partition leader epoch is
* greater than the leader epoch, otherwise false
*/
private def hasInvalidPartitionLeaderEpoch(
Contributor:

hasInvalidPartitionLeaderEpoch => hasHigherPartitionLeaderEpoch ?

Member Author:

Done.


Optional<LogAppendInfo> appendInfo = Optional.empty();
try {
appendInfo = Optional.of(log.appendAsFollower(records, quorum.epoch()));
Contributor:

quorum.epoch() could change between when the fetch request is issued and when the fetch response is received, right? If so, we need to use the epoch used when creating the fetch request.

@jsancio (Member Author), Feb 20, 2025:

Yes, the epoch can change between the request and the response. If that happens, the KRaft replica transitions state. All state transitions in KRaft reset the request manager (resetConnections). By resetting the request manager, any RPC response, including FETCH, that doesn't match the set of pending requests will be ignored.

The other case is that the epoch changed because of the FETCH response being handled. This is handled here. When the epoch in the response is greater than the epoch of the replica, the replica transitions and skips the rest of the FETCH response handling, including appending the contained records.
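In other words, the append path can safely use quorum.epoch() because any epoch change invalidates the response before the append runs. A hedged sketch of the invariant (illustrative names, not the KafkaRaftClient API):

```scala
// Either the epoch change reset the request manager, so the response no longer
// matches a pending request, or the response itself carries a higher epoch,
// which forces a state transition before any records are appended.
def shouldAppend(
  matchesPendingRequest: Boolean,
  responseEpoch: Int,
  quorumEpoch: Int
): Boolean =
  matchesPendingRequest && responseEpoch <= quorumEpoch
```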

kafkaRaftMetrics.updateLogEnd(endOffset);
logger.trace("Follower end offset updated to {} after append", endOffset);

appendInfo.ifPresent(
info -> kafkaRaftMetrics.updateFetchedRecords(info.lastOffset - info.firstOffset + 1)
Contributor:

Could we move this inside the try/catch where appendInfo is created? This avoids the need to make appendInfo an Optional.

Member Author:

Done.
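For reference, the shape of the suggested refactor in a hedged Scala sketch (the real change is in Java inside KafkaRaftClient; all names here are illustrative):

```scala
final case class LogAppendInfo(firstOffset: Long, lastOffset: Long)

// With the metrics update moved inside the try block, the append result can
// stay a plain value instead of an Optional that is consumed later.
def handleRecords(
  appendAsFollower: () => LogAppendInfo,
  updateFetchedRecords: Long => Unit,
  logError: Throwable => Unit
): Unit = {
  try {
    val info = appendAsFollower()
    // The success path owns the metrics update.
    updateFetchedRecords(info.lastOffset - info.firstOffset + 1)
  } catch {
    case e: RuntimeException => logError(e) // e.g. CorruptRecordException
  }
}
```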

import java.util.stream.Stream;

public final class InvalidMemoryRecordsProvider implements ArgumentsProvider {
// Use a baseOffset that not zero so that is less likely to match the LEO
Contributor:

that not zero => that's not zero
so that is less likely => so that it is less likely

Member Author:

Fixed.

@@ -115,6 +125,11 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
batches.headOption.map(_.lastOffset).getOrElse(-1)))
}

private def hasInvalidPartitionLeaderEpoch(batch: RecordBatch, leaderEpoch: Int): Boolean = {
Contributor:

hasInvalidPartitionLeaderEpoch => hasHigherPartitionLeaderEpoch?

Member Author:

Fixed.

Contributor:

Has this been fixed?

Member Author:

Fixed. I missed that this was for a different file and implementation.

.setControllerEpoch(0)
.setLeader(2)
.setLeaderEpoch(1)
.setLeaderEpoch(prevLeaderEpoch + 1)
Contributor:

If we change leaderEpoch, the partition epoch should also change, right?

Member Author:

Yes, good catch. Fixed it.

new SimpleRecord("first message".getBytes)
),
isFuture = false,
partitionLeaderEpoch = Int.MaxValue
Contributor:

Should we just use 0 as the leader epoch?

Member Author:

Sure. Fixed it. I wanted to make it explicit that this test was "ignoring" the skip higher leader epoch logic.

0L,
Compression.NONE,
pid,
epoch,
Contributor:

epoch => producerEpoch

Member Author:

Done.

val log = createLog(logDir, logConfig)
val previousEndOffset = log.logEndOffsetMetadata.messageOffset

// Depedning on the random corruption, unified log sometimes throws and sometimes returns an
Contributor:

typo Depedning

Member Author:

Fixed.

@junrao (Contributor) left a comment

@jsancio : Thanks for the updated PR. Just one comment.

@@ -115,6 +125,11 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
batches.headOption.map(_.lastOffset).getOrElse(-1)))
}

private def hasInvalidPartitionLeaderEpoch(batch: RecordBatch, leaderEpoch: Int): Boolean = {
Contributor:

Has this been fixed?

@junrao (Contributor) commented Feb 20, 2025

@jsancio : The following failed tests seem related to this PR?

FAILED ❌ AbstractFetcherThreadTest > testRetryAfterUnknownLeaderEpochInLatestOffsetFetch()
FAILED ❌ AbstractFetcherThreadTest > testFollowerFetchOutOfRangeLow()
FAILED ❌ AbstractFetcherThreadWithIbp26Test > testRetryAfterUnknownLeaderEpochInLatestOffsetFetch()
FAILED ❌ AbstractFetcherThreadWithIbp26Test > testFollowerFetchOutOfRangeLow()

@jsancio (Member Author) commented Feb 21, 2025

> @jsancio : The following failed tests seem related to this PR?
>
> FAILED ❌ AbstractFetcherThreadTest > testRetryAfterUnknownLeaderEpochInLatestOffsetFetch()
> FAILED ❌ AbstractFetcherThreadTest > testFollowerFetchOutOfRangeLow()
> FAILED ❌ AbstractFetcherThreadWithIbp26Test > testRetryAfterUnknownLeaderEpochInLatestOffsetFetch()
> FAILED ❌ AbstractFetcherThreadWithIbp26Test > testFollowerFetchOutOfRangeLow()

@junrao Fixed. Also added tests for testing that batches with a "higher partition leader epoch" are not replicated.

@junrao (Contributor) left a comment

@jsancio : Thanks for the updated PR. Just a minor comment.

}

@Test
void testReplicationOfInvalidPartitionLeaderEpoch() throws Exception {
Contributor:

testReplicationOfInvalidPartitionLeaderEpoch => testReplicationOfHigherPartitionLeaderEpoch ?

Member Author:

Fixed.

@junrao (Contributor) left a comment

@jsancio : Thanks for the updated PR. The code LGTM. Are the test failures related?

Labels: build (Gradle build or GitHub Actions), clients, core (Kafka Broker), kraft, performance

3 participants