 #include "lsst/log/Log.h"

 // Qserv headers
+#include "cconfig/CzarConfig.h"
 #include "ccontrol/msgCode.h"
 #include "global/clock_defs.h"
 #include "global/debugUtil.h"
 #include "http/Client.h"
 #include "http/ClientConnPool.h"
 #include "http/Method.h"
-#include "mysql/CsvBuffer.h"
+#include "mysql/CsvMemDisk.h"
 #include "qdisp/CzarStats.h"
 #include "qdisp/Executive.h"
 #include "qdisp/JobQuery.h"
-#include "qdisp/QueryRequest.h"
 #include "qdisp/UberJob.h"
 #include "rproc/InfileMerger.h"
 #include "util/Bug.h"
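Note on the include swap above: `mysql::CsvStream` (from `CsvBuffer.h`) is replaced by `mysql::CsvMemDisk`, which buffers the result file rather than streaming it through a small fixed window. For orientation, the sketch below shows only the push/terminator contract that `_mergeHttp()` relies on; everything here apart from the contract itself (the class name, `pop()`, the bookkeeping) is an illustrative assumption, not the real class behind `mysql/CsvMemDisk.h`.

```cpp
// Illustrative sketch only: a CsvMemDisk-like chunk buffer exposing the same
// push()/terminator contract used by _mergeHttp() below. The real Qserv class
// adds disk spill-over, cancellation, and statistics; names here are hypothetical.
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include <string>

class ChunkBuffer {
public:
    // Producer side: push(nullptr, 0) marks end-of-stream.
    void push(char const* buf, uint32_t size) {
        std::lock_guard<std::mutex> lock(_mtx);
        if (buf == nullptr || size == 0) {
            _done = true;
        } else {
            _chunks.emplace(buf, size);
        }
        _cv.notify_one();
    }

    // Consumer side: returns false once the terminator has been seen
    // and all buffered chunks have been drained.
    bool pop(std::string& out) {
        std::unique_lock<std::mutex> lock(_mtx);
        _cv.wait(lock, [this] { return _done || !_chunks.empty(); });
        if (_chunks.empty()) return false;  // terminator reached
        out = std::move(_chunks.front());
        _chunks.pop();
        _bytesFetched += out.size();
        return true;
    }

    size_t getBytesFetched() const { return _bytesFetched; }

private:
    std::mutex _mtx;
    std::condition_variable _cv;
    std::queue<std::string> _chunks;
    bool _done = false;
    size_t _bytesFetched = 0;
};
```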
@@ -84,7 +84,6 @@ lsst::qserv::TimeCountTracker<double>::CALLBACKFUNC const reportFileRecvRate =
     }
 };

-
 string readHttpFileAndMerge(lsst::qserv::qdisp::UberJob::Ptr const& uberJob, string const& httpUrl,
                             size_t fileSize, function<void(char const*, uint32_t)> const& messageIsReady,
                             shared_ptr<http::ClientConnPool> const& httpConnPool) {
@@ -180,51 +179,6 @@ MergingHandler::MergingHandler(std::shared_ptr<rproc::InfileMerger> const& merge

 MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_TRACE, __func__); }

-
-bool MergingHandler::flush(proto::ResponseSummary const& resp) {
-    _wName = resp.wname();
-
-    // This is needed to ensure the job query stays alive for the duration
-    // of the operation to prevent inconsistency within the application.
-    auto const jobQuery = getJobQuery().lock();
-    if (jobQuery == nullptr) {
-        LOGS(_log, LOG_LVL_ERROR, __func__ << " failed, jobQuery was NULL");
-        return false;
-    }
-
-    LOGS(_log, LOG_LVL_TRACE,
-         "MergingHandler::" << __func__ << " jobid=" << resp.jobid() << " transmitsize="
-                            << resp.transmitsize() << " rowcount=" << resp.rowcount()
-                            << " attemptcount=" << resp.attemptcount() << " errorcode=" << resp.errorcode()
-                            << " errormsg=" << resp.errormsg());
-
-    if (resp.errorcode() != 0 || !resp.errormsg().empty()) {
-        _error = util::Error(resp.errorcode(), resp.errormsg(), util::ErrorCode::MYSQLEXEC);
-        _setError(ccontrol::MSG_RESULT_ERROR, _error.getMsg());
-        LOGS(_log, LOG_LVL_ERROR,
-             "MergingHandler::" << __func__ << " error from worker:" << resp.wname()
-                                << " error: " << _error);
-        // This lets us track whether the worker has reported this error. The current implementation
-        // requires the large result size to be reported as an error via the InfileMerger regardless
-        // of the origin of the error (Czar or worker). Note that large results can be produced
-        // by the Czar itself, e.g., when the aggregate result of multiple worker queries is too large,
-        // or by the worker when the result set of a single query is too large.
-        // The error will be reported to the Czar as part of the response summary.
-        if (resp.errorcode() == util::ErrorCode::WORKER_RESULT_TOO_LARGE) {
-            _infileMerger->setResultSizeLimitExceeded();
-        }
-        return false;
-    }
-
-    bool const success = _merge(resp, jobQuery);
-    if (success) {
-        _infileMerger->mergeCompleteFor(resp.jobid());
-        qdisp::CzarStats::get()->addTotalRowsRecv(resp.rowcount());
-        qdisp::CzarStats::get()->addTotalBytesRecv(resp.transmitsize());
-    }
-    return success;
-}
-
 void MergingHandler::errorFlush(std::string const& msg, int code) {
     _setError(code, msg, util::ErrorCode::RESULT_IMPORT);
     // Might want more info from result service.
@@ -243,13 +197,8 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
     }

     if (fileSize == 0) return qdisp::MergeEndStatus(true);
-
-    // Read from the http stream and push records into the CSV stream in a separate thread.
-    // Note the fixed capacity of the stream, which allows up to 2 records to be buffered
-    // in the stream. This is enough to hide the latency of the HTTP connection and
-    // the time needed to read the file.
-    auto csvStream = mysql::CsvStream::create(2);
-    _csvStream = csvStream;
+    auto csvMemDisk = mysql::CsvMemDisk::create(fileSize, uberJob->getQueryId(), uberJob->getUjId());
+    _csvMemDisk = csvMemDisk;

     // This must be after setting _csvMemDisk to avoid cancelFileMerge()
     // race issues, and it needs to be done before the data transfer starts.
@@ -259,46 +208,46 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
     }

     string fileReadErrorMsg;
-    thread csvThread([uberJob, csvStream, fileUrl, fileSize, &fileReadErrorMsg]() {
+    auto transferFunc = [&]() {
         size_t bytesRead = 0;
         fileReadErrorMsg = ::readHttpFileAndMerge(
                 uberJob, fileUrl, fileSize,
-                [uberJob, csvStream, fileSize, &bytesRead](char const* buf, uint32_t size) {
+                [&](char const* buf, uint32_t size) {
                     bool last = false;
                     if (buf == nullptr || size == 0) {
                         last = true;
                     } else {
-                        csvStream->push(buf, size);
+                        csvMemDisk->push(buf, size);
                         bytesRead += size;
                         last = bytesRead >= fileSize;
                     }
                     if (last) {
-                        csvStream->push(nullptr, 0);
+                        csvMemDisk->push(nullptr, 0);
                     }
                 },
                 MergingHandler::_getHttpConnPool());
         // Push the stream terminator to indicate the end of the stream.
         // It may be needed to unblock the table merger, which may still be attempting to read
         // from the CSV stream.
         if (!fileReadErrorMsg.empty()) {
-            csvStream->push(nullptr, 0);
+            csvMemDisk->push(nullptr, 0);
         }
-    });
+    };
+    csvMemDisk->transferDataFromWorker(transferFunc);

     // Attempt the actual merge.
-    bool fileMergeSuccess = _infileMerger->mergeHttp(uberJob, fileSize, csvStream);
+    bool fileMergeSuccess = _infileMerger->mergeHttp(uberJob, fileSize, csvMemDisk);
     if (!fileMergeSuccess) {
         LOGS(_log, LOG_LVL_WARN, __func__ << " merge failed");
         util::Error const& err = _infileMerger->getError();
         _setError(ccontrol::MSG_RESULT_ERROR, err.getMsg(), util::ErrorCode::RESULT_IMPORT);
     }
-    if (csvStream->getContaminated()) {
+    if (csvMemDisk->getContaminated()) {
         LOGS(_log, LOG_LVL_ERROR, __func__ << " merge stream contaminated");
         fileMergeSuccess = false;
         _setError(ccontrol::MSG_RESULT_ERROR, "merge stream contaminated", util::ErrorCode::RESULT_IMPORT);
     }

-    csvThread.join();
     if (!fileReadErrorMsg.empty()) {
         LOGS(_log, LOG_LVL_WARN, __func__ << " result file read failed");
         _setError(ccontrol::MSG_HTTP_RESULT, fileReadErrorMsg, util::ErrorCode::RESULT_IMPORT);
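The key structural change in this hunk: the old code pumped the result file into the CSV stream from a separate `std::thread` and joined it after the merge, while the new code hands the same functor to `csvMemDisk->transferDataFromWorker()`. Because `transferFunc` captures stack locals (`fileReadErrorMsg`, `bytesRead`) by reference and they are read immediately afterwards, the call must run the transfer to completion before returning, which is why the explicit `csvThread.join()` disappears. A minimal sketch of that inversion, with hypothetical types (not the actual Qserv implementation):

```cpp
// Sketch of the control-flow inversion only.
// Before: the caller spawns a transfer thread and joins it after the merge.
// After: the buffer object runs the transfer functor itself, so completion
// is guaranteed by the time transferDataFromWorker() returns.
#include <functional>

class CsvMemDiskLike {
public:
    // Hypothetical: runs the transfer synchronously here; the real class could
    // equally dispatch to a managed thread and block until the functor finishes.
    void transferDataFromWorker(std::function<void()> const& transferFunc) {
        transferFunc();  // all push() calls happen before this returns
    }
};
```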
@@ -309,14 +258,14 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
     if (!mergeEStatus.success) {
         // This error check needs to come after the data transfer has finished to ensure
         // writing is complete. If any bytes were written, the result table is ruined.
-        mergeEStatus.contaminated = csvStream->getBytesWritten() > 0;
+        mergeEStatus.contaminated = csvMemDisk->getBytesFetched() > 0;
     }

     return mergeEStatus;
 }

 void MergingHandler::cancelFileMerge() {
-    auto csvStrm = _csvStream.lock();
+    auto csvStrm = _csvMemDisk.lock();
     if (csvStrm != nullptr) {
         csvStrm->cancel();
     }
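`cancelFileMerge()` keeps the existing weak-pointer pattern, merely retargeted from `_csvStream` to `_csvMemDisk`: the handler never owns the buffer, so a cancel that arrives after the merge has finished fails to `lock()` and degrades to a no-op. This is also why the comment earlier in the diff insists the member be set before the transfer begins. A condensed sketch of both sides of the pattern (all names hypothetical):

```cpp
// Sketch of the weak_ptr cancellation pattern used by MergingHandler;
// Buffer stands in for mysql::CsvMemDisk, and the details are illustrative.
#include <atomic>
#include <memory>

struct Buffer {
    std::atomic<bool> cancelled{false};
    void cancel() { cancelled = true; }  // the transfer loop polls this flag
};

class Handler {
public:
    void startMerge() {
        auto buf = std::make_shared<Buffer>();
        _buf = buf;  // publish *before* the transfer starts, so a concurrent
                     // cancelFileMerge() can already reach the live buffer
        // ... run transfer and merge while 'buf' keeps the buffer alive ...
    }
    void cancelFileMerge() {
        if (auto buf = _buf.lock()) buf->cancel();  // no-op once merge is done
    }

private:
    std::weak_ptr<Buffer> _buf;  // non-owning: merge lifetime controls the buffer
};
```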
@@ -342,9 +291,6 @@ qdisp::MergeEndStatus MergingHandler::flushHttp(string const& fileUrl, uint64_t
             "MergingHandler::" << __func__ << " uberJob=" << uberJob->getIdStr() << " fileUrl=" << fileUrl);

     qdisp::MergeEndStatus mergeStatus = _mergeHttp(uberJob, fileUrl, fileSize);
-    if (mergeStatus.success) {
-        _infileMerger->mergeCompleteFor(uberJob->getUjId());
-    }
     return mergeStatus;
 }
