Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1831,17 +1831,19 @@ public CompactionManager.AllSSTableOpStatus verify(IVerifier.Options options) th
/**
* Rewrites all SSTables according to specified parameters
*
* @param skipIfCurrentVersion - if {@link true}, will rewrite only SSTables that have version older than the current one ({@link SSTableFormat#getLatestVersion()})
* @param skipIfCurrentVersion - if {@link true}, will rewrite only SSTables that have version older than the current one ({@link SSTableFormat#getLatestVersion()})
* @param skipIfNewerThanTimestamp - max timestamp (local creation time) for SSTable; SSTables created _after_ this timestamp will be excluded from compaction
* @param skipIfCompressionMatches - if {@link true}, will rewrite only SSTables whose compression parameters are different from {@code TableMetadata#params#getCompressionParameters()}
* @param jobs number of jobs for parallel execution
* @param latestColumnsOnly
* @param jobs number of jobs for parallel execution
*/
public CompactionManager.AllSSTableOpStatus sstablesRewrite(final boolean skipIfCurrentVersion,
final long skipIfNewerThanTimestamp,
final boolean skipIfCompressionMatches,
final int jobs) throws ExecutionException, InterruptedException
final int jobs,
boolean latestColumnsOnly) throws ExecutionException, InterruptedException
{
return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs);
return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs, latestColumnsOnly);
}

public CompactionManager.AllSSTableOpStatus relocateSSTables(int jobs) throws ExecutionException, InterruptedException
Expand Down
23 changes: 20 additions & 3 deletions src/java/org/apache/cassandra/db/SerializationHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static SerializationHeader makeWithoutStats(TableMetadata metadata)
return new SerializationHeader(true, metadata, metadata.regularAndStaticColumns(), EncodingStats.NO_STATS);
}

public static SerializationHeader make(TableMetadata metadata, Collection<SSTableReader> sstables)
public static SerializationHeader make(TableMetadata metadata, Collection<SSTableReader> sstables, boolean latestColumnsOnly)
{
// The serialization header has to be computed before the start of compaction (since it's used to write)
// the result. This means that when compacting multiple sources, we won't have perfectly accurate stats
Expand All @@ -91,16 +91,33 @@ public static SerializationHeader make(TableMetadata metadata, Collection<SSTabl
EncodingStats.Collector stats = new EncodingStats.Collector();
RegularAndStaticColumns.Builder columns = RegularAndStaticColumns.builder();
// We need to order the SSTables by descending generation to be sure that we use latest column metadata.
for (SSTableReader sstable : orderByDescendingGeneration(sstables))

Collection<SSTableReader> ssTableReaders = orderByDescendingGeneration(sstables);

for (SSTableReader sstable : ssTableReaders)
{
stats.updateTimestamp(sstable.getMinTimestamp());
stats.updateLocalDeletionTime(sstable.getMinLocalDeletionTime());
stats.updateTTL(sstable.getMinTTL());
columns.addAll(sstable.header.columns());
}

if (latestColumnsOnly)
{
columns.addAll(metadata.regularAndStaticColumns());
}
else
{
for (SSTableReader sstable : ssTableReaders)
columns.addAll(sstable.header.columns());
}
return new SerializationHeader(true, metadata, columns.build(), stats.get());
}

public static SerializationHeader make(TableMetadata metadata, Collection<SSTableReader> sstables)
{
return make(metadata, sstables, false);
}

private static Collection<SSTableReader> orderByDescendingGeneration(Collection<SSTableReader> sstables)
{
if (sstables.size() < 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,12 @@ public void shutdown()

public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, final long gcBefore, long maxSSTableBytes)
{
return new CompactionTask(cfs, txn, gcBefore);
return getCompactionTask(txn, gcBefore, maxSSTableBytes, false);
}

public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, final long gcBefore, long maxSSTableBytes, boolean latestColumnsOnly)
{
return new CompactionTask(cfs, txn, gcBefore, latestColumnsOnly);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,29 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
protected ILifecycleTransaction transaction;
protected boolean isUserDefined;
protected OperationType compactionType;
protected final boolean latestColumnsOnly;

/**
* @param cfs
* @param cfs column family of this compaction task
* @param transaction the modifying managing the status of the sstables we're replacing
*/
public AbstractCompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction transaction)
{
this(cfs, transaction, false);
}

/**
* @param cfs column family of this compaction task
* @param transaction the modifying managing the status of the sstables we're replacing
* @param latestColumnsOnly true if compaction should produce SSTables without e.g. dropped columns in serialisation header
*/
public AbstractCompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction transaction, boolean latestColumnsOnly)
{
this.cfs = cfs;
this.transaction = transaction;
this.isUserDefined = false;
this.compactionType = OperationType.COMPACTION;
this.latestColumnsOnly = latestColumnsOnly;

try
{
Expand Down
22 changes: 18 additions & 4 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.repair.NoSuchRepairSessionException;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.schema.DroppedColumn;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
Expand Down Expand Up @@ -689,9 +690,22 @@ public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs,
final boolean skipIfCurrentVersion,
final long skipIfOlderThanTimestamp,
final boolean skipIfCompressionMatches,
int jobs) throws InterruptedException, ExecutionException
int jobs,
boolean latestColumnsOnly) throws InterruptedException, ExecutionException
{
return performSSTableRewrite(cfs, (sstable) -> {
// Should we remove dropped columns from header?
if (latestColumnsOnly)
{
for (DroppedColumn droppedColumn : cfs.metadata().droppedColumns.values())
{
if (sstable.header.columns().contains(droppedColumn.column))
{
return true;
}
}
}

// Skip if descriptor version matches current version
if (skipIfCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion()))
return false;
Expand All @@ -708,15 +722,15 @@ public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs,
return false;

return true;
}, jobs);
}, jobs, latestColumnsOnly);
}

