diff --git a/mooncake-integration/store/store_py.cpp b/mooncake-integration/store/store_py.cpp index 0c48fd3e1..9faf207b7 100644 --- a/mooncake-integration/store/store_py.cpp +++ b/mooncake-integration/store/store_py.cpp @@ -10,6 +10,7 @@ #include #include "types.h" +#include "utils.h" namespace py = pybind11; diff --git a/mooncake-store/include/client.h b/mooncake-store/include/client.h index 350332c45..10f6fdd99 100644 --- a/mooncake-store/include/client.h +++ b/mooncake-store/include/client.h @@ -19,6 +19,7 @@ namespace mooncake { class PutOperation; +struct PutOp; /** * @brief Client for interacting with the mooncake distributed object store @@ -247,16 +248,14 @@ class Client { /** * @brief Batch put helper methods for structured approach */ - std::vector CreatePutOperations( + std::vector makeOps( const std::vector& keys, const std::vector>& batched_slices); - void StartBatchPut(std::vector& ops, - const ReplicateConfig& config); - void SubmitTransfers(std::vector& ops); - void WaitForTransfers(std::vector& ops); - void FinalizeBatchPut(std::vector& ops); - std::vector> CollectResults( - const std::vector& ops); + void stageStart(std::vector& ops, const ReplicateConfig& config); + void stageTransfer(std::vector& ops); + void stageEnd(std::vector& ops); + std::vector> collect( + const std::vector& ops); // Core components TransferEngine transfer_engine_; @@ -286,4 +285,4 @@ class Client { UUID client_id_; }; -} // namespace mooncake \ No newline at end of file +} // namespace mooncake diff --git a/mooncake-store/include/client_batch_put.h b/mooncake-store/include/client_batch_put.h new file mode 100644 index 000000000..ffcba6c0c --- /dev/null +++ b/mooncake-store/include/client_batch_put.h @@ -0,0 +1,34 @@ +#pragma once + +#include + +#include +#include +#include + +#include "transfer_task.h" +#include "types.h" + +namespace mooncake { + +class Client; + +// Calculate total size of all slices in the collection +[[nodiscard]] size_t CalculateSliceSize(const std::vector& slices); +[[nodiscard]] size_t CalculateSliceSize(std::span slices); + +// Represents a single put operation in a batch +struct PutOp { + explicit PutOp(std::string key, std::vector slices) + : key(std::move(key)), slices(std::move(slices)) {} + + std::string key; // Object key to store + std::vector slices; // Data slices to be stored + std::vector + replicas; // Replica locations (filled by stage-1, put start) + std::vector> + replica_futures; // Transfer futures for each replica + tl::expected result; // Final operation result +}; + +} // namespace mooncake diff --git a/mooncake-store/include/master_client.h b/mooncake-store/include/master_client.h index d50740f74..6f44c0503 100644 --- a/mooncake-store/include/master_client.h +++ b/mooncake-store/include/master_client.h @@ -101,7 +101,8 @@ class MasterClient { * @param key Object key * @return tl::expected indicating success/failure */ - [[nodiscard]] tl::expected PutEnd(const std::string& key); + [[nodiscard]] tl::expected PutEnd( + const std::string& key, const std::vector& put_results); /** * @brief Ends a put operation for a batch of objects @@ -109,23 +110,8 @@ class MasterClient { * @return ErrorCode indicating success/failure */ [[nodiscard]] std::vector> BatchPutEnd( - const std::vector& keys); - - /** - * @brief Revokes a put operation - * @param key Object key - * @return tl::expected indicating success/failure - */ - [[nodiscard]] tl::expected PutRevoke( - const std::string& key); - - /** - * @brief Revokes a put operation for a batch of objects - * @param keys Vector of object keys - * @return ErrorCode indicating success/failure - */ - [[nodiscard]] std::vector> BatchPutRevoke( - const std::vector& keys); + const std::vector& keys, + const std::vector>& put_results); /** * @brief Removes an object and all its replicas diff --git a/mooncake-store/include/master_metric_manager.h b/mooncake-store/include/master_metric_manager.h index e35b359c5..ae2db1599 100644 --- a/mooncake-store/include/master_metric_manager.h +++ b/mooncake-store/include/master_metric_manager.h @@ -47,8 +47,6 @@ class MasterMetricManager { void inc_put_start_failures(int64_t val = 1); void inc_put_end_requests(int64_t val = 1); void inc_put_end_failures(int64_t val = 1); - void inc_put_revoke_requests(int64_t val = 1); - void inc_put_revoke_failures(int64_t val = 1); void inc_get_replica_list_requests(int64_t val = 1); void inc_get_replica_list_failures(int64_t val = 1); void inc_exist_key_requests(int64_t val = 1); @@ -75,17 +73,12 @@ class MasterMetricManager { void inc_batch_put_start_failures(int64_t val = 1); void inc_batch_put_end_requests(int64_t val = 1); void inc_batch_put_end_failures(int64_t val = 1); - void inc_batch_put_revoke_requests(int64_t val = 1); - void inc_batch_put_revoke_failures(int64_t val = 1); - // Operation Statistics Getters int64_t get_put_start_requests(); int64_t get_put_start_failures(); int64_t get_put_end_requests(); int64_t get_put_end_failures(); - int64_t get_put_revoke_requests(); - int64_t get_put_revoke_failures(); int64_t get_get_replica_list_requests(); int64_t get_get_replica_list_failures(); int64_t get_exist_key_requests(); @@ -112,12 +105,10 @@ class MasterMetricManager { int64_t get_batch_put_start_failures(); int64_t get_batch_put_end_requests(); int64_t get_batch_put_end_failures(); - int64_t get_batch_put_revoke_requests(); - int64_t get_batch_put_revoke_failures(); // Eviction Metrics void inc_eviction_success(int64_t key_count, int64_t size); - void inc_eviction_fail(); // not a single object is evicted + void inc_eviction_fail(); // not a single object is evicted // Eviction Metrics Getters int64_t get_eviction_success(); @@ -165,8 +156,6 @@ class MasterMetricManager { ylt::metric::counter_t put_start_failures_; ylt::metric::counter_t put_end_requests_; ylt::metric::counter_t put_end_failures_; - ylt::metric::counter_t put_revoke_requests_; - ylt::metric::counter_t put_revoke_failures_; ylt::metric::counter_t get_replica_list_requests_; ylt::metric::counter_t get_replica_list_failures_; ylt::metric::counter_t exist_key_requests_; @@ -193,8 +182,6 @@ class MasterMetricManager { ylt::metric::counter_t batch_put_start_failures_; ylt::metric::counter_t batch_put_end_requests_; ylt::metric::counter_t batch_put_end_failures_; - ylt::metric::counter_t batch_put_revoke_requests_; - ylt::metric::counter_t batch_put_revoke_failures_; // Eviction Metrics ylt::metric::counter_t eviction_success_; diff --git a/mooncake-store/include/master_service.h b/mooncake-store/include/master_service.h index dd2cfc2d5..98527014f 100644 --- a/mooncake-store/include/master_service.h +++ b/mooncake-store/include/master_service.h @@ -26,6 +26,7 @@ namespace mooncake { // Forward declarations class AllocationStrategy; class EvictionStrategy; +enum class PutResult : uint8_t; // Structure to store garbage collection tasks struct GCTask { @@ -178,34 +179,13 @@ class MasterService { -> tl::expected, ErrorCode>; /** - * @brief Complete a put operation + * @brief Complete a put operation of a replica * @return ErrorCode::OK on success, ErrorCode::OBJECT_NOT_FOUND if not * found, ErrorCode::INVALID_WRITE if replica status is invalid */ - auto PutEnd(const std::string& key) -> tl::expected; - - /** - * @brief Revoke a put operation - * @return ErrorCode::OK on success, ErrorCode::OBJECT_NOT_FOUND if not - * found, ErrorCode::INVALID_WRITE if replica status is invalid - */ - auto PutRevoke(const std::string& key) -> tl::expected; - - /** - * @brief Complete a batch of put operations - * @return ErrorCode::OK on success, ErrorCode::OBJECT_NOT_FOUND if not - * found, ErrorCode::INVALID_WRITE if replica status is invalid - */ - std::vector> BatchPutEnd( - const std::vector& keys); - - /** - * @brief Revoke a batch of put operations - * @return ErrorCode::OK on success, ErrorCode::OBJECT_NOT_FOUND if not - * found, ErrorCode::INVALID_WRITE if replica status is invalid - */ - std::vector> BatchPutRevoke( - const std::vector& keys); + auto PutEnd(const std::string& key, + const std::vector& put_success) + -> tl::expected; /** * @brief Remove an object and its replicas @@ -442,6 +422,7 @@ class MasterService { void ClientMonitorFunc(); std::thread client_monitor_thread_; std::atomic client_monitor_running_{false}; + std::atomic_uint64_t replica_id_allocator_{0}; static constexpr uint64_t kClientMonitorSleepMs = 1000; // 1000 ms sleep between client monitor checks // boost lockfree queue requires trivial assignment operator diff --git a/mooncake-store/include/rpc_service.h b/mooncake-store/include/rpc_service.h index 0fb19b462..cb3a8ff1e 100644 --- a/mooncake-store/include/rpc_service.h +++ b/mooncake-store/include/rpc_service.h @@ -3,7 +3,6 @@ #include #include -#include #include #include #include @@ -12,16 +11,17 @@ #include #include -#include "master_metric_manager.h" #include "master_service.h" -#include "rpc_helper.h" #include "types.h" -#include "utils/scoped_vlog_timer.h" namespace mooncake { constexpr uint64_t kMetricReportIntervalSeconds = 10; +// We use this enum instead because std::vector can't be serialized +// properly over RPC. +enum class PutResult : uint8_t { SUCCESS = 0, FAILED = 1 }; + class WrappedMasterService { public: WrappedMasterService( @@ -36,527 +36,69 @@ class WrappedMasterService { ViewVersionId view_version = 0, int64_t client_live_ttl_sec = DEFAULT_CLIENT_LIVE_TTL_SEC, bool enable_ha = false, - const std::string& cluster_id = DEFAULT_CLUSTER_ID) - : master_service_(enable_gc, default_kv_lease_ttl, - default_kv_soft_pin_ttl, - allow_evict_soft_pinned_objects, eviction_ratio, - eviction_high_watermark_ratio, view_version, - client_live_ttl_sec, enable_ha, cluster_id), - http_server_(4, http_port), - metric_report_running_(enable_metric_reporting) { - // Initialize HTTP server for metrics - init_http_server(); - - // Set the config for metric reporting - MasterMetricManager::instance().set_enable_ha(enable_ha); - - // Start metric reporting thread if enabled - if (enable_metric_reporting) { - metric_report_thread_ = std::thread([this]() { - while (metric_report_running_) { - std::string metrics_summary = - MasterMetricManager::instance().get_summary_string(); - LOG(INFO) << "Master Metrics: " << metrics_summary; - std::this_thread::sleep_for( - std::chrono::seconds(kMetricReportIntervalSeconds)); - } - }); - } - } - - ~WrappedMasterService() { - metric_report_running_ = false; - if (metric_report_thread_.joinable()) { - metric_report_thread_.join(); - } - // Stop HTTP server - http_server_.stop(); - } - - // Initialize and start the HTTP server - void init_http_server() { - using namespace coro_http; - - // Endpoint for Prometheus metrics - http_server_.set_http_handler( - "/metrics", [](coro_http_request& req, coro_http_response& resp) { - std::string metrics = - MasterMetricManager::instance().serialize_metrics(); - resp.add_header("Content-Type", "text/plain; version=0.0.4"); - resp.set_status_and_content(status_type::ok, metrics); - }); - - // Endpoint for human-readable metrics summary - http_server_.set_http_handler( - "/metrics/summary", - [](coro_http_request& req, coro_http_response& resp) { - std::string summary = - MasterMetricManager::instance().get_summary_string(); - resp.add_header("Content-Type", "text/plain; version=0.0.4"); - resp.set_status_and_content(status_type::ok, summary); - }); - - // Endpoint for query a key's location - http_server_.set_http_handler( - "/query_key", - [&](coro_http_request& req, coro_http_response& resp) { - auto key = req.get_query_value("key"); - auto get_result = GetReplicaList(std::string(key)); - resp.add_header("Content-Type", "text/plain; version=0.0.4"); - if (get_result) { - std::string ss = ""; - for (size_t i = 0; i < get_result.value().size(); i++) { - if (get_result.value()[i].is_memory_replica()) { - auto& memory_descriptors = - get_result.value()[i].get_memory_descriptor(); - for (const auto& handle : - memory_descriptors.buffer_descriptors) { - std::string tmp = ""; - struct_json::to_json(handle, tmp); - ss += tmp; - ss += "\n"; - } - } - } - resp.set_status_and_content(status_type::ok, ss); - } else { - resp.set_status_and_content(status_type::not_found, - toString(get_result.error())); - } - }); - - // Endpoint for query all keys - http_server_.set_http_handler( - "/get_all_keys", - [&](coro_http_request& req, coro_http_response& resp) { - resp.add_header("Content-Type", "text/plain; version=0.0.4"); - - auto result = master_service_.GetAllKeys(); - if (result) { - std::string ss = ""; - auto keys = result.value(); - for (const auto& key : keys) { - ss += key; - ss += "\n"; - } - resp.set_status_and_content(status_type::ok, ss); - } else { - resp.set_status_and_content( - status_type::internal_server_error, - "Failed to get all keys"); - } - }); - - // Endpoint for query all segments - http_server_.set_http_handler( - "/get_all_segments", - [&](coro_http_request& req, coro_http_response& resp) { - resp.add_header("Content-Type", "text/plain; version=0.0.4"); - auto result = master_service_.GetAllSegments(); - if (result) { - std::string ss = ""; - auto segments = result.value(); - for (const auto& segment_name : segments) { - ss += segment_name; - ss += "\n"; - } - resp.set_status_and_content(status_type::ok, ss); - } else { - resp.set_status_and_content( - status_type::internal_server_error, - "Failed to get all segments"); - } - }); + const std::string& cluster_id = DEFAULT_CLUSTER_ID); - // Endpoint for query segment details - http_server_.set_http_handler( - "/query_segment", - [&](coro_http_request& req, coro_http_response& resp) { - auto segment = req.get_query_value("segment"); - resp.add_header("Content-Type", "text/plain; version=0.0.4"); - auto result = - master_service_.QuerySegments(std::string(segment)); + ~WrappedMasterService(); - if (result) { - std::string ss = ""; - auto [used, capacity] = result.value(); - ss += segment; - ss += "\n"; - ss += "Used(bytes): "; - ss += std::to_string(used); - ss += "\nCapacity(bytes) : "; - ss += std::to_string(capacity); - ss += "\n"; - resp.set_status_and_content(status_type::ok, ss); - } else { - resp.set_status_and_content( - status_type::internal_server_error, - "Failed to query segment"); - } - }); + void init_http_server(); - // Health check endpoint - http_server_.set_http_handler( - "/health", [](coro_http_request& req, coro_http_response& resp) { - resp.add_header("Content-Type", "text/plain; version=0.0.4"); - resp.set_status_and_content(status_type::ok, "OK"); - }); - - // Start the HTTP server asynchronously - http_server_.async_start(); - LOG(INFO) << "HTTP metrics server started on port " - << http_server_.port(); - } - - tl::expected ExistKey(const std::string& key) { - return execute_rpc( - "ExistKey", [&] { return master_service_.ExistKey(key); }, - [&](auto& timer) { timer.LogRequest("key=", key); }, - [] { MasterMetricManager::instance().inc_exist_key_requests(); }, - [] { MasterMetricManager::instance().inc_exist_key_failures(); }); - } + tl::expected ExistKey(const std::string& key); std::vector> BatchExistKey( - const std::vector& keys) { - ScopedVLogTimer timer(1, "BatchExistKey"); - timer.LogRequest("keys_count=", keys.size()); - MasterMetricManager::instance().inc_batch_exist_key_requests(); - - auto result = master_service_.BatchExistKey(keys); - - // Count failures and log errors - size_t failure_count = 0; - for (size_t i = 0; i < result.size(); ++i) { - if (!result[i].has_value()) { - failure_count++; - LOG(ERROR) << "BatchExistKey failed for key[" << i << "] '" - << keys[i] << "': " << toString(result[i].error()); - } - } - MasterMetricManager::instance().inc_batch_exist_key_failures( - failure_count); - - timer.LogResponse("total=", result.size(), - ", success=", result.size() - failure_count, - ", failures=", failure_count); - return result; - } + const std::vector& keys); tl::expected, ErrorCode> GetReplicaList( - const std::string& key) { - return execute_rpc( - "GetReplicaList", - [&] { return master_service_.GetReplicaList(key); }, - [&](auto& timer) { timer.LogRequest("key=", key); }, - [] { - MasterMetricManager::instance().inc_get_replica_list_requests(); - }, - [] { - MasterMetricManager::instance().inc_get_replica_list_failures(); - }); - } + const std::string& key); std::vector, ErrorCode>> - BatchGetReplicaList(const std::vector& keys) { - ScopedVLogTimer timer(1, "BatchGetReplicaList"); - timer.LogRequest("keys_count=", keys.size()); - MasterMetricManager::instance().inc_batch_get_replica_list_requests(); - - std::vector, ErrorCode>> - results; - results.reserve(keys.size()); - - for (const auto& key : keys) { - results.emplace_back(master_service_.GetReplicaList(key)); - } - - // Count failures and log errors - size_t failure_count = 0; - for (size_t i = 0; i < results.size(); ++i) { - if (!results[i].has_value()) { - failure_count++; - LOG(ERROR) << "BatchGetReplicaList failed for key[" << i - << "] '" << keys[i] - << "': " << toString(results[i].error()); - } - } - MasterMetricManager::instance().inc_batch_get_replica_list_failures( - failure_count); - - timer.LogResponse("total=", results.size(), - ", success=", results.size() - failure_count, - ", failures=", failure_count); - return results; - } + BatchGetReplicaList(const std::vector& keys); tl::expected, ErrorCode> PutStart( const std::string& key, const std::vector& slice_lengths, - const ReplicateConfig& config) { - return execute_rpc( - "PutStart", - [&] { - return master_service_.PutStart(key, slice_lengths, config); - }, - [&](auto& timer) { - timer.LogRequest("key=", key, - ", slice_lengths=", slice_lengths.size()); - }, - [&] { MasterMetricManager::instance().inc_put_start_requests(); }, - [] { MasterMetricManager::instance().inc_put_start_failures(); }); - } + const ReplicateConfig& config); - tl::expected PutEnd(const std::string& key) { - return execute_rpc( - "PutEnd", [&] { return master_service_.PutEnd(key); }, - [&](auto& timer) { timer.LogRequest("key=", key); }, - [] { MasterMetricManager::instance().inc_put_end_requests(); }, - [] { MasterMetricManager::instance().inc_put_end_failures(); }); - } + tl::expected PutEnd( + const std::string& key, const std::vector& put_success); - tl::expected PutRevoke(const std::string& key) { - return execute_rpc( - "PutRevoke", [&] { return master_service_.PutRevoke(key); }, - [&](auto& timer) { timer.LogRequest("key=", key); }, - [] { MasterMetricManager::instance().inc_put_revoke_requests(); }, - [] { MasterMetricManager::instance().inc_put_revoke_failures(); }); - } + std::vector> BatchPutEnd( + const std::vector& keys, + const std::vector>& put_success); std::vector, ErrorCode>> BatchPutStart(const std::vector& keys, const std::vector>& slice_lengths, - const ReplicateConfig& config) { - ScopedVLogTimer timer(1, "BatchPutStart"); - timer.LogRequest("keys_count=", keys.size()); - MasterMetricManager::instance().inc_batch_put_start_requests(); + const ReplicateConfig& config); - std::vector, ErrorCode>> - results; - results.reserve(keys.size()); + tl::expected Remove(const std::string& key); - for (size_t i = 0; i < keys.size(); ++i) { - results.emplace_back( - master_service_.PutStart(keys[i], slice_lengths[i], config)); - } - - // Count failures and log errors - size_t failure_count = 0; - for (size_t i = 0; i < results.size(); ++i) { - if (!results[i].has_value()) { - failure_count++; - LOG(ERROR) << "BatchPutStart failed for key[" << i << "] '" - << keys[i] << "': " << toString(results[i].error()); - } - } - MasterMetricManager::instance().inc_batch_put_start_failures( - failure_count); - - timer.LogResponse("total=", results.size(), - ", success=", results.size() - failure_count, - ", failures=", failure_count); - return results; - } - - std::vector> BatchPutEnd( - const std::vector& keys) { - ScopedVLogTimer timer(1, "BatchPutEnd"); - timer.LogRequest("keys_count=", keys.size()); - MasterMetricManager::instance().inc_batch_put_end_requests(); - - std::vector> results; - results.reserve(keys.size()); - - for (const auto& key : keys) { - results.emplace_back(master_service_.PutEnd(key)); - } - - // Count failures and log errors - size_t failure_count = 0; - for (size_t i = 0; i < results.size(); ++i) { - if (!results[i].has_value()) { - failure_count++; - LOG(ERROR) << "BatchPutEnd failed for key[" << i << "] '" - << keys[i] << "': " << toString(results[i].error()); - } - } - MasterMetricManager::instance().inc_batch_put_end_failures( - failure_count); - - timer.LogResponse("total=", results.size(), - ", success=", results.size() - failure_count, - ", failures=", failure_count); - return results; - } - - std::vector> BatchPutRevoke( - const std::vector& keys) { - ScopedVLogTimer timer(1, "BatchPutRevoke"); - timer.LogRequest("keys_count=", keys.size()); - MasterMetricManager::instance().inc_batch_put_revoke_requests(); - - std::vector> results; - results.reserve(keys.size()); - - for (const auto& key : keys) { - results.emplace_back(master_service_.PutRevoke(key)); - } - - // Count failures and log errors - size_t failure_count = 0; - for (size_t i = 0; i < results.size(); ++i) { - if (!results[i].has_value()) { - failure_count++; - LOG(ERROR) << "BatchPutRevoke failed for key[" << i << "] '" - << keys[i] << "': " << toString(results[i].error()); - } - } - MasterMetricManager::instance().inc_batch_put_revoke_failures( - failure_count); - - timer.LogResponse("total=", results.size(), - ", success=", results.size() - failure_count, - ", failures=", failure_count); - return results; - } - - tl::expected Remove(const std::string& key) { - return execute_rpc( - "Remove", [&] { return master_service_.Remove(key); }, - [&](auto& timer) { timer.LogRequest("key=", key); }, - [] { MasterMetricManager::instance().inc_remove_requests(); }, - [] { MasterMetricManager::instance().inc_remove_failures(); }); - } - - long RemoveAll() { - ScopedVLogTimer timer(1, "RemoveAll"); - timer.LogRequest("action=remove_all_objects"); - MasterMetricManager::instance().inc_remove_all_requests(); - long result = master_service_.RemoveAll(); - timer.LogResponse("items_removed=", result); - return result; - } + long RemoveAll(); tl::expected MountSegment(const Segment& segment, - const UUID& client_id) { - return execute_rpc( - "MountSegment", - [&] { return master_service_.MountSegment(segment, client_id); }, - [&](auto& timer) { - timer.LogRequest("base=", segment.base, ", size=", segment.size, - ", segment_name=", segment.name, - ", id=", segment.id); - }, - [] { - MasterMetricManager::instance().inc_mount_segment_requests(); - }, - [] { - MasterMetricManager::instance().inc_mount_segment_failures(); - }); - } + const UUID& client_id); tl::expected ReMountSegment( - const std::vector& segments, const UUID& client_id) { - return execute_rpc( - "ReMountSegment", - [&] { return master_service_.ReMountSegment(segments, client_id); }, - [&](auto& timer) { - timer.LogRequest("segments_count=", segments.size(), - ", client_id=", client_id); - }, - [] { - MasterMetricManager::instance().inc_remount_segment_requests(); - }, - [] { - MasterMetricManager::instance().inc_remount_segment_failures(); - }); - } + const std::vector& segments, const UUID& client_id); tl::expected UnmountSegment(const UUID& segment_id, - const UUID& client_id) { - return execute_rpc( - "UnmountSegment", - [&] { - return master_service_.UnmountSegment(segment_id, client_id); - }, - [&](auto& timer) { - timer.LogRequest("segment_id=", segment_id, - ", client_id=", client_id); - }, - [] { - MasterMetricManager::instance().inc_unmount_segment_requests(); - }, - [] { - MasterMetricManager::instance().inc_unmount_segment_failures(); - }); - } - - tl::expected GetFsdir() { - ScopedVLogTimer timer(1, "GetFsdir"); - timer.LogRequest("action=get_fsdir"); + const UUID& client_id); - auto result = master_service_.GetFsdir(); - - timer.LogResponseExpected(result); - return result; - } + tl::expected GetFsdir(); tl::expected, ErrorCode> Ping( - const UUID& client_id) { - ScopedVLogTimer timer(1, "Ping"); - timer.LogRequest("client_id=", client_id); - - MasterMetricManager::instance().inc_ping_requests(); - - auto result = master_service_.Ping(client_id); - - timer.LogResponseExpected(result); - return result; - } + const UUID& client_id); private: + friend void RegisterRpcService( + coro_rpc::coro_rpc_server& server, + mooncake::WrappedMasterService& wrapped_master_service); + MasterService master_service_; std::thread metric_report_thread_; coro_http::coro_http_server http_server_; std::atomic metric_report_running_; }; -inline void RegisterRpcService( - coro_rpc::coro_rpc_server& server, - mooncake::WrappedMasterService& wrapped_master_service) { - server.register_handler<&mooncake::WrappedMasterService::ExistKey>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::GetReplicaList>( - &wrapped_master_service); - server - .register_handler<&mooncake::WrappedMasterService::BatchGetReplicaList>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::PutStart>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::PutEnd>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::PutRevoke>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::BatchPutStart>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::BatchPutEnd>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::BatchPutRevoke>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::Remove>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::RemoveAll>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::MountSegment>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::ReMountSegment>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::UnmountSegment>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::Ping>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::GetFsdir>( - &wrapped_master_service); - server.register_handler<&mooncake::WrappedMasterService::BatchExistKey>( - &wrapped_master_service); -} +void RegisterRpcService(coro_rpc::coro_rpc_server& server, + mooncake::WrappedMasterService& wrapped_master_service); } // namespace mooncake diff --git a/mooncake-store/include/types.h b/mooncake-store/include/types.h index 67d8ce91c..3b3990c68 100644 --- a/mooncake-store/include/types.h +++ b/mooncake-store/include/types.h @@ -307,20 +307,23 @@ class Replica { public: struct Descriptor; - Replica() = default; - Replica(std::vector> buffers, - ReplicaStatus status) - : buffers_(std::move(buffers)), status_(status) {} - - void reset() noexcept { - buffers_.clear(); - status_ = ReplicaStatus::UNDEFINED; - } + Replica() = delete; + Replica(std::vector> buffers) + : buffers_(std::move(buffers)), status_(ReplicaStatus::PROCESSING) {} + Replica(const Replica&) = delete; + Replica& operator=(const Replica&) = delete; + + Replica(Replica&&) noexcept = default; + Replica& operator=(Replica&&) noexcept = default; [[nodiscard]] Descriptor get_descriptor() const; [[nodiscard]] ReplicaStatus status() const { return status_; } + [[nodiscard]] bool completed() const { + return status_ == ReplicaStatus::COMPLETE; + } + [[nodiscard]] bool has_invalid_handle() const { return std::any_of(buffers_.begin(), buffers_.end(), [](const std::unique_ptr& buf_ptr) { diff --git a/mooncake-store/src/CMakeLists.txt b/mooncake-store/src/CMakeLists.txt index 06179e08c..80d31ab32 100644 --- a/mooncake-store/src/CMakeLists.txt +++ b/mooncake-store/src/CMakeLists.txt @@ -5,6 +5,7 @@ set(MOONCAKE_STORE_SOURCES allocator.cpp master_service.cpp client.cpp + client_batch_put.cpp types.cpp master_client.cpp utils.cpp @@ -16,8 +17,7 @@ set(MOONCAKE_STORE_SOURCES ha_helper.cpp segment.cpp transfer_task.cpp - etcd_helper.cpp - ha_helper.cpp + rpc_service.cpp ) # The cache_allocator library diff --git a/mooncake-store/src/client.cpp b/mooncake-store/src/client.cpp index 93769c901..2ace2b309 100644 --- a/mooncake-store/src/client.cpp +++ b/mooncake-store/src/client.cpp @@ -9,6 +9,7 @@ #include #include +#include "client_batch_put.h" #include "transfer_engine.h" #include "transfer_task.h" #include "transport/transport.h" @@ -481,405 +482,48 @@ tl::expected Client::Put(const ObjectKey& key, return tl::unexpected(err); } - // Transfer data using allocated handles from all replicas + // Transfer data using allocated handles from all replicas and collect + // results + std::vector put_results; + put_results.reserve(start_result.value().size()); + bool any_transfer_failed = false; + + ErrorCode first_transfer_error = ErrorCode::OK; + for (const auto& replica : start_result.value()) { ErrorCode transfer_err = TransferWrite(replica, slices); if (transfer_err != ErrorCode::OK) { - // Revoke put operation - auto revoke_result = master_client_.PutRevoke(key); - if (!revoke_result) { - LOG(ERROR) << "Failed to revoke put operation"; - return tl::unexpected(revoke_result.error()); + put_results.emplace_back(PutResult::FAILED); + any_transfer_failed = true; + if (first_transfer_error == ErrorCode::OK) { + first_transfer_error = transfer_err; } - return tl::unexpected(transfer_err); + LOG(ERROR) << "Failed to transfer data to replica: " + << transfer_err; + } else { + put_results.emplace_back(PutResult::SUCCESS); } } - // End put operation - auto end_result = master_client_.PutEnd(key); + // End put operation with results for all replicas + auto end_result = master_client_.PutEnd(key, put_results); if (!end_result) { ErrorCode err = end_result.error(); LOG(ERROR) << "Failed to end put operation: " << err; return tl::unexpected(err); } + // If any transfer failed, return the first error + if (any_transfer_failed) { + return tl::unexpected(first_transfer_error); + } + // Store to local file if storage backend is available PutToLocalFile(key, slices); return {}; } -// TODO: `client.cpp` is too long, consider split it into multiple files -enum class PutOperationState { - PENDING, - MASTER_FAILED, - TRANSFER_FAILED, - FINALIZE_FAILED, - SUCCESS -}; - -class PutOperation { - public: - PutOperation(std::string_view k, const std::vector& s) - : key(k), slices(s) { - value_length = CalculateSliceSize(slices); - // Initialize with a pending error state to ensure result is always set - result = tl::unexpected(ErrorCode::INTERNAL_ERROR); - } - - std::string key; - std::vector slices; - size_t value_length; - - // Enhanced state tracking - PutOperationState state = PutOperationState::PENDING; - tl::expected result; - std::vector replicas; - std::vector pending_transfers; - - // Error context for debugging - std::optional failure_context; - - // Helper methods for robust state management - void SetSuccess() { - state = PutOperationState::SUCCESS; - result = {}; - failure_context.reset(); - } - - void SetError(ErrorCode error, const std::string& context = "") { - result = tl::unexpected(error); - if (!context.empty()) { - failure_context = toString(error) + ": " + context + "; " + - failure_context.value_or(""); - } - - // Update state based on current processing stage - if (replicas.empty()) { - state = PutOperationState::MASTER_FAILED; - } else if (pending_transfers.empty()) { - state = PutOperationState::TRANSFER_FAILED; - } else { - state = PutOperationState::FINALIZE_FAILED; - } - LOG(WARNING) << "Put operation failed for key " << key << ", context: " - << failure_context.value_or("unknown error"); - } - - bool IsResolved() const { return state != PutOperationState::PENDING; } - - bool IsSuccessful() const { - return state == PutOperationState::SUCCESS && result.has_value(); - } -}; - -std::vector Client::CreatePutOperations( - const std::vector& keys, - const std::vector>& batched_slices) { - std::vector ops; - ops.reserve(keys.size()); - for (size_t i = 0; i < keys.size(); ++i) { - ops.emplace_back(keys[i], batched_slices[i]); - } - return ops; -} - -void Client::StartBatchPut(std::vector& ops, - const ReplicateConfig& config) { - std::vector keys; - std::vector> slice_lengths; - - keys.reserve(ops.size()); - slice_lengths.reserve(ops.size()); - - for (const auto& op : ops) { - keys.emplace_back(op.key); - - std::vector slice_sizes; - slice_sizes.reserve(op.slices.size()); - for (const auto& slice : op.slices) { - slice_sizes.emplace_back(slice.size); - } - slice_lengths.emplace_back(std::move(slice_sizes)); - } - - auto start_responses = - master_client_.BatchPutStart(keys, slice_lengths, config); - - // Ensure response size matches request size - if (start_responses.size() != ops.size()) { - LOG(ERROR) << "BatchPutStart response size mismatch: expected " - << ops.size() << ", got " << start_responses.size(); - for (auto& op : ops) { - op.SetError(ErrorCode::RPC_FAIL, - "BatchPutStart response size mismatch"); - } - return; - } - - // Process individual responses with robust error handling - for (size_t i = 0; i < ops.size(); ++i) { - if (!start_responses[i]) { - ops[i].SetError(start_responses[i].error(), - "Master failed to start put operation"); - } else { - ops[i].replicas = start_responses[i].value(); - // Operation continues to next stage - result remains INTERNAL_ERROR - // until fully successful - VLOG(1) << "Successfully started put for key " << ops[i].key - << " with " << ops[i].replicas.size() << " replicas"; - } - } -} - -void Client::SubmitTransfers(std::vector& ops) { - CHECK(transfer_submitter_) << "TransferSubmitter not initialized"; - - for (auto& op : ops) { - // Skip operations that already failed in previous stages - if (op.IsResolved()) { - continue; - } - - // Skip operations that don't have replicas (failed in StartBatchPut) - if (op.replicas.empty()) { - op.SetError(ErrorCode::INTERNAL_ERROR, - "No replicas available for transfer"); - continue; - } - - bool all_transfers_submitted = true; - std::string failure_context; - - for (size_t replica_idx = 0; replica_idx < op.replicas.size(); - ++replica_idx) { - const auto& replica = op.replicas[replica_idx]; - - auto submit_result = transfer_submitter_->submit( - replica, op.slices, TransferRequest::WRITE); - - if (!submit_result) { - failure_context = "Failed to submit transfer for replica " + - std::to_string(replica_idx); - all_transfers_submitted = false; - break; - } - - op.pending_transfers.emplace_back(std::move(submit_result.value())); - } - - if (!all_transfers_submitted) { - LOG(ERROR) << "Transfer submission failed for key " << op.key - << ": " << failure_context; - op.SetError(ErrorCode::TRANSFER_FAIL, failure_context); - op.pending_transfers.clear(); - } else { - VLOG(1) << "Successfully submitted " << op.pending_transfers.size() - << " transfers for key " << op.key; - } - } -} - -void Client::WaitForTransfers(std::vector& ops) { - for (auto& op : ops) { - // Skip operations that already failed or completed - if (op.IsResolved()) { - continue; - } - - // Skip operations with no pending transfers (failed in SubmitTransfers) - if (op.pending_transfers.empty()) { - op.SetError(ErrorCode::INTERNAL_ERROR, - "No pending transfers to wait for"); - continue; - } - - bool all_transfers_succeeded = true; - ErrorCode first_error = ErrorCode::OK; - size_t failed_transfer_idx = 0; - - for (size_t i = 0; i < op.pending_transfers.size(); ++i) { - ErrorCode transfer_result = op.pending_transfers[i].get(); - if (transfer_result != ErrorCode::OK) { - if (all_transfers_succeeded) { - // Record the first error for reporting - first_error = transfer_result; - failed_transfer_idx = i; - all_transfers_succeeded = false; - } - // Continue waiting for all transfers to avoid resource leaks - } - } - - if (all_transfers_succeeded) { - VLOG(1) << "All transfers completed successfully for key " - << op.key; - // Transfer phase successful - continue to finalization - // Note: Don't mark as SUCCESS yet, need to complete finalization - } else { - std::string error_context = - "Transfer " + std::to_string(failed_transfer_idx) + " failed"; - LOG(ERROR) << "Transfer failed for key " << op.key << ": " - << toString(first_error) << " (" << error_context << ")"; - op.SetError(first_error, error_context); - } - } -} - -void Client::FinalizeBatchPut(std::vector& ops) { - // For each operation, - // If transfers completed successfully, we need to call BatchPutEnd - // If the operation failed but has allocated replicas, we need to call - // BatchPutRevoke - - std::vector successful_keys; - std::vector successful_indices; - std::vector failed_keys; - std::vector failed_indices; - - // Reserve space to avoid reallocations - successful_keys.reserve(ops.size()); - successful_indices.reserve(ops.size()); - failed_keys.reserve(ops.size()); - failed_indices.reserve(ops.size()); - - for (size_t i = 0; i < ops.size(); ++i) { - auto& op = ops[i]; - - // Check if operation completed transfers successfully and needs - // finalization - if (!op.IsResolved() && !op.replicas.empty() && - !op.pending_transfers.empty()) { - // Transfers completed, needs BatchPutEnd - successful_keys.emplace_back(op.key); - successful_indices.emplace_back(i); - } else if (op.state != PutOperationState::PENDING && - !op.replicas.empty()) { - // Operation failed but has allocated replicas, needs BatchPutRevoke - failed_keys.emplace_back(op.key); - failed_indices.emplace_back(i); - } - // Operations without replicas (early failures) don't need finalization - } - - // Process successful operations - if (!successful_keys.empty()) { - auto end_responses = master_client_.BatchPutEnd(successful_keys); - if (end_responses.size() != successful_keys.size()) { - LOG(ERROR) << "BatchPutEnd response size mismatch: expected " - << successful_keys.size() << ", got " - << end_responses.size(); - for (size_t idx : successful_indices) { - ops[idx].SetError(ErrorCode::RPC_FAIL, - "BatchPutEnd response size mismatch"); - } - } else { - // Process individual responses - for (size_t i = 0; i < end_responses.size(); ++i) { - const size_t op_idx = successful_indices[i]; - if (!end_responses[i]) { - LOG(ERROR) << "Failed to finalize put for key " - << successful_keys[i] << ": " - << toString(end_responses[i].error()); - ops[op_idx].SetError(end_responses[i].error(), - "BatchPutEnd failed"); - } else { - // Operation fully successful - ops[op_idx].SetSuccess(); - VLOG(1) << "Successfully completed put for key " - << successful_keys[i]; - } - } - } - } - - // Process failed operations that need cleanup - if (!failed_keys.empty()) { - auto revoke_responses = master_client_.BatchPutRevoke(failed_keys); - if (revoke_responses.size() != failed_keys.size()) { - LOG(ERROR) << "BatchPutRevoke response size mismatch: expected " - << failed_keys.size() << ", got " - << revoke_responses.size(); - // Mark all failed operations with revoke RPC failure - for (size_t idx : failed_indices) { - ops[idx].SetError(ErrorCode::RPC_FAIL, - "BatchPutRevoke response size mismatch"); - } - } else { - // Process individual revoke responses - for (size_t i = 0; i < revoke_responses.size(); ++i) { - const size_t op_idx = failed_indices[i]; - if (!revoke_responses[i]) { - LOG(ERROR) - << "Failed to revoke put for key " << failed_keys[i] - << ": " << toString(revoke_responses[i].error()); - // Preserve original error but note revoke failure in - // context - std::string original_context = - ops[op_idx].failure_context.value_or("unknown error"); - ops[op_idx].failure_context = - original_context + "; revoke also failed"; - } else { - LOG(INFO) << "Successfully revoked failed put for key " - << failed_keys[i]; - } - } - } - } - - // Ensure all operations have definitive results - for (auto& op : ops) { - if (!op.IsResolved()) { - op.SetError(ErrorCode::INTERNAL_ERROR, - "Operation not resolved after finalization"); - LOG(ERROR) << "Operation for key " << op.key - << " was not properly resolved"; - } - } -} - -std::vector> Client::CollectResults( - const std::vector& ops) { - std::vector> results; - results.reserve(ops.size()); - - for (const auto& op : ops) { - // With the new structure, result is always set (never nullopt) - results.emplace_back(op.result); - - // Additional validation and logging for debugging - if (!op.result.has_value()) { - // if error == object already exist, consider as ok - if (op.result.error() == ErrorCode::OBJECT_ALREADY_EXISTS) { - results.back() = {}; - continue; - } - LOG(ERROR) << "Operation for key " << op.key - << " failed: " << toString(op.result.error()) - << (op.failure_context - ? (" (" + *op.failure_context + ")") - : ""); - } else { - VLOG(1) << "Operation for key " << op.key - << " completed successfully"; - } - } - - return results; -} - -std::vector> Client::BatchPut( - const std::vector& keys, - std::vector>& batched_slices, - const ReplicateConfig& config) { - std::vector ops = CreatePutOperations(keys, batched_slices); - StartBatchPut(ops, config); - SubmitTransfers(ops); - WaitForTransfers(ops); - FinalizeBatchPut(ops); - return CollectResults(ops); -} - tl::expected Client::Remove(const ObjectKey& key) { auto result = master_client_.Remove(key); if (storage_backend_) { diff --git a/mooncake-store/src/client_batch_put.cpp b/mooncake-store/src/client_batch_put.cpp new file mode 100644 index 000000000..0cb00f4ee --- /dev/null +++ b/mooncake-store/src/client_batch_put.cpp @@ -0,0 +1,188 @@ +#include "client_batch_put.h" + +#include + +#include +#include +#include +#include + +#include "client.h" +#include "transfer_engine.h" +#include "transfer_task.h" +#include "types.h" + +namespace mooncake { + +// Create PutOp objects from keys and slices for batch processing +std::vector Client::makeOps( + const std::vector& keys, + const std::vector>& batched_slices) { + std::vector ops; + ops.reserve(keys.size()); + // Pair each key with its corresponding slices + for (size_t i = 0; i < keys.size(); ++i) { + ops.emplace_back(keys[i], batched_slices[i]); + } + return ops; +} + +// Stage 1: Initialize put operations and get replica assignments from master +void Client::stageStart(std::vector& ops, + const ReplicateConfig& config) { + std::vector keys; + std::vector> slice_lengths; + keys.reserve(ops.size()); + slice_lengths.reserve(ops.size()); + + // Extract keys and slice sizes from operations for master request + for (const auto& op : ops) { + std::vector slice_sizes; + slice_sizes.reserve(op.slices.size()); + for (const auto& slice : op.slices) { + slice_sizes.emplace_back(slice.size); + } + + keys.emplace_back(op.key); + slice_lengths.emplace_back(std::move(slice_sizes)); + } + + // Request replica assignments from master server + auto start_responses = + master_client_.BatchPutStart(keys, slice_lengths, config); + + // Validate response size matches request size + if (start_responses.size() != ops.size()) { + LOG(ERROR) << "BatchPutStart response size mismatch: expected " + << ops.size() << ", got " << start_responses.size(); + for (auto& op : ops) { + op.result = tl::unexpected(ErrorCode::RPC_FAIL); + } + return; + } + + // Process responses and assign replicas to operations + for (size_t i = 0; i < ops.size(); ++i) { + if (!start_responses[i]) { + ops[i].result = tl::unexpected(start_responses[i].error()); + } else { + ops[i].replicas = std::move(start_responses[i].value()); + } + } +} + +// Stage 2: Transfer data to replicas using the transfer engine +void Client::stageTransfer(std::vector& ops) { + CHECK(transfer_submitter_) << "TransferSubmitter not initialized"; + + // Filter operations that put start successfully + std::vector success_ops; + success_ops.reserve(ops.size()); + for (auto& op : ops) { + if (op.result.has_value()) { + success_ops.emplace_back(&op); + } + } + + // Submit transfer tasks for each replica of successful operations + for (auto& op : success_ops) { + op->replica_futures.reserve(op->replicas.size()); + for (const auto& replica : op->replicas) { + auto future = transfer_submitter_->submit(replica, op->slices, + TransferRequest::WRITE); + op->replica_futures.emplace_back( + future.has_value() ? tl::expected( + std::move(*future)) + : tl::unexpected(ErrorCode::TRANSFER_FAIL)); + } + } + + // Wait for all transfer tasks to complete and collect results + for (auto& op : success_ops) { + for (auto& future : op->replica_futures) { + if (!future.has_value()) { + continue; + } + ErrorCode result = future->get(); + if (result != ErrorCode::OK) { + future = tl::unexpected(result); + } + } + } +} + +// Stage 3: Finalize put operations by reporting results to master +void Client::stageEnd(std::vector& ops) { + std::vector keys; + std::vector> put_results; + + keys.reserve(ops.size()); + put_results.reserve(ops.size()); + + // Collect transfer results for each operation + for (const auto& op : ops) { + keys.emplace_back(op.key); + + std::vector op_results; + if (op.result.has_value()) { + // Operation was successful so far, check individual replica results + assert(op.replicas.size() == op.replica_futures.size()); + op_results.reserve(op.replica_futures.size()); + + for (const auto& future : op.replica_futures) { + op_results.emplace_back(future.has_value() ? PutResult::SUCCESS + : PutResult::FAILED); + } + } else { + // Operation failed, mark all replicas as failed + op_results.resize(op.replicas.size(), PutResult::FAILED); + } + put_results.emplace_back(std::move(op_results)); + } + + // Report results to master server for finalization + if (!keys.empty()) { + auto end_results = master_client_.BatchPutEnd(keys, put_results); + + // Update operation results based on server response + for (size_t i = 0; i < ops.size() && i < end_results.size(); ++i) { + if (!end_results[i].has_value()) { + ops[i].result = tl::unexpected(end_results[i].error()); + } + } + } +} + +// Collect final results from all operations, treating OBJECT_ALREADY_EXISTS as +// success +std::vector> Client::collect( + const std::vector& ops) { + std::vector> results; + results.reserve(ops.size()); + for (const auto& op : ops) { + // Treat OBJECT_ALREADY_EXISTS as a successful operation + if (!op.result && + op.result.error() == ErrorCode::OBJECT_ALREADY_EXISTS) { + results.emplace_back(); + } else { + results.emplace_back(op.result); + } + } + return results; +} + +// Main entry point for batch put operations - orchestrates the three-stage +// process +std::vector> Client::BatchPut( + const std::vector& keys, + std::vector>& batched_slices, + const ReplicateConfig& config) { + // Execute the three-stage batch put process + auto ops = makeOps(keys, batched_slices); // Create operation objects + stageStart(ops, config); // Stage 1: Get replica assignments + stageTransfer(ops); // Stage 2: Transfer data to replicas + stageEnd(ops); // Stage 3: Finalize with master + return collect(ops); // Collect and return final results +} + +} // namespace mooncake diff --git a/mooncake-store/src/local_file.cpp b/mooncake-store/src/local_file.cpp index 1a271626c..a4f55b440 100644 --- a/mooncake-store/src/local_file.cpp +++ b/mooncake-store/src/local_file.cpp @@ -1,19 +1,21 @@ -#include -#include -#include -#include +#include "local_file.h" + #include -#include #include +#include +#include -#include "local_file.h" +#include +#include +#include namespace mooncake { -LocalFile::LocalFile(const std::string& filename,FILE *file,ErrorCode ec) : filename_(filename),file_(file),error_code_(ec) { +LocalFile::LocalFile(const std::string &filename, FILE *file, ErrorCode ec) + : filename_(filename), file_(file), error_code_(ec) { if (!file_ || ferror(file_)) { - error_code_ = ErrorCode::FILE_INVALID_HANDLE; + error_code_ = ErrorCode::FILE_INVALID_HANDLE; } else if (ec != ErrorCode::OK) { - error_code_ = ec; + error_code_ = ec; } } @@ -31,12 +33,12 @@ LocalFile::~LocalFile() { } else { LOG(INFO) << "Deleted corrupted file: " << filename_; } - } + } } file_ = nullptr; } -ssize_t LocalFile::write(const std::string &buffer, size_t length){ +ssize_t LocalFile::write(const std::string &buffer, size_t length) { if (file_ == nullptr) { error_code_ = ErrorCode::FILE_NOT_FOUND; return -1; @@ -46,7 +48,7 @@ ssize_t LocalFile::write(const std::string &buffer, size_t length){ return -1; } - if(length > static_cast(std::numeric_limits::max())) { + if (length > static_cast(std::numeric_limits::max())) { error_code_ = ErrorCode::FILE_INVALID_BUFFER; return -1; } @@ -58,11 +60,11 @@ ssize_t LocalFile::write(const std::string &buffer, size_t length){ size_t remaining = length; size_t written_bytes = 0; - const char* ptr = buffer.data(); + const char *ptr = buffer.data(); while (remaining > 0) { size_t written = fwrite(ptr, 1, remaining, file_); - if (written == 0) break; + if (written == 0) break; remaining -= written; ptr += written; written_bytes += written; @@ -71,7 +73,7 @@ ssize_t LocalFile::write(const std::string &buffer, size_t length){ if (remaining > 0) { error_code_ = ErrorCode::FILE_WRITE_FAIL; return -1; - } + } if (release_lock() == -1) { error_code_ = ErrorCode::FILE_LOCK_FAIL; @@ -86,7 +88,7 @@ ssize_t LocalFile::write(const std::string &buffer, size_t length){ return written_bytes; } -ssize_t LocalFile::read(std::string &buffer, size_t length){ +ssize_t LocalFile::read(std::string &buffer, size_t length) { if (file_ == nullptr) { error_code_ = ErrorCode::FILE_NOT_FOUND; return -1; @@ -96,7 +98,7 @@ ssize_t LocalFile::read(std::string &buffer, size_t length){ return -1; } - if(length > static_cast(std::numeric_limits::max())) { + if (length > static_cast(std::numeric_limits::max())) { error_code_ = ErrorCode::FILE_INVALID_BUFFER; return -1; } @@ -120,17 +122,17 @@ ssize_t LocalFile::read(std::string &buffer, size_t length){ return -1; } - buffer.resize(read_bytes); // shrink to actual read size + buffer.resize(read_bytes); // shrink to actual read size return read_bytes; } -ssize_t LocalFile::pwritev(const iovec *iov, int iovcnt, off_t offset){ - if(!file_){ +ssize_t LocalFile::pwritev(const iovec *iov, int iovcnt, off_t offset) { + if (!file_) { error_code_ = ErrorCode::FILE_NOT_FOUND; return -1; } - int fd=fileno(file_); + int fd = fileno(file_); if (fd == -1) { error_code_ = ErrorCode::FILE_INVALID_HANDLE; @@ -162,14 +164,13 @@ ssize_t LocalFile::pwritev(const iovec *iov, int iovcnt, off_t offset){ return ret; } - -ssize_t LocalFile::preadv(const iovec *iov, int iovcnt, off_t offset){ - if(!file_){ +ssize_t LocalFile::preadv(const iovec *iov, int iovcnt, off_t offset) { + if (!file_) { error_code_ = ErrorCode::FILE_NOT_FOUND; return -1; } - int fd=fileno(file_); + int fd = fileno(file_); if (fd == -1) { error_code_ = ErrorCode::FILE_INVALID_HANDLE; @@ -196,7 +197,7 @@ ssize_t LocalFile::preadv(const iovec *iov, int iovcnt, off_t offset){ return ret; } -int LocalFile::acquire_write_lock(){ +int LocalFile::acquire_write_lock() { if (flock(fileno(file_), LOCK_EX) == -1) { return -1; } @@ -204,7 +205,7 @@ int LocalFile::acquire_write_lock(){ return 0; } -int LocalFile::acquire_read_lock(){ +int LocalFile::acquire_read_lock() { if (flock(fileno(file_), LOCK_SH) == -1) { return -1; } @@ -212,7 +213,7 @@ int LocalFile::acquire_read_lock(){ return 0; } -int LocalFile::release_lock(){ +int LocalFile::release_lock() { if (!is_locked_) return 0; if (flock(fileno(file_), LOCK_UN) == -1) { return -1; @@ -221,4 +222,4 @@ int LocalFile::release_lock(){ return 0; } -} \ No newline at end of file +} // namespace mooncake \ No newline at end of file diff --git a/mooncake-store/src/master_client.cpp b/mooncake-store/src/master_client.cpp index 338fe2e93..a8d07fe69 100644 --- a/mooncake-store/src/master_client.cpp +++ b/mooncake-store/src/master_client.cpp @@ -277,7 +277,8 @@ MasterClient::BatchPutStart( return result; } -tl::expected MasterClient::PutEnd(const std::string& key) { +tl::expected MasterClient::PutEnd( + const std::string& key, const std::vector& put_results) { ScopedVLogTimer timer(1, "MasterClient::PutEnd"); timer.LogRequest("key=", key); @@ -289,7 +290,7 @@ tl::expected MasterClient::PutEnd(const std::string& key) { } auto request_result = - client->send_request<&WrappedMasterService::PutEnd>(key); + client->send_request<&WrappedMasterService::PutEnd>(key, put_results); auto result = coro::syncAwait([&]() -> coro::Lazy> { auto result = co_await co_await request_result; @@ -305,75 +306,15 @@ tl::expected MasterClient::PutEnd(const std::string& key) { } std::vector> MasterClient::BatchPutEnd( - const std::vector& keys) { - ScopedVLogTimer timer(1, "MasterClient::BatchPutEnd"); - timer.LogRequest("keys_count=", keys.size()); - - auto client = client_accessor_.GetClient(); - if (!client) { - LOG(ERROR) << "Client not available"; - timer.LogResponse("error=Client not available"); - std::vector> error_results; - error_results.reserve(keys.size()); - for (size_t i = 0; i < keys.size(); ++i) { - error_results.emplace_back( - tl::make_unexpected(ErrorCode::RPC_FAIL)); - } - return error_results; - } - - auto request_result = - client->send_request<&WrappedMasterService::BatchPutEnd>(keys); - auto result = coro::syncAwait( - [&]() -> coro::Lazy>> { - auto result = co_await co_await request_result; - if (!result) { - LOG(ERROR) << "Failed to end batch put operation: " - << result.error().msg; - std::vector> error_results; - error_results.reserve(keys.size()); - for (size_t i = 0; i < keys.size(); ++i) { - error_results.emplace_back( - tl::make_unexpected(ErrorCode::RPC_FAIL)); - } - co_return error_results; - } - co_return result->result(); - }()); - timer.LogResponse("result=", result.size(), " operations"); - return result; -} - -tl::expected MasterClient::PutRevoke(const std::string& key) { - ScopedVLogTimer timer(1, "MasterClient::PutRevoke"); - timer.LogRequest("key=", key); - - auto client = client_accessor_.GetClient(); - if (!client) { - LOG(ERROR) << "Client not available"; - timer.LogResponse("error=Client not available"); - return tl::make_unexpected(ErrorCode::RPC_FAIL); + const std::vector& keys, + const std::vector>& put_results) { + if (keys.size() != put_results.size()) { + LOG(ERROR) << "BatchPutEnd failed: keys.size() != put_results.size()"; + return std::vector>( + keys.size(), tl::make_unexpected(ErrorCode::INVALID_PARAMS)); } - auto request_result = - client->send_request<&WrappedMasterService::PutRevoke>(key); - auto result = - coro::syncAwait([&]() -> coro::Lazy> { - auto result = co_await co_await request_result; - if (!result) { - LOG(ERROR) << "Failed to revoke put operation: " - << result.error().msg; - co_return tl::make_unexpected(ErrorCode::RPC_FAIL); - } - co_return result->result(); - }()); - timer.LogResponseExpected(result); - return result; -} - -std::vector> MasterClient::BatchPutRevoke( - const std::vector& keys) { - ScopedVLogTimer timer(1, "MasterClient::BatchPutRevoke"); + ScopedVLogTimer timer(1, "MasterClient::BatchPutEnd"); timer.LogRequest("keys_count=", keys.size()); auto client = client_accessor_.GetClient(); @@ -390,12 +331,13 @@ std::vector> MasterClient::BatchPutRevoke( } auto request_result = - client->send_request<&WrappedMasterService::BatchPutRevoke>(keys); + client->send_request<&WrappedMasterService::BatchPutEnd>(keys, + put_results); auto result = coro::syncAwait( [&]() -> coro::Lazy>> { auto result = co_await co_await request_result; if (!result) { - LOG(ERROR) << "Failed to revoke batch put operation: " + LOG(ERROR) << "Failed to end batch put operation: " << result.error().msg; std::vector> error_results; error_results.reserve(keys.size()); diff --git a/mooncake-store/src/master_metric_manager.cpp b/mooncake-store/src/master_metric_manager.cpp index a584cf2ef..e7e0b6d62 100644 --- a/mooncake-store/src/master_metric_manager.cpp +++ b/mooncake-store/src/master_metric_manager.cpp @@ -22,13 +22,13 @@ MasterMetricManager::MasterMetricManager() "Total capacity across all mounted segments"), key_count_("master_key_count", "Total number of keys managed by the master"), - soft_pin_key_count_("master_soft_pin_key_count", - "Total number of soft-pinned keys managed by the master"), + soft_pin_key_count_( + "master_soft_pin_key_count", + "Total number of soft-pinned keys managed by the master"), // Initialize Histogram (4KB, 64KB, 256KB, 1MB, 4MB, 16MB, 64MB) - value_size_distribution_("master_value_size_bytes", - "Distribution of object value sizes", - {4096, 65536, 262144, 1048576, 4194304, - 16777216, 67108864}), + value_size_distribution_( + "master_value_size_bytes", "Distribution of object value sizes", + {4096, 65536, 262144, 1048576, 4194304, 16777216, 67108864}), // Initialize cluster metrics active_clients_("master_active_clients", "Total number of active clients"), @@ -42,30 +42,24 @@ MasterMetricManager::MasterMetricManager() "Total number of PutEnd requests received"), put_end_failures_("master_put_end_failures_total", "Total number of failed PutEnd requests"), - put_revoke_requests_("master_put_revoke_requests_total", - "Total number of PutRevoke requests received"), - put_revoke_failures_("master_put_revoke_failures_total", - "Total number of failed PutRevoke requests"), get_replica_list_requests_( "master_get_replica_list_requests_total", "Total number of GetReplicaList requests received"), get_replica_list_failures_( "master_get_replica_list_failures_total", "Total number of failed GetReplicaList requests"), - exist_key_requests_( - "master_exist_key_requests_total", - "Total number of ExistKey requests received"), - exist_key_failures_( - "master_exist_key_failures_total", - "Total number of failed ExistKey requests"), + exist_key_requests_("master_exist_key_requests_total", + "Total number of ExistKey requests received"), + exist_key_failures_("master_exist_key_failures_total", + "Total number of failed ExistKey requests"), remove_requests_("master_remove_requests_total", "Total number of Remove requests received"), remove_failures_("master_remove_failures_total", "Total number of failed Remove requests"), remove_all_requests_("master_remove_all_requests_total", - "Total number of Remove all requests received"), + "Total number of Remove all requests received"), remove_all_failures_("master_remove_all_failures_total", - "Total number of failed Remove all requests"), + "Total number of failed Remove all requests"), mount_segment_requests_("master_mount_segment_requests_total", "Total number of MountSegment requests received"), @@ -89,34 +83,36 @@ MasterMetricManager::MasterMetricManager() "Total number of failed ping requests"), // Initialize Batch Request Counters - batch_exist_key_requests_("master_batch_exist_key_requests_total", - "Total number of BatchExistKey requests received"), - batch_exist_key_failures_("master_batch_exist_key_failures_total", - "Total number of failed BatchExistKey requests"), - batch_get_replica_list_requests_("master_batch_get_replica_list_requests_total", - "Total number of BatchGetReplicaList requests received"), - batch_get_replica_list_failures_("master_batch_get_replica_list_failures_total", - "Total number of failed BatchGetReplicaList requests"), - batch_put_start_requests_("master_batch_put_start_requests_total", - "Total number of BatchPutStart requests received"), - batch_put_start_failures_("master_batch_put_start_failures_total", - "Total number of failed BatchPutStart requests"), + batch_exist_key_requests_( + "master_batch_exist_key_requests_total", + "Total number of BatchExistKey requests received"), + batch_exist_key_failures_( + "master_batch_exist_key_failures_total", + "Total number of failed BatchExistKey requests"), + batch_get_replica_list_requests_( + "master_batch_get_replica_list_requests_total", + "Total number of BatchGetReplicaList requests received"), + batch_get_replica_list_failures_( + "master_batch_get_replica_list_failures_total", + "Total number of failed BatchGetReplicaList requests"), + batch_put_start_requests_( + "master_batch_put_start_requests_total", + "Total number of BatchPutStart requests received"), + batch_put_start_failures_( + "master_batch_put_start_failures_total", + "Total number of failed BatchPutStart requests"), batch_put_end_requests_("master_batch_put_end_requests_total", "Total number of BatchPutEnd requests received"), batch_put_end_failures_("master_batch_put_end_failures_total", "Total number of failed BatchPutEnd requests"), - batch_put_revoke_requests_("master_batch_put_revoke_requests_total", - "Total number of BatchPutRevoke requests received"), - batch_put_revoke_failures_("master_batch_put_revoke_failures_total", - "Total number of failed BatchPutRevoke requests"), // Initialize Eviction Counters eviction_success_("master_successful_evictions_total", - "Total number of successful eviction operations"), + "Total number of successful eviction operations"), eviction_attempts_("master_attempted_evictions_total", "Total number of attempted eviction operations"), evicted_key_count_("master_evicted_key_count", - "Total number of keys evicted"), + "Total number of keys evicted"), evicted_size_("master_evicted_size_bytes", "Total bytes of evicted objects") {} @@ -158,16 +154,18 @@ double MasterMetricManager::get_global_used_ratio(void) { void MasterMetricManager::inc_key_count(int64_t val) { key_count_.inc(val); } void MasterMetricManager::dec_key_count(int64_t val) { key_count_.dec(val); } -void MasterMetricManager::inc_soft_pin_key_count(int64_t val) { soft_pin_key_count_.inc(val); } -void MasterMetricManager::dec_soft_pin_key_count(int64_t val) { soft_pin_key_count_.dec(val); } +void MasterMetricManager::inc_soft_pin_key_count(int64_t val) { + soft_pin_key_count_.inc(val); +} +void MasterMetricManager::dec_soft_pin_key_count(int64_t val) { + soft_pin_key_count_.dec(val); +} void MasterMetricManager::observe_value_size(int64_t size) { value_size_distribution_.observe(size); } -int64_t MasterMetricManager::get_key_count() { - return key_count_.value(); -} +int64_t MasterMetricManager::get_key_count() { return key_count_.value(); } int64_t MasterMetricManager::get_soft_pin_key_count() { return soft_pin_key_count_.value(); @@ -205,12 +203,7 @@ void MasterMetricManager::inc_put_end_requests(int64_t val) { void MasterMetricManager::inc_put_end_failures(int64_t val) { put_end_failures_.inc(val); } -void MasterMetricManager::inc_put_revoke_requests(int64_t val) { - put_revoke_requests_.inc(val); -} -void MasterMetricManager::inc_put_revoke_failures(int64_t val) { - put_revoke_failures_.inc(val); -} + void MasterMetricManager::inc_get_replica_list_requests(int64_t val) { get_replica_list_requests_.inc(val); } @@ -279,12 +272,6 @@ void MasterMetricManager::inc_batch_put_end_requests(int64_t val) { void MasterMetricManager::inc_batch_put_end_failures(int64_t val) { batch_put_end_failures_.inc(val); } -void MasterMetricManager::inc_batch_put_revoke_requests(int64_t val) { - batch_put_revoke_requests_.inc(val); -} -void MasterMetricManager::inc_batch_put_revoke_failures(int64_t val) { - batch_put_revoke_failures_.inc(val); -} int64_t MasterMetricManager::get_put_start_requests() { return put_start_requests_.value(); @@ -302,14 +289,6 @@ int64_t MasterMetricManager::get_put_end_failures() { return put_end_failures_.value(); } -int64_t MasterMetricManager::get_put_revoke_requests() { - return put_revoke_requests_.value(); -} - -int64_t MasterMetricManager::get_put_revoke_failures() { - return put_revoke_failures_.value(); -} - int64_t MasterMetricManager::get_get_replica_list_requests() { return get_replica_list_requests_.value(); } @@ -406,25 +385,16 @@ int64_t MasterMetricManager::get_batch_put_end_failures() { return batch_put_end_failures_.value(); } -int64_t MasterMetricManager::get_batch_put_revoke_requests() { - return batch_put_revoke_requests_.value(); -} - -int64_t MasterMetricManager::get_batch_put_revoke_failures() { - return batch_put_revoke_failures_.value(); -} - // Eviction Metrics -void MasterMetricManager::inc_eviction_success(int64_t key_count, int64_t size) { +void MasterMetricManager::inc_eviction_success(int64_t key_count, + int64_t size) { evicted_key_count_.inc(key_count); evicted_size_.inc(size); eviction_success_.inc(); eviction_attempts_.inc(); } -void MasterMetricManager::inc_eviction_fail() { - eviction_attempts_.inc(); -} +void MasterMetricManager::inc_eviction_fail() { eviction_attempts_.inc(); } int64_t MasterMetricManager::get_eviction_success() { return eviction_success_.value(); @@ -480,8 +450,6 @@ std::string MasterMetricManager::serialize_metrics() { serialize_metric(put_start_failures_); serialize_metric(put_end_requests_); serialize_metric(put_end_failures_); - serialize_metric(put_revoke_requests_); - serialize_metric(put_revoke_failures_); serialize_metric(get_replica_list_requests_); serialize_metric(get_replica_list_failures_); serialize_metric(remove_requests_); @@ -508,8 +476,6 @@ std::string MasterMetricManager::serialize_metrics() { serialize_metric(batch_put_start_failures_); serialize_metric(batch_put_end_requests_); serialize_metric(batch_put_end_failures_); - serialize_metric(batch_put_revoke_requests_); - serialize_metric(batch_put_revoke_failures_); // Serialize Eviction Counters serialize_metric(eviction_success_); @@ -576,7 +542,7 @@ std::string MasterMetricManager::get_summary_string() { << format_bytes(capacity); if (capacity > 0) { ss << " (" << std::fixed << std::setprecision(1) - << ((double) allocated / (double)capacity * 100.0) << "%)"; + << ((double)allocated / (double)capacity * 100.0) << "%)"; } ss << " | Keys: " << keys << " (soft-pinned: " << soft_pin_keys << ")"; if (enable_ha_) { @@ -585,22 +551,24 @@ std::string MasterMetricManager::get_summary_string() { // Request summary - focus on the most important metrics ss << " | Requests (Success/Total): "; - ss << "Put=" - << put_starts - put_start_fails + put_ends - put_end_fails + ss << "Put=" << put_starts - put_start_fails + put_ends - put_end_fails << "/" << put_starts + put_ends << ", "; - ss << "Get=" << get_replicas - get_replica_fails << "/" << get_replicas << ", "; + ss << "Get=" << get_replicas - get_replica_fails << "/" << get_replicas + << ", "; ss << "Exist=" << exist_keys - exist_key_fails << "/" << exist_keys << ", "; ss << "Del=" << removes - remove_fails << "/" << removes << ", "; - ss << "DelAll=" << remove_all - remove_all_fails << "/" << remove_all << ", "; + ss << "DelAll=" << remove_all - remove_all_fails << "/" << remove_all + << ", "; if (enable_ha_) { ss << "Ping=" << ping - ping_fails << "/" << ping << ", "; } // Eviction summary ss << " | Eviction: " - << "Success/Attempts=" << eviction_success << "/" << eviction_attempts << ", " - << "keys=" << evicted_key_count << ", " - << "size=" << format_bytes(evicted_size); + << "Success/Attempts=" << eviction_success << "/" << eviction_attempts + << ", " + << "keys=" << evicted_key_count << ", " + << "size=" << format_bytes(evicted_size); return ss.str(); } diff --git a/mooncake-store/src/master_service.cpp b/mooncake-store/src/master_service.cpp index 806e1a3ec..02326ce17 100644 --- a/mooncake-store/src/master_service.cpp +++ b/mooncake-store/src/master_service.cpp @@ -7,6 +7,7 @@ #include #include "master_metric_manager.h" +#include "rpc_service.h" #include "types.h" namespace mooncake { @@ -160,23 +161,28 @@ auto MasterService::ReMountSegment(const std::vector& segments, } void MasterService::ClearInvalidHandles() { + // Drop replicas whose handles are no longer valid; since we do not have + // replica re-scheduling yet, an object is erased entirely once all of its + // replicas are gone. + + // TODO: We should have replica re-scheduling + + // TODO: Since we iterate all shards and all objects, this is a very + // expensive operation. We could do it parallelly. for (auto& shard : metadata_shards_) { MutexLocker lock(&shard.mutex); - auto it = shard.metadata.begin(); - while (it != shard.metadata.end()) { - // Check if the object has any invalid replicas - bool has_invalid = false; - for (auto& replica : it->second.replicas) { - if (replica.has_invalid_handle()) { - has_invalid = true; - break; - } - } - // Remove the object if it has no valid replicas - if (has_invalid || CleanupStaleHandles(it->second)) { - it = shard.metadata.erase(it); + auto obj_it = shard.metadata.begin(); + while (obj_it != shard.metadata.end()) { + auto& meta = obj_it->second; + auto& reps = meta.replicas; + + std::erase_if( + reps, [](const Replica& r) { return r.has_invalid_handle(); }); + + if (reps.empty()) { + obj_it = shard.metadata.erase(obj_it); } else { - ++it; + ++obj_it; } } } @@ -372,6 +378,7 @@ auto MasterService::PutStart(const std::string& key, segment_manager_.getAllocatorAccess(); auto& allocators = allocator_access.getAllocators(); auto& allocators_by_name = allocator_access.getAllocatorsByName(); + for (size_t i = 0; i < config.replica_num; ++i) { std::vector> handles; handles.reserve(slice_lengths.size()); @@ -399,9 +406,7 @@ auto MasterService::PutStart(const std::string& key, << ", action=slice_allocated"; handles.emplace_back(std::move(handle)); } - - replicas.emplace_back(std::move(handles), - ReplicaStatus::PROCESSING); + replicas.emplace_back(std::move(handles)); } } @@ -420,7 +425,8 @@ auto MasterService::PutStart(const std::string& key, return replica_list; } -auto MasterService::PutEnd(const std::string& key) +auto MasterService::PutEnd(const std::string& key, + const std::vector& put_success) -> tl::expected { MetadataAccessor accessor(this, key); if (!accessor.Exists()) { @@ -429,53 +435,41 @@ auto MasterService::PutEnd(const std::string& key) } auto& metadata = accessor.Get(); - for (auto& replica : metadata.replicas) { - replica.mark_complete(); - } - // 1. Set lease timeout to now, indicating that the object has no lease - // at beginning. 2. If this object has soft pin enabled, set it to be soft - // pinned. - metadata.GrantLease(0, default_kv_soft_pin_ttl_); - return {}; -} - -auto MasterService::PutRevoke(const std::string& key) - -> tl::expected { - MetadataAccessor accessor(this, key); - if (!accessor.Exists()) { - LOG(INFO) << "key=" << key << ", info=object_not_found"; - return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND); + if (put_success.size() != metadata.replicas.size()) { + LOG(ERROR) << "key=" << key << ", error=invalid_params"; + return tl::make_unexpected(ErrorCode::INVALID_PARAMS); } - auto& metadata = accessor.Get(); - if (auto status = metadata.HasDiffRepStatus(ReplicaStatus::PROCESSING)) { - LOG(ERROR) << "key=" << key << ", status=" << *status - << ", error=invalid_replica_status"; - return tl::make_unexpected(ErrorCode::INVALID_WRITE); + auto replica_it = metadata.replicas.begin(); + auto success_it = put_success.begin(); + while (replica_it != metadata.replicas.end() && + success_it != put_success.end()) { + if (*success_it == PutResult::SUCCESS) { + replica_it->mark_complete(); + } + ++replica_it; + ++success_it; } - accessor.Erase(); - return {}; -} + // remove failed replicas + metadata.replicas.erase( + std::remove_if( + metadata.replicas.begin(), metadata.replicas.end(), + [](const auto& replica) { return !replica.completed(); }), + metadata.replicas.end()); -std::vector> MasterService::BatchPutEnd( - const std::vector& keys) { - std::vector> results; - results.reserve(keys.size()); - for (const auto& key : keys) { - results.emplace_back(PutEnd(key)); + if (metadata.replicas.empty()) { + VLOG(1) << "key=" << key + << ", does not have any valid replicas, remove it"; + accessor.Erase(); + } else { + // 1. Set lease timeout to now, indicating that the object has no lease + // at beginning. 2. If this object has soft pin enabled, set it to be + // soft pinned. + metadata.GrantLease(0, default_kv_soft_pin_ttl_); } - return results; -} -std::vector> MasterService::BatchPutRevoke( - const std::vector& keys) { - std::vector> results; - results.reserve(keys.size()); - for (const auto& key : keys) { - results.emplace_back(PutRevoke(key)); - } - return results; + return {}; } auto MasterService::Remove(const std::string& key) diff --git a/mooncake-store/src/rpc_service.cpp b/mooncake-store/src/rpc_service.cpp new file mode 100644 index 000000000..b84f60836 --- /dev/null +++ b/mooncake-store/src/rpc_service.cpp @@ -0,0 +1,471 @@ +#include "rpc_service.h" + +#include + +#include "rpc_helper.h" + +namespace mooncake { + +WrappedMasterService::WrappedMasterService( + bool enable_gc, uint64_t default_kv_lease_ttl, + uint64_t default_kv_soft_pin_ttl, bool allow_evict_soft_pinned_objects, + bool enable_metric_reporting, uint16_t http_port, double eviction_ratio, + double eviction_high_watermark_ratio, ViewVersionId view_version, + int64_t client_live_ttl_sec, bool enable_ha, const std::string& cluster_id) + : master_service_(enable_gc, default_kv_lease_ttl, default_kv_soft_pin_ttl, + allow_evict_soft_pinned_objects, eviction_ratio, + eviction_high_watermark_ratio, view_version, + client_live_ttl_sec, enable_ha, cluster_id), + http_server_(4, http_port), + metric_report_running_(enable_metric_reporting) { + // Initialize HTTP server for metrics + init_http_server(); + + // Set the config for metric reporting + MasterMetricManager::instance().set_enable_ha(enable_ha); + + // Start metric reporting thread if enabled + if (enable_metric_reporting) { + metric_report_thread_ = std::thread([this]() { + while (metric_report_running_) { + std::string metrics_summary = + MasterMetricManager::instance().get_summary_string(); + LOG(INFO) << "Master Metrics: " << metrics_summary; + std::this_thread::sleep_for( + std::chrono::seconds(kMetricReportIntervalSeconds)); + } + }); + } +} + +WrappedMasterService::~WrappedMasterService() { + metric_report_running_ = false; + if (metric_report_thread_.joinable()) { + metric_report_thread_.join(); + } + // Stop HTTP server + http_server_.stop(); +} + +void WrappedMasterService::init_http_server() { + using namespace coro_http; + + // Endpoint for Prometheus metrics + http_server_.set_http_handler( + "/metrics", [](coro_http_request& req, coro_http_response& resp) { + std::string metrics = + MasterMetricManager::instance().serialize_metrics(); + resp.add_header("Content-Type", "text/plain; version=0.0.4"); + resp.set_status_and_content(status_type::ok, metrics); + }); + + // Endpoint for human-readable metrics summary + http_server_.set_http_handler( + "/metrics/summary", + [](coro_http_request& req, coro_http_response& resp) { + std::string summary = + MasterMetricManager::instance().get_summary_string(); + resp.add_header("Content-Type", "text/plain; version=0.0.4"); + resp.set_status_and_content(status_type::ok, summary); + }); + + // Endpoint for query a key's location + http_server_.set_http_handler( + "/query_key", [&](coro_http_request& req, coro_http_response& resp) { + auto key = req.get_query_value("key"); + auto get_result = GetReplicaList(std::string(key)); + resp.add_header("Content-Type", "text/plain; version=0.0.4"); + if (get_result) { + std::string ss = ""; + for (size_t i = 0; i < get_result.value().size(); i++) { + if (get_result.value()[i].is_memory_replica()) { + auto& memory_descriptors = + get_result.value()[i].get_memory_descriptor(); + for (const auto& handle : + memory_descriptors.buffer_descriptors) { + std::string tmp = ""; + struct_json::to_json(handle, tmp); + ss += tmp; + ss += "\n"; + } + } + } + resp.set_status_and_content(status_type::ok, ss); + } else { + resp.set_status_and_content(status_type::not_found, + toString(get_result.error())); + } + }); + + // Endpoint for query all keys + http_server_.set_http_handler( + "/get_all_keys", [&](coro_http_request& req, coro_http_response& resp) { + resp.add_header("Content-Type", "text/plain; version=0.0.4"); + + auto result = master_service_.GetAllKeys(); + if (result) { + std::string ss = ""; + auto keys = result.value(); + for (const auto& key : keys) { + ss += key; + ss += "\n"; + } + resp.set_status_and_content(status_type::ok, ss); + } else { + resp.set_status_and_content(status_type::internal_server_error, + "Failed to get all keys"); + } + }); + + // Endpoint for query all segments + http_server_.set_http_handler( + "/get_all_segments", + [&](coro_http_request& req, coro_http_response& resp) { + resp.add_header("Content-Type", "text/plain; version=0.0.4"); + auto result = master_service_.GetAllSegments(); + if (result) { + std::string ss = ""; + auto segments = result.value(); + for (const auto& segment_name : segments) { + ss += segment_name; + ss += "\n"; + } + resp.set_status_and_content(status_type::ok, ss); + } else { + resp.set_status_and_content(status_type::internal_server_error, + "Failed to get all segments"); + } + }); + + // Endpoint for query segment details + http_server_.set_http_handler( + "/query_segment", + [&](coro_http_request& req, coro_http_response& resp) { + auto segment = req.get_query_value("segment"); + resp.add_header("Content-Type", "text/plain; version=0.0.4"); + auto result = master_service_.QuerySegments(std::string(segment)); + + if (result) { + std::string ss = ""; + auto [used, capacity] = result.value(); + ss += segment; + ss += "\n"; + ss += "Used(bytes): "; + ss += std::to_string(used); + ss += "\nCapacity(bytes) : "; + ss += std::to_string(capacity); + ss += "\n"; + resp.set_status_and_content(status_type::ok, ss); + } else { + resp.set_status_and_content(status_type::internal_server_error, + "Failed to query segment"); + } + }); + + // Health check endpoint + http_server_.set_http_handler( + "/health", [](coro_http_request& req, coro_http_response& resp) { + resp.add_header("Content-Type", "text/plain; version=0.0.4"); + resp.set_status_and_content(status_type::ok, "OK"); + }); + + // Start the HTTP server asynchronously + http_server_.async_start(); + LOG(INFO) << "HTTP metrics server started on port " << http_server_.port(); +} + +tl::expected WrappedMasterService::ExistKey( + const std::string& key) { + return execute_rpc( + "ExistKey", [&] { return master_service_.ExistKey(key); }, + [&](auto& timer) { timer.LogRequest("key=", key); }, + [] { MasterMetricManager::instance().inc_exist_key_requests(); }, + [] { MasterMetricManager::instance().inc_exist_key_failures(); }); +} + +std::vector> WrappedMasterService::BatchExistKey( + const std::vector& keys) { + ScopedVLogTimer timer(1, "BatchExistKey"); + timer.LogRequest("keys_count=", keys.size()); + MasterMetricManager::instance().inc_batch_exist_key_requests(); + + auto result = master_service_.BatchExistKey(keys); + + // Count failures and log errors + size_t failure_count = 0; + for (size_t i = 0; i < result.size(); ++i) { + if (!result[i].has_value()) { + failure_count++; + LOG(ERROR) << "BatchExistKey failed for key[" << i << "] '" + << keys[i] << "': " << toString(result[i].error()); + } + } + MasterMetricManager::instance().inc_batch_exist_key_failures(failure_count); + + timer.LogResponse("total=", result.size(), + ", success=", result.size() - failure_count, + ", failures=", failure_count); + return result; +} + +tl::expected, ErrorCode> +WrappedMasterService::GetReplicaList(const std::string& key) { + return execute_rpc( + "GetReplicaList", [&] { return master_service_.GetReplicaList(key); }, + [&](auto& timer) { timer.LogRequest("key=", key); }, + [] { MasterMetricManager::instance().inc_get_replica_list_requests(); }, + [] { + MasterMetricManager::instance().inc_get_replica_list_failures(); + }); +} + +std::vector, ErrorCode>> +WrappedMasterService::BatchGetReplicaList( + const std::vector& keys) { + ScopedVLogTimer timer(1, "BatchGetReplicaList"); + timer.LogRequest("keys_count=", keys.size()); + MasterMetricManager::instance().inc_batch_get_replica_list_requests(); + + std::vector, ErrorCode>> + results; + results.reserve(keys.size()); + + for (const auto& key : keys) { + results.emplace_back(master_service_.GetReplicaList(key)); + } + + // Count failures and log errors + size_t failure_count = 0; + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i].has_value()) { + failure_count++; + LOG(ERROR) << "BatchGetReplicaList failed for key[" << i << "] '" + << keys[i] << "': " << toString(results[i].error()); + } + } + MasterMetricManager::instance().inc_batch_get_replica_list_failures( + failure_count); + + timer.LogResponse("total=", results.size(), + ", success=", results.size() - failure_count, + ", failures=", failure_count); + return results; +} + +tl::expected, ErrorCode> +WrappedMasterService::PutStart(const std::string& key, + const std::vector& slice_lengths, + const ReplicateConfig& config) { + return execute_rpc( + "PutStart", + [&] { return master_service_.PutStart(key, slice_lengths, config); }, + [&](auto& timer) { + timer.LogRequest("key=", key, + ", slice_lengths=", slice_lengths.size()); + }, + [&] { MasterMetricManager::instance().inc_put_start_requests(); }, + [] { MasterMetricManager::instance().inc_put_start_failures(); }); +} + +tl::expected WrappedMasterService::PutEnd( + const std::string& key, const std::vector& put_results) { + return execute_rpc( + "PutEnd", [&] { return master_service_.PutEnd(key, put_results); }, + [&](auto& timer) { + timer.LogRequest("key=", key, + ", put_results_size=", put_results.size()); + }, + [] { MasterMetricManager::instance().inc_put_end_requests(); }, + [] { MasterMetricManager::instance().inc_put_end_failures(); }); +} + +std::vector, ErrorCode>> +WrappedMasterService::BatchPutStart( + const std::vector& keys, + const std::vector>& slice_lengths, + const ReplicateConfig& config) { + ScopedVLogTimer timer(1, "BatchPutStart"); + timer.LogRequest("keys_count=", keys.size()); + MasterMetricManager::instance().inc_batch_put_start_requests(); + + std::vector, ErrorCode>> + results; + results.reserve(keys.size()); + + for (size_t i = 0; i < keys.size(); ++i) { + results.emplace_back( + master_service_.PutStart(keys[i], slice_lengths[i], config)); + } + + // Count failures and log errors + size_t failure_count = 0; + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i].has_value()) { + failure_count++; + LOG(ERROR) << "BatchPutStart failed for key[" << i << "] '" + << keys[i] << "': " << toString(results[i].error()); + } + } + MasterMetricManager::instance().inc_batch_put_start_failures(failure_count); + + timer.LogResponse("total=", results.size(), + ", success=", results.size() - failure_count, + ", failures=", failure_count); + return results; +} + +std::vector> WrappedMasterService::BatchPutEnd( + const std::vector& keys, + const std::vector>& put_results) { + ScopedVLogTimer timer(1, "BatchPutEnd"); + timer.LogRequest("keys_count=", keys.size()); + MasterMetricManager::instance().inc_batch_put_end_requests(); + + std::vector> results; + results.reserve(keys.size()); + + if (keys.size() != put_results.size()) { + LOG(ERROR) << "BatchPutEnd failed: keys.size() != put_results.size()"; + return std::vector>( + keys.size(), tl::make_unexpected(ErrorCode::INVALID_PARAMS)); + } + + for (size_t i = 0; i < keys.size(); ++i) { + results.emplace_back(master_service_.PutEnd(keys[i], put_results[i])); + } + + // Count failures and log errors + size_t failure_count = 0; + for (size_t i = 0; i < results.size(); ++i) { + if (!results[i].has_value()) { + failure_count++; + LOG(ERROR) << "BatchPutEnd failed for key[" << i << "] '" << keys[i] + << "': " << toString(results[i].error()); + } + } + MasterMetricManager::instance().inc_batch_put_end_failures(failure_count); + + timer.LogResponse("total=", results.size(), + ", success=", results.size() - failure_count, + ", failures=", failure_count); + return results; +} + +tl::expected WrappedMasterService::Remove( + const std::string& key) { + return execute_rpc( + "Remove", [&] { return master_service_.Remove(key); }, + [&](auto& timer) { timer.LogRequest("key=", key); }, + [] { MasterMetricManager::instance().inc_remove_requests(); }, + [] { MasterMetricManager::instance().inc_remove_failures(); }); +} + +long WrappedMasterService::RemoveAll() { + ScopedVLogTimer timer(1, "RemoveAll"); + timer.LogRequest("action=remove_all_objects"); + MasterMetricManager::instance().inc_remove_all_requests(); + long result = master_service_.RemoveAll(); + timer.LogResponse("items_removed=", result); + return result; +} + +tl::expected WrappedMasterService::MountSegment( + const Segment& segment, const UUID& client_id) { + return execute_rpc( + "MountSegment", + [&] { return master_service_.MountSegment(segment, client_id); }, + [&](auto& timer) { + timer.LogRequest("base=", segment.base, ", size=", segment.size, + ", segment_name=", segment.name, + ", id=", segment.id); + }, + [] { MasterMetricManager::instance().inc_mount_segment_requests(); }, + [] { MasterMetricManager::instance().inc_mount_segment_failures(); }); +} + +tl::expected WrappedMasterService::ReMountSegment( + const std::vector& segments, const UUID& client_id) { + return execute_rpc( + "ReMountSegment", + [&] { return master_service_.ReMountSegment(segments, client_id); }, + [&](auto& timer) { + timer.LogRequest("segments_count=", segments.size(), + ", client_id=", client_id); + }, + [] { MasterMetricManager::instance().inc_remount_segment_requests(); }, + [] { MasterMetricManager::instance().inc_remount_segment_failures(); }); +} + +tl::expected WrappedMasterService::UnmountSegment( + const UUID& segment_id, const UUID& client_id) { + return execute_rpc( + "UnmountSegment", + [&] { return master_service_.UnmountSegment(segment_id, client_id); }, + [&](auto& timer) { + timer.LogRequest("segment_id=", segment_id, + ", client_id=", client_id); + }, + [] { MasterMetricManager::instance().inc_unmount_segment_requests(); }, + [] { MasterMetricManager::instance().inc_unmount_segment_failures(); }); +} + +tl::expected WrappedMasterService::GetFsdir() { + ScopedVLogTimer timer(1, "GetFsdir"); + timer.LogRequest("action=get_fsdir"); + + auto result = master_service_.GetFsdir(); + + timer.LogResponseExpected(result); + return result; +} + +tl::expected, ErrorCode> +WrappedMasterService::Ping(const UUID& client_id) { + ScopedVLogTimer timer(1, "Ping"); + timer.LogRequest("client_id=", client_id); + + MasterMetricManager::instance().inc_ping_requests(); + + auto result = master_service_.Ping(client_id); + + timer.LogResponseExpected(result); + return result; +} + +void RegisterRpcService( + coro_rpc::coro_rpc_server& server, + mooncake::WrappedMasterService& wrapped_master_service) { + server.register_handler<&mooncake::WrappedMasterService::ExistKey>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::GetReplicaList>( + &wrapped_master_service); + server + .register_handler<&mooncake::WrappedMasterService::BatchGetReplicaList>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::PutStart>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::PutEnd>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::BatchPutStart>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::BatchPutEnd>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::Remove>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::RemoveAll>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::MountSegment>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::ReMountSegment>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::UnmountSegment>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::Ping>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::GetFsdir>( + &wrapped_master_service); + server.register_handler<&mooncake::WrappedMasterService::BatchExistKey>( + &wrapped_master_service); +} + +} // namespace mooncake diff --git a/mooncake-store/tests/CMakeLists.txt b/mooncake-store/tests/CMakeLists.txt index 83ba7c3d8..ce5c4caca 100644 --- a/mooncake-store/tests/CMakeLists.txt +++ b/mooncake-store/tests/CMakeLists.txt @@ -14,6 +14,10 @@ add_executable(master_service_test master_service_test.cpp) target_link_libraries(master_service_test PUBLIC mooncake_store cachelib_memory_allocator ${ETCD_WRAPPER_LIB} glog gtest gtest_main pthread) add_test(NAME master_service_test COMMAND master_service_test) +add_executable(master_service_put_test master_service_put_test.cpp) +target_link_libraries(master_service_put_test PUBLIC mooncake_store cachelib_memory_allocator ${ETCD_WRAPPER_LIB} glog gtest gtest_main pthread) +add_test(NAME master_service_put_test COMMAND master_service_put_test) + add_executable(client_integration_test client_integration_test.cpp) target_link_libraries(client_integration_test PUBLIC mooncake_store diff --git a/mooncake-store/tests/master_metrics_test.cpp b/mooncake-store/tests/master_metrics_test.cpp index bb966dfbf..1b9358a4d 100644 --- a/mooncake-store/tests/master_metrics_test.cpp +++ b/mooncake-store/tests/master_metrics_test.cpp @@ -1,9 +1,6 @@ #include #include -#include -#include -#include #include #include @@ -41,8 +38,6 @@ TEST_F(MasterMetricsTest, InitialStatusTest) { ASSERT_EQ(metrics.get_put_start_failures(), 0); ASSERT_EQ(metrics.get_put_end_requests(), 0); ASSERT_EQ(metrics.get_put_end_failures(), 0); - ASSERT_EQ(metrics.get_put_revoke_requests(), 0); - ASSERT_EQ(metrics.get_put_revoke_failures(), 0); ASSERT_EQ(metrics.get_get_replica_list_requests(), 0); ASSERT_EQ(metrics.get_get_replica_list_failures(), 0); ASSERT_EQ(metrics.get_exist_key_requests(), 0); @@ -71,8 +66,6 @@ TEST_F(MasterMetricsTest, InitialStatusTest) { ASSERT_EQ(metrics.get_batch_put_start_failures(), 0); ASSERT_EQ(metrics.get_batch_put_end_requests(), 0); ASSERT_EQ(metrics.get_batch_put_end_failures(), 0); - ASSERT_EQ(metrics.get_batch_put_revoke_requests(), 0); - ASSERT_EQ(metrics.get_batch_put_revoke_failures(), 0); } TEST_F(MasterMetricsTest, BasicRequestTest) { @@ -107,19 +100,20 @@ TEST_F(MasterMetricsTest, BasicRequestTest) { ASSERT_EQ(metrics.get_mount_segment_requests(), 1); ASSERT_EQ(metrics.get_mount_segment_failures(), 0); - // Test PutStart and PutRevoke request + // Test PutStart and PutEnd request auto put_start_result1 = service_.PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result1.has_value()); ASSERT_EQ(metrics.get_key_count(), 1); ASSERT_EQ(metrics.get_allocated_size(), value_length); ASSERT_EQ(metrics.get_put_start_requests(), 1); ASSERT_EQ(metrics.get_put_start_failures(), 0); - auto put_revoke_result = service_.PutRevoke(key); + std::vector put_results = {PutResult::FAILED}; + auto put_revoke_result = service_.PutEnd(key, put_results); ASSERT_TRUE(put_revoke_result.has_value()); ASSERT_EQ(metrics.get_key_count(), 0); ASSERT_EQ(metrics.get_allocated_size(), 0); - ASSERT_EQ(metrics.get_put_revoke_requests(), 1); - ASSERT_EQ(metrics.get_put_revoke_failures(), 0); + ASSERT_EQ(metrics.get_put_end_requests(), 1); + ASSERT_EQ(metrics.get_put_end_failures(), 0); // Test PutStart and PutEnd request auto put_start_result2 = service_.PutStart(key, slice_lengths, config); @@ -128,11 +122,12 @@ TEST_F(MasterMetricsTest, BasicRequestTest) { ASSERT_EQ(metrics.get_allocated_size(), value_length); ASSERT_EQ(metrics.get_put_start_requests(), 2); ASSERT_EQ(metrics.get_put_start_failures(), 0); - auto put_end_result = service_.PutEnd(key); + put_results[0] = PutResult::SUCCESS; + auto put_end_result = service_.PutEnd(key, put_results); ASSERT_TRUE(put_end_result.has_value()); ASSERT_EQ(metrics.get_key_count(), 1); ASSERT_EQ(metrics.get_allocated_size(), value_length); - ASSERT_EQ(metrics.get_put_end_requests(), 1); + ASSERT_EQ(metrics.get_put_end_requests(), 2); ASSERT_EQ(metrics.get_put_end_failures(), 0); // Test ExistKey request @@ -160,7 +155,7 @@ TEST_F(MasterMetricsTest, BasicRequestTest) { // Test RemoveAll request auto put_start_result3 = service_.PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result3.has_value()); - auto put_end_result2 = service_.PutEnd(key); + auto put_end_result2 = service_.PutEnd(key, put_results); ASSERT_TRUE(put_end_result2.has_value()); ASSERT_EQ(metrics.get_key_count(), 1); ASSERT_EQ(1, service_.RemoveAll()); @@ -172,7 +167,7 @@ TEST_F(MasterMetricsTest, BasicRequestTest) { // Test UnmountSegment request auto put_start_result4 = service_.PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result4.has_value()); - auto put_end_result3 = service_.PutEnd(key); + auto put_end_result3 = service_.PutEnd(key, put_results); ASSERT_TRUE(put_end_result3.has_value()); auto unmount_result = service_.UnmountSegment(segment_id, client_id); ASSERT_TRUE(unmount_result.has_value()); @@ -229,7 +224,9 @@ TEST_F(MasterMetricsTest, BatchRequestTest) { ASSERT_EQ(metrics.get_batch_get_replica_list_failures(), 3); // Test BatchPutEnd request - auto batch_put_end_result = service_.BatchPutEnd(keys); + std::vector> put_results = { + {PutResult::SUCCESS}, {PutResult::SUCCESS}, {PutResult::SUCCESS}}; + auto batch_put_end_result = service_.BatchPutEnd(keys, put_results); ASSERT_EQ(batch_put_end_result.size(), 3); ASSERT_EQ(metrics.get_batch_put_end_requests(), 1); ASSERT_EQ(metrics.get_batch_put_end_failures(), 0); @@ -246,11 +243,11 @@ TEST_F(MasterMetricsTest, BatchRequestTest) { ASSERT_EQ(metrics.get_batch_get_replica_list_requests(), 2); ASSERT_EQ(metrics.get_batch_get_replica_list_failures(), 3); - // Test BatchPutRevoke request (should all fail) - auto batch_put_revoke_result = service_.BatchPutRevoke(keys); - ASSERT_EQ(batch_put_revoke_result.size(), 3); - ASSERT_EQ(metrics.get_batch_put_revoke_requests(), 1); - ASSERT_EQ(metrics.get_batch_put_revoke_failures(), 3); + // Test BatchPutEnd request (should all fail) + auto batch_put_end_result2 = service_.BatchPutEnd(keys, put_results); + ASSERT_EQ(batch_put_end_result2.size(), 3); + ASSERT_EQ(metrics.get_batch_put_end_requests(), 2); + ASSERT_EQ(metrics.get_batch_put_end_failures(), 0); } } // namespace mooncake::test diff --git a/mooncake-store/tests/master_service_put_test.cpp b/mooncake-store/tests/master_service_put_test.cpp new file mode 100644 index 000000000..8243c17ff --- /dev/null +++ b/mooncake-store/tests/master_service_put_test.cpp @@ -0,0 +1,125 @@ +#include +#include + +#include +#include + +#include "master_service.h" +#include "rpc_service.h" +#include "types.h" + +namespace mooncake::test { + +class MasterServicePutTest : public ::testing::Test { + protected: + std::unique_ptr service_; + UUID client_id_; + const testing::TestInfo* test_info_; + + void SetUp() override { + service_ = std::make_unique(); + client_id_ = generate_uuid(); + test_info_ = ::testing::UnitTest::GetInstance()->current_test_info(); + google::InitGoogleLogging(test_info_->name()); + FLAGS_logtostderr = true; + + constexpr size_t buffer = 0x300000000; + constexpr size_t size = 1024 * 1024 * 16; + std::string segment_name = "test_segment"; + Segment segment(generate_uuid(), segment_name, buffer, size); + auto mount_result = service_->MountSegment(segment, client_id_); + ASSERT_TRUE(mount_result.has_value()); + } + + void TearDown() override { google::ShutdownGoogleLogging(); } +}; + +TEST_F(MasterServicePutTest, PutMultipleReplicasEndAllSuccess) { + std::string key = "test_multi_replica_key"; + std::vector slice_lengths = {1024}; + ReplicateConfig config; + config.replica_num = 3; + + auto put_start_result = service_->PutStart(key, slice_lengths, config); + ASSERT_TRUE(put_start_result.has_value()); + auto replica_list = put_start_result.value(); + ASSERT_EQ(config.replica_num, replica_list.size()); + + for (const auto& replica : replica_list) { + EXPECT_EQ(ReplicaStatus::PROCESSING, replica.status); + } + + // During put, Get/Remove should fail + auto get_replica_result = service_->GetReplicaList(key); + EXPECT_FALSE(get_replica_result.has_value()); + EXPECT_EQ(ErrorCode::REPLICA_IS_NOT_READY, get_replica_result.error()); + auto remove_result = service_->Remove(key); + EXPECT_FALSE(remove_result.has_value()); + EXPECT_EQ(ErrorCode::REPLICA_IS_NOT_READY, remove_result.error()); + + // Test PutEnd with multiple successful results + std::vector put_results(config.replica_num, PutResult::SUCCESS); + auto put_end_result = service_->PutEnd(key, put_results); + EXPECT_TRUE(put_end_result.has_value()); + + // Verify replica list after PutEnd + auto final_get_result = service_->GetReplicaList(key); + EXPECT_TRUE(final_get_result.has_value()); + replica_list = final_get_result.value(); + EXPECT_EQ(config.replica_num, replica_list.size()); + for (const auto& replica : replica_list) { + EXPECT_EQ(ReplicaStatus::COMPLETE, replica.status); + } +} + +TEST_F(MasterServicePutTest, PutMultipleReplicasOneFailed) { + std::string key = "test_multi_replica_one_failed_key"; + std::vector slice_lengths = {1024}; + ReplicateConfig config; + config.replica_num = 3; + + auto put_start_result = service_->PutStart(key, slice_lengths, config); + ASSERT_TRUE(put_start_result.has_value()); + auto replica_list = put_start_result.value(); + ASSERT_EQ(config.replica_num, replica_list.size()); + + // Test PutEnd with one failure + std::vector put_results(config.replica_num, PutResult::SUCCESS); + put_results[1] = PutResult::FAILED; + auto put_end_result = service_->PutEnd(key, put_results); + EXPECT_TRUE(put_end_result.has_value()); + + // Verify replica list after PutEnd, the object should be removed + auto final_get_result = service_->GetReplicaList(key); + EXPECT_FALSE(final_get_result.has_value()); + EXPECT_EQ(ErrorCode::OBJECT_NOT_FOUND, final_get_result.error()); +} + +TEST_F(MasterServicePutTest, PutMultipleReplicasEndAllFailed) { + std::string key = "test_multi_replica_all_failed_key"; + std::vector slice_lengths = {1024}; + ReplicateConfig config; + config.replica_num = 3; + + auto put_start_result = service_->PutStart(key, slice_lengths, config); + ASSERT_TRUE(put_start_result.has_value()); + auto replica_list = put_start_result.value(); + ASSERT_EQ(config.replica_num, replica_list.size()); + + // Test PutEnd with all failures + std::vector put_results(config.replica_num, PutResult::FAILED); + auto put_end_result = service_->PutEnd(key, put_results); + EXPECT_TRUE(put_end_result.has_value()); + + // Verify replica list after PutEnd, the object should be removed + auto final_get_result = service_->GetReplicaList(key); + EXPECT_FALSE(final_get_result.has_value()); + EXPECT_EQ(ErrorCode::OBJECT_NOT_FOUND, final_get_result.error()); +} + +} // namespace mooncake::test + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/mooncake-store/tests/master_service_test.cpp b/mooncake-store/tests/master_service_test.cpp index fbf04da10..5404ff26d 100644 --- a/mooncake-store/tests/master_service_test.cpp +++ b/mooncake-store/tests/master_service_test.cpp @@ -9,6 +9,7 @@ #include #include +#include "rpc_service.h" #include "types.h" namespace mooncake::test { @@ -54,7 +55,7 @@ std::string GenerateKeyForSegment(const std::unique_ptr& service, throw std::runtime_error("PutStart failed with code: " + std::to_string(static_cast(code))); } - auto put_end_result = service->PutEnd(key); + auto put_end_result = service->PutEnd(key, {PutResult::SUCCESS}); if (!put_end_result.has_value()) { throw std::runtime_error("PutEnd failed"); } @@ -275,7 +276,7 @@ TEST_F(MasterServiceTest, PutStartEndFlow) { EXPECT_EQ(ErrorCode::REPLICA_IS_NOT_READY, remove_result.error()); // Test PutEnd - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); EXPECT_TRUE(put_end_result.has_value()); // Verify replica list after PutEnd @@ -321,7 +322,8 @@ TEST_F(MasterServiceTest, RandomPutStartEndFlow) { EXPECT_FALSE(remove_result.has_value()); EXPECT_EQ(ErrorCode::REPLICA_IS_NOT_READY, remove_result.error()); // Test PutEnd - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd( + key, std::vector(random_number, PutResult::SUCCESS)); EXPECT_TRUE(put_end_result.has_value()); // Verify replica list after PutEnd auto get_result2 = service_->GetReplicaList(key); @@ -357,7 +359,7 @@ TEST_F(MasterServiceTest, GetReplicaList) { config.replica_num = 1; auto put_start_result = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); // Test getting existing key @@ -386,7 +388,7 @@ TEST_F(MasterServiceTest, RemoveObject) { config.replica_num = 1; auto put_start_result = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); // Test removing the object @@ -427,7 +429,7 @@ TEST_F(MasterServiceTest, RandomRemoveObject) { config.replica_num = 1; auto put_start_result = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); // Test removing the object @@ -463,7 +465,7 @@ TEST_F(MasterServiceTest, RemoveAll) { config.replica_num = 1; auto put_start_result = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); auto exist_result = service_->ExistKey(key); ASSERT_TRUE(exist_result.has_value()); @@ -553,7 +555,8 @@ TEST_F(MasterServiceTest, MultiSliceMultiReplicaFlow) { EXPECT_EQ(ErrorCode::REPLICA_IS_NOT_READY, get_result.error()); // Complete the put operation - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd( + key, std::vector(replica_list.size(), PutResult::SUCCESS)); ASSERT_TRUE(put_end_result.has_value()); // Test GetReplicaList after completion @@ -625,7 +628,8 @@ TEST_F(MasterServiceTest, ConcurrentGarbageCollectionTest) { auto put_start_result = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); - auto put_end_result = service_->PutEnd(key); + auto put_end_result = + service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); // Add the key to the tracking list @@ -691,7 +695,7 @@ TEST_F(MasterServiceTest, CleanupStaleHandlesTest) { // Create the object auto put_start_result = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); // Verify object exists @@ -718,7 +722,7 @@ TEST_F(MasterServiceTest, CleanupStaleHandlesTest) { std::string key2 = "another_segment_object"; auto put_start_result2 = service_->PutStart(key2, slice_lengths, config); ASSERT_TRUE(put_start_result2.has_value()); - auto put_end_result2 = service_->PutEnd(key2); + auto put_end_result2 = service_->PutEnd(key2, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result2.has_value()); // Verify we can get it @@ -766,7 +770,8 @@ TEST_F(MasterServiceTest, ConcurrentWriteAndRemoveAll) { auto put_start_result = service_->PutStart(key, slice_lengths, config); if (put_start_result.has_value()) { - auto put_end_result = service_->PutEnd(key); + auto put_end_result = + service_->PutEnd(key, {PutResult::SUCCESS}); if (put_end_result.has_value()) { success_writes++; } @@ -832,7 +837,7 @@ TEST_F(MasterServiceTest, ConcurrentReadAndRemoveAll) { auto put_start_result = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); } @@ -913,7 +918,7 @@ TEST_F(MasterServiceTest, ConcurrentRemoveAllOperations) { auto put_start_result = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); } @@ -987,7 +992,7 @@ TEST_F(MasterServiceTest, UnmountSegmentImmediateCleanup) { auto put_start_result = service_->PutStart(key1, slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); replica_list = put_start_result.value(); - auto put_end_result = service_->PutEnd(key1); + auto put_end_result = service_->PutEnd(key1, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); auto get_result3 = service_->GetReplicaList(key1); ASSERT_TRUE(get_result3.has_value()); @@ -1080,7 +1085,7 @@ TEST_F(MasterServiceTest, RemoveLeasedObject) { // Verify lease is granted on ExistsKey auto put_start_result = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); auto exist_result = service_->ExistKey(key); ASSERT_TRUE(exist_result.has_value()); @@ -1094,7 +1099,7 @@ TEST_F(MasterServiceTest, RemoveLeasedObject) { // Verify lease is extended on successive ExistsKey auto put_start_result2 = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result2.has_value()); - auto put_end_result2 = service_->PutEnd(key); + auto put_end_result2 = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result2.has_value()); auto exist_result2 = service_->ExistKey(key); ASSERT_TRUE(exist_result2.has_value()); @@ -1111,7 +1116,7 @@ TEST_F(MasterServiceTest, RemoveLeasedObject) { // Verify lease is granted on GetReplicaList auto put_start_result3 = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result3.has_value()); - auto put_end_result3 = service_->PutEnd(key); + auto put_end_result3 = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result3.has_value()); auto get_result = service_->GetReplicaList(key); ASSERT_TRUE(get_result.has_value()); @@ -1125,7 +1130,7 @@ TEST_F(MasterServiceTest, RemoveLeasedObject) { // Verify lease is extended on successive GetReplicaList auto put_start_result4 = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result4.has_value()); - auto put_end_result4 = service_->PutEnd(key); + auto put_end_result4 = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result4.has_value()); auto get_result2 = service_->GetReplicaList(key); ASSERT_TRUE(get_result2.has_value()); @@ -1164,7 +1169,7 @@ TEST_F(MasterServiceTest, RemoveAllLeasedObject) { config.replica_num = 1; auto put_start_result = service_->PutStart(key, slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); if (i >= 5) { auto exist_result = service_->ExistKey(key); @@ -1214,7 +1219,7 @@ TEST_F(MasterServiceTest, EvictObject) { config.replica_num = 1; auto put_start_result = service_->PutStart(key, slice_lengths, config); if (put_start_result.has_value()) { - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); success_puts++; } else { @@ -1252,7 +1257,7 @@ TEST_F(MasterServiceTest, TryEvictLeasedObject) { config.replica_num = 1; auto put_start_result = service_->PutStart(key, slice_lengths, config); if (put_start_result.has_value()) { - auto put_end_result = service_->PutEnd(key); + auto put_end_result = service_->PutEnd(key, {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); // the object is leased auto get_result = service_->GetReplicaList(key); @@ -1299,12 +1304,12 @@ TEST_F(MasterServiceTest, RemoveSoftPinObject) { // Verify soft pin does not block remove ASSERT_TRUE(service_->PutStart(key, slice_lengths, config).has_value()); - ASSERT_TRUE(service_->PutEnd(key).has_value()); + ASSERT_TRUE(service_->PutEnd(key, {PutResult::SUCCESS}).has_value()); EXPECT_TRUE(service_->Remove(key).has_value()); // Verify soft pin does not block RemoveAll ASSERT_TRUE(service_->PutStart(key, slice_lengths, config).has_value()); - ASSERT_TRUE(service_->PutEnd(key).has_value()); + ASSERT_TRUE(service_->PutEnd(key, {PutResult::SUCCESS}).has_value()); EXPECT_EQ(1, service_->RemoveAll()); } @@ -1340,7 +1345,8 @@ TEST_F(MasterServiceTest, SoftPinObjectsNotEvictedBeforeOtherObjects) { ASSERT_TRUE( service_->PutStart(pin_key, slice_lengths, soft_pin_config) .has_value()); - ASSERT_TRUE(service_->PutEnd(pin_key).has_value()); + ASSERT_TRUE( + service_->PutEnd(pin_key, {PutResult::SUCCESS}).has_value()); } // Fill the segment to trigger eviction @@ -1351,7 +1357,8 @@ TEST_F(MasterServiceTest, SoftPinObjectsNotEvictedBeforeOtherObjects) { ReplicateConfig config; config.replica_num = 1; if (service_->PutStart(key, slice_lengths, config).has_value()) { - ASSERT_TRUE(service_->PutEnd(key).has_value()); + ASSERT_TRUE( + service_->PutEnd(key, {PutResult::SUCCESS}).has_value()); } else { failed_puts++; } @@ -1399,7 +1406,8 @@ TEST_F(MasterServiceTest, SoftPinObjectsCanBeEvicted) { config.replica_num = 1; config.with_soft_pin = true; if (service_->PutStart(key, slice_lengths, config).has_value()) { - ASSERT_TRUE(service_->PutEnd(key).has_value()); + ASSERT_TRUE( + service_->PutEnd(key, {PutResult::SUCCESS}).has_value()); success_puts++; } else { // wait for gc thread to work @@ -1446,7 +1454,8 @@ TEST_F(MasterServiceTest, SoftPinExtendedOnGet) { ASSERT_TRUE( service_->PutStart(pin_key, slice_lengths, soft_pin_config)); - ASSERT_TRUE(service_->PutEnd(pin_key).has_value()); + ASSERT_TRUE( + service_->PutEnd(pin_key, {PutResult::SUCCESS}).has_value()); } // Wait for the soft pin to expire @@ -1466,7 +1475,8 @@ TEST_F(MasterServiceTest, SoftPinExtendedOnGet) { ReplicateConfig config; config.replica_num = 1; if (service_->PutStart(key, slice_lengths, config).has_value()) { - ASSERT_TRUE(service_->PutEnd(key).has_value()); + ASSERT_TRUE( + service_->PutEnd(key, {PutResult::SUCCESS}).has_value()); } else { failed_puts++; } @@ -1517,7 +1527,8 @@ TEST_F(MasterServiceTest, SoftPinObjectsNotAllowEvict) { config.replica_num = 1; config.with_soft_pin = true; if (service_->PutStart(key, slice_lengths, config).has_value()) { - ASSERT_TRUE(service_->PutEnd(key).has_value()); + ASSERT_TRUE( + service_->PutEnd(key, {PutResult::SUCCESS}).has_value()); success_keys.push_back(key); } else { // wait for gc thread to work @@ -1556,7 +1567,8 @@ TEST_F(MasterServiceTest, BatchExistKeyTest) { auto put_start_result = service_->PutStart(test_keys[i], slice_lengths, config); ASSERT_TRUE(put_start_result.has_value()); - auto put_end_result = service_->PutEnd(test_keys[i]); + auto put_end_result = + service_->PutEnd(test_keys[i], {PutResult::SUCCESS}); ASSERT_TRUE(put_end_result.has_value()); }