diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index e3eb3253e6a..d45f0c2b8e6 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2946,6 +2946,14 @@ DBOptions SanitizeOptions(const std::string& db, const DBOptions& src, CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options); +// Return a VersionEdit for the DB's recovery when the `memtables` of the +// specified column family are obsolete. The edit sets the min log number to +// keep and records the WAL files that can be deleted. +VersionEdit GetDBRecoveryEditForObsoletingMemTables( + VersionSet* vset, const ColumnFamilyData& cfd, + const autovector& edit_list, + const autovector& memtables, LogsWithPrepTracker* prep_tracker); + // Return the earliest log file to keep after the memtable flush is // finalized. // `cfd_to_flush` is the column family whose memtable (specified in diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 0db7293682d..bb0ff3985a9 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -722,6 +722,38 @@ void DBImpl::DeleteObsoleteFiles() { mutex_.Lock(); } +VersionEdit GetDBRecoveryEditForObsoletingMemTables( + VersionSet* vset, const ColumnFamilyData& cfd, + const autovector& edit_list, + const autovector& memtables, LogsWithPrepTracker* prep_tracker) { + VersionEdit wal_deletion_edit; + uint64_t min_wal_number_to_keep = 0; + assert(edit_list.size() > 0); + if (vset->db_options()->allow_2pc) { + // Note that if mempurge is successful, the edit_list will + // not be applicable (contains info of new min_log number to keep, + // and level 0 file path of SST file created during normal flush, + // so both pieces of information are irrelevant after a successful + // mempurge operation). 
+ min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( + vset, cfd, edit_list, memtables, prep_tracker); + + // We piggyback the information of earliest log file to keep in the + // manifest entry for the last file flushed. + } else { + min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, cfd, edit_list); + } + + wal_deletion_edit.SetMinLogNumberToKeep(min_wal_number_to_keep); + if (vset->db_options()->track_and_verify_wals_in_manifest) { + if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) { + wal_deletion_edit.DeleteWalsBefore(min_wal_number_to_keep); + } + } + return wal_deletion_edit; +} + uint64_t FindMinPrepLogReferencedByMemTable( VersionSet* vset, const autovector& memtables_to_flush) { uint64_t min_log = 0; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 3675a280b93..a4ad1de5719 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -582,37 +582,28 @@ Status MemTableList::TryInstallMemtableFlushResults( // TODO(myabandeh): Not sure how batch_count could be 0 here. if (batch_count > 0) { - uint64_t min_wal_number_to_keep = 0; - assert(edit_list.size() > 0); - if (vset->db_options()->allow_2pc) { - // Note that if mempurge is successful, the edit_list will - // not be applicable (contains info of new min_log number to keep, - // and level 0 file path of SST file created during normal flush, - // so both pieces of information are irrelevant after a successful - // mempurge operation). - min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( - vset, *cfd, edit_list, memtables_to_flush, prep_tracker); - - // We piggyback the information of earliest log file to keep in the - // manifest entry for the last file flushed. + VersionEdit edit; +#ifndef NDEBUG + if (memtables_to_flush.size() == memlist.size()) { + // TODO(yuzhangyu): remove this testing code once the + // `GetEditForDroppingCurrentVersion` API is used by the atomic data + // replacement. 
This function can get the same edits for wal related + // fields, and some duplicated fields as contained already in edit_list + // for column family's recovery. + edit = GetEditForDroppingCurrentVersion(cfd, vset, prep_tracker); } else { - min_wal_number_to_keep = - PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list); - } - - VersionEdit wal_deletion; - wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep); - if (vset->db_options()->track_and_verify_wals_in_manifest) { - if (min_wal_number_to_keep > - vset->GetWalSet().GetMinWalNumberToKeep()) { - wal_deletion.DeleteWalsBefore(min_wal_number_to_keep); - } - TEST_SYNC_POINT_CALLBACK( - "MemTableList::TryInstallMemtableFlushResults:" - "AfterComputeMinWalToKeep", - nullptr); + edit = GetDBRecoveryEditForObsoletingMemTables( + vset, *cfd, edit_list, memtables_to_flush, prep_tracker); } - edit_list.push_back(&wal_deletion); +#else + edit = GetDBRecoveryEditForObsoletingMemTables( + vset, *cfd, edit_list, memtables_to_flush, prep_tracker); +#endif /* !NDEBUG */ + TEST_SYNC_POINT_CALLBACK( + "MemTableList::TryInstallMemtableFlushResults:" + "AfterComputeMinWalToKeep", + nullptr); + edit_list.push_back(&edit); const auto manifest_write_cb = [this, cfd, batch_count, log_buffer, to_delete, mu](const Status& status) { @@ -1026,4 +1017,38 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number, ResetTrimHistoryNeeded(); } +VersionEdit MemTableList::GetEditForDroppingCurrentVersion( + const ColumnFamilyData* cfd, VersionSet* vset, + LogsWithPrepTracker* prep_tracker) const { + assert(cfd); + auto& memlist = current_->memlist_; + if (memlist.empty()) { + return VersionEdit(); + } + + uint64_t max_next_log_number = 0; + autovector edit_list; + autovector memtables_to_drop; + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { + MemTable* m = *it; + memtables_to_drop.push_back(m); + max_next_log_number = std::max(m->GetNextLogNumber(), max_next_log_number); + } + + // Check the obsoleted MemTables' 
impact on WALs related to DB's recovery (min + // log number to keep, a delta of WAL files to delete). + VersionEdit edit_with_log_number; + edit_with_log_number.SetPrevLogNumber(0); + edit_with_log_number.SetLogNumber(max_next_log_number); + edit_list.push_back(&edit_with_log_number); + VersionEdit edit = GetDBRecoveryEditForObsoletingMemTables( + vset, *cfd, edit_list, memtables_to_drop, prep_tracker); + + // Set fields related to the column family's recovery. + edit.SetColumnFamily(cfd->GetID()); + edit.SetPrevLogNumber(0); + edit.SetLogNumber(max_next_log_number); + return edit; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/memtable_list.h b/db/memtable_list.h index 218701e0b3b..dd439de5590 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -447,6 +447,12 @@ class MemTableList { void RemoveOldMemTables(uint64_t log_number, autovector* to_delete); + // This API is only used by atomic data replacement, to get an edit for + // dropping the current `MemTableListVersion`. + VersionEdit GetEditForDroppingCurrentVersion( + const ColumnFamilyData* cfd, VersionSet* vset, + LogsWithPrepTracker* prep_tracker) const; + private: friend Status InstallMemtableAtomicFlushResults( const autovector* imm_lists,