diff --git a/docs/using-mqtt-over-websocket.md b/docs/using-mqtt-over-websocket.md index de0b5a4c..7d7b2314 100644 --- a/docs/using-mqtt-over-websocket.md +++ b/docs/using-mqtt-over-websocket.md @@ -54,6 +54,31 @@ mqttTlsKeyFilePath=/xxx/server.key > > Secure WebSocket Port: `8084` +### How to use Proxy + +``` +mqttProxyEnabled=true +mqttProxyTlsEnabled=true +mqttProxyTlsPskEnabled=true +mqttProxyWsEnabled=true +mqttProxyWssEnabled=true +``` + + > #### Note + > MQTT Proxy Info: + > + > Server: `broker.steamnative.io` + > + > Proxy TCP Port: `5682`` + > + > Proxy TLS Port: `5683` + > + > Proxy TLS PSK Port: `5684` + > + > Proxy WebSocket Port: `5083 + > + > Proxy Secure WebSocket Port: `5084` + ## Get Started with MQTT over WebSocket ### Install MQTT WebSocket Client diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTChannelInitializer.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTChannelInitializer.java index 399786b7..d2a37a7f 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTChannelInitializer.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTChannelInitializer.java @@ -15,23 +15,17 @@ import static org.apache.pulsar.client.impl.PulsarChannelInitializer.TLS_HANDLER; import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.http.HttpContentCompressor; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.IdleStateHandler; import io.streamnative.pulsar.handlers.mqtt.broker.MQTTServerConfiguration; import io.streamnative.pulsar.handlers.mqtt.broker.MQTTService; -import io.streamnative.pulsar.handlers.mqtt.broker.codec.MqttWebSocketCodec; -import io.streamnative.pulsar.handlers.mqtt.common.Constants; import io.streamnative.pulsar.handlers.mqtt.common.adapter.CombineAdapterHandler; import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterDecoder; import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterEncoder; import io.streamnative.pulsar.handlers.mqtt.common.psk.PSKUtils; +import io.streamnative.pulsar.handlers.mqtt.common.utils.WebSocketUtils; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -89,7 +83,7 @@ public void initChannel(SocketChannel ch) throws Exception { new SslHandler(PSKUtils.createServerEngine(ch, mqttService.getPskConfiguration()))); } if (this.enableWs) { - addWsHandler(ch.pipeline()); + WebSocketUtils.addWsHandler(ch.pipeline(), mqttConfig); } // Decoder ch.pipeline().addLast(MqttAdapterDecoder.NAME, new MqttAdapterDecoder()); @@ -101,27 +95,6 @@ public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(MQTTBrokerInboundHandler.NAME, new MQTTBrokerInboundHandler(mqttService)); } - /** - * Add websocket handler. - * @param pipeline - */ - private void addWsHandler(ChannelPipeline pipeline) { - // Encode or decode request and reply messages into HTTP messages - pipeline.addLast(Constants.HANDLER_HTTP_CODEC, new HttpServerCodec()); - - // Combine the parts of an HTTP message into a complete HTTP message - pipeline.addLast(Constants.HANDLER_HTTP_AGGREGATOR, - new HttpObjectAggregator(mqttConfig.getHttpMaxContentLength())); - - // Compress and encode HTTP messages - pipeline.addLast(Constants.HANDLER_HTTP_COMPRESSOR, new HttpContentCompressor()); - - pipeline.addLast(Constants.HANDLER_WEB_SOCKET_SERVER_PROTOCOL, - new WebSocketServerProtocolHandler(mqttConfig.getWebSocketPath(), Constants.MQTT_SUB_PROTOCOL_CSV_LIST, - true, mqttConfig.getWebSocketMaxFrameSize())); - pipeline.addLast(Constants.HANDLER_MQTT_WEB_SOCKET_CODEC, new MqttWebSocketCodec()); - } - protected PulsarSslConfiguration buildSslConfiguration(MQTTServerConfiguration config) { return PulsarSslConfiguration.builder() .tlsProvider(config.getMqttTlsProvider()) diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTCommonConfiguration.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTCommonConfiguration.java index 5e10edec..cdb37d96 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTCommonConfiguration.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTCommonConfiguration.java @@ -124,6 +124,20 @@ public class MQTTCommonConfiguration extends ServiceConfiguration { ) private int mqttProxyTlsPskPort = 5684; + @FieldContext( + category = CATEGORY_MQTT_PROXY, + required = false, + doc = "The mqtt proxy ws port" + ) + private int mqttProxyWsPort = 5083; + + @FieldContext( + category = CATEGORY_MQTT_PROXY, + required = false, + doc = "The mqtt proxy wss port" + ) + private int mqttProxyWssPort = 5084; + @FieldContext( category = CATEGORY_MQTT_PROXY, required = false, @@ -160,6 +174,20 @@ public class MQTTCommonConfiguration extends ServiceConfiguration { ) private boolean mqttProxyTlsPskEnabled = false; + @FieldContext( + category = CATEGORY_MQTT_PROXY, + required = false, + doc = "Whether start mqtt protocol handler with proxy ws" + ) + private boolean mqttProxyWsEnabled = false; + + @FieldContext( + category = CATEGORY_MQTT_PROXY, + required = false, + doc = "Whether start mqtt protocol handler with proxy wss" + ) + private boolean mqttProxyWssEnabled = false; + @FieldContext( category = CATEGORY_MQTT_PROXY, required = false, diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/codec/MqttWebSocketCodec.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/codec/MqttWebSocketCodec.java similarity index 95% rename from mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/codec/MqttWebSocketCodec.java rename to mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/codec/MqttWebSocketCodec.java index acfd8768..34f140a4 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/codec/MqttWebSocketCodec.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/codec/MqttWebSocketCodec.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.streamnative.pulsar.handlers.mqtt.broker.codec; +package io.streamnative.pulsar.handlers.mqtt.common.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/codec/package-info.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/codec/package-info.java similarity index 90% rename from mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/codec/package-info.java rename to mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/codec/package-info.java index bbaeac08..c4dd4ca0 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/codec/package-info.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/codec/package-info.java @@ -11,4 +11,4 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.streamnative.pulsar.handlers.mqtt.broker.codec; \ No newline at end of file +package io.streamnative.pulsar.handlers.mqtt.common.codec; \ No newline at end of file diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/ConfigurationUtils.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/ConfigurationUtils.java index 461f6f24..8b63e358 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/ConfigurationUtils.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/ConfigurationUtils.java @@ -55,7 +55,7 @@ public final class ConfigurationUtils { "^((mqtt)(\\+ssl)?(\\+psk)?|(ws)(\\+ssl)?)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]"; public static final String PROXY_LISTENER_PATTERN = - "^(mqtt-proxy)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]"; + "^((mqtt-proxy)(\\\\+ssl)?(\\\\+psk)?|(ws-proxy)(\\\\+ssl)?)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]"; /** * Creates PulsarConfiguration and loads it with populated attribute values loaded from provided property file. diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/WebSocketUtils.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/WebSocketUtils.java new file mode 100644 index 00000000..06284d89 --- /dev/null +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/WebSocketUtils.java @@ -0,0 +1,48 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.common.utils; + +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpContentCompressor; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.streamnative.pulsar.handlers.mqtt.common.Constants; +import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration; +import io.streamnative.pulsar.handlers.mqtt.common.codec.MqttWebSocketCodec; + +public class WebSocketUtils { + + /** + * Add websocket handler. + * + * @param pipeline + */ + public static void addWsHandler(ChannelPipeline pipeline, MQTTCommonConfiguration configuration) { + // Encode or decode request and reply messages into HTTP messages + pipeline.addLast(Constants.HANDLER_HTTP_CODEC, new HttpServerCodec()); + + // Combine the parts of an HTTP message into a complete HTTP message + pipeline.addLast(Constants.HANDLER_HTTP_AGGREGATOR, + new HttpObjectAggregator(configuration.getHttpMaxContentLength())); + + // Compress and encode HTTP messages + pipeline.addLast(Constants.HANDLER_HTTP_COMPRESSOR, new HttpContentCompressor()); + + pipeline.addLast(Constants.HANDLER_WEB_SOCKET_SERVER_PROTOCOL, + new WebSocketServerProtocolHandler(configuration.getWebSocketPath(), + Constants.MQTT_SUB_PROTOCOL_CSV_LIST, true, configuration.getWebSocketMaxFrameSize())); + pipeline.addLast(Constants.HANDLER_MQTT_WEB_SOCKET_CODEC, new MqttWebSocketCodec()); + } +} diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java index 402f2bbf..cd4b903f 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java @@ -73,6 +73,8 @@ public class MQTTProxyService implements Closeable { private Channel listenChannel; private Channel listenChannelTls; private Channel listenChannelTlsPsk; + private Channel listenChannelWs; + private Channel listenChannelWss; private final EventLoopGroup acceptorGroup; private final EventLoopGroup workerGroup; private final WebService webService; @@ -130,7 +132,7 @@ public void start() throws MQTTProxyException { serverBootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup)); EventLoopUtil.enableTriggeredMode(serverBootstrap); serverBootstrap.childHandler(new MQTTProxyChannelInitializer( - this, proxyConfig, false, sslContextRefresher)); + this, proxyConfig, false, false, sslContextRefresher)); try { listenChannel = serverBootstrap.bind(proxyConfig.getMqttProxyPort()).sync().channel(); @@ -142,7 +144,7 @@ public void start() throws MQTTProxyException { if (proxyConfig.isMqttProxyTlsEnabled() || proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) { ServerBootstrap tlsBootstrap = serverBootstrap.clone(); tlsBootstrap.childHandler(new MQTTProxyChannelInitializer( - this, proxyConfig, true, sslContextRefresher)); + this, proxyConfig, true, false, sslContextRefresher)); try { listenChannelTls = tlsBootstrap.bind(proxyConfig.getMqttProxyTlsPort()).sync().channel(); log.info("Started MQTT Proxy with TLS on {}", listenChannelTls.localAddress()); @@ -162,7 +164,7 @@ public void start() throws MQTTProxyException { // Add channel initializer ServerBootstrap tlsPskBootstrap = serverBootstrap.clone(); tlsPskBootstrap.childHandler(new MQTTProxyChannelInitializer( - this, proxyConfig, false, true, sslContextRefresher)); + this, proxyConfig, false, true, false, sslContextRefresher)); try { listenChannelTlsPsk = tlsPskBootstrap.bind(proxyConfig.getMqttProxyTlsPskPort()).sync().channel(); log.info("Started MQTT Proxy with TLS-PSK on {}", listenChannelTlsPsk.localAddress()); @@ -170,6 +172,30 @@ public void start() throws MQTTProxyException { throw new MQTTProxyException(e); } } + + if (proxyConfig.isMqttProxyWsEnabled()) { + ServerBootstrap wsBootstrap = serverBootstrap.clone(); + wsBootstrap.childHandler(new MQTTProxyChannelInitializer( + this, proxyConfig, false, true, sslContextRefresher)); + try { + listenChannelWs = wsBootstrap.bind(proxyConfig.getMqttProxyWsPort()).sync().channel(); + log.info("Started MQTT Proxy with WS on {}", listenChannelWs.localAddress()); + } catch (InterruptedException e) { + throw new MQTTProxyException(e); + } + } + + if (proxyConfig.isMqttProxyWssEnabled()) { + ServerBootstrap wssBootstrap = serverBootstrap.clone(); + wssBootstrap.childHandler(new MQTTProxyChannelInitializer( + this, proxyConfig, true, true, sslContextRefresher)); + try { + listenChannelWss = wssBootstrap.bind(proxyConfig.getMqttProxyWssPort()).sync().channel(); + log.info("Started MQTT Proxy with WSS on {}", listenChannelWss.localAddress()); + } catch (InterruptedException e) { + throw new MQTTProxyException(e); + } + } this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig); this.eventService.start(); this.webService.start(); @@ -184,7 +210,7 @@ public void start0() throws MQTTProxyException { if (proxyConfig.isMqttProxyTlsEnabled() || proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) { ServerBootstrap tlsBootstrap = serverBootstrap.clone(); tlsBootstrap.childHandler(new MQTTProxyChannelInitializer( - this, proxyConfig, true, sslContextRefresher)); + this, proxyConfig, true, false, sslContextRefresher)); try { listenChannelTls = tlsBootstrap.bind(proxyConfig.getMqttProxyTlsPort()).sync().channel(); log.info("Started MQTT Proxy with TLS on {}", listenChannelTls.localAddress()); @@ -204,7 +230,7 @@ public void start0() throws MQTTProxyException { // Add channel initializer ServerBootstrap tlsPskBootstrap = serverBootstrap.clone(); tlsPskBootstrap.childHandler(new MQTTProxyChannelInitializer( - this, proxyConfig, false, true, sslContextRefresher)); + this, proxyConfig, false, true, false, sslContextRefresher)); try { listenChannelTlsPsk = tlsPskBootstrap.bind(proxyConfig.getMqttProxyTlsPskPort()).sync().channel(); log.info("Started MQTT Proxy with TLS-PSK on {}", listenChannelTlsPsk.localAddress()); @@ -212,6 +238,30 @@ public void start0() throws MQTTProxyException { throw new MQTTProxyException(e); } } + + if (proxyConfig.isMqttProxyWsEnabled()) { + ServerBootstrap wsBootstrap = serverBootstrap.clone(); + wsBootstrap.childHandler(new MQTTProxyChannelInitializer( + this, proxyConfig, false, true, sslContextRefresher)); + try { + listenChannelWs = wsBootstrap.bind(proxyConfig.getMqttProxyWsPort()).sync().channel(); + log.info("Started MQTT Proxy with WS on {}", listenChannelWs.localAddress()); + } catch (InterruptedException e) { + throw new MQTTProxyException(e); + } + } + + if (proxyConfig.isMqttProxyWssEnabled()) { + ServerBootstrap wssBootstrap = serverBootstrap.clone(); + wssBootstrap.childHandler(new MQTTProxyChannelInitializer( + this, proxyConfig, true, true, sslContextRefresher)); + try { + listenChannelWss = wssBootstrap.bind(proxyConfig.getMqttProxyWssPort()).sync().channel(); + log.info("Started MQTT Proxy with WSS on {}", listenChannelWss.localAddress()); + } catch (InterruptedException e) { + throw new MQTTProxyException(e); + } + } this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig); this.eventService.start(); } @@ -227,6 +277,12 @@ public void close() { if (listenChannelTlsPsk != null) { listenChannelTlsPsk.close(); } + if (listenChannelWs != null) { + listenChannelWs.close(); + } + if (listenChannelWss != null) { + listenChannelWss.close(); + } this.acceptorGroup.shutdownGracefully(); this.workerGroup.shutdownGracefully(); this.eventService.close(); diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyChannelInitializer.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyChannelInitializer.java index 81da2cb7..6bb14a39 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyChannelInitializer.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyChannelInitializer.java @@ -23,6 +23,7 @@ import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterDecoder; import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterEncoder; import io.streamnative.pulsar.handlers.mqtt.common.psk.PSKUtils; +import io.streamnative.pulsar.handlers.mqtt.common.utils.WebSocketUtils; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyConfiguration; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService; import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MQTTProxyException; @@ -45,22 +46,24 @@ public class MQTTProxyChannelInitializer extends ChannelInitializer