# Build script of the ClickHouse output plugin for IPFIXcol2.
#
# The helper modules clickhouse-cpp.cmake and fmt.cmake call
# FetchContent_MakeAvailable(), which was introduced in CMake 3.14.
cmake_minimum_required(VERSION 3.14)
project(ipfixcol2-clickhouse-output)

# Description of the project
set(CH_DESCRIPTION
    "Output plugin for IPFIXcol2 that store flow records to ClickHouse database."
)

set(CH_VERSION_MAJOR 1)
set(CH_VERSION_MINOR 0)
set(CH_VERSION_PATCH 0)
set(CH_VERSION
    ${CH_VERSION_MAJOR}.${CH_VERSION_MINOR}.${CH_VERSION_PATCH})

# Helper modules are addressed relative to this file so that the plugin also
# configures correctly when it is not the top-level project of the build.
include("${CMAKE_CURRENT_LIST_DIR}/CMakeModules/install_dirs.cmake")
include("${CMAKE_CURRENT_LIST_DIR}/CMakeModules/clickhouse-cpp.cmake")
include("${CMAKE_CURRENT_LIST_DIR}/CMakeModules/fmt.cmake")
include(CheckCCompilerFlag)
include(CheckCXXCompilerFlag)
# Include custom FindXXX modules
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/CMakeModules")

# Find IPFIXcol
find_package(IPFIXcol2 2.3.0 REQUIRED)

# Set default build type if not specified by user.
# FORCE is required here because CMAKE_BUILD_TYPE already exists in the cache
# (as an empty string); the guard makes sure a user-provided value is kept.
if (NOT CMAKE_BUILD_TYPE)
    set (CMAKE_BUILD_TYPE Debug
        CACHE STRING "Choose type of build (Release/Debug)." FORCE)
endif()

option(ENABLE_DOC_MANPAGE "Enable manual page building" ON)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS ON)

# Hard coded definitions.
# They are set after the FetchContent based dependencies were added, so they
# affect only the plugin itself and not clickhouse-cpp or fmt.
set(CMAKE_C_FLAGS            "${CMAKE_C_FLAGS} -fvisibility=hidden")
set(CMAKE_C_FLAGS_RELEASE    "-O2 -DNDEBUG")
set(CMAKE_C_FLAGS_DEBUG      "-g -O0 -Wall -Wextra -pedantic")
set(CMAKE_CXX_FLAGS          "${CMAKE_CXX_FLAGS} -fvisibility=hidden")
set(CMAKE_CXX_FLAGS_RELEASE  "-O2 -DNDEBUG")
set(CMAKE_CXX_FLAGS_DEBUG    "-g -O0 -Wall -Wextra -pedantic")

# Create a linkable module
add_library(clickhouse-output MODULE
    src/common.cpp
    src/config.cpp
    src/datatype.cpp
    src/main.cpp
    src/plugin.cpp
)

# Header files for source code building (IPFIXcol2 header files).
# Scoped to the plugin target instead of the whole directory.
target_include_directories(clickhouse-output PRIVATE
    "${IPFIXCOL2_INCLUDE_DIRS}"
)

target_link_libraries(clickhouse-output PRIVATE clickhouse::client fmt::fmt)

install(
    TARGETS clickhouse-output
    LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/"
)

if (ENABLE_DOC_MANPAGE)
    find_package(Rst2Man)
    if (NOT RST2MAN_FOUND)
        message(FATAL_ERROR "rst2man is not available. Install python-docutils or disable manual page generation (-DENABLE_DOC_MANPAGE=False)")
    endif()

    # Build a manual page
    set(SRC_FILE "${CMAKE_CURRENT_SOURCE_DIR}/doc/ipfixcol2-clickhouse-output.7.rst")
    set(DST_FILE "${CMAKE_CURRENT_BINARY_DIR}/ipfixcol2-clickhouse-output.7")

    # The TARGET signature of add_custom_command() does not support DEPENDS,
    # so the page is produced by an OUTPUT rule. It is rebuilt whenever the
    # reStructuredText source changes and is always up to date for install().
    add_custom_command(
        OUTPUT "${DST_FILE}"
        COMMAND "${RST2MAN_EXECUTABLE}" --syntax-highlight=none "${SRC_FILE}" "${DST_FILE}"
        DEPENDS "${SRC_FILE}"
        COMMENT "Building the manual page of the ClickHouse output plugin"
        VERBATIM
    )
    add_custom_target(clickhouse-output-manpage ALL DEPENDS "${DST_FILE}")

    install(
        FILES "${DST_FILE}"
        DESTINATION "${INSTALL_DIR_MAN}/man7"
    )
endif()
# Find module for the rst2man generator from Python Docutils.
#
# Result variables:
#  RST2MAN_FOUND       - true if the program was found
#  RST2MAN_VERSION     - version of Docutils reported by rst2man (unset if it
#                        cannot be determined)
#  RST2MAN_EXECUTABLE  - path to the rst2man program

find_program(RST2MAN_EXECUTABLE
    NAMES rst2man rst2man.py rst2man-3 rst2man-3.py
    DOC "The Python Docutils generator of Unix Manpages from reStructuredText"
)

if (RST2MAN_EXECUTABLE)
    # Get the version string
    execute_process(
        COMMAND "${RST2MAN_EXECUTABLE}" --version
        OUTPUT_VARIABLE rst2man_version_str
        ERROR_QUIET
        OUTPUT_STRIP_TRAILING_WHITESPACE
    )
    # Expected format: rst2man (Docutils 0.13.1 [release], Python 2.7.15, on linux2)
    # The program name at the beginning differs between the searched names
    # (e.g. "rst2man.py") and the version can be followed directly by a comma,
    # therefore only the "(Docutils <version>" part is matched and only the
    # dotted number is captured. Without a match the version stays unset
    # instead of being set to the whole program output.
    if (rst2man_version_str MATCHES "\\(Docutils[\t ]+([0-9]+(\\.[0-9]+)*)")
        set(RST2MAN_VERSION "${CMAKE_MATCH_1}")
    else()
        unset(RST2MAN_VERSION)
    endif()
    unset(rst2man_version_str)
endif()

# handle the QUIETLY and REQUIRED arguments and set RST2MAN_FOUND to TRUE
# if all listed variables are set
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Rst2Man
    REQUIRED_VARS RST2MAN_EXECUTABLE
    VERSION_VAR RST2MAN_VERSION
)

# Only the executable path is a cache entry; RST2MAN_VERSION is a normal variable.
mark_as_advanced(RST2MAN_EXECUTABLE)
# {fmt} formatting library, downloaded and built together with the plugin.
#
# Provides the following target that can be linked as a dependency:
#  - fmt::fmt

# The library ends up inside a loadable module, so it has to be built as
# position independent code.
set(CMAKE_POSITION_INDEPENDENT_CODE ON)

