Skip to content

Commit

Permalink
KAFKA-18733: Implemented fetch ratio and partition acquire time metri…
Browse files Browse the repository at this point in the history
…cs (3/N) (#18959)

PR implements the final set of ShareGroupMetrics,
RequestTopicPartitionsFetchRatio and TopicPartitionsAcquireTimeMs, as
defined in KIP-1103:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1103%3A+Additional+metrics+for+cooperative+consumption

Note: The metric `RequestTopicPartitionsFetchRatio` is recorded as a
percentage because the Histogram API does not record double values.


Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit <[email protected]>
  • Loading branch information
apoorvmittal10 authored Feb 21, 2025
1 parent 8f13e7c commit f543eac
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 26 deletions.
79 changes: 66 additions & 13 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
Expand All @@ -37,7 +39,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -62,33 +63,54 @@ public class DelayedShareFetch extends DelayedOperation {
private final ReplicaManager replicaManager;
private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
private final ShareGroupMetrics shareGroupMetrics;
private final Time time;
// The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
// sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
// Tracks the start time to acquire any share partition for a fetch request.
private long acquireStartTimeMs;
private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;

/**
* This function constructs an instance of delayed share fetch operation for completing share fetch requests instantaneously or with delay.
* @param shareFetch - The share fetch parameters of the share fetch request.
* @param replicaManager - The replica manager instance used to read from log/complete the request.
* @param exceptionHandler - The handler to complete share fetch requests with exception.
* @param sharePartitions - The share partitions referenced in the share fetch request.
* This function constructs an instance of delayed share fetch operation for completing share fetch
* requests instantaneously or with delay.
*
* @param shareFetch The share fetch parameters of the share fetch request.
* @param replicaManager The replica manager instance used to read from log/complete the request.
* @param exceptionHandler The handler to complete share fetch requests with exception.
* @param sharePartitions The share partitions referenced in the share fetch request.
* @param shareGroupMetrics The share group metrics to record the metrics.
* @param time The system time.
*/
public DelayedShareFetch(
ShareFetch shareFetch,
ReplicaManager replicaManager,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM));
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
ShareGroupMetrics shareGroupMetrics,
Time time
) {
this(shareFetch,
replicaManager,
exceptionHandler,
sharePartitions,
PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM),
shareGroupMetrics,
time
);
}

DelayedShareFetch(
ShareFetch shareFetch,
ReplicaManager replicaManager,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
PartitionMaxBytesStrategy partitionMaxBytesStrategy) {
PartitionMaxBytesStrategy partitionMaxBytesStrategy,
ShareGroupMetrics shareGroupMetrics,
Time time
) {
super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
this.shareFetch = shareFetch;
this.replicaManager = replicaManager;
Expand All @@ -97,6 +119,9 @@ public DelayedShareFetch(
this.exceptionHandler = exceptionHandler;
this.sharePartitions = sharePartitions;
this.partitionMaxBytesStrategy = partitionMaxBytesStrategy;
this.shareGroupMetrics = shareGroupMetrics;
this.time = time;
this.acquireStartTimeMs = time.hiResClockMs();
}

@Override
Expand All @@ -120,16 +145,28 @@ public void onComplete() {
try {
LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
if (partitionsAcquired.isEmpty())
if (partitionsAcquired.isEmpty()) {
topicPartitionData = acquirablePartitions();
// tryComplete invoked forceComplete, so we can use the data from tryComplete.
else
// The TopicPartitionsAcquireTimeMs metric signifies the contention when acquiring the locks
// for the share partitions, hence if no partitions were acquired by tryComplete, we record
// the metric here. We deliberately do not check whether the request has now successfully
// acquired any partitions, so that when none could be acquired the upper bound, i.e. the
// request timeout, is recorded for the metric.
updateAcquireElapsedTimeMetric();
} else {
// tryComplete invoked forceComplete, so we can use the data from tryComplete.
topicPartitionData = partitionsAcquired;
}

if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we complete the request with an empty response.
shareFetch.maybeComplete(Collections.emptyMap());
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
shareFetch.maybeComplete(Map.of());
return;
} else {
// Update the metric to record the ratio of acquired to requested partitions.
double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.partitionMaxBytes().size();
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
Expand Down Expand Up @@ -183,6 +220,8 @@ public boolean tryComplete() {

try {
if (!topicPartitionData.isEmpty()) {
// Update the metric to record the time taken to acquire the locks for the share partitions.
updateAcquireElapsedTimeMetric();
// In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
// replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
// those topic partitions.
Expand Down Expand Up @@ -417,6 +456,20 @@ private void handleFetchException(
shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
}

/**
 * Records how long it took to acquire the share partition locks and restarts the measurement.
 * <p>
 * The elapsed time is the difference between the current high-resolution clock reading and
 * {@code acquireStartTimeMs}, and it is recorded against the group id of the share fetch.
 * Afterwards {@code acquireStartTimeMs} is moved forward to the current reading, so that if the
 * fetch request is not completed (because of minBytes or some other condition) and the
 * partitions are re-acquired, the next recording covers only the time since this acquire.
 */
private void updateAcquireElapsedTimeMetric() {
    final long nowMs = time.hiResClockMs();
    final long acquireElapsedMs = nowMs - acquireStartTimeMs;
    shareGroupMetrics.recordTopicPartitionsAcquireTimeMs(shareFetch.groupId(), acquireElapsedMs);
    // Start the next measurement window from this acquire, not from the original request start.
    acquireStartTimeMs = nowMs;
}

// Visible for testing.
LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ void processShareFetch(ShareFetch shareFetch) {
// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
// The request will be added irrespective of whether the share partition is initialized or not.
// Once the share partition is initialized, the delayed share fetch will be completed.
addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, fencedSharePartitionHandler(), sharePartitions), delayedShareFetchWatchKeys);
addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, fencedSharePartitionHandler(), sharePartitions, shareGroupMetrics, time), delayedShareFetchWatchKeys);
}

private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
Expand Down
Loading

0 comments on commit f543eac

Please sign in to comment.