Skip to content

Commit 0baa2a7

Browse files
committed
Added memory/disk hybrid for transferring csv files.
1 parent 53a06a2 commit 0baa2a7

File tree

19 files changed

+669
-129
lines changed

19 files changed

+669
-129
lines changed

admin/local/docker/compose/docker-compose.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ volumes:
3636
volume_czar_xrootd:
3737
volume_czar_home:
3838
volume_czar_cfg:
39+
volume_czar_transfer:
3940

4041
volume_czar_mariadb_data:
4142
volume_czar_mariadb_cfg:
@@ -272,6 +273,10 @@ services:
272273
- type: volume
273274
source: volume_czar_mariadb_run
274275
target: /qserv/mariadb/run
276+
- type: volume
277+
source: volume_czar_transfer
278+
target: /tmp
279+
275280
- << : *log-volume
276281
expose:
277282
- "3306" # for czar-mariadb
@@ -308,6 +313,9 @@ services:
308313
- type: volume
309314
source: volume_czar_cfg
310315
target: /config-etc
316+
- type: volume
317+
source: volume_czar_transfer
318+
target: /tmp
311319
- type: volume
312320
source: volume_czar_home
313321
target: /home/qserv

src/admin/templates/proxy/etc/qserv-czar.cnf.jinja

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,29 @@ port = {{ czar_db_port }}
2525
# Any table in resultdb that hasn't been updated in this many days is deleted.
2626
oldestResultKeptDays = 7
2727

28+
# Either this should be changed to a high performance docker volume directory
29+
# or /tmp should be mounted as a high performance docker volume directory
30+
# to avoid using limited docker memory to store the contents.
31+
transferDir = /tmp
32+
2833
# maximum number of connection retries to SQL database (per connection attempt)
2934
maxsqlconnectionattempts = 10
3035

3136
# maximum user query result size in MB
3237
maxtablesize_mb = 5100
3338

39+
# maximum number of MB of concurrent csv transfer files allowed to be kept in
40+
# memory, after this point they will be temporarily written to disk.
41+
# 0 is used for testing. 10000 is usually reasonable.
42+
maxTransferMemMB = 0
43+
44+
# minimum number of MB for each csv transfer file to be kept in memory
45+
# before possibly going to disk.
46+
# 0 for testing, up to 10 should be reasonable.
47+
transferMinMBInMem = 0
48+
49+
50+
3451

3552
# database connection for QMeta database
3653
[qmeta]

