Skip to content

Commit

Permalink
Merge branch '18-mrt-funktionen-refactoring' into 'main'
Browse files Browse the repository at this point in the history
Resolve "MRT Funktionen Refactoring"

Closes #18

See merge request imprj/01-bgp-testbed/zettabgp!14
  • Loading branch information
DlieBG committed Nov 22, 2024
2 parents c111e52 + aa14cad commit cb33b50
Show file tree
Hide file tree
Showing 24 changed files with 752 additions and 174 deletions.
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions .idea/zettabgp.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2024 Benedikt Schwering
Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ ZETTABGP_WEBAPP_MRT_LIBRARY_PATH
## Usage
ZettaBGP provides a CLI interface with some commands for testbed simulations as well for production use.

### Comands
### Commands
#### `zettabgp exabgp`
The `exabgp` subcommand is used for processing ExaBGP Messages.\
The process can be started from within ExaBGP.\
Expand Down Expand Up @@ -112,7 +112,7 @@ When no `-g` option is present, no grouped updates will appear at all.

#### `zettabgp mrt-simulation`
The `mrt-simulation` subcommand is used for processing mrt files.\
It is mendatory to provide a valid path to at least one mrt file.\
It is mandatory to provide a valid path to at least one mrt file.\
`mrt-simulation` also supports the handling of multiple mrt files.\
But keep in mind to provide sequentially sorted mrt files based on the timeframe.\
Otherwise the grouping feature will not work properly!
Expand Down Expand Up @@ -147,7 +147,7 @@ zettabgp mrt-simulation <mrt-file> -p 2
```

##### Playback Interval
For debugging the timebased group update queue, it is very useful to playback all update messages that occur within an interval of for example 5 minutes.\
For debugging the timebase group update queue, it is very useful to playback all update messages that occur within an interval of for example 5 minutes.\
When you specify option `-o` you can set a playback interval in minutes that defaults to 5 minutes.\
Between the intervals you have to press enter to continue with the replay of the next interval.\
Of course you can combine this option with the playback speed option.\
Expand Down
52 changes: 42 additions & 10 deletions src/adapters/mongodb.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
# -*- coding: utf-8 -*-
'''
ZettaBGP - Advanced Anomaly Detection in Internet Routing
Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner
This work is licensed under the terms of the MIT license.
For a copy, see LICENSE in the project root.
Author:
Benedikt Schwering <bes9584@thi.de>
Sebastian Forstner <sef9869@thi.de>
'''
from src.parsers.route_update import RouteUpdateParser
from src.models.route_update import RouteUpdate
from src.models.route_update import ChangeType
Expand All @@ -7,10 +19,28 @@
import os

class MongoDBAdapter:
'''This class is responsible for receiving the parsed messages and forwarding them to both MongoDB databases'''
'''
This class is responsible for receiving the parsed messages and forwarding them to both MongoDB databases.
Author:
Sebastian Forstner <sef9869@thi.de>
'''
def __init__(self, parser: RouteUpdateParser, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool, clear_mongodb: bool):
'''
Initializes the MongoDBAdapter.
Author:
Sebastian Forstner <sef9869@thi.de>
Args:
parser (RouteUpdateParser): The parser to receive the parsed messages from.
no_mongodb_log (bool): Whether to disable the log storage.
no_mongodb_state (bool): Whether to disable the state storage.
no_mongodb_statistics (bool): Whether to disable the statistics storage.
clear_mongodb (bool): Whether to clear the MongoDB databases.
'''
try:
'''Connects to MongoDB-Container running with Docker'''
# Connects to MongoDB-Container running with Docker
database_client = MongoClient(
host=os.getenv('MONGO_DB_HOST', 'localhost'),
port=int(os.getenv('MONGO_DB_PORT', 27017)),
Expand All @@ -21,21 +51,22 @@ def __init__(self, parser: RouteUpdateParser, no_mongodb_log: bool, no_mongodb_s
log_flag = no_mongodb_log
state_flag = no_mongodb_state
statistics_flag = no_mongodb_statistics
'''Creates database and collection for log storrage'''

# Creates database and collection for log storage
if not log_flag:
log_db = database_client.message_log
log_collection = log_db.storage
if clear_mongodb:
log_collection.delete_many({})

'''Creates database and collection for state storrage'''
# Creates database and collection for state storage
if not state_flag:
state_db = database_client.message_state
state_collection = state_db.storage
if clear_mongodb:
state_collection.delete_many({})

'''Creates database and collection for statistics storrage'''
# Creates database and collection for statistics storage
if not statistics_flag:
statistics_db = database_client.message_statistics
statistics_collection = statistics_db.storage
Expand All @@ -45,7 +76,7 @@ def __init__(self, parser: RouteUpdateParser, no_mongodb_log: bool, no_mongodb_s
@parser.on_update
def on_update(message: RouteUpdate):

'''saves optional, non-base-type attributes for later use; required to guarantee save use of mongodb'''
# Saves optional, non-base-type attributes for later use; required to guarantee save use of mongodb
if message.path_attributes.origin:
origins = message.path_attributes.origin.value
else:
Expand Down Expand Up @@ -75,7 +106,7 @@ def on_update(message: RouteUpdate):
else:
extended_community.append(str(ext_com))

'''creates dict for message with _id and other unique attributes, that dont change'''
# Creates dict for message with _id and other unique attributes, that don't change
new_message_id = {
'timestamp' : message.timestamp,
'peer_ip' : message.peer_ip,
Expand Down Expand Up @@ -103,7 +134,8 @@ def on_update(message: RouteUpdate):
},
'_id' : ObjectId(),
}
'''creates dict used for collection updates, MUST NOT contain _id and should not contain other non changing attributes'''

# Creates dict used for collection updates, MUST NOT contain _id and should not contain other non changing attributes
set_message = {
'$set': {
'timestamp' : message.timestamp,
Expand All @@ -127,7 +159,7 @@ def on_update(message: RouteUpdate):
}
}

'''route got withdrawn, db actions accordingly'''
# Route got withdrawn, db actions accordingly
if message.change_type == ChangeType.WITHDRAW:
if not log_flag:
log_announce = log_collection.insert_one(new_message_id)
Expand Down Expand Up @@ -159,7 +191,7 @@ def on_update(message: RouteUpdate):
}
statistics_announce = statistics_collection.update_one(statistics_filter, new_values, upsert=True)

'''route got announced, db actions accordingly'''
# Route got announced, db actions accordingly
if message.change_type == ChangeType.ANNOUNCE:
if not log_flag:
log_announce = log_collection.insert_one(new_message_id)
Expand Down
34 changes: 31 additions & 3 deletions src/adapters/rabbitmq.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,53 @@
# -*- coding: utf-8 -*-
'''
ZettaBGP - Advanced Anomaly Detection in Internet Routing
Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner
This work is licensed under the terms of the MIT license.
For a copy, see LICENSE in the project root.
Author:
Benedikt Schwering <bes9584@thi.de>
Sebastian Forstner <sef9869@thi.de>
'''
from src.parsers.route_update import RouteUpdateParser
from src.models.route_update import RouteUpdate
from datetime import timedelta, datetime
import pika, json, os

class RabbitMQAdapter:
'''This class is responsible for receiving the parsed messages and forwarding them to the RabbitMQ message broker'''
'''
This class is responsible for receiving the parsed messages and forwarding them to the RabbitMQ message broker.
Author:
Benedikt Schwering <bes9584@thi.de>
'''
def __init__(self, parser: RouteUpdateParser, no_direct: bool, queue_interval: int):
'''
Initializes the RabbitMQAdapter.
Author:
Benedikt Schwering <bes9584@thi.de>
Args:
parser (RouteUpdateParser): The parser to receive the parsed messages from.
no_direct (bool): Whether to disable the direct route updates.
queue_interval (int): The interval in minutes to group the route updates.
'''
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=os.getenv('RABBIT_MQ_HOST', 'localhost'),
)
)

'''Creates a channel for the connection and declares the zettabgp exchange'''
# Creates a channel for the connection and declares the zettabgp exchange
channel = connection.channel()
channel.exchange_declare(
exchange='zettabgp',
exchange_type='direct',
)

'''Declares the test_bgp_updates queue and binds it to the zettabgp exchange'''
# Declares the test_bgp_updates queue and binds it to the zettabgp exchange
def _declare_test_queue(queue_name: str, routing_key: str):
channel.queue_declare(
queue=queue_name,
Expand Down
Loading

0 comments on commit cb33b50

Please sign in to comment.