Skip to content

Commit c25f1c9

Browse files
committed
Ensure that gossip state for LEFT nodes is expired eventually
By default the expiry time is calculated on each peer independently. It can be made to converge by disabling gossip quarantine using the configuration setting `gossip_quarantine_disabled` or via a hotprop on GossiperMBean. Patch by Sam Tunnicliffe; reviewed by XXX for CASSANDRA-21035
1 parent 4b0df8a commit c25f1c9

22 files changed

+564
-46
lines changed

src/java/org/apache/cassandra/config/Config.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1520,4 +1520,29 @@ public enum CQLStartTime
15201520
public boolean enforce_native_deadline_for_hints = false;
15211521

15221522
public boolean paxos_repair_race_wait = true;
1523+
1524+
/**
1525+
* If true, gossip state updates for nodes which have left the cluster will continue to be processed while the
1526+
* node is still present in ClusterMetadata. This enables the gossip expiry time for those nodes (the deadline
1527+
* after which their state is fully purged from gossip) to converge across the remaining nodes in the cluster.
1528+
* This is a change from previous behaviour: historically, once a node had advertised a LEFT status, further
1529+
* updates to gossip state for it were ignored for a period of time to prevent flapping if older/stale states
1530+
* were encountered.
1531+
* Following CEP-21, most significant state changes are handled by the cluster metadata log, so resurrection
1532+
* of left nodes is not a problem for gossip to solve and so quarantine is not really necessary. However,
1533+
* FailureDetector does still use gossip messages to assess node health and some external systems still use gossip
1534+
* state to inform decisions about topology/node health/etc. For those reasons, quarantine remains enabled by
1535+
* default for now (i.e. this option defaults to false), and the option can be changed at runtime (hot-proppable).
1536+
*
1537+
* With quarantine still in effect, expiry from gossip of LEFT nodes will occur at different times on each peer.
1538+
* Also, when there are LEFT nodes in gossip, the state will never fully converge across the cluster as each node
1539+
* will have its own expiry time for a LEFT peer.
1540+
*
1541+
* With quarantine disabled, the STATUS_WITH_PORT values for the left node (which include the expiry time) will
1542+
* converge, and all peers will evict that node from gossip after the same deadline.
1543+
*
1544+
* Eventually, this configuration option should be removed and quarantine disabled entirely for clusters running
1545+
* 6.0 and later.
1546+
*/
1547+
public volatile boolean gossip_quarantine_disabled = false;
15231548
}

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6103,4 +6103,14 @@ public static void setPartitioner(String name)
61036103
{
61046104
partitioner = FBUtilities.newPartitioner(name);
61056105
}
6106+
6107+
public static boolean getGossipQuarantineDisabled()
6108+
{
6109+
return conf.gossip_quarantine_disabled;
6110+
}
6111+
6112+
public static void setGossipQuarantineDisabled(boolean disabled)
6113+
{
6114+
conf.gossip_quarantine_disabled = disabled;
6115+
}
61066116
}

src/java/org/apache/cassandra/gms/EndpointState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ public CassandraVersion getReleaseVersion()
324324
public String toString()
325325
{
326326
View view = ref.get();
327-
return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState;
327+
return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState + ", isAlive = " + isAlive;
328328
}
329329

330330
public boolean isSupersededBy(EndpointState that)

src/java/org/apache/cassandra/gms/Gossiper.java

