Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions pkg/server/backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@ type Backend struct {
// cached from conn.Context()
id string
idents header.Identifiers

// draining indicates if this backend is draining and should not accept new connections
draining bool
// mu protects draining field
mu sync.RWMutex
}

// IsDraining returns true if the backend is draining
func (b *Backend) IsDraining() bool {
b.mu.RLock()
defer b.mu.RUnlock()
return b.draining
}

// SetDraining marks the backend as draining
func (b *Backend) SetDraining() {
b.mu.Lock()
defer b.mu.Unlock()
b.draining = true
}

func (b *Backend) Send(p *client.Packet) error {
Expand Down Expand Up @@ -346,9 +365,24 @@ func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) {
if len(s.backends) == 0 {
return nil, &ErrNotFound{}
}
agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))]
klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
// always return the first connection to an agent, because the agent
// will close later connections if there are multiple.
return s.backends[agentID][0], nil

// Start at a random agent and check each agent in sequence
startIdx := s.random.Intn(len(s.agentIDs))
for i := 0; i < len(s.agentIDs); i++ {
// Wrap around using modulo
currentIdx := (startIdx + i) % len(s.agentIDs)
agentID := s.agentIDs[currentIdx]
// always return the first connection to an agent, because the agent
// will close later connections if there are multiple.
backend := s.backends[agentID][0]

if !backend.IsDraining() {
klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
return backend, nil
}
}

// All agents are draining
klog.V(2).InfoS("No non-draining backends available")
return nil, &ErrNotFound{}
}
10 changes: 8 additions & 2 deletions pkg/server/desthost_backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,14 @@ func (dibm *DestHostBackendManager) Backend(ctx context.Context) (*Backend, erro
if destHost != "" {
bes, exist := dibm.backends[destHost]
if exist && len(bes) > 0 {
klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
return dibm.backends[destHost][0], nil
// Find a non-draining backend for this destination host
for _, backend := range bes {
if !backend.IsDraining() {
klog.V(5).InfoS("Get the backend through the DestHostBackendManager", "destHost", destHost)
return backend, nil
}
}
klog.V(4).InfoS("All backends for destination host are draining", "destHost", destHost)
}
}
return nil, &ErrNotFound{}
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,8 @@ func (s *ProxyServer) serveRecvBackend(backend *Backend, agentID string, recvCh

case client.PacketType_DRAIN:
klog.V(2).InfoS("agent is draining", "agentID", agentID)
backend.SetDraining()
klog.V(2).InfoS("marked backend as draining, will not route new requests to this agent", "agentID", agentID)
default:
klog.V(5).InfoS("Ignoring unrecognized packet from backend", "packet", pkt, "agentID", agentID)
}
Expand Down