diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 02cec7e6be70..6ea0d5c39b85 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -488,7 +488,12 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error { return err } - wfc.syncManager.Initialize(ctx, wfList.Items) + // A non-nil error means a recorded lock holder could not be re-established + // (undecodable lock name, or an unavailable database session). This is fatal + // by design: we fail closed rather than risk a silent double-acquire. + if err := wfc.syncManager.Initialize(ctx, wfList.Items); err != nil { + return err + } if err := wfc.throttler.Init(wfList.Items); err != nil { return err diff --git a/workflow/sync/common.go b/workflow/sync/common.go index 743c99bcb9c2..4ffc1ea5ab5a 100644 --- a/workflow/sync/common.go +++ b/workflow/sync/common.go @@ -9,6 +9,13 @@ import ( type semaphore interface { acquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy) bool + // reacquire re-establishes a recorded holder at controller startup, ignoring + // the current limit. Unlike acquire it always represents the hold, so the + // in-memory count reflects persisted reality even when recorded holders + // exceed a (since lowered) limit - new acquisitions then correctly wait until + // the count drains below the limit, rather than dropping a holder (a + // double-acquire) or poisoning the lock over a routine limit change. + reacquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy) checkAcquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy) (bool, bool, string) tryAcquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy) (bool, string) release(ctx context.Context, key string) bool diff --git a/workflow/sync/database_semaphore.go b/workflow/sync/database_semaphore.go index 69b7ef44b684..21d8494e591b 100644 --- a/workflow/sync/database_semaphore.go +++ b/workflow/sync/database_semaphore.go @@ -372,6 +372,16 @@ func (s *databaseSemaphore) acquire(ctx context.Context, holderKey string, tx *s return false } +// reacquire re-establishes a recorded holder at startup. For a database-backed +// lock the holder is durable - its row survives the controller restart - so the +// hold is already represented regardless of the current limit. We still call +// acquire as a best-effort re-assertion (it inserts the held row if missing); +// any failure (already-held row, limit exceeded, transient error) is harmless +// because the persisted row keeps the hold counted by currentHolders. +func (s *databaseSemaphore) reacquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy) { + s.acquire(ctx, holderKey, tx) +} + func (s *databaseSemaphore) tryAcquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy) (bool, string) { logger := s.logger(ctx) acq, already, msg := s.checkAcquire(ctx, holderKey, tx) diff --git a/workflow/sync/mutex_test.go b/workflow/sync/mutex_test.go index 5557c8f85b6c..59827cd5f54d 100644 --- a/workflow/sync/mutex_test.go +++ b/workflow/sync/mutex_test.go @@ -123,7 +123,7 @@ func TestMutexLock(t *testing.T) { wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{}) require.NoError(t, err) - syncManager.Initialize(ctx, wfList.Items) + require.NoError(t, syncManager.Initialize(ctx, wfList.Items)) assert.Len(t, syncManager.syncLockMap, 1) }) t.Run("WfLevelMutexAcquireAndRelease", func(t *testing.T) { diff --git a/workflow/sync/poison.go b/workflow/sync/poison.go new file mode 100644 index 000000000000..2ec6e3f6385b --- /dev/null +++ b/workflow/sync/poison.go @@ -0,0 +1,81 @@ +package sync + +import ( + "context" + "fmt" + "time" + + "github.com/argoproj/argo-workflows/v4/util/sqldb" +) + +// poisonedLock is a sentinel lock installed into the Manager's syncLockMap when, +// during Initialize, the controller cannot re-establish a holder that a Running +// workflow's status claims to hold. +// +// The soundness invariant is: if a Workflow's status records that it is holding +// a lock, the in-memory lock map must reflect that hold after Initialize. +// Otherwise a racing workflow's TryAcquire would find the lock absent, create a +// fresh one, and acquire a lock that is - per persisted state - already held. +// For a mutex that means two workflows running concurrently under the same +// mutex. +// +// Rather than silently dropping the holder (the previous behaviour), we install +// this lock, which refuses every acquire and reports a poisoned-state message. +// That message surfaces on the waiting node's synchronization status, marking +// the node/workflow as blocked by a poisoned lock so an operator can intervene. +// +// The poison is in-memory only and is cleared on the next controller restart, +// at which point Initialize re-evaluates: if the offending workflow is no longer +// Running the lock is recreated clean; if it is still Running and still +// unresolvable, it is poisoned again. +type poisonedLock struct { + name string + reason string +} + +var _ semaphore = &poisonedLock{} + +func newPoisonedLock(name, reason string) *poisonedLock { + return &poisonedLock{name: name, reason: reason} +} + +func (p *poisonedLock) message() string { + return fmt.Sprintf("lock %s is in a poisoned state: %s; manual intervention required", p.name, p.reason) +} + +func (p *poisonedLock) acquire(_ context.Context, _ string, _ *sqldb.SessionProxy) bool { + return false +} + +// reacquire is a no-op: a poisoned lock refuses all holds until restart. +func (p *poisonedLock) reacquire(_ context.Context, _ string, _ *sqldb.SessionProxy) {} + +func (p *poisonedLock) checkAcquire(_ context.Context, _ string, _ *sqldb.SessionProxy) (bool, bool, string) { + return false, false, p.message() +} + +func (p *poisonedLock) tryAcquire(_ context.Context, _ string, _ *sqldb.SessionProxy) (bool, string) { + return false, p.message() +} + +func (p *poisonedLock) release(_ context.Context, _ string) bool { return false } + +func (p *poisonedLock) addToQueue(_ context.Context, _ string, _ int32, _ time.Time) error { + return nil +} + +func (p *poisonedLock) removeFromQueue(_ context.Context, _ string) error { return nil } + +func (p *poisonedLock) getCurrentHolders(_ context.Context) ([]string, error) { return nil, nil } + +func (p *poisonedLock) getCurrentPending(_ context.Context) ([]string, error) { return nil, nil } + +func (p *poisonedLock) getLimit(_ context.Context) int { return 0 } + +func (p *poisonedLock) probeWaiting(_ context.Context) {} + +// lock returns true so that tryAcquireImpl proceeds to checkAcquire, which +// returns the poisoned-state message rather than a generic "failed to lock()". +func (p *poisonedLock) lock(_ context.Context) bool { return true } + +func (p *poisonedLock) unlock(_ context.Context) {} diff --git a/workflow/sync/semaphore.go b/workflow/sync/semaphore.go index 37df4de1c38f..af96723122a7 100644 --- a/workflow/sync/semaphore.go +++ b/workflow/sync/semaphore.go @@ -172,6 +172,22 @@ func (s *prioritySemaphore) acquire(_ context.Context, holderKey string, _ *sqld return false } +// reacquire re-establishes a recorded holder at startup, ignoring the limit. It +// always registers the holder, even when the recorded holders already exceed the +// current limit (e.g. the limit was lowered while held). The weighted semaphore +// is capped at the limit, so a slot is only taken when one is free; the excess is +// tracked solely in lockHolder, exactly as a downward resize leaves it. release() +// already tolerates len(lockHolder) > limit and only frees a weighted slot once +// the count drops below the limit, so new acquisitions wait until every recorded +// holder has drained. +func (s *prioritySemaphore) reacquire(_ context.Context, holderKey string, _ *sqldb.SessionProxy) { + if _, ok := s.lockHolder[holderKey]; ok { + return + } + s.semaphore.TryAcquire(1) // best effort: take a slot if one is free + s.lockHolder[holderKey] = true +} + func isSameWorkflowNodeKeys(firstKey, secondKey string) bool { firstItems := strings.Split(firstKey, "/") secondItems := strings.Split(secondKey, "/") diff --git a/workflow/sync/sync_manager.go b/workflow/sync/sync_manager.go index 627888316253..44e18cfa7a53 100644 --- a/workflow/sync/sync_manager.go +++ b/workflow/sync/sync_manager.go @@ -147,6 +147,26 @@ func getUpgradedKey(wf *wfv1.Workflow, key string, level LevelType) string { return key } +// upgradeHolderKey resolves a holder key recorded in a Workflow's +// synchronization status into the key form the in-memory lock expects. +// +// V2 keys are self-describing - they already encode whether the hold is at the +// workflow level (ns/wfname) or the template level (ns/wfname/nodeID) - so they +// are returned verbatim and no spec lookup is needed. Only legacy V1 keys are +// ambiguous and require getWorkflowSyncLevelByName to determine the level. This +// matters for workflowTemplateRef workflows, whose wf.Spec is empty: a V2 key +// can be re-established without ever resolving a level from the spec. +func upgradeHolderKey(ctx context.Context, wf *wfv1.Workflow, holderKey, lockName string) (string, error) { + if wfv1.CheckHolderKeyVersion(holderKey) != wfv1.HoldingNameV1 { + return holderKey, nil + } + level, err := getWorkflowSyncLevelByName(ctx, wf, lockName) + if err != nil { + return "", err + } + return getUpgradedKey(wf, holderKey, level), nil +} + type LevelType int const ( @@ -177,8 +197,25 @@ const ( // and at the workflow level -> impossible to upgrade correctly // due to ambiguity. Currently we just assume workflow level. func getWorkflowSyncLevelByName(ctx context.Context, wf *wfv1.Workflow, lockName string) (LevelType, error) { - if wf.Spec.Synchronization != nil { - syncItems, err := allSyncItems(wf.Spec.Synchronization) + // For workflowTemplateRef workflows wf.Spec.Synchronization and + // wf.Spec.Templates are empty; the rendered spec lives in + // wf.Status.StoredWorkflowSpec. Inspect both so the level can be resolved + // regardless of where the synchronization block was declared. + syncBlocks := []*wfv1.Synchronization{wf.Spec.Synchronization} + templates := wf.Spec.Templates + if wf.Status.StoredWorkflowSpec != nil { + syncBlocks = append(syncBlocks, wf.Status.StoredWorkflowSpec.Synchronization) + // slices.Concat allocates a fresh backing array; a plain append could + // write into wf.Spec.Templates' spare capacity and corrupt the caller's + // slice (which aliases the workflow in wfs). + templates = slices.Concat(wf.Spec.Templates, wf.Status.StoredWorkflowSpec.Templates) + } + + for _, sync := range syncBlocks { + if sync == nil { + continue + } + syncItems, err := allSyncItems(sync) if err != nil { return ErrorLevel, err } @@ -194,7 +231,7 @@ func getWorkflowSyncLevelByName(ctx context.Context, wf *wfv1.Workflow, lockName } var lastErr error - for _, template := range wf.Spec.Templates { + for _, template := range templates { if template.Synchronization != nil { syncItems, err := allSyncItems(template.Synchronization) if err != nil { @@ -218,34 +255,122 @@ func getWorkflowSyncLevelByName(ctx context.Context, wf *wfv1.Workflow, lockName return ErrorLevel, lastErr } -func (sm *Manager) Initialize(ctx context.Context, wfs []wfv1.Workflow) { - for _, wf := range wfs { +// initFailureFatal reports whether a failure to (re)establish a lock at startup +// is unrecoverable and must fail closed (crashloop). Only two cases qualify: +// - the lock name is undecodable, so there is no key to poison under and no way +// to prove the workflow's spec re-acquires the same lock; and +// - the lock is database-backed but no database session is configured, so +// nothing can back the lock. +// +// Everything else - a transient ConfigMap/DB read failure, a limit fetch +// returning 0 - is recoverable: the name decodes, so the lock can be poisoned +// (the poison key matches what a racer would compute) without halting the whole +// controller. +func (sm *Manager) initFailureFatal(ctx context.Context, lockName string) bool { + decoded, err := DecodeLockName(ctx, lockName) + if err != nil { + return true + } + return decoded.getKind() == lockKindDatabase && sm.dbInfo.SessionProxy == nil +} + +// poison installs a poisoned lock, refusing all acquires until the next restart. +func (sm *Manager) poison(ctx context.Context, lockName, reason string) { + sm.log.WithField("lock", lockName).WithField("reason", reason).Warn(ctx, "poisoning lock") + sm.syncLockMap[lockName] = newPoisonedLock(lockName, reason) +} + +// reestablishHolder re-establishes a single recorded holder of lockName in the +// in-memory lock map. lockType is "semaphore" or "mutex" (for logging) and +// initLock builds the backing lock when it is not yet present. +// +// It always reads the current lock from the map (never a stale local), so a lock +// poisoned by a previous holder stays poisoned and is not acquired on an orphaned +// object. A returned error is fatal (see initFailureFatal). An init failure or an +// unresolvable holder key poisons the lock; a failed re-acquire leaves the lock +// intact. +// +// Poisoning, not leaving absent, is required for the init-failure case: a +// ConfigMap-backed semaphore keeps its holders only in memory, so if we left the +// lock absent, prepAcquire would later rebuild it with zero holders once the +// backend recovered and let a racer acquire the slot this holder still owns. The +// poison is lock-scoped and clears on the next controller restart. +func (sm *Manager) reestablishHolder(ctx context.Context, wf *wfv1.Workflow, lockType, lockName, holder string, initLock func(context.Context, string) (semaphore, error)) error { + if sm.syncLockMap[lockName] == nil { + lock, err := initLock(ctx, lockName) + if err != nil { + if sm.initFailureFatal(ctx, lockName) { + // Undecodable name or a database hold with no session: we cannot + // poison to protect the recorded hold, so halt for an operator + // rather than risk a silent double-acquire. + sm.log.WithField(lockType, lockName).WithError(err).Error(ctx, "cannot initialize lock, failing closed") + return fmt.Errorf("cannot re-establish %s %q held by workflow %s/%s at startup: %w", lockType, lockName, wf.Namespace, wf.Name, err) + } + // Recoverable (e.g. transient ConfigMap unavailability) but the name + // decodes, so poison protects the recorded hold without crashlooping. + // Leaving the lock absent would be unsound: an in-memory semaphore + // rebuilt later would have zero holders and let a racer double-acquire. + sm.poison(ctx, lockName, fmt.Sprintf("controller could not initialize lock at startup: %v", err)) + return nil + } + sm.syncLockMap[lockName] = lock + } + + if holder == "" { + return nil + } + + key, err := upgradeHolderKey(ctx, wf, holder, lockName) + if err != nil { + sm.poison(ctx, lockName, fmt.Sprintf("controller could not re-establish recorded holder %q at startup: %v", holder, err)) + return nil + } + + // Re-read from the map: a previous holder of this same lock may have poisoned + // it, in which case reacquire is a no-op and we must not resurrect it. + lock := sm.syncLockMap[lockName] + // reacquire ignores the limit so the recorded hold is always represented: + // dropping it (leaving the lock intact) would let a racer double-acquire an + // in-memory semaphore whose holders exceed a lowered limit, and poisoning + // would block the whole shared lock over a routine limit change. + lock.reacquire(ctx, key, sm.dbInfo.SessionProxy) + sm.log.WithFields(logging.Fields{"key": key, lockType: lockName}).Info(ctx, "re-established recorded holder") + return nil +} + +// Initialize re-establishes, in the in-memory lock map, the holds that Running +// workflows record in their status. +// +// It fails closed only when a holder is genuinely unrecoverable (see +// initFailureFatal): an undecodable lock name, or a database-backed hold with no +// database session. Those return an error the controller treats as fatal, +// because we can neither poison the lock nor prove the spec re-acquires it, so +// continuing risks a silent double-acquire. +// +// Recoverable failures never crashloop: a lock that cannot be built (transient +// ConfigMap/DB read) or whose holder key is unresolvable is poisoned (lock-scoped, +// clears on restart); a holder that simply cannot be re-acquired (limit lowered, +// transient error) leaves the lock intact. +func (sm *Manager) Initialize(ctx context.Context, wfs []wfv1.Workflow) error { + // Hold the lock for the whole pass: a DB-backed Manager starts its + // backgroundNotifier goroutine in createLockManager (before initManagers + // calls Initialize), and that goroutine iterates syncLockMap under sm.lock. + sm.lock.Lock() + defer sm.lock.Unlock() + + for i := range wfs { + wf := &wfs[i] if wf.Status.Synchronization == nil { continue } if wf.Status.Synchronization.Semaphore != nil { for _, holding := range wf.Status.Synchronization.Semaphore.Holding { - sem := sm.syncLockMap[holding.Semaphore] - if sem == nil { - var err error - sem, err = sm.initializeSemaphore(ctx, holding.Semaphore) - if err != nil { - sm.log.WithField("semaphore", holding.Semaphore).WithError(err).Warn(ctx, "cannot initialize semaphore") - continue - } - sm.syncLockMap[holding.Semaphore] = sem - } - - for _, holders := range holding.Holders { - level, err := getWorkflowSyncLevelByName(ctx, &wf, holding.Semaphore) - if err != nil { - sm.log.WithField("semaphore", holding.Semaphore).WithError(err).Warn(ctx, "cannot obtain lock level") - continue - } - key := getUpgradedKey(&wf, holders, level) - if sem != nil && sem.acquire(ctx, key, sm.dbInfo.SessionProxy) { - sm.log.WithFields(logging.Fields{"key": key, "semaphore": holding.Semaphore}).Info(ctx, "Lock acquired") + for _, holder := range holding.Holders { + if err := sm.reestablishHolder(ctx, wf, "semaphore", holding.Semaphore, holder, func(ctx context.Context, name string) (semaphore, error) { + return sm.initializeSemaphore(ctx, name) + }); err != nil { + return err } } } @@ -253,29 +378,16 @@ func (sm *Manager) Initialize(ctx context.Context, wfs []wfv1.Workflow) { if wf.Status.Synchronization.Mutex != nil { for _, holding := range wf.Status.Synchronization.Mutex.Holding { - mutex := sm.syncLockMap[holding.Mutex] - if mutex == nil { - var err error - mutex, err = sm.initializeMutex(ctx, holding.Mutex) - if err != nil { - sm.log.WithField("mutex", holding.Mutex).WithError(err).Warn(ctx, "cannot initialize mutex") - continue - } - if holding.Holder != "" { - level, err := getWorkflowSyncLevelByName(ctx, &wf, holding.Mutex) - if err != nil { - sm.log.WithField("mutex", holding.Mutex).WithError(err).Warn(ctx, "cannot obtain lock level") - continue - } - key := getUpgradedKey(&wf, holding.Holder, level) - mutex.acquire(ctx, key, sm.dbInfo.SessionProxy) - } - sm.syncLockMap[holding.Mutex] = mutex + if err := sm.reestablishHolder(ctx, wf, "mutex", holding.Mutex, holding.Holder, func(ctx context.Context, name string) (semaphore, error) { + return sm.initializeMutex(ctx, name) + }); err != nil { + return err } } } } sm.log.Info(ctx, "Sync manager initialized successfully") + return nil } // TryAcquire tries to acquire the lock from semaphore. diff --git a/workflow/sync/sync_manager_test.go b/workflow/sync/sync_manager_test.go index 4d6bd52e29b1..e213292ef829 100644 --- a/workflow/sync/sync_manager_test.go +++ b/workflow/sync/sync_manager_test.go @@ -371,7 +371,7 @@ func TestSemaphoreWfLevel(t *testing.T) { wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{}) require.NoError(t, err) - syncManager.Initialize(ctx, wfList.Items) + require.NoError(t, syncManager.Initialize(ctx, wfList.Items)) assert.Len(t, syncManager.syncLockMap, 1) }) t.Run("InitializeSynchronizationWithInvalid", func(t *testing.T) { @@ -385,7 +385,11 @@ func TestSemaphoreWfLevel(t *testing.T) { wfclientset := fakewfclientset.NewClientset(wf) wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{}) require.NoError(t, err) - syncManager.Initialize(ctx, wfList.Items) + // An invalid/undecodable lock name now fails closed rather than being + // silently skipped: we can't poison it or prove the spec re-acquires it, + // so the controller halts for an operator instead of risking a double-acquire. + err = syncManager.Initialize(ctx, wfList.Items) + require.Error(t, err) assert.Empty(t, syncManager.syncLockMap) }) t.Run("InitializeMultipleWorkflowsHolding", func(t *testing.T) { @@ -417,7 +421,7 @@ func TestSemaphoreWfLevel(t *testing.T) { wf2.Status.Synchronization.Semaphore.Holding[0].Holders = []string{"default/hello-world-two"} // Initialize with both workflows - syncManager.Initialize(ctx, []wfv1.Workflow{*wf1, *wf2}) + require.NoError(t, syncManager.Initialize(ctx, []wfv1.Workflow{*wf1, *wf2})) // Verify the semaphore was created assert.Len(t, syncManager.syncLockMap, 1) @@ -432,6 +436,220 @@ func TestSemaphoreWfLevel(t *testing.T) { assert.Contains(t, holders, "default/hello-world-two") }) + t.Run("InitializeConfigMapInitFailurePoisonsNotFatal", func(t *testing.T) { + // The ConfigMap backing the semaphore limit is absent, so the limit fetch + // fails and initializeSemaphore errors. The name decodes (kind ConfigMap), + // so this is recoverable - it must not crashloop the controller. It must + // POISON the lock rather than leave it absent: a ConfigMap semaphore keeps + // its holders only in memory, so leaving it absent would let a later + // rebuild start with zero holders and a racer double-acquire this hold. + kubeClient := fake.NewClientset() // no ConfigMap created + syncManager, err := NewLockManager(ctx, kubeClient, "", nil, GetSyncLimitFunc(kubeClient), func(key string) { + }, WorkflowExistenceFunc, false) + require.NoError(t, err) + + wf := wfv1.MustUnmarshalWorkflow(wfWithStatus) + wf.Name = "cm-holder" + wf.Status.Synchronization.Semaphore.Holding[0].Holders = []string{"default/cm-holder"} + + require.NoError(t, syncManager.Initialize(ctx, []wfv1.Workflow{*wf}), "a recoverable ConfigMap failure must not be fatal") + lock := syncManager.syncLockMap["default/ConfigMap/my-config/workflow"] + require.NotNil(t, lock) + _, poisoned := lock.(*poisonedLock) + assert.True(t, poisoned, "decodable lock that cannot be initialized must be poisoned, not left absent") + }) + + t.Run("InitializeLoweredLimitForceRegistersAllHolders", func(t *testing.T) { + // Two workflows are recorded as holders but the limit is now 1 (lowered + // from 2). Lowering a limit is routine: it must NOT poison the shared + // semaphore (which would block every contender until restart), NOR drop + // the over-limit holder (which would let a racer double-acquire once the + // other holder releases). Both holders must be force-registered so the + // in-memory count reflects reality and new acquisitions wait until the + // count drains below the limit. + kubeClient := fake.NewClientset() + _, err := kubeClient.CoreV1().ConfigMaps("default").Create(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "my-config"}, + Data: map[string]string{"workflow": "1"}, + }, metav1.CreateOptions{}) + require.NoError(t, err) + + syncManager, err := NewLockManager(ctx, kubeClient, "", nil, GetSyncLimitFunc(kubeClient), func(key string) { + }, WorkflowExistenceFunc, false) + require.NoError(t, err) + + wf1 := wfv1.MustUnmarshalWorkflow(wfWithStatus) + wf1.Name = "holder-one" + wf1.Status.Synchronization.Semaphore.Holding[0].Holders = []string{"default/holder-one"} + wf2 := wfv1.MustUnmarshalWorkflow(wfWithStatus) + wf2.Name = "holder-two" + wf2.Status.Synchronization.Semaphore.Holding[0].Holders = []string{"default/holder-two"} + + require.NoError(t, syncManager.Initialize(ctx, []wfv1.Workflow{*wf1, *wf2})) + + lock := syncManager.syncLockMap["default/ConfigMap/my-config/workflow"] + require.NotNil(t, lock) + _, poisoned := lock.(*poisonedLock) + assert.False(t, poisoned, "lowering a limit must not poison the semaphore") + holders, err := lock.getCurrentHolders(ctx) + require.NoError(t, err) + assert.Len(t, holders, 2, "both recorded holders must be force-registered, even over the lowered limit") + assert.Contains(t, holders, "default/holder-one") + assert.Contains(t, holders, "default/holder-two") + + // A new contender must NOT be able to acquire: the count (2) is over the + // limit (1), so the lock is correctly unavailable until holders drain. + racer := wfv1.MustUnmarshalWorkflow(wfWithStatus) + racer.Name = "racer" + acquired, _, _, _, err := syncManager.TryAcquire(ctx, racer, "", racer.Spec.Synchronization) + require.NoError(t, err) + assert.False(t, acquired, "a racer must wait while recorded holders exceed the lowered limit") + }) + + t.Run("InitializeMutexWithWorkflowTemplateRef", func(t *testing.T) { + // A workflowTemplateRef workflow has empty Spec.Synchronization and + // Spec.Templates; its rendered mutex lives in Status.StoredWorkflowSpec. + // Its recorded holder uses a self-describing V2 key, so Initialize must + // re-establish the hold WITHOUT needing to resolve a level from wf.Spec. + // Before the fix, getWorkflowSyncLevelByName could not find the mutex in + // the empty spec, Initialize hit "cannot obtain lock level" and dropped + // the holder, leaving the lock free for a racing workflow to acquire. + syncManager, err := NewLockManager(ctx, kube, "", nil, syncLimitFunc, func(key string) { + }, WorkflowExistenceFunc, false) + require.NoError(t, err) + + wf := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "tmplref-holder", Namespace: "default"}, + Spec: wfv1.WorkflowSpec{ + WorkflowTemplateRef: &wfv1.WorkflowTemplateRef{Name: "my-template"}, + }, + Status: wfv1.WorkflowStatus{ + StoredWorkflowSpec: &wfv1.WorkflowSpec{ + Synchronization: &wfv1.Synchronization{ + Mutexes: []*wfv1.Mutex{{Name: "my-mutex"}}, + }, + }, + Synchronization: &wfv1.SynchronizationStatus{ + Mutex: &wfv1.MutexStatus{ + Holding: []wfv1.MutexHolding{{ + Mutex: "default/Mutex/my-mutex", + Holder: "default/tmplref-holder", + }}, + }, + }, + }, + } + + require.NoError(t, syncManager.Initialize(ctx, []wfv1.Workflow{*wf})) + + lock := syncManager.syncLockMap["default/Mutex/my-mutex"] + require.NotNil(t, lock) + _, poisoned := lock.(*poisonedLock) + assert.False(t, poisoned, "holder should be re-established, not poisoned") + holders, err := lock.getCurrentHolders(ctx) + require.NoError(t, err) + assert.Equal(t, []string{"default/tmplref-holder"}, holders) + }) + + t.Run("InitializePoisonsUnresolvableHolder", func(t *testing.T) { + // A legacy V1 holder key that cannot be resolved to a level (no matching + // synchronization block in spec or stored spec) must poison the lock, + // rather than silently dropping the holder. Otherwise a racing workflow + // could acquire a mutex that persisted state says is already held. + syncManager, err := NewLockManager(ctx, kube, "", nil, syncLimitFunc, func(key string) { + }, WorkflowExistenceFunc, false) + require.NoError(t, err) + + holder := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "ghost-holder", Namespace: "default"}, + Status: wfv1.WorkflowStatus{ + Synchronization: &wfv1.SynchronizationStatus{ + Mutex: &wfv1.MutexStatus{ + Holding: []wfv1.MutexHolding{{ + Mutex: "default/Mutex/my-mutex", + Holder: "ghost-holder", // V1 key, unresolvable against empty spec + }}, + }, + }, + }, + } + + // A decodable name whose holder can't be re-established is poisoned, not + // fatal: the poison key matches what a racer computes, so soundness holds. + require.NoError(t, syncManager.Initialize(ctx, []wfv1.Workflow{*holder})) + + lock := syncManager.syncLockMap["default/Mutex/my-mutex"] + require.NotNil(t, lock) + _, poisoned := lock.(*poisonedLock) + assert.True(t, poisoned, "unresolvable holder must poison the lock") + + // A racing workflow must not be able to acquire the poisoned mutex. + racer := wfv1.MustUnmarshalWorkflow(wfWithMutex) + racer.Name = "racer" + acquired, _, msg, _, err := syncManager.TryAcquire(ctx, racer, "", racer.Spec.Synchronization) + require.NoError(t, err) + assert.False(t, acquired, "racing workflow must not acquire a poisoned lock") + assert.Contains(t, msg, "poisoned state") + }) + + t.Run("InitializeFailsClosedOnUndecodableLockName", func(t *testing.T) { + // A holder whose lock name cannot even be decoded is an unknowable hold: + // we can neither poison the lock (no key) nor prove the spec re-acquires + // it, so continuing risks a silent double-acquire. Initialize must fail + // closed (return an error) so the controller halts for an operator. + syncManager, err := NewLockManager(ctx, kube, "", nil, syncLimitFunc, func(key string) { + }, WorkflowExistenceFunc, false) + require.NoError(t, err) + + holder := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "broken-holder", Namespace: "default"}, + Status: wfv1.WorkflowStatus{ + Synchronization: &wfv1.SynchronizationStatus{ + Mutex: &wfv1.MutexStatus{ + Holding: []wfv1.MutexHolding{{ + Mutex: "garbage-undecodable-name", + Holder: "default/broken-holder", + }}, + }, + }, + }, + } + + err = syncManager.Initialize(ctx, []wfv1.Workflow{*holder}) + require.Error(t, err) + assert.Contains(t, err.Error(), "broken-holder") + assert.NotContains(t, syncManager.syncLockMap, "garbage-undecodable-name") + }) + + t.Run("InitializeFailsClosedWhenDBUnavailable", func(t *testing.T) { + // A Running workflow holds a database-backed lock, but the manager has no + // database session (e.g. the DB was unreachable at startup). The lock + // cannot be re-established, so Initialize must fail closed and let the + // controller crashloop until the database is reachable. + syncManager, err := NewLockManager(ctx, kube, "", nil, syncLimitFunc, func(key string) { + }, WorkflowExistenceFunc, false) + require.NoError(t, err) + require.Nil(t, syncManager.dbInfo.SessionProxy) + + holder := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "db-holder", Namespace: "default"}, + Status: wfv1.WorkflowStatus{ + Synchronization: &wfv1.SynchronizationStatus{ + Mutex: &wfv1.MutexStatus{ + Holding: []wfv1.MutexHolding{{ + Mutex: "default/Database/my-db-lock", + Holder: "default/db-holder", + }}, + }, + }, + }, + } + + err = syncManager.Initialize(ctx, []wfv1.Workflow{*holder}) + require.Error(t, err) + assert.Contains(t, err.Error(), "database session") + }) + t.Run("WfLevelAcquireAndRelease", func(t *testing.T) { var nextKey string syncManager, err := NewLockManager(ctx, kube, "", nil, syncLimitFunc, func(key string) { @@ -1586,7 +1804,7 @@ func TestMutexMigration(t *testing.T) { syncMgr.syncLockMap = make(map[string]semaphore) wfs := []wfv1.Workflow{*wfMutex2.DeepCopy()} - syncMgr.Initialize(ctx, wfs) + require.NoError(syncMgr.Initialize(ctx, wfs)) syncItems, err := allSyncItems(wfMutex2.Spec.Synchronization) require.NoError(err) @@ -1631,7 +1849,7 @@ func TestMutexMigration(t *testing.T) { assert.Equal(1, numFound) wfs := []wfv1.Workflow{*wfMutex3.DeepCopy()} - syncMgr.Initialize(ctx, wfs) + require.NoError(syncMgr.Initialize(ctx, wfs)) syncItems, err := allSyncItems(wfMutex3.Spec.Templates[1].Synchronization) require.NoError(err) @@ -1906,3 +2124,105 @@ func TestUnconfiguredSemaphores(t *testing.T) { } }) } + +// TestDatabaseInitializeLoweredLimitAfterRestart drives the full restart path +// through Manager.Initialize for a database-backed semaphore. +// +// Scenario: a controller acquires a database semaphore for two workflows under +// limit 2, the controller is turned off, the limit is lowered to 1, and the +// controller restarts while both workflows are still Running. On restart +// Initialize re-establishes the recorded holders from the workflows' status. +// +// For a database semaphore the holds live in durable rows, so the holder count +// survives the restart regardless of the lowered limit. We assert that: +// - the lock is live (not poisoned - lowering a limit is routine); +// - both holders are still counted, even over the lowered limit; +// - each still-running workflow keeps its lock when it re-reconciles; +// - a new contender waits until the over-subscription drains below the limit. +func TestDatabaseInitializeLoweredLimitAfterRestart(t *testing.T) { + ctx := logging.TestContext(t.Context()) + for _, dbType := range testDBTypes { + t.Run(string(dbType), func(t *testing.T) { + const dbLimitKey = "default/my-database-sem" + const lockName = "default/Database/my-database-sem" + + info, cleanup, syncConfig, err := createTestDBSession(ctx, t, dbType) + require.NoError(t, err) + defer cleanup() + + // Original limit: 2. + _, err = info.SessionProxy.Session().SQL().Exec("INSERT INTO sync_limit (name, sizelimit) VALUES (?, ?)", dbLimitKey, 2) + require.NoError(t, err) + + // --- Controller instance #1: two workflows acquire the semaphore. --- + mgr1 := createLockManager(ctx, info.SessionProxy, &syncConfig, nil, func(key string) {}, WorkflowExistenceFunc) + + creationTime := metav1.NewTime(time.Now()) + wf1 := wfv1.MustUnmarshalWorkflow(wfWithDBSemaphore) + wf1.Name = "holder-one" + wf1.CreationTimestamp = creationTime + wf2 := wfv1.MustUnmarshalWorkflow(wfWithDBSemaphore) + wf2.Name = "holder-two" + wf2.CreationTimestamp = creationTime + + acquired, _, _, _, err := mgr1.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) + require.NoError(t, err) + require.True(t, acquired, "holder-one should acquire under limit 2") + acquired, _, _, _, err = mgr1.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) + require.NoError(t, err) + require.True(t, acquired, "holder-two should acquire under limit 2") + + // Both workflows now record the hold in their status - this is what the + // informer persists and re-lists into Initialize after a restart. + require.NotNil(t, wf1.Status.Synchronization) + require.NotNil(t, wf2.Status.Synchronization) + + // --- Limit lowered to 1 while the controller is down. --- + _, err = info.SessionProxy.Session().SQL().Exec("UPDATE sync_limit SET sizelimit = ? WHERE name = ?", 1, dbLimitKey) + require.NoError(t, err) + + // --- Controller instance #2: restart. Initialize from the running workflows. --- + mgr2 := createLockManager(ctx, info.SessionProxy, &syncConfig, nil, func(key string) {}, WorkflowExistenceFunc) + require.NoError(t, mgr2.Initialize(ctx, []wfv1.Workflow{*wf1, *wf2})) + + lock := mgr2.syncLockMap[lockName] + require.NotNil(t, lock) + _, poisoned := lock.(*poisonedLock) + assert.False(t, poisoned, "lowering a DB semaphore limit must not poison the lock") + + holders, err := lock.getCurrentHolders(ctx) + require.NoError(t, err) + assert.Len(t, holders, 2, "both recorded holders must survive the restart, even over the lowered limit") + + // Each still-running workflow re-reconciles and re-acquires the lock it + // already holds. Both must succeed regardless of the lowered limit, since + // the already-held check precedes the limit check. + acquired, _, _, _, err = mgr2.TryAcquire(ctx, wf1, "", wf1.Spec.Synchronization) + require.NoError(t, err) + assert.True(t, acquired, "holder-one must keep its lock after the limit was lowered") + acquired, _, _, _, err = mgr2.TryAcquire(ctx, wf2, "", wf2.Spec.Synchronization) + require.NoError(t, err) + assert.True(t, acquired, "holder-two must keep its lock after the limit was lowered") + + // A new contender must wait: holders (2) exceed the lowered limit (1). + racer := wfv1.MustUnmarshalWorkflow(wfWithDBSemaphore) + racer.Name = "racer" + racer.CreationTimestamp = creationTime + acquired, _, _, _, err = mgr2.TryAcquire(ctx, racer, "", racer.Spec.Synchronization) + require.NoError(t, err) + assert.False(t, acquired, "a racer must wait while holders exceed the lowered limit") + + // Drain one holder. Still at the limit (1 holder, limit 1) - racer waits. + mgr2.Release(ctx, wf1, "", wf1.Spec.Synchronization) + acquired, _, _, _, err = mgr2.TryAcquire(ctx, racer, "", racer.Spec.Synchronization) + require.NoError(t, err) + assert.False(t, acquired, "racer still waits: one holder remains at limit 1") + + // Drain the last holder - now below the limit, so the racer is admitted. + mgr2.Release(ctx, wf2, "", wf2.Spec.Synchronization) + acquired, _, _, _, err = mgr2.TryAcquire(ctx, racer, "", racer.Spec.Synchronization) + require.NoError(t, err) + assert.True(t, acquired, "racer acquires once holders drain below the lowered limit") + }) + } +}