From decd68cddc4714a699096dbc3a9645f29de4f863 Mon Sep 17 00:00:00 2001 From: Ignacio Utrilla Date: Mon, 7 Jul 2025 11:57:28 +0200 Subject: [PATCH 1/9] Added new test to read received data through a socket --- .../test/read_shape_from_socket.py | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py diff --git a/examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py b/examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py new file mode 100644 index 000000000..211b79456 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py @@ -0,0 +1,71 @@ +import socket +import sys +import struct + +# The following methods are used to simulate a moving shape (not used in this version) +def initialize_position(): + x = random.randint(lower_bound, upper_bound_x) + y = random.randint(lower_bound, upper_bound_y) + direction_x = random.choice([-1, 1]) + direction_y = random.choice([-1, 1]) + return x, y, direction_x, direction_y + + +def move_position(x, y, direction_x, direction_y): + x += direction_x * step_size + y += direction_y * step_size + + if x <= lower_bound or x >= upper_bound_x: + direction_x = -direction_x + x = max(lower_bound, min(x, upper_bound_x)) + + if y <= lower_bound or y >= upper_bound_y: + direction_y = -direction_y + y = max(lower_bound, min(y, upper_bound_y)) + + return x, y, direction_x, direction_y + + +# Receive and parse data from the socket +def receive_data(sock): + # Receive data from the socket + data, addr = sock.recvfrom(1024) # Buffer size of 1024 bytes + # Unpack the data as 3 int types (x, y, shapesize) + x, y, size = struct.unpack("iii", data) + return x, y, size, addr + + +def main(): + if len(sys.argv) != 2: + print( + "Usage: python3 read_shape_from_socket.py " + ) + return + + # Input arguments + port = int(sys.argv[1]) + + samples_received = 0 + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + try: + # Bind the socket to listen on the specified port + sock.bind(('', port)) + print(f"Listening for shape data on port {port}...") + + while True: + # Receive data from the socket + x, y, size, addr = receive_data(sock) + + # Print the received shape data + samples_received += 1 + print(f"Sample #{samples_received} from {addr}: x={x}, y={y}, size={size}") + + except KeyboardInterrupt: + print("\nExiting...") + except Exception as e: + print(f"Error: {e}") + + +if __name__ == "__main__": + main() From 5a1ad4f6a9cd9091dfb7d9eaddb071fdefb3fec3 Mon Sep 17 00:00:00 2001 From: Ignacio Utrilla Date: Mon, 7 Jul 2025 12:32:34 +0200 Subject: [PATCH 2/9] Added writer --- .../udp_socket_adapter/CMakeLists.txt | 2 + .../udp_socket_adapter/RsSocketAdapter.xml | 51 ++++++- .../src/SocketConnection.cxx | 19 +++ .../src/SocketConnection.hpp | 12 +- .../src/SocketStreamWriter.cxx | 130 ++++++++++++++++++ .../src/SocketStreamWriter.hpp | 91 ++++++++++++ .../udp_socket_adapter/src/UdpSocket.cxx | 11 ++ .../udp_socket_adapter/src/UdpSocket.hpp | 11 ++ 8 files changed, 319 insertions(+), 8 deletions(-) create mode 100644 examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.cxx create mode 100644 examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.hpp diff --git a/examples/routing_service/udp_socket_adapter/CMakeLists.txt b/examples/routing_service/udp_socket_adapter/CMakeLists.txt index 4181e5ae9..914900e91 100644 --- a/examples/routing_service/udp_socket_adapter/CMakeLists.txt +++ b/examples/routing_service/udp_socket_adapter/CMakeLists.txt @@ -37,6 +37,8 @@ add_library(${PROJECT_NAME} "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketInputDiscoveryStreamReader.hpp" "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.cxx" "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.hpp" "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.cxx" "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.hpp" ) diff --git a/examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml index 757cf5948..8089e584c 100644 --- a/examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml +++ b/examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml @@ -28,13 +28,13 @@ - + 0 - - - + + + @@ -53,7 +53,7 @@ receive_port - 10203 + 10204 @@ -70,11 +70,48 @@ You could use Triangle or Circle if you also modify the other references to Square that are hardcoded in the adapter code --> Square - + + + + ON_ROUTE_MATCH + + ShapeType + Square + + + + send_address + 127.0.0.1 + + + send_port + 0 + + + dest_address + 127.0.0.1 + + + dest_port + 10203 + + + + + + ON_DOMAIN_MATCH + + ShapeType + Square + + + - + \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx b/examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx index 729ce9bdf..20f1cb2bd 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx +++ b/examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx @@ -12,6 +12,7 @@ #include "SocketConnection.hpp" #include "SocketStreamReader.hpp" +#include "SocketStreamWriter.hpp" using namespace rti::routing; using namespace rti::routing::adapter; @@ -33,6 +34,14 @@ StreamReader *SocketConnection::create_stream_reader( return new SocketStreamReader(this, info, properties, listener); } +StreamWriter *SocketConnection::create_stream_writer( + Session *session, + const StreamInfo &info, + const PropertySet &properties) +{ + return new SocketStreamWriter(this, info, properties); +} + void SocketConnection::delete_stream_reader(StreamReader *reader) { SocketStreamReader *socket_reader = @@ -40,12 +49,22 @@ void SocketConnection::delete_stream_reader(StreamReader *reader) socket_reader->shutdown_socket_reader_thread(); delete reader; } +void SocketConnection::delete_stream_writer(StreamWriter *writer) +{ + SocketStreamWriter *socket_writer = dynamic_cast(writer); + delete writer; +} DiscoveryStreamReader *SocketConnection::input_stream_discovery_reader() { return &input_discovery_reader_; } +DiscoveryStreamReader *SocketConnection::output_stream_discovery_reader() +{ + return nullptr; +} + void SocketConnection::dispose_discovery_stream( const rti::routing::StreamInfo &stream_info) { diff --git a/examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp b/examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp index c79ccb00a..e1171cf3a 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp +++ b/examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp @@ -37,13 +37,23 @@ class SocketConnection : public rti::routing::adapter::Connection { const rti::routing::PropertySet &properties, rti::routing::adapter::StreamReaderListener *listener) final; - // This function will also stop the receiving socket thread + rti::routing::adapter::StreamWriter *create_stream_writer( + rti::routing::adapter::Session *session, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &properties) final; + void delete_stream_reader( rti::routing::adapter::StreamReader *reader) final; + void delete_stream_writer( + rti::routing::adapter::StreamWriter *writer) final; + rti::routing::adapter::DiscoveryStreamReader * input_stream_discovery_reader() final; + rti::routing::adapter::DiscoveryStreamReader * + output_stream_discovery_reader() final; + /** * @brief This function is called by the SocketStreamReader to indicate * that it's time to dispose the route. The dispose set by the diff --git a/examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.cxx b/examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.cxx new file mode 100644 index 000000000..585c0680b --- /dev/null +++ b/examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.cxx @@ -0,0 +1,130 @@ +/* + * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include +#include +#include +#include +#include + +#include +#include +#ifdef _WIN32 + #include + #pragma comment(lib, "ws2_32.lib") +#else + #include + #include + #include + #include + #include +#endif + +#include +#include +#include +#include "SocketStreamWriter.hpp" +#include "SocketStreamReader.hpp" //use ShapeType from here +#include + +using namespace dds::core::xtypes; +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketStreamWriter::SocketStreamWriter( + SocketConnection *connection, + const StreamInfo &info, + const PropertySet &properties + ) + : stream_info_(info.stream_name(), info.type_info().type_name()) +{ + + socket_connection_ = connection; + + adapter_type_ = + static_cast(info.type_info().type_representation()); + + + // Parse the properties provided in the xml configuration file + for (const auto &property : properties) { + if (property.first == SEND_ADDRESS_STRING) { + send_address_ = property.second; + } + else if (property.first == SEND_PORT_STRING) { + send_port_ = std::stoi(property.second); + } + else if (property.first == DEST_ADDRESS_STRING) + { + dest_address_ = property.second; + } + else if (property.first == DEST_PORT_STRING) + { + dest_port_ = std::stoi(property.second); + } + } + + socket = std::unique_ptr(new UdpSocket( + send_address_.c_str(), + send_port_)); +} + +int SocketStreamWriter::write( + const std::vector &samples, + const std::vector &infos) +{ + size_t len = 0; + + ShapeType shapes; + uint32_t tempObject=0; + + for (const auto sample : samples) { + //send sample out UDP interface + if (sample->member_exists_in_type("shapesize")) + { + shapes.shapesize = sample->value("shapesize"); + shapes.x = sample->value("x"); + shapes.y = sample->value("y"); + len = +socket->send_data((char*)&shapes, sizeof(shapes), dest_address_.c_str(), dest_port_); + } + else + { + Logger::instance().local("Received Sample that is not valid ShapeType"); + } + } + return len; +} + +int SocketStreamWriter::write( + const std::vector &samples, + const std::vector &infos, + const SelectorState &selector_state) +{ + int len; + len = write(samples, infos); + return len; +} + +void SocketStreamWriter::return_loan( + std::vector &samples, + std::vector &infos) +{ + for (int i = 0; i < samples.size(); ++i) { + delete samples[i]; + delete infos[i]; + } + samples.clear(); + infos.clear(); +} + +SocketStreamWriter::~SocketStreamWriter() +{ +} diff --git a/examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.hpp b/examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.hpp new file mode 100644 index 000000000..25166aceb --- /dev/null +++ b/examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.hpp @@ -0,0 +1,91 @@ +/* + * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETSTREAMWRITER_HPP +#define SOCKETSTREAMWRITER_HPP + +#include +#include +#include +#include + +#include "SocketConnection.hpp" +#include "UdpSocket.hpp" + +#include +#include + +#define BUFFER_MAX_SIZE 1024 +#define SEND_ADDRESS_STRING "send_address" +#define SEND_PORT_STRING "send_port" +#define DEST_ADDRESS_STRING "dest_address" +#define DEST_PORT_STRING "dest_port" + +class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter { +public: + explicit SocketStreamWriter( + SocketConnection *connection, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet & + ); + + virtual int + write(const std::vector &, + const std::vector &) final; + + virtual int write( + const std::vector &, + const std::vector &, + const rti::routing::adapter::SelectorState &selector_state) final; + + virtual void return_loan( + std::vector &, + std::vector &) final; + + ~SocketStreamWriter(); + + +private: + /** + * @brief Function used by socketreader_thread_ to read samples from the + * socket. + */ + + + SocketConnection *socket_connection_; + + std::unique_ptr socket; + + int send_port_; + int dest_port_; + + std::string send_address_; + std::string dest_address_; + rti::routing::StreamInfo stream_info_; + dds::core::xtypes::DynamicType *adapter_type_; + + struct doNothing { + RTI_INT32 CountUp; + RTI_INT32 CountDown; + RTI_INT32 Pause; + //dds::core::optionalObjectId; + RTI_UINT32 ObjectId; + }; + struct ShapeType { + int x; + int y; + int shapesize; + }; + +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx b/examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx index 3c1bf9f61..164468c8c 100644 --- a/examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx +++ b/examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx @@ -97,4 +97,15 @@ void UdpSocket::receive_data( &client_addr_len); return; +} + +int UdpSocket::send_data(char* tx_buffer, int tx_length, const char* destAddr, int destPort) +{ + sockaddr_in dest_addr; + dest_addr.sin_family = AF_INET; + dest_addr.sin_port = htons(destPort); + dest_addr.sin_addr.s_addr = inet_addr(destAddr); + + size_t length = sendto(sockfd, tx_buffer, tx_length, 0, (struct sockaddr*)&dest_addr, sizeof(dest_addr)); + return (int)length; } \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp b/examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp index bc0773f41..ba03954ed 100644 --- a/examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp +++ b/examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp @@ -10,6 +10,9 @@ * use or inability to use the software. */ +#ifndef UDPSOCKETUTILS_HPP +#define UDPSOCKETUTILS_HPP + #ifdef _WIN32 #include #include @@ -37,6 +40,12 @@ class UdpSocket { int* received_bytes, int size_of_original_buffer); + int send_data( + char* tx_buffer, + int tx_length, + const char* destAddr, + int destPort); + private: #ifdef _WIN32 SOCKET sockfd; @@ -48,3 +57,5 @@ class UdpSocket { void init_socket(); void bind_socket(const char* ip, int port); }; + +#endif From 38f0644608bf7bfced333c49cbe9e87142b44436 Mon Sep 17 00:00:00 2001 From: Ignacio Utrilla Date: Mon, 7 Jul 2025 13:22:58 +0200 Subject: [PATCH 3/9] Split example into two. Dynamic work in progress --- .../CMakeLists.txt | 0 .../README.md | 0 .../RsSocketAdapter.xml | 99 +++++++++++ .../src/SocketAdapter.cxx | 0 .../src/SocketAdapter.hpp | 0 .../src/SocketConnection.cxx | 0 .../src/SocketConnection.hpp | 0 .../src/SocketInputDiscoveryStreamReader.cxx | 88 +++++++++ .../src/SocketInputDiscoveryStreamReader.hpp | 61 +++++++ .../src/SocketStreamReader.cxx | 154 ++++++++++++++++ .../src/SocketStreamReader.hpp | 76 ++++++++ .../src/SocketStreamWriter.cxx | 123 +++++++++++++ .../src/SocketStreamWriter.hpp | 77 ++++++++ .../src/UdpSocket.cxx | 0 .../src/UdpSocket.hpp | 0 .../test/read_shape_from_socket.py | 0 .../test/send_shape_to_socket.py | 0 .../udp_socket_adapter_typed/CMakeLists.txt | 58 ++++++ .../udp_socket_adapter_typed/README.md | 168 ++++++++++++++++++ .../RsSocketAdapter.xml | 0 .../Types.xml | 0 .../src/SocketAdapter.cxx | 49 +++++ .../src/SocketAdapter.hpp | 52 ++++++ .../src/SocketConnection.cxx | 72 ++++++++ .../src/SocketConnection.hpp | 72 ++++++++ .../src/SocketInputDiscoveryStreamReader.cxx | 0 .../src/SocketInputDiscoveryStreamReader.hpp | 0 .../src/SocketStreamReader.cxx | 0 .../src/SocketStreamReader.hpp | 0 .../src/SocketStreamWriter.cxx | 0 .../src/SocketStreamWriter.hpp | 7 - .../src/UdpSocket.cxx | 111 ++++++++++++ .../src/UdpSocket.hpp | 61 +++++++ .../test/read_shape_from_socket.py | 47 +++++ .../test/send_shape_to_socket.py | 85 +++++++++ 35 files changed, 1453 insertions(+), 7 deletions(-) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_dynamic}/CMakeLists.txt (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_dynamic}/README.md (100%) create mode 100644 examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_dynamic}/src/SocketAdapter.cxx (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_dynamic}/src/SocketAdapter.hpp (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_dynamic}/src/SocketConnection.cxx (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_dynamic}/src/SocketConnection.hpp (100%) create mode 100644 examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx create mode 100644 examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp create mode 100644 examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx create mode 100644 examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp create mode 100644 examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx create mode 100644 examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_dynamic}/src/UdpSocket.cxx (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_dynamic}/src/UdpSocket.hpp (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_dynamic}/test/read_shape_from_socket.py (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_dynamic}/test/send_shape_to_socket.py (100%) create mode 100644 examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt create mode 100644 examples/routing_service/udp_socket_adapter_typed/README.md rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_typed}/RsSocketAdapter.xml (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_typed}/Types.xml (100%) create mode 100644 examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx create mode 100644 examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp create mode 100644 examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx create mode 100644 examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_typed}/src/SocketInputDiscoveryStreamReader.cxx (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_typed}/src/SocketInputDiscoveryStreamReader.hpp (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_typed}/src/SocketStreamReader.cxx (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_typed}/src/SocketStreamReader.hpp (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_typed}/src/SocketStreamWriter.cxx (100%) rename examples/routing_service/{udp_socket_adapter => udp_socket_adapter_typed}/src/SocketStreamWriter.hpp (93%) create mode 100644 examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx create mode 100644 examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp create mode 100644 examples/routing_service/udp_socket_adapter_typed/test/read_shape_from_socket.py create mode 100644 examples/routing_service/udp_socket_adapter_typed/test/send_shape_to_socket.py diff --git a/examples/routing_service/udp_socket_adapter/CMakeLists.txt b/examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt similarity index 100% rename from examples/routing_service/udp_socket_adapter/CMakeLists.txt rename to examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt diff --git a/examples/routing_service/udp_socket_adapter/README.md b/examples/routing_service/udp_socket_adapter_dynamic/README.md similarity index 100% rename from examples/routing_service/udp_socket_adapter/README.md rename to examples/routing_service/udp_socket_adapter_dynamic/README.md diff --git a/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml new file mode 100644 index 000000000..37eeff5ce --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml @@ -0,0 +1,99 @@ + + + + + + SocketAdapterCpp + SocketAdapter_create_adapter_plugin + + + + + + + + + + + 0 + + + + + + + + ON_DOMAIN_MATCH + ShapeType + Square + + + + + + receive_address + 127.0.0.1 + + + + receive_port + 10204 + + + + + + ON_ROUTE_MATCH + ShapeType + + Square + + + + + + ON_ROUTE_MATCH + + ShapeType + Square + + + + send_address + 127.0.0.1 + + + send_port + 0 + + + dest_address + 127.0.0.1 + + + dest_port + 10204 + + + + + + ON_DOMAIN_MATCH + + ShapeType + Square + + + + + + + + \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter/src/SocketAdapter.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx similarity index 100% rename from examples/routing_service/udp_socket_adapter/src/SocketAdapter.cxx rename to examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx diff --git a/examples/routing_service/udp_socket_adapter/src/SocketAdapter.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp similarity index 100% rename from examples/routing_service/udp_socket_adapter/src/SocketAdapter.hpp rename to examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp diff --git a/examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx similarity index 100% rename from examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx rename to examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx diff --git a/examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp similarity index 100% rename from examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp rename to examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx new file mode 100644 index 000000000..cd21be8a0 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx @@ -0,0 +1,88 @@ +/* + * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "SocketInputDiscoveryStreamReader.hpp" + +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketInputDiscoveryStreamReader::SocketInputDiscoveryStreamReader( + const PropertySet &, + StreamReaderListener *input_stream_discovery_listener) +{ + input_stream_discovery_listener_ = input_stream_discovery_listener; +} + +void SocketInputDiscoveryStreamReader::dispose( + const rti::routing::StreamInfo &stream_info) +{ + /** + * This guard is essential since the take() and return_loan() operations + * triggered by calling on_data_available() execute on an internal Routing + * Service thread. The custom dispose() operation doesn't run on that + * thread. Since the take() and return_loan() operations also need to access + * the data_samples_ list this protection is required. + */ + std::lock_guard guard(data_samples_mutex_); + + std::unique_ptr stream_info_disposed( + new StreamInfo( + stream_info.stream_name(), + stream_info.type_info().type_name())); + stream_info_disposed.get()->disposed(true); + + this->data_samples_.push_back(std::move(stream_info_disposed)); + input_stream_discovery_listener_->on_data_available(this); +} + +void SocketInputDiscoveryStreamReader::take( + std::vector &stream) +{ + /** + * This guard is essential since the take() and return_loan() operations + * triggered by calling on_data_available() execute on an internal Routing + * Service thread. The custom dispose() operation doesn't run on that + * thread. Since the take() and return_loan() operations also need to access + * the data_samples_ list this protection is required. + */ + std::lock_guard guard(data_samples_mutex_); + std::transform( + data_samples_.begin(), + data_samples_.end(), + std::back_inserter(stream), + [](const std::unique_ptr &element) { + return element.get(); + }); +} + +void SocketInputDiscoveryStreamReader::return_loan( + std::vector &stream) +{ + /** + * This guard is essential since the take() and return_loan() operations + * triggered by calling on_data_available() execute on an internal Routing + * Service thread. The custom dispose() operation doesn't run on that + * thread. Since the take() and return_loan() operations also need to access + * the data_samples_ list this protection is required. + */ + std::lock_guard guard(data_samples_mutex_); + + /** + * For discovery streams there will never be any outstanding return_loan(). + * Thus we can be sure that each take() will be followed by a call to + * return_loan(), before the next take() executes. + */ + this->data_samples_.erase( + data_samples_.begin(), + data_samples_.begin() + stream.size()); + stream.clear(); +} diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp new file mode 100644 index 000000000..b178118e1 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp @@ -0,0 +1,61 @@ +/* + * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETDISCOVERYSTREAMREADER_HPP +#define SOCKETDISCOVERYSTREAMREADER_HPP + +#include +#include + +#include +#include + +/** + * This class implements a DiscoveryStreamReader, a special kind of StreamReader + * that provide discovery information about the available streams and their + * types. + */ + +class SocketInputDiscoveryStreamReader + : public rti::routing::adapter::DiscoveryStreamReader { +public: + SocketInputDiscoveryStreamReader( + const rti::routing::PropertySet &, + rti::routing::adapter::StreamReaderListener + *input_stream_discovery_listener); + + void take(std::vector &) final; + + void return_loan(std::vector &) final; + + /** + * @brief Custom operation defined to indicate disposing off an + * when the SocketStreamReader has finished reading from the socket. + * The SocketInputDiscoveryStreamReader will then create a new + * discovery sample indicating that the stream has been disposed. + * This will cause the Routing Service to start tearing down the Routes + * associated with having the corresponding + * and . + * + * @param stream_info \b in. Reference to a StreamInfo object which should + * be used when creating a new StreamInfo sample with disposed set to true + */ + void dispose(const rti::routing::StreamInfo &stream_info); + +private: + std::mutex data_samples_mutex_; + std::vector> data_samples_; + rti::routing::adapter::StreamReaderListener + *input_stream_discovery_listener_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx new file mode 100644 index 000000000..d7d41b634 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx @@ -0,0 +1,154 @@ +/* + * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include +#include +#include +#include +#include + +#include +#include +#ifdef _WIN32 + #include + #pragma comment(lib, "ws2_32.lib") +#else + #include + #include + #include + #include + #include +#endif + +#include "SocketStreamReader.hpp" +#include +#include + +using namespace dds::core::xtypes; +using namespace rti::routing; +using namespace rti::routing::adapter; + +void SocketStreamReader::socket_reading_thread() +{ + while (!stop_thread_) { + /** + * Essential to protect against concurrent data access to + * buffer_ from the take() methods running on a different + * Routing Service thread. + */ + std::unique_lock lock(buffer_mutex_); + socket->receive_data( + received_buffer_, + &received_bytes_, + BUFFER_MAX_SIZE); + lock.unlock(); + + // Most likely received nothing or there was an error + // Not doing any error handling here + if (received_bytes_ <= 0) { + // Sleep for a small period of time to avoid busy waiting + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + + /** + * Here we notify Routing Service, that there is data available + * on the StreamReader, triggering a call to take(). + */ + reader_listener_->on_data_available(this); + } + + socket_connection_->dispose_discovery_stream(stream_info_); +} + +SocketStreamReader::SocketStreamReader( + SocketConnection *connection, + const StreamInfo &info, + const PropertySet &properties, + StreamReaderListener *listener) + : stop_thread_(false), + stream_info_(info.stream_name(), info.type_info().type_name()) +{ + socket_connection_ = connection; + reader_listener_ = listener; + adapter_type_ = + static_cast(info.type_info().type_representation()); + + // Parse the properties provided in the xml configuration file + for (const auto &property : properties) { + if (property.first == RECEIVE_ADDRESS_STRING) { + receive_address_ = property.second; + } else if (property.first == RECEIVE_PORT_STRING) { + receive_port_ = std::stoi(property.second); + } + } + + // If any of the mandatory properties is not specified, throw exception + if (receive_address_.size() == 0 || receive_port_ == 0) { + throw dds::core::IllegalOperationError( + "You must set receive_address and receive_port " + "in the RsSocketAdapter.xml file"); + } + + // Create the UDP socket to receive data + socket = std::unique_ptr( + new UdpSocket(receive_address_.c_str(), receive_port_)); + + // Start the receive thread for UDP data + socketreader_thread_ = + std::thread(&SocketStreamReader::socket_reading_thread, this); +} + +/** + * This is the Routing Service take(). It's called when the + * socket_receive_thread calls on_data_available() + */ +void SocketStreamReader::take( + std::vector &samples, + std::vector &infos) +{ + dds::core::xtypes::DynamicData deserialized_sample(*adapter_type_); + std::vector received_buffer = std::vector(received_buffer_, received_buffer_ + received_bytes_); + rti::core::xtypes::from_cdr_buffer(deserialized_sample, received_buffer); + std::cout << deserialized_sample << std::endl; + + samples.resize(1); + infos.resize(1); + + std::unique_ptr sample(new DynamicData(*adapter_type_)); + *sample = deserialized_sample; + samples[0] = sample.release(); + + return; +} + +void SocketStreamReader::return_loan( + std::vector &samples, + std::vector &infos) +{ + for (int i = 0; i < samples.size(); ++i) { + delete samples[i]; + delete infos[i]; + } + samples.clear(); + infos.clear(); +} + +void SocketStreamReader::shutdown_socket_reader_thread() +{ + stop_thread_ = true; + socketreader_thread_.join(); +} + +SocketStreamReader::~SocketStreamReader() +{ +} diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp new file mode 100644 index 000000000..e09af73e5 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp @@ -0,0 +1,76 @@ +/* + * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETSTREAMREADER_HPP +#define SOCKETSTREAMREADER_HPP + +#include +#include +#include + +#include "SocketConnection.hpp" +#include "UdpSocket.hpp" + +#include +#include + +#define BUFFER_MAX_SIZE 1024 +#define RECEIVE_ADDRESS_STRING "receive_address" +#define RECEIVE_PORT_STRING "receive_port" + +class SocketStreamReader : public rti::routing::adapter::DynamicDataStreamReader { +public: + SocketStreamReader( + SocketConnection *connection, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &, + rti::routing::adapter::StreamReaderListener *listener); + + void take( + std::vector &, + std::vector &) final; + + void return_loan( + std::vector &, + std::vector &) final; + + void shutdown_socket_reader_thread(); + + ~SocketStreamReader(); + +private: + /** + * @brief Function used by socketreader_thread_ to read samples from the + * socket. + */ + void socket_reading_thread(); + + SocketConnection *socket_connection_; + rti::routing::adapter::StreamReaderListener *reader_listener_; + + std::unique_ptr socket; + + std::thread socketreader_thread_; + bool stop_thread_; + + std::ifstream input_socket_stream_; + std::string receive_address_; + int receive_port_; + char received_buffer_[BUFFER_MAX_SIZE]; // Value that's high enough + int received_bytes_; + std::mutex buffer_mutex_; + + rti::routing::StreamInfo stream_info_; + dds::core::xtypes::DynamicType *adapter_type_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx new file mode 100644 index 000000000..10deb4726 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx @@ -0,0 +1,123 @@ +/* + * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include +#include +#include +#include +#include + +#include +#include +#ifdef _WIN32 + #include + #pragma comment(lib, "ws2_32.lib") +#else + #include + #include + #include + #include + #include +#endif + +#include +#include +#include +#include "SocketStreamWriter.hpp" +#include "SocketStreamReader.hpp" //use ShapeType from here +#include + +using namespace dds::core::xtypes; +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketStreamWriter::SocketStreamWriter( + SocketConnection *connection, + const StreamInfo &info, + const PropertySet &properties + ) + : stream_info_(info.stream_name(), info.type_info().type_name()) +{ + + socket_connection_ = connection; + + adapter_type_ = + static_cast(info.type_info().type_representation()); + + + // Parse the properties provided in the xml configuration file + for (const auto &property : properties) { + if (property.first == SEND_ADDRESS_STRING) { + send_address_ = property.second; + } + else if (property.first == SEND_PORT_STRING) { + send_port_ = std::stoi(property.second); + } + else if (property.first == DEST_ADDRESS_STRING) + { + dest_address_ = property.second; + } + else if (property.first == DEST_PORT_STRING) + { + dest_port_ = std::stoi(property.second); + } + } + + socket = std::unique_ptr(new UdpSocket( + send_address_.c_str(), + send_port_)); +} + +int SocketStreamWriter::write( + const std::vector &samples, + const std::vector &infos) +{ + size_t len = 0; + for (const auto sample : samples) { + std::vector buffer; + rti::core::xtypes::to_cdr_buffer(buffer, *sample); + // Send the serialized data + len = socket->send_data( + buffer.data(), + buffer.size(), + dest_address_.c_str(), + dest_port_); + } + + return len; +} + +int SocketStreamWriter::write( + const std::vector &samples, + const std::vector &infos, + const SelectorState &selector_state) +{ + int len; + len = write(samples, infos); + return len; +} + +void SocketStreamWriter::return_loan( + std::vector &samples, + std::vector &infos) +{ + for (int i = 0; i < samples.size(); ++i) { + delete samples[i]; + delete infos[i]; + } + samples.clear(); + infos.clear(); +} + +SocketStreamWriter::~SocketStreamWriter() +{ +} diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp new file mode 100644 index 000000000..a464f8aec --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp @@ -0,0 +1,77 @@ +/* + * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETSTREAMWRITER_HPP +#define SOCKETSTREAMWRITER_HPP + +#include +#include +#include +#include + +#include "SocketConnection.hpp" +#include "UdpSocket.hpp" + +#include +#include + +#define BUFFER_MAX_SIZE 1024 +#define SEND_ADDRESS_STRING "send_address" +#define SEND_PORT_STRING "send_port" +#define DEST_ADDRESS_STRING "dest_address" +#define DEST_PORT_STRING "dest_port" + +class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter { +public: + explicit SocketStreamWriter( + SocketConnection *connection, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet & + ); + + virtual int + write(const std::vector &, + const std::vector &) final; + + virtual int write( + const std::vector &, + const std::vector &, + const rti::routing::adapter::SelectorState &selector_state) final; + + virtual void return_loan( + std::vector &, + std::vector &) final; + + ~SocketStreamWriter(); + + +private: + /** + * @brief Function used by socketreader_thread_ to read samples from the + * socket. + */ + + + SocketConnection *socket_connection_; + + std::unique_ptr socket; + + int send_port_; + int dest_port_; + + std::string send_address_; + std::string dest_address_; + rti::routing::StreamInfo stream_info_; + dds::core::xtypes::DynamicType *adapter_type_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx similarity index 100% rename from examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx rename to examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx diff --git a/examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp similarity index 100% rename from examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp rename to examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp diff --git a/examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py b/examples/routing_service/udp_socket_adapter_dynamic/test/read_shape_from_socket.py similarity index 100% rename from examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py rename to examples/routing_service/udp_socket_adapter_dynamic/test/read_shape_from_socket.py diff --git a/examples/routing_service/udp_socket_adapter/test/send_shape_to_socket.py b/examples/routing_service/udp_socket_adapter_dynamic/test/send_shape_to_socket.py similarity index 100% rename from examples/routing_service/udp_socket_adapter/test/send_shape_to_socket.py rename to examples/routing_service/udp_socket_adapter_dynamic/test/send_shape_to_socket.py diff --git a/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt b/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt new file mode 100644 index 000000000..914900e91 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt @@ -0,0 +1,58 @@ +# +# (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. +# +# RTI grants Licensee a license to use, modify, compile, and create derivative +# works of the Software. Licensee has the right to distribute object form +# only for use with RTI products. The Software is provided "as is", with no +# warranty of any type, including any warranty for fitness for any purpose. +# RTI is under no obligation to maintain or support the Software. RTI shall +# not be liable for any incidental or consequential damages arising out of the +# use or inability to use the software. +# +cmake_minimum_required(VERSION 3.11) +project(SocketAdapterCpp) + +# Find RTI Connext dependencies +list(APPEND CMAKE_MODULE_PATH + "${CMAKE_CURRENT_SOURCE_DIR}/../../../resources/cmake/Modules" +) +include(ConnextDdsConfigureCmakeUtils) +connextdds_configure_cmake_utils() + +find_package( + RTIConnextDDS "7.3.0" + REQUIRED + COMPONENTS + core + routing_service +) + +# It may not be necessary to include the hpp files +add_library(${PROJECT_NAME} + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketAdapter.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketAdapter.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketConnection.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketConnection.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketInputDiscoveryStreamReader.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketInputDiscoveryStreamReader.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.hpp" +) + +set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 11) +set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD_REQUIRED ON) + +target_link_libraries(${PROJECT_NAME} + RTIConnextDDS::routing_service_infrastructure + RTIConnextDDS::cpp2_api +) + +# To differentiate between debug and release builds +set_target_properties(${PROJECT_NAME} + PROPERTIES + DEBUG_POSTFIX "d" +) diff --git a/examples/routing_service/udp_socket_adapter_typed/README.md b/examples/routing_service/udp_socket_adapter_typed/README.md new file mode 100644 index 000000000..e69bfb04a --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/README.md @@ -0,0 +1,168 @@ +# Example Code: Routing Service C++11 Socket Adapter + +## Example Description + +This example shows how to implement a simple Routing Service Adapter plugin +in C++11 to receive data from a UDP socket using RTI Routing Service. + +The code in this directory provides the following components: + +- `src/SocketAdapter` implements the plugin that is loaded by *RTI Routing +Service*. It is responsible to create and delete connections. +- `src/SocketConnection` implements a connection. This component is +responsible of the creation and deletion of `StreamReaders`. +- `src/SocketInputDiscoveryStreamReader` implements the logic necessary to +propagate information about the discovered input streams (in this case +sockets) to the Routing Service. +- `src/SocketStreamReader` implements an `StreamReader` that reads sample +information from a UDP socket. +- `test/send_shape_to_socket.py` implements a simple tester to send shape +type data to a UDP socket. + +For more details, please refer to the *RTI Routing Service SDK* documentation. + +## Building C++ example + +In order to build this example, you need to define the variables +`CONNEXTDDS_DIR` and `CONNEXTDDS_ARCH`. You can do so by exporting them +manually, by sourcing the `rtisetenv` script for your architecture, or by +passing them to the `cmake` command as arguments: + +```bash +mkdir build +cd build +cmake -DCONNEXTDDS_DIR= \ # If not exported + -DCONNEXTDDS_ARCH= \ # If not exported + -DBUILD_SHARED_LIBS=ON|OFF \ # ON is preferred + -DCMAKE_BUILD_TYPE=Debug|Release .. +cmake --build . +cd .. +``` + +Example command for Windows: + +```bash +cmake .. -DCONNEXTDDS_DIR="%NDDSHOME%" -DCONNEXTDDS_ARCH=x64Win64VS2015 -DBUILD_SHARED_LIBS=ON -DCMAKE_BUILD_TYPE=Release -A x64 -G "Visual Studio 17 2022" +cd .. +``` + +**Note**: You do not need to define `CONNEXTDDS_ARCH` if you only have one +architecture target installed in your system. + +**Note**: When compiling on a Windows 64-bit machine you will need to add the +`-A x64` parameter to the call to CMake. + +**Note:** If you are using a multi-configuration generator, such as Visual +Studio Solutions, you can specify the configuration mode to build as follows: + +```bash +cmake --build . --config Release|Debug +``` + +Here is more information about generating +[Visual Studio Solutions for Windows using CMake](https://cmake.org/cmake/help/v3.16/generator/Visual%20Studio%2016%202019.html#platform-selection). + +**Note:** `BUILD_SHARED_LIBS` allows you to control if the generated library +for this example is a static or a dynamic shared library. The following +sections assume you are building a dynamic shared library. However, Routing +Service also supports static linking of adapters. To use this functionality +you would need to create an application that uses Routing Service as a library +component and statically links to this `SocketAdapter` library. + +### Cross-compilation + +When you need to cross-compile the example, the above +command will not work, the assigned compiler won't be the cross-compiler and +errors may happen when linking against the cross-compiled Connext binaries. +To fix this, you have to create a file with the architecture name and call +CMake with a specific flag called ``-DCMAKE_TOOLCHAIN_FILE``. +An example of the file to create with the toolchain settings (e.g. for an +ARM architectures): + +```cmake +set(CMAKE_SYSTEM_NAME Linux) +set(toolchain_path "/arm-bcm2708/gcc-linaro-arm-linux-gnueabihf-raspbian") +set(CMAKE_C_COMPILER "${toolchain_path}/bin/arm-linux-gnueabihf-gcc") +set(CMAKE_CXX_COMPILER "${toolchain_path}/bin/arm-linux-gnueabihf-g++") +``` + +Then you can call CMake like this: + +```bash +cmake -DCONNEXTDDS_DIR= -DCMAKE_TOOLCHAIN_FILE= + -DCONNEXTDDS_ARCH= .. +``` + +## Running C++ example + +To run the example, you just need to run the following commands from the top +level folder. This example has been written to allow easy experimentation with +the Shapes Demo shipped with *RTI Connext DDS* installer bundle. You will find +some hardcoded references to ShapeType and Square. If you wish to create a +real Routing Service adapter, you should modify the code and XML accordingly. + +There is 1 configuration (`-cfgName`) in the Routing Service XML file: + +- **SocketAdapterToDDS** - It reads data from a UDP socket using the +SocketAdapter and outputs it to DDS. You can visualize the ouptut by +subscribing to Squares in Shapes Demo or running: + +```bash + $NDDSHOME/bin/rtiddsspy -printSample +``` + +To run Routing Service, you will need first to set up your environment as +follows. + +Before running the RTI Routing Service, you need to specify where the +`SocketAdapterCpp` library is located as shown below: + +Linux: + +```bash +$export RTI_LD_LIBRARY_PATH=$NDDSHOME/lib/: +``` + +Windows: + +```bash +set PATH=%NDDSHOME%/lib/; +``` + +The SocketAdapterCpp library will be in the `./build` folder. + +```bash +# From the build/ directory +$NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS +``` + +Here is an output from a sample run: + +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS +RTI Routing Service 7.3.0 executing (with name SocketAdapterToSocketAdapter) +``` + +Now you'll need to send data to the UDP sockets. By default, Shapes are +expected on `127.0.0.1:10203`. You can change these default values on +`RsSocketAdapter.xml`. + +To run the Shape tester that mimics a legacy UDP socket sender, run: + +```bash +python3 test/send_shape_to_socket.py 127.0.0.1 10203 +``` + +You can now open a Shapes Demo instance on domain 0 and subscribe to Squares. +You should start receiving a red Square. + +## Requirements + +To run this example you will need: + +- RTI Connext Professional version 6.0.0 or higher. +- CMake version 3.10 or higher. +- A target platform with support for RTI Routing Service and C++11. +- Python3. diff --git a/examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml similarity index 100% rename from examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml rename to examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml diff --git a/examples/routing_service/udp_socket_adapter/Types.xml b/examples/routing_service/udp_socket_adapter_typed/Types.xml similarity index 100% rename from examples/routing_service/udp_socket_adapter/Types.xml rename to examples/routing_service/udp_socket_adapter_typed/Types.xml diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx new file mode 100644 index 000000000..a649473f0 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx @@ -0,0 +1,49 @@ +/* + * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "SocketAdapter.hpp" +#include "SocketConnection.hpp" + +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketAdapter::SocketAdapter(PropertySet &properties) +{ +} + +Connection *SocketAdapter::create_connection( + rti::routing::adapter::detail::StreamReaderListener + *input_stream_discovery_listener, + rti::routing::adapter::detail::StreamReaderListener + *output_stream_discovery_listener, + const PropertySet &properties) +{ + return new SocketConnection( + input_stream_discovery_listener, + output_stream_discovery_listener, + properties); +} + +void SocketAdapter::delete_connection(Connection *connection) +{ + /** + * Perform cleanup pertaining to the connection object here. + */ + delete connection; +} + +rti::config::LibraryVersion SocketAdapter::get_version() const +{ + return { 1, 0, 0, 'r' }; +} + +RTI_ADAPTER_PLUGIN_CREATE_FUNCTION_DEF(SocketAdapter) diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp new file mode 100644 index 000000000..aec7eee95 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp @@ -0,0 +1,52 @@ +/* + * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETADAPTER_HPP +#define SOCKETADAPTER_HPP + +#include +#include +#include +#include +#include + +/* + * This is the initialization of the RS adapter. For simplicity, this adapter + * only reads from UDP and writes to DDS, not the other way around + */ +class SocketAdapter : public rti::routing::adapter::AdapterPlugin { +public: + explicit SocketAdapter(rti::routing::PropertySet &); + + rti::routing::adapter::Connection *create_connection( + rti::routing::adapter::detail::StreamReaderListener *, + rti::routing::adapter::detail::StreamReaderListener *, + const rti::routing::PropertySet &) final; + + void delete_connection(rti::routing::adapter::Connection *connection) final; + + rti::config::LibraryVersion get_version() const; +}; + +/** + * This macro defines a C-linkage symbol that can be used as create function + * for plug-in registration through XML. + * + * The generated symbol has the name: + * + * \code + * SocketAdapterPlugin_create_adapter_plugin + * \endcode + */ +RTI_ADAPTER_PLUGIN_CREATE_FUNCTION_DECL(SocketAdapter) + +#endif diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx new file mode 100644 index 000000000..20f1cb2bd --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx @@ -0,0 +1,72 @@ +/* + * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "SocketConnection.hpp" +#include "SocketStreamReader.hpp" +#include "SocketStreamWriter.hpp" + +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketConnection::SocketConnection( + StreamReaderListener *input_stream_discovery_listener, + StreamReaderListener *output_stream_discovery_listener, + const PropertySet &properties) + : input_discovery_reader_( + properties, + input_stream_discovery_listener) {}; + +StreamReader *SocketConnection::create_stream_reader( + Session *session, + const StreamInfo &info, + const PropertySet &properties, + StreamReaderListener *listener) +{ + return new SocketStreamReader(this, info, properties, listener); +} + +StreamWriter *SocketConnection::create_stream_writer( + Session *session, + const StreamInfo &info, + const PropertySet &properties) +{ + return new SocketStreamWriter(this, info, properties); +} + +void SocketConnection::delete_stream_reader(StreamReader *reader) +{ + SocketStreamReader *socket_reader = + dynamic_cast(reader); + socket_reader->shutdown_socket_reader_thread(); + delete reader; +} +void SocketConnection::delete_stream_writer(StreamWriter *writer) +{ + SocketStreamWriter *socket_writer = dynamic_cast(writer); + delete writer; +} + +DiscoveryStreamReader *SocketConnection::input_stream_discovery_reader() +{ + return &input_discovery_reader_; +} + +DiscoveryStreamReader *SocketConnection::output_stream_discovery_reader() +{ + return nullptr; +} + +void SocketConnection::dispose_discovery_stream( + const rti::routing::StreamInfo &stream_info) +{ + input_discovery_reader_.dispose(stream_info); +} diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp new file mode 100644 index 000000000..e1171cf3a --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp @@ -0,0 +1,72 @@ +/* + * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETCONNECTION_HPP +#define SOCKETCONNECTION_HPP + +#include +#include + + #include "SocketInputDiscoveryStreamReader.hpp" + +/* + * This class creates the RS Connection, which is an access point to our + * example data domain (a UDP socket) + */ +class SocketConnection : public rti::routing::adapter::Connection { +public: + SocketConnection( + rti::routing::adapter::StreamReaderListener + *input_stream_discovery_listener, + rti::routing::adapter::StreamReaderListener + *output_stream_discovery_listener, + const rti::routing::PropertySet &properties); + + rti::routing::adapter::StreamReader *create_stream_reader( + rti::routing::adapter::Session *session, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &properties, + rti::routing::adapter::StreamReaderListener *listener) final; + + rti::routing::adapter::StreamWriter *create_stream_writer( + rti::routing::adapter::Session *session, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &properties) final; + + void delete_stream_reader( + rti::routing::adapter::StreamReader *reader) final; + + void delete_stream_writer( + rti::routing::adapter::StreamWriter *writer) final; + + rti::routing::adapter::DiscoveryStreamReader * + input_stream_discovery_reader() final; + + rti::routing::adapter::DiscoveryStreamReader * + output_stream_discovery_reader() final; + + /** + * @brief This function is called by the SocketStreamReader to indicate + * that it's time to dispose the route. The dispose set by the + * SocketInputDiscoveryStreamReader starts the chain of cleanup procedure. + * + * @param stream_info \b in. Reference to a StreamInfo object which should + * be used when creating a new StreamInfo sample with disposed set to true + */ + void dispose_discovery_stream( + const rti::routing::StreamInfo &stream_info); + + private: + SocketInputDiscoveryStreamReader input_discovery_reader_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx similarity index 100% rename from examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.cxx rename to examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx diff --git a/examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp similarity index 100% rename from examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.hpp rename to examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp diff --git a/examples/routing_service/udp_socket_adapter/src/SocketStreamReader.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx similarity index 100% rename from examples/routing_service/udp_socket_adapter/src/SocketStreamReader.cxx rename to examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx diff --git a/examples/routing_service/udp_socket_adapter/src/SocketStreamReader.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp similarity index 100% rename from examples/routing_service/udp_socket_adapter/src/SocketStreamReader.hpp rename to examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp diff --git a/examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx similarity index 100% rename from examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.cxx rename to examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx diff --git a/examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp similarity index 93% rename from examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.hpp rename to examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp index 25166aceb..0095a97a1 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketStreamWriter.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp @@ -73,13 +73,6 @@ class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter rti::routing::StreamInfo stream_info_; dds::core::xtypes::DynamicType *adapter_type_; - struct doNothing { - RTI_INT32 CountUp; - RTI_INT32 CountDown; - RTI_INT32 Pause; - //dds::core::optionalObjectId; - RTI_UINT32 ObjectId; - }; struct ShapeType { int x; int y; diff --git a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx new file mode 100644 index 000000000..164468c8c --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx @@ -0,0 +1,111 @@ +/* + * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "UdpSocket.hpp" + +#include + + +UdpSocket::UdpSocket(const char *ip, int port) +{ +#ifdef _WIN32 + WSADATA wsaData; + if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { + throw dds::core::IllegalOperationError("WSAStartup failed"); + } +#endif + + // Socket initialization + init_socket(); + memset(&server_addr, 0, sizeof(server_addr)); + + // Using non-blocking sockets for easier thread management +#ifdef _WIN32 + unsigned long nonBlocking = 1; + if (ioctlsocket(sockfd, FIONBIO, &nonBlocking) != 0) { + std::cerr << "Error setting socket to non-blocking\n"; + closesocket(sockfd); + WSACleanup(); + throw dds::core::IllegalOperationError("ioctlsocket failed"); + } +#else + fcntl(sockfd, F_SETFL, O_NONBLOCK); +#endif + + // Bind the socket + bind_socket(ip, port); +} + +UdpSocket::~UdpSocket() +{ +#ifdef _WIN32 + closesocket(sockfd); + WSACleanup(); +#else + close(sockfd); +#endif +} + +void UdpSocket::init_socket() +{ + if ((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + throw dds::core::IllegalOperationError("Socket creation failed"); + } +} + +void UdpSocket::bind_socket(const char *ip, int port) +{ + server_addr.sin_family = AF_INET; + inet_pton(AF_INET, ip, &(server_addr.sin_addr)); + server_addr.sin_port = htons(port); + + if (bind(sockfd, + (const struct sockaddr *) &server_addr, + sizeof(server_addr)) + == -1) { + throw dds::core::IllegalOperationError("Bind failed"); + } +} + +void UdpSocket::receive_data( + char *received_buffer, + int *received_bytes, + int size_of_original_buffer) +{ + socklen_t len = sizeof(server_addr); + + socklen_t client_addr_len = sizeof(client_addr); + + /** Receive data.Since it's non-blocking, it will return right away most + * of the times + */ + *received_bytes = recvfrom( + sockfd, + received_buffer, + size_of_original_buffer, + 0, + (struct sockaddr *) &client_addr, + &client_addr_len); + + return; +} + +int UdpSocket::send_data(char* tx_buffer, int tx_length, const char* destAddr, int destPort) +{ + sockaddr_in dest_addr; + dest_addr.sin_family = AF_INET; + dest_addr.sin_port = htons(destPort); + dest_addr.sin_addr.s_addr = inet_addr(destAddr); + + size_t length = sendto(sockfd, tx_buffer, tx_length, 0, (struct sockaddr*)&dest_addr, sizeof(dest_addr)); + return (int)length; +} \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp new file mode 100644 index 000000000..ba03954ed --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp @@ -0,0 +1,61 @@ +/* + * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef UDPSOCKETUTILS_HPP +#define UDPSOCKETUTILS_HPP + +#ifdef _WIN32 + #include + #include +#else + #include + #include + #include + #include + #include + #include +#endif + +#include + +#ifdef _WIN32 + #pragma comment(lib, "ws2_32.lib") +#endif + +class UdpSocket { +public: + UdpSocket(const char* ip, int port); + ~UdpSocket(); + void receive_data( + char* received_buffer, + int* received_bytes, + int size_of_original_buffer); + + int send_data( + char* tx_buffer, + int tx_length, + const char* destAddr, + int destPort); + +private: +#ifdef _WIN32 + SOCKET sockfd; +#else + int sockfd; +#endif + struct sockaddr_in server_addr, client_addr; + + void init_socket(); + void bind_socket(const char* ip, int port); +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_typed/test/read_shape_from_socket.py b/examples/routing_service/udp_socket_adapter_typed/test/read_shape_from_socket.py new file mode 100644 index 000000000..f29f078e5 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/test/read_shape_from_socket.py @@ -0,0 +1,47 @@ +import socket +import sys +import struct + +# Receive and parse data from the socket +def receive_data(sock): + # Receive data from the socket + data, addr = sock.recvfrom(1024) # Buffer size of 1024 bytes + # Unpack the data as 3 int types (x, y, shapesize) + x, y, size = struct.unpack("iii", data) + return x, y, size, addr + + +def main(): + if len(sys.argv) != 2: + print( + "Usage: python3 read_shape_from_socket.py " + ) + return + + # Input arguments + port = int(sys.argv[1]) + + samples_received = 0 + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + try: + # Bind the socket to listen on the specified port + sock.bind(('', port)) + print(f"Listening for shape data on port {port}...") + + while True: + # Receive data from the socket + x, y, size, addr = receive_data(sock) + + # Print the received shape data + samples_received += 1 + print(f"Sample #{samples_received} from {addr}: x={x}, y={y}, size={size}") + + except KeyboardInterrupt: + print("\nExiting...") + except Exception as e: + print(f"Error: {e}") + + +if __name__ == "__main__": + main() diff --git a/examples/routing_service/udp_socket_adapter_typed/test/send_shape_to_socket.py b/examples/routing_service/udp_socket_adapter_typed/test/send_shape_to_socket.py new file mode 100644 index 000000000..9e000b61c --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/test/send_shape_to_socket.py @@ -0,0 +1,85 @@ +import socket +import sys +import time +import struct +import random + +shapesize = 30 +step_size = 5 + +lower_bound = shapesize // 2 +upper_bound_x = 235 - (shapesize // 2) +upper_bound_y = 265 - (shapesize // 2) + +# The following methods are used to simulate a moving shape +def initialize_position(): + x = random.randint(lower_bound, upper_bound_x) + y = random.randint(lower_bound, upper_bound_y) + direction_x = random.choice([-1, 1]) + direction_y = random.choice([-1, 1]) + return x, y, direction_x, direction_y + + +def move_position(x, y, direction_x, direction_y): + x += direction_x * step_size + y += direction_y * step_size + + if x <= lower_bound or x >= upper_bound_x: + direction_x = -direction_x + x = max(lower_bound, min(x, upper_bound_x)) + + if y <= lower_bound or y >= upper_bound_y: + direction_y = -direction_y + y = max(lower_bound, min(y, upper_bound_y)) + + return x, y, direction_x, direction_y + + +# Send the data to the socket +def send_data(sock, server_address, port, x, y): + # The data is "packed" as 3 int types + data = struct.pack("iii", x, y, shapesize) + sock.sendto(data, (server_address, port)) + + +def main(): + if len(sys.argv) != 3: + print( + "Usage: python3 send_shape_to_socket_tester.py " + ) + return + + # Input arguments + server_address = sys.argv[1] + port = int(sys.argv[2]) + + x, y, direction_x, direction_y = initialize_position() + + samples_sent = 0 + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + try: + print("Sending 16 sample/s...") + + while True: + # Figure out the next position of the shape + x, y, direction_x, direction_y = move_position( + x, y, direction_x, direction_y + ) + # Send the data to the socket + send_data(sock, server_address, port, x, y) + + # Simple counter to print every 100 messages + samples_sent += 1 + if samples_sent % 100 == 0: + print(f"Samples sent: {samples_sent}") + + # Same frequency as Shapes Demo + time.sleep(1 / 16) + + except KeyboardInterrupt: + print("\nExiting...") + + +if __name__ == "__main__": + main() From 962e46ba460d32775e02f666c6e983a1f94b9610 Mon Sep 17 00:00:00 2001 From: Ignacio Utrilla Date: Tue, 8 Jul 2025 13:15:22 +0200 Subject: [PATCH 4/9] Edited Readme files and minor adjustments --- .../test/read_shape_from_socket.py | 0 .../udp_socket_adapter_dynamic/README.md | 83 +++++++++++++----- .../RsSocketAdapter.xml | 58 +++++++------ .../src/SocketConnection.cxx | 9 +- .../src/SocketStreamReader.cxx | 50 ++++------- .../test/read_shape_from_socket.py | 71 ---------------- .../test/send_shape_to_socket.py | 85 ------------------- .../udp_socket_adapter_typed/README.md | 68 ++++++++++++++- .../RsSocketAdapter.xml | 22 +++-- 9 files changed, 201 insertions(+), 245 deletions(-) create mode 100644 examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py delete mode 100644 examples/routing_service/udp_socket_adapter_dynamic/test/read_shape_from_socket.py delete mode 100644 examples/routing_service/udp_socket_adapter_dynamic/test/send_shape_to_socket.py diff --git a/examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py b/examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/routing_service/udp_socket_adapter_dynamic/README.md b/examples/routing_service/udp_socket_adapter_dynamic/README.md index e69bfb04a..73a5b00c2 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/README.md +++ b/examples/routing_service/udp_socket_adapter_dynamic/README.md @@ -5,6 +5,9 @@ This example shows how to implement a simple Routing Service Adapter plugin in C++11 to receive data from a UDP socket using RTI Routing Service. +This examples uses dynamic data API and there is no need to know the data type +information beforehand. + The code in this directory provides the following components: - `src/SocketAdapter` implements the plugin that is loaded by *RTI Routing @@ -16,8 +19,9 @@ propagate information about the discovered input streams (in this case sockets) to the Routing Service. - `src/SocketStreamReader` implements an `StreamReader` that reads sample information from a UDP socket. -- `test/send_shape_to_socket.py` implements a simple tester to send shape -type data to a UDP socket. +- `src/SocketStreamWriter` implements an `StreamWriter` that sends sample +information to a UDP socket. + For more details, please refer to the *RTI Routing Service SDK* documentation. @@ -97,19 +101,17 @@ cmake -DCONNEXTDDS_DIR= -DCMAKE_TOOLCHAIN_FILE= - + - + - 0 + 1 - - + - ON_DOMAIN_MATCH - ShapeType - Square - + IMMEDIATE + PingType + PingStream + @@ -40,27 +38,38 @@ receive_port - 10204 + 10203 - ON_ROUTE_MATCH - ShapeType - - Square + ON_DOMAIN_MATCH + PingType + PingTopic + + + + + + + + + + + 0 + + + + ON_ROUTE_MATCH - - ShapeType - Square + PingType + PingStream @@ -79,16 +88,15 @@ dest_port - 10204 + 10203 ON_DOMAIN_MATCH - - ShapeType - Square + PingType + PingTopic diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx index 20f1cb2bd..ca7b36199 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx @@ -13,6 +13,7 @@ #include "SocketConnection.hpp" #include "SocketStreamReader.hpp" #include "SocketStreamWriter.hpp" +#include "SocketStreamWriter.hpp" using namespace rti::routing; using namespace rti::routing::adapter; @@ -35,10 +36,10 @@ StreamReader *SocketConnection::create_stream_reader( } StreamWriter *SocketConnection::create_stream_writer( - Session *session, - const StreamInfo &info, - const PropertySet &properties) -{ + Session *session, + const StreamInfo &info, + const PropertySet &properties) + { return new SocketStreamWriter(this, info, properties); } diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx index d7d41b634..4c6038268 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -12,27 +12,28 @@ #include #include +#include #include #include -#include -#include #include +#include #ifdef _WIN32 #include #pragma comment(lib, "ws2_32.lib") #else - #include - #include - #include #include + #include + #include + #include #include #endif - #include "SocketStreamReader.hpp" #include +#include #include + using namespace dds::core::xtypes; using namespace rti::routing; using namespace rti::routing::adapter; @@ -40,30 +41,26 @@ using namespace rti::routing::adapter; void SocketStreamReader::socket_reading_thread() { while (!stop_thread_) { - /** - * Essential to protect against concurrent data access to - * buffer_ from the take() methods running on a different - * Routing Service thread. - */ - std::unique_lock lock(buffer_mutex_); + int received_bytes = 0; socket->receive_data( received_buffer_, - &received_bytes_, + &received_bytes, BUFFER_MAX_SIZE); - lock.unlock(); // Most likely received nothing or there was an error // Not doing any error handling here - if (received_bytes_ <= 0) { + if (received_bytes <= 0) { // Sleep for a small period of time to avoid busy waiting std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } - /** * Here we notify Routing Service, that there is data available * on the StreamReader, triggering a call to take(). */ + + received_bytes_ = received_bytes; + reader_listener_->on_data_available(this); } @@ -80,8 +77,7 @@ SocketStreamReader::SocketStreamReader( { socket_connection_ = connection; reader_listener_ = listener; - adapter_type_ = - static_cast(info.type_info().type_representation()); + adapter_type_ = static_cast(info.type_info().type_representation()); // Parse the properties provided in the xml configuration file for (const auto &property : properties) { @@ -92,26 +88,13 @@ SocketStreamReader::SocketStreamReader( } } - // If any of the mandatory properties is not specified, throw exception - if (receive_address_.size() == 0 || receive_port_ == 0) { - throw dds::core::IllegalOperationError( - "You must set receive_address and receive_port " - "in the RsSocketAdapter.xml file"); - } - - // Create the UDP socket to receive data socket = std::unique_ptr( new UdpSocket(receive_address_.c_str(), receive_port_)); - // Start the receive thread for UDP data socketreader_thread_ = std::thread(&SocketStreamReader::socket_reading_thread, this); } -/** - * This is the Routing Service take(). It's called when the - * socket_receive_thread calls on_data_available() - */ void SocketStreamReader::take( std::vector &samples, std::vector &infos) @@ -119,8 +102,7 @@ void SocketStreamReader::take( dds::core::xtypes::DynamicData deserialized_sample(*adapter_type_); std::vector received_buffer = std::vector(received_buffer_, received_buffer_ + received_bytes_); rti::core::xtypes::from_cdr_buffer(deserialized_sample, received_buffer); - std::cout << deserialized_sample << std::endl; - + samples.resize(1); infos.resize(1); diff --git a/examples/routing_service/udp_socket_adapter_dynamic/test/read_shape_from_socket.py b/examples/routing_service/udp_socket_adapter_dynamic/test/read_shape_from_socket.py deleted file mode 100644 index 211b79456..000000000 --- a/examples/routing_service/udp_socket_adapter_dynamic/test/read_shape_from_socket.py +++ /dev/null @@ -1,71 +0,0 @@ -import socket -import sys -import struct - -# The following methods are used to simulate a moving shape (not used in this version) -def initialize_position(): - x = random.randint(lower_bound, upper_bound_x) - y = random.randint(lower_bound, upper_bound_y) - direction_x = random.choice([-1, 1]) - direction_y = random.choice([-1, 1]) - return x, y, direction_x, direction_y - - -def move_position(x, y, direction_x, direction_y): - x += direction_x * step_size - y += direction_y * step_size - - if x <= lower_bound or x >= upper_bound_x: - direction_x = -direction_x - x = max(lower_bound, min(x, upper_bound_x)) - - if y <= lower_bound or y >= upper_bound_y: - direction_y = -direction_y - y = max(lower_bound, min(y, upper_bound_y)) - - return x, y, direction_x, direction_y - - -# Receive and parse data from the socket -def receive_data(sock): - # Receive data from the socket - data, addr = sock.recvfrom(1024) # Buffer size of 1024 bytes - # Unpack the data as 3 int types (x, y, shapesize) - x, y, size = struct.unpack("iii", data) - return x, y, size, addr - - -def main(): - if len(sys.argv) != 2: - print( - "Usage: python3 read_shape_from_socket.py " - ) - return - - # Input arguments - port = int(sys.argv[1]) - - samples_received = 0 - - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: - try: - # Bind the socket to listen on the specified port - sock.bind(('', port)) - print(f"Listening for shape data on port {port}...") - - while True: - # Receive data from the socket - x, y, size, addr = receive_data(sock) - - # Print the received shape data - samples_received += 1 - print(f"Sample #{samples_received} from {addr}: x={x}, y={y}, size={size}") - - except KeyboardInterrupt: - print("\nExiting...") - except Exception as e: - print(f"Error: {e}") - - -if __name__ == "__main__": - main() diff --git a/examples/routing_service/udp_socket_adapter_dynamic/test/send_shape_to_socket.py b/examples/routing_service/udp_socket_adapter_dynamic/test/send_shape_to_socket.py deleted file mode 100644 index 9e000b61c..000000000 --- a/examples/routing_service/udp_socket_adapter_dynamic/test/send_shape_to_socket.py +++ /dev/null @@ -1,85 +0,0 @@ -import socket -import sys -import time -import struct -import random - -shapesize = 30 -step_size = 5 - -lower_bound = shapesize // 2 -upper_bound_x = 235 - (shapesize // 2) -upper_bound_y = 265 - (shapesize // 2) - -# The following methods are used to simulate a moving shape -def initialize_position(): - x = random.randint(lower_bound, upper_bound_x) - y = random.randint(lower_bound, upper_bound_y) - direction_x = random.choice([-1, 1]) - direction_y = random.choice([-1, 1]) - return x, y, direction_x, direction_y - - -def move_position(x, y, direction_x, direction_y): - x += direction_x * step_size - y += direction_y * step_size - - if x <= lower_bound or x >= upper_bound_x: - direction_x = -direction_x - x = max(lower_bound, min(x, upper_bound_x)) - - if y <= lower_bound or y >= upper_bound_y: - direction_y = -direction_y - y = max(lower_bound, min(y, upper_bound_y)) - - return x, y, direction_x, direction_y - - -# Send the data to the socket -def send_data(sock, server_address, port, x, y): - # The data is "packed" as 3 int types - data = struct.pack("iii", x, y, shapesize) - sock.sendto(data, (server_address, port)) - - -def main(): - if len(sys.argv) != 3: - print( - "Usage: python3 send_shape_to_socket_tester.py " - ) - return - - # Input arguments - server_address = sys.argv[1] - port = int(sys.argv[2]) - - x, y, direction_x, direction_y = initialize_position() - - samples_sent = 0 - - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: - try: - print("Sending 16 sample/s...") - - while True: - # Figure out the next position of the shape - x, y, direction_x, direction_y = move_position( - x, y, direction_x, direction_y - ) - # Send the data to the socket - send_data(sock, server_address, port, x, y) - - # Simple counter to print every 100 messages - samples_sent += 1 - if samples_sent % 100 == 0: - print(f"Samples sent: {samples_sent}") - - # Same frequency as Shapes Demo - time.sleep(1 / 16) - - except KeyboardInterrupt: - print("\nExiting...") - - -if __name__ == "__main__": - main() diff --git a/examples/routing_service/udp_socket_adapter_typed/README.md b/examples/routing_service/udp_socket_adapter_typed/README.md index e69bfb04a..3a9b5be0d 100644 --- a/examples/routing_service/udp_socket_adapter_typed/README.md +++ b/examples/routing_service/udp_socket_adapter_typed/README.md @@ -5,6 +5,8 @@ This example shows how to implement a simple Routing Service Adapter plugin in C++11 to receive data from a UDP socket using RTI Routing Service. +This example requires including a Types.xml file with the data type information. + The code in this directory provides the following components: - `src/SocketAdapter` implements the plugin that is loaded by *RTI Routing @@ -16,8 +18,12 @@ propagate information about the discovered input streams (in this case sockets) to the Routing Service. - `src/SocketStreamReader` implements an `StreamReader` that reads sample information from a UDP socket. +- `src/SocketStreamWriter` implements an `StreamWriter` that sends sample +information to a UDP socket. - `test/send_shape_to_socket.py` implements a simple tester to send shape type data to a UDP socket. +- `test/receive_shape_from_socket.py` implements a simple tester to receive shape +type data from a UDP socket. For more details, please refer to the *RTI Routing Service SDK* documentation. @@ -101,7 +107,7 @@ the Shapes Demo shipped with *RTI Connext DDS* installer bundle. You will find some hardcoded references to ShapeType and Square. If you wish to create a real Routing Service adapter, you should modify the code and XML accordingly. -There is 1 configuration (`-cfgName`) in the Routing Service XML file: +There are 2 configurations (`-cfgName`) in the Routing Service XML file: - **SocketAdapterToDDS** - It reads data from a UDP socket using the SocketAdapter and outputs it to DDS. You can visualize the ouptut by @@ -111,6 +117,8 @@ subscribing to Squares in Shapes Demo or running: $NDDSHOME/bin/rtiddsspy -printSample ``` +- **DDSToSocketAdapter** - It sends data from DDS to a UDP socket. + To run Routing Service, you will need first to set up your environment as follows. @@ -139,7 +147,7 @@ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdap Here is an output from a sample run: ```bash -$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter/build/ +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ $ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS RTI Routing Service 7.3.0 executing (with name SocketAdapterToSocketAdapter) @@ -158,6 +166,62 @@ python3 test/send_shape_to_socket.py 127.0.0.1 10203 You can now open a Shapes Demo instance on domain 0 and subscribe to Squares. You should start receiving a red Square. +Alternatively, you can also execute the test to send UDP sockets from DDS data: + +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName DDSToSocketAdapter +RTI Routing Service 7.3.0 executing (with configuration=DDSToSocketAdapter) +``` + +And to test the UDP socket content run: +```bash +python3 test/read_shape_from_socket.py 10203 +``` + + +## Running a data-diode example + +You can configure a data-diode scenario by using two Routing Services instances; +- One using **DDSToSocketAdapter** configuration to publish DDS data over a one direction UDP socket +- The other using **SocketAdapterToDDS** configuration to convert back to DDS samples + + ┌───────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ + │ Connext │ │ Routing │ ┌────────────────┐ │ Routing │ │ Connext │ + │ App ├─►│ Service ├───►│ UDP DATA DIODE ├──►│ Service ├─►│ App │ + │ │ │ DDS TO UDP │ └────────────────┘ │ UDP TO DDS │ │ │ + └───────────┘ └─────────────┘ └─────────────┘ └───────────┘ + +To run this example in a local machine: +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS +RTI Routing Service 7.3.0 executing (with configuration=SocketAdapterToDDS) +``` +And in a different terminal: +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName DDSToSocketAdapter +RTI Routing Service 7.3.0 executing (with configuration=DDSToSocketAdapter) +``` + +Using the default configuration from RsSocketAdapter.xml, you need to publish Squares +on domain id 0 and subscribe to Squares on domain id 1 using rtishapes demo: + +```bash +$ $NDDSHOME/bin/rtishapesdemo -domainId 0 +``` + +```bash +$ $NDDSHOME/bin/rtishapesdemo -domainId 1 +``` + +You should be able to see red squares in the subscriber application. +Keep in mind the shape color has been overwritten in the adapter for showcasing purposes. + ## Requirements To run this example you will need: diff --git a/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml index 8089e584c..2afe2220d 100644 --- a/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml +++ b/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml @@ -31,9 +31,8 @@ - 0 + 1 - @@ -53,7 +52,7 @@ receive_port - 10204 + 10203 @@ -73,10 +72,24 @@ + + + + + + + + + + + 0 + + + + ON_ROUTE_MATCH - ShapeType Square ShapeType Square From 231d22442f6711ebb69a41bb1c94429921f7d261 Mon Sep 17 00:00:00 2001 From: Ignacio Utrilla Date: Tue, 8 Jul 2025 13:29:40 +0200 Subject: [PATCH 5/9] Minor md formatting issues and new explanations --- .../routing_service/udp_socket_adapter_dynamic/README.md | 6 +++--- examples/routing_service/udp_socket_adapter_typed/README.md | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/routing_service/udp_socket_adapter_dynamic/README.md b/examples/routing_service/udp_socket_adapter_dynamic/README.md index 73a5b00c2..08ce065b8 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/README.md +++ b/examples/routing_service/udp_socket_adapter_dynamic/README.md @@ -1,4 +1,4 @@ -# Example Code: Routing Service C++11 Socket Adapter +# Example Code: Routing Service C++11 Socket Adapter using Dynamic Data ## Example Description @@ -170,13 +170,13 @@ RTI Routing Service 7.3.0 executing (with configuration=DDSToSocketAdapter) You can configure a data-diode scenario by using two Routing Services instances; - One using **DDSToSocketAdapter** configuration to publish DDS data over a one direction UDP socket - The other using **SocketAdapterToDDS** configuration to convert back to DDS samples - +``` ┌───────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ │ Connext │ │ Routing │ ┌────────────────┐ │ Routing │ │ Connext │ │ App ├─►│ Service ├───►│ UDP DATA DIODE ├──►│ Service ├─►│ App │ │ │ │ DDS TO UDP │ └────────────────┘ │ UDP TO DDS │ │ │ └───────────┘ └─────────────┘ └─────────────┘ └───────────┘ - +``` To run this example in a local machine: ```bash $export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_dynamic/build/ diff --git a/examples/routing_service/udp_socket_adapter_typed/README.md b/examples/routing_service/udp_socket_adapter_typed/README.md index 3a9b5be0d..e84e8988a 100644 --- a/examples/routing_service/udp_socket_adapter_typed/README.md +++ b/examples/routing_service/udp_socket_adapter_typed/README.md @@ -186,13 +186,13 @@ python3 test/read_shape_from_socket.py 10203 You can configure a data-diode scenario by using two Routing Services instances; - One using **DDSToSocketAdapter** configuration to publish DDS data over a one direction UDP socket - The other using **SocketAdapterToDDS** configuration to convert back to DDS samples - +``` ┌───────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ │ Connext │ │ Routing │ ┌────────────────┐ │ Routing │ │ Connext │ │ App ├─►│ Service ├───►│ UDP DATA DIODE ├──►│ Service ├─►│ App │ │ │ │ DDS TO UDP │ └────────────────┘ │ UDP TO DDS │ │ │ └───────────┘ └─────────────┘ └─────────────┘ └───────────┘ - +``` To run this example in a local machine: ```bash $export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ @@ -218,7 +218,7 @@ $ $NDDSHOME/bin/rtishapesdemo -domainId 0 ```bash $ $NDDSHOME/bin/rtishapesdemo -domainId 1 ``` - +Then start publishing and subscribing to the Square topic. You should be able to see red squares in the subscriber application. Keep in mind the shape color has been overwritten in the adapter for showcasing purposes. From 68d34368065ff7c1ff40e9ad89701bd22a7a48d0 Mon Sep 17 00:00:00 2001 From: Ignacio Utrilla Date: Tue, 8 Jul 2025 13:58:15 +0200 Subject: [PATCH 6/9] Removed stray file --- .../udp_socket_adapter/test/read_shape_from_socket.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py diff --git a/examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py b/examples/routing_service/udp_socket_adapter/test/read_shape_from_socket.py deleted file mode 100644 index e69de29bb..000000000 From a1e47f090c259c3b5e32a0a67c4a7e24b5417683 Mon Sep 17 00:00:00 2001 From: Ignacio Utrilla Date: Tue, 12 Aug 2025 11:06:28 +0200 Subject: [PATCH 7/9] Review changes. Minor changes applied --- .../udp_socket_adapter_dynamic/CMakeLists.txt | 2 +- .../udp_socket_adapter_dynamic/README.md | 12 ++++++------ .../udp_socket_adapter_dynamic/RsSocketAdapter.xml | 8 ++++---- .../udp_socket_adapter_dynamic/src/SocketAdapter.cxx | 2 +- .../udp_socket_adapter_dynamic/src/SocketAdapter.hpp | 2 +- .../src/SocketConnection.cxx | 6 ++++-- .../src/SocketConnection.hpp | 2 +- .../src/SocketInputDiscoveryStreamReader.cxx | 2 +- .../src/SocketInputDiscoveryStreamReader.hpp | 2 +- .../src/SocketStreamReader.cxx | 11 ++++------- .../src/SocketStreamReader.hpp | 2 +- .../src/SocketStreamWriter.cxx | 2 +- .../src/SocketStreamWriter.hpp | 9 ++++----- .../udp_socket_adapter_dynamic/src/UdpSocket.cxx | 2 +- .../udp_socket_adapter_dynamic/src/UdpSocket.hpp | 2 +- .../udp_socket_adapter_typed/CMakeLists.txt | 2 +- .../udp_socket_adapter_typed/RsSocketAdapter.xml | 8 ++++---- .../udp_socket_adapter_typed/src/SocketAdapter.cxx | 2 +- .../udp_socket_adapter_typed/src/SocketAdapter.hpp | 2 +- .../src/SocketConnection.cxx | 2 +- .../src/SocketConnection.hpp | 2 +- .../src/SocketInputDiscoveryStreamReader.cxx | 2 +- .../src/SocketInputDiscoveryStreamReader.hpp | 2 +- .../src/SocketStreamReader.cxx | 2 +- .../src/SocketStreamReader.hpp | 2 +- .../src/SocketStreamWriter.cxx | 2 +- .../src/SocketStreamWriter.hpp | 2 +- .../udp_socket_adapter_typed/src/UdpSocket.cxx | 2 +- .../udp_socket_adapter_typed/src/UdpSocket.hpp | 2 +- 29 files changed, 49 insertions(+), 51 deletions(-) diff --git a/examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt b/examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt index 914900e91..0fc9b611b 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt +++ b/examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt @@ -1,5 +1,5 @@ # -# (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. +# (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. # # RTI grants Licensee a license to use, modify, compile, and create derivative # works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_dynamic/README.md b/examples/routing_service/udp_socket_adapter_dynamic/README.md index 08ce065b8..9ab2ece60 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/README.md +++ b/examples/routing_service/udp_socket_adapter_dynamic/README.md @@ -11,21 +11,21 @@ information beforehand. The code in this directory provides the following components: - `src/SocketAdapter` implements the plugin that is loaded by *RTI Routing -Service*. It is responsible to create and delete connections. +Service*. It responsible for creating and deleting connections. - `src/SocketConnection` implements a connection. This component is -responsible of the creation and deletion of `StreamReaders`. +responsible for the creation and deletion of `StreamReaders`. - `src/SocketInputDiscoveryStreamReader` implements the logic necessary to propagate information about the discovered input streams (in this case sockets) to the Routing Service. -- `src/SocketStreamReader` implements an `StreamReader` that reads sample +- `src/SocketStreamReader` implements a `StreamReader` that reads sample information from a UDP socket. -- `src/SocketStreamWriter` implements an `StreamWriter` that sends sample +- `src/SocketStreamWriter` implements a `StreamWriter` that sends sample information to a UDP socket. For more details, please refer to the *RTI Routing Service SDK* documentation. -## Building C++ example +## Building the C++ example In order to build this example, you need to define the variables `CONNEXTDDS_DIR` and `CONNEXTDDS_ARCH`. You can do so by exporting them @@ -97,7 +97,7 @@ cmake -DCONNEXTDDS_DIR= -DCMAKE_TOOLCHAIN_FILE= .. ``` -## Running C++ example +## Running the C++ example To run the example, you just need to run the following commands from the top level folder. This example has been written to allow easy experimentation with diff --git a/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml index bccfa7a9a..246b3ff4e 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml +++ b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml @@ -14,12 +14,12 @@ - + 1 - + @@ -57,12 +57,12 @@ - + 0 - + diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx index a649473f0..a73226590 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp index aec7eee95..0042adae7 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx index ca7b36199..2b7eb7434 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -50,9 +50,11 @@ void SocketConnection::delete_stream_reader(StreamReader *reader) socket_reader->shutdown_socket_reader_thread(); delete reader; } + void SocketConnection::delete_stream_writer(StreamWriter *writer) { - SocketStreamWriter *socket_writer = dynamic_cast(writer); + SocketStreamWriter *socket_writer = + dynamic_cast(writer); delete writer; } diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp index e1171cf3a..ae49f6c0c 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx index cd21be8a0..a55323e11 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx @@ -1,5 +1,5 @@ /* - * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp index b178118e1..7f2bdb373 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp @@ -1,5 +1,5 @@ /* - * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx index 4c6038268..2b183d1cf 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx @@ -1,5 +1,5 @@ /* - * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -54,12 +54,8 @@ void SocketStreamReader::socket_reading_thread() std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } - /** - * Here we notify Routing Service, that there is data available - * on the StreamReader, triggering a call to take(). - */ - received_bytes_ = received_bytes; + received_bytes_ = received_bytes; reader_listener_->on_data_available(this); } @@ -100,7 +96,8 @@ void SocketStreamReader::take( std::vector &infos) { dds::core::xtypes::DynamicData deserialized_sample(*adapter_type_); - std::vector received_buffer = std::vector(received_buffer_, received_buffer_ + received_bytes_); + std::vector received_buffer = std::vector( + received_buffer_, received_buffer_ + received_bytes_); rti::core::xtypes::from_cdr_buffer(deserialized_sample, received_buffer); samples.resize(1); diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp index e09af73e5..1975737ed 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx index 10deb4726..cbc542d8d 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx @@ -1,5 +1,5 @@ /* - * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp index a464f8aec..c1934094b 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp @@ -1,5 +1,5 @@ /* - * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -59,18 +59,17 @@ class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter * @brief Function used by socketreader_thread_ to read samples from the * socket. */ - SocketConnection *socket_connection_; std::unique_ptr socket; int send_port_; - int dest_port_; + int dest_port_; std::string send_address_; - std::string dest_address_; - rti::routing::StreamInfo stream_info_; + std::string dest_address_; + rti::routing::StreamInfo stream_info_; dds::core::xtypes::DynamicType *adapter_type_; }; diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx index 164468c8c..8da702a3d 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp index ba03954ed..7fdb59d06 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt b/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt index 914900e91..0fc9b611b 100644 --- a/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt +++ b/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt @@ -1,5 +1,5 @@ # -# (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. +# (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. # # RTI grants Licensee a license to use, modify, compile, and create derivative # works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml index 2afe2220d..15cfeb391 100644 --- a/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml +++ b/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml @@ -28,12 +28,12 @@ - + 1 - + @@ -79,12 +79,12 @@ - + 0 - + diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx index a649473f0..a73226590 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp index aec7eee95..0042adae7 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx index 20f1cb2bd..c8f24a6fd 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp index e1171cf3a..ae49f6c0c 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx index 79c3dddad..ecfd9c68e 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp index 48c75f337..5103762d9 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx index dbc2675c3..efec348ad 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp index 5ab1e65a6..b3adf6d48 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx index 585c0680b..90d66d14f 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx @@ -1,5 +1,5 @@ /* - * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp index 0095a97a1..053a92810 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp @@ -1,5 +1,5 @@ /* - * (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx index 164468c8c..8da702a3d 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp index ba03954ed..7fdb59d06 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form From 49489ea7b91c7d38dc427cb34f877daf21c1ad38 Mon Sep 17 00:00:00 2001 From: Ignacio Utrilla Date: Thu, 14 Aug 2025 12:28:03 +0200 Subject: [PATCH 8/9] Review changes. Stream writer and reader improvements --- .../RsSocketAdapter.xml | 8 ++--- .../src/SocketStreamReader.cxx | 23 +++++++++++--- .../src/SocketStreamReader.hpp | 5 ++-- .../src/SocketStreamWriter.cxx | 30 +++---------------- .../src/SocketStreamWriter.hpp | 13 ++------ 5 files changed, 32 insertions(+), 47 deletions(-) diff --git a/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml index 246b3ff4e..942f400db 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml +++ b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml @@ -14,12 +14,12 @@ - + 1 - + @@ -57,12 +57,12 @@ - + 0 - + diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx index 2b183d1cf..ee7b2cada 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -55,7 +56,10 @@ void SocketStreamReader::socket_reading_thread() continue; } - received_bytes_ = received_bytes; + { + std::lock_guard lock(buffer_mutex_); + received_buffers_.emplace(received_buffer_, received_buffer_ + received_bytes); + } reader_listener_->on_data_available(this); } @@ -95,10 +99,21 @@ void SocketStreamReader::take( std::vector &samples, std::vector &infos) { + std::vector buffer; + { + std::unique_lock lock(buffer_mutex_); + if (received_buffers_.empty()) { + // No data available + samples.clear(); + infos.clear(); + return; + } + buffer = std::move(received_buffers_.front()); + received_buffers_.pop(); + } + dds::core::xtypes::DynamicData deserialized_sample(*adapter_type_); - std::vector received_buffer = std::vector( - received_buffer_, received_buffer_ + received_bytes_); - rti::core::xtypes::from_cdr_buffer(deserialized_sample, received_buffer); + rti::core::xtypes::from_cdr_buffer(deserialized_sample, buffer); samples.resize(1); infos.resize(1); diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp index 1975737ed..0dc717fff 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include "SocketConnection.hpp" #include "UdpSocket.hpp" @@ -65,8 +66,8 @@ class SocketStreamReader : public rti::routing::adapter::DynamicDataStreamReader std::ifstream input_socket_stream_; std::string receive_address_; int receive_port_; - char received_buffer_[BUFFER_MAX_SIZE]; // Value that's high enough - int received_bytes_; + char received_buffer_[BUFFER_MAX_SIZE]; + std::queue> received_buffers_; std::mutex buffer_mutex_; rti::routing::StreamInfo stream_info_; diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx index cbc542d8d..8d08b0665 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx @@ -83,12 +83,12 @@ int SocketStreamWriter::write( { size_t len = 0; for (const auto sample : samples) { - std::vector buffer; - rti::core::xtypes::to_cdr_buffer(buffer, *sample); + serialization_buffer_.clear(); + rti::core::xtypes::to_cdr_buffer(serialization_buffer_, *sample); // Send the serialized data len = socket->send_data( - buffer.data(), - buffer.size(), + serialization_buffer_.data(), + serialization_buffer_.size(), dest_address_.c_str(), dest_port_); } @@ -96,28 +96,6 @@ int SocketStreamWriter::write( return len; } -int SocketStreamWriter::write( - const std::vector &samples, - const std::vector &infos, - const SelectorState &selector_state) -{ - int len; - len = write(samples, infos); - return len; -} - -void SocketStreamWriter::return_loan( - std::vector &samples, - std::vector &infos) -{ - for (int i = 0; i < samples.size(); ++i) { - delete samples[i]; - delete infos[i]; - } - samples.clear(); - infos.clear(); -} - SocketStreamWriter::~SocketStreamWriter() { } diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp index c1934094b..fabff566e 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp @@ -38,18 +38,9 @@ class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter const rti::routing::PropertySet & ); - virtual int - write(const std::vector &, - const std::vector &) final; - virtual int write( const std::vector &, - const std::vector &, - const rti::routing::adapter::SelectorState &selector_state) final; - - virtual void return_loan( - std::vector &, - std::vector &) final; + const std::vector &) final; ~SocketStreamWriter(); @@ -61,7 +52,7 @@ class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter */ SocketConnection *socket_connection_; - + std::vector serialization_buffer_; std::unique_ptr socket; int send_port_; From e25f691b852f934865824c371e47b4b65c53e83b Mon Sep 17 00:00:00 2001 From: Ignacio Utrilla Date: Thu, 14 Aug 2025 15:07:05 +0200 Subject: [PATCH 9/9] Review changes. Added descriptions to classes. MInor fixes --- .../RsSocketAdapter.xml | 2 -- .../src/SocketStreamReader.cxx | 6 +++--- .../src/SocketStreamReader.hpp | 13 +++++++++++++ .../src/SocketStreamWriter.hpp | 17 +++++++++++++---- .../src/UdpSocket.hpp | 14 ++++++++++++++ .../udp_socket_adapter_typed/src/UdpSocket.hpp | 14 ++++++++++++++ 6 files changed, 57 insertions(+), 9 deletions(-) diff --git a/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml index 942f400db..5a693012f 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml +++ b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml @@ -14,12 +14,10 @@ - 1 - diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx index ee7b2cada..b040e958b 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx @@ -99,7 +99,7 @@ void SocketStreamReader::take( std::vector &samples, std::vector &infos) { - std::vector buffer; + take_buffer_.clear(); { std::unique_lock lock(buffer_mutex_); if (received_buffers_.empty()) { @@ -108,12 +108,12 @@ void SocketStreamReader::take( infos.clear(); return; } - buffer = std::move(received_buffers_.front()); + take_buffer_ = std::move(received_buffers_.front()); received_buffers_.pop(); } dds::core::xtypes::DynamicData deserialized_sample(*adapter_type_); - rti::core::xtypes::from_cdr_buffer(deserialized_sample, buffer); + rti::core::xtypes::from_cdr_buffer(deserialized_sample, take_buffer_); samples.resize(1); infos.resize(1); diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp index 0dc717fff..21517a9ff 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp @@ -28,6 +28,18 @@ #define RECEIVE_ADDRESS_STRING "receive_address" #define RECEIVE_PORT_STRING "receive_port" +/** + * @brief StreamReader implementation for UDP socket input in RTI Routing Service. + * + * SocketStreamReader is a specific implementation of rti::routing::adapter::DynamicDataStreamReader + * that receives data from a UDP socket and makes it available to RTI Routing Service as DynamicData samples. + * + * This class manages a background thread to continuously read UDP packets from a specified address and port, + * buffering received data for consumption by the Routing Service. It supports thread-safe queuing of incoming + * data, loaning and returning DynamicData samples, and clean shutdown of the reading thread. + * + */ + class SocketStreamReader : public rti::routing::adapter::DynamicDataStreamReader { public: SocketStreamReader( @@ -69,6 +81,7 @@ class SocketStreamReader : public rti::routing::adapter::DynamicDataStreamReader char received_buffer_[BUFFER_MAX_SIZE]; std::queue> received_buffers_; std::mutex buffer_mutex_; + std::vector take_buffer_; rti::routing::StreamInfo stream_info_; dds::core::xtypes::DynamicType *adapter_type_; diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp index fabff566e..39fc32af8 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp @@ -30,6 +30,19 @@ #define DEST_ADDRESS_STRING "dest_address" #define DEST_PORT_STRING "dest_port" +/** + * @brief StreamWriter implementation for UDP socket output in RTI Routing Service. + * + * SocketStreamWriter is a specific implementation of rti::routing::adapter::DynamicDataStreamWriter + * that sends data to a UDP socket, making it available for external consumers outside DDS. + * + * This class is responsible for serializing DynamicData samples received from Routing Service + * and transmitting them as UDP packets to a specified destination address and port. + * It manages socket creation, serialization buffers, and the configuration of destination + * parameters via properties. + * + */ + class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter { public: explicit SocketStreamWriter( @@ -46,10 +59,6 @@ class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter private: - /** - * @brief Function used by socketreader_thread_ to read samples from the - * socket. - */ SocketConnection *socket_connection_; std::vector serialization_buffer_; diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp index 7fdb59d06..024429e1b 100644 --- a/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp @@ -31,6 +31,20 @@ #pragma comment(lib, "ws2_32.lib") #endif +/** + * @brief Utility class for UDP socket communication in the RTI Routing Service UDP Socket Adapter. + * + * UdpSocket provides a lightweight abstraction for UDP socket communication, + * supporting both Windows and POSIX systems. It is designed to be used by the Routing Service + * UDP socket adapter to send and receive raw UDP packets as part of data bridging between + * external UDP sources and DDS. + * + * The class handles socket creation, binding to a specified IP address and port, and + * ensures non-blocking operation for efficient integration with multi-threaded applications. + * It provides methods for receiving data from any UDP client and for sending data to a + * specified destination address and port. + */ + class UdpSocket { public: UdpSocket(const char* ip, int port); diff --git a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp index 7fdb59d06..024429e1b 100644 --- a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp @@ -31,6 +31,20 @@ #pragma comment(lib, "ws2_32.lib") #endif +/** + * @brief Utility class for UDP socket communication in the RTI Routing Service UDP Socket Adapter. + * + * UdpSocket provides a lightweight abstraction for UDP socket communication, + * supporting both Windows and POSIX systems. It is designed to be used by the Routing Service + * UDP socket adapter to send and receive raw UDP packets as part of data bridging between + * external UDP sources and DDS. + * + * The class handles socket creation, binding to a specified IP address and port, and + * ensures non-blocking operation for efficient integration with multi-threaded applications. + * It provides methods for receiving data from any UDP client and for sending data to a + * specified destination address and port. + */ + class UdpSocket { public: UdpSocket(const char* ip, int port);