| 
 | 1 | +package com.scylladb.cdc.lib;  | 
 | 2 | + | 
 | 3 | +import static org.junit.jupiter.api.Assertions.assertEquals;  | 
 | 4 | +import static org.junit.jupiter.api.Assumptions.abort;  | 
 | 5 | + | 
 | 6 | +import com.datastax.driver.core.Cluster;  | 
 | 7 | +import com.datastax.driver.core.PreparedStatement;  | 
 | 8 | +import com.datastax.driver.core.ResultSet;  | 
 | 9 | +import com.datastax.driver.core.Row;  | 
 | 10 | +import com.datastax.driver.core.Session;  | 
 | 11 | +import com.datastax.driver.core.exceptions.InvalidQueryException;  | 
 | 12 | +import com.google.common.base.Preconditions;  | 
 | 13 | +import com.scylladb.cdc.model.TableName;  | 
 | 14 | +import com.scylladb.cdc.model.worker.RawChange;  | 
 | 15 | +import com.scylladb.cdc.model.worker.RawChangeConsumer;  | 
 | 16 | +import java.net.InetSocketAddress;  | 
 | 17 | +import java.util.Properties;  | 
 | 18 | +import java.util.concurrent.CompletableFuture;  | 
 | 19 | +import java.util.concurrent.atomic.AtomicBoolean;  | 
 | 20 | +import java.util.concurrent.atomic.AtomicInteger;  | 
 | 21 | +import org.junit.jupiter.api.Tag;  | 
 | 22 | +import org.junit.jupiter.api.Test;  | 
 | 23 | + | 
 | 24 | +@Tag("integration")  | 
 | 25 | +public class TabletsIT {  | 
 | 26 | +  Properties systemProperties = System.getProperties();  | 
 | 27 | +  String hostname =  | 
 | 28 | +      Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.hostname"));  | 
 | 29 | +  int port = Integer.parseInt(systemProperties.getProperty("scylla.docker.port"));  | 
 | 30 | +  String scyllaVersion =  | 
 | 31 | +      Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.version"));  | 
 | 32 | + | 
 | 33 | +  @Test  | 
 | 34 | +  public void consumeFromTabletsKeyspace() throws InterruptedException {  | 
 | 35 | +    String keyspace = "tabletsks";  | 
 | 36 | +    String table = "tabletstest";  | 
 | 37 | +    Session session;  | 
 | 38 | + | 
 | 39 | +    try (Cluster cluster = Cluster.builder().addContactPoint(hostname).withPort(port).build()) {  | 
 | 40 | +      session = cluster.connect();  | 
 | 41 | + | 
 | 42 | +      // Create keyspace with tablets enabled  | 
 | 43 | +      session.execute(String.format("DROP KEYSPACE IF EXISTS %s;", keyspace));  | 
 | 44 | +      tryCreateKeyspace(session, String.format(  | 
 | 45 | +          "CREATE KEYSPACE %s WITH replication = {'class': 'NetworkTopologyStrategy', "  | 
 | 46 | +              + "'replication_factor': 1} AND tablets = {'initial': 8};", keyspace));  | 
 | 47 | + | 
 | 48 | +      session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table));  | 
 | 49 | +      tryCreateTable(session,  | 
 | 50 | +          String.format(  | 
 | 51 | +              "CREATE TABLE %s.%s (id int, value text, PRIMARY KEY (id)) "  | 
 | 52 | +                  + "WITH cdc = {'enabled': 'true'};",  | 
 | 53 | +              keyspace, table));  | 
 | 54 | + | 
 | 55 | +      AtomicInteger changeCounter = new AtomicInteger(0);  | 
 | 56 | +      RawChangeConsumer changeConsumer =  | 
 | 57 | +          change -> {  | 
 | 58 | +            changeCounter.incrementAndGet();  | 
 | 59 | +            return CompletableFuture.completedFuture(null);  | 
 | 60 | +          };  | 
 | 61 | + | 
 | 62 | +      try (CDCConsumer consumer =  | 
 | 63 | +          CDCConsumer.builder()  | 
 | 64 | +              .addContactPoint(new InetSocketAddress(hostname, port))  | 
 | 65 | +              .addTable(new TableName(keyspace, table))  | 
 | 66 | +              .withConsumer(changeConsumer)  | 
 | 67 | +              .withQueryTimeWindowSizeMs(10 * 1000)  | 
 | 68 | +              .withConfidenceWindowSizeMs(5 * 1000)  | 
 | 69 | +              .withWorkersCount(1)  | 
 | 70 | +              .build()) {  | 
 | 71 | + | 
 | 72 | +        consumer.start();  | 
 | 73 | + | 
 | 74 | +        // Perform inserts  | 
 | 75 | +        PreparedStatement ps = session.prepare(  | 
 | 76 | +            String.format("INSERT INTO %s.%s (id, value) VALUES (?, ?);", keyspace, table));  | 
 | 77 | + | 
 | 78 | +        int expectedChanges = 10;  | 
 | 79 | +        for (int i = 1; i <= expectedChanges; i++) {  | 
 | 80 | +          session.execute(ps.bind(i, "value" + i));  | 
 | 81 | +        }  | 
 | 82 | + | 
 | 83 | +        // Wait for all changes to be consumed  | 
 | 84 | +        long timeoutMs = 60 * 1000;  | 
 | 85 | +        long startTime = System.currentTimeMillis();  | 
 | 86 | +        long pollIntervalMs = 500; // Check every 500ms  | 
 | 87 | + | 
 | 88 | +        while (changeCounter.get() < expectedChanges &&  | 
 | 89 | +               (System.currentTimeMillis() - startTime) < timeoutMs) {  | 
 | 90 | +          Thread.sleep(pollIntervalMs);  | 
 | 91 | +        }  | 
 | 92 | + | 
 | 93 | +        // Verify we received all expected changes  | 
 | 94 | +        assertEquals(expectedChanges, changeCounter.get(),  | 
 | 95 | +            "Expected to receive " + expectedChanges + " changes but got " + changeCounter.get());  | 
 | 96 | +      }  | 
 | 97 | + | 
 | 98 | +      session.execute(String.format("DROP KEYSPACE %s;", keyspace));  | 
 | 99 | +    }  | 
 | 100 | +  }  | 
 | 101 | + | 
 | 102 | +  @Test  | 
 | 103 | +  public void consumeFromMultipleTablesInTabletsKeyspace() throws InterruptedException {  | 
 | 104 | +    String keyspace = "tabletsks_multi";  | 
 | 105 | +    String table1 = "tabletstest1";  | 
 | 106 | +    String table2 = "tabletstest2";  | 
 | 107 | +    Session session;  | 
 | 108 | + | 
 | 109 | +    try (Cluster cluster = Cluster.builder().addContactPoint(hostname).withPort(port).build()) {  | 
 | 110 | +      session = cluster.connect();  | 
 | 111 | + | 
 | 112 | +      // Create keyspace with tablets enabled  | 
 | 113 | +      session.execute(String.format("DROP KEYSPACE IF EXISTS %s;", keyspace));  | 
 | 114 | +      tryCreateKeyspace(session, String.format(  | 
 | 115 | +          "CREATE KEYSPACE %s WITH replication = {'class': 'NetworkTopologyStrategy', "  | 
 | 116 | +              + "'replication_factor': 1} AND tablets = {'initial': 8};", keyspace));  | 
 | 117 | + | 
 | 118 | +      // Create two tables  | 
 | 119 | +      session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table1));  | 
 | 120 | +      session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table2));  | 
 | 121 | + | 
 | 122 | +      tryCreateTable(session,  | 
 | 123 | +          String.format(  | 
 | 124 | +              "CREATE TABLE %s.%s (id int, value text, PRIMARY KEY (id)) "  | 
 | 125 | +                  + "WITH cdc = {'enabled': 'true'};",  | 
 | 126 | +              keyspace, table1));  | 
 | 127 | + | 
 | 128 | +      tryCreateTable(session,  | 
 | 129 | +          String.format(  | 
 | 130 | +              "CREATE TABLE %s.%s (id int, name text, PRIMARY KEY (id)) "  | 
 | 131 | +                  + "WITH cdc = {'enabled': 'true'};",  | 
 | 132 | +              keyspace, table2));  | 
 | 133 | + | 
 | 134 | +      AtomicInteger changeCounter = new AtomicInteger(0);  | 
 | 135 | +      RawChangeConsumer changeConsumer =  | 
 | 136 | +          change -> {  | 
 | 137 | +            changeCounter.incrementAndGet();  | 
 | 138 | +            return CompletableFuture.completedFuture(null);  | 
 | 139 | +          };  | 
 | 140 | + | 
 | 141 | +      try (CDCConsumer consumer =  | 
 | 142 | +          CDCConsumer.builder()  | 
 | 143 | +              .addContactPoint(new InetSocketAddress(hostname, port))  | 
 | 144 | +              .addTable(new TableName(keyspace, table1))  | 
 | 145 | +              .addTable(new TableName(keyspace, table2))  | 
 | 146 | +              .withConsumer(changeConsumer)  | 
 | 147 | +              .withQueryTimeWindowSizeMs(10 * 1000)  | 
 | 148 | +              .withConfidenceWindowSizeMs(5 * 1000)  | 
 | 149 | +              .withWorkersCount(1)  | 
 | 150 | +              .build()) {  | 
 | 151 | + | 
 | 152 | +        consumer.start();  | 
 | 153 | + | 
 | 154 | +        // Perform inserts to both tables  | 
 | 155 | +        PreparedStatement ps1 = session.prepare(  | 
 | 156 | +            String.format("INSERT INTO %s.%s (id, value) VALUES (?, ?);", keyspace, table1));  | 
 | 157 | +        PreparedStatement ps2 = session.prepare(  | 
 | 158 | +            String.format("INSERT INTO %s.%s (id, name) VALUES (?, ?);", keyspace, table2));  | 
 | 159 | + | 
 | 160 | +        int changesPerTable = 5;  | 
 | 161 | +        int expectedTotalChanges = changesPerTable * 2;  | 
 | 162 | + | 
 | 163 | +        for (int i = 1; i <= changesPerTable; i++) {  | 
 | 164 | +          session.execute(ps1.bind(i, "value" + i));  | 
 | 165 | +          session.execute(ps2.bind(i, "name" + i));  | 
 | 166 | +        }  | 
 | 167 | + | 
 | 168 | +        // Wait for all changes to be consumed  | 
 | 169 | +        long timeoutMs = 60 * 1000;  | 
 | 170 | +        long startTime = System.currentTimeMillis();  | 
 | 171 | +        long pollIntervalMs = 500; // Check every 500ms  | 
 | 172 | + | 
 | 173 | +        while (changeCounter.get() < expectedTotalChanges &&  | 
 | 174 | +               (System.currentTimeMillis() - startTime) < timeoutMs) {  | 
 | 175 | +          Thread.sleep(pollIntervalMs);  | 
 | 176 | +        }  | 
 | 177 | + | 
 | 178 | +        // Verify we received all expected changes  | 
 | 179 | +        assertEquals(expectedTotalChanges, changeCounter.get(),  | 
 | 180 | +            "Expected to receive " + expectedTotalChanges + " changes but got " + changeCounter.get());  | 
 | 181 | +      }  | 
 | 182 | + | 
 | 183 | +      session.execute(String.format("DROP KEYSPACE %s;", keyspace));  | 
 | 184 | +    }  | 
 | 185 | +  }  | 
 | 186 | + | 
 | 187 | +  @Test  | 
 | 188 | +  public void consumeFromTabletsKeyspaceDuringTabletAlteration() throws InterruptedException {  | 
 | 189 | +    String keyspace = "tabletsks";  | 
 | 190 | +    String table = "tabletstest";  | 
 | 191 | +    Session session;  | 
 | 192 | + | 
 | 193 | +    try (Cluster cluster = Cluster.builder().addContactPoint(hostname).withPort(port).build()) {  | 
 | 194 | +      session = cluster.connect();  | 
 | 195 | + | 
 | 196 | +      // Create keyspace with tablets enabled  | 
 | 197 | +      session.execute(String.format("DROP KEYSPACE IF EXISTS %s;", keyspace));  | 
 | 198 | +      tryCreateKeyspace(session, String.format(  | 
 | 199 | +          "CREATE KEYSPACE %s WITH replication = {'class': 'NetworkTopologyStrategy', "  | 
 | 200 | +              + "'replication_factor': 1} AND tablets = {'initial': 8};", keyspace));  | 
 | 201 | + | 
 | 202 | +      session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table));  | 
 | 203 | +      tryCreateTable(session,  | 
 | 204 | +          String.format(  | 
 | 205 | +              "CREATE TABLE %s.%s (id int, value text, PRIMARY KEY (id)) "  | 
 | 206 | +                  + "WITH cdc = {'enabled': 'true'};",  | 
 | 207 | +              keyspace, table));  | 
 | 208 | + | 
 | 209 | +      AtomicInteger changeCounter = new AtomicInteger(0);  | 
 | 210 | +      RawChangeConsumer changeConsumer =  | 
 | 211 | +          change -> {  | 
 | 212 | +            changeCounter.incrementAndGet();  | 
 | 213 | +            return CompletableFuture.completedFuture(null);  | 
 | 214 | +          };  | 
 | 215 | + | 
 | 216 | +      try (CDCConsumer consumer =  | 
 | 217 | +          CDCConsumer.builder()  | 
 | 218 | +              .addContactPoint(new InetSocketAddress(hostname, port))  | 
 | 219 | +              .addTable(new TableName(keyspace, table))  | 
 | 220 | +              .withConsumer(changeConsumer)  | 
 | 221 | +              .withQueryTimeWindowSizeMs(10 * 1000)  | 
 | 222 | +              .withConfidenceWindowSizeMs(5 * 1000)  | 
 | 223 | +              .withWorkersCount(1)  | 
 | 224 | +              .build()) {  | 
 | 225 | + | 
 | 226 | +        consumer.start();  | 
 | 227 | + | 
 | 228 | +        // Start writing in a separate thread  | 
 | 229 | +        AtomicBoolean stopWriting = new AtomicBoolean(false);  | 
 | 230 | +        AtomicInteger totalWrites = new AtomicInteger(0);  | 
 | 231 | + | 
 | 232 | +        Thread writerThread = new Thread(() -> {  | 
 | 233 | +          PreparedStatement ps = session.prepare(  | 
 | 234 | +              String.format("INSERT INTO %s.%s (id, value) VALUES (?, ?);", keyspace, table));  | 
 | 235 | + | 
 | 236 | +          int id = 1;  | 
 | 237 | +          while (!stopWriting.get()) {  | 
 | 238 | +            try {  | 
 | 239 | +              session.execute(ps.bind(id, "value" + id));  | 
 | 240 | +              totalWrites.incrementAndGet();  | 
 | 241 | +              id++;  | 
 | 242 | +              Thread.sleep(100); // Write every 100ms  | 
 | 243 | +            } catch (InterruptedException e) {  | 
 | 244 | +              Thread.currentThread().interrupt();  | 
 | 245 | +              break;  | 
 | 246 | +            }  | 
 | 247 | +          }  | 
 | 248 | +        });  | 
 | 249 | + | 
 | 250 | +        writerThread.start();  | 
 | 251 | + | 
 | 252 | +        // Let some writes happen before altering tablets  | 
 | 253 | +        Thread.sleep(2000);  | 
 | 254 | + | 
 | 255 | +        // Alter tablet configuration  | 
 | 256 | +        session.execute(String.format(  | 
 | 257 | +            "ALTER TABLE %s.%s WITH tablets={'min_tablet_count':16};", keyspace, table));  | 
 | 258 | + | 
 | 259 | +        // Wait for new generation to appear  | 
 | 260 | +        String generationQuery = String.format(  | 
 | 261 | +            "SELECT COUNT(timestamp) FROM system.cdc_timestamps WHERE keyspace_name='%s' AND table_name='%s';",  | 
 | 262 | +            keyspace, table);  | 
 | 263 | + | 
 | 264 | +        long timeoutMs = 300 * 1000;  | 
 | 265 | +        long startTime = System.currentTimeMillis();  | 
 | 266 | + | 
 | 267 | +        ResultSet rs = session.execute(generationQuery);  | 
 | 268 | +        long origGenerationCount = rs.one().getLong(0);  | 
 | 269 | +        Long generationCount = origGenerationCount;  | 
 | 270 | + | 
 | 271 | +        while (generationCount == origGenerationCount && (System.currentTimeMillis() - startTime) < timeoutMs) {  | 
 | 272 | +          rs = session.execute(generationQuery);  | 
 | 273 | +          generationCount = rs.one().getLong(0);  | 
 | 274 | +          Thread.sleep(1000); // Check every second  | 
 | 275 | +        }  | 
 | 276 | + | 
 | 277 | +        // Continue writing for a bit more after tablet alteration  | 
 | 278 | +        Thread.sleep(3000);  | 
 | 279 | + | 
 | 280 | +        // Stop the writer  | 
 | 281 | +        stopWriting.set(true);  | 
 | 282 | +        writerThread.join();  | 
 | 283 | + | 
 | 284 | +        int expectedChanges = totalWrites.get();  | 
 | 285 | + | 
 | 286 | +        // Wait for all changes to be consumed  | 
 | 287 | +        timeoutMs = 60 * 1000; // 60 seconds timeout  | 
 | 288 | +        startTime = System.currentTimeMillis();  | 
 | 289 | +        long pollIntervalMs = 500; // Check every 500ms  | 
 | 290 | + | 
 | 291 | +        while (changeCounter.get() < expectedChanges &&  | 
 | 292 | +               (System.currentTimeMillis() - startTime) < timeoutMs) {  | 
 | 293 | +          Thread.sleep(pollIntervalMs);  | 
 | 294 | +        }  | 
 | 295 | + | 
 | 296 | +        // Verify we received all expected changes  | 
 | 297 | +        assertEquals(expectedChanges, changeCounter.get(),  | 
 | 298 | +            "Expected to receive " + expectedChanges + " changes but got " + changeCounter.get());  | 
 | 299 | +      }  | 
 | 300 | + | 
 | 301 | +      session.execute(String.format("DROP KEYSPACE %s;", keyspace));  | 
 | 302 | +    }  | 
 | 303 | +  }  | 
 | 304 | + | 
 | 305 | +  public void tryCreateKeyspace(Session session, String query) {  | 
 | 306 | +      try {  | 
 | 307 | +          session.execute(query);  | 
 | 308 | +      } catch (Exception e) {  | 
 | 309 | +          if (e.getMessage().contains("Unknown property 'tablets'")) {  | 
 | 310 | +              abort("Test aborted: This version of Scylla doesn't support CDC with tablets. " +  | 
 | 311 | +                              "Error message: " + e.getMessage());  | 
 | 312 | +          }  | 
 | 313 | +          throw e;  | 
 | 314 | +      }  | 
 | 315 | +  }  | 
 | 316 | + | 
 | 317 | +  public void tryCreateTable(Session session, String query) throws InvalidQueryException {  | 
 | 318 | +      try {  | 
 | 319 | +          session.execute(query);  | 
 | 320 | +      } catch (InvalidQueryException e) {  | 
 | 321 | +          if (e.getMessage().contains("Cannot create CDC log for a table") &&  | 
 | 322 | +                  e.getMessage().contains("because keyspace uses tablets")) {  | 
 | 323 | +              abort("Test aborted: This version of Scylla doesn't support CDC with tablets. " +  | 
 | 324 | +                              "Error message: " + e.getMessage());  | 
 | 325 | +          }  | 
 | 326 | +          throw e;  | 
 | 327 | +      }  | 
 | 328 | +  }  | 
 | 329 | +}  | 
0 commit comments