From f79a0ee6ab44ae38cc88bfccda299208c94bc026 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 19 Apr 2021 11:12:19 +0800 Subject: [PATCH] Bump pulsar to 2.7.1.4-rc-202104151209 (#48) --- mqtt-impl/pom.xml | 2 +- .../handlers/mqtt/support/Qos1PublishHandler.java | 9 +++++++++ .../handlers/mqtt/support/Qos2PublishHandler.java | 5 ++++- .../pulsar/handlers/mqtt/utils/PulsarTopicUtils.java | 3 +++ pom.xml | 4 ++-- tests/pom.xml | 2 +- .../streamnative/pulsar/handlers/mqtt/ProxyTest.java | 10 +++++----- 7 files changed, 25 insertions(+), 10 deletions(-) diff --git a/mqtt-impl/pom.xml b/mqtt-impl/pom.xml index 46891383d..280e5c695 100644 --- a/mqtt-impl/pom.xml +++ b/mqtt-impl/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 2.7.1.3 + 2.7.1.4-rc-202104151209 4.0.0 io.streamnative.pulsar.handlers diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java index 06c617ddc..148a5a8ff 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java @@ -27,6 +27,8 @@ import io.streamnative.pulsar.handlers.mqtt.utils.NettyUtils; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.zookeeper.KeeperException; /** * Publish handler implementation for Qos 1. @@ -51,6 +53,13 @@ public void receivePublish(Channel channel, MqttPublishMessage msg) { sendPubAck(topic, clientId, msg.variableHeader().packetId()); } else { log.error("[{}] Write {} to Pulsar topic failed.", topic, msg, e); + Throwable cause = e.getCause(); + if (cause instanceof BrokerServiceException.ServerMetadataException) { + cause = cause.getCause(); + if (cause instanceof KeeperException.NoNodeException) { + channel.close(); + } + } } }); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos2PublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos2PublishHandler.java index db983b293..34b64d87f 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos2PublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos2PublishHandler.java @@ -18,12 +18,14 @@ import io.streamnative.pulsar.handlers.mqtt.AbstractQosPublishHandler; import io.streamnative.pulsar.handlers.mqtt.ConnectionDescriptorStore; import io.streamnative.pulsar.handlers.mqtt.MQTTServerConfiguration; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; /** * Publish handler implementation for Qos 2. */ +@Slf4j public class Qos2PublishHandler extends AbstractQosPublishHandler { public Qos2PublishHandler(PulsarService pulsarService, MQTTServerConfiguration configuration, @@ -33,6 +35,7 @@ public Qos2PublishHandler(PulsarService pulsarService, MQTTServerConfiguration c @Override public void receivePublish(Channel channel, MqttPublishMessage msg) { - + log.error("[{}] Failed to write data due to QoS2 does not support.", msg.variableHeader().topicName()); + channel.close(); } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarTopicUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarTopicUtils.java index 31e8af43a..63a265863 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarTopicUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarTopicUtils.java @@ -65,6 +65,9 @@ public static CompletableFuture getOrCreateSubscription(PulsarServ promise.complete(subscription); } } + }).exceptionally(ex -> { + promise.completeExceptionally(ex); + return null; }); return promise; } diff --git a/pom.xml b/pom.xml index 36a9a5b97..27d6b1625 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> io.streamnative.pulsar.handlers pulsar-protocol-handler-mqtt-parent - 2.7.1.3 + 2.7.1.4-rc-202104151209 StreamNative :: Pulsar Protocol Handler :: MoP Parent Parent for MQTT on Pulsar implemented using Pulsar Protocol Handler. @@ -48,7 +48,7 @@ 1.18.4 2.22.0 6.14.3 - 2.7.1.3 + 2.7.1.4-rc-202104151209 4.0.2 4.1.49.Final 3.4 diff --git a/tests/pom.xml b/tests/pom.xml index d273f1887..0aaf4c7aa 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 2.7.1.3 + 2.7.1.4-rc-202104151209 4.0.0 diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/ProxyTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/ProxyTest.java index abc14ddac..d0160e1ed 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/ProxyTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/ProxyTest.java @@ -21,8 +21,8 @@ import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -31,13 +31,13 @@ */ @Slf4j public class ProxyTest extends MQTTTestBase { - @BeforeClass + @BeforeMethod @Override protected void setup() throws Exception { super.setup(); } - @AfterClass + @AfterMethod @Override protected void cleanup() throws Exception { super.cleanup(); @@ -53,7 +53,7 @@ public Object[][] mqttTopicNames() { }; } - @Test(dataProvider = "mqttTopicNames") + @Test(dataProvider = "mqttTopicNames", timeOut = 60000) public void mqttProxyTest(String topicName) throws Exception { setBrokerCount(3); int proxyPort = getProxyPort();