Skip to content

Commit 8e2c3a5

Browse files
committed
Ensure that gossip state for LEFT nodes is expired eventually
By default the expiry time is calculated on each peer independently. It can be made to converge by disabling gossip quarantine using the configuration setting `gossip_quarantine_disabled` or via a hotprop on GossiperMBean. Patch by Sam Tunnicliffe; reviewed by XXX for CASSANDRA-21035
1 parent e621d8a commit 8e2c3a5

22 files changed

+564
-46
lines changed

src/java/org/apache/cassandra/config/Config.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,4 +1517,29 @@ public enum CQLStartTime
15171517
public boolean enforce_native_deadline_for_hints = false;
15181518

15191519
public boolean paxos_repair_race_wait = true;
1520+
1521+
/**
1522+
* If true, gossip state updates for nodes which have left the cluster will continue to be processed while the
1523+
* node is still present in ClusterMetadata. This enables the gossip expiry time for those nodes (the deadline
1524+
* after which their state is fully purged from gossip) to converge across the remaining nodes in the cluster.
1525+
* This is a change from previous behaviour as historically once a node has advertised a LEFT status further
1526+
* updates to gossip state for it are ignored for a period of time to prevent flapping if older/stale states
1527+
* are encountered.
1528+
* Following CEP-21, most significant state changes are handled by the cluster metadata log, so resurrection
1529+
* of left nodes is not a problem for gossip to solve and so quarantine is not really necessary. However,
1530+
* FailureDetector does still use gossip messages to assess node health and some external systems still use gossip
1531+
* state to inform decisions about topology/node health/etc. For those reasons, for now the disabling of quarantine
1532+
* is off by default and hot-proppable.
1533+
*
1534+
* With quarantine still in effect, expiry from gossip of LEFT nodes will occur at different times on each peer.
1535+
* Also, when there are LEFT nodes in gossip, the state will never fully converge across the cluster as each node
1536+
* will have its own expiry time for a LEFT peer.
1537+
*
1538+
* With quarantine disabled the STATUS_WITH_PORT values for the left node which include the expiry time will
1539+
* converge and peers will all evict it from gossip after the same deadline.
1540+
*
1541+
* Eventually, this configuration option should be removed and quarantine disabled entirely for clusters running
1542+
* 6.0 and later.
1543+
*/
1544+
public volatile boolean gossip_quarantine_disabled = false;
15201545
}

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6093,4 +6093,14 @@ public static void setPartitioner(String name)
60936093
{
60946094
partitioner = FBUtilities.newPartitioner(name);
60956095
}
6096+
6097+
public static boolean getGossipQuarantineDisabled()
6098+
{
6099+
return conf.gossip_quarantine_disabled;
6100+
}
6101+
6102+
public static void setGossipQuarantineDisabled(boolean disabled)
6103+
{
6104+
conf.gossip_quarantine_disabled = disabled;
6105+
}
60966106
}

src/java/org/apache/cassandra/gms/EndpointState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ public CassandraVersion getReleaseVersion()
324324
public String toString()
325325
{
326326
View view = ref.get();
327-
return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState;
327+
return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState + ", isAlive = " + isAlive;
328328
}
329329

330330
public boolean isSupersededBy(EndpointState that)

src/java/org/apache/cassandra/gms/Gossiper.java

