Commit abf97b6

CNDB-15360: fixes from git review
1 parent 1cdb563 commit abf97b6

Showing 2 changed files with 80 additions and 79 deletions.

src/java/org/apache/cassandra/cache/ChunkCache.java

Lines changed: 64 additions & 64 deletions
@@ -28,6 +28,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import javax.annotation.Nullable;

@@ -52,8 +53,6 @@
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.ChunkReader;
 import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.PrefetchingRebufferer;
-import org.apache.cassandra.io.util.ReadPattern;
 import org.apache.cassandra.io.util.Rebufferer;
 import org.apache.cassandra.io.util.RebuffererFactory;
 import org.apache.cassandra.metrics.ChunkCacheMetrics;

@@ -62,22 +61,18 @@
 import org.apache.cassandra.utils.memory.BufferPool;
 import org.apache.cassandra.utils.memory.BufferPools;
 import org.github.jamm.Unmetered;
-import java.util.List;
+
 import java.util.Map;
-import java.util.HashMap;
-import java.util.Collections;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 public class ChunkCache
-    implements RemovalListener<ChunkCache.Key, ChunkCache.Chunk>, CacheSize
+        implements RemovalListener<ChunkCache.Key, ChunkCache.Chunk>, CacheSize
 {
     private final static Logger logger = LoggerFactory.getLogger(ChunkCache.class);
 
     public static final int RESERVED_POOL_SPACE_IN_MB = 32;
     private static final int INITIAL_CAPACITY = Integer.getInteger("cassandra.chunkcache_initialcapacity", 16);
     private static final boolean ASYNC_CLEANUP = Boolean.parseBoolean(System.getProperty("cassandra.chunkcache.async_cleanup", "true"));
-    private static final int CLEANER_THREADS = Integer.getInteger("dse.chunk.cache.cleaner.threads",1);
+    private static final int CLEANER_THREADS = Integer.getInteger("dse.chunk.cache.cleaner.threads", 1);
 
     private static final Class PERFORM_CLEANUP_TASK_CLASS;
     // cached value in order to not call System.getProperty on a hotpath

@@ -192,7 +187,8 @@ public void onRemoval(Key key, Chunk chunk, RemovalCause cause)
     /**
      * Clears the cache, used in the CNDB Writer for testing purposes.
      */
-    public void clear() {
+    public void clear()
+    {
         // Clear keysByFile first to prevent unnecessary computation in onRemoval method.
         synchronousCache.invalidateAll();
     }

@@ -246,7 +242,7 @@ public void intercept(Function<RebuffererFactory, RebuffererFactory> interceptor)
 
     /**
      * Maps a reader to a reader id, used by the cache to find content.
-     *
+     * <p>
      * Uses the file name (through the fileIdMap), reader type and chunk size to define the id.
      * The lowest {@link #READER_TYPE_BITS} are occupied by reader type, then the next {@link #CHUNK_SIZE_LOG2_BITS}
      * are occupied by log 2 of chunk size (we assume the chunk size is the power of 2), and the rest of the bits
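
Note: to make the bit layout described in this javadoc concrete, here is a minimal standalone sketch (not part of this commit) of how such an id could be packed and unpacked. The bit widths are illustrative assumptions for the example, not the actual READER_TYPE_BITS / CHUNK_SIZE_LOG2_BITS values defined in ChunkCache.

// Illustrative sketch only: packing [file id][log2(chunk size)][reader type] into one long.
public final class ReaderIdExample
{
    static final int READER_TYPE_BITS = 2;      // assumed width for the example
    static final int CHUNK_SIZE_LOG2_BITS = 6;  // assumed width for the example

    static long pack(long fileId, int chunkSizeLog2, int readerType)
    {
        return (fileId << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS))
               | ((long) chunkSizeLog2 << READER_TYPE_BITS)
               | readerType;
    }

    static long fileIdOf(long readerId)
    {
        // Unsigned right shift discards the chunk-size and reader-type bits.
        return readerId >>> (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS);
    }

    public static void main(String[] args)
    {
        long id = pack(42, 16, 1);         // file 42, 64 KiB chunks (2^16 bytes), reader type 1
        System.out.println(fileIdOf(id));  // prints 42
    }
}
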
@@ -269,15 +265,14 @@ protected long readerIdFor(ChunkReader source)
 
     private long assignFileId(File file)
     {
-        long id = nextFileId.getAndIncrement();
-        return id;
+        return nextFileId.getAndIncrement();
     }
 
     /**
      * Invalidate all buffers from the given file, i.e. make sure they can not be accessed by any reader using a
      * FileHandle opened after this call. The buffers themselves will remain in the cache until they get normally
      * evicted, because it is too costly to remove them.
-     *
+     * <p>
      * Note that this call has no effect of handles that are already opened. The correct usage is to call this when
      * a file is deleted, or when a file is created for writing. It cannot be used to update and resynchronize the
      * cached view of an existing file.

@@ -300,7 +295,7 @@ public void invalidateFileNow(File file)
         if (fileIdMaybeNull == null)
            return;
         long fileId = fileIdMaybeNull << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS);
-        long mask = - (1 << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS));
+        long mask = -(1 << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS));
         synchronousCache.invalidateAll(Iterables.filter(cache.asMap().keySet(), x -> (x.readerId & mask) == fileId));
     }
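
Note: the mask in this hunk relies on two's-complement arithmetic: with shift = CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS, the value -(1 << shift) has all bits above the shift position set and all bits below it clear, so (x.readerId & mask) isolates the file-id portion of the packed readerId. A tiny self-contained check follows; the shift value 8 is an assumption for the example, not the real constants.

// Illustrative only: -(1 << shift) as a "keep the high bits" mask.
public class MaskExample
{
    public static void main(String[] args)
    {
        int shift = 8;                          // assumed CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS
        long mask = -(1L << shift);             // 0xFFFFFFFFFFFFFF00
        long readerId = (42L << shift) | 0x2A;  // file id 42 plus arbitrary low packed bits
        System.out.println(Long.toHexString(mask));               // ffffffffffffff00
        System.out.println((readerId & mask) == (42L << shift));  // true
    }
}
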

@@ -348,10 +343,14 @@ public boolean equals(Object obj)
      */
     abstract static class Chunk
     {
-        /** The offset in the file where the chunk is read */
+        /**
+         * The offset in the file where the chunk is read
+         */
        final long offset;
 
-        /** The number of bytes read from disk, this could be less than the memory space allocated */
+        /**
+         * The number of bytes read from disk, this could be less than the memory space allocated
+         */
         int bytesRead;
 
         private volatile int references;

@@ -381,7 +380,6 @@ Rebufferer.BufferHolder getReferencedBuffer(long position)
 
                if (refCount == 0)
                    return null; // Buffer was released before we managed to reference it.
-
            } while (!referencesUpdater.compareAndSet(this, refCount, refCount + 1));
 
            return getBuffer(position);
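
Note: the do/while above is the usual "acquire a reference only if the object is still alive" CAS loop. Below is a simplified standalone sketch of the same pattern (not the committed code; it uses AtomicInteger instead of the AtomicIntegerFieldUpdater that ChunkCache uses).

import java.util.concurrent.atomic.AtomicInteger;

class RefCounted
{
    private final AtomicInteger references = new AtomicInteger(1);

    // Returns true if a reference was acquired, false if the object was already released.
    boolean tryReference()
    {
        int refCount;
        do
        {
            refCount = references.get();
            if (refCount == 0)
                return false;  // released before we managed to reference it
        }
        while (!references.compareAndSet(refCount, refCount + 1));
        return true;
    }

    void release()
    {
        references.decrementAndGet();
    }
}
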
@@ -761,8 +759,8 @@ public int size()
     public long weightedSize()
     {
         return synchronousCache.policy().eviction()
-                .map(policy -> policy.weightedSize().orElseGet(synchronousCache::estimatedSize))
-                .orElseGet(synchronousCache::estimatedSize);
+                               .map(policy -> policy.weightedSize().orElseGet(synchronousCache::estimatedSize))
+                               .orElseGet(synchronousCache::estimatedSize);
     }
 
     /**
@@ -774,14 +772,17 @@ public int sizeOfFile(File file) {
         if (fileIdMaybeNull == null)
             return 0;
         long fileId = fileIdMaybeNull << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS);
-        long mask = - (1 << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS));
+        long mask = -(1 << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS));
         return (int) cacheAsMap.keySet().stream().filter(x -> (x.readerId & mask) == fileId).count();
     }
 
     /**
      * A snapshot of a specific chunk currently held in the cache.
      * Used for diagnostics and inspection tools.
      */
+    enum CacheOrder
+    {HOTTEST, COLDEST}
+
     public static class ChunkCacheInspectionEntry
     {
         public final File file;
@@ -801,66 +802,65 @@ public String toString()
             return String.format("Chunk{file='%s', pos=%d, size=%d}", file, position, size);
         }
     }
-    /**
-     * Inspects the "hottest" (most frequently/recently used) chunks in the cache.
-     * Uses a consumer pattern to avoid materializing a full list in memory.
-     *
-     * @param limit maximum number of entries to inspect
-     * @param consumer consumer to process each entry
-     */
-    public void inspectHotEntries(int limit, java.util.function.Consumer<ChunkCacheInspectionEntry> consumer)
-    {
-        inspectCacheSegments(limit, true, consumer);
-    }
 
     /**
-     * Inspects the "coldest" (candidates for eviction) chunks in the cache.
+     * Inspects chunks in the cache by access frequency/recency.
      * Uses a consumer pattern to avoid materializing a full list in memory.
      *
      * @param limit maximum number of entries to inspect
+     * @param order whether to inspect hottest (most used) or coldest (eviction candidates)
      * @param consumer consumer to process each entry
      */
-    public void inspectColdEntries(int limit, java.util.function.Consumer<ChunkCacheInspectionEntry> consumer)
+    public void inspectEntries(int limit, CacheOrder order, Consumer<ChunkCacheInspectionEntry> consumer)
     {
-        inspectCacheSegments(limit, false, consumer);
+        inspectCacheSegments(limit, order == CacheOrder.HOTTEST, consumer);
     }
 
-    private void inspectCacheSegments(int limit, boolean hottest, java.util.function.Consumer<ChunkCacheInspectionEntry> consumer)
+    private void inspectCacheSegments(int limit, boolean hottest, Consumer<ChunkCacheInspectionEntry> consumer)
     {
         if (!enabled)
-            return;
+            throw new IllegalStateException("chunk cache not enabled");
+
+        // Eviction policy is required to determine hot/cold entries
+        // Note: In practice this will always be present due to maximumWeight() configuration,
+        // but we check explicitly to document the requirement and fail fast if cache setup changes.
+        if (synchronousCache.policy().eviction().isEmpty())
+            throw new IllegalStateException("no eviction policy configured - cannot determine hot/cold entries");
 
         // The readerId packs multiple values into a single long: [File ID][Chunk Size][Reader Type]
         // We need to shift right to extract just the File ID portion by discarding the lower bits
         int shift = CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS;
 
-        synchronousCache.policy().eviction().ifPresent(policy -> {
-            Map<Key, Chunk> orderedMap = hottest ? policy.hottest(limit) : policy.coldest(limit);
-
-            orderedMap.forEach((key, chunk) -> {
-                // Skip entries where the chunk was evicted but the key still exists
-                if (chunk == null)
-                    return;
-
-                // Extract the file ID by shifting away the lower bits.
-                // The >>> operator does an unsigned right shift, moving the bits right and filling with zeros.
-                // For example, if readerId is [FileID:42][ChunkSize:3][ReaderType:1] and shift is 5,
-                // this operation discards the rightmost 5 bits (ChunkSize + ReaderType) leaving just FileID:42
-                long fileId = key.readerId >>> shift;
-
-                // Look up the File by searching through fileIdMap entries
-                File file = fileIdMap.entrySet().stream()
-                                     .filter(e -> e.getValue().equals(fileId))
-                                     .map(Map.Entry::getKey)
-                                     .findFirst()
-                                     .orElse(null);
-
-                // Skip if we can't find the file (it may have been invalidated)
-                if (file == null)
-                    return;
-
-                consumer.accept(new ChunkCacheInspectionEntry(file, key.position, chunk.capacity()));
-            });
+        var policy = synchronousCache.policy().eviction().get();
+        Map<Key, Chunk> orderedMap = hottest ? policy.hottest(limit) : policy.coldest(limit);
+
+        orderedMap.forEach((key, chunk) -> {
+            // Skip entries where the chunk was evicted but the key still exists
+            if (chunk == null)
+                return;
+
+            // Extract the file ID by shifting away the lower bits.
+            // The >>> operator does an unsigned right shift, moving the bits right and filling with zeros.
+            // For example, if readerId is [FileID:42][ChunkSize:3][ReaderType:1] and shift is 5,
+            // this operation discards the rightmost 5 bits (ChunkSize + ReaderType) leaving just FileID:42
+            long fileId = key.readerId >>> shift;
+
+            // Look up the File by searching through fileIdMap entries
+            File file = null;
+            for (Map.Entry<File, Long> entry : fileIdMap.entrySet())
+            {
+                if (entry.getValue().equals(fileId))
+                {
+                    file = entry.getKey();
+                    break;
+                }
+            }
+
+            // Skip if we can't find the file (it may have been invalidated)
+            if (file == null)
+                return;
+
+            consumer.accept(new ChunkCacheInspectionEntry(file, key.position, chunk.capacity()));
         });
     }
 }
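
Note: callers of the removed inspectHotEntries/inspectColdEntries methods now go through the single inspectEntries entry point. A hypothetical caller (not part of this commit) could look like the sketch below; since CacheOrder is declared package-private in this diff, the caller is assumed to live in org.apache.cassandra.cache, and inspectEntries now throws IllegalStateException when the cache is disabled rather than returning silently.

package org.apache.cassandra.cache;

public class CacheInspectionExample
{
    static void dump(int limit)
    {
        // Hottest entries (most frequently/recently used), then coldest (eviction candidates).
        ChunkCache.instance.inspectEntries(limit, ChunkCache.CacheOrder.HOTTEST,
                                           e -> System.out.println("hot:  " + e));
        ChunkCache.instance.inspectEntries(limit, ChunkCache.CacheOrder.COLDEST,
                                           e -> System.out.println("cold: " + e));
    }
}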

test/unit/org/apache/cassandra/cache/ChunkCacheTest.java

Lines changed: 16 additions & 15 deletions
@@ -615,7 +615,7 @@ public void testInspectHotEntries() throws IOException
 
         // Inspect hot entries
         List<ChunkCache.ChunkCacheInspectionEntry> hotEntries = new ArrayList<>();
-        ChunkCache.instance.inspectHotEntries(10, hotEntries::add);
+        ChunkCache.instance.inspectEntries(10, ChunkCache.CacheOrder.HOTTEST, hotEntries::add);
 
         // Should have exactly 3 entries
         assertEquals(3, hotEntries.size());

@@ -667,7 +667,7 @@ public void testInspectColdEntries() throws IOException
 
         // Inspect cold entries
         List<ChunkCache.ChunkCacheInspectionEntry> coldEntries = new ArrayList<>();
-        ChunkCache.instance.inspectColdEntries(10, coldEntries::add);
+        ChunkCache.instance.inspectEntries(10, ChunkCache.CacheOrder.COLDEST, coldEntries::add);
 
         assertEquals(2, coldEntries.size());
 
@@ -710,7 +710,7 @@ public void testInspectEntriesWithLimit() throws IOException
 
         // Test with limit smaller than cache size
         List<ChunkCache.ChunkCacheInspectionEntry> limitedEntries = new ArrayList<>();
-        ChunkCache.instance.inspectHotEntries(2, limitedEntries::add);
+        ChunkCache.instance.inspectEntries(2, ChunkCache.CacheOrder.HOTTEST, limitedEntries::add);
 
         // Should respect the limit
         assertEquals(2, limitedEntries.size());

@@ -726,20 +726,13 @@ public void testInspectEntriesWithLimit() throws IOException
         }
     }
 
-    @Test
+    @Test(expected = IllegalStateException.class)
     public void testInspectEntriesWhenCacheDisabled()
     {
         BufferPool pool = mock(BufferPool.class);
         ChunkCache disabledCache = new ChunkCache(pool, 0, ChunkCacheMetrics::create);
 
-        List<ChunkCache.ChunkCacheInspectionEntry> entries = new ArrayList<>();
-
-        // Should not throw and should return no entries
-        disabledCache.inspectHotEntries(10, entries::add);
-        assertEquals(0, entries.size());
-
-        disabledCache.inspectColdEntries(10, entries::add);
-        assertEquals(0, entries.size());
+        disabledCache.inspectEntries(10, ChunkCache.CacheOrder.HOTTEST, e -> {});
     }
 
     @Test

@@ -759,7 +752,7 @@ public void testInspectEntriesWithZeroLimit() throws IOException
         assertEquals(1, ChunkCache.instance.size());
 
         List<ChunkCache.ChunkCacheInspectionEntry> entries = new ArrayList<>();
-        ChunkCache.instance.inspectHotEntries(0, entries::add);
+        ChunkCache.instance.inspectEntries(0, ChunkCache.CacheOrder.HOTTEST, entries::add);
 
         // Should return no entries when limit is 0
         assertEquals(0, entries.size());

@@ -776,10 +769,18 @@ public void testInspectEntriesWithEmptyCache()
         List<ChunkCache.ChunkCacheInspectionEntry> coldEntries = new ArrayList<>();
 
         // Should not throw when cache is empty
-        ChunkCache.instance.inspectHotEntries(10, hotEntries::add);
-        ChunkCache.instance.inspectColdEntries(10, coldEntries::add);
+        ChunkCache.instance.inspectEntries(10, ChunkCache.CacheOrder.HOTTEST, hotEntries::add);
+        ChunkCache.instance.inspectEntries(10, ChunkCache.CacheOrder.COLDEST, coldEntries::add);
 
         assertEquals(0, hotEntries.size());
         assertEquals(0, coldEntries.size());
     }
+    @Test(expected = IllegalStateException.class)
+    public void testInspectEntriesThrowsWhenCacheDisabled()
+    {
+        BufferPool pool = mock(BufferPool.class);
+        ChunkCache disabledCache = new ChunkCache(pool, 0, ChunkCacheMetrics::create);
+
+        disabledCache.inspectEntries(10, ChunkCache.CacheOrder.HOTTEST, e -> {});
+    }
 }
