Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.6 (and most likely earlier and later versions as well)
Describe the bug
The sendAndReceive method of the ReplyingKafkaTemplate does not take the name of a user-defined reply header into account when it checks whether the header already exists.
boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null;
if (!hasReplyTopic && this.replyTopic != null) {
    headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
    if (this.replyPartition != null) {
        headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
    }
}
The check looks up the header using KafkaHeaders.REPLY_TOPIC, but the header is then set using this.replyTopicHeaderName. If a custom header name was configured earlier via setReplyTopicHeaderName, that name is not taken into account during the initial check.
As a result, duplicate headers (reply topic and reply partition) are set. In my case, this is particularly problematic because I have to provide the reply partition header in a different form, but the ReplyingKafkaTemplate overwrites it with its own header.
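To illustrate the difference, here is a minimal, self-contained sketch. It does not use the Spring Kafka or Kafka client classes: SimpleHeader, addReplyHeaderCurrent and addReplyHeaderProposed are hypothetical stand-ins that model only the reply topic part of the logic, and the "proposed" variant is just one possible fix, not necessarily how the project would implement it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ReplyHeaderCheckSketch {

    // Hypothetical stand-in for a Kafka record header; not the real Kafka class.
    record SimpleHeader(String key, byte[] value) {}

    // String value of KafkaHeaders.REPLY_TOPIC.
    static final String DEFAULT_REPLY_TOPIC_HEADER = "kafka_replyTopic";

    static boolean hasHeader(List<SimpleHeader> headers, String name) {
        return headers.stream().anyMatch(h -> h.key().equals(name));
    }

    // Models the check quoted above: looks up the default header name,
    // but adds the header under the configured name.
    static List<SimpleHeader> addReplyHeaderCurrent(
            List<SimpleHeader> headers, String replyTopicHeaderName, byte[] replyTopic) {
        List<SimpleHeader> result = new ArrayList<>(headers);
        boolean hasReplyTopic = hasHeader(result, DEFAULT_REPLY_TOPIC_HEADER);
        if (!hasReplyTopic && replyTopic != null) {
            result.add(new SimpleHeader(replyTopicHeaderName, replyTopic));
        }
        return result;
    }

    // Assumed fix: look up the same name that is used when adding the header.
    static List<SimpleHeader> addReplyHeaderProposed(
            List<SimpleHeader> headers, String replyTopicHeaderName, byte[] replyTopic) {
        List<SimpleHeader> result = new ArrayList<>(headers);
        boolean hasReplyTopic = hasHeader(result, replyTopicHeaderName);
        if (!hasReplyTopic && replyTopic != null) {
            result.add(new SimpleHeader(replyTopicHeaderName, replyTopic));
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] topic = "replies".getBytes(StandardCharsets.UTF_8);
        // The caller already supplies the header under the custom name.
        List<SimpleHeader> userHeaders = List.of(new SimpleHeader("REPLY_TOPIC", topic));

        System.out.println("current check:  "
                + addReplyHeaderCurrent(userHeaders, "REPLY_TOPIC", topic).size() + " header(s)");
        System.out.println("proposed check: "
                + addReplyHeaderProposed(userHeaders, "REPLY_TOPIC", topic).size() + " header(s)");
    }
}
```

With the current check, the caller-supplied REPLY_TOPIC header is not detected and a second one is appended. With the lookup keyed on the configured name, the existing header is left alone.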
To Reproduce
Create a ReplyingKafkaTemplate and set a custom reply topic header name
ReplyingKafkaTemplate<String, Message, Message> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
replyingKafkaTemplate.setReplyTopicHeaderName("REPLY_TOPIC");
...
Create the reply topic header yourself, using the custom name
List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader("REPLY_TOPIC", replyTopic.getBytes()));
Use the headers in the ProducerRecord and send it via the replyingKafkaTemplate
ProducerRecord<String, Message> producerRecord = new ProducerRecord<>("my-topic", null, null, key, value, headers);
replyingKafkaTemplate.sendAndReceive(producerRecord);
Expected behavior
Since the reply topic header is already provided under the custom name, the ReplyingKafkaTemplate should not set it again. The reply partition header should not be set either.