Skip to content

Commit 154426a

Browse files
committed
Minor refactoring of the result merging logic
Moved query completion/cancellation tests from the low-level context of the IfileMerger class into the MergingHandler, where the logic belongs to. The change streamlines responsibilities of the affected classes and helps code readers better understand the logic flow withing this corner of the application.
1 parent 1e8c6e9 commit 154426a

File tree

3 files changed

+23
-31
lines changed

3 files changed

+23
-31
lines changed

src/ccontrol/MergingHandler.cc

+21-9
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
#include "proto/ProtoHeaderWrap.h"
5050
#include "proto/worker.pb.h"
5151
#include "qdisp/CzarStats.h"
52+
#include "qdisp/Executive.h"
5253
#include "qdisp/JobQuery.h"
5354
#include "rproc/InfileMerger.h"
5455
#include "util/Bug.h"
@@ -330,16 +331,27 @@ void MergingHandler::_initState() { _setError(0, ""); }
330331
bool MergingHandler::_merge(proto::ResponseSummary const& responseSummary,
331332
proto::ResponseData const& responseData,
332333
shared_ptr<qdisp::JobQuery> const& jobQuery) {
333-
if (_flushed) {
334-
throw util::Bug(ERR_LOC, "already flushed");
335-
}
336-
bool success = _infileMerger->merge(responseSummary, responseData, jobQuery);
337-
if (!success) {
338-
LOGS(_log, LOG_LVL_WARN, __func__ << " failed");
339-
util::Error const& err = _infileMerger->getError();
340-
_setError(ccontrol::MSG_RESULT_ERROR, err.getMsg());
334+
if (_flushed) throw util::Bug(ERR_LOC, "already flushed");
335+
336+
// Nothing to do if size is zero.
337+
if (responseData.row_size() == 0) return true;
338+
339+
// Do nothing if the query got cancelled for any reason.
340+
if (jobQuery->isQueryCancelled()) return true;
341+
342+
// Check for other indicators that the query may have cancelled or finished.
343+
auto executive = jobQuery->getExecutive();
344+
if (executive == nullptr || executive->getCancelled() || executive->isLimitRowComplete()) {
345+
return true;
341346
}
342-
return success;
347+
348+
// Attempt the actual merge.
349+
if (_infileMerger->merge(responseSummary, responseData)) return true;
350+
351+
LOGS(_log, LOG_LVL_WARN, __func__ << " failed");
352+
util::Error const& err = _infileMerger->getError();
353+
_setError(ccontrol::MSG_RESULT_ERROR, err.getMsg());
354+
return false;
343355
}
344356

345357
void MergingHandler::_setError(int code, std::string const& msg) {

src/rproc/InfileMerger.cc

+1-18
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@
5959
#include "proto/ProtoImporter.h"
6060
#include "proto/worker.pb.h"
6161
#include "qdisp/CzarStats.h"
62-
#include "qdisp/Executive.h"
63-
#include "qdisp/JobQuery.h"
6462
#include "qproc/DatabaseModels.h"
6563
#include "query/ColumnRef.h"
6664
#include "query/SelectStmt.h"
@@ -169,28 +167,13 @@ void InfileMerger::mergeCompleteFor(int jobId) {
169167
}
170168

171169
bool InfileMerger::merge(proto::ResponseSummary const& responseSummary,
172-
proto::ResponseData const& responseData,
173-
std::shared_ptr<qdisp::JobQuery> const& jq) {
170+
proto::ResponseData const& responseData) {
174171
int const jobId = responseSummary.jobid();
175172
std::string queryIdJobStr = QueryIdHelper::makeIdStr(responseSummary.queryid(), jobId);
176173
if (!_queryIdStrSet) {
177174
_setQueryIdStr(QueryIdHelper::makeIdStr(responseSummary.queryid()));
178175
}
179176

180-
// Nothing to do if size is zero.
181-
if (responseData.row_size() == 0) {
182-
return true;
183-
}
184-
185-
// Do nothing if the query got cancelled for any reason.
186-
if (jq->isQueryCancelled()) {
187-
return true;
188-
}
189-
auto executive = jq->getExecutive();
190-
if (executive == nullptr || executive->getCancelled() || executive->isLimitRowComplete()) {
191-
return true;
192-
}
193-
194177
TimeCountTracker<double>::CALLBACKFUNC cbf = [](TIMEPOINT start, TIMEPOINT end, double bytes,
195178
bool success) {
196179
if (!success) return;

src/rproc/InfileMerger.h

+1-4
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class ResponseData;
5353
class ResponseSummary;
5454
} // namespace proto
5555
namespace qdisp {
56-
class JobQuery;
5756
class MessageStore;
5857
} // namespace qdisp
5958
namespace qproc {
@@ -103,10 +102,8 @@ class InfileMerger {
103102
~InfileMerger() = default;
104103

105104
/// Merge a worker response, which contains a single ResponseData message
106-
/// Using job query info for early termination of the merge if needed.
107105
/// @return true if merge was successfully imported.
108-
bool merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData,
109-
std::shared_ptr<qdisp::JobQuery> const& jq);
106+
bool merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData);
110107

111108
/// Indicate the merge for the job is complete.
112109
void mergeCompleteFor(int jobId);

0 commit comments

Comments
 (0)