Skip to content

Commit

Permalink
Add an internal API MemTableList::GetEditForDroppingCurrentVersion (f…
Browse files Browse the repository at this point in the history
…acebook#13001)

Summary:
Prepare this internal API to be used by atomic data replacement. The main purpose of this API is to get a `VersionEdit` to mark the entire current `MemTableListVersion` as dropped.  Flush needs the similar functionality when installing results, so that logic is refactored into a util function `GetDBRecoveryEditForObsoletingMemTables` to be shared by flush and this internal API.

To test this internal API, flush's result installation is redirected to use this API when it is flushing all the immutable MemTables in debug mode. It should achieve the exact same results, just with a duplicated `VersionEdit::log_number` field that doesn't upsets the recovery logic.

Pull Request resolved: facebook#13001

Test Plan: Existing tests

Reviewed By: pdillinger

Differential Revision: D62309591

Pulled By: jowlyzhang

fbshipit-source-id: e25914d9a2e281c25ab7ee31a66eaf6adfae4b88
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Sep 10, 2024
1 parent 55ac0b7 commit 43bc71f
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 29 deletions.
8 changes: 8 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2963,6 +2963,14 @@ DBOptions SanitizeOptions(const std::string& db, const DBOptions& src,
CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options);

// Return a VersionEdit for the DB's recovery when the `memtables` of the
// specified column family are obsolete. Specifically, the min log number to
// keep, and the WAL files that can be deleted.
VersionEdit GetDBRecoveryEditForObsoletingMemTables(
VersionSet* vset, const ColumnFamilyData& cfd,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker);

// Return the earliest log file to keep after the memtable flush is
// finalized.
// `cfd_to_flush` is the column family whose memtable (specified in
Expand Down
32 changes: 32 additions & 0 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,38 @@ void DBImpl::DeleteObsoleteFiles() {
mutex_.Lock();
}

VersionEdit GetDBRecoveryEditForObsoletingMemTables(
VersionSet* vset, const ColumnFamilyData& cfd,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker) {
VersionEdit wal_deletion_edit;
uint64_t min_wal_number_to_keep = 0;
assert(edit_list.size() > 0);
if (vset->db_options()->allow_2pc) {
// Note that if mempurge is successful, the edit_list will
// not be applicable (contains info of new min_log number to keep,
// and level 0 file path of SST file created during normal flush,
// so both pieces of information are irrelevant after a successful
// mempurge operation).
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, cfd, edit_list, memtables, prep_tracker);

// We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed.
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfd, edit_list);
}

wal_deletion_edit.SetMinLogNumberToKeep(min_wal_number_to_keep);
if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion_edit.DeleteWalsBefore(min_wal_number_to_keep);
}
}
return wal_deletion_edit;
}

uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<MemTable*>& memtables_to_flush) {
uint64_t min_log = 0;
Expand Down
83 changes: 54 additions & 29 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -582,37 +582,28 @@ Status MemTableList::TryInstallMemtableFlushResults(

// TODO(myabandeh): Not sure how batch_count could be 0 here.
if (batch_count > 0) {
uint64_t min_wal_number_to_keep = 0;
assert(edit_list.size() > 0);
if (vset->db_options()->allow_2pc) {
// Note that if mempurge is successful, the edit_list will
// not be applicable (contains info of new min_log number to keep,
// and level 0 file path of SST file created during normal flush,
// so both pieces of information are irrelevant after a successful
// mempurge operation).
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);

// We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed.
VersionEdit edit;
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
if (memtables_to_flush.size() == memlist.size()) {
// TODO(yuzhangyu): remove this testing code once the
// `GetEditForDroppingCurrentVersion` API is used by the atomic data
// replacement. This function can get the same edits for wal related
// fields, and some duplicated fields as contained already in edit_list
// for column family's recovery.
edit = GetEditForDroppingCurrentVersion(cfd, vset, prep_tracker);
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
}

VersionEdit wal_deletion;
wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep);
if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (min_wal_number_to_keep >
vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
}
TEST_SYNC_POINT_CALLBACK(
"MemTableList::TryInstallMemtableFlushResults:"
"AfterComputeMinWalToKeep",
nullptr);
edit = GetDBRecoveryEditForObsoletingMemTables(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
}
edit_list.push_back(&wal_deletion);
#else
edit = GetDBRecoveryEditForObsoletingMemTables(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
TEST_SYNC_POINT_CALLBACK(
"MemTableList::TryInstallMemtableFlushResults:"
"AfterComputeMinWalToKeep",
nullptr);
edit_list.push_back(&edit);

const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
to_delete, mu](const Status& status) {
Expand Down Expand Up @@ -1026,4 +1017,38 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number,
ResetTrimHistoryNeeded();
}

VersionEdit MemTableList::GetEditForDroppingCurrentVersion(
const ColumnFamilyData* cfd, VersionSet* vset,
LogsWithPrepTracker* prep_tracker) const {
assert(cfd);
auto& memlist = current_->memlist_;
if (memlist.empty()) {
return VersionEdit();
}

uint64_t max_next_log_number = 0;
autovector<VersionEdit*> edit_list;
autovector<MemTable*> memtables_to_drop;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
memtables_to_drop.push_back(m);
max_next_log_number = std::max(m->GetNextLogNumber(), max_next_log_number);
}

// Check the obsoleted MemTables' impact on WALs related to DB's recovery (min
// log number to keep, a delta of WAL files to delete).
VersionEdit edit_with_log_number;
edit_with_log_number.SetPrevLogNumber(0);
edit_with_log_number.SetLogNumber(max_next_log_number);
edit_list.push_back(&edit_with_log_number);
VersionEdit edit = GetDBRecoveryEditForObsoletingMemTables(
vset, *cfd, edit_list, memtables_to_drop, prep_tracker);

// Set fields related to the column family's recovery.
edit.SetColumnFamily(cfd->GetID());
edit.SetPrevLogNumber(0);
edit.SetLogNumber(max_next_log_number);
return edit;
}

} // namespace ROCKSDB_NAMESPACE
6 changes: 6 additions & 0 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,12 @@ class MemTableList {
void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete);

// This API is only used by atomic date replacement. To get an edit for
// dropping the current `MemTableListVersion`.
VersionEdit GetEditForDroppingCurrentVersion(
const ColumnFamilyData* cfd, VersionSet* vset,
LogsWithPrepTracker* prep_tracker) const;

private:
friend Status InstallMemtableAtomicFlushResults(
const autovector<MemTableList*>* imm_lists,
Expand Down

0 comments on commit 43bc71f

Please sign in to comment.