Skip to content

Commit 9da42bd

Browse files
knopers8sy-c
authored andcommitted
Fix FairMQ device consumers (#28)
The consmuers with FairMQ devices were getting stuck during the device initialization after FairMQ bump from 1.2.7.1 to 1.3.6 (the commit 'Run state handlers on the main thread (breaking change for control).').
1 parent 90b054b commit 9da42bd

File tree

2 files changed

+32
-5
lines changed

2 files changed

+32
-5
lines changed

src/ConsumerDataSampling.cxx

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#ifdef WITH_FAIRMQ
44

5+
#include <thread>
56
#include <fairmq/FairMQDevice.h>
67
#include <fairmq/FairMQMessage.h>
78
#include <fairmq/FairMQTransportFactory.h>
@@ -20,7 +21,6 @@ class ConsumerDataSampling: public Consumer {
2021

2122
void Run() override {
2223
while (CheckCurrentState(RUNNING)) {
23-
//printf("loop Run()\n");
2424
usleep(200000);
2525
}
2626
}
@@ -39,16 +39,17 @@ class ConsumerDataSampling: public Consumer {
3939
private:
4040
std::vector<FairMQChannel> channels;
4141
FMQSender sender;
42-
42+
std::thread deviceThread;
4343

4444
// todo: check why this type is not public in FMQ interface?
4545
typedef std::unordered_map<std::string, std::vector<FairMQChannel>> FairMQMap;
4646
FairMQMap m;
4747

4848
std::shared_ptr<FairMQTransportFactory> transportFactory;
4949

50-
public:
50+
public:
5151
ConsumerDataSampling(ConfigFile &cfg, std::string cfgEntryPoint) : Consumer(cfg,cfgEntryPoint), channels(1) {
52+
channels[0].UpdateChannelName("data-out");
5253
channels[0].UpdateType("pub"); // pub or push?
5354
channels[0].UpdateMethod("bind");
5455
channels[0].UpdateAddress("ipc:///tmp/readout-pipe-1");
@@ -58,7 +59,6 @@ class ConsumerDataSampling: public Consumer {
5859
throw "ConsumerDataSampling: channel validation failed";
5960
}
6061

61-
6262
// todo: def "data-out" as const string to name output channel to which we will push
6363
m.emplace(std::string("data-out"),channels);
6464

@@ -71,6 +71,8 @@ class ConsumerDataSampling: public Consumer {
7171

7272
transportFactory = FairMQTransportFactory::CreateTransportFactory("zeromq");
7373

74+
deviceThread = std::thread(&ConsumerDataSampling::runDevice, this);
75+
7476
sender.fChannels = m;
7577
sender.SetTransport("zeromq");
7678
sender.ChangeState(FairMQStateMachine::Event::INIT_DEVICE);
@@ -88,12 +90,21 @@ class ConsumerDataSampling: public Consumer {
8890
sender.ChangeState(FairMQStateMachine::Event::RESET_DEVICE);
8991
sender.WaitForEndOfState(FairMQStateMachine::Event::RESET_DEVICE);
9092
sender.ChangeState(FairMQStateMachine::Event::END);
93+
94+
if (deviceThread.joinable()) {
95+
deviceThread.join();
96+
}
9197
}
9298
int pushData(DataBlockContainerReference &b) {
9399

94100
// we create a copy of the reference, in a newly allocated object, so that reference is kept alive until this new object is destroyed in the cleanupCallback
95101
DataBlockContainerReference *ptr=new DataBlockContainerReference(b);
96102

103+
if (sender.CheckCurrentState(FairMQStateMachine::Event::RUN) ) {
104+
LOG(ERROR) << "ConsumerDataSampling: Trying to send data when the device is not in RUN state";
105+
return -1;
106+
}
107+
97108
std::unique_ptr<FairMQMessage> msgHeader(transportFactory->CreateMessage((void *)&(b->getData()->header), (size_t)(b->getData()->header.headerSize), cleanupCallback, (void *)nullptr));
98109
std::unique_ptr<FairMQMessage> msgBody(transportFactory->CreateMessage((void *)(b->getData()->data), (size_t)(b->getData()->header.dataSize), cleanupCallback, (void *)(ptr)));
99110

@@ -106,6 +117,11 @@ class ConsumerDataSampling: public Consumer {
106117
return 0;
107118
}
108119
private:
120+
121+
void runDevice() {
122+
sender.RunStateMachine();
123+
}
124+
109125
};
110126

111127

src/ConsumerFMQ.cxx

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#ifdef WITH_FAIRMQ
55

6+
#include <thread>
67
#include <fairmq/FairMQDevice.h>
78
#include <fairmq/FairMQMessage.h>
89
#include <fairmq/FairMQTransportFactory.h>
@@ -49,10 +50,10 @@ class ConsumerFMQ: public Consumer {
4950
FairMQMap m;
5051

5152
std::shared_ptr<FairMQTransportFactory> transportFactory;
53+
std::thread deviceThread;
5254

5355
public:
5456

55-
5657
ConsumerFMQ(ConfigFile &cfg, std::string cfgEntryPoint) : Consumer(cfg,cfgEntryPoint), channels(1) {
5758

5859
channels[0].UpdateType("pair"); // pub or push?
@@ -77,6 +78,8 @@ class ConsumerFMQ: public Consumer {
7778

7879
transportFactory = FairMQTransportFactory::CreateTransportFactory("zeromq");
7980

81+
deviceThread = std::thread(&ConsumerFMQ::runDevice, this);
82+
8083
sender.fChannels = m;
8184
sender.SetTransport("zeromq");
8285
sender.ChangeState(FairMQStateMachine::Event::INIT_DEVICE);
@@ -95,6 +98,10 @@ class ConsumerFMQ: public Consumer {
9598
sender.ChangeState(FairMQStateMachine::Event::RESET_DEVICE);
9699
sender.WaitForEndOfState(FairMQStateMachine::Event::RESET_DEVICE);
97100
sender.ChangeState(FairMQStateMachine::Event::END);
101+
102+
if (deviceThread.joinable()) {
103+
deviceThread.join();
104+
}
98105
}
99106

100107
int pushData(DataBlockContainerReference &b) {
@@ -121,6 +128,10 @@ class ConsumerFMQ: public Consumer {
121128
return 0;
122129
}
123130
private:
131+
132+
void runDevice() {
133+
sender.RunStateMachine();
134+
}
124135
};
125136

126137

0 commit comments

Comments
 (0)