Skip to content

Conversation

aritrbas
Copy link
Collaborator

@aritrbas aritrbas commented Sep 11, 2025

This PR refactors the BGP peer management architecture to (1) eliminate the pubsub/event channel usage from PeerWatcher and SecretWatcher and (2) achieve a clear separation of the watching logic from the handling logic.

Earlier,

  • PeerWatcher contained both watching logic and business logic (selectPeers() and state reconciliation).
  • PeerHandler had an event loop, making it a "server" instead of a pure "handler".
  • SecretWatcher sent events through pubsub instead of direct calls.
  • PeerHandler maintained its own BGPConf (duplicate state) instead of using the shared Felix cache.

Now,

  • All pubsub/event channel usage have been removed from PeerWatcher. PeerWatcher now directly calls handler.ProcessPeers(peers).
  • PeerWatcher only watches Calico BGPPeer resources, all business logic (selectPeers() and state reconciliation) have been moved to PeerHandler.
  • SecretWatcher no longer sends events (no pubsub), instead it directly calls registered handlers.
  • PeerHandler has pure business logic now, all event looping and callback registrations have been removed. PeerHandler now exposes methods that are called directly by RoutingServer's event loop.
  • RoutingServer's event loop handles PeerNodeStateChanged events. BGPWatcher event loop receives the event and calls peerHandler.OnPeerNodeStateChanged(old, new).
  • PeerWatcherSecretWatcher relationship has been preserved. PeerWatcher still manages secret sweep for referenced secrets and calls secretWatcher.SweepStale(activeSecrets) after processing peers.

Architecture Changes

Before (Event-Based):

SecretWatcher ──BGPSecretChanged event──> routingServerEventChan
                                              │
                                              ├──> PeerWatcher (has state, event loop)
                                              │    ├── ProcessPeers(peers, state, secrets)
                                              │    └── selectPeers() logic
                                              │
                                              └──> BGPWatcher

After (Direct Calls, No Event Loop in Handler):

SecretWatcher ──RegisterHandler──> PeerWatcher ──calls──> PeerHandler (owns state, business logic)
      │                                                        │
      │                                                        ├── ProcessPeers(peers)
      │                                                        ├── selectPeers() logic
      └────────────GetSecret()──────────────────────────────>  └── getSecretValue()

FelixServer:
  - Initializes PeerHandler in NewFelixServer()
  - PeerHandler uses shared cache (cache.BGPConf)

PeerWatcher:
  - Watches peers from Calico
  - Sends peer list to Handler
  - Manages secret watcher sweep

PeerHandler:
  - Owns all state and business logic
  - Gets secrets directly when needed via SecretGetter interface

RoutingServer (bgp_watcher.go):
  - Has event loop (WatchBGPPath)
  - Handles PeerNodeStateChanged by calling PeerHandler.OnPeerNodeStateChanged()

Made the changes on top of the nsk-split-svc branch for now, will rebase on top of master once those commits for single thread agent are merged.

@aritrbas aritrbas force-pushed the abasu-peers-watcher-rem-pubsub branch from ffb8c29 to eb5adeb Compare October 10, 2025 00:20
Copy link
Collaborator

@sknat sknat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many thanks for taking a stab at this !
A couple comments inline, I think there is still room for more untangling of the logic.
Tell me if anything sounds unclear

w := &PeerWatcher{
log: log,
clientv3: clientv3,
handler: handler,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of passing a handler here, could we emit events ?
i.e.

felixServer.GetFelixServerEventChan() <- &common.ProcessPeers{peers.Items}

and in the felixServer main loop explicitly calling handlers

func (s *Server) handleFelixServerEvents(msg interface{}) (err error) {
...
case *common.ProcessPeers:
		s.peerHandler.ProcessPeers(evt)

}
return nil
// Send the peer list to the handler for processing
return w.handler.ProcessPeers(peers.Items)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would then be an event

// OnSecretChanged handles secret change events and triggers peer reconciliation
func (w *PeerWatcher) OnSecretChanged(secretName string) {
w.log.Infof("Secret '%s' changed, notifying handler", secretName)
w.handler.OnSecretChanged(secretName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing here, we could make OnSecretChanged be an event,
that way we wouldn't have to keep a reference to the handler here

w.log.Infof("Secret '%s' changed, notifying handler", secretName)
w.handler.OnSecretChanged(secretName)
// Trigger a resync to apply the new secret
err := w.resyncPeers()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this shouldn't be needed anymore :
Before it was more convenient to call resyncPeers() as this was doing both scanning, and constituting the local cache of Peers+Secrets.
But now that the reconstitution is done in the handlers, we can do everything there, and maybe call the same code in both cases (peerChanged & secretChanged)

func (sw *secretWatcher) OnAdd(obj interface{}, isInInitialList bool) {
func (sw *SecretWatcher) notifySecretChanged(secretName string) {
for _, handler := range sw.handlers {
handler.OnSecretChanged(secretName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we rely on the fact that the peerWatcher does not need to be informed when the secret changes, and we make a common.OnSecretChanged towards the felix server, we could remove the need to have keep a sw.handler reference.

We could simply

eventChan <- &common.OnSecretChanged{...}


nodeStatesByName map[string]common.LocalNodeSpec

secretWatcher watchers.SecretGetter
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably remove the reference to the secretWatcher by adding a secret store in the peerHandler. We could create events emitted by the secretWatcher

common.SecretAdded{...}
common.SecretDelete{...}

that would be received in the felixServer event loop,
call felix.peerHandler.OnSecret{Added,Deleted}(evt)
which would update a local cache map.

Below h.secretWatcher.GetSecret(secretName, secretKey) just becomes a lookup in the map in question

log: log,
clientv3: clientv3,
handler: handler,
secretWatcher: secretWatcher,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for simplicity, if we remove the need for handler's registration by introducing events
we could instantiate the secretWatcher directly here
i.e.

secretWatcher: watchers.NewSecretWatcher(k8sclient, log.WithFields(logrus.Fields{"component": "secret-watcher"}))

Comment on lines +176 to +183
activeSecrets := make(map[string]struct{})
for _, peer := range peers.Items {
if peer.Spec.Password != nil && peer.Spec.Password.SecretKeyRef != nil {
secretName := peer.Spec.Password.SecretKeyRef.Name
activeSecrets[secretName] = struct{}{}
}
}
return true
}
w.secretWatcher.SweepStale(activeSecrets)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make this w.secretWatcher.OnPeerListUpdated(peers.Items) and have the sweep logic in the secret watcher ?

}

func (sw *secretWatcher) GetSecret(name, key string) (string, error) {
func (sw *SecretWatcher) GetSecret(name, key string) (string, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following on the comment above, we could directly start watching for secrets when OnPeerListUpdated() is called with a new Peer that has secrets attached.

And emit events when we receive the secret's contents.

w.log.Warn("Unrecognized secret change event received. Ignoring...")
}
// Re-sync peers when watch events occur
err = w.resyncPeers()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we have an isolated watcher, we could implement the watch logic :

  • we would cache the list of peers locally on the peerWatcher
  • when update/delete/create happens, we update the list, and send a copy as an event over the eventChan

We only resort to listing again if an error happens

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further simplification would be to send separated events for ADD/DEL/UPD over the eventchan, but this will require changing the handler's logic which might be beyond the scope of this PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants