diff --git a/src/spider/CMakeLists.txt b/src/spider/CMakeLists.txt index da1e3f23..a29fd233 100644 --- a/src/spider/CMakeLists.txt +++ b/src/spider/CMakeLists.txt @@ -68,6 +68,8 @@ set(SPIDER_WORKER_SOURCES worker/ChildPid.cpp worker/DllLoader.hpp worker/DllLoader.cpp + worker/WorkerErrorCode.cpp + worker/WorkerErrorCode.hpp worker/Process.hpp worker/Process.cpp worker/TaskExecutor.hpp diff --git a/src/spider/worker/WorkerErrorCode.cpp b/src/spider/worker/WorkerErrorCode.cpp new file mode 100644 index 00000000..735c35fa --- /dev/null +++ b/src/spider/worker/WorkerErrorCode.cpp @@ -0,0 +1,40 @@ +#include "WorkerErrorCode.hpp" + +#include +#include + +#include + +namespace spider::worker { +using WorkerErrorCategory = ystdlib::error_handling::ErrorCategory; + +constexpr std::string_view cWorkerErrorCategoryName = "Worker Error Code"; +} // namespace spider::worker + +template <> +auto spider::worker::WorkerErrorCategory::name() const noexcept -> char const* { + return spider::worker::cWorkerErrorCategoryName.data(); +} + +template <> +auto spider::worker::WorkerErrorCategory::message(spider::worker::WorkerErrorCodeEnum code) const + -> std::string { + switch (code) { + case spider::worker::WorkerErrorCodeEnum::Success: + return "Success"; + case spider::worker::WorkerErrorCodeEnum::CmdLineArgumentInvalid: + return "Invalid command line argument"; + case spider::worker::WorkerErrorCodeEnum::TaskArgumentInvalid: + return "Invalid task argument"; + case spider::worker::WorkerErrorCodeEnum::TaskFailed: + return "Task failed"; + case spider::worker::WorkerErrorCodeEnum::TaskOutputUnavailable: + return "Task output unavailable"; + case spider::worker::WorkerErrorCodeEnum::TaskOutputInvalid: + return "Task output invalid"; + case spider::worker::WorkerErrorCodeEnum::StorageError: + return "Storage error"; + default: + return "Unknown error"; + } +} diff --git a/src/spider/worker/WorkerErrorCode.hpp b/src/spider/worker/WorkerErrorCode.hpp new file mode 100644 index 00000000..010919cd --- /dev/null +++ b/src/spider/worker/WorkerErrorCode.hpp @@ -0,0 +1,25 @@ +#ifndef SPIDER_WORKER_ERROR_CODE_HPP +#define SPIDER_WORKER_ERROR_CODE_HPP + +#include + +#include + +namespace spider::worker { +enum class WorkerErrorCodeEnum : uint8_t { + Success = 0, + CmdLineArgumentInvalid = 1, + TaskArgumentInvalid = 2, + TaskFailed = 3, + TaskOutputUnavailable = 4, + TaskOutputInvalid = 5, + // TODO: Move storage related errors to an ErrorCode in the storage namespace. + StorageError = 6, +}; + +using WorkerErrorCode = ystdlib::error_handling::ErrorCode; +} // namespace spider::worker + +YSTDLIB_ERROR_HANDLING_MARK_AS_ERROR_CODE_ENUM(spider::worker::WorkerErrorCodeEnum); + +#endif diff --git a/src/spider/worker/worker.cpp b/src/spider/worker/worker.cpp index ade6ce6b..273405d1 100644 --- a/src/spider/worker/worker.cpp +++ b/src/spider/worker/worker.cpp @@ -32,6 +32,11 @@ #include #include // IWYU pragma: keep #include +// Temporarily disable quickcpplib to include valgrind headers to avoid +// conflicts with abseil valgrind headers, until the issue is resolved at +// https://github.com/y-scope/ystdlib-cpp/issues/59. +#undef QUICKCPPLIB_ENABLE_VALGRIND +#include #include #include @@ -49,6 +54,7 @@ #include #include #include +#include constexpr int cCmdArgParseErr = 1; constexpr int cSignalHandleErr = 2; @@ -180,7 +186,7 @@ fetch_task(spider::worker::WorkerClient& client, std::optional const& storage_factory, std::shared_ptr const& metadata_store, spider::core::TaskInstance const& instance, spider::core::Task& task -) -> std::optional> { +) -> ystdlib::error_handling:: + Result, spider::worker::WorkerErrorCode> { std::variant, spider::core::StorageErr> conn_result = storage_factory->provide_storage_connection(); if (std::holds_alternative(conn_result)) { @@ -204,7 +211,7 @@ auto setup_task( "Failed to connect to storage: {}", std::get(conn_result).description ); - return std::nullopt; + return spider::worker::WorkerErrorCodeEnum::StorageError; } std::unique_ptr conn = std::move(std::get>(conn_result)); @@ -212,21 +219,30 @@ auto setup_task( spider::core::StorageErr const err = metadata_store->get_task(*conn, instance.task_id, &task); if (!err.success()) { spdlog::error("Failed to fetch task detail: {}", err.description); - return std::nullopt; + return spider::worker::WorkerErrorCodeEnum::StorageError; } std::optional> optional_arg_buffers = task.get_arg_buffers(); if (!optional_arg_buffers.has_value()) { spdlog::error("Failed to fetch task arguments"); metadata_store->task_fail(*conn, instance, fmt::format("Failed to fetch task arguments")); - return std::nullopt; + return spider::worker::WorkerErrorCodeEnum::TaskArgumentInvalid; } - return optional_arg_buffers; + return std::move(optional_arg_buffers.value()); } +/** + * Parses the task outputs from the result buffers. + * + * @param task The task that was executed. + * @param result_buffers The result buffers containing the outputs. + * @return A vector of TaskOutput objects if parsing is successful. + * @return WorkerErrorCode::TaskOutputInvalid if any failure occurs. + */ auto parse_outputs(spider::core::Task const& task, std::vector const& result_buffers) - -> std::optional> { + -> ystdlib::error_handling:: + Result, spider::worker::WorkerErrorCode> { std::vector outputs; outputs.reserve(task.get_num_outputs()); for (size_t i = 0; i < task.get_num_outputs(); ++i) { @@ -244,7 +260,7 @@ parse_outputs(spider::core::Task const& task, std::vector cons "Task {} failed to parse result as data id", task.get_function_name() ); - return std::nullopt; + return spider::worker::WorkerErrorCodeEnum::TaskOutputInvalid; } } else { msgpack::sbuffer const& buffer = result_buffers[i]; @@ -263,7 +279,8 @@ parse_outputs(spider::core::Task const& task, std::vector cons * @param instance Task instance that was executed. * @param task The task that was executed. * @param executor The executor that ran the task. - * @return true if results were successfully handled, false if any errors occurred. + * @return ystdlib::error_handling::success() if successful. + * @return WorkerErrorCode if any failure occurs. */ auto handle_executor_result( std::shared_ptr const& storage_factory, @@ -271,7 +288,7 @@ auto handle_executor_result( spider::core::TaskInstance const& instance, spider::core::Task const& task, spider::worker::TaskExecutor& executor -) -> bool { +) -> ystdlib::error_handling::Result { std::variant, spider::core::StorageErr> conn_result = storage_factory->provide_storage_connection(); if (std::holds_alternative(conn_result)) { @@ -279,7 +296,7 @@ auto handle_executor_result( "Failed to connect to storage: {}", std::get(conn_result).description ); - return false; + return spider::worker::WorkerErrorCodeEnum::StorageError; } auto conn = std::move(std::get>(conn_result)); @@ -290,7 +307,7 @@ auto handle_executor_result( instance, fmt::format("Task {} failed", task.get_function_name()) ); - return false; + return spider::worker::WorkerErrorCodeEnum::TaskFailed; } // Parse result @@ -303,12 +320,14 @@ auto handle_executor_result( instance, fmt::format("Task {} failed to parse result into buffers", task.get_function_name()) ); - return false; + return spider::worker::WorkerErrorCodeEnum::TaskOutputUnavailable; } std::vector const& result_buffers = optional_result_buffers.value(); - std::optional> const optional_outputs + ystdlib::error_handling::Result< + std::vector, + spider::worker::WorkerErrorCode> const output_result = parse_outputs(task, result_buffers); - if (!optional_outputs.has_value()) { + if (!output_result.has_value()) { metadata_store->task_fail( *conn, instance, @@ -317,10 +336,10 @@ auto handle_executor_result( task.get_function_name() ) ); - return false; + return output_result.error(); } - std::vector const& outputs = optional_outputs.value(); + std::vector const& outputs = output_result.value(); // Submit result spdlog::debug("Submitting result for task {}", boost::uuids::to_string(task.get_id())); spider::core::StorageErr err; @@ -336,9 +355,9 @@ auto handle_executor_result( } if (!err.success()) { spdlog::error("Submit task {} fails: {}", task.get_function_name(), err.description); - return false; + return spider::worker::WorkerErrorCodeEnum::StorageError; } - return true; + return ystdlib::error_handling::success(); } // NOLINTBEGIN(clang-analyzer-unix.BlockInCriticalSection) @@ -365,15 +384,16 @@ auto task_loop( spdlog::debug("Fetched task {}", boost::uuids::to_string(task_id)); // Fetch task detail from metadata storage spider::core::Task task{""}; - - std::optional> optional_arg_buffers + ystdlib::error_handling:: + Result, spider::worker::WorkerErrorCode> + arg_buffers_result = setup_task(storage_factory, metadata_store, instance, task); - if (!optional_arg_buffers.has_value()) { + if (!arg_buffers_result.has_value()) { spdlog::error("Failed to setup task {}", task.get_function_name()); fail_task_id = task.get_id(); continue; } - std::vector const& arg_buffers = optional_arg_buffers.value(); + std::vector const& arg_buffers = arg_buffers_result.value(); // Execute task spider::worker::TaskExecutor executor{ @@ -397,9 +417,9 @@ auto task_loop( context.run(); executor.wait(); - spider::core::ChildPid::set_pid(0); - - if (handle_executor_result(storage_factory, metadata_store, instance, task, executor)) { + if (handle_executor_result(storage_factory, metadata_store, instance, task, executor) + .has_value()) + { fail_task_id = std::nullopt; } else { fail_task_id = task.get_id();