From cbd4ea97064299edfa49114d128c9f1452ef7c8a Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Thu, 20 Jun 2024 15:44:29 +0800 Subject: [PATCH] Upgrade Pulsar from 2.10.0 to 3.4.0 (#1377) --- mqtt-impl/pom.xml | 2 +- .../mqtt/AbstractQosPublishHandler.java | 8 +++---- .../proxy/PulsarServiceLookupHandler.java | 4 ++-- .../MQTTBrokerProtocolMethodProcessor.java | 10 ++++---- .../SystemTopicBasedSystemEventService.java | 2 +- .../mqtt/utils/MessagePublishContext.java | 15 ++++++------ pom.xml | 24 +++++++++++++++---- tests/pom.xml | 2 +- .../base/MQTTProtocolHandlerTestBase.java | 21 ++++++++-------- 9 files changed, 54 insertions(+), 34 deletions(-) diff --git a/mqtt-impl/pom.xml b/mqtt-impl/pom.xml index 514fa899b..4e29faef1 100644 --- a/mqtt-impl/pom.xml +++ b/mqtt-impl/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 2.10.0.0-rc4 + 3.4.0-SNAPSHOT 4.0.0 pulsar-protocol-handler-mqtt diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java index 34f155ffb..eadaf4469 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/AbstractQosPublishHandler.java @@ -25,7 +25,7 @@ import io.streamnative.pulsar.handlers.mqtt.utils.PulsarTopicUtils; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -63,7 +63,7 @@ protected CompletableFuture> getTopicReference(String mqttTopicN }); } - protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg) { + protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg) { return writeToPulsarTopic(connection, msg, false); } @@ -73,7 +73,7 @@ protected CompletableFuture writeToPulsarTopic(Connection connecti * @param checkSubscription Check if the subscription exists, throw #{MQTTNoMatchingSubscriberException} * if the subscription does not exist; */ - protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg, + protected CompletableFuture writeToPulsarTopic(Connection connection, MqttPublishMessage msg, boolean checkSubscription) { TopicAliasManager topicAliasManager = connection.getTopicAliasManager(); String producerName = connection.getClientId(); @@ -102,7 +102,7 @@ protected CompletableFuture writeToPulsarTopic(Connection connecti return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> { MessageImpl message = toPulsarMsg(configuration, topic, msg.variableHeader().properties(), msg.payload().nioBuffer()); - CompletableFuture ret = MessagePublishContext.publishMessages(producerName, message, topic); + CompletableFuture ret = MessagePublishContext.publishMessages(producerName, message, topic); message.recycle(); return ret.thenApply(position -> { if (checkSubscription && topic.getSubscriptions().isEmpty()) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java index 3fa548985..b223a6ab9 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java @@ -32,13 +32,13 @@ import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.Backoff; -import org.apache.pulsar.client.impl.BackoffBuilder; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.client.util.ScheduledExecutorProvider; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java index dc6117c1f..df7e5b4b7 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java @@ -71,7 +71,9 @@ import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authorization.AuthorizationService; @@ -158,15 +160,15 @@ public void processPubAck(MqttAdapterMessage adapter) { int packetId = msg.variableHeader().messageId(); OutstandingPacket packet = outstandingPacketContainer.remove(packetId); if (packet != null) { - PositionImpl position; + Position position; if (packet.isBatch()) { long[] ackSets = new long[packet.getBatchSize()]; for (int i = 0; i < packet.getBatchSize(); i++) { ackSets[i] = packet.getBatchIndex() == i ? 0 : 1; } - position = PositionImpl.get(packet.getLedgerId(), packet.getEntryId(), ackSets); + position = AckSetStateUtil.createPositionWithAckSet(packet.getLedgerId(), packet.getEntryId(), ackSets); } else { - position = PositionImpl.get(packet.getLedgerId(), packet.getEntryId()); + position = PositionFactory.create(packet.getLedgerId(), packet.getEntryId()); } packet.getConsumer().getSubscription().acknowledgeMessage(Collections.singletonList(position), CommandAck.AckType.Individual, Collections.emptyMap()); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/systemtopic/SystemTopicBasedSystemEventService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/systemtopic/SystemTopicBasedSystemEventService.java index 12510fab0..9e8117455 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/systemtopic/SystemTopicBasedSystemEventService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/systemtopic/SystemTopicBasedSystemEventService.java @@ -31,10 +31,10 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.util.RetryUtil; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.FutureUtil; /** diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MessagePublishContext.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MessagePublishContext.java index f7a7d6874..bd694e8f3 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MessagePublishContext.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MessagePublishContext.java @@ -20,7 +20,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.Topic.PublishContext; import org.apache.pulsar.client.api.Message; @@ -34,7 +35,7 @@ public final class MessagePublishContext implements PublishContext { private String producerName; private Topic topic; private long startTimeNs; - private CompletableFuture positionFuture; + private CompletableFuture positionFuture; /** * Executed from managed ledger thread when the message is persisted. @@ -50,13 +51,13 @@ public void completed(Exception exception, long ledgerId, long entryId) { topic.getName(), ledgerId, entryId); } topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS); - positionFuture.complete(PositionImpl.get(ledgerId, entryId)); + positionFuture.complete(PositionFactory.create(ledgerId, entryId)); } recycle(); } // recycler - public static MessagePublishContext get(CompletableFuture positionFuture, String producerName, + public static MessagePublishContext get(CompletableFuture positionFuture, String producerName, Topic topic, long startTimeNs) { MessagePublishContext callback = RECYCLER.get(); callback.positionFuture = positionFuture; @@ -92,9 +93,9 @@ public void recycle() { /** * publish mqtt message to pulsar topic, no batch. */ - public static CompletableFuture publishMessages(String producerName, Message message, - Topic topic) { - CompletableFuture future = new CompletableFuture<>(); + public static CompletableFuture publishMessages(String producerName, Message message, + Topic topic) { + CompletableFuture future = new CompletableFuture<>(); ByteBuf headerAndPayload = messageToByteBuf(message); topic.publishMessage(headerAndPayload, diff --git a/pom.xml b/pom.xml index 1a8f9ba0d..7566b4657 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> io.streamnative.pulsar.handlers pulsar-protocol-handler-mqtt-parent - 2.10.0.0-rc4 + 3.4.0-SNAPSHOT StreamNative :: Pulsar Protocol Handler :: MoP Parent Parent for MQTT on Pulsar implemented using Pulsar Protocol Handler. @@ -49,7 +49,7 @@ 2.22.0 6.14.3 4.0.2 - 3.2.0-SNAPSHOT + 3.4.0-SNAPSHOT 4.1.94.Final 2.18.0 1.16 @@ -376,12 +376,10 @@ default https://repo1.maven.org/maven2 - snapshot https://s01.oss.sonatype.org/content/repositories/snapshots - false @@ -390,5 +388,23 @@ bintray https://yahoo.bintray.com/maven + + ossrh + https://s01.oss.sonatype.org/service/local/repositories/0/content + + + nexus-snapshot + https://s01.oss.sonatype.org/content/repositories/snapshots + + + + ossrh + https://s01.oss.sonatype.org/content/repositories/snapshots + + + ossrh + https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/ + + diff --git a/tests/pom.xml b/tests/pom.xml index 592d39b77..64897485d 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 2.10.0.0-rc4 + 3.4.0-SNAPSHOT 4.0.0 pulsar-protocol-handler-mqtt-tests diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java index c52b311a4..9e05df2d0 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java @@ -22,7 +22,6 @@ import io.netty.channel.EventLoopGroup; import io.streamnative.pulsar.handlers.mqtt.MQTTCommonConfiguration; import io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils; -import java.io.IOException; import java.lang.reflect.Field; import java.net.URI; import java.net.URL; @@ -33,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; @@ -459,18 +459,19 @@ public void reallyShutdown() { private BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() { @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup, - Optional> ensemblePlacementPolicyClass, - Map ensemblePlacementPolicyProperties) throws IOException { - return mockBookKeeper; + public CompletableFuture create(ServiceConfiguration conf, MetadataStoreExtended store, + EventLoopGroup eventLoopGroup, + Optional> ensemblePlacementPolicyClass, + Map ensemblePlacementPolicyProperties) { + return CompletableFuture.completedFuture(mockBookKeeper); } @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup, - Optional> ensemblePlacementPolicyClass, - Map ensemblePlacementPolicyProperties, StatsLogger statsLogger) - throws IOException { - return mockBookKeeper; + public CompletableFuture create(ServiceConfiguration conf, MetadataStoreExtended store, + EventLoopGroup eventLoopGroup, + Optional> ensemblePlacementPolicyClass, + Map ensemblePlacementPolicyProperties, StatsLogger statsLogger) { + return CompletableFuture.completedFuture(mockBookKeeper); } @Override