Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CI ONLY] 9.7.4 candidate #13110

Merged
merged 3 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Rocksdb Change Log
> NOTE: Entries for next release do not go here. Follow instructions in `unreleased_history/README.txt`

## 9.7.4 (10/31/2024)
### Bug Fixes
* Fix a leak of obsolete blob files left open until DB::Close(). This bug was introduced in version 9.4.0.

## 9.7.3 (10/16/2024)
### Behavior Changes
* OPTIONS file to be loaded by remote worker is now preserved so that it does not get purged by the primary host. A similar technique as how we are preserving new SST files from getting purged is used for this. min_options_file_numbers_ is tracked like pending_outputs_ is tracked.
Expand Down
10 changes: 10 additions & 0 deletions db/blob/blob_file_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Status BlobFileCache::GetBlobFileReader(
assert(blob_file_reader);
assert(blob_file_reader->IsEmpty());

// NOTE: sharing same Cache with table_cache
const Slice key = GetSliceForKey(&blob_file_number);

assert(cache_);
Expand Down Expand Up @@ -98,4 +99,13 @@ Status BlobFileCache::GetBlobFileReader(
return Status::OK();
}

void BlobFileCache::Evict(uint64_t blob_file_number) {
// NOTE: sharing same Cache with table_cache
const Slice key = GetSliceForKey(&blob_file_number);

assert(cache_);

cache_.get()->Erase(key);
}

} // namespace ROCKSDB_NAMESPACE
9 changes: 9 additions & 0 deletions db/blob/blob_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ class BlobFileCache {
uint64_t blob_file_number,
CacheHandleGuard<BlobFileReader>* blob_file_reader);

// Called when a blob file is obsolete to ensure it is removed from the cache
// to avoid effectively leaking the open file and associated memory
void Evict(uint64_t blob_file_number);

// Used to identify cache entries for blob files (not normally useful).
// Returns the helper obtained from CacheInterface::GetBasicHelper(); code that
// walks the shared cache can compare an entry's helper pointer against this
// value to tell blob file entries apart from other entries.
static const Cache::CacheItemHelper* GetHelper() {
return CacheInterface::GetBasicHelper();
}

private:
using CacheInterface =
BasicTypedCacheInterface<BlobFileReader, CacheEntryRole::kMisc>;
Expand Down
1 change: 1 addition & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ class ColumnFamilyData {
SequenceNumber earliest_seq);

// Accessors returning the raw pointers obtained via get() on the
// corresponding members.
// NOTE(review): ownership presumably stays with this ColumnFamilyData, so
// callers must not delete the returned pointers -- confirm.
TableCache* table_cache() const { return table_cache_.get(); }
BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); }
BlobSource* blob_source() const { return blob_source_.get(); }

