Skip to content

Commit 574bd69

Browse files
committed
use podList instead of GetActivePods
Signed-off-by: Kfir Toledo <[email protected]>
1 parent c94f1f6 commit 574bd69

File tree

6 files changed

+41
-51
lines changed

6 files changed

+41
-51
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Dat
325325
}
326326

327327
r.registerInTreePlugins()
328-
handle := plugins.NewEppHandle(ctx, ds.GetActivePods)
328+
handle := plugins.NewEppHandle(ctx, ds.PodList)
329329
config, err := loader.LoadConfig(configBytes, handle, logger)
330330

331331
if err != nil {

pkg/epp/datastore/datastore.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ type Datastore interface {
6363
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
6464
PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool
6565
PodDelete(namespacedName types.NamespacedName)
66-
GetActivePods() []types.NamespacedName
6766

6867
// Clears the store state, happens when the pool gets deleted.
6968
Clear()
@@ -233,16 +232,6 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
233232
return ok
234233
}
235234

236-
// GetActivePods returns a list of all active pods.
237-
func (ds *datastore) GetActivePods() []types.NamespacedName {
238-
var namespacedNames []types.NamespacedName
239-
ds.pods.Range(func(k, _ any) bool {
240-
namespacedNames = append(namespacedNames, k.(types.NamespacedName))
241-
return true
242-
})
243-
return namespacedNames
244-
}
245-
246235
func (ds *datastore) PodDelete(namespacedName types.NamespacedName) {
247236
v, ok := ds.pods.LoadAndDelete(namespacedName)
248237
if ok {

pkg/epp/plugins/handle.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222

23-
"k8s.io/apimachinery/pkg/types"
23+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2424
)
2525

2626
// Handle provides plugins a set of standard data and tools to work with
@@ -30,8 +30,8 @@ type Handle interface {
3030

3131
HandlePlugins
3232

33-
// GetActivePods returns a list of all active pods
34-
GetActivePods() []types.NamespacedName
33+
// PodList lists pods matching the given predicate.
34+
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
3535
}
3636

3737
// HandlePlugins defines a set of APIs to work with instantiated plugins
@@ -49,14 +49,14 @@ type HandlePlugins interface {
4949
GetAllPluginsWithNames() map[string]Plugin
5050
}
5151

52-
// GetActivePodsFunc is a function that returns a list of all active pods.
53-
type GetActivePodsFunc func() []types.NamespacedName
52+
// PodListFunc is a function that returns the pod metrics matching the given predicate.
53+
type PodListFunc func(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
5454

5555
// eppHandle is an implementation of the interface plugins.Handle
5656
type eppHandle struct {
5757
ctx context.Context
5858
HandlePlugins
59-
getActivePods GetActivePodsFunc
59+
podList PodListFunc
6060
}
6161

6262
// Context returns a context the plugins can use, if they need one
@@ -93,17 +93,18 @@ func (h *eppHandlePlugins) GetAllPluginsWithNames() map[string]Plugin {
9393
return h.plugins
9494
}
9595

96-
// GetActivePods returns a function that returns a list of all active pods
97-
func (h *eppHandle) GetActivePods() []types.NamespacedName {
98-
return h.getActivePods()
96+
// PodList lists pods matching the given predicate.
97+
func (h *eppHandle) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
98+
return h.podList(predicate)
9999
}
100100

101-
func NewEppHandle(ctx context.Context, getActivePods GetActivePodsFunc) Handle {
101+
func NewEppHandle(ctx context.Context, podList PodListFunc) Handle {
102102
return &eppHandle{
103103
ctx: ctx,
104104
HandlePlugins: &eppHandlePlugins{
105105
plugins: map[string]Plugin{},
106106
},
107+
podList: podList,
107108
}
108109
}
109110

pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,3 +169,15 @@ func (i *indexer) RemovePod(pod ServerID) {
169169
delete(i.podToLRU, pod)
170170
i.mu.Unlock()
171171
}
172+
173+
// Pods returns the list of all pods currently tracked in the indexer.
174+
func (i *indexer) Pods() []ServerID {
175+
i.mu.RLock()
176+
defer i.mu.RUnlock()
177+
178+
pods := make([]ServerID, 0, len(i.podToLRU))
179+
for pod := range i.podToLRU {
180+
pods = append(pods, pod)
181+
}
182+
return pods
183+
}

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
k8stypes "k8s.io/apimachinery/pkg/types"
2828
"sigs.k8s.io/controller-runtime/pkg/log"
2929

30+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3031
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
3132
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
3233
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
@@ -58,8 +59,7 @@ const (
5859
)
5960

6061
const (
61-
PodActiveCheckInterval = 1 * time.Minute
62-
PodInactivityTimeout = 5 * time.Minute
62+
PodActiveCheckInterval = 2 * time.Minute
6363
)
6464

6565
var DefaultConfig = Config{
@@ -93,6 +93,7 @@ type Indexer interface {
9393
Get(hash BlockHash) podSet
9494
Add(hashes []BlockHash, server ServerID)
9595
RemovePod(server ServerID)
96+
Pods() []ServerID
9697
}
9798

9899
// BlockHash is a hash of the block of request body.
@@ -148,7 +149,7 @@ func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, handle
148149
}
149150

150151
p := New(handle.Context(), parameters).WithName(name)
151-
go p.StartPodActiveWatcher(handle.Context(), handle)
152+
go p.CleanUpInactivePods(handle.Context(), handle)
152153
return p, nil
153154
}
154155

@@ -254,39 +255,27 @@ func (p *Plugin) matchLongestPrefix(ctx context.Context, hashes []BlockHash) map
254255
return res
255256
}
256257

257-
// StartPodActiveWatcher starts a goroutine that watches for active pods.
258-
func (m *Plugin) StartPodActiveWatcher(ctx context.Context, handle plugins.Handle) {
258+
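// CleanUpInactivePods periodically removes pods from the indexer that are no longer returned by handle.PodList; it blocks until ctx is done, so callers should run it in its own goroutine.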
259+
func (m *Plugin) CleanUpInactivePods(ctx context.Context, handle plugins.Handle) {
259260
logger := log.FromContext(ctx).V(logutil.VERBOSE)
260-
261261
ticker := time.NewTicker(PodActiveCheckInterval)
262262
defer ticker.Stop()
263263

264-
podLastSeen := make(map[ServerID]time.Time)
265-
266264
for {
267265
select {
268266
case <-ctx.Done():
269267
return
270268
case <-ticker.C:
271-
now := time.Now()
272-
activePods := handle.GetActivePods()
273-
274-
// Track active pods
275-
activeSet := make(map[ServerID]struct{}, len(activePods))
276-
for _, np := range activePods {
277-
id := ServerID(np)
278-
activeSet[id] = struct{}{}
279-
podLastSeen[id] = now
269+
activePodMetrics := handle.PodList(func(_ backendmetrics.PodMetrics) bool { return true })
270+
activePods := make(map[ServerID]struct{}, len(activePodMetrics))
271+
for _, pm := range activePodMetrics {
272+
activePods[ServerID(pm.GetPod().NamespacedName)] = struct{}{}
280273
}
281274

282-
// Remove stale pods
283-
for pod, lastSeen := range podLastSeen {
284-
if _, stillActive := activeSet[pod]; !stillActive {
285-
if now.Sub(lastSeen) > PodInactivityTimeout {
286-
m.indexer.RemovePod(pod)
287-
delete(podLastSeen, pod)
288-
logger.Info("Removed inactive pod from prefix cache", "pod", pod)
289-
}
275+
for _, pod := range m.indexer.Pods() {
276+
if _, ok := activePods[pod]; !ok {
277+
m.indexer.RemovePod(pod)
278+
logger.Info("Removed pod not in active set", "pod", pod)
290279
}
291280
}
292281
}

test/utils/handle.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ package utils
1919
import (
2020
"context"
2121

22-
k8stypes "k8s.io/apimachinery/pkg/types"
23-
22+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2423
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2524
)
2625

@@ -35,8 +34,8 @@ func (h *testHandle) Context() context.Context {
3534
return h.ctx
3635
}
3736

38-
func (h *testHandle) GetActivePods() []k8stypes.NamespacedName {
39-
return []k8stypes.NamespacedName{}
37+
func (h *testHandle) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
38+
return []backendmetrics.PodMetrics{}
4039
}
4140

4241
type testHandlePlugins struct {

0 commit comments

Comments
 (0)