"""Мидлварь для проверки доступа пользователей к боту.""" import logging from aiogram import BaseMiddleware from aiogram.types import Message, CallbackQuery from aiogram.filters import Command from database.database import db logger = logging.getLogger(__name__) class AccessCheckMiddleware(BaseMiddleware): """Мидлварь проверяет, имеет ли пользователь доступ к боту.""" # Команды, которые всегда разрешены (без проверки доступа) ALLOWED_COMMANDS = {"start", "menu"} async def __call__(self, handler, event, data): # Определяем тип события и извлекаем user_id user_id = None username = None display_name = None if isinstance(event, Message): user_id = event.from_user.id username = event.from_user.username display_name = event.from_user.full_name # Проверяем, является ли это разрешённой командой if isinstance(event, Message) and event.text: text = event.text.strip().lower() for cmd in self.ALLOWED_COMMANDS: # Проверяем /cmd и /cmd@username if text == f"/{cmd}" or text.startswith(f"/{cmd}@"): return await handler(event, data) elif isinstance(event, CallbackQuery): user_id = event.from_user.id username = event.from_user.username display_name = event.from_user.full_name if user_id is None: return await handler(event, data) # Проверяем доступ access_info = await db.check_user_access(user_id) if access_info["has_access"]: # Сохраняем информацию о пользователе в data для обработчиков data["user_access"] = access_info return await handler(event, data) # Доступ запрещён — отправляем сообщение reason = access_info["reason"] user_info = access_info["user_info"] # Формируем сообщение в зависимости от причины if reason == "not_registered": text = ( "🔒 Доступ запрещён\n\n" "У вас нет доступа к этому боту. " "Обратитесь к администратору для получения доступа." ) elif reason == "deactivated": text = ( "🔒 Доступ заблокирован\n\n" "Ваш доступ к боту был отозван администратором." ) elif reason == "expired": expires_str = "не указан" if user_info and user_info.get("access_expires_at"): from datetime import datetime try: exp_date = datetime.fromisoformat(user_info["access_expires_at"]) expires_str = exp_date.strftime("%d.%m.%Y %H:%M") except Exception: pass text = ( "⏰ Срок доступа истёк\n\n" f"Ваш доступ к боту истёк ({expires_str}).\n" "Обратитесь к администратору для продления." ) elif reason == "generations_limit": used = user_info.get("used_generations", "?") if user_info else "?" max_gens = user_info.get("max_generations", "?") if user_info else "?" text = ( "🎨 Лимит генераций исчерпан\n\n" f"Вы использовали все {max_gens} генераций.\n" "Обратитесь к администратору для увеличения лимита." ) else: text = "🔒 Доступ запрещён" if isinstance(event, Message): await event.answer(text, parse_mode="HTML") elif isinstance(event, CallbackQuery): await event.answer("Доступ запрещён", show_alert=True) try: await event.message.answer(text, parse_mode="HTML") except Exception: pass return None