Skip to content

Commit

Permalink
Run thread to flush buffered events every N.
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillzhosul committed Dec 5, 2022
1 parent 03caded commit 69fe52b
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions gatey_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"""

import atexit
from threading import Thread
from time import sleep
from typing import Callable, Union, Dict, List, Optional, Any

# Utils.
Expand Down Expand Up @@ -40,6 +42,7 @@ class _Client:

# Events data queue that waiting for being sent to server.
_events_buffer: List[Dict[str, Any]] = []
_events_buffer_flush_thread: Optional[Thread] = None

# Settings.

Expand All @@ -53,6 +56,7 @@ class _Client:
# Buffering settings for bulk sending.
buffer_events_for_bulk_sending = None
buffer_events_max_capacity = 0
buffer_events_flush_every = 10.0

# Include event data.
include_runtime_info = True
Expand All @@ -70,6 +74,7 @@ def __init__(
global_handler_skip_internal_exceptions: bool = True,
buffer_events_for_bulk_sending: bool = False,
buffer_events_max_capacity: int = 3,
buffer_events_flush_every: float = 10.0,
handle_global_exceptions: bool = False,
include_runtime_info: bool = True,
include_platform_info: bool = True,
Expand Down Expand Up @@ -119,6 +124,7 @@ def __init__(
self.exceptions_capture_code_context = exceptions_capture_code_context
self.buffer_events_for_bulk_sending = buffer_events_for_bulk_sending
self.buffer_events_max_capacity = buffer_events_max_capacity
self.buffer_events_flush_every = buffer_events_flush_every
self.include_runtime_info = include_runtime_info
self.include_platform_info = include_platform_info
self.include_sdk_info = include_sdk_info
Expand All @@ -142,6 +148,10 @@ def __init__(
skip_internal_exceptions=global_handler_skip_internal_exceptions,
)

# Start flush thread for bulk sending after timeout (every N).
if self.buffer_events_for_bulk_sending:
self._ensure_running_buffer_flush_thread()

def catch(
self,
*,
Expand Down Expand Up @@ -299,6 +309,41 @@ def _build_default_tags_context(
default_tags.update(foreign_tags)
return default_tags

def _ensure_running_buffer_flush_thread(self) -> Thread:
"""
Runs buffer flush thread is it not running, and returns thread.
Flush thread is used to send events buffer after some time, not causing to wait core application
for terminate or new events that will trigger bulk sending (buffer flush).
"""

if (
isinstance(self._events_buffer_flush_thread, Thread)
and self._events_buffer_flush_thread.is_alive()
):
# If thread is currently alive - there is no need to create new thread by removing old reference.
return self._events_buffer_flush_thread

# Thread.
self._events_buffer_flush_thread = Thread(
target=self._buffer_flush_thread_target,
args=(),
name="sentry_sdk.events_buffer.flusher",
)

# Mark thread as daemon (which is required for graceful main thread termination) and start.
self._events_buffer_flush_thread.daemon = True
self._events_buffer_flush_thread.start()

return self._events_buffer_flush_thread

def _buffer_flush_thread_target(self) -> None:
"""
Thread target for events buffer flusher.
"""
while True:
sleep(self.buffer_events_flush_every)
self.bulk_send_buffered_events()

def _on_catch_exception_hook(self, exception):
"""
Hook that will be called when catched exception.
Expand Down

0 comments on commit 69fe52b

Please sign in to comment.