Skip to content

Commit

Permalink
Support dump a directory of wal in chronological order
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed Sep 13, 2024
1 parent 1c5faa7 commit db28b91
Showing 1 changed file with 179 additions and 47 deletions.
226 changes: 179 additions & 47 deletions tools/ldb_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,90 @@ const std::string LDBCommand::ARG_READ_TIMESTAMP = "read_timestamp";
const char* LDBCommand::DELIM = " ==> ";

namespace {
// Helper class to iterate WAL logs in a directory in chronological order.
class WALPicker {
public:
explicit WALPicker(const std::string& parent_dir,
const std::vector<std::string>& filenames);
// REQUIRES Valid() == true
std::string GetNextWAL();
bool Valid() const { return wal_file_iter_ != log_files_.end(); }

private:
// WAL log file names(s)
std::string parent_dir_;
std::vector<std::string> log_files_;
std::vector<std::string>::const_iterator wal_file_iter_;
};

WALPicker::WALPicker(const std::string& parent_dir,
const std::vector<std::string>& filenames)
: parent_dir_(parent_dir) {
// populate wal logs
assert(!filenames.empty());
for (const auto& fname : filenames) {
uint64_t file_num = 0;
FileType file_type;
bool parse_ok = ParseFileName(fname, &file_num, &file_type);
if (parse_ok && file_type == kWalFile) {
log_files_.push_back(fname);
}
}

std::sort(log_files_.begin(), log_files_.end(),
[](const std::string& lhs, const std::string& rhs) {
uint64_t num1 = 0;
uint64_t num2 = 0;
FileType type1;
FileType type2;
bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
#ifndef NDEBUG
assert(parse_ok1);
assert(parse_ok2);
#else
(void)parse_ok1;
(void)parse_ok2;
#endif
return num1 < num2;
});
wal_file_iter_ = log_files_.begin();
}

std::string WALPicker::GetNextWAL() {
assert(Valid());
std::string ret;
if (wal_file_iter_ != log_files_.end()) {
ret.assign(parent_dir_);
if (ret.back() != kFilePathSeparator) {
ret.push_back(kFilePathSeparator);
}
ret.append(*wal_file_iter_);
++wal_file_iter_;
}
return ret;
}

void DumpWalFiles(Options options, const std::string& dir_or_file,
bool print_header, bool print_values,
bool only_print_seqno_gaps, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state);

void DumpWalFiles(Options options,
const std::vector<std::unique_ptr<WalFile>>& sorted_wal_files,
bool print_header, bool print_values,
bool only_print_seqno_gaps, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state);

void DumpWalFile(Options options, std::string wal_file, bool print_header,
bool print_values, bool only_print_seqno_gaps,
bool is_write_committed,
void DumpWalFile(Options options, const std::string& wal_file,
bool print_header, bool print_values,
bool only_print_seqno_gaps, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state);
LDBCommandExecuteResult* exec_state,
std::optional<SequenceNumber>* prev_batch_seqno,
std::optional<uint32_t>* prev_batch_count);

void DumpSstFile(Options options, std::string filename, bool output_hex,
bool show_properties, bool decode_blob_index,
Expand Down Expand Up @@ -2214,10 +2292,10 @@ void DBDumperCommand::DoCommand() {
switch (type) {
case kWalFile:
// TODO(myabandeh): allow configuring is_write_commited
DumpWalFile(options_, path_, /* print_header_ */ true,
/* print_values_ */ true,
/* only_print_seqno_gaps */ false,
true /* is_write_commited */, ucmps_, &exec_state_);
DumpWalFiles(options_, path_, /* print_header_ */ true,
/* print_values_ */ true,
/* only_print_seqno_gaps */ false,
true /* is_write_commited */, ucmps_, &exec_state_);
break;
case kTableFile:
DumpSstFile(options_, path_, is_key_hex_, /* show_properties */ true,
Expand Down Expand Up @@ -2844,11 +2922,79 @@ class InMemoryHandler : public WriteBatch::Handler {
const std::map<uint32_t, const Comparator*> ucmps_;
};

void DumpWalFile(Options options, std::string wal_file, bool print_header,
bool print_values, bool only_print_seqno_gaps,
bool is_write_committed,
void DumpWalFiles(Options options, const std::string& dir_or_file,
bool print_header, bool print_values,
bool only_print_seqno_gaps, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state) {
std::vector<std::string> filenames;
ROCKSDB_NAMESPACE::Env* env = options.env;
ROCKSDB_NAMESPACE::Status st = env->GetChildren(dir_or_file, &filenames);
std::optional<SequenceNumber> prev_batch_seqno = std::nullopt;
std::optional<uint32_t> prev_batch_count = std::nullopt;
if (!st.ok() || filenames.empty()) {
// dir_or_file does not exist or does not contain children
// Check its existence first
Status s = env->FileExists(dir_or_file);
// dir_or_file does not exist
if (!s.ok()) {
if (exec_state) {
*exec_state = LDBCommandExecuteResult::Failed(
dir_or_file + ": No such file or directory");
}
return;
}
DumpWalFile(options, dir_or_file, print_header, print_values,
only_print_seqno_gaps, is_write_committed, ucmps, exec_state,
&prev_batch_seqno, &prev_batch_count);
} else {
WALPicker wal_picker(dir_or_file, filenames);
if (!wal_picker.Valid()) {
if (exec_state) {
*exec_state = LDBCommandExecuteResult::Failed(
dir_or_file + ": No valid wal logs found");
}
return;
}
std::string wal_file = wal_picker.GetNextWAL();
while (!wal_file.empty()) {
std::cout << "Checking wal file: " << wal_file << std::endl;
DumpWalFile(options, wal_file, print_header, print_values,
only_print_seqno_gaps, is_write_committed, ucmps, exec_state,
&prev_batch_seqno, &prev_batch_count);
if (exec_state->IsFailed() || !wal_picker.Valid()) {
return;
}
wal_file = wal_picker.GetNextWAL();
}
}
}

void DumpWalFiles(Options options,
const std::vector<std::unique_ptr<WalFile>>& sorted_wal_files,
bool print_header, bool print_values,
bool only_print_seqno_gaps, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state) {
std::optional<SequenceNumber> prev_batch_seqno = std::nullopt;
std::optional<uint32_t> prev_batch_count = std::nullopt;
for (const auto& wal : sorted_wal_files) {
// TODO(qyang): option.wal_dir should be passed into ldb command
std::string wal_file_name = options.wal_dir + wal->PathName();
std::cout << "Checking wal file: " << wal_file_name << std::endl;
DumpWalFile(options, wal_file_name, print_header, print_values,
only_print_seqno_gaps, is_write_committed, ucmps, exec_state,
&prev_batch_seqno, &prev_batch_count);
}
}

void DumpWalFile(Options options, const std::string& wal_file,
bool print_header, bool print_values,
bool only_print_seqno_gaps, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state) {
LDBCommandExecuteResult* exec_state,
std::optional<SequenceNumber>* prev_batch_seqno,
std::optional<uint32_t>* prev_batch_count) {
const auto& fs = options.env->GetFileSystem();
FileOptions soptions(options);
std::unique_ptr<SequentialFileReader> wal_file_reader;
Expand Down Expand Up @@ -2898,8 +3044,6 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
}
std::cout << "\n";
}
std::optional<SequenceNumber> prev_batch_seqno = std::nullopt;
std::optional<uint32_t> prev_batch_count = std::nullopt;
while (status.ok() && reader.ReadRecord(&record, &scratch)) {
row.str("");
if (record.size() < WriteBatchInternal::kHeader) {
Expand Down Expand Up @@ -2955,18 +3099,20 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
}
SequenceNumber sequence_number = WriteBatchInternal::Sequence(&batch);
uint32_t batch_count = WriteBatchInternal::Count(&batch);
assert(prev_batch_seqno);
assert(prev_batch_count);
if (only_print_seqno_gaps &&
(!prev_batch_seqno.has_value() || !prev_batch_count.has_value() ||
prev_batch_seqno.value() + prev_batch_count.value() ==
(!prev_batch_seqno->has_value() || !prev_batch_count->has_value() ||
prev_batch_seqno->value() + prev_batch_count->value() ==
sequence_number)) {
prev_batch_seqno = sequence_number;
prev_batch_count = batch_count;
*prev_batch_seqno = sequence_number;
*prev_batch_count = batch_count;
continue;
}
row << sequence_number << ",";
row << batch_count << ",";
prev_batch_seqno = sequence_number;
prev_batch_count = batch_count;
*prev_batch_seqno = sequence_number;
*prev_batch_count = batch_count;
row << WriteBatchInternal::ByteSize(&batch) << ",";
row << reader.LastRecordOffset() << ",";
ColumnFamilyCollector cf_collector;
Expand Down Expand Up @@ -3060,21 +3206,22 @@ WALDumperCommand::WALDumperCommand(
void WALDumperCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(WALDumperCommand::Name());
ret.append(" --" + ARG_WAL_FILE + "=<write_ahead_log_file_path>");
ret.append(" --" + ARG_WAL_FILE +
"=<write_ahead_log_file_path_or_directory>");
ret.append(" [--" + ARG_DB + "=<db_path>]");
ret.append(" [--" + ARG_PRINT_HEADER + "] ");
ret.append(" [--" + ARG_PRINT_VALUE + "] ");
ret.append(" [--" + ARG_ONLY_PRINT_SEQNO_GAPS +
"] (assuming seq_per_batch = false) ");
"] (assuming seq_per_batch=false) ");
ret.append(" [--" + ARG_WRITE_COMMITTED + "=true|false] ");
ret.append("\n");
}

void WALDumperCommand::DoCommand() {
PrepareOptions();
DumpWalFile(options_, wal_file_, print_header_, print_values_,
only_print_seqno_gaps_, is_write_committed_, ucmps_,
&exec_state_);
DumpWalFiles(options_, wal_file_, print_header_, print_values_,
only_print_seqno_gaps_, is_write_committed_, ucmps_,
&exec_state_);
}

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -4554,28 +4701,13 @@ void DBFileDumperCommand::DoCommand() {

std::cout << "Write Ahead Log Files" << std::endl;
std::cout << "==============================" << std::endl;
ROCKSDB_NAMESPACE::VectorWalPtr wal_files;
s = db_->GetSortedWalFiles(wal_files);
if (!s.ok()) {
std::cerr << "Error when getting WAL files" << std::endl;
} else {
std::string wal_dir;
if (options_.wal_dir.empty()) {
wal_dir = db_->GetName();
} else {
wal_dir = NormalizePath(options_.wal_dir + "/");
}
for (auto& wal : wal_files) {
// TODO(qyang): option.wal_dir should be passed into ldb command
std::string filename = wal_dir + wal->PathName();
std::cout << filename << std::endl;
// TODO(myabandeh): allow configuring is_write_commited
DumpWalFile(options_, filename, true /* print_header */,
true /* print_values */,
false /* only_print_seqno_gapstrue */,
true /* is_write_commited */, ucmps_, &exec_state_);
}
}
ROCKSDB_NAMESPACE::VectorWalPtr sorted_wal_files;
s = db_->GetSortedWalFiles(sorted_wal_files);
// TODO(qyang): option.wal_dir should be passed into ldb command
// TODO(myabandeh): allow configuring is_write_commited
DumpWalFiles(options_, sorted_wal_files, true /* print_header */,
true /* print_values */, false /* only_print_seqno_gapstrue */,
true /* is_write_commited */, ucmps_, &exec_state_);
}

const std::string DBLiveFilesMetadataDumperCommand::ARG_SORT_BY_FILENAME =
Expand Down

0 comments on commit db28b91

Please sign in to comment.