From 26878175b5468ffdc830bfe39b029bcfcfc9cc9c Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 14 Aug 2025 17:07:04 +0300 Subject: [PATCH] refactor: introduce server-side config overrides for consumer fetch batching Instead of fully relying on consumers to adapt their configurations to tune fetch batching; introduce a couple of configuration overrides for maxWaitMs and minBytes to allow operators to set the lower-bound of these values and ensure a more prodictable performance. --- .../main/scala/kafka/server/DelayedFetch.scala | 6 ++++-- .../main/scala/kafka/server/KafkaConfig.scala | 2 ++ .../main/scala/kafka/server/ReplicaManager.scala | 10 ++++++++-- .../kafka/server/config/ServerConfigs.java | 16 ++++++++++++++++ 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 74a3e2b1a2997..52ba4b24aae38 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -52,8 +52,10 @@ class DelayedFetch( fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], replicaManager: ReplicaManager, quota: ReplicaQuota, + maxWaitMs: Option[Long] = None, + minBytes: Option[Int] = None, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit -) extends DelayedOperation(params.maxWaitMs) with Logging { +) extends DelayedOperation(maxWaitMs.getOrElse(params.maxWaitMs)) with Logging { override def toString: String = { s"DelayedFetch(params=$params" + @@ -147,7 +149,7 @@ class DelayedFetch( } // Case G - if (accumulatedSize >= params.minBytes) + if (accumulatedSize >= minBytes.getOrElse(params.minBytes)) forceComplete() else false diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 2bdadb02fb89d..116ca7e74f181 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -403,6 +403,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) /** ********* Fetch Configuration **************/ val maxIncrementalFetchSessionCacheSlots = getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG) val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG) + val fetchMaxWaitMs = getLong(ServerConfigs.FETCH_MAX_WAIT_MS_CONFIG) + val fetchMinBytes = getInt(ServerConfigs.FETCH_MIN_BYTES_CONFIG) /** ********* Request Limit Configuration ***********/ val maxRequestPartitionSizeLimit = getInt(ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8fa705ef8c4ba..463f479d35301 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1663,6 +1663,10 @@ class ReplicaManager(val config: KafkaConfig, var bytesReadable: Long = 0 var errorReadingData = false + // define config overrides for consumer fetch batching + val maxWaitMs = if (params.isFromFollower) params.maxWaitMs else Math.max(params.maxWaitMs, config.fetchMaxWaitMs) + val minBytes = if (params.isFromFollower) params.minBytes else Math.max(params.minBytes, config.fetchMinBytes) + // topic-partitions that have to be read from remote storage val remoteFetchInfos = new util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]() @@ -1693,7 +1697,7 @@ class ReplicaManager(val config: KafkaConfig, // 4) some error happens while reading data // 5) we found a diverging epoch // 6) has a preferred read replica - if (remoteFetchInfos.isEmpty && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData || + if (remoteFetchInfos.isEmpty && (maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= minBytes || errorReadingData || hasDivergingEpoch || hasPreferredReadReplica)) { val fetchPartitionData = logReadResults.map { case (tp, result) => val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId) @@ -1720,7 +1724,9 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus = fetchPartitionStatus, replicaManager = this, quota = quota, - responseCallback = responseCallback + maxWaitMs = Some(maxWaitMs), + minBytes = Some(minBytes), + responseCallback = responseCallback, ) // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 4910f00016b13..a6f2596924681 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -105,6 +105,20 @@ public class ServerConfigs { public static final int FETCH_MAX_BYTES_DEFAULT = 55 * 1024 * 1024; public static final String FETCH_MAX_BYTES_DOC = "The maximum number of bytes we will return for a fetch request. Must be at least 1024."; + public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes"; + public static final int FETCH_MIN_BYTES_DEFAULT = 1; + public static final String FETCH_MIN_BYTES_DOC = "The minimum number of bytes we will return for a fetch request. " + + "If the number of bytes available is less than this value, we will wait until more data is available or the request times out. This is useful for batching requests to improve throughput. " + + "This configuration is only used for fetch requests from consumers, and defines a lower-bound value to the consumer-defined fetch.min.bytes. " + + "If the consumer-defined fetch.min.bytes is smaller than this value, the server will use the consumer-defined value instead."; + + public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms"; + public static final int FETCH_MAX_WAIT_MS_DEFAULT = 100; + public static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering a fetch request if there isn't sufficient data to immediately satisfy the request. " + + "This is useful for batching requests to improve throughput. If the number of bytes available is less than the configured value, we will wait until more data is available or the request times out. " + + "This configuration is only used for fetch requests from consumers, and defines a lower-bound value compared to the consumer-defined fetch.max.wait.ms. " + + "If the consumer-defined fetch.max.wait.ms is smaller than this value, the server will use the consumer-defined value instead."; + /** ********* Request Limit Configuration **************/ public static final String MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG = "max.request.partition.size.limit"; public static final int MAX_REQUEST_PARTITION_SIZE_LIMIT_DEFAULT = 2000; @@ -149,6 +163,8 @@ public class ServerConfigs { /** ********* Fetch Configuration **************/ .define(MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, INT, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DEFAULT, atLeast(0), MEDIUM, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DOC) .define(FETCH_MAX_BYTES_CONFIG, INT, FETCH_MAX_BYTES_DEFAULT, atLeast(1024), MEDIUM, FETCH_MAX_BYTES_DOC) + .define(FETCH_MIN_BYTES_CONFIG, INT, FETCH_MIN_BYTES_DEFAULT, atLeast(1), MEDIUM, FETCH_MIN_BYTES_DOC) + .define(FETCH_MAX_WAIT_MS_CONFIG, INT, FETCH_MAX_WAIT_MS_DEFAULT, atLeast(0), MEDIUM, FETCH_MAX_WAIT_MS_DOC) /** ********* Request Limit Configuration ***********/ .define(MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG, INT, MAX_REQUEST_PARTITION_SIZE_LIMIT_DEFAULT, atLeast(1), MEDIUM, MAX_REQUEST_PARTITION_SIZE_LIMIT_DOC)