diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java index 01e715828..a21f59922 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java @@ -406,14 +406,6 @@ private void cancelScheduledHint() { } } - private void scheduleConfirmTimeout(long upToSeq) { - confirmTimeout = ctx.executor().schedule(() -> { - if (upToSeq < inboxConfirmedUpToSeq) { - confirmSendBuffer(); - } - }, ThreadLocalRandom.current().nextLong(15, 45), TimeUnit.SECONDS); - } - private void confirmQoS0() { if (qos0Confirming) { return; @@ -503,9 +495,9 @@ private void confirmSendBuffer() { handleProtocolResponse(helper().onInboxTransientError(v.getCode().name())); case BACK_PRESSURE_REJECTED -> { inboxConfirming = false; - if (upToSeq < inboxConfirmedUpToSeq) { - scheduleConfirmTimeout(upToSeq); - } + // schedule confirm later + confirmTimeout = ctx.executor() + .schedule(this::confirmSendBuffer, ThreadLocalRandom.current().nextLong(15, 45), TimeUnit.SECONDS); } case TRY_LATER -> { // try again with same version diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java index cfa2ea407..86779559d 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTTPersistentS2CPubTest.java @@ -245,6 +245,30 @@ public void qoS2PubAuthFailed() { verify(inboxClient, times(messageCount)).unsub(any()); } + @Test + public void retryCommitAfterBackPressure() { + mockAuthCheck(true); + when(inboxClient.commit(any())) + .thenReturn(CompletableFuture.completedFuture( + CommitReply.newBuilder().setCode(CommitReply.Code.BACK_PRESSURE_REJECTED).build())) + .thenReturn(CompletableFuture.completedFuture( + CommitReply.newBuilder().setCode(CommitReply.Code.OK).build())); + + inboxFetchConsumer.accept(fetch(1, 128, AT_LEAST_ONCE)); + channel.runPendingTasks(); + + MqttPublishMessage message = channel.readOutbound(); + assertEquals(message.fixedHeader().qosLevel().value(), QoS.AT_LEAST_ONCE_VALUE); + channel.writeInbound(MQTTMessageUtils.pubAckMessage(message.variableHeader().packetId())); + channel.runPendingTasks(); + + channel.advanceTimeBy(45, TimeUnit.SECONDS); + channel.runScheduledPendingTasks(); + channel.runPendingTasks(); + + verify(inboxClient, times(2)).commit(argThat(CommitRequest::hasSendBufferUpToSeq)); + } + @Test public void fetchTryLater() { inboxFetchConsumer.accept(Fetched.newBuilder().setResult(Result.TRY_LATER).build());