8
8
from .state import subscriptions
9
9
10
10
LISTEN_PORT = int (os .getenv ('LISTEN_PORT' , '2055' ))
11
- FORWARD_HOST = os .getenv ('FORWARD_HOST' , 'host.docker.internal ' )
11
+ FORWARD_HOST = os .getenv ('FORWARD_HOST' , '127.0.0.1 ' )
12
12
FORWARD_PORTS = os .getenv ('FORWARD_PORTS' )
13
13
FORWARD = [
14
14
(FORWARD_HOST , pt )
15
15
for pt in map (int , FORWARD_PORTS .split (',' ))
16
16
] if FORWARD_PORTS else []
17
17
assert all (0 < pt < 65536 and pt != LISTEN_PORT for _ , pt in FORWARD )
18
+ assert len (FORWARD ) == len (set (FORWARD ))
18
19
19
20
COMMON_HEADER_FMT = '>HHL'
20
21
COMMON_HEADER_SZ = 8
21
22
22
23
23
24
class ServerProtocol (asyncio .Protocol ):
24
25
26
+ def __init__ (self ):
27
+ super ().__init__ ()
28
+ self .log_unsupported_version = 0
29
+ self .log_failed_to_forward = set ()
30
+
25
31
def connection_made (self , transport ):
26
32
self .transport = transport
27
33
@@ -36,16 +42,18 @@ def datagram_received(self, data, addr):
36
42
) = struct .unpack (COMMON_HEADER_FMT , data [:COMMON_HEADER_SZ ])
37
43
38
44
if version not in (5 , 9 , 10 ):
39
- # TODO 10 times
40
- logging .warning ('unsupported netflow version' )
45
+ if self .log_unsupported_version < 10 :
46
+ self .log_unsupported_version += 1
47
+ logging .warning ('unsupported netflow version' )
41
48
return
42
49
43
50
for dest in FORWARD :
44
51
try :
45
52
self .transport .sendto (data , dest )
46
53
except Exception :
47
- # TODO 1 time / conn
48
- logging .warning ('failed to forward package' )
54
+ if dest not in self .log_failed_to_forward :
55
+ self .log_failed_to_forward .add (dest )
56
+ logging .warning (f'failed to forward package to { dest } ' )
49
57
50
58
# v5 has no templates so could be ignored when no checks are listening
51
59
if version == 5 and not subscriptions :
0 commit comments