From 69fe52be2971766a33cf9989b78ca283efa289d8 Mon Sep 17 00:00:00 2001 From: Kirill Zhosul <79853674+kirillzhosul@users.noreply.github.com> Date: Mon, 5 Dec 2022 09:19:41 +0400 Subject: [PATCH] Run thread to flush buffered events every N. --- gatey_sdk/client.py | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/gatey_sdk/client.py b/gatey_sdk/client.py index 979ff4d..e0a7aca 100644 --- a/gatey_sdk/client.py +++ b/gatey_sdk/client.py @@ -4,6 +4,8 @@ """ import atexit +from threading import Thread +from time import sleep from typing import Callable, Union, Dict, List, Optional, Any # Utils. @@ -40,6 +42,7 @@ class _Client: # Events data queue that waiting for being sent to server. _events_buffer: List[Dict[str, Any]] = [] + _events_buffer_flush_thread: Optional[Thread] = None # Settings. @@ -53,6 +56,7 @@ class _Client: # Buffering settings for bulk sending. buffer_events_for_bulk_sending = None buffer_events_max_capacity = 0 + buffer_events_flush_every = 10.0 # Include event data. include_runtime_info = True @@ -70,6 +74,7 @@ def __init__( global_handler_skip_internal_exceptions: bool = True, buffer_events_for_bulk_sending: bool = False, buffer_events_max_capacity: int = 3, + buffer_events_flush_every: float = 10.0, handle_global_exceptions: bool = False, include_runtime_info: bool = True, include_platform_info: bool = True, @@ -119,6 +124,7 @@ def __init__( self.exceptions_capture_code_context = exceptions_capture_code_context self.buffer_events_for_bulk_sending = buffer_events_for_bulk_sending self.buffer_events_max_capacity = buffer_events_max_capacity + self.buffer_events_flush_every = buffer_events_flush_every self.include_runtime_info = include_runtime_info self.include_platform_info = include_platform_info self.include_sdk_info = include_sdk_info @@ -142,6 +148,10 @@ def __init__( skip_internal_exceptions=global_handler_skip_internal_exceptions, ) + # Start flush thread for bulk sending after timeout (every N). + if self.buffer_events_for_bulk_sending: + self._ensure_running_buffer_flush_thread() + def catch( self, *, @@ -299,6 +309,41 @@ def _build_default_tags_context( default_tags.update(foreign_tags) return default_tags + def _ensure_running_buffer_flush_thread(self) -> Thread: + """ + Runs buffer flush thread is it not running, and returns thread. + Flush thread is used to send events buffer after some time, not causing to wait core application + for terminate or new events that will trigger bulk sending (buffer flush). + """ + + if ( + isinstance(self._events_buffer_flush_thread, Thread) + and self._events_buffer_flush_thread.is_alive() + ): + # If thread is currently alive - there is no need to create new thread by removing old reference. + return self._events_buffer_flush_thread + + # Thread. + self._events_buffer_flush_thread = Thread( + target=self._buffer_flush_thread_target, + args=(), + name="sentry_sdk.events_buffer.flusher", + ) + + # Mark thread as daemon (which is required for graceful main thread termination) and start. + self._events_buffer_flush_thread.daemon = True + self._events_buffer_flush_thread.start() + + return self._events_buffer_flush_thread + + def _buffer_flush_thread_target(self) -> None: + """ + Thread target for events buffer flusher. + """ + while True: + sleep(self.buffer_events_flush_every) + self.bulk_send_buffered_events() + def _on_catch_exception_hook(self, exception): """ Hook that will be called when catched exception.