Skip to content

Commit c8cfb4c

Browse files
authored
KAFKA-17428: Add retry mechanism for cleaning up dangling remote segments (#17335)
This change introduces a retry mechanism for cleaning up remote segments that failed the copy to remote storage. It also makes sure that we always update the remote segment state whenever we attempt a deletion. When a segment copy fails, we immediately try to delete the segment, but this can also fail. The RLMExpirationTask is now also responsible for retrying dangling segments cleanup. This is how a segment state is updated in the above case: 1. COPY_SEGMENT_STARTED (copy task fails) 2. DELETE_SEGMENT_STARTED (copy task cleanup also fails) 3. DELETE_SEGMENT_STARTED (expiration task retries; self state transition) 4. DELETE_SEGMENT_FINISHED (expiration task completes) 5. COPY_SEGMENT_STARTED (copy task retries) 6. COPY_SEGMENT_FINISHED (copy task completes) Signed-off-by: Federico Valeri <[email protected]> Reviewers: Kamal Chandraprakash <[email protected]>, Luke Chen <[email protected]>
1 parent 894c4a9 commit c8cfb4c

File tree

2 files changed

+75
-68
lines changed

2 files changed

+75
-68
lines changed

core/src/main/java/kafka/log/remote/RemoteLogManager.java

+62-55
Original file line numberDiff line numberDiff line change
@@ -984,14 +984,16 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment
984984
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
985985
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
986986
Optional<CustomMetadata> customMetadata = Optional.empty();
987+
987988
try {
988989
customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
989990
} catch (RemoteStorageException e) {
991+
logger.info("Copy failed, cleaning segment {}", copySegmentStartedRlsm.remoteLogSegmentId());
990992
try {
991-
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm);
992-
logger.info("Successfully cleaned segment {} after failing to copy segment", segmentId);
993+
deleteRemoteLogSegment(copySegmentStartedRlsm, ignored -> !isCancelled());
994+
LOGGER.info("Cleanup completed for segment {}", copySegmentStartedRlsm.remoteLogSegmentId());
993995
} catch (RemoteStorageException e1) {
994-
logger.error("Error while cleaning segment {}, consider cleaning manually", segmentId, e1);
996+
LOGGER.info("Cleanup failed, will retry later with segment {}: {}", copySegmentStartedRlsm.remoteLogSegmentId(), e1.getMessage());
995997
}
996998
throw e;
997999
}
@@ -1003,17 +1005,18 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment
10031005
long customMetadataSize = customMetadata.get().value().length;
10041006
if (customMetadataSize > this.customMetadataSizeLimit) {
10051007
CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
1006-
logger.error("Custom metadata size {} exceeds configured limit {}." +
1008+
logger.info("Custom metadata size {} exceeds configured limit {}." +
10071009
" Copying will be stopped and copied segment will be attempted to clean." +
10081010
" Original metadata: {}",
10091011
customMetadataSize, this.customMetadataSizeLimit, copySegmentStartedRlsm, e);
1012+
// For deletion, we provide back the custom metadata by creating a new metadata object from the update.
1013+
// However, the update itself will not be stored in this case.
1014+
RemoteLogSegmentMetadata newMetadata = copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm);
10101015
try {
1011-
// For deletion, we provide back the custom metadata by creating a new metadata object from the update.
1012-
// However, the update itself will not be stored in this case.
1013-
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
1014-
logger.info("Successfully cleaned segment after custom metadata size exceeded");
1016+
deleteRemoteLogSegment(newMetadata, ignored -> !isCancelled());
1017+
LOGGER.info("Cleanup completed for segment {}", newMetadata.remoteLogSegmentId());
10151018
} catch (RemoteStorageException e1) {
1016-
logger.error("Error while cleaning segment after custom metadata size exceeded, consider cleaning manually", e1);
1019+
LOGGER.info("Cleanup failed, will retry later with segment {}: {}", newMetadata.remoteLogSegmentId(), e1.getMessage());
10171020
}
10181021
throw e;
10191022
}
@@ -1070,7 +1073,6 @@ public RLMExpirationTask(TopicIdPartition topicIdPartition) {
10701073

10711074
@Override
10721075
protected void execute(UnifiedLog log) throws InterruptedException, RemoteStorageException, ExecutionException {
1073-
// Cleanup/delete expired remote log segments
10741076
cleanupExpiredRemoteLogSegments();
10751077
}
10761078

@@ -1160,8 +1162,8 @@ private boolean isSegmentBreachByLogStartOffset(RemoteLogSegmentMetadata metadat
11601162
private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
11611163
RemoteLogSegmentMetadata metadata)
11621164
throws RemoteStorageException, ExecutionException, InterruptedException {
1163-
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored ->
1164-
metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
1165+
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
1166+
ignored -> metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
11651167
if (isSegmentDeleted) {
11661168
logger.info("Deleted remote log segment {} due to leader-epoch-cache truncation. " +
11671169
"Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}",
@@ -1170,41 +1172,6 @@ private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earl
11701172
// No need to update the log-start-offset as these epochs/offsets are earlier to that value.
11711173
return isSegmentDeleted;
11721174
}
1173-
1174-
private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
1175-
throws RemoteStorageException, ExecutionException, InterruptedException {
1176-
if (predicate.test(segmentMetadata)) {
1177-
logger.debug("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId());
1178-
1179-
String topic = segmentMetadata.topicIdPartition().topic();
1180-
1181-
// Publish delete segment started event.
1182-
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
1183-
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
1184-
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
1185-
1186-
brokerTopicStats.topicStats(topic).remoteDeleteRequestRate().mark();
1187-
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().mark();
1188-
1189-
// Delete the segment in remote storage.
1190-
try {
1191-
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
1192-
} catch (RemoteStorageException e) {
1193-
brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().mark();
1194-
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().mark();
1195-
throw e;
1196-
}
1197-
1198-
// Publish delete segment finished event.
1199-
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
1200-
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
1201-
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
1202-
logger.debug("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId());
1203-
return true;
1204-
}
1205-
return false;
1206-
}
1207-
12081175
}
12091176

12101177
private void updateMetadataCountAndLogSizeWith(int metadataCount, long remoteLogSizeBytes) {
@@ -1221,6 +1188,7 @@ private void updateRemoteDeleteLagWith(int segmentsLeftToDelete, long sizeOfDele
12211188
brokerTopicStats.recordRemoteDeleteLagBytes(topic, partition, sizeOfDeletableSegmentsBytes);
12221189
}
12231190

1191+
/** Cleanup expired and dangling remote log segments. */
12241192
void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
12251193
if (isCancelled()) {
12261194
logger.info("Returning from remote log segments cleanup as the task state is changed");
@@ -1297,6 +1265,12 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
12971265
canProcess = false;
12981266
continue;
12991267
}
1268+
// This works as retry mechanism for dangling remote segments that failed the deletion in previous attempts.
1269+
// Rather than waiting for the retention to kick in, we cleanup early to avoid polluting the cache and possibly waste remote storage.
1270+
if (RemoteLogSegmentState.DELETE_SEGMENT_STARTED.equals(metadata.state())) {
1271+
segmentsToDelete.add(metadata);
1272+
continue;
1273+
}
13001274
if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
13011275
continue;
13021276
}
@@ -1343,7 +1317,7 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
13431317
updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
13441318
List<String> undeletedSegments = new ArrayList<>();
13451319
for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
1346-
if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled())) {
1320+
if (!deleteRemoteLogSegment(segmentMetadata, ignored -> !isCancelled())) {
13471321
undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
13481322
} else {
13491323
sizeOfDeletableSegmentsBytes -= segmentMetadata.segmentSizeInBytes();
@@ -1417,13 +1391,11 @@ private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
14171391
Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
14181392
while (segmentsIterator.hasNext()) {
14191393
RemoteLogSegmentMetadata segmentMetadata = segmentsIterator.next();
1420-
// Only count the size of "COPY_SEGMENT_FINISHED" and "DELETE_SEGMENT_STARTED" state segments
1421-
// because "COPY_SEGMENT_STARTED" means copy didn't complete, and "DELETE_SEGMENT_FINISHED" means delete did complete.
1422-
// Note: there might be some "COPY_SEGMENT_STARTED" segments not counted here.
1423-
// Either they are being copied and will be counted next time or they are dangling and will be cleaned elsewhere,
1424-
// either way, this won't cause more segment deletion.
1425-
if (segmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED) ||
1426-
segmentMetadata.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_STARTED)) {
1394+
// Count only the size of segments in "COPY_SEGMENT_FINISHED" state because
1395+
// "COPY_SEGMENT_STARTED" means copy didn't complete and we will count them later,
1396+
// "DELETE_SEGMENT_STARTED" means deletion failed in the previous attempt and we will retry later,
1397+
// "DELETE_SEGMENT_FINISHED" means deletion completed, so there is nothing to count.
1398+
if (segmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) {
14271399
RemoteLogSegmentId segmentId = segmentMetadata.remoteLogSegmentId();
14281400
if (!visitedSegmentIds.contains(segmentId) && isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) {
14291401
remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
@@ -1463,6 +1435,41 @@ protected void execute(UnifiedLog log) throws InterruptedException, RemoteStorag
14631435
log.updateHighestOffsetInRemoteStorage(offsetAndEpoch.offset());
14641436
}
14651437
}
1438+
1439+
private boolean deleteRemoteLogSegment(
1440+
RemoteLogSegmentMetadata segmentMetadata,
1441+
Predicate<RemoteLogSegmentMetadata> predicate
1442+
) throws RemoteStorageException, ExecutionException, InterruptedException {
1443+
if (predicate.test(segmentMetadata)) {
1444+
LOGGER.debug("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId());
1445+
String topic = segmentMetadata.topicIdPartition().topic();
1446+
1447+
// Publish delete segment started event.
1448+
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
1449+
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
1450+
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
1451+
1452+
brokerTopicStats.topicStats(topic).remoteDeleteRequestRate().mark();
1453+
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().mark();
1454+
1455+
// Delete the segment in remote storage.
1456+
try {
1457+
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
1458+
} catch (RemoteStorageException e) {
1459+
brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().mark();
1460+
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().mark();
1461+
throw e;
1462+
}
1463+
1464+
// Publish delete segment finished event.
1465+
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
1466+
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
1467+
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
1468+
LOGGER.debug("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId());
1469+
return true;
1470+
}
1471+
return false;
1472+
}
14661473

14671474
/**
14681475
* Returns true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.

0 commit comments

Comments
 (0)