Skip to content

Commit e12b068

Browse files
committed
Split Connectivity into watcher/handler under felix
This patch moves the Connectivity handlers in the main felix loop to allow lockless access to the cache. The intent is to move away from a model with multiple servers replicating state and communicating over a pubsub. This being prone to race conditions, deadlocks, and not providing many benefits as scale & asynchronicity will not be a constraint on nodes with relatively small number of pods (~100) as is k8s default. Signed-off-by: Nathan Skrzypczak <[email protected]>
1 parent 9ff5761 commit e12b068

File tree

21 files changed

+646
-727
lines changed

21 files changed

+646
-727
lines changed

calico-vpp-agent/cmd/calico_vpp_dataplane.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
apipb "github.com/osrg/gobgp/v3/api"
2727
bgpserver "github.com/osrg/gobgp/v3/pkg/server"
2828
"github.com/pkg/errors"
29-
felixconfig "github.com/projectcalico/calico/felix/config"
3029
calicov3cli "github.com/projectcalico/calico/libcalico-go/lib/clientv3"
3130
"github.com/sirupsen/logrus"
3231
"google.golang.org/grpc"
@@ -35,7 +34,6 @@ import (
3534
"k8s.io/client-go/rest"
3635

3736
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
38-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/connectivity"
3937
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix"
4038
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/health"
4139
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/prometheus"
@@ -160,7 +158,6 @@ func main() {
160158
if err != nil {
161159
log.Fatalf("could not install felix plugin: %s", err)
162160
}
163-
connectivityServer := connectivity.NewConnectivityServer(vpp, felixServer, clientv3, log.WithFields(logrus.Fields{"subcomponent": "connectivity"}))
164161

165162
/* Pubsub should now be registered */
166163

@@ -187,14 +184,13 @@ func main() {
187184
ticker := time.NewTicker(10 * time.Second)
188185
defer ticker.Stop()
189186

190-
var felixConfig *felixconfig.Config
191187
var ourBGPSpec *common.LocalNodeSpec
192188
felixConfigReceived := false
193189
bgpSpecReceived := false
194190

195191
for !felixConfigReceived || !bgpSpecReceived {
196192
select {
197-
case felixConfig = <-felixServer.FelixConfigChan:
193+
case <-felixServer.GotFelixConfig:
198194
felixConfigReceived = true
199195
log.Info("FelixConfig received from calico pod")
200196
case ourBGPSpec = <-felixServer.GotOurNodeBGPchan():
@@ -218,7 +214,6 @@ func main() {
218214
log.Info("Felix configuration received")
219215

220216
prefixWatcher.SetOurBGPSpec(ourBGPSpec)
221-
connectivityServer.SetOurBGPSpec(ourBGPSpec)
222217
routingServer.SetOurBGPSpec(ourBGPSpec)
223218
serviceServer.SetOurBGPSpec(ourBGPSpec)
224219
localSIDWatcher.SetOurBGPSpec(ourBGPSpec)
@@ -236,15 +231,12 @@ func main() {
236231
}
237232
}
238233

239-
connectivityServer.SetFelixConfig(felixConfig)
240-
241234
Go(routeWatcher.WatchRoutes)
242235
Go(linkWatcher.WatchLinks)
243236
Go(bgpConfigurationWatcher.WatchBGPConfiguration)
244237
Go(prefixWatcher.WatchPrefix)
245238
Go(peerWatcher.WatchBGPPeers)
246239
Go(bgpFilterWatcher.WatchBGPFilters)
247-
Go(connectivityServer.ServeConnectivity)
248240
Go(routingServer.ServeRouting)
249241
Go(serviceServer.ServeService)
250242
Go(cniServer.ServeCNI)

calico-vpp-agent/common/common.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -540,24 +540,6 @@ func FormatBGPConfiguration(conf *calicov3.BGPConfigurationSpec) string {
540540
)
541541
}
542542

543-
func FetchNDataThreads(vpp *vpplink.VppLink, log *logrus.Entry) int {
544-
nVppWorkers, err := vpp.GetNumVPPWorkers()
545-
if err != nil {
546-
log.Panicf("Error getting number of VPP workers: %v", err)
547-
}
548-
nDataThreads := nVppWorkers
549-
if config.GetCalicoVppIpsec().IpsecNbAsyncCryptoThread > 0 {
550-
nDataThreads = nVppWorkers - config.GetCalicoVppIpsec().IpsecNbAsyncCryptoThread
551-
if nDataThreads <= 0 {
552-
log.Errorf("Couldn't fulfill request [crypto=%d total=%d]", config.GetCalicoVppIpsec().IpsecNbAsyncCryptoThread, nVppWorkers)
553-
nDataThreads = nVppWorkers
554-
}
555-
log.Infof("Using ipsec workers [data=%d crypto=%d]", nDataThreads, nVppWorkers-nDataThreads)
556-
557-
}
558-
return nDataThreads
559-
}
560-
561543
func CompareIPList(newIPList, oldIPList []net.IP) (added []net.IP, deleted []net.IP, changed bool) {
562544
oldIPListMap := make(map[string]bool)
563545
newIPListMap := make(map[string]bool)

calico-vpp-agent/common/pubsub.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ const (
2727
ChanSize = 500
2828

2929
PeerNodeStateChanged CalicoVppEventType = "PeerNodeStateChanged"
30-
FelixConfChanged CalicoVppEventType = "FelixConfChanged"
3130
IpamConfChanged CalicoVppEventType = "IpamConfChanged"
3231
BGPConfChanged CalicoVppEventType = "BGPConfChanged"
3332

@@ -66,8 +65,6 @@ const (
6665

6766
IpamPoolUpdate CalicoVppEventType = "IpamPoolUpdate"
6867
IpamPoolRemove CalicoVppEventType = "IpamPoolRemove"
69-
70-
WireguardPublicKeyChanged CalicoVppEventType = "WireguardPublicKeyChanged"
7168
)
7269

7370
var (

0 commit comments

Comments
 (0)