Skip to content

Commit 0e1b83a

Browse files
committed
use podList instead of GetActivePods
Signed-off-by: Kfir Toledo <[email protected]>
1 parent c94f1f6 commit 0e1b83a

File tree

5 files changed

+22
-31
lines changed

5 files changed

+22
-31
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Dat
325325
}
326326

327327
r.registerInTreePlugins()
328-
handle := plugins.NewEppHandle(ctx, ds.GetActivePods)
328+
handle := plugins.NewEppHandle(ctx, ds.PodList)
329329
config, err := loader.LoadConfig(configBytes, handle, logger)
330330

331331
if err != nil {

pkg/epp/datastore/datastore.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ type Datastore interface {
6363
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
6464
PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool
6565
PodDelete(namespacedName types.NamespacedName)
66-
GetActivePods() []types.NamespacedName
6766

6867
// Clears the store state, happens when the pool gets deleted.
6968
Clear()
@@ -233,16 +232,6 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
233232
return ok
234233
}
235234

236-
// GetActivePods returns a list of all active pods.
237-
func (ds *datastore) GetActivePods() []types.NamespacedName {
238-
var namespacedNames []types.NamespacedName
239-
ds.pods.Range(func(k, _ any) bool {
240-
namespacedNames = append(namespacedNames, k.(types.NamespacedName))
241-
return true
242-
})
243-
return namespacedNames
244-
}
245-
246235
func (ds *datastore) PodDelete(namespacedName types.NamespacedName) {
247236
v, ok := ds.pods.LoadAndDelete(namespacedName)
248237
if ok {

pkg/epp/plugins/handle.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222

23-
"k8s.io/apimachinery/pkg/types"
23+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2424
)
2525

2626
// Handle provides plugins a set of standard data and tools to work with
@@ -30,8 +30,8 @@ type Handle interface {
3030

3131
HandlePlugins
3232

33-
// GetActivePods returns a list of all active pods
34-
GetActivePods() []types.NamespacedName
33+
// PodList lists pods matching the given predicate.
34+
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
3535
}
3636

3737
// HandlePlugins defines a set of APIs to work with instantiated plugins
@@ -49,14 +49,14 @@ type HandlePlugins interface {
4949
GetAllPluginsWithNames() map[string]Plugin
5050
}
5151

52-
// GetActivePodsFunc is a function that returns a list of all active pods.
53-
type GetActivePodsFunc func() []types.NamespacedName
52+
// PodListFunc is a function type that filters and returns a list of pod metrics
53+
type PodListFunc func(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
5454

5555
// eppHandle is an implementation of the interface plugins.Handle
5656
type eppHandle struct {
5757
ctx context.Context
5858
HandlePlugins
59-
getActivePods GetActivePodsFunc
59+
podList PodListFunc
6060
}
6161

6262
// Context returns a context the plugins can use, if they need one
@@ -93,17 +93,18 @@ func (h *eppHandlePlugins) GetAllPluginsWithNames() map[string]Plugin {
9393
return h.plugins
9494
}
9595

96-
// GetActivePods returns a function that returns a list of all active pods
97-
func (h *eppHandle) GetActivePods() []types.NamespacedName {
98-
return h.getActivePods()
96+
// PodList lists pods matching the given predicate.
97+
func (h *eppHandle) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
98+
return h.podList(predicate)
9999
}
100100

101-
func NewEppHandle(ctx context.Context, getActivePods GetActivePodsFunc) Handle {
101+
func NewEppHandle(ctx context.Context, podList PodListFunc) Handle {
102102
return &eppHandle{
103103
ctx: ctx,
104104
HandlePlugins: &eppHandlePlugins{
105105
plugins: map[string]Plugin{},
106106
},
107+
podList: podList,
107108
}
108109
}
109110

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
k8stypes "k8s.io/apimachinery/pkg/types"
2828
"sigs.k8s.io/controller-runtime/pkg/log"
2929

30+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3031
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
3132
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
3233
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
@@ -148,7 +149,7 @@ func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, handle
148149
}
149150

150151
p := New(handle.Context(), parameters).WithName(name)
151-
go p.StartPodActiveWatcher(handle.Context(), handle)
152+
go p.CleanUpInactivePods(handle.Context(), handle)
152153
return p, nil
153154
}
154155

@@ -254,8 +255,8 @@ func (p *Plugin) matchLongestPrefix(ctx context.Context, hashes []BlockHash) map
254255
return res
255256
}
256257

257-
// StartPodActiveWatcher starts a goroutine that watches for active pods.
258-
func (m *Plugin) StartPodActiveWatcher(ctx context.Context, handle plugins.Handle) {
258+
// CleanUpInactivePods starts a goroutine that watches for inactive pods.
259+
func (m *Plugin) CleanUpInactivePods(ctx context.Context, handle plugins.Handle) {
259260
logger := log.FromContext(ctx).V(logutil.VERBOSE)
260261

261262
ticker := time.NewTicker(PodActiveCheckInterval)
@@ -269,12 +270,13 @@ func (m *Plugin) StartPodActiveWatcher(ctx context.Context, handle plugins.Handl
269270
return
270271
case <-ticker.C:
271272
now := time.Now()
272-
activePods := handle.GetActivePods()
273+
274+
activePods := handle.PodList(func(_ backendmetrics.PodMetrics) bool { return true })
273275

274276
// Track active pods
275277
activeSet := make(map[ServerID]struct{}, len(activePods))
276278
for _, np := range activePods {
277-
id := ServerID(np)
279+
id := ServerID(np.GetPod().NamespacedName)
278280
activeSet[id] = struct{}{}
279281
podLastSeen[id] = now
280282
}

test/utils/handle.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ package utils
1919
import (
2020
"context"
2121

22-
k8stypes "k8s.io/apimachinery/pkg/types"
23-
22+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2423
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2524
)
2625

@@ -35,8 +34,8 @@ func (h *testHandle) Context() context.Context {
3534
return h.ctx
3635
}
3736

38-
func (h *testHandle) GetActivePods() []k8stypes.NamespacedName {
39-
return []k8stypes.NamespacedName{}
37+
func (h *testHandle) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
38+
return []backendmetrics.PodMetrics{}
4039
}
4140

4241
type testHandlePlugins struct {

0 commit comments

Comments
 (0)