diff --git a/.github/workflows/build_test_macos.yml b/.github/workflows/build_test_macos.yml index 2bc3b29..e91090c 100644 --- a/.github/workflows/build_test_macos.yml +++ b/.github/workflows/build_test_macos.yml @@ -13,7 +13,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ "macos-12" ] + os: [ "macos-14" ] python-version: [ "3.11", "3.10", ] @@ -32,6 +32,7 @@ jobs: - name: Install requirements run: | + brew install libomp python -m pip install --upgrade pip python -m pip install pyyaml python -m pip install -r requirements/requirements.dev.txt -r requirements/requirements.detector.txt -r requirements/requirements.logcollector.txt -r requirements/requirements.prefilter.txt -r requirements/requirements.inspector.txt -r requirements/requirements.logserver.txt diff --git a/docker/docker-compose.kafka.yml b/docker/docker-compose.kafka.yml index b76a621..e9b8c0c 100644 --- a/docker/docker-compose.kafka.yml +++ b/docker/docker-compose.kafka.yml @@ -5,6 +5,7 @@ services: networks: heidgaf: ipv4_address: 172.27.0.2 + restart: "unless-stopped" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 diff --git a/src/logserver/server.py b/src/logserver/server.py index ac2506b..50028a7 100644 --- a/src/logserver/server.py +++ b/src/logserver/server.py @@ -26,9 +26,9 @@ class LogServer: """ - Server for receiving, storing and sending single log lines. Opens a port for receiving messages, listens for log - lines via Kafka and reads newly added lines from an input file. To retrieve a log line from the server, - other modules can connect to its outgoing/sending port. The server will then send its oldest log line as a response. + Server for receiving, storing and sending single log lines. Opens a port for receiving messages, listens for + messages via Kafka and reads newly added lines from an input file. To retrieve a message from the server, + other modules can connect to its outgoing/sending port. The server will then send its oldest message as a response. """ def __init__(self) -> None: @@ -217,18 +217,29 @@ async def send_logline(writer, logline) -> None: async def receive_logline(self, reader) -> None: """ - Receives a log line encoded as UTF-8 from the connected component and adds it to the data queue. + Receives one or multiple log lines encoded as UTF-8 separated by and ending with separator '\n' from the + connected component and adds it or them to the data queue. Message must end with separator symbol. Args: reader: Responsible for reading incoming data """ while True: - data = await reader.read(1024) - if not data: + try: + data = await reader.readuntil(separator=b"\n") + if not data: + break + received_message = data.decode().strip() + logger.info(f"Received message:\n ⤷ {received_message}") + self.data_queue.put(received_message) + except asyncio.exceptions.IncompleteReadError as e: + logger.warning(f"Ignoring message: No separator symbol found: {e}") + break + except asyncio.LimitOverrunError: + logger.error(f"Message size exceeded, separator symbol not found") break - received_message = data.decode() - logger.info(f"Received message:\n ⤷ {received_message}") - self.data_queue.put(received_message) + except Exception as e: + logger.error(f"Unexpected error: {e}") + raise def get_next_logline(self) -> str | None: """ diff --git a/src/mock/generator.py b/src/mock/generator.py index 6e93f77..97f2f37 100644 --- a/src/mock/generator.py +++ b/src/mock/generator.py @@ -4,13 +4,11 @@ import time sys.path.append(os.getcwd()) -from src.base.log_config import setup_logging from src.mock.log_generator import generate_dns_log_line from src.base.log_config import get_logger logger = get_logger() - if __name__ == "__main__": with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket: client_socket.connect((str("127.0.0.1"), 9998)) diff --git a/tests/test_server.py b/tests/test_server.py index 4e6c626..4ff3098 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -352,7 +352,9 @@ async def test_receive_logline(self, mock_logger): server_instance = LogServer() server_instance.data_queue = data_queue - reader.read = AsyncMock(side_effect=[b"Test message 1", b"Test message 2", b""]) + reader.readuntil = AsyncMock( + side_effect=[b"Test message 1\n", b"Test message 2\n", b""] + ) receive_task = asyncio.create_task(server_instance.receive_logline(reader)) await receive_task @@ -362,6 +364,45 @@ async def test_receive_logline(self, mock_logger): self.assertEqual(data_queue.put.call_count, 2) + @patch("src.logserver.server.logger") + async def test_receive_without_separator(self, mock_logger): + reader = AsyncMock() + data_queue = MagicMock() + server_instance = LogServer() + server_instance.data_queue = data_queue + + reader.readuntil = AsyncMock( + side_effect=asyncio.exceptions.IncompleteReadError(b"", 100) + ) + + # noinspection PyAsyncCall + asyncio.create_task(server_instance.receive_logline(reader)) + + @patch("src.logserver.server.logger") + async def test_receive_too_long(self, mock_logger): + reader = AsyncMock() + data_queue = MagicMock() + server_instance = LogServer() + server_instance.data_queue = data_queue + + reader.readuntil = AsyncMock(side_effect=asyncio.LimitOverrunError("", 1)) + + # noinspection PyAsyncCall + asyncio.create_task(server_instance.receive_logline(reader)) + + @patch("src.logserver.server.logger") + async def test_receive_raise_other_exception(self, mock_logger): + reader = AsyncMock() + data_queue = MagicMock() + server_instance = LogServer() + server_instance.data_queue = data_queue + + reader.readuntil = AsyncMock(side_effect=ValueError("Something went wrong")) + + with self.assertRaises(ValueError): + task = asyncio.create_task(server_instance.receive_logline(reader)) + await task + class TestGetNextLogline(unittest.TestCase): def test_valid(self):