Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cassandra-analytics-integration-framework/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ dependencies {

api("org.junit.vintage:junit-vintage-engine:${junitVersion}")
api('org.mockito:mockito-inline:4.10.0')
// ByteBuddy agent is core test infrastructure: SharedClusterIntegrationTestBase installs it for
// class redefinition during @AfterEach cluster reset. Transitive via mockito-inline but declared
// explicitly since the framework uses it directly.
api('net.bytebuddy:byte-buddy-agent:1.12.19')
api("org.assertj:assertj-core:${assertjCoreVersion}")

api('com.datastax.cassandra:cassandra-driver-core:3.11.3')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,23 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import com.google.common.util.concurrent.Uninterruptibles;
import net.bytebuddy.agent.ByteBuddyAgent;
import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -64,6 +69,7 @@
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.shared.JMXUtil;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
Expand Down Expand Up @@ -167,11 +173,34 @@ public abstract class SharedClusterIntegrationTestBase
protected MtlsTestHelper mtlsTestHelper;
private IsolatedDTestClassLoaderWrapper classLoaderWrapper;
private Injector sidecarServerInjector;
private int initialClusterSize;

/**
* Instance indices (1-based) that should not be restarted or awaited during {@link #resetClusterState()}.
* Subclasses that permanently alter topology (e.g. host replacement, decommission) should add the
* affected node indices here during {@link #afterClusterProvisioned()}.
*/
protected final Set<Integer> nonResettableInstances = new HashSet<>();

/**
* Shared ByteBuddy class-reloading strategy. Subclasses that redefine classes via ByteBuddy should
* use this strategy instance so that {@link #resetClusterState()} (or overrides in child classes)
* can restore original bytecode. A fresh {@code fromInstalledAgent()} would have an empty internal
* map and {@code reset()} would be a no-op.
*
* Since we share a single cluster across multiple test calls, our options are either to isolate and split up any
* tests that make ByteBuddy changes or to wire them up to the reset logic.
*/
protected final ClassReloadingStrategy classReloadingStrategy;

static
{
// Initialize defaults to configure the in-jvm dtest
TestUtils.configureDefaultDTestJarProperties();
// Install the ByteBuddy agent exactly once per JVM. This must happen before any code
// (including the instance initializer below) calls ClassReloadingStrategy.fromInstalledAgent().
ByteBuddyAgent.install();
}

// Instance initializer: capture a per-test-class reloading strategy from the agent installed
// in the static block above. Kept as an initializer (not a field default) to make the
// ordering dependency on ByteBuddyAgent.install() explicit.
{
classReloadingStrategy = ClassReloadingStrategy.fromInstalledAgent();
}

@BeforeAll
Expand All @@ -188,6 +217,12 @@ protected void setup() throws Exception
beforeClusterProvisioning();
cluster = provisionClusterWithRetries(this.testVersion);
assertThat(cluster).isNotNull();
initialClusterSize = cluster.size();
// If a test hits its timeout, memtable flushing and transaction-log I/O race with the shutdowns
// our tests perform, producing a wall of FSWriteErrors against tmp files. To avoid adding that
// noise on top of the timeout failure, we unregister the StorageService shutdown hooks; these
// ephemeral unit-test nodes have no memtable content worth flushing on exit.
removeShutdownHooks();
afterClusterProvisioned();
initializeSchemaForTest();
mtlsTestHelper = new MtlsTestHelper(secretsPath);
Expand Down Expand Up @@ -265,6 +300,50 @@ protected void tearDown() throws Exception
}
}

/**
 * Restores the shared cluster to a clean baseline after every test: clears message filters,
 * restarts any stopped instances, waits for gossip and sidecar JMX readiness, and strips the
 * shutdown hooks that restarted nodes re-register.
 *
 * <p>Subclasses requiring extra cleanup (ByteBuddy class resets, data resets, etc.) should
 * override this method, run their own cleanup, then invoke {@code super.resetClusterState()}.
 */
@AfterEach
protected void resetClusterState()
{
    if (cluster == null)
    {
        return;
    }

    // Drop any message filters installed by the previous test so they cannot leak forward
    cluster.filters().reset();

    // First pass: bring every resettable instance back up before waiting on ring state
    for (int nodeIndex = 1; nodeIndex <= initialClusterSize; nodeIndex++)
    {
        if (!nonResettableInstances.contains(nodeIndex))
        {
            IInstance node = cluster.get(nodeIndex);
            if (node.isShutdown())
            {
                node.startup();
            }
        }
    }

    // Second pass: from the viewpoint of one running node, wait until gossip reports each
    // resettable instance as Normal again
    IInstance observer = cluster.getFirstRunningInstance();
    for (int nodeIndex = 1; nodeIndex <= initialClusterSize; nodeIndex++)
    {
        if (!nonResettableInstances.contains(nodeIndex))
        {
            cluster.awaitRingState(observer, cluster.get(nodeIndex), "Normal");
        }
    }

    waitForSidecarJmxReady(60);
    // Restarted instances register fresh StorageService shutdown hooks; remove them again
    removeShutdownHooks();
}