Lines changed: 150 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import org.apache.cassandra.net.RequestCallback;
8888
import org.apache.cassandra.service.StorageService;
8989
import org.apache.cassandra.tcm.ClusterMetadata;
90+
import org.apache.cassandra.tcm.Epoch;
9091
import org.apache.cassandra.tcm.membership.NodeId;
9192
import org.apache.cassandra.tcm.membership.NodeState;
9293
import org.apache.cassandra.utils.FBUtilities;
@@ -106,6 +107,11 @@
106107
import static org.apache.cassandra.gms.Gossiper.GossipedWith.CMS;
107108
import static org.apache.cassandra.gms.Gossiper.GossipedWith.SEED;
108109
import static org.apache.cassandra.gms.VersionedValue.BOOTSTRAPPING_STATUS;
110+
import static org.apache.cassandra.gms.VersionedValue.HIBERNATE;
111+
import static org.apache.cassandra.gms.VersionedValue.REMOVED_TOKEN;
112+
import static org.apache.cassandra.gms.VersionedValue.REMOVING_TOKEN;
113+
import static org.apache.cassandra.gms.VersionedValue.SHUTDOWN;
114+
import static org.apache.cassandra.gms.VersionedValue.STATUS_LEFT;
109115
import static org.apache.cassandra.gms.VersionedValue.unsafeMakeVersionedValue;
110116
import static org.apache.cassandra.net.NoPayload.noPayload;
111117
import static org.apache.cassandra.net.Verb.ECHO_REQ;
@@ -136,8 +142,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean,
136142
private static final ScheduledExecutorPlus executor = executorFactory().scheduled("GossipTasks");
137143

138144
static final ApplicationState[] STATES = ApplicationState.values();
139-
static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
140-
VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
145+
static final List<String> DEAD_STATES = Arrays.asList(REMOVING_TOKEN, REMOVED_TOKEN, STATUS_LEFT, HIBERNATE);
141146
static ArrayList<String> SILENT_SHUTDOWN_STATES = new ArrayList<>();
142147
static
143148
{
@@ -184,7 +189,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean,
184189
/* map where key is endpoint and value is timestamp when this endpoint was removed from
185190
* gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time
186191
* after removal to prevent nodes from falsely reincarnating during the time when removal
187-
* gossip gets propagated to all nodes */
192+
* gossip gets propagated to all nodes.
193+
* Note: in future, this need only be used when ClusterMetadataService is in the GOSSIP state,
194+
* i.e. during the major upgrade to the version with CEP-21, but before the CMS is initialized.
195+
* In this state, gossip is still used to propagate changes to broadcast address and release
196+
* version. Once the CMS initialization is complete, this is no longer necessary.
197+
* Currently, in order to support a controlled rollout of that change in behaviour, quarantine
198+
* is still used by default, but can be disabled via config (gossip_quarantine_disabled) or
199+
* JMX (GossiperMBean::setQuarantineDisabled).
200+
*/
188201
private final Map<InetAddressAndPort, Long> justRemovedEndpoints = new ConcurrentHashMap<>();
189202

190203
private final Map<InetAddressAndPort, Long> expireTimeEndpointMap = new ConcurrentHashMap<>();
@@ -449,14 +462,7 @@ private static boolean isShutdown(EndpointState epState)
449462

450463
public static boolean isShutdown(VersionedValue vv)
451464
{
452-
if (vv == null)
453-
return false;
454-
455-
String value = vv.value;
456-
String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
457-
assert (pieces.length > 0);
458-
String state = pieces[0];
459-
return state.equals(VersionedValue.SHUTDOWN);
465+
return matchesStatusString(vv, SHUTDOWN);
460466
}
461467

462468
public static boolean isHibernate(EndpointState epState)
@@ -468,15 +474,39 @@ public static boolean isHibernate(EndpointState epState)
468474
}
469475

470476
public static boolean isHibernate(VersionedValue vv)
477+
{
478+
return matchesStatusString(vv, HIBERNATE);
479+
}
480+
481+
public static boolean isLeft(VersionedValue vv)
482+
{
483+
return matchesStatusString(vv, STATUS_LEFT);
484+
}
485+
486+
private static boolean matchesStatusString(VersionedValue vv, String toMatch)
471487
{
472488
if (vv == null)
473489
return false;
474490

475-
String value = vv.value;
476-
String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
491+
String[] pieces = vv.splitValue();
477492
assert (pieces.length > 0);
478493
String state = pieces[0];
479-
return state.equals(VersionedValue.HIBERNATE);
494+
return state.equals(toMatch);
495+
}
496+
497+
public static long extractExpireTime(String[] pieces)
498+
{
499+
if (pieces.length < 3)
500+
return 0L;
501+
try
502+
{
503+
return Long.parseLong(pieces[2]);
504+
}
505+
catch (NumberFormatException e)
506+
{
507+
logger.debug("Invalid value found for expire time ({}), ignoring", pieces[2]);
508+
return 0L;
509+
}
480510
}
481511

