Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/v1alpha1/reservation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ const (
ReservationTypeLabelFailover = "failover"
)

// Annotation keys for Reservation metadata.
const (
// AnnotationCreatorRequestID tracks the request ID that created this reservation.
// Used for end-to-end traceability across API calls, controller reconciles, and scheduler invocations.
AnnotationCreatorRequestID = "reservations.cortex.cloud/creator-request-id"
)

// CommittedResourceAllocation represents a workload's assignment to a committed resource reservation slot.
// The workload could be a VM (Nova/IronCore), Pod (Kubernetes), or other resource.
type CommittedResourceAllocation struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ func (api *HTTPAPI) HandleChangeCommitments(w http.ResponseWriter, r *http.Reque
req := liquid.CommitmentChangeRequest{}
statusCode := http.StatusOK

// Extract or generate request ID for tracing - always set in response header
requestID := r.Header.Get("X-Request-ID")
if requestID == "" {
requestID = uuid.New().String()
}
w.Header().Set("X-Request-ID", requestID)

// Check if API is enabled
if !api.config.EnableChangeCommitmentsAPI {
statusCode = http.StatusServiceUnavailable
Expand All @@ -61,11 +68,6 @@ func (api *HTTPAPI) HandleChangeCommitments(w http.ResponseWriter, r *http.Reque
api.changeMutex.Lock()
defer api.changeMutex.Unlock()

// Extract or generate request ID for tracing
requestID := r.Header.Get("X-Request-ID")
if requestID == "" {
requestID = uuid.New().String()
}
ctx := reservations.WithGlobalRequestID(context.Background(), "committed-resource-"+requestID)
logger := LoggerFromContext(ctx).WithValues("component", "api", "endpoint", "/v1/change-commitments")

Expand Down Expand Up @@ -132,7 +134,7 @@ func (api *HTTPAPI) processCommitmentChanges(ctx context.Context, w http.Respons
manager := NewReservationManager(api.client)
requireRollback := false
failedCommitments := make(map[string]string) // commitmentUUID to reason for failure, for better response messages in case of rollback
logger.Info("processing commitment change request", "availabilityZone", req.AZ, "dryRun", req.DryRun, "affectedProjects", len(req.ByProject))
creatorRequestID := reservations.GlobalRequestIDFromContext(ctx)

knowledge := &reservations.FlavorGroupKnowledgeClient{Client: api.client}
flavorGroups, err := knowledge.GetAllFlavorGroups(ctx, nil)
Expand Down Expand Up @@ -194,8 +196,7 @@ ProcessLoop:
}

for _, commitment := range resourceChanges.Commitments {
// Additional per-commitment validation if needed
logger.Info("processing commitment change", "commitmentUUID", commitment.UUID, "projectID", projectID, "resourceName", resourceName, "oldStatus", commitment.OldStatus.UnwrapOr("none"), "newStatus", commitment.NewStatus.UnwrapOr("none"))
logger.V(1).Info("processing commitment", "commitmentUUID", commitment.UUID, "oldStatus", commitment.OldStatus.UnwrapOr("none"), "newStatus", commitment.NewStatus.UnwrapOr("none"))

// TODO add configurable upper limit validation for commitment size (number of instances) to prevent excessive reservation creation
// TODO add domain
Expand Down Expand Up @@ -247,8 +248,10 @@ ProcessLoop:
requireRollback = true
break ProcessLoop
}
// Set creator request ID for traceability across controller reconciles
stateDesired.CreatorRequestID = creatorRequestID

logger.Info("applying commitment state change", "commitmentUUID", commitment.UUID, "oldState", stateBefore, "desiredState", stateDesired)
logger.V(1).Info("applying commitment state change", "commitmentUUID", commitment.UUID, "oldMemory", stateBefore.TotalMemoryBytes, "desiredMemory", stateDesired.TotalMemoryBytes)

touchedReservations, deletedReservations, err := manager.ApplyCommitmentState(ctx, logger, stateDesired, flavorGroups, "changeCommitmentsApi")
if err != nil {
Expand All @@ -257,7 +260,7 @@ ProcessLoop:
requireRollback = true
break ProcessLoop
}
logger.Info("applied commitment state change", "commitmentUUID", commitment.UUID, "touchedReservations", len(touchedReservations), "deletedReservations", len(deletedReservations))
logger.V(1).Info("applied commitment state change", "commitmentUUID", commitment.UUID, "touchedReservations", len(touchedReservations), "deletedReservations", len(deletedReservations))
reservationsToWatch = append(reservationsToWatch, touchedReservations...)
}
}
Expand Down Expand Up @@ -318,6 +321,7 @@ ProcessLoop:
}

// watchReservationsUntilReady polls until all reservations reach Ready=True or timeout.
// Returns failed reservations and any errors encountered.
func watchReservationsUntilReady(
ctx context.Context,
logger logr.Logger,
Expand All @@ -332,19 +336,32 @@ func watchReservationsUntilReady(
}

deadline := time.Now().Add(timeout)
startTime := time.Now()
totalReservations := len(reservations)

reservationsToWatch := make([]v1alpha1.Reservation, len(reservations))
copy(reservationsToWatch, reservations)

// Track successful reservations for summary
var successfulReservations []string
pollCount := 0

for {
pollCount++
var stillWaiting []v1alpha1.Reservation
if time.Now().After(deadline) {
errors = append(errors, fmt.Errorf("timeout after %v waiting for reservations to become ready", timeout))
// Log summary on timeout
logger.Info("reservation watch completed (timeout)",
"total", totalReservations,
"ready", len(successfulReservations),
"failed", len(failedReservations),
"timedOut", len(reservationsToWatch),
"duration", time.Since(startTime).Round(time.Millisecond),
"polls", pollCount)
return failedReservations, errors
}

allAreReady := true

for _, res := range reservationsToWatch {
// Fetch current state
var current v1alpha1.Reservation
Expand All @@ -354,9 +371,7 @@ func watchReservationsUntilReady(
}

if err := k8sClient.Get(ctx, nn, &current); err != nil {
allAreReady = false
// Reservation is still in process of being created, or there is a transient error, continue waiting for it
logger.V(1).Info("transient error getting reservation, will retry", "reservation", res.Name, "error", err)
// Reservation is still in process of being created, or there is a transient error
stillWaiting = append(stillWaiting, res)
continue
}
Expand All @@ -369,43 +384,40 @@ func watchReservationsUntilReady(

if readyCond == nil {
// Condition not set yet, keep waiting
allAreReady = false
stillWaiting = append(stillWaiting, res)
continue
}

switch readyCond.Status {
case metav1.ConditionTrue:
// check if host is not set in spec or status: if so, no capacity left to schedule the reservation
// Only consider truly ready if Status.Host is populated
if current.Spec.TargetHost == "" || current.Status.Host == "" {
allAreReady = false
failedReservations = append(failedReservations, current)
logger.Info("insufficient capacity for reservation", "reservation", current.Name, "reason", readyCond.Reason, "message", readyCond.Message, "targetHostInSpec", current.Spec.TargetHost, "hostInStatus", current.Status.Host)
} else {
// Reservation is successfully scheduled, no further action needed
logger.Info("reservation ready", "reservation", current.Name, "host", current.Spec.TargetHost)
stillWaiting = append(stillWaiting, res)
continue
}
// Reservation is successfully scheduled - track for summary
successfulReservations = append(successfulReservations, current.Name)

case metav1.ConditionFalse:
failedReservations = append(failedReservations, res)
// Any failure reason counts as failed
failedReservations = append(failedReservations, current)
case metav1.ConditionUnknown:
allAreReady = false
stillWaiting = append(stillWaiting, res)
}
}

if allAreReady || len(stillWaiting) == 0 {
logger.Info("all reservations checked",
"failed", len(failedReservations))
if len(stillWaiting) == 0 {
// All reservations have reached a terminal state - log summary
logger.Info("reservation watch completed",
"total", totalReservations,
"ready", len(successfulReservations),
"failed", len(failedReservations),
"duration", time.Since(startTime).Round(time.Millisecond),
"polls", pollCount)
return failedReservations, errors
}

reservationsToWatch = stillWaiting
// Log progress
logger.V(1).Info("waiting for reservations to become ready",
"notReady", len(reservationsToWatch),
"total", len(reservations),
"timeRemaining", time.Until(deadline).Round(time.Second))

// Wait before next poll
select {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ type ChangeCommitmentsAPIMonitor struct {
}

// NewChangeCommitmentsAPIMonitor creates a new monitor with Prometheus metrics.
// Metrics are pre-initialized with zero values for common HTTP status codes
// to ensure they appear in Prometheus before the first request.
func NewChangeCommitmentsAPIMonitor() ChangeCommitmentsAPIMonitor {
return ChangeCommitmentsAPIMonitor{
m := ChangeCommitmentsAPIMonitor{
requestCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_committed_resource_change_api_requests_total",
Help: "Total number of committed resource change API requests by HTTP status code",
Expand All @@ -35,6 +37,21 @@ func NewChangeCommitmentsAPIMonitor() ChangeCommitmentsAPIMonitor {
Help: "Total number of commitment change requests that timed out while waiting for reservations to become ready",
}),
}

// Pre-initialize metrics with zero values for common HTTP status codes.
// This ensures metrics exist in Prometheus before the first request,
// preventing "metric missing" warnings in alerting rules.
for _, statusCode := range []string{"200", "400", "409", "500", "503"} {
m.requestCounter.WithLabelValues(statusCode)
m.requestDuration.WithLabelValues(statusCode)
}

// Pre-initialize commitment change result labels
for _, result := range []string{"accepted", "rejected"} {
m.commitmentChanges.WithLabelValues(result)
}

return m
}

// Describe implements prometheus.Collector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@ func TestChangeCommitmentsAPIMonitor_MetricLabels(t *testing.T) {
// Verify request counter has correct labels
for _, family := range families {
if *family.Name == "cortex_committed_resource_change_api_requests_total" {
if len(family.Metric) != 3 {
t.Errorf("Expected 3 request counter metrics, got %d", len(family.Metric))
// At minimum we expect the 3 labels we added (200, 409, 503)
// Plus pre-initialized labels (400, 500) - so >= 5 total
if len(family.Metric) < 3 {
t.Errorf("Expected at least 3 request counter metrics, got %d", len(family.Metric))
}

// Check label names
// Check all metrics have the status_code label
for _, metric := range family.Metric {
labelNames := make(map[string]bool)
for _, label := range metric.Label {
Expand All @@ -120,11 +122,13 @@ func TestChangeCommitmentsAPIMonitor_MetricLabels(t *testing.T) {
}

if *family.Name == "cortex_committed_resource_change_api_request_duration_seconds" {
if len(family.Metric) != 1 {
t.Errorf("Expected 1 histogram metric, got %d", len(family.Metric))
// At minimum we expect the label we used (200)
// Plus pre-initialized labels - so >= 1 total
if len(family.Metric) < 1 {
t.Errorf("Expected at least 1 histogram metric, got %d", len(family.Metric))
}

// Check label names
// Check all metrics have the status_code label
for _, metric := range family.Metric {
labelNames := make(map[string]bool)
for _, label := range metric.Label {
Expand All @@ -138,11 +142,13 @@ func TestChangeCommitmentsAPIMonitor_MetricLabels(t *testing.T) {
}

if *family.Name == "cortex_committed_resource_change_api_commitment_changes_total" {
if len(family.Metric) != 2 {
t.Errorf("Expected 2 commitment changes metrics, got %d", len(family.Metric))
// At minimum we expect the 2 labels we added (success, rejected)
// Plus pre-initialized labels (accepted) - so >= 2 total
if len(family.Metric) < 2 {
t.Errorf("Expected at least 2 commitment changes metrics, got %d", len(family.Metric))
}

// Check label names
// Check all metrics have the result label
for _, metric := range family.Metric {
labelNames := make(map[string]bool)
for _, label := range metric.Label {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,8 @@ func (env *CommitmentTestEnv) processNewReservation(res *v1alpha1.Reservation) {
}
}

// markReservationSchedulerProcessedStatus updates a reservation to have Ready=True status (scheduling can be succeeded or not - depending on host status)
// markReservationSchedulerProcessedStatus updates a reservation status based on scheduling result.
// If host is non-empty, sets Ready=True (success). If host is empty, sets Ready=False with NoHostsFound (failure).
func (env *CommitmentTestEnv) markReservationSchedulerProcessedStatus(res *v1alpha1.Reservation, host string) {
ctx := context.Background()

Expand All @@ -1288,16 +1289,28 @@ func (env *CommitmentTestEnv) markReservationSchedulerProcessedStatus(res *v1alp
return
}

// Then update status
// Then update status - Ready=True only if host was found, Ready=False otherwise
res.Status.Host = host
res.Status.Conditions = []metav1.Condition{
{
Type: v1alpha1.ReservationConditionReady,
Status: metav1.ConditionTrue,
Reason: "ReservationActive",
Message: "Reservation is ready (set by test controller)",
LastTransitionTime: metav1.Now(),
},
if host != "" {
res.Status.Conditions = []metav1.Condition{
{
Type: v1alpha1.ReservationConditionReady,
Status: metav1.ConditionTrue,
Reason: "ReservationActive",
Message: "Reservation is ready (set by test controller)",
LastTransitionTime: metav1.Now(),
},
}
} else {
res.Status.Conditions = []metav1.Condition{
{
Type: v1alpha1.ReservationConditionReady,
Status: metav1.ConditionFalse,
Reason: "NoHostsFound",
Message: "No hosts with sufficient capacity (set by test controller)",
LastTransitionTime: metav1.Now(),
},
}
}
if err := env.K8sClient.Status().Update(ctx, res); err != nil {
env.T.Logf("Warning: Failed to update reservation status: %v", err)
Expand Down
11 changes: 10 additions & 1 deletion internal/scheduling/reservations/commitments/api_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,23 @@ import (

"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
"github.com/go-logr/logr"
"github.com/google/uuid"
liquid "github.com/sapcc/go-api-declarations/liquid"
)

// handles GET /v1/info requests from Limes:
// See: https://github.com/sapcc/go-api-declarations/blob/main/liquid/commitment.go
// See: https://pkg.go.dev/github.com/sapcc/go-api-declarations/liquid
func (api *HTTPAPI) HandleInfo(w http.ResponseWriter, r *http.Request) {
ctx := WithNewGlobalRequestID(r.Context())
// Extract or generate request ID for tracing
requestID := r.Header.Get("X-Request-ID")
if requestID == "" {
requestID = uuid.New().String()
}
// Set request ID in response header for client correlation
w.Header().Set("X-Request-ID", requestID)

ctx := reservations.WithGlobalRequestID(r.Context(), "committed-resource-"+requestID)
logger := LoggerFromContext(ctx).WithValues("component", "api", "endpoint", "/v1/info")

// Only accept GET method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strconv"
"time"

"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
"github.com/google/uuid"
"github.com/sapcc/go-api-declarations/liquid"
)

Expand All @@ -20,7 +22,22 @@ func (api *HTTPAPI) HandleReportCapacity(w http.ResponseWriter, r *http.Request)
startTime := time.Now()
statusCode := http.StatusOK

ctx := WithNewGlobalRequestID(r.Context())
// Extract or generate request ID for tracing - always set in response header
requestID := r.Header.Get("X-Request-ID")
if requestID == "" {
requestID = uuid.New().String()
}
w.Header().Set("X-Request-ID", requestID)

// Check if API is enabled
if !api.config.EnableReportCapacityAPI {
statusCode = http.StatusServiceUnavailable
http.Error(w, "report-capacity API is disabled", statusCode)
api.recordCapacityMetrics(statusCode, startTime)
return
}

ctx := reservations.WithGlobalRequestID(r.Context(), "committed-resource-"+requestID)
logger := LoggerFromContext(ctx).WithValues("component", "api", "endpoint", "/v1/commitments/report-capacity")

// Only accept POST method
Expand Down
Loading
Loading