Skip to content

Commit

Permalink
Access DBImpl* and CFD* by CFHImpl* in Iterators (facebook#12395)
Browse files Browse the repository at this point in the history
Summary:
In the current implementation of iterators, `DBImpl*` and `ColumnFamilyData*` are held in `DBIter` and `ArenaWrappedDBIter` for two purposes: tracing and Refresh() API. With the introduction of a new iterator called MultiCfIterator in PR facebook#12153 , which is a cross-column-family iterator that maintains multiple DBIters as child iterators from a consistent database state, we need to make some changes to the existing implementation. The new iterator will still be exposed through the generic Iterator interface with an additional capability to return AttributeGroups (via `attribute_groups()`) which is a list of wide columns grouped by column family. For more information about AttributeGroup, please refer to previous PRs:  facebook#11925 facebook#11943, and facebook#11977.

To be able to return AttributeGroup in the default single CF iterator created, access to `ColumnFamilyHandle*` within `DBIter` is necessary. However, this is not currently available in `DBIter`. Since `DBImpl*` and `ColumnFamilyData*` can be easily accessed via `ColumnFamilyHandleImpl*`, we have decided to replace the pointers to `ColumnFamilyData` and `DBImpl` in `DBIter` with a pointer to `ColumnFamilyHandleImpl`.

Pull Request resolved: facebook#12395

Test Plan:
# Summary

In the current implementation of iterators, `DBImpl*` and `ColumnFamilyData*` are held in `DBIter` and `ArenaWrappedDBIter` for two purposes: tracing and Refresh() API. With the introduction of a new iterator called MultiCfIterator in PR facebook#12153 , which is a cross-column-family iterator that maintains multiple DBIters as child iterators from a consistent database state, we need to make some changes to the existing implementation. The new iterator will still be exposed through the generic Iterator interface with an additional capability to return AttributeGroups (via `attribute_groups()`) which is a list of wide columns grouped by column family. For more information about AttributeGroup, please refer to previous PRs:  facebook#11925 facebook#11943, and facebook#11977.

To be able to return AttributeGroup in the default single CF iterator created, access to `ColumnFamilyHandle*` within `DBIter` is necessary. However, this is not currently available in `DBIter`. Since `DBImpl*` and `ColumnFamilyData*` can be easily accessed via `ColumnFamilyHandleImpl*`, we have decided to replace the pointers to `ColumnFamilyData` and `DBImpl` in `DBIter` with a pointer to `ColumnFamilyHandleImpl`.

# Test Plan

There should be no behavior changes. Existing tests and CI for the correctness tests.

**Test for Perf Regression**
Build
```
$> make -j64 release
```
Setup
```
$> TEST_TMPDIR=/dev/shm/db_bench ./db_bench -benchmarks="filluniquerandom" -key_size=32 -value_size=512 -num=1000000 -compression_type=none
```
Run
```
TEST_TMPDIR=/dev/shm/db_bench ./db_bench -use_existing_db=1 -benchmarks="newiterator,seekrandom" -cache_size=10485760000
```

Before the change
```
DB path: [/dev/shm/db_bench/dbbench]
newiterator  :       0.552 micros/op 1810157 ops/sec 0.552 seconds 1000000 operations;
DB path: [/dev/shm/db_bench/dbbench]
seekrandom   :       4.502 micros/op 222143 ops/sec 4.502 seconds 1000000 operations; (0 of 1000000 found)
```
After the change
```
DB path: [/dev/shm/db_bench/dbbench]
newiterator  :       0.520 micros/op 1924401 ops/sec 0.520 seconds 1000000 operations;
DB path: [/dev/shm/db_bench/dbbench]
seekrandom   :       4.532 micros/op 220657 ops/sec 4.532 seconds 1000000 operations; (0 of 1000000 found)
```

Reviewed By: pdillinger

Differential Revision: D54332713

Pulled By: jaykorean

fbshipit-source-id: b28d897ad519e58b1ca82eb068a6319544a4fae5
  • Loading branch information
jaykorean authored and facebook-github-bot committed Mar 1, 2024
1 parent 5bcc184 commit c00c168
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 146 deletions.
57 changes: 30 additions & 27 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ void ArenaWrappedDBIter::Init(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
uint64_t version_number, ReadCallback* read_callback,
ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ =
new (mem) DBIter(env, read_options, ioptions, mutable_cf_options,
ioptions.user_comparator, /* iter */ nullptr, version,
sequence, true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, expose_blob_index);
db_iter_ = new (mem) DBIter(
env, read_options, ioptions, mutable_cf_options, ioptions.user_comparator,
/* iter */ nullptr, version, sequence, true,
max_sequential_skip_in_iteration, read_callback, cfh, expose_blob_index);
sv_number_ = version_number;
read_options_ = read_options;
allow_refresh_ = allow_refresh;
Expand All @@ -65,40 +64,44 @@ void ArenaWrappedDBIter::Init(
Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); }

Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
if (cfh_ == nullptr || !allow_refresh_) {
return Status::NotSupported("Creating renew iterator is not allowed.");
}
assert(db_iter_ != nullptr);
auto cfd = cfh_->cfd();
auto db_impl = cfh_->db();

// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
uint64_t cur_sv_number = cfd->GetSuperVersionNumber();
// If we recreate a new internal iterator below (NewInternalIterator()),
// we will pass in read_options_. We need to make sure it
// has the right snapshot.
read_options_.snapshot = snapshot;
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");

auto reinit_internal_iter = [&]() {
Env* env = db_iter_->env();
db_iter_->~DBIter();
arena_.~Arena();
new (&arena_) Arena();

SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
SuperVersion* sv = cfd->GetReferencedSuperVersion(db_impl);
assert(sv->version_number >= cur_sv_number);
SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
SequenceNumber read_seq = GetSeqNum(db_impl, snapshot);
if (read_callback_) {
read_callback_->Refresh(read_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
Init(env, read_options_, *(cfd->ioptions()), sv->mutable_cf_options,
sv->current, read_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback_, db_impl_, cfd_, expose_blob_index_,
sv->version_number, read_callback_, cfh_, expose_blob_index_,
allow_refresh_);

InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, read_seq,
InternalIterator* internal_iter = db_impl->NewInternalIterator(
read_options_, cfd, sv, &arena_, read_seq,
/* allow_unprepared_value */ true, /* db_iter */ this);
SetIterUnderDBIter(internal_iter);
};
Expand All @@ -107,10 +110,10 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
reinit_internal_iter();
break;
} else {
SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
SequenceNumber read_seq = GetSeqNum(db_impl, snapshot);
// Refresh range-tombstones in MemTable
if (!read_options_.ignore_range_deletions) {
SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_);
SuperVersion* sv = cfd->GetThreadLocalSuperVersion(db_impl);
TEST_SYNC_POINT_CALLBACK("ArenaWrappedDBIter::Refresh:SV", nullptr);
auto t = sv->mem->NewRangeTombstoneIterator(
read_options_, read_seq, false /* immutable_memtable */);
Expand All @@ -123,13 +126,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
// will be freed during db_iter destruction there.
if (memtable_range_tombstone_iter_) {
assert(!*memtable_range_tombstone_iter_ ||
sv_number_ != cfd_->GetSuperVersionNumber());
sv_number_ != cfd->GetSuperVersionNumber());
}
delete t;
} else { // current mutable memtable has range tombstones
if (!memtable_range_tombstone_iter_) {
delete t;
db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv);
db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
// The memtable under DBIter did not have range tombstone before
// refresh.
reinit_internal_iter();
Expand All @@ -138,13 +141,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
delete *memtable_range_tombstone_iter_;
*memtable_range_tombstone_iter_ = new TruncatedRangeDelIterator(
std::unique_ptr<FragmentedRangeTombstoneIterator>(t),
&cfd_->internal_comparator(), nullptr, nullptr);
&cfd->internal_comparator(), nullptr, nullptr);
}
}
db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv);
db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
}
// Check again if the latest super version number is changed
uint64_t latest_sv_number = cfd_->GetSuperVersionNumber();
uint64_t latest_sv_number = cfd->GetSuperVersionNumber();
if (latest_sv_number != cur_sv_number) {
// If the super version number is changed after refreshing,
// fallback to Re-Init the InternalIterator
Expand All @@ -163,14 +166,14 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
uint64_t version_number, ReadCallback* read_callback,
ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, ioptions, mutable_cf_options, version, sequence,
max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, expose_blob_index, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(db_impl, cfd, read_callback, expose_blob_index);
cfh, expose_blob_index, allow_refresh);
if (cfh != nullptr && allow_refresh) {
iter->StoreRefreshInfo(cfh, read_callback, expose_blob_index);
}

