From 5bcc184975a53c8648836b43556501e648e0a6ae Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Fri, 1 Mar 2024 09:55:30 -0800 Subject: [PATCH] Update APIs to support generic unique identifier format (#12384) Summary: The current design proposes using a combination of `job_id`, `db_id`, and `db_session_id` to create a unique identifier for remote compaction jobs. However, this approach may not be suitable for users who prefer a different format for the unique identifier. At Meta, we are utilizing generic compute offload to offload compaction tasks to remote workers. The compute offload client generates a UUID for each task, which requires an update to the current RocksDB API for onboarding purposes. Users still have the option to create the unique identifier by combining `job_id`, `db_id`, and `db_session_id` if they prefer. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12384 Test Plan: ``` $> ./compaction_service_test 13:29:35 [==========] Running 14 tests from 1 test case. [----------] Global test environment set-up. 
[----------] 14 tests from CompactionServiceTest [ RUN ] CompactionServiceTest.BasicCompactions [ OK ] CompactionServiceTest.BasicCompactions (2642 ms) [ RUN ] CompactionServiceTest.ManualCompaction [ OK ] CompactionServiceTest.ManualCompaction (454 ms) [ RUN ] CompactionServiceTest.CancelCompactionOnRemoteSide [ OK ] CompactionServiceTest.CancelCompactionOnRemoteSide (1643 ms) [ RUN ] CompactionServiceTest.FailedToStart [ OK ] CompactionServiceTest.FailedToStart (1332 ms) [ RUN ] CompactionServiceTest.InvalidResult [ OK ] CompactionServiceTest.InvalidResult (1516 ms) [ RUN ] CompactionServiceTest.SubCompaction [ OK ] CompactionServiceTest.SubCompaction (551 ms) [ RUN ] CompactionServiceTest.CompactionFilter [ OK ] CompactionServiceTest.CompactionFilter (563 ms) [ RUN ] CompactionServiceTest.Snapshot [ OK ] CompactionServiceTest.Snapshot (124 ms) [ RUN ] CompactionServiceTest.ConcurrentCompaction [ OK ] CompactionServiceTest.ConcurrentCompaction (660 ms) [ RUN ] CompactionServiceTest.CompactionInfo [ OK ] CompactionServiceTest.CompactionInfo (984 ms) [ RUN ] CompactionServiceTest.FallbackLocalAuto [ OK ] CompactionServiceTest.FallbackLocalAuto (343 ms) [ RUN ] CompactionServiceTest.FallbackLocalManual [ OK ] CompactionServiceTest.FallbackLocalManual (380 ms) [ RUN ] CompactionServiceTest.RemoteEventListener [ OK ] CompactionServiceTest.RemoteEventListener (491 ms) [ RUN ] CompactionServiceTest.TablePropertiesCollector [ OK ] CompactionServiceTest.TablePropertiesCollector (169 ms) [----------] 14 tests from CompactionServiceTest (11854 ms total) [----------] Global test environment tear-down [==========] 14 tests from 1 test case ran. (11855 ms total) [ PASSED ] 14 tests. 
``` Reviewed By: hx235 Differential Revision: D54220339 Pulled By: jaykorean fbshipit-source-id: 5a9054f31933d1996adca02082eb37b6d5353224 --- db/compaction/compaction_service_job.cc | 28 +++++----- db/compaction/compaction_service_test.cc | 55 ++++++++++--------- include/rocksdb/options.h | 29 ++++++++++ .../new_remote_compaction_api.md | 1 + 4 files changed, 74 insertions(+), 39 deletions(-) create mode 100644 unreleased_history/public_api_changes/new_remote_compaction_api.md diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 442eaf8ea72..2411c27aac3 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -74,24 +74,24 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( compaction_input.output_level, input_files_oss.str().c_str()); CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, GetCompactionId(sub_compact), thread_pri_); - CompactionServiceJobStatus compaction_status = - db_options_.compaction_service->StartV2(info, compaction_input_binary); - switch (compaction_status) { + CompactionServiceScheduleResponse response = + db_options_.compaction_service->Schedule(info, compaction_input_binary); + switch (response.status) { case CompactionServiceJobStatus::kSuccess: break; case CompactionServiceJobStatus::kFailure: sub_compact->status = Status::Incomplete( - "CompactionService failed to start compaction job."); + "CompactionService failed to schedule a remote compaction job."); ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Remote compaction failed to start.", compaction_input.column_family.name.c_str(), job_id_); - return compaction_status; + return response.status; case CompactionServiceJobStatus::kUseLocal: ROCKS_LOG_INFO( db_options_.info_log, - "[%s] [JOB %d] Remote compaction fallback to local by API Start.", + "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)", compaction_input.column_family.name.c_str(), 
job_id_); - return compaction_status; + return response.status; default: assert(false); // unknown status break; @@ -101,14 +101,15 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( "[%s] [JOB %d] Waiting for remote compaction...", compaction_input.column_family.name.c_str(), job_id_); std::string compaction_result_binary; - compaction_status = db_options_.compaction_service->WaitForCompleteV2( - info, &compaction_result_binary); + CompactionServiceJobStatus compaction_status = + db_options_.compaction_service->Wait(response.scheduled_job_id, + &compaction_result_binary); if (compaction_status == CompactionServiceJobStatus::kUseLocal) { - ROCKS_LOG_INFO(db_options_.info_log, - "[%s] [JOB %d] Remote compaction fallback to local by API " - "WaitForComplete.", - compaction_input.column_family.name.c_str(), job_id_); + ROCKS_LOG_INFO( + db_options_.info_log, + "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)", + compaction_input.column_family.name.c_str(), job_id_); return compaction_status; } @@ -830,4 +831,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other, } #endif // NDEBUG } // namespace ROCKSDB_NAMESPACE - diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 7c87f88d1be..3fd6ad83bca 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -31,40 +31,44 @@ class MyTestCompactionService : public CompactionService { const char* Name() const override { return kClassName(); } - CompactionServiceJobStatus StartV2( + CompactionServiceScheduleResponse Schedule( const CompactionServiceJobInfo& info, const std::string& compaction_service_input) override { InstrumentedMutexLock l(&mutex_); start_info_ = info; assert(info.db_name == db_path_); - jobs_.emplace(info.job_id, compaction_service_input); - CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess; - if (is_override_start_status_) { - return 
override_start_status_; - } - return s; - } - - CompactionServiceJobStatus WaitForCompleteV2( - const CompactionServiceJobInfo& info, - std::string* compaction_service_result) override { + std::string unique_id = Env::Default()->GenerateUniqueId(); + jobs_.emplace(unique_id, compaction_service_input); + infos_.emplace(unique_id, info); + CompactionServiceScheduleResponse response( + unique_id, is_override_start_status_ + ? override_start_status_ + : CompactionServiceJobStatus::kSuccess); + return response; + } + + CompactionServiceJobStatus Wait(const std::string& scheduled_job_id, + std::string* result) override { std::string compaction_input; - assert(info.db_name == db_path_); { InstrumentedMutexLock l(&mutex_); - wait_info_ = info; - auto i = jobs_.find(info.job_id); - if (i == jobs_.end()) { + auto job_index = jobs_.find(scheduled_job_id); + if (job_index == jobs_.end()) { return CompactionServiceJobStatus::kFailure; } - compaction_input = std::move(i->second); - jobs_.erase(i); - } + compaction_input = std::move(job_index->second); + jobs_.erase(job_index); + auto info_index = infos_.find(scheduled_job_id); + if (info_index == infos_.end()) { + return CompactionServiceJobStatus::kFailure; + } + wait_info_ = std::move(info_index->second); + infos_.erase(info_index); + } if (is_override_wait_status_) { return override_wait_status_; } - CompactionServiceOptionsOverride options_override; options_override.env = options_.env; options_override.file_checksum_gen_factory = @@ -90,11 +94,11 @@ class MyTestCompactionService : public CompactionService { OpenAndCompactOptions options; options.canceled = &canceled_; - Status s = DB::OpenAndCompact( - options, db_path_, db_path_ + "/" + std::to_string(info.job_id), - compaction_input, compaction_service_result, options_override); + Status s = + DB::OpenAndCompact(options, db_path_, db_path_ + "/" + scheduled_job_id, + compaction_input, result, options_override); if (is_override_wait_result_) { - *compaction_service_result = 
override_wait_result_; + *result = override_wait_result_; } compaction_num_.fetch_add(1); if (s.ok()) { @@ -135,7 +139,8 @@ class MyTestCompactionService : public CompactionService { private: InstrumentedMutex mutex_; std::atomic_int compaction_num_{0}; - std::map jobs_; + std::map jobs_; + std::map infos_; const std::string db_path_; Options options_; std::shared_ptr statistics_; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 8932075de0a..5a8d8a9eef5 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -428,6 +428,17 @@ struct CompactionServiceJobInfo { priority(priority_) {} }; +struct CompactionServiceScheduleResponse { + std::string scheduled_job_id; // Generated outside of primary host, unique + // across different DBs and sessions + CompactionServiceJobStatus status; + CompactionServiceScheduleResponse(std::string scheduled_job_id_, + CompactionServiceJobStatus status_) + : scheduled_job_id(scheduled_job_id_), status(status_) {} + explicit CompactionServiceScheduleResponse(CompactionServiceJobStatus status_) + : status(status_) {} +}; + // Exceptions MUST NOT propagate out of overridden functions into RocksDB, // because RocksDB is not exception-safe. This could cause undefined behavior // including data loss, unreported corruption, deadlocks, and more. @@ -438,6 +449,24 @@ class CompactionService : public Customizable { // Returns the name of this compaction service. const char* Name() const override = 0; + // Schedule compaction to be processed remotely. 
+ virtual CompactionServiceScheduleResponse Schedule( + const CompactionServiceJobInfo& /*info*/, + const std::string& /*compaction_service_input*/) { + CompactionServiceScheduleResponse response( + CompactionServiceJobStatus::kUseLocal); + return response; + } + + // Wait for the scheduled compaction to finish from the remote worker + virtual CompactionServiceJobStatus Wait( + const std::string& /*scheduled_job_id*/, std::string* /*result*/) { + return CompactionServiceJobStatus::kUseLocal; + } + + // Deprecated. Please implement Schedule() and Wait() API to handle remote + // compaction + // Start the remote compaction with `compaction_service_input`, which can be // passed to `DB::OpenAndCompact()` on the remote side. `info` provides the // information the user might want to know, which includes `job_id`. diff --git a/unreleased_history/public_api_changes/new_remote_compaction_api.md b/unreleased_history/public_api_changes/new_remote_compaction_api.md new file mode 100644 index 00000000000..eac106c9de0 --- /dev/null +++ b/unreleased_history/public_api_changes/new_remote_compaction_api.md @@ -0,0 +1 @@ +Deprecate experimental Remote Compaction APIs - StartV2() and WaitForCompleteV2() and introduce Schedule() and Wait(). The new APIs essentially do the same thing as the old APIs. They allow taking an externally generated unique id to wait for remote compaction to complete.