-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
122 lines (97 loc) · 3.51 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from config.constants import (
MESSAGES_SAMPLES_DIR,
MESSAGES_FLUSH_INTERVAL,
)
from telebot.asyncio_helper import session_manager
from loguru import logger
from commands import COMMANDS
from config.keys import ENV_TYPE, HOST, PORT, WEBHOOK_URL_BASE, WEBHOOK_URL_PATH
from config.app import app
from config.commands import set_bot_commands
from utils import cancel_tasks, load_commands, load_middlewares
from lib.messages import MessagesStorage
from lib.db.config import engine, Base
from config.bot import bot, MESSAGES
from uvicorn import Config, Server
import aioschedule
import asyncio
import sys
import os
from uvicorn.main import Server
# Create the folder for messages models if it does not exist yet.
# `exist_ok=True` makes this a single call, so there is no window between
# an existence check and the creation in which another process could
# create the directory and make `makedirs` fail.
os.makedirs(MESSAGES_SAMPLES_DIR, exist_ok=True)
class AppStatus:
    """
    Holds the module-wide `should_exit` flag and the exit hook that raises it
    """

    # False until `handle_exit` runs; read by code that polls for shutdown.
    should_exit = False

    @staticmethod
    def handle_exit(*args, **kwargs):
        """
        Marks the application as exiting, then forwards every positional and
        keyword argument unchanged to `original_handler`
        """
        AppStatus.should_exit = True
        original_handler(*args, **kwargs)
# Monkey patch exit handler for `uvicorn` server:
# 1. keep a reference to the stock `Server.handle_exit` so that
#    `AppStatus.handle_exit` can still delegate to it;
# 2. install `AppStatus.handle_exit` on the `Server` class, so the
#    `AppStatus.should_exit` flag is set before the stock handler runs.
original_handler = Server.handle_exit
Server.handle_exit = AppStatus.handle_exit
# Add middlewares dynamically
load_middlewares()
# Add commands dynamically
load_commands()
async def flush_messages():
    """
    Flushes `MESSAGES` dict to the storage

    Every chat's batch is removed from `MESSAGES` *before* its push is
    awaited. Entries added to `MESSAGES` while a push is in flight are
    therefore neither iterated over nor wiped; they stay in the dict and are
    picked up by the next flush.

    Raises:
        Whatever `MessagesStorage.push` raises. The failed batch is put back
        into `MESSAGES` first, provided no newer entry has taken its slot.
    """
    if not MESSAGES:
        return

    logger.info(f"Flushing messages into storage: {MESSAGES}")

    # Iterate over a snapshot of the keys: the dict may change at every
    # `await` below, and iterating it directly would raise
    # "dictionary changed size during iteration".
    for chat_id in list(MESSAGES):
        if chat_id not in MESSAGES:
            # Already removed by someone else while we were awaiting.
            continue
        messages = MESSAGES.pop(chat_id)
        storage = MessagesStorage(chat_id)
        try:
            await storage.push(messages)
        except Exception:
            # Keep the batch for a later retry unless newer messages for
            # this chat already occupy the slot.
            MESSAGES.setdefault(chat_id, messages)
            raise
async def scheduler():
    """
    Keeps executing pending `aioschedule` jobs, pausing briefly between
    polls, for as long as `AppStatus.should_exit` is `False`
    """
    poll_delay = 0.1  # seconds to sleep between two polls

    while True:
        if AppStatus.should_exit is not False:
            break
        await aioschedule.run_pending()
        await asyncio.sleep(poll_delay)
async def db_setup():
    """
    Rebuilds DB tables: drops everything registered on `Base.metadata`,
    then creates it again, inside a single `engine.begin()` block
    """
    # NOTE(review): tables are dropped on every call, which discards any
    # existing rows — confirm this is intended outside of development.
    async with engine.begin() as connection:
        for schema_step in (Base.metadata.drop_all, Base.metadata.create_all):
            await connection.run_sync(schema_step)
async def main():
    """
    Application entry coroutine

    Builds the `uvicorn` server, schedules the periodic `flush_messages`
    job, points the bot's webhook at the configured URL when it differs,
    and then runs DB setup, command registration, the web server and the
    scheduler together until they finish.
    """
    logger.info(f"Starting bot with ENV_TYPE: {ENV_TYPE}")
    logger.info("Started Telegram bot and async schedulers")
    logger.info(f"Started FastAPI webhooks service on port {PORT}")

    server = Server(
        Config(app=app, loop=asyncio.get_event_loop(), port=PORT, host=HOST)
    )

    # Periodic flush of buffered messages, tagged `0`.
    flush_job = aioschedule.every(MESSAGES_FLUSH_INTERVAL).seconds.do(flush_messages)
    flush_job.tag(0)

    # Re-register the webhook only when the configured URL is not the
    # one currently set on the bot.
    webhook_info = await bot.get_webhook_info()
    expected_url = WEBHOOK_URL_BASE + WEBHOOK_URL_PATH
    if webhook_info.url != expected_url:
        await bot.remove_webhook()
        await bot.set_webhook(url=expected_url)

    services = [
        session_manager.session.close(),  # Fixes aiohttp warning for unclosed session
        db_setup(),
        set_bot_commands(COMMANDS),
        server.serve(),
        scheduler(),
    ]
    await asyncio.gather(*services)
# Script entry point: run `main()` on a dedicated event loop and always
# tear the loop down cleanly before exiting.
if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    main_task = loop.create_task(main())
    try:
        asyncio.set_event_loop(loop)
        loop.run_until_complete(main_task)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the bot; nothing to report.
        pass
    except Exception as e:
        # Concatenating a `str` with the exception object raises `TypeError`,
        # which hid the real error. Pass the exception as a format argument
        # instead; `logger.exception` also records the traceback.
        logger.exception("Caught exception: {}", e)
    finally:
        logger.info("Stopping bot by user request...")
        cancel_tasks({main_task, *asyncio.all_tasks(loop)}, loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
        # Presumably no session exists yet if startup failed before the
        # first bot API call; skip closing then so the remaining cleanup
        # steps still run.
        session = session_manager.session
        if session is not None:
            loop.run_until_complete(session.close())
        loop.stop()
        loop.close()
        asyncio.set_event_loop(None)
        sys.exit(0)