Skip to content

Commit

Permalink
Add is_remote_compaction to compaction_job_stats
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykorean committed Sep 24, 2024
1 parent fb980aa commit 39142b3
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 33 deletions.
3 changes: 2 additions & 1 deletion db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ void VerifyInitializationOfCompactionJobStats(
ASSERT_EQ(compaction_job_stats.num_output_records, 0U);
ASSERT_EQ(compaction_job_stats.num_output_files, 0U);

ASSERT_EQ(compaction_job_stats.is_manual_compaction, true);
ASSERT_TRUE(compaction_job_stats.is_manual_compaction);
ASSERT_FALSE(compaction_job_stats.is_remote_compaction);

ASSERT_EQ(compaction_job_stats.total_input_bytes, 0U);
ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U);
Expand Down
5 changes: 5 additions & 0 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ Status CompactionServiceCompactionJob::Run() {
// Build compaction result
compaction_result_->output_level = compact_->compaction->output_level();
compaction_result_->output_path = output_path_;
compaction_result_->stats.is_remote_compaction = true;
for (const auto& output_file : sub_compact->GetOutputs()) {
auto& meta = output_file.meta;
compaction_result_->output_files.emplace_back(
Expand Down Expand Up @@ -530,6 +531,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct CompactionJobStats, is_manual_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"is_remote_compaction",
{offsetof(struct CompactionJobStats, is_remote_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_input_bytes",
{offsetof(struct CompactionJobStats, total_input_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
Expand Down
17 changes: 15 additions & 2 deletions db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,14 @@ class MyTestCompactionService : public CompactionService {
Status s =
DB::OpenAndCompact(options, db_path_, db_path_ + "/" + scheduled_job_id,
compaction_input, result, options_override);
if (is_override_wait_result_) {
*result = override_wait_result_;
{
InstrumentedMutexLock l(&mutex_);
if (is_override_wait_result_) {
*result = override_wait_result_;
} else {
CompactionServiceResult::Read(*result, &deserialized_result_)
.PermitUncheckedError();
}
}
compaction_num_.fetch_add(1);
if (s.ok()) {
Expand Down Expand Up @@ -143,6 +149,8 @@ class MyTestCompactionService : public CompactionService {

void SetCanceled(bool canceled) { canceled_ = canceled; }

CompactionServiceResult GetResult() { return deserialized_result_; }

CompactionServiceJobStatus GetFinalCompactionServiceJobStatus() {
return final_updated_status_.load();
}
Expand All @@ -164,6 +172,7 @@ class MyTestCompactionService : public CompactionService {
CompactionServiceJobStatus override_wait_status_ =
CompactionServiceJobStatus::kFailure;
bool is_override_wait_result_ = false;
CompactionServiceResult deserialized_result_;
std::string override_wait_result_;
std::vector<std::shared_ptr<EventListener>> listeners_;
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
Expand Down Expand Up @@ -333,6 +342,7 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "cf_1", "cf_2", "cf_3"},
options);
ASSERT_GT(verify_passed, 0);
ASSERT_TRUE(my_cs->GetResult().stats.is_remote_compaction);
Close();
}

Expand Down Expand Up @@ -371,6 +381,9 @@ TEST_F(CompactionServiceTest, ManualCompaction) {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
VerifyTestData();

ASSERT_TRUE(my_cs->GetResult().stats.is_manual_compaction);
ASSERT_TRUE(my_cs->GetResult().stats.is_remote_compaction);
}

TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
Expand Down
63 changes: 33 additions & 30 deletions include/rocksdb/compaction_job_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,93 +19,96 @@ struct CompactionJobStats {
void Add(const CompactionJobStats& stats);

// the elapsed time of this compaction in microseconds.
uint64_t elapsed_micros;
uint64_t elapsed_micros = 0;

// the elapsed CPU time of this compaction in microseconds.
uint64_t cpu_micros;
uint64_t cpu_micros = 0;

// Used internally indicating whether a subcompaction's
// `num_input_records` is accurate.
bool has_num_input_records;
bool has_num_input_records = false;
// the number of compaction input records.
uint64_t num_input_records;
uint64_t num_input_records = 0;
// the number of blobs read from blob files
uint64_t num_blobs_read;
uint64_t num_blobs_read = 0;
// the number of compaction input files (table files)
size_t num_input_files;
size_t num_input_files = 0;
// the number of compaction input files at the output level (table files)
size_t num_input_files_at_output_level;
size_t num_input_files_at_output_level = 0;

// the number of compaction output records.
uint64_t num_output_records;
uint64_t num_output_records = 0;
// the number of compaction output files (table files)
size_t num_output_files;
size_t num_output_files = 0;
// the number of compaction output files (blob files)
size_t num_output_files_blob;
size_t num_output_files_blob = 0;

// true if the compaction is a full compaction (all live SST files input)
bool is_full_compaction;
bool is_full_compaction = false;
// true if the compaction is a manual compaction
bool is_manual_compaction;
bool is_manual_compaction = false;
// true if the compaction ran in a remote worker
bool is_remote_compaction = false;

// the total size of table files in the compaction input
uint64_t total_input_bytes;
uint64_t total_input_bytes = 0;
// the total size of blobs read from blob files
uint64_t total_blob_bytes_read;
uint64_t total_blob_bytes_read = 0;
// the total size of table files in the compaction output
uint64_t total_output_bytes;
uint64_t total_output_bytes = 0;
// the total size of blob files in the compaction output
uint64_t total_output_bytes_blob;
uint64_t total_output_bytes_blob = 0;
;

// number of records being replaced by newer record associated with same key.
// this could be a new value or a deletion entry for that key so this field
// sums up all updated and deleted keys
uint64_t num_records_replaced;
uint64_t num_records_replaced = 0;

// the sum of the uncompressed input keys in bytes.
uint64_t total_input_raw_key_bytes;
uint64_t total_input_raw_key_bytes = 0;
// the sum of the uncompressed input values in bytes.
uint64_t total_input_raw_value_bytes;
uint64_t total_input_raw_value_bytes = 0;

// the number of deletion entries before compaction. Deletion entries
// can disappear after compaction because they expired
uint64_t num_input_deletion_records;
uint64_t num_input_deletion_records = 0;
// number of deletion records that were found obsolete and discarded
// because it is not possible to delete any more keys with this entry
// (i.e. all possible deletions resulting from it have been completed)
uint64_t num_expired_deletion_records;
uint64_t num_expired_deletion_records = 0;

// number of corrupt keys (ParseInternalKey returned false when applied to
// the key) encountered and written out.
uint64_t num_corrupt_keys;
uint64_t num_corrupt_keys = 0;

// Following counters are only populated if
// options.report_bg_io_stats = true;

// Time spent on file's Append() call.
uint64_t file_write_nanos;
uint64_t file_write_nanos = 0;

// Time spent on sync file range.
uint64_t file_range_sync_nanos;
uint64_t file_range_sync_nanos = 0;

// Time spent on file fsync.
uint64_t file_fsync_nanos;
uint64_t file_fsync_nanos = 0;

// Time spent on preparing file write (fallocate, etc)
uint64_t file_prepare_write_nanos;
uint64_t file_prepare_write_nanos = 0;

// 0-terminated strings storing the first 8 bytes of the smallest and
// largest key in the output.
static const size_t kMaxPrefixLength = 8;

std::string smallest_output_key_prefix;
std::string largest_output_key_prefix;
std::string smallest_output_key_prefix = "";
std::string largest_output_key_prefix = "";

// number of single-deletes which do not meet a put
uint64_t num_single_del_fallthru;
uint64_t num_single_del_fallthru = 0;

// number of single-deletes which meet something other than a put
uint64_t num_single_del_mismatch;
uint64_t num_single_del_mismatch = 0;

// TODO: Add output_to_penultimate_level output information
};
Expand Down
1 change: 1 addition & 0 deletions util/compaction_job_stats_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void CompactionJobStats::Reset() {

is_full_compaction = false;
is_manual_compaction = false;
is_remote_compaction = false;

total_input_bytes = 0;
total_blob_bytes_read = 0;
Expand Down

0 comments on commit 39142b3

Please sign in to comment.