Skip to content

Commit 844568b

Browse files
jgates108fritzm
authored andcommitted
Merge pull request #964 from lsst/tickets/DM-51870
tickets/DM-51870
2 parents fcd3698 + 2936e96 commit 844568b

31 files changed

+812
-855
lines changed

src/czar/ActiveWorker.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double ti
139139
{
140140
// Go through the _qIdDoneKeepFiles, _qIdDoneDeleteFiles, and _qIdDeadUberJobs lists to build a
141141
// message to send to the worker.
142-
jsWorkerReqPtr = _wqsData->serializeJson(maxLifetime);
142+
jsWorkerReqPtr = _wqsData->toJson(maxLifetime);
143143
}
144144

145145
// Always send the message as it's a way to inform the worker that this

src/protojson/ScanTableInfo.cc

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ void ScanInfo::sortTablesSlowestFirst() {
9898
std::sort(infoTables.begin(), infoTables.end(), func);
9999
}
100100

101-
nlohmann::json ScanInfo::serializeJson() const {
101+
nlohmann::json ScanInfo::toJson() const {
102102
auto jsScanInfo = json({{"infoscanrating", scanRating}, {"infotables", json::array()}});
103103

104104
auto& jsInfoTables = jsScanInfo["infotables"];
@@ -127,6 +127,7 @@ ScanInfo::Ptr ScanInfo::createFromJson(nlohmann::json const& siJson) {
127127
auto lockInMem = http::RequestBodyJSON::required<bool>(jsElem, "silockinmem");
128128
iTbls.emplace_back(db, table, lockInMem, sRating);
129129
}
130+
siPtr->sortTablesSlowestFirst();
130131

131132
return siPtr;
132133
}
@@ -137,8 +138,13 @@ std::ostream& operator<<(std::ostream& os, ScanTableInfo const& tbl) {
137138
return os;
138139
}
139140

141+
std::ostream& ScanInfo::dump(std::ostream& os) const {
142+
os << "ScanInfo{speed=" << scanRating << " tables: " << util::printable(infoTables) << "}";
143+
return os;
144+
}
145+
140146
std::ostream& operator<<(std::ostream& os, ScanInfo const& info) {
141-
os << "ScanInfo{speed=" << info.scanRating << " tables: " << util::printable(info.infoTables) << "}";
147+
info.dump(os);
142148
return os;
143149
}
144150

src/protojson/ScanTableInfo.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,15 @@ class ScanInfo {
7171
static Ptr createFromJson(nlohmann::json const& ujJson);
7272

7373
/// Return a json version of the contents of this class.
74-
nlohmann::json serializeJson() const;
74+
nlohmann::json toJson() const;
7575

7676
void sortTablesSlowestFirst();
7777
int compareTables(ScanInfo const& rhs);
7878

7979
ScanTableInfo::ListOf infoTables;
8080
int scanRating{Rating::FASTEST};
81+
82+
std::ostream& dump(std::ostream& os) const;
8183
};
8284

8385
std::ostream& operator<<(std::ostream& os, ScanTableInfo const& tbl);

src/protojson/UberJobMsg.cc

Lines changed: 28 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -70,25 +70,25 @@ UberJobMsg::UberJobMsg(unsigned int metaVersion, std::string const& replicationI
7070
}
7171
}
7272

