-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathimport_nginx_waf.py
123 lines (100 loc) · 3.88 KB
/
import_nginx_waf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
import subprocess
import logging
from pathlib import Path
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()],
)
# Constants (configurable via environment variables)
WAF_DIR = Path(os.getenv("WAF_DIR", "/tmp/waf_patterns/nginx")).resolve() # Source directory for WAF files
NGINX_WAF_DIR = Path(os.getenv("NGINX_WAF_DIR", "/etc/nginx/waf/")).resolve() # Target directory
NGINX_CONF = Path(os.getenv("NGINX_CONF", "/etc/nginx/nginx.conf")).resolve() # Nginx config file
INCLUDE_STATEMENT = "include /etc/nginx/waf/*.conf;" # Include directive
def copy_waf_files():
"""
Copy Nginx WAF configuration files to the target directory.
Raises:
Exception: If there is an error copying files.
"""
logging.info("Copying Nginx WAF patterns...")
try:
# Ensure the target directory exists
NGINX_WAF_DIR.mkdir(parents=True, exist_ok=True)
logging.info(f"[+] Created or verified directory: {NGINX_WAF_DIR}")
# Copy .conf files from source to target directory
for conf_file in WAF_DIR.glob("*.conf"):
dst_path = NGINX_WAF_DIR / conf_file.name
if dst_path.exists():
logging.warning(f"[!] File already exists: {dst_path}")
continue
try:
subprocess.run(["cp", str(conf_file), str(dst_path)], check=True)
logging.info(f"[+] Copied {conf_file} to {NGINX_WAF_DIR}")
except subprocess.CalledProcessError as e:
logging.error(f"[!] Failed to copy {conf_file}: {e}")
raise
except Exception as e:
logging.error(f"[!] Error copying WAF files: {e}")
raise
def update_nginx_conf():
"""
Ensure the WAF include statement is present in the Nginx configuration file.
Raises:
Exception: If there is an error updating the Nginx configuration.
"""
logging.info("Ensuring WAF patterns are included in nginx.conf...")
try:
# Read the current configuration
with open(NGINX_CONF, "r") as f:
config = f.read()
# Append include statement if not present
if INCLUDE_STATEMENT not in config:
logging.info("Adding WAF include to nginx.conf...")
with open(NGINX_CONF, "a") as f:
f.write(f"\n{INCLUDE_STATEMENT}\n")
logging.info("[+] WAF include statement added to nginx.conf.")
else:
logging.info("WAF already included in nginx.conf.")
except Exception as e:
logging.error(f"[!] Error updating Nginx configuration: {e}")
raise
def reload_nginx():
"""
Reload Nginx to apply the new WAF rules.
Raises:
Exception: If there is an error reloading Nginx.
"""
logging.info("Reloading Nginx to apply new WAF rules...")
try:
# Test Nginx configuration
subprocess.run(["nginx", "-t"], check=True)
logging.info("[+] Nginx configuration test passed.")
# Reload Nginx
subprocess.run(["systemctl", "reload", "nginx"], check=True)
logging.info("[+] Nginx reloaded successfully.")
except subprocess.CalledProcessError as e:
logging.error(f"[!] Nginx configuration test failed: {e}")
raise
except FileNotFoundError:
logging.error("[!] 'nginx' or 'systemctl' command not found. Are you on a supported system?")
raise
except Exception as e:
logging.error(f"[!] Error reloading Nginx: {e}")
raise
def main():
"""
Main function to execute the script.
"""
try:
copy_waf_files()
update_nginx_conf()
reload_nginx()
logging.info("[✔] Nginx configured with latest WAF rules.")
except Exception as e:
logging.critical(f"[!] Script failed: {e}")
exit(1)
if __name__ == "__main__":
main()