Skip to content

Commit 2d26b12

Browse files
iagaponenkojgates108
authored andcommitted
Improved implementatin of the chunk map building algorithm
1 parent c939cd2 commit 2d26b12

File tree

2 files changed

+77
-25
lines changed

2 files changed

+77
-25
lines changed

src/replica/contr/ReplicationTask.cc

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -93,45 +93,71 @@ ReplicationTask::ReplicationTask(Controller::Ptr const& controller,
9393
_disableQservSync(disableQservSync),
9494
_forceQservSync(forceQservSync),
9595
_qservChunkMapUpdate(qservChunkMapUpdate),
96-
_purge(purge) {}
97-
98-
void ReplicationTask::_updateChunkMap() {
99-
// Open MySQL connection using the RAII-style handler that would automatically
100-
// abort the transaction should any problem occured when loading data into the table.
101-
ConnectionHandler h;
102-
try {
103-
h.conn = Connection::open(Configuration::qservCzarDbParams("qservMeta"));
104-
} catch (exception const& ex) {
105-
error("failed to connect to the czar's database server, ex: " + string(ex.what()));
106-
return;
107-
}
108-
QueryGenerator const g(h.conn);
96+
_purge(purge),
97+
_chunkMap(make_shared<ChunkMap>()) {}
10998

99+
bool ReplicationTask::_getChunkMap() {
110100
// Get info on known chunk replicas from the persistent store of the Replication system
111-
// and package those into ready-to-ingest data.
101+
// and package those into the new chunk disposition map.
112102
bool const allDatabases = true;
113103
string const emptyDatabaseFilter;
114104
bool const isPublished = true;
115105
bool const includeFileInfo = true; // need this to access tables sizes
116-
vector<string> rows;
106+
shared_ptr<ChunkMap> newChunkMap = make_shared<ChunkMap>();
117107
for (auto const& workerName : serviceProvider()->config()->workers()) {
118108
vector<ReplicaInfo> replicas;
119109
serviceProvider()->databaseServices()->findWorkerReplicas(replicas, workerName, emptyDatabaseFilter,
120110
allDatabases, isPublished, includeFileInfo);
121111
for (auto const& replica : replicas) {
112+
// Incomplete replicas should not be used by Czar for query processing.
113+
if (replica.status() != ReplicaInfo::Status::COMPLETE) continue;
122114
for (auto const& fileInfo : replica.fileInfo()) {
123115
if (fileInfo.isData() && !fileInfo.isOverlap()) {
124-
rows.push_back(g.packVals(workerName, replica.database(), fileInfo.baseTable(),
125-
replica.chunk(), fileInfo.size));
116+
(*newChunkMap)[workerName][replica.database()][fileInfo.baseTable()][replica.chunk()] =
117+
fileInfo.size;
126118
}
127119
}
128120
}
129121
}
130-
if (rows.empty()) {
131-
warn("no replicas found in the persistent state of the Replication system");
122+
123+
// Update the current map if the new one is different from the current one.
124+
if (*_chunkMap != *newChunkMap) {
125+
_chunkMap = newChunkMap;
126+
return true;
127+
}
128+
return false;
129+
}
130+
131+
void ReplicationTask::_updateChunkMap() {
132+
if (!_getChunkMap() || _chunkMap->empty()) {
133+
// No changes in the chunk map, or the map is still empty so there's
134+
// nothing to do.
132135
return;
133136
}
134137

138+
// Open MySQL connection using the RAII-style handler that would automatically
139+
// abort the transaction should any problem occured when loading data into the table.
140+
ConnectionHandler h;
141+
try {
142+
h.conn = Connection::open(Configuration::qservCzarDbParams("qservMeta"));
143+
} catch (exception const& ex) {
144+
error("failed to connect to the czar's database server, ex: " + string(ex.what()));
145+
return;
146+
}
147+
QueryGenerator const g(h.conn);
148+
149+
// Pack the map into ready-to-ingest data.
150+
vector<string> rows;
151+
for (auto const& [workerName, databases] : *_chunkMap) {
152+
for (auto const& [databaseName, tables] : databases) {
153+
for (auto const& [tableName, chunks] : tables) {
154+
for (auto const [chunkId, size] : chunks) {
155+
rows.push_back(g.packVals(workerName, databaseName, tableName, chunkId, size));
156+
}
157+
}
158+
}
159+
}
160+
135161
// Get the limit for the length of the bulk insert queries. The limit is needed
136162
// to run the query generation.
137163
size_t maxQueryLength = 0;
@@ -163,6 +189,7 @@ void ReplicationTask::_updateChunkMap() {
163189
error("failed to update chunk map in the Czar database, ex: " + string(ex.what()));
164190
return;
165191
}
192+
info("chunk map has been updated in the Czar database");
166193
}
167194

168195
} // namespace lsst::qserv::replica

src/replica/contr/ReplicationTask.h

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
#ifndef LSST_QSERV_REPLICATIONTASK_H
2222
#define LSST_QSERV_REPLICATIONTASK_H
2323

24+
// System headers
25+
#include <cstdint>
26+
#include <map>
27+
#include <string>
28+
2429
// Qserv headers
2530
#include "replica/contr/Task.h"
2631

@@ -56,7 +61,8 @@ class ReplicationTask : public Task {
5661
* up on the Qserv synchronization requests.
5762
* @param disableQservSync Disable replica synchronization at Qserv workers if 'true'.
5863
* @param forceQservSync Force chunk removal at worker resource collections if 'true'.
59-
* @param qservChunkMapUpdate Update the chunk disposition map in Qserv's QMeta database if 'true'.
64+
* @param qservChunkMapUpdate Enable updating the chunk disposition map in Qserv's QMeta database if
65+
* 'true'.
6066
* @param replicationIntervalSec The number of seconds to wait in the end of each
6167
* iteration loop before to begin the new one.
6268
* @param purge Purge excess replicas if 'true'.
@@ -77,17 +83,36 @@ class ReplicationTask : public Task {
7783
unsigned int qservSyncTimeoutSec, bool disableQservSync, bool forceQservSync,
7884
bool qservChunkMapUpdate, unsigned int replicationIntervalSec, bool purge);
7985

86+
/// Get info on known chunk replicas from the persistent store of the Replication system
87+
/// and package those into the new chunk disposition map. Update the current map if the new one is
88+
/// different from the current one.
89+
/// @return 'true' if the map has been updated, 'false' otherwise.
90+
bool _getChunkMap();
91+
92+
/// Update the chunk disposition map in QMeta when changes in the map are detected.
8093
void _updateChunkMap();
8194

8295
/// The maximum number of seconds to be waited before giving up
8396
/// on the Qserv synchronization requests.
8497
unsigned int const _qservSyncTimeoutSec;
8598

86-
bool const _disableQservSync; ///< Disable replica synchroization at Qserv workers if 'true'.
87-
bool const _forceQservSync; ///< Force removal at worker resource collections if 'true'.
88-
bool const
89-
_qservChunkMapUpdate; /// Update the chunk disposition map in Qserv's QMeta database if 'true'.
90-
bool const _purge; ///< Purge excess replicas if 'true'.
99+
bool const _disableQservSync; ///< Disable replica synchroization at Qserv workers if 'true'.
100+
bool const _forceQservSync; ///< Force removal at worker resource collections if 'true'.
101+
bool const _qservChunkMapUpdate; ///< Enable updating the chunk disposition map in Qserv's QMeta
102+
/// database if 'true'.
103+
bool const _purge; ///< Purge excess replicas if 'true'.
104+
105+
/// [worker] -> [database] -> [baseTable] -> [chunk] -> size
106+
///
107+
/// The map represents the information on the replica disposition across Qserv workers.
108+
/// The information is obtained from the persistent state of the Replication system on each
109+
/// run of the task. The maps gets updated only if the new map is different from the current one.
110+
///
111+
using ChunkMap =
112+
std::map<std::string,
113+
std::map<std::string, std::map<std::string, std::map<unsigned int, std::uint64_t>>>>;
114+
115+
std::shared_ptr<ChunkMap> _chunkMap; ///< The current chunk disposition map
91116
};
92117

93118
} // namespace lsst::qserv::replica

0 commit comments

Comments
 (0)