55 "context"
66 "encoding/json"
77 "errors"
8+ "fmt"
89 "net/url"
910 "os"
1011 "regexp"
@@ -21,13 +22,18 @@ import (
2122 "github.com/viamrobotics/agent/subsystems/viamserver"
2223 "github.com/viamrobotics/agent/utils"
2324 pb "go.viam.com/api/app/agent/v1"
25+ apppb "go.viam.com/api/app/v1"
2426 "go.viam.com/rdk/logging"
2527 goutils "go.viam.com/utils"
2628 "go.viam.com/utils/rpc"
2729)
2830
2931const (
30- minimalCheckInterval = time .Second * 5
32+ // The minimal (and default) interval for checking for config updates via DeviceAgentConfig.
33+ minimalDeviceAgentConfigCheckInterval = time .Second * 5
34+ // The minimal (and default) interval for checking whether agent needs to be restarted.
35+ minimalNeedsRestartCheckInterval = time .Second * 1
36+
3137 defaultNetworkTimeout = time .Second * 15
3238 // stopAllTimeout must be lower than systemd subsystems/viamagent/viam-agent.service timeout of 4mins
3339 // and higher than subsystems/viamserver/viamserver.go timeout of 2mins.
@@ -42,7 +48,6 @@ type Manager struct {
4248
4349 connMu sync.RWMutex
4450 conn rpc.ClientConn
45- client pb.AgentDeviceServiceClient
4651 cloudConfig * logging.CloudConfig
4752
4853 logger logging.Logger
@@ -209,7 +214,7 @@ func (m *Manager) SubsystemUpdates(ctx context.Context) {
209214 m .logger .Warn (err )
210215 }
211216 if m .viamAgentNeedsRestart {
212- m .Exit ()
217+ m .Exit (fmt . Sprintf ( "A new version of %s has been installed" , SubsystemName ) )
213218 return
214219 }
215220 } else {
@@ -221,17 +226,19 @@ func (m *Manager) SubsystemUpdates(ctx context.Context) {
221226 needRestartConfigChange := m .viamServer .Update (ctx , m .cfg )
222227
223228 if needRestart || needRestartConfigChange || m .viamServerNeedsRestart || m .viamAgentNeedsRestart {
224- if m .viamServer .(viamserver.RestartCheck ).SafeToRestart (ctx ) {
229+ if m .viamServer .Property (ctx , viamserver .RestartPropertyRestartAllowed ) {
230+ m .logger .Infof ("%s has allowed a restart; will restart" , viamserver .SubsysName )
225231 if err := m .viamServer .Stop (ctx ); err != nil {
226232 m .logger .Warn (err )
227233 } else {
228234 m .viamServerNeedsRestart = false
229235 }
230236 if m .viamAgentNeedsRestart {
231- m .Exit ()
237+ m .Exit (fmt . Sprintf ( "A new version of %s has been installed" , SubsystemName ) )
232238 return
233239 }
234240 } else {
241+ m .logger .Warnf ("%s has NOT allowed a restart; will NOT restart" , viamserver .SubsysName )
235242 m .viamServerNeedsRestart = true
236243 }
237244 }
@@ -280,26 +287,26 @@ func (m *Manager) SubsystemUpdates(ctx context.Context) {
280287// CheckUpdates retrieves an updated config from the cloud, and then passes it to SubsystemUpdates().
281288func (m * Manager ) CheckUpdates (ctx context.Context ) time.Duration {
282289 defer utils .Recover (m .logger , nil )
283- m .logger .Debug ("Checking cloud for update " )
284- interval , err := m .GetConfig (ctx )
290+ m .logger .Debug ("Checking cloud for device agent config updates " )
291+ deviceAgentConfigCheckInterval , err := m .GetConfig (ctx )
285292
286- if interval < minimalCheckInterval {
287- interval = minimalCheckInterval
293+ if deviceAgentConfigCheckInterval < minimalDeviceAgentConfigCheckInterval {
294+ deviceAgentConfigCheckInterval = minimalDeviceAgentConfigCheckInterval
288295 }
289296
290297 // randomly fuzz the interval by +/- 5%
291- interval = utils .FuzzTime (interval , 0.05 )
298+ deviceAgentConfigCheckInterval = utils .FuzzTime (deviceAgentConfigCheckInterval , 0.05 )
292299
293300 // we already log in all error cases inside GetConfig, so
294301 // no need to log again.
295302 if err != nil {
296- return interval
303+ return deviceAgentConfigCheckInterval
297304 }
298305
299306 // update and (re)start subsystems
300307 m .SubsystemUpdates (ctx )
301308
302- return interval
309+ return deviceAgentConfigCheckInterval
303310}
304311
305312func (m * Manager ) setDebug (debug bool ) {
@@ -380,13 +387,51 @@ func (m *Manager) SubsystemHealthChecks(ctx context.Context) {
380387 }
381388}
382389
 390+ // CheckIfNeedsRestart returns the restart check interval and whether app has requested
 391+ // that the agent (and therefore all its subsystems) be forcibly restarted.
392+ func (m * Manager ) CheckIfNeedsRestart (ctx context.Context ) (time.Duration , bool ) {
393+ m .logger .Debug ("Checking cloud for forced restarts" )
394+ if m .cloudConfig == nil {
395+ m .logger .Warn ("can't CheckIfNeedsRestart until successful config load" )
396+ return minimalNeedsRestartCheckInterval , false
397+ }
398+
399+ // Only continue this check if viam-server does not handle restart checking itself
400+ // (return early if viamserver _does_ handle restart checking).
401+ if ! m .viamServer .Property (ctx , viamserver .RestartPropertyDoesNotHandleNeedsRestart ) {
402+ return minimalNeedsRestartCheckInterval , false
403+ }
404+
405+ m .logger .Debug ("Checking cloud for forced restarts" )
406+ timeoutCtx , cancelFunc := context .WithTimeout (ctx , defaultNetworkTimeout )
407+ defer cancelFunc ()
408+
409+ if err := m .dial (timeoutCtx ); err != nil {
410+ m .logger .Warn (errw .Wrapf (err , "dialing to check if restart needed" ))
411+ return minimalNeedsRestartCheckInterval , false
412+ }
413+
414+ robotServiceClient := apppb .NewRobotServiceClient (m .conn )
415+ req := & apppb.NeedsRestartRequest {Id : m .cloudConfig .ID }
416+ res , err := robotServiceClient .NeedsRestart (timeoutCtx , req )
417+ if err != nil {
418+ m .logger .Warn (errw .Wrapf (err , "checking if restart needed" ))
419+ return minimalNeedsRestartCheckInterval , false
420+ }
421+
422+ return res .GetRestartCheckInterval ().AsDuration (), res .GetMustRestart ()
423+ }
424+
383425// CloseAll stops all subsystems and closes the cloud connection.
384426func (m * Manager ) CloseAll () {
385427 ctx , cancel := context .WithCancel (context .Background ())
386428
387429 // Use a slow goroutine watcher to log and continue if shutdown is taking too long.
388430 slowWatcher , slowWatcherCancel := goutils .SlowGoroutineWatcher (
389- stopAllTimeout , "Agent is taking a while to shut down," , m .logger )
431+ stopAllTimeout ,
432+ fmt .Sprintf ("Viam agent subsystems and/or background workers failed to shut down within %v" , stopAllTimeout ),
433+ m .logger ,
434+ )
390435
391436 slowTicker := time .NewTicker (10 * time .Second )
392437 defer slowTicker .Stop ()
@@ -430,7 +475,6 @@ func (m *Manager) CloseAll() {
430475 }
431476 }
432477
433- m .client = nil
434478 m .conn = nil
435479 })
436480
@@ -479,7 +523,8 @@ func (m *Manager) CloseAll() {
479523 }
480524}
481525
482- // StartBackgroundChecks kicks off a go routine that loops on a timer to check for updates and health checks.
 526+ // StartBackgroundChecks kicks off goroutines that loop on a timer to check for updates,
527+ // health checks, and restarts.
483528func (m * Manager ) StartBackgroundChecks (ctx context.Context ) {
484529 if ctx .Err () != nil {
485530 return
@@ -495,18 +540,18 @@ func (m *Manager) StartBackgroundChecks(ctx context.Context) {
495540 })
496541 defer m .activeBackgroundWorkers .Done ()
497542
498- checkInterval := minimalCheckInterval
543+ deviceAgentConfigCheckInterval := minimalDeviceAgentConfigCheckInterval
499544 m .cfgMu .RLock ()
500545 wait := m .cfg .AdvancedSettings .WaitForUpdateCheck .Get ()
501546 m .cfgMu .RUnlock ()
502547 if wait {
503- checkInterval = m .CheckUpdates (ctx )
548+ deviceAgentConfigCheckInterval = m .CheckUpdates (ctx )
504549 } else {
 505550 // preemptively start things before we go into the regular update/check/restart
506551 m .SubsystemHealthChecks (ctx )
507552 }
508553
509- timer := time .NewTimer (checkInterval )
554+ timer := time .NewTimer (deviceAgentConfigCheckInterval )
510555 defer timer .Stop ()
511556 for {
512557 if ctx .Err () != nil {
@@ -516,9 +561,39 @@ func (m *Manager) StartBackgroundChecks(ctx context.Context) {
516561 case <- ctx .Done ():
517562 return
518563 case <- timer .C :
519- checkInterval = m .CheckUpdates (ctx )
564+ deviceAgentConfigCheckInterval = m .CheckUpdates (ctx )
520565 m .SubsystemHealthChecks (ctx )
521- timer .Reset (checkInterval )
566+ timer .Reset (deviceAgentConfigCheckInterval )
567+ }
568+ }
569+ }()
570+
571+ m .activeBackgroundWorkers .Add (1 )
572+ go func () {
573+ defer m .activeBackgroundWorkers .Done ()
574+
575+ timer := time .NewTimer (minimalNeedsRestartCheckInterval )
576+ defer timer .Stop ()
577+ for {
578+ if ctx .Err () != nil {
579+ return
580+ }
581+ select {
582+ case <- ctx .Done ():
583+ return
584+ case <- timer .C :
585+ needsRestartCheckInterval , needsRestart := m .CheckIfNeedsRestart (ctx )
586+ if needsRestartCheckInterval < minimalNeedsRestartCheckInterval {
587+ needsRestartCheckInterval = minimalNeedsRestartCheckInterval
588+ }
589+ if needsRestart {
 590+ // Do not mark m.viamAgentNeedsRestart and instead Exit immediately; we do not want
591+ // to wait for viam-server to allow a restart as it may be in a bad state.
592+ m .Exit (fmt .Sprintf ("A restart of %s was requested from app" , SubsystemName ))
593+ }
594+ // As with the device agent config check interval, randomly fuzz the interval by
595+ // +/- 5%.
596+ timer .Reset (utils .FuzzTime (needsRestartCheckInterval , 0.05 ))
522597 }
523598 }
524599 }()
@@ -531,11 +606,11 @@ func (m *Manager) dial(ctx context.Context) error {
531606 return ctx .Err ()
532607 }
533608 if m .cloudConfig == nil {
534- return errors .New ("cannot dial() until successful LoadConfig " )
609+ return errors .New ("cannot dial() until successful config load " )
535610 }
536611 m .connMu .Lock ()
537612 defer m .connMu .Unlock ()
538- if m .client != nil {
613+ if m .conn != nil {
539614 return nil
540615 }
541616
@@ -564,7 +639,6 @@ func (m *Manager) dial(ctx context.Context) error {
564639 return err
565640 }
566641 m .conn = conn
567- m .client = pb .NewAgentDeviceServiceClient (m .conn )
568642
569643 if m .netAppender != nil {
570644 m .netAppender .SetConn (conn , true )
@@ -577,27 +651,28 @@ func (m *Manager) dial(ctx context.Context) error {
577651// GetConfig retrieves the configuration from the cloud.
578652func (m * Manager ) GetConfig (ctx context.Context ) (time.Duration , error ) {
579653 if m .cloudConfig == nil {
580- err := errors .New ("can't GetConfig until successful LoadConfig " )
654+ err := errors .New ("can't GetConfig until successful config load " )
581655 m .logger .Warn (err )
582- return minimalCheckInterval , err
656+ return minimalDeviceAgentConfigCheckInterval , err
583657 }
584658 timeoutCtx , cancelFunc := context .WithTimeout (ctx , defaultNetworkTimeout )
585659 defer cancelFunc ()
586660
587661 if err := m .dial (timeoutCtx ); err != nil {
588- m .logger .Warn (errw .Wrapf (err , "fetching %s config" , SubsystemName ))
589- return minimalCheckInterval , err
662+ m .logger .Warn (errw .Wrapf (err , "dialing to fetch %s config" , SubsystemName ))
663+ return minimalDeviceAgentConfigCheckInterval , err
590664 }
591665
666+ agentDeviceServiceClient := pb .NewAgentDeviceServiceClient (m .conn )
592667 req := & pb.DeviceAgentConfigRequest {
593668 Id : m .cloudConfig .ID ,
594669 HostInfo : m .getHostInfo (),
595670 VersionInfo : m .getVersions (),
596671 }
597- resp , err := m . client .DeviceAgentConfig (timeoutCtx , req )
672+ resp , err := agentDeviceServiceClient .DeviceAgentConfig (timeoutCtx , req )
598673 if err != nil {
599674 m .logger .Warn (errw .Wrapf (err , "fetching %s config" , SubsystemName ))
600- return minimalCheckInterval , err
675+ return minimalDeviceAgentConfigCheckInterval , err
601676 }
602677 fixWindowsPaths (resp )
603678
@@ -699,7 +774,7 @@ func (m *Manager) getVersions() *pb.VersionInfo {
699774 return vers
700775}
701776
702- func (m * Manager ) Exit () {
703- m .logger .Info ( "A new viam-agent has been installed. Will now exit to be restarted by service manager." )
777+ func (m * Manager ) Exit (reason string ) {
778+ m .logger .Infow ( fmt . Sprintf ( "%s will now exit to be restarted by service manager" , SubsystemName ), "reason" , reason )
704779 m .globalCancel ()
705780}
0 commit comments