Skip to content

Commit 6179119

Browse files
committed
Merge branch '20-admin-webapp-anomaly-explorer' into 'main'
Resolve "Admin WebApp Anomaly Explorer" See merge request imprj/01-bgp-testbed/zettabgp!16
2 parents a61390c + f3d81d5 commit 6179119

21 files changed

+568
-103
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
setup(
44
name='ZettaBGP',
5-
version='0.2.2',
5+
version='0.2.3',
66
description='',
77
url='https://git.univ.leitwert.net/imprj/01-bgp-testbed/zettabgp',
88
author='Benedikt Schwering & Sebastian Forstner',

src/controllers/message_replay.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# -*- coding: utf-8 -*-
2+
'''
3+
ZettaBGP - Advanced Anomaly Detection in Internet Routing
4+
Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner
5+
6+
This work is licensed under the terms of the MIT license.
7+
For a copy, see LICENSE in the project root.
8+
9+
Author:
10+
Benedikt Schwering <bes9584@thi.de>
11+
Sebastian Forstner <sef9869@thi.de>
12+
'''
13+
from src.models.message_replay import MessageReplayRequest, MessageReplayResult
14+
from src.services.message_replay import message_replay
15+
from fastapi import APIRouter
16+
import json, os
17+
18+
message_replay_router = APIRouter()
19+
20+
@message_replay_router.post('/')
21+
def start_message_replay(message_replay_request: MessageReplayRequest) -> MessageReplayResult:
22+
'''
23+
This function starts a message replay.
24+
25+
Author:
26+
Benedikt Schwering <bes9584@thi.de>
27+
28+
Args:
29+
message_replay_request (MessageReplayRequest): The message replay request.
30+
31+
Returns:
32+
MessageReplayResult: The message replay result.
33+
'''
34+
message_replay_result = message_replay(
35+
no_rabbitmq_direct=message_replay_request.no_rabbitmq_direct,
36+
rabbitmq_grouped=message_replay_request.rabbitmq_grouped,
37+
no_mongodb_log=message_replay_request.no_mongodb_log,
38+
no_mongodb_state=message_replay_request.no_mongodb_state,
39+
no_mongodb_statistics=message_replay_request.no_mongodb_statistics,
40+
clear_mongodb=message_replay_request.clear_mongodb,
41+
playback_speed=message_replay_request.playback_speed,
42+
playback_interval=None,
43+
start_timestamp=None,
44+
end_timestamp=None,
45+
start_time=message_replay_request.start_time + ':00',
46+
end_time=message_replay_request.end_time + ':00',
47+
)
48+
49+
return MessageReplayResult(
50+
count_announce=message_replay_result.count_announce,
51+
count_withdraw=message_replay_result.count_withdraw,
52+
)

src/main.py

Lines changed: 26 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,12 @@
1010
Benedikt Schwering <bes9584@thi.de>
1111
Sebastian Forstner <sef9869@thi.de>
1212
'''
13-
from src.adapters.mongodb import MongoDBAdapter, MongoDBLogLoader
1413
import src.services.mrt_simulation as mrt_simulation_service
15-
from src.adapters.rabbitmq import RabbitMQAdapter
16-
from src.parsers.reverse import ReverseParser
14+
import src.services.message_replay as message_replay_service
15+
import src.services.rib_load as rib_load_service
1716
import src.services.exabgp as exabgp_service
18-
from datetime import timedelta, datetime
19-
from src.parsers.rib import RibParser
20-
from collections import OrderedDict
2117
from src.webapp import start_webapp
22-
from mrtparse import Reader
23-
from rich import print
24-
import click, time
18+
import click
2519

2620
@click.group()
2721
def cli():
@@ -275,33 +269,15 @@ def rib_load(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bo
275269
clear_mongodb (bool): Clear MongoDB collections.
276270
rib_file (str): RIB file to process.
277271
'''
278-
parser = RibParser()
279-
280-
if not no_rabbitmq_direct or rabbitmq_grouped:
281-
RabbitMQAdapter(
282-
parser=parser,
283-
no_direct=no_rabbitmq_direct,
284-
queue_interval=rabbitmq_grouped,
285-
)
286-
287-
if not no_mongodb_log or not no_mongodb_state or not no_mongodb_statistics:
288-
MongoDBAdapter(
289-
parser=parser,
290-
no_mongodb_log=no_mongodb_log,
291-
no_mongodb_state=no_mongodb_state,
292-
no_mongodb_statistics=no_mongodb_statistics,
293-
clear_mongodb=clear_mongodb,
294-
)
295-
296-
for message in Reader(rib_file):
297-
if message.data['type'] != {13: 'TABLE_DUMP_V2'}:
298-
print('[dark_orange]\[WARN][/] Skipping unsupported MRT type: ', end='')
299-
print(message.data['type'])
300-
continue
301-
302-
parser.parse(
303-
statement=message.data,
304-
)
272+
rib_load_service.rib_load(
273+
no_rabbitmq_direct=no_rabbitmq_direct,
274+
rabbitmq_grouped=rabbitmq_grouped,
275+
no_mongodb_log=no_mongodb_log,
276+
no_mongodb_state=no_mongodb_state,
277+
no_mongodb_statistics=no_mongodb_statistics,
278+
clear_mongodb=clear_mongodb,
279+
rib_file=rib_file,
280+
)
305281

306282
@cli.command(
307283
name='message-replay',
@@ -409,60 +385,17 @@ def message_replay(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_l
409385
start_time (str): Starttime of replay as time; in format (T is a set character): YYYY-MM-DDThh:mm:ss.
410386
end_time (str): Endtime of replay as time; in format (T is a set character): YYYY-MM-DDThh:mm:ss.
411387
'''
412-
parser = ReverseParser()
413-
414-
if not no_rabbitmq_direct or rabbitmq_grouped:
415-
RabbitMQAdapter(
416-
parser=parser,
417-
no_direct=no_rabbitmq_direct,
418-
queue_interval=rabbitmq_grouped,
419-
)
420-
421-
playback_speed_reference: datetime = None
422-
playback_interval_stop: datetime = None
423-
424-
# Check if start and end are given and in which format; no time given results in replaying whole db
425-
if start_timestamp and end_timestamp:
426-
start_time = datetime.fromtimestamp(start_timestamp)
427-
end_time = datetime.fromtimestamp(end_timestamp)
428-
new_messages = MongoDBLogLoader.load_messages(timestamp_start = start_time, timestamp_end = end_time)
429-
elif start_time and end_time:
430-
time_start = datetime.fromisoformat(start_time)
431-
time_end = datetime.fromisoformat(end_time)
432-
new_messages = MongoDBLogLoader.load_messages(timestamp_start = time_start, timestamp_end = time_end)
433-
else:
434-
new_messages = MongoDBLogLoader.load_messages(timestamp_start = None, timestamp_end = None)
435-
436-
# Copy messages in local list to avoid deleting them if -c is set; new_messages is corsor pointing to db
437-
all_messages: list[OrderedDict] = []
438-
for message in new_messages:
439-
all_messages.append(message)
440-
441-
# Init for MongoDBAdapter to avouid deleting messages before loading
442-
if not no_mongodb_log or not no_mongodb_state or not no_mongodb_statistics:
443-
MongoDBAdapter(
444-
parser=parser,
445-
no_mongodb_log=no_mongodb_log,
446-
no_mongodb_state=no_mongodb_state,
447-
no_mongodb_statistics=no_mongodb_statistics,
448-
clear_mongodb=clear_mongodb,
449-
)
450-
451-
for message in all_messages:
452-
current_timestamp: datetime = message['timestamp']
453-
454-
if playback_speed:
455-
if playback_speed_reference:
456-
time.sleep((current_timestamp - playback_speed_reference).seconds / playback_speed)
457-
458-
playback_speed_reference = current_timestamp
459-
460-
if playback_interval:
461-
if playback_interval_stop:
462-
if current_timestamp > playback_interval_stop:
463-
input('Enter for next interval...')
464-
playback_interval_stop = playback_interval_stop + timedelta(minutes=playback_interval)
465-
else:
466-
playback_interval_stop = current_timestamp + timedelta(minutes=playback_interval)
467-
468-
parser.parse(message)
388+
message_replay_service.message_replay(
389+
no_rabbitmq_direct=no_rabbitmq_direct,
390+
rabbitmq_grouped=rabbitmq_grouped,
391+
no_mongodb_log=no_mongodb_log,
392+
no_mongodb_state=no_mongodb_state,
393+
no_mongodb_statistics=no_mongodb_statistics,
394+
clear_mongodb=clear_mongodb,
395+
playback_speed=playback_speed,
396+
playback_interval=playback_interval,
397+
start_timestamp=start_timestamp,
398+
end_timestamp=end_timestamp,
399+
start_time=start_time,
400+
end_time=end_time,
401+
)

src/models/message_replay.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# -*- coding: utf-8 -*-
2+
'''
3+
ZettaBGP - Advanced Anomaly Detection in Internet Routing
4+
Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner
5+
6+
This work is licensed under the terms of the MIT license.
7+
For a copy, see LICENSE in the project root.
8+
9+
Author:
10+
Benedikt Schwering <bes9584@thi.de>
11+
Sebastian Forstner <sef9869@thi.de>
12+
'''
13+
from pydantic import BaseModel
14+
from typing import Optional
15+
16+
class MessageReplayRequest(BaseModel):
17+
'''
18+
This class represents a request to replay messages.
19+
20+
Author:
21+
Benedikt Schwering <bes9584@thi.de>
22+
'''
23+
no_rabbitmq_direct: bool
24+
rabbitmq_grouped: Optional[int]
25+
no_mongodb_log: bool
26+
no_mongodb_state: bool
27+
no_mongodb_statistics: bool
28+
clear_mongodb: bool
29+
playback_speed: Optional[int]
30+
start_time: str
31+
end_time: str
32+
33+
class MessageReplayResult(BaseModel):
34+
'''
35+
This class represents the result of replay messages.
36+
37+
Author:
38+
Benedikt Schwering <bes9584@thi.de>
39+
'''
40+
count_withdraw: int
41+
count_announce: int

src/services/message_replay.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# -*- coding: utf-8 -*-
2+
'''
3+
ZettaBGP - Advanced Anomaly Detection in Internet Routing
4+
Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner
5+
6+
This work is licensed under the terms of the MIT license.
7+
For a copy, see LICENSE in the project root.
8+
9+
Author:
10+
Benedikt Schwering <bes9584@thi.de>
11+
Sebastian Forstner <sef9869@thi.de>
12+
'''
13+
from src.adapters.mongodb import MongoDBAdapter, MongoDBLogLoader
14+
from src.adapters.rabbitmq import RabbitMQAdapter
15+
from src.models.route_update import ChangeType
16+
from src.parsers.reverse import ReverseParser
17+
from datetime import timedelta, datetime
18+
from collections import OrderedDict
19+
from pydantic import BaseModel
20+
import time
21+
22+
class MessageReplayResult(BaseModel):
23+
count_announce: int
24+
count_withdraw: int
25+
26+
def message_replay(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool, clear_mongodb: bool, playback_speed: int, playback_interval: int, start_timestamp: float, end_timestamp: float, start_time: str, end_time: str) -> MessageReplayResult:
27+
'''
28+
Message replay service for replaying BGP messages from Database log.
29+
30+
Author:
31+
Sebastian Forstner <sef9869@thi.de>
32+
33+
Args:
34+
no_rabbitmq_direct (bool): Disable direct RabbitMQ direct queue..
35+
rabbitmq_grouped (int): Queue group interval in minutes.
36+
no_mongodb_log (bool): Disable logging to MongoDB.
37+
no_mongodb_state (bool): Disable state storage to MongoDB.
38+
no_mongodb_statistics (bool): Disable statistics storage to MongoDB.
39+
clear_mongodb (bool): Clear MongoDB collections.
40+
playback_speed (int): Playback speed in multiples of real time.
41+
playback_interval (int): Playback interval in minutes.
42+
start_timestamp (float): Starttime of replay as timestamp.
43+
end_timestamp (float): Endtime of replay as timestamp.
44+
start_time (str): Starttime of replay as time; in format (T is a set character): YYYY-MM-DDThh:mm:ss.
45+
end_time (str): Endtime of replay as time; in format (T is a set character): YYYY-MM-DDThh:mm:ss.
46+
47+
Returns:
48+
MessageReplayResult: The message replay result.
49+
'''
50+
message_replay_result = MessageReplayResult(
51+
count_announce=0,
52+
count_withdraw=0,
53+
)
54+
55+
parser = ReverseParser()
56+
57+
if not no_rabbitmq_direct or rabbitmq_grouped:
58+
RabbitMQAdapter(
59+
parser=parser,
60+
no_direct=no_rabbitmq_direct,
61+
queue_interval=rabbitmq_grouped,
62+
)
63+
64+
playback_speed_reference: datetime = None
65+
playback_interval_stop: datetime = None
66+
67+
# Check if start and end are given and in which format; no time given results in replaying whole db
68+
if start_timestamp and end_timestamp:
69+
start_time = datetime.fromtimestamp(start_timestamp)
70+
end_time = datetime.fromtimestamp(end_timestamp)
71+
new_messages = MongoDBLogLoader.load_messages(timestamp_start = start_time, timestamp_end = end_time)
72+
elif start_time and end_time:
73+
time_start = datetime.fromisoformat(start_time)
74+
time_end = datetime.fromisoformat(end_time)
75+
new_messages = MongoDBLogLoader.load_messages(timestamp_start = time_start, timestamp_end = time_end)
76+
else:
77+
new_messages = MongoDBLogLoader.load_messages(timestamp_start = None, timestamp_end = None)
78+
79+
# Copy messages in local list to avoid deleting them if -c is set; new_messages is corsor pointing to db
80+
all_messages: list[OrderedDict] = []
81+
for message in new_messages:
82+
all_messages.append(message)
83+
84+
# Init for MongoDBAdapter to avouid deleting messages before loading
85+
if not no_mongodb_log or not no_mongodb_state or not no_mongodb_statistics:
86+
MongoDBAdapter(
87+
parser=parser,
88+
no_mongodb_log=no_mongodb_log,
89+
no_mongodb_state=no_mongodb_state,
90+
no_mongodb_statistics=no_mongodb_statistics,
91+
clear_mongodb=clear_mongodb,
92+
)
93+
94+
for message in all_messages:
95+
current_timestamp: datetime = message['timestamp']
96+
97+
if playback_speed:
98+
if playback_speed_reference:
99+
time.sleep((current_timestamp - playback_speed_reference).seconds / playback_speed)
100+
101+
playback_speed_reference = current_timestamp
102+
103+
if playback_interval:
104+
if playback_interval_stop:
105+
if current_timestamp > playback_interval_stop:
106+
input('Enter for next interval...')
107+
playback_interval_stop = playback_interval_stop + timedelta(minutes=playback_interval)
108+
else:
109+
playback_interval_stop = current_timestamp + timedelta(minutes=playback_interval)
110+
111+
updates = parser.parse(message)
112+
113+
if updates:
114+
for update in updates:
115+
if update.change_type == ChangeType.ANNOUNCE:
116+
message_replay_result.count_announce += 1
117+
elif update.change_type == ChangeType.WITHDRAW:
118+
message_replay_result.count_withdraw += 1
119+
120+
return message_replay_result

0 commit comments

Comments
 (0)