-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
otx.py
executable file
·126 lines (102 loc) · 3.29 KB
/
otx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/python3
"""
Stream subscribed OTX events.
Arguments:
count: minimum count of related pulses that is required for the
IP to be added to the blocklist.
Example playbook:
- name: otx events
hosts: all
sources:
- name: Match all messages
ansible.eda.otx:
count: "1"
rules:
- name: Send to playboox
condition: event.otx is defined
action:
run_playbook:
name: otx_ufw.yml
"""
import os
import datetime
import ipaddress
import asyncio
import sys
import time
from typing import Any, Dict
from OTXv2 import OTXv2, IndicatorTypes
OTX_APIKEY = os.getenv("OTX_APIKEY")
otx = OTXv2(OTX_APIKEY)
def get_indicator(indicator_type, indicator_address, count):
"""Get indicator IP address."""
count = int(count)
block_ips = set()
indicator_type_map = {
"domain": IndicatorTypes.DOMAIN,
"hostname": IndicatorTypes.HOSTNAME,
"IPv4": IndicatorTypes.IPv4,
"IPv6": IndicatorTypes.IPv6,
}
if indicator_type not in indicator_type_map:
return
event_indicator = otx.get_indicator_details_full(
indicator_type_map[indicator_type], indicator_address
)
if event_indicator["general"]["pulse_info"]["count"] < count:
return
for dns in event_indicator["passive_dns"]["passive_dns"]:
try:
if ipaddress.ip_address(dns["address"]):
block_ips.add(dns["address"])
except ValueError:
pass
if block_ips:
return block_ips
async def main(queue: asyncio.Queue, args: Dict[str, Any]):
"""
Main loop that sends any pulse info to the get_indicator function
and add adds otx={"ip": block_ip} to the queue.
"""
delay = args.get("delay", 1)
count = args.get("count", [])
block_ips = set()
reported_ips = set()
if not count:
sys.exit(1)
while True:
queue_time = (
datetime.datetime.utcnow() - datetime.timedelta(days=1)
).isoformat()
pulses = otx.getall(modified_since=queue_time)
# Just so we don't trigger rate-limit.
time.sleep(300)
for pulse in pulses:
if pulse["indicators"]:
supported_types = ["domain", "hostname", "IPv4", "IPv6"]
if (
pulse["indicators"][0]["is_active"] == 1
and pulse["indicators"][0]["type"] in supported_types
):
block_ips = get_indicator(
pulse["indicators"][0]["type"],
pulse["indicators"][0]["indicator"],
count,
)
try:
for block_ip in block_ips:
if block_ip not in reported_ips:
reported_ips.add(block_ip)
await queue.put(dict(otx={"ip": block_ip}))
await asyncio.sleep(delay)
if len(reported_ips) >= 1000:
reported_ips = set()
except TypeError:
pass
if __name__ == "__main__":
class MockQueue:
"""Create a mock queue for testing."""
async def put(self, event):
"""Print event."""
print(event)
asyncio.run(main(MockQueue(), {"count": "1"}))