diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/consumer/MessagePublishContext.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/consumer/MessagePublishContext.java index c588885f..c6e1cf60 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/consumer/MessagePublishContext.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/consumer/MessagePublishContext.java @@ -36,6 +36,7 @@ public final class MessagePublishContext implements PublishContext { private Topic topic; private long startTimeNs; private CompletableFuture positionFuture; + private long sequenceId; /** * Executed from managed ledger thread when the message is persisted. @@ -58,11 +59,12 @@ public void completed(Exception exception, long ledgerId, long entryId) { // recycler public static MessagePublishContext get(CompletableFuture positionFuture, String producerName, - Topic topic, long startTimeNs) { + Topic topic, long sequenceId, long startTimeNs) { MessagePublishContext callback = RECYCLER.get(); callback.positionFuture = positionFuture; callback.producerName = producerName; callback.topic = topic; + callback.sequenceId = sequenceId; callback.startTimeNs = startTimeNs; return callback; } @@ -77,6 +79,12 @@ public String getProducerName() { return producerName; } + @Override + public long getSequenceId() { + return this.sequenceId; + } + + private static final Recycler RECYCLER = new Recycler() { protected MessagePublishContext newObject(Handle handle) { return new MessagePublishContext(handle); @@ -87,6 +95,7 @@ public void recycle() { positionFuture = null; topic = null; startTimeNs = -1; + sequenceId = -1; recyclerHandle.recycle(this); } @@ -94,12 +103,12 @@ public void recycle() { * publish mqtt message to pulsar topic, no batch. */ public static CompletableFuture publishMessages(String producerName, Message message, - Topic topic) { + long sequenceId, Topic topic) { CompletableFuture future = new CompletableFuture<>(); ByteBuf headerAndPayload = messageToByteBuf(message); topic.publishMessage(headerAndPayload, - MessagePublishContext.get(future, producerName, topic, System.nanoTime())); + MessagePublishContext.get(future, producerName, topic, sequenceId, System.nanoTime())); headerAndPayload.release(); return future; } diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java index eadb807e..716c126a 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java @@ -29,11 +29,13 @@ import io.streamnative.pulsar.handlers.mqtt.common.utils.PulsarTopicUtils; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.util.FutureUtil; @@ -45,6 +47,8 @@ public abstract class AbstractQosPublishHandler implements QosPublishHandler { protected final PulsarService pulsarService; protected final RetainedMessageHandler retainedMessageHandler; protected final MQTTServerConfiguration configuration; + private final ConcurrentHashMap sequenceIdMap = new ConcurrentHashMap<>(); + protected AbstractQosPublishHandler(MQTTService mqttService) { this.pulsarService = mqttService.getPulsarService(); @@ -104,9 +108,23 @@ protected CompletableFuture writeToPulsarTopic(Connection connection, mqttTopicName = msg.variableHeader().topicName(); } return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> { + long lastPublishedSequenceId = -1; + if (topic instanceof PersistentTopic) { + final long lastPublishedId = ((PersistentTopic) topic).getLastPublishedSequenceId(producerName); + lastPublishedSequenceId = sequenceIdMap.compute(producerName, (k, v) -> { + long id; + if (v == null) { + id = lastPublishedId + 1; + } else { + id = Math.max(v, lastPublishedId) + 1; + } + return id; + }); + } MessageImpl message = toPulsarMsg(configuration, topic, msg.variableHeader().properties(), msg.payload().nioBuffer()); - CompletableFuture ret = MessagePublishContext.publishMessages(producerName, message, topic); + CompletableFuture ret = MessagePublishContext.publishMessages(producerName, message, + lastPublishedSequenceId, topic); message.recycle(); return ret.thenApply(position -> { if (checkSubscription && topic.getSubscriptions().isEmpty()) { diff --git a/tests/pom.xml b/tests/pom.xml index ec8729ed..8efb9e95 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -34,6 +34,12 @@ ${project.version} test + + io.streamnative.pulsar.handlers + pulsar-protocol-handler-mqtt-common + ${project.version} + test + io.streamnative testmocks diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleBrokerEnableDedupTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleBrokerEnableDedupTest.java index 74ffd778..6569126c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleBrokerEnableDedupTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/base/SimpleBrokerEnableDedupTest.java @@ -53,4 +53,22 @@ public void testSendAndConsume() throws Exception { received.ack(); connection.disconnect(); } + + @Test + public void testDedup() throws Exception { + MQTT mqtt = createMQTTClient(); + String topicName = "testDedup"; + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + Topic[] topics = { new Topic(topicName, QoS.AT_MOST_ONCE) }; + connection.subscribe(topics); + String message = "Hello MQTT"; + for (int i = 1; i <= 10; i++) { + connection.publish(topicName, (message + i).getBytes(), QoS.AT_MOST_ONCE, false); + Message received = connection.receive(); + Assert.assertEquals(received.getTopic(), topicName); + Assert.assertEquals(new String(received.getPayload()), message + i); + } + connection.disconnect(); + } }