Best-effort recovery support for atomic flush (facebook#12406)
Summary:
This PR updates `VersionEditHandlerPointInTime` to recover all or none of the updates in an AtomicGroup. This lets best-effort recovery handle atomic flushes correctly, so `atomic_flush` and `best_efforts_recovery` can now be enabled together.

The new logic requires that AtomicGroups do not contain column family additions or removals. AtomicGroups are currently written for atomic flush, which does not include such edits.

Column family additions or removals are recovered independently of AtomicGroups. The new logic needs to be aware of removal, though, so that a dropped CF does not prevent completion of an AtomicGroup recovery.

The new logic treats each AtomicGroup as if it contains updates for all existing column families, even though it is possible to create AtomicGroups that only affect a subset of column families. This simplifies the logic at the expense of recovering less data in certain edge cases.
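
The all-or-none behavior described above can be sketched in isolation. The following is a simplified model, not the code from this PR: `AtomicGroupBarrier`, `VersionLabel`, and `Published` are hypothetical names, a string stands in for `Version*`, and the `Published` map plays the role of `versions_`. It assumes, as the PR does, that every column family alive at the start of a group participates in it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <utility>

// Illustrative model only: a string label stands in for a RocksDB `Version*`.
using VersionLabel = std::string;
// Plays the role of `versions_` (the recovered state per column family).
using Published = std::map<uint32_t, VersionLabel>;

// Hypothetical class (not part of RocksDB) mirroring the role of
// `atomic_update_versions_` and `atomic_update_versions_missing_`.
class AtomicGroupBarrier {
 public:
  // Called when an AtomicGroup starts. Every live column family is assumed
  // to participate; leftovers from an incomplete older group are discarded.
  void Begin(const std::set<uint32_t>& live_cfs) {
    pending_.clear();
    for (uint32_t cfid : live_cfs) {
      pending_[cfid] = std::make_pair(false, VersionLabel());
    }
    missing_ = pending_.size();
  }

  // Records a recovered version. While a group is pending the version is
  // buffered; otherwise it is published immediately.
  void Put(uint32_t cfid, const VersionLabel& version, Published* published) {
    auto it = pending_.find(cfid);
    if (it == pending_.end()) {
      (*published)[cfid] = version;
      return;
    }
    if (!it->second.first) {
      --missing_;
    }
    it->second = std::make_pair(true, version);
    ApplyIfComplete(published);
  }

  // A dropped column family stops blocking the group and loses its version.
  void DropCf(uint32_t cfid, Published* published) {
    auto it = pending_.find(cfid);
    if (it != pending_.end()) {
      if (!it->second.first) {
        --missing_;
      }
      pending_.erase(it);
      ApplyIfComplete(published);
    }
    published->erase(cfid);
  }

 private:
  // Publishes all buffered versions at once, but only when none is missing.
  void ApplyIfComplete(Published* published) {
    if (missing_ != 0) {
      return;
    }
    for (const auto& kv : pending_) {
      (*published)[kv.first] = kv.second.second;
    }
    pending_.clear();
  }

  // cfid -> (has a buffered version?, the buffered version)
  std::map<uint32_t, std::pair<bool, VersionLabel>> pending_;
  std::size_t missing_ = 0;
};
```

In this model, a group started over column families 0, 1, and 2 publishes nothing after versions arrive for 0 and 1 only. Dropping column family 2 then completes the group, and both buffered versions are published together.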

The usage of `MaybeCreateVersion()` is subtle. The goal is to create a barrier at the start of an AtomicGroup such that all valid states up to that point will be applied to `versions_`. In summary:

- `MaybeCreateVersion(..., false)` creates a `Version` on a negative edge trigger (transition from valid to invalid). It was previously called when applying each update. Now, it is only called when applying non-AtomicGroup updates.
- `MaybeCreateVersion(..., true)` creates a `Version` on a positive level trigger (valid state). It was previously called only at the end of iteration. Now, it is additionally called before processing an AtomicGroup.
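
The two triggers can be written out as a single predicate. This is a minimal sketch modeled on the condition in `MaybeCreateVersion()` in the diff. `CfState` and `ShouldCreateVersion` are hypothetical names introduced here, and the real check also requires `s.ok()` and `!missing_info`, which are left out for brevity.

```cpp
#include <cassert>

// Hypothetical snapshot of one column family around a version edit.
struct CfState {
  bool prev_has_missing_files;  // state before applying the edit
  bool has_missing_files;       // state after applying the edit
};

// Returns true when a Version should be saved for this column family.
// Hypothetical helper; the status and missing-info checks are omitted.
inline bool ShouldCreateVersion(const CfState& st, bool force_create_version,
                                bool in_atomic_group) {
  // No Version is created while an AtomicGroup is being replayed.
  if (in_atomic_group) {
    return false;
  }
  // Negative edge trigger: the state goes from valid to invalid, so the
  // last valid state is saved before it is lost.
  const bool negative_edge =
      st.has_missing_files && !st.prev_has_missing_files;
  // Positive level trigger: the state is valid and the caller forces a save.
  const bool positive_level = !st.has_missing_files && force_create_version;
  return negative_edge || positive_level;
}
```

With `force_create_version` set to `false`, only a transition into a state with missing files creates a `Version`. With it set to `true`, any state without missing files does, which is what makes the call before an AtomicGroup act as a barrier.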

Pull Request resolved: facebook#12406

Reviewed By: jaykorean, cbi42

Differential Revision: D54494904

Pulled By: ajkr

fbshipit-source-id: 0114a9fe1d04b471d086dcab5978ea8a3a56ad52
ajkr authored and facebook-github-bot committed Mar 6, 2024
1 parent 583fded commit 27a2473
Showing 7 changed files with 704 additions and 117 deletions.
6 changes: 0 additions & 6 deletions db/db_impl/db_impl_open.cc
@@ -281,12 +281,6 @@ Status DBImpl::ValidateOptions(const DBOptions& db_options) {
"atomic_flush is incompatible with enable_pipelined_write");
}

// TODO remove this restriction
if (db_options.atomic_flush && db_options.best_efforts_recovery) {
return Status::InvalidArgument(
"atomic_flush is currently incompatible with best-efforts recovery");
}

if (db_options.use_direct_io_for_flush_and_compaction &&
0 == db_options.writable_file_max_buffer_size) {
return Status::InvalidArgument(
208 changes: 172 additions & 36 deletions db/version_edit_handler.cc
@@ -34,37 +34,36 @@ void VersionEditHandlerBase::Iterate(log::Reader& reader,
reader.ReadRecord(&record, &scratch) && log_read_status->ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}

s = read_buffer_.AddEdit(&edit);
if (!s.ok()) {
break;
if (s.ok()) {
s = read_buffer_.AddEdit(&edit);
}
ColumnFamilyData* cfd = nullptr;
if (edit.IsInAtomicGroup()) {
if (read_buffer_.IsFull()) {
for (auto& e : read_buffer_.replay_buffer()) {
s = ApplyVersionEdit(e, &cfd);
if (!s.ok()) {
break;
if (s.ok()) {
ColumnFamilyData* cfd = nullptr;
if (edit.IsInAtomicGroup()) {
if (read_buffer_.IsFull()) {
s = OnAtomicGroupReplayBegin();
for (size_t i = 0; s.ok() && i < read_buffer_.replay_buffer().size();
i++) {
auto& e = read_buffer_.replay_buffer()[i];
s = ApplyVersionEdit(e, &cfd);
if (s.ok()) {
recovered_edits++;
}
}
if (s.ok()) {
read_buffer_.Clear();
s = OnAtomicGroupReplayEnd();
}
++recovered_edits;
}
if (!s.ok()) {
break;
} else {
s = ApplyVersionEdit(edit, &cfd);
if (s.ok()) {
recovered_edits++;
}
read_buffer_.Clear();
}
} else {
s = ApplyVersionEdit(edit, &cfd);
if (s.ok()) {
++recovered_edits;
}
}
}
if (!log_read_status->ok()) {
if (s.ok() && !log_read_status->ok()) {
s = *log_read_status;
}

@@ -735,12 +734,80 @@ VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
read_options, epoch_number_requirement) {}

VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() {
for (const auto& cfid_and_version : atomic_update_versions_) {
delete cfid_and_version.second;
}
for (const auto& elem : versions_) {
delete elem.second;
}
versions_.clear();
}

Status VersionEditHandlerPointInTime::OnAtomicGroupReplayBegin() {
if (in_atomic_group_) {
return Status::Corruption("unexpected AtomicGroup start");
}

// The AtomicGroup that is about to begin may block column families in a valid
// state from saving any more updates. So we should save any valid states
// before proceeding.
for (const auto& cfid_and_builder : builders_) {
ColumnFamilyData* cfd = version_set_->GetColumnFamilySet()->GetColumnFamily(
cfid_and_builder.first);
assert(!cfd->IsDropped());
assert(cfd->initialized());
VersionEdit edit;
Status s = MaybeCreateVersion(edit, cfd, true /* force_create_version */);
if (!s.ok()) {
return s;
}
}

// An old AtomicGroup is incomplete. Throw away the versions that failed to
// complete it. They must not be used for completing the upcoming
// AtomicGroup since they are too old.
for (auto& cfid_and_version : atomic_update_versions_) {
delete cfid_and_version.second;
}

in_atomic_group_ = true;
// We lazily assume the column families that exist at this point are all
// involved in the AtomicGroup. Overestimating the scope of the AtomicGroup
// will sometimes cause less data to be recovered, which is fine for
// best-effort recovery.
atomic_update_versions_.clear();
for (const auto& cfid_and_builder : builders_) {
atomic_update_versions_[cfid_and_builder.first] = nullptr;
}
atomic_update_versions_missing_ = atomic_update_versions_.size();
return Status::OK();
}

Status VersionEditHandlerPointInTime::OnAtomicGroupReplayEnd() {
if (!in_atomic_group_) {
return Status::Corruption("unexpected AtomicGroup end");
}
in_atomic_group_ = false;

// The AtomicGroup must not have changed the column families. We don't support
// CF adds or drops in an AtomicGroup.
if (builders_.size() != atomic_update_versions_.size()) {
return Status::Corruption("unexpected CF change in AtomicGroup");
}
for (const auto& cfid_and_builder : builders_) {
if (atomic_update_versions_.find(cfid_and_builder.first) ==
atomic_update_versions_.end()) {
return Status::Corruption("unexpected CF add in AtomicGroup");
}
}
for (const auto& cfid_and_version : atomic_update_versions_) {
if (builders_.find(cfid_and_version.first) == builders_.end()) {
return Status::Corruption("unexpected CF drop in AtomicGroup");
}
}
return Status::OK();
}

void VersionEditHandlerPointInTime::CheckIterationResult(
const log::Reader& reader, Status* s) {
VersionEditHandler::CheckIterationResult(reader, s);
@@ -770,7 +837,14 @@ void VersionEditHandlerPointInTime::CheckIterationResult(
ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup(
const VersionEdit& edit) {
ColumnFamilyData* cfd = VersionEditHandler::DestroyCfAndCleanup(edit);
auto v_iter = versions_.find(edit.GetColumnFamily());
uint32_t cfid = edit.GetColumnFamily();
if (AtomicUpdateVersionsContains(cfid)) {
AtomicUpdateVersionsDropCf(cfid);
if (AtomicUpdateVersionsCompleted()) {
AtomicUpdateVersionsApply();
}
}
auto v_iter = versions_.find(cfid);
if (v_iter != versions_.end()) {
delete v_iter->second;
versions_.erase(v_iter);
@@ -871,15 +945,16 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(

// Create version before apply edit. The version will represent the state
// before applying the version edit.
// A new version will created if:
// A new version will be created if:
// 1) no error has occurred so far, and
// 2) log_number_, next_file_number_ and last_sequence_ are known, and
// 3) any of the following:
// 3) not in an AtomicGroup
// 4) any of the following:
// a) no missing file before, but will have missing file(s) after applying
// this version edit.
// b) no missing file after applying the version edit, and the caller
// explicitly request that a new version be created.
if (s.ok() && !missing_info &&
if (s.ok() && !missing_info && !in_atomic_group_ &&
((has_missing_files && !prev_has_missing_files) ||
(!has_missing_files && force_create_version))) {
if (!builder) {
@@ -908,15 +983,22 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
}
s = builder->SaveTo(version->storage_info());
if (s.ok()) {
version->PrepareAppend(
*cfd->GetLatestMutableCFOptions(), read_options_,
!version_set_->db_options_->skip_stats_update_on_db_open);
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
delete v_iter->second;
v_iter->second = version;
if (AtomicUpdateVersionsContains(cfd->GetID())) {
AtomicUpdateVersionsPut(version);
if (AtomicUpdateVersionsCompleted()) {
AtomicUpdateVersionsApply();
}
} else {
versions_.emplace(cfd->GetID(), version);
version->PrepareAppend(
*cfd->GetLatestMutableCFOptions(), read_options_,
!version_set_->db_options_->skip_stats_update_on_db_open);
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
delete v_iter->second;
v_iter->second = version;
} else {
versions_.emplace(cfd->GetID(), version);
}
}
} else {
delete version;
@@ -956,6 +1038,60 @@ Status VersionEditHandlerPointInTime::LoadTables(
return Status::OK();
}

bool VersionEditHandlerPointInTime::AtomicUpdateVersionsCompleted() {
return atomic_update_versions_missing_ == 0;
}

bool VersionEditHandlerPointInTime::AtomicUpdateVersionsContains(
uint32_t cfid) {
return atomic_update_versions_.find(cfid) != atomic_update_versions_.end();
}

void VersionEditHandlerPointInTime::AtomicUpdateVersionsDropCf(uint32_t cfid) {
assert(!AtomicUpdateVersionsCompleted());
auto atomic_update_versions_iter = atomic_update_versions_.find(cfid);
assert(atomic_update_versions_iter != atomic_update_versions_.end());
if (atomic_update_versions_iter->second == nullptr) {
atomic_update_versions_missing_--;
} else {
delete atomic_update_versions_iter->second;
}
atomic_update_versions_.erase(atomic_update_versions_iter);
}

void VersionEditHandlerPointInTime::AtomicUpdateVersionsPut(Version* version) {
assert(!AtomicUpdateVersionsCompleted());
auto atomic_update_versions_iter =
atomic_update_versions_.find(version->cfd()->GetID());
assert(atomic_update_versions_iter != atomic_update_versions_.end());
if (atomic_update_versions_iter->second == nullptr) {
atomic_update_versions_missing_--;
} else {
delete atomic_update_versions_iter->second;
}
atomic_update_versions_iter->second = version;
}

void VersionEditHandlerPointInTime::AtomicUpdateVersionsApply() {
assert(AtomicUpdateVersionsCompleted());
for (const auto& cfid_and_version : atomic_update_versions_) {
uint32_t cfid = cfid_and_version.first;
Version* version = cfid_and_version.second;
assert(version != nullptr);
version->PrepareAppend(
*version->cfd()->GetLatestMutableCFOptions(), read_options_,
!version_set_->db_options_->skip_stats_update_on_db_open);
auto versions_iter = versions_.find(cfid);
if (versions_iter != versions_.end()) {
delete versions_iter->second;
versions_iter->second = version;
} else {
versions_.emplace(cfid, version);
}
}
atomic_update_versions_.clear();
}

Status ManifestTailer::Initialize() {
if (Mode::kRecovery == mode_) {
return VersionEditHandler::Initialize();
35 changes: 35 additions & 0 deletions db/version_edit_handler.h
@@ -40,6 +40,9 @@ class VersionEditHandlerBase {
virtual Status ApplyVersionEdit(VersionEdit& edit,
ColumnFamilyData** cfd) = 0;

virtual Status OnAtomicGroupReplayBegin() { return Status::OK(); }
virtual Status OnAtomicGroupReplayEnd() { return Status::OK(); }

virtual void CheckIterationResult(const log::Reader& /*reader*/,
Status* /*s*/) {}

@@ -237,8 +240,16 @@ class VersionEditHandlerPointInTime : public VersionEditHandler {
~VersionEditHandlerPointInTime() override;

protected:
Status OnAtomicGroupReplayBegin() override;
Status OnAtomicGroupReplayEnd() override;
void CheckIterationResult(const log::Reader& reader, Status* s) override;

ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit) override;
// `MaybeCreateVersion(..., false)` creates a version upon a negative edge
// trigger (transition from valid to invalid).
//
// `MaybeCreateVersion(..., true)` creates a version on a positive level
// trigger (state is valid).
Status MaybeCreateVersion(const VersionEdit& edit, ColumnFamilyData* cfd,
bool force_create_version) override;
virtual Status VerifyFile(ColumnFamilyData* cfd, const std::string& fpath,
@@ -251,6 +262,30 @@ class VersionEditHandlerPointInTime : public VersionEditHandler {
bool is_initial_load) override;

std::unordered_map<uint32_t, Version*> versions_;

// `atomic_update_versions_` is for ensuring all-or-nothing AtomicGroup
// recoveries. When `atomic_update_versions_` is nonempty, it serves as a
// barrier to updating `versions_` until all its values are populated.
std::unordered_map<uint32_t, Version*> atomic_update_versions_;
// `atomic_update_versions_missing_` counts the nullptr values in
// `atomic_update_versions_`.
size_t atomic_update_versions_missing_;

bool in_atomic_group_ = false;

private:
bool AtomicUpdateVersionsCompleted();
bool AtomicUpdateVersionsContains(uint32_t cfid);
void AtomicUpdateVersionsDropCf(uint32_t cfid);

// This function is called for `Version*` updates for column families in an
// incomplete atomic update. It buffers `Version*` updates in
// `atomic_update_versions_`.
void AtomicUpdateVersionsPut(Version* version);

// This function is called upon completion of an atomic update. It applies
// `Version*` updates in `atomic_update_versions_` to `versions_`.
void AtomicUpdateVersionsApply();
};

class ManifestTailer : public VersionEditHandlerPointInTime {