Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/spider/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ set(SPIDER_WORKER_SOURCES
worker/ChildPid.cpp
worker/DllLoader.hpp
worker/DllLoader.cpp
worker/WorkerErrorCode.cpp
worker/WorkerErrorCode.hpp
worker/Process.hpp
worker/Process.cpp
worker/TaskExecutor.hpp
Expand Down
40 changes: 40 additions & 0 deletions src/spider/worker/WorkerErrorCode.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#include "WorkerErrorCode.hpp"

#include <string>
#include <string_view>

#include <ystdlib/error_handling/ErrorCode.hpp>

namespace spider::worker {
using WorkerErrorCategory = ystdlib::error_handling::ErrorCategory<WorkerErrorCodeEnum>;

constexpr std::string_view cWorkerErrorCategoryName = "Worker Error Code";
} // namespace spider::worker

template <>
auto spider::worker::WorkerErrorCategory::name() const noexcept -> char const* {
return spider::worker::cWorkerErrorCategoryName.data();
}

template <>
auto spider::worker::WorkerErrorCategory::message(spider::worker::WorkerErrorCodeEnum code) const
-> std::string {
switch (code) {
case spider::worker::WorkerErrorCodeEnum::Success:
return "Success";
case spider::worker::WorkerErrorCodeEnum::CmdLineArgumentInvalid:
return "Invalid command line argument";
case spider::worker::WorkerErrorCodeEnum::TaskArgumentInvalid:
return "Invalid task argument";
case spider::worker::WorkerErrorCodeEnum::TaskFailed:
return "Task failed";
case spider::worker::WorkerErrorCodeEnum::TaskOutputUnavailable:
return "Task output unavailable";
case spider::worker::WorkerErrorCodeEnum::TaskOutputInvalid:
return "Task output invalid";
case spider::worker::WorkerErrorCodeEnum::StorageError:
return "Storage error";
default:
return "Unknown error";
}
}
25 changes: 25 additions & 0 deletions src/spider/worker/WorkerErrorCode.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#ifndef SPIDER_WORKER_ERROR_CODE_HPP
#define SPIDER_WORKER_ERROR_CODE_HPP

#include <cstdint>

#include <ystdlib/error_handling/ErrorCode.hpp>

namespace spider::worker {
enum class WorkerErrorCodeEnum : uint8_t {
Success = 0,
CmdLineArgumentInvalid = 1,
TaskArgumentInvalid = 2,
TaskFailed = 3,
TaskOutputUnavailable = 4,
TaskOutputInvalid = 5,
// TODO: Move storage related errors to an ErrorCode in the storage namespace.
StorageError = 6,
};

using WorkerErrorCode = ystdlib::error_handling::ErrorCode<WorkerErrorCodeEnum>;
} // namespace spider::worker

YSTDLIB_ERROR_HANDLING_MARK_AS_ERROR_CODE_ENUM(spider::worker::WorkerErrorCodeEnum);

#endif
74 changes: 47 additions & 27 deletions src/spider/worker/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
#include <fmt/format.h>
#include <spdlog/sinks/stdout_color_sinks.h> // IWYU pragma: keep
#include <spdlog/spdlog.h>
// Temporarily disable quickcpplib to include valgrind headers to avoid
// conflicts with abseil valgrind headers, until the issue is resolved at
// https://github.com/y-scope/ystdlib-cpp/issues/59.
#undef QUICKCPPLIB_ENABLE_VALGRIND
#include <ystdlib/error_handling/Result.hpp>

#include <spider/core/Data.hpp>
#include <spider/core/Driver.hpp>
Expand All @@ -49,6 +54,7 @@
#include <spider/worker/ChildPid.hpp>
#include <spider/worker/TaskExecutor.hpp>
#include <spider/worker/WorkerClient.hpp>
#include <spider/worker/WorkerErrorCode.hpp>

constexpr int cCmdArgParseErr = 1;
constexpr int cSignalHandleErr = 2;
Expand Down Expand Up @@ -180,7 +186,7 @@ fetch_task(spider::worker::WorkerClient& client, std::optional<boost::uuids::uui
return std::nullopt;
}

/*
/**
* Sets up a task by fetching the task metadata from storage and creating argument buffers from task
* inputs.
*
Expand All @@ -189,44 +195,54 @@ fetch_task(spider::worker::WorkerClient& client, std::optional<boost::uuids::uui
* @param instance The task instance to set up.
* @param task Output parameter to store the fetched task details.
* @return A vector of buffers containing the serialized arguments of the task.
* @return std::nullopt if any failure occurs.
* @return WorkerErrorCode if any failure occurs.
*/
auto setup_task(
std::shared_ptr<spider::core::StorageFactory> const& storage_factory,
std::shared_ptr<spider::core::MetadataStorage> const& metadata_store,
spider::core::TaskInstance const& instance,
spider::core::Task& task
) -> std::optional<std::vector<msgpack::sbuffer>> {
) -> ystdlib::error_handling::
Result<std::vector<msgpack::sbuffer>, spider::worker::WorkerErrorCode> {
std::variant<std::unique_ptr<spider::core::StorageConnection>, spider::core::StorageErr>
conn_result = storage_factory->provide_storage_connection();
if (std::holds_alternative<spider::core::StorageErr>(conn_result)) {
spdlog::error(
"Failed to connect to storage: {}",
std::get<spider::core::StorageErr>(conn_result).description
);
return std::nullopt;
return spider::worker::WorkerErrorCodeEnum::StorageError;
}
std::unique_ptr<spider::core::StorageConnection> conn
= std::move(std::get<std::unique_ptr<spider::core::StorageConnection>>(conn_result));
// Get task details
spider::core::StorageErr const err = metadata_store->get_task(*conn, instance.task_id, &task);
if (!err.success()) {
spdlog::error("Failed to fetch task detail: {}", err.description);
return std::nullopt;
return spider::worker::WorkerErrorCodeEnum::StorageError;
}

std::optional<std::vector<msgpack::sbuffer>> optional_arg_buffers = task.get_arg_buffers();
if (!optional_arg_buffers.has_value()) {
spdlog::error("Failed to fetch task arguments");
metadata_store->task_fail(*conn, instance, fmt::format("Failed to fetch task arguments"));
return std::nullopt;
return spider::worker::WorkerErrorCodeEnum::TaskArgumentInvalid;
}
return optional_arg_buffers;
return std::move(optional_arg_buffers.value());
}

/**
* Parses the task outputs from the result buffers.
*
* @param task The task that was executed.
* @param result_buffers The result buffers containing the outputs.
* @return A vector of TaskOutput objects if parsing is successful.
* @return WorkerErrorCode::TaskOutputInvalid if any failure occurs.
*/
auto
parse_outputs(spider::core::Task const& task, std::vector<msgpack::sbuffer> const& result_buffers)
-> std::optional<std::vector<spider::core::TaskOutput>> {
-> ystdlib::error_handling::
Result<std::vector<spider::core::TaskOutput>, spider::worker::WorkerErrorCode> {
std::vector<spider::core::TaskOutput> outputs;
outputs.reserve(task.get_num_outputs());
for (size_t i = 0; i < task.get_num_outputs(); ++i) {
Expand All @@ -244,7 +260,7 @@ parse_outputs(spider::core::Task const& task, std::vector<msgpack::sbuffer> cons
"Task {} failed to parse result as data id",
task.get_function_name()
);
return std::nullopt;
return spider::worker::WorkerErrorCodeEnum::TaskOutputInvalid;
}
} else {
msgpack::sbuffer const& buffer = result_buffers[i];
Expand All @@ -263,23 +279,24 @@ parse_outputs(spider::core::Task const& task, std::vector<msgpack::sbuffer> cons
* @param instance Task instance that was executed.
* @param task The task that was executed.
* @param executor The executor that ran the task.
* @return true if results were successfully handled, false if any errors occurred.
* @return ystdlib::error_handling::success() if successful.
* @return WorkerErrorCode if any failure occurs.
*/
auto handle_executor_result(
std::shared_ptr<spider::core::StorageFactory> const& storage_factory,
std::shared_ptr<spider::core::MetadataStorage> const& metadata_store,
spider::core::TaskInstance const& instance,
spider::core::Task const& task,
spider::worker::TaskExecutor& executor
) -> bool {
) -> ystdlib::error_handling::Result<void, spider::worker::WorkerErrorCode> {
std::variant<std::unique_ptr<spider::core::StorageConnection>, spider::core::StorageErr>
conn_result = storage_factory->provide_storage_connection();
if (std::holds_alternative<spider::core::StorageErr>(conn_result)) {
spdlog::error(
"Failed to connect to storage: {}",
std::get<spider::core::StorageErr>(conn_result).description
);
return false;
return spider::worker::WorkerErrorCodeEnum::StorageError;
}
auto conn = std::move(std::get<std::unique_ptr<spider::core::StorageConnection>>(conn_result));

Expand All @@ -290,7 +307,7 @@ auto handle_executor_result(
instance,
fmt::format("Task {} failed", task.get_function_name())
);
return false;
return spider::worker::WorkerErrorCodeEnum::TaskFailed;
}

// Parse result
Expand All @@ -303,12 +320,14 @@ auto handle_executor_result(
instance,
fmt::format("Task {} failed to parse result into buffers", task.get_function_name())
);
return false;
return spider::worker::WorkerErrorCodeEnum::TaskOutputUnavailable;
}
std::vector<msgpack::sbuffer> const& result_buffers = optional_result_buffers.value();
std::optional<std::vector<spider::core::TaskOutput>> const optional_outputs
ystdlib::error_handling::Result<
std::vector<spider::core::TaskOutput>,
spider::worker::WorkerErrorCode> const output_result
= parse_outputs(task, result_buffers);
if (!optional_outputs.has_value()) {
if (!output_result.has_value()) {
metadata_store->task_fail(
*conn,
instance,
Expand All @@ -317,10 +336,10 @@ auto handle_executor_result(
task.get_function_name()
)
);
return false;
return output_result.error();
}

std::vector<spider::core::TaskOutput> const& outputs = optional_outputs.value();
std::vector<spider::core::TaskOutput> const& outputs = output_result.value();
// Submit result
spdlog::debug("Submitting result for task {}", boost::uuids::to_string(task.get_id()));
spider::core::StorageErr err;
Expand All @@ -336,9 +355,9 @@ auto handle_executor_result(
}
if (!err.success()) {
spdlog::error("Submit task {} fails: {}", task.get_function_name(), err.description);
return false;
return spider::worker::WorkerErrorCodeEnum::StorageError;
}
return true;
return ystdlib::error_handling::success();
}

// NOLINTBEGIN(clang-analyzer-unix.BlockInCriticalSection)
Expand All @@ -365,15 +384,16 @@ auto task_loop(
spdlog::debug("Fetched task {}", boost::uuids::to_string(task_id));
// Fetch task detail from metadata storage
spider::core::Task task{""};

std::optional<std::vector<msgpack::sbuffer>> optional_arg_buffers
ystdlib::error_handling::
Result<std::vector<msgpack::sbuffer>, spider::worker::WorkerErrorCode>
arg_buffers_result
= setup_task(storage_factory, metadata_store, instance, task);
if (!optional_arg_buffers.has_value()) {
if (!arg_buffers_result.has_value()) {
spdlog::error("Failed to setup task {}", task.get_function_name());
fail_task_id = task.get_id();
continue;
}
std::vector<msgpack::sbuffer> const& arg_buffers = optional_arg_buffers.value();
std::vector<msgpack::sbuffer> const& arg_buffers = arg_buffers_result.value();

// Execute task
spider::worker::TaskExecutor executor{
Expand All @@ -397,9 +417,9 @@ auto task_loop(
context.run();
executor.wait();

spider::core::ChildPid::set_pid(0);

if (handle_executor_result(storage_factory, metadata_store, instance, task, executor)) {
if (handle_executor_result(storage_factory, metadata_store, instance, task, executor)
.has_value())
{
fail_task_id = std::nullopt;
} else {
fail_task_id = task.get_id();
Expand Down