diff --git a/common/util.go b/common/util.go
index 6004f8d0169..f25a451545b 100644
--- a/common/util.go
+++ b/common/util.go
@@ -142,6 +142,20 @@ func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
 	}
 }
 
+// WatchMissedDeadline arms a dead man's switch: deadMansSwitch is invoked, in
+// its own goroutine, once shouldBeDoneBy has elapsed unless the returned isDone
+// function has been called first. Calling isDone more than once, or after the
+// switch has already fired, is a no-op.
+func WatchMissedDeadline(shouldBeDoneBy time.Duration, deadMansSwitch func()) (isDone func()) {
+	// time.AfterFunc is used instead of a sleeping goroutine so that calling
+	// isDone releases the watcher immediately rather than leaving it parked
+	// until the deadline.
+	timer := time.AfterFunc(shouldBeDoneBy, deadMansSwitch)
+	return func() {
+		timer.Stop()
+	}
+}
+
 // CreatePersistenceRetryPolicy creates a retry policy for persistence layer operations
 func CreatePersistenceRetryPolicy() backoff.RetryPolicy {
 	policy := backoff.NewExponentialRetryPolicy(retryPersistenceOperationInitialInterval)
diff --git a/common/util_test.go b/common/util_test.go
index 155b30c8d85..3455f1d5661 100644
--- a/common/util_test.go
+++ b/common/util_test.go
@@ -1721,3 +1721,35 @@ func TestCheckEventBlobSizeLimit(t *testing.T) {
 		})
 	}
 }
+
+func TestWatchMissedDeadlineDoesNotFireWhenDone(t *testing.T) {
+	fired := make(chan struct{})
+	isDone := WatchMissedDeadline(time.Millisecond*10, func() {
+		close(fired)
+	})
+	isDone()
+
+	// Wait well past the deadline; a closed channel here means the switch
+	// fired despite isDone having been called.
+	select {
+	case <-fired:
+		assert.Fail(t, "deadMansSwitch fired even though isDone was called")
+	case <-time.After(time.Millisecond * 50):
+	}
+}
+
+func TestWatchMissedDeadlineDoesFireWhenNotDone(t *testing.T) {
+	fired := make(chan struct{})
+	isDone := WatchMissedDeadline(time.Millisecond*10, func() {
+		close(fired)
+	})
+	defer isDone()
+
+	// Block on the signal rather than sleeping a fixed amount, so the test is
+	// neither racy nor sensitive to scheduler jitter.
+	select {
+	case <-fired:
+	case <-time.After(time.Second):
+		assert.Fail(t, "deadMansSwitch did not fire after the deadline passed")
+	}
+}
diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go
index 4573fddba2b..ddcb9c40def 100644
--- a/service/history/queue/transfer_queue_processor.go
+++ b/service/history/queue/transfer_queue_processor.go
@@ -192,6 +192,9 @@ func (t *transferQueueProcessor) Stop() {
 	defer t.logger.Info("Transfer queue processor stopped")
 
 	if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
+		isDone := common.WatchMissedDeadline(time.Minute, func() {
+			t.logger.Error("Graceful shutdown of transfer queue processor has failed")
+		})
 		t.activeQueueProcessor.Stop()
 		for _, standbyQueueProcessor := range t.standbyQueueProcessors {
 			standbyQueueProcessor.Stop()
@@ -199,6 +202,7 @@ func (t *transferQueueProcessor) Stop() {
 
 		close(t.shutdownChan)
 		common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
+		isDone()
 		return
 	}
 