Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ class DelayedFetch(
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
replicaManager: ReplicaManager,
quota: ReplicaQuota,
maxWaitMs: Option[Long] = None,
minBytes: Option[Int] = None,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
) extends DelayedOperation(params.maxWaitMs) with Logging {
) extends DelayedOperation(maxWaitMs.getOrElse(params.maxWaitMs)) with Logging {

override def toString: String = {
s"DelayedFetch(params=$params" +
Expand Down Expand Up @@ -147,7 +149,7 @@ class DelayedFetch(
}

// Case G
if (accumulatedSize >= params.minBytes)
if (accumulatedSize >= minBytes.getOrElse(params.minBytes))
forceComplete()
else
false
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
/** ********* Fetch Configuration **************/
val maxIncrementalFetchSessionCacheSlots = getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG)
val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG)
val fetchMaxWaitMs = getLong(ServerConfigs.FETCH_MAX_WAIT_MS_CONFIG)
val fetchMinBytes = getInt(ServerConfigs.FETCH_MIN_BYTES_CONFIG)

/** ********* Request Limit Configuration ***********/
val maxRequestPartitionSizeLimit = getInt(ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG)
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,10 @@ class ReplicaManager(val config: KafkaConfig,
var bytesReadable: Long = 0
var errorReadingData = false

// define config overrides for consumer fetch batching
val maxWaitMs = if (params.isFromFollower) params.maxWaitMs else Math.max(params.maxWaitMs, config.fetchMaxWaitMs)
val minBytes = if (params.isFromFollower) params.minBytes else Math.max(params.minBytes, config.fetchMinBytes)

// topic-partitions that have to be read from remote storage
val remoteFetchInfos = new util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()

Expand Down Expand Up @@ -1693,7 +1697,7 @@ class ReplicaManager(val config: KafkaConfig,
// 4) some error happens while reading data
// 5) we found a diverging epoch
// 6) has a preferred read replica
if (remoteFetchInfos.isEmpty && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
if (remoteFetchInfos.isEmpty && (maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= minBytes || errorReadingData ||
hasDivergingEpoch || hasPreferredReadReplica)) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
Expand All @@ -1720,7 +1724,9 @@ class ReplicaManager(val config: KafkaConfig,
fetchPartitionStatus = fetchPartitionStatus,
replicaManager = this,
quota = quota,
responseCallback = responseCallback
maxWaitMs = Some(maxWaitMs),
minBytes = Some(minBytes),
responseCallback = responseCallback,
)

// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ public class ServerConfigs {
public static final int FETCH_MAX_BYTES_DEFAULT = 55 * 1024 * 1024;
public static final String FETCH_MAX_BYTES_DOC = "The maximum number of bytes we will return for a fetch request. Must be at least 1024.";

public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
public static final int FETCH_MIN_BYTES_DEFAULT = 1;
public static final String FETCH_MIN_BYTES_DOC = "The minimum number of bytes we will return for a fetch request. " +
"If the number of bytes available is less than this value, we will wait until more data is available or the request times out. This is useful for batching requests to improve throughput. " +
"This configuration is only used for fetch requests from consumers, and defines a lower-bound value to the consumer-defined fetch.min.bytes. " +
"If the consumer-defined fetch.min.bytes is smaller than this value, the server will use the consumer-defined value instead.";

public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
public static final int FETCH_MAX_WAIT_MS_DEFAULT = 100;
public static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering a fetch request if there isn't sufficient data to immediately satisfy the request. " +
"This is useful for batching requests to improve throughput. If the number of bytes available is less than the configured value, we will wait until more data is available or the request times out. " +
"This configuration is only used for fetch requests from consumers, and defines a lower-bound value compared to the consumer-defined fetch.max.wait.ms. " +
"If the consumer-defined fetch.max.wait.ms is smaller than this value, the server will use the consumer-defined value instead.";

/** ********* Request Limit Configuration **************/
public static final String MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG = "max.request.partition.size.limit";
public static final int MAX_REQUEST_PARTITION_SIZE_LIMIT_DEFAULT = 2000;
Expand Down Expand Up @@ -149,6 +163,8 @@ public class ServerConfigs {
/** ********* Fetch Configuration **************/
.define(MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, INT, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DEFAULT, atLeast(0), MEDIUM, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DOC)
.define(FETCH_MAX_BYTES_CONFIG, INT, FETCH_MAX_BYTES_DEFAULT, atLeast(1024), MEDIUM, FETCH_MAX_BYTES_DOC)
.define(FETCH_MIN_BYTES_CONFIG, INT, FETCH_MIN_BYTES_DEFAULT, atLeast(1), MEDIUM, FETCH_MIN_BYTES_DOC)
.define(FETCH_MAX_WAIT_MS_CONFIG, INT, FETCH_MAX_WAIT_MS_DEFAULT, atLeast(0), MEDIUM, FETCH_MAX_WAIT_MS_DOC)

/** ********* Request Limit Configuration ***********/
.define(MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG, INT, MAX_REQUEST_PARTITION_SIZE_LIMIT_DEFAULT, atLeast(1), MEDIUM, MAX_REQUEST_PARTITION_SIZE_LIMIT_DOC)
Expand Down
Loading