src/cconfig/CzarConfig.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,14 @@ class CzarConfig {
130130
/// Getters for result aggregation options.
131131
int getMaxTableSizeMB() const { return _maxTableSizeMB->getVal(); }
132132
int getMaxSqlConnectionAttempts() const { return _maxSqlConnectionAttempts->getVal(); }
133+
unsigned int getMaxTransferMemMB() const { return _resultMaxTransferMemMB->getVal(); }
134+
/// Return the transfer directory. This is customizable to allow for a
135+
/// high performance volume.
136+
std::string getTransferDir() const { return _resultTransferDir->getVal(); }
137+
138+
/// Return the minimum amount of memory per UberJob to keep in memory. This much transfer
139+
/// data will be stored in memory regardless of other conditions.
140+
unsigned int getTransferMinMBInMem() const { return _resultTransferMinMBInMem->getVal(); }
133141

134142
/// The size of the TCP connection pool within the client API that is used
135143
/// by the merger to pool result files from workers via the HTTP protocol.
@@ -306,6 +314,14 @@ class CzarConfig {
306314
CVTIntPtr _oldestAsyncResultKeptSeconds = util::ConfigValTInt::create(
307315
_configValMap, "resultdb", "oldestAsyncResultKeptSeconds", notReq, 3600);
308316

317+
// This must be larger than _maxTableSizeMB when using the "memory" TransferMethod
318+
CVTUIntPtr _resultMaxTransferMemMB =
319+
util::ConfigValTUInt::create(_configValMap, "resultdb", "maxTransferMemMB", notReq, 10000);
320+
CVTStrPtr _resultTransferDir =
321+
util::ConfigValTStr::create(_configValMap, "resultdb", "transferDir", notReq, "/tmp");
322+
CVTUIntPtr _resultTransferMinMBInMem =
323+
util::ConfigValTUInt::create(_configValMap, "resultdb", "transferMinMBInMem", notReq, 10);
324+
309325
/// Get all the elements in the css section.
310326
CVTStrPtr _cssTechnology =
311327
util::ConfigValTStr::create(_configValMap, "css", "technology", notReq, "mysql");

src/ccontrol/MergingHandler.cc

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
#include "http/Client.h"
4646
#include "http/ClientConnPool.h"
4747
#include "http/Method.h"
48-
#include "mysql/CsvBuffer.h"
48+
#include "mysql/CsvMemDisk.h"
4949
#include "qdisp/CzarStats.h"
5050
#include "qdisp/Executive.h"
5151
#include "qdisp/JobQuery.h"
@@ -244,13 +244,8 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
244244
}
245245

246246
if (fileSize == 0) return qdisp::MergeEndStatus(true);
247-
248-
// Read from the http stream and push records into the CSV stream in a separate thread.
249-
// Note the fixed capacity of the stream which allows up to 2 records to be buffered
250-
// in the stream. This is enough to hide the latency of the HTTP connection and
251-
// the time needed to read the file.
252-
auto csvStream = mysql::CsvStream::create(2);
253-
_csvStream = csvStream;
247+
auto csvMemDisk = mysql::CsvMemDisk::create(fileSize, uberJob->getQueryId(), uberJob->getUjId());
248+
_csvMemDisk = csvMemDisk;
254249

255250
// This must be after setting _csvMemDisk to avoid cancelFileMerge()
256251
// race issues, and it needs to be before the thread starts.
@@ -260,46 +255,46 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
260255
}
261256

