Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions scylla-cdc-driver3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.4.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
Expand All @@ -83,6 +96,7 @@
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>true</shadeTestJar>
<artifactSet>
<excludes>
<exclude>com.google.flogger</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import com.scylladb.cdc.model.worker.RawChange;
import com.scylladb.cdc.model.worker.Task;

public final class Driver3WorkerCQL implements WorkerCQL {
public class Driver3WorkerCQL implements WorkerCQL {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();

private final Session session;
Expand Down Expand Up @@ -95,11 +95,11 @@ public void prepare(Set<TableName> tables) throws InterruptedException, Executio
}
}

private final class Driver3Reader implements Reader {
protected class Driver3Reader implements Reader {

private volatile ResultSet rs;
protected volatile ResultSet rs;
private volatile ChangeSchema schema;
private final Optional<ChangeId> lastChangeId;
protected final Optional<ChangeId> lastChangeId;
private volatile boolean shouldTryRecreateSchema = true;
private volatile int lastPageColDefsHashcode = 0;

Expand All @@ -108,7 +108,7 @@ public Driver3Reader(ResultSet rs, Optional<ChangeId> lastChangeId) {
this.lastChangeId = Preconditions.checkNotNull(lastChangeId);
}

private void findNext(CompletableFuture<Optional<RawChange>> fut) {
protected void findNext(CompletableFuture<Optional<RawChange>> fut) {
if (rs.getAvailableWithoutFetching() == 0) {
if (rs.isFullyFetched()) {
fut.complete(Optional.empty());
Expand Down Expand Up @@ -183,10 +183,10 @@ public CompletableFuture<Optional<RawChange>> nextChange() {

}

private final class Driver3MultiReader implements Reader {
protected class Driver3MultiReader implements Reader {

private volatile List<Driver3Reader> readers;
private AtomicInteger currentReaderIndex;
protected volatile List<Driver3Reader> readers;
protected AtomicInteger currentReaderIndex;

public Driver3MultiReader(List<ResultSet> rss, Optional<ChangeId> lastChangeId) {
this.readers = rss.stream().map(rs -> new Driver3Reader(rs, lastChangeId)).collect(Collectors.toList());
Expand Down Expand Up @@ -223,8 +223,7 @@ public CompletableFuture<Optional<RawChange>> nextChange() {

}

private CompletableFuture<Reader> query(PreparedStatement stmt, Task task) {
CompletableFuture<Reader> result = new CompletableFuture<>();
protected List<ResultSetFuture> queryTables(PreparedStatement stmt, Task task) {
List<ResultSetFuture> futures = task.streams.stream().map(StreamId::getValue)
.map(streamId ->
session.executeAsync(
Expand All @@ -235,9 +234,12 @@ private CompletableFuture<Reader> query(PreparedStatement stmt, Task task) {
)
).collect(Collectors.toList());
logger.atFine().log("Querying window: [%s, %s] for task: %s, task state: %s", task.state.getWindowStart(), task.state.getWindowEnd(), task.id, task.state);
return futures;
}

protected CompletableFuture<Reader> wrapResults(Task task, List<ResultSetFuture> futures) {
CompletableFuture<Reader> result = new CompletableFuture<>();
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<ResultSet>>() {

@Override
public void onSuccess(List<ResultSet> rss) {
result.complete(new Driver3MultiReader(rss, task.state.getLastConsumedChangeId()));
Expand All @@ -251,6 +253,11 @@ public void onFailure(Throwable t) {
return result;
}

protected CompletableFuture<Reader> query(PreparedStatement stmt, Task task) {
List<ResultSetFuture> futures = queryTables(stmt, task);
return wrapResults(task, futures);
}

@Override
public CompletableFuture<Reader> createReader(Task task) {
PreparedStatement stmt = preparedStmts.get(task.id.getTable());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.scylladb.cdc.cql.driver3;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.scylladb.cdc.model.worker.ChangeId;
import com.scylladb.cdc.model.worker.RawChange;
import com.scylladb.cdc.model.worker.Task;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

// Created for alterTableBeforeNextPage test method
public class MockDriver3WorkerCQL extends Driver3WorkerCQL {
public final ReentrantReadWriteLock nextPageLock;
public final ReentrantReadWriteLock nextRowLock;

public MockDriver3WorkerCQL(Driver3Session driver3Session) {
this(driver3Session, new ReentrantReadWriteLock(), new ReentrantReadWriteLock());
}

public MockDriver3WorkerCQL(Driver3Session driver3Session, ReentrantReadWriteLock nextPageLock, ReentrantReadWriteLock nextRowLock) {
super(driver3Session);
this.nextPageLock = nextPageLock;
this.nextRowLock = nextRowLock;
}

public class MockDriver3Reader extends Driver3WorkerCQL.Driver3Reader {
// Write lock will be used for blocking from outside.
// Read lock will be used inside the worker.
public final ReadWriteLock nextPageLock;
public final ReadWriteLock nextRowLock;

public MockDriver3Reader(ResultSet rs, Optional<ChangeId> lastChangeId, ReadWriteLock nextPageLock, ReadWriteLock nextRowLock) {
super(rs, lastChangeId);
this.nextPageLock = nextPageLock;
this.nextRowLock = nextRowLock;
}

@Override
protected void findNext(CompletableFuture<Optional<RawChange>> fut) {
if (rs.getAvailableWithoutFetching() == 0 && !rs.isFullyFetched()) {
nextPageLock.readLock().lock();
try {
super.findNext(fut);
} finally {
nextPageLock.readLock().unlock();
}
} else {
nextRowLock.readLock().lock();
try {
super.findNext(fut);
} finally {
nextRowLock.readLock().unlock();
}
}
}
}

public class MockDriver3MultiReader extends Driver3WorkerCQL.Driver3MultiReader {
public MockDriver3MultiReader(List<ResultSet> rss, Optional<ChangeId> lastChangeId) {
super(rss, lastChangeId);
this.readers = rss.stream().map(rs -> new MockDriver3Reader(rs, lastChangeId, nextPageLock, nextRowLock)).collect(Collectors.toList());
}
}

@Override
protected CompletableFuture<Reader> wrapResults(Task task, List<ResultSetFuture> futures) {
CompletableFuture<Reader> result = new CompletableFuture<>();
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<ResultSet>>() {
@Override
public void onSuccess(List<ResultSet> rss) {
result.complete(new MockDriver3MultiReader(rss, task.state.getLastConsumedChangeId()));
}

@Override
public void onFailure(Throwable t) {
result.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return result;
}
}
7 changes: 7 additions & 0 deletions scylla-cdc-lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
<artifactId>scylla-cdc-driver3</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.scylladb</groupId>
<artifactId>scylla-cdc-driver3</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
21 changes: 18 additions & 3 deletions scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/CDCConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Function;
import java.util.function.Supplier;

import com.google.common.base.Preconditions;
import com.scylladb.cdc.cql.CQLConfiguration;
import com.scylladb.cdc.cql.MasterCQL;
import com.scylladb.cdc.cql.WorkerCQL;
import com.scylladb.cdc.cql.driver3.Driver3MasterCQL;
import com.scylladb.cdc.cql.driver3.Driver3Session;
import com.scylladb.cdc.cql.driver3.Driver3WorkerCQL;
Expand All @@ -31,15 +33,17 @@ public final class CDCConsumer implements AutoCloseable {
private MasterThread master;

private CDCConsumer(CQLConfiguration cqlConfiguration, MasterConfiguration.Builder masterConfigurationBuilder,
WorkerConfiguration.Builder workerConfigurationBuilder, Supplier<ScheduledExecutorService> executorServiceSupplier) {
WorkerConfiguration.Builder workerConfigurationBuilder, Supplier<ScheduledExecutorService> executorServiceSupplier,
Function<Driver3Session, WorkerCQL> workerCQLProvider) {
Preconditions.checkNotNull(cqlConfiguration);
Preconditions.checkNotNull(masterConfigurationBuilder);
Preconditions.checkNotNull(workerConfigurationBuilder);
Preconditions.checkNotNull(workerCQLProvider);

this.cdcThreadGroup = new ThreadGroup("Scylla-CDC-Threads");

this.session = new Driver3Session(cqlConfiguration);
workerConfigurationBuilder.withCQL(new Driver3WorkerCQL(session));
workerConfigurationBuilder.withCQL(workerCQLProvider.apply(session));
this.transport = new LocalTransport(cdcThreadGroup, workerConfigurationBuilder, executorServiceSupplier);

MasterCQL masterCQL = new Driver3MasterCQL(session);
Expand Down Expand Up @@ -101,6 +105,7 @@ public static class Builder {

private int workersCount = getDefaultWorkersCount();
private Supplier<ScheduledExecutorService> executorServiceSupplier = getDefaultExecutorServiceSupplier(workersCount);
private Function<Driver3Session, WorkerCQL> workerCQLProvider = Driver3WorkerCQL::new;

@SuppressWarnings("deprecation")
public Builder withConsumerProvider(RawChangeConsumerProvider consumerProvider) {
Expand Down Expand Up @@ -204,6 +209,11 @@ public Builder withCredentials(String user, String password) {
return this;
}

public Builder withQueryOptionsFetchSize(int fetchSize) {
cqlConfigurationBuilder.withQueryOptionsFetchSize(fetchSize);
return this;
}

public Builder withSleepBeforeFirstGenerationMs(long sleepBeforeFirstGenerationMs) {
masterConfigurationBuilder.withSleepBeforeFirstGenerationMs(sleepBeforeFirstGenerationMs);
return this;
Expand Down Expand Up @@ -247,9 +257,14 @@ public Builder withClock(Clock clock) {
return this;
}

public Builder withWorkerCQLProvider(Function<Driver3Session, WorkerCQL> workerCQLProvider) {
this.workerCQLProvider = Preconditions.checkNotNull(workerCQLProvider);
return this;
}

public CDCConsumer build() {
return new CDCConsumer(cqlConfigurationBuilder.build(),
masterConfigurationBuilder, workerConfigurationBuilder, executorServiceSupplier);
masterConfigurationBuilder, workerConfigurationBuilder, executorServiceSupplier, workerCQLProvider);
}

private static int getDefaultWorkersCount() {
Expand Down
102 changes: 102 additions & 0 deletions scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.scylladb.cdc.lib;

import com.datastax.driver.core.PreparedStatement;
import com.google.common.util.concurrent.Uninterruptibles;
import com.scylladb.cdc.model.worker.RawChange;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.opentest4j.AssertionFailedError;

import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;

@Tag("integration")
public class AlterAddColIT extends AlterTableBase {
@Override
public String testKeyspace() {
return "AlterAddColIT".toLowerCase();
}

@Override
public String createTableQuery() {
return String.format(
"CREATE TABLE %s.%s (column1 int, column2 int, column3 int, PRIMARY"
+ " KEY (column1, column2)) WITH cdc = {'enabled': 'true'};",
testKeyspace(), testTable());
}

@Override
public void applyAlteration() {
getDriverSession().execute(String.format("ALTER TABLE %s.%s " + "ADD column4 int;", testKeyspace(), testTable()));
}


@Override
public void verifyRawChangeBeforeAlter(RawChange change) {
assertEquals("cdc$stream_id", change.getSchema().getColumnDefinition(0).getColumnName());
assertEquals("cdc$time", change.getSchema().getColumnDefinition(1).getColumnName());
assertEquals("cdc$batch_seq_no", change.getSchema().getColumnDefinition(2).getColumnName());
assertEquals(
"cdc$deleted_column3", change.getSchema().getColumnDefinition(3).getColumnName());
assertEquals("cdc$end_of_batch", change.getSchema().getColumnDefinition(4).getColumnName());
assertEquals("cdc$operation", change.getSchema().getColumnDefinition(5).getColumnName());
assertEquals("cdc$ttl", change.getSchema().getColumnDefinition(6).getColumnName());
assertEquals("column1", change.getSchema().getColumnDefinition(7).getColumnName());
assertEquals("column2", change.getSchema().getColumnDefinition(8).getColumnName());
assertEquals("column3", change.getSchema().getColumnDefinition(9).getColumnName());
}

@Override
public void verifyRawChangeAfterAlter(RawChange change) {
assertEquals("cdc$stream_id", change.getSchema().getColumnDefinition(0).getColumnName());
assertEquals("cdc$time", change.getSchema().getColumnDefinition(1).getColumnName());
assertEquals("cdc$batch_seq_no", change.getSchema().getColumnDefinition(2).getColumnName());
assertEquals(
"cdc$deleted_column3", change.getSchema().getColumnDefinition(3).getColumnName());
assertEquals(
"cdc$deleted_column4", change.getSchema().getColumnDefinition(4).getColumnName());
assertEquals("cdc$end_of_batch", change.getSchema().getColumnDefinition(5).getColumnName());
assertEquals("cdc$operation", change.getSchema().getColumnDefinition(6).getColumnName());
assertEquals("cdc$ttl", change.getSchema().getColumnDefinition(7).getColumnName());
assertEquals("column1", change.getSchema().getColumnDefinition(8).getColumnName());
assertEquals("column2", change.getSchema().getColumnDefinition(9).getColumnName());
assertEquals("column3", change.getSchema().getColumnDefinition(10).getColumnName());
assertEquals("column4", change.getSchema().getColumnDefinition(11).getColumnName());
}

@Override
public Runnable createDatagenTask() {
return () -> {
PreparedStatement ps =
getDriverSession().prepare(
String.format(
"INSERT INTO %s.%s (column1, column2, column3) VALUES (?, ?, ?);",
testKeyspace(), testTable()));
while (!datagenShouldStop.get()) {
int current = datagenCounter.incrementAndGet();
getDriverSession().execute(ps.bind(1, 1, current));
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
};
}

@Test
public void alterBeforeNextPageTest(TestInfo testInfo) {
setTestTableName(testInfo.getTestMethod().get().getName().toLowerCase());
super.alterBeforeNextPageTestBody();
}

@Test
public void alterBeforeNextRowTest(TestInfo testInfo) {
setTestTableName(testInfo.getTestMethod().get().getName().toLowerCase());
try {
super.alterBeforeNextRowTestBody();
} catch (AssertionFailedError e) {
// Ignoring. It is bound to appear, because the alter cannot modify already fetched rows.
// CDCConsumer has no knowledge about from which page the RawChange comes.
// This test only checks that the setup does not break on anything else.
}
}
}
Loading
Loading