From 4884c27aad9c403e3832894e2277f4bad5d4a827 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Thu, 23 Oct 2025 21:43:15 +0200 Subject: [PATCH 1/8] Shade test jars Setting this option is required for MockDriver3WorkerCQL to work properly. Otherwise the types won't match. --- scylla-cdc-driver3/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/scylla-cdc-driver3/pom.xml b/scylla-cdc-driver3/pom.xml index 4b317cad..b74984b5 100644 --- a/scylla-cdc-driver3/pom.xml +++ b/scylla-cdc-driver3/pom.xml @@ -83,6 +83,7 @@ shade + true com.google.flogger From 468939de79381e590aaea7f792e881370c957cab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Thu, 23 Oct 2025 21:45:02 +0200 Subject: [PATCH 2/8] Allow setting default query fetch size Exposes in CDCConsumer already present option in CQLConfiguration Builder for setting fetch size. --- .../src/main/java/com/scylladb/cdc/lib/CDCConsumer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/CDCConsumer.java b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/CDCConsumer.java index 90c81e49..350e0745 100644 --- a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/CDCConsumer.java +++ b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/CDCConsumer.java @@ -204,6 +204,11 @@ public Builder withCredentials(String user, String password) { return this; } + public Builder withQueryOptionsFetchSize(int fetchSize) { + cqlConfigurationBuilder.withQueryOptionsFetchSize(fetchSize); + return this; + } + public Builder withSleepBeforeFirstGenerationMs(long sleepBeforeFirstGenerationMs) { masterConfigurationBuilder.withSleepBeforeFirstGenerationMs(sleepBeforeFirstGenerationMs); return this; From 8c81cf4bbd1980442b2d1d2b2127c236e5c06ca9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Thu, 23 Oct 2025 21:54:34 +0200 Subject: [PATCH 3/8] Create test-jar of scylla-cdc-driver3 module This will be needed for scylla-cdc-lib to have access to mock test class from driver3 module. --- scylla-cdc-driver3/pom.xml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/scylla-cdc-driver3/pom.xml b/scylla-cdc-driver3/pom.xml index b74984b5..6d9d94c4 100644 --- a/scylla-cdc-driver3/pom.xml +++ b/scylla-cdc-driver3/pom.xml @@ -72,6 +72,19 @@ + + org.apache.maven.plugins + maven-jar-plugin + 3.4.2 + + + + test-jar + + + + + org.apache.maven.plugins maven-shade-plugin From d4f5d88aa9be5bcc83349c5ed483469caecfd17a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Thu, 23 Oct 2025 21:55:37 +0200 Subject: [PATCH 4/8] Add driver3 test-jar dependency to scylla-cdc-lib Necessary to access mock test classes. --- scylla-cdc-lib/pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/scylla-cdc-lib/pom.xml b/scylla-cdc-lib/pom.xml index 13402022..c1b97c79 100644 --- a/scylla-cdc-lib/pom.xml +++ b/scylla-cdc-lib/pom.xml @@ -26,6 +26,13 @@ scylla-cdc-driver3 ${project.version} + + com.scylladb + scylla-cdc-driver3 + ${project.version} + test-jar + test + From ad09197c16c196ed6d49616c6a0db875909dbffd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Thu, 23 Oct 2025 21:59:47 +0200 Subject: [PATCH 5/8] Rework alter table tests in scylla-cdc-lib Adjust classifiers in Driver3WorkerCQL: Reader and worker classes were made non-final and protected if previously private. This is necessary in order to implement required mocks without use of reflection. Methods `findNext` method, `ResultSet rs`, `lastChangeId` within Driver3Reader are now protected instead of private too. Add MockDriver3WorkerCQL: Adds the mock version of Driver3WorkerCQL. This mock is created with singular purpose of using it in tests where we alter the table while consumer is working. The mock adds ReentrantReadWriteLocks that allow to block the readers in specific moments. In the case of planned tests we mainly want to block before fetching the next page. Add abstract `AlterTableBase`: Adds abstract class which outlines the test pattern, where we alter the table while the CDCConsumer is running. It allows for overriding what should be the table, keyspace, initial DDL, background task for data generation, what kind of alteration should be used, the correct RawChange schema before the alteration and after, the RawChangeConsumer used. Add several alter table tests: Adds `AlterAddColIT` (adds column before all Tasks fetch next page; also has add column before reading next row variant), adds `AlterUpdateUdtIT` (adds new field to UDT before next page), adds `AlterDropColIT` (drops column before next page), adds `AlterReAddColIT` (drops then adds the same column with different type). All tests extend `AlterTableBase` and use the same template, but different setup. Adds minor fixes to AlterTableIT: This is an old test that was left mostly as is. The minor adjustments are using logger instead of printStackTrace and gracefully stopping the consumer. --- .../cdc/cql/driver3/Driver3WorkerCQL.java | 29 +- .../cdc/cql/driver3/MockDriver3WorkerCQL.java | 89 +++++++ .../com/scylladb/cdc/lib/CDCConsumer.java | 16 +- .../com/scylladb/cdc/lib/AlterAddColIT.java | 112 ++++++++ .../com/scylladb/cdc/lib/AlterDropColIT.java | 105 ++++++++ .../com/scylladb/cdc/lib/AlterReAddColIT.java | 111 ++++++++ .../com/scylladb/cdc/lib/AlterTableBase.java | 247 ++++++++++++++++++ .../com/scylladb/cdc/lib/AlterTableIT.java | 5 +- .../scylladb/cdc/lib/AlterUpdateUdtIT.java | 144 ++++++++++ 9 files changed, 843 insertions(+), 15 deletions(-) create mode 100644 scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/MockDriver3WorkerCQL.java create mode 100644 scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java create mode 100644 scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java create mode 100644 scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java create mode 100644 scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java create mode 100644 scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java diff --git a/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3WorkerCQL.java b/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3WorkerCQL.java index 0ed5fb35..bd7a8316 100644 --- a/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3WorkerCQL.java +++ b/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3WorkerCQL.java @@ -37,7 +37,7 @@ import com.scylladb.cdc.model.worker.RawChange; import com.scylladb.cdc.model.worker.Task; -public final class Driver3WorkerCQL implements WorkerCQL { +public class Driver3WorkerCQL implements WorkerCQL { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private final Session session; @@ -95,11 +95,11 @@ public void prepare(Set tables) throws InterruptedException, Executio } } - private final class Driver3Reader implements Reader { + protected class Driver3Reader implements Reader { - private volatile ResultSet rs; + protected volatile ResultSet rs; private volatile ChangeSchema schema; - private final Optional lastChangeId; + protected final Optional lastChangeId; private volatile boolean shouldTryRecreateSchema = true; private volatile int lastPageColDefsHashcode = 0; @@ -108,7 +108,7 @@ public Driver3Reader(ResultSet rs, Optional lastChangeId) { this.lastChangeId = Preconditions.checkNotNull(lastChangeId); } - private void findNext(CompletableFuture> fut) { + protected void findNext(CompletableFuture> fut) { if (rs.getAvailableWithoutFetching() == 0) { if (rs.isFullyFetched()) { fut.complete(Optional.empty()); @@ -183,10 +183,10 @@ public CompletableFuture> nextChange() { } - private final class Driver3MultiReader implements Reader { + protected class Driver3MultiReader implements Reader { - private volatile List readers; - private AtomicInteger currentReaderIndex; + protected volatile List readers; + protected AtomicInteger currentReaderIndex; public Driver3MultiReader(List rss, Optional lastChangeId) { this.readers = rss.stream().map(rs -> new Driver3Reader(rs, lastChangeId)).collect(Collectors.toList()); @@ -223,8 +223,7 @@ public CompletableFuture> nextChange() { } - private CompletableFuture query(PreparedStatement stmt, Task task) { - CompletableFuture result = new CompletableFuture<>(); + protected List queryTables(PreparedStatement stmt, Task task) { List futures = task.streams.stream().map(StreamId::getValue) .map(streamId -> session.executeAsync( @@ -235,9 +234,12 @@ private CompletableFuture query(PreparedStatement stmt, Task task) { ) ).collect(Collectors.toList()); logger.atFine().log("Querying window: [%s, %s] for task: %s, task state: %s", task.state.getWindowStart(), task.state.getWindowEnd(), task.id, task.state); + return futures; + } + protected CompletableFuture wrapResults(Task task, List futures) { + CompletableFuture result = new CompletableFuture<>(); Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { - @Override public void onSuccess(List rss) { result.complete(new Driver3MultiReader(rss, task.state.getLastConsumedChangeId())); @@ -251,6 +253,11 @@ public void onFailure(Throwable t) { return result; } + protected CompletableFuture query(PreparedStatement stmt, Task task) { + List futures = queryTables(stmt, task); + return wrapResults(task, futures); + } + @Override public CompletableFuture createReader(Task task) { PreparedStatement stmt = preparedStmts.get(task.id.getTable()); diff --git a/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/MockDriver3WorkerCQL.java b/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/MockDriver3WorkerCQL.java new file mode 100644 index 00000000..71d1cf07 --- /dev/null +++ b/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/MockDriver3WorkerCQL.java @@ -0,0 +1,89 @@ +package com.scylladb.cdc.cql.driver3; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.scylladb.cdc.model.worker.ChangeId; +import com.scylladb.cdc.model.worker.RawChange; +import com.scylladb.cdc.model.worker.Task; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +// Created for alterTableBeforeNextPage test method +public class MockDriver3WorkerCQL extends Driver3WorkerCQL { + public final ReentrantReadWriteLock nextPageLock; + public final ReentrantReadWriteLock nextRowLock; + + public MockDriver3WorkerCQL(Driver3Session driver3Session) { + this(driver3Session, new ReentrantReadWriteLock(), new ReentrantReadWriteLock()); + } + + public MockDriver3WorkerCQL(Driver3Session driver3Session, ReentrantReadWriteLock nextPageLock, ReentrantReadWriteLock nextRowLock) { + super(driver3Session); + this.nextPageLock = nextPageLock; + this.nextRowLock = nextRowLock; + } + + public class MockDriver3Reader extends Driver3WorkerCQL.Driver3Reader { + // Write lock will be used for blocking from outside. + // Read lock will be used inside the worker. + public final ReadWriteLock nextPageLock; + public final ReadWriteLock nextRowLock; + + public MockDriver3Reader(ResultSet rs, Optional lastChangeId, ReadWriteLock nextPageLock, ReadWriteLock nextRowLock) { + super(rs, lastChangeId); + this.nextPageLock = nextPageLock; + this.nextRowLock = nextRowLock; + } + + @Override + protected void findNext(CompletableFuture> fut) { + if (rs.getAvailableWithoutFetching() == 0 && !rs.isFullyFetched()) { + nextPageLock.readLock().lock(); + try { + super.findNext(fut); + } finally { + nextPageLock.readLock().unlock(); + } + } else { + nextRowLock.readLock().lock(); + try { + super.findNext(fut); + } finally { + nextRowLock.readLock().unlock(); + } + } + } + } + + public class MockDriver3MultiReader extends Driver3WorkerCQL.Driver3MultiReader { + public MockDriver3MultiReader(List rss, Optional lastChangeId) { + super(rss, lastChangeId); + this.readers = rss.stream().map(rs -> new MockDriver3Reader(rs, lastChangeId, nextPageLock, nextRowLock)).collect(Collectors.toList()); + } + } + + @Override + protected CompletableFuture wrapResults(Task task, List futures) { + CompletableFuture result = new CompletableFuture<>(); + Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { + @Override + public void onSuccess(List rss) { + result.complete(new MockDriver3MultiReader(rss, task.state.getLastConsumedChangeId())); + } + + @Override + public void onFailure(Throwable t) { + result.completeExceptionally(t); + } + }, MoreExecutors.directExecutor()); + return result; + } +} diff --git a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/CDCConsumer.java b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/CDCConsumer.java index 350e0745..39ce7104 100644 --- a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/CDCConsumer.java +++ b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/CDCConsumer.java @@ -6,11 +6,13 @@ import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.function.Function; import java.util.function.Supplier; import com.google.common.base.Preconditions; import com.scylladb.cdc.cql.CQLConfiguration; import com.scylladb.cdc.cql.MasterCQL; +import com.scylladb.cdc.cql.WorkerCQL; import com.scylladb.cdc.cql.driver3.Driver3MasterCQL; import com.scylladb.cdc.cql.driver3.Driver3Session; import com.scylladb.cdc.cql.driver3.Driver3WorkerCQL; @@ -31,15 +33,17 @@ public final class CDCConsumer implements AutoCloseable { private MasterThread master; private CDCConsumer(CQLConfiguration cqlConfiguration, MasterConfiguration.Builder masterConfigurationBuilder, - WorkerConfiguration.Builder workerConfigurationBuilder, Supplier executorServiceSupplier) { + WorkerConfiguration.Builder workerConfigurationBuilder, Supplier executorServiceSupplier, + Function workerCQLProvider) { Preconditions.checkNotNull(cqlConfiguration); Preconditions.checkNotNull(masterConfigurationBuilder); Preconditions.checkNotNull(workerConfigurationBuilder); + Preconditions.checkNotNull(workerCQLProvider); this.cdcThreadGroup = new ThreadGroup("Scylla-CDC-Threads"); this.session = new Driver3Session(cqlConfiguration); - workerConfigurationBuilder.withCQL(new Driver3WorkerCQL(session)); + workerConfigurationBuilder.withCQL(workerCQLProvider.apply(session)); this.transport = new LocalTransport(cdcThreadGroup, workerConfigurationBuilder, executorServiceSupplier); MasterCQL masterCQL = new Driver3MasterCQL(session); @@ -101,6 +105,7 @@ public static class Builder { private int workersCount = getDefaultWorkersCount(); private Supplier executorServiceSupplier = getDefaultExecutorServiceSupplier(workersCount); + private Function workerCQLProvider = Driver3WorkerCQL::new; @SuppressWarnings("deprecation") public Builder withConsumerProvider(RawChangeConsumerProvider consumerProvider) { @@ -252,9 +257,14 @@ public Builder withClock(Clock clock) { return this; } + public Builder withWorkerCQLProvider(Function workerCQLProvider) { + this.workerCQLProvider = Preconditions.checkNotNull(workerCQLProvider); + return this; + } + public CDCConsumer build() { return new CDCConsumer(cqlConfigurationBuilder.build(), - masterConfigurationBuilder, workerConfigurationBuilder, executorServiceSupplier); + masterConfigurationBuilder, workerConfigurationBuilder, executorServiceSupplier, workerCQLProvider); } private static int getDefaultWorkersCount() { diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java new file mode 100644 index 00000000..0f92221a --- /dev/null +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java @@ -0,0 +1,112 @@ +package com.scylladb.cdc.lib; + +import com.datastax.driver.core.PreparedStatement; +import com.google.common.util.concurrent.Uninterruptibles; +import com.scylladb.cdc.model.worker.RawChange; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.opentest4j.AssertionFailedError; + +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class AlterAddColIT extends AlterTableBase { + @Override + public String testKeyspace() { + return "AlterAddColIT".toLowerCase(); + } + + protected String testTableName; + @Override + public String testTable() { + return testTableName; + } + + public void setTestTableName(String name) { + this.testTableName = name; + } + + @Override + public String createTableQuery() { + return String.format( + "CREATE TABLE %s.%s (column1 int, column2 int, column3 int, PRIMARY" + + " KEY (column1, column2)) WITH cdc = {'enabled': 'true'};", + testKeyspace(), testTable()); + } + + @Override + public void applyAlteration() { + getDriverSession().execute(String.format("ALTER TABLE %s.%s " + "ADD column4 int;", testKeyspace(), testTable())); + } + + + @Override + public void verifyRawChangeBeforeAlter(RawChange change) { + assertEquals("cdc$stream_id", change.getSchema().getColumnDefinition(0).getColumnName()); + assertEquals("cdc$time", change.getSchema().getColumnDefinition(1).getColumnName()); + assertEquals("cdc$batch_seq_no", change.getSchema().getColumnDefinition(2).getColumnName()); + assertEquals( + "cdc$deleted_column3", change.getSchema().getColumnDefinition(3).getColumnName()); + assertEquals("cdc$end_of_batch", change.getSchema().getColumnDefinition(4).getColumnName()); + assertEquals("cdc$operation", change.getSchema().getColumnDefinition(5).getColumnName()); + assertEquals("cdc$ttl", change.getSchema().getColumnDefinition(6).getColumnName()); + assertEquals("column1", change.getSchema().getColumnDefinition(7).getColumnName()); + assertEquals("column2", change.getSchema().getColumnDefinition(8).getColumnName()); + assertEquals("column3", change.getSchema().getColumnDefinition(9).getColumnName()); + } + + @Override + public void verifyRawChangeAfterAlter(RawChange change) { + assertEquals("cdc$stream_id", change.getSchema().getColumnDefinition(0).getColumnName()); + assertEquals("cdc$time", change.getSchema().getColumnDefinition(1).getColumnName()); + assertEquals("cdc$batch_seq_no", change.getSchema().getColumnDefinition(2).getColumnName()); + assertEquals( + "cdc$deleted_column3", change.getSchema().getColumnDefinition(3).getColumnName()); + assertEquals( + "cdc$deleted_column4", change.getSchema().getColumnDefinition(4).getColumnName()); + assertEquals("cdc$end_of_batch", change.getSchema().getColumnDefinition(5).getColumnName()); + assertEquals("cdc$operation", change.getSchema().getColumnDefinition(6).getColumnName()); + assertEquals("cdc$ttl", change.getSchema().getColumnDefinition(7).getColumnName()); + assertEquals("column1", change.getSchema().getColumnDefinition(8).getColumnName()); + assertEquals("column2", change.getSchema().getColumnDefinition(9).getColumnName()); + assertEquals("column3", change.getSchema().getColumnDefinition(10).getColumnName()); + assertEquals("column4", change.getSchema().getColumnDefinition(11).getColumnName()); + } + + @Override + public Runnable createDatagenTask() { + return () -> { + PreparedStatement ps = + getDriverSession().prepare( + String.format( + "INSERT INTO %s.%s (column1, column2, column3) VALUES (?, ?, ?);", + testKeyspace(), testTable())); + while (!datagenShouldStop.get()) { + int current = datagenCounter.incrementAndGet(); + getDriverSession().execute(ps.bind(1, 1, current)); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + }; + } + + @Test + public void alterBeforeNextPageTest(TestInfo testInfo) { + setTestTableName(testInfo.getTestMethod().get().getName().toLowerCase()); + super.alterBeforeNextPageTestPattern(); + } + + @Test + public void alterBeforeNextRowTest(TestInfo testInfo) { + setTestTableName(testInfo.getTestMethod().get().getName().toLowerCase()); + try { + super.alterBeforeNextRowTestPattern(); + } catch (AssertionFailedError e) { + // Ignoring. It is bound to appear, because the alter cannot modify already fetched rows. + // CDCConsumer has no knowledge about from which page the RawChange comes. + // This test only checks that the setup does not break on anything else. + } + } +} diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java new file mode 100644 index 00000000..df11096f --- /dev/null +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java @@ -0,0 +1,105 @@ +package com.scylladb.cdc.lib; + +import com.datastax.driver.core.PreparedStatement; +import com.google.common.util.concurrent.Uninterruptibles; +import com.scylladb.cdc.model.worker.RawChange; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class AlterDropColIT extends AlterTableBase { + @Override + public String testKeyspace() { + return "AlterDropColIT".toLowerCase(); + } + + @Override + public String testTable() { + return "test"; + } + + @Override + public String createTableQuery() { + return String.format( + "CREATE TABLE %s.%s (column1 int, column2 int, column3 int, column4 int, PRIMARY" + + " KEY (column1, column2)) WITH cdc = {'enabled': 'true'};", + testKeyspace(), testTable()); + } + + @Override + public void applyAlteration() { + getDriverSession().execute(String.format("ALTER TABLE %s.%s " + "DROP column4;", testKeyspace(), testTable())); + } + + + @Override + public void verifyRawChangeBeforeAlter(RawChange change) { + assertEquals("cdc$stream_id", change.getSchema().getColumnDefinition(0).getColumnName()); + assertEquals("cdc$time", change.getSchema().getColumnDefinition(1).getColumnName()); + assertEquals("cdc$batch_seq_no", change.getSchema().getColumnDefinition(2).getColumnName()); + assertEquals( + "cdc$deleted_column3", change.getSchema().getColumnDefinition(3).getColumnName()); + assertEquals( + "cdc$deleted_column4", change.getSchema().getColumnDefinition(4).getColumnName()); + assertEquals("cdc$end_of_batch", change.getSchema().getColumnDefinition(5).getColumnName()); + assertEquals("cdc$operation", change.getSchema().getColumnDefinition(6).getColumnName()); + assertEquals("cdc$ttl", change.getSchema().getColumnDefinition(7).getColumnName()); + assertEquals("column1", change.getSchema().getColumnDefinition(8).getColumnName()); + assertEquals("column2", change.getSchema().getColumnDefinition(9).getColumnName()); + assertEquals("column3", change.getSchema().getColumnDefinition(10).getColumnName()); + assertEquals("column4", change.getSchema().getColumnDefinition(11).getColumnName()); + } + + @Override + public void verifyRawChangeAfterAlter(RawChange change) { + assertEquals("cdc$stream_id", change.getSchema().getColumnDefinition(0).getColumnName()); + assertEquals("cdc$time", change.getSchema().getColumnDefinition(1).getColumnName()); + assertEquals("cdc$batch_seq_no", change.getSchema().getColumnDefinition(2).getColumnName()); + assertEquals( + "cdc$deleted_column3", change.getSchema().getColumnDefinition(3).getColumnName()); + assertEquals("cdc$end_of_batch", change.getSchema().getColumnDefinition(4).getColumnName()); + assertEquals("cdc$operation", change.getSchema().getColumnDefinition(5).getColumnName()); + assertEquals("cdc$ttl", change.getSchema().getColumnDefinition(6).getColumnName()); + assertEquals("column1", change.getSchema().getColumnDefinition(7).getColumnName()); + assertEquals("column2", change.getSchema().getColumnDefinition(8).getColumnName()); + assertEquals("column3", change.getSchema().getColumnDefinition(9).getColumnName()); + } + + @Override + public Runnable createDatagenTask() { + return () -> { + PreparedStatement psBeforeAlter = null; + PreparedStatement psAfterAlter = null; + while (!datagenShouldStop.get()) { + int current = datagenCounter.incrementAndGet(); + if (!isAfterAlter.get()) { + if (psBeforeAlter == null) { + psBeforeAlter = getDriverSession().prepare( + String.format( + "INSERT INTO %s.%s (column1, column2, column3, column4) VALUES (?, ?, ?, ?);", + testKeyspace(), testTable())); + } + getDriverSession().execute(psBeforeAlter.bind(1, 1, current, current)); + } else { + if (psAfterAlter == null) { + psAfterAlter = getDriverSession().prepare( + String.format( + "INSERT INTO %s.%s (column1, column2, column3) VALUES (?, ?, ?);", + testKeyspace(), testTable())); + } + getDriverSession().execute(psAfterAlter.bind(1, 1, current)); + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + }; + } + + @Test + public void alterBeforeNextPageTestPattern() { + super.alterBeforeNextPageTestPattern(); + } +} diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java new file mode 100644 index 00000000..50a4765f --- /dev/null +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java @@ -0,0 +1,111 @@ +package com.scylladb.cdc.lib; + +import com.datastax.driver.core.PreparedStatement; +import com.google.common.util.concurrent.Uninterruptibles; +import com.scylladb.cdc.model.worker.ChangeSchema; +import com.scylladb.cdc.model.worker.RawChange; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +// DROP followed by ADD with different type +@Tag("integration") +public class AlterReAddColIT extends AlterTableBase { + @Override + public String testKeyspace() { + return "AlterReAddColIT".toLowerCase(); + } + + @Override + public String testTable() { + return "test"; + } + + @Override + public String createTableQuery() { + return String.format( + "CREATE TABLE %s.%s (column1 int, column2 int, column3 int, column4 int, PRIMARY" + + " KEY (column1, column2)) WITH cdc = {'enabled': 'true'};", + testKeyspace(), testTable()); + } + + @Override + public void applyAlteration() { + getDriverSession().execute(String.format("ALTER TABLE %s.%s " + "DROP column4;", testKeyspace(), testTable())); + getDriverSession().execute(String.format("ALTER TABLE %s.%s " + "ADD column4 text;", testKeyspace(), testTable())); + } + + + @Override + public void verifyRawChangeBeforeAlter(RawChange change) { + assertEquals("cdc$stream_id", change.getSchema().getColumnDefinition(0).getColumnName()); + assertEquals("cdc$time", change.getSchema().getColumnDefinition(1).getColumnName()); + assertEquals("cdc$batch_seq_no", change.getSchema().getColumnDefinition(2).getColumnName()); + assertEquals( + "cdc$deleted_column3", change.getSchema().getColumnDefinition(3).getColumnName()); + assertEquals( + "cdc$deleted_column4", change.getSchema().getColumnDefinition(4).getColumnName()); + assertEquals("cdc$end_of_batch", change.getSchema().getColumnDefinition(5).getColumnName()); + assertEquals("cdc$operation", change.getSchema().getColumnDefinition(6).getColumnName()); + assertEquals("cdc$ttl", change.getSchema().getColumnDefinition(7).getColumnName()); + assertEquals("column1", change.getSchema().getColumnDefinition(8).getColumnName()); + assertEquals("column2", change.getSchema().getColumnDefinition(9).getColumnName()); + assertEquals("column3", change.getSchema().getColumnDefinition(10).getColumnName()); + assertEquals("column4", change.getSchema().getColumnDefinition(11).getColumnName()); + assertEquals(ChangeSchema.CqlType.INT, change.getSchema().getColumnDefinition(11).getBaseTableDataType().getCqlType()); + } + + @Override + public void verifyRawChangeAfterAlter(RawChange change) { + assertEquals("cdc$stream_id", change.getSchema().getColumnDefinition(0).getColumnName()); + assertEquals("cdc$time", change.getSchema().getColumnDefinition(1).getColumnName()); + assertEquals("cdc$batch_seq_no", change.getSchema().getColumnDefinition(2).getColumnName()); + assertEquals( + "cdc$deleted_column3", change.getSchema().getColumnDefinition(3).getColumnName()); + assertEquals( + "cdc$deleted_column4", change.getSchema().getColumnDefinition(4).getColumnName()); + assertEquals("cdc$end_of_batch", change.getSchema().getColumnDefinition(5).getColumnName()); + assertEquals("cdc$operation", change.getSchema().getColumnDefinition(6).getColumnName()); + assertEquals("cdc$ttl", change.getSchema().getColumnDefinition(7).getColumnName()); + assertEquals("column1", change.getSchema().getColumnDefinition(8).getColumnName()); + assertEquals("column2", change.getSchema().getColumnDefinition(9).getColumnName()); + assertEquals("column3", change.getSchema().getColumnDefinition(10).getColumnName()); + assertEquals("column4", change.getSchema().getColumnDefinition(11).getColumnName()); + assertEquals(ChangeSchema.CqlType.VARCHAR, change.getSchema().getColumnDefinition(11).getBaseTableDataType().getCqlType()); + } + + @Override + public Runnable createDatagenTask() { + return () -> { + PreparedStatement ps = getDriverSession().prepare( + String.format( + "INSERT INTO %s.%s (column1, column2, column3, column4) VALUES (?, ?, ?, ?);", + testKeyspace(), testTable())); + boolean reprepareOnce = true; + while (!datagenShouldStop.get()) { + int current = datagenCounter.incrementAndGet(); + if (!isAfterAlter.get()) { + getDriverSession().execute(ps.bind(1, 1, current, current)); + } else { + if (reprepareOnce) { + reprepareOnce = false; + ps = getDriverSession().prepare( + String.format( + "INSERT INTO %s.%s (column1, column2, column3, column4) VALUES (?, ?, ?, ?) ;", + testKeyspace(), testTable())); + } + getDriverSession().execute(ps.bind(1, 1, current, Integer.toString(current))); + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + }; + } + + @Test + public void alterBeforeNextPageTestPattern() { + super.alterBeforeNextPageTestPattern(); + } +} diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java new file mode 100644 index 00000000..7852f6e2 --- /dev/null +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java @@ -0,0 +1,247 @@ +package com.scylladb.cdc.lib; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.google.common.base.Preconditions; +import com.google.common.flogger.FluentLogger; +import com.google.common.util.concurrent.Uninterruptibles; +import com.scylladb.cdc.cql.driver3.MockDriver3WorkerCQL; +import com.scylladb.cdc.model.TableName; +import com.scylladb.cdc.model.worker.ChangeSchema; +import com.scylladb.cdc.model.worker.RawChange; +import com.scylladb.cdc.model.worker.RawChangeConsumer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Level; + +public abstract class AlterTableBase { + protected static final FluentLogger log = FluentLogger.forEnclosingClass(); + protected Properties systemProperties = System.getProperties(); + protected String hostname = Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.hostname")); + protected int port = Integer.parseInt(systemProperties.getProperty("scylla.docker.port")); + protected String scyllaVersion = Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.version")); + + private static Session driverSession; + private static Cluster cluster; + + public abstract String testKeyspace(); + + public abstract String testTable(); + + public void wipeKeyspace() { + Session session = getDriverSession(); + String dropKeyspaceQuery = String.format("DROP KEYSPACE IF EXISTS %s;", testKeyspace()); + session.execute(dropKeyspaceQuery); + } + + public String createKeyspaceQuery() { + return String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', " + "'replication_factor" + + "': 1};", testKeyspace()); + } + + public abstract String createTableQuery(); + + public abstract void applyAlteration(); + + public abstract void verifyRawChangeBeforeAlter(RawChange change); + + public abstract void verifyRawChangeAfterAlter(RawChange change); + + public abstract Runnable createDatagenTask(); + + protected AtomicBoolean isAfterAlter; + protected AtomicBoolean consumedChangeAfterAlter; + protected AtomicLong lastConsumedTime; + protected AtomicBoolean datagenShouldStop; + protected AtomicInteger datagenCounter; + protected List rawChangesBeforeAlter; + protected List rawChangesAfterAlter; + + public RawChangeConsumer createChangeConsumer() { + return change -> { + lastConsumedTime.set(System.currentTimeMillis()); + if (!isAfterAlter.get()) { + rawChangesBeforeAlter.add(change); + } else { + consumedChangeAfterAlter.set(true); + rawChangesAfterAlter.add(change); + } + return CompletableFuture.completedFuture(null); + }; + } + + protected int confidenceWindowSeconds = 15; + protected ReentrantReadWriteLock nextPageLock; + protected ReentrantReadWriteLock nextRowLock; + + public CDCConsumer.Builder defaultCDCConsumerBuilder() { + return CDCConsumer.builder() + .addContactPoint(new InetSocketAddress(hostname, port)) + .addTable(new TableName(testKeyspace(), testTable())) + .withConsumer(createChangeConsumer()) + .withQueryTimeWindowSizeMs(15 * 1000) + .withConfidenceWindowSizeMs(confidenceWindowSeconds * 1000) + .withWorkersCount(1) + .withWorkerCQLProvider((driver3Session) -> new MockDriver3WorkerCQL(driver3Session, nextPageLock, nextRowLock)); + } + + public Session getDriverSession() { + if (cluster == null || cluster.isClosed()) { + cluster = Cluster.builder().addContactPoint(hostname).withPort(port).build(); + } + if (driverSession == null || driverSession.isClosed()) { + driverSession = cluster.connect(); + } + return driverSession; + } + + protected void clearSharedVariables() { + isAfterAlter = new AtomicBoolean(false); + consumedChangeAfterAlter = new AtomicBoolean(false); + lastConsumedTime = new AtomicLong(0); + datagenShouldStop = new AtomicBoolean(false); + datagenCounter = new AtomicInteger(0); + rawChangesBeforeAlter = new ArrayList<>(); + rawChangesAfterAlter = new ArrayList<>(); + confidenceWindowSeconds = 15; + nextPageLock = new ReentrantReadWriteLock(); + nextRowLock = new ReentrantReadWriteLock(); + } + + protected void createKeyspaceAndTable() { + wipeKeyspace(); + getDriverSession().execute(createKeyspaceQuery()); + getDriverSession().execute(createTableQuery()); + } + + public void alterBeforeNextPageTestPattern() { + clearSharedVariables(); + createKeyspaceAndTable(); + Thread datagenThread = new Thread(createDatagenTask()); + datagenThread.setName("DatagenThread"); + datagenThread.start(); + + try (CDCConsumer consumer = defaultCDCConsumerBuilder().withQueryOptionsFetchSize(1).build()) { + Uninterruptibles.sleepUninterruptibly(confidenceWindowSeconds, TimeUnit.SECONDS); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> { + if (datagenCounter.get() > 15) { + return true; + } + return false; + }); + nextPageLock.writeLock().lock(); + consumer.start(); + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> nextPageLock.hasQueuedThreads() && (System.currentTimeMillis() - lastConsumedTime.get() > 10000)); + applyAlteration(); + isAfterAlter.set(true); + nextPageLock.writeLock().unlock(); + Awaitility.await().atMost(125, TimeUnit.SECONDS).until(consumedChangeAfterAlter::get); + Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); + consumer.stop(); + } catch (InterruptedException e) { + log.atInfo().withCause(e).log("Caught InterruptedException"); + } + + datagenShouldStop.set(true); + try { + datagenThread.join(5000); + } catch (InterruptedException e) { + throw new RuntimeException("Failed to join the datagen thread", e); + } + + verifyAllRawChanges(); + } + + public void alterBeforeNextRowTestPattern() { + clearSharedVariables(); + createKeyspaceAndTable(); + Thread datagenThread = new Thread(createDatagenTask()); + datagenThread.start(); + + try (CDCConsumer consumer = defaultCDCConsumerBuilder().build()) { + Uninterruptibles.sleepUninterruptibly(confidenceWindowSeconds, TimeUnit.SECONDS); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> { + if (datagenCounter.get() > 15) { + return true; + } + return false; + }); + consumer.start(); + // Let consumer reach the first RawChange. Otherwise, everything will lock at the very start. + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> (System.currentTimeMillis() - lastConsumedTime.get() < 10000)); + nextRowLock.writeLock().lock(); + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> nextRowLock.hasQueuedThreads() && (System.currentTimeMillis() - lastConsumedTime.get() > 10000)); + applyAlteration(); + isAfterAlter.set(true); + nextRowLock.writeLock().unlock(); + Awaitility.await().atMost(125, TimeUnit.SECONDS).until(consumedChangeAfterAlter::get); + Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); + consumer.stop(); + } catch (InterruptedException e) { + log.atInfo().withCause(e).log("Caught InterruptedException"); + } + datagenShouldStop.set(true); + try { + datagenThread.join(5000); + } catch (InterruptedException e) { + throw new RuntimeException("Failed to join the datagen thread", e); + } + + verifyAllRawChanges(); + } + + protected void verifyAllRawChanges() { + for (RawChange c : rawChangesBeforeAlter) { + c.toString(); // forces Driver3RawChange to go through all ColumnDefinitions + verifyRawChangeBeforeAlter(c); + } + for (RawChange c : rawChangesAfterAlter) { + c.toString(); + verifyRawChangeAfterAlter(c); + } + } + + @AfterAll + public static void closeSession() { + if (driverSession != null && !driverSession.isClosed()) { + driverSession.close(); + } + if (cluster != null && !cluster.isClosed()) { + cluster.close(); + } + } + + protected synchronized void printDetails(RawChange change) { + List list = change.getSchema().getAllColumnDefinitions(); + System.out.println("RawChange details:"); + for (ChangeSchema.ColumnDefinition cdef : list) { + StringBuilder sb = new StringBuilder(); + sb.append("column name: ") + .append(cdef.getColumnName()) + .append(" column index: ") + .append(cdef.getIndex()) + .append(" datatype: ") + .append(cdef.getCdcLogDataType()); + if (cdef.isCdcColumn()) { + sb.append(" basedatatype: isCdcColumn"); + } else { + sb.append(" basedatatype: ").append(cdef.getBaseTableDataType()); + } + System.out.println(sb); + } + System.out.println(change.getAsObject("column1")); + System.out.flush(); + } + +} diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableIT.java index ba0c5b96..d464a77e 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableIT.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableIT.java @@ -7,6 +7,7 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; import com.google.common.base.Preconditions; +import com.google.common.flogger.FluentLogger; import com.google.common.util.concurrent.Uninterruptibles; import com.scylladb.cdc.model.TableName; import com.scylladb.cdc.model.worker.ChangeSchema; @@ -25,6 +26,7 @@ @Tag("integration") public class AlterTableIT { + private static final FluentLogger log = FluentLogger.forEnclosingClass(); Properties systemProperties = System.getProperties(); String hostname = Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.hostname")); @@ -102,8 +104,9 @@ public void alterBaseTableAtRuntime() { Thread.sleep(35 * 1000); session.execute(String.format("ALTER TABLE %s.%s " + "ADD column4 int;", keyspace, table)); Thread.sleep(20 * 1000); + consumer.stop(); } catch (InterruptedException e) { - e.printStackTrace(); + log.atInfo().withCause(e).log("Caught InterruptedException"); } taskShouldStop.set(true); try { diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java new file mode 100644 index 00000000..a8dcf525 --- /dev/null +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java @@ -0,0 +1,144 @@ +package com.scylladb.cdc.lib; + +import com.datastax.driver.core.PreparedStatement; +import com.google.common.util.concurrent.Uninterruptibles; +import com.scylladb.cdc.model.worker.ChangeSchema; +import com.scylladb.cdc.model.worker.RawChange; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class AlterUpdateUdtIT extends AlterTableBase { + + @Override + public String testKeyspace() { + return "AlterUpdateUdtIT".toLowerCase(); + } + + @Override + public String testTable() { + return "test"; + } + + @Override + protected void createKeyspaceAndTable() { + wipeKeyspace(); + getDriverSession().execute(createKeyspaceQuery()); + getDriverSession().execute(createUdtQuery()); + getDriverSession().execute(createTableQuery()); + } + + private String createUdtQuery() { + return String.format( + "CREATE TYPE IF NOT EXISTS %s.simple_udt (a int, b text);", + testKeyspace()); + } + + @Override + public String createTableQuery() { + return String.format( + "CREATE TABLE %s.%s (column1 int, column2 int, column3 simple_udt, PRIMARY KEY (column1, column2)) WITH cdc = {'enabled': 'true'};", + testKeyspace(), testTable()); + } + + @Override + public void applyAlteration() { + getDriverSession().execute(String.format( + "ALTER TYPE %s.simple_udt ADD c int;", + testKeyspace())); + } + + + @Override + public void verifyRawChangeBeforeAlter(RawChange change) { + assertEquals("cdc$stream_id", change.getSchema().getColumnDefinition(0).getColumnName()); + assertEquals("cdc$time", change.getSchema().getColumnDefinition(1).getColumnName()); + assertEquals("cdc$batch_seq_no", change.getSchema().getColumnDefinition(2).getColumnName()); + assertEquals("cdc$deleted_column3", change.getSchema().getColumnDefinition(3).getColumnName()); + assertEquals("cdc$deleted_elements_column3", change.getSchema().getColumnDefinition(4).getColumnName()); + assertEquals("cdc$end_of_batch", change.getSchema().getColumnDefinition(5).getColumnName()); + assertEquals("cdc$operation", change.getSchema().getColumnDefinition(6).getColumnName()); + assertEquals("cdc$ttl", change.getSchema().getColumnDefinition(7).getColumnName()); + assertEquals("column1", change.getSchema().getColumnDefinition(8).getColumnName()); + assertEquals("column2", change.getSchema().getColumnDefinition(9).getColumnName()); + assertEquals("column3", change.getSchema().getColumnDefinition(10).getColumnName()); + assertEquals(ChangeSchema.CqlType.UDT, change.getSchema().getColumnDefinition(10).getBaseTableDataType().getCqlType()); + assertEquals("UDT(alterupdateudtit.simple_udt){a INT, b VARCHAR}", change.getSchema().getColumnDefinition(10).getBaseTableDataType().toString()); + assertEquals( + "FROZEN" + , change.getSchema().getColumnDefinition(10).getCdcLogDataType().toString() + ); + } + + @Override + public void verifyRawChangeAfterAlter(RawChange change) { + + assertEquals("cdc$stream_id", change.getSchema().getColumnDefinition(0).getColumnName()); + assertEquals("cdc$time", change.getSchema().getColumnDefinition(1).getColumnName()); + assertEquals("cdc$batch_seq_no", change.getSchema().getColumnDefinition(2).getColumnName()); + assertEquals("cdc$deleted_column3", change.getSchema().getColumnDefinition(3).getColumnName()); + assertEquals("cdc$deleted_elements_column3", change.getSchema().getColumnDefinition(4).getColumnName()); + assertEquals("cdc$end_of_batch", change.getSchema().getColumnDefinition(5).getColumnName()); + assertEquals("cdc$operation", change.getSchema().getColumnDefinition(6).getColumnName()); + assertEquals("cdc$ttl", change.getSchema().getColumnDefinition(7).getColumnName()); + assertEquals("column1", change.getSchema().getColumnDefinition(8).getColumnName()); + assertEquals("column2", change.getSchema().getColumnDefinition(9).getColumnName()); + assertEquals("column3", change.getSchema().getColumnDefinition(10).getColumnName()); + assertEquals(ChangeSchema.CqlType.UDT, change.getSchema().getColumnDefinition(10).getBaseTableDataType().getCqlType()); + assertEquals("UDT(alterupdateudtit.simple_udt){a INT, b VARCHAR, c INT}", change.getSchema().getColumnDefinition(10).getBaseTableDataType().toString()); + assertEquals( + "FROZEN" + , change.getSchema().getColumnDefinition(10).getCdcLogDataType().toString() + ); + } + + @Override + public Runnable createDatagenTask() { + return () -> { + // Prepare UDT type and statements for both before and after alter + com.datastax.driver.core.UserType udtTypeBefore = getDriverSession().getCluster() + .getMetadata().getKeyspace(testKeyspace()).getUserType("simple_udt"); + PreparedStatement psBefore = + getDriverSession().prepare( + String.format( + "INSERT INTO %s.%s (column1, column2, column3) VALUES (?, ?, ?);", + testKeyspace(), testTable())); + PreparedStatement psAfter = null; + while (!datagenShouldStop.get()) { + int current = datagenCounter.incrementAndGet(); + if (!isAfterAlter.get()) { + // Before alter: UDT has fields a int, b text + com.datastax.driver.core.UDTValue udtValue = udtTypeBefore.newValue() + .setInt("a", current) + .setString("b", "val" + current); + getDriverSession().execute(psBefore.bind(1, 1, udtValue)); + } else { + if (psAfter == null) { + psAfter = getDriverSession().prepare( + String.format( + "INSERT INTO %s.%s (column1, column2, column3) VALUES (?, ?, ?); ", + testKeyspace(), testTable())); + } + // After alter: refresh UDT type to get new field c + com.datastax.driver.core.UserType udtTypeAfter = getDriverSession().getCluster() + .getMetadata().getKeyspace(testKeyspace()).getUserType("simple_udt"); + com.datastax.driver.core.UDTValue udtValue = udtTypeAfter.newValue() + .setInt("a", current) + .setString("b", "val" + current) + .setInt("c", current * 10); + getDriverSession().execute(psAfter.bind(1, 1, udtValue)); + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + }; + } + + @Test + public void alterBeforeNextPageTestPattern() { + super.alterBeforeNextPageTestPattern(); + } +} From 2ba2cfeadb365bbdbe979d66eba70ec4efcfcb99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Thu, 6 Nov 2025 15:11:21 +0100 Subject: [PATCH 6/8] Rename method in AlterTableBase Applies suggestion to rename `alterBeforeNextPageTestPattern` to `alterBeforeNextPageTestBody` and `alterBeforeNextRowTestPattern` to `alterBeforeNextRowTestBody`. --- .../src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java | 4 ++-- .../src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java | 4 ++-- .../src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java | 4 ++-- .../src/test/java/com/scylladb/cdc/lib/AlterTableBase.java | 5 ++--- .../src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java | 4 ++-- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java index 0f92221a..3170f3d3 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java @@ -95,14 +95,14 @@ public Runnable createDatagenTask() { @Test public void alterBeforeNextPageTest(TestInfo testInfo) { setTestTableName(testInfo.getTestMethod().get().getName().toLowerCase()); - super.alterBeforeNextPageTestPattern(); + super.alterBeforeNextPageTestBody(); } @Test public void alterBeforeNextRowTest(TestInfo testInfo) { setTestTableName(testInfo.getTestMethod().get().getName().toLowerCase()); try { - super.alterBeforeNextRowTestPattern(); + super.alterBeforeNextRowTestBody(); } catch (AssertionFailedError e) { // Ignoring. It is bound to appear, because the alter cannot modify already fetched rows. // CDCConsumer has no knowledge about from which page the RawChange comes. diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java index df11096f..293d21c9 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java @@ -99,7 +99,7 @@ public Runnable createDatagenTask() { } @Test - public void alterBeforeNextPageTestPattern() { - super.alterBeforeNextPageTestPattern(); + public void alterBeforeNextPageTestBody() { + super.alterBeforeNextPageTestBody(); } } diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java index 50a4765f..0eb54c15 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java @@ -105,7 +105,7 @@ public Runnable createDatagenTask() { } @Test - public void alterBeforeNextPageTestPattern() { - super.alterBeforeNextPageTestPattern(); + public void alterBeforeNextPageTestBody() { + super.alterBeforeNextPageTestBody(); } } diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java index 7852f6e2..6ad03b37 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.logging.Level; public abstract class AlterTableBase { protected static final FluentLogger log = FluentLogger.forEnclosingClass(); @@ -125,7 +124,7 @@ protected void createKeyspaceAndTable() { getDriverSession().execute(createTableQuery()); } - public void alterBeforeNextPageTestPattern() { + public void alterBeforeNextPageTestBody() { clearSharedVariables(); createKeyspaceAndTable(); Thread datagenThread = new Thread(createDatagenTask()); @@ -163,7 +162,7 @@ public void alterBeforeNextPageTestPattern() { verifyAllRawChanges(); } - public void alterBeforeNextRowTestPattern() { + public void alterBeforeNextRowTestBody() { clearSharedVariables(); createKeyspaceAndTable(); Thread datagenThread = new Thread(createDatagenTask()); diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java index a8dcf525..f26d015a 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java @@ -138,7 +138,7 @@ public Runnable createDatagenTask() { } @Test - public void alterBeforeNextPageTestPattern() { - super.alterBeforeNextPageTestPattern(); + public void alterBeforeNextPageTestBody() { + super.alterBeforeNextPageTestBody(); } } From ac6eb12ebbb007911703dbab97697043685e9371 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Thu, 6 Nov 2025 17:45:29 +0100 Subject: [PATCH 7/8] Add exception handling to datagen task in AlterDropColIT --- .../test/java/com/scylladb/cdc/lib/AlterDropColIT.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java index 293d21c9..43d7d01b 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java @@ -83,7 +83,13 @@ public Runnable createDatagenTask() { "INSERT INTO %s.%s (column1, column2, column3, column4) VALUES (?, ?, ?, ?);", testKeyspace(), testTable())); } - getDriverSession().execute(psBeforeAlter.bind(1, 1, current, current)); + try { + getDriverSession().execute(psBeforeAlter.bind(1, 1, current, current)); + } catch (Exception e) { + // It is possible to send a query right when column is being dropped. + // In such case the exception here should not fail the test. + log.atInfo().withCause(e).log("Datagen task exception encountered"); + } } else { if (psAfterAlter == null) { psAfterAlter = getDriverSession().prepare( From 9f8458917a2993ce012f4d094b5702fed76fa6c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Thu, 6 Nov 2025 21:11:20 +0100 Subject: [PATCH 8/8] Set method as table name in all AlterTableBase tests --- .../test/java/com/scylladb/cdc/lib/AlterAddColIT.java | 10 ---------- .../test/java/com/scylladb/cdc/lib/AlterDropColIT.java | 9 +++------ .../java/com/scylladb/cdc/lib/AlterReAddColIT.java | 9 +++------ .../test/java/com/scylladb/cdc/lib/AlterTableBase.java | 10 +++++++++- .../java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java | 9 +++------ 5 files changed, 18 insertions(+), 29 deletions(-) diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java index 3170f3d3..b307fe1f 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterAddColIT.java @@ -19,16 +19,6 @@ public String testKeyspace() { return "AlterAddColIT".toLowerCase(); } - protected String testTableName; - @Override - public String testTable() { - return testTableName; - } - - public void setTestTableName(String name) { - this.testTableName = name; - } - @Override public String createTableQuery() { return String.format( diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java index 43d7d01b..4a16d670 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterDropColIT.java @@ -5,6 +5,7 @@ import com.scylladb.cdc.model.worker.RawChange; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import java.util.concurrent.TimeUnit; @@ -17,11 +18,6 @@ public String testKeyspace() { return "AlterDropColIT".toLowerCase(); } - @Override - public String testTable() { - return "test"; - } - @Override public String createTableQuery() { return String.format( @@ -105,7 +101,8 @@ public Runnable createDatagenTask() { } @Test - public void alterBeforeNextPageTestBody() { + public void alterBeforeNextPageTestBody(TestInfo testInfo) { + setTestTableName(testInfo.getTestMethod().get().getName().toLowerCase()); super.alterBeforeNextPageTestBody(); } } diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java index 0eb54c15..c94f28c2 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterReAddColIT.java @@ -6,6 +6,7 @@ import com.scylladb.cdc.model.worker.RawChange; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import java.util.concurrent.TimeUnit; @@ -19,11 +20,6 @@ public String testKeyspace() { return "AlterReAddColIT".toLowerCase(); } - @Override - public String testTable() { - return "test"; - } - @Override public String createTableQuery() { return String.format( @@ -105,7 +101,8 @@ public Runnable createDatagenTask() { } @Test - public void alterBeforeNextPageTestBody() { + public void alterBeforeNextPageTestBody(TestInfo testInfo) { + setTestTableName(testInfo.getTestMethod().get().getName().toLowerCase()); super.alterBeforeNextPageTestBody(); } } diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java index 6ad03b37..157800ed 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableBase.java @@ -36,7 +36,15 @@ public abstract class AlterTableBase { public abstract String testKeyspace(); - public abstract String testTable(); + protected String testTableName; + + public String testTable() { + return testTableName; + } + + public void setTestTableName(String name) { + this.testTableName = name; + } public void wipeKeyspace() { Session session = getDriverSession(); diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java index f26d015a..66f2ddec 100644 --- a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterUpdateUdtIT.java @@ -6,6 +6,7 @@ import com.scylladb.cdc.model.worker.RawChange; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import java.util.concurrent.TimeUnit; @@ -19,11 +20,6 @@ public String testKeyspace() { return "AlterUpdateUdtIT".toLowerCase(); } - @Override - public String testTable() { - return "test"; - } - @Override protected void createKeyspaceAndTable() { wipeKeyspace(); @@ -138,7 +134,8 @@ public Runnable createDatagenTask() { } @Test - public void alterBeforeNextPageTestBody() { + public void alterBeforeNextPageTestBody(TestInfo testInfo) { + setTestTableName(testInfo.getTestMethod().get().getName().toLowerCase()); super.alterBeforeNextPageTestBody(); } }