@Bouncheck Bouncheck commented Oct 22, 2025

Adds the requested test method that ALTERs the base table right before the next
page is fetched by the internal Reader of the WorkerCQL used by CDCConsumer.
This is achieved by making the Reader wait for the read lock of a
ReentrantReadWriteLock until the changes have safely propagated and the test
method releases the write lock.
Only after that is the Reader allowed to fetch the next page.
The fetch size is intentionally set to a low value of 1.
The test data is limited to a single primary key in order to limit
the number of queued concurrent tasks.
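
The gating idea described above can be sketched in isolation. This is only a minimal illustration, not the code from this PR: the class name `NextPageGateDemo`, the event strings, and the use of a `CountDownLatch` to detect that the first page was read are assumptions made for the example. The actual test waits on `hasQueuedThreads()` and a quiet period instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class NextPageGateDemo {
    // Hypothetical stand-in for the lock that the mock exposes to the test.
    static final ReentrantReadWriteLock nextPageLock = new ReentrantReadWriteLock();

    static List<String> run() throws InterruptedException {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch firstPageRead = new CountDownLatch(1);

        // The test thread takes the write lock before the reader starts.
        nextPageLock.writeLock().lock();

        Thread reader = new Thread(() -> {
            events.add("page 1");
            firstPageRead.countDown();
            // Blocks here until the test thread releases the write lock.
            nextPageLock.readLock().lock();
            try {
                events.add("page 2");
            } finally {
                nextPageLock.readLock().unlock();
            }
        });
        reader.start();

        try {
            if (!firstPageRead.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("reader never consumed the first page");
            }
            // Stand-in for executing the ALTER TABLE statement.
            events.add("ALTER TABLE");
        } finally {
            nextPageLock.writeLock().unlock();
        }
        reader.join();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints [page 1, ALTER TABLE, page 2]
    }
}
```

Because the reader can only take the read lock after the write lock is released, the schema change is always recorded between the two pages.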

A test-jar is now created to allow cdc-lib to use the mock class from the driver3 module.

Addresses #133.

@Bouncheck Bouncheck self-assigned this Oct 22, 2025
@Bouncheck Bouncheck force-pushed the mockdriver3reader branch 3 times, most recently from 72b758f to 4a05251 Compare October 23, 2025 20:14
@Bouncheck Bouncheck changed the title [WIP] IT: Alter table right before fetching next page Add IT that ALTERs right before Reader fetches next page Oct 23, 2025
@Bouncheck Bouncheck marked this pull request as ready for review October 23, 2025 20:23
@Bouncheck Bouncheck requested a review from dkropachev October 23, 2025 20:24

@dkropachev dkropachev left a comment

Let's add separate tests for Alter table on new page and on new row.

@Bouncheck Bouncheck force-pushed the mockdriver3reader branch 2 times, most recently from d33847e to 390fe1d Compare October 29, 2025 12:09

Bouncheck commented Oct 29, 2025

I've rebased on top of master, which should now have the changes that recreate the schema.
I suspect that the failure on latest is due to the Maven cache. Looking into that.

@dkropachev

> I've rebased on top of master, which should now have the changes that recreate the schema. I suspect that the failure on latest is due to the Maven cache. Looking into that.

The Maven cache is only for dependencies; it should not cache builds.

@Bouncheck

Found one bug in the reworked test yesterday, but some other bug still remains.

@dkropachev

> Found one bug in the reworked test yesterday, but some other bug still remains.

Please include the fixes in this PR, and update its name and description to reflect that.

Setting this option is required for MockDriver3WorkerCQL to work properly.
Otherwise the types won't match.

Exposes in CDCConsumer the option for setting the fetch size that is already
present in the CQLConfiguration Builder.

This will be needed for scylla-cdc-lib to have access to the mock test class
from the driver3 module.

Necessary to access mock test classes.

In order to complete the task I need to make Driver3WorkerCQL non-final.
Additionally, Driver3Reader is made non-final and its access is changed to
protected so that it can be mocked in a test.
The `findNext` method, `ResultSet rs`, and `lastChangeId` are now protected instead
of private too.

Adds the mock version of Driver3WorkerCQL.
This mock is created for the sole purpose of using it in AlterTableIT.
The mock adds ReentrantReadWriteLocks that make it possible to block the readers
at specific moments. In the case of AlterTableIT we want to block before
fetching the next page.
Reflection is used in order not to modify the actual code of Driver3MultiReader.

Adds the requested test method that ALTERs the base table right before the next
page is fetched by the internal Reader of the WorkerCQL used by CDCConsumer.
This is achieved by making the Reader wait for the read lock of a
ReentrantReadWriteLock until the changes have safely propagated and the test
method releases the write lock.
Only after that is the Reader allowed to fetch the next page.
The fetch size is intentionally set to a low value of 1.
The test data is limited to a single primary key in order to limit
the number of queued concurrent tasks.

Bouncheck commented Oct 31, 2025

The initial difference came from my local cache, but after fixing that, other bugs surfaced. All should be fixed now.
Fixed ResultSets not being overwritten in the mock. Fixed concurrency issues stemming from having too many tasks queued: after unblocking all readers, it was possible that some other task that had not yet moved to the next page would be the first to pass a message to the consumer.
The test was slightly reworked: every row is now checked, rather than only the first row and the first row after the ALTER. Sleep and wait times were adjusted.
Addressed review comments and made more Driver3Reader fields protected in order to drop more reflection code.


@Override
protected void findNext(CompletableFuture<Optional<RawChange>> fut) {
CompletableFuture.runAsync( () -> {

Collaborator

I looked closer at findNext, and I no longer think we need to run it in a separate future.
The only place where it could do something asynchronous is Futures.addCallback, but that uses MoreExecutors.directExecutor(), so it is a completely synchronous function. Therefore we can and should drop the CompletableFuture.runAsync( () -> { wrapper here.
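
The difference between a direct executor and `CompletableFuture.runAsync` can be shown without Guava. The sketch below is an assumption-laden illustration, not the PR's code: it swaps in the plain-JDK equivalent `Runnable::run` for `MoreExecutors.directExecutor()`, and the class and method names are made up. A direct executor runs the task inline on whichever thread calls `execute`, while `runAsync` hands it to another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class DirectExecutorDemo {
    // Returns true when the executor ran the task on the thread that submitted it.
    static boolean runsOnCallerThread(Executor executor) {
        Thread caller = Thread.currentThread();
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        CompletableFuture<Void> done = new CompletableFuture<>();
        executor.execute(() -> {
            ranOn.set(Thread.currentThread());
            done.complete(null);
        });
        done.join(); // wait until the task has finished, wherever it ran
        return ranOn.get() == caller;
    }

    public static void main(String[] args) {
        // Plain-JDK equivalent of a direct executor: run the task inline.
        Executor direct = Runnable::run;
        // Hands each task to another thread, as the wrapper under review does.
        Executor async = task -> CompletableFuture.runAsync(task);

        System.out.println(runsOnCallerThread(direct)); // prints true
        System.out.println(runsOnCallerThread(async));  // prints false
    }
}
```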

Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.version"));

@Test
public void alterBaseTableAtRuntime() {

Collaborator

Can you either update this test to make use of nextRowLock, or create tests that target both nextRowLock and nextPageLock?

Collaborator Author

What would be the point of the test using nextRowLock? I've added it, but I don't understand its purpose exactly. AFAIK already fetched rows will not react to changes on the cluster, so I'm not sure what there is to test.

Collaborator

to be sure that it works fine when it happens

verifySchemaBeforeAlter(firstChange.get());
verifySchemaAfterAlter(firstChangeAfterFetchingNewPage.get());

assertFalse(failedDueToInvalidTypeEx.get());

Collaborator

I don't see you ever change failedDueToInvalidTypeEx

Collaborator Author

this is a leftover, will remove that

Comment on lines +260 to +285
try {
    // Access the LocalTransport field in CDCConsumer
    java.lang.reflect.Field transportField = CDCConsumer.class.getDeclaredField("transport");
    transportField.setAccessible(true);
    LocalTransport transport = (LocalTransport) transportField.get(consumer);

    // Access the WorkerConfiguration.Builder field in LocalTransport
    java.lang.reflect.Field builderField = LocalTransport.class.getDeclaredField("workerConfigurationBuilder");
    builderField.setAccessible(true);
    WorkerConfiguration.Builder builder = (WorkerConfiguration.Builder) builderField.get(transport);

    // Access the session field in CDCConsumer
    java.lang.reflect.Field sessionField = CDCConsumer.class.getDeclaredField("session");
    sessionField.setAccessible(true);
    Driver3Session driver3Session = (Driver3Session) sessionField.get(consumer);

    // Create MockDriver3WorkerCQL and replace the cql field in the builder
    MockDriver3WorkerCQL mockWorkerCQL = new MockDriver3WorkerCQL(driver3Session);
    // Grab the lock to control it later on
    nextPageLock = mockWorkerCQL.nextPageLock;
    java.lang.reflect.Field cqlField = WorkerConfiguration.Builder.class.getDeclaredField("cql");
    cqlField.setAccessible(true);
    cqlField.set(builder, mockWorkerCQL);
} catch (Exception e) {
    throw new RuntimeException("Failed to replace Driver3WorkerCQL with MockDriver3WorkerCQL", e);
}

Collaborator

Can you please find a better way to do that, without hacking into instances?
If necessary, add additional API to the builders.

Collaborator Author

It will require changing access modifiers in production code, but I can do that.

Collaborator

Someone will probably want to override these classes anyway. Since most of them already have interfaces, it will not be a big deal.
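
One possible shape for such a builder API is sketched below. It is purely illustrative: none of these names (`BuilderInjectionSketch`, `WorkerCql`, `Session`, `withWorkerCqlFactory`) come from the library, and the real builders may need a different hook. The idea is that the builder accepts a factory for the worker, so a test can pass a mock without reflection.

```java
import java.util.Objects;
import java.util.function.Function;

class BuilderInjectionSketch {
    /** Hypothetical stand-in for the worker CQL interface. */
    interface WorkerCql {
        String describe();
    }

    /** Hypothetical stand-in for a driver session. */
    static final class Session {
        final String name;
        Session(String name) { this.name = name; }
    }

    /** Hypothetical consumer that only holds the worker it was built with. */
    static final class Consumer {
        final WorkerCql cql;
        Consumer(WorkerCql cql) { this.cql = cql; }
    }

    static final class Builder {
        private Session session;
        // Production default; tests may override it through the builder.
        private Function<Session, WorkerCql> workerCqlFactory =
                s -> () -> "default worker on " + s.name;

        Builder withSession(Session session) {
            this.session = Objects.requireNonNull(session);
            return this;
        }

        Builder withWorkerCqlFactory(Function<Session, WorkerCql> factory) {
            this.workerCqlFactory = Objects.requireNonNull(factory);
            return this;
        }

        Consumer build() {
            Objects.requireNonNull(session, "session must be set");
            return new Consumer(workerCqlFactory.apply(session));
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Builder()
                .withSession(new Session("test-session"))
                .withWorkerCqlFactory(s -> () -> "mock worker on " + s.name)
                .build();
        System.out.println(consumer.cql.describe()); // prints mock worker on test-session
    }
}
```

A factory is used rather than a ready-made instance because the mock in this PR is constructed from the session, which only exists once the builder has been configured.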

+ " KEY (column1, column2)) WITH cdc = {'enabled': 'true'};",
keyspace, table));

AtomicInteger counter = new AtomicInteger(0);

Collaborator

Suggested change
- AtomicInteger counter = new AtomicInteger(0);
+ AtomicInteger updatesCount = new AtomicInteger(0);

thread.start();

AtomicBoolean failedDueToInvalidTypeEx = new AtomicBoolean(false);
AtomicBoolean unlockedNextPageLock = new AtomicBoolean(false);

Collaborator

Suggested change
- AtomicBoolean unlockedNextPageLock = new AtomicBoolean(false);
+ AtomicBoolean alterTableIsDone = new AtomicBoolean(false);

AtomicBoolean consumedChangeAfterAlter = new AtomicBoolean(false);
AtomicLong lastConsumedTime = new AtomicLong(0);
AtomicReference<RawChange> firstChange, firstChangeAfterFetchingNewPage;
firstChange = new AtomicReference<>(null);

Collaborator

Suggested change
- firstChange = new AtomicReference<>(null);
+ changeBeforeAlter = new AtomicReference<>(null);

AtomicLong lastConsumedTime = new AtomicLong(0);
AtomicReference<RawChange> firstChange, firstChangeAfterFetchingNewPage;
firstChange = new AtomicReference<>(null);
firstChangeAfterFetchingNewPage = new AtomicReference<>(null);

Collaborator

Suggested change
- firstChangeAfterFetchingNewPage = new AtomicReference<>(null);
+ changeAfterAlter = new AtomicReference<>(null);

// and consumer has not consumed any change for the last 10 seconds
ReentrantReadWriteLock finalNextPageLock = nextPageLock;
Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> finalNextPageLock.hasQueuedThreads() && (System.currentTimeMillis() - lastConsumedTime.get() > 10000));
ResultSet rs = session.execute(String.format("ALTER TABLE %s.%s " + "ADD column4 int;", keyspace, table));

Collaborator

Can we also have tests on:

  1. Updating UDT
  2. Removing column
  3. Changing column type (by removing and adding column with the same name, but different type)

It would be great if you can generalize the test

Collaborator Author

I'll make the ALTER query a parameter or, if it turns out to be more complex, some kind of Runnable/Callable that performs the modification, and use that as the test parameter.
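
A rough sketch of what such a parameter could look like, assuming the simple case where each scenario is one or more statement templates. Everything here is hypothetical: the class `AlterScenarios`, the scenario names, the column name `column3`, its new type, and the UDT `example_udt` are placeholders, and only the `ADD column4 int` statement is taken from the test under review.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AlterScenarios {
    /** One schema change, expressed as statement templates over keyspace (%1$s) and table (%2$s). */
    static final class Scenario {
        final String name;
        final List<String> templates;

        Scenario(String name, String... templates) {
            this.name = name;
            this.templates = Arrays.asList(templates);
        }

        // Fills in the keyspace and table for every statement of this scenario.
        List<String> render(String keyspace, String table) {
            List<String> statements = new ArrayList<>();
            for (String template : templates) {
                statements.add(String.format(template, keyspace, table));
            }
            return statements;
        }
    }

    // Column and type names other than column4 are placeholders for illustration.
    static List<Scenario> all() {
        return Arrays.asList(
                new Scenario("add column",
                        "ALTER TABLE %1$s.%2$s ADD column4 int;"),
                new Scenario("drop column",
                        "ALTER TABLE %1$s.%2$s DROP column3;"),
                new Scenario("change column type",
                        "ALTER TABLE %1$s.%2$s DROP column3;",
                        "ALTER TABLE %1$s.%2$s ADD column3 text;"),
                new Scenario("update UDT",
                        "ALTER TYPE %1$s.example_udt ADD extra_field int;"));
    }

    public static void main(String[] args) {
        for (Scenario scenario : all()) {
            System.out.println(scenario.name + ": " + scenario.render("ks", "t"));
        }
    }
}
```

Each scenario could then be fed to a single parameterized test body that runs the rendered statements while the readers are blocked.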
