diff --git a/Tiltfile b/Tiltfile index a42fe43f4..6871d18b3 100644 --- a/Tiltfile +++ b/Tiltfile @@ -189,6 +189,13 @@ if 'nova' in ACTIVE_DEPLOYMENTS: trigger_mode=TRIGGER_MODE_MANUAL, auto_init=False, ) + local_resource( + 'Commitments E2E Tests', + '/bin/sh -c "kubectl exec deploy/cortex-nova-scheduling-controller-manager -- /manager e2e-commitments"', + labels=['Cortex-Nova'], + trigger_mode=TRIGGER_MODE_MANUAL, + auto_init=False, + ) if 'manila' in ACTIVE_DEPLOYMENTS: print("Activating Cortex Manila bundle") diff --git a/cmd/main.go b/cmd/main.go index 351bf29eb..5d8acf1e7 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -110,6 +110,10 @@ func main() { manilaChecksConfig := conf.GetConfigOrDie[manila.ChecksConfig]() manila.RunChecks(ctx, client, manilaChecksConfig) return + case "e2e-commitments": + commitmentsChecksConfig := conf.GetConfigOrDie[commitments.E2EChecksConfig]() + commitments.RunCommitmentsE2EChecks(ctx, commitmentsChecksConfig) + return } } @@ -665,7 +669,9 @@ func main() { if slices.Contains(mainConfig.EnabledTasks, "commitments-sync-task") { setupLog.Info("starting commitments syncer") - syncer := commitments.NewSyncer(multiclusterClient) + syncerMonitor := commitments.NewSyncerMonitor() + must.Succeed(metrics.Registry.Register(syncerMonitor)) + syncer := commitments.NewSyncer(multiclusterClient, syncerMonitor) syncerConfig := conf.GetConfigOrDie[commitments.SyncerConfig]() syncerDefaults := commitments.DefaultSyncerConfig() if syncerConfig.SyncInterval == 0 { diff --git a/go.mod b/go.mod index cf74f39f1..a6c3b9ce7 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,7 @@ require ( github.com/poy/onpar v0.3.5 // indirect github.com/prometheus/common v0.67.5 // indirect github.com/prometheus/procfs v0.17.0 // indirect - github.com/sapcc/go-api-declarations v1.20.2 + github.com/sapcc/go-api-declarations v1.21.0 github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/cobra v1.10.1 // indirect github.com/spf13/pflag v1.0.10 // indirect @@ -98,7 +98,7 
@@ require ( golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.42.0 // indirect - golang.org/x/term v0.41.0 // indirect + golang.org/x/term v0.41.0 golang.org/x/text v0.33.0 // indirect golang.org/x/time v0.14.0 // indirect gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect @@ -108,7 +108,7 @@ require ( google.golang.org/protobuf v1.36.11 // indirect gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + gopkg.in/yaml.v3 v3.0.1 gotest.tools v2.2.0+incompatible // indirect k8s.io/apiextensions-apiserver v0.35.0 // indirect k8s.io/apiserver v0.35.0 // indirect diff --git a/go.sum b/go.sum index 98a4f9477..6dba2b5cc 100644 --- a/go.sum +++ b/go.sum @@ -176,8 +176,8 @@ github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUO github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/sapcc/go-api-declarations v1.20.2 h1:GWqv8VgsF4k9id6N051AVTaEpcjT02APsOuz2yCvTPQ= -github.com/sapcc/go-api-declarations v1.20.2/go.mod h1:eiRrXXUeQS5C/1kKn8/KMjk0Y0goUzgDQswj30rH0Zc= +github.com/sapcc/go-api-declarations v1.21.0 h1:Ag6GXgJLTFdBDKmrJU4QFllQbgGSenSGeHpLuvuxeDk= +github.com/sapcc/go-api-declarations v1.21.0/go.mod h1:eiRrXXUeQS5C/1kKn8/KMjk0Y0goUzgDQswj30rH0Zc= github.com/sapcc/go-bits v0.0.0-20260312170110-034b497ebb7e h1:4wgkrfAlnL6ffM7HTNoHn1HrBBurCRR71WNOszdiDNQ= github.com/sapcc/go-bits v0.0.0-20260312170110-034b497ebb7e/go.mod h1:NZjMiGVm04U25vwR6ZWvMw0XOOnvS1jkmXpjiepOeUw= github.com/sergi/go-diff v1.4.0 h1:n/SP9D5ad1fORl+llWyN+D6qoUETXNZARKjyY2/KVCw= @@ -251,12 +251,8 @@ golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod 
h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY= -golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww= golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= diff --git a/helm/bundles/cortex-nova/alerts/nova.alerts.yaml b/helm/bundles/cortex-nova/alerts/nova.alerts.yaml index 9b80e079b..a69f3be51 100644 --- a/helm/bundles/cortex-nova/alerts/nova.alerts.yaml +++ b/helm/bundles/cortex-nova/alerts/nova.alerts.yaml @@ -276,7 +276,25 @@ groups: configuration. It is recommended to investigate the pipeline status and logs for more details. - # Committed Resource (Limes Integration) Alerts + # Committed Resource Info API Alerts + - alert: CortexNovaCommittedResourceInfoHttpRequest500sTooHigh + expr: rate(cortex_committed_resource_info_api_requests_total{service="cortex-nova-metrics", status_code=~"5.."}[5m]) > 0.1 + for: 5m + labels: + context: committed-resource-api + dashboard: cortex/cortex + service: cortex + severity: warning + support_group: workload-management + annotations: + summary: "Committed Resource info API HTTP 500 errors too high" + description: > + The committed resource info API (Limes LIQUID integration) is responding + with HTTP 5xx errors. 
This indicates internal problems building service info, + such as invalid flavor group data. Limes will not be able to discover available + resources until the issue is resolved. + + # Committed Resource Change API Alerts - alert: CortexNovaCommittedResourceHttpRequest400sTooHigh expr: rate(cortex_committed_resource_change_api_requests_total{service="cortex-nova-metrics", status_code=~"4.."}[5m]) > 0.1 for: 5m diff --git a/internal/scheduling/reservations/commitments/api.go b/internal/scheduling/reservations/commitments/api.go index daedb8a2f..3c6179d83 100644 --- a/internal/scheduling/reservations/commitments/api.go +++ b/internal/scheduling/reservations/commitments/api.go @@ -28,6 +28,7 @@ type HTTPAPI struct { monitor ChangeCommitmentsAPIMonitor usageMonitor ReportUsageAPIMonitor capacityMonitor ReportCapacityAPIMonitor + infoMonitor InfoAPIMonitor // Mutex to serialize change-commitments requests changeMutex sync.Mutex } @@ -44,6 +45,7 @@ func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaC monitor: NewChangeCommitmentsAPIMonitor(), usageMonitor: NewReportUsageAPIMonitor(), capacityMonitor: NewReportCapacityAPIMonitor(), + infoMonitor: NewInfoAPIMonitor(), } } @@ -51,6 +53,7 @@ func (api *HTTPAPI) Init(mux *http.ServeMux, registry prometheus.Registerer, log registry.MustRegister(&api.monitor) registry.MustRegister(&api.usageMonitor) registry.MustRegister(&api.capacityMonitor) + registry.MustRegister(&api.infoMonitor) mux.HandleFunc("/v1/commitments/change-commitments", api.HandleChangeCommitments) mux.HandleFunc("/v1/commitments/report-capacity", api.HandleReportCapacity) mux.HandleFunc("/v1/commitments/info", api.HandleInfo) diff --git a/internal/scheduling/reservations/commitments/api_info.go b/internal/scheduling/reservations/commitments/api_info.go index c189c859a..7c8c72748 100644 --- a/internal/scheduling/reservations/commitments/api_info.go +++ b/internal/scheduling/reservations/commitments/api_info.go @@ -6,9 +6,12 @@ package 
commitments import ( "context" "encoding/json" + "errors" "fmt" "net/http" + "strconv" "strings" + "time" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" "github.com/go-logr/logr" @@ -16,10 +19,16 @@ import ( liquid "github.com/sapcc/go-api-declarations/liquid" ) +// errInternalServiceInfo indicates an internal error while building service info (e.g., invalid unit configuration) +var errInternalServiceInfo = errors.New("internal error building service info") + // handles GET /v1/info requests from Limes: // See: https://github.com/sapcc/go-api-declarations/blob/main/liquid/commitment.go // See: https://pkg.go.dev/github.com/sapcc/go-api-declarations/liquid func (api *HTTPAPI) HandleInfo(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + statusCode := http.StatusOK + // Extract or generate request ID for tracing requestID := r.Header.Get("X-Request-ID") if requestID == "" { @@ -33,7 +42,9 @@ func (api *HTTPAPI) HandleInfo(w http.ResponseWriter, r *http.Request) { // Only accept GET method if r.Method != http.MethodGet { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + statusCode = http.StatusMethodNotAllowed + http.Error(w, "Method not allowed", statusCode) + api.recordInfoMetrics(statusCode, startTime) return } @@ -42,20 +53,35 @@ func (api *HTTPAPI) HandleInfo(w http.ResponseWriter, r *http.Request) { // Build info response info, err := api.buildServiceInfo(ctx, logger) if err != nil { - // Use Info level for expected conditions like knowledge not being ready yet - logger.Info("service info not available yet", "error", err.Error()) - http.Error(w, "Service temporarily unavailable: "+err.Error(), - http.StatusServiceUnavailable) + if errors.Is(err, errInternalServiceInfo) { + logger.Error(err, "internal error building service info") + statusCode = http.StatusInternalServerError + http.Error(w, "Internal server error: "+err.Error(), statusCode) + } else { + // Use Info level for expected conditions like 
knowledge not being ready yet + logger.Info("service info not available yet", "error", err.Error()) + statusCode = http.StatusServiceUnavailable + http.Error(w, "Service temporarily unavailable: "+err.Error(), statusCode) + } + api.recordInfoMetrics(statusCode, startTime) return } // Return response w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) + w.WriteHeader(statusCode) if err := json.NewEncoder(w).Encode(info); err != nil { logger.Error(err, "failed to encode service info") - return } + api.recordInfoMetrics(statusCode, startTime) +} + +// recordInfoMetrics records Prometheus metrics for an info API request. +func (api *HTTPAPI) recordInfoMetrics(statusCode int, startTime time.Time) { + duration := time.Since(startTime).Seconds() + statusCodeStr := strconv.Itoa(statusCode) + api.infoMonitor.requestCounter.WithLabelValues(statusCodeStr).Inc() + api.infoMonitor.requestDuration.WithLabelValues(statusCodeStr).Observe(duration) } // resourceAttributes holds the custom attributes for a resource in the info API response. 
@@ -108,9 +134,22 @@ func (api *HTTPAPI) buildServiceInfo(ctx context.Context, logger logr.Logger) (l attrsJSON = nil } + // Build unit from smallest flavor memory (e.g., "131072 MiB" for 128 GiB) + // Validate memory is positive to avoid panic in MultiplyBy (which panics on factor=0) + if groupData.SmallestFlavor.MemoryMB == 0 { + return liquid.ServiceInfo{}, fmt.Errorf("%w: flavor group %q has invalid smallest flavor with memoryMB=0", + errInternalServiceInfo, groupName) + } + unit, err := liquid.UnitMebibytes.MultiplyBy(groupData.SmallestFlavor.MemoryMB) + if err != nil { + // Note: This error only occurs on uint64 overflow, which is unrealistic for memory values + return liquid.ServiceInfo{}, fmt.Errorf("%w: failed to create unit for flavor group %q: %w", + errInternalServiceInfo, groupName, err) + } + resources[resourceName] = liquid.ResourceInfo{ DisplayName: displayName, - Unit: liquid.UnitNone, // Countable: multiples of smallest flavor instances + Unit: unit, // Non-standard unit: multiples of smallest flavor RAM Topology: liquid.AZAwareTopology, // Commitments are per-AZ NeedsResourceDemand: false, // Capacity planning out of scope for now HasCapacity: handlesCommitments, // We report capacity via /v1/report-capacity only for groups that accept commitments diff --git a/internal/scheduling/reservations/commitments/api_info_monitor.go b/internal/scheduling/reservations/commitments/api_info_monitor.go new file mode 100644 index 000000000..aaae937ef --- /dev/null +++ b/internal/scheduling/reservations/commitments/api_info_monitor.go @@ -0,0 +1,53 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package commitments + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// InfoAPIMonitor provides metrics for the CR info API. +type InfoAPIMonitor struct { + requestCounter *prometheus.CounterVec + requestDuration *prometheus.HistogramVec +} + +// NewInfoAPIMonitor creates a new monitor with Prometheus metrics. 
+// Metrics are pre-initialized with zero values for common HTTP status codes +// to ensure they appear in Prometheus before the first request. +func NewInfoAPIMonitor() InfoAPIMonitor { + m := InfoAPIMonitor{ + requestCounter: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_committed_resource_info_api_requests_total", + Help: "Total number of committed resource info API requests by HTTP status code", + }, []string{"status_code"}), + requestDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_committed_resource_info_api_request_duration_seconds", + Help: "Duration of committed resource info API requests in seconds by HTTP status code", + Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10}, + }, []string{"status_code"}), + } + + // Pre-initialize metrics with zero values for common HTTP status codes. + // This ensures metrics exist in Prometheus before the first request, + // preventing "metric missing" warnings in alerting rules. + for _, statusCode := range []string{"200", "405", "500", "503"} { + m.requestCounter.WithLabelValues(statusCode) + m.requestDuration.WithLabelValues(statusCode) + } + + return m +} + +// Describe implements prometheus.Collector. +func (m *InfoAPIMonitor) Describe(ch chan<- *prometheus.Desc) { + m.requestCounter.Describe(ch) + m.requestDuration.Describe(ch) +} + +// Collect implements prometheus.Collector. +func (m *InfoAPIMonitor) Collect(ch chan<- prometheus.Metric) { + m.requestCounter.Collect(ch) + m.requestDuration.Collect(ch) +} diff --git a/internal/scheduling/reservations/commitments/api_info_test.go b/internal/scheduling/reservations/commitments/api_info_test.go index 828a255ff..4f42cd2a2 100644 --- a/internal/scheduling/reservations/commitments/api_info_test.go +++ b/internal/scheduling/reservations/commitments/api_info_test.go @@ -28,9 +28,7 @@ func TestHandleInfo_KnowledgeNotReady(t *testing.T) { WithScheme(scheme). 
Build() - api := &HTTPAPI{ - client: k8sClient, - } + api := NewAPI(k8sClient) req := httptest.NewRequest(http.MethodGet, "/v1/info", http.NoBody) w := httptest.NewRecorder() @@ -62,9 +60,7 @@ func TestHandleInfo_MethodNotAllowed(t *testing.T) { WithScheme(scheme). Build() - api := &HTTPAPI{ - client: k8sClient, - } + api := NewAPI(k8sClient) // Use POST instead of GET req := httptest.NewRequest(http.MethodPost, "/v1/info", http.NoBody) @@ -80,6 +76,67 @@ func TestHandleInfo_MethodNotAllowed(t *testing.T) { } } +func TestHandleInfo_InvalidFlavorMemory(t *testing.T) { + // Test that a 500 Internal Server Error is returned when a flavor group has invalid data. + // + // A flavor with memoryMB=0 is invalid and should trigger an HTTP 500 error. + // Such data could occur from a bug in the flavor groups extractor. + scheme := runtime.NewScheme() + if err := v1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add scheme: %v", err) + } + + // Create flavor group with memoryMB=0 (invalid data that could come from a buggy extractor) + features := []map[string]interface{}{ + { + "name": "invalid_group", + "flavors": []map[string]interface{}{ + {"name": "zero_memory_flavor", "vcpus": 4, "memoryMB": 0, "diskGB": 50}, + }, + "largestFlavor": map[string]interface{}{"name": "zero_memory_flavor", "vcpus": 4, "memoryMB": 0, "diskGB": 50}, + "smallestFlavor": map[string]interface{}{"name": "zero_memory_flavor", "vcpus": 4, "memoryMB": 0, "diskGB": 50}, + "ramCoreRatio": 4096, + }, + } + + raw, err := v1alpha1.BoxFeatureList(features) + if err != nil { + t.Fatalf("failed to box features: %v", err) + } + + knowledge := &v1alpha1.Knowledge{ + ObjectMeta: v1.ObjectMeta{Name: "flavor-groups"}, + Spec: v1alpha1.KnowledgeSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Extractor: v1alpha1.KnowledgeExtractorSpec{Name: "flavor_groups"}, + }, + Status: v1alpha1.KnowledgeStatus{ + Conditions: []v1.Condition{{Type: v1alpha1.KnowledgeConditionReady, Status: "True"}}, + 
Raw: raw, + LastContentChange: v1.Now(), + }, + } + + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(knowledge). + Build() + + api := NewAPI(k8sClient) + + req := httptest.NewRequest(http.MethodGet, "/v1/info", http.NoBody) + w := httptest.NewRecorder() + api.HandleInfo(w, req) + + resp := w.Result() + defer resp.Body.Close() + + // Should return 500 Internal Server Error when unit creation fails + if resp.StatusCode != http.StatusInternalServerError { + t.Errorf("expected status code %d (Internal Server Error), got %d", http.StatusInternalServerError, resp.StatusCode) + } +} + func TestHandleInfo_HasCapacityEqualsHandlesCommitments(t *testing.T) { // Test that HasCapacity == HandlesCommitments for all resources // Both should be true only for groups with fixed RAM/core ratio @@ -138,7 +195,7 @@ func TestHandleInfo_HasCapacityEqualsHandlesCommitments(t *testing.T) { WithObjects(knowledge). Build() - api := &HTTPAPI{client: k8sClient} + api := NewAPI(k8sClient) req := httptest.NewRequest(http.MethodGet, "/v1/info", http.NoBody) w := httptest.NewRecorder() diff --git a/internal/scheduling/reservations/commitments/context.go b/internal/scheduling/reservations/commitments/context.go index 64257fcae..aabee0241 100644 --- a/internal/scheduling/reservations/commitments/context.go +++ b/internal/scheduling/reservations/commitments/context.go @@ -21,6 +21,12 @@ func WithNewGlobalRequestID(ctx context.Context) context.Context { return reservations.WithGlobalRequestID(ctx, "committed-resource-"+uuid.New().String()) } +// WithGlobalRequestID creates a new context with the specified global request ID. +// This is used to propagate existing request IDs (e.g., from the creator annotation). +func WithGlobalRequestID(ctx context.Context, greq string) context.Context { + return reservations.WithGlobalRequestID(ctx, greq) +} + // LoggerFromContext returns a logger with greq and req values from the context. 
// This creates a child logger with the request tracking values pre-attached, // so you don't need to repeat them in every log call. diff --git a/internal/scheduling/reservations/commitments/controller.go b/internal/scheduling/reservations/commitments/controller.go index e03958151..71530cb59 100644 --- a/internal/scheduling/reservations/commitments/controller.go +++ b/internal/scheduling/reservations/commitments/controller.go @@ -49,20 +49,21 @@ type CommitmentReservationController struct { // move the current state of the cluster closer to the desired state. // Note: This controller only handles commitment reservations, as filtered by the predicate. func (r *CommitmentReservationController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - ctx = WithNewGlobalRequestID(ctx) - logger := LoggerFromContext(ctx).WithValues("component", "controller", "reservation", req.Name) - - // Fetch the reservation object. + // Fetch the reservation object first to check for creator request ID. var res v1alpha1.Reservation if err := r.Get(ctx, req.NamespacedName, &res); err != nil { // Ignore not-found errors, since they can't be fixed by an immediate requeue return ctrl.Result{}, client.IgnoreNotFound(err) } - // Extract creator request ID from annotation for end-to-end traceability + // Use creator request ID from annotation for end-to-end traceability if available, + // otherwise generate a new one for this reconcile loop. 
if creatorReq := res.Annotations[v1alpha1.AnnotationCreatorRequestID]; creatorReq != "" { - logger = logger.WithValues("creatorReq", creatorReq) + ctx = WithGlobalRequestID(ctx, creatorReq) + } else { + ctx = WithNewGlobalRequestID(ctx) } + logger := LoggerFromContext(ctx).WithValues("component", "controller", "reservation", req.Name) // filter for CR reservations resourceName := "" diff --git a/internal/scheduling/reservations/commitments/e2e_checks.go b/internal/scheduling/reservations/commitments/e2e_checks.go new file mode 100644 index 000000000..6272979bf --- /dev/null +++ b/internal/scheduling/reservations/commitments/e2e_checks.go @@ -0,0 +1,77 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package commitments + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + + liquid "github.com/sapcc/go-api-declarations/liquid" + "github.com/sapcc/go-bits/must" +) + +const ( + // Default URL for the commitments API endpoint. + // This should match the service name in the helm chart. + defaultCommitmentsAPIURL = "http://cortex-nova-scheduler:8080" +) + +// E2EChecksConfig holds the configuration for CR e2e checks. +type E2EChecksConfig struct { + // Base URL for the commitments API. If empty, defaults to defaultCommitmentsAPIURL. + BaseURL string `json:"baseURL"` +} + +// CheckCommitmentsInfoEndpoint sends a GET request to the /v1/commitments/info endpoint +// and verifies that it returns HTTP 200 with a valid ServiceInfo response. 
+func CheckCommitmentsInfoEndpoint(ctx context.Context, config E2EChecksConfig) { + baseURL := config.BaseURL + if baseURL == "" { + baseURL = defaultCommitmentsAPIURL + } + apiURL := baseURL + "/v1/commitments/info" + slog.Info("checking commitments info endpoint", "apiURL", apiURL) + + httpReq := must.Return(http.NewRequestWithContext(ctx, http.MethodGet, apiURL, http.NoBody)) + httpReq.Header.Set("Accept", "application/json") + + //nolint:bodyclose // Body is closed in the deferred function below. + resp := must.Return(http.DefaultClient.Do(httpReq)) + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes := must.Return(io.ReadAll(resp.Body)) + slog.Error("commitments info API returned non-200 status code", + "statusCode", resp.StatusCode, + "responseBody", string(bodyBytes), + ) + panic(fmt.Sprintf("commitments info API returned status %d, expected 200", resp.StatusCode)) + } + + var serviceInfo liquid.ServiceInfo + if err := json.NewDecoder(resp.Body).Decode(&serviceInfo); err != nil { + panic(fmt.Sprintf("failed to decode ServiceInfo response: %v", err)) + } + + // Basic validation of the response + if serviceInfo.Version < 0 { + slog.Warn("commitments info returned version -1, knowledge may not be ready yet") + } + + slog.Info("commitments info endpoint check passed", + "version", serviceInfo.Version, + "resourceCount", len(serviceInfo.Resources), + ) +} + +// RunCommitmentsE2EChecks runs all e2e checks for the commitments API. 
+func RunCommitmentsE2EChecks(ctx context.Context, config E2EChecksConfig) { + slog.Info("running commitments e2e checks") + CheckCommitmentsInfoEndpoint(ctx, config) + slog.Info("all commitments e2e checks passed") +} diff --git a/internal/scheduling/reservations/commitments/syncer.go b/internal/scheduling/reservations/commitments/syncer.go index b7db11351..ace9488b2 100644 --- a/internal/scheduling/reservations/commitments/syncer.go +++ b/internal/scheduling/reservations/commitments/syncer.go @@ -42,12 +42,15 @@ type Syncer struct { CommitmentsClient // Kubernetes client for CRD operations client.Client + // Monitor for metrics + monitor *SyncerMonitor } -func NewSyncer(k8sClient client.Client) *Syncer { +func NewSyncer(k8sClient client.Client, monitor *SyncerMonitor) *Syncer { return &Syncer{ CommitmentsClient: NewCommitmentsClient(), Client: k8sClient, + monitor: monitor, } } @@ -58,7 +61,16 @@ func (s *Syncer) Init(ctx context.Context, config SyncerConfig) error { return nil } -func (s *Syncer) getCommitmentStates(ctx context.Context, log logr.Logger, flavorGroups map[string]compute.FlavorGroupFeature) ([]*CommitmentState, error) { +// getCommitmentStatesResult holds both processed and skipped commitment UUIDs +type getCommitmentStatesResult struct { + // states are the commitments that were successfully processed + states []*CommitmentState + // skippedUUIDs are commitment UUIDs that were skipped (e.g., due to unit mismatch) + // but should NOT have their existing CRDs deleted + skippedUUIDs map[string]bool +} + +func (s *Syncer) getCommitmentStates(ctx context.Context, log logr.Logger, flavorGroups map[string]compute.FlavorGroupFeature) (*getCommitmentStatesResult, error) { allProjects, err := s.ListProjects(ctx) if err != nil { return nil, err @@ -69,7 +81,10 @@ func (s *Syncer) getCommitmentStates(ctx context.Context, log logr.Logger, flavo } // Filter for compute commitments with RAM flavor group resources - var commitmentStates []*CommitmentState + 
result := &getCommitmentStatesResult{ + states: []*CommitmentState{}, + skippedUUIDs: make(map[string]bool), + } for id, commitment := range commitments { if commitment.ServiceType != "compute" { log.Info("skipping non-compute commitment", "id", id, "serviceType", commitment.ServiceType) continue } @@ -99,6 +114,29 @@ continue } + // Validate unit matches between Limes commitment and Cortex flavor group + // Expected format: "<memoryMB> MiB" e.g. "131072 MiB" for 128 GiB + expectedUnit := fmt.Sprintf("%d MiB", flavorGroup.SmallestFlavor.MemoryMB) + if commitment.Unit != "" && commitment.Unit != expectedUnit { + // Unit mismatch: Limes has not yet updated this commitment to the new unit. + // Skip this commitment - trust what Cortex already has stored in CRDs. + // On the next sync cycle after Limes updates, this will be processed. + log.V(0).Info("WARNING: skipping commitment due to unit mismatch - Limes unit differs from Cortex flavor group, waiting for Limes to update", + "commitmentUUID", commitment.UUID, + "flavorGroup", flavorGroupName, + "limesUnit", commitment.Unit, + "expectedUnit", expectedUnit, + "smallestFlavorMemoryMB", flavorGroup.SmallestFlavor.MemoryMB) + if s.monitor != nil { + s.monitor.RecordUnitMismatch(flavorGroupName) + } + // Track skipped commitment so its existing CRDs won't be deleted + if commitment.UUID != "" { + result.skippedUUIDs[commitment.UUID] = true + } + continue + } + // Skip commitments with empty UUID if commitment.UUID == "" { log.Info("skipping commitment with empty UUID", @@ -121,10 +159,10 @@ "amount", commitment.Amount, "totalMemoryBytes", state.TotalMemoryBytes) - commitmentStates = append(commitmentStates, state) + result.states = append(result.states, state) } - return commitmentStates, nil + return result, nil } // SyncReservations fetches commitments from Limes and synchronizes 
Reservation CRDs. @@ -158,7 +196,7 @@ func (s *Syncer) SyncReservations(ctx context.Context) error { } // Get all commitments as states - commitmentStates, err := s.getCommitmentStates(ctx, logger, flavorGroups) + commitmentResult, err := s.getCommitmentStates(ctx, logger, flavorGroups) if err != nil { logger.Error(err, "failed to get compute commitments") return err @@ -168,7 +206,7 @@ func (s *Syncer) SyncReservations(ctx context.Context) error { manager := NewReservationManager(s.Client) // Apply each commitment state using the manager - for _, state := range commitmentStates { + for _, state := range commitmentResult.states { logger.Info("applying commitment state", "commitmentUUID", state.CommitmentUUID, "projectID", state.ProjectID, @@ -194,11 +232,15 @@ func (s *Syncer) SyncReservations(ctx context.Context) error { return err } - // Build set of commitment UUIDs we should have + // Build set of commitment UUIDs we should have (processed + skipped) activeCommitments := make(map[string]bool) - for _, state := range commitmentStates { + for _, state := range commitmentResult.states { activeCommitments[state.CommitmentUUID] = true } + // Also include skipped commitments - don't delete their CRDs + for uuid := range commitmentResult.skippedUUIDs { + activeCommitments[uuid] = true + } // Delete reservations for commitments that no longer exist for _, existing := range existingReservations.Items { @@ -221,6 +263,8 @@ func (s *Syncer) SyncReservations(ctx context.Context) error { } } - logger.Info("synced reservations", "commitmentCount", len(commitmentStates)) + logger.Info("synced reservations", + "processedCount", len(commitmentResult.states), + "skippedCount", len(commitmentResult.skippedUUIDs)) return nil } diff --git a/internal/scheduling/reservations/commitments/syncer_monitor.go b/internal/scheduling/reservations/commitments/syncer_monitor.go new file mode 100644 index 000000000..0bbc7fe7f --- /dev/null +++ 
b/internal/scheduling/reservations/commitments/syncer_monitor.go @@ -0,0 +1,63 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package commitments + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// SyncerMonitor provides metrics for the commitment syncer. +type SyncerMonitor struct { + syncRuns prometheus.Counter + syncErrors prometheus.Counter + unitMismatch *prometheus.CounterVec +} + +// NewSyncerMonitor creates a new monitor with Prometheus metrics. +func NewSyncerMonitor() *SyncerMonitor { + m := &SyncerMonitor{ + syncRuns: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_committed_resource_syncer_runs_total", + Help: "Total number of commitment syncer runs", + }), + syncErrors: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_committed_resource_syncer_errors_total", + Help: "Total number of commitment syncer errors", + }), + unitMismatch: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_committed_resource_syncer_unit_mismatch_total", + Help: "Total number of commitments with unit mismatch between Limes and Cortex flavor group knowledge", + }, []string{"flavor_group"}), + } + return m +} + +// RecordUnitMismatch records a unit mismatch for a flavor group. +func (m *SyncerMonitor) RecordUnitMismatch(flavorGroup string) { + m.unitMismatch.WithLabelValues(flavorGroup).Inc() +} + +// RecordSyncRun records a syncer run. +func (m *SyncerMonitor) RecordSyncRun() { + m.syncRuns.Inc() +} + +// RecordSyncError records a syncer error. +func (m *SyncerMonitor) RecordSyncError() { + m.syncErrors.Inc() +} + +// Describe implements prometheus.Collector. +func (m *SyncerMonitor) Describe(ch chan<- *prometheus.Desc) { + m.syncRuns.Describe(ch) + m.syncErrors.Describe(ch) + m.unitMismatch.Describe(ch) +} + +// Collect implements prometheus.Collector. 
+func (m *SyncerMonitor) Collect(ch chan<- prometheus.Metric) { + m.syncRuns.Collect(ch) + m.syncErrors.Collect(ch) + m.unitMismatch.Collect(ch) +} diff --git a/internal/scheduling/reservations/commitments/syncer_test.go b/internal/scheduling/reservations/commitments/syncer_test.go index 75512299a..4c438ea40 100644 --- a/internal/scheduling/reservations/commitments/syncer_test.go +++ b/internal/scheduling/reservations/commitments/syncer_test.go @@ -429,6 +429,171 @@ func TestSyncer_SyncReservations_UpdateExisting(t *testing.T) { } } +func TestSyncer_SyncReservations_UnitMismatch(t *testing.T) { + scheme := runtime.NewScheme() + if err := v1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("Failed to add scheme: %v", err) + } + + // Create flavor group knowledge CRD with smallest flavor of 1024MB + flavorGroupsKnowledge := createFlavorGroupKnowledge(t, map[string]FlavorGroupData{ + "test_group_v1": { + LargestFlavorName: "test-flavor-large", + LargestFlavorVCPUs: 8, + LargestFlavorMemoryMB: 8192, + SmallestFlavorName: "test-flavor-small", + SmallestFlavorVCPUs: 2, + SmallestFlavorMemoryMB: 1024, + }, + }) + + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(flavorGroupsKnowledge). 
+ Build() + + // Create mock commitment with a unit that doesn't match Cortex's understanding + // Limes says "2048 MiB" but Cortex's smallest flavor is 1024 MB + mockCommitments := []Commitment{ + { + ID: 1, + UUID: "unit-mismatch-test-uuid", + ServiceType: "compute", + ResourceName: "ram_test_group_v1", + AvailabilityZone: "az1", + Amount: 2, + Unit: "2048 MiB", // Mismatched unit - should be "1024 MiB" + ProjectID: "test-project", + DomainID: "test-domain", + }, + } + + // Create monitor to capture the mismatch metric + monitor := NewSyncerMonitor() + + mockClient := &mockCommitmentsClient{ + listCommitmentsByIDFunc: func(ctx context.Context, projects ...Project) (map[string]Commitment, error) { + result := make(map[string]Commitment) + for _, c := range mockCommitments { + result[c.UUID] = c + } + return result, nil + }, + listProjectsFunc: func(ctx context.Context) ([]Project, error) { + return []Project{ + {ID: "test-project", DomainID: "test-domain", Name: "Test Project"}, + }, nil + }, + } + + syncer := &Syncer{ + CommitmentsClient: mockClient, + Client: k8sClient, + monitor: monitor, + } + + err := syncer.SyncReservations(context.Background()) + if err != nil { + t.Errorf("SyncReservations() error = %v", err) + return + } + + // Verify that NO reservations were created due to unit mismatch + // The commitment is skipped and Cortex trusts existing CRDs + var reservations v1alpha1.ReservationList + err = k8sClient.List(context.Background(), &reservations) + if err != nil { + t.Errorf("Failed to list reservations: %v", err) + return + } + + // Should have 0 reservations - commitment is skipped due to unit mismatch + // Cortex waits for Limes to update the unit before processing + if len(reservations.Items) != 0 { + t.Errorf("Expected 0 reservations (commitment skipped due to unit mismatch), got %d", len(reservations.Items)) + } +} + +func TestSyncer_SyncReservations_UnitMatch(t *testing.T) { + scheme := runtime.NewScheme() + if err := 
v1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("Failed to add scheme: %v", err) + } + + // Create flavor group knowledge CRD with smallest flavor of 1024MB + flavorGroupsKnowledge := createFlavorGroupKnowledge(t, map[string]FlavorGroupData{ + "test_group_v1": { + LargestFlavorName: "test-flavor-large", + LargestFlavorVCPUs: 8, + LargestFlavorMemoryMB: 8192, + SmallestFlavorName: "test-flavor-small", + SmallestFlavorVCPUs: 2, + SmallestFlavorMemoryMB: 1024, + }, + }) + + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(flavorGroupsKnowledge). + Build() + + // Create mock commitment with correct unit matching Cortex's smallest flavor + mockCommitments := []Commitment{ + { + ID: 1, + UUID: "unit-match-test-uuid", + ServiceType: "compute", + ResourceName: "ram_test_group_v1", + AvailabilityZone: "az1", + Amount: 2, + Unit: "1024 MiB", // Correct unit matching smallest flavor + ProjectID: "test-project", + DomainID: "test-domain", + }, + } + + monitor := NewSyncerMonitor() + + mockClient := &mockCommitmentsClient{ + listCommitmentsByIDFunc: func(ctx context.Context, projects ...Project) (map[string]Commitment, error) { + result := make(map[string]Commitment) + for _, c := range mockCommitments { + result[c.UUID] = c + } + return result, nil + }, + listProjectsFunc: func(ctx context.Context) ([]Project, error) { + return []Project{ + {ID: "test-project", DomainID: "test-domain", Name: "Test Project"}, + }, nil + }, + } + + syncer := &Syncer{ + CommitmentsClient: mockClient, + Client: k8sClient, + monitor: monitor, + } + + err := syncer.SyncReservations(context.Background()) + if err != nil { + t.Errorf("SyncReservations() error = %v", err) + return + } + + // Verify that reservations were created + var reservations v1alpha1.ReservationList + err = k8sClient.List(context.Background(), &reservations) + if err != nil { + t.Errorf("Failed to list reservations: %v", err) + return + } + + if len(reservations.Items) != 2 { + t.Errorf("Expected 2 
reservations, got %d", len(reservations.Items)) + } +} + func TestSyncer_SyncReservations_EmptyUUID(t *testing.T) { scheme := runtime.NewScheme() if err := v1alpha1.AddToScheme(scheme); err != nil { diff --git a/internal/scheduling/reservations/scheduler_client.go b/internal/scheduling/reservations/scheduler_client.go index e89628b34..db7829e23 100644 --- a/internal/scheduling/reservations/scheduler_client.go +++ b/internal/scheduling/reservations/scheduler_client.go @@ -118,6 +118,7 @@ func (c *SchedulerClient) ScheduleReservation(ctx context.Context, req ScheduleR Context: api.NovaRequestContext{ RequestID: RequestIDFromContext(ctx), GlobalRequestID: globalReqID, + ProjectID: req.ProjectID, }, Spec: api.NovaObject[api.NovaSpec]{ Data: api.NovaSpec{