diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index 6308ec4f171..a32a42379d7 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -958,6 +958,7 @@ public override string ToString() private readonly MessageBufferMap _messageBuffers = new(); private IActorRef? _handOffStopper; + private bool _preparingForShutdown = false; private readonly ICancelable? _passivateIdleTask; private readonly Lease? _lease; private readonly TimeSpan _leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used @@ -1030,6 +1031,14 @@ protected override bool Receive(object message) protected override void PreStart() { + Cluster.Get(system: Context.System).Subscribe( + subscriber: Self, + initialStateMode: ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, + to: new [] + { + typeof(ClusterEvent.MemberPreparingForShutdown), + typeof(ClusterEvent.MemberReadyForShutdown) + }); AcquireLeaseIfNeeded(); } @@ -1110,6 +1119,10 @@ private bool AwaitingLease(object message) case LeaseLost ll: ReceiveLeaseLost(ll); return true; + + case ClusterEvent.IMemberEvent evt: + ReceiveMemberEvent(evt); + return true; } if (_verboseDebug) @@ -1121,6 +1134,18 @@ private bool AwaitingLease(object message) return true; } + private void ReceiveMemberEvent(ClusterEvent.IMemberEvent evt) + { + if (evt is ClusterEvent.MemberReadyForShutdown or ClusterEvent.MemberPreparingForShutdown) + { + if (!_preparingForShutdown) + { + Log.Info("{0}: Preparing for shutdown", _typeName); + _preparingForShutdown = true; + } + } + } + private void TryGetLease(Lease lease) { Log.Info("{0}: Acquiring lease {1}", _typeName, lease.Settings); @@ -1164,6 +1189,9 @@ private bool AwaitingRememberedEntities(object message) case RememberEntityTimeout _: LoadingEntityIdsFailed(); return true; + case ClusterEvent.IMemberEvent me: + ReceiveMemberEvent(me); + return true; } if (_verboseDebug) @@ -1218,6 +1246,9 @@ private bool Idle(object message) case Terminated t: ReceiveTerminated(t.ActorRef); return true; + case ClusterEvent.IMemberEvent me: + ReceiveMemberEvent(me); + return true; case EntityTerminated t: ReceiveEntityTerminated(t.Ref); return true; @@ -1328,6 +1359,9 @@ bool WaitingForRememberEntitiesStore(object message) case ShardRegion.StartEntity se: StartEntity(se.EntityId, Sender); return true; + case ClusterEvent.IMemberEvent me: + ReceiveMemberEvent(me); + return true; case Terminated t: ReceiveTerminated(t.ActorRef); return true; @@ -1593,7 +1627,16 @@ private void HandOff(IActorRef replyTo) // does conversion so only do once var activeEntities = _entities.ActiveEntities; - if (activeEntities.Count > 0) + if (_preparingForShutdown) + { + Log.Info("{0}: HandOff shard [{1}] while preparing for shutdown. Stopping right away.", _typeName, _shardId); + foreach (var entity in activeEntities) + { + entity.Tell(_handOffStopMessage); + replyTo.Tell(new ShardStopped(_shardId)); + Context.Stop(Self); + } + } else if (activeEntities.Count > 0 && !_preparingForShutdown) { var entityHandOffTimeout = (_settings.TuningParameters.HandOffTimeout - TimeSpan.FromSeconds(5)).Max( diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs index 4155c9f98d4..6f81ae3f2a3 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs @@ -1630,7 +1630,7 @@ private void Done(bool ok) internal ILoggingAdapter Log { get; } internal bool VerboseDebug { get; } internal CoordinatorState State { get; set; } - + private bool _preparingForShutdown = false; public ShardCoordinator( string typeName, @@ -1668,7 +1668,15 @@ Action unstashOneGetShardHomeRequest _rebalanceTask = context.System.Scheduler.ScheduleTellRepeatedlyCancelable(Settings.TuningParameters.RebalanceInterval, Settings.TuningParameters.RebalanceInterval, context.Self, RebalanceTick.Instance, ActorRefs.NoSender); - _cluster.Subscribe(context.Self, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, new[] { typeof(ClusterEvent.ClusterShuttingDown) }); + _cluster.Subscribe( + context.Self, + ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, + new[] + { + typeof(ClusterEvent.ClusterShuttingDown), + typeof(ClusterEvent.MemberReadyForShutdown), + typeof(ClusterEvent.MemberPreparingForShutdown) + }); } @@ -1916,6 +1924,16 @@ internal bool Active(object message) _context.Become(ShuttingDown); return true; + case ClusterEvent.MemberPreparingForShutdown: + case ClusterEvent.MemberReadyForShutdown: + if (!_preparingForShutdown) + { + _preparingForShutdown = true; + Log.Info("{0}: Shard coordinator detected prepare for full cluster shutdown. No new rebalances will take place.", TypeName); + _rebalanceTask.Cancel(); + } + return true; + case GetCurrentRegions _: var reply = new CurrentRegions(State.Regions.Keys.Select(r => string.IsNullOrEmpty(r.Path.Address.Host) ? _cluster.SelfAddress : r.Path.Address).ToImmutableHashSet()); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 0e76a5c89ef..6ea7ffde560 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -427,6 +427,7 @@ internal static Props ProxyProps( private bool _loggedFullBufferWarning; private const int RetryCountThreshold = 5; private bool _gracefulShutdownInProgress; + private bool _preparingForShutdown = false; private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System); private readonly TaskCompletionSource _gracefulShutdownProgress = new(); @@ -868,6 +869,15 @@ private void HandleShardRegionCommand(IShardRegionCommand command) break; case GracefulShutdown _: + if (_preparingForShutdown) + { + _log.Debug("{0}: Skipping graceful shutdown of region and all its shards as cluster is preparing for shutdown", + _typeName); + _gracefulShutdownProgress.TrySetResult(Done.Instance); + Context.Stop(Self); + return; + } + _log.Debug("{0}: Starting graceful shutdown of region and all its shards", _typeName); var coordShutdown = CoordinatedShutdown.Get(Context.System); @@ -1162,6 +1172,12 @@ private void HandleCoordinatorMessage(ShardCoordinator.ICoordinatorMessage messa break; case ShardCoordinator.BeginHandOff bho: { + if (_preparingForShutdown) + { + _log.Debug("{0}: Ignoring begin handoff as preparing to shutdown", _typeName); + break; + } + var shard = bho.Shard; _log.Debug("{0}: BeginHandOff shard [{1}]", _typeName, shard); if (_regionByShard.TryGetValue(shard, out var regionRef)) @@ -1179,8 +1195,8 @@ private void HandleCoordinatorMessage(ShardCoordinator.ICoordinatorMessage messa } Sender.Tell(new ShardCoordinator.BeginHandOffAck(shard)); - } break; + } case ShardCoordinator.HandOff ho: { var shard = ho.Shard; @@ -1354,6 +1370,14 @@ private void HandleClusterEvent(ClusterEvent.IClusterDomainEvent e) Context.Stop(Self); } + break; + case ClusterEvent.MemberReadyForShutdown: + case ClusterEvent.MemberPreparingForShutdown: + if (!_preparingForShutdown) + { + _preparingForShutdown = true; + _log.Info("{0}: Preparing for shutdown", _typeName); + } break; case ClusterEvent.IMemberEvent _: // these are expected, no need to warn about them diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs index c6d5c73528f..d49db41bfc6 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs @@ -619,6 +619,7 @@ public static Props Props(Props singletonProps, object terminationMessage, Clust // Previous GetNext request delivered event and new GetNext is to be sent private bool _oldestChangedReceived = true; + private bool _preparingForFullShutdown = false; private bool _selfExited; // started when self member is Up @@ -716,7 +717,16 @@ protected override void PreStart() throw new ActorInitializationException("Cluster node must not be terminated"); // subscribe to cluster changes, re-subscribe when restart - _cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents, typeof(ClusterEvent.MemberRemoved), typeof(ClusterEvent.MemberDowned)); + _cluster.Subscribe( + subscriber: Self, + initialStateMode: InitialStateAsEvents, + to: new[] + { + typeof(MemberRemoved), + typeof(MemberDowned), + typeof(MemberPreparingForShutdown), + typeof(MemberReadyForShutdown) + }); SetTimer(CleanupTimer, Cleanup.Instance, TimeSpan.FromMinutes(1.0), repeat: true); @@ -768,16 +778,19 @@ private void GetNextOldestChanged() private State TryAcquireLease() { - var self = Self; - lease.Acquire(reason => - { - self.Tell(new LeaseLost(reason)); - }).ContinueWith(r => + if (!_preparingForFullShutdown) { - if (r.IsFaulted || r.IsCanceled) - return (object)new AcquireLeaseFailure(r.Exception); - return new AcquireLeaseResult(r.Result); - }).PipeTo(Self); + var self = Self; + lease.Acquire(reason => + { + self.Tell(new LeaseLost(reason)); + }).ContinueWith(r => + { + if (r.IsFaulted || r.IsCanceled) + return (object)new AcquireLeaseFailure(r.Exception); + return new AcquireLeaseResult(r.Result); + }).PipeTo(Self); + } return GoTo(ClusterSingletonState.AcquiringLease).Using(new AcquiringLeaseData(true, null)); } @@ -797,10 +810,15 @@ private State TryGotoOldest() private State GoToOldest() { + if (_preparingForFullShutdown) + { + _log.Info("Singleton manager NOT starting singleton actor [{}] as cluster is preparing to shutdown", + Self.Path / _settings.SingletonName); + return GoTo(ClusterSingletonState.Oldest).Using(new OldestData(Nobody.Instance)); + } var singleton = Context.Watch(Context.ActorOf(_singletonProps, _settings.SingletonName)); Log.Info("Singleton manager started singleton actor [{0}] ", singleton.Path); - return - GoTo(ClusterSingletonState.Oldest).Using(new OldestData(singleton)); + return GoTo(ClusterSingletonState.Oldest).Using(new OldestData(singleton)); } private State HandleOldestChanged(IActorRef singleton, UniqueAddress oldest) @@ -819,13 +837,19 @@ private State HandleOldestChanged( return GoToHandingOver(singleton, null); case UniqueAddress a: // send TakeOver request in case the new oldest doesn't know previous oldest - Peer(a.Address).Tell(TakeOverFromMe.Instance); - SetTimer(TakeOverRetryTimer, new TakeOverRetry(1), _settings.HandOverRetryInterval, repeat: false); + if (!_preparingForFullShutdown) + { + Peer(a.Address).Tell(TakeOverFromMe.Instance); + SetTimer(TakeOverRetryTimer, new TakeOverRetry(1), _settings.HandOverRetryInterval, repeat: false); + } return GoTo(ClusterSingletonState.WasOldest) .Using(new WasOldestData(singleton, a)); case null: // new oldest will initiate the hand-over - SetTimer(TakeOverRetryTimer, new TakeOverRetry(1), _settings.HandOverRetryInterval, repeat: false); + if(!_preparingForFullShutdown) + { + SetTimer(TakeOverRetryTimer, new TakeOverRetry(1), _settings.HandOverRetryInterval, repeat: false); + } return GoTo(ClusterSingletonState.WasOldest) .Using(new WasOldestData(singleton, newOldest: null)); } @@ -899,6 +923,10 @@ private void InitializeFSM() case HandOverToMe _: // nothing to hand over in start return Stay(); + + case IMemberEvent evt: + return HandleMemberEvent(evt); + default: return null; } @@ -919,7 +947,10 @@ private void InitializeFSM() } else { - Peer(youngerData.Oldest.Head().Address).Tell(HandOverToMe.Instance); + if(!_preparingForFullShutdown) + { + Peer(youngerData.Oldest.Head().Address).Tell(HandOverToMe.Instance); + } return GoTo(ClusterSingletonState.BecomingOldest).Using(new BecomingOldestData(youngerData.Oldest)); } } @@ -932,21 +963,15 @@ private void InitializeFSM() return Stay().Using(new YoungerData(youngerData.Oldest)); } } - else if (e.FsmEvent is MemberDowned memberDowned && memberDowned.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) + else if (e.FsmEvent is MemberRemoved memberRemoved && !memberRemoved.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) { - Log.Info("Self downed, stopping ClusterSingletonManager"); - return Stop(); - } - else if (e.FsmEvent is MemberRemoved memberRemoved) - { - if (memberRemoved.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) - { - Log.Info("Self removed, stopping ClusterSingletonManager"); - return Stop(); - } ScheduleDelayedMemberRemoved(memberRemoved.Member); return Stay(); } + else if (e.FsmEvent is IMemberEvent evt) + { + return HandleMemberEvent(evt); + } else if (e.FsmEvent is DelayedMemberRemoved removed && e.StateData is YoungerData data) { if (!_selfExited) @@ -967,7 +992,10 @@ private void InitializeFSM() { // this node was probably quickly restarted with same hostname:port, // confirm that the old singleton instance has been stopped - Sender.Tell(HandOverDone.Instance); + if(!_preparingForFullShutdown) + { + Sender.Tell(HandOverDone.Instance); + } } return Stay(); } @@ -1001,23 +1029,14 @@ private void InitializeFSM() Log.Info("Ignoring HandOverDone in BecomingOldest from [{0}].", Sender.Path.Address); return Stay(); } - else if (e.FsmEvent is MemberDowned memberDowned && memberDowned.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) + else if (e.FsmEvent is MemberRemoved memberRemoved && !memberRemoved.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) { - Log.Info("Self downed, stopping ClusterSingletonManager"); - return Stop(); + ScheduleDelayedMemberRemoved(memberRemoved.Member); + return Stay(); } - else if (e.FsmEvent is MemberRemoved memberRemoved) + else if (e.FsmEvent is IMemberEvent evt && evt.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) { - if (memberRemoved.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) - { - Log.Info("Self removed, stopping ClusterSingletonManager"); - return Stop(); - } - else - { - ScheduleDelayedMemberRemoved(memberRemoved.Member); - return Stay(); - } + return HandleMemberEvent(evt); } else if (e.FsmEvent is DelayedMemberRemoved delayed && e.StateData is BecomingOldestData becoming) { @@ -1061,13 +1080,21 @@ private void InitializeFSM() { case UniqueAddress oldest: if (oldest.Equals(senderUniqueAddress)) - Sender.Tell(HandOverToMe.Instance); + { + if(!_preparingForFullShutdown) + { + Sender.Tell(HandOverToMe.Instance); + } + } else Log.Info("Ignoring TakeOver request in BecomingOldest from [{0}]. Expected previous oldest [{1}]", Sender.Path.Address, oldest.Address); return Stay(); case null: - Sender.Tell(HandOverToMe.Instance); + if(!_preparingForFullShutdown) + { + Sender.Tell(HandOverToMe.Instance); + } becomingOldestData.PreviousOldest.Insert(0, senderUniqueAddress); return Stay().Using(new BecomingOldestData(becomingOldestData.PreviousOldest)); } @@ -1078,10 +1105,13 @@ private void InitializeFSM() { if (handOverRetry.Count <= _maxHandOverRetries) { - var oldest = becomingOldest.PreviousOldest.Head(); - Log.Info("Retry [{0}], sending HandOverToMe to [{1}]", handOverRetry.Count, oldest?.Address); - if (oldest != null) Peer(oldest.Address).Tell(HandOverToMe.Instance); - SetTimer(HandOverRetryTimer, new HandOverRetry(handOverRetry.Count + 1), _settings.HandOverRetryInterval); + if (!_preparingForFullShutdown) + { + var oldest = becomingOldest.PreviousOldest.Head(); + Log.Info("Retry [{0}], sending HandOverToMe to [{1}]", handOverRetry.Count, oldest?.Address); + if (oldest != null) Peer(oldest.Address).Tell(HandOverToMe.Instance); + SetTimer(HandOverRetryTimer, new HandOverRetry(handOverRetry.Count + 1), _settings.HandOverRetryInterval); + } return Stay(); } else if (becomingOldest.PreviousOldest != null && becomingOldest.PreviousOldest.All(m => _removed.ContainsKey(m))) @@ -1168,11 +1198,8 @@ private void InitializeFSM() return Stay(); } - else if (e.FsmEvent is MemberDowned md && md.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) - { - Log.Info("Self downed, stopping ClusterSingletonManager"); - return Stop(); - } + else if (e.FsmEvent is IMemberEvent evt) + return HandleMemberEvent(evt); return null; }); @@ -1212,6 +1239,10 @@ private void InitializeFSM() Log.Info("Self downed, stopping"); return GoToStopping(od.Singleton); + // Downed in this case is handled differently so keep this below it + case IMemberEvent evt: + return HandleMemberEvent(evt); + case LeaseLost ll when e.StateData is OldestData od2: Log.Warning(ll.Reason, "Lease has been lost. Terminating singleton and trying to re-acquire lease"); if (od2.Singleton != null) @@ -1249,10 +1280,13 @@ private void InitializeFSM() else Log.Debug("Retry [{0}], sending TakeOverFromMe to [{1}]", takeOverRetry.Count, wasOldestData.NewOldest?.Address); - if (wasOldestData.NewOldest != null) - Peer(wasOldestData.NewOldest.Address).Tell(TakeOverFromMe.Instance); + if (!_preparingForFullShutdown) + { + if (wasOldestData.NewOldest != null) + Peer(wasOldestData.NewOldest.Address).Tell(TakeOverFromMe.Instance); - SetTimer(TakeOverRetryTimer, new TakeOverRetry(takeOverRetry.Count + 1), _settings.HandOverRetryInterval, false); + SetTimer(TakeOverRetryTimer, new TakeOverRetry(takeOverRetry.Count + 1), _settings.HandOverRetryInterval, false); + } return Stay(); } else @@ -1307,7 +1341,10 @@ private void InitializeFSM() return GoToStopping(od.Singleton); } } - + else if (e.FsmEvent is IMemberEvent evt) + { + return HandleMemberEvent(evt); + } return null; }); @@ -1324,7 +1361,10 @@ private void InitializeFSM() && d.HandOverTo.Equals(Sender)) { // retry - Sender.Tell(HandOverInProgress.Instance); + if(!_preparingForFullShutdown) + { + Sender.Tell(HandOverInProgress.Instance); + } return Stay(); } else if (e.FsmEvent is SelfExiting) @@ -1334,21 +1374,36 @@ private void InitializeFSM() Sender.Tell(Done.Instance); return Stay(); } + else if (e.FsmEvent is MemberReadyForShutdown shutdownEvent && shutdownEvent.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) + { + _log.Info("Ready for shutdown when handing over. Giving up on handover."); + return Stop(); + } + else if (e.FsmEvent is MemberPreparingForShutdown preparingEvent && preparingEvent.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) + { + _log.Info("Preparing for shutdown when handing over. Giving up on handover."); + return Stop(); + } return null; }); When(ClusterSingletonState.Stopping, e => { - if (e.FsmEvent is Terminated terminated - && e.StateData is StoppingData stoppingData - && terminated.ActorRef.Equals(stoppingData.Singleton)) + switch (e.FsmEvent) { - Log.Info("Singleton actor [{0}] was terminated", stoppingData.Singleton.Path); - return Stop(); + case Terminated terminated when e.StateData is StoppingData stoppingData && terminated.ActorRef.Equals(stoppingData.Singleton): + Log.Info("Singleton actor [{0}] was terminated", stoppingData.Singleton.Path); + return Stop(); + case MemberReadyForShutdown evt when evt.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress): + Log.Info("Ready for shutdown when stopping. Not waiting for user actor to shutdown"); + return Stop(); + case MemberPreparingForShutdown evt when evt.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress): + Log.Info("Preparing for shutdown when stopping. Not waiting for user actor to shutdown"); + return Stop(); + default: + return null; } - - return null; }); When(ClusterSingletonState.End, e => @@ -1441,6 +1496,11 @@ private void InitializeFSM() } return Stay(); } + + if (e.FsmEvent is IMemberEvent) + { + return Stay(); // silence + } return null; }); @@ -1489,10 +1549,42 @@ private void InitializeFSM() StartWith(ClusterSingletonState.Start, Uninitialized.Instance); } + private State HandleMemberEvent(IMemberEvent memberEvent) + { + switch (memberEvent) + { + case MemberRemoved evt when evt.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress): + _log.Info("Self removed, stopping ClusterSingletonManager"); + return Stop(); + case MemberDowned evt when evt.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress): + _log.Info("Self removed, stopping ClusterSingletonManager"); + return Stop(); + case MemberReadyForShutdown: + case MemberPreparingForShutdown: + if (!_preparingForFullShutdown) + { + _log.Info("Preparing for shut down, disabling expensive actions"); + _preparingForFullShutdown = true; + } + return Stay(); + default: + return Stay(); + } + } + private void SelfMemberExited() { _selfExited = true; - Log.Info("Exited [{0}]", _cluster.SelfAddress); + if (_preparingForFullShutdown) + { + Log.Info("Exited [{0}]. From preparing from shutdown", _cluster.SelfAddress); + // handover won't be done so just complete right away + _memberExitingProgress.TrySetResult(Done.Instance); + } + else + { + Log.Info("Exited [{0}]", _cluster.SelfAddress); + } } private void ScheduleDelayedMemberRemoved(Member member) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt index 38b3fc8834f..73a31b98e72 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.DotNet.verified.txt @@ -118,6 +118,14 @@ namespace Akka.Cluster { public MemberLeft(Akka.Cluster.Member member) { } } + public sealed class MemberPreparingForShutdown : Akka.Cluster.ClusterEvent.MemberStatusChange + { + public MemberPreparingForShutdown(Akka.Cluster.Member member) { } + } + public sealed class MemberReadyForShutdown : Akka.Cluster.ClusterEvent.MemberStatusChange + { + public MemberReadyForShutdown(Akka.Cluster.Member member) { } + } public sealed class MemberRemoved : Akka.Cluster.ClusterEvent.MemberStatusChange { public MemberRemoved(Akka.Cluster.Member member, Akka.Cluster.MemberStatus previousStatus) { } @@ -268,6 +276,8 @@ namespace Akka.Cluster Down = 4, Removed = 5, WeaklyUp = 6, + PreparingForShutdown = 7, + ReadyForShutdown = 8, } public sealed class NoDowning : Akka.Cluster.IDowningProvider { diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt index f9aa7b26004..1d8d2a50e4d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCluster.Net.verified.txt @@ -118,6 +118,14 @@ namespace Akka.Cluster { public MemberLeft(Akka.Cluster.Member member) { } } + public sealed class MemberPreparingForShutdown : Akka.Cluster.ClusterEvent.MemberStatusChange + { + public MemberPreparingForShutdown(Akka.Cluster.Member member) { } + } + public sealed class MemberReadyForShutdown : Akka.Cluster.ClusterEvent.MemberStatusChange + { + public MemberReadyForShutdown(Akka.Cluster.Member member) { } + } public sealed class MemberRemoved : Akka.Cluster.ClusterEvent.MemberStatusChange { public MemberRemoved(Akka.Cluster.Member member, Akka.Cluster.MemberStatus previousStatus) { } @@ -268,6 +276,8 @@ namespace Akka.Cluster Down = 4, Removed = 5, WeaklyUp = 6, + PreparingForShutdown = 7, + ReadyForShutdown = 8, } public sealed class NoDowning : Akka.Cluster.IDowningProvider { diff --git a/src/core/Akka.Cluster.Tests/MemberOrderingSpec.cs b/src/core/Akka.Cluster.Tests/MemberOrderingSpec.cs index 6add0f1b98e..c5ec19d8854 100644 --- a/src/core/Akka.Cluster.Tests/MemberOrderingSpec.cs +++ b/src/core/Akka.Cluster.Tests/MemberOrderingSpec.cs @@ -267,6 +267,34 @@ public void LeaderOrdering_must_order_members_with_status_joining_exiting_down_l shuffled.Sort(Member.LeaderStatusOrdering).Should().BeEquivalentTo(expected); } + [Theory(DisplayName = "HighestPriorityOf should return the correct priority member")] + [InlineData(MemberStatus.Removed, MemberStatus.Up, 0)] + [InlineData(MemberStatus.Up, MemberStatus.Removed, 1)] + [InlineData(MemberStatus.ReadyForShutdown, MemberStatus.Up, 0)] + [InlineData(MemberStatus.Up, MemberStatus.ReadyForShutdown, 1)] + [InlineData(MemberStatus.Down, MemberStatus.Up, 0)] + [InlineData(MemberStatus.Up, MemberStatus.Down, 1)] + [InlineData(MemberStatus.Exiting, MemberStatus.Up, 0)] + [InlineData(MemberStatus.Up, MemberStatus.Exiting, 1)] + [InlineData(MemberStatus.Leaving, MemberStatus.Up, 0)] + [InlineData(MemberStatus.Up, MemberStatus.Leaving, 1)] + [InlineData(MemberStatus.Joining, MemberStatus.Up, 0)] + [InlineData(MemberStatus.Up, MemberStatus.Joining, 1)] + [InlineData(MemberStatus.WeaklyUp, MemberStatus.Up, 0)] + [InlineData(MemberStatus.Up, MemberStatus.WeaklyUp, 1)] + [InlineData(MemberStatus.PreparingForShutdown, MemberStatus.Up, 0)] + [InlineData(MemberStatus.Up, MemberStatus.PreparingForShutdown, 1)] + [InlineData(MemberStatus.Up, MemberStatus.Up, 0)] + public void HighestPriorityOfTest(MemberStatus first, MemberStatus second, int returnIndex) + { + var address = new Address("akka.tcp", "sys1", "host1", 5000); + var members = new []{ + TestMember.Create(address, first), + TestMember.Create(address.WithPort(7000), second) + }; + Member.HighestPriorityOf(members[0], members[1]).Should().Be(members[returnIndex]); + } + [Fact] public void MemberAgeOrdering_must_order_members_by_ascending_UpNumber() { diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs index 9909c64156f..f6e81c97b58 100644 --- a/src/core/Akka.Cluster/Cluster.cs +++ b/src/core/Akka.Cluster/Cluster.cs @@ -253,6 +253,14 @@ public void Join(Address address) ClusterCore.Tell(new ClusterUserAction.JoinTo(FillLocal(address))); } + /// + /// Change the state of every member in preparation for a full cluster shutdown. + /// + internal void PrepareForFullClusterShutdown() + { + ClusterCore.Tell(ClusterUserAction.PrepareForShutdown.Instance); + } + /// /// Try to asynchronously join this cluster node specified by . /// A command is sent to the node to join. Returned task will be completed diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index 63b30897a19..29fa8308f19 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -122,6 +122,17 @@ public Down(Address address) { } } + + /// + /// Command to mark all nodes as shutting down + /// + internal sealed class PrepareForShutdown: BaseClusterUserAction, IClusterMessage + { + public static PrepareForShutdown Instance { get; } = new(); + + private PrepareForShutdown() : base(Address.AllSystems) + { } + } } /// @@ -959,6 +970,7 @@ internal static string VclockName(UniqueAddress node) private readonly IActorRef _publisher; private int _leaderActionCounter = 0; private int _selfDownCounter = 0; + private bool _preparingForShutdown = false; private bool _exitingTasksInProgress = false; private readonly TaskCompletionSource _selfExiting = new(); @@ -1334,6 +1346,10 @@ private void Initialized(object message) { Leaving(leave.Address); } + else if (message is ClusterUserAction.PrepareForShutdown) + { + StartPrepareForShutdown(); + } else if (message is InternalClusterAction.SendGossipTo sendGossipTo) { SendGossipTo(sendGossipTo.Address); @@ -1513,6 +1529,12 @@ public void StopSeedNodeProcess() /// The software version of the joining node. public void Joining(UniqueAddress node, ImmutableHashSet roles, AppVersion appVersion) { + if (_preparingForShutdown) + { + _log.Info("Ignoring join request from [{0}] as cluster is preparing for shutdown", node); + return; + } + var selfStatus = LatestGossip.GetMember(SelfUniqueAddress).Status; if (!node.Address.Protocol.Equals(_cluster.SelfAddress.Protocol)) { @@ -1626,6 +1648,32 @@ public void Welcome(Address joinWith, UniqueAddress from, Gossip gossip) } } + /// + /// State transition to PreparingForShutdown + /// + public void StartPrepareForShutdown() + { + if (!_preparingForShutdown) + { + _preparingForShutdown = true; + var changedMembers = LatestGossip.Members + .Select(m => AllowedToPrepareToShutdown.Contains(m.Status) + ? m.Copy(MemberStatus.PreparingForShutdown) + : m) + .ToImmutableSortedSet(); + var newGossip = LatestGossip.Copy(changedMembers); + UpdateLatestGossip(newGossip); + foreach (var member in changedMembers) + { + _log.Info("Preparing for shutdown [{0}] as [{1}]", + member.Address, + MemberStatus.PreparingForShutdown); + } + PublishMembershipState(); + SendGossip(); + } + } + /// /// State transition to LEAVING. /// The node will eventually be removed by the leader, after hand-off in EXITING, and only after @@ -1635,7 +1683,13 @@ public void Welcome(Address joinWith, UniqueAddress from, Gossip gossip) public void Leaving(Address address) { // only try to update if the node is available (in the member ring) - if (LatestGossip.Members.Any(m => m.Address.Equals(address) && m.Status is MemberStatus.Joining or MemberStatus.WeaklyUp or MemberStatus.Up)) + if (LatestGossip.Members.Any(m => + m.Address.Equals(address) + && m.Status is MemberStatus.Joining + or MemberStatus.WeaklyUp + or MemberStatus.Up + or MemberStatus.PreparingForShutdown + or MemberStatus.ReadyForShutdown)) { // mark node as LEAVING var newMembers = LatestGossip.Members.Select(m => @@ -2116,7 +2170,7 @@ public void LeaderActions() { _leaderActionCounter += 1; - if (_cluster.Settings.AllowWeaklyUpMembers && (_leaderActionCounter * _cluster.Settings.LeaderActionsInterval.TotalMilliseconds) >= _cluster.Settings.WeaklyUpAfter.TotalMilliseconds) + if (!_preparingForShutdown && _cluster.Settings.AllowWeaklyUpMembers && (_leaderActionCounter * _cluster.Settings.LeaderActionsInterval.TotalMilliseconds) >= _cluster.Settings.WeaklyUpAfter.TotalMilliseconds) MoveJoiningToWeaklyUp(); if (_leaderActionCounter == firstNotice || _leaderActionCounter % periodicNotice == 0) @@ -2139,9 +2193,19 @@ public void LeaderActions() } CleanupExitingConfirmed(); + CheckForPrepareForShutdown(); ShutdownSelfWhenDown(); } + private void CheckForPrepareForShutdown() + { + if( + AllowedToPrepareToShutdown.Contains(LatestGossip.Members.First(m => m.UniqueAddress == SelfUniqueAddress).Status) + && LatestGossip.Members.Any(m => PrepareForShutdownStates.Contains(m.Status))) + _log.Debug("Detected full cluster shutdown"); + Self.Tell(ClusterUserAction.PrepareForShutdown.Instance); + } + private void MoveJoiningToWeaklyUp() { var localGossip = LatestGossip; @@ -2251,10 +2315,11 @@ public void LeaderActionsOnConvergence() var upNumber = 0; var changedMembers = localMembers.Select(m => { - if (IsJoiningUp(m)) + if (IsJoiningUp(m) && !_preparingForShutdown) { // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence) - // and minimum number of nodes have joined the cluster + // and minimum number of nodes have joined the cluster. + // don't move members to up when preparing for shutdown if (upNumber == 0) { // It is alright to use same upNumber as already used by a removed member, since the upNumber @@ -2276,6 +2341,12 @@ public void LeaderActionsOnConvergence() return m.Copy(MemberStatus.Exiting); } + if (m.Status == MemberStatus.PreparingForShutdown) + { + // Move PreparingForShutdown => ReadyForShutdown (once we have a convergence on PreparingForShutdown) + return m.Copy(MemberStatus.ReadyForShutdown); + } + return null; }).Where(m => m != null).ToImmutableSortedSet(); diff --git a/src/core/Akka.Cluster/ClusterEvent.cs b/src/core/Akka.Cluster/ClusterEvent.cs index e945e8878d2..5fa1df987e0 100644 --- a/src/core/Akka.Cluster/ClusterEvent.cs +++ b/src/core/Akka.Cluster/ClusterEvent.cs @@ -345,6 +345,18 @@ public sealed class MemberLeft : MemberStatusChange public MemberLeft(Member member) : base(member, MemberStatus.Leaving) { } } + + public sealed class MemberPreparingForShutdown: MemberStatusChange + { + public MemberPreparingForShutdown(Member member) + : base(member, MemberStatus.PreparingForShutdown) { } + } + + public sealed class MemberReadyForShutdown: MemberStatusChange + { + public MemberReadyForShutdown(Member member) + : base(member, MemberStatus.ReadyForShutdown) { } + } /// /// This class represents a event where the @@ -914,6 +926,16 @@ private static IEnumerable CollectMemberEvents(IEnumerable case MemberStatus.Down: yield return new MemberDowned(member); break; + case MemberStatus.PreparingForShutdown: + yield return new MemberPreparingForShutdown(member); + break; + case MemberStatus.ReadyForShutdown: + yield return new MemberReadyForShutdown(member); + break; + case MemberStatus.Removed: + default: + // no events for other transitions + break; } } } diff --git a/src/core/Akka.Cluster/Member.cs b/src/core/Akka.Cluster/Member.cs index 1abd73d5873..a0c6767b1d6 100644 --- a/src/core/Akka.Cluster/Member.cs +++ b/src/core/Akka.Cluster/Member.cs @@ -388,21 +388,27 @@ public static Member HighestPriorityOf(Member m1, Member m2) return m1.IsOlderThan(m2) ? m1 : m2; } - var m1Status = m1.Status; - var m2Status = m2.Status; - if (m1Status == MemberStatus.Removed) return m1; - if (m2Status == MemberStatus.Removed) return m2; - if (m1Status == MemberStatus.Down) return m1; - if (m2Status == MemberStatus.Down) return m2; - if (m1Status == MemberStatus.Exiting) return m1; - if (m2Status == MemberStatus.Exiting) return m2; - if (m1Status == MemberStatus.Leaving) return m1; - if (m2Status == MemberStatus.Leaving) return m2; - if (m1Status == MemberStatus.Joining) return m2; - if (m2Status == MemberStatus.Joining) return m1; - if (m1Status == MemberStatus.WeaklyUp) return m2; - if (m2Status == MemberStatus.WeaklyUp) return m1; - return m1; + return (m1.Status, m2.Status) switch + { + (MemberStatus.Removed, _) => m1, + (_, MemberStatus.Removed) => m2, + (MemberStatus.ReadyForShutdown, _) => m1, + (_, MemberStatus.ReadyForShutdown) => m2, + (MemberStatus.Down, _) => m1, + (_, MemberStatus.Down) => m2, + (MemberStatus.Exiting, _) => m1, + (_, MemberStatus.Exiting) => m2, + (MemberStatus.Leaving, _) => m1, + (_, MemberStatus.Leaving) => m2, + (MemberStatus.Joining, _) => m1, + (_, MemberStatus.Joining) => m2, + (MemberStatus.WeaklyUp, _) => m1, + (_, MemberStatus.WeaklyUp) => m2, + (MemberStatus.PreparingForShutdown, _) => m1, + (_, MemberStatus.PreparingForShutdown) => m2, + (MemberStatus.Up, MemberStatus.Up) => m1, + _ => throw new InvalidOperationException($"Should never reach this line. m1.Status: {m1.Status}, m2.Status: {m2.Status}") + }; } /// @@ -413,10 +419,12 @@ public static Member HighestPriorityOf(Member m1, Member m2) { {MemberStatus.Joining, ImmutableHashSet.Create(MemberStatus.WeaklyUp, MemberStatus.Up,MemberStatus.Leaving, MemberStatus.Down, MemberStatus.Removed)}, {MemberStatus.WeaklyUp, ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Down, MemberStatus.Removed) }, - {MemberStatus.Up, ImmutableHashSet.Create(MemberStatus.Leaving, MemberStatus.Down, MemberStatus.Removed)}, + {MemberStatus.Up, ImmutableHashSet.Create(MemberStatus.Leaving, MemberStatus.Down, MemberStatus.Removed, MemberStatus.PreparingForShutdown)}, {MemberStatus.Leaving, ImmutableHashSet.Create(MemberStatus.Exiting, MemberStatus.Down, MemberStatus.Removed)}, {MemberStatus.Down, ImmutableHashSet.Create(MemberStatus.Removed)}, {MemberStatus.Exiting, ImmutableHashSet.Create(MemberStatus.Removed, MemberStatus.Down)}, + {MemberStatus.PreparingForShutdown, ImmutableHashSet.Create(MemberStatus.ReadyForShutdown, MemberStatus.Removed, MemberStatus.Leaving, MemberStatus.Down)}, + {MemberStatus.ReadyForShutdown, ImmutableHashSet.Create(MemberStatus.Removed, MemberStatus.Leaving, MemberStatus.Down)}, {MemberStatus.Removed, ImmutableHashSet.Create()} }.ToImmutableDictionary(); } @@ -457,6 +465,8 @@ public enum MemberStatus /// because cluster convergence cannot be reached i.e. because of unreachable nodes. /// WeaklyUp = 6, + PreparingForShutdown = 7, + ReadyForShutdown = 8 } /// diff --git a/src/core/Akka.Cluster/MembershipState.cs b/src/core/Akka.Cluster/MembershipState.cs index 196f2a6721a..986294b6ae7 100644 --- a/src/core/Akka.Cluster/MembershipState.cs +++ b/src/core/Akka.Cluster/MembershipState.cs @@ -20,10 +20,10 @@ namespace Akka.Cluster internal sealed class MembershipState : IEquatable { private static readonly ImmutableHashSet LeaderMemberStatus = - ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving); + ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving, MemberStatus.PreparingForShutdown, MemberStatus.ReadyForShutdown); private static readonly ImmutableHashSet ConvergenceMemberStatus = - ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving); + ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving, MemberStatus.PreparingForShutdown, MemberStatus.ReadyForShutdown); /// /// If there are unreachable members in the cluster with any of these statuses, they will be skipped during convergence checks. @@ -37,6 +37,13 @@ internal sealed class MembershipState : IEquatable public static readonly ImmutableHashSet RemoveUnreachableWithMemberStatus = ImmutableHashSet.Create(MemberStatus.Down, MemberStatus.Exiting); + // If a member hasn't joined yet or has already started leaving don't mark it as shutting down + public static readonly ImmutableHashSet AllowedToPrepareToShutdown = + ImmutableHashSet.Create(MemberStatus.Up); + + public static readonly ImmutableHashSet PrepareForShutdownStates = + ImmutableHashSet.Create(MemberStatus.PreparingForShutdown, MemberStatus.ReadyForShutdown); + public MembershipState(Gossip latestGossip, UniqueAddress selfUniqueAddress) { LatestGossip = latestGossip; diff --git a/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs b/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs index 867665e11a4..c5e00f99691 100644 --- a/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs +++ b/src/core/Akka.Cluster/Serialization/ClusterMessageSerializer.cs @@ -35,6 +35,7 @@ public class ClusterMessageSerializer : SerializerWithStringManifest internal const string WelcomeManifest = "Akka.Cluster.InternalClusterAction+Welcome, Akka.Cluster"; internal const string LeaveManifest = "Akka.Cluster.ClusterUserAction+Leave, Akka.Cluster"; internal const string DownManifest = "Akka.Cluster.ClusterUserAction+Down, Akka.Cluster"; + internal const string PrepareForShutdownManifest = "PS"; internal const string InitJoinManifest = "Akka.Cluster.InternalClusterAction+InitJoin, Akka.Cluster"; @@ -184,6 +185,8 @@ public override string Manifest(object o) return GossipEnvelopeManifest; case ClusterRouterPool _: return ClusterRouterPoolManifest; + case ClusterUserAction.PrepareForShutdown: + return PrepareForShutdownManifest; default: throw new ArgumentException($"Can't serialize object of type [{o.GetType()}] in [{GetType()}]"); } diff --git a/src/protobuf/ClusterMessages.proto b/src/protobuf/ClusterMessages.proto index 4206c0f3c21..7b5ce151ca3 100644 --- a/src/protobuf/ClusterMessages.proto +++ b/src/protobuf/ClusterMessages.proto @@ -146,6 +146,8 @@ message Member { Down = 4; Removed = 5; WeaklyUp = 6; + PreparingForShutdown = 7; + ReadyForShutdown = 8; } int32 addressIndex = 1;