Skip to content

Commit 4a05251

Browse files
committed
Add AlterTableIT#alterTableBeforeNextPage
Adds requested test method that ALTERs the base table right before a next page is fetched by the internal Reader of the WorkerCQL used by CDCConsumer. This is achieved by blocking on a Semaphore until the changes safely propagate. Only after that the Reader is allowed to fetch the next page. The fetch size is intentionally set to low value of 1. Currently this test fails, because the Schema once built by the Driver3Reader does not change within its lifetime. The schema after the alter will fail to reflect that the received data contains newly added `column4`.
1 parent b5f5ec2 commit 4a05251

File tree

1 file changed

+176
-0
lines changed

1 file changed

+176
-0
lines changed

scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/AlterTableIT.java

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,35 @@
55

66
import com.datastax.driver.core.Cluster;
77
import com.datastax.driver.core.PreparedStatement;
8+
import com.datastax.driver.core.ResultSet;
89
import com.datastax.driver.core.Session;
910
import com.google.common.base.Preconditions;
11+
import com.google.common.flogger.FluentLogger;
1012
import com.google.common.util.concurrent.Uninterruptibles;
13+
import com.scylladb.cdc.cql.driver3.Driver3Session;
14+
import com.scylladb.cdc.cql.driver3.MockDriver3WorkerCQL;
1115
import com.scylladb.cdc.model.TableName;
1216
import com.scylladb.cdc.model.worker.ChangeSchema;
1317
import com.scylladb.cdc.model.worker.RawChange;
1418
import com.scylladb.cdc.model.worker.RawChangeConsumer;
19+
import com.scylladb.cdc.model.worker.WorkerConfiguration;
1520
import java.net.InetSocketAddress;
1621
import java.util.List;
1722
import java.util.Properties;
1823
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.Semaphore;
1925
import java.util.concurrent.TimeUnit;
2026
import java.util.concurrent.atomic.AtomicBoolean;
2127
import java.util.concurrent.atomic.AtomicInteger;
2228
import java.util.concurrent.atomic.AtomicReference;
29+
30+
import org.awaitility.Awaitility;
2331
import org.junit.jupiter.api.Tag;
2432
import org.junit.jupiter.api.Test;
2533

