Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More accurate accounting of compressed cache memory #13032

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 38 additions & 8 deletions cache/compressed_secondary_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
data_ptr = GetVarint32Ptr(data_ptr, data_ptr + 1,
static_cast<uint32_t*>(&source_32));
source = static_cast<CacheTier>(source_32);
handle_value_charge -= (data_ptr - ptr->get());
uint64_t data_size = 0;
data_ptr = GetVarint64Ptr(data_ptr, ptr->get() + handle_value_charge,
static_cast<uint64_t*>(&data_size));
assert(handle_value_charge > data_size);
handle_value_charge = data_size;
}
MemoryAllocator* allocator = cache_options_.memory_allocator.get();

Expand Down Expand Up @@ -169,13 +173,15 @@ Status CompressedSecondaryCache::InsertInternal(
}

auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge);
char header[10];
char header[20];
char* payload = header;
payload = EncodeVarint32(payload, static_cast<uint32_t>(type));
payload = EncodeVarint32(payload, static_cast<uint32_t>(source));
size_t data_size = (*helper->size_cb)(value);
char* data_size_ptr = payload;
payload = EncodeVarint64(payload, data_size);

size_t header_size = payload - header;
size_t data_size = (*helper->size_cb)(value);
size_t total_size = data_size + header_size;
CacheAllocationPtr ptr =
AllocateBlock(total_size, cache_options_.memory_allocator.get());
Expand Down Expand Up @@ -210,6 +216,8 @@ Status CompressedSecondaryCache::InsertInternal(

val = Slice(compressed_val);
data_size = compressed_val.size();
payload = EncodeVarint64(data_size_ptr, data_size);
header_size = payload - header;
total_size = header_size + data_size;
PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, data_size);

Expand All @@ -222,14 +230,21 @@ Status CompressedSecondaryCache::InsertInternal(

PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1);
if (cache_options_.enable_custom_split_merge) {
size_t charge{0};
CacheValueChunk* value_chunks_head =
SplitValueIntoChunks(val, cache_options_.compression_type, charge);
return cache_->Insert(key, value_chunks_head, internal_helper, charge);
size_t split_charge{0};
CacheValueChunk* value_chunks_head = SplitValueIntoChunks(
val, cache_options_.compression_type, split_charge);
return cache_->Insert(key, value_chunks_head, internal_helper,
split_charge);
} else {
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
size_t charge = malloc_usable_size(ptr.get());
#else
size_t charge = total_size;
#endif
std::memcpy(ptr.get(), header, header_size);
CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
return cache_->Insert(key, buf, internal_helper, total_size);
charge += sizeof(CacheAllocationPtr);
return cache_->Insert(key, buf, internal_helper, charge);
}
}

Expand Down Expand Up @@ -398,6 +413,21 @@ const Cache::CacheItemHelper* CompressedSecondaryCache::GetHelper(
}
}

size_t CompressedSecondaryCache::TEST_GetCharge(const Slice& key) {
Cache::Handle* lru_handle = cache_->Lookup(key);
if (lru_handle == nullptr) {
return 0;
}

size_t charge = cache_->GetCharge(lru_handle);
if (cache_->Value(lru_handle) != nullptr &&
!cache_options_.enable_custom_split_merge) {
charge -= 10;
}
cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
return charge;
}

std::shared_ptr<SecondaryCache>
CompressedSecondaryCacheOptions::MakeSharedSecondaryCache() const {
return std::make_shared<CompressedSecondaryCache>(*this);
Expand Down
2 changes: 2 additions & 0 deletions cache/compressed_secondary_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class CompressedSecondaryCache : public SecondaryCache {
const Cache::CacheItemHelper* helper,
CompressionType type, CacheTier source);

size_t TEST_GetCharge(const Slice& key);

// TODO: clean up to use cleaner interfaces in typed_cache.h
const Cache::CacheItemHelper* GetHelper(bool enable_custom_split_merge) const;
std::shared_ptr<Cache> cache_;
Expand Down
4 changes: 4 additions & 0 deletions cache/compressed_secondary_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class CompressedSecondaryCacheTestBase : public testing::Test,
protected:
void BasicTestHelper(std::shared_ptr<SecondaryCache> sec_cache,
bool sec_cache_is_compressed) {
CompressedSecondaryCache* comp_sec_cache =
static_cast<CompressedSecondaryCache*>(sec_cache.get());
get_perf_context()->Reset();
bool kept_in_sec_cache{true};
// Lookup an non-existent key.
Expand Down Expand Up @@ -66,6 +68,8 @@ class CompressedSecondaryCacheTestBase : public testing::Test,
ASSERT_OK(sec_cache->Insert(key1, &item1, GetHelper(), false));
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 1);

ASSERT_GT(comp_sec_cache->TEST_GetCharge(key1), 1000);

std::unique_ptr<SecondaryCacheResultHandle> handle1_2 =
sec_cache->Lookup(key1, GetHelper(), this, true, /*advise_erase=*/true,
/*stats=*/nullptr, kept_in_sec_cache);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix under counting of allocated memory in the compressed secondary cache due to looking at the compressed block size rather than the actual memory allocated, which could be larger due to internal fragmentation.
Loading