4545#include " http/Client.h"
4646#include " http/ClientConnPool.h"
4747#include " http/Method.h"
48- #include " mysql/CsvBuffer .h"
48+ #include " mysql/CsvMemDisk .h"
4949#include " qdisp/CzarStats.h"
5050#include " qdisp/Executive.h"
5151#include " qdisp/JobQuery.h"
@@ -195,13 +195,8 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
195195 }
196196
197197 if (fileSize == 0 ) return qdisp::MergeEndStatus (true );
198-
199- // Read from the http stream and push records into the CSV stream in a separate thread.
200- // Note the fixed capacity of the stream which allows up to 2 records to be buffered
201- // in the stream. This is enough to hide the latency of the HTTP connection and
202- // the time needed to read the file.
203- auto csvStream = mysql::CsvStream::create (2 );
204- _csvStream = csvStream;
198+ auto csvMemDisk = mysql::CsvMemDisk::create (fileSize, uberJob->getQueryId (), uberJob->getUjId ());
199+ _csvMemDisk = csvMemDisk;
205200
206201 // This must be after setting _csvMemDisk to avoid cancelFileMerge()
207202 // race issues, and it needs to be before the data transfer begins.
@@ -211,46 +206,46 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
211206 }
212207
213208 string fileReadErrorMsg;
214- thread csvThread ([uberJob, csvStream, fileUrl, fileSize, &fileReadErrorMsg ]() {
209+ auto transferFunc = [& ]() {
215210 size_t bytesRead = 0 ;
216211 fileReadErrorMsg = ::readHttpFileAndMerge (
217212 uberJob, fileUrl, fileSize,
218- [uberJob, csvStream, fileSize, &bytesRead ](char const * buf, uint32_t size) {
213+ [& ](char const * buf, uint32_t size) {
219214 bool last = false ;
220215 if (buf == nullptr || size == 0 ) {
221216 last = true ;
222217 } else {
223- csvStream ->push (buf, size);
218+ csvMemDisk ->push (buf, size);
224219 bytesRead += size;
225220 last = bytesRead >= fileSize;
226221 }
227222 if (last) {
228- csvStream ->push (nullptr , 0 );
223+ csvMemDisk ->push (nullptr , 0 );
229224 }
230225 },
231226 MergingHandler::_getHttpConnPool ());
232227 // Push the stream terminator to indicate the end of the stream.
233228 // It may be needed to unblock the table merger which may be still attempting to read
234229 // from the CSV stream.
235230 if (!fileReadErrorMsg.empty ()) {
236- csvStream ->push (nullptr , 0 );
231+ csvMemDisk ->push (nullptr , 0 );
237232 }
238- });
233+ };
234+ csvMemDisk->transferDataFromWorker (transferFunc);
239235
240236 // Attempt the actual merge.
241- bool fileMergeSuccess = _infileMerger->mergeHttp (uberJob, fileSize, csvStream );
237+ bool fileMergeSuccess = _infileMerger->mergeHttp (uberJob, fileSize, csvMemDisk );
242238 if (!fileMergeSuccess) {
243239 LOGS (_log, LOG_LVL_WARN, __func__ << " merge failed" );
244240 util::Error const & err = _infileMerger->getError ();
245241 _setError (ccontrol::MSG_RESULT_ERROR, err.getMsg (), util::ErrorCode::RESULT_IMPORT);
246242 }
247- if (csvStream ->getContaminated ()) {
243+ if (csvMemDisk ->getContaminated ()) {
248244 LOGS (_log, LOG_LVL_ERROR, __func__ << " merge stream contaminated" );
249245 fileMergeSuccess = false ;
250246 _setError (ccontrol::MSG_RESULT_ERROR, " merge stream contaminated" , util::ErrorCode::RESULT_IMPORT);
251247 }
252248
253- csvThread.join ();
254249 if (!fileReadErrorMsg.empty ()) {
255250 LOGS (_log, LOG_LVL_WARN, __func__ << " result file read failed" );
256251 _setError (ccontrol::MSG_HTTP_RESULT, fileReadErrorMsg, util::ErrorCode::RESULT_IMPORT);
@@ -261,15 +256,14 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
261256 if (!mergeEStatus.success ) {
262257 // This error check needs to come after transferDataFromWorker() has completed to ensure writing
263258 // is finished. If any bytes were written, the result table is ruined.
264- mergeEStatus.contaminated = csvStream-> getBytesWritten () > 0 ;
259+ mergeEStatus.contaminated = csvMemDisk-> getBytesFetched () > 0 ;
265260 }
266- // TODO:UJ Make it impossible to contaminate the result table for all errors
267- // short of czar or mariadb crash.
261+
268262 return mergeEStatus;
269263}
270264
271265void MergingHandler::cancelFileMerge () {
272- auto csvStrm = _csvStream .lock ();
266+ auto csvStrm = _csvMemDisk .lock ();
273267 if (csvStrm != nullptr ) {
274268 csvStrm->cancel ();
275269 }
@@ -295,9 +289,6 @@ qdisp::MergeEndStatus MergingHandler::flushHttp(string const& fileUrl, uint64_t
295289 " MergingHandler::" << __func__ << " uberJob=" << uberJob->getIdStr () << " fileUrl=" << fileUrl);
296290
297291 qdisp::MergeEndStatus mergeStatus = _mergeHttp (uberJob, fileUrl, fileSize);
298- if (mergeStatus.success ) {
299- _infileMerger->mergeCompleteFor (uberJob->getUjId ());
300- }
301292 return mergeStatus;
302293}
303294
0 commit comments