From 15d3e4e29f317f582c662a0a6c703a3eaf294118 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Fri, 17 Apr 2026 21:45:18 +0200 Subject: [PATCH] CASSANALYTICS-151: Cannot read start offset from BTI index with big partitions Patch by Lukasz Antoniak; Reviewed by Yifan Cai and Mick Semb Wever for CASSANALYTICS-151 --- CHANGES.txt | 1 + .../io/sstable/format/bti/BtiReaderUtils.java | 33 +++---- .../spark/reader/IndexOffsetTests.java | 96 ++++++++++++------- .../spark/reader/IndexOffsetTests.java | 12 +-- 4 files changed, 80 insertions(+), 62 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index cf4540d56..7def2565b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.4.0 ----- + * Cannot read start offset from BTI index with big partitions (CASSANALYTICS-151) * BufferingInputStream fails to read last unaligned chunk (CASSANALYTICS-147) * Support per-instance sidecar port resolution in CDC client (CASSANALYTICS-130) * Change log level for logs in CassandraSchema to debug level to avoid log spamming (CASSANALYTICS-149) diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java index 444610bfd..9eb88d76b 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java @@ -87,7 +87,7 @@ public static boolean primaryIndexContainsAnyKey(@NotNull SSTable ssTable, @NotNull List filters) throws IOException { final AtomicBoolean exists = new AtomicBoolean(false); - withPartitionIndex(ssTable, descriptor, metadata, true, true, (dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> { + withPartitionIndex(ssTable, descriptor, metadata, (dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> { TableMetadataRef metadataRef = TableMetadataRef.forOfflineTools(metadata); BtiTableReader btiTableReader = new BtiTableReader.Builder(descriptor) .setDataFile(dataFileHandle) @@ -135,11 +135,12 @@ public static Long startOffsetInDataFile(@NotNull SSTable ssTable, try { - withPartitionIndex(ssTable, descriptor, metadata, true, false, (dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> { + withPartitionIndex(ssTable, descriptor, metadata, (dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> { TableMetadataRef metadataRef = TableMetadataRef.forOfflineTools(metadata); BtiTableReader btiTableReader = new BtiTableReader.Builder(descriptor) .setDataFile(dataFileHandle) .setPartitionIndex(partitionIndex) + .setRowIndexFile(rowFileHandle) .setComponents(indexComponents) .setTableMetadataRef(metadataRef) .setFilter(FilterFactory.AlwaysPresent) @@ -179,7 +180,7 @@ public static void consumePrimaryIndex(@NotNull SSTable ssTable, org.apache.cassandra.spark.reader.CompressionMetadata compressionMetadata = SSTableCache.INSTANCE.compressionMetadata( ssTable, descriptor.version.hasMaxCompressedLength(), metadata.params.crcCheckChance); - withPartitionIndex(ssTable, descriptor, metadata, true, true, (dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> { + withPartitionIndex(ssTable, descriptor, metadata, (dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> { BtiTableReader btiTableReader = new BtiTableReader.Builder(descriptor) .setDataFile(dataFileHandle) .setPartitionIndex(partitionIndex) @@ -249,7 +250,7 @@ public static void readPrimaryIndex(@NotNull SSTable sstable, double crcCheckChance, @NotNull Function tracker) throws IOException { - withPartitionIndex(sstable, descriptor, partitioner, crcCheckChance, true, true, + withPartitionIndex(sstable, descriptor, partitioner, crcCheckChance, (dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> { try (PartitionIterator iter = PartitionIterator.create(partitionIndex, partitioner, rowFileHandle, dataFileHandle, @@ -272,11 +273,9 @@ public static void readPrimaryIndex(@NotNull SSTable sstable, private static void withPartitionIndex(@NotNull SSTable sstable, @NotNull Descriptor descriptor, @NotNull TableMetadata metadata, - boolean loadDataFile, - boolean loadRowsIndex, @NotNull BtiPartitionIndexConsumer consumer) throws IOException { - withPartitionIndex(sstable, descriptor, metadata.partitioner, metadata.params.crcCheckChance, loadDataFile, loadRowsIndex, consumer); + withPartitionIndex(sstable, descriptor, metadata.partitioner, metadata.params.crcCheckChance, consumer); } /** @@ -285,8 +284,6 @@ private static void withPartitionIndex(@NotNull SSTable sstable, * @param descriptor sstable descriptor * @param partitioner partitioner * @param crcCheckChance crc check chance - * @param loadDataFile when true, open the data component. The dataFile for BtiPartitionIndexConsumer is not null; otherwise, null - * @param loadRowsIndex when true, open the rowIndex component. The rowFile for BtiPartitionIndexConsumer is not null; otherwise, null * @param consumer BtiPartitionIndexConsumer * @throws IOException on I/O errors */ @@ -294,24 +291,22 @@ private static void withPartitionIndex(@NotNull SSTable sstable, @NotNull Descriptor descriptor, @NotNull IPartitioner partitioner, double crcCheckChance, - boolean loadDataFile, - boolean loadRowsIndex, @NotNull BtiPartitionIndexConsumer consumer) throws IOException { File file = new File(sstable.getDataFileName()); try (CompressionMetadata compression = getCompressionMetadata(sstable, crcCheckChance, descriptor); - FileHandle dataFileHandle = loadDataFile ? createFileHandle(file, - sstable.openDataStream(), - sstable.length(FileType.DATA), - compression) : null; + FileHandle dataFileHandle = createFileHandle(file, + sstable.openDataStream(), + sstable.length(FileType.DATA), + compression); FileHandle partitionFileHandle = createFileHandle(file, sstable.openPrimaryIndexStream(), sstable.length(FileType.PARTITIONS_INDEX), null); - FileHandle rowFileHandle = loadRowsIndex ? createFileHandle(file, - sstable.openRowIndexStream(), - sstable.length(FileType.ROWS_INDEX), - null) : null; + FileHandle rowFileHandle = createFileHandle(file, + sstable.openRowIndexStream(), + sstable.length(FileType.ROWS_INDEX), + null); PartitionIndex partitionIndex = PartitionIndex.load(partitionFileHandle, partitioner, false)) { consumer.accept(dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex); diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java index 316a9a1c7..4d55d81d7 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java @@ -23,17 +23,21 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.stream.Stream; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Multimap; import org.apache.commons.lang.mutable.MutableInt; import org.apache.commons.lang.mutable.MutableLong; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.bridge.CassandraBridgeImplementation; import org.apache.cassandra.bridge.TokenRange; +import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.schema.Schema; @@ -78,23 +82,27 @@ public class IndexOffsetTests new BigInteger("9223372036854775807"))) .build(); - @Test @SuppressWarnings("static-access") - public void testReadIndexOffsets() + @ParameterizedTest + @MethodSource("partitionSizeProvider") + public void testReadIndexOffsets(int numPartitions, int numRowsPerPartition) { qt().forAll(arbitrary().enumValues(Partitioner.class), booleans().all()) .checkAssert((partitioner, enableCompression) -> { try (TemporaryDirectory directory = new TemporaryDirectory()) { - int numKeys = 100000; + int numKeys = numPartitions * numRowsPerPartition; TestSchema schema = TestSchema.basicBuilder(BRIDGE) .withCompression(enableCompression) .build(); schema.writeSSTable(directory, BRIDGE, partitioner, writer -> { - for (int index = 0; index < numKeys; index++) + for (int pk = 0; pk < numPartitions; pk++) { - writer.write(index, 0, index); + for (int ck = 0; ck < numRowsPerPartition; ck++) + { + writer.write(pk, ck, pk); + } } }); assertThat(TestSSTable.countIn(directory.path())).isEqualTo(1); @@ -113,7 +121,7 @@ public void testReadIndexOffsets() MutableInt skippedPartitions = new MutableInt(0); MutableLong skippedDataOffsets = new MutableLong(0); - int[] counts = new int[numKeys]; + int[][] counts = new int[numPartitions][numRowsPerPartition]; for (TokenRange range : ranges) { SSTableReader reader = SSTableReader.builder(metadata, ssTable) @@ -144,47 +152,43 @@ public void skippedDataDbStartOffset(long length) while (scanner.hasNext()) { UnfilteredRowIterator rowIterator = scanner.next(); - int key = rowIterator.partitionKey().getKey().getInt(); - // Count how many times we read a key across all 'spark' token partitions - counts[key]++; + int pk = rowIterator.partitionKey().getKey().getInt(); while (rowIterator.hasNext()) { - rowIterator.next(); + Unfiltered unfiltered = rowIterator.next(); + int ck = unfiltered.clustering().bufferAt(0).asIntBuffer().get(); + // Count how many times we read a key across all 'spark' token partitions + counts[pk][ck]++; } } } } // Verify we read each key exactly once across all Spark partitions - assertThat(counts).hasSize(numKeys); - int index = 0; - for (int count : counts) + assertThat(counts.length).isEqualTo(numPartitions); + for (int partitionNum = 0; partitionNum < counts.length; partitionNum++) { - if (count == 0) - { - LOGGER.error("Missing key key={} token={} partitioner={}", - index, - // Cast to ByteBuffer required when compiling with Java 8 - ReaderUtils.tokenToBigInteger(BRIDGE - .getPartitioner(partitioner) - .decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip()) - .getToken()), - partitioner.name()); - } - else if (count > 1) + for (int rowNumInPartition = 0; rowNumInPartition < counts[partitionNum].length; rowNumInPartition++) { - LOGGER.error("Key read by more than 1 Spark partition key={} token={} partitioner={}", - index, - // Cast to ByteBuffer required when compiling with Java 8 - ReaderUtils.tokenToBigInteger(BRIDGE - .getPartitioner(partitioner) - .decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip()) - .getToken()), - partitioner.name()); + String key = partitionNum + "/" + rowNumInPartition; + int count = counts[partitionNum][rowNumInPartition]; + if (count == 0) + { + LOGGER.error("Missing key key={} token={} partitioner={}", + key, + toToken(partitioner, partitionNum), + partitioner.name()); + } + else if (count > 1) + { + LOGGER.error("Key read by more than 1 Spark partition key={} token={} partitioner={}", + key, + toToken(partitioner, partitionNum), + partitioner.name()); + } + assertThat(count).as(count > 0 ? "Key " + key + " read " + count + " times" + : "Key not found: " + key).isEqualTo(1); } - assertThat(count).as(count > 0 ? "Key " + index + " read " + count + " times" - : "Key not found: " + index).isEqualTo(1); - index++; } assertThat(skippedDataOffsets.longValue()).isGreaterThan(0); @@ -198,4 +202,22 @@ else if (count > 1) } }); } + + static Stream partitionSizeProvider() + { + return Stream.of( + Arguments.of(100000, 1), + Arguments.of(1000, 100), + Arguments.of(100, 1000) + ); + } + + private BigInteger toToken(Partitioner partitioner, int index) + { + // Cast to ByteBuffer required when compiling with Java 8 + return ReaderUtils.tokenToBigInteger(BRIDGE + .getPartitioner(partitioner) + .decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip()) + .getToken()); + } } diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java index 316a9a1c7..b6d7db8c2 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java @@ -166,9 +166,9 @@ public void skippedDataDbStartOffset(long length) index, // Cast to ByteBuffer required when compiling with Java 8 ReaderUtils.tokenToBigInteger(BRIDGE - .getPartitioner(partitioner) - .decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip()) - .getToken()), + .getPartitioner(partitioner) + .decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip()) + .getToken()), partitioner.name()); } else if (count > 1) @@ -177,9 +177,9 @@ else if (count > 1) index, // Cast to ByteBuffer required when compiling with Java 8 ReaderUtils.tokenToBigInteger(BRIDGE - .getPartitioner(partitioner) - .decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip()) - .getToken()), + .getPartitioner(partitioner) + .decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip()) + .getToken()), partitioner.name()); } assertThat(count).as(count > 0 ? "Key " + index + " read " + count + " times"