diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 099201ecabb82..25be8880e42c1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -96,6 +96,7 @@ import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.TopicImage; import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.record.BrokerCompressionType; @@ -2137,10 +2138,22 @@ public void onPartitionsDeleted( ).get(); // At this point the metadata will not have been updated - // with the deleted topics. - Set topicIds = topicPartitions.stream() - .map(tp -> metadataImage.topics().getTopic(tp.topic()).id()) - .collect(Collectors.toSet()); + // with the deleted topics. However, we must guard against it. + if (metadataImage == null || metadataImage.equals(MetadataImage.EMPTY)) { + return; + } + + Set topicIds = new HashSet<>(); + for (TopicPartition tp : topicPartitions) { + TopicImage image = metadataImage.topics().getTopic(tp.topic()); + if (image != null) { + topicIds.add(image.id()); + } + } + + if (topicIds.isEmpty()) { + return; + } CompletableFuture.allOf( FutureUtils.mapExceptionally( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index ab186575e7978..39306a9daddbb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -8020,24 +8020,52 @@ public Optional shareGroupBuildPartitionDeleteR // a retry for the same is possible. Since this is part of an admin operation // retrying delete should not pose issues related to // performance. Also, the share coordinator is idempotent on delete partitions. - Map deletingTopics = shareGroupStatePartitionMetadata.get(shareGroupId).deletingTopics().stream() - .map(tid -> { - TopicImage image = metadataImage.topics().getTopic(tid); - return Map.entry(tid, new InitMapValue(image.name(), image.partitions().keySet(), -1)); - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - if (!deletingTopics.isEmpty()) { - log.info("Existing deleting entries found in share group {} - {}", shareGroupId, deletingTopics); - deleteCandidates = combineInitMaps(deleteCandidates, deletingTopics); + Set currentDeleting = shareGroupStatePartitionMetadata.get(shareGroupId).deletingTopics(); + Map deleteRetryCandidates = new HashMap<>(); + Set deletingToIgnore = new HashSet<>(); + if (!currentDeleting.isEmpty()) { + if (metadataImage == null || metadataImage.equals(MetadataImage.EMPTY)) { + deletingToIgnore.addAll(currentDeleting); + } else { + for (Uuid deletingTopicId : currentDeleting) { + TopicImage topicImage = metadataImage.topics().getTopic(deletingTopicId); + if (topicImage == null) { + deletingToIgnore.add(deletingTopicId); + } else { + deleteRetryCandidates.put(deletingTopicId, new InitMapValue(topicImage.name(), topicImage.partitions().keySet(), -1)); + } + } + } } + if (!deletingToIgnore.isEmpty()) { + log.warn("Some topics for share group id {} were not found in the metadata image - {}", shareGroupId, deletingToIgnore); + } + + if (!deleteRetryCandidates.isEmpty()) { + log.info("Existing deleting entries found in share group {} - {}", shareGroupId, deleteRetryCandidates); + deleteCandidates = combineInitMaps(deleteCandidates, deleteRetryCandidates); + } + + // Remove all initializing and initialized topic info from record and add deleting. There + // could be previous deleting topics due to offsets delete, we need to account for them as well. + // If some older deleting topics could not be found in the metadata image, they will be ignored + // and logged. + records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( + shareGroupId, + Map.of(), + Map.of(), + deleteCandidates.entrySet().stream() + .map(entry -> Map.entry(entry.getKey(), entry.getValue().name())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + )); + if (deleteCandidates.isEmpty()) { return Optional.empty(); } List> topicDataList = new ArrayList<>(deleteCandidates.size()); - for (Map.Entry entry : deleteCandidates.entrySet()) { topicDataList.add(new TopicData<>( entry.getKey(), @@ -8047,15 +8075,6 @@ public Optional shareGroupBuildPartitionDeleteR )); } - // Remove all initializing and initialized topic info from record and add deleting. There - // could be previous deleting topics due to offsets delete, we need to account for them as well. - records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( - shareGroupId, - Map.of(), - Map.of(), - attachTopicName(deleteCandidates.keySet()) - )); - return Optional.of(new DeleteShareGroupStateParameters.Builder() .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() .setGroupId(shareGroupId) @@ -8247,13 +8266,15 @@ public CoordinatorResult maybeCleanupShareGroupState( shareGroupStatePartitionMetadata.forEach((groupId, metadata) -> { Set initializingDeletedCurrent = new HashSet<>(metadata.initializingTopics().keySet()); Set initializedDeletedCurrent = new HashSet<>(metadata.initializedTopics().keySet()); + Set deletingDeletedCurrent = new HashSet<>(metadata.deletingTopics()); initializingDeletedCurrent.retainAll(deletedTopicIds); initializedDeletedCurrent.retainAll(deletedTopicIds); + deletingDeletedCurrent.retainAll(deletedTopicIds); // The deleted topic ids are neither present in initializing - // not initialized, so we have nothing to do. - if (initializingDeletedCurrent.isEmpty() && initializedDeletedCurrent.isEmpty()) { + // nor in initialized nor in deleting, so we have nothing to do. + if (initializingDeletedCurrent.isEmpty() && initializedDeletedCurrent.isEmpty() && deletingDeletedCurrent.isEmpty()) { return; } @@ -8268,14 +8289,14 @@ public CoordinatorResult maybeCleanupShareGroupState( Map finalInitialized = new HashMap<>(metadata.initializedTopics()); initializedDeletedCurrent.forEach(finalInitialized::remove); - Set deletingTopics = new HashSet<>(metadata.deletingTopics()); - deletingTopics.removeAll(deletedTopicIds); + Set finalDeleting = new HashSet<>(metadata.deletingTopics()); + finalDeleting.removeAll(deletedTopicIds); records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( groupId, finalInitializing, finalInitialized, - attachTopicName(deletingTopics) + attachTopicName(finalDeleting) )); }); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 01c87696053a2..7bf85d0861cc9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -3240,6 +3240,110 @@ public void testOnPartitionsDeletedCleanupShareGroupState() { BufferSupplier.NO_CACHING ) ); + + verify(runtime, times(1)).scheduleWriteAllOperation( + ArgumentMatchers.eq("maybe-cleanup-share-group-state"), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + ); + } + + @Test + public void testOnPartitionsDeletedCleanupShareGroupStateEmptyMetadata() { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .build(); + service.startup(() -> 3); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(Uuid.randomUuid(), "bar", 1) + .build(); + service.onNewMetadataImage(image, new MetadataDelta(image)); + + // No error in partition deleted callback + when(runtime.scheduleWriteAllOperation( + ArgumentMatchers.eq("on-partition-deleted"), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(List.of( + CompletableFuture.completedFuture(null), + CompletableFuture.completedFuture(null), + CompletableFuture.completedFuture(null) + )); + + when(runtime.scheduleWriteAllOperation( + ArgumentMatchers.eq("maybe-cleanup-share-group-state"), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(List.of( + CompletableFuture.completedFuture(null), + CompletableFuture.completedFuture(null), + CompletableFuture.completedFuture(null) + )); + + // The exception is logged and swallowed. + assertDoesNotThrow(() -> + service.onPartitionsDeleted( + List.of(new TopicPartition("foo", 0)), + BufferSupplier.NO_CACHING + ) + ); + + verify(runtime, times(0)).scheduleWriteAllOperation( + ArgumentMatchers.eq("maybe-cleanup-share-group-state"), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + ); + } + + @Test + public void testOnPartitionsDeletedCleanupShareGroupStateTopicsNotInMetadata() { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .build(); + service.startup(() -> 3); + + MetadataImage image = MetadataImage.EMPTY; + service.onNewMetadataImage(image, new MetadataDelta(image)); + + // No error in partition deleted callback + when(runtime.scheduleWriteAllOperation( + ArgumentMatchers.eq("on-partition-deleted"), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(List.of( + CompletableFuture.completedFuture(null), + CompletableFuture.completedFuture(null), + CompletableFuture.completedFuture(null) + )); + + when(runtime.scheduleWriteAllOperation( + ArgumentMatchers.eq("maybe-cleanup-share-group-state"), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(List.of( + CompletableFuture.completedFuture(null), + CompletableFuture.completedFuture(null), + CompletableFuture.completedFuture(null) + )); + + // The exception is logged and swallowed. + assertDoesNotThrow(() -> + service.onPartitionsDeleted( + List.of(new TopicPartition("foo", 0)), + BufferSupplier.NO_CACHING + ) + ); + + verify(runtime, times(0)).scheduleWriteAllOperation( + ArgumentMatchers.eq("maybe-cleanup-share-group-state"), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + ); } @Test diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index bc5afd7704fcf..111a1deb92186 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -21613,6 +21613,135 @@ public void testShareGroupDeleteRequestWithAlreadyDeletingTopics() { assertRecordsEquals(expectedRecords, records); } + @Test + public void testShareGroupDeleteRequestWithAlreadyDeletingTopicsButNotInMetadata() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .build(); + + Uuid t1Uuid = Uuid.randomUuid(); + Uuid t2Uuid = Uuid.randomUuid(); + Uuid t3Uuid = Uuid.randomUuid(); + String t1Name = "t1"; + String t2Name = "t2"; + String t3Name = "t3"; + + String groupId = "share-group"; + ShareGroup shareGroup = mock(ShareGroup.class); + when(shareGroup.groupId()).thenReturn(groupId); + when(shareGroup.isEmpty()).thenReturn(false); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(t1Uuid, t1Name, 2) + .addTopic(t2Uuid, t2Name, 2) +// .addTopic(t3Uuid, t3Name, 2) // Simulate deleting topic not present in metadata image. + .build(); + + MetadataDelta delta = new MetadataDelta(image); + context.groupMetadataManager.onNewMetadataImage(image, delta); + + context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0)); + + context.replay( + GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( + groupId, + Map.of(t1Uuid, new InitMapValue(t1Name, Set.of(0, 1), 1)), + Map.of(t2Uuid, new InitMapValue(t2Name, Set.of(0, 1), 1)), + Map.of(t3Uuid, t3Name) + ) + ); + + context.commit(); + + Map> expectedTopicPartitionMap = Map.of( + t1Uuid, Set.of(0, 1), + t2Uuid, Set.of(0, 1) + ); + + List expectedRecords = List.of( + newShareGroupStatePartitionMetadataRecord( + groupId, + Map.of(), + Map.of(), + Map.of(t1Uuid, t1Name, t2Uuid, t2Name) // Existing deleting topics should be ignored. + ) + ); + + List records = new ArrayList<>(); + Optional params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records); + verifyShareGroupDeleteRequest( + params, + expectedTopicPartitionMap, + groupId, + true + ); + assertRecordsEquals(expectedRecords, records); + } + + @Test + public void testShareGroupDeleteRequestWithAlreadyDeletingTopicsButMetadataIsEmpty() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .build(); + + Uuid t1Uuid = Uuid.randomUuid(); + Uuid t2Uuid = Uuid.randomUuid(); + Uuid t3Uuid = Uuid.randomUuid(); + String t1Name = "t1"; + String t2Name = "t2"; + String t3Name = "t3"; + + String groupId = "share-group"; + ShareGroup shareGroup = mock(ShareGroup.class); + when(shareGroup.groupId()).thenReturn(groupId); + when(shareGroup.isEmpty()).thenReturn(false); + + MetadataImage image = MetadataImage.EMPTY; + MetadataDelta delta = new MetadataDelta(image); + context.groupMetadataManager.onNewMetadataImage(image, delta); + + context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0)); + + context.replay( + GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( + groupId, + Map.of(t1Uuid, new InitMapValue(t1Name, Set.of(0, 1), 1)), + Map.of(t2Uuid, new InitMapValue(t2Name, Set.of(0, 1), 1)), + Map.of(t3Uuid, t3Name) + ) + ); + + context.commit(); + + Map> expectedTopicPartitionMap = Map.of( + t1Uuid, Set.of(0, 1), + t2Uuid, Set.of(0, 1) + ); + + List expectedRecords = List.of( + newShareGroupStatePartitionMetadataRecord( + groupId, + Map.of(), + Map.of(), + Map.of(t1Uuid, t1Name, t2Uuid, t2Name) // Existing deleting topics should be ignored. + ) + ); + + List records = new ArrayList<>(); + Optional params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records); + verifyShareGroupDeleteRequest( + params, + expectedTopicPartitionMap, + groupId, + true + ); + assertRecordsEquals(expectedRecords, records); + } + @Test public void testSharePartitionsEligibleForOffsetDeletionSuccess() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); @@ -22783,6 +22912,8 @@ public void testMaybeCleanupShareGroupStateInitDeletedTopicsPresent() { String t4Name = "t4"; Uuid t5Id = Uuid.randomUuid(); String t5Name = "t5"; + Uuid t6Id = Uuid.randomUuid(); + String t6Name = "t6"; MetadataImage image = new MetadataImageBuilder() .addTopic(t1Id, t1Name, 2) @@ -22790,6 +22921,7 @@ public void testMaybeCleanupShareGroupStateInitDeletedTopicsPresent() { .addTopic(t3Id, t3Name, 3) .addTopic(t4Id, t4Name, 3) .addTopic(t5Id, t5Name, 3) + .addTopic(t6Id, t6Name, 3) .build(); MetadataDelta delta = new MetadataDelta(image); @@ -22828,7 +22960,10 @@ public void testMaybeCleanupShareGroupStateInitDeletedTopicsPresent() { .setDeletingTopics(List.of( new ShareGroupStatePartitionMetadataValue.TopicInfo() .setTopicId(t5Id) - .setTopicName(t5Name) + .setTopicName(t5Name), + new ShareGroupStatePartitionMetadataValue.TopicInfo() + .setTopicId(t6Id) + .setTopicName(t6Name) )) ); @@ -22861,7 +22996,7 @@ public void testMaybeCleanupShareGroupStateInitDeletedTopicsPresent() { ); CoordinatorResult expectedResult = new CoordinatorResult<>(expectedRecords); - assertEquals(expectedResult, context.groupMetadataManager.maybeCleanupShareGroupState(Set.of(t1Id, t2Id))); + assertEquals(expectedResult, context.groupMetadataManager.maybeCleanupShareGroupState(Set.of(t1Id, t2Id, t6Id))); } @Test