diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc index 1973fb8963d..8f29b6d1d9e 100644 --- a/gr-zeromq/lib/base_impl.cc +++ b/gr-zeromq/lib/base_impl.cc @@ -85,7 +85,12 @@ base_impl::base_impl(int type, } } -base_impl::~base_impl() {} +base_impl::~base_impl() +{ + d_context.shutdown(); + d_socket.close(); + d_context.close(); +} std::string base_impl::last_endpoint() { diff --git a/gr-zeromq/lib/pub_msg_sink_impl.cc b/gr-zeromq/lib/pub_msg_sink_impl.cc index 2f944bee684..6879fad2d02 100644 --- a/gr-zeromq/lib/pub_msg_sink_impl.cc +++ b/gr-zeromq/lib/pub_msg_sink_impl.cc @@ -55,7 +55,12 @@ pub_msg_sink_impl::pub_msg_sink_impl(char* address, int timeout, bool bind) set_msg_handler(pmt::mp("in"), [this](pmt::pmt_t msg) { this->handler(msg); }); } -pub_msg_sink_impl::~pub_msg_sink_impl() {} +pub_msg_sink_impl::~pub_msg_sink_impl() +{ + d_context.shutdown(); + d_socket.close(); + d_context.close(); +} void pub_msg_sink_impl::handler(pmt::pmt_t msg) { diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc index 3e550d24665..0a3ca2233ee 100644 --- a/gr-zeromq/lib/pull_msg_source_impl.cc +++ b/gr-zeromq/lib/pull_msg_source_impl.cc @@ -59,7 +59,12 @@ pull_msg_source_impl::pull_msg_source_impl(char* address, int timeout, bool bind message_port_register_out(d_port); } -pull_msg_source_impl::~pull_msg_source_impl() {} +pull_msg_source_impl::~pull_msg_source_impl() +{ + d_context.shutdown(); + d_socket.close(); + d_context.close(); +} bool pull_msg_source_impl::start() { diff --git a/gr-zeromq/lib/push_msg_sink_impl.cc b/gr-zeromq/lib/push_msg_sink_impl.cc index e86b8d266dd..9aa02f4faf2 100644 --- a/gr-zeromq/lib/push_msg_sink_impl.cc +++ b/gr-zeromq/lib/push_msg_sink_impl.cc @@ -56,7 +56,12 @@ push_msg_sink_impl::push_msg_sink_impl(char* address, int timeout, bool bind) set_msg_handler(pmt::mp("in"), [this](pmt::pmt_t msg) { this->handler(msg); }); } -push_msg_sink_impl::~push_msg_sink_impl() {} +push_msg_sink_impl::~push_msg_sink_impl() +{ + d_context.shutdown(); + d_socket.close(); + d_context.close(); +} void push_msg_sink_impl::handler(pmt::pmt_t msg) { diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc index b2e6a86bb51..480c0312242 100644 --- a/gr-zeromq/lib/rep_msg_sink_impl.cc +++ b/gr-zeromq/lib/rep_msg_sink_impl.cc @@ -58,7 +58,12 @@ rep_msg_sink_impl::rep_msg_sink_impl(char* address, int timeout, bool bind) message_port_register_in(d_port); } -rep_msg_sink_impl::~rep_msg_sink_impl() {} +rep_msg_sink_impl::~rep_msg_sink_impl() +{ + d_context.shutdown(); + d_socket.close(); + d_context.close(); +} bool rep_msg_sink_impl::start() { @@ -79,7 +84,7 @@ void rep_msg_sink_impl::readloop() while (!d_finished) { // while we have data, wait for query... - while (!empty_p(d_port)) { + while (!empty_p(d_port) && !d_finished) { // wait for query... zmq::pollitem_t items[] = { diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc index b6ef5df6832..5020b5729a1 100644 --- a/gr-zeromq/lib/req_msg_source_impl.cc +++ b/gr-zeromq/lib/req_msg_source_impl.cc @@ -59,7 +59,12 @@ req_msg_source_impl::req_msg_source_impl(char* address, int timeout, bool bind) message_port_register_out(d_port); } -req_msg_source_impl::~req_msg_source_impl() {} +req_msg_source_impl::~req_msg_source_impl() +{ + d_context.shutdown(); + d_socket.close(); + d_context.close(); +} bool req_msg_source_impl::start() { @@ -131,6 +136,9 @@ void req_msg_source_impl::readloop() std::this_thread::sleep_for(100us); } } + d_context.shutdown(); + d_socket.close(); + d_context.close(); } } /* namespace zeromq */ diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc index 5a03d1a67a9..46e3b108471 100644 --- a/gr-zeromq/lib/sub_msg_source_impl.cc +++ b/gr-zeromq/lib/sub_msg_source_impl.cc @@ -58,7 +58,12 @@ sub_msg_source_impl::sub_msg_source_impl(char* address, int timeout, bool bind) message_port_register_out(d_port); } -sub_msg_source_impl::~sub_msg_source_impl() {} +sub_msg_source_impl::~sub_msg_source_impl() +{ + d_context.shutdown(); + d_socket.close(); + d_context.close(); +} bool sub_msg_source_impl::start() {