Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion apix/config/v1alpha1/endpointpickerconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,25 @@ type EndpointPickerConfig struct {
// SchedulingProfiles is the list of named SchedulingProfiles
// that will be created.
SchedulingProfiles []SchedulingProfile `json:"schedulingProfiles"`

// +optional
// FeatureGates is a set of flags that enable various experimental features with the EPP.
// If omitted non of these experimental features will be enabled.
FeatureGates FeatureGates `json:"featureGates,omitempty"`

// +optional
// SaturationDetector when present specifies the configuration of the
// Saturation detector. If not present, default values are used.
SaturationDetector *SaturationDetector `json:"saturationDetector,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is too specific and should be generalized.
we've talked in the past about having a parameters section in the config API, such that multiple structs in the code can consume. for example, I might want to consume the metricsStalenessThreshold in more places.
This approach is not future proof and we might need to add more and more fields here instead of just generalizing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parts of the code, "just knowing" what parameters they are looking for in a pool of parameters, is as error prone as environment variables. A typo will cause a parameter to be ignored and presumably a default value will be used. I believe this will be hard to "debug" scenarios like this.

The sections need to be explicit.

As for having constants as I did or references to a parameter, that is a different issue. It does however make for a very verbose configuration. With the need for more validation code. Unless you do something like:

saturationDetector:
   queueDepthThreshold:
     value: 5
   kvCacheUtilThreshold:
     parameterRef: plover

}

func (cfg EndpointPickerConfig) String() string {
return fmt.Sprintf(
"{Plugins: %v, SchedulingProfiles: %v}",
"{Plugins: %v, SchedulingProfiles: %v, FeatureGates: %v, SaturationDetector: %v}",
cfg.Plugins,
cfg.SchedulingProfiles,
cfg.FeatureGates,
cfg.SaturationDetector,
)
}

Expand Down Expand Up @@ -118,3 +130,64 @@ func (sp SchedulingPlugin) String() string {
}
return fmt.Sprintf("{PluginRef: %s%s}", sp.PluginRef, weight)
}

// FeatureGates is a set of flags that enable various experimental features with the EPP
type FeatureGates map[string]bool

func (fg FeatureGates) String() string {
if fg == nil {
return "{}"
}

result := ""
for key, value := range fg {
result += fmt.Sprintf("%s:%v,", key, value)
}

if len(result) > 0 {
result = result[:len(result)-1]
}
return "{" + result + "}"
}

// SaturationDetector
type SaturationDetector struct {
// +optional
// QueueDepthThreshold defines the backend waiting queue size above which a
// pod is considered to have insufficient capacity for new requests.
QueueDepthThreshold int `json:"queueDepthThreshold,omitempty"`

// +optional
// KVCacheUtilThreshold defines the KV cache utilization (0.0 to 1.0) above
// which a pod is considered to have insufficient capacity.
KVCacheUtilThreshold float64 `json:"kvCacheUtilThreshold,omitempty"`

// +optional
// MetricsStalenessThreshold defines how old a pod's metrics can be.
// If a pod's metrics are older than this, it might be excluded from
// "good capacity" considerations or treated as having no capacity for
// safety.
MetricsStalenessThreshold metav1.Duration `json:"metricsStalenessThreshold,omitempty"`
}

func (sd *SaturationDetector) String() string {
result := ""
if sd != nil {
if sd.QueueDepthThreshold != 0 {
result += fmt.Sprintf("QueueDepthThreshold: %d", sd.QueueDepthThreshold)
}
if sd.KVCacheUtilThreshold != 0.0 {
if len(result) != 0 {
result += ", "
}
result += fmt.Sprintf("KVCacheUtilThreshold: %g", sd.KVCacheUtilThreshold)
}
if sd.MetricsStalenessThreshold.Duration != 0.0 {
if len(result) != 0 {
result += ", "
}
result += fmt.Sprintf("MetricsStalenessThreshold: %s", sd.MetricsStalenessThreshold)
}
}
return "{" + result + "}"
}
49 changes: 49 additions & 0 deletions apix/config/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 19 additions & 27 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
Expand All @@ -62,17 +63,10 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
testfilter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test/filter"
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
"sigs.k8s.io/gateway-api-inference-extension/version"
)

const (
// enableExperimentalDatalayerV2 defines the environment variable
// used as feature flag for the pluggable data layer.
enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2"
)

var (
grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy")
grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes")
Expand Down Expand Up @@ -156,19 +150,21 @@ func (r *Runner) Run(ctx context.Context) error {
})
setupLog.Info("Flags processed", "flags", flags)

// --- Load Configurations from Environment Variables ---
sdConfig := saturationdetector.LoadConfigFromEnv()

// --- Get Kubernetes Config ---
cfg, err := ctrl.GetConfig()
if err != nil {
setupLog.Error(err, "Failed to get Kubernetes rest config")
return err
}

eppConfig, err := r.parseConfiguration(ctx)
if err != nil {
setupLog.Error(err, "Failed to parse plugins configuration")
return err
}

// --- Setup Datastore ---
useDatalayerV2 := env.GetEnvBool(enableExperimentalDatalayerV2, false, setupLog)
epf, err := r.setupMetricsCollection(setupLog, useDatalayerV2)
epf, err := r.setupMetricsCollection(setupLog, eppConfig.FeatureConfig[datalayer.FeatureGate])
if err != nil {
return err
}
Expand Down Expand Up @@ -233,12 +229,6 @@ func (r *Runner) Run(ctx context.Context) error {
runtime.SetBlockProfileRate(1)
}

err = r.parsePluginsConfiguration(ctx)
if err != nil {
setupLog.Error(err, "Failed to parse plugins configuration")
return err
}

// --- Initialize Core EPP Components ---
if r.schedulerConfig == nil {
err := errors.New("scheduler config must be set either by config api or through code")
Expand All @@ -250,7 +240,7 @@ func (r *Runner) Run(ctx context.Context) error {

scheduler := scheduling.NewSchedulerWithConfig(r.schedulerConfig)

saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)
saturationDetector := saturationdetector.NewDetector(&eppConfig.SaturationDetectorConfig, setupLog)

director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)

Expand All @@ -267,7 +257,7 @@ func (r *Runner) Run(ctx context.Context) error {
MetricsStalenessThreshold: *metricsStalenessThreshold,
Director: director,
SaturationDetector: saturationDetector,
UseExperimentalDatalayerV2: useDatalayerV2, // pluggable data layer feature flag
UseExperimentalDatalayerV2: eppConfig.FeatureConfig[datalayer.FeatureGate], // pluggable data layer feature flag
}
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "Failed to setup EPP controllers")
Expand Down Expand Up @@ -309,9 +299,9 @@ func (r *Runner) registerInTreePlugins() {
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
}

func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
func (r *Runner) parseConfiguration(ctx context.Context) (*config.Config, error) {
if *configText == "" && *configFile == "" {
return nil // configuring through code, not through file
return nil, nil // configuring through code, not through file
}

logger := log.FromContext(ctx)
Expand All @@ -323,24 +313,26 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
var err error
configBytes, err = os.ReadFile(*configFile)
if err != nil {
return fmt.Errorf("failed to load config from a file '%s' - %w", *configFile, err)
return nil, fmt.Errorf("failed to load config from a file '%s' - %w", *configFile, err)
}
}

loader.RegisterFeatureGate(datalayer.FeatureGate)

r.registerInTreePlugins()
handle := plugins.NewEppHandle(ctx)
config, err := loader.LoadConfig(configBytes, handle, logger)
cfg, err := loader.LoadConfig(configBytes, handle, logger)
if err != nil {
return fmt.Errorf("failed to load the configuration - %w", err)
return nil, fmt.Errorf("failed to load the configuration - %w", err)
}

r.schedulerConfig = config.SchedulerConfig
r.schedulerConfig = cfg.SchedulerConfig

// Add requestControl plugins
r.requestControlConfig.AddPlugins(handle.GetAllPlugins()...)

logger.Info("loaded configuration from file/text successfully")
return nil
return cfg, nil
}

func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDatalayer bool) (datalayer.EndpointFactory, error) {
Expand Down
2 changes: 1 addition & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ nav:
- InferencePool Rollout: guides/inferencepool-rollout.md
- Metrics and Observability: guides/metrics-and-observability.md
- Configuration Guide:
- Configuring the plugins via configuration files or text: guides/epp-configuration/config-text.md
- Configuring the EndPoint Picker via configuration files or text: guides/epp-configuration/config-text.md
- Prefix Cache Aware Plugin: guides/epp-configuration/prefix-aware.md
- Troubleshooting Guide: guides/troubleshooting.md
- Implementer Guides:
Expand Down
9 changes: 7 additions & 2 deletions pkg/epp/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ limitations under the License.

package config

import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
)

// Config is the configuration loaded from the text based configuration
type Config struct {
SchedulerConfig *scheduling.SchedulerConfig
SchedulerConfig *scheduling.SchedulerConfig
FeatureConfig map[string]bool
SaturationDetectorConfig saturationdetector.Config
}
Loading