-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathcheckfritz.py
executable file
·146 lines (113 loc) · 4.9 KB
/
checkfritz.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/opt/bin/python3
from fritzconnection import FritzConnection
import sys
FRITZBOX_USER = "monitoring"
FRITZBOX_PASSWORD = "xxx"
try:
fc = FritzConnection(address='192.168.178.1', user=FRITZBOX_USER, password=FRITZBOX_PASSWORD, timeout=2.0)
except BaseException:
print("Cannot connect to fritzbox.")
sys.exit(1)
def readout(module, action, variable=None, show=False, numeric=True):
'''
Generic readout function, that wraps values in a json-compliant way.
:module: TR-064 sub-modules, such as 'WANIPConn1'
:action: Calls an action, e.g. 'GetStatusInfo', as defined by TR-04 (cf. https://avm.de/service/schnittstellen/)
:variable: (optional) a specific variable out of this set to extract
:show: print variable name
:numeric: cast value to numeric
'''
try:
answer_dict = fc.call_action(module, action)
except BaseException:
print(f"Could not query {module} with action {action}")
raise
# cast the 64 bit traffic counters into int
if action == "GetAddonInfos":
answer_dict['NewX_AVM_DE_TotalBytesSent64'] = int(answer_dict['NewX_AVM_DE_TotalBytesSent64'])
answer_dict['NewX_AVM_DE_TotalBytesReceived64'] = int(answer_dict['NewX_AVM_DE_TotalBytesReceived64'])
if variable:
# single variable extraction mode
answer_dict = str(answer_dict[variable])
# FIXME: try type-conversion to int, then fallback to string.
if not numeric:
answer_dict = '"' + answer_dict + '"'
if show:
answer_dict = '"' + variable + '": ' + answer_dict
else:
# remove unwanted keys in a safe way
entitiesToRemove = ('NewAllowedCharsSSID', 'NewDNSServer1', 'NewDNSServer2', 'NewVoipDNSServer1',
'NewVoipDNSServer2', 'NewATURVendor', 'NewATURCountry', 'NewDeviceLog')
entitiesToRemove = [answer_dict.pop(k, None) for k in entitiesToRemove]
# cast to string, omit the {} without a regex :)
answer_dict = str(answer_dict)[1:-1]
# handle stupid naming of counters in LAN, so we can use grouping in grafana...
answer_dict = answer_dict.replace("NewBytes", "NewTotalBytes")
answer_dict = answer_dict.replace("NewPackets", "NewTotalPackets")
# ugly string-cast to a dictionary that has json compliant "
flattened_string = answer_dict.replace("'", '"').replace("True", "true").replace("False", "false")
return flattened_string
def assemble(*args):
# ugly hack json array constructor.
json_dict = '\t "v": {' + ', '.join(list(args)) + "}"
print(json_dict)
def add_device_tag(starting=False):
if starting:
comma = ""
else:
comma = ","
print('\t' + comma + '{"box": "' + deviceinfo + '",')
def add_interface_tag(interface):
print('\t"interface": "' + interface + '",')
def end_device():
print('\t}')
#############
# tag every meaurement by fritzbox serial number
deviceinfo = readout('DeviceInfo1', 'GetInfo', 'NewSerialNumber')
# list of measurements - so telegraf puts them in separate lines
print('[')
# box generic info
add_device_tag(starting=True)
uptime = readout('DeviceInfo1', 'GetInfo', 'NewUpTime', show=True)
version = readout('DeviceInfo1', 'GetInfo', 'NewDescription', show=True, numeric=False)
dhcp_leases = readout('Hosts1', 'GetHostNumberOfEntries', show=True)
assemble(uptime, version, dhcp_leases)
end_device()
# tag list by box & interface
add_device_tag()
add_interface_tag("wan")
status = readout('WANIPConn1', 'GetStatusInfo')
link = readout('WANCommonIFC1', 'GetCommonLinkProperties')
my_ip = readout('WANIPConn', 'GetExternalIPAddress')
my_ipv6 = readout('WANIPConn', 'X_AVM_DE_GetExternalIPv6Address', 'NewExternalIPv6Address', show=True, numeric=False)
# my_ipv6_prefix = readout('WANIPConn','X_AVM_DE_GetIPv6Prefix','NewIPv6Prefix', show=False)+"/"+
# readout('WANIPConn','X_AVM_DE_GetIPv6Prefix','NewPrefixLength', show=False)
info = readout('WANDSLInterfaceConfig1', 'GetInfo')
traffic = readout('WANCommonIFC1', 'GetAddonInfos')
assemble(status, link, my_ip, my_ipv6, info, traffic)
end_device()
# check dect
add_device_tag()
add_interface_tag("dect")
registered = readout('X_AVM-DE_Dect1', 'GetNumberOfDectEntries')
assemble(registered)
end_device()
# check voip
add_device_tag()
add_interface_tag("voip")
registered = readout('X_VoIP1', 'X_AVM-DE_GetNumberOfNumbers')
assemble(registered)
end_device()
# tag list for the other networks
for i, interface in enumerate(['lan', 'wlan24', 'wlan5', 'wlanGuest']):
add_device_tag()
add_interface_tag(interface)
if i == 0:
assemble(readout('LANEthernetInterfaceConfig1', 'GetStatistics'))
else:
stats = readout('WLANConfiguration'+str(i), 'GetStatistics')
associations = readout('WLANConfiguration'+str(i), 'GetTotalAssociations')
info = readout('WLANConfiguration'+str(i), 'GetInfo', "NewChannel", show=True)
assemble(stats, associations, info)
end_device()
print("]")