Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions app/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@


class LogInterceptor(io.TextIOWrapper):
# Maximum logs to buffer between flushes to prevent unbounded memory growth
# if callbacks persistently fail. 10000 entries is ~2-5MB depending on message size.
MAX_PENDING_LOGS = 10000

def __init__(self, stream, *args, **kwargs):
buffer = stream.buffer
encoding = stream.encoding
Expand All @@ -23,6 +27,9 @@ def write(self, data):
entry = {"t": datetime.now().isoformat(), "m": data}
with self._lock:
self._logs_since_flush.append(entry)
# Enforce max size to prevent OOM if callbacks persistently fail
if len(self._logs_since_flush) > self.MAX_PENDING_LOGS:
self._logs_since_flush = self._logs_since_flush[-self.MAX_PENDING_LOGS:]

# Simple handling for cr to overwrite the last output if it isnt a full line
# else logs just get full of progress messages
Expand All @@ -32,10 +39,21 @@ def write(self, data):
super().write(data)

def flush(self):
super().flush()
try:
super().flush()
except OSError as e:
# errno 22 (EINVAL) can occur on Windows with piped/redirected streams
# This is safe to ignore as write() already succeeded
if e.errno != 22:
raise
if not self._logs_since_flush:
return
# Copy to prevent callback mutations from affecting retry on failure
logs_to_send = list(self._logs_since_flush)
for cb in self._flush_callbacks:
cb(self._logs_since_flush)
self._logs_since_flush = []
cb(logs_to_send)
# Only clear after all callbacks succeed - if any raises, logs remain for retry
self._logs_since_flush = []

def on_flush(self, callback):
self._flush_callbacks.append(callback)
Expand Down
321 changes: 321 additions & 0 deletions tests-unit/app_test/test_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
"""Tests for the logger module, specifically LogInterceptor."""

import io
import pytest
from unittest.mock import MagicMock


class TestLogInterceptorFlush:
    """Test that LogInterceptor.flush() handles OSError gracefully."""

    @staticmethod
    def _make_interceptor():
        """Return a LogInterceptor wrapping a minimal in-memory stream.

        LogInterceptor.__init__ reads ``buffer``, ``encoding`` and
        ``line_buffering`` from the wrapped stream, so the stub supplies
        exactly those three attributes, backed by an io.BytesIO buffer.
        """
        from app.logger import LogInterceptor

        class MockStream:
            def __init__(self):
                self._buffer = io.BytesIO()
                self.encoding = 'utf-8'
                self.line_buffering = False

            @property
            def buffer(self):
                return self._buffer

        return LogInterceptor(MockStream())

    def test_flush_handles_errno_22(self):
        """Test that flush() catches OSError with errno 22 and still executes callbacks."""
        # We can't easily mock the parent (TextIOWrapper) flush, so verify the
        # fix by inspecting the source of flush() for the try-except structure.
        import inspect
        from app.logger import LogInterceptor

        source = inspect.getsource(LogInterceptor.flush)

        # Verify the try-except structure is present
        assert 'try:' in source
        assert 'super().flush()' in source
        assert 'except OSError as e:' in source
        assert 'e.errno != 22' in source or 'e.errno == 22' in source

    def test_flush_callback_execution(self):
        """Test that flush callbacks are executed."""
        interceptor = self._make_interceptor()

        # Register a callback that records how many entries it received.
        callback_results = []
        interceptor.on_flush(lambda logs: callback_results.append(len(logs)))

        # Add some logs
        interceptor._logs_since_flush = [
            {"t": "test", "m": "message1"},
            {"t": "test", "m": "message2"},
        ]

        # Flush should execute the callback exactly once.
        interceptor.flush()

        assert len(callback_results) == 1
        assert callback_results[0] == 2  # Two log entries

    def test_flush_clears_logs_after_callback(self):
        """Test that logs are cleared after flush callbacks."""
        interceptor = self._make_interceptor()

        # Add a dummy callback
        interceptor.on_flush(lambda logs: None)

        # Add some logs
        interceptor._logs_since_flush = [{"t": "test", "m": "message"}]

        interceptor.flush()

        # Logs should be cleared
        assert interceptor._logs_since_flush == []

    def test_flush_multiple_callbacks_receive_same_logs(self):
        """Test that all callbacks receive the same logs, not just the first one."""
        interceptor = self._make_interceptor()

        # Register multiple callbacks
        callback1_results = []
        callback2_results = []
        callback3_results = []
        interceptor.on_flush(lambda logs: callback1_results.append(len(logs)))
        interceptor.on_flush(lambda logs: callback2_results.append(len(logs)))
        interceptor.on_flush(lambda logs: callback3_results.append(len(logs)))

        # Add some logs
        interceptor._logs_since_flush = [
            {"t": "test", "m": "message1"},
            {"t": "test", "m": "message2"},
            {"t": "test", "m": "message3"},
        ]

        # Flush should execute all callbacks with the same logs
        interceptor.flush()

        # All callbacks should have received 3 log entries
        assert callback1_results == [3]
        assert callback2_results == [3]
        assert callback3_results == [3]

    def test_flush_preserves_logs_when_callback_raises(self):
        """Test that logs are preserved for retry if a callback raises an exception."""
        interceptor = self._make_interceptor()

        # Register a callback that raises
        def raising_callback(logs):
            raise ValueError("Callback error")

        interceptor.on_flush(raising_callback)

        # Add some logs
        original_logs = [
            {"t": "test", "m": "message1"},
            {"t": "test", "m": "message2"},
        ]
        interceptor._logs_since_flush = original_logs.copy()

        # Flush should propagate the callback's exception...
        with pytest.raises(ValueError, match="Callback error"):
            interceptor.flush()

        # ...and the logs should be preserved for retry on the next flush.
        assert interceptor._logs_since_flush == original_logs

    def test_flush_protects_logs_from_callback_mutation(self):
        """Test that callback mutations don't affect preserved logs on failure."""
        interceptor = self._make_interceptor()

        # First callback mutates the list it is handed, second raises.
        def mutating_callback(logs):
            logs.clear()  # Mutate the passed list

        def raising_callback(logs):
            raise ValueError("Callback error")

        interceptor.on_flush(mutating_callback)
        interceptor.on_flush(raising_callback)

        # Add some logs
        original_logs = [
            {"t": "test", "m": "message1"},
            {"t": "test", "m": "message2"},
        ]
        interceptor._logs_since_flush = original_logs.copy()

        # Flush should raise
        with pytest.raises(ValueError, match="Callback error"):
            interceptor.flush()

        # Logs should be preserved despite mutation by the first callback
        assert interceptor._logs_since_flush == original_logs

    def test_flush_clears_logs_after_all_callbacks_succeed(self):
        """Test that logs are cleared only after all callbacks execute successfully."""
        interceptor = self._make_interceptor()

        # Register multiple callbacks
        callback1_results = []
        callback2_results = []
        interceptor.on_flush(lambda logs: callback1_results.append(len(logs)))
        interceptor.on_flush(lambda logs: callback2_results.append(len(logs)))

        # Add some logs
        interceptor._logs_since_flush = [
            {"t": "test", "m": "message1"},
            {"t": "test", "m": "message2"},
        ]

        # Flush should succeed
        interceptor.flush()

        # All callbacks should have executed
        assert callback1_results == [2]
        assert callback2_results == [2]

        # Logs should be cleared after success
        assert interceptor._logs_since_flush == []


