Skip to content

Commit 84b4f7d

Browse files
authored
Merge pull request #180 from CESNET/fragmentation-cache
Introduce fragmentation cache Cache merge releated fragmented packets to single flow
2 parents ee85a9d + 87b7e47 commit 84b4f7d

16 files changed

+997
-8
lines changed

Makefile.am

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@ ipfixprobe_input_src+=\
6060
endif
6161

6262
ipfixprobe_storage_src=\
63+
storage/fragmentationCache/ringBuffer.hpp \
64+
storage/fragmentationCache/timevalUtils.hpp \
65+
storage/fragmentationCache/fragmentationKeyData.hpp \
66+
storage/fragmentationCache/fragmentationTable.hpp \
67+
storage/fragmentationCache/fragmentationTable.cpp \
68+
storage/fragmentationCache/fragmentationCache.hpp \
69+
storage/fragmentationCache/fragmentationCache.cpp \
6370
storage/cache.cpp \
6471
storage/cache.hpp \
6572
storage/xxhash.c \

include/ipfixprobe/flowifc.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848

4949
#include <arpa/inet.h>
5050
#include "ipaddr.hpp"
51+
#include <string>
5152

5253
namespace ipxp {
5354

include/ipfixprobe/packet.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ struct Packet : public Record {
6161
ipaddr_t src_ip;
6262
ipaddr_t dst_ip;
6363
uint32_t vlan_id;
64+
uint32_t frag_id;
65+
uint16_t frag_off;
66+
bool more_fragments;
6467

6568
uint16_t src_port;
6669
uint16_t dst_port;
@@ -107,6 +110,7 @@ struct Packet : public Record {
107110
dst_mac(), src_mac(), ethertype(0),
108111
ip_len(0), ip_payload_len(0), ip_version(0), ip_ttl(0),
109112
ip_proto(0), ip_tos(0), ip_flags(0), src_ip({0}), dst_ip({0}), vlan_id(0),
113+
frag_id(0), frag_off(0), more_fragments(false),
110114
src_port(0), dst_port(0), tcp_flags(0), tcp_window(0),
111115
tcp_options(0), tcp_mss(0), tcp_seq(0), tcp_ack(0), mplsTop(0),
112116
packet(nullptr), packet_len(0), packet_len_wire(0),

init/ipfixprobed

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,23 @@ if [ -e "$CONFFILE" ]; then
5454
if [ ! -z ${ACTIVE_TIMEOUT+x} ]; then
5555
CACHE_ACTIVET_PARAM=";active=${ACTIVE_TIMEOUT}"
5656
fi
57-
CACHE_INACTIVET_PARAM=""
57+
CACHE_INACTIVE_PARAM=""
5858
if [ ! -z ${INACTIVE_TIMEOUT+x} ]; then
59-
CACHE_INACTIVET_PARAM=";inactive=${INACTIVE_TIMEOUT}"
59+
CACHE_INACTIVE_PARAM=";inactive=${INACTIVE_TIMEOUT}"
6060
fi
61-
storage="-s cache;${CACHE_SIZE_PARAM}${CACHE_ACTIVET_PARAM}${CACHE_INACTIVET_PARAM}"
61+
CACHE_FRAG_ENABLE_PARAM=""
62+
if [ ! -z ${FRAG_CACHE_ENABLE+x} ]; then
63+
CACHE_FRAG_ENABLE_PARAM=";frag-enable=${FRAG_CACHE_ENABLE}"
64+
fi
65+
CACHE_FRAG_SIZE=""
66+
if [ ! -z ${FRAG_CACHE_SIZE+x} ]; then
67+
CACHE_FRAG_SIZE=";frag-size=${FRAG_CACHE_SIZE}"
68+
fi
69+
CACHE_FRAG_TIMEOUT=""
70+
if [ ! -z ${FRAG_CACHE_TIMEOUT+x} ]; then
71+
CACHE_FRAG_TIMEOUT=";frag-timeout=${FRAG_CACHE_TIMEOUT}"
72+
fi
73+
storage="-s cache;${CACHE_SIZE_PARAM}${CACHE_ACTIVET_PARAM}${CACHE_INACTIVE_PARAM}${CACHE_FRAG_ENABLE_PARAM}${CACHE_FRAG_SIZE}${CACHE_FRAG_TIMEOUT}"
6274
process=""
6375
if `declare -p PROCESS > /dev/null 2>/dev/null`; then
6476
# list of input plugins

init/link0.conf.example

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,15 @@ CACHE_SIZE=17
146146
ACTIVE_TIMEOUT=300
147147
INACTIVE_TIMEOUT=65
148148

149+
# Enable/disable fragmentation cache (true, false)
150+
FRAG_CACHE_ENABLE=true
151+
152+
# size of fragmentation cache
153+
FRAG_CACHE_SIZE=10007
154+
155+
# timeout in seconds for fragments in fragmentation cache
156+
FRAG_CACHE_TIMEOUT=3
157+
149158
#
150159
# $$$$$$\ $$\ $$\
151160
# $$ __$$\ $$ | $$ |

input/headers.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@
4646

4747
namespace ipxp {
4848

49+
struct ip6_frag {
50+
uint8_t ip_proto;
51+
uint8_t reserved;
52+
uint16_t frag_off;
53+
#define IPV6_FRAGMENT_OFFSET 0xFFF8
54+
#define IPV6_MORE_FRAGMENTS 0x1
55+
uint32_t frag_id;
56+
} __attribute__((packed));
57+
4958
struct grehdr {
5059
uint16_t flags;
5160
#define GRE_CHECKSUM 0x8000

input/parser.cpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ static uint32_t s_total_pkts = 0;
5151
#define DEBUG_CODE(code)
5252
#endif
5353

54+
// masks for iphdr::frag_off
55+
#define IPV4_MORE_FRAGMENTS 0x2000
56+
#define IPV4_FRAGMENT_OFFSET 0x1FFF
57+
5458
/**
5559
* \brief Parse specific fields from ETHERNET frame header.
5660
* \param [in] data_ptr Pointer to begin of header.
@@ -316,6 +320,9 @@ inline uint16_t parse_ipv4_hdr(const u_char *data_ptr, uint16_t data_len, Packet
316320
pkt->ip_flags = (ntohs(ip->frag_off) & 0xE000) >> 13;
317321
pkt->src_ip.v4 = ip->saddr;
318322
pkt->dst_ip.v4 = ip->daddr;
323+
pkt->frag_id = ntohs(ip->id);
324+
pkt->frag_off = ntohs(ip->frag_off) & IPV4_FRAGMENT_OFFSET;
325+
pkt->more_fragments = ntohs(ip->frag_off) & IPV4_MORE_FRAGMENTS;
319326

320327
DEBUG_MSG("IPv4 header:\n");
321328
DEBUG_MSG("\tHDR version:\t%u\n", ip->version);
@@ -335,7 +342,7 @@ inline uint16_t parse_ipv4_hdr(const u_char *data_ptr, uint16_t data_len, Packet
335342
}
336343

337344
/**
338-
* \brief Skip IPv6 extension headers.
345+
* \brief Skip/parse IPv6 extension headers.
339346
* \param [in] data_ptr Pointer to begin of header.
340347
* \param [in] data_len Length of packet data in `data_ptr`.
341348
* \param [out] pkt Pointer to Packet structure where parsed fields will be stored.
@@ -347,7 +354,7 @@ uint16_t skip_ipv6_ext_hdrs(const u_char *data_ptr, uint16_t data_len, Packet *p
347354
uint8_t next_hdr = pkt->ip_proto;
348355
uint16_t hdrs_len = 0;
349356

350-
/* Skip extension headers... */
357+
/* Skip/parse extension headers... */
351358
while (1) {
352359
if ((int)sizeof(struct ip6_ext) > data_len - hdrs_len) {
353360
throw "Parser detected malformed packet";
@@ -361,7 +368,13 @@ uint16_t skip_ipv6_ext_hdrs(const u_char *data_ptr, uint16_t data_len, Packet *p
361368
} else if (next_hdr == IPPROTO_AH) {
362369
hdrs_len += (ext->ip6e_len << 2) - 2;
363370
} else if (next_hdr == IPPROTO_FRAGMENT) {
364-
hdrs_len += 8;
371+
// extract the fragmentation info
372+
auto *frag = reinterpret_cast<const ip6_frag *>(data_ptr + hdrs_len);
373+
pkt->frag_id = ntohl(frag->frag_id);
374+
pkt->frag_off = ntohs(frag->frag_off) & IPV6_FRAGMENT_OFFSET;
375+
pkt->more_fragments = ntohs(frag->frag_off) & IPV6_MORE_FRAGMENTS;
376+
377+
hdrs_len += sizeof(ip6_frag);
365378
} else if (next_hdr == IPPROTO_MH) {
366379
hdrs_len += (ext->ip6e_len << 3) + 8;
367380
// Mobility header can't have payload now but may have it in the future

storage/cache.cpp

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,9 @@ void FlowRecord::update(const Packet &pkt, bool src)
164164
NHTFlowCache::NHTFlowCache() :
165165
m_cache_size(0), m_line_size(0), m_line_mask(0), m_line_new_idx(0),
166166
m_qsize(0), m_qidx(0), m_timeout_idx(0), m_active(0), m_inactive(0),
167-
m_split_biflow(false), m_keylen(0), m_key(), m_key_inv(), m_flow_table(nullptr), m_flow_records(nullptr)
167+
m_split_biflow(false), m_enable_fragmentation_cache(true), m_keylen(0),
168+
m_key(), m_key_inv(), m_flow_table(nullptr), m_flow_records(nullptr),
169+
m_fragmentation_cache(0, 0)
168170
{
169171
}
170172

@@ -213,6 +215,15 @@ void NHTFlowCache::init(const char *params)
213215
}
214216

215217
m_split_biflow = parser.m_split_biflow;
218+
m_enable_fragmentation_cache = parser.m_enable_fragmentation_cache;
219+
220+
if (m_enable_fragmentation_cache) {
221+
try {
222+
m_fragmentation_cache = FragmentationCache(parser.m_frag_cache_size, parser.m_frag_cache_timeout);
223+
} catch (std::bad_alloc &e) {
224+
throw PluginError("not enough memory for fragment cache allocation");
225+
}
226+
}
216227

217228
#ifdef FLOW_CACHE_STATS
218229
m_empty = 0;
@@ -301,6 +312,10 @@ int NHTFlowCache::put_pkt(Packet &pkt)
301312
{
302313
int ret = plugins_pre_create(pkt);
303314

315+
if (m_enable_fragmentation_cache) {
316+
try_to_fill_ports_to_fragmented_packet(pkt);
317+
}
318+
304319
if (!create_hash_key(pkt)) { // saves key value and key length into attributes NHTFlowCache::key and NHTFlowCache::m_keylen
305320
return 0;
306321
}
@@ -455,6 +470,11 @@ int NHTFlowCache::put_pkt(Packet &pkt)
455470
return 0;
456471
}
457472

473+
void NHTFlowCache::try_to_fill_ports_to_fragmented_packet(Packet& packet)
474+
{
475+
m_fragmentation_cache.process_packet(packet);
476+
}
477+
458478
uint8_t NHTFlowCache::get_export_reason(Flow &flow)
459479
{
460480
if ((flow.src_tcp_flags | flow.dst_tcp_flags) & (0x01 | 0x04)) {

storage/cache.hpp

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
#include <ipfixprobe/flowifc.hpp>
4040
#include <ipfixprobe/utils.hpp>
4141

42+
#include "fragmentationCache/fragmentationCache.hpp"
43+
4244
namespace ipxp {
4345

4446
struct __attribute__((packed)) flow_key_v4_t {
@@ -93,10 +95,15 @@ class CacheOptParser : public OptionsParser
9395
uint32_t m_active;
9496
uint32_t m_inactive;
9597
bool m_split_biflow;
98+
bool m_enable_fragmentation_cache;
99+
std::size_t m_frag_cache_size;
100+
time_t m_frag_cache_timeout;
96101

97102
CacheOptParser() : OptionsParser("cache", "Storage plugin implemented as a hash table"),
98103
m_cache_size(1 << DEFAULT_FLOW_CACHE_SIZE), m_line_size(1 << DEFAULT_FLOW_LINE_SIZE),
99-
m_active(DEFAULT_ACTIVE_TIMEOUT), m_inactive(DEFAULT_INACTIVE_TIMEOUT), m_split_biflow(false)
104+
m_active(DEFAULT_ACTIVE_TIMEOUT), m_inactive(DEFAULT_INACTIVE_TIMEOUT), m_split_biflow(false),
105+
m_enable_fragmentation_cache(true), m_frag_cache_size(10007), // Prime for better distribution in hash table
106+
m_frag_cache_timeout(3)
100107
{
101108
register_option("s", "size", "EXPONENT", "Cache size exponent to the power of two",
102109
[this](const char *arg){try {unsigned exp = str2num<decltype(exp)>(arg);
@@ -121,6 +128,33 @@ class CacheOptParser : public OptionsParser
121128
OptionFlags::RequiredArgument);
122129
register_option("S", "split", "", "Split biflows into uniflows",
123130
[this](const char *arg){ m_split_biflow = true; return true;}, OptionFlags::NoArgument);
131+
register_option("fe", "frag-enable", "true|false", "Enable/disable fragmentation cache. Enabled (true) by default.",
132+
[this](const char *arg){
133+
if (strcmp(arg, "true") == 0) {
134+
m_enable_fragmentation_cache = true;
135+
} else if (strcmp(arg, "false") == 0) {
136+
m_enable_fragmentation_cache = false;
137+
} else {
138+
return false;
139+
}
140+
return true;
141+
}, OptionFlags::RequiredArgument);
142+
register_option("fs", "frag-size", "size", "Size of fragmentation cache, must be at least 1. Default value is 10007.", [this](const char *arg) {
143+
try {
144+
m_frag_cache_size = str2num<decltype(m_frag_cache_size)>(arg);
145+
} catch(std::invalid_argument &e) {
146+
return false;
147+
}
148+
return m_frag_cache_size > 0;
149+
});
150+
register_option("ft", "frag-timeout", "TIME", "Timeout of fragments in fragmentation cache in seconds. Default value is 3.", [this](const char *arg) {
151+
try {
152+
m_frag_cache_timeout = str2num<decltype(m_frag_cache_timeout)>(arg);
153+
} catch(std::invalid_argument &e) {
154+
return false;
155+
}
156+
return true;
157+
});
124158
}
125159
};
126160

@@ -177,12 +211,16 @@ class NHTFlowCache : public StoragePlugin
177211
uint32_t m_active;
178212
uint32_t m_inactive;
179213
bool m_split_biflow;
214+
bool m_enable_fragmentation_cache;
180215
uint8_t m_keylen;
181216
char m_key[MAX_KEY_LENGTH];
182217
char m_key_inv[MAX_KEY_LENGTH];
183218
FlowRecord **m_flow_table;
184219
FlowRecord *m_flow_records;
185220

221+
FragmentationCache m_fragmentation_cache;
222+
223+
void try_to_fill_ports_to_fragmented_packet(Packet& packet);
186224
void flush(Packet &pkt, size_t flow_index, int ret, bool source_flow);
187225
bool create_hash_key(Packet &pkt);
188226
void export_flow(size_t index);
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* \file
3+
* \author Pavel Siska <siska@cesnet.cz>
4+
* \author Jakub Antonín Štigler xstigl00@stud.fit.vut.cz
5+
* \brief Contains implementation of the FragmentationCache class for managing fragmented packet
6+
* data using a fragmentation table.
7+
*
8+
* The FragmentationCache class handles the processing and management of fragmented network packets.
9+
* It utilizes a fragmentation table to store and retrieve necessary data for processing fragmented
10+
* packets.
11+
*/
12+
/*
13+
* Copyright (C) 2023 CESNET
14+
*
15+
* LICENSE TERMS
16+
*
17+
* Redistribution and use in source and binary forms, with or without
18+
* modification, are permitted provided that the following conditions
19+
* are met:
20+
* 1. Redistributions of source code must retain the above copyright
21+
* notice, this list of conditions and the following disclaimer.
22+
* 2. Redistributions in binary form must reproduce the above copyright
23+
* notice, this list of conditions and the following disclaimer in
24+
* the documentation and/or other materials provided with the
25+
* distribution.
26+
* 3. Neither the name of the Company nor the names of its contributors
27+
* may be used to endorse or promote products derived from this
28+
* software without specific prior written permission.
29+
*/
30+
31+
#include "../xxhash.h"
32+
#include "fragmentationCache.hpp"
33+
#include "timevalUtils.hpp"
34+
35+
#include <cstring>
36+
37+
namespace ipxp {
38+
39+
FragmentationCache::FragmentationCache(std::size_t table_size, time_t timeout_in_seconds)
40+
: m_timeout({timeout_in_seconds, 0})
41+
, m_fragmentation_table(table_size)
42+
{
43+
}
44+
45+
void FragmentationCache::process_packet(Packet& packet)
46+
{
47+
if (!is_packet_fragmented(packet)) {
48+
return; // Packet is not fragmented, no further action needed.
49+
}
50+
process_fragmented_packet(packet);
51+
}
52+
53+
void FragmentationCache::process_fragmented_packet(Packet& packet) noexcept
54+
{
55+
if (is_packet_first_fragment(packet)) {
56+
m_fragmentation_table.insert(packet);
57+
} else {
58+
auto fragmentation_data = m_fragmentation_table.find(packet);
59+
if (fragmentation_data) {
60+
fill_missing_packet_data(packet, *fragmentation_data);
61+
}
62+
}
63+
}
64+
65+
void FragmentationCache::fill_missing_packet_data(
66+
Packet& packet,
67+
const FragmentationData& fragmentation_data) noexcept
68+
{
69+
if (!is_fragmentation_data_timedouted(packet, fragmentation_data)) {
70+
fill_ports_to_packet(packet, fragmentation_data);
71+
}
72+
}
73+
74+
bool FragmentationCache::is_fragmentation_data_timedouted(
75+
const Packet& packet,
76+
const FragmentationData& data) const noexcept
77+
{
78+
return packet.ts > data.timestamp + m_timeout;
79+
}
80+
81+
void FragmentationCache::fill_ports_to_packet(
82+
Packet& packet,
83+
const FragmentationData& fragmentation_data) const noexcept
84+
{
85+
packet.src_port = fragmentation_data.source_port;
86+
packet.dst_port = fragmentation_data.destination_port;
87+
}
88+
89+
} // namespace ipxp

0 commit comments

Comments
 (0)