// See documentation in compaction_picker.h
Expand Down
10 changes: 8 additions & 2 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -659,15 +659,19 @@ Status DBImpl::CloseHelper() {
// We need to release them before the block cache is destroyed. The block
// cache may be destroyed inside versions_.reset(), when column family data
// list is destroyed, so leaving handles in table cache after
// versions_.reset() may cause issues.
// Here we clean all unreferenced handles in table cache.
// versions_.reset() may cause issues. Here we clean all unreferenced handles
// in table cache, and (for certain builds/conditions) assert that no obsolete
// files are hanging around unreferenced (leak) in the table/blob file cache.
// Now we assume all user queries have finished, so only version set itself
// can possibly hold the blocks from block cache. After releasing unreferenced
// handles here, only handles held by version set left and inside
// versions_.reset(), we will release them. There, we need to make sure every
// time a handle is released, we erase it from the cache too. By doing that,
// we can guarantee that after versions_.reset(), table cache is empty
// so the cache can be safely destroyed.
#ifndef NDEBUG
TEST_VerifyNoObsoleteFilesCached(/*db_mutex_already_held=*/true);
#endif // !NDEBUG
table_cache_->EraseUnRefEntries();

for (auto& txn_entry : recovered_transactions_) {
Expand Down Expand Up @@ -3227,6 +3231,8 @@ Status DBImpl::MultiGetImpl(
s = Status::Aborted();
break;
}
// This could be a long-running operation
ROCKSDB_THREAD_YIELD_HOOK();
}

// Post processing (decrement reference counts and record statistics)
Expand Down
7 changes: 6 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1241,9 +1241,14 @@ class DBImpl : public DB {
static Status TEST_ValidateOptions(const DBOptions& db_options) {
return ValidateOptions(db_options);
}

#endif // NDEBUG

// In certain configurations, verify that the table/blob file cache only
// contains entries for live files, to check for effective leaks of open
// files. This can only be called when purging of obsolete files has
// "settled," such as during parts of DB Close().
void TEST_VerifyNoObsoleteFilesCached(bool db_mutex_already_held) const;

// persist stats to column family "_persistent_stats"
void PersistStats();

Expand Down
45 changes: 45 additions & 0 deletions db/db_impl/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#ifndef NDEBUG

#include "db/blob/blob_file_cache.h"
#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
Expand Down Expand Up @@ -328,5 +329,49 @@ size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const {
InstrumentedMutexLock l(&const_cast<DBImpl*>(this)->stats_history_mutex_);
return EstimateInMemoryStatsHistorySize();
}

void DBImpl::TEST_VerifyNoObsoleteFilesCached(
bool db_mutex_already_held) const {
// This check is somewhat expensive and obscure to make a part of every
// unit test in every build variety. Thus, we only enable it for ASAN builds.
if (!kMustFreeHeapAllocations) {
return;
}

std::optional<InstrumentedMutexLock> l;
if (db_mutex_already_held) {
mutex_.AssertHeld();
} else {
l.emplace(&mutex_);
}

std::vector<uint64_t> live_files;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
// Sneakily add both SST and blob files to the same list
cfd->current()->AddLiveFiles(&live_files, &live_files);
}
std::sort(live_files.begin(), live_files.end());

auto fn = [&live_files](const Slice& key, Cache::ObjectPtr, size_t,
const Cache::CacheItemHelper* helper) {
if (helper != BlobFileCache::GetHelper()) {
// Skip non-blob files for now
// FIXME: diagnose and fix the leaks of obsolete SST files revealed in
// unit tests.
return;
}
// See TableCache and BlobFileCache
assert(key.size() == sizeof(uint64_t));
uint64_t file_number;
GetUnaligned(reinterpret_cast<const uint64_t*>(key.data()), &file_number);
// Assert file is in sorted live_files
assert(
std::binary_search(live_files.begin(), live_files.end(), file_number));
};
table_cache_->ApplyToAllEntries(fn, {});
}
} // namespace ROCKSDB_NAMESPACE
#endif // NDEBUG
2 changes: 2 additions & 0 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,8 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
} else {
iter_.Next();
}
// This could be a long-running operation due to tombstones, etc.
ROCKSDB_THREAD_YIELD_HOOK();
} while (iter_.Valid());

valid_ = false;
Expand Down
2 changes: 2 additions & 0 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ Status TableCache::GetTableReader(
}

// Looks up the entry keyed by `file_number` in `cache` and returns whatever
// handle cache->Lookup() yields for that key.
Cache::Handle* TableCache::Lookup(Cache* cache, uint64_t file_number) {
  // NOTE: sharing same Cache with BlobFileCache
  return cache->Lookup(GetSliceForFileNumber(&file_number));
}
Expand All @@ -179,6 +180,7 @@ Status TableCache::FindTable(
size_t max_file_size_for_l0_meta_pin, Temperature file_temperature) {
PERF_TIMER_GUARD_WITH_CLOCK(find_table_nanos, ioptions_.clock);
uint64_t number = file_meta.fd.GetNumber();
// NOTE: sharing same Cache with BlobFileCache
Slice key = GetSliceForFileNumber(&number);
*handle = cache_.Lookup(key);
TEST_SYNC_POINT_CALLBACK("TableCache::FindTable:0",
Expand Down
15 changes: 8 additions & 7 deletions db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <vector>

#include "cache/cache_reservation_manager.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_meta.h"
#include "db/dbformat.h"
#include "db/internal_stats.h"
Expand Down Expand Up @@ -744,12 +745,9 @@ class VersionBuilder::Rep {
return Status::Corruption("VersionBuilder", oss.str());
}

// Note: we use C++11 for now but in C++14, this could be done in a more
// elegant way using generalized lambda capture.
VersionSet* const vs = version_set_;
const ImmutableCFOptions* const ioptions = ioptions_;

auto deleter = [vs, ioptions](SharedBlobFileMetaData* shared_meta) {
auto deleter = [vs = version_set_, ioptions = ioptions_,
bc = cfd_ ? cfd_->blob_file_cache()
: nullptr](SharedBlobFileMetaData* shared_meta) {
if (vs) {
assert(ioptions);
assert(!ioptions->cf_paths.empty());
Expand All @@ -758,6 +756,9 @@ class VersionBuilder::Rep {
vs->AddObsoleteBlobFile(shared_meta->GetBlobFileNumber(),
ioptions->cf_paths.front().path);
}
if (bc) {
bc->Evict(shared_meta->GetBlobFileNumber());
}

delete shared_meta;
};
Expand All @@ -766,7 +767,7 @@ class VersionBuilder::Rep {
blob_file_number, blob_file_addition.GetTotalBlobCount(),
blob_file_addition.GetTotalBlobBytes(),
blob_file_addition.GetChecksumMethod(),
blob_file_addition.GetChecksumValue(), deleter);
blob_file_addition.GetChecksumValue(), std::move(deleter));

mutable_blob_file_metas_.emplace(
blob_file_number, MutableBlobFileMetaData(std::move(shared_meta)));
Expand Down
1 change: 0 additions & 1 deletion db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,6 @@ class VersionSet {
void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata);

// Appends (blob_file_number, path) to obsolete_blob_files_. `path` is taken by
// value and moved into the new entry.
void AddObsoleteBlobFile(uint64_t blob_file_number, std::string path) {
// This function does not touch BlobFileCache itself; the
// SharedBlobFileMetaData deleter set up in VersionBuilder evicts the file
// from the blob file cache (when one is available) right after calling this.
obsolete_blob_files_.emplace_back(blob_file_number, std::move(path));
}

Expand Down
2 changes: 1 addition & 1 deletion include/rocksdb/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// minor or major version number planned for release.
#define ROCKSDB_MAJOR 9
#define ROCKSDB_MINOR 7
#define ROCKSDB_PATCH 3
#define ROCKSDB_PATCH 4

// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these
Expand Down
16 changes: 16 additions & 0 deletions port/port.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,19 @@
#elif defined(OS_WIN)
#include "port/win/port_win.h"
#endif

#ifdef OS_LINUX
// A temporary hook into long-running RocksDB threads to support modifying their
// priority etc. This should become a public API hook once the requirements
// are better understood.
//
// RocksDbThreadYield is declared as a weak symbol: when the application does
// not define it, its address evaluates to null and the hook does nothing.
extern "C" void RocksDbThreadYield() __attribute__((__weak__));
// Wrapped in do { } while (0) so that `ROCKSDB_THREAD_YIELD_HOOK();` behaves
// as exactly one statement, including inside an unbraced if/else.
#define ROCKSDB_THREAD_YIELD_HOOK() \
  do {                              \
    if (RocksDbThreadYield) {       \
      RocksDbThreadYield();         \
    }                               \
  } while (0)
#else
// No hook on other platforms; still expands to a single well-formed statement.
#define ROCKSDB_THREAD_YIELD_HOOK() \
  do {                              \
  } while (0)
#endif
Loading