From 92dabd20539ca0a784a2a1228cdbf53c64e3cdfe Mon Sep 17 00:00:00 2001 From: John Gates Date: Fri, 16 Dec 2022 15:52:56 -0800 Subject: [PATCH 1/4] Temporary logging changes. --- src/util/InstanceCount.h | 7 +++++++ src/wbase/SendChannelShared.cc | 13 +++++++++++-- src/wbase/SendChannelShared.h | 2 +- src/wbase/TransmitData.cc | 4 +++- src/wdb/QueryRunner.cc | 8 ++++---- 5 files changed, 26 insertions(+), 8 deletions(-) diff --git a/src/util/InstanceCount.h b/src/util/InstanceCount.h index 9923b84b5b..877daddc90 100644 --- a/src/util/InstanceCount.h +++ b/src/util/InstanceCount.h @@ -5,15 +5,22 @@ // System headers #include +#include #include #include namespace lsst::qserv::util { /// This a utility class to track the number of instances of any class where it is a member. +/// Alternatively, it can be used to track functions or code blocks by giving the +/// InstanceCount a unique identifier in the block of code to track. +/// It can also be used to see how many threads are waiting on a mutex. +/// InstanceCount is a flexible debugging tool, but it is very noisy in the log. +/// Once a problem is solved, the local instances of InstanceCount should be removed. // class InstanceCount { public: + using Ptr = std::shared_ptr; InstanceCount(std::string const& className); InstanceCount(InstanceCount const& other); InstanceCount(InstanceCount&& origin); diff --git a/src/wbase/SendChannelShared.cc b/src/wbase/SendChannelShared.cc index 44de0c4bb6..d0a6c15a09 100644 --- a/src/wbase/SendChannelShared.cc +++ b/src/wbase/SendChannelShared.cc @@ -129,7 +129,7 @@ bool SendChannelShared::_addTransmit(bool cancelled, bool erred, bool lastIn, Tr bool reallyLast = _lastRecvd; string idStr(makeIdStr(qId, jId)); if (_icPtr == nullptr) { - _icPtr = std::make_shared(std::to_string(qId) + "_SCS_LDB"); + _icPtr = std::make_shared("QI=" + std::to_string(qId) + "_SCS_LDB"); } // If something bad already happened, just give up. @@ -160,7 +160,7 @@ util::TimerHistogram scsTransmitSend("scsTransmitSend", {0.01, 0.1, 1.0, 2.0, 5. bool SendChannelShared::_transmit(bool erred) { string idStr = "QID?"; - + util::InstanceCount::Ptr icPtrA; // Result data is transmitted in messages containing data and headers. // data - is the result data // header - contains information about the next chunk of result data, @@ -210,7 +210,9 @@ bool SendChannelShared::_transmit(bool erred) { // The first message needs to put its header data in metadata as there's // no previous message it could attach its header to. { + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + to_string(reallyLast) + "_a"); //&&& lock_guard streamLock(_streamMutex); // Must keep meta and buffer together. + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + to_string(reallyLast) + "_b"); //&&& if (_firstTransmit.exchange(false)) { // Put the header for the first message in metadata // _metaDataBuf must remain valid until Finished() is called. @@ -229,8 +231,10 @@ bool SendChannelShared::_transmit(bool erred) { { util::Timer sendTimer; sendTimer.start(); + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + to_string(reallyLast) + "_c"); //&&& bool sent = _sendBuf(streamLock, streamBuf, reallyLast, "transmitLoop " + idStr + " " + seqStr, scsSeq); + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + to_string(reallyLast) + "_d"); //&&& sendTimer.stop(); auto logMsgSend = scsTransmitSend.addTime(sendTimer.getElapsed(), idStr); LOGS(_log, LOG_LVL_INFO, logMsgSend); @@ -241,6 +245,7 @@ bool SendChannelShared::_transmit(bool erred) { } } } + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + to_string(reallyLast) + "_z"); //&&& // If that was the last message, break the loop. if (reallyLast) return true; } @@ -251,7 +256,9 @@ util::TimerHistogram transmitHisto("transmit Hist", {0.1, 1, 5, 10, 20, 40}); bool SendChannelShared::_sendBuf(lock_guard const& streamLock, xrdsvc::StreamBuffer::Ptr& streamBuf, bool last, string const& note, int scsSeq) { + util::InstanceCount ica(note + "_SCSStream_LDB_a"); bool sent = _sendChannel->sendStream(streamBuf, last, scsSeq); + util::InstanceCount icb(note + "_SCSStream_LDB_b"); if (!sent) { LOGS(_log, LOG_LVL_ERROR, "Failed to transmit " << note << "!"); return false; @@ -259,7 +266,9 @@ bool SendChannelShared::_sendBuf(lock_guard const& streamLock, xrdsvc::St util::Timer t; t.start(); LOGS(_log, LOG_LVL_INFO, "_sendbuf wait start " << note); + util::InstanceCount icc(note + "_SCSStream_LDB_c"); streamBuf->waitForDoneWithThis(); // Block until this buffer has been sent. + util::InstanceCount icd(note + "_SCSStream_LDB_d"); t.stop(); auto logMsg = transmitHisto.addTime(t.getElapsed(), note); LOGS(_log, LOG_LVL_DEBUG, logMsg); diff --git a/src/wbase/SendChannelShared.h b/src/wbase/SendChannelShared.h index 9981906c7c..0b9b8a640c 100644 --- a/src/wbase/SendChannelShared.h +++ b/src/wbase/SendChannelShared.h @@ -288,7 +288,7 @@ class SendChannelShared { std::shared_ptr _transmitData; ///< TransmitData object mutable std::mutex _tMtx; ///< protects _transmitData - std::shared_ptr _icPtr; ///< temporary for LockupDB + util::InstanceCount::Ptr _icPtr; ///< temporary for LockupDB }; } // namespace wbase diff --git a/src/wbase/TransmitData.cc b/src/wbase/TransmitData.cc index 6591997d76..8f938bb471 100644 --- a/src/wbase/TransmitData.cc +++ b/src/wbase/TransmitData.cc @@ -83,8 +83,9 @@ proto::Result* TransmitData::_createResult() { void TransmitData::attachNextHeader(TransmitData::Ptr const& nextTr, bool reallyLast, uint32_t seq, int scsSeq) { - _icPtr = std::make_shared(_idStr + "_td_LDB_" + std::to_string(reallyLast)); + _icPtr = std::make_shared(_idStr + "_td_LDB_" + std::to_string(reallyLast) + "_a"); lock_guard lock(_trMtx); + _icPtr = std::make_shared(_idStr + "_td_LDB_" + std::to_string(reallyLast) + "_b"); if (_result == nullptr) { throw util::Bug(ERR_LOC, _idStr + "_transmitLoop() had nullptr result!"); } @@ -105,6 +106,7 @@ void TransmitData::attachNextHeader(TransmitData::Ptr const& nextTr, bool really } // Append the next header to this data. _dataMsg += proto::ProtoHeaderWrap::wrap(nextHeaderString); + _icPtr = std::make_shared(_idStr + "_td_LDB_" + std::to_string(reallyLast) + "_c"); } string TransmitData::makeHeaderString(bool reallyLast, uint32_t seq, int scsSeq) { diff --git a/src/wdb/QueryRunner.cc b/src/wdb/QueryRunner.cc index bff17fc96e..a7561af8a1 100644 --- a/src/wdb/QueryRunner.cc +++ b/src/wdb/QueryRunner.cc @@ -150,7 +150,7 @@ size_t QueryRunner::_getDesiredLimit() { util::TimerHistogram memWaitHisto("memWait Hist", {1, 5, 10, 20, 40}); bool QueryRunner::runQuery() { - util::InstanceCount ic(to_string(_task->getQueryId()) + "_rq_LDB"); // LockupDB + util::InstanceCount ic("QI=" + to_string(_task->getQueryId()) + "_rq_LDB"); // LockupDB QSERV_LOGCONTEXT_QUERY_JOB(_task->getQueryId(), _task->getJobId()); LOGS(_log, LOG_LVL_INFO, "QueryRunner::runQuery() tid=" << _task->getIdStr() @@ -350,7 +350,7 @@ bool QueryRunner::_dispatchChannel() { // Pass all information on to the shared object to add on to // an existing message or build a new one as needed. - util::InstanceCount ica(to_string(_task->getQueryId()) + "_rqa_LDB"); // LockupDB + util::InstanceCount ica("QI=" + to_string(_task->getQueryId()) + "_rqa_LDB"); // LockupDB if (_task->getSendChannel()->buildAndTransmitResult(res, numFields, *_task, _largeResult, _multiError, _cancelled, readRowsOk)) { erred = true; @@ -371,14 +371,14 @@ bool QueryRunner::_dispatchChannel() { erred = true; } // IMPORTANT, do not leave this function before this check has been made. - util::InstanceCount icb(to_string(_task->getQueryId()) + "_rqb_LDB"); // LockupDB + util::InstanceCount icb("QI=" + to_string(_task->getQueryId()) + "_rqb_LDB"); // LockupDB if (needToFreeRes) { needToFreeRes = false; // All rows have been read out or there was an error. In // either case resources need to be freed. _mysqlConn->freeResult(); } - util::InstanceCount icc(to_string(_task->getQueryId()) + "_rqc_LDB"); // LockupDB + util::InstanceCount icc("QI=" + to_string(_task->getQueryId()) + "_rqc_LDB"); // LockupDB if (!readRowsOk) { // This means a there was a transmit error and there's no way to // send anything to the czar. However, there were mysql results From 0b04b388bd4011407a2b336baf79c331443589e4 Mon Sep 17 00:00:00 2001 From: John Gates Date: Tue, 20 Dec 2022 11:08:02 -0800 Subject: [PATCH 2/4] Added log messages for bug hunting. --- src/ccontrol/MergingHandler.cc | 15 +++++++++++++-- src/qdisp/QueryRequest.cc | 9 +++++++++ src/rproc/InfileMerger.cc | 5 +++++ src/wbase/TransmitData.cc | 2 +- src/wpublish/AddChunkGroupCommand.cc | 4 ++-- src/wpublish/ChunkListCommand.cc | 4 ++-- src/wpublish/GetChunkListCommand.cc | 2 +- src/wpublish/GetStatusCommand.cc | 2 +- src/wpublish/RemoveChunkGroupCommand.cc | 4 ++-- src/wpublish/SetChunkListCommand.cc | 4 ++-- src/wpublish/TestEchoCommand.cc | 2 +- src/xrdsvc/StreamBuffer.cc | 8 +++++--- src/xrdsvc/StreamBuffer.h | 5 +++-- 13 files changed, 47 insertions(+), 19 deletions(-) diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index 5b3f14102b..e550c49689 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -95,9 +95,11 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar if (bLen < 0) { throw util::Bug(ERR_LOC, "MergingHandler invalid blen=" + to_string(bLen) + " from " + _wName); } + util::InstanceCount ica(_tableName + "_Merge_flush_LDB_a"); switch (_state) { - case MsgState::HEADER_WAIT: + case MsgState::HEADER_WAIT: { + util::InstanceCount icl(_tableName + "_Merge_flush_LDB_l_header_wait"); _response->headerSize = static_cast((*bufPtr)[0]); if (!proto::ProtoHeaderWrap::unwrap(_response, *bufPtr)) { std::string sErr = @@ -140,8 +142,10 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar _state = MsgState::RESULT_RECV; } } + } return true; case MsgState::RESULT_WAIT: { + util::InstanceCount icp(_tableName + "_Merge_flush_LDB_p_result_wait"); nextBufSize = proto::ProtoHeaderWrap::getProtoHeaderSize(); auto jobQuery = getJobQuery().lock(); if (!_verifyResult(bufPtr, bLen)) { @@ -160,7 +164,9 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar LOGS(_log, LOG_LVL_DEBUG, "Flushed last=" << last << " for tableName=" << _tableName); auto success = _merge(); + util::InstanceCount icpx(_tableName + "_Merge_flush_LDB_px"); _response.reset(new WorkerResponse()); + util::InstanceCount icpz(_tableName + "_Merge_flush_LDB_pz"); return success; } case MsgState::RESULT_RECV: @@ -169,6 +175,7 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar case MsgState::HEADER_ERR: [[fallthrough]]; case MsgState::RESULT_ERR: { + util::InstanceCount iczerror(_tableName + "_Merge_flush_LDB_zerror"); std::ostringstream eos; eos << "Unexpected message From:" << _wName << " flush state=" << getStateStr(_state) << " last=" << last; @@ -224,11 +231,14 @@ void MergingHandler::_initState() { } bool MergingHandler::_merge() { + util::InstanceCount ica(_tableName + "_Merge_merge_LDB_a"); if (auto job = getJobQuery().lock()) { if (_flushed) { throw util::Bug(ERR_LOC, "MergingRequester::_merge : already flushed"); } + util::InstanceCount icb(_tableName + "_Merge_merge_LDB_b"); bool success = _infileMerger->merge(_response); + util::InstanceCount icc(_tableName + "_Merge_merge_LDB_c"); if (!success) { LOGS(_log, LOG_LVL_WARN, "_merge() failed"); rproc::InfileMergerError const& err = _infileMerger->getError(); @@ -236,10 +246,11 @@ bool MergingHandler::_merge() { _state = MsgState::RESULT_ERR; } _response.reset(); - + util::InstanceCount icx(_tableName + "_Merge_merge_LDB_x"); return success; } LOGS(_log, LOG_LVL_ERROR, "MergingHandler::_merge() failed, jobQuery was NULL"); + util::InstanceCount icz(_tableName + "_Merge_merge_LDB_z"); return false; } diff --git a/src/qdisp/QueryRequest.cc b/src/qdisp/QueryRequest.cc index b8bfb6140e..56806f7d33 100644 --- a/src/qdisp/QueryRequest.cc +++ b/src/qdisp/QueryRequest.cc @@ -79,6 +79,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { void action(util::CmdData* data) override { // If everything is ok, call GetResponseData to have XrdSsi ask the worker for the data. QSERV_LOGCONTEXT_QUERY_JOB(_qid, _jobid); + util::InstanceCount ica("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_a"); util::Timer tWaiting; util::Timer tTotal; PseudoFifo::Element::Ptr pseudoFifoElem; @@ -107,14 +108,18 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { pseudoFifoElem = _pseudoFifo->queueAndWait(); tWaiting.start(); + util::InstanceCount icb("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_b"); qr->GetResponseData(&buffer[0], buffer.size()); + util::InstanceCount icc("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_c"); } // Wait for XrdSsi to call ProcessResponseData with the data, // which will notify this wait with a call to receivedProcessResponseDataParameters. { LOGS(_log, LOG_LVL_TRACE, "GetResponseData called respC=" << _respCount); + util::InstanceCount icd("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_d"); std::unique_lock uLock(_mtx); + util::InstanceCount ice("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_e"); // TODO: make timed wait, check for wedged, if weak pointers dead, log and give up. // The only purpose of the below being in a function is make this easier to find in gdb. _lockWaitQrA(uLock); @@ -137,6 +142,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { // Actually process the data. // If more data needs to be sent, _processData will make a new AskForResponseDataCmd // object and queue it. + util::InstanceCount icf("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_f"); { auto jq = _jQuery.lock(); auto qr = _qRequest.lock(); @@ -149,6 +155,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { // _processData will have created another AskForResponseDataCmd object if was needed. tTotal.stop(); } + util::InstanceCount icg("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_g"); _setState(State::DONE2); LOGS(_log, LOG_LVL_DEBUG, "Ask data is done wait=" << tWaiting.getElapsed() << " total=" << tTotal.getElapsed()); @@ -336,8 +343,10 @@ bool QueryRequest::_importStream(JobQuery::Ptr const& jq) { int nextBufSize = 0; bool last = false; int resultRows = 0; + util::InstanceCount ica(_jobIdStr + "_QReq_imStream_LDB_a"); bool flushOk = jq->getDescription()->respHandler()->flush(len, bufPtr, last, largeResult, nextBufSize, resultRows); + util::InstanceCount icb(_jobIdStr + "_QReq_imStream_LDB_b_last=" + to_string(last)); if (!flushOk) { LOGS(_log, LOG_LVL_ERROR, "_importStream not flushOk"); diff --git a/src/rproc/InfileMerger.cc b/src/rproc/InfileMerger.cc index 80046bf12e..821d935642 100644 --- a/src/rproc/InfileMerger.cc +++ b/src/rproc/InfileMerger.cc @@ -218,6 +218,8 @@ bool InfileMerger::merge(std::shared_ptr const& response) if (!_queryIdStrSet) { _setQueryIdStr(QueryIdHelper::makeIdStr(response->result.queryid())); } + + util::InstanceCount ica(_getQueryIdStr() + "_InfMerge_LDB_a"); size_t resultSize = response->result.transmitsize(); LOGS(_log, LOG_LVL_TRACE, "Executing InfileMerger::merge(" @@ -289,7 +291,9 @@ bool InfileMerger::merge(std::shared_ptr const& response) throw std::invalid_argument("InfileMerger::_dbEngine is unknown =" + engineToStr(_dbEngine)); } auto end = std::chrono::system_clock::now(); + util::InstanceCount icl(_getQueryIdStr() + "_InfMerge_LDB_l"); auto mergeDur = std::chrono::duration_cast(end - start); + util::InstanceCount icm(_getQueryIdStr() + "_InfMerge_LDB_m"); LOGS(_log, LOG_LVL_DEBUG, "mergeDur=" << mergeDur.count() << " sema(total=" << _semaMgrConn->getTotalCount() << " used=" << _semaMgrConn->getUsedCount() << ")"); @@ -299,6 +303,7 @@ bool InfileMerger::merge(std::shared_ptr const& response) _invalidJobAttemptMgr.decrConcurrentMergeCount(); LOGS(_log, LOG_LVL_DEBUG, "mergeDur=" << mergeDur.count()); + util::InstanceCount icz(_getQueryIdStr() + "_InfMerge_LDB_z"); return ret; } diff --git a/src/wbase/TransmitData.cc b/src/wbase/TransmitData.cc index 8f938bb471..030c8a3f20 100644 --- a/src/wbase/TransmitData.cc +++ b/src/wbase/TransmitData.cc @@ -145,7 +145,7 @@ string TransmitData::getHeaderString(uint32_t seq, int scsSeq) { xrdsvc::StreamBuffer::Ptr TransmitData::getStreamBuffer() { lock_guard lock(_trMtx); // createWithMove invalidates _dataMsg - return xrdsvc::StreamBuffer::createWithMove(_dataMsg); + return xrdsvc::StreamBuffer::createWithMove(_dataMsg, getIdStr()); } void TransmitData::_buildHeader(bool largeResult) { diff --git a/src/wpublish/AddChunkGroupCommand.cc b/src/wpublish/AddChunkGroupCommand.cc index 3523027ce9..637322fa5b 100644 --- a/src/wpublish/AddChunkGroupCommand.cc +++ b/src/wpublish/AddChunkGroupCommand.cc @@ -69,7 +69,7 @@ void AddChunkGroupCommand::_reportError(proto::WorkerCommandChunkGroupR::Status _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); } void AddChunkGroupCommand::run() { @@ -122,7 +122,7 @@ void AddChunkGroupCommand::run() { } _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, context << "** SENT **"); } diff --git a/src/wpublish/ChunkListCommand.cc b/src/wpublish/ChunkListCommand.cc index a9502b0d50..3a01e4ce6a 100644 --- a/src/wpublish/ChunkListCommand.cc +++ b/src/wpublish/ChunkListCommand.cc @@ -76,7 +76,7 @@ void ChunkListCommand::_reportError(string const& message) { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); } void ChunkListCommand::run() { @@ -195,7 +195,7 @@ void ChunkListCommand::run() { } _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, context << "** SENT **"); } diff --git a/src/wpublish/GetChunkListCommand.cc b/src/wpublish/GetChunkListCommand.cc index ce6e550b8e..73954cd06c 100644 --- a/src/wpublish/GetChunkListCommand.cc +++ b/src/wpublish/GetChunkListCommand.cc @@ -71,7 +71,7 @@ void GetChunkListCommand::run() { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, "GetChunkListCommand::" << __func__ << " ** SENT **"); } diff --git a/src/wpublish/GetStatusCommand.cc b/src/wpublish/GetStatusCommand.cc index 257495f9f5..279059eb1f 100644 --- a/src/wpublish/GetStatusCommand.cc +++ b/src/wpublish/GetStatusCommand.cc @@ -60,7 +60,7 @@ void GetStatusCommand::run() { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, "GetStatusCommand::" << __func__ << " ** SENT **"); } diff --git a/src/wpublish/RemoveChunkGroupCommand.cc b/src/wpublish/RemoveChunkGroupCommand.cc index 5f54bf6d24..ae99e2da1d 100644 --- a/src/wpublish/RemoveChunkGroupCommand.cc +++ b/src/wpublish/RemoveChunkGroupCommand.cc @@ -73,7 +73,7 @@ void RemoveChunkGroupCommand::_reportError(proto::WorkerCommandChunkGroupR::Stat _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - auto streamBuffer = xrdsvc::StreamBuffer::createWithMove(str); + auto streamBuffer = xrdsvc::StreamBuffer::createWithMove(str, "other"); _sendChannel->sendStream(streamBuffer, true); } @@ -149,7 +149,7 @@ void RemoveChunkGroupCommand::run() { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, context << "** SENT **"); } diff --git a/src/wpublish/SetChunkListCommand.cc b/src/wpublish/SetChunkListCommand.cc index 84df7b3f22..b2d06e9e99 100644 --- a/src/wpublish/SetChunkListCommand.cc +++ b/src/wpublish/SetChunkListCommand.cc @@ -94,7 +94,7 @@ void SetChunkListCommand::_reportError(proto::WorkerCommandSetChunkListR::Status _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - auto streamBuffer = xrdsvc::StreamBuffer::createWithMove(str); + auto streamBuffer = xrdsvc::StreamBuffer::createWithMove(str, "other"); _sendChannel->sendStream(streamBuffer, true); } @@ -218,7 +218,7 @@ void SetChunkListCommand::run() { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, context << "** SENT **"); } diff --git a/src/wpublish/TestEchoCommand.cc b/src/wpublish/TestEchoCommand.cc index c73aad641b..a96be6c34f 100644 --- a/src/wpublish/TestEchoCommand.cc +++ b/src/wpublish/TestEchoCommand.cc @@ -61,7 +61,7 @@ void TestEchoCommand::run() { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, "TestEchoCommand::" << __func__ << " ** SENT **"); } diff --git a/src/xrdsvc/StreamBuffer.cc b/src/xrdsvc/StreamBuffer.cc index d13f0ff69a..ed5a9807f4 100644 --- a/src/xrdsvc/StreamBuffer.cc +++ b/src/xrdsvc/StreamBuffer.cc @@ -65,18 +65,18 @@ double StreamBuffer::percentOfMaxTotalBytesUsed() { } // Factory function, because this should be able to delete itself when Recycle() is called. -StreamBuffer::Ptr StreamBuffer::createWithMove(std::string &input) { +StreamBuffer::Ptr StreamBuffer::createWithMove(std::string &input, string const& idStr) { unique_lock uLock(_createMtx); if (_totalBytes >= _maxTotalBytes) { LOGS(_log, LOG_LVL_WARN, "StreamBuffer at memory limit " << _totalBytes); } _createCv.wait(uLock, []() { return _totalBytes < _maxTotalBytes; }); - Ptr ptr(new StreamBuffer(input)); + Ptr ptr(new StreamBuffer(input, idStr)); ptr->_selfKeepAlive = ptr; return ptr; } -StreamBuffer::StreamBuffer(std::string &input) { +StreamBuffer::StreamBuffer(std::string &input, string const& idStr) : _idStr(idStr) { _dataStr = std::move(input); // TODO: try to make 'data' a const char* in xrootd code. // 'data' is not being changed after being passed, so hopefully not an issue. @@ -96,7 +96,9 @@ StreamBuffer::~StreamBuffer() { /// xrdssi calls this to recycle the buffer when finished. void StreamBuffer::Recycle() { { + util::InstanceCount ica(_idStr + "_streamBuf_LDB_a"); std::lock_guard lg(_mtx); + util::InstanceCount icb(_idStr + "_streamBuf_LDB_b"); _doneWithThis = true; } _cv.notify_all(); diff --git a/src/xrdsvc/StreamBuffer.h b/src/xrdsvc/StreamBuffer.h index 5ce3dd572a..410f7087c7 100644 --- a/src/xrdsvc/StreamBuffer.h +++ b/src/xrdsvc/StreamBuffer.h @@ -54,7 +54,7 @@ class StreamBuffer : public XrdSsiStream::Buffer { /// Factory function, because this should be able to delete itself when Recycle() is called. /// The constructor uses move to avoid copying the string. - static StreamBuffer::Ptr createWithMove(std::string &input); + static StreamBuffer::Ptr createWithMove(std::string &input, std::string const& idStr); /// Set the maximum number of bytes that can be used by all instances of this class. static void setMaxTotalBytes(int64_t maxBytes); @@ -81,9 +81,10 @@ class StreamBuffer : public XrdSsiStream::Buffer { private: /// This constructor will invalidate 'input'. - explicit StreamBuffer(std::string &input); + explicit StreamBuffer(std::string &input, std::string const& idStr); std::string _dataStr; + std::string const _idStr; std::mutex _mtx; std::condition_variable _cv; bool _doneWithThis = false; From 767f11f794889ee82ae9e75da4159c4c7d8f9237 Mon Sep 17 00:00:00 2001 From: John Gates Date: Tue, 10 Jan 2023 11:56:26 -0800 Subject: [PATCH 3/4] Logging changes. --- src/ccontrol/MergingHandler.cc | 16 +++++------ src/qdisp/QueryRequest.cc | 49 +++++++++++++++++++++++++--------- src/rproc/InfileMerger.cc | 3 --- src/wbase/SendChannelShared.cc | 15 +++++++---- src/xrdsvc/StreamBuffer.cc | 4 +-- src/xrdsvc/StreamBuffer.h | 4 +-- 6 files changed, 59 insertions(+), 32 deletions(-) diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index e550c49689..38e3135979 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -99,7 +99,7 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar switch (_state) { case MsgState::HEADER_WAIT: { - util::InstanceCount icl(_tableName + "_Merge_flush_LDB_l_header_wait"); + //&&&util::InstanceCount icl(_tableName + "_Merge_flush_LDB_l_header_wait"); _response->headerSize = static_cast((*bufPtr)[0]); if (!proto::ProtoHeaderWrap::unwrap(_response, *bufPtr)) { std::string sErr = @@ -145,7 +145,7 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar } return true; case MsgState::RESULT_WAIT: { - util::InstanceCount icp(_tableName + "_Merge_flush_LDB_p_result_wait"); + //&&&util::InstanceCount icp(_tableName + "_Merge_flush_LDB_p_result_wait"); nextBufSize = proto::ProtoHeaderWrap::getProtoHeaderSize(); auto jobQuery = getJobQuery().lock(); if (!_verifyResult(bufPtr, bLen)) { @@ -164,9 +164,9 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar LOGS(_log, LOG_LVL_DEBUG, "Flushed last=" << last << " for tableName=" << _tableName); auto success = _merge(); - util::InstanceCount icpx(_tableName + "_Merge_flush_LDB_px"); + //&&&util::InstanceCount icpx(_tableName + "_Merge_flush_LDB_px"); _response.reset(new WorkerResponse()); - util::InstanceCount icpz(_tableName + "_Merge_flush_LDB_pz"); + //&&&util::InstanceCount icpz(_tableName + "_Merge_flush_LDB_pz"); return success; } case MsgState::RESULT_RECV: @@ -236,9 +236,9 @@ bool MergingHandler::_merge() { if (_flushed) { throw util::Bug(ERR_LOC, "MergingRequester::_merge : already flushed"); } - util::InstanceCount icb(_tableName + "_Merge_merge_LDB_b"); + //&&&util::InstanceCount icb(_tableName + "_Merge_merge_LDB_b"); bool success = _infileMerger->merge(_response); - util::InstanceCount icc(_tableName + "_Merge_merge_LDB_c"); + //&&&util::InstanceCount icc(_tableName + "_Merge_merge_LDB_c"); if (!success) { LOGS(_log, LOG_LVL_WARN, "_merge() failed"); rproc::InfileMergerError const& err = _infileMerger->getError(); @@ -246,11 +246,11 @@ bool MergingHandler::_merge() { _state = MsgState::RESULT_ERR; } _response.reset(); - util::InstanceCount icx(_tableName + "_Merge_merge_LDB_x"); + //&&&util::InstanceCount icx(_tableName + "_Merge_merge_LDB_x"); return success; } LOGS(_log, LOG_LVL_ERROR, "MergingHandler::_merge() failed, jobQuery was NULL"); - util::InstanceCount icz(_tableName + "_Merge_merge_LDB_z"); + //&&&util::InstanceCount icz(_tableName + "_Merge_merge_LDB_z"); return false; } diff --git a/src/qdisp/QueryRequest.cc b/src/qdisp/QueryRequest.cc index 56806f7d33..d6ab4d8274 100644 --- a/src/qdisp/QueryRequest.cc +++ b/src/qdisp/QueryRequest.cc @@ -79,7 +79,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { void action(util::CmdData* data) override { // If everything is ok, call GetResponseData to have XrdSsi ask the worker for the data. QSERV_LOGCONTEXT_QUERY_JOB(_qid, _jobid); - util::InstanceCount ica("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_a"); + util::InstanceCount ica("QI=" + to_string(_qid) + ":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_a"); util::Timer tWaiting; util::Timer tTotal; PseudoFifo::Element::Ptr pseudoFifoElem; @@ -108,18 +108,22 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { pseudoFifoElem = _pseudoFifo->queueAndWait(); tWaiting.start(); - util::InstanceCount icb("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_b"); + util::InstanceCount icb("QI=" + to_string(_qid) + ":" + to_string(_jobid) + + "_QReq_AfRDCmd_LDB_b"); qr->GetResponseData(&buffer[0], buffer.size()); - util::InstanceCount icc("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_c"); + util::InstanceCount icc("QI=" + to_string(_qid) + ":" + to_string(_jobid) + + "_QReq_AfRDCmd_LDB_c"); } // Wait for XrdSsi to call ProcessResponseData with the data, - // which will notify this wait with a call to receivedProcessResponseDataParameters. + // which will notify this wait with a call to notifyDataSuccess. { LOGS(_log, LOG_LVL_TRACE, "GetResponseData called respC=" << _respCount); - util::InstanceCount icd("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_d"); + util::InstanceCount icd("QI=" + to_string(_qid) + ":" + to_string(_jobid) + + "_QReq_AfRDCmd_LDB_d"); std::unique_lock uLock(_mtx); - util::InstanceCount ice("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_e"); + util::InstanceCount ice("QI=" + to_string(_qid) + ":" + to_string(_jobid) + + "_QReq_AfRDCmd_LDB_e"); // TODO: make timed wait, check for wedged, if weak pointers dead, log and give up. // The only purpose of the below being in a function is make this easier to find in gdb. _lockWaitQrA(uLock); @@ -142,7 +146,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { // Actually process the data. // If more data needs to be sent, _processData will make a new AskForResponseDataCmd // object and queue it. - util::InstanceCount icf("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_f"); + util::InstanceCount icf("QI=" + to_string(_qid) + ":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_f"); { auto jq = _jQuery.lock(); auto qr = _qRequest.lock(); @@ -155,7 +159,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { // _processData will have created another AskForResponseDataCmd object if was needed. tTotal.stop(); } - util::InstanceCount icg("QI=" + to_string(_qid) +":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_g"); + util::InstanceCount icg("QI=" + to_string(_qid) + ":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_g"); _setState(State::DONE2); LOGS(_log, LOG_LVL_DEBUG, "Ask data is done wait=" << tWaiting.getElapsed() << " total=" << tTotal.getElapsed()); @@ -310,9 +314,17 @@ bool QueryRequest::ProcessResponse(XrdSsiErrInfo const& eInfo, XrdSsiRespInfo co case XrdSsiRespInfo::isFile: // Local-only errorDesc += "Unexpected XrdSsiRespInfo.rType == isFile"; break; - case XrdSsiRespInfo::isStream: // All remote requests + case XrdSsiRespInfo::isStream: { // All remote requests jq->getStatus()->updateInfo(_jobIdStr, JobStatus::RESPONSE_READY, "SSI"); - return _importStream(jq); + //&&&return _importStream(jq); + bool importSuccess = _importStream(jq); + LOGS(_log, LOG_LVL_INFO, "&&& _importStream importSuccess=" << importSuccess); + if (!importSuccess) { + LOGS(_log, LOG_LVL_WARN, "ProcessResponse stream import failure."); + _errorFinish(); + } + return importSuccess; + } default: errorDesc += "Out of range XrdSsiRespInfo.rType"; } @@ -641,8 +653,20 @@ void QueryRequest::cleanup() { /// a local shared pointer for this QueryRequest and/or its owner JobQuery. /// See QueryRequest::cleanup() /// @return true if this QueryRequest object had the authority to make changes. + +atomic errorFinishFalseCount{0}; //&&& +atomic errorFinishTrueCount{0}; //&&& + bool QueryRequest::_errorFinish(bool stopTrying) { LOGS(_log, LOG_LVL_DEBUG, "_errorFinish() shouldCancel=" << stopTrying); + if (stopTrying) { + ++errorFinishTrueCount; + } else { + ++errorFinishFalseCount; + } + LOGS(_log, LOG_LVL_WARN, + "&&&_errorFinish stopTrying=" << stopTrying << " tCount=" << errorFinishTrueCount + << " fCount=" << errorFinishFalseCount); auto jq = _jobQuery; { // Running _errorFinish more than once could cause errors. @@ -657,8 +681,9 @@ bool QueryRequest::_errorFinish(bool stopTrying) { } // Make the calls outside of the mutex lock. - LOGS(_log, LOG_LVL_DEBUG, "calling Finished(stopTrying=" << stopTrying << ")"); - bool ok = Finished(stopTrying); + LOGS(_log, LOG_LVL_WARN, "calling Finished(stopTrying=" << stopTrying << ")"); + //&&&bool ok = Finished(stopTrying); + bool ok = Finished(true); // &&& _finishedCalled = true; if (!ok) { LOGS(_log, LOG_LVL_ERROR, "QueryRequest::_errorFinish NOT ok"); diff --git a/src/rproc/InfileMerger.cc b/src/rproc/InfileMerger.cc index 821d935642..b38721ee78 100644 --- a/src/rproc/InfileMerger.cc +++ b/src/rproc/InfileMerger.cc @@ -291,9 +291,7 @@ bool InfileMerger::merge(std::shared_ptr const& response) throw std::invalid_argument("InfileMerger::_dbEngine is unknown =" + engineToStr(_dbEngine)); } auto end = std::chrono::system_clock::now(); - util::InstanceCount icl(_getQueryIdStr() + "_InfMerge_LDB_l"); auto mergeDur = std::chrono::duration_cast(end - start); - util::InstanceCount icm(_getQueryIdStr() + "_InfMerge_LDB_m"); LOGS(_log, LOG_LVL_DEBUG, "mergeDur=" << mergeDur.count() << " sema(total=" << _semaMgrConn->getTotalCount() << " used=" << _semaMgrConn->getUsedCount() << ")"); @@ -303,7 +301,6 @@ bool InfileMerger::merge(std::shared_ptr const& response) _invalidJobAttemptMgr.decrConcurrentMergeCount(); LOGS(_log, LOG_LVL_DEBUG, "mergeDur=" << mergeDur.count()); - util::InstanceCount icz(_getQueryIdStr() + "_InfMerge_LDB_z"); return ret; } diff --git a/src/wbase/SendChannelShared.cc b/src/wbase/SendChannelShared.cc index d0a6c15a09..fc57e23dbe 100644 --- a/src/wbase/SendChannelShared.cc +++ b/src/wbase/SendChannelShared.cc @@ -210,9 +210,11 @@ bool SendChannelShared::_transmit(bool erred) { // The first message needs to put its header data in metadata as there's // no previous message it could attach its header to. { - icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + to_string(reallyLast) + "_a"); //&&& + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + + to_string(reallyLast) + "_a"); //&&& lock_guard streamLock(_streamMutex); // Must keep meta and buffer together. - icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + to_string(reallyLast) + "_b"); //&&& + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + + to_string(reallyLast) + "_b"); //&&& if (_firstTransmit.exchange(false)) { // Put the header for the first message in metadata // _metaDataBuf must remain valid until Finished() is called. @@ -231,10 +233,12 @@ bool SendChannelShared::_transmit(bool erred) { { util::Timer sendTimer; sendTimer.start(); - icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + to_string(reallyLast) + "_c"); //&&& + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + + to_string(reallyLast) + "_c"); //&&& bool sent = _sendBuf(streamLock, streamBuf, reallyLast, "transmitLoop " + idStr + " " + seqStr, scsSeq); - icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + to_string(reallyLast) + "_d"); //&&& + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + + to_string(reallyLast) + "_d"); //&&& sendTimer.stop(); auto logMsgSend = scsTransmitSend.addTime(sendTimer.getElapsed(), idStr); LOGS(_log, LOG_LVL_INFO, logMsgSend); @@ -245,7 +249,8 @@ bool SendChannelShared::_transmit(bool erred) { } } } - icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + to_string(reallyLast) + "_z"); //&&& + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + + to_string(reallyLast) + "_z"); //&&& // If that was the last message, break the loop. if (reallyLast) return true; } diff --git a/src/xrdsvc/StreamBuffer.cc b/src/xrdsvc/StreamBuffer.cc index ed5a9807f4..e09b9b1251 100644 --- a/src/xrdsvc/StreamBuffer.cc +++ b/src/xrdsvc/StreamBuffer.cc @@ -65,7 +65,7 @@ double StreamBuffer::percentOfMaxTotalBytesUsed() { } // Factory function, because this should be able to delete itself when Recycle() is called. -StreamBuffer::Ptr StreamBuffer::createWithMove(std::string &input, string const& idStr) { +StreamBuffer::Ptr StreamBuffer::createWithMove(std::string &input, string const &idStr) { unique_lock uLock(_createMtx); if (_totalBytes >= _maxTotalBytes) { LOGS(_log, LOG_LVL_WARN, "StreamBuffer at memory limit " << _totalBytes); @@ -76,7 +76,7 @@ StreamBuffer::Ptr StreamBuffer::createWithMove(std::string &input, string const& return ptr; } -StreamBuffer::StreamBuffer(std::string &input, string const& idStr) : _idStr(idStr) { +StreamBuffer::StreamBuffer(std::string &input, string const &idStr) : _idStr(idStr) { _dataStr = std::move(input); // TODO: try to make 'data' a const char* in xrootd code. // 'data' is not being changed after being passed, so hopefully not an issue. diff --git a/src/xrdsvc/StreamBuffer.h b/src/xrdsvc/StreamBuffer.h index 410f7087c7..9486bf64df 100644 --- a/src/xrdsvc/StreamBuffer.h +++ b/src/xrdsvc/StreamBuffer.h @@ -54,7 +54,7 @@ class StreamBuffer : public XrdSsiStream::Buffer { /// Factory function, because this should be able to delete itself when Recycle() is called. /// The constructor uses move to avoid copying the string. - static StreamBuffer::Ptr createWithMove(std::string &input, std::string const& idStr); + static StreamBuffer::Ptr createWithMove(std::string &input, std::string const &idStr); /// Set the maximum number of bytes that can be used by all instances of this class. static void setMaxTotalBytes(int64_t maxBytes); @@ -81,7 +81,7 @@ class StreamBuffer : public XrdSsiStream::Buffer { private: /// This constructor will invalidate 'input'. - explicit StreamBuffer(std::string &input, std::string const& idStr); + explicit StreamBuffer(std::string &input, std::string const &idStr); std::string _dataStr; std::string const _idStr; From 66878f4e6992bd66b8e2bd6841eaca1d2d525bbd Mon Sep 17 00:00:00 2001 From: John Gates Date: Fri, 13 Jan 2023 10:21:45 -0800 Subject: [PATCH 4/4] Added temporary code to track status of requests. --- src/qdisp/QueryRequest.cc | 171 +++++++++++++++++++++++++++++++++++-- src/qdisp/QueryRequest.h | 2 + src/xrdsvc/StreamBuffer.cc | 5 +- 3 files changed, 170 insertions(+), 8 deletions(-) diff --git a/src/qdisp/QueryRequest.cc b/src/qdisp/QueryRequest.cc index d6ab4d8274..bc74c5a9fc 100644 --- a/src/qdisp/QueryRequest.cc +++ b/src/qdisp/QueryRequest.cc @@ -218,6 +218,148 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { bool _last = true; }; +// &&& Keep track of items which are being received but not yet finished +class QueryRequestStatus { +public: + using Ptr = shared_ptr; + static map queryReqStatMap; // &&& + static mutex reqStatMapMtx; // &&& + + static Ptr getPtr(string const& id_) { + Ptr ptr; + lock_guard lck(reqStatMapMtx); + auto iter = queryReqStatMap.find(id_); + if (iter == queryReqStatMap.end()) { + ptr = Ptr(new QueryRequestStatus(id_)); + } else { + ptr = iter->second; + } + return ptr; + } + + static void addProcessResp(string const& id_) { + Ptr ptr = getPtr(id_); + ptr->incrProcessResp("addProcessResp"); + } + + static void addProcessRespData(string const& id_) { + Ptr ptr = getPtr(id_); + ptr->incrProcessRespData("addProcessRespData"); + } + + static void addCancelled(string const& id_) { + Ptr ptr = getPtr(id_); + ptr->incrCancelled("addCancelled"); + } + + static void addMarked(string const& id_) { + Ptr ptr = getPtr(id_); + ptr->incrMarked("addMarked"); + } + + static void addErr(string const& id_, string const& err, string const& note) { + Ptr ptr = getPtr(id_); + ptr->apendErr(err, note); + } + + static void addFinish(string const& id_, string const& note) { + Ptr ptr = getPtr(id_); + ptr->finito(note); + remove(id_); // &&& don't do this every time and remove old items that are finished?? + dumpMap(); + } + + static void dumpMap() { + stringstream os; + lock_guard lck(reqStatMapMtx); + os << "reqStatMap::"; + for (auto&& elem : queryReqStatMap) { + os << elem.second->dumpStr() << ";;"; + } + LOGS(_log, LOG_LVL_WARN, "&&& " << os.str()); + } + + static void remove(string const& id_) { + lock_guard lck(reqStatMapMtx); + queryReqStatMap.erase(id_); + } + + void incrProcessResp(string const& note) { + lock_guard lck(_mtx); + processRespCount++; + _dump(note); + } + + void incrProcessRespData(string const& note) { + lock_guard lck(_mtx); + processRespDataCount++; + _dump(note); + } + + void incrCancelled(string const& note) { + lock_guard lck(_mtx); + cancelledCount++; + _dump(note); + } + + void incrMarked(string const& note) { + lock_guard lck(_mtx); + cancelledCount++; + _dump(note); + } + + void apendErr(string const& err_, string const& note) { + lock_guard lck(_mtx); + if (err.length() < 200) { + err += err_; + } else { + err += "."; + } + _dump(note); + } + + void finito(string const& note) { + lock_guard lck(_mtx); + ++finished; + _dump(note); + } + + void dump() { + lock_guard lck(_mtx); + _dump(""); + } + + string dumpStr() { + lock_guard lck(_mtx); + return _dumpStr(""); + } + + string const id; + string err; + int processRespCount = 0; + int processRespDataCount = 0; + int cancelledCount = 0; + int markedCount = 0; + int finished = 0; + +private: + QueryRequestStatus(string const& id_) : id(id_) {} + + void _dump(string const& note) { LOGS(_log, LOG_LVL_WARN, _dumpStr("")); } + + string _dumpStr(string const& note) { + stringstream os; + os << id << " &&& " << note << " fin=" << finished << " markC=" << markedCount + << " RespC=" << processRespCount << " rData=" << processRespDataCount + << " cancelC=" << cancelledCount << " err=" << err; + return os.str(); + } + + mutex _mtx; +}; +map QueryRequestStatus::queryReqStatMap; // &&& +mutex QueryRequestStatus::reqStatMapMtx; // &&& + //////////////////////////////////////////////////////////////////////// // QueryRequest //////////////////////////////////////////////////////////////////////// @@ -242,7 +384,7 @@ QueryRequest::~QueryRequest() { } if (!_finishedCalled) { LOGS(_log, LOG_LVL_WARN, "~QueryRequest cleaning up calling Finished"); - Finished(true); + _qrFinished(true); } } @@ -269,6 +411,7 @@ char* QueryRequest::GetRequest(int& requestLength) { bool QueryRequest::ProcessResponse(XrdSsiErrInfo const& eInfo, XrdSsiRespInfo const& rInfo) { QSERV_LOGCONTEXT_QUERY_JOB(_qid, _jobid); LOGS(_log, LOG_LVL_DEBUG, "workerName=" << GetEndPoint() << " ProcessResponse"); + QueryRequestStatus::addProcessResp(_jobIdStr); //&&& string errorDesc = _jobIdStr + " "; if (isQueryCancelled()) { LOGS(_log, LOG_LVL_WARN, "QueryRequest::ProcessResponse job already cancelled"); @@ -400,6 +543,7 @@ void QueryRequest::_queueAskForResponse(AskForResponseDataCmd::Ptr const& cmd, J /// Process an incoming error. bool QueryRequest::_importError(string const& msg, int code) { + QueryRequestStatus::addErr(_jobIdStr, msg, "_importError"); //&&& auto jq = _jobQuery; { lock_guard lock(_finishStatusMutex); @@ -429,6 +573,7 @@ void QueryRequest::ProcessResponseData(XrdSsiErrInfo const& eInfo, char* buff, i LOGS(_log, LOG_LVL_DEBUG, "ProcessResponseData with buflen=" << blen << " " << (last ? "(last)" : "(more)")); + QueryRequestStatus::addProcessRespData(_jobIdStr); //&&& if (_askForResponseDataCmd == nullptr) { LOGS(_log, LOG_LVL_ERROR, "ProcessResponseData called with invalid _askForResponseDataCmd!!!"); return; @@ -599,6 +744,7 @@ bool QueryRequest::cancel() { LOGS(_log, LOG_LVL_DEBUG, "QueryRequest::cancel already cancelled, ignoring"); return false; // Don't do anything if already cancelled. } + QueryRequestStatus::addCancelled(_jobIdStr); _cancelled = true; _retried = true; // Prevent retries. // Only call the following if the job is NOT already done. @@ -659,6 +805,8 @@ atomic errorFinishTrueCount{0}; //&&& bool QueryRequest::_errorFinish(bool stopTrying) { LOGS(_log, LOG_LVL_DEBUG, "_errorFinish() shouldCancel=" << stopTrying); + QueryRequestStatus::addErr(_jobIdStr, "_errorFinish " + to_string(stopTrying), + "_errorFinish " + to_string(stopTrying)); //&&& if (stopTrying) { ++errorFinishTrueCount; } else { @@ -682,9 +830,9 @@ bool QueryRequest::_errorFinish(bool stopTrying) { // Make the calls outside of the mutex lock. LOGS(_log, LOG_LVL_WARN, "calling Finished(stopTrying=" << stopTrying << ")"); - //&&&bool ok = Finished(stopTrying); - bool ok = Finished(true); // &&& - _finishedCalled = true; + //&&&bool ok = &&&Finished(stopTrying); + bool ok = _qrFinished(true); // &&& + //&&&_finishedCalled = true; if (!ok) { LOGS(_log, LOG_LVL_ERROR, "QueryRequest::_errorFinish NOT ok"); } else { @@ -727,8 +875,8 @@ void QueryRequest::_finish() { _finishStatus = FINISHED; } - bool ok = Finished(); - _finishedCalled = true; + bool ok = _qrFinished(); + //&&&_finishedCalled = true; if (!ok) { LOGS(_log, LOG_LVL_ERROR, "QueryRequest::finish Finished() !ok "); } else { @@ -744,9 +892,20 @@ void QueryRequest::_callMarkComplete(bool success) { if (!_calledMarkComplete.exchange(true)) { auto jq = _jobQuery; if (jq != nullptr) jq->getMarkCompleteFunc()->operator()(success); + QueryRequestStatus::addMarked(_jobIdStr); } } +bool QueryRequest::_qrFinished(bool stopTrying) { + LOGS(_log, LOG_LVL_WARN, _jobIdStr << "_qrFinished A &&&stopTrying=" << stopTrying); + bool res = Finished(stopTrying); + _finishedCalled = true; + LOGS(_log, LOG_LVL_WARN, _jobIdStr << "_qrFinished B &&&stopTrying=" << stopTrying << " res=" << res); + QueryRequestStatus::addFinish(_jobIdStr, "_qrFinished stopTrying=" + to_string(stopTrying)); + + return res; +} + ostream& operator<<(ostream& os, QueryRequest const& qr) { os << "QueryRequest " << qr._jobIdStr; return os; diff --git a/src/qdisp/QueryRequest.h b/src/qdisp/QueryRequest.h index 9cefdb74ca..ef1a14625c 100644 --- a/src/qdisp/QueryRequest.h +++ b/src/qdisp/QueryRequest.h @@ -133,6 +133,8 @@ class QueryRequest : public XrdSsiRequest, public std::enable_shared_from_this