Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 65 additions & 10 deletions calico-vpp-agent/cmd/calico_vpp_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@ import (
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/connectivity"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/health"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/prometheus"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/routing"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/services"
"github.com/projectcalico/vpp-dataplane/v3/config"

watchdog "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watch_dog"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers"
"github.com/projectcalico/vpp-dataplane/v3/config"
)

/*
Expand Down Expand Up @@ -83,19 +82,32 @@ func main() {
log.Fatalf("Error writing pidfile: %v", err)
}

/**
* Start health check server
*/
healthServer := health.NewHealthServer(
log.WithFields(logrus.Fields{"component": "health"}),
*config.GetCalicoVppInitialConfig().HealthCheckPort,
)
Go(healthServer.ServeHealth)

/**
* Connect to VPP & wait for it to be up
*/
vpp, err := common.CreateVppLink(config.VppAPISocket, log.WithFields(logrus.Fields{"component": "vpp-api"}))
if err != nil {
log.Fatalf("Cannot create VPP client: %v", err)
}
healthServer.SetComponentStatus(health.ComponentVPP, true, "VPP connection established")

// Once we have the api connection, we know vpp & vpp-manager are running and the
// state is accurately reported. Wait for vpp-manager to finish the config.
common.VppManagerInfo, err = common.WaitForVppManager()
if err != nil {
log.Fatalf("Vpp Manager not started: %v", err)
}
healthServer.SetComponentStatus(health.ComponentVPPManager, true, "VPP Manager ready")

common.ThePubSub = common.NewPubSub(log.WithFields(logrus.Fields{"component": "pubsub"}))

/**
Expand Down Expand Up @@ -164,15 +176,50 @@ func main() {
routingServer.SetBGPConf(bgpConf)
serviceServer.SetBGPConf(bgpConf)

watchDog := watchdog.NewWatchDog(log.WithFields(logrus.Fields{"component": "watchDog"}), &t)
Go(felixServer.ServeFelix)
felixConfig := watchDog.Wait(felixServer.FelixConfigChan, "Waiting for FelixConfig to be provided by the calico pod")
ourBGPSpec := watchDog.Wait(felixServer.GotOurNodeBGPchan, "Waiting for bgp spec to be provided on node add")
// check if the watchDog timer has issued the t.Kill() which would mean we are dead
if !t.Alive() {
log.Fatal("WatchDog timed out waiting for config from felix. Exiting...")

/*
* Mark as unhealthy while waiting for Felix config
* Kubernetes startup probe handles pod restart if needed
*/
healthServer.MarkAsUnhealthy("Waiting for Felix configuration")
log.Info("Waiting for Felix configuration...")

ticker := time.NewTicker(20 * time.Second)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can consider reducing the interval; 20s might be too long for retries

defer ticker.Stop()

var felixConfig interface{}
var ourBGPSpec interface{}
felixConfigReceived := false
bgpSpecReceived := false

for !felixConfigReceived || !bgpSpecReceived {
select {
case value := <-felixServer.FelixConfigChan:
felixConfig = value
felixConfigReceived = true
log.Info("FelixConfig received from calico pod")
case value := <-felixServer.GotOurNodeBGPchan:
ourBGPSpec = value
bgpSpecReceived = true
log.Info("BGP spec received from node add")
case <-t.Dying():
log.Error("Tomb dying while waiting for Felix config")
return
case <-ticker.C:
if !felixConfigReceived {
log.Info("Still waiting for FelixConfig from calico pod...")
}
if !bgpSpecReceived {
log.Info("Still waiting for BGP spec from node add...")
}
}
}

healthServer.MarkAsHealthy("Felix configuration received")
healthServer.SetComponentStatus(health.ComponentFelix, true, "Felix config received")
log.Info("Felix configuration received")

