KAFKA-18659: librdkafka compressed produce fails unless api versions returns produce v0

ijuma committed Jan 29, 2025
1 parent e6d72c9 commit a252374
Showing 7 changed files with 67 additions and 15 deletions.
@@ -22,6 +22,7 @@
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.TaggedFields;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.requests.ProduceRequest;

import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -178,7 +179,9 @@ public static String toHtml() {
// Requests
b.append("<b>Requests:</b><br>\n");
Schema[] requests = key.messageType.requestSchemas();
for (short version = key.oldestVersion(); version <= key.latestVersion(); version++) {
// See `ProduceRequest.MIN_VERSION` for details on why we need to do this
short oldestVersion = key == ApiKeys.PRODUCE ? ProduceRequest.MIN_VERSION : key.oldestVersion();
for (short version = oldestVersion; version <= key.latestVersion(); version++) {
Schema schema = requests[version];
if (schema == null)
throw new IllegalStateException("Unexpected null schema for " + key + " with version " + version);
@@ -208,7 +211,7 @@ public static String toHtml() {
// Responses
b.append("<b>Responses:</b><br>\n");
Schema[] responses = key.messageType.responseSchemas();
for (int i = 0; i < responses.length; i++) {
for (int i = oldestVersion; i < responses.length; i++) {
Schema schema = responses[i];
// Schema
if (schema != null) {

@@ -40,14 +40,19 @@
import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET;

public class ProduceRequest extends AbstractRequest {
// Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka,
// these versions have to be included in the api versions response (see KAFKA-18659), which means we cannot exclude
// them from the protocol definition. Instead, we reject requests with such versions in `KafkaApis` by returning
// `UnsupportedVersion` errors. We also special case the generated protocol html to exclude versions 0-2.
public static final short MIN_VERSION = 3;
public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11;

public static Builder builder(ProduceRequestData data, boolean useTransactionV1Version) {
// When we use transaction V1 protocol in transaction we set the request version upper limit to
// LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 so that the broker knows that we're using transaction protocol V1.
short maxVersion = useTransactionV1Version ?
LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 : ApiKeys.PRODUCE.latestVersion();
return new Builder(ApiKeys.PRODUCE.oldestVersion(), maxVersion, data);
return new Builder(MIN_VERSION, maxVersion, data);
}

public static Builder builder(ProduceRequestData data) {
@@ -69,11 +74,6 @@ public ProduceRequest build(short version) {
return build(version, true);
}

// Visible for testing only
public ProduceRequest buildUnsafe(short version) {
return build(version, false);
}

private ProduceRequest build(short version, boolean validate) {
if (validate) {
// Validate the given records first
@@ -244,4 +244,5 @@ public static ProduceRequest parse(ByteBuffer buffer, short version) {
public static boolean isTransactionV2Requested(short version) {
return version > LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
}

}
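For context, here is a minimal Java sketch (not part of this commit; the class name and the empty ProduceRequestData payload are illustrative) of how the new MIN_VERSION constant is meant to flow into the client: the lower bound handed to ProduceRequest.Builder is now the hard-coded baseline 3 rather than ApiKeys.PRODUCE.oldestVersion(), so version negotiation never picks v0-v2 even though they are advertised again.

import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ProduceRequest;

public class MinVersionBuilderSketch {
    public static void main(String[] args) {
        // Empty payload, used purely for illustration.
        ProduceRequestData data = new ProduceRequestData()
            .setAcks((short) 1)
            .setTimeoutMs(1000);

        // Mirrors ProduceRequest.builder(data): lower bound MIN_VERSION (3), upper bound latest (12).
        ProduceRequest.Builder builder =
            new ProduceRequest.Builder(ProduceRequest.MIN_VERSION, ApiKeys.PRODUCE.latestVersion(), data);

        // The allowed range [MIN_VERSION, latestVersion] is what version negotiation works with,
        // so a client built from this code never sends produce v0-v2.
        ProduceRequest request = builder.build(ProduceRequest.MIN_VERSION);
        System.out.println("built produce request version: " + request.version());
    }
}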
7 changes: 5 additions & 2 deletions clients/src/main/resources/common/message/ProduceRequest.json
@@ -18,7 +18,10 @@
"type": "request",
"listeners": ["broker"],
"name": "ProduceRequest",
// Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new baseline.
// Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka,
// these versions have to be included in the api versions response (see KAFKA-18659), which means we cannot exclude
// them from `validVersions`. Instead, we reject requests with such versions in `KafkaApis` by returning
// `UnsupportedVersion` errors.
//
// Version 1 and 2 are the same as version 0.
//
@@ -44,7 +47,7 @@
// transaction V2 (KIP_890 part 2) is enabled, the produce request will also include the function for a
// AddPartitionsToTxn call. If V2 is disabled, the client can't use produce request version higher than 11 within
// a transaction.
"validVersions": "3-12",
"validVersions": "0-12",
"flexibleVersions": "9+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
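To make the effect of widening validVersions back to "0-12" concrete, the following hedged Java sketch (not in the commit; the class name is illustrative) contrasts the range the generated metadata advertises, which is what librdkafka inspects via the ApiVersions response, with the floor the broker actually enforces.

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ProduceRequest;

public class AdvertisedVsAcceptedSketch {
    public static void main(String[] args) {
        // With validVersions at "0-12", the generated produce metadata advertises v0 again.
        System.out.println("advertised oldest produce version: " + ApiKeys.PRODUCE.oldestVersion()); // expected 0
        System.out.println("advertised latest produce version: " + ApiKeys.PRODUCE.latestVersion()); // expected 12

        // The broker still rejects anything below the real baseline (see KafkaApis).
        System.out.println("lowest version the broker accepts: " + ProduceRequest.MIN_VERSION); // 3
    }
}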

@@ -17,7 +17,10 @@
"apiKey": 0,
"type": "response",
"name": "ProduceResponse",
// Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new baseline.
// Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka,
// these versions have to be included in the api versions response (see KAFKA-18659), which means we cannot exclude
// them from `validVersions`. Instead, we reject requests with such versions in `KafkaApis` by returning
// `UnsupportedVersion` errors.
//
// Version 1 added the throttle time.
// Version 2 added the log append time.
@@ -38,7 +41,7 @@
// Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
//
// Version 12 is the same as version 10 (KIP-890).
"validVersions": "3-12",
"validVersions": "0-12",
"flexibleVersions": "9+",
"fields": [
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",

@@ -212,8 +212,7 @@ public void testV6AndBelowCannotUseZStdCompression() {
.setAcks((short) 1)
.setTimeoutMs(1000);
// Can't create ProduceRequest instance with version within [3, 7)
for (short version = 3; version < 7; version++) {

for (short version = ProduceRequest.MIN_VERSION; version < 7; version++) {
ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, produceData);
assertThrowsForAllVersions(requestBuilder, UnsupportedCompressionTypeException.class);
}
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -374,6 +374,11 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val produceRequest = request.body[ProduceRequest]
// See `ProduceRequest.MIN_VERSION` for details on why we need to do this
if (produceRequest.version < ProduceRequest.MIN_VERSION) {
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.UNSUPPORTED_VERSION.exception())
return
}

if (RequestUtils.hasTransactionalRecords(produceRequest)) {
val isAuthorizedTransactional = produceRequest.transactionalId != null &&
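The five Scala lines added above are the whole broker-side gate. As a hedged illustration only (plain Java, not the actual KafkaApis code), the logic reduces to mapping any produce version below ProduceRequest.MIN_VERSION to an UNSUPPORTED_VERSION error before normal request handling starts:

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceRequest;

final class ProduceVersionGateSketch {
    // Returns the error the broker would send back, or Errors.NONE when the version is acceptable.
    static Errors validateProduceVersion(short requestVersion) {
        return requestVersion < ProduceRequest.MIN_VERSION ? Errors.UNSUPPORTED_VERSION : Errors.NONE;
    }

    public static void main(String[] args) {
        for (short v = 0; v <= ProduceRequest.MIN_VERSION; v++) {
            // Expected output: v0-v2 map to UNSUPPORTED_VERSION, v3 maps to NONE.
            System.out.println("produce v" + v + " -> " + validateProduceVersion(v));
        }
    }
}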
40 changes: 39 additions & 1 deletion core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import org.junit.jupiter.params.provider.ValueSource
@@ -254,7 +255,7 @@ class ProduceRequestTest extends BaseRequestTest {
// Create a single-partition topic compressed with ZSTD
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.ZSTD.name)
val partitionToLeader = createTopic(topic, topicConfig = topicConfig)
val partitionToLeader = createTopic(topic, topicConfig = topicConfig)
val leader = partitionToLeader(partition)
val memoryRecords = MemoryRecords.withRecords(Compression.zstd().build(),
new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
@@ -283,6 +284,43 @@
assertEquals(-1, partitionProduceResponse1.logAppendTimeMs)
}

/**
* See `ProduceRequest.MIN_VERSION` for the details on why we need special handling for produce request v0-v2 (inclusive).
*/
@Test
def testProduceRequestV0V1V2FailsWithUnsupportedVersion(): Unit = {
val topic = "topic"
val partition = 0
val partitionToLeader = createTopic(topic)
val leader = partitionToLeader(partition)
val memoryRecords = MemoryRecords.withRecords(Compression.none().build(),
new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
new ProduceRequestData.TopicProduceData()
.setName("topic").setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(partition)
.setRecords(memoryRecords))))
.iterator))
.setAcks((-1).toShort)
.setTimeoutMs(3000)
.setTransactionalId(null)

for (i <- 0 until ProduceRequest.MIN_VERSION) {
val version = i.toShort
val produceResponse1 = sendProduceRequest(leader, new ProduceRequest.Builder(version, version, partitionRecords).build())
val topicProduceResponse1 = produceResponse1.data.responses.asScala.head
val partitionProduceResponse1 = topicProduceResponse1.partitionResponses.asScala.head
val tp1 = new TopicPartition(topicProduceResponse1.name, partitionProduceResponse1.index)
assertEquals(topicPartition, tp1)
assertEquals(Errors.UNSUPPORTED_VERSION, Errors.forCode(partitionProduceResponse1.errorCode))
assertEquals(-1, partitionProduceResponse1.baseOffset)
assertEquals(-1, partitionProduceResponse1.logAppendTimeMs)
}
}

private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = {
connectAndReceive[ProduceResponse](request, destination = brokerSocketServer(leaderId))
}
