@@ -236,232 +236,6 @@ string ActiveWorker::_dump() const {
236236 return os.str ();
237237}
238238
239- string ActiveWorker::getStateStr (State st) {
240- switch (st) {
241- case ALIVE:
242- return string (" ALIVE" );
243- case QUESTIONABLE:
244- return string (" QUESTIONABLE" );
245- case DEAD:
246- return string (" DEAD" );
247- }
248- return string (" unknown" );
249- }
250-
251- bool ActiveWorker::compareContactInfo (http::WorkerContactInfo const & wcInfo) const {
252- lock_guard<mutex> lg (_aMtx);
253- auto wInfo_ = _wqsData->getWInfo ();
254- if (wInfo_ == nullptr ) return false ;
255- return wInfo_->isSameContactInfo (wcInfo);
256- }
257-
258- void ActiveWorker::setWorkerContactInfo (protojson::WorkerContactInfo::Ptr const & wcInfo) {
259- LOGS (_log, LOG_LVL_INFO, cName (__func__) << " new info=" << wcInfo->dump ());
260- lock_guard<mutex> lg (_aMtx);
261- _wqsData->setWInfo (wcInfo);
262- }
263-
264- void ActiveWorker::_changeStateTo (State newState, double secsSinceUpdate, string const & note) {
265- auto lLvl = (newState == DEAD) ? LOG_LVL_ERROR : LOG_LVL_INFO;
266- LOGS (_log, lLvl,
267- note << " oldState=" << getStateStr (_state) << " newState=" << getStateStr (newState)
268- << " secsSince=" << secsSinceUpdate);
269- _state = newState;
270- }
271-
272- void ActiveWorker::updateStateAndSendMessages (double timeoutAliveSecs, double timeoutDeadSecs,
273- double maxLifetime) {
274- LOGS (_log, LOG_LVL_TRACE, cName (__func__) << " start" );
275- bool newlyDeadWorker = false ;
276- protojson::WorkerContactInfo::Ptr wInfo_;
277- {
278- lock_guard<mutex> lg (_aMtx);
279- wInfo_ = _wqsData->getWInfo ();
280- if (wInfo_ == nullptr ) {
281- LOGS (_log, LOG_LVL_ERROR, cName (__func__) << " no WorkerContactInfo" );
282- return ;
283- }
284- double secsSinceUpdate = wInfo_->timeSinceRegUpdateSeconds ();
285- LOGS (_log, LOG_LVL_TRACE,
286- cName (__func__) << " wInfo=" << wInfo_->dump ()
287- << " secsSince=" << wInfo_->timeSinceRegUpdateSeconds ()
288- << " secsSinceUpdate=" << secsSinceUpdate);
289-
290- // Update the last time the registry contacted this worker.
291- // TODO:UJ - This needs to be added to the dashboard.
292- switch (_state) {
293- case ALIVE: {
294- if (secsSinceUpdate >= timeoutAliveSecs) {
295- _changeStateTo (QUESTIONABLE, secsSinceUpdate, cName (__func__));
296- }
297- break ;
298- }
299- case QUESTIONABLE: {
300- if (secsSinceUpdate < timeoutAliveSecs) {
301- _changeStateTo (ALIVE, secsSinceUpdate, cName (__func__));
302- }
303- if (secsSinceUpdate >= timeoutDeadSecs) {
304- _changeStateTo (DEAD, secsSinceUpdate, cName (__func__));
305- // All uberjobs for this worker need to die.
306- newlyDeadWorker = true ;
307- }
308- break ;
309- }
310- case DEAD: {
311- if (secsSinceUpdate < timeoutAliveSecs) {
312- _changeStateTo (ALIVE, secsSinceUpdate, cName (__func__));
313- } else {
314- // Don't waste time on this worker until the registry has heard from it.
315- return ;
316- }
317- break ;
318- }
319- }
320- }
321-
322- // _aMtx must not be held when calling this.
323- if (newlyDeadWorker) {
324- LOGS (_log, LOG_LVL_WARN,
325- cName (__func__) << " worker " << wInfo_->wId << " appears to have died, reassigning its jobs." );
326- czar::Czar::getCzar ()->killIncompleteUbjerJobsOn (wInfo_->wId );
327- }
328-
329- shared_ptr<json> jsWorkerReqPtr;
330- {
331- // Go through the _qIdDoneKeepFiles, _qIdDoneDeleteFiles, and _qIdDeadUberJobs lists to build a
332- // message to send to the worker.
333- jsWorkerReqPtr = _wqsData->serializeJson (maxLifetime);
334- }
335-
336- // Always send the message as it's a way to inform the worker that this
337- // czar is functioning and capable of receiving requests.
338- Ptr thisPtr = shared_from_this ();
339- auto sendStatusMsgFunc = [thisPtr, wInfo_, jsWorkerReqPtr](util::CmdData*) {
340- thisPtr->_sendStatusMsg (wInfo_, jsWorkerReqPtr);
341- };
342-
343- auto cmd = util::PriorityCommand::Ptr (new util::PriorityCommand (sendStatusMsgFunc));
344- auto qdisppool = czar::Czar::getCzar ()->getQdispPool ();
345- LOGS (_log, LOG_LVL_DEBUG, cName (__func__) << " queuing message" );
346- qdisppool->queCmd (cmd, 1 );
347- }
348-
349- void ActiveWorker::_sendStatusMsg (protojson::WorkerContactInfo::Ptr const & wInf,
350- std::shared_ptr<nlohmann::json> const & jsWorkerReqPtr) {
351- auto & jsWorkerReq = *jsWorkerReqPtr;
352- auto const method = http::Method::POST;
353- if (wInf == nullptr ) {
354- LOGS (_log, LOG_LVL_ERROR, cName (__func__) << " wInfo was null." );
355- return ;
356- }
357- auto [ciwId, ciwHost, ciwManag, ciwPort] = wInf->getAll ();
358- string const url = " http://" + ciwHost + " :" + to_string (ciwPort) + " /querystatus" ;
359- vector<string> const headers = {" Content-Type: application/json" };
360- auto const & czarConfig = cconfig::CzarConfig::instance ();
361-
362- LOGS (_log, LOG_LVL_DEBUG, cName (__func__) << " REQ " << jsWorkerReq);
363- string const requestContext = " Czar: '" + http::method2string (method) + " ' stat request to '" + url + " '" ;
364- LOGS (_log, LOG_LVL_TRACE,
365- cName (__func__) << " czarPost url=" << url << " request=" << jsWorkerReq.dump ()
366- << " headers=" << headers[0 ]);
367- http::Client client (method, url, jsWorkerReq.dump (), headers);
368- bool transmitSuccess = false ;
369- string exceptionWhat;
370- json response;
371- try {
372- LOGS (_log, LOG_LVL_DEBUG, cName (__func__) << " read start" );
373- response = client.readAsJson ();
374- LOGS (_log, LOG_LVL_DEBUG, cName (__func__) << " read end" );
375- if (0 != response.at (" success" ).get <int >()) {
376- bool startupTimeChanged = false ;
377- startupTimeChanged = _wqsData->handleResponseJson (response);
378- transmitSuccess = true ;
379- if (startupTimeChanged) {
380- LOGS (_log, LOG_LVL_WARN, cName (__func__) << " worker startupTime changed, likely rebooted." );
381- // kill all incomplete UberJobs on this worker.
382- czar::Czar::getCzar ()->killIncompleteUbjerJobsOn (wInf->wId );
383- }
384- } else {
385- LOGS (_log, LOG_LVL_ERROR, cName (__func__) << " transmit failure response success=0 " << response);
386- }
387- } catch (exception const & ex) {
388- LOGS (_log, LOG_LVL_ERROR, requestContext + " transmit failure, ex: " + ex.what ());
389- exceptionWhat = ex.what ();
390- }
391- if (!transmitSuccess) {
392- LOGS (_log, LOG_LVL_ERROR,
393- cName (__func__) << " transmit failure " << jsWorkerReq.dump () << " resp=" << response);
394- }
395- }
396-
397- void ActiveWorker::addToDoneDeleteFiles (QueryId qId) { _wqsData->addToDoneDeleteFiles (qId); }
398-
399- void ActiveWorker::addToDoneKeepFiles (QueryId qId) { _wqsData->addToDoneKeepFiles (qId); }
400-
401- void ActiveWorker::removeDeadUberJobsFor (QueryId qId) { _wqsData->removeDeadUberJobsFor (qId); }
402-
403- void ActiveWorker::addDeadUberJob (QueryId qId, UberJobId ujId) {
404- auto now = CLOCK::now ();
405- _wqsData->addDeadUberJob (qId, ujId, now);
406- }
407-
408- protojson::WorkerContactInfo::Ptr ActiveWorker::getWInfo () const {
409- std::lock_guard lg (_aMtx);
410- if (_wqsData == nullptr ) return nullptr ;
411- return _wqsData->getWInfo ();
412- }
413-
414- ActiveWorker::State ActiveWorker::getState () const {
415- std::lock_guard lg (_aMtx);
416- return _state;
417- }
418-
419- string ActiveWorker::dump () const {
420- lock_guard<mutex> lg (_aMtx);
421- return _dump ();
422- }
423-
424- string ActiveWorker::_dump () const {
425- stringstream os;
426- os << " ActiveWorker " << (_wqsData->dump ());
427- return os.str ();
428- }
429-
/// Construct the map, caching the worker-liveness timeout values from `czarConfig`:
/// seconds of silence before a worker is QUESTIONABLE, seconds before it is DEAD,
/// and the maximum lifetime of entries in the per-worker status lists.
ActiveWorkerMap::ActiveWorkerMap(std::shared_ptr<cconfig::CzarConfig> const& czarConfig)
        : _timeoutAliveSecs(czarConfig->getActiveWorkerTimeoutAliveSecs()),
          _timeoutDeadSecs(czarConfig->getActiveWorkerTimeoutDeadSecs()),
          _maxLifetime(czarConfig->getActiveWorkerMaxLifetimeSecs()) {}
