From d64a12dd5b7d76d2ac76850dea6c85b009696ff0 Mon Sep 17 00:00:00 2001 From: Aritra Basu Date: Thu, 11 Sep 2025 12:45:31 -0700 Subject: [PATCH] refactor: make BGP peer and secret management system event-driven Signed-off-by: Aritra Basu --- calico-vpp-agent/cmd/calico_vpp_dataplane.go | 3 +- calico-vpp-agent/common/pubsub.go | 16 +- calico-vpp-agent/common/types.go | 43 ++ calico-vpp-agent/felix/felix_server.go | 68 +++ calico-vpp-agent/routing/bgp_watcher.go | 6 + calico-vpp-agent/routing/peer_handler.go | 556 +++++++++++++++++++ calico-vpp-agent/routing/routing_server.go | 7 + calico-vpp-agent/watchers/peers_watcher.go | 478 +++------------- calico-vpp-agent/watchers/secret_watcher.go | 71 +-- go.mod | 2 +- 10 files changed, 803 insertions(+), 447 deletions(-) create mode 100644 calico-vpp-agent/routing/peer_handler.go diff --git a/calico-vpp-agent/cmd/calico_vpp_dataplane.go b/calico-vpp-agent/cmd/calico_vpp_dataplane.go index bae4d9c2..6ef165a7 100644 --- a/calico-vpp-agent/cmd/calico_vpp_dataplane.go +++ b/calico-vpp-agent/cmd/calico_vpp_dataplane.go @@ -159,10 +159,11 @@ func main() { log.Fatalf("cannot get default BGP config %s", err) } - peerWatcher.SetBGPConf(bgpConf) routingServer.SetBGPConf(bgpConf) felixServer.SetBGPConf(bgpConf) + routingServer.SetPeerHandler(felixServer.GetPeerHandler()) + watchDog := watchdog.NewWatchDog(log.WithFields(logrus.Fields{"component": "watchDog"}), &t) Go(felixServer.ServeFelix) Go(felixWatcher.WatchFelix) diff --git a/calico-vpp-agent/common/pubsub.go b/calico-vpp-agent/common/pubsub.go index 2ac7b253..18db9456 100644 --- a/calico-vpp-agent/common/pubsub.go +++ b/calico-vpp-agent/common/pubsub.go @@ -45,10 +45,9 @@ const ( TunnelAdded CalicoVppEventType = "TunnelAdded" TunnelDeleted CalicoVppEventType = "TunnelDeleted" - BGPPeerAdded CalicoVppEventType = "BGPPeerAdded" - BGPPeerDeleted CalicoVppEventType = "BGPPeerDeleted" - BGPPeerUpdated CalicoVppEventType = "BGPPeerUpdated" - BGPSecretChanged CalicoVppEventType = "BGPSecretChanged" + BGPPeerAdded CalicoVppEventType = "BGPPeerAdded" + BGPPeerDeleted CalicoVppEventType = "BGPPeerDeleted" + BGPPeerUpdated CalicoVppEventType = "BGPPeerUpdated" BGPFilterAddedOrUpdated CalicoVppEventType = "BGPFilterAddedOrUpdated" BGPFilterDeleted CalicoVppEventType = "BGPFilterDeleted" @@ -65,6 +64,15 @@ const ( IpamPoolUpdate CalicoVppEventType = "IpamPoolUpdate" IpamPoolRemove CalicoVppEventType = "IpamPoolRemove" + + PeersChanged CalicoVppEventType = "PeersChanged" + PeerAdded CalicoVppEventType = "PeerAdded" + PeerUpdated CalicoVppEventType = "PeerUpdated" + PeerDeleted CalicoVppEventType = "PeerDeleted" + + SecretAdded CalicoVppEventType = "SecretAdded" + SecretChanged CalicoVppEventType = "SecretChanged" + SecretDeleted CalicoVppEventType = "SecretDeleted" ) var ( diff --git a/calico-vpp-agent/common/types.go b/calico-vpp-agent/common/types.go index 0033252c..13fcad24 100644 --- a/calico-vpp-agent/common/types.go +++ b/calico-vpp-agent/common/types.go @@ -16,6 +16,7 @@ package common import ( + calicov3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -70,3 +71,45 @@ type ServiceEndpointsUpdate struct { type ServiceEndpointsDelete struct { Meta *metav1.ObjectMeta } + +// PeersChangedEvent is emitted when the list of BGP peers changes +type PeersChangedEvent struct { + Peers []calicov3.BGPPeer +} + +// PeerAddedEvent is emitted when a BGP peer is added +type PeerAddedEvent struct { + Peer calicov3.BGPPeer +} + +// PeerUpdatedEvent is emitted when a BGP peer is updated +type PeerUpdatedEvent struct { + Old calicov3.BGPPeer + New calicov3.BGPPeer +} + +// PeerDeletedEvent is emitted when a BGP peer is deleted +type PeerDeletedEvent struct { + Peer calicov3.BGPPeer +} + +// SecretData represents secret information +type SecretData struct { + Name string + Data map[string][]byte +} + +// SecretAddedEvent is emitted when a secret is added +type SecretAddedEvent struct { + Secret *SecretData +} + +// SecretChangedEvent is emitted when a secret changes +type SecretChangedEvent struct { + SecretName string +} + +// SecretDeletedEvent is emitted when a secret is deleted +type SecretDeletedEvent struct { + SecretName string +} diff --git a/calico-vpp-agent/felix/felix_server.go b/calico-vpp-agent/felix/felix_server.go index e13fc128..47703366 100644 --- a/calico-vpp-agent/felix/felix_server.go +++ b/calico-vpp-agent/felix/felix_server.go @@ -34,6 +34,7 @@ import ( "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/connectivity" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/policies" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/services" + "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/routing" "github.com/projectcalico/vpp-dataplane/v3/config" "github.com/projectcalico/vpp-dataplane/v3/vpplink" "github.com/projectcalico/vpp-dataplane/v3/vpplink/types" @@ -59,6 +60,7 @@ type Server struct { cniHandler *cni.CNIHandler connectivityHandler *connectivity.ConnectivityHandler serviceHandler *services.ServiceHandler + peerHandler *routing.PeerHandler } // NewFelixServer creates a felix server @@ -77,6 +79,7 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l cniHandler: cni.NewCNIHandler(vpp, cache, log), connectivityHandler: connectivity.NewConnectivityHandler(vpp, cache, clientv3, log), serviceHandler: services.NewServiceHandler(vpp, cache, log), + peerHandler: routing.NewPeerHandler(cache, log), } reg := common.RegisterHandler(server.felixServerEventChan, "felix server events") @@ -91,6 +94,13 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l common.ConnectivityDeleted, common.SRv6PolicyAdded, common.SRv6PolicyDeleted, + common.PeersChanged, + common.PeerAdded, + common.PeerUpdated, + common.PeerDeleted, + common.SecretAdded, + common.SecretChanged, + common.SecretDeleted, ) return server @@ -104,6 +114,10 @@ func (s *Server) GetCache() *cache.Cache { return s.cache } +func (s *Server) GetPeerHandler() *routing.PeerHandler { + return s.peerHandler +} + func (s *Server) SetBGPConf(bgpConf *calicov3.BGPConfigurationSpec) { s.cache.BGPConf = bgpConf } @@ -404,6 +418,60 @@ func (s *Server) handleFelixServerEvents(msg interface{}) (err error) { if err != nil { s.log.Errorf("Error while deleting SRv6 Policy %s", err) } + case common.PeersChanged: + peersEvent, ok := evt.New.(*common.PeersChangedEvent) + if !ok { + return fmt.Errorf("evt.New is not a (*common.PeersChangedEvent) %v", evt.New) + } + err := s.peerHandler.ProcessPeers(peersEvent.Peers) + if err != nil { + s.log.Errorf("Error processing peers: %v", err) + } + case common.PeerAdded: + peerEvent, ok := evt.New.(*common.PeerAddedEvent) + if !ok { + return fmt.Errorf("evt.New is not a (*common.PeerAddedEvent) %v", evt.New) + } + err := s.peerHandler.OnPeerAdded(&peerEvent.Peer) + if err != nil { + s.log.Errorf("Error adding peer: %v", err) + } + case common.PeerUpdated: + peerEvent, ok := evt.New.(*common.PeerUpdatedEvent) + if !ok { + return fmt.Errorf("evt.New is not a (*common.PeerUpdatedEvent) %v", evt.New) + } + err := s.peerHandler.OnPeerUpdated(&peerEvent.Old, &peerEvent.New) + if err != nil { + s.log.Errorf("Error updating peer: %v", err) + } + case common.PeerDeleted: + peerEvent, ok := evt.New.(*common.PeerDeletedEvent) + if !ok { + return fmt.Errorf("evt.New is not a (*common.PeerDeletedEvent) %v", evt.New) + } + err := s.peerHandler.OnPeerDeleted(&peerEvent.Peer) + if err != nil { + s.log.Errorf("Error deleting peer: %v", err) + } + case common.SecretAdded: + secretEvent, ok := evt.New.(*common.SecretAddedEvent) + if !ok { + return fmt.Errorf("evt.New is not a (*common.SecretAddedEvent) %v", evt.New) + } + s.peerHandler.OnSecretAdded(secretEvent.Secret) + case common.SecretChanged: + secretEvent, ok := evt.New.(*common.SecretChangedEvent) + if !ok { + return fmt.Errorf("evt.New is not a (*common.SecretChangedEvent) %v", evt.New) + } + s.peerHandler.OnSecretChanged(secretEvent.SecretName) + case common.SecretDeleted: + secretEvent, ok := evt.New.(*common.SecretDeletedEvent) + if !ok { + return fmt.Errorf("evt.New is not a (*common.SecretDeletedEvent) %v", evt.New) + } + s.peerHandler.OnSecretDeleted(secretEvent.SecretName) default: s.log.Warnf("Unhandled CalicoVppEvent.Type: %s", evt.Type) } diff --git a/calico-vpp-agent/routing/bgp_watcher.go b/calico-vpp-agent/routing/bgp_watcher.go index b56d3793..9a6d628b 100644 --- a/calico-vpp-agent/routing/bgp_watcher.go +++ b/calico-vpp-agent/routing/bgp_watcher.go @@ -669,6 +669,12 @@ func (s *Server) WatchBGPPath(t *tomb.Tomb) error { } s.log.Infof("bgp(del) filter deleted: %s", filter.Name) delete(s.bgpFilters, filter.Name) + case common.PeerNodeStateChanged: + old, _ := evt.Old.(*common.LocalNodeSpec) + new, _ := evt.New.(*common.LocalNodeSpec) + if s.peerHandler != nil { + s.peerHandler.OnPeerNodeStateChanged(old, new) + } } } } diff --git a/calico-vpp-agent/routing/peer_handler.go b/calico-vpp-agent/routing/peer_handler.go new file mode 100644 index 00000000..272cb160 --- /dev/null +++ b/calico-vpp-agent/routing/peer_handler.go @@ -0,0 +1,556 @@ +// Copyright (C) 2025 Cisco Systems Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "net" + + bgpapi "github.com/osrg/gobgp/v3/api" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + calicov3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3" + "github.com/projectcalico/calico/libcalico-go/lib/selector" + + "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common" + "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/cache" + "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers" + "github.com/projectcalico/vpp-dataplane/v3/config" +) + +// BGPPeerState represents the state of a BGP peer +type BGPPeerState struct { + AS uint32 + SweepFlag bool + BGPPeerSpec *calicov3.BGPPeerSpec + SecretChanged bool +} + +// PeerHandler handles BGP peer configuration and business logic +type PeerHandler struct { + log *logrus.Entry + cache *cache.Cache + + nodeStatesByName map[string]common.LocalNodeSpec // nodeName -> nodeSpec + secretCache map[string]map[string]string // secretName -> key -> value + state map[string]*BGPPeerState // peerIP -> state +} + +func NewPeerHandler(cache *cache.Cache, log *logrus.Entry) *PeerHandler { + handler := &PeerHandler{ + log: log, + cache: cache, + nodeStatesByName: make(map[string]common.LocalNodeSpec), + secretCache: make(map[string]map[string]string), + state: make(map[string]*BGPPeerState), + } + + return handler +} + +func (h *PeerHandler) OnSecretAdded(secretData *common.SecretData) { + h.log.Debugf("Secret '%s' added to cache", secretData.Name) + if h.secretCache[secretData.Name] == nil { + h.secretCache[secretData.Name] = make(map[string]string) + } + for key, value := range secretData.Data { + h.secretCache[secretData.Name][key] = string(value) + } +} + +func (h *PeerHandler) OnSecretDeleted(secretName string) { + h.log.Debugf("Secret '%s' deleted from cache", secretName) + delete(h.secretCache, secretName) +} + +// selectsNode determines whether or not the selector mySelector +// matches the labels on the given node. +func (h *PeerHandler) selectsNode(mySelector string, n *common.LocalNodeSpec) (bool, error) { + // No node selector means that the selector matches the node. + if len(mySelector) == 0 { + return true, nil + } + // Check for valid selector syntax. + sel, err := selector.Parse(mySelector) + if err != nil { + return false, err + } + // Return whether or not the selector matches. + return sel.Evaluate(n.Labels), nil +} + +func (h *PeerHandler) shouldPeer(peer *calicov3.BGPPeer) bool { + matches, err := h.selectsNode(peer.Spec.NodeSelector, h.currentCalicoNode()) + if err != nil { + h.log.Error(errors.Wrapf(err, "Error in nodeSelector matching for peer %s", peer.Name)) + } + if (peer.Spec.Node != "" && peer.Spec.Node != *config.NodeName) || (peer.Spec.NodeSelector != "" && !matches) { + return false + } + return true +} + +func (h *PeerHandler) getAsNumber(node *common.LocalNodeSpec) uint32 { + if node.ASNumber == nil { + return uint32(*h.cache.BGPConf.ASNumber) + } else { + return uint32(*node.ASNumber) + } +} + +// Select among the nodes those that match with peerSelector +// Return corresponding ips and ASN in a map +func (h *PeerHandler) selectPeers(peerSelector string) map[string]uint32 { + ipAsn := make(map[string]uint32) + for _, node := range h.nodeStatesByName { + if node.Name == *config.NodeName { + continue // Don't peer with ourselves :) + } + matches, err := h.selectsNode(peerSelector, &node) + if err != nil { + h.log.Errorf("Error in peerSelector matching: %v", err) + } + if matches { + if node.IPv4Address != nil && h.currentCalicoNode().IPv4Address != nil { + ipAsn[node.IPv4Address.IP.String()] = h.getAsNumber(&node) + } + if node.IPv6Address != nil && h.currentCalicoNode().IPv6Address != nil { + ipAsn[node.IPv6Address.IP.String()] = h.getAsNumber(&node) + } + } + } + return ipAsn +} + +func (h *PeerHandler) currentCalicoNode() *common.LocalNodeSpec { + node := h.nodeStatesByName[*config.NodeName] + return &node +} + +func (h *PeerHandler) isMeshMode() bool { + if h.cache.BGPConf.NodeToNodeMeshEnabled != nil { + return *h.cache.BGPConf.NodeToNodeMeshEnabled + } + return true +} + +// Given peer's BGPPeerConf check if Password is set and return secret name +func (h *PeerHandler) getSecretName(spec *calicov3.BGPPeerSpec) string { + if spec.Password != nil && spec.Password.SecretKeyRef != nil { + return spec.Password.SecretKeyRef.Name + } + return "" +} + +// getSecretValueForPeer retrieves the actual secret value for a peer spec +func (h *PeerHandler) getSecretValueForPeer(spec *calicov3.BGPPeerSpec) string { + secretName := h.getSecretName(spec) + if secretName != "" { + return h.getSecretValue(spec) + } + return "" +} + +// getPeerIpAsnMap returns the IP to ASN mapping for a given peer +func (h *PeerHandler) getPeerIpAsnMap(peer *calicov3.BGPPeer) map[string]uint32 { + ipAsn := make(map[string]uint32) + if peer.Spec.PeerSelector != "" { + // this peer has a peerSelector, use it + ipAsn = h.selectPeers(peer.Spec.PeerSelector) + } else { + // use peerIP and ASNumber specified in the peer + ipAsn[peer.Spec.PeerIP] = uint32(peer.Spec.ASNumber) + } + return ipAsn +} + +func (h *PeerHandler) createBGPPeer(ip string, asn uint32, secretValue string) (*bgpapi.Peer, error) { + h.log.Infof("createBGPPeer with ip %s", ip) + ipAddr, err := net.ResolveIPAddr("ip", ip) + if err != nil { + return nil, err + } + + typ := &common.BgpFamilyUnicastIPv4 + typSRv6 := &common.BgpFamilySRv6IPv6 + typvpn4 := &common.BgpFamilyUnicastIPv4VPN + typvpn6 := &common.BgpFamilyUnicastIPv6VPN + + if ipAddr.IP.To4() == nil { + typ = &common.BgpFamilyUnicastIPv6 + } + + afiSafis := []*bgpapi.AfiSafi{ + { + Config: &bgpapi.AfiSafiConfig{ + Family: typ, + Enabled: true, + }, + MpGracefulRestart: &bgpapi.MpGracefulRestart{ + Config: &bgpapi.MpGracefulRestartConfig{ + Enabled: true, + }, + }, + }, + { + Config: &bgpapi.AfiSafiConfig{ + Family: typSRv6, + Enabled: true, + }, + }, + { + Config: &bgpapi.AfiSafiConfig{ + Family: typvpn4, + Enabled: true, + }, + }, + { + Config: &bgpapi.AfiSafiConfig{ + Family: typvpn6, + Enabled: true, + }, + }, + } + peer := &bgpapi.Peer{ + Conf: &bgpapi.PeerConf{ + NeighborAddress: ipAddr.String(), + PeerAsn: asn, + }, + GracefulRestart: &bgpapi.GracefulRestart{ + Enabled: true, + RestartTime: 120, + LonglivedEnabled: true, + NotificationEnabled: true, + }, + AfiSafis: afiSafis, + } + + if secretValue != "" { + peer.Conf.AuthPassword = secretValue + } + return peer, nil +} + +func (h *PeerHandler) addBGPPeer(ip string, asn uint32, peerSpec *calicov3.BGPPeerSpec, secretValue string) error { + peer, err := h.createBGPPeer(ip, asn, secretValue) + if err != nil { + return errors.Wrap(err, "cannot add bgp peer") + } + common.SendEvent(common.CalicoVppEvent{ + Type: common.BGPPeerAdded, + New: &watchers.LocalBGPPeer{Peer: peer, BGPFilterNames: peerSpec.Filters}, + }) + return nil +} + +func (h *PeerHandler) updateBGPPeer(ip string, asn uint32, peerSpec, oldPeerSpec *calicov3.BGPPeerSpec, secretValue string) error { + peer, err := h.createBGPPeer(ip, asn, secretValue) + if err != nil { + return errors.Wrap(err, "cannot update bgp peer") + } + common.SendEvent(common.CalicoVppEvent{ + Type: common.BGPPeerUpdated, + New: &watchers.LocalBGPPeer{Peer: peer, BGPFilterNames: peerSpec.Filters}, + Old: &watchers.LocalBGPPeer{BGPFilterNames: oldPeerSpec.Filters}, + }) + return nil +} + +func (h *PeerHandler) deleteBGPPeer(ip string) error { + common.SendEvent(common.CalicoVppEvent{ + Type: common.BGPPeerDeleted, + New: ip, + }) + return nil +} + +// ProcessPeers processes a list of BGP peers and reconciles them +func (h *PeerHandler) ProcessPeers(peers []calicov3.BGPPeer) error { + h.log.Debugf("Processing %d BGP peers", len(peers)) + + // Start mark and sweep + for _, p := range h.state { + p.SweepFlag = true + } + + // If in mesh mode, add a fake peer to the list to select all nodes + if h.isMeshMode() { + h.log.Debugf("Node to node mesh enabled") + peers = append(peers, calicov3.BGPPeer{ + ObjectMeta: metav1.ObjectMeta{ + Name: " virtual full mesh peer", + }, + Spec: calicov3.BGPPeerSpec{ + Node: *config.NodeName, + PeerSelector: "all()", + }, + }) + } else { + h.log.Debugf("Node to node mesh disabled") + } + + // Process all peers + for _, peer := range peers { + if !h.shouldPeer(&peer) { + continue + } + ipAsn := h.getPeerIpAsnMap(&peer) + for ip, asn := range ipAsn { + existing, ok := h.state[ip] + if ok { + h.log.Debugf("peer(update) neighbor ip=%s for BGPPeer=%s", ip, peer.Name) + existing.SweepFlag = false + oldSecret := h.getSecretName(existing.BGPPeerSpec) + newSecret := h.getSecretName(&peer.Spec) + + // Get the secret value if needed + secretValue := h.getSecretValueForPeer(&peer.Spec) + + if oldSecret != newSecret || existing.SecretChanged { + h.log.Infof("peer(upd-secret) neighbor ip=%s oldSecret=%s newSecret=%s", ip, oldSecret, newSecret) + err := h.updateBGPPeer(ip, asn, &peer.Spec, existing.BGPPeerSpec, secretValue) + if err != nil { + return errors.Wrapf(err, "Error updating bgp peer %s", ip) + } + existing.BGPPeerSpec = &peer.Spec + existing.SecretChanged = false + } else { + h.log.Debugf("peer(same) neighbor ip=%s", ip) + } + } else { + // New peer + h.log.Infof("peer(add) neighbor ip=%s for BGPPeer=%s", ip, peer.Name) + + // Get the secret value if needed + secretValue := h.getSecretValueForPeer(&peer.Spec) + + err := h.addBGPPeer(ip, asn, &peer.Spec, secretValue) + if err != nil { + return errors.Wrapf(err, "Error adding bgp peer %s", ip) + } + h.state[ip] = &BGPPeerState{ + AS: asn, + SweepFlag: false, + BGPPeerSpec: &peer.Spec, + SecretChanged: false, + } + } + } + } + + // Remove all peers that still have sweepflag to true + for ip, peer := range h.state { + if peer.SweepFlag { + h.log.Infof("peer(del) neighbor ip=%s", ip) + err := h.deleteBGPPeer(ip) + if err != nil { + return errors.Wrapf(err, "Error deleting bgp peer %s", ip) + } + delete(h.state, ip) + } + } + + return nil +} + +// OnPeerAdded handles a single peer being added +func (h *PeerHandler) OnPeerAdded(peer *calicov3.BGPPeer) error { + h.log.Debugf("OnPeerAdded: %s", peer.Name) + + if !h.shouldPeer(peer) { + h.log.Debugf("Skipping peer %s (doesn't match node selector)", peer.Name) + return nil + } + + ipAsn := h.getPeerIpAsnMap(peer) + + for ip, asn := range ipAsn { + if _, exists := h.state[ip]; exists { + // Already exists, treat as update + h.log.Debugf("Peer %s already exists for IP %s, treating as update", peer.Name, ip) + continue + } + + h.log.Infof("peer(add) neighbor ip=%s for BGPPeer=%s", ip, peer.Name) + + // Get the secret value if needed + secretValue := h.getSecretValueForPeer(&peer.Spec) + + err := h.addBGPPeer(ip, asn, &peer.Spec, secretValue) + if err != nil { + return errors.Wrapf(err, "Error adding bgp peer %s", ip) + } + h.state[ip] = &BGPPeerState{ + AS: asn, + SweepFlag: false, + BGPPeerSpec: &peer.Spec, + SecretChanged: false, + } + } + + return nil +} + +// OnPeerUpdated handles a single peer being updated +func (h *PeerHandler) OnPeerUpdated(oldPeer, newPeer *calicov3.BGPPeer) error { + h.log.Debugf("OnPeerUpdated: %s", newPeer.Name) + + // Remove old peer's IPs if selector changed + if !h.shouldPeer(oldPeer) && h.shouldPeer(newPeer) { + // Peer now matches, treat as add + return h.OnPeerAdded(newPeer) + } else if h.shouldPeer(oldPeer) && !h.shouldPeer(newPeer) { + // Peer no longer matches, treat as delete + return h.OnPeerDeleted(oldPeer) + } else if !h.shouldPeer(newPeer) { + // Still doesn't match, ignore + return nil + } + + // Get old and new IP/ASN mappings + oldIpAsn := h.getPeerIpAsnMap(oldPeer) + newIpAsn := h.getPeerIpAsnMap(newPeer) + + // Delete IPs that are no longer in the new peer + for ip := range oldIpAsn { + if _, exists := newIpAsn[ip]; !exists { + h.log.Infof("peer(del) neighbor ip=%s (no longer in peer %s)", ip, newPeer.Name) + err := h.deleteBGPPeer(ip) + if err != nil { + return errors.Wrapf(err, "Error deleting bgp peer %s", ip) + } + delete(h.state, ip) + } + } + + // Add or update IPs in the new peer + for ip, asn := range newIpAsn { + existing, ok := h.state[ip] + if ok { + h.log.Debugf("peer(update) neighbor ip=%s for BGPPeer=%s", ip, newPeer.Name) + oldSecret := h.getSecretName(existing.BGPPeerSpec) + newSecret := h.getSecretName(&newPeer.Spec) + + // Get the secret value if needed + secretValue := h.getSecretValueForPeer(&newPeer.Spec) + + if oldSecret != newSecret || existing.SecretChanged { + h.log.Infof("peer(upd-secret) neighbor ip=%s oldSecret=%s newSecret=%s", ip, oldSecret, newSecret) + err := h.updateBGPPeer(ip, asn, &newPeer.Spec, existing.BGPPeerSpec, secretValue) + if err != nil { + return errors.Wrapf(err, "Error updating bgp peer %s", ip) + } + existing.BGPPeerSpec = &newPeer.Spec + existing.SecretChanged = false + } else { + h.log.Debugf("peer(same) neighbor ip=%s", ip) + } + } else { + // New IP for this peer + h.log.Infof("peer(add) neighbor ip=%s for BGPPeer=%s", ip, newPeer.Name) + + // Get the secret value if needed + secretValue := h.getSecretValueForPeer(&newPeer.Spec) + + err := h.addBGPPeer(ip, asn, &newPeer.Spec, secretValue) + if err != nil { + return errors.Wrapf(err, "Error adding bgp peer %s", ip) + } + h.state[ip] = &BGPPeerState{ + AS: asn, + SweepFlag: false, + BGPPeerSpec: &newPeer.Spec, + SecretChanged: false, + } + } + } + + return nil +} + +// OnPeerDeleted handles a single peer being deleted +func (h *PeerHandler) OnPeerDeleted(peer *calicov3.BGPPeer) error { + h.log.Debugf("OnPeerDeleted: %s", peer.Name) + + if !h.shouldPeer(peer) { + h.log.Debugf("Skipping peer %s (doesn't match node selector)", peer.Name) + return nil + } + + ipAsn := h.getPeerIpAsnMap(peer) + + for ip := range ipAsn { + if _, exists := h.state[ip]; !exists { + h.log.Debugf("Peer IP %s not found in state, skipping", ip) + continue + } + + h.log.Infof("peer(del) neighbor ip=%s for BGPPeer=%s", ip, peer.Name) + err := h.deleteBGPPeer(ip) + if err != nil { + return errors.Wrapf(err, "Error deleting bgp peer %s", ip) + } + delete(h.state, ip) + } + + return nil +} + +// getSecretValue retrieves the actual secret value from the secret cache +func (h *PeerHandler) getSecretValue(spec *calicov3.BGPPeerSpec) string { + if spec.Password == nil || spec.Password.SecretKeyRef == nil { + return "" + } + + secretName := spec.Password.SecretKeyRef.Name + secretKey := spec.Password.SecretKeyRef.Key + + if secretData, ok := h.secretCache[secretName]; ok { + if value, ok := secretData[secretKey]; ok { + return value + } + h.log.Warnf("Secret %s does not have key %s", secretName, secretKey) + } else { + h.log.Warnf("Secret %s not found in cache", secretName) + } + + return "" +} + +// OnPeerNodeStateChanged handles peer node state changes +func (h *PeerHandler) OnPeerNodeStateChanged(old, new *common.LocalNodeSpec) { + if old != nil { + delete(h.nodeStatesByName, old.Name) + } + if new != nil { + h.nodeStatesByName[new.Name] = *new + } + h.log.Debugf("Nodes updated in peer handler, old %v new %v", old, new) +} + +// OnSecretChanged handles secret changes +func (h *PeerHandler) OnSecretChanged(secretName string) { + h.log.Infof("Secret '%s' changed, marking affected peers for update", secretName) + + // sweep through the peers and update the SecretChanged field of impacted peers + for _, peer := range h.state { + if h.getSecretName(peer.BGPPeerSpec) == secretName { + h.log.Infof("SecretChanged field set for peer=%s", peer.BGPPeerSpec.PeerIP) + peer.SecretChanged = true + } + } +} diff --git a/calico-vpp-agent/routing/routing_server.go b/calico-vpp-agent/routing/routing_server.go index 89a5081b..ecfa82a3 100644 --- a/calico-vpp-agent/routing/routing_server.go +++ b/calico-vpp-agent/routing/routing_server.go @@ -58,6 +58,7 @@ type Server struct { routingServerEventChan chan any nodeBGPSpec *common.LocalNodeSpec + peerHandler *PeerHandler } func (s *Server) SetBGPConf(bgpConf *calicov3.BGPConfigurationSpec) { @@ -75,6 +76,11 @@ func (s *Server) SetOurBGPSpec(nodeBGPSpec *common.LocalNodeSpec) { s.nodeBGPSpec = nodeBGPSpec } +// SetPeerHandler sets the peer handler for the routing server +func (s *Server) SetPeerHandler(peerHandler *PeerHandler) { + s.peerHandler = peerHandler +} + func NewRoutingServer(vpp *vpplink.VppLink, bgpServer *bgpserver.BgpServer, log *logrus.Entry) *Server { server := Server{ log: log, @@ -100,6 +106,7 @@ func NewRoutingServer(vpp *vpplink.VppLink, bgpServer *bgpserver.BgpServer, log common.BGPPeerUpdated, common.BGPFilterAddedOrUpdated, common.BGPFilterDeleted, + common.PeerNodeStateChanged, ) return &server diff --git a/calico-vpp-agent/watchers/peers_watcher.go b/calico-vpp-agent/watchers/peers_watcher.go index 281e9551..da59677a 100644 --- a/calico-vpp-agent/watchers/peers_watcher.go +++ b/calico-vpp-agent/watchers/peers_watcher.go @@ -16,7 +16,6 @@ package watchers import ( - "net" "reflect" "sort" "time" @@ -26,20 +25,15 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/net/context" "gopkg.in/tomb.v2" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" calicov3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3" "github.com/projectcalico/calico/libcalico-go/lib/backend/api" calicov3cli "github.com/projectcalico/calico/libcalico-go/lib/clientv3" "github.com/projectcalico/calico/libcalico-go/lib/options" - "github.com/projectcalico/calico/libcalico-go/lib/selector" "github.com/projectcalico/calico/libcalico-go/lib/watch" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common" - "github.com/projectcalico/vpp-dataplane/v3/config" ) type LocalBGPPeer struct { @@ -64,102 +58,35 @@ type PeerWatcher struct { log *logrus.Entry clientv3 calicov3cli.Interface - // Subcomponent for accessing and watching secrets (that hold BGP passwords). - secretWatcher *secretWatcher - - nodeStatesByName map[string]common.LocalNodeSpec - peerWatcherEventChan chan any - BGPConf *calicov3.BGPConfigurationSpec watcher watch.Interface currentWatchRevision string -} - -type bgpPeer struct { - AS uint32 - SweepFlag bool - BGPPeerSpec *calicov3.BGPPeerSpec - SecretChanged bool -} - -// selectsNode determines whether or not the selector mySelector -// matches the labels on the given node. -func selectsNode(mySelector string, n *common.LocalNodeSpec) (bool, error) { - // No node selector means that the selector matches the node. - if len(mySelector) == 0 { - return true, nil - } - // Check for valid selector syntax. - sel, err := selector.Parse(mySelector) - if err != nil { - return false, err - } - // Return whether or not the selector matches. - return sel.Evaluate(n.Labels), nil -} - -func (w *PeerWatcher) shouldPeer(peer *calicov3.BGPPeer) bool { - matches, err := selectsNode(peer.Spec.NodeSelector, w.currentCalicoNode()) - if err != nil { - w.log.Error(errors.Wrapf(err, "Error in nodeSelector matching for peer %s", peer.Name)) - } - if (peer.Spec.Node != "" && peer.Spec.Node != *config.NodeName) || (peer.Spec.NodeSelector != "" && !matches) { - return false - } - return true -} -func (w *PeerWatcher) getAsNumber(node *common.LocalNodeSpec) uint32 { - if node.ASNumber == nil { - return uint32(*w.BGPConf.ASNumber) - } else { - return uint32(*node.ASNumber) - } + secretWatcher *secretWatcher + cachedPeers map[string]calicov3.BGPPeer // name -> peer } -// Select among the nodes those that match with peerSelector -// Return corresponding ips and ASN in a map -func (w *PeerWatcher) selectPeers(peerSelector string) map[string]uint32 { - ipAsn := make(map[string]uint32) - for _, node := range w.nodeStatesByName { - if node.Name == *config.NodeName { - continue // Don't peer with ourselves :) - } - matches, err := selectsNode(peerSelector, &node) - if err != nil { - w.log.Errorf("Error in peerSelector matching: %v", err) - } - if matches { - if node.IPv4Address != nil && w.currentCalicoNode().IPv4Address != nil { - ipAsn[node.IPv4Address.IP.String()] = w.getAsNumber(&node) - } - if node.IPv6Address != nil && w.currentCalicoNode().IPv6Address != nil { - ipAsn[node.IPv6Address.IP.String()] = w.getAsNumber(&node) - } - } +func NewPeerWatcher(clientv3 calicov3cli.Interface, k8sclient *kubernetes.Clientset, log *logrus.Entry) *PeerWatcher { + w := &PeerWatcher{ + log: log, + clientv3: clientv3, + secretWatcher: NewSecretWatcher(k8sclient), + cachedPeers: make(map[string]calicov3.BGPPeer), } - return ipAsn -} -func (w *PeerWatcher) currentCalicoNode() *common.LocalNodeSpec { - node := w.nodeStatesByName[*config.NodeName] - return &node + return w } -// This function watches BGP peers configured in Calico -// These peers are configured in GoBGP in addition to the other nodes in the cluster -// They may also control which nodes to pair with if the peerSelector is set +// WatchBGPPeers watches BGP peers configured in Calico and emits granular events func (w *PeerWatcher) WatchBGPPeers(t *tomb.Tomb) error { w.log.Infof("PEER watcher starts") - state := make(map[string]*bgpPeer) for t.Alive() { w.currentWatchRevision = "" - err := w.resyncAndCreateWatcher(state) + err := w.resyncAndCreateWatcher() if err != nil { w.log.Error(err) goto restart } - // node and peer updates should be infrequent enough so just reevaluate - // all peerings everytime there is an update. + select { case <-t.Dying(): w.log.Infof("Peers Watcher asked to stop") @@ -167,7 +94,7 @@ func (w *PeerWatcher) WatchBGPPeers(t *tomb.Tomb) error { return nil case event, ok := <-w.watcher.ResultChan(): if !ok { - err := w.resyncAndCreateWatcher(state) + err := w.resyncAndCreateWatcher() if err != nil { goto restart } @@ -177,70 +104,36 @@ func (w *PeerWatcher) WatchBGPPeers(t *tomb.Tomb) error { case watch.EventType(api.WatchError): w.log.Debug("peers watch returned, restarting...") goto restart + case watch.Added: + peer := event.Object.(*calicov3.BGPPeer) + w.log.Infof("Peer added: %s", peer.Name) + w.cachedPeers[peer.Name] = *peer + w.updateSecretWatcher() + common.SendEvent(common.CalicoVppEvent{ + Type: common.PeerAdded, + New: &common.PeerAddedEvent{Peer: *peer}, + }) + case watch.Modified: + peer := event.Object.(*calicov3.BGPPeer) + w.log.Infof("Peer updated: %s", peer.Name) + old := w.cachedPeers[peer.Name] + w.cachedPeers[peer.Name] = *peer + w.updateSecretWatcher() + common.SendEvent(common.CalicoVppEvent{ + Type: common.PeerUpdated, + New: &common.PeerUpdatedEvent{Old: old, New: *peer}, + }) + case watch.Deleted: + peer := event.Previous.(*calicov3.BGPPeer) + w.log.Infof("Peer deleted: %s", peer.Name) + delete(w.cachedPeers, peer.Name) + w.updateSecretWatcher() + common.SendEvent(common.CalicoVppEvent{ + Type: common.PeerDeleted, + New: &common.PeerDeletedEvent{Peer: *peer}, + }) default: - w.log.Info("Peers updated, reevaluating peerings") - } - case msg := <-w.peerWatcherEventChan: - evt, ok := msg.(common.CalicoVppEvent) - if !ok { - continue - } - /* Note: we will only receive events we ask for when registering the chan */ - switch evt.Type { - case common.PeerNodeStateChanged: - old, _ := evt.Old.(*common.LocalNodeSpec) - new, _ := evt.New.(*common.LocalNodeSpec) - if old != nil { - delete(w.nodeStatesByName, old.Name) - } - if new != nil { - w.nodeStatesByName[new.Name] = *new - } - w.log.Debugf("Nodes updated, reevaluating peerings old %v new %v", old, new) - case common.BGPSecretChanged: - old, _ := evt.Old.(*v1.Secret) - new, _ := evt.New.(*v1.Secret) - secretEvt := "" - secretName := "" - // secret added - if old == nil && new != nil { - secretEvt = "add" - secretName = new.Name - w.log.Infof("New secret '%s' added", new.Name) - } - // secret deleted - if old != nil && new == nil { - secretEvt = "del" - secretName = old.Name - w.log.Infof("secret '%s' deleted", old.Name) - } - // secret updated - if old != nil && new != nil { - secretEvt = "upd" - secretName = old.Name - w.log.Infof("secret '%s' updated", old.Name) - } - // sweep through the peers and update the SecretChanged field of impacted peers - for _, peer := range state { - switch secretEvt { - case "add": - // Note: any future add event specifc processing code goes here. For now we fallthrough. - fallthrough - case "del": - // Note: any future delete event specifc processing code goes here. For now we fallthrough. - fallthrough - case "upd": - // BGP password has changed - if w.getSecretName(peer.BGPPeerSpec) == secretName { - w.log.Infof("SecretChanged field set for peer=%s", peer.BGPPeerSpec.PeerIP) - peer.SecretChanged = true - } - default: - w.log.Warn("Unrecognized secret change event received. Ignoring...") - } - } - default: - goto restart + w.log.Warnf("Unknown watch event type: %v", event.Type) } } @@ -266,110 +159,12 @@ func CompareStringSlices(slice1, slice2 []string) bool { return reflect.DeepEqual(slice1, slice2) } -func (w *PeerWatcher) resyncAndCreateWatcher(state map[string]*bgpPeer) error { +func (w *PeerWatcher) resyncAndCreateWatcher() error { if w.currentWatchRevision == "" { - w.log.Debugf("Reconciliating peers...") - peers, err := w.clientv3.BGPPeers().List(context.Background(), options.ListOptions{ - ResourceVersion: w.currentWatchRevision, - }) + err := w.resyncPeers() if err != nil { - return errors.Wrap(err, "cannot list bgp peers") - } - w.currentWatchRevision = peers.ResourceVersion - // Start mark and sweep - for _, p := range state { - p.SweepFlag = true - } - - // If in mesh mode, add a fake peer to the list to select all nodes - if w.isMeshMode() { - w.log.Debugf("Node to node mesh enabled") - peers.Items = append(peers.Items, calicov3.BGPPeer{ - ObjectMeta: metav1.ObjectMeta{ - Name: " virtual full mesh peer", - }, - Spec: calicov3.BGPPeerSpec{ - Node: *config.NodeName, - PeerSelector: "all()", - }, - }) - } else { - w.log.Debugf("Node to node mesh disabled") - } - // Initialize the set consisting of active secrets - activeSecrets := map[string]struct{}{} - for _, peer := range peers.Items { - if !w.shouldPeer(&peer) { - continue - } - ipAsn := make(map[string]uint32) - if peer.Spec.PeerSelector != "" { - // this peer has a peerSelector, use it - ipAsn = w.selectPeers(peer.Spec.PeerSelector) - } else { - // use peerIP and ASNumber specified in the peer - ipAsn[peer.Spec.PeerIP] = uint32(peer.Spec.ASNumber) - } - for ip, asn := range ipAsn { - existing, ok := state[ip] - if ok { - w.log.Debugf("peer(update) neighbor ip=%s for BGPPeer=%s", ip, peer.Name) - existing.SweepFlag = false - oldSecret := w.getSecretName(existing.BGPPeerSpec) - newSecret := w.getSecretName(&peer.Spec) - w.log.Debugf("peer(update) oldSecret=%s newSecret=%s SecretChanged=%t for BGPPeer=%s", oldSecret, newSecret, existing.SecretChanged, peer.Name) - filtersChanged := !CompareStringSlices(existing.BGPPeerSpec.Filters, peer.Spec.Filters) - if existing.AS != asn || oldSecret != newSecret || existing.SecretChanged || filtersChanged { - err := w.updateBGPPeer(ip, asn, &peer.Spec, existing.BGPPeerSpec) - if err != nil { - w.log.Warn(errors.Wrapf(err, "error updating BGP peer %s, ip=%s", peer.Name, ip)) - continue - } - existing.AS = asn - existing.BGPPeerSpec = peer.Spec.DeepCopy() - existing.SecretChanged = false - } // Else no change, nothing to do - } else { - // New peer - w.log.Infof("peer(add) neighbor ip=%s for BGPPeer=%s", ip, peer.Name) - err := w.addBGPPeer(ip, asn, &peer.Spec) - if err != nil { - w.log.Warn(errors.Wrapf(err, "error adding BGP peer %s, ip=%s", peer.Name, ip)) - // Add the secret to the set of active secrets so it does not get cleaned up - secretName := w.getSecretName(&peer.Spec) - if secretName != "" { - activeSecrets[secretName] = struct{}{} - } - continue - } - state[ip] = &bgpPeer{ - AS: asn, - SweepFlag: false, - SecretChanged: false, - BGPPeerSpec: peer.Spec.DeepCopy(), - } - } - } - } - // Remove all peers that still have sweepflag to true - for ip, peer := range state { - if peer.SweepFlag { - w.log.Infof("peer(del) neighbor ip=%s", ip) - err := w.deleteBGPPeer(ip) - if err != nil { - w.log.Warn(errors.Wrapf(err, "error deleting BGP peer %s", ip)) - } - delete(state, ip) - } - } - // Clean up any secrets that are no longer referenced by any bgp peers - for _, peer := range state { - secretName := w.getSecretName(peer.BGPPeerSpec) - if secretName != "" { - activeSecrets[secretName] = struct{}{} - } + return err } - w.secretWatcher.SweepStale(activeSecrets) } w.cleanExistingWatcher() watcher, err := w.clientv3.BGPPeers().Watch( @@ -383,176 +178,47 @@ func (w *PeerWatcher) resyncAndCreateWatcher(state map[string]*bgpPeer) error { return nil } -func (w *PeerWatcher) cleanExistingWatcher() { - if w.watcher != nil { - w.watcher.Stop() - w.log.Debug("Stopped watcher") - w.watcher = nil - } -} - -func (w *PeerWatcher) createBGPPeer(ip string, asn uint32, peerSpec *calicov3.BGPPeerSpec) (*bgpapi.Peer, error) { - w.log.Infof("createBGPPeer with ip %s", ip) - ipAddr, err := net.ResolveIPAddr("ip", ip) +func (w *PeerWatcher) resyncPeers() error { + w.log.Debugf("Reconciliating peers...") + peers, err := w.clientv3.BGPPeers().List(context.Background(), options.ListOptions{ + ResourceVersion: w.currentWatchRevision, + }) if err != nil { - return nil, err + return errors.Wrap(err, "cannot list bgp peers") } + w.currentWatchRevision = peers.ResourceVersion - typ := &common.BgpFamilyUnicastIPv4 - typSRv6 := &common.BgpFamilySRv6IPv6 - typvpn4 := &common.BgpFamilyUnicastIPv4VPN - typvpn6 := &common.BgpFamilyUnicastIPv6VPN - - if ipAddr.IP.To4() == nil { - typ = &common.BgpFamilyUnicastIPv6 + // Update local cache + w.cachedPeers = make(map[string]calicov3.BGPPeer) + for _, peer := range peers.Items { + w.cachedPeers[peer.Name] = peer } - afiSafis := []*bgpapi.AfiSafi{ - { - Config: &bgpapi.AfiSafiConfig{ - Family: typ, - Enabled: true, - }, - MpGracefulRestart: &bgpapi.MpGracefulRestart{ - Config: &bgpapi.MpGracefulRestartConfig{ - Enabled: true, - }, - }, - }, - { - Config: &bgpapi.AfiSafiConfig{ - Family: typSRv6, - Enabled: true, - }, - }, - { - Config: &bgpapi.AfiSafiConfig{ - Family: typvpn4, - Enabled: true, - }, - }, - { - Config: &bgpapi.AfiSafiConfig{ - Family: typvpn6, - Enabled: true, - }, - }, - } - peer := &bgpapi.Peer{ - Conf: &bgpapi.PeerConf{ - NeighborAddress: ipAddr.String(), - PeerAsn: asn, - }, - GracefulRestart: &bgpapi.GracefulRestart{ - Enabled: true, - RestartTime: 120, - LonglivedEnabled: true, - NotificationEnabled: true, - }, - AfiSafis: afiSafis, - } - - if w.getSecretKeyRef(peerSpec) != nil { - peer.Conf.AuthPassword, err = w.getPassword(peerSpec.Password.SecretKeyRef) - if err != nil { - return nil, err - } - } - return peer, nil -} + // Update secret watcher with current peer list + w.updateSecretWatcher() -func (w *PeerWatcher) addBGPPeer(ip string, asn uint32, peerSpec *calicov3.BGPPeerSpec) error { - peer, err := w.createBGPPeer(ip, asn, peerSpec) - if err != nil { - return errors.Wrap(err, "cannot add bgp peer") - } + // Emit event for initial peer list processing common.SendEvent(common.CalicoVppEvent{ - Type: common.BGPPeerAdded, - New: &LocalBGPPeer{Peer: peer, BGPFilterNames: peerSpec.Filters}, + Type: common.PeersChanged, + New: &common.PeersChangedEvent{Peers: peers.Items}, }) - return nil -} -func (w *PeerWatcher) updateBGPPeer(ip string, asn uint32, peerSpec, oldPeerSpec *calicov3.BGPPeerSpec) error { - peer, err := w.createBGPPeer(ip, asn, peerSpec) - if err != nil { - return errors.Wrap(err, "cannot update bgp peer") - } - common.SendEvent(common.CalicoVppEvent{ - Type: common.BGPPeerUpdated, - New: &LocalBGPPeer{Peer: peer, BGPFilterNames: peerSpec.Filters}, - Old: &LocalBGPPeer{BGPFilterNames: oldPeerSpec.Filters}, - }) return nil } -func (w *PeerWatcher) deleteBGPPeer(ip string) error { - common.SendEvent(common.CalicoVppEvent{ - Type: common.BGPPeerDeleted, - New: ip, - }) - return nil -} - -func (w *PeerWatcher) isMeshMode() bool { - if w.BGPConf.NodeToNodeMeshEnabled != nil { - return *w.BGPConf.NodeToNodeMeshEnabled - } - return true -} - -func (w *PeerWatcher) SetBGPConf(bgpConf *calicov3.BGPConfigurationSpec) { - w.BGPConf = bgpConf -} - -// Given peer's BGPPeerConf check if Password is set and return SecretKeyRef -func (w *PeerWatcher) getSecretKeyRef(spec *calicov3.BGPPeerSpec) *v1.SecretKeySelector { - if spec.Password != nil && spec.Password.SecretKeyRef != nil { - return spec.Password.SecretKeyRef +// updateSecretWatcher updates the secret watcher with the current peer list +func (w *PeerWatcher) updateSecretWatcher() { + peers := make([]calicov3.BGPPeer, 0, len(w.cachedPeers)) + for _, peer := range w.cachedPeers { + peers = append(peers, peer) } - return nil -} - -// Given peer's BGPPeerConf check if Password is set and return secret name -func (w *PeerWatcher) getSecretName(spec *calicov3.BGPPeerSpec) string { - if spec.Password != nil && spec.Password.SecretKeyRef != nil { - return spec.Password.SecretKeyRef.Name - } - return "" -} - -// Get the BGP password from SecretWatcher -func (w *PeerWatcher) getPassword(secretKeySelector *v1.SecretKeySelector) (string, error) { - password, err := w.secretWatcher.GetSecret( - secretKeySelector.Name, - secretKeySelector.Key, - ) - return password, err + w.secretWatcher.OnPeerListUpdated(peers) } -// This function gets called from SecretWatcher when a secret is added, updated or deleted -func (w *PeerWatcher) OnSecretUpdate(old, new *v1.Secret) { - common.SendEvent(common.CalicoVppEvent{ - Type: common.BGPSecretChanged, - Old: old, - New: new, - }) -} - -func NewPeerWatcher(clientv3 calicov3cli.Interface, k8sclient *kubernetes.Clientset, log *logrus.Entry) *PeerWatcher { - var err error - w := PeerWatcher{ - clientv3: clientv3, - nodeStatesByName: make(map[string]common.LocalNodeSpec), - log: log, - peerWatcherEventChan: make(chan any, common.ChanSize), - } - w.secretWatcher, err = NewSecretWatcher(&w, k8sclient) - if err != nil { - log.Fatalf("NewSecretWatcher failed with %s", err) +func (w *PeerWatcher) cleanExistingWatcher() { + if w.watcher != nil { + w.watcher.Stop() + w.log.Debug("Stopped watcher") + w.watcher = nil } - reg := common.RegisterHandler(w.peerWatcherEventChan, "peers watcher events") - reg.ExpectEvents(common.PeerNodeStateChanged, common.BGPSecretChanged) - - return &w } diff --git a/calico-vpp-agent/watchers/secret_watcher.go b/calico-vpp-agent/watchers/secret_watcher.go index 7655c58c..2a0f9c3d 100644 --- a/calico-vpp-agent/watchers/secret_watcher.go +++ b/calico-vpp-agent/watchers/secret_watcher.go @@ -30,11 +30,12 @@ package watchers import ( - "fmt" "os" "sync" "time" + calicov3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3" + "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common" log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" @@ -50,22 +51,15 @@ type secretWatchData struct { secret *v1.Secret } -type SecretWatcherClient interface { - // this function is invoked upon add|update|delete of a secret - OnSecretUpdate(old, new *v1.Secret) -} - type secretWatcher struct { - client SecretWatcherClient namespace string k8sClientset *kubernetes.Clientset mutex sync.Mutex watches map[string]*secretWatchData } -func NewSecretWatcher(c SecretWatcherClient, k8sclient *kubernetes.Clientset) (*secretWatcher, error) { +func NewSecretWatcher(k8sclient *kubernetes.Clientset) *secretWatcher { sw := &secretWatcher{ - client: c, watches: make(map[string]*secretWatchData), k8sClientset: k8sclient, } @@ -74,11 +68,11 @@ func NewSecretWatcher(c SecretWatcherClient, k8sclient *kubernetes.Clientset) (* // are being run in a namespace other than calico-vpp-dataplane) sw.namespace = os.Getenv("NAMESPACE") if sw.namespace == "" { - // Default to kube-system. + // Default to calico-vpp-dataplane. sw.namespace = "calico-vpp-dataplane" } - return sw, nil + return sw } func (sw *secretWatcher) ensureWatchingSecret(name string) { @@ -137,25 +131,6 @@ func (sw *secretWatcher) allowTimeForControllerSync(name string, controller cach log.Debug("Relock...") } -func (sw *secretWatcher) GetSecret(name, key string) (string, error) { - sw.mutex.Lock() - defer sw.mutex.Unlock() - log.Debugf("Get secret for name '%v' key '%v'", name, key) - - // Ensure that we're watching this secret. - sw.ensureWatchingSecret(name) - - // Get and decode the key of interest. - if sw.watches[name].secret == nil { - return "", fmt.Errorf("no data available for secret %v", name) - } - if data, ok := sw.watches[name].secret.Data[key]; ok { - return string(data), nil - } else { - return "", fmt.Errorf("secret %v does not have key %v", name, key) - } -} - func (sw *secretWatcher) OnAdd(obj interface{}, isInInitialList bool) { sw.mutex.Lock() defer sw.mutex.Unlock() @@ -165,13 +140,21 @@ func (sw *secretWatcher) OnAdd(obj interface{}, isInInitialList bool) { panic("secret add, old is not *v1.Secret") } sw.watches[secret.Name].secret = secret - sw.client.OnSecretUpdate(nil, secret) + common.SendEvent(common.CalicoVppEvent{ + Type: common.SecretAdded, + New: &common.SecretAddedEvent{ + Secret: &common.SecretData{ + Name: secret.Name, + Data: secret.Data, + }, + }, + }) } func (sw *secretWatcher) OnUpdate(oldObj, newObj interface{}) { sw.mutex.Lock() defer sw.mutex.Unlock() - oldSecret, ok := oldObj.(*v1.Secret) + _, ok := oldObj.(*v1.Secret) if !ok { panic("secret update, old is not *v1.Secret") } @@ -181,7 +164,10 @@ func (sw *secretWatcher) OnUpdate(oldObj, newObj interface{}) { } log.Debug("Secret updated") sw.watches[secret.Name].secret = secret - sw.client.OnSecretUpdate(oldSecret, secret) + common.SendEvent(common.CalicoVppEvent{ + Type: common.SecretChanged, + New: &common.SecretChangedEvent{SecretName: secret.Name}, + }) } func (sw *secretWatcher) OnDelete(obj interface{}) { @@ -193,13 +179,28 @@ func (sw *secretWatcher) OnDelete(obj interface{}) { panic("secret delete, old is not *v1.Secret") } sw.watches[secret.Name].secret = nil - sw.client.OnSecretUpdate(secret, nil) + common.SendEvent(common.CalicoVppEvent{ + Type: common.SecretDeleted, + New: &common.SecretDeletedEvent{SecretName: secret.Name}, + }) } -func (sw *secretWatcher) SweepStale(activeSecrets map[string]struct{}) { +// OnPeerListUpdated updates the list of active secrets, starts watching new ones, and sweeps stale ones +func (sw *secretWatcher) OnPeerListUpdated(peers []calicov3.BGPPeer) { sw.mutex.Lock() defer sw.mutex.Unlock() + activeSecrets := make(map[string]struct{}) + for _, peer := range peers { + if peer.Spec.Password != nil && peer.Spec.Password.SecretKeyRef != nil { + secretName := peer.Spec.Password.SecretKeyRef.Name + activeSecrets[secretName] = struct{}{} + // Start watching this secret if we're not already + sw.ensureWatchingSecret(secretName) + } + } + + // Sweep stale secrets for name, watchData := range sw.watches { if _, ok := activeSecrets[name]; !ok { log.Debugf("Deleting secret '%s'", name) diff --git a/go.mod b/go.mod index 72f28edc..8dad6eba 100644 --- a/go.mod +++ b/go.mod @@ -97,10 +97,10 @@ require ( github.com/spf13/viper v1.20.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/x448/float16 v0.8.4 // indirect + github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect go.etcd.io/etcd/api/v3 v3.5.19 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.19 // indirect go.etcd.io/etcd/client/v3 v3.5.19 // indirect - github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect go.opencensus.io v0.24.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect