Skip to content

Commit 6f944aa

Browse files
committed
Added timers to client program.
1 parent 1e58e8e commit 6f944aa

35 files changed

+364
-428
lines changed

core/modules/loader/Central.h

+18-5
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@ class Central {
6969

7070
virtual ~Central();
7171

72-
void run();
73-
7472
std::string getMasterHostName() const { return _masterAddr.ip; }
7573
int getMasterPort() const { return _masterAddr.port; }
7674
NetworkAddress getMasterAddr() const { return _masterAddr; }
@@ -80,7 +78,8 @@ class Central {
8078
int getErrCount() const { return _server->getErrCount(); }
8179

8280
/// Send the contents of 'sendBuf' to 'host:port'. This waits for the message to be
83-
/// sent before returning. Throws boost::system::system_error on failure.
81+
/// sent before returning.
82+
/// @throw boost::system::system_error on failure.
8483
void sendBufferTo(std::string const& host, int port, BufferUdp& sendBuf) {
8584
_server->sendBufferTo(host, port, sendBuf);
8685
}
@@ -103,19 +102,30 @@ class Central {
103102
return doList->addItem(item);
104103
}
105104

105+
/// Run the server.
106+
void runServer() {
107+
for (; _runningIOThreads < _iOThreads; ++_runningIOThreads) {
108+
run();
109+
}
110+
}
111+
106112
/// Provides a method for identifying different Central classes and
107113
/// CentralWorkers in the log file.
108114
virtual std::string getOurLogId() const { return "Central baseclass"; }
109115

110116
protected:
111117
Central(boost::asio::io_service& ioService_,
112118
std::string const& masterHostName, int masterPort,
113-
int threadPoolSize, int loopSleepTime)
119+
int threadPoolSize, int loopSleepTime,
120+
int iOThreads)
114121
: ioService(ioService_), _masterAddr(masterHostName, masterPort),
115-
_threadPoolSize(threadPoolSize), _loopSleepTime(loopSleepTime) {
122+
_threadPoolSize(threadPoolSize), _loopSleepTime(loopSleepTime),
123+
_iOThreads(iOThreads) {
116124
_initialize();
117125
}
118126

127+
void run(); ///< Run a single asio thread.
128+
119129
boost::asio::io_service& ioService;
120130

121131
DoList::Ptr doList; ///< List of items to be checked at regular intervals.
@@ -143,6 +153,9 @@ class Central {
143153
std::vector<std::thread> _ioServiceThreads; ///< List of asio io threads created by this
144154

145155
std::thread _checkDoListThread; ///< Thread for running doList checks on DoListItems.
156+
157+
int _iOThreads{5}; ///< Number of asio IO threads to run, set by config file.
158+
int _runningIOThreads{0}; ///< Number of asio IO threads started.
146159
};
147160

148161
}}} // namespace lsst::qserv::loader

core/modules/loader/CentralClient.cc

+46-43
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@ namespace loader {
5050

5151
CentralClient::CentralClient(boost::asio::io_service& ioService_,
5252
std::string const& hostName, ClientConfig const& cfg)
53-
: Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(), cfg.getLoopSleepTime()),
53+
: Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(), cfg.getLoopSleepTime(), cfg.getIOThreads()),
5454
_hostName(hostName), _udpPort(cfg.getClientPortUdp()),
5555
_defWorkerHost(cfg.getDefWorkerHost()),
5656
_defWorkerPortUdp(cfg.getDefWorkerPortUdp()),
5757
_doListMaxLookups(cfg.getMaxLookups()),
5858
_doListMaxInserts(cfg.getMaxInserts()),
59-
_maxRequestSleepTime(cfg.getMaxRequestSleepTime()){
59+
_maxRequestSleepTime(cfg.getMaxRequestSleepTime()) {
6060
}
6161

6262

@@ -65,45 +65,45 @@ void CentralClient::start() {
6565
}
6666

6767

68-
void CentralClient::handleKeyInfo(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) {
69-
LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyInfo");
68+
void CentralClient::handleKeyLookup(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) {
69+
LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyLookup");
7070

71-
StringElement::Ptr sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data));
71+
auto const sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data));
7272
if (sData == nullptr) {
73-
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyInsertComplete Failed to parse list");
73+
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyLookup Failed to parse list");
7474
return;
7575
}
7676
auto protoData = sData->protoParse<proto::KeyInfo>();
7777
if (protoData == nullptr) {
78-
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyInsertComplete Failed to parse list");
78+
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyLookup Failed to parse list");
7979
return;
8080
}
8181

