diff --git a/src/schematic/event_buffer.py b/src/schematic/event_buffer.py index 9e62c37..0c60c0d 100644 --- a/src/schematic/event_buffer.py +++ b/src/schematic/event_buffer.py @@ -6,7 +6,7 @@ from .events.client import AsyncEventsClient, EventsClient from .types import CreateEventRequestBody -DEFAULT_BUFFER_MAX_SIZE = 10 * 1024 # 10KB +DEFAULT_MAX_EVENTS = 100 # Default maximum number of events DEFAULT_EVENT_BUFFER_PERIOD = 5 # 5 seconds @@ -16,13 +16,13 @@ def __init__( events_api: EventsClient, logger: logging.Logger, period: Optional[int] = None, + max_events: int = DEFAULT_MAX_EVENTS, ): - self.current_size = 0 self.events: List[CreateEventRequestBody] = [] self.events_api = events_api self.interval = period or DEFAULT_EVENT_BUFFER_PERIOD self.logger = logger - self.max_size = DEFAULT_BUFFER_MAX_SIZE + self.max_events = max_events self.flush_lock = threading.Lock() self.push_lock = threading.Lock() self.shutdown = threading.Event() @@ -45,7 +45,6 @@ def _flush(self): self.logger.error(e) self.events.clear() - self.current_size = 0 def _periodic_flush(self): while not self.shutdown.is_set(): @@ -58,12 +57,10 @@ def push(self, event: CreateEventRequestBody): return with self.push_lock: - event_size = event.__sizeof__() - if self.current_size + event_size > self.max_size: + if len(self.events) >= self.max_events: self._flush() self.events.append(event) - self.current_size += event_size def stop(self): try: @@ -80,13 +77,13 @@ def __init__( events_api: AsyncEventsClient, logger: logging.Logger, period: Optional[int] = None, + max_events: int = DEFAULT_MAX_EVENTS, ): - self.current_size = 0 self.events: List[CreateEventRequestBody] = [] self.events_api = events_api self.interval = period or DEFAULT_EVENT_BUFFER_PERIOD self.logger = logger - self.max_size = DEFAULT_BUFFER_MAX_SIZE + self.max_events = max_events self.shutdown_event = asyncio.Event() self.stopped = False self.flush_lock = asyncio.Lock() @@ -107,7 +104,6 @@ async def _flush(self): self.logger.error(e) self.events.clear() - self.current_size = 0 async def _periodic_flush(self): while not self.shutdown_event.is_set(): @@ -125,12 +121,10 @@ async def push(self, event: CreateEventRequestBody): return async with self.push_lock: - event_size = event.__sizeof__() - if self.current_size + event_size > self.max_size: + if len(self.events) >= self.max_events: await self._flush() self.events.append(event) - self.current_size += event_size async def stop(self): try: diff --git a/tests/custom/test_event_buffer.py b/tests/custom/test_event_buffer.py index eab265a..9449095 100644 --- a/tests/custom/test_event_buffer.py +++ b/tests/custom/test_event_buffer.py @@ -12,37 +12,38 @@ class TestEventBuffer(unittest.TestCase): def setUp(self): self.mock_api = MagicMock() self.mock_logger = MagicMock() - self.event_buffer = EventBuffer(events_api=self.mock_api, logger=self.mock_logger, period=1) + self.event_buffer = EventBuffer( + events_api=self.mock_api, logger=self.mock_logger, period=1, max_events=5 + ) def tearDown(self): self.event_buffer.stop() def test_push_event(self): event = MagicMock(spec=CreateEventRequestBody) - event.__sizeof__ = MagicMock(return_value=100) self.event_buffer.push(event) self.assertEqual(len(self.event_buffer.events), 1) - self.assertEqual(self.event_buffer.current_size, 100) - def test_push_event_exceeding_max_size(self): - self.event_buffer.max_size = 50 + def test_push_event_exceeding_max_events(self): event = MagicMock(spec=CreateEventRequestBody) - event.__sizeof__ = MagicMock(return_value=60) with patch.object(self.event_buffer, "_flush") as mock_flush: + for _ in range(5): + self.event_buffer.push(event) + self.assertEqual(len(self.event_buffer.events), 5) + + # Pushing one more event should trigger a flush self.event_buffer.push(event) mock_flush.assert_called_once() def test_flush(self): event = MagicMock(spec=CreateEventRequestBody) - event.__sizeof__ = MagicMock(return_value=100) self.event_buffer.events = [event] self.event_buffer._flush() self.mock_api.create_event_batch.assert_called_once_with(events=[event]) self.assertEqual(len(self.event_buffer.events), 0) - self.assertEqual(self.event_buffer.current_size, 0) def test_stop(self): with patch.object(self.event_buffer.flush_thread, "join"): @@ -58,37 +59,36 @@ async def setup_and_teardown(self): self.mock_api = MagicMock() self.mock_logger = MagicMock() self.async_event_buffer = AsyncEventBuffer( - events_api=self.mock_api, logger=self.mock_logger, period=1 + events_api=self.mock_api, logger=self.mock_logger, period=1, max_events=5 ) yield await self.async_event_buffer.stop() async def test_push_event(self): event = MagicMock(spec=CreateEventRequestBody) - event.__sizeof__ = MagicMock(return_value=100) await self.async_event_buffer.push(event) assert len(self.async_event_buffer.events) == 1 - assert self.async_event_buffer.current_size == 100 - async def test_push_event_exceeding_max_size(self): - self.async_event_buffer.max_size = 50 + async def test_push_event_exceeding_max_events(self): event = MagicMock(spec=CreateEventRequestBody) - event.__sizeof__ = MagicMock(return_value=60) with patch.object(self.async_event_buffer, "_flush") as mock_flush: + for _ in range(5): + await self.async_event_buffer.push(event) + assert len(self.async_event_buffer.events) == 5 + + # Pushing one more event should trigger a flush await self.async_event_buffer.push(event) mock_flush.assert_called_once() async def test_flush(self): event = MagicMock(spec=CreateEventRequestBody) - event.__sizeof__ = MagicMock(return_value=100) self.async_event_buffer.events = [event] await self.async_event_buffer._flush() self.mock_api.create_event_batch.assert_called_once_with(events=[event]) assert len(self.async_event_buffer.events) == 0 - assert self.async_event_buffer.current_size == 0 async def test_stop(self): with patch.object(self.async_event_buffer.flush_task, "cancel"):