Skip to content

Commit

Permalink
Add an option to dump wal seqno gaps (#13014)
Browse files Browse the repository at this point in the history
Summary:
Add an option `--only_print_seqno_gaps` for wal dump to help with debugging. This option will check the continuity of sequence numbers in WAL logs, assuming `seq_per_batch` is false. `--walfile` option now also takes a directory, and it will check all WAL logs in the directory in chronological order.

When a gap is found, we can further check if it's related to operations like external file ingestion.

Pull Request resolved: #13014

Test Plan: Manually tested

Reviewed By: ltamasi

Differential Revision: D62989115

Pulled By: jowlyzhang

fbshipit-source-id: 22e3326344e7969ff9d5091d21fec2935770fbc7
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Sep 19, 2024
1 parent 10984e8 commit 1238120
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 18 deletions.
204 changes: 187 additions & 17 deletions tools/ldb_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,83 @@ const std::string LDBCommand::ARG_READ_TIMESTAMP = "read_timestamp";
const char* LDBCommand::DELIM = " ==> ";

namespace {
// Helper class to iterate WAL logs in a directory in chronological order.
class WALFileIterator {
public:
explicit WALFileIterator(const std::string& parent_dir,
const std::vector<std::string>& filenames);
// REQUIRES Valid() == true
std::string GetNextWAL();
bool Valid() const { return wal_file_iter_ != log_files_.end(); }

void DumpWalFile(Options options, std::string wal_file, bool print_header,
bool print_values, bool is_write_committed,
private:
// WAL log file names(s)
std::string parent_dir_;
std::vector<std::string> log_files_;
std::vector<std::string>::const_iterator wal_file_iter_;
};

WALFileIterator::WALFileIterator(const std::string& parent_dir,
const std::vector<std::string>& filenames)
: parent_dir_(parent_dir) {
// populate wal logs
assert(!filenames.empty());
for (const auto& fname : filenames) {
uint64_t file_num = 0;
FileType file_type;
bool parse_ok = ParseFileName(fname, &file_num, &file_type);
if (parse_ok && file_type == kWalFile) {
log_files_.push_back(fname);
}
}

std::sort(log_files_.begin(), log_files_.end(),
[](const std::string& lhs, const std::string& rhs) {
uint64_t num1 = 0;
uint64_t num2 = 0;
FileType type1;
FileType type2;
bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
#ifndef NDEBUG
assert(parse_ok1);
assert(parse_ok2);
#else
(void)parse_ok1;
(void)parse_ok2;
#endif
return num1 < num2;
});
wal_file_iter_ = log_files_.begin();
}

std::string WALFileIterator::GetNextWAL() {
assert(Valid());
std::string ret;
if (wal_file_iter_ != log_files_.end()) {
ret.assign(parent_dir_);
if (ret.back() != kFilePathSeparator) {
ret.push_back(kFilePathSeparator);
}
ret.append(*wal_file_iter_);
++wal_file_iter_;
}
return ret;
}

void DumpWalFiles(Options options, const std::string& dir_or_file,
bool print_header, bool print_values,
bool only_print_seqno_gaps, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state);

void DumpWalFile(Options options, const std::string& wal_file,
bool print_header, bool print_values,
bool only_print_seqno_gaps, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state);
LDBCommandExecuteResult* exec_state,
std::optional<SequenceNumber>* prev_batch_seqno,
std::optional<uint32_t>* prev_batch_count);

void DumpSstFile(Options options, std::string filename, bool output_hex,
bool show_properties, bool decode_blob_index,
Expand Down Expand Up @@ -2213,9 +2285,10 @@ void DBDumperCommand::DoCommand() {
switch (type) {
case kWalFile:
// TODO(myabandeh): allow configuring is_write_commited
DumpWalFile(options_, path_, /* print_header_ */ true,
/* print_values_ */ true, true /* is_write_commited */,
ucmps_, &exec_state_);
DumpWalFiles(options_, path_, /* print_header_ */ true,
/* print_values_ */ true,
/* only_print_seqno_gaps */ false,
true /* is_write_commited */, ucmps_, &exec_state_);
break;
case kTableFile:
DumpSstFile(options_, path_, is_key_hex_, /* show_properties */ true,
Expand Down Expand Up @@ -2842,10 +2915,70 @@ class InMemoryHandler : public WriteBatch::Handler {
const std::map<uint32_t, const Comparator*> ucmps_;
};

void DumpWalFile(Options options, std::string wal_file, bool print_header,
bool print_values, bool is_write_committed,
void DumpWalFiles(Options options, const std::string& dir_or_file,
bool print_header, bool print_values,
bool only_print_seqno_gaps, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state) {
std::vector<std::string> filenames;
ROCKSDB_NAMESPACE::Env* env = options.env;
ROCKSDB_NAMESPACE::Status st = env->GetChildren(dir_or_file, &filenames);
std::optional<SequenceNumber> prev_batch_seqno;
std::optional<uint32_t> prev_batch_count;
if (!st.ok() || filenames.empty()) {
// dir_or_file does not exist or does not contain children
// Check its existence first
Status s = env->FileExists(dir_or_file);
// dir_or_file does not exist
if (!s.ok()) {
if (exec_state) {
*exec_state = LDBCommandExecuteResult::Failed(
dir_or_file + ": No such file or directory");
}
return;
}
// If it exists and doesn't have children, it should be a log file.
if (dir_or_file.length() <= 4 ||
dir_or_file.rfind(".log") != dir_or_file.length() - 4) {
if (exec_state) {
*exec_state = LDBCommandExecuteResult::Failed(
dir_or_file + ": Invalid log file name");
}
return;
}
DumpWalFile(options, dir_or_file, print_header, print_values,
only_print_seqno_gaps, is_write_committed, ucmps, exec_state,
&prev_batch_seqno, &prev_batch_count);
} else {
WALFileIterator wal_file_iter(dir_or_file, filenames);
if (!wal_file_iter.Valid()) {
if (exec_state) {
*exec_state = LDBCommandExecuteResult::Failed(
dir_or_file + ": No valid wal logs found");
}
return;
}
std::string wal_file = wal_file_iter.GetNextWAL();
while (!wal_file.empty()) {
std::cout << "Checking wal file: " << wal_file << std::endl;
DumpWalFile(options, wal_file, print_header, print_values,
only_print_seqno_gaps, is_write_committed, ucmps, exec_state,
&prev_batch_seqno, &prev_batch_count);
if (exec_state->IsFailed() || !wal_file_iter.Valid()) {
return;
}
wal_file = wal_file_iter.GetNextWAL();
}
}
}

void DumpWalFile(Options options, const std::string& wal_file,
bool print_header, bool print_values,
bool only_print_seqno_gaps, bool is_write_committed,
const std::map<uint32_t, const Comparator*>& ucmps,
LDBCommandExecuteResult* exec_state) {
LDBCommandExecuteResult* exec_state,
std::optional<SequenceNumber>* prev_batch_seqno,
std::optional<uint32_t>* prev_batch_count) {
const auto& fs = options.env->GetFileSystem();
FileOptions soptions(options);
std::unique_ptr<SequentialFileReader> wal_file_reader;
Expand Down Expand Up @@ -2948,8 +3081,32 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
break;
}
}
row << WriteBatchInternal::Sequence(&batch) << ",";
row << WriteBatchInternal::Count(&batch) << ",";
SequenceNumber sequence_number = WriteBatchInternal::Sequence(&batch);
uint32_t batch_count = WriteBatchInternal::Count(&batch);
assert(prev_batch_seqno);
assert(prev_batch_count);
assert(prev_batch_seqno->has_value() == prev_batch_count->has_value());
// TODO(yuzhangyu): handle pessimistic transactions case.
if (only_print_seqno_gaps) {
if (!prev_batch_seqno->has_value() ||
!prev_batch_count->has_value() ||
prev_batch_seqno->value() + prev_batch_count->value() ==
sequence_number) {
*prev_batch_seqno = sequence_number;
*prev_batch_count = batch_count;
continue;
} else if (prev_batch_seqno->has_value() &&
prev_batch_count->has_value()) {
row << "Prev batch sequence number: " << prev_batch_seqno->value()
<< ", prev batch count: " << prev_batch_count->value() << ", ";
*prev_batch_seqno = sequence_number;
*prev_batch_count = batch_count;
}
}
row << sequence_number << ",";
row << batch_count << ",";
*prev_batch_seqno = sequence_number;
*prev_batch_count = batch_count;
row << WriteBatchInternal::ByteSize(&batch) << ",";
row << reader.LastRecordOffset() << ",";
ColumnFamilyCollector cf_collector;
Expand Down Expand Up @@ -3003,16 +3160,20 @@ const std::string WALDumperCommand::ARG_WAL_FILE = "walfile";
const std::string WALDumperCommand::ARG_WRITE_COMMITTED = "write_committed";
const std::string WALDumperCommand::ARG_PRINT_VALUE = "print_value";
const std::string WALDumperCommand::ARG_PRINT_HEADER = "header";
const std::string WALDumperCommand::ARG_ONLY_PRINT_SEQNO_GAPS =
"only_print_seqno_gaps";

WALDumperCommand::WALDumperCommand(
const std::vector<std::string>& /*params*/,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(options, flags, true,
BuildCmdLineOptions({ARG_WAL_FILE, ARG_DB, ARG_WRITE_COMMITTED,
ARG_PRINT_HEADER, ARG_PRINT_VALUE})),
ARG_PRINT_HEADER, ARG_PRINT_VALUE,
ARG_ONLY_PRINT_SEQNO_GAPS})),
print_header_(false),
print_values_(false),
only_print_seqno_gaps_(false),
is_write_committed_(false) {
wal_file_.clear();

Expand All @@ -3023,6 +3184,7 @@ WALDumperCommand::WALDumperCommand(

print_header_ = IsFlagPresent(flags, ARG_PRINT_HEADER);
print_values_ = IsFlagPresent(flags, ARG_PRINT_VALUE);
only_print_seqno_gaps_ = IsFlagPresent(flags, ARG_ONLY_PRINT_SEQNO_GAPS);
is_write_committed_ = ParseBooleanOption(options, ARG_WRITE_COMMITTED, true);

if (wal_file_.empty()) {
Expand All @@ -3038,18 +3200,22 @@ WALDumperCommand::WALDumperCommand(
void WALDumperCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(WALDumperCommand::Name());
ret.append(" --" + ARG_WAL_FILE + "=<write_ahead_log_file_path>");
ret.append(" --" + ARG_WAL_FILE +
"=<write_ahead_log_file_path_or_directory>");
ret.append(" [--" + ARG_DB + "=<db_path>]");
ret.append(" [--" + ARG_PRINT_HEADER + "] ");
ret.append(" [--" + ARG_PRINT_VALUE + "] ");
ret.append(" [--" + ARG_ONLY_PRINT_SEQNO_GAPS +
"] (only correct if not using pessimistic transactions)");
ret.append(" [--" + ARG_WRITE_COMMITTED + "=true|false] ");
ret.append("\n");
}

void WALDumperCommand::DoCommand() {
PrepareOptions();
DumpWalFile(options_, wal_file_, print_header_, print_values_,
is_write_committed_, ucmps_, &exec_state_);
DumpWalFiles(options_, wal_file_, print_header_, print_values_,
only_print_seqno_gaps_, is_write_committed_, ucmps_,
&exec_state_);
}

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -4540,13 +4706,17 @@ void DBFileDumperCommand::DoCommand() {
} else {
wal_dir = NormalizePath(options_.wal_dir + "/");
}
std::optional<SequenceNumber> prev_batch_seqno;
std::optional<uint32_t> prev_batch_count;
for (auto& wal : wal_files) {
// TODO(qyang): option.wal_dir should be passed into ldb command
std::string filename = wal_dir + wal->PathName();
std::cout << filename << std::endl;
// TODO(myabandeh): allow configuring is_write_commited
DumpWalFile(options_, filename, true, true, true /* is_write_commited */,
ucmps_, &exec_state_);
DumpWalFile(
options_, filename, true /* print_header */, true /* print_values */,
false /* only_print_seqno_gapstrue */, true /* is_write_commited */,
ucmps_, &exec_state_, &prev_batch_seqno, &prev_batch_count);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions tools/ldb_cmd_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,15 @@ class WALDumperCommand : public LDBCommand {
bool print_header_;
std::string wal_file_;
bool print_values_;
bool only_print_seqno_gaps_;
bool is_write_committed_; // default will be set to true
bool no_db_open_ = true;

static const std::string ARG_WAL_FILE;
static const std::string ARG_WRITE_COMMITTED;
static const std::string ARG_PRINT_HEADER;
static const std::string ARG_PRINT_VALUE;
static const std::string ARG_ONLY_PRINT_SEQNO_GAPS;
};

class GetCommand : public LDBCommand {
Expand Down
2 changes: 1 addition & 1 deletion tools/ldb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ def testMiscAdminTask(self):
0
== run_err_null(
"./ldb dump_wal --db=%s --walfile=%s --header"
% (origDbPath, os.path.join(origDbPath, "LOG"))
% (origDbPath, origDbPath)
)
)
self.assertRunOK("scan", "x1 ==> y1\nx2 ==> y2\nx3 ==> y3\nx4 ==> y4")
Expand Down

0 comments on commit 1238120

Please sign in to comment.