# Show the progress of the download step
set(FETCHCONTENT_QUIET OFF)
include(FetchContent)

FetchContent_Declare(fmt
    GIT_SHALLOW    ON
    GIT_TAG        "11.0.2"
    GIT_REPOSITORY "https://github.com/fmtlib/fmt"
)
FetchContent_MakeAvailable(fmt)
# Determine the installation directories used by the install() rules.
#
# Every directory starts with the GNU default provided by GNUInstallDirs and
# is replaced when the corresponding override variable (typically passed by
# an RPM build as a -D definition) is defined.

include(GNUInstallDirs)

# Binary directories (no override available)
set(INSTALL_DIR_BIN ${CMAKE_INSTALL_FULL_BINDIR})

# Library directories
set(INSTALL_DIR_LIB ${CMAKE_INSTALL_FULL_LIBDIR})
if (DEFINED LIB_INSTALL_DIR)
    set(INSTALL_DIR_LIB ${LIB_INSTALL_DIR})
endif()

# Include directories
set(INSTALL_DIR_INCLUDE ${CMAKE_INSTALL_FULL_INCLUDEDIR})
if (DEFINED INCLUDE_INSTALL_DIR)
    set(INSTALL_DIR_INCLUDE ${INCLUDE_INSTALL_DIR})
endif()

# System configuration
set(INSTALL_DIR_SYSCONF ${CMAKE_INSTALL_FULL_SYSCONFDIR})
if (DEFINED SYSCONF_INSTALL_DIR)
    set(INSTALL_DIR_SYSCONF ${SYSCONF_INSTALL_DIR})
endif()

# Shared files (architecture independent data)
set(INSTALL_DIR_SHARE ${CMAKE_INSTALL_FULL_DATAROOTDIR})
if (DEFINED SHARE_INSTALL_PREFIX)
    set(INSTALL_DIR_SHARE ${SHARE_INSTALL_PREFIX})
endif()

