diff --git a/mqtt-impl/pom.xml b/mqtt-impl/pom.xml
index 514fa899b..4e29faef1 100644
--- a/mqtt-impl/pom.xml
+++ b/mqtt-impl/pom.xml
@@ -20,7 +20,7 @@
pulsar-protocol-handler-mqtt-parent
io.streamnative.pulsar.handlers
- 2.10.0.0-rc4
+ 3.4.0-SNAPSHOT
4.0.0
pulsar-protocol-handler-mqtt
diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java
index 34f155ffb..eadaf4469 100644
--- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java
+++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java
@@ -25,7 +25,7 @@
import io.streamnative.pulsar.handlers.mqtt.utils.PulsarTopicUtils;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -63,7 +63,7 @@ protected CompletableFuture> getTopicReference(String mqttTopicN
});
}
- protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg) {
+ protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg) {
return writeToPulsarTopic(connection, msg, false);
}
@@ -73,7 +73,7 @@ protected CompletableFuture writeToPulsarTopic(Connection connecti
* @param checkSubscription Check if the subscription exists, throw #{MQTTNoMatchingSubscriberException}
* if the subscription does not exist;
*/
- protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg,
+ protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg,
boolean checkSubscription) {
TopicAliasManager topicAliasManager = connection.getTopicAliasManager();
String producerName = connection.getClientId();
@@ -102,7 +102,7 @@ protected CompletableFuture writeToPulsarTopic(Connection connecti
return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> {
MessageImpl message = toPulsarMsg(configuration, topic, msg.variableHeader().properties(),
msg.payload().nioBuffer());
- CompletableFuture ret = MessagePublishContext.publishMessages(producerName, message, topic);
+ CompletableFuture ret = MessagePublishContext.publishMessages(producerName, message, topic);
message.recycle();
return ret.thenApply(position -> {
if (checkSubscription && topic.getSubscriptions().isEmpty()) {
diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java
index 3fa548985..b223a6ab9 100644
--- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java
+++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java
@@ -32,13 +32,13 @@
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.Backoff;
-import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java
index dc6117c1f..df7e5b4b7 100644
--- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java
+++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java
@@ -71,7 +71,9 @@
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.AckSetStateUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -158,15 +160,15 @@ public void processPubAck(MqttAdapterMessage adapter) {
int packetId = msg.variableHeader().messageId();
OutstandingPacket packet = outstandingPacketContainer.remove(packetId);
if (packet != null) {
- PositionImpl position;
+ Position position;
if (packet.isBatch()) {
long[] ackSets = new long[packet.getBatchSize()];
for (int i = 0; i < packet.getBatchSize(); i++) {
ackSets[i] = packet.getBatchIndex() == i ? 0 : 1;
}
- position = PositionImpl.get(packet.getLedgerId(), packet.getEntryId(), ackSets);
+ position = AckSetStateUtil.createPositionWithAckSet(packet.getLedgerId(), packet.getEntryId(), ackSets);
} else {
- position = PositionImpl.get(packet.getLedgerId(), packet.getEntryId());
+ position = PositionFactory.create(packet.getLedgerId(), packet.getEntryId());
}
packet.getConsumer().getSubscription().acknowledgeMessage(Collections.singletonList(position),
CommandAck.AckType.Individual, Collections.emptyMap());
diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/systemtopic/SystemTopicBasedSystemEventService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/systemtopic/SystemTopicBasedSystemEventService.java
index 12510fab0..9e8117455 100644
--- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/systemtopic/SystemTopicBasedSystemEventService.java
+++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/systemtopic/SystemTopicBasedSystemEventService.java
@@ -31,10 +31,10 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
/**
diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MessagePublishContext.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MessagePublishContext.java
index f7a7d6874..bd694e8f3 100644
--- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MessagePublishContext.java
+++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MessagePublishContext.java
@@ -20,7 +20,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.client.api.Message;
@@ -34,7 +35,7 @@ public final class MessagePublishContext implements PublishContext {
private String producerName;
private Topic topic;
private long startTimeNs;
- private CompletableFuture positionFuture;
+ private CompletableFuture positionFuture;
/**
* Executed from managed ledger thread when the message is persisted.
@@ -50,13 +51,13 @@ public void completed(Exception exception, long ledgerId, long entryId) {
topic.getName(), ledgerId, entryId);
}
topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
- positionFuture.complete(PositionImpl.get(ledgerId, entryId));
+ positionFuture.complete(PositionFactory.create(ledgerId, entryId));
}
recycle();
}
// recycler
- public static MessagePublishContext get(CompletableFuture positionFuture, String producerName,
+ public static MessagePublishContext get(CompletableFuture positionFuture, String producerName,
Topic topic, long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.positionFuture = positionFuture;
@@ -92,9 +93,9 @@ public void recycle() {
/**
* publish mqtt message to pulsar topic, no batch.
*/
- public static CompletableFuture publishMessages(String producerName, Message message,
- Topic topic) {
- CompletableFuture future = new CompletableFuture<>();
+ public static CompletableFuture publishMessages(String producerName, Message message,
+ Topic topic) {
+ CompletableFuture future = new CompletableFuture<>();
ByteBuf headerAndPayload = messageToByteBuf(message);
topic.publishMessage(headerAndPayload,
diff --git a/pom.xml b/pom.xml
index 1a8f9ba0d..7566b4657 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
io.streamnative.pulsar.handlers
pulsar-protocol-handler-mqtt-parent
- 2.10.0.0-rc4
+ 3.4.0-SNAPSHOT
StreamNative :: Pulsar Protocol Handler :: MoP Parent
Parent for MQTT on Pulsar implemented using Pulsar Protocol Handler.
@@ -49,7 +49,7 @@
2.22.0
6.14.3
4.0.2
- 3.2.0-SNAPSHOT
+ 3.4.0-SNAPSHOT
4.1.94.Final
2.18.0
1.16
@@ -376,12 +376,10 @@
default
https://repo1.maven.org/maven2
-
snapshot
https://s01.oss.sonatype.org/content/repositories/snapshots
-
false
@@ -390,5 +388,23 @@
bintray
https://yahoo.bintray.com/maven
+
+ ossrh
+ https://s01.oss.sonatype.org/service/local/repositories/0/content
+
+
+ nexus-snapshot
+ https://s01.oss.sonatype.org/content/repositories/snapshots
+
+
+
+ ossrh
+ https://s01.oss.sonatype.org/content/repositories/snapshots
+
+
+ ossrh
+ https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/
+
+
diff --git a/tests/pom.xml b/tests/pom.xml
index 592d39b77..64897485d 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -20,7 +20,7 @@
pulsar-protocol-handler-mqtt-parent
io.streamnative.pulsar.handlers
- 2.10.0.0-rc4
+ 3.4.0-SNAPSHOT
4.0.0
pulsar-protocol-handler-mqtt-tests
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java
index c52b311a4..9e05df2d0 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java
@@ -22,7 +22,6 @@
import io.netty.channel.EventLoopGroup;
import io.streamnative.pulsar.handlers.mqtt.MQTTCommonConfiguration;
import io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
@@ -33,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -459,18 +459,19 @@ public void reallyShutdown() {
private BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() {
@Override
- public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup,
- Optional> ensemblePlacementPolicyClass,
- Map ensemblePlacementPolicyProperties) throws IOException {
- return mockBookKeeper;
+ public CompletableFuture create(ServiceConfiguration conf, MetadataStoreExtended store,
+ EventLoopGroup eventLoopGroup,
+ Optional> ensemblePlacementPolicyClass,
+ Map ensemblePlacementPolicyProperties) {
+ return CompletableFuture.completedFuture(mockBookKeeper);
}
@Override
- public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup,
- Optional> ensemblePlacementPolicyClass,
- Map ensemblePlacementPolicyProperties, StatsLogger statsLogger)
- throws IOException {
- return mockBookKeeper;
+ public CompletableFuture create(ServiceConfiguration conf, MetadataStoreExtended store,
+ EventLoopGroup eventLoopGroup,
+ Optional> ensemblePlacementPolicyClass,
+ Map ensemblePlacementPolicyProperties, StatsLogger statsLogger) {
+ return CompletableFuture.completedFuture(mockBookKeeper);
}
@Override