From 96bee04c19828c379e632e013b13a5256325ae4d Mon Sep 17 00:00:00 2001 From: nayihz Date: Tue, 17 Jun 2025 18:09:05 +0800 Subject: [PATCH] feat: make model metrics endpoints configurable --- cmd/epp/runner/runner.go | 36 ++++++++++++++++++++++++- pkg/epp/backend/metrics/fake.go | 2 +- pkg/epp/backend/metrics/metrics.go | 16 +++++++---- pkg/epp/backend/metrics/metrics_test.go | 4 +-- 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 1340ff6dc..161d63447 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -21,6 +21,7 @@ import ( "flag" "fmt" "net/http/pprof" + "os" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" @@ -116,6 +117,10 @@ var ( configFile = flag.String("configFile", "", "The path to the configuration file") configText = flag.String("configText", "", "The configuration specified as text, in lieu of a file") + modelServerMetricsPort = flag.Int("modelServerMetricsPort", 0, "Port to scrape metrics from pods. "+ + "Default value will be set to InferencePool.Spec.TargetPortNumber if not set.") + modelServerMetricsPath = flag.String("modelServerMetricsPath", "/metrics", "Path to scrape metrics from pods") + setupLog = ctrl.Log.WithName("setup") // Environment variables @@ -147,7 +152,32 @@ func (r *Runner) WithSchedulerConfig(schedulerConfig *scheduling.SchedulerConfig return r } +func bindEnvToFlags() { + // map[ENV_VAR]flagName – add more as needed + for env, flg := range map[string]string{ + "GRPC_PORT": "grpcPort", + "GRPC_HEALTH_PORT": "grpcHealthPort", + "MODEL_SERVER_METRICS_PORT": "modelServerMetricsPort", + "MODEL_SERVER_METRICS_PATH": "modelServerMetricsPath", + "DESTINATION_ENDPOINT_HINT_KEY": "destinationEndpointHintKey", + "POOL_NAME": "poolName", + "POOL_NAMESPACE": "poolNamespace", + // durations & bools work too; flag.Set expects the *string* form + "REFRESH_METRICS_INTERVAL": "refreshMetricsInterval", + "SECURE_SERVING": "secureServing", + } { + if v := os.Getenv(env); v != "" { + // ignore error; Parse() will catch invalid values later + _ = flag.Set(flg, v) + } + } +} + func (r *Runner) Run(ctx context.Context) error { + // Defaults already baked into flag declarations + // Load env vars as "soft" overrides + bindEnvToFlags() + opts := zap.Options{ Development: true, } @@ -189,7 +219,11 @@ func (r *Runner) Run(ctx context.Context) error { return err } verifyMetricMapping(*mapping, setupLog) - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval) + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{ + MetricMapping: mapping, + ModelServerMetricsPort: int32(*modelServerMetricsPort), + ModelServerMetricsPath: *modelServerMetricsPath, + }, *refreshMetricsInterval) datastore := datastore.NewDatastore(ctx, pmf) diff --git a/pkg/epp/backend/metrics/fake.go b/pkg/epp/backend/metrics/fake.go index 5599d4ec0..68283187a 100644 --- a/pkg/epp/backend/metrics/fake.go +++ b/pkg/epp/backend/metrics/fake.go @@ -56,7 +56,7 @@ type FakePodMetricsClient struct { Res map[types.NamespacedName]*MetricsState } -func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod *backend.Pod, existing *MetricsState, port int32) (*MetricsState, error) { +func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod *backend.Pod, existing *MetricsState, _ int32) (*MetricsState, error) { f.errMu.RLock() err, ok := f.Err[pod.NamespacedName] f.errMu.RUnlock() diff --git a/pkg/epp/backend/metrics/metrics.go b/pkg/epp/backend/metrics/metrics.go index 8899e00ce..590685c37 100644 --- a/pkg/epp/backend/metrics/metrics.go +++ b/pkg/epp/backend/metrics/metrics.go @@ -37,15 +37,14 @@ const ( ) type PodMetricsClientImpl struct { - MetricMapping *MetricMapping + MetricMapping *MetricMapping + ModelServerMetricsPort int32 + ModelServerMetricsPath string } // FetchMetrics fetches metrics from a given pod, clones the existing metrics object and returns an updated one. func (p *PodMetricsClientImpl) FetchMetrics(ctx context.Context, pod *backend.Pod, existing *MetricsState, port int32) (*MetricsState, error) { - // Currently the metrics endpoint is hard-coded, which works with vLLM. - // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config. - url := "http://" + pod.Address + ":" + strconv.Itoa(int(port)) + "/metrics" - + url := p.getMetricEndpoint(pod, port) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, fmt.Errorf("failed to create request: %v", err) @@ -70,6 +69,13 @@ func (p *PodMetricsClientImpl) FetchMetrics(ctx context.Context, pod *backend.Po return p.promToPodMetrics(metricFamilies, existing) } +func (p *PodMetricsClientImpl) getMetricEndpoint(pod *backend.Pod, targetPortNumber int32) string { + if p.ModelServerMetricsPort == 0 { + p.ModelServerMetricsPort = targetPortNumber + } + return fmt.Sprintf("http://%s:%d%s", pod.Address, p.ModelServerMetricsPort, p.ModelServerMetricsPath) +} + // promToPodMetrics updates internal pod metrics with scraped Prometheus metrics. func (p *PodMetricsClientImpl) promToPodMetrics( metricFamilies map[string]*dto.MetricFamily, diff --git a/pkg/epp/backend/metrics/metrics_test.go b/pkg/epp/backend/metrics/metrics_test.go index bfc3e01fa..9f7c2b896 100644 --- a/pkg/epp/backend/metrics/metrics_test.go +++ b/pkg/epp/backend/metrics/metrics_test.go @@ -495,9 +495,9 @@ func TestFetchMetrics(t *testing.T) { }, } existing := &MetricsState{} - p := &PodMetricsClientImpl{} // No MetricMapping needed for this basic test + p := &PodMetricsClientImpl{ModelServerMetricsPort: 9999, ModelServerMetricsPath: "/metrics"} // No MetricMapping needed for this basic test - _, err := p.FetchMetrics(ctx, pod, existing, 9999) // Use a port that's unlikely to be in use. + _, err := p.FetchMetrics(ctx, pod, existing, 9999) // Use a port that's unlikely to be in use if err == nil { t.Errorf("FetchMetrics() expected error, got nil") }