diff --git a/calico-vpp-agent/cmd/calico_vpp_dataplane.go b/calico-vpp-agent/cmd/calico_vpp_dataplane.go index bae4d9c2..53d39478 100644 --- a/calico-vpp-agent/cmd/calico_vpp_dataplane.go +++ b/calico-vpp-agent/cmd/calico_vpp_dataplane.go @@ -147,6 +147,9 @@ func main() { cniServer := watchers.NewCNIServer(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "cni"})) serviceServer := watchers.NewServiceServer(felixServer.GetFelixServerEventChan(), k8sclient, log.WithFields(logrus.Fields{"component": "services"})) + felixServer.GetRouteHandler().SetRouteWatcher(routeWatcher) + netWatcher.SetRouteHandler(felixServer.GetRouteHandler()) + err = watchers.InstallFelixPlugin() if err != nil { log.Fatalf("could not install felix plugin: %s", err) diff --git a/calico-vpp-agent/common/pubsub.go b/calico-vpp-agent/common/pubsub.go index 2ac7b253..01262e5f 100644 --- a/calico-vpp-agent/common/pubsub.go +++ b/calico-vpp-agent/common/pubsub.go @@ -27,7 +27,6 @@ const ( ChanSize = 500 PeerNodeStateChanged CalicoVppEventType = "PeerNodeStateChanged" - IpamConfChanged CalicoVppEventType = "IpamConfChanged" BGPConfChanged CalicoVppEventType = "BGPConfChanged" ConnectivityAdded CalicoVppEventType = "ConnectivityAdded" @@ -61,7 +60,6 @@ const ( NetAddedOrUpdated CalicoVppEventType = "NetAddedOrUpdated" NetDeleted CalicoVppEventType = "NetDeleted" - NetsSynced CalicoVppEventType = "NetsSynced" IpamPoolUpdate CalicoVppEventType = "IpamPoolUpdate" IpamPoolRemove CalicoVppEventType = "IpamPoolRemove" diff --git a/calico-vpp-agent/felix/felix_server.go b/calico-vpp-agent/felix/felix_server.go index e13fc128..72e4ff71 100644 --- a/calico-vpp-agent/felix/felix_server.go +++ b/calico-vpp-agent/felix/felix_server.go @@ -59,6 +59,7 @@ type Server struct { cniHandler *cni.CNIHandler connectivityHandler *connectivity.ConnectivityHandler serviceHandler *services.ServiceHandler + routeHandler *RouteHandler } // NewFelixServer creates a felix server @@ -77,6 +78,7 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l cniHandler: cni.NewCNIHandler(vpp, cache, log), connectivityHandler: connectivity.NewConnectivityHandler(vpp, cache, clientv3, log), serviceHandler: services.NewServiceHandler(vpp, cache, log), + routeHandler: NewRouteHandler(log), } reg := common.RegisterHandler(server.felixServerEventChan, "felix server events") @@ -96,6 +98,10 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l return server } +func (s *Server) GetRouteHandler() *RouteHandler { + return s.routeHandler +} + func (s *Server) GetFelixServerEventChan() chan any { return s.felixServerEventChan } diff --git a/calico-vpp-agent/felix/ipam.go b/calico-vpp-agent/felix/ipam.go index 7fa4d1d6..72aac3e0 100644 --- a/calico-vpp-agent/felix/ipam.go +++ b/calico-vpp-agent/felix/ipam.go @@ -21,8 +21,6 @@ import ( "github.com/pkg/errors" "github.com/projectcalico/calico/felix/proto" - - "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common" ) func (s *Server) handleIpamPoolUpdate(msg *proto.IPAMPoolUpdate) (err error) { @@ -49,11 +47,12 @@ func (s *Server) handleIpamPoolUpdate(msg *proto.IPAMPoolUpdate) (err error) { } s.connectivityHandler.OnIpamConfChanged(oldIpamPool, newIpamPool) s.cniHandler.OnIpamConfChanged(oldIpamPool, newIpamPool) - common.SendEvent(common.CalicoVppEvent{ - Type: common.IpamConfChanged, - Old: ipamPoolCopy(oldIpamPool), - New: ipamPoolCopy(newIpamPool), - }) + if s.routeHandler != nil { + err := s.routeHandler.OnIpamConfChanged(oldIpamPool, newIpamPool) + if err != nil { + s.log.Errorf("Failed to handle IPAM update in RouteHandler: %v", err) + } + } } } else { s.log.Infof("Adding pool: %s, nat:%t", msg.GetId(), newIpamPool.GetMasquerade()) @@ -65,10 +64,12 @@ func (s *Server) handleIpamPoolUpdate(msg *proto.IPAMPoolUpdate) (err error) { } s.connectivityHandler.OnIpamConfChanged(nil /*old*/, newIpamPool) s.cniHandler.OnIpamConfChanged(nil /*old*/, newIpamPool) - common.SendEvent(common.CalicoVppEvent{ - Type: common.IpamConfChanged, - New: ipamPoolCopy(newIpamPool), - }) + if s.routeHandler != nil { + err := s.routeHandler.OnIpamConfChanged(nil, newIpamPool) + if err != nil { + s.log.Errorf("Failed to handle IPAM addition in RouteHandler: %v", err) + } + } } return nil } @@ -88,13 +89,14 @@ func (s *Server) handleIpamPoolRemove(msg *proto.IPAMPoolRemove) (err error) { if err != nil { return errors.Wrap(err, "error handling ipam deletion") } - common.SendEvent(common.CalicoVppEvent{ - Type: common.IpamConfChanged, - Old: ipamPoolCopy(oldIpamPool), - New: nil, - }) s.connectivityHandler.OnIpamConfChanged(oldIpamPool, nil /* new */) s.cniHandler.OnIpamConfChanged(oldIpamPool, nil /* new */) + if s.routeHandler != nil { + err := s.routeHandler.OnIpamConfChanged(oldIpamPool, nil) + if err != nil { + s.log.Errorf("Failed to handle IPAM deletion in RouteHandler: %v", err) + } + } } else { s.log.Warnf("Deleting unknown ippool") return nil diff --git a/calico-vpp-agent/felix/route_handler.go b/calico-vpp-agent/felix/route_handler.go new file mode 100644 index 00000000..1d7c1165 --- /dev/null +++ b/calico-vpp-agent/felix/route_handler.go @@ -0,0 +1,149 @@ +// Copyright (C) 2025 Cisco Systems Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package felix + +import ( + "net" + "syscall" + + "github.com/pkg/errors" + "github.com/projectcalico/calico/felix/proto" + "github.com/sirupsen/logrus" + "github.com/vishvananda/netlink" + + "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common" + "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers" +) + +// RouteHandler handles network and IPAM events by updating VPP routing configuration +type RouteHandler struct { + log *logrus.Entry + routeWatcher *watchers.RouteWatcher +} + +// NewRouteHandler creates a new RouteHandler instance +func NewRouteHandler(log *logrus.Entry) *RouteHandler { + return &RouteHandler{ + log: log, + routeWatcher: nil, + } +} + +// SetRouteWatcher sets the route watcher for performing route operations +func (h *RouteHandler) SetRouteWatcher(routeWatcher *watchers.RouteWatcher) { + h.routeWatcher = routeWatcher +} + +// OnNetDeleted handles network deletion events +func (h *RouteHandler) OnNetDeleted(netDef *common.NetworkDefinition) error { + key := netDef.Range + routes, err := h.getNetworkRoute(key, netDef.PhysicalNetworkName) + if err != nil { + h.log.Errorf("Error getting route from network deletion: %v", err) + return err + } + for _, route := range routes { + err = h.routeWatcher.DelRoute(route) + if err != nil { + h.log.Errorf("Cannot delete pool route %s through vpp tap: %v", key, err) + return err + } + } + return nil +} + +// OnNetAddedOrUpdated handles network addition/update events +func (h *RouteHandler) OnNetAddedOrUpdated(netDef *common.NetworkDefinition) error { + key := netDef.Range + routes, err := h.getNetworkRoute(key, netDef.PhysicalNetworkName) + if err != nil { + h.log.Errorf("Error getting route from network addition/update: %v", err) + return err + } + for _, route := range routes { + err = h.routeWatcher.AddRoute(route) + if err != nil { + h.log.Errorf("Cannot add pool route %s through vpp tap: %v", key, err) + return err + } + } + return nil +} + +// OnIpamConfChanged handles IPAM configuration changes +func (h *RouteHandler) OnIpamConfChanged(oldPool, newPool *proto.IPAMPool) error { + h.log.Debugf("Received IPAM config update in route handler old:%+v new:%+v", oldPool, newPool) + if newPool == nil && oldPool != nil { + routes, err := h.getNetworkRoute(oldPool.Cidr, "") + if err != nil { + h.log.Errorf("Error getting route from ipam update: %v", err) + return err + } + for _, route := range routes { + err = h.routeWatcher.DelRoute(route) + if err != nil { + h.log.Errorf("Cannot delete pool route %s through vpp tap: %v", oldPool.Cidr, err) + return err + } + } + } else if newPool != nil { + routes, err := h.getNetworkRoute(newPool.Cidr, "") + if err != nil { + h.log.Errorf("Error getting route from ipam update: %v", err) + return err + } + for _, route := range routes { + err = h.routeWatcher.AddRoute(route) + if err != nil { + h.log.Errorf("Cannot add pool route %s through vpp tap: %v", newPool.Cidr, err) + return err + } + } + } + return nil +} + +func (h *RouteHandler) getNetworkRoute(network string, physicalNet string) (route []*netlink.Route, err error) { + _, cidr, err := net.ParseCIDR(network) + if err != nil { + return nil, errors.Wrapf(err, "error parsing %s", network) + } + var routes []*netlink.Route + var order int + for _, uplinkStatus := range common.VppManagerInfo.UplinkStatuses { + if uplinkStatus.PhysicalNetworkName == physicalNet { + gw := uplinkStatus.FakeNextHopIP4 + if cidr.IP.To4() == nil { + gw = uplinkStatus.FakeNextHopIP6 + } + var priority int + if uplinkStatus.IsMain { + priority = 0 + } else { + order += 1 + priority = order + } + routes = append(routes, &netlink.Route{ + Dst: cidr, + Gw: gw, + Protocol: syscall.RTPROT_STATIC, + MTU: watchers.GetUplinkMtu(), + Priority: priority, + }) + } + } + return routes, nil +} diff --git a/calico-vpp-agent/watchers/net_watcher.go b/calico-vpp-agent/watchers/net_watcher.go index d185d20d..4091b426 100644 --- a/calico-vpp-agent/watchers/net_watcher.go +++ b/calico-vpp-agent/watchers/net_watcher.go @@ -24,6 +24,7 @@ import ( netv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" "github.com/pkg/errors" + "github.com/projectcalico/calico/felix/proto" "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" "k8s.io/apimachinery/pkg/runtime" @@ -36,6 +37,13 @@ import ( "github.com/projectcalico/vpp-dataplane/v3/vpplink" ) +// RouteHandler defines the interface for handling network and IPAM events +type RouteHandler interface { + OnNetDeleted(netDef *common.NetworkDefinition) error + OnNetAddedOrUpdated(netDef *common.NetworkDefinition) error + OnIpamConfChanged(oldPool, newPool *proto.IPAMPool) error +} + type NetWatcher struct { log *logrus.Entry vpp *vpplink.VppLink @@ -45,6 +53,7 @@ type NetWatcher struct { nads map[string]string InSync chan interface{} nodeBGPSpec *common.LocalNodeSpec + routeHandler RouteHandler currentWatchRevisionNet string currentWatchRevisionNad string @@ -65,10 +74,15 @@ func NewNetWatcher(vpp *vpplink.VppLink, log *logrus.Entry) *NetWatcher { networkDefinitions: make(map[string]*common.NetworkDefinition), nads: make(map[string]string), InSync: make(chan interface{}), + routeHandler: nil, } return &w } +func (w *NetWatcher) SetRouteHandler(routeHandler RouteHandler) { + w.routeHandler = routeHandler +} + func (w *NetWatcher) SetOurBGPSpec(nodeBGPSpec *common.LocalNodeSpec) { w.nodeBGPSpec = nodeBGPSpec } @@ -106,16 +120,13 @@ func (w *NetWatcher) resyncAndCreateWatchers() error { return errors.Wrapf(err, "Listing NetworkAttachmentDefinitions failed") } for _, nad := range nadList.Items { - err = w.onNadAdded(&nad) + err = w.OnNadAdded(&nad) if err != nil { return errors.Wrapf(err, "OnNadAdded failed for %v", nad) } } } w.InSync <- 1 - common.SendEvent(common.CalicoVppEvent{ - Type: common.NetsSynced, - }) w.currentWatchRevisionNet = netList.ResourceVersion w.currentWatchRevisionNad = nadList.ResourceVersion } @@ -200,7 +211,7 @@ func (w *NetWatcher) WatchNetworks(t *tomb.Tomb) error { w.log.Errorf("update.Object is not *NetworkAttachmentDefinition, %v", update.Object) continue } - err := w.onNadAdded(nad) + err := w.OnNadAdded(nad) if err != nil { w.log.Error(err) } @@ -210,7 +221,7 @@ func (w *NetWatcher) WatchNetworks(t *tomb.Tomb) error { w.log.Errorf("update.Object is not *NetworkAttachmentDefinition, %v", update.Object) continue } - err := w.onNadDeleted(nad) + err := w.OnNadDeleted(nad) if err != nil { w.log.Error(err) } @@ -230,7 +241,7 @@ func (w *NetWatcher) Stop() { close(w.stop) } -func (w *NetWatcher) onNadDeleted(nad *netv1.NetworkAttachmentDefinition) error { +func (w *NetWatcher) OnNadDeleted(nad *netv1.NetworkAttachmentDefinition) error { delete(w.nads, nad.Namespace+"/"+nad.Name) for key, net := range w.networkDefinitions { if net.NetAttachDefs == nad.Namespace+"/"+nad.Name { @@ -239,12 +250,18 @@ func (w *NetWatcher) onNadDeleted(nad *netv1.NetworkAttachmentDefinition) error Type: common.NetAddedOrUpdated, New: w.networkDefinitions[key], }) + if w.routeHandler != nil { + err := w.routeHandler.OnNetAddedOrUpdated(w.networkDefinitions[key]) + if err != nil { + w.log.Errorf("Failed to handle network update in RouteHandler: %v", err) + } + } } } return nil } -func (w *NetWatcher) onNadAdded(nad *netv1.NetworkAttachmentDefinition) error { +func (w *NetWatcher) OnNadAdded(nad *netv1.NetworkAttachmentDefinition) error { var nadConfig nadv1.NetConfList err := json.Unmarshal([]byte(nad.Spec.Config), &nadConfig) if err != nil { @@ -260,6 +277,12 @@ func (w *NetWatcher) onNadAdded(nad *netv1.NetworkAttachmentDefinition) error { Type: common.NetAddedOrUpdated, New: w.networkDefinitions[key], }) + if w.routeHandler != nil { + err := w.routeHandler.OnNetAddedOrUpdated(w.networkDefinitions[key]) + if err != nil { + w.log.Errorf("Failed to handle network update in RouteHandler: %v", err) + } + } } } } @@ -283,6 +306,12 @@ func (w *NetWatcher) OnNetAdded(net *networkv3.Network) error { Type: common.NetAddedOrUpdated, New: netDef, }) + if w.routeHandler != nil { + err := w.routeHandler.OnNetAddedOrUpdated(netDef) + if err != nil { + w.log.Errorf("Failed to handle network addition in RouteHandler: %v", err) + } + } return nil } @@ -300,6 +329,12 @@ func (w *NetWatcher) OnNetDeleted(netName string) error { Type: common.NetDeleted, Old: netDef, }) + if w.routeHandler != nil { + err := w.routeHandler.OnNetDeleted(netDef) + if err != nil { + w.log.Errorf("Failed to handle network deletion in RouteHandler: %v", err) + } + } return nil } diff --git a/calico-vpp-agent/watchers/uplink_route_watcher.go b/calico-vpp-agent/watchers/uplink_route_watcher.go index 048642fd..8d6d4155 100644 --- a/calico-vpp-agent/watchers/uplink_route_watcher.go +++ b/calico-vpp-agent/watchers/uplink_route_watcher.go @@ -16,14 +16,10 @@ package watchers import ( - "net" "sync" "syscall" "time" - "github.com/pkg/errors" - "github.com/projectcalico/calico/felix/proto" - "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common" "github.com/projectcalico/vpp-dataplane/v3/config" "github.com/projectcalico/vpp-dataplane/v3/vpplink" @@ -41,21 +37,13 @@ type RouteWatcher struct { addrNetlinkFailed chan struct{} addrUpdate chan struct{} closeLock sync.Mutex - eventChan chan any log *log.Entry } func NewRouteWatcher(log *log.Entry) *RouteWatcher { routeWatcher := &RouteWatcher{ - eventChan: make(chan any, common.ChanSize), - log: log, + log: log, } - reg := common.RegisterHandler(routeWatcher.eventChan, "route watcher events") - reg.ExpectEvents( - common.IpamConfChanged, - common.NetAddedOrUpdated, - common.NetDeleted, - ) return routeWatcher } @@ -142,38 +130,6 @@ func GetUplinkMtu() int { return hostMtu - encapSize } -func (r *RouteWatcher) getNetworkRoute(network string, physicalNet string) (route []*netlink.Route, err error) { - _, cidr, err := net.ParseCIDR(network) - if err != nil { - return nil, errors.Wrapf(err, "error parsing %s", network) - } - var routes []*netlink.Route - var order int - for _, uplinkStatus := range common.VppManagerInfo.UplinkStatuses { - if uplinkStatus.PhysicalNetworkName == physicalNet { - gw := uplinkStatus.FakeNextHopIP4 - if cidr.IP.To4() == nil { - gw = uplinkStatus.FakeNextHopIP6 - } - var priority int - if uplinkStatus.IsMain { - priority = 0 - } else { - order += 1 - priority = order - } - routes = append(routes, &netlink.Route{ - Dst: cidr, - Gw: gw, - Protocol: syscall.RTPROT_STATIC, - MTU: GetUplinkMtu(), - Priority: priority, - }) - } - } - return routes, nil -} - func (r *RouteWatcher) WatchRoutes(t *tomb.Tomb) error { r.netlinkFailed = make(chan struct{}, 1) r.addrUpdate = make(chan struct{}, 10) @@ -237,90 +193,6 @@ func (r *RouteWatcher) WatchRoutes(t *tomb.Tomb) error { } r.log.Warn("Route watcher stopped") return nil - case msg := <-r.eventChan: - event, ok := msg.(common.CalicoVppEvent) - if !ok { - continue - } - switch event.Type { - case common.NetDeleted: - netDef, ok := event.Old.(*common.NetworkDefinition) - if !ok { - r.log.Errorf("event.Old is not a (*common.NetworkDefinition) %v", event.Old) - goto restart - } - key := netDef.Range - routes, err := r.getNetworkRoute(key, netDef.PhysicalNetworkName) - if err != nil { - r.log.Error("Error getting route from ipam update:", err) - goto restart - } - for _, route := range routes { - err = r.DelRoute(route) - if err != nil { - r.log.Errorf("Cannot add pool route %s through vpp tap: %v", key, err) - goto restart - } - } - case common.NetAddedOrUpdated: - netDef, ok := event.New.(*common.NetworkDefinition) - if !ok { - r.log.Errorf("event.New is not a (*common.NetworkDefinition) %v", event.New) - goto restart - } - key := netDef.Range - routes, err := r.getNetworkRoute(key, netDef.PhysicalNetworkName) - if err != nil { - r.log.Error("Error getting route from ipam update:", err) - goto restart - } - for _, route := range routes { - err = r.AddRoute(route) - if err != nil { - r.log.Errorf("Cannot add pool route %s through vpp tap: %v", key, err) - goto restart - } - } - case common.IpamConfChanged: - r.log.Debugf("Received IPAM config update in route watcher old:%+v new:%+v", event.Old, event.New) - if event.New == nil && event.Old != nil { - old, ok := event.Old.(*proto.IPAMPool) - if !ok { - r.log.Errorf("event.Old is not a (*proto.IPAMPool) %v", event.Old) - goto restart - } - routes, err := r.getNetworkRoute(old.Cidr, "") - if err != nil { - r.log.Error("Error getting route from ipam update:", err) - goto restart - } - for _, route := range routes { - err = r.DelRoute(route) - if err != nil { - r.log.Errorf("Cannot delete pool route %s through vpp tap: %v", old.Cidr, err) - goto restart - } - } - } else if event.New != nil { - new, ok := event.New.(*proto.IPAMPool) - if !ok { - r.log.Errorf("event.New is not a (*proto.IPAMPool) %v", event.New) - goto restart - } - routes, err := r.getNetworkRoute(new.Cidr, "") - if err != nil { - r.log.Error("Error getting route from ipam update:", err) - goto restart - } - for _, route := range routes { - err = r.AddRoute(route) - if err != nil { - r.log.Errorf("Cannot add pool route %s through vpp tap: %v", new.Cidr, err) - goto restart - } - } - } - } case <-r.netlinkFailed: goto restart case update, ok := <-netlinkUpdates: