Skip to content

Commit

Permalink
MultiCFSnapshot for NewIterators() API
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykorean committed Apr 22, 2024
1 parent ca3814a commit 5da427f
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 91 deletions.
13 changes: 7 additions & 6 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
int get_sv_count = 0;
ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (++get_sv_count == 2) {
// After MultiGet refs a couple of CFs, flush all CFs so MultiGet
// is forced to repeat the process
Expand Down Expand Up @@ -1513,9 +1513,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
int retries = 0;
bool last_try = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::LastTry", [&](void* /*arg*/) { last_try = true; });
"DBImpl::MultiCFSnapshot::LastTry",
[&](void* /*arg*/) { last_try = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (last_try) {
return;
}
Expand All @@ -1531,10 +1532,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::MultiGet::AfterLastTryRefSV",
{"DBImpl::MultiCFSnapshot::AfterLastTryRefSV",
"DBMultiGetTestWithParam::MultiGetMultiCFMutex:BeforeCreateSV"},
{"DBMultiGetTestWithParam::MultiGetMultiCFMutex:AfterCreateSV",
"DBImpl::MultiGet::BeforeLastTryUnRefSV"},
"DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV"},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

Expand Down Expand Up @@ -1600,7 +1601,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
int get_sv_count = 0;
ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (++get_sv_count == 2) {
for (int i = 0; i < 8; ++i) {
ASSERT_OK(Flush(i));
Expand Down
158 changes: 79 additions & 79 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2522,7 +2522,8 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
ReadCallback* callback,
IterDerefFuncType iter_deref_func, T* cf_list,
SequenceNumber* snapshot,
bool* sv_from_thread_local) {
bool* sv_from_thread_local,
bool sv_exclusive_access) {
PERF_TIMER_GUARD(get_snapshot_time);

assert(sv_from_thread_local);
Expand All @@ -2539,7 +2540,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
SuperVersion* super_version = node->super_version;
ColumnFamilyData* cfd = node->cfd;
if (super_version != nullptr) {
if (*sv_from_thread_local) {
if (*sv_from_thread_local && sv_exclusive_access) {
ReturnAndCleanupSuperVersion(cfd, super_version);
} else {
CleanupSuperVersion(super_version);
Expand Down Expand Up @@ -2602,7 +2603,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
}
if (read_options.snapshot == nullptr) {
if (last_try) {
TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::LastTry");
// We're close to max number of retries. For the last retry,
// acquire the lock so we're sure to succeed
mutex_.Lock();
Expand All @@ -2617,11 +2618,15 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
++cf_iter) {
auto node = iter_deref_func(cf_iter);
if (!last_try) {
node->super_version = GetAndRefSuperVersion(node->cfd);
if (sv_exclusive_access) {
node->super_version = GetAndRefSuperVersion(node->cfd);
} else {
node->super_version = node->cfd->GetReferencedSuperVersion(this);
}
} else {
node->super_version = node->cfd->GetSuperVersion()->Ref();
}
TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterRefSV");
if (check_read_ts) {
s = FailIfReadCollapsedHistory(node->cfd, node->super_version,
*(read_options.timestamp));
Expand Down Expand Up @@ -2658,7 +2663,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
if (!retry) {
if (last_try) {
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::MultiGet::AfterLastTryRefSV");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterLastTryRefSV");
}
break;
}
Expand Down Expand Up @@ -2772,34 +2777,35 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,

autovector<MultiGetKeyRangePerCf, MultiGetContext::MAX_BATCH_SIZE>
key_range_per_cf;
autovector<ColumnFamilyDataSuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
cfd_sv_pairs;
autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
cf_sv_pairs;
size_t cf_start = 0;
ColumnFamilyHandle* cf = sorted_keys[0]->column_family;

for (size_t i = 0; i < num_keys; ++i) {
KeyContext* key_ctx = sorted_keys[i];
if (key_ctx->column_family != cf) {
key_range_per_cf.emplace_back(cf_start, i - cf_start);
cfd_sv_pairs.emplace_back(cf, nullptr);
cf_sv_pairs.emplace_back(cf, nullptr);
cf_start = i;
cf = key_ctx->column_family;
}
}

key_range_per_cf.emplace_back(cf_start, num_keys - cf_start);
cfd_sv_pairs.emplace_back(cf, nullptr);
cf_sv_pairs.emplace_back(cf, nullptr);

SequenceNumber consistent_seqnum = kMaxSequenceNumber;
bool sv_from_thread_local;
Status s = MultiCFSnapshot<autovector<ColumnFamilyDataSuperVersionPair,
Status s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr,
[](autovector<ColumnFamilyDataSuperVersionPair,
[](autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
return &(*cf_iter);
},
&cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local);
&cf_sv_pairs, &consistent_seqnum, &sv_from_thread_local,
/* sv_exclusive_access */ true);

if (!s.ok()) {
for (size_t i = 0; i < num_keys; ++i) {
Expand All @@ -2817,20 +2823,20 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
read_callback = &timestamp_read_callback;
}

assert(key_range_per_cf.size() == cfd_sv_pairs.size());
assert(key_range_per_cf.size() == cf_sv_pairs.size());
auto key_range_per_cf_iter = key_range_per_cf.begin();
auto cfd_sv_pair_iter = cfd_sv_pairs.begin();
auto cf_sv_pair_iter = cf_sv_pairs.begin();
while (key_range_per_cf_iter != key_range_per_cf.end() &&
cfd_sv_pair_iter != cfd_sv_pairs.end()) {
cf_sv_pair_iter != cf_sv_pairs.end()) {
s = MultiGetImpl(read_options, key_range_per_cf_iter->start,
key_range_per_cf_iter->num_keys, &sorted_keys,
cfd_sv_pair_iter->super_version, consistent_seqnum,
cf_sv_pair_iter->super_version, consistent_seqnum,
read_callback);
if (!s.ok()) {
break;
}
++key_range_per_cf_iter;
++cfd_sv_pair_iter;
++cf_sv_pair_iter;
}
if (!s.ok()) {
assert(s.IsTimedOut() || s.IsAborted());
Expand All @@ -2845,11 +2851,11 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
}
}

for (const auto& cfd_sv_pair : cfd_sv_pairs) {
for (const auto& cfd_sv_pair : cf_sv_pairs) {
if (sv_from_thread_local) {
ReturnAndCleanupSuperVersion(cfd_sv_pair.cfd, cfd_sv_pair.super_version);
} else {
TEST_SYNC_POINT("DBImpl::MultiGet::BeforeLastTryUnRefSV");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV");
CleanupSuperVersion(cfd_sv_pair.super_version);
}
}
Expand Down Expand Up @@ -2982,17 +2988,18 @@ void DBImpl::MultiGetWithCallbackImpl(
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
ReadCallback* callback,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
std::array<ColumnFamilyDataSuperVersionPair, 1> cfd_sv_pairs;
cfd_sv_pairs[0] = ColumnFamilyDataSuperVersionPair(column_family, nullptr);
std::array<ColumnFamilySuperVersionPair, 1> cf_sv_pairs;
cf_sv_pairs[0] = ColumnFamilySuperVersionPair(column_family, nullptr);
size_t num_keys = sorted_keys->size();
SequenceNumber consistent_seqnum = kMaxSequenceNumber;
bool sv_from_thread_local;
Status s = MultiCFSnapshot<std::array<ColumnFamilyDataSuperVersionPair, 1>>(
Status s = MultiCFSnapshot<std::array<ColumnFamilySuperVersionPair, 1>>(
read_options, callback,
[](std::array<ColumnFamilyDataSuperVersionPair, 1>::iterator& cf_iter) {
[](std::array<ColumnFamilySuperVersionPair, 1>::iterator& cf_iter) {
return &(*cf_iter);
},
&cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local);
&cf_sv_pairs, &consistent_seqnum, &sv_from_thread_local,
/* sv_exclusive_access */ true);
if (!s.ok()) {
return;
}
Expand Down Expand Up @@ -3031,11 +3038,11 @@ void DBImpl::MultiGetWithCallbackImpl(
}

s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
cfd_sv_pairs[0].super_version, consistent_seqnum,
cf_sv_pairs[0].super_version, consistent_seqnum,
read_callback);
assert(s.ok() || s.IsTimedOut() || s.IsAborted());
ReturnAndCleanupSuperVersion(cfd_sv_pairs[0].cfd,
cfd_sv_pairs[0].super_version);
ReturnAndCleanupSuperVersion(cf_sv_pairs[0].cfd,
cf_sv_pairs[0].super_version);
}

// The actual implementation of batched MultiGet. Parameters -
Expand Down Expand Up @@ -3817,69 +3824,62 @@ Status DBImpl::NewIterators(
"ReadTier::kPersistedData is not yet supported in iterators.");
}

if (read_options.timestamp) {
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
cf_sv_pairs;

Status s;
for (auto* cf : column_families) {
assert(cf);
if (read_options.timestamp) {
s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
} else {
s = FailIfCfHasTs(cf);
}
} else {
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfCfHasTs(cf);
if (!s.ok()) {
return s;
}
if (!s.ok()) {
return s;
}
cf_sv_pairs.emplace_back(cf, nullptr);
}

iterators->clear();
iterators->reserve(column_families.size());
autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv;
const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
for (auto cf : column_families) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
auto cfd = cfh->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
cfh_to_sv.emplace_back(cfh, sv);
if (check_read_ts) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
for (auto prev_entry : cfh_to_sv) {
CleanupSuperVersion(std::get<1>(prev_entry));
}
return s;
}
}

SequenceNumber consistent_seqnum = kMaxSequenceNumber;
bool sv_from_thread_local;
s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr /* read_callback*/,
[](autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
return &(*cf_iter);
},
&cf_sv_pairs, &consistent_seqnum, &sv_from_thread_local,
/* sv_exclusive_access */ false);
if (!s.ok()) {
return s;
}
assert(cfh_to_sv.size() == column_families.size());

assert(cf_sv_pairs.size() == column_families.size());
if (read_options.tailing) {
for (auto [cfh, sv] : cfh_to_sv) {
auto iter = new ForwardIterator(this, read_options, cfh->cfd(), sv,
for (const auto& cfd_sv_pair : cf_sv_pairs) {
auto iter = new ForwardIterator(this, read_options, cfd_sv_pair.cfd,
cfd_sv_pair.super_version,
/* allow_unprepared_value */ true);
iterators->push_back(NewDBIterator(
env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options,
cfh->cfd()->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/, cfh));
iterators->push_back(
NewDBIterator(env_, read_options, *cfd_sv_pair.cfd->ioptions(),
cfd_sv_pair.super_version->mutable_cf_options,
cfd_sv_pair.cfd->user_comparator(), iter,
cfd_sv_pair.super_version->current, kMaxSequenceNumber,
cfd_sv_pair.super_version->mutable_cf_options
.max_sequential_skip_in_iterations,
nullptr /*read_callback*/, cfd_sv_pair.cfh));
}
} else {
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterators is overridden
// in WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
for (auto [cfh, sv] : cfh_to_sv) {
iterators->push_back(NewIteratorImpl(read_options, cfh, sv, snapshot,
nullptr /*read_callback*/));
for (const auto& cfd_sv_pair : cf_sv_pairs) {
iterators->push_back(NewIteratorImpl(
read_options, cfd_sv_pair.cfh, cfd_sv_pair.super_version,
consistent_seqnum, nullptr /*read_callback*/));
}
}

return Status::OK();
}

Expand Down
22 changes: 16 additions & 6 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2355,18 +2355,20 @@ class DBImpl : public DB {

// A structure holding a column family (its handle and ColumnFamilyData) and
// the SuperVersion obtained for a consistent view of the DB
struct ColumnFamilyDataSuperVersionPair {
struct ColumnFamilySuperVersionPair {
ColumnFamilyHandleImpl* cfh;
ColumnFamilyData* cfd;

// SuperVersion for the column family obtained in a manner that ensures a
// consistent view across all column families in the DB
SuperVersion* super_version;
ColumnFamilyDataSuperVersionPair(ColumnFamilyHandle* column_family,
SuperVersion* sv)
: cfd(static_cast<ColumnFamilyHandleImpl*>(column_family)->cfd()),
ColumnFamilySuperVersionPair(ColumnFamilyHandle* column_family,
SuperVersion* sv)
: cfh(static_cast<ColumnFamilyHandleImpl*>(column_family)),
cfd(cfh->cfd()),
super_version(sv) {}

ColumnFamilyDataSuperVersionPair() = default;
ColumnFamilySuperVersionPair() = default;
};

// A common function to obtain a consistent snapshot, which can be implicit
Expand All @@ -2383,14 +2385,22 @@ class DBImpl : public DB {
// `sv_from_thread_local` being set to false indicates that the SuperVersions
// were obtained from the ColumnFamilyData, whereas true indicates that they
// are thread local.
// `sv_exclusive_access` indicates whether the thread-local SuperVersion can
// be obtained without an extra ref (via GetAndRefSuperVersion()) or not
// (via GetReferencedSuperVersion()). For instance, a point lookup like
// MultiGet does not require the SuperVersion to be re-acquired throughout
// the entire invocation (no extra ref needed), while MultiCfIterators may
// need the SuperVersion to be updated during Refresh() (requires an extra
// ref).
//
// A non-OK status will be returned if, for a column family that enables the
// user-defined timestamp feature, the specified `ReadOptions.timestamp`
// attempts to read collapsed history.
template <class T, typename IterDerefFuncType>
Status MultiCFSnapshot(const ReadOptions& read_options,
ReadCallback* callback,
IterDerefFuncType iter_deref_func, T* cf_list,
SequenceNumber* snapshot, bool* sv_from_thread_local);
SequenceNumber* snapshot, bool* sv_from_thread_local,
bool sv_exclusive_access);

// The actual implementation of the batching MultiGet. The caller is expected
// to have acquired the SuperVersion and pass in a snapshot sequence number
Expand Down
Loading

0 comments on commit 5da427f

Please sign in to comment.