434-
435- void ActiveWorkerMap::updateMap (protojson::WorkerContactInfo::WCMap const & wcMap,
436- protojson::CzarContactInfo::Ptr const & czInfo,
437- std::string const & replicationInstanceId,
438- std::string const & replicationAuthKey) {
439- // Go through wcMap, update existing entries in _awMap, create new entries for those that don't exist,
440- lock_guard<mutex> awLg (_awMapMtx);
441- for (auto const & [wcKey, wcVal] : wcMap) {
442- auto iter = _awMap.find (wcKey);
443- if (iter == _awMap.end ()) {
444- auto newAW = ActiveWorker::create (wcVal, czInfo, replicationInstanceId, replicationAuthKey);
445- LOGS (_log, LOG_LVL_INFO, cName (__func__) << " ActiveWorker created for " << wcKey);
446- _awMap[wcKey] = newAW;
447- if (_czarCancelAfterRestart) {
448- newAW->setCzarCancelAfterRestart (_czarCancelAfterRestartCzId, _czarCancelAfterRestartQId);
449- }
450- } else {
451- auto aWorker = iter->second ;
452- if (!aWorker->compareContactInfo (*wcVal)) {
453- // This should not happen, but try to handle it gracefully if it does.
454- LOGS (_log, LOG_LVL_WARN,
455- cName (__func__) << " worker contact info changed for " << wcKey
456- << " new=" << wcVal->dump () << " old=" << aWorker->dump ());
457- // If there is existing information, only host and port values will change.
458- aWorker->setWorkerContactInfo (wcVal);
459- }
460- aWorker->getWInfo ()->setRegUpdateTime (wcVal->getRegUpdateTime ());
461- }
462- }
463- }
464-
465239void ActiveWorkerMap::setCzarCancelAfterRestart (CzarIdType czId, QueryId lastQId) {
466240 _czarCancelAfterRestart = true ;
467241 _czarCancelAfterRestartCzId = czId;
@@ -499,16 +273,6 @@ void ActiveWorkerMap::addToDoneKeepFiles(QueryId qId) {
499273 }
500274}
501275
502- /* &&&
503- /// &&& doc
504- void ActiveWorkerMap::removeDeadUberJobsFor(QueryId qId) {
505- lock_guard<mutex> lck(_awMapMtx);
506- for (auto const& [wName, awPtr] : _awMap) {
507- awPtr->removeDeadUberJobsFor(qId);
508- }
509- }
510- */
511-
512276ActiveWorkerMap::ActiveWorkerMap (std::shared_ptr<cconfig::CzarConfig> const & czarConfig)
513277 : _timeoutAliveSecs(czarConfig->getActiveWorkerTimeoutAliveSecs ()),
514278 _timeoutDeadSecs(czarConfig->getActiveWorkerTimeoutDeadSecs ()),
@@ -544,41 +308,4 @@ void ActiveWorkerMap::updateMap(protojson::WorkerContactInfo::WCMap const& wcMap
544308 }
545309}
546310
547- void ActiveWorkerMap::setCzarCancelAfterRestart (CzarIdType czId, QueryId lastQId) {
548- _czarCancelAfterRestart = true ;
549- _czarCancelAfterRestartCzId = czId;
550- _czarCancelAfterRestartQId = lastQId;
551- }
552-
553- ActiveWorker::Ptr ActiveWorkerMap::getActiveWorker (string const & workerId) const {
554- lock_guard<mutex> lck (_awMapMtx);
555- auto iter = _awMap.find (workerId);
556- if (iter == _awMap.end ()) return nullptr ;
557- return iter->second ;
558- }
559-
560- void ActiveWorkerMap::sendActiveWorkersMessages () {
561- // Send messages to each active worker as needed
562- lock_guard<mutex> lck (_awMapMtx);
563- for (auto && [wName, awPtr] : _awMap) {
564- awPtr->updateStateAndSendMessages (_timeoutAliveSecs, _timeoutDeadSecs, _maxLifetime);
565- }
566- }
567-
568- void ActiveWorkerMap::addToDoneDeleteFiles (QueryId qId) {
569- lock_guard<mutex> lck (_awMapMtx);
570- for (auto const & [wName, awPtr] : _awMap) {
571- awPtr->addToDoneDeleteFiles (qId);
572- awPtr->removeDeadUberJobsFor (qId);
573- }
574- }
575-
576- void ActiveWorkerMap::addToDoneKeepFiles (QueryId qId) {
577- lock_guard<mutex> lck (_awMapMtx);
578- for (auto const & [wName, awPtr] : _awMap) {
579- awPtr->addToDoneKeepFiles (qId);
580- awPtr->removeDeadUberJobsFor (qId);
581- }
582- }
583-
584311} // namespace lsst::qserv::czar
0 commit comments