Files
tg-sd/bot/handlers_admin.py
dinlo b88ccf3b4b Initial commit
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 18:46:09 +08:00

514 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Обработчики админ-панели для управления пользователями."""
import logging
import math
from datetime import datetime, timedelta
from aiogram import Router, F
from aiogram.types import CallbackQuery, Message, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.filters import Command
from config import settings
from database.database import db
router = Router()
logger = logging.getLogger(__name__)
# Константы для пагинации
USERS_PER_PAGE = 10
class AddUserState(StatesGroup):
"""Состояния для процесса добавления пользователя."""
waiting_for_user_id = State()
waiting_for_access_type = State() # unlimited, time, generations
waiting_for_time_days = State()
waiting_for_generations_limit = State()
def is_admin(user_id: int) -> bool:
"""Проверить, является ли пользователь админом."""
return user_id == settings.ADMIN_ID
def check_admin(handler):
"""Декоратор для проверки прав админа."""
async def wrapper(callback_or_message, *args, **kwargs):
# Фильтруем лишние kwargs, которые не ожидаются хендлером
# Оставляем только state и другие FSM-аргументы
allowed_kwargs = {'state', 'session', 'middleware_data'}
filtered_kwargs = {k: v for k, v in kwargs.items() if k in allowed_kwargs}
if isinstance(callback_or_message, CallbackQuery):
user_id = callback_or_message.from_user.id
elif isinstance(callback_or_message, Message):
user_id = callback_or_message.from_user.id
else:
return await callback_or_message.answer("⛔ Ошибка определения пользователя.")
if not is_admin(user_id):
if isinstance(callback_or_message, CallbackQuery):
await callback_or_message.answer("⛔ Недостаточно прав.", show_alert=True)
else:
await callback_or_message.answer("⛔ Недостаточно прав.")
return None
return await handler(callback_or_message, *args, **filtered_kwargs)
return wrapper
@router.message(Command("admin"))
@check_admin
async def cmd_admin(message: Message, state: FSMContext):
"""Админ-панель."""
await state.clear()
await show_admin_panel(message)
@router.callback_query(F.data == "admin_panel")
@check_admin
async def admin_panel(callback: CallbackQuery, state: FSMContext):
"""Показать админ-панель."""
await state.clear()
await show_admin_panel(callback.message)
await callback.answer()
async def show_admin_panel(target):
"""Отобразить админ-панель."""
all_users = await db.get_all_users()
active_users = [u for u in all_users if u["is_active"]]
total_gens = sum(u.get("used_generations", 0) for u in all_users)
text = (
"🛡️ <b>Админ-панель</b>\n\n"
f"👥 Всего пользователей: <b>{len(all_users)}</b>\n"
f"✅ Активных: <b>{len(active_users)}</b>\n"
f"🎨 Всего генераций: <b>{total_gens}</b>\n\n"
"<b>Управление:</b>"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="➕ Добавить пользователя", callback_data="admin_add_user")],
[InlineKeyboardButton(text="📋 Список пользователей", callback_data="admin_users_list")],
[InlineKeyboardButton(text="🧹 Очистить истёкшие доступы", callback_data="admin_cleanup_expired")],
[InlineKeyboardButton(text="⬅️ Назад", callback_data="main_menu")],
])
if hasattr(target, 'answer'):
try:
await target.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
except Exception:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
else:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
# --- Добавление пользователя ---
@router.callback_query(F.data == "admin_add_user")
@check_admin
async def admin_add_user(callback: CallbackQuery, state: FSMContext):
"""Начать процесс добавления пользователя."""
await state.set_state(AddUserState.waiting_for_user_id)
await callback.message.edit_text(
"➕ <b>Добавление нового пользователя</b>\n\n"
"Введите <b>Telegram User ID</b> пользователя.\n\n"
"💡 <i>Чтобы узнать ID, пользователь может отправить "
"команду /start боту @userinfobot</i>\n\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
await callback.answer()
@router.message(AddUserState.waiting_for_user_id)
@check_admin
async def process_user_id(message: Message, state: FSMContext):
"""Обработка введённого User ID."""
text = message.text.strip()
if not text.isdigit():
await message.answer("❌ Введите корректный числовой ID.")
return
user_id = int(text)
# Проверяем, не админ ли это
if user_id == settings.ADMIN_ID:
await message.answer("❌ Нельзя добавить самого себя как обычного пользователя.")
return
# Проверяем, существует ли уже пользователь
existing = await db.get_user_by_id(user_id)
if existing:
status_text = "активен" if existing["is_active"] else "неактивен"
await message.answer(
f"⚠️ Пользователь <code>{user_id}</code> уже существует в базе.\n"
f"Статус: {status_text}\n"
f"Его параметры будут обновлены.\n\n"
"Продолжить? (да/нет)"
)
# Сохраняем и переходим к выбору типа доступа
await state.update_data(user_id=user_id, existing_user=True)
else:
await state.update_data(user_id=user_id, existing_user=False)
await state.set_state(AddUserState.waiting_for_access_type)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="♾️ Без ограничений", callback_data="access_unlimited")],
[InlineKeyboardButton(text="⏰ По времени (дни)", callback_data="access_time")],
[InlineKeyboardButton(text="🎨 По количеству генераций", callback_data="access_gens")],
[InlineKeyboardButton(text="❌ Отмена", callback_data="admin_panel")],
])
await message.answer(
"📋 <b>Выберите тип доступа:</b>",
parse_mode="HTML",
reply_markup=keyboard,
)
@router.callback_query(F.data.startswith("access_"), AddUserState.waiting_for_access_type)
@check_admin
async def process_access_type(callback: CallbackQuery, state: FSMContext):
"""Обработка выбора типа доступа."""
access_type = callback.data.replace("access_", "")
if access_type == "unlimited":
await state.update_data(access_type="unlimited", access_expires_at=None, max_generations=None)
await _confirm_and_add_user(callback, state)
elif access_type == "time":
await state.set_state(AddUserState.waiting_for_time_days)
await state.update_data(access_type="time")
await callback.message.edit_text(
"⏰ <b>Введите количество дней</b> доступа:\n\n"
"Например: 7, 30, 90, 365\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
elif access_type == "gens":
await state.set_state(AddUserState.waiting_for_generations_limit)
await state.update_data(access_type="gens")
await callback.message.edit_text(
"🎨 <b>Введите максимальное количество генераций:</b>\n\n"
"Например: 10, 50, 100\n"
"Используйте <code>/cancel</code> для отмены.",
parse_mode="HTML",
)
await callback.answer()
@router.message(AddUserState.waiting_for_time_days)
@check_admin
async def process_time_days(message: Message, state: FSMContext):
"""Обработка введённого количества дней."""
text = message.text.strip()
if not text.isdigit():
await message.answer("❌ Введите корректное число дней.")
return
days = int(text)
if days < 1 or days > 3650:
await message.answer("❌ Количество дней должно быть от 1 до 3650.")
return
expires_at = datetime.now() + timedelta(days=days)
await state.update_data(access_expires_at=expires_at, max_generations=None)
await _confirm_and_add_user(message, state)
@router.message(AddUserState.waiting_for_generations_limit)
@check_admin
async def process_gens_limit(message: Message, state: FSMContext):
"""Обработка введённого лимита генераций."""
text = message.text.strip()
if not text.isdigit():
await message.answer("❌ Введите корректное число.")
return
max_gens = int(text)
if max_gens < 1 or max_gens > 100000:
await message.answer("❌ Лимит должен быть от 1 до 100000.")
return
await state.update_data(max_generations=max_gens, access_expires_at=None)
await _confirm_and_add_user(message, state)
async def _confirm_and_add_user(target, state: FSMContext):
"""Подтвердить и добавить пользователя."""
data = await state.get_data()
user_id = data.get("user_id")
if user_id is None:
await target.answer("❌ Ошибка: не указан User ID.")
await state.clear()
return
access_expires_at = data.get("access_expires_at")
max_generations = data.get("max_generations")
# Добавляем/обновляем пользователя
await db.add_user(
user_id=user_id,
added_by=target.from_user.id,
access_expires_at=access_expires_at,
max_generations=max_generations,
)
# Формируем описание доступа
if access_expires_at:
expires_str = access_expires_at.strftime("%d.%m.%Y %H:%M")
access_desc = f"⏰ До: <b>{expires_str}</b>"
elif max_generations:
access_desc = f"🎨 Лимит: <b>{max_generations}</b> генераций"
else:
access_desc = "♾️ Без ограничений"
text = (
f"✅ <b>Пользователь <code>{user_id}</code> добавлен!</b>\n\n"
f"{access_desc}"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=" Добавить ещё", callback_data="admin_add_user")],
[InlineKeyboardButton(text="📋 Список пользователей", callback_data="admin_users_list")],
[InlineKeyboardButton(text="🛡️ Админ-панель", callback_data="admin_panel")],
])
if hasattr(target, 'edit_text'):
try:
await target.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
except Exception:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
else:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
await state.clear()
# --- Список пользователей ---
@router.callback_query(F.data == "admin_users_list")
@check_admin
async def admin_users_list(callback: CallbackQuery, state: FSMContext):
"""Показать список пользователей."""
await state.clear()
await _show_users_list(callback.message, 0)
await callback.answer()
@router.callback_query(F.data.startswith("admin_users_page_"))
@check_admin
async def admin_users_page(callback: CallbackQuery, state: FSMContext):
"""Переключение страницы пользователей."""
page = int(callback.data.split("_")[-1])
await _show_users_list(callback.message, page)
await callback.answer()
async def _show_users_list(target, page: int):
"""Отобразить список пользователей."""
all_users = await db.get_all_users()
total_pages = math.ceil(len(all_users) / USERS_PER_PAGE) if all_users else 1
if page >= total_pages:
page = total_pages - 1
start = page * USERS_PER_PAGE
end = start + USERS_PER_PAGE
page_users = all_users[start:end]
if not all_users:
text = "📋 <b>Список пользователей пуст</b>"
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="➕ Добавить пользователя", callback_data="admin_add_user")],
[InlineKeyboardButton(text="🛡️ Админ-панель", callback_data="admin_panel")],
])
else:
text = f"👥 <b>Пользователи ({len(all_users)})</b>\n\n"
for i, user in enumerate(page_users, start + 1):
status = "👑" if user["is_admin"] else ("" if user["is_active"] else "")
# Формируем описание доступа
if user["is_admin"]:
access_info = "админ"
elif user.get("access_expires_at"):
try:
exp = datetime.fromisoformat(user["access_expires_at"])
if exp > datetime.now():
days_left = (exp - datetime.now()).days
access_info = f"до {exp.strftime('%d.%m.%Y')} ({days_left} дн.)"
else:
access_info = "истёк"
except Exception:
access_info = ""
elif user.get("max_generations"):
used = user.get("used_generations", 0)
access_info = f"{used}/{user['max_generations']} ген."
else:
access_info = "без ограничений"
name = user.get("display_name") or user.get("username") or f"ID: {user['user_id']}"
text += f"{i}. {status} <code>{user['user_id']}</code> — {name}\n"
text += f" {access_info}\n\n"
# Навигация
nav_row = []
if page > 0:
nav_row.append(InlineKeyboardButton(text="⬅️", callback_data=f"admin_users_page_{page - 1}"))
nav_row.append(InlineKeyboardButton(text=f"{page + 1}/{total_pages}", callback_data="noop"))
if page < total_pages - 1:
nav_row.append(InlineKeyboardButton(text="➡️", callback_data=f"admin_users_page_{page + 1}"))
kb_buttons = []
for user in page_users:
if not user["is_admin"]:
kb_buttons.append([
InlineKeyboardButton(
text=f"{'' if user['is_active'] else ''} {user['user_id']}",
callback_data=f"admin_user_manage_{user['user_id']}"
)
])
kb_buttons.append(nav_row)
kb_buttons.append([InlineKeyboardButton(text=" Добавить", callback_data="admin_add_user")])
kb_buttons.append([InlineKeyboardButton(text="🛡️ Админ-панель", callback_data="admin_panel")])
keyboard = InlineKeyboardMarkup(inline_keyboard=kb_buttons)
if hasattr(target, 'edit_text'):
try:
await target.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
except Exception:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
else:
await target.answer(text, parse_mode="HTML", reply_markup=keyboard)
# --- Управление конкретным пользователем ---
@router.callback_query(F.data.startswith("admin_user_manage_"))
@check_admin
async def admin_user_manage(callback: CallbackQuery):
"""Управление конкретным пользователем."""
user_id = int(callback.data.split("_")[-1])
user = await db.get_user_by_id(user_id)
if not user:
await callback.answer("Пользователь не найден.", show_alert=True)
return
# Формируем информацию
status = "👑 Админ" if user["is_admin"] else ("✅ Активен" if user["is_active"] else "❌ Неактивен")
if user.get("access_expires_at"):
try:
exp = datetime.fromisoformat(user["access_expires_at"])
expires_str = exp.strftime("%d.%m.%Y %H:%M")
except Exception:
expires_str = ""
else:
expires_str = "не указан"
max_gens = user.get("max_generations") or "без ограничений"
used_gens = user.get("used_generations", 0)
added_by = user.get("added_by", "")
added_at = user.get("added_at", "")
name = user.get("display_name") or user.get("username") or f"ID: {user_id}"
text = (
f"👤 <b>{name}</b>\n\n"
f"ID: <code>{user_id}</code>\n"
f"Статус: {status}\n"
f"Добавлен: {added_at} (админом {added_by})\n"
f"Истекает: {expires_str}\n"
f"Генерации: {used_gens}/{max_gens}"
)
kb_buttons = []
if not user["is_admin"]:
if user["is_active"]:
kb_buttons.append([InlineKeyboardButton(text="🚫 Заблокировать", callback_data=f"admin_user_block_{user_id}")])
else:
kb_buttons.append([InlineKeyboardButton(text="✅ Разблокировать", callback_data=f"admin_user_unblock_{user_id}")])
kb_buttons.append([InlineKeyboardButton(text="🔄 Обновить доступ", callback_data=f"admin_user_update_{user_id}")])
kb_buttons.append([InlineKeyboardButton(text="🗑️ Удалить", callback_data=f"admin_user_delete_{user_id}")])
kb_buttons.append([InlineKeyboardButton(text="⬅️ Назад к списку", callback_data="admin_users_list")])
keyboard = InlineKeyboardMarkup(inline_keyboard=kb_buttons)
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
await callback.answer()
@router.callback_query(F.data.startswith("admin_user_block_"))
@check_admin
async def admin_user_block(callback: CallbackQuery):
"""Заблокировать пользователя."""
user_id = int(callback.data.split("_")[-1])
await db.remove_user(user_id)
await callback.answer("🚫 Пользователь заблокирован.")
await admin_user_manage(callback)
@router.callback_query(F.data.startswith("admin_user_unblock_"))
@check_admin
async def admin_user_unblock(callback: CallbackQuery):
"""Разблокировать пользователя."""
user_id = int(callback.data.split("_")[-1])
await db.update_user_access(user_id)
await callback.answer("✅ Пользователь разблокирован.")
await admin_user_manage(callback)
@router.callback_query(F.data.startswith("admin_user_delete_"))
@check_admin
async def admin_user_delete(callback: CallbackQuery):
"""Удалить пользователя (окончательно)."""
user_id = int(callback.data.split("_")[-1])
# В текущей схеме просто деактивируем
await db.remove_user(user_id)
await callback.answer("🗑️ Пользователь удалён.")
await admin_users_list(callback)
@router.callback_query(F.data.startswith("admin_user_update_"))
@check_admin
async def admin_user_update(callback: CallbackQuery, state: FSMContext):
"""Обновить параметры доступа пользователя."""
user_id = int(callback.data.split("_")[-1])
await state.update_data(update_user_id=user_id)
await state.set_state(AddUserState.waiting_for_access_type)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="♾️ Без ограничений", callback_data="access_unlimited")],
[InlineKeyboardButton(text="⏰ По времени (дни)", callback_data="access_time")],
[InlineKeyboardButton(text="🎨 По количеству генераций", callback_data="access_gens")],
[InlineKeyboardButton(text="❌ Отмена", callback_data=f"admin_user_manage_{user_id}")],
])
await callback.message.edit_text(
"🔄 <b>Обновление параметров доступа</b>\n\n"
"Выберите новый тип доступа:",
parse_mode="HTML",
reply_markup=keyboard,
)
await callback.answer()
# --- Очистка истёкших доступов ---
@router.callback_query(F.data == "admin_cleanup_expired")
@check_admin
async def admin_cleanup(callback: CallbackQuery):
"""Очистка истёкших доступов."""
deactivated = await db.deactivate_expired_users()
await callback.answer(f"🧹 Деактивировано пользователей: {deactivated}", show_alert=True)
await admin_panel(callback)