When ConsumerOffsetManager commits consumer offsets, is there some probability under high concurrency that one queue's consumer offset is lost in this situation? #9448
-
```java
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null == map) {
        map = new ConcurrentHashMap<Integer, Long>(32);
        map.put(queueId, offset);
        this.offsetTable.put(key, map);
    } else {
        Long storeOffset = map.put(queueId, offset);
        if (storeOffset != null && offset < storeOffset) {
            LOG.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
        }
    }
    if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getConsumerOffsetUpdateVersionStep() == 0) {
        long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
        dataVersion.nextVersion(stateMachineVersion);
    }
}
```
In this code, suppose two requests come from two consumers listening to the same ... I feel like I've gone around in circles on this; hoping someone can explain.
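The scenario the question asks about can be replayed deterministically. The sketch below (illustrative names, not the real broker code) interleaves two first-time commits for different queues of the same key so that both observe `offsetTable.get(key) == null` before either has published its new inner map; the second `offsetTable.put` then replaces the first map wholesale and one queue's offset disappears.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Deterministic replay of the lost-update interleaving: both requests take
// the null-map branch, each builds a private map, and the later
// offsetTable.put overwrites the earlier one.
public class LostOffsetDemo {
    static Map<Integer, Long> simulateRace() {
        ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();
        String key = "topic@group";

        // Request A: sees no map for the key, builds its own and records queue 0.
        ConcurrentMap<Integer, Long> mapA = new ConcurrentHashMap<>(32);
        mapA.put(0, 100L);

        // Request B interleaves before A calls offsetTable.put: it also sees
        // null and builds a second, independent map for queue 1.
        ConcurrentMap<Integer, Long> mapB = new ConcurrentHashMap<>(32);
        mapB.put(1, 200L);

        // Both publish; the later put replaces the earlier map entirely.
        offsetTable.put(key, mapA);
        offsetTable.put(key, mapB);
        return offsetTable.get(key);
    }

    public static void main(String[] args) {
        System.out.println(simulateRace()); // prints {1=200}: queue 0's offset of 100 is gone
    }
}
```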
Answered by wangjinjing1 on Jun 5, 2025
-
I looked at the code. Since there is no locking here, if the first request has reached `map.put(queueId, offset)` in the null-map branch and a second request comes in and also sees a null map, this problem does exist: one request's `offsetTable.put` can overwrite the other's, losing that queue's offset. So the consumer side must verify whether a message has already been consumed; otherwise it will indeed be consumed again.
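Besides the consumer-side check, the check-then-put itself can be made atomic without a lock. The following is a minimal sketch (not an actual patch to RocketMQ, and it omits the data-version bookkeeping): `computeIfAbsent` creates and publishes the inner map in one atomic step, so two concurrent first commits for the same key share one map instead of overwriting each other.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Lock-free fix sketch: computeIfAbsent makes inner-map creation atomic,
// so concurrent commits for different queues of the same key cannot
// clobber each other's freshly created map.
public class AtomicOffsetCommitSketch {
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
            new ConcurrentHashMap<>();

    public void commitOffset(String key, int queueId, long offset) {
        ConcurrentMap<Integer, Long> map =
                offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>(32));
        Long storeOffset = map.put(queueId, offset);
        if (storeOffset != null && offset < storeOffset) {
            // mirrors the original warning about an offset moving backwards
            System.out.println("offset for queue " + queueId + " moved backwards");
        }
    }

    public Long getOffset(String key, int queueId) {
        ConcurrentMap<Integer, Long> map = offsetTable.get(key);
        return map == null ? null : map.get(queueId);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicOffsetCommitSketch mgr = new AtomicOffsetCommitSketch();
        Thread a = new Thread(() -> mgr.commitOffset("topic@group", 0, 100L));
        Thread b = new Thread(() -> mgr.commitOffset("topic@group", 1, 200L));
        a.start(); b.start();
        a.join(); b.join();
        // Neither commit can be lost now, regardless of interleaving.
        System.out.println(mgr.getOffset("topic@group", 0));
        System.out.println(mgr.getOffset("topic@group", 1));
    }
}
```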
Locking would indeed solve the problem, but if the business logic performed during consumption is heavy, processing becomes slow and drags out the whole business chain. High concurrency by nature demands fast system responses, so locking could lead to message backlog. For a problem like duplicate consumption, the simplest fix is to check before consuming whether the message has already been processed successfully; if it has, skip that message.
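The check-before-consume idea above can be sketched as follows. This is a simplified in-memory illustration (the key names and structure are my own, not a RocketMQ API); a production version would typically persist the processed-key set in a database or cache so it survives restarts.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Consumer-side idempotency sketch: record each message's unique key before
// processing; if the key was already recorded, skip the duplicate delivery.
public class IdempotentConsumerSketch {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Returns true if the message was processed, false if skipped as a duplicate. */
    public boolean consume(String msgKey) {
        // Set.add is atomic here: only the first delivery of a key wins,
        // even when duplicates arrive concurrently.
        if (!processed.add(msgKey)) {
            return false; // already handled successfully, skip this copy
        }
        // ... business logic for the message would run here ...
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch c = new IdempotentConsumerSketch();
        System.out.println(c.consume("MSG-1")); // first delivery: processed
        System.out.println(c.consume("MSG-1")); // redelivery: skipped
    }
}
```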