From 019fcad4a05070842c99a11f3c2eaceabe9ed424 Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Tue, 12 Nov 2024 19:00:15 +0100 Subject: [PATCH 1/5] ndp ctt table controller - init --- include/ipfixprobe/flowifc.hpp | 2 + include/ipfixprobe/packet.hpp | 3 +- include/ipfixprobe/storage.hpp | 7 +++ input/ndp.cpp | 5 +- input/parser.cpp | 21 +++++-- input/parser.hpp | 2 +- storage/ctt-controller.cpp | 111 +++++++++++++++++++++++++++++++++ storage/ctt-controller.hpp | 107 +++++++++++++++++++++++++++++++ 8 files changed, 250 insertions(+), 8 deletions(-) create mode 100644 storage/ctt-controller.cpp create mode 100644 storage/ctt-controller.hpp diff --git a/include/ipfixprobe/flowifc.hpp b/include/ipfixprobe/flowifc.hpp index e95db465..51964c0a 100644 --- a/include/ipfixprobe/flowifc.hpp +++ b/include/ipfixprobe/flowifc.hpp @@ -263,6 +263,8 @@ struct Flow : public Record { }; uint64_t flow_hash; + uint64_t flow_hash_ctt; /**< Flow hash for CTT. */ + bool ctt_valid; /**< CTT validity flag. */ PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check if the flow process plugins requires all available data, only metadata or nothing of this. */ diff --git a/include/ipfixprobe/packet.hpp b/include/ipfixprobe/packet.hpp index 8ac75f3b..bd1e5aba 100644 --- a/include/ipfixprobe/packet.hpp +++ b/include/ipfixprobe/packet.hpp @@ -47,6 +47,7 @@ namespace ipxp { */ struct Packet : public Record { Metadata_CTT cttmeta; /**< Metadata from CTT */ + bool cttmeta_valid; /**< True if CTT metadata is valid */ struct timeval ts; uint8_t dst_mac[6]; @@ -108,7 +109,7 @@ struct Packet : public Record { * \brief Constructor. */ Packet() : - ts({0}), + cttmeta_valid(false), ts({0}), dst_mac(), src_mac(), ethertype(0), ip_len(0), ip_payload_len(0), ip_version(0), ip_ttl(0), ip_proto(0), ip_tos(0), ip_flags(0), src_ip({0}), dst_ip({0}), vlan_id(0), diff --git a/include/ipfixprobe/storage.hpp b/include/ipfixprobe/storage.hpp index 8a58f57e..d9573bbb 100644 --- a/include/ipfixprobe/storage.hpp +++ b/include/ipfixprobe/storage.hpp @@ -189,6 +189,13 @@ class StoragePlugin : public Plugin */ int plugins_post_create(Flow& rec, const Packet& pkt) { + // if metadata are valid, add flow hash ctt to the flow record + if (pkt.cttmeta_valid) { + rec.ctt_valid = true; + rec.flow_hash_ctt = pkt.cttmeta.flow_hash; + } else { + rec.ctt_valid = false; + } PluginStatusConverter plugin_status_converter(m_plugins_status); int ret = 0; for (unsigned int i = 0; i < m_plugin_cnt; i++) { diff --git a/input/ndp.cpp b/input/ndp.cpp index 8c72e0c3..398ccd34 100644 --- a/input/ndp.cpp +++ b/input/ndp.cpp @@ -170,7 +170,10 @@ InputPlugin::Result NdpPacketReader::get(PacketBlock &packets) m_stats.bad_metadata++; parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length); } else { - parse_packet_ctt_metadata(&opt, m_parser_stats, ctt, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length); + if (parse_packet_ctt_metadata(&opt, m_parser_stats, ctt, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length) == -1) { + m_stats.bad_metadata++; + parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length); + } } } else { parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length); diff --git a/input/parser.cpp b/input/parser.cpp index 6b914683..ec5a2b5b 100644 --- a/input/parser.cpp +++ b/input/parser.cpp @@ -35,6 +35,7 @@ #include "parser.hpp" #include "headers.hpp" +#include #include namespace ipxp { @@ -776,12 +777,21 @@ void parse_packet(parser_opt_t *opt, ParserStats& stats, struct timeval ts, cons opt->pblock->bytes += len; } -void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen) +int parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen) { if (opt->pblock->cnt >= opt->pblock->size) { - return; + return 0; } Packet *pkt = &opt->pblock->pkts[opt->pblock->cnt]; + + // check metadata validity + if (metadata.parser_status == PA_OK) { + pkt->cttmeta_valid = true; + } else { + pkt->cttmeta_valid = false; + return -1; + } + pkt->cttmeta = metadata; pkt->packet_len_wire = len; @@ -831,7 +841,7 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta stats.pppoe_packets++; } else { // if not previous, we try delegate to original parser parse_packet(opt, stats, metadata.ts, data, len, caplen); - return; + return 0; } // L4 @@ -843,11 +853,11 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta stats.udp_packets++; } else { // if not previous, we try delegate to original parser parse_packet(opt, stats, metadata.ts, data, len, caplen); - return; + return 0; } } catch (const char *err) { DEBUG_MSG("%s\n", err); - return; + return 0; } if (pkt->vlan_id) { @@ -880,6 +890,7 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta opt->packet_valid = true; opt->pblock->cnt++; opt->pblock->bytes += len; + return 0; } } diff --git a/input/parser.hpp b/input/parser.hpp index 580e6703..d9a3b7f4 100644 --- a/input/parser.hpp +++ b/input/parser.hpp @@ -86,7 +86,7 @@ typedef struct parser_opt_s { */ void parse_packet(parser_opt_t *opt, ParserStats& stats, struct timeval ts, const uint8_t *data, uint16_t len, uint16_t caplen); -void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen); +int parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen); } #endif /* IPXP_INPUT_PARSER_HPP */ diff --git a/storage/ctt-controller.cpp b/storage/ctt-controller.cpp new file mode 100644 index 00000000..649efeb2 --- /dev/null +++ b/storage/ctt-controller.cpp @@ -0,0 +1,111 @@ +/** + * \file ctt-controller.cpp + * \brief Connection Tracking Table (CTT) controller + * \author Jaroslav Pesek + * \date 2024 + */ +/* + * Copyright (C) 2024 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * + * + */ + +#include "ctt-controller.hpp" +#include +#include + +namespace ipxp { + +CttController::CttController(const std::string& nfb_dev, unsigned ctt_comp_index) + : m_commander(ctt::NfbParams{nfb_dev, ctt_comp_index}) +{ + try { + // Get UserInfo to determine key, state, and state_mask sizes + ctt::UserInfo user_info = m_commander.get_user_info(); + key_size_bytes = (user_info.key_bit_width + 7) / 8; + state_size_bytes = (user_info.state_bit_width + 7) / 8; + state_mask_size_bytes = (user_info.state_mask_bit_width + 7) / 8; + + // Enable the CTT + std::future enable_future = m_commander.enable(true); + enable_future.wait(); + } + catch (const std::exception& e) { + throw; + } +} + +void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts) +{ + try { + std::vector key = assemble_key(flow_hash_ctt); + std::vector state = assemble_state( + OffloadMode::PACKET_OFFLOAD_WITH_EXPORT, + MetaType::FULL, + ts); + + m_commander.write_record(std::move(key), std::move(state)); + } + catch (const std::exception& e) { + throw; + } +} + +void CttController::export_record(uint64_t flow_hash_ctt) +{ + try { + std::vector key = assemble_key(flow_hash_ctt); + + m_commander.export_record(std::move(key)); + } + catch (const std::exception& e) { + throw; + } +} + +std::vector CttController::assemble_key(uint64_t flow_hash_ctt) +{ + std::vector key(key_size_bytes, std::byte(0)); + for (size_t i = 0; i < sizeof(flow_hash_ctt) && i < key_size_bytes; ++i) { + key[i] = static_cast((flow_hash_ctt >> (8 * i)) & 0xFF); + } + return key; + +} + +std::vector CttController::assemble_state( + OffloadMode offload_mode, MetaType meta_type, const struct timeval& ts) +{ + std::vector state(state_size_bytes, std::byte(0)); + std::vector state_mask(state_mask_size_bytes, std::byte(0)); + + state[0] = static_cast(offload_mode); + state[1] = static_cast(meta_type); + + // timestamp in sec/ns format, 32+32 bits - 64 bits in total + for (size_t i = 0; i < sizeof(ts.tv_sec) && i < 4; ++i) { + state[2 + i] = static_cast((ts.tv_sec >> (8 * i)) & 0xFF); + } + for (size_t i = 0; i < sizeof(ts.tv_usec) && i < 4; ++i) { + state[6 + i] = static_cast((ts.tv_usec >> (8 * i)) & 0xFF); + } + return state; +} + +} // namespace ipxp \ No newline at end of file diff --git a/storage/ctt-controller.hpp b/storage/ctt-controller.hpp new file mode 100644 index 00000000..f5da0af0 --- /dev/null +++ b/storage/ctt-controller.hpp @@ -0,0 +1,107 @@ +/** + * \file ctt-controller.hpp + * \brief Connection Tracking Table (CTT) controller + * \author Jaroslav Pesek + * \date 2024 + */ +/* + * Copyright (C) 2024 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * + * + */ + +#ifndef IPXP_CTT_CONTROLLER_HPP +#define IPXP_CTT_CONTROLLER_HPP + +#include + +#include +#include +#include +#include + +#include + +namespace ipxp { + +class CttController { +public: + enum class OffloadMode : uint8_t { + NO_OFFLOAD = 0x0, + PACKET_OFFLOAD = 0x1, + META_EXPORT = 0x2, + PACKET_OFFLOAD_WITH_EXPORT = 0x3 + }; + enum class MetaType : uint8_t { + FULL = 0x0, + HALF = 0x1, + TS_ONLY = 0x2, + NO_META = 0x3 + }; + /** + * @brief Constructor that initializes the CTT. + * + * @param nfb_dev The NFB device file (e.g., "/dev/nfb0"). + * @param ctt_comp_index The index of the CTT component. + */ + CttController(const std::string& nfb_dev, unsigned ctt_comp_index); + + /** + * @brief Command: mark a flow for offload. + * + * @param flow_hash_ctt The flow hash to be offloaded. + */ + void create_record(uint64_t flow_hash_ctt, const struct timeval& timestamp_first); + + /** + * @brief Command: export a flow from the CTT. + * + * @param flow_hash_ctt The flow hash to be exported. + */ + void export_record(uint64_t flow_hash_ctt); + +private: + ctt::AsyncCommander m_commander; + size_t key_size_bytes; + size_t state_size_bytes; + size_t state_mask_size_bytes; + + /** + * @brief Assembles the state vector from the given values. + * + * @param offload_mode The offload mode. + * @param meta_type The metadata type. + * @param timestamp_first The first timestamp of the flow. + * @return A byte vector representing the assembled state vector. + */ + std::vector assemble_state( + OffloadMode offload_mode, MetaType meta_type, + const struct timeval& timestamp_first); + + /** + * @brief Assembles the key vector from the given flow hash. + * + * @param flow_hash_ctt The flow hash. + * @return A byte vector representing the assembled key vector. + */ + std::vector assemble_key(uint64_t flow_hash_ctt); +}; +} // namespace ipxp + +#endif /* IPXP_CTT_CONTROLLER_HPP */ From 5b59c0ece227bddc33b3f954bb3537419017135b Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Wed, 13 Nov 2024 17:41:49 +0100 Subject: [PATCH 2/5] ctt - add cache-ctt --- storage/cache-ctt.cpp | 0 storage/cache-ctt.hpp | 57 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 storage/cache-ctt.cpp create mode 100644 storage/cache-ctt.hpp diff --git a/storage/cache-ctt.cpp b/storage/cache-ctt.cpp new file mode 100644 index 00000000..e69de29b diff --git a/storage/cache-ctt.hpp b/storage/cache-ctt.hpp new file mode 100644 index 00000000..e5f59f15 --- /dev/null +++ b/storage/cache-ctt.hpp @@ -0,0 +1,57 @@ +#ifndef IPXP_STORAGE_CACHE_CTT_HPP +#define IPXP_STORAGE_CACHE_CTT_HPP + +#include "cache.hpp" +#include "ctt-controller.hpp" + +namespace ipxp { + +// Extend CacheOptParser to create CacheCTTOptParser +class CacheCTTOptParser : public CacheOptParser +{ +public: + std::string m_dev; + + CacheCTTOptParser() : CacheOptParser(), m_dev("") + { + // Register the new option "dev=DEV for device name where is CTT running" + register_option("d", "dev", "DEV", "Device name", + [this](const char *arg){ + m_dev = arg; + return true; + }, + OptionFlags::RequiredArgument); + } +}; + +class NHTFlowCacheCTT : public NHTFlowCache +{ +public: + NHTFlowCacheCTT(); + ~NHTFlowCacheCTT(); + + void init(const char *params) override; + OptionsParser *get_parser() const override { return new CacheCTTOptParser(); } + std::string get_name() const override { return "cache_ctt"; } + + // override post_create method + int plugins_post_create(Flow &rec, Packet &pkt) { + int ret = StoragePlugin::plugins_post_create(rec, pkt); + if (no_data_required(rec)) { + m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + } else if (all_data_required(rec)) { + m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + } + + + return ret; + } + +private: + std::string m_dev; + CttController m_ctt_controller; +}; + +} + +#endif /* IPXP_STORAGE_CACHE_CTT_HPP */ \ No newline at end of file From ca900640816917e95f61fadb21a84cead85954fc Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Mon, 18 Nov 2024 18:36:20 +0100 Subject: [PATCH 3/5] ctt controller - add ctt state to flow --- include/ipfixprobe/flowifc.hpp | 1 + storage/cache-ctt.hpp | 21 ++++++++++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/include/ipfixprobe/flowifc.hpp b/include/ipfixprobe/flowifc.hpp index 51964c0a..854cba38 100644 --- a/include/ipfixprobe/flowifc.hpp +++ b/include/ipfixprobe/flowifc.hpp @@ -265,6 +265,7 @@ struct Flow : public Record { uint64_t flow_hash; uint64_t flow_hash_ctt; /**< Flow hash for CTT. */ bool ctt_valid; /**< CTT validity flag. */ + int ctt_state; /**< CTT - offload or not. */ PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check if the flow process plugins requires all available data, only metadata or nothing of this. */ diff --git a/storage/cache-ctt.hpp b/storage/cache-ctt.hpp index e5f59f15..8a79771b 100644 --- a/storage/cache-ctt.hpp +++ b/storage/cache-ctt.hpp @@ -37,16 +37,31 @@ class NHTFlowCacheCTT : public NHTFlowCache // override post_create method int plugins_post_create(Flow &rec, Packet &pkt) { int ret = StoragePlugin::plugins_post_create(rec, pkt); + rec.ctt_state = static_cast(CttController::OffloadMode::NO_OFFLOAD); if (no_data_required(rec)) { m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); - } else if (all_data_required(rec)) { - m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + rec.ctt_state = static_cast(CttController::OffloadMode::PACKET_OFFLOAD); } + return ret; + } - + // override post_update method + int plugins_post_update(Flow &rec, Packet &pkt) { + int ret = StoragePlugin::plugins_post_update(rec, pkt); + if (no_data_required(rec) && (rec.ctt_state == static_cast(CttController::OffloadMode::NO_OFFLOAD))) { + m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + rec.ctt_state = static_cast(CttController::OffloadMode::PACKET_OFFLOAD); + } return ret; } + // override pre_export method + void plugins_pre_export(Flow &rec) { + StoragePlugin::plugins_pre_export(rec); + m_ctt_controller.export_record(rec.flow_hash_ctt); + } + + private: std::string m_dev; CttController m_ctt_controller; From ac1bcf5cb639aae77463ff47ad225b0008e03d40 Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Thu, 21 Nov 2024 15:56:13 +0100 Subject: [PATCH 4/5] ndp ctt controller - ctt cond compilation --- configure.ac | 26 ++++++ include/ipfixprobe/flowifc.hpp | 4 + input/ndp.cpp | 1 + storage/cache-ctt.cpp | 0 storage/cache-ctt.hpp | 72 ----------------- storage/cache.cpp | 68 ++++++++++++++++ storage/cache.hpp | 139 ++++++++++++++++++++++++++++++++- storage/ctt-controller.cpp | 111 -------------------------- storage/ctt-controller.hpp | 107 ------------------------- 9 files changed, 236 insertions(+), 292 deletions(-) delete mode 100644 storage/cache-ctt.cpp delete mode 100644 storage/cache-ctt.hpp delete mode 100644 storage/ctt-controller.cpp delete mode 100644 storage/ctt-controller.hpp diff --git a/configure.ac b/configure.ac index c8f2ee0e..a50b5aa8 100644 --- a/configure.ac +++ b/configure.ac @@ -226,6 +226,32 @@ if [[ -z "$WITH_NDP_TRUE" ]]; then RPM_BUILDREQ+=" netcope-common-devel" fi +AC_ARG_WITH([ctt], + AC_HELP_STRING([--with-ctt],[Compile ipfixprobe with ctt plugin for using Connection Tracking Table]), + [ + if test "$withval" = "yes"; then + withctt="yes" + else + withctt="no" + fi + ], [withctt="no"] +) + +if test x${withctt} = xyes; then + AC_LANG_PUSH([C++]) + CXXFLAGS="$CXXFLAGS -std=c++17" + AC_CHECK_HEADERS([ctt.hpp], [libctt=yes], AC_MSG_ERROR([ctt.hpp not found. Try installing libctt-devel])) + AC_LANG_POP([C++]) +fi + +AM_CONDITIONAL(WITH_CTT, test x${libctt} = xyes && test x${withctt} = xyes) +if [[ -z "$WITH_CTT_TRUE" ]]; then + AC_DEFINE([WITH_CTT], [1], [Define to 1 if the ctt is available]) + LIBS="-lctt $LIBS" + RPM_REQUIRES+=" libctt" + RPM_BUILDREQ+=" libctt-devel" +fi + AC_ARG_WITH([pcap], AC_HELP_STRING([--with-pcap],[Compile ipfixprobe with pcap plugin for capturing using libpcap library]), [ diff --git a/include/ipfixprobe/flowifc.hpp b/include/ipfixprobe/flowifc.hpp index 854cba38..05c77774 100644 --- a/include/ipfixprobe/flowifc.hpp +++ b/include/ipfixprobe/flowifc.hpp @@ -263,9 +263,13 @@ struct Flow : public Record { }; uint64_t flow_hash; + + #ifdef WITH_CTT uint64_t flow_hash_ctt; /**< Flow hash for CTT. */ bool ctt_valid; /**< CTT validity flag. */ int ctt_state; /**< CTT - offload or not. */ + #endif + PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check if the flow process plugins requires all available data, only metadata or nothing of this. */ diff --git a/input/ndp.cpp b/input/ndp.cpp index 398ccd34..34529554 100644 --- a/input/ndp.cpp +++ b/input/ndp.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include diff --git a/storage/cache-ctt.cpp b/storage/cache-ctt.cpp deleted file mode 100644 index e69de29b..00000000 diff --git a/storage/cache-ctt.hpp b/storage/cache-ctt.hpp deleted file mode 100644 index 8a79771b..00000000 --- a/storage/cache-ctt.hpp +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef IPXP_STORAGE_CACHE_CTT_HPP -#define IPXP_STORAGE_CACHE_CTT_HPP - -#include "cache.hpp" -#include "ctt-controller.hpp" - -namespace ipxp { - -// Extend CacheOptParser to create CacheCTTOptParser -class CacheCTTOptParser : public CacheOptParser -{ -public: - std::string m_dev; - - CacheCTTOptParser() : CacheOptParser(), m_dev("") - { - // Register the new option "dev=DEV for device name where is CTT running" - register_option("d", "dev", "DEV", "Device name", - [this](const char *arg){ - m_dev = arg; - return true; - }, - OptionFlags::RequiredArgument); - } -}; - -class NHTFlowCacheCTT : public NHTFlowCache -{ -public: - NHTFlowCacheCTT(); - ~NHTFlowCacheCTT(); - - void init(const char *params) override; - OptionsParser *get_parser() const override { return new CacheCTTOptParser(); } - std::string get_name() const override { return "cache_ctt"; } - - // override post_create method - int plugins_post_create(Flow &rec, Packet &pkt) { - int ret = StoragePlugin::plugins_post_create(rec, pkt); - rec.ctt_state = static_cast(CttController::OffloadMode::NO_OFFLOAD); - if (no_data_required(rec)) { - m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); - rec.ctt_state = static_cast(CttController::OffloadMode::PACKET_OFFLOAD); - } - return ret; - } - - // override post_update method - int plugins_post_update(Flow &rec, Packet &pkt) { - int ret = StoragePlugin::plugins_post_update(rec, pkt); - if (no_data_required(rec) && (rec.ctt_state == static_cast(CttController::OffloadMode::NO_OFFLOAD))) { - m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); - rec.ctt_state = static_cast(CttController::OffloadMode::PACKET_OFFLOAD); - } - return ret; - } - - // override pre_export method - void plugins_pre_export(Flow &rec) { - StoragePlugin::plugins_pre_export(rec); - m_ctt_controller.export_record(rec.flow_hash_ctt); - } - - -private: - std::string m_dev; - CttController m_ctt_controller; -}; - -} - -#endif /* IPXP_STORAGE_CACHE_CTT_HPP */ \ No newline at end of file diff --git a/storage/cache.cpp b/storage/cache.cpp index 2f15b76d..cbd492d4 100644 --- a/storage/cache.cpp +++ b/storage/cache.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -192,6 +193,9 @@ void NHTFlowCache::init(const char *params) m_timeout_idx = 0; m_line_mask = (m_cache_size - 1) & ~(m_line_size - 1); m_line_new_idx = m_line_size / 2; + #ifdef WITH_CTT + m_ctt_controller.init(parser.m_dev, 0); + #endif /* WITH_CTT */ if (m_export_queue == nullptr) { throw PluginError("output queue must be set before init"); @@ -658,4 +662,68 @@ void NHTFlowCache::prefetch_export_expired() const __builtin_prefetch(m_flow_table[i], 0, 1); } } + +#ifdef WITH_CTT + +void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts) +{ + try { + std::vector key = assemble_key(flow_hash_ctt); + std::vector state = assemble_state( + OffloadMode::PACKET_OFFLOAD, + MetaType::FULL, + ts); + + std::cout << "Created record\n\tkey: " << flow_hash_ctt << "\n\tstate: "; + for (auto& byte : state) { + std::cout << std::hex << static_cast(byte) << " "; + } + std::cout << std::endl; + m_commander->write_record(std::move(key), std::move(state)); + } + catch (const std::exception& e) { + throw; + } +} + +void CttController::export_record(uint64_t flow_hash_ctt) +{ + try { + std::vector key = assemble_key(flow_hash_ctt); + m_commander->export_and_delete_record(std::move(key)); + std::cout << "Exported record with key: " << flow_hash_ctt << std::endl; + } + catch (const std::exception& e) { + throw; + } +} + +std::vector CttController::assemble_key(uint64_t flow_hash_ctt) +{ + std::vector key(key_size_bytes, std::byte(0)); + for (size_t i = 0; i < sizeof(flow_hash_ctt) && i < key_size_bytes; ++i) { + key[i] = static_cast((flow_hash_ctt >> (8 * i)) & 0xFF); + } + return key; +} + +std::vector CttController::assemble_state( + OffloadMode offload_mode, MetaType meta_type, const struct timeval& ts) +{ + std::vector state(state_size_bytes, std::byte(0)); + std::vector state_mask(state_mask_size_bytes, std::byte(0)); + + state[0] = static_cast(offload_mode); + state[1] = static_cast(meta_type); + + // timestamp in sec/ns format, 32+32 bits - 64 bits in total + for (size_t i = 0; i < sizeof(ts.tv_sec) && i < 4; ++i) { + state[2 + i] = static_cast((ts.tv_sec >> (8 * i)) & 0xFF); + } + for (size_t i = 0; i < sizeof(ts.tv_usec) && i < 4; ++i) { + state[6 + i] = static_cast((ts.tv_usec >> (8 * i)) & 0xFF); + } + return state; } +#endif // WITH_CTT +} \ No newline at end of file diff --git a/storage/cache.hpp b/storage/cache.hpp index 57087f5b..2abf3af8 100644 --- a/storage/cache.hpp +++ b/storage/cache.hpp @@ -42,8 +42,99 @@ #include "fragmentationCache/fragmentationCache.hpp" +#ifdef WITH_CTT +#include +#include +#include +#include +#include +#include +#endif /* WITH_CTT */ + namespace ipxp { +#ifdef WITH_CTT + +class CttController { +public: + enum class OffloadMode : uint8_t { + NO_OFFLOAD = 0x0, + PACKET_OFFLOAD = 0x1, + META_EXPORT = 0x2, + PACKET_OFFLOAD_WITH_EXPORT = 0x3 + }; + enum class MetaType : uint8_t { + FULL = 0x0, + HALF = 0x1, + TS_ONLY = 0x2, + NO_META = 0x3 + }; + /** + * @brief init the CTT. + * + * @param nfb_dev The NFB device file (e.g., "/dev/nfb0"). + * @param ctt_comp_index The index of the CTT component. + */ + void init(const std::string& nfb_dev, unsigned ctt_comp_index) { + m_commander = std::make_unique(ctt::NfbParams{nfb_dev, ctt_comp_index}); + try { + // Get UserInfo to determine key, state, and state_mask sizes + ctt::UserInfo user_info = m_commander->get_user_info(); + key_size_bytes = (user_info.key_bit_width + 7) / 8; + state_size_bytes = (user_info.state_bit_width + 7) / 8; + state_mask_size_bytes = (user_info.state_mask_bit_width + 7) / 8; + + // Enable the CTT + std::future enable_future = m_commander->enable(true); + enable_future.wait(); + } + catch (const std::exception& e) { + throw; + } + } + + /** + * @brief Command: mark a flow for offload. + * + * @param flow_hash_ctt The flow hash to be offloaded. + */ + void create_record(uint64_t flow_hash_ctt, const struct timeval& timestamp_first); + + /** + * @brief Command: export a flow from the CTT. + * + * @param flow_hash_ctt The flow hash to be exported. + */ + void export_record(uint64_t flow_hash_ctt); + +private: + std::unique_ptr m_commander; + size_t key_size_bytes; + size_t state_size_bytes; + size_t state_mask_size_bytes; + + /** + * @brief Assembles the state vector from the given values. + * + * @param offload_mode The offload mode. + * @param meta_type The metadata type. + * @param timestamp_first The first timestamp of the flow. + * @return A byte vector representing the assembled state vector. + */ + std::vector assemble_state( + OffloadMode offload_mode, MetaType meta_type, + const struct timeval& timestamp_first); + + /** + * @brief Assembles the key vector from the given flow hash. + * + * @param flow_hash_ctt The flow hash. + * @return A byte vector representing the assembled key vector. + */ + std::vector assemble_key(uint64_t flow_hash_ctt); +}; +#endif /* WITH_CTT */ + struct __attribute__((packed)) flow_key_v4_t { uint16_t src_port; uint16_t dst_port; @@ -99,6 +190,9 @@ class CacheOptParser : public OptionsParser bool m_enable_fragmentation_cache; std::size_t m_frag_cache_size; time_t m_frag_cache_timeout; + #ifdef WITH_CTT + std::string m_dev; + #endif /* WITH_CTT */ CacheOptParser() : OptionsParser("cache", "Storage plugin implemented as a hash table"), m_cache_size(1 << DEFAULT_FLOW_CACHE_SIZE), m_line_size(1 << DEFAULT_FLOW_LINE_SIZE), @@ -156,6 +250,16 @@ class CacheOptParser : public OptionsParser } return true; }); + + #ifdef WITH_CTT + register_option("d", "dev", "DEV", "Device name", + [this](const char *arg) { + m_dev = arg; + return true; + }, + OptionFlags::RequiredArgument); + #endif /* WITH_CTT */ + } }; @@ -214,6 +318,35 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin */ void set_telemetry_dir(std::shared_ptr dir) override; + #ifdef WITH_CTT + + int plugins_post_create(Flow &rec, Packet &pkt) { + int ret = StoragePlugin::plugins_post_create(rec, pkt); + rec.ctt_state = static_cast(CttController::OffloadMode::NO_OFFLOAD); + if (no_data_required(rec)) { + m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + rec.ctt_state = static_cast(CttController::OffloadMode::PACKET_OFFLOAD); + } + return ret; + } + + // override post_update method + int plugins_post_update(Flow &rec, Packet &pkt) { + int ret = StoragePlugin::plugins_post_update(rec, pkt); + if (no_data_required(rec) && (rec.ctt_state == static_cast(CttController::OffloadMode::NO_OFFLOAD))) { + m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + rec.ctt_state = static_cast(CttController::OffloadMode::PACKET_OFFLOAD); + } + return ret; + } + + // override pre_export method + void plugins_pre_export(Flow &rec) { + StoragePlugin::plugins_pre_export(rec); + m_ctt_controller.export_record(rec.flow_hash_ctt); + } + #endif /* WITH_CTT */ + private: uint32_t m_cache_size; uint32_t m_line_size; @@ -242,7 +375,9 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin char m_key_inv[MAX_KEY_LENGTH]; FlowRecord **m_flow_table; FlowRecord *m_flow_records; - +#ifdef WITH_CTT + CttController m_ctt_controller; +#endif /* WITH_CTT */ FragmentationCache m_fragmentation_cache; FlowEndReasonStats m_flow_end_reason_stats = {}; FlowRecordStats m_flow_record_stats = {}; @@ -265,4 +400,4 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin }; } -#endif /* IPXP_STORAGE_CACHE_HPP */ +#endif /* IPXP_STORAGE_CACHE_HPP */ \ No newline at end of file diff --git a/storage/ctt-controller.cpp b/storage/ctt-controller.cpp deleted file mode 100644 index 649efeb2..00000000 --- a/storage/ctt-controller.cpp +++ /dev/null @@ -1,111 +0,0 @@ -/** - * \file ctt-controller.cpp - * \brief Connection Tracking Table (CTT) controller - * \author Jaroslav Pesek - * \date 2024 - */ -/* - * Copyright (C) 2024 CESNET - * - * LICENSE TERMS - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the - * distribution. - * 3. Neither the name of the Company nor the names of its contributors - * may be used to endorse or promote products derived from this - * software without specific prior written permission. - * - * - * - */ - -#include "ctt-controller.hpp" -#include -#include - -namespace ipxp { - -CttController::CttController(const std::string& nfb_dev, unsigned ctt_comp_index) - : m_commander(ctt::NfbParams{nfb_dev, ctt_comp_index}) -{ - try { - // Get UserInfo to determine key, state, and state_mask sizes - ctt::UserInfo user_info = m_commander.get_user_info(); - key_size_bytes = (user_info.key_bit_width + 7) / 8; - state_size_bytes = (user_info.state_bit_width + 7) / 8; - state_mask_size_bytes = (user_info.state_mask_bit_width + 7) / 8; - - // Enable the CTT - std::future enable_future = m_commander.enable(true); - enable_future.wait(); - } - catch (const std::exception& e) { - throw; - } -} - -void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts) -{ - try { - std::vector key = assemble_key(flow_hash_ctt); - std::vector state = assemble_state( - OffloadMode::PACKET_OFFLOAD_WITH_EXPORT, - MetaType::FULL, - ts); - - m_commander.write_record(std::move(key), std::move(state)); - } - catch (const std::exception& e) { - throw; - } -} - -void CttController::export_record(uint64_t flow_hash_ctt) -{ - try { - std::vector key = assemble_key(flow_hash_ctt); - - m_commander.export_record(std::move(key)); - } - catch (const std::exception& e) { - throw; - } -} - -std::vector CttController::assemble_key(uint64_t flow_hash_ctt) -{ - std::vector key(key_size_bytes, std::byte(0)); - for (size_t i = 0; i < sizeof(flow_hash_ctt) && i < key_size_bytes; ++i) { - key[i] = static_cast((flow_hash_ctt >> (8 * i)) & 0xFF); - } - return key; - -} - -std::vector CttController::assemble_state( - OffloadMode offload_mode, MetaType meta_type, const struct timeval& ts) -{ - std::vector state(state_size_bytes, std::byte(0)); - std::vector state_mask(state_mask_size_bytes, std::byte(0)); - - state[0] = static_cast(offload_mode); - state[1] = static_cast(meta_type); - - // timestamp in sec/ns format, 32+32 bits - 64 bits in total - for (size_t i = 0; i < sizeof(ts.tv_sec) && i < 4; ++i) { - state[2 + i] = static_cast((ts.tv_sec >> (8 * i)) & 0xFF); - } - for (size_t i = 0; i < sizeof(ts.tv_usec) && i < 4; ++i) { - state[6 + i] = static_cast((ts.tv_usec >> (8 * i)) & 0xFF); - } - return state; -} - -} // namespace ipxp \ No newline at end of file diff --git a/storage/ctt-controller.hpp b/storage/ctt-controller.hpp deleted file mode 100644 index f5da0af0..00000000 --- a/storage/ctt-controller.hpp +++ /dev/null @@ -1,107 +0,0 @@ -/** - * \file ctt-controller.hpp - * \brief Connection Tracking Table (CTT) controller - * \author Jaroslav Pesek - * \date 2024 - */ -/* - * Copyright (C) 2024 CESNET - * - * LICENSE TERMS - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the - * distribution. - * 3. Neither the name of the Company nor the names of its contributors - * may be used to endorse or promote products derived from this - * software without specific prior written permission. - * - * - * - */ - -#ifndef IPXP_CTT_CONTROLLER_HPP -#define IPXP_CTT_CONTROLLER_HPP - -#include - -#include -#include -#include -#include - -#include - -namespace ipxp { - -class CttController { -public: - enum class OffloadMode : uint8_t { - NO_OFFLOAD = 0x0, - PACKET_OFFLOAD = 0x1, - META_EXPORT = 0x2, - PACKET_OFFLOAD_WITH_EXPORT = 0x3 - }; - enum class MetaType : uint8_t { - FULL = 0x0, - HALF = 0x1, - TS_ONLY = 0x2, - NO_META = 0x3 - }; - /** - * @brief Constructor that initializes the CTT. - * - * @param nfb_dev The NFB device file (e.g., "/dev/nfb0"). - * @param ctt_comp_index The index of the CTT component. - */ - CttController(const std::string& nfb_dev, unsigned ctt_comp_index); - - /** - * @brief Command: mark a flow for offload. - * - * @param flow_hash_ctt The flow hash to be offloaded. - */ - void create_record(uint64_t flow_hash_ctt, const struct timeval& timestamp_first); - - /** - * @brief Command: export a flow from the CTT. - * - * @param flow_hash_ctt The flow hash to be exported. - */ - void export_record(uint64_t flow_hash_ctt); - -private: - ctt::AsyncCommander m_commander; - size_t key_size_bytes; - size_t state_size_bytes; - size_t state_mask_size_bytes; - - /** - * @brief Assembles the state vector from the given values. - * - * @param offload_mode The offload mode. - * @param meta_type The metadata type. - * @param timestamp_first The first timestamp of the flow. - * @return A byte vector representing the assembled state vector. - */ - std::vector assemble_state( - OffloadMode offload_mode, MetaType meta_type, - const struct timeval& timestamp_first); - - /** - * @brief Assembles the key vector from the given flow hash. - * - * @param flow_hash_ctt The flow hash. - * @return A byte vector representing the assembled key vector. - */ - std::vector assemble_key(uint64_t flow_hash_ctt); -}; -} // namespace ipxp - -#endif /* IPXP_CTT_CONTROLLER_HPP */ From 036856ddd1e68b7ac2cb5436fdd6af595a20be89 Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Thu, 28 Nov 2024 16:27:42 +0100 Subject: [PATCH 5/5] ctt - solving hazard during inconsistent state --- include/ipfixprobe/flowifc.hpp | 9 ++++--- include/ipfixprobe/packet.hpp | 6 +++-- include/ipfixprobe/storage.hpp | 3 --- storage/cache.cpp | 46 ++++++++++++++++++++++++++-------- storage/cache.hpp | 44 ++++++++++++++++++++++++-------- 5 files changed, 79 insertions(+), 29 deletions(-) diff --git a/include/ipfixprobe/flowifc.hpp b/include/ipfixprobe/flowifc.hpp index 05c77774..2f3b5acb 100644 --- a/include/ipfixprobe/flowifc.hpp +++ b/include/ipfixprobe/flowifc.hpp @@ -38,6 +38,7 @@ #include #include #include +#include #ifdef WITH_NEMEA #include @@ -265,9 +266,10 @@ struct Flow : public Record { uint64_t flow_hash; #ifdef WITH_CTT - uint64_t flow_hash_ctt; /**< Flow hash for CTT. */ - bool ctt_valid; /**< CTT validity flag. */ - int ctt_state; /**< CTT - offload or not. */ + uint64_t flow_hash_ctt; /**< Flow hash for CTT. */ + bool record_in_ctt; /**< CTT - offload or not. */ + bool is_delayed; /**< Delayed export flag. */ + time_t delay_time; /**< Time until export of the flow is delayed. */ #endif PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check @@ -297,4 +299,5 @@ struct Flow : public Record { }; } + #endif /* IPXP_FLOWIFC_HPP */ diff --git a/include/ipfixprobe/packet.hpp b/include/ipfixprobe/packet.hpp index bd1e5aba..575c2fb9 100644 --- a/include/ipfixprobe/packet.hpp +++ b/include/ipfixprobe/packet.hpp @@ -46,8 +46,10 @@ namespace ipxp { * \brief Structure for storing parsed packet fields */ struct Packet : public Record { - Metadata_CTT cttmeta; /**< Metadata from CTT */ - bool cttmeta_valid; /**< True if CTT metadata is valid */ + #ifdef WITH_CTT + Metadata_CTT cttmeta; /**< Metadata from CTT */ + bool cttmeta_valid; /**< True if CTT metadata is valid */ + #endif /* WITH_CTT */ struct timeval ts; uint8_t dst_mac[6]; diff --git a/include/ipfixprobe/storage.hpp b/include/ipfixprobe/storage.hpp index d9573bbb..5296557e 100644 --- a/include/ipfixprobe/storage.hpp +++ b/include/ipfixprobe/storage.hpp @@ -191,10 +191,7 @@ class StoragePlugin : public Plugin { // if metadata are valid, add flow hash ctt to the flow record if (pkt.cttmeta_valid) { - rec.ctt_valid = true; rec.flow_hash_ctt = pkt.cttmeta.flow_hash; - } else { - rec.ctt_valid = false; } PluginStatusConverter plugin_status_converter(m_plugins_status); int ret = 0; diff --git a/storage/cache.cpp b/storage/cache.cpp index cbd492d4..af8171c4 100644 --- a/storage/cache.cpp +++ b/storage/cache.cpp @@ -139,10 +139,22 @@ void FlowRecord::create(const Packet &pkt, uint64_t hash) m_flow.src_port = pkt.src_port; m_flow.dst_port = pkt.dst_port; } + #ifdef WITH_CTT + m_flow.is_delayed = false; + m_delayed_flow_waiting = false; + #endif /* WITH_CTT */ } void FlowRecord::update(const Packet &pkt, bool src) { + if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow + auto flow_hash = m_hash; + m_delayed_flow = m_flow; + m_delayed_flow_waiting = true; + erase(); // erase the old flow, keeping the delayed flow + create(pkt, flow_hash); + return; + } m_flow.time_last = pkt.ts; if (src) { m_flow.src_packets++; @@ -260,6 +272,17 @@ void NHTFlowCache::set_queue(ipx_ring_t *queue) void NHTFlowCache::export_flow(size_t index) { + if (m_flow_table[index]->m_flow.is_delayed) { + return; + } + if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) { + m_total_exported++; + update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason); + update_flow_record_stats( + m_flow_table[index]->m_delayed_flow.src_packets + + m_flow_table[index]->m_delayed_flow.dst_packets); + ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow); + } m_total_exported++; update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason); update_flow_record_stats( @@ -506,6 +529,16 @@ void NHTFlowCache::export_expired(time_t ts) m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow); plugins_pre_export(m_flow_table[i]->m_flow); export_flow(i); + if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) { + m_flow_table[i]->m_flow.is_delayed = false; + plugins_pre_export(m_flow_table[i]->m_flow); + export_flow(i); + } + if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) { + m_flow_table[i]->m_delayed_flow_waiting = false; + plugins_pre_export(m_flow_table[i]->m_delayed_flow); + export_flow(i); + } #ifdef FLOW_CACHE_STATS m_expired++; #endif /* FLOW_CACHE_STATS */ @@ -668,18 +701,12 @@ void NHTFlowCache::prefetch_export_expired() const void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts) { try { - std::vector key = assemble_key(flow_hash_ctt); - std::vector state = assemble_state( + std::vector key = assemble_key(flow_hash_ctt); + std::vector state = assemble_state( OffloadMode::PACKET_OFFLOAD, MetaType::FULL, ts); - - std::cout << "Created record\n\tkey: " << flow_hash_ctt << "\n\tstate: "; - for (auto& byte : state) { - std::cout << std::hex << static_cast(byte) << " "; - } - std::cout << std::endl; - m_commander->write_record(std::move(key), std::move(state)); + m_commander->write_record(std::move(key), std::move(state)); } catch (const std::exception& e) { throw; @@ -691,7 +718,6 @@ void CttController::export_record(uint64_t flow_hash_ctt) try { std::vector key = assemble_key(flow_hash_ctt); m_commander->export_and_delete_record(std::move(key)); - std::cout << "Exported record with key: " << flow_hash_ctt << std::endl; } catch (const std::exception& e) { throw; diff --git a/storage/cache.hpp b/storage/cache.hpp index 2abf3af8..ad8de024 100644 --- a/storage/cache.hpp +++ b/storage/cache.hpp @@ -32,6 +32,9 @@ #ifndef IPXP_STORAGE_CACHE_HPP #define IPXP_STORAGE_CACHE_HPP +#include +#include +#include #include #include @@ -49,6 +52,8 @@ #include #include #include +#include +#include #endif /* WITH_CTT */ namespace ipxp { @@ -269,6 +274,10 @@ class alignas(64) FlowRecord public: Flow m_flow; + #ifdef WITH_CTT + Flow m_delayed_flow; + bool m_delayed_flow_waiting; + #endif /* WITH_CTT */ FlowRecord(); ~FlowRecord(); @@ -322,29 +331,42 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin int plugins_post_create(Flow &rec, Packet &pkt) { int ret = StoragePlugin::plugins_post_create(rec, pkt); - rec.ctt_state = static_cast(CttController::OffloadMode::NO_OFFLOAD); - if (no_data_required(rec)) { - m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); - rec.ctt_state = static_cast(CttController::OffloadMode::PACKET_OFFLOAD); - } + rec.record_in_ctt = false; + //if (only_metadata_required(rec)) { + if (only_metadata_required(rec)) { + m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + rec.record_in_ctt = true; + } return ret; } // override post_update method int plugins_post_update(Flow &rec, Packet &pkt) { int ret = StoragePlugin::plugins_post_update(rec, pkt); - if (no_data_required(rec) && (rec.ctt_state == static_cast(CttController::OffloadMode::NO_OFFLOAD))) { - m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); - rec.ctt_state = static_cast(CttController::OffloadMode::PACKET_OFFLOAD); - } + //if (only_metadata_required(rec) && !rec.ctt_state) { + if (!rec.record_in_ctt) { // only for debug!!!!! line above is correct for production + m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + rec.record_in_ctt = true; + } return ret; } // override pre_export method void plugins_pre_export(Flow &rec) { - StoragePlugin::plugins_pre_export(rec); - m_ctt_controller.export_record(rec.flow_hash_ctt); + if (rec.record_in_ctt) { + rec.is_delayed = true; + rec.delay_time = time(nullptr) + 1; + m_ctt_controller.export_record(rec.flow_hash_ctt); + rec.record_in_ctt = false; + return; + } + if (rec.is_delayed) { + return; + } else { + StoragePlugin::plugins_pre_export(rec); + } } + #endif /* WITH_CTT */ private: