@@ -236,232 +236,6 @@ string ActiveWorker::_dump() const {
    return os.str();
}

-string ActiveWorker::getStateStr(State st) {
-    switch (st) {
-        case ALIVE:
-            return string("ALIVE");
-        case QUESTIONABLE:
-            return string("QUESTIONABLE");
-        case DEAD:
-            return string("DEAD");
-    }
-    return string("unknown");
-}
-
-bool ActiveWorker::compareContactInfo(http::WorkerContactInfo const& wcInfo) const {
-    lock_guard<mutex> lg(_aMtx);
-    auto wInfo_ = _wqsData->getWInfo();
-    if (wInfo_ == nullptr) return false;
-    return wInfo_->isSameContactInfo(wcInfo);
-}
-
-void ActiveWorker::setWorkerContactInfo(protojson::WorkerContactInfo::Ptr const& wcInfo) {
-    LOGS(_log, LOG_LVL_INFO, cName(__func__) << " new info=" << wcInfo->dump());
-    lock_guard<mutex> lg(_aMtx);
-    _wqsData->setWInfo(wcInfo);
-}
-
-void ActiveWorker::_changeStateTo(State newState, double secsSinceUpdate, string const& note) {
-    auto lLvl = (newState == DEAD) ? LOG_LVL_ERROR : LOG_LVL_INFO;
-    LOGS(_log, lLvl,
-         note << " oldState=" << getStateStr(_state) << " newState=" << getStateStr(newState)
-              << " secsSince=" << secsSinceUpdate);
-    _state = newState;
-}
-
-void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double timeoutDeadSecs,
-                                              double maxLifetime) {
-    LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " start");
-    bool newlyDeadWorker = false;
-    protojson::WorkerContactInfo::Ptr wInfo_;
-    {
-        lock_guard<mutex> lg(_aMtx);
-        wInfo_ = _wqsData->getWInfo();
-        if (wInfo_ == nullptr) {
-            LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " no WorkerContactInfo");
-            return;
-        }
-        double secsSinceUpdate = wInfo_->timeSinceRegUpdateSeconds();
-        LOGS(_log, LOG_LVL_TRACE,
-             cName(__func__) << " wInfo=" << wInfo_->dump()
-                             << " secsSince=" << wInfo_->timeSinceRegUpdateSeconds()
-                             << " secsSinceUpdate=" << secsSinceUpdate);
-
-        // Update the last time the registry contacted this worker.
-        // TODO:UJ - This needs to be added to the dashboard.
-        switch (_state) {
-            case ALIVE: {
-                if (secsSinceUpdate >= timeoutAliveSecs) {
-                    _changeStateTo(QUESTIONABLE, secsSinceUpdate, cName(__func__));
-                }
-                break;
-            }
-            case QUESTIONABLE: {
-                if (secsSinceUpdate < timeoutAliveSecs) {
-                    _changeStateTo(ALIVE, secsSinceUpdate, cName(__func__));
-                }
-                if (secsSinceUpdate >= timeoutDeadSecs) {
-                    _changeStateTo(DEAD, secsSinceUpdate, cName(__func__));
-                    // All uberjobs for this worker need to die.
-                    newlyDeadWorker = true;
-                }
-                break;
-            }
-            case DEAD: {
-                if (secsSinceUpdate < timeoutAliveSecs) {
-                    _changeStateTo(ALIVE, secsSinceUpdate, cName(__func__));
-                } else {
-                    // Don't waste time on this worker until the registry has heard from it.
-                    return;
-                }
-                break;
-            }
-        }
-    }
-
-    // _aMtx must not be held when calling this.
-    if (newlyDeadWorker) {
-        LOGS(_log, LOG_LVL_WARN,
-             cName(__func__) << " worker " << wInfo_->wId << " appears to have died, reassigning its jobs.");
-        czar::Czar::getCzar()->killIncompleteUbjerJobsOn(wInfo_->wId);
-    }
-
-    shared_ptr<json> jsWorkerReqPtr;
-    {
-        // Go through the _qIdDoneKeepFiles, _qIdDoneDeleteFiles, and _qIdDeadUberJobs lists to build a
-        // message to send to the worker.
-        jsWorkerReqPtr = _wqsData->serializeJson(maxLifetime);
-    }
-
-    // Always send the message as it's a way to inform the worker that this
-    // czar is functioning and capable of receiving requests.
-    Ptr thisPtr = shared_from_this();
-    auto sendStatusMsgFunc = [thisPtr, wInfo_, jsWorkerReqPtr](util::CmdData*) {
-        thisPtr->_sendStatusMsg(wInfo_, jsWorkerReqPtr);
-    };
-
-    auto cmd = util::PriorityCommand::Ptr(new util::PriorityCommand(sendStatusMsgFunc));
-    auto qdisppool = czar::Czar::getCzar()->getQdispPool();
-    LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " queuing message");
-    qdisppool->queCmd(cmd, 1);
-}
-
-void ActiveWorker::_sendStatusMsg(protojson::WorkerContactInfo::Ptr const& wInf,
-                                  std::shared_ptr<nlohmann::json> const& jsWorkerReqPtr) {
-    auto& jsWorkerReq = *jsWorkerReqPtr;
-    auto const method = http::Method::POST;
-    if (wInf == nullptr) {
-        LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " wInfo was null.");
-        return;
-    }
-    auto [ciwId, ciwHost, ciwManag, ciwPort] = wInf->getAll();
-    string const url = "http://" + ciwHost + ":" + to_string(ciwPort) + "/querystatus";
-    vector<string> const headers = {"Content-Type: application/json"};
-    auto const& czarConfig = cconfig::CzarConfig::instance();
-
-    LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " REQ " << jsWorkerReq);
-    string const requestContext = "Czar: '" + http::method2string(method) + "' stat request to '" + url + "'";
-    LOGS(_log, LOG_LVL_TRACE,
-         cName(__func__) << " czarPost url=" << url << " request=" << jsWorkerReq.dump()
-                         << " headers=" << headers[0]);
-    http::Client client(method, url, jsWorkerReq.dump(), headers);
-    bool transmitSuccess = false;
-    string exceptionWhat;
-    json response;
-    try {
-        LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " read start");
-        response = client.readAsJson();
-        LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " read end");
-        if (0 != response.at("success").get<int>()) {
-            bool startupTimeChanged = false;
-            startupTimeChanged = _wqsData->handleResponseJson(response);
-            transmitSuccess = true;
-            if (startupTimeChanged) {
-                LOGS(_log, LOG_LVL_WARN, cName(__func__) << " worker startupTime changed, likely rebooted.");
-                // kill all incomplete UberJobs on this worker.
-                czar::Czar::getCzar()->killIncompleteUbjerJobsOn(wInf->wId);
-            }
-        } else {
-            LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " transmit failure response success=0 " << response);
-        }
-    } catch (exception const& ex) {
-        LOGS(_log, LOG_LVL_ERROR, requestContext + " transmit failure, ex: " + ex.what());
-        exceptionWhat = ex.what();
-    }
-    if (!transmitSuccess) {
-        LOGS(_log, LOG_LVL_ERROR,
-             cName(__func__) << " transmit failure " << jsWorkerReq.dump() << " resp=" << response);
-    }
-}
-
-void ActiveWorker::addToDoneDeleteFiles(QueryId qId) { _wqsData->addToDoneDeleteFiles(qId); }
-
-void ActiveWorker::addToDoneKeepFiles(QueryId qId) { _wqsData->addToDoneKeepFiles(qId); }
-
-void ActiveWorker::removeDeadUberJobsFor(QueryId qId) { _wqsData->removeDeadUberJobsFor(qId); }
-
-void ActiveWorker::addDeadUberJob(QueryId qId, UberJobId ujId) {
-    auto now = CLOCK::now();
-    _wqsData->addDeadUberJob(qId, ujId, now);
-}
-
-protojson::WorkerContactInfo::Ptr ActiveWorker::getWInfo() const {
-    std::lock_guard lg(_aMtx);
-    if (_wqsData == nullptr) return nullptr;
-    return _wqsData->getWInfo();
-}
-
-ActiveWorker::State ActiveWorker::getState() const {
-    std::lock_guard lg(_aMtx);
-    return _state;
-}
-
-string ActiveWorker::dump() const {
-    lock_guard<mutex> lg(_aMtx);
-    return _dump();
-}
-
-string ActiveWorker::_dump() const {
-    stringstream os;
-    os << "ActiveWorker " << (_wqsData->dump());
-    return os.str();
-}
-
-ActiveWorkerMap::ActiveWorkerMap(std::shared_ptr<cconfig::CzarConfig> const& czarConfig)
-        : _timeoutAliveSecs(czarConfig->getActiveWorkerTimeoutAliveSecs()),
-          _timeoutDeadSecs(czarConfig->getActiveWorkerTimeoutDeadSecs()),
-          _maxLifetime(czarConfig->getActiveWorkerMaxLifetimeSecs()) {}
-
-void ActiveWorkerMap::updateMap(protojson::WorkerContactInfo::WCMap const& wcMap,
-                                protojson::CzarContactInfo::Ptr const& czInfo,
-                                std::string const& replicationInstanceId,
-                                std::string const& replicationAuthKey) {
-    // Go through wcMap, update existing entries in _awMap, create new entries for those that don't exist,
-    lock_guard<mutex> awLg(_awMapMtx);
-    for (auto const& [wcKey, wcVal] : wcMap) {
-        auto iter = _awMap.find(wcKey);
-        if (iter == _awMap.end()) {
-            auto newAW = ActiveWorker::create(wcVal, czInfo, replicationInstanceId, replicationAuthKey);
-            LOGS(_log, LOG_LVL_INFO, cName(__func__) << " ActiveWorker created for " << wcKey);
-            _awMap[wcKey] = newAW;
-            if (_czarCancelAfterRestart) {
-                newAW->setCzarCancelAfterRestart(_czarCancelAfterRestartCzId, _czarCancelAfterRestartQId);
-            }
-        } else {
-            auto aWorker = iter->second;
-            if (!aWorker->compareContactInfo(*wcVal)) {
-                // This should not happen, but try to handle it gracefully if it does.
-                LOGS(_log, LOG_LVL_WARN,
-                     cName(__func__) << " worker contact info changed for " << wcKey
-                                     << " new=" << wcVal->dump() << " old=" << aWorker->dump());
-                // If there is existing information, only host and port values will change.
-                aWorker->setWorkerContactInfo(wcVal);
-            }
-            aWorker->getWInfo()->setRegUpdateTime(wcVal->getRegUpdateTime());
-        }
-    }
-}
-
void ActiveWorkerMap::setCzarCancelAfterRestart(CzarIdType czId, QueryId lastQId) {
    _czarCancelAfterRestart = true;
    _czarCancelAfterRestartCzId = czId;
@@ -499,16 +273,6 @@ void ActiveWorkerMap::addToDoneKeepFiles(QueryId qId) {
    }
}

-/* &&&
-/// &&& doc
-void ActiveWorkerMap::removeDeadUberJobsFor(QueryId qId) {
-    lock_guard<mutex> lck(_awMapMtx);
-    for (auto const& [wName, awPtr] : _awMap) {
-        awPtr->removeDeadUberJobsFor(qId);
-    }
-}
-*/
-
ActiveWorkerMap::ActiveWorkerMap(std::shared_ptr<cconfig::CzarConfig> const& czarConfig)
        : _timeoutAliveSecs(czarConfig->getActiveWorkerTimeoutAliveSecs()),
          _timeoutDeadSecs(czarConfig->getActiveWorkerTimeoutDeadSecs()),
@@ -544,41 +308,4 @@ void ActiveWorkerMap::updateMap(protojson::WorkerContactInfo::WCMap const& wcMap
    }
}

-void ActiveWorkerMap::setCzarCancelAfterRestart(CzarIdType czId, QueryId lastQId) {
-    _czarCancelAfterRestart = true;
-    _czarCancelAfterRestartCzId = czId;
-    _czarCancelAfterRestartQId = lastQId;
-}
-
-ActiveWorker::Ptr ActiveWorkerMap::getActiveWorker(string const& workerId) const {
-    lock_guard<mutex> lck(_awMapMtx);
-    auto iter = _awMap.find(workerId);
-    if (iter == _awMap.end()) return nullptr;
-    return iter->second;
-}
-
-void ActiveWorkerMap::sendActiveWorkersMessages() {
-    // Send messages to each active worker as needed
-    lock_guard<mutex> lck(_awMapMtx);
-    for (auto&& [wName, awPtr] : _awMap) {
-        awPtr->updateStateAndSendMessages(_timeoutAliveSecs, _timeoutDeadSecs, _maxLifetime);
-    }
-}
-
-void ActiveWorkerMap::addToDoneDeleteFiles(QueryId qId) {
-    lock_guard<mutex> lck(_awMapMtx);
-    for (auto const& [wName, awPtr] : _awMap) {
-        awPtr->addToDoneDeleteFiles(qId);
-        awPtr->removeDeadUberJobsFor(qId);
-    }
-}
-
-void ActiveWorkerMap::addToDoneKeepFiles(QueryId qId) {
-    lock_guard<mutex> lck(_awMapMtx);
-    for (auto const& [wName, awPtr] : _awMap) {
-        awPtr->addToDoneKeepFiles(qId);
-        awPtr->removeDeadUberJobsFor(qId);
-    }
-}
-
} // namespace lsst::qserv::czar
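
The removed `updateStateAndSendMessages` drives a three-state liveness tracker: a worker goes ALIVE → QUESTIONABLE once the registry has not heard from it for `timeoutAliveSecs`, QUESTIONABLE → DEAD after `timeoutDeadSecs`, and any fresh registry update revives it. Below is a minimal, self-contained sketch of just those transition rules, assuming only what is visible in the diff; `WorkerTracker`, `update`, and the initial ALIVE state are illustrative names and defaults, not qserv's API.

```cpp
#include <iostream>

enum class State { ALIVE, QUESTIONABLE, DEAD };

class WorkerTracker {
public:
    WorkerTracker(double timeoutAliveSecs, double timeoutDeadSecs)
            : _timeoutAliveSecs(timeoutAliveSecs), _timeoutDeadSecs(timeoutDeadSecs) {}

    // Feed in the seconds since the registry last heard from the worker.
    // Returns true only on the QUESTIONABLE -> DEAD transition, mirroring
    // newlyDeadWorker in the diff: the caller's cue to reassign the worker's uberjobs.
    bool update(double secsSinceUpdate) {
        switch (_state) {
            case State::ALIVE:
                if (secsSinceUpdate >= _timeoutAliveSecs) _state = State::QUESTIONABLE;
                break;
            case State::QUESTIONABLE:
                if (secsSinceUpdate < _timeoutAliveSecs) _state = State::ALIVE;
                if (secsSinceUpdate >= _timeoutDeadSecs) {
                    _state = State::DEAD;
                    return true;  // all uberjobs for this worker need to die
                }
                break;
            case State::DEAD:
                // Only a fresh registry update revives a dead worker; otherwise
                // the original code returns early and skips further work on it.
                if (secsSinceUpdate < _timeoutAliveSecs) _state = State::ALIVE;
                break;
        }
        return false;
    }

    State state() const { return _state; }

private:
    State _state = State::ALIVE;
    double _timeoutAliveSecs;
    double _timeoutDeadSecs;
};

int main() {
    WorkerTracker w(10.0, 60.0);      // illustrative timeout values in seconds
    w.update(15.0);                   // ALIVE -> QUESTIONABLE
    bool newlyDead = w.update(75.0);  // QUESTIONABLE -> DEAD
    std::cout << "newlyDead=" << std::boolalpha << newlyDead << "\n";  // prints true
}
```

Note that a worker can only reach DEAD through QUESTIONABLE, so a single missed update never kills jobs outright; the intermediate state gives the registry a window of `timeoutDeadSecs - timeoutAliveSecs` to hear from the worker again.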
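The removed `_sendStatusMsg` POSTs the status JSON to the worker's `/querystatus` endpoint via qserv's `http::Client` and inspects the reply. Here is a sketch of just the response handling, assuming only nlohmann::json; the transport is stubbed out, and everything except the `"success"` key (which the diff checks explicitly) is an assumed stand-in.

```cpp
#include <exception>
#include <iostream>
#include <nlohmann/json.hpp>

using nlohmann::json;

// Hypothetical stand-in for http::Client::readAsJson(); the payload shape
// beyond "success" is assumed, not taken from the diff.
json fakeWorkerResponse() { return json{{"success", 1}}; }

int main() {
    bool transmitSuccess = false;
    json response;
    try {
        response = fakeWorkerResponse();
        // As in the diff: any nonzero "success" counts as a good transmission.
        if (response.at("success").get<int>() != 0) {
            transmitSuccess = true;
            // The real code then calls _wqsData->handleResponseJson(response);
            // if that reports a changed worker startup time (likely a reboot),
            // the czar kills all incomplete UberJobs on that worker.
        } else {
            std::cerr << "transmit failure, success=0: " << response << "\n";
        }
    } catch (std::exception const& ex) {
        // json::at() throws if "success" is missing, matching the diff's
        // catch-and-log of transport or parse failures.
        std::cerr << "transmit failure, ex: " << ex.what() << "\n";
    }
    std::cout << "transmitSuccess=" << std::boolalpha << transmitSuccess << "\n";
    return transmitSuccess ? 0 : 1;
}
```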