class TestLogInterceptorWrite:
    """Test that LogInterceptor.write() works correctly."""

    @staticmethod
    def _make_interceptor():
        """Return a LogInterceptor wrapping a minimal in-memory stream.

        LogInterceptor.__init__ reads ``buffer``, ``encoding`` and
        ``line_buffering`` from the wrapped stream, so the stub supplies
        exactly those, backed by io.BytesIO. Also initializes the
        module-level ``app.logger.logs`` deque that write() appends to.
        """
        import app.logger
        from app.logger import LogInterceptor
        from collections import deque

        class MockStream:
            def __init__(self):
                self._buffer = io.BytesIO()
                self.encoding = 'utf-8'
                self.line_buffering = False

            @property
            def buffer(self):
                return self._buffer

        # Initialize the global logs so write() has somewhere to record.
        app.logger.logs = deque(maxlen=100)
        return LogInterceptor(MockStream())

    def test_write_adds_to_logs(self):
        """Test that write() adds entries to the log buffer."""
        interceptor = self._make_interceptor()

        # Write a message
        interceptor.write("test message")

        # Check that it was queued in _logs_since_flush for the next flush.
        assert len(interceptor._logs_since_flush) == 1
        assert interceptor._logs_since_flush[0]["m"] == "test message"

    def test_write_enforces_max_pending_logs(self):
        """Test that write() enforces MAX_PENDING_LOGS to prevent OOM."""
        from app.logger import LogInterceptor

        interceptor = self._make_interceptor()

        # Fill the pending buffer exactly to the limit.
        interceptor._logs_since_flush = [
            {"t": "test", "m": f"old_message_{i}"}
            for i in range(LogInterceptor.MAX_PENDING_LOGS)
        ]

        # Write one more message - should trigger trimming
        interceptor.write("new_message")

        # Should still be at MAX_PENDING_LOGS, oldest dropped
        assert len(interceptor._logs_since_flush) == LogInterceptor.MAX_PENDING_LOGS
        # The new message should be at the end
        assert interceptor._logs_since_flush[-1]["m"] == "new_message"
        # The oldest message should have been dropped (old_message_0)
        assert interceptor._logs_since_flush[0]["m"] == "old_message_1"