73-
json UberJobMsg::serializeJson() const {
73+
json UberJobMsg::toJson() const {
7474
json ujmJson = {{"version", _metaVersion},
7575
{"instance_id", _replicationInstanceId},
7676
{"auth_key", _replicationAuthKey},
7777
{"worker", _workerId},
7878
{"queryid", _qId},
7979
{"uberjobid", _ujId},
80-
{"czarinfo", _czInfo->serializeJson()},
80+
{"czarinfo", _czInfo->toJson()},
8181
{"rowlimit", _rowLimit},
82-
{"subqueries_map", _jobSubQueryTempMap->serializeJson()},
83-
{"dbtables_map", _jobDbTablesMap->serializeJson()},
82+
{"subqueries_map", _jobSubQueryTempMap->toJson()},
83+
{"dbtables_map", _jobDbTablesMap->toJson()},
8484
{"maxtablesizemb", _maxTableSizeMB},
85-
{"scaninfo", _scanInfo->serializeJson()},
85+
{"scaninfo", _scanInfo->toJson()},
8686
{"scaninteractive", _scanInteractive},
8787
{"jobs", json::array()}};
8888

8989
auto& jsJobs = ujmJson["jobs"];
9090
for (auto const& jbMsg : *_jobMsgVect) {
91-
jsJobs.emplace_back(jbMsg->serializeJson());
91+
jsJobs.emplace_back(jbMsg->toJson());
9292
}
9393

9494
LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " ujmJson=" << ujmJson);
@@ -138,7 +138,7 @@ UberJobMsg::Ptr UberJobMsg::createFromJson(nlohmann::json const& ujmJson) {
138138
ujmPtr->_jobSubQueryTempMap = JobSubQueryTempMap::createFromJson(jsSubQueriesMap);
139139

140140
auto jsDbTablesMap = http::RequestBodyJSON::required<json>(ujmJson, "dbtables_map");
141-
ujmPtr->_jobDbTablesMap = JobDbTablesMap::createFromJson(jsDbTablesMap);
141+
ujmPtr->_jobDbTablesMap = JobDbTableMap::createFromJson(jsDbTablesMap);
142142

143143
for (auto const& jsUjJob : jsUjJobs) {
144144
JobMsg::Ptr jobMsgPtr =
@@ -154,13 +154,13 @@ UberJobMsg::Ptr UberJobMsg::createFromJson(nlohmann::json const& ujmJson) {
154154

155155
JobMsg::Ptr JobMsg::create(std::shared_ptr<qdisp::JobQuery> const& jobPtr,
156156
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap,
157-
JobDbTablesMap::Ptr const& jobDbTablesMap) {
157+
JobDbTableMap::Ptr const& jobDbTablesMap) {
158158
auto jMsg = Ptr(new JobMsg(jobPtr, jobSubQueryTempMap, jobDbTablesMap));
159159
return jMsg;
160160
}
161161

162162
JobMsg::JobMsg(std::shared_ptr<qdisp::JobQuery> const& jobPtr,
163-
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTablesMap::Ptr const& jobDbTablesMap)
163+
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTableMap::Ptr const& jobDbTablesMap)
164164
: _jobSubQueryTempMap(jobSubQueryTempMap), _jobDbTablesMap(jobDbTablesMap) {
165165
auto const descr = jobPtr->getDescription();
166166
if (descr == nullptr) {
@@ -172,18 +172,11 @@ JobMsg::JobMsg(std::shared_ptr<qdisp::JobQuery> const& jobPtr,
172172
_chunkQuerySpecDb = chunkQuerySpec->db;
173173
_chunkId = chunkQuerySpec->chunkId;
174174

175-
// Add scan tables (TODO:UJ Verify this is the same for all jobs.)
176-
for (auto const& sTbl : chunkQuerySpec->scanInfo->infoTables) {
177-
int index = jobDbTablesMap->findDbTable(make_pair(sTbl.db, sTbl.table));
178-
jobDbTablesMap->setScanRating(index, sTbl.scanRating, sTbl.lockInMemory);
179-
_chunkScanTableIndexes.push_back(index);
180-
}
181-
182175
// Add fragments
183176
_jobFragments = JobFragment::createVect(*chunkQuerySpec, jobSubQueryTempMap, jobDbTablesMap);
184177
}
185178

186-
nlohmann::json JobMsg::serializeJson() const {
179+
nlohmann::json JobMsg::toJson() const {
187180
auto jsJobMsg = nlohmann::json({{"jobId", _jobId},
188181
{"attemptCount", _attemptCount},
189182
{"querySpecDb", _chunkQuerySpecDb},
@@ -200,13 +193,13 @@ nlohmann::json JobMsg::serializeJson() const {
200193

201194
auto& jsqFrags = jsJobMsg["queryFragments"];
202195
for (auto& jFrag : *_jobFragments) {
203-
jsqFrags.emplace_back(jFrag->serializeJson());
196+
jsqFrags.emplace_back(jFrag->toJson());
204197
}
205198

206199
return jsJobMsg;
207200
}
208201

209-
JobMsg::JobMsg(JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTablesMap::Ptr const& jobDbTablesMap,
202+
JobMsg::JobMsg(JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTableMap::Ptr const& jobDbTablesMap,
210203
JobId jobId, int attemptCount, std::string const& chunkQuerySpecDb, int chunkId)
211204
: _jobId(jobId),
212205
_attemptCount(attemptCount),
@@ -217,7 +210,7 @@ JobMsg::JobMsg(JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTablesMap
217210

218211
JobMsg::Ptr JobMsg::createFromJson(nlohmann::json const& ujJson,
219212
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap,
220-
JobDbTablesMap::Ptr const& jobDbTablesMap) {
213+
JobDbTableMap::Ptr const& jobDbTablesMap) {
221214
JobId jobId = http::RequestBodyJSON::required<JobId>(ujJson, "jobId");
222215
int attemptCount = http::RequestBodyJSON::required<int>(ujJson, "attemptCount");
223216
string chunkQuerySpecDb = http::RequestBodyJSON::required<string>(ujJson, "querySpecDb");
@@ -231,11 +224,10 @@ JobMsg::Ptr JobMsg::createFromJson(nlohmann::json const& ujJson,
231224
jMsgPtr->_chunkScanTableIndexes = jsChunkTblIndexes.get<std::vector<int>>();
232225
jMsgPtr->_jobFragments =
233226
JobFragment::createVectFromJson(jsQFrags, jMsgPtr->_jobSubQueryTempMap, jMsgPtr->_jobDbTablesMap);
234-
235227
return jMsgPtr;
236228
}
237229

238-
json JobSubQueryTempMap::serializeJson() const {
230+
json JobSubQueryTempMap::toJson() const {
239231
// std::map<int, std::string> _qTemplateMap;
240232
json jsSubQueryTemplateMap = {{"subquerytemplate_map", json::array()}};
241233
auto& jsSqtMap = jsSubQueryTemplateMap["subquerytemplate_map"];
@@ -280,7 +272,7 @@ int JobSubQueryTempMap::findSubQueryTemp(string const& qTemp) {
280272
return index;
281273
}
282274

283-
int JobDbTablesMap::findDbTable(pair<string, string> const& dbTablePair) {
275+
int JobDbTableMap::findDbTable(pair<string, string> const& dbTablePair) {
284276
// The expected number of templates is expected to be small, less than 4,
285277
// so this shouldn't be horribly expensive.
286278
for (auto const& [key, dbTbl] : _dbTableMap) {
@@ -295,89 +287,44 @@ int JobDbTablesMap::findDbTable(pair<string, string> const& dbTablePair) {
295287
return index;
296288
}
297289

298-
json JobDbTablesMap::serializeJson() const {
299-
json jsDbTablesMap = {{"dbtable_map", json::array()}, {"scanrating_map", json::array()}};
300-
301-
auto& jsDbTblMap = jsDbTablesMap["dbtable_map"];
290+
json JobDbTableMap::toJson() const {
291+
auto jsDbTblMap = json::array();
302292
for (auto const& [key, valPair] : _dbTableMap) {
303293
json jsDbTbl = {{"index", key}, {"db", valPair.first}, {"table", valPair.second}};
304294
jsDbTblMap.push_back(jsDbTbl);
305295
}
306296

307-
auto& jsScanRatingMap = jsDbTablesMap["scanrating_map"];
308-
for (auto const& [key, valPair] : _scanRatingMap) {
309-
json jsScanR = {{"index", key}, {"scanrating", valPair.first}, {"lockinmem", valPair.second}};
310-
jsScanRatingMap.push_back(jsScanR);
311-
}
312-
313-
LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " " << jsDbTablesMap);
314-
return jsDbTablesMap;
297+
LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " " << jsDbTblMap);
298+
return jsDbTblMap;
315299
}
316300

317-
JobDbTablesMap::Ptr JobDbTablesMap::createFromJson(nlohmann::json const& ujJson) {
301+
JobDbTableMap::Ptr JobDbTableMap::createFromJson(nlohmann::json const& ujJson) {
318302
Ptr dbTablesMapPtr = create();
319303
auto& dbTblMap = dbTablesMapPtr->_dbTableMap;
320-
auto& scanRMap = dbTablesMapPtr->_scanRatingMap;
321304

322-
LOGS(_log, LOG_LVL_TRACE, "JobDbTablesMap::createFromJson " << ujJson);
305+
LOGS(_log, LOG_LVL_TRACE, "JobDbTableMap::createFromJson " << ujJson);
323306

324-
json const& jsDbTbl = ujJson["dbtable_map"];
325-
for (auto const& jsElem : jsDbTbl) {
307+
for (auto const& jsElem : ujJson) {
326308
int index = http::RequestBodyJSON::required<int>(jsElem, "index");
327309
string db = http::RequestBodyJSON::required<string>(jsElem, "db");
328310
string tbl = http::RequestBodyJSON::required<string>(jsElem, "table");
329311
auto res = dbTblMap.insert(make_pair(index, make_pair(db, tbl)));
330312
if (!res.second) {
331313
throw invalid_argument(dbTablesMapPtr->cName(__func__) + " index=" + to_string(index) + "=" + db +
332-
+"." + tbl + " index already found in " + to_string(jsDbTbl));
333-
}
334-
}
335-
336-
json const& jsScanR = ujJson["scanrating_map"];
337-
for (auto const& jsElem : jsScanR) {
338-
int index = http::RequestBodyJSON::required<int>(jsElem, "index");
339-
int scanR = http::RequestBodyJSON::required<int>(jsElem, "scanrating");
340-
bool lockInMem = http::RequestBodyJSON::required<bool>(jsElem, "lockinmem");
341-
auto res = scanRMap.insert(make_pair(index, make_pair(scanR, lockInMem)));
342-
if (!res.second) {
343-
throw invalid_argument(dbTablesMapPtr->cName(__func__) + " index=" + to_string(index) + "=" +
344-
to_string(scanR) + +", " + to_string(lockInMem) +
345-
" index already found in " + to_string(jsDbTbl));
314+
+"." + tbl + " index already found in " + to_string(ujJson));
346315
}
347316
}
348317

349318
return dbTablesMapPtr;
350319
}
351320

352-
void JobDbTablesMap::setScanRating(int index, int scanRating, bool lockInMemory) {
353-
auto iter = _scanRatingMap.find(index);
354-
if (iter == _scanRatingMap.end()) {
355-
_scanRatingMap[index] = make_pair(scanRating, lockInMemory);
356-
} else {
357-
auto& elem = *iter;
358-
auto& pr = elem.second;
359-
auto& [sRating, lInMem] = pr;
360-
if (sRating != scanRating || lInMem != lockInMemory) {
361-
auto [dbName, tblName] = getDbTable(index);
362-
LOGS(_log, LOG_LVL_ERROR,
363-
cName(__func__) << " unexpected change in scanRating for " << dbName << "." << tblName
364-
<< " from " << sRating << " to " << scanRating << " lockInMemory from "
365-
<< lInMem << " to " << lockInMemory);
366-
if (scanRating > sRating) {
367-
sRating = scanRating;
368-
lInMem = lockInMemory;
369-
}
370-
}
371-
}
372-
}
373-
374321
JobFragment::JobFragment(JobSubQueryTempMap::Ptr const& jobSubQueryTempMap,
375-
JobDbTablesMap::Ptr const& jobDbTablesMap)
322+
JobDbTableMap::Ptr const& jobDbTablesMap)
376323
: _jobSubQueryTempMap(jobSubQueryTempMap), _jobDbTablesMap(jobDbTablesMap) {}
377324

378325
JobFragment::VectPtr JobFragment::createVect(qproc::ChunkQuerySpec const& chunkQuerySpec,
379326
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap,
380-
JobDbTablesMap::Ptr const& jobDbTablesMap) {
327+
JobDbTableMap::Ptr const& jobDbTablesMap) {
381328
VectPtr jFragments{new Vect()};
382329
if (chunkQuerySpec.nextFragment.get()) {
383330
qproc::ChunkQuerySpec const* sPtr = &chunkQuerySpec;
@@ -401,7 +348,7 @@ JobFragment::VectPtr JobFragment::createVect(qproc::ChunkQuerySpec const& chunkQ
401348
void JobFragment::_addFragment(std::vector<Ptr>& jFragments, DbTableSet const& subChunkTables,
402349
std::vector<int> const& subchunkIds, std::vector<std::string> const& queries,
403350
JobSubQueryTempMap::Ptr const& subQueryTemplates,
404-
JobDbTablesMap::Ptr const& dbTablesMap) {
351+
JobDbTableMap::Ptr const& dbTablesMap) {
405352
LOGS(_log, LOG_LVL_TRACE, "JobFragment::_addFragment start");
406353
Ptr jFrag = Ptr(new JobFragment(subQueryTemplates, dbTablesMap));
407354

@@ -448,7 +395,7 @@ string JobFragment::dump() const {
448395
return os.str();
449396
}
450397

451-
nlohmann::json JobFragment::serializeJson() const {
398+
nlohmann::json JobFragment::toJson() const {
452399
json jsFragment = {{"subquerytemplate_indexes", _jobSubQueryTempIndexes},
453400
{"dbtables_indexes", _jobDbTablesIndexes},
454401
{"subchunkids", _subchunkIds}};
@@ -459,7 +406,7 @@ nlohmann::json JobFragment::serializeJson() const {
459406

460407
JobFragment::VectPtr JobFragment::createVectFromJson(nlohmann::json const& jsFrags,
461408
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap,
462-
JobDbTablesMap::Ptr const& dbTablesMap) {
409+
JobDbTableMap::Ptr const& dbTablesMap) {
463410
LOGS(_log, LOG_LVL_TRACE, "JobFragment::createVectFromJson " << jsFrags);
464411

465412
JobFragment::VectPtr jobFragments{new JobFragment::Vect()};

0 commit comments

Comments
 (0)