Skip to content

Commit

Permalink
MultiCfIterator Reverse Direction Impl
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykorean committed Mar 12, 2024
1 parent cf2dfc3 commit e7ceef4
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 82 deletions.
83 changes: 65 additions & 18 deletions db/multi_cf_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,91 @@

namespace ROCKSDB_NAMESPACE {

void MultiCfIterator::SeekToFirst() {
SeekCommon([](Iterator* iter) { iter->SeekToFirst(); });
}
void MultiCfIterator::Seek(const Slice& target) {
SeekCommon([&target](Iterator* iter) { iter->Seek(target); });
void MultiCfIterator::SeekCommon(
const std::function<void(Iterator*)>& child_seek_func,
Direction direction) {
Reset();
direction_ = direction;
int i = 0;
for (auto& cfh_iter_pair : cfh_iter_pairs_) {
auto& cfh = cfh_iter_pair.first;
auto& iter = cfh_iter_pair.second;
child_seek_func(iter.get());
if (iter->Valid()) {
assert(iter->status().ok());
if (direction_ == kReverse) {
max_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i});
} else {
min_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i});
}
} else {
considerStatus(iter->status());
}
++i;
}
}

void MultiCfIterator::Next() {
assert(Valid());
template <typename BinaryHeap>
void MultiCfIterator::AdvanceIterator(
BinaryHeap& heap, const std::function<void(Iterator*)>& advance_func) {
// 1. Keep the top iterator (by popping it from the heap)
// 2. Make sure all others have iterated past the top iterator key slice
// 3. Advance the top iterator, and add it back to the heap if valid
auto top = min_heap_.top();
min_heap_.pop();
if (!min_heap_.empty()) {
auto* current = min_heap_.top().iterator;
auto top = heap.top();
heap.pop();
if (!heap.empty()) {
auto* current = heap.top().iterator;
while (current->Valid() &&
comparator_->Compare(top.iterator->key(), current->key()) == 0) {
assert(current->status().ok());
current->Next();
advance_func(current);
if (current->Valid()) {
min_heap_.replace_top(min_heap_.top());
heap.replace_top(heap.top());
} else {
considerStatus(current->status());
min_heap_.pop();
heap.pop();
}
if (!min_heap_.empty()) {
current = min_heap_.top().iterator;
if (!heap.empty()) {
current = heap.top().iterator;
}
}
}
top.iterator->Next();
advance_func(top.iterator);
if (top.iterator->Valid()) {
assert(top.iterator->status().ok());
min_heap_.push(top);
heap.push(top);
} else {
considerStatus(top.iterator->status());
}
}

void MultiCfIterator::SeekToFirst() {
SeekCommon([](Iterator* iter) { iter->SeekToFirst(); }, kForward);
}
void MultiCfIterator::Seek(const Slice& target) {
SeekCommon([&target](Iterator* iter) { iter->Seek(target); }, kForward);
}
void MultiCfIterator::SeekToLast() {
SeekCommon([](Iterator* iter) { iter->SeekToLast(); }, kReverse);
}
void MultiCfIterator::SeekForPrev(const Slice& target) {
SeekCommon([&target](Iterator* iter) { iter->SeekForPrev(target); },
kReverse);
}

void MultiCfIterator::Next() {
assert(Valid());
if (direction_ != kForward) {
SwitchToDirection(kForward);
}
AdvanceIterator(min_heap_, [](Iterator* iter) { iter->Next(); });
}
void MultiCfIterator::Prev() {
assert(Valid());
if (direction_ != kReverse) {
SwitchToDirection(kReverse);
}
AdvanceIterator(max_heap_, [](Iterator* iter) { iter->Prev(); });
}

} // namespace ROCKSDB_NAMESPACE
100 changes: 69 additions & 31 deletions db/multi_cf_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class MultiCfIterator : public Iterator {
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: comparator_(comparator),
min_heap_(MultiCfMinHeapItemComparator(comparator_)) {
min_heap_(MultiCfMinHeapItemComparator(comparator_)),
max_heap_(MultiCfMaxHeapItemComparator(comparator_)) {
assert(column_families.size() > 0 &&
column_families.size() == child_iterators.size());
cfh_iter_pairs_.reserve(column_families.size());
Expand Down Expand Up @@ -72,18 +73,65 @@ class MultiCfIterator : public Iterator {
const Comparator* comparator_;
};

class MultiCfMaxHeapItemComparator {
public:
explicit MultiCfMaxHeapItemComparator(const Comparator* comparator)
: comparator_(comparator) {}

bool operator()(const MultiCfIteratorInfo& a,
const MultiCfIteratorInfo& b) const {
assert(a.iterator);
assert(b.iterator);
assert(a.iterator->Valid());
assert(b.iterator->Valid());
int c = comparator_->Compare(a.iterator->key(), b.iterator->key());
assert(c != 0 || a.order != b.order);
return c == 0 ? a.order - b.order > 0 : c < 0;
}

private:
const Comparator* comparator_;
};

const Comparator* comparator_;
using MultiCfMinHeap =
BinaryHeap<MultiCfIteratorInfo, MultiCfMinHeapItemComparator>;
using MultiCfMaxHeap =
BinaryHeap<MultiCfIteratorInfo, MultiCfMaxHeapItemComparator>;
MultiCfMinHeap min_heap_;
// TODO: MaxHeap for Reverse Iteration
MultiCfMaxHeap max_heap_;

enum Direction : uint8_t { kForward, kReverse };
Direction direction_ = kForward;

// TODO: Lower and Upper bounds

Iterator* current() const {
if (direction_ == kReverse) {
return max_heap_.top().iterator;
}
return min_heap_.top().iterator;
}

Slice key() const override {
assert(Valid());
return min_heap_.top().iterator->key();
return current()->key();
}
Slice value() const override {
assert(Valid());
return current()->value();
}
const WideColumns& columns() const override {
assert(Valid());
return current()->columns();
}

bool Valid() const override {
if (direction_ == kReverse) {
return !max_heap_.empty() && status_.ok();
}
return !min_heap_.empty() && status_.ok();
}
bool Valid() const override { return !min_heap_.empty() && status_.ok(); }
Status status() const override { return status_; }
void considerStatus(Status s) {
if (!s.ok() && status_.ok()) {
Expand All @@ -92,42 +140,32 @@ class MultiCfIterator : public Iterator {
}
void Reset() {
min_heap_.clear();
max_heap_.clear();
status_ = Status::OK();
}

void SeekCommon(const std::function<void(Iterator*)>& child_seek_func) {
Reset();
int i = 0;
for (auto& cfh_iter_pair : cfh_iter_pairs_) {
auto& cfh = cfh_iter_pair.first;
auto& iter = cfh_iter_pair.second;
child_seek_func(iter.get());
if (iter->Valid()) {
assert(iter->status().ok());
min_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i});
} else {
considerStatus(iter->status());
}
++i;
void SwitchToDirection(Direction new_direction) {
assert(direction_ != new_direction);
Slice target = key();
if (new_direction == kForward) {
Seek(target);
} else {
SeekForPrev(target);
}
}

void SeekCommon(const std::function<void(Iterator*)>& child_seek_func,
Direction direction);
template <typename BinaryHeap>
void AdvanceIterator(BinaryHeap& heap,
const std::function<void(Iterator*)>& advance_func);

void SeekToFirst() override;
void SeekToLast() override;
void Seek(const Slice& /*target*/) override;
void SeekForPrev(const Slice& /*target*/) override;
void Next() override;

// TODO - Implement these
void SeekToLast() override {}
void SeekForPrev(const Slice& /*target*/) override {}
void Prev() override { assert(false); }
Slice value() const override {
assert(Valid());
return min_heap_.top().iterator->value();
}
const WideColumns& columns() const override {
assert(Valid());
return min_heap_.top().iterator->columns();
}
void Prev() override;
};

} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit e7ceef4

Please sign in to comment.