Skip to content

Commit

Permalink
Add a ReadOnlyMemTable interface.
Browse files Browse the repository at this point in the history
This allows custom implementations of immutable memtables.
  • Loading branch information
cbi42 committed Oct 31, 2024
1 parent 1987313 commit d14b6bc
Show file tree
Hide file tree
Showing 19 changed files with 513 additions and 422 deletions.
6 changes: 3 additions & 3 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ void SuperVersion::Cleanup() {
// decrement reference to the immutable MemtableList
// this SV object was pointing to.
imm->Unref(&to_delete);
MemTable* m = mem->Unref();
auto m = static_cast<MemTable*>(mem->Unref());
if (m != nullptr) {
auto* memory_usage = current->cfd()->imm()->current_memory_usage();
assert(*memory_usage >= m->ApproximateMemoryUsage());
Expand Down Expand Up @@ -693,9 +693,9 @@ ColumnFamilyData::~ColumnFamilyData() {
if (mem_ != nullptr) {
delete mem_->Unref();
}
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
imm_.current()->Unref(&to_delete);
for (MemTable* m : to_delete) {
for (auto* m : to_delete) {
delete m;
}

Expand Down
2 changes: 1 addition & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ struct SuperVersion {
// We need to_delete because during Cleanup(), imm->Unref() returns
// all memtables that we need to free through this vector. We then
// delete all those memtables outside of mutex, during destruction
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
};

Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options);
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4726,9 +4726,9 @@ void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
// Convert user_key into a corresponding internal key.
InternalKey k1(start.value(), kMaxSequenceNumber, kValueTypeForSeek);
InternalKey k2(limit.value(), kMaxSequenceNumber, kValueTypeForSeek);
MemTable::MemTableStats memStats =
ReadOnlyMemTable::MemTableStats memStats =
sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
MemTable::MemTableStats immStats =
ReadOnlyMemTable::MemTableStats immStats =
sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
*count = memStats.count + immStats.count;
*size = memStats.size + immStats.size;
Expand Down
15 changes: 9 additions & 6 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,7 @@ class DBImpl : public DB {

struct WriteContext {
SuperVersionContext superversion_context;
autovector<MemTable*> memtables_to_free_;
autovector<ReadOnlyMemTable*> memtables_to_free_;

explicit WriteContext(bool create_superversion = false)
: superversion_context(create_superversion) {}
Expand Down Expand Up @@ -2046,6 +2046,8 @@ class DBImpl : public DB {

Status TrimMemtableHistory(WriteContext* context);

// Switches the current live memtable to immutable/read-only memtable.
// A new WAL is created if the current WAL is not empty.
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);

// Select and output column families qualified for atomic flush in
Expand Down Expand Up @@ -3002,7 +3004,8 @@ CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions,
VersionEdit GetDBRecoveryEditForObsoletingMemTables(
VersionSet* vset, const ColumnFamilyData& cfd,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker);
const autovector<ReadOnlyMemTable*>& memtables,
LogsWithPrepTracker* prep_tracker);

// Return the earliest log file to keep after the memtable flush is
// finalized.
Expand All @@ -3013,13 +3016,13 @@ VersionEdit GetDBRecoveryEditForObsoletingMemTables(
uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables_to_flush,
const autovector<ReadOnlyMemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker);
// For atomic flush.
uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<autovector<VersionEdit*>>& edit_lists,
const autovector<const autovector<MemTable*>*>& memtables_to_flush,
const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker);

// In non-2PC mode, WALs with log number < the returned number can be
Expand All @@ -3036,11 +3039,11 @@ uint64_t PrecomputeMinLogNumberToKeepNon2PC(
// will not depend on any WAL file. nullptr means no memtable is being flushed.
// The function is only applicable to 2pc mode.
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<MemTable*>& memtables_to_flush);
VersionSet* vset, const autovector<ReadOnlyMemTable*>& memtables_to_flush);
// For atomic flush.
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset,
const autovector<const autovector<MemTable*>*>& memtables_to_flush);
const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush);

