diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index d6ac98c86bd..35af4dcbef8 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -281,12 +281,6 @@ Status DBImpl::ValidateOptions(const DBOptions& db_options) { "atomic_flush is incompatible with enable_pipelined_write"); } - // TODO remove this restriction - if (db_options.atomic_flush && db_options.best_efforts_recovery) { - return Status::InvalidArgument( - "atomic_flush is currently incompatible with best-efforts recovery"); - } - if (db_options.use_direct_io_for_flush_and_compaction && 0 == db_options.writable_file_max_buffer_size) { return Status::InvalidArgument( diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 874f6b2a2c2..42bdcd3c33a 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -34,37 +34,36 @@ void VersionEditHandlerBase::Iterate(log::Reader& reader, reader.ReadRecord(&record, &scratch) && log_read_status->ok()) { VersionEdit edit; s = edit.DecodeFrom(record); - if (!s.ok()) { - break; - } - - s = read_buffer_.AddEdit(&edit); - if (!s.ok()) { - break; + if (s.ok()) { + s = read_buffer_.AddEdit(&edit); } - ColumnFamilyData* cfd = nullptr; - if (edit.IsInAtomicGroup()) { - if (read_buffer_.IsFull()) { - for (auto& e : read_buffer_.replay_buffer()) { - s = ApplyVersionEdit(e, &cfd); - if (!s.ok()) { - break; + if (s.ok()) { + ColumnFamilyData* cfd = nullptr; + if (edit.IsInAtomicGroup()) { + if (read_buffer_.IsFull()) { + s = OnAtomicGroupReplayBegin(); + for (size_t i = 0; s.ok() && i < read_buffer_.replay_buffer().size(); + i++) { + auto& e = read_buffer_.replay_buffer()[i]; + s = ApplyVersionEdit(e, &cfd); + if (s.ok()) { + recovered_edits++; + } + } + if (s.ok()) { + read_buffer_.Clear(); + s = OnAtomicGroupReplayEnd(); } - ++recovered_edits; } - if (!s.ok()) { - break; + } else { + s = ApplyVersionEdit(edit, &cfd); + if (s.ok()) { + recovered_edits++; } - read_buffer_.Clear(); - } - } else { - s = ApplyVersionEdit(edit, &cfd); - if (s.ok()) { - ++recovered_edits; } } } - if (!log_read_status->ok()) { + if (s.ok() && !log_read_status->ok()) { s = *log_read_status; } @@ -735,12 +734,80 @@ VersionEditHandlerPointInTime::VersionEditHandlerPointInTime( read_options, epoch_number_requirement) {} VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() { + for (const auto& cfid_and_version : atomic_update_versions_) { + delete cfid_and_version.second; + } for (const auto& elem : versions_) { delete elem.second; } versions_.clear(); } +Status VersionEditHandlerPointInTime::OnAtomicGroupReplayBegin() { + if (in_atomic_group_) { + return Status::Corruption("unexpected AtomicGroup start"); + } + + // The AtomicGroup that is about to begin may block column families in a valid + // state from saving any more updates. So we should save any valid states + // before proceeding. + for (const auto& cfid_and_builder : builders_) { + ColumnFamilyData* cfd = version_set_->GetColumnFamilySet()->GetColumnFamily( + cfid_and_builder.first); + assert(!cfd->IsDropped()); + assert(cfd->initialized()); + VersionEdit edit; + Status s = MaybeCreateVersion(edit, cfd, true /* force_create_version */); + if (!s.ok()) { + return s; + } + } + + // An old AtomicGroup is incomplete. Throw away the versions that failed to + // complete it. They must not be used for completing the upcoming + // AtomicGroup since they are too old. + for (auto& cfid_and_version : atomic_update_versions_) { + delete cfid_and_version.second; + } + + in_atomic_group_ = true; + // We lazily assume the column families that exist at this point are all + // involved in the AtomicGroup. Overestimating the scope of the AtomicGroup + // will sometimes cause less data to be recovered, which is fine for + // best-effort recovery. + atomic_update_versions_.clear(); + for (const auto& cfid_and_builder : builders_) { + atomic_update_versions_[cfid_and_builder.first] = nullptr; + } + atomic_update_versions_missing_ = atomic_update_versions_.size(); + return Status::OK(); +} + +Status VersionEditHandlerPointInTime::OnAtomicGroupReplayEnd() { + if (!in_atomic_group_) { + return Status::Corruption("unexpected AtomicGroup end"); + } + in_atomic_group_ = false; + + // The AtomicGroup must not have changed the column families. We don't support + // CF adds or drops in an AtomicGroup. + if (builders_.size() != atomic_update_versions_.size()) { + return Status::Corruption("unexpected CF change in AtomicGroup"); + } + for (const auto& cfid_and_builder : builders_) { + if (atomic_update_versions_.find(cfid_and_builder.first) == + atomic_update_versions_.end()) { + return Status::Corruption("unexpected CF add in AtomicGroup"); + } + } + for (const auto& cfid_and_version : atomic_update_versions_) { + if (builders_.find(cfid_and_version.first) == builders_.end()) { + return Status::Corruption("unexpected CF drop in AtomicGroup"); + } + } + return Status::OK(); +} + void VersionEditHandlerPointInTime::CheckIterationResult( const log::Reader& reader, Status* s) { VersionEditHandler::CheckIterationResult(reader, s); @@ -770,7 +837,14 @@ void VersionEditHandlerPointInTime::CheckIterationResult( ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup( const VersionEdit& edit) { ColumnFamilyData* cfd = VersionEditHandler::DestroyCfAndCleanup(edit); - auto v_iter = versions_.find(edit.GetColumnFamily()); + uint32_t cfid = edit.GetColumnFamily(); + if (AtomicUpdateVersionsContains(cfid)) { + AtomicUpdateVersionsDropCf(cfid); + if (AtomicUpdateVersionsCompleted()) { + AtomicUpdateVersionsApply(); + } + } + auto v_iter = versions_.find(cfid); if (v_iter != versions_.end()) { delete v_iter->second; versions_.erase(v_iter); @@ -871,15 +945,16 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion( // Create version before apply edit. The version will represent the state // before applying the version edit. - // A new version will created if: + // A new version will be created if: // 1) no error has occurred so far, and // 2) log_number_, next_file_number_ and last_sequence_ are known, and - // 3) any of the following: + // 3) not in an AtomicGroup + // 4) any of the following: // a) no missing file before, but will have missing file(s) after applying // this version edit. // b) no missing file after applying the version edit, and the caller // explicitly request that a new version be created. - if (s.ok() && !missing_info && + if (s.ok() && !missing_info && !in_atomic_group_ && ((has_missing_files && !prev_has_missing_files) || (!has_missing_files && force_create_version))) { if (!builder) { @@ -908,15 +983,22 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion( } s = builder->SaveTo(version->storage_info()); if (s.ok()) { - version->PrepareAppend( - *cfd->GetLatestMutableCFOptions(), read_options_, - !version_set_->db_options_->skip_stats_update_on_db_open); - auto v_iter = versions_.find(cfd->GetID()); - if (v_iter != versions_.end()) { - delete v_iter->second; - v_iter->second = version; + if (AtomicUpdateVersionsContains(cfd->GetID())) { + AtomicUpdateVersionsPut(version); + if (AtomicUpdateVersionsCompleted()) { + AtomicUpdateVersionsApply(); + } } else { - versions_.emplace(cfd->GetID(), version); + version->PrepareAppend( + *cfd->GetLatestMutableCFOptions(), read_options_, + !version_set_->db_options_->skip_stats_update_on_db_open); + auto v_iter = versions_.find(cfd->GetID()); + if (v_iter != versions_.end()) { + delete v_iter->second; + v_iter->second = version; + } else { + versions_.emplace(cfd->GetID(), version); + } } } else { delete version; @@ -956,6 +1038,60 @@ Status VersionEditHandlerPointInTime::LoadTables( return Status::OK(); } +bool VersionEditHandlerPointInTime::AtomicUpdateVersionsCompleted() { + return atomic_update_versions_missing_ == 0; +} + +bool VersionEditHandlerPointInTime::AtomicUpdateVersionsContains( + uint32_t cfid) { + return atomic_update_versions_.find(cfid) != atomic_update_versions_.end(); +} + +void VersionEditHandlerPointInTime::AtomicUpdateVersionsDropCf(uint32_t cfid) { + assert(!AtomicUpdateVersionsCompleted()); + auto atomic_update_versions_iter = atomic_update_versions_.find(cfid); + assert(atomic_update_versions_iter != atomic_update_versions_.end()); + if (atomic_update_versions_iter->second == nullptr) { + atomic_update_versions_missing_--; + } else { + delete atomic_update_versions_iter->second; + } + atomic_update_versions_.erase(atomic_update_versions_iter); +} + +void VersionEditHandlerPointInTime::AtomicUpdateVersionsPut(Version* version) { + assert(!AtomicUpdateVersionsCompleted()); + auto atomic_update_versions_iter = + atomic_update_versions_.find(version->cfd()->GetID()); + assert(atomic_update_versions_iter != atomic_update_versions_.end()); + if (atomic_update_versions_iter->second == nullptr) { + atomic_update_versions_missing_--; + } else { + delete atomic_update_versions_iter->second; + } + atomic_update_versions_iter->second = version; +} + +void VersionEditHandlerPointInTime::AtomicUpdateVersionsApply() { + assert(AtomicUpdateVersionsCompleted()); + for (const auto& cfid_and_version : atomic_update_versions_) { + uint32_t cfid = cfid_and_version.first; + Version* version = cfid_and_version.second; + assert(version != nullptr); + version->PrepareAppend( + *version->cfd()->GetLatestMutableCFOptions(), read_options_, + !version_set_->db_options_->skip_stats_update_on_db_open); + auto versions_iter = versions_.find(cfid); + if (versions_iter != versions_.end()) { + delete versions_iter->second; + versions_iter->second = version; + } else { + versions_.emplace(cfid, version); + } + } + atomic_update_versions_.clear(); +} + Status ManifestTailer::Initialize() { if (Mode::kRecovery == mode_) { return VersionEditHandler::Initialize(); diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index af0817e4a17..4caa9c08988 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -40,6 +40,9 @@ class VersionEditHandlerBase { virtual Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) = 0; + virtual Status OnAtomicGroupReplayBegin() { return Status::OK(); } + virtual Status OnAtomicGroupReplayEnd() { return Status::OK(); } + virtual void CheckIterationResult(const log::Reader& /*reader*/, Status* /*s*/) {} @@ -237,8 +240,16 @@ class VersionEditHandlerPointInTime : public VersionEditHandler { ~VersionEditHandlerPointInTime() override; protected: + Status OnAtomicGroupReplayBegin() override; + Status OnAtomicGroupReplayEnd() override; void CheckIterationResult(const log::Reader& reader, Status* s) override; + ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit) override; + // `MaybeCreateVersion(..., false)` creates a version upon a negative edge + // trigger (transition from valid to invalid). + // + // `MaybeCreateVersion(..., true)` creates a version on a positive level + // trigger (state is valid). Status MaybeCreateVersion(const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) override; virtual Status VerifyFile(ColumnFamilyData* cfd, const std::string& fpath, @@ -251,6 +262,30 @@ class VersionEditHandlerPointInTime : public VersionEditHandler { bool is_initial_load) override; std::unordered_map versions_; + + // `atomic_update_versions_` is for ensuring all-or-nothing AtomicGroup + // recoveries. When `atomic_update_versions_` is nonempty, it serves as a + // barrier to updating `versions_` until all its values are populated. + std::unordered_map atomic_update_versions_; + // `atomic_update_versions_missing_` counts the nullptr values in + // `atomic_update_versions_`. + size_t atomic_update_versions_missing_; + + bool in_atomic_group_ = false; + + private: + bool AtomicUpdateVersionsCompleted(); + bool AtomicUpdateVersionsContains(uint32_t cfid); + void AtomicUpdateVersionsDropCf(uint32_t cfid); + + // This function is called for `Version*` updates for column families in an + // incomplete atomic update. It buffers `Version*` updates in + // `atomic_update_versions_`. + void AtomicUpdateVersionsPut(Version* version); + + // This function is called upon completion of an atomic update. It applies + // `Version*` updates in `atomic_update_versions_` to `versions_`. + void AtomicUpdateVersionsApply(); }; class ManifestTailer : public VersionEditHandlerPointInTime { diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 5590b6d2288..f6a983d6b21 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1219,6 +1219,7 @@ class VersionSetTestBase { const static std::string kColumnFamilyName1; const static std::string kColumnFamilyName2; const static std::string kColumnFamilyName3; + const static int kNumColumnFamilies = 4; int num_initial_edits_; explicit VersionSetTestBase(const std::string& name) @@ -1232,7 +1233,7 @@ class VersionSetTestBase { table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), shutting_down_(false), - mock_table_factory_(std::make_shared()) { + table_factory_(std::make_shared()) { EXPECT_OK(test::CreateEnvFromSystem(ConfigOptions(), &env_, &env_guard_)); if (env_ == Env::Default() && getenv("MEM_ENV")) { env_guard_.reset(NewMemEnv(Env::Default())); @@ -1332,12 +1333,74 @@ class VersionSetTestBase { } ASSERT_OK(s); - cf_options_.table_factory = mock_table_factory_; + cf_options_.table_factory = table_factory_; for (const auto& cf_name : cf_names) { column_families->emplace_back(cf_name, cf_options_); } } + struct SstInfo { + uint64_t file_number; + std::string column_family; + std::string key; // the only key + int level = 0; + uint64_t epoch_number; + SstInfo(uint64_t file_num, const std::string& cf_name, + const std::string& _key, + uint64_t _epoch_number = kUnknownEpochNumber) + : SstInfo(file_num, cf_name, _key, 0, _epoch_number) {} + SstInfo(uint64_t file_num, const std::string& cf_name, + const std::string& _key, int lvl, + uint64_t _epoch_number = kUnknownEpochNumber) + : file_number(file_num), + column_family(cf_name), + key(_key), + level(lvl), + epoch_number(_epoch_number) {} + }; + + // Create dummy sst, return their metadata. Note that only file name and size + // are used. + void CreateDummyTableFiles(const std::vector& file_infos, + std::vector* file_metas) { + assert(file_metas != nullptr); + for (const auto& info : file_infos) { + uint64_t file_num = info.file_number; + std::string fname = MakeTableFileName(dbname_, file_num); + std::unique_ptr file; + Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr); + ASSERT_OK(s); + std::unique_ptr fwriter(new WritableFileWriter( + std::move(file), fname, FileOptions(), env_->GetSystemClock().get())); + InternalTblPropCollFactories internal_tbl_prop_coll_factories; + + const ReadOptions read_options; + const WriteOptions write_options; + std::unique_ptr builder(table_factory_->NewTableBuilder( + TableBuilderOptions( + immutable_options_, mutable_cf_options_, read_options, + write_options, InternalKeyComparator(options_.comparator), + &internal_tbl_prop_coll_factories, kNoCompression, + CompressionOptions(), + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, + info.column_family, info.level), + fwriter.get())); + InternalKey ikey(info.key, 0, ValueType::kTypeValue); + builder->Add(ikey.Encode(), "value"); + ASSERT_OK(builder->Finish()); + ASSERT_OK(fwriter->Flush(IOOptions())); + uint64_t file_size = 0; + s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr); + ASSERT_OK(s); + ASSERT_NE(0, file_size); + file_metas->emplace_back( + file_num, /*file_path_id=*/0, file_size, ikey, ikey, 0, 0, false, + Temperature::kUnknown, 0, 0, 0, info.epoch_number, + kUnknownFileChecksum, kUnknownFileChecksumFuncName, kNullUniqueId64x2, + 0, 0, /* user_defined_timestamps_persisted */ true); + } + } + // Create DB with 3 column families. void NewDB() { SequenceNumber last_seqno; @@ -1469,7 +1532,7 @@ class VersionSetTestBase { std::shared_ptr reactive_versions_; InstrumentedMutex mutex_; std::atomic shutting_down_; - std::shared_ptr mock_table_factory_; + std::shared_ptr table_factory_; std::vector column_families_; }; @@ -2506,6 +2569,9 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase, VersionSetAtomicGroupTest() : VersionSetTestBase("version_set_atomic_group_test") {} + explicit VersionSetAtomicGroupTest(const std::string& name) + : VersionSetTestBase(name) {} + void SetUp() override { PrepareManifest(&column_families_, &last_seqno_, &log_writer_); SetupTestSyncPoints(); @@ -2603,7 +2669,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase, void AddNewEditsToLog(int num_edits) { for (int i = 0; i < num_edits; i++) { std::string record; - edits_[i].EncodeTo(&record); + edits_[i].EncodeTo(&record, 0 /* ts_sz */); ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record)); } } @@ -2872,6 +2938,425 @@ TEST_F(VersionSetAtomicGroupTest, edit_with_incorrect_group_size_.DebugString()); } +class AtomicGroupBestEffortRecoveryTest : public VersionSetAtomicGroupTest { + public: + AtomicGroupBestEffortRecoveryTest() + : VersionSetAtomicGroupTest("atomic_group_best_effort_recovery_test") {} +}; + +TEST_F(AtomicGroupBestEffortRecoveryTest, + HandleAtomicGroupUpdatesValidInitially) { + // One AtomicGroup contains updates that are valid at the outset. + std::vector file_infos; + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + int file_number = 10 + cfid; + file_infos.emplace_back(file_number, column_families_[cfid].name, + "" /* key */, 0 /* level */, + file_number /* epoch_number */); + } + + std::vector file_metas; + CreateDummyTableFiles(file_infos, &file_metas); + + edits_.clear(); + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + edits_.emplace_back(); + edits_.back().SetColumnFamily(cfid); + edits_.back().AddFile(0 /* level */, file_metas[cfid]); + edits_.back().SetLastSequence(++last_seqno_); + edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 - + cfid /* remaining_entries */); + } + AddNewEditsToLog(kNumColumnFamilies); + + { + bool has_missing_table_file; + ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */, + {DescriptorFileName(1 /* number */)}, + nullptr /* db_id */, + &has_missing_table_file)); + ASSERT_FALSE(has_missing_table_file); + } + std::vector all_table_files; + std::vector all_blob_files; + versions_->AddLiveFiles(&all_table_files, &all_blob_files); + ASSERT_EQ(file_metas.size(), all_table_files.size()); +} + +TEST_F(AtomicGroupBestEffortRecoveryTest, HandleAtomicGroupUpdatesValidLater) { + // One AtomicGroup contains updates that become valid after applying further + // updates. + + // `SetupTestSyncPoints()` creates sync points that assume there is only one + // AtomicGroup, which is not the case in this test. + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + std::vector file_infos; + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + int file_number = 10 + cfid; + file_infos.emplace_back(file_number, column_families_[cfid].name, + "" /* key */, 0 /* level */, + file_number /* epoch_number */); + } + + std::vector file_metas; + CreateDummyTableFiles(file_infos, &file_metas); + + edits_.clear(); + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + if (cfid == kNumColumnFamilies - 1) { + // Corrupt the number of the last file. + file_metas[cfid].fd.packed_number_and_path_id = + PackFileNumberAndPathId(20 /* number */, 0 /* path_id */); + } + edits_.emplace_back(); + edits_.back().SetColumnFamily(cfid); + edits_.back().AddFile(0 /* level */, file_metas[cfid]); + edits_.back().SetLastSequence(++last_seqno_); + edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 - + cfid /* remaining_entries */); + } + AddNewEditsToLog(kNumColumnFamilies); + + { + // Delete the file with the corrupted number. + VersionEdit fixup_edit; + fixup_edit.SetColumnFamily(kNumColumnFamilies - 1); + fixup_edit.DeleteFile(0 /* level */, 20 /* number */); + assert(log_writer_.get() != nullptr); + std::string record; + ASSERT_TRUE(fixup_edit.EncodeTo(&record, 0 /* ts_sz */)); + ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record)); + + // Throw in an impossible AtomicGroup afterwards for extra challenge. + VersionEdit broken_edit; + broken_edit.SetColumnFamily(0 /* column_family_id */); + file_metas[0].fd.packed_number_and_path_id = + PackFileNumberAndPathId(30 /* number */, 0 /* path_id */); + broken_edit.AddFile(0 /* level */, file_metas[0]); + broken_edit.SetLastSequence(++last_seqno_); + broken_edit.MarkAtomicGroup(0 /* remaining_entries */); + record.clear(); + ASSERT_TRUE(broken_edit.EncodeTo(&record, 0 /* ts_sz */)); + ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record)); + assert(log_writer_.get() != nullptr); + } + + { + bool has_missing_table_file = false; + ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */, + {DescriptorFileName(1 /* number */)}, + nullptr /* db_id */, + &has_missing_table_file)); + ASSERT_TRUE(has_missing_table_file); + } + std::vector all_table_files; + std::vector all_blob_files; + versions_->AddLiveFiles(&all_table_files, &all_blob_files); + ASSERT_EQ(file_metas.size() - 1, all_table_files.size()); +} + +TEST_F(AtomicGroupBestEffortRecoveryTest, HandleAtomicGroupUpdatesInvalid) { + // One AtomicGroup contains updates that never become valid. + std::vector file_infos; + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + int file_number = 10 + cfid; + file_infos.emplace_back(file_number, column_families_[cfid].name, + "" /* key */, 0 /* level */, + file_number /* epoch_number */); + } + + std::vector file_metas; + CreateDummyTableFiles(file_infos, &file_metas); + + edits_.clear(); + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + if (cfid == kNumColumnFamilies - 1) { + // Corrupt the number of the last file. + file_metas[cfid].fd.packed_number_and_path_id = + PackFileNumberAndPathId(20 /* number */, 0 /* path_id */); + } + edits_.emplace_back(); + edits_.back().SetColumnFamily(cfid); + edits_.back().AddFile(0 /* level */, file_metas[cfid]); + edits_.back().SetLastSequence(++last_seqno_); + edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 - + cfid /* remaining_entries */); + } + AddNewEditsToLog(kNumColumnFamilies); + + { + bool has_missing_table_file = false; + ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */, + {DescriptorFileName(1 /* number */)}, + nullptr /* db_id */, + &has_missing_table_file)); + ASSERT_TRUE(has_missing_table_file); + } + std::vector all_table_files; + std::vector all_blob_files; + versions_->AddLiveFiles(&all_table_files, &all_blob_files); + ASSERT_TRUE(all_table_files.empty()); +} + +TEST_F(AtomicGroupBestEffortRecoveryTest, + HandleAtomicGroupUpdatesValidTooLate) { + // One AtomicGroup contains updates that become valid after the next + // AtomicGroup is reached, which is too late. + + // `SetupTestSyncPoints()` creates sync points that assume there is only one + // AtomicGroup, which is not the case in this test. + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + std::vector file_infos; + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + int file_number = 10 + cfid; + file_infos.emplace_back(file_number, column_families_[cfid].name, + "" /* key */, 0 /* level */, + file_number /* epoch_number */); + } + + std::vector file_metas; + CreateDummyTableFiles(file_infos, &file_metas); + + edits_.clear(); + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + if (cfid == kNumColumnFamilies - 1) { + // Corrupt the number of the last file. + file_metas[cfid].fd.packed_number_and_path_id = + PackFileNumberAndPathId(20 /* number */, 0 /* path_id */); + } + edits_.emplace_back(); + edits_.back().SetColumnFamily(cfid); + edits_.back().AddFile(0 /* level */, file_metas[cfid]); + edits_.back().SetLastSequence(++last_seqno_); + edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 - + cfid /* remaining_entries */); + } + AddNewEditsToLog(kNumColumnFamilies); + + { + // Delete the file with the corrupted number. But bundle it in an + // AtomicGroup with an update that can never be applied. + VersionEdit broken_edit; + broken_edit.SetColumnFamily(0 /* column_family_id */); + file_metas[0].fd.packed_number_and_path_id = + PackFileNumberAndPathId(30 /* number */, 0 /* path_id */); + broken_edit.AddFile(0 /* level */, file_metas[0]); + broken_edit.SetLastSequence(++last_seqno_); + broken_edit.MarkAtomicGroup(1 /* remaining_entries */); + std::string record; + ASSERT_TRUE(broken_edit.EncodeTo(&record, 0 /* ts_sz */)); + ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record)); + + VersionEdit fixup_edit; + fixup_edit.SetColumnFamily(kNumColumnFamilies - 1); + fixup_edit.DeleteFile(0 /* level */, 20 /* number */); + fixup_edit.MarkAtomicGroup(0 /* remaining_entries */); + record.clear(); + ASSERT_TRUE(fixup_edit.EncodeTo(&record, 0 /* ts_sz */)); + ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record)); + assert(log_writer_.get() != nullptr); + } + + { + bool has_missing_table_file = false; + ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */, + {DescriptorFileName(1 /* number */)}, + nullptr /* db_id */, + &has_missing_table_file)); + ASSERT_TRUE(has_missing_table_file); + } + std::vector all_table_files; + std::vector all_blob_files; + versions_->AddLiveFiles(&all_table_files, &all_blob_files); + ASSERT_TRUE(all_table_files.empty()); +} + +TEST_F(AtomicGroupBestEffortRecoveryTest, + HandleAtomicGroupUpdatesInDuplicateInvalid) { + // One AtomicGroup has multiple updates for the same CF. One of the earlier + // updates for this CF can lead to a valid state if applied. But the last + // update for this CF is invalid so the AtomicGroup must not be recovered. + std::vector file_infos; + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + int file_number = 10 + cfid; + file_infos.emplace_back(file_number, column_families_[cfid].name, + "" /* key */, 0 /* level */, + file_number /* epoch_number */); + } + + std::vector file_metas; + CreateDummyTableFiles(file_infos, &file_metas); + + edits_.clear(); + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + edits_.emplace_back(); + edits_.back().SetColumnFamily(cfid); + edits_.back().AddFile(0 /* level */, file_metas[cfid]); + edits_.back().SetLastSequence(++last_seqno_); + edits_.back().MarkAtomicGroup(kNumColumnFamilies - + cfid /* remaining_entries */); + } + // Here is the unrecoverable update. + edits_.emplace_back(); + edits_.back().SetColumnFamily(0 /* column_family_id */); + file_metas[0].fd.packed_number_and_path_id = + PackFileNumberAndPathId(20 /* number */, 0 /* path_id */); + edits_.back().AddFile(0 /* level */, file_metas[0]); + edits_.back().SetLastSequence(++last_seqno_); + edits_.back().MarkAtomicGroup(0 /* remaining_entries */); + AddNewEditsToLog(kNumColumnFamilies + 1); + + { + bool has_missing_table_file = false; + ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */, + {DescriptorFileName(1 /* number */)}, + nullptr /* db_id */, + &has_missing_table_file)); + ASSERT_TRUE(has_missing_table_file); + } + std::vector all_table_files; + std::vector all_blob_files; + versions_->AddLiveFiles(&all_table_files, &all_blob_files); + ASSERT_TRUE(all_table_files.empty()); +} + +TEST_F(AtomicGroupBestEffortRecoveryTest, + HandleAtomicGroupMadeWholeByDeletingCf) { + // One AtomicGroup contains an update that becomes valid when its column + // family is deleted, making it irrelevant. + std::vector file_infos; + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + int file_number = 10 + cfid; + file_infos.emplace_back(file_number, column_families_[cfid].name, + "" /* key */, 0 /* level */, + file_number /* epoch_number */); + } + + std::vector file_metas; + CreateDummyTableFiles(file_infos, &file_metas); + + edits_.clear(); + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + if (cfid == kNumColumnFamilies - 1) { + // Corrupt the number of the last file. + file_metas[cfid].fd.packed_number_and_path_id = + PackFileNumberAndPathId(20 /* number */, 0 /* path_id */); + } + edits_.emplace_back(); + edits_.back().SetColumnFamily(cfid); + edits_.back().AddFile(0 /* level */, file_metas[cfid]); + edits_.back().SetLastSequence(++last_seqno_); + edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 - + cfid /* remaining_entries */); + } + AddNewEditsToLog(kNumColumnFamilies); + + { + // Delete the column family with the corrupted file number. + VersionEdit fixup_edit; + fixup_edit.DropColumnFamily(); + fixup_edit.SetColumnFamily(kNumColumnFamilies - 1); + assert(log_writer_.get() != nullptr); + std::string record; + ASSERT_TRUE(fixup_edit.EncodeTo(&record, 0 /* ts_sz */)); + ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record)); + } + + { + bool has_missing_table_file = false; + ASSERT_OK(versions_->TryRecover(column_families_, false /* read_only */, + {DescriptorFileName(1 /* number */)}, + nullptr /* db_id */, + &has_missing_table_file)); + ASSERT_FALSE(has_missing_table_file); + } + std::vector all_table_files; + std::vector all_blob_files; + versions_->AddLiveFiles(&all_table_files, &all_blob_files); + ASSERT_EQ(file_metas.size() - 1, all_table_files.size()); +} + +TEST_F(AtomicGroupBestEffortRecoveryTest, + HandleAtomicGroupMadeWholeAfterNewCf) { + // One AtomicGroup contains updates that become valid after a new column + // family is added. + std::vector file_infos; + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + int file_number = 10 + cfid; + file_infos.emplace_back(file_number, column_families_[cfid].name, + "" /* key */, 0 /* level */, + file_number /* epoch_number */); + } + + std::vector file_metas; + CreateDummyTableFiles(file_infos, &file_metas); + + edits_.clear(); + for (int cfid = 0; cfid < kNumColumnFamilies; cfid++) { + if (cfid == kNumColumnFamilies - 1) { + // Corrupt the number of the last file. + file_metas[cfid].fd.packed_number_and_path_id = + PackFileNumberAndPathId(20 /* number */, 0 /* path_id */); + } + edits_.emplace_back(); + edits_.back().SetColumnFamily(cfid); + edits_.back().AddFile(0 /* level */, file_metas[cfid]); + edits_.back().SetLastSequence(++last_seqno_); + edits_.back().MarkAtomicGroup(kNumColumnFamilies - 1 - + cfid /* remaining_entries */); + } + AddNewEditsToLog(kNumColumnFamilies); + + { + // Add a new CF. + VersionEdit add_cf_edit; + add_cf_edit.AddColumnFamily("extra_cf"); + add_cf_edit.SetColumnFamily(kNumColumnFamilies); + std::string record; + ASSERT_TRUE(add_cf_edit.EncodeTo(&record, 0 /* ts_sz */)); + ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record)); + + // Have the new CF refer to a non-existent file for an extra challenge. + VersionEdit broken_edit; + broken_edit.SetColumnFamily(kNumColumnFamilies); + file_metas[0].fd.packed_number_and_path_id = + PackFileNumberAndPathId(30 /* number */, 0 /* path_id */); + broken_edit.AddFile(0 /* level */, file_metas[0]); + broken_edit.SetLastSequence(++last_seqno_); + record.clear(); + ASSERT_TRUE(broken_edit.EncodeTo(&record, 0 /* ts_sz */)); + ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record)); + + // This fixes up the first of the two non-existent file references. + VersionEdit fixup_edit; + fixup_edit.SetColumnFamily(kNumColumnFamilies - 1); + fixup_edit.DeleteFile(0 /* level */, 20 /* number */); + record.clear(); + ASSERT_TRUE(fixup_edit.EncodeTo(&record, 0 /* ts_sz */)); + ASSERT_OK(log_writer_->AddRecord(WriteOptions(), record)); + assert(log_writer_.get() != nullptr); + } + + { + bool has_missing_table_file = false; + std::vector column_families = column_families_; + column_families.emplace_back("extra_cf", cf_options_); + ASSERT_OK(versions_->TryRecover(column_families, false /* read_only */, + {DescriptorFileName(1 /* number */)}, + nullptr /* db_id */, + &has_missing_table_file)); + ASSERT_TRUE(has_missing_table_file); + } + std::vector all_table_files; + std::vector all_blob_files; + versions_->AddLiveFiles(&all_table_files, &all_blob_files); + ASSERT_EQ(file_metas.size() - 1, all_table_files.size()); +} + class VersionSetTestDropOneCF : public VersionSetTestBase, public testing::TestWithParam { public: @@ -3381,9 +3866,6 @@ class VersionSetTestMissingFiles : public VersionSetTestBase, public: VersionSetTestMissingFiles() : VersionSetTestBase("version_set_test_missing_files"), - block_based_table_options_(), - table_factory_(std::make_shared( - block_based_table_options_)), internal_comparator_( std::make_shared(options_.comparator)) {} @@ -3457,68 +3939,6 @@ class VersionSetTestMissingFiles : public VersionSetTestBase, *last_seqno = seq + 1; } - struct SstInfo { - uint64_t file_number; - std::string column_family; - std::string key; // the only key - int level = 0; - uint64_t epoch_number; - SstInfo(uint64_t file_num, const std::string& cf_name, - const std::string& _key, - uint64_t _epoch_number = kUnknownEpochNumber) - : SstInfo(file_num, cf_name, _key, 0, _epoch_number) {} - SstInfo(uint64_t file_num, const std::string& cf_name, - const std::string& _key, int lvl, - uint64_t _epoch_number = kUnknownEpochNumber) - : file_number(file_num), - column_family(cf_name), - key(_key), - level(lvl), - epoch_number(_epoch_number) {} - }; - - // Create dummy sst, return their metadata. Note that only file name and size - // are used. - void CreateDummyTableFiles(const std::vector& file_infos, - std::vector* file_metas) { - assert(file_metas != nullptr); - for (const auto& info : file_infos) { - uint64_t file_num = info.file_number; - std::string fname = MakeTableFileName(dbname_, file_num); - std::unique_ptr file; - Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr); - ASSERT_OK(s); - std::unique_ptr fwriter(new WritableFileWriter( - std::move(file), fname, FileOptions(), env_->GetSystemClock().get())); - InternalTblPropCollFactories internal_tbl_prop_coll_factories; - - const ReadOptions read_options; - const WriteOptions write_options; - std::unique_ptr builder(table_factory_->NewTableBuilder( - TableBuilderOptions( - immutable_options_, mutable_cf_options_, read_options, - write_options, *internal_comparator_, - &internal_tbl_prop_coll_factories, kNoCompression, - CompressionOptions(), - TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, - info.column_family, info.level), - fwriter.get())); - InternalKey ikey(info.key, 0, ValueType::kTypeValue); - builder->Add(ikey.Encode(), "value"); - ASSERT_OK(builder->Finish()); - ASSERT_OK(fwriter->Flush(IOOptions())); - uint64_t file_size = 0; - s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr); - ASSERT_OK(s); - ASSERT_NE(0, file_size); - file_metas->emplace_back( - file_num, /*file_path_id=*/0, file_size, ikey, ikey, 0, 0, false, - Temperature::kUnknown, 0, 0, 0, info.epoch_number, - kUnknownFileChecksum, kUnknownFileChecksumFuncName, kNullUniqueId64x2, - 0, 0, /* user_defined_timestamps_persisted */ true); - } - } - // This method updates last_sequence_. void WriteFileAdditionAndDeletionToManifest( uint32_t cf, const std::vector>& added_files, @@ -3542,8 +3962,6 @@ class VersionSetTestMissingFiles : public VersionSetTestBase, ASSERT_OK(s); } - BlockBasedTableOptions block_based_table_options_; - std::shared_ptr table_factory_; std::shared_ptr internal_comparator_; std::vector column_families_; SequenceNumber last_seqno_; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 45bcbac9e54..dff06ee2ad5 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1356,6 +1356,13 @@ struct DBOptions { // is like applying WALRecoveryMode::kPointInTimeRecovery to each column // family rather than just the WAL. // + // The behavior changes in the presence of "AtomicGroup"s in the MANIFEST, + // which is currently only the case when `atomic_flush == true`. In that + // case, all pre-existing CFs must recover the atomic group in order for + // that group to be applied in an all-or-nothing manner. This means that + // unused/inactive CF(s) with invalid filesystem state can block recovery of + // all other CFs at an atomic group. + // // Best-efforts recovery (BER) is specifically designed to recover a DB with // files that are missing or truncated to some smaller size, such as the // result of an incomplete DB "physical" (FileSystem) copy. BER can also @@ -1373,8 +1380,6 @@ struct DBOptions { // setting. BER does require at least one valid MANIFEST to recover to a // non-trivial DB state, unlike `ldb repair`. // - // Currently, best_efforts_recovery=true is not compatible with atomic flush. - // // Default: false bool best_efforts_recovery = false; diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index b1916381005..7037156b52a 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -425,7 +425,6 @@ def is_direct_io_supported(dbname): best_efforts_recovery_params = { "best_efforts_recovery": 1, - "atomic_flush": 0, "disable_wal": 1, "column_families": 1, "skip_verifydb": 1, @@ -668,7 +667,6 @@ def finalize_and_sanitize(src_params): dest_params["enable_pipelined_write"] = 0 if dest_params.get("best_efforts_recovery") == 1: dest_params["disable_wal"] = 1 - dest_params["atomic_flush"] = 0 dest_params["enable_compaction_filter"] = 0 dest_params["sync"] = 0 dest_params["write_fault_one_in"] = 0 diff --git a/unreleased_history/public_api_changes/best_effort_atomic.md b/unreleased_history/public_api_changes/best_effort_atomic.md new file mode 100644 index 00000000000..5f610ec1e49 --- /dev/null +++ b/unreleased_history/public_api_changes/best_effort_atomic.md @@ -0,0 +1 @@ +* Best-effort recovery (`best_efforts_recovery == true`) may now be used together with atomic flush (`atomic_flush == true`). The all-or-nothing recovery guarantee for atomically flushed data will be upheld.