Skip to content

Commit eb7b311

Browse files
committed
Add Fortinet detection
Add Fortinet products detection for module network_device
1 parent 56e46e3 commit eb7b311

File tree

4 files changed

+488
-72
lines changed

4 files changed

+488
-72
lines changed

tests/attack/test_mod_network_device.py

Lines changed: 235 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,21 @@
22
from unittest.mock import AsyncMock
33

44
import httpx
5+
from httpx import RequestError
56
import respx
67
import pytest
78

89
from wapitiCore.net.classes import CrawlerConfiguration
910
from wapitiCore.net import Request
1011
from wapitiCore.net.crawler import AsyncCrawler
1112
from wapitiCore.attack.mod_network_device import ModuleNetworkDevice
13+
from wapitiCore.attack.network_devices.mod_forti import ModuleForti
1214

1315

1416
@pytest.mark.asyncio
1517
@respx.mock
16-
async def test_no_ubika():
17-
# Test no UBIKA detected
18+
async def test_no_net_device():
19+
# Test no network device detected
1820
respx.get("http://perdu.com/").mock(
1921
return_value=httpx.Response(
2022
200,
@@ -130,3 +132,234 @@ async def test_ubika_with_version():
130132
assert persister.add_payload.call_args_list[0][1]["info"] == (
131133
'{"name": "UBIKA WAAP", "version": "6.5.6", "categories": ["Network Equipment"], "groups": ["Content"]}'
132134
)
135+
136+
137+
@pytest.mark.asyncio
138+
@respx.mock
139+
async def test_detect_fortimanager():
140+
respx.get("http://perdu.com/p/login/").mock(
141+
return_value=httpx.Response(
142+
200,
143+
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
144+
<h2>Pas de panique, on va vous aider</h2> \
145+
<div class="sign-in-header" style="visibility: hidden"><span class="platform">FortiManager-3000G</span>'
146+
'</body></html>'
147+
)
148+
)
149+
respx.get("http://perdu.com/").mock(
150+
return_value=httpx.Response(
151+
200,
152+
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
153+
<h2>Pas de panique, on va vous aider</h2> </body></html>'
154+
)
155+
)
156+
157+
respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))
158+
159+
persister = AsyncMock()
160+
161+
request = Request("http://perdu.com/")
162+
request.path_id = 1
163+
164+
crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
165+
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
166+
options = {"timeout": 10, "level": 2, "tasks": 20}
167+
168+
module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)
169+
170+
await module.attack(request)
171+
172+
assert persister.add_payload.call_count == 1
173+
assert persister.add_payload.call_args_list[0][1]["info"] == (
174+
'{"name": "FortiManager", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
175+
)
176+
177+
178+
@pytest.mark.asyncio
179+
@respx.mock
180+
async def test_detect_ssl_vpn():
181+
respx.get("http://perdu.com/remote/login?lang=en").mock(
182+
return_value=httpx.Response(
183+
200,
184+
content='<html><head><title>Login</title></head><body><h1>Perdu sur Internet ?</h1> \
185+
<h2>Pas de panique, on va vous aider</h2> '
186+
)
187+
)
188+
189+
respx.get("http://perdu.com/remote/fgt_lang?lang=fr").mock(
190+
return_value=httpx.Response(
191+
200,
192+
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
193+
<h2>Pas de panique, on va vous aider</h2> </body></html>'
194+
)
195+
)
196+
197+
respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))
198+
199+
persister = AsyncMock()
200+
201+
request = Request("http://perdu.com/")
202+
request.path_id = 1
203+
204+
crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
205+
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
206+
options = {"timeout": 10, "level": 2, "tasks": 20}
207+
208+
module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)
209+
210+
await module.attack(request)
211+
212+
assert persister.add_payload.call_count == 1
213+
assert persister.add_payload.call_args_list[0][1]["info"] == (
214+
'{"name": "Fortinet SSL-VPN", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
215+
)
216+
217+
218+
@pytest.mark.asyncio
219+
@respx.mock
220+
async def test_detect_fortinet():
221+
respx.get("http://perdu.com/login/?next=/").mock(
222+
return_value=httpx.Response(
223+
200,
224+
content='<html><head><title>Login</title></head><body><h1>Perdu sur Internet ?</h1> \
225+
<h2>Pas de panique, on va vous aider</h2> '
226+
)
227+
)
228+
respx.get("http://perdu.com/").mock(
229+
return_value=httpx.Response(
230+
200,
231+
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
232+
<h2>Pas de panique, on va vous aider</h2> </body></html>'
233+
)
234+
)
235+
236+
respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))
237+
238+
persister = AsyncMock()
239+
240+
request = Request("http://perdu.com/")
241+
request.path_id = 1
242+
243+
crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
244+
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
245+
options = {"timeout": 10, "level": 2, "tasks": 20}
246+
247+
module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)
248+
249+
await module.attack(request)
250+
251+
assert persister.add_payload.call_count == 1
252+
assert persister.add_payload.call_args_list[0][1]["info"] == (
253+
'{"name": "Fortinet", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
254+
)
255+
256+
257+
@pytest.mark.asyncio
258+
@respx.mock
259+
async def test_detect_fortiportal_from_title():
260+
respx.get("http://perdu.com/fpc/app/login").mock(
261+
return_value=httpx.Response(
262+
200,
263+
content='<html><head><title>FortiPortal</title></head><body><h1>Perdu sur Internet ?</h1> \
264+
<h2>Pas de panique, on va vous aider</h2> '
265+
)
266+
)
267+
respx.get("http://perdu.com/").mock(
268+
return_value=httpx.Response(
269+
200,
270+
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
271+
<h2>Pas de panique, on va vous aider</h2> </body></html>'
272+
)
273+
)
274+
275+
respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))
276+
277+
persister = AsyncMock()
278+
279+
request = Request("http://perdu.com/")
280+
request.path_id = 1
281+
282+
crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
283+
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
284+
options = {"timeout": 10, "level": 2, "tasks": 20}
285+
286+
module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)
287+
288+
await module.attack(request)
289+
290+
assert persister.add_payload.call_count == 1
291+
assert persister.add_payload.call_args_list[0][1]["info"] == (
292+
'{"name": "FortiPortal", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
293+
)
294+
295+
296+
@pytest.mark.asyncio
297+
@respx.mock
298+
async def test_detect_fortimail():
299+
respx.get("http://perdu.com/admin/").mock(
300+
return_value=httpx.Response(
301+
200,
302+
content='<html><head><title>FortiMail</title><meta name="FortiMail" content="width=device-width, initial-scale=1">\
303+
</head><body><h1>Perdu sur Internet ?</h1> \
304+
<h2>Pas de panique, on va vous aider</h2> '
305+
)
306+
)
307+
respx.get("http://perdu.com/").mock(
308+
return_value=httpx.Response(
309+
200,
310+
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
311+
<h2>Pas de panique, on va vous aider</h2> </body></html>'
312+
)
313+
)
314+
315+
respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))
316+
317+
persister = AsyncMock()
318+
319+
request = Request("http://perdu.com/")
320+
request.path_id = 1
321+
322+
crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
323+
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
324+
options = {"timeout": 10, "level": 2, "tasks": 20}
325+
326+
module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)
327+
328+
await module.attack(request)
329+
330+
assert persister.add_payload.call_count == 1
331+
assert persister.add_payload.call_args_list[0][1]["info"] == (
332+
'{"name": "FortiMail", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
333+
)
334+
335+
336+
@pytest.mark.asyncio
337+
@respx.mock
338+
async def test_raise_on_request_error():
339+
"""Tests that a RequestError is raised when calling the module with wrong URL."""
340+
341+
respx.get("http://perdu.com/").mock(
342+
return_value=httpx.Response(
343+
200,
344+
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
345+
<h2>Pas de panique, on va vous aider</h2> </body></html>'
346+
)
347+
)
348+
349+
respx.get(url__regex=r"http://perdu.com/.*").mock(side_effect=RequestError("RequestError occurred: [Errno -2] Name or service not known"))
350+
351+
persister = AsyncMock()
352+
353+
request = Request("http://perdu.com/")
354+
request.path_id = 1
355+
356+
crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
357+
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
358+
options = {"timeout": 10, "level": 2, "tasks": 20}
359+
360+
module = ModuleForti(crawler, persister, options, Event(), crawler_configuration)
361+
362+
with pytest.raises(RequestError) as exc_info:
363+
await module.check_forti("http://perdu.com/")
364+
365+
assert exc_info.value.args[0] == "RequestError occurred: [Errno -2] Name or service not known"
Lines changed: 9 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
1-
import json
2-
1+
from asyncio import Event
32
from typing import Optional
4-
from urllib.parse import urljoin
5-
from bs4 import BeautifulSoup
6-
from httpx import RequestError
73