2634
@Tag("integration")
2735
public class AlterTableIT {
36+
private static final FluentLogger log = FluentLogger.forEnclosingClass();
2837
Properties systemProperties = System.getProperties();
2938
String hostname =
3039
Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.hostname"));
@@ -167,4 +176,171 @@ private synchronized void printDetails(RawChange change) {
167176
System.out.println(change.getAsObject("column1"));
168177
System.out.flush();
169178
}
179+
180+
@Test
181+
public void alterTableBeforeNextPage() {
182+
// A variation that alters the table right before the next page is fetched.
183+
String keyspace = "testks";
184+
String table = "alternextpage";
185+
Session session;
186+
int confidenceWindowSeconds = 5;
187+
try (Cluster cluster = Cluster.builder().addContactPoint(hostname).withPort(port).build()) {
188+
session = cluster.connect();
189+
session.execute(
190+
String.format(
191+
"CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', "
192+
+ "'replication_factor': 1};",
193+
keyspace));
194+
session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table));
195+
session.execute(
196+
String.format(
197+
"CREATE TABLE IF NOT EXISTS %s.%s (column1 int, column2 int, column3 int, PRIMARY"
198+
+ " KEY (column1, column2)) WITH cdc = {'enabled': 'true'};",
199+
keyspace, table));
200+
201+
AtomicInteger counter = new AtomicInteger(0);
202+
AtomicBoolean taskShouldStop = new AtomicBoolean(false);
203+
// Start continuously populating the base table in the background
204+
Runnable task =
205+
() -> {
206+
PreparedStatement ps =
207+
session.prepare(
208+
String.format(
209+
"INSERT INTO %s.%s (column1, column2, column3) VALUES (?, ?, ?);",
210+
keyspace, table));
211+
while (!taskShouldStop.get()) {
212+
int current = counter.incrementAndGet();
213+
session.execute(ps.bind(current, current, current));
214+
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
215+
}
216+
};
217+
Thread thread = new Thread(task);
218+
thread.start();
219+
220+
AtomicBoolean failedDueToInvalidTypeEx = new AtomicBoolean(false);
221+
AtomicBoolean liftedProceedGate = new AtomicBoolean(false);
222+
AtomicReference<RawChange> firstChange, firstChangeAfterFetchingNewPage;
223+
firstChange = new AtomicReference<>(null);
224+
firstChangeAfterFetchingNewPage = new AtomicReference<>(null);
225+
RawChangeConsumer changeConsumer =
226+
change -> {
227+
try {
228+
firstChange.compareAndSet(null, change);
229+
if (liftedProceedGate.get()) {
230+
firstChangeAfterFetchingNewPage.compareAndSet(null, change);
231+
}
232+
String toString =
233+
change.toString(); // forces Driver3RawChange to go through all ColumnDefinitions
234+
} catch (Exception ex) {
235+
// Meant to catch InvalidTypeException, but it's shaded as part of driver3 and not
236+
// exposed.
237+
failedDueToInvalidTypeEx.set(true);
238+
}
239+
return CompletableFuture.completedFuture(null);
240+
};
241+
242+
try (CDCConsumer consumer =
243+
CDCConsumer.builder()
244+
.addContactPoint(new InetSocketAddress(hostname, port))
245+
.addTable(new TableName(keyspace, table))
246+
.withConsumer(changeConsumer)
247+
.withQueryTimeWindowSizeMs(15 * 1000)
248+
.withConfidenceWindowSizeMs(confidenceWindowSeconds * 1000)
249+
.withWorkersCount(1)
250+
.withQueryOptionsFetchSize(1)
251+
.build()) {
252+
Semaphore proceedGate = null;
253+
try {
254+
// Access the LocalTransport field in CDCConsumer
255+
java.lang.reflect.Field transportField = CDCConsumer.class.getDeclaredField("transport");
256+
transportField.setAccessible(true);
257+
LocalTransport transport = (LocalTransport) transportField.get(consumer);
258+
259+
// Access the WorkerConfiguration.Builder field in LocalTransport
260+
java.lang.reflect.Field builderField = LocalTransport.class.getDeclaredField("workerConfigurationBuilder");
261+
builderField.setAccessible(true);
262+
WorkerConfiguration.Builder builder = (WorkerConfiguration.Builder) builderField.get(transport);
263+
264+
// Access the session field in CDCConsumer
265+
java.lang.reflect.Field sessionField = CDCConsumer.class.getDeclaredField("session");
266+
sessionField.setAccessible(true);
267+
Driver3Session driver3Session = (Driver3Session) sessionField.get(consumer);
268+
269+
// Create MockDriver3WorkerCQL and replace the cql field in the builder
270+
MockDriver3WorkerCQL mockWorkerCQL = new MockDriver3WorkerCQL(driver3Session);
271+
// Grab the Semaphore to control it later on
272+
proceedGate = mockWorkerCQL.getProceedGate();
273+
java.lang.reflect.Field cqlField = WorkerConfiguration.Builder.class.getDeclaredField("cql");
274+
cqlField.setAccessible(true);
275+
cqlField.set(builder, mockWorkerCQL);
276+
} catch (Exception e) {
277+
throw new RuntimeException("Failed to replace Driver3WorkerCQL with MockDriver3WorkerCQL", e);
278+
}
279+
// Let some records be generated first
280+
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
281+
if (counter.get() > 15) {
282+
return true;
283+
}
284+
return false;
285+
});
286+
Uninterruptibles.sleepUninterruptibly(confidenceWindowSeconds, TimeUnit.SECONDS);
287+
consumer.start();
288+
Semaphore finalProceedGate = proceedGate;
289+
// Wait until something needs to iterate to the next page
290+
Awaitility.await().atMost(20, TimeUnit.SECONDS).until(finalProceedGate::hasQueuedThreads);
291+
ResultSet rs = session.execute(String.format("ALTER TABLE %s.%s " + "ADD column4 int;", keyspace, table));
292+
Awaitility.await().atMost(15, TimeUnit.SECONDS).until(() ->
293+
{
294+
try {
295+
return session.getCluster().getMetadata().getKeyspace(keyspace).getTable(table).asCQLQuery().contains("column4");
296+
} catch (Exception e) {
297+
return false;
298+
}
299+
});
300+
liftedProceedGate.set(true);
301+
proceedGate.release(100);
302+
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
303+
} catch (InterruptedException e) {
304+
log.atInfo().withCause(e).log("Caught InterruptedException, likely from CDCConsumer");
305+
}
306+
taskShouldStop.set(true);
307+
try {
308+
thread.join();
309+
} catch (InterruptedException e) {
310+
throw new RuntimeException("Failed to join the background thread", e);
311+
}
312+
313+
// Verify the schema of first and firstNewPage RawChange
314+
RawChange sample = firstChange.get();
315+
assertEquals("cdc$stream_id", sample.getSchema().getColumnDefinition(0).getColumnName());
316+
assertEquals("cdc$time", sample.getSchema().getColumnDefinition(1).getColumnName());
317+
assertEquals("cdc$batch_seq_no", sample.getSchema().getColumnDefinition(2).getColumnName());
318+
assertEquals(
319+
"cdc$deleted_column3", sample.getSchema().getColumnDefinition(3).getColumnName());
320+
assertEquals("cdc$end_of_batch", sample.getSchema().getColumnDefinition(4).getColumnName());
321+
assertEquals("cdc$operation", sample.getSchema().getColumnDefinition(5).getColumnName());
322+
assertEquals("cdc$ttl", sample.getSchema().getColumnDefinition(6).getColumnName());
323+
assertEquals("column1", sample.getSchema().getColumnDefinition(7).getColumnName());
324+
assertEquals("column2", sample.getSchema().getColumnDefinition(8).getColumnName());
325+
assertEquals("column3", sample.getSchema().getColumnDefinition(9).getColumnName());
326+
327+
sample = firstChangeAfterFetchingNewPage.get();
328+
assertEquals("cdc$stream_id", sample.getSchema().getColumnDefinition(0).getColumnName());
329+
assertEquals("cdc$time", sample.getSchema().getColumnDefinition(1).getColumnName());
330+
assertEquals("cdc$batch_seq_no", sample.getSchema().getColumnDefinition(2).getColumnName());
331+
assertEquals(
332+
"cdc$deleted_column3", sample.getSchema().getColumnDefinition(3).getColumnName());
333+
assertEquals(
334+
"cdc$deleted_column4", sample.getSchema().getColumnDefinition(4).getColumnName());
335+
assertEquals("cdc$end_of_batch", sample.getSchema().getColumnDefinition(5).getColumnName());
336+
assertEquals("cdc$operation", sample.getSchema().getColumnDefinition(6).getColumnName());
337+
assertEquals("cdc$ttl", sample.getSchema().getColumnDefinition(7).getColumnName());
338+
assertEquals("column1", sample.getSchema().getColumnDefinition(8).getColumnName());
339+
assertEquals("column2", sample.getSchema().getColumnDefinition(9).getColumnName());
340+
assertEquals("column3", sample.getSchema().getColumnDefinition(10).getColumnName());
341+
assertEquals("column4", sample.getSchema().getColumnDefinition(11).getColumnName());
342+
343+
assertFalse(failedDueToInvalidTypeEx.get());
344+
}
345+
}
170346
}

0 commit comments

Comments
 (0)