diff --git a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java index 4388c3e61..3e87233ab 100644 --- a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java +++ b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java @@ -19,6 +19,7 @@ import com.google.inject.Provider; import io.vertx.core.Vertx; +import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.msg.CdcEvent; @@ -62,12 +63,14 @@ public TestCdcPublisher(Vertx vertx, VirtualTablesDatabaseAccessor virtualTables, SidecarCdcStats sidecarCdcStats, Serializer avroSerializer, - Provider rangeManagerProvider) + Provider rangeManagerProvider, + CassandraBridgeFactory cassandraBridgeFactory) { super(vertx, sidecarConfiguration, executorPools, clusterConfigProvider, schemaSupplier, sidecarInstancesProvider, clientConfig, instanceMetadataFetcher, conf, databaseAccessor, cdcStats, - virtualTables, sidecarCdcStats, avroSerializer, rangeManagerProvider); + virtualTables, sidecarCdcStats, avroSerializer, rangeManagerProvider, + cassandraBridgeFactory); this.databaseAccessor = databaseAccessor; } diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java index 56b68df28..9bc56154a 100644 --- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java @@ -27,6 +27,7 @@ import com.google.inject.Provides; import com.google.inject.Singleton; import io.vertx.core.Vertx; +import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.msg.CdcEvent; import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider; @@ -80,16 +81,6 @@ void cleanupCdcConsumerAfterEachTest() } } - @Override - protected void beforeClusterProvisioning() - { - // The current CDC implementation cannot read 5.x commitlogs, so verify Cassandra version is 4.x - SimpleCassandraVersion version = SimpleCassandraVersion.create(testVersion.version()); - assumeThat(version.major) - .as("Current CDC implementation cannot read 5.x commitlogs, requires Cassandra 4.x") - .isEqualTo(4); - } - @Override protected ClusterBuilderConfiguration testClusterConfiguration() { @@ -167,7 +158,8 @@ CdcPublisher cdcPublisher(Vertx vertx, VirtualTablesDatabaseAccessor virtualTables, SidecarCdcStats sidecarCdcStats, Serializer avroSerializer, - TokenRingProvider tokenRingProvider) + TokenRingProvider tokenRingProvider, + CassandraBridgeFactory cassandraBridgeFactory) { RangeManager rangeManager = new ContentionFreeRangeManager(vertx, tokenRingProvider); return new TestCdcPublisher(vertx, @@ -184,7 +176,8 @@ CdcPublisher cdcPublisher(Vertx vertx, virtualTables, sidecarCdcStats, avroSerializer, - () -> rangeManager); + () -> rangeManager, + cassandraBridgeFactory); } @Provides diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java index 75aa6667d..ac0568d28 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java @@ -33,6 +33,8 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.eventbus.Message; +import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.cdc.CdcLogMode; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; @@ -94,6 +96,7 @@ public class CdcPublisher implements Handler>, PeriodicTask private CdcManager cdcManager; private final Serializer avroSerializer; private final Provider rangeManagerProvider; + private final CassandraBridgeFactory cassandraBridgeFactory; KafkaProducer producer; KafkaPublisher kafkaPublisher; @@ -112,7 +115,8 @@ public CdcPublisher(Vertx vertx, VirtualTablesDatabaseAccessor virtualTables, SidecarCdcStats sidecarCdcStats, Serializer avroSerializer, - Provider rangeManagerProvider) + Provider rangeManagerProvider, + CassandraBridgeFactory cassandraBridgeFactory) { this.sidecarCdcStats = sidecarCdcStats; this.executorPools = executorPools.internal(); @@ -129,6 +133,7 @@ public CdcPublisher(Vertx vertx, this.sidecarConfiguration = sidecarConfiguration; this.avroSerializer = avroSerializer; this.rangeManagerProvider = rangeManagerProvider; + this.cassandraBridgeFactory = cassandraBridgeFactory; if (conf.cdcEnabled()) { @@ -184,13 +189,18 @@ public EventConsumer eventConsumer(CdcConfig conf, this.kafkaPublisher.close(); } this.producer = new KafkaProducer<>(conf.kafkaConfigs()); - this.kafkaPublisher = new KafkaPublisher(TopicSupplier.staticTopicSupplier(conf.kafkaTopic()), - producer, - avroSerializer, - conf.maxRecordSizeBytes(), - conf.failOnRecordTooLargeError(), - conf.failOnKafkaError(), - CdcLogMode.FULL); + CassandraVersion version = cassandraBridgeFactory.get( + instanceMetadataFetcher.callOnFirstAvailableInstance(instance -> + instance.delegate().nodeSettings()).releaseVersion() + ).getVersion(); + this.kafkaPublisher = new KafkaPublisher(version, + TopicSupplier.staticTopicSupplier(conf.kafkaTopic()), + producer, + avroSerializer, + conf.maxRecordSizeBytes(), + conf.failOnRecordTooLargeError(), + conf.failOnKafkaError(), + CdcLogMode.FULL); return new CdcEventConsumer(kafkaPublisher); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java index a29687199..cf6ae4e5a 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarCdcOptions.java @@ -20,6 +20,7 @@ import java.util.Map; +import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.spark.data.ReplicationFactor; @@ -50,4 +51,12 @@ public String dc() { return instanceMetadataFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings().datacenter()); } + + @Override + public CassandraVersion version() + { + String releaseVersion = instanceMetadataFetcher.callOnFirstAvailableInstance( + instance -> instance.delegate().nodeSettings().releaseVersion()); + return CassandraVersion.fromVersion(releaseVersion).orElse(CassandraVersion.FOURZERO); + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java index ddf094f9c..5fef0957e 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java @@ -400,7 +400,8 @@ CdcPublisher cdcPublisher(Vertx vertx, VirtualTablesDatabaseAccessor virtualTables, SidecarCdcStats sidecarCdcStats, Serializer avroSerializer, - RangeManager rangeManager) + RangeManager rangeManager, + CassandraBridgeFactory cassandraBridgeFactory) { return new CdcPublisher(vertx, sidecarConfiguration, @@ -416,7 +417,8 @@ CdcPublisher cdcPublisher(Vertx vertx, virtualTables, sidecarCdcStats, avroSerializer, - () -> rangeManager); + () -> rangeManager, + cassandraBridgeFactory); } @Provides diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java index 4665e5551..81769c28b 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java @@ -28,6 +28,7 @@ import com.google.inject.Provider; import io.vertx.core.Vertx; +import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.msg.CdcEvent; @@ -89,6 +90,8 @@ public class CdcPublisherTests private Serializer avroSerializer; @Mock private Provider rangeManager; + @Mock + private CassandraBridgeFactory cassandraBridgeFactory; private SidecarConfiguration sidecarConfiguration; private CdcConfig cdcConfig; @@ -124,7 +127,8 @@ void setUp() virtualTables, sidecarCdcStats, avroSerializer, - rangeManager + rangeManager, + cassandraBridgeFactory ); }