8-
from wapitiCore.main.log import logging
4+
from wapitiCore.attack.network_devices.mod_forti import ModuleForti
5+
from wapitiCore.attack.network_devices.mod_ubika import ModuleUbika
96
from wapitiCore.attack.attack import Attack
107
from wapitiCore.net import Request
118
from wapitiCore.net.response import Response
12-
from wapitiCore.definitions.fingerprint import NAME as TECHNO_DETECTED, WSTG_CODE
139

14-
from wapitiCore.main.log import log_blue
1510

1611
MSG_TECHNO_VERSIONED = "{0} {1} detected"
1712
MSG_NO_UBIKA = "No UBIKA Detected"
@@ -20,36 +15,6 @@
2015
class ModuleNetworkDevice(Attack):
2116
"""Base class for detecting version."""
2217
name = "network_device"
23-
version = ""
24-
25-
async def check_ubika(self, url):
26-
check_list = ['app/monitor/']
27-
for item in check_list:
28-
full_url = urljoin(url, item)
29-
request = Request(full_url, 'GET')
30-
try:
31-
response: Response = await self.crawler.async_send(request, follow_redirects=True)
32-
except RequestError:
33-
self.network_errors += 1
34-
raise
35-
soup = BeautifulSoup(response.content, 'html.parser')
36-
title_tag = soup.title
37-
return response.is_success and title_tag and "UBIKA" in title_tag.text.strip()
38-
39-
async def get_ubika_version(self, url):
40-
version = ""
41-
version_uri = "app/monitor/api/info/product"
42-
full_url = urljoin(url, version_uri)
43-
request = Request(full_url, 'GET')
44-
try:
45-
response: Response = await self.crawler.async_send(request, follow_redirects=True)
46-
except RequestError:
47-
self.network_errors += 1
48-
raise
49-
50-
if response.is_success:
51-
version = response.json.get("result", {}).get("product", {}).get("version", '')
52-
return version
5318

