Skip to content

Commit

Permalink
Access DBImpl* and CFD* by CFHImpl* in Iterators
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykorean committed Feb 28, 2024
1 parent a4ff83d commit 1debf09
Show file tree
Hide file tree
Showing 15 changed files with 132 additions and 137 deletions.
53 changes: 26 additions & 27 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ void ArenaWrappedDBIter::Init(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
uint64_t version_number, ReadCallback* read_callback,
ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ =
new (mem) DBIter(env, read_options, ioptions, mutable_cf_options,
ioptions.user_comparator, /* iter */ nullptr, version,
sequence, true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, expose_blob_index);
db_iter_ = new (mem) DBIter(
env, read_options, ioptions, mutable_cf_options, ioptions.user_comparator,
/* iter */ nullptr, version, sequence, true,
max_sequential_skip_in_iteration, read_callback, cfh, expose_blob_index);
sv_number_ = version_number;
read_options_ = read_options;
allow_refresh_ = allow_refresh;
Expand All @@ -65,14 +64,14 @@ void ArenaWrappedDBIter::Init(
Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); }

Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
if (cfh_ == nullptr || !allow_refresh_) {
return Status::NotSupported("Creating renew iterator is not allowed.");
}
assert(db_iter_ != nullptr);
// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
uint64_t cur_sv_number = cfh_->cfd()->GetSuperVersionNumber();
// If we recreate a new internal iterator below (NewInternalIterator()),
// we will pass in read_options_. We need to make sure it
// has the right snapshot.
Expand All @@ -85,20 +84,20 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
arena_.~Arena();
new (&arena_) Arena();

SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
SuperVersion* sv = cfh_->cfd()->GetReferencedSuperVersion(cfh_->db());
assert(sv->version_number >= cur_sv_number);
SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
SequenceNumber read_seq = GetSeqNum(cfh_->db(), snapshot);
if (read_callback_) {
read_callback_->Refresh(read_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
Init(env, read_options_, *(cfh_->cfd()->ioptions()), sv->mutable_cf_options,
sv->current, read_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback_, db_impl_, cfd_, expose_blob_index_,
sv->version_number, read_callback_, cfh_, expose_blob_index_,
allow_refresh_);

InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, read_seq,
InternalIterator* internal_iter = cfh_->db()->NewInternalIterator(
read_options_, cfh_->cfd(), sv, &arena_, read_seq,
/* allow_unprepared_value */ true, /* db_iter */ this);
SetIterUnderDBIter(internal_iter);
};
Expand All @@ -107,10 +106,10 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
reinit_internal_iter();
break;
} else {
SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
SequenceNumber read_seq = GetSeqNum(cfh_->db(), snapshot);
// Refresh range-tombstones in MemTable
if (!read_options_.ignore_range_deletions) {
SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_);
SuperVersion* sv = cfh_->cfd()->GetThreadLocalSuperVersion(cfh_->db());
TEST_SYNC_POINT_CALLBACK("ArenaWrappedDBIter::Refresh:SV", nullptr);
auto t = sv->mem->NewRangeTombstoneIterator(
read_options_, read_seq, false /* immutable_memtable */);
Expand All @@ -123,13 +122,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
// will be freed during db_iter destruction there.
if (memtable_range_tombstone_iter_) {
assert(!*memtable_range_tombstone_iter_ ||
sv_number_ != cfd_->GetSuperVersionNumber());
sv_number_ != cfh_->cfd()->GetSuperVersionNumber());
}
delete t;
} else { // current mutable memtable has range tombstones
if (!memtable_range_tombstone_iter_) {
delete t;
db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv);
cfh_->db()->ReturnAndCleanupSuperVersion(cfh_->cfd(), sv);
// The memtable under DBIter did not have range tombstone before
// refresh.
reinit_internal_iter();
Expand All @@ -138,13 +137,13 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
delete *memtable_range_tombstone_iter_;
*memtable_range_tombstone_iter_ = new TruncatedRangeDelIterator(
std::unique_ptr<FragmentedRangeTombstoneIterator>(t),
&cfd_->internal_comparator(), nullptr, nullptr);
&cfh_->cfd()->internal_comparator(), nullptr, nullptr);
}
}
db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv);
cfh_->db()->ReturnAndCleanupSuperVersion(cfh_->cfd(), sv);
}
// Check again if the latest super version number is changed
uint64_t latest_sv_number = cfd_->GetSuperVersionNumber();
uint64_t latest_sv_number = cfh_->cfd()->GetSuperVersionNumber();
if (latest_sv_number != cur_sv_number) {
// If the super version number is changed after refreshing,
// fallback to Re-Init the InternalIterator
Expand All @@ -163,14 +162,14 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
uint64_t version_number, ReadCallback* read_callback,
ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, ioptions, mutable_cf_options, version, sequence,
max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, expose_blob_index, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(db_impl, cfd, read_callback, expose_blob_index);
cfh, expose_blob_index, allow_refresh);
if (cfh != nullptr && allow_refresh) {
iter->StoreRefreshInfo(cfh, read_callback, expose_blob_index);
}

