Skip to content

Commit

Permalink
Address Feedback and implement merge
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykorean committed Mar 30, 2024
1 parent a25c3f6 commit e90568e
Show file tree
Hide file tree
Showing 14 changed files with 349 additions and 273 deletions.
15 changes: 3 additions & 12 deletions db/attribute_group_iterator_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,9 @@ namespace ROCKSDB_NAMESPACE {

const AttributeGroups kNoAttributeGroups;

bool AttributeGroupIteratorImpl::Valid() const { return impl_.Valid(); }
void AttributeGroupIteratorImpl::SeekToFirst() { impl_.SeekToFirst(); }
void AttributeGroupIteratorImpl::SeekToLast() { impl_.SeekToLast(); }
void AttributeGroupIteratorImpl::Seek(const Slice& target) {
impl_.Seek(target);
void AttributeGroupIteratorImpl::AddToAttributeGroups(
ColumnFamilyHandle* /*cfh*/, const WideColumns& /*columns*/) {
// TODO - Implement AttributeGroup population
}
void AttributeGroupIteratorImpl::SeekForPrev(const Slice& target) {
impl_.SeekForPrev(target);
}
void AttributeGroupIteratorImpl::Next() { impl_.Next(); }
void AttributeGroupIteratorImpl::Prev() { impl_.Prev(); }
Slice AttributeGroupIteratorImpl::key() const { return impl_.key(); }
Status AttributeGroupIteratorImpl::status() const { return impl_.status(); }

} // namespace ROCKSDB_NAMESPACE
46 changes: 23 additions & 23 deletions db/attribute_group_iterator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,8 @@

#pragma once

#include <memory>
#include <variant>

#include "db/multi_cf_iterator_impl.h"
#include "rocksdb/attribute_groups.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/heap.h"
#include "util/overload.h"

namespace ROCKSDB_NAMESPACE {

Expand All @@ -24,32 +16,40 @@ class AttributeGroupIteratorImpl : public AttributeGroupIterator {
const Comparator* comparator,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: impl_(comparator, column_families, child_iterators) {}
: impl_(
comparator, column_families, child_iterators, [this]() { Reset(); },
[this](ColumnFamilyHandle* cfh, Iterator* iter) {
AddToAttributeGroups(cfh, iter->columns());
}) {}
~AttributeGroupIteratorImpl() override {}

// No copy allowed
AttributeGroupIteratorImpl(const AttributeGroupIteratorImpl&) = delete;
AttributeGroupIteratorImpl& operator=(const AttributeGroupIteratorImpl&) =
delete;

bool Valid() const override;
void SeekToFirst() override;
void SeekToLast() override;
void Seek(const Slice& target) override;
void SeekForPrev(const Slice& target) override;
void Next() override;
void Prev() override;
Slice key() const override;
Status status() const override;
bool Valid() const override { return impl_.Valid(); }
void SeekToFirst() override { impl_.SeekToFirst(); }
void SeekToLast() override { impl_.SeekToLast(); }
void Seek(const Slice& target) override { impl_.Seek(target); }
void SeekForPrev(const Slice& target) override { impl_.SeekForPrev(target); }
void Next() override { impl_.Next(); }
void Prev() override { impl_.Prev(); }
Slice key() const override { return impl_.key(); }
Status status() const override { return impl_.status(); }

AttributeGroups attribute_groups() const override {
// TODO - Implement
assert(false);
return kNoAttributeGroups;
const AttributeGroups& attribute_groups() const override {
assert(Valid());
return attribute_groups_;
}

void Reset() { attribute_groups_.clear(); }

private:
MultiCfIteratorImpl impl_;
AttributeGroups attribute_groups_;
void AddToAttributeGroups(ColumnFamilyHandle* cfh,
const WideColumns& columns);
};

class EmptyAttributeGroupIterator : public AttributeGroupIterator {
Expand All @@ -68,7 +68,7 @@ class EmptyAttributeGroupIterator : public AttributeGroupIterator {
}
Status status() const override { return status_; }

AttributeGroups attribute_groups() const override {
const AttributeGroups& attribute_groups() const override {
return kNoAttributeGroups;
}

Expand Down
44 changes: 34 additions & 10 deletions db/coalescing_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,42 @@

#include "db/coalescing_iterator.h"

#include "db/wide/wide_columns_helper.h"

namespace ROCKSDB_NAMESPACE {

bool CoalescingIterator::Valid() const { return impl_.Valid(); }
void CoalescingIterator::SeekToFirst() { impl_.SeekToFirst(); }
void CoalescingIterator::SeekToLast() { impl_.SeekToLast(); }
void CoalescingIterator::Seek(const Slice& target) { impl_.Seek(target); }
void CoalescingIterator::SeekForPrev(const Slice& target) {
impl_.SeekForPrev(target);
void CoalescingIterator::Coalesce(const WideColumns& columns) {
WideColumns coalesced;
coalesced.reserve(wide_columns_.size() + columns.size());
auto base_cols = wide_columns_.begin();
auto new_cols = columns.begin();
while (base_cols != wide_columns_.end() && new_cols != columns.end()) {
auto comparison = base_cols->name().compare(new_cols->name());
if (comparison < 0) {
coalesced.push_back(*base_cols);
++base_cols;
} else if (comparison > 0) {
coalesced.push_back(*new_cols);
++new_cols;
} else {
coalesced.push_back(*new_cols);
++new_cols;
++base_cols;
}
}
while (base_cols != wide_columns_.end()) {
coalesced.push_back(*base_cols);
++base_cols;
}
while (new_cols != columns.end()) {
coalesced.push_back(*new_cols);
++new_cols;
}
wide_columns_.swap(coalesced);

if (WideColumnsHelper::HasDefaultColumn(wide_columns_)) {
value_ = WideColumnsHelper::GetDefaultColumn(wide_columns_);
}
}
void CoalescingIterator::Next() { impl_.Next(); }
void CoalescingIterator::Prev() { impl_.Prev(); }
Slice CoalescingIterator::key() const { return impl_.key(); }
Status CoalescingIterator::status() const { return impl_.status(); }

} // namespace ROCKSDB_NAMESPACE
66 changes: 28 additions & 38 deletions db/coalescing_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,66 +8,56 @@
#include <variant>

#include "db/multi_cf_iterator_impl.h"
#include "multi_cf_iterator_impl.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/heap.h"
#include "util/overload.h"

namespace ROCKSDB_NAMESPACE {

// UNDER CONSTRUCTION - DO NOT USE
// A cross-column-family iterator from a consistent database state.
// If a key exists in more than one column family, it chooses value/columns
// based on the coalescing rule provided by CoalescingOptions. See
// CoalescingOptions in options.h for details
class CoalescingIterator : public Iterator {
public:
CoalescingIterator(const CoalescingOptions coalesing_options,
const Comparator* comparator,
CoalescingIterator(const Comparator* comparator,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: impl_(comparator, column_families, child_iterators),
rule_(coalesing_options.rule) {}
: impl_(
comparator, column_families, child_iterators, [this]() { Reset(); },
[this](ColumnFamilyHandle*, Iterator* iter) {
Coalesce(iter->columns());
}) {}
~CoalescingIterator() override {}

// No copy allowed
CoalescingIterator(const CoalescingIterator&) = delete;
CoalescingIterator& operator=(const CoalescingIterator&) = delete;

bool Valid() const override;
void SeekToFirst() override;
void SeekToLast() override;
void Seek(const Slice& target) override;
void SeekForPrev(const Slice& target) override;
void Next() override;
void Prev() override;
Slice key() const override;
Status status() const override;
bool Valid() const override { return impl_.Valid(); }
void SeekToFirst() override { impl_.SeekToFirst(); }
void SeekToLast() override { impl_.SeekToLast(); }
void Seek(const Slice& target) override { impl_.Seek(target); }
void SeekForPrev(const Slice& target) override { impl_.SeekForPrev(target); }
void Next() override { impl_.Next(); }
void Prev() override { impl_.Prev(); }
Slice key() const override { return impl_.key(); }
Status status() const override { return impl_.status(); }

Slice value() const override {
if (rule_ == CoalescingRule::kChooseFromFirstCfContainingKey) {
assert(Valid());
return impl_.current()->value();
}
// TODO - add more rules
assert(false);
return Slice();
assert(Valid());
return value_;
}
const WideColumns& columns() const override {
if (rule_ == CoalescingRule::kChooseFromFirstCfContainingKey) {
assert(Valid());
return impl_.current()->columns();
}
// TODO - add more rules
assert(false);
return kNoWideColumns;
assert(Valid());
return wide_columns_;
}

void Reset() {
value_.clear();
wide_columns_.clear();
}

private:
MultiCfIteratorImpl impl_;
CoalescingRule rule_;
Slice value_;
WideColumns wide_columns_;

void Coalesce(const WideColumns& columns);
};

} // namespace ROCKSDB_NAMESPACE
83 changes: 40 additions & 43 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3747,54 +3747,49 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(

std::unique_ptr<Iterator> DBImpl::NewCoalescingIterator(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
const CoalescingOptions& coalesing_options) {
if (column_families.size() == 0) {
return std::unique_ptr<Iterator>(NewErrorIterator(
Status::InvalidArgument("No Column Family was provided")));
}
const Comparator* first_comparator = column_families[0]->GetComparator();
for (size_t i = 1; i < column_families.size(); ++i) {
const Comparator* cf_comparator = column_families[i]->GetComparator();
if (first_comparator != cf_comparator &&
first_comparator->GetId().compare(cf_comparator->GetId()) != 0) {
return std::unique_ptr<Iterator>(NewErrorIterator(Status::InvalidArgument(
"Different comparators are being used across CFs")));
}
}
const std::vector<ColumnFamilyHandle*>& column_families) {
std::vector<Iterator*> child_iterators;
Status s = NewIterators(_read_options, column_families, &child_iterators);
if (s.ok()) {
return std::make_unique<CoalescingIterator>(
coalesing_options, first_comparator, column_families,
std::move(child_iterators));
Status s = GetChildIteratorsForMultiCfIterator(_read_options, column_families,
&child_iterators);
if (!s.ok()) {
return std::unique_ptr<Iterator>(NewErrorIterator(s));
}
return std::unique_ptr<Iterator>(NewErrorIterator(s));
return std::make_unique<CoalescingIterator>(
column_families[0]->GetComparator(), column_families,
std::move(child_iterators));
}

std::unique_ptr<AttributeGroupIterator> DBImpl::NewAttributeGroupIterator(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families) {
std::vector<Iterator*> child_iterators;
Status s = GetChildIteratorsForMultiCfIterator(_read_options, column_families,
&child_iterators);
if (!s.ok()) {
return NewAttributeGroupErrorIterator(s);
}
return std::make_unique<AttributeGroupIteratorImpl>(
column_families[0]->GetComparator(), column_families,
std::move(child_iterators));
}

Status DBImpl::GetChildIteratorsForMultiCfIterator(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* child_iterators) {
if (column_families.size() == 0) {
return NewAttributeGroupErrorIterator(
Status::InvalidArgument("No Column Family was provided"));
return Status::InvalidArgument("No Column Family was provided");
}
const Comparator* first_comparator = column_families[0]->GetComparator();
for (size_t i = 1; i < column_families.size(); ++i) {
const Comparator* cf_comparator = column_families[i]->GetComparator();
if (first_comparator != cf_comparator &&
first_comparator->GetId().compare(cf_comparator->GetId()) != 0) {
return NewAttributeGroupErrorIterator(Status::InvalidArgument(
"Different comparators are being used across CFs"));
return Status::InvalidArgument(
"Different comparators are being used across CFs");
}
}
std::vector<Iterator*> child_iterators;
Status s = NewIterators(_read_options, column_families, &child_iterators);
if (s.ok()) {
return std::make_unique<AttributeGroupIteratorImpl>(
first_comparator, column_families, std::move(child_iterators));
}
return std::make_unique<EmptyAttributeGroupIterator>(s);
return NewIterators(read_options, column_families, child_iterators);
}

Status DBImpl::NewIterators(
Expand Down Expand Up @@ -3871,8 +3866,8 @@ Status DBImpl::NewIterators(
}
} else {
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterators is overridden in
// WritePreparedTxnDB
// last_seq_same_as_publish_seq_==false since NewIterators is overridden
// in WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
Expand Down Expand Up @@ -3996,8 +3991,8 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
std::shared_ptr<const SnapshotImpl> latest =
timestamped_snapshots_.GetSnapshot(std::numeric_limits<uint64_t>::max());

// If there is already a latest timestamped snapshot, then we need to do some
// checks.
// If there is already a latest timestamped snapshot, then we need to do
// some checks.
if (latest) {
uint64_t latest_snap_ts = latest->GetTimestamp();
SequenceNumber latest_snap_seq = latest->GetSequenceNumber();
Expand All @@ -4006,8 +4001,8 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
Status status;
std::shared_ptr<const SnapshotImpl> ret;
if (latest_snap_ts > ts) {
// A snapshot created later cannot have smaller timestamp than a previous
// timestamped snapshot.
// A snapshot created later cannot have smaller timestamp than a
// previous timestamped snapshot.
needs_create_snap = false;
std::ostringstream oss;
oss << "snapshot exists with larger timestamp " << latest_snap_ts << " > "
Expand Down Expand Up @@ -4121,7 +4116,8 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {

// Calculate a new threshold, skipping those CFs where compactions are
// scheduled. We do not do the same pass as the previous loop because
// mutex might be unlocked during the loop, making the result inaccurate.
// mutex might be unlocked during the loop, making the result
// inaccurate.
SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
for (auto* cfd : *versions_->GetColumnFamilySet()) {
if (CfdListContains(cf_scheduled, cfd) ||
Expand Down Expand Up @@ -4549,7 +4545,8 @@ Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
sizes[i] = 0;
if (options.include_files) {
sizes[i] += versions_->ApproximateSize(
options, read_options, v, k1.Encode(), k2.Encode(), /*start_level=*/0,
options, read_options, v, k1.Encode(), k2.Encode(),
/*start_level=*/0,
/*end_level=*/-1, TableReaderCaller::kUserApproximateSize);
}
if (options.include_memtables) {
Expand Down Expand Up @@ -4834,9 +4831,9 @@ void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* sv = GetAndRefSuperVersion(cfd);
{
// Without mutex, Version::GetColumnFamilyMetaData will have data race with
// Compaction::MarkFilesBeingCompacted. One solution is to use mutex, but
// this may cause regression. An alternative is to make
// Without mutex, Version::GetColumnFamilyMetaData will have data race
// with Compaction::MarkFilesBeingCompacted. One solution is to use mutex,
// but this may cause regression. An alternative is to make
// FileMetaData::being_compacted atomic, but it will make FileMetaData
// non-copy-able. Another option is to separate these variables from
// original FileMetaData struct, and this requires re-organization of data
Expand Down
Loading

0 comments on commit e90568e

Please sign in to comment.