Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,10 @@ func (r *Runner) Run(ctx context.Context) error {
setupLog.Error(err, "Failed to extract GKNN")
return err
}
disableK8sCrdReconcile := *endpointSelector != ""
ds, err := setupDatastore(setupLog, ctx, epf, int32(*modelServerMetricsPort), disableK8sCrdReconcile, *poolName, *poolNamespace, *endpointSelector, *endpointTargetPorts)
startCrdReconcilers := *endpointSelector == "" // If endpointSelector is empty, it means it's not in the standalone mode. Then we should start the inferencePool and other CRD Reconciler.
controllerCfg := runserver.NewControllerConfig(startCrdReconcilers)

ds, err := setupDatastore(setupLog, ctx, epf, int32(*modelServerMetricsPort), startCrdReconcilers, *poolName, *poolNamespace, *endpointSelector, *endpointTargetPorts)
if err != nil {
setupLog.Error(err, "Failed to setup datastore")
return err
Expand Down Expand Up @@ -286,7 +288,7 @@ func (r *Runner) Run(ctx context.Context) error {
isLeader := &atomic.Bool{}
isLeader.Store(false)

mgr, err := runserver.NewDefaultManager(disableK8sCrdReconcile, *gknn, cfg, metricsServerOptions, *haEnableLeaderElection)
mgr, err := runserver.NewDefaultManager(controllerCfg, *gknn, cfg, metricsServerOptions, *haEnableLeaderElection)
if err != nil {
setupLog.Error(err, "Failed to create controller manager")
return err
Expand Down Expand Up @@ -367,7 +369,7 @@ func (r *Runner) Run(ctx context.Context) error {
GrpcPort: *grpcPort,
GKNN: *gknn,
Datastore: ds,
DisableK8sCrdReconcile: disableK8sCrdReconcile,
ControllerCfg: controllerCfg,
SecureServing: *secureServing,
HealthChecking: *healthChecking,
CertPath: *certPath,
Expand Down Expand Up @@ -405,8 +407,8 @@ func (r *Runner) Run(ctx context.Context) error {
return nil
}

func setupDatastore(setupLog logr.Logger, ctx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32, disableK8sCrdReconcile bool, namespace, name, endpointSelector, endpointTargetPorts string) (datastore.Datastore, error) {
if !disableK8sCrdReconcile {
func setupDatastore(setupLog logr.Logger, ctx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32, startCrdReconcilers bool, namespace, name, endpointSelector, endpointTargetPorts string) (datastore.Datastore, error) {
if startCrdReconcilers {
return datastore.NewDatastore(ctx, epFactory, modelServerMetricsPort), nil
} else {
endpointPool := datalayer.NewEndpointPool(namespace, name)
Expand Down
79 changes: 79 additions & 0 deletions pkg/epp/server/controller_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package server

import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
)

type ControllerConfig struct {
startCrdReconcilers bool
hasInferenceObjective bool
hasInferenceModelRewrites bool
}

func NewControllerConfig(startCrdReconcilers bool) ControllerConfig {
return ControllerConfig{
startCrdReconcilers: startCrdReconcilers,
}
}

func (cc *ControllerConfig) PopulateControllerConfig(cfg *rest.Config) error {
if !cc.startCrdReconcilers {
return nil
}
dc, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return err
}
cc.populateWithDiscovery(dc)
return nil
}

func (cc *ControllerConfig) populateWithDiscovery(dc discovery.DiscoveryInterface) {
inferenceObjectiveGVK := schema.GroupVersionKind{
Group: v1alpha2.GroupVersion.Group,
Version: v1alpha2.GroupVersion.Version,
Kind: "InferenceObjective",
}
cc.hasInferenceObjective = gvkExists(dc, inferenceObjectiveGVK)

inferenceModelRewriteGVK := schema.GroupVersionKind{
Group: v1alpha2.GroupVersion.Group,
Version: v1alpha2.GroupVersion.Version,
Kind: "InferenceModelRewrite",
}
cc.hasInferenceModelRewrites = gvkExists(dc, inferenceModelRewriteGVK)
}

func gvkExists(dc discovery.DiscoveryInterface, gvk schema.GroupVersionKind) bool {
apiResourceList, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
if err != nil {
ctrl.Log.WithName("controllerConfig").Error(err, "Checking server resources error.")
return false
}
for _, r := range apiResourceList.APIResources {
if r.Kind == gvk.Kind {
return true
}
}
return false
}
104 changes: 104 additions & 0 deletions pkg/epp/server/controller_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package server

import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery/fake"
k8stesting "k8s.io/client-go/testing"
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
)

func TestNewControllerConfig(t *testing.T) {
c := NewControllerConfig(true)
if !c.startCrdReconcilers {
t.Error("expected startCrdReconcilers to be true")
}

c = NewControllerConfig(false)
if c.startCrdReconcilers {
t.Error("expected startCrdReconcilers to be false")
}
}

func TestPopulateWithDiscovery(t *testing.T) {
tests := []struct {
name string
apiResources []metav1.APIResource
wantInferenceObjective bool
wantInferenceModelRewrite bool
}{
{
name: "Both resources exist",
apiResources: []metav1.APIResource{
{Kind: "InferenceObjective"},
{Kind: "InferenceModelRewrite"},
},
wantInferenceObjective: true,
wantInferenceModelRewrite: true,
},
{
name: "Resources do not exist",
apiResources: []metav1.APIResource{},
wantInferenceObjective: false,
wantInferenceModelRewrite: false,
},
{
name: "Only InferenceObjective exists",
apiResources: []metav1.APIResource{
{Kind: "InferenceObjective"},
},
wantInferenceObjective: true,
wantInferenceModelRewrite: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Setup fake discovery for this specific test case
fakeDiscovery := &fake.FakeDiscovery{
Fake: &k8stesting.Fake{},
}
fakeDiscovery.Resources = []*metav1.APIResourceList{
{
GroupVersion: v1alpha2.GroupVersion.String(),
APIResources: tt.apiResources,
},
}

cc := &ControllerConfig{}
cc.populateWithDiscovery(fakeDiscovery)

if cc.hasInferenceObjective != tt.wantInferenceObjective {
t.Errorf("populateWithDiscovery() hasInferenceObjective = %v, want %v", cc.hasInferenceObjective, tt.wantInferenceObjective)
}
if cc.hasInferenceModelRewrites != tt.wantInferenceModelRewrite {
t.Errorf("populateWithDiscovery() hasInferenceModelRewrites = %v, want %v", cc.hasInferenceModelRewrites, tt.wantInferenceModelRewrite)
}
})
}
}

func TestPopulateControllerConfig_Disable(t *testing.T) {
c := NewControllerConfig(false)
err := c.PopulateControllerConfig(nil)
if err != nil {
t.Errorf("expected nil error, got %v", err)
}
}
32 changes: 15 additions & 17 deletions pkg/epp/server/controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {
}

// defaultManagerOptions returns the default options used to create the manager.
func defaultManagerOptions(disableK8sCrdReconcile bool, gknn common.GKNN, metricsServerOptions metricsserver.Options) (ctrl.Options, error) {
func defaultManagerOptions(cfg ControllerConfig, gknn common.GKNN, metricsServerOptions metricsserver.Options) (ctrl.Options, error) {
opt := ctrl.Options{
Scheme: scheme,
Cache: cache.Options{
Expand All @@ -55,24 +55,22 @@ func defaultManagerOptions(disableK8sCrdReconcile bool, gknn common.GKNN, metric
gknn.Namespace: {},
},
},
&v1alpha2.InferenceObjective{}: {
Namespaces: map[string]cache.Config{
gknn.Namespace: {},
},
},
&v1alpha2.InferenceModelRewrite{}: {
Namespaces: map[string]cache.Config{
gknn.Namespace: {},
},
},
},
},
Metrics: metricsServerOptions,
}
if !disableK8sCrdReconcile {
opt.Cache.ByObject[&v1alpha2.InferenceObjective{}] = cache.ByObject{Namespaces: map[string]cache.Config{
gknn.Namespace: {},
}}
if cfg.startCrdReconcilers {
if cfg.hasInferenceObjective {
opt.Cache.ByObject[&v1alpha2.InferenceObjective{}] = cache.ByObject{Namespaces: map[string]cache.Config{
gknn.Namespace: {},
}}
}
if cfg.hasInferenceModelRewrites {
opt.Cache.ByObject[&v1alpha2.InferenceModelRewrite{}] = cache.ByObject{Namespaces: map[string]cache.Config{
gknn.Namespace: {},
}}
}

switch gknn.Group {
case v1alpha2.GroupName:
opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{
Expand All @@ -95,8 +93,8 @@ func defaultManagerOptions(disableK8sCrdReconcile bool, gknn common.GKNN, metric
}

// NewDefaultManager creates a new controller manager with default configuration.
func NewDefaultManager(disableK8sCrdReconcile bool, gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) {
opt, err := defaultManagerOptions(disableK8sCrdReconcile, gknn, metricsServerOptions)
func NewDefaultManager(controllerCfg ControllerConfig, gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) {
opt, err := defaultManagerOptions(controllerCfg, gknn, metricsServerOptions)
if err != nil {
return nil, fmt.Errorf("failed to create controller manager options: %v", err)
}
Expand Down
37 changes: 20 additions & 17 deletions pkg/epp/server/runserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (
type ExtProcServerRunner struct {
GrpcPort int
GKNN common.GKNN
DisableK8sCrdReconcile bool
ControllerCfg ControllerConfig
Datastore datastore.Datastore
SecureServing bool
HealthChecking bool
Expand Down Expand Up @@ -104,7 +104,7 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
return &ExtProcServerRunner{
GrpcPort: DefaultGrpcPort,
GKNN: gknn,
DisableK8sCrdReconcile: false,
ControllerCfg: ControllerConfig{true, true, true},
SecureServing: DefaultSecureServing,
HealthChecking: DefaultHealthChecking,
RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval,
Expand All @@ -116,7 +116,7 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
// SetupWithManager sets up the runner with the given manager.
func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// Create the controllers and register them with the manager
if !r.DisableK8sCrdReconcile {
if r.ControllerCfg.startCrdReconcilers {
if err := (&controller.InferencePoolReconciler{
Datastore: r.Datastore,
Reader: mgr.GetClient(),
Expand All @@ -125,21 +125,24 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man
return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err)
}

if err := (&controller.InferenceObjectiveReconciler{
Datastore: r.Datastore,
Reader: mgr.GetClient(),
PoolGKNN: r.GKNN,
}).SetupWithManager(ctx, mgr); err != nil {
return fmt.Errorf("failed setting up InferenceObjectiveReconciler: %w", err)
if r.ControllerCfg.hasInferenceObjective {
if err := (&controller.InferenceObjectiveReconciler{
Datastore: r.Datastore,
Reader: mgr.GetClient(),
PoolGKNN: r.GKNN,
}).SetupWithManager(ctx, mgr); err != nil {
return fmt.Errorf("failed setting up InferenceObjectiveReconciler: %w", err)
}
}
if r.ControllerCfg.hasInferenceModelRewrites {
if err := (&controller.InferenceModelRewriteReconciler{
Datastore: r.Datastore,
Reader: mgr.GetClient(),
PoolGKNN: r.GKNN,
}).SetupWithManager(ctx, mgr); err != nil {
return fmt.Errorf("failed setting up InferenceModelRewriteReconciler: %w", err)
}
}
}

if err := (&controller.InferenceModelRewriteReconciler{
Datastore: r.Datastore,
Reader: mgr.GetClient(),
PoolGKNN: r.GKNN,
}).SetupWithManager(ctx, mgr); err != nil {
return fmt.Errorf("failed setting up InferenceModelRewriteReconciler: %w", err)
}

if err := (&controller.PodReconciler{
Expand Down