diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index c7a45fc8c80d7..02fd3304444e0 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -68,6 +68,7 @@ import scala.collection.Seq import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters._ import scala.util.{Random, Using} /** @@ -2850,6 +2851,33 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + /** + * Waits until the metadata for the given partition has fully propagated + * and become consistent across all brokers. + * + * This method repeatedly checks the leader information for the specified + * TopicPartition in each broker's metadata cache. It compares all brokers' + * views against the leader reported by the head broker. The loop continues + * until all brokers agree on the same leader, ensuring metadata consistency. + * + * This is useful in integration tests where operations such as + * preferred leader election or other metadata updates require + * propagation time before assertions can be made reliably. + */ + def sleepMillisToPropagateMetadata(partition: TopicPartition): Unit = { + var allSynced: Boolean = false + + while (!allSynced) { + val prior = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition.topic, partition.partition(), listenerName).get.id() + + allSynced = brokers.forall { broker => + val leaderIdOpt = broker.metadataCache.getPartitionLeaderEndpoint(partition.topic, partition.partition(), listenerName).map(_.id()) + val leaderId = leaderIdOpt.toScala + leaderId.contains(prior) + } + } + } + @Test def testElectPreferredLeaders(): Unit = { client = createAdminClient @@ -2876,12 +2904,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val prior1 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, partition1.partition(), listenerName).get.id() val prior2 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName).get.id() - var m = Map.empty[TopicPartition, Optional[NewPartitionReassignment]] + var reassignmentMap = Map.empty[TopicPartition, Optional[NewPartitionReassignment]] if (prior1 != preferred) - m += partition1 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava)) + reassignmentMap += partition1 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava)) if (prior2 != preferred) - m += partition2 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava)) - client.alterPartitionReassignments(m.asJava).all().get() + reassignmentMap += partition2 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava)) + client.alterPartitionReassignments(reassignmentMap.asJava).all().get() TestUtils.waitUntilTrue( () => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred, @@ -2909,6 +2937,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.assertLeader(client, partition2, 0) // Now change the preferred leader to 1 + sleepMillisToPropagateMetadata(partition1); changePreferredLeader(prefer1) // meaningful election @@ -2947,6 +2976,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.assertLeader(client, partition2, 1) // Now change the preferred leader to 2 + sleepMillisToPropagateMetadata(partition1) + sleepMillisToPropagateMetadata(partition2) changePreferredLeader(prefer2) // mixed results @@ -2963,9 +2994,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.assertLeader(client, partition2, 2) // Now change the preferred leader to 1 + sleepMillisToPropagateMetadata(partition1) + sleepMillisToPropagateMetadata(partition2) changePreferredLeader(prefer1) // but shut it down... killBroker(1) + sleepMillisToPropagateMetadata(partition1) TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1)) def assertPreferredLeaderNotAvailable(