Reformat code using Black
lamr02n committed Oct 2, 2024
1 parent f35f27d commit 3d6acd5
Showing 13 changed files with 957 additions and 543 deletions.
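For context, a commit like this is typically produced by running Black over the source tree. The sketch below is illustrative only and not part of the commit: it assumes black is installed (pip install black) and uses Black's Python API with its default mode (88-character lines, double-quote normalization). The Black version used for this commit is not stated, and output can differ slightly between versions.

# Illustrative sketch (not from this repository): reformat every
# Python file under src/ with Black's default mode.
# Assumes `black` is installed; version-dependent output is possible.
from pathlib import Path

import black

mode = black.Mode()  # Black defaults: 88-char line length, double quotes

for path in Path("src").rglob("*.py"):
    source = path.read_text()
    try:
        formatted = black.format_str(source, mode=mode)
    except black.InvalidInput:
        continue  # skip files that do not parse as Python
    if formatted != source:
        path.write_text(formatted)
        print(f"reformatted {path}")

In practice the same result is usually obtained by simply running "black src/" from the repository root.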
src/base/kafka_handler.py (2 changes: 1 addition & 1 deletion)

@@ -11,14 +11,14 @@
 import sys
 import time

+import marshmallow_dataclass
 from confluent_kafka import (
     Consumer,
     KafkaError,
     KafkaException,
     Producer,
     TopicPartition,
 )
-import marshmallow_dataclass

 sys.path.append(os.getcwd())
 from src.base import Batch
src/inspector/inspector.py (8 changes: 3 additions & 5 deletions)

@@ -1,12 +1,10 @@
-from datetime import datetime
-from enum import Enum, unique
+import importlib
 import json
 import logging
 import os
 import sys
-import importlib
-import numpy as np

+from datetime import datetime
+from enum import Enum, unique
+
+import numpy as np
 from streamad.util import StreamGenerator, CustomDS
src/logcollector/collector.py (24 changes: 13 additions & 11 deletions)

@@ -42,9 +42,7 @@ def __init__(self):
             f"Calling CollectorKafkaBatchSender(transactional_id='collector')..."
         )
         self.batch_handler = CollectorKafkaBatchSender()
-        logger.debug(
-            f"Calling LoglineHandler()..."
-        )
+        logger.debug(f"Calling LoglineHandler()...")
         self.logline_handler = LoglineHandler()
         logger.debug("Initialized LogCollector.")

@@ -68,7 +66,9 @@ def fetch_logline(self) -> None:
         )
         logger.debug("Connected to LogServer. Retrieving data...")

-        data = self.client_socket.recv(1024)  # loglines are at most ~150 bytes long
+        data = self.client_socket.recv(
+            1024
+        )  # loglines are at most ~150 bytes long

         if not data:
             logger.debug("No data available on LogServer.")

@@ -90,11 +90,11 @@ def add_logline_to_batch(self) -> None:
         """
         logger.debug("Adding logline to batch...")
         if not self.logline:
-            raise ValueError(
-                "Failed to add logline to batch: No logline."
-            )
+            raise ValueError("Failed to add logline to batch: No logline.")

-        log_data = self.logline_handler.validate_logline_and_get_fields_as_json(self.logline)
+        log_data = self.logline_handler.validate_logline_and_get_fields_as_json(
+            self.logline
+        )

         logger.debug("Calling KafkaBatchSender to add message...")
         subnet_id = f"{utils.get_first_part_of_ipv4_address(ipaddress.ip_address(log_data.get('client_ip')), SUBNET_BITS)}_{SUBNET_BITS}"

@@ -131,9 +131,11 @@ def main(one_iteration: bool = False) -> None:
     """
     logger.info("Starting LogCollector...")
     collector = LogCollector()
-    logger.info("LogCollector started.\n"
-                " ⤷ Fetching loglines from LogServer...\n"
-                " ⤷ Data will be sent when the respective batch is full or the global timer runs out.")
+    logger.info(
+        "LogCollector started.\n"
+        " ⤷ Fetching loglines from LogServer...\n"
+        " ⤷ Data will be sent when the respective batch is full or the global timer runs out."
+    )

     iterations = 0
src/mock/generator.py (1 change: 0 additions & 1 deletion)

@@ -5,7 +5,6 @@
 import time

 sys.path.append(os.getcwd())
-from src.base import utils
 from src.base.log_config import setup_logging
 from src.mock.log_generator import generate_dns_log_line
src/prefilter/prefilter.py (36 changes: 24 additions & 12 deletions)

@@ -7,7 +7,11 @@
 sys.path.append(os.getcwd())
 from src.base.logline_handler import LoglineHandler
 from src.base.utils import setup_config
-from src.base.kafka_handler import KafkaConsumeHandler, KafkaMessageFetchException, KafkaProduceHandler
+from src.base.kafka_handler import (
+    KafkaConsumeHandler,
+    KafkaMessageFetchException,
+    KafkaProduceHandler,
+)
 from src.base.log_config import setup_logging

 setup_logging()

@@ -33,7 +37,7 @@ def __init__(self):
         logger.debug(f"Calling LoglineHandler()...")
         self.logline_handler = LoglineHandler()
         logger.debug(f"Calling KafkaProduceHandler(transactional_id='prefilter')...")
-        self.kafka_produce_handler = KafkaProduceHandler(transactional_id='prefilter')
+        self.kafka_produce_handler = KafkaProduceHandler(transactional_id="prefilter")
         logger.debug(f"Calling KafkaConsumeHandler(topic='Prefilter')...")
         self.kafka_consume_handler = KafkaConsumeHandler(topic="Prefilter")
         logger.debug("Initialized Prefilter.")

@@ -55,12 +59,16 @@ def get_and_fill_data(self):
         self.unfiltered_data = data.get("data")

         if not self.unfiltered_data:
-            logger.info(f"Received message:\n"
-                        f" ⤷ Empty data field: No unfiltered data available. subnet_id: '{self.subnet_id}'")
+            logger.info(
+                f"Received message:\n"
+                f" ⤷ Empty data field: No unfiltered data available. subnet_id: '{self.subnet_id}'"
+            )
         else:
-            logger.info(f"Received message:\n"
-                        f" ⤷ Contains data field of {len(self.unfiltered_data)} message(s) with "
-                        f"subnet_id: '{self.subnet_id}'.")
+            logger.info(
+                f"Received message:\n"
+                f" ⤷ Contains data field of {len(self.unfiltered_data)} message(s) with "
+                f"subnet_id: '{self.subnet_id}'."
+            )

         logger.debug("Received consumer message as JSON data.")
         logger.debug(f"{data=}")

@@ -102,11 +110,15 @@ def send_filtered_data(self):
             data=json.dumps(data_to_send),
             key=self.subnet_id,
         )
-        logger.debug(f"Sent filtered data with time frame from {self.begin_timestamp} to {self.end_timestamp} and data"
-                     f" ({len(self.filtered_data)} message(s)).")
-        logger.info(f"Filtered data was successfully sent:\n"
-                    f" ⤷ Contains data field of {len(self.filtered_data)} message(s). Originally: "
-                    f"{len(self.unfiltered_data)} message(s). Belongs to subnet_id '{self.subnet_id}'.")
+        logger.debug(
+            f"Sent filtered data with time frame from {self.begin_timestamp} to {self.end_timestamp} and data"
+            f" ({len(self.filtered_data)} message(s))."
+        )
+        logger.info(
+            f"Filtered data was successfully sent:\n"
+            f" ⤷ Contains data field of {len(self.filtered_data)} message(s). Originally: "
+            f"{len(self.unfiltered_data)} message(s). Belongs to subnet_id '{self.subnet_id}'."
+        )

     def clear_data(self):
         """