Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event buffer size based on number of events #14

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions src/schematic/event_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .events.client import AsyncEventsClient, EventsClient
from .types import CreateEventRequestBody

DEFAULT_BUFFER_MAX_SIZE = 10 * 1024 # 10KB
DEFAULT_MAX_EVENTS = 100 # Default maximum number of events
DEFAULT_EVENT_BUFFER_PERIOD = 5 # 5 seconds


Expand All @@ -16,13 +16,13 @@ def __init__(
events_api: EventsClient,
logger: logging.Logger,
period: Optional[int] = None,
max_events: int = DEFAULT_MAX_EVENTS,
):
self.current_size = 0
self.events: List[CreateEventRequestBody] = []
self.events_api = events_api
self.interval = period or DEFAULT_EVENT_BUFFER_PERIOD
self.logger = logger
self.max_size = DEFAULT_BUFFER_MAX_SIZE
self.max_events = max_events
self.flush_lock = threading.Lock()
self.push_lock = threading.Lock()
self.shutdown = threading.Event()
Expand All @@ -45,7 +45,6 @@ def _flush(self):
self.logger.error(e)

self.events.clear()
self.current_size = 0

def _periodic_flush(self):
while not self.shutdown.is_set():
Expand All @@ -58,12 +57,10 @@ def push(self, event: CreateEventRequestBody):
return

with self.push_lock:
event_size = event.__sizeof__()
if self.current_size + event_size > self.max_size:
if len(self.events) >= self.max_events:
self._flush()

self.events.append(event)
self.current_size += event_size

def stop(self):
try:
Expand All @@ -80,13 +77,13 @@ def __init__(
events_api: AsyncEventsClient,
logger: logging.Logger,
period: Optional[int] = None,
max_events: int = DEFAULT_MAX_EVENTS,
):
self.current_size = 0
self.events: List[CreateEventRequestBody] = []
self.events_api = events_api
self.interval = period or DEFAULT_EVENT_BUFFER_PERIOD
self.logger = logger
self.max_size = DEFAULT_BUFFER_MAX_SIZE
self.max_events = max_events
self.shutdown_event = asyncio.Event()
self.stopped = False
self.flush_lock = asyncio.Lock()
Expand All @@ -107,7 +104,6 @@ async def _flush(self):
self.logger.error(e)

self.events.clear()
self.current_size = 0

async def _periodic_flush(self):
while not self.shutdown_event.is_set():
Expand All @@ -125,12 +121,10 @@ async def push(self, event: CreateEventRequestBody):
return

async with self.push_lock:
event_size = event.__sizeof__()
if self.current_size + event_size > self.max_size:
if len(self.events) >= self.max_events:
await self._flush()

self.events.append(event)
self.current_size += event_size

async def stop(self):
try:
Expand Down
32 changes: 16 additions & 16 deletions tests/custom/test_event_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,38 @@ class TestEventBuffer(unittest.TestCase):
def setUp(self):
self.mock_api = MagicMock()
self.mock_logger = MagicMock()
self.event_buffer = EventBuffer(events_api=self.mock_api, logger=self.mock_logger, period=1)
self.event_buffer = EventBuffer(
events_api=self.mock_api, logger=self.mock_logger, period=1, max_events=5
)

def tearDown(self):
self.event_buffer.stop()

def test_push_event(self):
event = MagicMock(spec=CreateEventRequestBody)
event.__sizeof__ = MagicMock(return_value=100)

self.event_buffer.push(event)
self.assertEqual(len(self.event_buffer.events), 1)
self.assertEqual(self.event_buffer.current_size, 100)

def test_push_event_exceeding_max_size(self):
self.event_buffer.max_size = 50
def test_push_event_exceeding_max_events(self):
event = MagicMock(spec=CreateEventRequestBody)
event.__sizeof__ = MagicMock(return_value=60)

with patch.object(self.event_buffer, "_flush") as mock_flush:
for _ in range(5):
self.event_buffer.push(event)
self.assertEqual(len(self.event_buffer.events), 5)

# Pushing one more event should trigger a flush
self.event_buffer.push(event)
mock_flush.assert_called_once()

def test_flush(self):
event = MagicMock(spec=CreateEventRequestBody)
event.__sizeof__ = MagicMock(return_value=100)
self.event_buffer.events = [event]

self.event_buffer._flush()
self.mock_api.create_event_batch.assert_called_once_with(events=[event])
self.assertEqual(len(self.event_buffer.events), 0)
self.assertEqual(self.event_buffer.current_size, 0)

def test_stop(self):
with patch.object(self.event_buffer.flush_thread, "join"):
Expand All @@ -58,37 +59,36 @@ async def setup_and_teardown(self):
self.mock_api = MagicMock()
self.mock_logger = MagicMock()
self.async_event_buffer = AsyncEventBuffer(
events_api=self.mock_api, logger=self.mock_logger, period=1
events_api=self.mock_api, logger=self.mock_logger, period=1, max_events=5
)
yield
await self.async_event_buffer.stop()

async def test_push_event(self):
event = MagicMock(spec=CreateEventRequestBody)
event.__sizeof__ = MagicMock(return_value=100)

await self.async_event_buffer.push(event)
assert len(self.async_event_buffer.events) == 1
assert self.async_event_buffer.current_size == 100

async def test_push_event_exceeding_max_size(self):
self.async_event_buffer.max_size = 50
async def test_push_event_exceeding_max_events(self):
event = MagicMock(spec=CreateEventRequestBody)
event.__sizeof__ = MagicMock(return_value=60)

with patch.object(self.async_event_buffer, "_flush") as mock_flush:
for _ in range(5):
await self.async_event_buffer.push(event)
assert len(self.async_event_buffer.events) == 5

# Pushing one more event should trigger a flush
await self.async_event_buffer.push(event)
mock_flush.assert_called_once()

async def test_flush(self):
event = MagicMock(spec=CreateEventRequestBody)
event.__sizeof__ = MagicMock(return_value=100)
self.async_event_buffer.events = [event]

await self.async_event_buffer._flush()
self.mock_api.create_event_batch.assert_called_once_with(events=[event])
assert len(self.async_event_buffer.events) == 0
assert self.async_event_buffer.current_size == 0

async def test_stop(self):
with patch.object(self.async_event_buffer.flush_task, "cancel"):
Expand Down
Loading