Skip to content

Commit 375afc8

Browse files
committed
Added JobErrorMsg.
1 parent 0b0fe1e commit 375afc8

37 files changed

+905
-438
lines changed

src/ccontrol/MergingHandler.cc

Lines changed: 240 additions & 126 deletions
Large diffs are not rendered by default.

src/ccontrol/MergingHandler.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,12 @@ class MergingHandler : public qdisp::ResponseHandler {
7070

7171
/// @see ResponseHandler::flushHttp
7272
/// @see MerginHandler::_mergeHttp
73-
std::tuple<bool, bool> flushHttp(std::string const& fileUrl, uint64_t expectedRows,
74-
uint64_t& resultRows) override;
73+
std::tuple<bool, bool> flushHttp(
74+
std::string const& fileUrl, uint64_t fileSize, uint64_t expectedRows,
75+
/* &&&
76+
std::tuple<bool, bool> flushHttp(std::string const& fileUrl, uint64_t expectedRows,
77+
>>>>>>> a27525c04017db9a30061fa0bb4b5228c0c5d1b2 */
78+
uint64_t& resultRows) override;
7579

7680
/// @see ResponseHandler::flushHttpError
7781
void flushHttpError(int errorCode, std::string const& errorMsg, int status) override;
@@ -84,7 +88,9 @@ class MergingHandler : public qdisp::ResponseHandler {
8488

8589
private:
8690
/// Call InfileMerger to do the work of merging this data to the result.
87-
bool _mergeHttp(std::shared_ptr<qdisp::UberJob> const& uberJob, proto::ResponseData const& responseData);
91+
92+
bool _mergeHttp(std::shared_ptr<qdisp::UberJob> const& uberJob, std::string const& fileUrl,
93+
uint64_t fileSize);
8894

8995
/// Set error code and string.
9096
void _setError(int code, std::string const& msg, int errorState);

src/ccontrol/UserQueryFactory.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ std::shared_ptr<UserQuery> _makeUserQueryProcessList(query::SelectStmt::Ptr& stm
135135
LOGS(_log, LOG_LVL_DEBUG, "SELECT query is a PROCESSLIST");
136136
try {
137137
return std::make_shared<UserQueryProcessList>(stmt, sharedResources->qMetaSelect,
138-
sharedResources->czarId, userQueryId, resultDb);
138+
sharedResources->qMetaCzarId, userQueryId, resultDb);
139139
} catch (std::exception const& exc) {
140140
return std::make_shared<UserQueryInvalid>(exc.what());
141141
}

src/ccontrol/UserQueryProcessList.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
// Qserv headers
3636
#include "css/CssAccess.h"
3737
#include "css/CssError.h"
38+
#include "cconfig/CzarConfig.h"
3839
#include "qmeta/MessageStore.h"
3940
#include "qmeta/Exceptions.h"
4041
#include "qmeta/QMetaSelect.h"
@@ -65,8 +66,7 @@ UserQueryProcessList::UserQueryProcessList(std::shared_ptr<query::SelectStmt> co
6566
std::shared_ptr<qmeta::QMetaSelect> const& qMetaSelect,
6667
qmeta::CzarId czarId, std::string const& userQueryId,
6768
std::string const& resultDb)
68-
: _resultDbConn(resultDbConn),
69-
_qMetaSelect(qMetaSelect),
69+
: _qMetaSelect(qMetaSelect),
7070
_qMetaCzarId(qMetaCzarId),
7171
_messageStore(std::make_shared<qmeta::MessageStore>()),
7272
_resultTableName(::g_nextResultTableId(userQueryId)),
@@ -93,8 +93,7 @@ UserQueryProcessList::UserQueryProcessList(std::shared_ptr<query::SelectStmt> co
9393
UserQueryProcessList::UserQueryProcessList(bool full, std::shared_ptr<qmeta::QMetaSelect> const& qMetaSelect,
9494
qmeta::CzarId czarId, std::string const& userQueryId,
9595
std::string const& resultDb)
96-
: _resultDbConn(resultDbConn),
97-
_qMetaSelect(qMetaSelect),
96+
: _qMetaSelect(qMetaSelect),
9897
_qMetaCzarId(qMetaCzarId),
9998
_messageStore(std::make_shared<qmeta::MessageStore>()),
10099
_resultTableName(::g_nextResultTableId(userQueryId)),

src/ccontrol/UserQueryResources.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "cconfig/CzarConfig.h"
2929
#include "qmeta/QMeta.h"
3030
#include "qmeta/QProgress.h"
31+
3132
namespace lsst::qserv::ccontrol {
3233

3334
UserQuerySharedResources::UserQuerySharedResources(

src/ccontrol/UserQuerySelect.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -582,11 +582,10 @@ QueryState UserQuerySelect::join() {
582582

583583
/// Release resources held by the merger
584584
void UserQuerySelect::_discardMerger(std::lock_guard<std::mutex> const& lock) {
585-
_infileMergerConfig.reset();
586585
if (_infileMerger && !_infileMerger->isFinished()) {
587586
throw UserQueryError(getQueryIdString() + " merger unfinished, cannot discard");
588587
}
589-
_infileMerger.reset();
588+
_infileMergerConfig.reset();
590589
}
591590

592591
/// Release resources.
@@ -608,9 +607,8 @@ void UserQuerySelect::discard() {
608607
throw UserQueryError(getQueryIdString() + " Executive unfinished, cannot discard");
609608
}
610609

610+
// Deleting the executive may save some time if results were found early.
611611
_executive.reset();
612-
_messageStore.reset();
613-
_qSession.reset();
614612

615613
try {
616614
_discardMerger(lock);

src/czar/HttpCzarWorkerModule.cc

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
// Qserv headers
3030
#include "cconfig/CzarConfig.h"
3131
#include "czar/Czar.h"
32-
#include "protojson/JobReadyMsg.h"
32+
#include "protojson/UberJobErrorMsg.h"
33+
#include "protojson/UberJobReadyMsg.h"
3334
#include "protojson/WorkerCzarComIssue.h"
3435
#include "qdisp/Executive.h"
3536
#include "qdisp/UberJob.h"
@@ -107,14 +108,14 @@ json HttpCzarWorkerModule::_handleJobError(string const& func) {
107108
// Parse and verify the json message and then kill the UberJob.
108109
json jsRet = {{"success", 0}, {"errortype", "unknown"}, {"note", "initialized"}};
109110
try {
110-
// TODO:UJ see wbase::UberJobData::responseError for message construction
111-
string const targetWorkerId = body().required<string>("workerid");
112-
string const czarName = body().required<string>("czar");
113-
qmeta::CzarId const czarId = body().required<qmeta::CzarId>("czarid");
114-
QueryId const queryId = body().required<QueryId>("queryid");
115-
UberJobId const uberJobId = body().required<UberJobId>("uberjobid");
116-
int const errorCode = body().required<int>("errorCode");
117-
string const errorMsg = body().required<string>("errorMsg");
111+
string const& repliInstanceId = cconfig::CzarConfig::instance()->replicationInstanceId();
112+
string const& repliAuthKey = cconfig::CzarConfig::instance()->replicationAuthKey();
113+
auto const& jsReq = body().objJson;
114+
auto jrMsg = protojson::UberJobErrorMsg::createFromJson(jsReq, repliInstanceId, repliAuthKey);
115+
116+
auto const queryId = jrMsg->getQueryId();
117+
auto const czarId = jrMsg->getCzarId();
118+
auto const uberJobId = jrMsg->getUberJobId();
118119

119120
// Find UberJob
120121
qdisp::Executive::Ptr exec = czar::Czar::getCzar()->getExecutiveFromMap(queryId);
@@ -129,9 +130,8 @@ json HttpCzarWorkerModule::_handleJobError(string const& func) {
129130
" czar=" + to_string(czarId));
130131
}
131132

132-
auto importRes = uj->workerError(errorCode, errorMsg);
133+
auto importRes = uj->workerError(jrMsg->getErrorCode(), jrMsg->getErrorMsg());
133134
jsRet = importRes;
134-
135135
} catch (std::invalid_argument const& iaEx) {
136136
LOGS(_log, LOG_LVL_ERROR,
137137
"HttpCzarWorkerModule::_handleJobError received " << iaEx.what() << " js=" << body().objJson);
@@ -148,10 +148,8 @@ json HttpCzarWorkerModule::_handleJobReady(string const& func) {
148148
// Parse and verify the json message and then have the uberjob import the file.
149149
json jsRet = {{"success", 1}, {"errortype", "unknown"}, {"note", "initialized"}};
150150
try {
151-
string const repliInstanceId = cconfig::CzarConfig::instance()->replicationInstanceId();
152-
string const repliAuthKey = cconfig::CzarConfig::instance()->replicationAuthKey();
153151
auto const& jsReq = body().objJson;
154-
auto jrMsg = protojson::JobReadyMsg::createFromJson(jsReq, repliInstanceId, repliAuthKey);
152+
auto jrMsg = protojson::UberJobReadyMsg::createFromJson(jsReq);
155153

156154
// Find UberJob
157155
auto queryId = jrMsg->getQueryId();
@@ -212,7 +210,7 @@ json HttpCzarWorkerModule::_handleWorkerCzarComIssue(string const& func) {
212210
execPtr->killIncompleteUberJobsOnWorker(wId);
213211
}
214212
}
215-
jsRet = wccIssue->serializeResponseJson();
213+
jsRet = wccIssue->responseToJson();
216214
LOGS(_log, LOG_LVL_TRACE, "HttpCzarWorkerModule::_handleWorkerCzarComIssue jsRet=" << jsRet.dump());
217215

218216
} catch (std::invalid_argument const& iaEx) {

src/mysql/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ add_library(mysql SHARED)
22
add_dependencies(mysql proto)
33

44
target_sources(mysql PRIVATE
5+
CsvBuffer.cc
56
LocalInfile.cc
67
MySqlConfig.cc
78
MySqlConnection.cc
89
MySqlUtils.cc
9-
CsvBuffer.cc
1010
SchemaFactory.cc
1111
)
1212

src/mysql/LocalInfile.cc

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,6 @@ int LocalInfile::getError(char* buf, unsigned int bufLen) {
132132
return 0;
133133
}
134134

135-
LocalInfile::Mgr::~Mgr() { LOGS(_log, LOG_LVL_TRACE, "LocalInfile::Mgr::~Mgr()"); }
136-
137135
void LocalInfile::Mgr::attach(MYSQL* mysql) {
138136
mysql_set_local_infile_handler(mysql, local_infile_init, local_infile_read, local_infile_end,
139137
local_infile_error, this);
@@ -156,7 +154,11 @@ std::string LocalInfile::Mgr::prepareSrc(std::shared_ptr<CsvBuffer> const& csvBu
156154
int LocalInfile::Mgr::local_infile_init(void** ptr, const char* filename, void* userdata) {
157155
assert(userdata);
158156
LocalInfile::Mgr* m = static_cast<LocalInfile::Mgr*>(userdata);
157+
<<<<<<< HEAD
159158
auto csvBuffer = m->get(std::string(filename));
159+
=======
160+
auto csvBuffer = m->getCsv(std::string(filename));
161+
>>>>>>> 24f5d37ea (Added JobErrorMsg.)
160162
assert(csvBuffer);
161163
LocalInfile* lf = new LocalInfile(filename, csvBuffer);
162164
*ptr = lf;
@@ -190,10 +192,17 @@ void LocalInfile::Mgr::setBuffer(std::string const& filename, std::shared_ptr<Cs
190192
}
191193
}
192194

195+
<<<<<<< HEAD
193196
std::shared_ptr<CsvBuffer> LocalInfile::Mgr::get(std::string const& filename) {
194197
std::lock_guard<std::mutex> lock(_mapMutex);
195198
CsvBufferMap::iterator i = _map.find(filename);
196199
if (i == _map.end()) {
200+
=======
201+
std::shared_ptr<CsvBuffer> LocalInfile::Mgr::getCsv(std::string const& filename) {
202+
std::lock_guard<std::mutex> lock(_mapMutex);
203+
auto i = _mapCsv.find(filename);
204+
if (i == _mapCsv.end()) {
205+
>>>>>>> 24f5d37ea (Added JobErrorMsg.)
197206
return std::shared_ptr<CsvBuffer>();
198207
}
199208
return i->second;
@@ -209,7 +218,11 @@ std::string LocalInfile::Mgr::_nextFilename() {
209218

210219
bool LocalInfile::Mgr::_set(std::string const& filename, std::shared_ptr<CsvBuffer> const& csvBuffer) {
211220
std::lock_guard<std::mutex> lock(_mapMutex);
221+
<<<<<<< HEAD
212222
auto res = _map.insert(std::pair<std::string, std::shared_ptr<CsvBuffer>>(filename, csvBuffer));
223+
=======
224+
auto res = _mapCsv.insert(std::pair<std::string, std::shared_ptr<CsvBuffer>>(filename, csvBuffer));
225+
>>>>>>> 24f5d37ea (Added JobErrorMsg.)
213226
return res.second;
214227
}
215228

src/mysql/LocalInfile.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class LocalInfile::Mgr : boost::noncopyable {
125125

126126
std::string insertBuffer(std::shared_ptr<CsvBuffer> const& csvBuffer);
127127
void setBuffer(std::string const& s, std::shared_ptr<CsvBuffer> const& csvBuffer);
128-
std::shared_ptr<CsvBuffer> get(std::string const& filename);
128+
std::shared_ptr<CsvBuffer> getCsv(std::string const& filename);
129129

130130
private:
131131
/// @return next filename
@@ -135,7 +135,7 @@ class LocalInfile::Mgr : boost::noncopyable {
135135
bool _set(std::string const& filename, std::shared_ptr<CsvBuffer> const& csvBuffer);
136136

137137
typedef std::map<std::string, std::shared_ptr<CsvBuffer>> CsvBufferMap;
138-
CsvBufferMap _map;
138+
CsvBufferMap _mapCsv;
139139
std::mutex _mapMutex;
140140
};
141141

0 commit comments

Comments
 (0)