-
Notifications
You must be signed in to change notification settings - Fork 1
/
telekom-verfuegbarkeit
executable file
·136 lines (115 loc) · 4.26 KB
/
telekom-verfuegbarkeit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import urllib.request, urllib.parse, urllib.error
import sys
import json
import argparse
from datetime import datetime
import re
from time import sleep
from influxdb import line_protocol
MEASUREMENT_NAME = 'telekom_verfuegbarkeit'
SESSION_URL = 'https://ebs01.telekom.de/acproxy/usesCaptcha.do?source=verfuegbarkeit'
API_URL = 'https://ebs01.telekom.de/acproxy/ace.do'
USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36'
MAX_RETRIES = 10
RETRY_WAIT = 3
TIMEOUT = 30
def get_session_id():
req = urllib.request.Request(SESSION_URL)
res = urllib.request.urlopen(req, None, TIMEOUT)
headers = list(res.info().items())
session_id = ""
for header in headers:
if header[0] and header[0].lower() == 'set-cookie':
m = re.search('JSESSIONID=(.*?);', header[1])
session_id = m.group(1)
break
return session_id
def get_verfuegbarkeit(strasse, hausnummer, plz, ort, ortsteil='', hausnummerzusatz=''):
data = {
# configurable params
'plz': plz,
'strasse': strasse,
'hausnummer': hausnummer,
'hausnummerzusatz': hausnummerzusatz,
'ort': ort,
'ortsteil': ortsteil,
# hard-coded params
'captchacode': '',
'source': 'verfuegbarkeit',
'ausbauinformationen': 'true',
'wholebuy': 'true',
'empfehlung': 'true',
'ausbaufiber': 'true',
'ausbautmagic': 'true',
'geschaeftsfall': '2000',
'geschaeftsfallVespa': 'BESTANDSKUNDE_AENDERN',
'geschaeftsfallPom': 'UPGRADE',
'komplettwechsel': 'false',
'homespassed': 'true'
}
res = None
for i in range(MAX_RETRIES+1):
try:
headers = {
'User-Agent': USER_AGENT,
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
'Cookie': 'JSESSIONID=' + get_session_id()
}
req = urllib.request.Request(API_URL, headers=headers, data=urllib.parse.urlencode(data).encode("utf-8"))
res = json.loads(urllib.request.urlopen(req).read().decode('utf-8'))
except Exception as e:
eprint("Error: %s" % e)
if i < MAX_RETRIES:
eprint("Will do %i. retry in %i sec(s)..." % (i+1, RETRY_WAIT ))
sleep(RETRY_WAIT)
else:
eprint("Maximum number of retries exceeded, giving up")
continue
break
if not res: return None
return {
"max_downstream": res["maxDownstream"],
"max_upstream": res["maxUpstream"]
}
def get_data_points(address, verfuegbarkeit):
return {
"points": [{
"measurement": MEASUREMENT_NAME,
"tags": {
"address": address
},
"fields": {
"max_upstream": verfuegbarkeit['max_upstream'],
"max_downstream": verfuegbarkeit['max_downstream']
},
"time": get_current_utc_time()
}]
}
def get_current_utc_time():
return datetime.utcnow().replace(microsecond=0).isoformat()
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--strasse", help="Straße")
parser.add_argument("--hausnummer", help="Hausnummer")
parser.add_argument("--hausnummerzusatz", help="Hausnummerzusatz", default='')
parser.add_argument("--plz", help="PLZ")
parser.add_argument("--ort", help="Ort")
parser.add_argument("--ortsteil", help="Ortsteil", default='')
args = parser.parse_args()
if not args.strasse or not args.hausnummer or not args.plz or not args.ort:
eprint("Missing required parameter(s)")
exit(1)
verfuegbarkeit = get_verfuegbarkeit(args.strasse, args.hausnummer,
args.plz, args.ort, args.ortsteil, args.hausnummerzusatz)
if not verfuegbarkeit:
exit(1)
address = "%s %s, %s %s" % ( args.strasse, args.hausnummer, args.plz, args.ort )
lines = line_protocol.make_lines(get_data_points(address, verfuegbarkeit))
print(lines, end='')
if __name__ == '__main__':
main()