33
33
34
34
// Third-party headers
35
35
#include " curl/curl.h"
36
- #include " XrdCl/XrdClFile.hh"
37
36
38
37
// LSST headers
39
38
#include " lsst/log/Log.h"
49
48
#include " proto/ProtoHeaderWrap.h"
50
49
#include " proto/worker.pb.h"
51
50
#include " qdisp/CzarStats.h"
51
+ #include " qdisp/Executive.h"
52
52
#include " qdisp/JobQuery.h"
53
53
#include " qdisp/UberJob.h"
54
54
#include " rproc/InfileMerger.h"
@@ -273,13 +273,14 @@ shared_ptr<http::ClientConnPool> const& MergingHandler::_getHttpConnPool() {
273
273
return _httpConnPool;
274
274
}
275
275
276
- MergingHandler::MergingHandler (std::shared_ptr<rproc::InfileMerger> merger, std::string const & tableName)
277
- : _infileMerger{merger}, _tableName{tableName} {}
276
+ MergingHandler::MergingHandler (std::shared_ptr<rproc::InfileMerger> const & merger,
277
+ std::shared_ptr<qdisp::Executive> const & exec)
278
+ : _infileMerger(merger), _executive(exec) {}
278
279
279
- MergingHandler::~MergingHandler () { LOGS (_log, LOG_LVL_TRACE, __func__ << " " << _tableName ); }
280
+ MergingHandler::~MergingHandler () { LOGS (_log, LOG_LVL_TRACE, __func__); }
280
281
281
282
void MergingHandler::errorFlush (std::string const & msg, int code) {
282
- _setError (code, msg);
283
+ _setError (code, msg, util::ErrorCode::RESULT_IMPORT );
283
284
// Might want more info from result service.
284
285
// Do something about the error. FIXME.
285
286
LOGS (_log, LOG_LVL_ERROR, " Error receiving result." );
@@ -293,7 +294,7 @@ void MergingHandler::prepScrubResults(int jobId, int attemptCount) {
293
294
}
294
295
295
296
std::ostream& MergingHandler::print (std::ostream& os) const {
296
- return os << " MergingRequester(" << _tableName << " , flushed=" << (_flushed ? " true)" : " false)" );
297
+ return os << " MergingRequester(flushed=" << (_flushed ? " true)" : " false)" );
297
298
}
298
299
299
300
bool MergingHandler::_mergeHttp (shared_ptr<qdisp::UberJob> const & uberJob,
@@ -305,15 +306,16 @@ bool MergingHandler::_mergeHttp(shared_ptr<qdisp::UberJob> const& uberJob,
305
306
if (!success) {
306
307
LOGS (_log, LOG_LVL_WARN, __func__ << " failed" );
307
308
util::Error const & err = _infileMerger->getError ();
308
- _setError (ccontrol::MSG_RESULT_ERROR, err.getMsg ());
309
+ _setError (ccontrol::MSG_RESULT_ERROR, err.getMsg (), util::ErrorCode::RESULT_IMPORT );
309
310
}
310
311
return success;
311
312
}
312
313
313
- void MergingHandler::_setError (int code, std::string const & msg) {
314
+ void MergingHandler::_setError (int code, std::string const & msg, int errorState ) {
314
315
LOGS (_log, LOG_LVL_DEBUG, " _setError: code: " << code << " , message: " << msg);
315
- std::lock_guard<std::mutex> lock (_errorMutex);
316
- _error = Error (code, msg);
316
+ auto exec = _executive.lock ();
317
+ if (exec == nullptr ) return ;
318
+ exec->addMultiError (code, msg, errorState);
317
319
}
318
320
319
321
tuple<bool , bool > MergingHandler::flushHttp (string const & fileUrl, uint64_t expectedRows,
@@ -363,10 +365,9 @@ tuple<bool, bool> MergingHandler::flushHttp(string const& fileUrl, uint64_t expe
363
365
return {success, shouldCancel};
364
366
}
365
367
366
- void MergingHandler::flushHttpError (int errorCode, std::string const & errorMsg, int status ) {
368
+ void MergingHandler::flushHttpError (int errorCode, std::string const & errorMsg, int errState ) {
367
369
if (!_errorSet.exchange (true )) {
368
- _error = util::Error (errorCode, errorMsg, util::ErrorCode::MYSQLEXEC);
369
- _setError (ccontrol::MSG_RESULT_ERROR, _error.getMsg ());
370
+ _setError (errorCode, errorMsg, errState);
370
371
}
371
372
}
372
373
0 commit comments