@@ -93,45 +93,71 @@ ReplicationTask::ReplicationTask(Controller::Ptr const& controller,
           _disableQservSync(disableQservSync),
           _forceQservSync(forceQservSync),
           _qservChunkMapUpdate(qservChunkMapUpdate),
-          _purge(purge) {}
-
-void ReplicationTask::_updateChunkMap() {
-    // Open MySQL connection using the RAII-style handler that would automatically
-    // abort the transaction should any problem occur when loading data into the table.
-    ConnectionHandler h;
-    try {
-        h.conn = Connection::open(Configuration::qservCzarDbParams("qservMeta"));
-    } catch (exception const& ex) {
-        error("failed to connect to the czar's database server, ex: " + string(ex.what()));
-        return;
-    }
-    QueryGenerator const g(h.conn);
+          _purge(purge),
+          _chunkMap(make_shared<ChunkMap>()) {}

+bool ReplicationTask::_getChunkMap() {
     // Get info on known chunk replicas from the persistent store of the Replication system
-    // and package those into ready-to-ingest data.
+    // and package those into the new chunk disposition map.
     bool const allDatabases = true;
     string const emptyDatabaseFilter;
     bool const isPublished = true;
     bool const includeFileInfo = true;  // need this to access table sizes
-    vector<string> rows;
+    shared_ptr<ChunkMap> newChunkMap = make_shared<ChunkMap>();
     for (auto const& workerName : serviceProvider()->config()->workers()) {
         vector<ReplicaInfo> replicas;
         serviceProvider()->databaseServices()->findWorkerReplicas(replicas, workerName, emptyDatabaseFilter,
                                                                   allDatabases, isPublished, includeFileInfo);
         for (auto const& replica : replicas) {
+            // Incomplete replicas should not be used by Czar for query processing.
+            if (replica.status() != ReplicaInfo::Status::COMPLETE) continue;
             for (auto const& fileInfo : replica.fileInfo()) {
                 if (fileInfo.isData() && !fileInfo.isOverlap()) {
-                    rows.push_back(g.packVals(workerName, replica.database(), fileInfo.baseTable(),
-                                              replica.chunk(), fileInfo.size));
+                    (*newChunkMap)[workerName][replica.database()][fileInfo.baseTable()][replica.chunk()] =
+                            fileInfo.size;
                 }
             }
         }
     }
-    if (rows.empty()) {
-        warn("no replicas found in the persistent state of the Replication system");
+
+    // Update the current map if the new one is different from the current one.
+    if (*_chunkMap != *newChunkMap) {
+        _chunkMap = newChunkMap;
+        return true;
+    }
+    return false;
+}
+
+void ReplicationTask::_updateChunkMap() {
+    if (!_getChunkMap() || _chunkMap->empty()) {
+        // No changes in the chunk map, or the map is still empty, so there's
+        // nothing to do.
         return;
     }

+    // Open MySQL connection using the RAII-style handler that would automatically
+    // abort the transaction should any problem occur when loading data into the table.
+    ConnectionHandler h;
+    try {
+        h.conn = Connection::open(Configuration::qservCzarDbParams("qservMeta"));
+    } catch (exception const& ex) {
+        error("failed to connect to the czar's database server, ex: " + string(ex.what()));
+        return;
+    }
+    QueryGenerator const g(h.conn);
+
+    // Pack the map into ready-to-ingest data.
+    vector<string> rows;
+    for (auto const& [workerName, databases] : *_chunkMap) {
+        for (auto const& [databaseName, tables] : databases) {
+            for (auto const& [tableName, chunks] : tables) {
+                for (auto const [chunkId, size] : chunks) {
+                    rows.push_back(g.packVals(workerName, databaseName, tableName, chunkId, size));
+                }
+            }
+        }
+    }
+
     // Get the limit for the length of the bulk insert queries. The limit is needed
     // to run the query generation.
     size_t maxQueryLength = 0;
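
Note on the data structure: the declaration of ChunkMap is not part of this diff, but the indexing expression and the nested structured-binding loops above imply a map keyed by worker, database, table, and chunk, with the table file size as the leaf value. The sketch below illustrates that assumed shape only; the ChunkMap alias, the ChunkRow struct, and the flatten() helper are hypothetical stand-ins, not the actual Qserv declarations. Nested std::map instances also supply the operator!= used for change detection in _getChunkMap().

    #include <cstdint>
    #include <map>
    #include <memory>
    #include <string>
    #include <vector>

    // Assumed shape of the chunk disposition map:
    //   worker -> database -> table -> chunk -> table file size (bytes).
    using ChunkMap =
            std::map<std::string,                                  // worker
                     std::map<std::string,                         // database
                              std::map<std::string,                // table
                                       std::map<unsigned int,      // chunk
                                                std::uint64_t>>>>; // size

    // Hypothetical flattened row, standing in for the strings produced by
    // QueryGenerator::packVals() in the real code.
    struct ChunkRow {
        std::string worker;
        std::string database;
        std::string table;
        unsigned int chunk;
        std::uint64_t size;
    };

    // Mirrors the four nested structured-binding loops of _updateChunkMap().
    std::vector<ChunkRow> flatten(std::shared_ptr<ChunkMap> const& chunkMap) {
        std::vector<ChunkRow> rows;
        for (auto const& [worker, databases] : *chunkMap) {
            for (auto const& [database, tables] : databases) {
                for (auto const& [table, chunks] : tables) {
                    for (auto const& [chunk, size] : chunks) {
                        rows.push_back({worker, database, table, chunk, size});
                    }
                }
            }
        }
        return rows;
    }

If the real ChunkMap is declared differently (for example with a dedicated struct per chunk), only the alias changes; the traversal pattern stays the same.
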
@@ -163,6 +189,7 @@ void ReplicationTask::_updateChunkMap() {
         error("failed to update chunk map in the Czar database, ex: " + string(ex.what()));
         return;
     }
+    info("chunk map has been updated in the Czar database");
 }

 }  // namespace lsst::qserv::replica
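
The remainder of _updateChunkMap() sits outside the visible hunks: it turns rows into bulk INSERT statements, which is why the code fetches maxQueryLength before generating queries. The snippet below is only a sketch of that batching idea under stated assumptions: executeQuery and the chunkMap table name are placeholders, and the real code relies on the Replication system's QueryGenerator rather than manual string concatenation.

    #include <cstddef>
    #include <functional>
    #include <string>
    #include <vector>

    // Pack pre-formatted value tuples (e.g. "(worker,db,table,chunk,size)")
    // into as few INSERT statements as possible while keeping each statement
    // under the server-imposed maximum query length.
    void insertInBatches(std::vector<std::string> const& rows, std::size_t maxQueryLength,
                         std::function<void(std::string const&)> const& executeQuery) {
        std::string const header = "INSERT INTO `chunkMap` VALUES ";  // placeholder table name
        std::string query;
        for (auto const& row : rows) {
            // Flush the current statement if appending one more tuple would exceed the limit.
            if (!query.empty() && query.size() + 1 + row.size() > maxQueryLength) {
                executeQuery(query);
                query.clear();
            }
            query += query.empty() ? header + row : "," + row;
        }
        if (!query.empty()) executeQuery(query);
    }

In the actual task this runs on the connection held by the RAII-style ConnectionHandler, so, as the comment in the diff notes, a failure in any batch aborts the whole transaction.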