Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.4.0
-----
* Cannot read start offset from BTI index with big partitions (CASSANALYTICS-151)
* BufferingInputStream fails to read last unaligned chunk (CASSANALYTICS-147)
* Support per-instance sidecar port resolution in CDC client (CASSANALYTICS-130)
* Change log level for logs in CassandraSchema to debug level to avoid log spamming (CASSANALYTICS-149)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,12 @@ public static Long startOffsetInDataFile(@NotNull SSTable ssTable,

try
{
withPartitionIndex(ssTable, descriptor, metadata, true, false, (dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> {
withPartitionIndex(ssTable, descriptor, metadata, true, true, (dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> {
TableMetadataRef metadataRef = TableMetadataRef.forOfflineTools(metadata);
BtiTableReader btiTableReader = new BtiTableReader.Builder(descriptor)
.setDataFile(dataFileHandle)
.setPartitionIndex(partitionIndex)
.setRowIndexFile(rowFileHandle)
.setComponents(indexComponents)
.setTableMetadataRef(metadataRef)
.setFilter(FilterFactory.AlwaysPresent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.stream.Stream;

import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.lang.mutable.MutableLong;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.bridge.CassandraBridgeImplementation;
import org.apache.cassandra.bridge.TokenRange;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.schema.Schema;
Expand Down Expand Up @@ -78,23 +82,27 @@ public class IndexOffsetTests
new BigInteger("9223372036854775807")))
.build();

@Test
@SuppressWarnings("static-access")
public void testReadIndexOffsets()
@ParameterizedTest
@MethodSource("partitionSizeProvider")
public void testReadIndexOffsets(int numPartitions, int numRowsPerPartition)
{
qt().forAll(arbitrary().enumValues(Partitioner.class), booleans().all())
.checkAssert((partitioner, enableCompression) -> {
try (TemporaryDirectory directory = new TemporaryDirectory())
{
int numKeys = 100000;
int numKeys = numPartitions * numRowsPerPartition;
TestSchema schema = TestSchema.basicBuilder(BRIDGE)
.withCompression(enableCompression)
.build();

schema.writeSSTable(directory, BRIDGE, partitioner, writer -> {
for (int index = 0; index < numKeys; index++)
for (int pk = 0; pk < numPartitions; pk++)
{
writer.write(index, 0, index);
for (int ck = 0; ck < numRowsPerPartition; ck++)
{
writer.write(pk, ck, pk);
}
}
});
assertThat(TestSSTable.countIn(directory.path())).isEqualTo(1);
Expand All @@ -113,7 +121,7 @@ public void testReadIndexOffsets()

MutableInt skippedPartitions = new MutableInt(0);
MutableLong skippedDataOffsets = new MutableLong(0);
int[] counts = new int[numKeys];
int[][] counts = new int[numPartitions][numRowsPerPartition];
for (TokenRange range : ranges)
{
SSTableReader reader = SSTableReader.builder(metadata, ssTable)
Expand Down Expand Up @@ -144,47 +152,45 @@ public void skippedDataDbStartOffset(long length)
while (scanner.hasNext())
{
UnfilteredRowIterator rowIterator = scanner.next();
int key = rowIterator.partitionKey().getKey().getInt();
// Count how many times we read a key across all 'spark' token partitions
counts[key]++;
int pk = rowIterator.partitionKey().getKey().getInt();
while (rowIterator.hasNext())
{
rowIterator.next();
Unfiltered unfiltered = rowIterator.next();
int ck = unfiltered.clustering().bufferAt(0).asIntBuffer().get();
// Count how many times we read a key across all 'spark' token partitions
counts[pk][ck]++;
}
}
}
}

// Verify we read each key exactly once across all Spark partitions
assertThat(counts).hasSize(numKeys);
assertThat(counts.length).isEqualTo(numPartitions);
int index = 0;
for (int count : counts)
for (int i = 0; i < counts.length; i++)
{
if (count == 0)
{
LOGGER.error("Missing key key={} token={} partitioner={}",
index,
// Cast to ByteBuffer required when compiling with Java 8
ReaderUtils.tokenToBigInteger(BRIDGE
.getPartitioner(partitioner)
.decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip())
.getToken()),
partitioner.name());
}
else if (count > 1)
for (int j = 0; j < counts[i].length; j++)
{
LOGGER.error("Key read by more than 1 Spark partition key={} token={} partitioner={}",
index,
// Cast to ByteBuffer required when compiling with Java 8
ReaderUtils.tokenToBigInteger(BRIDGE
.getPartitioner(partitioner)
.decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip())
.getToken()),
partitioner.name());
String key = i + "/" + j;
int count = counts[i][j];
if (count == 0)
{
LOGGER.error("Missing key key={} token={} partitioner={}",
key,
toToken(partitioner, index),
partitioner.name());
}
else if (count > 1)
{
LOGGER.error("Key read by more than 1 Spark partition key={} token={} partitioner={}",
key,
toToken(partitioner, index),
partitioner.name());
}
assertThat(count).as(count > 0 ? "Key " + key + " read " + count + " times"
: "Key not found: " + key).isEqualTo(1);
index++;
}
assertThat(count).as(count > 0 ? "Key " + index + " read " + count + " times"
: "Key not found: " + index).isEqualTo(1);
index++;
}

assertThat(skippedDataOffsets.longValue()).isGreaterThan(0);
Expand All @@ -198,4 +204,22 @@ else if (count > 1)
}
});
}

static Stream<Arguments> partitionSizeProvider()
{
return Stream.of(
Arguments.of(100000, 1),
Arguments.of(1000, 100),
Arguments.of(100, 1000)
Comment on lines +211 to +213
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indentation is off

);
}

private BigInteger toToken(Partitioner partitioner, int index)
{
// Cast to ByteBuffer required when compiling with Java 8
return ReaderUtils.tokenToBigInteger(BRIDGE
.getPartitioner(partitioner)
.decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip())
.getToken());
Comment on lines +221 to +223
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indentation is off

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.stream.Stream;

import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.lang.mutable.MutableLong;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.bridge.CassandraBridgeImplementation;
import org.apache.cassandra.bridge.TokenRange;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.schema.Schema;
Expand Down Expand Up @@ -78,23 +82,27 @@ public class IndexOffsetTests
new BigInteger("9223372036854775807")))
.build();

@Test
@SuppressWarnings("static-access")
public void testReadIndexOffsets()
@ParameterizedTest
@MethodSource("partitionSizeProvider")
public void testReadIndexOffsets(int numPartitions, int numRowsPerPartition)
{
qt().forAll(arbitrary().enumValues(Partitioner.class), booleans().all())
.checkAssert((partitioner, enableCompression) -> {
try (TemporaryDirectory directory = new TemporaryDirectory())
{
int numKeys = 100000;
int numKeys = numPartitions * numRowsPerPartition;
TestSchema schema = TestSchema.basicBuilder(BRIDGE)
.withCompression(enableCompression)
.build();

schema.writeSSTable(directory, BRIDGE, partitioner, writer -> {
for (int index = 0; index < numKeys; index++)
for (int pk = 0; pk < numPartitions; pk++)
{
writer.write(index, 0, index);
for (int ck = 0; ck < numRowsPerPartition; ck++)
{
writer.write(pk, ck, pk);
}
}
});
assertThat(TestSSTable.countIn(directory.path())).isEqualTo(1);
Expand All @@ -113,7 +121,7 @@ public void testReadIndexOffsets()

MutableInt skippedPartitions = new MutableInt(0);
MutableLong skippedDataOffsets = new MutableLong(0);
int[] counts = new int[numKeys];
int[][] counts = new int[numPartitions][numRowsPerPartition];
for (TokenRange range : ranges)
{
SSTableReader reader = SSTableReader.builder(metadata, ssTable)
Expand Down Expand Up @@ -144,47 +152,45 @@ public void skippedDataDbStartOffset(long length)
while (scanner.hasNext())
{
UnfilteredRowIterator rowIterator = scanner.next();
int key = rowIterator.partitionKey().getKey().getInt();
// Count how many times we read a key across all 'spark' token partitions
counts[key]++;
int pk = rowIterator.partitionKey().getKey().getInt();
while (rowIterator.hasNext())
{
rowIterator.next();
Unfiltered unfiltered = rowIterator.next();
int ck = unfiltered.clustering().bufferAt(0).asIntBuffer().get();
// Count how many times we read a key across all 'spark' token partitions
counts[pk][ck]++;
}
}
}
}

// Verify we read each key exactly once across all Spark partitions
assertThat(counts).hasSize(numKeys);
assertThat(counts.length).isEqualTo(numPartitions);
int index = 0;
for (int count : counts)
for (int i = 0; i < counts.length; i++)
{
if (count == 0)
{
LOGGER.error("Missing key key={} token={} partitioner={}",
index,
// Cast to ByteBuffer required when compiling with Java 8
ReaderUtils.tokenToBigInteger(BRIDGE
.getPartitioner(partitioner)
.decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip())
.getToken()),
partitioner.name());
}
else if (count > 1)
for (int j = 0; j < counts[i].length; j++)
{
LOGGER.error("Key read by more than 1 Spark partition key={} token={} partitioner={}",
index,
// Cast to ByteBuffer required when compiling with Java 8
ReaderUtils.tokenToBigInteger(BRIDGE
.getPartitioner(partitioner)
.decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip())
.getToken()),
partitioner.name());
String key = i + "/" + j;
int count = counts[i][j];
if (count == 0)
{
LOGGER.error("Missing key key={} token={} partitioner={}",
key,
toToken(partitioner, index),
partitioner.name());
}
else if (count > 1)
{
LOGGER.error("Key read by more than 1 Spark partition key={} token={} partitioner={}",
key,
toToken(partitioner, index),
partitioner.name());
}
assertThat(count).as(count > 0 ? "Key " + key + " read " + count + " times"
: "Key not found: " + key).isEqualTo(1);
index++;
}
assertThat(count).as(count > 0 ? "Key " + index + " read " + count + " times"
: "Key not found: " + index).isEqualTo(1);
index++;
}

assertThat(skippedDataOffsets.longValue()).isGreaterThan(0);
Expand All @@ -198,4 +204,22 @@ else if (count > 1)
}
});
}

static Stream<Arguments> partitionSizeProvider()
{
return Stream.of(
Arguments.of(100000, 1),
Arguments.of(1000, 100),
Arguments.of(100, 1000)
Comment on lines +211 to +213
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indentation is off

);
}

private BigInteger toToken(Partitioner partitioner, int index)
{
// Cast to ByteBuffer required when compiling with Java 8
return ReaderUtils.tokenToBigInteger(BRIDGE
.getPartitioner(partitioner)
.decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(index).flip())
.getToken());
Comment on lines +221 to +223
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indentation is off

}
}
Loading