-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-18659: librdkafka compressed produce fails unless api versions returns produce v0 #18727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a252374
243771e
8a5d335
6420650
b38a7ef
593db8b
55056f2
cff110c
4d0227a
04db4eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -149,6 +149,11 @@ public enum ApiKeys { | |
private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values()) | ||
.collect(Collectors.toMap(key -> (int) key.id, Function.identity())); | ||
|
||
// Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka, | ||
// version `0` has to be included in the api versions response (see KAFKA-18659). In order to achieve that, | ||
// we adjust `toApiVersion` to return `0` for the min version of `produce` in the broker listener. | ||
public static final short PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION = 0; | ||
|
||
/** the permanent and immutable id of an API - this can't change ever */ | ||
public final short id; | ||
|
||
|
@@ -264,8 +269,30 @@ public boolean hasValidVersion() { | |
return oldestVersion() <= latestVersion(); | ||
} | ||
|
||
/** | ||
* To workaround a critical bug in librdkafka, the api versions response is inconsistent with the actual versions | ||
* supported by `produce` - this method handles that. It should be called in the context of the api response protocol | ||
* handling. | ||
* | ||
* It should not be used by code generating protocol documentation - we keep that consistent with the actual versions | ||
* supported by `produce`. | ||
* | ||
* See `PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details. | ||
*/ | ||
public Optional<ApiVersionsResponseData.ApiVersion> toApiVersionForApiResponse(boolean enableUnstableLastVersion, | ||
ApiMessageType.ListenerType listenerType) { | ||
return toApiVersion(enableUnstableLastVersion, Optional.of(listenerType)); | ||
} | ||
|
||
public Optional<ApiVersionsResponseData.ApiVersion> toApiVersion(boolean enableUnstableLastVersion) { | ||
short oldestVersion = oldestVersion(); | ||
return toApiVersion(enableUnstableLastVersion, Optional.empty()); | ||
} | ||
|
||
private Optional<ApiVersionsResponseData.ApiVersion> toApiVersion(boolean enableUnstableLastVersion, | ||
Optional<ApiMessageType.ListenerType> listenerType) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since listenerType is optional, could we add a comment when the caller is expected to pass in the right listenerType? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair point, I updated this to have two methods so it's clear which one to be used when. One of them always take the API listener and the other one doesn't. There is only one non test usage of the latter currently. |
||
// see `PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details on why we do this | ||
short oldestVersion = (this == PRODUCE && listenerType.map(l -> l == ApiMessageType.ListenerType.BROKER).orElse(false)) ? | ||
ijuma marked this conversation as resolved.
Show resolved
Hide resolved
|
||
PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION : oldestVersion(); | ||
short latestVersion = latestVersion(enableUnstableLastVersion); | ||
|
||
// API is entirely disabled if latestStableVersion is smaller than oldestVersion. | ||
|
@@ -299,7 +326,7 @@ static String toHtml() { | |
b.append("<th>Key</th>\n"); | ||
b.append("</tr>"); | ||
clientApis().stream() | ||
.filter(apiKey -> apiKey.toApiVersion(false).isPresent()) | ||
.filter(apiKey -> apiKey.toApiVersion(false, Optional.empty()).isPresent()) | ||
.forEach(apiKey -> { | ||
b.append("<tr>\n"); | ||
b.append("<td>"); | ||
|
@@ -341,10 +368,7 @@ public static EnumSet<ApiKeys> controllerApis() { | |
} | ||
|
||
public static EnumSet<ApiKeys> clientApis() { | ||
List<ApiKeys> apis = Arrays.stream(ApiKeys.values()) | ||
.filter(apiKey -> apiKey.inScope(ApiMessageType.ListenerType.BROKER)) | ||
.collect(Collectors.toList()); | ||
return EnumSet.copyOf(apis); | ||
return brokerApis(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This simplification is unrelated to the main change in this PR - just something I noticed could be cleaned up. |
||
} | ||
|
||
public static EnumSet<ApiKeys> apisForListener(ApiMessageType.ListenerType listener) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -208,26 +208,26 @@ public static String toHtml() { | |
// Responses | ||
b.append("<b>Responses:</b><br>\n"); | ||
Schema[] responses = key.messageType.responseSchemas(); | ||
for (int i = 0; i < responses.length; i++) { | ||
Schema schema = responses[i]; | ||
for (int version = key.oldestVersion(); version < key.latestVersion(); version++) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use the same approach we use for requests for consistency - it should result in the same behavior, but fail more clearly if there is some inconsistency. |
||
Schema schema = responses[version]; | ||
if (schema == null) | ||
throw new IllegalStateException("Unexpected null schema for " + key + " with version " + version); | ||
// Schema | ||
if (schema != null) { | ||
b.append("<div>"); | ||
// Version header | ||
b.append("<pre>"); | ||
b.append(key.name); | ||
b.append(" Response (Version: "); | ||
b.append(i); | ||
b.append(") => "); | ||
schemaToBnfHtml(responses[i], b, 2); | ||
b.append("</pre>"); | ||
|
||
b.append("<p><b>Response header version:</b> "); | ||
b.append(key.responseHeaderVersion((short) i)); | ||
b.append("</p>\n"); | ||
|
||
schemaToFieldTableHtml(responses[i], b); | ||
} | ||
b.append("<div>"); | ||
// Version header | ||
b.append("<pre>"); | ||
b.append(key.name); | ||
b.append(" Response (Version: "); | ||
b.append(version); | ||
b.append(") => "); | ||
schemaToBnfHtml(responses[version], b, 2); | ||
b.append("</pre>"); | ||
|
||
b.append("<p><b>Response header version:</b> "); | ||
b.append(key.responseHeaderVersion((short) version)); | ||
b.append("</p>\n"); | ||
|
||
schemaToFieldTableHtml(responses[version], b); | ||
b.append("</div>\n"); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,7 @@ | |
import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET; | ||
|
||
public class ProduceRequest extends AbstractRequest { | ||
|
||
public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11; | ||
|
||
public static Builder builder(ProduceRequestData data, boolean useTransactionV1Version) { | ||
|
@@ -66,21 +67,10 @@ public Builder(short minVersion, | |
|
||
@Override | ||
public ProduceRequest build(short version) { | ||
return build(version, true); | ||
} | ||
|
||
// Visible for testing only | ||
public ProduceRequest buildUnsafe(short version) { | ||
ijuma marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed unused overloads. |
||
return build(version, false); | ||
} | ||
|
||
private ProduceRequest build(short version, boolean validate) { | ||
if (validate) { | ||
// Validate the given records first | ||
data.topicData().forEach(tpd -> | ||
tpd.partitionData().forEach(partitionProduceData -> | ||
ProduceRequest.validateRecords(version, partitionProduceData.records()))); | ||
} | ||
// Validate the given records first | ||
data.topicData().forEach(tpd -> | ||
tpd.partitionData().forEach(partitionProduceData -> | ||
ProduceRequest.validateRecords(version, partitionProduceData.records()))); | ||
return new ProduceRequest(data, version); | ||
} | ||
|
||
|
@@ -244,4 +234,5 @@ public static ProduceRequest parse(ByteBuffer buffer, short version) { | |
public static boolean isTransactionV2Requested(short version) { | ||
return version > LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2; | ||
} | ||
|
||
} |
Uh oh!
There was an error while loading. Please reload this page.