Skip to content

Commit

Permalink
Merge branch '16-anomaly-replay' into 'main'
Browse files Browse the repository at this point in the history
Resolve "Zeitbasierte Annomalie Wiederholung"

Closes #13

See merge request imprj/01-bgp-testbed/zettabgp!15
  • Loading branch information
DlieBG committed Nov 27, 2024
2 parents a606065 + 95f55d0 commit a7e42af
Show file tree
Hide file tree
Showing 4 changed files with 392 additions and 3 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,32 @@ Options:
-c, --clear-mongodb
```

#### `zettabgp message-replay`
This command lets you load already saved BGP messages from the database and replays them.\
You can use all options as in the `mrt-simulation` command. \
Additionally you can give a timeframe for the messages you want to replay.

```
Options:
-d, --no-rabbitmq-direct
-g, --rabbitmq-grouped INTEGER Queue group interval in minutes. [default: (5)]
-l, --no-mongodb-log
-s, --no-mongodb-state
-t, --no-mongodb-statistics
-c, --clear-mongodb
-p, --playback-speed INTEGER Playback speed in multiples of real time. [default: (1)]
-o, --playback-interval INTEGER Playback interval in minutes. [default: (5)]
-b, --start-timestamp FLOAT Timestamp for the starttime of the replay
-e, --end-timestamp FLOAT Timestamp for the endtime of the replay
-r, --start-time STRING Time for the starttime of the replay, format (T is a set character): YYYY-MM-DDThh:mm:ss
-f, --end-time STRING Time for the endtime of the replay, format (T is a set character): YYYY-MM-DDThh:mm:ss
```

##### Timestamps and string as timeframe
When using the `-b` option you must use the `-e` option aswell.\
For `-r` and `-f` the same logic applies.\
When no timeframe is provided or only one of the two necessary options is set, the whole database will be loaded and used for the replay.

## Debugging
Some sample json messages for debugging purposes from ExaBGP can be found in the `samples` directory.

Expand Down
44 changes: 44 additions & 0 deletions src/adapters/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from src.models.route_update import RouteUpdate
from src.models.route_update import ChangeType
from pymongo import MongoClient
from datetime import datetime
from typing import Optional
from bson import ObjectId
import os
Expand Down Expand Up @@ -221,3 +222,46 @@ def on_update(message: RouteUpdate):
}
}
statistics_announce = statistics_collection.update_one(statistics_filter, new_values, upsert=True)

class MongoDBLogLoader:
'''
This class is responsible for loading messages from the MongoDB Log.
Author:
Sebastian Forstner <sef9869@thi.de>
'''
@staticmethod
def load_messages(timestamp_start: datetime, timestamp_end: datetime) -> list[dict]:
'''
Loads messages from the MongoDB Log.
Author:
Sebastian Forstner <sef9869@thi.de>
Args:
timestamp_start (datetime): The start timestamp.
timestamp_end (datetime): The end timestamp.
Returns:
list[dict]: The loaded messages.
'''
try:
'''Connects to MongoDB-Container running with Docker'''
database_client = MongoClient(
host=os.getenv('MONGO_DB_HOST', 'localhost'),
port=int(os.getenv('MONGO_DB_PORT', 27017)),
)
except:
print('Could not connect to the database')

log_db = database_client.message_log
log_collection = log_db.storage


if timestamp_start and timestamp_end:
filter = {'timestamp': {'$gte': timestamp_start, '$lte': timestamp_end}}
all_messages = log_collection.find(filter)
else:
all_messages = log_collection.find()

return all_messages
174 changes: 171 additions & 3 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
Benedikt Schwering <bes9584@thi.de>
Sebastian Forstner <sef9869@thi.de>
'''
from src.adapters.mongodb import MongoDBAdapter, MongoDBLogLoader
import src.services.mrt_simulation as mrt_simulation_service
from src.adapters.rabbitmq import RabbitMQAdapter
from src.adapters.mongodb import MongoDBAdapter
from src.parsers.reverse import ReverseParser
import src.services.exabgp as exabgp_service
from datetime import timedelta, datetime
from src.parsers.rib import RibParser
from collections import OrderedDict
from src.webapp import start_webapp
from mrtparse import Reader
import click
from rich import print
import click, time

@click.group()
def cli():
Expand Down Expand Up @@ -279,7 +283,7 @@ def rib_load(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bo
no_direct=no_rabbitmq_direct,
queue_interval=rabbitmq_grouped,
)

if not no_mongodb_log or not no_mongodb_state or not no_mongodb_statistics:
MongoDBAdapter(
parser=parser,
Expand All @@ -298,3 +302,167 @@ def rib_load(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bo
parser.parse(
statement=message.data,
)

@cli.command(
name='message-replay',
help='Replay messages from database.',
)
@click.option(
'--no-rabbitmq-direct',
'-d',
is_flag=True,
)
@click.option(
'--rabbitmq-grouped',
'-g',
type=int,
default=None,
show_default='5',
is_flag=False,
flag_value=5,
help='Queue group interval in minutes.',
)
@click.option(
'--no-mongodb-log',
'-l',
is_flag=True,
)
@click.option(
'--no-mongodb-state',
'-s',
is_flag=True,
)
@click.option(
'--no-mongodb-statistics',
'-t',
is_flag=True,
)
@click.option(
'--clear-mongodb',
'-c',
is_flag=True,
)
@click.option(
'--playback-speed',
'-p',
type=int,
default=None,
show_default='1',
is_flag=False,
flag_value=1,
help='Playback speed in multiples of real time.',
)
@click.option(
'--playback-interval',
'-o',
type=int,
default=None,
show_default='5',
is_flag=False,
flag_value=5,
help='Playback interval in minutes.',
)
@click.option(
'--start-timestamp',
'-b',
type=float,
default=None,
help='Starttime of replay as timestamp.',
)
@click.option(
'--end-timestamp',
'-e',
type=float,
default=None,
help='Endtime of replay as timestamp.',
)
@click.option(
'--start-time',
'-r',
type=str,
help='Starttime of replay as time; in format (T is a set character): YYYY-MM-DDThh:mm:ss.',
)
@click.option(
'--end-time',
'-f',
type=str,
help='Endtime of replay as time; in format (T is a set character): YYYY-MM-DDThh:mm:ss.',
)
def message_replay(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool, clear_mongodb: bool, playback_speed: int, playback_interval: int, start_timestamp: float, end_timestamp: float, start_time: str, end_time: str):
'''
Message replay command for replaying BGP messages from Database log.
Author:
Sebastian Forstner <sef9869@thi.de>
Args:
no_rabbitmq_direct (bool): Disable direct RabbitMQ direct queue..
rabbitmq_grouped (int): Queue group interval in minutes.
no_mongodb_log (bool): Disable logging to MongoDB.
no_mongodb_state (bool): Disable state storage to MongoDB.
no_mongodb_statistics (bool): Disable statistics storage to MongoDB.
clear_mongodb (bool): Clear MongoDB collections.
playback_speed (int): Playback speed in multiples of real time.
playback_interval (int): Playback interval in minutes.
start_timestamp (float): Starttime of replay as timestamp.
end_timestamp (float): Endtime of replay as timestamp.
start_time (str): Starttime of replay as time; in format (T is a set character): YYYY-MM-DDThh:mm:ss.
end_time (str): Endtime of replay as time; in format (T is a set character): YYYY-MM-DDThh:mm:ss.
'''
parser = ReverseParser()

if not no_rabbitmq_direct or rabbitmq_grouped:
RabbitMQAdapter(
parser=parser,
no_direct=no_rabbitmq_direct,
queue_interval=rabbitmq_grouped,
)

playback_speed_reference: datetime = None
playback_interval_stop: datetime = None

# Check if start and end are given and in which format; no time given results in replaying whole db
if start_timestamp and end_timestamp:
start_time = datetime.fromtimestamp(start_timestamp)
end_time = datetime.fromtimestamp(end_timestamp)
new_messages = MongoDBLogLoader.load_messages(timestamp_start = start_time, timestamp_end = end_time)
elif start_time and end_time:
time_start = datetime.fromisoformat(start_time)
time_end = datetime.fromisoformat(end_time)
new_messages = MongoDBLogLoader.load_messages(timestamp_start = time_start, timestamp_end = time_end)
else:
new_messages = MongoDBLogLoader.load_messages(timestamp_start = None, timestamp_end = None)

# Copy messages in local list to avoid deleting them if -c is set; new_messages is corsor pointing to db
all_messages: list[OrderedDict] = []
for message in new_messages:
all_messages.append(message)

# Init for MongoDBAdapter to avouid deleting messages before loading
if not no_mongodb_log or not no_mongodb_state or not no_mongodb_statistics:
MongoDBAdapter(
parser=parser,
no_mongodb_log=no_mongodb_log,
no_mongodb_state=no_mongodb_state,
no_mongodb_statistics=no_mongodb_statistics,
clear_mongodb=clear_mongodb,
)

for message in all_messages:
current_timestamp: datetime = message['timestamp']

if playback_speed:
if playback_speed_reference:
time.sleep((current_timestamp - playback_speed_reference).seconds / playback_speed)

playback_speed_reference = current_timestamp

if playback_interval:
if playback_interval_stop:
if current_timestamp > playback_interval_stop:
input('Enter for next interval...')
playback_interval_stop = playback_interval_stop + timedelta(minutes=playback_interval)
else:
playback_interval_stop = current_timestamp + timedelta(minutes=playback_interval)

parser.parse(message)
Loading

0 comments on commit a7e42af

Please sign in to comment.