diff --git a/src/adapters/mongodb.py b/src/adapters/mongodb.py
index 3720b87..023862d 100644
--- a/src/adapters/mongodb.py
+++ b/src/adapters/mongodb.py
@@ -14,10 +14,9 @@ from src.models.route_update import RouteUpdate
 from src.models.route_update import ChangeType
 from pymongo import MongoClient
+from datetime import datetime
 from typing import Optional
 from bson import ObjectId
-from datetime import datetime
-from collections import OrderedDict
 import os
 
 class MongoDBAdapter:
@@ -224,9 +223,28 @@ def on_update(message: RouteUpdate):
                 }
                 statistics_announce = statistics_collection.update_one(statistics_filter, new_values, upsert=True)
 
+class MongoDBLogLoader:
+    '''
+    This class is responsible for loading messages from the MongoDB log.
+
+    Author:
+        Sebastian Forstner
+    '''
+    @staticmethod
+    def load_messages(timestamp_start: datetime, timestamp_end: datetime) -> list[dict]:
+        '''
+        Loads messages from the MongoDB log.
+
+        Author:
+            Sebastian Forstner
-class DB_logoutput:
-    def load_messages(timestamp_start: datetime, timestamp_end: datetime):
+
+        Args:
+            timestamp_start (datetime): The start timestamp.
+            timestamp_end (datetime): The end timestamp.
+
+        Returns:
+            list[dict]: The loaded messages.
+        '''
         try:
             '''Connects to MongoDB-Container running with Docker'''
             database_client = MongoClient(
diff --git a/src/main.py b/src/main.py
index 9d5f50b..886f9a0 100644
--- a/src/main.py
+++ b/src/main.py
@@ -10,10 +10,10 @@ Benedikt Schwering
 Sebastian Forstner
 '''
-from src.adapters.mongodb import MongoDBAdapter, DB_logoutput
+from src.adapters.mongodb import MongoDBAdapter, MongoDBLogLoader
 import src.services.mrt_simulation as mrt_simulation_service
 from src.adapters.rabbitmq import RabbitMQAdapter
-from src.parsers.dbreverse import ReverseParser
+from src.parsers.reverse import ReverseParser
 import src.services.exabgp as exabgp_service
 from datetime import timedelta, datetime
 from src.parsers.rib import RibParser
@@ -367,28 +367,48 @@ def rib_load(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bo
     '-b',
     type=float,
     default=None,
-    help='Starttime of replay as timestamp',
+    help='Start time of replay as a timestamp.',
 )
 @click.option(
     '--end-timestamp',
     '-e',
     type=float,
     default=None,
-    help='Endtime of replay as timestamp',
+    help='End time of replay as a timestamp.',
 )
 @click.option(
     '--start-time',
     '-r',
     type=str,
-    help='Starttime of replay as time; in format (T is a set character): YYYY-MM-DDThh:mm:ss',
+    help='Start time of replay as a time string in the format YYYY-MM-DDThh:mm:ss (T is a literal separator).',
 )
 @click.option(
     '--end-time',
     '-f',
     type=str,
-    help='Endtime of replay as time; in format (T is a set character): YYYY-MM-DDThh:mm:ss',
+    help='End time of replay as a time string in the format YYYY-MM-DDThh:mm:ss (T is a literal separator).',
 )
 def message_replay(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool, clear_mongodb: bool, playback_speed: int, playback_interval: int, start_timestamp: float, end_timestamp: float, start_time: str, end_time: str):
+    '''
+    Message replay command for replaying BGP messages from the database log.
+
+    Author:
+        Sebastian Forstner
+
+    Args:
+        no_rabbitmq_direct (bool): Disable the direct RabbitMQ queue.
+        rabbitmq_grouped (int): Queue group interval in minutes.
+        no_mongodb_log (bool): Disable logging to MongoDB.
+        no_mongodb_state (bool): Disable state storage to MongoDB.
+        no_mongodb_statistics (bool): Disable statistics storage to MongoDB.
+        clear_mongodb (bool): Clear MongoDB collections.
+        playback_speed (int): Playback speed in multiples of real time.
+        playback_interval (int): Playback interval in minutes.
+        start_timestamp (float): Start time of replay as a timestamp.
+        end_timestamp (float): End time of replay as a timestamp.
+        start_time (str): Start time of replay as a time string in the format YYYY-MM-DDThh:mm:ss (T is a literal separator).
+        end_time (str): End time of replay as a time string in the format YYYY-MM-DDThh:mm:ss (T is a literal separator).
+    '''
     parser = ReverseParser()
 
     if not no_rabbitmq_direct or rabbitmq_grouped:
@@ -405,13 +425,13 @@ def message_replay(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_l
     if start_timestamp and end_timestamp:
         start_time = datetime.fromtimestamp(start_timestamp)
         end_time = datetime.fromtimestamp(end_timestamp)
-        new_messages = DB_logoutput.load_messages(timestamp_start = start_time, timestamp_end = end_time)
+        new_messages = MongoDBLogLoader.load_messages(timestamp_start=start_time, timestamp_end=end_time)
     elif start_time and end_time:
         time_start = datetime.fromisoformat(start_time)
         time_end = datetime.fromisoformat(end_time)
-        new_messages = DB_logoutput.load_messages(timestamp_start = time_start, timestamp_end = time_end)
+        new_messages = MongoDBLogLoader.load_messages(timestamp_start=time_start, timestamp_end=time_end)
     else:
-        new_messages = DB_logoutput.load_messages(timestamp_start = None, timestamp_end = None)
+        new_messages = MongoDBLogLoader.load_messages(timestamp_start=None, timestamp_end=None)
 
     # Copy messages in local list to avoid deleting them if -c is set; new_messages is corsor pointing to db
     all_messages: list[OrderedDict] = []
diff --git a/src/parsers/dbreverse.py b/src/parsers/dbreverse.py
deleted file mode 100644
index a875b96..0000000
--- a/src/parsers/dbreverse.py
+++ /dev/null
@@ -1,124 +0,0 @@
-from src.models.route_update import PathAttributes, RouteUpdate, OriginType, Aggregator, ChangeType, AsPathType, AsPath, NLRI
-from src.parsers.route_update import RouteUpdateParser
-from collections import OrderedDict
-from datetime import datetime
-from mrtparse import Bgp4Mp
-from typing import Optional
-
-class ReverseParser(RouteUpdateParser):
-
-    def _parse_origin(self, orig_value: int) -> OriginType:
-        if orig_value:
-            match orig_value:
-                case 1:
-                    return OriginType.IGP
-                case 2:
-                    return OriginType.EGP
-                case 3:
-                    return OriginType.INCOMPLETE
-                case _:
-                    return None
-        else:
-            return None
-
-    def _parse_as_path(self, as_paths: list[int, list[int]]) -> list[AsPath]:
-        all_paths: Optional[list[AsPath]] = None
-        if as_paths:
-            for path in as_paths:
-                path_type: AsPathType
-                match path[0]:
-                    case 1:
-                        path_type = AsPathType.AS_SET
-                    case 2:
-                        path_type = AsPathType.AS_SEQUENCE
-                    case 3:
-                        path_type = AsPathType.AS_CONFED_SET
-                    case 4:
-                        path_type = AsPathType.AS_CONFED_SEQUENCE
-                    case _:
-                        return None
-                new_path = AsPath(
-                    type = path_type,
-                    value = path[1]
-                )
-                if all_paths:
-                    all_paths.append(new_path)
-                else:
-                    all_paths = [new_path]
-
-        return all_paths
-
-    def _parse_aggregator(self, aggregators: dict) -> Optional[Aggregator]:
-        all_aggregators: Optional[Aggregator] = None
-        if aggregators:
-            #aggregators = dict(aggregators)
-            new_aggregator = Aggregator(
-                router_id = aggregators['router_id'],
-                router_as = aggregators['router_as'],
-            )
-
-            return new_aggregator
-
-    def _parse_extendet_community(self, ext_com: Optional[list[str]]) -> Optional[list[int]]:
-        all_aggregators: Optional[list[int]] = None
-        if ext_com:
-            for com in ext_com:
-                if all_aggregators:
-                    all_aggregators.append(int(com))
-                else:
-                    all_aggregators = [int(com)]
-
-        return all_aggregators
-
-    def _parse_nlri(self, nlri: dict) -> NLRI:
-        new_nlri = NLRI(
-            prefix = nlri['prefix'],
-            length = nlri['length'],
-        )
-        return new_nlri
-
-    def _parse_change_type(self, ch_type: int) -> ChangeType:
-        match ch_type:
-            case 1:
-                return ChangeType.ANNOUNCE
-            case 2:
-                return ChangeType.WITHDRAW
-            case _:
-                return None
-
-    def parse(self, message_data: dict) -> RouteUpdate:
-        route_updates: list[RouteUpdate] = []
-        if message_data['path_attributes']['aggregator']:
-            aggregators = self._parse_aggregator(dict(message_data['path_attributes']['aggregator']))
-        else:
-            aggregators = None
-        new_path_attribute = PathAttributes(
-            origin = self._parse_origin(message_data['path_attributes']['origin']),
-            as_path = self._parse_as_path(message_data['path_attributes']['as_path']),
-            next_hop = message_data['path_attributes']['next_hop'],
-            multi_exit_disc = message_data['path_attributes']['multi_exit_disc'],
-            local_pref = message_data['path_attributes']['local_pref'],
-            atomic_aggregate = message_data['path_attributes']['atomic_aggregate'],
-            aggregator = aggregators,
-            community = message_data['path_attributes']['community'],
-            large_community = message_data['path_attributes']['large_community'],
-            extended_community = self._parse_extendet_community(message_data['path_attributes']['extended_community']),
-            orginator_id = message_data['path_attributes']['orginator_id'],
-            cluster_list = message_data['path_attributes']['cluster_list'],
-        )
-        new_route_update = RouteUpdate(
-            timestamp = message_data['timestamp'],
-            peer_ip = message_data['peer_ip'],
-            local_ip = message_data['local_ip'],
-            peer_as = message_data['peer_as'],
-            local_as = message_data['local_as'],
-            path_attributes = new_path_attribute,
-            change_type = self._parse_change_type(message_data['change_type'][0]),
-            nlri = self._parse_nlri(message_data['nlri']),
-        )
-        route_updates.append(new_route_update)
-        self._send_messages(route_updates)
-        return route_updates
-
-
diff --git a/src/parsers/reverse.py b/src/parsers/reverse.py
new file mode 100644
index 0000000..d479752
--- /dev/null
+++ b/src/parsers/reverse.py
@@ -0,0 +1,151 @@
+# -*- coding: utf-8 -*-
+'''
+ZettaBGP - Advanced Anomaly Detection in Internet Routing
+Copyright (c) 2024 Benedikt Schwering and Sebastian Forstner
+
+This work is licensed under the terms of the MIT license.
+For a copy, see LICENSE in the project root.
+
+Author:
+    Benedikt Schwering
+    Sebastian Forstner
+'''
+from src.models.route_update import PathAttributes, RouteUpdate, OriginType, Aggregator, ChangeType, AsPathType, AsPath, NLRI
+from src.parsers.route_update import RouteUpdateParser
+from typing import Optional
+
+class ReverseParser(RouteUpdateParser):
+    '''
+    This class is responsible for parsing database log messages.
+
+    Author:
+        Benedikt Schwering
+        Sebastian Forstner
+    '''
+    def _parse_origin(self, orig_value: int) -> Optional[OriginType]:
+        if orig_value:
+            match orig_value:
+                case 1:
+                    return OriginType.IGP
+                case 2:
+                    return OriginType.EGP
+                case 3:
+                    return OriginType.INCOMPLETE
+                case _:
+                    return None
+        else:
+            return None
+
+    def _parse_as_path(self, as_paths: list) -> Optional[list[AsPath]]:
+        all_paths: Optional[list[AsPath]] = None
+        if as_paths:
+            for path in as_paths:
+                # Each logged AS path entry is a [type, values] pair.
+                path_type: AsPathType
+                match path[0]:
+                    case 1:
+                        path_type = AsPathType.AS_SET
+                    case 2:
+                        path_type = AsPathType.AS_SEQUENCE
+                    case 3:
+                        path_type = AsPathType.AS_CONFED_SET
+                    case 4:
+                        path_type = AsPathType.AS_CONFED_SEQUENCE
+                    case _:
+                        return None
+                new_path = AsPath(
+                    type=path_type,
+                    value=path[1]
+                )
+                if all_paths:
+                    all_paths.append(new_path)
+                else:
+                    all_paths = [new_path]
+
+        return all_paths
+
+    def _parse_aggregator(self, aggregators: dict) -> Optional[Aggregator]:
+        if aggregators:
+            return Aggregator(
+                router_id=aggregators['router_id'],
+                router_as=aggregators['router_as'],
+            )
+
+        return None
+
+    def _parse_extended_community(self, ext_com: Optional[list[str]]) -> Optional[list[int]]:
+        all_communities: Optional[list[int]] = None
+        if ext_com:
+            for com in ext_com:
+                if all_communities:
+                    all_communities.append(int(com))
+                else:
+                    all_communities = [int(com)]
+
+        return all_communities
+
+    def _parse_nlri(self, nlri: dict) -> NLRI:
+        new_nlri = NLRI(
+            prefix=nlri['prefix'],
+            length=nlri['length'],
+        )
+        return new_nlri
+
+    def _parse_change_type(self, ch_type: int) -> Optional[ChangeType]:
+        match ch_type:
+            case 1:
+                return ChangeType.ANNOUNCE
+            case 2:
+                return ChangeType.WITHDRAW
+            case _:
+                return None
+
+    def parse(self, message_data: dict) -> list[RouteUpdate]:
+        '''
+        Parse a database log message.
+
+        Author:
+            Benedikt Schwering
+            Sebastian Forstner
+
+        Args:
+            message_data (dict): The database log message.
+
+        Returns:
+            list[RouteUpdate]: The parsed RouteUpdate objects.
+        '''
+        route_updates: list[RouteUpdate] = []
+
+        if message_data['path_attributes']['aggregator']:
+            aggregators = self._parse_aggregator(dict(message_data['path_attributes']['aggregator']))
+        else:
+            aggregators = None
+        new_path_attribute = PathAttributes(
+            origin=self._parse_origin(message_data['path_attributes']['origin']),
+            as_path=self._parse_as_path(message_data['path_attributes']['as_path']),
+            next_hop=message_data['path_attributes']['next_hop'],
+            multi_exit_disc=message_data['path_attributes']['multi_exit_disc'],
+            local_pref=message_data['path_attributes']['local_pref'],
+            atomic_aggregate=message_data['path_attributes']['atomic_aggregate'],
+            aggregator=aggregators,
+            community=message_data['path_attributes']['community'],
+            large_community=message_data['path_attributes']['large_community'],
+            extended_community=self._parse_extended_community(message_data['path_attributes']['extended_community']),
+            orginator_id=message_data['path_attributes']['orginator_id'],
+            cluster_list=message_data['path_attributes']['cluster_list'],
+        )
+        new_route_update = RouteUpdate(
+            timestamp=message_data['timestamp'],
+            peer_ip=message_data['peer_ip'],
+            local_ip=message_data['local_ip'],
+            peer_as=message_data['peer_as'],
+            local_as=message_data['local_as'],
+            path_attributes=new_path_attribute,
+            change_type=self._parse_change_type(message_data['change_type'][0]),
+            nlri=self._parse_nlri(message_data['nlri']),
+        )
+        route_updates.append(new_route_update)
+
+        self._send_messages(route_updates)
+        return route_updates
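
Usage sketch: the snippet below shows how the renamed MongoDBLogLoader and ReverseParser are meant to plug together, mirroring the wiring in message_replay. It is a minimal sketch under assumptions: the MongoDB container configured in src/adapters/mongodb.py is reachable, load_messages returns log documents in the schema that parse() reads above, and the time window values are placeholders.

# Minimal replay sketch (assumptions: MongoDB reachable as configured in
# src/adapters/mongodb.py; log documents match the schema read by
# ReverseParser.parse() in this patch; timestamps are placeholders).
from datetime import datetime

from src.adapters.mongodb import MongoDBLogLoader
from src.parsers.reverse import ReverseParser

parser = ReverseParser()

# Replay one hour of logged BGP messages (placeholder window).
start = datetime.fromisoformat('2024-06-01T12:00:00')
end = datetime.fromisoformat('2024-06-01T13:00:00')

# load_messages() returns the matching log documents; parse() converts each
# document back into RouteUpdate objects and forwards them via
# _send_messages() to whatever adapters are registered on the parser.
for message in MongoDBLogLoader.load_messages(timestamp_start=start, timestamp_end=end):
    route_updates = parser.parse(message)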