Skip to content

Commit 17830dd

Browse files
authored
Merge pull request #942 from lsst/tickets/DM-51795
Changed FQDN calls to blocking or using stored value.
2 parents c16c9ab + 2021e5e commit 17830dd

File tree

7 files changed

+72
-55
lines changed

7 files changed

+72
-55
lines changed

src/cconfig/CzarConfig.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class CzarConfig {
125125
int getMaxTableSizeMB() const { return _maxTableSizeMB->getVal(); }
126126
int getMaxSqlConnectionAttempts() const { return _maxSqlConnectionAttempts->getVal(); }
127127

128-
/// The size of the TCP connection pool witin the client API that is used
128+
/// The size of the TCP connection pool within the client API that is used
129129
/// by the merger to pool result files from workers via the HTTP protocol.
130130
int getResultMaxHttpConnections() const { return _resultMaxHttpConnections->getVal(); }
131131

src/czar/Czar.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ Czar::Czar(string const& configFilePath, string const& czarName)
163163
_uqFactory(),
164164
_clientToQuery(),
165165
_monitorSleepTime(_czarConfig->getMonitorSleepTimeMilliSec()),
166-
_activeWorkerMap(new ActiveWorkerMap(_czarConfig)) {
166+
_activeWorkerMap(new ActiveWorkerMap(_czarConfig)),
167+
_fqdn(util::getCurrentHostFqdnBlocking()) {
167168
// set id counter to milliseconds since the epoch, mod 1 year.
168169
struct timeval tv;
169170
gettimeofday(&tv, nullptr);

src/czar/Czar.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ class Czar {
161161

162162
std::shared_ptr<http::ClientConnPool> getCommandHttpPool() const { return _commandHttpPool; }
163163

164+
std::string const& getFqdn() const { return _fqdn; }
165+
164166
/// Startup time of czar, sent to workers so they can detect that the czar was
165167
/// was restarted when this value changes.
166168
static uint64_t const czarStartupTime;
@@ -254,6 +256,9 @@ class Czar {
254256
/// Pool of http client connections for sending commands (UberJobs
255257
/// and worker status requests).
256258
std::shared_ptr<http::ClientConnPool> _commandHttpPool;
259+
260+
/// FQDN for this czar.
261+
std::string const _fqdn;
257262
};
258263

259264
} // namespace lsst::qserv::czar

src/czar/CzarRegistry.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,14 @@ void CzarRegistry::_registryUpdateLoop() {
8080
string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" +
8181
to_string(_czarConfig->replicationRegistryPort()) + "/czar";
8282
vector<string> const headers = {"Content-Type: application/json"};
83+
string const fqdn = util::getCurrentHostFqdnBlocking();
8384
json const request = json::object({{"instance_id", _czarConfig->replicationInstanceId()},
8485
{"auth_key", _czarConfig->replicationAuthKey()},
8586
{"czar",
8687
{{"name", _czarConfig->name()},
8788
{"id", _czarConfig->id()},
8889
{"management-port", _czarConfig->replicationHttpPort()},
89-
{"management-host-name", util::get_current_host_fqdn()}}}});
90+
{"management-host-name", fqdn}}}});
9091
string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'";
9192
LOGS(_log, LOG_LVL_TRACE,
9293
__func__ << " czarPost url=" << url << " request=" << request.dump() << " headers=" << headers[0]);
@@ -113,7 +114,7 @@ void CzarRegistry::_registryWorkerInfoLoop() {
113114
string const replicationInstanceId = _czarConfig->replicationInstanceId();
114115
string const replicationAuthKey = _czarConfig->replicationAuthKey();
115116
uint64_t const czarStartTime = Czar::czarStartupTime;
116-
117+
string const fqdn = util::getCurrentHostFqdnBlocking();
117118
vector<string> const headers;
118119
auto const method = http::Method::GET;
119120
string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" +
@@ -133,9 +134,9 @@ void CzarRegistry::_registryWorkerInfoLoop() {
133134
protojson::WorkerContactInfo::WCMapPtr wMap = _buildMapFromJson(response);
134135
// Update the values in the map
135136
{
136-
auto czInfo = protojson::CzarContactInfo::create(
137-
_czarConfig->name(), _czarConfig->id(), _czarConfig->replicationHttpPort(),
138-
util::get_current_host_fqdn(), czarStartTime);
137+
auto czInfo = protojson::CzarContactInfo::create(_czarConfig->name(), _czarConfig->id(),
138+
_czarConfig->replicationHttpPort(), fqdn,
139+
czarStartTime);
139140
lock_guard lck(_cmapMtx);
140141
if (wMap != nullptr) {
141142
_contactMap = wMap;

src/qdisp/UberJob.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ void UberJob::runUberJob() {
127127
uint64_t maxTableSizeMB = czarConfig->getMaxTableSizeMB();
128128
auto czInfo = protojson::CzarContactInfo::create(
129129
czarConfig->name(), czarConfig->id(), czarConfig->replicationHttpPort(),
130-
util::get_current_host_fqdn(), czar::Czar::czarStartupTime);
130+
czar::Czar::getCzar()->getFqdn(), czar::Czar::czarStartupTime);
131131
auto scanInfoPtr = exec->getScanInfo();
132132
bool scanInteractive = exec->getScanInteractive();
133133

src/wmain/WorkerMain.cc

Lines changed: 52 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -95,46 +95,6 @@ std::shared_ptr<wpublish::ChunkInventory> makeChunkInventory(string const& worke
9595
return inventory;
9696
}
9797

98-
/**
99-
* This function will keep periodically updating worker's info in the Replication
100-
* System's Registry.
101-
* @param id The unique identifier of a worker to be registered.
102-
* @note The thread will terminate the process if the registraton request to the Registry
103-
* was explicitly denied by the service. This means the application may be misconfigured.
104-
* Transient communication errors when attempting to connect or send requests to
105-
* the Registry will be posted into the log stream and ignored.
106-
*/
107-
void registryUpdateLoop(string const& id) {
108-
auto const workerConfig = wconfig::WorkerConfig::instance();
109-
auto const method = http::Method::POST;
110-
string const url = "http://" + workerConfig->replicationRegistryHost() + ":" +
111-
to_string(workerConfig->replicationRegistryPort()) + "/qserv-worker";
112-
vector<string> const headers = {"Content-Type: application/json"};
113-
json const request = json::object({{"version", http::MetaModule::version},
114-
{"instance_id", workerConfig->replicationInstanceId()},
115-
{"auth_key", workerConfig->replicationAuthKey()},
116-
{"worker",
117-
{{"name", id},
118-
{"management-port", workerConfig->replicationHttpPort()},
119-
{"management-host-name", util::get_current_host_fqdn()}}}});
120-
string const requestContext =
121-
"WorkerMain: '" + http::method2string(method) + "' request to '" + url + "'";
122-
http::Client client(method, url, request.dump(), headers);
123-
while (true) {
124-
try {
125-
json const response = client.readAsJson();
126-
if (0 == response.at("success").get<int>()) {
127-
string const error = response.at("error").get<string>();
128-
LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'.");
129-
abort();
130-
}
131-
} catch (exception const& ex) {
132-
LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what());
133-
}
134-
this_thread::sleep_for(chrono::seconds(max(1U, workerConfig->replicationRegistryHearbeatIvalSec())));
135-
}
136-
}
137-
13898
} // namespace
13999

140100
namespace lsst::qserv::wmain {
@@ -265,24 +225,70 @@ WorkerMain::WorkerMain() {
265225

266226
// Begin periodically updating worker's status in the Replication System's registry
267227
// in the detached thread. This will continue before the application gets terminated.
268-
thread registryUpdateThread(::registryUpdateLoop, _foreman->chunkInventory()->id());
269-
registryUpdateThread.detach();
228+
thread registryUpdateThread(&WorkerMain::_registryUpdateLoop, this);
229+
_registryUpdateThread = move(registryUpdateThread);
270230
}
271231

272232
void WorkerMain::waitForTerminate() {
273233
unique_lock uniq(_terminateMtx);
274-
_terminateCv.wait(uniq, [this]() { return _terminate; });
234+
_terminateCv.wait(uniq, [this]() -> bool { return _terminate; });
275235
}
276236

277237
void WorkerMain::terminate() {
278-
lock_guard lck(_terminateMtx);
279-
_terminate = true;
238+
{
239+
lock_guard lck(_terminateMtx);
240+
if (_terminate.exchange(true)) return;
241+
;
242+
}
280243
_terminateCv.notify_all();
244+
_controlHttpSvc->stop();
281245
}
282246

283247
WorkerMain::~WorkerMain() {
284248
LOGS(_log, LOG_LVL_INFO, "WorkerMain shutdown.");
285-
_controlHttpSvc->stop();
249+
terminate();
250+
_registryUpdateThread.join();
251+
}
252+
253+
/**
254+
* This function will keep periodically updating worker's info in the Replication
255+
* System's Registry.
256+
* @param id The unique identifier of a worker to be registered.
257+
* @note The thread will terminate the process if the registraton request to the Registry
258+
* was explicitly denied by the service. This means the application may be misconfigured.
259+
* Transient communication errors when attempting to connect or send requests to
260+
* the Registry will be posted into the log stream and ignored.
261+
*/
262+
void WorkerMain::_registryUpdateLoop() {
263+
string const id = _foreman->chunkInventory()->id();
264+
auto const workerConfig = wconfig::WorkerConfig::instance();
265+
auto const method = http::Method::POST;
266+
string const url = "http://" + workerConfig->replicationRegistryHost() + ":" +
267+
to_string(workerConfig->replicationRegistryPort()) + "/qserv-worker";
268+
vector<string> const headers = {"Content-Type: application/json"};
269+
json const request = json::object({{"version", http::MetaModule::version},
270+
{"instance_id", workerConfig->replicationInstanceId()},
271+
{"auth_key", workerConfig->replicationAuthKey()},
272+
{"worker",
273+
{{"name", id},
274+
{"management-port", workerConfig->replicationHttpPort()},
275+
{"management-host-name", _foreman->getFqdn()}}}});
276+
string const requestContext =
277+
"WorkerMain: '" + http::method2string(method) + "' request to '" + url + "'";
278+
http::Client client(method, url, request.dump(), headers);
279+
while (!_terminate) {
280+
try {
281+
json const response = client.readAsJson();
282+
if (0 == response.at("success").get<int>()) {
283+
string const error = response.at("error").get<string>();
284+
LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'.");
285+
abort();
286+
}
287+
} catch (exception const& ex) {
288+
LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what());
289+
}
290+
this_thread::sleep_for(chrono::seconds(max(1U, workerConfig->replicationRegistryHearbeatIvalSec())));
291+
}
286292
}
287293

288294
} // namespace lsst::qserv::wmain

src/wmain/WorkerMain.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,16 @@ class WorkerMain {
5959

6060
std::string getName() const { return _name; }
6161

62+
/// End WorkerMain, calling this multiple times is harmless.
6263
void terminate();
6364
void waitForTerminate();
6465

6566
private:
6667
WorkerMain();
6768

69+
void _registryUpdateLoop();
70+
std::thread _registryUpdateThread;
71+
6872
/// Weak pointer to allow global access without complicating lifetime issues.
6973
static std::weak_ptr<WorkerMain> _globalWorkerMain;
7074

@@ -85,7 +89,7 @@ class WorkerMain {
8589
std::shared_ptr<wcomms::HttpSvc> _controlHttpSvc;
8690

8791
/// Set to true when the program should terminate.
88-
bool _terminate = false;
92+
std::atomic<bool> _terminate{false};
8993
std::mutex _terminateMtx;
9094
std::condition_variable _terminateCv;
9195
};

0 commit comments

Comments
 (0)