From f8f205159b42e848069a890215061f3a3c22260f Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Fri, 14 Nov 2025 12:26:12 +0100 Subject: [PATCH] wip --- .../cassandra/db/ColumnFamilyStore.java | 10 +++--- .../cassandra/db/SerializationHeader.java | 23 +++++++++++-- .../AbstractCompactionStrategy.java | 7 +++- .../db/compaction/AbstractCompactionTask.java | 14 +++++++- .../db/compaction/CompactionManager.java | 22 ++++++++++--- .../compaction/CompactionStrategyManager.java | 9 ++++-- .../db/compaction/CompactionTask.java | 12 +++++-- .../cassandra/db/compaction/Upgrader.java | 8 ++--- .../writers/CompactionAwareWriter.java | 28 +++++++++++++--- .../writers/DefaultCompactionWriter.java | 7 +++- .../cassandra/service/StorageService.java | 17 ++++++++-- .../service/StorageServiceMBean.java | 10 +++++- .../org/apache/cassandra/tools/NodeProbe.java | 8 ++--- .../cassandra/tools/StandaloneUpgrader.java | 32 ++++++++++++++++--- .../tools/nodetool/UpgradeSSTable.java | 7 +++- 15 files changed, 172 insertions(+), 42 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 839a21d55d67..11251bc2417c 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1831,17 +1831,19 @@ public CompactionManager.AllSSTableOpStatus verify(IVerifier.Options options) th /** * Rewrites all SSTables according to specified parameters * - * @param skipIfCurrentVersion - if {@link true}, will rewrite only SSTables that have version older than the current one ({@link SSTableFormat#getLatestVersion()}) + * @param skipIfCurrentVersion - if {@link true}, will rewrite only SSTables that have version older than the current one ({@link SSTableFormat#getLatestVersion()}) * @param skipIfNewerThanTimestamp - max timestamp (local creation time) for SSTable; SSTables created _after_ this timestamp will be excluded from compaction * @param skipIfCompressionMatches - if {@link true}, will rewrite only SSTables whose compression parameters are different from {@code TableMetadata#params#getCompressionParameters()} - * @param jobs number of jobs for parallel execution + * @param latestColumnsOnly + * @param jobs number of jobs for parallel execution */ public CompactionManager.AllSSTableOpStatus sstablesRewrite(final boolean skipIfCurrentVersion, final long skipIfNewerThanTimestamp, final boolean skipIfCompressionMatches, - final int jobs) throws ExecutionException, InterruptedException + final int jobs, + boolean latestColumnsOnly) throws ExecutionException, InterruptedException { - return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs); + return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs, latestColumnsOnly); } public CompactionManager.AllSSTableOpStatus relocateSSTables(int jobs) throws ExecutionException, InterruptedException diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index 0c5fc53beb71..d058b5a78bf1 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -76,7 +76,7 @@ public static SerializationHeader makeWithoutStats(TableMetadata metadata) return new SerializationHeader(true, metadata, metadata.regularAndStaticColumns(), EncodingStats.NO_STATS); } - public static SerializationHeader make(TableMetadata metadata, Collection sstables) + public static SerializationHeader make(TableMetadata metadata, Collection sstables, boolean latestColumnsOnly) { // The serialization header has to be computed before the start of compaction (since it's used to write) // the result. This means that when compacting multiple sources, we won't have perfectly accurate stats @@ -91,16 +91,33 @@ public static SerializationHeader make(TableMetadata metadata, Collection ssTableReaders = orderByDescendingGeneration(sstables); + + for (SSTableReader sstable : ssTableReaders) { stats.updateTimestamp(sstable.getMinTimestamp()); stats.updateLocalDeletionTime(sstable.getMinLocalDeletionTime()); stats.updateTTL(sstable.getMinTTL()); - columns.addAll(sstable.header.columns()); + } + + if (latestColumnsOnly) + { + columns.addAll(metadata.regularAndStaticColumns()); + } + else + { + for (SSTableReader sstable : ssTableReaders) + columns.addAll(sstable.header.columns()); } return new SerializationHeader(true, metadata, columns.build(), stats.get()); } + public static SerializationHeader make(TableMetadata metadata, Collection sstables) + { + return make(metadata, sstables, false); + } + private static Collection orderByDescendingGeneration(Collection sstables) { if (sstables.size() < 2) diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index def444f9a9c0..495fbfd5167b 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -206,7 +206,12 @@ public void shutdown() public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, final long gcBefore, long maxSSTableBytes) { - return new CompactionTask(cfs, txn, gcBefore); + return getCompactionTask(txn, gcBefore, maxSSTableBytes, false); + } + + public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, final long gcBefore, long maxSSTableBytes, boolean latestColumnsOnly) + { + return new CompactionTask(cfs, txn, gcBefore, latestColumnsOnly); } /** diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java index d0b3dc498f06..8b60d35b630f 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java @@ -35,17 +35,29 @@ public abstract class AbstractCompactionTask extends WrappedRunnable protected ILifecycleTransaction transaction; protected boolean isUserDefined; protected OperationType compactionType; + protected final boolean latestColumnsOnly; /** - * @param cfs + * @param cfs column family of this compaction task * @param transaction the modifying managing the status of the sstables we're replacing */ public AbstractCompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction transaction) + { + this(cfs, transaction, false); + } + + /** + * @param cfs column family of this compaction task + * @param transaction the modifying managing the status of the sstables we're replacing + * @param latestColumnsOnly true if compaction should produce SSTables without e.g. dropped columns in serialisation header + */ + public AbstractCompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction transaction, boolean latestColumnsOnly) { this.cfs = cfs; this.transaction = transaction; this.isUserDefined = false; this.compactionType = OperationType.COMPACTION; + this.latestColumnsOnly = latestColumnsOnly; try { diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 4313eb82a1d4..f75ceeaf5de4 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -109,6 +109,7 @@ import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.repair.NoSuchRepairSessionException; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; +import org.apache.cassandra.schema.DroppedColumn; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; @@ -689,9 +690,22 @@ public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean skipIfCurrentVersion, final long skipIfOlderThanTimestamp, final boolean skipIfCompressionMatches, - int jobs) throws InterruptedException, ExecutionException + int jobs, + boolean latestColumnsOnly) throws InterruptedException, ExecutionException { return performSSTableRewrite(cfs, (sstable) -> { + // Should we remove dropped columns from header? + if (latestColumnsOnly) + { + for (DroppedColumn droppedColumn : cfs.metadata().droppedColumns.values()) + { + if (sstable.header.columns().contains(droppedColumn.column)) + { + return true; + } + } + } + // Skip if descriptor version matches current version if (skipIfCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion())) return false; @@ -708,7 +722,7 @@ public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, return false; return true; - }, jobs); + }, jobs, latestColumnsOnly); } /** @@ -716,7 +730,7 @@ public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, * @param sstableFilter sstables for which predicate returns {@link false} will be excluded */ - public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, Predicate sstableFilter, int jobs) throws InterruptedException, ExecutionException + public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, Predicate sstableFilter, int jobs, boolean latestColumnsOnly) throws InterruptedException, ExecutionException { return parallelAllSSTableOperation(cfs, new OneSSTableOperation() { @@ -741,7 +755,7 @@ public Iterable filterSSTables(LifecycleTransaction transaction) @Override public void execute(LifecycleTransaction txn) { - AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE); + AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE, latestColumnsOnly); task.setUserDefined(true); task.setCompactionType(OperationType.UPGRADE_SSTABLES); task.execute(active); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index b8eaa5bd812c..4d48645ab372 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -1036,14 +1036,14 @@ public long getMaxSSTableBytes() return maxSSTableSizeBytes; } - public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long gcBefore, long maxSSTableBytes) + public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long gcBefore, long maxSSTableBytes, boolean latestColumnsOnly) { maybeReloadDiskBoundaries(); readLock.lock(); try { validateForCompaction(txn.originals()); - return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes); + return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes, latestColumnsOnly); } finally { @@ -1052,6 +1052,11 @@ public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long g } + public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long gcBefore, long maxSSTableBytes) + { + return getCompactionTask(txn, gcBefore, maxSSTableBytes, false); + } + private void validateForCompaction(Iterable input) { readLock.lock(); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index c9af97fe95bf..391b03c1cbfd 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -81,9 +81,15 @@ public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcB this(cfs, txn, gcBefore, false); } - public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcBefore, boolean keepOriginals) + public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcBefore, boolean latestColumnsOnly) { - super(cfs, txn); + this(cfs, txn, gcBefore, false, latestColumnsOnly); + } + + + public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcBefore, boolean keepOriginals, boolean latestColumnsOnly) + { + super(cfs, txn, latestColumnsOnly); this.gcBefore = gcBefore; this.keepOriginals = keepOriginals; } @@ -350,7 +356,7 @@ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, ILifecycleTransaction transaction, Set nonExpiredSSTables) { - return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, keepOriginals, getLevel()); + return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, keepOriginals, getLevel(), latestColumnsOnly); } public static String updateCompactionHistory(TimeUUID taskId, String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize, Map compactionProperties) diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index 22913a84f612..96cc218e028f 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -69,7 +69,7 @@ public Upgrader(ColumnFamilyStore cfs, LifecycleTransaction txn, OutputHandler o this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); } - private SSTableWriter createCompactionWriter(StatsMetadata metadata) + private SSTableWriter createCompactionWriter(StatsMetadata metadata, boolean latestColumnsOnly) { MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator()); sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel()); @@ -82,14 +82,14 @@ private SSTableWriter createCompactionWriter(StatsMetadata metadata) .setTransientSSTable(metadata.isTransient) .setTableMetadataRef(cfs.metadata) .setMetadataCollector(sstableMetadataCollector) - .setSerializationHeader(SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable))) + .setSerializationHeader(SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable), latestColumnsOnly)) .addDefaultComponents(cfs.indexManager.listIndexGroups()) .setSecondaryIndexGroups(cfs.indexManager.listIndexGroups()) .setCompressionDictionaryManager(cfs.compressionDictionaryManager()) .build(transaction, cfs); } - public void upgrade(boolean keepOriginals) + public void upgrade(boolean keepOriginals, boolean latestColumnsOnly) { outputHandler.output("Upgrading " + sstable); long nowInSec = FBUtilities.nowInSeconds(); @@ -97,7 +97,7 @@ public void upgrade(boolean keepOriginals) AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals()); CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, nextTimeUUID())) { - writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata())); + writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata(), latestColumnsOnly)); iter.setTargetDirectory(writer.currentWriter().getFilename()); while (iter.hasNext()) writer.append(iter.next()); diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index fd1966ad35d3..8310a37634d3 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -62,6 +62,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa protected final long minRepairedAt; protected final TimeUUID pendingRepair; protected final boolean isTransient; + protected final boolean latestColumnsOnly; protected final SSTableRewriter sstableWriter; protected final ILifecycleTransaction txn; @@ -76,7 +77,7 @@ public CompactionAwareWriter(ColumnFamilyStore cfs, Set nonExpiredSSTables, boolean keepOriginals) { - this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, true); + this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, true, false); } public CompactionAwareWriter(ColumnFamilyStore cfs, @@ -84,11 +85,23 @@ public CompactionAwareWriter(ColumnFamilyStore cfs, ILifecycleTransaction txn, Set nonExpiredSSTables, boolean keepOriginals, - boolean earlyOpenAllowed) + boolean latestColumnsOnly) + { + this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, true, latestColumnsOnly); + } + + public CompactionAwareWriter(ColumnFamilyStore cfs, + Directories directories, + ILifecycleTransaction txn, + Set nonExpiredSSTables, + boolean keepOriginals, + boolean earlyOpenAllowed, + boolean latestColumnsOnly) { this.cfs = cfs; this.directories = directories; this.nonExpiredSSTables = nonExpiredSSTables; + this.latestColumnsOnly = latestColumnsOnly; this.txn = txn; estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables); @@ -230,15 +243,15 @@ protected boolean maybeSwitchLocation(DecoratedKey key) protected void switchCompactionWriter(Directories.DataDirectory directory, DecoratedKey nextKey) { currentDirectory = directory; - sstableWriter.switchWriter(sstableWriter(directory, nextKey)); + sstableWriter.switchWriter(sstableWriter(directory, nextKey, latestColumnsOnly)); } - protected SSTableWriter sstableWriter(Directories.DataDirectory directory, DecoratedKey nextKey) + protected SSTableWriter sstableWriter(Directories.DataDirectory directory, DecoratedKey nextKey, boolean latestColumnsOnly) { Descriptor descriptor = cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(directory)); MetadataCollector collector = new MetadataCollector(txn.originals(), cfs.metadata().comparator) .sstableLevel(sstableLevel()); - SerializationHeader header = SerializationHeader.make(cfs.metadata(), nonExpiredSSTables); + SerializationHeader header = SerializationHeader.make(cfs.metadata(), nonExpiredSSTables, latestColumnsOnly); return newWriterBuilder(descriptor).setMetadataCollector(collector) .setSerializationHeader(header) @@ -246,6 +259,11 @@ protected SSTableWriter sstableWriter(Directories.DataDirectory directory, Decor .build(txn, cfs); } + protected SSTableWriter sstableWriter(Directories.DataDirectory directory, DecoratedKey nextKey) + { + return sstableWriter(directory, nextKey, false); + } + /** * Returns the level that should be used when creating sstables. */ diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index 2b124f4417ab..b0e143a44ec7 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -44,7 +44,12 @@ public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, I public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, ILifecycleTransaction txn, Set nonExpiredSSTables, boolean keepOriginals, int sstableLevel) { - super(cfs, directories, txn, nonExpiredSSTables, keepOriginals); + this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, sstableLevel, false); + } + + public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, ILifecycleTransaction txn, Set nonExpiredSSTables, boolean keepOriginals, int sstableLevel, boolean latestColumnsOnly) + { + super(cfs, directories, txn, nonExpiredSSTables, keepOriginals, latestColumnsOnly); this.sstableLevel = sstableLevel; } diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 84a61f6b2646..9bc511b680d4 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2719,14 +2719,24 @@ public int upgradeSSTables(String keyspaceName, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException { - return rewriteSSTables(keyspaceName, skipIfCurrentVersion, skipIfNewerThanTimestamp, false, jobs, tableNames); + return upgradeSSTables(keyspaceName, skipIfCurrentVersion, skipIfNewerThanTimestamp, jobs, false, tableNames); + } + + public int upgradeSSTables(String keyspaceName, + final boolean skipIfCurrentVersion, + final long skipIfNewerThanTimestamp, + int jobs, + boolean latestColumnsOnly, + String... tableNames) throws IOException, ExecutionException, InterruptedException + { + return rewriteSSTables(keyspaceName, skipIfCurrentVersion, skipIfNewerThanTimestamp, false, jobs, latestColumnsOnly, tableNames); } public int recompressSSTables(String keyspaceName, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException { - return rewriteSSTables(keyspaceName, false, Long.MAX_VALUE, true, jobs, tableNames); + return rewriteSSTables(keyspaceName, false, Long.MAX_VALUE, true, jobs, false, tableNames); } @@ -2735,13 +2745,14 @@ public int rewriteSSTables(String keyspaceName, final long skipIfNewerThanTimestamp, final boolean skipIfCompressionMatches, int jobs, + boolean latestColumnsOnly, String... tableNames) throws IOException, ExecutionException, InterruptedException { CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; logger.info("Starting {} on {}.{}", OperationType.UPGRADE_SSTABLES, keyspaceName, Arrays.toString(tableNames)); for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, keyspaceName, tableNames)) { - CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs); + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs, latestColumnsOnly); if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) status = oneStatus; } diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index e5cbd980c4cc..a132f4253d02 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -468,7 +468,15 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, return upgradeSSTables(keyspaceName, excludeCurrentVersion, Long.MAX_VALUE, jobs, tableNames); } - public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException; + @Deprecated(since = "5.1") + /** @deprecated See CASSANDRA-21000 */ + default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException + { + return upgradeSSTables(keyspaceName, excludeCurrentVersion, maxSSTableTimestamp, jobs, false, tableNames); + } + + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, boolean latestColumnsOnly, String... tableNames) throws IOException, ExecutionException, InterruptedException; + public int recompressSSTables(String keyspaceName, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException; /** diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 662716f5e9d6..77da1683e549 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -396,9 +396,9 @@ public int verify(boolean extendedVerify, boolean checkVersion, boolean diskFail return ssProxy.verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames); } - public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, boolean latestColumnsOnly, String... tableNames) throws IOException, ExecutionException, InterruptedException { - return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, maxSSTableTimestamp, jobs, tableNames); + return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, maxSSTableTimestamp, jobs, latestColumnsOnly, tableNames); } public int garbageCollect(String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException @@ -449,11 +449,11 @@ public void recompressSSTables(PrintStream out, String keyspaceName, int jobs, S "recompressing sstables"); } - public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException + public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, long maxSSTableTimestamp, int jobs, boolean latestColumnsOnly, String... tableNames) throws IOException, ExecutionException, InterruptedException { checkJobs(out, jobs); perform(out, keyspaceName, - () -> upgradeSSTables(keyspaceName, excludeCurrentVersion, maxSSTableTimestamp, jobs, tableNames), + () -> upgradeSSTables(keyspaceName, excludeCurrentVersion, maxSSTableTimestamp, jobs, latestColumnsOnly, tableNames), "upgrading sstables"); } diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java index 069fdbe8451f..a4a7196d302e 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java +++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java @@ -40,6 +40,7 @@ import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.DroppedColumn; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -53,10 +54,12 @@ public class StandaloneUpgrader private static final String DEBUG_OPTION = "debug"; private static final String HELP_OPTION = "help"; private static final String KEEP_SOURCE = "keep-source"; + private static final String LATEST_COLUMNS_ONLY = "latest-columns-only"; public static void main(String args[]) { Options options = Options.parseArgs(args); + if (TEST_UTIL_ALLOW_TOOL_REINIT_FOR_TEST.getBoolean()) DatabaseDescriptor.toolInitialization(false); //Necessary for testing else @@ -91,12 +94,28 @@ public static void main(String args[]) try { SSTableReader sstable = SSTableReader.openNoValidation(entry.getKey(), components, cfs); - if (sstable.descriptor.version.equals(DatabaseDescriptor.getSelectedSSTableFormat().getLatestVersion())) + boolean shouldAdd = false; + if (options.latestColumnsOnly) { - sstable.selfRef().release(); - continue; + for (DroppedColumn droppedColumn : cfs.metadata().droppedColumns.values()) + { + if (sstable.header.columns().contains(droppedColumn.column)) + { + shouldAdd = true; + break; + } + } } - readers.add(sstable); + + if (!shouldAdd && !sstable.descriptor.version.equals(DatabaseDescriptor.getSelectedSSTableFormat().getLatestVersion())) + { + shouldAdd = true; + } + + if (shouldAdd) + readers.add(sstable); + else + sstable.selfRef().release(); } catch (Exception e) { @@ -115,7 +134,7 @@ public static void main(String args[]) try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.UPGRADE_SSTABLES, sstable)) { Upgrader upgrader = new Upgrader(cfs, txn, handler); - upgrader.upgrade(options.keepSource); + upgrader.upgrade(options.keepSource, options.latestColumnsOnly); } catch (Exception e) { @@ -151,6 +170,7 @@ private static class Options public boolean debug; public boolean keepSource; + public boolean latestColumnsOnly; private Options(String keyspace, String cf, String snapshot) { @@ -191,6 +211,7 @@ public static Options parseArgs(String cmdArgs[]) opts.debug = cmd.hasOption(DEBUG_OPTION); opts.keepSource = cmd.hasOption(KEEP_SOURCE); + opts.latestColumnsOnly = snapshot != null && cmd.hasOption(LATEST_COLUMNS_ONLY); return opts; } @@ -214,6 +235,7 @@ private static CmdLineOptions getCmdLineOptions() options.addOption(null, DEBUG_OPTION, "display stack traces"); options.addOption("h", HELP_OPTION, "display this help message"); options.addOption("k", KEEP_SOURCE, "do not delete the source sstables"); + options.addOption("l", LATEST_COLUMNS_ONLY, "remove dropped columns from metadata"); return options; } diff --git a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java b/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java index 93638cd82046..3ba2651ecb27 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java +++ b/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java @@ -61,6 +61,11 @@ public class UpgradeSSTable extends AbstractCommand description = "Number of sstables to upgrade simultanously, set to 0 to use all available compaction threads") private int jobs = 2; + @Option(paramLabel = "latestColumns", + names = { "--latest-columns" }, + description = "Use this flag when you want to upgrade SSTables without having e.g. dropped columns in serialisation header.") + private boolean latestColumnsOnly = false; + @Override public void execute(NodeProbe probe) { @@ -76,7 +81,7 @@ public void execute(NodeProbe probe) { if (retries > 0) Thread.sleep(500); - probe.upgradeSSTables(probe.output().out, keyspace, !includeAll, maxSSTableTimestamp, jobs, tableNames); + probe.upgradeSSTables(probe.output().out, keyspace, !includeAll, maxSSTableTimestamp, jobs, latestColumnsOnly, tableNames); break; } catch (RuntimeException cie)