return iter;
Expand Down
16 changes: 7 additions & 9 deletions db/arena_wrapped_db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,14 @@ class ArenaWrappedDBIter : public Iterator {
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
bool expose_blob_index, bool allow_refresh);

// Store some parameters so we can refresh the iterator at a later point
// with these same params
void StoreRefreshInfo(DBImpl* db_impl, ColumnFamilyData* cfd,
void StoreRefreshInfo(ColumnFamilyHandleImpl* cfh,
ReadCallback* read_callback, bool expose_blob_index) {
db_impl_ = db_impl;
cfd_ = cfd;
cfh_ = cfh;
read_callback_ = read_callback;
expose_blob_index_ = expose_blob_index;
}
Expand All @@ -104,8 +103,7 @@ class ArenaWrappedDBIter : public Iterator {
DBIter* db_iter_ = nullptr;
Arena arena_;
uint64_t sv_number_;
ColumnFamilyData* cfd_ = nullptr;
DBImpl* db_impl_ = nullptr;
ColumnFamilyHandleImpl* cfh_ = nullptr;
ReadOptions read_options_;
ReadCallback* read_callback_;
bool expose_blob_index_ = false;
Expand All @@ -116,13 +114,13 @@ class ArenaWrappedDBIter : public Iterator {
};

// Generate the arena wrapped iterator class.
// `db_impl` and `cfd` are used for reneweal. If left null, renewal will not
// `cfh` is used for reneweal. If left null, renewal will not
// be supported.
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback,
DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr,
bool expose_blob_index = false, bool allow_refresh = true);
ColumnFamilyHandleImpl* cfh = nullptr, bool expose_blob_index = false,
bool allow_refresh = true);
} // namespace ROCKSDB_NAMESPACE
10 changes: 4 additions & 6 deletions db/blob/db_blob_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ class DBBlobIndexTest : public DBTestBase {
DBBlobIndexTest() : DBTestBase("db_blob_index_test", /*env_do_fsync=*/true) {}

ColumnFamilyHandle* cfh() { return dbfull()->DefaultColumnFamily(); }

ColumnFamilyData* cfd() {
return static_cast_with_check<ColumnFamilyHandleImpl>(cfh())->cfd();
ColumnFamilyHandleImpl* cfh_impl() {
return static_cast_with_check<ColumnFamilyHandleImpl>(cfh());
}
ColumnFamilyData* cfd() { return cfh_impl()->cfd(); }

Status PutBlobIndex(WriteBatch* batch, const Slice& key,
const Slice& blob_index) {
Expand Down Expand Up @@ -96,11 +96,9 @@ class DBBlobIndexTest : public DBTestBase {
}

ArenaWrappedDBIter* GetBlobIterator() {
ColumnFamilyData* column_family = cfd();
DBImpl* db_impl = dbfull();
return db_impl->NewIteratorImpl(
ReadOptions(), column_family,
column_family->GetReferencedSuperVersion(db_impl),
ReadOptions(), cfh_impl(), cfd()->GetReferencedSuperVersion(db_impl),
db_impl->GetLatestSequenceNumber(), nullptr /*read_callback*/,
true /*expose_blob_index*/);
}
Expand Down
1 change: 1 addition & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
virtual DBImpl* db() const { return db_; }

uint32_t GetID() const override;
const std::string& GetName() const override;
Expand Down
54 changes: 27 additions & 27 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3618,9 +3618,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
}

auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
assert(cfh != nullptr);
ColumnFamilyData* cfd = cfh->cfd();
assert(cfd != nullptr);
ReadCallback* read_callback = nullptr; // No read callback provided.
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
if (read_options.timestamp && read_options.timestamp->size() > 0) {
const Status s =
Expand All @@ -3636,24 +3636,24 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
result = NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
this, cfd);
sv->mutable_cf_options.max_sequential_skip_in_iterations,
nullptr /* read_callback */, cfh);
} else {
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterator is overridden in
// WritePreparedTxnDB
result = NewIteratorImpl(read_options, cfd, sv,
result = NewIteratorImpl(read_options, cfh, sv,
(read_options.snapshot != nullptr)
? read_options.snapshot->GetSequenceNumber()
: kMaxSequenceNumber,
read_callback);
nullptr /* read_callback */);
}
return result;
}

ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
const ReadOptions& read_options, ColumnFamilyData* cfd, SuperVersion* sv,
SequenceNumber snapshot, ReadCallback* read_callback,
const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
SuperVersion* sv, SequenceNumber snapshot, ReadCallback* read_callback,
bool expose_blob_index, bool allow_refresh) {
TEST_SYNC_POINT("DBImpl::NewIterator:1");
TEST_SYNC_POINT("DBImpl::NewIterator:2");
Expand Down Expand Up @@ -3716,13 +3716,13 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
// likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, sv->current,
snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, this, cfd, expose_blob_index,
allow_refresh);
env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options,
sv->current, snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, cfh, expose_blob_index, allow_refresh);

