Skip to content

Commit

Permalink
Add sample proxy for mop (#18)
Browse files Browse the repository at this point in the history
Add sample proxy for mop

Signed-off-by: xiaolong.ran <[email protected]>
  • Loading branch information
wolfstudy authored Aug 31, 2020
1 parent b801ad3 commit 0d69c64
Show file tree
Hide file tree
Showing 18 changed files with 1,415 additions and 80 deletions.
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,33 @@ add configurations in Pulsar's configuration file, such as `broker.conf` or `sta
After you have installed the MoP protocol handler to Pulsar broker, you can restart the Pulsar brokers to load MoP.
### How to use Proxy
To use proxy, complete the following steps. For detailed steps, refer to [Deploy a cluster on bare metal](http://pulsar.apache.org/docs/en/deploy-bare-metal/).
1. Prepare a ZooKeeper cluster.
2. Initialize the cluster metadata.
3. Prepare a BookKeeper cluster.
4. Copy the `pulsar-protocol-handler-mqtt-${version}.nar` to the `$PULSAR_HOME/protocols` directory.
5. Start broker.
broker config
```yaml
messagingProtocols=mqtt
protocolHandlerDirectory=./protocols
brokerServicePort=6651
mqttListeners=mqtt://127.0.0.1:1883
advertisedAddress=127.0.0.1
mqttProxyEnable=true
mqttProxyPort=5682
```
### Verify MoP
There are many MQTT client can be used to verify MoP such as http://workswithweb.com/mqttbox.html, https://www.hivemq.com/mqtt-toolbox. You can choose a cli tool or interface tool to verify the MoP.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void channelRead(ChannelHandlerContext ctx, Object message) {
MqttMessage msg = (MqttMessage) message;
MqttMessageType messageType = msg.fixedHeader().messageType();
if (log.isDebugEnabled()) {
log.info("Processing MQTT message, type={}", messageType);
log.info("Processing MQTT Inbound handler message, type={}", messageType);
}
try {
switch (messageType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.streamnative.pulsar.handlers.mqtt.proxy.ProxyConfiguration;
import io.streamnative.pulsar.handlers.mqtt.proxy.ProxyService;
import io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils;
import java.net.InetSocketAddress;
import java.util.Map;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
Expand Down Expand Up @@ -82,6 +85,26 @@ public String getProtocolDataToAdvertise() {
public void start(BrokerService brokerService) {
this.brokerService = brokerService;

if (mqttConfig.isMqttProxyEnable()) {
ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setMqttTenant(mqttConfig.getDefaultTenant());
proxyConfig.setMqttMaxNoOfChannels(mqttConfig.getMaxNoOfChannels());
proxyConfig.setMqttMaxFrameSize(mqttConfig.getMaxFrameSize());
proxyConfig.setMqttHeartBeat(mqttConfig.getHeartBeat());
proxyConfig.setMqttProxyPort(mqttConfig.getMqttProxyPort());
proxyConfig.setBrokerServiceURL("pulsar://" + PulsarService.advertisedAddress(mqttConfig) + ":"
+ mqttConfig.getBrokerServicePort().get());
log.info("proxyConfig broker service URL: {}", proxyConfig.getBrokerServiceURL());
ProxyService proxyService = new ProxyService(proxyConfig, brokerService.getPulsar());
try {
proxyService.start();
log.info("Start amqp proxy service at port: {}", proxyConfig.getMqttProxyPort());
} catch (Exception e) {
log.error("Failed to start amqp proxy service.");
}
}


log.info("Starting AmqpProtocolHandler, aop version is: '{}'", MopVersion.getVersion());
log.info("Git Revision {}", MopVersion.getGitSha());
log.info("Built by {} on {} at {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class MQTTServerConfiguration extends ServiceConfiguration {

@Category
private static final String CATEGORY_MQTT = "MQTT on Pulsar";
@Category
private static final String CATEGORY_MQTT_PROXY = "MQTT Proxy";

//
// --- MQTT on Pulsar Broker configuration ---
Expand Down Expand Up @@ -74,4 +76,18 @@ public class MQTTServerConfiguration extends ServiceConfiguration {
doc = "Default Pulsar namespace that the MQTT server used."
)
private String defaultNamespace = "default";

@FieldContext(
category = CATEGORY_MQTT_PROXY,
required = false,
doc = "The mqtt proxy port"
)
private int mqttProxyPort = 5682;

@FieldContext(
category = CATEGORY_MQTT_PROXY,
required = false,
doc = "Whether start mqtt protocol handler with proxy"
)
private boolean mqttProxyEnable = false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.proxy;

import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.naming.TopicName;

/**
* Lookup handler.
*/
public interface LookupHandler {

/**
* Find broker for protocolHandler.
*
* @param topicName namespaceName
* @param protocolHandlerName protocolHandler name
* @return Pair consist of brokerHost and brokerPort
*/
CompletableFuture<Pair<String, Integer>> findBroker(TopicName topicName,
String protocolHandlerName) throws Exception;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.proxy;

import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;

/**
* Configuration for MQTT proxy service.
*/
@Getter
@Setter
public class ProxyConfiguration {

@Category
private static final String CATEGORY_MQTT = "MQTT on Pulsar";
@Category
private static final String CATEGORY_MQTT_PROXY = "MQTT Proxy";
@Category
private static final String CATEGORY_BROKER_DISCOVERY = "Broker Discovery";

@FieldContext(
category = CATEGORY_MQTT,
required = true,
doc = "Mqtt on Pulsar Broker tenant"
)
private String mqttTenant = "public";

@FieldContext(
category = CATEGORY_MQTT,
required = true,
doc = "The maximum number of channels which can exist concurrently on a connection."
)
private int mqttMaxNoOfChannels = 64;

@FieldContext(
category = CATEGORY_MQTT,
required = true,
doc = "The maximum frame size on a connection."
)
private int mqttMaxFrameSize = 4 * 1024 * 1024;

@FieldContext(
category = CATEGORY_MQTT,
required = true,
doc = "The default heartbeat timeout on broker"
)
private int mqttHeartBeat = 60 * 1000;

@FieldContext(
category = CATEGORY_MQTT_PROXY,
required = false,
doc = "The mqtt proxy port"
)
private int mqttProxyPort = 5682;

@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "The service url points to the broker cluster"
)
private String brokerServiceURL;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.proxy;

import static com.google.common.base.Preconditions.checkState;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.streamnative.pulsar.handlers.mqtt.ProtocolMethodProcessor;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClientException;

/**
* Proxy connection.
*/
@Slf4j
public class ProxyConnection extends ChannelInboundHandlerAdapter{
private final ProtocolMethodProcessor processor;
private ProxyService proxyService;
private ProxyConfiguration proxyConfig;
@Getter
private ChannelHandlerContext cnx;
private State state;
private ProxyHandler proxyHandler;

private LookupHandler lookupHandler;

private List<Object> connectMsgList = new ArrayList<>();

private enum State {
Init,
RedirectLookup,
RedirectToBroker,
Closed
}

public ProxyConnection(ProxyService proxyService) throws PulsarClientException {
log.info("ProxyConnection init ...");
this.proxyService = proxyService;
this.proxyConfig = proxyService.getProxyConfig();
lookupHandler = proxyService.getLookupHandler();
processor = new ProxyInboundHandler(proxyService, this);
state = State.Init;
}

@Override
public void channelActive(ChannelHandlerContext cnx) throws Exception {
super.channelActive(cnx);
this.cnx = cnx;
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
this.close();
}


@Override
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
MqttMessage msg = (MqttMessage) message;
MqttMessageType messageType = msg.fixedHeader().messageType();

if (log.isDebugEnabled()) {
log.info("Processing MQTT message, type={}", messageType);
}
try {
switch (messageType) {
case CONNECT:
checkState(msg instanceof MqttConnectMessage);
processor.processConnect(ctx.channel(), (MqttConnectMessage) msg);
break;
case SUBSCRIBE:
checkState(msg instanceof MqttSubscribeMessage);
processor.processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
break;
case UNSUBSCRIBE:
checkState(msg instanceof MqttUnsubscribeMessage);
processor.processUnSubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
break;
case PUBLISH:
checkState(msg instanceof MqttPublishMessage);
processor.processPublish(ctx.channel(), (MqttPublishMessage) msg);
break;
case PUBREC:
processor.processPubRec(ctx.channel(), msg);
break;
case PUBCOMP:
processor.processPubComp(ctx.channel(), msg);
break;
case PUBREL:
processor.processPubRel(ctx.channel(), msg);
break;
case DISCONNECT:
processor.processDisconnect(ctx.channel());
break;
case PUBACK:
checkState(msg instanceof MqttPubAckMessage);
processor.processPubAck(ctx.channel(), (MqttPubAckMessage) msg);
break;
case PINGREQ:
MqttFixedHeader pingHeader = new MqttFixedHeader(
MqttMessageType.PINGRESP,
false,
AT_MOST_ONCE,
false,
0);
MqttMessage pingResp = new MqttMessage(pingHeader);
ctx.writeAndFlush(pingResp);
break;
default:
log.error("Unkonwn MessageType:{}", messageType);
break;
}
} catch (Throwable ex) {
log.error("Exception was caught while processing MQTT message, " + ex.getCause(), ex);
ctx.fireExceptionCaught(ex);
ctx.close();
}
}

public void resetProxyHandler() {
if (proxyHandler != null) {
proxyHandler.close();
proxyHandler = null;
}
}

public void close() {
if (log.isDebugEnabled()) {
log.debug("ProxyConnection close.");
}

if (proxyHandler != null) {
resetProxyHandler();
}
if (cnx != null) {
cnx.close();
}
state = State.Closed;
}

}
Loading

0 comments on commit 0d69c64

Please sign in to comment.