482512
public static void runInGossipStageBlocking(Runnable runnable)
@@ -695,10 +725,21 @@ private void quarantineEndpoint(InetAddressAndPort endpoint, long quarantineExpi
695725
{
696726
if (disableEndpointRemoval)
697727
return;
728+
729+
// Quarantine is only necessary while upgrading from gossip-driven management of cluster metadata
730+
if (getQuarantineDisabled() && ClusterMetadata.current().epoch.isAfter(Epoch.UPGRADE_GOSSIP))
731+
return;
732+
698733
justRemovedEndpoints.put(endpoint, quarantineExpiration);
699734
GossiperDiagnostics.quarantinedEndpoint(this, endpoint, quarantineExpiration);
700735
}
701736

737+
public void clearQuarantinedEndpoints()
738+
{
739+
logger.info("Clearing quarantined endpoints");
740+
justRemovedEndpoints.clear();
741+
}
742+
702743
/**
703744
* The gossip digest is built based on randomization
704745
* rather than just looping through the collection of live endpoints.
@@ -943,15 +984,14 @@ void doStatusCheck()
943984
}
944985

945986
// check for dead state removal
946-
long expireTime = getExpireTimeForEndpoint(endpoint);
947-
if (!epState.isAlive() && (now > expireTime)
948-
&& (!metadata.directory.allAddresses().contains(endpoint)))
987+
if (!epState.isAlive() && (!metadata.directory.allJoinedEndpoints().contains(endpoint)))
949988
{
950-
if (logger.isDebugEnabled())
989+
long expireTime = getExpireTimeForEndpoint(endpoint);
990+
if (now > expireTime)
951991
{
952-
logger.debug("time is expiring for endpoint : {} ({})", endpoint, expireTime);
992+
logger.info("Reached gossip expiry time for endpoint : {} ({})", endpoint, expireTime);
993+
runInGossipStageBlocking(() -> evictFromMembership(endpoint));
953994
}
954-
runInGossipStageBlocking(() -> evictFromMembership(endpoint));
955995
}
956996
}
957997
}
@@ -1892,11 +1932,15 @@ public int getCurrentGenerationNumber(String address) throws UnknownHostExceptio
18921932

18931933
public void addExpireTimeForEndpoint(InetAddressAndPort endpoint, long expireTime)
18941934
{
1895-
if (logger.isDebugEnabled())
1935+
if (expireTime == 0L)
1936+
{
1937+
logger.debug("Supplied expire time for {} was 0, not recording", endpoint);
1938+
}
1939+
else
18961940
{
18971941
logger.debug("adding expire time for endpoint : {} ({})", endpoint, expireTime);
1942+
expireTimeEndpointMap.put(endpoint, expireTime);
18981943
}
1899-
expireTimeEndpointMap.put(endpoint, expireTime);
19001944
}
19011945

19021946
public static long computeExpireTime()
@@ -2099,6 +2143,50 @@ public void unsafeSendLocalEndpointStateTo(InetAddressAndPort ep)
20992143
MessagingService.instance().send(message, ep);
21002144
}
21012145

2146+
public void unsafeBroadcastLeftStatus(InetAddressAndPort left,
2147+
Collection<Token> tokens,
2148+
Iterable<InetAddressAndPort> sendTo)
2149+
{
2150+
runInGossipStageBlocking(() -> {
2151+
EndpointState epState = endpointStateMap.get(left);
2152+
if (epState == null)
2153+
{
2154+
logger.info("No gossip state for node {}", left);
2155+
return;
2156+
}
2157+
2158+
NodeState state = ClusterMetadata.current().directory.peerState(left);
2159+
if (state != NodeState.LEFT)
2160+
{
2161+
logger.info("Node Status for {} is not LEFT ({})", left, state);
2162+
return;
2163+
}
2164+
2165+
EndpointState toSend = new EndpointState(epState);
2166+
toSend.forceNewerGenerationUnsafe();
2167+
toSend.markDead();
2168+
VersionedValue value = StorageService.instance.valueFactory.left(tokens, computeExpireTime());
2169+
2170+
if (left.equals(getBroadcastAddressAndPort()))
2171+
{
2172+
// Adding local state bumps the value's version. To keep this consistent across
2173+
// the cluster, re-fetch it before broadcasting.
2174+
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, value);
2175+
value = Gossiper.instance.endpointStateMap.get(getBroadcastAddressAndPort())
2176+
.getApplicationState(ApplicationState.STATUS_WITH_PORT);
2177+
}
2178+
2179+
toSend.addApplicationState(ApplicationState.STATUS_WITH_PORT, value);
2180+
GossipDigestAck2 payload = new GossipDigestAck2(Collections.singletonMap(left, toSend));
2181+
logger.info("Sending app state with status {} to {}", value.value, sendTo);
2182+
for (InetAddressAndPort ep : sendTo)
2183+
{
2184+
Message<GossipDigestAck2> message = Message.out(Verb.GOSSIP_DIGEST_ACK2, payload);
2185+
MessagingService.instance().send(message, ep);
2186+
}
2187+
});
2188+
}
2189+
21022190
private void unsafeUpdateEpStates(InetAddressAndPort endpoint, EndpointState epstate)
21032191
{
21042192
checkProperThreadForStateMutation();
@@ -2116,6 +2204,10 @@ private void unsafeUpdateEpStates(InetAddressAndPort endpoint, EndpointState eps
21162204
if (epstate.getHeartBeatState().getGeneration() > 0 &&
21172205
(old == null || old.getHeartBeatState().getGeneration() < epstate.getHeartBeatState().getGeneration()))
21182206
handleMajorStateChange(endpoint, epstate);
2207+
2208+
// mark dead if the supplied epstate said so
2209+
if (isDeadState(epstate))
2210+
markDead(endpoint, old == null ? epstate : old);
21192211
}
21202212
}
21212213

@@ -2216,9 +2308,31 @@ private void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, Collecti
22162308
newValue = valueFactory.hibernate(true);
22172309
break;
22182310
}
2311+
22192312
if (isLocal && !StorageService.instance.shouldJoinRing())
22202313
break;
2221-
newValue = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory, oldValue);
2314+
2315+
// If quarantine has been disabled and we have already seen a LEFT status for a remote peer
2316+
// which originated from the peer itself or the node which coordinated its removal (and so
2317+
// has a version > 0), keep it as this is how we ensure the gossip expiry time encoded in
2318+
// the status string converges across peers.
2319+
// Should a node leave and then rejoin after resetting its local state (i.e. wipe and
2320+
// rejoin), it is automatically unregistered which removes all gossip state for it so there
2321+
// will be no oldValue in that case.
2322+
//
2323+
// Note: don't reorder these conditions as isLeft includes a null check
2324+
if (getQuarantineDisabled() && !isLocal && Gossiper.isLeft(oldValue) && oldValue.version > 0)
2325+
{
2326+
logger.debug("Already seen a LEFT status for {} with a non-zero version, " +
2327+
"dropping derived value {}", endpoint, newValue);
2328+
newValue = oldValue;
2329+
}
2330+
else
2331+
{
2332+
newValue = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory, oldValue);
2333+
if (Gossiper.isLeft(newValue))
2334+
Gossiper.instance.addExpireTimeForEndpoint(endpoint, Gossiper.extractExpireTime(newValue.splitValue()));
2335+
}
22222336
break;
22232337
default:
22242338
newValue = oldValue;
@@ -2264,4 +2378,17 @@ public void triggerRoundWithCMS()
22642378
sendGossip(message, cms);
22652379
}
22662380
}
2381+
2382+
@Override
2383+
public boolean getQuarantineDisabled()
2384+
{
2385+
return DatabaseDescriptor.getGossipQuarantineDisabled();
2386+
}
2387+
2388+
@Override
2389+
public void setQuarantineDisabled(boolean enabled)
2390+
{
2391+
logger.info("Setting gossip_quarantine_disabled: {}", enabled);
2392+
DatabaseDescriptor.setGossipQuarantineDisabled(enabled);
2393+
}
22672394
}

src/java/org/apache/cassandra/gms/GossiperMBean.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,8 @@ public interface GossiperMBean
4343
public boolean getLooseEmptyEnabled();
4444

4545
public void setLooseEmptyEnabled(boolean enabled);
46+
47+
public boolean getQuarantineDisabled();
48+
49+
public void setQuarantineDisabled(boolean disabled);
4650
}

src/java/org/apache/cassandra/gms/VersionedValue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ public byte[] toBytes()
159159
return value.getBytes(ISO_8859_1);
160160
}
161161

162+
public String[] splitValue()
163+
{
164+
return value.split(DELIMITER_STR, -1);
165+
}
166+
162167
private static String versionString(String... args)
163168
{
164169
return StringUtils.join(args, VersionedValue.DELIMITER);

src/java/org/apache/cassandra/service/StorageService.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2158,6 +2158,12 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio
21582158
Gossiper.instance.markDead(endpoint, epState);
21592159
});
21602160
}
2161+
else if (Gossiper.isLeft(value))
2162+
{
2163+
long expireTime = Gossiper.extractExpireTime(value.splitValue());
2164+
logger.info("Node state LEFT detected, setting or updating expire time {}", expireTime);
2165+
Gossiper.instance.addExpireTimeForEndpoint(endpoint, expireTime);
2166+
}
21612167
}
21622168

21632169
if (epState == null || Gossiper.instance.isDeadState(epState))
@@ -2207,7 +2213,7 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio
22072213
updateNetVersion(endpoint, value);
22082214
break;
22092215
case STATUS_WITH_PORT:
2210-
String[] pieces = splitValue(value);
2216+
String[] pieces = value.splitValue();
22112217
String moveName = pieces[0];
22122218
if (moveName.equals(VersionedValue.SHUTDOWN))
22132219
logger.info("Node {} state jump to shutdown", endpoint);
@@ -2226,11 +2232,6 @@ else if (moveName.equals(VersionedValue.STATUS_NORMAL))
22262232
}
22272233
}
22282234

2229-
private static String[] splitValue(VersionedValue value)
2230-
{
2231-
return value.value.split(VersionedValue.DELIMITER_STR, -1);
2232-
}
2233-
22342235
public static void updateIndexStatus(InetAddressAndPort endpoint, VersionedValue versionedValue)
22352236
{
22362237
IndexStatusManager.instance.receivePeerIndexStatus(endpoint, versionedValue);

src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean
6565
next.tokenMap.lastModified().equals(prev.tokenMap.lastModified()))
6666
return;
6767

68-
Set<InetAddressAndPort> removedAddr = Sets.difference(new HashSet<>(prev.directory.allAddresses()),
69-
new HashSet<>(next.directory.allAddresses()));
68+
Set<InetAddressAndPort> removedAddr = Sets.difference(prev.directory.allAddresses(), next.directory.allAddresses());
7069

7170
Set<NodeId> changed = new HashSet<>();
7271
for (NodeId node : next.directory.peerIds())

0 commit comments

Comments
 (0)