if ourBGPSpec != nil {
bgpSpec, ok := ourBGPSpec.(*common.LocalNodeSpec)
if !ok {
Expand All @@ -189,7 +236,14 @@ func main() {

if *config.GetCalicoVppFeatureGates().MultinetEnabled {
Go(netWatcher.WatchNetworks)
watchDog.Wait(netWatcher.InSync, "Waiting for networks to be listed and synced")
log.Info("Waiting for networks to be listed and synced...")
select {
case <-netWatcher.InSync:
log.Info("Networks synced")
case <-t.Dying():
log.Error("Tomb dying while waiting for networks sync")
return
}
}

if felixConfig != nil {
Expand Down Expand Up @@ -218,6 +272,7 @@ func main() {
Go(localSIDWatcher.WatchLocalSID)
}

healthServer.SetComponentStatus(health.ComponentAgent, true, "Agent ready")
log.Infof("Agent started")

sigChan := make(chan os.Signal, 2)
Expand Down
259 changes: 259 additions & 0 deletions calico-vpp-agent/health/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
// Copyright (C) 2025 Cisco Systems Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package health

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
	"gopkg.in/tomb.v2"
)

// HealthStatus represents the current health state served by the HTTP
// endpoints. Healthy backs the /liveness probe, Ready backs the
// /readiness probe; Components carries per-component detail for /status.
type HealthStatus struct {
	Healthy    bool                       `json:"healthy"` // liveness: cleared only by MarkAsUnhealthy
	Ready      bool                       `json:"ready"`   // readiness: true once every required component is initialized
	Components map[string]ComponentStatus `json:"components"` // keyed by the Component* constants
	Message    string                     `json:"message,omitempty"` // human-readable summary of the current state
	LastUpdate time.Time                  `json:"lastUpdate"` // time of the most recent status mutation
}

// ComponentStatus tracks the status of an individual component
// (see the Component* constants for the known component names).
type ComponentStatus struct {
	Initialized bool      `json:"initialized"` // true once the component reported ready
	Message     string    `json:"message,omitempty"` // free-form detail supplied by the caller
	UpdatedAt   time.Time `json:"updatedAt"` // set by SetComponentStatus on every update
}

// HealthServer provides HTTP health check endpoints
// (/liveness, /readiness and /status).
type HealthServer struct {
	log         *logrus.Entry
	port        uint32       // TCP port the HTTP server listens on
	status      HealthStatus // guarded by statusMutex
	statusMutex sync.RWMutex // protects status against concurrent handler reads and updates
	server      *http.Server // set once ServeHealth has started the server
}

// Names of the components whose initialization is required for overall
// readiness (see updateReadiness).
const (
	ComponentVPP        = "vpp"
	ComponentVPPManager = "vpp-manager"
	ComponentFelix      = "felix"
	ComponentAgent      = "agent"
)

// NewHealthServer builds a health check server for the given port.
// The server starts out healthy but not ready: readiness is reported
// only after all required components have been registered as
// initialized via SetComponentStatus.
func NewHealthServer(log *logrus.Entry, port uint32) *HealthServer {
	initial := HealthStatus{
		Healthy:    true,
		Ready:      false,
		Components: map[string]ComponentStatus{},
		LastUpdate: time.Now(),
	}
	return &HealthServer{
		log:    log,
		port:   port,
		status: initial,
	}
}

// SetComponentStatus records the initialization state of a single
// component and recomputes the overall readiness of the agent.
func (hs *HealthServer) SetComponentStatus(component string, initialized bool, message string) {
	hs.statusMutex.Lock()
	defer hs.statusMutex.Unlock()

	entry := ComponentStatus{
		Initialized: initialized,
		Message:     message,
		UpdatedAt:   time.Now(),
	}
	hs.status.Components[component] = entry
	hs.status.LastUpdate = time.Now()

	// Readiness is derived from component state, so refresh it on every change.
	hs.updateReadiness()

	fields := logrus.Fields{
		"component":   component,
		"initialized": initialized,
		"message":     message,
	}
	hs.log.WithFields(fields).Debug("Component status updated")
}

// updateReadiness recomputes overall readiness from component status.
// The agent is ready only when every required component has reported
// itself initialized. Caller must hold statusMutex.
func (hs *HealthServer) updateReadiness() {
	required := []string{
		ComponentVPP,
		ComponentVPPManager,
		ComponentFelix,
		ComponentAgent,
	}

	ready := true
	for _, name := range required {
		if st, ok := hs.status.Components[name]; !ok || !st.Initialized {
			ready = false
			break
		}
	}

	hs.status.Ready = ready
	if ready {
		hs.status.Message = "All components initialized"
	} else {
		hs.status.Message = "Waiting for components to initialize"
	}
}

// MarkAsHealthy marks the agent as healthy (liveness), without
// affecting readiness. An empty message falls back to a default.
func (hs *HealthServer) MarkAsHealthy(message string) {
	hs.statusMutex.Lock()
	defer hs.statusMutex.Unlock()

	msg := message
	if msg == "" {
		msg = "Agent is healthy"
	}
	hs.status.Healthy = true
	hs.status.Message = msg
	hs.status.LastUpdate = time.Now()

	hs.log.WithField("message", message).Info("Agent marked as healthy")
}

// MarkAsUnhealthy marks the agent as both unhealthy and not ready,
// recording the given reason in the status message.
func (hs *HealthServer) MarkAsUnhealthy(reason string) {
	hs.statusMutex.Lock()
	defer hs.statusMutex.Unlock()

	hs.status.Healthy = false
	hs.status.Ready = false
	hs.status.Message = reason
	hs.status.LastUpdate = time.Now()

	hs.log.WithField("reason", reason).Warn("Agent marked as unhealthy")
}

// GetStatus returns a snapshot of the current health status. The
// Components map is deep-copied so callers can use the result without
// racing against concurrent updates.
func (hs *HealthServer) GetStatus() HealthStatus {
	hs.statusMutex.RLock()
	defer hs.statusMutex.RUnlock()

	snapshot := hs.status
	components := make(map[string]ComponentStatus, len(hs.status.Components))
	for name, st := range hs.status.Components {
		components[name] = st
	}
	snapshot.Components = components
	return snapshot
}

// livenessHandler serves the /liveness endpoint: 200 "OK" while
// healthy, 503 with the status message otherwise.
func (hs *HealthServer) livenessHandler(w http.ResponseWriter, r *http.Request) {
	status := hs.GetStatus()

	if !status.Healthy {
		w.WriteHeader(http.StatusServiceUnavailable)
		fmt.Fprintf(w, "Unhealthy: %s", status.Message)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, "OK")
}

// readinessHandler serves the /readiness endpoint: 200 "Ready" once
// all required components are initialized, 503 with the status
// message otherwise.
func (hs *HealthServer) readinessHandler(w http.ResponseWriter, r *http.Request) {
	status := hs.GetStatus()

	if !status.Ready {
		w.WriteHeader(http.StatusServiceUnavailable)
		fmt.Fprintf(w, "Not ready: %s", status.Message)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, "Ready")
}

// statusHandler serves the /status endpoint: the full HealthStatus as
// JSON, with HTTP 200 when ready and 503 otherwise.
func (hs *HealthServer) statusHandler(w http.ResponseWriter, r *http.Request) {
	status := hs.GetStatus()

	w.Header().Set("Content-Type", "application/json")

	code := http.StatusOK
	if !status.Ready {
		code = http.StatusServiceUnavailable
	}
	w.WriteHeader(code)

	if err := json.NewEncoder(w).Encode(status); err != nil {
		hs.log.WithError(err).Error("Failed to encode status response")
	}
}

// ServeHealth starts the HTTP health check server and blocks until the
// tomb is dying or the server fails. Endpoints registered:
//
//	/liveness  - liveness probe
//	/readiness - readiness probe
//	/status    - detailed JSON status
//
// Binding the port is retried a few times (the port may be briefly held
// by a previous instance). On tomb death the server is shut down
// gracefully, falling back to a hard close if the drain times out.
func (hs *HealthServer) ServeHealth(t *tomb.Tomb) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/liveness", hs.livenessHandler)
	mux.HandleFunc("/readiness", hs.readinessHandler)
	mux.HandleFunc("/status", hs.statusHandler)

	addr := fmt.Sprintf(":%d", hs.port)

	// One initial attempt plus three retries, matching a 4-attempt bind
	// budget, with a 5 second pause between attempts.
	const bindAttempts = 4
	var listener net.Listener
	var err error
	for i := 0; i < bindAttempts; i++ {
		listener, err = net.Listen("tcp", addr)
		if err == nil {
			break
		}
		if i < bindAttempts-1 {
			hs.log.Warnf("Failed to bind to port %d, retrying in 5 seconds...", hs.port)
			time.Sleep(5 * time.Second)
		}
	}
	if err != nil {
		return fmt.Errorf("health server error: %w", err)
	}

	hs.server = &http.Server{
		Addr:    addr,
		Handler: mux,
		// Timeouts so slow or stuck probe connections cannot pile up.
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 10 * time.Second,
	}

	hs.log.Infof("Starting health check server on port %d", hs.port)

	// Serve in a goroutine; ErrServerClosed is the normal shutdown path
	// and must not be reported as a failure.
	errChan := make(chan error, 1)
	go func() {
		if serveErr := hs.server.Serve(listener); serveErr != nil && serveErr != http.ErrServerClosed {
			errChan <- serveErr
		}
	}()

	select {
	case <-t.Dying():
		hs.log.Info("Shutting down health check server")
		// Graceful shutdown: let in-flight probe requests finish, but
		// never wait more than 5 seconds; then force-close.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if shutdownErr := hs.server.Shutdown(ctx); shutdownErr != nil {
			return hs.server.Close()
		}
		return nil
	case err := <-errChan:
		return fmt.Errorf("health server error: %w", err)
	}
}
Loading
Loading