Skip to content

Commit

Permalink
Merge pull request #9 from guyshe/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
guyshe authored May 6, 2022
2 parents 96decad + f79b7bb commit 19b8807
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 46 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
long_description = f.read()

setup(name='telegram-handler',
version='1.4.1',
version='1.4.3',
description='async telegram handler',
author='Guyshe',
url='https://github.com/guyshe/telegram_handler',
Expand Down
10 changes: 6 additions & 4 deletions telegram_handler/consts.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
FLUSH_INTERVAL = 0.5
FLUSH_INTERVAL = 5

API_HOST = 'api.telegram.org'
API_URL = f'https://{API_HOST}/bot{{bot_token}}/sendMessage?' \
f'chat_id=@{{channel_name}}&parse_mode=HTML'
RETRY_COOLDOWN_TIME = 3
RETRY_COOLDOWN_TIME = 60
MAX_RETRYS = 20
RETRY_BACKOFF_TIME = 5
# max valid size is 4096, we take buffer to be on the safe side
MAX_MESSAGE_SIZE = 3800
MAX_TELEGRAM_MESSAGES = 300
MAX_MESSAGE_SIZE = 4000
TOO_MANY_REQUESTS = 429
MAX_BUFFER_SIZE = 10**16
67 changes: 26 additions & 41 deletions telegram_handler/handler.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,61 @@
from distutils.log import Log
import logging
from time import sleep
from threading import Thread, RLock
from queue import Empty, Queue
import requests
from threading import Thread, RLock
from retry import retry
from telegram_handler.consts import API_URL, RETRY_COOLDOWN_TIME, MAX_RETRYS, MAX_MESSAGE_SIZE, FLUSH_INTERVAL
from telegram_handler.buffer import Buffer
from telegram_handler.consts import API_URL, RETRY_COOLDOWN_TIME, MAX_RETRYS, \
MAX_MESSAGE_SIZE, FLUSH_INTERVAL, RETRY_BACKOFF_TIME, MAX_BUFFER_SIZE

logger = logging.getLogger(__name__)


class TelegramLoggingHandler(logging.Handler):

_sentinel = None

def __init__(self, bot_token, channel_name, level=logging.NOTSET):
super().__init__(level)
self.bot_token = bot_token
self.channel_name = channel_name
self._buffer_lock = RLock()
self._buffer = ''
self.telegram_messages_queue = Queue()
self._buffer = Buffer(MAX_BUFFER_SIZE)
self._stop_signal = RLock()
self._writer_thread = None
self._start_writer_thread()

@retry(requests.RequestException,
@retry(requests.exceptions.RequestException,
tries=MAX_RETRYS,
delay=RETRY_COOLDOWN_TIME,
backoff=RETRY_BACKOFF_TIME,
logger=logger)
def write(self, message):
url = API_URL.format(bot_token=self.bot_token,
channel_name=self.channel_name)
requests.post(url, data={'text': message}).raise_for_status()
response = requests.post(url, data={'text': message})

response.raise_for_status()
if response.status_code == requests.codes.too_many_requests:
raise requests.exceptions.RequestException("Too many requests")

def emit(self, record: logging.LogRecord) -> None:
message = self.format(record)
self._buffer_lock.acquire()
new_buffer = f'{self._buffer}\n{message}'
if len(new_buffer) > MAX_MESSAGE_SIZE:
self._flush_buffer()
new_buffer = message[:MAX_MESSAGE_SIZE]
self._buffer = new_buffer
self._buffer_lock.release()
self._buffer.write(message)

def close(self):
self._flush_buffer()
self.telegram_messages_queue.put(TelegramLoggingHandler._sentinel)
self.telegram_messages_queue.join()
self._writer_thread.join()

def _flush_buffer(self):
self._buffer_lock.acquire()
# Avoid unnecessary flushing
if self._buffer != '':
self.telegram_messages_queue.put(self._buffer[:MAX_MESSAGE_SIZE])
self._buffer = ''
self._buffer_lock.release()
with self._stop_signal:
self._writer_thread.join()

def _write_manager(self):
q = self.telegram_messages_queue
while True:
try:
message = q.get(timeout=FLUSH_INTERVAL)
if message is self._sentinel:
q.task_done()
break
# as long as we can aquire the lock, we can continue
lock_status = self._stop_signal.acquire(blocking=False)
if not lock_status:
break
else:
self._stop_signal.release()

sleep(FLUSH_INTERVAL)
message = self._buffer.read(MAX_MESSAGE_SIZE)
if message != '':
self.write(message)
q.task_done()
except Empty:
self._flush_buffer()
except Exception:
logging.exception('Got exception while handling log')

def _start_writer_thread(self):
self._writer_thread = Thread(target=self._write_manager)
Expand Down

0 comments on commit 19b8807

Please sign in to comment.