diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index db7d961f1373f..25d27b0fe3757 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -44,7 +44,7 @@ // transaction V2 (KIP_890 part 2) is enabled, the produce request will also include the function for a // AddPartitionsToTxn call. If V2 is disabled, the client can't use produce request version higher than 11 within // a transaction. - "validVersions": "3-12", + "validVersions": "0-12", "flexibleVersions": "9+", "fields": [ { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId", diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 5c12539dfb118..0291134b519b3 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -38,7 +38,7 @@ // Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890). // // Version 12 is the same as version 10 (KIP-890). - "validVersions": "3-12", + "validVersions": "0-12", "flexibleVersions": "9+", "fields": [ { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+", diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 11623b02a3d8e..1d51fd0494c73 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -374,6 +374,10 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { val produceRequest = request.body[ProduceRequest] + if (produceRequest.version <= 2) { + requestHelper.sendErrorResponseMaybeThrottle(request, Errors.UNSUPPORTED_VERSION.exception()) + return; + } if (RequestUtils.hasTransactionalRecords(produceRequest)) { val isAuthorizedTransactional = produceRequest.transactionalId != null &&