Skip to content

Commit

Permalink
Handle Pulsar batch message make sure dispatch batch can dispatch cor…
Browse files Browse the repository at this point in the history
…rectly. (#11)
  • Loading branch information
codelipenghui committed Jun 3, 2020
1 parent 3dcc30f commit dc82ff0
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.streamnative.pulsar.handlers.mqtt.OutstandingPacket;
import io.streamnative.pulsar.handlers.mqtt.OutstandingPacketContainer;
Expand Down Expand Up @@ -78,8 +79,10 @@ public ChannelPromise sendMessages(List<Entry> entries, EntryBatchSizes batchSiz
outstandingPacketContainer.add(new OutstandingPacket(this, packetId, entry.getLedgerId(),
entry.getEntryId()));
}
cnx.ctx().channel().write(PulsarMessageConverter.toMQTTMsg(topicName, entry,
packetId, qos));
for (MqttPublishMessage msg : PulsarMessageConverter.toMqttMessages(topicName, entry,
packetId, qos)) {
cnx.ctx().channel().write(msg);
}
}
if (MqttQoS.AT_MOST_ONCE == qos) {
incrementPermits(totalMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.streamnative.pulsar.handlers.mqtt.support.MessageBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
Expand All @@ -31,6 +36,7 @@
/**
* Tools for converting MQTT message to Pulsar message and Pulsar message to MQTT message.
*/
@Slf4j
public class PulsarMessageConverter {

private static final Schema<byte[]> SCHEMA = Schema.BYTES;
Expand All @@ -42,16 +48,43 @@ public static MessageImpl<byte[]> toPulsarMsg(MqttPublishMessage mqttMsg) {
return MessageImpl.create(metadataBuilder, mqttMsg.payload().nioBuffer(), SCHEMA);
}

public static MqttPublishMessage toMQTTMsg(String topicName, Entry entry, int messageId, MqttQoS qos) {
public static List<MqttPublishMessage> toMqttMessages(String topicName, Entry entry, int messageId, MqttQoS qos) {
ByteBuf metadataAndPayload = entry.getDataBuffer();
Commands.skipMessageMetadata(metadataAndPayload);
return MessageBuilder.publish()
PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
if (metadata.hasNumMessagesInBatch()) {
int batchSize = metadata.getNumMessagesInBatch();
metadata.recycle();
List<MqttPublishMessage> response = new ArrayList<>(batchSize);
try {
for (int i = 0; i < batchSize; i++) {
PulsarApi.SingleMessageMetadata.Builder single = PulsarApi.SingleMessageMetadata.newBuilder();
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(metadataAndPayload,
single,
i, batchSize);
response.add(MessageBuilder.publish()
.messageId(messageId)
.payload(singleMessagePayload)
.topicName(topicName)
.qos(qos)
.retained(false)
.build());
single.recycle();
}
return response;
} catch (IOException e) {
log.error("Error decoding batch for message {}. Whole batch will be included in output",
entry.getPosition(), e);
return Collections.emptyList();
}
} else {
return Collections.singletonList(MessageBuilder.publish()
.messageId(messageId)
.payload(metadataAndPayload)
.topicName(topicName)
.qos(qos)
.retained(false)
.build();
.build());
}
}

// convert message to ByteBuf payload for ledger.addEntry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.mqtt.base.MQTTProtocolHandlerTestBase;

import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -34,13 +33,15 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
* Simple integration tests for MQTT protocol handler.
*/
@Slf4j
public class SimpleIntegrationTest extends MQTTProtocolHandlerTestBase {

@BeforeClass
@Override
public void setup() throws Exception {
Expand Down Expand Up @@ -77,6 +78,14 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "batchEnabled")
public Object[][] batchEnabled() {
return new Object[][] {
{ true },
{ false }
};
}

@Test
public void testSimpleMqttPubAndSubQos0() throws Exception {
final String topicName = "persistent://public/default/qos0";
Expand Down Expand Up @@ -136,8 +145,8 @@ public void testSendByMqttAndReceiveByPulsar() throws Exception {
connection.disconnect();
}

@Test
public void testSendByPulsarAndReceiveByMqtt() throws Exception {
@Test(dataProvider = "batchEnabled")
public void testSendByPulsarAndReceiveByMqtt(boolean batchEnabled) throws Exception {
final String topicName = "persistent://public/default/testSendByPulsarAndReceiveByMqtt";
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
Expand All @@ -148,15 +157,13 @@ public void testSendByPulsarAndReceiveByMqtt() throws Exception {

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.enableBatching(batchEnabled)
.create();

String message = "Hello MQTT";

producer.newMessage().value(message).send();
producer.newMessage().value(message).sendAsync();
Message received = connection.receive();
System.out.println(Arrays.toString(message.getBytes()));
System.out.println(Arrays.toString(received.getPayload()));
Assert.assertEquals(new String(received.getPayload()), message);
received.ack();
connection.disconnect();
Expand Down Expand Up @@ -218,4 +225,37 @@ public void testBacklogShouldBeZeroWithQos1() throws Exception {
.subscriptions.entrySet().iterator().next().getValue().msgBacklog, 0);
connection.disconnect();
}

@Test(dataProvider = "batchEnabled")
public void testBacklogShouldBeZeroWithQos0AndSendByPulsar(boolean batchEnabled) throws Exception {
final String topicName = "persistent://public/default/testBacklogShouldBeZeroWithQos0AndSendByPulsar-"
+ batchEnabled;
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = { new Topic(topicName, QoS.AT_MOST_ONCE) };
connection.subscribe(topics);
String message = "Hello MQTT";

int messages = 10000;
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.blockIfQueueFull(true)
.enableBatching(batchEnabled)
.create();
for (int i = 0; i < messages; i++) {
producer.sendAsync(message + i);
}

for (int i = 0; i < messages; i++) {
Message received = connection.receive();
Assert.assertEquals(new String(received.getPayload()), (message + i));
}

Assert.assertEquals(admin.topics().getStats(topicName).subscriptions.size(), 1);
Assert.assertEquals(admin.topics().getStats(topicName)
.subscriptions.entrySet().iterator().next().getValue().msgBacklog, 0);
connection.disconnect();
}
}

0 comments on commit dc82ff0

Please sign in to comment.