Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix server receiving with separator #49

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/build_test_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ "macos-12" ]
os: [ "macos-14" ]
python-version: [ "3.11",
"3.10",
]
Expand All @@ -32,6 +32,7 @@ jobs:

- name: Install requirements
run: |
brew install libomp
python -m pip install --upgrade pip
python -m pip install pyyaml
python -m pip install -r requirements/requirements.dev.txt -r requirements/requirements.detector.txt -r requirements/requirements.logcollector.txt -r requirements/requirements.prefilter.txt -r requirements/requirements.inspector.txt -r requirements/requirements.logserver.txt
Expand Down
1 change: 1 addition & 0 deletions docker/docker-compose.kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:
networks:
heidgaf:
ipv4_address: 172.27.0.2
restart: "unless-stopped"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
Expand Down
29 changes: 20 additions & 9 deletions src/logserver/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@

class LogServer:
"""
Server for receiving, storing and sending single log lines. Opens a port for receiving messages, listens for log
lines via Kafka and reads newly added lines from an input file. To retrieve a log line from the server,
other modules can connect to its outgoing/sending port. The server will then send its oldest log line as a response.
Server for receiving, storing and sending single log lines. Opens a port for receiving messages, listens for
messages via Kafka and reads newly added lines from an input file. To retrieve a message from the server,
other modules can connect to its outgoing/sending port. The server will then send its oldest message as a response.
"""

def __init__(self) -> None:
Expand Down Expand Up @@ -217,18 +217,29 @@ async def send_logline(writer, logline) -> None:

async def receive_logline(self, reader) -> None:
"""
Receives a log line encoded as UTF-8 from the connected component and adds it to the data queue.
Receives one or multiple log lines encoded as UTF-8 separated by and ending with separator '\n' from the
connected component and adds it or them to the data queue. Message must end with separator symbol.

Args:
reader: Responsible for reading incoming data
"""
while True:
data = await reader.read(1024)
if not data:
try:
data = await reader.readuntil(separator=b"\n")
if not data:
break
received_message = data.decode().strip()
logger.info(f"Received message:\n ⤷ {received_message}")
self.data_queue.put(received_message)
except asyncio.exceptions.IncompleteReadError as e:
logger.warning(f"Ignoring message: No separator symbol found: {e}")
break
except asyncio.LimitOverrunError:
logger.error(f"Message size exceeded, separator symbol not found")
break
received_message = data.decode()
logger.info(f"Received message:\n ⤷ {received_message}")
self.data_queue.put(received_message)
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise

def get_next_logline(self) -> str | None:
"""
Expand Down
2 changes: 0 additions & 2 deletions src/mock/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
import time

sys.path.append(os.getcwd())
from src.base.log_config import setup_logging
from src.mock.log_generator import generate_dns_log_line
from src.base.log_config import get_logger

logger = get_logger()


if __name__ == "__main__":
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
client_socket.connect((str("127.0.0.1"), 9998))
Expand Down
43 changes: 42 additions & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,9 @@ async def test_receive_logline(self, mock_logger):
server_instance = LogServer()
server_instance.data_queue = data_queue

reader.read = AsyncMock(side_effect=[b"Test message 1", b"Test message 2", b""])
reader.readuntil = AsyncMock(
side_effect=[b"Test message 1\n", b"Test message 2\n", b""]
)

receive_task = asyncio.create_task(server_instance.receive_logline(reader))
await receive_task
Expand All @@ -362,6 +364,45 @@ async def test_receive_logline(self, mock_logger):

self.assertEqual(data_queue.put.call_count, 2)

@patch("src.logserver.server.logger")
async def test_receive_without_separator(self, mock_logger):
reader = AsyncMock()
data_queue = MagicMock()
server_instance = LogServer()
server_instance.data_queue = data_queue

reader.readuntil = AsyncMock(
side_effect=asyncio.exceptions.IncompleteReadError(b"", 100)
)

# noinspection PyAsyncCall
asyncio.create_task(server_instance.receive_logline(reader))

@patch("src.logserver.server.logger")
async def test_receive_too_long(self, mock_logger):
reader = AsyncMock()
data_queue = MagicMock()
server_instance = LogServer()
server_instance.data_queue = data_queue

reader.readuntil = AsyncMock(side_effect=asyncio.LimitOverrunError("", 1))

# noinspection PyAsyncCall
asyncio.create_task(server_instance.receive_logline(reader))

@patch("src.logserver.server.logger")
async def test_receive_raise_other_exception(self, mock_logger):
reader = AsyncMock()
data_queue = MagicMock()
server_instance = LogServer()
server_instance.data_queue = data_queue

reader.readuntil = AsyncMock(side_effect=ValueError("Something went wrong"))

with self.assertRaises(ValueError):
task = asyncio.create_task(server_instance.receive_logline(reader))
await task


class TestGetNextLogline(unittest.TestCase):
def test_valid(self):
Expand Down
Loading