Update APIs to support generic unique identifier format (facebook#12384)
Summary:
The current design proposes using a combination of `job_id`, `db_id`, and `db_session_id` to create a unique identifier for remote compaction jobs. However, this approach may not be suitable for users who prefer a different format for the unique identifier.

At Meta, we use a generic compute offload service to run compaction tasks on remote workers. The compute offload client generates a UUID for each task, so onboarding requires updating the current RocksDB API to accept an externally generated identifier.

Users still have the option to create the unique identifier by combining `job_id`, `db_id`, and `db_session_id` if they prefer.
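
As a rough illustration of that fallback, the sketch below joins the three fields into a single string ID. `JobInfoSketch` and `MakeScheduledJobId` are hypothetical names, not part of RocksDB, and the `-` separator is an arbitrary choice; the real fields live in `CompactionServiceJobInfo`.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in for the identifying fields of
// CompactionServiceJobInfo; the real struct has more members.
struct JobInfoSketch {
  std::string db_id;
  std::string db_session_id;
  uint64_t job_id;
};

// Hypothetical helper: concatenates the three fields into one string that
// a Schedule() implementation could return as its scheduled job ID.
std::string MakeScheduledJobId(const JobInfoSketch& info) {
  return info.db_id + "-" + info.db_session_id + "-" +
         std::to_string(info.job_id);
}
```

A service that prefers this scheme could return the resulting string from `Schedule()` and look it up again in `Wait()`.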

Pull Request resolved: facebook#12384

Test Plan:
```
$> ./compaction_service_test
[==========] Running 14 tests from 1 test case.
[----------] Global test environment set-up.
[----------] 14 tests from CompactionServiceTest
[ RUN      ] CompactionServiceTest.BasicCompactions
[       OK ] CompactionServiceTest.BasicCompactions (2642 ms)
[ RUN      ] CompactionServiceTest.ManualCompaction
[       OK ] CompactionServiceTest.ManualCompaction (454 ms)
[ RUN      ] CompactionServiceTest.CancelCompactionOnRemoteSide
[       OK ] CompactionServiceTest.CancelCompactionOnRemoteSide (1643 ms)
[ RUN      ] CompactionServiceTest.FailedToStart
[       OK ] CompactionServiceTest.FailedToStart (1332 ms)
[ RUN      ] CompactionServiceTest.InvalidResult
[       OK ] CompactionServiceTest.InvalidResult (1516 ms)
[ RUN      ] CompactionServiceTest.SubCompaction
[       OK ] CompactionServiceTest.SubCompaction (551 ms)
[ RUN      ] CompactionServiceTest.CompactionFilter
[       OK ] CompactionServiceTest.CompactionFilter (563 ms)
[ RUN      ] CompactionServiceTest.Snapshot
[       OK ] CompactionServiceTest.Snapshot (124 ms)
[ RUN      ] CompactionServiceTest.ConcurrentCompaction
[       OK ] CompactionServiceTest.ConcurrentCompaction (660 ms)
[ RUN      ] CompactionServiceTest.CompactionInfo
[       OK ] CompactionServiceTest.CompactionInfo (984 ms)
[ RUN      ] CompactionServiceTest.FallbackLocalAuto
[       OK ] CompactionServiceTest.FallbackLocalAuto (343 ms)
[ RUN      ] CompactionServiceTest.FallbackLocalManual
[       OK ] CompactionServiceTest.FallbackLocalManual (380 ms)
[ RUN      ] CompactionServiceTest.RemoteEventListener
[       OK ] CompactionServiceTest.RemoteEventListener (491 ms)
[ RUN      ] CompactionServiceTest.TablePropertiesCollector
[       OK ] CompactionServiceTest.TablePropertiesCollector (169 ms)
[----------] 14 tests from CompactionServiceTest (11854 ms total)

[----------] Global test environment tear-down
[==========] 14 tests from 1 test case ran. (11855 ms total)
[  PASSED  ] 14 tests.
```

Reviewed By: hx235

Differential Revision: D54220339

Pulled By: jaykorean

fbshipit-source-id: 5a9054f31933d1996adca02082eb37b6d5353224
jaykorean authored and facebook-github-bot committed Mar 1, 2024
1 parent 4aed229 commit 5bcc184
Showing 4 changed files with 74 additions and 39 deletions.
28 changes: 14 additions & 14 deletions db/compaction/compaction_service_job.cc
@@ -74,24 +74,24 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->StartV2(info, compaction_input_binary);
switch (compaction_status) {
CompactionServiceScheduleResponse response =
db_options_.compaction_service->Schedule(info, compaction_input_binary);
switch (response.status) {
case CompactionServiceJobStatus::kSuccess:
break;
case CompactionServiceJobStatus::kFailure:
sub_compact->status = Status::Incomplete(
"CompactionService failed to start compaction job.");
"CompactionService failed to schedule a remote compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
return response.status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API Start.",
"[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
return response.status;
default:
assert(false); // unknown status
break;
@@ -101,14 +101,15 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"[%s] [JOB %d] Waiting for remote compaction...",
compaction_input.column_family.name.c_str(), job_id_);
std::string compaction_result_binary;
compaction_status = db_options_.compaction_service->WaitForCompleteV2(
info, &compaction_result_binary);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->Wait(response.scheduled_job_id,
&compaction_result_binary);

if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API "
"WaitForComplete.",
compaction_input.column_family.name.c_str(), job_id_);
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
}

@@ -830,4 +831,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
}
#endif // NDEBUG
} // namespace ROCKSDB_NAMESPACE

55 changes: 30 additions & 25 deletions db/compaction/compaction_service_test.cc
@@ -31,40 +31,44 @@ class MyTestCompactionService : public CompactionService {

const char* Name() const override { return kClassName(); }

CompactionServiceJobStatus StartV2(
CompactionServiceScheduleResponse Schedule(
const CompactionServiceJobInfo& info,
const std::string& compaction_service_input) override {
InstrumentedMutexLock l(&mutex_);
start_info_ = info;
assert(info.db_name == db_path_);
jobs_.emplace(info.job_id, compaction_service_input);
CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
if (is_override_start_status_) {
return override_start_status_;
}
return s;
}

CompactionServiceJobStatus WaitForCompleteV2(
const CompactionServiceJobInfo& info,
std::string* compaction_service_result) override {
std::string unique_id = Env::Default()->GenerateUniqueId();
jobs_.emplace(unique_id, compaction_service_input);
infos_.emplace(unique_id, info);
CompactionServiceScheduleResponse response(
unique_id, is_override_start_status_
? override_start_status_
: CompactionServiceJobStatus::kSuccess);
return response;
}

CompactionServiceJobStatus Wait(const std::string& scheduled_job_id,
std::string* result) override {
std::string compaction_input;
assert(info.db_name == db_path_);
{
InstrumentedMutexLock l(&mutex_);
wait_info_ = info;
auto i = jobs_.find(info.job_id);
if (i == jobs_.end()) {
auto job_index = jobs_.find(scheduled_job_id);
if (job_index == jobs_.end()) {
return CompactionServiceJobStatus::kFailure;
}
compaction_input = std::move(i->second);
jobs_.erase(i);
}
compaction_input = std::move(job_index->second);
jobs_.erase(job_index);

auto info_index = infos_.find(scheduled_job_id);
if (info_index == infos_.end()) {
return CompactionServiceJobStatus::kFailure;
}
wait_info_ = std::move(info_index->second);
infos_.erase(info_index);
}
if (is_override_wait_status_) {
return override_wait_status_;
}

CompactionServiceOptionsOverride options_override;
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
@@ -90,11 +94,11 @@ class MyTestCompactionService : public CompactionService {
OpenAndCompactOptions options;
options.canceled = &canceled_;

Status s = DB::OpenAndCompact(
options, db_path_, db_path_ + "/" + std::to_string(info.job_id),
compaction_input, compaction_service_result, options_override);
Status s =
DB::OpenAndCompact(options, db_path_, db_path_ + "/" + scheduled_job_id,
compaction_input, result, options_override);
if (is_override_wait_result_) {
*compaction_service_result = override_wait_result_;
*result = override_wait_result_;
}
compaction_num_.fetch_add(1);
if (s.ok()) {
@@ -135,7 +139,8 @@ class MyTestCompactionService : public CompactionService {
private:
InstrumentedMutex mutex_;
std::atomic_int compaction_num_{0};
std::map<uint64_t, std::string> jobs_;
std::map<std::string, std::string> jobs_;
std::map<std::string, CompactionServiceJobInfo> infos_;
const std::string db_path_;
Options options_;
std::shared_ptr<Statistics> statistics_;
29 changes: 29 additions & 0 deletions include/rocksdb/options.h
@@ -428,6 +428,17 @@ struct CompactionServiceJobInfo {
priority(priority_) {}
};

struct CompactionServiceScheduleResponse {
std::string scheduled_job_id; // Generated outside of primary host, unique
// across different DBs and sessions
CompactionServiceJobStatus status;
CompactionServiceScheduleResponse(std::string scheduled_job_id_,
CompactionServiceJobStatus status_)
: scheduled_job_id(scheduled_job_id_), status(status_) {}
explicit CompactionServiceScheduleResponse(CompactionServiceJobStatus status_)
: status(status_) {}
};

// Exceptions MUST NOT propagate out of overridden functions into RocksDB,
// because RocksDB is not exception-safe. This could cause undefined behavior
// including data loss, unreported corruption, deadlocks, and more.
@@ -438,6 +449,24 @@ class CompactionService : public Customizable {
// Returns the name of this compaction service.
const char* Name() const override = 0;

// Schedule compaction to be processed remotely.
virtual CompactionServiceScheduleResponse Schedule(
const CompactionServiceJobInfo& /*info*/,
const std::string& /*compaction_service_input*/) {
CompactionServiceScheduleResponse response(
CompactionServiceJobStatus::kUseLocal);
return response;
}

// Wait for the scheduled compaction to finish from the remote worker
virtual CompactionServiceJobStatus Wait(
const std::string& /*scheduled_job_id*/, std::string* /*result*/) {
return CompactionServiceJobStatus::kUseLocal;
}

// Deprecated. Please implement Schedule() and Wait() API to handle remote
// compaction

// Start the remote compaction with `compaction_service_input`, which can be
// passed to `DB::OpenAndCompact()` on the remote side. `info` provides the
// information the user might want to know, which includes `job_id`.
@@ -0,0 +1 @@
Deprecate the experimental Remote Compaction APIs StartV2() and WaitForCompleteV2(), and introduce Schedule() and Wait(). The new APIs do essentially the same thing as the old ones, but Wait() takes an externally generated unique ID to wait for the remote compaction to complete.
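
To make the Schedule()/Wait() handshake concrete, here is a minimal self-contained sketch. It does not use the RocksDB headers: `JobStatus`, `ScheduleResponse`, and `InMemoryCompactionService` are simplified, hypothetical stand-ins for `CompactionServiceJobStatus`, `CompactionServiceScheduleResponse`, and a `CompactionService` subclass, and the injected `id_source` is an assumption standing in for whatever generates the external ID. Unlike the real `Schedule()`, this version omits the `CompactionServiceJobInfo` argument.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Simplified stand-ins for CompactionServiceJobStatus and
// CompactionServiceScheduleResponse; not the real RocksDB definitions.
enum class JobStatus { kSuccess, kFailure, kUseLocal };

struct ScheduleResponse {
  std::string scheduled_job_id;
  JobStatus status;
};

// Hypothetical in-memory service. `id_source` stands in for whatever
// generates the external ID (for example, a compute offload client).
class InMemoryCompactionService {
 public:
  explicit InMemoryCompactionService(std::function<std::string()> id_source)
      : id_source_(std::move(id_source)) {}

  // Registers the input under a freshly generated ID and returns that ID.
  ScheduleResponse Schedule(const std::string& input) {
    std::lock_guard<std::mutex> lock(mutex_);
    std::string id = id_source_();
    bool inserted = jobs_.emplace(id, input).second;
    return {id, inserted ? JobStatus::kSuccess : JobStatus::kFailure};
  }

  // Looks the job up by the scheduled ID alone; job_id, db_id, and
  // db_session_id are not needed at this point.
  JobStatus Wait(const std::string& scheduled_job_id, std::string* result) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = jobs_.find(scheduled_job_id);
    if (it == jobs_.end()) {
      return JobStatus::kFailure;
    }
    *result = "compacted:" + it->second;  // placeholder for real remote work
    jobs_.erase(it);
    return JobStatus::kSuccess;
  }

 private:
  std::mutex mutex_;
  std::function<std::string()> id_source_;
  std::map<std::string, std::string> jobs_;
};
```

The caller keeps only the `scheduled_job_id` from the response and passes it back to `Wait()`, which mirrors how the updated `compaction_service_job.cc` passes `response.scheduled_job_id` to `Wait()`.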
