diff --git a/src/UserGuide/Master/Table/API/Programming-MQTT.md b/src/UserGuide/Master/Table/API/Programming-MQTT.md index 91dec2b81..0ea8d2e15 100644 --- a/src/UserGuide/Master/Table/API/Programming-MQTT.md +++ b/src/UserGuide/Master/Table/API/Programming-MQTT.md @@ -22,20 +22,27 @@ ## 1. Overview -[MQTT](Message Queuing Telemetry Transport)(http://mqtt.org/) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices). +MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices). IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages. - +![](/img/mqtt-table-en-1.png) +## 2. Configuration -## 2. Built-in MQTT Service -The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients -and then write the data into storage immediately. -The MQTT topic corresponds to IoTDB timeseries.The first segment of the MQTT topic (split by `/`) is used as the database name.The table name is derived from the `` in the line protocol. -The messages payload can be format to events by `PayloadFormatter` which loaded by java SPI, and the implementation of `PayloadFormatter` for table is `LinePayloadFormatter`. -The following is the line protocol syntax of MQTT message payload and an example: +By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`. + +| **Property** | **Description** | **Default** | +| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- | ------------------- | +| `enable_mqtt_service` | Enable/ disable the MQTT service. | FALSE | +| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 | +| `mqtt_port` | Port bound to the MQTT service. | 1883 | +| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 | +| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​**​**Options: `json` (tree model), `line` (table model).** | **json** | +| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 | + +## 3. Write Protocol * Line Protocol Syntax @@ -49,23 +56,7 @@ The following is the line protocol syntax of MQTT message payload and an example myMeasurement,tag1=value1,tag2=value2 attr1=value1,attr2=value2 fieldKey="fieldValue" 1556813561098000000 ``` - - -## 3. MQTT Configurations - -By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`. - -Configurations are as follows: - -| **Property** | **Description** | **Default** | -| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- |-------------| -| `enable_mqtt_service` | Enable/ disable the MQTT service. | false | -| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 | -| `mqtt_port` | Port bound to the MQTT service. | 1883 | -| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 | -| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​**​**Options: `json` (tree model), `line` (table model).** | **json** | -| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 | - +![](/img/mqtt-table-en-2.png) ## 4. Naming Conventions @@ -102,24 +93,47 @@ The table name is derived from the `` in the line protocol. ## 5. Coding Examples The following is an example which a mqtt client send messages to IoTDB server. -```java + ```java MQTT mqtt = new MQTT(); mqtt.setHost("127.0.0.1", 1883); mqtt.setUserName("root"); mqtt.setPassword("root"); BlockingConnection connection = mqtt.blockingConnection(); +String DATABASE = "myMqttTest"; connection.connect(); -for (int i = 0; i < 10; i++) { - String payload = String.format("test%d,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f 1", random.nextDouble()); - - connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false); -} +String payload = + "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2"; +connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + +//batch write example +payload = + "test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n " + + "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +//batch write example +payload = + "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n " + + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); connection.disconnect(); + ``` + -``` ## 6. Customize your MQTT Message Format @@ -166,10 +180,10 @@ public class CustomizedLinePayloadFormatter implements PayloadFormatter { for (int i = 0; i < 3; i++) { long ts = i; TableMessage message = new TableMessage(); - + // Parsing Database Name message.setDatabase("db" + i); - + //Parsing Table Names message.setTable("t" + i); @@ -200,7 +214,7 @@ public class CustomizedLinePayloadFormatter implements PayloadFormatter { message.setFields(fields); message.setDataTypes(dataTypes); message.setValues(values); - + //// Parsing timestamp message.setTimestamp(ts); ret.add(message); @@ -232,6 +246,7 @@ Then, in your server: More: the message format can be anything you want. For example, if it is a binary format, just use `payload.forEachByte()` or `payload.array` to get bytes content. + ## 7. Caution To avoid compatibility issues caused by a default client_id, always explicitly supply a unique, non-empty client_id in every MQTT client. diff --git a/src/UserGuide/Master/Tree/API/Programming-MQTT.md b/src/UserGuide/Master/Tree/API/Programming-MQTT.md index df230ecd6..f99e0cdda 100644 --- a/src/UserGuide/Master/Tree/API/Programming-MQTT.md +++ b/src/UserGuide/Master/Tree/API/Programming-MQTT.md @@ -22,13 +22,13 @@ ## 1. Overview -[MQTT](Message Queuing Telemetry Transport)(http://mqtt.org/) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices). +MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices). IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages. -[Programming-MQTT.md](Programming-MQTT.md) + ## 2. Built-in MQTT Service The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients and then write the data into storage immediately. @@ -62,15 +62,14 @@ The IoTDB MQTT service load configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iot Configurations are as follows: -| **Property** | DESCRIPTION | DEFAULT | -| ------------- |:-------------:|:------:| -| `enable_mqtt_service` | whether to enable the mqtt service | false | -| `mqtt_host` | the mqtt service binding host | 127.0.0.1 | -| `mqtt_port` | the mqtt service binding port | 1883 | -| `mqtt_handler_pool_size` | the handler pool size for handing the mqtt messages | 1 | -| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​**​**Options: `json` (tree model), `line` (table model).** | **json** | -| `mqtt_max_message_size` | the max mqtt message size in byte| 1048576 | - +| **Property** | **Description** | **Default** | +| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- | ------------------- | +| `enable_mqtt_service` | Enable/ disable the MQTT service. | FALSE | +| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 | +| `mqtt_port` | Port bound to the MQTT service. | 1883 | +| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 | +| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​**​**Options: `json` (tree model), `line` (table model).** | **json** | +| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 | ## 4. Coding Examples The following is an example which a mqtt client send messages to IoTDB server. @@ -102,10 +101,41 @@ connection.disconnect(); ## 5. Customize your MQTT Message Format -If you do not like the above Json format, you can customize your MQTT Message format by just writing several lines -of codes. An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/master/example/mqtt-customize) project. +In a production environment, each device typically has its own MQTT client, and the message formats of these clients have been pre-defined. If communication is to be carried out in accordance with the MQTT message format supported by IoTDB, a comprehensive upgrade and transformation of all existing clients would be required, which would undoubtedly incur significant costs. However, we can easily achieve customization of the MQTT message format through simple programming means, without the need to modify the clients. +An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/rc/2.0.1/example/mqtt-customize) project. + +Assuming the MQTT client sends the following message format: +```json + { + "time":1586076045523, + "deviceID":"car_1", + "deviceType":"Gasoline car​​", + "point":"Fuel level​​", + "value":10.0 +} +``` +Or in the form of an array of JSON: +```java +[ + { + "time":1586076045523, + "deviceID":"car_1", + "deviceType":"Gasoline car​​", + "point":"Fuel level", + "value":10.0 + }, + { + "time":1586076045524, + "deviceID":"car_2", + "deviceType":"NEV(new enegry vehicle)", + "point":"Speed", + "value":80.0 + } +] +``` + +Then you can set up the custom MQTT message format through the following steps: -Steps: 1. Create a java project, and add dependency: ```xml @@ -120,44 +150,115 @@ e.g., ```java package org.apache.iotdb.mqtt.server; -import io.netty.buffer.ByteBuf; import org.apache.iotdb.db.protocol.mqtt.Message; import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TableMessage; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.tsfile.enums.TSDataType; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +/** + * The Customized JSON payload formatter. one json format supported: { "time":1586076045523, + * "deviceID":"car_1", "deviceType":"NEV", "point":"Speed", "value":80.0 } + */ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { + private static final String JSON_KEY_TIME = "time"; + private static final String JSON_KEY_DEVICEID = "deviceID"; + private static final String JSON_KEY_DEVICETYPE = "deviceType"; + private static final String JSON_KEY_POINT = "point"; + private static final String JSON_KEY_VALUE = "value"; + private static final Gson GSON = new GsonBuilder().create(); @Override public List format(String topic, ByteBuf payload) { - // Suppose the payload is a json format if (payload == null) { - return null; + return new ArrayList<>(); } - - String json = payload.toString(StandardCharsets.UTF_8); - // parse data from the json and generate Messages and put them into List ret - List ret = new ArrayList<>(); - // this is just an example, so we just generate some Messages directly - for (int i = 0; i < 2; i++) { - long ts = i; - Message message = new Message(); - message.setDevice("d" + i); - message.setTimestamp(ts); - message.setMeasurements(Arrays.asList("s1", "s2")); - message.setValues(Arrays.asList("4.0" + i, "5.0" + i)); - ret.add(message); + String txt = payload.toString(StandardCharsets.UTF_8); + JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class); + if (jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + return formatTableRow(topic, jsonObject); + } else if (jsonElement.isJsonArray()) { + JsonArray jsonArray = jsonElement.getAsJsonArray(); + List messages = new ArrayList<>(); + for (JsonElement element : jsonArray) { + JsonObject jsonObject = element.getAsJsonObject(); + messages.addAll(formatTableRow(topic, jsonObject)); + } + return messages; } - return ret; + throw new JsonParseException("payload is invalidate"); + } + + @Override + @Deprecated + public List format(ByteBuf payload) { + throw new NotImplementedException(); + } + + private List formatTableRow(String topic, JsonObject jsonObject) { + TableMessage message = new TableMessage(); + String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/")); + String table = "test_table"; + + // Parsing Database Name + message.setDatabase((database)); + + // Parsing Table Name + message.setTable(table); + + // Parsing Tags + List tagKeys = new ArrayList<>(); + tagKeys.add(JSON_KEY_DEVICEID); + List tagValues = new ArrayList<>(); + tagValues.add(jsonObject.get(JSON_KEY_DEVICEID).getAsString()); + message.setTagKeys(tagKeys); + message.setTagValues(tagValues); + + // Parsing Attributes + List attributeKeys = new ArrayList<>(); + List attributeValues = new ArrayList<>(); + attributeKeys.add(JSON_KEY_DEVICETYPE); + attributeValues.add(jsonObject.get(JSON_KEY_DEVICETYPE).getAsString()); + message.setAttributeKeys(attributeKeys); + message.setAttributeValues(attributeValues); + + // Parsing Fields + List fields = Arrays.asList(JSON_KEY_POINT); + List dataTypes = Arrays.asList(TSDataType.FLOAT); + List values = Arrays.asList(jsonObject.get(JSON_KEY_VALUE).getAsFloat()); + message.setFields(fields); + message.setDataTypes(dataTypes); + message.setValues(values); + + // Parsing timestamp + message.setTimestamp(jsonObject.get(JSON_KEY_TIME).getAsLong()); + return Lists.newArrayList(message); } @Override public String getName() { - // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string: - return "CustomizedJson"; + // set the value of mqtt_payload_formatter in iotdb-common.properties as the following string: + return "CustomizedJson2Table"; + } + + @Override + public String getType() { + return PayloadFormatter.TABLE_TYPE; } } ``` @@ -171,13 +272,14 @@ Then, in your server: 1. Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder. 2. Update configuration to enable MQTT service. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`) 3. Set the value of `mqtt_payload_formatter` in `conf/iotdb-system.properties` as the value of getName() in your implementation - , in this example, the value is `CustomizedJson` + , in this example, the value is `CustomizedJson2Table` 4. Launch the IoTDB server. 5. Now IoTDB will use your implementation to parse the MQTT message. More: the message format can be anything you want. For example, if it is a binary format, just use `payload.forEachByte()` or `payload.array` to get bytes content. + ## 6. Caution To avoid compatibility issues caused by a default client_id, always explicitly supply a unique, non-empty client_id in every MQTT client. @@ -189,4 +291,3 @@ Behavior varies when the client_id is missing or empty. Common examples: • MQTTX: IoTDB accepts the message. • mosquitto_pub: IoTDB rejects the connection. Therefore, explicitly assigning a unique, non-empty client_id is the simplest way to eliminate these discrepancies and ensure reliable message delivery. - diff --git a/src/UserGuide/latest-Table/API/Programming-MQTT.md b/src/UserGuide/latest-Table/API/Programming-MQTT.md index e20b3cad4..239159ab9 100644 --- a/src/UserGuide/latest-Table/API/Programming-MQTT.md +++ b/src/UserGuide/latest-Table/API/Programming-MQTT.md @@ -88,3 +88,172 @@ The table name is derived from the `` in the line protocol. | 1`i32`
123`i32` | INT32 | | `"xxx"` | TEXT | | `t`,`T`,`true`,`True`,`TRUE`
`f`,`F`,`false`,`False`,`FALSE` | BOOLEAN | + + +## 5. Coding Examples +The following is an example which a mqtt client send messages to IoTDB server. + + ```java +MQTT mqtt = new MQTT(); +mqtt.setHost("127.0.0.1", 1883); +mqtt.setUserName("root"); +mqtt.setPassword("root"); + +BlockingConnection connection = mqtt.blockingConnection(); +String DATABASE = "myMqttTest"; +connection.connect(); + +String payload = + "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2"; +connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + +//batch write example +payload = + "test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n " + + "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +//batch write example +payload = + "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n " + + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +connection.disconnect(); + ``` + + + +## 6. Customize your MQTT Message Format + +If you do not like the above Line format, you can customize your MQTT Message format by just writing several lines +of codes. An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/master/example/mqtt-customize) project. + +Steps: +1. Create a java project, and add dependency: +```xml + + org.apache.iotdb + iotdb-server + 2.0.4-SNAPSHOT + +``` +2. Define your implementation which implements `org.apache.iotdb.db.protocol.mqtt.PayloadFormatter` + e.g., + +```java +package org.apache.iotdb.mqtt.server; + +import io.netty.buffer.ByteBuf; +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class CustomizedLinePayloadFormatter implements PayloadFormatter { + + @Override + public List format(String topic, ByteBuf payload) { + // Suppose the payload is a line format + if (payload == null) { + return null; + } + + String line = payload.toString(StandardCharsets.UTF_8); + // parse data from the line and generate Messages and put them into List ret + List ret = new ArrayList<>(); + // this is just an example, so we just generate some Messages directly + for (int i = 0; i < 3; i++) { + long ts = i; + TableMessage message = new TableMessage(); + + // Parsing Database Name + message.setDatabase("db" + i); + + //Parsing Table Names + message.setTable("t" + i); + + // Parsing Tags + List tagKeys = new ArrayList<>(); + tagKeys.add("tag1" + i); + tagKeys.add("tag2" + i); + List tagValues = new ArrayList<>(); + tagValues.add("t_value1" + i); + tagValues.add("t_value2" + i); + message.setTagKeys(tagKeys); + message.setTagValues(tagValues); + + // Parsing Attributes + List attributeKeys = new ArrayList<>(); + List attributeValues = new ArrayList<>(); + attributeKeys.add("attr1" + i); + attributeKeys.add("attr2" + i); + attributeValues.add("a_value1" + i); + attributeValues.add("a_value2" + i); + message.setAttributeKeys(attributeKeys); + message.setAttributeValues(attributeValues); + + // Parsing Fields + List fields = Arrays.asList("field1" + i, "field2" + i); + List dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT); + List values = Arrays.asList("4.0" + i, "5.0" + i); + message.setFields(fields); + message.setDataTypes(dataTypes); + message.setValues(values); + + //// Parsing timestamp + message.setTimestamp(ts); + ret.add(message); + } + return ret; + } + + @Override + public String getName() { + // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string: + return "CustomizedLine"; + } +} +``` +3. modify the file in `src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`: + clean the file and put your implementation class name into the file. + In this example, the content is: `org.apache.iotdb.mqtt.server.CustomizedLinePayloadFormatter` +4. compile your implementation as a jar file: `mvn package -DskipTests` + + +Then, in your server: +1. Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder. +2. Update configuration to enable MQTT service. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`) +3. Set the value of `mqtt_payload_formatter` in `conf/iotdb-system.properties` as the value of getName() in your implementation + , in this example, the value is `CustomizedLine` +4. Launch the IoTDB server. +5. Now IoTDB will use your implementation to parse the MQTT message. + +More: the message format can be anything you want. For example, if it is a binary format, +just use `payload.forEachByte()` or `payload.array` to get bytes content. + +## 7. Caution + +To avoid compatibility issues caused by a default client_id, always explicitly supply a unique, non-empty client_id in every MQTT client. +Behavior varies when the client_id is missing or empty. Common examples: +1. Explicitly sending an empty string +• MQTTX: When client_id="", IoTDB silently discards the message. +• mosquitto_pub: When client_id="", IoTDB receives the message normally. +2. Omitting client_id entirely +• MQTTX: IoTDB accepts the message. +• mosquitto_pub: IoTDB rejects the connection. +Therefore, explicitly assigning a unique, non-empty client_id is the simplest way to eliminate these discrepancies and ensure reliable message delivery. \ No newline at end of file diff --git a/src/UserGuide/latest/API/Programming-MQTT.md b/src/UserGuide/latest/API/Programming-MQTT.md index a03830f0e..f99e0cdda 100644 --- a/src/UserGuide/latest/API/Programming-MQTT.md +++ b/src/UserGuide/latest/API/Programming-MQTT.md @@ -20,17 +20,16 @@ --> # MQTT Protocol -[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. -It was designed as an extremely lightweight publish/subscribe messaging transport. -It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. +## 1. Overview -IoTDB supports the MQTT v3.1(an OASIS Standard) protocol. -IoTDB server includes a built-in MQTT service that allows remote devices send messages into IoTDB server directly. +MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices). + +IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages. -## 1. Built-in MQTT Service +## 2. Built-in MQTT Service The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients and then write the data into storage immediately. The MQTT topic corresponds to IoTDB timeseries. @@ -58,22 +57,21 @@ or json array of the above two. -## 2. MQTT Configurations +## 3. MQTT Configurations The IoTDB MQTT service load configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties` by default. Configurations are as follows: -| NAME | DESCRIPTION | DEFAULT | -| ------------- |:-------------:|:------:| -| enable_mqtt_service | whether to enable the mqtt service | false | -| mqtt_host | the mqtt service binding host | 127.0.0.1 | -| mqtt_port | the mqtt service binding port | 1883 | -| mqtt_handler_pool_size | the handler pool size for handing the mqtt messages | 1 | -| mqtt_payload_formatter | the mqtt message payload formatter | json | -| mqtt_max_message_size | the max mqtt message size in byte| 1048576 | - +| **Property** | **Description** | **Default** | +| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- | ------------------- | +| `enable_mqtt_service` | Enable/ disable the MQTT service. | FALSE | +| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 | +| `mqtt_port` | Port bound to the MQTT service. | 1883 | +| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 | +| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​**​**Options: `json` (tree model), `line` (table model).** | **json** | +| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 | -## 3. Coding Examples +## 4. Coding Examples The following is an example which a mqtt client send messages to IoTDB server. ```java @@ -101,18 +99,49 @@ connection.disconnect(); ``` -## 4. Customize your MQTT Message Format +## 5. Customize your MQTT Message Format + +In a production environment, each device typically has its own MQTT client, and the message formats of these clients have been pre-defined. If communication is to be carried out in accordance with the MQTT message format supported by IoTDB, a comprehensive upgrade and transformation of all existing clients would be required, which would undoubtedly incur significant costs. However, we can easily achieve customization of the MQTT message format through simple programming means, without the need to modify the clients. +An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/rc/2.0.1/example/mqtt-customize) project. + +Assuming the MQTT client sends the following message format: +```json + { + "time":1586076045523, + "deviceID":"car_1", + "deviceType":"Gasoline car​​", + "point":"Fuel level​​", + "value":10.0 +} +``` +Or in the form of an array of JSON: +```java +[ + { + "time":1586076045523, + "deviceID":"car_1", + "deviceType":"Gasoline car​​", + "point":"Fuel level", + "value":10.0 + }, + { + "time":1586076045524, + "deviceID":"car_2", + "deviceType":"NEV(new enegry vehicle)", + "point":"Speed", + "value":80.0 + } +] +``` -If you do not like the above Json format, you can customize your MQTT Message format by just writing several lines -of codes. An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/rc/2.0.1/example/mqtt-customize) project. +Then you can set up the custom MQTT message format through the following steps: -Steps: 1. Create a java project, and add dependency: ```xml org.apache.iotdb iotdb-server - 1.1.0-SNAPSHOT + 2.0.4-SNAPSHOT ``` 2. Define your implementation which implements `org.apache.iotdb.db.protocol.mqtt.PayloadFormatter` @@ -121,44 +150,115 @@ e.g., ```java package org.apache.iotdb.mqtt.server; -import io.netty.buffer.ByteBuf; import org.apache.iotdb.db.protocol.mqtt.Message; import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TableMessage; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.tsfile.enums.TSDataType; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +/** + * The Customized JSON payload formatter. one json format supported: { "time":1586076045523, + * "deviceID":"car_1", "deviceType":"NEV", "point":"Speed", "value":80.0 } + */ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { + private static final String JSON_KEY_TIME = "time"; + private static final String JSON_KEY_DEVICEID = "deviceID"; + private static final String JSON_KEY_DEVICETYPE = "deviceType"; + private static final String JSON_KEY_POINT = "point"; + private static final String JSON_KEY_VALUE = "value"; + private static final Gson GSON = new GsonBuilder().create(); @Override - public List format(ByteBuf payload) { - // Suppose the payload is a json format + public List format(String topic, ByteBuf payload) { if (payload == null) { - return null; + return new ArrayList<>(); } - - String json = payload.toString(StandardCharsets.UTF_8); - // parse data from the json and generate Messages and put them into List ret - List ret = new ArrayList<>(); - // this is just an example, so we just generate some Messages directly - for (int i = 0; i < 2; i++) { - long ts = i; - Message message = new Message(); - message.setDevice("d" + i); - message.setTimestamp(ts); - message.setMeasurements(Arrays.asList("s1", "s2")); - message.setValues(Arrays.asList("4.0" + i, "5.0" + i)); - ret.add(message); + String txt = payload.toString(StandardCharsets.UTF_8); + JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class); + if (jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + return formatTableRow(topic, jsonObject); + } else if (jsonElement.isJsonArray()) { + JsonArray jsonArray = jsonElement.getAsJsonArray(); + List messages = new ArrayList<>(); + for (JsonElement element : jsonArray) { + JsonObject jsonObject = element.getAsJsonObject(); + messages.addAll(formatTableRow(topic, jsonObject)); + } + return messages; } - return ret; + throw new JsonParseException("payload is invalidate"); + } + + @Override + @Deprecated + public List format(ByteBuf payload) { + throw new NotImplementedException(); + } + + private List formatTableRow(String topic, JsonObject jsonObject) { + TableMessage message = new TableMessage(); + String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/")); + String table = "test_table"; + + // Parsing Database Name + message.setDatabase((database)); + + // Parsing Table Name + message.setTable(table); + + // Parsing Tags + List tagKeys = new ArrayList<>(); + tagKeys.add(JSON_KEY_DEVICEID); + List tagValues = new ArrayList<>(); + tagValues.add(jsonObject.get(JSON_KEY_DEVICEID).getAsString()); + message.setTagKeys(tagKeys); + message.setTagValues(tagValues); + + // Parsing Attributes + List attributeKeys = new ArrayList<>(); + List attributeValues = new ArrayList<>(); + attributeKeys.add(JSON_KEY_DEVICETYPE); + attributeValues.add(jsonObject.get(JSON_KEY_DEVICETYPE).getAsString()); + message.setAttributeKeys(attributeKeys); + message.setAttributeValues(attributeValues); + + // Parsing Fields + List fields = Arrays.asList(JSON_KEY_POINT); + List dataTypes = Arrays.asList(TSDataType.FLOAT); + List values = Arrays.asList(jsonObject.get(JSON_KEY_VALUE).getAsFloat()); + message.setFields(fields); + message.setDataTypes(dataTypes); + message.setValues(values); + + // Parsing timestamp + message.setTimestamp(jsonObject.get(JSON_KEY_TIME).getAsLong()); + return Lists.newArrayList(message); } @Override public String getName() { - // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string: - return "CustomizedJson"; + // set the value of mqtt_payload_formatter in iotdb-common.properties as the following string: + return "CustomizedJson2Table"; + } + + @Override + public String getType() { + return PayloadFormatter.TABLE_TYPE; } } ``` @@ -172,7 +272,7 @@ Then, in your server: 1. Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder. 2. Update configuration to enable MQTT service. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`) 3. Set the value of `mqtt_payload_formatter` in `conf/iotdb-system.properties` as the value of getName() in your implementation - , in this example, the value is `CustomizedJson` + , in this example, the value is `CustomizedJson2Table` 4. Launch the IoTDB server. 5. Now IoTDB will use your implementation to parse the MQTT message. @@ -180,4 +280,14 @@ More: the message format can be anything you want. For example, if it is a binar just use `payload.forEachByte()` or `payload.array` to get bytes content. +## 6. Caution +To avoid compatibility issues caused by a default client_id, always explicitly supply a unique, non-empty client_id in every MQTT client. +Behavior varies when the client_id is missing or empty. Common examples: +1. Explicitly sending an empty string + • MQTTX: When client_id="", IoTDB silently discards the message. + • mosquitto_pub: When client_id="", IoTDB receives the message normally. +2. Omitting client_id entirely + • MQTTX: IoTDB accepts the message. + • mosquitto_pub: IoTDB rejects the connection. + Therefore, explicitly assigning a unique, non-empty client_id is the simplest way to eliminate these discrepancies and ensure reliable message delivery. diff --git a/src/zh/UserGuide/Master/Table/API/Programming-MQTT.md b/src/zh/UserGuide/Master/Table/API/Programming-MQTT.md index 56b0261b1..8f283f862 100644 --- a/src/zh/UserGuide/Master/Table/API/Programming-MQTT.md +++ b/src/zh/UserGuide/Master/Table/API/Programming-MQTT.md @@ -26,7 +26,7 @@ MQTT 是一种专为物联网(IoT)和低带宽环境设计的轻量级消息 IoTDB 深度集成了 MQTT 协议能力,完整兼容 MQTT v3.1(OASIS 国际标准协议)。IoTDB 服务器内置高性能 MQTT Broker 服务模块,无需第三方中间件,支持设备通过 MQTT 报文将时序数据直接写入 IoTDB 存储引擎。 - +![](/img/mqtt-table-1.png) ## 2. 配置方式 @@ -92,7 +92,6 @@ databaseName:stock | `t`,`T`,`true`,`True`,`TRUE`
`f`,`F`,`false`,`False`,`FALSE` | BOOLEAN | - ## 5. 代码示例 以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。 @@ -103,19 +102,35 @@ mqtt.setUserName("root"); mqtt.setPassword("root"); BlockingConnection connection = mqtt.blockingConnection(); +String DATABASE = "myMqttTest"; connection.connect(); -Random random = new Random(); -for (int i = 0; i < 10; i++) { - String payload = String.format("{\n" + - "\"device\":\"root.sg.d1\",\n" + - "\"timestamp\":%d,\n" + - "\"measurements\":[\"s1\"],\n" + - "\"values\":[%f]\n" + - "}", System.currentTimeMillis(), random.nextDouble()); - - connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false); -} +String payload = + "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2"; +connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + +//批量写入示例 +payload = + "test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n " + + "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +//批量写入示例 +payload = + "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n " + + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); connection.disconnect(); ``` @@ -165,10 +180,10 @@ public class CustomizedLinePayloadFormatter implements PayloadFormatter { for (int i = 0; i < 3; i++) { long ts = i; TableMessage message = new TableMessage(); - + // Parsing Database Name message.setDatabase("db" + i); - + //Parsing Table Names message.setTable("t" + i); @@ -199,7 +214,7 @@ public class CustomizedLinePayloadFormatter implements PayloadFormatter { message.setFields(fields); message.setDataTypes(dataTypes); message.setValues(values); - + //// Parsing timestamp message.setTimestamp(ts); ret.add(message); @@ -245,5 +260,3 @@ More: MQTT 协议的消息不限于 line,你还可以用任意二进制。通 • mosquitto_pub:IoTDB拒绝连接。 由此可见,显式指定唯一且非空的client_id是消除上述差异、确保消息可靠投递的最简单做法。 - - diff --git a/src/zh/UserGuide/Master/Tree/API/Programming-MQTT.md b/src/zh/UserGuide/Master/Tree/API/Programming-MQTT.md index 9bcff3885..a145aff56 100644 --- a/src/zh/UserGuide/Master/Tree/API/Programming-MQTT.md +++ b/src/zh/UserGuide/Master/Tree/API/Programming-MQTT.md @@ -23,7 +23,7 @@ ## 1. 概述 -[MQTT](http://mqtt.org/) 是一种专为物联网(IoT)和低带宽环境设计的轻量级消息传输协议,基于发布/订阅(Pub/Sub)模型,支持设备间高效、可靠的双向通信。其核心目标是低功耗、低带宽消耗和高实时性,尤其适合网络不稳定或资源受限的场景(如传感器、移动设备)。 +MQTT 是一种专为物联网(IoT)和低带宽环境设计的轻量级消息传输协议,基于发布/订阅(Pub/Sub)模型,支持设备间高效、可靠的双向通信。其核心目标是低功耗、低带宽消耗和高实时性,尤其适合网络不稳定或资源受限的场景(如传感器、移动设备)。 IoTDB 深度集成了 MQTT 协议能力,完整兼容 MQTT v3.1(OASIS 国际标准协议)。IoTDB 服务器内置高性能 MQTT Broker 服务模块,无需第三方中间件,支持设备通过 MQTT 报文将时序数据直接写入 IoTDB 存储引擎。 @@ -59,7 +59,7 @@ MQTT 主题与 IoTDB 时间序列相对应。 ## 3. MQTT 配置 默认情况下,IoTDB MQTT 服务从`${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`加载配置。 -具体配置项如下: +配置如下: | **名称** | **描述** | **默认** | |---------------------------| --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------- | @@ -71,8 +71,7 @@ MQTT 主题与 IoTDB 时间序列相对应。 | `mqtt_max_message_size` | mqtt 消息最大长度(字节) | 1048576 | - -## 4. 代码示例 +## 4. 示例代码 以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。 ```java @@ -102,10 +101,42 @@ connection.disconnect(); ## 5. 自定义 MQTT 消息格式 -事实上可以通过简单编程来实现 MQTT 消息的格式自定义。 -可以在源码的 [example/mqtt-customize](https://github.com/apache/iotdb/tree/master/example/mqtt-customize) 项目中找到一个简单示例。 +在生产环境中,每个设备通常都配备了自己的 MQTT 客户端,且这些客户端的消息格式已经预先设定。如果按照 IoTDB 所支持的 MQTT 消息格式进行通信,就需要对现有的所有客户端进行全面的升级改造,这无疑会带来较高的成本。然而,我们可以通过简单的编程手段,轻松实现 MQTT 消息格式的自定义,而无需改造客户端。 +可以在源码的 [example/mqtt-customize](https://github.com/apache/iotdb/tree/rc/2.0.1/example/mqtt-customize) 项目中找到一个简单示例。 + +假定mqtt客户端传过来的是以下消息格式: +```json + { + "time":1586076045523, + "deviceID":"car_1", + "deviceType":"油车", + "point":"油量", + "value":10.0 +} +``` +或者JSON的数组形式: +```java +[ + { + "time":1586076045523, + "deviceID":"car_1", + "deviceType":"油车", + "point":"油量", + "value":10.0 + }, + { + "time":1586076045524, + "deviceID":"car_2", + "deviceType":"新能源车", + "point":"速度", + "value":80.0 + } +] +``` + + +则可以通过以下步骤设置设置自定义MQTT消息格式: -步骤: 1. 创建一个 Java 项目,增加如下依赖 ```xml @@ -119,44 +150,115 @@ connection.disconnect(); ```java package org.apache.iotdb.mqtt.server; -import io.netty.buffer.ByteBuf; import org.apache.iotdb.db.protocol.mqtt.Message; import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TableMessage; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.tsfile.enums.TSDataType; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +/** + * The Customized JSON payload formatter. one json format supported: { "time":1586076045523, + * "deviceID":"car_1", "deviceType":"新能源车", "point":"速度", "value":80.0 } + */ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { + private static final String JSON_KEY_TIME = "time"; + private static final String JSON_KEY_DEVICEID = "deviceID"; + private static final String JSON_KEY_DEVICETYPE = "deviceType"; + private static final String JSON_KEY_POINT = "point"; + private static final String JSON_KEY_VALUE = "value"; + private static final Gson GSON = new GsonBuilder().create(); @Override public List format(String topic, ByteBuf payload) { - // Suppose the payload is a json format if (payload == null) { - return null; + return new ArrayList<>(); } - - String json = payload.toString(StandardCharsets.UTF_8); - // parse data from the json and generate Messages and put them into List ret - List ret = new ArrayList<>(); - // this is just an example, so we just generate some Messages directly - for (int i = 0; i < 2; i++) { - long ts = i; - Message message = new Message(); - message.setDevice("d" + i); - message.setTimestamp(ts); - message.setMeasurements(Arrays.asList("s1", "s2")); - message.setValues(Arrays.asList("4.0" + i, "5.0" + i)); - ret.add(message); + String txt = payload.toString(StandardCharsets.UTF_8); + JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class); + if (jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + return formatTableRow(topic, jsonObject); + } else if (jsonElement.isJsonArray()) { + JsonArray jsonArray = jsonElement.getAsJsonArray(); + List messages = new ArrayList<>(); + for (JsonElement element : jsonArray) { + JsonObject jsonObject = element.getAsJsonObject(); + messages.addAll(formatTableRow(topic, jsonObject)); + } + return messages; } - return ret; + throw new JsonParseException("payload is invalidate"); + } + + @Override + @Deprecated + public List format(ByteBuf payload) { + throw new NotImplementedException(); + } + + private List formatTableRow(String topic, JsonObject jsonObject) { + TableMessage message = new TableMessage(); + String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/")); + String table = "test_table"; + + // Parsing Database Name + message.setDatabase((database)); + + // Parsing Table Name + message.setTable(table); + + // Parsing Tags + List tagKeys = new ArrayList<>(); + tagKeys.add(JSON_KEY_DEVICEID); + List tagValues = new ArrayList<>(); + tagValues.add(jsonObject.get(JSON_KEY_DEVICEID).getAsString()); + message.setTagKeys(tagKeys); + message.setTagValues(tagValues); + + // Parsing Attributes + List attributeKeys = new ArrayList<>(); + List attributeValues = new ArrayList<>(); + attributeKeys.add(JSON_KEY_DEVICETYPE); + attributeValues.add(jsonObject.get(JSON_KEY_DEVICETYPE).getAsString()); + message.setAttributeKeys(attributeKeys); + message.setAttributeValues(attributeValues); + + // Parsing Fields + List fields = Arrays.asList(JSON_KEY_POINT); + List dataTypes = Arrays.asList(TSDataType.FLOAT); + List values = Arrays.asList(jsonObject.get(JSON_KEY_VALUE).getAsFloat()); + message.setFields(fields); + message.setDataTypes(dataTypes); + message.setValues(values); + + // Parsing timestamp + message.setTimestamp(jsonObject.get(JSON_KEY_TIME).getAsLong()); + return Lists.newArrayList(message); } @Override public String getName() { - // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string: - return "CustomizedJson"; + // set the value of mqtt_payload_formatter in iotdb-common.properties as the following string: + return "CustomizedJson2Table"; + } + + @Override + public String getType() { + return PayloadFormatter.TABLE_TYPE; } } ``` @@ -170,7 +272,7 @@ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { 1. 创建 ${IOTDB_HOME}/ext/mqtt/ 文件夹, 将刚才的 jar 包放入此文件夹。 2. 打开 MQTT 服务参数. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`) 3. 用刚才的实现类中的 getName() 方法的返回值 设置为 `conf/iotdb-system.properties` 中 `mqtt_payload_formatter` 的值, - , 在本例中,为 `CustomizedJson` + , 在本例中,为 `CustomizedJson2Table` 4. 启动 IoTDB 5. 搞定 diff --git a/src/zh/UserGuide/latest-Table/API/Programming-MQTT.md b/src/zh/UserGuide/latest-Table/API/Programming-MQTT.md index 1ba71f7f0..b243ff3eb 100644 --- a/src/zh/UserGuide/latest-Table/API/Programming-MQTT.md +++ b/src/zh/UserGuide/latest-Table/API/Programming-MQTT.md @@ -91,3 +91,172 @@ databaseName:stock | `"xxx"` | TEXT | | `t`,`T`,`true`,`True`,`TRUE`
`f`,`F`,`false`,`False`,`FALSE` | BOOLEAN | + +## 5. 代码示例 +以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。 + + ```java +MQTT mqtt = new MQTT(); +mqtt.setHost("127.0.0.1", 1883); +mqtt.setUserName("root"); +mqtt.setPassword("root"); + +BlockingConnection connection = mqtt.blockingConnection(); +String DATABASE = "myMqttTest"; +connection.connect(); + +String payload = + "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2"; +connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6"; + connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + Thread.sleep(10); + +//批量写入示例 +payload = + "test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n " + + "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +//批量写入示例 +payload = + "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n " + + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5"; +connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false); +Thread.sleep(10); + +connection.disconnect(); + ``` + + +## 6. 自定义 MQTT 消息格式 + +事实上可以通过简单编程来实现 MQTT 消息的格式自定义。 +可以在源码的 [example/mqtt-customize](https://github.com/apache/iotdb/tree/master/example/mqtt-customize) 项目中找到一个简单示例。 + +步骤: +1. 创建一个 Java 项目,增加如下依赖 +```xml + + org.apache.iotdb + iotdb-server + 2.0.4-SNAPSHOT + +``` +2. 创建一个实现类,实现接口 `org.apache.iotdb.db.mqtt.protocol.PayloadFormatter` + +```java +package org.apache.iotdb.mqtt.server; + +import io.netty.buffer.ByteBuf; +import org.apache.iotdb.db.protocol.mqtt.Message; +import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class CustomizedLinePayloadFormatter implements PayloadFormatter { + + @Override + public List format(String topic, ByteBuf payload) { + // Suppose the payload is a line format + if (payload == null) { + return null; + } + + String line = payload.toString(StandardCharsets.UTF_8); + // parse data from the line and generate Messages and put them into List ret + List ret = new ArrayList<>(); + // this is just an example, so we just generate some Messages directly + for (int i = 0; i < 3; i++) { + long ts = i; + TableMessage message = new TableMessage(); + + // Parsing Database Name + message.setDatabase("db" + i); + + //Parsing Table Names + message.setTable("t" + i); + + // Parsing Tags + List tagKeys = new ArrayList<>(); + tagKeys.add("tag1" + i); + tagKeys.add("tag2" + i); + List tagValues = new ArrayList<>(); + tagValues.add("t_value1" + i); + tagValues.add("t_value2" + i); + message.setTagKeys(tagKeys); + message.setTagValues(tagValues); + + // Parsing Attributes + List attributeKeys = new ArrayList<>(); + List attributeValues = new ArrayList<>(); + attributeKeys.add("attr1" + i); + attributeKeys.add("attr2" + i); + attributeValues.add("a_value1" + i); + attributeValues.add("a_value2" + i); + message.setAttributeKeys(attributeKeys); + message.setAttributeValues(attributeValues); + + // Parsing Fields + List fields = Arrays.asList("field1" + i, "field2" + i); + List dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT); + List values = Arrays.asList("4.0" + i, "5.0" + i); + message.setFields(fields); + message.setDataTypes(dataTypes); + message.setValues(values); + + //// Parsing timestamp + message.setTimestamp(ts); + ret.add(message); + } + return ret; + } + + @Override + public String getName() { + // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string: + return "CustomizedLine"; + } +} +``` + + +3. 修改项目中的 `src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter` 文件: + 将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。 + 在本例中,文件内容为: `org.apache.iotdb.mqtt.server.CustomizedLinePayloadFormatter` +4. 编译项目生成一个 jar 包: `mvn package -DskipTests` + + +在 IoTDB 服务端: +1. 创建 ${IOTDB_HOME}/ext/mqtt/ 文件夹, 将刚才的 jar 包放入此文件夹。 +2. 打开 MQTT 服务参数. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`) +3. 用刚才的实现类中的 getName() 方法的返回值 设置为 `conf/iotdb-system.properties` 中 `mqtt_payload_formatter` 的值, + , 在本例中,为 `CustomizedLine` +4. 启动 IoTDB +5. 搞定 + +More: MQTT 协议的消息不限于 line,你还可以用任意二进制。通过如下函数获得: +`payload.forEachByte()` or `payload.array`。 + + +## 7. 注意事项 + +为避免因缺省client_id引发的兼容性问题,强烈建议在所有MQTT客户端中始终显式地提供唯一且非空的 client_id。 +不同客户端在client_id缺失或为空时的表现并不一致,常见示例如下: +1. 显式传入空字符串 +• MQTTX:client_id=""时,IoTDB会直接丢弃消息; +• mosquitto_pub:client_id=""时,IoTDB能正常接收消息。 +2. 完全不传client_id +• MQTTX:消息可被IoTDB正常接收; +• mosquitto_pub:IoTDB拒绝连接。 +由此可见,显式指定唯一且非空的client_id是消除上述差异、确保消息可靠投递的最简单做法。 \ No newline at end of file diff --git a/src/zh/UserGuide/latest/API/Programming-MQTT.md b/src/zh/UserGuide/latest/API/Programming-MQTT.md index 6e6bed15c..a145aff56 100644 --- a/src/zh/UserGuide/latest/API/Programming-MQTT.md +++ b/src/zh/UserGuide/latest/API/Programming-MQTT.md @@ -21,18 +21,15 @@ # MQTT 协议 -[MQTT](http://mqtt.org/) 是机器对机器(M2M)/“物联网”连接协议。 +## 1. 概述 -它被设计为一种非常轻量级的发布/订阅消息传递。 +MQTT 是一种专为物联网(IoT)和低带宽环境设计的轻量级消息传输协议,基于发布/订阅(Pub/Sub)模型,支持设备间高效、可靠的双向通信。其核心目标是低功耗、低带宽消耗和高实时性,尤其适合网络不稳定或资源受限的场景(如传感器、移动设备)。 -对于与需要较小代码占用和/或网络带宽非常宝贵的远程位置的连接很有用。 - -IoTDB 支持 MQTT v3.1(OASIS 标准)协议。 -IoTDB 服务器包括内置的 MQTT 服务,该服务允许远程设备将消息直接发送到 IoTDB 服务器。 +IoTDB 深度集成了 MQTT 协议能力,完整兼容 MQTT v3.1(OASIS 国际标准协议)。IoTDB 服务器内置高性能 MQTT Broker 服务模块,无需第三方中间件,支持设备通过 MQTT 报文将时序数据直接写入 IoTDB 存储引擎。 -## 1. 内置 MQTT 服务 +## 2. 内置 MQTT 服务 内置的 MQTT 服务提供了通过 MQTT 直接连接到 IoTDB 的能力。 它侦听来自 MQTT 客户端的发布消息,然后立即将数据写入存储。 MQTT 主题与 IoTDB 时间序列相对应。 消息有效载荷可以由 Java SPI 加载的`PayloadFormatter`格式化为事件,默认实现为`JSONPayloadFormatter` @@ -59,21 +56,22 @@ MQTT 主题与 IoTDB 时间序列相对应。 -## 2. MQTT 配置 +## 3. MQTT 配置 默认情况下,IoTDB MQTT 服务从`${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`加载配置。 配置如下: -| 名称 | 描述 | 默认 | -| ------------- |:-------------:|:------:| -| enable_mqtt_service | 是否启用 mqtt 服务 | false | -| mqtt_host | mqtt 服务绑定主机 | 127.0.0.1 | -| mqtt_port | mqtt 服务绑定端口 | 1883 | -| mqtt_handler_pool_size | 处理 mqtt 消息的处理程序池大小 | 1 | -| mqtt_payload_formatter | mqtt 消息有效负载格式化程序 | json | -| mqtt_max_message_size | mqtt 消息最大长度(字节)| 1048576 | +| **名称** | **描述** | **默认** | +|---------------------------| --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------- | +| `enable_mqtt_service` | 是否启用 mqtt 服务 | FALSE | +| `mqtt_host` | mqtt 服务绑定主机 | 127.0.0.1 | +| `mqtt_port` | mqtt 服务绑定端口 | 1883 | +| `mqtt_handler_pool_size` | 处理 mqtt 消息的处理程序池大小 | 1 | +| **`mqtt_payload_formatter`** | **mqtt**​**​ 消息有效负载格式化程序。**​**可选项:**​​**`json`**​**:仅适用于树模型。**​​**`line`**​**:仅适用于表模型。** | **json** | +| `mqtt_max_message_size` | mqtt 消息最大长度(字节) | 1048576 | + -## 示例代码 +## 4. 示例代码 以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。 ```java @@ -101,18 +99,50 @@ connection.disconnect(); ``` -## 3. 自定义 MQTT 消息格式 +## 5. 自定义 MQTT 消息格式 -事实上可以通过简单编程来实现 MQTT 消息的格式自定义。 +在生产环境中,每个设备通常都配备了自己的 MQTT 客户端,且这些客户端的消息格式已经预先设定。如果按照 IoTDB 所支持的 MQTT 消息格式进行通信,就需要对现有的所有客户端进行全面的升级改造,这无疑会带来较高的成本。然而,我们可以通过简单的编程手段,轻松实现 MQTT 消息格式的自定义,而无需改造客户端。 可以在源码的 [example/mqtt-customize](https://github.com/apache/iotdb/tree/rc/2.0.1/example/mqtt-customize) 项目中找到一个简单示例。 -步骤: +假定mqtt客户端传过来的是以下消息格式: +```json + { + "time":1586076045523, + "deviceID":"car_1", + "deviceType":"油车", + "point":"油量", + "value":10.0 +} +``` +或者JSON的数组形式: +```java +[ + { + "time":1586076045523, + "deviceID":"car_1", + "deviceType":"油车", + "point":"油量", + "value":10.0 + }, + { + "time":1586076045524, + "deviceID":"car_2", + "deviceType":"新能源车", + "point":"速度", + "value":80.0 + } +] +``` + + +则可以通过以下步骤设置设置自定义MQTT消息格式: + 1. 创建一个 Java 项目,增加如下依赖 ```xml org.apache.iotdb iotdb-server - 1.3.0-SNAPSHOT + 2.0.4-SNAPSHOT ``` 2. 创建一个实现类,实现接口 `org.apache.iotdb.db.mqtt.protocol.PayloadFormatter` @@ -120,44 +150,115 @@ connection.disconnect(); ```java package org.apache.iotdb.mqtt.server; -import io.netty.buffer.ByteBuf; import org.apache.iotdb.db.protocol.mqtt.Message; import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; +import org.apache.iotdb.db.protocol.mqtt.TableMessage; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.tsfile.enums.TSDataType; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +/** + * The Customized JSON payload formatter. one json format supported: { "time":1586076045523, + * "deviceID":"car_1", "deviceType":"新能源车", "point":"速度", "value":80.0 } + */ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { + private static final String JSON_KEY_TIME = "time"; + private static final String JSON_KEY_DEVICEID = "deviceID"; + private static final String JSON_KEY_DEVICETYPE = "deviceType"; + private static final String JSON_KEY_POINT = "point"; + private static final String JSON_KEY_VALUE = "value"; + private static final Gson GSON = new GsonBuilder().create(); @Override - public List format(ByteBuf payload) { - // Suppose the payload is a json format + public List format(String topic, ByteBuf payload) { if (payload == null) { - return null; + return new ArrayList<>(); } - - String json = payload.toString(StandardCharsets.UTF_8); - // parse data from the json and generate Messages and put them into List ret - List ret = new ArrayList<>(); - // this is just an example, so we just generate some Messages directly - for (int i = 0; i < 2; i++) { - long ts = i; - Message message = new Message(); - message.setDevice("d" + i); - message.setTimestamp(ts); - message.setMeasurements(Arrays.asList("s1", "s2")); - message.setValues(Arrays.asList("4.0" + i, "5.0" + i)); - ret.add(message); + String txt = payload.toString(StandardCharsets.UTF_8); + JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class); + if (jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + return formatTableRow(topic, jsonObject); + } else if (jsonElement.isJsonArray()) { + JsonArray jsonArray = jsonElement.getAsJsonArray(); + List messages = new ArrayList<>(); + for (JsonElement element : jsonArray) { + JsonObject jsonObject = element.getAsJsonObject(); + messages.addAll(formatTableRow(topic, jsonObject)); + } + return messages; } - return ret; + throw new JsonParseException("payload is invalidate"); + } + + @Override + @Deprecated + public List format(ByteBuf payload) { + throw new NotImplementedException(); + } + + private List formatTableRow(String topic, JsonObject jsonObject) { + TableMessage message = new TableMessage(); + String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/")); + String table = "test_table"; + + // Parsing Database Name + message.setDatabase((database)); + + // Parsing Table Name + message.setTable(table); + + // Parsing Tags + List tagKeys = new ArrayList<>(); + tagKeys.add(JSON_KEY_DEVICEID); + List tagValues = new ArrayList<>(); + tagValues.add(jsonObject.get(JSON_KEY_DEVICEID).getAsString()); + message.setTagKeys(tagKeys); + message.setTagValues(tagValues); + + // Parsing Attributes + List attributeKeys = new ArrayList<>(); + List attributeValues = new ArrayList<>(); + attributeKeys.add(JSON_KEY_DEVICETYPE); + attributeValues.add(jsonObject.get(JSON_KEY_DEVICETYPE).getAsString()); + message.setAttributeKeys(attributeKeys); + message.setAttributeValues(attributeValues); + + // Parsing Fields + List fields = Arrays.asList(JSON_KEY_POINT); + List dataTypes = Arrays.asList(TSDataType.FLOAT); + List values = Arrays.asList(jsonObject.get(JSON_KEY_VALUE).getAsFloat()); + message.setFields(fields); + message.setDataTypes(dataTypes); + message.setValues(values); + + // Parsing timestamp + message.setTimestamp(jsonObject.get(JSON_KEY_TIME).getAsLong()); + return Lists.newArrayList(message); } @Override public String getName() { - // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string: - return "CustomizedJson"; + // set the value of mqtt_payload_formatter in iotdb-common.properties as the following string: + return "CustomizedJson2Table"; + } + + @Override + public String getType() { + return PayloadFormatter.TABLE_TYPE; } } ``` @@ -171,9 +272,22 @@ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { 1. 创建 ${IOTDB_HOME}/ext/mqtt/ 文件夹, 将刚才的 jar 包放入此文件夹。 2. 打开 MQTT 服务参数. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`) 3. 用刚才的实现类中的 getName() 方法的返回值 设置为 `conf/iotdb-system.properties` 中 `mqtt_payload_formatter` 的值, - , 在本例中,为 `CustomizedJson` + , 在本例中,为 `CustomizedJson2Table` 4. 启动 IoTDB 5. 搞定 More: MQTT 协议的消息不限于 json,你还可以用任意二进制。通过如下函数获得: `payload.forEachByte()` or `payload.array`。 + + +## 6. 注意事项 + +为避免因缺省client_id引发的兼容性问题,强烈建议在所有MQTT客户端中始终显式地提供唯一且非空的 client_id。 +不同客户端在client_id缺失或为空时的表现并不一致,常见示例如下: +1. 显式传入空字符串 + • MQTTX:client_id=""时,IoTDB会直接丢弃消息; + • mosquitto_pub:client_id=""时,IoTDB能正常接收消息。 +2. 完全不传client_id + • MQTTX:消息可被IoTDB正常接收; + • mosquitto_pub:IoTDB拒绝连接。 + 由此可见,显式指定唯一且非空的client_id是消除上述差异、确保消息可靠投递的最简单做法。 \ No newline at end of file