// Fix user-supplied options to be reasonable
template <class T, class V>
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(

if (s.ok()) {
autovector<ColumnFamilyData*> tmp_cfds;
autovector<const autovector<MemTable*>*> mems_list;
autovector<const autovector<ReadOnlyMemTable*>*> mems_list;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<FileMetaData*> tmp_file_meta;
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ size_t DBImpl::TEST_LogsWithPrepSize() {
}

uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
autovector<MemTable*> empty_list;
autovector<ReadOnlyMemTable*> empty_list;
return FindMinPrepLogReferencedByMemTable(versions_.get(), empty_list);
}

Expand Down
19 changes: 10 additions & 9 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,8 @@ void DBImpl::DeleteObsoleteFiles() {
VersionEdit GetDBRecoveryEditForObsoletingMemTables(
VersionSet* vset, const ColumnFamilyData& cfd,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker) {
const autovector<ReadOnlyMemTable*>& memtables,
LogsWithPrepTracker* prep_tracker) {
VersionEdit wal_deletion_edit;
uint64_t min_wal_number_to_keep = 0;
assert(edit_list.size() > 0);
Expand Down Expand Up @@ -769,12 +770,12 @@ VersionEdit GetDBRecoveryEditForObsoletingMemTables(
}

uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<MemTable*>& memtables_to_flush) {
VersionSet* vset, const autovector<ReadOnlyMemTable*>& memtables_to_flush) {
uint64_t min_log = 0;

// we must look through the memtables for two phase transactions
// that have been committed but not yet flushed
std::unordered_set<MemTable*> memtables_to_flush_set(
std::unordered_set<ReadOnlyMemTable*> memtables_to_flush_set(
memtables_to_flush.begin(), memtables_to_flush.end());
for (auto loop_cfd : *vset->GetColumnFamilySet()) {
if (loop_cfd->IsDropped()) {
Expand All @@ -799,12 +800,12 @@ uint64_t FindMinPrepLogReferencedByMemTable(
}

uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset,
const autovector<const autovector<MemTable*>*>& memtables_to_flush) {
VersionSet* vset, const autovector<const autovector<ReadOnlyMemTable*>*>&
memtables_to_flush) {
uint64_t min_log = 0;

std::unordered_set<MemTable*> memtables_to_flush_set;
for (const autovector<MemTable*>* memtables : memtables_to_flush) {
std::unordered_set<ReadOnlyMemTable*> memtables_to_flush_set;
for (const autovector<ReadOnlyMemTable*>* memtables : memtables_to_flush) {
memtables_to_flush_set.insert(memtables->begin(), memtables->end());
}
for (auto loop_cfd : *vset->GetColumnFamilySet()) {
Expand Down Expand Up @@ -896,7 +897,7 @@ uint64_t PrecomputeMinLogNumberToKeepNon2PC(
uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables_to_flush,
const autovector<ReadOnlyMemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker) {
assert(vset != nullptr);
assert(prep_tracker != nullptr);
Expand Down Expand Up @@ -937,7 +938,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<autovector<VersionEdit*>>& edit_lists,
const autovector<const autovector<MemTable*>*>& memtables_to_flush,
const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker) {
assert(vset != nullptr);
assert(prep_tracker != nullptr);
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1768,7 +1768,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
s = io_s;
}

uint64_t total_num_entries = mem->num_entries();
uint64_t total_num_entries = mem->NumEntries();
if (s.ok() && total_num_entries != num_input_entries) {
std::string msg = "Expected " + std::to_string(total_num_entries) +
" entries in memtable, but read " +
Expand Down
8 changes: 5 additions & 3 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
}
// Note: if we are to resume after non-OK statuses we need to revisit how
// we reacts to non-OK statuses here.
// we react to non-OK statuses here.
versions_->SetLastSequence(last_sequence);
}
MemTableInsertStatusCheck(w.status);
Expand Down Expand Up @@ -1610,6 +1610,8 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
Status DBImpl::WriteRecoverableState() {
mutex_.AssertHeld();
if (!cached_recoverable_state_empty_) {
// Only for write-prepared and write-unprepared.
assert(seq_per_batch_);
bool dont_care_bool;
SequenceNumber next_seq;
if (two_write_queues_) {
Expand Down Expand Up @@ -2251,8 +2253,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
memtable_info.cf_name = cfd->GetName();
memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
memtable_info.num_entries = cfd->mem()->num_entries();
memtable_info.num_deletes = cfd->mem()->num_deletes();
memtable_info.num_entries = cfd->mem()->NumEntries();
memtable_info.num_deletes = cfd->mem()->NumDeletion();
if (!cfd->ioptions()->persist_user_defined_timestamps &&
cfd->user_comparator()->timestamp_size() > 0) {
const Slice& newest_udt = cfd->mem()->GetNewestUDT();
Expand Down
33 changes: 17 additions & 16 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ void FlushJob::ReportStartedFlush() {
IOSTATS_RESET(bytes_written);
}

void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
void FlushJob::ReportFlushInputSize(const autovector<ReadOnlyMemTable*>& mems) {
uint64_t input_size = 0;
for (auto* mem : mems) {
input_size += mem->ApproximateMemoryUsage();
Expand Down Expand Up @@ -204,7 +204,7 @@ void FlushJob::PickMemTable() {
// entries mems are (implicitly) sorted in ascending order by their created
// time. We will use the first memtable's `edit` to keep the meta info for
// this flush.
MemTable* m = mems_[0];
ReadOnlyMemTable* m = mems_[0];
edit_ = m->GetEdits();
edit_->SetPrevLogNumber(0);
// SetLogNumber(log_num) indicates logs with number smaller than log_num
Expand Down Expand Up @@ -420,7 +420,7 @@ Status FlushJob::MemPurge() {
std::vector<InternalIterator*> memtables;
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
for (MemTable* m : mems_) {
for (ReadOnlyMemTable* m : mems_) {
memtables.push_back(m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr,
&arena, /*prefix_extractor=*/nullptr));
auto* range_del_iter = m->NewRangeTombstoneIterator(
Expand Down Expand Up @@ -713,11 +713,11 @@ bool FlushJob::MemPurgeDecider(double threshold) {

// Iterate over each memtable of the set.
for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
mem_iter++) {
MemTable* mt = *mem_iter;
++mem_iter) {
ReadOnlyMemTable* mt = *mem_iter;

// Else sample from the table.
uint64_t nentries = mt->num_entries();
uint64_t nentries = mt->NumEntries();
// Corrected Cochran formula for small populations
// (converges to n0 for large populations).
uint64_t target_sample_size =
Expand Down Expand Up @@ -894,11 +894,12 @@ Status FlushJob::WriteLevel0Table() {
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables",
&mems_size);
assert(job_context_);
for (MemTable* m : mems_) {
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
for (ReadOnlyMemTable* m : mems_) {
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Flushing memtable id %" PRIu64
" with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetID(),
m->GetNextLogNumber());
if (logical_strip_timestamp) {
memtables.push_back(m->NewTimestampStrippingIterator(
ro, /*seqno_to_time_mapping=*/nullptr, &arena,
Expand All @@ -917,11 +918,11 @@ Status FlushJob::WriteLevel0Table() {
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
total_num_entries += m->num_entries();
total_num_deletes += m->num_deletes();
total_data_size += m->get_data_size();
total_num_entries += m->NumEntries();
total_num_deletes += m->NumDeletion();
total_data_size += m->GetDataSize();
total_memory_usage += m->ApproximateMemoryUsage();
total_num_range_deletes += m->num_range_deletes();
total_num_range_deletes += m->NumRangeDeletion();
}

// TODO(cbi): when memtable is flushed due to number of range deletions
Expand Down Expand Up @@ -1171,7 +1172,7 @@ void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() {
return;
}
// Find the newest user-defined timestamps from all the flushed memtables.
for (MemTable* m : mems_) {
for (const ReadOnlyMemTable* m : mems_) {
Slice table_newest_udt = m->GetNewestUDT();
// Empty memtables can be legitimately created and flushed, for example
// by error recovery flush attempts.
Expand Down
7 changes: 4 additions & 3 deletions db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class FlushJob {
bool* skipped_since_bg_error = nullptr,
ErrorHandler* error_handler = nullptr);
void Cancel();
const autovector<MemTable*>& GetMemTables() const { return mems_; }
const autovector<ReadOnlyMemTable*>& GetMemTables() const { return mems_; }

std::list<std::unique_ptr<FlushJobInfo>>* GetCommittedFlushJobsInfo() {
return &committed_flush_jobs_info_;
Expand All @@ -101,7 +101,7 @@ class FlushJob {
friend class FlushJobTest_GetRateLimiterPriorityForWrite_Test;

void ReportStartedFlush();
void ReportFlushInputSize(const autovector<MemTable*>& mems);
static void ReportFlushInputSize(const autovector<ReadOnlyMemTable*>& mems);
void RecordFlushIOStats();
Status WriteLevel0Table();

Expand Down Expand Up @@ -205,7 +205,8 @@ class FlushJob {

// Variables below are set by PickMemTable():
FileMetaData meta_;
autovector<MemTable*> mems_;
// Memtables to be flushed by this job.
autovector<ReadOnlyMemTable*> mems_;
VersionEdit* edit_;
Version* base_;
bool pick_memtable_called;
Expand Down
Loading

0 comments on commit d14b6bc

Please sign in to comment.