Skip to content

Commit

Permalink
Fixed invalid flow risk aggregation in collectd/influxd examples.
Browse files Browse the repository at this point in the history
 * CI: build single nDPId executable with `-Wall -Wextra -std=gnu99`
 * fixed missing error events in influxd example
 * added additional test cases for collectd
 * extended grafana dashboard

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
  • Loading branch information
utoni committed Jan 6, 2024
1 parent 876aef9 commit a007a90
Show file tree
Hide file tree
Showing 222 changed files with 6,097 additions and 3,793 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ jobs:
- name: Build single nDPId executable (invoke CC directly)
if: (endsWith(matrix.compiler, 'gcc') || endsWith(matrix.compiler, 'clang')) && startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON')
run: |
cc -fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak nDPId.c nio.c utils.c -I./build/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz
cc -Wall -Wextra -std=gnu99 -fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak nDPId.c nio.c utils.c -I./build/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz
- name: Test EXEC
run: |
./build/nDPId-test
Expand Down
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ build_and_test_static_libndpi:
- >
if ldd ./build-cmake-submodule/nDPId | grep -qoEi libndpi; then \
echo 'nDPId linked against a static libnDPI should not contain a shared linked libnDPI.' >&2; false; fi
- cc nDPId.c nio.c utils.c -I./build-cmake-submodule/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build-cmake-submodule/libnDPI/lib/libndpi.a -pthread -lm -lz
- cc -Wall -Wextra -std=gnu99 nDPId.c nio.c utils.c -I./build-cmake-submodule/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build-cmake-submodule/libnDPI/lib/libndpi.a -pthread -lm -lz
artifacts:
expire_in: 1 week
paths:
Expand Down
127 changes: 62 additions & 65 deletions examples/c-collectd/c-collectd.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ static struct
uint64_t flow_guessed_count;
uint64_t flow_not_detected_count;

nDPIsrvd_ull flow_risk_count[NDPI_MAX_RISK - 1];
nDPIsrvd_ull flow_risk_count[NDPI_MAX_RISK - 1 /* NDPI_NO_RISK */];
nDPIsrvd_ull flow_risk_unknown_count;
} gauges[2];
} collectd_statistics = {};
Expand Down Expand Up @@ -739,7 +739,7 @@ static void print_collectd_exec_output(void)
COLLECTD_GAUGE_N(flow_guessed_count),
COLLECTD_GAUGE_N(flow_not_detected_count));

for (i = 0; i < NDPI_MAX_RISK - 1; ++i)
for (i = 0; i < NDPI_MAX_RISK - 1 /* NDPI_NO_RISK */; ++i)
{
char gauge_name[BUFSIZ];
snprintf(gauge_name, sizeof(gauge_name), "flow_risk_%zu_count", i + 1);
Expand Down Expand Up @@ -832,7 +832,7 @@ static void print_collectd_exec_output(void)
COLLECTD_STATS_GAUGE_SUB(flow_guessed_count);
COLLECTD_STATS_GAUGE_SUB(flow_not_detected_count);

for (size_t i = 0; i < NDPI_MAX_RISK - 1; ++i)
for (size_t i = 0; i < NDPI_MAX_RISK - 1 /* NDPI_NO_RISK */; ++i)
{
COLLECTD_STATS_GAUGE_SUB(flow_risk_count[i]);
}
Expand Down Expand Up @@ -1219,95 +1219,92 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
{
size_t numeric_risk_len = 0;
char const * const numeric_risk_str = TOKEN_GET_KEY(sock, current, &numeric_risk_len);
nDPIsrvd_ull numeric_risk_value = (nDPIsrvd_ull)-1;
nDPIsrvd_ull numeric_risk_value = 0;
char numeric_risk_buf[numeric_risk_len + 1];

if (numeric_risk_len > 0 && numeric_risk_str != NULL)
if (numeric_risk_len == 0 || numeric_risk_str == NULL)
{
strncpy(numeric_risk_buf, numeric_risk_str, numeric_risk_len);
numeric_risk_buf[numeric_risk_len] = '\0';
logger(1, "%s", "Missing numeric risk value");
continue;
}

struct nDPIsrvd_json_token const * const severity =
TOKEN_GET_SZ(sock, "ndpi", "flow_risk", numeric_risk_buf, "severity");
uint8_t severity_index;
strncpy(numeric_risk_buf, numeric_risk_str, numeric_risk_len);
numeric_risk_buf[numeric_risk_len] = '\0';

if (collectd_map_flow_u8(
sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map), &severity_index) != 0)
{
severity_index = 0;
}
struct nDPIsrvd_json_token const * const severity =
TOKEN_GET_SZ(sock, "ndpi", "flow_risk", numeric_risk_buf, "severity");
uint8_t severity_index;

if (collectd_map_flow_u8(
sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map), &severity_index) != 0)
{
severity_index = 0;
}