# Directories derived from the shared data directory
set(INSTALL_DIR_INFO "${INSTALL_DIR_SHARE}/info/")
set(INSTALL_DIR_MAN  "${INSTALL_DIR_SHARE}/man/")
set(INSTALL_DIR_DOC  "${INSTALL_DIR_SHARE}/doc/${CMAKE_PROJECT_NAME}/")
:start-after: make install diff --git a/extra_plugins/output/clickhouse/src/common.cpp b/extra_plugins/output/clickhouse/src/common.cpp new file mode 100644 index 00000000..7f5c2349 --- /dev/null +++ b/extra_plugins/output/clickhouse/src/common.cpp @@ -0,0 +1,63 @@ +/** + * @file + * @author Michal Sedlak + * @brief + * @date 2024 + * + * Copyright(c) 2024 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "common.h" + +#include + +void print_block(const clickhouse::Block& block) { + std::cout << "================================================================================\n"; + for (size_t i = 0; i < block.GetColumnCount(); i++) { + std::cout << block.GetColumnName(i) << ":" << block[i]->GetType().GetName() << (i < block.GetColumnCount() - 1 ? '\t' : '\n'); + } + std::cout << "--------------------------------------------------------------------------------\n"; + + for (size_t i = 0; i < block.GetRowCount(); i++) { + for (size_t j = 0; j < block.GetColumnCount(); j++) { + switch (block[j]->GetType().GetCode()) { + case clickhouse::Type::Int8: + std::cout << +block[j]->As()->At(i); + break; + case clickhouse::Type::Int16: + std::cout << block[j]->As()->At(i); + break; + case clickhouse::Type::Int32: + std::cout << block[j]->As()->At(i); + break; + case clickhouse::Type::Int64: + std::cout << block[j]->As()->At(i); + break; + case clickhouse::Type::Int128: + std::cout << block[j]->As()->At(i); + break; + case clickhouse::Type::UInt8: + std::cout << +block[j]->As()->At(i); + break; + case clickhouse::Type::UInt16: + std::cout << block[j]->As()->At(i); + break; + case clickhouse::Type::UInt32: + std::cout << block[j]->As()->At(i); + break; + case clickhouse::Type::UInt64: + std::cout << block[j]->As()->At(i); + break; + case clickhouse::Type::String: + std::cout << block[j]->As()->At(i); + break; + default: + std::cout << "-"; + break; + } + std::cout << (j < block.GetColumnCount() - 1 ? 
'\t' : '\n'); + } + } + std::cout << "================================================================================\n\n"; +}; diff --git a/extra_plugins/output/clickhouse/src/common.h b/extra_plugins/output/clickhouse/src/common.h new file mode 100644 index 00000000..1dee10df --- /dev/null +++ b/extra_plugins/output/clickhouse/src/common.h @@ -0,0 +1,90 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include + +struct Nonmoveable { + Nonmoveable() = default; + Nonmoveable(Nonmoveable&&) = delete; + Nonmoveable& operator=(Nonmoveable&&) = delete; +}; + +struct Noncopyable { + Noncopyable() = default; + Noncopyable(const Noncopyable&) = delete; + Noncopyable& operator=(const Noncopyable&) = delete; +}; + +class Error : public std::runtime_error { +public: + template + Error(Args&& ...args) : std::runtime_error(fmt::format(args...)) {} +}; + +class Stats { +public: + Stats() + { + reset(); + } + + void reset() + { + m_records_processed = 0; + m_start_time = std::time(nullptr); + } + + void add_records(uint64_t num_records) + { + m_records_processed += num_records; + } + + void print() + { + uint64_t secs_diff = std::time(nullptr) - m_start_time; + double recs_per_sec = double(m_records_processed) / secs_diff; + fmt::println("Avg recs per sec: {:.2f}", recs_per_sec); + } + +private: + uint64_t m_records_processed; + std::time_t m_start_time; +}; + +template +class SyncQueue { +public: + void put(Item item) + { + std::lock_guard lock(m_mutex); + m_items.push(item); + m_avail_cv.notify_all(); + } + + Item get() + { + std::unique_lock lock(m_mutex); + while (true) { + if (!m_items.empty()) { + auto item = m_items.front(); + m_items.pop(); + return item; + } + m_avail_cv.wait(lock); + } + } + +private: + std::queue m_items; + std::mutex m_mutex; + std::condition_variable m_avail_cv; +}; + + +void print_block(const clickhouse::Block& block); diff --git a/extra_plugins/output/clickhouse/src/config.cpp 
b/extra_plugins/output/clickhouse/src/config.cpp new file mode 100644 index 00000000..f160ece1 --- /dev/null +++ b/extra_plugins/output/clickhouse/src/config.cpp @@ -0,0 +1,122 @@ +/** + * @file + * @author Michal Sedlak + * @brief + * @date 2024 + * + * Copyright(c) 2024 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "config.h" + +#include +#include +#include + +namespace args { + +enum { + HOST, + PORT, + USER, + PASSWORD, + DATABASE, + TABLE, + FIELDS, + FIELD, +}; + + +static const struct fds_xml_args fields[] = { + FDS_OPTS_ELEM(FIELD, "field", FDS_OPTS_T_STRING, FDS_OPTS_P_MULTI), + FDS_OPTS_END, +}; + +static const struct fds_xml_args root[] = { + FDS_OPTS_ROOT ("params"), + FDS_OPTS_ELEM (HOST, "host", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM (PORT, "port", FDS_OPTS_T_UINT, 0), + FDS_OPTS_ELEM (USER, "user", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM (PASSWORD, "password", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM (DATABASE, "database", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM (TABLE, "table", FDS_OPTS_T_STRING, 0), + FDS_OPTS_NESTED(FIELDS, "fields", fields, 0), + FDS_OPTS_END, +}; + +} + +static void parse_fields(fds_xml_ctx_t *fields_ctx, const fds_iemgr_t *iemgr, Config &config) +{ + const fds_xml_cont *content; + + while (fds_xml_next(fields_ctx, &content) == FDS_OK) { + const fds_iemgr_elem *elem = fds_iemgr_elem_find_name(iemgr, content->ptr_string); + const fds_iemgr_alias *alias = fds_iemgr_alias_find(iemgr, content->ptr_string); + if (!elem && !alias) { + throw std::runtime_error("IPFIX element with name \"" + std::string(content->ptr_string) + "\" not found"); + } else if (elem) { + config.fields.emplace_back(elem); + } else if (alias) { + config.fields.emplace_back(alias); + } + } +} + +static void parse_root(fds_xml_ctx_t *root_ctx, const fds_iemgr_t *iemgr, Config &config) +{ + const fds_xml_cont *content; + + while (fds_xml_next(root_ctx, &content) == FDS_OK) { + if (content->id == args::USER) { + config.user = 
content->ptr_string; + + } else if (content->id == args::PASSWORD) { + config.password = content->ptr_string; + + } else if (content->id == args::HOST) { + config.host = content->ptr_string; + + } else if (content->id == args::PORT) { + if (content->val_uint > std::numeric_limits::max()) { + throw std::runtime_error(std::to_string(content->val_uint) + " is not a valid port number"); + } + config.port = content->val_uint; + + } else if (content->id == args::DATABASE) { + config.database = content->ptr_string; + + } else if (content->id == args::TABLE) { + config.table = content->ptr_string; + + } else if (content->id == args::FIELDS) { + parse_fields(content->ptr_ctx, iemgr, config); + } + } +} + +Config parse_config(const char *xml_string, const fds_iemgr_t *iemgr) +{ + Config config{}; + + std::unique_ptr parser(fds_xml_create(), &fds_xml_destroy); + if (!parser) { + throw std::runtime_error("Failed to create an XML parser!"); + } + + if (fds_xml_set_args(parser.get(), args::root) != FDS_OK) { + std::string err = fds_xml_last_err(parser.get()); + throw std::runtime_error("Failed to parse the description of an XML document: " + err); + } + + fds_xml_ctx_t *root_ctx = fds_xml_parse_mem(parser.get(), xml_string, true); + if (!root_ctx) { + std::string err = fds_xml_last_err(parser.get()); + throw std::runtime_error("Failed to parse the configuration: " + err); + } + + parse_root(root_ctx, iemgr, config); + + return config; +} diff --git a/extra_plugins/output/clickhouse/src/config.h b/extra_plugins/output/clickhouse/src/config.h new file mode 100644 index 00000000..759d3813 --- /dev/null +++ b/extra_plugins/output/clickhouse/src/config.h @@ -0,0 +1,35 @@ +/** + * @file + * @author Michal Sedlak + * @brief + * @date 2024 + * + * Copyright(c) 2024 CESNET z.s.p.o. 
+ * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include + +#include +#include +#include +#include + +using VectorOfElemOrAlias = std::vector>; + +struct Config { + std::string host; + uint16_t port; + std::string user; + std::string password; + std::string database; + std::string table; + VectorOfElemOrAlias fields; + unsigned int num_inserter_threads = 32; + unsigned int num_blocks = 256; + unsigned int block_insert_threshold = 100000; +}; + +Config parse_config(const char *xml, const fds_iemgr_t *iemgr); diff --git a/extra_plugins/output/clickhouse/src/datatype.cpp b/extra_plugins/output/clickhouse/src/datatype.cpp new file mode 100644 index 00000000..b7318066 --- /dev/null +++ b/extra_plugins/output/clickhouse/src/datatype.cpp @@ -0,0 +1,246 @@ +/** + * @file + * @author Michal Sedlak + * @brief + * @date 2024 + * + * Copyright(c) 2024 CESNET z.s.p.o. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "datatype.h" +#include "common.h" + +DataType type_from_ipfix(fds_iemgr_element_type type) +{ + switch (type) { + case FDS_ET_STRING: return DataType::String; + case FDS_ET_SIGNED_8: return DataType::Int8; + case FDS_ET_SIGNED_16: return DataType::Int16; + case FDS_ET_SIGNED_32: return DataType::Int32; + case FDS_ET_SIGNED_64: return DataType::Int64; + case FDS_ET_UNSIGNED_8: return DataType::UInt8; + case FDS_ET_UNSIGNED_16: return DataType::UInt16; + case FDS_ET_UNSIGNED_32: return DataType::UInt32; + case FDS_ET_UNSIGNED_64: return DataType::UInt64; + case FDS_ET_IPV4_ADDRESS: return DataType::IPv4; + case FDS_ET_IPV6_ADDRESS: return DataType::IPv6; + case FDS_ET_DATE_TIME_SECONDS: return DataType::DatetimeSecs; + case FDS_ET_DATE_TIME_MILLISECONDS: return DataType::DatetimeMillisecs; + case FDS_ET_DATE_TIME_MICROSECONDS: return DataType::DatetimeMicrosecs; + case FDS_ET_DATE_TIME_NANOSECONDS: return DataType::DatetimeNanosecs; + default: throw Error("Unsupported IPFIX data type {}", type); + } +} + + +std::string 
type_to_clickhouse(DataType type) +{ + switch (type) { + case DataType::Int8: return "Int8"; + case DataType::Int16: return "Int16"; + case DataType::Int32: return "Int32"; + case DataType::Int64: return "Int64"; + case DataType::UInt8: return "UInt8"; + case DataType::UInt16: return "UInt16"; + case DataType::UInt32: return "UInt32"; + case DataType::UInt64: return "UInt64"; + case DataType::IP: return "IPv6"; + case DataType::IPv4: return "IPv4"; + case DataType::IPv6: return "IPv6"; + case DataType::String: return "String"; + case DataType::DatetimeNanosecs: return "DateTime64(9)"; + case DataType::DatetimeMicrosecs: return "DateTime64(6)"; + case DataType::DatetimeMillisecs: return "DateTime64(3)"; + case DataType::DatetimeSecs: return "Datetime"; + case DataType::Invalid: throw Error("Cannot convert {} to Clickhouse type", type); + } +} + +static DataType unify_type(DataType a, DataType b) +{ + auto is_int = [](DataType t) { return t == DataType::Int8 || t == DataType::Int16 || t == DataType::Int32 || t == DataType::Int64; }; + auto is_uint = [](DataType t) { return t == DataType::UInt8 || t == DataType::UInt16 || t == DataType::UInt32 || t == DataType::UInt64; }; + auto is_ip = [](DataType t) { return t == DataType::IPv4 || t == DataType::IPv6 || t == DataType::IP; }; + + if (a == b) { + return a; + } + if (is_int(a) && is_int(b)) { + return std::max(a, b); + } + if (is_uint(a) && is_uint(b)) { + return std::max(a, b); + } + if (is_ip(a) && is_ip(b)) { + return DataType::IP; + } + + throw Error("Cannot unify types {} and {}", a, b); +} + +DataType find_common_type(const fds_iemgr_alias &alias) +{ + if (alias.sources_cnt == 0) { + throw Error("Alias \"{}\" has no sources", alias.name); + } + + DataType common_type = type_from_ipfix(alias.sources[0]->data_type); + for (size_t i = 1; i < alias.sources_cnt; i++) { + common_type = unify_type(common_type, type_from_ipfix(alias.sources[i]->data_type)); + } + return common_type; +} + + +template +static void 
write_uint(std::optional field, clickhouse::Column &column) +{ + uint64_t value = 0; + if (field) { + int ret = fds_get_uint_be(field->data, field->size, &value); + if (ret != FDS_OK) { + throw Error("fds_get_uint_be() has failed: {}", ret); + } + } + dynamic_cast(&column)->Append(static_cast(value)); +} + +template +static void write_int(std::optional field, clickhouse::Column &column) +{ + int64_t value = 0; + if (field) { + int ret = fds_get_int_be(field->data, field->size, &value); + if (ret != FDS_OK) { + throw Error("fds_get_int_be() has failed: {}", ret); + } + } + dynamic_cast(&column)->Append(static_cast(value)); +} + +static void write_ipv4(std::optional field, clickhouse::Column &column) +{ + return write_uint(field, column); +} + +static void write_ipv6(std::optional field, clickhouse::Column &column) +{ + in6_addr value; + memset(&value, 0, sizeof(value)); + if (field) { + int ret = fds_get_ip(field->data, field->size, &value); + if (ret != FDS_OK) { + throw Error("fds_get_ip() has failed: {}", ret); + } + } + dynamic_cast(&column)->Append(value); +} + +static void write_ip(std::optional field, clickhouse::Column &column) +{ + in6_addr value; + memset(&value, 0, sizeof(value)); + if (field) { + int ret; + if (field->size == 4) { + value.__in6_u.__u6_addr32[0] = 0; + value.__in6_u.__u6_addr32[1] = 0; + value.__in6_u.__u6_addr32[2] = 0; + ret = fds_get_ip(field->data, field->size, &value.__in6_u.__u6_addr32[3]); + } else /* if (field->size == 6) */ { + ret = fds_get_ip(field->data, field->size, &value); + } + if (ret != FDS_OK) { + throw Error("fds_get_ip() has failed: {}", ret); + } + } + dynamic_cast(&column)->Append(value); +} + +static void write_string(std::optional field, clickhouse::Column &column) +{ + std::string value; + if (field) { + value.resize(field->size); + int ret = fds_get_string(field->data, field->size, &value[0]); + if (ret != FDS_OK) { + throw Error("fds_get_string() has failed: {}", ret); + } + } + 
dynamic_cast(&column)->Append(value); +} + +static void write_datetime(std::optional field, clickhouse::Column &column) +{ + uint64_t value = 0; + if (field) { + int ret = fds_get_datetime_lp_be(field->data, field->size, field->info->def->data_type, &value); + if (ret != FDS_OK) { + throw Error("fds_get_datetime_lp_be() has failed: {}", ret); + } + value /= 1000; + } + dynamic_cast(&column)->Append(value); +} + +template +static void write_datetime64(std::optional field, clickhouse::Column &column) +{ + int64_t value = 0; + timespec ts; + if (field) { + int ret = fds_get_datetime_hp_be(field->data, field->size, field->info->def->data_type, &ts); + if (ret != FDS_OK) { + throw Error("fds_get_datetime_hp_be() has failed: {}", ret); + } + value = (static_cast(ts.tv_sec) * 1'000'000'000 + static_cast(ts.tv_nsec)) / Divisor; + } + dynamic_cast(&column)->Append(value); +} + +WriterFn make_writer(DataType type) +{ + switch (type) { + case DataType::Invalid: throw Error("Invalid data type"); + case DataType::UInt8: return &write_uint; + case DataType::UInt16: return &write_uint; + case DataType::UInt32: return &write_uint; + case DataType::UInt64: return &write_uint; + case DataType::Int8: return &write_int; + case DataType::Int16: return &write_int; + case DataType::Int32: return &write_int; + case DataType::Int64: return &write_int; + case DataType::String: return &write_string; + case DataType::DatetimeMillisecs: return &write_datetime64<1'000'000>; + case DataType::DatetimeMicrosecs: return &write_datetime64<1'000>; + case DataType::DatetimeNanosecs: return &write_datetime64<1>; + case DataType::DatetimeSecs: return &write_datetime; + case DataType::IPv4: return &write_ipv4; + case DataType::IPv6: return &write_ipv6; + case DataType::IP: return &write_ip; + } +} + +std::shared_ptr make_column(DataType type) +{ + switch (type) { + case DataType::Invalid: throw Error("Invalid data type"); + case DataType::UInt8: return std::make_shared(); + case DataType::UInt16: return 
std::make_shared(); + case DataType::UInt32: return std::make_shared(); + case DataType::UInt64: return std::make_shared(); + case DataType::Int8: return std::make_shared(); + case DataType::Int16: return std::make_shared(); + case DataType::Int32: return std::make_shared(); + case DataType::Int64: return std::make_shared(); + case DataType::String: return std::make_shared(); + case DataType::DatetimeMillisecs: return std::make_shared(3); + case DataType::DatetimeMicrosecs: return std::make_shared(6); + case DataType::DatetimeNanosecs: return std::make_shared(9); + case DataType::DatetimeSecs: return std::make_shared(); + case DataType::IPv4: return std::make_shared(); + case DataType::IPv6: return std::make_shared(); + case DataType::IP: return std::make_shared(); + } +} diff --git a/extra_plugins/output/clickhouse/src/datatype.h b/extra_plugins/output/clickhouse/src/datatype.h new file mode 100644 index 00000000..961393f3 --- /dev/null +++ b/extra_plugins/output/clickhouse/src/datatype.h @@ -0,0 +1,152 @@ +/** + * @file + * @author Michal Sedlak + * @brief + * @date 2024 + * + * Copyright(c) 2024 CESNET z.s.p.o. 
+ * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include +#include +#include +#include +#include + + +/** An intermediary data type used for conversions between IPFIX and Clickhouse types */ +enum class DataType { + Invalid, + + Int8, + Int16, + Int32, + Int64, + + UInt8, + UInt16, + UInt32, + UInt64, + + IP, // IP address regardless of version + IPv4, + IPv6, + + String, + + DatetimeNanosecs, + DatetimeMicrosecs, + DatetimeMillisecs, + DatetimeSecs, +}; + +/** Formatter for the DataType enum */ +template <> struct fmt::formatter : formatter { + auto format(DataType type, format_context& ctx) const -> format_context::iterator { + const std::string_view name = [=]() { + switch (type) { + case DataType::Invalid: return "Invalid"; + case DataType::Int8: return "Int8"; + case DataType::Int16: return "Int16"; + case DataType::Int32: return "Int32"; + case DataType::Int64: return "Int64"; + case DataType::UInt8: return "UInt8"; + case DataType::UInt16: return "UInt16"; + case DataType::UInt32: return "UInt32"; + case DataType::UInt64: return "UInt64"; + case DataType::IP: return "IP"; + case DataType::IPv4: return "IPv4"; + case DataType::IPv6: return "IPv6"; + case DataType::String: return "String"; + case DataType::DatetimeNanosecs: return "DatetimeNanosecs"; + case DataType::DatetimeMicrosecs: return "DatetimeMicrosecs"; + case DataType::DatetimeMillisecs: return "DatetimeMillisecs"; + case DataType::DatetimeSecs: return "DatetimeSecs"; + } + }(); + return formatter::format(name, ctx); + } +}; + +/** Formatter for the iemgr element type */ +template <> struct fmt::formatter : formatter { + auto format(fds_iemgr_element_type type, format_context& ctx) const -> format_context::iterator { + const std::string_view name = [=]() { + switch (type) { + case FDS_ET_OCTET_ARRAY: return "OCTET_ARRAY"; + case FDS_ET_UNSIGNED_8: return "UNSIGNED_8"; + case FDS_ET_UNSIGNED_16: return "UNSIGNED_16"; + case FDS_ET_UNSIGNED_32: return "UNSIGNED_32"; + case 
FDS_ET_UNSIGNED_64: return "UNSIGNED_64"; + case FDS_ET_SIGNED_8: return "SIGNED_8"; + case FDS_ET_SIGNED_16: return "SIGNED_16"; + case FDS_ET_SIGNED_32: return "SIGNED_32"; + case FDS_ET_SIGNED_64: return "SIGNED_64"; + case FDS_ET_FLOAT_32: return "FLOAT_32"; + case FDS_ET_FLOAT_64: return "FLOAT_64"; + case FDS_ET_BOOLEAN: return "BOOLEAN"; + case FDS_ET_MAC_ADDRESS: return "MAC_ADDRESS"; + case FDS_ET_STRING: return "STRING"; + case FDS_ET_DATE_TIME_SECONDS: return "DATE_TIME_SECONDS"; + case FDS_ET_DATE_TIME_MILLISECONDS: return "DATE_TIME_MILLISECONDS"; + case FDS_ET_DATE_TIME_MICROSECONDS: return "DATE_TIME_MICROSECONDS"; + case FDS_ET_DATE_TIME_NANOSECONDS: return "DATE_TIME_NANOSECONDS"; + case FDS_ET_IPV4_ADDRESS: return "IPV4_ADDRESS"; + case FDS_ET_IPV6_ADDRESS: return "IPV6_ADDRESS"; + case FDS_ET_BASIC_LIST: return "BASIC_LIST"; + case FDS_ET_SUB_TEMPLATE_LIST: return "SUB_TEMPLATE_LIST"; + case FDS_ET_SUB_TEMPLATE_MULTILIST: return "SUB_TEMPLATE_MULTILIST"; + case FDS_ET_UNASSIGNED: return "UNASSIGNED"; + } + }(); + return formatter::format(name, ctx); + } +}; + +/** + * @brief Get intermediary data type for a corresponding IPFIX element type + * + * @param type The IPFIX type + * @return The intermediary data type + */ +DataType type_from_ipfix(fds_iemgr_element_type type); + +/** + * @brief Get Clickhouse data type for the intermediary data type + * + * @param type The intermediary data type + * @return The Clickhouse data type + */ +std::string type_to_clickhouse(DataType type); + +/** + * @brief Find an intermediary data type that can be used to store all the possible data types of the alias + * + * @param alias The iemgr alias + * @return The common data type + */ +DataType find_common_type(const fds_iemgr_alias &alias); + + +using WriterFn = std::function field, clickhouse::Column &column)>; + +/** + * @brief Make a writer function that is able to write an IPFIX field of the + * provided data type into a Clickhouse column of a corresponding 
data type
+ *
+ * @param type The data type
+ * @return The writer function
+ */
+WriterFn make_writer(DataType type);
+
+/**
+ * @brief Make a Clickhouse column that is able to store values of the
+ *        supplied data type
+ *
+ * @param type The data type
+ * @return The Clickhouse column object
+ */
+std::shared_ptr<clickhouse::Column> make_column(DataType type);
diff --git a/extra_plugins/output/clickhouse/src/main.cpp b/extra_plugins/output/clickhouse/src/main.cpp
new file mode 100644
index 00000000..02fe9f1c
--- /dev/null
+++ b/extra_plugins/output/clickhouse/src/main.cpp
@@ -0,0 +1,104 @@
+/**
+ * @file
+ * @author Michal Sedlak
+ * @brief Entry points of the ClickHouse output plugin (init, destroy, process)
+ * @date 2024
+ *
+ * Copyright(c) 2024 CESNET z.s.p.o.
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include "plugin.h"
+
+#include <exception>
+#include <ipfixcol2.h>
+#include <memory>
+
+/** Plugin description */
+IPX_API struct ipx_plugin_info ipx_plugin_info = {
+    // Plugin identification name
+    "clickhouse",
+    // Brief description of plugin
+    "Output plugin that store flow records to ClickHouse database.",
+    // Plugin type
+    IPX_PT_OUTPUT,
+    // Configuration flags (reserved for future use)
+    0,
+    // Plugin version string (like "1.2.3")
+    "1.0.0",
+    // Minimal IPFIXcol version string (like "1.2.3")
+    "2.3.0"
+};
+
+/**
+ * @brief Create the plugin instance and attach it to the plugin context
+ *
+ * No exception is allowed to escape; any failure is logged and reported
+ * through the return code.
+ *
+ * @param ctx         Plugin context
+ * @param xml_config  XML configuration of the plugin instance
+ * @return #IPX_OK on success, #IPX_ERR_DENIED if the instance could not be created
+ */
+int
+ipx_plugin_init(ipx_ctx_t *ctx, const char *xml_config)
+{
+    std::unique_ptr<Plugin> plugin;
+    try {
+        plugin = std::make_unique<Plugin>(ctx, xml_config);
+    } catch (const std::exception &ex) {
+        IPX_CTX_ERROR(ctx, "An unexpected exception has occured: %s", ex.what());
+        return IPX_ERR_DENIED;
+    } catch (...) {
+        IPX_CTX_ERROR(ctx, "An unexpected exception has occured.");
+        return IPX_ERR_DENIED;
+    }
+    // Ownership passes to the context; the instance is deleted in ipx_plugin_destroy()
+    ipx_ctx_private_set(ctx, plugin.release());
+    return IPX_OK;
+}
+
+/**
+ * @brief Stop the inserter threads and destroy the plugin instance
+ *
+ * @param ctx   Plugin context (used for logging)
+ * @param priv  Plugin instance created by ipx_plugin_init()
+ */
+void
+ipx_plugin_destroy(ipx_ctx_t *ctx, void *priv)
+{
+    Plugin *plugin = static_cast<Plugin *>(priv);
+    try {
+        plugin->stop();
+    } catch (const std::exception &ex) {
+        IPX_CTX_ERROR(ctx, "An unexpected exception has occured: %s", ex.what());
+    } catch (...) {
+        IPX_CTX_ERROR(ctx, "An unexpected exception has occured.");
+    }
+    delete plugin;
+}
+
+/**
+ * @brief Pass a message from the collector to the plugin instance
+ *
+ * @param ctx   Plugin context (used for logging)
+ * @param priv  Plugin instance created by ipx_plugin_init()
+ * @param msg   Message to process
+ * @return #IPX_OK on success, #IPX_ERR_DENIED if processing threw an exception
+ */
+int
+ipx_plugin_process(ipx_ctx_t *ctx, void *priv, ipx_msg_t *msg)
+{
+    Plugin *plugin = static_cast<Plugin *>(priv);
+    try {
+        plugin->process(msg);
+    } catch (const std::exception &ex) {
+        IPX_CTX_ERROR(ctx, "An unexpected exception has occured: %s", ex.what());
+        return IPX_ERR_DENIED;
+    } catch (...) {
+        IPX_CTX_ERROR(ctx, "An unexpected exception has occured.");
+        return IPX_ERR_DENIED;
+    }
+
+    return IPX_OK;
+}
diff --git a/extra_plugins/output/clickhouse/src/plugin.cpp b/extra_plugins/output/clickhouse/src/plugin.cpp
new file mode 100644
index 00000000..bb4491dd
--- /dev/null
+++ b/extra_plugins/output/clickhouse/src/plugin.cpp
@@ -0,0 +1,352 @@
+/**
+ * @file
+ * @author Michal Sedlak
+ * @brief Field extraction, block filling and inserter threads of the ClickHouse output plugin
+ * @date 2024
+ *
+ * Copyright(c) 2024 CESNET z.s.p.o.
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include "plugin.h"
+#include "datatype.h"
+
+#include <functional>
+#include <optional>
+#include <variant>
+
+/** Server error code for which the "table does not exist" error is reported */
+static constexpr int ERR_TABLE_NOT_FOUND = 60;
+
+/** Looks up a field in a data record; the result is empty if the field is not present */
+using ExtractorFn = std::function<std::optional<fds_drec_field> (fds_drec &drec)>;
+
+/**
+ * @brief Make an extractor that looks up the field of the given IPFIX element
+ *
+ * @param elem The iemgr element
+ * @return The extractor function
+ */
+static ExtractorFn make_extractor(const fds_iemgr_elem &elem)
+{
+    // Only the PEN and ID are needed for the lookup, so capture just those
+    const uint32_t pen = elem.scope->pen;
+    const uint16_t id = elem.id;
+    return [pen, id](fds_drec &drec) -> std::optional<fds_drec_field> {
+        fds_drec_iter iter;
+        fds_drec_iter_init(&iter, &drec, 0);
+        if (fds_drec_iter_find(&iter, pen, id) == FDS_EOC) {
+            return {};
+        }
+        return iter.field;
+    };
+}
+
+/**
+ * @brief Make an extractor that returns the first source element of the alias
+ *        that is present in the record
+ *
+ * @param alias The iemgr alias
+ * @return The extractor function
+ */
+static ExtractorFn make_extractor(const fds_iemgr_alias &alias)
+{
+    return [=](fds_drec &drec) -> std::optional<fds_drec_field> {
+        fds_drec_iter iter;
+        for (size_t i = 0; i < alias.sources_cnt; i++) {
+            uint32_t pen = alias.sources[i]->scope->pen;
+            uint16_t id = alias.sources[i]->id;
+            // Restart the search from the beginning of the record for every source
+            fds_drec_iter_init(&iter, &drec, 0);
+            if (fds_drec_iter_find(&iter, pen, id) != FDS_EOC) {
+                return iter.field;
+            }
+        }
+        return {};
+    };
+}
+
+/**
+ * @brief Build the field contexts (name, type, handler and column factory)
+ *        for the configured fields
+ *
+ * @param fields_cfg The configured elements and aliases
+ * @return One field context per configured field, in configuration order
+ */
+static std::vector<FieldCtx> prepare_fields(VectorOfElemOrAlias &fields_cfg)
+{
+    std::vector<FieldCtx> fields;
+
+    for (const auto &field_cfg : fields_cfg) {
+        DataType type;
+        ExtractorFn extractor;
+        std::string name;
+
+        if (std::holds_alternative<const fds_iemgr_elem *>(field_cfg)) {
+            const fds_iemgr_elem *elem = std::get<const fds_iemgr_elem *>(field_cfg);
+            name = elem->name;
+            type = type_from_ipfix(elem->data_type);
+            extractor = make_extractor(*elem);
+
+        } else /* if (std::holds_alternative<const fds_iemgr_alias *>(field_cfg)) */ {
+            const fds_iemgr_alias *alias = std::get<const fds_iemgr_alias *>(field_cfg);
+            name = alias->name;
+            type = find_common_type(*alias);
+            extractor = make_extractor(*alias);
+        }
+
+        WriterFn writer = make_writer(type);
+
+        FieldCtx field;
+        field.name = name;
+        field.type = type_to_clickhouse(type);
+        field.handler = [=](fds_drec &drec, clickhouse::Column &column) {
+            std::optional<fds_drec_field> value = extractor(drec);
+            writer(value, column);
+        };
+        field.column_factory = [=]() {
+            return make_column(type);
+        };
+        fields.emplace_back(std::move(field));
+    }
+
+    return fields;
+}
+
+/**
+ * @brief Check that the table exists and that its columns match the expected fields
+ *
+ * The number of columns, their names and their types must match, in order.
+ *
+ * @param client                 The Clickhouse client used to describe the table
+ * @param table                  The table name
+ * @param expected_table_fields  The fields the table is expected to have
+ * @throw Error if the table does not exist or does not match the expected fields
+ * @throw clickhouse::ServerException for any other server error
+ */
+void check_table_exists_and_matches_fields(clickhouse::Client &client, const std::string &table, const std::vector<FieldCtx> &expected_table_fields)
+{
+    std::vector<std::pair<std::string, std::string>> field_name_and_type;
+    try {
+        client.Select("DESCRIBE TABLE " + table, [&](const clickhouse::Block &block) {
+            // Both the name (#0) and the type (#1) column are read below
+            if (block.GetColumnCount() > 1 && block.GetRowCount() > 0) {
+                print_block(block);
+                const auto &name = block[0]->As<clickhouse::ColumnString>();
+                const auto &type = block[1]->As<clickhouse::ColumnString>();
+                for (size_t i = 0; i < block.GetRowCount(); i++) {
+                    field_name_and_type.emplace_back(name->At(i), type->At(i));
+                }
+            }
+        });
+    } catch (const clickhouse::ServerException &exc) {
+        if (exc.GetCode() == ERR_TABLE_NOT_FOUND) {
+            throw Error("Table \"{}\" does not exist", table);
+        } else {
+            throw;
+        }
+    }
+
+    if (field_name_and_type.size() != expected_table_fields.size()) {
+        throw Error("Config has {} fields but table \"{}\" has {}", expected_table_fields.size(), table, field_name_and_type.size());
+    }
+
+    for (size_t i = 0; i < expected_table_fields.size(); i++) {
+        const auto &expected_name = expected_table_fields[i].name;
+        const auto &expected_type = expected_table_fields[i].type;
+        const auto &[actual_name, actual_type] = field_name_and_type[i];
+
+        if (expected_name != actual_name) {
+            throw Error("Expected field #{} in table \"{}\" to be named \"{}\" but it is \"{}\"", i, table, expected_name, actual_name);
+        }
+        if (expected_type != actual_type) {
+            throw Error("Expected field #{} in table \"{}\" to be of type \"{}\" but it is \"{}\"", i, table, expected_type, actual_type);
+        }
+    }
+}
+
+/**
+ * @brief Create a Clickhouse client from the connection settings in the configuration
+ *
+ * @param config The plugin configuration
+ * @return The client
+ */
+static std::unique_ptr<clickhouse::Client> make_client(const Config &config)
+{
+    return std::make_unique<clickhouse::Client>(
+        clickhouse::ClientOptions()
+            .SetHost(config.host)
+            .SetPort(config.port)
+            .SetUser(config.user)
+            .SetPassword(config.password)
+            .SetDefaultDatabase(config.database)
+    );
+}
+
+Inserter::Inserter(std::unique_ptr<clickhouse::Client> client, const std::string &table, SyncQueue<BlockCtx *> &filled_blocks, SyncQueue<BlockCtx *> &empty_blocks)
+    : m_client(std::move(client))
+    , m_table(table)
+    , m_filled_blocks(filled_blocks)
+    , m_empty_blocks(empty_blocks)
+{
+}
+
+void Inserter::start()
+{
+    m_thread = std::thread([this]() {
+        try {
+            run();
+        } catch (...) {
+            // Store the exception before raising the flag that check_error() tests
+            m_exception = std::current_exception();
+            m_errored = true;
+        }
+    });
+}
+
+void Inserter::run() {
+    while (!m_stop_signal) {
+        BlockCtx *block = m_filled_blocks.get();
+        if (!block) {
+            // Plugin::stop() puts null entries to wake the thread up; re-check the stop signal
+            continue;
+        }
+
+        block->block.RefreshRowCount();
+        m_client->Insert(m_table, block->block);
+
+        // Reset the block and hand it back so that it can be filled again
+        for (auto &column : block->columns) {
+            column->Clear();
+        }
+        block->rows = 0;
+        m_empty_blocks.put(block);
+    }
+}
+
+void Inserter::stop()
+{
+    m_stop_signal = true;
+}
+
+void Inserter::join()
+{
+    // std::thread::join() throws if the thread was never started or was already joined
+    if (m_thread.joinable()) {
+        m_thread.join();
+    }
+}
+
+void Inserter::check_error()
+{
+    if (m_errored) {
+        std::rethrow_exception(m_exception);
+    }
+}
+
+Plugin::Plugin(ipx_ctx_t *ctx, const char *xml_config)
+    : m_ctx(ctx)
+{
+    m_config = parse_config(xml_config, ipx_ctx_iemgr_get(ctx));
+
+    m_client = make_client(m_config);
+
+    m_fields = prepare_fields(m_config.fields);
+    check_table_exists_and_matches_fields(*m_client.get(), m_config.table, m_fields);
+
+    // Prepare blocks
+    IPX_CTX_INFO(m_ctx, "Preparing %d blocks", m_config.num_blocks);
+    for (unsigned int i = 0; i < m_config.num_blocks; i++) {
+        m_blocks.emplace_back(std::make_unique<BlockCtx>());
+        BlockCtx &block = *m_blocks.back().get();
+        for (const auto &field : m_fields) {
+            block.columns.emplace_back(field.column_factory());
+            auto &column = block.columns.back();
+            block.block.AppendColumn(field.name, column);
+        }
+        m_empty_blocks.put(&block);
+    }
+
+    // Prepare inserters (each one gets a client of its own)
+    IPX_CTX_INFO(m_ctx, "Preparing %d inserter threads", m_config.num_inserter_threads);
+    for (unsigned int i = 0; i < m_config.num_inserter_threads; i++) {
+        auto inserter = std::make_unique<Inserter>(make_client(m_config), m_config.table, m_filled_blocks, m_empty_blocks);
+        m_inserters.emplace_back(std::move(inserter));
+    }
+
+    // Start inserter threads
+    IPX_CTX_INFO(m_ctx, "Starting inserter threads", 0);
+    try {
+        for (auto &inserter : m_inserters) {
+            inserter->start();
+        }
+    } catch (...) {
+        // Destroying a still-joinable std::thread terminates the process, so stop
+        // and join the threads that were already started before propagating
+        stop();
+        throw;
+    }
+
+    IPX_CTX_INFO(m_ctx, "Clickhouse plugin is ready", 0);
+}
+
+void
+Plugin::process(ipx_msg_t *msg)
+{
+    if (ipx_msg_get_type(msg) != IPX_MSG_IPFIX) {
+        return;
+    }
+    ipx_msg_ipfix_t *ipfix_msg = ipx_msg_base2ipfix(msg);
+
+    // Get new empty block if there is no current block
+    if (!m_current_block) {
+        m_current_block = m_empty_blocks.get();
+    }
+
+    // Go through all the records and store them
+    uint32_t drec_cnt = ipx_msg_ipfix_get_drec_cnt(ipfix_msg);
+    for (uint32_t idx = 0; idx < drec_cnt; idx++) {
+        ipx_ipfix_record *rec = ipx_msg_ipfix_get_drec(ipfix_msg, idx);
+        for (size_t i = 0; i < m_fields.size(); i++) {
+            m_fields[i].handler(rec->rec, *m_current_block->columns[i]);
+        }
+    }
+    m_current_block->rows += drec_cnt;
+
+    // Send the block for insertion if it is sufficiently full
+    if (m_current_block->rows >= m_config.block_insert_threshold) {
+        m_filled_blocks.put(m_current_block);
+        m_current_block = nullptr;
+    }
+
+    // Update stats
+    m_stats.add_records(drec_cnt);
+    m_stats.print();
+
+    // Check whether any exception was thrown by the inserter threads
+    for (auto &inserter : m_inserters) {
+        inserter->check_error();
+    }
+}
+
+void Plugin::stop()
+{
+    // NOTE(review): the partially filled current block is never queued, and filled
+    // blocks still waiting in the queue may not be inserted once the stop signal
+    // is set - confirm that losing these records on shutdown is acceptable
+    IPX_CTX_INFO(m_ctx, "Sending stop signal to inserter threads...");
+    for (auto &inserter : m_inserters) {
+        inserter->stop();
+    }
+    for (size_t i = 0; i < m_inserters.size(); i++) {
+        // Wake up the inserter threads in case they are waiting on a .get()
+        m_filled_blocks.put(nullptr);
+    }
+
+    IPX_CTX_INFO(m_ctx, "Waiting for inserter threads to finish...");
+    for (auto &inserter : m_inserters) {
+        inserter->join();
+    }
+
+    IPX_CTX_INFO(m_ctx, "All inserter threads stopped");
+}
diff --git a/extra_plugins/output/clickhouse/src/plugin.h
b/extra_plugins/output/clickhouse/src/plugin.h
new file mode 100644
index 00000000..ca452bb6
--- /dev/null
+++ b/extra_plugins/output/clickhouse/src/plugin.h
@@ -0,0 +1,127 @@
+/**
+ * @file
+ * @author Michal Sedlak
+ * @brief Declarations of the plugin, the inserter and the field and block contexts
+ * @date 2024
+ *
+ * Copyright(c) 2024 CESNET z.s.p.o.
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#pragma once
+
+#include "common.h"
+#include "config.h"
+
+#include <atomic>
+#include <cstdint>
+#include <exception>
+#include <functional>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <clickhouse/client.h>
+#include <ipfixcol2.h>
+#include <libfds.h>
+
+/** Creates a new Clickhouse column for a field */
+using ColumnFactoryFn = std::function<std::shared_ptr<clickhouse::Column>()>;
+/** Takes one data record and writes the value of a field into the column */
+using HandlerFn = std::function<void(fds_drec &drec, clickhouse::Column &column)>;
+
+/** Per-field context: how a configured field maps to a table column */
+struct FieldCtx {
+    std::string name;               ///< Name of the element or alias; compared with the column name
+    std::string type;               ///< Clickhouse type; compared with the column type
+    ColumnFactoryFn column_factory; ///< Creates a column for this field
+    HandlerFn handler;              ///< Writes this field of a record into a column
+};
+
+/** A set of columns that is filled with records and inserted as one block */
+struct BlockCtx {
+    std::vector<std::shared_ptr<clickhouse::Column>> columns; ///< One column per field, in field order
+    clickhouse::Block block;                                  ///< Clickhouse block the columns are appended to
+    unsigned int rows = 0;                                    ///< Number of records stored in the columns
+};
+
+/**
+ * @brief Worker that takes filled blocks from a queue and inserts them into
+ *        the table from its own thread
+ */
+class Inserter : Nonmoveable, Noncopyable {
+public:
+    /**
+     * @brief Create the inserter; the thread is not started until start() is called
+     *
+     * @param client         The Clickhouse client used for the inserts
+     * @param table          The name of the table to insert into
+     * @param filled_blocks  The queue the blocks to insert are taken from
+     * @param empty_blocks   The queue the cleared blocks are returned to
+     */
+    Inserter(std::unique_ptr<clickhouse::Client> client, const std::string &table, SyncQueue<BlockCtx *> &filled_blocks, SyncQueue<BlockCtx *> &empty_blocks);
+
+    /** Start the inserter thread */
+    void start();
+
+    /** Signal the inserter thread to stop; does not wait for it */
+    void stop();
+
+    /** Wait for the inserter thread to finish */
+    void join();
+
+    /** Rethrow the exception that ended the inserter thread, if there is one */
+    void check_error();
+
+private:
+    std::thread m_thread;
+    std::atomic_bool m_stop_signal = false;
+    std::atomic_bool m_errored = false;
+    std::exception_ptr m_exception = nullptr;
+
+    std::unique_ptr<clickhouse::Client> m_client;
+    // Kept by value so that the inserter does not depend on the lifetime of the caller's string
+    const std::string m_table;
+    SyncQueue<BlockCtx *> &m_filled_blocks;
+    SyncQueue<BlockCtx *> &m_empty_blocks;
+
+    void run();
+};
+
+/** The plugin instance: fills blocks with records and hands them to the inserters */
+class Plugin : Nonmoveable, Noncopyable {
+public:
+    /**
+     * @brief Parse the configuration, check the table, prepare the blocks and
+     *        start the inserter threads
+     *
+     * @param ctx         The plugin context
+     * @param xml_config  The XML configuration
+     */
+    Plugin(ipx_ctx_t *ctx, const char *xml_config);
+
+    /**
+     * @brief Store the data records of an IPFIX message; other messages are ignored
+     *
+     * @param msg The message
+     */
+    void process(ipx_msg_t *msg);
+
+    /** Stop the inserter threads and wait for them to finish */
+    void stop();
+
+private:
+    ipx_ctx_t *m_ctx;
+    Config m_config;
+    std::unique_ptr<clickhouse::Client> m_client;
+    std::vector<FieldCtx> m_fields;
+    Stats m_stats;
+    uint64_t m_recs_processed = 0;
+
+    BlockCtx *m_current_block = nullptr;
+    std::vector<std::unique_ptr<Inserter>> m_inserters;
+    std::vector<std::unique_ptr<BlockCtx>> m_blocks;
+    SyncQueue<BlockCtx *> m_empty_blocks;
+    SyncQueue<BlockCtx *> m_filled_blocks;
+};