8282
// TODO put in separate thread
83-
_handleKeyInfo(inMsg, protoData);
83+
_handleKeyLookup(inMsg, protoData);
8484
}
8585

8686

87-
void CentralClient::_handleKeyInfo(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf) {
87+
void CentralClient::_handleKeyLookup(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf) {
8888
std::unique_ptr<proto::KeyInfo> protoData(std::move(protoBuf));
8989

9090
CompositeKey key(protoData->keyint(), protoData->keystr());
9191
ChunkSubchunk chunkInfo(protoData->chunk(), protoData->subchunk());
9292

9393
LOGS(_log, LOG_LVL_DEBUG, "trying to remove oneShot for lookup key=" << key << " " << chunkInfo);
9494
/// Locate the original one shot and mark it as done.
95-
CentralClient::KeyInfoReqOneShot::Ptr keyInfoOneShot;
95+
CentralClient::KeyLookupReqOneShot::Ptr keyLookupOneShot;
9696
{
97-
std::lock_guard<std::mutex> lck(_waitingKeyInfoMtx);
98-
auto iter = _waitingKeyInfoMap.find(key);
99-
if (iter == _waitingKeyInfoMap.end()) {
100-
LOGS(_log, LOG_LVL_WARN, "handleKeyInfoComplete could not find key=" << key);
97+
std::lock_guard<std::mutex> lck(_waitingKeyLookupMtx);
98+
auto iter = _waitingKeyLookupMap.find(key);
99+
if (iter == _waitingKeyLookupMap.end()) {
100+
LOGS(_log, LOG_LVL_WARN, "_handleKeyLookup could not find key=" << key);
101101
return;
102102
}
103-
keyInfoOneShot = iter->second;
104-
_waitingKeyInfoMap.erase(iter);
103+
keyLookupOneShot = iter->second;
104+
_waitingKeyLookupMap.erase(iter);
105105
}
106-
keyInfoOneShot->keyInfoComplete(key, chunkInfo.chunk, chunkInfo.subchunk, protoData->success());
106+
keyLookupOneShot->keyInfoComplete(key, chunkInfo.chunk, chunkInfo.subchunk, protoData->success());
107107
LOGS(_log, LOG_LVL_INFO, "Successfully found key=" << key << " " << chunkInfo);
108108
}
109109

@@ -139,14 +139,14 @@ void CentralClient::_handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique
139139
size_t mapSize;
140140
{
141141
std::lock_guard<std::mutex> lck(_waitingKeyInsertMtx);
142-
mapSize = _waitingKeyInsertMap.size();
143142
auto iter = _waitingKeyInsertMap.find(key);
144143
if (iter == _waitingKeyInsertMap.end()) {
145144
LOGS(_log, LOG_LVL_WARN, "handleKeyInsertComplete could not find key=" << key);
146145
return;
147146
}
148147
keyInsertOneShot = iter->second;
149148
_waitingKeyInsertMap.erase(iter);
149+
mapSize = _waitingKeyInsertMap.size();
150150
}
151151
keyInsertOneShot->keyInsertComplete();
152152
LOGS(_log, LOG_LVL_INFO, "Successfully inserted key=" << key << " " << chunkInfo <<
@@ -178,6 +178,7 @@ KeyInfoData::Ptr CentralClient::keyInsertReq(CompositeKey const& key, int chunk,
178178
LOGS(_log, LOG_LVL_INFO, "keyInsertReq waiting key=" << key <<
179179
"size=" << sz << " loopCount=" << loopCount);
180180
}
181+
// Let the CPU do something else while waiting for some requests to finish.
181182
usleep(_maxRequestSleepTime);
182183
++loopCount;
183184
lck.lock();
@@ -230,60 +231,64 @@ void CentralClient::_keyInsertReq(CompositeKey const& key, int chunk, int subchu
230231
strElem.appendToData(msgData);
231232
try {
232233
sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData);
233-
} catch (boost::system::system_error e) {
234+
} catch (boost::system::system_error const& e) {
234235
LOGS(_log, LOG_LVL_ERROR, "CentralClient::_keyInsertReq boost system_error=" << e.what() <<
235236
" key=" << key << " chunk=" << chunk << " sub=" << subchunk);
236-
exit(-1); // TODO:&&& The correct course of action is unclear and requires thought,
237-
// so just blow up so it's unmistakable something bad happened for now.
238237
}
239238
}
240239

241240

242-
KeyInfoData::Ptr CentralClient::keyInfoReq(CompositeKey const& key) {
241+
KeyInfoData::Ptr CentralClient::keyLookupReq(CompositeKey const& key) {
243242
// Returns a pointer to a Tracker object that can be used to track job
244243
// completion and job status. keyInsertOneShot will call _keyInsertReq until
245244
// it knows the task was completed. _handleKeyInfoComplete marks
246245
// the jobs complete as the messages come in from workers.
247246
// Insert a oneShot DoListItem to keep trying to add the key until
248247
// we get word that it has been added successfully.
249248
LOGS(_log, LOG_LVL_INFO, "Trying to lookup key=" << key);
250-
auto keyInfoOneShot = std::make_shared<CentralClient::KeyInfoReqOneShot>(this, key);
249+
auto keyLookupOneShot = std::make_shared<CentralClient::KeyLookupReqOneShot>(this, key);
251250
{
252-
std::unique_lock<std::mutex> lck(_waitingKeyInfoMtx);
251+
std::unique_lock<std::mutex> lck(_waitingKeyLookupMtx);
253252
// Limit the number of concurrent lookups.
254253
// If the key is already in the map, there is no point in blocking.
255254
int loopCount = 0;
256-
auto iter = _waitingKeyInfoMap.find(key);
257-
while (_waitingKeyInfoMap.size() > _doListMaxLookups
258-
&& iter == _waitingKeyInfoMap.end()) {
259-
size_t sz = _waitingKeyInfoMap.size();
255+
uint64_t sleptForMicroSec = 0;
256+
uint64_t const tenSec = 10000000;
257+
auto iter = _waitingKeyLookupMap.find(key);
258+
while (_waitingKeyLookupMap.size() > _doListMaxLookups
259+
&& iter == _waitingKeyLookupMap.end()) {
260+
size_t sz = _waitingKeyLookupMap.size();
260261
lck.unlock();
261-
if (loopCount % 100 == 0) {
262+
// Log a message about this about once every 10 seconds.
263+
if (sleptForMicroSec > tenSec) sleptForMicroSec = 0;
264+
if (sleptForMicroSec == 0) {
262265
LOGS(_log, LOG_LVL_INFO, "keyInfoReq waiting key=" << key <<
263266
"size=" << sz << " loopCount=" << loopCount);
264267
}
268+
// Let the CPU do something else while waiting for some requests to finish.
265269
usleep(_maxRequestSleepTime);
270+
sleptForMicroSec += _maxRequestSleepTime;
266271
++loopCount;
267272
lck.lock();
268-
iter = _waitingKeyInfoMap.find(key);
273+
iter = _waitingKeyLookupMap.find(key);
269274
}
270275

271276
// Use the existing lookup, if there is one.
272-
if (iter != _waitingKeyInfoMap.end()) {
277+
if (iter != _waitingKeyLookupMap.end()) {
273278
auto cData = iter->second->cmdData;
274279
return cData;
275280
}
276281

277-
_waitingKeyInfoMap[key] = keyInfoOneShot;
282+
_waitingKeyLookupMap[key] = keyLookupOneShot;
278283
}
279-
runAndAddDoListItem(keyInfoOneShot);
280-
return keyInfoOneShot->cmdData;
284+
runAndAddDoListItem(keyLookupOneShot);
285+
return keyLookupOneShot->cmdData;
281286
}
282287

283288

284-
void CentralClient::_keyInfoReq(CompositeKey const& key) {
285-
LOGS(_log, LOG_LVL_INFO, "CentralClient::_keyInfoReq trying key=" << key);
286-
LoaderMsg msg(LoaderMsg::KEY_INFO_REQ, getNextMsgId(), getHostName(), getUdpPort());
289+
void CentralClient::_keyLookupReq(CompositeKey const& key) {
290+
LOGS(_log, LOG_LVL_INFO, "CentralClient::_keyLookupReq trying key=" << key);
291+
LoaderMsg msg(LoaderMsg::KEY_LOOKUP_REQ, getNextMsgId(), getHostName(), getUdpPort());
287292
BufferUdp msgData;
288293
msg.appendToData(msgData);
289294
// create the proto buffer
@@ -305,11 +310,9 @@ void CentralClient::_keyInfoReq(CompositeKey const& key) {
305310

306311
try {
307312
sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData);
308-
} catch (boost::system::system_error e) {
313+
} catch (boost::system::system_error const& e) {
309314
LOGS(_log, LOG_LVL_ERROR, "CentralClient::_keyInfoReq boost system_error=" << e.what() <<
310315
" key=" << key);
311-
exit(-1); // TODO:&&& The correct course of action is unclear and requires thought.
312-
// So just blow up so it's unmistakable something bad happened for now.
313316
}
314317
}
315318

@@ -341,11 +344,11 @@ void CentralClient::KeyInsertReqOneShot::keyInsertComplete() {
341344
}
342345

343346

344-
util::CommandTracked::Ptr CentralClient::KeyInfoReqOneShot::createCommand() {
347+
util::CommandTracked::Ptr CentralClient::KeyLookupReqOneShot::createCommand() {
345348
struct KeyInfoReqCmd : public util::CommandTracked {
346349
KeyInfoReqCmd(KeyInfoData::Ptr& cd, CentralClient* cent_) : cData(cd), cent(cent_) {}
347350
void action(util::CmdData*) override {
348-
cent->_keyInfoReq(cData->key);
351+
cent->_keyLookupReq(cData->key);
349352
}
350353
KeyInfoData::Ptr cData;
351354
CentralClient* cent;
@@ -354,7 +357,7 @@ util::CommandTracked::Ptr CentralClient::KeyInfoReqOneShot::createCommand() {
354357
}
355358

356359

357-
void CentralClient::KeyInfoReqOneShot::keyInfoComplete(CompositeKey const& key,
360+
void CentralClient::KeyLookupReqOneShot::keyInfoComplete(CompositeKey const& key,
358361
int chunk, int subchunk, bool success) {
359362
if (key == cmdData->key) {
360363
cmdData->chunk = chunk;

core/modules/loader/CentralClient.h

+15-17
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ class CentralClient : public Central {
7474

7575
void start();
7676

77-
7877
~CentralClient() override = default;
7978

8079
std::string const& getHostName() const { return _hostName; }
@@ -86,8 +85,6 @@ class CentralClient : public Central {
8685
/// @return the default worker's UDP port
8786
int getDefWorkerPortUdp() const { return _defWorkerPortUdp; }
8887

89-
90-
9188
/// Asynchronously request a key value insert to the workers.
9289
/// This can block if too many key insert requests are already in progress.
9390
/// @return - a KeyInfoData object for checking the job's status or
@@ -101,18 +98,18 @@ class CentralClient : public Central {
10198
/// Asynchronously request a key value lookup from the workers. It returns a
10299
/// KeyInfoData object to be used to track job status and get the value of the key.
103100
/// This can block if too many key lookup requests are already in progress.
104-
KeyInfoData::Ptr keyInfoReq(CompositeKey const& key);
101+
KeyInfoData::Ptr keyLookupReq(CompositeKey const& key);
105102
/// Handle a workers response to the keyInfoReq call.
106-
void handleKeyInfo(LoaderMsg const& inMsg, BufferUdp::Ptr const& data);
103+
void handleKeyLookup(LoaderMsg const& inMsg, BufferUdp::Ptr const& data);
107104

108105
std::string getOurLogId() const override { return "client"; }
109106

110107
private:
111108
void _keyInsertReq(CompositeKey const& key, int chunk, int subchunk); ///< see keyInsertReq()
112109
void _handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf);
113110

114-
void _keyInfoReq(CompositeKey const& key); ///< see keyInfoReq()
115-
void _handleKeyInfo(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf);
111+
void _keyLookupReq(CompositeKey const& key); ///< see keyLookReq()
112+
void _handleKeyLookup(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf);
116113

117114

118115

@@ -136,12 +133,12 @@ class CentralClient : public Central {
136133
CentralClient* central;
137134
};
138135

139-
/// Create commands to find a key in the index and get its value.
136+
/// Create commands to lookup a key in the index and get its value.
140137
/// It should keep trying this until it works and then drop it from _waitingKeyInfoMap.
141-
struct KeyInfoReqOneShot : public DoListItem {
142-
using Ptr = std::shared_ptr<KeyInfoReqOneShot>;
138+
struct KeyLookupReqOneShot : public DoListItem {
139+
using Ptr = std::shared_ptr<KeyLookupReqOneShot>;
143140

144-
KeyInfoReqOneShot(CentralClient* central_, CompositeKey const& key_) :
141+
KeyLookupReqOneShot(CentralClient* central_, CompositeKey const& key_) :
145142
cmdData(std::make_shared<KeyInfoData>(key_, -1, -1)), central(central_) { setOneShot(true); }
146143

147144
util::CommandTracked::Ptr createCommand() override;
@@ -164,15 +161,16 @@ class CentralClient : public Central {
164161
const int _defWorkerPortUdp; ///< Default worker UDP port
165162

166163

167-
size_t _doListMaxLookups{1000}; ///< Maximum number of concurrent lookups in DoList (set by config)
168-
size_t _doListMaxInserts{1000}; ///< Maximum number of concurrent inserts in DoList (set by config)
169-
int _maxRequestSleepTime{100000}; ///< Time to sleep between checking requests when at max length (set by config)
164+
size_t const _doListMaxLookups = 1000; ///< Maximum number of concurrent lookups in DoList, set by config
165+
size_t const _doListMaxInserts = 1000; ///< Maximum number of concurrent inserts in DoList, set by config
166+
/// Time to sleep between checking requests when at max length, set by config
167+
int const _maxRequestSleepTime = 100000;
170168

171-
std::map<CompositeKey, KeyInsertReqOneShot::Ptr> _waitingKeyInsertMap;
169+
std::map<CompositeKey, KeyInsertReqOneShot::Ptr> _waitingKeyInsertMap; ///< Map of current insert requests.
172170
std::mutex _waitingKeyInsertMtx; ///< protects _waitingKeyInsertMap, _doListMaxInserts
173171

174-
std::map<CompositeKey, KeyInfoReqOneShot::Ptr> _waitingKeyInfoMap; // &&& change all references of keyInfo to keyLookup &&&, including protobuf keyInfo should only apply to worker key count and worker key range.
175-
std::mutex _waitingKeyInfoMtx; ///< protects _waitingKeyInfoMap, _doListMaxLookups
172+
std::map<CompositeKey, KeyLookupReqOneShot::Ptr> _waitingKeyLookupMap; ///< Map of current look up requests.
173+
std::mutex _waitingKeyLookupMtx; ///< protects _waitingKeyLookMap, _doListMaxLookups
176174
};
177175

178176
}}} // namespace lsst::qserv::loader

core/modules/loader/CentralMaster.cc

+3-7
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ void CentralMaster::addWorker(std::string const& ip, int udpPort, int tcpPort) {
6565
}
6666

6767

68-
void CentralMaster::updateWorkerInfo(uint32_t workerId, NeighborsInfo const& nInfo, StringRange const& strRange) {
68+
void CentralMaster::updateWorkerInfo(uint32_t workerId, NeighborsInfo const& nInfo, KeyRange const& strRange) {
6969
if (workerId == 0) {
7070
return;
7171
}
@@ -99,12 +99,10 @@ void CentralMaster::setWorkerNeighbor(MWorkerListItem::WPtr const& target, int m
9999
auto addr = targetWorker->getUdpAddress();
100100
try {
101101
sendBufferTo(addr.ip, addr.port, msgData);
102-
} catch (boost::system::system_error e) {
102+
} catch (boost::system::system_error const& e) {
103103
LOGS(_log, LOG_LVL_ERROR, "CentralMaster::setWorkerNeighbor boost system_error=" << e.what() <<
104104
" targ=" << *targetWorker << " msg=" << message <<
105105
" neighborId=" << neighborId);
106-
exit(-1); // TODO:&&& The correct course of action is unclear and requires thought,
107-
// so just blow up so it's unmistakable something bad happened for now.
108106
}
109107
}
110108

@@ -206,12 +204,10 @@ void CentralMaster::reqWorkerKeysInfo(uint64_t msgId, std::string const& targetI
206204
reqMsg.appendToData(data);
207205
try {
208206
sendBufferTo(targetIp, targetPort, data);
209-
} catch (boost::system::system_error e) {
207+
} catch (boost::system::system_error const& e) {
210208
LOGS(_log, LOG_LVL_ERROR, "CentralMaster::reqWorkerKeysInfo boost system_error=" << e.what() <<
211209
" msgId=" << msgId << " tIp=" << targetIp << " tPort=" << targetPort <<
212210
" ourHost=" << ourHostName << " ourPort=" << ourPort);
213-
exit(-1); // TODO:&&& The correct course of action is unclear and requires thought,
214-
// so just blow up so it's unmistakable something bad happened for now.
215211
}
216212
}
217213

0 commit comments

Comments
 (0)