From db28b914ca4d806f3eddbac55323aa9372ecfda5 Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Fri, 13 Sep 2024 14:52:16 -0700 Subject: [PATCH] Support dump a directory of wal in chronological order --- tools/ldb_cmd.cc | 226 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 179 insertions(+), 47 deletions(-) diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index a2062c1563e..845867c07e3 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -114,12 +114,90 @@ const std::string LDBCommand::ARG_READ_TIMESTAMP = "read_timestamp"; const char* LDBCommand::DELIM = " ==> "; namespace { +// Helper class to iterate WAL logs in a directory in chronological order. +class WALPicker { + public: + explicit WALPicker(const std::string& parent_dir, + const std::vector& filenames); + // REQUIRES Valid() == true + std::string GetNextWAL(); + bool Valid() const { return wal_file_iter_ != log_files_.end(); } + + private: + // WAL log file names(s) + std::string parent_dir_; + std::vector log_files_; + std::vector::const_iterator wal_file_iter_; +}; + +WALPicker::WALPicker(const std::string& parent_dir, + const std::vector& filenames) + : parent_dir_(parent_dir) { + // populate wal logs + assert(!filenames.empty()); + for (const auto& fname : filenames) { + uint64_t file_num = 0; + FileType file_type; + bool parse_ok = ParseFileName(fname, &file_num, &file_type); + if (parse_ok && file_type == kWalFile) { + log_files_.push_back(fname); + } + } + + std::sort(log_files_.begin(), log_files_.end(), + [](const std::string& lhs, const std::string& rhs) { + uint64_t num1 = 0; + uint64_t num2 = 0; + FileType type1; + FileType type2; + bool parse_ok1 = ParseFileName(lhs, &num1, &type1); + bool parse_ok2 = ParseFileName(rhs, &num2, &type2); +#ifndef NDEBUG + assert(parse_ok1); + assert(parse_ok2); +#else + (void)parse_ok1; + (void)parse_ok2; +#endif + return num1 < num2; + }); + wal_file_iter_ = log_files_.begin(); +} + +std::string WALPicker::GetNextWAL() { + assert(Valid()); + std::string ret; + if (wal_file_iter_ != log_files_.end()) { + ret.assign(parent_dir_); + if (ret.back() != kFilePathSeparator) { + ret.push_back(kFilePathSeparator); + } + ret.append(*wal_file_iter_); + ++wal_file_iter_; + } + return ret; +} + +void DumpWalFiles(Options options, const std::string& dir_or_file, + bool print_header, bool print_values, + bool only_print_seqno_gaps, bool is_write_committed, + const std::map& ucmps, + LDBCommandExecuteResult* exec_state); + +void DumpWalFiles(Options options, + const std::vector>& sorted_wal_files, + bool print_header, bool print_values, + bool only_print_seqno_gaps, bool is_write_committed, + const std::map& ucmps, + LDBCommandExecuteResult* exec_state); -void DumpWalFile(Options options, std::string wal_file, bool print_header, - bool print_values, bool only_print_seqno_gaps, - bool is_write_committed, +void DumpWalFile(Options options, const std::string& wal_file, + bool print_header, bool print_values, + bool only_print_seqno_gaps, bool is_write_committed, const std::map& ucmps, - LDBCommandExecuteResult* exec_state); + LDBCommandExecuteResult* exec_state, + std::optional* prev_batch_seqno, + std::optional* prev_batch_count); void DumpSstFile(Options options, std::string filename, bool output_hex, bool show_properties, bool decode_blob_index, @@ -2214,10 +2292,10 @@ void DBDumperCommand::DoCommand() { switch (type) { case kWalFile: // TODO(myabandeh): allow configuring is_write_commited - DumpWalFile(options_, path_, /* print_header_ */ true, - /* print_values_ */ true, - /* only_print_seqno_gaps */ false, - true /* is_write_commited */, ucmps_, &exec_state_); + DumpWalFiles(options_, path_, /* print_header_ */ true, + /* print_values_ */ true, + /* only_print_seqno_gaps */ false, + true /* is_write_commited */, ucmps_, &exec_state_); break; case kTableFile: DumpSstFile(options_, path_, is_key_hex_, /* show_properties */ true, @@ -2844,11 +2922,79 @@ class InMemoryHandler : public WriteBatch::Handler { const std::map ucmps_; }; -void DumpWalFile(Options options, std::string wal_file, bool print_header, - bool print_values, bool only_print_seqno_gaps, - bool is_write_committed, +void DumpWalFiles(Options options, const std::string& dir_or_file, + bool print_header, bool print_values, + bool only_print_seqno_gaps, bool is_write_committed, + const std::map& ucmps, + LDBCommandExecuteResult* exec_state) { + std::vector filenames; + ROCKSDB_NAMESPACE::Env* env = options.env; + ROCKSDB_NAMESPACE::Status st = env->GetChildren(dir_or_file, &filenames); + std::optional prev_batch_seqno = std::nullopt; + std::optional prev_batch_count = std::nullopt; + if (!st.ok() || filenames.empty()) { + // dir_or_file does not exist or does not contain children + // Check its existence first + Status s = env->FileExists(dir_or_file); + // dir_or_file does not exist + if (!s.ok()) { + if (exec_state) { + *exec_state = LDBCommandExecuteResult::Failed( + dir_or_file + ": No such file or directory"); + } + return; + } + DumpWalFile(options, dir_or_file, print_header, print_values, + only_print_seqno_gaps, is_write_committed, ucmps, exec_state, + &prev_batch_seqno, &prev_batch_count); + } else { + WALPicker wal_picker(dir_or_file, filenames); + if (!wal_picker.Valid()) { + if (exec_state) { + *exec_state = LDBCommandExecuteResult::Failed( + dir_or_file + ": No valid wal logs found"); + } + return; + } + std::string wal_file = wal_picker.GetNextWAL(); + while (!wal_file.empty()) { + std::cout << "Checking wal file: " << wal_file << std::endl; + DumpWalFile(options, wal_file, print_header, print_values, + only_print_seqno_gaps, is_write_committed, ucmps, exec_state, + &prev_batch_seqno, &prev_batch_count); + if (exec_state->IsFailed() || !wal_picker.Valid()) { + return; + } + wal_file = wal_picker.GetNextWAL(); + } + } +} + +void DumpWalFiles(Options options, + const std::vector>& sorted_wal_files, + bool print_header, bool print_values, + bool only_print_seqno_gaps, bool is_write_committed, + const std::map& ucmps, + LDBCommandExecuteResult* exec_state) { + std::optional prev_batch_seqno = std::nullopt; + std::optional prev_batch_count = std::nullopt; + for (const auto& wal : sorted_wal_files) { + // TODO(qyang): option.wal_dir should be passed into ldb command + std::string wal_file_name = options.wal_dir + wal->PathName(); + std::cout << "Checking wal file: " << wal_file_name << std::endl; + DumpWalFile(options, wal_file_name, print_header, print_values, + only_print_seqno_gaps, is_write_committed, ucmps, exec_state, + &prev_batch_seqno, &prev_batch_count); + } +} + +void DumpWalFile(Options options, const std::string& wal_file, + bool print_header, bool print_values, + bool only_print_seqno_gaps, bool is_write_committed, const std::map& ucmps, - LDBCommandExecuteResult* exec_state) { + LDBCommandExecuteResult* exec_state, + std::optional* prev_batch_seqno, + std::optional* prev_batch_count) { const auto& fs = options.env->GetFileSystem(); FileOptions soptions(options); std::unique_ptr wal_file_reader; @@ -2898,8 +3044,6 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header, } std::cout << "\n"; } - std::optional prev_batch_seqno = std::nullopt; - std::optional prev_batch_count = std::nullopt; while (status.ok() && reader.ReadRecord(&record, &scratch)) { row.str(""); if (record.size() < WriteBatchInternal::kHeader) { @@ -2955,18 +3099,20 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header, } SequenceNumber sequence_number = WriteBatchInternal::Sequence(&batch); uint32_t batch_count = WriteBatchInternal::Count(&batch); + assert(prev_batch_seqno); + assert(prev_batch_count); if (only_print_seqno_gaps && - (!prev_batch_seqno.has_value() || !prev_batch_count.has_value() || - prev_batch_seqno.value() + prev_batch_count.value() == + (!prev_batch_seqno->has_value() || !prev_batch_count->has_value() || + prev_batch_seqno->value() + prev_batch_count->value() == sequence_number)) { - prev_batch_seqno = sequence_number; - prev_batch_count = batch_count; + *prev_batch_seqno = sequence_number; + *prev_batch_count = batch_count; continue; } row << sequence_number << ","; row << batch_count << ","; - prev_batch_seqno = sequence_number; - prev_batch_count = batch_count; + *prev_batch_seqno = sequence_number; + *prev_batch_count = batch_count; row << WriteBatchInternal::ByteSize(&batch) << ","; row << reader.LastRecordOffset() << ","; ColumnFamilyCollector cf_collector; @@ -3060,21 +3206,22 @@ WALDumperCommand::WALDumperCommand( void WALDumperCommand::Help(std::string& ret) { ret.append(" "); ret.append(WALDumperCommand::Name()); - ret.append(" --" + ARG_WAL_FILE + "="); + ret.append(" --" + ARG_WAL_FILE + + "="); ret.append(" [--" + ARG_DB + "=]"); ret.append(" [--" + ARG_PRINT_HEADER + "] "); ret.append(" [--" + ARG_PRINT_VALUE + "] "); ret.append(" [--" + ARG_ONLY_PRINT_SEQNO_GAPS + - "] (assuming seq_per_batch = false) "); + "] (assuming seq_per_batch=false) "); ret.append(" [--" + ARG_WRITE_COMMITTED + "=true|false] "); ret.append("\n"); } void WALDumperCommand::DoCommand() { PrepareOptions(); - DumpWalFile(options_, wal_file_, print_header_, print_values_, - only_print_seqno_gaps_, is_write_committed_, ucmps_, - &exec_state_); + DumpWalFiles(options_, wal_file_, print_header_, print_values_, + only_print_seqno_gaps_, is_write_committed_, ucmps_, + &exec_state_); } // ---------------------------------------------------------------------------- @@ -4554,28 +4701,13 @@ void DBFileDumperCommand::DoCommand() { std::cout << "Write Ahead Log Files" << std::endl; std::cout << "==============================" << std::endl; - ROCKSDB_NAMESPACE::VectorWalPtr wal_files; - s = db_->GetSortedWalFiles(wal_files); - if (!s.ok()) { - std::cerr << "Error when getting WAL files" << std::endl; - } else { - std::string wal_dir; - if (options_.wal_dir.empty()) { - wal_dir = db_->GetName(); - } else { - wal_dir = NormalizePath(options_.wal_dir + "/"); - } - for (auto& wal : wal_files) { - // TODO(qyang): option.wal_dir should be passed into ldb command - std::string filename = wal_dir + wal->PathName(); - std::cout << filename << std::endl; - // TODO(myabandeh): allow configuring is_write_commited - DumpWalFile(options_, filename, true /* print_header */, - true /* print_values */, - false /* only_print_seqno_gapstrue */, - true /* is_write_commited */, ucmps_, &exec_state_); - } - } + ROCKSDB_NAMESPACE::VectorWalPtr sorted_wal_files; + s = db_->GetSortedWalFiles(sorted_wal_files); + // TODO(qyang): option.wal_dir should be passed into ldb command + // TODO(myabandeh): allow configuring is_write_commited + DumpWalFiles(options_, sorted_wal_files, true /* print_header */, + true /* print_values */, false /* only_print_seqno_gapstrue */, + true /* is_write_commited */, ucmps_, &exec_state_); } const std::string DBLiveFilesMetadataDumperCommand::ARG_SORT_BY_FILENAME =