From 2b522283a287e341ca68aa952f801c3d63f4a142 Mon Sep 17 00:00:00 2001 From: frankvicky Date: Fri, 6 Dec 2024 12:25:06 +0800 Subject: [PATCH] KAFKA-18157:Consider UnsupportedVersionException child class to represent the case of unsupported fields --- .../UnsupportedProtocolFieldException.java | 27 +++++++++++++++++++ .../common/requests/CreateAclsRequest.java | 13 ++++++--- .../common/requests/CreateTopicsRequest.java | 10 +++---- .../common/requests/DeleteAclsRequest.java | 13 ++++++--- .../common/requests/DescribeAclsRequest.java | 13 ++++++--- .../common/requests/ElectLeadersRequest.java | 9 +++++-- .../requests/FindCoordinatorRequest.java | 4 +-- .../common/requests/HeartbeatRequest.java | 5 ++-- .../common/requests/JoinGroupRequest.java | 5 ++-- .../common/requests/ListGroupsRequest.java | 8 +++--- .../requests/ListTransactionsRequest.java | 5 ++-- .../common/requests/MetadataRequest.java | 10 +++---- .../common/requests/OffsetCommitRequest.java | 5 ++-- .../common/requests/OffsetFetchRequest.java | 4 +-- .../common/requests/SyncGroupRequest.java | 5 ++-- .../requests/UpdateMetadataRequest.java | 11 ++++++-- .../kafka/clients/NetworkClientTest.java | 4 +-- .../common/requests/RequestResponseTest.java | 8 +++--- 18 files changed, 102 insertions(+), 57 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/UnsupportedProtocolFieldException.java diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedProtocolFieldException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedProtocolFieldException.java new file mode 100644 index 0000000000000..50246af5592bb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedProtocolFieldException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class UnsupportedProtocolFieldException extends UnsupportedVersionException { + private static final long serialVersionUID = 1L; + + public UnsupportedProtocolFieldException(String fieldOrValue, String apiKeyName, int apiVersion, int lowestSupportedVersion) { + super("The cluster does not support [" + fieldOrValue + "] in " + apiKeyName + " API version " + apiVersion + + ". Upgrade the cluster to " + apiKeyName + " API version >= " + lowestSupportedVersion + + " to enable [" + fieldOrValue + "]."); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java index 29df8326bc5a6..de1d8eacf6980 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.message.CreateAclsRequestData; import org.apache.kafka.common.message.CreateAclsRequestData.AclCreation; import org.apache.kafka.common.message.CreateAclsResponseData; @@ -33,8 +33,10 @@ import org.apache.kafka.common.resource.ResourceType; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; public class CreateAclsRequest extends AbstractRequest { @@ -91,8 +93,13 @@ private void validate(CreateAclsRequestData data) { if (version() == 0) { final boolean unsupported = data.creations().stream().anyMatch(creation -> creation.resourcePatternType() != PatternType.LITERAL.code()); - if (unsupported) - throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types"); + if (unsupported) { + String unsupportedType = Arrays.stream(PatternType.values()) + .filter(type -> type != PatternType.LITERAL) + .map(PatternType::name) + .collect(Collectors.joining(",")); + throw new UnsupportedProtocolFieldException(unsupportedType, apiKey().name(), version(), 1); + } } final boolean unknown = data.creations().stream().anyMatch(creation -> diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java index 6f713fcdaf3c2..56b49f8bb0f2a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsResponseData; @@ -40,8 +40,7 @@ public Builder(CreateTopicsRequestData data) { @Override public CreateTopicsRequest build(short version) { if (data.validateOnly() && version == 0) - throw new UnsupportedVersionException("validateOnly is not supported in version 0 of " + - "CreateTopicsRequest"); + throw new UnsupportedProtocolFieldException("validateOnly", apiKey().name(), version, 1); final List topicsWithDefaults = data.topics() .stream() @@ -53,10 +52,7 @@ public CreateTopicsRequest build(short version) { .collect(Collectors.toList()); if (!topicsWithDefaults.isEmpty() && version < 4) { - throw new UnsupportedVersionException("Creating topics with default " - + "partitions/replication factor are only supported in CreateTopicRequest " - + "version 4+. The following topics need values for partitions and replicas: " - + topicsWithDefaults); + throw new UnsupportedProtocolFieldException(String.join(",", topicsWithDefaults), apiKey().name(), version, 4); } return new CreateTopicsRequest(data, version); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java index fea08e38be8f0..86574f0e230ba 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.message.DeleteAclsRequestData; import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter; import org.apache.kafka.common.message.DeleteAclsResponseData; @@ -32,6 +32,7 @@ import org.apache.kafka.common.resource.ResourceType; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -77,9 +78,13 @@ private void normalizeAndValidate() { // to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons. if (patternType == PatternType.ANY) filter.setPatternTypeFilter(PatternType.LITERAL.code()); - else if (patternType != PatternType.LITERAL) - throw new UnsupportedVersionException("Version 0 does not support pattern type " + - patternType + " (only LITERAL and ANY are supported)"); + else if (patternType != PatternType.LITERAL) { + String unsupportedTypes = Arrays.stream(PatternType.values()) + .filter(type -> type != PatternType.ANY && type != PatternType.LITERAL) + .map(PatternType::name) + .collect(Collectors.joining(",")); + throw new UnsupportedProtocolFieldException(unsupportedTypes, apiKey().name(), version(), 1); + } } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java index 1ddf5bf99fc89..e242e757537f1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.message.DescribeAclsRequestData; import org.apache.kafka.common.message.DescribeAclsResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -30,6 +30,8 @@ import org.apache.kafka.common.resource.ResourceType; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.stream.Collectors; public class DescribeAclsRequest extends AbstractRequest { @@ -77,8 +79,13 @@ private void normalizeAndValidate(short version) { // to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons. if (patternType == PatternType.ANY) data.setPatternTypeFilter(PatternType.LITERAL.code()); - else if (patternType != PatternType.LITERAL) - throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types"); + else if (patternType != PatternType.LITERAL) { + String unsupportedTypes = Arrays.stream(PatternType.values()) + .filter(type -> type != PatternType.LITERAL) + .map(PatternType::name) + .collect(Collectors.joining(",")); + throw new UnsupportedProtocolFieldException(unsupportedTypes, apiKey().name(), version, 1); + } } if (data.patternTypeFilter() == PatternType.UNKNOWN.code() diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java index dae4086569f24..e8e077194b090 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.message.ElectLeadersRequestData; import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -65,7 +66,11 @@ public String toString() { private ElectLeadersRequestData toRequestData(short version) { if (electionType != ElectionType.PREFERRED && version == 0) { - throw new UnsupportedVersionException("API Version 0 only supports PREFERRED election type"); + String unsupportedTypes = Arrays.stream(ElectionType.values()) + .filter(type -> type != ElectionType.PREFERRED) + .map(ElectionType::name) + .collect(Collectors.joining(",")); + throw new UnsupportedProtocolFieldException(unsupportedTypes, apiKey().name(), version, 1); } ElectLeadersRequestData data = new ElectLeadersRequestData() diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java index 26cfc809d995a..3e83023f1cd49 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.FindCoordinatorResponseData; @@ -43,8 +44,7 @@ public Builder(FindCoordinatorRequestData data) { @Override public FindCoordinatorRequest build(short version) { if (version < 1 && data.keyType() == CoordinatorType.TRANSACTION.id()) { - throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " + - "because we require features supported only in 2 or later."); + throw new UnsupportedProtocolFieldException(CoordinatorType.TRANSACTION.name(), apiKey().name(), version, 2); } int batchedKeys = data.coordinatorKeys().size(); if (version < MIN_BATCHED_VERSION) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index 482e61a255a8e..1c6ceb11722f3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -38,8 +38,7 @@ public Builder(HeartbeatRequestData data) { @Override public HeartbeatRequest build(short version) { if (data.groupInstanceId() != null && version < 3) { - throw new UnsupportedVersionException("The broker heartbeat protocol version " + - version + " does not support usage of config group.instance.id."); + throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 3); } return new HeartbeatRequest(data, version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 946f7849bf14c..6ffc1ff6a1f5f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.errors.InvalidConfigurationException; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; @@ -42,8 +42,7 @@ public Builder(JoinGroupRequestData data) { @Override public JoinGroupRequest build(short version) { if (data.groupInstanceId() != null && version < 5) { - throw new UnsupportedVersionException("The broker join group protocol version " + - version + " does not support usage of config group.instance.id."); + throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 5); } return new JoinGroupRequest(data, version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java index 05ced8202d9d5..763ad232b2d2f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -47,12 +47,10 @@ public Builder(ListGroupsRequestData data) { @Override public ListGroupsRequest build(short version) { if (!data.statesFilter().isEmpty() && version < 4) { - throw new UnsupportedVersionException("The broker only supports ListGroups " + - "v" + version + ", but we need v4 or newer to request groups by states."); + throw new UnsupportedProtocolFieldException("StatesFilter", apiKey().name(), version, 4); } if (!data.typesFilter().isEmpty() && version < 5) { - throw new UnsupportedVersionException("The broker only supports ListGroups " + - "v" + version + ", but we need v5 or newer to request groups by type."); + throw new UnsupportedProtocolFieldException("TypesFilter", apiKey().name(), version, 5); } return new ListGroupsRequest(data, version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java index a5fef3ee7b29a..64e35c8c0d224 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.message.ListTransactionsRequestData; import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -37,8 +37,7 @@ public Builder(ListTransactionsRequestData data) { @Override public ListTransactionsRequest build(short version) { if (data.durationFilter() >= 0 && version < 1) { - throw new UnsupportedVersionException("Duration filter can be set only when using API version 1 or higher." + - " If client is connected to an older broker, do not specify duration filter or set duration filter to -1."); + throw new UnsupportedProtocolFieldException("DurationFilter", apiKey().name(), version, 1); } return new ListTransactionsRequest(data, version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 2e60e04b2aa06..7c9f25aeeb39c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic; @@ -105,16 +106,13 @@ public MetadataRequest build(short version) { if (version < 1) throw new UnsupportedVersionException("MetadataRequest versions older than 1 are not supported."); if (!data.allowAutoTopicCreation() && version < 4) - throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + - "allowAutoTopicCreation field"); + throw new UnsupportedProtocolFieldException("allowAutoTopicCreation", apiKey().name(), version, 4); if (data.topics() != null) { data.topics().forEach(topic -> { if (topic.name() == null && version < 12) - throw new UnsupportedVersionException("MetadataRequest version " + version + - " does not support null topic names."); + throw new UnsupportedProtocolFieldException("null topic names", apiKey().name(), version, 12); if (!Uuid.ZERO_UUID.equals(topic.topicId()) && version < 12) - throw new UnsupportedVersionException("MetadataRequest version " + version + - " does not support non-zero topic IDs."); + throw new UnsupportedProtocolFieldException("non-zero topic IDs", apiKey().name(), version, 12); }); } return new MetadataRequest(data, version); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 88111b1007717..f5155305141eb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic; import org.apache.kafka.common.message.OffsetCommitResponseData; @@ -58,8 +58,7 @@ public Builder(OffsetCommitRequestData data) { @Override public OffsetCommitRequest build(short version) { if (data.groupInstanceId() != null && version < 7) { - throw new UnsupportedVersionException("The broker offset commit protocol version " + - version + " does not support usage of config group.instance.id."); + throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 7); } return new OffsetCommitRequest(data, version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 7ece0700bfa96..ab292246bf49d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup; @@ -143,8 +144,7 @@ public OffsetFetchRequest build(short version) { } if (data.requireStable() && version < 7) { if (throwOnFetchStableOffsetsUnsupported) { - throw new UnsupportedVersionException("Broker unexpectedly " + - "doesn't support requireStable flag on version " + version); + throw new UnsupportedProtocolFieldException("RequireStable", apiKey().name(), version, 7); } else { log.trace("Fallback the requireStable flag to false as broker " + "only supports OffsetFetchRequest version {}. Need " + diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java index d2da48fb018ee..675a32bded35a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -41,8 +41,7 @@ public Builder(SyncGroupRequestData data) { @Override public SyncGroupRequest build(short version) { if (data.groupInstanceId() != null && version < 3) { - throw new UnsupportedVersionException("The broker sync group protocol version " + - version + " does not support usage of config group.instance.id."); + throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 3); } return new SyncGroupRequest(data, version); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 15a4dfff1a6a0..ce5136a3d0b65 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.UpdateMetadataRequestData; import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker; @@ -33,6 +34,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -78,8 +80,13 @@ public UpdateMetadataRequest build(short version) { if (version == 0) { if (broker.endpoints().size() != 1) throw new UnsupportedVersionException("UpdateMetadataRequest v0 requires a single endpoint"); - if (broker.endpoints().get(0).securityProtocol() != SecurityProtocol.PLAINTEXT.id) - throw new UnsupportedVersionException("UpdateMetadataRequest v0 only handles PLAINTEXT endpoints"); + if (broker.endpoints().get(0).securityProtocol() != SecurityProtocol.PLAINTEXT.id) { + String unsupportedProtocol = Arrays.stream(SecurityProtocol.values()) + .filter(protocol -> protocol != SecurityProtocol.PLAINTEXT) + .map(SecurityProtocol::name) + .collect(Collectors.joining(",")); + throw new UnsupportedProtocolFieldException(unsupportedProtocol, apiKey().name(), version, 1); + } // Don't null out `endpoints` since it's ignored by the generated code if version >= 1 UpdateMetadataEndpoint endpoint = broker.endpoints().get(0); broker.setV0Host(endpoint.host()); diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index ce7d4d83506d1..4e5b1c6847dba 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.RebootstrapRequiredException; -import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.message.ApiMessageType; import org.apache.kafka.common.message.ApiVersionsResponseData; @@ -240,7 +240,7 @@ public void testUnsupportedVersionDuringInternalMetadataRequest() { // disabling auto topic creation for versions less than 4 is not supported MetadataRequest.Builder builder = new MetadataRequest.Builder(topics, false, (short) 3); client.sendInternalMetadataRequest(builder, node.idString(), time.milliseconds()); - assertEquals(UnsupportedVersionException.class, metadataUpdater.getAndClearFailure().getClass()); + assertEquals(UnsupportedProtocolFieldException.class, metadataUpdater.getAndClearFailure().getClass()); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 445bff499bdd4..c068eaf33c622 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedProtocolFieldException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.AddOffsetsToTxnRequestData; import org.apache.kafka.common.message.AddOffsetsToTxnResponseData; @@ -652,8 +653,8 @@ public void testCreateTopicRequestV0FailsIfValidateOnly() { @Test public void testCreateTopicRequestV3FailsIfNoPartitionsOrReplicas() { - final UnsupportedVersionException exception = assertThrows( - UnsupportedVersionException.class, () -> { + final UnsupportedProtocolFieldException exception = assertThrows( + UnsupportedProtocolFieldException.class, () -> { CreateTopicsRequestData data = new CreateTopicsRequestData() .setTimeoutMs(123) .setValidateOnly(false); @@ -668,8 +669,7 @@ public void testCreateTopicRequestV3FailsIfNoPartitionsOrReplicas() { new Builder(data).build((short) 3); }); - assertTrue(exception.getMessage().contains("supported in CreateTopicRequest version 4+")); - assertTrue(exception.getMessage().contains("[foo, bar]")); + assertTrue(exception.getMessage().contains("does not support [foo,bar] in CREATE_TOPICS API version 3")); } @Test