121 changes: 107 additions & 14 deletions src/java/org/apache/cassandra/cache/ChunkCache.java
@@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;

@@ -52,8 +53,6 @@
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.ChunkReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.PrefetchingRebufferer;
import org.apache.cassandra.io.util.ReadPattern;
import org.apache.cassandra.io.util.Rebufferer;
import org.apache.cassandra.io.util.RebuffererFactory;
import org.apache.cassandra.metrics.ChunkCacheMetrics;
@@ -63,15 +62,17 @@
import org.apache.cassandra.utils.memory.BufferPools;
import org.github.jamm.Unmetered;

import java.util.Map;

public class ChunkCache
implements RemovalListener<ChunkCache.Key, ChunkCache.Chunk>, CacheSize
implements RemovalListener<ChunkCache.Key, ChunkCache.Chunk>, CacheSize
{
private final static Logger logger = LoggerFactory.getLogger(ChunkCache.class);

public static final int RESERVED_POOL_SPACE_IN_MB = 32;
private static final int INITIAL_CAPACITY = Integer.getInteger("cassandra.chunkcache_initialcapacity", 16);
private static final boolean ASYNC_CLEANUP = Boolean.parseBoolean(System.getProperty("cassandra.chunkcache.async_cleanup", "true"));
private static final int CLEANER_THREADS = Integer.getInteger("dse.chunk.cache.cleaner.threads",1);
private static final int CLEANER_THREADS = Integer.getInteger("dse.chunk.cache.cleaner.threads", 1);

private static final Class PERFORM_CLEANUP_TASK_CLASS;
// cached value in order to not call System.getProperty on a hotpath
@@ -186,7 +187,8 @@ public void onRemoval(Key key, Chunk chunk, RemovalCause cause)
/**
* Clears the cache, used in the CNDB Writer for testing purposes.
*/
public void clear() {
public void clear()
{
// Clear keysByFile first to prevent unnecessary computation in onRemoval method.
synchronousCache.invalidateAll();
}
@@ -240,7 +242,7 @@ public void intercept(Function<RebuffererFactory, RebuffererFactory> interceptor

/**
* Maps a reader to a reader id, used by the cache to find content.
*
* <p>
* Uses the file name (through the fileIdMap), reader type and chunk size to define the id.
* The lowest {@link #READER_TYPE_BITS} are occupied by reader type, then the next {@link #CHUNK_SIZE_LOG2_BITS}
* are occupied by log 2 of chunk size (we assume the chunk size is a power of 2), and the rest of the bits
* are occupied by the file id.
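
The packing just described can be illustrated with a small sketch; the bit widths and helper names below are assumptions for illustration, not the class's actual constants:

    // Illustrative sketch of the readerId layout, assuming 2 bits for the
    // reader type and 6 bits for log2 of the chunk size; the real widths are
    // READER_TYPE_BITS and CHUNK_SIZE_LOG2_BITS.
    static final int TYPE_BITS = 2;        // hypothetical READER_TYPE_BITS
    static final int SIZE_LOG2_BITS = 6;   // hypothetical CHUNK_SIZE_LOG2_BITS

    static long packReaderId(long fileId, int chunkSizeLog2, int readerType)
    {
        return (fileId << (SIZE_LOG2_BITS + TYPE_BITS))
               | ((long) chunkSizeLog2 << TYPE_BITS)
               | readerType;
    }

    static long fileIdOf(long readerId)
    {
        return readerId >>> (SIZE_LOG2_BITS + TYPE_BITS);
    }

For example, packReaderId(42, 16, 1) encodes file id 42, 64 KiB chunks (2^16 bytes) and reader type 1 in a single long key.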
@@ -270,7 +272,7 @@ private long assignFileId(File file)
* Invalidate all buffers from the given file, i.e. make sure they cannot be accessed by any reader using a
* FileHandle opened after this call. The buffers themselves will remain in the cache until they get normally
* evicted, because it is too costly to remove them.
*
* <p>
* Note that this call has no effect on handles that are already open. The correct usage is to call this when
* a file is deleted, or when a file is created for writing. It cannot be used to update and resynchronize the
* cached view of an existing file.
@@ -293,7 +295,7 @@ public void invalidateFileNow(File file)
if (fileIdMaybeNull == null)
return;
long fileId = fileIdMaybeNull << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS);
long mask = - (1 << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS));
long mask = -(1 << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS));
synchronousCache.invalidateAll(Iterables.filter(cache.asMap().keySet(), x -> (x.readerId & mask) == fileId));
}
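
Why -(1 << shift) works as a file mask: in two's complement, -(1 << k) equals ~((1 << k) - 1), i.e. a word with every bit above the low k bits set. ANDing a readerId with it keeps the file-id portion and zeroes the chunk-size and reader-type bits, so every chunk of the file matches regardless of its chunk size or reader type. A minimal self-contained check, using an assumed combined width of 8 bits:

    int k = 8;                       // assumed CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS
    long mask = -(1L << k);
    assert mask == ~((1L << k) - 1); // all bits above the low k bits are set

    long fileId = 42L << k;          // file id shifted into the high bits
    long readerId = fileId | 0x2A;   // low bits: chunk size log2 and reader type
    assert (readerId & mask) == fileId;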

@@ -341,10 +343,14 @@ public boolean equals(Object obj)
*/
abstract static class Chunk
{
/** The offset in the file where the chunk is read */
/**
* The offset in the file where the chunk is read
*/
final long offset;

/** The number of bytes read from disk, this could be less than the memory space allocated */
/**
* The number of bytes read from disk; this may be less than the memory space allocated
*/
int bytesRead;

private volatile int references;
@@ -374,7 +380,6 @@ Rebufferer.BufferHolder getReferencedBuffer(long position)

if (refCount == 0)
return null; // Buffer was released before we managed to reference it.

} while (!referencesUpdater.compareAndSet(this, refCount, refCount + 1));

return getBuffer(position);
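
The do/while above is the standard lock-free reference-count acquire: reload the count, bail out if it has already reached zero, otherwise CAS it one higher and retry on contention. A generic sketch of the same pattern follows; the release() counterpart is hypothetical and not part of the code shown in this diff:

    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

    class RefCounted
    {
        private static final AtomicIntegerFieldUpdater<RefCounted> UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(RefCounted.class, "refs");
        private volatile int refs = 1;

        /** Returns false if the object was already released; the caller must then reload it. */
        boolean tryAcquire()
        {
            int current;
            do
            {
                current = refs;
                if (current == 0)
                    return false;
            }
            while (!UPDATER.compareAndSet(this, current, current + 1));
            return true;
        }

        /** Hypothetical counterpart: the last release recycles the resource. */
        void release()
        {
            if (UPDATER.decrementAndGet(this) == 0)
                recycle();
        }

        void recycle() {} // e.g. return the buffer to the pool
    }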
@@ -754,8 +759,8 @@ public int size()
public long weightedSize()
{
return synchronousCache.policy().eviction()
.map(policy -> policy.weightedSize().orElseGet(synchronousCache::estimatedSize))
.orElseGet(synchronousCache::estimatedSize);
.map(policy -> policy.weightedSize().orElseGet(synchronousCache::estimatedSize))
.orElseGet(synchronousCache::estimatedSize);
}

/**
@@ -767,7 +772,95 @@ public int sizeOfFile(File file) {
if (fileIdMaybeNull == null)
return 0;
long fileId = fileIdMaybeNull << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS);
long mask = - (1 << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS));
long mask = -(1 << (CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS));
return (int) cacheAsMap.keySet().stream().filter(x -> (x.readerId & mask) == fileId).count();
}

/**
 * Ordering used when inspecting cache entries: hottest (most used) first, or coldest (eviction candidates) first.
 */
public enum CacheOrder
{
    HOTTEST, COLDEST
}

/**
 * A snapshot of a specific chunk currently held in the cache.
 * Used for diagnostics and inspection tools.
 */
public static class ChunkCacheInspectionEntry
{
public final File file;
public final long position;
public final int size;

public ChunkCacheInspectionEntry(File file, long position, int size)
{
this.file = file;
this.position = position;
this.size = size;
}

@Override
public String toString()
{
return String.format("Chunk{file='%s', pos=%d, size=%d}", file, position, size);
}
}

/**
* Inspects chunks in the cache by access frequency/recency.
* Uses a consumer pattern to avoid materializing a full list in memory.
*
* @param limit maximum number of entries to inspect
* @param order whether to inspect hottest (most used) or coldest (eviction candidates)
* @param consumer consumer to process each entry
*/
public void inspectEntries(int limit, CacheOrder order, Consumer<ChunkCacheInspectionEntry> consumer)
{
inspectCacheSegments(limit, order == CacheOrder.HOTTEST, consumer);
}

private void inspectCacheSegments(int limit, boolean hottest, Consumer<ChunkCacheInspectionEntry> consumer)
{
if (!enabled)
throw new IllegalStateException("chunk cache not enabled");

// Eviction policy is required to determine hot/cold entries
// Note: In practice this will always be present due to maximumWeight() configuration,
// but we check explicitly to document the requirement and fail fast if cache setup changes.
if (synchronousCache.policy().eviction().isEmpty())
throw new IllegalStateException("no eviction policy configured - cannot determine hot/cold entries");

// The readerId packs multiple values into a single long: [File ID][Chunk Size][Reader Type]
// We need to shift right to extract just the File ID portion by discarding the lower bits
int shift = CHUNK_SIZE_LOG2_BITS + READER_TYPE_BITS;

var policy = synchronousCache.policy().eviction().get();
Map<Key, Chunk> orderedMap = hottest ? policy.hottest(limit) : policy.coldest(limit);

orderedMap.forEach((key, chunk) -> {
// Skip entries where the chunk was evicted but the key still exists
if (chunk == null)
return;

// Extract the file ID by shifting away the lower bits.
// The >>> operator does an unsigned right shift, moving the bits right and filling with zeros.
// For example, if readerId is [FileID:42][ChunkSize:3][ReaderType:1] and shift is 5,
// this operation discards the rightmost 5 bits (ChunkSize + ReaderType) leaving just FileID:42
long fileId = key.readerId >>> shift;

// Look up the File by searching through fileIdMap entries
File file = null;
for (Map.Entry<File, Long> entry : fileIdMap.entrySet())
{
if (entry.getValue().equals(fileId))
{
file = entry.getKey();
break;
}
}

// Skip if we can't find the file (it may have been invalidated)
if (file == null)
return;

consumer.accept(new ChunkCacheInspectionEntry(file, key.position, chunk.capacity()));
});
}
}
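
A hypothetical usage sketch for the new inspection API, e.g. from a diagnostic tool; ChunkCache.instance is assumed here to be the cache's singleton accessor:

    // Print the ten most frequently/recently used chunks.
    ChunkCache.instance.inspectEntries(10, ChunkCache.CacheOrder.HOTTEST,
                                       entry -> System.out.println(entry));

    // Likewise, CacheOrder.COLDEST lists the next eviction candidates.

Because each entry is handed to the consumer as it is produced, the caller never holds more than one ChunkCacheInspectionEntry at a time, which is the point of the consumer-based signature.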