diff --git a/configure.ac b/configure.ac index c8f2ee0e..a50b5aa8 100644 --- a/configure.ac +++ b/configure.ac @@ -226,6 +226,32 @@ if [[ -z "$WITH_NDP_TRUE" ]]; then RPM_BUILDREQ+=" netcope-common-devel" fi +AC_ARG_WITH([ctt], + AC_HELP_STRING([--with-ctt],[Compile ipfixprobe with ctt plugin for using Connection Tracking Table]), + [ + if test "$withval" = "yes"; then + withctt="yes" + else + withctt="no" + fi + ], [withctt="no"] +) + +if test x${withctt} = xyes; then + AC_LANG_PUSH([C++]) + CXXFLAGS="$CXXFLAGS -std=c++17" + AC_CHECK_HEADERS([ctt.hpp], [libctt=yes], AC_MSG_ERROR([ctt.hpp not found. Try installing libctt-devel])) + AC_LANG_POP([C++]) +fi + +AM_CONDITIONAL(WITH_CTT, test x${libctt} = xyes && test x${withctt} = xyes) +if [[ -z "$WITH_CTT_TRUE" ]]; then + AC_DEFINE([WITH_CTT], [1], [Define to 1 if the ctt is available]) + LIBS="-lctt $LIBS" + RPM_REQUIRES+=" libctt" + RPM_BUILDREQ+=" libctt-devel" +fi + AC_ARG_WITH([pcap], AC_HELP_STRING([--with-pcap],[Compile ipfixprobe with pcap plugin for capturing using libpcap library]), [ diff --git a/include/ipfixprobe/flowifc.hpp b/include/ipfixprobe/flowifc.hpp index e95db465..2f3b5acb 100644 --- a/include/ipfixprobe/flowifc.hpp +++ b/include/ipfixprobe/flowifc.hpp @@ -38,6 +38,7 @@ #include #include #include +#include #ifdef WITH_NEMEA #include @@ -263,6 +264,14 @@ struct Flow : public Record { }; uint64_t flow_hash; + + #ifdef WITH_CTT + uint64_t flow_hash_ctt; /**< Flow hash for CTT. */ + bool record_in_ctt; /**< CTT - offload or not. */ + bool is_delayed; /**< Delayed export flag. */ + time_t delay_time; /**< Time until export of the flow is delayed. */ + #endif + PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check if the flow process plugins requires all available data, only metadata or nothing of this. */ @@ -290,4 +299,5 @@ struct Flow : public Record { }; } + #endif /* IPXP_FLOWIFC_HPP */ diff --git a/include/ipfixprobe/packet.hpp b/include/ipfixprobe/packet.hpp index 8ac75f3b..575c2fb9 100644 --- a/include/ipfixprobe/packet.hpp +++ b/include/ipfixprobe/packet.hpp @@ -46,7 +46,10 @@ namespace ipxp { * \brief Structure for storing parsed packet fields */ struct Packet : public Record { - Metadata_CTT cttmeta; /**< Metadata from CTT */ + #ifdef WITH_CTT + Metadata_CTT cttmeta; /**< Metadata from CTT */ + bool cttmeta_valid; /**< True if CTT metadata is valid */ + #endif /* WITH_CTT */ struct timeval ts; uint8_t dst_mac[6]; @@ -108,7 +111,7 @@ struct Packet : public Record { * \brief Constructor. */ Packet() : - ts({0}), + cttmeta_valid(false), ts({0}), dst_mac(), src_mac(), ethertype(0), ip_len(0), ip_payload_len(0), ip_version(0), ip_ttl(0), ip_proto(0), ip_tos(0), ip_flags(0), src_ip({0}), dst_ip({0}), vlan_id(0), diff --git a/include/ipfixprobe/storage.hpp b/include/ipfixprobe/storage.hpp index 8a58f57e..5296557e 100644 --- a/include/ipfixprobe/storage.hpp +++ b/include/ipfixprobe/storage.hpp @@ -189,6 +189,10 @@ class StoragePlugin : public Plugin */ int plugins_post_create(Flow& rec, const Packet& pkt) { + // if metadata are valid, add flow hash ctt to the flow record + if (pkt.cttmeta_valid) { + rec.flow_hash_ctt = pkt.cttmeta.flow_hash; + } PluginStatusConverter plugin_status_converter(m_plugins_status); int ret = 0; for (unsigned int i = 0; i < m_plugin_cnt; i++) { diff --git a/input/ndp.cpp b/input/ndp.cpp index 8c72e0c3..34529554 100644 --- a/input/ndp.cpp +++ b/input/ndp.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -170,7 +171,10 @@ InputPlugin::Result NdpPacketReader::get(PacketBlock &packets) m_stats.bad_metadata++; parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length); } else { - parse_packet_ctt_metadata(&opt, m_parser_stats, ctt, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length); + if (parse_packet_ctt_metadata(&opt, m_parser_stats, ctt, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length) == -1) { + m_stats.bad_metadata++; + parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length); + } } } else { parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length); diff --git a/input/parser.cpp b/input/parser.cpp index 6b914683..ec5a2b5b 100644 --- a/input/parser.cpp +++ b/input/parser.cpp @@ -35,6 +35,7 @@ #include "parser.hpp" #include "headers.hpp" +#include #include namespace ipxp { @@ -776,12 +777,21 @@ void parse_packet(parser_opt_t *opt, ParserStats& stats, struct timeval ts, cons opt->pblock->bytes += len; } -void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen) +int parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen) { if (opt->pblock->cnt >= opt->pblock->size) { - return; + return 0; } Packet *pkt = &opt->pblock->pkts[opt->pblock->cnt]; + + // check metadata validity + if (metadata.parser_status == PA_OK) { + pkt->cttmeta_valid = true; + } else { + pkt->cttmeta_valid = false; + return -1; + } + pkt->cttmeta = metadata; pkt->packet_len_wire = len; @@ -831,7 +841,7 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta stats.pppoe_packets++; } else { // if not previous, we try delegate to original parser parse_packet(opt, stats, metadata.ts, data, len, caplen); - return; + return 0; } // L4 @@ -843,11 +853,11 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta stats.udp_packets++; } else { // if not previous, we try delegate to original parser parse_packet(opt, stats, metadata.ts, data, len, caplen); - return; + return 0; } } catch (const char *err) { DEBUG_MSG("%s\n", err); - return; + return 0; } if (pkt->vlan_id) { @@ -880,6 +890,7 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta opt->packet_valid = true; opt->pblock->cnt++; opt->pblock->bytes += len; + return 0; } } diff --git a/input/parser.hpp b/input/parser.hpp index 580e6703..d9a3b7f4 100644 --- a/input/parser.hpp +++ b/input/parser.hpp @@ -86,7 +86,7 @@ typedef struct parser_opt_s { */ void parse_packet(parser_opt_t *opt, ParserStats& stats, struct timeval ts, const uint8_t *data, uint16_t len, uint16_t caplen); -void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen); +int parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen); } #endif /* IPXP_INPUT_PARSER_HPP */ diff --git a/storage/cache.cpp b/storage/cache.cpp index 2f15b76d..af8171c4 100644 --- a/storage/cache.cpp +++ b/storage/cache.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -138,10 +139,22 @@ void FlowRecord::create(const Packet &pkt, uint64_t hash) m_flow.src_port = pkt.src_port; m_flow.dst_port = pkt.dst_port; } + #ifdef WITH_CTT + m_flow.is_delayed = false; + m_delayed_flow_waiting = false; + #endif /* WITH_CTT */ } void FlowRecord::update(const Packet &pkt, bool src) { + if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow + auto flow_hash = m_hash; + m_delayed_flow = m_flow; + m_delayed_flow_waiting = true; + erase(); // erase the old flow, keeping the delayed flow + create(pkt, flow_hash); + return; + } m_flow.time_last = pkt.ts; if (src) { m_flow.src_packets++; @@ -192,6 +205,9 @@ void NHTFlowCache::init(const char *params) m_timeout_idx = 0; m_line_mask = (m_cache_size - 1) & ~(m_line_size - 1); m_line_new_idx = m_line_size / 2; + #ifdef WITH_CTT + m_ctt_controller.init(parser.m_dev, 0); + #endif /* WITH_CTT */ if (m_export_queue == nullptr) { throw PluginError("output queue must be set before init"); @@ -256,6 +272,17 @@ void NHTFlowCache::set_queue(ipx_ring_t *queue) void NHTFlowCache::export_flow(size_t index) { + if (m_flow_table[index]->m_flow.is_delayed) { + return; + } + if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) { + m_total_exported++; + update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason); + update_flow_record_stats( + m_flow_table[index]->m_delayed_flow.src_packets + + m_flow_table[index]->m_delayed_flow.dst_packets); + ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow); + } m_total_exported++; update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason); update_flow_record_stats( @@ -502,6 +529,16 @@ void NHTFlowCache::export_expired(time_t ts) m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow); plugins_pre_export(m_flow_table[i]->m_flow); export_flow(i); + if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) { + m_flow_table[i]->m_flow.is_delayed = false; + plugins_pre_export(m_flow_table[i]->m_flow); + export_flow(i); + } + if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) { + m_flow_table[i]->m_delayed_flow_waiting = false; + plugins_pre_export(m_flow_table[i]->m_delayed_flow); + export_flow(i); + } #ifdef FLOW_CACHE_STATS m_expired++; #endif /* FLOW_CACHE_STATS */ @@ -658,4 +695,61 @@ void NHTFlowCache::prefetch_export_expired() const __builtin_prefetch(m_flow_table[i], 0, 1); } } + +#ifdef WITH_CTT + +void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts) +{ + try { + std::vector key = assemble_key(flow_hash_ctt); + std::vector state = assemble_state( + OffloadMode::PACKET_OFFLOAD, + MetaType::FULL, + ts); + m_commander->write_record(std::move(key), std::move(state)); + } + catch (const std::exception& e) { + throw; + } +} + +void CttController::export_record(uint64_t flow_hash_ctt) +{ + try { + std::vector key = assemble_key(flow_hash_ctt); + m_commander->export_and_delete_record(std::move(key)); + } + catch (const std::exception& e) { + throw; + } +} + +std::vector CttController::assemble_key(uint64_t flow_hash_ctt) +{ + std::vector key(key_size_bytes, std::byte(0)); + for (size_t i = 0; i < sizeof(flow_hash_ctt) && i < key_size_bytes; ++i) { + key[i] = static_cast((flow_hash_ctt >> (8 * i)) & 0xFF); + } + return key; +} + +std::vector CttController::assemble_state( + OffloadMode offload_mode, MetaType meta_type, const struct timeval& ts) +{ + std::vector state(state_size_bytes, std::byte(0)); + std::vector state_mask(state_mask_size_bytes, std::byte(0)); + + state[0] = static_cast(offload_mode); + state[1] = static_cast(meta_type); + + // timestamp in sec/ns format, 32+32 bits - 64 bits in total + for (size_t i = 0; i < sizeof(ts.tv_sec) && i < 4; ++i) { + state[2 + i] = static_cast((ts.tv_sec >> (8 * i)) & 0xFF); + } + for (size_t i = 0; i < sizeof(ts.tv_usec) && i < 4; ++i) { + state[6 + i] = static_cast((ts.tv_usec >> (8 * i)) & 0xFF); + } + return state; } +#endif // WITH_CTT +} \ No newline at end of file diff --git a/storage/cache.hpp b/storage/cache.hpp index 57087f5b..ad8de024 100644 --- a/storage/cache.hpp +++ b/storage/cache.hpp @@ -32,6 +32,9 @@ #ifndef IPXP_STORAGE_CACHE_HPP #define IPXP_STORAGE_CACHE_HPP +#include +#include +#include #include #include @@ -42,8 +45,101 @@ #include "fragmentationCache/fragmentationCache.hpp" +#ifdef WITH_CTT +#include +#include +#include +#include +#include +#include +#include +#include +#endif /* WITH_CTT */ + namespace ipxp { +#ifdef WITH_CTT + +class CttController { +public: + enum class OffloadMode : uint8_t { + NO_OFFLOAD = 0x0, + PACKET_OFFLOAD = 0x1, + META_EXPORT = 0x2, + PACKET_OFFLOAD_WITH_EXPORT = 0x3 + }; + enum class MetaType : uint8_t { + FULL = 0x0, + HALF = 0x1, + TS_ONLY = 0x2, + NO_META = 0x3 + }; + /** + * @brief init the CTT. + * + * @param nfb_dev The NFB device file (e.g., "/dev/nfb0"). + * @param ctt_comp_index The index of the CTT component. + */ + void init(const std::string& nfb_dev, unsigned ctt_comp_index) { + m_commander = std::make_unique(ctt::NfbParams{nfb_dev, ctt_comp_index}); + try { + // Get UserInfo to determine key, state, and state_mask sizes + ctt::UserInfo user_info = m_commander->get_user_info(); + key_size_bytes = (user_info.key_bit_width + 7) / 8; + state_size_bytes = (user_info.state_bit_width + 7) / 8; + state_mask_size_bytes = (user_info.state_mask_bit_width + 7) / 8; + + // Enable the CTT + std::future enable_future = m_commander->enable(true); + enable_future.wait(); + } + catch (const std::exception& e) { + throw; + } + } + + /** + * @brief Command: mark a flow for offload. + * + * @param flow_hash_ctt The flow hash to be offloaded. + */ + void create_record(uint64_t flow_hash_ctt, const struct timeval& timestamp_first); + + /** + * @brief Command: export a flow from the CTT. + * + * @param flow_hash_ctt The flow hash to be exported. + */ + void export_record(uint64_t flow_hash_ctt); + +private: + std::unique_ptr m_commander; + size_t key_size_bytes; + size_t state_size_bytes; + size_t state_mask_size_bytes; + + /** + * @brief Assembles the state vector from the given values. + * + * @param offload_mode The offload mode. + * @param meta_type The metadata type. + * @param timestamp_first The first timestamp of the flow. + * @return A byte vector representing the assembled state vector. + */ + std::vector assemble_state( + OffloadMode offload_mode, MetaType meta_type, + const struct timeval& timestamp_first); + + /** + * @brief Assembles the key vector from the given flow hash. + * + * @param flow_hash_ctt The flow hash. + * @return A byte vector representing the assembled key vector. + */ + std::vector assemble_key(uint64_t flow_hash_ctt); +}; +#endif /* WITH_CTT */ + struct __attribute__((packed)) flow_key_v4_t { uint16_t src_port; uint16_t dst_port; @@ -99,6 +195,9 @@ class CacheOptParser : public OptionsParser bool m_enable_fragmentation_cache; std::size_t m_frag_cache_size; time_t m_frag_cache_timeout; + #ifdef WITH_CTT + std::string m_dev; + #endif /* WITH_CTT */ CacheOptParser() : OptionsParser("cache", "Storage plugin implemented as a hash table"), m_cache_size(1 << DEFAULT_FLOW_CACHE_SIZE), m_line_size(1 << DEFAULT_FLOW_LINE_SIZE), @@ -156,6 +255,16 @@ class CacheOptParser : public OptionsParser } return true; }); + + #ifdef WITH_CTT + register_option("d", "dev", "DEV", "Device name", + [this](const char *arg) { + m_dev = arg; + return true; + }, + OptionFlags::RequiredArgument); + #endif /* WITH_CTT */ + } }; @@ -165,6 +274,10 @@ class alignas(64) FlowRecord public: Flow m_flow; + #ifdef WITH_CTT + Flow m_delayed_flow; + bool m_delayed_flow_waiting; + #endif /* WITH_CTT */ FlowRecord(); ~FlowRecord(); @@ -214,6 +327,48 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin */ void set_telemetry_dir(std::shared_ptr dir) override; + #ifdef WITH_CTT + + int plugins_post_create(Flow &rec, Packet &pkt) { + int ret = StoragePlugin::plugins_post_create(rec, pkt); + rec.record_in_ctt = false; + //if (only_metadata_required(rec)) { + if (only_metadata_required(rec)) { + m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + rec.record_in_ctt = true; + } + return ret; + } + + // override post_update method + int plugins_post_update(Flow &rec, Packet &pkt) { + int ret = StoragePlugin::plugins_post_update(rec, pkt); + //if (only_metadata_required(rec) && !rec.ctt_state) { + if (!rec.record_in_ctt) { // only for debug!!!!! line above is correct for production + m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + rec.record_in_ctt = true; + } + return ret; + } + + // override pre_export method + void plugins_pre_export(Flow &rec) { + if (rec.record_in_ctt) { + rec.is_delayed = true; + rec.delay_time = time(nullptr) + 1; + m_ctt_controller.export_record(rec.flow_hash_ctt); + rec.record_in_ctt = false; + return; + } + if (rec.is_delayed) { + return; + } else { + StoragePlugin::plugins_pre_export(rec); + } + } + + #endif /* WITH_CTT */ + private: uint32_t m_cache_size; uint32_t m_line_size; @@ -242,7 +397,9 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin char m_key_inv[MAX_KEY_LENGTH]; FlowRecord **m_flow_table; FlowRecord *m_flow_records; - +#ifdef WITH_CTT + CttController m_ctt_controller; +#endif /* WITH_CTT */ FragmentationCache m_fragmentation_cache; FlowEndReasonStats m_flow_end_reason_stats = {}; FlowRecordStats m_flow_record_stats = {}; @@ -265,4 +422,4 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin }; } -#endif /* IPXP_STORAGE_CACHE_HPP */ +#endif /* IPXP_STORAGE_CACHE_HPP */ \ No newline at end of file