-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1 from dan-sazonov/dev
Release v0.1.0-beta
- Loading branch information
Showing
12 changed files
with
1,877 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
BOT_TOKEN = ... | ||
ADMIN_ID = ... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
import os | ||
from dataclasses import dataclass | ||
|
||
from dotenv import load_dotenv | ||
|
||
|
||
@dataclass | ||
class Bot: | ||
bot_token: str | ||
admin_id: int | ||
|
||
|
||
def _get_settings() -> Bot: | ||
load_dotenv() | ||
return Bot(bot_token=os.getenv("BOT_TOKEN"), | ||
admin_id=int(os.getenv("ADMIN_ID"))) | ||
|
||
|
||
settings = _get_settings() | ||
|
||
DB_FILE = '../data.db' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
from datetime import datetime | ||
|
||
import services | ||
from models import models_list, db, Word, User, prev_state | ||
|
||
|
||
def _add_words() -> None: | ||
with db: | ||
Word.insert_many(services.get_words_objects()).execute() | ||
|
||
|
||
def create_tables() -> None: | ||
has_db = prev_state | ||
|
||
with db: | ||
db.create_tables(models_list) | ||
|
||
if not has_db: | ||
_add_words() | ||
|
||
|
||
def add_user(usr_id: int, usr_date: datetime = None) -> None: | ||
usr_date = usr_date if usr_date else datetime.now() | ||
|
||
usr_obj = User( | ||
tg_id=usr_id, | ||
date_reg=usr_date, | ||
resp_num=0 | ||
) | ||
|
||
query = User.select().where(User.tg_id == usr_id) # change to EAFP way | ||
with db: | ||
if not query.exists(): | ||
usr_obj.save() | ||
|
||
|
||
def update_user(usr_id: int, usr_date: datetime = None) -> None: | ||
usr = User.get(User.tg_id == usr_id) | ||
usr.date_act = usr_date if usr_date else datetime.now() | ||
usr.resp_num += 1 | ||
|
||
with db: | ||
usr.save() | ||
|
||
|
||
def update_show_num(word_id: int) -> None: | ||
word = Word.get(Word.id == word_id) | ||
word.show_num += 1 | ||
|
||
with db: | ||
word.save() | ||
|
||
|
||
def update_voted_word(voted_word_id: int) -> None: | ||
if not voted_word_id: | ||
return | ||
|
||
voted_word = Word.get(Word.id == voted_word_id) | ||
voted_word.vote_num += 1 | ||
|
||
with db: | ||
voted_word.save() | ||
|
||
|
||
def get_words(words_ids: tuple[int, int]) -> tuple[str]: | ||
out = [] | ||
|
||
for i in words_ids: | ||
out.append(str(Word.get(Word.id == i).word)) | ||
|
||
return tuple(out) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
from aiogram import types | ||
|
||
import messages | ||
|
||
_kb_on_start = [ | ||
[ | ||
types.KeyboardButton(text=messages.KB_START_TEXT), | ||
], | ||
] | ||
|
||
keyboard_start = types.ReplyKeyboardMarkup( | ||
keyboard=_kb_on_start, | ||
resize_keyboard=True, | ||
input_field_placeholder=messages.KB_START_PH | ||
) | ||
|
||
_kb_on_voting = [ | ||
[ | ||
types.KeyboardButton(text="1"), | ||
types.KeyboardButton(text="2"), | ||
], | ||
] | ||
|
||
keyboard_voting = types.ReplyKeyboardMarkup( | ||
keyboard=_kb_on_voting, | ||
resize_keyboard=True, | ||
input_field_placeholder=messages.KB_VOTING_PH | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,125 @@ | ||
def print_hi(name): | ||
print(f'Hi, {name}') | ||
import asyncio | ||
import logging | ||
import sys | ||
|
||
from aiogram import Bot, Dispatcher | ||
from aiogram import F | ||
from aiogram.enums import ParseMode | ||
from aiogram.filters import Command | ||
from aiogram.fsm.context import FSMContext | ||
from aiogram.fsm.state import StatesGroup, State | ||
from aiogram.types import Message, BotCommand, BotCommandScopeDefault | ||
from aiogram.utils.markdown import hbold | ||
|
||
if __name__ == '__main__': | ||
print_hi('PyCharm') | ||
import config | ||
import db | ||
import messages | ||
import services | ||
from keyboards import keyboard_voting, keyboard_start | ||
|
||
dp = Dispatcher() | ||
bot = Bot(config.settings.bot_token, parse_mode=ParseMode.HTML) | ||
|
||
|
||
async def _set_commands(target: Bot): | ||
commands = [ | ||
BotCommand( | ||
command='start', | ||
description=messages.COMMAND_START | ||
), | ||
BotCommand( | ||
command='help', | ||
description=messages.COMMAND_HELP | ||
) | ||
] | ||
|
||
await target.set_my_commands(commands, BotCommandScopeDefault()) | ||
|
||
|
||
class Answer(StatesGroup): | ||
prev_id = State() | ||
|
||
|
||
def _new_pair(ids: tuple[int, int]) -> str: | ||
out = db.get_words(ids) | ||
return f'{messages.VOTING_TITLE}\n' \ | ||
f'1. {hbold(out[0])}\n\n' \ | ||
f'2. {hbold(out[1])}' | ||
|
||
|
||
def _parse_state_data(data: dict) -> list[int] | list: | ||
try: | ||
ans = data['prev_id'].split('|') | ||
return [int(i) for i in ans] | ||
except KeyError: | ||
return [] | ||
|
||
|
||
def _get_usr_ans(message: Message, ans: list[int]) -> int: | ||
if message.text.isdecimal() and ans: | ||
index = int(message.text) - 1 | ||
return ans[index] | ||
return 0 | ||
|
||
|
||
def _update_counters(message: Message, ids: list[int, int]) -> None: | ||
if not ids: | ||
return | ||
|
||
for i in ids: | ||
db.update_show_num(i) | ||
db.update_user(message.from_user.id, message.date) | ||
|
||
|
||
@dp.startup() | ||
async def on_startup(): | ||
db.create_tables() | ||
await _set_commands(bot) | ||
await bot.send_message(chat_id=config.settings.admin_id, text=messages.ON_START) | ||
|
||
|
||
@dp.shutdown() | ||
async def on_shutdown(): | ||
await bot.close() | ||
await bot.send_message(chat_id=config.settings.admin_id, text=messages.ON_STOP) | ||
|
||
|
||
@dp.message((F.text == "1") | (F.text == "2") | (F.text == messages.KB_START_TEXT)) | ||
async def polling_handler(message: Message, state: FSMContext) -> None: | ||
data = _parse_state_data(await state.get_data()) | ||
_update_counters(message, data) | ||
voted_id = _get_usr_ans(message, data) | ||
db.update_voted_word(voted_id) | ||
|
||
ids = services.get_words_ids() | ||
await state.update_data(prev_id=f'{ids[0]}|{ids[1]}') | ||
ans = _new_pair(ids) | ||
await message.answer(ans, reply_markup=keyboard_voting) | ||
|
||
|
||
@dp.message(Command("start")) | ||
async def command_start_handler(message: Message) -> None: | ||
""" | ||
This handler receives messages with `/start` command | ||
""" | ||
db.add_user(message.from_user.id, message.date) | ||
await message.answer(messages.START, reply_markup=keyboard_start) | ||
|
||
|
||
@dp.message(Command("help")) | ||
async def command_help_handler(message: Message) -> None: | ||
await message.answer(messages.HELP) | ||
|
||
|
||
@dp.message() | ||
async def unknown_command_handler(message: Message) -> None: | ||
await message.answer(messages.UNKNOWN) | ||
|
||
|
||
async def main() -> None: | ||
await dp.start_polling(bot) | ||
|
||
|
||
if __name__ == "__main__": | ||
logging.basicConfig(level=logging.INFO, stream=sys.stdout) | ||
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from aiogram.utils.markdown import hbold | ||
|
||
START = f"""{hbold('Привет!')}\n | ||
Это текст команды, которая отобразится на старте""" | ||
|
||
HELP = "Текст для команды help" | ||
|
||
UNKNOWN = "Текст, который отобразится, если пользователь отправил неожидаемое сообщение" | ||
|
||
ON_START = "Бот запущен (будет отправлено админу)" | ||
|
||
ON_STOP = "Бот остановлен (будет отправлено админу)" | ||
|
||
KB_VOTING_PH = "Плэйсхолдер для выбора ответа" | ||
|
||
KB_START_PH = "Плэйсхолдер для старта бота" | ||
KB_START_TEXT = "Сообщение для старта бота" | ||
|
||
VOTING_TITLE = "Текст, который будет показан перед двумя вариантами в голосовании" | ||
|
||
COMMAND_START = "Описание команды /start" | ||
COMMAND_HELP = "Описание команды /help" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
import os | ||
|
||
from peewee import Model, PrimaryKeyField, BigIntegerField, DateTimeField, IntegerField, CharField, SqliteDatabase | ||
|
||
import config | ||
|
||
prev_state = os.path.isfile(config.DB_FILE) | ||
db = SqliteDatabase(config.DB_FILE) | ||
|
||
|
||
class BaseModel(Model): | ||
class Meta: | ||
database = db | ||
|
||
|
||
class User(BaseModel): | ||
id = PrimaryKeyField(unique=True) | ||
tg_id = BigIntegerField(unique=True) | ||
date_reg = DateTimeField() | ||
date_act = DateTimeField(null=True) | ||
resp_num = IntegerField() | ||
|
||
class Meta: | ||
db_table = 'users' | ||
order_by = 'resp_num' | ||
|
||
|
||
class Word(BaseModel): | ||
id = PrimaryKeyField(unique=True) | ||
word = CharField(max_length=100) | ||
show_num = IntegerField() | ||
vote_num = IntegerField() | ||
|
||
class Meta: | ||
db_table = 'words' | ||
|
||
|
||
models_list = [User, Word] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
import os.path | ||
import random | ||
|
||
from peewee import fn | ||
|
||
from models import Word | ||
|
||
|
||
def _check_data_file(path: str) -> None: | ||
if not os.path.isfile(path): | ||
print("Can't read data file with words") | ||
exit(1) | ||
|
||
|
||
def _get_words() -> list[str]: | ||
""" | ||
Converts a txt file with words on separate lines to a list of strings. | ||
Empty strings would be deleted | ||
:return: list of str | ||
""" | ||
path = '../words.txt' | ||
_check_data_file(path) | ||
|
||
with open(path, 'r', encoding='utf-8') as f: | ||
file_content = f.read().split('\n') | ||
words = list(filter(None, file_content)) # remove empty strings | ||
|
||
return words | ||
|
||
|
||
def get_words_objects() -> list[dict]: | ||
word_list = _get_words() | ||
words = [] | ||
|
||
for index, _word in enumerate(word_list, start=1): | ||
words.append({ | ||
'id': index, | ||
'word': _word, | ||
'show_num': 0, | ||
'vote_num': 0 | ||
}) | ||
|
||
return words | ||
|
||
|
||
def get_words_ids() -> tuple[int, int]: | ||
max_id = Word.select(fn.MAX(Word.id)).scalar() | ||
id_1 = id_2 = random.randint(1, max_id) | ||
|
||
while id_1 == id_2: | ||
id_2 = random.randint(1, max_id) | ||
|
||
return id_1, id_2 |
Oops, something went wrong.