From db90f70a3609300e066525333fb03133c460b985 Mon Sep 17 00:00:00 2001 From: Bonkura <110040081+bonkuraps@users.noreply.github.com> Date: Sun, 18 Dec 2022 19:24:48 +0100 Subject: [PATCH] ZMQ: Add topics filtering (#730) - Add ZeroMQ topics support. - Add ZeroMQ publisher in /utilities. - Add requirements.txt --- .gitignore | 16 +++ .../DataStreamZMQ/datastream_zmq.cpp | 97 ++++++++++++++----- .../DataStreamZMQ/datastream_zmq.h | 5 + .../DataStreamZMQ/datastream_zmq.ui | 16 +++ .../utilities/start_test_publisher.py | 56 +++++++++++ .../DataStreamZMQ/zmp_publisher.py | 25 ----- requirements.txt | 3 + 7 files changed, 168 insertions(+), 50 deletions(-) create mode 100755 plotjuggler_plugins/DataStreamZMQ/utilities/start_test_publisher.py delete mode 100755 plotjuggler_plugins/DataStreamZMQ/zmp_publisher.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore index 1612832ab..962d0b24d 100644 --- a/.gitignore +++ b/.gitignore @@ -52,3 +52,19 @@ installer/io.plotjuggler.application/data # OS X .DS_Store + +# Cmake files +CMakeLists.txt.user +CMakeCache.txt +CMakeFiles +CMakeScripts +Testing +Makefile +cmake_install.cmake +install_manifest.txt +compile_commands.json +CTestTestfile.cmake +_deps + +# Clangd +.cache diff --git a/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.cpp b/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.cpp index cc5e77680..48c045b51 100644 --- a/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.cpp +++ b/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.cpp @@ -72,20 +72,22 @@ bool DataStreamZMQ::start(QStringList*) QSettings settings; QString address = settings.value("ZMQ_Subscriber::address", "localhost").toString(); QString protocol = settings.value("ZMQ_Subscriber::protocol", "JSON").toString(); + QString topics = settings.value("ZMQ_Subscriber::topics", "").toString(); + int port = settings.value("ZMQ_Subscriber::port", 9872).toInt(); dialog->ui->lineEditAddress->setText(address); dialog->ui->lineEditPort->setText(QString::number(port)); + dialog->ui->lineEditTopics->setText(topics); ParserFactoryPlugin::Ptr parser_creator; connect(dialog->ui->comboBoxProtocol, - qOverload(&QComboBox::currentIndexChanged), - this, [&](const QString & selected_protocol) - { + qOverload(&QComboBox::currentIndexChanged), this, + [&](const QString& selected_protocol) { if (parser_creator) { - if( auto prev_widget = parser_creator->optionsWidget()) + if (auto prev_widget = parser_creator->optionsWidget()) { prev_widget->setVisible(false); } @@ -110,6 +112,7 @@ bool DataStreamZMQ::start(QStringList*) address = dialog->ui->lineEditAddress->text(); port = dialog->ui->lineEditPort->text().toUShort(&ok); protocol = dialog->ui->comboBoxProtocol->currentText(); + topics = dialog->ui->lineEditTopics->text(); _parser = parser_creator->createParser({}, {}, {}, dataMap()); @@ -117,14 +120,17 @@ bool DataStreamZMQ::start(QStringList*) settings.setValue("ZMQ_Subscriber::address", address); settings.setValue("ZMQ_Subscriber::protocol", protocol); settings.setValue("ZMQ_Subscriber::port", port); + settings.setValue("ZMQ_Subscriber::topics", topics); _socket_address = (dialog->ui->comboBox->currentText() + address + ":" + QString::number(port)) .toStdString(); _zmq_socket.connect(_socket_address.c_str()); - // subscribe to everything - _zmq_socket.set(zmq::sockopt::subscribe, ""); + + parseTopicFilters(topics); + subscribeTopics(); + _zmq_socket.set(zmq::sockopt::rcvtimeo, 100); qDebug() << "ZMQ listening on address" << QString::fromStdString(_socket_address); @@ -141,12 +147,15 @@ void DataStreamZMQ::shutdown() if (_running) { _running = false; + if (_receive_thread.joinable()) { _receive_thread.join(); } + + unsubscribeTopics(); + _zmq_socket.disconnect(_socket_address.c_str()); - _running = false; } } @@ -164,27 +173,65 @@ void DataStreamZMQ::receiveLoop() double timestamp = 1e-6 * double(duration_cast(ts).count()); PJ::MessageRef msg(reinterpret_cast(recv_msg.data()), recv_msg.size()); - - try + if (parseMessage(msg, timestamp)) { - std::lock_guard lock(mutex()); - _parser->parseMessage(msg, timestamp); emit this->dataReceived(); } - catch (std::exception& err) - { - QMessageBox::warning(nullptr, tr("ZMQ Subscriber"), - tr("Problem parsing the message. ZMQ Subscriber will be " - "stopped.\n%1") - .arg(err.what()), - QMessageBox::Ok); - - _zmq_socket.disconnect(_socket_address.c_str()); - _running = false; - // notify the GUI - emit closed(); - return; - } } } } + +bool DataStreamZMQ::parseMessage(const PJ::MessageRef& msg, double& timestamp) +{ + try + { + std::lock_guard lock(mutex()); + _parser->parseMessage(msg, timestamp); + return true; + } + catch (...) + { + return false; + } +} + +void DataStreamZMQ::parseTopicFilters(const QString& topic_filters) +{ + const QRegExp regex("(,{0,1}\\s+)|(;\\s*)"); + + if (topic_filters.trimmed().size() != 0) + { + const auto splitted = topic_filters.split(regex); + + for (const auto& topic : splitted) + { + _topic_filters.push_back(topic.toStdString()); + } + } + else + { + _topic_filters.push_back(""); + } +} + +void DataStreamZMQ::subscribeTopics() +{ + for (const auto& topic : _topic_filters) + { + qDebug() << "ZMQ Subscribed topic" << QString::fromStdString(topic); + + _zmq_socket.set(zmq::sockopt::subscribe, topic); + } +} + +void DataStreamZMQ::unsubscribeTopics() +{ + for (const auto& topic : _topic_filters) + { + qDebug() << "ZMQ Unsubscribed topic" << QString::fromStdString(topic); + + _zmq_socket.set(zmq::sockopt::unsubscribe, topic); + } + + _topic_filters.clear(); +} diff --git a/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.h b/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.h index 1e459e55b..c028337fd 100644 --- a/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.h +++ b/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.h @@ -56,6 +56,11 @@ class DataStreamZMQ : public PJ::DataStreamer PJ::MessageParserPtr _parser; std::string _socket_address; std::thread _receive_thread; + std::vector _topic_filters; void receiveLoop(); + bool parseMessage(const PJ::MessageRef& msg, double& timestamp); + void parseTopicFilters(const QString& filters); + void subscribeTopics(); + void unsubscribeTopics(); }; diff --git a/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.ui b/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.ui index bc42d5bb9..1a1aaa363 100644 --- a/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.ui +++ b/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.ui @@ -147,6 +147,22 @@ + + + + + 75 + true + + + + Topics: + + + + + + diff --git a/plotjuggler_plugins/DataStreamZMQ/utilities/start_test_publisher.py b/plotjuggler_plugins/DataStreamZMQ/utilities/start_test_publisher.py new file mode 100755 index 000000000..b6818de40 --- /dev/null +++ b/plotjuggler_plugins/DataStreamZMQ/utilities/start_test_publisher.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 + +import zmq +import math +import json +import argparse + +from time import sleep +import numpy as np + +PORT = 9872 + +parser = argparse.ArgumentParser("start_test_publisher") + +parser.add_argument("--topic|-t", + dest="topic", + help="Topic on which messages will be published", + type=str, + required=False) + +args = parser.parse_args() +topic = args.topic + + +def main(): + context = zmq.Context() + server_socket = context.socket(zmq.PUB) + server_socket.bind("tcp://*:" + str(PORT)) + ticks = 0 + + while True: + data = { + "ticks": ticks, + "data": { + "cos": math.cos(ticks), + "sin": math.sin(ticks), + "floor": np.floor(np.cos(ticks)), + "ceil": np.ceil(np.cos(ticks)) + } + } + + if topic: + print(f"[{topic}] - " + json.dumps(data)) + server_socket.send_multipart( + [topic.encode(), json.dumps(data).encode()]) + else: + print(json.dumps(data)) + server_socket.send(json.dumps(data).encode()) + + ticks += 1 + + sleep(0.1) + + +if __name__ == '__main__': + main() diff --git a/plotjuggler_plugins/DataStreamZMQ/zmp_publisher.py b/plotjuggler_plugins/DataStreamZMQ/zmp_publisher.py deleted file mode 100755 index a2a73ef8a..000000000 --- a/plotjuggler_plugins/DataStreamZMQ/zmp_publisher.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/python - -import zmq -import math -import json -from time import sleep - -context = zmq.Context() -socket = context.socket(zmq.PUB) -socket.bind("tcp://*:9872") - -time = 0.0 - -while True: - sleep(0.05) - time += 0.05 - - data = { - "timestamp": time, - "test_data": { - "cos": math.cos(time), - "sin": math.sin(time) - } - } - socket.send_string( json.dumps(data) ) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..02bf9653c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +numpy==1.23.2 +pyzmq==23.2.1 +autopep8==1.7.0