
KAFKA-17182: Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer #17700

Merged

Conversation

@kirktrue kirktrue (Collaborator) commented Nov 5, 2024

This change reduces fetch session cache evictions on the broker for AsyncKafkaConsumer by altering the logic that determines which partitions are included in fetch requests.

Background

Consumer implementations fetch data from the cluster and temporarily buffer it in memory until the user next calls Consumer.poll(). When a fetch request is being generated, partitions that already have buffered data are not included in the fetch request.
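For orientation, this buffering happens beneath the ordinary poll loop and is invisible to user code. A minimal sketch of that loop (the topic name and bootstrap address are made up for illustration):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Minimal poll loop; the fetch-ahead buffering discussed above is internal to
// the consumer and not observable at this level.
public class PollLoopSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));
            while (true) {
                // Returns already-buffered records if any; otherwise triggers a fetch.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
            }
        }
    }
}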

The ClassicKafkaConsumer performs much of its fetch logic and network I/O in the application thread. On poll(), if there is any locally-buffered data, the ClassicKafkaConsumer does not fetch any new data and simply returns the buffered data to the user from poll().

On the other hand, the AsyncKafkaConsumer splits its logic and network I/O between two threads, which opens a race condition during fetch. The AsyncKafkaConsumer also checks for buffered data on its application thread. If it finds none, it signals the background thread to create a fetch request. However, it's possible for the background thread to receive data from a previous fetch and buffer it before the fetch request logic starts. When that occurs, the background thread builds a fetch request that skips the buffered partitions, with the unintended result that those partitions land in the fetch request's "to remove" set. This signals the broker to remove those partitions from its fetch session cache. The sketch below models that mechanism.
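A self-contained model of the failure mode (illustrative names only, not Kafka's actual FetchSessionHandler internals): any session partition absent from the next incremental fetch request is treated as removed.

import java.util.HashSet;
import java.util.Set;

// Illustrative model of an incremental fetch session diff. The broker drops
// from its session cache any partition that was in the session but is missing
// from the next request.
public class FetchSessionDiffSketch {
    public static void main(String[] args) {
        Set<String> sessionPartitions = Set.of("topic-0", "topic-1", "topic-2");

        // topic-1 has buffered data when the background thread builds the next
        // request, so it is left out -- even though its data is still wanted.
        Set<String> nextRequest = Set.of("topic-0", "topic-2");

        Set<String> removed = new HashSet<>(sessionPartitions);
        removed.removeAll(nextRequest);

        // Prints [topic-1]: the broker evicts it from its fetch session cache.
        System.out.println("Partitions signaled for removal: " + removed);
    }
}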

This issue is technically possible in the ClassicKafkaConsumer too, since the heartbeat thread performs network I/O in addition to the application thread. However, because of the frequency at which the AsyncKafkaConsumer's background thread runs, it is ~100x more likely to happen.

Options

The core decision is: what should the background thread do when it is asked to create a fetch request and discovers there is buffered data? There were multiple proposals to address this issue in the AsyncKafkaConsumer. Among them were:

  1. The background thread should omit buffered partitions from the fetch request as before (this is the existing behavior)
  2. The background thread should skip the fetch request generation entirely if there are any buffered partitions
  3. The background thread should include buffered partitions in the fetch request, but use a small “max bytes” value
  4. The background thread should skip fetching from the nodes that have buffered partitions

Option 4 won out. The change is localized to AbstractFetch: the basic idea is to skip the fetch request to a given node if that node is the leader for any buffered partition. Because no request is sent to that node at all, the request can't contain "holes" where the buffered partitions should be. A self-contained sketch of the idea follows.
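A minimal sketch of the node-skipping idea (illustrative names only; the actual change lives in AbstractFetch and uses Kafka's Node/TopicPartition types):

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// A node is skipped entirely if it leads any partition that still has
// buffered, uncollected data, so the request it eventually receives never has
// "holes" that would evict partitions from the fetch session.
public class NodeSkipSketch {
    record Partition(String topic, int id) {}

    public static void main(String[] args) {
        // Hypothetical leadership map: node id -> partitions that node leads.
        Map<Integer, List<Partition>> leaders = Map.of(
                0, List.of(new Partition("t", 0), new Partition("t", 1)),
                1, List.of(new Partition("t", 2), new Partition("t", 3)));

        // t-1 still has buffered data.
        Set<Partition> buffered = Set.of(new Partition("t", 1));

        // Fetch only from nodes whose partitions are all unbuffered.
        Set<Integer> fetchableNodes = leaders.entrySet().stream()
                .filter(e -> e.getValue().stream().noneMatch(buffered::contains))
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());

        // Node 0 leads buffered t-1, so this prints [1].
        System.out.println("Send fetch requests to nodes: " + fetchableNodes);
    }
}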

Testing

Eviction rate testing

Here are the results of our internal stress testing:

  • ClassicKafkaConsumer—after the initial spike during test startup, the average rate settles down to ~0.14 evictions/second
  • AsyncKafkaConsumer (w/o fix)—after startup, the evictions still settle down, but at ~1.48 evictions/second they are roughly 10x higher than the ClassicKafkaConsumer
  • AsyncKafkaConsumer (w/ fix)—the eviction rate is now much closer to the ClassicKafkaConsumer at ~0.22 evictions/second

EndToEndLatency testing

The bundled EndToEndLatency test runner was executed on a single machine using Docker. The apache/kafka:latest Docker image was used with either the cluster/combined/plaintext/docker-compose.yml or the single-node/plaintext/docker-compose.yml Docker Compose configuration, depending on the test. The Docker containers were recreated from scratch before each test.

A single topic was created with 30 partitions and a replication factor of either 1 or 3, depending on whether the setup was single- or multi-node.

For each of the test runs these argument values were used:

  • Message count: 100000
  • acks: 1
  • Message size: 128 bytes

A configuration file containing the single entry group.protocol=<$group_protocol> was also provided to the test, where $group_protocol was either CLASSIC or CONSUMER.
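(For reference, the bundled runner is typically invoked as bin/kafka-e2e-latency.sh <broker_list> <topic> <num_messages> <producer_acks> <message_size_bytes> [properties_file], matching the parameters above; the exact broker addresses and topic name used in these runs are not recorded here.)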

Test results

Test 1—CLASSIC group protocol, cluster size: 3 nodes, replication factor: 3

Metric                   trunk    PR
Average latency (ms)     1.4901   1.4871
50th percentile (ms)     1        1
99th percentile (ms)     3        3
99.9th percentile (ms)   6        6

Test 2—CONSUMER group protocol, cluster size: 3 nodes, replication factor: 3

Metric                   trunk    PR
Average latency (ms)     1.4704   1.4807
50th percentile (ms)     1        1
99th percentile (ms)     3        3
99.9th percentile (ms)   6        7

Test 3—CLASSIC group protocol, cluster size: 1 node, replication factor: 1

Metric                   trunk    PR
Average latency (ms)     1.0777   1.0193
50th percentile (ms)     1        1
99th percentile (ms)     2        2
99.9th percentile (ms)   5        4

Test 4—CONSUMER group protocol, cluster size: 1 node, replication factor: 1

Metric                   trunk    PR
Average latency (ms)     1.0937   1.0503
50th percentile (ms)     1        1
99th percentile (ms)     2        2
99.9th percentile (ms)   4        4

Conclusion

These tests did not reveal any significant differences between the current fetcher logic on trunk and the one proposed in this PR. Additional test runs using larger message counts and/or larger message sizes did not change the results.

@kirktrue kirktrue changed the title from "KAFKA-17439: Make polling for new records an explicit action/event in the new consumer" to "KAFKA-17182: Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer" on Nov 5, 2024
@kirktrue kirktrue added ctr Consumer Threading Refactor (KIP-848) ci-approved labels Nov 5, 2024
@kirktrue kirktrue added the KIP-848 The Next Generation of the Consumer Rebalance Protocol label Nov 7, 2024
@jeffkbkim jeffkbkim (Contributor) left a comment

LGTM! thanks @kirktrue for pushing this through

@junrao junrao (Contributor) left a comment

@kirktrue : Thanks for the updated PR. A few more comments.

//
// Note: this check is not needed for the unbuffered partitions as the logic in
// SubscriptionState.fetchablePartitions() only includes partitions currently assigned.
if (!subscriptions.isAssigned(partition))
Contributor:

An assigned partition doesn't necessarily have a valid position. So, we need to do a stricter check here.
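A sketch of the stricter guard being suggested (hedged: SubscriptionState.hasValidPosition() is an existing method on the class, but the exact check the PR ultimately adopted may differ):

// Assignment alone doesn't guarantee a usable fetch position, so also require
// one before treating the buffered partition as fetchable (illustrative).
if (!subscriptions.isAssigned(partition) || !subscriptions.hasValidPosition(partition))
    continue;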

);
subscriptions.position(node0Partition2, leaderlessPosition);

// Both the collected partition and the position without a partition leader should have a retrievable position.
Contributor:

Both the collected partition and the position without a partition leader should have a retrievable position. => Both collected partitions should have a retrievable position ?

Collaborator (Author):

My apologies for the confusing wording. I've revised most of the comments in that method in an effort to improve clarity. PTAL. Thanks.

@junrao junrao (Contributor) left a comment

@kirktrue : Thanks for the updated PR. Just a few minor comments.

assertEquals(1, fetcher.fetchBuffer.bufferedPartitions().size());

// Node 0's partitions have all been collected, so validate that and then reset the list of partitions
// from which to fetch data so the next pass should request can fetch more data.
Contributor:

"so the next pass should request can fetch more data" doesn't read well.

Collaborator (Author):

Now reads:

// Validate that all of node 0's partitions have all been collected.
assertTrue(node0Partitions.isEmpty());

// Reset the list of partitions for node 0 so the next fetch pass requests data.
node0Partitions = partitionsForNode(node0, partitions);

// Change the set of assigned partitions to exclude the remaining buffered partition for node 0, which means
// that partition is unassigned.
Set<TopicPartition> topicsWithoutUnassignedPartition = new HashSet<>(partitions);
topicsWithoutUnassignedPartition.remove(node0Partition2);
Contributor:

Could we just initialize topicsWithoutUnassignedPartition with node0Partition2?

Collaborator (Author):

No, because topicsWithoutUnassignedPartition holds the original four partitions minus node0Partition2. Regardless, I reworked how this is done in the test to make it clearer.

// Overwrite the position with an empty leader to trigger the test case.
subscriptions.position(node0Partition2, null);

// Both the collected partition and the position without a partition leader should have a retrievable position.
Contributor:

This still doesn't read well. Also, the second partition's position is not available.

Collaborator (Author):

This was changed to:

// Confirm that calling SubscriptionState.position() succeeds for a leaderless partition. While it shouldn't
// throw an exception, it should return a null position.
SubscriptionState.FetchPosition position = assertDoesNotThrow(() -> subscriptions.position(node0Partition2));
assertNull(position);

@junrao junrao (Contributor) left a comment

@kirktrue : Thanks for the updated PR. The code LGTM. Are the test failures related?

@kirktrue (Collaborator, Author):

> @kirktrue : Thanks for the updated PR. The code LGTM. Are the test failures related?

I don't believe they are, no. I'll look at the failures from the current test run and dig around a little to see if others are hitting them too and report back.

Thanks.

@kirktrue (Collaborator, Author):

@junrao—the majority of the errors I see in the latest test run are not related.

The following test failure occurs on both Java 17 and 23, but the issue has been filed several times:

The following tests are flaky, and have issues filed:

  • KAFKA-15474: AbstractCoordinatorTest > testWakeupAfterSyncGroupReceivedExternalCompletion()
  • KAFKA-8031, KAFKA-8032, KAFKA-8107, KAFKA-15960: ClientIdQuotaTest > testQuotaOverrideDelete(String, String).quorum=kraft.groupProtocol=consumer
  • KAFKA-15900, KAFKA-18551, KAFKA-18639: EagerConsumerCoordinatorTest > testOutdatedCoordinatorAssignment()
  • KAFKA-18298: PlaintextAdminIntegrationTest > testConsumerGroupsDeprecatedConsumerGroupState(String, String).quorum=kraft.groupProtocol=consumer
  • KAFKA-13514: StickyAssignorTest > testLargeAssignmentAndGroupWithUniformSubscription(boolean).hasConsumerRack = false

The only issue that isn't filed is this:

  • StickyAssignorTest > testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean).hasConsumerRack = false

I'll see if I can reproduce that flaky test locally.

@kirktrue (Collaborator, Author) commented Jan 30, 2025

@junrao—I wasn't able to reproduce the flaky behavior in StickyAssignorTest locally. However, StickyAssignorTest is a proper unit test in that it focuses on the ConsumerPartitionAssignor logic. It doesn't use a KafkaConsumer, so it isn't executing any code related to the fetcher. It's safe to conclude that the failing and flaky tests are unrelated to this change.

@kirktrue (Collaborator, Author):

@junrao—the latest test run has a few flaky tests, but they're all known flaky tests that are filed in Jira.

Are we able to merge this change, or should we wait for a green build?

Thanks!

@lianetm (Member) commented Jan 30, 2025

@kirktrue the fix for the failed test here has been merged to trunk and builds are green again. Get the latest changes and we should be green here too

@kirktrue (Collaborator, Author):

> @kirktrue the fix for the failed test here has been merged to trunk and builds are green again. Get the latest changes and we should be green here too

Great! Updated my branch and re-running the build. Thanks @lianetm!

@kirktrue (Collaborator, Author):

@junrao @lianetm @jeffkbkim—all green! Can we merge? 🥺

@junrao junrao (Contributor) left a comment

@kirktrue : Thanks for triaging the tests. LGTM

@junrao junrao merged commit 6cf54c4 into apache:trunk Jan 30, 2025
9 checks passed
@kirktrue (Collaborator, Author):

🥳

@lianetm lianetm (Member) left a comment

LGTM too, thanks @kirktrue !

@junrao junrao (Contributor) commented Jan 30, 2025

@kirktrue : Do we want to create a separate PR to cherry-pick this to 4.0?

@kirktrue (Collaborator, Author):

> @kirktrue : Do we want to create a separate PR to cherry-pick this to 4.0?

Yes. Is that step performed by the merger or the contributor? Sometimes the person merging to trunk also handles the cherry-pick, sometimes not. Is there a general rule of thumb for that?

Thanks

Labels: ci-approved · clients · consumer · ctr Consumer Threading Refactor (KIP-848) · KIP-848 The Next Generation of the Consumer Rebalance Protocol