Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18157:Consider UnsupportedVersionException child class to represent the case of unsupported fields #18072

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class UnsupportedProtocolFieldException extends UnsupportedVersionException {
private static final long serialVersionUID = 1L;

public UnsupportedProtocolFieldException(String fieldOrValue, String apiKeyName, int apiVersion, int lowestSupportedVersion) {
super("The cluster does not support [" + fieldOrValue + "] in " + apiKeyName + " API version " + apiVersion +
". Upgrade the cluster to " + apiKeyName + " API version >= " + lowestSupportedVersion +
" to enable [" + fieldOrValue + "].");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.CreateAclsRequestData;
import org.apache.kafka.common.message.CreateAclsRequestData.AclCreation;
import org.apache.kafka.common.message.CreateAclsResponseData;
Expand All @@ -33,8 +33,10 @@
import org.apache.kafka.common.resource.ResourceType;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class CreateAclsRequest extends AbstractRequest {

Expand Down Expand Up @@ -91,8 +93,13 @@ private void validate(CreateAclsRequestData data) {
if (version() == 0) {
final boolean unsupported = data.creations().stream().anyMatch(creation ->
creation.resourcePatternType() != PatternType.LITERAL.code());
if (unsupported)
throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
if (unsupported) {
String unsupportedType = Arrays.stream(PatternType.values())
.filter(type -> type != PatternType.LITERAL)
.map(PatternType::name)
.collect(Collectors.joining(","));
throw new UnsupportedProtocolFieldException(unsupportedType, apiKey().name(), version(), 1);
}
}

final boolean unknown = data.creations().stream().anyMatch(creation ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsResponseData;
Expand All @@ -40,8 +40,7 @@ public Builder(CreateTopicsRequestData data) {
@Override
public CreateTopicsRequest build(short version) {
if (data.validateOnly() && version == 0)
throw new UnsupportedVersionException("validateOnly is not supported in version 0 of " +
"CreateTopicsRequest");
throw new UnsupportedProtocolFieldException("validateOnly", apiKey().name(), version, 1);

final List<String> topicsWithDefaults = data.topics()
.stream()
Expand All @@ -53,10 +52,7 @@ public CreateTopicsRequest build(short version) {
.collect(Collectors.toList());

if (!topicsWithDefaults.isEmpty() && version < 4) {
throw new UnsupportedVersionException("Creating topics with default "
+ "partitions/replication factor are only supported in CreateTopicRequest "
+ "version 4+. The following topics need values for partitions and replicas: "
+ topicsWithDefaults);
throw new UnsupportedProtocolFieldException(String.join(",", topicsWithDefaults), apiKey().name(), version, 4);
}

return new CreateTopicsRequest(data, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.DeleteAclsRequestData;
import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
import org.apache.kafka.common.message.DeleteAclsResponseData;
Expand All @@ -32,6 +32,7 @@
import org.apache.kafka.common.resource.ResourceType;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -77,9 +78,13 @@ private void normalizeAndValidate() {
// to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons.
if (patternType == PatternType.ANY)
filter.setPatternTypeFilter(PatternType.LITERAL.code());
else if (patternType != PatternType.LITERAL)
throw new UnsupportedVersionException("Version 0 does not support pattern type " +
patternType + " (only LITERAL and ANY are supported)");
else if (patternType != PatternType.LITERAL) {
String unsupportedTypes = Arrays.stream(PatternType.values())
.filter(type -> type != PatternType.ANY && type != PatternType.LITERAL)
.map(PatternType::name)
.collect(Collectors.joining(","));
throw new UnsupportedProtocolFieldException(unsupportedTypes, apiKey().name(), version(), 1);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.DescribeAclsRequestData;
import org.apache.kafka.common.message.DescribeAclsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -30,6 +30,8 @@
import org.apache.kafka.common.resource.ResourceType;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.stream.Collectors;

public class DescribeAclsRequest extends AbstractRequest {

Expand Down Expand Up @@ -77,8 +79,13 @@ private void normalizeAndValidate(short version) {
// to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons.
if (patternType == PatternType.ANY)
data.setPatternTypeFilter(PatternType.LITERAL.code());
else if (patternType != PatternType.LITERAL)
throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
else if (patternType != PatternType.LITERAL) {
String unsupportedTypes = Arrays.stream(PatternType.values())
.filter(type -> type != PatternType.LITERAL)
.map(PatternType::name)
.collect(Collectors.joining(","));
throw new UnsupportedProtocolFieldException(unsupportedTypes, apiKey().name(), version, 1);
}
}

if (data.patternTypeFilter() == PatternType.UNKNOWN.code()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
Expand All @@ -30,6 +30,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -65,7 +66,11 @@ public String toString() {

private ElectLeadersRequestData toRequestData(short version) {
if (electionType != ElectionType.PREFERRED && version == 0) {
throw new UnsupportedVersionException("API Version 0 only supports PREFERRED election type");
String unsupportedTypes = Arrays.stream(ElectionType.values())
.filter(type -> type != ElectionType.PREFERRED)
.map(ElectionType::name)
.collect(Collectors.joining(","));
throw new UnsupportedProtocolFieldException(unsupportedTypes, apiKey().name(), version, 1);
}

ElectLeadersRequestData data = new ElectLeadersRequestData()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
Expand All @@ -43,8 +44,7 @@ public Builder(FindCoordinatorRequestData data) {
@Override
public FindCoordinatorRequest build(short version) {
if (version < 1 && data.keyType() == CoordinatorType.TRANSACTION.id()) {
throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
"because we require features supported only in 2 or later.");
throw new UnsupportedProtocolFieldException(CoordinatorType.TRANSACTION.name(), apiKey().name(), version, 2);
}
int batchedKeys = data.coordinatorKeys().size();
if (version < MIN_BATCHED_VERSION) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -38,8 +38,7 @@ public Builder(HeartbeatRequestData data) {
@Override
public HeartbeatRequest build(short version) {
if (data.groupInstanceId() != null && version < 3) {
throw new UnsupportedVersionException("The broker heartbeat protocol version " +
version + " does not support usage of config group.instance.id.");
throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 3);
}
return new HeartbeatRequest(data, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
Expand All @@ -42,8 +42,7 @@ public Builder(JoinGroupRequestData data) {
@Override
public JoinGroupRequest build(short version) {
if (data.groupInstanceId() != null && version < 5) {
throw new UnsupportedVersionException("The broker join group protocol version " +
version + " does not support usage of config group.instance.id.");
throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 5);
}
return new JoinGroupRequest(data, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -47,12 +47,10 @@ public Builder(ListGroupsRequestData data) {
@Override
public ListGroupsRequest build(short version) {
if (!data.statesFilter().isEmpty() && version < 4) {
throw new UnsupportedVersionException("The broker only supports ListGroups " +
"v" + version + ", but we need v4 or newer to request groups by states.");
throw new UnsupportedProtocolFieldException("StatesFilter", apiKey().name(), version, 4);
}
if (!data.typesFilter().isEmpty() && version < 5) {
throw new UnsupportedVersionException("The broker only supports ListGroups " +
"v" + version + ", but we need v5 or newer to request groups by type.");
throw new UnsupportedProtocolFieldException("TypesFilter", apiKey().name(), version, 5);
}
return new ListGroupsRequest(data, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.ListTransactionsRequestData;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -37,8 +37,7 @@ public Builder(ListTransactionsRequestData data) {
@Override
public ListTransactionsRequest build(short version) {
if (data.durationFilter() >= 0 && version < 1) {
throw new UnsupportedVersionException("Duration filter can be set only when using API version 1 or higher." +
" If client is connected to an older broker, do not specify duration filter or set duration filter to -1.");
throw new UnsupportedProtocolFieldException("DurationFilter", apiKey().name(), version, 1);
}
return new ListTransactionsRequest(data, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic;
Expand Down Expand Up @@ -105,16 +106,13 @@ public MetadataRequest build(short version) {
if (version < 1)
throw new UnsupportedVersionException("MetadataRequest versions older than 1 are not supported.");
if (!data.allowAutoTopicCreation() && version < 4)
throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " +
"allowAutoTopicCreation field");
throw new UnsupportedProtocolFieldException("allowAutoTopicCreation", apiKey().name(), version, 4);
if (data.topics() != null) {
data.topics().forEach(topic -> {
if (topic.name() == null && version < 12)
throw new UnsupportedVersionException("MetadataRequest version " + version +
" does not support null topic names.");
throw new UnsupportedProtocolFieldException("null topic names", apiKey().name(), version, 12);
if (!Uuid.ZERO_UUID.equals(topic.topicId()) && version < 12)
throw new UnsupportedVersionException("MetadataRequest version " + version +
" does not support non-zero topic IDs.");
throw new UnsupportedProtocolFieldException("non-zero topic IDs", apiKey().name(), version, 12);
});
}
return new MetadataRequest(data, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
import org.apache.kafka.common.message.OffsetCommitResponseData;
Expand Down Expand Up @@ -58,8 +58,7 @@ public Builder(OffsetCommitRequestData data) {
@Override
public OffsetCommitRequest build(short version) {
if (data.groupInstanceId() != null && version < 7) {
throw new UnsupportedVersionException("The broker offset commit protocol version " +
version + " does not support usage of config group.instance.id.");
throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 7);
}
return new OffsetCommitRequest(data, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
Expand Down Expand Up @@ -143,8 +144,7 @@ public OffsetFetchRequest build(short version) {
}
if (data.requireStable() && version < 7) {
if (throwOnFetchStableOffsetsUnsupported) {
throw new UnsupportedVersionException("Broker unexpectedly " +
"doesn't support requireStable flag on version " + version);
throw new UnsupportedProtocolFieldException("RequireStable", apiKey().name(), version, 7);
} else {
log.trace("Fallback the requireStable flag to false as broker " +
"only supports OffsetFetchRequest version {}. Need " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -41,8 +41,7 @@ public Builder(SyncGroupRequestData data) {
@Override
public SyncGroupRequest build(short version) {
if (data.groupInstanceId() != null && version < 3) {
throw new UnsupportedVersionException("The broker sync group protocol version " +
version + " does not support usage of config group.instance.id.");
throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 3);
}
return new SyncGroupRequest(data, version);
}
Expand Down
Loading
Loading