262257
string fileReadErrorMsg;
263-
thread csvThread([uberJob, csvStream, fileUrl, fileSize, &fileReadErrorMsg]() {
258+
auto transferFunc = [&]() {
264259
size_t bytesRead = 0;
265260
fileReadErrorMsg = ::readHttpFileAndMerge(
266261
uberJob, fileUrl, fileSize,
267-
[uberJob, csvStream, fileSize, &bytesRead](char const* buf, uint32_t size) {
262+
[&](char const* buf, uint32_t size) {
268263
bool last = false;
269264
if (buf == nullptr || size == 0) {
270265
last = true;
271266
} else {
272-
csvStream->push(buf, size);
267+
csvMemDisk->push(buf, size);
273268
bytesRead += size;
274269
last = bytesRead >= fileSize;
275270
}
276271
if (last) {
277-
csvStream->push(nullptr, 0);
272+
csvMemDisk->push(nullptr, 0);
278273
}
279274
},
280275
MergingHandler::_getHttpConnPool());
281276
// Push the stream terminator to indicate the end of the stream.
282277
// It may be needed to unblock the table merger which may be still attempting to read
283278
// from the CSV stream.
284279
if (!fileReadErrorMsg.empty()) {
285-
csvStream->push(nullptr, 0);
280+
csvMemDisk->push(nullptr, 0);
286281
}
287-
});
282+
};
283+
csvMemDisk->transferDataFromWorker(transferFunc);
288284

289285
// Attempt the actual merge.
290-
bool fileMergeSuccess = _infileMerger->mergeHttp(uberJob, fileSize, csvStream);
286+
bool fileMergeSuccess = _infileMerger->mergeHttp(uberJob, fileSize, csvMemDisk);
291287
if (!fileMergeSuccess) {
292288
LOGS(_log, LOG_LVL_WARN, __func__ << " merge failed");
293289
util::Error const& err = _infileMerger->getError();
294290
_setError(ccontrol::MSG_RESULT_ERROR, err.getMsg(), util::ErrorCode::RESULT_IMPORT);
295291
}
296-
if (csvStream->getContaminated()) {
292+
if (csvMemDisk->getContaminated()) {
297293
LOGS(_log, LOG_LVL_ERROR, __func__ << " merge stream contaminated");
298294
fileMergeSuccess = false;
299295
_setError(ccontrol::MSG_RESULT_ERROR, "merge stream contaminated", util::ErrorCode::RESULT_IMPORT);
300296
}
301297

302-
csvThread.join();
303298
if (!fileReadErrorMsg.empty()) {
304299
LOGS(_log, LOG_LVL_WARN, __func__ << " result file read failed");
305300
_setError(ccontrol::MSG_HTTP_RESULT, fileReadErrorMsg, util::ErrorCode::RESULT_IMPORT);
@@ -310,14 +305,14 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
310305
if (!mergeEStatus.success) {
311306
// This error check needs to come after the transfer has completed to ensure writing
312307
// is finished. If any bytes were written, the result table is ruined.
313-
mergeEStatus.contaminated = csvStream->getBytesWritten() > 0;
308+
mergeEStatus.contaminated = csvMemDisk->getBytesFetched() > 0;
314309
}
315310

316311
return mergeEStatus;
317312
}
318313

319314
void MergingHandler::cancelFileMerge() {
320-
auto csvStrm = _csvStream.lock();
315+
auto csvStrm = _csvMemDisk.lock();
321316
if (csvStrm != nullptr) {
322317
csvStrm->cancel();
323318
}
@@ -343,9 +338,6 @@ qdisp::MergeEndStatus MergingHandler::flushHttp(string const& fileUrl, uint64_t
343338
"MergingHandler::" << __func__ << " uberJob=" << uberJob->getIdStr() << " fileUrl=" << fileUrl);
344339

345340
qdisp::MergeEndStatus mergeStatus = _mergeHttp(uberJob, fileUrl, fileSize);
346-
if (mergeStatus.success) {
347-
_infileMerger->mergeCompleteFor(uberJob->getUjId());
348-
}
349341
return mergeStatus;
350342
}
351343

src/ccontrol/MergingHandler.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class ClientConnPool;
3737
} // namespace lsst::qserv::http
3838

3939
namespace lsst::qserv::mysql {
40-
class CsvStream;
40+
class CsvMemDisk;
4141
} // namespace lsst::qserv::mysql
4242

4343
namespace lsst::qserv::qdisp {
@@ -103,8 +103,8 @@ class MergingHandler : public qdisp::ResponseHandler {
103103
bool _flushed{false}; ///< flushed to InfileMerger?
104104
std::string _wName{"~"}; ///< worker name
105105

106-
std::weak_ptr<qdisp::Executive> _executive; ///< Weak pointer to the executive for errors.
107-
std::weak_ptr<mysql::CsvStream> _csvStream; ///< Weak pointer to cancel infile merge.
106+
std::weak_ptr<qdisp::Executive> _executive; ///< Weak pointer to the executive for errors.
107+
std::weak_ptr<mysql::CsvMemDisk> _csvMemDisk; ///< Weak pointer to cancel infile merge.
108108
};
109109

110110
} // namespace lsst::qserv::ccontrol

src/czar/ActiveWorker.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ void ActiveWorkerMap::updateMap(protojson::WorkerContactInfo::WCMap const& wcMap
288288
auto iter = _awMap.find(wcKey);
289289
if (iter == _awMap.end()) {
290290
auto newAW = ActiveWorker::create(wcVal, czInfo, replicationInstanceId, replicationAuthKey);
291-
LOGS(_log, LOG_LVL_INFO, cName(__func__) << " ActiveWorker created for " << wcKey);
291+
LOGS(_log, LOG_LVL_INFO,
292+
cName(__func__) << " ActiveWorker created for " << wcKey << " " << newAW->dump());
292293
_awMap[wcKey] = newAW;
293294
if (_czarCancelAfterRestart) {
294295
newAW->setCzarCancelAfterRestart(_czarCancelAfterRestartCzId, _czarCancelAfterRestartQId);

src/czar/Czar.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#include "http/ClientConnPool.h"
5454
#include "http/MetaModule.h"
5555
#include "http/Method.h"
56+
#include "mysql/CsvMemDisk.h"
5657
#include "qdisp/CzarStats.h"
5758
#include "qdisp/Executive.h"
5859
#include "qproc/DatabaseModels.h"
@@ -178,6 +179,15 @@ Czar::Czar(string const& configFilePath, string const& czarName)
178179
// the name of the Czar gets translated into a numeric identifier.
179180
_czarConfig->setId(_uqFactory->userQuerySharedResources()->czarId);
180181

182+
CzarIdType czarId = _czarConfig->id();
183+
size_t const MB_SIZE_BYTES = 1024 * 1024;
184+
size_t maxResultTableSizeBytes = _czarConfig->getMaxTableSizeMB() * MB_SIZE_BYTES;
185+
size_t maxMemToUse = _czarConfig->getMaxTransferMemMB() * MB_SIZE_BYTES;
186+
string const transferDirectory = _czarConfig->getTransferDir();
187+
std::size_t const transferMinBytesInMem = _czarConfig->getTransferMinMBInMem() * MB_SIZE_BYTES;
188+
mysql::TransferTracker::setup(maxMemToUse, transferDirectory, transferMinBytesInMem,
189+
maxResultTableSizeBytes, czarId);
190+
181191
// Tell workers to cancel any queries that were submitted before this restart of Czar.
182192
// Figure out which query (if any) was recorded in Czar databases before the restart.
183193
// The id will be used as the high-watermark for queries that need to be cancelled.

src/czar/CzarChunkMap.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ void CzarChunkMap::verify(string const& familyName) const {
139139
LOGS(_log, LOG_LVL_WARN, cName(__func__) << " family=" << familyName << " verified");
140140
}
141141

142-
string CzarChunkMap::dumpChunkMap(ChunkMap const& chunkMap) {
142+
string CzarChunkMap::dumpChunkMap() const {
143143
stringstream os;
144144
os << "ChunkMap{";
145-
for (auto const& [cId, cDataPtr] : chunkMap) {
145+
for (auto const& [cId, cDataPtr] : *_chunkMap) {
146146
os << "(cId=" << cId << ":";
147147
os << ((cDataPtr == nullptr) ? "null" : cDataPtr->dump()) << ")";
148148
}
@@ -355,6 +355,10 @@ bool CzarFamilyMap::_read() {
355355

356356
verify(familyMapPtr);
357357

358+
for (auto const& [fam, ccMap] : *familyMapPtr) {
359+
LOGS(_log, LOG_LVL_INFO, "{family=" << fam << "{" << ccMap->dumpChunkMap() << "}}");
360+
}
361+
358362
_familyMap = familyMapPtr;
359363

360364
_lastUpdateTime = qChunkMap.updateTime;

src/czar/CzarChunkMap.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ class CzarChunkMap {
205205
/// @throws ChunkMapException
206206
void verify(std::string const& familyName) const;
207207

208-
static std::string dumpChunkMap(ChunkMap const& chunkMap);
208+
std::string dumpChunkMap() const;
209209

210210
static std::string dumpWorkerChunkMap(WorkerChunkMap const& wcMap);
211211

src/czar/CzarRegistry.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,13 @@ void CzarRegistry::_registryUpdateLoop() {
8080
string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" +
8181
to_string(_czarConfig->replicationRegistryPort()) + "/czar";
8282
vector<string> const headers = {"Content-Type: application/json"};
83-
string const fqdn = util::getCurrentHostFqdnBlocking();
8483
json const request = json::object({{"instance_id", _czarConfig->replicationInstanceId()},
8584
{"auth_key", _czarConfig->replicationAuthKey()},
8685
{"czar",
8786
{{"name", _czarConfig->name()},
8887
{"id", _czarConfig->id()},
8988
{"management-port", _czarConfig->replicationHttpPort()},
90-
{"management-host-name", fqdn}}}});
89+
{"management-host-name", util::getCurrentHostFqdnBlocking()}}}});
9190
string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'";
9291
LOGS(_log, LOG_LVL_TRACE,
9392
__func__ << " czarPost url=" << url << " request=" << request.dump() << " headers=" << headers[0]);

0 commit comments

Comments
 (0)