diff --git a/db/attribute_group_iterator_impl.cc b/db/attribute_group_iterator_impl.cc index 622633aa38ba..352c293f0703 100644 --- a/db/attribute_group_iterator_impl.cc +++ b/db/attribute_group_iterator_impl.cc @@ -9,18 +9,9 @@ namespace ROCKSDB_NAMESPACE { const AttributeGroups kNoAttributeGroups; -bool AttributeGroupIteratorImpl::Valid() const { return impl_.Valid(); } -void AttributeGroupIteratorImpl::SeekToFirst() { impl_.SeekToFirst(); } -void AttributeGroupIteratorImpl::SeekToLast() { impl_.SeekToLast(); } -void AttributeGroupIteratorImpl::Seek(const Slice& target) { - impl_.Seek(target); +void AttributeGroupIteratorImpl::AddToAttributeGroups( + ColumnFamilyHandle* /*cfh*/, const WideColumns& /*columns*/) { + // TODO - Implement AttributeGroup population } -void AttributeGroupIteratorImpl::SeekForPrev(const Slice& target) { - impl_.SeekForPrev(target); -} -void AttributeGroupIteratorImpl::Next() { impl_.Next(); } -void AttributeGroupIteratorImpl::Prev() { impl_.Prev(); } -Slice AttributeGroupIteratorImpl::key() const { return impl_.key(); } -Status AttributeGroupIteratorImpl::status() const { return impl_.status(); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/attribute_group_iterator_impl.h b/db/attribute_group_iterator_impl.h index 1d12ee0e7036..d42c2ef8daf8 100644 --- a/db/attribute_group_iterator_impl.h +++ b/db/attribute_group_iterator_impl.h @@ -5,16 +5,8 @@ #pragma once -#include -#include - #include "db/multi_cf_iterator_impl.h" #include "rocksdb/attribute_groups.h" -#include "rocksdb/comparator.h" -#include "rocksdb/iterator.h" -#include "rocksdb/options.h" -#include "util/heap.h" -#include "util/overload.h" namespace ROCKSDB_NAMESPACE { @@ -24,7 +16,11 @@ class AttributeGroupIteratorImpl : public AttributeGroupIterator { const Comparator* comparator, const std::vector& column_families, const std::vector& child_iterators) - : impl_(comparator, column_families, child_iterators) {} + : impl_( + comparator, column_families, child_iterators, 
[this]() { Reset(); }, + [this](ColumnFamilyHandle* cfh, Iterator* iter) { + AddToAttributeGroups(cfh, iter->columns()); + }) {} ~AttributeGroupIteratorImpl() override {} // No copy allowed @@ -32,24 +28,28 @@ class AttributeGroupIteratorImpl : public AttributeGroupIterator { AttributeGroupIteratorImpl& operator=(const AttributeGroupIteratorImpl&) = delete; - bool Valid() const override; - void SeekToFirst() override; - void SeekToLast() override; - void Seek(const Slice& target) override; - void SeekForPrev(const Slice& target) override; - void Next() override; - void Prev() override; - Slice key() const override; - Status status() const override; + bool Valid() const override { return impl_.Valid(); } + void SeekToFirst() override { impl_.SeekToFirst(); } + void SeekToLast() override { impl_.SeekToLast(); } + void Seek(const Slice& target) override { impl_.Seek(target); } + void SeekForPrev(const Slice& target) override { impl_.SeekForPrev(target); } + void Next() override { impl_.Next(); } + void Prev() override { impl_.Prev(); } + Slice key() const override { return impl_.key(); } + Status status() const override { return impl_.status(); } - AttributeGroups attribute_groups() const override { - // TODO - Implement - assert(false); - return kNoAttributeGroups; + const AttributeGroups& attribute_groups() const override { + assert(Valid()); + return attribute_groups_; } + void Reset() { attribute_groups_.clear(); } + private: MultiCfIteratorImpl impl_; + AttributeGroups attribute_groups_; + void AddToAttributeGroups(ColumnFamilyHandle* cfh, + const WideColumns& columns); }; class EmptyAttributeGroupIterator : public AttributeGroupIterator { @@ -68,7 +68,7 @@ class EmptyAttributeGroupIterator : public AttributeGroupIterator { } Status status() const override { return status_; } - AttributeGroups attribute_groups() const override { + const AttributeGroups& attribute_groups() const override { return kNoAttributeGroups; } diff --git a/db/coalescing_iterator.cc 
b/db/coalescing_iterator.cc index 6a249a1c06c4..6abc2d4b82a9 100644 --- a/db/coalescing_iterator.cc +++ b/db/coalescing_iterator.cc @@ -5,18 +5,42 @@ #include "db/coalescing_iterator.h" +#include "db/wide/wide_columns_helper.h" + namespace ROCKSDB_NAMESPACE { -bool CoalescingIterator::Valid() const { return impl_.Valid(); } -void CoalescingIterator::SeekToFirst() { impl_.SeekToFirst(); } -void CoalescingIterator::SeekToLast() { impl_.SeekToLast(); } -void CoalescingIterator::Seek(const Slice& target) { impl_.Seek(target); } -void CoalescingIterator::SeekForPrev(const Slice& target) { - impl_.SeekForPrev(target); +void CoalescingIterator::Coalesce(const WideColumns& columns) { + WideColumns coalesced; + coalesced.reserve(wide_columns_.size() + columns.size()); + auto base_cols = wide_columns_.begin(); + auto new_cols = columns.begin(); + while (base_cols != wide_columns_.end() && new_cols != columns.end()) { + auto comparison = base_cols->name().compare(new_cols->name()); + if (comparison < 0) { + coalesced.push_back(*base_cols); + ++base_cols; + } else if (comparison > 0) { + coalesced.push_back(*new_cols); + ++new_cols; + } else { + coalesced.push_back(*new_cols); + ++new_cols; + ++base_cols; + } + } + while (base_cols != wide_columns_.end()) { + coalesced.push_back(*base_cols); + ++base_cols; + } + while (new_cols != columns.end()) { + coalesced.push_back(*new_cols); + ++new_cols; + } + wide_columns_.swap(coalesced); + + if (WideColumnsHelper::HasDefaultColumn(wide_columns_)) { + value_ = WideColumnsHelper::GetDefaultColumn(wide_columns_); + } } -void CoalescingIterator::Next() { impl_.Next(); } -void CoalescingIterator::Prev() { impl_.Prev(); } -Slice CoalescingIterator::key() const { return impl_.key(); } -Status CoalescingIterator::status() const { return impl_.status(); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/coalescing_iterator.h b/db/coalescing_iterator.h index 80e98aa48491..2c13514ea014 100644 --- a/db/coalescing_iterator.h +++ 
b/db/coalescing_iterator.h @@ -8,66 +8,56 @@ #include #include "db/multi_cf_iterator_impl.h" -#include "multi_cf_iterator_impl.h" -#include "rocksdb/comparator.h" -#include "rocksdb/iterator.h" -#include "rocksdb/options.h" -#include "util/heap.h" -#include "util/overload.h" namespace ROCKSDB_NAMESPACE { // UNDER CONSTRUCTION - DO NOT USE -// A cross-column-family iterator from a consistent database state. -// If a key exists in more than one column family, it chooses value/columns -// based on the coalescing rule provided by CoalescingOptions. See -// CoalescingOptions in options.h for details class CoalescingIterator : public Iterator { public: - CoalescingIterator(const CoalescingOptions coalesing_options, - const Comparator* comparator, + CoalescingIterator(const Comparator* comparator, const std::vector& column_families, const std::vector& child_iterators) - : impl_(comparator, column_families, child_iterators), - rule_(coalesing_options.rule) {} + : impl_( + comparator, column_families, child_iterators, [this]() { Reset(); }, + [this](ColumnFamilyHandle*, Iterator* iter) { + Coalesce(iter->columns()); + }) {} ~CoalescingIterator() override {} // No copy allowed CoalescingIterator(const CoalescingIterator&) = delete; CoalescingIterator& operator=(const CoalescingIterator&) = delete; - bool Valid() const override; - void SeekToFirst() override; - void SeekToLast() override; - void Seek(const Slice& target) override; - void SeekForPrev(const Slice& target) override; - void Next() override; - void Prev() override; - Slice key() const override; - Status status() const override; + bool Valid() const override { return impl_.Valid(); } + void SeekToFirst() override { impl_.SeekToFirst(); } + void SeekToLast() override { impl_.SeekToLast(); } + void Seek(const Slice& target) override { impl_.Seek(target); } + void SeekForPrev(const Slice& target) override { impl_.SeekForPrev(target); } + void Next() override { impl_.Next(); } + void Prev() override { impl_.Prev(); } + 
Slice key() const override { return impl_.key(); } + Status status() const override { return impl_.status(); } Slice value() const override { - if (rule_ == CoalescingRule::kChooseFromFirstCfContainingKey) { - assert(Valid()); - return impl_.current()->value(); - } - // TODO - add more rules - assert(false); - return Slice(); + assert(Valid()); + return value_; } const WideColumns& columns() const override { - if (rule_ == CoalescingRule::kChooseFromFirstCfContainingKey) { - assert(Valid()); - return impl_.current()->columns(); - } - // TODO - add more rules - assert(false); - return kNoWideColumns; + assert(Valid()); + return wide_columns_; + } + + void Reset() { + value_.clear(); + wide_columns_.clear(); } private: MultiCfIteratorImpl impl_; - CoalescingRule rule_; + Slice value_; + WideColumns wide_columns_; + + void Coalesce(const WideColumns& columns); }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 493aa783ffc8..558d05b08f5b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3747,54 +3747,49 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl( std::unique_ptr DBImpl::NewCoalescingIterator( const ReadOptions& _read_options, - const std::vector& column_families, - const CoalescingOptions& coalesing_options) { - if (column_families.size() == 0) { - return std::unique_ptr(NewErrorIterator( - Status::InvalidArgument("No Column Family was provided"))); - } - const Comparator* first_comparator = column_families[0]->GetComparator(); - for (size_t i = 1; i < column_families.size(); ++i) { - const Comparator* cf_comparator = column_families[i]->GetComparator(); - if (first_comparator != cf_comparator && - first_comparator->GetId().compare(cf_comparator->GetId()) != 0) { - return std::unique_ptr(NewErrorIterator(Status::InvalidArgument( - "Different comparators are being used across CFs"))); - } - } + const std::vector& column_families) { std::vector child_iterators; - Status s = NewIterators(_read_options, 
column_families, &child_iterators); - if (s.ok()) { - return std::make_unique( - coalesing_options, first_comparator, column_families, - std::move(child_iterators)); + Status s = GetChildIteratorsForMultiCfIterator(_read_options, column_families, + &child_iterators); + if (!s.ok()) { + return std::unique_ptr(NewErrorIterator(s)); } - return std::unique_ptr(NewErrorIterator(s)); + return std::make_unique( + column_families[0]->GetComparator(), column_families, + std::move(child_iterators)); } std::unique_ptr DBImpl::NewAttributeGroupIterator( const ReadOptions& _read_options, const std::vector& column_families) { + std::vector child_iterators; + Status s = GetChildIteratorsForMultiCfIterator(_read_options, column_families, + &child_iterators); + if (!s.ok()) { + return NewAttributeGroupErrorIterator(s); + } + return std::make_unique( + column_families[0]->GetComparator(), column_families, + std::move(child_iterators)); +} + +Status DBImpl::GetChildIteratorsForMultiCfIterator( + const ReadOptions& read_options, + const std::vector& column_families, + std::vector* child_iterators) { if (column_families.size() == 0) { - return NewAttributeGroupErrorIterator( - Status::InvalidArgument("No Column Family was provided")); + return Status::InvalidArgument("No Column Family was provided"); } const Comparator* first_comparator = column_families[0]->GetComparator(); for (size_t i = 1; i < column_families.size(); ++i) { const Comparator* cf_comparator = column_families[i]->GetComparator(); if (first_comparator != cf_comparator && first_comparator->GetId().compare(cf_comparator->GetId()) != 0) { - return NewAttributeGroupErrorIterator(Status::InvalidArgument( - "Different comparators are being used across CFs")); + return Status::InvalidArgument( + "Different comparators are being used across CFs"); } } - std::vector child_iterators; - Status s = NewIterators(_read_options, column_families, &child_iterators); - if (s.ok()) { - return std::make_unique( - first_comparator, 
column_families, std::move(child_iterators)); - } - return std::make_unique(s); + return NewIterators(read_options, column_families, child_iterators); } Status DBImpl::NewIterators( @@ -3871,8 +3866,8 @@ Status DBImpl::NewIterators( } } else { // Note: no need to consider the special case of - // last_seq_same_as_publish_seq_==false since NewIterators is overridden in - // WritePreparedTxnDB + // last_seq_same_as_publish_seq_==false since NewIterators is overridden + // in WritePreparedTxnDB auto snapshot = read_options.snapshot != nullptr ? read_options.snapshot->GetSequenceNumber() : versions_->LastSequence(); @@ -3996,8 +3991,8 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, std::shared_ptr latest = timestamped_snapshots_.GetSnapshot(std::numeric_limits::max()); - // If there is already a latest timestamped snapshot, then we need to do some - // checks. + // If there is already a latest timestamped snapshot, then we need to do + // some checks. if (latest) { uint64_t latest_snap_ts = latest->GetTimestamp(); SequenceNumber latest_snap_seq = latest->GetSequenceNumber(); @@ -4006,8 +4001,8 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, Status status; std::shared_ptr ret; if (latest_snap_ts > ts) { - // A snapshot created later cannot have smaller timestamp than a previous - // timestamped snapshot. + // A snapshot created later cannot have smaller timestamp than a + // previous timestamped snapshot. needs_create_snap = false; std::ostringstream oss; oss << "snapshot exists with larger timestamp " << latest_snap_ts << " > " @@ -4121,7 +4116,8 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { // Calculate a new threshold, skipping those CFs where compactions are // scheduled. We do not do the same pass as the previous loop because - // mutex might be unlocked during the loop, making the result inaccurate. + // mutex might be unlocked during the loop, making the result + // inaccurate. 
SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber; for (auto* cfd : *versions_->GetColumnFamilySet()) { if (CfdListContains(cf_scheduled, cfd) || @@ -4549,7 +4545,8 @@ Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options, sizes[i] = 0; if (options.include_files) { sizes[i] += versions_->ApproximateSize( - options, read_options, v, k1.Encode(), k2.Encode(), /*start_level=*/0, + options, read_options, v, k1.Encode(), k2.Encode(), + /*start_level=*/0, /*end_level=*/-1, TableReaderCaller::kUserApproximateSize); } if (options.include_memtables) { @@ -4834,9 +4831,9 @@ void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family, static_cast_with_check(column_family)->cfd(); auto* sv = GetAndRefSuperVersion(cfd); { - // Without mutex, Version::GetColumnFamilyMetaData will have data race with - // Compaction::MarkFilesBeingCompacted. One solution is to use mutex, but - // this may cause regression. An alternative is to make + // Without mutex, Version::GetColumnFamilyMetaData will have data race + // with Compaction::MarkFilesBeingCompacted. One solution is to use mutex, + // but this may cause regression. An alternative is to make // FileMetaData::being_compacted atomic, but it will make FileMetaData // non-copy-able. 
Another option is to separate these variables from // original FileMetaData struct, and this requires re-organization of data diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 9f950680ed63..7cd82bd95a22 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -355,8 +355,7 @@ class DBImpl : public DB { // UNDER CONSTRUCTION - DO NOT USE std::unique_ptr NewCoalescingIterator( const ReadOptions& options, - const std::vector& column_families, - const CoalescingOptions& coalescing_options) override; + const std::vector& column_families) override; // UNDER CONSTRUCTION - DO NOT USE std::unique_ptr NewAttributeGroupIterator( @@ -2417,6 +2416,11 @@ class DBImpl : public DB { bool ShouldReferenceSuperVersion(const MergeContext& merge_context); + Status GetChildIteratorsForMultiCfIterator( + const ReadOptions& read_options, + const std::vector& column_families, + std::vector* child_iterators); + // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; diff --git a/db/db_test.cc b/db/db_test.cc index a3a434e7e757..2f16d73d4dd5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3202,8 +3202,7 @@ class ModelDB : public DB { // UNDER CONSTRUCTION - DO NOT USE std::unique_ptr NewCoalescingIterator( const ReadOptions& /*options*/, - const std::vector& /*column_families*/, - const CoalescingOptions& /*coalescing_options*/) override { + const std::vector& /*column_families*/) override { return nullptr; } diff --git a/db/multi_cf_iterator_impl.h b/db/multi_cf_iterator_impl.h index 37fbb473c8f8..c50e2aaa55f1 100644 --- a/db/multi_cf_iterator_impl.h +++ b/db/multi_cf_iterator_impl.h @@ -5,6 +5,7 @@ #pragma once +#include #include #include "rocksdb/comparator.h" @@ -17,12 +18,17 @@ namespace ROCKSDB_NAMESPACE { class MultiCfIteratorImpl { public: - MultiCfIteratorImpl(const Comparator* comparator, - const std::vector& column_families, - const std::vector& child_iterators) + MultiCfIteratorImpl( + const Comparator* comparator, 
+ const std::vector& column_families, + const std::vector& child_iterators, + std::function reset_func, + std::function populate_func) : comparator_(comparator), heap_(MultiCfMinHeap( - MultiCfHeapItemComparator>(comparator_))) { + MultiCfHeapItemComparator>(comparator_))), + reset_func_(reset_func), + populate_func_(populate_func) { assert(column_families.size() > 0 && column_families.size() == child_iterators.size()); cfh_iter_pairs_.reserve(column_families.size()); @@ -42,15 +48,6 @@ class MultiCfIteratorImpl { return current()->key(); } - Iterator* current() const { - if (std::holds_alternative(heap_)) { - auto& max_heap = std::get(heap_); - return max_heap.top().iterator; - } - auto& min_heap = std::get(heap_); - return min_heap.top().iterator; - } - bool Valid() const { if (std::holds_alternative(heap_)) { auto& max_heap = std::get(heap_); @@ -64,20 +61,27 @@ class MultiCfIteratorImpl { void SeekToFirst() { auto& min_heap = GetHeap([this]() { InitMinHeap(); }); - SeekCommon(min_heap, [](Iterator* iter) { iter->SeekToFirst(); }); + SeekCommon( + min_heap, [](Iterator* iter) { iter->SeekToFirst(); }, + [](Iterator* iter) { iter->Next(); }); } void Seek(const Slice& target) { auto& min_heap = GetHeap([this]() { InitMinHeap(); }); - SeekCommon(min_heap, [&target](Iterator* iter) { iter->Seek(target); }); + SeekCommon( + min_heap, [&target](Iterator* iter) { iter->Seek(target); }, + [](Iterator* iter) { iter->Next(); }); } void SeekToLast() { auto& max_heap = GetHeap([this]() { InitMaxHeap(); }); - SeekCommon(max_heap, [](Iterator* iter) { iter->SeekToLast(); }); + SeekCommon( + max_heap, [](Iterator* iter) { iter->SeekToLast(); }, + [](Iterator* iter) { iter->Prev(); }); } void SeekForPrev(const Slice& target) { auto& max_heap = GetHeap([this]() { InitMaxHeap(); }); - SeekCommon(max_heap, - [&target](Iterator* iter) { iter->SeekForPrev(target); }); + SeekCommon( + max_heap, [&target](Iterator* iter) { iter->SeekForPrev(target); }, + [](Iterator* iter) { 
iter->Prev(); }); } void Next() { @@ -141,8 +145,20 @@ class MultiCfIteratorImpl { MultiCfIterHeap heap_; + std::function reset_func_; + std::function populate_func_; + // TODO: Lower and Upper bounds + Iterator* current() const { + if (std::holds_alternative(heap_)) { + auto& max_heap = std::get(heap_); + return max_heap.top().iterator; + } + auto& min_heap = std::get(heap_); + return min_heap.top().iterator; + } + void considerStatus(Status s) { if (!s.ok() && status_.ok()) { status_ = std::move(s); @@ -166,55 +182,95 @@ class MultiCfIteratorImpl { MultiCfHeapItemComparator>(comparator_)); } - template - void SeekCommon(BinaryHeap& heap, ChildSeekFuncType child_seek_func) { + template + void SeekCommon(BinaryHeap& heap, ChildSeekFuncType child_seek_func, + AdvanceFuncType advance_func) { + reset_func_(); heap.clear(); int i = 0; - for (auto& cfh_iter_pair : cfh_iter_pairs_) { - auto& cfh = cfh_iter_pair.first; - auto& iter = cfh_iter_pair.second; + for (auto& [cfh, iter] : cfh_iter_pairs_) { child_seek_func(iter.get()); if (iter->Valid()) { assert(iter->status().ok()); heap.push(MultiCfIteratorInfo{iter.get(), cfh, i}); } else { considerStatus(iter->status()); + if (!status_.ok()) { + // Non-OK status from the iterator. Bail out early + break; + } } ++i; } + if (!heap.empty()) { + PopulateIterator(heap, advance_func); + } } template void AdvanceIterator(BinaryHeap& heap, AdvanceFuncType advance_func) { - // 1. Keep the top iterator (by popping it from the heap) - // 2. Make sure all others have iterated past the top iterator key slice - // 3. Advance the top iterator, and add it back to the heap if valid + assert(!heap.empty()); + reset_func_(); + + // Because PopulateIterator() advances the same key in all non-top + // iterators, it is guaranteed that all iterator's first elements are unique + // here. 
Just advance the top iterator and re-heapify auto top = heap.top(); + advance_func(top.iterator); + if (top.iterator->Valid()) { + assert(top.iterator->status().ok()); + heap.replace_top(top); + } else { + considerStatus(top.iterator->status()); + if (!status_.ok()) { + heap.clear(); + return; + } else { + heap.pop(); + } + } + if (!heap.empty()) { + PopulateIterator(heap, advance_func); + } + } + + template + void PopulateIterator(BinaryHeap& heap, AdvanceFuncType advance_func) { + // 1. Keep the top iterator (by popping it from the heap) and populate + // value, columns and attribute_groups + // 2. Make sure all others have iterated past the top iterator key slice. + // While iterating, coalesce/populate value, columns and attribute_groups + // 3. Add the top iterator back without advancing it + assert(!heap.empty()); + auto top = heap.top(); + populate_func_(top.cfh, top.iterator); heap.pop(); if (!heap.empty()) { auto* current = heap.top().iterator; while (current->Valid() && comparator_->Compare(top.iterator->key(), current->key()) == 0) { assert(current->status().ok()); + populate_func_(heap.top().cfh, heap.top().iterator); advance_func(current); if (current->Valid()) { heap.replace_top(heap.top()); } else { considerStatus(current->status()); - heap.pop(); + if (!status_.ok()) { + // Non-OK status from the iterator. 
Bail out early + heap.clear(); + break; + } else { + heap.pop(); + } } if (!heap.empty()) { current = heap.top().iterator; } } } - advance_func(top.iterator); - if (top.iterator->Valid()) { - assert(top.iterator->status().ok()); - heap.push(top); - } else { - considerStatus(top.iterator->status()); - } + heap.push(top); } }; diff --git a/db/multi_cf_iterator_test.cc b/db/multi_cf_iterator_test.cc index 208a19f1cad2..57d0f9d39266 100644 --- a/db/multi_cf_iterator_test.cc +++ b/db/multi_cf_iterator_test.cc @@ -10,15 +10,10 @@ namespace ROCKSDB_NAMESPACE { -class CoalescingIteratorTest - : public DBTestBase, - public testing::WithParamInterface { +class CoalescingIteratorTest : public DBTestBase { public: CoalescingIteratorTest() - : DBTestBase("coalescing_iterator_test", /*env_do_fsync=*/true) { - coalesing_options_.rule = GetParam(); - } - CoalescingOptions coalesing_options_; + : DBTestBase("coalescing_iterator_test", /*env_do_fsync=*/true) {} // Verify Iteration of CoalescingIterator // by SeekToFirst() + Next() and SeekToLast() + Prev() @@ -29,7 +24,7 @@ class CoalescingIteratorTest expected_wide_columns = std::nullopt) { int i = 0; std::unique_ptr iter = - db_->NewCoalescingIterator(ReadOptions(), cfhs, coalesing_options_); + db_->NewCoalescingIterator(ReadOptions(), cfhs); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_EQ(expected_keys[i], iter->key()); ASSERT_EQ(expected_values[i], iter->value()); @@ -67,25 +62,20 @@ class CoalescingIteratorTest } }; -INSTANTIATE_TEST_CASE_P( - CoalescingIteratorTest, CoalescingIteratorTest, - testing::Values(CoalescingRule::kChooseFromFirstCfContainingKey)); - -TEST_P(CoalescingIteratorTest, InvalidArguments) { +TEST_F(CoalescingIteratorTest, InvalidArguments) { Options options = GetDefaultOptions(); { CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); // Invalid - No CF is provided std::unique_ptr iter_with_no_cf = - db_->NewCoalescingIterator(ReadOptions(), {}, coalesing_options_); + 
db_->NewCoalescingIterator(ReadOptions(), {}); ASSERT_NOK(iter_with_no_cf->status()); ASSERT_TRUE(iter_with_no_cf->status().IsInvalidArgument()); } } -// TODO - generate expected values and verify per coalesing_options_ -TEST_P(CoalescingIteratorTest, SimpleValues) { +TEST_F(CoalescingIteratorTest, SimpleValues) { Options options = GetDefaultOptions(); { // Case 1: Unique key per CF @@ -116,8 +106,8 @@ TEST_P(CoalescingIteratorTest, SimpleValues) { // Verify Seek() { - std::unique_ptr iter = db_->NewCoalescingIterator( - ReadOptions(), cfhs_order_0_1_2_3, coalesing_options_); + std::unique_ptr iter = + db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3); iter->Seek(""); ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); iter->Seek("key_1"); @@ -131,8 +121,8 @@ TEST_P(CoalescingIteratorTest, SimpleValues) { } // Verify SeekForPrev() { - std::unique_ptr iter = db_->NewCoalescingIterator( - ReadOptions(), cfhs_order_0_1_2_3, coalesing_options_); + std::unique_ptr iter = + db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3); iter->SeekForPrev(""); ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); iter->SeekForPrev("key_1"); @@ -168,58 +158,58 @@ TEST_P(CoalescingIteratorTest, SimpleValues) { // Test for iteration over CFs default->1->2->3 std::vector cfhs_order_0_1_2_3 = { handles_[0], handles_[1], handles_[2], handles_[3]}; - std::vector expected_values = {"key_1_cf_0_val", "key_2_cf_1_val", - "key_3_cf_0_val"}; + std::vector expected_values = {"key_1_cf_3_val", "key_2_cf_2_val", + "key_3_cf_3_val"}; verifyCoalescingIterator(cfhs_order_0_1_2_3, expected_keys, expected_values); // Test for iteration over CFs 3->2->default_cf->1 std::vector cfhs_order_3_2_0_1 = { handles_[3], handles_[2], handles_[0], handles_[1]}; - expected_values = {"key_1_cf_3_val", "key_2_cf_2_val", "key_3_cf_3_val"}; + expected_values = {"key_1_cf_0_val", "key_2_cf_1_val", "key_3_cf_1_val"}; verifyCoalescingIterator(cfhs_order_3_2_0_1, expected_keys, expected_values); // 
Verify Seek() { - std::unique_ptr iter = db_->NewCoalescingIterator( - ReadOptions(), cfhs_order_3_2_0_1, coalesing_options_); + std::unique_ptr iter = + db_->NewCoalescingIterator(ReadOptions(), cfhs_order_3_2_0_1); iter->Seek(""); - ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val"); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); iter->Seek("key_1"); - ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val"); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); iter->Seek("key_2"); - ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_2_val"); + ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val"); iter->Next(); - ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_3_val"); + ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_1_val"); iter->Seek("key_x"); ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); } // Verify SeekForPrev() { - std::unique_ptr iter = db_->NewCoalescingIterator( - ReadOptions(), cfhs_order_3_2_0_1, coalesing_options_); + std::unique_ptr iter = + db_->NewCoalescingIterator(ReadOptions(), cfhs_order_3_2_0_1); iter->SeekForPrev(""); ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); iter->SeekForPrev("key_1"); - ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val"); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); iter->Next(); - ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_2_val"); + ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val"); iter->SeekForPrev("key_x"); - ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_3_val"); + ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_1_val"); iter->Next(); ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); } } } -TEST_P(CoalescingIteratorTest, EmptyCfs) { +TEST_F(CoalescingIteratorTest, EmptyCfs) { Options options = GetDefaultOptions(); { // Case 1: No keys in any of the CFs CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); std::unique_ptr iter = - db_->NewCoalescingIterator(ReadOptions(), handles_, coalesing_options_); + 
db_->NewCoalescingIterator(ReadOptions(), handles_); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); iter->SeekToLast(); @@ -235,7 +225,7 @@ TEST_P(CoalescingIteratorTest, EmptyCfs) { // Case 2: A single key exists in only one of the CF. Rest CFs are empty. ASSERT_OK(Put(1, "key_1", "key_1_cf_1_val")); std::unique_ptr iter = - db_->NewCoalescingIterator(ReadOptions(), handles_, coalesing_options_); + db_->NewCoalescingIterator(ReadOptions(), handles_); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_1_val"); iter->Next(); @@ -249,22 +239,21 @@ TEST_P(CoalescingIteratorTest, EmptyCfs) { // Case 3: same key exists in all of the CFs except one (cf_2) ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val")); ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val")); - // handles_ are in the order of 0->1->2->3. We should expect value from cf_0 + // handles_ are in the order of 0->1->2->3 std::unique_ptr iter = - db_->NewCoalescingIterator(ReadOptions(), handles_, coalesing_options_); + db_->NewCoalescingIterator(ReadOptions(), handles_); iter->SeekToFirst(); - ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val"); iter->Next(); ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); iter->SeekToLast(); - ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val"); iter->Prev(); ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); } } -// TODO - generate expected values and verify per coalesing_options_ -TEST_P(CoalescingIteratorTest, WideColumns) { +TEST_F(CoalescingIteratorTest, WideColumns) { // Set up the DB and Column Families Options options = GetDefaultOptions(); CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); @@ -273,30 +262,65 @@ TEST_P(CoalescingIteratorTest, WideColumns) { WideColumns key_1_columns_in_cf_2{ {kDefaultWideColumnName, "cf_2_col_val_0_key_1"}, {"cf_2_col_name_1", "cf_2_col_val_1_key_1"}, - 
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"}}; + {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}, + {"cf_overlap_col_name", "cf_2_overlap_value_key_1"}}; WideColumns key_1_columns_in_cf_3{ {"cf_3_col_name_1", "cf_3_col_val_1_key_1"}, {"cf_3_col_name_2", "cf_3_col_val_2_key_1"}, - {"cf_3_col_name_3", "cf_3_col_val_3_key_1"}}; + {"cf_3_col_name_3", "cf_3_col_val_3_key_1"}, + {"cf_overlap_col_name", "cf_3_overlap_value_key_1"}}; + WideColumns key_1_expected_columns_cfh_order_2_3{ + {kDefaultWideColumnName, "cf_2_col_val_0_key_1"}, + {"cf_2_col_name_1", "cf_2_col_val_1_key_1"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}, + {"cf_3_col_name_1", "cf_3_col_val_1_key_1"}, + {"cf_3_col_name_2", "cf_3_col_val_2_key_1"}, + {"cf_3_col_name_3", "cf_3_col_val_3_key_1"}, + {"cf_overlap_col_name", "cf_3_overlap_value_key_1"}}; + WideColumns key_1_expected_columns_cfh_order_3_2{ + {kDefaultWideColumnName, "cf_2_col_val_0_key_1"}, + {"cf_2_col_name_1", "cf_2_col_val_1_key_1"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}, + {"cf_3_col_name_1", "cf_3_col_val_1_key_1"}, + {"cf_3_col_name_2", "cf_3_col_val_2_key_1"}, + {"cf_3_col_name_3", "cf_3_col_val_3_key_1"}, + {"cf_overlap_col_name", "cf_2_overlap_value_key_1"}}; constexpr char key_2[] = "key_2"; WideColumns key_2_columns_in_cf_1{ - {"cf_1_col_name_1", "cf_1_col_val_1_key_2"}}; + {"cf_overlap_col_name", "cf_1_overlap_value_key_2"}}; WideColumns key_2_columns_in_cf_2{ {"cf_2_col_name_1", "cf_2_col_val_1_key_2"}, - {"cf_2_col_name_2", "cf_2_col_val_2_key_2"}}; + {"cf_2_col_name_2", "cf_2_col_val_2_key_2"}, + {"cf_overlap_col_name", "cf_2_overlap_value_key_2"}}; + WideColumns key_2_expected_columns_cfh_order_1_2{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_2"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_2"}, + {"cf_overlap_col_name", "cf_2_overlap_value_key_2"}}; + WideColumns key_2_expected_columns_cfh_order_2_1{ + {"cf_2_col_name_1", "cf_2_col_val_1_key_2"}, + {"cf_2_col_name_2", "cf_2_col_val_2_key_2"}, + {"cf_overlap_col_name", 
"cf_1_overlap_value_key_2"}}; constexpr char key_3[] = "key_3"; WideColumns key_3_columns_in_cf_1{ {"cf_1_col_name_1", "cf_1_col_val_1_key_3"}}; WideColumns key_3_columns_in_cf_3{ {"cf_3_col_name_1", "cf_3_col_val_1_key_3"}}; + WideColumns key_3_expected_columns{ + {"cf_1_col_name_1", "cf_1_col_val_1_key_3"}, + {"cf_3_col_name_1", "cf_3_col_val_1_key_3"}, + }; constexpr char key_4[] = "key_4"; WideColumns key_4_columns_in_cf_0{ {"cf_0_col_name_1", "cf_0_col_val_1_key_4"}}; WideColumns key_4_columns_in_cf_2{ {"cf_2_col_name_1", "cf_2_col_val_1_key_4"}}; + WideColumns key_4_expected_columns{ + {"cf_0_col_name_1", "cf_0_col_val_1_key_4"}, + {"cf_2_col_name_1", "cf_2_col_val_1_key_4"}, + }; // Use AttributeGroup PutEntity API to insert them together AttributeGroups key_1_attribute_groups{ @@ -317,29 +341,45 @@ TEST_P(CoalescingIteratorTest, WideColumns) { ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups)); ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups)); - // Test for iteration over CF default->1->2->3 - std::vector cfhs_order_0_1_2_3 = { - handles_[0], handles_[1], handles_[2], handles_[3]}; + // Keys should be returned in order regardless of cfh order std::vector expected_keys = {key_1, key_2, key_3, key_4}; - // Pick what DBIter would return for value() in the first CF that key exists + // Since value for kDefaultWideColumnName only exists for key_1, rest will - // return empty value + // return empty value after coalesced std::vector expected_values = {"cf_2_col_val_0_key_1", "", "", ""}; - // Pick columns from the first CF that the key exists and value is stored as - // wide column - std::vector expected_wide_columns = { - {{kDefaultWideColumnName, "cf_2_col_val_0_key_1"}, - {"cf_2_col_name_1", "cf_2_col_val_1_key_1"}, - {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}}, - {{"cf_1_col_name_1", "cf_1_col_val_1_key_2"}}, - {{"cf_1_col_name_1", "cf_1_col_val_1_key_3"}}, - {{"cf_0_col_name_1", "cf_0_col_val_1_key_4"}}}; - 
verifyCoalescingIterator(cfhs_order_0_1_2_3, expected_keys, expected_values, - expected_wide_columns); + // Test for iteration over CF default->1->2->3 + { + std::vector cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + + // Coalesced columns + std::vector expected_wide_columns_0_1_2_3 = { + key_1_expected_columns_cfh_order_2_3, + key_2_expected_columns_cfh_order_1_2, key_3_expected_columns, + key_4_expected_columns}; + + verifyCoalescingIterator(cfhs_order_0_1_2_3, expected_keys, expected_values, + expected_wide_columns_0_1_2_3); + } + + // Test for iteration over CF 3->2->default->1 + { + std::vector cfhs_order_3_2_0_1 = { + handles_[3], handles_[2], handles_[0], handles_[1]}; + + // Coalesced columns + std::vector expected_wide_columns_3_2_0_1 = { + key_1_expected_columns_cfh_order_3_2, + key_2_expected_columns_cfh_order_2_1, key_3_expected_columns, + key_4_expected_columns}; + + verifyCoalescingIterator(cfhs_order_3_2_0_1, expected_keys, expected_values, + expected_wide_columns_3_2_0_1); + } } -TEST_P(CoalescingIteratorTest, DifferentComparatorsInMultiCFs) { +TEST_F(CoalescingIteratorTest, DifferentComparatorsInMultiCFs) { // This test creates two column families with two different comparators. // Attempting to create the MultiCFIterator should fail. Options options = GetDefaultOptions(); @@ -361,12 +401,12 @@ TEST_P(CoalescingIteratorTest, DifferentComparatorsInMultiCFs) { verifyExpectedKeys(handles_[1], {"key_3", "key_2", "key_1"}); std::unique_ptr iter = - db_->NewCoalescingIterator(ReadOptions(), handles_, coalesing_options_); + db_->NewCoalescingIterator(ReadOptions(), handles_); ASSERT_NOK(iter->status()); ASSERT_TRUE(iter->status().IsInvalidArgument()); } -TEST_P(CoalescingIteratorTest, CustomComparatorsInMultiCFs) { +TEST_F(CoalescingIteratorTest, CustomComparatorsInMultiCFs) { // This test creates two column families with the same custom test // comparators (but instantiated independently). 
Attempting to create the // MultiCFIterator should not fail. @@ -409,12 +449,12 @@ TEST_P(CoalescingIteratorTest, CustomComparatorsInMultiCFs) { std::vector expected_keys = { "key_001_003", "key_001_002", "key_001_001", "key_002_003", "key_002_002", "key_002_001", "key_003_006", "key_003_005", "key_003_004"}; - std::vector expected_values = {"value_0_1", "value_0_2", "value_0_3", + std::vector expected_values = {"value_1_1", "value_1_2", "value_1_3", "value_0_4", "value_0_5", "value_0_6", "value_1_4", "value_1_5", "value_1_6"}; int i = 0; std::unique_ptr iter = - db_->NewCoalescingIterator(ReadOptions(), handles_, coalesing_options_); + db_->NewCoalescingIterator(ReadOptions(), handles_); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_EQ(expected_keys[i], iter->key()); ASSERT_EQ(expected_values[i], iter->value()); diff --git a/include/rocksdb/attribute_groups.h b/include/rocksdb/attribute_groups.h index ae240369579f..23747a811d02 100644 --- a/include/rocksdb/attribute_groups.h +++ b/include/rocksdb/attribute_groups.h @@ -5,16 +5,7 @@ #pragma once -#include -#include -#include -#include - -#include "rocksdb/comparator.h" #include "rocksdb/iterator_base.h" -#include "rocksdb/rocksdb_namespace.h" -#include "rocksdb/slice.h" -#include "rocksdb/status.h" #include "rocksdb/wide_columns.h" namespace ROCKSDB_NAMESPACE { @@ -98,7 +89,7 @@ class AttributeGroupIterator : public IteratorBase { AttributeGroupIterator(const AttributeGroupIterator&) = delete; AttributeGroupIterator& operator=(const AttributeGroupIterator&) = delete; - virtual AttributeGroups attribute_groups() const = 0; + virtual const AttributeGroups& attribute_groups() const = 0; }; } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 49b2215bb4b0..73f9283deb21 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -974,13 +974,21 @@ class DB { // UNDER CONSTRUCTION - DO NOT USE // Return a cross-column-family iterator from a consistent 
database state. - // If a key exists in more than one column family, it chooses value/columns - // based on the coalescing rule provided by CoalescingOptions. See - // CoalescingOptions in options.h for details + // + // If a key exists in more than one column family, value() will be determined + // by the wide column value of kDefaultWideColumnName after coalescing, as + // described below. + // + // Each wide column will be independently shadowed by the CFs. + // For example, if CF1 has "key_1" ==> {"col_1": "foo", + // "col_2": "baz"} and CF2 has "key_1" ==> {"col_2": "quux", "col_3": "bla"}, + // and when the iterator is at key_1, columns() will return + // {"col_1": "foo", "col_2": "quux", "col_3": "bla"} + // In this example, value() will be empty, because none of them have values + // for kDefaultWideColumnName virtual std::unique_ptr NewCoalescingIterator( const ReadOptions& options, - const std::vector& column_families, - const CoalescingOptions& coalescing_options) = 0; + const std::vector& column_families) = 0; // UNDER CONSTRUCTION - DO NOT USE // A cross-column-family iterator that collects and returns attribute groups diff --git a/include/rocksdb/iterator_base.h b/include/rocksdb/iterator_base.h index 017e58bbe3a2..036936e2b674 100644 --- a/include/rocksdb/iterator_base.h +++ b/include/rocksdb/iterator_base.h @@ -5,8 +5,6 @@ #pragma once -#include - #include "rocksdb/cleanable.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index dde8182bbd1c..3c3fcf3fecee 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -2226,24 +2226,4 @@ struct WaitForCompactOptions { std::chrono::microseconds timeout = std::chrono::microseconds::zero(); }; -// Options for Coalescing Iterator -enum class CoalescingRule { - // Choose the value or columns from the first column family that contains - // the key.
CF is selected based on the order of column_families provided - // during the creation of the iterator. - // e.g. if 'foo ==> {"col_1": "v1", "col_2": "v2"}' and 'foobar ==> - // {"col_1": "vx"}' are in CF1, and 'foo ==> {"col_1": "v3", "col_4": "v4"} - // in CF2, and the iterator is currently at 'foo' with column_families - // parameter provided as [CF1,CF2] the columns() function will return - // {"col_1": "v1", "col_2": "v2"}, and the Next() function will bypass the - // key 'foo' in CF2 and proceed to 'foobar'. - // If the column_families parameter is provided as [CF2,CF1], then when at - // 'foo', the columns() function will return {"col_1": "v3", "col_4": "v4"} - kChooseFromFirstCfContainingKey, -}; - -struct CoalescingOptions { - CoalescingRule rule = CoalescingRule::kChooseFromFirstCfContainingKey; -}; - } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index ea475c702363..e63d246c2634 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -262,10 +262,8 @@ class StackableDB : public DB { using DB::NewCoalescingIterator; std::unique_ptr NewCoalescingIterator( const ReadOptions& options, - const std::vector& column_families, - const CoalescingOptions& coalescing_options) override { - return db_->NewCoalescingIterator(options, column_families, - coalescing_options); + const std::vector& column_families) override { + return db_->NewCoalescingIterator(options, column_families); } using DB::NewAttributeGroupIterator;