Lines changed: 150 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import org.apache.cassandra.net.RequestCallback;
8888
import org.apache.cassandra.service.StorageService;
8989
import org.apache.cassandra.tcm.ClusterMetadata;
90+
import org.apache.cassandra.tcm.Epoch;
9091
import org.apache.cassandra.tcm.membership.NodeId;
9192
import org.apache.cassandra.tcm.membership.NodeState;
9293
import org.apache.cassandra.utils.FBUtilities;
@@ -106,6 +107,11 @@
106107
import static org.apache.cassandra.gms.Gossiper.GossipedWith.CMS;
107108
import static org.apache.cassandra.gms.Gossiper.GossipedWith.SEED;
108109
import static org.apache.cassandra.gms.VersionedValue.BOOTSTRAPPING_STATUS;
110+
import static org.apache.cassandra.gms.VersionedValue.HIBERNATE;
111+
import static org.apache.cassandra.gms.VersionedValue.REMOVED_TOKEN;
112+
import static org.apache.cassandra.gms.VersionedValue.REMOVING_TOKEN;
113+
import static org.apache.cassandra.gms.VersionedValue.SHUTDOWN;
114+
import static org.apache.cassandra.gms.VersionedValue.STATUS_LEFT;
109115
import static org.apache.cassandra.gms.VersionedValue.unsafeMakeVersionedValue;
110116
import static org.apache.cassandra.net.NoPayload.noPayload;
111117
import static org.apache.cassandra.net.Verb.ECHO_REQ;
@@ -136,8 +142,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean,
136142
private static final ScheduledExecutorPlus executor = executorFactory().scheduled("GossipTasks");
137143

138144
static final ApplicationState[] STATES = ApplicationState.values();
139-
static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
140-
VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
145+
static final List<String> DEAD_STATES = Arrays.asList(REMOVING_TOKEN, REMOVED_TOKEN, STATUS_LEFT, HIBERNATE);
141146
static ArrayList<String> SILENT_SHUTDOWN_STATES = new ArrayList<>();
142147
static
143148
{
@@ -184,7 +189,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean,
184189
/* map where key is endpoint and value is timestamp when this endpoint was removed from
185190
* gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time
186191
* after removal to prevent nodes from falsely reincarnating during the time when removal
187-
* gossip gets propagated to all nodes */
192+
* gossip gets propagated to all nodes.
193+
* Note: in future, this need only be used when ClusterMetadataService is in the GOSSIP state,
194+
* i.e. during the major upgrade to the version with CEP-21, but before the CMS is initialized.
195+
* In this state, gossip is still used to propagate changes to broadcast address and release
196+
* version. Once the CMS initialization is complete, this is no longer necessary.
197+
* Currently in order to support a controlled rollout of that change to behaviour, quarantine
198+
* is still used by default, but can be disabled via config (gossip_quarantine_disabled) or
199+
* JMX (GossiperMBean::setQuarantineDisabled)
200+
*/
188201
private final Map<InetAddressAndPort, Long> justRemovedEndpoints = new ConcurrentHashMap<>();
189202

190203
private final Map<InetAddressAndPort, Long> expireTimeEndpointMap = new ConcurrentHashMap<>();
@@ -444,14 +457,7 @@ private static boolean isShutdown(EndpointState epState)
444457

445458
public static boolean isShutdown(VersionedValue vv)
446459
{
447-
if (vv == null)
448-
return false;
449-
450-
String value = vv.value;
451-
String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
452-
assert (pieces.length > 0);
453-
String state = pieces[0];
454-
return state.equals(VersionedValue.SHUTDOWN);
460+
return matchesStatusString(vv, SHUTDOWN);
455461
}
456462

457463
public static boolean isHibernate(EndpointState epState)
@@ -463,15 +469,39 @@ public static boolean isHibernate(EndpointState epState)
463469
}
464470

465471
public static boolean isHibernate(VersionedValue vv)
472+
{
473+
return matchesStatusString(vv, HIBERNATE);
474+
}
475+
476+
public static boolean isLeft(VersionedValue vv)
477+
{
478+
return matchesStatusString(vv, STATUS_LEFT);
479+
}
480+
481+
private static boolean matchesStatusString(VersionedValue vv, String toMatch)
466482
{
467483
if (vv == null)
468484
return false;
469485

470-
String value = vv.value;
471-
String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
486+
String[] pieces = vv.splitValue();
472487
assert (pieces.length > 0);
473488
String state = pieces[0];
474-
return state.equals(VersionedValue.HIBERNATE);
489+
return state.equals(toMatch);
490+
}
491+
492+
public static long extractExpireTime(String[] pieces)
493+
{
494+
if (pieces.length < 3)
495+
return 0L;
496+
try
497+
{
498+
return Long.parseLong(pieces[2]);
499+
}
500+
catch (NumberFormatException e)
501+
{
502+
logger.debug("Invalid value found for expire time ({}), ignoring", pieces[2]);
503+
return 0L;
504+
}
475505
}
476506

