3939#include  " lsst/log/Log.h" 
4040
4141//  Qserv headers
42+ #include  " cconfig/CzarConfig.h" 
4243#include  " ccontrol/msgCode.h" 
4344#include  " global/clock_defs.h" 
4445#include  " global/debugUtil.h" 
4546#include  " http/Client.h" 
4647#include  " http/ClientConnPool.h" 
4748#include  " http/Method.h" 
48- #include  " mysql/CsvBuffer .h" 
49+ #include  " mysql/CsvMemDisk .h" 
4950#include  " qdisp/CzarStats.h" 
5051#include  " qdisp/Executive.h" 
5152#include  " qdisp/JobQuery.h" 
52- #include  " qdisp/QueryRequest.h" 
5353#include  " qdisp/UberJob.h" 
5454#include  " rproc/InfileMerger.h" 
5555#include  " util/Bug.h" 
@@ -84,7 +84,6 @@ lsst::qserv::TimeCountTracker<double>::CALLBACKFUNC const reportFileRecvRate =
8484            }
8585        };
8686
87- 
8887string readHttpFileAndMerge (lsst::qserv::qdisp::UberJob::Ptr const & uberJob, string const & httpUrl,
8988                            size_t  fileSize, function<void (char  const *, uint32_t )> const & messageIsReady,
9089                            shared_ptr<http::ClientConnPool> const & httpConnPool) {
@@ -180,52 +179,6 @@ MergingHandler::MergingHandler(std::shared_ptr<rproc::InfileMerger> const& merge
180179
181180MergingHandler::~MergingHandler () { LOGS (_log, LOG_LVL_TRACE, __func__); }
182181
183- 
184- bool  MergingHandler::flush (proto::ResponseSummary const & resp) {
185-     _wName = resp.wname ();
186- 
187-     //  This is needed to ensure the job query would be staying alive for the duration
188-     //  of the operation to prevent inconsistency within the application.
189-     auto  const  jobQuery = getJobQuery ().lock ();
190-     if  (jobQuery == nullptr ) {
191-         LOGS (_log, LOG_LVL_ERROR, __func__ << "  failed, jobQuery was NULL" 
192-         return  false ;
193-     }
194-     auto  const  jobQuery = std::dynamic_pointer_cast<qdisp::JobQuery>(jobBase);
195- 
196-     LOGS (_log, LOG_LVL_TRACE,
197-          " MergingHandler::" "  jobid=" jobid () << "  transmitsize=" 
198-                             << resp.transmitsize () << "  rowcount=" rowcount () << "  rowSize=" 
199-                             << "  attemptcount=" attemptcount () << "  errorcode=" errorcode ()
200-                             << "  errormsg=" errormsg ());
201- 
202-     if  (resp.errorcode () != 0  || !resp.errormsg ().empty ()) {
203-         _error = util::Error (resp.errorcode (), resp.errormsg (), util::ErrorCode::MYSQLEXEC);
204-         _setError (ccontrol::MSG_RESULT_ERROR, _error.getMsg ());
205-         LOGS (_log, LOG_LVL_ERROR,
206-              " MergingHandler::" "  error from worker:" wname () << "  error: " 
207-         //  This way we can track if the worker has reported this error. The current implementation
208-         //  requires the large result size to be reported as an error via the InfileMerger regardless
209-         //  of an origin of the error (Czar or the worker). Note that large results can be produced
210-         //  by the Czar itself, e.g., when the aggregate result of multiple worker queries is too large
211-         //  or by the worker when the result set of a single query is too large.
212-         //  The error will be reported to the Czar as a part of the response summary.
213-         if  (resp.errorcode () == util::ErrorCode::WORKER_RESULT_TOO_LARGE) {
214-             _infileMerger->setResultSizeLimitExceeded ();
215-         }
216-         return  false ;
217-     }
218- 
219-     bool  const  success = _merge (resp, jobQuery);
220- 
221-     if  (success) {
222-         _infileMerger->mergeCompleteFor (resp.jobid ());
223-         qdisp::CzarStats::get ()->addTotalRowsRecv (resp.rowcount ());
224-         qdisp::CzarStats::get ()->addTotalBytesRecv (resp.transmitsize ());
225-     }
226-     return  success;
227- }
228- 
229182void  MergingHandler::errorFlush (std::string const & msg, int  code) {
230183    _setError (code, msg, util::ErrorCode::RESULT_IMPORT);
231184    //  Might want more info from result service.
@@ -244,13 +197,8 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
244197    }
245198
246199    if  (fileSize == 0 ) return  qdisp::MergeEndStatus (true );
247- 
248-     //  Read from the http stream and push records into the CSV stream in a separate thread.
249-     //  Note the fixed capacity of the stream which allows up to 2 records to be buffered
250-     //  in the stream. This is enough to hide the latency of the HTTP connection and
251-     //  the time needed to read the file.
252-     auto  csvStream = mysql::CsvStream::create (2 );
253-     _csvStream = csvStream;
200+     auto  csvMemDisk = mysql::CsvMemDisk::create (fileSize, uberJob->getQueryId (), uberJob->getUjId ());
201+     _csvMemDisk = csvMemDisk;
254202
255203    //  This must be after setting _csvMemDisk to avoid cancelFileMerge()
256204    //  race issues, and it needs to be before the data transfer starts.
@@ -260,46 +208,46 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
260208    }
261209
262210    string fileReadErrorMsg;
263-     thread  csvThread ([uberJob, csvStream, fileUrl, fileSize, &fileReadErrorMsg ]() {
211+     auto  transferFunc = [& ]() {
264212        size_t  bytesRead = 0 ;
265213        fileReadErrorMsg = ::readHttpFileAndMerge (
266214                uberJob, fileUrl, fileSize,
267-                 [uberJob, csvStream, fileSize, &bytesRead ](char  const * buf, uint32_t  size) {
215+                 [& ](char  const * buf, uint32_t  size) {
268216                    bool  last = false ;
269217                    if  (buf == nullptr  || size == 0 ) {
270218                        last = true ;
271219                    } else  {
272-                         csvStream ->push (buf, size);
220+                         csvMemDisk ->push (buf, size);
273221                        bytesRead += size;
274222                        last = bytesRead >= fileSize;
275223                    }
276224                    if  (last) {
277-                         csvStream ->push (nullptr , 0 );
225+                         csvMemDisk ->push (nullptr , 0 );
278226                    }
279227                },
280228                MergingHandler::_getHttpConnPool ());
281229        //  Push the stream terminator to indicate the end of the stream.
282230        //  It may be needed to unblock the table merger which may be still attempting to read
283231        //  from the CSV stream.
284232        if  (!fileReadErrorMsg.empty ()) {
285-             csvStream ->push (nullptr , 0 );
233+             csvMemDisk ->push (nullptr , 0 );
286234        }
287-     });
235+     };
236+     csvMemDisk->transferDataFromWorker (transferFunc);
288237
289238    //  Attempt the actual merge.
290-     bool  fileMergeSuccess = _infileMerger->mergeHttp (uberJob, fileSize, csvStream );
239+     bool  fileMergeSuccess = _infileMerger->mergeHttp (uberJob, fileSize, csvMemDisk );
291240    if  (!fileMergeSuccess) {
292241        LOGS (_log, LOG_LVL_WARN, __func__ << "  merge failed" 
293242        util::Error const & err = _infileMerger->getError ();
294243        _setError (ccontrol::MSG_RESULT_ERROR, err.getMsg (), util::ErrorCode::RESULT_IMPORT);
295244    }
296-     if  (csvStream ->getContaminated ()) {
245+     if  (csvMemDisk ->getContaminated ()) {
297246        LOGS (_log, LOG_LVL_ERROR, __func__ << "  merge stream contaminated" 
298247        fileMergeSuccess = false ;
299248        _setError (ccontrol::MSG_RESULT_ERROR, " merge stream contaminated" 
300249    }
301250
302-     csvThread.join ();
303251    if  (!fileReadErrorMsg.empty ()) {
304252        LOGS (_log, LOG_LVL_WARN, __func__ << "  result file read failed" 
305253        _setError (ccontrol::MSG_HTTP_RESULT, fileReadErrorMsg, util::ErrorCode::RESULT_IMPORT);
@@ -310,14 +258,14 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
310258    if  (!mergeEStatus.success ) {
311259        //  This error check needs to come after the transfer and merge steps above to ensure
312260        //  fetching is finished. If any bytes were fetched, the result table is ruined.
313-         mergeEStatus.contaminated  = csvStream-> getBytesWritten () > 0 ;
261+         mergeEStatus.contaminated  = csvMemDisk-> getBytesFetched () > 0 ;
314262    }
315263
316264    return  mergeEStatus;
317265}
318266
319267void  MergingHandler::cancelFileMerge () {
320-     auto  csvStrm = _csvStream .lock ();
268+     auto  csvStrm = _csvMemDisk .lock ();
321269    if  (csvStrm != nullptr ) {
322270        csvStrm->cancel ();
323271    }
@@ -343,9 +291,6 @@ qdisp::MergeEndStatus MergingHandler::flushHttp(string const& fileUrl, uint64_t
343291         " MergingHandler::" "  uberJob=" getIdStr () << "  fileUrl=" 
344292
345293    qdisp::MergeEndStatus mergeStatus = _mergeHttp (uberJob, fileUrl, fileSize);
346-     if  (mergeStatus.success ) {
347-         _infileMerger->mergeCompleteFor (uberJob->getUjId ());
348-     }
349294    return  mergeStatus;
350295}
351296
0 commit comments