Files

103 lines
4.5 KiB
Python
Raw Permalink Normal View History

2026-05-31 18:46:09 +08:00
"""Мидлварь для проверки доступа пользователей к боту."""
import logging
from aiogram import BaseMiddleware
from aiogram.types import Message, CallbackQuery
from aiogram.filters import Command
from database.database import db
logger = logging.getLogger(__name__)
class AccessCheckMiddleware(BaseMiddleware):
"""Мидлварь проверяет, имеет ли пользователь доступ к боту."""
# Команды, которые всегда разрешены (без проверки доступа)
ALLOWED_COMMANDS = {"start", "menu"}
async def __call__(self, handler, event, data):
# Определяем тип события и извлекаем user_id
user_id = None
username = None
display_name = None
if isinstance(event, Message):
user_id = event.from_user.id
username = event.from_user.username
display_name = event.from_user.full_name
# Проверяем, является ли это разрешённой командой
if isinstance(event, Message) and event.text:
text = event.text.strip().lower()
for cmd in self.ALLOWED_COMMANDS:
# Проверяем /cmd и /cmd@username
if text == f"/{cmd}" or text.startswith(f"/{cmd}@"):
return await handler(event, data)
elif isinstance(event, CallbackQuery):
user_id = event.from_user.id
username = event.from_user.username
display_name = event.from_user.full_name
if user_id is None:
return await handler(event, data)
# Проверяем доступ
access_info = await db.check_user_access(user_id)
if access_info["has_access"]:
# Сохраняем информацию о пользователе в data для обработчиков
data["user_access"] = access_info
return await handler(event, data)
# Доступ запрещён — отправляем сообщение
reason = access_info["reason"]
user_info = access_info["user_info"]
# Формируем сообщение в зависимости от причины
if reason == "not_registered":
text = (
"🔒 <b>Доступ запрещён</b>\n\n"
"У вас нет доступа к этому боту. "
"Обратитесь к администратору для получения доступа."
)
elif reason == "deactivated":
text = (
"🔒 <b>Доступ заблокирован</b>\n\n"
"Ваш доступ к боту был отозван администратором."
)
elif reason == "expired":
expires_str = "не указан"
if user_info and user_info.get("access_expires_at"):
from datetime import datetime
try:
exp_date = datetime.fromisoformat(user_info["access_expires_at"])
expires_str = exp_date.strftime("%d.%m.%Y %H:%M")
except Exception:
pass
text = (
"⏰ <b>Срок доступа истёк</b>\n\n"
f"Ваш доступ к боту истёк ({expires_str}).\n"
"Обратитесь к администратору для продления."
)
elif reason == "generations_limit":
used = user_info.get("used_generations", "?") if user_info else "?"
max_gens = user_info.get("max_generations", "?") if user_info else "?"
text = (
"🎨 <b>Лимит генераций исчерпан</b>\n\n"
f"Вы использовали все {max_gens} генераций.\n"
"Обратитесь к администратору для увеличения лимита."
)
else:
text = "🔒 <b>Доступ запрещён</b>"
if isinstance(event, Message):
await event.answer(text, parse_mode="HTML")
elif isinstance(event, CallbackQuery):
await event.answer("Доступ запрещён", show_alert=True)
try:
await event.message.answer(text, parse_mode="HTML")
except Exception:
pass
return None