From 30ea3d7b6ba6484fc6940187d5fd0290545e5b56 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 28 Jan 2025 00:10:20 -0800 Subject: [PATCH] KAFKA-18659: librdkafka compressed produce fails unless api versions returns produce v0 --- clients/src/main/resources/common/message/ProduceRequest.json | 2 +- .../src/main/resources/common/message/ProduceResponse.json | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index db7d961f1373f..25d27b0fe3757 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -44,7 +44,7 @@ // transaction V2 (KIP_890 part 2) is enabled, the produce request will also include the function for a // AddPartitionsToTxn call. If V2 is disabled, the client can't use produce request version higher than 11 within // a transaction. - "validVersions": "3-12", + "validVersions": "0-12", "flexibleVersions": "9+", "fields": [ { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId", diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 5c12539dfb118..0291134b519b3 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -38,7 +38,7 @@ // Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890). // // Version 12 is the same as version 10 (KIP-890). - "validVersions": "3-12", + "validVersions": "0-12", "flexibleVersions": "9+", "fields": [ { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+", diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 11623b02a3d8e..1d51fd0494c73 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -374,6 +374,10 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { val produceRequest = request.body[ProduceRequest] + if (produceRequest.version <= 2) { + requestHelper.sendErrorResponseMaybeThrottle(request, Errors.UNSUPPORTED_VERSION.exception()) + return; + } if (RequestUtils.hasTransactionalRecords(produceRequest)) { val isAuthorizedTransactional = produceRequest.transactionalId != null &&