
Commit

multiple exporters and multiple observation domains
Koos85 committed Aug 20, 2024
1 parent 632b4c9 commit 78f153a
Showing 8 changed files with 80 additions and 31 deletions.
23 changes: 12 additions & 11 deletions lib/ipflow/flow.py
@@ -1,17 +1,17 @@
 import struct
 from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
-from typing import Iterator, Union
+from typing import Dict, Iterator, Tuple, Union
 from .field_type import FIELD_TYPE_FUNC
 from .field_type import FieldType
+from .template import DataTemplate
 
 
-# negative values don't collide with normal template ID's (uint)
-V5_TEMPLATE_ID = -1
+V5_TEMPLATE_KEY = None, None, None
 V5_TEMPLATE_FMT = '>4s4s4sHHLLLLHH2sBBB3s4s'
 V5_TEMPLATE_SIZE = struct.calcsize(V5_TEMPLATE_FMT)
 
-flowset_templates = {
-    V5_TEMPLATE_ID: (
+flowset_templates: Dict[Tuple[str, int, int], DataTemplate] = {
+    V5_TEMPLATE_KEY: DataTemplate(
         V5_TEMPLATE_FMT,
         V5_TEMPLATE_SIZE,
         [],
@@ -34,22 +34,23 @@
             None, # 3 byte padding
             None, # reserved
         ],
+        0, # uptime not used for v5 packets
     )
 }
 
 
 class Flow:
     __slots__ = (
-        'flowset_id',
+        'template',
         'values',
     )
 
-    def __init__(self, flowset_id, values):
-        self.flowset_id = flowset_id
+    def __init__(self, template: DataTemplate, values: Tuple[bytes]):
+        self.template = template
         self.values = values
 
     def serialize(self) -> dict:
-        _, _, fields, _ = flowset_templates[self.flowset_id]
+        fields = self.template.fields
         return {
             f.name: FIELD_TYPE_FUNC.get(f.id, lambda val: val)(val)
             for f, val in zip(fields, self.values)
@@ -59,7 +60,7 @@ def test_address(
         self,
         address: Union[IPv4Address, IPv6Address]
     ) -> Iterator[Union[IPv4Address, IPv6Address]]:
-        _, _, _, fields_idx = flowset_templates[self.flowset_id]
+        fields_idx = self.template.index
         for ft in (
             FieldType.IPV4_DST_ADDR,
             FieldType.IPV4_NEXT_HOP,
@@ -84,7 +85,7 @@ def test_network(
         self,
         network: Union[IPv4Network, IPv6Network]
     ) -> Iterator[Union[IPv4Network, IPv6Network]]:
-        _, _, _, fields_idx = flowset_templates[self.flowset_id]
+        fields_idx = self.template.index
         for ft in (
             FieldType.IPV4_DST_ADDR,
             FieldType.IPV4_NEXT_HOP,
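
Note on the new keying (illustration only, not part of the commit): flowset_templates is now indexed by a (source address, source/observation-domain ID, template ID) tuple instead of a bare template ID, and each Flow carries a reference to its DataTemplate, so serialize(), test_address() and test_network() no longer look the template up by ID. The addresses and IDs below are made up.

    # the same template ID announced by two different exporters no longer collides
    key_a = ('192.0.2.1', 0, 256)        # (exporter address, source ID, template ID)
    key_b = ('198.51.100.7', 0, 256)     # same template ID, different exporter
    assert key_a != key_b                # both entries can coexist in flowset_templates
    V5_TEMPLATE_KEY = None, None, None   # static key reserved for NetFlow v5 records
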
40 changes: 32 additions & 8 deletions lib/ipflow/flowset.py
@@ -1,29 +1,53 @@
 import struct
 from .field import Field
 from .flow import Flow, flowset_templates
+from .template import DataTemplate
 
 
-def on_flowset_template(line: bytes):
+def on_flowset_template(
+    line: bytes,
+    source: str,
+    source_id: int,
+    source_uptime: int,
+):
     pos = 0
     while pos < len(line):
         template_id, field_count = struct.unpack('>HH', line[pos:pos+4])
+        # rfc 3954 5.1
+        # NetFlow Collectors SHOULD use the combination of the source IP
+        # address and the Source ID field to separate different export
+        # streams originating from the same Exporter.
+        key = source, source_id, template_id
+        template = flowset_templates.get(key)
+        if template and template.source_uptime < source_uptime:
+            pos += 4 + field_count * 4
+            continue
+
         fields = [
             Field(*struct.unpack('>HH', line[i:i+4]))
             for i in range(pos + 4, pos + 4 + field_count * 4, 4)
         ]
 
         pos += 4 + field_count * 4
-        flowset_templates[template_id] = (
+        flowset_templates[key] = DataTemplate(
             '>' + ''.join(f._fmt for f in fields),
             sum(f.length for f in fields),
             fields,
             [f.id for f in fields],
+            source_uptime,
         )
 
 
-def on_flowset(line: bytes, flowset_id: int):
-    if flowset_id in flowset_templates:
-        fmt, length, _, _ = flowset_templates[flowset_id]
-        for i in range(0, len(line) - 4, length):
-            values = struct.unpack(fmt, line[i:i+length])
-            yield Flow(flowset_id, values)
+def on_flowset(
+    line: bytes,
+    flowset_id: int,
+    source: str,
+    source_id: int,
+):
+    key = source, source_id, flowset_id
+    template = flowset_templates.get(key)
+    if template:
+        fmt = template._fmt
+        pad = len(line) % template.length
+        for values in struct.iter_unpack(fmt, line[:-pad] if pad else line):
+            yield Flow(template, values)
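
A minimal end-to-end sketch of how the reworked functions are meant to be driven (not part of the commit). It assumes lib/ is on the import path and that the Field and FIELD_TYPE_FUNC helpers render a 4-byte address field the way the v5 template format suggests; the exporter address, source ID, uptime and field layout are invented, with field types 8 and 12 being the standard NetFlow v9 IPv4 source/destination address fields.

    import struct
    from ipflow.flowset import on_flowset, on_flowset_template

    exporter, source_id, uptime = '192.0.2.1', 0, 1000

    # template flowset: template 256 with two 4-byte fields (types 8 and 12)
    template_record = struct.pack('>HHHHHH', 256, 2, 8, 4, 12, 4)
    on_flowset_template(template_record, exporter, source_id, uptime)

    # data flowset for template 256: one record, 10.0.0.1 -> 10.0.0.2
    record = bytes([10, 0, 0, 1, 10, 0, 0, 2])
    for flow in on_flowset(record, 256, exporter, source_id):
        print(flow.serialize())   # decoded per FIELD_TYPE_FUNC

The same data flowset arriving from a different exporter address or observation domain resolves to a different key and is only decoded once that exporter has announced its own template.
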
6 changes: 3 additions & 3 deletions lib/ipflow/parser.py
@@ -7,7 +7,7 @@
 HEADER_SIZE = struct.calcsize(HEADER_FMT)
 
 
-def on_packet(line: bytes):
+def on_packet(line: bytes, source: str):
     (
         version,
         count,
@@ -33,13 +33,13 @@ def on_packet(line: bytes):
 
         if flowset_id == 0:
             try:
-                on_flowset_template(flowset)
+                on_flowset_template(flowset, source, source_id, sysuptime)
             except Exception:
                 logging.error('failed to parse FlowSet template')
                 break
         elif flowset_id > 255:
             try:
-                for flow in on_flowset(flowset, flowset_id):
+                for flow in on_flowset(flowset, flowset_id, source, source_id):
                     yield flow
             except Exception:
                 logging.warning('failed to parse FlowSet')
8 changes: 5 additions & 3 deletions lib/ipflow/parser_v10.py
@@ -7,7 +7,7 @@
 HEADER_SIZE = struct.calcsize(HEADER_FMT)
 
 
-def on_packet_v10(line: bytes):
+def on_packet_v10(line: bytes, source: str):
     (
         version,
         message_length,
@@ -30,13 +30,15 @@ def on_packet_v10(line: bytes):
 
         if flowset_id == 2:
             try:
-                on_flowset_template(flowset)
+                on_flowset_template(flowset, source, observation_domain_id,
+                                    export_time)
             except Exception:
                 logging.error('failed to parse FlowSet template')
                 break
         elif flowset_id > 255:
             try:
-                for flow in on_flowset(flowset, flowset_id):
+                for flow in on_flowset(flowset, flowset_id, source,
+                                       observation_domain_id):
                     yield flow
             except Exception:
                 logging.warning('failed to parse FlowSet')
4 changes: 2 additions & 2 deletions lib/ipflow/parser_v5.py
@@ -1,7 +1,7 @@
 import logging
 import struct
 from .flowset import on_flowset
-from .flow import V5_TEMPLATE_ID, V5_TEMPLATE_SIZE
+from .flow import V5_TEMPLATE_KEY, V5_TEMPLATE_SIZE
 
 
 HEADER_FMT = '>HHLLLLBBH'
@@ -28,7 +28,7 @@ def on_packet_v5(line: bytes):
         pos += flowset_size
 
         try:
-            for flow in on_flowset(flowset, V5_TEMPLATE_ID):
+            for flow in on_flowset(flowset, *V5_TEMPLATE_KEY):
                 yield flow
         except Exception:
             logging.warning('failed to parse FlowSet')
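
Since V5_TEMPLATE_KEY is the tuple (None, None, None), the starred call expands to on_flowset(flowset, None, None, None), so v5 records always resolve to the statically registered v5 template regardless of which exporter sent them. A quick illustration (not part of the commit):

    flowset_id, source, source_id = None, None, None   # what *V5_TEMPLATE_KEY passes
    key = source, source_id, flowset_id                # rebuilt inside on_flowset
    assert key == (None, None, None)                   # the pre-registered v5 entry
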
20 changes: 20 additions & 0 deletions lib/ipflow/template.py
@@ -0,0 +1,20 @@
+from typing import List
+from .field import Field
+
+
+class DataTemplate:
+    __slots__ = (
+        '_fmt',
+        'length',
+        'fields',
+        'index',
+        'source_uptime',
+    )
+
+    def __init__(self, fmt: str, length: int, fields: List[Field],
+                 index: List[int], source_uptime: int):
+        self._fmt = fmt
+        self.fields = fields
+        self.index = index
+        self.length = length
+        self.source_uptime = source_uptime
2 changes: 1 addition & 1 deletion lib/server.py
@@ -40,7 +40,7 @@ def datagram_received(self, data, addr):
         }[version]
         # parse every packet regardless of any subscriptions
         # we need the template flowsets
-        for flow in parser(data):
+        for flow in parser(data, addr[0]):
             for subs in subscriptions.values():
                 subs.on_flow(flow)
 
8 changes: 5 additions & 3 deletions lib/subscription.py
@@ -13,11 +13,13 @@ class Subscription(NamedTuple):
     def make(cls, address: Union[IPv4Address, IPv6Address]):
         self = cls(
             address=address,
-            result={},
+            result=[],
             timestamp=int(time.time()),
         )
         return self
 
     def on_flow(self, flow: Flow):
-        if flow.test_address(self.address):
-            self.append(flow)
+        # compare the raw address bytes against all parsed fields regardless of
+        # field type
+        if self.address.packed in flow.values:
+            self.result.append(flow)
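
The subscription no longer goes through Flow.test_address(): result is now a list, matching flows are appended directly, and the match itself is a raw-bytes comparison. IPv4Address.packed (or IPv6Address.packed) yields exactly the bytes NetFlow carries on the wire, so a membership test against the undecoded field values finds the address in any address field without consulting the template. A small illustration with made-up values (not part of the commit):

    from ipaddress import IPv4Address

    values = (b'\n\x00\x00\x01', b'\n\x00\x00\x02', 2048)   # hypothetical flow.values
    addr = IPv4Address('10.0.0.2')
    assert addr.packed == b'\n\x00\x00\x02'                 # raw 4-byte form of 10.0.0.2
    assert addr.packed in values                            # matches src, dst or next hop alike
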
