#!/usr/bin/env python3
# v1.0.3
# monitor_urls - by bjoerrrn
# github: https://github.com/bjoerrrn/monitor_urls
# Licensed under GNU GPL version 3.0 or later
import json
import logging
import os
import shlex
import sys
from urllib.parse import urlparse

import requests
import urllib3
from bs4 import BeautifulSoup

# Force UTF-8 output so the emoji status markers print cleanly on any console.
sys.stdout.reconfigure(encoding='utf-8')

# Internal hosts are fetched with verify=False (see is_internal_ip), so
# suppress the resulting InsecureRequestWarning noise.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Configuration
CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "monitor_urls.credo")
FAILURE_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "failures.json")
FAILURE_THRESHOLD = 5  # consecutive failed runs before a DOWN alert is sent
TIMEOUT = 10  # per-request timeout, in seconds
RETRY_COUNT = 2  # attempts per check before marking a URL as DOWN
LOG_FILE = "monitor_urls.log"

# Set up logging
logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
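
# Each non-comment line in monitor_urls.credo carries three required fields
# (description, URL, Discord webhook) plus an optional keyword. Lines are
# tokenized with shlex, so multi-word fields must be quoted. Illustrative
# example only; the URL and webhook are placeholders:
#   "My Server" https://example.com https://discord.com/api/webhooks/... "expected text"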

def load_failures():
    """Load failure tracking data, reset on corruption."""
    if os.path.exists(FAILURE_FILE):
        try:
            with open(FAILURE_FILE, "r") as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            logging.error("Failed to parse failures.json, resetting...")
            print("[ERROR] Corrupted failures.json, resetting failure tracking.")
            return {}
    return {}

def save_failures(failures):
    """Save failure tracking data safely."""
    try:
        with open(FAILURE_FILE, "w") as f:
            json.dump(failures, f, indent=2)
    except IOError as e:
        logging.error(f"Failed to save failures.json: {e}")
        print(f"[ERROR] Unable to save failure data: {e}")

failures = load_failures()
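# Persisted per-URL state, keyed by URL, e.g. (illustrative):
#   {"https://example.com": {"failures": 3, "notified_down": False, "notified_up": False}}
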
def is_internal_ip(url):
    """Return True for hosts on the internal 192.168.178.x subnet, which are fetched without SSL verification."""
    parsed_url = urlparse(url)
    hostname = parsed_url.hostname
    return hostname and hostname.startswith("192.168.178.")

def load_urls():
    """Load URLs, webhooks, descriptions, and optional keywords from the config file."""
    urls = []
    if not os.path.exists(CONFIG_FILE):
        logging.error(f"Config file {CONFIG_FILE} not found.")
        print(f"[ERROR] Config file {CONFIG_FILE} not found.")
        return urls
    try:
        with open(CONFIG_FILE, "r") as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("#"):
                    parts = shlex.split(line)
                    if len(parts) < 3:
                        logging.warning(f"Invalid line in config: {line}")
                        print(f"[WARNING] Skipping invalid config line: {line}")
                        continue
                    description, url, webhook = parts[:3]
                    keyword = parts[3] if len(parts) > 3 else None
                    urls.append({"description": description, "url": url, "webhook": webhook, "keyword": keyword})
    except Exception as e:
        logging.error(f"Error loading config: {e}")
        print(f"[ERROR] Could not load config file: {e}")
    return urls

def check_url(url):
    """Checks if a URL is reachable, retrying before marking it as DOWN."""
    verify_ssl = not is_internal_ip(url)
    for attempt in range(1, RETRY_COUNT + 1):
        try:
            logging.info(f"Checking {url} (Attempt {attempt}/{RETRY_COUNT})")
            print(f"[INFO] Checking {url} (Attempt {attempt}/{RETRY_COUNT})")
            response = requests.get(url, timeout=TIMEOUT, verify=verify_ssl)
            # 401 counts as UP: the endpoint responded, it merely requires auth.
            if response.status_code in [200, 401]:
                return True, response.text
        except requests.RequestException as e:
            logging.warning(f"Failed attempt {attempt} for {url}: {e}")
            print(f"[WARNING] {url} failed (Attempt {attempt})")
    return False, None

def keyword_found(content, keyword):
    """Checks if the keyword exists in the page content."""
    if not content or not keyword:
        return False
    keyword = keyword.lower()
    soup = BeautifulSoup(content, "html.parser")
    text_only = soup.get_text().lower()
    return keyword in text_only

def notify_discord(webhook, message):
    """Sends a notification to Discord and logs it."""
    logging.info(f"NOTIFY: {message}")
    print(f"[NOTIFY] {message}")
    try:
        # Discord webhooks take a JSON body with a "content" field; bound the
        # request with the shared timeout so a slow webhook cannot hang the run.
        requests.post(webhook, json={"content": message}, timeout=TIMEOUT)
    except requests.RequestException as e:
        logging.error(f"Discord notification failed: {e}")
        print(f"[ERROR] Failed to send Discord notification: {e}")

def monitor():
    """Monitors URLs, sending alerts on failures and recoveries only once."""
    global failures
    urls = load_urls()
    for entry in urls:
        description = entry["description"]
        url = entry["url"]
        webhook = entry["webhook"]
        keyword = entry["keyword"]

        print(f"\n[INFO] Checking {description} ({url})...")
        reachable, content = check_url(url)
        keyword_missing = keyword and reachable and not keyword_found(content, keyword)

        if url not in failures:
            failures[url] = {"failures": 0, "notified_down": False, "notified_up": False}

        failure_count = failures[url]["failures"]
        notified_down = failures[url]["notified_down"]
        notified_up = failures[url]["notified_up"]

        if not reachable or keyword_missing:
            failure_count += 1
            logging.warning(f"{description} ({url}) failure count: {failure_count}/{FAILURE_THRESHOLD}")
            print(f"[WARNING] {description} ({url}) failure count: {failure_count}/{FAILURE_THRESHOLD}")
            if failure_count >= FAILURE_THRESHOLD and not notified_down:
                msg = f"❌ {description} ({url})" if not reachable else f"⚠️ {description} ({url}) MISSING '{keyword}'"
                notify_discord(webhook, msg)
                failures[url]["notified_down"] = True
                failures[url]["notified_up"] = False
            failures[url]["failures"] = failure_count
        else:
            if failure_count >= FAILURE_THRESHOLD and not notified_up:
                logging.info(f"{description} ({url}) RECOVERED! Sending ✅ notification.")
                print(f"[INFO] {description} ({url}) RECOVERED! Sending ✅ notification.")
                notify_discord(webhook, f"✅ {description} ({url})")
                failures[url]["notified_up"] = True
                failures[url]["notified_down"] = False
            logging.info(f"{description} ({url}) is UP (Failures Reset).")
            print(f"[INFO] {description} ({url}) is UP (Failures Reset).")
            failures[url]["failures"] = 0
    save_failures(failures)

if __name__ == "__main__":
    print("\n[INFO] Starting URL monitor...\n")
    monitor()
    print("\n[INFO] Monitoring completed.\n")
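
# The script performs one pass over the configured URLs and exits; failure
# counts persist in failures.json between runs, so periodic scheduling is
# assumed. A possible cron entry (illustrative path, every five minutes):
#   */5 * * * * /usr/bin/python3 /path/to/monitor_urls.py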