Skip to content

Commit

Permalink
Add an internal API MemTableList::GetEditForDroppingCurrentVersion
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed Sep 6, 2024
1 parent 4b1d595 commit 34d70cb
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 29 deletions.
8 changes: 8 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2946,6 +2946,14 @@ DBOptions SanitizeOptions(const std::string& db, const DBOptions& src,
CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options);

// Return a VersionEdit for the DB's recovery when the `memtables` of the
// specified column family are obsolete. Specifically, the min log number to
// keep, and the WAL files that can be deleted.
VersionEdit GetDBRecoveryEditForObsoletingMemTables(
VersionSet* vset, const ColumnFamilyData& cfd,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker);

// Return the earliest log file to keep after the memtable flush is
// finalized.
// `cfd_to_flush` is the column family whose memtable (specified in
Expand Down
32 changes: 32 additions & 0 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,38 @@ void DBImpl::DeleteObsoleteFiles() {
mutex_.Lock();
}

VersionEdit GetDBRecoveryEditForObsoletingMemTables(
VersionSet* vset, const ColumnFamilyData& cfd,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker) {
VersionEdit wal_deletion_edit;
uint64_t min_wal_number_to_keep = 0;
assert(edit_list.size() > 0);
if (vset->db_options()->allow_2pc) {
// Note that if mempurge is successful, the edit_list will
// not be applicable (contains info of new min_log number to keep,
// and level 0 file path of SST file created during normal flush,
// so both pieces of information are irrelevant after a successful
// mempurge operation).
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, cfd, edit_list, memtables, prep_tracker);

// We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed.
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfd, edit_list);
}

wal_deletion_edit.SetMinLogNumberToKeep(min_wal_number_to_keep);
if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion_edit.DeleteWalsBefore(min_wal_number_to_keep);
}
}
return wal_deletion_edit;
}

uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<MemTable*>& memtables_to_flush) {
uint64_t min_log = 0;
Expand Down
83 changes: 54 additions & 29 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -582,37 +582,28 @@ Status MemTableList::TryInstallMemtableFlushResults(

// TODO(myabandeh): Not sure how batch_count could be 0 here.
if (batch_count > 0) {
uint64_t min_wal_number_to_keep = 0;
assert(edit_list.size() > 0);
if (vset->db_options()->allow_2pc) {
// Note that if mempurge is successful, the edit_list will
// not be applicable (contains info of new min_log number to keep,
// and level 0 file path of SST file created during normal flush,
// so both pieces of information are irrelevant after a successful
// mempurge operation).
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);

// We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed.
VersionEdit edit;
#ifndef NDEBUG
if (memtables_to_flush.size() == memlist.size()) {
// TODO(yuzhangyu): remove this testing code once the
// `GetEditForDroppingCurrentVersion` API is used by the atomic data
// replacement. This function can get the same edits for wal related
// fields, and some duplicated fields as contained already in edit_list
// for column family's recovery.
edit = GetEditForDroppingCurrentVersion(cfd, vset, prep_tracker);
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
}

VersionEdit wal_deletion;
wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep);
if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (min_wal_number_to_keep >
vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
}
TEST_SYNC_POINT_CALLBACK(
"MemTableList::TryInstallMemtableFlushResults:"
"AfterComputeMinWalToKeep",
nullptr);
edit = GetDBRecoveryEditForObsoletingMemTables(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
}
edit_list.push_back(&wal_deletion);
#else
edit = GetDBRecoveryEditForObsoletingMemTables(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
#endif /* !NDEBUG */
TEST_SYNC_POINT_CALLBACK(
"MemTableList::TryInstallMemtableFlushResults:"
"AfterComputeMinWalToKeep",
nullptr);
edit_list.push_back(&edit);

const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
to_delete, mu](const Status& status) {
Expand Down Expand Up @@ -1026,4 +1017,38 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number,
ResetTrimHistoryNeeded();
}

VersionEdit MemTableList::GetEditForDroppingCurrentVersion(
const ColumnFamilyData* cfd, VersionSet* vset,
LogsWithPrepTracker* prep_tracker) const {
assert(cfd);
auto& memlist = current_->memlist_;
if (memlist.empty()) {
return VersionEdit();
}

uint64_t max_next_log_number = 0;
autovector<VersionEdit*> edit_list;
autovector<MemTable*> memtables_to_drop;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
memtables_to_drop.push_back(m);
max_next_log_number = std::max(m->GetNextLogNumber(), max_next_log_number);
}

// Check the obsoleted MemTables' impact on WALs related to DB's recovery (min
// log number to keep, a delta of WAL files to delete).
VersionEdit edit_with_log_number;
edit_with_log_number.SetPrevLogNumber(0);
edit_with_log_number.SetLogNumber(max_next_log_number);
edit_list.push_back(&edit_with_log_number);
VersionEdit edit = GetDBRecoveryEditForObsoletingMemTables(
vset, *cfd, edit_list, memtables_to_drop, prep_tracker);

// Set fields related to the column family's recovery.
edit.SetColumnFamily(cfd->GetID());
edit.SetPrevLogNumber(0);
edit.SetLogNumber(max_next_log_number);
return edit;
}

} // namespace ROCKSDB_NAMESPACE
6 changes: 6 additions & 0 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,12 @@ class MemTableList {
void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete);

// This API is only used by atomic date replacement. To get an edit for
// dropping the current `MemTableListVersion`.
VersionEdit GetEditForDroppingCurrentVersion(
const ColumnFamilyData* cfd, VersionSet* vset,
LogsWithPrepTracker* prep_tracker) const;

private:
friend Status InstallMemtableAtomicFlushResults(
const autovector<MemTableList*>* imm_lists,
Expand Down

0 comments on commit 34d70cb

Please sign in to comment.