Skip to content

Commit 3cbd0b9

Browse files
authored
update mqtt (#792)
* update mqtt * add caution
1 parent 525aabd commit 3cbd0b9

File tree

8 files changed

+993
-200
lines changed

8 files changed

+993
-200
lines changed

src/UserGuide/Master/Table/API/Programming-MQTT.md

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,27 @@
2222

2323
## 1. Overview
2424

25-
[MQTT](Message Queuing Telemetry Transport)(http://mqtt.org/) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
25+
MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
2626

2727
IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages.
2828

29-
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">
29+
![](/img/mqtt-table-en-1.png)
3030

3131

32+
## 2. Configuration
3233

33-
## 2. Built-in MQTT Service
34-
The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
35-
and then write the data into storage immediately.
36-
The MQTT topic corresponds to IoTDB timeseries.The first segment of the MQTT topic (split by `/`) is used as the database name.The table name is derived from the `<measurement>` in the line protocol.
37-
The messages payload can be format to events by `PayloadFormatter` which loaded by java SPI, and the implementation of `PayloadFormatter` for table is `LinePayloadFormatter`.
38-
The following is the line protocol syntax of MQTT message payload and an example:
34+
By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`.
35+
36+
| **Property** | **Description** | **Default** |
37+
| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- | ------------------- |
38+
| `enable_mqtt_service` | Enable/ disable the MQTT service. | FALSE |
39+
| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 |
40+
| `mqtt_port` | Port bound to the MQTT service. | 1883 |
41+
| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 |
42+
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​****Options: `json` (tree model), `line` (table model).** | **json** |
43+
| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 |
44+
45+
## 3. Write Protocol
3946

4047
* Line Protocol Syntax
4148

@@ -49,23 +56,7 @@ The following is the line protocol syntax of MQTT message payload and an example
4956
myMeasurement,tag1=value1,tag2=value2 attr1=value1,attr2=value2 fieldKey="fieldValue" 1556813561098000000
5057
```
5158

52-
53-
54-
## 3. MQTT Configurations
55-
56-
By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`.
57-
58-
Configurations are as follows:
59-
60-
| **Property** | **Description** | **Default** |
61-
| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- |-------------|
62-
| `enable_mqtt_service` | Enable/ disable the MQTT service. | false |
63-
| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 |
64-
| `mqtt_port` | Port bound to the MQTT service. | 1883 |
65-
| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 |
66-
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​****Options: `json` (tree model), `line` (table model).** | **json** |
67-
| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 |
68-
59+
![](/img/mqtt-table-en-2.png)
6960

7061
## 4. Naming Conventions
7162

@@ -102,24 +93,47 @@ The table name is derived from the `<measurement>` in the line protocol.
10293
## 5. Coding Examples
10394
The following is an example which a mqtt client send messages to IoTDB server.
10495

105-
```java
96+
```java
10697
MQTT mqtt = new MQTT();
10798
mqtt.setHost("127.0.0.1", 1883);
10899
mqtt.setUserName("root");
109100
mqtt.setPassword("root");
110101

111102
BlockingConnection connection = mqtt.blockingConnection();
103+
String DATABASE = "myMqttTest";
112104
connection.connect();
113105

114-
for (int i = 0; i < 10; i++) {
115-
String payload = String.format("test%d,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f 1", random.nextDouble());
116-
117-
connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
118-
}
106+
String payload =
107+
"test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1";
108+
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
109+
Thread.sleep(10);
110+
111+
payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2";
112+
connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
113+
Thread.sleep(10);
114+
115+
payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6";
116+
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
117+
Thread.sleep(10);
118+
119+
//batch write example
120+
payload =
121+
"test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n "
122+
+ "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4";
123+
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
124+
Thread.sleep(10);
125+
126+
//batch write example
127+
payload =
128+
"test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n "
129+
+ "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5";
130+
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
131+
Thread.sleep(10);
119132

120133
connection.disconnect();
134+
```
135+
121136

122-
```
123137

124138
## 6. Customize your MQTT Message Format
125139

@@ -166,10 +180,10 @@ public class CustomizedLinePayloadFormatter implements PayloadFormatter {
166180
for (int i = 0; i < 3; i++) {
167181
long ts = i;
168182
TableMessage message = new TableMessage();
169-
183+
170184
// Parsing Database Name
171185
message.setDatabase("db" + i);
172-
186+
173187
//Parsing Table Names
174188
message.setTable("t" + i);
175189

@@ -200,7 +214,7 @@ public class CustomizedLinePayloadFormatter implements PayloadFormatter {
200214
message.setFields(fields);
201215
message.setDataTypes(dataTypes);
202216
message.setValues(values);
203-
217+
204218
//// Parsing timestamp
205219
message.setTimestamp(ts);
206220
ret.add(message);
@@ -232,6 +246,7 @@ Then, in your server:
232246
More: the message format can be anything you want. For example, if it is a binary format,
233247
just use `payload.forEachByte()` or `payload.array` to get bytes content.
234248

249+
235250
## 7. Caution
236251

237252
To avoid compatibility issues caused by a default client_id, always explicitly supply a unique, non-empty client_id in every MQTT client.

0 commit comments

Comments
 (0)