477507
public static void runInGossipStageBlocking(Runnable runnable)
@@ -690,10 +720,21 @@ private void quarantineEndpoint(InetAddressAndPort endpoint, long quarantineExpi
690720
{
691721
if (disableEndpointRemoval)
692722
return;
723+
724+
// Quarantine is only necessary while upgrading from gossip-driven management of cluster metadata
725+
if (getQuarantineDisabled() && ClusterMetadata.current().epoch.isAfter(Epoch.UPGRADE_GOSSIP))
726+
return;
727+
693728
justRemovedEndpoints.put(endpoint, quarantineExpiration);
694729
GossiperDiagnostics.quarantinedEndpoint(this, endpoint, quarantineExpiration);
695730
}
696731

732+
public void clearQuarantinedEndpoints()
733+
{
734+
logger.info("Clearing quarantined endpoints");
735+
justRemovedEndpoints.clear();
736+
}
737+
697738
/**
698739
* The gossip digest is built based on randomization
699740
* rather than just looping through the collection of live endpoints.
@@ -938,15 +979,14 @@ void doStatusCheck()
938979
}
939980

940981
// check for dead state removal
941-
long expireTime = getExpireTimeForEndpoint(endpoint);
942-
if (!epState.isAlive() && (now > expireTime)
943-
&& (!metadata.directory.allAddresses().contains(endpoint)))
982+
if (!epState.isAlive() && (!metadata.directory.allJoinedEndpoints().contains(endpoint)))
944983
{
945-
if (logger.isDebugEnabled())
984+
long expireTime = getExpireTimeForEndpoint(endpoint);
985+
if (now > expireTime)
946986
{
947-
logger.debug("time is expiring for endpoint : {} ({})", endpoint, expireTime);
987+
logger.info("Reached gossip expiry time for endpoint : {} ({})", endpoint, expireTime);
988+
runInGossipStageBlocking(() -> evictFromMembership(endpoint));
948989
}
949-
runInGossipStageBlocking(() -> evictFromMembership(endpoint));
950990
}
951991
}
952992
}
@@ -1887,11 +1927,15 @@ public int getCurrentGenerationNumber(String address) throws UnknownHostExceptio
18871927

18881928
public void addExpireTimeForEndpoint(InetAddressAndPort endpoint, long expireTime)
18891929
{
1890-
if (logger.isDebugEnabled())
1930+
if (expireTime == 0L)
1931+
{
1932+
logger.debug("Supplied expire time for {} was 0, not recording", endpoint);
1933+
}
1934+
else
18911935
{
18921936
logger.debug("adding expire time for endpoint : {} ({})", endpoint, expireTime);
1937+
expireTimeEndpointMap.put(endpoint, expireTime);
18931938
}
1894-
expireTimeEndpointMap.put(endpoint, expireTime);
18951939
}
18961940

18971941
public static long computeExpireTime()
@@ -2094,6 +2138,50 @@ public void unsafeSendLocalEndpointStateTo(InetAddressAndPort ep)
20942138
MessagingService.instance().send(message, ep);
20952139
}
20962140

2141+
public void unsafeBroadcastLeftStatus(InetAddressAndPort left,
2142+
Collection<Token> tokens,
2143+
Iterable<InetAddressAndPort> sendTo)
2144+
{
2145+
runInGossipStageBlocking(() -> {
2146+
EndpointState epState = endpointStateMap.get(left);
2147+
if (epState == null)
2148+
{
2149+
logger.info("No gossip state for node {}", left);
2150+
return;
2151+
}
2152+
2153+
NodeState state = ClusterMetadata.current().directory.peerState(left);
2154+
if (state != NodeState.LEFT)
2155+
{
2156+
logger.info("Node Status for {} is not LEFT ({})", left, state);
2157+
return;
2158+
}
2159+
2160+
EndpointState toSend = new EndpointState(epState);
2161+
toSend.forceNewerGenerationUnsafe();
2162+
toSend.markDead();
2163+
VersionedValue value = StorageService.instance.valueFactory.left(tokens, computeExpireTime());
2164+
2165+
if (left.equals(getBroadcastAddressAndPort()))
2166+
{
2167+
// Adding local state bumps the value's version. To keep this consistent across
2168+
// the cluster, re-fetch it before broadcasting.
2169+
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, value);
2170+
value = Gossiper.instance.endpointStateMap.get(getBroadcastAddressAndPort())
2171+
.getApplicationState(ApplicationState.STATUS_WITH_PORT);
2172+
}
2173+
2174+
toSend.addApplicationState(ApplicationState.STATUS_WITH_PORT, value);
2175+
GossipDigestAck2 payload = new GossipDigestAck2(Collections.singletonMap(left, toSend));
2176+
logger.info("Sending app state with status {} to {}", value.value, sendTo);
2177+
for (InetAddressAndPort ep : sendTo)
2178+
{
2179+
Message<GossipDigestAck2> message = Message.out(Verb.GOSSIP_DIGEST_ACK2, payload);
2180+
MessagingService.instance().send(message, ep);
2181+
}
2182+
});
2183+
}
2184+
20972185
private void unsafeUpdateEpStates(InetAddressAndPort endpoint, EndpointState epstate)
20982186
{
20992187
checkProperThreadForStateMutation();
@@ -2111,6 +2199,10 @@ private void unsafeUpdateEpStates(InetAddressAndPort endpoint, EndpointState eps
21112199
if (epstate.getHeartBeatState().getGeneration() > 0 &&
21122200
(old == null || old.getHeartBeatState().getGeneration() < epstate.getHeartBeatState().getGeneration()))
21132201
handleMajorStateChange(endpoint, epstate);
2202+
2203+
// mark dead if the supplied epstate said so
2204+
if (isDeadState(epstate))
2205+
markDead(endpoint, old == null ? epstate : old);
21142206
}
21152207
}
21162208

@@ -2211,9 +2303,31 @@ private void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, Collecti
22112303
newValue = valueFactory.hibernate(true);
22122304
break;
22132305
}
2306+
22142307
if (isLocal && !StorageService.instance.shouldJoinRing())
22152308
break;
2216-
newValue = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory, oldValue);
2309+
2310+
// If quarantine has been disabled and we have already seen a LEFT status for a remote peer
2311+
// which originated from the peer itself or the node which coordinated its removal (and so
2312+
// has a version > 0), keep it as this is how we ensure the gossip expiry time encoded in
2313+
// the status string converges across peers.
2314+
// Should a node leave and then rejoin after resetting its local state (i.e. wipe and
2315+
// rejoin), it is automatically unregistered which removes all gossip state for it so there
2316+
// will be no oldValue in that case.
2317+
//
2318+
// Note: don't reorder these conditions as isLeft includes a null check
2319+
if (getQuarantineDisabled() && !isLocal && Gossiper.isLeft(oldValue) && oldValue.version > 0)
2320+
{
2321+
logger.debug("Already seen a LEFT status for {} with a non-zero version, " +
2322+
"dropping derived value {}", endpoint, newValue);
2323+
newValue = oldValue;
2324+
}
2325+
else
2326+
{
2327+
newValue = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory, oldValue);
2328+
if (Gossiper.isLeft(newValue))
2329+
Gossiper.instance.addExpireTimeForEndpoint(endpoint, Gossiper.extractExpireTime(newValue.splitValue()));
2330+
}
22172331
break;
22182332
default:
22192333
newValue = oldValue;
@@ -2259,4 +2373,17 @@ public void triggerRoundWithCMS()
22592373
sendGossip(message, cms);
22602374
}
22612375
}
2376+
2377+
@Override
2378+
public boolean getQuarantineDisabled()
2379+
{
2380+
return DatabaseDescriptor.getGossipQuarantineDisabled();
2381+
}
2382+
2383+
@Override
2384+
public void setQuarantineDisabled(boolean enabled)
2385+
{
2386+
logger.info("Setting gossip_quarantine_disabled: {}", enabled);
2387+
DatabaseDescriptor.setGossipQuarantineDisabled(enabled);
2388+
}
22622389
}

src/java/org/apache/cassandra/gms/GossiperMBean.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,8 @@ public interface GossiperMBean
4343
public boolean getLooseEmptyEnabled();
4444

4545
public void setLooseEmptyEnabled(boolean enabled);
46+
47+
public boolean getQuarantineDisabled();
48+
49+
public void setQuarantineDisabled(boolean disabled);
4650
}

src/java/org/apache/cassandra/gms/VersionedValue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ public byte[] toBytes()
159159
return value.getBytes(ISO_8859_1);
160160
}
161161

162+
public String[] splitValue()
163+
{
164+
return value.split(DELIMITER_STR, -1);
165+
}
166+
162167
private static String versionString(String... args)
163168
{
164169
return StringUtils.join(args, VersionedValue.DELIMITER);

src/java/org/apache/cassandra/service/StorageService.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2153,6 +2153,12 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio
21532153
Gossiper.instance.markDead(endpoint, epState);
21542154
});
21552155
}
2156+
else if (Gossiper.isLeft(value))
2157+
{
2158+
long expireTime = Gossiper.extractExpireTime(value.splitValue());
2159+
logger.info("Node state LEFT detected, setting or updating expire time {}", expireTime);
2160+
Gossiper.instance.addExpireTimeForEndpoint(endpoint, expireTime);
2161+
}
21562162
}
21572163

21582164
if (epState == null || Gossiper.instance.isDeadState(epState))
@@ -2202,7 +2208,7 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio
22022208
updateNetVersion(endpoint, value);
22032209
break;
22042210
case STATUS_WITH_PORT:
2205-
String[] pieces = splitValue(value);
2211+
String[] pieces = value.splitValue();
22062212
String moveName = pieces[0];
22072213
if (moveName.equals(VersionedValue.SHUTDOWN))
22082214
logger.info("Node {} state jump to shutdown", endpoint);
@@ -2221,11 +2227,6 @@ else if (moveName.equals(VersionedValue.STATUS_NORMAL))
22212227
}
22222228
}
22232229

2224-
private static String[] splitValue(VersionedValue value)
2225-
{
2226-
return value.value.split(VersionedValue.DELIMITER_STR, -1);
2227-
}
2228-
22292230
public static void updateIndexStatus(InetAddressAndPort endpoint, VersionedValue versionedValue)
22302231
{
22312232
IndexStatusManager.instance.receivePeerIndexStatus(endpoint, versionedValue);

src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean
6565
next.tokenMap.lastModified().equals(prev.tokenMap.lastModified()))
6666
return;
6767

68-
Set<InetAddressAndPort> removedAddr = Sets.difference(new HashSet<>(prev.directory.allAddresses()),
69-
new HashSet<>(next.directory.allAddresses()));
68+
Set<InetAddressAndPort> removedAddr = Sets.difference(prev.directory.allAddresses(), next.directory.allAddresses());
7069

7170
Set<NodeId> changed = new HashSet<>();
7271
for (NodeId node : next.directory.peerIds())

0 commit comments

Comments
 (0)