From 32593a6f57ffc2955a3e39b52ade93bdd135bcab Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Mon, 16 Dec 2024 13:34:43 +0100 Subject: [PATCH] WIP --- include/ipfixprobe/input.hpp | 6 + input/ndp.cpp | 18 + input/ndp.hpp | 6 + ipfixprobe.cpp | 4 + storage/cache.cpp | 1070 +++++++++++++++++----------------- storage/cache.hpp | 21 +- 6 files changed, 589 insertions(+), 536 deletions(-) diff --git a/include/ipfixprobe/input.hpp b/include/ipfixprobe/input.hpp index aa5c0935..875d8151 100644 --- a/include/ipfixprobe/input.hpp +++ b/include/ipfixprobe/input.hpp @@ -68,6 +68,12 @@ class InputPlugin : public TelemetryUtils, public Plugin std::shared_ptr plugin_dir, std::shared_ptr queues_dir); +#ifdef WITH_CTT + virtual std::pair get_ctt_config() const { + throw PluginError("CTT is not supported by this input plugin"); + } +#endif /* WITH_CTT */ + protected: virtual void configure_telemetry_dirs( std::shared_ptr plugin_dir, diff --git a/input/ndp.cpp b/input/ndp.cpp index 34529554..2e8d448d 100644 --- a/input/ndp.cpp +++ b/input/ndp.cpp @@ -97,6 +97,7 @@ void NdpPacketReader::init(const char *params) m_ctt_metadata = true; } init_ifc(parser.m_dev); + m_device = parser.m_dev; } void NdpPacketReader::close() @@ -104,6 +105,23 @@ void NdpPacketReader::close() ndpReader.close(); } +#ifdef WITH_CTT + +std::pair NdpPacketReader::get_ctt_config() const +{ + std::string dev = m_device; + int channel_id = 0; + std::size_t delimiter_found = m_device.find_last_of(":"); + if (delimiter_found != std::string::npos) { + std::string channel_str = m_device.substr(delimiter_found + 1); + dev = m_device.substr(0, delimiter_found); + channel_id = std::stoi(channel_str); + } + return std::make_pair(dev, channel_id); +} + +#endif /* WITH_CTT */ + void NdpPacketReader::init_ifc(const std::string &dev) { if (ndpReader.init_interface(dev) != 0) { diff --git a/input/ndp.hpp b/input/ndp.hpp index 61993463..12a98a4a 100644 --- a/input/ndp.hpp +++ b/input/ndp.hpp @@ -73,6 +73,10 @@ class NdpPacketReader : public InputPlugin std::shared_ptr plugin_dir, std::shared_ptr queues_dir) override; +#ifdef WITH_CTT + virtual std::pair get_ctt_config() const override; +#endif /* WITH_CTT */ + private: struct RxStats { uint64_t receivedPackets; @@ -87,6 +91,8 @@ class NdpPacketReader : public InputPlugin bool m_ctt_metadata = false; + std::string m_device; + void init_ifc(const std::string &dev); int parse_ctt_metadata(const ndp_packet *ndp_packet, Metadata_CTT &ctt); }; diff --git a/ipfixprobe.cpp b/ipfixprobe.cpp index 1c4ce6c3..9466a83d 100644 --- a/ipfixprobe.cpp +++ b/ipfixprobe.cpp @@ -358,6 +358,10 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser) if (storage_plugin == nullptr) { throw IPXPError("invalid storage plugin " + storage_name); } +#ifdef WITH_CTT + const auto& [device, comp_idx] = input_plugin->get_ctt_config(); + storage_plugin->set_ctt_config(device, comp_idx); +#endif /* WITH_CTT */ storage_plugin->set_queue(output_queue); storage_plugin->init(storage_params.c_str()); storage_plugin->set_telemetry_dir(pipeline_queue_dir); diff --git a/storage/cache.cpp b/storage/cache.cpp index af8171c4..3f1cb437 100644 --- a/storage/cache.cpp +++ b/storage/cache.cpp @@ -31,669 +31,695 @@ */ #include -#include #include +#include #include #include -#include #include "cache.hpp" #include "xxhash.h" +#include namespace ipxp { __attribute__((constructor)) static void register_this_plugin() { - static PluginRecord rec = PluginRecord("cache", [](){return new NHTFlowCache();}); - register_plugin(&rec); + static PluginRecord rec = PluginRecord("cache", []() { return new NHTFlowCache(); }); + register_plugin(&rec); } FlowRecord::FlowRecord() { - erase(); + erase(); }; FlowRecord::~FlowRecord() { - erase(); + erase(); }; void FlowRecord::erase() { - m_flow.remove_extensions(); - m_hash = 0; - - memset(&m_flow.time_first, 0, sizeof(m_flow.time_first)); - memset(&m_flow.time_last, 0, sizeof(m_flow.time_last)); - m_flow.ip_version = 0; - m_flow.ip_proto = 0; - memset(&m_flow.src_ip, 0, sizeof(m_flow.src_ip)); - memset(&m_flow.dst_ip, 0, sizeof(m_flow.dst_ip)); - m_flow.src_port = 0; - m_flow.dst_port = 0; - m_flow.src_packets = 0; - m_flow.dst_packets = 0; - m_flow.src_bytes = 0; - m_flow.dst_bytes = 0; - m_flow.src_tcp_flags = 0; - m_flow.dst_tcp_flags = 0; + m_flow.remove_extensions(); + m_hash = 0; + + memset(&m_flow.time_first, 0, sizeof(m_flow.time_first)); + memset(&m_flow.time_last, 0, sizeof(m_flow.time_last)); + m_flow.ip_version = 0; + m_flow.ip_proto = 0; + memset(&m_flow.src_ip, 0, sizeof(m_flow.src_ip)); + memset(&m_flow.dst_ip, 0, sizeof(m_flow.dst_ip)); + m_flow.src_port = 0; + m_flow.dst_port = 0; + m_flow.src_packets = 0; + m_flow.dst_packets = 0; + m_flow.src_bytes = 0; + m_flow.dst_bytes = 0; + m_flow.src_tcp_flags = 0; + m_flow.dst_tcp_flags = 0; } void FlowRecord::reuse() { - m_flow.remove_extensions(); - m_flow.time_first = m_flow.time_last; - m_flow.src_packets = 0; - m_flow.dst_packets = 0; - m_flow.src_bytes = 0; - m_flow.dst_bytes = 0; - m_flow.src_tcp_flags = 0; - m_flow.dst_tcp_flags = 0; + m_flow.remove_extensions(); + m_flow.time_first = m_flow.time_last; + m_flow.src_packets = 0; + m_flow.dst_packets = 0; + m_flow.src_bytes = 0; + m_flow.dst_bytes = 0; + m_flow.src_tcp_flags = 0; + m_flow.dst_tcp_flags = 0; } inline __attribute__((always_inline)) bool FlowRecord::is_empty() const { - return m_hash == 0; + return m_hash == 0; } inline __attribute__((always_inline)) bool FlowRecord::belongs(uint64_t hash) const { - return hash == m_hash; + return hash == m_hash; } -void FlowRecord::create(const Packet &pkt, uint64_t hash) -{ - m_flow.src_packets = 1; - - m_hash = hash; - - m_flow.time_first = pkt.ts; - m_flow.time_last = pkt.ts; - m_flow.flow_hash = hash; - - memcpy(m_flow.src_mac, pkt.src_mac, 6); - memcpy(m_flow.dst_mac, pkt.dst_mac, 6); - - if (pkt.ip_version == IP::v4) { - m_flow.ip_version = pkt.ip_version; - m_flow.ip_proto = pkt.ip_proto; - m_flow.src_ip.v4 = pkt.src_ip.v4; - m_flow.dst_ip.v4 = pkt.dst_ip.v4; - m_flow.src_bytes = pkt.ip_len; - } else if (pkt.ip_version == IP::v6) { - m_flow.ip_version = pkt.ip_version; - m_flow.ip_proto = pkt.ip_proto; - memcpy(m_flow.src_ip.v6, pkt.src_ip.v6, 16); - memcpy(m_flow.dst_ip.v6, pkt.dst_ip.v6, 16); - m_flow.src_bytes = pkt.ip_len; - } - - if (pkt.ip_proto == IPPROTO_TCP) { - m_flow.src_port = pkt.src_port; - m_flow.dst_port = pkt.dst_port; - m_flow.src_tcp_flags = pkt.tcp_flags; - } else if (pkt.ip_proto == IPPROTO_UDP) { - m_flow.src_port = pkt.src_port; - m_flow.dst_port = pkt.dst_port; - } else if (pkt.ip_proto == IPPROTO_ICMP || - pkt.ip_proto == IPPROTO_ICMPV6) { - m_flow.src_port = pkt.src_port; - m_flow.dst_port = pkt.dst_port; - } - #ifdef WITH_CTT - m_flow.is_delayed = false; - m_delayed_flow_waiting = false; - #endif /* WITH_CTT */ -} +void FlowRecord::create(const Packet& pkt, uint64_t hash) +{ + m_flow.src_packets = 1; + + m_hash = hash; + + m_flow.time_first = pkt.ts; + m_flow.time_last = pkt.ts; + m_flow.flow_hash = hash; -void FlowRecord::update(const Packet &pkt, bool src) -{ - if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow - auto flow_hash = m_hash; - m_delayed_flow = m_flow; - m_delayed_flow_waiting = true; - erase(); // erase the old flow, keeping the delayed flow - create(pkt, flow_hash); - return; - } - m_flow.time_last = pkt.ts; - if (src) { - m_flow.src_packets++; - m_flow.src_bytes += pkt.ip_len; - - if (pkt.ip_proto == IPPROTO_TCP) { - m_flow.src_tcp_flags |= pkt.tcp_flags; - } - } else { - m_flow.dst_packets++; - m_flow.dst_bytes += pkt.ip_len; - - if (pkt.ip_proto == IPPROTO_TCP) { - m_flow.dst_tcp_flags |= pkt.tcp_flags; - } - } + memcpy(m_flow.src_mac, pkt.src_mac, 6); + memcpy(m_flow.dst_mac, pkt.dst_mac, 6); + + if (pkt.ip_version == IP::v4) { + m_flow.ip_version = pkt.ip_version; + m_flow.ip_proto = pkt.ip_proto; + m_flow.src_ip.v4 = pkt.src_ip.v4; + m_flow.dst_ip.v4 = pkt.dst_ip.v4; + m_flow.src_bytes = pkt.ip_len; + } else if (pkt.ip_version == IP::v6) { + m_flow.ip_version = pkt.ip_version; + m_flow.ip_proto = pkt.ip_proto; + memcpy(m_flow.src_ip.v6, pkt.src_ip.v6, 16); + memcpy(m_flow.dst_ip.v6, pkt.dst_ip.v6, 16); + m_flow.src_bytes = pkt.ip_len; + } + + if (pkt.ip_proto == IPPROTO_TCP) { + m_flow.src_port = pkt.src_port; + m_flow.dst_port = pkt.dst_port; + m_flow.src_tcp_flags = pkt.tcp_flags; + } else if (pkt.ip_proto == IPPROTO_UDP) { + m_flow.src_port = pkt.src_port; + m_flow.dst_port = pkt.dst_port; + } else if (pkt.ip_proto == IPPROTO_ICMP || pkt.ip_proto == IPPROTO_ICMPV6) { + m_flow.src_port = pkt.src_port; + m_flow.dst_port = pkt.dst_port; + } +#ifdef WITH_CTT + m_flow.is_delayed = false; + m_delayed_flow_waiting = false; +#endif /* WITH_CTT */ } +void FlowRecord::update(const Packet& pkt, bool src) +{ + if (m_flow.is_delayed + && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not + // matched in CTT -> it must be new flow + auto flow_hash = m_hash; + m_delayed_flow = m_flow; + m_delayed_flow_waiting = true; + erase(); // erase the old flow, keeping the delayed flow + create(pkt, flow_hash); + return; + } + m_flow.time_last = pkt.ts; + if (src) { + m_flow.src_packets++; + m_flow.src_bytes += pkt.ip_len; + + if (pkt.ip_proto == IPPROTO_TCP) { + m_flow.src_tcp_flags |= pkt.tcp_flags; + } + } else { + m_flow.dst_packets++; + m_flow.dst_bytes += pkt.ip_len; + + if (pkt.ip_proto == IPPROTO_TCP) { + m_flow.dst_tcp_flags |= pkt.tcp_flags; + } + } +} -NHTFlowCache::NHTFlowCache() : - m_cache_size(0), m_line_size(0), m_line_mask(0), m_line_new_idx(0), - m_qsize(0), m_qidx(0), m_timeout_idx(0), m_active(0), m_inactive(0), - m_split_biflow(false), m_enable_fragmentation_cache(true), m_keylen(0), - m_key(), m_key_inv(), m_flow_table(nullptr), m_flow_records(nullptr), - m_fragmentation_cache(0, 0) +NHTFlowCache::NHTFlowCache() + : m_cache_size(0) + , m_line_size(0) + , m_line_mask(0) + , m_line_new_idx(0) + , m_qsize(0) + , m_qidx(0) + , m_timeout_idx(0) + , m_active(0) + , m_inactive(0) + , m_split_biflow(false) + , m_enable_fragmentation_cache(true) + , m_keylen(0) + , m_key() + , m_key_inv() + , m_flow_table(nullptr) + , m_flow_records(nullptr) + , m_fragmentation_cache(0, 0) { } NHTFlowCache::~NHTFlowCache() { - close(); + close(); } -void NHTFlowCache::init(const char *params) -{ - CacheOptParser parser; - try { - parser.parse(params); - } catch (ParserError &e) { - throw PluginError(e.what()); - } - - m_cache_size = parser.m_cache_size; - m_line_size = parser.m_line_size; - m_active = parser.m_active; - m_inactive = parser.m_inactive; - m_qidx = 0; - m_timeout_idx = 0; - m_line_mask = (m_cache_size - 1) & ~(m_line_size - 1); - m_line_new_idx = m_line_size / 2; - #ifdef WITH_CTT - m_ctt_controller.init(parser.m_dev, 0); - #endif /* WITH_CTT */ - - if (m_export_queue == nullptr) { - throw PluginError("output queue must be set before init"); - } - - if (m_line_size > m_cache_size) { - throw PluginError("flow cache line size must be greater or equal to cache size"); - } - if (m_cache_size == 0) { - throw PluginError("flow cache won't properly work with 0 records"); - } - - try { - m_flow_table = new FlowRecord*[m_cache_size + m_qsize]; - m_flow_records = new FlowRecord[m_cache_size + m_qsize]; - for (decltype(m_cache_size + m_qsize) i = 0; i < m_cache_size + m_qsize; i++) { - m_flow_table[i] = m_flow_records + i; - } - } catch (std::bad_alloc &e) { - throw PluginError("not enough memory for flow cache allocation"); - } - - m_split_biflow = parser.m_split_biflow; - m_enable_fragmentation_cache = parser.m_enable_fragmentation_cache; - - if (m_enable_fragmentation_cache) { - try { - m_fragmentation_cache = FragmentationCache(parser.m_frag_cache_size, parser.m_frag_cache_timeout); - } catch (std::bad_alloc &e) { - throw PluginError("not enough memory for fragment cache allocation"); - } - } +void NHTFlowCache::init(const char* params) +{ + CacheOptParser parser; + try { + parser.parse(params); + } catch (ParserError& e) { + throw PluginError(e.what()); + } + + m_cache_size = parser.m_cache_size; + m_line_size = parser.m_line_size; + m_active = parser.m_active; + m_inactive = parser.m_inactive; + m_qidx = 0; + m_timeout_idx = 0; + m_line_mask = (m_cache_size - 1) & ~(m_line_size - 1); + m_line_new_idx = m_line_size / 2; +#ifdef WITH_CTT + if (m_ctt_device.empty()) { + throw PluginError("CTT device must be set before init"); + } + m_ctt_controller.init(m_ctt_device, m_ctt_comp_index); +#endif /* WITH_CTT */ + + if (m_export_queue == nullptr) { + throw PluginError("output queue must be set before init"); + } + + if (m_line_size > m_cache_size) { + throw PluginError("flow cache line size must be greater or equal to cache size"); + } + if (m_cache_size == 0) { + throw PluginError("flow cache won't properly work with 0 records"); + } + + try { + m_flow_table = new FlowRecord*[m_cache_size + m_qsize]; + m_flow_records = new FlowRecord[m_cache_size + m_qsize]; + for (decltype(m_cache_size + m_qsize) i = 0; i < m_cache_size + m_qsize; i++) { + m_flow_table[i] = m_flow_records + i; + } + } catch (std::bad_alloc& e) { + throw PluginError("not enough memory for flow cache allocation"); + } + + m_split_biflow = parser.m_split_biflow; + m_enable_fragmentation_cache = parser.m_enable_fragmentation_cache; + + if (m_enable_fragmentation_cache) { + try { + m_fragmentation_cache + = FragmentationCache(parser.m_frag_cache_size, parser.m_frag_cache_timeout); + } catch (std::bad_alloc& e) { + throw PluginError("not enough memory for fragment cache allocation"); + } + } #ifdef FLOW_CACHE_STATS - m_empty = 0; - m_not_empty = 0; - m_hits = 0; - m_expired = 0; - m_flushed = 0; - m_lookups = 0; - m_lookups2 = 0; + m_empty = 0; + m_not_empty = 0; + m_hits = 0; + m_expired = 0; + m_flushed = 0; + m_lookups = 0; + m_lookups2 = 0; #endif /* FLOW_CACHE_STATS */ } void NHTFlowCache::close() { - if (m_flow_records != nullptr) { - delete [] m_flow_records; - m_flow_records = nullptr; - } - if (m_flow_table != nullptr) { - delete [] m_flow_table; - m_flow_table = nullptr; - } + if (m_flow_records != nullptr) { + delete[] m_flow_records; + m_flow_records = nullptr; + } + if (m_flow_table != nullptr) { + delete[] m_flow_table; + m_flow_table = nullptr; + } } -void NHTFlowCache::set_queue(ipx_ring_t *queue) +void NHTFlowCache::set_queue(ipx_ring_t* queue) { - m_export_queue = queue; - m_qsize = ipx_ring_size(queue); + m_export_queue = queue; + m_qsize = ipx_ring_size(queue); } +#ifdef WITH_CTT +void NHTFlowCache::set_ctt(const std::string& device_name, uint16_t queue_id) override {} +#endif /* WITH_CTT */ + void NHTFlowCache::export_flow(size_t index) { - if (m_flow_table[index]->m_flow.is_delayed) { - return; - } - if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) { - m_total_exported++; - update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason); - update_flow_record_stats( - m_flow_table[index]->m_delayed_flow.src_packets - + m_flow_table[index]->m_delayed_flow.dst_packets); - ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow); - } - m_total_exported++; - update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason); - update_flow_record_stats( - m_flow_table[index]->m_flow.src_packets - + m_flow_table[index]->m_flow.dst_packets); - m_flows_in_cache--; - - ipx_ring_push(m_export_queue, &m_flow_table[index]->m_flow); - std::swap(m_flow_table[index], m_flow_table[m_cache_size + m_qidx]); - m_flow_table[index]->erase(); - m_qidx = (m_qidx + 1) % m_qsize; + if (m_flow_table[index]->m_flow.is_delayed) { + return; + } + if (m_flow_table[index]->m_delayed_flow_waiting + && !m_flow_table[index]->m_delayed_flow.is_delayed) { + m_total_exported++; + update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason); + update_flow_record_stats( + m_flow_table[index]->m_delayed_flow.src_packets + + m_flow_table[index]->m_delayed_flow.dst_packets); + ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow); + } + m_total_exported++; + update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason); + update_flow_record_stats( + m_flow_table[index]->m_flow.src_packets + m_flow_table[index]->m_flow.dst_packets); + m_flows_in_cache--; + + ipx_ring_push(m_export_queue, &m_flow_table[index]->m_flow); + std::swap(m_flow_table[index], m_flow_table[m_cache_size + m_qidx]); + m_flow_table[index]->erase(); + m_qidx = (m_qidx + 1) % m_qsize; } void NHTFlowCache::finish() { - for (decltype(m_cache_size) i = 0; i < m_cache_size; i++) { - if (!m_flow_table[i]->is_empty()) { - plugins_pre_export(m_flow_table[i]->m_flow); - m_flow_table[i]->m_flow.end_reason = FLOW_END_FORCED; - export_flow(i); + for (decltype(m_cache_size) i = 0; i < m_cache_size; i++) { + if (!m_flow_table[i]->is_empty()) { + plugins_pre_export(m_flow_table[i]->m_flow); + m_flow_table[i]->m_flow.end_reason = FLOW_END_FORCED; + export_flow(i); #ifdef FLOW_CACHE_STATS - m_expired++; + m_expired++; #endif /* FLOW_CACHE_STATS */ - } - } + } + } } -void NHTFlowCache::flush(Packet &pkt, size_t flow_index, int ret, bool source_flow) +void NHTFlowCache::flush(Packet& pkt, size_t flow_index, int ret, bool source_flow) { #ifdef FLOW_CACHE_STATS - m_flushed++; + m_flushed++; #endif /* FLOW_CACHE_STATS */ - if (ret == ProcessPlugin::FlowAction::FLUSH_WITH_REINSERT) { - FlowRecord *flow = m_flow_table[flow_index]; - flow->m_flow.end_reason = FLOW_END_FORCED; - ipx_ring_push(m_export_queue, &flow->m_flow); - - std::swap(m_flow_table[flow_index], m_flow_table[m_cache_size + m_qidx]); - - flow = m_flow_table[flow_index]; - flow->m_flow.remove_extensions(); - *flow = *m_flow_table[m_cache_size + m_qidx]; - m_qidx = (m_qidx + 1) % m_qsize; - - flow->m_flow.m_exts = nullptr; - flow->reuse(); // Clean counters, set time first to last - flow->update(pkt, source_flow); // Set new counters from packet - - ret = plugins_post_create(flow->m_flow, pkt); - if (ret & ProcessPlugin::FlowAction::FLUSH) { - flush(pkt, flow_index, ret, source_flow); - } - } else { - m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_FORCED; - export_flow(flow_index); - } + if (ret == ProcessPlugin::FlowAction::FLUSH_WITH_REINSERT) { + FlowRecord* flow = m_flow_table[flow_index]; + flow->m_flow.end_reason = FLOW_END_FORCED; + ipx_ring_push(m_export_queue, &flow->m_flow); + + std::swap(m_flow_table[flow_index], m_flow_table[m_cache_size + m_qidx]); + + flow = m_flow_table[flow_index]; + flow->m_flow.remove_extensions(); + *flow = *m_flow_table[m_cache_size + m_qidx]; + m_qidx = (m_qidx + 1) % m_qsize; + + flow->m_flow.m_exts = nullptr; + flow->reuse(); // Clean counters, set time first to last + flow->update(pkt, source_flow); // Set new counters from packet + + ret = plugins_post_create(flow->m_flow, pkt); + if (ret & ProcessPlugin::FlowAction::FLUSH) { + flush(pkt, flow_index, ret, source_flow); + } + } else { + m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_FORCED; + export_flow(flow_index); + } } -int NHTFlowCache::put_pkt(Packet &pkt) +int NHTFlowCache::put_pkt(Packet& pkt) { - int ret = plugins_pre_create(pkt); + int ret = plugins_pre_create(pkt); - if (m_enable_fragmentation_cache) { - try_to_fill_ports_to_fragmented_packet(pkt); - } - - if (!create_hash_key(pkt)) { // saves key value and key length into attributes NHTFlowCache::key and NHTFlowCache::m_keylen - return 0; - } + if (m_enable_fragmentation_cache) { + try_to_fill_ports_to_fragmented_packet(pkt); + } - prefetch_export_expired(); + if (!create_hash_key(pkt)) { // saves key value and key length into attributes NHTFlowCache::key + // and NHTFlowCache::m_keylen + return 0; + } - uint64_t hashval = XXH64(m_key, m_keylen, 0); /* Calculates hash value from key created before. */ + prefetch_export_expired(); - FlowRecord *flow; /* Pointer to flow we will be working with. */ - bool found = false; - bool source_flow = true; - uint32_t line_index = hashval & m_line_mask; /* Get index of flow line. */ - uint32_t flow_index = 0; - uint32_t next_line = line_index + m_line_size; + uint64_t hashval + = XXH64(m_key, m_keylen, 0); /* Calculates hash value from key created before. */ - /* Find existing flow record in flow cache. */ - for (flow_index = line_index; flow_index < next_line; flow_index++) { - if (m_flow_table[flow_index]->belongs(hashval)) { - found = true; - break; - } - } + FlowRecord* flow; /* Pointer to flow we will be working with. */ + bool found = false; + bool source_flow = true; + uint32_t line_index = hashval & m_line_mask; /* Get index of flow line. */ + uint32_t flow_index = 0; + uint32_t next_line = line_index + m_line_size; - /* Find inversed flow. */ - if (!found && !m_split_biflow) { - uint64_t hashval_inv = XXH64(m_key_inv, m_keylen, 0); - uint64_t line_index_inv = hashval_inv & m_line_mask; - uint64_t next_line_inv = line_index_inv + m_line_size; - for (flow_index = line_index_inv; flow_index < next_line_inv; flow_index++) { - if (m_flow_table[flow_index]->belongs(hashval_inv)) { + /* Find existing flow record in flow cache. */ + for (flow_index = line_index; flow_index < next_line; flow_index++) { + if (m_flow_table[flow_index]->belongs(hashval)) { found = true; - source_flow = false; - hashval = hashval_inv; - line_index = line_index_inv; break; - } - } - } + } + } - if (found) { - /* Existing flow record was found, put flow record at the first index of flow line. */ + /* Find inversed flow. */ + if (!found && !m_split_biflow) { + uint64_t hashval_inv = XXH64(m_key_inv, m_keylen, 0); + uint64_t line_index_inv = hashval_inv & m_line_mask; + uint64_t next_line_inv = line_index_inv + m_line_size; + for (flow_index = line_index_inv; flow_index < next_line_inv; flow_index++) { + if (m_flow_table[flow_index]->belongs(hashval_inv)) { + found = true; + source_flow = false; + hashval = hashval_inv; + line_index = line_index_inv; + break; + } + } + } + + if (found) { + /* Existing flow record was found, put flow record at the first index of flow line. */ #ifdef FLOW_CACHE_STATS - m_lookups += (flow_index - line_index + 1); - m_lookups2 += (flow_index - line_index + 1) * (flow_index - line_index + 1); + m_lookups += (flow_index - line_index + 1); + m_lookups2 += (flow_index - line_index + 1) * (flow_index - line_index + 1); #endif /* FLOW_CACHE_STATS */ - flow = m_flow_table[flow_index]; - for (decltype(flow_index) j = flow_index; j > line_index; j--) { - m_flow_table[j] = m_flow_table[j - 1]; - } + flow = m_flow_table[flow_index]; + for (decltype(flow_index) j = flow_index; j > line_index; j--) { + m_flow_table[j] = m_flow_table[j - 1]; + } - m_flow_table[line_index] = flow; - flow_index = line_index; + m_flow_table[line_index] = flow; + flow_index = line_index; #ifdef FLOW_CACHE_STATS - m_hits++; + m_hits++; #endif /* FLOW_CACHE_STATS */ - } else { - /* Existing flow record was not found. Find free place in flow line. */ - for (flow_index = line_index; flow_index < next_line; flow_index++) { - if (m_flow_table[flow_index]->is_empty()) { - found = true; - break; - } - } - if (!found) { - /* If free place was not found (flow line is full), find - * record which will be replaced by new record. */ - flow_index = next_line - 1; - - // Export flow - plugins_pre_export(m_flow_table[flow_index]->m_flow); - m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_NO_RES; - export_flow(flow_index); + } else { + /* Existing flow record was not found. Find free place in flow line. */ + for (flow_index = line_index; flow_index < next_line; flow_index++) { + if (m_flow_table[flow_index]->is_empty()) { + found = true; + break; + } + } + if (!found) { + /* If free place was not found (flow line is full), find + * record which will be replaced by new record. */ + flow_index = next_line - 1; + + // Export flow + plugins_pre_export(m_flow_table[flow_index]->m_flow); + m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_NO_RES; + export_flow(flow_index); #ifdef FLOW_CACHE_STATS - m_expired++; + m_expired++; #endif /* FLOW_CACHE_STATS */ - uint32_t flow_new_index = line_index + m_line_new_idx; - flow = m_flow_table[flow_index]; - for (decltype(flow_index) j = flow_index; j > flow_new_index; j--) { - m_flow_table[j] = m_flow_table[j - 1]; - } - flow_index = flow_new_index; - m_flow_table[flow_new_index] = flow; + uint32_t flow_new_index = line_index + m_line_new_idx; + flow = m_flow_table[flow_index]; + for (decltype(flow_index) j = flow_index; j > flow_new_index; j--) { + m_flow_table[j] = m_flow_table[j - 1]; + } + flow_index = flow_new_index; + m_flow_table[flow_new_index] = flow; +#ifdef FLOW_CACHE_STATS + m_not_empty++; + } else { + m_empty++; +#endif /* FLOW_CACHE_STATS */ + } + } + + pkt.source_pkt = source_flow; + flow = m_flow_table[flow_index]; + + uint8_t flw_flags = source_flow ? flow->m_flow.src_tcp_flags : flow->m_flow.dst_tcp_flags; + if ((pkt.tcp_flags & 0x02) && (flw_flags & (0x01 | 0x04))) { + // Flows with FIN or RST TCP flags are exported when new SYN packet arrives + m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_EOF; + export_flow(flow_index); + put_pkt(pkt); + return 0; + } + + if (flow->is_empty()) { + m_flows_in_cache++; + flow->create(pkt, hashval); + ret = plugins_post_create(flow->m_flow, pkt); + + if (ret & ProcessPlugin::FlowAction::FLUSH) { + export_flow(flow_index); #ifdef FLOW_CACHE_STATS - m_not_empty++; - } else { - m_empty++; + m_flushed++; #endif /* FLOW_CACHE_STATS */ - } - } - - pkt.source_pkt = source_flow; - flow = m_flow_table[flow_index]; - - uint8_t flw_flags = source_flow ? flow->m_flow.src_tcp_flags : flow->m_flow.dst_tcp_flags; - if ((pkt.tcp_flags & 0x02) && (flw_flags & (0x01 | 0x04))) { - // Flows with FIN or RST TCP flags are exported when new SYN packet arrives - m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_EOF; - export_flow(flow_index); - put_pkt(pkt); - return 0; - } - - if (flow->is_empty()) { - m_flows_in_cache++; - flow->create(pkt, hashval); - ret = plugins_post_create(flow->m_flow, pkt); - - if (ret & ProcessPlugin::FlowAction::FLUSH) { - export_flow(flow_index); + } + } else { + /* Check if flow record is expired (inactive timeout). */ + if (pkt.ts.tv_sec - flow->m_flow.time_last.tv_sec >= m_inactive) { + m_flow_table[flow_index]->m_flow.end_reason = get_export_reason(flow->m_flow); + plugins_pre_export(flow->m_flow); + export_flow(flow_index); #ifdef FLOW_CACHE_STATS - m_flushed++; + m_expired++; #endif /* FLOW_CACHE_STATS */ - } - } else { - /* Check if flow record is expired (inactive timeout). */ - if (pkt.ts.tv_sec - flow->m_flow.time_last.tv_sec >= m_inactive) { - m_flow_table[flow_index]->m_flow.end_reason = get_export_reason(flow->m_flow); - plugins_pre_export(flow->m_flow); - export_flow(flow_index); - #ifdef FLOW_CACHE_STATS - m_expired++; - #endif /* FLOW_CACHE_STATS */ - return put_pkt(pkt); - } - - /* Check if flow record is expired (active timeout). */ - if (pkt.ts.tv_sec - flow->m_flow.time_first.tv_sec >= m_active) { - m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_ACTIVE; - plugins_pre_export(flow->m_flow); - export_flow(flow_index); + return put_pkt(pkt); + } + + /* Check if flow record is expired (active timeout). */ + if (pkt.ts.tv_sec - flow->m_flow.time_first.tv_sec >= m_active) { + m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_ACTIVE; + plugins_pre_export(flow->m_flow); + export_flow(flow_index); #ifdef FLOW_CACHE_STATS - m_expired++; + m_expired++; #endif /* FLOW_CACHE_STATS */ - return put_pkt(pkt); - } - - ret = plugins_pre_update(flow->m_flow, pkt); - if (ret & ProcessPlugin::FlowAction::FLUSH) { - flush(pkt, flow_index, ret, source_flow); - return 0; - } else { - flow->update(pkt, source_flow); - ret = plugins_post_update(flow->m_flow, pkt); - - if (ret & ProcessPlugin::FlowAction::FLUSH) { + return put_pkt(pkt); + } + + ret = plugins_pre_update(flow->m_flow, pkt); + if (ret & ProcessPlugin::FlowAction::FLUSH) { flush(pkt, flow_index, ret, source_flow); return 0; - } - } - } + } else { + flow->update(pkt, source_flow); + ret = plugins_post_update(flow->m_flow, pkt); + + if (ret & ProcessPlugin::FlowAction::FLUSH) { + flush(pkt, flow_index, ret, source_flow); + return 0; + } + } + } - export_expired(pkt.ts.tv_sec); - return 0; + export_expired(pkt.ts.tv_sec); + return 0; } void NHTFlowCache::try_to_fill_ports_to_fragmented_packet(Packet& packet) { - m_fragmentation_cache.process_packet(packet); + m_fragmentation_cache.process_packet(packet); } -uint8_t NHTFlowCache::get_export_reason(Flow &flow) +uint8_t NHTFlowCache::get_export_reason(Flow& flow) { - if ((flow.src_tcp_flags | flow.dst_tcp_flags) & (0x01 | 0x04)) { - // When FIN or RST is set, TCP connection ended naturally - return FLOW_END_EOF; - } else { - return FLOW_END_INACTIVE; - } + if ((flow.src_tcp_flags | flow.dst_tcp_flags) & (0x01 | 0x04)) { + // When FIN or RST is set, TCP connection ended naturally + return FLOW_END_EOF; + } else { + return FLOW_END_INACTIVE; + } } void NHTFlowCache::export_expired(time_t ts) { - for (decltype(m_timeout_idx) i = m_timeout_idx; i < m_timeout_idx + m_line_new_idx; i++) { - if (!m_flow_table[i]->is_empty() && ts - m_flow_table[i]->m_flow.time_last.tv_sec >= m_inactive) { - m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow); - plugins_pre_export(m_flow_table[i]->m_flow); - export_flow(i); - if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) { - m_flow_table[i]->m_flow.is_delayed = false; - plugins_pre_export(m_flow_table[i]->m_flow); - export_flow(i); - } - if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) { - m_flow_table[i]->m_delayed_flow_waiting = false; - plugins_pre_export(m_flow_table[i]->m_delayed_flow); - export_flow(i); - } + for (decltype(m_timeout_idx) i = m_timeout_idx; i < m_timeout_idx + m_line_new_idx; i++) { + if (!m_flow_table[i]->is_empty() + && ts - m_flow_table[i]->m_flow.time_last.tv_sec >= m_inactive) { + m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow); + plugins_pre_export(m_flow_table[i]->m_flow); + export_flow(i); + if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed + && m_flow_table[i]->m_flow.delay_time >= ts) { + m_flow_table[i]->m_flow.is_delayed = false; + plugins_pre_export(m_flow_table[i]->m_flow); + export_flow(i); + } + if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting + && m_flow_table[i]->m_delayed_flow.delay_time >= ts) { + m_flow_table[i]->m_delayed_flow_waiting = false; + plugins_pre_export(m_flow_table[i]->m_delayed_flow); + export_flow(i); + } #ifdef FLOW_CACHE_STATS - m_expired++; + m_expired++; #endif /* FLOW_CACHE_STATS */ - } - } + } + } - m_timeout_idx = (m_timeout_idx + m_line_new_idx) & (m_cache_size - 1); + m_timeout_idx = (m_timeout_idx + m_line_new_idx) & (m_cache_size - 1); } -bool NHTFlowCache::create_hash_key(Packet &pkt) -{ - if (pkt.ip_version == IP::v4) { - struct flow_key_v4_t *key_v4 = reinterpret_cast(m_key); - struct flow_key_v4_t *key_v4_inv = reinterpret_cast(m_key_inv); - - key_v4->proto = pkt.ip_proto; - key_v4->ip_version = IP::v4; - key_v4->src_port = pkt.src_port; - key_v4->dst_port = pkt.dst_port; - key_v4->src_ip = pkt.src_ip.v4; - key_v4->dst_ip = pkt.dst_ip.v4; - key_v4->vlan_id = pkt.vlan_id; - - key_v4_inv->proto = pkt.ip_proto; - key_v4_inv->ip_version = IP::v4; - key_v4_inv->src_port = pkt.dst_port; - key_v4_inv->dst_port = pkt.src_port; - key_v4_inv->src_ip = pkt.dst_ip.v4; - key_v4_inv->dst_ip = pkt.src_ip.v4; - key_v4_inv->vlan_id = pkt.vlan_id; - - m_keylen = sizeof(flow_key_v4_t); - return true; - } else if (pkt.ip_version == IP::v6) { - struct flow_key_v6_t *key_v6 = reinterpret_cast(m_key); - struct flow_key_v6_t *key_v6_inv = reinterpret_cast(m_key_inv); - - key_v6->proto = pkt.ip_proto; - key_v6->ip_version = IP::v6; - key_v6->src_port = pkt.src_port; - key_v6->dst_port = pkt.dst_port; - memcpy(key_v6->src_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); - memcpy(key_v6->dst_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); - key_v6->vlan_id = pkt.vlan_id; - - key_v6_inv->proto = pkt.ip_proto; - key_v6_inv->ip_version = IP::v6; - key_v6_inv->src_port = pkt.dst_port; - key_v6_inv->dst_port = pkt.src_port; - memcpy(key_v6_inv->src_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); - memcpy(key_v6_inv->dst_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); - key_v6_inv->vlan_id = pkt.vlan_id; - - m_keylen = sizeof(flow_key_v6_t); - return true; - } - - return false; +bool NHTFlowCache::create_hash_key(Packet& pkt) +{ + if (pkt.ip_version == IP::v4) { + struct flow_key_v4_t* key_v4 = reinterpret_cast(m_key); + struct flow_key_v4_t* key_v4_inv = reinterpret_cast(m_key_inv); + + key_v4->proto = pkt.ip_proto; + key_v4->ip_version = IP::v4; + key_v4->src_port = pkt.src_port; + key_v4->dst_port = pkt.dst_port; + key_v4->src_ip = pkt.src_ip.v4; + key_v4->dst_ip = pkt.dst_ip.v4; + key_v4->vlan_id = pkt.vlan_id; + + key_v4_inv->proto = pkt.ip_proto; + key_v4_inv->ip_version = IP::v4; + key_v4_inv->src_port = pkt.dst_port; + key_v4_inv->dst_port = pkt.src_port; + key_v4_inv->src_ip = pkt.dst_ip.v4; + key_v4_inv->dst_ip = pkt.src_ip.v4; + key_v4_inv->vlan_id = pkt.vlan_id; + + m_keylen = sizeof(flow_key_v4_t); + return true; + } else if (pkt.ip_version == IP::v6) { + struct flow_key_v6_t* key_v6 = reinterpret_cast(m_key); + struct flow_key_v6_t* key_v6_inv = reinterpret_cast(m_key_inv); + + key_v6->proto = pkt.ip_proto; + key_v6->ip_version = IP::v6; + key_v6->src_port = pkt.src_port; + key_v6->dst_port = pkt.dst_port; + memcpy(key_v6->src_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); + memcpy(key_v6->dst_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); + key_v6->vlan_id = pkt.vlan_id; + + key_v6_inv->proto = pkt.ip_proto; + key_v6_inv->ip_version = IP::v6; + key_v6_inv->src_port = pkt.dst_port; + key_v6_inv->dst_port = pkt.src_port; + memcpy(key_v6_inv->src_ip, pkt.dst_ip.v6, sizeof(pkt.dst_ip.v6)); + memcpy(key_v6_inv->dst_ip, pkt.src_ip.v6, sizeof(pkt.src_ip.v6)); + key_v6_inv->vlan_id = pkt.vlan_id; + + m_keylen = sizeof(flow_key_v6_t); + return true; + } + + return false; } #ifdef FLOW_CACHE_STATS void NHTFlowCache::print_report() { - float tmp = float(m_lookups) / m_hits; + float tmp = float(m_lookups) / m_hits; - cout << "Hits: " << m_hits << endl; - cout << "Empty: " << m_empty << endl; - cout << "Not empty: " << m_not_empty << endl; - cout << "Expired: " << m_expired << endl; - cout << "Flushed: " << m_flushed << endl; - cout << "Average Lookup: " << tmp << endl; - cout << "Variance Lookup: " << float(m_lookups2) / m_hits - tmp * tmp << endl; + cout << "Hits: " << m_hits << endl; + cout << "Empty: " << m_empty << endl; + cout << "Not empty: " << m_not_empty << endl; + cout << "Expired: " << m_expired << endl; + cout << "Flushed: " << m_flushed << endl; + cout << "Average Lookup: " << tmp << endl; + cout << "Variance Lookup: " << float(m_lookups2) / m_hits - tmp * tmp << endl; } #endif /* FLOW_CACHE_STATS */ void NHTFlowCache::set_telemetry_dir(std::shared_ptr dir) { - telemetry::FileOps statsOps = {[=]() { return get_cache_telemetry(); }, nullptr}; - register_file(dir, "cache-stats", statsOps); + telemetry::FileOps statsOps = {[=]() { return get_cache_telemetry(); }, nullptr}; + register_file(dir, "cache-stats", statsOps); - if (m_enable_fragmentation_cache) { - m_fragmentation_cache.set_telemetry_dir(dir); - } + if (m_enable_fragmentation_cache) { + m_fragmentation_cache.set_telemetry_dir(dir); + } } void NHTFlowCache::update_flow_record_stats(uint64_t packets_count) { - if (packets_count == 1) { - m_flow_record_stats.packets_count_1++; - } else if (packets_count >= 2 && packets_count <= 5) { - m_flow_record_stats.packets_count_2_5++; - } else if (packets_count >= 6 && packets_count <= 10) { - m_flow_record_stats.packets_count_6_10++; - } else if (packets_count >= 11 && packets_count <= 20) { - m_flow_record_stats.packets_count_11_20++; - } else if (packets_count >= 21 && packets_count <= 50) { - m_flow_record_stats.packets_count_21_50++; - } else { - m_flow_record_stats.packets_count_51_plus++; - } + if (packets_count == 1) { + m_flow_record_stats.packets_count_1++; + } else if (packets_count >= 2 && packets_count <= 5) { + m_flow_record_stats.packets_count_2_5++; + } else if (packets_count >= 6 && packets_count <= 10) { + m_flow_record_stats.packets_count_6_10++; + } else if (packets_count >= 11 && packets_count <= 20) { + m_flow_record_stats.packets_count_11_20++; + } else if (packets_count >= 21 && packets_count <= 50) { + m_flow_record_stats.packets_count_21_50++; + } else { + m_flow_record_stats.packets_count_51_plus++; + } } void NHTFlowCache::update_flow_end_reason_stats(uint8_t reason) { - switch (reason) { - case FLOW_END_ACTIVE: - m_flow_end_reason_stats.active_timeout++; - break; - case FLOW_END_INACTIVE: - m_flow_end_reason_stats.inactive_timeout++; - break; - case FLOW_END_EOF: - m_flow_end_reason_stats.end_of_flow++; - break; - case FLOW_END_NO_RES: - m_flow_end_reason_stats.collision++; - break; - case FLOW_END_FORCED: - m_flow_end_reason_stats.forced++; - break; - default: - break; - } + switch (reason) { + case FLOW_END_ACTIVE: + m_flow_end_reason_stats.active_timeout++; + break; + case FLOW_END_INACTIVE: + m_flow_end_reason_stats.inactive_timeout++; + break; + case FLOW_END_EOF: + m_flow_end_reason_stats.end_of_flow++; + break; + case FLOW_END_NO_RES: + m_flow_end_reason_stats.collision++; + break; + case FLOW_END_FORCED: + m_flow_end_reason_stats.forced++; + break; + default: + break; + } } telemetry::Content NHTFlowCache::get_cache_telemetry() { - telemetry::Dict dict; + telemetry::Dict dict; - dict["FlowEndReason:ActiveTimeout"] = m_flow_end_reason_stats.active_timeout; - dict["FlowEndReason:InactiveTimeout"] = m_flow_end_reason_stats.inactive_timeout; - dict["FlowEndReason:EndOfFlow"] = m_flow_end_reason_stats.end_of_flow; - dict["FlowEndReason:Collision"] = m_flow_end_reason_stats.collision; - dict["FlowEndReason:Forced"] = m_flow_end_reason_stats.forced; + dict["FlowEndReason:ActiveTimeout"] = m_flow_end_reason_stats.active_timeout; + dict["FlowEndReason:InactiveTimeout"] = m_flow_end_reason_stats.inactive_timeout; + dict["FlowEndReason:EndOfFlow"] = m_flow_end_reason_stats.end_of_flow; + dict["FlowEndReason:Collision"] = m_flow_end_reason_stats.collision; + dict["FlowEndReason:Forced"] = m_flow_end_reason_stats.forced; - dict["FlowsInCache"] = m_flows_in_cache; - dict["FlowCacheUsage"] = telemetry::ScalarWithUnit {double(m_flows_in_cache) / m_cache_size * 100, "%"}; + dict["FlowsInCache"] = m_flows_in_cache; + dict["FlowCacheUsage"] + = telemetry::ScalarWithUnit {double(m_flows_in_cache) / m_cache_size * 100, "%"}; - dict["FlowRecordStats:1packet"] = m_flow_record_stats.packets_count_1; - dict["FlowRecordStats:2-5packets"] = m_flow_record_stats.packets_count_2_5; - dict["FlowRecordStats:6-10packets"] = m_flow_record_stats.packets_count_6_10; - dict["FlowRecordStats:11-20packets"] = m_flow_record_stats.packets_count_11_20; - dict["FlowRecordStats:21-50packets"] = m_flow_record_stats.packets_count_21_50; - dict["FlowRecordStats:51-plusPackets"] = m_flow_record_stats.packets_count_51_plus; + dict["FlowRecordStats:1packet"] = m_flow_record_stats.packets_count_1; + dict["FlowRecordStats:2-5packets"] = m_flow_record_stats.packets_count_2_5; + dict["FlowRecordStats:6-10packets"] = m_flow_record_stats.packets_count_6_10; + dict["FlowRecordStats:11-20packets"] = m_flow_record_stats.packets_count_11_20; + dict["FlowRecordStats:21-50packets"] = m_flow_record_stats.packets_count_21_50; + dict["FlowRecordStats:51-plusPackets"] = m_flow_record_stats.packets_count_51_plus; - dict["TotalExportedFlows"] = m_total_exported; + dict["TotalExportedFlows"] = m_total_exported; - return dict; + return dict; } void NHTFlowCache::prefetch_export_expired() const { - for (decltype(m_timeout_idx) i = m_timeout_idx; i < m_timeout_idx + m_line_new_idx; i++) { - __builtin_prefetch(m_flow_table[i], 0, 1); - } + for (decltype(m_timeout_idx) i = m_timeout_idx; i < m_timeout_idx + m_line_new_idx; i++) { + __builtin_prefetch(m_flow_table[i], 0, 1); + } } #ifdef WITH_CTT @@ -701,14 +727,11 @@ void NHTFlowCache::prefetch_export_expired() const void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts) { try { - std::vector key = assemble_key(flow_hash_ctt); - std::vector state = assemble_state( - OffloadMode::PACKET_OFFLOAD, - MetaType::FULL, - ts); - m_commander->write_record(std::move(key), std::move(state)); - } - catch (const std::exception& e) { + std::vector key = assemble_key(flow_hash_ctt); + std::vector state + = assemble_state(OffloadMode::PACKET_OFFLOAD, MetaType::FULL, ts); + m_commander->write_record(std::move(key), std::move(state)); + } catch (const std::exception& e) { throw; } } @@ -718,8 +741,7 @@ void CttController::export_record(uint64_t flow_hash_ctt) try { std::vector key = assemble_key(flow_hash_ctt); m_commander->export_and_delete_record(std::move(key)); - } - catch (const std::exception& e) { + } catch (const std::exception& e) { throw; } } @@ -734,7 +756,9 @@ std::vector CttController::assemble_key(uint64_t flow_hash_ctt) } std::vector CttController::assemble_state( - OffloadMode offload_mode, MetaType meta_type, const struct timeval& ts) + OffloadMode offload_mode, + MetaType meta_type, + const struct timeval& ts) { std::vector state(state_size_bytes, std::byte(0)); std::vector state_mask(state_mask_size_bytes, std::byte(0)); @@ -752,4 +776,4 @@ std::vector CttController::assemble_state( return state; } #endif // WITH_CTT -} \ No newline at end of file +} // namespace ipxp \ No newline at end of file diff --git a/storage/cache.hpp b/storage/cache.hpp index ad8de024..32329a37 100644 --- a/storage/cache.hpp +++ b/storage/cache.hpp @@ -195,9 +195,6 @@ class CacheOptParser : public OptionsParser bool m_enable_fragmentation_cache; std::size_t m_frag_cache_size; time_t m_frag_cache_timeout; - #ifdef WITH_CTT - std::string m_dev; - #endif /* WITH_CTT */ CacheOptParser() : OptionsParser("cache", "Storage plugin implemented as a hash table"), m_cache_size(1 << DEFAULT_FLOW_CACHE_SIZE), m_line_size(1 << DEFAULT_FLOW_LINE_SIZE), @@ -255,16 +252,6 @@ class CacheOptParser : public OptionsParser } return true; }); - - #ifdef WITH_CTT - register_option("d", "dev", "DEV", "Device name", - [this](const char *arg) { - m_dev = arg; - return true; - }, - OptionFlags::RequiredArgument); - #endif /* WITH_CTT */ - } }; @@ -398,6 +385,14 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin FlowRecord **m_flow_table; FlowRecord *m_flow_records; #ifdef WITH_CTT + + void set_ctt_config(const std::string& device_name, unsigned comp_index) { + m_ctt_device = device_name; + m_ctt_comp_index = comp_index; + } + + std::string m_ctt_device; + unsigned m_ctt_comp_index; CttController m_ctt_controller; #endif /* WITH_CTT */ FragmentationCache m_fragmentation_cache;