3434
3535// Third-party headers
3636#include " curl/curl.h"
37- #include " XrdCl/XrdClFile.hh"
3837
3938// LSST headers
4039#include " lsst/log/Log.h"
@@ -127,7 +126,7 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
127126 uint32_t msgSizeBytes = 0 ;
128127 bool success = true ;
129128 bool mergeHappened = false ;
130- int headerCount = 0 ;
129+ uint64_t headerCount = 0 ;
131130 uint64_t totalBytesRead = 0 ;
132131 try {
133132 auto exec = uberJob->getExecutive ();
@@ -211,11 +210,9 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
211210 }
212211
213212 // Parse and evaluate the message.
214- // &&&mergeHappened = messageIsReady(msgBuf.get(), msgSizeBytes, last);
215213 mergeHappened = true ;
216214 bool messageReadyResult = messageIsReady (msgBuf.get (), msgSizeBytes, last);
217215 totalBytesRead += msgSizeBytes;
218- // &&&if (!mergeHappened) {
219216 if (!messageReadyResult) {
220217 success = false ;
221218 throw runtime_error (" message processing failed at offset " +
@@ -265,10 +262,23 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
265262
266263namespace lsst ::qserv::ccontrol {
267264
268- MergingHandler::MergingHandler (std:: shared_ptr<rproc::InfileMerger> merger, std::string const & tableName)
269- : _infileMerger{merger}, _tableName{tableName} {}
265+ shared_ptr<http::ClientConnPool> MergingHandler::_httpConnPool;
266+ mutex MergingHandler::_httpConnPoolMutex;
270267
271- MergingHandler::~MergingHandler () { LOGS (_log, LOG_LVL_TRACE, __func__ << " " << _tableName); }
268+ shared_ptr<http::ClientConnPool> const & MergingHandler::_getHttpConnPool () {
269+ lock_guard<mutex> const lock (_httpConnPoolMutex);
270+ if (nullptr == _httpConnPool) {
271+ _httpConnPool = make_shared<http::ClientConnPool>(
272+ cconfig::CzarConfig::instance ()->getResultMaxHttpConnections ());
273+ }
274+ return _httpConnPool;
275+ }
276+
277+ MergingHandler::MergingHandler (std::shared_ptr<rproc::InfileMerger> const & merger,
278+ std::shared_ptr<qdisp::Executive> const & exec)
279+ : _infileMerger(merger), _executive(exec) {}
280+
281+ MergingHandler::~MergingHandler () { LOGS (_log, LOG_LVL_TRACE, __func__); }
272282
273283
274284bool MergingHandler::flush (proto::ResponseSummary const & resp) {
@@ -317,35 +327,14 @@ bool MergingHandler::flush(proto::ResponseSummary const& resp) {
317327}
318328
319329void MergingHandler::errorFlush (std::string const & msg, int code) {
320- _setError (code, msg);
330+ _setError (code, msg, util::ErrorCode::RESULT_IMPORT );
321331 // Might want more info from result service.
322332 // Do something about the error. FIXME.
323333 LOGS (_log, LOG_LVL_ERROR, " Error receiving result." );
324334}
325335
326- // Note that generally we always have an _infileMerger object except during
327- // a unit test. I suppose we could try to figure out how to create one.
328- //
329- void MergingHandler::prepScrubResults (int jobId, int attemptCount) {
330- if (_infileMerger) _infileMerger->prepScrub (jobId, attemptCount);
331- }
332-
333336std::ostream& MergingHandler::print (std::ostream& os) const {
334- return os << " MergingRequester(" << _tableName << " , flushed=" << (_flushed ? " true)" : " false)" );
335- }
336-
337- bool MergingHandler::_mergeHttp (shared_ptr<qdisp::UberJob> const & uberJob,
338- proto::ResponseData const & responseData) {
339- if (_flushed) {
340- throw util::Bug (ERR_LOC, " already flushed" );
341- }
342- bool const success = _infileMerger->mergeHttp (uberJob, responseData);
343- if (!success) {
344- LOGS (_log, LOG_LVL_WARN, __func__ << " failed" );
345- util::Error const & err = _infileMerger->getError ();
346- _setError (ccontrol::MSG_RESULT_ERROR, err.getMsg ());
347- }
348- return success;
337+ return os << " MergingRequester(flushed=" << (_flushed ? " true)" : " false)" );
349338}
350339
351340bool MergingHandler::_mergeHttp (shared_ptr<qdisp::UberJob> const & uberJob,
@@ -357,15 +346,16 @@ bool MergingHandler::_mergeHttp(shared_ptr<qdisp::UberJob> const& uberJob,
357346 if (!success) {
358347 LOGS (_log, LOG_LVL_WARN, __func__ << " failed" );
359348 util::Error const & err = _infileMerger->getError ();
360- _setError (ccontrol::MSG_RESULT_ERROR, err.getMsg ());
349+ _setError (ccontrol::MSG_RESULT_ERROR, err.getMsg (), util::ErrorCode::RESULT_IMPORT );
361350 }
362351 return success;
363352}
364353
365- void MergingHandler::_setError (int code, std::string const & msg) {
354+ void MergingHandler::_setError (int code, std::string const & msg, int errorState ) {
366355 LOGS (_log, LOG_LVL_DEBUG, " _setError: code: " << code << " , message: " << msg);
367- std::lock_guard<std::mutex> lock (_errorMutex);
368- _error = Error (code, msg);
356+ auto exec = _executive.lock ();
357+ if (exec == nullptr ) return ;
358+ exec->addMultiError (code, msg, errorState);
369359}
370360
371361tuple<bool , bool > MergingHandler::flushHttp (string const & fileUrl, uint64_t expectedRows,
@@ -415,10 +405,9 @@ tuple<bool, bool> MergingHandler::flushHttp(string const& fileUrl, uint64_t expe
415405 return {success, shouldCancel};
416406}
417407
418- void MergingHandler::flushHttpError (int errorCode, std::string const & errorMsg, int status ) {
408+ void MergingHandler::flushHttpError (int errorCode, std::string const & errorMsg, int errState ) {
419409 if (!_errorSet.exchange (true )) {
420- _error = util::Error (errorCode, errorMsg, util::ErrorCode::MYSQLEXEC);
421- _setError (ccontrol::MSG_RESULT_ERROR, _error.getMsg ());
410+ _setError (errorCode, errorMsg, errState);
422411 }
423412}
424413
0 commit comments