if (severity_index != 0)
if (severity_index != 0)
{
for (uint8_t i = 0; i < MAX_SEVERITIES_PER_FLOW; ++i)
{
for (uint8_t i = 0; i < MAX_SEVERITIES_PER_FLOW; ++i)
if (flow_user_data->severities[i] != 0)
{
continue;
}
if (flow_user_data->severities[i] == severity_index)
{
if (flow_user_data->severities[i] != 0)
{
continue;
}
if (flow_user_data->severities[i] == severity_index)
{
break;
}

if (collectd_map_value_to_stat(
sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map)) != 0)
{
severity_index = 0;
break;
}
flow_user_data->severities[i] = severity_index;
break;
}
}
if (severity_index == 0)
{
size_t value_len = 0;
char const * const value_str = TOKEN_GET_VALUE(sock, severity, &value_len);

if (value_len > 0 && value_str != NULL)
if (collectd_map_value_to_stat(
sock, severity, severity_map, nDPIsrvd_ARRAY_LENGTH(severity_map)) != 0)
{
logger(1,
"Unknown/Invalid JSON value for key 'ndpi','breed': %.*s",
(int)value_len,
value_str);
severity_index = 0;
break;
}
flow_user_data->severities[i] = severity_index;
break;
}
}

if (severity_index == 0)
{
size_t value_len = 0;
char const * const value_str = TOKEN_GET_VALUE(sock, severity, &value_len);

if (str_value_to_ull(numeric_risk_str, &numeric_risk_value) == CONVERSION_OK)
if (value_len > 0 && value_str != NULL)
{
if (numeric_risk_value < NDPI_MAX_RISK && numeric_risk_value > 0)
logger(1, "Unknown/Invalid JSON value for key 'ndpi','breed': %.*s", (int)value_len, value_str);
}
}

if (str_value_to_ull(numeric_risk_str, &numeric_risk_value) == CONVERSION_OK)
{
if (numeric_risk_value < NDPI_MAX_RISK && numeric_risk_value > 0)
{
for (uint8_t i = 0; i < MAX_RISKS_PER_FLOW; ++i)
{
for (uint8_t i = 0; i < MAX_RISKS_PER_FLOW; ++i)
if (flow_user_data->risks[i] != 0)
{
continue;
}
if (flow_user_data->risks[i] == numeric_risk_value - 1)
{
if (flow_user_data->risks[i] != 0)
{
continue;
}
if (flow_user_data->risks[i] == numeric_risk_value)
{
break;
}

COLLECTD_STATS_GAUGE_INC(flow_risk_count[numeric_risk_value]);
flow_user_data->risks[i] = numeric_risk_value;
break;
}
}
else if (flow_user_data->risk_ndpid_invalid == 0)
{
flow_user_data->risk_ndpid_invalid = 1;
COLLECTD_STATS_GAUGE_INC(flow_risk_unknown_count);

COLLECTD_STATS_GAUGE_INC(flow_risk_count[numeric_risk_value - 1]);
flow_user_data->risks[i] = numeric_risk_value - 1;
break;
}
}
else
else if (flow_user_data->risk_ndpid_invalid == 0)
{
logger(1, "Invalid numeric risk value: %s", numeric_risk_buf);
flow_user_data->risk_ndpid_invalid = 1;
COLLECTD_STATS_GAUGE_INC(flow_risk_unknown_count);
}
}
else
{
logger(1, "%s", "Missing numeric risk value");
logger(1, "Invalid numeric risk value: %s", numeric_risk_buf);
}
}
}
Expand Down
18 changes: 5 additions & 13 deletions examples/c-collectd/rrdgraph.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,11 @@ rrdtool_graph() {
}

rrdtool_graph Flows Amount "${OUTDIR}/flows" \
DEF:flows_new=${RRDDIR}/counter-flow_new_count.rrd:value:AVERAGE \
DEF:flows_end=${RRDDIR}/counter-flow_end_count.rrd:value:AVERAGE \
DEF:flows_idle=${RRDDIR}/counter-flow_idle_count.rrd:value:AVERAGE \
$(rrdtool_graph_colorize_missing_data flows_new) \
AREA:flows_new#54EC48::STACK \
AREA:flows_end#ECD748::STACK \
AREA:flows_idle#EC9D48::STACK \
LINE2:flows_new#24BC14:"New." \
$(rrdtool_graph_print_cur_min_max_avg flows_new) \
LINE2:flows_end#C9B215:"End." \
$(rrdtool_graph_print_cur_min_max_avg flows_end) \
LINE2:flows_idle#CC7016:"Idle" \
$(rrdtool_graph_print_cur_min_max_avg flows_idle)
DEF:flows_active=${RRDDIR}/gauge-flow_active_count.rrd:value:AVERAGE \
$(rrdtool_graph_colorize_missing_data flows_active) \
AREA:flows_active#54EC48::STACK \
LINE2:flows_active#24BC14:"Active" \
$(rrdtool_graph_print_cur_min_max_avg flows_active)
rrdtool_graph Detections Amount "${OUTDIR}/detections" \
DEF:flows_detected=${RRDDIR}/gauge-flow_detected_count.rrd:value:AVERAGE \
DEF:flows_guessed=${RRDDIR}/gauge-flow_guessed_count.rrd:value:AVERAGE \
Expand Down
25 changes: 15 additions & 10 deletions examples/c-influxd/c-influxd.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ static struct
uint64_t flow_guessed_count;
uint64_t flow_not_detected_count;

