diff --git a/db/multi_cf_iterator.cc b/db/multi_cf_iterator.cc index ce0967257ed9..883b132db724 100644 --- a/db/multi_cf_iterator.cc +++ b/db/multi_cf_iterator.cc @@ -9,44 +9,91 @@ namespace ROCKSDB_NAMESPACE { -void MultiCfIterator::SeekToFirst() { - SeekCommon([](Iterator* iter) { iter->SeekToFirst(); }); -} -void MultiCfIterator::Seek(const Slice& target) { - SeekCommon([&target](Iterator* iter) { iter->Seek(target); }); +void MultiCfIterator::SeekCommon( + const std::function& child_seek_func, + Direction direction) { + Reset(); + direction_ = direction; + int i = 0; + for (auto& cfh_iter_pair : cfh_iter_pairs_) { + auto& cfh = cfh_iter_pair.first; + auto& iter = cfh_iter_pair.second; + child_seek_func(iter.get()); + if (iter->Valid()) { + assert(iter->status().ok()); + if (direction_ == kReverse) { + max_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i}); + } else { + min_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i}); + } + } else { + considerStatus(iter->status()); + } + ++i; + } } -void MultiCfIterator::Next() { - assert(Valid()); +template +void MultiCfIterator::AdvanceIterator( + BinaryHeap& heap, const std::function& advance_func) { // 1. Keep the top iterator (by popping it from the heap) // 2. Make sure all others have iterated past the top iterator key slice // 3. Advance the top iterator, and add it back to the heap if valid - auto top = min_heap_.top(); - min_heap_.pop(); - if (!min_heap_.empty()) { - auto* current = min_heap_.top().iterator; + auto top = heap.top(); + heap.pop(); + if (!heap.empty()) { + auto* current = heap.top().iterator; while (current->Valid() && comparator_->Compare(top.iterator->key(), current->key()) == 0) { assert(current->status().ok()); - current->Next(); + advance_func(current); if (current->Valid()) { - min_heap_.replace_top(min_heap_.top()); + heap.replace_top(heap.top()); } else { considerStatus(current->status()); - min_heap_.pop(); + heap.pop(); } - if (!min_heap_.empty()) { - current = min_heap_.top().iterator; + if (!heap.empty()) { + current = heap.top().iterator; } } } - top.iterator->Next(); + advance_func(top.iterator); if (top.iterator->Valid()) { assert(top.iterator->status().ok()); - min_heap_.push(top); + heap.push(top); } else { considerStatus(top.iterator->status()); } } +void MultiCfIterator::SeekToFirst() { + SeekCommon([](Iterator* iter) { iter->SeekToFirst(); }, kForward); +} +void MultiCfIterator::Seek(const Slice& target) { + SeekCommon([&target](Iterator* iter) { iter->Seek(target); }, kForward); +} +void MultiCfIterator::SeekToLast() { + SeekCommon([](Iterator* iter) { iter->SeekToLast(); }, kReverse); +} +void MultiCfIterator::SeekForPrev(const Slice& target) { + SeekCommon([&target](Iterator* iter) { iter->SeekForPrev(target); }, + kReverse); +} + +void MultiCfIterator::Next() { + assert(Valid()); + if (direction_ != kForward) { + SwitchToDirection(kForward); + } + AdvanceIterator(min_heap_, [](Iterator* iter) { iter->Next(); }); +} +void MultiCfIterator::Prev() { + assert(Valid()); + if (direction_ != kReverse) { + SwitchToDirection(kReverse); + } + AdvanceIterator(max_heap_, [](Iterator* iter) { iter->Prev(); }); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/multi_cf_iterator.h b/db/multi_cf_iterator.h index 88090242bc2c..203d5aa89ba0 100644 --- a/db/multi_cf_iterator.h +++ b/db/multi_cf_iterator.h @@ -23,7 +23,8 @@ class MultiCfIterator : public Iterator { const std::vector& column_families, const std::vector& child_iterators) : comparator_(comparator), - min_heap_(MultiCfMinHeapItemComparator(comparator_)) { + min_heap_(MultiCfMinHeapItemComparator(comparator_)), + max_heap_(MultiCfMaxHeapItemComparator(comparator_)) { assert(column_families.size() > 0 && column_families.size() == child_iterators.size()); cfh_iter_pairs_.reserve(column_families.size()); @@ -72,18 +73,65 @@ class MultiCfIterator : public Iterator { const Comparator* comparator_; }; + class MultiCfMaxHeapItemComparator { + public: + explicit MultiCfMaxHeapItemComparator(const Comparator* comparator) + : comparator_(comparator) {} + + bool operator()(const MultiCfIteratorInfo& a, + const MultiCfIteratorInfo& b) const { + assert(a.iterator); + assert(b.iterator); + assert(a.iterator->Valid()); + assert(b.iterator->Valid()); + int c = comparator_->Compare(a.iterator->key(), b.iterator->key()); + assert(c != 0 || a.order != b.order); + return c == 0 ? a.order - b.order > 0 : c < 0; + } + + private: + const Comparator* comparator_; + }; + const Comparator* comparator_; using MultiCfMinHeap = BinaryHeap; + using MultiCfMaxHeap = + BinaryHeap; MultiCfMinHeap min_heap_; - // TODO: MaxHeap for Reverse Iteration + MultiCfMaxHeap max_heap_; + + enum Direction : uint8_t { kForward, kReverse }; + Direction direction_ = kForward; + // TODO: Lower and Upper bounds + Iterator* current() const { + if (direction_ == kReverse) { + return max_heap_.top().iterator; + } + return min_heap_.top().iterator; + } + Slice key() const override { assert(Valid()); - return min_heap_.top().iterator->key(); + return current()->key(); + } + Slice value() const override { + assert(Valid()); + return current()->value(); + } + const WideColumns& columns() const override { + assert(Valid()); + return current()->columns(); + } + + bool Valid() const override { + if (direction_ == kReverse) { + return !max_heap_.empty() && status_.ok(); + } + return !min_heap_.empty() && status_.ok(); } - bool Valid() const override { return !min_heap_.empty() && status_.ok(); } Status status() const override { return status_; } void considerStatus(Status s) { if (!s.ok() && status_.ok()) { @@ -92,42 +140,32 @@ class MultiCfIterator : public Iterator { } void Reset() { min_heap_.clear(); + max_heap_.clear(); status_ = Status::OK(); } - void SeekCommon(const std::function& child_seek_func) { - Reset(); - int i = 0; - for (auto& cfh_iter_pair : cfh_iter_pairs_) { - auto& cfh = cfh_iter_pair.first; - auto& iter = cfh_iter_pair.second; - child_seek_func(iter.get()); - if (iter->Valid()) { - assert(iter->status().ok()); - min_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i}); - } else { - considerStatus(iter->status()); - } - ++i; + void SwitchToDirection(Direction new_direction) { + assert(direction_ != new_direction); + Slice target = key(); + if (new_direction == kForward) { + Seek(target); + } else { + SeekForPrev(target); } } + void SeekCommon(const std::function& child_seek_func, + Direction direction); + template + void AdvanceIterator(BinaryHeap& heap, + const std::function& advance_func); + void SeekToFirst() override; + void SeekToLast() override; void Seek(const Slice& /*target*/) override; + void SeekForPrev(const Slice& /*target*/) override; void Next() override; - - // TODO - Implement these - void SeekToLast() override {} - void SeekForPrev(const Slice& /*target*/) override {} - void Prev() override { assert(false); } - Slice value() const override { - assert(Valid()); - return min_heap_.top().iterator->value(); - } - const WideColumns& columns() const override { - assert(Valid()); - return min_heap_.top().iterator->columns(); - } + void Prev() override; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/multi_cf_iterator_test.cc b/db/multi_cf_iterator_test.cc index 343e16b64c98..4aa0bfedf22d 100644 --- a/db/multi_cf_iterator_test.cc +++ b/db/multi_cf_iterator_test.cc @@ -14,7 +14,8 @@ class MultiCfIteratorTest : public DBTestBase { MultiCfIteratorTest() : DBTestBase("multi_cf_iterator_test", /*env_do_fsync=*/true) {} - // Verify Iteration of MultiCfIterator by SeekToFirst() + Next() + // Verify Iteration of MultiCfIterator + // by SeekToFirst() + Next() and SeekToLast() + Prev() void verifyMultiCfIterator( const std::vector& cfhs, const std::vector& expected_keys, @@ -39,13 +40,28 @@ class MultiCfIteratorTest : public DBTestBase { // ASSERT_EQ(expected_attribute_groups.value()[i], // iter->attribute_groups()); } - if (expected_values.has_value()) { - ASSERT_EQ(expected_values.value()[i], iter->value()); - } ++i; } ASSERT_EQ(expected_keys.size(), i); ASSERT_OK(iter->status()); + + int rev_i = i - 1; + for (iter->SeekToLast(); iter->Valid(); iter->Prev()) { + ASSERT_EQ(expected_keys[rev_i], iter->key()); + if (expected_values.has_value()) { + ASSERT_EQ(expected_values.value()[rev_i], iter->value()); + } + if (expected_wide_columns.has_value()) { + ASSERT_EQ(expected_wide_columns.value()[rev_i], iter->columns()); + } + if (expected_attribute_groups.has_value()) { + // TODO - Add this back when attribute_groups() API is added + // ASSERT_EQ(expected_attribute_groups.value()[rev_i], + // iter->attribute_groups()); + } + rev_i--; + } + ASSERT_OK(iter->status()); } void verifyExpectedKeys(ColumnFamilyHandle* cfh, @@ -103,18 +119,39 @@ TEST_F(MultiCfIteratorTest, SimpleValues) { verifyMultiCfIterator(cfhs_order_3_1_0_2, expected_keys, expected_values); // Verify Seek() - std::unique_ptr iter = - db_->NewMultiCfIterator(ReadOptions(), cfhs_order_0_1_2_3); - iter->Seek(""); - ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); - iter->Seek("key_1"); - ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); - iter->Seek("key_2"); - ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val"); - iter->Next(); - ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_2_val"); - iter->Seek("key_x"); - ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + { + std::unique_ptr iter = + db_->NewMultiCfIterator(ReadOptions(), cfhs_order_0_1_2_3); + iter->Seek(""); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); + iter->Seek("key_1"); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); + iter->Seek("key_2"); + ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val"); + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_2_val"); + iter->Seek("key_x"); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + } + // Verify SeekForPrev() + { + std::unique_ptr iter = + db_->NewMultiCfIterator(ReadOptions(), cfhs_order_0_1_2_3); + iter->SeekForPrev(""); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + iter->SeekForPrev("key_1"); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_1_val"); + iter->SeekForPrev("key_x"); + ASSERT_EQ(IterStatus(iter.get()), "key_4->key_4_cf_3_val"); + iter->Prev(); + ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_2_val"); + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "key_4->key_4_cf_3_val"); + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + } } { // Case 2: Same key in multiple CFs @@ -146,24 +183,40 @@ TEST_F(MultiCfIteratorTest, SimpleValues) { verifyMultiCfIterator(cfhs_order_3_2_0_1, expected_keys, expected_values); // Verify Seek() - std::unique_ptr iter = - db_->NewMultiCfIterator(ReadOptions(), cfhs_order_3_2_0_1); - iter->Seek(""); - ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val"); - iter->Seek("key_1"); - ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val"); - iter->Seek("key_2"); - ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_2_val"); - iter->Next(); - ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_3_val"); - iter->Seek("key_x"); - ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + { + std::unique_ptr iter = + db_->NewMultiCfIterator(ReadOptions(), cfhs_order_3_2_0_1); + iter->Seek(""); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val"); + iter->Seek("key_1"); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val"); + iter->Seek("key_2"); + ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_2_val"); + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_3_val"); + iter->Seek("key_x"); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + } + // Verify SeekForPrev() + { + std::unique_ptr iter = + db_->NewMultiCfIterator(ReadOptions(), cfhs_order_3_2_0_1); + iter->SeekForPrev(""); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + iter->SeekForPrev("key_1"); + ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_3_val"); + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "key_2->key_2_cf_2_val"); + iter->SeekForPrev("key_x"); + ASSERT_EQ(IterStatus(iter.get()), "key_3->key_3_cf_3_val"); + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + } } } TEST_F(MultiCfIteratorTest, EmptyCfs) { Options options = GetDefaultOptions(); - { // Case 1: No keys in any of the CFs CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); @@ -191,8 +244,8 @@ TEST_F(MultiCfIteratorTest, EmptyCfs) { ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_1_val"); - // iter->Prev(); - // ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + iter->Prev(); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); } { // Case 3: same keys in all of the CFs except one (cf_2). @@ -207,8 +260,8 @@ TEST_F(MultiCfIteratorTest, EmptyCfs) { ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); iter->SeekToFirst(); ASSERT_EQ(IterStatus(iter.get()), "key_1->key_1_cf_0_val"); - // iter->Prev(); - // ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + iter->Prev(); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); } }