diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 1459a90103012..dddee842edef9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -22,6 +22,7 @@
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.TaggedFields;
import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.requests.ProduceRequest;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -178,7 +179,9 @@ public static String toHtml() {
// Requests
b.append("Requests:
\n");
Schema[] requests = key.messageType.requestSchemas();
- for (short version = key.oldestVersion(); version <= key.latestVersion(); version++) {
+ // See `ProduceRequest.MIN_VERSION` for details on why we need to do this
+ short oldestVersion = key == ApiKeys.PRODUCE ? ProduceRequest.MIN_VERSION : key.oldestVersion();
+ for (short version = oldestVersion; version <= key.latestVersion(); version++) {
Schema schema = requests[version];
if (schema == null)
throw new IllegalStateException("Unexpected null schema for " + key + " with version " + version);
@@ -208,7 +211,7 @@ public static String toHtml() {
// Responses
b.append("Responses:
\n");
Schema[] responses = key.messageType.responseSchemas();
- for (int i = 0; i < responses.length; i++) {
+ for (int i = oldestVersion; i < responses.length; i++) {
Schema schema = responses[i];
// Schema
if (schema != null) {
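The net effect of this change, in isolation: every API documents versions [oldestVersion, latestVersion], except PRODUCE, whose documentation starts at ProduceRequest.MIN_VERSION even though the v0-v2 schemas remain in the schema arrays. A minimal sketch of that selection rule; the class and method names below are illustrative only and not part of the patch:

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ProduceRequest;

final class ProtocolDocVersions {
    // Returns {oldest, latest} documented versions for an API key. PRODUCE is
    // special-cased so the generated HTML omits the v0-v2 schemas that stay in
    // the protocol definition purely for librdkafka compatibility (KAFKA-18659).
    static short[] docVersionRange(ApiKeys key) {
        short oldest = key == ApiKeys.PRODUCE ? ProduceRequest.MIN_VERSION : key.oldestVersion();
        return new short[] {oldest, key.latestVersion()};
    }
}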
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 8fbd86cb9bb43..5b76ed9a5ee27 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -40,6 +40,11 @@
import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET;
public class ProduceRequest extends AbstractRequest {
+ // Versions 0-2 were removed in Apache Kafka 4.0; version 3 is the new baseline. Due to a bug in librdkafka,
+ // these versions have to be included in the ApiVersions response (see KAFKA-18659), which means we cannot exclude
+ // them from the protocol definition. Instead, we reject requests with such versions in `KafkaApis` by returning
+ // `UnsupportedVersion` errors. We also special-case the generated protocol HTML to exclude versions 0-2.
+ public static final short MIN_VERSION = 3;
public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11;
public static Builder builder(ProduceRequestData data, boolean useTransactionV1Version) {
@@ -47,7 +52,7 @@ public static Builder builder(ProduceRequestData data, boolean useTransactionV1V
// LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 so that the broker knows that we're using transaction protocol V1.
short maxVersion = useTransactionV1Version ?
LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 : ApiKeys.PRODUCE.latestVersion();
- return new Builder(ApiKeys.PRODUCE.oldestVersion(), maxVersion, data);
+ return new Builder(MIN_VERSION, maxVersion, data);
}
public static Builder builder(ProduceRequestData data) {
@@ -69,11 +74,6 @@ public ProduceRequest build(short version) {
return build(version, true);
}
- // Visible for testing only
- public ProduceRequest buildUnsafe(short version) {
- return build(version, false);
- }
-
private ProduceRequest build(short version, boolean validate) {
if (validate) {
// Validate the given records first
@@ -244,4 +244,5 @@ public static ProduceRequest parse(ByteBuffer buffer, short version) {
public static boolean isTransactionV2Requested(short version) {
return version > LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
}
+
}
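For client code, the practical consequence of the builder change is that the public builder's floor is now MIN_VERSION rather than ApiKeys.PRODUCE.oldestVersion(), so a caller can no longer serialize v0-v2 even when asking for the oldest version. A short usage sketch, with the ProduceRequestData setup pared down to the fields needed to show the call shape (no topic data):

import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.requests.ProduceRequest;

final class ProduceRequestBuilderSketch {
    public static void main(String[] args) {
        ProduceRequestData data = new ProduceRequestData()
            .setAcks((short) 1)
            .setTimeoutMs(30_000);
        // The builder clamps to [MIN_VERSION, latestVersion], so v3 is the
        // lowest version this can ever put on the wire.
        ProduceRequest request = ProduceRequest.builder(data).build(ProduceRequest.MIN_VERSION);
        System.out.println(request.version()); // prints 3
    }
}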
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json
index db7d961f1373f..7f55cf09cb3ec 100644
--- a/clients/src/main/resources/common/message/ProduceRequest.json
+++ b/clients/src/main/resources/common/message/ProduceRequest.json
@@ -18,7 +18,10 @@
"type": "request",
"listeners": ["broker"],
"name": "ProduceRequest",
- // Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new baseline.
+ // Versions 0-2 were removed in Apache Kafka 4.0; version 3 is the new baseline. Due to a bug in librdkafka,
+ // these versions have to be included in the ApiVersions response (see KAFKA-18659), which means we cannot exclude
+ // them from `validVersions`. Instead, we reject requests with such versions in `KafkaApis` by returning
+ // `UnsupportedVersion` errors.
//
// Version 1 and 2 are the same as version 0.
//
@@ -44,7 +47,7 @@
// transaction V2 (KIP_890 part 2) is enabled, the produce request will also include the function for a
// AddPartitionsToTxn call. If V2 is disabled, the client can't use produce request version higher than 11 within
// a transaction.
- "validVersions": "3-12",
+ "validVersions": "0-12",
"flexibleVersions": "9+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json
index 5c12539dfb118..cf67acaa82cbd 100644
--- a/clients/src/main/resources/common/message/ProduceResponse.json
+++ b/clients/src/main/resources/common/message/ProduceResponse.json
@@ -17,7 +17,10 @@
"apiKey": 0,
"type": "response",
"name": "ProduceResponse",
- // Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new baseline.
+ // Versions 0-2 were removed in Apache Kafka 4.0; version 3 is the new baseline. Due to a bug in librdkafka,
+ // these versions have to be included in the ApiVersions response (see KAFKA-18659), which means we cannot exclude
+ // them from `validVersions`. Instead, we reject requests with such versions in `KafkaApis` by returning
+ // `UnsupportedVersion` errors.
//
// Version 1 added the throttle time.
// Version 2 added the log append time.
@@ -38,7 +41,7 @@
// Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
//
// Version 12 is the same as version 10 (KIP-890).
- "validVersions": "3-12",
+ "validVersions": "0-12",
"flexibleVersions": "9+",
"fields": [
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
index ecb8869c38bd2..26bedc55e35b0 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
@@ -212,8 +212,7 @@ public void testV6AndBelowCannotUseZStdCompression() {
.setAcks((short) 1)
.setTimeoutMs(1000);
// Can't create ProduceRequest instance with version within [3, 7)
- for (short version = 3; version < 7; version++) {
-
+ for (short version = ProduceRequest.MIN_VERSION; version < 7; version++) {
ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, produceData);
assertThrowsForAllVersions(requestBuilder, UnsupportedCompressionTypeException.class);
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 18795a7e0f38d..c1afc5298b873 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -374,6 +374,11 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val produceRequest = request.body[ProduceRequest]
+ // See `ProduceRequest.MIN_VERSION` for details on why we need to do this
+ if (produceRequest.version < ProduceRequest.MIN_VERSION) {
+ requestHelper.sendErrorResponseMaybeThrottle(request, Errors.UNSUPPORTED_VERSION.exception())
+ return
+ }
if (RequestUtils.hasTransactionalRecords(produceRequest)) {
val isAuthorizedTransactional = produceRequest.transactionalId != null &&
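Seen from the wire, the guard means a v0-v2 produce request still parses (the schemas exist) but is answered with an error response instead of being appended to the log. An illustrative Java rendering of the Scala guard above; checkVersion is a hypothetical stand-in for the inline check, and the real handler instead calls requestHelper.sendErrorResponseMaybeThrottle, which builds the response via ProduceRequest.getErrorResponse:

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ProduceRequest;

final class ProduceVersionGuardSketch {
    // Returns the rejection response for v0-v2, or null when the request may
    // proceed. getErrorResponse fans UNSUPPORTED_VERSION out to every partition
    // in the request, which is what the new test below asserts.
    static AbstractResponse checkVersion(ProduceRequest produceRequest) {
        if (produceRequest.version() < ProduceRequest.MIN_VERSION) {
            return produceRequest.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception());
        }
        return null;
    }
}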
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 64111f1487513..44d1d1de6429a 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import org.junit.jupiter.params.provider.ValueSource
@@ -254,18 +255,18 @@ class ProduceRequestTest extends BaseRequestTest {
// Create a single-partition topic compressed with ZSTD
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.ZSTD.name)
- val partitionToLeader = createTopic(topic, topicConfig = topicConfig)
+ val partitionToLeader = createTopic(topic, topicConfig = topicConfig)
val leader = partitionToLeader(partition)
val memoryRecords = MemoryRecords.withRecords(Compression.zstd().build(),
new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
- new ProduceRequestData.TopicProduceData()
- .setName("topic").setPartitionData(Collections.singletonList(
- new ProduceRequestData.PartitionProduceData()
- .setIndex(partition)
- .setRecords(memoryRecords))))
+ new ProduceRequestData.TopicProduceData()
+ .setName("topic").setPartitionData(Collections.singletonList(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(partition)
+ .setRecords(memoryRecords))))
.iterator))
.setAcks((-1).toShort)
.setTimeoutMs(3000)
@@ -283,6 +284,43 @@ class ProduceRequestTest extends BaseRequestTest {
assertEquals(-1, partitionProduceResponse1.logAppendTimeMs)
}
+ /**
+ * See `ProduceRequest.MIN_VERSION` for details on why we need special handling for produce request versions 0-2 (inclusive).
+ */
+ @Test
+ def testProduceRequestV0V1V2FailsWithUnsupportedVersion(): Unit = {
+ val topic = "topic"
+ val partition = 0
+ val partitionToLeader = createTopic(topic)
+ val leader = partitionToLeader(partition)
+ val memoryRecords = MemoryRecords.withRecords(Compression.none().build(),
+ new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
+ val topicPartition = new TopicPartition("topic", partition)
+ val partitionRecords = new ProduceRequestData()
+ .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+ new ProduceRequestData.TopicProduceData()
+ .setName("topic").setPartitionData(Collections.singletonList(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(partition)
+ .setRecords(memoryRecords))))
+ .iterator))
+ .setAcks((-1).toShort)
+ .setTimeoutMs(3000)
+ .setTransactionalId(null)
+
+ for (i <- 0 until ProduceRequest.MIN_VERSION) {
+ val version = i.toShort
+ val produceResponse1 = sendProduceRequest(leader, new ProduceRequest.Builder(version, version, partitionRecords).build())
+ val topicProduceResponse1 = produceResponse1.data.responses.asScala.head
+ val partitionProduceResponse1 = topicProduceResponse1.partitionResponses.asScala.head
+ val tp1 = new TopicPartition(topicProduceResponse1.name, partitionProduceResponse1.index)
+ assertEquals(topicPartition, tp1)
+ assertEquals(Errors.UNSUPPORTED_VERSION, Errors.forCode(partitionProduceResponse1.errorCode))
+ assertEquals(-1, partitionProduceResponse1.baseOffset)
+ assertEquals(-1, partitionProduceResponse1.logAppendTimeMs)
+ }
+ }
+
private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = {
connectAndReceive[ProduceResponse](request, destination = brokerSocketServer(leaderId))
}
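The invariant the whole patch maintains can be stated as a pair of assertions: the advertised protocol range still starts at 0 (so librdkafka's ApiVersions handling keeps working), while the enforced floor is 3. A compact JUnit sketch, not part of the patch, using only APIs that appear above:

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ProduceRequest;
import org.junit.jupiter.api.Test;

class ProduceVersionInvariantSketch {
    @Test
    void advertisedFloorAndEnforcedFloorDiverge() {
        // What the protocol definition (and thus ApiVersions) advertises.
        assertEquals((short) 0, ApiKeys.PRODUCE.oldestVersion());
        // What KafkaApis actually accepts as the minimum.
        assertEquals((short) 3, ProduceRequest.MIN_VERSION);
    }
}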