Skip to content

Commit 53a06a2

Browse files
committed
Changed FQDN calls to use the blocking variant or a stored value.
1 parent f5e04ab commit 53a06a2

File tree

7 files changed

+72
-55
lines changed

7 files changed

+72
-55
lines changed

src/cconfig/CzarConfig.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ class CzarConfig {
131131
int getMaxTableSizeMB() const { return _maxTableSizeMB->getVal(); }
132132
int getMaxSqlConnectionAttempts() const { return _maxSqlConnectionAttempts->getVal(); }
133133

134-
/// The size of the TCP connection pool witin the client API that is used
134+
/// The size of the TCP connection pool within the client API that is used
135135
/// by the merger to pool result files from workers via the HTTP protocol.
136136
int getResultMaxHttpConnections() const { return _resultMaxHttpConnections->getVal(); }
137137

src/czar/Czar.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ Czar::Czar(string const& configFilePath, string const& czarName)
160160
_uqFactory(),
161161
_clientToQuery(),
162162
_monitorSleepTime(_czarConfig->getMonitorSleepTimeMilliSec()),
163-
_activeWorkerMap(new ActiveWorkerMap(_czarConfig)) {
163+
_activeWorkerMap(new ActiveWorkerMap(_czarConfig)),
164+
_fqdn(util::getCurrentHostFqdnBlocking()) {
164165
// set id counter to milliseconds since the epoch, mod 1 year.
165166
struct timeval tv;
166167
gettimeofday(&tv, nullptr);

src/czar/Czar.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ class Czar {
161161

162162
std::shared_ptr<http::ClientConnPool> getCommandHttpPool() const { return _commandHttpPool; }
163163

164+
std::string const& getFqdn() const { return _fqdn; }
165+
164166
/// Startup time of czar, sent to workers so they can detect that the czar was
165167
/// restarted when this value changes.
166168
static uint64_t const czarStartupTime;
@@ -254,6 +256,9 @@ class Czar {
254256
/// Pool of http client connections for sending commands (UberJobs
255257
/// and worker status requests).
256258
std::shared_ptr<http::ClientConnPool> _commandHttpPool;
259+
260+
/// FQDN for this czar.
261+
std::string const _fqdn;
257262
};
258263

259264
} // namespace lsst::qserv::czar

src/czar/CzarRegistry.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,14 @@ void CzarRegistry::_registryUpdateLoop() {
8080
string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" +
8181
to_string(_czarConfig->replicationRegistryPort()) + "/czar";
8282
vector<string> const headers = {"Content-Type: application/json"};
83+
string const fqdn = util::getCurrentHostFqdnBlocking();
8384
json const request = json::object({{"instance_id", _czarConfig->replicationInstanceId()},
8485
{"auth_key", _czarConfig->replicationAuthKey()},
8586
{"czar",
8687
{{"name", _czarConfig->name()},
8788
{"id", _czarConfig->id()},
8889
{"management-port", _czarConfig->replicationHttpPort()},
89-
{"management-host-name", util::get_current_host_fqdn()}}}});
90+
{"management-host-name", fqdn}}}});
9091
string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'";
9192
LOGS(_log, LOG_LVL_TRACE,
9293
__func__ << " czarPost url=" << url << " request=" << request.dump() << " headers=" << headers[0]);
@@ -113,7 +114,7 @@ void CzarRegistry::_registryWorkerInfoLoop() {
113114
string const replicationInstanceId = _czarConfig->replicationInstanceId();
114115
string const replicationAuthKey = _czarConfig->replicationAuthKey();
115116
uint64_t const czarStartTime = Czar::czarStartupTime;
116-
117+
string const fqdn = util::getCurrentHostFqdnBlocking();
117118
vector<string> const headers;
118119
auto const method = http::Method::GET;
119120
string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" +
@@ -133,9 +134,9 @@ void CzarRegistry::_registryWorkerInfoLoop() {
133134
protojson::WorkerContactInfo::WCMapPtr wMap = _buildMapFromJson(response);
134135
// Update the values in the map
135136
{
136-
auto czInfo = protojson::CzarContactInfo::create(
137-
_czarConfig->name(), _czarConfig->id(), _czarConfig->replicationHttpPort(),
138-
util::get_current_host_fqdn(), czarStartTime);
137+
auto czInfo = protojson::CzarContactInfo::create(_czarConfig->name(), _czarConfig->id(),
138+
_czarConfig->replicationHttpPort(), fqdn,
139+
czarStartTime);
139140
lock_guard lck(_cmapMtx);
140141
if (wMap != nullptr) {
141142
_contactMap = wMap;

src/qdisp/UberJob.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ void UberJob::runUberJob() {
127127
uint64_t maxTableSizeMB = czarConfig->getMaxTableSizeMB();
128128
auto czInfo = protojson::CzarContactInfo::create(
129129
czarConfig->name(), czarConfig->id(), czarConfig->replicationHttpPort(),
130-
util::get_current_host_fqdn(), czar::Czar::czarStartupTime);
130+
czar::Czar::getCzar()->getFqdn(), czar::Czar::czarStartupTime);
131131
auto scanInfoPtr = exec->getScanInfo();
132132
bool scanInteractive = exec->getScanInteractive();
133133

src/wmain/WorkerMain.cc

Lines changed: 52 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -97,46 +97,6 @@ std::shared_ptr<wpublish::ChunkInventory> makeChunkInventory(string const& worke
9797
return inventory;
9898
}
9999

100-
/**
101-
* This function will keep periodically updating worker's info in the Replication
102-
* System's Registry.
103-
* @param id The unique identifier of a worker to be registered.
104-
* @note The thread will terminate the process if the registraton request to the Registry
105-
* was explicitly denied by the service. This means the application may be misconfigured.
106-
* Transient communication errors when attempting to connect or send requests to
107-
* the Registry will be posted into the log stream and ignored.
108-
*/
109-
void registryUpdateLoop(string const& id) {
110-
auto const workerConfig = wconfig::WorkerConfig::instance();
111-
auto const method = http::Method::POST;
112-
string const url = "http://" + workerConfig->replicationRegistryHost() + ":" +
113-
to_string(workerConfig->replicationRegistryPort()) + "/qserv-worker";
114-
vector<string> const headers = {"Content-Type: application/json"};
115-
json const request = json::object({{"version", http::MetaModule::version},
116-
{"instance_id", workerConfig->replicationInstanceId()},
117-
{"auth_key", workerConfig->replicationAuthKey()},
118-
{"worker",
119-
{{"name", id},
120-
{"management-port", workerConfig->replicationHttpPort()},
121-
{"management-host-name", util::get_current_host_fqdn()}}}});
122-
string const requestContext =
123-
"WorkerMain: '" + http::method2string(method) + "' request to '" + url + "'";
124-
http::Client client(method, url, request.dump(), headers);
125-
while (true) {
126-
try {
127-
json const response = client.readAsJson();
128-
if (0 == response.at("success").get<int>()) {
129-
string const error = response.at("error").get<string>();
130-
LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'.");
131-
abort();
132-
}
133-
} catch (exception const& ex) {
134-
LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what());
135-
}
136-
this_thread::sleep_for(chrono::seconds(max(1U, workerConfig->replicationRegistryHearbeatIvalSec())));
137-
}
138-
}
139-
140100
} // namespace
141101

142102
namespace lsst::qserv::wmain {
@@ -266,24 +226,70 @@ WorkerMain::WorkerMain() {
266226

267227
// Begin periodically updating worker's status in the Replication System's registry
268228
// in the detached thread. This will continue until the application gets terminated.
269-
thread registryUpdateThread(::registryUpdateLoop, _foreman->chunkInventory()->id());
270-
registryUpdateThread.detach();
229+
thread registryUpdateThread(&WorkerMain::_registryUpdateLoop, this);
230+
_registryUpdateThread = move(registryUpdateThread);
271231
}
272232

273233
void WorkerMain::waitForTerminate() {
274234
unique_lock uniq(_terminateMtx);
275-
_terminateCv.wait(uniq, [this]() { return _terminate; });
235+
_terminateCv.wait(uniq, [this]() -> bool { return _terminate; });
276236
}
277237

278238
void WorkerMain::terminate() {
279-
lock_guard lck(_terminateMtx);
280-
_terminate = true;
239+
{
240+
lock_guard lck(_terminateMtx);
241+
if (_terminate.exchange(true)) return;
242+
;
243+
}
281244
_terminateCv.notify_all();
245+
_controlHttpSvc->stop();
282246
}
283247

284248
WorkerMain::~WorkerMain() {
285249
LOGS(_log, LOG_LVL_INFO, "WorkerMain shutdown.");
286-
_controlHttpSvc->stop();
250+
terminate();
251+
_registryUpdateThread.join();
252+
}
253+
254+
/**
255+
* This function will keep periodically updating worker's info in the Replication
256+
* System's Registry.
257+
* The unique identifier of the worker to be registered is obtained from the foreman's chunk inventory.
258+
* @note The thread will terminate the process if the registration request to the Registry
259+
* was explicitly denied by the service. This means the application may be misconfigured.
260+
* Transient communication errors when attempting to connect or send requests to
261+
* the Registry will be posted into the log stream and ignored.
262+
*/
263+
void WorkerMain::_registryUpdateLoop() {
264+
string const id = _foreman->chunkInventory()->id();
265+
auto const workerConfig = wconfig::WorkerConfig::instance();
266+
auto const method = http::Method::POST;
267+
string const url = "http://" + workerConfig->replicationRegistryHost() + ":" +
268+
to_string(workerConfig->replicationRegistryPort()) + "/qserv-worker";
269+
vector<string> const headers = {"Content-Type: application/json"};
270+
json const request = json::object({{"version", http::MetaModule::version},
271+
{"instance_id", workerConfig->replicationInstanceId()},
272+
{"auth_key", workerConfig->replicationAuthKey()},
273+
{"worker",
274+
{{"name", id},
275+
{"management-port", workerConfig->replicationHttpPort()},
276+
{"management-host-name", _foreman->getFqdn()}}}});
277+
string const requestContext =
278+
"WorkerMain: '" + http::method2string(method) + "' request to '" + url + "'";
279+
http::Client client(method, url, request.dump(), headers);
280+
while (!_terminate) {
281+
try {
282+
json const response = client.readAsJson();
283+
if (0 == response.at("success").get<int>()) {
284+
string const error = response.at("error").get<string>();
285+
LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'.");
286+
abort();
287+
}
288+
} catch (exception const& ex) {
289+
LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what());
290+
}
291+
this_thread::sleep_for(chrono::seconds(max(1U, workerConfig->replicationRegistryHearbeatIvalSec())));
292+
}
287293
}
288294

289295
} // namespace lsst::qserv::wmain

src/wmain/WorkerMain.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,16 @@ class WorkerMain {
5959

6060
std::string getName() const { return _name; }
6161

62+
/// End WorkerMain. Calling this multiple times is harmless.
6263
void terminate();
6364
void waitForTerminate();
6465

6566
private:
6667
WorkerMain();
6768

69+
void _registryUpdateLoop();
70+
std::thread _registryUpdateThread;
71+
6872
/// Weak pointer to allow global access without complicating lifetime issues.
6973
static std::weak_ptr<WorkerMain> _globalWorkerMain;
7074

@@ -85,7 +89,7 @@ class WorkerMain {
8589
std::shared_ptr<wcomms::HttpSvc> _controlHttpSvc;
8690

8791
/// Set to true when the program should terminate.
88-
bool _terminate = false;
92+
std::atomic<bool> _terminate{false};
8993
std::mutex _terminateMtx;
9094
std::condition_variable _terminateCv;
9195
};

0 commit comments

Comments
 (0)