Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.inject.Provider;
import io.vertx.core.Vertx;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.cdc.api.EventConsumer;
import org.apache.cassandra.cdc.api.SchemaSupplier;
import org.apache.cassandra.cdc.msg.CdcEvent;
Expand Down Expand Up @@ -62,12 +63,14 @@ public TestCdcPublisher(Vertx vertx,
VirtualTablesDatabaseAccessor virtualTables,
SidecarCdcStats sidecarCdcStats,
Serializer<CdcEvent> avroSerializer,
Provider<RangeManager> rangeManagerProvider)
Provider<RangeManager> rangeManagerProvider,
CassandraBridgeFactory cassandraBridgeFactory)
{
super(vertx, sidecarConfiguration, executorPools, clusterConfigProvider,
schemaSupplier, sidecarInstancesProvider, clientConfig,
instanceMetadataFetcher, conf, databaseAccessor, cdcStats,
virtualTables, sidecarCdcStats, avroSerializer, rangeManagerProvider);
virtualTables, sidecarCdcStats, avroSerializer, rangeManagerProvider,
cassandraBridgeFactory);
this.databaseAccessor = databaseAccessor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.vertx.core.Vertx;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.cdc.api.SchemaSupplier;
import org.apache.cassandra.cdc.msg.CdcEvent;
import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
Expand Down Expand Up @@ -80,16 +81,6 @@ void cleanupCdcConsumerAfterEachTest()
}
}

@Override
protected void beforeClusterProvisioning()
{
// The current CDC implementation cannot read 5.x commitlogs, so verify Cassandra version is 4.x
SimpleCassandraVersion version = SimpleCassandraVersion.create(testVersion.version());
assumeThat(version.major)
.as("Current CDC implementation cannot read 5.x commitlogs, requires Cassandra 4.x")
.isEqualTo(4);
}

@Override
protected ClusterBuilderConfiguration testClusterConfiguration()
{
Expand Down Expand Up @@ -167,7 +158,8 @@ CdcPublisher cdcPublisher(Vertx vertx,
VirtualTablesDatabaseAccessor virtualTables,
SidecarCdcStats sidecarCdcStats,
Serializer<CdcEvent> avroSerializer,
TokenRingProvider tokenRingProvider)
TokenRingProvider tokenRingProvider,
CassandraBridgeFactory cassandraBridgeFactory)
{
RangeManager rangeManager = new ContentionFreeRangeManager(vertx, tokenRingProvider);
return new TestCdcPublisher(vertx,
Expand All @@ -184,7 +176,8 @@ CdcPublisher cdcPublisher(Vertx vertx,
virtualTables,
sidecarCdcStats,
avroSerializer,
() -> rangeManager);
() -> rangeManager,
cassandraBridgeFactory);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.cdc.CdcLogMode;
import org.apache.cassandra.cdc.api.EventConsumer;
import org.apache.cassandra.cdc.api.SchemaSupplier;
Expand Down Expand Up @@ -94,6 +96,7 @@ public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
private CdcManager cdcManager;
private final Serializer<CdcEvent> avroSerializer;
private final Provider<RangeManager> rangeManagerProvider;
private final CassandraBridgeFactory cassandraBridgeFactory;
KafkaProducer<String, byte[]> producer;
KafkaPublisher kafkaPublisher;

Expand All @@ -112,7 +115,8 @@ public CdcPublisher(Vertx vertx,
VirtualTablesDatabaseAccessor virtualTables,
SidecarCdcStats sidecarCdcStats,
Serializer<CdcEvent> avroSerializer,
Provider<RangeManager> rangeManagerProvider)
Provider<RangeManager> rangeManagerProvider,
CassandraBridgeFactory cassandraBridgeFactory)
{
this.sidecarCdcStats = sidecarCdcStats;
this.executorPools = executorPools.internal();
Expand All @@ -129,6 +133,7 @@ public CdcPublisher(Vertx vertx,
this.sidecarConfiguration = sidecarConfiguration;
this.avroSerializer = avroSerializer;
this.rangeManagerProvider = rangeManagerProvider;
this.cassandraBridgeFactory = cassandraBridgeFactory;

if (conf.cdcEnabled())
{
Expand Down Expand Up @@ -184,13 +189,18 @@ public EventConsumer eventConsumer(CdcConfig conf,
this.kafkaPublisher.close();
}
this.producer = new KafkaProducer<>(conf.kafkaConfigs());
this.kafkaPublisher = new KafkaPublisher(TopicSupplier.staticTopicSupplier(conf.kafkaTopic()),
producer,
avroSerializer,
conf.maxRecordSizeBytes(),
conf.failOnRecordTooLargeError(),
conf.failOnKafkaError(),
CdcLogMode.FULL);
CassandraVersion version = cassandraBridgeFactory.get(
instanceMetadataFetcher.callOnFirstAvailableInstance(instance ->
instance.delegate().nodeSettings()).releaseVersion()
).getVersion();
this.kafkaPublisher = new KafkaPublisher(version,
TopicSupplier.staticTopicSupplier(conf.kafkaTopic()),
producer,
avroSerializer,
conf.maxRecordSizeBytes(),
conf.failOnRecordTooLargeError(),
conf.failOnKafkaError(),
CdcLogMode.FULL);
return new CdcEventConsumer(kafkaPublisher);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;

import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.cdc.api.CdcOptions;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.spark.data.ReplicationFactor;
Expand Down Expand Up @@ -50,4 +51,12 @@ public String dc()
{
return instanceMetadataFetcher.callOnFirstAvailableInstance(instance-> instance.delegate().nodeSettings().datacenter());
}

@Override
public CassandraVersion version()
{
String releaseVersion = instanceMetadataFetcher.callOnFirstAvailableInstance(
instance -> instance.delegate().nodeSettings().releaseVersion());
return CassandraVersion.fromVersion(releaseVersion).orElse(CassandraVersion.FOURZERO);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ CdcPublisher cdcPublisher(Vertx vertx,
VirtualTablesDatabaseAccessor virtualTables,
SidecarCdcStats sidecarCdcStats,
Serializer<CdcEvent> avroSerializer,
RangeManager rangeManager)
RangeManager rangeManager,
CassandraBridgeFactory cassandraBridgeFactory)
{
return new CdcPublisher(vertx,
sidecarConfiguration,
Expand All @@ -416,7 +417,8 @@ CdcPublisher cdcPublisher(Vertx vertx,
virtualTables,
sidecarCdcStats,
avroSerializer,
() -> rangeManager);
() -> rangeManager,
cassandraBridgeFactory);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.inject.Provider;
import io.vertx.core.Vertx;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.cdc.api.EventConsumer;
import org.apache.cassandra.cdc.api.SchemaSupplier;
import org.apache.cassandra.cdc.msg.CdcEvent;
Expand Down Expand Up @@ -89,6 +90,8 @@ public class CdcPublisherTests
private Serializer<CdcEvent> avroSerializer;
@Mock
private Provider<RangeManager> rangeManager;
@Mock
private CassandraBridgeFactory cassandraBridgeFactory;

private SidecarConfiguration sidecarConfiguration;
private CdcConfig cdcConfig;
Expand Down Expand Up @@ -124,7 +127,8 @@ void setUp()
virtualTables,
sidecarCdcStats,
avroSerializer,
rangeManager
rangeManager,
cassandraBridgeFactory
);
}

Expand Down
Loading