3333
3434// Third-party headers
3535#include " curl/curl.h"
36- #include " XrdCl/XrdClFile.hh"
3736
3837// LSST headers
3938#include " lsst/log/Log.h"
@@ -126,7 +125,7 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
126125 uint32_t msgSizeBytes = 0 ;
127126 bool success = true ;
128127 bool mergeHappened = false ;
129- int headerCount = 0 ;
128+ uint64_t headerCount = 0 ;
130129 uint64_t totalBytesRead = 0 ;
131130 try {
132131 auto exec = uberJob->getExecutive ();
@@ -210,11 +209,9 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
210209 }
211210
212211 // Parse and evaluate the message.
213- // &&&mergeHappened = messageIsReady(msgBuf.get(), msgSizeBytes, last);
214212 mergeHappened = true ;
215213 bool messageReadyResult = messageIsReady (msgBuf.get (), msgSizeBytes, last);
216214 totalBytesRead += msgSizeBytes;
217- // &&&if (!mergeHappened) {
218215 if (!messageReadyResult) {
219216 success = false ;
220217 throw runtime_error (" message processing failed at offset " +
@@ -276,41 +273,21 @@ shared_ptr<http::ClientConnPool> const& MergingHandler::_getHttpConnPool() {
276273 return _httpConnPool;
277274}
278275
279- MergingHandler::MergingHandler (std::shared_ptr<rproc::InfileMerger> merger, std::string const & tableName)
280- : _infileMerger{merger}, _tableName{tableName} {}
276+ MergingHandler::MergingHandler (std::shared_ptr<rproc::InfileMerger> const & merger,
277+ std::shared_ptr<qdisp::Executive> const & exec)
278+ : _infileMerger(merger), _executive(exec) {}
281279
282- MergingHandler::~MergingHandler () { LOGS (_log, LOG_LVL_TRACE, __func__ << " " << _tableName ); }
280+ MergingHandler::~MergingHandler () { LOGS (_log, LOG_LVL_TRACE, __func__); }
283281
284282void MergingHandler::errorFlush (std::string const & msg, int code) {
285- _setError (code, msg);
283+ _setError (code, msg, util::ErrorCode::RESULT_IMPORT );
286284 // Might want more info from result service.
287285 // Do something about the error. FIXME.
288286 LOGS (_log, LOG_LVL_ERROR, " Error receiving result." );
289287}
290288
291- // Note that generally we always have an _infileMerger object except during
292- // a unit test. I suppose we could try to figure out how to create one.
293- //
294- void MergingHandler::prepScrubResults (int jobId, int attemptCount) {
295- if (_infileMerger) _infileMerger->prepScrub (jobId, attemptCount);
296- }
297-
298289std::ostream& MergingHandler::print (std::ostream& os) const {
299- return os << " MergingRequester(" << _tableName << " , flushed=" << (_flushed ? " true)" : " false)" );
300- }
301-
302- bool MergingHandler::_mergeHttp (shared_ptr<qdisp::UberJob> const & uberJob,
303- proto::ResponseData const & responseData) {
304- if (_flushed) {
305- throw util::Bug (ERR_LOC, " already flushed" );
306- }
307- bool const success = _infileMerger->mergeHttp (uberJob, responseData);
308- if (!success) {
309- LOGS (_log, LOG_LVL_WARN, __func__ << " failed" );
310- util::Error const & err = _infileMerger->getError ();
311- _setError (ccontrol::MSG_RESULT_ERROR, err.getMsg ());
312- }
313- return success;
290+ return os << " MergingRequester(flushed=" << (_flushed ? " true)" : " false)" );
314291}
315292
316293bool MergingHandler::_mergeHttp (shared_ptr<qdisp::UberJob> const & uberJob,
@@ -322,15 +299,16 @@ bool MergingHandler::_mergeHttp(shared_ptr<qdisp::UberJob> const& uberJob,
322299 if (!success) {
323300 LOGS (_log, LOG_LVL_WARN, __func__ << " failed" );
324301 util::Error const & err = _infileMerger->getError ();
325- _setError (ccontrol::MSG_RESULT_ERROR, err.getMsg ());
302+ _setError (ccontrol::MSG_RESULT_ERROR, err.getMsg (), util::ErrorCode::RESULT_IMPORT );
326303 }
327304 return success;
328305}
329306
330- void MergingHandler::_setError (int code, std::string const & msg) {
307+ void MergingHandler::_setError (int code, std::string const & msg, int errorState ) {
331308 LOGS (_log, LOG_LVL_DEBUG, " _setError: code: " << code << " , message: " << msg);
332- std::lock_guard<std::mutex> lock (_errorMutex);
333- _error = Error (code, msg);
309+ auto exec = _executive.lock ();
310+ if (exec == nullptr ) return ;
311+ exec->addMultiError (code, msg, errorState);
334312}
335313
336314tuple<bool , bool > MergingHandler::flushHttp (string const & fileUrl, uint64_t expectedRows,
@@ -380,10 +358,9 @@ tuple<bool, bool> MergingHandler::flushHttp(string const& fileUrl, uint64_t expe
380358 return {success, shouldCancel};
381359}
382360
383- void MergingHandler::flushHttpError (int errorCode, std::string const & errorMsg, int status ) {
361+ void MergingHandler::flushHttpError (int errorCode, std::string const & errorMsg, int errState ) {
384362 if (!_errorSet.exchange (true )) {
385- _error = util::Error (errorCode, errorMsg, util::ErrorCode::MYSQLEXEC);
386- _setError (ccontrol::MSG_RESULT_ERROR, _error.getMsg ());
363+ _setError (errorCode, errorMsg, errState);
387364 }
388365}
389366
0 commit comments