Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,12 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error {
return err
}

wfc.syncManager.Initialize(ctx, wfList.Items)
// A non-nil error means a recorded lock holder could not be re-established
// (undecodable lock name, or an unavailable database session). This is fatal
// by design: we fail closed rather than risk a silent double-acquire.
if err := wfc.syncManager.Initialize(ctx, wfList.Items); err != nil {
return err
}

if err := wfc.throttler.Init(wfList.Items); err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions workflow/sync/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import (

type semaphore interface {
acquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy) bool
// reacquire re-establishes a recorded holder at controller startup, ignoring
// the current limit. Unlike acquire it always represents the hold, so the
// in-memory count reflects persisted reality even when recorded holders
// exceed a (since lowered) limit - new acquisitions then correctly wait until
// the count drains below the limit, rather than dropping a holder (a
// double-acquire) or poisoning the lock over a routine limit change.
reacquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy)
checkAcquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy) (bool, bool, string)
tryAcquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy) (bool, string)
release(ctx context.Context, key string) bool
Expand Down
10 changes: 10 additions & 0 deletions workflow/sync/database_semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,16 @@ func (s *databaseSemaphore) acquire(ctx context.Context, holderKey string, tx *s
return false
}

// reacquire re-establishes a recorded holder at startup. For a database-backed
// lock the holder is durable - its row survives the controller restart - so the
// hold is already represented regardless of the current limit. We still call
// acquire as a best-effort re-assertion (it inserts the held row if missing);
// any failure (already-held row, limit exceeded, transient error) is harmless
// because the persisted row keeps the hold counted by currentHolders.
func (s *databaseSemaphore) reacquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy) {
s.acquire(ctx, holderKey, tx)
}

func (s *databaseSemaphore) tryAcquire(ctx context.Context, holderKey string, tx *sqldb.SessionProxy) (bool, string) {
logger := s.logger(ctx)
acq, already, msg := s.checkAcquire(ctx, holderKey, tx)
Expand Down
2 changes: 1 addition & 1 deletion workflow/sync/mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestMutexLock(t *testing.T) {

wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{})
require.NoError(t, err)
syncManager.Initialize(ctx, wfList.Items)
require.NoError(t, syncManager.Initialize(ctx, wfList.Items))
assert.Len(t, syncManager.syncLockMap, 1)
})
t.Run("WfLevelMutexAcquireAndRelease", func(t *testing.T) {
Expand Down
81 changes: 81 additions & 0 deletions workflow/sync/poison.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package sync

import (
"context"
"fmt"
"time"

"github.com/argoproj/argo-workflows/v4/util/sqldb"
)

// poisonedLock is a sentinel lock installed into the Manager's syncLockMap when,
// during Initialize, the controller cannot re-establish a holder that a Running
// workflow's status claims to hold.
//
// The soundness invariant is: if a Workflow's status records that it is holding
// a lock, the in-memory lock map must reflect that hold after Initialize.
// Otherwise a racing workflow's TryAcquire would find the lock absent, create a
// fresh one, and acquire a lock that is - per persisted state - already held.
// For a mutex that means two workflows running concurrently under the same
// mutex.
//
// Rather than silently dropping the holder (the previous behaviour), we install
// this lock, which refuses every acquire and reports a poisoned-state message.
// That message surfaces on the waiting node's synchronization status, marking
// the node/workflow as blocked by a poisoned lock so an operator can intervene.
//
// The poison is in-memory only and is cleared on the next controller restart,
// at which point Initialize re-evaluates: if the offending workflow is no longer
// Running the lock is recreated clean; if it is still Running and still
// unresolvable, it is poisoned again.
type poisonedLock struct {
name string
reason string
}

var _ semaphore = &poisonedLock{}

func newPoisonedLock(name, reason string) *poisonedLock {
return &poisonedLock{name: name, reason: reason}
}

func (p *poisonedLock) message() string {
return fmt.Sprintf("lock %s is in a poisoned state: %s; manual intervention required", p.name, p.reason)
}

func (p *poisonedLock) acquire(_ context.Context, _ string, _ *sqldb.SessionProxy) bool {
return false
}

// reacquire is a no-op: a poisoned lock refuses all holds until restart.
func (p *poisonedLock) reacquire(_ context.Context, _ string, _ *sqldb.SessionProxy) {}

func (p *poisonedLock) checkAcquire(_ context.Context, _ string, _ *sqldb.SessionProxy) (bool, bool, string) {
return false, false, p.message()
}

func (p *poisonedLock) tryAcquire(_ context.Context, _ string, _ *sqldb.SessionProxy) (bool, string) {
return false, p.message()
}

func (p *poisonedLock) release(_ context.Context, _ string) bool { return false }

func (p *poisonedLock) addToQueue(_ context.Context, _ string, _ int32, _ time.Time) error {
return nil
}

func (p *poisonedLock) removeFromQueue(_ context.Context, _ string) error { return nil }

func (p *poisonedLock) getCurrentHolders(_ context.Context) ([]string, error) { return nil, nil }

func (p *poisonedLock) getCurrentPending(_ context.Context) ([]string, error) { return nil, nil }

func (p *poisonedLock) getLimit(_ context.Context) int { return 0 }

func (p *poisonedLock) probeWaiting(_ context.Context) {}

// lock returns true so that tryAcquireImpl proceeds to checkAcquire, which
// returns the poisoned-state message rather than a generic "failed to lock()".
func (p *poisonedLock) lock(_ context.Context) bool { return true }

func (p *poisonedLock) unlock(_ context.Context) {}
16 changes: 16 additions & 0 deletions workflow/sync/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,22 @@ func (s *prioritySemaphore) acquire(_ context.Context, holderKey string, _ *sqld
return false
}

// reacquire re-establishes a recorded holder at startup, ignoring the limit. It
// always registers the holder, even when the recorded holders already exceed the
// current limit (e.g. the limit was lowered while held). The weighted semaphore
// is capped at the limit, so a slot is only taken when one is free; the excess is
// tracked solely in lockHolder, exactly as a downward resize leaves it. release()
// already tolerates len(lockHolder) > limit and only frees a weighted slot once
// the count drops below the limit, so new acquisitions wait until every recorded
// holder has drained.
func (s *prioritySemaphore) reacquire(_ context.Context, holderKey string, _ *sqldb.SessionProxy) {
if _, ok := s.lockHolder[holderKey]; ok {
return
}
s.semaphore.TryAcquire(1) // best effort: take a slot if one is free
s.lockHolder[holderKey] = true
}

func isSameWorkflowNodeKeys(firstKey, secondKey string) bool {
firstItems := strings.Split(firstKey, "/")
secondItems := strings.Split(secondKey, "/")
Expand Down
Loading
Loading