diff --git a/cassandra-analytics-integration-framework/build.gradle b/cassandra-analytics-integration-framework/build.gradle index a88c0da9a..d21249a11 100644 --- a/cassandra-analytics-integration-framework/build.gradle +++ b/cassandra-analytics-integration-framework/build.gradle @@ -59,6 +59,10 @@ dependencies { api("org.junit.vintage:junit-vintage-engine:${junitVersion}") api('org.mockito:mockito-inline:4.10.0') + // ByteBuddy agent is core test infrastructure: SharedClusterIntegrationTestBase installs it for + // class redefinition during @AfterEach cluster reset. Transitive via mockito-inline but declared + // explicitly since the framework uses it directly. + api('net.bytebuddy:byte-buddy-agent:1.12.19') api("org.assertj:assertj-core:${assertjCoreVersion}") api('com.datastax.cassandra:cassandra-driver-core:3.11.3') diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java index 460d8565b..dca759987 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java @@ -27,10 +27,12 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -38,7 +40,10 @@ import java.util.stream.StreamSupport; import com.google.common.util.concurrent.Uninterruptibles; +import net.bytebuddy.agent.ByteBuddyAgent; +import 
net.bytebuddy.dynamic.loading.ClassReloadingStrategy; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; @@ -64,6 +69,7 @@ import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.shared.JMXUtil; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; @@ -167,11 +173,34 @@ public abstract class SharedClusterIntegrationTestBase protected MtlsTestHelper mtlsTestHelper; private IsolatedDTestClassLoaderWrapper classLoaderWrapper; private Injector sidecarServerInjector; + private int initialClusterSize; + + /** + * Instance indices (1-based) that should not be restarted or awaited during {@link #resetClusterState()}. + * Subclasses that permanently alter topology (e.g. host replacement, decommission) should add the + * affected node indices here during {@link #afterClusterProvisioned()}. + */ + protected final Set nonResettableInstances = new HashSet<>(); + + /** + * Shared ByteBuddy class-reloading strategy. Subclasses that redefine classes via ByteBuddy should + * use this strategy instance so that {@link #resetClusterState()} (or overrides in child classes) + * can restore original bytecode. A fresh {@code fromInstalledAgent()} would have an empty internal + * map and {@code reset()} would be a no-op. + * + * Since we share a single cluster across multiple test calls, our options are either to isolate and split up any + * tests that make ByteBuddy changes or to wire them up to the reset logic. 
+ */ + protected final ClassReloadingStrategy classReloadingStrategy; static { - // Initialize defaults to configure the in-jvm dtest TestUtils.configureDefaultDTestJarProperties(); + ByteBuddyAgent.install(); + } + + { + classReloadingStrategy = ClassReloadingStrategy.fromInstalledAgent(); } @BeforeAll @@ -188,6 +217,12 @@ protected void setup() throws Exception beforeClusterProvisioning(); cluster = provisionClusterWithRetries(this.testVersion); assertThat(cluster).isNotNull(); + initialClusterSize = cluster.size(); + // If we have a test timeout, we'll often get a wall of FSWriteErrors inside tmp files as memtable flushing + // and transaction log I/O races w/shutdowns from our tests here. Rather than get extra insult to injury if a + // test times out, we instead unregister the StorageService shutdown hooks; we don't much care about memtable + // content from a node getting flushed with unit tests that run ephemerally. + removeShutdownHooks(); afterClusterProvisioned(); initializeSchemaForTest(); mtlsTestHelper = new MtlsTestHelper(secretsPath); @@ -265,6 +300,50 @@ protected void tearDown() throws Exception } } + /** + * Resets shared cluster state after each test. Clears message filters, restarts any stopped + * instances, waits for gossip and sidecar JMX readiness, and re-removes shutdown hooks. + * + *
<p>
Subclasses that need additional cleanup (e.g. ByteBuddy class resets, data resets) should + * override, perform their cleanup, then call {@code super.resetClusterState()}. + */ + @AfterEach + protected void resetClusterState() + { + if (cluster == null) + { + return; + } + + cluster.filters().reset(); + + for (int i = 1; i <= initialClusterSize; i++) + { + if (nonResettableInstances.contains(i)) + { + continue; + } + IInstance instance = cluster.get(i); + if (instance.isShutdown()) + { + instance.startup(); + } + } + + IInstance reference = cluster.getFirstRunningInstance(); + for (int i = 1; i <= initialClusterSize; i++) + { + if (nonResettableInstances.contains(i)) + { + continue; + } + cluster.awaitRingState(reference, cluster.get(i), "Normal"); + } + + waitForSidecarJmxReady(60); + removeShutdownHooks(); + } + /** * Returns the configuration for the test cluster. The default configuration for the cluster has 1 * node, 1 DC, 1 data directory per node, with the {@link org.apache.cassandra.distributed.api.Feature#GOSSIP}, @@ -330,6 +409,57 @@ protected void afterClusterShutdown() { } + /** + * Removes the StorageService drain shutdown hook from each instance to prevent a race condition + * between the JVM shutdown hook (which runs drain and flushes memtables) and Instance.shutdown() + * (which tears down executors and cleans up data directories). Without this, a SIGTERM or JVM + * exit during teardown can trigger FSWriteError as drain flushes write to directories that no + * longer exist. + * + * In theory this is brittle, but the code we're relying on here is 12 years old so we're probably fine. If it fails + * in the future due to us reflecting in for this, it should be pretty clear why. + * + *
<p>
This is safe because Instance.shutdown() already performs its own orderly teardown of flush + * writers and executors - the drain hook is redundant in the test context. + */ + protected void removeShutdownHooks() + { + if (cluster == null) + { + return; + } + for (int i = 1; i <= initialClusterSize; i++) + { + if (nonResettableInstances.contains(i)) + { + continue; + } + try + { + IInstance instance = cluster.get(i); + if (!instance.isShutdown()) + { + instance.sync((IIsolatedExecutor.SerializableRunnable) () -> { + try + { + Class ssClass = Class.forName("org.apache.cassandra.service.StorageService"); + Object ssInstance = ssClass.getField("instance").get(null); + ssClass.getMethod("removeShutdownHook").invoke(ssInstance); + } + catch (Exception e) + { + throw new RuntimeException("Failed to remove StorageService shutdown hook", e); + } + }).run(); + } + } + catch (Throwable t) + { + logger.warn("Failed to remove shutdown hook for instance {}", i, t); + } + } + } + protected void createTestKeyspace(QualifiedName name, Map rf) { createTestKeyspace(name.maybeQuotedKeyspace(), rf); @@ -388,6 +518,57 @@ protected Server startSidecarWithInstances(Iterable instanc return sidecarServer; } + /** + * Waits for the sidecar's JMX connections to all running Cassandra instances to be re-established. + * + *
<p>
After a node restart, the gossip ring state reaches "Normal" before the sidecar's async JMX + * reconnection loop fires. Calling this method ensures the sidecar can serve JMX-backed requests + * (e.g. {@code /api/v1/cassandra/settings}) for every running node before the next test starts. + * + * @param timeoutSeconds maximum time to wait; method returns early once all instances are ready + */ + protected void waitForSidecarJmxReady(long timeoutSeconds) + { + if (sidecarServerInjector == null || cluster == null) + { + return; + } + + InstancesMetadata instancesMetadata = sidecarServerInjector.getInstance(InstancesMetadata.class); + long deadlineMs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds); + + for (int i = 1; i <= initialClusterSize; i++) + { + if (nonResettableInstances.contains(i)) + { + continue; + } + IInstance instance = cluster.get(i); + if (instance.isShutdown()) + { + continue; + } + + int instanceId = i; + while (System.currentTimeMillis() < deadlineMs) + { + try + { + CassandraAdapterDelegate delegate = instancesMetadata.instanceFromId(instanceId).delegate(); + if (delegate.isJmxUp()) + { + break; + } + } + catch (Exception ignored) + { + // delegate not ready yet, continue polling + } + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + } + } + } + protected void waitForSchemaReady(long timeout, TimeUnit timeUnit) { assertThat(sidecarServerInjector) diff --git a/cassandra-analytics-integration-tests/build.gradle b/cassandra-analytics-integration-tests/build.gradle index d8d469f66..354623dd8 100644 --- a/cassandra-analytics-integration-tests/build.gradle +++ b/cassandra-analytics-integration-tests/build.gradle @@ -35,7 +35,7 @@ if (propertyWithDefault("artifactType", null) == "spark") def integrationMaxHeapSize = System.getenv("INTEGRATION_MAX_HEAP_SIZE") ?: "3000M" println("Using ${integrationMaxHeapSize} maxHeapSize") -def integrationMaxParallelForks = (System.getenv("INTEGRATION_MAX_PARALLEL_FORKS") ?: 
"4") as int +def integrationMaxParallelForks = (System.getenv("INTEGRATION_MAX_PARALLEL_FORKS") ?: "2") as int println("Using ${integrationMaxParallelForks} maxParallelForks") def integrationEnableMtls = (System.getenv("INTEGRATION_MTLS_ENABLED") ?: "true") as boolean diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java index b4bf20653..71f1065f5 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java @@ -25,11 +25,10 @@ import java.util.List; import java.util.stream.Collectors; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import net.bytebuddy.ByteBuddy; -import net.bytebuddy.agent.ByteBuddyAgent; -import net.bytebuddy.dynamic.loading.ClassReloadingStrategy; import net.bytebuddy.implementation.MethodCall; import net.bytebuddy.matcher.ElementMatchers; import org.apache.cassandra.distributed.api.ConsistencyLevel; @@ -59,6 +58,23 @@ public class BulkReaderMultiDCConsistencyTest extends SharedClusterSparkIntegrat static final String TEST_VAL = "C*"; QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE); + /** + * Test-specific data reset. CassandraDataLayer bytecode restoration and generic cluster recovery + * (filter reset, node restart, ring wait, JMX wait, shutdown hooks) are handled by the parent chain. 
+ */ + @AfterEach + @Override + protected void resetClusterState() + { + super.resetClusterState(); + + // Restore data to original values after generic cluster recovery completes + for (int i = 0; i < OG_DATASET.size(); i++) + { + setValueForALL(i, OG_DATASET.get(i)); + } + } + @Override protected ClusterBuilderConfiguration testClusterConfiguration() { @@ -112,9 +128,6 @@ void happyPathTest() String valQuorum = readValueForKey(TEST_KEY, ConsistencyLevel.QUORUM); String valEachQuorum = readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM); assertThat(valAll).isEqualTo(valQuorum).isEqualTo(valEachQuorum).isEqualTo(rowList.get(1).getString(1)); - - // Revert the value update for all nodes - setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY)); } public static PartitionedDataLayer.AvailabilityHint getAvailability(CassandraInstance instance) @@ -149,19 +162,18 @@ void eachQuorumIsNotQuorum() throws NoSuchMethodException // primaryReplicas: [Node1, Node2, Node3, Node4] // secondaryReplicas: [Node5, Node6] // Number of nodes required for QUORUM read id 6/1 + 1 = 4. Bulk reader will read from [Node1, Node2, Node3, Node4] only. 
- ByteBuddyAgent.install(); new ByteBuddy() - .redefine(CassandraDataLayer.class) - .method(ElementMatchers.named("getAvailability")) - .intercept( - MethodCall.invoke(BulkReaderMultiDCConsistencyTest.class.getMethod("getAvailability", CassandraInstance.class)) - .withAllArguments() - ) - .make() - .load( - CassandraDataLayer.class.getClassLoader(), - ClassReloadingStrategy.fromInstalledAgent() - ); + .redefine(CassandraDataLayer.class) + .method(ElementMatchers.named("getAvailability")) + .intercept( + MethodCall.invoke(BulkReaderMultiDCConsistencyTest.class.getMethod("getAvailability", CassandraInstance.class)) + .withAllArguments() + ) + .make() + .load( + CassandraDataLayer.class.getClassLoader(), + classReloadingStrategy + ); // Bulk read with QUORUM consistency List rowList = bulkRead(ConsistencyLevel.QUORUM.name()); @@ -190,19 +202,14 @@ void eachQuorumIsNotQuorum() throws NoSuchMethodException String eachQuorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM); // Validate that EACH_QUORUM read using driver and the bulk reader are the same assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1)); - - // Revert the value update for all nodes - setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY)); } /** * Tests that EACH_QUORUM read succeeds with one node down in each DC. * Tests that value read using driver is the same as the value read using bulk reader. 
- * - * @throws Exception */ @Test - void eachQuorumSuccessWithOneNodeDownEachDC() throws Exception + void eachQuorumSuccessWithOneNodeDownEachDC() { // Stop Node1(DC1) cluster.stopUnchecked(cluster.get(1)); @@ -217,10 +224,6 @@ void eachQuorumSuccessWithOneNodeDownEachDC() throws Exception String eachQuorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM); // Validate that data from driver and bulk reader are the same assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1)); - - // Tear down and re-create the cluster - tearDown(); - setup(); } /** @@ -229,11 +232,9 @@ void eachQuorumSuccessWithOneNodeDownEachDC() throws Exception * QUORUM read value using bulk reader equals QUORUM read value using driver. * EACH_QUORUM read with bulk reader fails with cause as NotEnoughReplicasException. * EACH_QUORUM read with driver fails. - * - * @throws Exception */ @Test - void eachQuorumFailureWithTwoNodesDownOneDC() throws Exception + void eachQuorumFailureWithTwoNodesDownOneDC() { // Stop Node4(DC2) cluster.stopUnchecked(cluster.get(4)); @@ -271,10 +272,6 @@ void eachQuorumFailureWithTwoNodesDownOneDC() throws Exception assertThat(ex).isNotNull(); assertThat(ex.getMessage()).isEqualTo("Cannot achieve consistency level EACH_QUORUM in DC datacenter2"); } - - // Tear down and re-create the cluster - tearDown(); - setup(); } /** diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java index c6a98ca73..c4628aacb 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java @@ -19,6 +19,8 @@ package org.apache.cassandra.analytics; +import 
java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -27,6 +29,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.junit.jupiter.api.extension.ExtendWith; @@ -38,6 +41,7 @@ import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase; +import org.apache.cassandra.spark.data.CassandraDataLayer; import org.apache.spark.SparkConf; import org.apache.spark.sql.DataFrameReader; import org.apache.spark.sql.DataFrameWriter; @@ -82,6 +86,26 @@ protected void afterClusterShutdown() sparkTestUtils.tearDown(); } + /** + * Restores any ByteBuddy redefinitions of {@link CassandraDataLayer} before the generic cluster + * recovery in the grandparent. No-op if {@code CassandraDataLayer} was never redefined through the + * shared strategy. + */ + @AfterEach + @Override + protected void resetClusterState() + { + try + { + classReloadingStrategy.reset(CassandraDataLayer.class); + } + catch (IOException e) + { + throw new UncheckedIOException("Failed to reset CassandraDataLayer bytecode", e); + } + super.resetClusterState(); + } + /** * A preconfigured {@link DataFrameReader} with pre-populated required options that can be overridden * with additional options for every specific test. 
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java index 19c902dbe..027da4216 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java @@ -103,9 +103,9 @@ protected CountDownLatch nodeStart() static Stream multiDCTestInputs() { return Stream.of( - Arguments.of(TestConsistencyLevel.of(LOCAL_QUORUM, LOCAL_QUORUM)), - Arguments.of(TestConsistencyLevel.of(LOCAL_QUORUM, EACH_QUORUM)), - Arguments.of(TestConsistencyLevel.of(QUORUM, QUORUM)) + Arguments.of(TestConsistencyLevel.of(LOCAL_QUORUM, LOCAL_QUORUM)), + Arguments.of(TestConsistencyLevel.of(LOCAL_QUORUM, EACH_QUORUM)), + Arguments.of(TestConsistencyLevel.of(QUORUM, QUORUM)) ); } diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java index 9cbbeba5d..9d363ce73 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java @@ -31,6 +31,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; + import com.google.common.util.concurrent.Uninterruptibles; import org.junit.jupiter.params.provider.Arguments; @@ -258,6 +260,29 @@ private void stopNodes(IInstance seed, List nodesToRemove) { for (IInstance node : 
nodesToRemove) { + nonResettableInstances.add(node.config().num()); + // Explicitly stop gossip before killing the node. removeShutdownHooks() strips the StorageService + // drain hook to prevent FSWriteError races, but that hook was also responsible for broadcasting + // STATUS=shutdown to peers via Gossiper.stop(). Without it, peers' Gossiper.isAlive() still returns + // true for this node even after the failure detector convicts it, causing + // StorageService.handleStateBootreplacing to throw "trying to replace alive node". Stopping gossip + // here broadcasts the shutdown tombstone and halts the GossipStage thread pool before we kill the node. + node.sync((IIsolatedExecutor.SerializableRunnable) () -> { + try + { + Class gossipClass = Class.forName("org.apache.cassandra.gms.Gossiper"); + Object gossiper = gossipClass.getField("instance").get(null); + gossipClass.getMethod("stop").invoke(gossiper); + } + catch (ClassNotFoundException | NoSuchFieldException | NoSuchMethodException ignored) + { + // Gossip not present (e.g. pure TCM versions); isAlive() race cannot occur there either + } + catch (Exception e) + { + throw new RuntimeException("Failed to stop gossip before node shutdown", e); + } + }).run(); cluster.stopUnchecked(node); // awaitRingStatus will assert that the node status is down. It retries multiple times until a timeout // is reached and fails if the expected status is not seen. 
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java index 11a6f2b9f..aefeda5b8 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java @@ -147,6 +147,7 @@ protected List decommissionNodes(ICluster cluste for (int i = 0; i < leavingNodesPerDC * numDcs; i++) { IInstance node = cluster.get(cluster.size() - i); + nonResettableInstances.add(node.config().num()); new Thread(() -> { NodeToolResult decommission = node.nodetoolResult("decommission"); if (decommission.getRc() != 0 || decommission.getError() != null) diff --git a/cassandra-analytics-integration-tests/src/test/resources/logback-test.xml b/cassandra-analytics-integration-tests/src/test/resources/logback-test.xml index 14a4cda7a..cf8032ccc 100644 --- a/cassandra-analytics-integration-tests/src/test/resources/logback-test.xml +++ b/cassandra-analytics-integration-tests/src/test/resources/logback-test.xml @@ -46,4 +46,5 @@ + diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java index 6062f8cc2..cfcea225b 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java @@ -67,7 +67,7 @@ protected static synchronized void setCDC(Path path, int commitLogSegmentSize, b DatabaseDescriptor.setCommitLogSyncGroupWindow(30); DatabaseDescriptor.setCommitLogSegmentSize(commitLogSegmentSize); DatabaseDescriptor.getRawConfig().commitlog_total_space = new 
DataStorageSpec.IntMebibytesBound(1024); - DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.direct); + DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.mmap); DatabaseDescriptor.setCDCTotalSpaceInMiB(1024); DatabaseDescriptor.setCommitLogSegmentMgrProvider((commitLog -> new CommitLogSegmentManagerCDC(commitLog, commitLogPath.toString()))); setup = true;