forked from apache/kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
[pull] trunk from apache:trunk #1292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
pull
wants to merge
5,202
commits into
sudotty:trunk
Choose a base branch
from
apache:trunk
base: trunk
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…0372) issue: #19905 (comment) What: Change `String[] topics` to `Set<String> topics` throughout `LogCompactionTester`. Why: `Set<String>` is more modern and reduces the need for array-to-collection conversions. Reviewers: Ken Huang <[email protected]>, TengYao Chi <[email protected]>, Jhen-Yung Hsu <[email protected]>, Lan Ding <[email protected]>, Kuan-Po Tseng <[email protected]>, Chia-Ping Tsai <[email protected]>
- Changes: Remove fetchQuotaMetrics and copyQuotaMetrics in RemoteLogManager on close from: #20342 (comment) Reviewers: Kamal Chandraprakash <[email protected]>
… for client telemetry (#20144) #### Summary This PR implements dynamic compression type selection and fallback mechanism for client telemetry to handle cases where compression libraries are not available on the client classpath. #### Problem Currently, when a compression library is missing (e.g., NoClassDefFoundError), the client telemetry system catches the generic Throwable but doesn't learn from the failure. This means, the same unsupported compression type will be attempted on every telemetry push #### Solution This PR introduces a comprehensive fallback mechanism: - Specific Exception Handling: Replace generic Throwable catching with specific exceptions (IOException, NoClassDefFoundError) - Unsupported Compression Tracking: Add unsupportedCompressionTypes collection to track compression types that have failed due to missing libraries - Dynamic Selection: Enhance ClientTelemetryUtils.preferredCompressionType() to accept an unsupported types parameter and filter out known problematic compression types - Thread Safety: Use ConcurrentHashMap.newKeySet() for thread-safe access to the unsupported types collection - Improved Logging: Include exception details in log messages for better debugging #### Key Changes - Modified createPushRequest() to track failed compression types in unsupportedCompressionTypes - Updated ClientTelemetryUtils.preferredCompressionType() to filter out unsupported types - Enhanced exception handling with specific exception types instead of Throwable #### Testing - Added appropriate Unit tests - Testing apache kafka on local logs: ``` ✗ cat ~/Desktop/kafka-client.log | grep " org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter" 2025-07-17 07:56:52:602 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry subscription request with client instance id AAAAAAAAAAAAAAAAAAAAAA 2025-07-17 07:56:52:602 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from SUBSCRIPTION_NEEDED to SUBSCRIPTION_IN_PROGRESS 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from SUBSCRIPTION_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Telemetry subscription push interval value from broker was 5000; to stagger requests the first push interval is being adjusted to 4551 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Updating subscription - subscription: ClientTelemetrySubscription{clientInstanceId=aVd3fzviRGSgEuAWNY5mMA, subscriptionId=1650084878, pushIntervalMs=5000, acceptedCompressionTypes=[zstd, lz4, snappy, none], deltaTemporality=true, selector=org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils$$Lambda$308/0x00000005011ce470@2f16e398}; intervalMs: 4551, lastRequestMs: 1752739012639 2025-07-17 07:56:52:640 [kafka-producer-network-thread | kr-kafka-producer] INFO org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Client telemetry registered with client instance id: aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:56:57:196 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:56:57:196 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:56:57:224 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library zstd not found, sending uncompressed data at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389) 2025-07-17 07:56:57:295 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:02:296 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:02:297 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:02:300 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library lz4 not found, sending uncompressed data at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389) 2025-07-17 07:57:02:329 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:07:329 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:07:330 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:07:331 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Compression library snappy not found, sending uncompressed data at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:722) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createPushRequest(ClientTelemetryReporter.java:703) at org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter$DefaultClientTelemetrySender.createRequest(ClientTelemetryReporter.java:389) 2025-07-17 07:57:07:344 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:12:346 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:12:346 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:12:400 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:17:402 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:17:402 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:17:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:22:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:22:442 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:22:508 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:27:512 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:27:512 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:27:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:32:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:32:555 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:32:578 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:37:580 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:37:580 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:37:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:42:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:42:606 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:42:646 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:47:647 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:47:647 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:47:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:52:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:52:673 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:52:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED 2025-07-17 07:57:57:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Creating telemetry push request with client instance id aVd3fzviRGSgEuAWNY5mMA 2025-07-17 07:57:57:711 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_NEEDED to PUSH_IN_PROGRESS 2025-07-17 07:57:57:765 [kafka-producer-network-thread | kr-kafka-producer] DEBUG org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - Setting telemetry state from PUSH_IN_PROGRESS to PUSH_NEEDED ``` Reviewers: poorv Mittal <[email protected]>, Chia-Ping Tsai <[email protected]>
…infra (#20199) **Changes**: Use ClusterTest to rewrite EligibleLeaderReplicasIntegrationTest. **Validation**: Run the test 50 times locally with consistent success. Reviewers: Chia-Ping Tsai <[email protected]>
Upgrade log4j to version 2.25.1 Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
enhance the description of ControllerId in DescribeClusterResponse.json Reviewers: Chia-Ping Tsai <[email protected]>
This patch adds the include argument to ConsumerPerformance tool. ConsoleConsumer and ConsumerPerformance serve different purposes but share common functionality for message consumption. Currently, there's an inconsistency in their command-line interfaces: - ConsoleConsumer supports an --include argument that allows users to specify a regular expression pattern to filter topics for consumption - ConsumerPerformance lacks this topic filtering capability, requiring users to specify a single topic explicitly via --topic argument This inconsistency creates two problems: - Similar tools should provide similar topic selection capabilities for better user experience - Users cannot test consumer performance across multiple topics or dynamically matching topic sets, making it difficult to test realistic scenarios Reviewers: Chia-Ping Tsai <[email protected]>
Update the KRaft dynamic voter set documentation. In Kafka 4.1, we introduced a powerful new feature that enables seamless migration from a static voter set to a dynamic voter set. Reviewers: Ken Huang <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
### Case 1: no --kafka-url and --kafka-archive Should fail. One of argument (--kafka-url/--kafka-archive) is required. ``` > python docker_build_test.py apache/kafka --image-tag KAFKA-18841 --image-type jvm --build usage: docker_build_test.py [-h] [--image-tag TAG] [--image-type {jvm,native}] [--build] [--test] (--kafka-url KAFKA_URL | --kafka-archive KAFKA_ARCHIVE) image docker_build_test.py: error: one of the arguments --kafka-url/-u --kafka-archive/-a is required ``` ### Case 2: --kafka-url with native ``` > python docker_build_test.py apache/kafka --image-tag KAFKA-18841 --image-type native --kafka-url https://dist.apache.org/repos/dist/dev/kafka/4.0.0-rc0/kafka_2.13-4.0.0.tgz --build ``` ### Case 3: --karka-url with jvm ``` > python docker_build_test.py apache/kafka --image-tag KAFKA-18841 --image-type jvm --kafka-url https://dist.apache.org/repos/dist/dev/kafka/4.0.0-rc0/kafka_2.13-4.0.0.tgz --build ``` ### Case 4: --kafka-archive with native ``` > ./gradlew clean releaseTarGz > cd docker > python docker_build_test.py apache/kafka --image-tag KAFKA-18841 --image-type native --kafka-archive </absolute/path/to/core/build/distributions/kafka_2.13-4.1.0-SNAPSHOT.tgz> --build ``` ### Case 5: --kafka-archive with jvm ``` > ./gradlew clean releaseTarGz > cd docker > python docker_build_test.py apache/kafka --image-tag KAFKA-18841 --image-type jvm --kafka-archive </absolute/path/to/core/build/distributions/kafka_2.13-4.1.0-SNAPSHOT.tgz> --build ``` Reviewers: Vedarth Sharma <[email protected]>, Chia-Ping Tsai <[email protected]>, TengYao Chi <[email protected]> --------- Signed-off-by: PoAn Yang <[email protected]>
…tionMetadata with non-empty topic partitions (#20370) This is followup PR for #19699. * Update TransactionLog#readTxnRecordValue to initialize TransactionMetadata with non-empty topic partitions * Update `TxnTransitMetadata` comment, because it's not immutable. Reviewers: TengYao Chi <[email protected]>, Justine Olshan <[email protected]>, Kuan-Po Tseng <[email protected]>, Chia-Ping Tsai <[email protected]>
A bunch of cleanups in the release scripts Reviewers: Luke Chen <[email protected]>
Offline migration essentially preserves offsets and nothing else. So effectively write tombstones for classic group type when a streams heartbeat is sent to with the group ID of an empty classic group, and write tombstones for the streams group type when a classic consumer attempts to join with a group ID of an empty streams group. Reviewers: Bill Bejeck <[email protected]>, Sean Quah <[email protected]>, Dongnuo Lyu <[email protected]>
…ader is unavailable (#20271) This PR applies the same partition leader check for `StreamsGroupCommand` as `ShareGroupCommand` and `ConsumerGroupCommand` to avoid the command execution timeout. Reviewers: Lucas Brutschy <[email protected]>
Add Unit Tests for an empty follower fetch for various Leader states. | TieredStorage Enabled | Leader Log Start Offset | Leader Local Log Start Offset | Leader Log End Offset | Remarks | |-----------------------|-------------------------|--------------------------------|-----------------------|---------------------------------------| | N | 0 | - | 200 | - | | N | 10 | - | 200 | - | | Y | 0 | 200 | 200 | No segments deleted locally | | Y | 0 | 200 | 100 | Segments uploaded and deleted locally | | Y | 0 | 200 | 200 | All segments deleted locally | | Y | 10 | 10 | 200 | No segments deleted locally | | Y | 10 | 100 | 200 | Segments uploaded and deleted locally | | Y | 10 | 200 | 200 | All segments deleted locally | Reviewers: Kamal Chandraprakash <[email protected]>
…anUnsubscribe to align test case (#20407) `testAsyncConsumerClassicConsumerSubscribeInvalidTopicCanUnsubscribe` does not align with the test case. This patch renames the test name to describe the test case more precisely. Reviewers: TengYao Chi <[email protected]>
…call (#20375) This PR ensures that describeTopics correctly propagates its timeoutMs setting to the underlying describeCluster call. Integration tests were added to verify that the API now fails with a TimeoutException when brokers do not respond within the configured timeout. Reviewers: Ken Huang <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
The PR fixes the batch alignment issue when partitions are re-assigned. During initial read of state the batches can be broken arbitrarily. Say the start offset is 10 and cache contains [15-18] batch during initialization. When fetch happens at offset 10 and say the fetched batch contain 10 records i.e. [10-19] then correct batches will be created if maxFetchRecords is greater than 10. But if maxFetchRecords is less than 10 then last offset of batch is determined, which will be 19. Hence acquire method will incorrectly create a batch of [10-19] while [15-18] already exists. Below check is required t resolve the issue: ``` if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) { lastAcquiredOffset = lastOffset; } ``` While testing with other cases, other issues were determined while updating the gap offset, acquire of records prior share partitions end offset and determining next fetch offset with compacted topics. All these issues can arise mainly during initial read window after partition re-assignment. Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit <[email protected]>, Chirag Wadhwa <[email protected]>
…s is empty (#20410) If `errorResults` is empty, there’s no need to create a new `entriesPerPartition` map. Reviewers: Chia-Ping Tsai <[email protected]>
This PR removes associated logging within NetworkClient to reduce noise and streamline the client code. Reviewers: Ismael Juma <[email protected]>, David Arthur <[email protected]>, Chia-Ping Tsai <[email protected]>
…nfig constants (#20249) This PR aims to add documentation to `alterLogLevelConfigs` method to remind users to use valid LogLevelConfig constants. Reviewers: Chia-Ping Tsai <[email protected]>
…#16584) This is the first part of the implementation of [KIP-1023](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset) The purpose of this pull request is for the broker to start returning the correct offset when it receives a -6 as a timestamp in a ListOffsets API request. Added unit tests for the new timestamp. Reviewers: Kamal Chandraprakash <[email protected]>
Adds example commands for running integration tests from the command line. Reviewers: Chia-Ping Tsai <[email protected]>
#20405) - **Changes**: Replace misused dynamicPerBrokerConfigs with dynamicDefaultConfigs - **Reasons**: KRaft servers don't handle the cluser-level configs in starting from: https://github.com/apache/kafka/pull/18949/files#r2296809389 Reviewers: Jun Rao <[email protected]>, Jhen-Yung Hsu <[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]> --------- Co-authored-by: PoAn Yang <[email protected]>
As per the suggestion by @adixitconfluent and @chirag-wadhwa5, [here](#20395 (comment)), I have refactored the code with variable and method names. Reviewers: Andrew Schofield <[email protected]>, Chirag Wadhwa <[email protected]>
…ducer/consumer (#20390) As described in [jira](https://issues.apache.org/jira/browse/KAFKA-19625), this PR implements replace `consumer.config` and `producer.config` with `command-config` for kafka-verifiable-producer.sh and kafka-verifiable-consumer.sh. Reviewers: Andrew Schofield <[email protected]>
…20236) This PR adds an end-to-end integration tests that validates the Dead Letter Queue (DLQ) feature introduced in [KIP-1034](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams) Reviewers: Lucas Brutschy <[email protected]>
…GlobalStateRestoreListener (#20419) What I observed is that if I run both combinations useNewProtocol=true, useNewProtocol=false it would often fail the second time, but if I only run the second variation useNewProtocol=false it works, and only the first variation useNewProtocol=true also works. So this points to some state that is not cleared between the tests - and indeed, the test creates a topic “inputTopic”, produces to it, but doesn’t delete it, so the second variation will run with produce to it again and then run with twice the data. I also reduced heartbeat interval and session timeout since some of the tests need to wait for the old consumer to leave which (sigh) Kafka Streams doesn't do, so we have to wait that it gets kicked out by session timeout. So previously we waited for 45 seconds, now, we at least wait only 1 second. Reviewers: Bill Bejeck <[email protected]>, Chia-Ping Tsai <[email protected]>
…in the same delta (#20242) There is a small logic bug in topic replay. If a topic is created and then removed before the TopicsDelta is applied, we end up with the deleted topic in createdTopics on the delta. Tis issue is fixed by removing the topicName from createTopics when replaying RemoveTopicRecord to make sure the deleted topics won't appear in createTopics. Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu <[email protected]>, Alyssa Huang <[email protected]>
Now that Kafka support Java 17, this PR makes some changes in connect module. The changes in this PR are limited to only some files. A future PR(s) shall follow. The changes mostly include: - Collections.emptyList(), Collections.singletonList() and Arrays.asList() are replaced with List.of() - Collections.emptyMap() and Collections.singletonMap() are replaced with Map.of() - Collections.singleton() is replaced with Set.of() Modules target: runtime/src/main Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
…-->> 9 (#20684) from: #19513 (comment) 1. Fix the task `unitTest` and `integrationTest`; 2. Change the `name` to a method call `name()` for `KeepAliveMode`. Reviewers: Ken Huang <[email protected]>, dejan2609 <[email protected]>, Chia-Ping Tsai <[email protected]>
Clarify the Javadoc for `Node#isFenced` to align with KIP-1073, which introduced the “fenced” field in `DescribeCluster` for brokers. The “fenced” flag applies only to broker nodes returned by `DescribeCluster`. For controller quorum nodes, it doesn’t apply and is always `false`. This clarifies that not every node has a meaningful fenced state. Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
This PR is a follow-up to KAFKA-18193. It addresses the need for a null check and an improved error message. Please refer to the previous comments and the review of #19955 (review) for more context. Reviewers: Chia-Ping Tsai <[email protected]>
There are some calls to `TestUtils::waitForCondition` with the actual value of the condition being evaluated. These calls must use the overload with a `Supplier` for conditionDetails so that the actual value is lazily evaluated at the time of the condition failing. Reviewers: Chia-Ping Tsai <[email protected]>
…eMode (#20695) In Scala/Java joint compilation, builds that run with `keepAliveMode=SESSION` can fail in the `:core:compileScala` task. This happens when `javac` is given a non-existent Java output directory on its classpath. ``` Unexpected javac output: warning: [path] bad path element ".../core/build/classes/java/main": no such file or directory error: warnings found and -Werror specified ``` This issue occurs after a clean build because `core/build/classes/java/main` does not exist, as Java classes are written to `classes/scala/main` during joint compilation. With `-Werror` enabled, the resulting `[path]` warning is treated as an error and stops the build. This PR adds a workaround scoped to **SESSION** builds, while continuing to investigate the underlying classpath assembly logic during joint compilation. Related discussion: #20684 (comment) Tracked by: [KAFKA-19786](https://issues.apache.org/jira/browse/KAFKA-19786) Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
This PR improves error handling for feature argument parsing in the `StorageTool`. The main change is stricter validation of the expected format for feature arguments, ensuring users provide them in the correct `feature=version` format. **Before:** * If user using argument like: `./bin/kafka-storage.sh feature-dependencies --feature group.version`, the following error will occur, which is a bit confusing: <img width="970" height="162" alt="image" src="https://github.com/user-attachments/assets/aa4341f9-6eb8-488e-b88e-f5244560184b" /> **After:** <img width="812" height="55" alt="image" src="https://github.com/user-attachments/assets/9aedecdb-e657-4bd3-8b9b-22d6282a1dc1" /> Reviewers: Chia-Ping Tsai <[email protected]>
…ucer (KIP-1147) (#20673) *What* https://issues.apache.org/jira/browse/KAFKA-19725 - Implement KIP-1147 for ConsoleProducer where we replace --property with --reader-property. - Currently the previous names for the options are still usable but there will be warning message stating those are deprecated and will be removed in a future version. - I have added unit tests and also manually verified using the console tools that things are working as expected. Reviewers: Andrew Schofield <[email protected]>, Jhen-Yung Hsu <[email protected]>
…20692) Cleanup and rewrote more tests in `TaskManagerTest.java` Reviewers: Lucas Brutschy <[email protected]>
…LE (#20600) Streams groups sometimes describe as NOT_READY when STABLE. That is, the group is configured and all topics exist, but when you use LIST_GROUP and STREAMS_GROUP_DESCRIBE, the group will show up as not ready. The root cause seems to be that #19802 moved the creation of the soft state configured topology from the replay path to the heartbeat. This way, LIST_GROUP and STREAMS_GROUP_DESCRIBE, may not show the configured topology, because the configured topology that is created in the heartbeat is "thrown away", and the new group is recreated on the replay-path. To reflect a consistent view of the topology via LIST_GROUP and STREAMS_GROUP_DESCRIBE, we need to store additional information in the consumer offset topic. In particular, we need to store at least whether a topology was validated against the current topic metadata, as this defines whether a group is in STABLE and not in NOT_READY. This change adds a new field `validatedTopologyEpoch` to the metadata of the group, which stores precisely this information. Reviewers: Matthias J. Sax <[email protected]>
Make some internal methods `public` for third-party users who want to hook into this API directly (even if they are not advised to do this; using internal API is at their own risk). Reviewers: Matthias J. Sax <[email protected]>
This pull request enhances the StreamsGroupCommand to return appropriate exit codes, improving its reliability for scripting and automation. - Exception handling is now consistently managed within the execute() helper method. - The main() explicitly calls Exit.exit(0) on success and Exit.exit(1) on failure. - Remove unused method `org.apache.kafka.tools.streams.StreamsGroupCommandTest#describeTopicsResult` Reviewers: Chia-Ping Tsai <[email protected]>, Matthias J. Sax <[email protected]>
updates the Javadoc for `org.apache.kafka.clients.consumer.OffsetResetStrategy` to clearly state that it is deprecated for removal with no replacement. Reviewers: Matthias J. Sax <[email protected]>
… metrics (#20464) - Background Add new metrics for TasksLost, TasksAssigned and TasksRevoked in `StreamsRebalanceListener`. - Features Implemented Added three rebalance latency metrics to DefaultStreamsRebalanceListener: - `tasks-revoked-latency-avg/max` - Average/max latency when revoking tasks - `tasks-assigned-latency-avg/max` - Average/max latency when assigning tasks - `tasks-lost-latency-avg/max` - Average/max latency when losing tasks Main Changes 1. Created `RebalanceMetrics.java` - Follows existing Streams ThreadMetrics pattern with static factory methods - Provides three methods to create sensors: `tasksRevokedSensor()`, `tasksAssignedSensor()`, `tasksLostSensor()` 2. Modified DefaultStreamsRebalanceListener.java - Added three Sensor member variables - Updated constructor to accept `StreamsMetricsImpl` and threadId parameters - Records execution time in `onTasksRevoked()`, `onTasksAssigned()`, and `onAllTasksLost()` methods 3. Modified StreamThread.java - Pass `streamsMetrics` and `getName()` when creating `DefaultStreamsRebalanceListener` 4. Added Comprehensive Tests - `DefaultStreamsRebalanceListenerTest `- Added 6 tests to verify metrics recording (both normal and exception cases) - `RebalanceMetricsTest` - Verifies sensor creation logic Reviewers: Lucas Brutschy <[email protected]>
Broker heap memory gets filled up and throws OOM error when remote reads are triggered for multiple partitions within a FETCH request. Steps to reproduce: 1. Start a one node broker and configure LocalTieredStorage as remote storage. 2. Create a topic with 5 partitions. 3. Produce message and ensure that few segments are uploaded to remote. 4. Start a consumer to read from those 5 partitions. Seek the offset to beginning for 4 partitions and to end for 1 partition. This is to simulate that the FETCH request read from both remote-log and local-log. 5. The broker crashes with the OOM error. 6. The DelayedRemoteFetch / RemoteLogReadResult references are being held by the purgatory, so the broker crashes. Reviewers: Luke Chen <[email protected]>, Satish Duggana <[email protected]>
The PR just tidies up the comments in the MetadataVersion class to do with using placeholder metadata versions for a future release as a way to use features to enable share groups and streams groups as previews in AK 4.1. The metadata versions are simply unstable 4.2 MVs now, which is entirely usual at this stage of the release. Reviewers: Chia-Ping Tsai <[email protected]>
…available. (#20665) Before we added caching for consumer next offsets we'd called `mainConsumer.position` and always expected something back. When we added the caching, we kept the check that we always have nextOffset, but as the logic changed to fetching the offsets from poll, we may not have anything for topics that have no messages. This PR accounts for that. Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax <[email protected]>
…is present in the first partition. (#20706) When the FETCH request read from remote log for multiple partitions, then all the partitions are marked with minOneMessage as true. This is not the expected behavior: Assume that the FETCH request is configured with fetchMaxBytes as 50 MB and max.partition.fetch.bytes as 1 MB. And, the broker is hosting 5 partitions as leader for the topic. The FETCH request might try to read the data from remote for all the 5 partitions. If the size of the message in the topic is 30 MB, then we might be returning back 150 MB of response instead of 30 MB due to minOneMessage set to true for all the remote-read requests. Mark the minOneMessage as false when delayedRemoteFetch is present in the first partition. Discussion thread: #20088 (comment) Reviewers: Luke Chen <[email protected]>, Satish Duggana <[email protected]>
…ts.topic.compression.codec is used (#20653) The group coordinator has been having issues with unknown errors. The theory is that this is caused by optimistic compression estimates which cause unchecked batch overflows when trying to write. This PR adds a check for uncompressed record size to flush batches more eagerly and avoid overfilling partially-full batches. This should make the group coordinator errors less frequent. Also added tests to ensure this change does not impact desired behavior for large compressible records. Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
Extends from: #19513 details: - scala-logging_2.13: Updated from 3.9.5 to 3.9.6 - org.owasp.dependencycheck: Updated from 12.1.3 to 12.1.8 - org.scoverage: Updated from 8.0.3 to 8.1 - com.diffplug.spotless: Updated from 7.2.1 to 8.0.0 - scalafmt: Updated from 3.7.14 to 3.10.0 - grgit: Updated from 4.1.1 to 5.3.3 - jacoco: Updated from 0.8.13 to 0.8.14 - scoverage: Updated from 2.0.11 to 2.3.0 Reviewers: Chia-Ping Tsai <[email protected]>
…des (#20705) Use dedicated error method for metadata downgrade failures instead of invalidMetadataVersion(), which incorrectly suggested the metadata version number was invalid. The new error message clearly shows the version transition, e.g., "Unsupported metadata.version downgrade from 8 to 7" instead of "Invalid metadata.version 7". Reviewers: Chia-Ping Tsai <[email protected]>
Always mark the result of "missing build scan" as successful, not as failing, for 4.0 and 4.1. Two reasons causing missing build scan: - 4.0 does not have the KAFKA-18748 patches. - The JDK version used by `ci-complete` is inconsistent with other active branches, causing the build report task to fail. The real solution will be in [KAFKA-19768](https://issues.apache.org/jira/browse/KAFKA-19768). Reviewers: David Arthur <[email protected]>, Chia-Ping Tsai <[email protected]>
Marking as flaky as the failure rate has been increasing significantly latetly (18% atm) Reviewers: Andrew Schofield <[email protected]>
…0466) Cleanup 'share' from group.coordinator.rebalance.protocols as a valid value. Share Groups should instead be enabled through the share.version Reviewers: Andrew Schofield <[email protected]>, jimmy <[email protected]>, TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
This PR moves yet another publisher to the metadata module. Fixes [1]. [1] - https://issues.apache.org/jira/browse/KAFKA-18710 Reviewers: Chia-Ping Tsai <[email protected]>
…ges when the producer fails with MESSAGE_TOO_LARGE, violating delivery guarantees (#20285) Bug Fix in Producer where flush() does not wait for a batch to complete after splitting. Cf - #20254 (comment) and [KAFKA-19479](https://issues.apache.org/jira/browse/KAFKA-19479) for more details Reviewers: Jun Rao <[email protected]>
…20668) Kafka Streams does not catch Error types that occur during `GlobalStreamThread` initiation, and therefore it is not possible to trace the error (for example, an `ExceptionInInitializerError` occurs when RocksDB is not found for a global store). This is because errors are not caught and logged. The catch block in `GlobalStreamThread#initialize()` has been ensured to catch `Throwable` instead of `Exception`. Additionally, the empty `setUncaughtHandler` set operation that prevented this from taking effect when users employed setUncaughtExceptionHandler has been removed. Reviewers: Matthias J. Sax <[email protected]>
As part of separating share consumer infrastructure from the regular consumer, this makes a separate metadata class for share consumers. Reviewers: Lianet Magrans <[email protected]>
For validating offset commits based on partitions, we need to be able to efficiently find the subtopology from a given source topic (user input topic or repartition topic). We introduce this precomputed map in `StreamsTopology`, where it is generated upon construction. We can use the precomputed map in StreamsGroup.isSuscribedToTopic we can use it to validate efficiently if we are still subscribed to a certain topic. Using `StreamTopology` here instead of `ConfiguredTopology` is an improvement in itself, because configured topology is soft state and not entirely reliable to always be there. Reviewers: Matthias J. Sax <[email protected]>
…p metadata (#20702) ## What Ticket: https://issues.apache.org/jira/browse/KAFKA-19765 In KIP-1071, there are configurations that affect the assignment, and that can be configured dynamically. - The previous assignment config of streams group is stored as part of the StreamsGroupMetadata as a collection of key-value pairs. - If the assignment configuration is changed, group epoch is also bumped. Reviewers: Lucas Brutschy <[email protected]>
Co-authored-by: Federico Valeri <[email protected]> Reviewers: Mickael Maison <[email protected]>, Federico Valeri <[email protected]>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
See Commits and Changes for more details.
Created by
pull[bot]
Can you help keep this open source service alive? 💖 Please sponsor : )