InternalIterator* internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), snapshot,
db_iter->GetReadOptions(), cfh->cfd(), sv, db_iter->GetArena(), snapshot,
/* allow_unprepared_value */ true, db_iter);
db_iter->SetIterUnderDBIter(internal_iter);

Expand Down Expand Up @@ -3769,37 +3769,37 @@ Status DBImpl::NewIterators(
}
}

ReadCallback* read_callback = nullptr; // No read callback provided.
iterators->clear();
iterators->reserve(column_families.size());
autovector<std::tuple<ColumnFamilyData*, SuperVersion*>> cfd_to_sv;
autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv;
const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
for (auto cfh : column_families) {
auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
for (auto cf : column_families) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
auto cfd = cfh->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
cfd_to_sv.emplace_back(cfd, sv);
cfh_to_sv.emplace_back(cfh, sv);
if (check_read_ts) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
for (auto prev_entry : cfd_to_sv) {
for (auto prev_entry : cfh_to_sv) {
CleanupSuperVersion(std::get<1>(prev_entry));
}
return s;
}
}
}
assert(cfd_to_sv.size() == column_families.size());
assert(cfh_to_sv.size() == column_families.size());
if (read_options.tailing) {
for (auto [cfd, sv] : cfd_to_sv) {
auto iter = new ForwardIterator(this, read_options, cfd, sv,
for (auto [cfh, sv] : cfh_to_sv) {
auto iter = new ForwardIterator(this, read_options, cfh->cfd(), sv,
/* allow_unprepared_value */ true);
iterators->push_back(NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options,
cfh->cfd()->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_callback, this, cfd));
nullptr /*read_callback*/, cfh));
}
} else {
// Note: no need to consider the special case of
Expand All @@ -3808,9 +3808,9 @@ Status DBImpl::NewIterators(
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
for (auto [cfd, sv] : cfd_to_sv) {
iterators->push_back(
NewIteratorImpl(read_options, cfd, sv, snapshot, read_callback));
for (auto [cfh, sv] : cfh_to_sv) {
iterators->push_back(NewIteratorImpl(read_options, cfh, sv, snapshot,
nullptr /*read_callback*/));
}
}

Expand Down
4 changes: 2 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,8 @@ class DBImpl : public DB {

// If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd, SuperVersion* sv,
SequenceNumber snapshot,
ColumnFamilyHandleImpl* cfh,
SuperVersion* sv, SequenceNumber snapshot,
ReadCallback* read_callback,
bool expose_blob_index = false,
bool allow_refresh = true);
Expand Down
Loading

0 comments on commit c00c168

Please sign in to comment.