/**
* Perform SSTable rewrite

* @param sstableFilter sstables for which predicate returns {@link false} will be excluded
*/
public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, Predicate<SSTableReader> sstableFilter, int jobs) throws InterruptedException, ExecutionException
public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, Predicate<SSTableReader> sstableFilter, int jobs, boolean latestColumnsOnly) throws InterruptedException, ExecutionException
{
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
Expand All @@ -741,7 +755,7 @@ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
@Override
public void execute(LifecycleTransaction txn)
{
AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE, latestColumnsOnly);
task.setUserDefined(true);
task.setCompactionType(OperationType.UPGRADE_SSTABLES);
task.execute(active);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1036,14 +1036,14 @@ public long getMaxSSTableBytes()
return maxSSTableSizeBytes;
}

public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long gcBefore, long maxSSTableBytes)
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long gcBefore, long maxSSTableBytes, boolean latestColumnsOnly)
{
maybeReloadDiskBoundaries();
readLock.lock();
try
{
validateForCompaction(txn.originals());
return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes, latestColumnsOnly);
}
finally
{
Expand All @@ -1052,6 +1052,11 @@ public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long g

}

public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long gcBefore, long maxSSTableBytes)
{
return getCompactionTask(txn, gcBefore, maxSSTableBytes, false);
}

private void validateForCompaction(Iterable<SSTableReader> input)
{
readLock.lock();
Expand Down
12 changes: 9 additions & 3 deletions src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,15 @@ public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcB
this(cfs, txn, gcBefore, false);
}

public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcBefore, boolean keepOriginals)
public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcBefore, boolean latestColumnsOnly)
{
super(cfs, txn);
this(cfs, txn, gcBefore, false, latestColumnsOnly);
}


public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcBefore, boolean keepOriginals, boolean latestColumnsOnly)
{
super(cfs, txn, latestColumnsOnly);
this.gcBefore = gcBefore;
this.keepOriginals = keepOriginals;
}
Expand Down Expand Up @@ -350,7 +356,7 @@ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
ILifecycleTransaction transaction,
Set<SSTableReader> nonExpiredSSTables)
{
return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, keepOriginals, getLevel());
return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, keepOriginals, getLevel(), latestColumnsOnly);
}

public static String updateCompactionHistory(TimeUUID taskId, String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize, Map<String, String> compactionProperties)
Expand Down
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/db/compaction/Upgrader.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Upgrader(ColumnFamilyStore cfs, LifecycleTransaction txn, OutputHandler o
this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
}

