-
Notifications
You must be signed in to change notification settings - Fork 106
/
Copy pathloading.py
105 lines (80 loc) · 2.46 KB
/
loading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import asyncio
import logging
import os
from aiogram import Bot, Dispatcher
from aiogram.filters import CommandStart
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage, SimpleEventIsolation
from aiogram.types import CallbackQuery, Message
from aiogram_dialog import (
BaseDialogManager,
Dialog,
DialogManager,
StartMode,
Window,
setup_dialogs,
)
from aiogram_dialog.widgets.kbd import Button
from aiogram_dialog.widgets.link_preview import LinkPreview
from aiogram_dialog.widgets.text import Const, Multi, Progress
API_TOKEN = os.getenv("BOT_TOKEN")
# name input dialog
class Bg(StatesGroup):
progress = State()
async def get_bg_data(dialog_manager: DialogManager, **kwargs):
return {
"progress": dialog_manager.dialog_data.get("progress", 0),
}
bg_dialog = Dialog(
Window(
Multi(
Const("Your click is processing, please wait..."),
Progress("progress", 10),
),
state=Bg.progress,
getter=get_bg_data,
),
)
# main dialog
class MainSG(StatesGroup):
main = State()
async def start_bg(
callback: CallbackQuery,
button: Button,
manager: DialogManager,
):
await manager.start(Bg.progress)
asyncio.create_task(background(callback, manager.bg())) # noqa: RUF006
async def background(callback: CallbackQuery, manager: BaseDialogManager):
count = 10
for i in range(1, count + 1):
await asyncio.sleep(1)
await manager.update({
"progress": i * 100 / count,
})
await asyncio.sleep(1)
await manager.done()
main_menu = Dialog(
Window(
Const("Press button to start processing"),
Button(Const("Start"), id="start", on_click=start_bg),
LinkPreview(url=Const("http://ya.ru")),
state=MainSG.main,
),
)
async def start(message: Message, dialog_manager: DialogManager):
await dialog_manager.start(MainSG.main, mode=StartMode.RESET_STACK)
async def main():
# real main
logging.basicConfig(level=logging.INFO)
logging.getLogger("aiogram_dialog").setLevel(logging.DEBUG)
storage = MemoryStorage()
bot = Bot(token=API_TOKEN)
dp = Dispatcher(storage=storage, events_isolation=SimpleEventIsolation())
dp.include_router(bg_dialog)
dp.include_router(main_menu)
dp.message.register(start, CommandStart())
setup_dialogs(dp)
await dp.start_polling(bot)
if __name__ == "__main__":
asyncio.run(main())