Commit da02c98

TEMPORARY: commented out log statements in Czar query dispatching and result processing
1 parent ad33f42 commit da02c98

27 files changed: +308 -297 lines changed

src/ccontrol/MergingHandler.cc (+20 -18)

@@ -118,7 +118,7 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl,
                                    function<bool(char const*, uint32_t, bool&)> const& messageIsReady) {
     string const context = "MergingHandler::" + string(__func__) + " ";

-    LOGS(_log, LOG_LVL_DEBUG, context << "xrootUrl=" << xrootUrl);
+    // LOGS(_log, LOG_LVL_DEBUG, context << "xrootUrl=" << xrootUrl);

     // Track the file while the control flow is staying within the function.
     ResultFileTracker const resultFileTracker;
@@ -220,17 +220,17 @@ bool readXrootFileResourceAndMerge(string const& xrootUrl,
     }
     status = file.Close();
     if (!status.IsOK()) {
-        LOGS(_log, LOG_LVL_WARN,
-             context << "failed to close " << xrootUrl << ", " << xrootdStatus2str(status));
+        // LOGS(_log, LOG_LVL_WARN,
+        //      context << "failed to close " << xrootUrl << ", " << xrootdStatus2str(status));
     }

     // Remove the file from the worker if it still exists. Report and ignore errors.
     // The files will be garbage-collected by workers.
     XrdCl::FileSystem fileSystem(xrootUrl);
     status = fileSystem.Rm(xrootUrl2path(xrootUrl));
     if (!status.IsOK()) {
-        LOGS(_log, LOG_LVL_WARN,
-             context << "failed to remove " << xrootUrl << ", " << xrootdStatus2str(status));
+        // LOGS(_log, LOG_LVL_WARN,
+        //      context << "failed to remove " << xrootUrl << ", " << xrootdStatus2str(status));
     }
     return success;
 }
@@ -240,7 +240,7 @@ bool readHttpFileAndMerge(string const& httpUrl,
                           shared_ptr<http::ClientConnPool> const& httpConnPool) {
     string const context = "MergingHandler::" + string(__func__) + " ";

-    LOGS(_log, LOG_LVL_DEBUG, context << "httpUrl=" << httpUrl);
+    // LOGS(_log, LOG_LVL_DEBUG, context << "httpUrl=" << httpUrl);

     // Track the file while the control flow is staying within the function.
     ResultFileTracker const resultFileTracker;
@@ -370,7 +370,7 @@ bool readHttpFileAndMerge(string const& httpUrl,
         http::Client remover(http::Method::DELETE, httpUrl);
         remover.read([](char const* inBuf, size_t inBufSize) {});
     } catch (exception const& ex) {
-        LOGS(_log, LOG_LVL_WARN, context << "failed to remove " << httpUrl << ", ex: " << ex.what());
+        // LOGS(_log, LOG_LVL_WARN, context << "failed to remove " << httpUrl << ", ex: " << ex.what());
     }
     return success;
 }
@@ -396,7 +396,9 @@ MergingHandler::MergingHandler(std::shared_ptr<rproc::InfileMerger> merger, std:
     _initState();
 }

-MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_DEBUG, __func__); }
+MergingHandler::~MergingHandler() {
+    // LOGS(_log, LOG_LVL_DEBUG, __func__);
+}

 bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) {
     _wName = responseSummary.wname();
@@ -405,15 +407,15 @@ bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, uint32
     // of the operation to prevent inconsistency witin the application.
     auto const jobQuery = getJobQuery().lock();
     if (jobQuery == nullptr) {
-        LOGS(_log, LOG_LVL_ERROR, __func__ << " failed, jobQuery was NULL");
+        // LOGS(_log, LOG_LVL_ERROR, __func__ << " failed, jobQuery was NULL");
         return false;
     }
-    LOGS(_log, LOG_LVL_TRACE,
-         "MergingHandler::" << __func__ << " jobid=" << responseSummary.jobid()
-                            << " transmitsize=" << responseSummary.transmitsize()
-                            << " rowcount=" << responseSummary.rowcount() << " rowSize="
-                            << " attemptcount=" << responseSummary.attemptcount() << " errorcode="
-                            << responseSummary.errorcode() << " errormsg=" << responseSummary.errormsg());
+    // LOGS(_log, LOG_LVL_TRACE,
+    //      "MergingHandler::" << __func__ << " jobid=" << responseSummary.jobid()
+    //                         << " transmitsize=" << responseSummary.transmitsize()
+    //                         << " rowcount=" << responseSummary.rowcount() << " rowSize="
+    //                         << " attemptcount=" << responseSummary.attemptcount() << " errorcode="
+    //                         << responseSummary.errorcode() << " errormsg=" << responseSummary.errormsg());

     if (responseSummary.errorcode() != 0 || !responseSummary.errormsg().empty()) {
         _error = util::Error(responseSummary.errorcode(), responseSummary.errormsg(),
@@ -463,7 +465,7 @@ void MergingHandler::errorFlush(std::string const& msg, int code) {
     _setError(code, msg);
     // Might want more info from result service.
     // Do something about the error. FIXME.
-    LOGS(_log, LOG_LVL_ERROR, "Error receiving result.");
+    // LOGS(_log, LOG_LVL_ERROR, "Error receiving result.");
 }

 bool MergingHandler::finished() const { return _flushed; }
@@ -501,15 +503,15 @@ bool MergingHandler::_merge(proto::ResponseSummary const& responseSummary,
     }
     bool success = _infileMerger->merge(responseSummary, responseData, jobQuery);
     if (!success) {
-        LOGS(_log, LOG_LVL_WARN, __func__ << " failed");
+        // LOGS(_log, LOG_LVL_WARN, __func__ << " failed");
         util::Error const& err = _infileMerger->getError();
         _setError(ccontrol::MSG_RESULT_ERROR, err.getMsg());
     }
     return success;
 }

 void MergingHandler::_setError(int code, std::string const& msg) {
-    LOGS(_log, LOG_LVL_DEBUG, "_setErr: code: " << code << ", message: " << msg);
+    // LOGS(_log, LOG_LVL_DEBUG, "_setErr: code: " << code << ", message: " << msg);
     std::lock_guard<std::mutex> lock(_errorMutex);
     _error = Error(code, msg);
 }
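Every hunk above silences a LOGS() call by commenting it out at the call site, so restoring the output later means editing each line again. A less invasive alternative, shown only as a minimal sketch (the VERBOSE_RESULT_LOGGING flag and logDebug() helper are hypothetical stand-ins, not Qserv or lsst::log APIs), is to route the optional messages through a single compile-time switch:

// Sketch only: names below are illustrative, not part of Qserv; the real code
// would keep using lsst::log's LOGS macro inside the guard.
#include <iostream>
#include <string>

namespace {

// One switch restores or silences all of the optional debug output at once,
// instead of commenting each call site in and out by hand.
constexpr bool VERBOSE_RESULT_LOGGING = false;

// Thin wrapper standing in for LOGS(_log, LOG_LVL_DEBUG, ...).
void logDebug(std::string const& msg) {
    if (VERBOSE_RESULT_LOGGING) {
        std::cerr << msg << '\n';
    }
}

}  // namespace

int main() {
    std::string const xrootUrl = "xroot://worker:1094/result-file";  // example value only
    // Equivalent in spirit to the statement commented out in
    // readXrootFileResourceAndMerge() above.
    logDebug("MergingHandler::readXrootFileResourceAndMerge xrootUrl=" + xrootUrl);
    return 0;
}

Flipping that one constant (or replacing it with a configuration lookup) would undo the temporary change in one place rather than across all 27 touched files.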

src/ccontrol/UserQuerySelect.cc (+31 -30)

@@ -157,7 +157,7 @@ std::string UserQuerySelect::getError() const {

 /// Attempt to kill in progress.
 void UserQuerySelect::kill() {
-    LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect kill");
+    // LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect kill");
     std::lock_guard<std::mutex> lock(_killMutex);
     if (!_killed) {
         _killed = true;
@@ -228,7 +228,7 @@ std::string UserQuerySelect::getResultQuery() const {
     if (not orderBy.empty()) {
         resultQuery += " " + orderBy;
     }
-    LOGS(_log, LOG_LVL_DEBUG, "made result query:" << resultQuery);
+    // LOGS(_log, LOG_LVL_DEBUG, "made result query:" << resultQuery);
     return resultQuery;
 }

@@ -238,7 +238,7 @@ void UserQuerySelect::submit() {

     // Using the QuerySession, generate query specs (text, db, chunkId) and then
     // create query messages and send them to the async query manager.
-    LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect beginning submission");
+    // LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect beginning submission");
     assert(_infileMerger);

     auto taskMsgFactory = std::make_shared<qproc::TaskMsgFactory>();
@@ -249,9 +249,9 @@ void UserQuerySelect::submit() {

     auto queryTemplates = _qSession->makeQueryTemplates();

-    LOGS(_log, LOG_LVL_DEBUG,
-         "first query template:" << (queryTemplates.size() > 0 ? queryTemplates[0].sqlFragment()
-                                                               : "none produced."));
+    // LOGS(_log, LOG_LVL_DEBUG,
+    //      "first query template:" << (queryTemplates.size() > 0 ? queryTemplates[0].sqlFragment()
+    //                                                            : "none produced."));

     // Writing query for each chunk, stop if query is cancelled.
     // attempt to change priority, requires root
@@ -266,7 +266,7 @@ void UserQuerySelect::submit() {
     try {
         _queryStatsData->queryStatsTmpRegister(_qMetaQueryId, _qSession->getChunksSize());
     } catch (qmeta::SqlError const& e) {
-        LOGS(_log, LOG_LVL_WARN, "Failed queryStatsTmpRegister " << e.what());
+        // LOGS(_log, LOG_LVL_WARN, "Failed queryStatsTmpRegister " << e.what());
     }

     _executive->setScanInteractive(_qSession->getScanInteractive());
@@ -278,7 +278,7 @@ void UserQuerySelect::submit() {
         std::function<void(util::CmdData*)> funcBuildJob = [this, sequence, // sequence must be a copy
                                                             &chunkSpec, &queryTemplates, &chunks, &chunksMtx,
                                                             &ttn, &taskMsgFactory](util::CmdData*) {
-            QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
+            // QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);

             qproc::ChunkQuerySpec::Ptr cs;
             {
@@ -308,7 +308,7 @@ void UserQuerySelect::submit() {
         threadPriority.restoreOriginalValues();
     }

-    LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence);
+    // LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence);
     _executive->waitForAllJobsToStart();

     // we only care about per-chunk info for ASYNC queries
@@ -327,7 +327,7 @@ QueryState UserQuerySelect::join() {
     int64_t finalRows = 0;
     if (!_infileMerger->finalize(collectedBytes, finalRows)) {
         successful = false;
-        LOGS(_log, LOG_LVL_ERROR, "InfileMerger::finalize failed");
+        // LOGS(_log, LOG_LVL_ERROR, "InfileMerger::finalize failed");
         // Error: 1105 SQLSTATE: HY000 (ER_UNKNOWN_ERROR) Message: Unknown error
         _messageStore->addMessage(-1, "MERGE", 1105, "Failure while merging result",
                                   MessageSeverity::MSG_ERROR);
@@ -355,15 +355,15 @@ QueryState UserQuerySelect::join() {
     QueryState state = SUCCESS;
     if (successful) {
         _qMetaUpdateStatus(qmeta::QInfo::COMPLETED, collectedRows, collectedBytes, finalRows);
-        LOGS(_log, LOG_LVL_INFO, "Joined everything (success)");
+        // LOGS(_log, LOG_LVL_INFO, "Joined everything (success)");
     } else if (_killed) {
         // status is already set to ABORTED
-        LOGS(_log, LOG_LVL_ERROR, "Joined everything (killed)");
+        // LOGS(_log, LOG_LVL_ERROR, "Joined everything (killed)");
         operation = proto::QueryManagement::CANCEL;
         state = ERROR;
     } else {
         _qMetaUpdateStatus(qmeta::QInfo::FAILED, collectedRows, collectedBytes, finalRows);
-        LOGS(_log, LOG_LVL_ERROR, "Joined everything (failure!)");
+        // LOGS(_log, LOG_LVL_ERROR, "Joined everything (failure!)");
         operation = proto::QueryManagement::CANCEL;
         state = ERROR;
     }
@@ -373,7 +373,7 @@ QueryState UserQuerySelect::join() {
             xrdreq::QueryManagementAction::notifyAllWorkers(czarConfig->getXrootdFrontendUrl(), operation,
                                                             _qMetaCzarId, _qMetaQueryId);
         } catch (std::exception const& ex) {
-            LOGS(_log, LOG_LVL_WARN, ex.what());
+            // LOGS(_log, LOG_LVL_WARN, ex.what());
         }
     }
     return state;
@@ -409,18 +409,19 @@ void UserQuerySelect::discard() {
         // Silence merger discarding errors, because this object is being released.
         // client no longer cares about merger errors.
     }
-    LOGS(_log, LOG_LVL_INFO, "Discarded UserQuerySelect");
+    // LOGS(_log, LOG_LVL_INFO, "Discarded UserQuerySelect");
 }

 /// Setup merger (for results handling and aggregation)
 void UserQuerySelect::setupMerger() {
-    LOGS(_log, LOG_LVL_TRACE, "Setup merger");
+    // LOGS(_log, LOG_LVL_TRACE, "Setup merger");
     _infileMergerConfig->targetTable = _resultTable;
     _infileMergerConfig->mergeStmt = _qSession->getMergeStmt();
-    LOGS(_log, LOG_LVL_DEBUG,
-         "setting mergeStmt:" << (_infileMergerConfig->mergeStmt != nullptr
-                                          ? _infileMergerConfig->mergeStmt->getQueryTemplate().sqlFragment()
-                                          : "nullptr"));
+    // LOGS(_log, LOG_LVL_DEBUG,
+    //      "setting mergeStmt:" << (_infileMergerConfig->mergeStmt != nullptr
+    //                                       ?
+    //                                       _infileMergerConfig->mergeStmt->getQueryTemplate().sqlFragment()
+    //                                       : "nullptr"));
     _infileMerger =
             std::make_shared<rproc::InfileMerger>(*_infileMergerConfig, _databaseModels, _semaMgrConn);

@@ -474,7 +475,7 @@ void UserQuerySelect::_expandSelectStarInMergeStatment(std::shared_ptr<query::Se
 void UserQuerySelect::saveResultQuery() { _queryMetadata->saveResultQuery(_qMetaQueryId, getResultQuery()); }

 void UserQuerySelect::_setupChunking() {
-    LOGS(_log, LOG_LVL_TRACE, "Setup chunking");
+    // LOGS(_log, LOG_LVL_TRACE, "Setup chunking");
     // Do not throw exceptions here, set _errorExtra .
     std::shared_ptr<qproc::IndexMap> im;
     std::string dominantDb = _qSession->getDominantDb();
@@ -488,11 +489,11 @@ void UserQuerySelect::_setupChunking() {
         eSet = _qSession->getEmptyChunks();
         if (!eSet) {
             eSet = std::make_shared<IntSet>();
-            LOGS(_log, LOG_LVL_WARN, "Missing empty chunks info for " << dominantDb);
+            // LOGS(_log, LOG_LVL_WARN, "Missing empty chunks info for " << dominantDb);
         }
     }
     // FIXME add operator<< for QuerySession
-    LOGS(_log, LOG_LVL_TRACE, "_qSession: " << _qSession);
+    // LOGS(_log, LOG_LVL_TRACE, "_qSession: " << _qSession);
     if (_qSession->hasChunks()) {
         auto areaRestrictors = _qSession->getAreaRestrictors();
         auto secIdxRestrictors = _qSession->getSecIdxRestrictors();
@@ -506,15 +507,15 @@ void UserQuerySelect::_setupChunking() {
            csv = im->getAllChunks();
        }

-        LOGS(_log, LOG_LVL_TRACE, "Chunk specs: " << util::printable(csv));
+        // LOGS(_log, LOG_LVL_TRACE, "Chunk specs: " << util::printable(csv));
        // Filter out empty chunks
        for (qproc::ChunkSpecVector::const_iterator i = csv.begin(), e = csv.end(); i != e; ++i) {
            if (eSet->count(i->chunkId) == 0) { // chunk not in empty?
                _qSession->addChunk(*i);
            }
        }
    } else {
-        LOGS(_log, LOG_LVL_TRACE, "No chunks added, QuerySession will add dummy chunk");
+        // LOGS(_log, LOG_LVL_TRACE, "No chunks added, QuerySession will add dummy chunk");
    }
    _qSession->setScanInteractive();
 }
@@ -578,8 +579,8 @@ void UserQuerySelect::qMetaRegister(std::string const& resultLocation, std::stri
     _qMetaQueryId = _queryMetadata->registerQuery(qInfo, tableNames);
     _queryIdStr = QueryIdHelper::makeIdStr(_qMetaQueryId);
     // Add logging context with query ID
-    QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
-    LOGS(_log, LOG_LVL_DEBUG, "UserQuery registered " << _qSession->getOriginal());
+    // QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
+    // LOGS(_log, LOG_LVL_DEBUG, "UserQuery registered " << _qSession->getOriginal());

     // update #QID# with actual query ID
     boost::replace_all(_resultLoc, "#QID#", std::to_string(_qMetaQueryId));
@@ -597,7 +598,7 @@
     if (_executive != nullptr) {
         _executive->setQueryId(_qMetaQueryId);
     } else {
-        LOGS(_log, LOG_LVL_WARN, "No Executive, assuming invalid query");
+        // LOGS(_log, LOG_LVL_WARN, "No Executive, assuming invalid query");
     }

     // Note that ordering is important here, this check must happen after
@@ -626,7 +627,7 @@ void UserQuerySelect::_qMetaUpdateStatus(qmeta::QInfo::QStatus qStatus, size_t r
     try {
         _queryStatsData->queryStatsTmpRemove(_qMetaQueryId);
     } catch (qmeta::SqlError const&) {
-        LOGS(_log, LOG_LVL_WARN, "queryStatsTmp remove failed " << _queryIdStr);
+        // LOGS(_log, LOG_LVL_WARN, "queryStatsTmp remove failed " << _queryIdStr);
     }
 }

@@ -637,7 +638,7 @@ void UserQuerySelect::_qMetaUpdateMessages() {
     try {
         _queryMetadata->addQueryMessages(_qMetaQueryId, msgStore);
     } catch (qmeta::SqlError const& ex) {
-        LOGS(_log, LOG_LVL_WARN, "UserQuerySelect::_qMetaUpdateMessages failed " << ex.what());
+        // LOGS(_log, LOG_LVL_WARN, "UserQuerySelect::_qMetaUpdateMessages failed " << ex.what());
     }
 }
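Besides the LOGS() calls, two hunks above also comment out QSERV_LOGCONTEXT_QUERY(_qMetaQueryId), which, per the adjacent comment, adds a logging context carrying the query ID (once in the per-chunk job-building lambda and once in qMetaRegister()). As a generic illustration of that pattern only, and not Qserv's actual implementation, a scoped log context is typically an RAII guard around a thread-local value:

// Hypothetical sketch of a scoped logging context (MDC-style); the class and
// function names below are stand-ins, not Qserv or lsst::log code.
#include <iostream>
#include <string>
#include <utility>

namespace {

// Thread-local context string that a real logger would prepend to each message.
thread_local std::string g_logContext;

// RAII guard: sets the context for the current scope and restores the previous
// value when the scope ends, so nested contexts behave correctly.
class ScopedLogContext {
public:
    explicit ScopedLogContext(std::string context) : _previous(g_logContext) {
        g_logContext = std::move(context);
    }
    ~ScopedLogContext() { g_logContext = _previous; }

private:
    std::string _previous;
};

void logInfo(std::string const& msg) { std::cerr << "[" << g_logContext << "] " << msg << '\n'; }

}  // namespace

int main() {
    ScopedLogContext const queryContext("QID=12345");  // analogous to tagging by _qMetaQueryId
    logInfo("building chunk job");  // prints: [QID=12345] building chunk job
    return 0;
}

With the macro commented out, log lines emitted later on those code paths presumably no longer carry the query-ID tag, which is worth keeping in mind while the temporary change is in place.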

src/qdisp/CzarStats.cc (+7 -7)

@@ -104,21 +104,21 @@ void CzarStats::endQueryRespConcurrentProcessing(TIMEPOINT start, TIMEPOINT end)

 void CzarStats::addXRootDSSIRecvRate(double bytesPerSec) {
     _histXRootDSSIRecvRate->addEntry(bytesPerSec);
-    LOGS(_log, LOG_LVL_TRACE,
-         "CzarStats::" << __func__ << " " << bytesPerSec << " " << _histXRootDSSIRecvRate->getString(""));
+    // LOGS(_log, LOG_LVL_TRACE,
+    //      "CzarStats::" << __func__ << " " << bytesPerSec << " " << _histXRootDSSIRecvRate->getString(""));
 }

 void CzarStats::addMergeRate(double bytesPerSec) {
     _histMergeRate->addEntry(bytesPerSec);
-    LOGS(_log, LOG_LVL_TRACE,
-         "CzarStats::" << __func__ << " " << bytesPerSec << " " << _histMergeRate->getString("")
-                       << " jsonA=" << getTransmitStatsJson() << " jsonB=" << getQdispStatsJson());
+    // LOGS(_log, LOG_LVL_TRACE,
+    //      "CzarStats::" << __func__ << " " << bytesPerSec << " " << _histMergeRate->getString("")
+    //                    << " jsonA=" << getTransmitStatsJson() << " jsonB=" << getQdispStatsJson());
 }

 void CzarStats::addFileReadRate(double bytesPerSec) {
     _histFileReadRate->addEntry(bytesPerSec);
-    LOGS(_log, LOG_LVL_TRACE,
-         "CzarStats::" << __func__ << " " << bytesPerSec << " " << _histFileReadRate->getString(""));
+    // LOGS(_log, LOG_LVL_TRACE,
+    //      "CzarStats::" << __func__ << " " << bytesPerSec << " " << _histFileReadRate->getString(""));
 }

 void CzarStats::trackQueryProgress(QueryId qid) {
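The TRACE messages disabled here stream histogram summaries (getString("")) and, in addMergeRate(), two JSON dumps (getTransmitStatsJson(), getQdispStatsJson()) into the log. If the point of the temporary change is to avoid the cost of assembling those strings, an explicit is-enabled check around the expensive part keeps the statements in the code while still skipping the work. A self-contained sketch, using hypothetical Logger/Level types rather than the lsst::log API:

// Generic sketch: skip expensive message construction unless the level is
// enabled. Logger, Level, and isEnabled() are stand-ins, not the lsst::log
// API used by CzarStats.
#include <iostream>
#include <sstream>
#include <string>

namespace {

enum class Level { TRACE, DEBUG, INFO, WARN, ERROR };

struct Logger {
    Level threshold = Level::INFO;  // TRACE/DEBUG messages are dropped by default
    bool isEnabled(Level level) const { return level >= threshold; }
    void write(std::string const& msg) const { std::cerr << msg << '\n'; }
};

// Stand-in for an expensive diagnostic dump such as getQdispStatsJson().
std::string expensiveStatsJson() { return R"({"mergeRate": 1.0e6})"; }

}  // namespace

int main() {
    Logger log;
    double const bytesPerSec = 1.0e6;

    // The costly string building happens only when TRACE output is wanted.
    if (log.isEnabled(Level::TRACE)) {
        std::ostringstream msg;
        msg << "CzarStats::addMergeRate " << bytesPerSec << " jsonB=" << expensiveStatsJson();
        log.write(msg.str());
    }
    return 0;
}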
