Skip to content

KAFKA-18105 Fix flaky PlaintextAdminIntegrationTest#testElectPreferredLeaders #20068

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
dc2b489
fix: Add sleep before changePreferredLeader and waitForBrokersOutOfIs…
jim0987795064 Jun 30, 2025
113cca0
refactor: Replace Thread.sleep with waitUntilTrue in changePreferredL…
jim0987795064 Jul 4, 2025
94f10ca
refactor: Rename variable of changePreferredLeader
jim0987795064 Jul 5, 2025
a0f2360
feat: Add TestUtils.waitUntilTrue-based sleepMillis helper
jim0987795064 Jul 6, 2025
dfd30ba
revert: restore TestUtils.scala to original state
jim0987795064 Jul 7, 2025
b2944b5
revert: restore TestUtils.scala to original state
jim0987795064 Jul 7, 2025
4111838
feat: Replace fixed sleep with a metadata propagation check across al…
jim0987795064 Jul 7, 2025
2f1ca60
refactor: Move prior2 into waitUntilTrue
jim0987795064 Jul 7, 2025
75c8d04
revert: Recover the return type of assertLeader, assertNoLeader, and …
jim0987795064 Jul 8, 2025
1b09b82
refactor: Remove redundant code in waitForLeaderToBecome
jim0987795064 Jul 8, 2025
324ba84
refactor: Fix the indent
jim0987795064 Jul 8, 2025
335442c
refactor: Adjust the indent of sleepMillisToPropagateMetadata
jim0987795064 Jul 8, 2025
f8e9870
chore: revert the unnecessary change in TestUtils
jim0987795064 Jul 9, 2025
30e2350
refactor: Remove the unnecessary code in sleepMillisToPropagateMetadata
jim0987795064 Jul 9, 2025
47104c8
refactor: Extract sleepMillisToPropagateMetadata as a standalone help…
jim0987795064 Jul 9, 2025
bc8ca11
chore: Add comment and Rewrite the error message in sleepMillisToprop…
jim0987795064 Jul 10, 2025
a664054
chore: Change sleepMillisToPropagateMetadata to wait until metadata i…
jim0987795064 Jul 14, 2025
4c5df27
chore: Add comment of sleepMillisToPropagateMetadata
jim0987795064 Jul 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import scala.collection.Seq
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
import scala.util.{Random, Using}

/**
Expand Down Expand Up @@ -2850,6 +2851,33 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}

/**
* Waits until the metadata for the given partition has fully propagated
* and become consistent across all brokers.
*
* This method repeatedly checks the leader information for the specified
* TopicPartition in each broker's metadata cache. It compares all brokers'
* views against the leader reported by the head broker. The loop continues
* until all brokers agree on the same leader, ensuring metadata consistency.
*
* This is useful in integration tests where operations such as
* preferred leader election or other metadata updates require
* propagation time before assertions can be made reliably.
*/
def sleepMillisToPropagateMetadata(partition: TopicPartition): Unit = {
var allSynced: Boolean = false

while (!allSynced) {
val prior = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition.topic, partition.partition(), listenerName).get.id()

allSynced = brokers.forall { broker =>
val leaderIdOpt = broker.metadataCache.getPartitionLeaderEndpoint(partition.topic, partition.partition(), listenerName).map(_.id())
val leaderId = leaderIdOpt.toScala
leaderId.contains(prior)
}
}
}

@Test
def testElectPreferredLeaders(): Unit = {
client = createAdminClient
Expand All @@ -2876,12 +2904,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val prior1 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, partition1.partition(), listenerName).get.id()
val prior2 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName).get.id()

var m = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
var reassignmentMap = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
if (prior1 != preferred)
m += partition1 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
reassignmentMap += partition1 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
if (prior2 != preferred)
m += partition2 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
client.alterPartitionReassignments(m.asJava).all().get()
reassignmentMap += partition2 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
client.alterPartitionReassignments(reassignmentMap.asJava).all().get()

TestUtils.waitUntilTrue(
() => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred,
Expand Down Expand Up @@ -2909,6 +2937,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition2, 0)

// Now change the preferred leader to 1
sleepMillisToPropagateMetadata(partition1);
changePreferredLeader(prefer1)

// meaningful election
Expand Down Expand Up @@ -2947,6 +2976,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition2, 1)

// Now change the preferred leader to 2
sleepMillisToPropagateMetadata(partition1)
sleepMillisToPropagateMetadata(partition2)
changePreferredLeader(prefer2)

// mixed results
Expand All @@ -2963,9 +2994,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition2, 2)

// Now change the preferred leader to 1
sleepMillisToPropagateMetadata(partition1)
sleepMillisToPropagateMetadata(partition2)
changePreferredLeader(prefer1)
// but shut it down...
killBroker(1)
sleepMillisToPropagateMetadata(partition1)
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))

def assertPreferredLeaderNotAvailable(
Expand Down