From 53580b00239656a550745c81e1f5b9b73005ea78 Mon Sep 17 00:00:00 2001
From: Imran Pochi
Date: Mon, 27 Oct 2025 23:30:57 +0000
Subject: [PATCH 1/2] implement server agent draining

Signed-off-by: Imran Pochi
---
 pkg/server/backend_manager.go          | 44 +++++++++++++++++++++++---
 pkg/server/desthost_backend_manager.go | 10 ++++--
 pkg/server/server.go                   |  2 ++
 3 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/pkg/server/backend_manager.go b/pkg/server/backend_manager.go
index 97b5d8c7d..78cad9e84 100644
--- a/pkg/server/backend_manager.go
+++ b/pkg/server/backend_manager.go
@@ -49,6 +49,25 @@ type Backend struct {
 	// cached from conn.Context()
 	id     string
 	idents header.Identifiers
+
+	// draining indicates if this backend is draining and should not accept new connections
+	draining bool
+	// mu protects draining field
+	mu sync.RWMutex
+}
+
+// IsDraining returns true if the backend is draining
+func (b *Backend) IsDraining() bool {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+	return b.draining
+}
+
+// SetDraining marks the backend as draining
+func (b *Backend) SetDraining() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.draining = true
 }
 
 func (b *Backend) Send(p *client.Packet) error {
@@ -346,9 +365,24 @@ func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) {
 	if len(s.backends) == 0 {
 		return nil, &ErrNotFound{}
 	}
-	agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))]
-	klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
-	// always return the first connection to an agent, because the agent
-	// will close later connections if there are multiple.
-	return s.backends[agentID][0], nil
+
+	// Start at a random agent and check each agent in sequence
+	startIdx := s.random.Intn(len(s.agentIDs))
+	for i := 0; i < len(s.agentIDs); i++ {
+		// Wrap around using modulo
+		currentIdx := (startIdx + i) % len(s.agentIDs)
+		agentID := s.agentIDs[currentIdx]
+		// always return the first connection to an agent, because the agent
+		// will close later connections if there are multiple.
+		backend := s.backends[agentID][0]
+
+		if !backend.IsDraining() {
+			klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
+			return backend, nil
+		}
+	}
+
+	// All agents are draining
+	klog.V(2).InfoS("No non-draining backends available")
+	return nil, &ErrNotFound{}
 }
diff --git a/pkg/server/desthost_backend_manager.go b/pkg/server/desthost_backend_manager.go
index 280065775..4ae5bc270 100644
--- a/pkg/server/desthost_backend_manager.go
+++ b/pkg/server/desthost_backend_manager.go
@@ -79,8 +79,14 @@ func (dibm *DestHostBackendManager) Backend(ctx context.Context) (*Backend, erro
 	if destHost != "" {
 		bes, exist := dibm.backends[destHost]
 		if exist && len(bes) > 0 {
-			klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
-			return dibm.backends[destHost][0], nil
+			// Find a non-draining backend for this destination host
+			for _, backend := range bes {
+				if !backend.IsDraining() {
+					klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
+					return backend, nil
+				}
+			}
+			klog.V(4).InfoS("All backends for destination host are draining", "destHost", destHost)
 		}
 	}
 	return nil, &ErrNotFound{}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index ffec433c3..b3ef519df 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -999,6 +999,8 @@ func (s *ProxyServer) serveRecvBackend(backend *Backend, agentID string, recvCh
 
 		case client.PacketType_DRAIN:
 			klog.V(2).InfoS("agent is draining", "agentID", agentID)
+			backend.SetDraining()
+			klog.V(2).InfoS("marked backend as draining, will not route new requests to this agent", "agentID", agentID)
 		default:
 			klog.V(5).InfoS("Ignoring unrecognized packet from backend", "packet", pkt, "agentID", agentID)
 		}

From 26bd0645a556853fb157f17654881df931e40965 Mon Sep 17 00:00:00 2001
From: Imran Pochi
Date: Tue, 11 Nov 2025 23:47:03 +0000
Subject: [PATCH 2/2] fallback and agent side stop sync

This commit adds a fallback for the case where all the agents in the
system are draining. Rather than drop the request with an error, we
fall back to the existing behavior, i.e. we continue to send the
request to an agent even if it is draining.

As for the agent side, once the agent has sent the DRAIN signal to
the server it should stop running syncOnce against the server;
otherwise the server mistakes the agent for being ready again.

Signed-off-by: Imran Pochi
---
 pkg/agent/clientset.go                 |  8 ++++++++
 pkg/server/backend_manager.go          | 16 ++++++++++++++--
 pkg/server/desthost_backend_manager.go | 13 ++++++++++++-
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go
index db44b5e4a..d961ae5e1 100644
--- a/pkg/agent/clientset.go
+++ b/pkg/agent/clientset.go
@@ -255,6 +255,14 @@ func (cs *ClientSet) sync() {
 }
 
 func (cs *ClientSet) connectOnce() error {
+	// Skip establishing new connections if draining
+	select {
+	case <-cs.drainCh:
+		klog.V(2).InfoS("Skipping connectOnce - agent is draining")
+		return nil
+	default:
+	}
+
 	serverCount := cs.determineServerCount()
 	// If not in syncForever mode, we only connect if we have fewer connections than the server count.
diff --git a/pkg/server/backend_manager.go b/pkg/server/backend_manager.go
index 78cad9e84..0ab55c285 100644
--- a/pkg/server/backend_manager.go
+++ b/pkg/server/backend_manager.go
@@ -366,6 +366,8 @@ func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) {
 		return nil, &ErrNotFound{}
 	}
 
+	var firstDrainingBackend *Backend
+
 	// Start at a random agent and check each agent in sequence
 	startIdx := s.random.Intn(len(s.agentIDs))
 	for i := 0; i < len(s.agentIDs); i++ {
@@ -380,9 +382,19 @@ func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) {
 			klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
 			return backend, nil
 		}
+
+		// Keep track of first draining backend as fallback
+		if firstDrainingBackend == nil {
+			firstDrainingBackend = backend
+		}
+	}
+
+	// All agents are draining, use one as fallback
+	if firstDrainingBackend != nil {
+		agentID := firstDrainingBackend.id
+		klog.V(2).InfoS("No non-draining backends available, using draining backend as fallback", "agentID", agentID)
+		return firstDrainingBackend, nil
 	}
 
-	// All agents are draining
-	klog.V(2).InfoS("No non-draining backends available")
 	return nil, &ErrNotFound{}
 }
diff --git a/pkg/server/desthost_backend_manager.go b/pkg/server/desthost_backend_manager.go
index 4ae5bc270..d2a3e0f14 100644
--- a/pkg/server/desthost_backend_manager.go
+++ b/pkg/server/desthost_backend_manager.go
@@ -79,14 +79,25 @@ func (dibm *DestHostBackendManager) Backend(ctx context.Context) (*Backend, erro
 	if destHost != "" {
 		bes, exist := dibm.backends[destHost]
 		if exist && len(bes) > 0 {
+			var firstDrainingBackend *Backend
+
 			// Find a non-draining backend for this destination host
 			for _, backend := range bes {
 				if !backend.IsDraining() {
 					klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
 					return backend, nil
 				}
+				// Keep track of first draining backend as fallback
+				if firstDrainingBackend == nil {
+					firstDrainingBackend = backend
+				}
+			}
+
+			// All backends for this destination are draining, use one as fallback
+			if firstDrainingBackend != nil {
+				klog.V(4).InfoS("All backends for destination host are draining, using one as fallback", "destHost", destHost)
+				return firstDrainingBackend, nil
 			}
-			klog.V(4).InfoS("All backends for destination host are draining", "destHost", destHost)
 		}
 	}
 	return nil, &ErrNotFound{}
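
The selection policy the two patches build up in GetRandomBackend amounts to: start at a random agent, walk every agent once, take the first non-draining backend, and fall back to the first draining backend seen rather than failing the request. The following is a minimal standalone sketch of that policy; the backend struct is simplified and pickBackend is a hypothetical helper written for illustration, not a function from this series.

package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// backend is a simplified stand-in for server.Backend, keeping only
// the fields needed to illustrate the selection policy.
type backend struct {
	id       string
	draining bool
}

// pickBackend mirrors the patched GetRandomBackend: start at a random
// index, check each backend once (wrapping around with modulo), return
// the first non-draining one, and fall back to the first draining
// backend seen if every agent is draining.
func pickBackend(backends []*backend, rng *rand.Rand) (*backend, error) {
	if len(backends) == 0 {
		return nil, errors.New("no backends available")
	}

	var firstDraining *backend
	start := rng.Intn(len(backends))
	for i := 0; i < len(backends); i++ {
		b := backends[(start+i)%len(backends)]
		if !b.draining {
			return b, nil
		}
		if firstDraining == nil {
			firstDraining = b
		}
	}
	// All agents are draining; better to route to one than drop the request.
	return firstDraining, nil
}

func main() {
	rng := rand.New(rand.NewSource(1))
	backends := []*backend{
		{id: "agent-a", draining: true},
		{id: "agent-b", draining: false},
		{id: "agent-c", draining: true},
	}
	b, err := pickBackend(backends, rng)
	fmt.Println(b.id, err) // always prints agent-b: the only non-draining agent
}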
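
The connectOnce guard in clientset.go uses the standard Go idiom of a non-blocking receive from a channel that is closed to signal shutdown. A small self-contained sketch of the idiom, assuming drainCh is a chan struct{} closed when draining starts (the drainer type here is invented for illustration; only the drainCh field name comes from the patch):

package main

import "fmt"

// drainer is a toy stand-in for agent.ClientSet, carrying only the
// drain channel consulted by the guard in connectOnce.
type drainer struct {
	drainCh chan struct{} // closed once the agent starts draining
}

// connectOnce skips new connections when draining: a receive from a
// closed channel succeeds immediately, while the default case keeps
// the check non-blocking when the channel is still open.
func (d *drainer) connectOnce() error {
	select {
	case <-d.drainCh:
		fmt.Println("skipping connectOnce - agent is draining")
		return nil
	default:
	}
	fmt.Println("dialing proxy server...")
	return nil
}

func main() {
	d := &drainer{drainCh: make(chan struct{})}
	d.connectOnce() // dials: drainCh is still open

	close(d.drainCh) // agent begins draining
	d.connectOnce()  // skipped: the receive on the closed channel wins
}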