return iter;
Expand Down
16 changes: 7 additions & 9 deletions db/arena_wrapped_db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,14 @@ class ArenaWrappedDBIter : public Iterator {
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
bool expose_blob_index, bool allow_refresh);

// Store some parameters so we can refresh the iterator at a later point
// with these same params
void StoreRefreshInfo(DBImpl* db_impl, ColumnFamilyData* cfd,
void StoreRefreshInfo(ColumnFamilyHandleImpl* cfh,
ReadCallback* read_callback, bool expose_blob_index) {
db_impl_ = db_impl;
cfd_ = cfd;
cfh_ = cfh;
read_callback_ = read_callback;
expose_blob_index_ = expose_blob_index;
}
Expand All @@ -104,8 +103,7 @@ class ArenaWrappedDBIter : public Iterator {
DBIter* db_iter_ = nullptr;
Arena arena_;
uint64_t sv_number_;
ColumnFamilyData* cfd_ = nullptr;
DBImpl* db_impl_ = nullptr;
ColumnFamilyHandleImpl* cfh_ = nullptr;
ReadOptions read_options_;
ReadCallback* read_callback_;
bool expose_blob_index_ = false;
Expand All @@ -116,13 +114,13 @@ class ArenaWrappedDBIter : public Iterator {
};

// Generate the arena wrapped iterator class.
// `db_impl` and `cfd` are used for renewal. If left null, renewal will not
// `cfh` is used for renewal. If left null, renewal will not
// be supported.
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback,
DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr,
bool expose_blob_index = false, bool allow_refresh = true);
ColumnFamilyHandleImpl* cfh = nullptr, bool expose_blob_index = false,
bool allow_refresh = true);
} // namespace ROCKSDB_NAMESPACE
10 changes: 4 additions & 6 deletions db/blob/db_blob_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ class DBBlobIndexTest : public DBTestBase {
DBBlobIndexTest() : DBTestBase("db_blob_index_test", /*env_do_fsync=*/true) {}

ColumnFamilyHandle* cfh() { return dbfull()->DefaultColumnFamily(); }

ColumnFamilyData* cfd() {
return static_cast_with_check<ColumnFamilyHandleImpl>(cfh())->cfd();
ColumnFamilyHandleImpl* cfh_impl() {
return static_cast_with_check<ColumnFamilyHandleImpl>(cfh());
}
ColumnFamilyData* cfd() { return cfh_impl()->cfd(); }

Status PutBlobIndex(WriteBatch* batch, const Slice& key,
const Slice& blob_index) {
Expand Down Expand Up @@ -96,11 +96,9 @@ class DBBlobIndexTest : public DBTestBase {
}

ArenaWrappedDBIter* GetBlobIterator() {
ColumnFamilyData* column_family = cfd();
DBImpl* db_impl = dbfull();
return db_impl->NewIteratorImpl(
ReadOptions(), column_family,
column_family->GetReferencedSuperVersion(db_impl),
ReadOptions(), cfh_impl(), cfd()->GetReferencedSuperVersion(db_impl),
db_impl->GetLatestSequenceNumber(), nullptr /*read_callback*/,
true /*expose_blob_index*/);
}
Expand Down
1 change: 1 addition & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
virtual DBImpl* db() const { return db_; }

uint32_t GetID() const override;
const std::string& GetName() const override;
Expand Down
41 changes: 22 additions & 19 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3628,12 +3628,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
this, cfd);
cfh);
} else {
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterator is overridden in
// WritePreparedTxnDB
result = NewIteratorImpl(read_options, cfd, sv,
result = NewIteratorImpl(read_options, cfh, sv,
(read_options.snapshot != nullptr)
? read_options.snapshot->GetSequenceNumber()
: kMaxSequenceNumber,
Expand All @@ -3643,8 +3643,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
}

ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
const ReadOptions& read_options, ColumnFamilyData* cfd, SuperVersion* sv,
SequenceNumber snapshot, ReadCallback* read_callback,
const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
SuperVersion* sv, SequenceNumber snapshot, ReadCallback* read_callback,
bool expose_blob_index, bool allow_refresh) {
TEST_SYNC_POINT("DBImpl::NewIterator:1");
TEST_SYNC_POINT("DBImpl::NewIterator:2");
Expand Down Expand Up @@ -3707,13 +3707,13 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
// likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, sv->current,
snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, this, cfd, expose_blob_index,
allow_refresh);
env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options,
sv->current, snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, cfh, expose_blob_index, allow_refresh);

InternalIterator* internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), snapshot,
db_iter->GetReadOptions(), cfh->cfd(), sv, db_iter->GetArena(), snapshot,
/* allow_unprepared_value */ true, db_iter);
db_iter->SetIterUnderDBIter(internal_iter);