5419
async def must_attack(self, request: Request, response: Optional[Response] = None):
5520
if self.finished:
@@ -63,35 +28,9 @@ async def must_attack(self, request: Request, response: Optional[Response] = Non
6328
async def attack(self, request: Request, response: Optional[Response] = None):
6429
self.finished = True
6530
request_to_root = Request(request.url)
66-
67-
try:
68-
if await self.check_ubika(request_to_root.url):
69-
try:
70-
self.version = await self.get_ubika_version(request_to_root.url)
71-
except RequestError as req_error:
72-
self.network_errors += 1
73-
logging.error(f"Request Error occurred: {req_error}")
74-
75-
ubika_detected = {
76-
"name": "UBIKA WAAP",
77-
"version": self.version,
78-
"categories": ["Network Equipment"],
79-
"groups": ["Content"]
80-
}
81-
log_blue(
82-
MSG_TECHNO_VERSIONED,
83-
"UBIKA WAAP",
84-
self.version
85-
)
86-
87-
await self.add_addition(
88-
category=TECHNO_DETECTED,
89-
request=request_to_root,
90-
info=json.dumps(ubika_detected),
91-
wstg=WSTG_CODE
92-
)
93-
else:
94-
log_blue(MSG_NO_UBIKA)
95-
except RequestError as req_error:
96-
self.network_errors += 1
97-
logging.error(f"Request Error occurred: {req_error}")
31+
modules_list = [ModuleUbika, ModuleForti]
32+
for module in modules_list:
33+
mod = module(
34+
self.crawler, self.persister, self.options, Event(), self.crawler_configuration
35+
)
36+
await mod.attack(request_to_root)

0 commit comments

Comments
 (0)