Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,9 @@ func main() {

if slices.Contains(mainConfig.EnabledTasks, "commitments-sync-task") {
setupLog.Info("starting commitments syncer")
syncer := commitments.NewSyncer(multiclusterClient)
syncerMonitor := commitments.NewSyncerMonitor()
must.Succeed(metrics.Registry.Register(syncerMonitor))
syncer := commitments.NewSyncer(multiclusterClient, syncerMonitor)
syncerConfig := conf.GetConfigOrDie[commitments.SyncerConfig]()
syncerDefaults := commitments.DefaultSyncerConfig()
if syncerConfig.SyncInterval == 0 {
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ require (
github.com/poy/onpar v0.3.5 // indirect
github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/procfs v0.17.0 // indirect
github.com/sapcc/go-api-declarations v1.20.2
github.com/sapcc/go-api-declarations v1.21.0
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cobra v1.10.1 // indirect
github.com/spf13/pflag v1.0.10 // indirect
Expand All @@ -98,7 +98,7 @@ require (
golang.org/x/oauth2 v0.34.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/term v0.41.0 // indirect
golang.org/x/term v0.41.0
golang.org/x/text v0.33.0 // indirect
golang.org/x/time v0.14.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect
Expand All @@ -108,7 +108,7 @@ require (
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gopkg.in/yaml.v3 v3.0.1
gotest.tools v2.2.0+incompatible // indirect
k8s.io/apiextensions-apiserver v0.35.0 // indirect
k8s.io/apiserver v0.35.0 // indirect
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUO
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sapcc/go-api-declarations v1.20.2 h1:GWqv8VgsF4k9id6N051AVTaEpcjT02APsOuz2yCvTPQ=
github.com/sapcc/go-api-declarations v1.20.2/go.mod h1:eiRrXXUeQS5C/1kKn8/KMjk0Y0goUzgDQswj30rH0Zc=
github.com/sapcc/go-api-declarations v1.21.0 h1:Ag6GXgJLTFdBDKmrJU4QFllQbgGSenSGeHpLuvuxeDk=
github.com/sapcc/go-api-declarations v1.21.0/go.mod h1:eiRrXXUeQS5C/1kKn8/KMjk0Y0goUzgDQswj30rH0Zc=
github.com/sapcc/go-bits v0.0.0-20260312170110-034b497ebb7e h1:4wgkrfAlnL6ffM7HTNoHn1HrBBurCRR71WNOszdiDNQ=
github.com/sapcc/go-bits v0.0.0-20260312170110-034b497ebb7e/go.mod h1:NZjMiGVm04U25vwR6ZWvMw0XOOnvS1jkmXpjiepOeUw=
github.com/sergi/go-diff v1.4.0 h1:n/SP9D5ad1fORl+llWyN+D6qoUETXNZARKjyY2/KVCw=
Expand Down Expand Up @@ -249,12 +249,8 @@ golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY=
golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww=
golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU=
golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
Expand Down
12 changes: 11 additions & 1 deletion internal/scheduling/reservations/commitments/api_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,19 @@ func (api *HTTPAPI) buildServiceInfo(ctx context.Context, logger logr.Logger) (l
attrsJSON = nil
}

// Build unit from smallest flavor memory (e.g., "131072 MiB" for 128 GiB)
unit, err := liquid.UnitMebibytes.MultiplyBy(groupData.SmallestFlavor.MemoryMB)
if err != nil {
logger.Error(err, "failed to create unit for flavor group",
"flavorGroup", groupName,
"smallestFlavorMemoryMB", groupData.SmallestFlavor.MemoryMB)
// Fall back to UnitNone if unit creation fails (should not happen in practice)
unit = liquid.UnitNone
}

resources[resourceName] = liquid.ResourceInfo{
DisplayName: displayName,
Unit: liquid.UnitNone, // Countable: multiples of smallest flavor instances
Unit: unit, // Non-standard unit: multiples of smallest flavor RAM
Comment on lines +111 to +123
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

go-api-declarations liquid Unit MultiplyBy method sapcc

💡 Result:

In github.com/sapcc/go-api-declarations/liquid, there is no Unit.MultiplyBy method. Instead, unit conversion is modeled via:

  • func (u Unit) Base() (Unit, uint64) which returns the base unit plus a multiplier to convert values in u into that base unit. For all binary-size units (KiB, MiB, …), the base unit is B (bytes); otherwise it returns (u, 1). [1][2]

Multiplier mapping (from Base()): KiB=1<<10, MiB=1<<20, GiB=1<<30, TiB=1<<40, PiB=1<<50, EiB=1<<60. [2]

How to “multiply by” in practice:

base, mul := u.Base()
valueInBase := value * mul

[1][2]

Sources: [1] [2]


Fix non-existent MultiplyBy API call — use Unit.Base() instead.

The liquid.UnitMebibytes.MultiplyBy() method does not exist in go-api-declarations/liquid. Use the Base() method to convert units:

baseUnit, multiplier := liquid.UnitMebibytes.Base()
// Then use: groupData.SmallestFlavor.MemoryMB * multiplier
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/scheduling/reservations/commitments/api_info.go` around lines 111 -
123, The code calls a non-existent method UnitMebibytes.MultiplyBy; replace that
logic by calling liquid.UnitMebibytes.Base() to get (baseUnit, multiplier), set
unit = baseUnit (or liquid.UnitNone on error), and compute any scaled value by
multiplying groupData.SmallestFlavor.MemoryMB by multiplier before assigning
into resources; update references around UnitMebibytes, Unit.Base(),
groupData.SmallestFlavor.MemoryMB, and the resources[resourceName] assignment to
use baseUnit and the computed scaled value.

Topology: liquid.AZAwareTopology, // Commitments are per-AZ
NeedsResourceDemand: false, // Capacity planning out of scope for now
HasCapacity: handlesCommitments, // We report capacity via /v1/report-capacity only for groups that accept commitments
Expand Down
6 changes: 6 additions & 0 deletions internal/scheduling/reservations/commitments/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ func WithNewGlobalRequestID(ctx context.Context) context.Context {
return reservations.WithGlobalRequestID(ctx, "committed-resource-"+uuid.New().String())
}

// WithGlobalRequestID creates a new context with the specified global request ID.
// This is used to propagate existing request IDs (e.g., from the creator annotation).
func WithGlobalRequestID(ctx context.Context, greq string) context.Context {
return reservations.WithGlobalRequestID(ctx, greq)
}

// LoggerFromContext returns a logger with greq and req values from the context.
// This creates a child logger with the request tracking values pre-attached,
// so you don't need to repeat them in every log call.
Expand Down
13 changes: 7 additions & 6 deletions internal/scheduling/reservations/commitments/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,21 @@ type CommitmentReservationController struct {
// move the current state of the cluster closer to the desired state.
// Note: This controller only handles commitment reservations, as filtered by the predicate.
func (r *CommitmentReservationController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
ctx = WithNewGlobalRequestID(ctx)
logger := LoggerFromContext(ctx).WithValues("component", "controller", "reservation", req.Name)

// Fetch the reservation object.
// Fetch the reservation object first to check for creator request ID.
var res v1alpha1.Reservation
if err := r.Get(ctx, req.NamespacedName, &res); err != nil {
// Ignore not-found errors, since they can't be fixed by an immediate requeue
return ctrl.Result{}, client.IgnoreNotFound(err)
}

// Extract creator request ID from annotation for end-to-end traceability
// Use creator request ID from annotation for end-to-end traceability if available,
// otherwise generate a new one for this reconcile loop.
if creatorReq := res.Annotations[v1alpha1.AnnotationCreatorRequestID]; creatorReq != "" {
logger = logger.WithValues("creatorReq", creatorReq)
ctx = WithGlobalRequestID(ctx, creatorReq)
} else {
ctx = WithNewGlobalRequestID(ctx)
}
logger := LoggerFromContext(ctx).WithValues("component", "controller", "reservation", req.Name)

// filter for CR reservations
resourceName := ""
Expand Down
64 changes: 54 additions & 10 deletions internal/scheduling/reservations/commitments/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ type Syncer struct {
CommitmentsClient
// Kubernetes client for CRD operations
client.Client
// Monitor for metrics
monitor *SyncerMonitor
}

func NewSyncer(k8sClient client.Client) *Syncer {
func NewSyncer(k8sClient client.Client, monitor *SyncerMonitor) *Syncer {
return &Syncer{
CommitmentsClient: NewCommitmentsClient(),
Client: k8sClient,
monitor: monitor,
}
}

Expand All @@ -58,7 +61,16 @@ func (s *Syncer) Init(ctx context.Context, config SyncerConfig) error {
return nil
}

func (s *Syncer) getCommitmentStates(ctx context.Context, log logr.Logger, flavorGroups map[string]compute.FlavorGroupFeature) ([]*CommitmentState, error) {
// getCommitmentStatesResult holds both processed and skipped commitment UUIDs
type getCommitmentStatesResult struct {
// states are the commitments that were successfully processed
states []*CommitmentState
// skippedUUIDs are commitment UUIDs that were skipped (e.g., due to unit mismatch)
// but should NOT have their existing CRDs deleted
skippedUUIDs map[string]bool
}

func (s *Syncer) getCommitmentStates(ctx context.Context, log logr.Logger, flavorGroups map[string]compute.FlavorGroupFeature) (*getCommitmentStatesResult, error) {
allProjects, err := s.ListProjects(ctx)
if err != nil {
return nil, err
Expand All @@ -69,7 +81,10 @@ func (s *Syncer) getCommitmentStates(ctx context.Context, log logr.Logger, flavo
}

// Filter for compute commitments with RAM flavor group resources
var commitmentStates []*CommitmentState
result := &getCommitmentStatesResult{
states: []*CommitmentState{},
skippedUUIDs: make(map[string]bool),
}
for id, commitment := range commitments {
if commitment.ServiceType != "compute" {
log.Info("skipping non-compute commitment", "id", id, "serviceType", commitment.ServiceType)
Expand Down Expand Up @@ -99,6 +114,29 @@ func (s *Syncer) getCommitmentStates(ctx context.Context, log logr.Logger, flavo
continue
}

// Validate unit matches between Limes commitment and Cortex flavor group
// Expected format: "<memoryMB> MiB" e.g. "131072 MiB" for 128 GiB
expectedUnit := fmt.Sprintf("%d MiB", flavorGroup.SmallestFlavor.MemoryMB)
if commitment.Unit != "" && commitment.Unit != expectedUnit {
// Unit mismatch: Limes has not yet updated this commitment to the new unit.
// Skip this commitment - trust what Cortex already has stored in CRDs.
// On the next sync cycle after Limes updates, this will be processed.
log.V(0).Info("WARNING: skipping commitment due to unit mismatch - Limes unit differs from Cortex flavor group, waiting for Limes to update",
"commitmentUUID", commitment.UUID,
"flavorGroup", flavorGroupName,
"limesUnit", commitment.Unit,
"expectedUnit", expectedUnit,
"smallestFlavorMemoryMB", flavorGroup.SmallestFlavor.MemoryMB)
if s.monitor != nil {
s.monitor.RecordUnitMismatch(flavorGroupName)
}
// Track skipped commitment so its existing CRDs won't be deleted
if commitment.UUID != "" {
result.skippedUUIDs[commitment.UUID] = true
}
continue
}

// Skip commitments with empty UUID
if commitment.UUID == "" {
log.Info("skipping commitment with empty UUID",
Expand All @@ -121,10 +159,10 @@ func (s *Syncer) getCommitmentStates(ctx context.Context, log logr.Logger, flavo
"amount", commitment.Amount,
"totalMemoryBytes", state.TotalMemoryBytes)

commitmentStates = append(commitmentStates, state)
result.states = append(result.states, state)
}

return commitmentStates, nil
return result, nil
}

// SyncReservations fetches commitments from Limes and synchronizes Reservation CRDs.
Expand Down Expand Up @@ -158,7 +196,7 @@ func (s *Syncer) SyncReservations(ctx context.Context) error {
}

// Get all commitments as states
commitmentStates, err := s.getCommitmentStates(ctx, logger, flavorGroups)
commitmentResult, err := s.getCommitmentStates(ctx, logger, flavorGroups)
if err != nil {
logger.Error(err, "failed to get compute commitments")
return err
Expand All @@ -168,7 +206,7 @@ func (s *Syncer) SyncReservations(ctx context.Context) error {
manager := NewReservationManager(s.Client)

// Apply each commitment state using the manager
for _, state := range commitmentStates {
for _, state := range commitmentResult.states {
logger.Info("applying commitment state",
"commitmentUUID", state.CommitmentUUID,
"projectID", state.ProjectID,
Expand All @@ -194,11 +232,15 @@ func (s *Syncer) SyncReservations(ctx context.Context) error {
return err
}

// Build set of commitment UUIDs we should have
// Build set of commitment UUIDs we should have (processed + skipped)
activeCommitments := make(map[string]bool)
for _, state := range commitmentStates {
for _, state := range commitmentResult.states {
activeCommitments[state.CommitmentUUID] = true
}
// Also include skipped commitments - don't delete their CRDs
for uuid := range commitmentResult.skippedUUIDs {
activeCommitments[uuid] = true
}

// Delete reservations for commitments that no longer exist
for _, existing := range existingReservations.Items {
Expand All @@ -221,6 +263,8 @@ func (s *Syncer) SyncReservations(ctx context.Context) error {
}
}

logger.Info("synced reservations", "commitmentCount", len(commitmentStates))
logger.Info("synced reservations",
"processedCount", len(commitmentResult.states),
"skippedCount", len(commitmentResult.skippedUUIDs))
return nil
}
63 changes: 63 additions & 0 deletions internal/scheduling/reservations/commitments/syncer_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package commitments

import (
"github.com/prometheus/client_golang/prometheus"
)

// SyncerMonitor provides metrics for the commitment syncer.
type SyncerMonitor struct {
syncRuns prometheus.Counter
syncErrors prometheus.Counter
unitMismatch *prometheus.CounterVec
}

// NewSyncerMonitor creates a new monitor with Prometheus metrics.
func NewSyncerMonitor() *SyncerMonitor {
m := &SyncerMonitor{
syncRuns: prometheus.NewCounter(prometheus.CounterOpts{
Name: "cortex_committed_resource_syncer_runs_total",
Help: "Total number of commitment syncer runs",
}),
syncErrors: prometheus.NewCounter(prometheus.CounterOpts{
Name: "cortex_committed_resource_syncer_errors_total",
Help: "Total number of commitment syncer errors",
}),
unitMismatch: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_committed_resource_syncer_unit_mismatch_total",
Help: "Total number of commitments with unit mismatch between Limes and Cortex flavor group knowledge",
}, []string{"flavor_group"}),
}
return m
}

// RecordUnitMismatch records a unit mismatch for a flavor group.
func (m *SyncerMonitor) RecordUnitMismatch(flavorGroup string) {
m.unitMismatch.WithLabelValues(flavorGroup).Inc()
}

// RecordSyncRun records a syncer run.
func (m *SyncerMonitor) RecordSyncRun() {
m.syncRuns.Inc()
}

// RecordSyncError records a syncer error.
func (m *SyncerMonitor) RecordSyncError() {
m.syncErrors.Inc()
}

// Describe implements prometheus.Collector.
func (m *SyncerMonitor) Describe(ch chan<- *prometheus.Desc) {
m.syncRuns.Describe(ch)
m.syncErrors.Describe(ch)
m.unitMismatch.Describe(ch)
}

// Collect implements prometheus.Collector.
func (m *SyncerMonitor) Collect(ch chan<- prometheus.Metric) {
m.syncRuns.Collect(ch)
m.syncErrors.Collect(ch)
m.unitMismatch.Collect(ch)
}
Loading
Loading