Expand Down Expand Up @@ -3763,34 +3763,37 @@ Status DBImpl::NewIterators(
ReadCallback* read_callback = nullptr; // No read callback provided.
iterators->clear();
iterators->reserve(column_families.size());
autovector<std::tuple<ColumnFamilyData*, SuperVersion*>> cfd_to_sv;
autovector<
std::tuple<ColumnFamilyData*, SuperVersion*, ColumnFamilyHandleImpl*>>
cfd_to_sv_to_cfh;
const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
for (auto cfh : column_families) {
auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
for (auto cf : column_families) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
auto cfd = cfh->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
cfd_to_sv.emplace_back(cfd, sv);
cfd_to_sv_to_cfh.emplace_back(cfd, sv, cfh);
if (check_read_ts) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
for (auto prev_entry : cfd_to_sv) {
for (auto prev_entry : cfd_to_sv_to_cfh) {
CleanupSuperVersion(std::get<1>(prev_entry));
}
return s;
}
}
}
assert(cfd_to_sv.size() == column_families.size());
assert(cfd_to_sv_to_cfh.size() == column_families.size());
if (read_options.tailing) {
for (auto [cfd, sv] : cfd_to_sv) {
for (auto [cfd, sv, cfh] : cfd_to_sv_to_cfh) {
auto iter = new ForwardIterator(this, read_options, cfd, sv,
/* allow_unprepared_value */ true);
iterators->push_back(NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_callback, this, cfd));
read_callback, cfh));
}
} else {
// Note: no need to consider the special case of
Expand All @@ -3799,9 +3802,9 @@ Status DBImpl::NewIterators(
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
for (auto [cfd, sv] : cfd_to_sv) {
for (auto [cfd, sv, cfh] : cfd_to_sv_to_cfh) {
iterators->push_back(
NewIteratorImpl(read_options, cfd, sv, snapshot, read_callback));
NewIteratorImpl(read_options, cfh, sv, snapshot, read_callback));
}
}

Expand Down
4 changes: 2 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,8 @@ class DBImpl : public DB {

// If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd, SuperVersion* sv,
SequenceNumber snapshot,
ColumnFamilyHandleImpl* cfh,
SuperVersion* sv, SequenceNumber snapshot,
ReadCallback* read_callback,
bool expose_blob_index = false,
bool allow_refresh = true);
Expand Down
33 changes: 17 additions & 16 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -517,27 +517,27 @@ Iterator* DBImplSecondary::NewIterator(const ReadOptions& _read_options,
return NewErrorIterator(s);
}
}
result = NewIteratorImpl(read_options, cfd, sv, snapshot, read_callback);
result = NewIteratorImpl(read_options, cfh, sv, snapshot, read_callback);
}
return result;
}

ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
const ReadOptions& read_options, ColumnFamilyData* cfd,
const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
SuperVersion* super_version, SequenceNumber snapshot,
ReadCallback* read_callback, bool expose_blob_index, bool allow_refresh) {
assert(nullptr != cfd);
assert(nullptr != cfh);
assert(snapshot == kMaxSequenceNumber);
snapshot = versions_->LastSequence();
assert(snapshot != kMaxSequenceNumber);
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
super_version->current, snapshot,
env_, read_options, *cfh->cfd()->ioptions(),
super_version->mutable_cf_options, super_version->current, snapshot,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback, this, cfd,
expose_blob_index, allow_refresh);
super_version->version_number, read_callback, cfh, expose_blob_index,
allow_refresh);
auto internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
db_iter->GetReadOptions(), cfh->cfd(), super_version, db_iter->GetArena(),
snapshot, /* allow_unprepared_value */ true, db_iter);
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
Expand Down Expand Up @@ -596,28 +596,29 @@ Status DBImplSecondary::NewIterators(
return Status::NotSupported("snapshot not supported in secondary mode");
} else {
SequenceNumber read_seq(kMaxSequenceNumber);
autovector<std::tuple<ColumnFamilyData*, SuperVersion*>> cfd_to_sv;
autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv;
const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
for (auto cfh : column_families) {
ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
for (auto cf : column_families) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
auto cfd = cfh->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
cfd_to_sv.emplace_back(cfd, sv);
cfh_to_sv.emplace_back(cfh, sv);
if (check_read_ts) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
for (auto prev_entry : cfd_to_sv) {
for (auto prev_entry : cfh_to_sv) {
CleanupSuperVersion(std::get<1>(prev_entry));
}
return s;
}
}
}
assert(cfd_to_sv.size() == column_families.size());
for (auto [cfd, sv] : cfd_to_sv) {
assert(cfh_to_sv.size() == column_families.size());
for (auto [cfh, sv] : cfh_to_sv) {
iterators->push_back(
NewIteratorImpl(read_options, cfd, sv, read_seq, read_callback));
NewIteratorImpl(read_options, cfh, sv, read_seq, read_callback));
}
}
return Status::OK();
Expand Down
Loading

0 comments on commit 1debf09

Please sign in to comment.