diff --git a/README.md b/README.md
index fe9f0affd..2e1730584 100644
--- a/README.md
+++ b/README.md
@@ -7,9 +7,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -54,7 +54,7 @@ MQTT-on-Pulsar (aka MoP) is developed to support MQTT protocol natively on Apach
Configure the Pulsar broker to run the MoP protocol handler as a plugin by adding configurations to the Pulsar configuration file, such as `broker.conf` or `standalone.conf`.
1. Set the configuration of the MoP protocol handler.
-
+
Add the following properties and set their values in the Pulsar configuration file, such as `conf/broker.conf` or `conf/standalone.conf`.
| Property | Suggested value | Default value |
@@ -137,10 +137,10 @@ The following example shows how to verify the MoP protocol handler with FuseSour
connection.connect();
Topic[] topics = { new Topic("persistent://public/default/my-topic", QoS.AT_LEAST_ONCE) };
connection.subscribe(topics);
-
+
// publish message
connection.publish("persistent://public/default/my-topic", "Hello MOP!".getBytes(), QoS.AT_LEAST_ONCE, false);
-
+
// receive message
Message received = connection.receive();
```
@@ -407,6 +407,10 @@ curl http://pulsar-broker-webservice-address:port/mop/stats
Please refer [here](docs/mop-configuration.md)
+## Using MQTT over WebSocket
+
+Please refer [here](docs/using-mqtt-over-websocket.md)
+
## Project maintainers
- [@Technoboy-](https://github.com/Technoboy-)
diff --git a/docs/mop-configuration.md b/docs/mop-configuration.md
index 5bc28d596..1a17ca54e 100644
--- a/docs/mop-configuration.md
+++ b/docs/mop-configuration.md
@@ -2,18 +2,21 @@
## Common
-| Property | Default value | Comment
-|----------| --------------| --------
-|messagingProtocols | mqtt | available values [mqtt, kafka, amqp]
+| Property | Default value | Comment|
+|----------| --------------| --------|
+|messagingProtocols | mqtt | available values [mqtt, kafka, amqp]|
|protocolHandlerDirectory | ./protocols | Protocol handler directory |
|mqttListeners | | MoP listener address. available listener prefix: [mqtt, mqtt+ssl, mqtt+ssl+psk]|
|advertisedAddress | | Keep the same as Pulsar broker's `advertisedAddress` |
-|mqttAuthenticationEnabled| false | Enable mqtt authentication
-|mqttAuthenticationMethods | null | Mqtt authentication methods, available values [basic, token]
+|mqttAuthenticationEnabled| false | Enable mqtt authentication|
+|mqttAuthenticationMethods | null | Mqtt authentication methods, available values [basic, token]|
|defaultTenant | public | Default Pulsar tenant that the MQTT server used |
-|defaultNamespace | default | Default Pulsar namespace that the MQTT server used |
+|defaultNamespace | default | Default Pulsar namespace that the MQTT server used |
|defaultTopicDomain | persistent | Default Pulsar topic domain that the MQTT server used |
|mqttMessageMaxLength | 8092 | Max length for per message. |
+|httpMaxContentLength | 65535 | The maximum content legnth on a http object. |
+|webSocketMaxFrameSize | 65535 | The maximum frame size on webSocket. |
+|webSocketPath | /mqtt | The websocket access path |
## MoP Proxy
@@ -69,4 +72,3 @@
|mqttProxyNumAcceptorThreads | 1 | Number of threads to use for Netty Acceptor. Default is set to `1` |
|mqttProxyNumIOThreads | Runtime.getRuntime().availableProcessors() | Number of threads to use for Netty IO |
-
diff --git a/docs/mqttx/download.PNG b/docs/mqttx/download.PNG
new file mode 100644
index 000000000..427cd13e3
Binary files /dev/null and b/docs/mqttx/download.PNG differ
diff --git a/docs/mqttx/mqttx.png b/docs/mqttx/mqttx.png
new file mode 100644
index 000000000..aa888b95c
Binary files /dev/null and b/docs/mqttx/mqttx.png differ
diff --git a/docs/mqttx/ws/add_sub.PNG b/docs/mqttx/ws/add_sub.PNG
new file mode 100644
index 000000000..949a9aaeb
Binary files /dev/null and b/docs/mqttx/ws/add_sub.PNG differ
diff --git a/docs/mqttx/ws/connect_ws.PNG b/docs/mqttx/ws/connect_ws.PNG
new file mode 100644
index 000000000..86553f665
Binary files /dev/null and b/docs/mqttx/ws/connect_ws.PNG differ
diff --git a/docs/mqttx/ws/new_ws_conn.PNG b/docs/mqttx/ws/new_ws_conn.PNG
new file mode 100644
index 000000000..6421d23e4
Binary files /dev/null and b/docs/mqttx/ws/new_ws_conn.PNG differ
diff --git a/docs/mqttx/ws/pub_msg.PNG b/docs/mqttx/ws/pub_msg.PNG
new file mode 100644
index 000000000..afe8ff6ac
Binary files /dev/null and b/docs/mqttx/ws/pub_msg.PNG differ
diff --git a/docs/mqttx/wss/add_sub.PNG b/docs/mqttx/wss/add_sub.PNG
new file mode 100644
index 000000000..ae49bc8b8
Binary files /dev/null and b/docs/mqttx/wss/add_sub.PNG differ
diff --git a/docs/mqttx/wss/connect_wss.PNG b/docs/mqttx/wss/connect_wss.PNG
new file mode 100644
index 000000000..d269df52d
Binary files /dev/null and b/docs/mqttx/wss/connect_wss.PNG differ
diff --git a/docs/mqttx/wss/new_wss_conn.PNG b/docs/mqttx/wss/new_wss_conn.PNG
new file mode 100644
index 000000000..fad579021
Binary files /dev/null and b/docs/mqttx/wss/new_wss_conn.PNG differ
diff --git a/docs/mqttx/wss/pub_msg.PNG b/docs/mqttx/wss/pub_msg.PNG
new file mode 100644
index 000000000..cd7a0411d
Binary files /dev/null and b/docs/mqttx/wss/pub_msg.PNG differ
diff --git a/docs/using-mqtt-over-websocket.md b/docs/using-mqtt-over-websocket.md
new file mode 100644
index 000000000..4cb125436
--- /dev/null
+++ b/docs/using-mqtt-over-websocket.md
@@ -0,0 +1,303 @@
+# Using MQTT over WebSocket
+
+**Table of Contents**
+
+[TOC]
+
+## What is WebSocket?
+
+WebSocket is a network communication protocol that enables two-way communication channels over a single TCP connection. Unlike HTTP, WebSocket keeps an open connection between a client and a server, which enables them to exchange data immediately and interactively. This makes WebSocket ideal for real-time interactivity applications like online games, chat applications, and stock trading systems.
+
+The WebSocket protocol has two parts: handshake and data transfer. Handshake establishes a connection between client and server, while data transfer exchanges information over the open connection.
+
+## Why Use MQTT over WebSocket?
+
+MQTT over WebSockets is quickly becoming an essential conduit for IoT interactions, offering a more accessible, efficient, and enriched experience. By enabling direct MQTT data communication through any web browser, it brings the world of IoT closer to everyone.
+
+Here are some reasons to use MQTT over WebSocket:
+
+1. **Simplified Interaction**: Interact directly with IoT devices via any web browser. No need to worry about different protocols – MQTT over WebSocket makes it straightforward.
+2. **Universal Accessibility**: With a web browser, anyone can connect to and interact with IoT devices. This opens up the world of IoT to everyone, not just those with technical expertise.
+3. **Real-Time Updates**: Get data from IoT devices in real-time, providing the most current insights directly to your browser.
+4. **Efficiency and Broad Support**: MQTT is a lightweight protocol and, combined with the widespread support of WebSocket in JavaScript, it allows for efficient real-time data transmission on almost any web application.
+5. **Enhanced Data Visualization**: Web pages can better, faster, and more richly display various MQTT data. This advantage is particularly significant as web browsers become the de facto interface for visualizing MQTT data.
+
+MQTT over WebSocket democratizes access to IoT devices, enabling anyone with a web browser to interact with these devices in real-time and easily.
+
+Next, we will provide a comprehensive guide to using MQTT over WebSocket.
+
+## Prepare MQTT Broker
+
+### Set the MQTT server listeners.
+
+```
+mqttListeners=mqtt://127.0.0.1:1883,mqtt+ssl://127.0.0.1:8883,ws://127.0.0.1:8083,ws+ssl://127.0.0.1:8084
+```
+
+### Config mqtt broker to load tls config.
+
+```
+mqttTlsCertificateFilePath=/xxx/server.crt
+mqttTlsKeyFilePath=/xxx/server.key
+```
+
+ > #### Note
+ > MQTT Broker Info:
+ >
+ > Server: `broker.steamnative.io`
+ >
+ > TCP Port: `1883``
+ >
+ > SSL/TLS Port: `8883`
+ >
+ > WebSocket Port: `8083
+ >
+ > Secure WebSocket Port: `8084`
+
+## Get Started with MQTT over WebSocket
+
+### Install MQTT WebSocket Client
+
+[MQTT.js](https://github.com/mqttjs/MQTT.js) is a fully open-source client-side library for the MQTT protocol, written in JavaScript and available for Node.js and browsers. It supports MQTT/TCP, MQTT/TLS, and MQTT/WebSocket connections.
+
+This article will use the MQTT.js library to explain WebSocket connections.
+
+To install MQTT.js, use the `npm` command if you have the Node.js runtime environment on your machine. You can install it globally and connect via the command line on the Node.js.
+
+**Installation for Node.js Project**
+
+```
+# npm
+npm install mqtt --save
+
+# yarn
+yarn add mqtt
+```
+
+**CDN References**
+
+If you're working directly in the browser and prefer not to install the library, you can also use a CDN:
+
+```
+
+
+
+```
+
+### Connect to MQTT over WebSocket in Browser
+
+For simplicity, we will implement this directly in the browser by creating a basic HTML file. In this file, we'll set up both a publisher and a subscriber.
+
+```
+
+
+
+
+
+ Weboscoket MQTT
+
+
+
+ Use WebSocket client to connect to MQTT server
+
+
+```
+
+### Connection Address
+
+The example connection address, `ws://broker.streamnavite.io:8083/mqtt`, includes `protocol` // `hostname` . `domain` : `port` / `path`.
+
+Common mistakes by beginners include:
+
+- When connecting to the MQTT.js client, it is important to specify the protocol type in the connection address. This is because the client supports multiple protocols. Additionally, MQTT does not specify a port for WebSocket. MoP defaults to 8083 for non-encrypted connections, while for encrypted connections, it defaults to `8084`.
+- Excluding path from the connection address: MQTT over WebSocket uniformly uses `/mqtt` as the connection path, which should be specified when connecting.
+- The protocol and port are mismatched. Please use `mqtt://` for MQTT, `ws://` or `wss://` for WebSocket connections, and make sure to use encrypted WebSocket connections when under HTTPS.
+- The certificate does not match the connection address.
+
+### Connection Options
+
+In the previous code snippet, `options` refer to the client connection options. These options include parameters such as `keepalive`, `clientId`, `username`, `password`, `clean`, `reconnectPeriod`, `connectTimeout`, and `will`. For more detailed descriptions of each option, please refer to the [MQTT.js documentation](https://github.com/mqttjs/MQTT.js#client).
+
+### Subscribe/Unsubscribe Topics
+
+Subscriptions can only be made after a successful connection, and the subscribed topics must comply with MQTT subscription topic rules. JavaScript's asynchronous feature means a successful connection is only ensured after the 'connect' event or by using `client.connected`.
+
+```
+client.on('connect', () => {
+ console.log(`Client connected: ${clientId}`)
+ // Subscribe
+ client.subscribe('testtopic', { qos: 0 })
+})
+// Unsubscribe
+client.unubscribe('testtopic', () => {
+ console.log('Unsubscribed');
+})
+```
+
+### Publish/Receive Messages
+
+You can publish messages to specific topics, which must comply with the MQTT publish topic rules. You do not need to subscribe to the topic before publishing; the client must be connected.
+
+```
+// Publish
+client.publish('testtopic', 'ws connection demo...!', { qos: 0, retain: false })
+// Receive
+client.on('message', (topic, message, packet) => {
+ console.log(`Received Message: ${message.toString()} On topic: ${topic}`)
+})
+```
+
+### Use WebSocket over SSL/TLS
+
+Secure WebSocket connections can be established using the WSS protocol (WebSocket Secure), essentially a WebSocket over a TLS (previously SSL) connection. TLS is used to encrypt the data that is sent over the connection, ensuring data privacy and integrity, as well as authentication.
+
+To use WebSocket over TLS with MQTT.js, you need to change the protocol in the broker address from `ws` to `wss`. However, you must also ensure that the broker you are connecting to supports WSS connections and that the port number is correct for WSS. For example, EMQX uses port `8084` for WSS connections by default.
+
+Here's an example of how you might establish a secure connection:
+
+```
+const host = 'wss://broker.steamnative.io:8084/mqtt'
+const options = {
+ // other options as before
+}
+
+const client = mqtt.connect(host, options)
+
+// rest of your code...
+```
+
+Remember, if you're connecting to a broker over WSS from a web page served over HTTPS, you must ensure the broker's certificate is trusted by the client's browser. This usually means the certificate must be issued by a well-known certificate authority and not be expired, revoked, or used for a different domain. If you're using a self-signed certificate for the broker, you must manually add it to the browser's trust store.
+
+For more details and potential issues related to using WebSocket over TLS, please refer to the [MQTT.js documentation](https://github.com/mqttjs/MQTT.js#client) or the appropriate tutorial for your MQTT broker.
+
+> *Note: When using WebSocket connections in a browser, it is not possible to establish two-way authenticated connections. However, this feature is supported in most other programming language environments. For example, in Node.js:*
+
+```
+const mqtt = require('mqtt')
+const fs = require('fs')
+const path = require('path')
+
+const KEY = fs.readFileSync(path.join(__dirname, '/tls-key.pem'))
+const CERT = fs.readFileSync(path.join(__dirname, '/tls-cert.pem'))
+const TRUSTED_CA_LIST = fs.readFileSync(path.join(__dirname, '/crt.ca.cg.pem'))
+
+const host = 'wss://broker.emqx.io:8084/mqtt'
+const options = {
+ ...
+ key: KEY,
+ cert: CERT,
+ rejectUnauthorized: true,
+ ca: TRUSTED_CA_LIST,
+}
+
+const client = mqtt.connect(host, options)
+```
+
+## Test
+
+We can use a tool like [MQTTX](https://mqttx.app/), which provides a GUI for MQTT interactions. Here's how you can test it:
+
+1. download and install
+
+ data:image/s3,"s3://crabby-images/a8237/a8237cea407029d4befccb677420dbbf46e41f94" alt=""
+
+2. start mqttx
+
+ data:image/s3,"s3://crabby-images/70203/702037440f332a1f0d67b954ab91f534da0d90a4" alt=""
+
+### WS
+
+1. New websocket connection
+
+ data:image/s3,"s3://crabby-images/dddf2/dddf29fa316bc548cb01bacfb11d247dea2210e6" alt=""
+
+2. connect websocket
+
+ data:image/s3,"s3://crabby-images/78167/78167c691f46a8f945bd76d20633db740067d9ef" alt=""
+
+3. Add a subscription
+
+ data:image/s3,"s3://crabby-images/44061/4406151ffd3cdd44923afd4759261dfa6124c110" alt=""
+
+4. Publish message
+
+ data:image/s3,"s3://crabby-images/467aa/467aabaf742c8d77d887958cff4d5daf3d63b70a" alt=""
+
+### WSS
+
+1. New websocket connection
+
+ data:image/s3,"s3://crabby-images/b497c/b497c4a9bd19e2c4fc89c50e6b88dd1cfe01f531" alt=""
+
+2. connect websocket with ssl
+
+ data:image/s3,"s3://crabby-images/adbee/adbeedd78f5807a0048e9fd73b6511f6d2e85a1e" alt=""
+
+3. Add a subscription
+
+ data:image/s3,"s3://crabby-images/156f0/156f04b8518c229ab6cc799639e14353777eaaec" alt=""
+
+4. Publish message
+
+ data:image/s3,"s3://crabby-images/1bc44/1bc44c388177d5c2b65f3e08afc37b5ab3b3a2f7" alt=""
+
+## Q&A
+
+### What is the difference between MQTT and WebSocket?
+
+The main difference lies in the protocol design and use case: MQTT is a message transfer protocol used for publish/subscribe communication, while WebSocket is a communication protocol used for real-time bidirectional communication.
+
+### Can WSS support two-way authentication connections in the browser?
+
+No, it is impossible to specify a client certificate using JavaScript code when establishing a connection in a browser, even if client certificates are set up in your OS certificate store or potentially some smart card. This means that MQTT.js cannot do so. Additionally, you cannot specify a Certificate Authority (CA) either, as it is controlled by the browser.
+
+Reference: [How to use TLS/SSL two-way authentication connections in browser? · Issue #1515 · mqttjs/MQTT.js](https://github.com/mqttjs/MQTT.js/issues/1515)
+
+
+### Can it be used outside of a browser environment?
+
+Yes, you can use MQTT over WebSocket in non-browser environments. Different programming languages have corresponding MQTT client libraries, such as Python, Node.js, Golang, etc., allowing you to connect to MQTT brokers and communicate using MQTT over WebSocket in your chosen environment. When TLS/SSL connections are supported, you can also use mutual certificate authentication.
+
+### Why do I need to fill in a path when connecting to MoP?
+
+A path must be filled in when using WebSocket to connect to [MoP](https://www.emqx.io/?__hstc=3614191.cd31d14d6887cb6138feafc20e333f55.1713262331758.1713262331758.1713319978583.2&__hssc=3614191.1.1713319978583&__hsfp=4215947063). This is because EMQX follows the unified path specification of MQTT-WebSocket. This specification requires a specific path to be specified in WebSocket connections to identify and distinguish MQTT over WebSocket traffic. This path routes MQTT over WebSocket traffic to the MQTT Broker for processing.
+
+In MoP, the default path for MQTT over WebSocket is `/mqtt`. This is set according to the specification. Therefore, when connecting to MoP, this path must be included in the WebSocket address to ensure the connection is correctly routed to the MQTT broker.
+
+### When developing MQTT web applications, whether using Vue.js or React, can I only use WebSocket connections?
+
+If you are developing applications in a browser, you can only use WebSocket connections to establish MQTT over WebSocket connections.
+
+## Summary
+
+This quickstart guide covers the basics of using MQTT over WebSocket to establish real-time communication between MQTT brokers and web browsers. We walk you through the essential steps, including establishing the WebSocket connection, initializing the MQTT client, subscribing and publishing messages, and testing the connection.
diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java
index 77b707c9f..a0c663936 100644
--- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java
+++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java
@@ -30,6 +30,21 @@ public final class Constants {
public static final String MQTT_PROPERTIES_PREFIX = "MQTT_PROPERTIES_";
+ /**
+ * netty handler name constant.
+ */
+ public static final String HANDLER_HTTP_CODEC = "httpCodecHandler";
+
+ public static final String HANDLER_HTTP_AGGREGATOR = "httpAggregatorHandler";
+
+ public static final String HANDLER_HTTP_COMPRESSOR = "httpCompressorHandler";
+
+ public static final String HANDLER_WEB_SOCKET_SERVER_PROTOCOL = "webSocketServerProtocolHandler";
+
+ public static final String HANDLER_MQTT_WEB_SOCKET_CODEC = "mqttWebSocketCodecHandler";
+
+ public static final String MQTT_SUB_PROTOCOL_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1, mqttv5.0";
+
private Constants() {
}
}
diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTChannelInitializer.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTChannelInitializer.java
index bbe6c47e3..8749a62e5 100644
--- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTChannelInitializer.java
+++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTChannelInitializer.java
@@ -15,7 +15,12 @@
import static org.apache.pulsar.client.impl.PulsarChannelInitializer.TLS_HANDLER;
import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpContentCompressor;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
@@ -23,6 +28,7 @@
import io.streamnative.pulsar.handlers.mqtt.adapter.CombineAdapterHandler;
import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterDecoder;
import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterEncoder;
+import io.streamnative.pulsar.handlers.mqtt.codec.MqttWebSocketCodec;
import io.streamnative.pulsar.handlers.mqtt.support.psk.PSKUtils;
import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
@@ -37,21 +43,23 @@ public class MQTTChannelInitializer extends ChannelInitializer {
private final MQTTService mqttService;
private final boolean enableTls;
private final boolean enableTlsPsk;
+ private final boolean enableWs;
private final boolean tlsEnabledWithKeyStore;
private SslContextAutoRefreshBuilder sslCtxRefresher;
private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
- public MQTTChannelInitializer(MQTTService mqttService, boolean enableTls) {
- this(mqttService, enableTls, false);
+ public MQTTChannelInitializer(MQTTService mqttService, boolean enableTls, boolean enableWs) {
+ this(mqttService, enableTls, false, enableWs);
}
- public MQTTChannelInitializer(MQTTService mqttService, boolean enableTls, boolean enableTlsPsk) {
+ public MQTTChannelInitializer(MQTTService mqttService, boolean enableTls, boolean enableTlsPsk, boolean enableWs) {
super();
this.mqttService = mqttService;
this.mqttConfig = mqttService.getServerConfiguration();
this.enableTls = enableTls;
this.enableTlsPsk = enableTlsPsk;
+ this.enableWs = enableWs;
this.tlsEnabledWithKeyStore = mqttConfig.isMqttTlsEnabledWithKeyStore();
if (this.enableTls) {
if (tlsEnabledWithKeyStore) {
@@ -97,6 +105,9 @@ public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(TLS_HANDLER,
new SslHandler(PSKUtils.createServerEngine(ch, mqttService.getPskConfiguration())));
}
+ if (this.enableWs) {
+ addWsHandler(ch.pipeline());
+ }
// Decoder
ch.pipeline().addLast(MqttAdapterDecoder.NAME, new MqttAdapterDecoder());
ch.pipeline().addLast("mqtt-decoder", new MqttDecoder(mqttConfig.getMqttMessageMaxLength()));
@@ -106,4 +117,25 @@ public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(CombineAdapterHandler.NAME, new CombineAdapterHandler());
ch.pipeline().addLast(MQTTInboundHandler.NAME, new MQTTInboundHandler(mqttService));
}
+
+ /**
+ * Add websocket handler.
+ * @param pipeline
+ */
+ private void addWsHandler(ChannelPipeline pipeline) {
+ // Encode or decode request and reply messages into HTTP messages
+ pipeline.addLast(Constants.HANDLER_HTTP_CODEC, new HttpServerCodec());
+
+ // Combine the parts of an HTTP message into a complete HTTP message
+ pipeline.addLast(Constants.HANDLER_HTTP_AGGREGATOR,
+ new HttpObjectAggregator(mqttConfig.getHttpMaxContentLength()));
+
+ // Compress and encode HTTP messages
+ pipeline.addLast(Constants.HANDLER_HTTP_COMPRESSOR, new HttpContentCompressor());
+
+ pipeline.addLast(Constants.HANDLER_WEB_SOCKET_SERVER_PROTOCOL,
+ new WebSocketServerProtocolHandler(mqttConfig.getWebSocketPath(), Constants.MQTT_SUB_PROTOCOL_CSV_LIST,
+ true, mqttConfig.getWebSocketMaxFrameSize()));
+ pipeline.addLast(Constants.HANDLER_MQTT_WEB_SOCKET_CODEC, new MqttWebSocketCodec());
+ }
}
diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java
index 2bcbcb129..676dc5a0e 100644
--- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java
+++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java
@@ -43,6 +43,8 @@ public class MQTTCommonConfiguration extends ServiceConfiguration {
public static final String CATEGORY_TLS_PSK = "TLS-PSK";
@Category
public static final String CATEGORY_KEYSTORE_TLS = "KeyStoreTLS";
+ @Category
+ public static final String CATEGORY_WS = "MQTT over WebSocket";
@FieldContext(
category = CATEGORY_MQTT,
@@ -340,6 +342,27 @@ public class MQTTCommonConfiguration extends ServiceConfiguration {
)
private int eventCenterCallbackPoolThreadNum = 1;
+ @FieldContext(
+ category = CATEGORY_WS,
+ required = true,
+ doc = "The maximum content legnth on a http object."
+ )
+ private int httpMaxContentLength = 65535;
+
+ @FieldContext(
+ category = CATEGORY_WS,
+ required = false,
+ doc = "The maximum frame size on webSocket."
+ )
+ private int webSocketMaxFrameSize = 65535;
+
+ @FieldContext(
+ category = CATEGORY_WS,
+ required = false,
+ doc = "The webSocket access path"
+ )
+ private String webSocketPath = "/mqtt";
+
public long getMqttTlsCertRefreshCheckDurationSec() {
if (mqttTlsCertRefreshCheckDurationSec != 300) {
return mqttTlsCertRefreshCheckDurationSec;
diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTProtocolHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTProtocolHandler.java
index 4afed896b..a98f9a3fd 100644
--- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTProtocolHandler.java
+++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTProtocolHandler.java
@@ -19,6 +19,8 @@
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.PROTOCOL_NAME;
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.SSL_PREFIX;
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.SSL_PSK_PREFIX;
+import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.WS_PLAINTEXT_PREFIX;
+import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.WS_SSL_PREFIX;
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.getListenerPort;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
@@ -120,18 +122,28 @@ public Map> newChannelIniti
if (listener.startsWith(PLAINTEXT_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
- new MQTTChannelInitializer(mqttService, false));
+ new MQTTChannelInitializer(mqttService, false, false));
} else if (listener.startsWith(SSL_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
- new MQTTChannelInitializer(mqttService, true));
+ new MQTTChannelInitializer(mqttService, true, false));
} else if (listener.startsWith(SSL_PSK_PREFIX) && mqttConfig.isMqttTlsPskEnabled()) {
+ builder.put(
+ new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
+ new MQTTChannelInitializer(mqttService, false, true, false));
+
+ } else if (listener.startsWith(WS_PLAINTEXT_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new MQTTChannelInitializer(mqttService, false, true));
+ } else if (listener.startsWith(WS_SSL_PREFIX)) {
+ builder.put(
+ new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
+ new MQTTChannelInitializer(mqttService, true, true));
+
} else {
log.error("MQTT listener {} not supported. supports {}, {} or {}",
listener, PLAINTEXT_PREFIX, SSL_PREFIX, SSL_PSK_PREFIX);
diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/codec/MqttWebSocketCodec.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/codec/MqttWebSocketCodec.java
new file mode 100644
index 000000000..2f19f8aea
--- /dev/null
+++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/codec/MqttWebSocketCodec.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.mqtt.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageCodec;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import java.util.List;
+
+/**
+ * WebSocket Mqtt message codec.
+ *
+ */
+public class MqttWebSocketCodec extends MessageToMessageCodec {
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List