Commit aa301d2

fixed unit test except datastore
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 9a444e6 commit aa301d2

9 files changed, +46 -25 lines changed

docs/proposals/003-model-server-protocol/README.md (+1 -1)

```diff
@@ -4,7 +4,7 @@ This is the protocol between the EPP and the model servers.
 
 ### Inference API Protocol
 
-The model server MUST implement OpenAI’s [Completions](https://platform.openai.com/docs/api-reference/completions)
+The model server MUST implement OpenAI’s [Completions](https://platform.openai.com/docs/api-reference/completions)
 and [Chat](https://platform.openai.com/docs/api-reference/chat) APIs.
 
 #### ** Experimental **
```
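For orientation only, the sketch below shows the kind of OpenAI-style Completions request such a model server is expected to accept. It is not part of this change: the base URL, port, and model name are placeholders, and the endpoint path and payload fields follow OpenAI's public Completions API.

```go
// Minimal sketch (not part of this commit): issuing an OpenAI-style
// Completions request against a model server that implements the protocol.
// The server address and model name are placeholders.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// "model", "prompt", and "max_tokens" are standard Completions fields.
	payload, _ := json.Marshal(map[string]any{
		"model":      "example-model", // placeholder model name
		"prompt":     "Hello",
		"max_tokens": 16,
	})

	// http://localhost:8000 is a placeholder for the model server address.
	resp, err := http.Post("http://localhost:8000/v1/completions",
		"application/json", bytes.NewReader(payload))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}
```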

pkg/epp/backend/metrics/fake_metrics_scraper.go (+3 -3)

```diff
@@ -39,11 +39,11 @@ type FakeMetricsScraper struct {
 }
 
 func (s *FakeMetricsScraper) Name() string {
-	return "fake-metrics"
+	return MetricsDataKey
 }
 
 func (s *FakeMetricsScraper) InitData() podinfo.ScrapedData {
-	return s.existingMetrics
+	return NewMetrics()
 }
 
 func (s *FakeMetricsScraper) Scrape(ctx context.Context, pod *backend.Pod, port int) (podinfo.ScrapedData, error) {
@@ -63,7 +63,7 @@ func (s *FakeMetricsScraper) Scrape(ctx context.Context, pod *backend.Pod, port
 	return res.Clone(), nil
 }
 
-func (s *FakeMetricsScraper) ProcessResult(podinfo.ScrapedData) {} // noop
+func (s *FakeMetricsScraper) ProcessResult(ctx context.Context, podinfo podinfo.ScrapedData) {} // noop
 
 func (s *FakeMetricsScraper) SetRes(new map[types.NamespacedName]*Metrics) {
 	s.resMu.Lock()
```

pkg/epp/backend/metrics/metrics_scraper.go (+10 -5)

```diff
@@ -22,12 +22,15 @@ import (
 	"net/http"
 	"strconv"
 	"strings"
+	"time"
 
 	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/common/expfmt"
 	"go.uber.org/multierr"
+	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
 	podinfo "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/pod-info"
+	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
 )
 
 const (
@@ -85,16 +88,18 @@ func (s *MetricsScraper) Scrape(ctx context.Context, pod *backend.Pod, port int)
 	return s.promToPodMetrics(metricFamilies)
 }
 
-func (s *MetricsScraper) ProcessResult(result podinfo.ScrapedData) {
-	if result == nil {
+func (s *MetricsScraper) ProcessResult(ctx context.Context, data podinfo.ScrapedData) {
+	if data == nil {
 		return
 	}
-	metrics := result.(*Metrics) //
+	metrics := data.(*Metrics)
 	if metrics == nil {
-		return // cannot convert to *Metrics
+		return
 	}
+	// if the result cannot be converted to *Metrics we cannot process it; otherwise, stamp the update time and store it
+	metrics.UpdateTime = time.Now()
 	s.existingMetrics = metrics
-	//TODO complete
+	log.FromContext(ctx).V(logutil.TRACE).Info("processed scraped metrics", "updated", metrics)
 }
 
 // promToPodMetrics updates internal pod metrics with scraped Prometheus metrics.
```
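For context on what `promToPodMetrics` receives, here is a small stand-alone sketch of parsing Prometheus text-format output into `dto.MetricFamily` values with `expfmt`, the same parsing library this scraper imports. The metric name and value are invented for the example; the real scraper parses the body returned by the pod's metrics endpoint.

```go
// Illustrative sketch: parsing Prometheus text-format output into
// *dto.MetricFamily values, the input that promToPodMetrics consumes.
// The metric name below is invented for the example.
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

func main() {
	body := `# TYPE example_requests_waiting gauge
example_requests_waiting 3
`
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(body))
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// Each family groups the samples for one metric name.
	for name, family := range families {
		for _, m := range family.GetMetric() {
			fmt.Printf("%s = %v\n", name, m.GetGauge().GetValue())
		}
	}
}
```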

pkg/epp/backend/models/models_scraper.go (+4 -2)

```diff
@@ -20,8 +20,10 @@ import (
 	"io"
 	"net/http"
 
+	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
 	podinfo "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/pod-info"
+	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
 )
 
 const (
@@ -80,6 +82,6 @@ func (s *ModelsScraper) parseResponse(responseBody io.Reader) (podinfo.ScrapedDa
 	return &ModelsData{ModelIDs: modelIDs}, nil
 }
 
-func (s *ModelsScraper) ProcessResult(result podinfo.ScrapedData) {
-
+func (s *ModelsScraper) ProcessResult(ctx context.Context, data podinfo.ScrapedData) {
+	log.FromContext(ctx).V(logutil.TRACE).Info("processed scraped models", "models", data)
 }
```

pkg/epp/backend/pod-info/pod_info.go (+18 -8)

```diff
@@ -49,7 +49,7 @@ type data struct {
 }
 
 func newPodInfo(pod *corev1.Pod, scrapers map[Scraper]*ScraperConfig, ds Datastore) *podInfo {
-	scraper := &podInfo{
+	info := &podInfo{
 		scrapers: scrapers,
 		ds:       ds,
 		once:     sync.Once{},
@@ -60,10 +60,10 @@ func newPodInfo(pod *corev1.Pod, scrapers map[Scraper]*ScraperConfig, ds Datasto
 		data.data[scraper.Name()] = scraper.InitData()
 	}
 
-	scraper.pod.Store(toInternalPod(pod))
-	scraper.data.Store(data)
+	info.pod.Store(toInternalPod(pod))
+	info.data.Store(data)
 
-	return scraper
+	return info
 }
 
 type podInfo struct {
@@ -97,6 +97,11 @@ func (pi *podInfo) String() string {
 	return fmt.Sprintf("Pod: %v; Data: %v", pi.GetPod(), pi.GetData())
 }
 
+func (pi *podInfo) Stop() {
+	pi.logger.V(logutil.DEFAULT).Info("Stopping scrape loop")
+	pi.stopFunc()
+}
+
 func (pi *podInfo) startScrapers(ctx context.Context) {
 	// The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.
 	newCtx, cancel := context.WithCancel(ctx)
@@ -131,14 +136,19 @@ func (pi *podInfo) startScrapeLoop(ctx context.Context, scraper Scraper, interva
 			}
 			// allow processing partial results in case there was an error.
 			// the scraper can return nil in case of an error and then do nothing in the ProcessResult function in case no partial update is required
-			scraper.ProcessResult(scrapedData)
+			scraper.ProcessResult(ctx, scrapedData)
+			// store the updated data only if it is valid
+			if scrapedData != nil {
+				pi.storeScrapedData(scraper.Name(), scrapedData)
+			}
 		}
 	}
 }
 
-func (pi *podInfo) Stop() {
-	pi.logger.V(logutil.DEFAULT).Info("Stopping scrape loop")
-	pi.stopFunc()
+func (pi *podInfo) storeScrapedData(key string, data ScrapedData) {
+	updated := pi.data.Load()
+	updated.data[key] = data
+	pi.data.Store(updated)
 }
 
 func toInternalPod(pod *corev1.Pod) *backend.Pod {
```
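The relocated `Stop` method relies on the standard context-cancellation pattern described in the comment above `startScrapers`: cancelling the derived context closes its `Done` channel, which ends each scrape loop. Below is a generic, self-contained sketch of that ticker-plus-cancel pattern; it is not the project's actual loop, and all names in it are illustrative.

```go
// Stand-alone sketch of the cancel-a-ticker-loop pattern the scrape loop uses:
// the goroutine exits when the derived context is cancelled.
package main

import (
	"context"
	"fmt"
	"time"
)

func startLoop(ctx context.Context, interval time.Duration, work func()) (stop func()) {
	loopCtx, cancel := context.WithCancel(ctx)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-loopCtx.Done(): // closed by cancel() or by the parent context
				return
			case <-ticker.C:
				work()
			}
		}
	}()
	return cancel
}

func main() {
	stop := startLoop(context.Background(), 10*time.Millisecond, func() { fmt.Println("tick") })
	time.Sleep(35 * time.Millisecond)
	stop() // analogous to podInfo.Stop(): cancel the loop's context
	time.Sleep(20 * time.Millisecond)
}
```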

pkg/epp/backend/pod-info/types.go (+2 -2)

```diff
@@ -48,8 +48,8 @@ type Scraper interface {
 	// Scrape scrapes information from a pod.
 	Scrape(ctx context.Context, pod *backend.Pod, port int) (ScrapedData, error)
 	// ProcessResult processes the object returned by the Scrape function.
-	// This function should update PodInfo data field with the new result.
-	ProcessResult(ScrapedData)
+	// This function is used to update internal fields in the scraper struct before storing the ScrapedData in the PodInfo data map.
+	ProcessResult(ctx context.Context, data ScrapedData)
 }
 
 // ScrapedData is a generic type for arbitrary scraped data stored in PodInfo.
```
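To make the updated contract concrete, here is a self-contained sketch of a scraper written against the shape of this interface. The `scraper` and `scrapedData` types below are local stand-ins for illustration, not the repository's `Scraper` and `ScrapedData` definitions; the point is that `ProcessResult` now receives a context and adjusts the scraped value before the caller stores it in the pod-info data map.

```go
// Self-contained sketch mirroring the shape of the updated Scraper contract.
// The interface below is a local stand-in, not the repository's definition.
package main

import (
	"context"
	"fmt"
	"time"
)

type scrapedData any // stand-in for podinfo.ScrapedData

type scraper interface {
	Name() string
	InitData() scrapedData
	Scrape(ctx context.Context, addr string) (scrapedData, error)
	// ProcessResult updates the scraper's internal fields before the caller
	// stores the returned data in the pod-info data map.
	ProcessResult(ctx context.Context, data scrapedData)
}

type uptimeData struct{ CheckedAt time.Time }

type uptimeScraper struct{ last *uptimeData }

func (s *uptimeScraper) Name() string          { return "uptime" }
func (s *uptimeScraper) InitData() scrapedData { return &uptimeData{} }

func (s *uptimeScraper) Scrape(ctx context.Context, addr string) (scrapedData, error) {
	// A real implementation would issue a request to addr using ctx.
	return &uptimeData{}, nil
}

func (s *uptimeScraper) ProcessResult(ctx context.Context, data scrapedData) {
	d, ok := data.(*uptimeData)
	if !ok || d == nil {
		return
	}
	d.CheckedAt = time.Now() // stamp the result, as the metrics scraper stamps UpdateTime
	s.last = d
}

func main() {
	var s scraper = &uptimeScraper{}
	data, _ := s.Scrape(context.Background(), "pod:8080")
	s.ProcessResult(context.Background(), data)
	fmt.Printf("%s processed: %+v\n", s.Name(), data)
}
```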

pkg/epp/datastore/datastore_test.go (+3 -2)

```diff
@@ -353,8 +353,9 @@ func TestMetrics(t *testing.T) {
 			got := ds.PodGetAll()
 			metrics := []*backendmetrics.Metrics{}
 			for _, one := range got {
-				podMetrics := one.GetData()[backendmetrics.MetricsDataKey].(*backendmetrics.Metrics)
-				metrics = append(metrics, podMetrics)
+				if podMetrics, ok := one.GetData()[backendmetrics.MetricsDataKey].(*backendmetrics.Metrics); ok {
+					metrics = append(metrics, podMetrics)
+				}
 			}
 			diff := cmp.Diff(test.want, metrics, cmpopts.IgnoreFields(backendmetrics.Metrics{}, "UpdateTime"), cmpopts.SortSlices(func(a, b *backendmetrics.Metrics) bool {
 				return a.String() < b.String()
```
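The switch to the comma-ok form of the type assertion is what keeps the test from panicking when a pod has no metrics entry yet: asserting a concrete type on a nil interface value panics in the plain form but simply yields `ok == false` in the two-value form. A tiny stand-alone illustration (the `metrics` type and map key are made up for the example):

```go
// Stand-alone illustration of why the comma-ok assertion is safer: asserting
// a concrete type on a nil interface value (e.g. a missing map key) panics
// in the plain form.
package main

import "fmt"

type metrics struct{ Waiting int }

func main() {
	data := map[string]any{} // no "metrics" entry stored yet

	// Plain form would panic: m := data["metrics"].(*metrics)
	if m, ok := data["metrics"].(*metrics); ok {
		fmt.Println("waiting:", m.Waiting)
	} else {
		fmt.Println("no metrics scraped yet, skipping")
	}
}
```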

pkg/epp/scheduling/config.go (+3 -1)

```diff
@@ -16,7 +16,9 @@ limitations under the License.
 
 package scheduling
 
-import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
+import (
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
+)
 
 // NewSchedulerConfig creates a new SchedulerConfig object with the given plugins.
 func NewSchedulerConfig(preSchedulePlugins []plugins.PreSchedule, filters []plugins.Filter, scorers map[plugins.Scorer]int,
```

pkg/epp/scheduling/scheduler_test.go (+2 -1)

```diff
@@ -394,7 +394,8 @@ func TestSchedulePlugins(t *testing.T) {
 
 			// Validate output
 			wantPod := &types.PodData{
-				Pod: &backend.Pod{NamespacedName: test.wantTargetPod},
+				Pod:  &backend.Pod{NamespacedName: test.wantTargetPod},
+				Data: map[string]podinfo.ScrapedData{},
 			}
 			wantRes := &types.Result{TargetPod: wantPod}
 			if diff := cmp.Diff(wantRes, got); diff != "" {
```
