diff --git a/examples/connext_dds/property_qos/py/README.md b/examples/connext_dds/property_qos/py/README.md new file mode 100644 index 000000000..277e66594 --- /dev/null +++ b/examples/connext_dds/property_qos/py/README.md @@ -0,0 +1,78 @@ +# Example code: Using Property QoS + +If you haven't used the RTI Connext Python API before, first check the +[Getting Started Guide](https://community.rti.com/static/documentation/connext-dds/7.0.0/doc/manuals/connext_dds_professional/getting_started_guide/index.html). + +## Building the Example :wrench: + +First use **rtiddsgen** to generate the python type file from +`numbers.idl`: + +```sh +/bin/rtiddsgen -language python -platform universal numbers.idl +``` + +Where `` refers to your RTI Connext installation. + +You will see messages that look like: + +```plaintext +INFO com.rti.ndds.nddsgen.Main Running rtiddsgen version 4.3.0, please wait ... +WARN com.rti.ndds.nddsgen.emitters.FileEmitter File exists and will not be overwritten : some/path/numbers.py +INFO com.rti.ndds.nddsgen.Main Done +``` + +This is normal and is only informing you that some of the files that **rtiddsgen** +can generate were already available in the repository. + +## Running the Example + +In two separate command prompt windows for the publisher and subscriber. Run the +following commands from the example directory (this is necessary to ensure the +application loads the QoS defined in USER_QOS_PROFILES.xml): + +```sh +python numbers_publisher.py +python numbers_subscriber.py +``` + +The applications accept up to two arguments: + +1. The ``. Both applications must use the same domain id in order to + communicate. The default is 0. + +2. How long the examples should run, measured in samples. The default is + infinite. + +While generating the output below, we used values that would capture the most +interesting behavior. + +```plaintext +Publisher Output +================ +Ok, send_socket_buffer_size....modified +Ok, recv_socket_buffer_size....modified +New UDPv4 send socket buffer size is: 65507 +New UDPv4 receive socket buffer size is: 65507 +Writing numbers, count 0 +Writing numbers, count 1 +Writing numbers, count 2 +Writing numbers, count 3 + +Subscriber Output +================= +Ok, send_socket_buffer_size....modified +Ok, recv_socket_buffer_size....modified +New UDPv4 send socket buffer size is: 65507 +New UDPv4 receive socket buffer size is: 65507 +numbers subscriber sleeping for 4 sec... + number: 500 + halfNumber: 250.000000 +numbers subscriber sleeping for 4 sec... + number: 250 + halfNumber: 125.000000 +numbers subscriber sleeping for 4 sec... + number: 125 + halfNumber: 62.500000 +numbers subscriber sleeping for 4 sec... +``` \ No newline at end of file diff --git a/examples/connext_dds/property_qos/py/USER_QOS_PROFILES.xml b/examples/connext_dds/property_qos/py/USER_QOS_PROFILES.xml new file mode 100644 index 000000000..c9aeeea7c --- /dev/null +++ b/examples/connext_dds/property_qos/py/USER_QOS_PROFILES.xml @@ -0,0 +1,102 @@ + + + + + + + + + + + + + RELIABLE_RELIABILITY_QOS + + 60 + + + + + KEEP_ALL_HISTORY_QOS + + + + + 50 + 50 + + + + + + + + + + RELIABLE_RELIABILITY_QOS + + + + KEEP_ALL_HISTORY_QOS + + + + + + + + RTI Property QoS Example + + + + + + dds.transport.UDPv4.builtin.recv_socket_buffer_size + 65507 + + + dds.transport.UDPv4.builtin.send_socket_buffer_size + 65507 + + + + + + + + + diff --git a/examples/connext_dds/property_qos/py/numbers.idl b/examples/connext_dds/property_qos/py/numbers.idl new file mode 100644 index 000000000..38b997f93 --- /dev/null +++ b/examples/connext_dds/property_qos/py/numbers.idl @@ -0,0 +1,16 @@ +/* + * (c) 2013-2019 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +struct numbers { + int32 number; + float halfNumber; +}; diff --git a/examples/connext_dds/property_qos/py/numbers_publisher.py b/examples/connext_dds/property_qos/py/numbers_publisher.py new file mode 100644 index 000000000..6f2839d6f --- /dev/null +++ b/examples/connext_dds/property_qos/py/numbers_publisher.py @@ -0,0 +1,119 @@ + +# (c) Copyright, Real-Time Innovations, 2022. All rights reserved. +# RTI grants Licensee a license to use, modify, compile, and create derivative +# works of the software solely for use with RTI Connext DDS. Licensee may +# redistribute copies of the software provided that all such copies are subject +# to this license. The software is provided "as is", with no warranty of any +# type, including any warranty for fitness for any purpose. RTI is under no +# obligation to maintain or support the software. RTI shall not be liable for +# any incidental or consequential damages arising out of the use or inability +# to use the software. + +import time +import sys +import rti.connextdds as dds + +from numbers import numbers + +class numbersPublisher: + + NEW_SOCKET_BUFFER_SIZE_STRING = "65507" + + @staticmethod + def create_participant(domain_id: int): + + participant_qos = dds.DomainParticipantQos() + + # Set the send socket buffer size + participant_qos.property["dds.transport.UDPv4.builtin.send_socket_buffer_size"] = numbersPublisher.NEW_SOCKET_BUFFER_SIZE_STRING + + # Set the recv socket buffer size + participant_qos.property["dds.transport.UDPv4.builtin.recv_socket_buffer_size"] = numbersPublisher.NEW_SOCKET_BUFFER_SIZE_STRING + + # Create the participant + participant = dds.DomainParticipant(domain_id, participant_qos) + + return participant + + + @staticmethod + def check_participant(participant: dds.DomainParticipant): + participant_qos = participant.qos + send_socket = participant_qos.property.get("dds.transport.UDPv4.builtin.send_socket_buffer_size") + recv_socket = participant_qos.property.try_get("dds.transport.UDPv4.builtin.recv_socket_buffer_size") + + if send_socket is None: + print("Error impossible to find send_socket_buffer_size") + return False + elif send_socket != numbersPublisher.NEW_SOCKET_BUFFER_SIZE_STRING: + print("Error, send_socket_buffer_size...not modified") + return False + else: + print("Ok, send_socket_buffer_size....modified") + + if recv_socket is None: + print("Error impossible to find recv_socket_buffer_size") + return False + elif recv_socket != numbersPublisher.NEW_SOCKET_BUFFER_SIZE_STRING: + print("Error, recv_socket_buffer_size...not modified") + return False + else: + print("Ok, recv_socket_buffer_size....modified") + + print(f"New UDPv4 send socket buffer size is: {send_socket}") + print(f"New UDPv4 receive socket buffer size is: {recv_socket}") + + return True + + @staticmethod + def run_publisher(domain_id: int, sample_count: int): + + # A DomainParticipant allows an application to begin communicating in + # a DDS domain. Typically there is one DomainParticipant per application. + # DomainParticipant QoS is configured in USER_QOS_PROFILES.xml + participant = dds.DomainParticipant(domain_id) + + # If you want to change the DomainParticipant's QoS programmatically + # rather than using the XML file, you will need to add the following lines + # to your code and comment out the create_participant call above. + + # In this case, we set the transport settings in the XML by default, but + # in the create_participant call, we set up the transport + # properties using the Properties QoS in code. + + # participant = numbersPublisher.create_participant(domain_id) + + if(numbersPublisher.check_participant(participant) != True): + print("check_participant error") + + # A Topic has a name and a datatype. + topic = dds.Topic(participant, "Example numbers", numbers) + + # This DataWriter will write data on Topic "Example numbers" + # DataWriter QoS is configured in USER_QOS_PROFILES.xml + writer = dds.DataWriter(participant.implicit_publisher, topic) + sample = numbers() + + sample.number = 1000 + sample.halfNumber = sample.number / 2 + + for count in range(sample_count): + # Catch control-C interrupt + try: + print(f"Writing numbers, count {count}") + writer.write(sample) + + sample.number = int(sample.halfNumber) + sample.halfNumber = sample.number / 2 + + time.sleep(1) + except KeyboardInterrupt: + break + + print("preparing to shut down...") + + +if __name__ == "__main__": + numbersPublisher.run_publisher( + domain_id=0, + sample_count=sys.maxsize) diff --git a/examples/connext_dds/property_qos/py/numbers_subscriber.py b/examples/connext_dds/property_qos/py/numbers_subscriber.py new file mode 100644 index 000000000..941614036 --- /dev/null +++ b/examples/connext_dds/property_qos/py/numbers_subscriber.py @@ -0,0 +1,145 @@ + +# (c) Copyright, Real-Time Innovations, 2022. All rights reserved. +# RTI grants Licensee a license to use, modify, compile, and create derivative +# works of the software solely for use with RTI Connext DDS. Licensee may +# redistribute copies of the software provided that all such copies are subject +# to this license. The software is provided "as is", with no warranty of any +# type, including any warranty for fitness for any purpose. RTI is under no +# obligation to maintain or support the software. RTI shall not be liable for +# any incidental or consequential damages arising out of the use or inability +# to use the software. + +import time +import sys +import rti.connextdds as dds +from numbers import numbers + +class numbersSubscriber: + + NEW_SOCKET_BUFFER_SIZE_STRING = "65507" + + @staticmethod + def create_participant(domain_id: int): + + participant_qos = dds.DomainParticipantQos() + + # Set the send socket buffer size + participant_qos.property["dds.transport.UDPv4.builtin.send_socket_buffer_size"] = numbersSubscriber.NEW_SOCKET_BUFFER_SIZE_STRING + + # Set the recv socket buffer size + participant_qos.property["dds.transport.UDPv4.builtin.recv_socket_buffer_size"] = numbersSubscriber.NEW_SOCKET_BUFFER_SIZE_STRING + + # Create the participant + participant = dds.DomainParticipant(domain_id, participant_qos) + + return participant + + + @staticmethod + def check_participant(participant: dds.DomainParticipant): + participant_qos = participant.qos + send_socket = participant_qos.property.get("dds.transport.UDPv4.builtin.send_socket_buffer_size") + recv_socket = participant_qos.property.try_get("dds.transport.UDPv4.builtin.recv_socket_buffer_size") + + if send_socket is None: + print("Error impossible to find send_socket_buffer_size") + return False + elif send_socket != numbersSubscriber.NEW_SOCKET_BUFFER_SIZE_STRING: + print("Error, send_socket_buffer_size...not modified") + return False + else: + print("Ok, send_socket_buffer_size....modified") + + if recv_socket is None: + print("Error impossible to find recv_socket_buffer_size") + return False + elif recv_socket != numbersSubscriber.NEW_SOCKET_BUFFER_SIZE_STRING: + print("Error, recv_socket_buffer_size...not modified") + return False + else: + print("Ok, recv_socket_buffer_size....modified") + + print(f"New UDPv4 send socket buffer size is: {send_socket}") + print(f"New UDPv4 receive socket buffer size is: {recv_socket}") + + return True + + @staticmethod + def process_data(reader): + # take_data() returns copies of all the data samples in the reader + # and removes them. To also take the SampleInfo meta-data, use take(). + # To not remove the data from the reader, use read_data() or read(). + samples = reader.take_data() + for sample in samples: + print(f"Received: {sample}") + + return len(samples) + + @staticmethod + def run_subscriber(domain_id: int, sample_count: int): + + # A DomainParticipant allows an application to begin communicating in + # a DDS domain. Typically there is one DomainParticipant per application. + # DomainParticipant QoS is configured in USER_QOS_PROFILES.xml + participant = dds.DomainParticipant(domain_id) + + # If you want to change the DomainParticipant's QoS programmatically + # rather than using the XML file, you will need to add the following lines + # to your code and comment out the create_participant call above. + + # In this case, we set the transport settings in the XML by default, but + # in the create_participant call, we set up the transport + # properties using the Properties QoS in code. + + # participant = numbersPublisher.create_participant(domain_id) + + if(numbersSubscriber.check_participant(participant) != True): + print("check_participant error") + + # A Topic has a name and a datatype. + topic = dds.Topic(participant, "Example numbers", numbers) + + # This DataReader reads data on Topic "Example numbers". + # DataReader QoS is configured in USER_QOS_PROFILES.xml + reader = dds.DataReader(participant.implicit_subscriber, topic) + + # Initialize samples_read to zero + samples_read = 0 + + # Associate a handler with the status condition. This will run when the + # condition is triggered, in the context of the dispatch call (see below) + # condition argument is not used + def condition_handler(_): + nonlocal samples_read + nonlocal reader + samples_read += numbersSubscriber.process_data(reader) + + # Obtain the DataReader's Status Condition + status_condition = dds.StatusCondition(reader) + + # Enable the "data available" status and set the handler. + status_condition.enabled_statuses = dds.StatusMask.DATA_AVAILABLE + status_condition.set_handler(condition_handler) + + # Create a WaitSet and attach the StatusCondition + waitset = dds.WaitSet() + waitset += status_condition + + while samples_read < sample_count: + # Catch control-C interrupt + try: + # Dispatch will call the handlers associated to the WaitSet conditions + # when they activate + print("Hello World subscriber sleeping for 1 seconds...") + + waitset.dispatch(dds.Duration(1)) # Wait up to 1s each time + except KeyboardInterrupt: + break + + print("preparing to shut down...") + + +if __name__ == "__main__": + numbersSubscriber.run_subscriber( + domain_id=0, + sample_count=sys.maxsize)