/**
* Returns the configuration for the test cluster. The default configuration for the cluster has 1
* node, 1 DC, 1 data directory per node, with the {@link org.apache.cassandra.distributed.api.Feature#GOSSIP},
Expand Down Expand Up @@ -330,6 +409,57 @@ protected void afterClusterShutdown()
{
}

/**
 * Unregisters the StorageService drain shutdown hook on each running instance. The JVM hook
 * (which drains and flushes memtables) races with Instance.shutdown() (which tears down
 * executors and deletes data directories); a SIGTERM or JVM exit during teardown can therefore
 * surface FSWriteError as the drain flushes into directories that no longer exist.
 *
 * In theory reflecting into StorageService for this is brittle, but the code relied on here is
 * 12 years old, so we are probably fine. If it ever breaks, the failure will point straight at
 * this reflection.
 *
 * <p>Safe in the test context: Instance.shutdown() already performs its own orderly teardown of
 * flush writers and executors, making the drain hook redundant here.
 */
protected void removeShutdownHooks()
{
    if (cluster == null)
    {
        return;
    }
    for (int nodeIndex = 1; nodeIndex <= initialClusterSize; nodeIndex++)
    {
        if (nonResettableInstances.contains(nodeIndex))
        {
            continue;
        }
        try
        {
            IInstance node = cluster.get(nodeIndex);
            if (node.isShutdown())
            {
                continue;
            }
            // Runs inside the instance's isolated classloader; reflection is required because
            // StorageService is only visible from that classloader, not from the test JVM.
            IIsolatedExecutor.SerializableRunnable dropHook = () -> {
                try
                {
                    Class<?> storageService = Class.forName("org.apache.cassandra.service.StorageService");
                    Object singleton = storageService.getField("instance").get(null);
                    storageService.getMethod("removeShutdownHook").invoke(singleton);
                }
                catch (Exception e)
                {
                    throw new RuntimeException("Failed to remove StorageService shutdown hook", e);
                }
            };
            node.sync(dropHook).run();
        }
        catch (Throwable t)
        {
            // Best effort: an unreachable node should not fail the whole reset
            logger.warn("Failed to remove shutdown hook for instance {}", nodeIndex, t);
        }
    }
}

protected void createTestKeyspace(QualifiedName name, Map<String, Integer> rf)
{
createTestKeyspace(name.maybeQuotedKeyspace(), rf);
Expand Down Expand Up @@ -388,6 +518,57 @@ protected Server startSidecarWithInstances(Iterable<? extends IInstance> instanc
return sidecarServer;
}

/**
 * Waits for the sidecar's JMX connections to all running Cassandra instances to be re-established.
 *
 * <p>After a node restart, the gossip ring state reaches "Normal" before the sidecar's async JMX
 * reconnection loop fires. Calling this method ensures the sidecar can serve JMX-backed requests
 * (e.g. {@code /api/v1/cassandra/settings}) for every running node before the next test starts.
 *
 * @param timeoutSeconds maximum total time to wait across all instances; the method returns early
 *                       once every instance is ready, and logs a warning for each instance still
 *                       not ready when the deadline passes
 */
protected void waitForSidecarJmxReady(long timeoutSeconds)
{
    if (sidecarServerInjector == null || cluster == null)
    {
        return;
    }

    InstancesMetadata instancesMetadata = sidecarServerInjector.getInstance(InstancesMetadata.class);
    // Monotonic deadline: System.currentTimeMillis() can jump under NTP adjustments,
    // which would shrink or stretch the wait arbitrarily
    long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);

    for (int i = 1; i <= initialClusterSize; i++)
    {
        if (nonResettableInstances.contains(i))
        {
            continue;
        }
        IInstance instance = cluster.get(i);
        if (instance.isShutdown())
        {
            continue;
        }

        int instanceId = i;
        boolean jmxUp = false;
        while (!jmxUp && System.nanoTime() < deadlineNanos)
        {
            try
            {
                CassandraAdapterDelegate delegate = instancesMetadata.instanceFromId(instanceId).delegate();
                // delegate may be null while the sidecar is still constructing the adapter;
                // check explicitly rather than relying on the catch below to swallow an NPE
                jmxUp = delegate != null && delegate.isJmxUp();
            }
            catch (Exception ignored)
            {
                // delegate not ready yet, continue polling
            }
            if (!jmxUp)
            {
                Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
            }
        }
        if (!jmxUp)
        {
            // Previously a timeout here was silent and the next test failed later with a
            // confusing JMX error; surface the root cause at the point it happens
            logger.warn("Sidecar JMX connection to instance {} not ready within {} seconds", instanceId, timeoutSeconds);
        }
    }
}

protected void waitForSchemaReady(long timeout, TimeUnit timeUnit)
{
assertThat(sidecarServerInjector)
Expand Down
2 changes: 1 addition & 1 deletion cassandra-analytics-integration-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ if (propertyWithDefault("artifactType", null) == "spark")
def integrationMaxHeapSize = System.getenv("INTEGRATION_MAX_HEAP_SIZE") ?: "3000M"
println("Using ${integrationMaxHeapSize} maxHeapSize")

def integrationMaxParallelForks = (System.getenv("INTEGRATION_MAX_PARALLEL_FORKS") ?: "4") as int
def integrationMaxParallelForks = (System.getenv("INTEGRATION_MAX_PARALLEL_FORKS") ?: "2") as int
println("Using ${integrationMaxParallelForks} maxParallelForks")

def integrationEnableMtls = (System.getenv("INTEGRATION_MTLS_ENABLED") ?: "true") as boolean
Expand Down
Loading
Loading