nDPIsrvd_ull flow_risk_count[NDPI_MAX_RISK - 1];
nDPIsrvd_ull flow_risk_count[NDPI_MAX_RISK - 1 /* NDPI_NO_RISK */];
nDPIsrvd_ull flow_risk_unknown_count;
} gauges[2]; /* values after InfluxDB push: gauges[0] -= gauges[1], gauges[1] is zero'd afterwards */
} influxd_statistics = {.rw_lock = PTHREAD_MUTEX_INITIALIZER};
Expand Down Expand Up @@ -388,7 +388,8 @@ static int serialize_influx_line(char * buf, size_t siz)
INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT()
INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT()
INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT()
INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT_END(),
INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT() INFLUXDB_FORMAT()
INFLUXDB_FORMAT() INFLUXDB_FORMAT_END(),
"events",
INFLUXDB_VALUE_COUNTER(flow_new_count),
INFLUXDB_VALUE_COUNTER(flow_end_count),
Expand Down Expand Up @@ -419,7 +420,10 @@ static int serialize_influx_line(char * buf, size_t siz)
INFLUXDB_VALUE_COUNTER(error_ip6_size_smaller_than_header),
INFLUXDB_VALUE_COUNTER(error_ip6_l4_payload_detection),
INFLUXDB_VALUE_COUNTER(error_tcp_packet_too_short),
INFLUXDB_VALUE_COUNTER(error_udp_packet_too_short));
INFLUXDB_VALUE_COUNTER(error_udp_packet_too_short),
INFLUXDB_VALUE_COUNTER(error_capture_size_smaller_than_packet),
INFLUXDB_VALUE_COUNTER(error_max_flows_to_track),
INFLUXDB_VALUE_COUNTER(error_flow_memory_alloc));
CHECK_SNPRINTF_RET(bytes);

bytes = snprintf(buf,
Expand Down Expand Up @@ -564,7 +568,7 @@ static int serialize_influx_line(char * buf, size_t siz)
bytes = snprintf(buf, siz, "%s " INFLUXDB_FORMAT(), "risks", INFLUXDB_VALUE_GAUGE(flow_risk_unknown_count));
CHECK_SNPRINTF_RET(bytes);

for (size_t i = 0; i < NDPI_MAX_RISK - 1; ++i)
for (size_t i = 0; i < NDPI_MAX_RISK - 1 /* NDPI_NO_RISK */; ++i)
{
bytes = snprintf(buf,
siz,
Expand Down Expand Up @@ -664,7 +668,7 @@ static int serialize_influx_line(char * buf, size_t siz)
INFLUXD_STATS_GAUGE_SUB(flow_guessed_count);
INFLUXD_STATS_GAUGE_SUB(flow_not_detected_count);

for (size_t i = 0; i < NDPI_MAX_RISK - 1; ++i)
for (size_t i = 0; i < NDPI_MAX_RISK - 1 /* NDPI_NO_RISK */; ++i)
{
INFLUXD_STATS_GAUGE_SUB(flow_risk_count[i]);
}
Expand Down Expand Up @@ -1086,7 +1090,7 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
{
size_t numeric_risk_len = 0;
char const * const numeric_risk_str = TOKEN_GET_KEY(sock, current, &numeric_risk_len);
nDPIsrvd_ull numeric_risk_value = (nDPIsrvd_ull)-1;
nDPIsrvd_ull numeric_risk_value = 0;
char numeric_risk_buf[numeric_risk_len + 1];

if (numeric_risk_len > 0 && numeric_risk_str != NULL)
Expand Down Expand Up @@ -1151,13 +1155,13 @@ static void process_flow_stats(struct nDPIsrvd_socket * const sock, struct nDPIs
{
continue;
}
if (flow_user_data->risks[i] == numeric_risk_value)
if (flow_user_data->risks[i] == numeric_risk_value - 1)
{
break;
}

INFLUXD_STATS_GAUGE_INC(flow_risk_count[numeric_risk_value]);
flow_user_data->risks[i] = numeric_risk_value;
INFLUXD_STATS_GAUGE_INC(flow_risk_count[numeric_risk_value - 1]);
flow_user_data->risks[i] = numeric_risk_value - 1;
break;
}
}
Expand Down Expand Up @@ -1545,10 +1549,11 @@ static int parse_options(int argc, char ** argv, struct nDPIsrvd_socket * const

static void sighandler(int signum)
{
(void)signum;
logger(0, "Received SIGNAL %d", signum);

if (main_thread_shutdown == 0)
{
logger(0, "%s", "Shutting down ..");
main_thread_shutdown = 1;
}
}
Expand Down
Loading

0 comments on commit a007a90

Please sign in to comment.