diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go
index e78b6877c..08d847d64 100644
--- a/cmd/epp/runner/runner.go
+++ b/cmd/epp/runner/runner.go
@@ -251,8 +251,11 @@ func (r *Runner) Run(ctx context.Context) error {
 		setupLog.Error(err, "Failed to extract GKNN")
 		return err
 	}
-	disableK8sCrdReconcile := *endpointSelector != ""
-	ds, err := setupDatastore(setupLog, ctx, epf, int32(*modelServerMetricsPort), disableK8sCrdReconcile, *poolName, *poolNamespace, *endpointSelector, *endpointTargetPorts)
+	startCrdReconcilers := *endpointSelector == "" // An empty endpointSelector means EPP is not in standalone mode, so the InferencePool and other CRD reconcilers must be started.
+	controllerCfg := runserver.NewControllerConfig(startCrdReconcilers)
+
+	// NOTE(review): setupDatastore declares (namespace, name) but is passed (*poolName, *poolNamespace) here; confirm the argument order.
+	ds, err := setupDatastore(setupLog, ctx, epf, int32(*modelServerMetricsPort), startCrdReconcilers, *poolName, *poolNamespace, *endpointSelector, *endpointTargetPorts)
 	if err != nil {
 		setupLog.Error(err, "Failed to setup datastore")
 		return err
@@ -286,7 +289,15 @@ func (r *Runner) Run(ctx context.Context) error {
 	isLeader := &atomic.Bool{}
 	isLeader.Store(false)
 
-	mgr, err := runserver.NewDefaultManager(disableK8sCrdReconcile, *gknn, cfg, metricsServerOptions, *haEnableLeaderElection)
+	// Run CRD discovery before controllerCfg is consumed: NewControllerConfig only
+	// sets startCrdReconcilers, and the config is passed by value to
+	// NewDefaultManager and ExtProcServerRunner, so the has* flags would stay false.
+	if err := controllerCfg.PopulateControllerConfig(cfg); err != nil {
+		setupLog.Error(err, "Failed to populate controller config")
+		return err
+	}
+
+	mgr, err := runserver.NewDefaultManager(controllerCfg, *gknn, cfg, metricsServerOptions, *haEnableLeaderElection)
 	if err != nil {
 		setupLog.Error(err, "Failed to create controller manager")
 		return err
@@ -367,7 +378,7 @@ func (r *Runner) Run(ctx context.Context) error {
 		GrpcPort: *grpcPort,
 		GKNN: *gknn,
 		Datastore: ds,
-		DisableK8sCrdReconcile: disableK8sCrdReconcile,
+		ControllerCfg: controllerCfg,
 		SecureServing: *secureServing,
 		HealthChecking: *healthChecking,
 		CertPath: *certPath,
@@ -405,8 +416,8 @@ func (r *Runner) Run(ctx context.Context) error {
 	return nil
 }
 
-func setupDatastore(setupLog logr.Logger, ctx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32, disableK8sCrdReconcile bool, 
namespace, name, endpointSelector, endpointTargetPorts string) (datastore.Datastore, error) {
-	if !disableK8sCrdReconcile {
+func setupDatastore(setupLog logr.Logger, ctx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32, startCrdReconcilers bool, namespace, name, endpointSelector, endpointTargetPorts string) (datastore.Datastore, error) {
+	if startCrdReconcilers {
 		return datastore.NewDatastore(ctx, epFactory, modelServerMetricsPort), nil
 	} else {
 		endpointPool := datalayer.NewEndpointPool(namespace, name)
diff --git a/pkg/epp/server/controller_config.go b/pkg/epp/server/controller_config.go
new file mode 100644
index 000000000..6672bcdc9
--- /dev/null
+++ b/pkg/epp/server/controller_config.go
@@ -0,0 +1,108 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package server
+
+import (
+	"errors"
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/discovery"
+	"k8s.io/client-go/rest"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
+)
+
+// ControllerConfig records which controllers should be started and which
+// optional CRD kinds were found through API discovery.
+//
+// startCrdReconcilers is fixed by NewControllerConfig. hasInferenceObjective
+// and hasInferenceModelRewrites are false until PopulateControllerConfig (or
+// populateWithDiscovery) sets them.
+type ControllerConfig struct {
+	startCrdReconcilers       bool
+	hasInferenceObjective     bool
+	hasInferenceModelRewrites bool
+}
+
+// NewControllerConfig returns a ControllerConfig with only startCrdReconcilers
+// set; the CRD-presence flags are left at false.
+func NewControllerConfig(startCrdReconcilers bool) ControllerConfig {
+	return ControllerConfig{
+		startCrdReconcilers: startCrdReconcilers,
+	}
+}
+
+// PopulateControllerConfig uses API discovery against cfg to record which
+// optional CRD kinds are served. When startCrdReconcilers is false it returns
+// nil without touching cfg, so cfg may be nil in that case.
+//
+// It returns an error if cfg is nil while discovery is required, or if the
+// discovery client cannot be created.
+func (cc *ControllerConfig) PopulateControllerConfig(cfg *rest.Config) error {
+	if !cc.startCrdReconcilers {
+		return nil
+	}
+	if cfg == nil {
+		// Fail with an error rather than handing a nil config to client-go.
+		return errors.New("rest config must not be nil when CRD reconcilers are enabled")
+	}
+	dc, err := discovery.NewDiscoveryClientForConfig(cfg)
+	if err != nil {
+		return fmt.Errorf("creating discovery client: %w", err)
+	}
+	cc.populateWithDiscovery(dc)
+	return nil
+}
+
+// populateWithDiscovery sets hasInferenceObjective and
+// hasInferenceModelRewrites according to whether the InferenceObjective and
+// InferenceModelRewrite kinds are listed under the v1alpha2 group/version.
+func (cc *ControllerConfig) populateWithDiscovery(dc discovery.DiscoveryInterface) {
+	inferenceObjectiveGVK := schema.GroupVersionKind{
+		Group:   v1alpha2.GroupVersion.Group,
+		Version: v1alpha2.GroupVersion.Version,
+		Kind:    "InferenceObjective",
+	}
+	cc.hasInferenceObjective = gvkExists(dc, inferenceObjectiveGVK)
+
+	inferenceModelRewriteGVK := schema.GroupVersionKind{
+		Group:   v1alpha2.GroupVersion.Group,
+		Version: v1alpha2.GroupVersion.Version,
+		Kind:    "InferenceModelRewrite",
+	}
+	cc.hasInferenceModelRewrites = gvkExists(dc, inferenceModelRewriteGVK)
+}
+
+// gvkExists reports whether dc lists a resource of gvk.Kind under gvk's
+// group/version. A discovery error is logged and reported as false.
+func gvkExists(dc discovery.DiscoveryInterface, gvk schema.GroupVersionKind) bool {
+	groupVersion := gvk.GroupVersion().String()
+	apiResourceList, err := dc.ServerResourcesForGroupVersion(groupVersion)
+	if err != nil {
+		// NOTE(review): every error is treated as "kind absent", so a failed
+		// discovery call also disables the matching reconciler; confirm intended.
+		ctrl.Log.WithName("controllerConfig").Error(err, "Checking server resources error.", "groupVersion", groupVersion, "kind", gvk.Kind)
+		return false
+	}
+	for _, r := range apiResourceList.APIResources {
+		if r.Kind == gvk.Kind {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/epp/server/controller_config_test.go b/pkg/epp/server/controller_config_test.go
new file mode 100644
index 000000000..b106b4d60
--- /dev/null
+++ b/pkg/epp/server/controller_config_test.go
@@ -0,0 +1,104 @@
+/*
+Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/discovery/fake" + k8stesting "k8s.io/client-go/testing" + "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" +) + +func TestNewControllerConfig(t *testing.T) { + c := NewControllerConfig(true) + if !c.startCrdReconcilers { + t.Error("expected startCrdReconcilers to be true") + } + + c = NewControllerConfig(false) + if c.startCrdReconcilers { + t.Error("expected startCrdReconcilers to be false") + } +} + +func TestPopulateWithDiscovery(t *testing.T) { + tests := []struct { + name string + apiResources []metav1.APIResource + wantInferenceObjective bool + wantInferenceModelRewrite bool + }{ + { + name: "Both resources exist", + apiResources: []metav1.APIResource{ + {Kind: "InferenceObjective"}, + {Kind: "InferenceModelRewrite"}, + }, + wantInferenceObjective: true, + wantInferenceModelRewrite: true, + }, + { + name: "Resources do not exist", + apiResources: []metav1.APIResource{}, + wantInferenceObjective: false, + wantInferenceModelRewrite: false, + }, + { + name: "Only InferenceObjective exists", + apiResources: []metav1.APIResource{ + {Kind: "InferenceObjective"}, + }, + wantInferenceObjective: true, + wantInferenceModelRewrite: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup fake discovery for this specific test case + 
fakeDiscovery := &fake.FakeDiscovery{ + Fake: &k8stesting.Fake{}, + } + fakeDiscovery.Resources = []*metav1.APIResourceList{ + { + GroupVersion: v1alpha2.GroupVersion.String(), + APIResources: tt.apiResources, + }, + } + + cc := &ControllerConfig{} + cc.populateWithDiscovery(fakeDiscovery) + + if cc.hasInferenceObjective != tt.wantInferenceObjective { + t.Errorf("populateWithDiscovery() hasInferenceObjective = %v, want %v", cc.hasInferenceObjective, tt.wantInferenceObjective) + } + if cc.hasInferenceModelRewrites != tt.wantInferenceModelRewrite { + t.Errorf("populateWithDiscovery() hasInferenceModelRewrites = %v, want %v", cc.hasInferenceModelRewrites, tt.wantInferenceModelRewrite) + } + }) + } +} + +func TestPopulateControllerConfig_Disable(t *testing.T) { + c := NewControllerConfig(false) + err := c.PopulateControllerConfig(nil) + if err != nil { + t.Errorf("expected nil error, got %v", err) + } +} diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index acc0bd51b..32748885e 100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -45,7 +45,7 @@ func init() { } // defaultManagerOptions returns the default options used to create the manager. 
-func defaultManagerOptions(disableK8sCrdReconcile bool, gknn common.GKNN, metricsServerOptions metricsserver.Options) (ctrl.Options, error) { +func defaultManagerOptions(cfg ControllerConfig, gknn common.GKNN, metricsServerOptions metricsserver.Options) (ctrl.Options, error) { opt := ctrl.Options{ Scheme: scheme, Cache: cache.Options{ @@ -55,24 +55,22 @@ func defaultManagerOptions(disableK8sCrdReconcile bool, gknn common.GKNN, metric gknn.Namespace: {}, }, }, - &v1alpha2.InferenceObjective{}: { - Namespaces: map[string]cache.Config{ - gknn.Namespace: {}, - }, - }, - &v1alpha2.InferenceModelRewrite{}: { - Namespaces: map[string]cache.Config{ - gknn.Namespace: {}, - }, - }, }, }, Metrics: metricsServerOptions, } - if !disableK8sCrdReconcile { - opt.Cache.ByObject[&v1alpha2.InferenceObjective{}] = cache.ByObject{Namespaces: map[string]cache.Config{ - gknn.Namespace: {}, - }} + if cfg.startCrdReconcilers { + if cfg.hasInferenceObjective { + opt.Cache.ByObject[&v1alpha2.InferenceObjective{}] = cache.ByObject{Namespaces: map[string]cache.Config{ + gknn.Namespace: {}, + }} + } + if cfg.hasInferenceModelRewrites { + opt.Cache.ByObject[&v1alpha2.InferenceModelRewrite{}] = cache.ByObject{Namespaces: map[string]cache.Config{ + gknn.Namespace: {}, + }} + } + switch gknn.Group { case v1alpha2.GroupName: opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{ @@ -95,8 +93,8 @@ func defaultManagerOptions(disableK8sCrdReconcile bool, gknn common.GKNN, metric } // NewDefaultManager creates a new controller manager with default configuration. 
-func NewDefaultManager(disableK8sCrdReconcile bool, gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) {
-	opt, err := defaultManagerOptions(disableK8sCrdReconcile, gknn, metricsServerOptions)
+func NewDefaultManager(controllerCfg ControllerConfig, gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) {
+	opt, err := defaultManagerOptions(controllerCfg, gknn, metricsServerOptions)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create controller manager options: %v", err)
 	}
diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go
index d35e70c74..2fd5720a4 100644
--- a/pkg/epp/server/runserver.go
+++ b/pkg/epp/server/runserver.go
@@ -49,7 +49,7 @@ import (
 type ExtProcServerRunner struct {
 	GrpcPort int
 	GKNN common.GKNN
-	DisableK8sCrdReconcile bool
+	ControllerCfg ControllerConfig
 	Datastore datastore.Datastore
 	SecureServing bool
 	HealthChecking bool
@@ -104,7 +104,7 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
 	return &ExtProcServerRunner{
 		GrpcPort: DefaultGrpcPort,
 		GKNN: gknn,
-		DisableK8sCrdReconcile: false,
+		ControllerCfg: ControllerConfig{startCrdReconcilers: true, hasInferenceObjective: true, hasInferenceModelRewrites: true}, // keyed: all fields are bool, so positional values are easy to misorder
 		SecureServing: DefaultSecureServing,
 		HealthChecking: DefaultHealthChecking,
 		RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval,
@@ -116,7 +116,7 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
 // SetupWithManager sets up the runner with the given manager.
func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { // Create the controllers and register them with the manager - if !r.DisableK8sCrdReconcile { + if r.ControllerCfg.startCrdReconcilers { if err := (&controller.InferencePoolReconciler{ Datastore: r.Datastore, Reader: mgr.GetClient(), @@ -125,21 +125,24 @@ func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Man return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err) } - if err := (&controller.InferenceObjectiveReconciler{ - Datastore: r.Datastore, - Reader: mgr.GetClient(), - PoolGKNN: r.GKNN, - }).SetupWithManager(ctx, mgr); err != nil { - return fmt.Errorf("failed setting up InferenceObjectiveReconciler: %w", err) + if r.ControllerCfg.hasInferenceObjective { + if err := (&controller.InferenceObjectiveReconciler{ + Datastore: r.Datastore, + Reader: mgr.GetClient(), + PoolGKNN: r.GKNN, + }).SetupWithManager(ctx, mgr); err != nil { + return fmt.Errorf("failed setting up InferenceObjectiveReconciler: %w", err) + } + } + if r.ControllerCfg.hasInferenceModelRewrites { + if err := (&controller.InferenceModelRewriteReconciler{ + Datastore: r.Datastore, + Reader: mgr.GetClient(), + PoolGKNN: r.GKNN, + }).SetupWithManager(ctx, mgr); err != nil { + return fmt.Errorf("failed setting up InferenceModelRewriteReconciler: %w", err) + } } - } - - if err := (&controller.InferenceModelRewriteReconciler{ - Datastore: r.Datastore, - Reader: mgr.GetClient(), - PoolGKNN: r.GKNN, - }).SetupWithManager(ctx, mgr); err != nil { - return fmt.Errorf("failed setting up InferenceModelRewriteReconciler: %w", err) } if err := (&controller.PodReconciler{