4646#include " http/ClientConnPool.h"
4747#include " http/Method.h"
4848#include " mysql/CsvBuffer.h"
49- #include " proto/ProtoHeaderWrap.h"
50- #include " proto/worker.pb.h"
5149#include " qdisp/CzarStats.h"
5250#include " qdisp/Executive.h"
5351#include " qdisp/JobQuery.h"
@@ -145,106 +143,6 @@ string readHttpFileAndMerge(lsst::qserv::qdisp::UberJob::Ptr const& uberJob, str
145143 " , offset: " + to_string (offset) + " , ex: " + string (ex.what ());
146144 LOGS (_log, LOG_LVL_ERROR, context << errMsg);
147145 return errMsg;
148- /* &&&
149- // A value of the flag is set by the message processor when it's time to finish
150- // or abort reading the file.
151- bool last = false;
152- char const* next = inBuf;
153- char const* const end = inBuf + inBufSize;
154- LOGS(_log, LOG_LVL_TRACE, context << " next=" << (uint64_t)next << " end=" << (uint64_t)end);
155- while ((next < end) && !last) {
156- if (exec->getCancelled()) {
157- throw runtime_error(context + " query was cancelled");
158- }
159- if (msgSizeBytes == 0) {
160- // Continue or finish reading the frame header.
161- size_t const bytes2read =
162- std::min(sizeof(uint32_t) - msgSizeBufNext, (size_t)(end - next));
163- std::memcpy(msgSizeBuf.data() + msgSizeBufNext, next, bytes2read);
164- next += bytes2read;
165- offset += bytes2read;
166- msgSizeBufNext += bytes2read;
167- if (msgSizeBufNext == sizeof(uint32_t)) {
168- ++headerCount;
169- // Done reading the frame header.
170- msgSizeBufNext = 0;
171- // Parse and evaluate the message length.
172- msgSizeBytes = *(reinterpret_cast<uint32_t*>(msgSizeBuf.data()));
173- if (msgSizeBytes == 0) {
174- throw runtime_error("message size is 0 at offset " +
175- to_string(offset - sizeof(uint32_t)) + ", file: " +
176- httpUrl);
177- }
178- if (msgSizeBytes > ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT) {
179- throw runtime_error("message size " + to_string(msgSizeBytes) + " at offset "
180- + to_string(offset - sizeof(uint32_t)) + " exceeds the hard limit of " +
181- to_string(ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT) +
182- ", file: " + httpUrl);
183- }
184- // Extend the message buffer (if needed). Note that buffer never gets
185- // truncated to avoid excessive memory deallocations/allocations.
186- if (msgBufSize < msgSizeBytes) {
187- msgBufSize = msgSizeBytes;
188- msgBuf.reset(new char[msgBufSize]);
189- }
190- // Starts the tracker to measure the performance of the network I/O.
191- transmitRateTracker =
192- make_unique<lsst::qserv::TimeCountTracker<double>>(reportFileRecvRate);
193- }
194- } else {
195- // Continue or finish reading the message body.
196- size_t const bytes2read =
197- std::min((size_t)msgSizeBytes - msgBufNext, (size_t)(end - next));
198- std::memcpy(msgBuf.get() + msgBufNext, next, bytes2read);
199- next += bytes2read;
200- offset += bytes2read;
201- msgBufNext += bytes2read;
202- if (msgBufNext == msgSizeBytes) {
203- // Done reading message body.
204- msgBufNext = 0;
205-
206- // Destroying the tracker will result in stopping the tracker's timer and
207- // reporting the file read rate before proceeding to the merge.
208- if (transmitRateTracker != nullptr) {
209- transmitRateTracker->addToValue(msgSizeBytes);
210- transmitRateTracker->setSuccess();
211- transmitRateTracker.reset();
212- }
213-
214- // Parse and evaluate the message.
215- mergeHappened = true;
216- bool messageReadyResult = messageIsReady(msgBuf.get(), msgSizeBytes, last);
217- totalBytesRead += msgSizeBytes;
218- if (!messageReadyResult) {
219- success = false;
220- throw runtime_error("message processing failed at offset " +
221- to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
222- }
223- // Reset the variable to prepare for reading the next header & message (if any).
224- msgSizeBytes = 0;
225- } else {
226- LOGS(_log, LOG_LVL_TRACE,
227- context << " headerCount=" << headerCount
228- << " incomplete read diff=" << (msgSizeBytes - msgBufNext));
229- }
230- }
231- }
232- });
233- LOGS(_log, LOG_LVL_TRACE,
234- context << " headerCount=" << headerCount << " msgSizeBytes=" << msgSizeBytes
235- << " totalBytesRead=" << totalBytesRead);
236- if (msgSizeBufNext != 0) {
237- throw runtime_error("short read of the message header at offset " +
238- to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
239- }
240- if (msgBufNext != 0) {
241- throw runtime_error("short read of the message body at offset " +
242- to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
243- }
244- } catch (exception const& ex) {
245- LOGS(_log, LOG_LVL_ERROR, context + " " + ex.what());
246- success = false;
247- */
248146 }
249147
250148 // Remove the file from the worker if it still exists. Report and ignore errors.
@@ -338,28 +236,20 @@ std::ostream& MergingHandler::print(std::ostream& os) const {
338236 return os << " MergingRequester(flushed=" << (_flushed ? " true)" : " false)" );
339237}
340238
341- bool queryIsNoLongerActive (qdisp::UberJob::Ptr const & uberJob) { // &&&
342- // Do nothing if the query got cancelled for any reason.
343- if (uberJob->isQueryCancelled ()) return true ;
344-
345- // Check for other indicators that the query may have cancelled or finished.
346- auto executive = uberJob->getExecutive ();
347- if (executive == nullptr || executive->getCancelled () || executive->isRowLimitComplete ()) {
348- return true ;
349- }
350- return false ;
351- }
352-
353- bool MergingHandler::_mergeHttp (qdisp::UberJob::Ptr const & uberJob, string const & fileUrl,
354- uint64_t fileSize) {
239+ qdisp::MergeEndStatus MergingHandler::_mergeHttp (qdisp::UberJob::Ptr const & uberJob, string const & fileUrl,
240+ uint64_t fileSize) {
355241 if (_flushed) {
356242 throw util::Bug (ERR_LOC, " already flushed" );
357243 }
358244
359- if (fileSize == 0 ) return true ;
245+ if (fileSize == 0 ) {
246+ return qdisp::MergeEndStatus (true );
247+ }
360248
361249 // After this final test the job's result processing can't be interrupted.
362- if (uberJob->isQueryCancelled ()) return true ;
250+ if (uberJob->isQueryCancelled ()) {
251+ return qdisp::MergeEndStatus (true );
252+ }
363253
364254 // Read from the http stream and push records into the CSV stream in a separate thread.
365255 // Note the fixed capacity of the stream which allows up to 2 records to be buffered
@@ -386,7 +276,7 @@ bool MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uberJob, string const
386276 },
387277 MergingHandler::_getHttpConnPool ());
388278 // Push the stream terminator to indicate the end of the stream.
389- // It may be neeeded to unblock the table merger which may be still attempting to read
279+ // It may be needed to unblock the table merger which may be still attempting to read
390280 // from the CSV stream.
391281 if (!fileReadErrorMsg.empty ()) {
392282 csvStream->push (nullptr , 0 );
@@ -407,61 +297,16 @@ bool MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uberJob, string const
407297 _setError (ccontrol::MSG_HTTP_RESULT, fileReadErrorMsg, util::ErrorCode::RESULT_IMPORT);
408298 }
409299 _flushed = true ;
410- return fileMergeSuccess && fileReadErrorMsg.empty ();
411- }
412-
413- void MergingHandler::_setError (int code, std::string const & msg, int errorState) {
414- LOGS (_log, LOG_LVL_DEBUG, " _setError: code: " << code << " , message: " << msg);
415- auto exec = _executive.lock ();
416- if (exec == nullptr ) return ;
417- exec->addMultiError (code, msg, errorState);
418- }
419-
420- tuple<bool , bool > MergingHandler::flushHttp (string const & fileUrl, uint64_t fileSize, uint64_t expectedRows,
421- uint64_t & resultRows) {
422- bool success = false ;
423- bool shouldCancel = false ;
424-
425- // This is needed to ensure the job query would be staying alive for the duration
426- // of the operation to prevent inconsistency within the application.
427- auto const uberJob = getUberJob ().lock ();
428- if (uberJob == nullptr ) {
429- LOGS (_log, LOG_LVL_ERROR, __func__ << " failed, uberJob was NULL" );
430- return {success, shouldCancel}; // both should still be false
431- }
432300
433- LOGS (_log, LOG_LVL_TRACE,
434- " MergingHandler::" << __func__ << " uberJob=" << uberJob->getIdStr () << " fileUrl=" << fileUrl);
435-
436- success = _mergeHttp (uberJob, fileUrl, fileSize);
437- // &&& FOULED_RESULTS need to do something about shouldCancel.
438- // &&& until there is some way to know if csvStream has merged any bytes, just assume it has fouled the
439- // results.
440- if (!success) shouldCancel = true ;
441-
442- if (!success || shouldCancel) {
443- LOGS (_log, LOG_LVL_WARN, __func__ << " success=" << success << " shouldCancel=" << shouldCancel);
444- }
445-
446- if (success) {
447- _infileMerger->mergeCompleteFor (uberJob->getUjId ());
448- }
449- return {success, shouldCancel};
450- }
451-
452- /* &&&
453- bool MergingHandler::_mergeHttp(shared_ptr<qdisp::UberJob> const& uberJob,
454- proto::ResponseData const& responseData) {
455- if (_flushed) {
456- throw util::Bug(ERR_LOC, "already flushed");
457- }
458- bool const success = _infileMerger->mergeHttp(uberJob, responseData);
459- if (!success) {
460- LOGS(_log, LOG_LVL_WARN, __func__ << " failed");
461- util::Error const& err = _infileMerger->getError();
462- _setError(ccontrol::MSG_RESULT_ERROR, err.getMsg(), util::ErrorCode::RESULT_IMPORT);
301+ qdisp::MergeEndStatus mergeEStatus (fileMergeSuccess && fileReadErrorMsg.empty ());
302+ if (!mergeEStatus.success ) {
303+ // This error check needs to come after the csvThread.join() to avoid race conditions.
304+ if (csvStream->getBytesWritten () > 0 ) {
305+ // There was a failure and bytes were written, result table is ruined.
306+ mergeEStatus.contaminated = true ;
307+ }
463308 }
464- return success ;
309+ return mergeEStatus ;
465310}
466311
467312void MergingHandler::_setError (int code, std::string const & msg, int errorState) {
@@ -471,54 +316,25 @@ void MergingHandler::_setError(int code, std::string const& msg, int errorState)
471316 exec->addMultiError (code, msg, errorState);
472317}
473318
474- tuple<bool, bool> MergingHandler::flushHttp(string const& fileUrl, uint64_t expectedRows,
475- uint64_t& resultRows) {
476- bool success = false;
477- bool shouldCancel = false;
478-
319+ qdisp::MergeEndStatus MergingHandler::flushHttp (string const & fileUrl, uint64_t fileSize) {
479320 // This is needed to ensure the job query would be staying alive for the duration
480321 // of the operation to prevent inconsistency within the application.
481322 auto const uberJob = getUberJob ().lock ();
482323 if (uberJob == nullptr ) {
483324 LOGS (_log, LOG_LVL_ERROR, __func__ << " failed, uberJob was NULL" );
484- return {success, shouldCancel}; // both should still be false
325+ return qdisp::MergeEndStatus ( false );
485326 }
486327
487328 LOGS (_log, LOG_LVL_TRACE,
488329 " MergingHandler::" << __func__ << " uberJob=" << uberJob->getIdStr () << " fileUrl=" << fileUrl);
489330
490- // Dispatch result processing to the corresponidng method which depends on
491- // the result delivery protocol configured at the worker.
492- // Notify the file reader when all rows have been read by setting 'last = true'.
493- auto const dataMergerHttp = [&](char const* buf, uint32_t bufSize, bool& last) {
494- LOGS(_log, LOG_LVL_TRACE, "dataMergerHttp");
495- last = true;
496- proto::ResponseData responseData;
497- if (responseData.ParseFromArray(buf, bufSize) && responseData.IsInitialized()) {
498- bool const mergeSuccess = _mergeHttp(uberJob, responseData);
499- if (mergeSuccess) {
500- resultRows += responseData.row_size();
501- last = resultRows >= expectedRows;
502- }
503- return mergeSuccess;
504- }
505- throw runtime_error("MergingHandler::flush ** message deserialization failed **");
506- };
507-
508- tie(success, shouldCancel) =
509- ::readHttpFileAndMergeHttp(uberJob, fileUrl, dataMergerHttp, MergingHandler::_getHttpConnPool());
510-
511- if (!success || shouldCancel) {
512- LOGS(_log, LOG_LVL_WARN, __func__ << " success=" << success << " shouldCancel=" << shouldCancel);
513- }
514-
515- if (success) {
331+ qdisp::MergeEndStatus mergeStatus = _mergeHttp (uberJob, fileUrl, fileSize);
332+ if (mergeStatus.success ) {
516333 _infileMerger->mergeCompleteFor (uberJob->getUjId ());
517334 }
518- return {success, shouldCancel} ;
335+ return mergeStatus ;
519336}
520337
521- >>>>>>> a27525c04017db9a30061fa0bb4b5228c0c5d1b2 */
522338void MergingHandler::flushHttpError (int errorCode, std::string const & errorMsg, int errState) {
523339 if (!_errorSet.exchange (true )) {
524340 _setError (errorCode, errorMsg, errState);
0 commit comments