private SSTableWriter createCompactionWriter(StatsMetadata metadata)
private SSTableWriter createCompactionWriter(StatsMetadata metadata, boolean latestColumnsOnly)
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
Expand All @@ -82,22 +82,22 @@ private SSTableWriter createCompactionWriter(StatsMetadata metadata)
.setTransientSSTable(metadata.isTransient)
.setTableMetadataRef(cfs.metadata)
.setMetadataCollector(sstableMetadataCollector)
.setSerializationHeader(SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable)))
.setSerializationHeader(SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable), latestColumnsOnly))
.addDefaultComponents(cfs.indexManager.listIndexGroups())
.setSecondaryIndexGroups(cfs.indexManager.listIndexGroups())
.setCompressionDictionaryManager(cfs.compressionDictionaryManager())
.build(transaction, cfs);
}

public void upgrade(boolean keepOriginals)
public void upgrade(boolean keepOriginals, boolean latestColumnsOnly)
{
outputHandler.output("Upgrading " + sstable);
long nowInSec = FBUtilities.nowInSeconds();
try (SSTableRewriter writer = SSTableRewriter.construct(cfs, transaction, keepOriginals, CompactionTask.getMaxDataAge(transaction.originals()));
AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals());
CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, nextTimeUUID()))
{
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata()));
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata(), latestColumnsOnly));
iter.setTargetDirectory(writer.currentWriter().getFilename());
while (iter.hasNext())
writer.append(iter.next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
protected final long minRepairedAt;
protected final TimeUUID pendingRepair;
protected final boolean isTransient;
protected final boolean latestColumnsOnly;

protected final SSTableRewriter sstableWriter;
protected final ILifecycleTransaction txn;
Expand All @@ -76,19 +77,31 @@ public CompactionAwareWriter(ColumnFamilyStore cfs,
Set<SSTableReader> nonExpiredSSTables,
boolean keepOriginals)
{
this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, true);
this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, true, false);
}

public CompactionAwareWriter(ColumnFamilyStore cfs,
Directories directories,
ILifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
boolean keepOriginals,
boolean earlyOpenAllowed)
boolean latestColumnsOnly)
{
this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, true, latestColumnsOnly);
}

public CompactionAwareWriter(ColumnFamilyStore cfs,
Directories directories,
ILifecycleTransaction txn,
Set<SSTableReader> nonExpiredSSTables,
boolean keepOriginals,
boolean earlyOpenAllowed,
boolean latestColumnsOnly)
{
this.cfs = cfs;
this.directories = directories;
this.nonExpiredSSTables = nonExpiredSSTables;
this.latestColumnsOnly = latestColumnsOnly;
this.txn = txn;

estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
Expand Down Expand Up @@ -230,22 +243,27 @@ protected boolean maybeSwitchLocation(DecoratedKey key)
protected void switchCompactionWriter(Directories.DataDirectory directory, DecoratedKey nextKey)
{
currentDirectory = directory;
sstableWriter.switchWriter(sstableWriter(directory, nextKey));
sstableWriter.switchWriter(sstableWriter(directory, nextKey, latestColumnsOnly));
}

protected SSTableWriter sstableWriter(Directories.DataDirectory directory, DecoratedKey nextKey)
protected SSTableWriter sstableWriter(Directories.DataDirectory directory, DecoratedKey nextKey, boolean latestColumnsOnly)
{
Descriptor descriptor = cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(directory));
MetadataCollector collector = new MetadataCollector(txn.originals(), cfs.metadata().comparator)
.sstableLevel(sstableLevel());
SerializationHeader header = SerializationHeader.make(cfs.metadata(), nonExpiredSSTables);
SerializationHeader header = SerializationHeader.make(cfs.metadata(), nonExpiredSSTables, latestColumnsOnly);

return newWriterBuilder(descriptor).setMetadataCollector(collector)
.setSerializationHeader(header)
.setKeyCount(sstableKeyCount())
.build(txn, cfs);
}

protected SSTableWriter sstableWriter(Directories.DataDirectory directory, DecoratedKey nextKey)
{
return sstableWriter(directory, nextKey, false);
}

/**
* Returns the level that should be used when creating sstables.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, I

public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, ILifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean keepOriginals, int sstableLevel)
{
super(cfs, directories, txn, nonExpiredSSTables, keepOriginals);
this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, sstableLevel, false);
}

public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, ILifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean keepOriginals, int sstableLevel, boolean latestColumnsOnly)
{
super(cfs, directories, txn, nonExpiredSSTables, keepOriginals, latestColumnsOnly);
this.sstableLevel = sstableLevel;
}

Expand Down
Loading