Skip to content

Commit e31b764

Browse files
committed
Changed to use CompositeKey instead of string for keys.
1 parent 60ade03 commit e31b764

22 files changed

+568
-199
lines changed

core/modules/loader/CentralClient.cc

+11-9
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ void CentralClient::handleKeyInfo(LoaderMsg const& inMsg, BufferUdp::Ptr const&
8686
void CentralClient::_handleKeyInfo(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf) {
8787
std::unique_ptr<proto::KeyInfo> protoData(std::move(protoBuf));
8888

89-
std::string key = protoData->key();
89+
CompositeKey key(protoData->keyint(), protoData->keystr());
9090
ChunkSubchunk chunkInfo(protoData->chunk(), protoData->subchunk());
9191

9292
LOGS(_log, LOG_LVL_INFO, "trying to remove oneShot for lookup key=" << key << " " << chunkInfo);
@@ -129,7 +129,7 @@ void CentralClient::handleKeyInsertComplete(LoaderMsg const& inMsg, BufferUdp::P
129129
void CentralClient::_handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf) {
130130
std::unique_ptr<proto::KeyInfo> protoData(std::move(protoBuf));
131131

132-
std::string key = protoData->key();
132+
CompositeKey key(protoData->keyint(), protoData->keystr());
133133
ChunkSubchunk chunkInfo(protoData->chunk(), protoData->subchunk());
134134

135135
LOGS(_log, LOG_LVL_DEBUG, "trying to remove oneShot for key=" << key << " " << chunkInfo);
@@ -157,7 +157,7 @@ void CentralClient::_handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique
157157
// completion and the status of the job. keyInsertOneShot will call
158158
// _keyInsertReq until it knows the task was completed via a call
159159
// to _handleKeyInsertComplete
160-
KeyInfoData::Ptr CentralClient::keyInsertReq(std::string const& key, int chunk, int subchunk) {
160+
KeyInfoData::Ptr CentralClient::keyInsertReq(CompositeKey const& key, int chunk, int subchunk) {
161161
// Insert a oneShot DoListItem to keep trying to add the key until
162162
// we get word that it has been added successfully.
163163
LOGS(_log, LOG_LVL_INFO, "Trying to insert key=" << key << " chunk=" << chunk <<
@@ -206,7 +206,7 @@ KeyInfoData::Ptr CentralClient::keyInsertReq(std::string const& key, int chunk,
206206
}
207207

208208

209-
void CentralClient::_keyInsertReq(std::string const& key, int chunk, int subchunk) {
209+
void CentralClient::_keyInsertReq(CompositeKey const& key, int chunk, int subchunk) {
210210
LOGS(_log, LOG_LVL_INFO, "CentralClient::_keyInsertReq trying key=" << key);
211211
LoaderMsg msg(LoaderMsg::KEY_INSERT_REQ, getNextMsgId(), getHostName(), getUdpPort());
212212
BufferUdp msgData;
@@ -218,7 +218,8 @@ void CentralClient::_keyInsertReq(std::string const& key, int chunk, int subchun
218218
protoAddr->set_udpport(getUdpPort());
219219
protoAddr->set_tcpport(getTcpPort());
220220
lsst::qserv::proto::KeyInfo* protoKeyInfo = protoKeyInsert.mutable_keyinfo();
221-
protoKeyInfo->set_key(key);
221+
protoKeyInfo->set_keyint(key.kInt);
222+
protoKeyInfo->set_keystr(key.kStr);
222223
protoKeyInfo->set_chunk(chunk);
223224
protoKeyInfo->set_subchunk(subchunk);
224225
protoKeyInsert.set_hops(0);
@@ -231,7 +232,7 @@ void CentralClient::_keyInsertReq(std::string const& key, int chunk, int subchun
231232
}
232233

233234

234-
KeyInfoData::Ptr CentralClient::keyInfoReq(std::string const& key) {
235+
KeyInfoData::Ptr CentralClient::keyInfoReq(CompositeKey const& key) {
235236
// Returns a pointer to a Tracker object that can be used to track job
236237
// completion and job status. keyInsertOneShot will call _keyInsertReq until
237238
// it knows the task was completed. _handleKeyInfoComplete marks
@@ -273,7 +274,7 @@ KeyInfoData::Ptr CentralClient::keyInfoReq(std::string const& key) {
273274
}
274275

275276

276-
void CentralClient::_keyInfoReq(std::string const& key) {
277+
void CentralClient::_keyInfoReq(CompositeKey const& key) {
277278
LOGS(_log, LOG_LVL_INFO, "CentralClient::_keyInfoReq trying key=" << key);
278279
LoaderMsg msg(LoaderMsg::KEY_INFO_REQ, getNextMsgId(), getHostName(), getUdpPort());
279280
BufferUdp msgData;
@@ -285,7 +286,8 @@ void CentralClient::_keyInfoReq(std::string const& key) {
285286
protoAddr->set_udpport(getUdpPort());
286287
protoAddr->set_tcpport(getTcpPort());
287288
lsst::qserv::proto::KeyInfo* protoKeyInfo = protoKeyInsert.mutable_keyinfo();
288-
protoKeyInfo->set_key(key);
289+
protoKeyInfo->set_keyint(key.kInt);
290+
protoKeyInfo->set_keystr(key.kStr);
289291
protoKeyInfo->set_chunk(0);
290292
protoKeyInfo->set_subchunk(0);
291293
protoKeyInsert.set_hops(0);
@@ -338,7 +340,7 @@ util::CommandTracked::Ptr CentralClient::KeyInfoReqOneShot::createCommand() {
338340
}
339341

340342

341-
void CentralClient::KeyInfoReqOneShot::keyInfoComplete(std::string const& key,
343+
void CentralClient::KeyInfoReqOneShot::keyInfoComplete(CompositeKey const& key,
342344
int chunk, int subchunk, bool success) {
343345
if (key == cmdData->key) {
344346
cmdData->chunk = chunk;

core/modules/loader/CentralClient.h

+11-11
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ class ClientConfig;
5151
class KeyInfoData : public util::Tracker {
5252
public:
5353
using Ptr = std::shared_ptr<KeyInfoData>;
54-
KeyInfoData(std::string const& key_, int chunk_, int subchunk_) :
54+
KeyInfoData(CompositeKey const& key_, int chunk_, int subchunk_) :
5555
key(key_), chunk(chunk_), subchunk(subchunk_) {}
5656

57-
std::string key;
57+
CompositeKey key;
5858
int chunk;
5959
int subchunk;
6060
bool success{false};
@@ -94,24 +94,24 @@ class CentralClient : public Central {
9494
/// nullptr if CentralClient is already trying to insert the key
9595
/// but value doesn't match the existing value. This indicates
9696
/// there is an input data error.
97-
KeyInfoData::Ptr keyInsertReq(std::string const& key, int chunk, int subchunk);
97+
KeyInfoData::Ptr keyInsertReq(CompositeKey const& key, int chunk, int subchunk);
9898
/// Handle a workers response to the keyInserReq call
9999
void handleKeyInsertComplete(LoaderMsg const& inMsg, BufferUdp::Ptr const& data);
100100

101101
/// Asynchronously request a key value lookup from the workers. It returns a
102102
/// KeyInfoData object to be used to track job status and get the value of the key.
103103
/// This can block if too many key lookup requests are already in progress.
104-
KeyInfoData::Ptr keyInfoReq(std::string const& key);
104+
KeyInfoData::Ptr keyInfoReq(CompositeKey const& key);
105105
/// Handle a workers response to the keyInfoReq call.
106106
void handleKeyInfo(LoaderMsg const& inMsg, BufferUdp::Ptr const& data);
107107

108108
std::string getOurLogId() const override { return "client"; }
109109

110110
private:
111-
void _keyInsertReq(std::string const& key, int chunk, int subchunk); ///< see keyInsertReq()
111+
void _keyInsertReq(CompositeKey const& key, int chunk, int subchunk); ///< see keyInsertReq()
112112
void _handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf);
113113

114-
void _keyInfoReq(std::string const& key); ///< see keyInfoReq()
114+
void _keyInfoReq(CompositeKey const& key); ///< see keyInfoReq()
115115
void _handleKeyInfo(LoaderMsg const& inMsg, std::unique_ptr<proto::KeyInfo>& protoBuf);
116116

117117

@@ -121,7 +121,7 @@ class CentralClient : public Central {
121121
struct KeyInsertReqOneShot : public DoListItem {
122122
using Ptr = std::shared_ptr<KeyInsertReqOneShot>;
123123

124-
KeyInsertReqOneShot(CentralClient* central_, std::string const& key_, int chunk_, int subchunk_) :
124+
KeyInsertReqOneShot(CentralClient* central_, CompositeKey const& key_, int chunk_, int subchunk_) :
125125
cmdData(std::make_shared<KeyInfoData>(key_, chunk_, subchunk_)), central(central_) {
126126
setOneShot(true);
127127
}
@@ -141,13 +141,13 @@ class CentralClient : public Central {
141141
struct KeyInfoReqOneShot : public DoListItem {
142142
using Ptr = std::shared_ptr<KeyInfoReqOneShot>;
143143

144-
KeyInfoReqOneShot(CentralClient* central_, std::string const& key_) :
144+
KeyInfoReqOneShot(CentralClient* central_, CompositeKey const& key_) :
145145
cmdData(std::make_shared<KeyInfoData>(key_, -1, -1)), central(central_) { setOneShot(true); }
146146

147147
util::CommandTracked::Ptr createCommand() override;
148148

149149
// TODO Have this function take result codes as arguments and put them in cmdData.
150-
void keyInfoComplete(std::string const& key, int chunk, int subchunk, bool success);
150+
void keyInfoComplete(CompositeKey const& key, int chunk, int subchunk, bool success);
151151

152152
KeyInfoData::Ptr cmdData;
153153
CentralClient* central;
@@ -168,10 +168,10 @@ class CentralClient : public Central {
168168
size_t _doListMaxInserts; ///< Maximum number of concurrent inserts in DoList DM-16555 &&&
169169
int _maxRequestSleepTime{100000}; ///< Time to sleep between checking requests when at max length &&& add config file entry
170170

171-
std::map<std::string, KeyInsertReqOneShot::Ptr> _waitingKeyInsertMap;
171+
std::map<CompositeKey, KeyInsertReqOneShot::Ptr> _waitingKeyInsertMap;
172172
std::mutex _waitingKeyInsertMtx; ///< protects _waitingKeyInsertMap, _doListMaxInserts
173173

174-
std::map<std::string, KeyInfoReqOneShot::Ptr> _waitingKeyInfoMap; // &&& change all references of keyInfo to keyLookup &&&, including protobuf keyInfo should only apply to worker key count and worker key range.
174+
std::map<CompositeKey, KeyInfoReqOneShot::Ptr> _waitingKeyInfoMap; // &&& change all references of keyInfo to keyLookup &&&, including protobuf keyInfo should only apply to worker key count and worker key range.
175175
std::mutex _waitingKeyInfoMtx; ///< protects _waitingKeyInfoMap, _doListMaxLookups
176176
};
177177

core/modules/loader/CentralWorker.cc

+29-28
Original file line numberDiff line numberDiff line change
@@ -242,14 +242,14 @@ bool CentralWorker::_determineRange() {
242242
nInfoR.keyCount = protoItem->mapsize();
243243
_neighborRight.setKeyCount(nInfoR.keyCount); // TODO add a timestamp to this data.
244244
nInfoR.recentAdds = protoItem->recentadds();
245-
proto::WorkerRangeString protoRange = protoItem->range();
245+
proto::WorkerRange protoRange = protoItem->range();
246246
LOGS(_log, LOG_LVL_INFO, funcName << " rightNeighbor workerId=" << workerId <<
247247
" keyCount=" << nInfoR.keyCount << " recentAdds=" << nInfoR.recentAdds);
248248
bool valid = protoRange.valid();
249249
StringRange rightRange;
250250
if (valid) {
251-
std::string min = protoRange.min();
252-
std::string max = protoRange.max();
251+
CompositeKey min(protoRange.minint(), protoRange.minstr());
252+
CompositeKey max(protoRange.maxint(), protoRange.maxstr());
253253
bool unlimited = protoRange.maxunlimited();
254254
rightRange.setMinMax(min, max, unlimited);
255255
LOGS(_log, LOG_LVL_INFO, funcName << " rightRange=" << rightRange);
@@ -386,7 +386,7 @@ void CentralWorker::_shift(Direction direction, int keysToShift) {
386386
for (int j=0; j < sz; ++j) {
387387
proto::KeyInfo const& protoKI = protoKeyList->keypair(j);
388388
ChunkSubchunk chSub(protoKI.chunk(), protoKI.subchunk());
389-
keyList.push_back(std::make_pair(protoKI.key(), chSub));
389+
keyList.push_back(std::make_pair(CompositeKey(protoKI.keyint(), protoKI.keystr()), chSub));
390390
}
391391
insertKeys(keyList, false);
392392
}
@@ -403,8 +403,8 @@ void CentralWorker::_shift(Direction direction, int keysToShift) {
403403
// Construct a message with that many keys and send it (sending the highest keys)
404404
proto::KeyList protoKeyList;
405405
protoKeyList.set_keycount(keysToShift);
406-
std::string minKey(""); // smallest value of a key sent to right neighbor
407-
std::string maxKey("");
406+
CompositeKey minKey = CompositeKey::minValue(); // smallest value of a key is sent to right neighbor
407+
CompositeKey maxKey = CompositeKey::minValue();
408408
{
409409
std::lock_guard<std::mutex> lck(_idMapMtx);
410410
if (not _transferListToRight.empty()) {
@@ -420,7 +420,8 @@ void CentralWorker::_shift(Direction direction, int keysToShift) {
420420
_transferListToRight.push_back(std::make_pair(iter->first, iter->second));
421421
proto::KeyInfo* protoKI = protoKeyList.add_keypair();
422422
minKey = iter->first;
423-
protoKI->set_key(minKey);
423+
protoKI->set_keyint(minKey.kInt);
424+
protoKI->set_keystr(minKey.kStr);
424425
protoKI->set_chunk(iter->second.chunk);
425426
protoKI->set_subchunk(iter->second.subchunk);
426427
_keyValueMap.erase(iter);
@@ -472,8 +473,8 @@ void CentralWorker::finishShiftFromRight() {
472473
StringElement::UPtr CentralWorker::buildKeyList(int keysToShift) {
473474
std::string funcName = "CentralWorker::buildKeyList";
474475
proto::KeyList protoKeyList;
475-
std::string minKey(""); // smallest key sent
476-
std::string maxKey(""); // largest key sent
476+
CompositeKey minKey = CompositeKey::minValue(); // smallest key sent
477+
CompositeKey maxKey = CompositeKey::minValue(); // largest key sent
477478
{
478479
LOGS(_log, LOG_LVL_INFO, funcName);
479480
std::lock_guard<std::mutex> lck(_idMapMtx);
@@ -492,15 +493,16 @@ StringElement::UPtr CentralWorker::buildKeyList(int keysToShift) {
492493
_transferListWithLeft.push_back(std::make_pair(iter->first, iter->second));
493494
proto::KeyInfo* protoKI = protoKeyList.add_keypair();
494495
maxKey = iter->first;
495-
protoKI->set_key(maxKey);
496+
protoKI->set_keyint(maxKey.kInt);
497+
protoKI->set_keystr(maxKey.kStr);
496498
protoKI->set_chunk(iter->second.chunk);
497499
protoKI->set_subchunk(iter->second.subchunk);
498500
_keyValueMap.erase(iter);
499501
}
500502
// Adjust our range;
501503
auto iter = _keyValueMap.begin();
502504
auto minKey = _strRange.getMin();
503-
if (minKey != "") {
505+
if (minKey != CompositeKey::minValue()) {
504506
if (iter->first != minKey) {
505507
_strRange.setMin(iter->first);
506508
_rangeChanged = true;
@@ -625,7 +627,7 @@ void CentralWorker::cancelShiftsWithLeftNeighbor() {
625627
_transferListWithLeft.clear();
626628

627629
// Fix the bottom of the range.
628-
if (_strRange.getMin() != "") {
630+
if (_strRange.getMin() != CompositeKey::minValue()) {
629631
_strRange.setMin(_keyValueMap.begin()->first);
630632
}
631633
}
@@ -667,12 +669,12 @@ void CentralWorker::_workerInfoReceive(std::unique_ptr<proto::WorkerListItem>& p
667669
portTcp = protoAddr.tcpport();
668670
}
669671
StringRange strRange;
670-
if (protoList->has_rangestr()) {
671-
proto::WorkerRangeString protoRange= protoList->rangestr();
672+
if (protoList->has_range()) {
673+
proto::WorkerRange protoRange = protoList->range();
672674
bool valid = protoRange.valid();
673675
if (valid) {
674-
std::string min = protoRange.min();
675-
std::string max = protoRange.max();
676+
CompositeKey min(protoRange.minint(), protoRange.minstr());
677+
CompositeKey max(protoRange.maxint(), protoRange.maxstr());
676678
bool unlimited = protoRange.maxunlimited();
677679
strRange.setMinMax(min, max, unlimited);
678680
}
@@ -717,7 +719,7 @@ StringRange CentralWorker::updateRangeWithLeftData(StringRange const& leftNeighb
717719
std::unique_lock<std::mutex> lck(_idMapMtx);
718720
if (not _strRange.getValid()) {
719721
// Our range has not been set, so base it on the range of the left neighbor.
720-
auto min = StringRange::incrementString(leftNeighborRange.getMax());
722+
auto min = StringRange::increment(leftNeighborRange.getMax());
721723
auto max = min;
722724
_strRange.setMinMax(min, max, leftNeighborRange.getUnlimited());
723725
newLeftNeighborRange.setMax(max, false);
@@ -770,7 +772,7 @@ void CentralWorker::_workerKeyInsertReq(LoaderMsg const& inMsg, std::unique_ptr<
770772
NetworkAddress nAddr(protoAddr.ip(), protoAddr.udpport());
771773

772774
proto::KeyInfo protoKeyInfo = protoData->keyinfo();
773-
std::string key = protoKeyInfo.key();
775+
CompositeKey key(protoKeyInfo.keyint(), protoKeyInfo.keystr());
774776
ChunkSubchunk chunkInfo(protoKeyInfo.chunk(), protoKeyInfo.subchunk());
775777

776778
/// see if the key should be inserted into our map
@@ -794,7 +796,8 @@ void CentralWorker::_workerKeyInsertReq(LoaderMsg const& inMsg, std::unique_ptr<
794796
msg.appendToData(msgData);
795797
// protoKeyInfo should still be the same
796798
proto::KeyInfo protoReply;
797-
protoReply.set_key(key);
799+
protoReply.set_keyint(key.kInt);
800+
protoReply.set_keystr(key.kStr);
798801
protoReply.set_chunk(chunkInfo.chunk);
799802
protoReply.set_subchunk(chunkInfo.subchunk);
800803
StringElement strElem;
@@ -824,7 +827,7 @@ void CentralWorker::_forwardKeyInsertRequest(NetworkAddress const& targetAddr, L
824827
std::unique_ptr<proto::KeyInfoInsert>& protoData) {
825828
// Aside from hops, the proto buffer should be the same.
826829
proto::KeyInfo protoKeyInfo = protoData->keyinfo();
827-
auto key = protoKeyInfo.key();
830+
CompositeKey key(protoKeyInfo.keyint(), protoKeyInfo.keystr());
828831
// The proto buffer should be the same, just need a new message.
829832
int hops = protoData->hops() + 1;
830833
if (hops > 4) { // TODO replace magic number with variable set via config file.
@@ -870,7 +873,7 @@ void CentralWorker::_workerKeyInfoReq(LoaderMsg const& inMsg, std::unique_ptr<pr
870873
NetworkAddress nAddr(protoAddr.ip(), protoAddr.udpport());
871874

872875
proto::KeyInfo protoKeyInfo = protoData->keyinfo();
873-
std::string key = protoKeyInfo.key();
876+
CompositeKey key(protoKeyInfo.keyint(), protoKeyInfo.keystr());
874877

875878
/// see if the key is in our map
876879
std::unique_lock<std::mutex> lck(_idMapMtx);
@@ -885,7 +888,8 @@ void CentralWorker::_workerKeyInfoReq(LoaderMsg const& inMsg, std::unique_ptr<pr
885888
BufferUdp msgData;
886889
msg.appendToData(msgData);
887890
proto::KeyInfo protoReply;
888-
protoReply.set_key(key);
891+
protoReply.set_keyint(key.kInt);
892+
protoReply.set_keystr(key.kStr);
889893
if (iter == _keyValueMap.end()) {
890894
// key not found message.
891895
protoReply.set_chunk(0);
@@ -1005,12 +1009,9 @@ std::unique_ptr<proto::WorkerKeysInfo> CentralWorker::_workerKeysInfoBuilder() {
10051009
protoWKI->set_wid(_ourId);
10061010
protoWKI->set_mapsize(mapSize);
10071011
protoWKI->set_recentadds(recentAdds);
1008-
// TODO make a function to load WorkerRangeString, happens a bit.
1009-
proto::WorkerRangeString *protoRange = protoWKI->mutable_range();
1010-
protoRange->set_valid(range.getValid());
1011-
protoRange->set_min(range.getMin());
1012-
protoRange->set_max(range.getMax());
1013-
protoRange->set_maxunlimited(range.getUnlimited());
1012+
// TODO Maybe make a function to load WorkerRangeString, happens a bit. &&&
1013+
proto::WorkerRange *protoRange = protoWKI->mutable_range();
1014+
range.loadProtoRange(*protoRange);
10141015
proto::Neighbor *protoLeft = protoWKI->mutable_left();
10151016
protoLeft->set_wid(_neighborLeft.getId());
10161017
proto::Neighbor *protoRight = protoWKI->mutable_right();

core/modules/loader/CentralWorker.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class CentralWorkerDoListItem;
5353

5454
class CentralWorker : public Central {
5555
public:
56-
typedef std::pair<std::string, ChunkSubchunk> StringKeyPair;
56+
typedef std::pair<CompositeKey, ChunkSubchunk> StringKeyPair; // &&& rename CompositeKeyPair
5757

5858
enum SocketStatus {
5959
VOID0 = 0,
@@ -237,7 +237,7 @@ class CentralWorker : public Central {
237237

238238
StringRange _strRange; ///< range for this worker TODO _range both int and string;
239239
std::atomic<bool> _rangeChanged{false};
240-
std::map<std::string, ChunkSubchunk> _keyValueMap;
240+
std::map<CompositeKey, ChunkSubchunk> _keyValueMap;
241241
std::deque<std::chrono::system_clock::time_point> _recentAdds; ///< track how many keys added recently.
242242
std::chrono::milliseconds _recentAddLimit; ///< After this period of time, additions are no longer recent.
243243
std::mutex _idMapMtx; ///< protects _strRange, _keyValueMap,

core/modules/loader/ClientConfig.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ namespace loader {
3636
///
3737
class ClientConfig : public ConfigBase {
3838
public:
39-
ClientConfig(std::string const& configFileName)
39+
explicit ClientConfig(std::string const& configFileName)
4040
: ClientConfig(util::ConfigStore(configFileName)) {}
4141

4242
ClientConfig() = delete;

0 commit comments

Comments
 (0)