-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathip2dns.py
105 lines (86 loc) · 2.04 KB
/
ip2dns.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# Lookup tables filled in by feed(). Every value is a list that feed() keeps
# free of duplicates.
# Domain name -> addresses, stored in the form they were passed to feed().
domain2ip = {}
# Integer value of an address -> full domain names seen for it.
ip2domain = {}
# Integer value of an address -> shorten()-ed domain names seen for it.
ip2short = {}
import ipaddress
def feed(ip, domain):
    """Record that ``domain`` resolved to ``ip`` in the three lookup tables.

    Args:
        ip: Address in any form accepted by ``ipaddress.ip_address``. Its
            integer value is the key used in ``ip2domain`` and ``ip2short``;
            the value exactly as passed in is what is stored in
            ``domain2ip``.
        domain: Name that resolved to ``ip``.

    Raises:
        ValueError: ``ip`` is not a valid IPv4/IPv6 address. The conversion
            happens first, so no table is modified in that case.
    """
    ipnum = int(ipaddress.ip_address(ip))

    full_names = ip2domain.setdefault(ipnum, [])
    if domain not in full_names:
        full_names.append(domain)
        # Only a name not yet recorded for this address can contribute a new
        # short form, so shorten() is skipped for repeats.
        short = shorten(domain)
        short_names = ip2short.setdefault(ipnum, [])
        if short not in short_names:
            short_names.append(short)

    addresses = domain2ip.setdefault(domain, [])
    if ip not in addresses:
        addresses.append(ip)
def getByIp(ip):
    """Return the full domain names recorded for ``ip``, or ``None``.

    ``ip`` is converted with ``int()`` and the result is looked up in
    ``ip2domain``; whatever ``int()`` raises for an unconvertible value
    propagates to the caller.
    """
    key = int(ip)
    return ip2domain.get(key)
def getShortByIp(ip):
    """Return the shortened domain names recorded for ``ip``, or ``None``.

    ``ip`` is converted with ``int()`` and the result is looked up in
    ``ip2short``; whatever ``int()`` raises for an unconvertible value
    propagates to the caller.
    """
    key = int(ip)
    return ip2short.get(key)
def getByDomain(domain):
    """Return the addresses recorded for ``domain``, or ``None`` if unknown.

    The lookup is an exact match on the key stored in ``domain2ip``.
    """
    addresses = domain2ip.get(domain)
    return addresses
def dump():
    """Return the lookup tables as ``(domain2ip, ip2domain, ip2short)``.

    The tuple holds the module's own dict objects, not copies.
    """
    tables = (domain2ip, ip2domain, ip2short)
    return tables
def load(*args):
    """Replace the module-level lookup tables with the given ones.

    Args:
        *args: Exactly three mappings, in the order ``domain2ip``,
            ``ip2domain``, ``ip2short``.

    Raises:
        ValueError: ``args`` does not contain exactly three items.
    """
    # Without this declaration the assignment below would only bind function
    # locals and the call would leave the module state untouched.
    global domain2ip, ip2domain, ip2short
    # Rebinding (rather than clearing and refilling in place) keeps a call
    # that passes the current tables back in harmless.
    domain2ip, ip2domain, ip2short = args
from publicsuffixlist import PublicSuffixList
# Single shared instance, created once at import time; shorten() queries it
# on every call.
psl = PublicSuffixList()
def shorten(domain):
    """Reduce ``domain`` to what ``psl.privatesuffix`` reports for it.

    Returns:
        The truthy result of ``psl.privatesuffix(domain)`` when there is one;
        otherwise ``domain`` itself, unchanged. Falsy input (``None``, ``""``)
        is returned as is without consulting the list.

    A ``FIXME`` line is printed when ``privatesuffix`` yields nothing and
    ``psl.is_public(domain)`` is also false.
    """
    if domain:
        private = psl.privatesuffix(domain)
        if private:
            return private
        if not psl.is_public(domain):
            print("FIXME: PSL could not digest:", domain)
    # NOTE(review): per the original author's comment this point is reached
    # when the name is itself a suffix or is not a domain at all - confirm.
    return domain
import dnsmasq_parser,config
def onJournalMessage(entry):
    """Parse one journal entry and feed a usable reply into the tables.

    Args:
        entry: Mapping with a ``"MESSAGE"`` key holding the log line.

    Returns:
        Whatever ``dnsmasq_parser.parse_reply`` returned for the line, except
        that ``None`` is returned when the reply's domain is listed in
        ``config.ignored_domains``.
    """
    message = entry["MESSAGE"]
    reply = dnsmasq_parser.parse_reply(message)
    if not reply:
        return reply

    ip = reply["ip"]
    domain = reply["domain"]
    if domain in config.ignored_domains:
        return

    # The "ip" field can carry a marker instead of an address; those replies
    # are passed back to the caller but not recorded.
    if "NODATA" in ip or "NXDOMAIN" in ip:
        return reply

    try:
        feed(ip, domain)
    except ValueError:
        # TODO: lines known to end up here:
        #   reply 142.250.74.36 is arn09s22-in-f4.1e100.net
        #   reply error is SERVFAIL
        print("FAILED?", message)
    return reply
def seedFromDnsmasq():
    """Replay the journal of ``dnsmasq.service`` through ``onJournalMessage``.

    Opens a journal reader, sets its log level to ``journal.LOG_INFO``,
    restricts it to entries whose ``_SYSTEMD_UNIT`` is ``dnsmasq.service``
    and hands every entry to ``onJournalMessage``.
    """
    # Kept function-local so that importing this module does not itself
    # import the systemd bindings.
    from systemd import journal

    # The context manager closes the journal handle when the replay ends,
    # including when an entry handler raises.
    with journal.Reader() as reader:
        reader.log_level(journal.LOG_INFO)
        reader.add_match(_SYSTEMD_UNIT="dnsmasq.service")
        for entry in reader:
            onJournalMessage(entry)
if __name__ == '__main__':
    # Script mode: seed the tables from the journal, then print one
    # tab-separated line per shortened domain.
    seedFromDnsmasq()
    import pprint

    by_short = {}
    for full_name, addresses in domain2ip.items():
        short = shorten(full_name)
        # NOTE(review): only the first non-empty address list seen for a
        # short name is kept; addresses of later names that shorten to the
        # same value are not merged in - confirm this is intended.
        if not by_short.get(short):
            by_short[short] = addresses

    for short, addresses in by_short.items():
        print(short, "\t", "\t".join(addresses))
    #import code; code.interact(local=locals())