diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..47d517c --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..382ed1e --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/zettabgp.iml b/.idea/zettabgp.iml new file mode 100644 index 0000000..e60727b --- /dev/null +++ b/.idea/zettabgp.iml @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/LICENSE b/LICENSE index db8a66a..69c6983 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2024 Benedikt Schwering +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 7fe5b8a..b7a7f4a 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ ZETTABGP_WEBAPP_MRT_LIBRARY_PATH ## Usage ZettaBGP provides a CLI interface with some commands for testbed simulations as well for production use. -### Comands +### Commands #### `zettabgp exabgp` The `exabgp` subcommand is used for processing ExaBGP Messages.\ The process can be started from within ExaBGP.\ @@ -112,7 +112,7 @@ When no `-g` option is present, no grouped updates will appear at all. #### `zettabgp mrt-simulation` The `mrt-simulation` subcommand is used for processing mrt files.\ -It is mendatory to provide a valid path to at least one mrt file.\ +It is mandatory to provide a valid path to at least one mrt file.\ `mrt-simulation` also supports the handling of multiple mrt files.\ But keep in mind to provide sequentially sorted mrt files based on the timeframe.\ Otherwise the grouping feature will not work properly! @@ -147,7 +147,7 @@ zettabgp mrt-simulation -p 2 ``` ##### Playback Interval -For debugging the timebased group update queue, it is very useful to playback all update messages that occur within an interval of for example 5 minutes.\ +For debugging the timebase group update queue, it is very useful to playback all update messages that occur within an interval of for example 5 minutes.\ When you specify option `-o` you can set a playback interval in minutes that defaults to 5 minutes.\ Between the intervals you have to press enter to continue with the replay of the next interval.\ Of course you can combine this option with the playback speed option.\ diff --git a/src/adapters/mongodb.py b/src/adapters/mongodb.py index e6d208b..92a6299 100644 --- a/src/adapters/mongodb.py +++ b/src/adapters/mongodb.py @@ -1,3 +1,15 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from src.parsers.route_update import RouteUpdateParser from src.models.route_update import RouteUpdate from src.models.route_update import ChangeType @@ -7,10 +19,28 @@ import os class MongoDBAdapter: - '''This class is responsible for receiving the parsed messages and forwarding them to both MongoDB databases''' + ''' + This class is responsible for receiving the parsed messages and forwarding them to both MongoDB databases. + + Author: + Sebastian Forstner + ''' def __init__(self, parser: RouteUpdateParser, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool, clear_mongodb: bool): + ''' + Initializes the MongoDBAdapter. + + Author: + Sebastian Forstner + + Args: + parser (RouteUpdateParser): The parser to receive the parsed messages from. + no_mongodb_log (bool): Whether to disable the log storage. + no_mongodb_state (bool): Whether to disable the state storage. + no_mongodb_statistics (bool): Whether to disable the statistics storage. + clear_mongodb (bool): Whether to clear the MongoDB databases. + ''' try: - '''Connects to MongoDB-Container running with Docker''' + # Connects to MongoDB-Container running with Docker database_client = MongoClient( host=os.getenv('MONGO_DB_HOST', 'localhost'), port=int(os.getenv('MONGO_DB_PORT', 27017)), @@ -21,21 +51,22 @@ def __init__(self, parser: RouteUpdateParser, no_mongodb_log: bool, no_mongodb_s log_flag = no_mongodb_log state_flag = no_mongodb_state statistics_flag = no_mongodb_statistics - '''Creates database and collection for log storrage''' + + # Creates database and collection for log storage if not log_flag: log_db = database_client.message_log log_collection = log_db.storage if clear_mongodb: log_collection.delete_many({}) - '''Creates database and collection for state storrage''' + # Creates database and collection for state storage if not state_flag: state_db = database_client.message_state state_collection = state_db.storage if clear_mongodb: state_collection.delete_many({}) - '''Creates database and collection for statistics storrage''' + # Creates database and collection for statistics storage if not statistics_flag: statistics_db = database_client.message_statistics statistics_collection = statistics_db.storage @@ -45,7 +76,7 @@ def __init__(self, parser: RouteUpdateParser, no_mongodb_log: bool, no_mongodb_s @parser.on_update def on_update(message: RouteUpdate): - '''saves optional, non-base-type attributes for later use; required to guarantee save use of mongodb''' + # Saves optional, non-base-type attributes for later use; required to guarantee save use of mongodb if message.path_attributes.origin: origins = message.path_attributes.origin.value else: @@ -75,7 +106,7 @@ def on_update(message: RouteUpdate): else: extended_community.append(str(ext_com)) - '''creates dict for message with _id and other unique attributes, that dont change''' + # Creates dict for message with _id and other unique attributes, that don't change new_message_id = { 'timestamp' : message.timestamp, 'peer_ip' : message.peer_ip, @@ -103,7 +134,8 @@ def on_update(message: RouteUpdate): }, '_id' : ObjectId(), } - '''creates dict used for collection updates, MUST NOT contain _id and should not contain other non changing attributes''' + + # Creates dict used for collection updates, MUST NOT contain _id and should not contain other non changing attributes set_message = { '$set': { 'timestamp' : message.timestamp, @@ -127,7 +159,7 @@ def on_update(message: RouteUpdate): } } - '''route got withdrawn, db actions accordingly''' + # Route got withdrawn, db actions accordingly if message.change_type == ChangeType.WITHDRAW: if not log_flag: log_announce = log_collection.insert_one(new_message_id) @@ -159,7 +191,7 @@ def on_update(message: RouteUpdate): } statistics_announce = statistics_collection.update_one(statistics_filter, new_values, upsert=True) - '''route got announced, db actions accordingly''' + # Route got announced, db actions accordingly if message.change_type == ChangeType.ANNOUNCE: if not log_flag: log_announce = log_collection.insert_one(new_message_id) diff --git a/src/adapters/rabbitmq.py b/src/adapters/rabbitmq.py index 9d3cb9c..4d059a0 100644 --- a/src/adapters/rabbitmq.py +++ b/src/adapters/rabbitmq.py @@ -1,25 +1,53 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from src.parsers.route_update import RouteUpdateParser from src.models.route_update import RouteUpdate from datetime import timedelta, datetime import pika, json, os class RabbitMQAdapter: - '''This class is responsible for receiving the parsed messages and forwarding them to the RabbitMQ message broker''' + ''' + This class is responsible for receiving the parsed messages and forwarding them to the RabbitMQ message broker. + + Author: + Benedikt Schwering + ''' def __init__(self, parser: RouteUpdateParser, no_direct: bool, queue_interval: int): + ''' + Initializes the RabbitMQAdapter. + + Author: + Benedikt Schwering + + Args: + parser (RouteUpdateParser): The parser to receive the parsed messages from. + no_direct (bool): Whether to disable the direct route updates. + queue_interval (int): The interval in minutes to group the route updates. + ''' connection = pika.BlockingConnection( pika.ConnectionParameters( host=os.getenv('RABBIT_MQ_HOST', 'localhost'), ) ) - '''Creates a channel for the connection and declares the zettabgp exchange''' + # Creates a channel for the connection and declares the zettabgp exchange channel = connection.channel() channel.exchange_declare( exchange='zettabgp', exchange_type='direct', ) - '''Declares the test_bgp_updates queue and binds it to the zettabgp exchange''' + # Declares the test_bgp_updates queue and binds it to the zettabgp exchange def _declare_test_queue(queue_name: str, routing_key: str): channel.queue_declare( queue=queue_name, diff --git a/src/controllers/mrt_library.py b/src/controllers/mrt_library.py index 33e6116..9f503f5 100644 --- a/src/controllers/mrt_library.py +++ b/src/controllers/mrt_library.py @@ -1,18 +1,33 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from src.models.mrt_library import MRTScenarioRequest, MRTScenarioResult, MRTScenario, MRTLibrary -from src.parsers.mrt_bgp4mp import MrtBgp4MpParser -from src.adapters.rabbitmq import RabbitMQAdapter -from src.adapters.mongodb import MongoDBAdapter -from src.models.route_update import ChangeType -from fastapi.exceptions import HTTPException -from datetime import datetime -from fastapi import APIRouter -from mrtparse import Reader +from src.services.mrt_simulation import mrt_simulation +from fastapi import APIRouter, HTTPException from pathlib import Path -import json, time, os +import json, os mrt_library_router = APIRouter() def _get_mrt_library() -> MRTLibrary: + ''' + This function returns the MRT library. + + Author: + Benedikt Schwering + + Returns: + MRTLibrary: The MRT library. + ''' mrt_library = MRTLibrary( scenarios=[], ) @@ -34,80 +49,74 @@ def _get_mrt_library() -> MRTLibrary: return mrt_library def _get_mrt_scenario(id: str) -> MRTScenario: + ''' + This function returns an MRT scenario by its ID. + + Author: + Benedikt Schwering + + Args: + id (str): The ID of the MRT scenario. + + Returns: + MRTScenario: The MRT scenario. + ''' for scenario in _get_mrt_library().scenarios: if scenario.id == id: return scenario @mrt_library_router.get('/') def get_mrt_library() -> MRTLibrary: + ''' + This function returns the MRT library. + + Author: + Benedikt Schwering + + Returns: + MRTLibrary: The MRT library. + ''' return _get_mrt_library() @mrt_library_router.post('/') def start_mrt_scenario(mrt_scenario_request: MRTScenarioRequest) -> MRTScenarioResult: + ''' + This function starts an MRT scenario. + + Author: + Benedikt Schwering + + Args: + mrt_scenario_request (MRTScenarioRequest): The MRT scenario request. + + Returns: + MRTScenarioResult: The MRT scenario result. + ''' scenario = _get_mrt_scenario( id=mrt_scenario_request.id, ) if not scenario: - return HTTPException( + raise HTTPException( status_code=400, detail='Scenario not found.', ) - mrt_scenario_result = MRTScenarioResult( - count_announce=0, - count_withdraw=0, + mrt_simulation_result = mrt_simulation( + no_rabbitmq_direct=scenario.no_rabbitmq_direct, + rabbitmq_grouped=scenario.rabbitmq_grouped, + no_mongodb_log=scenario.no_mongodb_log, + no_mongodb_state=scenario.no_mongodb_state, + no_mongodb_statistics=scenario.no_mongodb_statistics, + clear_mongodb=scenario.clear_mongodb, + playback_speed=scenario.playback_speed, + mrt_files=tuple([ + str(Path(scenario.path) / Path(mrt_file)) + for mrt_file in scenario.mrt_files + ]), ) - parser = MrtBgp4MpParser() - - if not scenario.no_rabbitmq_direct or scenario.rabbitmq_grouped: - RabbitMQAdapter( - parser=parser, - no_direct=scenario.no_rabbitmq_direct, - queue_interval=scenario.rabbitmq_grouped, - ) - - if not scenario.no_mongodb_log or not scenario.no_mongodb_state or not scenario.no_mongodb_statistics: - MongoDBAdapter( - parser=parser, - no_mongodb_log=scenario.no_mongodb_log, - no_mongodb_state=scenario.no_mongodb_state, - no_mongodb_statistics=scenario.no_mongodb_statistics, - clear_mongodb=scenario.clear_mongodb, - ) - - playback_speed_reference: datetime = None - - for mrt_file in scenario.mrt_files: - mrt_file = str(Path(scenario.path) / Path(mrt_file)) - - for message in Reader(mrt_file): - if message.data['type'] != {16: 'BGP4MP'}: - print('[dark_orange]\[WARN][/] Skipping unsupported MRT type: ', end='') - print(message.data['type']) - continue - - current_timestamp: datetime = datetime.fromtimestamp( - timestamp=list(message.data['timestamp'].keys())[0], - ) - - if scenario.playback_speed: - if playback_speed_reference: - time.sleep((current_timestamp - playback_speed_reference).seconds / scenario.playback_speed) - - playback_speed_reference = current_timestamp - - updates = parser.parse( - bgp4mp_message=message, - ) - - if updates: - for update in updates: - match update.change_type: - case ChangeType.ANNOUNCE: - mrt_scenario_result.count_announce += 1 - case ChangeType.WITHDRAW: - mrt_scenario_result.count_withdraw += 1 - - return mrt_scenario_result + return MRTScenarioResult( + count_announce=mrt_simulation_result.count_announce, + count_withdraw=mrt_simulation_result.count_withdraw, + ) diff --git a/src/controllers/version.py b/src/controllers/version.py index d7c22a5..5e70e11 100644 --- a/src/controllers/version.py +++ b/src/controllers/version.py @@ -1,8 +1,29 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from fastapi import APIRouter import pkg_resources version_router = APIRouter() @version_router.get('/') -def get_version(): +def get_version() -> str: + ''' + This function returns the version of the ZettaBGP package. + + Author: + Benedikt Schwering + + Returns: + str: The version of the ZettaBGP package. + ''' return pkg_resources.get_distribution('zettabgp').version diff --git a/src/main.py b/src/main.py index 1ad8e2a..d4df4ad 100644 --- a/src/main.py +++ b/src/main.py @@ -1,12 +1,19 @@ -from src.parsers.mrt_bgp4mp import MrtBgp4MpParser -from src.adapters.rabbitmq import RabbitMQAdapter -from src.adapters.mongodb import MongoDBAdapter -from src.parsers.exabgp import ExaBGPParser -from datetime import timedelta, datetime +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' +import src.services.mrt_simulation as mrt_simulation_service +import src.services.exabgp as exabgp_service from src.webapp import start_webapp -from mrtparse import Reader -import click, time, sys -from rich import print +import click @click.group() def cli(): @@ -52,31 +59,29 @@ def cli(): is_flag=True, ) def exabgp(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool, clear_mongodb: bool): - parser = ExaBGPParser() - - if not no_rabbitmq_direct or rabbitmq_grouped: - RabbitMQAdapter( - parser=parser, - no_direct=no_rabbitmq_direct, - queue_interval=rabbitmq_grouped, - ) - - if not no_mongodb_log or not no_mongodb_state or not no_mongodb_statistics: - MongoDBAdapter( - parser=parser, - no_mongodb_log=no_mongodb_log, - no_mongodb_state=no_mongodb_state, - no_mongodb_statistics=no_mongodb_statistics, - clear_mongodb=clear_mongodb, - ) - - while True: - for line in sys.stdin: - parser.parse( - line=line, - ) - - time.sleep(1) + ''' + ExaBGP command for retrieving BGP messages from ExaBGP via stdin and processing them. + + Author: + Benedikt Schwering + Sebastian Forstner + + Args: + no_rabbitmq_direct (bool): Disable direct RabbitMQ direct queue.. + rabbitmq_grouped (int): Queue group interval in minutes. + no_mongodb_log (bool): Disable logging to MongoDB. + no_mongodb_state (bool): Disable state storage to MongoDB. + no_mongodb_statistics (bool): Disable statistics storage to MongoDB. + clear_mongodb (bool): Clear MongoDB collections. + ''' + exabgp_service.exabgp( + no_rabbitmq_direct=no_rabbitmq_direct, + rabbitmq_grouped=rabbitmq_grouped, + no_mongodb_log=no_mongodb_log, + no_mongodb_state=no_mongodb_state, + no_mongodb_statistics=no_mongodb_statistics, + clear_mongodb=clear_mongodb, + ) @cli.command( name='mrt-simulation', @@ -146,56 +151,36 @@ def exabgp(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bool required=True, nargs=-1, ) -def mrt_simulation(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool, clear_mongodb: bool, playback_speed: int, playback_interval: int, mrt_files: tuple[str]): - parser = MrtBgp4MpParser() - - if not no_rabbitmq_direct or rabbitmq_grouped: - RabbitMQAdapter( - parser=parser, - no_direct=no_rabbitmq_direct, - queue_interval=rabbitmq_grouped, - ) - - if not no_mongodb_log or not no_mongodb_state or not no_mongodb_statistics: - MongoDBAdapter( - parser=parser, - no_mongodb_log=no_mongodb_log, - no_mongodb_state=no_mongodb_state, - no_mongodb_statistics=no_mongodb_statistics, - clear_mongodb=clear_mongodb, - ) - - playback_speed_reference: datetime = None - playback_interval_stop: datetime = None - - for mrt_file in mrt_files: - for message in Reader(mrt_file): - if message.data['type'] != {16: 'BGP4MP'}: - print('[dark_orange]\[WARN][/] Skipping unsupported MRT type: ', end='') - print(message.data['type']) - continue - - current_timestamp: datetime = datetime.fromtimestamp( - timestamp=list(message.data['timestamp'].keys())[0], - ) - - if playback_speed: - if playback_speed_reference: - time.sleep((current_timestamp - playback_speed_reference).seconds / playback_speed) - - playback_speed_reference = current_timestamp - - if playback_interval: - if playback_interval_stop: - if current_timestamp > playback_interval_stop: - input('Enter for next interval...') - playback_interval_stop = playback_interval_stop + timedelta(minutes=playback_interval) - else: - playback_interval_stop = current_timestamp + timedelta(minutes=playback_interval) - - parser.parse( - bgp4mp_message=message, - ) +def mrt_simulation(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool, clear_mongodb: bool, playback_speed: int, playback_interval: int, mrt_files: tuple[str, ...]): + ''' + MRT Simulation command for retrieving BGP messages from MRT files and processing them. + + Author: + Benedikt Schwering + Sebastian Forstner + + Args: + no_rabbitmq_direct (bool): Disable direct RabbitMQ direct queue.. + rabbitmq_grouped (int): Queue group interval in minutes. + no_mongodb_log (bool): Disable logging to MongoDB. + no_mongodb_state (bool): Disable state storage to MongoDB. + no_mongodb_statistics (bool): Disable statistics storage to MongoDB. + clear_mongodb (bool): Clear MongoDB collections. + playback_speed (int): Playback speed in multiples of real time. + playback_interval (int): Playback interval in minutes. + mrt_files (tuple[str, ...]): MRT files to process. + ''' + mrt_simulation_service.mrt_simulation( + no_rabbitmq_direct=no_rabbitmq_direct, + rabbitmq_grouped=rabbitmq_grouped, + no_mongodb_log=no_mongodb_log, + no_mongodb_state=no_mongodb_state, + no_mongodb_statistics=no_mongodb_statistics, + clear_mongodb=clear_mongodb, + playback_speed=playback_speed, + playback_interval=playback_interval, + mrt_files=mrt_files, + ) @cli.command( name='webapp', @@ -207,6 +192,15 @@ def mrt_simulation(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_l is_flag=True, ) def webapp(reload: bool): + ''' + WebApp command for launching the admin WebApp. + + Author: + Benedikt Schwering + + Args: + reload (bool): Reload the WebApp on changes. + ''' start_webapp( reload=reload, ) diff --git a/src/models/mrt_library.py b/src/models/mrt_library.py index d8fe642..7be2637 100644 --- a/src/models/mrt_library.py +++ b/src/models/mrt_library.py @@ -1,14 +1,44 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from pydantic import BaseModel from typing import Optional class MRTScenarioRequest(BaseModel): + ''' + This class represents a request to run an MRT scenario. + + Author: + Benedikt Schwering + ''' id: str class MRTScenarioResult(BaseModel): + ''' + This class represents the result of an MRT scenario. + + Author: + Benedikt Schwering + ''' count_announce: int count_withdraw: int class MRTScenario(BaseModel): + ''' + This class represents an MRT scenario. + + Author: + Benedikt Schwering + ''' id: str path: str name: str @@ -23,4 +53,10 @@ class MRTScenario(BaseModel): mrt_files: list[str] class MRTLibrary(BaseModel): + ''' + This class represents an MRT library. + + Author: + Benedikt Schwering + ''' scenarios: list[MRTScenario] diff --git a/src/models/route_update.py b/src/models/route_update.py index 04a7a92..d5805bc 100644 --- a/src/models/route_update.py +++ b/src/models/route_update.py @@ -1,36 +1,90 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from pydantic import BaseModel from datetime import datetime from typing import Optional from enum import Enum class ChangeType(Enum): + ''' + This class represents the change type of a route update. + + Author: + Benedikt Schwering + ''' ANNOUNCE = 1, WITHDRAW = 2, class NLRI(BaseModel): + ''' + This class represents the network layer reachability information of a route update. + + Author: + Benedikt Schwering + ''' prefix: str length: int class OriginType(Enum): + ''' + This class represents the origin type of a route update. + + Author: + Benedikt Schwering + ''' IGP = 1 EGP = 2 INCOMPLETE = 3 class AsPathType(Enum): + ''' + This class represents the AS path type of a route update. + + Author: + Benedikt Schwering + ''' AS_SET = 1 AS_SEQUENCE = 2 AS_CONFED_SET = 3 AS_CONFED_SEQUENCE = 4 class AsPath(BaseModel): + ''' + This class represents the AS path of a route update. + + Author: + Benedikt Schwering + ''' type: AsPathType value: list[int] class Aggregator(BaseModel): + ''' + This class represents the aggregator of a route update. + + Author: + Benedikt Schwering + ''' router_id: str router_as: int class PathAttributes(BaseModel): + ''' + This class represents the path attributes of a route update. + + Author: + Benedikt Schwering + ''' origin: Optional[OriginType] = None as_path: Optional[list[AsPath]] = None next_hop: Optional[list[str]] = None @@ -45,6 +99,12 @@ class PathAttributes(BaseModel): cluster_list: Optional[list[str]] = None class RouteUpdate(BaseModel): + ''' + This class represents a route update. + + Author: + Benedikt Schwering + ''' timestamp: datetime = datetime.now() peer_ip: str local_ip: str diff --git a/src/parsers/exabgp.py b/src/parsers/exabgp.py index 85e6c9c..4763a51 100644 --- a/src/parsers/exabgp.py +++ b/src/parsers/exabgp.py @@ -1,10 +1,27 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from src.models.route_update import PathAttributes, RouteUpdate, OriginType, Aggregator, ChangeType, AsPathType, AsPath, NLRI from src.parsers.route_update import RouteUpdateParser from datetime import datetime import json class ExaBGPParser(RouteUpdateParser): - '''This class is responsible for parsing ExaBGP messages''' + ''' + This class is responsible for parsing ExaBGP messages. + + Author: + Benedikt Schwering + ''' def _parse_origin(self, origin: str) -> OriginType: if origin is None: return None @@ -76,6 +93,18 @@ def _parse_path_attributes(self, exabgp_message: dict, next_hop: str = None) -> ) def parse(self, line: str) -> list[RouteUpdate]: + ''' + Parse an ExaBGP message. + + Author: + Benedikt Schwering + + Args: + line (str): The ExaBGP message. + + Returns: + list[RouteUpdate]: The parsed RouteUpdate objects. + ''' route_updates: list[RouteUpdate] = [] exabgp_message = json.loads(line) @@ -96,7 +125,7 @@ def parse(self, line: str) -> list[RouteUpdate]: ), ) - '''Iterate over the withdraw routes and create RouteUpdate objects''' + # Iterate over the withdraw routes and create RouteUpdate objects for withdraw_routes in exabgp_message['neighbor']['message']['update'].get('withdraw', {}).values(): for withdraw_route in withdraw_routes: route_updates.append( @@ -111,7 +140,7 @@ def parse(self, line: str) -> list[RouteUpdate]: ) ) - '''Iterate over the announce routes and create RouteUpdate objects''' + # Iterate over the announce routes and create RouteUpdate objects for announce_hops in exabgp_message['neighbor']['message']['update'].get('announce', {}).values(): for announce_hop, announce_routes in announce_hops.items(): for announce_route in announce_routes: diff --git a/src/parsers/mrt_bgp4mp.py b/src/parsers/mrt_bgp4mp.py index 8ad68f5..6881371 100644 --- a/src/parsers/mrt_bgp4mp.py +++ b/src/parsers/mrt_bgp4mp.py @@ -1,3 +1,15 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from src.models.route_update import PathAttributes, RouteUpdate, OriginType, Aggregator, ChangeType, AsPathType, AsPath, NLRI from src.parsers.route_update import RouteUpdateParser from collections import OrderedDict @@ -5,7 +17,12 @@ from mrtparse import Bgp4Mp class MrtBgp4MpParser(RouteUpdateParser): - '''This class is responsible for parsing MRT BGP4MP messages''' + ''' + This class is responsible for parsing MRT BGP4MP messages. + + Author: + Benedikt Schwering + ''' def _parse_nlri(self, nlri: OrderedDict) -> NLRI: nlri = dict(nlri) @@ -14,7 +31,7 @@ def _parse_nlri(self, nlri: OrderedDict) -> NLRI: length=nlri['length'], ) - def _get_path_attribute(self, path_attributes: OrderedDict, type: dict) -> dict: + def _get_path_attribute(self, path_attributes: list[OrderedDict], type: dict) -> dict: for path_attribute in path_attributes: path_attribute = dict(path_attribute) @@ -100,7 +117,7 @@ def _parse_multi_exit_disc(self, path_attributes: list[OrderedDict]) -> int: return multi_exit_disc['value'] - def _parse_atomic_aggregate(self, path_attributes: list[OrderedDict]) -> int: + def _parse_atomic_aggregate(self, path_attributes: list[OrderedDict]) -> bool: atomic_aggregate = self._get_path_attribute( path_attributes=path_attributes, type={6: 'ATOMIC_AGGREGATE'}, @@ -240,6 +257,18 @@ def _parse_mp_unreach_nlri(self, path_attributes: list[OrderedDict]) -> list[NLR ] def parse(self, bgp4mp_message: Bgp4Mp) -> list[RouteUpdate]: + ''' + Parse a BGP4MP message. + + Author: + Benedikt Schwering + + Args: + bgp4mp_message (Bgp4Mp): The BGP4MP message. + + Returns: + list[RouteUpdate]: The parsed RouteUpdate objects. + ''' route_updates: list[RouteUpdate] = [] bgp4mp_message = dict(bgp4mp_message.data) @@ -261,7 +290,7 @@ def parse(self, bgp4mp_message: Bgp4Mp) -> list[RouteUpdate]: ), ) - '''Iterate over the withdraw routes and create RouteUpdate objects''' + # Iterate over the withdraw routes and create RouteUpdate objects for withdraw_route in nested_bgp4mp_message.get('withdrawn_routes', []) + self._parse_mp_unreach_nlri( path_attributes=nested_bgp4mp_message.get('path_attributes', []), ): @@ -276,7 +305,7 @@ def parse(self, bgp4mp_message: Bgp4Mp) -> list[RouteUpdate]: ) ) - '''Iterate over the announce routes and create RouteUpdate objects''' + # Iterate over the announce routes and create RouteUpdate objects for announce_route in nested_bgp4mp_message.get('nlri', []) + self._parse_mp_reach_nlri( path_attributes=nested_bgp4mp_message.get('path_attributes', []), ): diff --git a/src/parsers/route_update.py b/src/parsers/route_update.py index 8a66864..6560d72 100644 --- a/src/parsers/route_update.py +++ b/src/parsers/route_update.py @@ -1,6 +1,24 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from src.models.route_update import RouteUpdate class RouteUpdateParser: + ''' + This class defines a common interface for all route update parsers. + + Author: + Benedikt Schwering + ''' _on_update_functions = [] def _send_messages(self, messages: list[RouteUpdate]): @@ -9,4 +27,13 @@ def _send_messages(self, messages: list[RouteUpdate]): fn(message) def on_update(self, fn): + ''' + Register a function that should be called when a new route update is parsed. + + Author: + Benedikt Schwering + + Args: + fn: The function that should be called when a new route update is parsed. + ''' self._on_update_functions.append(fn) diff --git a/src/services/__init__.py b/src/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/services/exabgp.py b/src/services/exabgp.py new file mode 100644 index 0000000..6ea10e4 --- /dev/null +++ b/src/services/exabgp.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' +from src.adapters.rabbitmq import RabbitMQAdapter +from src.adapters.mongodb import MongoDBAdapter +from src.parsers.exabgp import ExaBGPParser +import time, sys + +def exabgp(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool, clear_mongodb: bool): + ''' + ExaBGP service for retrieving BGP messages from ExaBGP via stdin and processing them. + + Author: + Benedikt Schwering + Sebastian Forstner + + Args: + no_rabbitmq_direct (bool): Disable direct RabbitMQ direct queue.. + rabbitmq_grouped (int): Queue group interval in minutes. + no_mongodb_log (bool): Disable logging to MongoDB. + no_mongodb_state (bool): Disable state storage to MongoDB. + no_mongodb_statistics (bool): Disable statistics storage to MongoDB. + clear_mongodb (bool): Clear MongoDB collections. + ''' + parser = ExaBGPParser() + + if not no_rabbitmq_direct or rabbitmq_grouped: + RabbitMQAdapter( + parser=parser, + no_direct=no_rabbitmq_direct, + queue_interval=rabbitmq_grouped, + ) + + if not no_mongodb_log or not no_mongodb_state or not no_mongodb_statistics: + MongoDBAdapter( + parser=parser, + no_mongodb_log=no_mongodb_log, + no_mongodb_state=no_mongodb_state, + no_mongodb_statistics=no_mongodb_statistics, + clear_mongodb=clear_mongodb, + ) + + while True: + for line in sys.stdin: + parser.parse( + line=line, + ) + + time.sleep(1) diff --git a/src/services/mrt_simulation.py b/src/services/mrt_simulation.py new file mode 100644 index 0000000..bb95c97 --- /dev/null +++ b/src/services/mrt_simulation.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' +from src.parsers.mrt_bgp4mp import MrtBgp4MpParser +from src.adapters.rabbitmq import RabbitMQAdapter +from src.adapters.mongodb import MongoDBAdapter +from src.models.route_update import ChangeType +from datetime import timedelta, datetime +from pydantic import BaseModel +from mrtparse import Reader +import time + +class MRTSimulationResult(BaseModel): + count_announce: int + count_withdraw: int + +def mrt_simulation(no_rabbitmq_direct: bool = False, rabbitmq_grouped: int = None, no_mongodb_log: bool = False, no_mongodb_state: bool = False, no_mongodb_statistics: bool = False, clear_mongodb: bool = False, playback_speed: int = None, playback_interval: int = None, mrt_files: tuple[str, ...] = ()) -> MRTSimulationResult: + ''' + MRT Simulation service for retrieving BGP messages from MRT files and processing them. + + Author: + Benedikt Schwering + Sebastian Forstner + + Args: + no_rabbitmq_direct (bool): Disable direct RabbitMQ direct queue.. + rabbitmq_grouped (int): Queue group interval in minutes. + no_mongodb_log (bool): Disable logging to MongoDB. + no_mongodb_state (bool): Disable state storage to MongoDB. + no_mongodb_statistics (bool): Disable statistics storage to MongoDB. + clear_mongodb (bool): Clear MongoDB collections. + playback_speed (int): Playback speed in multiples of real time. + playback_interval (int): Playback interval in minutes. + mrt_files (tuple[str, ...]): MRT files to process. + + Returns: + MRTSimulationResult: The result of the MRT simulation. + ''' + mrt_simulation_result = MRTSimulationResult( + count_announce=0, + count_withdraw=0, + ) + + parser = MrtBgp4MpParser() + + if not no_rabbitmq_direct or rabbitmq_grouped: + RabbitMQAdapter( + parser=parser, + no_direct=no_rabbitmq_direct, + queue_interval=rabbitmq_grouped, + ) + + if not no_mongodb_log or not no_mongodb_state or not no_mongodb_statistics: + MongoDBAdapter( + parser=parser, + no_mongodb_log=no_mongodb_log, + no_mongodb_state=no_mongodb_state, + no_mongodb_statistics=no_mongodb_statistics, + clear_mongodb=clear_mongodb, + ) + + playback_speed_reference: datetime = None + playback_interval_stop: datetime = None + + for mrt_file in mrt_files: + for message in Reader(mrt_file): + if message.data['type'] != {16: 'BGP4MP'}: + print('[dark_orange]\[WARN][/] Skipping unsupported MRT type: ', end='') + print(message.data['type']) + continue + + current_timestamp: datetime = datetime.fromtimestamp( + timestamp=list(message.data['timestamp'].keys())[0], + ) + + if playback_speed: + if playback_speed_reference: + time.sleep((current_timestamp - playback_speed_reference).seconds / playback_speed) + + playback_speed_reference = current_timestamp + + if playback_interval: + if playback_interval_stop: + if current_timestamp > playback_interval_stop: + input('Enter for next interval...') + playback_interval_stop = playback_interval_stop + timedelta(minutes=playback_interval) + else: + playback_interval_stop = current_timestamp + timedelta(minutes=playback_interval) + + updates = parser.parse( + bgp4mp_message=message, + ) + + if updates: + for update in updates: + if update.change_type == ChangeType.ANNOUNCE: + mrt_simulation_result.count_announce += 1 + elif update.change_type == ChangeType.WITHDRAW: + mrt_simulation_result.count_withdraw += 1 + + return mrt_simulation_result diff --git a/src/webapp.py b/src/webapp.py index 1fd4dcd..0ff481c 100644 --- a/src/webapp.py +++ b/src/webapp.py @@ -1,3 +1,15 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from src.controllers.mrt_library import mrt_library_router from src.controllers.version import version_router from fastapi.staticfiles import StaticFiles @@ -26,13 +38,35 @@ @app.get('/') def serve_angular_root(): + ''' + Route to serve the index.html of the Angular UI. + + Author: + Benedikt Schwering + ''' return FileResponse(os.getenv('ZETTABGP_WEBAPP_UI_PATH', 'src/ui') + '/index.html') @app.exception_handler(404) def serve_angular_root(_, __): + ''' + Route to serve the index.html of the Angular UI when a 404 error occurs. + This is needed due to the reactive routing of Angular. + + Author: + Benedikt Schwering + ''' return FileResponse(os.getenv('ZETTABGP_WEBAPP_UI_PATH', 'src/ui') + '/index.html') def start_webapp(reload: bool): + ''' + Start the web application. + + Author: + Benedikt Schwering + + Args: + reload (bool): Reload the WebApp on changes. + ''' uvicorn.run( app=os.getenv('ZETTABGP_WEBAPP_APP', 'src.webapp:app'), host='0.0.0.0', diff --git a/tests/test_exabgp_parser.py b/tests/test_exabgp_parser.py index 0dc6246..03d3c7a 100644 --- a/tests/test_exabgp_parser.py +++ b/tests/test_exabgp_parser.py @@ -1,12 +1,36 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from src.models.route_update import PathAttributes, RouteUpdate, OriginType, Aggregator, ChangeType, AsPathType, AsPath, NLRI from src.parsers.exabgp import ExaBGPParser from datetime import datetime import unittest class ExaBGPParserTests(unittest.TestCase): + ''' + Tests for the ExaBGP parser. + + Author: + Benedikt Schwering + ''' exabgp_parser = ExaBGPParser() def test_announce_basic_1(self): + ''' + Test the parsing of a basic announce message. + + Author: + Benedikt Schwering + ''' self.assertEqual( first=self.exabgp_parser.parse( line='{ "exabgp": "4.0.1", "time": 1729362675.303443, "host" : "node103", "pid" : 41610, "ppid" : 41609, "counter": 18, "type": "update", "neighbor": { "address": { "local": "172.17.179.103", "peer": "172.17.179.104" }, "asn": { "local": 1, "peer": 1 } , "direction": "receive", "message": { "update": { "attribute": { "origin": "igp", "local-preference": 100 }, "announce": { "ipv4 unicast": { "172.17.179.104": [ { "nlri": "1.1.0.0/24" } ] } } } } } }', @@ -35,6 +59,12 @@ def test_announce_basic_1(self): ) def test_announce_basic_2(self): + ''' + Test the parsing of a basic announce message. + + Author: + Benedikt Schwering + ''' self.assertEqual( first=self.exabgp_parser.parse( line='{ "exabgp": "4.0.1", "time": 1729362676.3019273, "host" : "node103", "pid" : 41610, "ppid" : 41609, "counter": 19, "type": "update", "neighbor": { "address": { "local": "172.17.179.103", "peer": "172.17.179.104" }, "asn": { "local": 1, "peer": 1 } , "direction": "receive", "message": { "update": { "attribute": { "origin": "igp", "local-preference": 100 }, "announce": { "ipv4 unicast": { "172.17.179.104": [ { "nlri": "1.1.0.0/25" } ] } } } } } }', @@ -63,6 +93,12 @@ def test_announce_basic_2(self): ) def test_announce_large_community(self): + ''' + Test the parsing of a announce message with large communities. + + Author: + Benedikt Schwering + ''' self.assertEqual( first=self.exabgp_parser.parse( line='{ "exabgp": "4.0.1", "time": 1729363609.4892604, "host" : "node103", "pid" : 41733, "ppid" : 41732, "counter": 34, "type": "update", "neighbor": { "address": { "local": "172.17.179.103", "peer": "172.17.179.104" }, "asn": { "local": 1, "peer": 1 } , "direction": "receive", "message": { "update": { "attribute": { "origin": "igp", "as-path": [ 12779, 12654 ], "confederation-path": [], "med": 1110, "local-preference": 100, "aggregator": "64521:10.6.39.0", "community": [ [ 12779, 10401 ], [ 12779, 65000 ] ], "large-community": [ [ 6695, 1911 , 172 ], [ 6695, 1912 , 0 ], [ 6695, 1913 , 276 ], [ 6695, 1914 , 150 ] ] }, "announce": { "ipv6 unicast": { "2001:7f8::31eb:0:1": [ { "nlri": "2001:7fb:fe15::/48" } ] } } } } } }', @@ -138,6 +174,12 @@ def test_announce_large_community(self): ) def test_withdraw_basic(self): + ''' + Test the parsing of a basic withdraw message. + + Author: + Benedikt Schwering + ''' self.assertEqual( first=self.exabgp_parser.parse( line='{ "exabgp": "4.0.1", "time": 1729362677.302448, "host" : "node103", "pid" : 41610, "ppid" : 41609, "counter": 20, "type": "update", "neighbor": { "address": { "local": "172.17.179.103", "peer": "172.17.179.104" }, "asn": { "local": 1, "peer": 1 } , "direction": "receive", "message": { "update": { "withdraw": { "ipv4 unicast": [ { "nlri": "1.1.0.0/24" } ] } } } } }', diff --git a/tests/test_mrt_bgp4mp_parser.py b/tests/test_mrt_bgp4mp_parser.py index 96b9907..a855b1f 100644 --- a/tests/test_mrt_bgp4mp_parser.py +++ b/tests/test_mrt_bgp4mp_parser.py @@ -1,3 +1,15 @@ +# -*- coding: utf-8 -*- +''' +ZettaBGP - Advanced Anomaly Detection in Internet Routing +Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner + +This work is licensed under the terms of the MIT license. +For a copy, see LICENSE in the project root. + +Author: + Benedikt Schwering + Sebastian Forstner +''' from src.models.route_update import PathAttributes, RouteUpdate, OriginType, Aggregator, ChangeType, AsPathType, AsPath, NLRI from src.parsers.mrt_bgp4mp import MrtBgp4MpParser from datetime import datetime @@ -5,6 +17,12 @@ import unittest, copy class MrtBgp4MpParserTests(unittest.TestCase): + ''' + Tests for the MRT BGP4MP parser. + + Author: + Benedikt Schwering + ''' mrt_bgp4mp_parser = MrtBgp4MpParser() messages = [ @@ -13,6 +31,12 @@ class MrtBgp4MpParserTests(unittest.TestCase): ] def test_announce_1(self): + ''' + Test the parsing of a basic announce message. + + Author: + Benedikt Schwering + ''' self.assertEqual( first=self.mrt_bgp4mp_parser.parse( bgp4mp_message=self.messages[-1],