Skip to content

Commit

Permalink
zmq: Improve the teardown of ZMQ Sources/Sinks
Browse files Browse the repository at this point in the history
Tweaking gr-zmq to be more friendly to our use cases.

Update *.cc

Format change.

Signed-off-by: Bill Clark <[email protected]>
  • Loading branch information
SaikWolf authored and willcode committed Sep 28, 2023
1 parent e2f1477 commit 535a1e3
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 8 deletions.
7 changes: 6 additions & 1 deletion gr-zeromq/lib/base_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ base_impl::base_impl(int type,
}
}

base_impl::~base_impl() {}
base_impl::~base_impl()
{
d_context.shutdown();
d_socket.close();
d_context.close();
}

std::string base_impl::last_endpoint()
{
Expand Down
7 changes: 6 additions & 1 deletion gr-zeromq/lib/pub_msg_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ pub_msg_sink_impl::pub_msg_sink_impl(char* address, int timeout, bool bind)
set_msg_handler(pmt::mp("in"), [this](pmt::pmt_t msg) { this->handler(msg); });
}

pub_msg_sink_impl::~pub_msg_sink_impl() {}
pub_msg_sink_impl::~pub_msg_sink_impl()
{
d_context.shutdown();
d_socket.close();
d_context.close();
}

void pub_msg_sink_impl::handler(pmt::pmt_t msg)
{
Expand Down
7 changes: 6 additions & 1 deletion gr-zeromq/lib/pull_msg_source_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ pull_msg_source_impl::pull_msg_source_impl(char* address, int timeout, bool bind
message_port_register_out(d_port);
}

pull_msg_source_impl::~pull_msg_source_impl() {}
pull_msg_source_impl::~pull_msg_source_impl()
{
d_context.shutdown();
d_socket.close();
d_context.close();
}

bool pull_msg_source_impl::start()
{
Expand Down
7 changes: 6 additions & 1 deletion gr-zeromq/lib/push_msg_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@ push_msg_sink_impl::push_msg_sink_impl(char* address, int timeout, bool bind)
set_msg_handler(pmt::mp("in"), [this](pmt::pmt_t msg) { this->handler(msg); });
}

push_msg_sink_impl::~push_msg_sink_impl() {}
push_msg_sink_impl::~push_msg_sink_impl()
{
d_context.shutdown();
d_socket.close();
d_context.close();
}

void push_msg_sink_impl::handler(pmt::pmt_t msg)
{
Expand Down
9 changes: 7 additions & 2 deletions gr-zeromq/lib/rep_msg_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ rep_msg_sink_impl::rep_msg_sink_impl(char* address, int timeout, bool bind)
message_port_register_in(d_port);
}

rep_msg_sink_impl::~rep_msg_sink_impl() {}
rep_msg_sink_impl::~rep_msg_sink_impl()
{
d_context.shutdown();
d_socket.close();
d_context.close();
}

bool rep_msg_sink_impl::start()
{
Expand All @@ -79,7 +84,7 @@ void rep_msg_sink_impl::readloop()
while (!d_finished) {

// while we have data, wait for query...
while (!empty_p(d_port)) {
while (!empty_p(d_port) && !d_finished) {

// wait for query...
zmq::pollitem_t items[] = {
Expand Down
10 changes: 9 additions & 1 deletion gr-zeromq/lib/req_msg_source_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ req_msg_source_impl::req_msg_source_impl(char* address, int timeout, bool bind)
message_port_register_out(d_port);
}

req_msg_source_impl::~req_msg_source_impl() {}
req_msg_source_impl::~req_msg_source_impl()
{
d_context.shutdown();
d_socket.close();
d_context.close();
}

bool req_msg_source_impl::start()
{
Expand Down Expand Up @@ -131,6 +136,9 @@ void req_msg_source_impl::readloop()
std::this_thread::sleep_for(100us);
}
}
d_context.shutdown();
d_socket.close();
d_context.close();
}

} /* namespace zeromq */
Expand Down
7 changes: 6 additions & 1 deletion gr-zeromq/lib/sub_msg_source_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ sub_msg_source_impl::sub_msg_source_impl(char* address, int timeout, bool bind)
message_port_register_out(d_port);
}

sub_msg_source_impl::~sub_msg_source_impl() {}
sub_msg_source_impl::~sub_msg_source_impl()
{
d_context.shutdown();
d_socket.close();
d_context.close();
}

bool sub_msg_source_impl::start()
{
Expand Down

0 comments on commit 535a1e3

Please sign in to comment.