Skip to content

Commit 77ff518

Browse files
authored
Merge pull request #571 from virajdere/feature/mqtt-entity-sink
2 parents c830c7b + f877cdd commit 77ff518

File tree

12 files changed

+1839
-5
lines changed

12 files changed

+1839
-5
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,10 @@ Sinks {
904904

905905
*Default*: All data items
906906

907+
## MQTT Entity Sink Documentation
908+
909+
For detailed configuration, usage, and message format for the MQTT Entity Sink, see: [docs: MTConnect MQTT Entity Sink](src/mtconnect/sink/mqtt_entity_sink/README.md)
910+
907911
### Adapter Configuration Items ###
908912

909913
* `Adapters` - Contains a list of device blocks. If there are no Adapters

agent_lib/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,14 @@ set(AGENT_SOURCES
257257

258258
"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp"
259259

260+
# src/sink/mqtt_entity_sink HEADER_FILE_ONLY
261+
262+
"${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.hpp"
263+
264+
#src/sink/mqtt_entity_sink SOURCE_FILES_ONLY
265+
266+
"${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.cpp"
267+
260268
# src/sink/rest_sink HEADER_FILE_ONLY
261269

262270
"${SOURCE_DIR}/sink/rest_sink/cached_file.hpp"
@@ -341,6 +349,7 @@ if(MSVC)
341349
# The modules including Beast required the /bigobj option in Windows
342350
set_property(SOURCE
343351
"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp"
352+
"${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.cpp"
344353
"${SOURCE_DIR}/sink/rest_sink/session_impl.cpp"
345354
"${SOURCE_DIR}/source/adapter/mqtt/mqtt_adapter.cpp"
346355
"${SOURCE_DIR}/source/adapter/agent_adapter/agent_adapter.cpp"

schemas/cfg.schema.json

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,73 @@
314314
"enum":["at_least_once", "at_most_once", "exactly_once"]
315315
}
316316
}
317+
},
318+
"MqttEntitySink": {
319+
"type": "object",
320+
"properties": {
321+
"MqttHost": {
322+
"type": "string",
323+
"description": "IP Address or name of the MQTT Broker",
324+
"default": "127.0.0.1"
325+
},
326+
"MqttPort": {
327+
"type": "integer",
328+
"description": "Port number of MQTT Broker",
329+
"default": 1883
330+
},
331+
"MqttTls": {
332+
"type": "boolean",
333+
"description": "TLS Certificate for secure connection to the MQTT Broker",
334+
"default": false
335+
},
336+
"MqttUserName": {
337+
"type": "string",
338+
"description": "Username for MQTT authentication"
339+
},
340+
"MqttPassword": {
341+
"type": "string",
342+
"description": "Password for MQTT authentication"
343+
},
344+
"MqttClientId": {
345+
"type": "string",
346+
"description": "MQTT client identifier"
347+
},
348+
"MqttQOS": {
349+
"type": "string",
350+
"description": "The quality of service level for the MQTT connection. Options are: at_least_once, at_most_once, and exactly_once",
351+
"default": "at_least_once",
352+
"enum": [
353+
"at_least_once",
354+
"at_most_once",
355+
"exactly_once"
356+
]
357+
},
358+
"MqttRetain": {
359+
"type": "boolean",
360+
"description": "Retain the last message sent to the broker",
361+
"default": false
362+
},
363+
"ObservationTopicPrefix": {
364+
"type": "string",
365+
"description": "Prefix for the Observations topic",
366+
"default": "MTConnect/Devices/[device]/Observations"
367+
},
368+
"DeviceTopicPrefix": {
369+
"type": "string",
370+
"description": "Prefix for the Device topic",
371+
"default": "MTConnect/Probe/[device]"
372+
},
373+
"AssetTopicPrefix": {
374+
"type": "string",
375+
"description": "Prefix for the Asset topic",
376+
"default": "MTConnect/Asset/[device]"
377+
},
378+
"MqttLastWillTopic": {
379+
"type": "string",
380+
"description": "The topic used for the last will and testament for an agent",
381+
"default": "MTConnect/Probe/[device]/Availability"
382+
}
383+
}
317384
}
318385
}
319386
},

src/mtconnect/configuration/agent_config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
#include "mtconnect/configuration/config_options.hpp"
5959
#include "mtconnect/device_model/device.hpp"
6060
#include "mtconnect/printer/xml_printer.hpp"
61+
#include "mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp"
6162
#include "mtconnect/sink/mqtt_sink/mqtt_service.hpp"
6263
#include "mtconnect/sink/rest_sink/rest_service.hpp"
6364
#include "mtconnect/source/adapter/agent_adapter/agent_adapter.hpp"
@@ -111,6 +112,7 @@ namespace mtconnect::configuration {
111112

112113
sink::mqtt_sink::MqttService::registerFactory(m_sinkFactory);
113114
sink::rest_sink::RestService::registerFactory(m_sinkFactory);
115+
sink::mqtt_entity_sink::MqttEntitySink::registerFactory(m_sinkFactory);
114116
adapter::shdr::ShdrAdapter::registerFactory(m_sourceFactory);
115117
adapter::mqtt_adapter::MqttAdapter::registerFactory(m_sourceFactory);
116118
adapter::agent_adapter::AgentAdapter::registerFactory(m_sourceFactory);

src/mtconnect/configuration/config_options.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,9 @@ namespace mtconnect {
107107
DECLARE_CONFIGURATION(MqttMaxTopicDepth);
108108
DECLARE_CONFIGURATION(MqttLastWillTopic);
109109
DECLARE_CONFIGURATION(MqttXPath);
110+
DECLARE_CONFIGURATION(ObservationTopicPrefix);
111+
DECLARE_CONFIGURATION(DeviceTopicPrefix);
112+
DECLARE_CONFIGURATION(AssetTopicPrefix);
110113
///@}
111114

112115
/// @name Adapter Configuration

src/mtconnect/mqtt/mqtt_server_impl.hpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include <inttypes.h>
2626
#include <mqtt/async_client.hpp>
27+
#include <mqtt/broker/topic_filter.hpp>
2728
#include <mqtt/setup_log.hpp>
2829
#include <mqtt_server_cpp.hpp>
2930

@@ -217,12 +218,12 @@ namespace mtconnect {
217218
LOG(debug) << "Server topic_name: " << topic_name;
218219
LOG(debug) << "Server contents: " << contents;
219220

220-
auto const &idx = m_subs.get<tag_topic>();
221-
auto r = idx.equal_range(topic_name);
222-
for (; r.first != r.second; ++r.first)
221+
for (const auto &sub : m_subs)
223222
{
224-
r.first->con->publish(topic_name, contents,
225-
std::min(r.first->qos_value, pubopts.get_qos()));
223+
if (mqtt::broker::compare_topic_filter(sub.topic, topic_name))
224+
{
225+
sub.con->publish(topic_name, contents, std::min(sub.qos_value, pubopts.get_qos()));
226+
}
226227
}
227228

228229
return true;

0 commit comments

Comments
 (0)