Skip to content

Commit

Permalink
Make BaseDeltaIterator honor allow_unprepared_value (#13111)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #13111

As a follow-up to #13105, the patch changes `BaseDeltaIterator` so that it honors the read option `allow_unprepared_value`. When the option is set and the `BaseDeltaIterator` lands on the base iterator, it defers calling `PrepareValue` on the base iterator and setting `value()` and `columns()` until `PrepareValue` is called on the `BaseDeltaIterator` itself.

Reviewed By: jowlyzhang

Differential Revision: D65344764

fbshipit-source-id: d79c77b5de7c690bf2deeff435e9b0a9065f6c5c
  • Loading branch information
ltamasi authored and facebook-github-bot committed Nov 1, 2024
1 parent 7c98a2d commit 1006edd
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`BaseDeltaIterator` now honors the read option `allow_unprepared_value`.
6 changes: 4 additions & 2 deletions utilities/write_batch_with_index/write_batch_with_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,15 +329,17 @@ Iterator* WriteBatchWithIndex::NewIteratorWithBase(
}

return new BaseDeltaIterator(column_family, base_iterator, wbwiii,
GetColumnFamilyUserComparator(column_family));
GetColumnFamilyUserComparator(column_family),
read_options);
}

Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
// default column family's comparator
auto wbwiii = new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch,
&rep->comparator);
return new BaseDeltaIterator(nullptr, base_iterator, wbwiii,
rep->comparator.default_comparator());
rep->comparator.default_comparator(),
/* read_options */ nullptr);
}

Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ namespace ROCKSDB_NAMESPACE {
BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family,
Iterator* base_iterator,
WBWIIteratorImpl* delta_iterator,
const Comparator* comparator)
const Comparator* comparator,
const ReadOptions* read_options)
: forward_(true),
current_at_base_(true),
equal_keys_(false),
allow_unprepared_value_(
read_options ? read_options->allow_unprepared_value : false),
status_(Status::OK()),
column_family_(column_family),
base_iterator_(base_iterator),
Expand Down Expand Up @@ -424,7 +427,9 @@ void BaseDeltaIterator::UpdateCurrent() {
} else if (!DeltaValid()) {
// Delta has finished.
current_at_base_ = true;
SetValueAndColumnsFromBase();
if (!allow_unprepared_value_) {
SetValueAndColumnsFromBase();
}
return;
} else {
int compare =
Expand All @@ -448,7 +453,9 @@ void BaseDeltaIterator::UpdateCurrent() {
}
} else {
current_at_base_ = true;
SetValueAndColumnsFromBase();
if (!allow_unprepared_value_) {
SetValueAndColumnsFromBase();
}
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class BaseDeltaIterator : public Iterator {
public:
BaseDeltaIterator(ColumnFamilyHandle* column_family, Iterator* base_iterator,
WBWIIteratorImpl* delta_iterator,
const Comparator* comparator);
const Comparator* comparator,
const ReadOptions* read_options);

~BaseDeltaIterator() override;

Expand All @@ -47,6 +48,23 @@ class BaseDeltaIterator : public Iterator {
void SeekForPrev(const Slice& k) override;
void Next() override;
void Prev() override;

bool PrepareValue() override {
assert(Valid());

if (!allow_unprepared_value_) {
return true;
}

if (!current_at_base_) {
return true;
}

SetValueAndColumnsFromBase();

return Valid();
}

Slice key() const override;
Slice value() const override { return value_; }
const WideColumns& columns() const override { return columns_; }
Expand All @@ -69,6 +87,7 @@ class BaseDeltaIterator : public Iterator {
bool forward_;
bool current_at_base_;
bool equal_keys_;
bool allow_unprepared_value_;
Status status_;
ColumnFamilyHandle* column_family_;
std::unique_ptr<Iterator> base_iterator_;
Expand Down
101 changes: 99 additions & 2 deletions utilities/write_batch_with_index/write_batch_with_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1742,8 +1742,6 @@ TEST_P(WriteBatchWithIndexTest, TestNewIteratorWithBaseFromWbwi) {
TEST_P(WriteBatchWithIndexTest, NewIteratorWithBasePrepareValue) {
// BaseDeltaIterator by default should call PrepareValue if it lands on the
// base iterator in case it was created with allow_unprepared_value=true.
// (Note that BaseDeltaIterator itself does not support allow_unprepared_value
// yet but the base iterator might have been created with the flag set.)
ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator());
KVMap map{{"a", "aa"}, {"c", "cc"}, {"e", "ee"}};

Expand Down Expand Up @@ -1814,6 +1812,105 @@ TEST_P(WriteBatchWithIndexTest, NewIteratorWithBasePrepareValue) {
}
}

TEST_P(WriteBatchWithIndexTest, NewIteratorWithBaseAllowUnpreparedValue) {
ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator());
KVMap map{{"a", "aa"}, {"c", "cc"}, {"e", "ee"}};

ASSERT_OK(batch_->Put(&cf1, "c", "cc1"));

ReadOptions read_options = read_opts_;
read_options.allow_unprepared_value = true;

{
std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(
&cf1, new KVIter(&map, /* allow_unprepared_value */ true),
&read_options));

iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "a");
ASSERT_TRUE(iter->value().empty());
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "aa");

iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "c");
ASSERT_EQ(iter->value(), "cc1");
// This key is served out of the delta iterator so this PrepareValue() is a
// no-op
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "cc1");

iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "e");
ASSERT_TRUE(iter->value().empty());
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "ee");

iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());

iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "e");
ASSERT_TRUE(iter->value().empty());
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "ee");

iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "c");
ASSERT_EQ(iter->value(), "cc1");
// This key is served out of the delta iterator so this PrepareValue() is a
// no-op
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "cc1");

iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), "a");
ASSERT_TRUE(iter->value().empty());
ASSERT_TRUE(iter->PrepareValue());
ASSERT_EQ(iter->value(), "aa");

iter->Prev();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
}

// PrepareValue failures from the base iterator should be propagated
{
std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(
&cf1,
new KVIter(&map, /* allow_unprepared_value */ true,
/* fail_prepare_value */ true),
&read_options));

iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_FALSE(iter->PrepareValue());
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());

iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_FALSE(iter->PrepareValue());
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
}
}

TEST_P(WriteBatchWithIndexTest, NewIteratorWithBaseMergePrepareValue) {
// When performing a merge across the base and delta iterators,
// BaseDeltaIterator should call PrepareValue on the base iterator in case it
Expand Down

0 comments on commit 1006edd

Please sign in to comment.